Airflow Kubernetes Orchestration and Execution

Running Apache Airflow on Kubernetes changes how data pipelines are scaled and managed. With Kubernetes handling container orchestration, Airflow is no longer limited to a fixed pool of workers and can launch isolated, ephemeral workloads on demand. Each task can run in its own dedicated container, which avoids dependency conflicts between tasks and lets resources be allocated per task. Whether through the KubernetesPodOperator or the KubernetesExecutor, the objective is the same: to decouple the task execution environment from the Airflow scheduler and webserver. This decoupling means that a failing task does not jeopardize the stability of the orchestration layer, and workers can be scaled according to the real-time demand of the DAG (Directed Acyclic Graph) queue.

KubernetesPodOperator Architecture

The KubernetesPodOperator serves as a bridge between Airflow's orchestration logic and the Kubernetes API. Unlike standard operators that execute code within the Airflow worker process, this operator triggers the creation of a completely new Pod on the cluster. This Pod exists solely for the duration of the task's execution and is managed by the Kubernetes API.

The utility of this operator is primarily found in its ability to handle containerized workloads. Because the task runs in a separate pod, it can utilize any image available in a container registry, regardless of the language or dependencies required. This means a single Airflow DAG can orchestrate a pipeline where one task runs in a Python 3.8 container, the next in a Java 11 container, and the third in a specialized Ubuntu environment, all without requiring those environments to be installed on the Airflow worker itself.

To implement this functionality, specific prerequisites must be met within the cluster. A ServiceAccount must be established so that Airflow can authenticate with the Kubernetes API, and it must be granted permission (typically through a Role and RoleBinding) to create, monitor, and delete pods within a specified namespace.

The following configuration defines the necessary ServiceAccount for this operation:

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-k8spodoperator
  namespace: airflow-k8spodoperator
```

The existence of this ServiceAccount is critical. Without it, the KubernetesPodOperator cannot authenticate with the Kubernetes API, and launching the task pod fails with a permission error. Granting the account permissions only in the airflow-k8spodoperator namespace maintains the security boundary by limiting the operator's influence to that namespace.
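
A ServiceAccount carries no permissions until a Role is bound to it. The sketch below builds a Role and RoleBinding for the account above as plain Python dictionaries and prints them as JSON, a format kubectl also accepts. The role name pod-manager and the exact verb list are illustrative assumptions, not taken from the source.

```python
import json

NAMESPACE = "airflow-k8spodoperator"
SERVICE_ACCOUNT = "airflow-k8spodoperator"
ROLE_NAME = "pod-manager"  # hypothetical name, chosen for illustration


def build_rbac(namespace, service_account, role_name):
    """Return a Role and RoleBinding allowing the account to manage pods."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": role_name, "namespace": namespace},
        "rules": [
            {
                "apiGroups": [""],  # "" is the core API group, where pods live
                "resources": ["pods", "pods/log"],
                # Assumed verb set covering create, monitor, and delete.
                "verbs": ["create", "get", "list", "watch", "delete"],
            }
        ],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": role_name, "namespace": namespace},
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": namespace}
        ],
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "Role",
            "name": role_name,
        },
    }
    return role, binding


role, binding = build_rbac(NAMESPACE, SERVICE_ACCOUNT, ROLE_NAME)
print(json.dumps({"apiVersion": "v1", "kind": "List", "items": [role, binding]}, indent=2))
```

Because the Role is namespaced, the permissions it grants stop at the boundary of the airflow-k8spodoperator namespace.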

Implementation of the KubernetesPodOperator DAG

A practical implementation of the KubernetesPodOperator involves defining a DAG that specifies the container image, the commands to be executed, and the resource constraints. In a production scenario, this allows for highly reproducible tasks.

Consider the following DAG implementation:

```python
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

log = logging.getLogger(__name__)

dag = DAG(
    "example_using_k8s_pod_operator",
    schedule_interval="0 1 * * *",  # every day at 01:00
    catchup=False,
    default_args={
        "owner": "admin",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)

with dag:
    task_1 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["bash", "-cx"],
        # bash -c runs only the first argument as the script,
        # so the whole command is passed as a single string.
        arguments=["echo 10"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-1",
        task_id="task-1-echo",
        is_delete_operator_pod=False,  # keep the pod for inspection
        in_cluster=True,
    )

    task_2 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["sleep"],
        arguments=["300"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-2",
        task_id="task-2-sleep",
        is_delete_operator_pod=False,
        in_cluster=True,
    )

    # task_2 starts only after task_1 succeeds.
    task_1 >> task_2
```

In this configuration, several key parameters govern the behavior of the pods:

  • image: Specifies the container image to be used. In the example, ubuntu:16.04 is utilized. This ensures the environment is consistent across all executions.
  • namespace: The pod is launched in the airflow-k8spodoperator namespace, keeping it isolated from other cluster workloads.
  • cmds: The entrypoint command. For task_1, it uses bash -cx, which runs the command string that follows and prints each command as it is executed.
  • arguments: The specific values passed to the command. task_1 echoes "10", while task_2 invokes a sleep command for 300 seconds.
  • is_delete_operator_pod: When set to False, the pod remains in the cluster after completion. This is invaluable for debugging, as it allows administrators to inspect the pod logs and state.
  • in_cluster: Set to True to indicate that the operator is running inside the Kubernetes cluster and should use the internal service account for authentication.
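
How cmds and arguments combine is easy to get wrong with bash -c. In a Kubernetes container spec, the command list is followed by the args list to form the process argument vector, and bash -c treats only the first argument after -c as the script; any later arguments become positional parameters ($0, $1, and so on). The helpers below are a hypothetical illustration of that behavior, not part of Airflow.

```python
def container_argv(cmds, arguments):
    """Approximate the argv the container process receives: command + args."""
    return list(cmds) + list(arguments)


def bash_c_script(argv):
    """Return the part of argv that `bash -c` runs as the script, or None."""
    for i, arg in enumerate(argv):
        # Match -c and combined short flags such as -cx or -xc.
        if arg.startswith("-") and not arg.startswith("--") and "c" in arg[1:]:
            return argv[i + 1] if i + 1 < len(argv) else None
    return None


split = container_argv(["bash", "-cx"], ["echo", "10"])
joined = container_argv(["bash", "-cx"], ["echo 10"])

print(bash_c_script(split))   # the script is just "echo"; "10" becomes $0
print(bash_c_script(joined))  # the script is "echo 10"
```

Passing the whole command as one string, as in arguments=["echo 10"], is what makes bash print 10.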

The execution flow is sequential: task_1 must complete before task_2 begins. As the tasks run, their pods appear in the namespace alongside the Airflow components. For instance, the pod list might show:

  • airflow-56f875bb-dk6vq: The main Airflow deployment.
  • airflow-db-57548fc4d-qvbgf: The metadata database.
  • test-using-k8spodoperator-task-1-5a055bae: A completed pod resulting from the first task.
  • test-using-k8spodoperator-task-1-60caa2f3: A completed pod from a subsequent run.
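
The pod names in this listing follow the pattern of the operator's name plus a short random suffix, which is why two runs of the same task produce two distinct pods. A minimal sketch of that pattern, assuming an 8-character hexadecimal suffix as seen in the listing (the helper name is hypothetical and this is not Airflow's own code):

```python
import uuid


def unique_pod_name(base_name):
    """Append a short random hex suffix so each run gets a distinct pod name."""
    suffix = uuid.uuid4().hex[:8]  # assumption: 8 hex characters, as in the listing
    return f"{base_name}-{suffix}"


first = unique_pod_name("test-using-k8spodoperator-task-1")
second = unique_pod_name("test-using-k8spodoperator-task-1")
print(first)
print(second)
```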

KubernetesExecutor Configuration and Deployment

While the KubernetesPodOperator is used for specific tasks, the KubernetesExecutor is a deployment-wide choice that determines how every task in every DAG is executed. With the KubernetesExecutor, the Airflow scheduler requests a new pod for each task instance. This provides a high degree of elasticity, as resources are only consumed while a task is actually running.

Setting up the KubernetesExecutor requires a deployment that integrates the webserver and scheduler. The infrastructure typically consists of a webserver for the UI, a scheduler for task orchestration, and a PostgreSQL database for metadata.

The deployment configuration for the KubernetesExecutor is as follows:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8sexecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      automountServiceAccountToken: true
      containers:
      # Webserver container
      - args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      # Scheduler container (the excerpt ends partway through its environment)
      - args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
```

The configuration emphasizes several environment variables that dictate the executor's behavior:

  • AIRFLOW__CORE__EXECUTOR: Set to KubernetesExecutor, which tells the scheduler to launch pods instead of using a local worker.
  • AIRFLOW__KUBERNETES__NAMESPACE: Defined as airflow-k8sexecutor, ensuring all spawned workers are placed in the correct logical segment of the cluster.
  • AIRFLOW__KUBERNETES__IN_CLUSTER: Set to true, enabling the use of internal cluster authentication.
  • AIRFLOW__KUBERNETES__DAGS_IN_IMAGE: Set to true, indicating that the DAG files are baked into the container image, removing the need for external volume mounts for DAG synchronization.
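
Each of these variables follows Airflow's AIRFLOW__{SECTION}__{KEY} convention, with double underscores separating the prefix, the configuration section, and the key. A small sketch of that mapping; the helper name airflow_env_var is hypothetical:

```python
def airflow_env_var(section, key):
    """Build the environment variable that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


settings = {
    ("core", "executor"): "KubernetesExecutor",
    ("kubernetes", "namespace"): "airflow-k8sexecutor",
    ("kubernetes", "in_cluster"): "true",
    ("kubernetes", "dags_in_image"): "true",
}

# Render the settings as the name/value pairs used in a container's env list.
env = [
    {"name": airflow_env_var(section, key), "value": value}
    for (section, key), value in settings.items()
]

for entry in env:
    print(f'{entry["name"]}={entry["value"]}')
```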

The apache/airflow:1.10.12 image provides the core binaries, while imagePullPolicy: Always makes Kubernetes check the registry each time a container starts, so the image currently published under that tag is the one that runs.

Metadata Database Integration

A critical component of any Airflow deployment on Kubernetes is the metadata database. Airflow requires a relational database to track task states, DAG runs, and user permissions. In the provided architecture, PostgreSQL is used for this purpose.

The database deployment is defined in the following manifest:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow-db
  template:
    metadata:
      labels:
        name: airflow-db
    spec:
      containers:
      - env:
        - name: POSTGRES_PASSWORD
          value: password
        image: postgres:9.6
        imagePullPolicy: IfNotPresent
        name: airflow-db
        volumeMounts:
        - mountPath: /var/lib/postgresql/data
          mountPropagation: None
          name: postgresql-data
      restartPolicy: Always
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}  # data is lost when the pod is removed
        name: postgresql-data
```

To ensure the Airflow components can communicate with this database, a ClusterIP service is required. This provides a stable DNS name that the scheduler and webserver can target.

The service configuration is as follows:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  clusterIP: None
  ports:
  - port: 5432
    protocol: TCP
    targetPort: 5432
  selector:
    name: airflow-db
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}
```

This service maps port 5432, the default PostgreSQL port, to the database pod. The clusterIP: None indicates a headless service, which is often used in Kubernetes to allow direct pod communication or for specific DNS resolution patterns.
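
The DNS name a client uses depends on which namespace it runs in. The short name airflow-db resolves only from pods in the service's own namespace; from elsewhere, the namespace-qualified form is needed. The sketch below builds both forms and a matching connection string, assuming the default cluster domain cluster.local; the helper names are hypothetical.

```python
def service_dns(service, namespace, cluster_domain="cluster.local"):
    """Return the fully qualified in-cluster DNS name of a Service."""
    return f"{service}.{namespace}.svc.{cluster_domain}"


def sql_alchemy_conn(host, user="postgres", password="password",
                     port=5432, database="postgres"):
    """Build a PostgreSQL connection URL in the form used by the manifests."""
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"


fqdn = service_dns("airflow-db", "airflow-k8spodoperator")
print(fqdn)
print(sql_alchemy_conn("airflow-db"))  # resolvable only inside the same namespace
print(sql_alchemy_conn(fqdn))          # resolvable from other namespaces as well
```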

Elastic Scaling with KEDA

To optimize resource utilization and performance, KEDA (Kubernetes Event-driven Autoscaling) can be integrated into the Airflow ecosystem. KEDA allows for scaling based on external events, which in the case of Airflow, is the number of tasks currently queued or running in the metadata database.

This is particularly powerful when combined with the CeleryExecutor, as it allows the cluster to maintain a baseline of Celery Workers for low-overhead execution while scaling up rapidly to handle bursts of high-volume workloads.

KEDA utilizes a Custom Resource Definition (CRD) called a ScaledObject. This object monitors a specific trigger and adjusts the replica count of a target deployment accordingly.

The KEDA configuration for Airflow workers is as follows:

```yaml
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: airflow-worker
spec:
  scaleTargetRef:
    deploymentName: airflow-worker
  pollingInterval: 10   # Optional. Default: 30 seconds
  cooldownPeriod: 30    # Optional. Default: 300 seconds
  maxReplicaCount: 10   # Optional. Default: 100
  triggers:
    - type: postgresql
      metadata:
        connection: AIRFLOW_CONN_AIRFLOW_DB
        query: "SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'"
        targetQueryValue: "1"
```

The operational mechanics of this scaling strategy are detailed below:

  • Scale Target: The ScaledObject targets the airflow-worker deployment.
  • Polling Interval: Set to 10 seconds, meaning KEDA checks the trigger every 10 seconds.
  • Cooldown Period: Set to 30 seconds. KEDA waits this long after the trigger was last active before scaling the deployment back down to zero, which prevents thrashing if the queue fluctuates briefly.
  • Max Replica Count: Limited to 10 replicas to prevent runaway scaling from consuming all cluster resources.
  • Trigger Mechanism: The trigger is a postgresql scaler. It executes a specific SQL query against the Airflow database: SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'.
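  • Scaling Logic: The query counts the running or queued tasks, divides by 4, and rounds up, so its result is the number of workers needed if each worker handles up to four tasks at once. With a targetQueryValue of "1", KEDA scales the deployment toward one replica per unit of the query result, up to the maximum replica count.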
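
The arithmetic behind the query can be sketched in a few lines. This is a simplified model, assuming each worker handles up to four concurrent tasks (the divisor in the query) and that the replica count follows the query result up to maxReplicaCount; the real autoscaler applies additional smoothing that is not modeled here.

```python
import math


def desired_workers(active_tasks, tasks_per_worker=4, max_replicas=10):
    """Mirror ceil(COUNT(*) / 4) from the trigger query, capped at maxReplicaCount."""
    if active_tasks < 0:
        raise ValueError("active_tasks must be non-negative")
    wanted = math.ceil(active_tasks / tasks_per_worker)
    return min(wanted, max_replicas)


# Running or queued task counts and the worker count they map to.
for tasks in (0, 1, 4, 5, 37, 200):
    print(tasks, "->", desired_workers(tasks))
```

Under this model, five active tasks call for two workers, and anything above 36 active tasks is held at the cap of ten.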

This event-driven approach ensures that the infrastructure is right-sized. When the queue is empty, the worker count drops to the minimum, saving costs. When a large DAG triggers hundreds of tasks, KEDA rapidly increases the worker count to maintain throughput.

Comparative Analysis of Airflow Execution Models

The choice between different execution models on Kubernetes depends on the specific requirements of the workload. The following table compares the primary methods discussed.

Feature             | KubernetesPodOperator | KubernetesExecutor  | Celery + KEDA
--------------------|-----------------------|---------------------|-------------------------
Isolation           | High (Per Task Pod)   | High (Per Task Pod) | Medium (Shared Worker)
Startup Latency     | High (Pod Creation)   | High (Pod Creation) | Low (Pre-warmed Workers)
Scaling Method      | Individual Pods       | Individual Pods     | Scaled Worker Sets
Dependency Mgmt     | Container Image       | Container Image     | Worker Image
Resource Efficiency | High (Ephemeral)      | High (Ephemeral)    | Medium (Constant Base)

Detailed Component Analysis

Scheduler and Worker Configuration

The scheduler is the heart of the Airflow system, responsible for monitoring DAGs and triggering task instances. When using the KubernetesExecutor, the scheduler does not execute the task itself but instead sends a request to the Kubernetes API to create a pod.

The scheduler configuration includes critical environment variables for the worker's identity:

  • AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME: Set to default in some configurations, this defines the identity the worker pod assumes.
  • AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: Defines the image source, such as eu.gcr.io/fullstaq-st-tim/st-airflow-executor.
  • AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: Specifies the version, such as latest or 1.10.10.
  • AIRFLOW__KUBERNETES__RUN_AS_USER: Set to 50000, ensuring the container runs as a non-root user for enhanced security.
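
These settings combine into the worker pod's image reference and identity. A rough sketch of that combination, using example values from this section; the function and the shape of the returned dictionary are illustrative assumptions rather than Airflow's internal code:

```python
def worker_pod_settings(repository, tag, service_account="default", run_as_user=50000):
    """Combine executor settings into the fields a worker pod spec would carry."""
    return {
        "image": f"{repository}:{tag}",          # repository and tag form the image reference
        "serviceAccountName": service_account,   # identity the worker pod assumes
        "securityContext": {"runAsUser": int(run_as_user)},  # non-root UID
    }


settings = worker_pod_settings("apache/airflow", "1.10.10")
print(settings["image"])
print(settings["securityContext"])
```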

Init Container Functionality

In many Airflow Kubernetes deployments, an initContainer is utilized. The primary responsibility of the init container is to bootstrap the database. This ensures that the database is initialized with the necessary schema before the webserver or scheduler attempts to connect to it.

The init container configuration typically looks like this:

```yaml
initContainers:
- args:
  - initdb
  env:
  - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
    value: postgresql://postgres:password@airflow-db:5432/postgres
  - name: AIRFLOW__CORE__EXECUTOR
    value: KubernetesExecutor
  - name: AIRFLOW__KUBERNETES__NAMESPACE
    value: airflow-k8sexecutor
  - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
    value: default
  - name: AIRFLOW__KUBERNETES__IN_CLUSTER
    value: 'true'
  - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
    value: apache/airflow
  - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
    value: 1.10.10
  - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
    value: 'true'
  image: apache/airflow:1.10.12
```

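By running the initdb command, the container creates the metadata tables. Because Kubernetes runs init containers to completion before starting the main containers, the schema exists by the time the scheduler starts, which prevents it from crashing on a missing database schema.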

Conclusion

The integration of Apache Airflow with Kubernetes transforms the orchestration of data pipelines into a scalable, resilient, and highly flexible operation. The KubernetesPodOperator provides an elegant solution for executing tasks in isolated containers, allowing for diverse technology stacks within a single pipeline. Simultaneously, the KubernetesExecutor offers a structural approach to elasticity, where every task is treated as an ephemeral pod.

The addition of KEDA introduces event-driven automation, allowing the cluster to respond dynamically to the volume of work by querying the PostgreSQL metadata database. Combining container isolation with event-driven scaling allows organizations to improve hardware utilization while maintaining strict environment isolation. The shift toward this architecture is a move toward a more robust and maintainable data infrastructure, in which the operational overhead of managing worker dependencies is replaced by container images.

