Try Apache Airflow
We have already talked about three tools for scheduling data integration jobs:
- Task Scheduler for Windows (Data Integration (EP 3 end) - clock-work)
- Crontab for UNIX (Data Integration (EP 3 end) - clock-work)
- Rundeck for UNIX (Try Rundeck for automated deployment)
Here we go again with another one: Apache Airflow. It is our main tool right now.
Prologue
Apache Airflow is an open-source project under the Apache Software Foundation. It lets us define steps that run in arbitrary sequences and under arbitrary conditions, like a flow. The flow is called a “DAG”, which stands for “Directed Acyclic Graph”.
Apache Airflow is a popular tool among Data Engineers like us because it is easy to use and, yes, it’s free. We can deploy either Bash or Python scripts on it from day one.
Time to go!
Build an Airflow Docker container
We use a Docker image of Airflow this time.
docker pull puckel/docker-airflow
docker run -d -p 8080:8080 --name airflow -v /path/in/my/machine/:/usr/local/airflow/dags puckel/docker-airflow webserver
The commands above pull the puckel/docker-airflow image (an unofficial image from before the official one existed). Then we use docker run
to create a container with the following parameters:
- -d (detach): execute it as a background process
- -p 8080:8080 (publish): forward a port of the container to our machine. The first 8080 is a port of our machine and the last 8080 is the container’s.
- --name airflow: name the container “airflow”
- -v /path/in/my/machine/:/usr/local/airflow/dags (volume): mount a path of our machine into the container. Here we mount it to /usr/local/airflow/dags, which is the main folder Airflow reads scripts from.
- puckel/docker-airflow: the name of the image
- webserver: specifies the service to run. It is a web service because Apache Airflow works as a web-based application.
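If the webserver does not seem to come up, we can watch its logs with a plain Docker command (assuming the container name airflow we gave above):
docker logs -f airflow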
Start the Airflow web UI
We need to verify the container is running by this command:
docker ps -a
It is running because the status shows “Up”, not “Exited”. We can now open the Airflow web UI at http://localhost:8080/admin/ with the default username/password, which is “airflow”/“airflow”.
Run a sample DAG
We are going to use the sample script from the official doc https://airflow.apache.org/docs/stable/tutorial.html. Save it as a new Python file named “tutorial.py” in the mounted path.
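Because the folder is mounted into the container, the new file should also be visible inside it. A quick way to double-check, assuming the container name airflow from earlier:
docker exec airflow ls /usr/local/airflow/dags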
Refresh the page and there will be a new row in the table: the “tutorial” DAG.
Click on it to open the “Tree View”, which shows the hierarchy and run history of this DAG.
When we click on “Code”, the source code of the DAG is shown.
Run the DAG
There are two ways to run the DAG: a manual run, by clicking the first play button in the “Link” column, and a scheduled run. First of all, enable the DAG via the switch to the left of the DAG’s name.
When it starts to run, the DAG() object is initialized first.
from datetime import timedelta
from airflow import DAG

dag = DAG(
    'tutorial',
    default_args=default_args,  # default_args is a dict defined earlier in tutorial.py (see the sketch below)
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
'tutorial'
is the DAG’s name, as we saw in the table. The name may contain only letters, numbers, and underscores.
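The default_args passed to DAG() is a plain Python dictionary of settings shared by every task in the DAG. The official tutorial defines it near the top of tutorial.py; here is a minimal sketch with illustrative values:
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',                   # owner shown in the UI
    'depends_on_past': False,             # do not wait for the previous run of the task
    'start_date': datetime(2020, 9, 1),   # illustrative start date
    'retries': 1,                         # retry a failed task once
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}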
Operators
Each step in the DAG is an operator. Here is one of the operators in “tutorial.py”. It is a BashOperator, which means it can run a Bash command.
from airflow.operators.bash_operator import BashOperator

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
t1 is a BashOperator that runs the command date, which prints the current date on the screen. The task_id is the name of the step (it can contain only numbers, letters, and underscores as well). And dag= maps the operator to the DAG() variable.
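For the flow below, assume we have also declared two more BashOperators, t2 and t3; their task ids and commands here are made up for illustration:
t2 = BashOperator(
    task_id='sleep_a_bit',      # illustrative task id
    bash_command='sleep 5',
    dag=dag,
)

t3 = BashOperator(
    task_id='say_hello',        # illustrative task id
    bash_command='echo hello',
    dag=dag,
)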
With the operators declared, here is a sample flow.
t1 >> [t2, t3]
It says: when t1 is done, run t2 and t3 in parallel. The Graph View then looks like this.
If we want to run all three in a straight sequence, here is an example.
t1 >> t2 >> t3
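The >> arrows are just shorthand; the same dependencies can be written with the operators’ set_downstream method if we prefer the explicit form:
# Equivalent to t1 >> t2 >> t3
t1.set_downstream(t2)
t2.set_downstream(t3)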
For more operators, check it out here https://airflow.apache.org/docs/stable/howto/operator/index.html.
Schedule
Scheduling is a main feature of Airflow.
Airflow uses crontab expressions (try this: https://crontab.guru/) to control the schedule time.
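As a quick illustration, schedule_interval accepts a cron string, a preset such as '@daily', or a timedelta; the DAG names below are made up:
# Illustrative schedules; each would normally live in its own DAG file
dag_daily  = DAG('daily_2029', default_args=default_args, schedule_interval='29 20 * * *')  # every day at 8:29 PM
dag_minute = DAG('minute_19', default_args=default_args, schedule_interval='19 * * * *')    # at the 19th minute of every hour
dag_preset = DAG('daily_midnight', default_args=default_args, schedule_interval='@daily')   # preset: midnight every day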
In Airflow, every scheduled run is stamped one cycle in the past. Here are some samples:
Run every day at 8:29 PM
Each run will always be stamped as the day before. The image below shows the actual start time is on the 27th (Started: 2020-09-27), but Airflow stamps the run date as the 26th (Run: 2020-09-26).
Run every 19th minute
The run is stamped with the previous hour. Here the actual start time is 2020-10-10T23:19:20 (Started), but the run timestamp is an hour earlier (Run: 2020-10-10T22:19:00).
Here is the reason: Airflow stamps a scheduled run with its data date, because ETL (Extract-Transform-Load) batches typically process a dataset with past timestamps. For example, if our scheduled job is processing the data as of yesterday, Airflow treats the schedule time as of yesterday.
Reference: https://stackoverflow.com/questions/39612488/airflow-trigger-dag-execution-date-is-the-next-day-why
For a manual run, the run timestamp is the current time, as it should be. The run_id in this case has manual_ as a prefix.
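We can also trigger a manual run from the command line instead of the play button. A quick sketch, assuming the Airflow 1.10 CLI bundled in this image and the container still named airflow:
docker exec airflow airflow trigger_dag tutorial
It should show up with a manual_ prefixed run_id just like a run triggered from the UI.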
System variables
Airflow also provides system variables (see more at https://airflow.apache.org/docs/stable/macros-ref). In my own experience, I have only had the chance to use {{ds}}, which is the execution date (the run’s data date) in YYYY-MM-DD format.
Here is a sample; it echoes that date.
t4 = BashOperator(
task_id="test_t4",
depends_on_past=False,
bash_command="echo {{ds}}",
dag=dag
)
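The same templating works for the other macros on that page. For example, {{ ds_nodash }} is the same date without dashes, which is handy for building file names; the path below is made up just for illustration:
t5 = BashOperator(
    task_id="test_t5",
    bash_command="echo /data/sales_{{ ds_nodash }}.csv",  # hypothetical path, renders like /data/sales_20200926.csv
    dag=dag,
)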
These are just examples of how to use Apache Airflow. Hope this is useful for ya
See ya~