Data Flows Documentation
This document describes all data flows in the CROP-parts-services platform.
Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ CROP Data Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ GCS │────▶│ MongoDB │────▶│ Elasticsearch │ │
│ │ (Images) │ │ (Source) │ │ (Search Index) │ │
│ └──────────┘ └──────────────┘ └─────────────────┘ │
│ ▲ ▲ │
│ │ │ │
│ ┌────┴────┐ ┌─────┴──────┐ │
│ │ Mac │ │ External │ │
│ │ Studio │ │ APIs │ │
│ │ (Photos)│ │ (DIS/K&M) │ │
│ └─────────┘ └────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
1. MongoDB → Elasticsearch Sync
Purpose: Keep search index in sync with source data.
Flow Diagram
MongoDB Collection Transformer Elasticsearch
│ │ │
│ parts_nhl │ │
│ parts_kuh ──────▶│ transformDoc() │
│ parts_kmt │ validatePart() ───▶│ parts_current
│ nh_unified │ │ (alias)
│ │ │
Entry Points
| Method | Endpoint | Description |
|---|---|---|
| POST | /sync | Full collection sync |
| POST | /sync/document | Single document sync |
| POST | /sync/delete | Remove from ES |
| GET | /sync/status | Compare counts |
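The transform-and-validate stage shown in the flow diagram can be illustrated with a minimal sketch. The document shapes (`SourceDoc`, `IndexDoc`) and the bodies of `transformDoc()` and `validatePart()` are hypothetical stand-ins, since the real implementations in `transformers.ts` are not shown here. The batch-size clamp assumes the default of 500 and the maximum of 2000 given for `SYNC_BATCH_SIZE` under Configuration.

```typescript
// Hypothetical shapes: the real MongoDB and index schemas are not shown in this document.
interface SourceDoc { partNumber?: string; title?: string }
interface IndexDoc { id: string; title: string }

// Stand-in for transformDoc(): maps a source document to an index document.
function transformDoc(doc: SourceDoc): IndexDoc {
  return { id: (doc.partNumber ?? "").trim(), title: (doc.title ?? "").trim() };
}

// Stand-in for validatePart(): rejects documents without a part number.
function validatePart(doc: IndexDoc): boolean {
  return doc.id.length > 0;
}

// Clamp the configured batch size: fall back to 500, never exceed 2000.
function resolveBatchSize(raw: string | undefined): number {
  const parsed = Number.parseInt(raw ?? "", 10);
  if (!Number.isFinite(parsed) || parsed < 1) return 500;
  return Math.min(parsed, 2000);
}

// Transform, validate, and group documents into batches for bulk indexing.
function buildBatches(docs: SourceDoc[], batchSize: number): IndexDoc[][] {
  const valid = docs.map(transformDoc).filter(validatePart);
  const batches: IndexDoc[][] = [];
  for (let i = 0; i < valid.length; i += batchSize) {
    batches.push(valid.slice(i, i + batchSize));
  }
  return batches;
}
```

Dropping invalid documents before batching keeps every batch full except possibly the last, which makes progress counts easier to reason about.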
Key Files
- services/catalog/src/routes/sync.ts - HTTP handlers
- packages/shared-catalog/src/transformers.ts - Manufacturer transformers
- services/search/src/services/index-rebuild-service.ts - Zero-downtime migrations
Configuration
# Batch settings
SYNC_BATCH_SIZE=500 # Documents per batch (max 2000)
SYNC_PROGRESS_INTERVAL=5000 # Log every N docs
Usage
# Full sync with dry-run
curl -X POST https://catalog-service/sync \
-H "X-Admin-Token: $TOKEN" \
-d '{"collection": "nh_unified", "dryRun": true}'
# Single document
curl -X POST https://catalog-service/sync/document \
-H "X-Admin-Token: $TOKEN" \
-d '{"collection": "parts_nhl", "partNumber": "87840296"}'
2. GCS Image Sync
Purpose: Sync product images from Mac Studio to GCS and register in MongoDB.
Flow Diagram
Mac Studio GCS Bucket MongoDB
│ │ │
│ /crop_parts/ │ │
│ KUHN/ │ │
│ Hotsy/ ──────▶ │ gs://crop_parts/ │
│ ... │ ct/kuh/123/ ───▶│ discovery.gcpPath
│ │ gallery/ │ media.images[]
│ fswatch daemon │ │
Entry Points
| Type | Location | Description |
|---|---|---|
| Daemon | Mac Studio | fswatch + crop_sync.sh |
| Script | scripts/sync-gcp-to-mongo.ts | Manual sync |
| Script | scripts/import-ct-photos.ts | Local import |
| HTTP | POST /pipeline/upload-photo | Single upload |
GCS Path Structure
gs://crop_parts/ct/{vendor}/{partNumber}/{type}/{filename}.jpg
Examples:
gs://crop_parts/ct/kuh/70176699/gallery/70176699-1.jpg
gs://crop_parts/ct/nhl/87840296/360/frame-001.jpg
Sync Process
- Watch: fswatch monitors /Users/john/crop_parts/ for new images
- Queue: New files queued with 2-minute debounce
- Upload: gsutil cp to GCS bucket
- Notify: POST to catalog service to register in MongoDB
- Log: Success/failure logged to sync.log
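The Queue step's 2-minute debounce can be illustrated in TypeScript. This is only a sketch of the idea, not the logic of crop_sync.sh, which is a shell script and is not shown here. The `DebouncedQueue` class is hypothetical, and it assumes the debounce is applied per file rather than globally. Timestamps are passed in explicitly so the behaviour is easy to test.

```typescript
const DEBOUNCE_MS = 2 * 60 * 1000; // 2-minute debounce, per the Queue step

// Tracks the last change time per file. A file becomes ready for upload
// once it has been quiet for at least DEBOUNCE_MS.
class DebouncedQueue {
  private lastSeen = new Map<string, number>();

  // Record a change event; repeated events for the same path reset its timer.
  touch(path: string, nowMs: number): void {
    this.lastSeen.set(path, nowMs);
  }

  // Remove and return every path that has been quiet long enough.
  drainReady(nowMs: number): string[] {
    const ready: string[] = [];
    this.lastSeen.forEach((seenAt, path) => {
      if (nowMs - seenAt >= DEBOUNCE_MS) ready.push(path);
    });
    for (const path of ready) this.lastSeen.delete(path);
    return ready;
  }
}
```

Resetting the timer on every event means a file that is still being written is not uploaded until writes stop, which is the usual reason for debouncing a file watcher.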
Mac Studio Status Check
ssh crop-john "tail -20 /Users/john/crop_parts/logs/sync.log"
ssh crop-john "ps aux | grep fswatch"
3. DIS Inventory Check
Purpose: Check real-time inventory and pricing from DIS (Dealer Information System).
Flow Diagram
Catalog Service DIS API MongoDB
│ │ │
│ vendorCode + partNumber │ │
│ ────────────────────▶ │ │
│ │ partsInquiry │
│ ◀──────────────────── │ │
│ price, qty, available │ │
│ ────────────────────────────────────────────────▶ │ inventory.dis*
Vendor Code Mapping
| Internal | DIS Code | Notes |
|---|---|---|
| NHL | NHL | New Holland |
| KUH | KUH | Kuhn (91% coverage) |
| BNS | FER | Briggs → Ferris |
| HOT | HOT | Hotsy (92% coverage) |
| MCH | MCH | McHale (94% coverage) |
| HAR | HAR | Harvest Tech (79%) |
| MAR | MAR | Marcrest (80%) |
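The mapping table, combined with the BNS special-routing rules described in the next subsection, could be expressed as a single lookup function. The sketch below is an assumption about how such a lookup might look, not the code in dis-service.ts. The function name `resolveDisCode` is hypothetical, and the regular expressions are inferred from the examples given (a DOT-format number like 06.0043, and `790xxx` read as 790 followed by three digits).

```typescript
// Internal vendor code to DIS code, taken from the mapping table.
const DIS_CODES: Record<string, string> = {
  NHL: "NHL", KUH: "KUH", BNS: "FER", HOT: "HOT", MCH: "MCH", HAR: "HAR", MAR: "MAR",
};

// Assumed patterns: the source gives examples only, not exact rules.
const DOT_FORMAT = /^\d{2}\.\d{4}$/; // e.g. 06.0043
const NHL_PREFIX = /^(BER|F350)/;    // BER*, F350*
const NHL_790 = /^790\d{3}$/;        // 790xxx, assuming x is a digit

// Returns the DIS vendor code for a part, or undefined for unknown vendors.
function resolveDisCode(vendor: string, partNumber: string): string | undefined {
  const v = vendor.trim().toUpperCase();
  if (v === "BNS") {
    const pn = partNumber.trim().toUpperCase();
    if (DOT_FORMAT.test(pn)) return "VNT"; // Ventrac
    if (NHL_PREFIX.test(pn) || NHL_790.test(pn)) return "NHL";
    return "FER"; // Ferris, the default for BNS
  }
  return DIS_CODES[v];
}
```

For example, `resolveDisCode("BNS", "06.0043")` yields `"VNT"`, while a BNS part number that matches none of the special patterns falls through to `"FER"`.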
BNS Special Routing
BNS parts route differently based on pattern:
- DOT format (06.0043) → VNT (Ventrac)
- BER*, F350*, 790xxx → NHL
- All other → FER (Ferris)
API Configuration
DIS_API_URL=https://cl2342.disprism.com
DIS_DEALER_ID=C
DIS_API_KEY=<secret>
Key Files
- services/catalog/src/services/dis-service.ts - API wrapper
- services/catalog/src/routes/dis.ts - HTTP endpoints
Cache
- TTL: 5 minutes
- Max entries: 10,000 (LRU)
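A cache with these two limits (5-minute TTL, 10,000 entries, LRU eviction) can be sketched with a plain `Map`, which iterates in insertion order. The class name `TtlLruCache` is hypothetical and this is not necessarily how the DIS service implements its cache. It also assumes that a read does not extend an entry's TTL.

```typescript
class TtlLruCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();

  constructor(
    private maxEntries: number = 10000,     // Max entries: 10,000
    private ttlMs: number = 5 * 60 * 1000,  // TTL: 5 minutes
  ) {}

  get(key: string, nowMs: number = Date.now()): V | undefined {
    const hit = this.entries.get(key);
    if (!hit) return undefined;
    this.entries.delete(key);
    if (hit.expiresAt <= nowMs) return undefined; // expired: drop it
    this.entries.set(key, hit); // re-insert to mark as most recently used
    return hit.value;
  }

  set(key: string, value: V, nowMs: number = Date.now()): void {
    this.entries.delete(key);
    if (this.entries.size >= this.maxEntries) {
      // Map preserves insertion order, so the first key is the least recently used.
      const oldest = this.entries.keys().next().value;
      if (oldest !== undefined) this.entries.delete(oldest);
    }
    this.entries.set(key, { value, expiresAt: nowMs + this.ttlMs });
  }

  get size(): number {
    return this.entries.size;
  }
}
```

A natural cache key for inventory lookups would combine the vendor code and part number, though the key format actually used is not stated in this document.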
4. K&M Tire API
Purpose: Sync tire inventory from K&M Tire wholesale API.
Flow Diagram
K&M Tire API Catalog Service MongoDB
│ │ │
│ XML Request │ │
│ ◀──────────────────── │ │
│ │ │
│ Tire data (size/price) │ │
│ ────────────────────▶ │ transform ──────────▶ │ parts_kmt
│ │ to unified │
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /km-tire/status | Connection check |
| GET | /km-tire/vendors | Available brands |
| POST | /km-tire/search/size | Search by size |
| POST | /km-tire/search/part | Search by part |
| POST | /km-tire/sync | Sync single size |
| POST | /km-tire/batch-sync | Bulk sync |
| POST | /km-tire/batch-sync-stream | SSE progress |
| GET | /km-tire/inventory/{pn} | Real-time check |
Batch Configuration
KM_TIRE_CONCURRENCY=5 # Parallel requests (1-10)
KM_TIRE_BATCH_SIZE=100 # Sizes per batch
KM_TIRE_CACHE_TTL=300000 # 5 minutes
Key Files
- services/catalog/src/routes/km-tire.ts
- services/catalog/src/services/km-tire-service.ts
- services/catalog/src/services/km-tire-batch-sync.ts
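The batch settings above (`KM_TIRE_CONCURRENCY`, `KM_TIRE_BATCH_SIZE`) suggest a bounded-concurrency loop over batches of tire sizes. The sketch below shows one common way to do that. The helper names `chunk` and `mapWithConcurrency` are hypothetical, and the actual logic in km-tire-batch-sync.ts may differ.

```typescript
// Split a list into batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
}

// Run `worker` over `items` with at most `concurrency` calls in flight.
// Results are returned in input order.
async function mapWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  // Keep the value inside the documented 1-10 range.
  const limit = Math.min(10, Math.max(1, Math.floor(concurrency)));
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each runner pulls the next unclaimed index until the list is exhausted.
  async function run(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index]);
    }
  }

  const runners: Promise<void>[] = [];
  for (let i = 0; i < Math.min(limit, items.length); i++) runners.push(run());
  await Promise.all(runners);
  return results;
}
```

With the defaults shown, a sync would process sizes in batches of 100 via `chunk(sizes, 100)` and call the K&M API for each size with five requests in flight.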
5. CTR (Click-Through Rate) Tracking
Purpose: Track autocomplete suggestion performance.
Flow Diagram
Search Service Kafka Analytics Service
│ │ │
│ Impression event │ │
│ ─────────────────▶ │ ─────────────────────▶ │
│ │ │
│ Click event │ │ MongoDB
│ ─────────────────▶ │ ─────────────────────▶ │ ctr_events
│ │ │ │
│ │ │ ▼
│ │ │ Aggregations
Event Types
Impression Event:
{
"sessionId": "abc123",
"requestId": "req-456",
"query": "hydraulic",
"suggestions": ["hydraulic pump", "hydraulic filter"],
"timestamp": "2026-01-21T12:00:00Z"
}
Click Event:
{
"sessionId": "abc123",
"requestId": "req-456",
"query": "hydraulic",
"clickedSuggestion": "hydraulic pump",
"clickedIndex": 0,
"timeToClick": 1500
}
Metrics
| Metric | Description |
|---|---|
| CTR | clicks / impressions |
| Avg Position | Average clicked position |
| Time to Click | Milliseconds to click |
| Top Queries | By CTR/impressions/clicks |
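The first three metrics can be derived directly from the two event types. The sketch below is illustrative only: the `summarize` function and `CtrSummary` shape are hypothetical, it assumes one impression event per autocomplete request, and it treats `clickedIndex` as zero-based, as in the sample click event.

```typescript
// Subsets of the event payloads shown above.
interface ImpressionEvent { requestId: string; query: string; suggestions: string[] }
interface ClickEvent { requestId: string; query: string; clickedIndex: number; timeToClick: number }

interface CtrSummary {
  ctr: number;              // clicks / impressions
  avgPosition: number;      // mean zero-based clicked index
  avgTimeToClickMs: number; // mean milliseconds from impression to click
}

// Arithmetic mean that returns 0 for an empty list instead of NaN.
function mean(values: number[]): number {
  if (values.length === 0) return 0;
  return values.reduce((sum, v) => sum + v, 0) / values.length;
}

function summarize(impressions: ImpressionEvent[], clicks: ClickEvent[]): CtrSummary {
  return {
    ctr: impressions.length === 0 ? 0 : clicks.length / impressions.length,
    avgPosition: mean(clicks.map((c) => c.clickedIndex)),
    avgTimeToClickMs: mean(clicks.map((c) => c.timeToClick)),
  };
}
```

Grouping the same computation by `query` would give the per-query figures needed for the Top Queries metric.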
Configuration
CTR_TRACKING_ENABLED=false # Enable/disable
CTR_COLLECTION_NAME=autocomplete_ctr_events
CTR_RETENTION_DAYS=90
CTR_BATCH_SIZE=100
CTR_FLUSH_INTERVAL_MS=5000
Key Files
- services/search/src/services/ctr-analytics-service.ts
- services/analytics/src/config/env.ts
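`CTR_BATCH_SIZE` and `CTR_FLUSH_INTERVAL_MS` imply a write buffer that flushes either when it fills up or when enough time has passed. A minimal sketch of that pattern follows. The `EventBuffer` class and its `sink` callback are hypothetical, and the real service may schedule flushes differently. Time is passed in explicitly so the logic can be exercised without timers.

```typescript
class EventBuffer<E> {
  private pending: E[] = [];
  private lastFlushMs: number;

  constructor(
    private sink: (batch: E[]) => void,     // e.g. a bulk insert into MongoDB
    private batchSize: number = 100,        // CTR_BATCH_SIZE
    private flushIntervalMs: number = 5000, // CTR_FLUSH_INTERVAL_MS
    startMs: number = 0,
  ) {
    this.lastFlushMs = startMs;
  }

  // Queue an event and flush immediately once the batch is full.
  add(event: E, nowMs: number): void {
    this.pending.push(event);
    if (this.pending.length >= this.batchSize) this.flush(nowMs);
  }

  // Call periodically; flushes when the interval has elapsed since the last flush.
  tick(nowMs: number): void {
    if (nowMs - this.lastFlushMs >= this.flushIntervalMs) this.flush(nowMs);
  }

  // Hand all pending events to the sink; empty flushes only reset the timer.
  flush(nowMs: number): void {
    this.lastFlushMs = nowMs;
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.sink(batch);
  }
}
```

In a running service, `tick` would typically be driven by a timer and `flush` would also be called on shutdown so that buffered events are not lost.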
6. Stripe Webhook Flow
Purpose: Process payment events and create orders.
Flow Diagram
Stripe Payment Service MongoDB
│ │ │
│ checkout.session.completed│ │
│ ─────────────────────────▶│ │
│ │ Create order │
│ │ ────────────────────▶ │ orders
│ │ │
│ │ Create shipment │
│ │ ────────────────────▶ │ shipments
│ │ │
│ │ Delete cart │
│ │ ────────────────────▶ │ carts
│ │ │
│ │ Publish event │
│ │ ──────▶ Kafka │
Webhook Events
| Event | Action |
|---|---|
| checkout.session.completed | Create order |
| payment_intent.succeeded | Create order + shipment |
| payment_intent.payment_failed | Mark failed |
| payment_intent.canceled | Cancel order |
| charge.refunded | Track refund |
| refund.failed | Alert + log |
| charge.dispute.created | Log dispute |
| setup_intent.succeeded | Save payment method |
Security
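- Signature verification: each request is verified with the Stripe webhook secret
- Idempotency: events are deduplicated by the SHA-256 hash of the payload
- Retry logic: a 500 response marks a failure as retryable, a 400 response marks it as permanent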
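The idempotency and retry rules can be sketched as follows. This is an illustration under assumptions, not the code in webhooks.ts: the in-memory `Set` stands in for whatever store the payment service really uses, and the names `idempotencyKey`, `WebhookDeduper`, and `statusFor` are hypothetical. Signature verification is omitted because it depends on the Stripe SDK.

```typescript
import { createHash } from "node:crypto";

// Derive an idempotency key from the raw request body (SHA-256, hex-encoded).
function idempotencyKey(rawPayload: string): string {
  return createHash("sha256").update(rawPayload, "utf8").digest("hex");
}

// Remembers which payloads were already processed.
// Assumption: in-memory only; a real service would need a persistent store.
class WebhookDeduper {
  private seen = new Set<string>();

  // Returns true if this exact payload was seen before, otherwise records it.
  isDuplicate(rawPayload: string): boolean {
    const key = idempotencyKey(rawPayload);
    if (this.seen.has(key)) return true;
    this.seen.add(key);
    return false;
  }
}

type Outcome = "ok" | "transient_error" | "invalid";

// Map a processing outcome to the HTTP status returned to Stripe.
function statusFor(outcome: Outcome): number {
  switch (outcome) {
    case "ok": return 200;
    case "transient_error": return 500; // retryable: Stripe will redeliver
    case "invalid": return 400;         // permanent: do not retry
  }
}
```

Hashing the raw body rather than a parsed object avoids differences in key ordering or whitespace producing different keys for the same delivery.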
Configuration
STRIPE_WEBHOOK_SECRET=whsec_...
STRIPE_SECRET_KEY=sk_...
Key Files
services/payment/src/routes/webhooks.ts
Summary Table
| Flow | Source | Target | Trigger | Auth |
|---|---|---|---|---|
| MongoDB→ES | MongoDB | Elasticsearch | HTTP/Script | Admin Token |
| GCS Images | Mac Studio | GCS→MongoDB | fswatch daemon | GCP SA |
| DIS Inventory | DIS API | MongoDB | On-demand | API Key |
| K&M Tire | K&M API | MongoDB | HTTP/Batch | API Key |
| CTR Tracking | Search events | MongoDB | Kafka | Internal |
| Webhooks | Stripe | MongoDB | HTTP POST | Webhook Secret |
Monitoring & Debugging
Health Checks
# All services
curl https://api.crop-dev.app/health
# Elasticsearch
curl https://search-service/health/ready
# MongoDB
curl https://catalog-service/health
Logs
# Cloud Run logs
gcloud logging read "resource.type=cloud_run_revision"
# Mac Studio sync logs
ssh crop-john "tail -f /Users/john/crop_parts/logs/sync.log"
Common Issues
| Issue | Check | Fix |
|---|---|---|
| ES sync stuck | /sync/status | Restart sync job |
| Images not appearing | sync.log on Mac | Check fswatch process |
| DIS timeouts | DIS API status | Increase timeout |
| Webhook failures | Stripe dashboard | Check signature |
Last updated: 2026-01-21