Airflow XComs 概念
在 Apache Airflow 中,任务(Task)是工作流的基本构建块。每个任务通常执行一个特定的操作,但在复杂的场景中,任务之间可能需要共享数据。这就是 XComs(Cross-Communication)的用武之地。XComs 允许任务之间传递小量数据,从而实现任务间的通信。
什么是 XComs?
XComs 是 Airflow 中用于任务间通信的机制。它允许一个任务将数据推送到 XComs 存储中,另一个任务可以从该存储中拉取数据。XComs 的名称来源于 "Cross-Communication",即跨任务通信。
备注
XComs 适用于传递小量数据(例如字符串、数字或 JSON 对象)。如果需要传递大量数据,建议使用外部存储(如数据库或文件系统)。
XComs 的工作原理
XComs 的核心思想是通过键值对(key-value pairs)存储和检索数据。每个任务可以将数据推送到 XComs 存储中,并指定一个唯一的键(key)。其他任务可以通过该键从 XComs 中拉取数据。
数据推送与拉取
- 推送数据:使用
xcom_push
方法将数据推送到 XComs 存储中。 - 拉取数据:使用
xcom_pull
方法从 XComs 存储中拉取数据。
示例代码
以下是一个简单的示例,展示了如何在两个任务之间使用 XComs 传递数据:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def push_data(**kwargs):
# 推送数据到 XComs
kwargs['ti'].xcom_push(key='my_key', value='Hello from Task 1')
def pull_data(**kwargs):
# 从 XComs 拉取数据
value = kwargs['ti'].xcom_pull(key='my_key')
print(f"Received value: {value}")
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('xcom_example', default_args=default_args, schedule_interval=None) as dag:
task1 = PythonOperator(
task_id='task1',
python_callable=push_data,
provide_context=True,
)
task2 = PythonOperator(
task_id='task2',
python_callable=pull_data,
provide_context=True,
)
task1 >> task2
在这个示例中:
task1
推送了一个键为my_key
,值为'Hello from Task 1'
的数据到 XComs。task2
从 XComs 中拉取该数据并打印出来。