Data Flows
This document describes how data flows through the CROP parts catalog system, from external APIs to the end user.
Overview
```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  External APIs  │───▶│     MongoDB     │───▶│  Elasticsearch  │───▶│     Website     │
│   (DIS, K&M)    │    │   (crop_dev)    │    │ (parts_current) │    │  (Next.js SSR)  │
└─────────────────┘    └─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │                      │                      │
  Catalog Service          Raw Storage           Search Index          User Interface
```
1. External API → MongoDB (Catalog Service)
DIS API Collections
DIS (Dealer Information Systems) provides parts data for multiple manufacturers.
| Collection | Manufacturer | Endpoint |
|---|---|---|
| parts_kuh | Kubota | DIS API |
| parts_nhl | New Holland | DIS API |
| parts_bns | Briggs & Stratton | DIS API |
| parts_vnt | Ventrac | DIS API |
| parts_mch | McHale | DIS API |
| parts_mar | Maruyama | DIS API |
| parts_kin | Kinze | DIS API |
| parts_har | Harley | DIS API |
| parts_fer | Ferris | DIS API |
| parts_hot | Hotsy | DIS API |
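The table implies a simple naming convention: each DIS vendor code maps to a `parts_{vendor}` collection. The following is a minimal sketch of that mapping; `DIS_VENDORS`, `disCollectionName`, and `allDisCollections` are hypothetical names and are not taken from the catalog service.

```typescript
// Hypothetical registry mirroring the DIS collection table (not the catalog service's code).
interface DisVendor {
  code: string; // vendor code used in the collection name, e.g. "kuh"
  manufacturer: string; // manufacturer name from the table
}

const DIS_VENDORS: readonly DisVendor[] = [
  { code: "kuh", manufacturer: "Kubota" },
  { code: "nhl", manufacturer: "New Holland" },
  { code: "bns", manufacturer: "Briggs & Stratton" },
  { code: "vnt", manufacturer: "Ventrac" },
  { code: "mch", manufacturer: "McHale" },
  { code: "mar", manufacturer: "Maruyama" },
  { code: "kin", manufacturer: "Kinze" },
  { code: "har", manufacturer: "Harley" },
  { code: "fer", manufacturer: "Ferris" },
  { code: "hot", manufacturer: "Hotsy" },
];

// Maps a vendor code (any case) to its MongoDB collection, following parts_{vendor}.
function disCollectionName(code: string): string {
  const vendor = DIS_VENDORS.find((v) => v.code === code.toLowerCase());
  if (vendor === undefined) {
    throw new Error(`Unknown DIS vendor code: ${code}`);
  }
  return `parts_${vendor.code}`;
}

// Every DIS collection name, in table order.
function allDisCollections(): string[] {
  return DIS_VENDORS.map((v) => `parts_${v.code}`);
}
```

With this sketch, `disCollectionName("KUH")` returns `"parts_kuh"`, and an unknown code fails loudly instead of silently targeting a non-existent collection.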
Sync Trigger: Manual via catalog service endpoints or scheduled jobs.
Data Flow:
```
DIS API ──▶ GET /parts/{vendor} ──▶ Transform ──▶ MongoDB (parts_{vendor})
```
K&M Tire API
K&M Tire provides tire and wheel data via their proprietary API.
| Collection | Source | Items |
|---|---|---|
| parts_kmt | K&M Tire API | ~7,000 tires |
Sync Endpoints:
- `POST /catalog/km-tire/sync`: Single page sync
- `POST /catalog/km-tire/batch-sync`: Full catalog sync (parallel)
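The batch sync fetches the paginated `GET /Tires` results in parallel, limited to 3 concurrent pages according to the rate-limiting notes below. One simple way to bound that concurrency is to process pages in fixed-size waves (a simpler alternative to a sliding worker pool). This is a sketch only: `planPageWaves` and `runInWaves` are hypothetical helpers, and the page size used in the usage note is an assumption, since the real K&M page size is not documented here.

```typescript
// Splits pages 1..N into waves of at most `concurrency` pages.
// pageSize is a caller-supplied assumption; the real K&M page size is not documented here.
function planPageWaves(totalItems: number, pageSize: number, concurrency: number): number[][] {
  if (pageSize <= 0 || concurrency <= 0) {
    throw new Error("pageSize and concurrency must be positive");
  }
  const pageCount = Math.ceil(totalItems / pageSize);
  const waves: number[][] = [];
  for (let first = 1; first <= pageCount; first += concurrency) {
    const wave: number[] = [];
    for (let page = first; page < first + concurrency && page <= pageCount; page++) {
      wave.push(page);
    }
    waves.push(wave);
  }
  return waves;
}

// Fetches one wave at a time. Promise.all keeps at most `concurrency` requests in flight
// and preserves page order in the combined result.
async function runInWaves<T>(
  waves: number[][],
  fetchPage: (page: number) => Promise<T>,
): Promise<T[]> {
  const results: T[] = [];
  for (const wave of waves) {
    const pageResults = await Promise.all(wave.map((page) => fetchPage(page)));
    results.push(...pageResults);
  }
  return results;
}
```

Assuming 100 tires per page, `planPageWaves(7000, 100, 3)` yields 70 pages in 24 waves, the last containing only page 70. A wave waits for its slowest page before the next wave starts, which is the trade-off for the simpler code.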
Data Flow:
```
K&M API ──▶ GET /Tires (paginated) ──▶ Transform ──▶ MongoDB (parts_kmt)
                                                     │
                                                     ▼
                                                     Elasticsearch (optional)
```
Rate Limiting:
- Token bucket: 10 requests/second
- Retry with exponential backoff
- Parallel processing: 3 concurrent pages
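The token bucket and retry rules above can be sketched as follows. The 10 requests/second rate comes from the list; the bucket capacity of 10, the 500 ms base delay, the 30 s cap, and the names `TokenBucket`, `backoffDelayMs`, and `withRetry` are assumptions for illustration, not the catalog service's actual implementation.

```typescript
// Token bucket: refills `ratePerSec` tokens per second up to `capacity`; each request
// consumes one token. Time is passed in explicitly so the logic is deterministic.
class TokenBucket {
  private readonly ratePerSec: number;
  private readonly capacity: number;
  private tokens: number;
  private lastRefillMs: number;

  constructor(ratePerSec: number, capacity: number, nowMs: number) {
    this.ratePerSec = ratePerSec;
    this.capacity = capacity;
    this.tokens = capacity;
    this.lastRefillMs = nowMs;
  }

  // Consumes a token and returns true if a request may be sent at `nowMs`.
  tryRemoveToken(nowMs: number): boolean {
    const elapsedSec = Math.max(0, nowMs - this.lastRefillMs) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.ratePerSec);
    this.lastRefillMs = nowMs;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

// Capped exponential backoff: baseMs * 2^attempt, never more than maxMs (assumed defaults).
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Retries `op` until it succeeds or maxAttempts is reached, sleeping between tries.
// `sleep` is injected: a setTimeout wrapper in production, a stub in tests.
async function withRetry<T>(
  op: () => Promise<T>,
  maxAttempts: number,
  sleep: (ms: number) => Promise<void>,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      if (attempt + 1 >= maxAttempts) throw err;
      await sleep(backoffDelayMs(attempt));
    }
  }
}
```

With these defaults the retry delays run 500 ms, 1 s, 2 s, 4 s and so on up to 30 s. Production retry loops often add random jitter to each delay; this sketch omits it.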
Transformer: indexedPartFormat in packages/shared-catalog/src/transformers.ts
2. MongoDB → Elasticsearch (Search Sync)
Sync Process
The sync is triggered by the GitHub Actions workflow (search-deploy.yml) during deployment.
Dual Sync Configuration:
| Sync Job | Collections | Target Index | Documents |
|---|---|---|---|
| Parts Sync | parts_nhl,parts_bns,parts_vnt,parts_mch,parts_kuh,parts_hot,parts_har,parts_kin,parts_mar | parts_current | ~2,500 |
| Tires Sync | parts_kmt | tires_current | ~7,000 |
Sync Script: services/search/scripts/sync-mongodb-to-es.ts
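The script's batched pipeline, as described in the data-flow diagram below (cursor, transform, validate, bulk index, 2,000 documents per batch, invalid documents skipped), can be sketched as follows. This is an illustration under stated assumptions, not the contents of `sync-mongodb-to-es.ts`: `syncInBatches`, `RawDoc`, and `IndexedDoc` are hypothetical (`IndexedDoc` is a trimmed stand-in for `IndexedPart`), and the sketch assumes the 2,000-document batch also bounds each bulk request.

```typescript
// Hypothetical shapes: RawDoc is whatever the collection stores; IndexedDoc is a
// trimmed stand-in for IndexedPart with only the fields this sketch validates.
type RawDoc = Record<string, unknown>;

interface IndexedDoc {
  id: string;
  sku: string;
  title: string;
}

interface SyncStats {
  indexed: number;
  skipped: number;
  batches: number;
}

// Reads documents (a MongoDB FindCursor is an AsyncIterable), transforms and validates
// each one, and sends valid documents to `bulkIndex` in groups of `batchSize`.
async function syncInBatches(
  source: AsyncIterable<RawDoc> | Iterable<RawDoc>,
  transform: (doc: RawDoc) => IndexedDoc | null,
  bulkIndex: (batch: IndexedDoc[]) => Promise<void>,
  batchSize = 2000,
): Promise<SyncStats> {
  const stats: SyncStats = { indexed: 0, skipped: 0, batches: 0 };
  let batch: IndexedDoc[] = [];

  const flush = async (): Promise<void> => {
    if (batch.length === 0) return;
    await bulkIndex(batch);
    stats.indexed += batch.length;
    stats.batches += 1;
    batch = [];
  };

  for await (const raw of source) {
    const doc = transform(raw);
    // Validation: skip documents missing required fields instead of failing the run.
    if (doc === null || !doc.id || !doc.sku || !doc.title) {
      stats.skipped += 1;
      continue;
    }
    batch.push(doc);
    if (batch.length >= batchSize) await flush();
  }
  await flush(); // send the final partial batch
  return stats;
}
```

Returning the skipped count alongside the indexed count makes a partial sync visible in the job logs rather than only as a lower document count in the index.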
Usage:
```bash
# Parts sync (DIS collections → parts_current)
bash scripts/gcp-sync-data.sh "parts_nhl,parts_bns,..." "parts_current"

# Tires sync (K&M → tires_current)
bash scripts/gcp-sync-data.sh "parts_kmt" "tires_current"
```
Data Flow:
```
MongoDB            parts_{vendor}
   │
   ▼
Cursor (batched)   2000 docs/batch
   │
   ▼
Transform          IndexedPart
   │
   ▼
Validate           skip invalid
   │
   ▼
Bulk Index
   │
   ▼
Elasticsearch      parts_current OR tires_current
```
Transformer Registry
Located in packages/shared-catalog/src/transformers.ts:
| Collection Pattern | Transformer | Description |
|---|---|---|
| crop_stage.parts, crop_prod.parts | indexedPartFormat | Pre-indexed format |
| parts_kmt | indexedPartFormat | K&M Tire (pre-indexed) |
| parts_* (DIS) | disApi | DIS API format |
| Archive collections | Config-based | Legacy data |
| Unknown | fallback | Auto-detect fields |
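A lookup over the registry table might look like the sketch below. It assumes the rows are checked in table order with the first match winning, which matters because `parts_kmt` also matches the `parts_*` pattern. The returned names are the table's transformer labels (with "Config-based" written as `configBased`), not necessarily the exports of `transformers.ts`, and `resolveTransformer` is a hypothetical name.

```typescript
type TransformerName = "indexedPartFormat" | "disApi" | "configBased" | "fallback";

// Rules are checked in table order and the first match wins (an assumption of this sketch).
// Which collections count as archives is passed in, since that is configuration-driven.
function resolveTransformer(
  database: string,
  collection: string,
  archiveCollections: ReadonlySet<string> = new Set<string>(),
): TransformerName {
  const qualified = `${database}.${collection}`;
  if (qualified === "crop_stage.parts" || qualified === "crop_prod.parts") {
    return "indexedPartFormat"; // pre-indexed format
  }
  if (collection === "parts_kmt") {
    return "indexedPartFormat"; // K&M Tire; must be checked before the parts_* rule
  }
  if (collection.startsWith("parts_")) {
    return "disApi"; // DIS API format
  }
  if (archiveCollections.has(collection)) {
    return "configBased"; // legacy data
  }
  return "fallback"; // auto-detect fields
}
```

A collection that reaches `fallback` unexpectedly is one concrete thing to look for under the "Transformer is registered for collection" troubleshooting check.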
Index Management
Dual-Index Architecture:
The search system keeps the DIS parts catalog and the K&M tire catalog in two separate indices, so each can be synced and queried independently:
| Index Alias | Content | Collections | Documents |
|---|---|---|---|
| parts_current | DIS parts catalog | parts_nhl,parts_bns,parts_vnt,parts_mch,parts_kuh,parts_hot,parts_har,parts_kin,parts_mar | ~2,500 |
| tires_current | K&M Tire catalog | parts_kmt | ~7,000 |
Alias Structure:
```
parts_current_read  ──▶ parts_v{YYYY_MM_DD}   (DIS parts)
parts_current_write ──▶ parts_v{YYYY_MM_DD}
tires_current_read  ──▶ tires_v{YYYY_MM_DD}   (K&M Tires)
tires_current_write ──▶ tires_v{YYYY_MM_DD}
```
Index Routing:
- Category filter `tires`, `wheels`, or `wheels & tires` → `tires_current`
- Manufacturer filter `KMT`, `K&M`, or `km-tire` → `tires_current`
- All other queries → `parts_current`
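The routing rules above reduce to a small pure function. In this sketch, `resolveIndex` and `RoutingFilters` are hypothetical names, and the case-insensitive comparison is an assumption: the routing list spells the manufacturer `KMT`, while the debug commands later in this document query `manufacturer=kmt`.

```typescript
type SearchIndex = "parts_current" | "tires_current";

const TIRE_CATEGORIES = new Set(["tires", "wheels", "wheels & tires"]);
const TIRE_MANUFACTURERS = new Set(["kmt", "k&m", "km-tire"]);

interface RoutingFilters {
  category?: string;
  manufacturer?: string;
}

// Assumption: filter values are trimmed and compared case-insensitively.
function resolveIndex(filters: RoutingFilters): SearchIndex {
  const category = filters.category?.trim().toLowerCase();
  const manufacturer = filters.manufacturer?.trim().toLowerCase();
  if (category !== undefined && TIRE_CATEGORIES.has(category)) return "tires_current";
  if (manufacturer !== undefined && TIRE_MANUFACTURERS.has(manufacturer)) return "tires_current";
  return "parts_current"; // all other queries
}
```

For example, `resolveIndex({ category: "Wheels & Tires" })` routes to `tires_current`, matching the `category` value shown in the `IndexedPart` schema.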
Alias Switch Script: services/search/scripts/switch-alias-to-latest.ts
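Switching a `_read`/`_write` alias pair to a new dated index maps naturally onto Elasticsearch's update-aliases API (`POST /_aliases`), which applies all actions in one request atomically, so readers never see an alias with no target. The sketch below builds that request body; whether `switch-alias-to-latest.ts` works this way is not documented here, and `versionedIndexName` and `buildAliasSwap` are hypothetical helpers. Taking the date in UTC is also an assumption.

```typescript
// Builds index names in the {prefix}_v{YYYY_MM_DD} pattern, using the UTC date.
function versionedIndexName(prefix: "parts" | "tires", date: Date): string {
  const yyyy = date.getUTCFullYear();
  const mm = String(date.getUTCMonth() + 1).padStart(2, "0");
  const dd = String(date.getUTCDate()).padStart(2, "0");
  return `${prefix}_v${yyyy}_${mm}_${dd}`;
}

type AliasAction =
  | { remove: { index: string; alias: string } }
  | { add: { index: string; alias: string } };

// Request body for POST /_aliases: for each alias, detach the old index (if any)
// and attach the new one. Elasticsearch applies the whole action list atomically.
function buildAliasSwap(
  aliases: string[],
  oldIndex: string | null,
  newIndex: string,
): { actions: AliasAction[] } {
  const actions: AliasAction[] = [];
  for (const alias of aliases) {
    if (oldIndex !== null) actions.push({ remove: { index: oldIndex, alias } });
    actions.push({ add: { index: newIndex, alias } });
  }
  return { actions };
}
```

Keeping the old dated index around after the swap allows rolling back by running the same swap in reverse.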
3. Elasticsearch → Website (Search Service)
Search Service API
Base URL: https://search-service-atife5uvka-ue.a.run.app
Key Endpoints:
| Endpoint | Purpose | Response |
|---|---|---|
| GET /api/search | Full search with pagination | Parts + facets |
| GET /api/filters | Facets only (no hits) | Aggregations |
| GET /health | Service health check | Index stats |
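A `GET /api/search` request such as `?q=tire&category=tires` could be turned into an Elasticsearch request body roughly as follows, in line with the "bool query + aggregations" step of the query flow below. This is a sketch under assumptions: `SearchParams`, `buildSearchBody`, and `computeTotalPages` are hypothetical; the field names come from the `IndexedPart` schema, but the boosts, the default page size of 24, the cap of 100, and keyword mappings for `category` and `manufacturer.code` are all assumed.

```typescript
// Hypothetical shape of the parsed query-string parameters.
interface SearchParams {
  q?: string;
  category?: string;
  manufacturer?: string;
  page?: number;
  pageSize?: number;
}

// Full-text matching goes in `must`; exact filters go in `filter`, which does not affect
// scoring. Term filters assume keyword mappings and are case-sensitive on such fields.
function buildSearchBody(params: SearchParams) {
  const page = Math.max(1, params.page ?? 1);
  const pageSize = Math.min(100, Math.max(1, params.pageSize ?? 24)); // assumed default and cap
  const must: object[] = [];
  const filter: object[] = [];
  if (params.q) {
    must.push({ multi_match: { query: params.q, fields: ["title^3", "partNumber", "pnNorm", "description"] } });
  }
  if (params.category) filter.push({ term: { category: params.category } });
  if (params.manufacturer) filter.push({ term: { "manufacturer.code": params.manufacturer } });
  return {
    from: (page - 1) * pageSize, // pages are 1-based in the API
    size: pageSize,
    query: { bool: { must, filter } }, // an empty bool query matches all documents
    aggs: {
      manufacturer: { terms: { field: "manufacturer.code" } },
      category: { terms: { field: "category" } },
      price: { stats: { field: "price.list.value" } },
    },
  };
}

// pagination.totalPages derived from the hit total.
function computeTotalPages(total: number, pageSize: number): number {
  return Math.ceil(total / pageSize);
}
```

With 7,000 tires and 24 results per page, `computeTotalPages` gives 292 pages.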
Query Flow:
```
Request            ?q=tire&category=tires
   │
   ▼
Parse Params       QueryFilters
   │
   ▼
Build Query        bool query + aggregations
   │
   ▼
Elasticsearch      search response
   │
   ▼
Transform          PartPreview[]
   │
   ▼
Response
```
Response Structure
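```typescript
interface SearchResponse {
  parts: PartPreview[]; // Search results
  pagination: {
    page: number;
    pageSize: number;
    total: number;
    totalPages: number;
  };
  facets: {
    manufacturer: unknown[]; // facet entries (element shape not documented here)
    category: unknown[];
    price: Record<string, unknown>;
  };
}
```
4. Website Data Flow (Next.js)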
Server-Side Rendering
The website uses Next.js with React Server Components for the initial data fetch.
Tires Page: https://www.clintontractor.net/parts/tires
```
User Request
   │
   ▼
Next.js Server     Server Component
   │
   ▼
Search API         Elasticsearch
   │
   ▼
SSR                pre-rendered tire catalog
   │
   ▼
HTML Response
```
Client-Side Updates:
- Pagination: Client-side navigation
- Filters: Re-fetch via API
- Search: Debounced queries
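The "re-fetch via API" and "debounced queries" behaviour could look roughly like this in client code. Both helpers are hypothetical and not taken from the website's code: `buildSearchUrl` assembles a `/api/search` URL from the current filters (parameter names follow the `?q=` and `&category=` examples in section 3), and `debounce` is a standard trailing-edge debounce, so only the last keystroke within the wait window triggers a fetch.

```typescript
// Builds a /api/search URL from the current filters; empty values are omitted.
function buildSearchUrl(baseUrl: string, filters: Record<string, string | number | undefined>): string {
  const url = new URL("/api/search", baseUrl);
  for (const [key, value] of Object.entries(filters)) {
    if (value !== undefined && value !== "") url.searchParams.set(key, String(value));
  }
  return url.toString();
}

// Trailing-edge debounce: each call restarts the timer, so only the last call
// within `waitMs` actually runs `fn`.
function debounce<A extends unknown[]>(fn: (...args: A) => void, waitMs: number): (...args: A) => void {
  let timer: ReturnType<typeof setTimeout> | undefined;
  return (...args: A) => {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(() => fn(...args), waitMs);
  };
}
```

In a search box, the debounced callback would call `fetch(buildSearchUrl(...))` and update the results; the wait time is a UI choice that is not specified in this document.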
5. Environment Configuration
Database Selection
| Environment | MongoDB Database | Usage |
|---|---|---|
| Production | crop_dev | Live website |
| Stage | crop_dev | Testing (same as prod) |
| Development | crop_dev | Local development |
Note: `crop_dev` is currently used for production. Historical `crop_stage` references have been migrated.
Elasticsearch Configuration
```
ELASTICSEARCH_URL=http://10.0.0.52:9200   # VPC internal
SEARCH_INDEX_NAME=parts_current           # Alias name
```
6. Deployment Triggers
GitHub Actions Workflow
File: .github/workflows/search-deploy.yml
Trigger: Push to main branch
Steps:
- Build Docker image
- Deploy to Cloud Run (preview)
- Sync MongoDB → Elasticsearch
- Switch aliases
- Shift traffic gradually
- Run smoke tests
Manual Sync
To trigger a manual sync:
```bash
# Via GitHub Actions
gh workflow run search-deploy.yml

# Via Cloud Run Job
gcloud run jobs execute sync-data-auto --region=us-east1
```
7. Monitoring
Health Checks
```bash
# Search service health
curl https://search-service-atife5uvka-ue.a.run.app/health

# Response includes:
# - elasticsearch: ok/error
# - mongodb: ok/error
# - docCount_es: number of indexed documents
# - aliasTarget: current index name
```
Key Metrics
| Metric | Location | Description |
|---|---|---|
| docCount_es | /health | Documents in Elasticsearch |
| docCount_api | /health | Documents via API count |
| delta | /health | Difference (should be 0) |
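A monitor or post-deploy smoke test could fetch `/health` and evaluate it with a check like the one below. The flat `HealthReport` shape is an assumption built from the field names listed above (the actual JSON structure may differ), and `checkHealth` is a hypothetical helper. It flags a non-zero `delta` and an empty index, the symptom behind "Website shows 0 tires".

```typescript
// Hypothetical flat shape of the /health payload, using the field names listed above.
interface HealthReport {
  elasticsearch: "ok" | "error";
  mongodb: "ok" | "error";
  docCount_es: number;
  docCount_api: number;
  delta: number;
  aliasTarget: string;
}

// Returns a list of problems; an empty list means the report looks healthy.
function checkHealth(report: HealthReport, expectedIndexPrefix: string): string[] {
  const problems: string[] = [];
  if (report.elasticsearch !== "ok") problems.push("Elasticsearch check failed");
  if (report.mongodb !== "ok") problems.push("MongoDB check failed");
  if (report.docCount_es === 0) problems.push("index contains no documents");
  if (report.delta !== 0) {
    problems.push(`count delta is ${report.delta} (es=${report.docCount_es}, api=${report.docCount_api})`);
  }
  if (!report.aliasTarget.startsWith(expectedIndexPrefix)) {
    problems.push(`alias points at unexpected index ${report.aliasTarget}`);
  }
  return problems;
}
```

Alerting on a non-empty result would cover part of the "alerting for sync failures and data drift" item listed under future improvements.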
8. Troubleshooting
Common Issues
Issue: Website shows 0 tires
- Check: Elasticsearch has data (`/health`)
- Check: Search API returns results (`/api/search?category=tires`)
- Check: Sync job completed successfully
Issue: Partial sync (fewer docs than expected)
- Check: Cloud Run job logs for errors
- Check: Transformer is registered for collection
- Check: Job timeout (default 1800s)
Issue: Stale data on website
- Check: Alias points to latest index
- Check: Website cache (CDN/browser)
- Trigger: New deployment or manual sync
Debug Commands
```bash
# Check index document count
curl "https://search-service-atife5uvka-ue.a.run.app/api/filters" | jq '.total'

# Check specific manufacturer
curl "https://search-service-atife5uvka-ue.a.run.app/api/search?manufacturer=kmt" | jq '.pagination.total'

# Check MongoDB collection
# Use MongoDB MCP tool: count(database: "crop_dev", collection: "parts_kmt")
```
9. Data Schema
IndexedPart (Elasticsearch Document)
```typescript
interface IndexedPart {
  id: string;
  slug: string;
  sku: string; // CT-{MFG}-{PN}
  skuVariants: string[];
  partNumber: string;
  pnNorm: string; // Normalized for search
  title: string;
  description?: string;
  manufacturer: {
    name: string;
    code: string; // KMT, BNS, etc.
  };
  category: string; // "Wheels & Tires", "parts"
  price?: {
    list: { value: number; currency: string };
  };
  media?: {
    hasImages: boolean;
    imageCount: number;
    images?: Array<{ url: string; type: string }>;
  };
  status: 'active' | 'discontinued';
  sources: string[]; // ['dis'], ['km-tire']
  createdAt: string;
  updatedAt: string;
}
```
10. Future Improvements
- Full K&M Sync: Currently syncing ~2000 of 7091 tires. Need to investigate timeout/chunking.
- Real-time Sync: Webhook-based updates instead of batch sync.
- Index Versioning: Implement blue-green index deployment.
- Monitoring: Add alerting for sync failures and data drift.