跳到主要内容

Airflow Sensors 概念

介绍

在 Apache Airflow 中,Sensors 是一种特殊类型的任务,用于监控外部系统的状态变化。它们会持续检查某个条件是否满足,直到条件为真或超时。Sensors 是 Airflow 中实现任务依赖性和外部系统集成的重要工具。

Sensors 通常用于以下场景:

  • 等待某个文件在文件系统中出现。
  • 等待数据库中的某条记录被更新。
  • 等待某个 API 返回特定的响应。

通过使用 Sensors,你可以确保任务只在特定条件满足时才会执行,从而提高工作流的可靠性和灵活性。

Sensors 的工作原理

Sensors 的核心思想是轮询。它们会定期检查某个条件是否满足,如果条件满足,则任务成功完成;如果条件未满足,则任务会继续等待,直到超时。

在 Airflow 中,Sensors 继承自 BaseSensorOperator 类。每个 Sensor 都需要实现 poke 方法,该方法定义了如何检查条件是否满足。

代码示例

以下是一个简单的 Sensor 示例,它等待某个文件在指定路径下出现:

python
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import os

class FileSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, filepath, *args, **kwargs):
super(FileSensor, self).__init__(*args, **kwargs)
self.filepath = filepath

def poke(self, context):
self.log.info(f"Checking if file exists: {self.filepath}")
return os.path.exists(self.filepath)

在这个示例中,FileSensor 会定期检查 filepath 指定的文件是否存在。如果文件存在,poke 方法返回 True,任务成功完成;否则,任务会继续等待。

Sensors 的类型

Airflow 提供了多种内置的 Sensors,用于监控不同类型的外部系统。以下是一些常见的 Sensors:

  1. FileSensor: 监控文件系统中的文件是否存在。
  2. SqlSensor: 监控数据库中的某条记录是否满足特定条件。
  3. HttpSensor: 监控某个 HTTP 端点是否返回预期的响应。
  4. ExternalTaskSensor: 监控另一个 DAG 中的任务是否完成。

实际案例

假设你有一个工作流,需要在某个文件上传到指定目录后开始处理。你可以使用 FileSensor 来监控文件的上传情况:

python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('file_processing_dag', default_args=default_args, schedule_interval='@daily') as dag:
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/path/to/uploaded_file.txt',
poke_interval=60, # 每60秒检查一次
timeout=600, # 最多等待10分钟
)

process_file = DummyOperator(task_id='process_file')

wait_for_file >> process_file

在这个例子中,FileSensor 会每60秒检查一次文件是否上传,最多等待10分钟。如果文件在10分钟内上传成功,process_file 任务将会执行。

总结

Sensors 是 Apache Airflow 中用于监控外部系统状态变化的重要工具。通过使用 Sensors,你可以确保任务只在特定条件满足时才会执行,从而提高工作流的可靠性和灵活性。

在实际应用中,你可以根据需求选择合适的内置 Sensor,或者自定义 Sensor 来满足特定的监控需求。

附加资源

练习

  1. 创建一个自定义 Sensor,用于监控某个 API 的响应状态码是否为200。
  2. 使用 ExternalTaskSensor 监控另一个 DAG 中的任务是否完成,并在完成后触发当前 DAG 的任务。