跳到主要内容

Airflow Connections 概念

什么是 Airflow Connections?

在 Apache Airflow 中,Connections 是一种用于管理与外部系统(如数据库、API、云服务等)连接的方式。通过 Connections,Airflow 可以安全地存储和访问这些外部系统的认证信息(如用户名、密码、API 密钥等),而无需将这些敏感信息硬编码到 DAG 或任务中。

Connections 是 Airflow 的核心功能之一,它使得任务与外部系统的交互更加灵活和安全。无论是连接到 MySQL 数据库、调用 REST API,还是与 AWS S3 交互,Connections 都能帮助你轻松实现。


Connections 的基本结构

一个 Connection 通常包含以下信息:

  • Conn Id:连接的唯一标识符(例如 my_db_connection)。
  • Conn Type:连接类型(例如 mysqlhttps3 等)。
  • Host:目标系统的主机地址。
  • Login:用户名或 API 密钥。
  • Password:密码或密钥。
  • Port:端口号(如果需要)。
  • Extra:其他配置参数(JSON 格式)。
备注

Connections 的敏感信息(如密码)会被加密存储,确保安全性。


如何创建和管理 Connections

1. 通过 Airflow UI 创建 Connection

Airflow 提供了一个用户友好的界面来创建和管理 Connections。以下是步骤:

  1. 登录 Airflow Web UI。
  2. 导航到 Admin > Connections
  3. 点击 Create 按钮。
  4. 填写 Connection 的详细信息(如 Conn Id、Conn Type、Host 等)。
  5. 点击 Save 保存。

2. 通过环境变量创建 Connection

你也可以通过环境变量来定义 Connections。Airflow 会自动解析以 AIRFLOW_CONN_ 开头的环境变量。例如:

bash
export AIRFLOW_CONN_MY_DB_CONN="mysql://user:password@localhost:3306/mydatabase"

3. 通过代码创建 Connection

在 DAG 文件中,你可以使用 airflow.models.Connection 类来动态创建 Connection:

python
from airflow.models import Connection
from airflow import settings

# 创建一个新的 Connection
conn = Connection(
conn_id='my_db_connection',
conn_type='mysql',
host='localhost',
login='user',
password='password',
port=3306
)

# 将 Connection 添加到 Airflow 的元数据数据库
session = settings.Session()
session.add(conn)
session.commit()

如何在任务中使用 Connections

在任务中,你可以通过 BaseHook.get_connection() 方法获取 Connection 的详细信息,并使用它来与外部系统交互。以下是一个示例:

python
from airflow import DAG
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def query_database():
# 获取 Connection
mysql_hook = MySqlHook(mysql_conn_id='my_db_connection')
connection = mysql_hook.get_conn()

# 执行查询
cursor = connection.cursor()
cursor.execute("SELECT * FROM my_table")
results = cursor.fetchall()
for row in results:
print(row)

# 定义 DAG
dag = DAG(
'example_dag',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily'
)

# 定义任务
task = PythonOperator(
task_id='query_database',
python_callable=query_database,
dag=dag
)

实际应用场景

场景 1:连接到 MySQL 数据库

假设你需要从 MySQL 数据库中提取数据并存储到 S3 中。你可以创建一个 MySQL Connection,然后在任务中使用它来执行查询并将结果上传到 S3。

场景 2:调用 REST API

如果你需要调用一个受保护的 REST API,可以创建一个 HTTP Connection,存储 API 的 URL 和认证信息,然后在任务中使用它来发送请求。


总结

Airflow Connections 是管理外部系统连接的核心工具。通过 Connections,你可以安全地存储和访问认证信息,避免硬编码敏感数据。无论是通过 UI、环境变量还是代码,创建和管理 Connections 都非常简单。


附加资源与练习

  • 官方文档Airflow Connections
  • 练习
    1. 创建一个 MySQL Connection,并在任务中执行一个简单的查询。
    2. 尝试通过环境变量定义一个 HTTP Connection,并在任务中调用一个 REST API。

通过实践,你将更好地掌握 Airflow Connections 的使用方法!