HTTP Feeds

使用简单的 HTTP APIs 实现异步事件流数据复制

HTTP feeds 是一种基于 HTTP 进行事件轮询的最小化规范:

HTTP feeds 可用于异步解耦系统无需消息中间件,例如 Kafka 或 RabbitMQ。

示例

GET /inventory HTTP/1.1
Host: https://example.http-feeds.org

HTTP/1.1 200 OK
Content-Type: application/cloudevents-batch+json
[{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "1c6b8c6e-d8d0-4a91-b51c-1f56bd04c758",
 "time" : "2021-01-01T00:00:01Z",
 "subject" : "9521234567899",
 "data" : {
  "sku": "9521234567899",
  "updated": "2022-01-01T00:00:01Z",
  "quantity": 5
 }
},{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "292042fb-ab04-4653-af90-19a24032bffe",
 "time" : "2021-12-01T00:00:15Z",
 "subject" : "9521234512349",
 "data" : {
  "sku": "9521234512349",
  "updated": "2022-01-01T00:00:12Z",
  "quantity": 0
 }
},{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "fa3e2a22-398c-4d02-ad08-9415e43178e6",
 "time" : "2021-01-01T00:00:22Z",
 "subject" : "9521234567899",
 "data" : {
  "sku": "9521234567899",
  "updated": "2022-01-01T00:00:21Z",
  "quantity": 4
 }
}]

客户端使用上次处理的事件 ID 再次调用。

GET /inventory?lastEventId=fa3e2a22-398c-4d02-ad08-9415e43178e6 HTTP/1.1
Host: https://example.http-feeds.org

HTTP/1.1 200 OK
Content-Type: application/cloudevents-batch+json
[]

空数组表示 feed 的结束。

轮询

客户端可以继续在一个无限循环中轮询以订阅一个 feed。

简单轮询

客户端在一个无限循环中,使用已知的最后一个事件 ID 作为 lastEventId 查询参数来调用端点。如果响应是一个空数组,则客户端到达流的末尾,并等待一段时间以进行另一次调用,从而获取在此期间发生的事件。

伪代码:

endpoint = "https://example.http-feeds.org/inventory"
lastEventId = null
while true:
 try:
  response = GET endpoint + "?lastEventId=" + lastEventId 
  for event in response:
   process event
   lastEventId = event.id
  if response is empty:
   wait N seconds 
 except:
  wait N seconds 

客户端必须持久化最后处理的事件的 id 作为 lastEventId,用于进一步的获取。

客户端的事件处理必须是幂等的(至少一次的传递语义)。id 可以用于幂等性检查。

长轮询

服务器也可以支持长轮询以降低延迟。客户端添加一个 timeout 查询参数来指定等待响应的最长时间(以毫秒为单位)。

客户端伪代码:

endpoint = "https://example.http-feeds.org/inventory"
lastEventId = null
timeout = 5000 // 5000 milliseconds is a good timeout period for long polling
while true:
 try:
  response = GET endpoint + "?lastEventId=" + lastEventId + "&timeout=" + timeout
  for event in response:
   process event
   lastEventId = event.id
  // no client wait step within the loop
 except:
  // Delay the next request only in case of a server error
  wait N seconds

如果没有更新的事件可用,服务器会保持连接打开,直到新事件到达或定义的超时时间到期。 然后,服务器发送响应(带有新事件或空数组),客户端可以立即执行另一次调用。

由于服务器可以通过实现内部事件通知、变更数据捕获触发器或对数据库执行高频率轮询来有效地对新事件做出反应,因此可以提高延迟。

长轮询的成本是服务器需要并发处理更多打开的连接。当连接数超过 10K 连接 时,这可能会成为问题。

Event ID

event.id 用作 lastEventId 以滚动浏览更多事件。 这意味着事件需要强排序才能检索后续事件。

事件也可能从 feed 中删除,例如通过压缩删除。 即使删除了 lastEventId 的事件,服务器也必须仍然尊重原始位置,并且只发送较新的事件。

实现此目的的一种方法是使用按时间排序的 UUIDv6 作为事件 ID(请参见IETF 草案Java-Library)。 这是一种可行的选择,特别是如果只有一个服务器将新事件附加到 feed 中,但是当多个服务器的时钟不同步时,可能会出现问题。

另一种方法是使用数据库序列,该序列用作 event.id 的一部分,并在查询数据库中的下一个批次时进行解释。 示例:event.id 0000001000001::5f8de8ff-30d8-4fab-8f5a-c32f326d6f26 包含一个数据库序列 0000001000001 和一个随机 UUID。

Event Feeds

HTTP feeds 可用于为将不可变的领域事件发布到其他系统的API。

一个 http feed 端点包含属于同一有界上下文的不同事件 type 是很常见的。

Aggregate Feeds

HTTP feeds 可用于为将可变对象(又名聚合,主数据)的数据集合提供给其他系统的API。

聚合通过其 subject 标识。 聚合 feed 必须至少包含每个聚合一次。 每个创建的聚合和每次更新都会导致附加 feed 条目,其中包含完整的当前状态。

Feed 使用者可以订阅聚合 feed以执行近乎实时的数据同步,以构建本地读取模型并在收到新的或更新的数据时触发操作。 当 feed 使用者到达 feed 的末尾时,它具有一致的状态。

服务器应实现压缩,并且可以根据业务需求实现删除

压缩

每个聚合更新都会导致 feed 中添加一个条目。 保持 feed 较小是一种好习惯,这样可以快速同步新客户端。 当 feed 项包含资源的完整当前状态时,同一聚合的较旧 feed 项可能已过时。

因此,从头开始读取 feed 的客户端会在短时间内出现不一致的状态。 为了缓解这种情况,当另一个条目附加到具有相同 subject 的 feed 时,可以从 feed 中删除条目。

示例:subject 9521234567899 有更新。

HTTP/1.1 200 OK
Content-Type: application/cloudevents-batch+json
[{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "1c6b8c6e-d8d0-4a91-b51c-1f56bd04c758",
 "time" : "2021-01-01T00:00:01Z",
 "subject" : "9521234567899",
 "data" : {
  "sku": "9521234567899",
  "updated": "2022-01-01T00:00:01Z",
  "quantity": 5
 }
},{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "292042fb-ab04-4653-af90-19a24032bffe",
 "time" : "2021-12-01T00:00:15Z",
 "subject" : "9521234512349",
 "data" : {
  "sku": "9521234512349",
  "updated": "2022-01-01T00:00:12Z",
  "quantity": 0
 }
},{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "fa3e2a22-398c-4d02-ad08-9415e43178e6",
 "time" : "2021-01-01T00:00:22Z",
 "subject" : "9521234567899",
 "data" : {
  "sku": "9521234567899",
  "updated": "2022-01-01T00:00:21Z",
  "quantity": 4
 }
}]

在压缩运行后,第一个条目消失:

HTTP/1.1 200 OK
Content-Type: application/cloudevents-batch+json
[{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "292042fb-ab04-4653-af90-19a24032bffe",
 "time" : "2021-12-01T00:00:15Z",
 "subject" : "9521234512349",
 "data" : {
  "sku": "9521234512349",
  "updated": "2022-01-01T00:00:12Z",
  "quantity": 0
 }
},{
 "specversion" : "1.0",
 "type" : "org.http-feeds.example.inventory",
 "source" : "https://example.http-feeds.org/inventory",
 "id" : "fa3e2a22-398c-4d02-ad08-9415e43178e6",
 "time" : "2021-01-01T00:00:22Z",
 "subject" : "9521234567899",
 "data" : {
  "sku": "9521234567899",
  "updated": "2022-01-01T00:00:21Z",
  "quantity": 4
 }
}]

删除

某些聚合需要删除,例如由于法规要求。

聚合 feed 使用带有值 DELETEmethod 字段来向之前构建本地读取模型的消费者发出 subject 删除的信号。

当聚合被删除时,服务器必须附加一个带有要删除的 subject 且没有数据内容的 DELETE 条目。

{"specversion":"1.0","type":"org.http-feeds.example.inventory","source":"https://example.http-feeds.org/inventory","id":"06b13630-e4c3-4d85-a669-ce66fc4daa75","time":"2021-12-31T00:00:01Z","subject":"9521234567899","method":"DELETE"}

客户端必须删除此聚合或以其他方式处理删除。

服务器应该在之后启动压缩运行,以删除同一聚合的先前条目。

数据模型

HTTP feed 端点必须支持以下查询参数:

查询参数 | 类型 | 强制性 | 描述 ---|---|---|--- lastEventId | String | 可选 | 上次处理的事件 id。 用于滚动浏览更多事件。 可能缺少或 null,表示从 feed 的开头开始。 timeout | Number | 可选 | 设置超时,当长轮询应该使用并且被服务器支持时。 长轮询的最长等待时间(以毫秒为单位),之后服务器必须发送响应。 一个典型的值是 5000

响应 body 包含一个事件数组,这些事件符合 application/cloudevents-batch+json 格式中的 CloudEvents Specification

字段 | 类型 | Event Feed | Aggregate Feed | 描述 ---|---|---|---|--- specversion | String | 强制性 | 强制性 | 当前支持的 CloudEvents 规范版本。 id | String | 强制性 | 强制性 | 此事件的唯一值(例如 UUID)。 它可以用于在下游系统中实现重复数据删除/幂等性处理。 在后续调用中用作 lastEventIdtype | String | 强制性 | 强制性 | 事件的类型。 可以用于指定和反序列化有效负载。 一个 feed 可能包含不同的事件类型。 它应该以反向 DNS 名称作为前缀。 source | String | 强制性 | 强制性 | 创建事件的源系统。 应该是系统的 URI。 time | String | 强制性 | 强制性 | 事件添加时间戳。 ISO 8601 UTC 日期和时间格式。 subject | String | n/a | 强制性 | 用于标识业务对象的键。 它不必在 feed 中是唯一的。 这应该代表一个业务键,例如订单号或 SKU。 用于压缩删除(如果已实现)。 method | String | n/a | 可选 | feed 项在 subject 上执行的 HTTP 等效方法类型。 PUT 表示已创建或更新 subjectDELETE 表示已删除 subject。 默认为 PUTdatacontenttype | String | 可选 | 可选 | 默认为 application/jsondata | Object | 强制性 | 可选 | 项的有效负载。 默认为 JSON。 可能缺少,例如当方法为 DELETE 时。

可以添加其他元数据,例如用于可追溯性。

认证

HTTP feeds 可以使用 HTTP authentication 进行保护。

最常见的身份验证方案是 BasicBearer

服务器可以根据主体筛选 feed 项。 当应用筛选时,缓存可能不可行。

缓存

当一个批次已满且不再会被修改时,服务器可以设置适当的响应头,例如 Cache-Control: public, max-age=31536000

库和示例

关于

此站点由 Jochen Christ 维护。 Jochen 在 INNOQ 担任技术负责人,也是 WhichJDK.comRemote Mob ProgrammingData Mesh Architecture 的作者。

HTTP feeds 由 rest-feeds 演变而来。

非常感谢大家的贡献,例如使用不同语言或框架的库或示例会有很大帮助。

发现错误或缺少某些内容? 请提出问题创建拉取请求

本规范根据 CC BY 4.0 发布。