sparkDataFrame笔记
read
# 读取文本文件
spark.read.text("path/to/text/file")
# 读取CSV文件
spark.read.csv("path/to/csv/file")
# 读取JSON文件
spark.read.json("path/to/json/file")
# 读取Parquet文件
spark.read.parquet("path/to/parquet/file")
# 读取jdbc
spark.read.jdbc(jdbcUrl, "表名", properties)
# 读取hive
spark.table("hive_table_name")
write
# 写入文本文件
df.write.text("path/to/output/text/file")
# 写入CSV文件
df.write.csv("path/to/output/csv/file")
# 写入JSON文件
df.write.json("path/to/output/json/file")
# 写入Parquet文件
df.write.parquet("path/to/output/parquet/file")
# 写入jdbc
df.write.jdbc(jdbcUrl, "表名", properties)
# 写入hive
df.write.saveAsTable("hive_table_name")
# 添加静态分区并写入hive
df.write.partitionBy("分区字段").saveAsTable("hive_table_name")
mode
# 前提:已存在数据
# 覆盖
SaveMode.Overwrite
# 追加
SaveMode.Append
# 忽略
SaveMode.Ignore
# 报错(默认)
SaveMode.ErrorIfExists
frame.write.mode(SaveMode.Overwrite)
JDBC
val properties = new Properties()
properties.setProperty("driver", "com.mysql.jdbc.Driver")
properties.setProperty("user", "用户名")
properties.setProperty("password", "密码")
val jdbcUrl = "jdbc:mysql://主机ip:3306/数据库名?useSSL=false"
show
df.show(100)
获取单个值
df.select("列名").collect()(x)(0)
# 第一个
df.select("列名").collect()(0)(0)
# 最后一个
df.select("列名").collect()(df.count().toInt-1)(0)
select
df.select("列名A", "列名A")
na.drop()
# 删除表中全部为NaN的行
df.na.drop("all")
# 删除表任一列中有NaN的行
df.na.drop("any")
dropDuplicates
df.dropDuplicates()
printSchema
df.printSchema()
filter
df.filter(df["age"] > 30)
col
df.select(col("列名"))
count
df.count()
join
# 内连接 返回df1与df2共有的记录
df1.join(df2, "column_name")
# 左连接 返回df1的所有记录,并将df2匹配的记录合并,没有匹配的记录为空值
df1.join(df2, "column_name", "left")
# 右连接 返回df2的所有记录,并将df1匹配的记录合并,没有匹配的记录为空值
df1.join(df2, "column_name", "right")
# 全外连接 返回df1与df2的所有记录,匹配则合并,未匹配则未空值
df1.join(df2, "column_name", "full_outer")
# 笛卡尔积 返回df1与df2中的所有可能的组合
df1.crossJoin(df2)
# 自连接 df1与df1自己连接
df1.as("df1").join(df1.as("df2"), $"df1.column_name" === $"df2.column_name")
# left_anti 返回df1中存在,但在df2中不存在的记录
df1.join(df2, "column_name", "left_anti")
# right_anti 返回df2中存在,但在df1中不存在的记录
df1.join(df2, "column_name", "right_anti")
union
# 删除重复行
df1.union(df2)
# 保留所有行
df1.unionAll(df2)
limit
df.limit(100)
sort
df.sort("列名")
df.sort(desc("列名")) # 倒序
order by
df.orderBy("列名")
df.orderBy(desc("列名")) # 倒序
df.orderBy(rand()) # 乱序
df.orderBy(rand()).orderBy(rand()) # 双重乱序
withColumn
df.withColumn("列名", lit("column对象"))
date_format
df.withColumn("列名", date_format(current_date(), "yyyyMMdd"))
y:年份的一部分。
M:月份的一部分。
d:月份中的一天。
H:一天中的小时(24小时制)。
h:一天中的小时(12小时制)。
m:小时中的分钟。
s:分钟中的秒。
S:毫秒。
常用格式
"yyyy-MM-dd" -> "2023-11-22"
"yyyy-MM-dd HH:mm:ss" -> "2023-11-22 14:30:45"
"MM/dd/yyyy" -> "11/22/2023"
"dd-MM-yyyy HH:mm:ss" -> "22-11-2023 14:30:45"
date_add
df.withColumn("列名", date_add(current_date(), 1))
date_sub
df.withColumn("列名", date_sub(current_date(), 1))
Timestamp
Timestamp.valueOf("日期格式字符串")
窗口函数
# 排名函数
row_number() # 排序
rank() # 排名,跳过重复
dense_rank() # 排名,不跳过重复
# 聚合函数
max() # 最大值
min() # 最小值
count() # 总数
sum() # 求和
avg() # 平均值
median() # 中位数
# 取值函数
percent_rank() # 百分比
lag() # 向前取值
lead() # 向后取值
first_value() # 第一行的值
last_value() # 分组内最后一行的值
nth_value() # 分组内第n行的值
# 分箱函数
ntile()
License:
CC BY 4.0