Data Flows Documentation

This document describes all data flows in the CROP-parts-services platform.

Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                           CROP Data Architecture                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ┌──────────┐     ┌──────────────┐     ┌─────────────────┐                │
│   │ GCS      │────▶│ MongoDB      │────▶│ Elasticsearch   │                │
│   │ (Images) │     │ (Source)     │     │ (Search Index)  │                │
│   └──────────┘     └──────────────┘     └─────────────────┘                │
│        ▲                  ▲                                                 │
│        │                  │                                                 │
│   ┌────┴────┐      ┌─────┴──────┐                                          │
│   │ Mac     │      │ External   │                                          │
│   │ Studio  │      │ APIs       │                                          │
│   │ (Photos)│      │ (DIS/K&M)  │                                          │
│   └─────────┘      └────────────┘                                          │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

1. MongoDB → Elasticsearch Sync

Purpose: Keep the Elasticsearch search index in sync with the MongoDB source data.

Flow Diagram

MongoDB Collection          Transformer           Elasticsearch
     │                          │                      │
     │  parts_nhl               │                      │
     │  parts_kuh        ──────▶│  transformDoc()      │
     │  parts_kmt               │  validatePart()  ───▶│  parts_current
     │  nh_unified              │                      │  (alias)
     │                          │                      │

Entry Points

Method  Endpoint        Description
POST    /sync           Full collection sync
POST    /sync/document  Single-document sync
POST    /sync/delete    Remove from Elasticsearch
GET     /sync/status    Compare MongoDB and Elasticsearch counts

Key Files

  • services/catalog/src/routes/sync.ts - HTTP handlers
  • packages/shared-catalog/src/transformers.ts - Manufacturer transformers
  • services/search/src/services/index-rebuild-service.ts - Zero-downtime migrations

Configuration

# Batch settings
SYNC_BATCH_SIZE=500        # Documents per batch (max 2000)
SYNC_PROGRESS_INTERVAL=5000 # Log every N docs
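The batching settings above can be sketched as follows. This is a hypothetical, synchronous illustration. `transform`, `validate`, and `writeBatch` stand in for transformDoc(), validatePart(), and an Elasticsearch bulk write to the parts_current alias. The real sync.ts handler presumably streams a MongoDB cursor and awaits each bulk request.

```typescript
// Hypothetical sketch of the batched MongoDB → Elasticsearch sync loop.
// transform/validate/writeBatch are placeholders, not the real service API.

const MAX_BATCH_SIZE = 2000; // documented upper bound for SYNC_BATCH_SIZE

interface SyncResult {
  indexed: number;
  skipped: number; // documents rejected by validation
  batches: number;
}

function syncDocuments<S, T>(
  docs: S[],
  transform: (doc: S) => T,
  validate: (part: T) => boolean,
  writeBatch: (parts: T[]) => void,
  batchSize = 500,         // SYNC_BATCH_SIZE
  progressInterval = 5000, // SYNC_PROGRESS_INTERVAL
  log: (msg: string) => void = console.log,
): SyncResult {
  // Clamp the configured batch size into [1, MAX_BATCH_SIZE].
  const size = Math.min(Math.max(1, Math.floor(batchSize)), MAX_BATCH_SIZE);
  const result: SyncResult = { indexed: 0, skipped: 0, batches: 0 };
  let lastLoggedStep = 0;

  for (let start = 0; start < docs.length; start += size) {
    const valid: T[] = [];
    for (const doc of docs.slice(start, start + size)) {
      const part = transform(doc);
      if (validate(part)) valid.push(part);
      else result.skipped++;
    }
    if (valid.length > 0) writeBatch(valid);
    result.indexed += valid.length;
    result.batches++;

    // Log once each time another progressInterval documents have been processed.
    const processed = Math.min(start + size, docs.length);
    const step = Math.floor(processed / progressInterval);
    if (step > lastLoggedStep) {
      log(`sync progress: ${processed}/${docs.length} (indexed ${result.indexed}, skipped ${result.skipped})`);
      lastLoggedStep = step;
    }
  }
  return result;
}
```

Invalid documents are counted rather than aborting the batch, so a single malformed part does not block the rest of the collection.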

Usage

# Full sync with dry-run
curl -X POST https://catalog-service/sync \
  -H "X-Admin-Token: $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"collection": "nh_unified", "dryRun": true}'

# Single document
curl -X POST https://catalog-service/sync/document \
  -H "X-Admin-Token: $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"collection": "parts_nhl", "partNumber": "87840296"}'

2. GCS Image Sync

Purpose: Sync product images from the Mac Studio to GCS and register them in MongoDB.

Flow Diagram

Mac Studio                    GCS Bucket                 MongoDB
     │                            │                         │
     │  /crop_parts/              │                         │
     │    KUHN/                   │                         │
     │    Hotsy/          ──────▶ │  gs://crop_parts/       │
     │    ...                     │    ct/kuh/123/      ───▶│  discovery.gcpPath
     │                            │    gallery/             │  media.images[]
     │  fswatch daemon            │                         │

Entry Points

Type    Location                      Description
Daemon  Mac Studio                    fswatch + crop_sync.sh
Script  scripts/sync-gcp-to-mongo.ts  Manual sync
Script  scripts/import-ct-photos.ts   Local import
HTTP    POST /pipeline/upload-photo   Single upload

GCS Path Structure

gs://crop_parts/ct/{vendor}/{partNumber}/{type}/{filename}.jpg

Examples:
  gs://crop_parts/ct/kuh/70176699/gallery/70176699-1.jpg
  gs://crop_parts/ct/nhl/87840296/360/frame-001.jpg
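The path convention can be captured in a pair of helpers. The function names are hypothetical. Lowercasing the vendor code is an assumption based on the examples ("kuh", "nhl"). Only the `.jpg` layout shown above is handled.

```typescript
// Sketch: build and parse object URIs of the form
// gs://crop_parts/ct/{vendor}/{partNumber}/{type}/{filename}.jpg
// Helper names are hypothetical; vendor lowercasing is an assumption.

const BUCKET = "crop_parts";

interface ImagePath {
  vendor: string;     // internal vendor code, e.g. "kuh"
  partNumber: string;
  type: string;       // e.g. "gallery" or "360"
  filename: string;   // without the .jpg extension
}

function toGcsUri(p: ImagePath): string {
  return `gs://${BUCKET}/ct/${p.vendor.toLowerCase()}/${p.partNumber}/${p.type}/${p.filename}.jpg`;
}

// Each path segment must be non-empty and contain no slash.
const GCS_PATTERN = /^gs:\/\/crop_parts\/ct\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\.jpg$/;

function parseGcsUri(uri: string): ImagePath | null {
  const m = GCS_PATTERN.exec(uri);
  if (!m) return null;
  const [, vendor, partNumber, type, filename] = m;
  return { vendor, partNumber, type, filename };
}
```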

Sync Process

  1. Watch: fswatch monitors /Users/john/crop_parts/ for new images
  2. Queue: New files queued with 2-minute debounce
  3. Upload: gsutil cp to GCS bucket
  4. Notify: POST to catalog service to register in MongoDB
  5. Log: Success/failure logged to sync.log
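The 2-minute debounce in step 2 can be modeled as follows. This is only an illustration: the real daemon is fswatch + crop_sync.sh, and the class name is hypothetical. A file becomes ready once it has gone the full debounce window without a new change event, so images that are still being written or exported are not uploaded half-finished.

```typescript
// Illustrative model of step 2 (Queue); not the actual shell implementation.

const DEBOUNCE_MS = 2 * 60 * 1000; // 2-minute debounce

class DebounceQueue {
  private readonly quietMs: number;
  private readonly lastChange = new Map<string, number>();

  constructor(quietMs: number = DEBOUNCE_MS) {
    this.quietMs = quietMs;
  }

  /** Record a change event for `path` at time `now` (ms); resets its timer. */
  touch(path: string, now: number): void {
    this.lastChange.set(path, now);
  }

  /** Remove and return (sorted) every path quiet for at least quietMs. */
  drainReady(now: number): string[] {
    const ready: string[] = [];
    for (const [path, changedAt] of this.lastChange) {
      if (now - changedAt >= this.quietMs) ready.push(path);
    }
    for (const path of ready) this.lastChange.delete(path);
    return ready.sort();
  }
}
```

In a daemon, drainReady would be polled periodically and each returned path handed to the upload (step 3) and notify (step 4) stages.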

Mac Studio Status Check

ssh crop-john "tail -20 /Users/john/crop_parts/logs/sync.log"
ssh crop-john "pgrep -lf fswatch"

3. DIS Inventory Check

Purpose: Check real-time inventory and pricing from DIS (Dealer Information System).

Flow Diagram

Catalog Service              DIS API                    MongoDB
     │                          │                          │
     │  vendorCode + partNumber │                          │
     │  ────────────────────▶   │                          │
     │                          │  partsInquiry            │
     │  ◀────────────────────   │                          │
     │  price, qty, available   │                          │
     │  ────────────────────────────────────────────────▶  │  inventory.dis*

Vendor Code Mapping

Internal  DIS Code  Notes
NHL       NHL       New Holland
KUH       KUH       Kuhn (91% coverage)
BNS       FER       Briggs → Ferris (default; see BNS Special Routing)
HOT       HOT       Hotsy (92% coverage)
MCH       MCH       McHale (94% coverage)
HAR       HAR       Harvest Tech (79% coverage)
MAR       MAR       Marcrest (80% coverage)

BNS Special Routing

BNS parts are routed to a DIS vendor code based on the part-number pattern:
  - DOT format (06.0043)     → VNT (Ventrac)
  - BER*, F350*, 790xxx      → NHL
  - All other                → FER (Ferris)
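The mapping table and the BNS rules can be combined into one lookup. This sketch makes three assumptions. "DOT format" means digits, a dot, then digits (as in 06.0043). "790xxx" means a six-digit number starting with 790. The DOT check runs first. The function names are hypothetical and not taken from dis-service.ts.

```typescript
// Sketch of resolving the DIS vendor code for a part.
// Pattern interpretations for BNS are assumptions (see lead-in).

const DIS_VENDOR_CODES: Record<string, string> = {
  NHL: "NHL",
  KUH: "KUH",
  BNS: "FER", // default only; BNS is resolved by routeBns() below
  HOT: "HOT",
  MCH: "MCH",
  HAR: "HAR",
  MAR: "MAR",
};

function routeBns(partNumber: string): string {
  const pn = partNumber.trim().toUpperCase();
  if (/^\d+\.\d+$/.test(pn)) return "VNT"; // DOT format → Ventrac
  if (pn.startsWith("BER") || pn.startsWith("F350") || /^790\d{3}$/.test(pn)) {
    return "NHL";
  }
  return "FER"; // everything else → Ferris
}

function toDisVendorCode(internal: string, partNumber: string): string | undefined {
  const code = internal.toUpperCase();
  if (code === "BNS") return routeBns(partNumber);
  return DIS_VENDOR_CODES[code]; // undefined for unmapped vendors
}
```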

API Configuration

DIS_API_URL=https://cl2342.disprism.com
DIS_DEALER_ID=C
DIS_API_KEY=<secret>

Key Files

  • services/catalog/src/services/dis-service.ts - API wrapper
  • services/catalog/src/routes/dis.ts - HTTP endpoints

Cache

  • TTL: 5 minutes
  • Max entries: 10,000 (LRU)
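Here is a minimal TTL + LRU cache with the documented limits (5-minute TTL, 10,000 entries). The DIS cache implementation isn't described here, so treat this as an illustration. It relies on `Map` preserving insertion order, which makes the first key the least recently used. The `vendorCode:partNumber` key format used in the test is an assumption.

```typescript
// Sketch of a TTL + LRU cache; not the actual dis-service.ts code.

class TtlLruCache<V> {
  private readonly entries = new Map<string, { value: V; expiresAt: number }>();
  private readonly ttlMs: number;
  private readonly maxEntries: number;

  constructor(ttlMs = 5 * 60 * 1000, maxEntries = 10_000) {
    this.ttlMs = ttlMs;
    this.maxEntries = maxEntries;
  }

  get(key: string, now: number = Date.now()): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (now >= entry.expiresAt) {
      this.entries.delete(key); // expired: drop it
      return undefined;
    }
    // Re-insert so this key becomes the most recently used.
    this.entries.delete(key);
    this.entries.set(key, entry);
    return entry.value;
  }

  set(key: string, value: V, now: number = Date.now()): void {
    this.entries.delete(key);
    this.entries.set(key, { value, expiresAt: now + this.ttlMs });
    if (this.entries.size > this.maxEntries) {
      // The first key in insertion order is the least recently used.
      const oldest = this.entries.keys().next().value;
      if (oldest !== undefined) this.entries.delete(oldest);
    }
  }

  get size(): number {
    return this.entries.size;
  }
}
```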

4. K&M Tire API

Purpose: Sync tire inventory from K&M Tire wholesale API.

Flow Diagram

K&M Tire API                 Catalog Service             MongoDB
     │                            │                         │
     │  XML Request               │                         │
     │  ◀────────────────────     │                         │
     │                            │                         │
     │  Tire data (size/price)    │                         │
     │  ────────────────────▶     │  transform ──────────▶  │  parts_kmt
     │                            │  to unified              │

Endpoints

Method  Endpoint                    Description
GET     /km-tire/status             Connection check
GET     /km-tire/vendors            Available brands
POST    /km-tire/search/size        Search by tire size
POST    /km-tire/search/part        Search by part number
POST    /km-tire/sync               Sync a single size
POST    /km-tire/batch-sync         Bulk sync
POST    /km-tire/batch-sync-stream  Bulk sync with SSE (Server-Sent Events) progress
GET     /km-tire/inventory/{pn}     Real-time inventory check
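The batch-sync-stream endpoint reports progress over SSE. The wire format below comes from the HTML Server-Sent Events specification: an `event:` line, a `data:` line, and a blank line ending each message. The `progress` event name and the payload fields are assumptions, not the endpoint's documented schema.

```typescript
// Sketch of encoding batch-sync progress as Server-Sent Events.
// Event name and payload fields are hypothetical.

interface BatchProgress {
  completed: number;
  total: number;
  failed: number;
}

function encodeSseEvent(event: string, payload: unknown): string {
  // JSON.stringify without indentation yields one line, so a single data: line suffices.
  return `event: ${event}\ndata: ${JSON.stringify(payload)}\n\n`;
}

function progressEvent(p: BatchProgress): string {
  const percent = p.total === 0 ? 100 : Math.round((p.completed / p.total) * 100);
  return encodeSseEvent("progress", { ...p, percent });
}
```

Because the endpoint is a POST, a browser client cannot use `EventSource`, which only issues GET requests. It would instead read the response body as a stream via `fetch`.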

Batch Configuration

KM_TIRE_CONCURRENCY=5      # Parallel requests (1-10)
KM_TIRE_BATCH_SIZE=100     # Sizes per batch
KM_TIRE_CACHE_TTL=300000   # 5 minutes
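Here is one way KM_TIRE_CONCURRENCY and KM_TIRE_BATCH_SIZE might combine in a bulk sync. Sizes are processed in batches, and within each batch at most `concurrency` requests are in flight, clamped to the documented 1-10 range. `syncSize` is a hypothetical stand-in for the per-size K&M request, transform, and upsert into parts_kmt. It is not the actual km-tire-batch-sync.ts API.

```typescript
// Sketch of a batched, concurrency-limited sync; names are hypothetical.

function clamp(n: number, min: number, max: number): number {
  return Math.min(Math.max(n, min), max);
}

// Run fn over items with at most `concurrency` promises pending at once.
async function mapWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const i = next++; // safe: JS runs this synchronously between awaits
      results[i] = await fn(items[i]);
    }
  };
  const workers = Array.from({ length: Math.min(concurrency, items.length) }, worker);
  await Promise.all(workers);
  return results;
}

async function batchSync(
  sizes: string[],
  syncSize: (size: string) => Promise<number>, // returns parts synced for that size
  concurrency = 5,  // KM_TIRE_CONCURRENCY
  batchSize = 100,  // KM_TIRE_BATCH_SIZE
): Promise<number> {
  const limit = clamp(concurrency, 1, 10);
  let total = 0;
  for (let start = 0; start < sizes.length; start += batchSize) {
    const counts = await mapWithConcurrency(sizes.slice(start, start + batchSize), limit, syncSize);
    total += counts.reduce((a, b) => a + b, 0);
  }
  return total;
}
```

A worker pool (rather than `Promise.all` over a whole batch) keeps exactly `limit` requests busy, which is gentler on the wholesale API than bursts of 100.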

Key Files

  • services/catalog/src/routes/km-tire.ts
  • services/catalog/src/services/km-tire-service.ts
  • services/catalog/src/services/km-tire-batch-sync.ts

5. CTR (Click-Through Rate) Tracking

Purpose: Track autocomplete suggestion performance.

Flow Diagram

Search Service               Kafka                    Analytics Service
     │                         │                            │
     │  Impression event       │                            │
     │  ─────────────────▶     │  ─────────────────────▶    │
     │                         │                            │
     │  Click event            │                            │  MongoDB
     │  ─────────────────▶     │  ─────────────────────▶    │  ctr_events
     │                         │                            │     │
     │                         │                            │     ▼
     │                         │                            │  Aggregations

Event Types

Impression Event:

{
  "sessionId": "abc123",
  "requestId": "req-456",
  "query": "hydraulic",
  "suggestions": ["hydraulic pump", "hydraulic filter"],
  "timestamp": "2026-01-21T12:00:00Z"
}

Click Event:

{
  "sessionId": "abc123",
  "requestId": "req-456",
  "query": "hydraulic",
  "clickedSuggestion": "hydraulic pump",
  "clickedIndex": 0,
  "timeToClick": 1500
}

Metrics

Metric         Description
CTR            clicks / impressions
Avg Position   Average index of the clicked suggestion (0 = first)
Time to Click  Milliseconds from impression to click
Top Queries    Queries ranked by CTR, impressions, or clicks
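These metrics can be derived from the two event types. Field names follow the JSON examples. Joining clicks to impressions by `requestId` is an assumption about how the analytics service correlates them, and clicks with no matching impression are ignored.

```typescript
// Sketch of per-query CTR metrics; correlation by requestId is assumed.

interface ImpressionEvent {
  sessionId: string;
  requestId: string;
  query: string;
  suggestions: string[];
  timestamp: string;
}

interface ClickEvent {
  sessionId: string;
  requestId: string;
  query: string;
  clickedSuggestion: string;
  clickedIndex: number;
  timeToClick: number; // milliseconds
}

interface QueryMetrics {
  query: string;
  impressions: number;
  clicks: number;
  ctr: number;
  avgPosition: number | null;
  avgTimeToClickMs: number | null;
}

function computeMetrics(impressions: ImpressionEvent[], clicks: ClickEvent[]): QueryMetrics[] {
  const byQuery = new Map<string, { impressions: number; clicks: ClickEvent[] }>();
  const queryByRequest = new Map<string, string>();

  for (const imp of impressions) {
    queryByRequest.set(imp.requestId, imp.query);
    const agg = byQuery.get(imp.query) ?? { impressions: 0, clicks: [] };
    agg.impressions++;
    byQuery.set(imp.query, agg);
  }
  for (const click of clicks) {
    const query = queryByRequest.get(click.requestId);
    const agg = query === undefined ? undefined : byQuery.get(query);
    if (agg) agg.clicks.push(click); // orphan clicks are skipped
  }

  const mean = (xs: number[]): number | null =>
    xs.length === 0 ? null : xs.reduce((a, b) => a + b, 0) / xs.length;

  return Array.from(byQuery, ([query, agg]) => ({
    query,
    impressions: agg.impressions,
    clicks: agg.clicks.length,
    ctr: agg.clicks.length / agg.impressions,
    avgPosition: mean(agg.clicks.map((c) => c.clickedIndex)),
    avgTimeToClickMs: mean(agg.clicks.map((c) => c.timeToClick)),
  })).sort((a, b) => b.ctr - a.ctr); // "Top Queries" by CTR
}
```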

Configuration

CTR_TRACKING_ENABLED=false  # Set to true to enable tracking
CTR_COLLECTION_NAME=autocomplete_ctr_events
CTR_RETENTION_DAYS=90
CTR_BATCH_SIZE=100
CTR_FLUSH_INTERVAL_MS=5000
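The batch and flush settings suggest a buffered writer: flush when CTR_BATCH_SIZE events accumulate or when the CTR_FLUSH_INTERVAL_MS timer fires. This sketch assumes those semantics. `write` is a hypothetical stand-in for the MongoDB insert into the CTR collection. When tracking is disabled, events are dropped.

```typescript
// Sketch of a size- or interval-triggered event buffer; names are hypothetical.

interface BufferOptions {
  enabled: boolean;        // CTR_TRACKING_ENABLED
  batchSize: number;       // CTR_BATCH_SIZE
  flushIntervalMs: number; // CTR_FLUSH_INTERVAL_MS
}

class CtrEventBuffer<E> {
  private buffer: E[] = [];
  private timer: ReturnType<typeof setInterval> | undefined;
  private readonly write: (batch: E[]) => void;
  private readonly opts: BufferOptions;

  constructor(write: (batch: E[]) => void, opts: BufferOptions) {
    this.write = write;
    this.opts = opts;
  }

  start(): void {
    if (this.opts.enabled && this.timer === undefined) {
      this.timer = setInterval(() => this.flush(), this.opts.flushIntervalMs);
    }
  }

  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer);
    this.timer = undefined;
    this.flush(); // do not lose buffered events on shutdown
  }

  record(event: E): void {
    if (!this.opts.enabled) return;
    this.buffer.push(event);
    if (this.buffer.length >= this.opts.batchSize) this.flush();
  }

  flush(): void {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.write(batch);
  }
}
```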

Key Files

  • services/search/src/services/ctr-analytics-service.ts
  • services/analytics/src/config/env.ts

6. Stripe Webhook Flow

Purpose: Process payment events and create orders.

Flow Diagram

Stripe                    Payment Service              MongoDB
  │                            │                          │
  │  checkout.session.completed│                          │
  │  ─────────────────────────▶│                          │
  │                            │  Create order            │
  │                            │  ────────────────────▶   │  orders
  │                            │                          │
  │                            │  Create shipment         │
  │                            │  ────────────────────▶   │  shipments
  │                            │                          │
  │                            │  Delete cart             │
  │                            │  ────────────────────▶   │  carts
  │                            │                          │
  │                            │  Publish event           │
  │                            │  ──────▶ Kafka           │

Webhook Events

Event                          Action
checkout.session.completed     Create order
payment_intent.succeeded       Create order + shipment
payment_intent.payment_failed  Mark failed
payment_intent.canceled        Cancel order
charge.refunded                Track refund
refund.failed                  Alert + log
charge.dispute.created         Log dispute
setup_intent.succeeded         Save payment method

Security

  • Signature verification: Stripe webhook secret
  • Idempotency: SHA-256 hash of payload
  • Retry logic: 500 = retryable, 400 = permanent
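The idempotency and retry rules can be sketched under several assumptions. The raw request body is available as a string. Signature verification has already passed (in the Stripe Node SDK this is `stripe.webhooks.constructEvent`). An in-memory `Set` stands in for a persistent store of processed hashes. `handleWebhook` and `PermanentError` are hypothetical names.

```typescript
import { createHash } from "node:crypto";

// Sketch of SHA-256 payload idempotency plus 400/500 retry classification.
// The in-memory Set is a placeholder for a persistent (e.g. MongoDB) store.

class PermanentError extends Error {}

const processedHashes = new Set<string>();

function payloadHash(rawBody: string): string {
  return createHash("sha256").update(rawBody, "utf8").digest("hex");
}

// Returns the HTTP status to send back to Stripe.
function handleWebhook(rawBody: string, apply: (body: string) => void): number {
  const hash = payloadHash(rawBody);
  if (processedHashes.has(hash)) return 200; // duplicate delivery: acknowledge only
  try {
    apply(rawBody);
    processedHashes.add(hash); // record only after success so retries can reprocess
    return 200;
  } catch (err) {
    // 400 = permanent failure, 500 = retryable failure.
    return err instanceof PermanentError ? 400 : 500;
  }
}
```

Stripe retries any non-2xx delivery, so in practice a 400 still gets redelivered. The split mainly records which failures are worth investigating versus waiting out. Stripe's own guidance on duplicate events is to log processed event IDs (`event.id`), which is an alternative key to the payload hash.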

Configuration

STRIPE_WEBHOOK_SECRET=whsec_...
STRIPE_SECRET_KEY=sk_...

Key Files

  • services/payment/src/routes/webhooks.ts

Summary Table

Flow           Source         Target         Trigger         Auth
MongoDB→ES     MongoDB        Elasticsearch  HTTP/Script     Admin Token
GCS Images     Mac Studio     GCS→MongoDB    fswatch daemon  GCP SA
DIS Inventory  DIS API        MongoDB        On-demand       API Key
K&M Tire       K&M API        MongoDB        HTTP/Batch      API Key
CTR Tracking   Search events  MongoDB        Kafka           Internal
Webhooks       Stripe         MongoDB        HTTP POST       Webhook Secret

Monitoring & Debugging

Health Checks

# All services
curl https://api.crop-dev.app/health

# Elasticsearch
curl https://search-service/health/ready

# MongoDB
curl https://catalog-service/health

Logs

# Cloud Run logs
gcloud logging read "resource.type=cloud_run_revision"

# Mac Studio sync logs
ssh crop-john "tail -f /Users/john/crop_parts/logs/sync.log"

Common Issues

Issue                 Check                   Fix
ES sync stuck         GET /sync/status        Restart the sync job
Images not appearing  sync.log on Mac Studio  Check that fswatch is running
DIS timeouts          DIS API status          Increase the timeout
Webhook failures      Stripe dashboard        Verify STRIPE_WEBHOOK_SECRET matches the endpoint's signing secret

Last updated: 2026-01-21
