Data Preparation Service

MLflow-based pipeline service for preparing RAG data from GCS PDFs.

Overview

This service orchestrates the complete data preparation pipeline:

  1. Download PDFs from Google Cloud Storage
  2. Process PDFs via pdf-parser-service (split, parse tables/schemas)
  3. Prepare RAG documents from parsed data
  4. Store in Weaviate vector database

All steps are tracked using MLflow for monitoring and reproducibility.

Architecture

GCS Bucket (PDFs)

Data Preparation Service (MLflow Pipeline)

    ├─→ pdf-parser-service (/split-by-documents)
    ├─→ pdf-parser-service (/parse-table, /parse)
    ├─→ pdf-parser-service (/prepare-rag)
    ├─→ pdf-parser-service (/extract-images)
    ├─→ clip-service (/embed, /embed/batch)  # REQUIRED: for image embeddings
    └─→ weaviate-service (/store)

Local Development

Prerequisites

  • Python 3 with venv and pip
  • Running, reachable instances of pdf-parser-service, weaviate-service, clip-service, and an MLflow tracking server (see Required Services below)

Installation

cd data_preparation_service
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
pip install -r requirements.txt

Environment Variables

Create a .env file:

GCS_BUCKET_NAME=your-bucket-name
GCS_PREFIX=pdfs/
PDF_PARSER_API_URL=http://localhost:8000  # REQUIRED
WEAVIATE_API_URL=http://localhost:8002    # REQUIRED
CLIP_API_URL=http://localhost:8002        # REQUIRED
MLFLOW_TRACKING_URI=http://localhost:5000
PORT=8003

Running the Service

uvicorn main:app --reload --port 8003

The service will be available at http://localhost:8003

API documentation: http://localhost:8003/docs
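
Once the service is up, a quick health check confirms it is reachable (a minimal sketch using requests against the GET /health endpoint documented below):

import requests

# Smoke test: the service should respond before any pipeline jobs are submitted
resp = requests.get("http://localhost:8003/health")
resp.raise_for_status()
print(resp.json())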

API Endpoints

POST /process-pdf

Process a single PDF through the pipeline.

⚠️ Requires CLIP Service: The pipeline extracts and processes images from PDFs. CLIP service must be running and accessible at CLIP_API_URL.

Request:

{
  "pdf_path": "pdfs/manual.pdf",
  "document_number": "SPD00805"  // Optional
}

Response:

{
  "run_id": "abc123...",
  "status": "started",
  "pdf_path": "pdfs/manual.pdf",
  "document_number": "SPD00805",
  "message": "Pipeline started. Track progress with run_id: abc123..."
}

POST /process-batch

Process multiple PDFs sequentially (one at a time) to save memory.

⚠️ Memory-efficient: Processes PDFs one by one, not in parallel.

Request:

{
  "pdf_paths": [
    "pdfs/manual1.pdf",
    "pdfs/manual2.pdf"
  ],
  "document_numbers": ["SPD00805", "SPD00806"]  // Optional
}

Response:

{
  "parent_run_id": "abc123...",
  "total": 2,
  "status": "started",
  "message": "Batch processing started. Track progress with parent_run_id: abc123..."
}

POST /process-from-gcs

Automatically discover and process all PDFs from the GCS bucket.

⚠️ Memory-efficient: Processes PDFs sequentially, one at a time.

Query Parameters:

  • prefix: GCS prefix to filter PDFs (optional)
  • limit: Maximum number of PDFs to process (default: 100, max: 1000)

Example:

POST /process-from-gcs?prefix=pdfs/manuals&limit=50

Response:

{
  "pdfs_found": 50,
  "parent_run_id": "abc123...",
  "status": "started",
  "message": "Processing 50 PDFs sequentially. Track with parent_run_id: abc123..."
}

GET /runs/{run_id}

Get status and metrics for an MLflow run.

Response:

{
  "run_id": "abc123...",
  "status": "FINISHED",
  "start_time": 1234567890,
  "end_time": 1234567900,
  "params": {
    "pdf_path": "pdfs/manual.pdf",
    "document_number": "SPD00805"
  },
  "metrics": {
    "total_pages": 100,
    "documents_processed": 3,
    "rag_documents_created": 150
  }
}

GET /runs

List recent MLflow runs.

Query Parameters:

  • limit: Maximum number of runs (default: 20)
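
For example, to fetch the five most recent runs (a minimal sketch; it prints the raw JSON since the exact response shape is not documented here):

import requests

# List the 5 most recent MLflow runs tracked by the service
resp = requests.get("http://localhost:8003/runs", params={"limit": 5})
resp.raise_for_status()
print(resp.json())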

GET /health

Health check endpoint.

Memory Optimization

⚠️ Important: The pipeline processes PDFs one at a time to minimize memory usage:

  • Sequential processing: PDFs are processed one by one (not in parallel)
  • Memory cleanup: After each PDF, memory is explicitly freed using del and gc.collect()
  • No accumulation: Each PDF is fully processed and stored before moving to the next

This allows processing large batches of PDFs without running out of memory.
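
A minimal sketch of the pattern (process_pdf and store_documents are hypothetical placeholders for the real pipeline steps, not the service's actual functions):

import gc

def process_pdf(pdf_path):
    # Placeholder for the real steps: download, split, parse, prepare RAG docs
    return [{"path": pdf_path}]

def store_documents(documents):
    # Placeholder for the weaviate-service /store call
    pass

def process_sequentially(pdf_paths):
    for pdf_path in pdf_paths:
        documents = process_pdf(pdf_path)
        store_documents(documents)
        # Explicitly free per-PDF data so large batches never accumulate in memory
        del documents
        gc.collect()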

Pipeline Steps

The pipeline executes the following steps for each PDF (one at a time):

  1. Download PDF from GCS

    • Downloads PDF file from specified GCS bucket
    • Logs PDF size metric
  2. Split PDF by Documents

    • Calls /split-by-documents endpoint
    • Groups pages by document number (SPD[0-9A-Z]+)
    • Logs total pages and document count
  3. Parse Tables and Schemas

    • For each page with has_table or has_schema:
      • Calls /parse-table for tables
      • Calls /parse for schemas
      • Combines results
  4. Prepare RAG Documents

    • Calls /prepare-rag endpoint
    • Creates structured RAG documents
  5. Extract and Process Images (REQUIRES CLIP Service)

    • Calls /extract-images endpoint on pdf-parser-service
    • For each extracted image:
      • Calls CLIP service /embed/batch to generate image embeddings
      • Prepares image documents with embeddings for Weaviate storage
    • Links images to part numbers from tables when possible
  6. Store in Weaviate

    • Calls /store endpoint on weaviate-service
    • Stores text documents (from RAG preparation)
    • Stores image documents (with CLIP embeddings)
    • Enables image search via vector similarity

All steps are tracked in MLflow with:

  • Parameters (PDF path, document number, etc.)
  • Metrics (pages processed, documents created, etc.)
  • Status (started, completed, failed)
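
A minimal sketch of this tracking pattern with the standard mlflow API (the parameter and metric names mirror those listed above; this is illustrative, not the service's actual code):

import mlflow

mlflow.set_tracking_uri("http://localhost:5000")

with mlflow.start_run(run_name="process_pdf") as run:
    mlflow.log_param("pdf_path", "pdfs/manual.pdf")
    mlflow.log_param("document_number", "SPD00805")
    # ... pipeline steps execute here ...
    mlflow.log_metric("total_pages", 100)
    mlflow.log_metric("rag_documents_created", 150)
    mlflow.set_tag("status", "completed")
    print(f"Tracked run: {run.info.run_id}")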

MLflow Integration

Tracking

All pipeline runs are tracked in MLflow with:

  • Parameters: PDF path, document number, processing options
  • Metrics: Pages processed, documents created, processing time
  • Tags: Status, error messages (if any)

Viewing Runs

Access the MLflow UI by opening the tracking server URL (MLFLOW_TRACKING_URI) in a browser:

http://localhost:5000

Or use the service endpoints:

  • GET /runs - List recent runs
  • GET /runs/{run_id} - Get specific run details

Deployment to GCP

Prerequisites

  1. GCP project with billing enabled
  2. gcloud CLI installed and authenticated
  3. Docker installed (for local testing)
  4. Environment variables in .env.deploy (project root):
    • PROJECT_ID - Your GCP project ID
    • REGION - GCP region (default: us-central1)
    • GCS_BUCKET_NAME - GCS bucket with PDF documents (crop-documents for this project)
    • GCS_PREFIX - Optional: prefix for PDF paths in bucket
    • PDF_PARSER_API_URL - REQUIRED: URL to PDF Parser Service (Cloud Run URL or VM IP)
    • WEAVIATE_API_URL - REQUIRED: URL to Weaviate Service (Cloud Run URL or VM IP). Pipelines store all processed documents in Weaviate; the service will not function without it.
    • CLIP_API_URL - REQUIRED: URL to CLIP Service (Cloud Run URL or VM IP:port). The pipeline needs CLIP to generate image embeddings; the service will not start if CLIP is unavailable.
    • MLFLOW_TRACKING_URI - MLflow tracking server URL
    • WEAVIATE_URL - Optional: Direct Weaviate URL
    • WEAVIATE_API_KEY - Optional: Weaviate API key
    • EMBEDDING_MODEL_URL - Optional: Embedding model service URL
    • GOOGLE_APPLICATION_CREDENTIALS - Path to service account JSON (for local testing)

The FastAPI service runs in a Docker container on GCP Cloud Run. Deploy with:

cd data_preparation_service
./deploy.sh

Or use the general deployment script:

./scripts/deploy_service.sh data_preparation_service

The deployment script will:

  1. Build Docker image using Dockerfile
  2. Push image to Google Container Registry
  3. Deploy to Cloud Run with production settings (2GB memory, 2 CPU, 10min timeout)
  4. Auto-discover service URLs for dependencies

Local Docker Deployment

For local testing with Docker:

cd data_preparation_service
docker-compose up --build

The service will be available at http://localhost:8003

Environment Variables for Cloud Run

Add to .env.deploy (project root):

# Data Preparation Service
GCS_BUCKET_NAME=crop-documents
GCS_PREFIX=  # Optional: prefix for PDF paths in bucket
PDF_PARSER_API_URL=https://pdf-parser-service.run.app  # REQUIRED
WEAVIATE_API_URL=https://weaviate-service.run.app      # REQUIRED
CLIP_API_URL=http://clip-service-vm-ip:8002            # REQUIRED: CLIP service (typically on GPU VM)
MLFLOW_TRACKING_URI=https://mlflow-service.run.app
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json

Note: crop-documents is the GCS bucket with PDF documents for the offline data preparation pipeline.

See the Usage Examples section below for offline pipeline documentation.

Usage Examples

Process Single PDF

import requests

response = requests.post(
    "http://localhost:8003/process-pdf",
    json={
        "pdf_path": "pdfs/manual.pdf",
        "document_number": "SPD00805"
    }
)

run_id = response.json()["run_id"]
print(f"Pipeline started: {run_id}")

# Check status
status = requests.get(f"http://localhost:8003/runs/{run_id}")
print(status.json())

Process Batch (Sequential)

import requests

response = requests.post(
    "http://localhost:8003/process-batch",
    json={
        "pdf_paths": [
            "pdfs/manual1.pdf",
            "pdfs/manual2.pdf"
        ]
    }
)

parent_run_id = response.json()["parent_run_id"]
print(f"Batch started: {parent_run_id}")

# Check status
status = requests.get(f"http://localhost:8003/runs/{parent_run_id}")
print(status.json())

Process All PDFs from GCS

import requests

# Automatically find and process all PDFs
response = requests.post(
    "http://localhost:8003/process-from-gcs",
    params={
        "prefix": "pdfs/manuals",  # Optional
        "limit": 100  # Optional, default: 100
    }
)

print(f"Found {response.json()['pdfs_found']} PDFs")
print(f"Parent run ID: {response.json()['parent_run_id']}")

Monitoring

MLflow Metrics

Key metrics tracked:

  • pdf_size_bytes: Size of downloaded PDF
  • total_pages: Total pages in PDF
  • document_count: Number of documents found
  • documents_processed: Documents successfully processed
  • pages_processed: Pages successfully processed
  • rag_documents_created: RAG documents created and stored
  • images_stored: Images with CLIP embeddings stored in Weaviate
  • images_page_{N}: Number of images found on page N

Error Handling

Errors are logged in MLflow as:

  • Parameters: error_page_{page_num}, weaviate_error_page_{page_num}
  • Status: failed with error message
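
For example, a finished run can be inspected for page-level errors through the GET /runs/{run_id} endpoint (a sketch against the response format shown earlier; run_id is a placeholder):

import requests

run_id = "abc123"  # replace with a real run_id
run = requests.get(f"http://localhost:8003/runs/{run_id}").json()

print("status:", run["status"])
print("metrics:", run.get("metrics", {}))

# Page-level errors appear as params named error_page_{N} or weaviate_error_page_{N}
errors = {k: v for k, v in run.get("params", {}).items() if "error_page_" in k}
if errors:
    print("page errors:", errors)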

Required Services

⚠️ Important: The following services are mandatory for the data preparation pipelines to function:

1. PDF Parser Service (REQUIRED)

  • Purpose: Parses PDFs, extracts tables, schemas, and images
  • Required for: All pipeline operations (splitting PDFs, parsing, RAG preparation)
  • If unavailable: Pipelines will fail when trying to process PDFs

2. Weaviate Service (REQUIRED)

  • Purpose: Vector database for storing processed documents and embeddings
  • Required for: Storing all processed data (text documents, image documents)
  • If unavailable: Pipelines cannot store processed documents and will fail
  • Setup: Typically runs on a GPU VM (see weaviate_service/deploy_gpu_vm.sh)

3. CLIP Service (REQUIRED)

  • Purpose: Generates embeddings for images extracted from PDFs
  • Required for: Image processing and semantic image search
  • Service Startup: The service will not start if CLIP service is unavailable or CLIP_API_URL is not set
  • Setup: Typically runs on a GPU VM (see clip_service/deploy_gpu_vm.sh)
  • Set CLIP_API_URL to the CLIP service URL (e.g., http://clip-service-vm-ip:8002)
  • The service verifies CLIP availability at startup and during pipeline execution

What Happens Without Required Services

If any required service is not available:

  • CLIP: Service startup will fail with error: CLIP service is required but failed to initialize
  • Weaviate: Pipelines cannot store documents and will fail during storage operations
  • PDF Parser: Pipelines cannot process PDFs and will fail during parsing operations

All three services must be deployed and accessible before running the data preparation pipelines.
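
A preflight check like the following can confirm all three dependencies are reachable before submitting jobs (a sketch that assumes each dependency exposes a /health endpoint, which this README only documents for the data preparation service itself):

import os
import requests

# Base URLs fall back to the local development defaults from this README
services = {
    "pdf-parser": os.environ.get("PDF_PARSER_API_URL", "http://localhost:8000"),
    "weaviate": os.environ.get("WEAVIATE_API_URL", "http://localhost:8002"),
    "clip": os.environ.get("CLIP_API_URL", "http://localhost:8002"),
}

for name, base_url in services.items():
    try:
        requests.get(f"{base_url}/health", timeout=5).raise_for_status()
        print(f"{name}: OK")
    except requests.RequestException as exc:
        print(f"{name}: UNAVAILABLE ({exc})")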
