# Data Flow Architecture Understanding how data flows through the LocalGreenChain system. ## System Overview ``` ┌─────────────────────────────────────────────────────────────────────┐ │ LOCALGREENCHAIN DATA FLOW │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ INPUTS PROCESSING OUTPUTS │ │ ────── ────────── ─────── │ │ │ │ ┌─────────┐ ┌─────────────┐ ┌──────────┐ │ │ │Consumer │──preferences─│ Demand │──signals───→│ Planting │ │ │ │Prefs │ │ Forecaster │ │Recommends│ │ │ └─────────┘ └─────────────┘ └──────────┘ │ │ │ │ │ ┌─────────┐ ┌──────▼──────┐ ┌──────────┐ │ │ │Transport│──events─────→│ Transport │──journeys──→│ QR │ │ │ │Events │ │ Chain │ │ Codes │ │ │ └─────────┘ └─────────────┘ └──────────┘ │ │ │ │ │ ┌─────────┐ ┌──────▼──────┐ ┌──────────┐ │ │ │Vertical │──readings───→│ VF │──analytics─→│Dashboard │ │ │ │Farm IoT │ │ Controller │ │ │ │ │ └─────────┘ └─────────────┘ └──────────┘ │ │ │ │ │ ┌──────▼──────┐ │ │ │ Plant │ │ │ │ Blockchain │ │ │ └─────────────┘ │ │ │ │ │ ┌──────▼──────┐ │ │ │ JSON/DB │ │ │ │ Storage │ │ │ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ ``` ## Data Flow Patterns ### 1. Consumer Preference Flow ``` Consumer → Web UI → API → DemandForecaster → Storage │ │ │ ▼ │ Aggregation │ │ │ ▼ └────────────────── Demand Signal │ ▼ Planting Recommendations │ ▼ Grower Dashboard ``` ### 2. Transport Event Flow ``` Grower/Transporter │ ▼ API Request │ ├──→ Validation │ ├──→ Distance Calculation (Haversine) │ ├──→ Carbon Calculation │ ├──→ TransportChain.recordEvent() │ │ │ ├──→ Create Block │ ├──→ Mine Block (PoW) │ ├──→ Add to Chain │ └──→ Index Event │ └──→ Response with Block Hash ``` ### 3. Vertical Farm Data Flow ``` IoT Sensors │ ├──→ Temperature ├──→ Humidity ├──→ CO2 ├──→ Light ├──→ EC/pH │ ▼ API Endpoint (/api/vertical-farm/batch/{id}/environment) │ ▼ VerticalFarmController.recordEnvironment() │ ├──→ Compare to Targets │ │ │ ▼ │ Generate Alerts (if out of range) │ ├──→ Update Batch Health Score │ └──→ Store in Environment Log │ ▼ Analytics Generation ``` ## Data Transformations ### Preference → Demand Signal ```typescript // Input: Individual preferences const preferences: ConsumerPreference[] = [ { consumerId: "A", preferredItems: [{ produceType: "tomato", weeklyQuantity: 2 }] }, { consumerId: "B", preferredItems: [{ produceType: "tomato", weeklyQuantity: 1 }] }, { consumerId: "C", preferredItems: [{ produceType: "tomato", weeklyQuantity: 3 }] } ]; // Transformation: Aggregate by region // Output: Demand Signal const signal: DemandSignal = { region: { name: "Brooklyn", radiusKm: 25 }, demandItems: [ { produceType: "tomato", weeklyDemandKg: 6, // Sum of individual demands consumerCount: 3, aggregatePriority: 8.5 // Weighted average } ] }; ``` ### Transport Events → Plant Journey ```typescript // Input: Individual events const events: TransportEvent[] = [ { eventType: "seed_acquisition", plantId: "P1", distanceKm: 50 }, { eventType: "planting", plantId: "P1", distanceKm: 0 }, { eventType: "growing_transport", plantId: "P1", distanceKm: 5 }, { eventType: "harvest", plantId: "P1", distanceKm: 0 }, { eventType: "distribution", plantId: "P1", distanceKm: 20 } ]; // Transformation: Aggregate for plant // Output: Plant Journey const journey: PlantJourney = { plantId: "P1", events: events, totalFoodMiles: 75, // Sum of distances totalCarbonKg: 3.5, // Sum of carbon currentStage: "post_harvest" }; ``` ## Storage Architecture ### File-Based Storage (Default) ``` data/ ├── plantchain.json # Main plant blockchain │ └── { difficulty, chain: Block[] } │ ├── transport.json # Transport chain │ └── { difficulty, chain: TransportBlock[] } │ ├── demand.json # Demand data │ └── { │ preferences: Map, │ signals: Map, │ supply: Map, │ matches: Map │ } │ └── vertical-farms.json # VF data └── { farms: Map, recipes: Map, batches: Map } ``` ### Database Storage (Production) ```sql -- PostgreSQL schema (conceptual) CREATE TABLE blocks ( index INTEGER PRIMARY KEY, timestamp TIMESTAMPTZ, data JSONB, previous_hash VARCHAR(64), hash VARCHAR(64), nonce INTEGER ); CREATE TABLE plants ( id VARCHAR(36) PRIMARY KEY, common_name VARCHAR(255), scientific_name VARCHAR(255), location GEOGRAPHY(POINT), owner_id VARCHAR(36), generation INTEGER, parent_id VARCHAR(36), block_index INTEGER REFERENCES blocks(index) ); CREATE TABLE transport_events ( id VARCHAR(36) PRIMARY KEY, event_type VARCHAR(50), plant_ids VARCHAR(36)[], from_location GEOGRAPHY(POINT), to_location GEOGRAPHY(POINT), distance_km DECIMAL, carbon_kg DECIMAL, block_index INTEGER ); ``` ## Caching Strategy ### Cache Layers ``` Request │ ▼ ┌─────────────────┐ │ API Response │ TTL: 60s (dynamic data) │ Cache │ TTL: 3600s (static data) └────────┬────────┘ │ miss ▼ ┌─────────────────┐ │ Query Cache │ Pre-computed queries │ (Redis) │ Hot data in memory └────────┬────────┘ │ miss ▼ ┌─────────────────┐ │ Data Store │ JSON files or Database │ │ └─────────────────┘ ``` ### Cached Data | Data Type | Cache TTL | Reason | |-----------|-----------|--------| | Plant details | 1 hour | Rarely changes | | Transport journey | 5 minutes | Updates on events | | Demand signals | 1 hour | Recalculated periodically | | Farm analytics | 15 minutes | Computed from batches | | Recipes | 24 hours | Static data | ## Event-Driven Updates ### Event Types ```typescript type SystemEvent = | { type: "plant.registered"; plantId: string } | { type: "transport.recorded"; blockHash: string } | { type: "batch.started"; batchId: string } | { type: "alert.triggered"; zoneId: string; alert: EnvironmentAlert } | { type: "harvest.completed"; batchId: string } | { type: "supply.matched"; matchId: string }; ``` ### Event Flow ``` Event Source → Event Bus → Event Handlers │ ├──→ Cache Invalidation ├──→ Webhook Dispatch ├──→ Real-time UI Update └──→ Analytics Update ``` ## API Data Flow ### Request Lifecycle ``` Client Request │ ▼ ┌─────────────┐ │ Router │ Match endpoint └──────┬──────┘ │ ▼ ┌─────────────┐ │ Middleware │ Auth, validation, logging └──────┬──────┘ │ ▼ ┌─────────────┐ │ Handler │ Business logic └──────┬──────┘ │ ├──→ Service Layer (Forecaster, Controller, Chain) │ ├──→ Data Layer (Storage, Cache) │ ▼ ┌─────────────┐ │ Response │ Format JSON, headers └──────┬──────┘ │ ▼ Client Response ``` ## Real-Time Data ### WebSocket Connections ``` Client ←──WebSocket──→ Server │ ├── Environment readings ├── Alert notifications ├── Batch progress updates └── Demand signal changes ``` ### Polling Fallback ```typescript // If WebSocket unavailable const POLL_INTERVALS = { environment: 30_000, // 30 seconds batchProgress: 60_000, // 1 minute demandSignals: 300_000, // 5 minutes analytics: 900_000 // 15 minutes }; ``` ## Data Integrity ### Validation Points 1. **API Input** - Schema validation (JSON Schema/Zod) 2. **Business Logic** - Domain rules enforcement 3. **Blockchain** - Hash verification 4. **Storage** - Type checking ### Consistency Guarantees | Operation | Guarantee | |-----------|-----------| | Block creation | Atomic (single file write) | | Batch operations | Transaction-like (rollback on error) | | Cross-chain refs | Eventually consistent | | Cache updates | Best effort (can stale) |