跳到主要内容

Airflow Connections 类型

介绍

在 Apache Airflow 中,Connections 是用于管理与外部系统(如数据库、API、云服务等)的连接信息的核心组件。通过 Connections,Airflow 可以安全地存储和访问这些系统的凭据和配置,而无需在代码中硬编码敏感信息。

Connections 类型定义了 Airflow 如何与特定的外部系统进行交互。每种类型都有其独特的配置参数,以适应不同系统的需求。本文将详细介绍常见的 Connections 类型,并通过实际案例展示其应用。

常见的 Connections 类型

Airflow 支持多种 Connections 类型,以下是一些常见的类型及其用途:

  1. Postgres: 用于连接 PostgreSQL 数据库。
  2. MySQL: 用于连接 MySQL 数据库。
  3. HTTP: 用于与 HTTP API 进行交互。
  4. Google Cloud: 用于连接 Google Cloud 服务(如 BigQuery、Cloud Storage 等)。
  5. AWS: 用于连接 Amazon Web Services(如 S3、Redshift 等)。
  6. SSH: 用于通过 SSH 连接到远程服务器。
  7. FTP: 用于与 FTP 服务器进行文件传输。

配置 Connections

在 Airflow 中,Connections 可以通过以下方式配置:

  1. Airflow UI: 通过 Airflow 的 Web 界面添加和管理 Connections。
  2. 环境变量: 通过设置环境变量来定义 Connections。
  3. 代码: 在 DAG 文件中直接定义 Connections。

通过 Airflow UI 配置 Connections

  1. 登录 Airflow Web 界面。
  2. 导航到 Admin > Connections
  3. 点击 Create 按钮。
  4. 选择 Connection 类型,并填写相关参数(如主机、端口、用户名、密码等)。
  5. 点击 Save 保存配置。

通过环境变量配置 Connections

Airflow 允许通过环境变量来定义 Connections。环境变量的命名规则为:

AIRFLOW_CONN_{CONN_ID}=conn_type://login:password@host:port

例如,配置一个 PostgreSQL 连接:

bash
export AIRFLOW_CONN_MY_POSTGRES_CONN=postgres://user:password@localhost:5432/mydatabase

通过代码配置 Connections

在 DAG 文件中,可以通过 BaseHook 类来创建和管理 Connections。以下是一个示例:

python
from airflow.hooks.base import BaseHook

# 创建一个新的 Connection
new_conn = BaseHook.get_connection('my_new_conn')
new_conn.set(
conn_id='my_new_conn',
conn_type='postgres',
host='localhost',
login='user',
password='password',
port=5432,
schema='mydatabase'
)

实际案例

案例 1: 使用 PostgreSQL Connection 查询数据

假设我们有一个 PostgreSQL 数据库,存储了销售数据。我们可以使用 Airflow 的 PostgreSQL Connection 来查询数据并生成报告。

python
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def query_sales_data():
hook = PostgresHook(postgres_conn_id='my_postgres_conn')
sql = "SELECT * FROM sales WHERE sale_date >= '2023-01-01';"
df = hook.get_pandas_df(sql)
print(df)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('sales_report', default_args=default_args, schedule_interval='@daily') as dag:
query_task = PythonOperator(
task_id='query_sales_data',
python_callable=query_sales_data
)

query_task

案例 2: 使用 HTTP Connection 调用 API

假设我们需要调用一个外部 API 来获取天气数据。我们可以使用 Airflow 的 HTTP Connection 来完成此任务。

python
from airflow import DAG
from airflow.providers.http.hooks.http import HttpHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def get_weather_data():
hook = HttpHook(method='GET', http_conn_id='my_http_conn')
response = hook.run('/weather', headers={'Authorization': 'Bearer my_token'})
print(response.json())

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('weather_data', default_args=default_args, schedule_interval='@daily') as dag:
weather_task = PythonOperator(
task_id='get_weather_data',
python_callable=get_weather_data
)

weather_task

总结

Airflow Connections 是管理和使用外部系统连接信息的关键组件。通过 Connections,我们可以安全地存储和访问敏感信息,并在 DAG 中轻松地使用这些连接。本文介绍了常见的 Connections 类型,并通过实际案例展示了其应用。

提示

建议初学者通过 Airflow UI 来配置和管理 Connections,这样可以更直观地理解其工作原理。

附加资源

练习

  1. 在 Airflow UI 中创建一个新的 PostgreSQL Connection,并尝试在 DAG 中使用它查询数据。
  2. 使用环境变量配置一个 HTTP Connection,并调用一个公开的 API(如 OpenWeatherMap)。
  3. 尝试通过代码创建一个新的 Connection,并在 DAG 中使用它。

通过以上练习,您将更深入地理解 Airflow Connections 的使用方法。