Official Python SDK for LogTide — self-hosted log management with sync & async clients,
stdlib logging integration, automatic batching,
retry logic, circuit breaker, payload limits, and Flask/Django/FastAPI/Starlette middleware.
Installation
Core (sync client only):
pip install logtide-sdk
With optional extras:
# Async client (requires aiohttp)
pip install logtide-sdk[async]
# Flask middleware
pip install logtide-sdk[flask]
# Django middleware
pip install logtide-sdk[django]
# FastAPI middleware
pip install logtide-sdk[fastapi]
# Starlette middleware (standalone)
pip install logtide-sdk[starlette]
# All extras
pip install logtide-sdk[async,flask,django,fastapi,starlette]
Quick Start
from logtide_sdk import LogTideClient, ClientOptions
client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )
)
client.info('api-gateway', 'Server started', {'port': 3000})
client.error('database', 'Connection failed', Exception('Timeout'))
# Graceful shutdown (also registered automatically via atexit)
client.close()
Features
- ✅ Sync client (LogTideClient) and async client (AsyncLogTideClient via aiohttp)
- ✅ stdlib logging integration via LogTideHandler
- ✅ Automatic batching with configurable size and flush interval
- ✅ Retry logic with exponential backoff
- ✅ Circuit breaker pattern (CLOSED / OPEN / HALF_OPEN)
- ✅ Payload limits — field truncation, base64 removal, field exclusion, max entry size
- ✅ Max buffer size with silent drop policy to prevent memory leaks
- ✅ Query API for searching and filtering logs
- ✅ Live tail with Server-Sent Events (SSE)
- ✅ Trace ID context for distributed tracing
- ✅ Global metadata added to all logs
- ✅ Structured exception serialization with parsed stack frames
- ✅ Internal metrics (logs sent, dropped, errors, latency, circuit breaker trips)
- ✅ Flask, Django, FastAPI & Starlette middleware for auto-logging HTTP requests
- ✅ Full Python 3.8+ support with type hints
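To make the retry behavior concrete, here is a standalone sketch (illustrative only, not SDK code) of the schedule produced when the initial delay doubles on each attempt, using the documented defaults:

```python
# Illustrative sketch only: the delay before each retry attempt when the
# initial delay doubles per attempt, as the retry options describe.
def backoff_schedule(max_retries=3, retry_delay_ms=1000):
    return [retry_delay_ms * (2 ** attempt) for attempt in range(max_retries)]

print(backoff_schedule())  # [1000, 2000, 4000]
```

With the defaults, the client waits 1 s, then 2 s, then 4 s before giving up and counting a failure toward the circuit breaker threshold.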
Configuration
All options are passed via ClientOptions:
from logtide_sdk import LogTideClient, ClientOptions, PayloadLimitsOptions
client = LogTideClient(
    ClientOptions(
        # Required
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
        # Performance
        batch_size=100,                  # Logs per batch before immediate flush (default: 100)
        flush_interval=5000,             # Auto-flush interval in ms (default: 5000)
        max_buffer_size=10000,           # Max buffered logs; excess are silently dropped (default: 10000)
        # Reliability
        max_retries=3,                   # Max retry attempts on send failure (default: 3)
        retry_delay_ms=1000,             # Initial retry delay, doubles each attempt (default: 1000)
        circuit_breaker_threshold=5,     # Consecutive failures before opening circuit (default: 5)
        circuit_breaker_reset_ms=30000,  # Time before testing a half-open circuit (default: 30000)
        # Observability
        global_metadata={                # Merged into every log entry
            'env': 'production',
            'version': '1.0.0',
            'region': 'eu-west-1',
        },
        auto_trace_id=False,             # Auto-generate a UUID trace ID per log (default: False)
        # Payload safety
        payload_limits=PayloadLimitsOptions(
            max_field_size=10 * 1024,    # Max length of any single string field (default: 10 KB)
            max_log_size=100 * 1024,     # Max total serialized entry size (default: 100 KB)
            exclude_fields=['password'], # Fields replaced with "[EXCLUDED]"
        ),
        # Debug
        debug=False,                     # Print debug output to console (default: False)
    )
)
Logging Methods
Basic Logging
client.debug('service', 'Debug message')
client.info('api-gateway', 'Request received', {'method': 'GET', 'path': '/users'})
client.warn('cache', 'Cache miss', {'key': 'user:123'})
client.error('database', 'Query failed', {'query': 'SELECT *'})
client.critical('system', 'Out of memory', {'used': '95%'})
Exception Auto-Serialization
Pass an Exception directly to error() or critical() — it is serialized automatically with full stack frames.
try:
    raise RuntimeError('Database timeout')
except Exception as e:
    client.error('database', 'Query failed', e)
# Generated metadata:
# {
#   "exception": {
#     "type": "RuntimeError",
#     "message": "Database timeout",
#     "language": "python",
#     "stacktrace": [{"file": "app.py", "function": "run_query", "line": 42}],
#     "raw": "Traceback (most recent call last): ..."
#   }
# }
Async Client
AsyncLogTideClient uses aiohttp and is the recommended client for async frameworks. Install with pip install logtide-sdk[async].
Context Manager (recommended)
import asyncio
from logtide_sdk import AsyncLogTideClient, ClientOptions
async def main():
    async with AsyncLogTideClient(ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )) as client:
        await client.info('my-service', 'Hello from async!')
        await client.error('my-service', 'Something failed', Exception('oops'))
asyncio.run(main())
Manual Lifecycle
client = AsyncLogTideClient(options)
await client.start()  # starts background flush loop
try:
    await client.info('svc', 'message')
finally:
    await client.close()
stdlib logging Integration
LogTideHandler is a standard logging.Handler — drop it into any existing logging setup. Exception info is serialized with full structured stack frames when exc_info=True is used.
import logging
from logtide_sdk import LogTideClient, ClientOptions, LogTideHandler
client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))
handler = LogTideHandler(client=client, service='my-service')
handler.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
# These are forwarded to LogTide automatically
logger.warning('Low disk space')
logger.error('Unhandled exception', exc_info=True)
Trace ID Context
Manual Trace ID
client.set_trace_id('request-123')
client.info('api', 'Request received')
client.info('db', 'Querying users')
client.info('api', 'Response sent')
client.set_trace_id(None)  # clear
Scoped Trace ID (Context Manager)
with client.with_trace_id('request-456'):
    client.info('api', 'Processing in context')
    client.warn('cache', 'Cache miss')
# Trace ID automatically restored after block
Auto-Generated Trace ID
with client.with_new_trace_id():
    client.info('worker', 'Background job started')
    client.info('worker', 'Job completed')
Payload Limits
PayloadLimitsOptions prevents 413 errors and protects against oversized or sensitive payloads. Base64-encoded strings are automatically replaced with "[BASE64 DATA REMOVED]".
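As an illustration of the per-field rules just described, here is a standalone sketch (not the SDK's internals; base64 detection is omitted) of how exclusion and truncation behave:

```python
# Standalone sketch of the documented per-field rules: excluded fields become
# "[EXCLUDED]" and over-long strings are truncated with a marker.
def sanitize_field(key, value, max_field_size=10 * 1024,
                   exclude_fields=('password',),
                   truncation_marker='...[TRUNCATED]'):
    if key in exclude_fields:
        return '[EXCLUDED]'
    if isinstance(value, str) and len(value) > max_field_size:
        return value[:max_field_size] + truncation_marker
    return value

meta = {'password': 'hunter2', 'note': 'x' * 20000}
clean = {k: sanitize_field(k, v) for k, v in meta.items()}
# clean['password'] == '[EXCLUDED]'; clean['note'] ends with '...[TRUNCATED]'
```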
from logtide_sdk import LogTideClient, ClientOptions, PayloadLimitsOptions
client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
        payload_limits=PayloadLimitsOptions(
            max_field_size=5 * 1024,               # Truncate strings longer than 5 KB
            max_log_size=100 * 1024,               # Drop entries larger than 100 KB
            exclude_fields=['password', 'token'],  # Replace with "[EXCLUDED]"
            truncation_marker='...[TRUNCATED]',    # Appended to truncated strings
        ),
    )
)
Middleware Integration
All middleware auto-logs requests, responses (with duration and status code), and errors (with serialized exception metadata). Health check paths (/health, /healthz) are skipped by default.
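The per-request logic shared by all four integrations can be sketched framework-agnostically (illustrative only; handle and SKIP_PATHS are hypothetical names, and the real middleware also serializes exceptions into structured metadata):

```python
import time

SKIP_PATHS = {'/health', '/healthz'}  # skipped by default, as noted above

def handle(path, app_handler, log):
    """Run a request through a hypothetical app handler, logging duration and status."""
    if path in SKIP_PATHS:
        return app_handler(path)
    start = time.monotonic()
    status = 500  # assume failure unless the handler returns normally
    try:
        status = app_handler(path)
        return status
    finally:
        duration_ms = (time.monotonic() - start) * 1000
        log({'path': path, 'status': status, 'duration_ms': duration_ms})

records = []
handle('/users', lambda p: 200, records.append)    # logged
handle('/healthz', lambda p: 200, records.append)  # skipped (health check)
```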
Flask
from flask import Flask
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFlaskMiddleware
app = Flask(__name__)
client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))
LogTideFlaskMiddleware(
    app,
    client=client,
    service_name='flask-api',
    log_requests=True,
    log_responses=True,
    skip_paths=['/metrics'],
)
Django
# settings.py
from logtide_sdk import LogTideClient, ClientOptions
LOGTIDE_CLIENT = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))
LOGTIDE_SERVICE_NAME = 'django-api'
MIDDLEWARE = [
    'logtide_sdk.middleware.LogTideDjangoMiddleware',
    # ...
]
FastAPI
from fastapi import FastAPI
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFastAPIMiddleware
app = FastAPI()
client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))
app.add_middleware(LogTideFastAPIMiddleware, client=client, service_name='fastapi-api')
Starlette
from starlette.applications import Starlette
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideStarletteMiddleware
app = Starlette()
client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))
app.add_middleware(LogTideStarletteMiddleware, client=client, service_name='starlette-api')
Query API
Search Logs
from datetime import datetime, timedelta
from logtide_sdk import QueryOptions, LogLevel
result = client.query(
    QueryOptions(
        service='api-gateway',
        level=LogLevel.ERROR,
        from_time=datetime.now() - timedelta(hours=24),
        to_time=datetime.now(),
        q='timeout',  # Full-text search
        limit=100,
        offset=0,
    )
)
print(f"Found {result.total} logs")
for log in result.logs:
    print(log)
Get Logs by Trace ID
logs = client.get_by_trace_id('trace-123')
Aggregated Statistics
from logtide_sdk import AggregatedStatsOptions
stats = client.get_aggregated_stats(
    AggregatedStatsOptions(
        from_time=datetime.now() - timedelta(days=7),
        to_time=datetime.now(),
        interval='1h',
    )
)
for service in stats.top_services:
    print(f"{service['service']}: {service['count']} logs")
Live Streaming (SSE)
stream() runs in a background daemon thread and returns immediately with a stop callable.
def handle_log(log):
    print(f"[{log['time']}] {log['level']}: {log['message']}")
stop = client.stream(
    on_log=handle_log,
    on_error=lambda e: print(f"Stream error: {e}"),
    filters={'service': 'api-gateway', 'level': 'error'},
)
# ... later, to stop:
stop()
Async Streaming
task = asyncio.create_task(client.stream(on_log=handle_log))
# ... later:
task.cancel()
Metrics
metrics = client.get_metrics()
print(f"Logs sent: {metrics.logs_sent}")
print(f"Logs dropped: {metrics.logs_dropped}")
print(f"Errors: {metrics.errors}")
print(f"Retries: {metrics.retries}")
print(f"Avg latency: {metrics.avg_latency_ms:.1f}ms")
print(f"Circuit breaker trips: {metrics.circuit_breaker_trips}")
print(client.get_circuit_breaker_state()) # CLOSED | OPEN | HALF_OPEN
client.reset_metrics()
Best Practices
- Call client.close() explicitly in signal handlers or framework lifespan hooks (it is also registered automatically via atexit) to ensure buffered logs are flushed before exit.
- Set global_metadata in ClientOptions for fields common to all logs (environment, version, region) instead of repeating them in every call.
- Use with client.with_trace_id('id'): or with client.with_new_trace_id(): to automatically correlate all logs within a request or job.
- Configure PayloadLimitsOptions to prevent 413 errors from oversized payloads and to exclude sensitive fields like passwords or tokens.
- Check client.get_metrics() periodically to detect dropped logs (buffer full) or circuit breaker trips before they become production incidents.
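The shutdown advice can be sketched as follows. A stand-in client keeps the snippet self-contained; in real code the object would be your LogTideClient and close() would flush the buffer:

```python
import signal
import sys

class FakeClient:
    """Stand-in for LogTideClient; only close() matters for this sketch."""
    def __init__(self):
        self.closed = False
    def close(self):
        # The real client flushes any buffered logs before exiting.
        self.closed = True

client = FakeClient()

def shutdown(signum, frame):
    client.close()  # flush buffered logs before the process dies
    sys.exit(0)

# atexit covers normal interpreter exit; SIGTERM/SIGINT need explicit hooks.
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
```

This matters most in containerized deployments, where orchestrators stop services with SIGTERM and any unflushed batch would otherwise be lost.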