我们通过下面的示例来介绍下 使用python来完成udf的实现,严格来说不能算udf 而是hive支持调用外部脚本来完成数据转换的功能 下面的示例也可以理解为 自定义map reduer
# mapper.py
#!/usr/bin/env python
import sys
# hive借助了标准的输入输出 来完成数据的读写
# 从标准输入读取传入的每行数据
for line in sys.stdin:
# 假设每一行数据以逗号分隔,并且包含两列:value 和 count
value, count = line.strip().split(',')
# 进行数据转换逻辑,这里仅将 value 和 count 作为输出
transformed_value = value
transformed_count = count
# 输出转换后的结果,以制表符分隔
print('\t'.join([transformed_value, transformed_count]))
# producer.py
#!/usr/bin/env python
import sys
# 当前的 key 和计数器
current_key = None
count = 0
# 从标准输入读取每一行数据
for line in sys.stdin:
# 假设每一行数据以制表符分隔,并且包含两列:value 和 count
value, count = line.strip().split('\t')
# 将 count 转换为整型
count = int(count)
# 如果当前的 key 和读取的 key 不同,则输出前一个 key 的结果
if current_key is not None and current_key != value:
print('\t'.join([current_key, str(count)]))
# 更新当前的 key 和计数器
current_key = value
count += count
# 输出最后一个 key 的结果
if current_key is not None:
print('\t'.join([current_key, str(count)]))
上面的2个脚本都可以当做一个 transform脚本, 或者理解成它们与udf方法有类似的功能
-- 创建表数据
create table tst (id int, data string) row format delimited fields terminated by '\t';
-- 插入测试数据
insert into table tst values (1,'word,10'),(2,'hello,20'),(3,'play,50');
-- 由于输入输出都是经过标准输入流所以最终转换后得到的结果是string类型
-- 如果在这个上面的基础上做sort by排序转换,还需要额外的查询转换。
FROM (FROM (FROM tst
SELECT TRANSFORM(data)
USING '/root/anaconda3/bin/python /root/test/mapper.py'
AS value, count) mapped
SELECT value, cast(count as int) AS count
SORT BY value, count) sorted
SELECT TRANSFORM(value, count)
USING '/root/anaconda3/bin/python /root/test/reducer.py'
AS k,v;
-- 上面的流程
-- 1. 从tst使用指定的map脚本对 data进行 transform转换,变成2个值。
-- 2. 转换后的值是字符类型,如果使用sort by 没办法直接按照值的方式排序。需要先做cast转换
-- 3. 使用指定的脚本做reduce操作,返回指定的字段 并使用k,v字段来接受。
-- 更详细的 transform map-reduce语法规则 参考: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-Transform/Map-ReduceSyntax
transform 不能与其他字段一起查询使用,仅仅是单独的一个tranform语句. 例如:
from tmp select id, transform(data) as (value,count)
这个是一个错误的示例不允许使用 USING 指定外部脚本 注意要指定运行脚本的命令 /root/anaconda3/bin/python /root/test/mapper.py 需要我们每个节点都有相同的python环境,因为如果我们的任务提交到yarn执行不确定会在那个节点来运行我们的脚本
https://pypi.org/project/hive-udf/
# 注意这个里面我们说明下 关于脚本文件的问题
-- 方式1 [推荐]
-- 指定hdfs路径,注意这个情况 使用的是集群环境中的默认的解释器
select TRANSFORM (data) USING 'hdfs://node1:8020/test/mapper.py' AS value, count from tst;
-- 上面的会加载文件到一个临时目录 可以使用下面的命令查看
list files; -- /tmp/65487ca3-5a99-4dbf-afbc-7de4ee499187_resources/mapper.py
-- 如果要调整脚本文件 注意要删除指定的文件重新上传
-- 注意不要使用delete files 这样会删除所有的脚本
delete file /tmp/65487ca3-5a99-4dbf-afbc-7de4ee499187_resources/mapper.py
-- 方式2 指定解释器 并制定路径 注意路径不能是hdfs路径 [不推荐,这样会限制transform执行节点]
-- 注意 这个可以灵活指定不同的环境 但是要求脚本所在的路径必须有对应的python环境
-- 集群执行任务过程,会在对应的节点单独启动容器来运行transform 脚本
select TRANSFORM (data) USING '/root/anaconda3/bin/python /root/test/mapper.py' AS value, count from tst;
-- 方式3 先添加脚本文件 再引用脚本 [推荐]
add file /root/test/mapper.py;
select TRANSFORM (data) USING 'mapper.py' AS value, count from tst;
-- 或者
select TRANSFORM (data) USING '/root/anaconda3/bin/python
mapper.py' AS value, count from tst;
-- 注意 如果要调整脚本 那么我们需要delete file 删除后重新添加
1