大数据

pyspark 操作sql表比较麻烦的问题

文 / sptk 来源 / 原创 阅读 / 26 1天前

1. 官方操作SQL方式

每次都需要通过read.jdbc读取,每次需要单独为每个表设置视图,这样我们后面才可以通过spark.sql来运行

spark = SparkSession.builder \
    .appName("appname") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", 8) \
    .getOrCreate()
# jdbc url地址  里面替换自己SQL的地址与数据  dbname这里是你操作的数据库名称
jdbc_url = "jdbc:mysql://localhost:3306/dbname?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&useSSL=false"
properties = {
    "user": "root",
    "password": "pwd",  # 数据库密码
    "driver": "com.mysql.cj.jdbc.Driver"  # 驱动  mysql8以上版本指定
}

# 读取表数据
df: DataFrame = spark.read.jdbc(jdbc_url, properties=properties,table='books')
# 创建视图名称
df.createTempView('books')
# 展示数据
df.show()

2. 这里编写一个mock代码, 这样我们仅仅设置一次SQL连接,后续直接通过 jdbc_table操作表即可 更方便

mock.py内容

# mock.py  这个是mock.py的内容
class Mock(object):
    """
    使用SparkSession操作jdbc连接访问数据库的时候,不能像spark本地库或者 spark on hive那么方便
    这里我们做个mock, 方便我们更好的操作表数据
    """

    def mock(self, spark: 'SparkSession'):
        """
        mock 为 SparkSession类增加对应的方法
        :param spark: SparkSession 注意是类不是初始化后的对象
        :return:
        """
        from pyspark.sql import SparkSession
        if spark is not SparkSession:
            raise ValueError('spark must be SparkSession class')

        def set_jdbc_properties(self, **kwargs) -> None:
            """
            用于设置jdbc连接属性, 属性参考对应j read.jdbc函数的使用
            :param self:
            :param kwargs:
            :return: None 没返回值
            """
            if not isinstance(kwargs, dict):
                raise ValueError("kwargs must be an dict type")

            #  检查相关的属性,其他属性作为辅助
            if not kwargs.get('url'):
                raise ValueError("args not contains jdbc url, please set it ")

            self.jdbc_properties = kwargs

        def jdbc_table(self: 'SparkSession', tb_name, view_name=None, db_name=None, **kwargs) -> 'DataFrame':
            import re, copy
            #  检查是否初始化过 jdbc properties 如果没有需要初始化下
            if not hasattr(self, 'jdbc_properties'):
                raise AttributeError('raise Exception must be set jdbc_properties before call jdbc_table')

            #  如果view name没有设置, 则设置于表相同的名字
            if not view_name:
                view_name = tb_name

            properties: dict = copy.deepcopy(self.jdbc_properties)
            jdbc_url = properties.pop('url')
            if db_name:
                # 获取url地址, 获取原始db名称, 进行调整
                jdbc_url = re.sub(r'(?<=\d)/(?=[^\d])([^\?\s]+)', f'/{db_name}', jdbc_url)

            df = self.read.jdbc(jdbc_url, table=tb_name, properties=properties, **kwargs)
            df.createTempView(view_name)

            return df

        #  为spark session mock上设置属性
        setattr(spark, 'set_jdbc_properties', set_jdbc_properties)
        print('mock set_jdbc_properties method for spark session')

        setattr(spark, 'jdbc_table', jdbc_table)
        print('mock jdbc_table method for spark session')

test.py内容

import os
from pyspark.sql import SparkSession,DataFrame

from mock import Mock

os.environ['SPARK_HOME']=r'/export/server/spark'
os.environ['SPARK_PYTHON']=r'/root/anaconda3/bin/python'

mk = Mock()
mk.mock(SparkSession)

#  本地测试请把master设置为 local[*]
spark = SparkSession.builder \
        .appName("appname") \
        .master("yarn") \
        .config("spark.sql.shuffle.partitions", 2) \
        .config("spark.driver.memory","512m") \
        .config("spark.executor.memory",'512m') \
        .config("spark.executor.instances",'1') \
        .getOrCreate()

jdbc_url = "jdbc:mysql://172.16.28.42:3310/testdb?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&useSSL=false"
properties = {
    "user": "root",
    "password": "root",
    "driver": "com.mysql.cj.jdbc.Driver"
}
# 设置一次即可
spark.set_jdbc_properties(url=jdbc_url, **properties)


# 后面通过mock的jdbc_table来读取mysql表数据即可, 自动创建对应的视图,视图名称与我们的表名字一致
df:DataFrame = spark.jdbc_table('ads')

#  打印查看视图名称是否注册成功
print(spark.catalog.listTables())
df.show()
spark.stop()

3. 测试

上述代码经过本地测试,以及yarn cluster模式测试

# 提交命令,由于是测试,我们配置更少的资源,方便你的测试环境可以正常使用
# --jars  如果环境检测不到对应jdbc jar包我们可以通过这个参数指定下

spark-submit --master yarn --deploy-mode cluster --name test  --driver-memory 512m --executor-memory 512m --num-executors 2 --jars hdfs://node1:8020/spark/jars/mysql-connector-j-8.0.33.jar --conf spark.pyspark.python=/root/anaconda3/bin/python --py-files mock.py test.py

通过yarn日志,查看运行结果 [Table(name='ads', database=None, description=None, tableType='TEMPORARY', isTemporary=True)] +-----+-----------+---------+ |ad_id|customer_id|timestamp| +-----+-----------+---------+ | 1| 1| 5| | 2| 2| 17| | 3| 2| 20| +-----+-----------+---------+

0

站点声明:站点主要用于个人技术文章。

冀ICP备19037883号
相关侵权、举报、投诉及建议等,请发E-mail:804330969@qq.com