大数据

自定义hive udf 流程步骤 python版本

文 / sptk 来源 / 原创 阅读 / 412 8月前

我们通过下面的示例来介绍下 使用python来完成udf的实现,严格来说不能算udf 而是hive支持调用外部脚本来完成数据转换的功能 下面的示例也可以理解为 自定义map reduer

1. 编写对用的python 脚本

# mapper.py
#!/usr/bin/env python
import sys

# hive借助了标准的输入输出 来完成数据的读写
# 从标准输入读取传入的每行数据
for line in sys.stdin:
    # 假设每一行数据以逗号分隔,并且包含两列:value 和 count
    value, count = line.strip().split(',')

    # 进行数据转换逻辑,这里仅将 value 和 count 作为输出
    transformed_value = value
    transformed_count = count

    # 输出转换后的结果,以制表符分隔
    print('\t'.join([transformed_value, transformed_count]))
# producer.py
#!/usr/bin/env python
import sys

# 当前的 key 和计数器
current_key = None
count = 0

# 从标准输入读取每一行数据
for line in sys.stdin:
    # 假设每一行数据以制表符分隔,并且包含两列:value 和 count
    value, count = line.strip().split('\t')

    # 将 count 转换为整型
    count = int(count)

    # 如果当前的 key 和读取的 key 不同,则输出前一个 key 的结果
    if current_key is not None and current_key != value:
        print('\t'.join([current_key, str(count)]))

    # 更新当前的 key 和计数器
    current_key = value
    count += count

# 输出最后一个 key 的结果
if current_key is not None:
    print('\t'.join([current_key, str(count)]))

上面的2个脚本都可以当做一个 transform脚本, 或者理解成它们与udf方法有类似的功能

2. 创建hive的测试数据

-- 创建表数据
create table tst (id int, data string) row format delimited fields terminated by '\t';
-- 插入测试数据
insert into table tst values (1,'word,10'),(2,'hello,20'),(3,'play,50');

3. hive中调用python脚本来完成数据转换操作

-- 由于输入输出都是经过标准输入流所以最终转换后得到的结果是string类型
-- 如果在这个上面的基础上做sort by排序转换,还需要额外的查询转换。
FROM (FROM (FROM tst
            SELECT TRANSFORM(data)
            USING '/root/anaconda3/bin/python /root/test/mapper.py'
            AS value, count) mapped
      SELECT value, cast(count as int) AS count
      SORT BY value, count) sorted
SELECT TRANSFORM(value, count)
USING '/root/anaconda3/bin/python /root/test/reducer.py'
AS k,v;


-- 上面的流程
-- 1. 从tst使用指定的map脚本对 data进行 transform转换,变成2个值。
-- 2. 转换后的值是字符类型,如果使用sort by 没办法直接按照值的方式排序。需要先做cast转换
-- 3. 使用指定的脚本做reduce操作,返回指定的字段 并使用k,v字段来接受。

-- 更详细的 transform map-reduce语法规则 参考: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-Transform/Map-ReduceSyntax

4. 注意事项 说明

transform 不能与其他字段一起查询使用,仅仅是单独的一个tranform语句. 例如: from tmp select id, transform(data) as (value,count) 这个是一个错误的示例不允许使用 USING 指定外部脚本 注意要指定运行脚本的命令 /root/anaconda3/bin/python /root/test/mapper.py 需要我们每个节点都有相同的python环境,因为如果我们的任务提交到yarn执行不确定会在那个节点来运行我们的脚本

5. 下面提供一个利用pyhive pyodbc 等使用自定义udf的模块 是对上面的做了封装. 感兴趣的可以看看

https://pypi.org/project/hive-udf/

6. 关于using 脚本的说明

# 注意这个里面我们说明下 关于脚本文件的问题 
-- 方式1 [推荐]
-- 指定hdfs路径,注意这个情况 使用的是集群环境中的默认的解释器
select  TRANSFORM (data) USING 'hdfs://node1:8020/test/mapper.py' AS value, count from tst;
-- 上面的会加载文件到一个临时目录 可以使用下面的命令查看
list files;  -- /tmp/65487ca3-5a99-4dbf-afbc-7de4ee499187_resources/mapper.py

-- 如果要调整脚本文件 注意要删除指定的文件重新上传
-- 注意不要使用delete files 这样会删除所有的脚本
delete file /tmp/65487ca3-5a99-4dbf-afbc-7de4ee499187_resources/mapper.py

-- 方式2 指定解释器 并制定路径 注意路径不能是hdfs路径 [不推荐,这样会限制transform执行节点]
-- 注意 这个可以灵活指定不同的环境 但是要求脚本所在的路径必须有对应的python环境
-- 集群执行任务过程,会在对应的节点单独启动容器来运行transform 脚本
select  TRANSFORM (data) USING '/root/anaconda3/bin/python /root/test/mapper.py' AS value, count from tst;

-- 方式3  先添加脚本文件 再引用脚本 [推荐]
add file  /root/test/mapper.py;
select TRANSFORM (data) USING 'mapper.py' AS value, count from tst;
-- 或者 
select TRANSFORM (data) USING '/root/anaconda3/bin/python
 mapper.py' AS value, count from tst;
 -- 注意 如果要调整脚本 那么我们需要delete file 删除后重新添加

1

站点声明:站点主要用于个人技术文章。

冀ICP备19037883号
相关侵权、举报、投诉及建议等,请发E-mail:804330969@qq.com