流媒体工具教程

什么是流式模式?

*流式处理*允许工具在任务仍在运行时将部分结果返回给调用方。调用方无需等待完整的响应,而是可以实时接收数据块(文本片段、结构化更新等)。这种功能在以下情况下非常有用:

  • 响应内容较长或较为详细时,您希望能够提前获得洞察。

  • 用户界面(CLI、笔记本、MCP客户端)受益于渐进式更新

  • 您计划将每个数据块传输到另一个系统(日志记录、部分渲染、增量处理)。

所有基于 AgenticTool 构建的工具已支持流式处理。其他类型的工具可以通过遵循 构建您自己的流媒体工具 中的说明选择加入流式处理功能。

流式内置智能工具

ToolUniverse 的发行版附带了许多 Agentic 工具,例如 ScientificTextSummarizer``(定义在 ``agentic_tools.json 中)。以下代码片段展示了如何使用 ToolUniverse.run 配合流式回调运行该工具:

from tooluniverse.execute_function import ToolUniverse
import json

tu = ToolUniverse()
tu.load_tools(include_tools=["ScientificTextSummarizer"])

chunks = []

def on_chunk(text: str) -> None:
    chunks.append(text)
    print(text, end="", flush=True)

fcall = json.dumps(
    {
        "name": "ScientificTextSummarizer",
        "arguments": {
            "text": "Long paper...",
            "summary_length": "100",
            "focus_area": "results",
        },
    }
)

result = tu.run(
    fcall,
    return_message=False,
    verbose=False,
    stream_callback=on_chunk,
)

print("\nFinal result:\n", result)

关键点:

  • 通过传递 stream_callback,通知 ToolUniverse 调用方可以接受流式数据块。

  • ToolUniverse.run 接受与 MCP 调用中使用的相同 JSON 结构。当存在回调函数时,框架会自动在参数字典中设置 AgenticTool.STREAM_FLAG_KEY,以便 AgenticTool 知道已请求流式处理。

  • 如果省略回调函数,该工具仍然可以正常运行——它只会返回一个最终的字符串。

流式演示脚本 examples/agentic_streaming_example.py 使用更长的提示包装了相同的逻辑,使数据块在视觉上更加明显。

构建您自己的流媒体工具

任何自定义工具只需进行三个小改动即可启用流式处理:

  1. 在类上声明一个标志键,以便 ToolUniverse 在提供回调时知道要填充哪个参数。

    class MyStreamingTool(BaseTool):
        STREAM_FLAG_KEY = "_tooluniverse_stream"
    
  2. 在 ``run`` 中接受 ``stream_callback`` 并转发数据块。 在下游验证之前,从参数字典中移除该标志。

    from typing import Callable, Optional
    
    class MyStreamingTool(BaseTool):
        STREAM_FLAG_KEY = "_tooluniverse_stream"
    
        def run(
            self,
            arguments: dict,
            stream_callback: Optional[Callable[[str], None]] = None,
        ):
            arguments = dict(arguments)
            stream_enabled = bool(arguments.pop(self.STREAM_FLAG_KEY, False))
    
            if stream_enabled and stream_callback:
                for chunk in self.generate_chunks(arguments):
                    stream_callback(chunk)
                return "".join(self.generate_chunks(arguments))
    
            return self.run_without_streaming(arguments)
    

    如果该工具由于某种原因无法传递数据块,请回退到非流式处理路径(如 AgenticTool 所做的)。

  3. 可选 – 如果需要允许外部调用者在不依赖``stream_callback``的情况下切换流式处理,请在工具的模式中记录该标志。

测试

在添加流式传输支持时,请参考现有的测试套件:

  • tests/test_streaming_support.py – 单元测试,涵盖回调注入和自动标志处理。

  • tests/test_agentic_streaming_integration.py – 集成测试涵盖 AgenticTool 流式处理和 SMCP 日志传播。

使用以下命令运行:

pytest tests/test_streaming_support.py tests/test_agentic_streaming_integration.py