跳到主要内容

Airflow Variables管理

在Apache Airflow中,Variables 是一种用于存储和传递配置数据的机制。它们允许你在DAG(有向无环图)中定义和使用全局变量,从而避免硬编码配置值。通过使用Variables,你可以更灵活地管理配置,并在不同的任务或DAG之间共享数据。

什么是Airflow Variables?

Airflow Variables是键值对,用于存储配置信息、环境变量或其他需要在DAG中使用的数据。它们可以存储在Airflow的元数据数据库中,并通过Airflow的UI或API进行管理。Variables的一个常见用途是存储API密钥、数据库连接字符串或其他敏感信息。

创建和更新Variables

通过Airflow UI创建Variables

  1. 登录到Airflow的Web UI。
  2. 导航到 Admin > Variables
  3. 点击 Create 按钮。
  4. 输入 KeyValue,然后点击 Save

通过命令行创建Variables

你可以使用Airflow的命令行工具来创建或更新Variables:

bash
airflow variables set my_key "my_value"

通过Python代码创建Variables

你也可以在Python代码中使用Airflow的API来创建或更新Variables:

python
from airflow.models import Variable

Variable.set("my_key", "my_value")

使用Variables

在DAG中使用Variables

在DAG中,你可以通过 Variable.get() 方法来获取Variables的值:

python
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_variable():
my_value = Variable.get("my_key")
print(f"The value of my_key is: {my_value}")

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('example_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='print_variable_task',
python_callable=print_variable,
)

在模板中使用Variables

你还可以在Jinja模板中使用Variables:

python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('template_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = BashOperator(
task_id='template_task',
bash_command='echo "The value of my_key is: {{ var.value.my_key }}"',
)

删除Variables

通过Airflow UI删除Variables

  1. 登录到Airflow的Web UI。
  2. 导航到 Admin > Variables
  3. 找到你要删除的Variable,点击 Delete 按钮。

通过命令行删除Variables

你可以使用Airflow的命令行工具来删除Variables:

bash
airflow variables delete my_key

通过Python代码删除Variables

你也可以在Python代码中使用Airflow的API来删除Variables:

python
from airflow.models import Variable

Variable.delete("my_key")

实际案例

假设你正在开发一个DAG,该DAG需要从多个API端点获取数据。每个API端点都有一个不同的API密钥。为了避免在代码中硬编码这些密钥,你可以使用Variables来存储它们。

python
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def fetch_data_from_api(api_key):
# 模拟从API获取数据的操作
print(f"Fetching data using API key: {api_key}")

def fetch_data_task():
api_key = Variable.get("api_key_1")
fetch_data_from_api(api_key)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('api_data_fetch_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='fetch_data_task',
python_callable=fetch_data_task,
)

在这个例子中,api_key_1 是一个Variable,它存储了第一个API端点的密钥。通过这种方式,你可以轻松地在不同的环境中切换API密钥,而无需修改代码。

总结

Airflow Variables提供了一种灵活的方式来管理配置数据,避免了在代码中硬编码敏感信息。通过Airflow UI、命令行或Python代码,你可以轻松地创建、更新、删除和使用Variables。在实际应用中,Variables可以用于存储API密钥、数据库连接字符串等配置信息,从而提高代码的可维护性和安全性。

附加资源

练习

  1. 创建一个名为 my_variable 的Variable,并在DAG中使用它。
  2. 尝试在Jinja模板中使用Variable,并在BashOperator中输出它的值。
  3. 删除一个现有的Variable,并验证它是否已被成功删除。

通过完成这些练习,你将更好地理解如何在Airflow中管理Variables。