AI-powered monitoring โข Autonomous diversion decisions โข Live SOP compliance Built on Pathway real-time streaming framework
India loses โน92,000 Crore annually in cold chain failures. LiveCold prevents this with real-time AI that monitors, predicts, decides, and acts โ in milliseconds.
Features ยท Architecture ยท Quick Start ยท How It Works ยท API Reference
LiveCold is an end-to-end cold chain intelligence platform that goes beyond monitoring โ it thinks and acts:
โ Traditional: Sensor โ Alert โ Human reads email โ Manual decision โ Cargo already spoiled
โ
LiveCold: Sensor โ Anomaly Filter โ Risk Model โ Cost Optimizer โ Auto-Divert โ Driver notified in 2 seconds
| Capability | Description |
|---|---|
| ๐ก๏ธ 4-Stream IoT Monitoring | Temperature, GPS, reefer telemetry, and door/shock events โ 25 trucks streaming every 2 seconds |
| ๐ก๏ธ 4-Layer Anomaly Detection | Filters sensor glitches before they trigger false decisions (physical bounds, rate-of-change, z-score, stuck sensor) |
| ๐ง Sigmoid Risk Model | Probability-based risk scoring โ not binary IF/ELSE rules |
| ๐ฐ Cost-Benefit Diversion | Compares expected cargo loss vs diversion cost โ only diverts when economically rational |
| ๐ญ Intelligent Hub Matching | 15 Indian hubs filtered by temp-zone compatibility, capacity, and traffic-aware ETA |
| ๐ Live GPS Re-Routing | Diverted trucks physically change course on the map toward the assigned hub |
| ๐ฒ Interactive Notifications | Drivers receive diversion proposals and must ACCEPT before GPS re-routing |
| ๐ Pathway RAG Pipeline | Edit the SOP โ AI learns it in 2 seconds โ answers cite updated ยงsections |
| ๐ฟ Carbon Credits Engine | Calculates COโ saved from prevented food waste, converts to โน credits |
| โ๏ธ Per-Shipment Routing Modes | SAFETY / BALANCED / ECO โ configurable mid-journey with bidirectional MQTT sync |
flowchart TB
subgraph SIM["๐ IoT Simulation Layer (25 Trucks ร 4 Sensors ร Every 2s)"]
TS["๐ก๏ธ Temperature Sensor"]
GPS["๐ GPS Tracker"]
RF["โ๏ธ Reefer Telemetry"]
DR["๐ช Door / Shock Sensor"]
end
subgraph MQTT["๐ก MQTT Broker (Mosquitto)"]
T1["livecold/temp"]
T2["livecold/gps"]
T3["livecold/reefer"]
T4["livecold/door"]
end
subgraph BRAIN["๐ง Intelligence Pipeline"]
AD["๐ก๏ธ 4-Layer Anomaly\nDetector"]
RM["๐ Sigmoid Risk Model\nP(spoilage)"]
DO["๐ฐ Diversion Cost\nOptimizer"]
ME["๐ Metrics Engine\nโน saved โข COโ โข rates"]
HM["๐ญ Hub Manager\n15 hubs โข traffic-aware ETA\ntemp-zone โข capacity"]
end
subgraph ACTION["โก Decision & Action Layer"]
AN["๐ฒ Alert Notifier\nWhatsApp โ Driver\nWhatsApp โ Ops\nEmail โ Client"]
GR["๐ GPS Re-Router\nPublish to livecold/divert\nTruck changes course"]
end
subgraph RAG["๐ Pathway RAG Pipeline (:8765)"]
FS["pw.io.fs.read\n(streaming mode)"]
SOP["๐ SOP Document\n(Single Source of Truth)"]
LLM["๐ค Gemini LLM\nSOP-cited checklists"]
end
subgraph DASH["๐ Presentation Layer (:5050)"]
MAP["๐บ๏ธ Live Dashboard\nMap โข Alerts โข KPIs\nRouting Mode Toggles"]
DRV["๐ฑ Driver Dashboard\nMobile View\nNotification Inbox"]
ALY["๐ Analytics\nCarbon Credits\nAnomaly Breakdown"]
SED["๐ SOP Editor\nLive Edit + RAG Test"]
end
TS --> T1
GPS --> T2
RF --> T3
DR --> T4
T1 & T2 --> AD
AD -->|"Clean data only\n(~9% filtered)"| RM
RM --> DO
DO --> ME
DO -->|"DIVERT decision"| HM
HM --> AN
HM --> GR
GR -->|"New GPS target"| MQTT
T3 & T4 --> DASH
ME -->|"livecold/decisions"| DASH
SOP --> FS
FS --> LLM
LLM --> DASH
style SIM fill:#0d1b2a,stroke:#4fc3f7,stroke-width:2px,color:#e0e6f0
style MQTT fill:#0d1b2a,stroke:#ffc107,stroke-width:2px,color:#e0e6f0
style BRAIN fill:#0d1b2a,stroke:#ff5252,stroke-width:2px,color:#e0e6f0
style ACTION fill:#0d1b2a,stroke:#ce93d8,stroke-width:2px,color:#e0e6f0
style RAG fill:#0d1b2a,stroke:#81d4fa,stroke-width:2px,color:#e0e6f0
style DASH fill:#0d1b2a,stroke:#4caf50,stroke-width:2px,color:#e0e6f0
sequenceDiagram
participant S as ๐ Simulators
participant M as ๐ก MQTT
participant A as ๐ก๏ธ Anomaly Detector
participant P as ๐ง Risk + Decision
participant H as ๐ญ Hub Manager
participant R as ๐ Pathway RAG
participant D as ๐ Dashboard
loop Every 2 seconds (25 trucks)
S->>M: Publish temp, GPS, reefer, door events
M->>A: livecold/temp readings
A->>A: L1 Bounds โ L2 Rate โ L3 Z-Score โ L4 Stuck
alt Anomaly detected (~9%)
A--xP: โ Discard / use last good reading
else Clean reading
A->>P: โ
Forward verified temp
end
P->>P: Sigmoid risk = ฯ(dev ร 1.8 + exp ร 0.2) ร ETA
alt Risk > threshold
P->>H: ๐จ Find best hub (temp-zone + capacity + traffic ETA)
H-->>S: ๐ DIVERT order via livecold/divert (GPS re-routes)
H->>D: ๐ฒ WhatsApp to driver (hub name, Maps link, ETA)
D->>R: POST /v2/answer (SOP query for product)
R->>R: Pathway streams SOP โ Gemini LLM
R-->>D: SOP checklist (ยง3.2, ยง4.1...)
else Risk โค threshold
P->>D: โ
CONTINUE
end
M->>D: Live update map, metrics, shipment cards (SSE)
end
1. Departure โ 25 trucks leave cities across India carrying vaccines, dairy, seafood, frozen meat, and more. Temperature ranges come from the SOP document โ the single source of truth.
2. Anomaly Filtering โ A sensor reads -999ยฐC? Our 4-layer filter catches it instantly:
| Layer | What It Catches | Action |
|---|---|---|
| L1: Physical Bounds | Impossible temps (500ยฐC, -999ยฐC) | โ Discard |
| L2: Rate-of-Change | Sensor faults (20ยฐC jump in 2s) | ๐ Use last good reading |
| L3: Z-Score | Statistical outliers (3.5ฯ from mean) | ๐ Use rolling mean |
| L4: Stuck Sensor | 10 identical readings |
3. Risk Calculation โ Clean data enters the sigmoid risk model:
risk = ฯ(deviation ร 1.8 + exposure ร 0.2) ร ETA_factor
4. Diversion Decision โ Cost optimizer compares:
Expected Loss (Continue) = P(spoilage) ร Cargo Value = โน6,96,000
Total Divert Cost = Fuel + Residual Risk ร Value = โน1,596
โน6,96,000 > โน1,596 โ DIVERT โ
(Net saving: โน6,94,404)
5. Hub Selection โ Not just "nearest" โ the smartest hub. 5 filters:
- โ Available (not in maintenance)
- ๐ก๏ธ Correct temperature zone (Ultra-Cold for vaccines, Chilled for dairy)
- ๐ฆ Has capacity (available tonnes > cargo weight)
- ๐ Best traffic-aware ETA (congestion zones for Delhi, Mumbai, Bangalore...)
- ๐ง Repair station capability (for anomaly-triggered alerts)
6. Action & Driver Acceptance โ Driver gets a dashboard notification with the proposed hub, Google Maps link, and ETA. The truck only diverts after the driver clicks Accept on their interactive mobile dashboard. The main dashboard updates live.
Each shipment gets a routing mode that changes how the diversion optimizer behaves:
| Mode | Default For | Behavior |
|---|---|---|
| ๐ก๏ธ SAFETY | Vaccines, Pharmaceuticals | Nearest hub always, ignore cost |
| โ๏ธ BALANCED | Dairy, Seafood, Frozen Meat, Ice Cream | Optimize cost vs. safety |
| ๐ฟ ECO | Fruits, Flowers | Minimize COโ, accept longer detours |
Modes can be changed mid-journey by the operations manager. The change syncs bidirectionally between dashboard and pipeline via MQTT.
LiveCold uses Pathway as the core streaming framework for live document intelligence:
# Pathway watches SOP files in real-time (streaming mode)
documents = pw.io.fs.read(
path="./watched_docs/",
format="binary",
mode="streaming", # โ Detects file changes automatically
with_metadata=True,
)
# REST API accepts natural language queries
queries, response_writer = pw.io.http.rest_connector(
host="0.0.0.0", port=8765,
route="/v2/answer",
schema=QuerySchema,
)
# LLM reads FRESH SOP content on every query
results = queries.select(result=build_answer(queries.prompt))The magic: Edit the SOP file โ Pathway detects the change โ Next query automatically uses updated content โ Answer cites the new ยงsections. No restart. No redeployment. 2-second latency.
- Python 3.11+
- Mosquitto MQTT broker
- Google Gemini API key
# 1. Clone & setup
git clone https://github.com/di35117/Pathway-Hackathon.git
cd Pathway-Hackathon
git checkout Tarun
# 2. Create virtual environment
python3.11 -m venv .venv-slim
source .venv-slim/bin/activate
pip install -r requirements-slim.txt
# 3. Configure
echo "GOOGLE_API_KEY=your_gemini_key" > .env
# 4. Start MQTT broker
mosquitto -c mosquitto.conf -d
# 5. Launch (3 terminals)
python main.py mqtt # Terminal 1: Intelligence Pipeline
python main.py dashboard # Terminal 2: Dashboard (http://localhost:5050)
python main.py sim-all # Terminal 3: All 4 IoT Simulators
# Optional: Pathway RAG Pipeline
python main.py rag-v2 # Terminal 4: RAG API (http://localhost:8765)echo "GOOGLE_API_KEY=your_gemini_key" > .env
docker-compose up -d
# Dashboard: http://localhost:5050| Service | URL |
|---|---|
| ๐ Main Dashboard | http://localhost:5050 |
| ๐ฑ Driver Dashboard | http://localhost:5050/driver/SHP_1 |
| ๐ Analytics | http://localhost:5050/analytics |
| ๐ SOP Editor | http://localhost:5050/sop-editor |
| ๐ RAG API | http://localhost:8765/v2/answer |
| โค๏ธ Health Check | http://localhost:5050/health |
curl -X POST http://localhost:8765/v2/answer \
-H "Content-Type: application/json" \
-d '{"prompt": "What should I do if dairy temperature exceeds 8ยฐC?"}'| Endpoint | Method | Description |
|---|---|---|
/api/shipments |
GET | All 25 active shipments with live state |
/api/alerts |
GET | Recent DIVERT + door open alerts (last 50) |
/api/metrics |
GET | System-wide KPIs (โน saved, COโ, diversions) |
/api/stream |
GET | Server-Sent Events for real-time updates |
/api/hubs |
GET | All 15 cold storage hubs with status |
/api/nearest-hubs/<id> |
GET | 3 nearest compatible hubs for a shipment |
/api/history/<id> |
GET | Temperature history + 30-min prediction |
/api/analytics |
GET | Carbon credits, anomaly breakdown, financials |
/api/anomalies |
GET | Global + per-shipment anomaly detection stats |
/api/notifications |
GET | All WhatsApp/email notification log |
/api/shipment-report/<id> |
GET | Full compliance report for a shipment |
/api/routing-mode/<id> |
GET/POST | Get or change routing mode (SAFETY/BALANCED/ECO) |
/api/sop-content |
GET/POST | Read or edit the SOP document |
/api/rag-query |
POST | Query SOP via RAG (with LLM fallback) |
/api/sop-status |
GET | SOP sync status (last modified, change count) |
hack/
โโโ main.py # ๐ฎ Unified CLI (rag, dashboard, mqtt, sim-all, etc.)
โ
โโโ pipeline/
โ โโโ livecold_pipeline.py # ๐ง Central brain: MQTT โ Anomaly โ Risk โ Decision โ Publish
โ
โโโ decision_engine/
โ โโโ evaluator.py # Orchestrator: risk โ diversion โ metrics
โ โโโ risk_model.py # Sigmoid P(spoilage) calculator
โ โโโ diversion_optimizer.py # Cost vs loss optimizer (supports cost/eco modes)
โ โโโ metrics_engine.py # โน saved, COโ delta, diversion rates
โ
โโโ anomaly_detector.py # ๐ก๏ธ 4-layer anomaly filter (267 lines of defense)
โโโ hub_manager.py # ๐ญ 15 hubs, traffic-aware ETA, temp-zone matching
โโโ alert_notifier.py # ๐ฒ WhatsApp + Email notifications (3 conditions)
โโโ sop_parser.py # ๐ Reads temp ranges from SOP (single source of truth)
โ
โโโ sim/
โ โโโ shipment_factory.py # ๐ญ 25 shipments with scripted demo scenarios
โ โโโ temp_simulator.py # ๐ก๏ธ Temperature with drift/stable/critical modes
โ โโโ gps_simulator.py # ๐ GPS with live diversion re-routing
โ โโโ reefer_simulator.py # โ๏ธ Compressor status, power draw, cycles
โ โโโ door_simulator.py # ๐ช Door open/close + shock events
โ โโโ config.py # 20 Indian cities, MQTT topics, intervals
โ
โโโ pathway_rag_pipeline_v2.py # ๐ Pathway streaming RAG (pw.io.fs.read + REST)
โโโ pathway_metrics_pipeline.py # ๐ Pathway metrics aggregation
โโโ pathway_integrated_full.py # ๐ Full integrated Pathway pipeline
โ
โ โโโ dashboard/
โ โ โโโ app.py # ๐ Flask server (1061 lines, 20+ routes)
โ โ โโโ templates/
โ โ โโโ index_1.html # Main dashboard (map + alerts + metrics)
โ โ โโโ driver_1.html # ๐ฑ Interactive mobile driver dashboard
โ โ โโโ analytics_1.html # ๐ Carbon credits + anomaly analytics
โ โ โโโ sop_editor_1.html # ๐ Live SOP editor + RAG tester
โ
โโโ watched_docs/
โ โโโ cold_chain_SOP.txt # ๐ SOP document (10 sections, 399 lines)
โ
โโโ Dockerfile # ๐ณ Multi-component Docker image
โโโ docker-compose.yml # Full stack with Mosquitto
โโโ requirements-slim.txt # Python dependencies
โโโ mosquitto.conf # MQTT broker config
| Variable | Default | Description |
|---|---|---|
GOOGLE_API_KEY |
โ | Primary Gemini API key (required for RAG) |
GOOGLE_API_KEY_2 |
โ | Backup API key (auto-rotates on rate limit) |
MQTT_HOST |
localhost |
MQTT broker hostname |
| Metric | Value |
|---|---|
| ๐ Active Shipments | 25 across 20 Indian cities |
| ๐ก๏ธ Sensor Events/Second | ~50 |
| ๐จ Diversions Triggered | 15-17 within first 30 seconds |
| ๐ก๏ธ Anomalies Filtered | ~9% of readings (zero false diversions) |
| ๐ฐ Cargo Value Monitored | โน2.71 Cr |
| ๐ฐ Cargo Saved | โน1.5+ Cr |
| ๐ฟ COโ Impact Tracked | 170+ kg |
| ๐ญ Hub Database | 15 hubs across India |
| ๐ฆ Product Types | 9 (Vaccines, Meat, Dairy, Seafood, Vegetables, Fruits, Pharma, Ice Cream, Flowers) |
| Layer | Technology |
|---|---|
| Real-Time Streaming | Pathway โ pw.io.fs.read, pw.io.http.rest_connector, UDFs |
| LLM | Google Gemini 2.5 Flash (via LiteLLM, with 5-model fallback chain) |
| Message Broker | Eclipse Mosquitto (MQTT) |
| Backend | Flask + paho-mqtt |
| Frontend | Leaflet.js (maps) + Server-Sent Events + Vanilla JS |
| Anomaly Detection | Custom 4-layer engine (physical bounds, rate-of-change, z-score, stuck sensor) |
| Risk Model | Sigmoid-based probability with exposure tracking |
| Containerization | Docker + Docker Compose |
- Math-based, not rule-based โ Sigmoid risk probability, not
IF temp > threshold - Smartest hub, not nearest โ Traffic-aware ETA with congestion zone modeling
- Actual re-routing โ GPS simulator physically moves trucks to diversion hubs
- Live document intelligence โ Edit the SOP, AI learns it in 2 seconds via Pathway streaming
- Zero false diversions โ 4-layer anomaly filter catches sensor glitches before the risk model
- Economically rational โ Every โน1 spent on diversion is justified by โน10+ in prevented loss
- Sustainable โ Carbon credits calculated for every prevented waste event
- Production-ready patterns โ Thread-safe state, API key rotation, rate-limit handling, dedup logic
Built for the Pathway Real-Time AI Hackathon โ demonstrating real-time streaming intelligence for India's cold chain logistics.
MIT