package my.io.hadoop.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Properties;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

@Slf4j
@Service
public class DataAnalysisService {

    @Autowired
    private SparkSession sparkSession;

    @Value("${spring.datasource.url}")
    private String jdbcUrl;

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    // Cap the size of every report so each result table stays small.
    private static final int LIMIT = 100;

    public void performAnalysis() {
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", username);
        connectionProperties.put("password", password);
        log.info("Connecting to: {}", jdbcUrl);
        log.info("Starting data analysis");

        // Load the source tables over JDBC.
        Dataset<Row> goodsDF = sparkSession.read().jdbc(jdbcUrl, "goods", connectionProperties);
        Dataset<Row> orderGoodsDF = sparkSession.read().jdbc(jdbcUrl, "order_goods", connectionProperties);
        Dataset<Row> tOrderDF = sparkSession.read().jdbc(jdbcUrl, "t_order", connectionProperties);
        Dataset<Row> userDF = sparkSession.read().jdbc(jdbcUrl, "t_user", connectionProperties);
        Dataset<Row> cartDF = sparkSession.read().jdbc(jdbcUrl, "cart", connectionProperties);
        Dataset<Row> collectDF = sparkSession.read().jdbc(jdbcUrl, "collect", connectionProperties);
        Dataset<Row> categoryDF = sparkSession.read().jdbc(jdbcUrl, "category", connectionProperties);
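
        // 1) Top categories by units sold: join order lines to goods and
        //    categories, then sum the per-line quantity per category name.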
        Dataset<Row> categorySales = orderGoodsDF
                .join(goodsDF, orderGoodsDF.col("goods_id").equalTo(goodsDF.col("id")))
                .join(categoryDF, goodsDF.col("category_id").equalTo(categoryDF.col("id")))
                .groupBy(categoryDF.col("name"))
                .agg(sum(orderGoodsDF.col("count")).alias("total_sales"))
                .orderBy(col("total_sales").desc())
                .limit(LIMIT);
        categorySales.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_category_sales", connectionProperties);
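
        // 2) Users with the most items sitting in their cart.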
        Dataset<Row> userCartTotal = cartDF
                .join(userDF, cartDF.col("user_id").equalTo(userDF.col("id")))
                .groupBy(userDF.col("username"))
                .agg(sum(cartDF.col("count")).alias("total_cart_items"))
                .orderBy(col("total_cart_items").desc())
                .limit(LIMIT);
        userCartTotal.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_user_cart_total", connectionProperties);
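
        // 3) Most-collected (favourited) goods.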
        Dataset<Row> goodsCollectCount = collectDF
                .join(goodsDF, collectDF.col("goods_id").equalTo(goodsDF.col("id")))
                .groupBy(goodsDF.col("name"))
                .agg(count(collectDF.col("id")).alias("collect_count"))
                .orderBy(col("collect_count").desc())
                .limit(LIMIT);
        goodsCollectCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_goods_collect_count", connectionProperties);
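
        // 4) Order activity per user. Note: this counts order *lines*, so an
        //    order containing several goods contributes once per line;
        //    countDistinct on order_id would count distinct orders instead.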
        Dataset<Row> userOrderCount = orderGoodsDF
                .join(tOrderDF, orderGoodsDF.col("order_id").equalTo(tOrderDF.col("id")))
                .join(userDF, tOrderDF.col("user_id").equalTo(userDF.col("id")))
                .groupBy(userDF.col("username"))
                .agg(count(orderGoodsDF.col("order_id")).alias("order_count"))
                .orderBy(col("order_count").desc())
                .limit(LIMIT);
        userOrderCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_user_order_count", connectionProperties);
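
        // 5) Number of goods listed in each category.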
        Dataset<Row> categoryGoodsCount = goodsDF
                .join(categoryDF, goodsDF.col("category_id").equalTo(categoryDF.col("id")))
                .groupBy(categoryDF.col("name"))
                .agg(count(goodsDF.col("id")).alias("goods_count"))
                .orderBy(col("goods_count").desc())
                .limit(LIMIT);
        categoryGoodsCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_category_goods_count", connectionProperties);
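
        // 6) Users who have collected the most goods.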
        Dataset<Row> userCollectCount = collectDF
                .join(userDF, collectDF.col("user_id").equalTo(userDF.col("id")))
                .groupBy(userDF.col("username"))
                .agg(count(collectDF.col("id")).alias("collect_goods_count"))
                .orderBy(col("collect_goods_count").desc())
                .limit(LIMIT);
        userCollectCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_user_collect_count", connectionProperties);
    }
}
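
The service autowires a SparkSession, but the listing does not show where that bean comes from. Below is a minimal sketch of a Spring configuration that could supply it; the SparkConfig class name, the application name, and the embedded local[*] master are assumptions, not part of the original:

package my.io.hadoop.config;

import org.apache.spark.sql.SparkSession;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfig {

    // Hypothetical bean definition: local[*] runs Spark inside the
    // application JVM using all available cores, enough for a demo dataset.
    @Bean
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .appName("data-analysis")
                .master("local[*]")
                .getOrCreate();
    }
}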
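
The listing also does not show when performAnalysis() is invoked. One hedged way to trigger it once at startup is a CommandLineRunner; the AnalysisRunner class below is illustrative, not from the original:

package my.io.hadoop;

import my.io.hadoop.service.DataAnalysisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical entry point: runs the analysis after the Spring context starts.
@Component
public class AnalysisRunner implements CommandLineRunner {

    @Autowired
    private DataAnalysisService dataAnalysisService;

    @Override
    public void run(String... args) {
        dataAnalysisService.performAnalysis();
    }
}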