# Data Preparation Service

MLflow-based pipeline service for preparing RAG data from GCS PDFs.
## Overview
This service orchestrates the complete data preparation pipeline:
- Download PDFs from Google Cloud Storage
- Process PDFs via pdf-parser-service (split, parse tables/schemas)
- Prepare RAG documents from parsed data
- Store in Weaviate vector database
All steps are tracked using MLflow for monitoring and reproducibility.
## Architecture

```
GCS Bucket (PDFs)
        ↓
Data Preparation Service (MLflow Pipeline)
        ↓
  ├─→ pdf-parser-service (/split-by-documents)
  ├─→ pdf-parser-service (/parse-table, /parse)
  ├─→ pdf-parser-service (/prepare-rag)
  ├─→ pdf-parser-service (/extract-images)
  ├─→ clip-service (/embed, /embed/batch)   # REQUIRED: for image embeddings
  └─→ weaviate-service (/store)
```

## Local Development
### Prerequisites
- Python 3.11+
- pdf-parser-service running (REQUIRED) - for PDF processing (default: http://localhost:8000)
- weaviate-service running (REQUIRED) - for storing documents (default: http://localhost:8002)
- CLIP service running (REQUIRED) - for image embeddings (default: http://localhost:8002)
- MLflow server running (default: http://localhost:5000)
- GCS bucket with PDFs
### Installation

```bash
cd data_preparation_service
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
pip install -r requirements.txt
```

### Environment Variables
Create a .env file:
```bash
GCS_BUCKET_NAME=your-bucket-name
GCS_PREFIX=pdfs/
PDF_PARSER_API_URL=http://localhost:8000   # REQUIRED
WEAVIATE_API_URL=http://localhost:8002     # REQUIRED
CLIP_API_URL=http://localhost:8002         # REQUIRED
MLFLOW_TRACKING_URI=http://localhost:5000
PORT=8003
```

### Running the Service
```bash
uvicorn main:app --reload --port 8003
```

The service will be available at http://localhost:8003.

API documentation: http://localhost:8003/docs
## API Endpoints

### POST /process-pdf
Process a single PDF through the pipeline.
⚠️ Requires CLIP Service: The pipeline extracts and processes images from PDFs. CLIP service must be running and accessible at CLIP_API_URL.
Request:

```json
{
  "pdf_path": "pdfs/manual.pdf",
  "document_number": "SPD00805"  // Optional
}
```

Response:
```json
{
  "run_id": "abc123...",
  "status": "started",
  "pdf_path": "pdfs/manual.pdf",
  "document_number": "SPD00805",
  "message": "Pipeline started. Track progress with run_id: abc123..."
}
```

### POST /process-batch
Process multiple PDFs sequentially (one at a time) to save memory.
⚠️ Memory-efficient: Processes PDFs one by one, not in parallel.
Request:

```json
{
  "pdf_paths": [
    "pdfs/manual1.pdf",
    "pdfs/manual2.pdf"
  ],
  "document_numbers": ["SPD00805", "SPD00806"]  // Optional
}
```

Response:
```json
{
  "parent_run_id": "abc123...",
  "total": 2,
  "status": "started",
  "message": "Batch processing started. Track progress with parent_run_id: abc123..."
}
```

### POST /process-from-gcs
Automatically discover and process all PDFs from GCS bucket.
⚠️ Memory-efficient: Processes PDFs sequentially, one at a time.
Query Parameters:
- `prefix`: GCS prefix to filter PDFs (optional)
- `limit`: Maximum number of PDFs to process (default: 100, max: 1000)

Example:

```
POST /process-from-gcs?prefix=pdfs/manuals&limit=50
```

Response:
```json
{
  "pdfs_found": 50,
  "parent_run_id": "abc123...",
  "status": "started",
  "message": "Processing 50 PDFs sequentially. Track with parent_run_id: abc123..."
}
```

### GET /runs/{run_id}
Get status and metrics for an MLflow run.
Response:
```json
{
  "run_id": "abc123...",
  "status": "FINISHED",
  "start_time": 1234567890,
  "end_time": 1234567900,
  "params": {
    "pdf_path": "pdfs/manual.pdf",
    "document_number": "SPD00805"
  },
  "metrics": {
    "total_pages": 100,
    "documents_processed": 3,
    "rag_documents_created": 150
  }
}
```

### GET /runs
List recent MLflow runs.
Query Parameters:

- `limit`: Maximum number of runs (default: 20)
### GET /health
Health check endpoint.
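For example, a quick liveness probe from Python (assuming the default local port):

```python
import requests

# Probe the service's health endpoint; adjust host/port if the
# service runs elsewhere.
resp = requests.get("http://localhost:8003/health", timeout=5)
print(resp.status_code, resp.json())
```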
## Memory Optimization

**⚠️ Important:** The pipeline processes PDFs one at a time to minimize memory usage:

- ✅ **Sequential processing**: PDFs are processed one by one (not in parallel)
- ✅ **Memory cleanup**: After each PDF, memory is explicitly freed using `del` and `gc.collect()`
- ✅ **No accumulation**: Each PDF is fully processed and stored before moving to the next
This allows processing large batches of PDFs without running out of memory.
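The pattern looks roughly like this (a minimal sketch, not the service's actual code; `process_single_pdf` and `store_result` are hypothetical stand-ins for the parse and storage steps):

```python
import gc

def process_batch(pdf_paths, process_single_pdf, store_result):
    """Process PDFs strictly one at a time, freeing memory between items."""
    for pdf_path in pdf_paths:
        result = process_single_pdf(pdf_path)  # download + parse one PDF
        store_result(result)                   # push documents to storage

        # Drop the only reference to the per-PDF data and force a
        # collection so large buffers are reclaimed before the next PDF.
        del result
        gc.collect()
```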
## Pipeline Steps
The pipeline executes the following steps for each PDF (one at a time):
1. **Download PDF from GCS**
   - Downloads the PDF file from the specified GCS bucket
   - Logs PDF size metric
2. **Split PDF by Documents**
   - Calls the `/split-by-documents` endpoint
   - Groups pages by document number (`SPD[0-9A-Z]+`)
   - Logs total pages and document count
3. **Parse Tables and Schemas**
   - For each page with `has_table` or `has_schema`:
     - Calls `/parse-table` for tables
     - Calls `/parse` for schemas
   - Combines results
4. **Prepare RAG Documents**
   - Calls the `/prepare-rag` endpoint
   - Creates structured RAG documents
5. **Extract and Process Images** (requires CLIP service)
   - Calls the `/extract-images` endpoint on pdf-parser-service
   - For each extracted image:
     - Calls the CLIP service `/embed/batch` to generate image embeddings
     - Prepares image documents with embeddings for Weaviate storage
   - Links images to part numbers from tables when possible
6. **Store in Weaviate**
   - Calls the `/store` endpoint on weaviate-service
   - Stores text documents (from RAG preparation)
   - Stores image documents (with CLIP embeddings)
   - Enables image search via vector similarity
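For orientation, the chain of calls for one PDF can be sketched as below. This is a hypothetical simplification: the actual endpoint contracts (multipart vs. JSON bodies, response field names such as `pages`, `has_table`, `has_schema`) are defined by pdf-parser-service and weaviate-service and may differ in detail.

```python
import requests

PARSER = "http://localhost:8000"    # pdf-parser-service
WEAVIATE = "http://localhost:8002"  # weaviate-service

def run_pipeline_for_pdf(pdf_bytes: bytes) -> None:
    """Sketch of one pipeline iteration; payload shapes are assumptions."""
    # Steps 1-2: split the downloaded PDF into documents by document number.
    split = requests.post(
        f"{PARSER}/split-by-documents", files={"file": pdf_bytes}
    ).json()

    # Step 3: parse tables and schemas on flagged pages.
    for page in split.get("pages", []):
        if page.get("has_table"):
            requests.post(f"{PARSER}/parse-table", json=page)
        if page.get("has_schema"):
            requests.post(f"{PARSER}/parse", json=page)

    # Step 4: build structured RAG documents from the parsed data.
    rag_docs = requests.post(f"{PARSER}/prepare-rag", json=split).json()

    # Step 5 (image extraction + CLIP embeddings) omitted for brevity.

    # Step 6: store the prepared documents in Weaviate.
    requests.post(f"{WEAVIATE}/store", json=rag_docs)
```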
All steps are tracked in MLflow with:
- Parameters (PDF path, document number, etc.)
- Metrics (pages processed, documents created, etc.)
- Status (started, completed, failed)
## MLflow Integration

### Tracking
All pipeline runs are tracked in MLflow with:
- Parameters: PDF path, document number, processing options
- Metrics: Pages processed, documents created, processing time
- Tags: Status, error messages (if any)
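The tracking uses the standard MLflow Python API. A minimal sketch (parameter and metric names taken from the Monitoring section below; values illustrative):

```python
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")

with mlflow.start_run() as run:
    # Parameters: inputs that identify this pipeline run.
    mlflow.log_param("pdf_path", "pdfs/manual.pdf")
    mlflow.log_param("document_number", "SPD00805")

    # Metrics: counters produced while the pipeline executes.
    mlflow.log_metric("total_pages", 100)
    mlflow.log_metric("rag_documents_created", 150)

    # Tags: free-form status/error annotations.
    mlflow.set_tag("status", "completed")
```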
### Viewing Runs

Access the MLflow UI by opening the tracking server in a browser (http://localhost:5000 by default), or use the service endpoints:

- `GET /runs` - List recent runs
- `GET /runs/{run_id}` - Get specific run details
## Deployment to GCP

### Prerequisites

- GCP project with billing enabled
- `gcloud` CLI installed and authenticated
- Docker installed (for local testing)
- Environment variables in `.env.deploy` (project root):
  - `PROJECT_ID` - Your GCP project ID
  - `REGION` - GCP region (default: us-central1)
  - `GCS_BUCKET_NAME=crop-documents` - GCS bucket with PDF documents
  - `GCS_PREFIX` - Optional: prefix for PDF paths in bucket
  - `PDF_PARSER_API_URL` - REQUIRED: URL to the PDF Parser Service (Cloud Run URL or VM IP)
  - `WEAVIATE_API_URL` - REQUIRED: URL to the Weaviate Service (Cloud Run URL or VM IP). Pipelines require Weaviate to store processed documents; the service will not function without it.
  - `CLIP_API_URL` - REQUIRED: URL to the CLIP Service (Cloud Run URL or VM IP:port). The pipeline requires the CLIP service to generate image embeddings; the service will not start if CLIP is unavailable.
  - `MLFLOW_TRACKING_URI` - MLflow tracking server URL
  - `WEAVIATE_URL` - Optional: Direct Weaviate URL
  - `WEAVIATE_API_KEY` - Optional: Weaviate API key
  - `EMBEDDING_MODEL_URL` - Optional: Embedding model service URL
  - `GOOGLE_APPLICATION_CREDENTIALS` - Path to service account JSON (for local testing)
### Deploy to Cloud Run (Recommended)

The FastAPI service runs in a Docker container on GCP Cloud Run:

```bash
cd data_preparation_service
./deploy.sh
```

Or use the general deployment script:

```bash
./scripts/deploy_service.sh data_preparation_service
```

The deployment script will:

1. Build the Docker image using `Dockerfile`
2. Push the image to Google Container Registry
3. Deploy to Cloud Run with production settings (2GB memory, 2 CPU, 10min timeout)
4. Auto-discover service URLs for dependencies
### Local Docker Deployment

For local testing with Docker:

```bash
cd data_preparation_service
docker-compose up --build
```

The service will be available at http://localhost:8003.
### Environment Variables for Cloud Run

Add to `.env.deploy` (project root):

```bash
# Data Preparation Service
GCS_BUCKET_NAME=crop-documents
GCS_PREFIX=   # Optional: prefix for PDF paths in bucket
PDF_PARSER_API_URL=https://pdf-parser-service.run.app   # REQUIRED
WEAVIATE_API_URL=https://weaviate-service.run.app       # REQUIRED
CLIP_API_URL=http://clip-service-vm-ip:8002             # REQUIRED: CLIP service (typically on a GPU VM)
MLFLOW_TRACKING_URI=https://mlflow-service.run.app
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
```

Note: `crop-documents` is the GCS bucket with PDF documents for the offline data preparation pipeline.
See also the Usage Examples section below for offline pipeline documentation.
## Usage Examples

### Process Single PDF
```python
import requests

response = requests.post(
    "http://localhost:8003/process-pdf",
    json={
        "pdf_path": "pdfs/manual.pdf",
        "document_number": "SPD00805"
    }
)
run_id = response.json()["run_id"]
print(f"Pipeline started: {run_id}")

# Check status
status = requests.get(f"http://localhost:8003/runs/{run_id}")
print(status.json())
```
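Because `/process-pdf` returns immediately, callers typically poll `GET /runs/{run_id}` until MLflow reports a terminal status. A small helper along these lines (a sketch; `FINISHED`, `FAILED`, and `KILLED` are MLflow's terminal run statuses):

```python
import time
import requests

BASE = "http://localhost:8003"

def wait_for_run(run_id: str, poll_seconds: float = 5.0) -> dict:
    """Poll the run endpoint until MLflow reports a terminal status."""
    while True:
        run = requests.get(f"{BASE}/runs/{run_id}", timeout=10).json()
        if run["status"] in ("FINISHED", "FAILED", "KILLED"):
            return run
        time.sleep(poll_seconds)
```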
### Process Batch (Sequential)

```python
import requests

response = requests.post(
    "http://localhost:8003/process-batch",
    json={
        "pdf_paths": [
            "pdfs/manual1.pdf",
            "pdfs/manual2.pdf"
        ]
    }
)
parent_run_id = response.json()["parent_run_id"]
print(f"Batch started: {parent_run_id}")

# Check status
status = requests.get(f"http://localhost:8003/runs/{parent_run_id}")
print(status.json())
```

### Process All PDFs from GCS
```python
import requests

# Automatically find and process all PDFs
response = requests.post(
    "http://localhost:8003/process-from-gcs",
    params={
        "prefix": "pdfs/manuals",  # Optional
        "limit": 100               # Optional, default: 100
    }
)
print(f"Found {response.json()['pdfs_found']} PDFs")
print(f"Parent run ID: {response.json()['parent_run_id']}")
```

## Monitoring
### MLflow Metrics

Key metrics tracked:

- `pdf_size_bytes`: Size of downloaded PDF
- `total_pages`: Total pages in PDF
- `document_count`: Number of documents found
- `documents_processed`: Documents successfully processed
- `pages_processed`: Pages successfully processed
- `rag_documents_created`: RAG documents created and stored
- `images_stored`: Images with CLIP embeddings stored in Weaviate
- `images_page_{N}`: Number of images found on page N
### Error Handling

Errors are logged in MLflow as:

- Parameters: `error_page_{page_num}`, `weaviate_error_page_{page_num}`
- Status: `failed`, with the error message
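A per-page failure can be recorded with the same MLflow API, roughly like this (a sketch following the naming patterns above, not the service's exact code):

```python
import mlflow

def log_page_error(page_num: int, exc: Exception) -> None:
    # Record the failure against the active run; truncate long
    # messages defensively to stay within param value limits.
    mlflow.log_param(f"error_page_{page_num}", str(exc)[:250])
    mlflow.set_tag("status", "failed")
```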
## Required Services
⚠️ Important: The following services are mandatory for the data preparation pipelines to function:
1. **PDF Parser Service** (REQUIRED)
   - Purpose: Parses PDFs; extracts tables, schemas, and images
   - Required for: All pipeline operations (splitting PDFs, parsing, RAG preparation)
   - If unavailable: Pipelines will fail when trying to process PDFs
2. **Weaviate Service** (REQUIRED)
   - Purpose: Vector database for storing processed documents and embeddings
   - Required for: Storing all processed data (text documents, image documents)
   - If unavailable: Pipelines cannot store processed documents and will fail
   - Setup: Typically runs on a GPU VM (see `weaviate_service/deploy_gpu_vm.sh`)
3. **CLIP Service** (REQUIRED)
   - Purpose: Generates embeddings for images extracted from PDFs
   - Required for: Image processing and semantic image search
   - Service startup: The service will not start if the CLIP service is unavailable or `CLIP_API_URL` is not set
   - Setup: Typically runs on a GPU VM (see `clip_service/deploy_gpu_vm.sh`)
   - Set `CLIP_API_URL` to the CLIP service URL (e.g., `http://clip-service-vm-ip:8002`)
   - The service verifies CLIP availability at startup and during pipeline execution
### What Happens Without Required Services

If any required service is not available:

- ❌ **CLIP**: Service startup will fail with the error `CLIP service is required but failed to initialize`
- ❌ **Weaviate**: Pipelines cannot store documents and will fail during storage operations
- ❌ **PDF Parser**: Pipelines cannot process PDFs and will fail during parsing operations

All three services must be deployed and accessible before running the data preparation pipelines.
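A fail-fast startup check along these lines would enforce the CLIP requirement (a hypothetical sketch: the CLIP service's `/health` route is an assumption, and this service's actual verification logic may differ):

```python
import os
import requests

def verify_clip_or_die() -> None:
    """Abort startup if the CLIP service is unreachable or unconfigured."""
    clip_url = os.environ.get("CLIP_API_URL")
    if not clip_url:
        raise RuntimeError("CLIP service is required but failed to initialize")
    try:
        # Assumes the CLIP service exposes a health route.
        requests.get(f"{clip_url}/health", timeout=5).raise_for_status()
    except requests.RequestException as exc:
        raise RuntimeError(
            "CLIP service is required but failed to initialize"
        ) from exc
```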
## Related Documentation
- PDF Parser Service - PDF parsing endpoints (REQUIRED)
- Weaviate Service - Vector database service (REQUIRED)
- CLIP Service - Image embedding service (REQUIRED)
- MLflow Documentation - MLflow framework