每次都需要通过read.jdbc读取,每次需要单独为每个表设置视图,这样我们后面才可以通过spark.sql来运行
spark = SparkSession.builder \
.appName("appname") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", 8) \
.getOrCreate()
# jdbc url地址 里面替换自己SQL的地址与数据 dbname这里是你操作的数据库名称
jdbc_url = "jdbc:mysql://localhost:3306/dbname?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&useSSL=false"
properties = {
"user": "root",
"password": "pwd", # 数据库密码
"driver": "com.mysql.cj.jdbc.Driver" # 驱动 mysql8以上版本指定
}
# 读取表数据
df: DataFrame = spark.read.jdbc(jdbc_url, properties=properties,table='books')
# 创建视图名称
df.createTempView('books')
# 展示数据
df.show()
mock.py内容
# mock.py 这个是mock.py的内容
class Mock(object):
"""
使用SparkSession操作jdbc连接访问数据库的时候,不能像spark本地库或者 spark on hive那么方便
这里我们做个mock, 方便我们更好的操作表数据
"""
def mock(self, spark: 'SparkSession'):
"""
mock 为 SparkSession类增加对应的方法
:param spark: SparkSession 注意是类不是初始化后的对象
:return:
"""
from pyspark.sql import SparkSession
if spark is not SparkSession:
raise ValueError('spark must be SparkSession class')
def set_jdbc_properties(self, **kwargs) -> None:
"""
用于设置jdbc连接属性, 属性参考对应j read.jdbc函数的使用
:param self:
:param kwargs:
:return: None 没返回值
"""
if not isinstance(kwargs, dict):
raise ValueError("kwargs must be an dict type")
# 检查相关的属性,其他属性作为辅助
if not kwargs.get('url'):
raise ValueError("args not contains jdbc url, please set it ")
self.jdbc_properties = kwargs
def jdbc_table(self: 'SparkSession', tb_name, view_name=None, db_name=None, **kwargs) -> 'DataFrame':
import re, copy
# 检查是否初始化过 jdbc properties 如果没有需要初始化下
if not hasattr(self, 'jdbc_properties'):
raise AttributeError('raise Exception must be set jdbc_properties before call jdbc_table')
# 如果view name没有设置, 则设置于表相同的名字
if not view_name:
view_name = tb_name
properties: dict = copy.deepcopy(self.jdbc_properties)
jdbc_url = properties.pop('url')
if db_name:
# 获取url地址, 获取原始db名称, 进行调整
jdbc_url = re.sub(r'(?<=\d)/(?=[^\d])([^\?\s]+)', f'/{db_name}', jdbc_url)
df = self.read.jdbc(jdbc_url, table=tb_name, properties=properties, **kwargs)
df.createTempView(view_name)
return df
# 为spark session mock上设置属性
setattr(spark, 'set_jdbc_properties', set_jdbc_properties)
print('mock set_jdbc_properties method for spark session')
setattr(spark, 'jdbc_table', jdbc_table)
print('mock jdbc_table method for spark session')
test.py内容
import os
from pyspark.sql import SparkSession,DataFrame
from mock import Mock
os.environ['SPARK_HOME']=r'/export/server/spark'
os.environ['SPARK_PYTHON']=r'/root/anaconda3/bin/python'
mk = Mock()
mk.mock(SparkSession)
# 本地测试请把master设置为 local[*]
spark = SparkSession.builder \
.appName("appname") \
.master("yarn") \
.config("spark.sql.shuffle.partitions", 2) \
.config("spark.driver.memory","512m") \
.config("spark.executor.memory",'512m') \
.config("spark.executor.instances",'1') \
.getOrCreate()
jdbc_url = "jdbc:mysql://172.16.28.42:3310/testdb?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&useSSL=false"
properties = {
"user": "root",
"password": "root",
"driver": "com.mysql.cj.jdbc.Driver"
}
# 设置一次即可
spark.set_jdbc_properties(url=jdbc_url, **properties)
# 后面通过mock的jdbc_table来读取mysql表数据即可, 自动创建对应的视图,视图名称与我们的表名字一致
df:DataFrame = spark.jdbc_table('ads')
# 打印查看视图名称是否注册成功
print(spark.catalog.listTables())
df.show()
spark.stop()
上述代码经过本地测试,以及yarn cluster模式测试
# 提交命令,由于是测试,我们配置更少的资源,方便你的测试环境可以正常使用
# --jars 如果环境检测不到对应jdbc jar包我们可以通过这个参数指定下
spark-submit --master yarn --deploy-mode cluster --name test --driver-memory 512m --executor-memory 512m --num-executors 2 --jars hdfs://node1:8020/spark/jars/mysql-connector-j-8.0.33.jar --conf spark.pyspark.python=/root/anaconda3/bin/python --py-files mock.py test.py
通过yarn日志,查看运行结果 [Table(name='ads', database=None, description=None, tableType='TEMPORARY', isTemporary=True)] +-----+-----------+---------+ |ad_id|customer_id|timestamp| +-----+-----------+---------+ | 1| 1| 5| | 2| 2| 17| | 3| 2| 20| +-----+-----------+---------+
0