Add the dependencies

<!-- Spark Core and SQL -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.4.1</version>
    <!-- exclude Spark's Log4j-to-SLF4J bridge so it does not collide with Spring Boot's own logging binding -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- janino and commons-compiler are pinned explicitly; the copies pulled in transitively by spark-sql are excluded below -->
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>3.1.8</version>
</dependency>
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>3.1.8</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.4.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
        </exclusion>
        <!-- drop the SLF4J bridges as well, to avoid duplicate logging bindings on the classpath -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Add the configuration file

spark:
  app:
    name: ECommerceAnalysis
  home: 127.0.0.1
  master:
    uri: spark://my.hadoop.cn:7077
  driver:
    memory: 450m
    maxResultSize: 450m
  worker:
    memory: 450m
  executor:
    cores: 1
    memory: 450m
    heartbeatInterval: 1000000
  num:
    executors: 1
  network:
    timeout: 1474830
  rpc:
    message:
      maxSize: 1024

Add the Spark configuration for Spring Boot

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;

@Configuration
public class SparkConfig {

    @Value("${spark.app.name}")
    private String appName;
    @Value("${spark.home}")
    private String sparkHome;
    @Value("${spark.master.uri}")
    private String sparkMasterUri;
    @Value("${spark.driver.memory}")
    private String sparkDriverMemory;
    @Value("${spark.worker.memory}")
    private String sparkWorkerMemory;
    @Value("${spark.executor.memory}")
    private String sparkExecutorMemory;
    @Value("${spark.executor.cores}")
    private String sparkExecutorCores;
    @Value("${spark.num.executors}")
    private String sparkExecutorsNum;
    @Value("${spark.network.timeout}")
    private String networkTimeout;
    @Value("${spark.executor.heartbeatInterval}")
    private String heartbeatIntervalTime;
    @Value("${spark.driver.maxResultSize}")
    private String maxResultSize;
    @Value("${spark.rpc.message.maxSize}")
    private String sparkRpcMessageMaxSize;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                // local mode is hard-coded here; sparkMasterUri from the config is not used in this setup
                .setMaster("local[*]")
                .set("spark.driver.memory", sparkDriverMemory)
                .set("spark.driver.maxResultSize", maxResultSize)
                .set("spark.worker.memory", sparkWorkerMemory) // "26g"
                .set("spark.executor.memory", sparkExecutorMemory)
                .set("spark.executor.cores", sparkExecutorCores)
                .set("spark.executor.heartbeatInterval", heartbeatIntervalTime)
                .set("spark.num.executors", sparkExecutorsNum)
                .set("spark.network.timeout", networkTimeout)
                .set("spark.rpc.message.maxSize", sparkRpcMessageMaxSize);
        // .set("spark.shuffle.memoryFraction", "0") // defaults to 0.2
        return sparkConf;
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName(appName)
                .getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}
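
Once these beans are registered, the injected SparkSession can be exercised with a tiny job before wiring in any real analysis. The runner below is only a minimal sketch for verification and is not part of the original project; the class name and output message are made up for illustration.

import org.apache.spark.sql.SparkSession;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical smoke check: counts a small generated Dataset to confirm the local[*] session starts.
@Component
public class SparkSmokeCheck implements CommandLineRunner {

    private final SparkSession sparkSession;

    public SparkSmokeCheck(SparkSession sparkSession) {
        this.sparkSession = sparkSession;
    }

    @Override
    public void run(String... args) {
        long rows = sparkSession.range(0, 10).count(); // Dataset<Long> with values 0..9
        System.out.println("Spark session is up, counted " + rows + " rows");
    }
}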

Data analysis with Spark

package my.io.hadoop.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Properties;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

/**
 * Author reisen7
 * Date 2025/4/4 13:03
 * Description
 */
@Slf4j
@Service
public class DataAnalysisService {

    @Autowired
    private SparkSession sparkSession;

    @Value("${spring.datasource.url}")
    private String jdbcUrl;

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    private static final int LIMIT = 100; // adjust this value as needed

    public void performAnalysis() {
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", username);
        connectionProperties.put("password", password);
        log.info("Connecting to: {}", jdbcUrl);
        log.info("Starting data analysis");

        // Read the MySQL tables
        Dataset<Row> goodsDF = sparkSession.read()
                .jdbc(jdbcUrl, "goods", connectionProperties);
        Dataset<Row> orderGoodsDF = sparkSession.read()
                .jdbc(jdbcUrl, "order_goods", connectionProperties);
        Dataset<Row> tOrderDF = sparkSession.read()
                .jdbc(jdbcUrl, "t_order", connectionProperties);
        Dataset<Row> userDF = sparkSession.read()
                .jdbc(jdbcUrl, "t_user", connectionProperties);
        Dataset<Row> cartDF = sparkSession.read()
                .jdbc(jdbcUrl, "cart", connectionProperties);
        Dataset<Row> collectDF = sparkSession.read()
                .jdbc(jdbcUrl, "collect", connectionProperties);
        Dataset<Row> categoryDF = sparkSession.read()
                .jdbc(jdbcUrl, "category", connectionProperties);

        // Dimension 1: total units sold per product category
        Dataset<Row> categorySales = orderGoodsDF.join(goodsDF, orderGoodsDF.col("goods_id").equalTo(goodsDF.col("id")))
                .join(categoryDF, goodsDF.col("category_id").equalTo(categoryDF.col("id")))
                .groupBy(categoryDF.col("name"))
                .agg(sum(orderGoodsDF.col("count")).alias("total_sales"))
                .orderBy(col("total_sales").desc())
                .limit(LIMIT);
        categorySales.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_category_sales", connectionProperties);

        // Dimension 2: total number of cart items per user
        Dataset<Row> userCartTotal = cartDF.join(userDF, cartDF.col("user_id").equalTo(userDF.col("id")))
                .groupBy(userDF.col("username"))
                .agg(sum(cartDF.col("count")).alias("total_cart_items"))
                .orderBy(col("total_cart_items").desc())
                .limit(LIMIT);
        userCartTotal.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_user_cart_total", connectionProperties);

        // Dimension 3: number of times each product has been favourited
        Dataset<Row> goodsCollectCount = collectDF.join(goodsDF, collectDF.col("goods_id").equalTo(goodsDF.col("id")))
                .groupBy(goodsDF.col("name"))
                .agg(count(collectDF.col("id")).alias("collect_count"))
                .orderBy(col("collect_count").desc())
                .limit(LIMIT);
        goodsCollectCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_goods_collect_count", connectionProperties);

        // Dimension 4: number of orders per user
        Dataset<Row> userOrderCount = orderGoodsDF.join(tOrderDF, orderGoodsDF.col("order_id").equalTo(tOrderDF.col("id")))
                .join(userDF, tOrderDF.col("user_id").equalTo(userDF.col("id")))
                .groupBy(userDF.col("username"))
                .agg(count(orderGoodsDF.col("order_id")).alias("order_count"))
                .orderBy(col("order_count").desc())
                .limit(LIMIT);
        userOrderCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_user_order_count", connectionProperties);

        // Dimension 5: number of products per category
        Dataset<Row> categoryGoodsCount = goodsDF.join(categoryDF, goodsDF.col("category_id").equalTo(categoryDF.col("id")))
                .groupBy(categoryDF.col("name"))
                .agg(count(goodsDF.col("id")).alias("goods_count"))
                .orderBy(col("goods_count").desc())
                .limit(LIMIT);
        categoryGoodsCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_category_goods_count", connectionProperties);

        // Dimension 6: number of favourited products per user
        Dataset<Row> userCollectCount = collectDF.join(userDF, collectDF.col("user_id").equalTo(userDF.col("id")))
                .groupBy(userDF.col("username"))
                .agg(count(collectDF.col("id")).alias("collect_goods_count"))
                .orderBy(col("collect_goods_count").desc())
                .limit(LIMIT);
        userCollectCount.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "spark_user_collect_count", connectionProperties);
    }
}
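
performAnalysis() is a plain service method, so something still has to trigger it. One option is a small REST endpoint like the sketch below; the controller class, package, and URL are assumptions for illustration rather than part of the original project, and the job runs synchronously in the request thread. Note that the MySQL connection used by the reads and writes comes from the spring.datasource.* properties the service injects, so those must be present alongside the spark.* settings shown earlier.

package my.io.hadoop.controller; // hypothetical package, chosen to mirror the service package

import my.io.hadoop.service.DataAnalysisService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical trigger for the analysis job; the endpoint path is illustrative only.
@RestController
public class DataAnalysisController {

    private final DataAnalysisService dataAnalysisService;

    public DataAnalysisController(DataAnalysisService dataAnalysisService) {
        this.dataAnalysisService = dataAnalysisService;
    }

    @PostMapping("/analysis/run")
    public String run() {
        dataAnalysisService.performAnalysis(); // blocks until all six result tables are written
        return "analysis finished";
    }
}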