LLM Workflows then Agents: Getting Started with Apache Airflow
从 LLM Workflows 到 Agents:使用 Apache Airflow 入门
[ astronomer ](https://github.com/astronomer/</astronomer>) / **[airflow-ai-sdk](https://github.com/astronomer/</astronomer/airflow-ai-sdk>) ** Public
一个基于 Pydantic AI,用于从 Apache Airflow 使用 LLM 和 AI Agents 的 SDK。
许可协议
100 stars 5 forks Branches Tags Activity
astronomer/airflow-ai-sdk
main
文件夹和文件
| 名称 | 名称 | 上次提交信息 | 上次提交日期 | | :------------------------------------------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------------------------------------------------------------------------------ | :------- | :------- | | .github/workflows | .github/workflows | | | | airflow_ai_sdk | airflow_ai_sdk | | | | examples | examples | | | | tests | tests | | | | .gitignore | .gitignore | | | | .python-version | .python-version | | | | LICENSE | LICENSE | | | | README.md | README.md | | | | Tiltfile | Tiltfile | | | | pyproject.toml | pyproject.toml | | | | uv.lock | uv.lock | | |
最新提交
历史记录
仓库文件导航
airflow-ai-sdk
该仓库包含一个基于 Pydantic AI,用于从 Apache Airflow 使用 LLM 的 SDK。它允许用户使用基于装饰器的任务直接在其 Airflow pipelines 中调用 LLM 和编排 agent 调用。该 SDK 利用熟悉的 Airflow @task
语法,并扩展了 @task.llm
、@task.llm_branch
和 @task.agent
等。
要开始使用,请查看 examples repository here,该仓库提供了一个完整的本地 Airflow 实例,其中安装了 AI SDK 和 5 个示例 pipelines。要在本地运行此程序,请运行:
git clone https://github.com/astronomer/ai-sdk-examples.git
cd ai-sdk-examples
astro dev start
如果您没有安装 Astro CLI,请运行 brew install astro
(或查看 here 上的其他选项)。
如果您已经运行了 Airflow,您还可以使用所需的任何可选依赖项安装该包:
pip install airflow-ai-sdk[openai,duckduckgo]
请注意,安装没有可选依赖项的软件包将安装软件包的精简版本,其中不包括任何 LLM 模型或工具。可用的可选软件包列在 here。虽然此 SDK 为了方便起见提供了可选的依赖项,但您也可以直接从 Pydantic AI 安装可选的依赖项。
目录:
Features
- LLM tasks with
@task.llm
: 定义调用语言模型(例如 GPT-3.5-turbo)来处理文本的任务。 - Agent tasks with
@task.agent
: 通过利用自定义工具来编排多步骤 AI 推理。 - Automatic output parsing: 使用函数类型提示(包括 Pydantic 模型)来自动解析和验证 LLM 输出。
- Branching with
@task.llm_branch
: 根据 LLM 的输出更改 DAG 的控制流。 - Model support: 支持 Pydantic AI 库中的所有模型 (OpenAI, Anthropic, Gemini, Ollama, Groq, Mistral, Cohere, Bedrock)
Design Principles
我们遵循 Airflow 的 taskflow 模式,带有三个装饰器:
@task.llm
:定义调用 LLM 的任务。在底层,这将创建一个没有工具的 Pydantic AIAgent
。@task.agent
:定义调用 agent 的任务。您可以直接传入 Pydantic AIAgent
。@task.llm_branch
:定义一个基于 LLM 的输出来分支 DAG 控制流的任务。强制 LLM 输出是下游 task_ids 之一。
提供给每个装饰器的函数是一个转换函数,它将 Airflow 任务的输入转换为 LLM 的输入。如果您不想进行任何转换,您可以保持输入不变直接返回即可。
Motivation
随着组织寻找从 LLM 中获取价值的务实方法,AI workflows 变得越来越普遍。与任何 workflow 一样,拥有一个灵活且可扩展的方式来编排它们非常重要。
Airflow 是编排数据 pipelines 的流行选择。它是一个强大的工具,用于管理任务之间的依赖关系以及调度和监视它们,并且已经受到各地数据团队的信任超过 10 年。它“电池包含”地提供了一组丰富的功能:
- 灵活的调度: 在固定时间表、按需或基于外部事件运行任务
- 动态任务映射: 轻松并行处理多个输入,并具有完整的错误处理和可观察性
- 分支和条件逻辑: 根据某些任务的输出更改 DAG 的控制流
- 错误处理: 内置对重试、指数退避和超时的支持
- 资源管理: 使用 Airflow Pools 限制任务的并发性
- 监控: 详细的日志和监控功能
- 可伸缩性: 专为生产 workflows 设计
此 SDK 旨在使将 LLM workflows 集成到您的 Airflow pipelines 中变得容易。它允许您执行从简单的 LLM 调用到复杂的 agentic workflows 的任何操作。
Examples
在 examples/dags 目录中查看完整的示例集。
LLM calls from a DAG (summarize Airflow's commits)
此示例展示了如何将 @task.llm
装饰器用作 Airflow DAG 的一部分。在 @task.llm
装饰器中,我们可以指定模型和系统提示。该装饰器允许您将 Airflow 任务的输入转换为 LLM 的输入。
参见完整示例:github_changelog.py
import os
import pendulum
from airflow.decorators import dag, task
from github import Github
@task
def get_recent_commits(data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime) -> list[str]:
"""
此任务返回最近提交的模拟列表。在实际的 workflow 中,此
任务将从数据库或 API 获取最近的提交。
"""
print(f"获取 {data_interval_start} 到 {data_interval_end} 的提交")
gh = Github(os.getenv("GITHUB_TOKEN"))
repo = gh.get_repo("apache/airflow")
commits = repo.get_commits(since=data_interval_start, until=data_interval_end)
return [f"{commit.commit.sha}: {commit.commit.message}" for commit in commits]
@task.llm(
model="gpt-4o-mini",
result_type=str,
system_prompt="""
您的工作是总结给定的一个星期的 Airflow 项目的提交。
特别注意大的更改和新功能,而不是
bug 修复和小的更改。
您不需要包括所有提交,只需包括最重要的提交。添加一行
顶部的更改的总体摘要,后跟最重要的要点的项目符号点
重要变化。
示例输出:
本周,我们对核心调度程序进行了架构更改,使其更
可维护且更易于理解。
- 使调度程序快 20% (commit 1234567)
- 添加了一个新的任务类型:`example_task` (commit 1234568)
- 添加了一个新的运算符:`example_operator` (commit 1234569)
- 添加了一个新的传感器:`example_sensor` (commit 1234570)
"""
)
def summarize_commits(commits: list[str] | None = None) -> str:
"""
此任务总结了提交。您可以在此处添加逻辑来转换输入
然后将其传递给 LLM。
"""
# 不需要进行任何转换
return "\n".join(commits)
@task
def send_summaries(summaries: str):
...
@dag(
schedule="@weekly",
start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
catchup=False,
)
def github_changelog():
commits = get_recent_commits()
summaries = summarize_commits(commits=commits)
send_summaries(summaries)
github_changelog()
LLM calls with structured outputs using @task.llm
(user feedback -> sentiment and feature requests)
此示例演示了如何使用 @task.llm
装饰器来调用 LLM 并返回结构化输出。在本例中,我们使用 Pydantic 模型来验证 LLM 的输出。我们建议使用 airflow_ai_sdk.BaseModel
类来定义您的 Pydantic 模型,以防我们在将来添加更多功能。
参见完整示例:product_feedback_summarization.py
import pendulum
from typing import Literal, Any
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
import airflow_ai_sdk as ai_sdk
from include.pii import mask_pii
@task
def get_product_feedback() -> list[str]:
"""
此任务返回产品反馈的模拟列表。在实际的 workflow 中,此
任务将从数据库或 API 获取产品反馈。
"""
...
class ProductFeedbackSummary(ai_sdk.BaseModel):
summary: str
sentiment: Literal["positive", "negative", "neutral"]
feature_requests: list[str]
@task.llm(
model="gpt-4o-mini",
result_type=ProductFeedbackSummary,
system_prompt="""
您是一个有用的助手,可以总结产品反馈。
"""
)
def summarize_product_feedback(feedback: str | None = None) -> ProductFeedbackSummary:
"""
此任务总结了产品反馈。您可以在此处添加逻辑来转换输入
然后总结它。
"""
# 如果反馈没有提及 Airflow,则跳过它
if "Airflow" not in feedback:
raise AirflowSkipException("反馈没有提及 Airflow")
# 屏蔽反馈中的 PII
feedback = mask_pii(feedback)
return feedback
@task
def upload_summaries(summaries: list[dict[str, Any]]):
...
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
def product_feedback_summarization():
feedback = get_product_feedback()
summaries = summarize_product_feedback.expand(feedback=feedback)
upload_summaries(summaries)
product_feedback_summarization()
Agent calls with @task.agent
(deep research agent)
此示例展示了如何构建一个 AI agent,该 agent 可以在回答用户问题时自主调用外部工具(例如,知识库搜索)。
参见完整示例:deep_research.py
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from bs4 import BeautifulSoup
from pydantic_ai import Agent
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool
# custom tool to get the content of a page
def get_page_content(url: str) -> str:
"""
获取页面的内容。
"""
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
distillation_agent = Agent(
"gpt-4o-mini",
system_prompt="""
您负责从文本中提取信息。摘要将由研究 agent 用于生成研究报告。
保持摘要简洁明了,只关注关键信息。
""",
)
return distillation_agent.run_sync(soup.get_text())
deep_research_agent = Agent(
"o3-mini",
system_prompt="""
您是一个深度研究 agent,非常擅长从网络中提取信息。您会收到一个查询,您的工作是生成研究报告。
您可以使用 `duckduckgo_search_tool` 搜索网络。您也可以使用 `get_page_content` 工具来获取页面的内容。
继续直到您有足够的信息来生成研究报告。假设您对查询或内容一无所知,因此您需要搜索网络以获取相关信息。
不要生成新信息,只提取网络中的信息。
""",
tools=[duckduckgo_search_tool(), get_page_content],
)
@task.agent(agent=deep_research_agent)
def deep_research_task(dag_run: DagRun) -> str:
"""
此任务对给定的查询执行深度研究。
"""
query = dag_run.conf.get("query")
if not query:
raise ValueError("查询是必需的")
print(f"对 {query} 执行深度研究")
return query
@task
def upload_results(results: str):
...
@dag(
schedule=None,
start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
catchup=False,
params={
"query": Param(
type="string",
default="How has the field of data engineering evolved in the last 5 years?",
),
},
)
def deep_research():
results = deep_research_task()
upload_results(results)
deep_research()
Changing dag control flow with @task.llm_branch
(support ticket routing)
此示例演示了如何使用 @task.llm_branch
装饰器来根据 LLM 的输出更改 DAG 的控制流。在本例中,我们根据工单的严重程度来路由支持工单。
参见完整示例:support_ticket_routing.py
import pendulum
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun
@task.llm_branch(
model="gpt-4o-mini",
system_prompt="""
您是一个支持 agent,可以根据工单的优先级来路由支持工单。
以下是优先级定义:
- P0:影响用户使用产品的关键问题,特别是对于生产部署。
- P1:影响用户使用产品的问题,但不如严重(或不适用于他们的生产部署)。
- P2:优先级较低的问题,可以等到下一个工作日
- P3:不重要或对时间不敏感的问题
以下是一些工单及其优先级的示例:
- "我们的生产部署因内存不足而崩溃了。请帮忙。": P0
- "我们的暂存/开发/质量保证部署因内存不足而崩溃了。请帮忙。": P1
- "我无法登录我的帐户。": P1
- "UI 没有加载。": P1
- "我需要帮助设置我的帐户。": P2
- "我对产品有疑问。": P3
""",
allow_multiple_branches=True,
)
def route_ticket(dag_run: DagRun) -> str:
return dag_run.conf.get("ticket")
@task
def handle_p0_ticket(ticket: str):
print(f"处理 P0 工单:{ticket}")
@task
def handle_p1_ticket(ticket: str):
print(f"处理 P1 工单:{ticket}")
@task
def handle_p2_ticket(ticket: str):
print(f"处理 P2 工单:{ticket}")
@task
def handle_p3_ticket(ticket: str):
print(f"处理 P3 工单:{ticket}")
@dag(
start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
schedule=None,
catchup=False,
params={"ticket": "Hi, our production deployment just went down because it ran out of memory. Please help."}
)
def support_ticket_routing():
ticket = route_ticket()
handle_p0_ticket(ticket)
handle_p1_ticket(ticket)
handle_p2_ticket(ticket)
handle_p3_ticket(ticket)
support_ticket_routing()
关于
一个基于 Pydantic AI,用于从 Apache Airflow 使用 LLM 和 AI Agents 的 SDK。
资源
许可协议
Stars
Watchers
Forks
Releases 2
Packages 0
未发布任何软件包
语言
页脚
GitHub © 2025 GitHub, Inc.
页脚导航
您目前无法执行该操作。