从 LLM Workflows 到 Agents:使用 Apache Airflow 入门

跳至内容

[ astronomer ](https://github.com/astronomer/</astronomer>) / **[airflow-ai-sdk](https://github.com/astronomer/</astronomer/airflow-ai-sdk>) ** Public

一个基于 Pydantic AI,用于从 Apache Airflow 使用 LLM 和 AI Agents 的 SDK。

许可协议

Apache-2.0 license

100 stars 5 forks Branches Tags Activity

astronomer/airflow-ai-sdk

main

BranchesTags

文件夹和文件

| 名称 | 名称 | 上次提交信息 | 上次提交日期 | | :------------------------------------------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------------------------------------------------------------------------------ | :------- | :------- | | .github/workflows | .github/workflows | | | | airflow_ai_sdk | airflow_ai_sdk | | | | examples | examples | | | | tests | tests | | | | .gitignore | .gitignore | | | | .python-version | .python-version | | | | LICENSE | LICENSE | | | | README.md | README.md | | | | Tiltfile | Tiltfile | | | | pyproject.toml | pyproject.toml | | | | uv.lock | uv.lock | | |

最新提交

历史记录

11 Commits

查看所有文件

仓库文件导航

airflow-ai-sdk

该仓库包含一个基于 Pydantic AI,用于从 Apache Airflow 使用 LLM 的 SDK。它允许用户使用基于装饰器的任务直接在其 Airflow pipelines 中调用 LLM 和编排 agent 调用。该 SDK 利用熟悉的 Airflow @task 语法,并扩展了 @task.llm@task.llm_branch@task.agent 等。

要开始使用,请查看 examples repository here,该仓库提供了一个完整的本地 Airflow 实例,其中安装了 AI SDK 和 5 个示例 pipelines。要在本地运行此程序,请运行:

git clone https://github.com/astronomer/ai-sdk-examples.git
cd ai-sdk-examples
astro dev start

如果您没有安装 Astro CLI,请运行 brew install astro(或查看 here 上的其他选项)。

如果您已经运行了 Airflow,您还可以使用所需的任何可选依赖项安装该包:

pip install airflow-ai-sdk[openai,duckduckgo]

请注意,安装没有可选依赖项的软件包将安装软件包的精简版本,其中不包括任何 LLM 模型或工具。可用的可选软件包列在 here。虽然此 SDK 为了方便起见提供了可选的依赖项,但您也可以直接从 Pydantic AI 安装可选的依赖项。

目录:

Features

Design Principles

我们遵循 Airflow 的 taskflow 模式,带有三个装饰器:

提供给每个装饰器的函数是一个转换函数,它将 Airflow 任务的输入转换为 LLM 的输入。如果您不想进行任何转换,您可以保持输入不变直接返回即可。

Motivation

随着组织寻找从 LLM 中获取价值的务实方法,AI workflows 变得越来越普遍。与任何 workflow 一样,拥有一个灵活且可扩展的方式来编排它们非常重要。

Airflow 是编排数据 pipelines 的流行选择。它是一个强大的工具,用于管理任务之间的依赖关系以及调度和监视它们,并且已经受到各地数据团队的信任超过 10 年。它“电池包含”地提供了一组丰富的功能:

此 SDK 旨在使将 LLM workflows 集成到您的 Airflow pipelines 中变得容易。它允许您执行从简单的 LLM 调用到复杂的 agentic workflows 的任何操作。

Examples

examples/dags 目录中查看完整的示例集。

LLM calls from a DAG (summarize Airflow's commits)

此示例展示了如何将 @task.llm 装饰器用作 Airflow DAG 的一部分。在 @task.llm 装饰器中,我们可以指定模型和系统提示。该装饰器允许您将 Airflow 任务的输入转换为 LLM 的输入。

参见完整示例:github_changelog.py

import os
import pendulum
from airflow.decorators import dag, task
from github import Github

@task
def get_recent_commits(data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime) -> list[str]:
    """
    此任务返回最近提交的模拟列表。在实际的 workflow 中,此
    任务将从数据库或 API 获取最近的提交。
    """
    print(f"获取 {data_interval_start} 到 {data_interval_end} 的提交")
    gh = Github(os.getenv("GITHUB_TOKEN"))
    repo = gh.get_repo("apache/airflow")
    commits = repo.get_commits(since=data_interval_start, until=data_interval_end)
    return [f"{commit.commit.sha}: {commit.commit.message}" for commit in commits]

@task.llm(
    model="gpt-4o-mini",
    result_type=str,
    system_prompt="""
    您的工作是总结给定的一个星期的 Airflow 项目的提交。
    特别注意大的更改和新功能,而不是
    bug 修复和小的更改。
    您不需要包括所有提交,只需包括最重要的提交。添加一行
    顶部的更改的总体摘要,后跟最重要的要点的项目符号点
    重要变化。
    示例输出:
    本周,我们对核心调度程序进行了架构更改,使其更
    可维护且更易于理解。
    - 使调度程序快 20% (commit 1234567)
    - 添加了一个新的任务类型:`example_task` (commit 1234568)
    - 添加了一个新的运算符:`example_operator` (commit 1234569)
    - 添加了一个新的传感器:`example_sensor` (commit 1234570)
    """
)
def summarize_commits(commits: list[str] | None = None) -> str:
    """
    此任务总结了提交。您可以在此处添加逻辑来转换输入
    然后将其传递给 LLM。
    """
    # 不需要进行任何转换
    return "\n".join(commits)

@task
def send_summaries(summaries: str):
    ...

@dag(
    schedule="@weekly",
    start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
    catchup=False,
)
def github_changelog():
    commits = get_recent_commits()
    summaries = summarize_commits(commits=commits)
    send_summaries(summaries)

github_changelog()

LLM calls with structured outputs using @task.llm (user feedback -> sentiment and feature requests)

此示例演示了如何使用 @task.llm 装饰器来调用 LLM 并返回结构化输出。在本例中,我们使用 Pydantic 模型来验证 LLM 的输出。我们建议使用 airflow_ai_sdk.BaseModel 类来定义您的 Pydantic 模型,以防我们在将来添加更多功能。

参见完整示例:product_feedback_summarization.py

import pendulum
from typing import Literal, Any
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
import airflow_ai_sdk as ai_sdk
from include.pii import mask_pii

@task
def get_product_feedback() -> list[str]:
    """
    此任务返回产品反馈的模拟列表。在实际的 workflow 中,此
    任务将从数据库或 API 获取产品反馈。
    """
    ...

class ProductFeedbackSummary(ai_sdk.BaseModel):
    summary: str
    sentiment: Literal["positive", "negative", "neutral"]
    feature_requests: list[str]

@task.llm(
    model="gpt-4o-mini",
    result_type=ProductFeedbackSummary,
    system_prompt="""
    您是一个有用的助手,可以总结产品反馈。
    """
)
def summarize_product_feedback(feedback: str | None = None) -> ProductFeedbackSummary:
    """
    此任务总结了产品反馈。您可以在此处添加逻辑来转换输入
    然后总结它。
    """
    # 如果反馈没有提及 Airflow,则跳过它
    if "Airflow" not in feedback:
        raise AirflowSkipException("反馈没有提及 Airflow")
    # 屏蔽反馈中的 PII
    feedback = mask_pii(feedback)
    return feedback

@task
def upload_summaries(summaries: list[dict[str, Any]]):
    ...

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)
def product_feedback_summarization():
    feedback = get_product_feedback()
    summaries = summarize_product_feedback.expand(feedback=feedback)
    upload_summaries(summaries)

product_feedback_summarization()

Agent calls with @task.agent (deep research agent)

此示例展示了如何构建一个 AI agent,该 agent 可以在回答用户问题时自主调用外部工具(例如,知识库搜索)。

参见完整示例:deep_research.py

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from bs4 import BeautifulSoup
from pydantic_ai import Agent
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool

# custom tool to get the content of a page
def get_page_content(url: str) -> str:
    """
    获取页面的内容。
    """
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")
    distillation_agent = Agent(
        "gpt-4o-mini",
        system_prompt="""
        您负责从文本中提取信息。摘要将由研究 agent 用于生成研究报告。
        保持摘要简洁明了,只关注关键信息。
        """,
    )
    return distillation_agent.run_sync(soup.get_text())

deep_research_agent = Agent(
    "o3-mini",
    system_prompt="""
    您是一个深度研究 agent,非常擅长从网络中提取信息。您会收到一个查询,您的工作是生成研究报告。
    您可以使用 `duckduckgo_search_tool` 搜索网络。您也可以使用 `get_page_content` 工具来获取页面的内容。
    继续直到您有足够的信息来生成研究报告。假设您对查询或内容一无所知,因此您需要搜索网络以获取相关信息。
    不要生成新信息,只提取网络中的信息。
    """,
    tools=[duckduckgo_search_tool(), get_page_content],
)

@task.agent(agent=deep_research_agent)
def deep_research_task(dag_run: DagRun) -> str:
    """
    此任务对给定的查询执行深度研究。
    """
    query = dag_run.conf.get("query")
    if not query:
        raise ValueError("查询是必需的")
    print(f"对 {query} 执行深度研究")
    return query

@task
def upload_results(results: str):
    ...

@dag(
    schedule=None,
    start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
    catchup=False,
    params={
        "query": Param(
            type="string",
            default="How has the field of data engineering evolved in the last 5 years?",
        ),
    },
)
def deep_research():
    results = deep_research_task()
    upload_results(results)

deep_research()

Changing dag control flow with @task.llm_branch (support ticket routing)

此示例演示了如何使用 @task.llm_branch 装饰器来根据 LLM 的输出更改 DAG 的控制流。在本例中,我们根据工单的严重程度来路由支持工单。

参见完整示例:support_ticket_routing.py

import pendulum
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun

@task.llm_branch(
    model="gpt-4o-mini",
    system_prompt="""
    您是一个支持 agent,可以根据工单的优先级来路由支持工单。
    以下是优先级定义:
    - P0:影响用户使用产品的关键问题,特别是对于生产部署。
    - P1:影响用户使用产品的问题,但不如严重(或不适用于他们的生产部署)。
    - P2:优先级较低的问题,可以等到下一个工作日
    - P3:不重要或对时间不敏感的问题
    以下是一些工单及其优先级的示例:
    - "我们的生产部署因内存不足而崩溃了。请帮忙。": P0
    - "我们的暂存/开发/质量保证部署因内存不足而崩溃了。请帮忙。": P1
    - "我无法登录我的帐户。": P1
    - "UI 没有加载。": P1
    - "我需要帮助设置我的帐户。": P2
    - "我对产品有疑问。": P3
    """,
    allow_multiple_branches=True,
)
def route_ticket(dag_run: DagRun) -> str:
    return dag_run.conf.get("ticket")

@task
def handle_p0_ticket(ticket: str):
    print(f"处理 P0 工单:{ticket}")

@task
def handle_p1_ticket(ticket: str):
    print(f"处理 P1 工单:{ticket}")

@task
def handle_p2_ticket(ticket: str):
    print(f"处理 P2 工单:{ticket}")

@task
def handle_p3_ticket(ticket: str):
    print(f"处理 P3 工单:{ticket}")

@dag(
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    params={"ticket": "Hi, our production deployment just went down because it ran out of memory. Please help."}
)
def support_ticket_routing():
    ticket = route_ticket()
    handle_p0_ticket(ticket)
    handle_p1_ticket(ticket)
    handle_p2_ticket(ticket)
    handle_p3_ticket(ticket)

support_ticket_routing()

关于

一个基于 Pydantic AI,用于从 Apache Airflow 使用 LLM 和 AI Agents 的 SDK。

资源

Readme

许可协议

Apache-2.0 license

Activity

Custom properties

Stars

100 stars

Watchers

4 watching

Forks

5 forks

报告存储库

Releases 2

v0.1.0 最新 Mar 27, 2025

+ 1 release

Packages 0

未发布任何软件包

语言

页脚

GitHub © 2025 GitHub, Inc.

页脚导航

您目前无法执行该操作。