ArkFlow - 高性能 Rust 流处理引擎
ArkFlow 是一个用 Rust 编写的高性能流处理引擎,提供强大的数据流处理能力。它支持多种数据源,包括 Kafka、MQTT、HTTP 和文件等,并提供 JSON、SQL、Protobuf 和批量处理等处理器。ArkFlow 基于 Rust 和 Tokio 构建,具有高性能和低延迟的特点,且设计模块化,易于扩展。目前该项目尚未达到生产可用状态。
chenquan/arkflow
高性能的 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入/输出源和处理器。
License
Apache-2.0 license 41 stars 0 forks Branches Tags Activity
ArkFlow
English | 中文
‼️ 尚未达到生产可用状态,请勿在生产环境中使用 ‼️
高性能的 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入/输出源和处理器。
特性
- 高性能: 构建于 Rust 和 Tokio 异步运行时之上,提供卓越的性能和低延迟。
- 多种数据源: 支持 Kafka, MQTT, HTTP, 文件和其他输入/输出源。
- 强大的处理能力: 内置 SQL 查询,JSON 处理,Protobuf 编码/解码,批量处理和其他处理器。
- 可扩展: 模块化设计,易于扩展新的输入、输出和处理器组件。
安装
从源码构建
# 克隆仓库
git clone https://github.com/chenquan/arkflow.git
cd arkflow
# 构建项目
cargo build --release
# 运行测试
cargo test
快速开始
- 创建一个配置文件
config.yaml
:
logging:
level: info
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1s
batch_size: 10
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT * FROM flow WHERE value >= 10"
- type: "arrow_to_json"
output:
type: "stdout"
- 运行 ArkFlow:
./target/release/arkflow --config config.yaml
配置指南
ArkFlow 使用 YAML 格式的配置文件,支持以下主要配置项:
顶级配置
logging:
level: info # 日志级别: debug, info, warn, error
streams: # 流定义列表
- input: # 输入配置
# ...
pipeline: # 处理管道配置
# ...
output: # 输出配置
# ...
输入组件
ArkFlow 支持多种输入源:
- Kafka: 从 Kafka topic 读取数据
- MQTT: 订阅来自 MQTT topic 的消息
- HTTP: 通过 HTTP 接收数据
- File: 从文件读取数据
- Generator: 生成测试数据
- SQL: 从数据库查询数据
示例:
input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
client_id: arkflow
start_from_latest: true
处理器
ArkFlow 提供多种数据处理器:
- JSON: JSON 数据处理和转换
- SQL: 使用 SQL 查询处理数据
- Protobuf: Protobuf 编码/解码
- Batch Processing: 批量处理消息
示例:
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value >= 10"
- type: arrow_to_json
输出组件
ArkFlow 支持多种输出目标:
- Kafka: 将数据写入 Kafka topic
- MQTT: 将消息发布到 MQTT topic
- HTTP: 通过 HTTP 发送数据
- File: 将数据写入文件
- Standard Output: 将数据输出到控制台
示例:
output:
type: kafka
brokers:
- localhost:9092
topic: output-topic
client_id: arkflow-producer
示例
Kafka 到 Kafka 的数据处理
streams:
- input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value > 100"
- type: arrow_to_json
output:
type: kafka
brokers:
- localhost:9092
topic: processed-topic
生成测试数据并处理
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1ms
batch_size: 10000
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
- type: "arrow_to_json"
output:
type: "stdout"
License
ArkFlow 使用 Apache License 2.0 协议。