Show HN: Hatchet v1 – a task orchestration platform built on Postgres
Show HN:Hatchet v1 - 构建于Postgres之上的任务编排平台
大规模运行后台任务
Hatchet Cloud · Documentation · Website · Issues
什么是 Hatchet?
Hatchet 是一个基于 Postgres 构建的后台任务运行平台。无需管理你自己的任务队列或发布/订阅系统,你可以使用 Hatchet 在一组 worker 之间分发你的函数,而无需过多的配置或基础设施。
什么时候应该使用 Hatchet?
后台任务对于从你的主 Web 应用程序卸载工作至关重要。通常,后台任务通过 FIFO(先进先出)队列发送,这有助于防御流量高峰(队列可以吸收大量负载),并确保在任务处理程序出错时重试任务。大多数技术栈都从由 Redis 或 RabbitMQ 支持的基于库的队列开始(例如 Celery 或 BullMQ)。但是,随着你的任务变得越来越复杂,这些队列变得难以调试、监控,并开始以意想不到的方式失败。
这就是 Hatchet 的用武之地。Hatchet 是一个功能齐全的后台任务管理平台,内置支持将复杂的任务链接到工作流中,在发生故障时发出警报,使任务更持久,并在实时 Web 仪表板中查看任务。
特点
📥 队列
Hatchet 构建于一个持久的任务队列之上,该队列对你的任务进行排队,并以你的 worker 可以处理的速率将它们发送到你的 worker。Hatchet 将跟踪你的任务的进度,并确保工作完成(或者你收到警报),即使你的应用程序崩溃。
这对于以下情况尤其有用:
- 确保你永远不会丢弃用户请求
- 削平应用程序中的大型峰值
- 将大型、复杂的逻辑分解为更小、可重用的任务
Python
# 1. 定义你的任务输入
class SimpleInput(BaseModel):
message: str
# 2. 使用 hatchet.task 定义你的任务
@hatchet.task(name="SimpleWorkflow", input_validator=SimpleInput)
def simple(input: SimpleInput, ctx: Context) -> dict[str, str]:
return {
"transformed_message": input.message.lower(),
}
# 3. 在你的 worker 上注册你的任务
worker = hatchet.worker("test-worker", workflows=[simple])
worker.start()
# 4. 从你的应用程序调用任务
simple.run(SimpleInput(message="Hello World!"))
Typescript
// 1. 定义你的任务输入
export type SimpleInput = {
Message: string;
};
// 2. 使用 hatchet.task 定义你的任务
export const simple = hatchet.task({
name: "simple",
fn: (input: SimpleInput) => {
return {
TransformedMessage: input.Message.toLowerCase(),
};
},
});
// 3. 在你的 worker 上注册你的任务
const worker = await hatchet.worker("simple-worker", {
workflows: [simple],
});
await worker.start();
// 4. 从你的应用程序调用任务
await simple.run({
Message: "Hello World!",
});
Go
// 1. 定义你的任务输入
type SimpleInput struct {
Message string `json:"message"`
}
// 2. 使用 factory.NewTask 定义你的任务
simple := factory.NewTask(
create.StandaloneTask{
Name: "simple-task",
}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
return &SimpleResult{
TransformedMessage: strings.ToLower(input.Message),
}, nil
},
hatchet,
)
// 3. 在你的 worker 上注册你的任务
worker, err := hatchet.Worker(v1worker.WorkerOpts{
Name: "simple-worker",
Workflows: []workflow.WorkflowBase{
simple,
},
})
worker.StartBlocking()
// 4. 从你的应用程序调用任务
simple.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
🎻 任务编排
Hatchet 允许你构建由多个任务组成的复杂工作流。例如,如果你想将工作负载分解为更小的任务,你可以使用 Hatchet 创建一个扇出工作流,该工作流并行生成多个任务。
Hatchet 支持以下任务编排机制:
-
DAGs (有向无环图) — 预先定义你的工作形状,自动将父任务的输出路由到子任务的输入。阅读更多 ➶
-
Durable tasks (持久化任务) — 这些任务负责编排其他任务。它们存储所有生成的任务的完整历史记录,允许你缓存中间结果。阅读更多 ➶
-
Python
# 1. 定义一个工作流(工作流是任务的集合)
simple = hatchet.workflow(name="SimpleWorkflow")
# 2. 将第一个任务附加到工作流
@simple.task()
def task_1(input: EmptyModel, ctx: Context) -> dict[str, str]:
print("executed task_1")
return {"result": "task_1"}
# 3. 将第二个任务附加到工作流,该任务在 task_1 之后执行
@simple.task(parents=[task_1])
def task_2(input: EmptyModel, ctx: Context) -> None:
first_result = ctx.task_output(task_1)
print(first_result)
# 4. 从你的应用程序调用工作流
result = simple.run(input_data)
Typescript
// 1. 定义一个工作流(工作流是任务的集合)
const simple = hatchet.workflow<DagInput, DagOutput>({
name: "simple",
});
// 2. 将第一个任务附加到工作流
const task1 = simple.task({
name: "task-1",
fn: (input) => {
return {
result: "task-1",
};
},
});
// 3. 将第二个任务附加到工作流,该任务在 task-1 之后执行
const task2 = simple.task({
name: "task-2",
parents: [task1],
fn: (input, ctx) => {
const firstResult = ctx.getParentOutput(task1);
console.log(firstResult);
},
});
// 4. 从你的应用程序调用工作流
await simple.run({ Message: "Hello World" });
Go
// 1. 定义一个工作流(工作流是任务的集合)
simple := v1.WorkflowFactory[DagInput, DagOutput](
workflow.CreateOpts[DagInput]{
Name: "simple-workflow",
},
hatchet,
)
// 2. 将第一个任务附加到工作流
const task1 = simple.Task(
task.CreateOpts[DagInput]{
Name: "task-1",
Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
return &SimpleOutput{
Result: "task-1",
}, nil
},
},
);
// 3. 将第二个任务附加到工作流,该任务在 task-1 之后执行
const task2 = simple.Task(
task.CreateOpts[DagInput]{
Name: "task-2",
Parents: []task.NamedTask{
step1,
},
Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
return &SimpleOutput{
Result: "task-2",
}, nil
},
},
);
// 4. 从你的应用程序调用工作流
simple.Run(ctx, DagInput{})
🚦 流量控制
不要让繁忙的用户使你的应用程序崩溃。使用 Hatchet,你可以基于每个用户、每个租户和每个队列来限制执行,从而提高系统稳定性并限制繁忙用户对系统其余部分的影响。
Hatchet 支持以下流量控制原语:
-
Concurrency(并发) — 根据动态并发键设置并发限制(例如,每个用户在给定时间只能运行 10 个批处理作业)。阅读更多 ➶
-
Rate limiting(速率限制) — 创建全局和动态速率限制。阅读更多 ➶
-
Python
# limit concurrency on a per-user basis
flow_control_workflow = hatchet.workflow(
name="FlowControlWorkflow",
concurrency=ConcurrencyExpression(
expression="input.user_id",
max_runs=5,
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
),
input_validator=FlowControlInput,
)
# rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
@flow_control_workflow.task(
rate_limits=[
RateLimit(
dynamic_key="input.user_id",
units=1,
limit=10,
duration=RateLimitDuration.MINUTE,
)
]
)
def rate_limit_task(input: FlowControlInput, ctx: Context) -> None:
print("executed rate_limit_task")
Typescript
// limit concurrency on a per-user basis
flowControlWorkflow = hatchet.workflow<SimpleInput, SimpleOutput>({
name: "ConcurrencyLimitWorkflow",
concurrency: {
expression: "input.userId",
maxRuns: 5,
limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
},
});
// rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
flowControlWorkflow.task({
name: "rate-limit-task",
rateLimits: [
{
dynamicKey: "input.userId",
units: 1,
limit: 10,
duration: RateLimitDuration.MINUTE,
},
],
fn: async (input) => {
return {
Completed: true,
};
},
});
Go
// limit concurrency on a per-user basis
flowControlWorkflow := factory.NewWorkflow[DagInput, DagResult](
create.WorkflowCreateOpts[DagInput]{
Name: "simple-dag",
Concurrency: []*types.Concurrency{
{
Expression: "input.userId",
MaxRuns: 1,
LimitStrategy: types.GroupRoundRobin,
},
},
},
hatchet,
)
// rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
flowControlWorkflow.Task(
create.WorkflowTask[FlowControlInput, FlowControlOutput]{
Name: "rate-limit-task",
RateLimits: []*types.RateLimit{
{
Key: "user-rate-limit",
KeyExpr: "input.userId",
Units: 1,
LimitValueExpr: 10,
Duration: types.Minute,
},
},
}, func(ctx worker.HatchetContext, input FlowControlInput) (interface{}, error) {
return &SimpleOutput{
Step: 1,
}, nil
},
)
📅 调度
Hatchet 完全支持调度功能,包括 cron、一次性调度和暂停执行一段时间。这对于以下情况尤其有用:
-
Cron schedules(Cron 调度) – 以 cron 调度运行数据管道、批处理或通知系统 阅读更多 ➶
-
One-time tasks(一次性任务) – 安排在未来的特定时间运行工作流 阅读更多 ➶
-
Durable sleep(持久睡眠) – 暂停任务执行一段特定的时间 阅读更多 ➶
-
Python
tomorrow = datetime.today() + timedelta(days=1)
# schedule a task to run tomorrow
scheduled = simple.schedule(
tomorrow,
SimpleInput(message="Hello, World!")
)
# schedule a task to run every day at midnight
cron = simple.cron(
"every-day",
"0 0 * * *",
SimpleInput(message="Hello, World!")
)
Typescript
const tomorrow = new Date(Date.now() + 1000 * 60 * 60 * 24);
// schedule a task to run tomorrow
const scheduled = simple.schedule(tomorrow, {
Message: "Hello, World!",
});
// schedule a task to run every day at midnight
const cron = simple.cron("every-day", "0 0 * * *", {
Message: "Hello, World!",
});
Go
const tomorrow = time.Now().Add(24 * time.Hour);
// schedule a task to run tomorrow
simple.Schedule(ctx, tomorrow, ScheduleInput{
Message: "Hello, World!",
})
// schedule a task to run every day at midnight
simple.Cron(ctx, "every-day", "0 0 * * *", CronInput{
Message: "Hello, World!",
})
🚏 任务路由
虽然 Hatchet 的默认行为是实现 FIFO 队列,但它也支持额外的调度机制来将你的任务路由到理想的 worker。
-
Sticky assignment(粘性分配) — 允许生成的任务倾向于或要求在同一个 worker 上执行。阅读更多 ➶
-
Worker affinity(Worker 亲和性) — 对 worker 进行排名,以发现哪个最适合处理给定的任务。阅读更多 ➶
-
Python
# create a workflow which prefers to run on the same worker, but can be
# scheduled on any worker if the original worker is busy
hatchet.workflow(
name="StickyWorkflow",
sticky=StickyStrategy.SOFT,
)
# create a workflow which must run on the same worker
hatchet.workflow(
name="StickyWorkflow",
sticky=StickyStrategy.HARD,
)
Typescript
// create a workflow which prefers to run on the same worker, but can be
// scheduled on any worker if the original worker is busy
hatchet.workflow({
name: "StickyWorkflow",
sticky: StickyStrategy.SOFT,
});
// create a workflow which must run on the same worker
hatchet.workflow({
name: "StickyWorkflow",
sticky: StickyStrategy.HARD,
});
Go
// create a workflow which prefers to run on the same worker, but can be
// scheduled on any worker if the original worker is busy
factory.NewWorkflow[StickyInput, StickyOutput](
create.WorkflowCreateOpts[StickyInput]{
Name: "sticky-dag",
StickyStrategy: types.StickyStrategy_SOFT,
},
hatchet,
);
// create a workflow which must run on the same worker
factory.NewWorkflow[StickyInput, StickyOutput](
create.WorkflowCreateOpts[StickyInput]{
Name: "sticky-dag",
StickyStrategy: types.StickyStrategy_HARD,
},
hatchet,
);
⚡️ 事件触发器和监听器
Hatchet 支持基于事件的架构,其中任务和工作流可以暂停执行,同时等待特定的外部事件。它支持以下功能:
-
Event listening(事件监听) — 可以暂停任务,直到触发特定事件。阅读更多 ➶
-
Event triggering(事件触发) — 事件可以触发新的工作流或工作流中的步骤。阅读更多 ➶
-
Python
# Create a task which waits for an external user event or sleeps for 10 seconds
@dag_with_conditions.task(
parents=[first_task],
wait_for=[
or_(
SleepCondition(timedelta(seconds=10)),
UserEventCondition(event_key="user:event"),
)
]
)
def second_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
return {"completed": "true"}
Typescript
// Create a task which waits for an external user event or sleeps for 10 seconds
dagWithConditions.task({
name: "secondTask",
parents: [firstTask],
waitFor: Or({ eventKey: "user:event" }, { sleepFor: "10s" }),
fn: async (_, ctx) => {
return {
Completed: true,
};
},
});
Go
// Create a task which waits for an external user event or sleeps for 10 seconds
simple.Task(
conditionOpts{
Name: "Step2",
Parents: []create.NamedTask{
step1,
},
WaitFor: condition.Conditions(
condition.UserEventCondition("user:event", "'true'"),
condition.SleepCondition(10 * time.Second),
),
}, func(ctx worker.HatchetContext, input DagWithConditionsInput) (interface{}, error) {
// ...
},
);
🖥️ 实时 Web UI
Hatchet 捆绑了许多功能,可帮助你监控任务、工作流和队列。
实时仪表板和指标
通过实时更新监控你的任务、工作流和队列,以快速检测问题。内置警报,以便你可以尽快响应问题。
日志记录
Hatchet 支持从你的任务进行日志记录,允许你轻松地将任务失败与系统中的日志相关联。不再需要在日志服务中挖掘以找出任务失败的原因。
警报
Hatchet 支持 Slack 和基于电子邮件的警报,用于在任务失败时发出警报。警报是实时的,具有可调整的警报窗口。
快速开始
Hatchet 可作为云版本或自托管版本提供。请参阅以下文档以快速启动并运行:
文档
最新的文档可以在 https://docs.hatchet.run 找到。
社区和支持
- Discord - 最适合与维护者联系并与社区交流
- Github Issues - 用于提交错误报告
- Github Discussions - 用于发起适合异步通信的深入技术讨论
- Email - 最适合获得 Hatchet Cloud 支持以及账单、数据删除等方面的帮助。
Hatchet vs...
Hatchet vs Temporal
Hatchet 旨在成为一个通用的任务编排平台——它可以作为队列、基于 DAG 的编排器、持久执行引擎或所有三个。因此,Hatchet 涵盖了更广泛的用例,例如多种排队策略、速率限制、DAG 功能、条件触发、流式传输功能等等。
Temporal 专注于持久执行,并支持更广泛的数据库后端和结果存储,例如 Apache Cassandra、MySQL、PostgreSQL 和 SQLite。
何时使用 Hatchet: 当你希望更好地控制底层队列逻辑、运行基于 DAG 的工作流,或者希望通过仅运行 Hatchet 引擎和 Postgres 来简化自托管时。
何时使用 Temporal: 当你希望使用非 Postgres 结果存储,或者你的唯一工作负载最适合持久执行时。
Hatchet vs 任务队列 (BullMQ, Celery)
Hatchet 是一个持久的任务队列,这意味着它会持久保存所有执行的历史记录(最多保留期限),这使得易于监控 + 调试,并为上述许多持久性功能提供支持。这不是 Celery 和 BullMQ 的标准行为(你需要依赖第三方 UI 工具,这些工具的功能非常有限,例如 Celery Flower)。
何时使用 Hatchet: 当你希望结果被持久保存并在 UI 中可见时
何时使用任务队列库(如 BullMQ/Celery): 当你需要非常高的吞吐量 (>10k/s) 而无需保留,或者当你希望使用单个库(而不是像 Hatchet 这样的独立服务)与你的队列交互时。
Hatchet vs 基于 DAG 的平台 (Airflow, Prefect, Dagster)
这些工具通常是为数据工程师设计的,并且并非旨在作为高容量应用程序的一部分运行。它们通常具有更高的延迟和更高的成本,其主要卖点是与常见数据存储和连接器的集成。
何时使用 Hatchet: 当你希望使用基于 DAG 的框架、编写你自己的集成和函数,并且需要更高的吞吐量 (>100/s) 时
何时使用其他基于 DAG 的平台: 当你希望使用开箱即用的其他数据存储和连接器时
Hatchet vs AI 框架
大多数 AI 框架都是为了在内存中运行而构建的,将横向扩展和持久性作为事后考虑。虽然你可以将 AI 框架与 Hatchet 结合使用,但我们的大多数用户都会丢弃他们的 AI 框架并使用 Hatchet 的原语来构建他们的应用程序。
何时使用 Hatchet: 当你希望完全控制底层函数和 LLM 调用,或者你要求函数具有高可用性和持久性时。
何时使用 AI 框架: 当你希望快速开始使用简单的抽象时。
Issues
请通过 Github issues 提交你遇到的任何错误。
我想贡献
请在 Discord 上的 #contributing 频道中告诉我们你感兴趣的工作。这将有助于我们塑造项目的方向,并使协作更加容易!