Event Sourcing Guide for Weaviate Service
📋 Overview
Event Sourcing is implemented to provide:
- Transparency - complete history of all changes
- Audit - tracking of all operations
- Recovery - time-travel queries
- Analytics - event statistics
- Model tracking - support for multiple embedding models
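To make these goals concrete, here is a minimal sketch of an event record and of how a time-travel query can rebuild state by replaying events in order. It assumes events carry `aggregate_id`, `aggregate_type`, `event_type` and `data` (the names used by the `/events` API below) plus a `timestamp`. The `timestamp` field, the `rebuild_state` helper and the `DOCUMENT_UPDATED`/`DOCUMENT_DELETED` event types are hypothetical, not the service's actual schema.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class Event:
    # Hypothetical record shape: aggregate_id/aggregate_type/event_type/data
    # follow the /events API; `timestamp` is an assumed field name.
    aggregate_id: str
    aggregate_type: str
    event_type: str
    timestamp: datetime
    data: Dict[str, Any] = field(default_factory=dict)


def rebuild_state(events: List[Event], aggregate_id: str,
                  at_time: Optional[datetime] = None) -> Optional[Dict[str, Any]]:
    """Replay one aggregate's events oldest-first, stopping after at_time."""
    state: Optional[Dict[str, Any]] = None
    ordered = sorted(
        (e for e in events if e.aggregate_id == aggregate_id),
        key=lambda e: e.timestamp,
    )
    for event in ordered:
        if at_time is not None and event.timestamp > at_time:
            break  # later events are not part of the state at at_time
        if event.event_type == "DOCUMENT_CREATED":
            state = dict(event.data)  # copy so events are never mutated
        elif event.event_type == "DOCUMENT_UPDATED" and state is not None:
            state.update(event.data)  # hypothetical event type
        elif event.event_type == "DOCUMENT_DELETED":
            state = None  # hypothetical event type
    return state


# Usage: two versions of the same document
t0 = datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc)
t1 = datetime(2024, 1, 15, 11, 0, tzinfo=timezone.utc)
history = [
    Event("doc_123", "document", "DOCUMENT_CREATED", t0, {"content": "v1"}),
    Event("doc_123", "document", "DOCUMENT_UPDATED", t1, {"content": "v2"}),
]
print(rebuild_state(history, "doc_123"))              # {'content': 'v2'}
print(rebuild_state(history, "doc_123", at_time=t0))  # {'content': 'v1'}
```

In this sketch historical state is derived, never overwritten, which is the general idea behind time-travel queries.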
🗄️ Event Storage
Storage Options:
1. MongoDB (RECOMMENDED for production)
- ✅ Flexible storage
- ✅ Fast queries
- ✅ Easy to scale
- ✅ Index support
```bash
# Set up MongoDB
EVENT_BACKEND=mongodb
MONGODB_URL=mongodb://localhost:27017/
MONGODB_DATABASE=weaviate_events
```

Installation:

```bash
pip install pymongo
```

2. PostgreSQL (optional, if you already have PostgreSQL infrastructure)
- ✅ ACID transactions
- ✅ SQL queries
- ✅ Good for analytics
- ✅ JSONB support
```bash
EVENT_BACKEND=postgresql
POSTGRESQL_URL=postgresql://user:password@localhost:5432/weaviate_events
```

Installation:

```bash
pip install psycopg2-binary
```

When to use PostgreSQL:
- If you already have PostgreSQL infrastructure
- If you need complex SQL queries for analytics
- If you need ACID transactions
💡 Recommendation: If you already have MongoDB, use it - it's good enough for Event Sourcing.
3. File-based (for development)
- ✅ Simple
- ✅ No database required
- ❌ Doesn't scale
- ❌ Slower for large volumes
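The file backend writes to a `.jsonl` path, i.e. JSON Lines. A minimal sketch of such an append-only store, assuming one event dict per line with `aggregate_id` and `event_type` keys; the `JsonlEventStore` class is hypothetical and not the service's implementation. It also illustrates why this backend slows down with volume: every read is a full scan of the file.

```python
import json
from pathlib import Path
from typing import Any, Dict, Iterator, Optional


class JsonlEventStore:
    """Hypothetical append-only event store: one JSON object per line."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)

    def append(self, event: Dict[str, Any]) -> None:
        # Append mode never rewrites earlier lines, so history is preserved.
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def read(self, aggregate_id: Optional[str] = None,
             event_type: Optional[str] = None) -> Iterator[Dict[str, Any]]:
        # No indexes: each query scans the whole file (O(n) in event count).
        if not self.path.exists():
            return
        with self.path.open(encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines
                event = json.loads(line)
                if aggregate_id is not None and event.get("aggregate_id") != aggregate_id:
                    continue
                if event_type is not None and event.get("event_type") != event_type:
                    continue
                yield event
```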
```bash
EVENT_BACKEND=file
EVENT_FILE_PATH=/tmp/weaviate_events.jsonl
```

📊 How to Perform Audit
1. Get Event History
```http
# All events
GET /events
# Events for a specific document
GET /events?aggregate_id=doc_123
# Events by type
GET /events?event_type=DOCUMENT_CREATED
# Events since a specific time
GET /events?since=2024-01-01T00:00:00Z
# Combined filters
GET /events?aggregate_type=document&event_type=DOCUMENT_CREATED&since=2024-01-01T00:00:00Z&limit=100
```

2. Recover Document State
```http
# Current state
GET /events/{aggregate_id}/state
# State at a specific time
GET /events/{aggregate_id}/state?at_time=2024-01-15T10:30:00Z
```

3. Event Statistics
```http
# General statistics
GET /events/stats
# Statistics by aggregate type
GET /events/stats?aggregate_type=document
# Statistics since a specific time
GET /events/stats?since=2024-01-01T00:00:00Z
```

🎨 UI for Tracking
Recommended Solutions:
1. Grafana + MongoDB/PostgreSQL
- ✅ Ready-to-use dashboards
- ✅ Event visualization
- ✅ Real-time analytics
Configuration:
```yaml
# grafana-datasource.yaml
apiVersion: 1
datasources:
  - name: MongoDB
    type: mongodb
    url: mongodb://localhost:27017
    database: weaviate_events
```

2. Custom Dashboard (React/Vue)
- ✅ Full control
- ✅ Custom visualizations
- ✅ Integration with your system
API Integration Example:
```javascript
// Get events
const events = await fetch('/events?aggregate_id=doc_123')
  .then(r => r.json());
// Statistics
const stats = await fetch('/events/stats?since=2024-01-01T00:00:00Z')
  .then(r => r.json());
```

3. Elasticsearch + Kibana
- ✅ Powerful search
- ✅ Visualization
- ✅ Analytics
Configuration:
- Export events from MongoDB/PostgreSQL to Elasticsearch
- Use Kibana for visualization
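The export step can target Elasticsearch's `_bulk` API, which takes newline-delimited JSON: for each event, an action line followed by the document itself. A minimal sketch that builds such a body from event dicts; the `weaviate-events` index name and the `event_id` field are assumptions, not part of this service.

```python
import json
from typing import Any, Dict, Iterable, List


def to_bulk_ndjson(events: Iterable[Dict[str, Any]], index: str = "weaviate-events") -> str:
    """Build an Elasticsearch _bulk request body from event dicts."""
    lines: List[str] = []
    for event in events:
        action: Dict[str, Any] = {"index": {"_index": index}}
        if "event_id" in event:
            # Hypothetical field: a stable _id makes re-exports overwrite, not duplicate.
            action["index"]["_id"] = event["event_id"]
        lines.append(json.dumps(action))
        lines.append(json.dumps(event))
    # The _bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n" if lines else ""
```

Send the result with `POST /_bulk` and `Content-Type: application/x-ndjson`; with a stable `_id`, running the export twice leaves the index unchanged.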
📝 How to Use Properly
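The snippets in this section call `client.get(path, params=...)` and index into the result as parsed JSON, but `client` itself is not defined in this guide. One minimal sketch of such a client, assuming only the Python standard library and the base URL from the usage example at the end of this guide; the `EventsClient` name is hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional


class EventsClient:
    """Hypothetical thin HTTP client: get() returns the parsed JSON body."""

    def __init__(self, base_url: str, timeout: float = 10.0) -> None:
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def build_url(self, path: str, params: Optional[Dict[str, Any]] = None) -> str:
        url = f"{self.base_url}/{path.lstrip('/')}"
        if params:
            # urlencode percent-escapes characters such as ':' in ISO timestamps.
            url += "?" + urllib.parse.urlencode(params)
        return url

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        # urlopen raises urllib.error.HTTPError on 4xx/5xx responses.
        with urllib.request.urlopen(self.build_url(path, params), timeout=self.timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))


client = EventsClient("http://weaviate-service:8002")
```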
1. Operation Audit
```python
# Assumes client.get() returns the parsed JSON response
# Check who created a document and when
events = client.get("/events", params={
    "aggregate_id": "doc_123",
    "event_type": "DOCUMENT_CREATED"
})
# Check all changes to a document
events = client.get("/events", params={
    "aggregate_id": "doc_123"
})
```

2. Embedding Model Tracking
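```python
from collections import Counter

# Find all documents created with a specific model
events = client.get("/events", params={"event_type": "DOCUMENT_CREATED"})
# Filter on data.embedding_model client-side (or in the UI / via an aggregation)
target_model = "your-model-name"  # placeholder
docs_for_model = [
    e["aggregate_id"] for e in events["events"]
    if e["data"].get("embedding_model") == target_model
]
# Check model changes: count created documents per embedding model
model_counts = Counter(e["data"].get("embedding_model") for e in events["events"])
print(model_counts)
```

3. Search Analytics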
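```python
from collections import Counter

# Fetch recent search events
events = client.get("/events", params={
    "event_type": "SEARCH_PERFORMED",
    "limit": 1000
})
# Most popular queries
query_counts = Counter(e["data"]["query"] for e in events["events"])
print(query_counts.most_common(10))
# Analyze search success rate
for event in events["events"]:
    query = event["data"]["query"]
    results_count = event["data"]["results_count"]
    avg_score = event["data"].get("avg_score", 0)
    print(f"{query}: {results_count} results, avg score: {avg_score}")
```

4. Recovery After Errors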
```python
doc_id = "doc_123"  # the affected document
# Recover the document state at a moment before the error
state = client.get(f"/events/{doc_id}/state", params={
    "at_time": "2024-01-15T10:00:00Z"  # before the error
})
# Check what changed since then
events_after = client.get("/events", params={
    "aggregate_id": doc_id,
    "since": "2024-01-15T10:00:00Z"
})
```

5. Compliance and Regulations
```python
import json

# Export events for audit
events = client.get("/events", params={
    "since": "2024-01-01T00:00:00Z",
    "limit": 10000
})
# Save to archive
with open("audit_export.json", "w", encoding="utf-8") as f:
    json.dump(events, f, indent=2)
```

🔧 Production Configuration
1. MongoDB (RECOMMENDED - sufficient for Event Sourcing)

```bash
# .env.deploy
EVENT_BACKEND=mongodb
MONGODB_URL=mongodb://user:password@mongodb-host:27017/
MONGODB_DATABASE=weaviate_events
```

Recommendations:
- Use a replica set for reliability
- Configure TTL indexes to delete old events automatically (only where retention rules allow it)
- Back up regularly
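A minimal sketch of index setup for these recommendations, assuming each event document stores `aggregate_id`, `aggregate_type`, `event_type` and a BSON date in `timestamp` (all field names are assumptions). Compound indexes cover the `/events` filters, and an optional TTL index provides automatic expiry. The hypothetical `ensure_event_indexes` helper accepts any object with pymongo's `Collection.create_index(keys, **kwargs)` method, so it can be exercised without a server.

```python
from typing import Any, Optional

ASCENDING = 1  # same value as pymongo.ASCENDING


def ensure_event_indexes(collection: Any, ttl_days: Optional[int] = None) -> None:
    """Create indexes matching the /events filters; optionally expire old events."""
    # "Events for aggregate X / of type Y / of aggregate type Z, ordered by time"
    collection.create_index([("aggregate_id", ASCENDING), ("timestamp", ASCENDING)])
    collection.create_index([("event_type", ASCENDING), ("timestamp", ASCENDING)])
    collection.create_index([("aggregate_type", ASCENDING), ("timestamp", ASCENDING)])
    if ttl_days is None:
        # Plain index for "since" queries; events are kept forever.
        collection.create_index([("timestamp", ASCENDING)])
    else:
        # TTL indexes must be single-field and only expire documents whose
        # "timestamp" holds a date; deletion runs in a background task.
        collection.create_index([("timestamp", ASCENDING)],
                                expireAfterSeconds=ttl_days * 86400)
```

With pymongo this could be called as `ensure_event_indexes(MongoClient(MONGODB_URL)[MONGODB_DATABASE]["events"], ttl_days=365)`, where the `events` collection name is hypothetical. Leave `ttl_days` unset when events must be kept for compliance.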
2. PostgreSQL (optional, if you already have PostgreSQL infrastructure)

```bash
# .env.deploy
EVENT_BACKEND=postgresql
POSTGRESQL_URL=postgresql://user:password@postgres-host:5432/weaviate_events
```

Recommendations:
- Partition the events table by date
- Configure automatic archiving of old events
- Run VACUUM regularly (or tune autovacuum)
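Date partitioning and archiving fit together: with PostgreSQL declarative range partitioning, each month lives in its own table that can later be detached and archived. A minimal sketch that generates the DDL for one monthly partition, assuming a hypothetical parent table created with `PARTITION BY RANGE (created_at)`; the `events` table, the `created_at` column and the `monthly_partition_ddl` helper are assumptions, not the service's schema.

```python
from datetime import date


def monthly_partition_ddl(year: int, month: int, parent: str = "events") -> str:
    """Return DDL for one monthly range partition of a hypothetical events table.

    Assumes the parent was created as:
      CREATE TABLE events (...) PARTITION BY RANGE (created_at);
    """
    start = date(year, month, 1)
    # The upper bound is exclusive, so it is the first day of the next month.
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    name = f"{parent}_{year:04d}_{month:02d}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent} "
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )
```

An old month can then be removed from the live table with `ALTER TABLE events DETACH PARTITION events_2024_01;` and dumped or dropped. Because table names are interpolated directly into SQL, `parent` should come from trusted configuration, never from user input.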
3. Monitoring
```python
# Check event store health
health = client.get("/health")
if health["data_management"]["event_sourcing_ready"]:
    print("✅ Event Sourcing ready")
# Check event count
stats = client.get("/events/stats")
print(f"Total events: {stats['total_events']}")
```

📈 Best Practices
- Regularly archive old events (if not needed for compliance)
- Monitor event store size - events accumulate
- Use indexes for fast queries
- Export events for long-term storage
- Configure alerts for event anomalies (for example, sudden spikes or drops in event volume)
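Archiving and long-term export can be as simple as moving events older than a cutoff into compressed JSON Lines files. A minimal sketch, assuming events are dicts with an ISO 8601 `timestamp` field (an assumed field name); the `archive_events` helper is hypothetical and only writes the archive, so deleting archived events from the live store is left to the backend, ideally after the archive has been verified.

```python
import gzip
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def archive_events(events: List[Dict[str, Any]], cutoff: datetime,
                   archive_path: str) -> List[Dict[str, Any]]:
    """Write events older than cutoff to a gzipped JSONL file; return the rest.

    `cutoff` must be timezone-aware, since parsed timestamps are.
    """
    old: List[Dict[str, Any]] = []
    recent: List[Dict[str, Any]] = []
    for event in events:
        # Assumes timestamps like "2024-01-15T10:30:00Z"; "Z" is mapped to
        # "+00:00" because older fromisoformat() versions reject it.
        ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
        (old if ts < cutoff else recent).append(event)
    with gzip.open(archive_path, "wt", encoding="utf-8") as f:
        for event in old:
            f.write(json.dumps(event) + "\n")
    return recent


# Usage: archive everything before 2024
# remaining = archive_events(events, datetime(2024, 1, 1, tzinfo=timezone.utc), "events_2023.jsonl.gz")
```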
🚀 Usage Example
```python
import requests

BASE_URL = "http://weaviate-service:8002"

# 1. Create a document (a DOCUMENT_CREATED event is recorded automatically)
response = requests.post(f"{BASE_URL}/store", json={
    "documents": [{
        "content": "Test document",
        "metadata": {"source": "test.pdf"}
    }]
}, timeout=30)
response.raise_for_status()

# 2. Check events
events = requests.get(f"{BASE_URL}/events", params={
    "event_type": "DOCUMENT_CREATED",
    "limit": 10
}, timeout=30).json()

# 3. Recover document state
doc_id = events["events"][0]["aggregate_id"]
state = requests.get(f"{BASE_URL}/events/{doc_id}/state", timeout=30).json()

# 4. Statistics
stats = requests.get(f"{BASE_URL}/events/stats", timeout=30).json()
print(f"Total events: {stats['total_events']}")
```