Axitinib
781 文字
4 分
FastAPI 实现 SSE API

前言#

最近,LLM 与 OpenAI 非常流行, 相信很多人从 Complete 和 Chat API 中的 stream 模式第一次接触到了 SSE API。SSE(Server-Sent Events)是一种支持从 Server Side 向 Client Side 推送事件的方式。它与传统的 Restful API 不同,传统的 Restful 架构只能从服务器端发起请求,然后客户端返回响应的方式进行数据传输。在需要实时更新数据的场景中,只能依赖轮询或者异步任务机制来实现。但前者性能不高,而后者会增加前后端架构的复杂性。

WebSocket 也是一个选择,但实现则更加复杂。

对于需要服务端主动向客户端推送消息,而客户端不需要向服务器发送信息的情况下,SSE 会是一种高性能且简单的实现方式。

本文介绍了如何使用 python 和 FastAPI 实现 SSE API,并且探讨了如何结合 Pydantic 书写类型安全的代码

使用FastAPIsse-starlette实现 SSE API#

sse-starlette包包含了 SSE 的大部分实现。代码实现非常简单:

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main.py#L1-L27

创建一个返回ServerSentEventAsyncGenerator,然后作为EventSourceResponse的 content 返回即可。

ServerSentEvent的三个参数event,iddata分别可以用来设置服务端事件的事件名,事件 ID 和 Payload 的内容。

输出:


curl -N 'http://127.0.0.1:12345/sse?message=hello'

id: 0
event: echo
data: {'message': 'hello', 'created_at': 1699067601.359488}

id: 1
event: echo
data: {'message': 'hello', 'created_at': 1699067602.36079}

id: 2
event: echo
data: {'message': 'hello', 'created_at': 1699067603.361532}

id: 3
event: echo
data: {'message': 'hello', 'created_at': 1699067604.365153}

id: 4
event: echo
data: {'message': 'hello', 'created_at': 1699067605.367895}

基于 Pydantic 保证类型安全#

在上述代码中,事件名使用了字面量,返回数据的内容是字典。当涉及管理复杂的数据内容结构和多个事件时,这可能会变得很麻烦。这时可以结合 Pydantic 写出更加类型安全的代码。

首先创建一个描述事件的类BaseEvent[T],它根据类名推断事件名,并包含一个类型为T的 Payload。

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L13-L26

然后创建一个装饰器,将BaseEvent[T]的流转换为ServerSentEvent一个的流。 https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L32-L54

然后我们定义 Echo 的 PayloadEchoPayload,然后定义事件Echo,最后创建事件流echo_stream2

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L57-L76

curl -N 'http://127.0.0.1:12345/sse2?message=hello'
id: 0
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:10.760797"}

id: 1
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:11.762338"}

id: 2
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:12.764145"}

id: 3
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:13.765405"}

id: 4
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:14.765821"}

id: 5
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:15.767593"}

如果需要在一个 API 中合并多个流#

可以使用aiostream中的merge操作符。

创建另外一个流echo_stream3并且将其与echo_stream2进行合并,可以看到两个流交替输出。

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L84-L109

curl -N 'http://127.0.0.1:12345/sse3?message=hello'

id: 0
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:26.129152"}

id: 1
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:27.130363"}

id: 0
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:27.130311"}

id: 2
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:28.131686"}

id: 1
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:28.131802"}

id: 3
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:29.132987"}

id: 2
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:29.133048"}

id: 4
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:30.133555"}

id: 3
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:30.133599"}

id: 5
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:31.134985"}

id: 4
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:31.135166"}

总结#

本文介绍了如何使用 FastAPI 和sse-starlette构建 SSE API,实现了从服务端实时推送数据的方式。我们还讨论了如何通过 Pydantic 确保类型安全,提高代码可维护性,以及在一个 API 中合并多个流的方法。

FastAPI 实现 SSE API
https://akishichinibu.github.io/posts/20240713sse/
作者
Axitinib
公開日
2024-07-13
ライセンス
CC BY-NC-SA 4.0