catalyst

GitHub
3.4k 401 非常简单 1 次阅读 2天前Apache-2.0语言模型其他图像开发框架
AI 解读 由 AI 自动生成,仅供参考

Catalyst 是一个基于 PyTorch 构建的深度学习研发框架,旨在加速人工智能领域的探索与创新过程。它的核心使命是帮助开发者打破“重复编写训练循环”的枯燥循环,让团队能将宝贵精力集中在创造新模型和解决核心问题上,而非耗费在基础代码的堆砌中。

针对深度学习研究中常见的实验复现难、代码复用率低以及迭代速度慢等痛点,Catalyst 提供了一套高度结构化且灵活的解决方案。它通过标准化的流程设计,确保了实验结果的可复现性,并极大提升了快速验证想法的效率。无论是调整超参数还是尝试新的网络架构,用户都能以更少的代码量完成更复杂的实验配置。

这款工具特别适合 AI 研究人员、算法工程师以及需要频繁进行模型迭代的开发团队使用。其独特的技术亮点在于对 PyTorch 生态的深度集成与扩展,支持从简单的原型验证到大规模分布式训练的无缝切换。同时,Catalyst 拥有完善的文档体系和活跃的社区支持(包括 Telegram 和 Slack 频道),兼容 Linux、macOS 及 WSL 等多种操作系统,并提供了 Docker 镜像以简化环境部署。如果你希望在不牺牲灵活性的前提下提升研发效率,Catalyst 将是一个值得信赖的选择。

使用场景

某计算机视觉算法团队正在研发一套基于 PyTorch 的医疗影像病灶分割系统,需要在短时间内验证多种网络架构与训练策略的有效性。

没有 catalyst 时

  • 重复造轮子:每次尝试新模型(如从 U-Net 切换到 DeepLab),工程师都需手动重写繁琐的数据加载、训练循环及验证逻辑,耗费大量时间在样板代码上。
  • 实验难以复现:由于缺乏统一的实验管理标准,不同成员编写的训练脚本风格迥异,导致超参数调整记录混乱,难以精确回溯和对比历史实验结果。
  • 扩展成本高昂:想要引入新的评估指标(如 Dice 系数)或自定义回调函数时,往往需要深入修改核心训练流程,极易引入 Bug 且维护困难。
  • 协作效率低下:团队成员间无法直接复用彼此的代码模块,每个人都在编写功能相似但实现各异的“一次性”训练脚本。

使用 catalyst 后

  • 专注核心创新:catalyst 提供了标准化的 Runner 和 Trainer 抽象,团队只需定义模型和数据集,即可自动获得完整的训练流程,将精力完全集中在网络结构优化上。
  • 实验可追溯性强:借助内置的实验日志管理和配置系统,所有超参数、指标变化及模型版本均被自动记录,轻松实现实验结果的精准复现与横向对比。
  • 灵活插拔扩展:通过简单的配置即可集成自定义指标或回调机制,无需触碰底层训练循环,像搭积木一样快速构建复杂的实验流水线。
  • 代码高度复用:统一的框架规范使得团队成员可以无缝共享和组合各自的模块,显著提升了整体研发迭代速度。

catalyst 通过标准化深度学习研发流程,让团队从繁琐的工程实现中解放出来,真正实现了“只写新逻辑,不写训练循环”的高效研发模式。

运行环境要求

操作系统
  • Linux
  • macOS
  • Windows
  • WSL
GPU

未说明 (基于 PyTorch,支持 CPU/GPU)

内存

未说明

依赖
notes该工具是 PyTorch 的高级封装框架。基础安装仅需 catalyst,也可按需安装特定扩展(如 catalyst[ml] 或 catalyst[cv])。已在 Ubuntu 16.04/18.04/20.04, macOS 10.15, Windows 10 及 WSL 上通过测试。
python3.7+
torch>=1.4
catalyst hero image

快速开始

Catalyst logo

加速深度学习研发

CodeFactor Pipi版本 文档 Docker PyPI热度

Twitter Telegram Slack Github贡献者

codestyle docs catalyst integrations

python python python

os os os

Catalyst 是一个用于深度学习研究与开发的 PyTorch 框架。它专注于可复现性、快速实验以及代码库的重用,使您能够专注于创新,而不是重复编写训练循环。
打破循环——使用 Catalyst!

Catalyst 在 2021 年 PyTorch 生态系统日上的展示

Catalyst 海报

Catalyst 在 2021 年 PyTorch 开发者日上的展示

Catalyst 海报


快速入门

pip install -U catalyst
import os
from torch import nn, optim
from torch.utils.data import DataLoader
from catalyst import dl, utils
from catalyst.contrib.datasets import MNIST

model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.02)
loaders = {
    "train": DataLoader(MNIST(os.getcwd(), train=True), batch_size=32),
    "valid": DataLoader(MNIST(os.getcwd(), train=False), batch_size=32),
}

runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)

# 模型训练
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.AccuracyCallback(input_key="logits", target_key="targets", topk=(1, 3, 5)),
        dl.PrecisionRecallF1SupportCallback(input_key="logits", target_key="targets"),
    ],
    logdir="./logs",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

# 模型评估
metrics = runner.evaluate_loader(
    loader=loaders["valid"],
    callbacks=[dl.AccuracyCallback(input_key="logits", target_key="targets", topk=(1, 3, 5))],
)

# 模型推理
for prediction in runner.predict_loader(loader=loaders["valid"]):
    assert prediction["logits"].detach().cpu().numpy().shape[-1] == 10

# 模型后处理
model = runner.model.cpu()
batch = next(iter(loaders["valid"]))[0]
utils.trace_model(model=model, batch=batch)
utils.quantize_model(model=model)
utils.prune_model(model=model, pruning_fn="l1_unstructured", amount=0.8)
utils.onnx_export(model=model, batch=batch, file="./logs/mnist.onnx", verbose=True)

分步指南

  1. Catalyst — 一个用于加速深度学习研发的 PyTorch 框架 的介绍开始。
  2. 尝试 笔记本教程 或查看 最小示例 进行首次深入探索。
  3. 阅读包含用例和指南的 博客文章
  4. 参加我们的 "使用 Catalyst 进行深度学习" 课程 学习机器学习知识。
  5. 最后,如果您想与团队及贡献者交流,可以 加入我们的 Slack

目录

概述

Catalyst 帮助您仅用几行代码即可实现紧凑但功能齐全的深度学习流水线。您无需编写大量样板代码,即可获得包含指标、早停、模型检查点等功能的训练循环。

安装

通用安装:

pip install -U catalyst
专用版本,可能需要额外依赖

pip install catalyst[ml]         # 安装基于机器学习的 Catalyst
pip install catalyst[cv]         # 安装基于计算机视觉的 Catalyst
# 安装主分支版本
pip install git+https://github.com/catalyst-team/catalyst@master --upgrade
# 所有可用扩展在此列出:
# https://github.com/catalyst-team/catalyst/blob/master/setup.py

Catalyst 兼容:Python 3.7+、PyTorch 1.4+。
已在 Ubuntu 16.04/18.04/20.04、macOS 10.15、Windows 10 以及 Windows Subsystem for Linux 上进行过测试。

文档

最小示例

CustomRunner – PyTorch 循环分解

import os
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader
from catalyst import dl, metrics
from catalyst.contrib.datasets import MNIST

model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
optimizer = optim.Adam(model.parameters(), lr=0.02)

train_data = MNIST(os.getcwd(), train=True)
valid_data = MNIST(os.getcwd(), train=False)
loaders = {
    "train": DataLoader(train_data, batch_size=32),
    "valid": DataLoader(valid_data, batch_size=32),
}

class CustomRunner(dl.Runner):
    def predict_batch(self, batch):
        # 模型推理步骤
        return self.model(batch[0].to(self.engine.device))

    def on_loader_start(self, runner):
        super().on_loader_start(runner)
        self.meters = {
            key: metrics.AdditiveMetric(compute_on_call=False)
            for key in ["loss", "accuracy01", "accuracy03"]
        }

    def handle_batch(self, batch):
        # 模型训练/验证步骤
        # 解包批次数据
        x, y = batch
        # 执行模型前向传播
        logits = self.model(x)
        # 计算损失
        loss = F.cross_entropy(logits, y)
        # 计算指标
        accuracy01、accuracy03 = metrics.accuracy(logits, y, topk=(1, 3))
        # 记录指标
        self.batch_metrics.update(
            {"loss": loss, "accuracy01": accuracy01, "accuracy03": accuracy03}
        )
        for key in ["loss", "accuracy01", "accuracy03"]:
            self.meters[key].update(self.batch_metrics[key].item(), self.batch_size)
        # 执行模型反向传播
        if self.is_train_loader:
            self.engine.backward(loss)
            self.optimizer.step()
            self.optimizer.zero_grad()

    def on_loader_end(self, runner):
        for key in ["loss", "accuracy01", "accuracy03"]:
            self.loader_metrics[key] = self.meters[key].compute()[0]
        super().on_loader_end(runner)

runner = CustomRunner()

# 模型训练
runner.train(
    model=model,
    optimizer=optimizer,
    loaders=loaders,
    logdir="./logs",
    num_epochs=5,
    verbose=True,
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
)
# 模型推理
for logits in runner.predict_loader(loader=loaders["valid"]):
    assert logits.detach().cpu().numpy().shape[-1] == 10

机器学习 - 线性回归

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# 数据
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# 模型、损失函数、优化器、学习率调度器
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# 模型训练
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    num_epochs=8,
    verbose=True,
)

机器学习 - 多分类

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# 样本数据
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# PyTorch 数据加载器
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# 模型、损失函数、优化器、学习率调度器
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# 模型训练
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(input_key="logits", target_key="targets", num_classes=num_classes),
        # 如需额外指标,可取消注释:
        # dl.PrecisionRecallF1SupportCallback(
        #     input_key="logits", target_key="targets", num_classes=num_classes
        # ),
        # dl.AUCCallback(input_key="logits", target_key="targets"),
        # catalyst[ml] 需要 ``pip install catalyst[ml]``
        # dl.ConfusionMatrixCallback(
        #     input_key="logits", target_key="targets", num_classes=num_classes
        # ),
    ],
)

机器学习 - 多标签分类

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# 样本数据
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples, num_classes) > 0.5).to(torch.float32)

# PyTorch 数据加载器
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# 模型、损失函数、优化器、学习率调度器
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# 模型训练
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy01",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        // 如需额外指标,可取消注释:
        // dl.MultilabelAccuracyCallback(input_key="scores", target_key="targets", threshold=0.5),
        // dl.MultilabelPrecisionRecallF1SupportCallback(
        //     input_key="scores", target_key="targets", threshold=0.5
        // ),
    ]
)

机器学习 - 多头分类

import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# 样本数据
num_samples, num_features, num_classes1, num_classes2 = int(1e4), int(1e1), 4, 10
X = torch.rand(num_samples, num_features)
y1 = (torch.rand(num_samples,) * num_classes1).to(torch.int64)
y2 = (torch.rand(num_samples,) * num_classes2).to(torch.int64)

# PyTorch 数据加载器
dataset = TensorDataset(X, y1, y2)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

class CustomModule(nn.Module):
    def __init__(self, in_features: int, out_features1: int, out_features2: int):
        super().__init__()
        self.shared = nn.Linear(in_features, 128)
        self.head1 = nn.Linear(128, out_features1)
        self.head2 = nn.Linear(128, out_features2)

    def forward(self, x):
        x = self.shared(x)
        y1 = self.head1(x)
        y2 = self.head2(x)
        return y1, y2

# 模型、损失函数、优化器、学习率调度器
model = CustomModule(num_features, num_classes1, num_classes2)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, [2])

class CustomRunner(dl.Runner):
    def handle_batch(self, batch):
        x, y1, y2 = batch
        y1_hat, y2_hat = self.model(x)
        self.batch = {
            "features": x,
            "logits1": y1_hat,
            "logits2": y2_hat,
            "targets1": y1,
            "targets2": y2,
        }

# 模型训练
runner = CustomRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.CriterionCallback(metric_key="loss1", input_key="logits1", target_key="targets1"),
        dl.CriterionCallback(metric_key="loss2", input_key="logits2", target_key="targets2"),
        dl.MetricAggregationCallback(metric_key="loss", metrics=["loss1", "loss2"], mode="mean"),
        dl.BackwardCallback(metric_key="loss"),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.AccuracyCallback(
            input_key="logits1", target_key="targets1", num_classes=num_classes1, prefix="one_"
        ),
        dl.AccuracyCallback(
            input_key="logits2", target_key="targets2", num_classes=num_classes2, prefix="two_"
        ),
        # catalyst[ml] 需要 ``pip install catalyst[ml]``
        # dl.ConfusionMatrixCallback(
        #     input_key="logits1", target_key="targets1", num_classes=num_classes1, prefix="one_cm"
        # ),
        # dl.ConfusionMatrixCallback(
        #     input_key="logits2", target_key="targets2", num_classes=num_classes2, prefix="two_cm"
        # ),
        dl.CheckpointCallback(
            logdir="./logs/one",
            loader_key="valid", metric_key="one_accuracy01", minimize=False, topk=1
        ),
        dl.CheckpointCallback(
            logdir="./logs/two",
            loader_key="valid", metric_key="two_accuracy03", minimize=False, topk=3
        ),
    ],
    loggers={"console": dl.ConsoleLogger(), "tb": dl.TensorboardLogger("./logs/tb")},
)

ML – 推荐系统

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# 样本数据
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# PyTorch 数据加载器
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# 模型、损失函数、优化器、学习率调度器
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# 模型训练
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(input_key="logits", target_key="targets", metric_key="loss"),
        # 如需额外指标,请取消注释:
        # dl.AUCCallback(input_key="scores", target_key="targets"),
        # dl.HitrateCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        # dl.MRRCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        # dl.MAPCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        # dl.NDCGCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.BackwardCallback(metric_key="loss"),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

CV - MNIST 分类

import os
from torch import nn, optim
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.contrib.datasets import MNIST

model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.02)

train_data = MNIST(os.getcwd(), train=True)
valid_data = MNIST(os.getcwd(), train=False)
loaders = {
    "train": DataLoader(train_data, batch_size=32),
    "valid": DataLoader(valid_data, batch_size=32),
}

runner = dl.SupervisedRunner()
# 模型训练
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    logdir="./logs",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
# 如需额外指标,请取消注释:
#     callbacks=[
#         dl.AccuracyCallback(input_key="logits", target_key="targets", num_classes=10),
#         dl.PrecisionRecallF1SupportCallback(
#             input_key="logits", target_key="targets", num_classes=10
#         ),
#         dl.AUCCallback(input_key="logits", target_key="targets"),
#         // catalyst[ml] 需要 ``pip install catalyst[ml]``
#         dl.ConfusionMatrixCallback(
#             input_key="logits", target_key="targets", num_classes=num_classes
#         ),
#     ]
)

CV - MNIST 分割

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.losses import IoULoss


model = nn.Sequential(
    nn.Conv2d(1, 1, 3, 1, 1), nn.ReLU(),
    nn.Conv2d(1, 1, 3, 1, 1), nn.Sigmoid(),
)
criterion = IoULoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

train_data = MNIST(os.getcwd(), train=True)
valid_data = MNIST(os.getcwd(), train=False)
loaders = {
    "train": DataLoader(train_data, batch_size=32),
    "valid": DataLoader(valid_data, batch_size=32),
}

class CustomRunner(dl.SupervisedRunner):
    def handle_batch(self, batch):
        x = batch[self._input_key]
        x_noise = (x + torch.rand_like(x)).clamp_(0, 1)
        x_ = self.model(x_noise)
        self.batch = {self._input_key: x, self._output_key: x_, self._target_key: x}

runner = CustomRunner(
    input_key="features", output_key="scores", target_key="targets", loss_key="loss"
)

# 模型训练
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.IOUCallback(input_key="scores", target_key="targets"),
        dl.DiceCallback(input_key="scores", target_key="targets"),
        dl.TrevskyCallback(input_key="scores", target_key="targets", alpha=0.2),
    ],
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

CV - MNIST 度量学习

import os
from torch.optim import Adam
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.contrib.data import HardTripletsSampler
from catalyst.contrib.datasets import MnistMLDataset, MnistQGDataset
from catalyst.contrib.losses import TripletMarginLossWithSampler
from catalyst.contrib.models import MnistSimpleNet
from catalyst.data.sampler import BatchBalanceClassSampler


# 1. 训练和验证数据加载器
train_dataset = MnistMLDataset(root=os.getcwd())
sampler = BatchBalanceClassSampler(
    labels=train_dataset.get_labels(), num_classes=5, num_samples=10, num_batches=10
)
train_loader = DataLoader(dataset=train_dataset, batch_sampler=sampler)

valid_dataset = MnistQGDataset(root=os.getcwd(), gallery_fraq=0.2)
valid_loader = DataLoader(dataset=valid_dataset,batch_size=1024)

# 2. 模型和优化器
model = MnistSimpleNet(out_features=16)
optimizer = Adam(model.parameters(), lr=0.001)

# 3. 带三元组采样的损失函数
sampler_inbatch = HardTripletsSampler(norm_required=False)
criterion = TripletMarginLossWithSampler(margin=0.5, sampler_inbatch=sampler_inbatch)

# 4. 使用 Catalyst Runner 进行训练
class CustomRunner(dl.SupervisedRunner):
    def handle_batch(self, batch) -> None:
        if self.is_train_loader:
            images, targets = batch["features"].float(), batch["targets"].long()
            features = self.model(images)
            self.batch = {"embeddings": features, "targets": targets,}
        else:
            images, targets, is_query = \
                batch["features"].float(), batch["targets"].long(), batch["is_query"].bool()
            features = self.model(images)
            self.batch = {"embeddings": features, "targets": targets, "is_query": is_query}

callbacks = [
    dl.ControlFlowCallbackWrapper(
        dl.CriterionCallback(input_key="embeddings", target_key="targets", metric_key="loss"),
        loaders="train",
    ),
    dl.ControlFlowCallbackWrapper(
        dl.CMCScoreCallback(
            embeddings_key="embeddings",
            labels_key="targets",
            is_query_key="is_query",
            topk=[1],
        ),
        loaders="valid",
    ),
    dl.PeriodicLoaderCallback(
        valid_loader_key="valid", valid_metric_key="cmc01", minimize=False, valid=2
    ),
]

runner = CustomRunner(input_key="features", output_key="embeddings")
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    callbacks=callbacks,
    loaders={"train": train_loader, "valid": valid_loader},
    verbose=False,
    logdir="./logs",
    valid_loader="valid",
    valid_metric="cmc01",
    minimize_valid_metric=False,
    num_epochs=10,
)

CV - MNIST GAN

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.layers import GlobalMaxPool2d, Lambda

latent_dim = 128
generator = nn.Sequential(
    // 我们希望生成128个系数,以便重塑为7x7x128的特征图
    nn.Linear(128, 128 * 7 * 7),
    nn.LeakyReLU(0.2, inplace=True),
    Lambda(lambda x: x.view(x.size(0), 128, 7, 7)),
    nn.ConvTranspose2d(128, 128, (4, 4), stride=(2, 2), padding=1),
    nn.LeakyReLU(0.2, inplace=True),
    nn.ConvTranspose2d(128, 128, (4, 4), stride=(2, 2), padding=1),
    nn.LeakyReLU(0.2, inplace=True),
    nn.Conv2d(128, 1, (7, 7), padding=3),
    nn.Sigmoid(),
)
discriminator = nn.Sequential(
    nn.Conv2d(1, 64, (3, 3), stride=(2, 2), padding=1),
    nn.LeakyReLU(0.2, inplace=True),
    nn.Conv2d(64, 128, (3, 3), stride=(2, 2), padding=1),
    nn.LeakyReLU(0.2, inplace=True),
    GlobalMaxPool2d(),
    nn.Flatten(),
    nn.Linear(128, 1),
)

model = nn.ModuleDict({"generator": generator, "discriminator": discriminator})
criterion = {"generator": nn.BCEWithLogitsLoss(), "discriminator": nn.BCEWithLogitsLoss()}
optimizer = {
    "generator": torch.optim.Adam(generator.parameters(), lr=0.0003, betas=(0.5, 0.999)),
    "discriminator": torch.optim.Adam(discriminator.parameters(), lr=0.0003, betas=(0.5, 0.999)),
}
train_data = MNIST(os.getcwd(), train=False)
loaders = {"train": DataLoader(train_data, batch_size=32)}

class CustomRunner(dl.Runner):
    def predict_batch(self, batch):
        batch_size = 1
        // 在潜在空间中随机采样点
        random_latent_vectors = torch.randn(batch_size, latent_dim).to(self.engine.device)
        // 将其解码为假图像
        generated_images = self.model["generator"](random_latent_vectors).detach()
        return generated_images

    def handle_batch(self, batch):
        real_images, _ = batch
        batch_size = real_images.shape[0]

        // 在潜在空间中随机采样点
        random_latent_vectors = torch.randn(batch_size, latent_dim).to(self.engine.device)

        // 将其解码为假图像
        generated_images = self.model["generator"](random_latent_vectors).detach()
        // 将其与真实图像结合
        combined_images = torch.cat([generated_images, real_images])

        // 组装用于区分真假图像的标签
        labels = \
            torch.cat([torch.ones((batch_size, 1)), torch.zeros((batch_size, 1))]).to(self.engine.device)
        // 向标签中添加随机噪声——这是一个重要的技巧!
        labels += 0.05 * torch.rand(labels.shape).to(self.engine.device)

        // 判别器前向传播
        combined_predictions = self.model["discriminator"](combined_images)

        // 再次在潜在空间中随机采样点
        random_latent_vectors = torch.randn(batch_size, latent_dim).to(self.engine.device)
        // 组装表示“所有图像均为真”的标签
        misleading_labels = torch.zeros((batch_size, 1)).to(self.engine.device)

        // 生成器前向传播
        generated_images = self.model["generator"](random_latent_vectors)
        generated_predictions = self.model["discriminator"](generated_images)

        self.batch = {
            "combined_predictions": combined_predictions,
            "labels": labels,
            "generated_predictions": generated_predictions,
            "misleading_labels": misleading_labels,
        }


runner = CustomRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    callbacks=[
        dl.CriterionCallback(
            input_key="combined_predictions",
            target_key="labels",
            metric_key="loss_discriminator",
            criterion_key="discriminator",
        ),
        dl.BackwardCallback(metric_key="loss_discriminator"),
        dl.OptimizerCallback(
            optimizer_key="discriminator",
            metric_key="loss_discriminator",
        ),
        dl.CriterionCallback(
            input_key="generated_predictions",
            target_key="misleading_labels",
            metric_key="loss_generator",
            criterion_key="generator",
        ),
        dl.BackwardCallback(metric_key="loss_generator"),
        dl.OptimizerCallback(
            optimizer_key="generator",
            metric_key="loss_generator",
        ),
    ],
    valid_loader="train",
    valid_metric="loss_generator",
    minimize_valid_metric=True,
    num_epochs=20,
    verbose=True,
    logdir="./logs_gan",
)

// 可视化(需要 matplotlib):
// import matplotlib.pyplot as plt
// %matplotlib inline

# plt.imshow(runner.predict_batch(None)[0, 0].cpu().numpy())

计算机视觉 - MNIST 变分自编码器

import os
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader
from catalyst import dl, metrics
from catalyst.contrib.datasets import MNIST

LOG_SCALE_MAX = 2
LOG_SCALE_MIN = -10

def normal_sample(loc, log_scale):
    scale = torch.exp(0.5 * log_scale)
    return loc + scale * torch.randn_like(scale)

class VAE(nn.Module):
    def __init__(self, in_features, hid_features):
        super().__init__()
        self.hid_features = hid_features
        self.encoder = nn.Linear(in_features, hid_features * 2)
        self.decoder = nn.Sequential(nn.Linear(hid_features, in_features), nn.Sigmoid())

    def forward(self, x, deterministic=False):
        z = self.encoder(x)
        bs, z_dim = z.shape

        loc, log_scale = z[:, : z_dim // 2], z[:, z_dim // 2 :]
        log_scale = torch.clamp(log_scale, LOG_SCALE_MIN, LOG_SCALE_MAX)

        z_ = loc if deterministic else normal_sample(loc, log_scale)
        z_ = z_.view(bs, -1)
        x_ = self.decoder(z_)

        return x_, loc, log_scale

class CustomRunner(dl.IRunner):
    def __init__(self, hid_features, logdir, engine):
        super().__init__()
        self.hid_features = hid_features
        self._logdir = logdir
        self._engine = engine

    def get_engine(self):
        return self._engine

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def num_epochs(self) -> int:
        return 1

    def get_loaders(self):
        loaders = {
            "train": DataLoader(MNIST(os.getcwd(), train=False), batch_size=32),
            "valid": DataLoader(MNIST(os.getcwd(), train=False), batch_size=32),
        }
        return loaders

    def get_model(self):
        model = self.model if self.model is not None else VAE(28 * 28, self.hid_features)
        return model

    def get_optimizer(self, model):
        return optim.Adam(model.parameters(), lr=0.02)

    def get_callbacks(self):
        return {
            "backward": dl.BackwardCallback(metric_key="loss"),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="loss",
                minimize=True,
                topk=3,
            ),
        }

    def on_loader_start(self, runner):
        super().on_loader_start(runner)
        self.meters = {
            key: metrics.AdditiveMetric(compute_on_call=False)
            for key in ["loss_ae", "loss_kld", "loss"]
        }

    def handle_batch(self, batch):
        x, _ = batch
        x = x.view(x.size(0), -1)
        x_, loc, log_scale = self.model(x, deterministic=not self.is_train_loader)

        loss_ae = F.mse_loss(x_, x)
        loss_kld = (
            -0.5 * torch.sum(1 + log_scale - loc.pow(2) - log_scale.exp(), dim=1)
        ).mean()
        loss = loss_ae + loss_kld * 0.01

        self.batch_metrics = {"loss_ae": loss_ae, "loss_kld": loss_kld, "loss": loss}
        for key in ["loss_ae", "loss_kld", "loss"]:
            self.meters[key].update(self.batch_metrics[key].item(), self.batch_size)

    def on_loader_end(self, runner):
        for key in ["loss_ae", "loss_kld", "loss"]:
            self.loader_metrics[key] = self.meters[key].compute()[0]
        super().on_loader_end(runner)

    def predict_batch(self, batch):
        random_latent_vectors = torch.randn(1, self.hid_features).to(self.engine.device)
        generated_images = self.model.decoder(random_latent_vectors).detach()
        return generated_images

runner = CustomRunner(128, "./logs", dl.CPUEngine())
runner.run()
# 可视化(需要 matplotlib):
# import matplotlib.pyplot as plt
# %matplotlib inline

# plt.imshow(runner.predict_batch(None)[0].cpu().numpy().reshape(28, 28))

AutoML - hyperparameters optimization with Optuna

import os
import optuna
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.contrib.datasets import MNIST


def objective(trial):
    lr = trial.suggest_loguniform("lr", 1e-3, 1e-1)
    num_hidden = int(trial.suggest_loguniform("num_hidden", 32, 128))

    train_data = MNIST(os.getcwd(), train=True)
    valid_data = MNIST(os.getcwd(), train=False)
    loaders = {
        "train": DataLoader(train_data, batch_size=32),
        "valid": DataLoader(valid_data, batch_size=32),
    }
    model = nn.Sequential(
        nn.Flatten(), nn.Linear(784, num_hidden), nn.ReLU(), nn.Linear(num_hidden, 10)
    )
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    runner = dl.SupervisedRunner(input_key="features", output_key="logits", target_key="targets")
    runner.train(
        model=model,
        criterion=criterion,
        optimizer=optimizer,
        loaders=loaders,
        callbacks={
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", num_classes=10
            ),
            # catalyst[optuna] required ``pip install catalyst[optuna]``
            "optuna": dl.OptunaPruningCallback(
                loader_key="valid", metric_key="accuracy01", minimize=False, trial=trial
            ),
        },
        num_epochs=3,
    )
    score = trial.best_score
    return score

study = optuna.create_study(
    direction="maximize",
    pruner=optuna.pruners.MedianPruner(
        n_startup_trials=1, n_warmup_steps=0, interval_steps=1
    ),
)
study.optimize(objective, n_trials=3, timeout=300)
print(study.best_value, study.best_params)

Config API - minimal example

runner:
  _target_: catalyst.runners.SupervisedRunner
  model:
    _var_: model
    _target_: torch.nn.Sequential
    args:
      - _target_: torch.nn.Flatten
      - _target_: torch.nn.Linear
        in_features: 784  # 28 * 28
        out_features: 10
  input_key: features
  output_key: &output_key logits
  target_key: &target_key targets
  loss_key: &loss_key loss

run:
  # ≈ stage 1
  - _call_: train  # runner.train(...)

    criterion:
      _target_: torch.nn.CrossEntropyLoss

    optimizer:
      _target_: torch.optim.Adam
      params:  # model.parameters()
        _var_: model.parameters
      lr: 0.02

    loaders:
      train:
        _target_: torch.utils.data.DataLoader
        dataset:
          _target_: catalyst.contrib.datasets.MNIST
          root: data
          train: y
        batch_size: 32

      &valid_loader_key valid:
        &valid_loader
        _target_: torch.utils.data.DataLoader
        dataset:
          _target_: catalyst.contrib.datasets.MNIST
          root: data
          train: n
        batch_size: 32

    callbacks:
      - &accuracy_metric
        _target_: catalyst.callbacks.AccuracyCallback
        input_key: *output_key
        target_key: *target_key
        topk: [1,3,5]
      - _target_: catalyst.callbacks.PrecisionRecallF1SupportCallback
        input_key: *output_key
        target_key: *target_key

    num_epochs: 1
    logdir: logs
    valid_loader: *valid_loader_key
    valid_metric: *loss_key
    minimize_valid_metric: y
    verbose: y

  # ≈ stage 2
  - _call_: evaluate_loader  # runner.evaluate_loader(...)
    loader: *valid_loader
    callbacks:
      - *accuracy_metric
catalyst-run --config example.yaml

Tests

All Catalyst code, features, and pipelines are fully tested. We also have our own catalyst-codestyle and a corresponding pre-commit hook. During testing, we train a variety of different models: image classification, image segmentation, text classification, GANs, and much more. We then compare their convergence metrics in order to verify the correctness of the training procedure and its reproducibility. As a result, Catalyst provides fully tested and reproducible best practices for your deep learning research and development.

Blog Posts

Talks

Community

由 Catalyst 加速

研究论文

博客文章

竞赛

工具包

其他

更多项目请参阅 GitHub依赖关系图

如果您的项目实现了某篇论文、具有显著的应用案例或教程、或是Kaggle竞赛的解决方案,或者您的代码仅仅展示了有趣的结果并使用了Catalyst,我们非常乐意将您的项目添加到上述列表中!请随时向我们提交一个包含类似上述简要描述的PR。

贡献指南

我们感谢所有贡献。如果您计划贡献修复bug的代码,无需事先与我们沟通,直接提交PR即可。如果您计划贡献新功能、新的实用函数或扩展,请先创建一个问题,并与我们讨论。

用户反馈

我们创建了 feedback@catalyst-team.com 作为用户反馈的额外渠道。

  • 如果您喜欢这个项目并想感谢我们,这里就是合适的地方。
  • 如果您希望与 Catalyst 团队合作以提升深度学习研发水平,我们随时欢迎。
  • 如果您不喜欢使用 GitHub Issues 而更倾向于通过电子邮件沟通,请随时给我们发送邮件。
  • 最后,如果您对某些方面不满意,请务必告诉我们,我们将一起探讨改进方案。

我们非常感谢任何形式的反馈。谢谢!

致谢

自 Catalyst 开发之初以来,许多人都以各种方式对其产生了重要影响。

Catalyst 团队

Catalyst 贡献者

受信任的机构

引用

如果您希望在您的出版物中引用本仓库,请使用以下 BibTeX 格式:

@misc{catalyst,
    author = {Kolesnikov, Sergey},
    title = {Catalyst - 加速深度学习研发},
    year = {2018},
    publisher = {GitHub},
    journal = {GitHub 代码库},
    howpublished = {\url{https://github.com/catalyst-team/catalyst}},
}

版本历史

v22.042022/04/29
v22.02.12022/02/27
v22.022022/02/13
v22.02rc02022/02/07
v21.122021/12/28
v21.112021/11/30
v21.102021/10/30
v21.092021/09/30
v21.09rc12021/09/27
v21.09rc02021/09/27
v21.082021/08/31
v21.072021/07/29
v21.062021/06/29
v21.052021/05/31
v21.04.22021/04/30
v21.04.12021/04/19
v21.042021/04/17
v21.03.22021/03/29
v21.03.12021/03/28
v21.032021/03/13

常见问题

相似工具推荐

stable-diffusion-webui

stable-diffusion-webui 是一个基于 Gradio 构建的网页版操作界面,旨在让用户能够轻松地在本地运行和使用强大的 Stable Diffusion 图像生成模型。它解决了原始模型依赖命令行、操作门槛高且功能分散的痛点,将复杂的 AI 绘图流程整合进一个直观易用的图形化平台。 无论是希望快速上手的普通创作者、需要精细控制画面细节的设计师,还是想要深入探索模型潜力的开发者与研究人员,都能从中获益。其核心亮点在于极高的功能丰富度:不仅支持文生图、图生图、局部重绘(Inpainting)和外绘(Outpainting)等基础模式,还独创了注意力机制调整、提示词矩阵、负向提示词以及“高清修复”等高级功能。此外,它内置了 GFPGAN 和 CodeFormer 等人脸修复工具,支持多种神经网络放大算法,并允许用户通过插件系统无限扩展能力。即使是显存有限的设备,stable-diffusion-webui 也提供了相应的优化选项,让高质量的 AI 艺术创作变得触手可及。

162.1k|★★★☆☆|今天
开发框架图像Agent

everything-claude-code

everything-claude-code 是一套专为 AI 编程助手(如 Claude Code、Codex、Cursor 等)打造的高性能优化系统。它不仅仅是一组配置文件,而是一个经过长期实战打磨的完整框架,旨在解决 AI 代理在实际开发中面临的效率低下、记忆丢失、安全隐患及缺乏持续学习能力等核心痛点。 通过引入技能模块化、直觉增强、记忆持久化机制以及内置的安全扫描功能,everything-claude-code 能显著提升 AI 在复杂任务中的表现,帮助开发者构建更稳定、更智能的生产级 AI 代理。其独特的“研究优先”开发理念和针对 Token 消耗的优化策略,使得模型响应更快、成本更低,同时有效防御潜在的攻击向量。 这套工具特别适合软件开发者、AI 研究人员以及希望深度定制 AI 工作流的技术团队使用。无论您是在构建大型代码库,还是需要 AI 协助进行安全审计与自动化测试,everything-claude-code 都能提供强大的底层支持。作为一个曾荣获 Anthropic 黑客大奖的开源项目,它融合了多语言支持与丰富的实战钩子(hooks),让 AI 真正成长为懂上

139k|★★☆☆☆|今天
开发框架Agent语言模型

ComfyUI

ComfyUI 是一款功能强大且高度模块化的视觉 AI 引擎,专为设计和执行复杂的 Stable Diffusion 图像生成流程而打造。它摒弃了传统的代码编写模式,采用直观的节点式流程图界面,让用户通过连接不同的功能模块即可构建个性化的生成管线。 这一设计巧妙解决了高级 AI 绘图工作流配置复杂、灵活性不足的痛点。用户无需具备编程背景,也能自由组合模型、调整参数并实时预览效果,轻松实现从基础文生图到多步骤高清修复等各类复杂任务。ComfyUI 拥有极佳的兼容性,不仅支持 Windows、macOS 和 Linux 全平台,还广泛适配 NVIDIA、AMD、Intel 及苹果 Silicon 等多种硬件架构,并率先支持 SDXL、Flux、SD3 等前沿模型。 无论是希望深入探索算法潜力的研究人员和开发者,还是追求极致创作自由度的设计师与资深 AI 绘画爱好者,ComfyUI 都能提供强大的支持。其独特的模块化架构允许社区不断扩展新功能,使其成为当前最灵活、生态最丰富的开源扩散模型工具之一,帮助用户将创意高效转化为现实。

107.7k|★★☆☆☆|2天前
开发框架图像Agent

NextChat

NextChat 是一款轻量且极速的 AI 助手,旨在为用户提供流畅、跨平台的大模型交互体验。它完美解决了用户在多设备间切换时难以保持对话连续性,以及面对众多 AI 模型不知如何统一管理的痛点。无论是日常办公、学习辅助还是创意激发,NextChat 都能让用户随时随地通过网页、iOS、Android、Windows、MacOS 或 Linux 端无缝接入智能服务。 这款工具非常适合普通用户、学生、职场人士以及需要私有化部署的企业团队使用。对于开发者而言,它也提供了便捷的自托管方案,支持一键部署到 Vercel 或 Zeabur 等平台。 NextChat 的核心亮点在于其广泛的模型兼容性,原生支持 Claude、DeepSeek、GPT-4 及 Gemini Pro 等主流大模型,让用户在一个界面即可自由切换不同 AI 能力。此外,它还率先支持 MCP(Model Context Protocol)协议,增强了上下文处理能力。针对企业用户,NextChat 提供专业版解决方案,具备品牌定制、细粒度权限控制、内部知识库整合及安全审计等功能,满足公司对数据隐私和个性化管理的高标准要求。

87.6k|★★☆☆☆|今天
开发框架语言模型

ML-For-Beginners

ML-For-Beginners 是由微软推出的一套系统化机器学习入门课程,旨在帮助零基础用户轻松掌握经典机器学习知识。这套课程将学习路径规划为 12 周,包含 26 节精炼课程和 52 道配套测验,内容涵盖从基础概念到实际应用的完整流程,有效解决了初学者面对庞大知识体系时无从下手、缺乏结构化指导的痛点。 无论是希望转型的开发者、需要补充算法背景的研究人员,还是对人工智能充满好奇的普通爱好者,都能从中受益。课程不仅提供了清晰的理论讲解,还强调动手实践,让用户在循序渐进中建立扎实的技能基础。其独特的亮点在于强大的多语言支持,通过自动化机制提供了包括简体中文在内的 50 多种语言版本,极大地降低了全球不同背景用户的学习门槛。此外,项目采用开源协作模式,社区活跃且内容持续更新,确保学习者能获取前沿且准确的技术资讯。如果你正寻找一条清晰、友好且专业的机器学习入门之路,ML-For-Beginners 将是理想的起点。

85k|★★☆☆☆|今天
图像数据工具视频

ragflow

RAGFlow 是一款领先的开源检索增强生成(RAG)引擎,旨在为大语言模型构建更精准、可靠的上下文层。它巧妙地将前沿的 RAG 技术与智能体(Agent)能力相结合,不仅支持从各类文档中高效提取知识,还能让模型基于这些知识进行逻辑推理和任务执行。 在大模型应用中,幻觉问题和知识滞后是常见痛点。RAGFlow 通过深度解析复杂文档结构(如表格、图表及混合排版),显著提升了信息检索的准确度,从而有效减少模型“胡编乱造”的现象,确保回答既有据可依又具备时效性。其内置的智能体机制更进一步,使系统不仅能回答问题,还能自主规划步骤解决复杂问题。 这款工具特别适合开发者、企业技术团队以及 AI 研究人员使用。无论是希望快速搭建私有知识库问答系统,还是致力于探索大模型在垂直领域落地的创新者,都能从中受益。RAGFlow 提供了可视化的工作流编排界面和灵活的 API 接口,既降低了非算法背景用户的上手门槛,也满足了专业开发者对系统深度定制的需求。作为基于 Apache 2.0 协议开源的项目,它正成为连接通用大模型与行业专有知识之间的重要桥梁。

77.1k|★★★☆☆|昨天
Agent图像开发框架