Introduction
Cloud Composer is Google Cloud Platform's managed service for Apache Airflow. DBT is an open-source tool for the transformation step in ETL/ELT pipelines.
You can read more about DBT and DBT Core here, and about Cloud Composer here.
There is not much good documentation on integrating DBT with Cloud Composer, so I would like to share what I learned while setting it up.
Install the DBT and airflow-dbt PyPI Packages
The airflow-dbt and dbt-bigquery PyPI packages install the libraries required to run DBT in Cloud Composer:
dbt-bigquery 1.1.0
airflow-dbt 0.4.0
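The packages can be added from the Console (PyPI packages tab of the environment) or with gcloud. The command below is a sketch: it assumes the environment name and location used later in this post, and a requirements.txt that pins the two packages above.
gcloud composer environments update my-composer-envt-6f0d \
    --location us-central1 \
    --update-pypi-packages-from-file requirements.txt
Note that installing PyPI packages rebuilds the environment, so the update takes several minutes to complete.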
DBT DAG
from datetime import datetime

from airflow import DAG
# Legacy import path for BigQueryOperator; still available in Cloud Composer via a deprecation shim
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')

def print_hello():
    return "hello!"

print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)

bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'),
    dag=dag)

# A simple smoke test: confirm the dbt CLI is installed on the workers
cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt --version",
    dag=dag)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> cli_command
Upload DAG to Composer
Save the DAG above as dp_obs.py and import it into the environment's DAGs folder:
gcloud composer environments storage dags import --environment my-composer-envt-6f0d --location us-central1 --source dp_obs.py
Airflow-UI Status
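If you prefer the CLI to the Airflow UI, the same status check can be done through the Airflow CLI wrapped by gcloud. A sketch, assuming an Airflow 2 environment:
gcloud composer environments run my-composer-envt-6f0d \
    --location us-central1 \
    dags list
gcloud composer environments run my-composer-envt-6f0d \
    --location us-central1 \
    dags list-runs -- -d composer_demo_dag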
Airflow DBT Operators
The airflow-dbt operators need the dbt project itself on the workers. Copy the local dbt project (first_project) into the Composer DAGs bucket; the bucket's dags/ folder is mounted on the workers at /home/airflow/gcs/dags/, so the project becomes available at /home/airflow/gcs/dags/first_project/.
gsutil -m cp -R first_project gs://us-central1-my-composer-envt-6f0-35e280a2-bucket/dags
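To double-check that the project landed in the bucket, a quick listing (a sketch using the bucket name above):
gsutil ls gs://us-central1-my-composer-envt-6f0-35e280a2-bucket/dags/first_project/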
DAG with Airflow-DBT Operators
from datetime import datetime

from airflow import DAG
# Legacy import path for BigQueryOperator; still available in Cloud Composer via a deprecation shim
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtRunOperator,
    DbtTestOperator
)

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')

def print_hello():
    return "hello!"

print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)

bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'),
    dag=dag)

# Alternative to the airflow-dbt operators: call the dbt CLI directly with BashOperator
cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt run --project-dir /home/airflow/gcs/dags/first_project/ --profiles-dir /home/airflow/gcs/dags/first_project/",
    dag=dag)

dbt_seed = DbtSeedOperator(
    task_id='dbt_seed',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    dag=dag)

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    dag=dag)

dbt_test = DbtTestOperator(
    task_id='dbt_test',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
    dag=dag)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> dbt_seed >> dbt_run >> dbt_test
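After importing the updated dp_obs.py with the same gcloud command as before, the DAG can be triggered manually instead of waiting for the 02:00 schedule. This is a sketch that assumes an Airflow 2 environment, where Airflow CLI subcommands are passed through gcloud:
gcloud composer environments run my-composer-envt-6f0d \
    --location us-central1 \
    dags trigger -- composer_demo_dag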
DBT profiles.yml
The profiles.yml shipped inside first_project tells dbt how to connect to BigQuery. The oauth method uses application default credentials, which on Composer workers resolve to the environment's service account.
first_project:
  outputs:
    dev:
      dataset: aa
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: my_gcp_project_id
      threads: 1
      type: bigquery
  target: dev
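Before uploading first_project, the profile can be validated locally with dbt debug. This is a sketch; it assumes dbt-bigquery is installed on your machine and that your application default credentials have access to the BigQuery project:
dbt debug --project-dir first_project --profiles-dir first_project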