Apache Kafka Log Pipeline Integration
Use Apache Kafka as a log transport layer to buffer and route high-volume log streams into LogTide reliably.
When you’re generating millions of log events per second, shipping directly to LogTide may not be enough. Apache Kafka acts as a durable buffer between your applications and LogTide, providing backpressure handling, replay capability, and multi-consumer fanout. This guide shows you how to build a Kafka-backed log pipeline.
Why Kafka for log transport?
- Backpressure handling: Applications never block on log shipping — Kafka absorbs bursts
- Durability: Logs are persisted to disk before acknowledgment — no data loss on restarts
- Replay: Re-process historical logs by resetting consumer offsets
- Fanout: Send the same log stream to LogTide, S3 archival, and real-time alerting
- Decoupling: Applications don’t need to know about LogTide — they publish to Kafka
- Throughput: a Kafka cluster can handle millions of messages per second, scaled out across partitions
Architecture
┌─────────┐     ┌─────────┐     ┌─────────┐
│  App 1  │────▶│         │     │         │
├─────────┤     │  Kafka  │────▶│ LogTide │
│  App 2  │────▶│ Cluster │     │         │
├─────────┤     │         │     └─────────┘
│  App 3  │────▶│         │────▶ S3 Archive
├─────────┤     │         │
│  nginx  │────▶│         │────▶ Alerting
└─────────┘     └─────────┘
Applications produce logs to Kafka topics. A Kafka Connect sink or Fluent Bit consumer ships logs to LogTide. Other consumers can archive to S3 or trigger real-time alerts.
Prerequisites
- Apache Kafka 3.0+ (or Redpanda as a Kafka-compatible alternative)
- Fluent Bit or a custom consumer
- LogTide instance with API key
- Docker or Kubernetes for deployment
Kafka Topic Setup
Create Log Topics
# Create a topic for application logs
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic logs.application \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config retention.bytes=10737418240 \
  --config compression.type=lz4
# 86400000 ms = 24-hour retention; 10737418240 bytes = 10 GB max per partition
# Create a topic for infrastructure logs
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic logs.infrastructure \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config compression.type=lz4
# Create a topic for security events (higher retention)
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic logs.security \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config compression.type=lz4
# 604800000 ms = 7-day retention
Topic Naming Convention
logs.{category}
Examples:
logs.application # App-level logs
logs.infrastructure # nginx, systemd, etc.
logs.security # Auth events, SIEM
logs.{service-name} # Per-service topics (optional)
Producing Logs to Kafka
Node.js Producer
// lib/kafka-logger.ts
import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-service',
  brokers: process.env.KAFKA_BROKERS!.split(','),
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  allowAutoTopicCreation: false,
});

await producer.connect();

interface LogEvent {
  level: string;
  message: string;
  service: string;
  metadata?: Record<string, unknown>;
}

export async function sendLog(event: LogEvent) {
  await producer.send({
    topic: 'logs.application',
    messages: [{
      // Partition by service name for ordering
      key: event.service,
      value: JSON.stringify({
        ...event,
        timestamp: new Date().toISOString(),
        hostname: process.env.HOSTNAME,
      }),
      headers: {
        'content-type': 'application/json',
      },
    }],
  });
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  await producer.disconnect();
});
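Keying messages by service name works because Kafka sends all messages with the same key to the same partition, and each partition is strictly ordered. A simplified Python sketch of that mapping (illustrative only: Kafka's actual default partitioner hashes keys with murmur2, not MD5):

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition (simplified; Kafka uses murmur2)."""
    # A stable hash: the same key always maps to the same partition
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All events for one service land on one partition, so they stay ordered
assert partition_for("checkout-service", 12) == partition_for("checkout-service", 12)
```

The trade-off: a very chatty service keys all of its traffic onto a single partition, so watch for partition skew if one service dominates your log volume.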
Python Producer
# lib/kafka_logger.py
import json
import os
import socket
from datetime import datetime, timezone

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': os.environ['KAFKA_BROKERS'],
    'client.id': 'my-service',
    'compression.type': 'lz4',
    'linger.ms': 50,  # Batch for 50ms
    'batch.num.messages': 1000,
}

producer = Producer(conf)

def send_log(level: str, message: str, service: str, **metadata):
    event = {
        'level': level,
        'message': message,
        'service': service,
        'metadata': metadata,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'hostname': socket.gethostname(),
    }
    producer.produce(
        topic='logs.application',
        key=service.encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
    )
    # Trigger delivery callbacks
    producer.poll(0)

def flush():
    producer.flush(timeout=10)
Consuming Logs into LogTide
Fluent Bit Kafka Consumer
# /etc/fluent-bit/fluent-bit.conf
[SERVICE]
    Flush        5
    Log_Level    info

# Consume from Kafka
[INPUT]
    Name         kafka
    Tag          kafka.logs
    Brokers      kafka-1:9092,kafka-2:9092,kafka-3:9092
    Topics       logs.application,logs.infrastructure,logs.security
    Group_Id     logtide-consumer
    Format       json
    Poll_Ms      100

# Output to LogTide
[OUTPUT]
    Name         http
    Match        *
    Host         api.logtide.dev
    Port         443
    URI          /api/v1/ingest/single
    Format       json
    Header       X-API-Key ${LOGTIDE_API_KEY}
    Header       Content-Type application/json
    tls          On
    tls.verify   On
    Retry_Limit  5
Custom Consumer (Node.js)
For more control, use a custom consumer:
// consumer.ts
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'logtide-consumer',
  brokers: process.env.KAFKA_BROKERS!.split(','),
});

const consumer = kafka.consumer({
  groupId: 'logtide-ingest',
  maxWaitTimeInMs: 100,
  maxBytesPerPartition: 1048576, // 1MB per partition
});

const LOGTIDE_API_URL = process.env.LOGTIDE_API_URL!;
const LOGTIDE_API_KEY = process.env.LOGTIDE_API_KEY!;

const buffer: any[] = [];
const BATCH_SIZE = 100;
const FLUSH_INTERVAL = 5000;

async function flushToLogTide() {
  if (buffer.length === 0) return;
  const batch = buffer.splice(0, BATCH_SIZE);
  try {
    const response = await fetch(`${LOGTIDE_API_URL}/api/v1/ingest/batch`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': LOGTIDE_API_KEY,
      },
      body: JSON.stringify({ events: batch }),
    });
    if (!response.ok) {
      console.error(`LogTide ingest failed: ${response.status}`);
      // Put back in buffer for retry
      buffer.unshift(...batch);
    }
  } catch (error) {
    // Network failures would otherwise reject unhandled inside setInterval
    console.error('LogTide ingest error:', error);
    buffer.unshift(...batch);
  }
}

setInterval(flushToLogTide, FLUSH_INTERVAL);

await consumer.connect();
await consumer.subscribe({
  topics: ['logs.application', 'logs.infrastructure', 'logs.security'],
});

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    try {
      const event = JSON.parse(message.value!.toString());
      event._topic = topic;
      buffer.push(event);
      if (buffer.length >= BATCH_SIZE) {
        await flushToLogTide();
      }
    } catch (error) {
      console.error('Failed to parse message:', error);
    }
  },
});
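The consumer above buffers events and flushes when either the batch fills or the interval elapses, requeueing the batch on failure. That pattern can be sketched transport-agnostically in stdlib Python; here `ship` is a stand-in for the HTTP call to LogTide and returns True on successful delivery (an assumption of this sketch, not a LogTide API):

```python
import time
from typing import Callable

class LogBatcher:
    """Buffer events; flush when the batch fills or the interval elapses."""

    def __init__(self, ship: Callable[[list], bool],
                 batch_size: int = 100, flush_interval: float = 5.0):
        self.ship = ship  # delivery function; returns True on success
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer: list = []
        self._last_flush = time.monotonic()

    def add(self, event: dict) -> None:
        self.buffer.append(event)
        if (len(self.buffer) >= self.batch_size
                or time.monotonic() - self._last_flush >= self.flush_interval):
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        batch = self.buffer[:self.batch_size]
        self.buffer = self.buffer[self.batch_size:]
        if not self.ship(batch):
            # Requeue on failure so the next flush retries the same events
            self.buffer = batch + self.buffer
        self._last_flush = time.monotonic()
```

Note that requeueing an unbounded buffer can grow memory without limit during a long LogTide outage; in production you would cap the buffer or lean on Kafka itself (stop committing offsets) for durable backpressure.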
Docker Setup
# docker-compose.yml
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_RETENTION_HOURS: 24
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      # KRaft requires a base64-encoded UUID; generate one with: kafka-storage random-uuid
      CLUSTER_ID: kafka-logtide-cluster-001
    volumes:
      - kafka_data:/var/lib/kafka/data

  kafka-consumer:
    image: fluent/fluent-bit:latest
    volumes:
      - ./fluent-bit-kafka.conf:/fluent-bit/etc/fluent-bit.conf:ro
    environment:
      - LOGTIDE_API_KEY=${LOGTIDE_API_KEY}
    depends_on:
      - kafka

volumes:
  kafka_data:
Kubernetes Deployment
# kafka-consumer-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logtide-kafka-consumer
spec:
  replicas: 3  # Scale up to at most one consumer per partition
  selector:
    matchLabels:
      app: logtide-kafka-consumer
  template:
    metadata:
      labels:
        app: logtide-kafka-consumer
    spec:
      containers:
        - name: consumer
          image: fluent/fluent-bit:latest
          volumeMounts:
            - name: config
              mountPath: /fluent-bit/etc/
          env:
            - name: LOGTIDE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: logtide-credentials
                  key: api-key
            - name: KAFKA_BROKERS
              value: "kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
      volumes:
        - name: config
          configMap:
            name: kafka-consumer-config
Monitoring the Pipeline
Consumer Lag
Track consumer lag to ensure LogTide keeps up:
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group logtide-consumer \
  --describe
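The describe output prints one row per partition with a LAG column. A small, hypothetical helper to sum total lag from that output, for feeding into a metrics system (column positions are read from the header row of the standard output format):

```python
def total_lag(describe_output: str) -> int:
    """Sum the LAG column from `kafka-consumer-groups.sh --describe` output."""
    lines = [l for l in describe_output.strip().splitlines() if l.strip()]
    lag_idx = lines[0].split().index("LAG")  # locate LAG in the header row
    total = 0
    for row in lines[1:]:
        cols = row.split()
        # '-' appears when offsets are unknown; skip such rows
        if len(cols) > lag_idx and cols[lag_idx].isdigit():
            total += int(cols[lag_idx])
    return total

sample = """GROUP            TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
logtide-consumer logs.application  0          1500            1620            120
logtide-consumer logs.application  1          1480            1480            0"""
print(total_lag(sample))  # → 120
```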
Pipeline Health Metrics
| Metric | Healthy | Warning | Critical |
|---|---|---|---|
| Consumer lag | <1000 | 1000-10000 | >10000 |
| Throughput | >1000 msgs/s | - | <100 msgs/s |
| Partition balance | Even | 2x skew | 5x skew |
| Error rate | <0.01% | 0.01-1% | >1% |
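The lag thresholds from the table translate directly into an alerting classifier; a sketch (thresholds copied from the table above, not a built-in LogTide feature):

```python
def lag_status(lag: int) -> str:
    """Classify consumer lag per the pipeline health table."""
    if lag < 1000:
        return "healthy"
    if lag <= 10000:
        return "warning"
    return "critical"

print(lag_status(500), lag_status(5000), lag_status(50000))
# → healthy warning critical
```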
Performance Tuning
Producer Tuning
linger.ms=50 # Batch window (50ms)
batch.size=65536 # Max batch size (64KB)
compression.type=lz4 # Compress batches
acks=1 # Leader acknowledgment (balance speed/durability)
Consumer Tuning
fetch.min.bytes=1048576 # Wait for 1MB before fetching
fetch.max.wait.ms=500 # Max wait time
max.poll.records=500 # Process 500 records per poll
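fetch.min.bytes and fetch.max.wait.ms trade latency for batching efficiency: the broker answers a fetch as soon as either threshold is met. A back-of-envelope helper for estimating which threshold wins at a given throughput (the average message size is an assumption you should measure):

```python
def fetch_wait_ms(msgs_per_sec: float, avg_msg_bytes: int,
                  fetch_min_bytes: int = 1048576,
                  fetch_max_wait_ms: int = 500) -> float:
    """Estimate how long the broker holds a fetch before responding."""
    bytes_per_ms = msgs_per_sec * avg_msg_bytes / 1000
    time_to_fill = fetch_min_bytes / bytes_per_ms  # ms to accumulate fetch.min.bytes
    return min(time_to_fill, fetch_max_wait_ms)

# At 10,000 msgs/s of ~500-byte events, 1 MB accumulates in ~210 ms,
# so fetch.min.bytes (not the 500 ms cap) governs the wait:
print(round(fetch_wait_ms(10_000, 500)))  # → 210
```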
Partition Strategy
- 12 partitions for high-throughput topics (allows 12 parallel consumers)
- 6 partitions for moderate topics
- 6 partitions for lower-volume topics such as security events (matching the logs.security topic created above)
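A common sizing heuristic behind counts like these: divide target throughput by what a single consumer can sustain, and round up. A sketch (the per-consumer figure is an assumption; benchmark your own consumers):

```python
import math

def partitions_needed(target_msgs_per_sec: float,
                      per_consumer_msgs_per_sec: float) -> int:
    """Minimum partition count so consumers can keep up with the topic."""
    return math.ceil(target_msgs_per_sec / per_consumer_msgs_per_sec)

# e.g. 100k msgs/s with consumers that each handle ~10k msgs/s:
print(partitions_needed(100_000, 10_000))  # → 10
```

Remember that partitions can be added later but never removed, and the consumer-group parallelism ceiling equals the partition count, so leave some headroom.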
Troubleshooting
Consumer lag increasing
- Check consumer health: `kafka-consumer-groups.sh --describe`
- Add more consumer instances (up to the partition count)
- Check if LogTide is accepting logs (connection issues?)
- Increase batch size for more efficient shipping
Messages not being consumed
- Verify the topic exists: `kafka-topics.sh --list`
- Check the consumer group: `kafka-consumer-groups.sh --describe`
- Verify the Fluent Bit Kafka plugin configuration
Data loss during restarts
Ensure acks=all for critical topics:
acks=all
min.insync.replicas=2
With replication-factor 3 and min.insync.replicas=2, every acknowledged write exists on at least two brokers, so logs survive a single broker failure.
Next Steps
- Docker Integration - Container log routing to Kafka
- Kubernetes Integration - Kafka on K8s
- Real-Time Alerting - Alert on Kafka streams
- Node.js Integration - Application log producers