# Getting Started with Layer 1

Get your energy data flowing through the Qubit Foundation infrastructure in just a few steps. This guide walks you through connecting your first device and processing energy data.
## Prerequisites

This guide assumes you have basic familiarity with energy systems and Python programming. For production deployments, consult with your system administrator.
### System Requirements

- Python 3.8+ or Node.js 16+
- Git for cloning repositories
- Docker (optional, for containerized deployment)
- Access to an energy device or historical data
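You can sanity-check the toolchain from a shell before continuing:

```bash
# Confirm required tool versions
python --version   # expect 3.8 or newer
git --version
docker --version   # only needed for containerized deployment
```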
### Network Access

- MQTT broker connectivity (ports 1883/8883)
- OCPP WebSocket endpoints (ports 80/443)
- Modbus TCP connectivity (port 502)
- Outbound HTTPS for schema validation
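If you are unsure whether these ports are reachable from your environment, a quick TCP probe can confirm before you configure a connector. The hostnames below are placeholders for your own broker and device endpoints:

```python
# Probe the ports listed above (hostnames are placeholders)
import socket

endpoints = [
    ("broker.example.com", 8883),   # MQTT over TLS
    ("ocpp.example.com", 443),      # OCPP WebSocket
    ("device.example.com", 502),    # Modbus TCP
]

for host, port in endpoints:
    try:
        with socket.create_connection((host, port), timeout=5):
            print(f"✅ {host}:{port} reachable")
    except OSError as e:
        print(f"❌ {host}:{port} unreachable: {e}")
```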
## Quick Start: 5-Minute Setup

### 1. Install Qubit Components
```bash
# Install schemas and connectors (extras quoted for zsh compatibility)
pip install qubit-energy-schemas "qubit-energy-connectors[mqtt]"

# Clone examples
git clone https://github.com/qubit-foundation/qubit-energy-schemas.git
cd qubit-energy-schemas
```
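To verify the install, try importing the entry points used throughout this guide:

```bash
# Quick import check (module paths match the examples below)
python -c "from qubit.connectors.mqtt import MQTTConnector; print('connectors OK')"
python -c "from qubit.adapters import AdapterPipeline; print('adapters OK')"
```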
### 2. Choose Your Data Source

You can start from our live demo MQTT feed, your own MQTT broker, or historical CSV data. Use the live demo data to get started immediately:

```python
from qubit.connectors.mqtt import MQTTConnector
import asyncio

# Demo configuration
config = {
    "broker": "mqtt://demo.qubit.energy:1883",
    "topics": ["demo/solar/+/+/readings"],
    "client_id": "demo_client_001",
}

async def handle_demo_data(data):
    print(f"Demo data: {data}")
    return data

async def main():
    connector = MQTTConnector(config)
    await connector.connect()
    await connector.subscribe("demo/solar/+/+/readings", handle_demo_data)
    await asyncio.sleep(60)  # Run for 1 minute
    await connector.disconnect()

asyncio.run(main())
```
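Each `+` in `demo/solar/+/+/readings` is MQTT's single-level wildcard, so the subscription matches any site and device segment, for example `demo/solar/site1/inverter2/readings` (the IDs here are illustrative).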
### 3. Process and Validate Data
```python
from qubit.adapters import AdapterPipeline
from qubit.adapters.units import UnitConverter
from qubit.adapters.timezone import TimezoneAdapter
from qubit.adapters.validation import SchemaValidator

# Create processing pipeline
pipeline = AdapterPipeline([
    UnitConverter(target_units={"power": "kW", "energy": "kWh"}),
    TimezoneAdapter(target_timezone="UTC"),
    SchemaValidator("timeseries", version="0.2"),
])

# Process incoming data (`connector` is the MQTTConnector from step 2)
@connector.on_message
async def process_energy_data(raw_message):
    try:
        # Run through adapter pipeline
        timeseries = pipeline.process(raw_message)
        print(f"✅ Processed: {timeseries.id}")
        print(f"   Asset: {timeseries.asset_id}")
        print(f"   Value: {timeseries.value} {timeseries.unit}")
        print(f"   Quality: {timeseries.quality}")
        # Ready for Layer 2 processing!
        return timeseries
    except Exception as e:
        print(f"❌ Processing failed: {e}")
        raise
```
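Adapters run in the order you list them, so unit conversion and timezone normalization happen before the `SchemaValidator` checks each record against the `timeseries` schema (version 0.2).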
## Complete Example: Solar Farm Monitoring

Let’s build a complete solar farm monitoring system:
### 1. Project Structure

```text
solar_monitoring/
├── config/
│   ├── production.yaml
│   └── development.yaml
├── src/
│   ├── main.py
│   ├── handlers.py
│   └── storage.py
├── requirements.txt
└── docker-compose.yml
```
### 2. Configuration

`config/production.yaml` (a `requirements.txt` sketch follows the YAML):
```yaml
# Solar farm monitoring configuration
mqtt:
  broker: "mqtts://solar-farm-mqtt.example.com:8883"
  client_id: "solar_monitor_prod"
  topics:
    - "farm/inverter/+/telemetry"
    - "farm/weather/+/observations"
  qos: 1
  tls:
    ca_certs: "/certs/ca.pem"
    certfile: "/certs/client.crt"
    keyfile: "/certs/client.key"
  auth:
    username: "${MQTT_USERNAME}"
    password: "${MQTT_PASSWORD}"

adapters:
  units:
    power_target: "kW"
    energy_target: "kWh"
    temperature_target: "C"
  timezone:
    site_timezone: "America/Los_Angeles"
    target_timezone: "UTC"
  validation:
    schema_version: "0.2"
    quality_threshold: 0.8

storage:
  timeseries_db: "postgresql://user:pass@db.example.com/energy"
  influxdb_url: "http://influx.example.com:8086"
```
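The `requirements.txt` tab isn’t reproduced above; here is a minimal sketch based on the packages this guide installs and imports (pin exact versions for production):

```text
qubit-energy-schemas
qubit-energy-connectors[mqtt]
PyYAML
```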
### 3. Implementation

`src/main.py` (a sketch of `handlers.py` follows the listing):
```python
import asyncio
import yaml

from qubit.connectors.mqtt import MQTTConnector
from qubit.adapters import AdapterPipeline
from handlers import SolarDataHandler
from storage import TimeSeriesStore

async def main():
    # Load configuration
    with open('config/production.yaml') as f:
        config = yaml.safe_load(f)

    # Set up components
    connector = MQTTConnector(config['mqtt'])
    adapter_pipeline = AdapterPipeline.from_config(config['adapters'])
    storage = TimeSeriesStore(config['storage'])
    handler = SolarDataHandler(adapter_pipeline, storage)

    # Connect and start processing
    await connector.connect()
    for topic in config['mqtt']['topics']:
        await connector.subscribe(topic, handler.process_message)

    print("🌞 Solar farm monitoring started")
    try:
        # Run until interrupted
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Ctrl-C surfaces as task cancellation under asyncio.run()
        print("👋 Shutting down...")
    finally:
        await connector.disconnect()
        await storage.close()

if __name__ == "__main__":
    asyncio.run(main())
```
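The `handlers.py` tab isn’t reproduced above either. Below is a minimal sketch of `SolarDataHandler`, its interface inferred from how `main.py` uses it; the `storage.write()` call is an assumed method name:

```python
# src/handlers.py — minimal sketch; interface inferred from main.py
class SolarDataHandler:
    def __init__(self, pipeline, storage):
        self.pipeline = pipeline
        self.storage = storage

    async def process_message(self, raw_message):
        # Normalize, convert, and validate the raw reading
        timeseries = self.pipeline.process(raw_message)
        # Persist for Layer 2 (write() is an assumed TimeSeriesStore method)
        await self.storage.write(timeseries)
        return timeseries
```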
### 4. Run and Monitor

For development, set credentials in the environment and run directly (a `docker-compose.yml` sketch for containerized runs follows):
```bash
# Set environment variables
export MQTT_USERNAME=your_username
export MQTT_PASSWORD=your_password

# Run locally
python src/main.py
```
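The `docker-compose.yml` tab isn’t shown above; here is a minimal sketch, reusing the image name and `CONFIG_FILE` convention from the Kubernetes example later in this guide:

```yaml
# docker-compose.yml — minimal sketch; adjust volumes and image tag
services:
  connector:
    image: qubit/connectors:latest
    environment:
      CONFIG_FILE: /config/production.yaml
      MQTT_USERNAME: ${MQTT_USERNAME}
      MQTT_PASSWORD: ${MQTT_PASSWORD}
    volumes:
      - ./config:/config:ro
      - ./certs:/certs:ro
```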
## Verification

Once your system is running, verify data flow:

### 1. Check Data Quality
```python
# Query recent data (run inside an async context)
from storage import TimeSeriesStore

store = TimeSeriesStore(config)
recent_data = await store.query(
    asset_id="ast_inverter_001",
    metric="power_generation",
    start_time="2024-01-15T14:00:00Z",
    end_time="2024-01-15T15:00:00Z",
)

# Analyze quality
quality_stats = {
    "good": sum(1 for ts in recent_data if ts.quality == "good"),
    "questionable": sum(1 for ts in recent_data if ts.quality == "questionable"),
    "poor": sum(1 for ts in recent_data if ts.quality == "poor"),
}
print(f"Data quality distribution: {quality_stats}")
```

### 2. Check Connector Health

```python
health_status = await connector.health_check()
print(f"Connector status: {health_status}")
```

### 3. View Processing Metrics

```python
metrics = pipeline.get_metrics()
print(f"Messages processed: {metrics['total_processed']}")
print(f"Validation success rate: {metrics['validation_success_rate']:.2%}")
print(f"Average processing time: {metrics['avg_processing_time']:.3f}s")
```
## Common Integration Patterns

### Pattern 1: Real-time Dashboard
```python
# Stream processed data to a dashboard
# (`dashboard_websocket` is your dashboard's WebSocket client)
@connector.on_message
async def stream_to_dashboard(timeseries):
    if timeseries.metric in ["power_generation", "energy_consumption"]:
        await dashboard_websocket.send(timeseries.json())
```
### Pattern 2: Data Lake Storage
```python
# Batch-write to a data lake: buffer records, then flush when either
# the batch size or the flush interval is reached
batch_writer = BatchWriter(
    destination="s3://energy-data-lake/timeseries/",
    batch_size=1000,
    flush_interval=60,  # seconds
)

@connector.on_message
async def store_in_data_lake(timeseries):
    await batch_writer.add(timeseries)
```
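Batching amortizes object-store writes: records accumulate until either `batch_size` or `flush_interval` is hit, so tune both against how fresh you need the data in the lake to be.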
### Pattern 3: Alert Generation
```python
# Generate alerts for anomalies
from qubit.alerts import AlertManager

alert_manager = AlertManager()

@connector.on_message
async def check_for_alerts(timeseries):
    if timeseries.quality == "poor":
        await alert_manager.send_alert(
            level="warning",
            message=f"Poor data quality from {timeseries.asset_id}",
            metadata=timeseries.metadata,
        )
```
## Troubleshooting

### Connection Failures

```python
# Test the MQTT connection (run inside an async context)
from qubit.connectors.mqtt import MQTTConnector

connector = MQTTConnector(config)
try:
    await connector.connect(timeout=10)
    print("✅ Connected successfully")
except ConnectionError as e:
    print(f"❌ Connection failed: {e}")
    # Check firewall rules, credentials, and the broker URL
```

### Validation Failures

```python
# Debug validation failures
from qubit.adapters.validation import SchemaValidator

validator = SchemaValidator("timeseries", version="0.2")
result = validator.validate(data)
if not result.is_valid:
    print("Validation errors:")
    for error in result.errors:
        print(f"  - {error.path}: {error.message}")
```

### Unit Conversion Errors

```python
# Debug unit conversions (UnitConversionError assumed to live alongside
# UnitConverter; adjust the import to your installed version)
from qubit.adapters.units import UnitConverter, UnitConversionError

try:
    converted = UnitConverter.convert(1500, "W", "kW")
    print(f"Converted: {converted} kW")
except UnitConversionError as e:
    print(f"Conversion error: {e}")
    print(f"Supported units: {UnitConverter.supported_units()}")
```
## Production Deployment

### High Availability Setup
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: qubit-energy-connector
spec:
  replicas: 3
  selector:
    matchLabels:
      app: energy-connector
  template:
    metadata:
      labels:
        app: energy-connector   # must match the selector above
    spec:
      containers:
        - name: connector
          image: qubit/connectors:latest
          env:
            - name: CONFIG_FILE
              value: "/config/production.yaml"
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
```
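The liveness probe assumes the container serves a health endpoint on port 8080; note that the `PrometheusExporter` in the snippet below listens on that same port.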
### Monitoring and Alerting
```python
# Comprehensive monitoring setup
from qubit.monitoring import PrometheusExporter, AlertManager

# Export metrics to Prometheus
prometheus = PrometheusExporter(port=8080)
prometheus.start()

# Set up alerts
alerts = AlertManager()

@connector.on_connect
async def on_connection():
    await alerts.send("info", "Connector started successfully")

@connector.on_disconnect
async def on_disconnection():
    await alerts.send("warning", "Connector disconnected")

@pipeline.on_validation_failure
async def on_validation_failure(error, data):
    if error.severity == "critical":
        await alerts.send("critical", f"Schema validation failed: {error}")
```
## Next Steps
You’ve now established the data foundation for your energy system. Layer 1 provides clean, standardized data ready for advanced processing, optimization, and coordination in the upper layers of the Qubit Foundation stack.