Running Apache Airflow on Kubernetes changes how data pipelines are scaled and managed. With Kubernetes handling container orchestration, Airflow stops being a scheduler tied to a fixed pool of workers and becomes an engine that launches isolated, ephemeral workloads. Each task can run in its own dedicated container, which keeps the dependencies of different tasks from conflicting and lets resources be requested per task. Whether through the KubernetesPodOperator or the KubernetesExecutor, the objective remains the same: to decouple the task execution environment from the Airflow scheduler and webserver. With this decoupling, a failure in a specific task is far less likely to jeopardize the stability of the orchestration layer, and worker capacity can follow the real-time demand of the task queue.
KubernetesPodOperator Architecture
The KubernetesPodOperator serves as a bridge between Airflow's orchestration logic and the Kubernetes API. Unlike standard operators that execute code within the Airflow worker process, this operator triggers the creation of a completely new Pod on the cluster. This Pod exists solely for the duration of the task's execution and is managed by the Kubernetes API.
The utility of this operator is primarily found in its ability to handle containerized workloads. Because the task runs in a separate pod, it can utilize any image available in a container registry, regardless of the language or dependencies required. This means a single Airflow DAG can orchestrate a pipeline where one task runs in a Python 3.8 container, the next in a Java 11 container, and the third in a specialized Ubuntu environment, all without requiring those environments to be installed on the Airflow worker itself.
To implement this functionality, specific prerequisites must be met within the cluster. A ServiceAccount must be established to allow the Airflow operator to authenticate with the Kubernetes API and possess the necessary permissions to create, monitor, and delete pods within a specified namespace.
The following configuration defines the necessary ServiceAccount for this operation:
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-k8spodoperator
  namespace: airflow-k8spodoperator
```
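This ServiceAccount is the identity the operator uses when it calls the Kubernetes API. A ServiceAccount carries no permissions by itself, so it must also be bound (for example through a Role and RoleBinding) to permissions for creating, reading, and deleting pods in the target namespace. Without them, the KubernetesPodOperator fails with a permission error when it attempts to launch the task pod. Scoping those permissions to the airflow-k8spodoperator namespace maintains the security boundary and limits the operator's influence to that namespace.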
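The permissions mentioned in the prerequisites (create, monitor, and delete pods in one namespace) could be granted with a namespaced Role and RoleBinding. The following is a minimal sketch that builds both manifests as Python dictionaries and prints them as JSON. The Role name `airflow-pod-manager` and the exact verb list are assumptions for illustration, not taken from the original setup.

```python
import json

NAMESPACE = "airflow-k8spodoperator"        # namespace of the ServiceAccount
SERVICE_ACCOUNT = "airflow-k8spodoperator"  # name of the ServiceAccount
ROLE_NAME = "airflow-pod-manager"           # hypothetical name, chosen for this sketch


def build_role(name, namespace):
    """Build a Role that allows managing pods and reading their logs."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                "apiGroups": [""],  # "" is the core API group, where pods live
                "resources": ["pods"],
                "verbs": ["create", "get", "list", "watch", "delete"],
            },
            {
                "apiGroups": [""],
                "resources": ["pods/log"],
                "verbs": ["get"],
            },
        ],
    }


def build_role_binding(role_name, namespace, service_account):
    """Bind the Role to the ServiceAccount within the same namespace."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": role_name, "namespace": namespace},
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": service_account,
                "namespace": namespace,
            }
        ],
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "Role",
            "name": role_name,
        },
    }


role = build_role(ROLE_NAME, NAMESPACE)
binding = build_role_binding(ROLE_NAME, NAMESPACE, SERVICE_ACCOUNT)

# Print both manifests as a single JSON list document.
print(json.dumps({"apiVersion": "v1", "kind": "List", "items": [role, binding]}, indent=2))
```

Because both objects are namespaced, the granted permissions stop at the namespace boundary, which matches the isolation goal described above.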
Implementation of the KubernetesPodOperator DAG
A practical implementation of the KubernetesPodOperator involves defining a DAG that specifies the container image, the commands to be executed, and the resource constraints. In a production scenario, this allows for highly reproducible tasks.
Consider the following DAG implementation:
```python
import logging
from datetime import datetime, timedelta

from airflow import DAG
# Airflow 1.10 import path; later releases ship this operator in the
# cncf.kubernetes provider package instead.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

log = logging.getLogger(__name__)

dag = DAG(
    "example_using_k8s_pod_operator",
    schedule_interval="0 1 * * *",  # every day at 01:00
    catchup=False,
    default_args={
        "owner": "admin",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)

with dag:
    task_1 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["bash", "-cx"],
        # bash -c reads the whole command from a single string
        arguments=["echo 10"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-1",
        task_id="task-1-echo",
        is_delete_operator_pod=False,  # keep the pod for inspection
        in_cluster=True,
    )

    task_2 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["sleep"],
        arguments=["300"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-2",
        task_id="task-2-sleep",
        is_delete_operator_pod=False,
        in_cluster=True,
    )

    # task_2 starts only after task_1 has succeeded
    task_1 >> task_2
```
In this configuration, several key parameters govern the behavior of the pods:
- `image`: Specifies the container image to be used. In the example, `ubuntu:16.04` is utilized. This ensures the environment is consistent across all executions.
- `namespace`: The pod is launched in the `airflow-k8spodoperator` namespace, keeping it isolated from other cluster workloads.
- `cmds`: The entrypoint command. For `task_1`, it uses `bash -cx`, allowing for the execution of shell commands.
- `arguments`: The specific values passed to the command. `task_1` echoes "10", while `task_2` invokes a sleep command for 300 seconds.
- `is_delete_operator_pod`: When set to `False`, the pod remains in the cluster after completion. This is invaluable for debugging, as it allows administrators to inspect the pod logs and state.
- `in_cluster`: Set to `True` to indicate that the operator is running inside the Kubernetes cluster and should use the internal service account for authentication.
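To make the parameter list concrete, the sketch below shows roughly how these arguments end up in a Pod manifest: `cmds` becomes the container's `command` and `arguments` becomes its `args`. This is a simplified illustration, not the operator's actual code, and the helper name `pod_manifest` is hypothetical. The real operator also appends a random suffix to the pod name. Since `bash -c` reads its script from a single argument, the sketch passes `"echo 10"` as one string.

```python
def pod_manifest(name, namespace, image, cmds, arguments, labels=None):
    """Build a minimal Pod manifest from KubernetesPodOperator-style arguments.

    Hypothetical helper for illustration only.
    """
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": name,
            "namespace": namespace,
            "labels": dict(labels or {}),
        },
        "spec": {
            "restartPolicy": "Never",  # a task pod runs once and exits
            "containers": [
                {
                    "name": "base",
                    "image": image,
                    "command": list(cmds),    # overrides the image ENTRYPOINT
                    "args": list(arguments),  # overrides the image CMD
                }
            ],
        },
    }


task_1_pod = pod_manifest(
    name="test-using-k8spodoperator-task-1",
    namespace="airflow-k8spodoperator",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    labels={"foo": "bar"},
)

# The process started in the container is command followed by args.
container = task_1_pod["spec"]["containers"][0]
argv = container["command"] + container["args"]
print(argv)
```

Splitting the command into `["echo", "10"]` would instead make bash run a bare `echo` and treat `10` as a positional parameter, so nothing but an empty line would be printed.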
The execution flow is sequential, where task_1 must complete before task_2 begins. When these tasks run, the cluster reflects the current state. For instance, the pod list might show:
- `airflow-56f875bb-dk6vq`: The main Airflow deployment.
- `airflow-db-57548fc4d-qvbgf`: The metadata database.
- `test-using-k8spodoperator-task-1-5a055bae`: A completed pod resulting from the first task.
- `test-using-k8spodoperator-task-1-60caa2f3`: A completed pod from a subsequent run.
KubernetesExecutor Configuration and Deployment
While the KubernetesPodOperator is applied to individual tasks, the KubernetesExecutor is an installation-wide choice that determines how every task is executed. With it, the Airflow scheduler requests a new pod for each task instance. This provides maximum elasticity, as worker resources are consumed only while a task is actually running.
Setting up the KubernetesExecutor requires a deployment that integrates the webserver and scheduler. The infrastructure typically consists of a webserver for the UI, a scheduler for task orchestration, and a PostgreSQL database for metadata.
The deployment configuration for the KubernetesExecutor is as follows:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8sexecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      automountServiceAccountToken: true
      containers:
        # Webserver container
        - args:
            - webserver
            - -p
            - "8000"
          env:
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              value: postgresql://postgres:password@airflow-db:5432/postgres
            - name: AIRFLOW__CORE__EXECUTOR
              value: KubernetesExecutor
            - name: AIRFLOW__KUBERNETES__NAMESPACE
              value: airflow-k8sexecutor
            - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
              value: default
            - name: AIRFLOW__KUBERNETES__IN_CLUSTER
              value: 'true'
            - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
              value: 'true'
          image: apache/airflow:1.10.12
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /opt/airflow/logs/
              mountPropagation: None
              name: airflow-logs
        # Scheduler container
        - args:
            - scheduler
          env:
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              value: postgresql://postgres:password@airflow-db:5432/postgres
            - name: AIRFLOW__CORE__EXECUTOR
              value: KubernetesExecutor
            - name: AIRFLOW__KUBERNETES__NAMESPACE
              value: airflow-k8sexecutor
          # Excerpt: the rest of the scheduler container and the pod volumes are not shown.
```
The configuration emphasizes several environment variables that dictate the executor's behavior:
- `AIRFLOW__CORE__EXECUTOR`: Set to `KubernetesExecutor`, which tells the scheduler to launch pods instead of using a local worker.
- `AIRFLOW__KUBERNETES__NAMESPACE`: Defined as `airflow-k8sexecutor`, ensuring all spawned workers are placed in the correct logical segment of the cluster.
- `AIRFLOW__KUBERNETES__IN_CLUSTER`: Set to `true`, enabling the use of internal cluster authentication.
- `AIRFLOW__KUBERNETES__DAGS_IN_IMAGE`: Set to `true`, indicating that the DAG files are baked into the container image, preventing the need for external volume mounts for DAG synchronization.
The `apache/airflow:1.10.12` image provides the core binaries, while `imagePullPolicy: Always` makes the kubelet check the registry every time a container starts, so the latest image published under that tag is the one that runs.
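The variable names in the manifest follow Airflow's `AIRFLOW__{SECTION}__{KEY}` convention, where the two double underscores separate the configuration section from the option name. The sketch below shows how such a name maps to a section and key. The helper name `parse_airflow_env` is hypothetical, and Airflow's own parsing may differ in detail.

```python
def parse_airflow_env(name):
    """Split AIRFLOW__{SECTION}__{KEY} into a lowercased (section, key) pair.

    Returns None when the name does not follow the pattern.
    """
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        return None
    # The first double underscore after the prefix ends the section name.
    section, separator, key = name[len(prefix):].partition("__")
    if not separator or not section or not key:
        return None
    return section.lower(), key.lower()


for variable in (
    "AIRFLOW__CORE__EXECUTOR",
    "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
    "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE",
):
    print(variable, "->", parse_airflow_env(variable))
```

Single underscores inside the key, as in `SQL_ALCHEMY_CONN`, are part of the option name and are left untouched. A misspelled key simply produces an option Airflow does not recognize, so the intended setting silently keeps its default.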
Metadata Database Integration
A critical component of any Airflow deployment on Kubernetes is the metadata database. Airflow requires a relational database to track task states, DAG runs, and user permissions. In the provided architecture, PostgreSQL is used for this purpose.
The database deployment is defined in the following manifest:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow-db
  template:
    metadata:
      labels:
        name: airflow-db
    spec:
      containers:
        - env:
            - name: POSTGRES_PASSWORD
              value: password
          image: postgres:9.6
          imagePullPolicy: IfNotPresent
          name: airflow-db
          volumeMounts:
            - mountPath: /var/lib/postgresql/data
              mountPropagation: None
              name: postgresql-data
      restartPolicy: Always
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: postgresql-data
```
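To ensure the Airflow components can communicate with this database, a Service is required. It provides a stable DNS name, `airflow-db`, that the scheduler and webserver can target. The short name resolves only from within the Service's own namespace, so the database and any Airflow components that use `airflow-db:5432` in their connection string must be deployed in the same namespace. Note also that the `emptyDir` volume in the manifest is tied to the pod's lifetime, so the metadata is lost if the database pod is deleted.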
The service configuration is as follows:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  clusterIP: None
  ports:
    - port: 5432
      protocol: TCP
      targetPort: 5432
  selector:
    name: airflow-db
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}
```
This service maps port 5432, the default PostgreSQL port, to the database pod. Setting `clusterIP: None` makes it a headless service: no virtual IP is allocated, and the service's DNS name resolves directly to the IP of the database pod.
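The DNS name that the Service provides can be spelled out explicitly. The sketch below assembles the fully qualified in-cluster name and the matching connection string used for `AIRFLOW__CORE__SQL_ALCHEMY_CONN`. It assumes the default cluster domain `cluster.local`, which a cluster administrator may have changed, and both helper names are hypothetical.

```python
def service_dns_name(service, namespace, cluster_domain="cluster.local"):
    """Return the fully qualified in-cluster DNS name of a Service.

    The cluster domain defaults to "cluster.local" (an assumption).
    """
    return f"{service}.{namespace}.svc.{cluster_domain}"


def sql_alchemy_conn(user, password, host, port=5432, database="postgres"):
    """Build a PostgreSQL connection URL in the form Airflow expects.

    Credentials with special characters would need URL-encoding first.
    """
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"


host = service_dns_name("airflow-db", "airflow-k8spodoperator")
conn = sql_alchemy_conn("postgres", "password", host)
print(conn)
```

Using the fully qualified name lets a client in a different namespace reach the database, whereas the short form `airflow-db` only resolves for pods in the Service's own namespace.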
Elastic Scaling with KEDA
To optimize resource utilization and performance, KEDA (Kubernetes Event-driven Autoscaling) can be integrated into the Airflow ecosystem. KEDA allows for scaling based on external events, which in the case of Airflow, is the number of tasks currently queued or running in the metadata database.
This is particularly powerful when combined with the CeleryExecutor, as it allows the cluster to maintain a baseline of Celery Workers for low-overhead execution while scaling up rapidly to handle bursts of high-volume workloads.
KEDA utilizes a Custom Resource Definition (CRD) called a ScaledObject. This object monitors a specific trigger and adjusts the replica count of a target deployment accordingly.
The KEDA configuration for Airflow workers is as follows:
yaml
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: airflow-worker
spec:
scaleTargetRef:
deploymentName: airflow-worker
pollingInterval: 10 # Optional. Default: 30 seconds
cooldownPeriod: 30 # Optional. Default: 300 seconds
maxReplicaCount: 10 # Optional. Default: 100
triggers:
- type: postgresql
metadata:
connection: AIRFLOW_CONN_AIRFLOW_DB
query: "SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'"
targetQueryValue: "1"
The operational mechanics of this scaling strategy are detailed below:
- Scale Target: The `ScaledObject` targets the `airflow-worker` deployment.
- Polling Interval: Set to `10` seconds, meaning KEDA checks the trigger every 10 seconds.
- Cooldown Period: Set to `30` seconds, the time KEDA waits after the trigger was last active before scaling the deployment back to zero. A longer value guards against thrashing when the queue fluctuates briefly, while a shorter one frees resources sooner.
- Max Replica Count: Limited to `10` replicas to prevent runaway scaling from consuming all cluster resources.
- Trigger Mechanism: The trigger is a `postgresql` scaler. It executes a specific SQL query against the Airflow database: `SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'`.
- Scaling Logic: The query counts the running and queued task instances and divides by 4, rounding up, which budgets four concurrent tasks per worker. With a `targetQueryValue` of "1", the query result is effectively the desired number of worker replicas, capped at the maximum of 10.
This event-driven approach ensures that the infrastructure is right-sized. When the queue is empty, the worker count drops to the minimum, saving costs. When a large DAG triggers hundreds of tasks, KEDA rapidly increases the worker count to maintain throughput.
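The arithmetic behind the scaling logic can be sketched in a few lines. The function below approximates how the query result and `targetQueryValue` translate into a replica count. It is a simplified model, not KEDA's implementation: it ignores polling and cooldown timing, and it assumes a minimum replica count of 0.

```python
import math


def desired_workers(active_tasks, tasks_per_worker=4, target_query_value=1,
                    max_replicas=10, min_replicas=0):
    """Approximate the replica count for a number of running or queued tasks.

    Mirrors ceil(COUNT(*) / 4) from the SQL query, then scales so that each
    replica accounts for target_query_value units of the query result.
    min_replicas=0 is an assumption for this sketch.
    """
    query_result = math.ceil(active_tasks / tasks_per_worker)
    wanted = math.ceil(query_result / target_query_value)
    # Clamp the result between the configured bounds.
    return max(min_replicas, min(wanted, max_replicas))


for tasks in (0, 1, 4, 5, 37, 400):
    print(tasks, "active tasks ->", desired_workers(tasks), "workers")
```

Under these assumptions, the cap of 10 replicas is reached at 37 active tasks, and any larger backlog is worked off by those 10 workers rather than by additional ones.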
Comparative Analysis of Airflow Execution Models
The choice between different execution models on Kubernetes depends on the specific requirements of the workload. The following table compares the primary methods discussed.
| Feature | KubernetesPodOperator | KubernetesExecutor | Celery + KEDA |
|---|---|---|---|
| Isolation | High (Per Task Pod) | High (Per Task Pod) | Medium (Shared Worker) |
| Startup Latency | High (Pod Creation) | High (Pod Creation) | Low (Pre-warmed Workers) |
| Scaling Method | Individual Pods | Individual Pods | Scaled Worker Sets |
| Dependency Mgmt | Container Image | Container Image | Worker Image |
| Resource Efficiency | High (Ephemeral) | High (Ephemeral) | Medium (Constant Base) |
Detailed Component Analysis
Scheduler and Worker Configuration
The scheduler is the heart of the Airflow system, responsible for monitoring DAGs and triggering task instances. When using the KubernetesExecutor, the scheduler does not execute the task itself but instead sends a request to the Kubernetes API to create a pod.
The scheduler configuration includes critical environment variables for the worker's identity:
- `AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME`: Set to `default` in some configurations, this defines the identity the worker pod assumes.
- `AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY`: Defines the image source, such as `eu.gcr.io/fullstaq-st-tim/st-airflow-executor`.
- `AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG`: Specifies the version, such as `latest` or `1.10.10`.
- `AIRFLOW__KUBERNETES__RUN_AS_USER`: Set to `50000`, ensuring the container runs as a non-root user for enhanced security.
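The repository and tag settings listed above combine into the image reference used for worker pods. The sketch below shows that combination together with the other identity settings. The function name `worker_settings` is hypothetical, and the sketch only illustrates how the values relate rather than reproducing how Airflow reads its configuration.

```python
def worker_settings(env):
    """Derive worker pod settings from Airflow-style environment variables."""
    repository = env["AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY"]
    tag = env["AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG"]
    return {
        # Repository and tag are joined with a colon, as in any image reference.
        "image": f"{repository}:{tag}",
        "service_account": env.get(
            "AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME", "default"
        ),
        # Environment values are strings, so the user ID is converted explicitly.
        "run_as_user": int(env.get("AIRFLOW__KUBERNETES__RUN_AS_USER", "50000")),
    }


settings = worker_settings(
    {
        "AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY": "apache/airflow",
        "AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG": "1.10.10",
        "AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME": "default",
        "AIRFLOW__KUBERNETES__RUN_AS_USER": "50000",
    }
)
print(settings)
```

Pinning the tag to a specific version rather than `latest` keeps worker pods reproducible across runs.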
Init Container Functionality
In many Airflow Kubernetes deployments, an initContainer is utilized. The primary responsibility of the init container is to bootstrap the database. This ensures that the database is initialized with the necessary schema before the webserver or scheduler attempts to connect to it.
The init container configuration typically looks like this:
```yaml
initContainers:
  - args:
      - initdb
    env:
      - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
        value: postgresql://postgres:password@airflow-db:5432/postgres
      - name: AIRFLOW__CORE__EXECUTOR
        value: KubernetesExecutor
      - name: AIRFLOW__KUBERNETES__NAMESPACE
        value: airflow-k8sexecutor
      - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
        value: default
      - name: AIRFLOW__KUBERNETES__IN_CLUSTER
        value: 'true'
      - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
        value: apache/airflow
      - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
        value: "1.10.10"
      - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
        value: 'true'
    image: apache/airflow:1.10.12
```
By running the initdb argument, the container ensures the metadata tables are created. This prevents the scheduler from crashing upon startup due to a missing database schema.
Conclusion
The integration of Apache Airflow with Kubernetes transforms the orchestration of data pipelines into a scalable, resilient, and highly flexible operation. The KubernetesPodOperator provides an elegant solution for executing tasks in isolated containers, allowing for diverse technology stacks within a single pipeline. Simultaneously, the KubernetesExecutor offers a structural approach to elasticity, where every task is treated as an ephemeral pod.
The addition of KEDA introduces a layer of intelligent automation, allowing the cluster to respond dynamically to the volume of work by querying the PostgreSQL metadata database. This hybrid approach—combining the strengths of container isolation with event-driven scaling—allows organizations to maximize their hardware utilization while maintaining strict environment isolation. The shift toward this architecture is not merely a technical upgrade but a strategic move toward a more robust and maintainable data infrastructure, where the operational overhead of managing worker dependencies is replaced by the streamlined efficiency of container images.