LogTide

Python SDK

Official Python SDK for LogTide — self-hosted log management with sync & async clients, stdlib logging integration, automatic batching, retry logic, circuit breaker, payload limits, and Flask/Django/FastAPI/Starlette middleware.

Installation

Core (sync client only):

pip install logtide-sdk

With optional extras:

# Async client (requires aiohttp)
pip install logtide-sdk[async]

# Flask middleware
pip install logtide-sdk[flask]

# Django middleware
pip install logtide-sdk[django]

# FastAPI middleware
pip install logtide-sdk[fastapi]

# Starlette middleware (standalone)
pip install logtide-sdk[starlette]

# All extras
pip install logtide-sdk[async,flask,django,fastapi,starlette]

Quick Start

from logtide_sdk import LogTideClient, ClientOptions

client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )
)

client.info('api-gateway', 'Server started', {'port': 3000})
client.error('database', 'Connection failed', Exception('Timeout'))

# Graceful shutdown (also registered automatically via atexit)
client.close()

Features

  • ✅ Sync client (LogTideClient) and async client (AsyncLogTideClient via aiohttp)
  • ✅ stdlib logging integration via LogTideHandler
  • ✅ Automatic batching with configurable size and flush interval
  • ✅ Retry logic with exponential backoff
  • ✅ Circuit breaker pattern (CLOSED / OPEN / HALF_OPEN)
  • ✅ Payload limits — field truncation, base64 removal, field exclusion, max entry size
  • ✅ Max buffer size with silent drop policy to prevent memory leaks
  • ✅ Query API for searching and filtering logs
  • ✅ Live tail with Server-Sent Events (SSE)
  • ✅ Trace ID context for distributed tracing
  • ✅ Global metadata added to all logs
  • ✅ Structured exception serialization with parsed stack frames
  • ✅ Internal metrics (logs sent, dropped, errors, latency, circuit breaker trips)
  • ✅ Flask, Django, FastAPI & Starlette middleware for auto-logging HTTP requests
  • ✅ Full Python 3.8+ support with type hints
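
The batching and max-buffer-size features listed above can be pictured with a small stand-alone sketch. It is not the SDK's code: `BatchBuffer`, its `send` callback, and the choice to keep a failed batch for a later attempt are assumptions made purely for illustration.

```python
from typing import Any, Callable, Dict, List


class BatchBuffer:
    """Hypothetical buffer: flushes at batch_size, drops new entries when full."""

    def __init__(self, send: Callable[[List[Dict[str, Any]]], None],
                 batch_size: int = 100, max_buffer_size: int = 10000) -> None:
        self._send = send
        self._batch_size = batch_size
        self._max_buffer_size = max_buffer_size
        self._buffer: List[Dict[str, Any]] = []
        self.dropped = 0

    def add(self, entry: Dict[str, Any]) -> bool:
        """Buffer one entry; return False if it was dropped."""
        if len(self._buffer) >= self._max_buffer_size:
            self.dropped += 1  # silent drop: count it, raise nothing
            return False
        self._buffer.append(entry)
        if len(self._buffer) >= self._batch_size:
            self.flush()
        return True

    def flush(self) -> bool:
        """Send everything buffered; keep the entries if sending fails."""
        if not self._buffer:
            return True
        batch, self._buffer = self._buffer, []
        try:
            self._send(batch)
        except Exception:
            self._buffer = batch  # assumption: retain for a later flush
            return False
        return True
```

With a healthy backend the buffer empties on every flush; entries are only dropped while sends keep failing and the buffer has reached its cap.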

Configuration

All options are passed via ClientOptions:

from logtide_sdk import LogTideClient, ClientOptions, PayloadLimitsOptions

client = LogTideClient(
    ClientOptions(
        # Required
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',

        # Performance
        batch_size=100,               # Logs per batch before immediate flush (default: 100)
        flush_interval=5000,          # Auto-flush interval in ms (default: 5000)
        max_buffer_size=10000,        # Max buffered logs; excess are silently dropped (default: 10000)

        # Reliability
        max_retries=3,                # Max retry attempts on send failure (default: 3)
        retry_delay_ms=1000,          # Initial retry delay, doubles each attempt (default: 1000)
        circuit_breaker_threshold=5,  # Consecutive failures before opening circuit (default: 5)
        circuit_breaker_reset_ms=30000, # Time before testing a half-open circuit (default: 30000)

        # Observability
        global_metadata={             # Merged into every log entry
            'env': 'production',
            'version': '1.0.0',
            'region': 'eu-west-1',
        },
        auto_trace_id=False,          # Auto-generate a UUID trace ID per log (default: False)

        # Payload safety
        payload_limits=PayloadLimitsOptions(
            max_field_size=10 * 1024,     # Max length of any single string field (default: 10 KB)
            max_log_size=100 * 1024,      # Max total serialized entry size (default: 100 KB)
            exclude_fields=['password'],  # Fields replaced with "[EXCLUDED]"
        ),

        # Debug
        debug=False,                  # Print debug output to console (default: False)
    )
)
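
To make the "doubles each attempt" comment on retry_delay_ms concrete, here is a minimal sketch of the resulting wait times. The helper `backoff_delays_ms` is hypothetical, and it assumes the first retry waits exactly retry_delay_ms with no jitter or upper cap; the SDK's actual schedule may differ.

```python
from typing import List


def backoff_delays_ms(retry_delay_ms: int = 1000, max_retries: int = 3) -> List[int]:
    """Return the wait before each retry, doubling from the initial delay."""
    return [retry_delay_ms * (2 ** attempt) for attempt in range(max_retries)]


print(backoff_delays_ms())  # [1000, 2000, 4000]
```

Under these assumptions, the defaults add up to 7 seconds of waiting before a batch is given up on.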

Logging Methods

Basic Logging

client.debug('service', 'Debug message')
client.info('api-gateway', 'Request received', {'method': 'GET', 'path': '/users'})
client.warn('cache', 'Cache miss', {'key': 'user:123'})
client.error('database', 'Query failed', {'query': 'SELECT *'})
client.critical('system', 'Out of memory', {'used': '95%'})

Exception Auto-Serialization

Pass an Exception directly to error() or critical() — it is serialized automatically with full stack frames.

try:
    raise RuntimeError('Database timeout')
except Exception as e:
    client.error('database', 'Query failed', e)

# Generated metadata:
# {
#   "exception": {
#     "type": "RuntimeError",
#     "message": "Database timeout",
#     "language": "python",
#     "stacktrace": [{"file": "app.py", "function": "run_query", "line": 42}],
#     "raw": "Traceback (most recent call last): ..."
#   }
# }
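
For readers curious how metadata of this shape can be derived, the sketch below builds a similar dictionary with the standard-library traceback module. `serialize_exception` is a hypothetical helper written for this page, not the SDK's serializer, and the field names simply mirror the example above.

```python
import traceback
from typing import Any, Dict


def serialize_exception(exc: BaseException) -> Dict[str, Any]:
    """Turn an exception into a dict with parsed stack frames."""
    frames = traceback.extract_tb(exc.__traceback__)
    return {
        'exception': {
            'type': type(exc).__name__,
            'message': str(exc),
            'language': 'python',
            'stacktrace': [
                {'file': f.filename, 'function': f.name, 'line': f.lineno}
                for f in frames
            ],
            'raw': ''.join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
        }
    }


def run_query() -> None:
    raise RuntimeError('Database timeout')


try:
    run_query()
except RuntimeError as e:
    meta = serialize_exception(e)

print(meta['exception']['type'])  # RuntimeError
```

An exception that was constructed but never raised has no traceback, so its stacktrace list would be empty in this sketch.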

Async Client

AsyncLogTideClient uses aiohttp and is the recommended client for async frameworks. Install with pip install logtide-sdk[async].

Context Manager (recommended)

import asyncio
from logtide_sdk import AsyncLogTideClient, ClientOptions

async def main():
    async with AsyncLogTideClient(ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
    )) as client:
        await client.info('my-service', 'Hello from async!')
        await client.error('my-service', 'Something failed', Exception('oops'))

asyncio.run(main())

Manual Lifecycle

# `options` is a ClientOptions instance, as in the example above
async def main():
    client = AsyncLogTideClient(options)
    await client.start()   # starts background flush loop
    try:
        await client.info('svc', 'message')
    finally:
        await client.close()

stdlib logging Integration

LogTideHandler is a standard logging.Handler — drop it into any existing logging setup. Exception info is serialized with full structured stack frames when exc_info=True is used.

import logging
from logtide_sdk import LogTideClient, ClientOptions, LogTideHandler

client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))

handler = LogTideHandler(client=client, service='my-service')
handler.setLevel(logging.WARNING)

logger = logging.getLogger(__name__)
logger.addHandler(handler)

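# These are forwarded to LogTide automatically
logger.warning('Low disk space')

try:
    1 / 0
except ZeroDivisionError:
    # exc_info=True only captures a traceback inside an except block
    logger.error('Unhandled exception', exc_info=True)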
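
The handler mechanism itself can be tried without a LogTide server. In the sketch below, `ListHandler` is a hypothetical stand-in that appends entries to a list instead of sending them; it only illustrates level filtering and the exc_info tuple that any logging.Handler receives, and says nothing about how LogTideHandler is implemented.

```python
import logging
from typing import Any, Dict, List


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a forwarding handler: entries go to a list."""

    def __init__(self, service: str) -> None:
        super().__init__()
        self.service = service
        self.entries: List[Dict[str, Any]] = []

    def emit(self, record: logging.LogRecord) -> None:
        entry = {
            'service': self.service,
            'level': record.levelname.lower(),
            'message': record.getMessage(),
        }
        # exc_info is (type, value, traceback) when an exception was captured
        if record.exc_info and record.exc_info[0] is not None:
            entry['exception_type'] = record.exc_info[0].__name__
        self.entries.append(entry)


handler = ListHandler(service='my-service')
handler.setLevel(logging.WARNING)

logger = logging.getLogger('logtide-sketch')
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(handler)

logger.info('Below the handler level, so not recorded')
logger.warning('Low disk space')
try:
    raise ValueError('bad input')
except ValueError:
    logger.error('Unhandled exception', exc_info=True)

print(len(handler.entries))  # 2
```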

Trace ID Context

Manual Trace ID

client.set_trace_id('request-123')

client.info('api', 'Request received')
client.info('db', 'Querying users')
client.info('api', 'Response sent')

client.set_trace_id(None)  # clear

Scoped Trace ID (Context Manager)

with client.with_trace_id('request-456'):
    client.info('api', 'Processing in context')
    client.warn('cache', 'Cache miss')
# Trace ID automatically restored after block

Auto-Generated Trace ID

with client.with_new_trace_id():
    client.info('worker', 'Background job started')
    client.info('worker', 'Job completed')
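
The scoping behaviour of these context managers (set on entry, restore on exit, including when nested) can be sketched with the standard-library contextvars module. The functions below are stand-alone illustrations that reuse the method names from this section; whether the SDK stores its trace ID this way is an assumption.

```python
import contextvars
import uuid
from contextlib import contextmanager
from typing import Iterator, Optional

_trace_id = contextvars.ContextVar('trace_id', default=None)


def current_trace_id() -> Optional[str]:
    return _trace_id.get()


@contextmanager
def with_trace_id(trace_id: str) -> Iterator[str]:
    """Set the trace ID for the block, then restore the previous value."""
    token = _trace_id.set(trace_id)
    try:
        yield trace_id
    finally:
        _trace_id.reset(token)


@contextmanager
def with_new_trace_id() -> Iterator[str]:
    """Same as with_trace_id, but with a freshly generated UUID."""
    with with_trace_id(str(uuid.uuid4())) as trace_id:
        yield trace_id


with with_trace_id('request-456'):
    with with_new_trace_id() as inner:
        assert current_trace_id() == inner
    print(current_trace_id())  # request-456
print(current_trace_id())  # None
```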

Payload Limits

PayloadLimitsOptions helps prevent HTTP 413 (Payload Too Large) errors and guards against oversized or sensitive payloads. Base64-encoded strings are automatically replaced with "[BASE64 DATA REMOVED]".

from logtide_sdk import LogTideClient, ClientOptions, PayloadLimitsOptions

client = LogTideClient(
    ClientOptions(
        api_url='http://localhost:8080',
        api_key='lp_your_api_key_here',
        payload_limits=PayloadLimitsOptions(
            max_field_size=5 * 1024,           # Truncate strings longer than 5 KB
            max_log_size=100 * 1024,           # Drop entries larger than 100 KB
            exclude_fields=['password', 'token'],  # Replace with "[EXCLUDED]"
            truncation_marker='...[TRUNCATED]', # Appended to truncated strings
        ),
    )
)
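
As a rough picture of what these options do to a metadata dictionary, the sketch below applies field exclusion, base64 removal, and truncation in that order. `apply_limits` and `looks_like_base64` are hypothetical helpers; in particular the base64 heuristic (at least 100 characters, base64 alphabet only, decodes cleanly) is an assumption, and the SDK's detection rules and ordering may differ.

```python
import base64
import binascii
import re
from typing import Any, Dict, Iterable

_BASE64_RE = re.compile(r'^[A-Za-z0-9+/]+={0,2}$')


def looks_like_base64(value: str, min_length: int = 100) -> bool:
    """Heuristic (assumption): long, base64 alphabet only, decodes cleanly."""
    if len(value) < min_length or len(value) % 4 != 0:
        return False
    if not _BASE64_RE.match(value):
        return False
    try:
        base64.b64decode(value, validate=True)
    except (binascii.Error, ValueError):
        return False
    return True


def apply_limits(metadata: Dict[str, Any],
                 max_field_size: int = 10 * 1024,
                 exclude_fields: Iterable[str] = (),
                 truncation_marker: str = '...[TRUNCATED]') -> Dict[str, Any]:
    """Return a cleaned copy of metadata; nested dicts are handled recursively."""
    excluded = set(exclude_fields)
    cleaned: Dict[str, Any] = {}
    for key, value in metadata.items():
        if key in excluded:
            cleaned[key] = '[EXCLUDED]'
        elif isinstance(value, dict):
            cleaned[key] = apply_limits(value, max_field_size, excluded,
                                        truncation_marker)
        elif isinstance(value, str) and looks_like_base64(value):
            cleaned[key] = '[BASE64 DATA REMOVED]'
        elif isinstance(value, str) and len(value) > max_field_size:
            cleaned[key] = value[:max_field_size] + truncation_marker
        else:
            cleaned[key] = value
    return cleaned
```

Note that a heuristic like this can misfire on long strings that merely happen to use the base64 alphabet, which is one reason to treat the sketch as illustrative only.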

Middleware Integration

All middleware auto-logs requests, responses (with duration and status code), and errors (with serialized exception metadata). Health check paths (/health, /healthz) are skipped by default.

Flask

from flask import Flask
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFlaskMiddleware

app = Flask(__name__)
client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))

LogTideFlaskMiddleware(
    app,
    client=client,
    service_name='flask-api',
    log_requests=True,
    log_responses=True,
    skip_paths=['/metrics'],
)

Django

# settings.py
from logtide_sdk import LogTideClient, ClientOptions

LOGTIDE_CLIENT = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))
LOGTIDE_SERVICE_NAME = 'django-api'

MIDDLEWARE = [
    'logtide_sdk.middleware.LogTideDjangoMiddleware',
    # ...
]

FastAPI

from fastapi import FastAPI
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFastAPIMiddleware

app = FastAPI()
client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))

app.add_middleware(LogTideFastAPIMiddleware, client=client, service_name='fastapi-api')

Starlette

from starlette.applications import Starlette
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideStarletteMiddleware

app = Starlette()
client = LogTideClient(ClientOptions(
    api_url='http://localhost:8080',
    api_key='lp_your_api_key_here',
))

app.add_middleware(LogTideStarletteMiddleware, client=client, service_name='starlette-api')

Query API

Search Logs

from datetime import datetime, timedelta
from logtide_sdk import QueryOptions, LogLevel

result = client.query(
    QueryOptions(
        service='api-gateway',
        level=LogLevel.ERROR,
        from_time=datetime.now() - timedelta(hours=24),
        to_time=datetime.now(),
        q='timeout',   # Full-text search
        limit=100,
        offset=0,
    )
)

print(f"Found {result.total} logs")
for log in result.logs:
    print(log)

Get Logs by Trace ID

logs = client.get_by_trace_id('trace-123')

Aggregated Statistics

from logtide_sdk import AggregatedStatsOptions

stats = client.get_aggregated_stats(
    AggregatedStatsOptions(
        from_time=datetime.now() - timedelta(days=7),
        to_time=datetime.now(),
        interval='1h',
    )
)

for service in stats.top_services:
    print(f"{service['service']}: {service['count']} logs")

Live Streaming (SSE)

stream() runs in a background daemon thread and returns immediately with a stop callable.

def handle_log(log):
    print(f"[{log['time']}] {log['level']}: {log['message']}")

stop = client.stream(
    on_log=handle_log,
    on_error=lambda e: print(f"Stream error: {e}"),
    filters={'service': 'api-gateway', 'level': 'error'},
)

# ... later, to stop:
stop()

Async Streaming

# Inside a coroutine, with an AsyncLogTideClient instance:
task = asyncio.create_task(client.stream(on_log=handle_log))
# ... later:
task.cancel()

Metrics

metrics = client.get_metrics()

print(f"Logs sent:              {metrics.logs_sent}")
print(f"Logs dropped:           {metrics.logs_dropped}")
print(f"Errors:                 {metrics.errors}")
print(f"Retries:                {metrics.retries}")
print(f"Avg latency:            {metrics.avg_latency_ms:.1f}ms")
print(f"Circuit breaker trips:  {metrics.circuit_breaker_trips}")

print(client.get_circuit_breaker_state())  # CLOSED | OPEN | HALF_OPEN

client.reset_metrics()
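
The three states reported by get_circuit_breaker_state() follow the usual circuit breaker pattern. The sketch below is a generic, stand-alone version of that pattern, parameterised like circuit_breaker_threshold and circuit_breaker_reset_ms; the `CircuitBreaker` class and its method names are hypothetical and are not taken from the SDK.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Generic sketch: CLOSED -> OPEN after N failures -> HALF_OPEN after a delay."""

    def __init__(self, threshold: int = 5, reset_ms: int = 30000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._threshold = threshold
        self._reset_s = reset_ms / 1000.0
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return 'CLOSED'
        if self._clock() - self._opened_at >= self._reset_s:
            return 'HALF_OPEN'  # reset time elapsed: allow one probe
        return 'OPEN'

    def allow_request(self) -> bool:
        return self.state != 'OPEN'

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # back to CLOSED

    def record_failure(self) -> None:
        if self.state == 'HALF_OPEN':
            self._opened_at = self._clock()  # probe failed: reopen
            return
        self._failures += 1
        if self._failures >= self._threshold and self._opened_at is None:
            self._opened_at = self._clock()  # threshold reached: open
```

Injecting the clock keeps the sketch testable without sleeping; a sender would call allow_request() before each batch and record the outcome afterwards.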

Best Practices

1. Always Close on Shutdown
client.close() is registered automatically via atexit, but call it explicitly in signal handlers or framework lifespan hooks to ensure buffered logs are flushed before exit.
2. Use Global Metadata
Set global_metadata in ClientOptions for fields common to all logs (environment, version, region) instead of repeating them in every call.
3. Use Context Managers for Tracing
Use with client.with_trace_id('id'): or with client.with_new_trace_id(): to automatically correlate all logs within a request or job.
4. Set Payload Limits in Production
Configure PayloadLimitsOptions to prevent 413 errors from oversized payloads and exclude sensitive fields like passwords or tokens.
5. Monitor Internal Metrics
Poll client.get_metrics() periodically to detect dropped logs (buffer full) or circuit breaker trips before they become production incidents.
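
Practice 5 can be sketched as a comparison between two successive metric readings. `MetricsSnapshot` below is a hypothetical stand-in that carries only the logs_dropped and circuit_breaker_trips counters shown under Metrics, and `metric_warnings` is an illustrative helper rather than part of the SDK.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class MetricsSnapshot:
    """Hypothetical stand-in for two of the counters shown under Metrics."""
    logs_dropped: int = 0
    circuit_breaker_trips: int = 0


def metric_warnings(previous: MetricsSnapshot,
                    current: MetricsSnapshot) -> List[str]:
    """Describe counters that increased between two readings."""
    warnings: List[str] = []
    dropped = current.logs_dropped - previous.logs_dropped
    if dropped > 0:
        warnings.append(f'{dropped} logs dropped since last check')
    trips = current.circuit_breaker_trips - previous.circuit_breaker_trips
    if trips > 0:
        warnings.append(f'circuit breaker tripped {trips} time(s) since last check')
    return warnings


before = MetricsSnapshot(logs_dropped=0, circuit_breaker_trips=1)
after = MetricsSnapshot(logs_dropped=12, circuit_breaker_trips=1)
print(metric_warnings(before, after))  # ['12 logs dropped since last check']
```

In a real service the two readings would come from periodic calls to client.get_metrics(), with the warnings routed to whatever alerting channel is already in place.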