
3 Manieren om Airflow op Kubernetes te draaien
Wij benutten graag de kracht van Airflow met Kubernetes. Horizontaal schaalbare dynamische data pipelines, wie wil dat nou niet? Als je wilt beginnen met het draaien van Airflow op Kubernetes, je workloads wilt containeriseren en het meeste uit beide platforms wilt halen, dan laat deze post je zien hoe je dat op drie verschillende manieren kunt doen.

3 Manieren om Airflow op Kubernetes te draaien
- Gebruik van de KubernetesPodOperator
De KubernetesPodOperator stelt je in staat om gecontaineriseerde workloads vanuit je DAG's te draaien. - Gebruik van de KubernetesExecutor
De KubernetesExecutor zorgt ervoor dat elke taak in je DAG als een pod op Kubernetes draait. - Gebruik van KEDA met Airflow
KEDA maakt elastische schaling van en naar nul mogelijk voor Airflow CeleryWorkers op Kubernetes.
Enige voorkennis van Airflow en Kubernetes is vereist.
Gebruik van de KubernetesPodOperator
De KubernetesPodOperator is een ingebouwde Airflow operator die je kunt gebruiken als bouwsteen binnen je DAG's.
Een DAG staat voor Acyclic Directed Graph en is in principe je pipelinedefinitie/workflow geschreven in pure Python. In je DAG specificeer je je pipelinestappen als taken met behulp van operators en definieer je hun flow (upstream en downstream afhankelijkheden). Deze DAG wordt dan gepland door de Airflow scheduler en uitgevoerd door de Executor.
Er zijn veel verschillende operators beschikbaar. Van operators die je eigen Python code kunnen uitvoeren tot MySQL, Azure, Spark, cloud storage of serverloze operators.
De KubernetesPodOperator stelt je in staat om gecontaineriseerde workloads als pods op Kubernetes vanuit je DAG te draaien.
Het is perfect voor wanneer je bestaande containers in je ecosysteem wilt gebruiken/hergebruiken, maar ze wilt plannen vanuit Airflow en opnemen in je workflow. Dit is verreweg de gemakkelijkste manier om te beginnen met het draaien van container workloads vanuit Airflow op Kubernetes.
De setup
We willen ervoor zorgen dat we Airflow op ons cluster hebben draaien. Hiervoor gebruiken we een eenvoudige deployment bestaande uit de Airflow webserver, scheduler/executor en een aparte PostgreSQL database deployment voor de Airflow metadata DB.
airflow.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow
namespace: airflow-k8spodoperator
spec:
replicas: 1
selector:
matchLabels:
name: airflow
template:
metadata:
labels:
name: airflow
spec:
automountServiceAccountToken: true
containers:
- args:
- webserver
- -p
- "8000"
env:
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: postgresql://postgres:password@airflow-db:5432/postgres
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
image: apache/airflow:1.10.12
imagePullPolicy: Always
volumeMounts:
- mountPath: /opt/airflow/logs/
mountPropagation: None
name: airflow-logs
- args:
- scheduler
env:
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: postgresql://postgres:password@airflow-db:5432/postgres
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
image: eu.gcr.io/fullstaq-st-tim/st-airflow:latest
imagePullPolicy: Always
name: airflow-scheduler
volumeMounts:
- mountPath: /opt/airflow/logs/
mountPropagation: None
name: airflow-logs
initContainers:
- args:
- initdb
env:
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: postgresql://postgres:password@airflow-db:5432/postgres
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
image: apache/airflow:1.10.12
imagePullPolicy: Always
Ik heb hier wat configuratie weggelaten zoals livenessProbe en readinessProbe om het iets beknopter te maken, maar het volledige codevoorbeeld is te vinden in de repository.
Airflow heeft op zijn minst een webserver en scheduler component nodig. De init container is verantwoordelijk voor het bootstrappen van de database.
airflow-db.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-db
namespace: airflow-k8spodoperator
spec:
replicas: 1
selector:
matchLabels:
name: airflow-db
template:
metadata:
labels:
name: airflow-db
spec:
containers:
- env:
- name: POSTGRES_PASSWORD
value: password
image: postgres:9.6
imagePullPolicy: IfNotPresent
name: airflow-db
volumeMounts:
- mountPath: /var/lib/postgresql/data
mountPropagation: None
name: postgresql-data
restartPolicy: Always
schedulerName: default-scheduler
terminationGracePeriodSeconds: 30
volumes:
- emptyDir: {}
name: postgresql-data
Een eenvoudige PostgreSQL database setup is vereist, met interne service om Airflow in staat te stellen er verbinding mee te maken:
airflow-db-svc.yaml
apiVersion: v1
kind: Service
metadata:
name: airflow-db
namespace: airflow-k8spodoperator
spec:
clusterIP: None
ports:
- port: 5432
protocol: TCP
targetPort: 5432
selector:
name: airflow-db
sessionAffinity: None
type: ClusterIP
status:
loadBalancer: {}
We hebben ook een service account en service account token nodig in ons cluster om ervoor te zorgen dat de Operator zich kan authenticeren en toestemming heeft om Pods uit te voeren in de namespace die we opgeven.
serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: airflow-k8spodoperator
namespace: airflow-k8spodoperator
Wanneer aan deze twee voorwaarden is voldaan, kunnen we beginnen met het uitvoeren van gecontaineriseerde workloads op Kubernetes vanuit onze DAG's met behulp van de KubernetesPodOperator.
Als voorbeeld: deze DAG hieronder:
import logging
from datetime import datetime, timedelta
from pathlib import Path
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
log = logging.getLogger(__name__)
dag = DAG(
"example_using_k8s_pod_operator",
schedule_interval="0 1 * * *",
catchup=False,
default_args={
"owner": "admin",
"depends_on_past": False,
"start_date": datetime(2020, 8, 7),
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=30),
"sla": timedelta(hours=23),
},
)
with dag:
task_1 = KubernetesPodOperator(
image="ubuntu:16.04",
namespace="airflow-k8spodoperator",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
name="test-using-k8spodoperator-task-1",
task_id="task-1-echo",
is_delete_operator_pod=False,
in_cluster=True,
)
task_2 = KubernetesPodOperator(
image="ubuntu:16.04",
namespace="airflow-k8spodoperator",
cmds=["sleep"],
arguments=["300"],
labels={"foo": "bar"},
name="test-using-k8spodoperator-task-2",
task_id="task-2-sleep",
is_delete_operator_pod=False,
in_cluster=True,
)
task_1 >> task_2
Hier gebruiken we de KubernetesPodOperator om een container als pod vanuit onze DAG te draaien.
De KubernetesPodOperator heeft enkele vereiste parameters zoals image, namespace, cmds, name en task_id, maar de volledige Kubernetes pod API wordt ondersteund. We specificeren ook om te zoeken naar de in_cluster authenticatieconfiguratie (die onze service account token gebruikt) en om voltooide pods te behouden met is_delete_operator_pod.
Aangezien de volledige pod k8s API wordt ondersteund, kunnen we alles wat onze pod nodig heeft als argumenten aan de Operator meegeven:
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
secret_all_keys = Secret('env', None, 'airflow-secrets-2')
volume_mount = VolumeMount('test-volume',
mount_path='/root/mount_file',
sub_path=None,
read_only=True)
port = Port('http', 80)
configmaps = ['test-configmap-1', 'test-configmap-2']
volume_config= {
'persistentVolumeClaim':
{
'claimName': 'test-volume'
}
}
volume = Volume(name='test-volume', configs=volume_config)
k = KubernetesPodOperator(namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env, secret_all_keys],
ports=[port]
volumes=[volume],
volume_mounts=[volume_mount]
name="test",
task_id="task",
affinity=affinity,
is_delete_operator_pod=True,
hostnetwork=False,
tolerations=tolerations,
configmaps=configmaps
)
Wanneer deze DAG gepland staat om uitgevoerd te worden en uitgevoerd wordt door de Airflow executor, zal onze operator pods maken met de naamparameters om de podnaam samen te stellen:
NAME READY STATUS RESTARTS AGE
airflow-56f875bb-dk6vq 2/2 Running 0 46h
airflow-db-57548fc4d-qvbgf 1/1 Running 0 47h
test-using-k8spodoperator-task-1-5a055bae 0/1 Completed 0 8s
test-using-k8spodoperator-task-1-60caa2f3 0/1 Completed 0 8s
De KubernetesPodOperator is verreweg de gemakkelijkste manier om te beginnen met het draaien van gecontaineriseerde workloads vanuit Airflow op Kubernetes. Setup en beheer zijn minimaal en omdat je parameters en argumenten per workload kunt aanpassen, is er een hoge mate van flexibiliteit. De KubernetesPodOperator is nuttig wanneer je al workloads als containers hebt, misschien heb je wat aangepaste Java- of Go-code die je in je pipeline wilt opnemen of je wilt beginnen met het overdragen van container workloads naar Airflow.
Het nadeel van deze aanpak is dat voor sterk aangepaste containers met veel afhankelijkheden deze moeten worden vertaald naar argumenten die aan de Operator worden doorgegeven. Dit kan wat onderzoek, trial en error vereisen om goed te krijgen.
Als je elke taak in je DAG op een native manier als Kubernetes pod wilt uitvoeren, ben je beter af met:
Gebruik van de KubernetesExecutor
De KubernetesExecutor is een abstractielaag die elke taak in je DAG als een pod op je Kubernetesinfrastructuur laat draaien. Je configureert deze executor als onderdeel van je Airflow Deployment net zoals je elke andere executor zou doen, zij het met enkele aanvullende configuratieopties.
De setup
Voor dit voorbeeld gebruiken we weer de eenvoudige Airflow deployment die we gebruikten voor de KubernetesPodOperator met wat aanvullende configuratie die vereist is door de KubernetesExecutor.
airflow.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow
namespace: airflow-k8sexecutor
spec:
replicas: 1
selector:
matchLabels:
name: airflow
template:
metadata:
labels:
name: airflow
spec:
automountServiceAccountToken: true
containers:
- args:
- webserver
- -p
- "8000"
env:
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: postgresql://postgres:password@airflow-db:5432/postgres
- name: AIRFLOW__CORE__EXECUTOR
value: KubernetesExecutor
- name: AIRFLOW__KUBERNETES__NAMESPACE
value: airflow-k8sexecutor
- name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
value: default
- name: AIRFLOW__KUBERNETES__IN_CLUSTER
value: 'true'
- name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
value: 'true'
image: apache/airflow:1.10.12
imagePullPolicy: Always
volumeMounts:
- mountPath: /opt/airflow/logs/
mountPropagation: None
name: airflow-logs
- args:
- scheduler
env:
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: postgresql://postgres:password@airflow-db:5432/postgres
- name: AIRFLOW__CORE__EXECUTOR
value: KubernetesExecutor
- name: AIRFLOW__KUBERNETES__NAMESPACE
value: airflow-k8sexecutor
- name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
value: default
- name: AIRFLOW__KUBERNETES__IN_CLUSTER
value: 'true'
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
value: eu.gcr.io/fullstaq-st-tim/st-airflow-executor
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
value: latest
- name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
value: 'true'
- name: AIRFLOW__KUBERNETES__RUN_AS_USER
value: '50000'
image: apache/airflow:1.10.12
imagePullPolicy: Always
name: airflow-scheduler
volumeMounts:
- mountPath: /opt/airflow/logs/
mountPropagation: None
name: airflow-logs
initContainers:
- args:
- initdb
env:
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: postgresql://postgres:password@airflow-db:5432/postgres
- name: AIRFLOW__CORE__EXECUTOR
value: KubernetesExecutor
- name: AIRFLOW__KUBERNETES__NAMESPACE
value: airflow-k8sexecutor
- name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
value: default
- name: AIRFLOW__KUBERNETES__IN_CLUSTER
value: 'true'
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
value: apache/airflow
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
value: 1.10.10
- name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
value: 'true'
image: apache/airflow:1.10.12
imagePullPolicy: Always
Opnieuw is sommige configuratie weggelaten omwille van de beknoptheid. Om de KubernetesExecutor in te schakelen, configureren we het in onze deployment met de volgende extra configuratieparameters:
We moeten instellen:
De executor die we met Airflow willen gebruiken op KubernetesExecutor.
- name: AIRFLOW__CORE__EXECUTOR
value: KubernetesExecutor
De namespace waar we onze worker pods willen draaien.
- name: AIRFLOW__KUBERNETES__NAMESPACE
value: airflow-k8sexecutor
De Kubernetes service account naam om te gebruiken voor onze workers
- name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
value: default
Als onze Kubernetes authenticatieconfiguratie aanwezig is in het cluster.
- name: AIRFLOW__KUBERNETES__IN_CLUSTER
value: 'true'
Het container-register en container-imagenaam die we voor onze pod worker containers willen gebruiken. En een aparte omgevingsvariabele voor de tag die we willen gebruiken. Omdat we mogelijk elke aangeleverde Airflow operator als een taak in een Kubernetes pod gaan draaien, moeten we ervoor zorgen dat aan de afhankelijkheden voor deze operators wordt voldaan in ons worker image. Daarom is het een goed idee om het Airflow docker image als basis te gebruiken.
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
value: apache/airflow:1.10.12
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
value: latest
Vervolgens moeten we opgeven hoe Airflow en Kubernetes toegang hebben tot onze dags. We kunnen git-sync, een gedeeld volume of de DAG's in onze Airflow images inbakken gebruiken. In dit geval doen we het laatste en vertellen we Airflow om te zoeken naar dags in ons Image. Als je deze route kiest, moet je je eigen Airflow image maken met de airflow basis en een map met je DAG's toevoegen.
- name: AIRFLOW__KUBERNETES__DAG's_IN_IMAGE
value: 'true'
Ten slotte vertellen we Airflow om de 'airflow' gebruiker te gebruiken om als te draaien. Deze is noodzakelijk, zonder deze kan de KubernetesExecutor je workloads niet uitvoeren.
- name: AIRFLOW__KUBERNETES__RUN_AS_USER
value: '50000'
Met deze configuratie zetten we Airflow op om de KubernetesExecutor te gebruiken. Nu naar onze DAG:
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
log = logging.getLogger(__name__)
dag = DAG(
"example_using_k8s_executor",
schedule_interval="0 1 * * *",
catchup=False,
default_args={
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 8, 7),
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=30),
"sla": timedelta(hours=23),
},
)
def use_airflow_binary():
rc = os.system("airflow -h")
assert rc == 0
with dag:
task_1 = PythonOperator(
task_id="task-1",
python_callable=use_airflow_binary,
)
task_2 = PythonOperator(
task_id="task-2",
python_callable=use_airflow_binary,
)
task_1 >> task_2
Dit voorbeeld toont een eenvoudige DAG met twee taken die de PythonOperator gebruiken. Wat er gebeurt wanneer we het uitvoeren, is dat de Kubernetes executor de Airflow-taakwachtrij bekijkt en elke taak in die wachtrij oppikt en er een KubernetesPod van maakt. De executor houdt vervolgens de pod in de gaten voor de status en synchroniseert de status terug naar de scheduler, die dan weet of de taak opnieuw moet worden gepland, opnieuw moet worden geprobeerd, of dat de volgende downstream taak kan worden gepland.
Wanneer we deze DAG draaien, wordt onze taak uitgevoerd in onze worker pod door de KubernetesExecutor en opgeruimd na succes of falen. Als je niet wilt dat je worker pods worden opgeruimd, kun je de extra ENV var aan je Airflow-configuratie toevoegen: AIRFLOW__KUBERNETS__IS_DELETE_WORKER_POD en deze op false zetten.
NAME READY STATUS RESTARTS AGE
airflow-76c4f47d78-hw52g 2/2 Running 0 5d17h
airflow-db-6944c99c7c-fsmnz 1/1 Running 0 5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542 0/1 ContainerCreating 0 2s
NAME READY STATUS RESTARTS AGE
airflow-76c4f47d78-hw52g 2/2 Running 0 5d17h
airflow-db-6944c99c7c-fsmnz 1/1 Running 0 5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542 1/1 Running 0 5s
De KubernetesExecutor is geweldig omdat je de natuurlijke elasticiteit van Kubernetes krijgt samen met alle goede dingen van Airflow. Het is niet langer noodzakelijk om je eigen containers te bouwen met aangepaste workloads aangezien elke taak in je DAG als een pod zal worden uitgevoerd. Er is wat extra initiële configuratie nodig bij het opzetten van Airflow, maar de setup blijft redelijk gemakkelijk te beheren.
Houd er wel rekening mee dat wanneer je veel taken parallel gaat plannen, dit zeer snel duur kan worden voor je clusterresources. Houd hier altijd een oogje op of stel limieten in door Airflow Pools te gebruiken (deze kunnen worden ingesteld via ENV vars of via de UI).
Ook is er wat opstart- en shutdown overhead elke keer dat een taak als een Pod wordt opgestart. Maar afhankelijk van je verwachtingen of vereisten is dit verwaarloosbaar.
En tot slot, veruit mijn favoriet:
Gebruik van KEDA met Airflow
Hier gebruiken we een CNCF Sandbox project dat oorspronkelijk is ontwikkeld door het Microsoft Azure Functions team: KEDA
KEDA staat voor Kubernetes Event Driven Autoscaler. Het stelt ons in staat om deployments op Kubernetes te schalen op basis van externe events/metrics, waardoor schaling naar en van nul mogelijk wordt. Dit samen met Airflow's CeleryExecutor brengt ons het beste van beide werelden. Kubernetes-pedigree bij het draaien van elastische scaling workloads en CerelyWorkers voor lage overhead continue uitvoering van Airflow DAG-taken.
KEDA werkt door een nieuwe custom resource definition (CRD) te gebruiken genaamd ScaledObject. Het scaled object is het werkpaard voor het omhoog en omlaag schalen van deployments.
scaledobject.yaml
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: airflow-worker
spec:
scaleTargetRef:
deploymentName: airflow-worker
pollingInterval: 10 # Optional. Default: 30 seconds
cooldownPeriod: 30 # Optional. Default: 300 seconds
maxReplicaCount: 10 # Optional. Default: 100
triggers:
- type: postgresql
metadata:
connection: AIRFLOW_CONN_AIRFLOW_DB
query: "SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'"
targetQueryValue: "1"
Hoe het werkt is dat het scaled object een deployment target heeft om te schalen in het geval dat aan een trigger wordt voldaan. De trigger wordt een Scaler genoemd en is in dit geval een query die wordt uitgevoerd op de Airflow PostgreSQL metadata database. De query wordt uitgevoerd op interval en het resultaat van de query bepaalt of de deployment moet worden opgeschaald of afgeschaald. In dit geval controleren we het aantal Airflow taakinstanties die ofwel een running of queued status hebben.
Als je wilt beginnen met KEDA voor het schalen van je deployments, zijn er al meerdere scalers beschikbaar, zoals een scaler voor Azure Blob Storage, een voor het schalen op basis van RabbitMQ messages, een MySQL scaler, of een scaler die schaalt op basis van Prometheus metrics.
Voor de volledige lijst, bekijk de documentatie op keda.sh
Voor Airflow werkt KEDA in combinatie met de CeleryExecutor. Celery is een implementatie van een taakwachtrij in Python en samen met KEDA stelt het Airflow in staat om dynamisch taken in celery workers parallel uit te voeren. Het schaalt CeleryWorkers op en af naargelang nodig op basis van wachtende of lopende taken.
Dit heeft het voordeel dat de CeleryWorkers over het algemeen minder overhead hebben bij het sequentieel uitvoeren van taken, aangezien er geen opstart is zoals bij de KubernetesExecutor.
Je krijgt dus de elasticiteit van Kubernetes, samen met alle voordelen die Celery biedt op het gebied van prestaties.
De setup
Voor deze setup gebruiken we de HELM chart ontwikkeld door Astronomer om Airflow + KEDA op Kubernetes te deployen.
Ik verwijs naar de officiële setup documentatie van Astronomer Airflow, deze is hier te vinden.
helm repo add kedacore https://kedacore.github.io/charts
helm repo add astronomer https://helm.astronomer.io
helm repo update
kubectl create namespace keda
helm install keda \
--namespace keda kedacore/keda
kubectl create namespace airflow
helm install airflow \
--set executor=CeleryExecutor \
--set workers.keda.enabled=true \
--set workers.persistence.enabled=false \
--namespace airflow \
astronomer/airflow
Nadat we Airflow met KEDA hebben gedeployed met de hierboven vermelde stappen, hebben we nu twee namespaces.
namespace=airflow
NAME READY STATUS RESTARTS AGE
airflow-flower-5966c99975-7vh9r 1/1 Running 0 93s
airflow-postgresql-0 1/1 Running 0 92s
airflow-redis-0 1/1 Running 0 92s
airflow-scheduler-7f64b6cd67-lr95p 2/2 Running 0 93s
airflow-statsd-f7647597-9xdzv 1/1 Running 0 93s
airflow-webserver-74b794d767-rzp49 1/1 Running 0 93s
Omdat we Celery als onze Executor gebruiken, hebben we enkele extra componenten in onze Airflow deployment. Namelijk flower, de beheer/monitoring UI voor Celery, redis voor het bemiddelen van berichten naar onze CeleryWorkers, en een statsd server die is opgenomen in de Helm chart voor het verzamelen van Airflow metrics.
namespace=keda
NAME READY STATUS RESTARTS AGE
keda-operator-5d44c49879-wpknv 1/1 Running 0 3m16s
keda-operator-metrics-apiserver-86d8bbc4df-884cr 1/1 Running 0 2m49s
In onze keda namespace hebben we de keda-operator die verantwoordelijk is voor het monitoren van onze trigger (de PostgreSQL Airflow DB) en het dienovereenkomstig schalen van onze deployment, en de keda-operator-metrics-apiserver die verantwoordelijk is voor het serveren van metrics aan de HorizontalPodAutoscaler. De native Kubernetes HorizontalPodAutoscaler wordt gebruikt om onze target CeleryWorker deployment uit te schalen.
KEDA maakt gebruik van deze reeds bestaande Autoscaler om onze deployment uit te schalen, waardoor KEDA een zeer lichte component is. Behoorlijk cool.
Neem nu deze DAG:
Hier gaan we 20 taken genereren. De DAG zal continu draaien en nieuwe taken blijven genereren voor onze CeleryWorkers om te verwerken.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def my_custom_function(ts,**kwargs):
"""
This can be any python code you want and is called from the python operator. The code is not executed until
the task is run by the airflow scheduler.
"""
print(f"I am task number {kwargs['task_number']}. This DAG Run execution date is {ts} and the current time is {datetime.now()}")
print('Here is the full DAG Run context. It is available because provide_context=True')
print(kwargs)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('example_dag_generated',
start_date=datetime(2019, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=30),
default_args=default_args,
) as dag:
t0 = DummyOperator(
task_id='start'
)
# generate tasks with a loop. task_id must be unique
for task in range(20):
tn = PythonOperator(
task_id=f'python_print_date_{task}',
python_callable=my_custom_function,
op_kwargs={'task_number': task},
provide_context=True
)
t0 >> tn
Wanneer we deze DAG uitvoeren, gebeurt het volgende:
Airflow begint taken te plannen, die in de queued-status terechtkomen. Onze KEDA-operator monitort op running en queued taken en wordt getriggerd vanwege de taken in de wachtrij.
kubectl -f logs -n keda keda-operator…
{"level":"info","ts":1603266764.262705,"logger":"scalehandler","msg":"Successfully updated deployment","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker","Original Replicas Count":0,"New Replicas Count":1}
{"level":"info","ts":1603266771.0487137,"logger":"controller_scaledobject","msg":"Reconciling ScaledObject","Request.Namespace":"airflow","Request.Name":"airflow-worker"}
{"level":"info","ts":1603266771.04885,"logger":"controller_scaledobject","msg":"Detected ScaleType = Deployment","Request.Namespace":"airflow","Request.Name":"airflow-worker"}
Omdat hij getriggerd wordt, meldt hij dat het deployment target: airflow-worker moet worden opgeschaald.
airflow-worker-dc75d6597-xtlhq 0/1 PodInitializing 0 22s
De deployment wordt opgeschaald en Celery zal berichten naar deze nieuwe CeleryWorker sturen. De CeleryWorker begint met het verwerken van taken.
Dit proces loopt continu en KEDA zorgt ervoor dat wanneer er meer taken worden gepland, meer workers worden opgestart.
airflow-worker-dc75d6597-8kvg5 0/1 Init:0/1 0 6s
airflow-worker-dc75d6597-glcd8 0/1 Init:0/1 0 6s
airflow-worker-dc75d6597-nxlwc 0/1 PodInitializing 0 6s
airflow-worker-dc75d6597-xtlhq 1/1 Running 0 81s
---
airflow-worker-dc75d6597-8kvg5 1/1 Running 0 84s
airflow-worker-dc75d6597-glcd8 1/1 Running 0 84s
airflow-worker-dc75d6597-nxlwc 1/1 Running 0 84s
airflow-worker-dc75d6597-xtlhq 1/1 Running 0 2m39s
Zodra er geen taken meer te verwerken zijn, zal KEDA onze worker deployment afschalen naar nul.
{"level":"info","ts":1603274700.8434458,"logger":"scalehandler","msg":"Successfully scaled deployment to 0 replicas","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker"}
Het draaien van Airflow met KEDA brengt het beste van beide werelden samen. De elasticiteit van Kubernetes ontsloten door KEDA in combinatie met de snel-opvolgende, lage overhead geoptimaliseerde CeleryExecutor en workers.
De setup is eenvoudig met behulp van de Helm chart van Astronomer, hoewel de toegevoegde complexiteit van extra componenten een intensievere debugging-flow zal vereisen wanneer er ergens onderweg iets misgaat.
Al met al ben ik een grote fan van het draaien van Airflow samen met KEDA en kijk ik uit naar een mooie toekomst voor Airflow op Kubernetes in combinatie met KEDA.