Friday, November 18, 2022

DBT on Cloud Composer (Apache Airflow)

Introduction

Cloud Composer is a managed service for Apache Airflow on Google Cloud Platform. DBT is an open-source tool for the transformation step of ELT/ETL data pipelines.

You can read more about DBT and DBT Core, and about Cloud Composer, in their official documentation.

There is no good documentation for integrating DBT with Cloud Composer, so I would like to share my learnings from doing it.

Install DBT and Airflow-dbt PYPI Package

The airflow-dbt and dbt-bigquery PyPI packages install the libraries required to run DBT in Cloud Composer:

dbt-bigquery 1.1.0
airflow-dbt 0.4.0
You can install these packages via Terraform or from the Cloud Console.


Click the Edit button and add the above PyPI dependencies.
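If you prefer the command line, something along these lines should also work (the environment name and location are the ones used later in this post; requirements.txt is a hypothetical helper file listing the two packages above):

$ cat requirements.txt
dbt-bigquery==1.1.0
airflow-dbt==0.4.0

$ gcloud composer environments update my-composer-envt-6f0d \
    --location us-central1 \
    --update-pypi-packages-from-file requirements.txt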

DBT DAG

Once the dependencies are installed, create the below DAG as dp_obs.py:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')


def print_hello():
    return "hello!"


print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)


bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'))

cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt --version"
)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> cli_command

Upload DAG to Composer

You can upload the DAG to Composer using gcloud or the Console.

 
gcloud composer environments storage dags import --environment my-composer-envt-6f0d --location us-central1 --source dp_obs.py
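To confirm that the file landed in the environment's DAGs folder, you can list it (same environment and location as above):

gcloud composer environments storage dags list --environment my-composer-envt-6f0d --location us-central1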

Airflow-UI Status

The DAG is deployed as shown below.



DAG executed successfully.




Let's check logs of the DAG


DBT and Cloud Composer are working well together.

Airflow DBT Operators

For the airflow-dbt operators to work, we have to create the file structure that the operators expect; a typical layout is sketched below.
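The exact contents depend on your DBT project; a minimal layout looks roughly like this (the file names under models/ and seeds/ are just placeholders):

first_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   └── my_first_model.sql
└── seeds/
    └── my_seed.csv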



We can upload this file structure to the GCS bucket of the Cloud Composer environment:

gsutil -m cp -R first_project gs://us-central1-my-composer-envt-6f0-35e280a2-bucket/dags
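To double-check the upload, you can list the folder with gsutil (the bucket name is taken from the copy command above):

gsutil ls gs://us-central1-my-composer-envt-6f0-35e280a2-bucket/dags/first_project/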

The DAGs folder on GCS Bucket will look like below

Drill Down of first_project folder on GCS.



DAG with Airflow-DBT Operators


I updated my DAG as below:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtRunOperator,
    DbtTestOperator
)

gcp_project_id = "my_gcp_project_id"

dag = DAG(
    dag_id='composer_demo_dag',
    start_date=datetime(2022, 11, 9),
    catchup=False,
    schedule_interval='0 2 * * *')


def print_hello():
    return "hello!"


print_hello = PythonOperator(
    task_id='print_hello',
    # python_callable param points to the function you want to run
    python_callable=print_hello,
    # dag param points to the DAG that this task is a part of
    dag=dag)


bq_task = bigquery_operator.BigQueryOperator(
    task_id='bq_task',
    sql="SELECT * from `{}.{}.{}` limit 20".format(gcp_project_id, 'aa', 'ds1'),
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    destination_dataset_table='{}.{}.{}'.format(gcp_project_id, 'aa', 'ds2'))

cli_command = BashOperator(
    task_id="bash_command",
    bash_command="dbt  run --project-dir /home/airflow/gcs/dags/first_project/ --profiles-dir /home/airflow/gcs/dags/first_project/"
)

dbt_seed = DbtSeedOperator(
    task_id='dbt_seed',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/'
)

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/'
)

dbt_test = DbtTestOperator(
    task_id='dbt_test',
    profiles_dir='/home/airflow/gcs/dags/first_project/',
    dir='/home/airflow/gcs/dags/first_project/',
    # dag=dag,
    retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
)

# Assign the order of the tasks in our DAG
print_hello >> bq_task >> dbt_seed >> dbt_run >> dbt_test

I am using the Cloud Composer environment's service account to interact with BigQuery through the default Google Cloud connection. The profiles.yml inside first_project looks like below:

first_project:
  outputs:
    dev:
      dataset: aa
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: my_gcp_project_id
      threads: 1
      type: bigquery
  target: dev
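To sanity-check that the profile and the service account credentials work from a Composer worker, a dbt debug call (for example via a temporary BashOperator) can be used; this is a sketch of an optional check, not something the airflow-dbt operators require:

dbt debug \
    --project-dir /home/airflow/gcs/dags/first_project/ \
    --profiles-dir /home/airflow/gcs/dags/first_project/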

Testing DAG on Cloud Composer

The DAG ran successfully. I wanted the last task to fail, as the data does not meet the data quality check.

Logs of DbtTestOperator are below:


Please post your queries below.

Happy Coding


Monday, January 24, 2022

Running Spark on Kubernetes on Minikube

Start Minikube

minikube start --memory 8192 --cpus 4

Download spark and set SPARK_HOME
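If you don't already have a Spark distribution locally, something like this should work (I am assuming the Spark 3.1.2 / Hadoop 3.2 build, which matches the version printed by the shell later in this post; the URL follows the standard Apache archive layout):

wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar -xzf spark-3.1.2-bin-hadoop3.2.tgz
export SPARK_HOME=$PWD/spark-3.1.2-bin-hadoop3.2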

cd $SPARK_HOME

K8S_SERVER=$(kubectl config view --output=jsonpath='{.clusters[].cluster.server}')

Build the Spark image using the Docker daemon inside Minikube:

eval $(minikube docker-env)

docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

Check Docker Image

$ docker images spark
REPOSITORY   TAG      IMAGE ID       CREATED        SIZE
spark        latest   3686fa10e74a   3 months ago   529MB
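The spark-shell and spark-submit commands below assume that a spark namespace and a spark-serviceaccount service account already exist in the cluster; if not, they can be created roughly like this:

kubectl create namespace spark
kubectl create serviceaccount spark-serviceaccount --namespace spark
kubectl create clusterrolebinding spark-role \
    --clusterrole=edit \
    --serviceaccount=spark:spark-serviceaccount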

Start the spark-shell using below command.

./bin/spark-shell \
  --master k8s://$K8S_SERVER \
  --conf spark.kubernetes.container.image=spark:latest \
  --conf spark.kubernetes.context=minikube \
  --conf spark.kubernetes.namespace=spark \
  --verbose

You may get an error like "unable to find valid certification path to requested target".

Start Kubectl Proxy

$ kubectl proxy

Starting to serve on 127.0.0.1:8001

Change --master to --master k8s://http://localhost:8001 and start spark-shell again.

scala> spark.version

res0: String = 3.1.2


scala> sc.master

res1: String = k8s://http://localhost:8001


scala> val values = List(List("1", "One") ,List("2", "Two") ,List("3", "Three"),List("4","4")).map(x =>(x(0), x(1)))

values: List[(String, String)] = List((1,One), (2,Two), (3,Three), (4,4))


scala> val df = spark.createDataFrame(values.map(e => Tuple2(e._1, e._2))).toDF("col1", "col1")

df: org.apache.spark.sql.DataFrame = [col1: string, col1: string]


scala> df.show()

+----+-----+
|col1| col1|
+----+-----+
|   1|  One|
|   2|  Two|
|   3|Three|
|   4|    4|
+----+-----+



scala> df.count()

res3: Long = 4


scala> df.first()

res4: org.apache.spark.sql.Row = [1,One]


You can also submit the packaged SparkPi example with spark-submit in client mode:

./bin/spark-submit \
  --master k8s://http://localhost:8001 \
  --deploy-mode client \
  --name a1 \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:latest \
  --conf spark.kubernetes.driver.pod.name=a1 \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.driver.limit.cores=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=500m \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-serviceaccount \
  --verbose \
  local:///Users/aagarwal/dev/spark/examples/jars/spark-examples_2.12-3.1.2.jar 10
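While the job runs, the executor pods (the driver runs locally in client mode) can be watched in the spark namespace:

kubectl get pods -n spark --watch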

Thursday, January 13, 2022

Running Node Port Service on Kubernetes

 We will be running the below deployment.yaml on minikube.

apiVersion: apps/v1
kind: Deployment
metadata:
 name: helloweb
 labels:
   app: hello
spec:
 selector:
   matchLabels:
     app: hello
 template:
   metadata:
     labels:
       app: hello
   spec:
     containers:
     - name: hello-app
       image: gcr.io/google-samples/hello-app:1.0
       ports:
       - containerPort: 8080
Apply the above deployment using the below command:
$ kubectl apply -f deployment.yaml
deployment.apps/helloweb created
Check the status using below command:
$ kubectl get all
NAME                            READY   STATUS    RESTARTS   AGE
pod/helloweb-5dfb8764bf-9k7ss   1/1     Running   0          9s

NAME                 TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1    <none>        443/TCP   86d

NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/helloweb   1/1     1            1           9s

NAME                                  DESIRED   CURRENT   READY   AGE
replicaset.apps/helloweb-5dfb8764bf   1         1         1       9s

This will create the helloweb application.

We will expose the above application using a NodePort service. A NodePort service opens a port on each node and forwards traffic from that port to the service, which routes it to the application's target port.

apiVersion: v1
kind: Service
metadata:
  name: helloweb-svc
  labels:
    app: hello
spec:
  type: NodePort
  ports:
  - port: 9080
    targetPort: 8080
  selector:
    app: hello

This maps the service port 9080 to the application's target port 8080.

Create node port service using below command:
$  kubectl apply -f service-nodeport.yaml
service/helloweb-svc created
 
Check the status using below command:
$ kubectl get all
NAME                            READY   STATUS    RESTARTS   AGE
pod/helloweb-5dfb8764bf-9k7ss   1/1     Running   0          52s

NAME                   TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)          AGE
service/helloweb-svc   NodePort    10.97.186.9   <none>        9080:31467/TCP   4s
service/kubernetes     ClusterIP   10.96.0.1     <none>        443/TCP          87d

NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/helloweb   1/1     1            1           52s

NAME                                  DESIRED   CURRENT   READY   AGE
replicaset.apps/helloweb-5dfb8764bf   1         1         1       52s        

The NodePort service maps service port 9080 to container port 8080 and exposes it on node port 31467.

We can view the application in the browser by port-forwarding local port 7080 to the service using the below command:
$  kubectl port-forward service/helloweb-svc 7080:9080
Forwarding from 127.0.0.1:7080 -> 8080
Forwarding from [::1]:7080 -> 8080
Handling connection for 7080
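With the port-forward running, you can also hit the service from another terminal with curl; for hello-app:1.0 the response should look roughly like this:

$ curl http://localhost:7080/
Hello, world!
Version: 1.0.0
Hostname: helloweb-5dfb8764bf-9k7ss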


Open the browser  http://localhost:7080/ :



You can also access using minikube service command:
$   minikube service helloweb-svc
|-----------|--------------|-------------|---------------------------|
| NAMESPACE |     NAME     | TARGET PORT |            URL            |
|-----------|--------------|-------------|---------------------------|
| default   | helloweb-svc |        9080 | http://192.168.49.2:31467 |
|-----------|--------------|-------------|---------------------------|
🏃  Starting tunnel for service helloweb-svc.
|-----------|--------------|-------------|------------------------|
| NAMESPACE |     NAME     | TARGET PORT |          URL           |
|-----------|--------------|-------------|------------------------|
| default   | helloweb-svc |             | http://127.0.0.1:56345 |
|-----------|--------------|-------------|------------------------|
🎉  Opening service default/helloweb-svc in default browser...
❗  Because you are using a Docker driver on darwin, the terminal needs to be open to run it.
This will open the browser and serve the application through the tunnel.

Running it on GKE:


We can create a cluster using below command:
$  gcloud container clusters create hello-cluster --num-nodes=1 --zone "us-central1-c" --project "jovial-honor-327604"
Once the cluster is created, open a terminal and run the below command:
$  gcloud container clusters get-credentials hello-cluster --zone "us-central1-c"
Create the deployment and service using the steps above. Each GKE node has an external IP, and you can use a node IP to access the service from the browser after opening the node port on the firewall.

$  kubectl get service helloweb-svc --output yaml
spec:
    ...
    ports:
    - nodePort: 30876  # external-ip; http://ip-1:30876; http://ip-2:30876 ....
      port: 80         # cluster-ip:port; http://cluster-ip:80/
      protocol: TCP
      targetPort: 8080 # application port
    selector:
Use the nodePort from above to open the firewall:

$  gcloud compute firewall-rules create test-node-port --allow tcp:NODE_PORT

In your browser's address bar, enter the following:

NODE_IP_ADDRESS:NODE_PORT
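The nodes' external IP addresses can be listed with:

kubectl get nodes --output wide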

Happy Coding !!!




Friday, October 22, 2021

Run Redis Cluster on Docker

Introduction 

I was recently asked to implement a Redis cluster. This cluster will spawn 3 master and 3 replica (slave) nodes.

Cluster

I have used the docker-compose.yml from Bitnami:
version: '2'
services:
  r01:
    image: docker.io/bitnami/redis-cluster:6.2
    hostname: r01
    container_name: r01
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_NODES=r01 r02 r03 r04 r05 r06
    networks:
      - redis_net

  r02:
    image: docker.io/bitnami/redis-cluster:6.2
    hostname: r02
    container_name: r02
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_NODES=r01 r02 r03 r04 r05 r06
    networks:
      - redis_net

  r03:
    image: docker.io/bitnami/redis-cluster:6.2
    hostname: r03
    container_name: r03
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_NODES=r01 r02 r03 r04 r05 r06
    networks:
      - redis_net

  r04:
    image: docker.io/bitnami/redis-cluster:6.2
    hostname: r04
    container_name: r04
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_NODES=r01 r02 r03 r04 r05 r06
    networks:
      - redis_net

  r05:
    image: docker.io/bitnami/redis-cluster:6.2
    hostname: r05
    container_name: r05
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_NODES=r01 r02 r03 r04 r05 r06
    networks:
      - redis_net

  r06:
    image: docker.io/bitnami/redis-cluster:6.2
    hostname: r06
    container_name: r06
    depends_on:
      - r01
      - r02
      - r03
      - r04
      - r05
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_NODES=r01 r02 r03 r04 r05 r06
      - REDIS_CLUSTER_REPLICAS=1
      - REDIS_CLUSTER_CREATOR=yes
    networks:
      - redis_net


networks:
  redis_net:
    driver: bridge
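Bring the cluster up with docker-compose (assuming the file above is saved as docker-compose.yml; -d runs the containers in the background):

$ docker-compose up -d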
This will launch 6 containers:

$ docker ps
CONTAINER ID   IMAGE                       COMMAND                  CREATED          STATUS          PORTS      NAMES
4a2d11bd1275   bitnami/redis-cluster:6.2   "/opt/bitnami/script…"   19 seconds ago   Up 18 seconds   6379/tcp   r06
702d098a2c9c   bitnami/redis-cluster:6.2   "/opt/bitnami/script…"   20 seconds ago   Up 19 seconds   6379/tcp   r03
878870c4e8e6   bitnami/redis-cluster:6.2   "/opt/bitnami/script…"   20 seconds ago   Up 18 seconds   6379/tcp   r02
a8d0d5302e1c   bitnami/redis-cluster:6.2   "/opt/bitnami/script…"   20 seconds ago   Up 19 seconds   6379/tcp   r01
9dd772da1aee   bitnami/redis-cluster:6.2   "/opt/bitnami/script…"   20 seconds ago   Up 18 seconds   6379/tcp   r04
263a280f180a   bitnami/redis-cluster:6.2   "/opt/bitnami/script…"   20 seconds ago   Up 18 seconds   6379/tcp   r05
We can now login to one of the containers:

$ docker exec -it r01 /bin/bash
I have no name!@r01:/$
You can test the cluster settings:

$ redis-cli
127.0.0.1:6379> cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:1
cluster_stats_messages_ping_sent:52
cluster_stats_messages_pong_sent:55
cluster_stats_messages_sent:107
cluster_stats_messages_ping_received:50
cluster_stats_messages_pong_received:52
cluster_stats_messages_meet_received:5
cluster_stats_messages_received:107
You can check the cluster nodes status:
 
127.0.0.1:6379> cluster nodes
535a218f52e42503bfe306c82dc6308b1bdb0cd5 172.21.0.3:6379@16379 myself,master - 0 1634935181000 3 connected 10923-16383
3dec2af9999f386b0212104f08badd3124a43535 172.21.0.4:6379@16379 slave 535a218f52e42503bfe306c82dc6308b1bdb0cd5 0 1634935181702 3 connected
1b554ca155d2008b319b0f3117e970a446169977 172.21.0.5:6379@16379 slave b02f0d075b12ac25c16230b0ce14f9482ec251e1 0 1634935180692 1 connected
145398bae4b7347c382fb8417fb67d35f0e77715 172.21.0.7:6379@16379 slave 9932d062c2f355aab0565758b0d686b92a20c8f9 0 1634935180000 2 connected
b02f0d075b12ac25c16230b0ce14f9482ec251e1 172.21.0.2:6379@16379 master - 0 1634935179682 1 connected 0-5460
9932d062c2f355aab0565758b0d686b92a20c8f9 172.21.0.6:6379@16379 master - 0 1634935182677 2 connected 5461-10922
You can run some set and get commands (start redis-cli with -c so the client follows cluster redirects):

172.21.0.3:6379> set hi "hello"
-> Redirected to slot [16140] located at 172.21.0.3:6379
OK
172.21.0.3:6379> get hi
"hello"
172.21.0.3:6379>
We can run the same get hi from the other nodes (r02-r06) as well to confirm the cluster is working; with -c the client follows the redirect to the node that owns the key.
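For example, a one-shot check from another container should return the value we set above:

$ docker exec -it r02 redis-cli -c get hi
"hello"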

Happy Coding !!!

Monday, October 11, 2021

Nifi Multi Node Cluster on Docker

Introduction

I will run a multi-node Apache Nifi cluster on Docker, using an external Zookeeper.

Multi Node Cluster

I will use docker-compose to launch a 3 node Apache Nifi cluster. There will be a separate node for zookeeper.

I am using a custom Dockerfile so that I don't download the 1.5GB+ zip files multiple times and slow down the container launch. I have manually downloaded the Nifi and Nifi-toolkit zip files for version 1.14.0 and placed them next to the custom Dockerfile (Dockerfile_manual) shown below.
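If you still need the archives, they should be downloadable from the Apache archive (URLs assumed from the standard dist layout for 1.14.0):

wget https://archive.apache.org/dist/nifi/1.14.0/nifi-1.14.0-bin.zip
wget https://archive.apache.org/dist/nifi/1.14.0/nifi-toolkit-1.14.0-bin.zip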

FROM openjdk:8-jre

ARG UID=1000
ARG GID=1000
ARG NIFI_VERSION=1.14.0

ENV NIFI_BASE_DIR=/opt/nifi
ENV NIFI_HOME ${NIFI_BASE_DIR}/nifi-current
ENV NIFI_TOOLKIT_HOME ${NIFI_BASE_DIR}/nifi-toolkit-current

ENV NIFI_PID_DIR=${NIFI_HOME}/run
ENV NIFI_LOG_DIR=${NIFI_HOME}/logs

ADD sh/ ${NIFI_BASE_DIR}/scripts/
RUN chmod -R +x ${NIFI_BASE_DIR}/scripts/*.sh

# Setup NiFi user and create necessary directories
RUN groupadd -g ${GID} nifi || groupmod -n nifi `getent group ${GID} | cut -d: -f1` \
    && useradd --shell /bin/bash -u ${UID} -g ${GID} -m nifi \
    && mkdir -p ${NIFI_BASE_DIR} \
    && chown -R nifi:nifi ${NIFI_BASE_DIR} \
    && apt-get update \
    && apt-get install -y jq xmlstarlet procps nano vim iputils-ping

USER nifi

# Download, validate, and expand Apache NiFi Toolkit binary.
ADD nifi-toolkit-${NIFI_VERSION}-bin.zip ${NIFI_BASE_DIR}/
RUN unzip ${NIFI_BASE_DIR}/nifi-toolkit-${NIFI_VERSION}-bin.zip -d ${NIFI_BASE_DIR} \
    && rm ${NIFI_BASE_DIR}/nifi-toolkit-${NIFI_VERSION}-bin.zip \
    && mv ${NIFI_BASE_DIR}/nifi-toolkit-${NIFI_VERSION} ${NIFI_TOOLKIT_HOME} \
    && ln -s ${NIFI_TOOLKIT_HOME} ${NIFI_BASE_DIR}/nifi-toolkit-${NIFI_VERSION}

# Download, validate, and expand Apache NiFi binary.
ADD nifi-${NIFI_VERSION}-bin.zip ${NIFI_BASE_DIR}/
RUN unzip ${NIFI_BASE_DIR}/nifi-${NIFI_VERSION}-bin.zip -d ${NIFI_BASE_DIR} \
    && rm ${NIFI_BASE_DIR}/nifi-${NIFI_VERSION}-bin.zip \
    && mv ${NIFI_BASE_DIR}/nifi-${NIFI_VERSION} ${NIFI_HOME} \
    && mkdir -p ${NIFI_HOME}/conf \
    && mkdir -p ${NIFI_HOME}/database_repository \
    && mkdir -p ${NIFI_HOME}/flowfile_repository \
    && mkdir -p ${NIFI_HOME}/content_repository \
    && mkdir -p ${NIFI_HOME}/provenance_repository \
    && mkdir -p ${NIFI_HOME}/state \
    && mkdir -p ${NIFI_LOG_DIR} \
    && ln -s ${NIFI_HOME} ${NIFI_BASE_DIR}/nifi-${NIFI_VERSION}

VOLUME ${NIFI_LOG_DIR} \
       ${NIFI_HOME}/conf \
       ${NIFI_HOME}/database_repository \
       ${NIFI_HOME}/flowfile_repository \
       ${NIFI_HOME}/content_repository \
       ${NIFI_HOME}/provenance_repository \
       ${NIFI_HOME}/state

# Clear nifi-env.sh in favour of configuring all environment variables in the Dockerfile
RUN echo "#!/bin/sh\n" > $NIFI_HOME/bin/nifi-env.sh

# Web HTTP(s) & Socket Site-to-Site Ports
EXPOSE 8080 8443 10000 8000

WORKDIR ${NIFI_HOME}

ENTRYPOINT ["../scripts/start.sh"]
You can create a local image using below command: 

$  docker build -t my_nifi -f Dockerfile_manual .
This will build the image in a few seconds.


Let's now create a docker-compose.yaml:

version: "3"
services:
  zk01:
    hostname: zk01
    container_name: zk01
    image: 'bitnami/zookeeper:3.7'
    ports:
      - '2181'
      - '2888'
      - '3888'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOO_SERVER_ID=1
      - ZOO_SERVERS=0.0.0.0:2888:3888
    networks:
      - nifinet

  nifi01:
    image: my_nifi:latest
    container_name: nifi01
    hostname: nifi01
    ports:
      - 6980:8080
    volumes:
      - /Users/aagarwal/dev/docker/java_cluster/nifi_conf1:/opt/nifi/nifi-current/conf
    networks:
      - nifinet
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
      - NIFI_ZK_CONNECT_STRING=zk01:2181
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_SENSITIVE_PROPS_KEY=testpassword

  nifi02:
    image: my_nifi:latest
    container_name: nifi02
    hostname: nifi02
    ports:
      - 6979:8080
    volumes:
      - /Users/aagarwal/dev/docker/java_cluster/nifi_conf2:/opt/nifi/nifi-current/conf
    networks:
      - nifinet
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
      - NIFI_ZK_CONNECT_STRING=zk01:2181
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_SENSITIVE_PROPS_KEY=testpassword

  nifi03:
    image: my_nifi:latest
    container_name: nifi03
    hostname: nifi03
    ports:
      - 6978:8080
    volumes:
      - /Users/aagarwal/dev/docker/java_cluster/nifi_conf3:/opt/nifi/nifi-current/conf
    networks:
      - nifinet
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
      - NIFI_ZK_CONNECT_STRING=zk01:2181
      - NIFI_ELECTION_MAX_WAIT=1 min
      - NIFI_SENSITIVE_PROPS_KEY=testpassword

networks:
  nifinet:
    driver: bridge

Use below command to create the cluster:

$  docker-compose -f docker-compose.yaml up
This will launch 1 node for Zookeeper and 3 nodes for Nifi.


It will take 10-15 minutes for the Nifi nodes to become available.

Open the browser  http://localhost:6979/nifi :

Nifi is ready for use.

Happy Coding !!!

Running Multi Node Zookeeper Cluster on Docker

Introduction 

I am working on installing Apache Nifi on a multi-node cluster. For this, I needed a multi-node Zookeeper cluster.

Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. 

Multi Node Cluster

We will run a 3-node cluster on Docker, using the docker-compose.yaml below:

version: "3"
services:
  zk01:
    hostname: zk01
    container_name: zk01
    image: 'bitnami/zookeeper:3.7'
    ports:
      - '2181'
      - '2888'
      - '3888'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOO_SERVER_ID=1
      - ZOO_SERVERS=0.0.0.0:2888:3888,zk02:2888:3888,zk03:2888:3888
    networks:
      - zk_net

  zk02:
    hostname: zk02
    container_name: zk02
    image: 'bitnami/zookeeper:3.7'
    ports:
      - '2181'
      - '2888'
      - '3888'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOO_SERVER_ID=2
      - ZOO_SERVERS=zk01:2888:3888,0.0.0.0:2888:3888,zk03:2888:3888
    networks:
      - zk_net

  zk03:
    hostname: zk03
    container_name: zk03
    image: 'bitnami/zookeeper:3.7'
    ports:
      - '2181'
      - '2888'
      - '3888'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOO_SERVER_ID=3
      - ZOO_SERVERS=zk01:2888:3888,zk02:2888:3888,0.0.0.0:2888:3888
    networks:
      - zk_net

networks:
  zk_net:
    driver: bridge

I am using the bitnami/zookeeper image for it. You can use the latest version instead of 3.7 if interested.

This will launch a 3-node Zookeeper cluster without publishing the ports to the host machine.

$  docker-compose -f docker-compose.yaml up
This will show the logs of all the 3 nodes as they start running.



After a few minutes the logs will look like below:



We can check the status by running below command:

$  docker ps
This will show all the 3 instances running.


You can now log in to one of the nodes using the below command.

$ docker exec -it zk02 /bin/bash
I have no name!@zk02:/$
This will log you in to the zk02 node.

Testing the Cluster

We will now create an entry on zk02, and it will be replicated to all the other Zookeeper nodes immediately. We will start the Zookeeper CLI for it:

$ zkCli.sh -server zk02:2181
/opt/bitnami/java/bin/java
Connecting to zk02:2181
2021-10-12 05:56:18,058 [myid:] - INFO  [main:Environment@98] - Client environment:zookeeper.version=3.7.0-e3704b390a6697bfdf4b0bef79e3da7a4f6bac4b, built on 2021-03-17 09:46 UTC
2021-10-12 05:56:18,064 [myid:] - INFO  [main:Environment@98] - Client environment:host.name=zk02
2021-10-12 05:56:18,065 [myid:] - INFO  [main:Environment@98] - Client environment:java.version=11.0.12
2021-10-12 05:56:18,069 [myid:] - INFO  [main:Environment@98] - Client environment:java.vendor=BellSoft

This will open below zk cli command prompt: 

2021-10-12 05:56:18,195 [myid:zk02:2181] - INFO  [main-SendThread(zk02:2181):ClientCnxn$SendThread@1005] - Socket connection established, initiating session, client: /172.28.0.2:43996, server: zk02/172.28.0.2:2181
2021-10-12 05:56:18,249 [myid:zk02:2181] - INFO  [main-SendThread(zk02:2181):ClientCnxn$SendThread@1438] - Session establishment complete on server zk02/172.28.0.2:2181, session id = 0x20010a8acf10000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: zk02:2181(CONNECTED) 0] 
Now run create : 
[zk: zk02:2181(CONNECTED) 0] create /hello world
Created /hello
We can query it as below
[zk: zk02:2181(CONNECTED) 1] get /hello
world
We can also check from the other nodes by following the same steps to log in and query.
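For example, a one-shot query against zk01 (zkCli.sh accepts a command after the connection arguments) should print the value world among the client log output:

$ docker exec -it zk01 zkCli.sh -server zk01:2181 get /hello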

Clean up

We can delete the entry:
[zk: zk02:2181(CONNECTED) 2] delete /hello

Happy Coding !!