Running Apache Flink on Kubernetes: From Zero to a Fully Utilized Cluster

DevOps & Cloud Engineer — building scalable, automated, and intelligent systems. Developer of sorts | Automator | Innovator

This blog walks through Apache Flink end to end: what Flink is, how its architecture works, and how to deploy it on a Kubernetes cluster using Flink’s standalone Kubernetes mode. The goal is not just to get Flink running, but to make sure it runs correctly, efficiently, and in a way that matches how Flink is designed to work.

This guide is based on a real Kubernetes cluster with one control plane and two worker nodes, each with roughly 8 GB RAM.


What Is Apache Flink?

Apache Flink is a distributed stream and batch processing engine designed for stateful, low-latency, high-throughput data processing. Unlike traditional batch systems, Flink treats streaming as the primary model, with batch being a special case of bounded streams.

Key properties of Flink:

• True streaming engine, not micro-batching
• Stateful processing with exactly-once guarantees
• Event-time processing and watermarks
• Horizontal scalability
• Fault tolerance via checkpoints and state backends

Flink is commonly used for real-time analytics, event-driven applications, fraud detection, metrics aggregation, and complex event processing.


Flink Architecture

A Flink cluster is composed of a small number of well-defined components.

JobManager

The JobManager is the brain of the cluster.

It is responsible for:

• Accepting jobs
• Creating execution graphs
• Scheduling tasks
• Coordinating checkpoints
• Handling failures and restarts

Only one active JobManager exists at a time in standalone mode.


TaskManager

TaskManagers are the workers of the Flink cluster.

Each TaskManager:

• Runs tasks (operators)
• Manages task slots
• Executes user code
• Maintains local state

A TaskManager exposes a fixed number of task slots. Slots are the unit of parallelism in Flink.


Slots and Parallelism

A slot represents a share of a TaskManager’s resources.

Total available parallelism is:

TaskManagers × Slots per TaskManager

For example:

• 4 TaskManagers
• 2 slots each
• Total parallelism = 8

Jobs can only run with parallelism up to the available slots.
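
The slot math above is worth scripting when planning capacity. A trivial sketch, using the same numbers as the example:

```shell
# Total parallelism = TaskManagers x slots per TaskManager.
TASKMANAGERS=4
SLOTS_PER_TM=2
TOTAL_SLOTS=$((TASKMANAGERS * SLOTS_PER_TM))
echo "Total available parallelism: ${TOTAL_SLOTS}"   # prints 8 for this example
```

Any job submitted with parallelism above this number will wait for slots that never arrive.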


Why Kubernetes?

Kubernetes provides a natural runtime for Flink because:

• Pods map cleanly to JobManager and TaskManager
• Kubernetes scheduler handles placement
• Native scaling via replicas
• Built-in service discovery
• Persistent volumes for state

Flink supports Kubernetes in multiple ways. In this blog we use Standalone Kubernetes mode, where Flink runs continuously as a cluster inside Kubernetes.


Cluster Prerequisites

The Kubernetes cluster used here:

• 1 control plane node
• 2 worker nodes
• ~8 GB RAM per worker
• containerd runtime
• local-path storage provisioner

Both worker nodes are labeled to allow Flink scheduling:

kubectl label node nyzex-worker-node1 flink-role=worker
kubectl label node nyzex-worker-node2 flink-role=worker
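
A quick check that the labels landed:

```shell
# List only the nodes carrying the Flink scheduling label.
kubectl get nodes -l flink-role=worker
```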

Namespace Setup

Create a dedicated namespace for Flink.

kubectl create namespace flink

This keeps Flink resources isolated and easier to manage.


Persistent Storage

Flink requires persistent storage for:

• Checkpoints
• Savepoints
• High availability metadata (optional)

Using a PersistentVolumeClaim allows Kubernetes to dynamically provision storage.

PVC Definition

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-storage
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 20Gi
  storageClassName: local-path

Apply it:

kubectl apply -f flink-pvc.yaml

With WaitForFirstConsumer, the volume binds only after a pod requests it. This is expected behavior.
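
Because the local-path provisioner uses the WaitForFirstConsumer binding mode, the claim will report Pending until the first pod that mounts it is scheduled. You can watch its status with:

```shell
# The PVC remains Pending until the first consuming pod is scheduled.
kubectl get pvc flink-storage -n flink
```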


Flink Configuration

Flink is configured using flink-conf.yaml, mounted into the pods via a ConfigMap.

Key configuration:

jobmanager.rpc.address: flink-jobmanager

state.backend: filesystem
state.checkpoints.dir: file:///opt/flink/state/checkpoints
state.savepoints.dir: file:///opt/flink/state/savepoints

execution.checkpointing.interval: 10s

parallelism.default: 2

This ensures:

• State is persisted
• Checkpointing is enabled
• Pods run only on worker nodes

Note that the kubernetes.*.node-selector options apply only to Flink’s native Kubernetes mode. In standalone mode, pods are pinned to the worker nodes with a nodeSelector in the Deployment manifests instead.

What I used:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: flink
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    parallelism.default: 8

    # Memory
    jobmanager.memory.process.size: 1024m
    taskmanager.memory.process.size: 3g

    # State backend
    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints

    execution.checkpointing.interval: 60s
    execution.checkpointing.min-pause: 30s
    execution.checkpointing.timeout: 10m
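
Apply the ConfigMap before starting the cluster. The filename below is just an assumption for whatever you saved the manifest as; also note that Flink reads flink-conf.yaml only at startup, so configuration changes require restarting the pods:

```shell
# Apply the ConfigMap ("flink-configmap.yaml" is an assumed filename).
kubectl apply -f flink-configmap.yaml

# Config is read at startup, so roll the pods after any change.
kubectl rollout restart deployment/flink-jobmanager -n flink
kubectl rollout restart deployment/flink-taskmanager -n flink
```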

JobManager Deployment

The JobManager runs as a Deployment with a single replica.

Key points:

• Uses the Flink image
• Exposes RPC and Web UI ports
• Mounts persistent storage
• Uses a node selector

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      nodeSelector:
        flink-role: worker
      containers:
        - name: jobmanager
          image: flink:1.18
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
            - containerPort: 8081
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-storage
              mountPath: /flink-data
          resources:
            requests:
              memory: "1Gi"
              cpu: "1"
            limits:
              memory: "1Gi"
              cpu: "1"
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
        - name: flink-storage
          persistentVolumeClaim:
            claimName: flink-storage

JobManager Service

A Kubernetes Service exposes the JobManager internally.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  ports:
    - name: rpc
      port: 6123
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager

TaskManager Deployment

TaskManagers scale horizontally using replicas.

Important aspects:

• Resource requests and limits
• Slot configuration
• Pod anti-affinity for spreading

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 4
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      nodeSelector:
        flink-role: worker
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    component: taskmanager
                topologyKey: kubernetes.io/hostname
      containers:
        - name: taskmanager
          image: flink:1.18
          args: ["taskmanager"]
          env:
            - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
              value: "2"
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "3Gi"
              cpu: "2"

What I used:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 4
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      nodeSelector:
        flink-role: worker
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    component: taskmanager
                topologyKey: kubernetes.io/hostname
      containers:
        - name: taskmanager
          image: flink:1.18
          args: ["taskmanager"]
          resources:
            requests:
              memory: "3Gi"
              cpu: "2"
            limits:
              memory: "3Gi"
              cpu: "2"
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-tmp
              mountPath: /tmp
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
        - name: flink-tmp
          emptyDir: {}

This configuration allows Kubernetes to spread TaskManagers across both worker nodes.

Some common questions and answers:

Why do TaskManagers get distributed across nodes?

TaskManagers are distributed because:

  • Kubernetes schedules pods

  • You allowed scheduling on both worker nodes

  • Flink TaskManagers are stateless compute workers

  • You added anti-affinity, so Kubernetes spreads them

Flink itself does not decide node placement. Kubernetes does.

Who decides where a TaskManager runs?

When you create this:

kind: Deployment
replicas: 4

You are telling Kubernetes:

“I want 4 identical TaskManager pods.”

Kubernetes then:

  • Looks at available nodes

  • Checks nodeSelector

  • Checks resource requests

  • Applies affinity rules

  • Chooses nodes

Flink only sees:

“I now have 4 TaskManagers connected to me.”


Why they don’t all land on one node anymore:

Once this is added:

podAntiAffinity:
  preferredDuringSchedulingIgnoredDuringExecution:
    - weight: 100
      podAffinityTerm:
        labelSelector:
          matchLabels:
            component: taskmanager
        topologyKey: kubernetes.io/hostname

Result:

  • Pods spread evenly

  • 2 TaskManagers on worker-node1

  • 2 TaskManagers on worker-node2


Why TaskManagers do NOT use the PVC

Key principle

TaskManagers are ephemeral compute.
They should be disposable.

Flink is designed so that:

  • TaskManagers can die at any time

  • State is NOT tied to a specific TaskManager pod

So by default:

  • TaskManagers do NOT mount persistent volumes

  • TaskManagers use local ephemeral storage

  • Persistent state lives elsewhere

This is intentional.


So what is the PVC actually used for?

In this setup, the PVC is mounted only on the JobManager:

volumeMounts:
  - name: flink-storage
    mountPath: /flink-data

And configured in flink-conf.yaml:

state.backend: rocksdb
state.checkpoints.dir: file:///flink-data/checkpoints
state.savepoints.dir: file:///flink-data/savepoints

This means:

  • Checkpoints are written to the PVC

  • Savepoints are written to the PVC

  • Job metadata survives pod restarts

During a checkpoint:

  1. The JobManager triggers the checkpoint and TaskManagers snapshot their local state

  2. Each TaskManager writes its snapshot to the configured checkpoint directory

  3. TaskManagers acknowledge completion, and the JobManager finalizes the checkpoint metadata

If a TaskManager dies:

  • Kubernetes restarts it

  • Flink restores state from the checkpoint directory

  • Processing resumes
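
To confirm checkpoints are actually landing on the PVC, you can list the checkpoint directory from inside the JobManager pod (the path shown matches the mountPath in the JobManager Deployment):

```shell
# Each job gets a subdirectory containing chk-<n> checkpoint folders.
kubectl exec -n flink deploy/flink-jobmanager -- ls /flink-data/checkpoints
```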


Verifying Cluster Distribution

After deployment:

kubectl get pods -n flink -o wide

Expected result:

• JobManager on one worker
• TaskManagers evenly split across nodes
• No Pending pods

This confirms proper scheduling and full cluster utilization.


Accessing the Web UI

Port-forward the JobManager service:

kubectl port-forward svc/flink-jobmanager 8081:8081 -n flink

Open in browser:

http://localhost:8081

You should see:

• All TaskManagers registered
• Total available slots
• Healthy cluster status


Running a Sample Job

Run a built-in Flink example:

kubectl exec -n flink deploy/flink-jobmanager -- \
  flink run /opt/flink/examples/streaming/WordCount.jar

Observe task distribution in the UI.
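
The same can be verified from the CLI:

```shell
# List running and completed jobs via the Flink CLI in the JobManager pod.
kubectl exec -n flink deploy/flink-jobmanager -- flink list -a
```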


Division of Responsibilities

Kubernetes handles:

• Pod scheduling
• Resource isolation
• Restarting failed pods
• Networking

Flink handles:

• Task scheduling
• State management
• Checkpoints
• Fault recovery

This separation keeps responsibilities clean and scalable.


What Makes This Production-Ready

This setup already includes:

• Persistent state
• Checkpointing
• Horizontal scalability
• Proper pod distribution

Next improvements can include:

• High availability JobManager
• External state backend (S3, MinIO)
• Kafka integration
• Prometheus metrics
• Autoscaling


Final Thoughts

Running Apache Flink on Kubernetes is not just about starting pods. Correct scheduling, slot planning, storage configuration, and an understanding of Flink’s execution model are critical. With this setup, your cluster resources are fully utilized, workloads scale correctly, and you are ready to run real streaming jobs with confidence.