Airflow Sensors基础
介绍
在 Apache Airflow 中,Sensors 是一种特殊类型的 Operator,用于监控外部系统的状态变化。Sensors 会持续检查某个条件是否满足,直到条件为真或超时。它们通常用于等待某个外部事件的发生,例如文件到达、数据库记录更新或 API 调用完成。
Sensors 是 Airflow DAG 开发中的重要组成部分,特别是在需要与外部系统交互的场景中。通过 Sensors,你可以确保任务只在特定条件满足时执行,从而提高工作流的可靠性和效率。
Sensors 的工作原理
Sensors 的核心功能是轮询。它们会定期检查某个条件是否满足,如果条件满足,则任务成功完成;如果条件未满足,则继续等待。Sensors 的行为可以通过以下参数进行配置:
- poke_interval: 两次检查之间的时间间隔(默认值为 60 秒)。
- timeout: 传感器等待条件满足的最大时间(默认值为 7 天)。
- mode: 传感器的运行模式,可以是
poke
(默认)或reschedule
。
提示
在 poke
模式下,传感器会持续占用工作线程,直到条件满足或超时。而在 reschedule
模式下,传感器会在每次检查后释放工作线程,适合长时间等待的场景。
常用 Sensors 类型
Airflow 提供了多种内置 Sensors,以下是一些常见的 Sensors:
- FileSensor: 用于监控文件系统中的文件是否存在。
- SqlSensor: 用于监控数据库查询结果是否满足特定条件。
- HttpSensor: 用于监控 HTTP 请求的响应是否满足特定条件。
- ExternalTaskSensor: 用于监控其他 DAG 中的任务是否完成。
示例:使用 FileSensor
以下是一个使用 FileSensor
的示例,该传感器会等待指定文件出现在文件系统中:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime
with DAG(
dag_id="file_sensor_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/path/to/your/file.txt",
poke_interval=30, # 每 30 秒检查一次
timeout=3600, # 最多等待 1 小时
mode="poke",
)
process_file = DummyOperator(task_id="process_file")
wait_for_file >> process_file
在这个示例中,FileSensor
会每 30 秒检查一次 /path/to/your/file.txt
文件是否存在。如果文件在 1 小时内出现,任务 process_file
将会执行;否则,任务将失败。