Event Sourcing Guide for Weaviate Service

📋 Overview

Event Sourcing is implemented to provide:

  • Transparency - complete history of all changes
  • Audit - tracking of all operations
  • Recovery - time-travel queries
  • Analytics - event statistics
  • Model tracking - support for multiple embedding models

🗄️ Event Storage

Storage Options:

1. MongoDB

  • ✅ Flexible storage
  • ✅ Fast queries
  • ✅ Easy to scale
  • ✅ Index support
# Set up MongoDB
EVENT_BACKEND=mongodb
MONGODB_URL=mongodb://localhost:27017/
MONGODB_DATABASE=weaviate_events

Installation:

pip install pymongo

2. PostgreSQL (optional, if you already have PostgreSQL infrastructure)

  • ✅ ACID transactions
  • ✅ SQL queries
  • ✅ Good for analytics
  • ✅ JSONB support
EVENT_BACKEND=postgresql
POSTGRESQL_URL=postgresql://user:password@localhost:5432/weaviate_events

Installation:

pip install psycopg2-binary

When to use PostgreSQL:

  • If you already have PostgreSQL infrastructure
  • If you need complex SQL queries for analytics
  • If you need ACID transactions

💡 Recommendation: if you already run MongoDB, use it; it is sufficient for Event Sourcing.

3. File-based (for development)

  • ✅ Simple
  • ✅ No database required
  • ❌ Doesn't scale
  • ❌ Slower for large volumes
EVENT_BACKEND=file
EVENT_FILE_PATH=/tmp/weaviate_events.jsonl
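All three backends are selected through environment variables. The following is a minimal sketch, not taken from the service's code, of how such a configuration could be read and validated. The helper name load_event_store_config and the table of required variables per backend are assumptions based only on the variables listed above.

```python
import os
from typing import Mapping

# Required variables per backend, taken from the examples above.
# Assumption: the real service may accept additional settings.
REQUIRED_VARS = {
    "mongodb": ("MONGODB_URL", "MONGODB_DATABASE"),
    "postgresql": ("POSTGRESQL_URL",),
    "file": ("EVENT_FILE_PATH",),
}


def load_event_store_config(env: Mapping[str, str] = os.environ) -> dict:
    """Hypothetical helper: validate EVENT_BACKEND and collect its settings."""
    backend = env.get("EVENT_BACKEND", "").strip().lower()
    if backend not in REQUIRED_VARS:
        raise ValueError(f"Unsupported EVENT_BACKEND: {backend!r}")
    # Report every missing variable at once instead of failing on the first
    missing = [name for name in REQUIRED_VARS[backend] if not env.get(name)]
    if missing:
        raise ValueError(f"Missing settings for {backend}: {', '.join(missing)}")
    return {"backend": backend, **{name: env[name] for name in REQUIRED_VARS[backend]}}
```

Passing a plain dict instead of os.environ makes the check easy to run in tests or CI before the service starts.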

📊 How to Perform an Audit

1. Get Event History

# All events
GET /events

# Events for a specific document
GET /events?aggregate_id=doc_123

# Events by type
GET /events?event_type=DOCUMENT_CREATED

# Events from a specific time
GET /events?since=2024-01-01T00:00:00Z

# Combined filters
GET /events?aggregate_type=document&event_type=DOCUMENT_CREATED&since=2024-01-01T00:00:00Z&limit=100
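The filters are ordinary query parameters, so values such as the since timestamp should be URL-encoded when the URL is built in code. A minimal standard-library sketch; build_events_url is a hypothetical helper, and the base URL is the one used in the usage example at the end of this guide.

```python
from urllib.parse import urlencode


def build_events_url(base_url: str, **filters) -> str:
    """Build a GET /events URL, dropping filters whose value is None."""
    params = {key: value for key, value in filters.items() if value is not None}
    query = urlencode(params)  # percent-encodes reserved characters such as ":"
    return f"{base_url.rstrip('/')}/events" + (f"?{query}" if query else "")
```

For example, build_events_url("http://weaviate-service:8002", event_type="DOCUMENT_CREATED", since="2024-01-01T00:00:00Z", limit=100) yields the combined-filter request shown above, with the colons in the timestamp encoded as %3A.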

2. Recover Document State

# Current state
GET /events/{aggregate_id}/state

# State at a specific time
GET /events/{aggregate_id}/state?at_time=2024-01-15T10:30:00Z
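Conceptually, the state at at_time is obtained by replaying the aggregate's events in chronological order and stopping at that timestamp. The sketch below illustrates that fold. The timestamp field and the DOCUMENT_UPDATED / DOCUMENT_DELETED event types are assumptions for illustration, not the service's documented schema; only event_type and data appear elsewhere in this guide.

```python
from datetime import datetime
from typing import Iterable, Optional


def parse_ts(value: str) -> datetime:
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11, so normalize it
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def rebuild_state(events: Iterable[dict], at_time: Optional[str] = None) -> Optional[dict]:
    """Fold events (sorted oldest first) into a document state, ignoring events after at_time."""
    cutoff = parse_ts(at_time) if at_time else None
    state: Optional[dict] = None
    for event in sorted(events, key=lambda e: parse_ts(e["timestamp"])):
        if cutoff is not None and parse_ts(event["timestamp"]) > cutoff:
            break
        kind = event["event_type"]
        if kind == "DOCUMENT_CREATED":
            state = dict(event["data"])  # copy so replay never mutates stored events
        elif kind == "DOCUMENT_UPDATED" and state is not None:  # hypothetical event type
            state.update(event["data"])
        elif kind == "DOCUMENT_DELETED":  # hypothetical event type
            state = None
    return state
```

Because the state is derived purely from the event log, the same function answers both the "current state" and the "state at a specific time" questions.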

3. Event Statistics

# General statistics
GET /events/stats

# Statistics by aggregate type
GET /events/stats?aggregate_type=document

# Statistics from a specific time
GET /events/stats?since=2024-01-01T00:00:00Z

🎨 UI for Tracking

1. Grafana + MongoDB/PostgreSQL

  • ✅ Ready-to-use dashboards
  • ✅ Event visualization
  • ✅ Real-time analytics

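Configuration (Grafana has no built-in MongoDB data source, so a MongoDB data source plugin is required; the exact type value depends on the plugin):

# grafana-datasource.yaml (provisioning file)
apiVersion: 1
datasources:
  - name: MongoDB
    type: mongodb
    url: mongodb://localhost:27017
    database: weaviate_events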

2. Custom Dashboard (React/Vue)

  • ✅ Full control
  • ✅ Custom visualizations
  • ✅ Integration with your system

API Integration Example:

// Get events
const events = await fetch('/events?aggregate_id=doc_123')
  .then(r => r.json());

// Statistics
const stats = await fetch('/events/stats?since=2024-01-01T00:00:00Z')
  .then(r => r.json());

3. Elasticsearch + Kibana

  • ✅ Powerful search
  • ✅ Visualization
  • ✅ Analytics

Configuration:

  • Export events from MongoDB/PostgreSQL to Elasticsearch
  • Use Kibana for visualization
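One way to do the export is the Elasticsearch _bulk API, which takes newline-delimited JSON: an action line followed by the document itself, with a mandatory trailing newline. The sketch below only builds that payload. The index name weaviate-events and the event_id field used as the document _id are assumptions; sending the body (a POST to /_bulk with Content-Type: application/x-ndjson) is left out.

```python
import json
from typing import Iterable


def to_bulk_ndjson(events: Iterable[dict], index: str = "weaviate-events") -> str:
    """Build an Elasticsearch _bulk body: one action line plus one source line per event."""
    lines = []
    for event in events:
        action = {"index": {"_index": index}}
        if "event_id" in event:  # hypothetical id field; reusing it makes re-exports idempotent
            action["index"]["_id"] = event["event_id"]
        lines.append(json.dumps(action))
        lines.append(json.dumps(event))
    # The bulk API requires the body to end with a newline
    return "\n".join(lines) + "\n" if lines else ""
```

Using a stable _id means that re-running the export overwrites documents instead of duplicating them in Kibana.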

📝 How to Use Properly

1. Operation Audit

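import httpx

# client: an HTTP client bound to the service URL (httpx is one option; any client works)
client = httpx.Client(base_url="http://weaviate-service:8002")

# Check who created a document and when
events = client.get("/events", params={
    "aggregate_id": "doc_123",
    "event_type": "DOCUMENT_CREATED"
}).json()["events"]

# Check all document changes
events = client.get("/events", params={
    "aggregate_id": "doc_123"
}).json()["events"]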

2. Embedding Model Tracking

# Find all documents created with a specific model
events = client.get("/events", params={
    "event_type": "DOCUMENT_CREATED"
}).json()["events"]
# Filter on data.embedding_model client-side (or in the UI / via aggregation)
model_events = [e for e in events if e["data"].get("embedding_model") == "your-model-name"]

# Check model changes
stats = client.get("/events/stats").json()
# Analyze embedding_model in events
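Since the model filter runs on the client, a short aggregation over the returned events is enough to see which embedding models were used and how often. A sketch, assuming each DOCUMENT_CREATED event carries data.embedding_model; count_models is a hypothetical helper.

```python
from collections import Counter
from typing import Iterable


def count_models(events: Iterable[dict]) -> Counter:
    """Count DOCUMENT_CREATED events per data.embedding_model ("unknown" if the field is absent)."""
    return Counter(
        event.get("data", {}).get("embedding_model", "unknown")
        for event in events
        if event.get("event_type") == "DOCUMENT_CREATED"
    )
```

Calling count_models(events).most_common() on the DOCUMENT_CREATED events fetched above lists models from most to least used; a growing "unknown" bucket points to documents stored before model tracking was recorded.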

3. Search Analytics

# Most popular queries
events = client.get("/events", params={
    "event_type": "SEARCH_PERFORMED",
    "limit": 1000
})

# Analyze search success rate
for event in events:
    query = event["data"]["query"]
    results_count = event["data"]["results_count"]
    avg_score = event["data"].get("avg_score", 0)
    print(f"{query}: {results_count} results, avg score: {avg_score}")
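The loop above prints each search individually. To actually rank the most popular queries and find queries that return nothing, the events can be aggregated. This sketch assumes the data.query and data.results_count fields used above; lower-casing queries before counting is a design choice of the sketch, not something the service does.

```python
from collections import Counter
from typing import Iterable


def search_summary(events: Iterable[dict], top_n: int = 10) -> dict:
    """Summarize SEARCH_PERFORMED events: most frequent queries and queries with zero results."""
    queries: Counter = Counter()
    zero_results: Counter = Counter()
    for event in events:
        data = event.get("data", {})
        query = data.get("query", "").strip().lower()  # so "PDF" and "pdf" count together
        if not query:
            continue
        queries[query] += 1
        if data.get("results_count") == 0:  # missing counts are not treated as zero
            zero_results[query] += 1
    return {"top": queries.most_common(top_n), "no_results": zero_results.most_common(top_n)}
```

Frequent zero-result queries are usually the most actionable output: they show content users expect but the index does not contain.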

4. Recovery After Errors

doc_id = "doc_123"

# Recover document state at a moment before the error
state = client.get(f"/events/{doc_id}/state", params={
    "at_time": "2024-01-15T10:00:00Z"  # Before the error
}).json()

# Check what changed since then
events_after = client.get("/events", params={
    "aggregate_id": doc_id,
    "since": "2024-01-15T10:00:00Z"
}).json()["events"]
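To see at a glance what changed, the recovered state can be compared with the current one (GET /events/{aggregate_id}/state without at_time). A minimal sketch, assuming both states are flat JSON objects; diff_states is a hypothetical helper, and it treats a missing key and a null value as the same.

```python
def diff_states(before: dict, after: dict) -> dict:
    """Return {key: (old, new)} for keys whose values differ; missing keys show as None."""
    return {
        key: (before.get(key), after.get(key))
        for key in sorted(before.keys() | after.keys())
        if before.get(key) != after.get(key)
    }
```

Pass the state recovered at the pre-error timestamp as before and the parsed current state as after; the result lists only the fields that need to be restored.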

5. Compliance and Regulations

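import json

# Export events for audit
events = client.get("/events", params={
    "since": "2024-01-01T00:00:00Z",
    "limit": 10000
}).json()["events"]
# limit caps the result: if len(events) == 10000, the export may be incomplete

# Save to archive
with open("audit_export.json", "w") as f:
    json.dump(events, f, indent=2)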
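For an archive that auditors can verify later, it helps to store a checksum next to the export file. A standard-library sketch; the .sha256 sidecar file name is an assumption, not a convention the service defines.

```python
import hashlib
from pathlib import Path


def write_checksum(path: str) -> str:
    """Compute the SHA-256 of an export file and store it in <path>.sha256."""
    digest = hashlib.sha256(Path(path).read_bytes()).hexdigest()
    # "<hash>  <filename>" is the layout that `sha256sum -c` expects
    Path(path + ".sha256").write_text(f"{digest}  {Path(path).name}\n")
    return digest
```

After write_checksum("audit_export.json"), running sha256sum -c audit_export.json.sha256 in the same directory confirms the archive has not been altered.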

🔧 Production Configuration

1. MongoDB

# .env.deploy
EVENT_BACKEND=mongodb
MONGODB_URL=mongodb://user:password@mongodb-host:27017/
MONGODB_DATABASE=weaviate_events

Recommendations:

  • Use a replica set for reliability
  • Configure TTL indexes to delete old events automatically
  • Perform regular backups
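The query indexes and the TTL expiry can be created with PyMongo's Collection.create_index. A sketch under stated assumptions: the collection name events, the timestamp field (which must hold a BSON date for TTL expiry to work) and the 365-day retention are hypothetical, so check the real schema first, and skip the TTL index if events must be kept for compliance.

```python
ONE_YEAR_SECONDS = 365 * 24 * 60 * 60  # hypothetical retention period


def ensure_event_indexes(collection, ttl_seconds: int = ONE_YEAR_SECONDS) -> list:
    """Create query and TTL indexes on a PyMongo collection; returns the created index names."""
    return [
        # Supports GET /events?aggregate_id=... ordered by time
        collection.create_index([("aggregate_id", 1), ("timestamp", 1)]),
        # Supports filtering by event_type combined with a since= lower bound
        collection.create_index([("event_type", 1), ("timestamp", 1)]),
        # TTL index: MongoDB's background task removes documents older than ttl_seconds
        collection.create_index("timestamp", expireAfterSeconds=ttl_seconds),
    ]
```

Usage: pass a collection such as MongoClient(MONGODB_URL)[MONGODB_DATABASE]["events"]. create_index is a no-op when an identical index already exists, so the function can run at every deployment.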

2. PostgreSQL (optional, if you already have PostgreSQL infrastructure)

# .env.deploy
EVENT_BACKEND=postgresql
POSTGRESQL_URL=postgresql://user:password@postgres-host:5432/weaviate_events

Recommendations:

  • Use date-based partitioning
  • Configure automatic archiving of old events
  • Run VACUUM regularly (or make sure autovacuum is enabled and tuned)
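With declarative partitioning (PostgreSQL 10+), the events table is created with PARTITION BY RANGE on its timestamp column, and one partition is added per period. The sketch below only generates the DDL for monthly partitions; the table name events is an assumption, the parent table must already be range-partitioned on a timestamp column, and the table name is interpolated directly, so it must be a trusted identifier.

```python
from datetime import date


def monthly_partition_ddl(table: str, year: int, month: int) -> str:
    """Return CREATE TABLE ... PARTITION OF for one calendar month [start, next month)."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    # The upper bound is exclusive in PostgreSQL range partitions, so months do not overlap
    return (
        f"CREATE TABLE IF NOT EXISTS {table}_{year}_{month:02d} PARTITION OF {table} "
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )
```

Partitioning also makes archiving cheap: an old month can be detached with ALTER TABLE events DETACH PARTITION events_2023_01; and then dumped or dropped without touching newer data.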

3. Monitoring

# Check event store health
health = client.get("/health").json()
if health["data_management"]["event_sourcing_ready"]:
    print("✅ Event Sourcing ready")
else:
    print("❌ Event Sourcing not ready")

# Check event count
stats = client.get("/events/stats").json()
print(f"Total events: {stats['total_events']}")
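In deployment scripts it can be useful to wait until the event store reports ready before sending traffic. A minimal sketch that polls the same data_management.event_sourcing_ready field; wait_for_event_sourcing is hypothetical, and the health request is passed in as a function so any HTTP client can be used.

```python
import time
from typing import Callable


def wait_for_event_sourcing(fetch_health: Callable[[], dict], timeout: float = 30.0,
                            interval: float = 1.0) -> bool:
    """Poll fetch_health() until event_sourcing_ready is true; return False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            health = fetch_health()
            if health.get("data_management", {}).get("event_sourcing_ready"):
                return True
        except Exception:
            pass  # service not reachable yet; keep polling until the deadline
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example: wait_for_event_sourcing(lambda: requests.get(f"{BASE_URL}/health", timeout=5).json()).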

📈 Best Practices

  1. Regularly archive old events (if not needed for compliance)
  2. Monitor event store size, since events only accumulate
  3. Use indexes for fast queries
  4. Export events for long-term storage
  5. Configure alerts for event anomalies
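For item 5, a simple starting point is to sample total_events from GET /events/stats at a fixed interval and alert when the latest increase is far outside the recent norm. This is a plain z-score check, one of many possible anomaly rules; the sampling interval and the threshold of 3 standard deviations are assumptions to tune.

```python
from statistics import mean, pstdev
from typing import Sequence


def is_event_rate_anomalous(totals: Sequence[int], threshold: float = 3.0) -> bool:
    """Flag the latest interval if its event count deviates > threshold std devs from earlier ones.

    totals: successive total_events samples from GET /events/stats, oldest first.
    """
    deltas = [b - a for a, b in zip(totals, totals[1:])]  # events per interval
    if len(deltas) < 3:
        return False  # not enough history to judge
    history, latest = deltas[:-1], deltas[-1]
    spread = pstdev(history)
    if spread == 0:
        return latest != history[0]  # perfectly steady history: any change stands out
    return abs(latest - mean(history)) / spread > threshold
```

The check fires in both directions: a burst of events and a sudden stop in recording (a delta near zero) are both flagged.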

🚀 Usage Example

import requests

BASE_URL = "http://weaviate-service:8002"

# 1. Create document (DOCUMENT_CREATED event automatically recorded)
response = requests.post(f"{BASE_URL}/store", json={
    "documents": [{
        "content": "Test document",
        "metadata": {"source": "test.pdf"}
    }]
})

# 2. Check events
events = requests.get(f"{BASE_URL}/events", params={
    "event_type": "DOCUMENT_CREATED",
    "limit": 10
}).json()

# 3. Recover document state
doc_id = events["events"][0]["aggregate_id"]
state = requests.get(f"{BASE_URL}/events/{doc_id}/state").json()

# 4. Statistics
stats = requests.get(f"{BASE_URL}/events/stats").json()
print(f"Total events: {stats['total_events']}")