Architecting Scalable Data Pipelines with Apache Kafka and the Elastic Stack

The integration of Apache Kafka into the Elastic Stack (ELK) represents a fundamental architectural shift from simple log aggregation to a robust, distributed streaming pipeline. At its core, the Elastic Stack—comprising Elasticsearch, Logstash, and Kibana—is designed for the search, analysis, and visualization of data. However, in high-throughput production environments, the linear flow of data from a source to Elasticsearch often encounters critical bottlenecks. When log bursts occur, the processing latency of Logstash's filters and the indexing overhead of Elasticsearch can lead to data loss or systemic instability.

Introducing Apache Kafka as a buffering layer changes the architecture. Placing Kafka between the data producers (such as Filebeat or syslog) and the data consumers (Logstash or Kafka Connect) gives the pipeline a durable buffer that absorbs spikes in data volume. The ingestion rate is thereby decoupled from the indexing rate, so the Elastic Stack can process data at its own sustainable pace without pushing backpressure onto the upstream producers.
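The buffering effect can be illustrated with a small simulation. This is only a sketch: the arrival numbers and the drain rate are hypothetical, and a real Kafka topic is bounded by its retention settings rather than by an in-memory counter.

```python
def simulate_backlog(arrivals, drain_per_tick):
    """Return the number of buffered records after each tick.

    arrivals: records produced in each tick (hypothetical values).
    drain_per_tick: records the consumer side can index per tick.
    """
    backlog = 0
    history = []
    for produced in arrivals:
        backlog += produced                      # producers append to the buffer
        backlog -= min(backlog, drain_per_tick)  # consumer drains what it can
        history.append(backlog)
    return history


# A burst of 5,000 records per tick hits a consumer that sustains 2,000 per tick.
arrivals = [1000, 1000, 5000, 5000, 1000, 1000, 1000, 1000]
history = simulate_backlog(arrivals, drain_per_tick=2000)
print(history)
```

The backlog grows during the burst and drains afterwards, while the producers are never slowed down. Without the buffer, the same burst would have to be absorbed by Logstash and Elasticsearch directly.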

Overcoming Performance Bottlenecks in the ELK Stack

In a traditional ELK deployment, data flows directly from the collector to Logstash and then to Elasticsearch. Because each stage can only move as fast as the next one accepts data, this tightly coupled chain has two main bottlenecks during high-traffic events.

The first bottleneck occurs within Logstash. Logstash must process logs through complex pipelines and filters, which are computationally expensive. When a burst of logs arrives, Logstash may lack the CPU or memory resources to process the queue in real time, leading to backpressure.

The second bottleneck resides in Elasticsearch. The process of indexing data—which involves analyzing text, updating inverted indices, and committing data to disk—is time-intensive. If the volume of incoming documents exceeds the cluster's indexing capacity, the system may experience increased latency or refuse new writes.

Integrating Kafka smooths out both bottlenecks. Kafka acts as a distributed commit log that persists data across a cluster of brokers, so Logstash can act as a Kafka consumer and pull from a topic at a rate it can sustain. If Logstash falls behind, the data remains stored in Kafka, subject to the topic's retention settings. To increase throughput, administrators can run several Logstash instances in the same consumer group. Kafka then divides the topic's partitions among them, so the useful degree of parallelism is capped by the partition count.
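The load-balancing behavior of a consumer group can be sketched as follows. This is a simplified round-robin model, not Kafka's actual assignor logic, and the instance names are hypothetical. It shows why adding consumers helps only up to the number of partitions.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over consumers round-robin.

    Illustrative only: real assignment is negotiated through
    Kafka's consumer group protocol.
    """
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = list(range(6))  # a topic with six partitions (assumed)

two = assign_partitions(partitions, ["logstash-a", "logstash-b"])
three = assign_partitions(partitions, ["logstash-a", "logstash-b", "logstash-c"])

# With more consumers than partitions, the extra instances receive nothing.
eight = assign_partitions(partitions, [f"logstash-{i}" for i in range(8)])
idle = [consumer for consumer, owned in eight.items() if not owned]

print(two)
print(three)
print(idle)
```

Each partition is owned by exactly one member of the group, so a topic with six partitions keeps at most six Logstash instances busy.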

Implementation Strategies for Kafka to Elasticsearch Integration

There are two primary methods for moving data from Kafka into Elasticsearch: using Logstash as a consumer or utilizing the Kafka Connect framework.

Logstash as a Kafka Consumer

Logstash provides a native Kafka input plugin that allows it to subscribe to one or more Kafka topics. This method is highly flexible, as it allows for complex transformations using the Logstash filter engine before the data reaches Elasticsearch.

In a production configuration, specific parameters must be defined to ensure unique identification and efficient consumption. The client_id must be set to a unique value for each instance to avoid conflicts within the Kafka cluster. The group_id allows multiple Logstash instances to share the load of a single topic.

The following configuration examples demonstrate how to segment data based on the source of the logs, such as array storage, server logs, or network switch data.

Example for array storage logs:

```conf
input {
  kafka {
    # Unique per Logstash instance
    client_id => "logstash69158-array"
    # Shared by all instances that split this workload
    group_id => "logstash-array"
    topics => ["unity", "vnx", "xio", "pssc", "powerstore"]
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}

output {
  elasticsearch {
    hosts => ["http://e2e-l4-0690-152:9200", "http://e2e-l4-0690-153:9200", "http://e2e-l4-0690-154:9200"]
    # One index per day
    index => "edc-storage-%{+YYYY.MM.dd}"
  }
}
```

Example for server logs:

```conf
input {
  kafka {
    client_id => "logstash69158-server"
    group_id => "logstash-server"
    topics => ["sc-sles", "ps-rhel", "vnx-exchange", "vnx-mssql"]
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}

output {
  elasticsearch {
    hosts => ["http://e2e-l4-0690-152:9200", "http://e2e-l4-0690-153:9200", "http://e2e-l4-0690-154:9200"]
    index => "edc-server-%{+YYYY.MM.dd}"
  }
}
```

Example for network switch logs:

```conf
input {
  kafka {
    client_id => "logstash69158-switch"
    group_id => "logstash-switch"
    topics => ["ether-switch"]
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}
```

The Elasticsearch Sink Connector via Kafka Connect

For users seeking a more streamlined integration with minimal effort, the Elasticsearch sink connector provided by Confluent is a good fit. This connector pushes events directly from Kafka into an Elasticsearch index, which removes the need for Logstash when complex filtering is not required.

Deployment is typically handled via Docker Compose using Confluent Docker images. There are two primary image variants: one with connectors preinstalled and one that is "clean." For the clean image, the connector must be installed via the Confluent Hub using the following command:

confluent-hub install confluentinc/kafka-connect-elasticsearch:5.4.0

The sink connector maintains broad compatibility, supporting Elasticsearch versions 2.x, 5.x, 6.x, and 7.x.

Configuration and Tuning of the Elasticsearch Sink Connector

Configuring the Elasticsearch sink connector requires a balance between throughput and stability. The performance of the connector is heavily influenced by the Kafka topic volume, the size of the Elasticsearch cluster, the memory available to Kafka Connect nodes, the size of the documents, and the indexing latency of the target cluster.

Core Tuning Parameters

Three primary parameters govern how the connector batches and flushes data to Elasticsearch:

  • flush.timeout.ms: The timeout, in milliseconds, for periodic flushing and for waiting on buffer space to become available. The default is 10000 (10 seconds).
  • max.buffered.records: The maximum number of records each task buffers in memory before it blocks acceptance of more records. The default is 20000.
  • batch.size: The number of records sent in a single bulk request to Elasticsearch. The default is 2000.

With these default settings, each task must be able to flush a full buffer of 20,000 records, that is, 10 batches of 2,000, within 10 seconds. If the infrastructure cannot sustain this rate, the flush times out and the task fails, or the connector falls behind and a backlog builds up in Kafka. Administrators should tune these values for their own hardware, document sizes, and indexing latency.
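The arithmetic behind that requirement is easy to reproduce. The helper below is a hypothetical sketch (the function and argument names are not part of the connector) that derives the number of batches and the sustained indexing rate implied by a given set of values.

```python
import math


def flush_requirements(max_buffered=20000, batch_size=2000, flush_timeout_ms=10000):
    """Return (batches per full flush, records per second needed).

    Defaults mirror the connector defaults quoted in the text.
    """
    batches = math.ceil(max_buffered / batch_size)
    records_per_second = max_buffered / (flush_timeout_ms / 1000)
    return batches, records_per_second


batches, rate = flush_requirements()
print(batches, rate)

# Halving the buffer halves the rate the cluster must sustain per task.
small_batches, small_rate = flush_requirements(max_buffered=10000)
print(small_batches, small_rate)
```

With the defaults, each task needs Elasticsearch to accept roughly 2,000 records per second. Multiplying that by the number of tasks gives a first estimate of the load on the cluster.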

Handling Schemas and Keys

When streaming JSON documents without a predefined schema or a message key, specific attributes must be set in the connector configuration:

  • schema.ignore: When set to true, the connector ignores any record schema when indexing and lets Elasticsearch infer the mapping from the data, which allows schemaless JSON to be ingested.
  • key.ignore: When set to true, the connector ignores the Kafka message key and builds the document ID from the record's topic, partition, and offset instead.
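The effect of key.ignore on document IDs can be sketched as below. The function is a hypothetical model rather than the connector's code, and the "+" separator reflects the connector's documented topic+partition+offset convention as an assumption of this sketch.

```python
def document_id(topic, partition, offset, key=None, key_ignore=True):
    """Model how a sink could choose the Elasticsearch document ID."""
    if key_ignore:
        # The ID is derived from the record's coordinates in Kafka.
        return f"{topic}+{partition}+{offset}"
    if key is None:
        raise ValueError("key.ignore=false requires a non-null message key")
    return str(key)


first = document_id("simple.elasticsearch.data", 0, 42)
redelivered = document_id("simple.elasticsearch.data", 0, 42)
keyed = document_id("simple.elasticsearch.data", 0, 42, key="host-17", key_ignore=False)

print(first)
print(first == redelivered)
print(keyed)
```

Because the ID depends only on where the record sits in Kafka, a redelivered record overwrites the same document instead of creating a duplicate. With key.ignore set to false, records sharing a key update one document.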

A complete configuration request using a curl command to the Kafka Connect REST API is illustrated below:

```bash
# Create or update the connector through the Kafka Connect REST API
curl -X PUT localhost:8083/connectors/simple-elasticsearch-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "tasks.max": "1",
    "topics": "simple.elasticsearch.data",
    "name": "simple-elasticsearch-connector",
    "type.name": "_doc",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true"
  }'
```
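For teams that manage connectors from scripts, the same request can be prepared with Python's standard library. This is a minimal sketch that assumes the Connect worker is reachable at http://localhost:8083. The helper name build_request is hypothetical, and the request is only constructed here, not sent.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST endpoint
NAME = "simple-elasticsearch-connector"

config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "tasks.max": "1",
    "topics": "simple.elasticsearch.data",
    "name": NAME,
    "type.name": "_doc",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true",
}


def build_request(base_url, name, connector_config):
    """Build the PUT request that creates or updates a connector."""
    body = json.dumps(connector_config).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_request(CONNECT_URL, NAME, config)
print(request.get_method(), request.full_url)
# To submit against a live worker: urllib.request.urlopen(request)
```

Keeping the configuration as a dictionary makes it straightforward to generate several connectors that differ only in topic or index settings.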

Monitoring Kafka Health via the Elastic Stack

To maintain a healthy pipeline, it is critical to monitor the Kafka brokers using the Elastic Stack. The kafka.jvm dataset provides deep insights into the Java Virtual Machine running the broker, which is essential for diagnosing performance degradation.

JVM Metrics and Their Impact

The JVM dataset collects several categories of metrics:

  • Runtime metrics: Including uptime, VM name, version, and vendor.
  • Memory metrics: Heap and non-heap usage and memory pool statistics.
  • Threading metrics: Thread counts and states, as well as deadlocks.
  • Garbage collection (GC) metrics: Counts and durations of GC events.
  • Class loading: Total loaded and unloaded class counts.
  • Buffer pool: Memory usage and capacity.
  • JIT compilation: Time spent in Just-In-Time compilation.

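These metrics matter because long "Stop-the-World" GC pauses can stall a broker long enough for its session with ZooKeeper or the KRaft controller to time out. The broker is then treated as offline, which triggers leader elections and partition instability. Tracking GC durations and heap usage makes it possible to spot this trend before it causes an outage.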

Replica Manager and Partition Health

Beyond the JVM, the replica_manager metrics provide visibility into the data replication process. These metrics are critical for ensuring high availability.

| Metric | Description | Type |
| --- | --- | --- |
| kafka.replicamanager.reassigningpartitions | Number of partitions currently being reassigned | double |
| kafka.replicamanager.replicaalterlogdirsmanager.deadthread_count | Dead threads in the ReplicaAlterLogDirsManager | double |
| kafka.replicamanager.replicaalterlogdirsmanager.failedpartitions_count | Failed partitions in the ReplicaAlterLogDirsManager | double |
| kafka.replicamanager.replicaalterlogdirsmanager.maxlag | Maximum lag for the ReplicaAlterLogDirsManager | double |
| kafka.replicamanager.replicafetchermanager.deadthread_count | Dead threads in the ReplicaFetcherManager | double |
| kafka.replicamanager.replicafetchermanager.failedpartitions_count | Failed partitions in the ReplicaFetcherManager | double |
| kafka.replicamanager.replicafetchermanager.maxlag | Maximum lag for the ReplicaFetcherManager | double |
| kafka.replicamanager.underminisrpartition_count | Partitions under minimum in-sync replica count | double |
| kafka.replicamanager.underreplicated_partitions | Number of under-replicated partitions | double |

The kafka.replicamanager.underreplicated_partitions metric is particularly vital. In a healthy cluster it stays at zero. A sustained value above zero indicates that one or more followers are not keeping up with their leader, which compromises the fault tolerance of the cluster.
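A simple alerting rule for this metric can be sketched as follows. The function name, the sample values, and the threshold of three consecutive samples are assumptions for illustration. In practice such a rule would typically be defined in the monitoring stack itself.

```python
def sustained_under_replication(samples, min_consecutive=3):
    """Return True if the metric stayed above zero for
    min_consecutive samples in a row."""
    streak = 0
    for value in samples:
        streak = streak + 1 if value > 0 else 0
        if streak >= min_consecutive:
            return True
    return False


# Brief blips, for example during a rolling restart, do not trigger.
blips = sustained_under_replication([0, 2, 0, 0, 1, 0])

# A follower that stays behind for three samples does.
stuck = sustained_under_replication([0, 1, 3, 3, 0])

print(blips, stuck)
```

Requiring several consecutive non-zero samples avoids alerting on short-lived spikes that occur during normal broker restarts or partition reassignments.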

Conclusion

The integration of Apache Kafka into the Elastic Stack is not merely an additive feature but a strategic architectural decision to ensure scalability and resilience. By decoupling the ingestion and indexing layers, Kafka transforms the ELK stack from a fragile pipeline into a robust data streaming platform capable of handling unpredictable bursts of telemetry and log data.

Whether employing Logstash for its rich filtering capabilities or the Confluent Elasticsearch Sink Connector for its operational simplicity, the primary goal remains the same: the elimination of bottlenecks. Using Kafka as a buffering layer ensures that data is persisted, and ordered within each partition, for as long as the topic's retention allows. This provides a safety net that allows the Elasticsearch cluster to be scaled or tuned with far less risk of losing data. Furthermore, monitoring via the kafka.jvm and replica_manager datasets allows operators to move from reactive troubleshooting to proactive capacity management and to keep the pipeline performant as load grows.

