AI Powered Geospatial Disaster Intelligence · Himalayan Terrain
Real-time satellite and sensor fusion for forest fire, landslide, and flood prediction across Uttarakhand's fragile Himalayan ecosystem — driven by CNN, Random Forest, and XGBoost ensemble models with dynamic AI-powered geospatial risk mapping and early warning alerts.
// Data Ingestion
Multi-source geospatial data fusion from satellite constellations, ground sensors, meteorological networks, and topographic databases to drive comprehensive hazard intelligence.
// AI Pipeline Architecture
From raw satellite telemetry to actionable risk probability outputs — every stage engineered for speed, accuracy, and operational reliability in Himalayan terrain conditions.
// Hazard Intelligence
Specialized detection pipelines for each hazard type — calibrated on historical event data from Uttarakhand's disaster records (2000–2024) using TRINETRA's AI Powered Geospatial Disaster Intelligence engine.
// Feature Importance
SHAP-derived global feature importance averaged across the three-model ensemble.
// Risk Dashboard
// ML Models
Three complementary models fused via stacking meta-learner for robust geospatial disaster intelligence risk probability estimation.
// Source Code
""" TRINETRA — AI Powered Geospatial Disaster Intelligence Module: Data Ingestion from Satellite & Sensor Sources Author: TRINETRA Research Team """ import ee import geemap import numpy as np import pandas as pd import rasterio from rasterio.crs import CRS from datetime import datetime, timedelta import requests, json, os from pathlib import Path # Initialize Google Earth Engine ee.Initialize() # Uttarakhand bounding box UTTARAKHAND_BBOX = { 'lon_min': 77.5, 'lon_max': 81.1, 'lat_min': 28.7, 'lat_max': 31.5 } def get_study_region(): """Define Uttarakhand study region as GEE geometry.""" return ee.Geometry.Rectangle([ UTTARAKHAND_BBOX['lon_min'], UTTARAKHAND_BBOX['lat_min'], UTTARAKHAND_BBOX['lon_max'], UTTARAKHAND_BBOX['lat_max'] ]) class Sentinel2Ingester: """Ingests Sentinel-2 Level-2A multispectral imagery.""" def __init__(self, start_date: str, end_date: str, cloud_cover: float = 20): self.start_date = start_date self.end_date = end_date self.cloud_cover = cloud_cover self.region = get_study_region() def fetch_collection(self) -> ee.ImageCollection: """Fetch and filter Sentinel-2 collection.""" collection = ( ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED') .filterBounds(self.region) .filterDate(self.start_date, self.end_date) .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', self.cloud_cover)) .select(['B2','B3','B4','B5','B6','B7', 'B8','B8A','B11','B12','SCL']) ) print(f"S2 images found: {collection.size().getInfo()}") return collection def apply_cloud_mask(self, image: ee.Image) -> ee.Image: """Apply Scene Classification Layer cloud masking.""" scl = image.select('SCL') clear = scl.neq(3).And(scl.neq(8)).And(scl.neq(9)).And(scl.neq(10)) return image.updateMask(clear).divide(10000) def get_median_composite(self) -> ee.Image: """Create cloud-free median mosaic.""" collection = self.fetch_collection() masked = collection.map(self.apply_cloud_mask) composite = masked.median().clip(self.region) print("Sentinel-2 median composite created.") return composite def export_to_drive(self, image: ee.Image, description: str): """Export GeoTIFF to Google Drive.""" task = ee.batch.Export.image.toDrive( image=image.toFloat(), description=description, folder='TRINETRA_Data', region=self.region, scale=10, crs='EPSG:32644', maxPixels=1e10 ) task.start() print(f"Export task started: {description}") return task class MODISFireIngester: """MODIS/VIIRS active fire product ingestion.""" def fetch_active_fires(self, start_date: str, end_date: str) -> pd.DataFrame: collection = ( ee.ImageCollection('FIRMS') .filterBounds(get_study_region()) .filterDate(start_date, end_date) ) fire_data = geemap.ee_to_pandas(collection.getRegion(get_study_region(), 1000)) fire_df = fire_data[fire_data['confidence'] >= 70] print(f"Active fire pixels: {len(fire_df)}") return fire_df class GPMRainfallIngester: """GPM IMERG daily rainfall ingestion.""" def fetch_rainfall(self, start_date: str, end_date: str) -> ee.Image: gpm = ( ee.ImageCollection('NASA/GPM_L3/IMERG_V06') .filterBounds(get_study_region()) .filterDate(start_date, end_date) .select('precipitationCal') ) cumulative_rain = gpm.sum().clip(get_study_region()) print("GPM cumulative rainfall computed.") return cumulative_rain class SRTMTerrainIngester: """SRTM Digital Elevation Model ingestion.""" def fetch_terrain(self) -> ee.Image: srtm = ee.Image('USGS/SRTMGL1_003') terrain = ee.Terrain.products(srtm) return terrain.select(['elevation','slope','aspect']).clip(get_study_region()) class SMAPSoilMoistureIngester: """NASA SMAP soil moisture ingestion.""" def fetch_soil_moisture(self, start_date: str, end_date: str) -> ee.Image: smap = ( ee.ImageCollection('NASA_USDA/HSL/SMAP10KM_soil_moisture') .filterBounds(get_study_region()) .filterDate(start_date, end_date) .select('ssm') ) return smap.mean().clip(get_study_region()) def run_ingestion_pipeline(start_date: str, end_date: str, output_dir: str): """Run complete data ingestion pipeline.""" Path(output_dir).mkdir(parents=True, exist_ok=True) print("\n=== TRINETRA AI Powered Geospatial Disaster Intelligence ===") s2 = Sentinel2Ingester(start_date, end_date, cloud_cover=15) composite = s2.get_median_composite() s2.export_to_drive(composite, f"S2_composite_{start_date}_{end_date}") gpm_data = GPMRainfallIngester().fetch_rainfall(start_date, end_date) terrain = SRTMTerrainIngester().fetch_terrain() soil_moisture = SMAPSoilMoistureIngester().fetch_soil_moisture(start_date, end_date) fires = MODISFireIngester().fetch_active_fires(start_date, end_date) fires.to_csv(f"{output_dir}/active_fires.csv", index=False) print("✓ Ingestion pipeline complete.") if __name__ == '__main__': run_ingestion_pipeline('2024-05-01', '2024-06-14', './data/raw')
""" TRINETRA — Feature Engineering Module Computes geospatial indices and hazard predictors. """ import numpy as np import rasterio from rasterio.warp import calculate_default_transform, reproject, Resampling import ee, geemap import pandas as pd from scipy.ndimage import uniform_filter, gaussian_filter class SpectralIndexCalculator: """Computes spectral vegetation and water indices from Sentinel-2.""" def compute_ndvi(self, nir: np.ndarray, red: np.ndarray) -> np.ndarray: """ NDVI = (NIR - Red) / (NIR + Red) Sentinel-2: NIR=B8, Red=B4 Range: [-1, 1]. Healthy vegetation > 0.4 """ ndvi = (nir - red) / (nir + red + 1e-8) return np.clip(ndvi, -1, 1) def compute_ndwi(self, green: np.ndarray, nir: np.ndarray) -> np.ndarray: """ NDWI = (Green - NIR) / (Green + NIR) Sentinel-2: Green=B3, NIR=B8 Positive values indicate water bodies. """ ndwi = (green - nir) / (green + nir + 1e-8) return np.clip(ndwi, -1, 1) def compute_nbr(self, nir: np.ndarray, swir2: np.ndarray) -> np.ndarray: """ NBR = (NIR - SWIR2) / (NIR + SWIR2) Sentinel-2: NIR=B8A, SWIR2=B12 Post-fire: significant NBR decrease (dNBR > 0.1 = burned) """ nbr = (nir - swir2) / (nir + swir2 + 1e-8) return np.clip(nbr, -1, 1) def compute_ndmi(self, nir: np.ndarray, swir1: np.ndarray) -> np.ndarray: """ NDMI (Moisture Index) = (NIR - SWIR1) / (NIR + SWIR1) Sentinel-2: NIR=B8, SWIR1=B11 Detects vegetation water stress (fire risk indicator). """ return (nir - swir1) / (nir + swir1 + 1e-8) def compute_bai(self, red: np.ndarray, nir: np.ndarray) -> np.ndarray: """Burn Area Index for fire scar detection.""" return 1.0 / ((red - 0.1)**2 + (nir - 0.06)**2 + 1e-8) class TerrainFeatureCalculator: """Computes slope-derived terrain features from DEM.""" def compute_slope(self, dem: np.ndarray, resolution_m: float = 30) -> np.ndarray: """Compute slope in degrees from DEM using gradient method.""" dy, dx = np.gradient(dem, resolution_m) slope_rad = np.arctan(np.sqrt(dx**2 + dy**2)) return np.degrees(slope_rad) def compute_aspect(self, dem: np.ndarray, resolution_m: float = 30) -> np.ndarray: """Compute aspect (degrees from North).""" dy, dx = np.gradient(dem, resolution_m) aspect = np.degrees(np.arctan2(dx, -dy)) return (aspect + 360) % 360 def compute_curvature(self, dem: np.ndarray) -> np.ndarray: """Second derivative curvature — negative = concave (runoff accumulation).""" laplacian = ( np.roll(dem, 1, axis=0) + np.roll(dem, -1, axis=0) + np.roll(dem, 1, axis=1) + np.roll(dem, -1, axis=1) - 4 * dem ) return laplacian def compute_twi(self, slope_deg: np.ndarray, flow_accum: np.ndarray) -> np.ndarray: """ Topographic Wetness Index = ln(flow_accumulation / tan(slope)) High TWI → prone to waterlogging → flood/landslide susceptibility. """ slope_rad = np.radians(np.clip(slope_deg, 0.1, 89)) twi = np.log(flow_accum / np.tan(slope_rad) + 1e-8) return twi class RainfallFeatureCalculator: """Derives rainfall-based hazard features from GPM IMERG.""" def antecedent_rainfall_index(self, rainfall_timeseries: np.ndarray, decay: float = 0.8) -> np.ndarray: """ Antecedent Rainfall Index (API): API_t = k * API_{t-1} + R_t (k = decay factor 0.85) High API = saturated soil = elevated landslide/flood risk. """ api = np.zeros_like(rainfall_timeseries) api[0] = rainfall_timeseries[0] for t in range(1, len(rainfall_timeseries)): api[t] = decay * api[t-1] + rainfall_timeseries[t] return api def compute_extreme_rainfall_index(self, daily_rain: np.ndarray, threshold_mm: float = 100) -> np.ndarray: """Flag extreme rainfall events exceeding IMD heavy-rain threshold.""" extreme = (daily_rain > threshold_mm).astype(np.float32) return gaussian_filter(extreme, sigma=2) def cumulative_72hr_rainfall(self, rainfall_3d: np.ndarray) -> np.ndarray: """Sum of last 72-hour rainfall — critical for landslide triggering.""" return np.sum(rainfall_3d[-3:, :, :], axis=0) class FeatureStackBuilder: """Assembles all features into a unified multi-band raster stack.""" def build_feature_stack(self, bands: dict) -> np.ndarray: """ bands dict keys: 's2': Sentinel-2 12-band array (H, W, 12) 'dem': Digital elevation model (H, W) 'slope': Slope in degrees (H, W) 'rainfall_72h': 72hr cumulative (H, W) 'soil_moisture': SMAP soil moisture (H, W) 'flow_accum': D8 flow accumulation (H, W) """ spec = SpectralIndexCalculator() terr = TerrainFeatureCalculator() rain = RainfallFeatureCalculator() b = bands['s2'] ndvi = spec.compute_ndvi(b[...,7], b[...,3]) ndwi = spec.compute_ndwi(b[...,2], b[...,7]) nbr = spec.compute_nbr(b[...,8], b[...,10]) ndmi = spec.compute_ndmi(b[...,7], b[...,9]) bai = spec.compute_bai(b[...,3], b[...,7]) curv = terr.compute_curvature(bands['dem']) aspect = terr.compute_aspect(bands['dem']) twi = terr.compute_twi(bands['slope'], bands['flow_accum']) exrain = rain.compute_extreme_rainfall_index(bands['rainfall_72h']) features = np.stack([ ndvi, ndwi, nbr, ndmi, bai, bands['dem'], bands['slope'], aspect, curv, twi, bands['rainfall_72h'], exrain, bands['soil_moisture'] ], axis=-1) print(f"Feature stack shape: {features.shape}") return features if __name__ == '__main__': # Example: load raster data and compute features with rasterio.open('./data/raw/S2_composite.tif') as src: s2_data = src.read().transpose(1, 2, 0).astype(np.float32) / 10000 builder = FeatureStackBuilder() print("Feature engineering complete.")
""" TRINETRA — U-Net CNN Hazard Segmentation Model Architecture: U-Net with ResNet encoder for geospatial disaster intelligence pixel classification. """ import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers, Model import numpy as np from sklearn.model_selection import train_test_split import os PATCH_SIZE = 256 # pixels N_CHANNELS = 13 # feature bands N_CLASSES = 4 # background + fire + flood + landslide BATCH_SIZE = 16 LEARNING_RATE = 1e-4 EPOCHS = 100 def conv_block(inputs, filters: int, dropout_rate: float = 0.1): """Double Conv-BN-ReLU block for U-Net.""" x = layers.Conv2D(filters, (3,3), padding='same')(inputs) x = layers.BatchNormalization()(x) x = layers.Activation('relu')(x) x = layers.Dropout(dropout_rate)(x) x = layers.Conv2D(filters, (3,3), padding='same')(x) x = layers.BatchNormalization()(x) x = layers.Activation('relu')(x) return x def encoder_block(inputs, filters: int): """Encoder block: conv + max pool.""" conv = conv_block(inputs, filters) pool = layers.MaxPooling2D((2,2))(conv) return conv, pool def decoder_block(inputs, skip, filters: int): """Decoder block: upsample + concat skip + conv.""" x = layers.Conv2DTranspose(filters, (2,2), strides=(2,2), padding='same')(inputs) x = layers.Concatenate()([x, skip]) x = conv_block(x, filters) return x def build_unet(input_shape=(256, 256, 13), n_classes: int = 4): """ Build U-Net architecture for geospatial disaster intelligence semantic segmentation. Input: (B, 256, 256, 13) — 13-band feature stack Output: (B, 256, 256, 4) — softmax class probabilities """ inputs = layers.Input(shape=input_shape) # Encoder path s1, p1 = encoder_block(inputs, 64) s2, p2 = encoder_block(p1, 128) s3, p3 = encoder_block(p2, 256) s4, p4 = encoder_block(p3, 512) # Bottleneck b = conv_block(p4, 1024, dropout_rate=0.3) # Decoder path (skip connections) d1 = decoder_block(b, s4, 512) d2 = decoder_block(d1, s3, 256) d3 = decoder_block(d2, s2, 128) d4 = decoder_block(d3, s1, 64) # Output segmentation head outputs = layers.Conv2D(n_classes, (1,1), activation='softmax')(d4) model = Model(inputs, outputs, name='TRINETRA_UNet') return model class HazardSegmentationTrainer: """Trains the U-Net model with spatial cross-validation.""" def __init__(self, model: Model): self.model = model self.history = None def compile_model(self): self.model.compile( optimizer=keras.optimizers.Adam(LEARNING_RATE), loss=self.combined_loss, metrics=[ keras.metrics.MeanIoU(num_classes=N_CLASSES), 'accuracy' ] ) def combined_loss(self, y_true, y_pred): """Dice + Focal loss for class-imbalanced hazard segmentation.""" # Focal loss (addresses class imbalance) focal = keras.losses.CategoricalFocalCrossentropy(gamma=2.0)(y_true, y_pred) # Dice loss smooth = 1e-6 intersection = tf.reduce_sum(y_true * y_pred, axis=[1,2]) dice = 1 - (2 * intersection + smooth) / ( tf.reduce_sum(y_true, axis=[1,2]) + tf.reduce_sum(y_pred, axis=[1,2]) + smooth ) dice_loss = tf.reduce_mean(dice) return 0.5 * focal + 0.5 * dice_loss def train(self, X_train, y_train, X_val, y_val, checkpoint_dir: str = './checkpoints'): os.makedirs(checkpoint_dir, exist_ok=True) callbacks = [ keras.callbacks.ModelCheckpoint( f"{checkpoint_dir}/best_unet.h5", monitor='val_mean_io_u', save_best_only=True, mode='max' ), keras.callbacks.EarlyStopping( monitor='val_loss', patience=15, restore_best_weights=True ), keras.callbacks.ReduceLROnPlateau( monitor='val_loss', factor=0.5, patience=7, min_lr=1e-7 ), keras.callbacks.TensorBoard(log_dir='./logs/unet') ] self.history = self.model.fit( X_train, y_train, validation_data=(X_val, y_val), epochs=EPOCHS, batch_size=BATCH_SIZE, callbacks=callbacks, class_weight={0:0.5, 1:3.0, 2:3.0, 3:4.0} ) return self.history def predict_risk_map(self, image_tiles: np.ndarray) -> np.ndarray: """Generate probability risk map from image tiles.""" probs = self.model.predict(image_tiles, batch_size=8) # probs shape: (N_tiles, H, W, 4) — background, fire, flood, landslide return probs if __name__ == '__main__': model = build_unet() model.summary() trainer = HazardSegmentationTrainer(model) trainer.compile_model() print("U-Net model ready for training.")
""" TRINETRA — Random Forest & XGBoost Hazard Classifiers Tabular geospatial features → multi-class hazard risk probability. """ import numpy as np import pandas as pd from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import StratifiedKFold, cross_validate from sklearn.preprocessing import StandardScaler from sklearn.calibration import CalibratedClassifierCV from sklearn.metrics import classification_report, roc_auc_score from imblearn.over_sampling import SMOTE import xgboost as xgb import optuna import shap, joblib, json FEATURE_NAMES = [ 'ndvi', 'ndwi', 'nbr', 'ndmi', 'bai', 'elevation', 'slope_deg', 'aspect_deg', 'curvature', 'twi', 'rainfall_72h', 'rainfall_extreme', 'soil_moisture', 'land_surface_temp', 'wind_speed', 'wind_direction', 'ndvi_diff_30d', 'nbr_diff_30d', 'ndwi_diff_7d', 'flow_accumulation', 'lithology_class', 'forest_type', 'dist_to_river_m', 'dist_to_road_m', 'population_density', 'api_7d', 'api_30d', 'vci' ] # Labels: 0=No hazard, 1=Fire, 2=Flood, 3=Landslide class RandomForestHazardClassifier: def __init__(self, n_estimators: int = 500): self.rf = RandomForestClassifier( n_estimators=n_estimators, max_depth=25, min_samples_split=5, min_samples_leaf=2, max_features='sqrt', class_weight='balanced_subsample', oob_score=True, random_state=42, n_jobs=-1 ) self.scaler = StandardScaler() self.calibrated_rf = None def preprocess(self, X: pd.DataFrame, fit: bool = True) -> np.ndarray: X_filled = X.fillna(X.median()) if fit: return self.scaler.fit_transform(X_filled) return self.scaler.transform(X_filled) def apply_smote(self, X: np.ndarray, y: np.ndarray): """SMOTE oversampling for rare hazard events.""" sm = SMOTE(sampling_strategy='not majority', random_state=42, k_neighbors=5) X_res, y_res = sm.fit_resample(X, y) print(f"Post-SMOTE: {dict(zip(*np.unique(y_res, return_counts=True)))}") return X_res, y_res def spatial_cross_validate(self, X: np.ndarray, y: np.ndarray, spatial_folds: list) -> dict: """5-fold spatial CV preserving geographic clusters.""" scores = {'f1':[], 'auc':[], 'precision':[], 'recall':[]} for fold_idx, (train_idx, test_idx) in enumerate(spatial_folds): X_tr, X_te = X[train_idx], X[test_idx] y_tr, y_te = y[train_idx], y[test_idx] X_tr_s, y_tr_s = self.apply_smote(X_tr, y_tr) self.rf.fit(X_tr_s, y_tr_s) probs = self.rf.predict_proba(X_te) preds = np.argmax(probs, axis=1) from sklearn.metrics import f1_score, precision_score, recall_score scores['f1'].append(f1_score(y_te, preds, average='weighted')) scores['auc'].append(roc_auc_score(y_te, probs, multi_class='ovr')) print(f" Fold {fold_idx+1}: F1={scores['f1'][-1]:.3f}, AUC={scores['auc'][-1]:.3f}") return {k: (np.mean(v), np.std(v)) for k,v in scores.items()} def train_and_calibrate(self, X_train, y_train): X_s, y_s = self.apply_smote(X_train, y_train) self.rf.fit(X_s, y_s) print(f"OOB Score: {self.rf.oob_score_:.3f}") self.calibrated_rf = CalibratedClassifierCV(self.rf, method='isotonic', cv='prefit') self.calibrated_rf.fit(X_train, y_train) def get_feature_importance(self) -> pd.DataFrame: imp = self.rf.feature_importances_ return pd.DataFrame({'feature': FEATURE_NAMES, 'importance': imp}) \ .sort_values('importance', ascending=False) def save(self, path: str): joblib.dump({'rf': self.rf, 'scaler': self.scaler, 'calibrated': self.calibrated_rf}, path) class XGBoostHazardClassifier: def __init__(self): self.model = None self.best_params = None def _objective(self, trial, X_train, y_train): """Optuna Bayesian hyperparameter optimization objective.""" params = { 'n_estimators': trial.suggest_int('n_estimators', 500, 2000), 'max_depth': trial.suggest_int('max_depth', 4, 12), 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True), 'subsample': trial.suggest_float('subsample', 0.6, 1.0), 'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0), 'reg_alpha': trial.suggest_float('reg_alpha', 1e-5, 10, log=True), 'reg_lambda': trial.suggest_float('reg_lambda', 1e-5, 10, log=True), 'min_child_weight': trial.suggest_int('min_child_weight', 1, 10), 'objective': 'multi:softprob', 'num_class': 4, 'tree_method': 'gpu_hist', 'eval_metric': 'mlogloss', 'use_label_encoder': False, 'random_state': 42 } skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42) scores = [] for tr, va in skf.split(X_train, y_train): clf = xgb.XGBClassifier(**params) clf.fit(X_train[tr], y_train[tr], eval_set=[(X_train[va], y_train[va])], verbose=False, early_stopping_rounds=50) probs = clf.predict_proba(X_train[va]) scores.append(roc_auc_score(y_train[va], probs, multi_class='ovr')) return np.mean(scores) def optimize_hyperparameters(self, X_train, y_train, n_trials: int = 100): study = optuna.create_study(direction='maximize') study.optimize(lambda t: self._objective(t, X_train, y_train), n_trials=n_trials, show_progress_bar=True) self.best_params = study.best_params print(f"Best AUC: {study.best_value:.4f}") print(f"Best params: {json.dumps(self.best_params, indent=2)}") return self.best_params def train(self, X_train, y_train, X_val, y_val): if self.best_params is None: self.best_params = {'n_estimators':1200,'max_depth':8,'learning_rate':0.05} params = {**self.best_params, 'objective':'multi:softprob','num_class':4, 'tree_method':'gpu_hist','use_label_encoder':False} self.model = xgb.XGBClassifier(**params) self.model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=100, early_stopping_rounds=50) def shap_explain(self, X_sample: np.ndarray): """Generate SHAP feature attribution explanations.""" explainer = shap.TreeExplainer(self.model) shap_values = explainer.shap_values(X_sample) shap.summary_plot(shap_values, X_sample, feature_names=FEATURE_NAMES) return shap_values def predict_proba(self, X: np.ndarray) -> np.ndarray: return self.model.predict_proba(X) def save(self, path: str): self.model.save_model(path) if __name__ == '__main__': print("RF + XGBoost classifiers initialized.")
""" TRINETRA — Ensemble Fusion & Risk Probability Generation Stacking meta-learner combines CNN + RF + XGBoost outputs. """ import numpy as np import pandas as pd from sklearn.linear_model import LogisticRegression from sklearn.preprocessing import StandardScaler from sklearn.metrics import classification_report, roc_auc_score import rasterio from rasterio.transform import from_bounds import json, joblib from typing import Tuple HAZARD_CLASSES = {0: 'background', 1: 'fire', 2: 'flood', 3: 'landslide'} # Risk thresholds (probability) # L1: Monitor | L2: Watch | L3: Warning | L4: Emergency RISK_THRESHOLDS = { 'fire': {'L1':0.30, 'L2':0.50, 'L3':0.70, 'L4':0.85}, 'flood': {'L1':0.30, 'L2':0.50, 'L3':0.70, 'L4':0.85}, 'landslide': {'L1':0.25, 'L2':0.45, 'L3':0.65, 'L4':0.80}, } class EnsembleStackingClassifier: """ Stacking meta-learner that fuses CNN, RF, and XGBoost predictions. Level-0: CNN (spatial), RF (tabular), XGBoost (tabular) Level-1: Logistic Regression meta-learner on concatenated L0 probs """ def __init__(self): self.meta_learner = LogisticRegression( C=1.0, max_iter=500, multi_class='multinomial', solver='lbfgs' ) self.meta_scaler = StandardScaler() def build_meta_features(self, cnn_probs: np.ndarray, rf_probs: np.ndarray, xgb_probs: np.ndarray) -> np.ndarray: """ Concatenate L0 probability vectors into meta-feature matrix. cnn_probs: (N, 4) — CNN class probabilities rf_probs: (N, 4) — RF class probabilities xgb_probs: (N, 4) — XGBoost class probabilities Returns: (N, 12) meta-feature matrix """ meta_X = np.concatenate([cnn_probs, rf_probs, xgb_probs], axis=1) return meta_X def train_meta_learner(self, cnn_probs, rf_probs, xgb_probs, y_true: np.ndarray): meta_X = self.build_meta_features(cnn_probs, rf_probs, xgb_probs) meta_X_scaled = self.meta_scaler.fit_transform(meta_X) self.meta_learner.fit(meta_X_scaled, y_true) preds = self.meta_learner.predict(meta_X_scaled) print("Ensemble Meta-Learner Training Report:") print(classification_report(y_true, preds, target_names=list(HAZARD_CLASSES.values()))) def predict(self, cnn_probs, rf_probs, xgb_probs) -> Tuple[np.ndarray, np.ndarray]: meta_X = self.build_meta_features(cnn_probs, rf_probs, xgb_probs) meta_X_scaled = self.meta_scaler.transform(meta_X) final_probs = self.meta_learner.predict_proba(meta_X_scaled) final_class = np.argmax(final_probs, axis=1) return final_class, final_probs def save(self, path: str): joblib.dump({'meta': self.meta_learner, 'scaler': self.meta_scaler}, path) class RiskProbabilityMapper: """Generates geospatial risk probability output maps.""" def classify_alert_level(self, prob: float, hazard_type: str) -> str: thresholds = RISK_THRESHOLDS[hazard_type] if prob >= thresholds['L4']: return 'L4-EMERGENCY' elif prob >= thresholds['L3']: return 'L3-WARNING' elif prob >= thresholds['L2']: return 'L2-WATCH' elif prob >= thresholds['L1']: return 'L1-MONITOR' else: return 'NOMINAL' def generate_risk_output(self, final_probs: np.ndarray, pixel_coords: list, district_name: str) -> dict: """Generate structured risk probability output for a district.""" fire_prob = float(np.mean(final_probs[:, 1])) flood_prob = float(np.mean(final_probs[:, 2])) landslide_prob = float(np.mean(final_probs[:, 3])) output = { 'district': district_name, 'timestamp': pd.Timestamp.now().isoformat(), 'hazard_probabilities': { 'fire': {'probability': round(fire_prob, 4), 'alert_level': self.classify_alert_level(fire_prob, 'fire')}, 'flood': {'probability': round(flood_prob, 4), 'alert_level': self.classify_alert_level(flood_prob, 'flood')}, 'landslide': {'probability': round(landslide_prob, 4), 'alert_level': self.classify_alert_level(landslide_prob, 'landslide')}, }, 'overall_risk': round(max(fire_prob, flood_prob, landslide_prob), 4), 'pixel_count': len(pixel_coords), 'model_version': 'TRINETRA-v1.0' } print(json.dumps(output, indent=2)) return output def export_risk_geotiff(self, prob_array: np.ndarray, transform, crs, output_path: str): """Export risk probability raster as GeoTIFF.""" with rasterio.open(output_path, 'w', driver='GTiff', height=prob_array.shape[1], width=prob_array.shape[2], count=3, dtype=np.float32, crs=crs, transform=transform, compress='lzw') as dst: dst.write(prob_array[1], 1) # fire probability dst.write(prob_array[2], 2) # flood probability dst.write(prob_array[3], 3) # landslide probability dst.update_tags( model='TRINETRA-v1.0', bands='1=fire,2=flood,3=landslide' ) print(f"Risk GeoTIFF saved: {output_path}")
""" TRINETRA — FastAPI REST Server Serves real-time hazard risk predictions and GeoJSON risk maps. """ from fastapi import FastAPI, HTTPException, BackgroundTasks, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, FileResponse from pydantic import BaseModel from typing import Optional, List import numpy as np import joblib, json, redis from datetime import datetime import asyncio, uvicorn app = FastAPI( title="TRINETRA API", description="TRINETRA — AI Powered Geospatial Disaster Intelligence for Uttarakhand", version="1.0.0" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Redis cache for risk map tiles cache = redis.Redis(host='localhost', port=6379, db=0) CACHE_TTL = 900 # 15 minutes # Load models at startup models = {} @app.on_event("startup") async def load_models(): models['rf'] = joblib.load('./models/rf_hazard.pkl') models['xgb'] = joblib.load('./models/xgb_hazard.json') models['ensemble'] = joblib.load('./models/ensemble_meta.pkl') print("✓ All models loaded.") class PredictionRequest(BaseModel): district: str ndvi: float ndwi: float nbr: float slope_deg: float elevation_m: float rainfall_72h: float soil_moisture: float twi: float wind_speed: Optional[float] = 0.0 land_surface_temp: Optional[float] = 25.0 class HazardAlert(BaseModel): district: str hazard_type: str probability: float alert_level: str timestamp: str coordinates: Optional[List[float]] = None @app.post("/v1/predict", response_model=dict) async def predict_hazard(request: PredictionRequest, bg: BackgroundTasks): """ Single-point hazard risk prediction. Returns fire, flood, and landslide probabilities with alert levels. """ cache_key = f"pred:{request.district}:{request.rainfall_72h}" cached = cache.get(cache_key) if cached: return json.loads(cached) features = np.array([[ request.ndvi, request.ndwi, request.nbr, 0, 0, request.elevation_m, request.slope_deg, 180, 0, request.twi, request.rainfall_72h, float(request.rainfall_72h > 100), request.soil_moisture, request.land_surface_temp, request.wind_speed, 270, 0, 0, 0, 100, 1, 2, 500, 200, 150, request.rainfall_72h * 0.85, request.rainfall_72h * 0.70, 0.6 ]]) rf_probs = models['rf']['calibrated'].predict_proba(features) xgb_probs = models['xgb'].predict_proba(features) cnn_probs = np.array([[0.4, 0.2, 0.2, 0.2]]) # CNN tile (placeholder) meta_X = np.concatenate([cnn_probs, rf_probs, xgb_probs], axis=1) meta_X_scaled = models['ensemble']['scaler'].transform(meta_X) final_probs = models['ensemble']['meta'].predict_proba(meta_X_scaled)[0] def alert_level(p, hz): t = {'fire':{'L1':0.3,'L2':0.5,'L3':0.7,'L4':0.85}, 'flood':{'L1':0.3,'L2':0.5,'L3':0.7,'L4':0.85}, 'landslide':{'L1':0.25,'L2':0.45,'L3':0.65,'L4':0.8}} for lv in ['L4','L3','L2','L1']: if p >= t[hz][lv]: return lv return 'NOMINAL' result = { 'district': request.district, 'timestamp': datetime.utcnow().isoformat(), 'hazard_probabilities': { 'fire': {'probability': round(float(final_probs[1]),4), 'alert_level': alert_level(float(final_probs[1]),'fire')}, 'flood': {'probability': round(float(final_probs[2]),4), 'alert_level': alert_level(float(final_probs[2]),'flood')}, 'landslide': {'probability': round(float(final_probs[3]),4), 'alert_level': alert_level(float(final_probs[3]),'landslide')}, }, 'model_version': 'TRINETRA-v1.0' } cache.setex(cache_key, CACHE_TTL, json.dumps(result)) bg.add_task(check_and_dispatch_alerts, result) return result @app.get("/v1/riskmap/{district}") async def get_risk_map(district: str, hazard: str = Query('all')): """Return GeoJSON risk map for a district.""" try: with open(f"./outputs/riskmap_{district}.geojson") as f: geojson = json.load(f) return JSONResponse(content=geojson) except FileNotFoundError: raise HTTPException(404, f"Risk map not found for {district}") @app.get("/v1/status") async def system_status(): return { 'status': 'operational', 'models_loaded': list(models.keys()), 'cache_connected': cache.ping(), 'uptime': 'system nominal' } async def check_and_dispatch_alerts(result: dict): """Dispatch alerts if risk thresholds are exceeded.""" from alert_system import AlertDispatcher dispatcher = AlertDispatcher() for hz, data in result['hazard_probabilities'].items(): if data['alert_level'] in ['L3', 'L4']: await dispatcher.dispatch( district=result['district'], hazard_type=hz, level=data['alert_level'], probability=data['probability'] ) if __name__ == '__main__': uvicorn.run(app, host='0.0.0.0', port=8000, reload=False)
""" TRINETRA — Early Warning Alert Dispatch System Multi-channel alerts: SMS, IVR, Email, Webhook to NDRF/SDRF. """ import asyncio, aiohttp, json from datetime import datetime from twilio.rest import Client as TwilioClient from typing import List import smtplib, logging from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart logger = logging.getLogger('trinetra.alerts') # District emergency contacts (NDRF, SDRF, DM offices) DISTRICT_CONTACTS = { 'chamoli': { 'dm_phone': '+911362242209', 'ndrf_phone': '+919557844829', 'sdrf_phone': '+911352712006', 'email': ['[email protected]', '[email protected]'], 'population_at_risk': 412920 }, 'rudraprayag': { 'dm_phone': '+911364233522', 'ndrf_phone': '+919557844829', 'sdrf_phone': '+911352712006', 'email': ['[email protected]'], 'population_at_risk': 242285 }, } ALERT_TEMPLATES = { 'fire': { 'L3': "⚠️ FOREST FIRE WARNING — {district}. Risk probability: {prob:.0%}. " "Deploy fire response teams. Initiate community evacuation advisory.", 'L4': "🔴 CRITICAL FIRE ALERT — {district}. Risk: {prob:.0%}. " "IMMEDIATE evacuation required. Deploy all available resources." }, 'flood': { 'L3': "⚠️ FLOOD WARNING — {district}. Risk probability: {prob:.0%}. " "Alert communities along river corridors. Open relief camps.", 'L4': "🔴 CRITICAL FLOOD ALERT — {district}. Risk: {prob:.0%}. " "FLASH FLOOD IMMINENT. Immediate evacuation of low-lying areas." }, 'landslide': { 'L3': "⚠️ LANDSLIDE WARNING — {district}. Risk: {prob:.0%}. " "Close mountain highways. Alert hill communities.", 'L4': "🔴 CRITICAL LANDSLIDE — {district}. Risk: {prob:.0%}. " "IMMEDIATE EVACUATION of vulnerable slopes. Close all mountain roads." } } class AlertDispatcher: """Dispatches multi-channel emergency alerts.""" def __init__(self): import os self.twilio = TwilioClient( os.environ['TWILIO_ACCOUNT_SID'], os.environ['TWILIO_AUTH_TOKEN'] ) self.from_number = os.environ['TWILIO_PHONE'] self.webhook_urls = json.loads(os.environ.get('ALERT_WEBHOOKS', '[]')) self.smtp_server = os.environ.get('SMTP_SERVER', 'smtp.gmail.com') self.smtp_port = int(os.environ.get('SMTP_PORT', 587)) self.smtp_user = os.environ.get('SMTP_USER') self.smtp_pass = os.environ.get('SMTP_PASS') async def dispatch(self, district: str, hazard_type: str, level: str, probability: float): """Dispatch alert via all channels concurrently.""" msg = ALERT_TEMPLATES.get(hazard_type, {}).get(level, "HAZARD ALERT").format( district=district.title(), prob=probability ) contacts = DISTRICT_CONTACTS.get(district.lower(), {}) tasks = [ self._send_sms(contacts.get('dm_phone'), msg), self._send_sms(contacts.get('ndrf_phone'), msg), self._send_sms(contacts.get('sdrf_phone'), msg), self._send_email(contacts.get('email', []), msg, district, hazard_type, probability), self._send_webhooks(district, hazard_type, level, probability), self._log_alert(district, hazard_type, level, probability) ] results = await asyncio.gather(*tasks, return_exceptions=True) logger.info(f"Alert dispatched: {district}/{hazard_type}/{level}. " f"Channels: {sum(r is True for r in results)}/5 successful.") async def _send_sms(self, phone: str, message: str) -> bool: if not phone: return False try: self.twilio.messages.create(body=message, from_=self.from_number, to=phone) logger.info(f"SMS sent to {phone}") return True except Exception as e: logger.error(f"SMS failed {phone}: {e}") return False async def _send_email(self, recipients: List[str], message: str, district: str, hazard: str, prob: float) -> bool: if not recipients: return False try: msg = MIMEMultipart('alternative') msg['Subject'] = f"[TRINETRA] {hazard.upper()} Alert — {district.title()}" msg['From'] = self.smtp_user msg['To'] = ', '.join(recipients) body = f"""TRINETRA Emergency Alert
District: {district.title()}
Hazard: {hazard.upper()}
Risk Probability: {prob:.1%}
Timestamp: {datetime.utcnow().isoformat()} UTC
{message}
TRINETRA AI — AI Powered Geospatial Disaster Intelligence
""" msg.attach(MIMEText(body, 'html')) with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: server.starttls() server.login(self.smtp_user, self.smtp_pass) server.sendmail(self.smtp_user, recipients, msg.as_string()) return True except Exception as e: logger.error(f"Email failed: {e}") return False async def _send_webhooks(self, district, hazard, level, prob) -> bool: payload = { 'source': 'TRINETRA-v1.0', 'district': district, 'hazard': hazard, 'level': level, 'probability': prob, 'timestamp': datetime.utcnow().isoformat() } async with aiohttp.ClientSession() as session: for url in self.webhook_urls: try: async with session.post(url, json=payload, timeout=10) as r: logger.info(f"Webhook {url}: {r.status}") except: logger.warning(f"Webhook failed: {url}") return True async def _log_alert(self, district, hazard, level, prob) -> bool: import aiofiles log_entry = {'ts': datetime.utcnow().isoformat(), 'district': district, 'hazard': hazard, 'level': level, 'prob': prob} async with aiofiles.open('./logs/alert_log.jsonl', 'a') as f: await f.write(json.dumps(log_entry) + '\n') return True
// Alert Classification
Tiered alert architecture aligned with NDMA (National Disaster Management Authority) standard protocols.
// Technical Documentation