包阅导读总结
1.
关键词:PyODPS、数据处理、优势劣势、脚手架、案例
2.
总结:本文分享了使用 PyODPS 的体验,探讨其在数据处理上的优势,如灵活处理复杂 JSON 和多条件筛选,也指出学习曲线、效率和文档等不足。作者还介绍了调试技巧、最佳实践及基本脚手架,并通过案例进行说明。
3.
主要内容:
– 初次使用 PyODPS 的经历
– 开始用 ODPS SQL 解决简单需求,后用 PyODPS 处理棘手需求
– PyODPS 的优势
– 灵活的数据处理
– 可全量加载少量表和文件资源
– 优秀的可配置能力
– PyODPS 的劣势
– 学习曲线陡峭
– 运行效率较低
– 文档细节不完善
– 条件判断易踩坑
– 调试技巧与最佳实践
– 推荐的判断条件写法
– 面向 DataFrame 和纯 Python 的思路
– PyODPS 开发的基本脚手架
– 示例代码及相关解释
– 常见写法的问题及正确方式
– 对 list type 的处理示例
– 结语
– 肯定 PyODPS 对数据处理的价值及潜力
思维导图:
文章地址:https://mp.weixin.qq.com/s/fZKZWt0R9WEWpYYU3uCWkA
文章来源:mp.weixin.qq.com
作者:特巴
发布时间:2024/9/11 9:47
语言:中文
总字数:4023字
预计阅读时间:17分钟
评分:87分
标签:PyODPS,数据处理,大数据,Python,调试技巧
以下为原文内容
本内容来源于用户推荐转载,旨在分享知识与观点,如有侵权请联系删除 联系邮箱 media@ilingban.com
刚开始接触ODPS时,最初有一个需求比较简单,通过ODPS SQL的方式很快得到了解决。
不过最近收到了一个稍微棘手一点的数据处理需求:
这里先总结一下PyODPS的优势:
-
灵活的row handle,能灵活地进行数据处理。事实上,需求中也需要对一个json对象进行统计分析,这点上用SQL会非常痛苦。
-
可以全量加载内容比较少的表、文件资源,降低表处理逻辑上的复杂性。而SQL在这点上没有优势,只能疯狂的join。
-
优秀的可配置能力,比如说在我这个需求中出现了需要hardCode配置的多关键字过滤
-
复用SQL处理逻辑,在我的场景里,我需要统计总的比例,与最近15天的比例。但统计逻辑是一样的,不同的是数据的范围~
劣势也很明显:
针对pyodps与python的区别, 我用一段条件判断代码来做个解释:
uv_table = visit_table[
visit_table.key.isin(target_key_list) \
& (visit_table.source == "A")
].groupby(visit_table.target_id)
uv_table = visit_table[
visit_table.key.isin(target_key_list)
and (visit_table.source == "A")
].groupby(visit_table.target_id)
uv_table = visit_table[
visit_table.key.isin(target_key_list) & (visit_table.source == "A")
].groupby(visit_table.target_id)
uv_table = visit_table[
visit_table.key.isin(target_key_list)
& (visit_table.source == "A")
].groupby(visit_table.target_id)
uv_table = visit_table[
visit_table.key in target_key_list & (visit_table.source == "A")
].groupby(visit_table.target_id)
上面的代码示例,全部都可以正常编译且执行,但是从结果上来说却大有不同:
所以先解释下为啥拿判断条件开头:已经被坑了n次了,编译全过,运行完成,但结果却经常没生效某一些条件,导致来来回回全文检查。甚至我感觉这个是目前来说最容易踩坑的点。
最后推荐的判断条件写法如下:
uv_table = visit_table[
(visit_table.key.isin(target_key_list))
& (visit_table.source == "A")
].groupby(visit_table.target_id)
每个判断条件均用()包裹,并换行or不换行&与、|或、~非,分割条件。
从这个点延伸开,我们已经发现了,PyODPS中,有两种思路。一种是面向DataFrame而另一种则是面向纯Python。
正常来说通篇均为面向DataFrame,除了以下情况:
-
通过TableReader、table.head(10)等方法将表数据读取为python的list对象数据。后续的处理逻辑均需要用python去解决。
-
@output代码处理逻辑,全部为python的能力去解决。
-
class Agg,这种自定义聚合代码,均为Python的逻辑进行处理。
而所有与DataFrame相关的逻辑,都必须查文档来处理,比如说对json的处理,我们就需要使用df.func.get_json_obj(table_name.field),而不能使用python的json.loads()。
数据的空判断则需要用a.isnull()或者a.notnull()等方法。
pyodps文档:https://pyodps.readthedocs.io/zh-cn/stable/api-df.html
写完了脚本回来一看就有种理所当然的感觉~不得不说设计上还是巧妙的。
但是这里不得不提一个点:
所以调试我们的PyODPS,就是重中之重!
同时,对于去重来说,官方文档的方法好像是有问题的。
show_room_uv = show_room_uv.agg(show_room_uv=show_room_uv.visitor_id.unique())
吐槽结束,接下来开始本期的重点。
PyODPS开发的基本脚手架
咱们的这个数据处理的功能非常适合以一个基础的脚手架起步~
这里我根据自己的开发经验总结了一个:
from odps.df import DataFrame, Scalar, func, output
bizdate = args["bizdate"]
output_table = "xxxx"
data_process_table = DataFrame(o.get_table("xxxx"))
import json
filters_words = []
with o.get_resource('filters_words.txt').open('r', encoding='utf-8') as f:
filters_words = json.loads(f.read())
data_process_table[
data_process_table.content.isin(filters_words)
]
data_process_table = data_process_table.query(
" or ".join([f"content.contains('{x}')" for x in filters_words])
)
@output(["content_len"], ["int64"])
def handle(row):
yield len(row.content)
res_t = data_process_table[
data_process_table,
data_process_table.apply(handle, axis = 1)
]
class Agg(object):
def buffer(self):
return {
"merge_length": 0
}
def __call__(self, buffer, content_len):
if content_len is not None:
buffer["merge_length"] += content_len
def merge(self, buffer, pbuffer):
buffer["merge_length"] += pbuffer["merge_length"]
def getvalue(self, buffer):
return buffer["merge_length"]
to_agg = agg(
[
res_t.content_len
],
Agg,
rtype="int64",
)
res_t = res_t.groupby("id").agg(value=to_agg)
res_t.head(10)
在总结脚手架的时候,不得不说PyODPS是一个精妙的设计,估计是再也回不去写SQL的日子了。
PyODPS核心思想就两点:
核心文档,写的过程中还是需要不断借鉴:
另外还得吐槽一句,确实很难写。
closely_count_table = data_process_table.groupby('content_len')
.agg(closely_count = data_process_table.content_len)
这个写法里有两个问题:
但是,自定义聚合连着写又没啥问题,只能说最终解释权都在PyOdps。所以这里这么写是最保险的。
closely_count_table = data_process_table.groupby('content_len')
closely_count_table = closely_count_table.agg(closely_count = closely_count_table.content_len)
即使同为DataFrame也有一样的问题,不要妄想用多个[][]来完成多次处理。第一个[]内可以用当前的DataFrame,但第二个[]就不一样了,它需要的是第一个[]返回的DataFrame对象。举个例子:
data_process_table = data_process_table[
data_process_table.content_len > 10
][
data_process_table.content
]
data_process_table = data_process_table[
data_process_table.content_len > 10
]["content"]
data_process_table = data_process_table[data_process_table.content_len > 10]
data_process_table = data_process_table[data_process_table.content]
关于list type:
@output(
[
"list_value"
],
["list<string>"]
)
def handle_list_type(row):
yield [["test1", "test2"]]
试了很多次才得到这个结果。看到结果的瞬间一下次就想明白了。
用这个例子做个解释:
@output(
[
"int_value",
"string_value"
],
["int64", "string"]
)
def handle_list_type(row):
yield 10, "test"
这里的10, “test”是一个元组,恐怕用了list()之类的方法对返回进行了包装。
我最初直接返回[“test1”, “test2”]的情况下,等同于返回2个string。
所以必须再包一层。想明白了这个原理,那么下面的写法会更加优雅:
@output(
[
"list_value"
],
["list<string>"]
)
def handle_list_type(row):
res = ["test1", "test2"]
yield res,
结语
PyODPS的列处理与聚合功能、行处理自定义逻辑,为大数据处理提供了新的视角和工具,让作者乃至更多开发者在告别纯SQL编写的同时,开启了数据处理的新篇章。总之,拥抱变化,勇于实践,PyODPS的潜力等待着每一位数据工程师去挖掘。
团队介绍
我们是淘天集团的场景智能技术团队,作为一支专注于通过AI和3D技术驱动商业创新的技术团队, 依托大淘宝丰富的业务形态和海量的用户、数据, 致力于为消费者提供创新的场景化导购体验, 为商家提供高效的场景化内容创作工具, 为淘宝打造围绕家的场景的第一消费入口。我们不断探索并实践新的技术, 通过持续的技术创新和突破,创新用户导购体验, 提升商家内容生产力, 让用户享受更好的消费体验, 让商家更高效、低成本地经营。