Apache Airflow: GCS and Cloud Pub/Sub
If you have just installed Airflow on GKE, it is time to check that it is working and that we can communicate with different Google Cloud components, including Google Cloud Storage and Google Cloud Pub/Sub.
Airflow REST API
After installation, you may wish to visit the Airflow web UI and check that the application is working. So far, we have not configured networking/load balancing (which is really a subject in its own right), so the easiest way to access the web UI is port forwarding:
kubectl port-forward pod/web-0 8080:8080
Now we may check Airflow state and run our DAGs manually or using Airflow API endpoints:
# test the Airflow webserver is running
curl localhost:8080/api/experimental/test
{"status":"OK"}

# start a DAG run for <DAG_ID>
curl -X POST http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{"conf": {"key": "value"}}'
By default, DAGs are paused upon creation, so you may need to enable them manually or set the configuration parameter dags_are_paused_at_creation to False.
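For example, with the Airflow 1.10 CLI a DAG can be unpaused by hand, or the default can be changed in airflow.cfg (a minimal sketch; <DAG_ID> is a placeholder):

# unpause a single DAG from inside the webserver/scheduler pod
airflow unpause <DAG_ID>

# or change the default in airflow.cfg, [core] section
dags_are_paused_at_creation = False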
Google Cloud Storage: simple storage using GoogleCloudStorageHook
We have already configured access to Cloud SQL, so we may query data from SQL databases. Another useful possibility is the ability to save data in Google Cloud Storage. Airflow comes with many helpers simplifying access to different services, including GCS. To test cloud storage, let's create GCS buckets using the command-line gsutil utility (as bucket names must be globally unique, you may need to choose different names):
gsutil mb gs://airflow-test-source
gsutil mb gs://airflow-test-dest
Now, using GoogleCloudStorageHook, we may test simple storage operations. During our test drive, we save a file to a bucket, copy it to another bucket, and finally download it to local disk:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
import json
from datetime import datetime

FILE_NAME = './test.json'
DOWNLOAD_NAME = './downloaded.json'


def save_data():
    # write a small JSON document to local disk
    with open(FILE_NAME, 'w') as f:
        json.dump({"name": "name", "age": 10}, f)


def upload_file_func():
    # upload the local file to the source bucket
    save_data()
    hook = GoogleCloudStorageHook()
    source_bucket = 'airflow-test-source'
    source_object = '/data/test.json'
    hook.upload(source_bucket, source_object, FILE_NAME)


def copy_file_func():
    # copy the object from the source bucket to the destination bucket
    hook = GoogleCloudStorageHook()
    source_bucket = 'airflow-test-source'
    source_object = '/data/test.json'
    dest_bucket = 'airflow-test-dest'
    dest_object = 'test.json'
    hook.copy(source_bucket, source_object, dest_bucket, dest_object)


def download_file_func():
    # download the copied object and print its contents
    hook = GoogleCloudStorageHook()
    dest_bucket = 'airflow-test-dest'
    dest_object = 'test.json'
    hook.download(dest_bucket, dest_object, DOWNLOAD_NAME)
    with open(DOWNLOAD_NAME) as json_file:
        data = json.load(json_file)
        print(data)


with DAG('gcs_examples', description='DAG', schedule_interval=None, start_date=datetime(2018, 11, 1)) as dag:
    upload_file = PythonOperator(task_id='upload_file', python_callable=upload_file_func)
    copy_file = PythonOperator(task_id='copy_file', python_callable=copy_file_func)
    download_file = PythonOperator(task_id='download_file', python_callable=download_file_func)

    upload_file >> copy_file >> download_file
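Note that GoogleCloudStorageHook, when created without arguments, uses the default GCP connection (google_cloud_default). If you keep your service-account credentials under a differently named Airflow connection, the connection id can be passed explicitly; here is a minimal sketch, where my_gcp_conn is a hypothetical connection name:

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

# 'my_gcp_conn' is a hypothetical, custom-named Airflow connection;
# omit the argument to fall back to 'google_cloud_default'
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcp_conn')

# quick sanity check: does the uploaded object exist?
print(hook.exists('airflow-test-source', '/data/test.json'))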
Google Cloud Pub/Sub: notifications using PubSubHook
Up to this point we have learned how to schedule our Airflow tasks, invoke them on demand, query data and save results after some data processing. The only missing piece is the ability to send notifications. We could do it using conventional HTTP requests, but on GCP we have an additional option: Cloud Pub/Sub. To test it, we may first create a topic and publish/subscribe manually:
# create a topic and a subscription
gcloud pubsub topics create my-topic
gcloud pubsub subscriptions create my-sub --topic=my-topic

# publish and receive
gcloud pubsub topics publish my-topic --message="hello"
gcloud pubsub subscriptions pull my-sub --auto-ack
After which we may publish notifications from our DAGs using PubSubHook:
from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
from base64 import b64encode as b64e

hook = PubSubHook()
hook.publish("google_project_id", "my-topic",
             [{"data": b64e("hello".encode()).decode(),
               "attributes": {"type": "greeting"}}])
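Other DAGs (or any other consumer) can then pick these notifications up. For completeness, here is a minimal sketch of pulling from the subscription with the same contrib hook (Airflow 1.10 API; "google_project_id" and "my-sub" are the project and subscription from above):

from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
from base64 import b64decode

hook = PubSubHook()

# pull up to 5 messages without blocking; returns a list of receivedMessages dicts
messages = hook.pull("google_project_id", "my-sub", max_messages=5, return_immediately=True)
for m in messages:
    # message data comes back base64-encoded, matching the publish side
    print(b64decode(m["message"]["data"]).decode(), m["message"].get("attributes"))

# acknowledge so the messages are not redelivered
if messages:
    hook.acknowledge("google_project_id", "my-sub", [m["ackId"] for m in messages])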
Summary
After installing Airflow on Google Kubernetes Engine, we checked that our installation is working and tested two Google services: Google Cloud Storage and Google Cloud Pub/Sub. Now we are able to schedule our workflows, invoke them on demand, query and save processed data, and notify other systems upon completion. With such tooling under our belt we may start working on some serious ETL problems! The source code and installation scripts used in this project can be found on GitHub.