Airflow Docker with XCom push and pull
Recently, in one of the projects I'm working on, we started researching technologies for designing and executing data processing flows. The amount of data to be processed is measured in terabytes, so we were aiming at solutions that can be deployed in the cloud. Solutions from the Apache umbrella such as Hadoop, Spark, or Flink were on the table from the very beginning, but we also looked at others like Luigi or Airflow, because our use case was neither MapReduce-able nor stream-based.
Airflow caught our attention, and we decided to give it a shot to see whether we could build a PoC with it*. To get the PoC running sooner rather than later, we planned to provision a Swarm cluster for it.
Airflow comes with a number of so-called operators that let you execute actions. There are operators for Bash and Python, but you can also find ones for, e.g., Hive. Fortunately for us, there is also a Docker operator.
Local PoC
The PoC started on my laptop, not in the cluster. Thankfully, DockerOperator lets you pass the URL of the Docker daemon, so moving from laptop to cluster comes down to changing roughly one parameter. Nice!
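To make the "one parameter" point concrete, here is a minimal sketch of how the daemon URL could be chosen per environment. docker_url is the DockerOperator parameter referred to above. The environment variable name AIRFLOW_DOCKER_URL, the helper function, and the cluster address are hypothetical, and the local socket URL is an assumption about a typical laptop setup.

```python
import os

# Assumed local default: the Docker daemon's Unix socket.
LOCAL_DOCKER_URL = "unix://var/run/docker.sock"


def docker_operator_kwargs(image, env=None):
    """Build keyword arguments for a DockerOperator task.

    Only docker_url differs between laptop and cluster.
    AIRFLOW_DOCKER_URL is a hypothetical variable name used for illustration.
    """
    env = os.environ if env is None else env
    return {
        "image": image,
        "docker_url": env.get("AIRFLOW_DOCKER_URL", LOCAL_DOCKER_URL),
    }


# Laptop: nothing is set, so the local socket is used.
local = docker_operator_kwargs("docker_1", env={})

# Cluster: point at a (hypothetical) remote daemon instead.
cluster = docker_operator_kwargs(
    "docker_1", env={"AIRFLOW_DOCKER_URL": "tcp://swarm-manager:2375"}
)
```

The resulting dictionary would then be unpacked into the operator, e.g. DockerOperator(task_id='docker_1', dag=dag, **local).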
If you want to run the Airflow server locally inside a container, have it run as non-root (you should!), and bind-mount docker.sock from the host into the container, you must create a docker group in the container that mirrors the docker group on your host (same GID), and then add e.g. the airflow user to that group. That does the trick...
So just running DockerOperator is no black magic. However, if your containers need to exchange data, things get a bit trickier.
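The group mirroring can be sketched as follows. This is only an illustration, not part of the original setup: the helper name is hypothetical, and it assumes the usual groupadd and usermod tools are available in the image. It reads the numeric group that owns the socket and produces the two commands to run as root inside the container.

```python
import os


def docker_group_commands(sock_path="/var/run/docker.sock", user="airflow"):
    """Return shell commands that mirror the socket's group inside the container."""
    # Numeric GID of the group that owns the socket on the host.
    gid = os.stat(sock_path).st_gid
    return [
        # Create a docker group with exactly the same GID as on the host.
        "groupadd -g {} docker".format(gid),
        # Add the non-root user to that group (append, keep other groups).
        "usermod -aG docker {}".format(user),
    ]
```

Calling docker_group_commands() on a machine where the socket exists returns the commands with the host's GID filled in. What matters is the matching number, not the group name.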
XCom push/pull
The push part is simple and documented. Just set the xcom_push parameter to True, and the last line of the container's stdout will be published by Airflow as if it had been pushed programmatically. This seems to be the natural Airflow way.
Pull is not that obvious, perhaps because it's not documented. You can't just read the value from stdin or anything like that. The way to do it involves connecting two dots:
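The practical consequence is that a container should print its result last. The snippet below is a plain-Python illustration of that rule, not Airflow's implementation. The function name is hypothetical, and skipping blank trailing lines is an assumption made for the sketch.

```python
def value_to_push(container_stdout):
    """Return the last non-empty line of the output, or None if there is none."""
    lines = [line for line in container_stdout.splitlines() if line.strip()]
    return lines[-1] if lines else None


# Earlier log lines are ignored; only the final line becomes the pushed value.
logs = 'starting up\nprocessing\n{"i_am_pushing": "json"}\n'
pushed = value_to_push(logs)
```

Anything the container logs after its result would replace the value, so diagnostics are better written before the final line (or to stderr).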
- the command parameter can be Jinja2-templated
- one of the macros allows you to do xcom_pull
The pushing container:
FROM debian
ENTRYPOINT echo '{"i_am_pushing": "json"}'
Simple enough. Now the pulling container:
FROM debian
COPY ./entrypoint /
ENTRYPOINT ["/entrypoint"]
The entrypoint script can be anything and will get the JSON as $1. The crucial (and easy to miss) requirement is that ENTRYPOINT must use the exec form. Yes, there are two forms of ENTRYPOINT. If you use the shell form (the one without the array), the parameters will not be passed to your script!
Finally, you can glue things together and you're done. The ti macro lets us get data pushed by another task; ti stands for task_instance.
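Since the entrypoint can be anything, here is a minimal sketch of one written in Python. It assumes a Python 3 interpreter is installed in the image and that the pulled value arrives intact as a single argument. Depending on how the command string is tokenized on its way to the container, values containing spaces or quotes may need extra care, which is not covered here. The function names are hypothetical.

```python
#!/usr/bin/env python3
import json
import sys


def parse_payload(argv):
    """Parse the JSON document passed as the first command-line argument ($1)."""
    return json.loads(argv[1])


def main(argv):
    """Entry point: returns a process exit code."""
    if len(argv) < 2:
        # No argument means nothing was passed in, e.g. shell-form ENTRYPOINT.
        print("usage: entrypoint '<json>'", file=sys.stderr)
        return 1
    payload = parse_payload(argv)
    print("received keys:", sorted(payload))
    return 0


# In the image, the script would end with: sys.exit(main(sys.argv))
```

Returning a non-zero code when the argument is missing makes the shell-form mistake visible as a failed task instead of a silent no-op.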
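from datetime import timedelta
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator  # Airflow 1.x import path
# default_args is assumed to be defined earlier (owner, start_date, ...)
dag = DAG('docker', default_args=default_args, schedule_interval=timedelta(1))
t1 = DockerOperator(task_id='docker_1', dag=dag, image='docker_1', xcom_push=True)
t2 = DockerOperator(task_id='docker_2', dag=dag, image='docker_2', command='{{ ti.xcom_pull(task_ids="docker_1") }}')
t2.set_upstream(t1)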
Conclusion
Docker can be used in Airflow together with the XCom push/pull functionality. It isn't very convenient and, I would say, not well documented, but at least it works.
If time permits, I'm going to create a PR documenting the pull operation. I don't know how that will work out, because the Airflow GitHub project has 237 open PRs right now, and some of them have been there since May 2016!
* the funny thing is that we considered Jenkins too! ;-)