Introduction
Cloud Composer is Google Cloud Platform's managed service for Apache Airflow. DBT is an open-source tool for transforming data in ETL/ELT pipelines.
You can read more about DBT and DBT Core here, and about Cloud Composer here.
There is not much good documentation on integrating DBT with Cloud Composer, so I would like to share what I learned while setting it up.
Install the DBT and Airflow-dbt PyPI Packages
The airflow-dbt and dbt-bigquery PyPI packages install the libraries required to run DBT in Cloud Composer:
dbt-bigquery 1.1.0
airflow-dbt 0.4.0
You can install these packages with Terraform or from the Cloud Console via the environment's PyPI packages page.
Click the Edit button and add the PyPI dependencies listed above.
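Alternatively, you can add the packages with gcloud. This is a sketch assuming the environment name and location used later in this post, and a requirements.txt file listing the two packages above:
gcloud composer environments update my-composer-envt-6f0d --location us-central1 --update-pypi-packages-from-file requirements.txt
Note that updating PyPI packages rebuilds the environment, so the operation can take a while.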
DBT DAG
Once the dependencies are installed, create the DAG below as dp_obs.py:
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')

def print_hello():
    return "hello!"

print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)

bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'))

cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt --version"
)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> cli_command
Upload DAG to Composer
You can upload the DAG to Composer using gcloud or the Console:
gcloud composer environments storage dags import --environment my-composer-envt-6f0d --location us-central1 --source dp_obs.py
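You can confirm that the DAG file landed in the environment's bucket with:
gcloud composer environments storage dags list --environment my-composer-envt-6f0d --location us-central1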
Airflow-UI Status
The DAG is deployed and shows up in the Airflow UI.
The DAG executed successfully.
Checking the logs of the bash_command task, the output of dbt --version confirms that DBT and Cloud Composer are working well together.
Airflow DBT Operators
For the airflow-dbt operators to work, we have to create the DBT project file structure that the operators expect, as sketched below.
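A minimal DBT project layout looks roughly like this; the model and seed file names below are only illustrative, and the profiles.yml shown later in this post lives in the same directory:
first_project/
    dbt_project.yml
    profiles.yml
    models/
        my_first_model.sql
        schema.yml
    seeds/
        my_seed.csv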
We can then upload this file structure to the GCS bucket of the Cloud Composer environment:
gsutil -m cp -R first_project gs://us-central1-my-composer-envt-6f0-35e280a2-bucket/dags
After the upload, the dags folder on the GCS bucket contains the first_project directory alongside the DAG file.
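You can verify the upload by listing the same path used in the copy command:
gsutil ls gs://us-central1-my-composer-envt-6f0-35e280a2-bucket/dags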
DAG with Airflow-DBT Operators
I updated my DAG as below:
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtRunOperator,
    DbtTestOperator
)

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')

def print_hello():
    return "hello!"

print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)

bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'))

# Alternative to the dbt operators below: run dbt directly via the CLI
cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt run --project-dir /home/airflow/gcs/dags/first_project/ --profiles-dir /home/airflow/gcs/dags/first_project/"
)

dbt_seed = DbtSeedOperator(
    task_id='dbt_seed',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/'
)

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/'
)

dbt_test = DbtTestOperator(
    task_id='dbt_test',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    # dag=dag,
    retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> dbt_seed >> dbt_run >> dbt_test
I am using the Cloud Composer environment's service account to interact with BigQuery: the oauth method in the profiles.yml below picks up the environment's default Google Cloud credentials.
first_project:
  outputs:
    dev:
      dataset: aa
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: my_gcp_project_id
      threads: 1
      type: bigquery
  target: dev
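For completeness, the project directory also needs a dbt_project.yml. A minimal sketch for dbt 1.x, assuming the default folder names (adjust the project name and paths to your setup):
name: 'first_project'
version: '1.0.0'
config-version: 2
profile: 'first_project'
model-paths: ["models"]
seed-paths: ["seeds"]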
Testing DAG on Cloud Composer
The DAG ran successfully. I wanted the last task (dbt_test) to fail here, since the data does not meet the data quality check.
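For reference, the data quality check here is a standard dbt schema test. Something like the following in models/schema.yml (model and column names are illustrative) makes dbt test, and therefore the dbt_test task, fail when the data violates it:
version: 2
models:
  - name: my_first_model
    columns:
      - name: id
        tests:
          - not_null
          - unique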
Please post your queries below.
Happy Coding