datatrove
datatrove 是一款专为大规模文本数据处理设计的开源库,旨在帮助开发者和研究人员高效完成数据清洗、过滤及去重任务。面对大语言模型训练中海量数据处理的复杂性,传统脚本往往难以维护且扩展性差,datatrove 通过提供一套可自定义的流水线处理模块,将繁琐的脚本工作转化为清晰、模块化的流程,从而解决了“脚本混乱”的痛点。
该工具特别适合需要处理 TB 级数据的 AI 工程师、数据科学家及大模型研究者。其核心亮点在于平台无关性:同一套处理逻辑无需修改代码,即可在本地单机或 Slurm 集群上无缝运行。datatrove 采用多步骤流水线设计,内存占用低,支持从 CommonCrawl 等原始格式提取文本、合成数据生成到最终分词保存的全链路操作。此外,它基于 fsspec 兼容本地、S3 等多种文件系统,并内置对 Ray 等分布式计算引擎的支持,让用户能灵活应对不同规模的基础设施环境,轻松构建稳健的数据预处理管道。
使用场景
某 AI 实验室团队正致力于构建一个万亿级 token 的大语言模型训练数据集,需要处理从 Common Crawl 抓取的数百 TB 原始网页数据。
没有 datatrove 时
- 脚本维护噩梦:工程师不得不编写数千行复杂的自定义 Python 脚本来串联解压、文本提取、过滤和去重步骤,代码耦合度高,难以复用和调试。
- 扩展性瓶颈:本地单机处理速度极慢,而将任务迁移到 Slurm 集群时,需重写大量分布式逻辑和容错代码,耗时数周且极易出错。
- 资源管理失控:缺乏优化的内存管理机制,处理大文件时频繁发生内存溢出(OOM),导致任务中断,数据中间状态丢失。
- 格式适配困难:面对 WARC、Parquet、Arrow 等多种输入输出格式,需手动集成各类底层库,开发效率低下。
使用 datatrove 后
- 流水线模块化:利用 datatrove 预置的标准化处理模块(如文本提取器、过滤器),通过简单配置即可组装复杂流程,彻底告别“脚本拼凑”。
- 无缝集群部署:凭借平台无关的特性,只需切换执行器(Executor),同一套代码即可直接从本地运行平滑迁移至 Slurm 或 Ray 集群,实现弹性扩容。
- 高效稳定运行:datatrove 的多阶段设计和低内存占用特性,确保在大规模数据处理中稳定运行,显著降低硬件成本并提升吞吐量。
- 统一数据接口:内置对 fsspec 的支持,让团队能透明地读写本地磁盘、S3 存储或 Hugging Face Hub 上的多种文件格式,无需关心底层细节。
datatrove 将原本混乱的数据清洗工程转化为可维护、可扩展的标准化流水线,让团队能专注于数据策略而非基础设施搭建。
运行环境要求
- Linux
- macOS
- Windows
未说明
未说明(文档提及相对较低的内存占用,适合大规模工作负载)

快速开始
DataTrove
DataTrove 是一个用于大规模处理、过滤和去重文本数据的库。它提供了一组预构建的常用处理模块,并配备了一个框架,方便用户添加自定义功能。
DataTrove 的处理管道与平台无关,既可以在本地直接运行,也可以在 Slurm 集群上运行。由于其内存占用相对较低且采用多步骤设计,因此非常适合处理大型工作负载,例如处理大型语言模型的训练数据。
通过 fsspec 支持本地、远程及其他文件系统。
目录
安装
需要 Python 3.10 或更高版本。
pip install datatrove[FLAVOUR]
可用的风味(用逗号分隔,例如 [processing,s3]):
all:安装所有依赖项:pip install datatrove[all]io:用于读取warc/arc/wet文件以及 arrow/parquet/Optimized-parquet 格式的依赖项:pip install datatrove[io]processing:用于文本提取、过滤和分词的依赖项:pip install datatrove[processing]s3:S3 支持:pip install datatrove[s3]cli:命令行工具支持:pip install datatrove[cli]ray:分布式计算引擎支持:pip install datatrove[ray]inference:用于 LLM 推理管道:pip install datatrove[inference]decont:使用 lighteval 进行去污支持:pip install datatrove[decont]multilingual:用于多语言文本处理:pip install datatrove[multilingual]
快速入门示例
您可以查看以下 示例:
- fineweb.py:完整复现 FineWeb 数据集
- process_common_crawl_dump.py:完整管道,用于读取 CommonCrawl 的 WARC 文件,提取其中的文本内容,进行过滤,并将结果保存到 S3。该示例在 Slurm 上运行。
- tokenize_c4.py:直接从 Hugging Face Hub 读取数据,使用
gpt2分词器对 C4 数据集的英文部分进行分词。 - estimate_tokens.py:估算大型 HF 数据集的总 token 数——在创建随机打乱的子样本时(例如从数千亿 token 的数据集中抽取 1000 亿 token),这一步骤是设置正确
SamplerFilter比率所必需的。该示例会流式处理每个数据集的小样本,最终收敛到平均每个文档的 token 数,再乘以总行数。 - smol_data.py:为多个大型 Hugging Face 数据集构建约 1000 亿 token 的子集,包括 50-30-20 的混合比例以及打乱后的变体。
- minhash_deduplication.py:完整的 MinHash 去重管道。
- sentence_deduplication.py:句子级别的精确去重示例。
- exact_substrings.py:ExactSubstr 示例(需要 此仓库)。
- finephrase.py:独立示例,使用多种提示模板大规模生成合成数据集。
术语
pipeline:要执行的一系列处理步骤(读取数据、过滤、写入磁盘等)。executor:在特定执行环境中运行特定管道的组件(如 Slurm、多核 CPU 机器等)。job:在给定执行器上执行的一个管道。task:一个job由多个task组成,这些task用于并行化执行,通常每个task处理一个shard的数据。Datatrove 会跟踪哪些任务已完成,重新启动时只会运行未完成的任务。file:单个输入文件(如 .json、.csv 等)。
[!TIP] 请注意,每个文件将由一个单独的
task处理。Datatrove 不会自动将一个文件拆分为多个部分,因此为了实现完全并行化,您应该准备多个中等大小的文件,而不是一个大文件。
shard:一组输入数据(通常是若干file),会被分配给特定的task。每个task将处理来自全部输入文件的不同且不重叠的shard数据。worker:一次只能执行一个任务的计算资源,例如,如果您有 50 个 CPU 核心,则可以使用workers=50的 LocalPipelineExecutor 来同时执行 50 个task(每个 CPU 执行一个)。一旦某个worker完成一个task,就会开始处理下一个等待中的task。
[!TIP] 您的
tasks数量决定了可以并行化的程度,也影响了每个处理单元所需的时间。如果您的任务数量较少(因此每个任务需要处理大量文件),并且这些任务失败,您将不得不从头开始;而如果任务数量较多但每个任务处理的文件较少,则每次失败的任务重新运行所需的时间会短得多。
[!CAUTION] 如果您的
tasks数量大于files数量,那么有些任务将不会处理任何数据,因此通常没有必要将tasks设置为超过files数量。
示例
在一个拥有 100 个 CPU 核心(workers)的机器上,运行一个处理 10000 个 files 的 job。如果我们选择使用 1000 个 task,那么每个 task 将处理包含 10 个文件的 shard。workers=100 表示我们可以同时处理 100 个 task。
管道
DataTrove 文档
每个管道块都以 Datatrove 的 Document 格式处理数据:
text:每个样本的实际文本内容。id:该样本的唯一 ID(字符串)。metadata:一个字典,可用于存储任何附加信息。
管道块的类型
每个管道块都以 Document 的生成器作为输入,并返回另一个 Document 的生成器。
- 读取器 从不同格式中读取数据,并生成
Document - 写入器 将
Document以不同格式保存到磁盘或云端 - 提取器 从原始格式(如网页 HTML)中提取文本内容
- 过滤器 根据特定规则或标准过滤掉部分
Document - 统计器 用于收集数据集统计信息的块
- 分词器 用于对数据进行分词或统计词数的块
- 去重器 用于去重的块
完整管道
管道被定义为一系列管道块。例如,以下管道将从磁盘读取数据,随机过滤(移除)部分文档,然后将其写回磁盘:
from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [
CSVReader(
data_folder="/my/input/path"
),
SamplerFilter(rate=0.5),
JsonlWriter(
output_folder="/my/output/path"
)
]
执行器
管道与平台无关,这意味着同一管道无需更改其步骤即可在不同的执行环境中顺利运行。每种环境都有其专属的管道执行器。
所有执行器通用的一些选项:
pipeline:一个由要运行的管道步骤组成的列表logging_dir:用于保存日志文件、统计信息等的数据文件夹。请勿为不同管道或作业重复使用同一文件夹,否则会覆盖您的统计、日志和已完成任务记录。skip_completed(布尔值,默认为True):Datatrove 会跟踪已完成的任务,以便在重新启动作业时跳过这些任务。将其设置为False可禁用此行为。randomize_start_duration(整数,默认为0):每个任务开始前的最大延迟秒数,用于防止所有任务同时启动而可能造成系统过载。
调用执行器的 run 方法即可执行其管道。
[!提示] Datatrove 通过在
${logging_dir}/completions文件夹中创建标记文件(空文件)来跟踪哪些任务已成功完成。作业完成后,如果部分任务失败,您可以直接重新启动完全相同的执行器,Datatrove 会检查并仅运行之前未完成的任务。
[!注意] 如果因部分任务失败而重新启动管道,请不要更改总任务数,因为这会影响输入文件的分配和分片。
本地管道执行器
该执行器将在本地机器上启动管道。选项如下:
tasks:要运行的总任务数workers:同时运行的任务数量。若设置为-1,则无限制。任何大于1的值都会使用多进程来执行任务。start_method:用于启动多进程池的方法。若workers为1,则忽略此选项。
执行器示例
from datatrove.executor import LocalPipelineExecutor
executor = LocalPipelineExecutor(
pipeline=[
...
],
logging_dir="logs/",
tasks=10,
workers=5
)
executor.run()
多节点并行处理
您可以通过使用 local_tasks 和 local_rank_offset 参数,让不同的节点或机器分别处理总任务的不同部分。对于每个节点、实例或机器,应使用以下选项启动:
tasks:需要执行的总任务数(跨所有机器)。此值必须在每台机器上保持一致,否则输入文件的分配可能会重叠! 示例:500local_tasks:本机将执行的总任务中的子任务数。请注意,每台机器可以使用不同的数值。示例:100local_rank_offset:本机将开始执行的第一项任务的排名。假设这是您启动作业的第 3 台机器,前两台机器分别执行了 250 和 150 个任务,则当前机器的local_rank_offset应设为400。
要获得最终合并的统计信息,您需要手动在包含所有机器统计信息的路径上调用 merge_stats 脚本。
SlurmPipelineExecutor
此执行器将在 Slurm 集群上启动一个流水线,使用 Slurm 作业数组来分组和管理任务。 选项:
tasks要运行的总任务数。必填timeSlurm 时间限制字符串。必填partitionSlurm 分区。必填workers同时运行的任务数量。如果设置为-1,则无限制。Slurm 将一次运行workers个任务。(默认值:-1)job_nameSlurm 作业名称(默认值:"data_processing")depends另一个 SlurmPipelineExecutor 实例,该实例将成为此流水线的依赖项(当前流水线仅在依赖的流水线成功完成后才会开始执行)sbatch_args字典,包含您希望传递给 sbatch 的任何其他参数slurm_logs_folder用于保存 Slurm 日志文件的目录。如果使用本地路径作为logging_dir,日志将保存在logging_dir/slurm_logs中。否则,它们将被保存为当前目录下的子目录。
其他选项
cpus_per_task每个任务分配的 CPU 数量(默认值:1)qosSlurm QoS(默认值:"normal")mem_per_cpu_gb每个 CPU 的内存大小,单位为 GB(默认值:2)env_command自定义命令,用于激活 Python 环境(如有需要)condaenv要激活的 Conda 环境venv_path要激活的 Python 环境路径max_array_size$ scontrol show config中的 MaxArraySize 值。如果任务数量超过此值,将拆分为多个数组作业(默认值:1001)max_array_launch_parallel如果由于 max_array_size 需要多个作业,是同时并行启动还是依次启动(默认值:False)stagger_max_array_jobs当 max_array_launch_parallel 为 True 时,此选项决定并行启动每个作业之间应等待多少秒(默认值:0)run_on_dependency_fail即使我们所依赖的作业失败,也应在该作业完成后开始执行(默认值:False)randomize_start在约 3 分钟的时间窗口内随机化每个作业中任务的启动时间。例如,在大量访问 S3 存储桶时非常有用。(默认值:False)
执行器示例
from datatrove.executor import SlurmPipelineExecutor
executor1 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job1",
logging_dir="logs/job1",
tasks=500,
workers=100, # 不设置则所有任务同时运行
time="10:00:00", # 10 小时
partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job2",
logging_dir="logs/job2",
tasks=1,
time="5:00:00", # 5 小时
partition="hopper-cpu",
depends=executor1 # 该流水线仅在 executor1 成功完成后才会启动
)
# executor1.run()
executor2.run() # 这实际上会启动 executor1,因为它是一个依赖项,因此无需显式启动
RayPipelineExecutor
此执行器将在 Ray 集群上启动一个流水线,使用 Ray 任务进行并行执行。 选项:
tasks要运行的总任务数。workers同时运行的任务数量。如果设置为-1,则无限制。Ray 将一次运行workers个任务。(默认值:-1)depends又一个 RayPipelineExecutor 实例,该实例将成为此流水线的依赖项(当前流水线仅在依赖的流水线成功完成后才会开始执行)
其他选项
cpus_per_task每个任务分配的 CPU 数量(默认值:1)mem_per_cpu_gb每个 CPU 的内存大小,单位为 GB(默认值:2)ray_remote_kwargs传递给 ray.remote 装饰器的额外关键字参数
执行器示例
import ray
from datatrove.executor import RayPipelineExecutor
ray.init()
executor = RayPipelineExecutor(
pipeline=[
...
],
logging_dir="logs/",
tasks=500,
workers=100, # 不设置则所有任务同时运行
)
executor.run()
日志记录
对于具有 logging_dir mylogspath/exp1 的流水线,将创建如下文件夹结构:
查看文件夹结构
└── mylogspath/exp1
│── executor.json ⟵ 执行器选项和流水线步骤的 JSON 转储
│── launch_script.slurm ⟵ 用于启动此作业的 Slurm 配置文件(如果在 Slurm 上运行)
│── executor.pik ⟵ 用于启动此作业的 Slurm 配置文件(如果在 Slurm 上运行)
│── ranks_to_run.json ⟵ 正在运行的任务列表
│── logs/
│ └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ 每个任务的单独日志文件
│── completions/
│ └──[00004, 00007, 00204, ...] ⟵ 标记任务已完成的空文件。用于重新启动或恢复作业时使用(只会运行未完成的任务)
│── stats/
│ └──[00000.json, 00001.json, 00002.json, ...] ⟵ 每个任务的单独统计信息(处理、过滤、移除等样本数量)
└── stats.json ⟵ 来自所有任务的全局统计信息
颜色化
日志消息支持颜色化。默认情况下,控制台消息的颜色化会自动检测,而日志文件(logs/task_XXXXX.log)中的颜色化则会被禁用。 要显式启用或禁用颜色化,您可以设置以下环境变量:
DATATROVE_COLORIZE_LOGS设置为"1"以在控制台日志消息中添加 ANSI 颜色,设置为"0"则禁用颜色化。DATATROVE_COLORIZE_LOG_FILES设置为"1"以在保存到 logs/task_XXXXX.log 的日志消息中添加 ANSI 颜色。
DataFolder / 路径
Datatrove 通过 fsspec 支持各种输入/输出源。
有几种方式可以为 Datatrove 块提供路径(用于 input_folder、logging_dir、data_folder 等参数):
str:最简单的方式是传递一个字符串。例如:/home/user/mydir、s3://mybucket/myinputdata、hf://datasets/allenai/c4/en/(str, fsspec 文件系统实例):一个字符串路径和一个完全初始化的文件系统对象。例如:("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))(str, dict):一个字符串路径和一个包含初始化文件系统选项的字典。例如(与上一行等效):("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})DataFolder:您可以直接初始化一个 DataFolder 对象,并将其作为参数传递。
这些参数组合会在后台由 get_datafolder 解析。
实用指南
读取数据
通常,管道会从一个 Reader 块开始。大多数 reader 都会接受一个 data_folder 参数——指向包含待读取数据的文件夹路径。
这些文件会被分配到各个任务中。如果你有 N 个任务,那么排名为 i(从 0 开始)的任务将处理文件 i, i+N, i+2N, i+3N,...。
在内部,每个 reader 会读取数据并将其转换为字典,然后再创建一个 Document 对象。
一些大多数 reader 都通用的选项:
text_key:用于指定每个样本中文本内容的字典键。默认值为text。id_key:用于指定每个样本的 ID 的字典键。默认值为id。default_metadata:一个字典,用于添加任何你希望的默认元数据值(例如来源等)。recursive:是否递归地在data_folder的子目录中查找文件。glob_pattern:使用此字段来匹配特定的文件。例如,glob_pattern="*/warc/*.warc.gz"将匹配data_folder每个子目录的warc/文件夹下所有.warc.gz文件扩展名的文件。adapter:此函数接收 reader 返回的原始字典,并返回一个包含Document字段名称的字典。如果你需要,可以覆盖这个函数(_default_adapter)。limit:只读取一定数量的样本。这对于测试和调试非常有用。
合成数据生成
通过运行 pip install datatrove[inference] 安装推理相关的额外依赖,以引入轻量级 HTTP 客户端、检查点相关依赖以及异步 SQLite 缓存。
我们支持 vLLM、SGLang、与 OpenAI 兼容的 HTTPS 端点,以及通过 InferenceRunner 块提供的本地 dummy 服务器。每个 datatrove 任务都可以启动自己的服务器副本(适用于 vllm、sglang 或 dummy),也可以直接与外部端点通信,同时通过异步批处理保持较高的 GPU 利用率。
自定义 rollout 流程
核心抽象是一个 rollout 函数——一个普通的异步可调用对象,它接收一个 Document、一个 generate(payload) 回调函数,以及来自 shared_context 的任何额外关键字参数。你可以在 rollout 中自由编排多个顺序或并行的 generate 调用。这使你能够完全控制如何构建提示以及如何组合生成结果。请参阅 inference_chunked.py,其中包含以下示例:
- 简单的单请求 rollout;
- 分块 rollout,将长文档拆分后再将生成结果拼接起来;
- 使用进程池进行 CPU 密集型预处理,借助
shared_context实现; - 多节点分布式推理。
设置 rollouts_per_document 可以自动对每个样本运行多次相同的 rollout;运行器会将成功的输出收集到 document.metadata["rollout_results"] 中。
即用型生成脚本
对于大规模合成数据生成的即用型脚本(支持从 1B 到 1T 参数规模的模型、本地/SLURM 执行以及多节点设置),请参阅 generate_data.py。该脚本处理基于提示的生成,并提供可配置的系统提示和模板。
高级配置
shared_context 允许你将共享状态注入到每次 rollout 调用中。它可以接受:
- 一个字典(作为关键字参数传递);
- 一个返回字典的可调用对象(便于延迟创建资源);
- 上下文管理器或返回上下文管理器的可调用对象(非常适合池、GPU 分配器、临时目录等)。上下文管理器会在每个任务中正确地进入和退出一次。
可恢复的生成:
- 设置
checkpoints_local_dir并配合records_per_chunk,会将每个Document写入本地分块文件(请记得在输出文件名模板中包含${chunk_index}),然后通过配置好的写入器上传这些文件。失败的任务会自动从最后一个完成的分块继续执行。 - 当启用检查点功能时,基于 SQLite 的
RequestCache会通过负载哈希值来去重单个 rollout(需要xxhash和aiosqlite),这样在重试过程中就不会重复发送已完成的生成内容。 - 在
InferenceRunner上设置skip_bad_requests=True,可以跳过提供商端的BadRequestError(例如上下文/窗口溢出),并让其余文档继续运行。
通过调整 max_concurrent_generations 来优化批处理,当预处理或后处理工作量较大时,可以提高 max_concurrent_documents 的值,以便在请求发送期间有更多的 rollout 协程可以构建负载。
最小化端到端示例
from datatrove.data import Document
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.inference.run_inference import InferenceConfig, InferenceRunner
from datatrove.pipeline.writers import JsonlWriter
async def simple_rollout(doc: Document, generate):
payload = {"messages": [{"role": "user", "content": [{"type": "text", "text": doc.text}]}], "max_tokens": 2048}
return await generate(payload)
documents = [Document(text="东京的天气怎么样?", id=str(i)) for i in range(1005)]
config = InferenceConfig(server_type="vllm", model_name_or_path="google/gemma-3-27b-it", rollouts_per_document=1, max_concurrent_generations=500)
LocalPipelineExecutor(
pipeline=[
documents,
InferenceRunner(
rollout_fn=simple_rollout,
config=config,
skip_bad_requests=True,
records_per_chunk=500,
checkpoints_local_dir="/fsx/.../translate-checkpoints",
output_writer=JsonlWriter("s3://.../final_output_data", output_filename="${rank}_chunk_${chunk_index}.jsonl"),
),
],
logging_dir="/fsx/.../inference_logs",
tasks=1,
).run()
扩展版的 inference_chunked.py 脚本展示了单次和多次 rollout 流程、可恢复的检查点机制,以及在多个 rollout 之间共享进程池的方法。
进度监控
对于长时间运行的推理任务,你可以使用 InferenceProgressMonitor 定期更新 HuggingFace 数据集卡片上的进度条和预计完成时间。推理完成后,InferenceDatasetCardGenerator 会生成包含统计数据的最终数据集卡片。
from datatrove.pipeline.inference import InferenceDatasetCardParams, InferenceProgressMonitor, InferenceDatasetCardGenerator
params = InferenceDatasetCardParams(
output_repo_id="your-username/output-dataset",
input_dataset_name="simplescaling/s1K-1.1",
input_dataset_split="train",
model_name="Qwen/Qwen3-0.6B",
# ... 其他参数
)
# 监控管道(与 SLURM 上的推理并行运行)
monitor_pipeline = [InferenceProgressMonitor(params=params, update_interval=3600)]
# 最终卡片生成(在推理完成后运行)
datacard_pipeline = [InferenceDatasetCardGenerator(params=params)]
完整示例及 Slurm 集成请参阅 progress_monitoring.py。
基准测试
要测量不同模型和配置(TP、PP、推测解码)下的 vLLM 吞吐量,请使用 基准测试工具。该基准测试套件提供:
launch_experiments.py:从 YAML 配置文件启动扫描实验,并自动提交 Slurm 作业analyze_results.py:解析服务器日志并生成包含每 GPU 每秒处理的 token 数、处理 10 亿 token 所需的 GPU 天数等指标的 CSV 摘要
提取文本
您可以使用 extractors 从原始 HTML 中提取文本内容。Datatrove 中最常用的提取器是 Trafilatura,它基于 trafilatura 库。
数据过滤
Filters 是任何数据处理管道中最重要的模块之一。Datatrove 的过滤模块接收一个 Document 对象,并返回一个布尔值(True 表示保留文档,False 表示移除文档)。被移除的样本不会传递到下一个管道阶段。您还可以通过将 Writer 传递给 exclusion_writer 参数,将这些被移除的样本保存到磁盘上。
保存数据
完成数据处理后,您可能希望将其保存到某个位置。为此,可以使用 writer。
写入器需要指定 output_folder(数据应保存的路径)。您可以选择使用的压缩方式(默认为 gzip),以及每个文件的保存名称。
对于 output_filename,会应用以下模板,其中替换参数如下:
${rank}替换为当前任务的排名。请注意,如果未使用此标记,不同任务可能会尝试写入同一位置${id}替换为样本 ID- 元数据:任何其他
${tag}将被替换为对应的document.metadata['tag']值
以下示例根据样本的 lang 元数据字段按语言分离样本:
JsonlWriter(
f"{MAIN_OUTPUT_PATH}/non_english/",
output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz", # 文件夹结构:语言/转储/文件
)
数据去重
有关数据去重的示例,请参阅 minhash_deduplication.py、sentence_deduplication.py 和 exact_substrings.py。
概要统计
要对您的数据进行概要统计,可以使用 Stats 模块。这些模块提供了一种简单的方法,可以在分布式环境中收集数据集的特征信息。过程分为两步:
- 对每个分片中的文档进行遍历,将统计数据归入以下分组:
summary(所有文档计入“summary”键)、fqdn(完全合格域名分组)、suffix(URL 路径末尾部分分组)或histogram(基于数值的分组)。 - 将不同分片的统计数据合并到一个文件中。 更多详情请参阅 summary_stats.py。
每个统计结果都会保存在一个单独的文件中,文件结构如下:output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json
每个此类文件都是一个 MetricStatsDict 对象,您可以轻松加载它,例如:
from datatrove.pipeline.stats.summary_stats import MetricStatsDict
import json
stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))
# 例如,计算 nytimes.com 文档的总长度
stats["nytimes.com"].total
# 或者计算 cnn.com 文档的平均长度
stats["cnn.com"].mean
可用的统计包括:
contamination_stats.py:word_contamination_{words[0]}:文档中特定单词出现的频率。doc_stats.py:length:文档长度,white_space_ratio:空白字符比例,non_alpha_digit_ratio:非字母和非数字字符比例,digit_ratio:数字比例,uppercase_ratio:大写字母比例,elipsis_ratio:省略号比例,punctuation_ratio:标点符号比例。lang_stats.py:fasttext_{language}:文档使用指定语言书写的得分。得分由 FastText 模型计算得出。line_stats.py:n_lines:每篇文档的行数,avg_line_length:每篇文档的平均行长,long_line_ratio_chars_{chars}:超过 k 个字符的行所占比例,short_line_ratio_chars_{chars}:少于 k 个字符的行所占比例,bullet_point_lines_ratio:以项目符号开头的行所占比例,line_duplicates:重复行所占比例,line_char_duplicates:重复行中重复字符占总字符的比例。paragraph_stats.py:n_paragraphs:段落数,avg_paragraph_length:平均段落长度,short_paragraph_ratio_{chars}:短段落所占比例(小于{chars}个字符),long_paragraph_ratio_{chars}:长段落所占比例(大于{chars}个字符)。perplexity_stats.py:ccnet_perplexity_{model_dataset}_{language}:使用 CCNet 模型,在{dataset}上以{language}书写的文档的困惑度。sentence_stats.py:n_sentences:句子数量,avg_sentence_length:平均句子长度,short_sentence_ratio_{chars}:短句子所占比例(小于{chars}个字符),long_sentence_ratio_{chars}:长句子所占比例(大于{chars}个字符)。token_stats.py:token_count:文档中的标记总数。word_stats.py:n_words:文档中的单词数量,avg_word_length:文档中单词的平均长度,avg_words_per_line:文档中平均每行的单词数,short_word_ratio_{chars}:短于{chars}个字符的单词所占比例,stop_word_ratio:停用词所占比例,long_word_ratio_{chars}:长于{chars}个字符的单词所占比例,type_token_ratio:唯一单词数与总标记数之比,capitalized_word_ratio:首字母大写的单词所占比例,uppercase_word_ratio:全部大写的单词所占比例。
自定义块
简单数据
您可以直接将一个 Document 对象的可迭代对象作为管道块传递,如下所示:
from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [
[
Document(text="some data", id="0"),
Document(text="some more data", id="1"),
Document(text="even more data", id="2"),
],
SamplerFilter(rate=0.5),
JsonlWriter(
output_folder="/my/output/path"
)
]
请注意,这个可迭代对象不会被分片(如果您启动超过 1 个任务,它们都会获得完整的可迭代对象)。这通常适用于小型工作负载或测试。
自定义函数
对于简单的处理,您可以直接传入一个具有以下签名的自定义函数:
from datatrove.data import DocumentsPipeline
def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
"""
`data` 是一个 Document 的生成器。您也必须返回一个 Document 的生成器(使用 yield)。
您可以选择性地使用 `rank` 和 `world_size` 进行分片。
"""
for document in data:
document.text = document.text.upper()
yield document
pipeline = [
...,
uppercase_everything,
...
]
[!提示] 由于导入的原因,您可能会遇到一些序列化问题。如果发生这种情况,只需将所需的导入语句移到函数体内即可。
自定义块
您还可以定义一个完整的块,继承自 PipelineStep(位于 src/datatrove/pipeline/base.py)或其子类:
from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder
class UppercaserBlock(PipelineStep):
def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
super().__init__()
# 您可以在此处接收并保存所需的任何参数
self.some_param = some_param
# 使用 get_datafolder() 来加载数据文件夹
self.some_folder = get_datafolder(some_folder)
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
# 您也可以从 `some_folder` 加载数据:
for filepath in self.some_folder.get_shard(rank, world_size): # 它还接受 glob 模式等
with self.some_folder.open(filepath, "rt") as f:
# 执行某些操作
...
yield doc
#
# 或者处理来自先前块的数据 (`data`)
#
for doc in data:
with self.track_time():
# 您可以将主要处理代码包裹在 `track_time` 中,以了解每篇文档的处理时间
nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
# 您还可以使用 stat_update 跟踪每篇文档的统计信息
self.stat_update("og_upper_letters", value=nr_uppercase_letters)
doc.text = doc.text.upper()
# 确保 yield 语句位于 track_time 块之外,否则会影响时间计算
yield doc
#
# 或者将数据保存到磁盘
#
with self.some_folder.open("myoutput", "wt") as f:
for doc in data:
f.write(doc...)
pipeline = [
...,
UppercaserBlock("somepath"),
...
]
您也可以继承自 BaseExtractor(位于 src/datatrove/pipeline/extractors/base.py)、BaseFilter(位于 src/datatrove/pipeline/filters/base_filter.py)、BaseReader/BaseDiskReader(位于 src/datatrove/pipeline/readers/base.py),或 DiskWriter(位于 src/datatrove/pipeline/writers/disk_base.py)。
贡献
git clone git@github.com:huggingface/datatrove.git && cd datatrove
pip install -e ".[dev]"
安装 pre-commit 代码风格钩子:
pre-commit install
运行代码风格检查:
# 快速本地循环(仅检查已更改的 Python 文件)
make quality
make style
# 整个仓库的检查(与 CI 相同的范围)
make quality-full
make style-full
运行测试:
pytest -sv ./tests/
引用
@misc{penedo2024datatrove,
author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
title = {DataTrove: large scale data processing},
year = {2024},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/huggingface/datatrove}
}
版本历史
v0.9.02026/03/04v0.8.02026/01/19v0.7.02026/01/19v0.6.02025/08/07v0.5.02025/05/01v0.4.02024/12/06v0.3.02024/08/28v0.2.02024/04/22v0.0.12024/02/07常见问题
相似工具推荐
everything-claude-code
everything-claude-code 是一套专为 AI 编程助手(如 Claude Code、Codex、Cursor 等)打造的高性能优化系统。它不仅仅是一组配置文件,而是一个经过长期实战打磨的完整框架,旨在解决 AI 代理在实际开发中面临的效率低下、记忆丢失、安全隐患及缺乏持续学习能力等核心痛点。 通过引入技能模块化、直觉增强、记忆持久化机制以及内置的安全扫描功能,everything-claude-code 能显著提升 AI 在复杂任务中的表现,帮助开发者构建更稳定、更智能的生产级 AI 代理。其独特的“研究优先”开发理念和针对 Token 消耗的优化策略,使得模型响应更快、成本更低,同时有效防御潜在的攻击向量。 这套工具特别适合软件开发者、AI 研究人员以及希望深度定制 AI 工作流的技术团队使用。无论您是在构建大型代码库,还是需要 AI 协助进行安全审计与自动化测试,everything-claude-code 都能提供强大的底层支持。作为一个曾荣获 Anthropic 黑客大奖的开源项目,它融合了多语言支持与丰富的实战钩子(hooks),让 AI 真正成长为懂上
NextChat
NextChat 是一款轻量且极速的 AI 助手,旨在为用户提供流畅、跨平台的大模型交互体验。它完美解决了用户在多设备间切换时难以保持对话连续性,以及面对众多 AI 模型不知如何统一管理的痛点。无论是日常办公、学习辅助还是创意激发,NextChat 都能让用户随时随地通过网页、iOS、Android、Windows、MacOS 或 Linux 端无缝接入智能服务。 这款工具非常适合普通用户、学生、职场人士以及需要私有化部署的企业团队使用。对于开发者而言,它也提供了便捷的自托管方案,支持一键部署到 Vercel 或 Zeabur 等平台。 NextChat 的核心亮点在于其广泛的模型兼容性,原生支持 Claude、DeepSeek、GPT-4 及 Gemini Pro 等主流大模型,让用户在一个界面即可自由切换不同 AI 能力。此外,它还率先支持 MCP(Model Context Protocol)协议,增强了上下文处理能力。针对企业用户,NextChat 提供专业版解决方案,具备品牌定制、细粒度权限控制、内部知识库整合及安全审计等功能,满足公司对数据隐私和个性化管理的高标准要求。
ML-For-Beginners
ML-For-Beginners 是由微软推出的一套系统化机器学习入门课程,旨在帮助零基础用户轻松掌握经典机器学习知识。这套课程将学习路径规划为 12 周,包含 26 节精炼课程和 52 道配套测验,内容涵盖从基础概念到实际应用的完整流程,有效解决了初学者面对庞大知识体系时无从下手、缺乏结构化指导的痛点。 无论是希望转型的开发者、需要补充算法背景的研究人员,还是对人工智能充满好奇的普通爱好者,都能从中受益。课程不仅提供了清晰的理论讲解,还强调动手实践,让用户在循序渐进中建立扎实的技能基础。其独特的亮点在于强大的多语言支持,通过自动化机制提供了包括简体中文在内的 50 多种语言版本,极大地降低了全球不同背景用户的学习门槛。此外,项目采用开源协作模式,社区活跃且内容持续更新,确保学习者能获取前沿且准确的技术资讯。如果你正寻找一条清晰、友好且专业的机器学习入门之路,ML-For-Beginners 将是理想的起点。
ragflow
RAGFlow 是一款领先的开源检索增强生成(RAG)引擎,旨在为大语言模型构建更精准、可靠的上下文层。它巧妙地将前沿的 RAG 技术与智能体(Agent)能力相结合,不仅支持从各类文档中高效提取知识,还能让模型基于这些知识进行逻辑推理和任务执行。 在大模型应用中,幻觉问题和知识滞后是常见痛点。RAGFlow 通过深度解析复杂文档结构(如表格、图表及混合排版),显著提升了信息检索的准确度,从而有效减少模型“胡编乱造”的现象,确保回答既有据可依又具备时效性。其内置的智能体机制更进一步,使系统不仅能回答问题,还能自主规划步骤解决复杂问题。 这款工具特别适合开发者、企业技术团队以及 AI 研究人员使用。无论是希望快速搭建私有知识库问答系统,还是致力于探索大模型在垂直领域落地的创新者,都能从中受益。RAGFlow 提供了可视化的工作流编排界面和灵活的 API 接口,既降低了非算法背景用户的上手门槛,也满足了专业开发者对系统深度定制的需求。作为基于 Apache 2.0 协议开源的项目,它正成为连接通用大模型与行业专有知识之间的重要桥梁。
PaddleOCR
PaddleOCR 是一款基于百度飞桨框架开发的高性能开源光学字符识别工具包。它的核心能力是将图片、PDF 等文档中的文字提取出来,转换成计算机可读取的结构化数据,让机器真正“看懂”图文内容。 面对海量纸质或电子文档,PaddleOCR 解决了人工录入效率低、数字化成本高的问题。尤其在人工智能领域,它扮演着连接图像与大型语言模型(LLM)的桥梁角色,能将视觉信息直接转化为文本输入,助力智能问答、文档分析等应用场景落地。 PaddleOCR 适合开发者、算法研究人员以及有文档自动化需求的普通用户。其技术优势十分明显:不仅支持全球 100 多种语言的识别,还能在 Windows、Linux、macOS 等多个系统上运行,并灵活适配 CPU、GPU、NPU 等各类硬件。作为一个轻量级且社区活跃的开源项目,PaddleOCR 既能满足快速集成的需求,也能支撑前沿的视觉语言研究,是处理文字识别任务的理想选择。
OpenHands
OpenHands 是一个专注于 AI 驱动开发的开源平台,旨在让智能体(Agent)像人类开发者一样理解、编写和调试代码。它解决了传统编程中重复性劳动多、环境配置复杂以及人机协作效率低等痛点,通过自动化流程显著提升开发速度。 无论是希望提升编码效率的软件工程师、探索智能体技术的研究人员,还是需要快速原型验证的技术团队,都能从中受益。OpenHands 提供了灵活多样的使用方式:既可以通过命令行(CLI)或本地图形界面在个人电脑上轻松上手,体验类似 Devin 的流畅交互;也能利用其强大的 Python SDK 自定义智能体逻辑,甚至在云端大规模部署上千个智能体并行工作。 其核心技术亮点在于模块化的软件智能体 SDK,这不仅构成了平台的引擎,还支持高度可组合的开发模式。此外,OpenHands 在 SWE-bench 基准测试中取得了 77.6% 的优异成绩,证明了其解决真实世界软件工程问题的能力。平台还具备完善的企业级功能,支持与 Slack、Jira 等工具集成,并提供细粒度的权限管理,适合从个人开发者到大型企业的各类用户场景。