Airflow 与BigQuery交互
Apache Airflow 是一个强大的工作流管理工具,广泛用于数据管道的编排和调度。Google BigQuery 是 Google Cloud 提供的一个完全托管的、无服务器的大数据分析平台。将 Airflow 与 BigQuery 集成,可以帮助你自动化数据管道的各个阶段,从数据提取到转换和加载(ETL)。
本文将逐步介绍如何在 Airflow 中与 BigQuery 进行交互,包括设置、任务定义以及实际应用场景。
1. 环境准备
在开始之前,确保你已经完成以下准备工作:
- Google Cloud 项目:你需要一个 Google Cloud 项目,并启用 BigQuery API。
- 服务账户:创建一个服务账户,并为其分配适当的权限(例如
BigQuery Admin
角色)。 - Airflow 环境:确保你已经安装并配置了 Apache Airflow,并且可以访问 Google Cloud。
2. 安装必要的依赖
为了在 Airflow 中与 BigQuery 交互,你需要安装 apache-airflow-providers-google
包。这个包包含了与 Google Cloud 服务(包括 BigQuery)交互所需的操作符和钩子。
pip install apache-airflow-providers-google
3. 配置 Airflow 连接
在 Airflow 中,你需要配置一个连接来访问 BigQuery。你可以通过 Airflow 的 UI 或直接编辑 airflow.cfg
文件来完成此操作。
通过 UI 配置连接
- 登录 Airflow Web UI。
- 导航到 Admin > Connections。
- 点击 Create 按钮。
- 填写以下信息:
- Conn Id:
google_cloud_default
- Conn Type:
Google Cloud
- Project Id: 你的 Google Cloud 项目 ID
- Keyfile Path: 服务账户密钥文件的路径
- Conn Id:
通过配置文件配置连接
你也可以在 airflow.cfg
文件中添加以下内容:
[connections]
google_cloud_default = google-cloud-platform://?extra__google_cloud_platform__key_path=/path/to/your/service-account-key.json
4. 定义 Airflow DAG
接下来,我们将定义一个简单的 DAG,用于从 BigQuery 中查询数据并将结果存储到另一个表中。
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.utils.dates import days_ago
default_args = {
'start_date': days_ago(1),
}
with DAG('bigquery_example', default_args=default_args, schedule_interval='@daily') as dag:
query_job = BigQueryExecuteQueryOperator(
task_id='run_query',
sql='SELECT * FROM `your_project.your_dataset.your_table`',
destination_dataset_table='your_project.your_dataset.your_new_table',
write_disposition='WRITE_TRUNCATE',
use_legacy_sql=False,
)
代码解释
- BigQueryExecuteQueryOperator: 这个操作符用于在 BigQuery 中执行 SQL 查询。你可以指定查询语句、目标表以及写入模式。
- sql: 要执行的 SQL 查询。
- destination_dataset_table: 查询结果存储的目标表。
- write_disposition: 写入模式,
WRITE_TRUNCATE
表示如果目标表已存在,则覆盖它。
5. 实际应用场景
假设你有一个包含销售数据的 BigQuery 表,你希望每天计算当天的总销售额,并将结果存储到另一个表中。你可以使用以下 DAG 来实现这一需求:
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.utils.dates import days_ago
default_args = {
'start_date': days_ago(1),
}
with DAG('daily_sales_summary', default_args=default_args, schedule_interval='@daily') as dag:
calculate_daily_sales = BigQueryExecuteQueryOperator(
task_id='calculate_daily_sales',
sql='''
SELECT
DATE(timestamp) AS sale_date,
SUM(amount) AS total_sales
FROM
`your_project.your_dataset.sales`
WHERE
DATE(timestamp) = "{{ ds }}"
GROUP BY
sale_date
''',
destination_dataset_table='your_project.your_dataset.daily_sales_summary',
write_disposition='WRITE_APPEND',
use_legacy_sql=False,
)
代码解释
- sql: 这个查询计算当天的总销售额,并按日期分组。
- destination_dataset_table: 结果存储到
daily_sales_summary
表中。 - write_disposition:
WRITE_APPEND
表示将结果追加到目标表中。
6. 总结
通过本文,你学习了如何在 Apache Airflow 中与 Google BigQuery 进行交互。我们介绍了如何配置 Airflow 连接、定义 DAG 以及使用 BigQueryExecuteQueryOperator
执行查询。我们还通过一个实际应用场景展示了如何自动化每日销售数据的汇总。
7. 附加资源与练习
- Google Cloud 文档: BigQuery 官方文档
- Airflow 文档: Airflow BigQuery 操作符
练习
- 修改示例 DAG,使其计算每周的销售总额,并将结果存储到一个新的表中。
- 尝试使用
BigQueryInsertJobOperator
来执行更复杂的查询任务。
如果你在练习中遇到问题,可以参考 Airflow 和 BigQuery 的官方文档,或者在社区论坛中寻求帮助。