Airflow 与GCS交互
Apache Airflow 是一个强大的工作流管理工具,广泛用于数据管道的编排和调度。Google Cloud Storage(GCS)是Google Cloud Platform(GCP)提供的一种对象存储服务,常用于存储和管理大规模数据。将Airflow与GCS集成,可以帮助您自动化数据存储、传输和处理任务。
本文将逐步介绍如何在Airflow中与GCS进行交互,包括如何上传、下载和管理GCS中的文件。我们还将通过实际案例展示这些操作的真实应用场景。
1. 准备工作
在开始之前,您需要完成以下准备工作:
- GCP项目:确保您已经创建了一个GCP项目,并启用了Google Cloud Storage API。
- 服务账户:创建一个GCP服务账户,并为其分配适当的权限(例如
Storage Admin
角色)。 - 密钥文件:下载服务账户的JSON密钥文件,并将其存储在安全的位置。
- Airflow环境:确保您的Airflow环境已经安装并配置好。
2. 安装依赖
为了在Airflow中与GCS交互,您需要安装apache-airflow-providers-google
包。该包提供了与GCP服务集成的操作符和钩子。
pip install apache-airflow-providers-google
3. 配置GCP连接
在Airflow中,您需要配置一个GCP连接,以便与GCS进行交互。您可以通过Airflow的Web UI或直接编辑airflow.cfg
文件来完成此操作。
通过Web UI配置
- 登录Airflow Web UI。
- 导航到
Admin
>Connections
。 - 点击
+
按钮创建一个新连接。 - 填写以下字段:
- Conn Id:
google_cloud_default
- Conn Type:
Google Cloud
- Keyfile Path: 填写服务账户密钥文件的路径(例如
/path/to/your/keyfile.json
)
- Conn Id:
通过配置文件配置
您也可以在airflow.cfg
文件中直接配置GCP连接:
[connections]
google_cloud_default = google-cloud-platform://?extra__google_cloud_platform__key_path=/path/to/your/keyfile.json
4. 使用GCSHook与GCS交互
Airflow提供了GCSHook
,这是一个用于与GCS交互的钩子。您可以使用它来执行各种操作,例如上传、下载和删除文件。
上传文件到GCS
以下是一个使用GCSHook
上传文件到GCS的示例:
from airflow import DAG
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def upload_to_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
filename = '/path/to/your/local/file.txt'
hook.upload(bucket_name, object_name, filename)
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('gcs_upload_example', default_args=default_args, schedule_interval=None) as dag:
upload_task = PythonOperator(
task_id='upload_to_gcs',
python_callable=upload_to_gcs,
)
从GCS下载文件
以下是一个使用GCSHook
从GCS下载文件的示例:
def download_from_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
filename = '/path/to/your/local/destination/file.txt'
hook.download(bucket_name, object_name, filename)
with DAG('gcs_download_example', default_args=default_args, schedule_interval=None) as dag:
download_task = PythonOperator(
task_id='download_from_gcs',
python_callable=download_from_gcs,
)