Skip to content

thecodingmage/LiveCold-Pathway-RAG

Repository files navigation

๐ŸงŠ LiveCold

Real-Time Cold Chain Intelligence Platform

AI-powered monitoring โ€ข Autonomous diversion decisions โ€ข Live SOP compliance Built on Pathway real-time streaming framework

Python Pathway Gemini MQTT Flask Docker


India loses โ‚น92,000 Crore annually in cold chain failures. LiveCold prevents this with real-time AI that monitors, predicts, decides, and acts โ€” in milliseconds.

Features ยท Architecture ยท Quick Start ยท How It Works ยท API Reference


๐ŸŽฏ What LiveCold Does

LiveCold is an end-to-end cold chain intelligence platform that goes beyond monitoring โ€” it thinks and acts:

โŒ Traditional: Sensor โ†’ Alert โ†’ Human reads email โ†’ Manual decision โ†’ Cargo already spoiled

โœ… LiveCold:    Sensor โ†’ Anomaly Filter โ†’ Risk Model โ†’ Cost Optimizer โ†’ Auto-Divert โ†’ Driver notified in 2 seconds
Capability Description
๐ŸŒก๏ธ 4-Stream IoT Monitoring Temperature, GPS, reefer telemetry, and door/shock events โ€” 25 trucks streaming every 2 seconds
๐Ÿ›ก๏ธ 4-Layer Anomaly Detection Filters sensor glitches before they trigger false decisions (physical bounds, rate-of-change, z-score, stuck sensor)
๐Ÿง  Sigmoid Risk Model Probability-based risk scoring โ€” not binary IF/ELSE rules
๐Ÿ’ฐ Cost-Benefit Diversion Compares expected cargo loss vs diversion cost โ€” only diverts when economically rational
๐Ÿญ Intelligent Hub Matching 15 Indian hubs filtered by temp-zone compatibility, capacity, and traffic-aware ETA
๐Ÿ”€ Live GPS Re-Routing Diverted trucks physically change course on the map toward the assigned hub
๐Ÿ“ฒ Interactive Notifications Drivers receive diversion proposals and must ACCEPT before GPS re-routing
๐Ÿ“„ Pathway RAG Pipeline Edit the SOP โ†’ AI learns it in 2 seconds โ†’ answers cite updated ยงsections
๐ŸŒฟ Carbon Credits Engine Calculates COโ‚‚ saved from prevented food waste, converts to โ‚น credits
โš–๏ธ Per-Shipment Routing Modes SAFETY / BALANCED / ECO โ€” configurable mid-journey with bidirectional MQTT sync

๐Ÿ—๏ธ System Architecture

flowchart TB
    subgraph SIM["๐Ÿšš IoT Simulation Layer (25 Trucks ร— 4 Sensors ร— Every 2s)"]
        TS["๐ŸŒก๏ธ Temperature Sensor"]
        GPS["๐Ÿ“ GPS Tracker"]
        RF["โ„๏ธ Reefer Telemetry"]
        DR["๐Ÿšช Door / Shock Sensor"]
    end

    subgraph MQTT["๐Ÿ“ก MQTT Broker (Mosquitto)"]
        T1["livecold/temp"]
        T2["livecold/gps"]
        T3["livecold/reefer"]
        T4["livecold/door"]
    end

    subgraph BRAIN["๐Ÿง  Intelligence Pipeline"]
        AD["๐Ÿ›ก๏ธ 4-Layer Anomaly\nDetector"]
        RM["๐Ÿ“ˆ Sigmoid Risk Model\nP(spoilage)"]
        DO["๐Ÿ’ฐ Diversion Cost\nOptimizer"]
        ME["๐Ÿ“Š Metrics Engine\nโ‚น saved โ€ข COโ‚‚ โ€ข rates"]
        HM["๐Ÿญ Hub Manager\n15 hubs โ€ข traffic-aware ETA\ntemp-zone โ€ข capacity"]
    end

    subgraph ACTION["โšก Decision & Action Layer"]
        AN["๐Ÿ“ฒ Alert Notifier\nWhatsApp โ†’ Driver\nWhatsApp โ†’ Ops\nEmail โ†’ Client"]
        GR["๐Ÿ”€ GPS Re-Router\nPublish to livecold/divert\nTruck changes course"]
    end

    subgraph RAG["๐Ÿ“š Pathway RAG Pipeline (:8765)"]
        FS["pw.io.fs.read\n(streaming mode)"]
        SOP["๐Ÿ“„ SOP Document\n(Single Source of Truth)"]
        LLM["๐Ÿค– Gemini LLM\nSOP-cited checklists"]
    end

    subgraph DASH["๐ŸŒ Presentation Layer (:5050)"]
        MAP["๐Ÿ—บ๏ธ Live Dashboard\nMap โ€ข Alerts โ€ข KPIs\nRouting Mode Toggles"]
        DRV["๐Ÿ“ฑ Driver Dashboard\nMobile View\nNotification Inbox"]
        ALY["๐Ÿ“ˆ Analytics\nCarbon Credits\nAnomaly Breakdown"]
        SED["๐Ÿ“„ SOP Editor\nLive Edit + RAG Test"]
    end

    TS --> T1
    GPS --> T2
    RF --> T3
    DR --> T4

    T1 & T2 --> AD
    AD -->|"Clean data only\n(~9% filtered)"| RM
    RM --> DO
    DO --> ME
    DO -->|"DIVERT decision"| HM
    HM --> AN
    HM --> GR
    GR -->|"New GPS target"| MQTT

    T3 & T4 --> DASH
    ME -->|"livecold/decisions"| DASH

    SOP --> FS
    FS --> LLM
    LLM --> DASH

    style SIM fill:#0d1b2a,stroke:#4fc3f7,stroke-width:2px,color:#e0e6f0
    style MQTT fill:#0d1b2a,stroke:#ffc107,stroke-width:2px,color:#e0e6f0
    style BRAIN fill:#0d1b2a,stroke:#ff5252,stroke-width:2px,color:#e0e6f0
    style ACTION fill:#0d1b2a,stroke:#ce93d8,stroke-width:2px,color:#e0e6f0
    style RAG fill:#0d1b2a,stroke:#81d4fa,stroke-width:2px,color:#e0e6f0
    style DASH fill:#0d1b2a,stroke:#4caf50,stroke-width:2px,color:#e0e6f0
Loading

๐Ÿ”„ Data Flow

sequenceDiagram
    participant S as ๐Ÿšš Simulators
    participant M as ๐Ÿ“ก MQTT
    participant A as ๐Ÿ›ก๏ธ Anomaly Detector
    participant P as ๐Ÿง  Risk + Decision
    participant H as ๐Ÿญ Hub Manager
    participant R as ๐Ÿ“š Pathway RAG
    participant D as ๐ŸŒ Dashboard

    loop Every 2 seconds (25 trucks)
        S->>M: Publish temp, GPS, reefer, door events
        M->>A: livecold/temp readings
        A->>A: L1 Bounds โ†’ L2 Rate โ†’ L3 Z-Score โ†’ L4 Stuck
        alt Anomaly detected (~9%)
            A--xP: โŒ Discard / use last good reading
        else Clean reading
            A->>P: โœ… Forward verified temp
        end
        P->>P: Sigmoid risk = ฯƒ(dev ร— 1.8 + exp ร— 0.2) ร— ETA
        alt Risk > threshold
            P->>H: ๐Ÿšจ Find best hub (temp-zone + capacity + traffic ETA)
            H-->>S: ๐Ÿ”€ DIVERT order via livecold/divert (GPS re-routes)
            H->>D: ๐Ÿ“ฒ WhatsApp to driver (hub name, Maps link, ETA)
            D->>R: POST /v2/answer (SOP query for product)
            R->>R: Pathway streams SOP โ†’ Gemini LLM
            R-->>D: SOP checklist (ยง3.2, ยง4.1...)
        else Risk โ‰ค threshold
            P->>D: โœ… CONTINUE
        end
        M->>D: Live update map, metrics, shipment cards (SSE)
    end
Loading

โšก How It Works

The Story of a Shipment

1. Departure โ€” 25 trucks leave cities across India carrying vaccines, dairy, seafood, frozen meat, and more. Temperature ranges come from the SOP document โ€” the single source of truth.

2. Anomaly Filtering โ€” A sensor reads -999ยฐC? Our 4-layer filter catches it instantly:

Layer What It Catches Action
L1: Physical Bounds Impossible temps (500ยฐC, -999ยฐC) โŒ Discard
L2: Rate-of-Change Sensor faults (20ยฐC jump in 2s) ๐Ÿ”„ Use last good reading
L3: Z-Score Statistical outliers (3.5ฯƒ from mean) ๐Ÿ”„ Use rolling mean
L4: Stuck Sensor 10 identical readings โš ๏ธ Flag for maintenance

3. Risk Calculation โ€” Clean data enters the sigmoid risk model:

risk = ฯƒ(deviation ร— 1.8 + exposure ร— 0.2) ร— ETA_factor

4. Diversion Decision โ€” Cost optimizer compares:

Expected Loss (Continue) = P(spoilage) ร— Cargo Value     = โ‚น6,96,000
Total Divert Cost        = Fuel + Residual Risk ร— Value  = โ‚น1,596

โ‚น6,96,000 > โ‚น1,596 โ†’ DIVERT โœ…  (Net saving: โ‚น6,94,404)

5. Hub Selection โ€” Not just "nearest" โ€” the smartest hub. 5 filters:

  • โœ… Available (not in maintenance)
  • ๐ŸŒก๏ธ Correct temperature zone (Ultra-Cold for vaccines, Chilled for dairy)
  • ๐Ÿ“ฆ Has capacity (available tonnes > cargo weight)
  • ๐Ÿš— Best traffic-aware ETA (congestion zones for Delhi, Mumbai, Bangalore...)
  • ๐Ÿ”ง Repair station capability (for anomaly-triggered alerts)

6. Action & Driver Acceptance โ€” Driver gets a dashboard notification with the proposed hub, Google Maps link, and ETA. The truck only diverts after the driver clicks Accept on their interactive mobile dashboard. The main dashboard updates live.


๐ŸŽ›๏ธ Three Routing Modes

Each shipment gets a routing mode that changes how the diversion optimizer behaves:

Mode Default For Behavior
๐Ÿ›ก๏ธ SAFETY Vaccines, Pharmaceuticals Nearest hub always, ignore cost
โš–๏ธ BALANCED Dairy, Seafood, Frozen Meat, Ice Cream Optimize cost vs. safety
๐ŸŒฟ ECO Fruits, Flowers Minimize COโ‚‚, accept longer detours

Modes can be changed mid-journey by the operations manager. The change syncs bidirectionally between dashboard and pipeline via MQTT.


๐Ÿ“š Pathway Integration โ€” Real-Time RAG

LiveCold uses Pathway as the core streaming framework for live document intelligence:

# Pathway watches SOP files in real-time (streaming mode)
documents = pw.io.fs.read(
    path="./watched_docs/",
    format="binary",
    mode="streaming",      # โ† Detects file changes automatically
    with_metadata=True,
)

# REST API accepts natural language queries
queries, response_writer = pw.io.http.rest_connector(
    host="0.0.0.0", port=8765,
    route="/v2/answer",
    schema=QuerySchema,
)

# LLM reads FRESH SOP content on every query
results = queries.select(result=build_answer(queries.prompt))

The magic: Edit the SOP file โ†’ Pathway detects the change โ†’ Next query automatically uses updated content โ†’ Answer cites the new ยงsections. No restart. No redeployment. 2-second latency.


๐Ÿš€ Quick Start

Prerequisites

  • Python 3.11+
  • Mosquitto MQTT broker
  • Google Gemini API key

Local Setup

# 1. Clone & setup
git clone https://github.com/di35117/Pathway-Hackathon.git
cd Pathway-Hackathon
git checkout Tarun

# 2. Create virtual environment
python3.11 -m venv .venv-slim
source .venv-slim/bin/activate
pip install -r requirements-slim.txt

# 3. Configure
echo "GOOGLE_API_KEY=your_gemini_key" > .env

# 4. Start MQTT broker
mosquitto -c mosquitto.conf -d

# 5. Launch (3 terminals)
python main.py mqtt         # Terminal 1: Intelligence Pipeline
python main.py dashboard    # Terminal 2: Dashboard (http://localhost:5050)
python main.py sim-all      # Terminal 3: All 4 IoT Simulators

# Optional: Pathway RAG Pipeline
python main.py rag-v2       # Terminal 4: RAG API (http://localhost:8765)

Docker

echo "GOOGLE_API_KEY=your_gemini_key" > .env
docker-compose up -d
# Dashboard: http://localhost:5050

Access Points

Service URL
๐ŸŒ Main Dashboard http://localhost:5050
๐Ÿ“ฑ Driver Dashboard http://localhost:5050/driver/SHP_1
๐Ÿ“ˆ Analytics http://localhost:5050/analytics
๐Ÿ“„ SOP Editor http://localhost:5050/sop-editor
๐Ÿ“š RAG API http://localhost:8765/v2/answer
โค๏ธ Health Check http://localhost:5050/health

๐Ÿ“ก API Reference

Pathway RAG โ€” SOP Q&A

curl -X POST http://localhost:8765/v2/answer \
  -H "Content-Type: application/json" \
  -d '{"prompt": "What should I do if dairy temperature exceeds 8ยฐC?"}'

Dashboard APIs

Endpoint Method Description
/api/shipments GET All 25 active shipments with live state
/api/alerts GET Recent DIVERT + door open alerts (last 50)
/api/metrics GET System-wide KPIs (โ‚น saved, COโ‚‚, diversions)
/api/stream GET Server-Sent Events for real-time updates
/api/hubs GET All 15 cold storage hubs with status
/api/nearest-hubs/<id> GET 3 nearest compatible hubs for a shipment
/api/history/<id> GET Temperature history + 30-min prediction
/api/analytics GET Carbon credits, anomaly breakdown, financials
/api/anomalies GET Global + per-shipment anomaly detection stats
/api/notifications GET All WhatsApp/email notification log
/api/shipment-report/<id> GET Full compliance report for a shipment
/api/routing-mode/<id> GET/POST Get or change routing mode (SAFETY/BALANCED/ECO)
/api/sop-content GET/POST Read or edit the SOP document
/api/rag-query POST Query SOP via RAG (with LLM fallback)
/api/sop-status GET SOP sync status (last modified, change count)

๐Ÿ“‚ Project Structure

hack/
โ”œโ”€โ”€ main.py                         # ๐ŸŽฎ Unified CLI (rag, dashboard, mqtt, sim-all, etc.)
โ”‚
โ”œโ”€โ”€ pipeline/
โ”‚   โ””โ”€โ”€ livecold_pipeline.py        # ๐Ÿง  Central brain: MQTT โ†’ Anomaly โ†’ Risk โ†’ Decision โ†’ Publish
โ”‚
โ”œโ”€โ”€ decision_engine/
โ”‚   โ”œโ”€โ”€ evaluator.py                # Orchestrator: risk โ†’ diversion โ†’ metrics
โ”‚   โ”œโ”€โ”€ risk_model.py               # Sigmoid P(spoilage) calculator
โ”‚   โ”œโ”€โ”€ diversion_optimizer.py      # Cost vs loss optimizer (supports cost/eco modes)
โ”‚   โ””โ”€โ”€ metrics_engine.py           # โ‚น saved, COโ‚‚ delta, diversion rates
โ”‚
โ”œโ”€โ”€ anomaly_detector.py             # ๐Ÿ›ก๏ธ 4-layer anomaly filter (267 lines of defense)
โ”œโ”€โ”€ hub_manager.py                  # ๐Ÿญ 15 hubs, traffic-aware ETA, temp-zone matching
โ”œโ”€โ”€ alert_notifier.py               # ๐Ÿ“ฒ WhatsApp + Email notifications (3 conditions)
โ”œโ”€โ”€ sop_parser.py                   # ๐Ÿ“„ Reads temp ranges from SOP (single source of truth)
โ”‚
โ”œโ”€โ”€ sim/
โ”‚   โ”œโ”€โ”€ shipment_factory.py         # ๐Ÿญ 25 shipments with scripted demo scenarios
โ”‚   โ”œโ”€โ”€ temp_simulator.py           # ๐ŸŒก๏ธ Temperature with drift/stable/critical modes
โ”‚   โ”œโ”€โ”€ gps_simulator.py            # ๐Ÿ“ GPS with live diversion re-routing
โ”‚   โ”œโ”€โ”€ reefer_simulator.py         # โ„๏ธ Compressor status, power draw, cycles
โ”‚   โ”œโ”€โ”€ door_simulator.py           # ๐Ÿšช Door open/close + shock events
โ”‚   โ””โ”€โ”€ config.py                   # 20 Indian cities, MQTT topics, intervals
โ”‚
โ”œโ”€โ”€ pathway_rag_pipeline_v2.py      # ๐Ÿ“š Pathway streaming RAG (pw.io.fs.read + REST)
โ”œโ”€โ”€ pathway_metrics_pipeline.py     # ๐Ÿ“Š Pathway metrics aggregation
โ”œโ”€โ”€ pathway_integrated_full.py      # ๐Ÿ”— Full integrated Pathway pipeline
โ”‚
โ”‚   โ”œโ”€โ”€ dashboard/
โ”‚   โ”‚   โ”œโ”€โ”€ app.py                      # ๐ŸŒ Flask server (1061 lines, 20+ routes)
โ”‚   โ”‚   โ””โ”€โ”€ templates/
โ”‚   โ”‚       โ”œโ”€โ”€ index_1.html            # Main dashboard (map + alerts + metrics)
โ”‚   โ”‚       โ”œโ”€โ”€ driver_1.html           # ๐Ÿ“ฑ Interactive mobile driver dashboard
โ”‚   โ”‚       โ”œโ”€โ”€ analytics_1.html        # ๐Ÿ“ˆ Carbon credits + anomaly analytics
โ”‚   โ”‚       โ””โ”€โ”€ sop_editor_1.html       # ๐Ÿ“„ Live SOP editor + RAG tester
โ”‚
โ”œโ”€โ”€ watched_docs/
โ”‚   โ””โ”€โ”€ cold_chain_SOP.txt          # ๐Ÿ“‹ SOP document (10 sections, 399 lines)
โ”‚
โ”œโ”€โ”€ Dockerfile                      # ๐Ÿณ Multi-component Docker image
โ”œโ”€โ”€ docker-compose.yml              # Full stack with Mosquitto
โ”œโ”€โ”€ requirements-slim.txt           # Python dependencies
โ””โ”€โ”€ mosquitto.conf                  # MQTT broker config

๐Ÿ”ง Configuration

Variable Default Description
GOOGLE_API_KEY โ€” Primary Gemini API key (required for RAG)
GOOGLE_API_KEY_2 โ€” Backup API key (auto-rotates on rate limit)
MQTT_HOST localhost MQTT broker hostname

๐Ÿ“Š Live Demo Metrics (25 shipments)

Metric Value
๐Ÿšš Active Shipments 25 across 20 Indian cities
๐ŸŒก๏ธ Sensor Events/Second ~50
๐Ÿšจ Diversions Triggered 15-17 within first 30 seconds
๐Ÿ›ก๏ธ Anomalies Filtered ~9% of readings (zero false diversions)
๐Ÿ’ฐ Cargo Value Monitored โ‚น2.71 Cr
๐Ÿ’ฐ Cargo Saved โ‚น1.5+ Cr
๐ŸŒฟ COโ‚‚ Impact Tracked 170+ kg
๐Ÿญ Hub Database 15 hubs across India
๐Ÿ“ฆ Product Types 9 (Vaccines, Meat, Dairy, Seafood, Vegetables, Fruits, Pharma, Ice Cream, Flowers)

๐Ÿ› ๏ธ Tech Stack

Layer Technology
Real-Time Streaming Pathway โ€” pw.io.fs.read, pw.io.http.rest_connector, UDFs
LLM Google Gemini 2.5 Flash (via LiteLLM, with 5-model fallback chain)
Message Broker Eclipse Mosquitto (MQTT)
Backend Flask + paho-mqtt
Frontend Leaflet.js (maps) + Server-Sent Events + Vanilla JS
Anomaly Detection Custom 4-layer engine (physical bounds, rate-of-change, z-score, stuck sensor)
Risk Model Sigmoid-based probability with exposure tracking
Containerization Docker + Docker Compose

๐Ÿ† Key Differentiators

  1. Math-based, not rule-based โ€” Sigmoid risk probability, not IF temp > threshold
  2. Smartest hub, not nearest โ€” Traffic-aware ETA with congestion zone modeling
  3. Actual re-routing โ€” GPS simulator physically moves trucks to diversion hubs
  4. Live document intelligence โ€” Edit the SOP, AI learns it in 2 seconds via Pathway streaming
  5. Zero false diversions โ€” 4-layer anomaly filter catches sensor glitches before the risk model
  6. Economically rational โ€” Every โ‚น1 spent on diversion is justified by โ‚น10+ in prevented loss
  7. Sustainable โ€” Carbon credits calculated for every prevented waste event
  8. Production-ready patterns โ€” Thread-safe state, API key rotation, rate-limit handling, dedup logic

๐Ÿ‘ฅ Team

Built for the Pathway Real-Time AI Hackathon โ€” demonstrating real-time streaming intelligence for India's cold chain logistics.


๐Ÿ“„ License

MIT

About

๐Ÿฅ‡ 1st Place at Hack for Green Bharat. LiveCold leverages Pathway to provide intelligent, real-time insights for reducing energy waste in temperature-sensitive supply chains.

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors