A production-ready data pipeline for processing environmental sensor data (temperature, humidity, pressure, vibration, voltage) with real-time anomaly detection, multi-stage transformation, and Databricks Unity Catalog integration.
For complete setup and demo instructions, see: MASTER_SETUP_AND_DEMO.md
This is the single, authoritative guide that will take you from zero to a fully working demo.
```
.
├── MASTER_SETUP_AND_DEMO.md                  # START HERE - Complete setup guide
├── .env.example                              # Environment configuration template
├── databricks-notebooks/                     # Databricks AutoLoader notebooks
│   └── setup-and-run-autoloader.py           # Main AutoLoader notebook
├── scripts/                                  # Automation and utility scripts
│   ├── validate-env.sh                       # Validate environment configuration
│   ├── create-all-buckets.sh                 # Create S3 buckets
│   ├── seed-buckets-for-autoloader.py        # Seed buckets with sample data
│   ├── fix-external-locations-individual.py  # Fix external location URLs
│   ├── turbo-delete-buckets.py               # Fast bucket deletion
│   ├── clean-all-data.sh                     # Clean bucket contents
│   ├── start-environmental-sensor.sh         # Start environmental sensor
│   └── run-anomaly-demo.sh                   # Run complete demo
├── docs/                                     # Additional documentation
│   ├── DEVELOPMENT_RULES.md
│   ├── ENVIRONMENT_SETUP.md
│   └── QUICK_START_CHECKLIST.md
├── jobs/                                     # Expanso job specifications
│   └── edge-processing-job.yaml
└── spot/                                     # Edge node deployment files
    └── instance-files/                       # Files deployed to edge nodes
        ├── etc/                              # Configuration files
        ├── opt/                              # Service files and scripts
        └── setup.sh                          # Node setup script
```
- Edge-First Architecture: Distributed data processing using Expanso Edge nodes
- Anomaly Detection: Physics-based validation of environmental sensor readings at the edge
- Real-Time Processing: Streaming ingestion with Databricks AutoLoader
- Schema Validation: Automatic JSON schema validation at the edge (see the sketch after this list)
- Unity Catalog: Enterprise governance and data management
- Containerized Edge Services: Docker-based sensors running on edge nodes
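To make the schema-validation feature concrete, here is a minimal sketch of how a reading could be checked against a JSON schema on an edge node before it is forwarded. It uses the `jsonschema` library; the schema and field names are assumptions based on the sensor fields described in this README, not the actual schema shipped with the Expanso job.

```python
# Minimal sketch of edge-side JSON schema validation (assumed schema; the
# schema actually deployed with the Expanso job may differ).
from jsonschema import Draft7Validator

SENSOR_SCHEMA = {
    "type": "object",
    "required": ["sensor_id", "timestamp", "temperature", "humidity",
                 "pressure", "vibration", "voltage"],
    "properties": {
        "sensor_id": {"type": "string"},
        "timestamp": {"type": "string"},
        "temperature": {"type": "number"},
        "humidity": {"type": "number"},
        "pressure": {"type": "number"},
        "vibration": {"type": "number"},
        "voltage": {"type": "number"},
        "anomaly_flag": {"type": "integer", "enum": [0, 1]},
    },
}

validator = Draft7Validator(SENSOR_SCHEMA)

def is_well_formed(reading: dict) -> bool:
    """Return True when the reading matches the expected JSON schema."""
    return not any(validator.iter_errors(reading))
```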
```mermaid
graph LR
    A[Edge Node] --> B[Sensor]
    B --> C[SQLite]
    C --> D[Expanso Edge]
    D --> E{Validator}
    E -->|Valid| F[Validated S3]
    E -->|Invalid| G[Anomalies S3]
    F --> H[AutoLoader]
    G --> H
    H --> I[Databricks]
    I --> J[Unity Catalog]
```
The pipeline moves data through five stages:

- Raw Data: All sensor readings as received at edge nodes
- Validated Data: Readings that pass physics validation at the edge
- Anomalies: Readings that violate physics rules detected at the edge
- Schematized Data: Structured with enforced schema in Databricks
- Aggregated Data: Analytics-ready summaries in Databricks
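Each stage maps to its own S3 bucket. The helper below is only a sketch of that naming convention, assuming the `expanso-<stage>-data-{region}` pattern used by `scripts/create-all-buckets.sh`; the region value is a placeholder.

```python
# Sketch of the stage-to-bucket naming convention used by the pipeline.
# The region default is a placeholder; use your configured AWS region.
PIPELINE_STAGES = ("raw", "validated", "anomalies", "schematized", "aggregated")

def bucket_for_stage(stage: str, region: str = "us-west-2") -> str:
    """Return the S3 bucket name that holds data for a given pipeline stage."""
    if stage not in PIPELINE_STAGES:
        raise ValueError(f"unknown pipeline stage: {stage}")
    return f"expanso-{stage}-data-{region}"

# Example: bucket_for_stage("anomalies") == "expanso-anomalies-data-us-west-2"
```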
The system detects anomalies in environmental sensor data:
- Temperature anomalies: Values outside the -20°C to 60°C range
- Humidity anomalies: Values outside 5% to 95% range
- Pressure anomalies: Values outside 950-1050 hPa range
- Vibration anomalies: Values exceeding 10 mm/s²
- Voltage anomalies: Values outside 20-25V range
- Sensor-flagged anomalies: Records with `anomaly_flag = 1` set by the sensor
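A minimal sketch of how these checks could be applied, using the thresholds above; the function and field names are illustrative, not the actual Expanso validator API.

```python
# Sketch of the physics-based checks listed above. Thresholds come straight
# from this README; the function and field names are illustrative only.
PHYSICS_LIMITS = {
    "temperature": (-20.0, 60.0),   # °C
    "humidity": (5.0, 95.0),        # %
    "pressure": (950.0, 1050.0),    # hPa
    "vibration": (0.0, 10.0),       # mm/s² (magnitude, so lower bound is 0)
    "voltage": (20.0, 25.0),        # V
}

def find_violations(reading: dict) -> list[str]:
    """Return the names of every rule the reading violates."""
    violations = [
        field
        for field, (low, high) in PHYSICS_LIMITS.items()
        if reading.get(field) is None or not (low <= reading[field] <= high)
    ]
    if reading.get("anomaly_flag") == 1:  # sensor-flagged anomaly
        violations.append("anomaly_flag")
    return violations

# Readings with any violation are routed to the anomalies bucket;
# clean readings go to the validated bucket.
```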
Prerequisites:

- Python 3.11+
- Docker
- AWS Account with S3 access
- Databricks Workspace with Unity Catalog enabled
- `uv` package manager (`pip install uv`)
- Expanso CLI (latest version)
- Clone and Configure:
  ```bash
  # Clone the repository
  cd aerolake
  ```
- Validate Configuration:
  ```bash
  ./scripts/validate-env.sh
  ```
- Create S3 Buckets:
  ```bash
  ./scripts/create-all-buckets.sh
  ```
  This creates all required buckets:
  - `expanso-raw-data-{region}`
  - `expanso-validated-data-{region}`
  - `expanso-anomalies-data-{region}`
  - `expanso-schematized-data-{region}`
  - `expanso-aggregated-data-{region}`
  - `expanso-checkpoints-{region}`
  - `expanso-metadata-{region}`
- Setup IAM Role:
  ```bash
  ./scripts/create-databricks-iam-role.sh
  ./scripts/update-iam-role-for-new-buckets.sh
  ```
- Setup Unity Catalog:
  ```bash
  cd scripts
  uv run -s setup-unity-catalog-storage.py
  ```
- Fix External Locations (Critical!):
  ```bash
  # External locations may have wrong URLs - fix them
  uv run -s fix-external-locations-individual.py
  ```
- Seed Buckets with Sample Data:
  ```bash
  # AutoLoader needs sample files to infer schemas
  uv run -s seed-buckets-for-autoloader.py
  ```
- Upload and Run AutoLoader Notebook:
  ```bash
  uv run -s upload-and-run-notebook.py \
    --notebook ../databricks-notebooks/setup-and-run-autoloader.py
  ```
- Start Environmental Sensor:
  ```bash
  # Normal sensor (no anomalies)
  ./scripts/start-environmental-sensor.sh 300

  # With anomalies (25% probability)
  ./scripts/start-environmental-sensor.sh 300 --with-anomalies
  ```
- Monitor Processing:
  - Open Databricks workspace
  - Navigate to the uploaded notebook
  - Watch as data flows through all 5 pipeline stages
  - View anomalies being detected and routed
- Query Results:
  ```sql
  -- View ingested data
  SELECT * FROM expanso_catalog.sensor_data.sensor_readings_ingestion;

  -- View detected anomalies
  SELECT * FROM expanso_catalog.sensor_data.sensor_readings_anomalies;

  -- View aggregated metrics
  SELECT * FROM expanso_catalog.sensor_data.sensor_readings_aggregated;
  ```
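The same verification queries can also be run outside a notebook. The sketch below uses the Databricks SDK's statement-execution API and assumes you have a SQL warehouse available; `DATABRICKS_WAREHOUSE_ID` is an illustrative variable, not something this repository defines.

```python
# Hedged sketch: run a verification query through the Databricks SDK instead
# of a notebook. DATABRICKS_WAREHOUSE_ID is illustrative, not defined by this repo.
import os
from databricks.sdk import WorkspaceClient

w = WorkspaceClient(
    host=os.getenv("DATABRICKS_HOST"),
    token=os.getenv("DATABRICKS_TOKEN"),
)
resp = w.statement_execution.execute_statement(
    warehouse_id=os.getenv("DATABRICKS_WAREHOUSE_ID"),
    statement="SELECT COUNT(*) FROM expanso_catalog.sensor_data.sensor_readings_anomalies",
)
if resp.result and resp.result.data_array:
    print("anomaly rows:", resp.result.data_array[0][0])
```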
Common issues and their fixes:

- AutoLoader Schema Inference Error:
  - Cause: Empty buckets
  - Solution: Run `scripts/seed-buckets-for-autoloader.py`
- UNAUTHORIZED_ACCESS Error:
  - Cause: External locations pointing to the wrong bucket URLs
  - Solution: Run `scripts/fix-external-locations-individual.py`
- Permission Denied on External Locations:
  - Cause: IAM role not updated for the new buckets
  - Solution: Run `scripts/update-iam-role-for-new-buckets.sh`
- No Data Flowing:
  - Cause: Checkpoints preventing reprocessing
  - Solution: Clean checkpoints with `scripts/clean-all-data.sh`
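Several of these issues trace back to empty buckets. A quick `boto3` check (a sketch, assuming the bucket naming pattern above and a placeholder region) confirms that each stage bucket contains at least one object before AutoLoader tries to infer a schema:

```python
# Sketch: confirm each pipeline bucket has at least one object, since
# AutoLoader schema inference fails on empty buckets. Region is a placeholder.
import boto3

REGION = "us-west-2"  # placeholder; use your configured AWS region
STAGES = ("raw", "validated", "anomalies", "schematized", "aggregated")

s3 = boto3.client("s3", region_name=REGION)
for stage in STAGES:
    bucket = f"expanso-{stage}-data-{REGION}"
    resp = s3.list_objects_v2(Bucket=bucket, MaxKeys=1)
    status = "seeded" if resp.get("KeyCount", 0) > 0 else "EMPTY - run the seed script"
    print(f"{bucket}: {status}")
```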
```bash
# Check bucket structure
./scripts/check-bucket-structure.sh

# Verify Databricks setup
cd scripts && uv run -s verify-databricks-setup.py

# List external locations
uv run --with databricks-sdk --with python-dotenv python3 -c "
from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv
import os
from pathlib import Path

load_dotenv(Path('..') / '.env')
w = WorkspaceClient(host=os.getenv('DATABRICKS_HOST'), token=os.getenv('DATABRICKS_TOKEN'))
print('External Locations:')
for loc in w.external_locations.list():
    if 'expanso' in loc.name.lower():
        print(f'  {loc.name}: {loc.url}')
"
```

Key metrics to monitor:

- Ingestion Rate: Files/second being processed
- Anomaly Rate: Percentage of readings flagged
- Processing Latency: Time from sensor to Unity Catalog
- Schema Evolution: New columns being added
```sql
-- Anomaly detection rate
SELECT
  DATE(processing_timestamp) as date,
  COUNT(*) as anomaly_count,
  AVG(temperature) as avg_temperature,
  AVG(vibration) as avg_vibration
FROM expanso_catalog.sensor_data.sensor_readings_anomalies
GROUP BY DATE(processing_timestamp)
ORDER BY date DESC;
```
```sql
-- Pipeline throughput
SELECT
  stage,
  COUNT(*) as record_count,
  MAX(processing_timestamp) as last_update
FROM (
  SELECT 'ingestion' as stage, processing_timestamp
  FROM expanso_catalog.sensor_data.sensor_readings_ingestion
  UNION ALL
  SELECT 'validated' as stage, processing_timestamp
  FROM expanso_catalog.sensor_data.sensor_readings_validated
  UNION ALL
  SELECT 'anomalies' as stage, processing_timestamp
  FROM expanso_catalog.sensor_data.sensor_readings_anomalies
)
GROUP BY stage;
```

To completely clean up all resources:
```bash
# Delete all S3 buckets and contents
./scripts/turbo-delete-buckets.py

# Clean Unity Catalog objects
cd scripts && uv run -s cleanup-unity-catalog.py

# Remove local artifacts
rm -rf .flox/run .env *.db *.log
```

- [Development Rules](docs/DEVELOPMENT_RULES.md) - Coding standards and best practices
- [Environment Setup Guide](docs/ENVIRONMENT_SETUP.md) - Detailed environment configuration
- [Quick Start Checklist](docs/QUICK_START_CHECKLIST.md) - Step-by-step verification
- [Master Setup Guide](MASTER_SETUP_AND_DEMO.md) - Complete walkthrough
- Follow the coding standards in `docs/DEVELOPMENT_RULES.md`
- Always use `uv` for Python scripts
- Test with both normal and anomaly data
- Update documentation for any new features
MIT License - See LICENSE file for details
For issues or questions:
- Check the Master Setup Guide
- Review the troubleshooting section above
- Open an issue with:
  - Environment details (`.env` without secrets)
  - Error messages and logs
  - Steps to reproduce
You know the pipeline is working when:
- ✅ All 5 S3 buckets have data flowing
- ✅ Unity Catalog tables are populated
- ✅ Anomalies are being detected and routed
- ✅ AutoLoader is processing files continuously
- ✅ No errors in Databricks notebook execution