构建幂等的 Email API:利用 River Unique Jobs

DocsFeaturesBlogAbout River ProRiver on GitHub Theme All articles Blake Gentry

2025 年 3 月 24 日

我们使用邮件服务来发送各种与 River 相关的通知。 尽管我们尽了最大努力设置 SPF、DKIM 和 DMARC,但我们仍然收到邮件进入垃圾邮件的报告,因此我们开始研究其他的邮件提供商,以了解它们的情况。 在研究过程中,我们注意到一个令人震惊的现象:极少数邮件提供商具有 API 工具来保证幂等性——这令人惊讶,因为电子邮件是一个非常敏感的主题。 也许没有信用卡扣款那么重要,但如果用户没有收到他们期望的重要电子邮件,或者意外地收到了两封,至少会让人感到烦恼。

如果在两个服务之间发生通信错误(例如 River 与其邮件服务通信以发送电子邮件),客户端无法确定其请求是否已成功传输,因此它必须重试以确保操作成功。 最好使用支持幂等性的 API 来实现这一点,以便第一次之后的请求不会重复执行原始操作。

在这里,我们将演示如何使用 River 来提供一个玩具版本的邮件 API,它是幂等安全的。 我们选择邮件作为我们的测试对象,相同的技术广泛适用于许多需要良好幂等行为的地方——确认付款、注册域名、提交 AI prompt 或任何其他用途。

什么是 Idempotency Key?

一些 HTTP 动词(如 GETPUTDELETE)旨在具有固有的幂等性,重复调用不应产生副作用。 这些动词涉及资源读取、替换或删除,并且稍微注意一下,幂等实现对于这些类型的 CRUD 操作通常非常简单:

Terminal window


# reads are usually idempotent with no additional work
GET/contacts/123
# deletes may return an error on repeated calls, but generally have no
# additional side effects, and are therefore considered idempotent
DELETE/contacts/123

PUTDELETE 这样的修改操作比读取操作需要更加谨慎,但特别是在使用像 Postgres 这样的 ACID 数据库时,只要所有更改都在一个事务中发生,通常很容易实现幂等性。

但并非所有操作都如此简单。 当请求具有无法限制在事务中的副作用时,API 需要提供一些其他机制来保证幂等性。 我们的玩具电子邮件服务将尝试通过 SMTP 发送邮件,而这没有任何事务性可言。

像 Stripe 这样的 API 开创了一个名为 "Idempotency Key" 的概念,它是一个与请求一起传输的唯一值:

Terminal window


curl-XPOSThttps://api.stripe.com/v1/customers\
-H"Idempotency-Key: KG5LxwFBepaKHyUD"

如果结果不确定,客户端将使用与第一次相同的 Idempotency Key 重试。 如果服务器第一次看到该密钥,则请求将正常执行。 如果之前已经看到过该密钥,则请求会响应一个有效结果,告知客户端可以停止重试。

定义 River Job

让我们编写一个 River Job 来发送电子邮件。 在现实生活中,发送电子邮件很复杂,发送者必须担心 SFP、DKIM、DMARC、过滤、源 IP 和声誉,但为了演示的目的,我们将展示最简单的 SMTP 发送。 我们试图展示的是幂等性的使用,无论我们的邮件代码变得多么复杂,它都将保持不变。

定义 job args 和 worker:


typeSendEmailArgsstruct{
AccountID   uuid.UUID`json:"account_id"   river:"unique"`
Body      string`json:"body"      river:"-"`
EmailRecipient string`json:"email_recipient" river:"-"`
EmailSender  string`json:"email_sender"  river:"-"`
IdempotencyKey uuid.UUID`json:"idempotency_key" river:"unique"`
Subject    string`json:"subject"     river:"-"`
}
func(SendEmailArgs)Kind()string{return"send_email"}
func(SendEmailArgs)InsertOpts()river.InsertOpts{
returnriver.InsertOpts{
UniqueOpts:river.UniqueOpts{
ByArgs:true,
},
}
}


typeSendEmailWorkerstruct{
river.WorkerDefaults[SendEmailArgs]
smtpHost, smtpPass, smtpUser string
}
func(w *SendEmailWorker)Work(ctxcontext.Context,job*river.Job[SendEmailArgs])error{
// This will probably too simple to work in reality, but is here to
// demonstrate the basic shape of what sending an email would look like.
var(
auth  = smtp.PlainAuth("", w.smtpUser, w.smtpPass, w.smtpHost)
message =[]byte(fmt.Sprintf("To: %s\r\n"+
"Subject: %s\r\n"+
"\r\n"+
"%s\r\n",
job.Args.EmailRecipient,
job.Args.Subject,
job.Args.Body,
))
)
return smtp.SendMail(w.smtpHost, auth, job.Args.EmailSender,[]string{job.Args.EmailRecipient}, message)
}

Job args 指定如何将 Job 序列化到数据库,以及元插入行为(如其唯一性处理)。 Workers 通过实现 Work 函数来决定 Job 在被出队后做什么。

通过 Unique Arg 实现幂等性

我们可以通过多种方式实现幂等性,但为了方便起见,我们将利用 River 的内置系统 for unique jobs。 它的内部机制相当复杂,但在最基本的层面上,它通过从 Job 的唯一属性构建一个唯一键,并尝试使用 Postgres 中的唯一索引来 upsert 该 Job 来运作:


CREATEUNIQUE INDEXriver_job_kind_unique_key_idx
ON river_job (kind, unique_key)
WHERE unique_key IS NOT NULL;

在上面的 Job 示例中,唯一性基于 Job kind(只有其他 SendEmailArgs 才有资格进行比较)、经过身份验证的帐户 ID (AccountID) 和 Idempotency Key (IdempotencyKey)。 其他 Job 属性(如 EmailRecipient)不影响唯一性,这是一个安全功能,我们稍后会介绍。

按帐户分片

仅基于传入的 Idempotency Key 来确定唯一性是一个诱人的错误,但我们必须记住,Idempotency Key 是用户指定的,因此帐户之间可能存在冲突。 希望所有负责任的用户都将生成 V4 UUID,这些 UUID 在统计上永远不会与其他任何内容发生冲突,但错误或恶意行为可能导致重复的密钥,此时将它们限定到特定帐户变得很重要,因此我们的唯一性基于 AccountID + IdempotencyKey


typeSendEmailArgsstruct{
AccountID   uuid.UUID`json:"account_id"   river:"unique"`
Body      string`json:"body"      river:"-"`
EmailRecipient string`json:"email_recipient" river:"-"`
EmailSender  string`json:"email_sender"  river:"-"`
IdempotencyKey uuid.UUID`json:"idempotency_key" river:"unique"`
Subject    string`json:"subject"     river:"-"`
}

参见 AccountIDIdempotencyKey 上的 river:"unique" 注释,它告诉 River 只考虑这两个字段的唯一性。

Unique Keys 通过哈希传入的 Unique 属性和 args 生成,因此输入长度几乎无关紧要,并且不会对性能产生任何可衡量的影响。

填充 API 的其余部分

接下来,填充一个 HTTP 处理程序,它将充当我们的 API endpoint。 您可以在 project's GitHub repo 中找到完整的工作代码。

定义一个 APIService,它将包含一个 River client 和 API service handlers。 它获取一个事务 begin 引用,以便测试可以轻松地注入一个 test transaction,并且它可以将自己挂载到 http.ServeMux 上:


typeAPIServicestruct{
begin    func(ctxcontext.Context)(pgx.Tx,error)
riverClient *river.Client[pgx.Tx]
}
func(s *APIService)ServeMux()*http.ServeMux{
mux := http.NewServeMux()
mux.Handle("POST /emails",MakeHandler(s.EmailCreate))
return mux
}

一个 "create email" 服务处理程序接受输入参数并将 Job 排队:


typeHandleEmailCreateRequeststruct{
AccountID   uuid.UUID`json:"account_id"   validate:"required"`
Body      string`json:"body"      validate:"required"`
EmailRecipient string`json:"email_recipient" validate:"required"`
EmailSender  string`json:"email_sender"  validate:"required"`
IdempotencyKey uuid.UUID`json:"idempotency_key" validate:"required"`
Subject    string`json:"subject"     validate:"required"`
}
typeHandleEmailCreateResponsestruct{
Message string`json:"message"`
}
func(s *APIService)EmailCreate(ctxcontext.Context,req*HandleEmailCreateRequest)(*HandleEmailCreateResponse,error){
tx, err := s.begin(ctx)
if err !=nil{
returnnil, err
}
deferfunc(){ _ = tx.Rollback(ctx)}()
insertRes, err := s.riverClient.InsertTx(ctx, tx,SendEmailArgs{
AccountID:   req.AccountID,
Body:      req.Body,
EmailRecipient: req.EmailRecipient,
EmailSender:  req.EmailSender,
IdempotencyKey: req.IdempotencyKey,
Subject:    req.Subject,
},nil)
if err !=nil{
returnnil, err
}
if err := tx.Commit(ctx); err !=nil{
returnnil, err
}
if insertRes.UniqueSkippedAsDuplicate {
// see the "Reacting to duplicates" section below
}
return&HandleEmailCreateResponse{Message:"Email has been queued for sending."},nil
}

一个 run 函数初始化一个 River client 并将 APIService 挂载到 net/http.Server 上:


dbPool, err := pgxpool.New(ctx, config.DatabaseURL)
if err !=nil{
return err
}
riverClient, err := river.NewClient(riverpgxv5.New(dbPool),&river.Config{
Queues:map[string]river.QueueConfig{
river.QueueDefault:{MaxWorkers:100},
},
Workers:makeWorkers(&config),
})
if err !=nil{
return err
}
server :=&http.Server{
Addr:":8080",
Handler:(&APIService{
begin:    dbPool.Begin,
riverClient: riverClient,
}).ServeMux(),
}
fmt.Printf("Listening on %s\n", server.Addr)
if err := server.ListenAndServe(); err !=nil{
return err
}

对重复项做出反应

当由于已经存在具有相同 Unique Key 的 Job 而跳过插入时,River 的 API 会告诉我们。 编写 API 实现以了解这些潜在的 no-ops,并在它们发生时返回更具体的消息:


if insertRes.UniqueSkippedAsDuplicate {
if insertRes.Job.State == rivertype.JobStateCompleted {
return&HandleEmailCreateResponse{Message:"Email has been sent."},nil
}
return&HandleEmailCreateResponse{Message:"Email was already queued and is pending send."},nil
}

参见 rivertype.JobInsertResult.

这是我们需要编写的唯一幂等性逻辑,因为 River 会自动完成幕后的大部分繁重工作。

幂等调用的示例

启动 API 服务器并使用 cURL 发送电子邮件:

Terminal window


curl-i-XPOSThttp://localhost:8080/emails-d'{
"account_id":"bb381da5-8275-41f2-9238-4afaf9f8e359",
"body":"Hello from email demo.",
"email_recipient":"receiver@example.com",
"email_sender":"sender@example.com",
"idempotency_key":"d8923851-4bc5-45ba-a9fa-077ed8755ef1",
"subject":"Hello."
}'
HTTP/1.1200OK
Date:Sat,22Mar202506:41:22GMT
Content-Length:48
Content-Type:text/plain; charset=utf-8
{"message":"Email has been queued for sending."}

后台 worker 尚未启动,因此 Job 不会立即被处理,但后续请求仍然会检测到它的存在。 再次尝试该请求,看看 API 是否注意到 Job 已经排队:

Terminal window


curl-i-XPOSThttp://localhost:8080/emails-d'{
"account_id":"bb381da5-8275-41f2-9238-4afaf9f8e359",
"body":"Hello from email demo.",
"email_recipient":"receiver@example.com",
"email_sender":"sender@example.com",
"idempotency_key":"d8923851-4bc5-45ba-a9fa-077ed8755ef1",
"subject":"Hello."
}'
HTTP/1.1200OK
Date:Sat,22Mar202506:41:29GMT
Content-Length:59
Content-Type:text/plain; charset=utf-8
{"message":"Email was already queued and is pending send."}

Idempotency Key 的重用和过期

River 会将已完成的 Job 保留一段时间,以便用于分析或检查。 24 小时后(可配置),它们将被在群集领导者上运行的 Job 清理器 maintenance service 清理掉。

默认情况下,在此(可配置的)状态集中检查 Job 唯一性:


[]rivertype.JobState{
rivertype.JobStateAvailable,
rivertype.JobStateCompleted,
rivertype.JobStatePending,
rivertype.JobStateRunning,
rivertype.JobStateRetryable,
rivertype.JobStateScheduled,
}

即使邮件 Job 运行并设置为 completed,当插入具有相同 Idempotency Key 的新邮件 Job 时,仍然会观察到它。 24 小时后,completed 记录将被删除,并且可以插入具有相同 Key 的新 Job。

24 小时与 like Stripe 等公司的 Idempotency Key 保持有效的持续时间一致。 对于失败的间歇性请求重试多次来说,时间非常充裕,但时间又不会太长,以至于集成开始依赖于 Idempotency Key 的永久可用性。

参数匹配安全性

作为一项额外的安全功能,如果传入参数与创建现有 Job 的参数不匹配,API 将返回一个错误。 参数不匹配可能意味着 Idempotency Key 已被重用,这可能是调用集成中的一个错误,它应该知道:


if insertRes.UniqueSkippedAsDuplicate {
var existingArgs SendEmailArgs
if err := json.Unmarshal(insertRes.Job.EncodedArgs,&existingArgs); err !=nil{
returnnil, err
}
// If incoming parameters don't match those of an already queued job,
// tell the user about it. There's probably a bug in the caller.
if req.Body != existingArgs.Body ||
req.EmailRecipient != existingArgs.EmailRecipient ||
req.EmailSender != existingArgs.EmailSender ||
req.Subject != existingArgs.Subject {
returnnil,&APIError{
Message:"Incoming parameters don't match those of queued email. You may have a bug.",
StatusCode: http.StatusBadRequest,
}
}
if insertRes.Job.State == rivertype.JobStateCompleted {
return&HandleEmailCreateResponse{Message:"Email has been sent."},nil
}
return&HandleEmailCreateResponse{Message:"Email was already queued and is pending send."},nil
}

回到 cURL,我们可以演示如何使用与之前相同的参数,但使用新的 idempotency_key 值绕过唯一性检查并排队新的电子邮件:

Terminal window


curl-i-XPOSThttp://localhost:8080/emails-d'{
"account_id":"bb381da5-8275-41f2-9238-4afaf9f8e359",
"body":"Hello from email demo.",
"email_recipient":"receiver@example.com",
"email_sender":"sender@example.com",
"idempotency_key":"668298b1-b59b-405d-894f-1dde8847e66e", # new key!
"subject":"Hello."
}'
HTTP/1.1200OK
Date:Sat,22Mar202507:01:17GMT
Content-Length:48
Content-Type:text/plain; charset=utf-8
{"message":"Email has been queued for sending."}

简化但达到生产级别

我们构建的 API 显然已经为了演示目的而被简化,但即便如此,它的幂等性逻辑已经比市场上存在多年的某些最大的邮件 API 更复杂。 在大约 300 行代码中,我们构建了一个 API,它可以:

从中得出的教训是,提供幂等性确实需要一些思考,但它完全可以掌握,并且使用像 River 这样的现成产品甚至可以使其变得容易。 您的 API 可以(并且应该!)也拥有它。

目录

  1. 什么是 Idempotency Key?

  2. 定义 River Job

    1. 通过 Unique Arg 实现幂等性
    2. 按帐户分片
  3. 填充 API 的其余部分

    1. 对重复项做出反应
  4. 幂等调用的示例

  5. Idempotency Key 的重用和过期

  6. 参数匹配安全性

  7. 简化但达到生产级别

页脚

GitHub

© 2025 River Software, LLC. All rights reserved.