Show HN: GlassFlow – 开源流式数据去重与 Kafka 到 ClickHouse 的 Join

跳至内容

glassflow / **clickhouse-etl ** Public

实时去重和流式数据的时间 Join

glassflow.dev/

License

Apache-2.0 license 82 stars 2 forks Branches Tags Activity Star Notifications You must be signed in to change notification settings

glassflow/clickhouse-etl

main BranchesTags Go to file Code

Folders and files

Name| Name| Last commit message| Last commit date
---|---|---|---

Latest commit

History

207 Commits
.github| .github
demos| demos
dev| dev
docs/assets| docs/assets
glassflow-api| glassflow-api
nats-kafka-bridge| nats-kafka-bridge
nginx| nginx
ui| ui
.gitignore| .gitignore
CODE_OF_CONDUCT.md| CODE_OF_CONDUCT.md
CONTRIBUTING.md| CONTRIBUTING.md
LICENSE| LICENSE
Makefile| Makefile
NOTICE| NOTICE
README.md| README.md
docker-compose.yaml| docker-compose.yaml
View all files

Repository files navigation

GlassFlow Logo Report Bug · Feature Request · Get Help Join our weekly office hours every Wednesday 15:00-18:00 CET Join Next Office Hour Slack Email Support Twitter

GlassFlow for ClickHouse Streaming ETL

GlassFlow for ClickHouse Streaming ETL 是一个实时流处理器,旨在简化 Kafka 和 ClickHouse 之间的数据管道创建和管理。它提供了一个强大的、用户友好的界面,用于构建和管理具有内置去重和时间 Join 支持的实时数据管道。

GlassFlow 专为数据工程师构建,可以处理迟到的事件,确保 exactly-once 的正确性,并通过高吞吐量数据进行扩展。它可以从流式数据中提供准确的、低延迟的结果,而不会影响简单性或性能。该工具的直观 Web 界面使配置和监控管道变得容易,而其强大的架构可确保可靠的数据处理。

主要特性

GlassFlow ClickHouse ETL Introduction

快速开始

前提条件

本地开发设置

  1. 克隆存储库:
git clone https://github.com/glassflow/clickhouse-etl.git
cd clickhouse-etl
  1. 使用 Docker Compose 启动服务:
docker-compose up
  1. 访问 http://localhost:8080 上的 Web 界面以配置您的管道:

用法

创建管道

  1. 访问 http://localhost:8080 上的 Web 界面
  2. 配置您的源 (Kafka) 和 sink (ClickHouse) 连接
  3. 定义您的管道转换:
    • 设置去重规则
    • 配置流之间的时间 Join
    • 定义数据转换
  4. 启动管道并监控其进度

本地测试

该项目在 demos 文件夹中包含一个全面的演示设置,该设置提供了完整的本地测试环境。此设置包括:

有关设置和运行本地测试环境的详细说明,请参阅 Demo README file.

架构

该项目包含几个关键组件:

有关第三方组件及其许可证的信息,请参阅我们的 NOTICE 文件。

管道配置

管道配置在 JSON 文件中定义,该文件指定源、sink 和任何转换。虽然 Web 界面会自动生成此配置,但了解其结构可能对高级用户有所帮助。

管道配置是一个 JSON 对象,用于定义数据如何从 Kafka 主题流向 ClickHouse 表。它由三个主要组件组成:

  1. 源配置 (Kafka)
  2. Sink 配置 (ClickHouse)
  3. Join 配置 (可选)

根配置

字段 | 类型 | 必需 | 描述
---|---|---|---
pipeline_id | string | 是 | 管道的唯一标识符。必须为非空。
source | object | 是 | Kafka 源的配置。请参阅源配置
sink | object | 是 | ClickHouse sink 的配置。请参阅Sink 配置
join | object | 否 | 用于 Join 多个 Kafka 主题的配置。请参阅Join 配置

源配置

源配置定义了如何连接到 Kafka 主题并从中消费。 字段 | 类型 | 必需 | 描述
---|---|---|---
type | string | 是 | "kafka" 是唯一支持的源
provider | string | 否 | Kafka 提供者,例如 "aiven"
topics | array | 是 | 要消费的 Kafka 主题列表。请参阅主题配置
connection_params | object | 是 | Kafka 连接参数。请参阅连接参数

连接参数

字段 | 类型 | 必需 | 描述
---|---|---|---
brokers | array | 是 | Kafka 代理地址列表 (例如,["localhost:9092"])。
protocol | string | 是 | Kafka 连接的安全协议 (例如,"SASL_SSL")。
mechanism | string | 是 | 身份验证机制 (例如,"SCRAM-SHA-256")。
username | string | 是 | Kafka 身份验证的用户名。
password | string | 是 | Kafka 身份验证的密码。
root_ca | string | 否 | Kafka 身份验证的 Cert 文件。

主题配置

topics 数组中的每个主题都具有以下配置: 字段 | 类型 | 必需 | 描述
---|---|---|---
name | string | 是 | Kafka 主题的名称。
consumer_group_initial_offset | string | 是 | 消费者组的初始偏移量("earliest" 或 "newest")。
schema | object | 是 | 事件模式定义。请参阅模式配置
deduplication | object | 是 | 去重设置。请参阅去重配置

模式配置

字段 | 类型 | 必需 | 描述
---|---|---|---
type | string | 是 | 模式类型(目前仅支持 "json")。
fields | array | 是 | 字段定义列表。请参阅字段配置

字段配置

字段 | 类型 | 必需 | 描述
---|---|---|---
name | string | 是 | 字段名称。
type | string | 是 | 字段类型 (例如,"String"、"Integer")。

去重配置

字段 | 类型 | 必需 | 描述
---|---|---|---
enabled | boolean | 是 | 是否启用去重。
id_field | string | 是 | 用于消息去重的字段名称。
id_field_type | string | 是 | ID 字段的类型 (例如,"string")。
time_window | string | 是 | 去重的时间窗口 (例如,"1h" 表示一小时)。

Sink 配置

Sink 配置定义了如何连接到 ClickHouse 并向其写入。 字段 | 类型 | 必需 | 描述
---|---|---|---
type | string | 是 | 必须为 "clickhouse"。
host | string | 是 | ClickHouse 服务器主机名。
port | integer | 是 | ClickHouse 服务器端口。
database | string | 是 | ClickHouse 数据库名称。
username | string | 是 | ClickHouse 用户名。
password | string | 是 | ClickHouse 密码。
table | string | 是 | 目标表名称。
secure | boolean | 否 | 是否使用安全连接。默认为 false。
max_batch_size | integer | 否 | 写入之前要批处理的最大记录数。默认为 1000。
max_delay_time | string | 否 | 消息刷新到 sink 之前的最大延迟时间。默认为 "10m"。
table_mapping | array | 是 | 字段到列的映射列表。请参阅表映射配置

表映射配置

table_mapping 数组中的每个映射都具有以下配置: 字段 | 类型 | 必需 | 描述
---|---|---|---
source_id | string | 是 | 源主题的名称。
field_name | string | 是 | 源字段名称。
column_name | string | 是 | 目标列名称。
column_type | string | 是 | 目标列类型。

Join 配置

Join 配置定义了如何 Join 来自多个 Kafka 主题的数据。 字段 | 类型 | 必需 | 描述
---|---|---|---
enabled | boolean | 是 | 是否启用 Join。
type | string | 是 | Join 类型 (例如,"temporal")。
sources | array | 是 | 要 Join 的源列表。请参阅Join 源配置

Join 源配置

sources 数组中的每个源都具有以下配置: 字段 | 类型 | 必需 | 描述
---|---|---|---
source_id | string | 是 | 要 Join 的 Kafka 主题的名称。
join_key | string | 是 | 用于 Join 记录的字段名称。
time_window | string | 是 | 用于 Join 记录的时间窗口 (例如,"1h" 表示一小时)。
orientation | string | 是 | Join 方向("left" 或 "right")。

示例配置

{
 "pipeline_id": "kafka-to-clickhouse-pipeline",
 "source": {
  "type": "kafka",
  "provider": "aiven",
  "connection_params": {
   "brokers": [
    "kafka-broker-0:9092",
    "kafka-broker-1:9092"
   ],
   "protocol": "SASL_SSL",
   "mechanism": "SCRAM-SHA-256",
   "username": "<user>",
   "password": "<password>",
   "root_ca": "<base64 encoded ca>"
  },
  "topics": [
   {
    "consumer_group_initial_offset": "earliest",
    "name": "user_logins",
    "schema": {
     "type": "json",
     "fields": [
      {
       "name": "session_id",
       "type": "string"
      },
      {
       "name": "user_id",
       "type": "string"
      },
      {
       "name": "timestamp",
       "type": "datetime"
      }
     ]
    },
    "deduplication": {
     "enabled": true,
     "id_field": "session_id",
     "id_field_type": "string",
     "time_window": "12h"
    }
   },
   {
    "consumer_group_initial_offset": "earliest",
    "name": "orders",
    "schema": {
     "type": "json",
     "fields": [
      {
       "name": "user_id",
       "type": "string"
      },
      {
       "name": "order_id",
       "type": "string"
      },
      {
       "name": "timestamp",
       "type": "datetime"
      }
     ]
    },
    "deduplication": {
     "enabled": true,
     "id_field": "order_id",
     "id_field_type": "string",
     "time_window": "12h"
    }
   }
  ]
 },
 "join": {
  "enabled": false,
  "type": "temporal",
  "sources": [
   {
    "source_id": "user_logins",
    "join_key": "user_id",
    "time_window": "1h",
    "orientation": "left"
   },
   {
    "source_id": "orders",
    "join_key": "user_id",
    "time_window": "1h",
    "orientation": "right"
   }
  ]
 },
 "sink": {
  "type": "clickhouse",
  "provider": "aiven",
  "host": "<host>",
  "port": "12753",
  "database": "default",
  "username": "<user>",
  "password": "<password>",
  "secure": true,
  "max_batch_size": 1,
  "max_delay_time": "10m",
  "table": "user_orders",
  "table_mapping": [
   {
    "source_id": "user_logins",
    "field_name": "session_id",
    "column_name": "session_id",
    "column_type": "UUID"
   },
   {
    "source_id": "user_logins",
    "field_name": "user_id",
    "column_name": "user_id",
    "column_type": "UUID"
   },
   {
    "source_id": "orders",
    "field_name": "order_id",
    "column_name": "order_id",
    "column_type": "UUID"
   },
   {
    "source_id": "user_logins",
    "field_name": "timestamp",
    "column_name": "login_at",
    "column_type": "DataTime"
   },
   {
    "source_id": "orders",
    "field_name": "timestamp",
    "column_name": "order_placed_at",
    "column_type": "DateTime"
   }
  ]
 }
}

💡 注意 : Web 界面会根据用户输入自动生成此配置,因此无需手动编辑。

贡献

欢迎贡献! 请参阅我们的 Contributing Guidelines 了解详情。

许可证

本项目采用 Apache License 2.0 授权。

关于

Real-time deduplication and temporal joins for streaming data glassflow.dev/

Resources

Readme

License

Apache-2.0 license

Code of conduct

Code of conduct Activity Custom properties

Stars

82 stars

Watchers

3 watching

Forks

2 forks Report repository

Releases 7

v0.0.1 Latest Apr 28, 2025 + 6 releases

Packages 0

No packages published

[Contributors 5](https://github