Airflow REST API 使用
Airflow 是一个强大的工作流调度工具,广泛用于数据管道的编排和管理。除了通过 Airflow 的 Web UI 和命令行工具来管理任务,你还可以通过 Airflow 的 REST API 与系统进行交互。本文将详细介绍如何使用 Airflow REST API,帮助你更好地管理和监控你的工作流。
什么是 Airflow REST API?
Airflow REST API 是一个基于 HTTP 的接口,允许你通过发送 HTTP 请求来与 Airflow 进行交互。通过 REST API,你可以执行诸如触发 DAG 运行、获取任务状态、管理变量和连接等操作。这使得 Airflow 可以与其他系统集成,或者通过编程方式自动化管理任务。
启用 Airflow REST API
在开始使用 Airflow REST API 之前,你需要确保它已启用。默认情况下,Airflow 2.0 及以上版本已经内置了 REST API。你可以通过以下步骤来启用和配置 REST API:
-
安装 Airflow:确保你已经安装了 Airflow,并且版本在 2.0 及以上。
-
配置认证:Airflow REST API 支持多种认证方式,如 Basic Auth、OAuth 等。你可以在
airflow.cfg
文件中配置认证方式。ini[api]
auth_backend = airflow.api.auth.backend.basic_auth -
启动 Airflow:确保 Airflow Web 服务器和调度器正在运行。
使用 Airflow REST API
1. 获取 DAG 列表
你可以通过发送 GET 请求到 /api/v1/dags
来获取所有 DAG 的列表。
curl -X GET http://localhost:8080/api/v1/dags -u admin:admin
响应示例:
{
"dags": [
{
"dag_id": "example_dag",
"is_paused": false,
"is_subdag": false,
"fileloc": "/path/to/dag.py",
"file_token": "token",
"owners": ["airflow"],
"description": "An example DAG",
"schedule_interval": "@daily",
"tags": ["example"]
}
],
"total_entries": 1
}
2. 触发 DAG 运行
你可以通过发送 POST 请求到 /api/v1/dags/{dag_id}/dagRuns
来触发一个 DAG 运行。
curl -X POST http://localhost:8080/api/v1/dags/example_dag/dagRuns \
-u admin:admin \
-H "Content-Type: application/json" \
-d '{"conf": {}}'
响应示例:
{
"conf": {},
"dag_id": "example_dag",
"dag_run_id": "manual__2023-10-01T12:00:00+00:00",
"end_date": null,
"execution_date": "2023-10-01T12:00:00+00:00",
"external_trigger": true,
"start_date": "2023-10-01T12:00:00+00:00",
"state": "running"
}
3. 获取任务状态
你可以通过发送 GET 请求到 /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}
来获取特定任务的状态。
curl -X GET http://localhost:8080/api/v1/dags/example_dag/dagRuns/manual__2023-10-01T12:00:00+00:00/taskInstances/task_1 \
-u admin:admin
响应示例:
{
"dag_id": "example_dag",
"dag_run_id": "manual__2023-10-01T12:00:00+00:00",
"duration": 10.5,
"end_date": "2023-10-01T12:00:10+00:00",
"execution_date": "2023-10-01T12:00:00+00:00",
"executor_config": {},
"hostname": "localhost",
"max_tries": 0,
"operator": "BashOperator",
"pid": 12345,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 1,
"queue": "default",
"queued_when": "2023-10-01T12:00:00+00:00",
"start_date": "2023-10-01T12:00:00+00:00",
"state": "success",
"task_id": "task_1",
"try_number": 1,
"unixname": "airflow"
}
实际案例:自动化 DAG 触发
假设你有一个数据管道,每天需要从外部系统获取数据并处理。你可以编写一个脚本,每天定时调用 Airflow REST API 来触发 DAG 运行。
import requests
from datetime import datetime
AIRFLOW_API_URL = "http://localhost:8080/api/v1"
AUTH = ("admin", "admin")
def trigger_dag(dag_id):
url = f"{AIRFLOW_API_URL}/dags/{dag_id}/dagRuns"
response = requests.post(
url,
auth=AUTH,
headers={"Content-Type": "application/json"},
json={"conf": {}, "execution_date": datetime.utcnow().isoformat() + "Z"}
)
return response.json()
if __name__ == "__main__":
dag_id = "example_dag"
result = trigger_dag(dag_id)
print(result)
输出:
{
"conf": {},
"dag_id": "example_dag",
"dag_run_id": "manual__2023-10-01T12:00:00+00:00",
"end_date": null,
"execution_date": "2023-10-01T12:00:00+00:00",
"external_trigger": true,
"start_date": "2023-10-01T12:00:00+00:00",
"state": "running"
}
总结
通过 Airflow REST API,你可以以编程方式管理和监控你的工作流。本文介绍了如何启用 REST API,并通过几个常见的 API 调用示例展示了如何获取 DAG 列表、触发 DAG 运行以及获取任务状态。我们还通过一个实际案例展示了如何自动化 DAG 触发。
附加资源
练习
- 尝试使用 Airflow REST API 获取所有 DAG 的列表,并筛选出当前正在运行的 DAG。
- 编写一个脚本,每天定时触发一个特定的 DAG,并记录每次触发的状态。
通过实践这些练习,你将更深入地理解 Airflow REST API 的使用,并能够在实际项目中灵活应用。