Airflow Sensors 概念
介绍
在 Apache Airflow 中,Sensors 是一种特殊类型的任务,用于监控外部系统的状态变化。它们会持续检查某个条件是否满足,直到条件为真或超时。Sensors 是 Airflow 中实现任务依赖性和外部系统集成的重要工具。
Sensors 通常用于以下场景:
- 等待某个文件在文件系统中出现。
- 等待数据库中的某条记录被更新。
- 等待某个 API 返回特定的响应。
通过使用 Sensors,你可以确保任务只在特定条件满足时才会执行,从而提高工作流的可靠性和灵活性。
Sensors 的工作原理
Sensors 的核心思想是轮询。它们会定期检查某个条件是否满足,如果条件满足,则任务成功完成;如果条件未满足,则任务会继续等待,直到超时。
在 Airflow 中,Sensors 继承自 BaseSensorOperator
类。每个 Sensor 都需要实现 poke
方法,该方法定义了如何检查条件是否满足。
代码示例
以下是一个简单的 Sensor 示例,它等待某个文件在指定路径下出现:
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import os
class FileSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, filepath, *args, **kwargs):
super(FileSensor, self).__init__(*args, **kwargs)
self.filepath = filepath
def poke(self, context):
self.log.info(f"Checking if file exists: {self.filepath}")
return os.path.exists(self.filepath)
在这个示例中,FileSensor
会定期检查 filepath
指定的文件是否存在。如果文件存在,poke
方法返回 True
,任务成功完成;否则,任务会继续等待。
Sensors 的类型
Airflow 提供了多种内置的 Sensors,用于监控不同类型的外部系统。以下是一些常见的 Sensors:
- FileSensor: 监控文件系统中的文件是否存在。
- SqlSensor: 监控数据库中的某条记录是否满足特定条件。
- HttpSensor: 监控某个 HTTP 端点是否返回预期的响应。
- ExternalTaskSensor: 监控另一个 DAG 中的任务是否完成。
实际案例
假设你有一个工作流,需要在某个文件上传到指定目录后开始处理。你可以使用 FileSensor
来监控文件的上传情况:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('file_processing_dag', default_args=default_args, schedule_interval='@daily') as dag:
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/path/to/uploaded_file.txt',
poke_interval=60, # 每60秒检查一次
timeout=600, # 最多等待10分钟
)
process_file = DummyOperator(task_id='process_file')
wait_for_file >> process_file
在这个例子中,FileSensor
会每60秒检查一次文件是否上传,最多等待10分钟。如果文件在10分钟内上传成功,process_file
任务将会执行。
总结
Sensors 是 Apache Airflow 中用于监控外部系统状态变化的重要工具。通过使用 Sensors,你可以确保任务只在特定条件满足时才会执行,从而提高工作流的可靠性和灵活性。
在实际应用中,你可以根据需求选择合适的内置 Sensor,或者自定义 Sensor 来满足特定的监控需求。
附加资源
练习
- 创建一个自定义 Sensor,用于监控某个 API 的响应状态码是否为200。
- 使用
ExternalTaskSensor
监控另一个 DAG 中的任务是否完成,并在完成后触发当前 DAG 的任务。