Getting Started with Layer 1

Get your energy data flowing through the Qubit Foundation infrastructure in just a few steps. This guide walks you through connecting your first device and processing energy data.

Prerequisites

This guide assumes basic familiarity with energy systems and Python programming. For production deployments, consult your system administrator.

1. System Requirements

  • Python 3.8+ or Node.js 16+
  • Git for cloning repositories
  • Docker (optional, for containerized deployment)
  • Access to an energy device or historical data

2. Network Access

  • MQTT broker connectivity (port 1883/8883)
  • OCPP WebSocket endpoints (port 80/443)
  • Modbus TCP connectivity (port 502)
  • Outbound HTTPS for schema validation
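
Before installing anything, it can help to confirm that the ports listed above are reachable from the machine that will run the connector. The following is a minimal sketch using only the Python standard library. The host names in ENDPOINTS are placeholders, not real Qubit endpoints, and a successful TCP connection only shows that the port is open, not that the protocol handshake will succeed.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


# Hypothetical hosts: replace with your own broker, charge point, and device addresses.
ENDPOINTS = {
    "MQTT (plain)": ("mqtt.example.com", 1883),
    "MQTT (TLS)": ("mqtt.example.com", 8883),
    "OCPP WebSocket (TLS)": ("ocpp.example.com", 443),
    "Modbus TCP": ("192.0.2.10", 502),
}


def check_endpoints(endpoints: dict, timeout: float = 3.0) -> dict:
    """Map each endpoint name to True (reachable) or False (unreachable)."""
    return {
        name: can_connect(host, port, timeout)
        for name, (host, port) in endpoints.items()
    }
```

Calling check_endpoints(ENDPOINTS) returns a dictionary you can print or log; any False entry points at a firewall rule or address to review.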

Quick Start: 5-Minute Setup

1. Install Qubit Components

# Install schemas and connectors
# Quote the extras specifier so shells such as zsh do not treat [mqtt] as a glob pattern
pip install qubit-energy-schemas "qubit-energy-connectors[mqtt]"

# Clone examples
git clone https://github.com/qubit-foundation/qubit-energy-schemas.git
cd qubit-energy-schemas

2. Choose Your Data Source

Use our live demo data to get started immediately:
from qubit.connectors.mqtt import MQTTConnector
import asyncio

# Demo configuration
config = {
    "broker": "mqtt://demo.qubit.energy:1883",
    "topics": ["demo/solar/+/+/readings"],
    "client_id": "demo_client_001"
}

async def handle_demo_data(data):
    print(f"Demo data: {data}")
    return data

async def main():
    connector = MQTTConnector(config)
    await connector.connect()
    await connector.subscribe("demo/solar/+/+/readings", handle_demo_data)
    await asyncio.sleep(60)  # Run for 1 minute
    await connector.disconnect()

asyncio.run(main())
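
The topic filter demo/solar/+/+/readings uses the MQTT single-level wildcard: each + matches exactly one topic level, while # (not used here) matches all remaining levels and must come last. The sketch below illustrates those matching rules in plain Python. It is a simplified illustration, not the connector's implementation: it ignores the special handling of topics that start with $, and the topic names in the usage note are made up.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Check whether an MQTT topic matches a filter containing + or # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level, matches the rest.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; + accepts any single level.
            return False

    # Without a trailing #, both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

With this helper, a hypothetical topic such as demo/solar/site_a/inv_01/readings matches the demo filter, while demo/solar/site_a/readings does not because it has one level too few.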

3. Process and Validate Data

from qubit.adapters import AdapterPipeline
from qubit.adapters.units import UnitConverter
from qubit.adapters.timezone import TimezoneAdapter
from qubit.adapters.validation import SchemaValidator

# Create processing pipeline
pipeline = AdapterPipeline([
    UnitConverter(target_units={"power": "kW", "energy": "kWh"}),
    TimezoneAdapter(target_timezone="UTC"),
    SchemaValidator("timeseries", version="0.2")
])

# Process incoming data
# `connector` is the connected MQTTConnector instance created in step 2
@connector.on_message
async def process_energy_data(raw_message):
    try:
        # Run through adapter pipeline
        timeseries = pipeline.process(raw_message)
        
        print(f"✅ Processed: {timeseries.id}")
        print(f"   Asset: {timeseries.asset_id}")
        print(f"   Value: {timeseries.value} {timeseries.unit}")
        print(f"   Quality: {timeseries.quality}")
        
        # Ready for Layer 2 processing!
        return timeseries
        
    except Exception as e:
        print(f"❌ Processing failed: {e}")
        raise
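
To make the three pipeline stages concrete, here is a standalone sketch of what unit conversion, timezone normalization, and a basic validity check could look like on a single reading. It does not use the qubit.adapters API: the record layout, the function names, and the conversion table are all assumptions made for illustration.

```python
import math
from datetime import datetime, timezone

# Multipliers that convert common power units to kW (the target unit above).
POWER_TO_KW = {"W": 0.001, "kW": 1.0, "MW": 1000.0}


def to_kw(value: float, unit: str) -> float:
    """Convert a power value to kW; raises KeyError for an unknown unit."""
    return value * POWER_TO_KW[unit]


def to_utc(timestamp: str) -> str:
    """Convert an ISO 8601 timestamp with an explicit offset to UTC."""
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        raise ValueError("timestamp must carry a UTC offset")
    return parsed.astimezone(timezone.utc).isoformat()


def normalize(raw: dict) -> dict:
    """Apply unit conversion, then timezone normalization, then a sanity check."""
    record = {
        "asset_id": raw["asset_id"],
        "value": to_kw(raw["value"], raw["unit"]),
        "unit": "kW",
        "timestamp": to_utc(raw["timestamp"]),
    }
    if not math.isfinite(record["value"]):
        raise ValueError("value must be a finite number")
    return record
```

A reading of 4500 W stamped 2024-01-15T06:00:00-08:00 would come out as roughly 4.5 kW at 2024-01-15T14:00:00+00:00. The order mirrors the pipeline above: convert first, so that validation only ever sees normalized values.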

Complete Example: Solar Farm Monitoring

Let’s build a complete solar farm monitoring system:

1. Project Structure

solar_monitoring/
├── config/
│   ├── production.yaml
│   └── development.yaml
├── src/
│   ├── main.py
│   ├── handlers.py
│   └── storage.py
├── requirements.txt
└── docker-compose.yml

2. Configuration

# Solar farm monitoring configuration
mqtt:
  broker: "mqtts://solar-farm-mqtt.example.com:8883"
  client_id: "solar_monitor_prod"
  topics:
    - "farm/inverter/+/telemetry"
    - "farm/weather/+/observations"
  qos: 1
  tls:
    ca_certs: "/certs/ca.pem"
    certfile: "/certs/client.crt" 
    keyfile: "/certs/client.key"
  auth:
    username: "${MQTT_USERNAME}"
    password: "${MQTT_PASSWORD}"

adapters:
  units:
    power_target: "kW"
    energy_target: "kWh"
    temperature_target: "C"
  timezone:
    site_timezone: "America/Los_Angeles"
    target_timezone: "UTC"
  validation:
    schema_version: "0.2"
    quality_threshold: 0.8

storage:
  timeseries_db: "postgresql://user:pass@db.example.com/energy"
  influxdb_url: "http://influx.example.com:8086"

3. Implementation

import asyncio
import yaml
from qubit.connectors.mqtt import MQTTConnector
from qubit.adapters import AdapterPipeline
from handlers import SolarDataHandler
from storage import TimeSeriesStore

async def main():
    # Load configuration
    with open('config/production.yaml') as f:
        config = yaml.safe_load(f)
    
    # Set up components
    connector = MQTTConnector(config['mqtt'])
    adapter_pipeline = AdapterPipeline.from_config(config['adapters'])
    storage = TimeSeriesStore(config['storage'])
    handler = SolarDataHandler(adapter_pipeline, storage)
    
    # Connect and start processing
    await connector.connect()
    
    for topic in config['mqtt']['topics']:
        await connector.subscribe(topic, handler.process_message)
    
    print("🌞 Solar farm monitoring started")
    
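    try:
        # Run until the task is cancelled
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Since Python 3.11, asyncio.run() turns Ctrl+C into a task cancellation
        print("👋 Shutting down...")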
    finally:
        await connector.disconnect()
        await storage.close()

if __name__ == "__main__":
    asyncio.run(main())
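
main.py imports SolarDataHandler from handlers.py, which the guide does not show. The following is a hypothetical sketch of what that class might contain. It assumes pipeline.process is synchronous (as in the Quick Start) and that the storage object exposes an async write method; that method name is an assumption.

```python
import logging

logger = logging.getLogger("solar_monitoring")


class SolarDataHandler:
    """Hypothetical handler: normalizes each message and persists the result."""

    def __init__(self, pipeline, storage):
        self.pipeline = pipeline
        self.storage = storage
        self.processed = 0
        self.failed = 0

    async def process_message(self, raw_message):
        try:
            # Normalize and validate the raw payload.
            timeseries = self.pipeline.process(raw_message)
            # Persist the result (assumed async API on the storage object).
            await self.storage.write(timeseries)
        except Exception:
            # Log and count the failure so one bad message does not stop the subscription.
            self.failed += 1
            logger.exception("Failed to process message")
            return None
        self.processed += 1
        return timeseries
```

Swallowing the exception after logging is a deliberate choice for a long-running subscriber; re-raise instead if you want a failed message to stop the process.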

4. Run and Monitor

# Set environment variables
export MQTT_USERNAME=your_username
export MQTT_PASSWORD=your_password

# Run locally
python src/main.py

Verification

Once your system is running, verify data flow:

1. Check Data Quality

# Query recent data
import asyncio
import yaml
from storage import TimeSeriesStore

async def check_quality():
    # Load the same configuration used by src/main.py
    with open('config/production.yaml') as f:
        config = yaml.safe_load(f)

    store = TimeSeriesStore(config['storage'])
    try:
        recent_data = await store.query(
            asset_id="ast_inverter_001",
            metric="power_generation",
            start_time="2024-01-15T14:00:00Z",
            end_time="2024-01-15T15:00:00Z"
        )
    finally:
        await store.close()

    # Analyze quality
    quality_stats = {
        "good": sum(1 for ts in recent_data if ts.quality == "good"),
        "questionable": sum(1 for ts in recent_data if ts.quality == "questionable"),
        "poor": sum(1 for ts in recent_data if ts.quality == "poor")
    }

    print(f"Data quality distribution: {quality_stats}")

asyncio.run(check_quality())
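
The same distribution can be computed in one pass and compared against a threshold. The sketch below takes the quality labels as plain strings, for example (ts.quality for ts in recent_data). Reading the configured quality_threshold of 0.8 as the minimum share of "good" readings is an assumption; the guide does not define it.

```python
from collections import Counter


def quality_summary(labels, threshold: float = 0.8) -> dict:
    """Count quality labels and report whether the share of 'good' meets the threshold."""
    counts = Counter(labels)
    total = sum(counts.values())
    # Avoid division by zero when the query returned no data.
    good_ratio = counts["good"] / total if total else 0.0
    return {
        "counts": dict(counts),
        "good_ratio": good_ratio,
        "meets_threshold": good_ratio >= threshold,
    }
```

An empty result is reported as not meeting the threshold, which is usually the safer reading: no data in the window is itself a sign that something upstream needs attention.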

2. Monitor Performance

# Check connector health
# Run this inside an async function, with `connector` and `pipeline` from the steps above
health_status = await connector.health_check()
print(f"Connector status: {health_status}")

# View processing metrics
metrics = pipeline.get_metrics()
print(f"Messages processed: {metrics['total_processed']}")
print(f"Validation success rate: {metrics['validation_success_rate']:.2%}")
print(f"Average processing time: {metrics['avg_processing_time']:.3f}s")

Common Integration Patterns

Pattern 1: Real-time Dashboard

# Stream processed data to dashboard
# Assumes `dashboard_websocket` is an already-open WebSocket connection (setup not shown)
@connector.on_message
async def stream_to_dashboard(timeseries):
    if timeseries.metric in ["power_generation", "energy_consumption"]:
        await dashboard_websocket.send(timeseries.json())

Pattern 2: Data Lake Storage

# Batch write to data lake
batch_writer = BatchWriter(
    destination="s3://energy-data-lake/timeseries/",
    batch_size=1000,
    flush_interval=60  # seconds
)

@connector.on_message  
async def store_in_data_lake(timeseries):
    await batch_writer.add(timeseries)
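
The guide does not show where BatchWriter is imported from, so the sketch below illustrates the batch_size and flush_interval semantics with a hypothetical in-memory stand-in. It flushes when the buffer is full or when the interval has elapsed since the last flush. The interval is only checked when an item is added, which a production writer would replace with a background timer.

```python
import time


class InMemoryBatchWriter:
    """Hypothetical stand-in that collects items and flushes them in batches."""

    def __init__(self, batch_size: int = 1000, flush_interval: float = 60.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval  # seconds
        self._buffer = []
        self._last_flush = time.monotonic()
        self.flushed_batches = []  # records what would have been uploaded

    async def add(self, item):
        self._buffer.append(item)
        interval_elapsed = time.monotonic() - self._last_flush >= self.flush_interval
        if len(self._buffer) >= self.batch_size or interval_elapsed:
            await self.flush()

    async def flush(self):
        if self._buffer:
            # Swap the buffer out before writing so new items are not lost.
            batch, self._buffer = self._buffer, []
            await self._write(batch)
        self._last_flush = time.monotonic()

    async def _write(self, batch):
        # Placeholder for the real upload to object storage.
        self.flushed_batches.append(batch)
```

Call flush() once more during shutdown so that a partially filled buffer is not dropped.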

Pattern 3: Alert Generation

# Generate alerts for anomalies
from qubit.alerts import AlertManager

alert_manager = AlertManager()

@connector.on_message
async def check_for_alerts(timeseries):
    if timeseries.quality == "poor":
        await alert_manager.send_alert(
            level="warning",
            message=f"Poor data quality from {timeseries.asset_id}",
            metadata=timeseries.metadata
        )

Troubleshooting

Production Deployment

High Availability Setup

apiVersion: apps/v1
kind: Deployment
metadata:
  name: qubit-energy-connector
spec:
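  # Give each replica its own MQTT client_id: brokers disconnect a client when another connects with the same ID
  replicas: 3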
  selector:
    matchLabels:
      app: energy-connector
  template:
    metadata:
      labels:
        app: energy-connector  # must match spec.selector.matchLabels
    spec:
      containers:
      - name: connector
        image: qubit/connectors:latest
        env:
        - name: CONFIG_FILE
          value: "/config/production.yaml"
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
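
The liveness probe above expects the container to answer GET /health on port 8080. The guide does not show what serves that endpoint, so here is a minimal standard-library sketch of one possibility. The handler and function names are hypothetical, and the response is a static "ok"; a real check would report the connector's actual state.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            body = json.dumps({"status": "ok"}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        # Silence per-request logging; probes arrive every few seconds.
        pass


def start_health_server(port: int = 8080) -> ThreadingHTTPServer:
    """Serve /health on a background thread and return the server object."""
    server = ThreadingHTTPServer(("0.0.0.0", port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Start it once at application startup and call server.shutdown() during cleanup. Because the thread is a daemon, it will not keep the process alive on its own.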

Monitoring and Alerting

# Comprehensive monitoring setup
from qubit.monitoring import PrometheusExporter, AlertManager

# Export metrics to Prometheus
prometheus = PrometheusExporter(port=8080)
prometheus.start()

# Set up alerts
alerts = AlertManager()

@connector.on_connect
async def on_connection():
    await alerts.send("info", "Connector started successfully")

@connector.on_disconnect  
async def on_disconnection():
    await alerts.send("warning", "Connector disconnected")

@pipeline.on_validation_failure
async def on_validation_failure(error, data):
    if error.severity == "critical":
        await alerts.send("critical", f"Schema validation failed: {error}")

Next Steps

Scale Your Deployment

Learn about production deployment patterns and scaling strategies

Layer 2: Predictions

Add AI-powered forecasting to your energy data

Custom Connectors

Build connectors for proprietary or emerging protocols

Advanced Monitoring

Set up comprehensive observability and alerting

Community and Support

1. Join the Community

Connect with other developers in our GitHub Discussions

2. Report Issues

Found a bug? Create an issue on GitHub

3. Contribute

Help improve the connectors by contributing code

You’ve now established the data foundation for your energy system. Layer 1 provides clean, standardized data ready for advanced processing, optimization, and coordination in the upper layers of the Qubit Foundation stack.