Airflow Connections 配置
介绍
在 Apache Airflow 中,Connections 是用于管理与外部系统(如数据库、API、云服务等)的连接配置的核心组件。通过 Connections,Airflow 可以安全地存储和访问外部系统的凭据(如用户名、密码、API 密钥等),而无需将这些敏感信息硬编码到 DAG 或任务中。
Connections 的配置是 Airflow 工作流自动化的重要部分,尤其是在需要与多种外部服务交互的场景中。本文将详细介绍如何配置和管理 Airflow Connections,并通过实际案例展示其应用。
什么是 Airflow Connections?
Airflow Connections 是一种存储外部系统连接信息的机制。每个 Connection 包含以下关键信息:
- Conn Id:连接的唯一标识符。
- Conn Type:连接类型(如 MySQL、PostgreSQL、HTTP、AWS 等)。
- Host:目标系统的主机地址。
- Login:用户名或 API 密钥。
- Password:密码或 API 密钥。
- Port:目标系统的端口号(如果适用)。
- Extra:其他配置参数(JSON 格式)。
Connections 可以通过 Airflow 的 Web UI、CLI 或环境变量进行配置和管理。
配置 Airflow Connections
1. 通过 Web UI 配置
Airflow 提供了一个直观的 Web UI 来管理 Connections。以下是配置步骤:
- 登录 Airflow Web UI。
- 导航到 Admin > Connections。
- 点击 Create 按钮。
- 填写以下字段:
- Conn Id:唯一标识符(如
my_db_connection
)。 - Conn Type:选择连接类型(如
MySQL
)。 - Host:目标系统的主机地址(如
localhost
)。 - Login:用户名(如
root
)。 - Password:密码(如
password
)。 - Port:端口号(如
3306
)。 - Extra:其他配置参数(如
{"charset": "utf8"}
)。
- Conn Id:唯一标识符(如
- 点击 Save 保存配置。
通过 Web UI 配置 Connections 是最直观的方式,适合初学者快速上手。
2. 通过环境变量配置
Airflow 还支持通过环境变量配置 Connections。环境变量的命名规则为:
AIRFLOW_CONN_{CONN_ID}=<connection_uri>
其中,{CONN_ID}
是连接的唯一标识符,<connection_uri>
是连接 URI,格式如下:
<conn_type>://<login>:<password>@<host>:<port>/<schema>?<extra_params>
例如,配置一个 MySQL 连接:
export AIRFLOW_CONN_MY_DB_CONNECTION="mysql://root:password@localhost:3306/mydb?charset=utf8"
确保环境变量中的敏感信息(如密码)不被泄露。
3. 通过 CLI 配置
Airflow CLI 提供了 connections
命令来管理 Connections。以下是一些常用命令:
-
添加 Connection:
bashairflow connections add my_db_connection --conn-type mysql --conn-host localhost --conn-login root --conn-password password --conn-port 3306
-
删除 Connection:
bashairflow connections delete my_db_connection
-
列出所有 Connections:
bashairflow connections list
在 DAG 中使用 Connections
配置好 Connections 后,可以在 DAG 中通过 BaseHook
或 Operator 使用它们。以下是一个使用 PostgresOperator
的示例:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG(
'example_dag',
default_args=default_args,
schedule_interval='@daily',
) as dag:
query_task = PostgresOperator(
task_id='run_query',
postgres_conn_id='my_db_connection', # 使用配置的 Connection
sql='SELECT * FROM my_table;',
)
确保在 DAG 中使用的 conn_id
与配置的 Connection 一致。
实际案例:连接 AWS S3
假设我们需要从 AWS S3 读取数据并将其加载到数据库中。以下是配置和使用 AWS S3 Connection 的步骤:
-
在 Airflow Web UI 中创建一个 AWS S3 Connection:
- Conn Id:
my_s3_connection
- Conn Type:
S3
- Extra:
{"aws_access_key_id": "your_access_key", "aws_secret_access_key": "your_secret_key"}
- Conn Id:
-
在 DAG 中使用
S3Hook
读取数据:
from airflow import DAG
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG(
's3_example_dag',
default_args=default_args,
schedule_interval='@daily',
) as dag:
def read_from_s3():
s3_hook = S3Hook(aws_conn_id='my_s3_connection')
data = s3_hook.read_key(bucket_name='my_bucket', key='my_file.csv')
print(data)
read_task = PythonOperator(
task_id='read_from_s3',
python_callable=read_from_s3,
)
总结
Airflow Connections 是管理与外部系统连接的关键工具。通过 Web UI、环境变量或 CLI 配置 Connections,可以确保敏感信息的安全存储和便捷访问。在实际应用中,Connections 能够显著简化工作流的开发和维护。
附加资源与练习
- 官方文档:Airflow Connections
- 练习:尝试配置一个连接 Google Cloud Storage 的 Connection,并在 DAG 中使用它读取文件。
- 扩展阅读:学习如何使用 Airflow 的 Secrets Backend 进一步优化敏感信息的管理。
Happy coding! 🚀