大规模对象存储的垃圾回收 (Garbage Collection)
PricingCompany
Product
Bring Your Own CloudDeploy in any cloud or self-host, using your own compute and object storage.Managed Data PipelinesZero ops. Powerful, scalable stream processing in your cloud.OrbitOffset-preserving replication from any source Apache Kafka cluster.Data GovernanceSchema registry, linking, and validation in your own cloud account.
BlogDocsDocsContactExecutive Summary
WarpStream Executive Summary
Sign instart freeGet started free
Engineering
清理垃圾:大规模对象存储的垃圾回收
2025年4月10日 Richard Artoul Link copied!Subscribe HN 免责声明: WarpStream 销售一种直接构建于对象存储之上的 Apache Kafka 即插即用替代品。
在过去的 10 年里,我基于对象存储构建了多个分布式系统,其中 WarpStream 是最新的一个。所有这些系统都存在一个共同的问题:我们需要花费大量时间来解决一个看似简单的问题:从对象存储中删除由于数据过期或压缩而逻辑删除的文件。
我在 “共享存储案例”博客文章 中更详细地讨论了这一点,但简而言之:我构建的每个共享存储系统都类似于以下结构:
客户端与无状态节点交互(这些节点可能分为不同的“角色”)。 无状态节点抽象出一个共享存储后端(如对象存储)和一个强一致性元数据存储,以创建某种逻辑抽象,在 WarpStream 的例子中,就是 Apache Kafka 协议。
WarpStream 文件最终会在元数据存储中被逻辑删除,因此需要从对象存储中物理删除,有以下几种方式:
由于配置的 topic TTL,文件中的所有数据都已过期:↴
由于显式 topic 删除,文件中的所有数据都已删除:↴
该文件参与了压缩,并在压缩中被逻辑删除:↴
在本文的其余部分,我将介绍使用 延迟队列、异步协调 或 两者结合 来解决此问题的几种不同方法。 但是,在介绍我认为解决此问题的最佳方法之前,让我们首先介绍一些看起来很明显,但在实践中效果不佳的方法,例如 bucket policies 和 同步删除。
为什么不直接使用 Bucket Policy?
处理对象存储清理的最简单方法是使用具有可配置 TTL 的 bucket policy。 例如,我们可以配置一个对象存储策略,该策略会自动删除超过 7 天的文件。 对于简单的或面向时间序列的系统,这通常是一个不错的解决方案。
但是,对于像 WarpStream 这样更复杂的系统,它必须提供 Apache Kafka 的抽象,这种方法行不通。 例如,考虑一个具有数百或数千个不同 topic 的 WarpStream 集群。 一些 topic 可以配置为低至 1 小时的保留期,而另一些 topic 可以配置为高达 90 天的保留期。 如果我们依赖于简单的 bucket policy,那么我们将不得不将 bucket policy 配置为 至少 90 天,这将导致具有较低保留期的 topic 产生过高的存储成本,因为 WarpStream 文件可以包含许多不同 topic 的数据。
即使我们乐于要求单个集群中的所有 topic 共享一个保留期,Kafka 中的其他实现细节和功能也无法通过简单的对象存储 bucket policy 来实现。 例如,Kafka 具有一个名为“压缩 topic”的功能。 在压缩 topic 中,记录不会在它们太旧时被删除/过期,而是在_它们被具有相同 key 的新记录覆盖时_。 记录可能在首次写入后的几秒钟内被覆盖,也可能在几年后被覆盖。
不幸的是,bucket policy 仅作为清理最简单用例的对象存储文件的机制。 需要提供更高级功能的共享存储系统必须在系统本身中实现对象清理。
为什么不直接使用同步删除?
天真地看,似乎每当元数据存储决定逻辑删除一个文件时,它应该能够同时去物理地从对象存储中删除该文件,从而使两个系统保持同步:
// Tada.
metadataStore.DeleteFile(fileID)
objectStore.DeleteFile(fileID)
在传统的编程语言理论中,这种垃圾回收方法类似于“引用跟踪”。 但是分布式系统不是编程语言,上面的代码在现实世界中不起作用:
if err := metadataStore.DeleteFile(fileID); err != nil {
// 这没关系,我们可以稍后重试。
}
if err := objectStore.DeleteFile(fileID); err != nil {
// 糟糕。 该文件将永远在对象存储中成为孤儿。
}
如果该文件已从元数据存储中成功删除,但未从对象存储中删除(因为节点崩溃、我们收到 500 错误等),那么该文件将在对象存储中_变为孤儿_。
孤儿文件是指物理存在于对象存储中,但在逻辑上未在元数据存储中跟踪,因此不属于分布式数据库的文件。 这是一个问题,因为这些孤儿文件会随着时间的推移而积累,并花费你很多钱。
但实际上,即使两个删除操作以某种方式原子地成功,这种方法不起作用的另一个原因是:正在进行的查询。 共享存储系统中的查询生命周期通常分两个步骤进行:
- 查询元数据存储以查找相关文件。
- 在相关文件上执行查询。
如果在步骤 1 中返回文件后但在步骤 2 完成之前物理删除了文件,那么该查询将失败,因为其查询计划具有对不再存在文件的引用。
为了使其具体化,假设 WarpStream 中消费者 Fetch 请求的生命周期,该消费者尝试读取一个名为 logs 的 topic 的分区 2,并且要读取的下一个 offset 是 300:
- WarpStream Agent 将查询元数据存储,以查找哪个文件包含从 logs topic 的分区 2 的 offset 300 开始的数据批次。 在此示例中,元数据存储返回文件 ID 451。
- 接下来,WarpStream Agents 将使用从元数据存储返回的文件元数据作为索引,从文件 451 中读取数据。
但是,WarpStream Agents 也 运行压缩。 假设在步骤 1 和步骤 2 之间,文件 451 参与了压缩。 文件 451 在逻辑上将不再存在,并且它包含的 logs topic 的分区 2 的数据现在将位于完全不同的文件(例如 936)中。
如果在压缩后立即删除文件 451,则步骤 2 很可能会失败,因为元数据存储告诉 Agent 读取的文件不再物理存在。
然后,Agent 将不得不 再次 查询元数据存储以查找要读取的新文件,并希望在它可以完成运行 Fetch 请求之前,该文件这次没有再次被压缩。 这将是浪费的,并且会增加延迟。
相反,如果通过压缩逻辑删除的文件在对象存储中继续存在一段时间,以便正在进行的查询可以继续使用它们,那将会更好。
方法 #1:延迟队列
既然我们已经研究了两种不起作用的方法,那么让我们解释一种有效的方法。 解决此类问题的规范解决方案是引入延迟队列:从元数据存储中删除的文件首先会被持久地排队,然后在经过足够的延迟后才会被删除,以避免中断实时查询。 但是,使用外部队列会引入与同步删除相同的问题:如果文件从元数据存储中删除,但随后入队操作失败,则该文件将在对象存储中变为孤儿。
幸运的是,我们不必使用外部队列。 共享存储系统中元数据的后备数据库几乎总是一个具有强一致性和事务保证的数据库。 WarpStream 也是如此。 因此,我们可以使用这些事务属性来删除元数据存储中的文件,并将其添加到在单个原子操作中,元数据存储本身中的延迟队列:
if err := metadataStore.DeleteFileAndEnqueueForDeletion(fileID); err != nil {
// 这没关系,我们可以稍后重试。
}
通过这种方法,永远不会引入孤儿文件(除非实现中存在错误),并且我们没有添加任何额外的依赖项或潜在的故障模式。 双赢!
当然,上面的声明中有一个很大的 if:它假设实现中没有错误,并且我们永远不会意外地使文件成为孤儿。 事实证明,在整个项目的生命周期中,这都是一个难以维护的恒定条件。
当然,即使你从未在系统中引入任何导致某些文件成为孤儿的错误,延迟文件删除也很重要的另一个原因是:灾难恢复。 假设出现问题:损坏的数据进入系统,有人错误地删除了重要数据,或者元数据存储本身以某种灾难性的方式发生故障。
元数据存储本身由一个实际的数据库支持,因此可以从快照或备份还原它以从数据丢失中恢复。 但是,只有在备份引用的所有文件 仍然存在于对象存储中 的情况下,还原元数据存储的备份才有效。
因此,在元数据存储中逻辑删除文件和从对象存储中物理删除文件之间的延迟量充当了可以还原的备份的最早期限的硬性边界!
方法 #2:异步协调
除了延迟队列方法之外,另一个有效的解决方案是使用异步协调。 在共享存储系统中,元数据存储始终是系统中存在的数据和文件的真实来源。 这意味着,从对象存储中清除逻辑删除的文件可以被视为一个协调过程,其中扫描对象存储以识别元数据存储不再跟踪的任何文件。
如果找到未跟踪的文件,则可以安全地从对象存储中删除该文件(考虑到适当的延迟,该延迟足够大以适应实时查询和所需的灾难恢复要求):
for _, file := range objectStore.ListFiles() {
if !metadataStore.Contains(file.FileID) && file.Age() > $DELETION_DELAY {
objectStore.DeleteFile(fileID)
}
}
在传统的编程语言理论中,这种垃圾回收方法类似于“标记和清除”算法。 这种方法更容易正确并保持正确。 对象存储中元数据存储未跟踪的任何文件按定义都是孤儿文件:它不能被查询使用或参与压缩,因此可以安全地删除它。
这种方法的问题是它比以前的方法更昂贵,并且难以调整。 在商品对象存储中列出文件是一项出了名的缓慢且昂贵的操作,很容易导致超出速率限制。 此外,获取文件的使用时长需要对文件发出 HEAD 请求,这也需要花钱。
在我最早使用的共享存储系统中,我们最初使用了延迟队列方法,因为它更容易调整和扩展。 但是,不变的是,我们总是在项目后期添加一个协调循环,该循环 除了 延迟队列系统之外运行,以清除以某种方式遗漏的任何孤儿文件。
在设计 WarpStream 时,我们讨论了从哪种方法开始。 最终,我们决定使用协调方法,尽管它更昂贵且更难以调整,原因有两个:
- 我们迟早需要添加一个,因此我们决定从一开始就构建它。
- 我们的 BYOC 部署模型意味着,如果我们在客户对象存储 bucket 中使文件成为孤儿,我们将不得不以某种方式让他们参与清理它,这让我们觉得无法接受。
我们构建了一个相当复杂的设置,它可以根据集群的观察到的吞吐量自动调整自身。 我们还添加了许多内置的安全措施,以避免触发任何对象存储速率限制。 例如,WarpStream 的协调扫描程序会自动将其 LIST 和 HEAD 请求尽可能均匀地分布在对象存储的所有前缀中。 这显着降低了受到速率限制的风险,因为对象存储速率限制与几乎每个主要实现中的 key 范围/前缀相关联。
整合所有内容
协调循环为 WarpStream 提供了很长时间的服务,但随着我们客户的集群变得更大和更高容量,我们不得不允许协调过程运行得越来越快,这进一步增加了成本。
最终,我们决定是时候一劳永逸地解决这个问题了。 我们从之前的经验中了解到,为了避免定期列出整个 bucket,我们需要跟踪已删除的文件队列,以便稍后删除它们。
我们可以按照我们之前描述的方式将此队列引入到我们的控制平面元数据存储中,但这感觉很浪费。 WarpStream 的元数据存储是一个强一致的数据库,可提供极高的可用性、持久性和一致性保证。 这些是理想的属性,但它们带有字面意义上的成本。 就每字节存储的成本而言,WarpStream 的控制平面元数据存储是堆栈中最昂贵的组件。 这意味着我们只想使用它来存储和跟踪绝对需要保证系统正确性和性能的元数据。
如果我们还没有协调过程,那么元数据存储将是跟踪已删除文件的唯一可行的地方,因为丢失对其中任何一个文件的跟踪将导致永久性的孤儿对象存储文件。 但是由于我们已经有一个协调循环,因此跟踪已删除的文件 ID 只是降低成本的一种优化。 在最坏的情况下,如果我们从删除队列中丢失了一些文件 ID,协调循环将在几个小时内捕获它们并清理文件。
因此,我们决定采用一种略有不同的方法,并在 WarpStream Agent 中创建我们称之为“乐观删除队列”的东西。 每当 WarpStream Agent 完成压缩时,它知道参与压缩的输入文件在控制平面中被逻辑删除,因此应该稍后从对象存储中删除。
压缩完成后,WarpStream Agent 会将删除的文件 ID 插入到一个大型缓冲的 Go 通道(一个大型缓冲队列)中。 在后台运行的单独 goroutine 从通道中提取文件 ID 并等待适当的时间过去,然后从对象存储中物理删除该文件:
// Goroutine 1
err := controlPlane.ApplyCompaction(req)
if err == nil {
delayedDeletionQueue.Submit(inputFileIDs)
}
// Goroutine 2
for _, fileID := range delayedDeletionQueue {
time.Sleep(time.Until(fileID.CreatedAt + $DELETION_DELAY))
if !metadataStore.Contains(file.FileID) {
objectStore.DeleteFile(fileID)
}
}
请注意,这种方法仅适用于作为压缩的一部分删除的文件,而不适用于由于它们包含的所有数据都逻辑过期而被逻辑删除的文件。 我们认为这在实践中意义不大,因为 WarpStream 的存储引擎是一个 log-structured merge tree,因此,压缩应该是已删除文件的最大来源。
这在实践中得到了证实,通过这种新的混合方法,我们发现绝大多数文件都可以在协调循环找到它们之前被删除,从而大大降低了成本和开销。
如果 WarpStream Agent 恰好死机或被重新调度并丢失了对某些计划删除文件的跟踪怎么办? 没有危害,没有犯规,协调循环将在几个小时内检测并清理问题。
在我职业生涯中已经三次以上解决了这个问题之后,我可以自信地说,这现在是我最喜欢的解决方案:它具有高度的可扩展性、廉价且易于推理。
Create a free WarpStream account and start streaming with $400 in free credits.Get Started
Author
Richard Artoul
Co-Founder
Table of Contents
Why Not Just Use a Bucket Policy? Why Not Just Use Synchronous Deletion? Approach #1: Delayed Queue Approach #2: Asynchronous Reconciliation Bringing It All Together Talk to an expert
Subscribe
Subscribe
WarpStream is an Kafka compatible data streaming platform built directly on top of object storage: no inter-AZ networking costs, no disks to manage, and infinitely scalable, all within your VPC.
Join our mailing list to stay up to date
New blogs, product updates, events and more.
Subscribe
SlackDiscordX (Twitter)BlueskyLinkedInFacebookYoutube
PricingCompanyDocumentationBlogContact UsExecutive SummaryConfluent
© 2025 WarpStream. All rights reserved.
Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation.
StatusPrivacy PolicySLASupport PolicyTerms of ServiceTrust CenterSupport
WarpStream S3 Express One Zone Benchmark...
Taking out the Trash: Garbage Collection...
A Trip Down Memory Lane: How We Resolved...
Zero Ops Schema Migration: WarpStream Sc...
WarpStream Diagnostics: Keep Your Data S...
How WarpStream Powers Grafana Labs’ Rede...
Character.AI's Transition to WarpStream
Kafka Transactions Explained (Twice!)
Getting Rid of (Kafka) Noisy Neighbors W...
Introducing WarpStream BYOC Schema Regis...
The Case for Shared Storage
Kafka Replication Without the (Offset) G...
Announcing Schema Validation with AWS Gl...
WarpStream is Dead, Long Live WarpStream
Dealing with rejection (in distributed s...
Announcing WarpStream Schema Validation
The Kafka Metric You're Not Using: Stop ...
Multiple Regions, Single Pane of Glass
Secure by default: How WarpStream’s BYOC...
Announcing Bento, the open source fork o...
Zero Disks is Better (for Kafka)
Pixel Federation Powers Mobile Analytics...
Introducing WarpStream Managed Data Pipe...
Tiered Storage Won’t Fix Kafka
Cloud Disks are (Really!) Expensive
Real-Time Website Security Monitoring wi...
Fancy Stream Processing Made (even more)...
The Original Sin of Cloud Infrastructure
Deterministic Simulation Testing for Our...
Public Benchmarks and TCO Analysis
Kafka as a KV Store: deduplicating milli...
Anatomy of a serverless usage based bill...
S3 Express is All You Need
Unlocking Idempotency with Retroactive T...
Minimizing S3 API Costs with Distributed...
Hacking the Kafka PRoTocOL
Kafka is dead, long live Kafka
Return To Blog
Return To Blog
![](https://analytics.twitter.com/1/i/adsct?bci=4&dv=UTC%26en-US%26Google%20Inc.%26Linux%20x86_64%26255%261080%26600%264%2624%261080%26600%260%26na&eci=3&event=%7B%7D&event_id=ca9c3b56-8b28-46e8-8f66-70f0cf1489d2&integration=gtm&p_id=Twitter&p_user_i