Apache Kafka Log Pipeline Integration
Use Apache Kafka as a log transport layer to buffer and route high-volume log streams into LogTide reliably.
When you’re generating millions of log events per second, shipping directly to LogTide may not be enough. Apache Kafka acts as a durable buffer between your applications and LogTide, providing backpressure handling, replay capability, and multi-consumer fanout. This guide shows you how to build a Kafka-backed log pipeline.
Why Kafka for log transport?
- Backpressure handling: Applications never block on log shipping — Kafka absorbs bursts
- Durability: Logs are replicated across brokers and persisted to disk, so acknowledged logs survive broker restarts (given acks=all and a sufficient min.insync.replicas)
- Replay: Re-process historical logs by resetting consumer offsets
- Fanout: Send the same log stream to LogTide, S3 archival, and real-time alerting
- Decoupling: Applications don’t need to know about LogTide — they publish to Kafka
- Throughput: A Kafka cluster can handle millions of messages per second, scaling horizontally with partition and broker count
Architecture
┌─────────┐     ┌─────────┐     ┌─────────┐
│  App 1  │────▶│         │     │         │
├─────────┤     │  Kafka  │────▶│ LogTide │
│  App 2  │────▶│ Cluster │     │         │
├─────────┤     │         │     └─────────┘
│  App 3  │────▶│         │────▶ S3 Archive
├─────────┤     │         │
│  nginx  │────▶│         │────▶ Alerting
└─────────┘     └─────────┘
Applications produce logs to Kafka topics. A Kafka Connect sink or Fluent Bit consumer ships logs to LogTide. Other consumers can archive to S3 or trigger real-time alerts.
Prerequisites
- Apache Kafka 3.0+ (or Redpanda as a Kafka-compatible alternative)
- Fluent Bit or a custom consumer
- LogTide instance with API key
- Docker or Kubernetes for deployment
Kafka Topic Setup
Create Log Topics
# Create a topic for application logs
#   retention.ms=86400000       -> 24 hour retention
#   retention.bytes=10737418240 -> 10 GiB max per partition
# (Comments cannot follow a trailing backslash, so they are listed here.)
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic logs.application \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config retention.bytes=10737418240 \
  --config compression.type=lz4
# Create a topic for infrastructure logs
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic logs.infrastructure \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config compression.type=lz4
# Create a topic for security events (higher retention)
#   retention.ms=604800000 -> 7 day retention
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic logs.security \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config compression.type=lz4
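To sanity-check the retention settings above, the worst-case disk footprint of a topic can be estimated from its partition count, retention.bytes, and replication factor. The sketch below uses hypothetical helper names and assumes size-based retention is the binding limit; it ignores index files and the active segment, so treat the result as a rough upper bound rather than an exact figure.

```python
# Rough capacity estimate for the logs.application topic defined above.
# Helper names are illustrative; the numbers come from the kafka-topics.sh command.

GIB = 1024 ** 3


def retention_hours(retention_ms: int) -> float:
    """Convert a retention.ms value to hours."""
    return retention_ms / (1000 * 60 * 60)


def max_topic_bytes(partitions: int, retention_bytes: int, replication_factor: int) -> int:
    """Upper bound on cluster-wide disk use for one topic.

    retention.bytes applies per partition, and every partition is stored
    once per replica.
    """
    return partitions * retention_bytes * replication_factor


hours = retention_hours(86_400_000)
total = max_topic_bytes(partitions=12, retention_bytes=10_737_418_240, replication_factor=3)
print(f"retention: {hours:.0f} h, worst-case footprint: {total / GIB:.0f} GiB")
# prints: retention: 24 h, worst-case footprint: 360 GiB
```

Spread over three brokers, that is roughly 120 GiB per broker for this topic alone, which is worth budgeting before raising retention.bytes.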
Topic Naming Convention
logs.{category}
Examples:
logs.application # App-level logs
logs.infrastructure # nginx, systemd, etc.
logs.security # Auth events, SIEM
logs.{service-name} # Per-service topics (optional)
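A small helper can keep producers on the logs.{category} convention. The following is a minimal sketch: the function name is hypothetical, and the only rule taken from Kafka itself is that topic names are limited to 249 characters drawn from ASCII letters, digits, '.', '_' and '-'.

```python
import re

# Kafka topic names: ASCII letters, digits, '.', '_' and '-', at most 249 characters.
_LEGAL_TOPIC = re.compile(r"[A-Za-z0-9._-]{1,249}")


def log_topic(category: str) -> str:
    """Build a topic name following the logs.{category} convention."""
    name = f"logs.{category.strip().lower()}"
    # Reject illegal characters and an empty category ("logs.")
    if not _LEGAL_TOPIC.fullmatch(name) or name.endswith("."):
        raise ValueError(f"illegal topic name: {name!r}")
    return name


print(log_topic("application"))   # logs.application
print(log_topic("checkout-api"))  # logs.checkout-api
```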
Producing Logs to Kafka
Node.js Producer
// lib/kafka-logger.ts
import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-service',
  brokers: process.env.KAFKA_BROKERS!.split(','),
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  allowAutoTopicCreation: false,
});

await producer.connect();

interface LogEvent {
  level: string;
  message: string;
  service: string;
  metadata?: Record<string, unknown>;
}

export async function sendLog(event: LogEvent) {
  await producer.send({
    topic: 'logs.application',
    messages: [{
      // Partition by service name for ordering
      key: event.service,
      value: JSON.stringify({
        ...event,
        timestamp: new Date().toISOString(),
        hostname: process.env.HOSTNAME,
      }),
      headers: {
        'content-type': 'application/json',
      },
    }],
  });
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  await producer.disconnect();
});
Python Producer
# lib/kafka_logger.py
import json
import os
import socket
from datetime import datetime, timezone

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': os.environ['KAFKA_BROKERS'],
    'client.id': 'my-service',
    'compression.type': 'lz4',
    'linger.ms': 50,  # Batch for 50ms
    'batch.num.messages': 1000,
}

producer = Producer(conf)


def send_log(level: str, message: str, service: str, **metadata):
    event = {
        'level': level,
        'message': message,
        'service': service,
        'metadata': metadata,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'hostname': socket.gethostname(),
    }
    producer.produce(
        topic='logs.application',
        key=service.encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
    )
    # Trigger delivery callbacks
    producer.poll(0)


def flush():
    producer.flush(timeout=10)
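The producer above calls poll(0) to serve delivery callbacks, but it does not register one, so failed deliveries would pass unnoticed. A minimal sketch of a delivery report callback follows. The names on_delivery and delivery_stats are illustrative, and the block deliberately avoids importing confluent_kafka so it can be read in isolation; the wiring into produce() is shown as a comment.

```python
import logging

logger = logging.getLogger("kafka_logger")

# Simple counters; a real service might export these as metrics instead.
delivery_stats = {"delivered": 0, "failed": 0}


def on_delivery(err, msg) -> None:
    """Delivery report callback.

    confluent-kafka invokes this from poll() or flush() with err set to None
    on success, or to an error object when delivery failed.
    """
    if err is not None:
        delivery_stats["failed"] += 1
        logger.warning("log delivery failed for topic %s: %s", msg.topic(), err)
    else:
        delivery_stats["delivered"] += 1


# Wiring it into send_log (not executed here):
#   producer.produce(topic='logs.application', key=..., value=...,
#                    on_delivery=on_delivery)
```

Because callbacks only run inside poll() or flush(), calling flush() before shutdown is what surfaces failures for the last few messages.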
Consuming Logs into LogTide
Fluent Bit Kafka Consumer
# /etc/fluent-bit/fluent-bit.conf
[SERVICE]
    Flush        5
    Log_Level    info

# Consume from Kafka
[INPUT]
    Name         kafka
    Tag          kafka.logs
    Brokers      kafka-1:9092,kafka-2:9092,kafka-3:9092
    Topics       logs.application,logs.infrastructure,logs.security
    Group_Id     logtide-consumer
    Format       json
    Poll_Ms      100

# Output to LogTide
[OUTPUT]
    Name         http
    Match        *
    Host         api.logtide.dev
    Port         443
    URI          /api/v1/ingest/single
    Format       json
    Header       X-API-Key ${LOGTIDE_API_KEY}
    Header       Content-Type application/json
    tls          On
    tls.verify   On
    Retry_Limit  5
Custom Consumer (Node.js)
For more control, use a custom consumer:
// consumer.ts
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'logtide-consumer',
  brokers: process.env.KAFKA_BROKERS!.split(','),
});

const consumer = kafka.consumer({
  groupId: 'logtide-ingest',
  maxWaitTimeInMs: 100,
  maxBytesPerPartition: 1048576, // 1MB per partition
});

const LOGTIDE_API_URL = process.env.LOGTIDE_API_URL!;
const LOGTIDE_API_KEY = process.env.LOGTIDE_API_KEY!;

// In-memory buffer. kafkajs auto-commits offsets independently of this buffer,
// so events still held here can be lost if the process crashes.
const buffer: Record<string, unknown>[] = [];
const BATCH_SIZE = 100;
const FLUSH_INTERVAL = 5000;

async function flushToLogTide() {
  if (buffer.length === 0) return;
  const batch = buffer.splice(0, BATCH_SIZE);
  try {
    const response = await fetch(`${LOGTIDE_API_URL}/api/v1/ingest/batch`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': LOGTIDE_API_KEY,
      },
      body: JSON.stringify({ events: batch }),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
  } catch (error) {
    // Covers both network errors and non-2xx responses
    console.error('LogTide ingest failed:', error);
    // Put the batch back at the front of the buffer for retry
    buffer.unshift(...batch);
  }
}

// Flush on a timer so partial batches are not held indefinitely
setInterval(flushToLogTide, FLUSH_INTERVAL);

await consumer.connect();
await consumer.subscribe({
  topics: ['logs.application', 'logs.infrastructure', 'logs.security'],
});

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    try {
      const event = JSON.parse(message.value!.toString());
      event._topic = topic;
      buffer.push(event);
      if (buffer.length >= BATCH_SIZE) {
        await flushToLogTide();
      }
    } catch (error) {
      console.error('Failed to parse message:', error);
    }
  },
});
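The same size-or-interval batching can be expressed in Python. The sketch below covers only the buffering logic: the transport is injected as a send callable standing in for the POST to /api/v1/ingest/batch, and the class name and the max_buffered cap are assumptions added for illustration, not part of the Node.js example.

```python
import time
from typing import Callable, List


class BatchBuffer:
    """Collects events and flushes them by size or age (hypothetical helper)."""

    def __init__(self, send: Callable[[List[dict]], bool], batch_size: int = 100,
                 flush_interval: float = 5.0, max_buffered: int = 10_000):
        self.send = send                  # returns True when the batch was accepted
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.max_buffered = max_buffered  # assumption: cap memory during outages
        self.events: List[dict] = []
        self.last_flush = time.monotonic()

    def add(self, event: dict) -> None:
        self.events.append(event)
        if len(self.events) > self.max_buffered:
            # Drop the oldest events rather than grow without bound
            del self.events[: len(self.events) - self.max_buffered]
        due = time.monotonic() - self.last_flush >= self.flush_interval
        if len(self.events) >= self.batch_size or due:
            self.flush()

    def flush(self) -> bool:
        """Send one batch; keep it buffered if the send fails."""
        self.last_flush = time.monotonic()
        if not self.events:
            return True
        batch = self.events[: self.batch_size]
        if not self.send(batch):
            return False                  # batch stays at the front for retry
        del self.events[: len(batch)]
        return True
```

A consumer loop would call add() for each decoded message and flush() once on shutdown. Capping the buffer trades completeness for bounded memory during a LogTide outage; if that trade is unacceptable, pausing consumption is the alternative.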
Docker Setup
# docker-compose.yml
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_RETENTION_HOURS: 24
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      # Quote booleans so YAML does not convert them
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      # Single broker: the consumer offsets topic cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Must be a base64-encoded UUID; generate your own with `kafka-storage.sh random-uuid`
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    volumes:
      - kafka_data:/var/lib/kafka/data

  kafka-consumer:
    image: fluent/fluent-bit:latest
    volumes:
      - ./fluent-bit-kafka.conf:/fluent-bit/etc/fluent-bit.conf:ro
    environment:
      - LOGTIDE_API_KEY=${LOGTIDE_API_KEY}
    depends_on:
      - kafka

volumes:
  kafka_data:
Kubernetes Deployment
# kafka-consumer-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logtide-kafka-consumer
spec:
  replicas: 3  # One per Kafka partition group
  selector:
    matchLabels:
      app: logtide-kafka-consumer
  template:
    metadata:
      labels:
        app: logtide-kafka-consumer
    spec:
      containers:
        - name: consumer
          image: fluent/fluent-bit:latest
          volumeMounts:
            - name: config
              mountPath: /fluent-bit/etc/
          env:
            - name: LOGTIDE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: logtide-credentials
                  key: api-key
            - name: KAFKA_BROKERS
              value: "kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
      volumes:
        - name: config
          configMap:
            name: kafka-consumer-config
Monitoring the Pipeline
Consumer Lag
Track consumer lag to ensure LogTide keeps up:
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group logtide-consumer \
--describe
Pipeline Health Metrics
| Metric | Healthy | Warning | Critical |
|---|---|---|---|
| Consumer lag | <1000 | 1000-10000 | >10000 |
| Throughput | >1000 msgs/s | - | <100 msgs/s |
| Partition balance | Even | 2x skew | 5x skew |
| Error rate | <0.01% | 0.01-1% | >1% |
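The consumer lag row can be turned into an automated check. The sketch below assumes the thresholds apply to total lag summed across partitions (the table does not say whether they are per partition or total), and the offsets are made-up example values rather than real measurements. Lag per partition is the log end offset minus the group's committed offset, which is what kafka-consumer-groups.sh reports in its LAG column.

```python
from typing import Dict


def total_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> int:
    """Sum per-partition lag: log end offset minus the group's committed offset.

    A partition with no committed offset is treated as fully unread (assumption).
    """
    return sum(max(end - committed.get(p, 0), 0) for p, end in end_offsets.items())


def lag_status(lag: int) -> str:
    """Map lag to the Healthy / Warning / Critical bands from the table."""
    if lag < 1000:
        return "healthy"
    if lag <= 10000:
        return "warning"
    return "critical"


end_offsets = {0: 5200, 1: 4100, 2: 6000}  # example values only
committed = {0: 5000, 1: 4100, 2: 3500}
lag = total_lag(end_offsets, committed)
print(lag, lag_status(lag))
# prints: 2700 warning
```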
Performance Tuning
Producer Tuning
linger.ms=50 # Batch window (50ms)
batch.size=65536 # Max batch size (64KB)
compression.type=lz4 # Compress batches
acks=1 # Leader acknowledgment (balance speed/durability)
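Whether linger.ms or batch.size ends a batch depends on the per-partition message rate and message size. The following back-of-the-envelope sketch uses a hypothetical function name and assumes a steady rate and uncompressed sizes, so it indicates which setting is worth tuning rather than predicting exact client behavior.

```python
def batch_trigger(msgs_per_sec: float, avg_msg_bytes: int,
                  linger_ms: int = 50, batch_size: int = 65536) -> str:
    """Report which limit ends a per-partition batch first.

    Returns "batch.size" if the batch fills before the linger window
    expires, otherwise "linger.ms".
    """
    bytes_in_window = msgs_per_sec * (linger_ms / 1000) * avg_msg_bytes
    return "batch.size" if bytes_in_window >= batch_size else "linger.ms"


# 2,000 msgs/s of 500 bytes -> 50,000 bytes per 50 ms window (below 64 KB)
print(batch_trigger(2000, 500))  # linger.ms
# 5,000 msgs/s of 500 bytes -> 125,000 bytes per window (batch fills first)
print(batch_trigger(5000, 500))  # batch.size
```

When linger.ms is the trigger, raising batch.size has no effect; when batch.size is the trigger, a longer linger window adds no extra batching.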
Consumer Tuning
fetch.min.bytes=1048576 # Wait for 1MB before fetching
fetch.max.wait.ms=500 # Max wait time
max.poll.records=500 # Process 500 records per poll
Partition Strategy
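- 12 partitions for high-throughput topics (allows up to 12 parallel consumers in one consumer group)
- 6 partitions for moderate topics
- 3 partitions for low-volume topics (the logs.security example above uses 6; reduce it if your security event volume is low)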
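Partition count caps consumer parallelism because each partition is read by at most one consumer in a group. The sketch below models this with a simple round-robin assignment; real Kafka assignors (range, cooperative sticky, and so on) distribute partitions differently, but the upper bound is the same. The function name is illustrative.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Round-robin partitions across the consumers of one group (simplified model)."""
    if not consumers:
        return {}
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


three = assign_partitions(12, [f"consumer-{i}" for i in range(3)])
print({c: len(ps) for c, ps in three.items()})
# {'consumer-0': 4, 'consumer-1': 4, 'consumer-2': 4}

fourteen = assign_partitions(12, [f"consumer-{i}" for i in range(14)])
print([c for c, ps in fourteen.items() if not ps])
# ['consumer-12', 'consumer-13']
```

With 12 partitions, the thirteenth and fourteenth consumers sit idle, which is why scaling consumers past the partition count does not reduce lag.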
Troubleshooting
Consumer lag increasing
- Check consumer health: kafka-consumer-groups.sh --describe
- Add more consumer instances (up to partition count)
- Check if LogTide is accepting logs (connection issues?)
- Increase batch size for more efficient shipping
Messages not being consumed
- Verify topic exists: kafka-topics.sh --list
- Check consumer group: kafka-consumer-groups.sh --describe
- Verify Fluent Bit Kafka plugin configuration
Data loss during restarts
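For critical topics, set acks=all on the producer and min.insync.replicas=2 on the topic (or as a broker default):
acks=all
min.insync.replicas=2
With a replication factor of 3, every acknowledged log is stored on at least two brokers, so it survives the loss of any single broker. Note that the producer tuning example uses acks=1, which trades this protection for lower latency.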
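The trade-off between replication factor and min.insync.replicas can be made concrete with a little arithmetic. The sketch below uses a hypothetical helper and assumes acks=all on the producer; it models the worst case in which every failed broker hosts a replica of the partition in question.

```python
def durability_summary(replication_factor: int, min_insync_replicas: int) -> dict:
    """Summarize fault tolerance for a topic written with acks=all."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("need 1 <= min.insync.replicas <= replication factor")
    return {
        # Replicas that can be offline while the partition still accepts writes
        "writes_tolerate_replicas_down": replication_factor - min_insync_replicas,
        # Replicas that can be lost without losing an acknowledged message
        "acked_data_tolerates_replicas_lost": min_insync_replicas - 1,
    }


print(durability_summary(3, 2))
# {'writes_tolerate_replicas_down': 1, 'acked_data_tolerates_replicas_lost': 1}
print(durability_summary(3, 1))
# {'writes_tolerate_replicas_down': 2, 'acked_data_tolerates_replicas_lost': 0}
```

The second case shows why min.insync.replicas=1 weakens acks=all: an acknowledged write may exist only on the leader.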
Next Steps
- Docker Integration - Container log routing to Kafka
- Kubernetes Integration - Kafka on K8s
- Real-Time Alerting - Alert on Kafka streams
- Node.js Integration - Application log producers
Frequently Asked Questions
Why should I use Kafka as a log transport layer in front of LogTide?
Kafka acts as a durable buffer between your applications and LogTide, absorbing traffic bursts without blocking application code. It provides at-least-once delivery with disk persistence, the ability to replay logs by resetting consumer offsets, and fanout so the same stream can feed LogTide, S3 archival, and real-time alerting simultaneously.
How do I ship logs from Kafka into LogTide?
The recommended approach is to run Fluent Bit as a Kafka consumer using its built-in kafka input plugin, then configure an http output that posts to your LogTide instance at /api/v1/ingest/single with your API key in the X-API-Key header. Alternatively you can write a custom Node.js or Python consumer that batches records and posts them to /api/v1/ingest/batch.
Does LogTide support structured JSON logs coming through Kafka?
Yes. Both the Fluent Bit consumer and the example Node.js and Python producers serialize log events as JSON. As long as each message includes the expected fields (level, message, service, timestamp), LogTide ingests and indexes them as fully structured log events.
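A consumer or test suite can check for those expected fields before shipping. This is a minimal sketch with an illustrative function name; it only checks that the four fields named above are present, not that their values are valid, and LogTide's own validation rules are not shown in this guide.

```python
import json
from typing import List

REQUIRED_FIELDS = ("level", "message", "service", "timestamp")


def missing_fields(raw: bytes) -> List[str]:
    """Return the required fields absent from a JSON-encoded log event."""
    event = json.loads(raw)
    if not isinstance(event, dict):
        # Not a JSON object, so every field counts as missing
        return list(REQUIRED_FIELDS)
    return [field for field in REQUIRED_FIELDS if field not in event]


print(missing_fields(b'{"level": "info", "message": "started"}'))
# ['service', 'timestamp']
```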
Can I use Redpanda instead of Apache Kafka with this setup?
Yes. This guide lists Redpanda as a Kafka-compatible alternative in its prerequisites. Because Redpanda implements the Kafka protocol, the same Fluent Bit configuration, producer code, and consumer setup should work without changes.