Airflow API触发
介绍
Apache Airflow 是一个强大的工作流调度工具,广泛用于数据管道的编排和管理。除了通过 Airflow Web UI 或命令行触发 DAG 运行外,Airflow 还提供了 REST API,允许开发者通过编程方式触发和管理 DAG。这对于自动化任务、集成外部系统或构建自定义调度逻辑非常有用。
本文将详细介绍如何使用 Airflow API 触发 DAG 运行,并提供实际案例和代码示例,帮助你快速上手。
Airflow API 的基本概念
Airflow API 是 Airflow 提供的 RESTful 接口,允许用户通过 HTTP 请求与 Airflow 进行交互。通过 API,你可以执行以下操作:
- 触发 DAG 运行
- 获取 DAG 状态
- 管理任务实例
- 查询日志等
在本文中,我们将重点介绍如何使用 API 触发 DAG 运行。
准备工作
在开始之前,请确保以下条件已满足:
- Airflow 已安装并运行:确保你的 Airflow 实例已启动并可以访问。
- API 访问权限:确保你有权限访问 Airflow API。默认情况下,Airflow 2.0 及以上版本启用了 REST API。
- API 认证:Airflow API 需要认证。你可以使用 Basic Auth 或 Token 进行认证。
使用 Airflow API 触发 DAG
1. 获取 API 端点
Airflow API 的默认端点是 /api/v1/dags/{dag_id}/dagRuns
,其中 {dag_id}
是你要触发的 DAG 的 ID。
2. 构造请求
你需要发送一个 POST 请求到上述端点,请求体中包含触发 DAG 所需的参数。以下是一个示例请求:
curl -X POST "http://<airflow-host>/api/v1/dags/<dag_id>/dagRuns" \
-H "Content-Type: application/json" \
-H "Authorization: Basic <base64-encoded-credentials>" \
-d '{
"conf": {},
"dag_run_id": "manual__2023-10-01T00:00:00",
"logical_date": "2023-10-01T00:00:00"
}'
<airflow-host>
:你的 Airflow 实例的主机地址。<dag_id>
:你要触发的 DAG 的 ID。<base64-encoded-credentials>
:你的认证信息(用户名和密码)的 Base64 编码。
3. 参数说明
conf
:可选参数,用于传递配置信息给 DAG。dag_run_id
:可选参数,指定 DAG 运行的唯一 ID。如果不提供,Airflow 会自动生成一个。logical_date
:可选参数,指定 DAG 运行的逻辑日期。如果不提供,默认使用当前时间。
4. 示例代码
以下是一个使用 Python 的 requests
库触发 DAG 的示例:
import requests
import base64
# Airflow API 端点
url = "http://<airflow-host>/api/v1/dags/<dag_id>/dagRuns"
# 认证信息
username = "your_username"
password = "your_password"
credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": f"Basic {credentials}"
}
# 请求体
data = {
"conf": {},
"dag_run_id": "manual__2023-10-01T00:00:00",
"logical_date": "2023-10-01T00:00:00"
}
# 发送请求
response = requests.post(url, headers=headers, json=data)
# 输出响应
print(response.status_code)
print(response.json())
实际应用场景
场景 1:自动化任务触发
假设你有一个每天凌晨运行的 DAG,用于处理前一天的数据。如果某天数据延迟到达,你可以通过 API 手动触发 DAG 运行,而无需登录 Airflow Web UI。
场景 2:与外部系统集成
如果你的系统需要根据外部事件(如文件上传或数据库更新)触发 Airflow DAG,可以通过 API 实现无缝集成。
总结
通过 Airflow API 触发 DAG 是一种灵活且强大的方式,特别适合需要自动化或与外部系统集成的场景。本文介绍了如何使用 API 触发 DAG,并提供了实际案例和代码示例。
附加资源与练习
- Airflow 官方文档:了解更多关于 Airflow API 的详细信息,请访问 Airflow API 文档。
- 练习:尝试使用 Python 脚本触发一个简单的 DAG,并观察其运行状态。
如果你在实践过程中遇到问题,可以参考 Airflow 社区论坛或 GitHub 上的讨论。