Argo workflows for data pipelines in a Kubernetes-native way using Apache Airflow operators and hooks
Concept:
There are not many open-source options for building data pipelines natively on a modern container-orchestration system like Kubernetes. Argo allows for Kubernetes-native workflows. The idea is to take the existing variety of hooks and operators available in Apache Airflow and use them to run a data pipeline natively on Kubernetes (using Kubernetes-native primitives, with Argo for workflow management). This lets us define data pipelines that are not limited to Apache Airflow operators: we can use existing container images for data operations that are either not supported in Apache Airflow or have not been wrapped in Python in some way. Argo workflows also have the advantage of being able to combine DAGs with non-DAG, steps-based operations in a single workflow, and can launch arbitrary container images to do the work. Argo workflow DAGs also let users execute an arbitrary set of tasks for a given DAG with just parameters (and the dependent tasks are executed as well).
Architecture:
Break each individual task (instantiated operator) in an Apache Airflow DAG into a separate file (the Python code used to write Apache Airflow DAGs) and define an individual DAG for each task.
As part of defining a task, also define the connections it needs. These connections can usually be defined with a single environment variable in Apache Airflow; however, some connections, such as Google Cloud, need a separate operator (in Apache Airflow 1.8.2). See the individual airflow tasks.
Use an Apache Airflow image containing all the operators (based on v1.8.2), but running with SQLite and the sequential executor, to execute the tasks as individual steps/DAG entries in an Argo workflow.
Use backfill in Apache Airflow to run each individual task to completion in a given step.
Use a Kubernetes-native CronJob to schedule workflows.
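A minimal sketch of what one such workflow step might look like, assuming an illustrative image name, DAG id, and connection (these are placeholders, not the exact files from the example repo):

```yaml
# Illustrative Argo Workflow: the step runs one single-task Airflow DAG to
# completion with `airflow backfill`, inside the Airflow image that bundles
# the operators and runs with SQLite and the sequential executor.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: airflow-task-step-
spec:
  entrypoint: run-task
  arguments:
    parameters:
    - name: run-date              # date to backfill, e.g. 2018-01-31
  templates:
  - name: run-task
    inputs:
      parameters:
      - name: run-date
    container:
      image: my-airflow:1.8.2     # assumption: image containing the operators
      command: [airflow]
      # Backfill the single-task DAG for exactly the given date
      args: [backfill, my_task_dag,
             -s, "{{inputs.parameters.run-date}}",
             -e, "{{inputs.parameters.run-date}}"]
      env:
      # Most connections can be injected as a single environment variable
      - name: AIRFLOW_CONN_MY_DB
        value: postgres://user:pass@host:5432/db
```

Because each step is its own DAG backfilled to completion, Argo (not Airflow) owns the dependency graph between tasks.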
Example:
Assumptions:
We use the example from Google, using BigQuery-related operators and Google Cloud connections, to compute Hacker News and GitHub trends.
This example uses workflows for two things: a one-time initialization of the BigQuery dataset and tables, and the recurring daily trend computation.
How to Run the Example:
Get access to BigQuery:
Create Secret in Kubernetes based on the Key.json file:
Check out example repository:
Submit the initialization workflow:
argo submit data-pipeline/airflow-operator-examples/bigquery-example/workflows/bigquery_step_init_workflow.yaml -p gcp-project="<Name of your google cloud project>"
This will create a BigQuery dataset called github_trends and four tables: github_daily_metrics, github_agg, hackernews_agg and hackernews_github_agg. It will also fill in the last 40 days of data for the github_daily_metrics table, so you don't have to keep fetching that data from the public dataset. See the Google example.
At this point you are ready to run the full data workflow. You have two options:
Option 1: Run the CronJob and wait for a day to see the data fill out the final hackernews_github_agg table. To do this, modify data-pipeline/airflow-operator-examples/bigquery-example/cronjobs/bigquery_hn_github_trends_cron.yaml to add -p gcp-project="<Name of your google cloud project>" to the end of the argo submit line in the file, then create the CronJob.
Option 2: Run the workflow manually to fill in data for various dates:
argo submit data-pipeline/airflow-operator-examples/bigquery-example/workflows/bigquery_step_workflow.yaml -p gcp-project="<Name of your google cloud project>" -p run-date="<Yesterday's date in form YYYYMMDD>"
The reason to use yesterday's date is that the Hacker News public dataset is sometimes not updated for the previous day until much later the next day. At this point you should have the hackernews_github_agg table filled with the data for the day the workflow ran for.
You can plot the table in Data Studio in Google Cloud by copying from my example and changing the data source to point to your own tables.
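The CronJob approach could be sketched roughly as below; the schedule, image, and workflow path here are illustrative assumptions, and the repo's bigquery_hn_github_trends_cron.yaml is the authoritative version:

```yaml
# Illustrative Kubernetes CronJob that submits the Argo workflow once a day
apiVersion: batch/v1beta1            # CronJob API group of that Kubernetes era
kind: CronJob
metadata:
  name: bigquery-hn-github-trends
spec:
  schedule: "0 9 * * *"              # daily, after the public datasets update
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: submit-workflow
            image: argoproj/argocli:latest   # assumption: any image with the argo CLI
            command: [argo, submit, /workflows/bigquery_step_workflow.yaml,
                      -p, "gcp-project=<Name of your google cloud project>"]
          restartPolicy: Never
```

This keeps scheduling entirely in Kubernetes-native primitives, as described in the architecture above.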
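For the "Create Secret in Kubernetes based on the Key.json file" step above, the Secret can be created with kubectl along these lines; the secret name and file path are illustrative, so use whatever name the example workflows actually reference:

```shell
# Store the GCP service-account key as a generic Secret for the workflow pods
kubectl create secret generic gcp-key --from-file=key.json=./Key.json
```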