200字
sparkDataFrame笔记
2023-11-15
2024-04-11

read

# 读取文本文件
spark.read.text("path/to/text/file")
# 读取CSV文件
spark.read.csv("path/to/csv/file")
# 读取JSON文件
spark.read.json("path/to/json/file")
# 读取Parquet文件
spark.read.parquet("path/to/parquet/file")
# 读取jdbc
spark.read.jdbc(jdbcUrl, "表名", properties)
# 读取hive
spark.table("hive_table_name")

write

# 写入文本文件
df.write.text("path/to/output/text/file")
# 写入CSV文件
df.write.csv("path/to/output/csv/file")
# 写入JSON文件
df.write.json("path/to/output/json/file")
# 写入Parquet文件
df.write.parquet("path/to/output/parquet/file")
# 写入jdbc
df.write.jdbc(jdbcUrl, "表名", properties)
# 写入hive
df.write.saveAsTable("hive_table_name")
# 添加静态分区并写入hive
df.write.partitionBy("分区字段").saveAsTable("hive_table_name")

mode

# 前提:已存在数据

# 覆盖
SaveMode.Overwrite

# 追加
SaveMode.Append

# 忽略
SaveMode.Ignore

# 报错(默认)
SaveMode.ErrorIfExists

frame.write.mode(SaveMode.Overwrite)

JDBC

val properties = new Properties()
properties.setProperty("driver", "com.mysql.jdbc.Driver")
properties.setProperty("user", "用户名")
properties.setProperty("password", "密码")
val jdbcUrl = "jdbc:mysql://主机ip:3306/数据库名?useSSL=false"

show

df.show(100)

获取单个值

df.select("列名").collect()(x)(0)

# 第一个
df.select("列名").collect()(0)(0)
# 最后一个
df.select("列名").collect()(df.count().toInt-1)(0)

select

df.select("列名A", "列名A")

na.drop()

# 删除表中全部为NaN的行
df.na.drop("all")
# 删除表任一列中有NaN的行
df.na.drop("any")

dropDuplicates

df.dropDuplicates()

printSchema

df.printSchema()

filter

df.filter(df["age"] > 30)

col

df.select(col("列名"))

count

df.count()

join

# 内连接 返回df1与df2共有的记录
df1.join(df2, "column_name")

# 左连接 返回df1的所有记录,并将df2匹配的记录合并,没有匹配的记录为空值
df1.join(df2, "column_name", "left")

# 右连接 返回df2的所有记录,并将df1匹配的记录合并,没有匹配的记录为空值
df1.join(df2, "column_name", "right")

# 全外连接 返回df1与df2的所有记录,匹配则合并,未匹配则未空值
df1.join(df2, "column_name", "full_outer")

# 笛卡尔积 返回df1与df2中的所有可能的组合
df1.crossJoin(df2)

# 自连接 df1与df1自己连接
df1.as("df1").join(df1.as("df2"), $"df1.column_name" === $"df2.column_name")

# left_anti 返回df1中存在,但在df2中不存在的记录
df1.join(df2, "column_name", "left_anti")

# right_anti 返回df2中存在,但在df1中不存在的记录
df1.join(df2, "column_name", "right_anti")

union

# 删除重复行
df1.union(df2)

# 保留所有行
df1.unionAll(df2)

limit

df.limit(100)

sort

df.sort("列名")
df.sort(desc("列名")) # 倒序

order by

df.orderBy("列名")
df.orderBy(desc("列名")) # 倒序

df.orderBy(rand()) # 乱序
df.orderBy(rand()).orderBy(rand()) # 双重乱序

withColumn

df.withColumn("列名", lit("column对象"))

date_format

df.withColumn("列名", date_format(current_date(), "yyyyMMdd"))

y:年份的一部分。
M:月份的一部分。
d:月份中的一天。
H:一天中的小时(24小时制)。
h:一天中的小时(12小时制)。
m:小时中的分钟。
s:分钟中的秒。
S:毫秒。

常用格式
"yyyy-MM-dd" -> "2023-11-22"
"yyyy-MM-dd HH:mm:ss" -> "2023-11-22 14:30:45"
"MM/dd/yyyy" -> "11/22/2023"
"dd-MM-yyyy HH:mm:ss" -> "22-11-2023 14:30:45"

date_add

df.withColumn("列名", date_add(current_date(), 1))

date_sub

df.withColumn("列名", date_sub(current_date(), 1))

Timestamp

Timestamp.valueOf("日期格式字符串")

窗口函数

# 排名函数 
row_number() # 排序
rank() # 排名,跳过重复
dense_rank() # 排名,不跳过重复

# 聚合函数
max() # 最大值
min() # 最小值
count() # 总数
sum() # 求和
avg() # 平均值
median() # 中位数

# 取值函数
percent_rank() # 百分比
lag() # 向前取值
lead() # 向后取值
first_value() # 第一行的值
last_value() # 分组内最后一行的值
nth_value() # 分组内第n行的值

# 分箱函数
ntile()

sparkDataFrame笔记
作者
Administrator
发表于
2023-11-15
License
CC BY-NC-SA 4.0

评论