Friday, November 18, 2022

DBT on Cloud Composer (Apache Airflow)

Introduction

Cloud Composer is a managed service on Google Cloud Platform for Apache Airflow. DBT is an open-source tool for the transformation step (the T) of ELT/ETL data pipelines.

You can read more about DBT and DBT Core here, and Cloud Composer here.

There is no good documentation for integrating DBT with Cloud Composer, so I would like to share my learnings from setting it up.

Install the DBT and Airflow-DBT PyPI Packages

The airflow-dbt and dbt-bigquery PyPI packages install the libraries required to run DBT in Cloud Composer:

dbt-bigquery 1.1.0
airflow-dbt 0.4.0
You can install these packages from Terraform or from the Console.


Click the Edit button and add the above PyPI dependencies.
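If you prefer the command line, the same dependencies can be installed with gcloud. This is a sketch assuming the two packages above are saved in a local requirements.txt; the environment name and location are the ones from my setup.

requirements.txt:
dbt-bigquery==1.1.0
airflow-dbt==0.4.0

gcloud composer environments update my-composer-envt-6f0d --location us-central1 --update-pypi-packages-from-file requirements.txt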

DBT DAG

Once the dependencies are installed, create the DAG below as dp_obs.py.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')


def print_hello():
    return "hello!"


print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)


# Run a query in BigQuery and write the result to the aa.ds2 table
bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'),
    dag=dag)

# Verify that the dbt CLI is installed and on the PATH of the Composer workers
cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt --version",
    dag=dag
)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> cli_command

Upload DAG to Composer

You can upload the DAG to Composer using gcloud or the Console.

 
gcloud composer environments storage dags import --environment my-composer-envt-6f0d --location us-central1 --source dp_obs.py

Airflow-UI Status

The DAG is deployed as shown below.



The DAG executed successfully.




Let's check the logs of the DAG.


DBT and Cloud Composer are working well together.

Airflow DBT Operators

For the airflow-dbt operators to work, we have to create the file structure that the operators expect, as sketched below.
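A minimal dbt project layout for this setup could look like the following. This is a sketch: first_project, dbt_project.yml and profiles.yml come from my setup, while the model and seed file names are placeholders.

first_project/
    dbt_project.yml
    profiles.yml
    models/
        my_first_model.sql
        schema.yml
    seeds/
        my_seed.csv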



We can upload this file structure to the GCS bucket of the Cloud Composer environment:

gsutil -m cp -R first_project gs://us-central1-my-composer-envt-6f0-35e280a2-bucket/dags

The DAGs folder in the GCS bucket will look like below.

Drill-down of the first_project folder on GCS.



DAG with Airflow-DBT Operators


I updated my DAG as below:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtRunOperator,
    DbtTestOperator
)

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')


def print_hello():
    return "hello!"


print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)


# Run a query in BigQuery and write the result to the aa.ds2 table
bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'),
    dag=dag)

# Bash alternative to DbtRunOperator; kept for reference but not wired into the task chain below
cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt run --project-dir /home/airflow/gcs/dags/first_project/ --profiles-dir /home/airflow/gcs/dags/first_project/"
)

# /home/airflow/gcs/dags is where Cloud Composer mounts the DAGs GCS bucket on the workers
dbt_seed = DbtSeedOperator(
    task_id='dbt_seed',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    dag=dag
)

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    dag=dag
)

dbt_test = DbtTestOperator(
    task_id='dbt_test',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    dag=dag,
    retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> dbt_seed >> dbt_run >> dbt_test

I am using the Cloud Composer environment's service account to interact with BigQuery: the BigQuery operator uses the default google_cloud_default connection, and the dbt profile uses the oauth method, which picks up the environment's application default credentials. My profiles.yml looks like this:

first_project:
  outputs:
    dev:
      dataset: aa
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: my_gcp_project_id
      threads: 1
      type: bigquery
  target: dev
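For completeness, the dbt_project.yml that ties the project to this profile could look roughly like the following. This is a sketch with standard dbt defaults; only the project and profile names come from my setup.

name: first_project
version: '1.0.0'
config-version: 2
profile: first_project
model-paths: ["models"]
seed-paths: ["seeds"]
target-path: "target"
models:
  first_project:
    +materialized: view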

Testing DAG on Cloud Composer

The DAG ran successfully. I wanted the last task to fail because the data does not meet the data quality check defined as a dbt test.
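For reference, such a data quality check is declared in the project's schema.yml. A hypothetical example (the model and column names below are placeholders, not the ones from my actual project):

version: 2
models:
  - name: my_first_model
    columns:
      - name: id
        tests:
          - not_null
          - unique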

The logs of the DbtTestOperator are below:


Please post your queries below.

Happy Coding