data.csv example

员工ID,部门,工作年限,学历,沟通能力,团队协作,问题解决,学习能力,创新能力,能力得分
1,技术部,1,本科,70,80,75,85,60,75
2,技术部,2,本科,72,82,77,87,62,77
3,技术部,3,硕士,75,85,80,90,65,80
4,市场部,1,大专,60,70,65,75,50,65
5,市场部,2,本科,62,72,67,77,52,67
6,市场部,3,本科,65,75,70,80,55,70
7,财务部,1,大专,65,70,75,80,60,70
8,财务部,2,本科,67,72,77,82,62,72
9,财务部,3,本科,70,75,80,85,65,75
10,人力资源部,1,本科,70,75,80,85,70,78
11,人力资源部,2,本科,72,77,82,87,72,80
12,人力资源部,3,硕士,75,80,85,90,75,83
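
The columns 沟通能力, 团队协作, 问题解决, 学习能力, and 创新能力 are per-dimension scores, and 能力得分 is the overall ability score. When the file is loaded with header=True and inferSchema=True (as in the script below), Spark should infer integer types for the numeric columns and string types for 部门 and 学历. A quick sanity check, not part of the original script:

data = spark.read.csv("data.csv", header=True, inferSchema=True)
data.printSchema()  # expect 员工ID, 工作年限, and the score columns as int; 部门 and 学历 as string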

Code example

'''
Author: Reisen7
Date: 2025-03-11 18:00:09
LastEditTime: 2025-03-11 18:27:20
'''
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import numpy as np
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Set the HADOOP_HOME and hadoop.home.dir environment variables
os.environ["HADOOP_HOME"] = os.path.join(BASE_DIR, "hadoop-3.3.5")
os.environ["hadoop.home.dir"] = os.path.join(BASE_DIR, "hadoop-3.3.5")
os.environ['PYSPARK_PYTHON'] = r'F:/usr/anaconda3/envs/hadoop_env/python.exe'  # use your own Python path here

# Create the SparkSession
spark = SparkSession.builder \
    .appName("TalentAnalysis") \
    .config("spark.driver.extraClassPath", "mysql-connector-java-8.0.32.jar") \
    .config("spark.executor.extraClassPath", "mysql-connector-java-8.0.32.jar") \
    .config("spark.python.executable", "F:/usr/anaconda3/envs/hadoop_env/python.exe") \
    .getOrCreate()

# Read the data (assumed to be in CSV format)
data = spark.read.csv("data.csv", header=True, inferSchema=True)

# Database connection settings
jdbc_url = "jdbc:mysql://localhost:3306/talent_capability_analysis?useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8"
connection_properties = {
    "user": "root",
    "password": "123456",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# 1. Average ability score per department (bar chart)
department_avg_score = data.groupBy("部门") \
    .agg(F.avg("能力得分").alias("avg_score"))
department_avg_score.write.jdbc(url=jdbc_url, table="department_avg_score", mode="overwrite", properties=connection_properties)

# 2. Ability score by years of service (line chart)
score_by_work_years = data.groupBy("工作年限") \
    .agg(F.avg("能力得分").alias("avg_score"))
score_by_work_years.write.jdbc(url=jdbc_url, table="score_by_work_years", mode="overwrite", properties=connection_properties)

# 3. Share of employees by education level (pie chart)
total_count = data.count()
education_ratio = data.groupBy("学历") \
    .agg(F.count("*").alias("count")) \
    .withColumn("ratio", F.col("count") / total_count) \
    .select("学历", "ratio")
education_ratio.write.jdbc(url=jdbc_url, table="education_ratio", mode="overwrite", properties=connection_properties)

# 4. Average score per ability dimension (radar chart)
ability_columns = ["沟通能力", "团队协作", "问题解决", "学习能力", "创新能力"]
ability_distribution = []
for ability in ability_columns:
    avg_score = data.agg(F.avg(ability)).collect()[0][0]
    ability_distribution.append((ability, avg_score))

ability_distribution_df = spark.createDataFrame(ability_distribution, ["ability", "avg_score"])
ability_distribution_df.write.jdbc(url=jdbc_url, table="ability_distribution", mode="overwrite", properties=connection_properties)

# 5. Work experience vs. ability score (scatter plot)
experience_score_relation = data.select("工作年限", "能力得分")
experience_score_relation.write.jdbc(url=jdbc_url, table="experience_score_relation", mode="overwrite", properties=connection_properties)

# 6. Ability score distribution by department (box plot)
def calculate_boxplot_stats(score_list):
    """Return the five-number summary (min, Q1, median, Q3, max) of a list of scores."""
    score_array = np.array(score_list)
    return (
        float(np.min(score_array)),
        float(np.percentile(score_array, 25)),
        float(np.percentile(score_array, 50)),
        float(np.percentile(score_array, 75)),
        float(np.max(score_array))
    )


udf_calculate_boxplot_stats = F.udf(calculate_boxplot_stats, "struct<minimum:float,q1:float,median:float,q3:float,maximum:float>")

department_score_distribution = data.groupBy("部门") \
    .agg(F.collect_list("能力得分").alias("score_list")) \
    .withColumn("boxplot_stats", udf_calculate_boxplot_stats(F.col("score_list"))) \
    .select("部门",
            F.col("boxplot_stats.minimum").alias("minimum"),
            F.col("boxplot_stats.q1").alias("q1"),
            F.col("boxplot_stats.median").alias("median"),
            F.col("boxplot_stats.q3").alias("q3"),
            F.col("boxplot_stats.maximum").alias("maximum"))

department_score_distribution.write.jdbc(url=jdbc_url, table="department_score_distribution", mode="overwrite", properties=connection_properties)
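
# Note: on Spark 3.1+ the same five-number summary can be computed without a
# Python UDF via the built-in percentile_approx function. A sketch of that
# alternative (not in the original script; percentile_approx is approximate,
# while the numpy version above interpolates exact percentiles):
#
# department_score_distribution = data.groupBy("部门").agg(
#     F.min("能力得分").alias("minimum"),
#     F.percentile_approx("能力得分", 0.25).alias("q1"),
#     F.percentile_approx("能力得分", 0.5).alias("median"),
#     F.percentile_approx("能力得分", 0.75).alias("q3"),
#     F.max("能力得分").alias("maximum"),
# )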

# Stop the SparkSession
spark.stop()
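
To confirm the exports, the result tables can be read back over the same JDBC connection. A minimal sketch, assuming the MySQL service and connector jar from the script above; it is run as a separate check because the session above has already been stopped:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TalentAnalysisCheck") \
    .config("spark.driver.extraClassPath", "mysql-connector-java-8.0.32.jar") \
    .getOrCreate()

jdbc_url = "jdbc:mysql://localhost:3306/talent_capability_analysis?serverTimezone=GMT%2B8"
connection_properties = {"user": "root", "password": "123456", "driver": "com.mysql.cj.jdbc.Driver"}

# Read one of the result tables back and display it
spark.read.jdbc(url=jdbc_url, table="department_avg_score", properties=connection_properties).show()
spark.stop()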