Apache Airflow - GCS and Cloud Pub/Sub

If you have just installed Airflow on GKE, it is time to check that it is working and that we can communicate with different Google Cloud components, including Google Cloud Storage and Google Cloud Pub/Sub.

The Airflow REST API

After installation, one may wish to visit the Airflow web UI and check that the application is working. So far, we have not configured networking/load balancing (which is really a subject in its own right), so the easiest way to access the web UI is port forwarding:

kubectl port-forward pod/web-0 8080:8080

Now we may check the Airflow state and run our DAGs manually or through the experimental Airflow API endpoints:

# test that the Airflow webserver is running
curl localhost:8080/api/experimental/test
{"status":"OK"}
# trigger a run of the DAG with <DAG_ID>
curl -X POST http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
    -H 'Cache-Control: no-cache' \
    -H 'Content-Type: application/json' \
    -d '{"conf":"{\"key\":\"value\"}"}'

By default, DAGs are paused upon creation, so you may need to enable them manually or set the configuration parameter dags_are_paused_at_creation to False.
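For reference, dags_are_paused_at_creation lives in the [core] section of airflow.cfg, and a single DAG can also be unpaused from the Airflow 1.x CLI (a minimal sketch; <DAG_ID> is a placeholder):

# airflow.cfg
[core]
dags_are_paused_at_creation = False

# or unpause a single DAG from the CLI
airflow unpause <DAG_ID>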

Google Cloud Storage: simple storage using GoogleCloudStorageHook

We have already configured access to Cloud SQL, so we may query data from SQL databases. Another useful possibility is the ability to save data in Google Cloud Storage. Airflow comes with many helpers simplifying access to different services, including GCS. To test cloud storage, let's create GCS buckets using the command-line gsutil utility (as bucket names must be globally unique, you may need to change the bucket names):

gsutil mb gs://airflow-test-source
gsutil mb gs://airflow-test-dest

Now, using GoogleCloudStorageHook, we may test simple storage operations. During our test drive, we save a file to a bucket, copy it to another bucket, and finally download it to the local disk:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
import json
from datetime import datetime

FILE_NAME = './test.json'
DOWNLOAD_NAME = './downloaded.json'

def save_data():
    # create a small local JSON file that we will upload to GCS
    with open(FILE_NAME, 'w') as f:
        json.dump({"name": "name", "age": 10}, f)

def upload_file_func():
    # upload the local file to the source bucket
    save_data()
    hook = GoogleCloudStorageHook()
    source_bucket = 'airflow-test-source'
    source_object = 'data/test.json'
    hook.upload(source_bucket, source_object, FILE_NAME)

def copy_file_func():
    # copy the uploaded object from the source bucket to the destination bucket
    hook = GoogleCloudStorageHook()
    source_bucket = 'airflow-test-source'
    source_object = 'data/test.json'

    dest_bucket = 'airflow-test-dest'
    dest_object = 'test.json'
    hook.copy(source_bucket, source_object, dest_bucket, dest_object)

def download_file_func():
    # download the copied object to local disk and print its contents
    hook = GoogleCloudStorageHook()
    dest_bucket = 'airflow-test-dest'
    dest_object = 'test.json'
    hook.download(dest_bucket, dest_object, DOWNLOAD_NAME)
    with open(DOWNLOAD_NAME) as json_file:
        data = json.load(json_file)
    print(data)

with DAG('gcs_examples', description='DAG', schedule_interval=None, start_date=datetime(2018, 11, 1)) as dag:
    upload_file = PythonOperator(task_id='upload_file', python_callable=upload_file_func)
    copy_file   = PythonOperator(task_id='copy_file', python_callable=copy_file_func)
    download_file = PythonOperator(task_id='download_file', python_callable=download_file_func)
    upload_file >> copy_file >> download_file
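
Once the scheduler has picked up this file, the DAG can be triggered on demand through the experimental endpoint shown earlier, using gcs_examples as the <DAG_ID>, for example:

curl -X POST http://localhost:8080/api/experimental/dags/gcs_examples/dag_runs \
    -H 'Content-Type: application/json' \
    -d '{"conf":"{}"}'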

Google Cloud Pub/Sub: notifications using PubSubHook

Up to this point, we have learned how to schedule our Airflow tasks, invoke them on demand, query data, and save results after some data processing. The only missing piece is the ability to send notifications. We could do it using conventional HTTP requests, but on GCP we have an additional option: Cloud Pub/Sub. To test it, we may first create a topic and publish/subscribe manually:

# create topic and subscription
gcloud pubsub topics create my-topic
gcloud pubsub subscriptions create my-sub --topic=my-topic
# publish and receive a message
gcloud pubsub topics publish my-topic --message="hello"
gcloud pubsub subscriptions pull my-sub --auto-ack

After that, we may publish notifications from our DAGs using PubSubHook:

from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
from base64 import b64encode as b64e

hook = PubSubHook()

# message data must be base64-encoded; "google_project_id" is your GCP project id
hook.publish("google_project_id", "my-topic",
             [{"data": b64e("hello".encode()).decode(), "attributes": {"type": "greeting"}}])
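
To send the notification from a workflow rather than an interactive session, the same call can be wrapped in a PythonOperator, mirroring the GCS example above. Below is a minimal sketch; the DAG id pubsub_notify_example is arbitrary, and "google_project_id" and "my-topic" are placeholders for your own project id and the topic created above:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
from base64 import b64encode as b64e
from datetime import datetime

def notify_func():
    # publish a single base64-encoded message to the topic created above
    hook = PubSubHook()
    message = {"data": b64e("job finished".encode()).decode(),
               "attributes": {"type": "notification"}}
    hook.publish("google_project_id", "my-topic", [message])  # placeholder project id

with DAG('pubsub_notify_example', description='DAG', schedule_interval=None, start_date=datetime(2018, 11, 1)) as dag:
    notify = PythonOperator(task_id='notify', python_callable=notify_func)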

Summary

After installing Airflow on Google Kubernetes Engine, we checked that our installation is working and tested two Google services: Google Cloud Storage and Google Cloud Pub/Sub. Now we are able to schedule our workflows, invoke them on demand, query and save processed data, and notify other systems upon completion. With such tooling under our belt, we may start working on some serious ETL problems! The source code and installation scripts used in this project can be found on GitHub.