跳到主要内容

Airflow API触发

介绍

Apache Airflow 是一个强大的工作流调度工具,广泛用于数据管道的编排和管理。除了通过 Airflow Web UI 或命令行触发 DAG 运行外,Airflow 还提供了 REST API,允许开发者通过编程方式触发和管理 DAG。这对于自动化任务、集成外部系统或构建自定义调度逻辑非常有用。

本文将详细介绍如何使用 Airflow API 触发 DAG 运行,并提供实际案例和代码示例,帮助你快速上手。


Airflow API 的基本概念

Airflow API 是 Airflow 提供的 RESTful 接口,允许用户通过 HTTP 请求与 Airflow 进行交互。通过 API,你可以执行以下操作:

  • 触发 DAG 运行
  • 获取 DAG 状态
  • 管理任务实例
  • 查询日志等

在本文中,我们将重点介绍如何使用 API 触发 DAG 运行。


准备工作

在开始之前,请确保以下条件已满足:

  1. Airflow 已安装并运行:确保你的 Airflow 实例已启动并可以访问。
  2. API 访问权限:确保你有权限访问 Airflow API。默认情况下,Airflow 2.0 及以上版本启用了 REST API。
  3. API 认证:Airflow API 需要认证。你可以使用 Basic Auth 或 Token 进行认证。

使用 Airflow API 触发 DAG

1. 获取 API 端点

Airflow API 的默认端点是 /api/v1/dags/{dag_id}/dagRuns,其中 {dag_id} 是你要触发的 DAG 的 ID。

2. 构造请求

你需要发送一个 POST 请求到上述端点,请求体中包含触发 DAG 所需的参数。以下是一个示例请求:

bash
curl -X POST "http://<airflow-host>/api/v1/dags/<dag_id>/dagRuns" \
-H "Content-Type: application/json" \
-H "Authorization: Basic <base64-encoded-credentials>" \
-d '{
"conf": {},
"dag_run_id": "manual__2023-10-01T00:00:00",
"logical_date": "2023-10-01T00:00:00"
}'
  • <airflow-host>:你的 Airflow 实例的主机地址。
  • <dag_id>:你要触发的 DAG 的 ID。
  • <base64-encoded-credentials>:你的认证信息(用户名和密码)的 Base64 编码。

3. 参数说明

  • conf:可选参数,用于传递配置信息给 DAG。
  • dag_run_id:可选参数,指定 DAG 运行的唯一 ID。如果不提供,Airflow 会自动生成一个。
  • logical_date:可选参数,指定 DAG 运行的逻辑日期。如果不提供,默认使用当前时间。

4. 示例代码

以下是一个使用 Python 的 requests 库触发 DAG 的示例:

python
import requests
import base64

# Airflow API 端点
url = "http://<airflow-host>/api/v1/dags/<dag_id>/dagRuns"

# 认证信息
username = "your_username"
password = "your_password"
credentials = base64.b64encode(f"{username}:{password}".encode()).decode()

# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": f"Basic {credentials}"
}

# 请求体
data = {
"conf": {},
"dag_run_id": "manual__2023-10-01T00:00:00",
"logical_date": "2023-10-01T00:00:00"
}

# 发送请求
response = requests.post(url, headers=headers, json=data)

# 输出响应
print(response.status_code)
print(response.json())

实际应用场景

场景 1:自动化任务触发

假设你有一个每天凌晨运行的 DAG,用于处理前一天的数据。如果某天数据延迟到达,你可以通过 API 手动触发 DAG 运行,而无需登录 Airflow Web UI。

场景 2:与外部系统集成

如果你的系统需要根据外部事件(如文件上传或数据库更新)触发 Airflow DAG,可以通过 API 实现无缝集成。


总结

通过 Airflow API 触发 DAG 是一种灵活且强大的方式,特别适合需要自动化或与外部系统集成的场景。本文介绍了如何使用 API 触发 DAG,并提供了实际案例和代码示例。


附加资源与练习

  1. Airflow 官方文档:了解更多关于 Airflow API 的详细信息,请访问 Airflow API 文档
  2. 练习:尝试使用 Python 脚本触发一个简单的 DAG,并观察其运行状态。
提示

如果你在实践过程中遇到问题,可以参考 Airflow 社区论坛或 GitHub 上的讨论。