3 Manieren om Airflow op Kubernetes te draaien 

Wij benutten graag de kracht van Airflow met Kubernetes. Horizontaal schaalbare dynamische data pipelines, wie wil dat nou niet? Als je wilt beginnen met het draaien van Airflow op Kubernetes, je workloads wilt containeriseren en het meeste uit beide platforms wilt halen, dan laat deze post je zien hoe je dat op drie verschillende manieren kunt doen. 

airflow kubernetes

3 Manieren om Airflow op Kubernetes te draaien 

  • Gebruik van de KubernetesPodOperator  
    De KubernetesPodOperator stelt je in staat om gecontaineriseerde workloads vanuit je DAG's te draaien.
  • Gebruik van de KubernetesExecutor  
    De KubernetesExecutor zorgt ervoor dat elke taak in je DAG als een pod op Kubernetes draait.
  • Gebruik van KEDA met Airflow  
    KEDA maakt elastische schaling van en naar nul mogelijk voor Airflow CeleryWorkers op Kubernetes. 

Code voorbeelden en configuratie vind je hier.

Als je liever de bijbehorende video bij deze post bekijkt, ik heb eerder een Show 'n Tell webinar over dit onderwerp gedaan dat je hier kunt terugkijken.  

Enige voorkennis van Airflow en Kubernetes is vereist. 

Gebruik van de KubernetesPodOperator 

De KubernetesPodOperator is een ingebouwde Airflow operator die je kunt gebruiken als bouwsteen binnen je DAG's.

Een DAG staat voor Acyclic Directed Graph en is in principe je pipelinedefinitie/workflow geschreven in pure Python. In je DAG specificeer je je pipelinestappen als taken met behulp van operators en definieer je hun flow (upstream en downstream afhankelijkheden). Deze DAG wordt dan gepland door de Airflow scheduler en uitgevoerd door de Executor.

Er zijn veel verschillende operators beschikbaar. Van operators die je eigen Python code kunnen uitvoeren tot MySQL, Azure, Spark, cloud storage of serverloze operators.

De KubernetesPodOperator stelt je in staat om gecontaineriseerde workloads als pods op Kubernetes vanuit je DAG te draaien.

Het is perfect voor wanneer je bestaande containers in je ecosysteem wilt gebruiken/hergebruiken, maar ze wilt plannen vanuit Airflow en opnemen in je workflow. Dit is verreweg de gemakkelijkste manier om te beginnen met het draaien van container workloads vanuit Airflow op Kubernetes. 

De setup

We willen ervoor zorgen dat we Airflow op ons cluster hebben draaien. Hiervoor gebruiken we een eenvoudige deployment bestaande uit de Airflow webserver, scheduler/executor en een aparte PostgreSQL database deployment voor de Airflow metadata DB. 

airflow.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
 name: airflow
   namespace: airflow-k8spodoperator
spec:
 replicas: 1
 selector:
   matchLabels:
     name: airflow
 template:
   metadata:
     labels:
       name: airflow
   spec:
     automountServiceAccountToken: true
     containers:
     - args:
       - webserver
       - -p
       - "8000"
       env:
       - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
         value: postgresql://postgres:password@airflow-db:5432/postgres
       - name: AIRFLOW__CORE__EXECUTOR
         value: LocalExecutor
       image: apache/airflow:1.10.12
       imagePullPolicy: Always
       volumeMounts:
       - mountPath: /opt/airflow/logs/
         mountPropagation: None
         name: airflow-logs
     - args:
       - scheduler
       env:
       - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
         value: postgresql://postgres:password@airflow-db:5432/postgres
       - name: AIRFLOW__CORE__EXECUTOR
         value: LocalExecutor
       image: eu.gcr.io/fullstaq-st-tim/st-airflow:latest
       imagePullPolicy: Always
       name: airflow-scheduler
       volumeMounts:
       - mountPath: /opt/airflow/logs/
         mountPropagation: None
         name: airflow-logs
     initContainers:
     - args:
       - initdb
       env:
       - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
         value: postgresql://postgres:password@airflow-db:5432/postgres
       - name: AIRFLOW__CORE__EXECUTOR
         value: LocalExecutor
       image: apache/airflow:1.10.12
       imagePullPolicy: Always
 

Ik heb hier wat configuratie weggelaten zoals livenessProbe en readinessProbe om het iets beknopter te maken, maar het volledige codevoorbeeld is te vinden in de repository.

Airflow heeft op zijn minst een webserver en scheduler component nodig. De init container is verantwoordelijk voor het bootstrappen van de database. 

airflow-db.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
 name: airflow-db
 namespace: airflow-k8spodoperator
spec:
 replicas: 1
 selector:
   matchLabels:
     name: airflow-db
 template:
   metadata:
     labels:
       name: airflow-db
   spec:
     containers:
     - env:
       - name: POSTGRES_PASSWORD
         value: password
       image: postgres:9.6
       imagePullPolicy: IfNotPresent
       name: airflow-db
       volumeMounts:
       - mountPath: /var/lib/postgresql/data
         mountPropagation: None
         name: postgresql-data
     restartPolicy: Always
     schedulerName: default-scheduler
     terminationGracePeriodSeconds: 30
     volumes:
     - emptyDir: {}
       name: postgresql-data


Een eenvoudige PostgreSQL database setup is vereist, met interne service om Airflow in staat te stellen er verbinding mee te maken: 

airflow-db-svc.yaml

apiVersion: v1
kind: Service
metadata:
 name: airflow-db
 namespace: airflow-k8spodoperator
spec:
 clusterIP: None
 ports:
 - port: 5432
   protocol: TCP
   targetPort: 5432
 selector:
   name: airflow-db
 sessionAffinity: None
 type: ClusterIP
status:
 loadBalancer: {}


We hebben ook een service account en service account token nodig in ons cluster om ervoor te zorgen dat de Operator zich kan authenticeren en toestemming heeft om Pods uit te voeren in de namespace die we opgeven. 

serviceaccount.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
 name: airflow-k8spodoperator
 namespace: airflow-k8spodoperator


Wanneer aan deze twee voorwaarden is voldaan, kunnen we beginnen met het uitvoeren van gecontaineriseerde workloads op Kubernetes vanuit onze DAG's met behulp van de KubernetesPodOperator.

Als voorbeeld: deze DAG hieronder: 

import logging
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

log = logging.getLogger(__name__)

dag = DAG(
   "example_using_k8s_pod_operator",
   schedule_interval="0 1 * * *",
   catchup=False,
   default_args={
       "owner": "admin",
       "depends_on_past": False,
       "start_date": datetime(2020, 8, 7),
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 2,
       "retry_delay": timedelta(seconds=30),
       "sla": timedelta(hours=23),
   },
)

with dag:
   task_1 = KubernetesPodOperator(
       image="ubuntu:16.04",
       namespace="airflow-k8spodoperator",
       cmds=["bash", "-cx"],
       arguments=["echo", "10"],
       labels={"foo": "bar"},
       name="test-using-k8spodoperator-task-1",
       task_id="task-1-echo",
       is_delete_operator_pod=False,
       in_cluster=True,
   )
   task_2 = KubernetesPodOperator(
       image="ubuntu:16.04",
       namespace="airflow-k8spodoperator",
       cmds=["sleep"],
       arguments=["300"],
       labels={"foo": "bar"},
       name="test-using-k8spodoperator-task-2",
       task_id="task-2-sleep",
       is_delete_operator_pod=False,
       in_cluster=True,
   )

task_1 >> task_2
 

Hier gebruiken we de KubernetesPodOperator om een container als pod vanuit onze DAG te draaien.

De KubernetesPodOperator heeft enkele vereiste parameters zoals image, namespace, cmds, name en task_id, maar de volledige Kubernetes pod API wordt ondersteund. We specificeren ook om te zoeken naar de in_cluster authenticatieconfiguratie (die onze service account token gebruikt) en om voltooide pods te behouden met is_delete_operator_pod.

Aangezien de volledige pod k8s API wordt ondersteund, kunnen we alles wat onze pod nodig heeft als argumenten aan de Operator meegeven: 

secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env  = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
secret_all_keys  = Secret('env', None, 'airflow-secrets-2')
volume_mount = VolumeMount('test-volume',
                           mount_path='/root/mount_file',
                           sub_path=None,
                           read_only=True)
port = Port('http', 80)
configmaps = ['test-configmap-1', 'test-configmap-2']

volume_config= {
   'persistentVolumeClaim':
     {
       'claimName': 'test-volume'
     }
   }
volume = Volume(name='test-volume', configs=volume_config)

k = KubernetesPodOperator(namespace='default',
                         image="ubuntu:16.04",
                         cmds=["bash", "-cx"],
                         arguments=["echo", "10"],
                         labels={"foo": "bar"},
                         secrets=[secret_file, secret_env, secret_all_keys],
                         ports=[port]
                         volumes=[volume],
                         volume_mounts=[volume_mount]
                         name="test",
                         task_id="task",
                         affinity=affinity,
                         is_delete_operator_pod=True,
                         hostnetwork=False,
                         tolerations=tolerations,
                         configmaps=configmaps
                         )

Wanneer deze DAG gepland staat om uitgevoerd te worden en uitgevoerd wordt door de Airflow executor, zal onze operator pods maken met de naamparameters om de podnaam samen te stellen: 

NAME                                        READY   STATUS      RESTARTS   AGE
airflow-56f875bb-dk6vq                      2/2     Running     0          46h
airflow-db-57548fc4d-qvbgf                  1/1     Running     0          47h
test-using-k8spodoperator-task-1-5a055bae   0/1     Completed   0          8s
test-using-k8spodoperator-task-1-60caa2f3   0/1     Completed   0          8s

De KubernetesPodOperator is verreweg de gemakkelijkste manier om te beginnen met het draaien van gecontaineriseerde workloads vanuit Airflow op Kubernetes. Setup en beheer zijn minimaal en omdat je parameters en argumenten per workload kunt aanpassen, is er een hoge mate van flexibiliteit. De KubernetesPodOperator is nuttig wanneer je al workloads als containers hebt, misschien heb je wat aangepaste Java- of Go-code die je in je pipeline wilt opnemen of je wilt beginnen met het overdragen van container workloads naar Airflow.

Het nadeel van deze aanpak is dat voor sterk aangepaste containers met veel afhankelijkheden deze moeten worden vertaald naar argumenten die aan de Operator worden doorgegeven. Dit kan wat onderzoek, trial en error vereisen om goed te krijgen. 

 

Als je elke taak in je DAG op een native manier als Kubernetes pod wilt uitvoeren, ben je beter af met: 

Gebruik van de KubernetesExecutor 

De KubernetesExecutor is een abstractielaag die elke taak in je DAG als een pod op je Kubernetesinfrastructuur laat draaien. Je configureert deze executor als onderdeel van je Airflow Deployment net zoals je elke andere executor zou doen, zij het met enkele aanvullende configuratieopties. 

De setup

Voor dit voorbeeld gebruiken we weer de eenvoudige Airflow deployment die we gebruikten voor de KubernetesPodOperator met wat aanvullende configuratie die vereist is door de KubernetesExecutor. 

airflow.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
 name: airflow
 namespace: airflow-k8sexecutor
spec:
 replicas: 1
 selector:
   matchLabels:
     name: airflow
 template:
   metadata:
     labels:
       name: airflow
   spec:
     automountServiceAccountToken: true
     containers:
     - args:
       - webserver
       - -p
       - "8000"
       env:
       - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
         value: postgresql://postgres:password@airflow-db:5432/postgres
       - name: AIRFLOW__CORE__EXECUTOR
         value: KubernetesExecutor
       - name: AIRFLOW__KUBERNETES__NAMESPACE
         value: airflow-k8sexecutor
       - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
         value: default
       - name: AIRFLOW__KUBERNETES__IN_CLUSTER
         value: 'true'
       - name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
         value: 'true'
       image: apache/airflow:1.10.12
       imagePullPolicy: Always
       volumeMounts:
       - mountPath: /opt/airflow/logs/
         mountPropagation: None
         name: airflow-logs
     - args:
       - scheduler
       env:
       - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
         value: postgresql://postgres:password@airflow-db:5432/postgres
       - name: AIRFLOW__CORE__EXECUTOR
         value: KubernetesExecutor
       - name: AIRFLOW__KUBERNETES__NAMESPACE
         value: airflow-k8sexecutor
       - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
         value: default
       - name: AIRFLOW__KUBERNETES__IN_CLUSTER
         value: 'true'
       - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
         value: eu.gcr.io/fullstaq-st-tim/st-airflow-executor
       - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
         value: latest
       - name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
         value: 'true'
       - name: AIRFLOW__KUBERNETES__RUN_AS_USER
         value: '50000'
       image: apache/airflow:1.10.12
       imagePullPolicy: Always
       name: airflow-scheduler
       volumeMounts:
       - mountPath: /opt/airflow/logs/
         mountPropagation: None
         name: airflow-logs
     initContainers:
     - args:
       - initdb
       env:
       - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
         value: postgresql://postgres:password@airflow-db:5432/postgres
       - name: AIRFLOW__CORE__EXECUTOR
         value: KubernetesExecutor
       - name: AIRFLOW__KUBERNETES__NAMESPACE
         value: airflow-k8sexecutor
       - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
         value: default
       - name: AIRFLOW__KUBERNETES__IN_CLUSTER
         value: 'true'
       - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
         value: apache/airflow
       - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
         value: 1.10.10
       - name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
         value: 'true'
       image: apache/airflow:1.10.12
       imagePullPolicy: Always
 

Opnieuw is sommige configuratie weggelaten omwille van de beknoptheid. Om de KubernetesExecutor in te schakelen, configureren we het in onze deployment met de volgende extra configuratieparameters:

We moeten instellen:

De executor die we met Airflow willen gebruiken op KubernetesExecutor. 

       - name: AIRFLOW__CORE__EXECUTOR
         value: KubernetesExecutor


De namespace waar we onze worker pods willen draaien. 

       - name: AIRFLOW__KUBERNETES__NAMESPACE
         value: airflow-k8sexecutor


De Kubernetes service account naam om te gebruiken voor onze workers 

       - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
         value: default


Als onze Kubernetes authenticatieconfiguratie aanwezig is in het cluster. 

       - name: AIRFLOW__KUBERNETES__IN_CLUSTER
         value: 'true'


Het container-register en container-imagenaam die we voor onze pod worker containers willen gebruiken. En een aparte omgevingsvariabele voor de tag die we willen gebruiken. Omdat we mogelijk elke aangeleverde Airflow operator als een taak in een Kubernetes pod gaan draaien, moeten we ervoor zorgen dat aan de afhankelijkheden voor deze operators wordt voldaan in ons worker image. Daarom is het een goed idee om het Airflow docker image als basis te gebruiken. 

       - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
         value: apache/airflow:1.10.12
       - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
         value: latest


Vervolgens moeten we opgeven hoe Airflow en Kubernetes toegang hebben tot onze dags. We kunnen git-sync, een gedeeld volume of de DAG's in onze Airflow images inbakken gebruiken. In dit geval doen we het laatste en vertellen we Airflow om te zoeken naar dags in ons Image. Als je deze route kiest, moet je je eigen Airflow image maken met de airflow basis en een map met je DAG's toevoegen. 

       - name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
         value: 'true'


Ten slotte vertellen we Airflow om de 'airflow' gebruiker te gebruiken om als te draaien. Deze is noodzakelijk, zonder deze kan de KubernetesExecutor je workloads niet uitvoeren. 

       - name: AIRFLOW__KUBERNETES__RUN_AS_USER
         value: '50000'


Met deze configuratie zetten we Airflow op om de KubernetesExecutor te gebruiken. Nu naar onze DAG: 

import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

log = logging.getLogger(__name__)

dag = DAG(
   "example_using_k8s_executor",
   schedule_interval="0 1 * * *",
   catchup=False,
   default_args={
       "owner": "airflow",
       "depends_on_past": False,
       "start_date": datetime(2020, 8, 7),
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 2,
       "retry_delay": timedelta(seconds=30),
       "sla": timedelta(hours=23),
   },
)

def use_airflow_binary():
   rc = os.system("airflow -h")
   assert rc == 0

with dag:
   task_1 = PythonOperator(
       task_id="task-1",
       python_callable=use_airflow_binary,
   )
   task_2 = PythonOperator(
       task_id="task-2",
       python_callable=use_airflow_binary,
   )

task_1 >> task_2


Dit voorbeeld toont een eenvoudige DAG met twee taken die de PythonOperator gebruiken. Wat er gebeurt wanneer we het uitvoeren, is dat de Kubernetes executor de Airflow-taakwachtrij bekijkt en elke taak in die wachtrij oppikt en er een KubernetesPod van maakt. De executor houdt vervolgens de pod in de gaten voor de status en synchroniseert de status terug naar de scheduler, die dan weet of de taak opnieuw moet worden gepland, opnieuw moet worden geprobeerd, of dat de volgende downstream taak kan worden gepland.

Wanneer we deze DAG draaien, wordt onze taak uitgevoerd in onze worker pod door de KubernetesExecutor en opgeruimd na succes of falen. Als je niet wilt dat je worker pods worden opgeruimd, kun je de extra ENV var aan je Airflow-configuratie toevoegen: AIRFLOW__KUBERNETS__IS_DELETE_WORKER_POD en deze op false zetten. 

NAME                                                            READY   STATUS              RESTARTS   AGE
airflow-76c4f47d78-hw52g                                        2/2     Running             0          5d17h
airflow-db-6944c99c7c-fsmnz                                     1/1     Running             0          5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542   0/1     ContainerCreating   0          2s
NAME                                                            READY   STATUS    RESTARTS   AGE
airflow-76c4f47d78-hw52g                                        2/2     Running   0          5d17h
airflow-db-6944c99c7c-fsmnz                                     1/1     Running   0          5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542   1/1     Running   0          5s


De KubernetesExecutor is geweldig omdat je de natuurlijke elasticiteit van Kubernetes krijgt samen met alle goede dingen van Airflow. Het is niet langer noodzakelijk om je eigen containers te bouwen met aangepaste workloads aangezien elke taak in je DAG als een pod zal worden uitgevoerd. Er is wat extra initiële configuratie nodig bij het opzetten van Airflow, maar de setup blijft redelijk gemakkelijk te beheren.

Houd er wel rekening mee dat wanneer je veel taken parallel gaat plannen, dit zeer snel duur kan worden voor je clusterresources. Houd hier altijd een oogje op of stel limieten in door Airflow Pools te gebruiken (deze kunnen worden ingesteld via ENV vars of via de UI).

Ook is er wat opstart- en shutdown overhead elke keer dat een taak als een Pod wordt opgestart. Maar afhankelijk van je verwachtingen of vereisten is dit verwaarloosbaar. 

 

En tot slot, veruit mijn favoriet: 

Gebruik van KEDA met Airflow 

Hier gebruiken we een CNCF Sandbox project dat oorspronkelijk is ontwikkeld door het Microsoft Azure Functions team: KEDA

KEDA staat voor Kubernetes Event Driven Autoscaler. Het stelt ons in staat om deployments op Kubernetes te schalen op basis van externe events/metrics, waardoor schaling naar en van nul mogelijk wordt. Dit samen met Airflow's CeleryExecutor brengt ons het beste van beide werelden. Kubernetes-pedigree bij het draaien van elastische scaling workloads en CerelyWorkers voor lage overhead continue uitvoering van Airflow DAG-taken.

KEDA werkt door een nieuwe custom resource definition (CRD) te gebruiken genaamd ScaledObject. Het scaled object is het werkpaard voor het omhoog en omlaag schalen van deployments. 

scaledobject.yaml

apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
 name: airflow-worker
spec:
 scaleTargetRef:
   deploymentName: airflow-worker
 pollingInterval: 10   # Optional. Default: 30 seconds
 cooldownPeriod: 30    # Optional. Default: 300 seconds
 maxReplicaCount: 10   # Optional. Default: 100
 triggers:
   - type: postgresql
     metadata:
       connection: AIRFLOW_CONN_AIRFLOW_DB
       query: "SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'"
       targetQueryValue: "1"


Hoe het werkt is dat het scaled object een deployment target heeft om te schalen in het geval dat aan een trigger wordt voldaan. De trigger wordt een Scaler genoemd en is in dit geval een query die wordt uitgevoerd op de Airflow PostgreSQL metadata database. De query wordt uitgevoerd op interval en het resultaat van de query bepaalt of de deployment moet worden opgeschaald of afgeschaald. In dit geval controleren we het aantal Airflow taakinstanties die ofwel een running of queued status hebben. 

Als je wilt beginnen met KEDA voor het schalen van je deployments, zijn er al meerdere scalers beschikbaar, zoals een scaler voor  Azure Blob Storage,  een voor het schalen op basis van  RabbitMQ messages, een MySQL scaler, of een scaler die schaalt op basis van Prometheus metrics.

Voor de volledige lijst, bekijk de documentatie op  keda.sh

Voor Airflow werkt KEDA in combinatie met de CeleryExecutor. Celery is een implementatie van een taakwachtrij in Python en samen met KEDA stelt het Airflow in staat om dynamisch taken in celery workers parallel uit te voeren. Het schaalt CeleryWorkers op en af naargelang nodig op basis van wachtende of lopende taken. 

Dit heeft het voordeel dat de CeleryWorkers over het algemeen minder overhead hebben bij het sequentieel uitvoeren van taken, aangezien er geen opstart is zoals bij de KubernetesExecutor. 

Je krijgt dus de elasticiteit van Kubernetes, samen met alle voordelen die Celery biedt op het gebied van prestaties.

De setup

Voor deze setup gebruiken we de HELM chart ontwikkeld door Astronomer om Airflow + KEDA op Kubernetes te deployen.

Ik verwijs naar de officiële setup documentatie van Astronomer Airflow, deze is hier te vinden.  

helm repo add kedacore https://kedacore.github.io/charts
helm repo add astronomer https://helm.astronomer.io

helm repo update

kubectl create namespace keda
helm install keda \
   --namespace keda kedacore/keda

kubectl create namespace airflow

helm install airflow \
   --set executor=CeleryExecutor \
   --set workers.keda.enabled=true \
   --set workers.persistence.enabled=false \
   --namespace airflow \
   astronomer/airflow


Nadat we Airflow met KEDA hebben gedeployed met de hierboven vermelde stappen, hebben we nu twee namespaces. 

namespace=airflow

NAME                                 READY   STATUS    RESTARTS   AGE
airflow-flower-5966c99975-7vh9r      1/1     Running   0          93s
airflow-postgresql-0                 1/1     Running   0          92s
airflow-redis-0                      1/1     Running   0          92s
airflow-scheduler-7f64b6cd67-lr95p   2/2     Running   0          93s
airflow-statsd-f7647597-9xdzv        1/1     Running   0          93s
airflow-webserver-74b794d767-rzp49   1/1     Running   0          93s
 

Omdat we Celery als onze Executor gebruiken, hebben we enkele extra componenten in onze Airflow deployment. Namelijk flower, de beheer/monitoring UI voor Celery, redis voor het bemiddelen van berichten naar onze CeleryWorkers, en een statsd server die is opgenomen in de Helm chart voor het verzamelen van Airflow metrics. 

namespace=keda

NAME                                               READY   STATUS    RESTARTS   AGE
keda-operator-5d44c49879-wpknv                     1/1     Running   0          3m16s
keda-operator-metrics-apiserver-86d8bbc4df-884cr   1/1     Running   0          2m49s
 

In onze keda namespace hebben we de keda-operator die verantwoordelijk is voor het monitoren van onze trigger (de PostgreSQL Airflow DB) en het dienovereenkomstig schalen van onze deployment, en de keda-operator-metrics-apiserver die verantwoordelijk is voor het serveren van metrics aan de HorizontalPodAutoscaler. De native Kubernetes HorizontalPodAutoscaler wordt gebruikt om onze target CeleryWorker deployment uit te schalen.

KEDA maakt gebruik van deze reeds bestaande Autoscaler om onze deployment uit te schalen, waardoor KEDA een zeer lichte component is. Behoorlijk cool.

Neem nu deze DAG:

Hier gaan we 20 taken genereren. De DAG zal continu draaien en nieuwe taken blijven genereren voor onze CeleryWorkers om te verwerken. 

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


def my_custom_function(ts,**kwargs):
   """
   This can be any python code you want and is called from the python operator. The code is not executed until
   the task is run by the airflow scheduler.
   """
   print(f"I am task number {kwargs['task_number']}. This DAG Run execution date is {ts} and the current time is {datetime.now()}")
   print('Here is the full DAG Run context. It is available because provide_context=True')
   print(kwargs)


default_args = {
   'owner': 'airflow',
   'depends_on_past': False,
   'email_on_failure': False,
   'email_on_retry': False,
   'retries': 1,
   'retry_delay': timedelta(minutes=5)
}

with DAG('example_dag_generated',
        start_date=datetime(2019, 1, 1),
        max_active_runs=3,
        schedule_interval=timedelta(minutes=30),
        default_args=default_args,
        ) as dag:

   t0 = DummyOperator(
       task_id='start'
   )

   # generate tasks with a loop. task_id must be unique
   for task in range(20):
       tn = PythonOperator(
           task_id=f'python_print_date_{task}',
           python_callable=my_custom_function,
           op_kwargs={'task_number': task},
           provide_context=True
       )


       t0 >> tn
 

Wanneer we deze DAG uitvoeren, gebeurt het volgende:

Airflow begint taken te plannen, die in de queued-status terechtkomen. Onze KEDA-operator monitort op running en queued taken en wordt getriggerd vanwege de taken in de wachtrij. 

kubectl -f logs -n keda keda-operator…

{"level":"info","ts":1603266764.262705,"logger":"scalehandler","msg":"Successfully updated deployment","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker","Original Replicas Count":0,"New Replicas Count":1}
{"level":"info","ts":1603266771.0487137,"logger":"controller_scaledobject","msg":"Reconciling ScaledObject","Request.Namespace":"airflow","Request.Name":"airflow-worker"}
{"level":"info","ts":1603266771.04885,"logger":"controller_scaledobject","msg":"Detected ScaleType = Deployment","Request.Namespace":"airflow","Request.Name":"airflow-worker"}

Omdat hij getriggerd wordt, meldt hij dat het deployment target: airflow-worker moet worden opgeschaald. 

airflow-worker-dc75d6597-xtlhq       0/1     PodInitializing   0          22s


De deployment wordt opgeschaald en Celery zal berichten naar deze nieuwe CeleryWorker sturen. De CeleryWorker begint met het verwerken van taken.

Dit proces loopt continu en KEDA zorgt ervoor dat wanneer er meer taken worden gepland, meer workers worden opgestart. 

airflow-worker-dc75d6597-8kvg5       0/1     Init:0/1          0          6s
airflow-worker-dc75d6597-glcd8       0/1     Init:0/1          0          6s
airflow-worker-dc75d6597-nxlwc       0/1     PodInitializing   0          6s
airflow-worker-dc75d6597-xtlhq       1/1     Running           0          81s
---
airflow-worker-dc75d6597-8kvg5       1/1     Running   0          84s
airflow-worker-dc75d6597-glcd8       1/1     Running   0          84s
airflow-worker-dc75d6597-nxlwc       1/1     Running   0          84s
airflow-worker-dc75d6597-xtlhq       1/1     Running   0          2m39s


Zodra er geen taken meer te verwerken zijn, zal KEDA onze worker deployment afschalen naar nul. 

{"level":"info","ts":1603274700.8434458,"logger":"scalehandler","msg":"Successfully scaled deployment to 0 replicas","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker"}
 

Het draaien van Airflow met KEDA brengt het beste van beide werelden samen. De elasticiteit van Kubernetes ontsloten door KEDA in combinatie met de snel-opvolgende, lage overhead geoptimaliseerde CeleryExecutor en workers.

De setup is eenvoudig met behulp van de Helm chart van Astronomer, hoewel de toegevoegde complexiteit van extra componenten een intensievere debugging-flow zal vereisen wanneer er ergens onderweg iets misgaat.

Al met al ben ik een grote fan van het draaien van Airflow samen met KEDA en kijk ik uit naar een mooie toekomst voor Airflow op Kubernetes in combinatie met KEDA.