离线计算的情况,我们很少使用读取检查点的方式,如果有需要可以参考下面的方式完成
# 由于pyspark没有提供直接的api让我们完成,我们需要借助SparkContext._checkpointFile来完成
# SparkContext 源码地址: https://spark.apache.org/docs/3.1.1/api/python/_modules/pyspark/context.html
# 这个里面需要我们关注2个方法: 1. _checkpointFile 2. _do_init方法 这2个方法没有直接对外暴露但是我们可以使用
# 看下下面的示例代码,这个示例代码仅仅是简单封装,实际可以根据业务情况做灵活调整
from typing import Union
from pyspark import SparkContext,RDD
from pyspark.sql import SparkSession,DataFrame
def read_checkpoint(sc: Union[SparkContext, SparkSession], path=None, return_type: str = 'rdd', *args, **kwargs)->Union[RDD,DataFrame]:
"""
读取检查点路径,并返回一个rdd或者dataframe对象
:param sc: 可以是SparkSession或者SparkContext对象
:param path: 检查点的路径, 可以通过rdd.getCheckpointFile 获取rdd的检查点文件路径,注意这个是一个目录
:param return_type: rdd or df rdd表示RDD 类型, df表示DataFrame
:param args: 保留 目前不使用
:param kwargs: 保留 方便拓展功能使用
:return: 与 return_type参数指定的类型一致
"""
if isinstance(sc, (SparkContext, SparkSession)):
sc: SparkContext = sc.sparkContext if isinstance(sc, SparkSession) else sc
else:
raise ValueError('sc must be SparkContext or SparkSession instance!')
# 注意这里我们直接使用 SparkContext中的创建好的serializer 这个我们可以从_do_init
rdd = sc._checkpointFile(path, sc.serializer)
if return_type == 'rdd':
return rdd
if return_type == 'df':
return rdd.toDF()
# 创建SparkSession 示例
spark = SparkSession \
.builder \
.master("local[*]") \
.appName("read_checkpoint") \
.config("spark.sql.shuffle,partitions", "4") \
.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse") \
.config("spark.hive.metastore.uris", "thrift://node1:9083") \
.enableHiveSupport() \
.getOrCreate()
# 设置检查点路径,这里设置的是一个本地路径, 实际使用是hdfs路径
spark.sparkContext.setCheckpointDir('ck')
# 指定你读取的一个表名字
df = spark.table('table_name')
# 注意虽然 df提供了checkpoint 但是不能保证我们直接从文件读取,我们需要借助rdd.checkpoint() 来完成
rdd = df.rdd
rdd.checkpoint()
# 执行action 算子 触发检查点
print(rdd.collect())
# 获取检查点的存储路径,这个路径就是我们后面用于读取使用的
ck_df_path = rdd.getCheckpointFile() # 实际使用ck_path可能记录到数据库或者文件中,方便我们读取并用于恢复
# 下面是读取检查点并恢复df的过程示例代码
spark.sparkContext.read_checkpoint = read_checkpoint # 偷懒的方式,我们方法设置为sparkContext一个属性
# 读取检查点数据
# spark.sparkContext.read_checkpoint(spark, path, return_type='df') 不设置属性的读取方式
new_df = spark.sparkContext.read_checkpoint(ck_df_path,'df')
new_df.show()
spark.stop()
0