Show HN:Slipstream - 用于状态流处理的 Python 库
`Slipstream` 是一个基于 `Python` 的状态流处理库,旨在简化状态流应用的开发。它具有简洁性、自由性和速度的特点,支持并行处理,允许使用任意代码,并提供优化配置。`Slipstream` 可以消费来自 `Kafka` 等 `Async Iterable` 的数据源,并将数据汇集或缓存到 `Kafka`、`RocksDB` 等目标。文章通过一个简单的示例演示了如何使用 `Slipstream` 创建一个定时器,并展示了其核心功能,如源、目标和处理器。
Documentation
Slipstream
提供了一种数据流模型,用于简化状态流应用程序的开发。
- Simplicity(简洁性): 并行处理,映射源到目标。
- Freedom(自由性): 允许任意代码,避免限制性的抽象。
- Speed(速度): 优化和可配置的默认设置,以便快速开始。
消费任何可以转换为 Async Iterable
的源;Kafka
、流式 API 等。将数据汇集或缓存到任何 Callable
中;Kafka
、RocksDB
、API。使用常规 Python
代码执行任何任意状态操作 – 连接、聚合、过滤。检测依赖流的停机时间,暂停依赖流,或发出更正。
Demo
因为一切都是用基本的 Python
构建块构建的,所以可以轻松地制作类似框架的功能。 例如,虽然不包含计时器,但您可以毫不费力地创建一个:
from asyncio import run, sleep
async def timer(interval=1.0):
while True:
await sleep(interval)
yield
我们将使用 print
作为我们的目标:
print
让我们以 1 秒的固定间隔向下游发送我们的吉祥物 🐟 – blub:
from slipstream import handle, stream
@handle(timer(), sink=[print])
def handler():
yield '🐟 - blub'
run(stream())
# 🐟 - blub
# 🐟 - blub
# 🐟 - blub
# ...
一些突出的地方:
- 我们创建了一个
Async Iterable
源timer()
(不生成数据,只是触发handler
)。 - 我们使用了
[slipstream.handle
](https://slipstream.readthedocs.io/en/1.0.1/<slipstream.html#slipstream.handle> "slipstream.handle") 将源和目标绑定到handler
函数。 - 我们产生了
🐟 - blub
,它被发送到所有Callable
目标(在本例中只有print
)。 - 运行
[slipstream.stream
](https://slipstream.readthedocs.io/en/1.0.1/<slipstream.html#slipstream.stream> "slipstream.stream") 从源开始,通过handler
进入目标。
这种数据流模型简化了状态流应用程序的开发!
Contents
通过与 Kafka
交互并在以下位置缓存应用程序状态来继续:getting started。