大数据

PySpark 离线读取检查点(checkpoin)目录

文 / sptk 来源 / 原创 阅读 / 24 1周前

离线计算的情况,我们很少使用读取检查点的方式,如果有需要可以参考下面的方式完成

# 由于pyspark没有提供直接的api让我们完成,我们需要借助SparkContext._checkpointFile来完成
# SparkContext 源码地址:  https://spark.apache.org/docs/3.1.1/api/python/_modules/pyspark/context.html
# 这个里面需要我们关注2个方法: 1. _checkpointFile 2. _do_init方法   这2个方法没有直接对外暴露但是我们可以使用
# 看下下面的示例代码,这个示例代码仅仅是简单封装,实际可以根据业务情况做灵活调整

from typing import Union
from pyspark import SparkContext,RDD
from pyspark.sql import SparkSession,DataFrame

def read_checkpoint(sc: Union[SparkContext, SparkSession], path=None, return_type: str = 'rdd', *args, **kwargs)->Union[RDD,DataFrame]:
    """
    读取检查点路径,并返回一个rdd或者dataframe对象
    :param sc: 可以是SparkSession或者SparkContext对象
    :param path: 检查点的路径, 可以通过rdd.getCheckpointFile 获取rdd的检查点文件路径,注意这个是一个目录
    :param return_type: rdd or df   rdd表示RDD 类型, df表示DataFrame
    :param args:  保留 目前不使用
    :param kwargs: 保留 方便拓展功能使用
    :return:  与 return_type参数指定的类型一致
    """
    if isinstance(sc, (SparkContext, SparkSession)):
        sc: SparkContext = sc.sparkContext if isinstance(sc, SparkSession) else sc
    else:
        raise ValueError('sc must be SparkContext or SparkSession instance!')
    # 注意这里我们直接使用 SparkContext中的创建好的serializer  这个我们可以从_do_init
    rdd = sc._checkpointFile(path, sc.serializer)
    if return_type == 'rdd':
        return rdd
    if return_type == 'df':
        return rdd.toDF()

    #  创建SparkSession 示例
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("read_checkpoint") \
        .config("spark.sql.shuffle,partitions", "4") \
        .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse") \
        .config("spark.hive.metastore.uris", "thrift://node1:9083") \
        .enableHiveSupport() \
        .getOrCreate()

    # 设置检查点路径,这里设置的是一个本地路径, 实际使用是hdfs路径
    spark.sparkContext.setCheckpointDir('ck')  
    # 指定你读取的一个表名字
    df = spark.table('table_name') 
    # 注意虽然 df提供了checkpoint 但是不能保证我们直接从文件读取,我们需要借助rdd.checkpoint() 来完成
    rdd = df.rdd
    rdd.checkpoint()

    # 执行action 算子 触发检查点
    print(rdd.collect()) 

    #  获取检查点的存储路径,这个路径就是我们后面用于读取使用的
    ck_df_path = rdd.getCheckpointFile()   # 实际使用ck_path可能记录到数据库或者文件中,方便我们读取并用于恢复

    #  下面是读取检查点并恢复df的过程示例代码
    spark.sparkContext.read_checkpoint = read_checkpoint   # 偷懒的方式,我们方法设置为sparkContext一个属性

    # 读取检查点数据
    # spark.sparkContext.read_checkpoint(spark, path, return_type='df') 不设置属性的读取方式
    new_df = spark.sparkContext.read_checkpoint(ck_df_path,'df')
    new_df.show()

    spark.stop()

0

站点声明:站点主要用于个人技术文章。

冀ICP备19037883号
相关侵权、举报、投诉及建议等,请发E-mail:804330969@qq.com