Documentation

_images/logo.png

Slipstream 提供了一种数据流模型,用于简化状态流应用程序的开发。

消费任何可以转换为 Async Iterable 的源;Kafka、流式 API 等。将数据汇集或缓存到任何 Callable 中;KafkaRocksDB、API。使用常规 Python 代码执行任何任意状态操作 – 连接、聚合、过滤。检测依赖流的停机时间,暂停依赖流,或发出更正。

Demo

因为一切都是用基本的 Python 构建块构建的,所以可以轻松地制作类似框架的功能。 例如,虽然不包含计时器,但您可以毫不费力地创建一个:

from asyncio import run, sleep
async def timer(interval=1.0):
  while True:
    await sleep(interval)
    yield

我们将使用 print 作为我们的目标:

print

让我们以 1 秒的固定间隔向下游发送我们的吉祥物 🐟 – blub

from slipstream import handle, stream
@handle(timer(), sink=[print])
def handler():
  yield '🐟 - blub'
run(stream())

# 🐟 - blub
# 🐟 - blub
# 🐟 - blub
# ...

一些突出的地方:

这种数据流模型简化了状态流应用程序的开发!

Contents

通过与 Kafka 交互并在以下位置缓存应用程序状态来继续:getting started