Airflow Variables管理
在Apache Airflow中,Variables 是一种用于存储和传递配置数据的机制。它们允许你在DAG(有向无环图)中定义和使用全局变量,从而避免硬编码配置值。通过使用Variables,你可以更灵活地管理配置,并在不同的任务或DAG之间共享数据。
什么是Airflow Variables?
Airflow Variables是键值对,用于存储配置信息、环境变量或其他需要在DAG中使用的数据。它们可以存储在Airflow的元数据数据库中,并通过Airflow的UI或API进行管理。Variables的一个常见用途是存储API密钥、数据库连接字符串或其他敏感信息。
创建和更新Variables
通过Airflow UI创建Variables
- 登录到Airflow的Web UI。
- 导航到 Admin > Variables。
- 点击 Create 按钮。
- 输入 Key 和 Value,然后点击 Save。
通过命令行创建Variables
你可以使用Airflow的命令行工具来创建或更新Variables:
airflow variables set my_key "my_value"
通过Python代码创建Variables
你也可以在Python代码中使用Airflow的API来创建或更新Variables:
from airflow.models import Variable
Variable.set("my_key", "my_value")
使用Variables
在DAG中使用Variables
在DAG中,你可以通过 Variable.get()
方法来获取Variables的值:
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def print_variable():
my_value = Variable.get("my_key")
print(f"The value of my_key is: {my_value}")
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('example_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='print_variable_task',
python_callable=print_variable,
)
在模板中使用Variables
你还可以在Jinja模板中使用Variables:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('template_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = BashOperator(
task_id='template_task',
bash_command='echo "The value of my_key is: {{ var.value.my_key }}"',
)
删除Variables
通过Airflow UI删除Variables
- 登录到Airflow的Web UI。
- 导航到 Admin > Variables。
- 找到你要删除的Variable,点击 Delete 按钮。
通过命令行删除Variables
你可以使用Airflow的命令行工具来删除Variables:
airflow variables delete my_key
通过Python代码删除Variables
你也可以在Python代码中使用Airflow的API来删除Variables:
from airflow.models import Variable
Variable.delete("my_key")
实际案例
假设你正在开发一个DAG,该DAG需要从多个API端点获取数据。每个API端点都有一个不同的API密钥。为了避免在代码中硬编码这些密钥,你可以使用Variables来存储它们。
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def fetch_data_from_api(api_key):
# 模拟从API获取数据的操作
print(f"Fetching data using API key: {api_key}")
def fetch_data_task():
api_key = Variable.get("api_key_1")
fetch_data_from_api(api_key)
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('api_data_fetch_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='fetch_data_task',
python_callable=fetch_data_task,
)
在这个例子中,api_key_1
是一个Variable,它存储了第一个API端点的密钥。通过这种方式,你可以轻松地在不同的环境中切换API密钥,而无需修改代码。
总结
Airflow Variables提供了一种灵活的方式来管理配置数据,避免了在代码中硬编码敏感信息。通过Airflow UI、命令行或Python代码,你可以轻松地创建、更新、删除和使用Variables。在实际应用中,Variables可以用于存储API密钥、数据库连接字符串等配置信息,从而提高代码的可维护性和安全性。
附加资源
练习
- 创建一个名为
my_variable
的Variable,并在DAG中使用它。 - 尝试在Jinja模板中使用Variable,并在BashOperator中输出它的值。
- 删除一个现有的Variable,并验证它是否已被成功删除。
通过完成这些练习,你将更好地理解如何在Airflow中管理Variables。