Airflow Variables基础
介绍
在Apache Airflow中,Variables 是一种用于存储和传递配置信息的机制。它们可以存储简单的键值对,例如API密钥、数据库连接字符串或其他需要在DAG(有向无环图)中共享的配置。通过使用Variables,你可以避免在代码中硬编码敏感信息或配置,从而提高代码的可维护性和安全性。
Variables 是全局的,可以在多个DAG之间共享。它们可以通过Airflow的Web界面、CLI或Python代码进行管理和访问。
创建和访问Variables
1. 通过Web界面创建Variables
在Airflow的Web界面中,你可以轻松地创建和管理Variables:
- 导航到 Admin > Variables。
- 点击 Create 按钮。
- 输入键(Key)和值(Value),然后保存。
例如,你可以创建一个名为 my_api_key
的Variable,并将其值设置为 12345
。
2. 通过CLI创建Variables
你也可以使用Airflow的CLI来创建Variables:
bash
airflow variables set my_api_key 12345
3. 通过Python代码创建Variables
在DAG文件中,你可以使用以下代码来创建或更新Variables:
python
from airflow.models import Variable
Variable.set("my_api_key", "12345")
访问Variables
在DAG中,你可以通过以下方式访问Variables:
python
from airflow.models import Variable
my_api_key = Variable.get("my_api_key")
print(f"My API Key is: {my_api_key}")
输出:
My API Key is: 12345
提示
如果你希望Variable不存在时返回默认值,可以使用 default_var
参数:
python
my_api_key = Variable.get("non_existent_key", default_var="default_value")
实际应用场景
场景1:动态配置API端点
假设你有一个DAG,需要调用多个API端点,而这些端点的URL可能会根据环境(开发、测试、生产)而变化。你可以将这些URL存储在Variables中,并在DAG中动态获取:
python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime
def call_api():
api_url = Variable.get("api_endpoint")
print(f"Calling API at: {api_url}")
default_args = {
'start_date': datetime(2023, 1, 1),
}
with DAG('dynamic_api_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='call_api_task',
python_callable=call_api,
)
场景2:管理敏感信息
Variables 非常适合存储敏感信息,如数据库连接字符串或API密钥。通过将这些信息存储在Variables中,你可以避免在代码中硬编码敏感数据:
python
from airflow.models import Variable
db_connection_string = Variable.get("db_connection_string")
print(f"Connecting to database with: {db_connection_string}")
总结
Airflow Variables 是一个强大的工具,可以帮助你管理和共享配置信息,避免在代码中硬编码敏感数据。通过Web界面、CLI或Python代码,你可以轻松地创建、更新和访问Variables。在实际应用中,Variables 可以用于动态配置API端点、管理敏感信息等场景。
附加资源与练习
练习
- 创建一个名为
my_database_url
的Variable,并在DAG中使用它来模拟数据库连接。 - 尝试通过CLI删除一个Variable,并在DAG中处理Variable不存在的情况。