Airflow Connections 加密
介绍
在 Apache Airflow 中,Connections 是用于存储和管理外部系统连接信息的核心组件。这些信息通常包括数据库连接字符串、API 密钥、用户名和密码等敏感数据。为了保护这些敏感信息,Airflow 提供了加密功能,确保 Connections 中的密码和其他机密数据在存储时是加密的。
本文将详细介绍如何在 Airflow 中启用和使用 Connections 加密功能,并通过实际案例展示其应用场景。
启用 Connections 加密
要启用 Connections 加密,首先需要在 Airflow 的配置文件中设置加密密钥。Airflow 使用 Fernet 对称加密算法来加密和解密敏感数据。
步骤 1: 生成 Fernet 密钥
Fernet 密钥是一个 32 字节的 base64 编码字符串。你可以使用 Python 的 cryptography
库来生成一个 Fernet 密钥:
from cryptography.fernet import Fernet
key = Fernet.generate_key()
print(key.decode())
输出示例:
your_generated_fernet_key
步骤 2: 配置 Airflow
将生成的 Fernet 密钥添加到 Airflow 的配置文件 airflow.cfg
中:
[core]
fernet_key = your_generated_fernet_key
确保 fernet_key
的值是你生成的密钥。
步骤 3: 重启 Airflow
配置完成后,重启 Airflow 以应用更改。现在,Airflow 将使用该密钥来加密和解密 Connections 中的敏感信息。
加密 Connections 中的密码
在 Airflow 中,当你创建一个新的 Connection 时,密码字段会自动加密。以下是一个创建加密 Connection 的示例:
from airflow.models import Connection
from airflow.utils.db import provide_session
@provide_session
def create_encrypted_connection(session=None):
conn = Connection(
conn_id='my_encrypted_connection',
conn_type='mysql',
host='localhost',
login='myuser',
password='mypassword',
port=3306
)
session.add(conn)
session.commit()
create_encrypted_connection()
在这个示例中,password
字段在存储到数据库之前会被自动加密。
解密 Connections 中的密码
当你需要从 Airflow 中获取 Connection 信息时,密码字段会自动解密。以下是一个获取并解密 Connection 的示例:
from airflow.models import Connection
from airflow.utils.db import provide_session
@provide_session
def get_encrypted_connection(session=None):
conn = session.query(Connection).filter(Connection.conn_id == 'my_encrypted_connection').first()
print(f"Decrypted password: {conn.password}")
get_encrypted_connection()
输出示例:
Decrypted password: mypassword
实际应用场景
场景 1: 保护数据库连接信息
假设你有一个 Airflow DAG,需要连接到 MySQL 数据库来执行 ETL 任务。为了保护数据库的连接信息,你可以使用加密的 Connection 来存储数据库的用户名和密码。
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
dag = DAG(
'mysql_etl_dag',
default_args=default_args,
schedule_interval='@daily',
)
extract_data = MySqlOperator(
task_id='extract_data',
mysql_conn_id='my_encrypted_connection',
sql='SELECT * FROM my_table',
dag=dag,
)
extract_data
在这个场景中,mysql_conn_id
指向一个加密的 Connection,确保数据库的敏感信息在存储和传输过程中是安全的。
场景 2: 保护 API 密钥
如果你需要在 Airflow 中调用外部 API,并且 API 密钥是敏感信息,你可以使用加密的 Connection 来存储 API 密钥。
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
dag = DAG(
'api_call_dag',
default_args=default_args,
schedule_interval='@daily',
)
call_api = SimpleHttpOperator(
task_id='call_api',
http_conn_id='my_encrypted_api_connection',
endpoint='/api/data',
method='GET',
dag=dag,
)
call_api
在这个场景中,http_conn_id
指向一个加密的 Connection,确保 API 密钥在存储和传输过程中是安全的。
总结
通过启用 Airflow 的 Connections 加密功能,你可以有效地保护敏感信息,如数据库密码和 API 密钥。本文详细介绍了如何生成 Fernet 密钥、配置 Airflow 以及在实际应用中使用加密的 Connections。
附加资源
练习
- 生成一个新的 Fernet 密钥,并将其配置到你的 Airflow 环境中。
- 创建一个加密的 Connection,并尝试在 DAG 中使用它。
- 编写一个脚本,从 Airflow 中获取并解密一个 Connection 的密码。
通过完成这些练习,你将更好地理解 Airflow Connections 加密的工作原理及其在实际中的应用。