service-streamer
Service Streamer 是一款专为深度学习 Web 服务设计的中间件,旨在显著提升模型在线推理的性能。在传统的 Web 服务架构中,用户请求通常是离散且逐个到达的,这导致拥有强大并行计算能力的 GPU 在处理单个请求时处于闲置状态,不仅资源利用率低,而且在高并发场景下延迟会线性增加。
Service Streamer 巧妙地解决了这一痛点。它通过在服务端引入队列机制,将来自不同用户的离散请求自动收集并组装成“小批量”(mini-batch)数据,再统一送入深度学习模型进行计算。这种机制充分利用了 GPU 的并行处理优势,在不改变原有模型逻辑的前提下,大幅提高了吞吐量并降低了响应延迟。官方测试显示,仅需对代码进行微调,即可使模型处理速度提升十倍。
该工具非常适合需要将 NLP、图像识别等深度学习模型部署为在线服务的开发者和研究人员。其核心亮点在于极高的易用性与兼容性:它与任何 Web 框架(如 Flask、Django)及深度学习框架均能无缝协作,支持快速集成;同时具备良好的扩展性,能轻松适配多 GPU 环境以应对海量请求。如果你希望以最小成本优化现有 AI 服务的效率,Service Streamer 是一个值得尝试的专业方案。
使用场景
某电商平台的智能客服团队正在部署基于 BERT 模型的自动回复系统,需实时处理成千上万条用户咨询文本以生成精准回答。
没有 service-streamer 时
- GPU 资源严重浪费:用户请求是离散到达的,传统服务器逐个处理请求,导致昂贵的 GPU 大部分时间处于空闲等待状态。
- 高并发下延迟激增:当促销活动期间流量洪峰到来,排队请求线性增加,用户收到回复的等待时间从毫秒级飙升至数秒。
- 吞吐量遭遇瓶颈:单线程或简单多线程模式无法利用深度学习模型的批处理(Mini-batch)特性,系统每秒仅能处理极少量句子。
- 扩容成本高昂:为了维持响应速度,不得不堆砌更多服务器实例,导致硬件和运维成本大幅上升。
使用 service-streamer 后
- GPU 利用率最大化:service-streamer 作为中间件,自动将离散的用户请求动态聚合成 Mini-batch,让 GPU 始终满负荷并行计算。
- 低延迟稳定响应:即使在数万并发请求下,通过高效的队列采样机制,系统仍能保持毫秒级的在线推理延迟。
- 吞吐量提升十倍:借助批处理优势,单节点处理速度从每秒几百句跃升至数千句(如官方案例中的 1400 句/秒),轻松应对流量高峰。
- 无缝集成与扩展:无需重构现有的 Flask 或 Django 代码,仅需微小改动即可接入,并轻松扩展至多 GPU 集群场景。
service-streamer 通过将离散请求转化为批量计算,彻底解决了深度学习 Web 服务中 GPU 闲置与高并发延迟的矛盾,以极低改造成本实现了性能的数量级飞跃。
运行环境要求
- 未说明
需要 NVIDIA GPU (示例中使用了 Titan Xp),需安装 CUDA (示例环境为 CUDA 9.0),支持多 GPU 部署
未说明

快速开始
服务流处理器
助力您的深度学习应用Web服务。 中文README
什么是服务流处理器? • 亮点 • 安装 • 5分钟开发BERT服务 • API • 基准测试 • 常见问题解答 •
• 由ShannonAI制作 • :globe_with_meridians: http://www.shannonai.com/
什么是服务流处理器?
在深度学习模型中,通常会将数据样本收集为一个“小批量”进行处理。这样可以充分利用GPU的并行计算能力。然而,用户对Web服务的请求通常是离散的。如果使用传统的循环服务器或线程服务器,GPU每次只能处理一个请求,导致空闲时间增加,并且在有并发请求时,延迟会线性增长。
ServiceStreamer是一款用于机器学习应用Web服务的中间件。它能够将用户的请求排队并采样成小批量,通过提高GPU利用率来显著提升系统的整体性能。
亮点
- :hatching_chick: 易于使用:只需少量改动,即可使模型加速十倍。
- :zap: 处理速度快:降低机器学习模型在线推理的延迟。
- :octopus: 扩展性强:易于应用于多GPU场景,以处理海量请求。
- :crossed_swords: 适用性广:可与任何Web框架和/或深度学习框架配合使用。
安装
使用pip安装ServiceStream,需要Python >= 3.5:
pip install service_streamer
5分钟开发BERT服务
我们提供了一个分步教程,帮助您在5分钟内将BERT部署上线。该服务每秒可处理1400个句子。
“文本补全”是自然语言处理中的一个任务:给定一个随机缺失了若干单词的句子,模型需要根据上下文预测这些被删除的单词。
“BERT”在过去两年中备受关注,在许多NLP任务上都取得了最先进的成果。BERT利用“掩码语言模型(MLM)”作为其预训练目标之一。MLM会随机遮盖输入中的部分标记,目标是根据上下文预测被遮盖单词的原始词汇ID。MLM与文本补全任务有相似之处,因此将BERT引入文本补全任务是非常自然的选择。
首先,我们为文本补全任务定义一个模型bert_model.py。
predict函数接受一批句子,并返回[MASK]标记的预测位置结果。class TextInfillingModel(object): ... batch = ["twinkle twinkle [MASK] star.", "Happy birthday to [MASK].", 'the answer to life, the [MASK], and everything.'] model = TextInfillingModel() outputs = model.predict(batch) print(outputs) # ['little', 'you', 'universe']注意:请先下载预训练的BERT模型。
其次,利用Flask将预测接口封装成Web服务。flask_example.py
model = TextInfillingModel() @app.route("/naive", methods=["POST"]) def naive_predict(): inputs = request.form.getlist("s") outputs = model.predict(inputs) return jsonify(outputs) app.run(port=5005)请运行flask_example.py,您将得到一个基础的Web服务器。
curl -X POST http://localhost:5005/naive -d 's=Happy birthday to [MASK].' ["you"]此时,您的Web服务器每秒只能处理12个请求。更多细节请参见基准测试。
第三,通过
service_streamer封装模型函数。仅需三行代码,BERT服务的预测速度就能达到每秒200+个句子(快了16倍)。from service_streamer import ThreadedStreamer streamer = ThreadedStreamer(model.predict, batch_size=64, max_latency=0.1) @app.route("/stream", methods=["POST"]) def stream_predict(): inputs = request.form.getlist("s") outputs = streamer.predict(inputs) return jsonify(outputs) app.run(port=5005, debug=False)运行flask_example.py,然后使用wrk测试性能。
wrk -t 2 -c 128 -d 20s --timeout=10s -s benchmark.lua http://127.0.0.1:5005/stream ... Requests/sec: 200.31最后,通过
Streamer封装模型,并在多个GPU上启动服务工作进程。Streamer进一步提升了推理速度,使服务达到每秒1000+个句子(快了80倍)。from service_streamer import ManagedModel, Streamer class ManagedBertModel(ManagedModel): def init_model(self): self.model = TextInfillingModel() def predict(self, batch): return self.model.predict(batch) streamer = Streamer(ManagedBertModel, batch_size=64, max_latency=0.1, worker_num=8, cuda_devices=(0, 1, 2, 3)) app.run(port=5005, debug=False)可以启动8个GPU工作进程,并均匀分配到4个GPU上。
API
快速入门
一般来说,利用并行计算可以加快推理速度。
outputs = model.predict(batch_inputs)
ServiceStreamer是用于机器学习应用Web服务的中间件。它会将用户的请求排队并调度成小批量,然后转发到GPU工作进程中。ServiceStreamer会在一定程度上牺牲延迟(默认最大为0.1秒),并通过提高GPU利用率来提升整体性能。
from service_streamer import ThreadedStreamer
# 使用Streamer封装batch_predict函数
streamer = ThreadedStreamer(model.predict, batch_size=64, max_latency=0.1)
# 将 model.predict 替换为 streamer.predict
outputs = streamer.predict(batch_inputs)
在多线程(或协程)环境下启动 Web 服务器。只需添加几行代码,您的服务器通常就能提升 10 倍(batch_size/batch_per_request)的处理速度。
分布式 GPU 工作进程
实际应用中,Web 服务器的性能(QPS)远高于 GPU 模型本身。我们还支持一个 Web 服务器搭配多个 GPU 工作进程。
from service_streamer import Streamer
# 使用 spawn 启动 4 个 GPU 工作进程
streamer = Streamer(model.predict, 64, 0.1, worker_num=4)
outputs = streamer.predict(batch)
Streamer 默认使用 spawn 子进程来运行 GPU 工作进程。它通过进程间队列进行通信和任务排队,能够将大量请求分发到多个工作进程进行处理。
随后,模型的预测结果会批量返回给对应的 Web 服务器,并进一步转发到相应的 HTTP 响应中。
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 390.116 Driver Version: 390.116 |
|-------------------------------+----------------------+----------------------+
...
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| 0 7574 C /home/liuxin/nlp/venv/bin/python 1889MiB |
| 1 7575 C /home/liuxin/nlp/venv/bin/python 1889MiB |
| 2 7576 C /home/liuxin/nlp/venv/bin/python 1889MiB |
| 3 7577 C /home/liuxin/nlp/venv/bin/python 1889MiB |
+-----------------------------------------------------------------------------+
上述方法定义简单,但主进程需要初始化模型,这会额外占用一部分内存。而且模型只能运行在同一个 GPU 上。因此,我们提供了 ManagedModel 类,以方便模型的延迟初始化和迁移,同时支持多 GPU 运行。
from service_streamer import ManagedModel
class ManagedBertModel(ManagedModel):
def init_model(self):
self.model = Model()
def predict(self, batch):
return self.model.predict(batch)
# 使用 spawn 启动 4 个 GPU 工作进程,并均匀分配到 0/1/2/3 号 GPU
streamer = Streamer(ManagedBertModel, 64, 0.1, worker_num=4, cuda_devices=(0, 1, 2, 3))
outputs = streamer.predict(batch)
分布式 Web 服务器
一些 CPU 密集型计算,例如图像和文本预处理,通常需要先在 Web 服务器中完成。预处理后的数据再传递给 GPU 工作进程进行预测。然而,在实际应用中,CPU 资源往往会成为性能瓶颈。为此,我们也提供了多台 Web 服务器与单个或多个 GPU 工作进程相匹配的模式。
使用 RedisStream 为所有 Web 服务器和 GPU 工作进程指定一个唯一的 Redis 地址。
# 默认参数可以省略,默认使用 localhost:6379。
streamer = RedisStreamer(redis_broker="172.22.22.22:6379")
我们利用 gunicorn 或 uwsgi 实现反向代理和负载均衡。
cd example
gunicorn -c redis_streamer_gunicorn.py flask_example:app
每个请求都会被负载均衡到不同的 Web 服务器进行 CPU 预处理,然后再均匀分配到 GPU 工作进程进行模型预测。
Future API
如果您使用过任何并发库,应该对 future 不陌生。如果希望将 service_streamer 用于请求排队或分布式 GPU 计算,且应用场景并非 Web 服务,可以直接使用 Future API。
from service_streamer import ThreadedStreamer
streamer = ThreadedStreamer(model.predict, 64, 0.1)
xs = []
for i in range 200:
future = streamer.submit([["How", "are", "you", "?"], ["Fine", "."], ["Thank", "you", "."]])
xs.append(future)
# 获取所有 Future 对象实例并等待异步响应。
for future in xs:
outputs = future.result()
print(outputs)
基准测试
基准测试
我们使用 wrk 进行基准测试。
测试示例和脚本可在 example 中找到。
环境
- GPU:Titan Xp
- CUDA:9.0
- PyTorch:1.1
单 GPU 进程
# 启动 Flask 多线程服务器
python example/flask_example.py
# 测试无 service_streamer 的原生 API
wrk -t 4 -c 128 -d 20s --timeout=10s -s benchmark.lua http://127.0.0.1:5005/naive
# 测试带有 service_streamer 的流式 API
wrk -t 4 -c 128 -d 20s --timeout=10s -s benchmark.lua http://127.0.0.1:5005/stream
| Naive | ThreaedStreamer | Streamer | RedisStreamer | |
|---|---|---|---|---|
| qps | 12.78 | 207.59 | 321.70 | 372.45 |
| 延迟 | 8440ms | 603.35ms | 392.66ms | 340.74ms |
多 GPU 进程
与单个 Web 服务器进程相比,多 GPU 工作进程的通信和负载均衡机制会导致一定的性能损失。由于多线程 Flask 服务器已成为性能瓶颈,我们采用了 gevent 服务器。请参阅 flask_multigpu_example.py。
wrk -t 8 -c 512 -d 20s --timeout=10s -s benchmark.lua http://127.0.0.1:5005/stream
| gpu_worker_num | Naive | ThreadedStreamer | Streamer | RedisStreamer |
|---|---|---|---|---|
| 1 | 11.62 | 211.02 | 362.69 | 365.80 |
| 2 | N/A | N/A | 488.40 | 609.63 |
| 4 | N/A | N/A | 494.20 | 1034.57 |
Threaded Streamer由于 Python GIL 的限制,多工作进程并无意义。我们仅使用单 GPU 工作进程进行对比研究。Streamer在超过 2 个 GPU 工作进程时,性能提升不再线性。此时 CPU 利用率达到 100%,瓶颈在于 CPU,Flask 的性能问题也成为阻碍。
使用 Future API 启动多 GPU 进程
我们采用 Future API 在本地进行多 GPU 基准测试,以减少 Web 服务器对性能的影响。请参考 future_example.py 中的代码示例。
| gpu_worker_num | 批量处理 | ThreadedStreamer | Streamer | RedisStreamer |
|---|---|---|---|---|
| 1 | 422.883 | 401.01 | 399.26 | 384.79 |
| 2 | 无 | 无 | 742.16 | 714.781 |
| 4 | 无 | 无 | 1400.12 | 1356.47 |
可以看出,service_streamer 的性能几乎与 GPU 工作进程的数量呈线性关系。在 service_streamer 中,进程间通信比使用 Redis 更高效。
常见问题解答
问: 使用从 allennlp 训练的模型,在推理时将 Streamer 的 worker_num=4,为什么会出现 16 核 CPU 被打满,且速度反而比 worker_num=1 的 Streamer 还慢的情况?
答: 对于多进程推理,如果模型在处理数据时使用了多线程的 NumPy 操作,可能会导致较高的 CPU 开销,从而使得多核并行计算的速度反而不如单核。这类问题在使用 alennlp、spacy 等第三方库时较为常见。可以通过设置 numpy threads 相关环境变量来解决。
import os
os.environ["MKL_NUM_THREADS"] = "1" # export MKL_NUM_THREADS=1
os.environ["NUMEXPR_NUM_THREADS"] = "1" # export NUMEXPR_NUM_THREADS=1
os.environ["OMP_NUM_THREADS"] = "1" # export OMP_NUM_THREADS=1
import numpy
请注意,必须在 import numpy 之前设置这些环境变量。
问: 使用 RedisStreamer 时,如果只有一个 Redis 代理,但有多个模型,输入批次的结构可能不同。如何处理这种情况?
答: 在初始化工作进程和流式处理器时指定前缀,每个流式处理器将使用唯一的通道。
初始化工作进程的示例:
from service_streamer import run_redis_workers_forever
from bert_model import ManagedBertModel
if __name__ == "__main__":
from multiprocessing import freeze_support
freeze_support()
run_redis_workers_forever(ManagedBertModel, 64, prefix='channel_1')
run_redis_workers_forever(ManagedBertModel, 64, prefix='channel_2')
使用流式处理器获取结果的示例:
from service_streamer import RedisStreamer
streamer_1 = RedisStreaemr(prefix='channel_1')
streamer_2 = RedisStreaemr(prefix='channel_2')
# 预测
output_1 = streamer_1.predict(batch)
output_2 = streamer_2.predict(batch)
常见问题
相似工具推荐
openclaw
OpenClaw 是一款专为个人打造的本地化 AI 助手,旨在让你在自己的设备上拥有完全可控的智能伙伴。它打破了传统 AI 助手局限于特定网页或应用的束缚,能够直接接入你日常使用的各类通讯渠道,包括微信、WhatsApp、Telegram、Discord、iMessage 等数十种平台。无论你在哪个聊天软件中发送消息,OpenClaw 都能即时响应,甚至支持在 macOS、iOS 和 Android 设备上进行语音交互,并提供实时的画布渲染功能供你操控。 这款工具主要解决了用户对数据隐私、响应速度以及“始终在线”体验的需求。通过将 AI 部署在本地,用户无需依赖云端服务即可享受快速、私密的智能辅助,真正实现了“你的数据,你做主”。其独特的技术亮点在于强大的网关架构,将控制平面与核心助手分离,确保跨平台通信的流畅性与扩展性。 OpenClaw 非常适合希望构建个性化工作流的技术爱好者、开发者,以及注重隐私保护且不愿被单一生态绑定的普通用户。只要具备基础的终端操作能力(支持 macOS、Linux 及 Windows WSL2),即可通过简单的命令行引导完成部署。如果你渴望拥有一个懂你
stable-diffusion-webui
stable-diffusion-webui 是一个基于 Gradio 构建的网页版操作界面,旨在让用户能够轻松地在本地运行和使用强大的 Stable Diffusion 图像生成模型。它解决了原始模型依赖命令行、操作门槛高且功能分散的痛点,将复杂的 AI 绘图流程整合进一个直观易用的图形化平台。 无论是希望快速上手的普通创作者、需要精细控制画面细节的设计师,还是想要深入探索模型潜力的开发者与研究人员,都能从中获益。其核心亮点在于极高的功能丰富度:不仅支持文生图、图生图、局部重绘(Inpainting)和外绘(Outpainting)等基础模式,还独创了注意力机制调整、提示词矩阵、负向提示词以及“高清修复”等高级功能。此外,它内置了 GFPGAN 和 CodeFormer 等人脸修复工具,支持多种神经网络放大算法,并允许用户通过插件系统无限扩展能力。即使是显存有限的设备,stable-diffusion-webui 也提供了相应的优化选项,让高质量的 AI 艺术创作变得触手可及。
everything-claude-code
everything-claude-code 是一套专为 AI 编程助手(如 Claude Code、Codex、Cursor 等)打造的高性能优化系统。它不仅仅是一组配置文件,而是一个经过长期实战打磨的完整框架,旨在解决 AI 代理在实际开发中面临的效率低下、记忆丢失、安全隐患及缺乏持续学习能力等核心痛点。 通过引入技能模块化、直觉增强、记忆持久化机制以及内置的安全扫描功能,everything-claude-code 能显著提升 AI 在复杂任务中的表现,帮助开发者构建更稳定、更智能的生产级 AI 代理。其独特的“研究优先”开发理念和针对 Token 消耗的优化策略,使得模型响应更快、成本更低,同时有效防御潜在的攻击向量。 这套工具特别适合软件开发者、AI 研究人员以及希望深度定制 AI 工作流的技术团队使用。无论您是在构建大型代码库,还是需要 AI 协助进行安全审计与自动化测试,everything-claude-code 都能提供强大的底层支持。作为一个曾荣获 Anthropic 黑客大奖的开源项目,它融合了多语言支持与丰富的实战钩子(hooks),让 AI 真正成长为懂上
ComfyUI
ComfyUI 是一款功能强大且高度模块化的视觉 AI 引擎,专为设计和执行复杂的 Stable Diffusion 图像生成流程而打造。它摒弃了传统的代码编写模式,采用直观的节点式流程图界面,让用户通过连接不同的功能模块即可构建个性化的生成管线。 这一设计巧妙解决了高级 AI 绘图工作流配置复杂、灵活性不足的痛点。用户无需具备编程背景,也能自由组合模型、调整参数并实时预览效果,轻松实现从基础文生图到多步骤高清修复等各类复杂任务。ComfyUI 拥有极佳的兼容性,不仅支持 Windows、macOS 和 Linux 全平台,还广泛适配 NVIDIA、AMD、Intel 及苹果 Silicon 等多种硬件架构,并率先支持 SDXL、Flux、SD3 等前沿模型。 无论是希望深入探索算法潜力的研究人员和开发者,还是追求极致创作自由度的设计师与资深 AI 绘画爱好者,ComfyUI 都能提供强大的支持。其独特的模块化架构允许社区不断扩展新功能,使其成为当前最灵活、生态最丰富的开源扩散模型工具之一,帮助用户将创意高效转化为现实。
gemini-cli
gemini-cli 是一款由谷歌推出的开源 AI 命令行工具,它将强大的 Gemini 大模型能力直接集成到用户的终端环境中。对于习惯在命令行工作的开发者而言,它提供了一条从输入提示词到获取模型响应的最短路径,无需切换窗口即可享受智能辅助。 这款工具主要解决了开发过程中频繁上下文切换的痛点,让用户能在熟悉的终端界面内直接完成代码理解、生成、调试以及自动化运维任务。无论是查询大型代码库、根据草图生成应用,还是执行复杂的 Git 操作,gemini-cli 都能通过自然语言指令高效处理。 它特别适合广大软件工程师、DevOps 人员及技术研究人员使用。其核心亮点包括支持高达 100 万 token 的超长上下文窗口,具备出色的逻辑推理能力;内置 Google 搜索、文件操作及 Shell 命令执行等实用工具;更独特的是,它支持 MCP(模型上下文协议),允许用户灵活扩展自定义集成,连接如图像生成等外部能力。此外,个人谷歌账号即可享受免费的额度支持,且项目基于 Apache 2.0 协议完全开源,是提升终端工作效率的理想助手。
markitdown
MarkItDown 是一款由微软 AutoGen 团队打造的轻量级 Python 工具,专为将各类文件高效转换为 Markdown 格式而设计。它支持 PDF、Word、Excel、PPT、图片(含 OCR)、音频(含语音转录)、HTML 乃至 YouTube 链接等多种格式的解析,能够精准提取文档中的标题、列表、表格和链接等关键结构信息。 在人工智能应用日益普及的今天,大语言模型(LLM)虽擅长处理文本,却难以直接读取复杂的二进制办公文档。MarkItDown 恰好解决了这一痛点,它将非结构化或半结构化的文件转化为模型“原生理解”且 Token 效率极高的 Markdown 格式,成为连接本地文件与 AI 分析 pipeline 的理想桥梁。此外,它还提供了 MCP(模型上下文协议)服务器,可无缝集成到 Claude Desktop 等 LLM 应用中。 这款工具特别适合开发者、数据科学家及 AI 研究人员使用,尤其是那些需要构建文档检索增强生成(RAG)系统、进行批量文本分析或希望让 AI 助手直接“阅读”本地文件的用户。虽然生成的内容也具备一定可读性,但其核心优势在于为机器