流媒体工具教程¶
什么是流式模式?¶
*流式处理*允许工具在任务仍在运行时将部分结果返回给调用方。调用方无需等待完整的响应,而是可以实时接收数据块(文本片段、结构化更新等)。这种功能在以下情况下非常有用:
响应内容较长或较为详细时,您希望能够提前获得洞察。
用户界面(CLI、笔记本、MCP客户端)受益于渐进式更新
您计划将每个数据块传输到另一个系统(日志记录、部分渲染、增量处理)。
所有基于 AgenticTool 构建的工具已支持流式处理。其他类型的工具可以通过遵循 构建您自己的流媒体工具 中的说明选择加入流式处理功能。
流式内置智能工具¶
ToolUniverse 的发行版附带了许多 Agentic 工具,例如 ScientificTextSummarizer``(定义在 ``agentic_tools.json 中)。以下代码片段展示了如何使用 ToolUniverse.run 配合流式回调运行该工具:
from tooluniverse.execute_function import ToolUniverse
import json
tu = ToolUniverse()
tu.load_tools(include_tools=["ScientificTextSummarizer"])
chunks = []
def on_chunk(text: str) -> None:
chunks.append(text)
print(text, end="", flush=True)
fcall = json.dumps(
{
"name": "ScientificTextSummarizer",
"arguments": {
"text": "Long paper...",
"summary_length": "100",
"focus_area": "results",
},
}
)
result = tu.run(
fcall,
return_message=False,
verbose=False,
stream_callback=on_chunk,
)
print("\nFinal result:\n", result)
关键点:
通过传递
stream_callback,通知 ToolUniverse 调用方可以接受流式数据块。ToolUniverse.run接受与 MCP 调用中使用的相同 JSON 结构。当存在回调函数时,框架会自动在参数字典中设置AgenticTool.STREAM_FLAG_KEY,以便 AgenticTool 知道已请求流式处理。如果省略回调函数,该工具仍然可以正常运行——它只会返回一个最终的字符串。
流式演示脚本 examples/agentic_streaming_example.py 使用更长的提示包装了相同的逻辑,使数据块在视觉上更加明显。
构建您自己的流媒体工具¶
任何自定义工具只需进行三个小改动即可启用流式处理:
在类上声明一个标志键,以便 ToolUniverse 在提供回调时知道要填充哪个参数。
class MyStreamingTool(BaseTool): STREAM_FLAG_KEY = "_tooluniverse_stream"
在 ``run`` 中接受 ``stream_callback`` 并转发数据块。 在下游验证之前,从参数字典中移除该标志。
from typing import Callable, Optional class MyStreamingTool(BaseTool): STREAM_FLAG_KEY = "_tooluniverse_stream" def run( self, arguments: dict, stream_callback: Optional[Callable[[str], None]] = None, ): arguments = dict(arguments) stream_enabled = bool(arguments.pop(self.STREAM_FLAG_KEY, False)) if stream_enabled and stream_callback: for chunk in self.generate_chunks(arguments): stream_callback(chunk) return "".join(self.generate_chunks(arguments)) return self.run_without_streaming(arguments)
如果该工具由于某种原因无法传递数据块,请回退到非流式处理路径(如 AgenticTool 所做的)。
可选 – 如果需要允许外部调用者在不依赖``stream_callback``的情况下切换流式处理,请在工具的模式中记录该标志。
测试¶
在添加流式传输支持时,请参考现有的测试套件:
tests/test_streaming_support.py– 单元测试,涵盖回调注入和自动标志处理。tests/test_agentic_streaming_integration.py– 集成测试涵盖 AgenticTool 流式处理和 SMCP 日志传播。
使用以下命令运行:
pytest tests/test_streaming_support.py tests/test_agentic_streaming_integration.py