當(dāng)大量小文件變成大麻煩(以及我是如何用PyArrow編寫壓縮工具來解決的)
这一切始于一个相当正常的数据管道。事件通过Kafka进入,经过一些轻量级的微批处理后,以Parquet文件的形式落在AWS S3上。乍一看,一切都很整洁。高效且可预测。但有一天,我打开其中一个按小时划分的文件夹,看到了我无意中制造的混乱——里面有数百个文件,只有几KB大小。
并不急迫 —— 直到变得非常紧迫
Parquet 应该是高效的。它是列式的,经过压缩并设计用于数据分析工作流。但没有人警告你的是,当你的管道变成了生成小文件的工厂时会发生什么。所有这些小文件,频繁且自动地生成,不仅数量增加,管理成本也随之上升——而这不仅仅是技术上的开销。在 Amazon S3 中,每个对象的最小计费单位为 128KB,这意味着即使你的文件只有几个 KB,仍然会被按 128KB 来计费。当你每小时生成几百到几千个小文件时,这些成本开始增加。总存储空间可能看起来不大,但对象数量、列出时间以及像 Athena 这样的工具处理碎片数据的方式很快就会变成真正的瓶颈。流程并没有出问题,但它显然变得臃肿和效率低下——我意识到我必须采取行动了。
我原本不想把它复杂化了
在编写任何代码之前,我做了任何一个称职的工程师都会做的事情——看看是否有人已经以比我更好的方式解决了这个问题。首先想到的是Spark,它处理Parquet文件得心应手,速度快且分布式,但它似乎是个应对本质上是文件管理任务的重量级解决方案。AWS Glue也很有吸引力:它是无服务器的,与Parquet文件配合得很好,但启动时间较长,如果频繁运行作业或处理大量小文件,成本会很快累积。Dask也进入了我的视野,但在处理S3时,我对其保持谨慎。DuckDB是最有趣的考虑对象。我确实喜欢本地使用它,但由于这个管道是在一个内存有限的Kubernetes容器中运行的,我不确定它能否一次性处理几个百个文件而不最终不堪重负。
PyArrow 已经存在了 —— 所以我用它了。
最后我还是回到了PyArrow。我已经在管道的某些小部分使用它了,它正好合适。没有编排工具,没有YAML配置,没有spark-submit命令,没有平台开销。我不是想重建世界,我只是想减少复杂性。
为了试一下,我从纽约市开放数据门户获取了2025年1月的黄色出租车行程数据集(数据集)。我将完整的Parquet文件拆分成更小的部分,拆分后的文件一共有696个,每个文件都有5000行。这大概是频繁刷新或在较小时间间隔进行微批处理的流处理管道生成的数据规模和结构。这感觉就像是真实地模拟了生产环境中的混乱情况。
这是我用来分割它的代码:
# 原代码位置
此文件包含隐藏的或双向Unicode文本,可能与下面看到的内容有所不同。要查看,请用能显示隐藏Unicode字符的编辑器打开文件。了解有关双向Unicode字符的更多信息
[显示隐藏字符](https://dev.to/{{ revealButtonHref }})
导入 pyarrow.parquet as pq, |
---|
导入 pyarrow.dataset as ds, |
导入 pyarrow as pa, |
导入 os, |
source_path = "yellow_tripdata_2025-01.parquet" |
output_base = "demo_data" |
dataset = ds.dataset(source_path, format="parquet") |
batches = dataset.to_batches(batch_size=5000) |
for i, batch in enumerate(batches): |
os.makedirs('demo_data', exist_ok=True) |
table = pa.Table.from_batches([batch]) |
pq.write_table(table, f"demodata/split{i}.parquet", compression='gzip') |
查看原始 gistfile1.txt 由 GitHub 主办
顺便说一下:pyarrow.dataset
允许你将一组文件视作一个逻辑表,这使得加载、转换或压缩多文件中的数据变得非常容易,而无需一次性将所有数据加载到内存中。如果你之前还没用过,不妨试试。可以在这里查看文档。
当我有了一个装满小文件的文件夹时,我编写了一个简短的脚本,使用 pyarrow.dataset
来读取所有文件,将它们合并成一个单一的数据集,并将它们以压缩形式写回。你可以通过调整参数,如 min_rows_per_group
、max_rows_per_group
和 max_rows_per_file
来微调输出大小——仅仅几行代码就能获得如此多的控制权真是太神奇了。下面是代码示例展示压缩部分:
# 下面是代码示例展示压缩部分:
此文件包含隐藏或双向Unicode文本,可能与下面看到的有所差异。请在可显示隐藏Unicode字符的编辑器中打开文件。了解更多关于双向Unicode字符的信息
[显示隐藏字符](https://dev.to/{{ revealButtonHref }})
from pyarrow import fs |
---|
import pyarrow.dataset as ds |
def list_parquet_files(local_fs, path): |
file_infos = local_fs.get_file_info(fs.FileSelector(path, recursive=True)) |
parquet_files = [ |
info.path |
for info in file_infos |
if info.type == fs.FileType.File and info.path.endswith(".parquet") |
] |
return parquet_files |
local_fs = fs.LocalFileSystem() |
files = list_parquet_files(local_fs, "demo_data") |
print(f"发现 {len(files)} 个示例数据文件") |
dataset = ds.dataset( |
files, |
format="parquet", |
filesystem=local_fs, |
) |
min_rows_per_group = 500 |
max_rows_per_file = 1000000 |
max_rows_per_group = 10000 |
ds.write_dataset( |
dataset, |
"output_demo_data", |
format="parquet", |
basenametemplate=f"compacted{{i}}.parquet", |
min_rows_per_group=min_rows_per_group, |
max_rows_per_file=max_rows_per_file, |
max_rows_per_group=max_rows_per_group, |
existing_data_behavior="overwrite_or_ignore", |
use_threads=True, |
filesystem=fs.LocalFileSystem(), |
file_options=ds.ParquetFileFormat().make_write_options(compression="gzip"), |
) |
查看原始代码 without_batching.py 文件 由 GitHub 提供支持
就这样,我从接近700个微文件减少到几个压缩文件——每个文件在S3中更经济,并且在下游分析中快速扫描。
在测试期间,它运行得非常顺利,当时我在一个小文件夹中运行它,文件夹里只有几份。但在实际数据中,有些小时比其他小时更繁忙。根据流量,一个单小时文件夹最终可能包含数千个文件,这时问题就开始显现了。PyArrow 的 dataset
方法尝试一次性读取所有文件,虽然它给你很大的控制权,但也假设你知道要给系统带来什么样的负载。直到我在 Kubernetes 的一个 Pod 中运行压缩器并遇到内存溢出错误,我才意识到问题的严重性。原来,即使是较小的 Parquet 文件也会迅速累积起来,特别是当你考虑到所有在后台加载到内存中的元数据和行组。
批处理就是解决办法
我把整个过程重新写了一遍,考虑到了批量处理。我没有一次加载所有文件,而是将它们分成每100个为一组,读取每批文件,并将它们压缩成一个新文件,清理内存后继续处理。这方法简单有效。最终版本大致如下:
此文件包含隐藏或双向Unicode文本,可能与这里显示的内容不同。要查看,请使用能显示隐藏或双向Unicode字符的编辑器打开文件。了解更多关于双向Unicode字符的信息
[显示隐藏字符](https://dev.to/{{ revealButtonHref }})
import pyarrow.dataset as ds |
---|
from pyarrow import fs |
import os |
def 列出parquet文件列表(local_fs, path): |
file_infos = local_fs.get_file_info(fs.FileSelector(path, recursive=True)) |
return [ |
info.path |
for info in file_infos |
if info.type == fs.FileType.File and info.path.endswith(".parquet") |
] |
input_path = 'demo_data' |
output_path = 'output_demo_data_batched' |
files_per_batch = 100 |
min_rows_per_group = 500 |
max_rows_per_file = 1000000 |
max_rows_per_group = 10000 |
local_fs = fs.LocalFileSystem() |
all_files = 列出parquet文件列表(local_fs, input_path) |
print(f"找到了 {len(all_files)} 个输入的parquet文件") |
os.makedirs(output_path, exist_ok=True) |
for batch_idx in range(0, len(all_files), files_per_batch): |
batch_files = all_files[batch_idx: batch_idx + files_per_batch] |
print(f"正在处理从 {batch_idx} 到 {batch_idx + len(batch_files)} 的文件") |
dataset = ds.dataset(batch_files, format="parquet", filesystem=local_fs) |
ds.write_dataset( |
dataset, |
output_path, |
format="parquet", |
basename_template=f"compactedbatch{batch_idx // files_perbatch}{{i}}.parquet", |
min_rows_per_group=min_rows_per_group, |
max_rows_per_file=max_rows_per_file, |
max_rows_per_group=max_rows_per_group, |
existing_data_behavior="overwrite_or_ignore", |
use_threads=True, |
filesystem=local_fs, |
file_options=ds.ParquetFileFormat().make_write_options(compression="gzip"), |
) |
del dataset |
查看原始代码 with_batching.py 代码文件 由 GitHub 提供支持
这个版本在处理少量文件时也同样快速,但在处理数百个文件时也不会变得缓慢。你可以根据你的内存大小调整批处理量,输出文件仍然会结构清晰并且被有效压缩。
想在S3上跑一下这个程序吗?
如果你是在本地测试,只需改动一行代码即可切换到S3。将 fs.LocalFileSystem()
替换为 fs.S3FileSystem(region="us-east-1")
,一切就会正常工作。如果已经在环境里设置了AWS凭证,PyArrow会自动找到这些凭证。如果没有设置凭证,你可以直接传递它们,例如:
s3 = fs.S3FileSystem(
access_key="YOUR_ACCESS_KEY",
secret_key="YOUR_SECRET_KEY",
region="us-east-1"
)
# 这里的S3FileSystem是用于S3文件系统的访问,access_key和secret_key是访问密钥和秘密密钥,region指的是区域,这里是美国东部。
进入全屏
退出全屏
就这样了...
随着这些改动的实施,压缩机不再只是一个快速实验,而是开始像一个真正的工具一样工作。它读取了数百个小型的Parquet文件,将它们合并成可管理的批处理,并以干净的GZIP压缩格式输出,同时保持合理的行组长度。输出整洁、扫描速度快且存储成本低。内存使用情况如何?非常可靠。我在初次运行时监控了它的内存使用,但从那以后,它只是静悄悄地在后台运行,默默地完成它的任务,不引人注目,这实际上是对任何数据管道组件所能说的最好的评价了。我已经在生产环境中运行了一段时间,自那以后从未改动过。
如果你也被小文件淹没...
如果你也被小文件淹没,可以......
你可能不需要 Spark。你可能也不需要 Glue。你真正需要的是一个知道如何表现的小脚本,不会占用过多资源,并在流处理管道完成后默默地清理一切。PyArrow 虽然不那么引人注目,但对于这样的任务,它正好合适——有足够的控制力来完成任务,而不会引入整个数据平台。
📂 代码可在这里获取
📚 了解更多关于pyarrow.dataset
的内容
共同學(xué)習(xí),寫下你的評(píng)論
評(píng)論加載中...
作者其他優(yōu)質(zhì)文章