
Architecture Evolution Plan: MongoDB-First with Vector DB Adapter

A three-stage evolution from the current Elasticsearch/Weaviate setup to a consolidated MongoDB Atlas cluster, with an optional Pinecone migration path via an adapter pattern.

Overview

A phased approach to consolidating three databases (MongoDB + Elasticsearch + Weaviate) into a single MongoDB Atlas cluster, with a clean adapter layer that allows adding a dedicated vector DB later if scale demands it.

graph LR
  subgraph "Stage 1 — Now"
    A["MongoDB Atlas<br/>(all data + vectors)"]
  end
  subgraph "Stage 2 — When needed"
    B["MongoDB Atlas<br/>(data + full-text)"]
    C["Pinecone<br/>(vectors only)"]
  end
  subgraph "Stage 3 — AI-core"
    D["MongoDB Atlas"]
    E["Vector DB"]
    F["Reranker Model"]
  end

  A -->|"> 50M vectors<br/>or < 20ms p99"| B
  A -->|"> 50M vectors<br/>or < 20ms p99"| C
  B -->|"AI becomes<br/>core product"| D
  C -->|"AI becomes<br/>core product"| E
  D --> F
  E --> F

Stage 1: MongoDB as Single Database (Current Target)

All data lives in MongoDB Atlas with built-in search capabilities.

graph TD
  subgraph "MongoDB Atlas"
    parts["parts<br/><small>catalog + embeddings</small>"]
    chunks["document_chunks<br/><small>RAG chunks + embeddings</small>"]
    manual["manual_parts<br/><small>part numbers from PDFs</small>"]
    images["part_images<br/><small>metadata + CLIP embeddings</small>"]
    docs["pl_documents<br/><small>PDF metadata</small>"]
    pages["pl_document_pages<br/><small>page-level data</small>"]
    convos["conversations<br/><small>chat + turn embeddings</small>"]
    fitment["equipment_fitment<br/><small>parts → equipment</small>"]
    ship["shipments"]
    rates["rate_queries"]
    track["tracking"]
    vapi["vapi_calls"]
  end

What MongoDB Replaces

| MongoDB Capability | Replaces | Use Case |
|---|---|---|
| Atlas Vector Search | Weaviate | Semantic search, RAG retrieval, image similarity |
| Atlas Search | Elasticsearch | Full-text product search with fuzzy, boost, filters |
| Metadata filters | Weaviate filters | Provider, page_type, brand, stock status filtering |
| Hybrid search | Weaviate hybrid | Combined vector + full-text in aggregation pipeline |
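The hybrid-search row above can be sketched as a pipeline builder. This is a minimal sketch: the index names (`vector_index`, `text_index`) and the `embedding` field are assumptions, and merging the two result sets via `$unionWith` with a max-score dedup is one simple approach; production hybrid search usually normalizes the two score scales first (e.g., reciprocal rank fusion), since `vectorSearchScore` and `searchScore` are not directly comparable.

```python
from typing import Any, Dict, List


def build_hybrid_pipeline(
    collection: str,
    query_text: str,
    query_embedding: List[float],
    limit: int = 10,
    vector_index: str = "vector_index",  # assumed index name
    text_index: str = "text_index",      # assumed index name
    vector_field: str = "embedding",     # assumed field name
) -> List[Dict[str, Any]]:
    """Combine $vectorSearch and Atlas $search results: run both,
    union them, and keep the best score seen per document."""
    return [
        {"$vectorSearch": {
            "index": vector_index,
            "path": vector_field,
            "queryVector": query_embedding,
            "numCandidates": limit * 10,  # oversample for better recall
            "limit": limit,
        }},
        {"$addFields": {"score": {"$meta": "vectorSearchScore"}}},
        {"$unionWith": {
            "coll": collection,
            "pipeline": [
                {"$search": {"index": text_index,
                             "text": {"query": query_text, "path": {"wildcard": "*"}}}},
                {"$limit": limit},
                {"$addFields": {"score": {"$meta": "searchScore"}}},
            ],
        }},
        # Deduplicate documents that matched both branches.
        {"$group": {"_id": "$_id", "score": {"$max": "$score"},
                    "doc": {"$first": "$$ROOT"}}},
        {"$sort": {"score": -1}},
        {"$limit": limit},
    ]
```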

When MongoDB Is Sufficient

MongoDB handles this workload well when:

  • < 10–30M embeddings — current system has ~1–5M vectors across all collections
  • Latency ~50–200ms — acceptable for RAG, product search, agent queries
  • Simple semantic search — nearest-neighbor with metadata filters

This profile covers the large majority of AI search workloads, which makes MongoDB the right starting point.


Stage 2: Add Specialized Vector DB (When Needed)

Add Pinecone (or Qdrant) only when real problems appear:

| Trigger | Threshold | Current State |
|---|---|---|
| Too many embeddings | > 50–100M vectors | Far below |
| Latency requirements | < 20ms p99 | Not required |
| Complex AI search pipelines | Hybrid + reranking + clustering | Not yet needed |
| Recommendation systems | Real-time similarity at scale | Not in scope |

Architecture with Vector DB

sequenceDiagram
    participant U as User Query
    participant E as Embedding Service<br/>(BGE / CLIP)
    participant P as Pinecone<br/>(vectors)
    participant M as MongoDB<br/>(metadata)
    participant L as LLM<br/>(Claude / vLLM)

    U->>E: Query text
    E->>P: Query vector
    E->>M: Query filters
    P-->>L: Vector results
    M-->>L: Metadata results
    L-->>U: Response

Responsibility Split

MongoDB remains the primary database:

  • Documents and metadata
  • Users and permissions
  • Agent memory (long-term)
  • Business data (orders, shipments, tracking)
  • Full-text search (Atlas Search)

Pinecone handles vectors only:

  • Document chunk embeddings (1024d BGE)
  • Product embeddings (512d)
  • Image embeddings (512d CLIP)
  • Fast ANN with managed scaling
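The split implies a two-step read path: fetch ids and scores from Pinecone, then hydrate full documents from MongoDB in a single `$in` query. A minimal sketch of the merge step, assuming Pinecone-style match dicts (`{"id": ..., "score": ...}`) and a `docs_by_id` map built from the MongoDB round trip:

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class EnrichedHit:
    id: str
    score: float
    doc: Dict[str, Any]


def enrich_hits(
    hits: List[Dict[str, Any]],             # vector-store matches: {"id": ..., "score": ...}
    docs_by_id: Dict[str, Dict[str, Any]],  # full documents fetched from MongoDB by _id
) -> List[EnrichedHit]:
    """Join vector-store hits with their full MongoDB documents,
    preserving the vector ranking and dropping ids with no document."""
    out: List[EnrichedHit] = []
    for h in hits:
        doc = docs_by_id.get(h["id"])
        if doc is not None:
            out.append(EnrichedHit(id=h["id"], score=h["score"], doc=doc))
    return out
```

In practice `docs_by_id` comes from one `find({"_id": {"$in": ids}})` call, so enrichment adds a single extra round trip per query.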

Why Pinecone

| Advantage | Detail |
|---|---|
| Managed service | No index management, no infra ops |
| Fast ANN | Sub-20ms at 100M+ vectors |
| Easy scaling | Serverless or pod-based, auto-scales |
| Metadata filtering | Native support for hybrid queries |
| No operational burden | Unlike self-hosted Weaviate/Qdrant |

Stage 3: Full AI Search Pipeline

When AI search becomes a core product, add a reranker between retrieval and the LLM.

flowchart TD
    Q["User Query"] --> E["Embedding Service"]
    E --> VDB["Vector DB<br/>(retrieval)"]
    E --> M["MongoDB<br/>(metadata)"]
    VDB --> Merge["Merge Results"]
    M --> Merge
    Merge --> R["Reranker Model<br/>(cross-encoder)"]
    R --> LLM["LLM"]
    LLM --> A["Answer"]

The reranker is a cross-encoder model (e.g., Cohere Rerank or BGE-reranker) that rescores the merged candidate set before it reaches the LLM, which typically improves RAG answer quality.
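The reranker stage can be kept model-agnostic by injecting the scoring function. A sketch (the `rerank` helper is hypothetical, and the commented `CrossEncoder` usage assumes the sentence-transformers package):

```python
from typing import Callable, List, Tuple


def rerank(
    query: str,
    candidates: List[str],
    score_fn: Callable[[str, str], float],
    top_k: int = 5,
) -> List[Tuple[str, float]]:
    """Generic rerank step: score each (query, candidate) pair with a
    cross-encoder-style scorer and return the top_k by score."""
    scored = [(text, score_fn(query, text)) for text in candidates]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


# With a real model this might look like (assumes sentence-transformers installed):
# from sentence_transformers import CrossEncoder
# model = CrossEncoder("BAAI/bge-reranker-base")
# reranked = rerank(q, chunks, lambda q, t: float(model.predict([(q, t)])[0]))
```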


Agent Memory Architecture

For AI agents (CROP-Agents), use a tiered memory system:

flowchart LR
    subgraph Agent["AI Agent"]
        ST["Short-term memory<br/><small>current session, working context</small>"]
        LT["Long-term memory<br/><small>semantic recall, past conversations</small>"]
        KB["Knowledge base<br/><small>products, manuals, documents, rules</small>"]
    end

    ST -->|"TTL-based, fast reads"| Redis
    LT -->|"permanent"| VectorDB["Vector DB<br/>(MongoDB Vector Search<br/>or Pinecone)"]
    KB -->|"permanent"| MongoDB["MongoDB<br/>(Atlas Search +<br/>structured queries)"]

| Memory Tier | Store | TTL | Purpose |
|---|---|---|---|
| Short-term | Redis | Session duration | Current conversation context, tool results, scratchpad |
| Long-term | Vector DB (Mongo or Pinecone) | Permanent | Semantic recall of past interactions, learned preferences |
| Knowledge base | MongoDB | Permanent | Products, manuals, documents, business rules |
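The short-term tier can be sketched as a thin wrapper over any redis-py-compatible client. This is a sketch, not the agents' actual implementation: the `agent:session:` key prefix and the 30-minute default TTL are assumptions.

```python
import json
from typing import Any, Dict, Optional


class ShortTermMemory:
    """Session-scoped scratchpad with TTL-based expiry. `client` is any
    object exposing setex/get/delete (in production, e.g. redis.Redis)."""

    def __init__(self, client: Any, ttl_seconds: int = 1800):
        self.client = client
        self.ttl = ttl_seconds

    @staticmethod
    def key(session_id: str) -> str:
        return f"agent:session:{session_id}"

    def save(self, session_id: str, context: Dict[str, Any]) -> None:
        # setex writes the value and resets the TTL on every save,
        # so memory expires only after the session goes idle
        self.client.setex(self.key(session_id), self.ttl, json.dumps(context))

    def load(self, session_id: str) -> Optional[Dict[str, Any]]:
        raw = self.client.get(self.key(session_id))
        return json.loads(raw) if raw else None

    def clear(self, session_id: str) -> None:
        self.client.delete(self.key(session_id))
```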

Critical Design Pattern: Vector Store Adapter Layer

Do not couple application code to any specific vector database.

Interface Definition

from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from dataclasses import dataclass


@dataclass
class VectorSearchResult:
    id: str
    score: float
    metadata: Dict[str, Any]
    text: Optional[str] = None


class VectorStore(ABC):
    """Abstract vector store interface.
    Switch from MongoDB to Pinecone (or vice versa) in 1 day."""

    @abstractmethod
    async def upsert(
        self,
        collection: str,
        ids: List[str],
        embeddings: List[List[float]],
        metadata: List[Dict[str, Any]],
        texts: Optional[List[str]] = None,
    ) -> int: ...

    @abstractmethod
    async def search(
        self,
        collection: str,
        query_embedding: List[float],
        limit: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        min_score: float = 0.0,
    ) -> List[VectorSearchResult]: ...

    @abstractmethod
    async def hybrid_search(
        self,
        collection: str,
        query_text: str,
        query_embedding: List[float],
        limit: int = 10,
        alpha: float = 0.5,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[VectorSearchResult]: ...

    @abstractmethod
    async def delete(self, collection: str, ids: List[str]) -> int: ...

    @abstractmethod
    async def health_check(self) -> bool: ...

MongoDB Implementation (Stage 1)

from motor.motor_asyncio import AsyncIOMotorClient


class MongoVectorStore(VectorStore):
    """Uses the $vectorSearch aggregation stage for semantic search
    and Atlas Search for the full-text component of hybrid search."""

    def __init__(self, mongo_client: AsyncIOMotorClient, database: str):
        self.db = mongo_client[database]

    async def search(self, collection, query_embedding, limit=10, filters=None, min_score=0.0):
        pipeline = [
            {
                "$vectorSearch": {
                    "index": self._get_index_name(collection),
                    "path": self._get_vector_field(collection),
                    "queryVector": query_embedding,
                    "numCandidates": limit * 10,  # oversample for better recall
                    "limit": limit,
                    **({"filter": self._build_filter(filters)} if filters else {}),
                }
            },
            {"$addFields": {"score": {"$meta": "vectorSearchScore"}}},
        ]
        if min_score > 0:
            pipeline.append({"$match": {"score": {"$gte": min_score}}})
        return [
            VectorSearchResult(
                id=str(doc["_id"]),
                score=doc["score"],
                metadata={k: v for k, v in doc.items() if k not in ("_id", "score")},
            )
            async for doc in self.db[collection].aggregate(pipeline)
        ]

Pinecone Implementation (Stage 2)

from pinecone import Pinecone


class PineconeVectorStore(VectorStore):
    """Vectors live in Pinecone; per-vector metadata is minimal (IDs + filter fields).
    Full metadata stays in MongoDB — results are enriched after retrieval."""

    def __init__(self, api_key: str, environment: str):
        self.pc = Pinecone(api_key=api_key)
        self.environment = environment  # kept for factory compatibility

    async def search(self, collection, query_embedding, limit=10, filters=None, min_score=0.0):
        index = self._get_index(collection)
        response = index.query(  # note: the Pinecone client call is synchronous
            vector=query_embedding,
            top_k=limit,
            filter=self._build_pinecone_filter(filters) if filters else None,
            include_metadata=True,
        )
        return [
            VectorSearchResult(id=m.id, score=m.score, metadata=dict(m.metadata or {}))
            for m in response.matches
            if m.score >= min_score
        ]

Factory — Switching Is a Config Change

from enum import Enum


class VectorStoreType(str, Enum):
    MONGO = "mongo"
    PINECONE = "pinecone"


def create_vector_store(store_type: VectorStoreType, **kwargs) -> VectorStore:
    if store_type == VectorStoreType.MONGO:
        return MongoVectorStore(mongo_client=kwargs["mongo_client"], database=kwargs["database"])
    if store_type == VectorStoreType.PINECONE:
        return PineconeVectorStore(api_key=kwargs["api_key"], environment=kwargs["environment"])
    raise ValueError(f"Unsupported vector store type: {store_type}")
Environment configuration per stage:

# Stage 1
VECTOR_STORE_TYPE=mongo

# Stage 2
VECTOR_STORE_TYPE=pinecone
PINECONE_API_KEY=...
PINECONE_ENVIRONMENT=...
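At startup a service might validate this configuration before building the store. A sketch with a hypothetical `resolve_store_type` helper that fails fast on typos rather than silently falling back to a default:

```python
import os
from typing import Mapping, Optional

VALID_STORE_TYPES = {"mongo", "pinecone"}


def resolve_store_type(env: Optional[Mapping[str, str]] = None) -> str:
    """Read VECTOR_STORE_TYPE with a safe default; raise on unknown values
    so a misconfigured service fails at boot, not at query time."""
    source = os.environ if env is None else env
    value = source.get("VECTOR_STORE_TYPE", "mongo").lower()
    if value not in VALID_STORE_TYPES:
        raise ValueError(f"Unknown VECTOR_STORE_TYPE: {value!r}")
    return value
```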

Implementation Roadmap

Phase 1: Consolidate to MongoDB (Current Priority)

gantt
    title Phase 1 — Consolidate to MongoDB
    dateFormat X
    axisFormat %s

    section Search Indexes
    Atlas Search indexes on parts          :a1, 0, 1
    Atlas Vector Search indexes            :a2, 1, 2

    section Data Migration
    Migrate Weaviate → MongoDB             :a3, 2, 3
    Migrate ES manual_parts → MongoDB      :a4, 2, 3

    section Code Changes
    Implement VectorStore adapter           :a5, 3, 4
    Update services to use adapter          :a6, 4, 5

    section Validation
    Parallel reads (old + new)              :a7, 5, 6
    Cut over services                       :a8, 6, 7
    Decommission ES + Weaviate              :a9, 7, 8

Phase 2: Add Pinecone (When Triggers Hit)

  1. Implement PineconeVectorStore
  2. Create Pinecone indexes mirroring MongoDB vector collections
  3. Backfill embeddings to Pinecone
  4. Switch VECTOR_STORE_TYPE=pinecone per service
  5. Monitor latency and recall improvements
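Step 3 (backfill) is mostly a batching problem. A sketch of a generic batching helper, with a hypothetical backfill loop over a `document_chunks` cursor shown in comments (collection and field names are assumptions):

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield fixed-size batches so the backfill upserts in chunks
    instead of one giant request."""
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


# Hypothetical backfill loop through the VectorStore adapter (sketch):
# for chunk in batched(db.document_chunks.find({}, ["_id", "embedding", "meta"]), 100):
#     await store.upsert(
#         collection="document_chunks",
#         ids=[str(d["_id"]) for d in chunk],
#         embeddings=[d["embedding"] for d in chunk],
#         metadata=[d.get("meta", {}) for d in chunk],
#     )
```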

Phase 3: Add Reranker (When AI Search Is Core)

  1. Add reranker step to VectorStore.hybrid_search() or as separate pipeline stage
  2. Deploy cross-encoder model (Cohere Rerank or BGE-reranker)
  3. A/B test RAG quality with and without reranking
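The A/B split in step 3 can be made sticky by hashing the session id, so the same session always sees the same variant. A sketch with a hypothetical `ab_bucket` helper (the experiment name and 50/50 split are assumptions):

```python
import hashlib


def ab_bucket(session_id: str, experiment: str = "rerank_v1", treat_pct: int = 50) -> str:
    """Deterministically assign a session to 'control' or 'treatment'.
    Hashing (experiment, session) keeps assignments stable per experiment
    while remaining uncorrelated across experiments."""
    digest = hashlib.sha256(f"{experiment}:{session_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100  # uniform-ish value in 0..99
    return "treatment" if bucket < treat_pct else "control"
```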

Decision Log

| Decision | Rationale |
|---|---|
| MongoDB first, not Pinecone | Current scale (< 5M vectors) doesn't justify additional infra. Atlas Vector Search is sufficient. |
| Adapter pattern | Decouples app code from storage. Enables migration in 1 day when needed. |
| No Elasticsearch | Atlas Search provides equivalent full-text capabilities. One less system to maintain. |
| No Weaviate | Self-hosted Weaviate adds operational burden. MongoDB Vector Search or Pinecone (managed) are better. |
| Pinecone over Qdrant/Milvus | Managed service, no ops, fastest ANN. When we need a dedicated vector DB, managed wins. |
| Redis for short-term memory | Already in stack (parse-pdf-api). TTL-based session data doesn't belong in MongoDB. |
