前言
最近,LLM 与 OpenAI 非常流行, 相信很多人从 Complete 和 Chat API 中的 stream 模式第一次接触到了 SSE API。SSE(Server-Sent Events)是一种支持从 Server Side 向 Client Side 推送事件的方式。它与传统的 Restful API 不同,传统的 Restful 架构只能从服务器端发起请求,然后客户端返回响应的方式进行数据传输。在需要实时更新数据的场景中,只能依赖轮询或者异步任务机制来实现。但前者性能不高,而后者会增加前后端架构的复杂性。
WebSocket 也是一个选择,但实现则更加复杂。
对于需要服务端主动向客户端推送消息,而客户端不需要向服务器发送信息的情况下,SSE 会是一种高性能且简单的实现方式。
本文介绍了如何使用 python 和 FastAPI 实现 SSE API,并且探讨了如何结合 Pydantic 书写类型安全的代码
使用FastAPI
和sse-starlette
实现 SSE API
sse-starlette
包包含了 SSE 的大部分实现。代码实现非常简单:
创建一个返回ServerSentEvent
的AsyncGenerator
,然后作为EventSourceResponse
的 content 返回即可。
ServerSentEvent
的三个参数event
,id
和data
分别可以用来设置服务端事件的事件名,事件 ID 和 Payload 的内容。
输出:
curl -N 'http://127.0.0.1:12345/sse?message=hello'
id: 0
event: echo
data: {'message': 'hello', 'created_at': 1699067601.359488}
id: 1
event: echo
data: {'message': 'hello', 'created_at': 1699067602.36079}
id: 2
event: echo
data: {'message': 'hello', 'created_at': 1699067603.361532}
id: 3
event: echo
data: {'message': 'hello', 'created_at': 1699067604.365153}
id: 4
event: echo
data: {'message': 'hello', 'created_at': 1699067605.367895}
基于 Pydantic 保证类型安全
在上述代码中,事件名使用了字面量,返回数据的内容是字典。当涉及管理复杂的数据内容结构和多个事件时,这可能会变得很麻烦。这时可以结合 Pydantic 写出更加类型安全的代码。
首先创建一个描述事件的类BaseEvent[T]
,它根据类名推断事件名,并包含一个类型为T
的 Payload。
然后创建一个装饰器,将BaseEvent[T]
的流转换为ServerSentEvent
一个的流。 https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L32-L54
然后我们定义 Echo 的 PayloadEchoPayload
,然后定义事件Echo
,最后创建事件流echo_stream2
。
curl -N 'http://127.0.0.1:12345/sse2?message=hello'
id: 0
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:10.760797"}
id: 1
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:11.762338"}
id: 2
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:12.764145"}
id: 3
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:13.765405"}
id: 4
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:14.765821"}
id: 5
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:15.767593"}
如果需要在一个 API 中合并多个流
创建另外一个流echo_stream3
并且将其与echo_stream2
进行合并,可以看到两个流交替输出。
curl -N 'http://127.0.0.1:12345/sse3?message=hello'
id: 0
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:26.129152"}
id: 1
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:27.130363"}
id: 0
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:27.130311"}
id: 2
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:28.131686"}
id: 1
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:28.131802"}
id: 3
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:29.132987"}
id: 2
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:29.133048"}
id: 4
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:30.133555"}
id: 3
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:30.133599"}
id: 5
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:31.134985"}
id: 4
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:31.135166"}
总结
本文介绍了如何使用 FastAPI 和sse-starlette
构建 SSE API,实现了从服务端实时推送数据的方式。我们还讨论了如何通过 Pydantic 确保类型安全,提高代码可维护性,以及在一个 API 中合并多个流的方法。