chenquan/arkflow

高性能的 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入/输出源和处理器。

License

Apache-2.0 license 41 stars 0 forks Branches Tags Activity

ArkFlow

English | 中文

‼️ 尚未达到生产可用状态,请勿在生产环境中使用 ‼️

Rust License

高性能的 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入/输出源和处理器。

特性

安装

从源码构建

# 克隆仓库
git clone https://github.com/chenquan/arkflow.git
cd arkflow
# 构建项目
cargo build --release
# 运行测试
cargo test

快速开始

  1. 创建一个配置文件 config.yaml:
logging:
  level: info
streams:
  - input:
    type: "generate"
    context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
    interval: 1s
    batch_size: 10
  pipeline:
    thread_num: 4
    processors:
      - type: "json_to_arrow"
      - type: "sql"
        query: "SELECT * FROM flow WHERE value >= 10"
      - type: "arrow_to_json"
  output:
    type: "stdout"
  1. 运行 ArkFlow:
./target/release/arkflow --config config.yaml

配置指南

ArkFlow 使用 YAML 格式的配置文件,支持以下主要配置项:

顶级配置

logging:
  level: info # 日志级别: debug, info, warn, error
streams:    # 流定义列表
  - input:   # 输入配置
    # ...
    pipeline:  # 处理管道配置
    # ...
    output:   # 输出配置
    # ...

输入组件

ArkFlow 支持多种输入源:

示例:

input:
  type: kafka
  brokers:
    - localhost:9092
  topics:
    - test-topic
  consumer_group: test-group
  client_id: arkflow
  start_from_latest: true

处理器

ArkFlow 提供多种数据处理器:

示例:

pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow
    - type: sql
      query: "SELECT * FROM flow WHERE value >= 10"
    - type: arrow_to_json

输出组件

ArkFlow 支持多种输出目标:

示例:

output:
  type: kafka
  brokers:
    - localhost:9092
  topic: output-topic
  client_id: arkflow-producer

示例

Kafka 到 Kafka 的数据处理

streams:
  - input:
    type: kafka
    brokers:
      - localhost:9092
    topics:
      - test-topic
    consumer_group: test-group
  pipeline:
    thread_num: 4
    processors:
      - type: json_to_arrow
      - type: sql
        query: "SELECT * FROM flow WHERE value > 100"
      - type: arrow_to_json
  output:
    type: kafka
    brokers:
      - localhost:9092
    topic: processed-topic

生成测试数据并处理

streams:
  - input:
    type: "generate"
    context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
    interval: 1ms
    batch_size: 10000
  pipeline:
    thread_num: 4
    processors:
      - type: "json_to_arrow"
      - type: "sql"
        query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
      - type: "arrow_to_json"
  output:
    type: "stdout"

License

ArkFlow 使用 Apache License 2.0 协议。