跳到主要内容

Airflow Connections 加密

介绍

在 Apache Airflow 中,Connections 是用于存储和管理外部系统连接信息的核心组件。这些信息通常包括数据库连接字符串、API 密钥、用户名和密码等敏感数据。为了保护这些敏感信息,Airflow 提供了加密功能,确保 Connections 中的密码和其他机密数据在存储时是加密的。

本文将详细介绍如何在 Airflow 中启用和使用 Connections 加密功能,并通过实际案例展示其应用场景。

启用 Connections 加密

要启用 Connections 加密,首先需要在 Airflow 的配置文件中设置加密密钥。Airflow 使用 Fernet 对称加密算法来加密和解密敏感数据。

步骤 1: 生成 Fernet 密钥

Fernet 密钥是一个 32 字节的 base64 编码字符串。你可以使用 Python 的 cryptography 库来生成一个 Fernet 密钥:

python
from cryptography.fernet import Fernet
key = Fernet.generate_key()
print(key.decode())

输出示例:

your_generated_fernet_key

步骤 2: 配置 Airflow

将生成的 Fernet 密钥添加到 Airflow 的配置文件 airflow.cfg 中:

ini
[core]
fernet_key = your_generated_fernet_key

确保 fernet_key 的值是你生成的密钥。

步骤 3: 重启 Airflow

配置完成后,重启 Airflow 以应用更改。现在,Airflow 将使用该密钥来加密和解密 Connections 中的敏感信息。

加密 Connections 中的密码

在 Airflow 中,当你创建一个新的 Connection 时,密码字段会自动加密。以下是一个创建加密 Connection 的示例:

python
from airflow.models import Connection
from airflow.utils.db import provide_session

@provide_session
def create_encrypted_connection(session=None):
conn = Connection(
conn_id='my_encrypted_connection',
conn_type='mysql',
host='localhost',
login='myuser',
password='mypassword',
port=3306
)
session.add(conn)
session.commit()

create_encrypted_connection()

在这个示例中,password 字段在存储到数据库之前会被自动加密。

解密 Connections 中的密码

当你需要从 Airflow 中获取 Connection 信息时,密码字段会自动解密。以下是一个获取并解密 Connection 的示例:

python
from airflow.models import Connection
from airflow.utils.db import provide_session

@provide_session
def get_encrypted_connection(session=None):
conn = session.query(Connection).filter(Connection.conn_id == 'my_encrypted_connection').first()
print(f"Decrypted password: {conn.password}")

get_encrypted_connection()

输出示例:

Decrypted password: mypassword

实际应用场景

场景 1: 保护数据库连接信息

假设你有一个 Airflow DAG,需要连接到 MySQL 数据库来执行 ETL 任务。为了保护数据库的连接信息,你可以使用加密的 Connection 来存储数据库的用户名和密码。

python
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

dag = DAG(
'mysql_etl_dag',
default_args=default_args,
schedule_interval='@daily',
)

extract_data = MySqlOperator(
task_id='extract_data',
mysql_conn_id='my_encrypted_connection',
sql='SELECT * FROM my_table',
dag=dag,
)

extract_data

在这个场景中,mysql_conn_id 指向一个加密的 Connection,确保数据库的敏感信息在存储和传输过程中是安全的。

场景 2: 保护 API 密钥

如果你需要在 Airflow 中调用外部 API,并且 API 密钥是敏感信息,你可以使用加密的 Connection 来存储 API 密钥。

python
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

dag = DAG(
'api_call_dag',
default_args=default_args,
schedule_interval='@daily',
)

call_api = SimpleHttpOperator(
task_id='call_api',
http_conn_id='my_encrypted_api_connection',
endpoint='/api/data',
method='GET',
dag=dag,
)

call_api

在这个场景中,http_conn_id 指向一个加密的 Connection,确保 API 密钥在存储和传输过程中是安全的。

总结

通过启用 Airflow 的 Connections 加密功能,你可以有效地保护敏感信息,如数据库密码和 API 密钥。本文详细介绍了如何生成 Fernet 密钥、配置 Airflow 以及在实际应用中使用加密的 Connections。

附加资源

练习

  1. 生成一个新的 Fernet 密钥,并将其配置到你的 Airflow 环境中。
  2. 创建一个加密的 Connection,并尝试在 DAG 中使用它。
  3. 编写一个脚本,从 Airflow 中获取并解密一个 Connection 的密码。

通过完成这些练习,你将更好地理解 Airflow Connections 加密的工作原理及其在实际中的应用。