Qubit Energy Connectors

Protocol-specific connectors for ingesting energy data from various sources into the Qubit Energy platform. These connectors handle the complexity of different communication protocols and translate raw device data into standardized formats.

Overview

Energy systems communicate using hundreds of different protocols. Qubit Connectors provide a unified interface that abstracts away protocol complexity while maintaining full feature support for each protocol’s unique capabilities.

  • MQTT: Real-time IoT device telemetry and messaging
  • OCPP: EV charger communication (1.6J and 2.0.1)
  • Modbus: Industrial equipment and smart meter integration
  • CSV/API: Bulk historical data import and REST APIs

Architecture

All connectors follow a consistent architecture pattern.

Installation

Installation options: Python package, Docker, Kubernetes. The Python package is installed with pip:

# Install base package
pip install qubit-energy-connectors

# With specific protocol support (quotes stop shells such as zsh from globbing the brackets)
pip install "qubit-energy-connectors[mqtt]"
pip install "qubit-energy-connectors[modbus]"
pip install "qubit-energy-connectors[ocpp]"
pip install "qubit-energy-connectors[all]"

MQTT Connector

Features

  • MQTT 3.1.1 and 5.0 support
  • QoS levels 0, 1, and 2
  • TLS/SSL encryption with certificate validation
  • WebSocket transport for browser compatibility
  • Auto-reconnection with exponential backoff
  • Hierarchical topic structures (org/site/asset/device/metric)
  • Wildcard subscriptions (+ single level, # multi-level)
  • Dynamic topic discovery and subscription
  • Topic filtering and routing rules
  • JSON message parsing with schema validation
  • Binary payload support (Protobuf, MessagePack)
  • Batch message processing for high throughput
  • Message deduplication and ordering
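
The auto-reconnection feature listed above relies on exponential backoff. As a rough illustration of the idea (not the connector's actual implementation; the function name `backoff_delays` and all of its parameters are hypothetical), the wait time doubles after each failed attempt up to a cap, and optional jitter keeps many clients from retrying at the same instant:

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 60.0,
    attempts: int = 6,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield one wait time (in seconds) per reconnection attempt."""
    rng = rng or random.Random()
    for attempt in range(attempts):
        # Grow geometrically, but never wait longer than the cap.
        delay = min(cap, base * factor ** attempt)
        if jitter:
            # Shrink the delay by a random fraction of at most `jitter`.
            delay *= 1.0 - jitter * rng.random()
        yield delay


if __name__ == "__main__":
    print(list(backoff_delays(cap=10.0)))
    # [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

A reconnect loop would sleep for each yielded delay between connection attempts and restart the sequence after a successful connection.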

Configuration

{
  "mqtt": {
    "broker": "mqtt://broker.example.com:1883",
    "client_id": "qubit_connector_01",
    "topics": [
      "org_001/sit_solar_01/+/+/telemetry",
      "org_001/sit_storage_01/+/+/telemetry"
    ],
    "qos": 1,
    "keepalive": 60
  }
}
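
To make the wildcard topics in this configuration concrete, here is a minimal sketch of how an MQTT topic filter is matched against a concrete topic, and how a topic following the org/site/asset/device/metric hierarchy can be split into named parts. The helper names (`topic_matches`, `parse_topic`, `TopicPath`) and the asset and device IDs are illustrative assumptions, not part of the Qubit API. The sketch also ignores the special handling of topics that start with `$`.

```python
from typing import NamedTuple


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a concrete MQTT topic matches a subscription filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level, and it
            # matches all remaining levels (including none).
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


class TopicPath(NamedTuple):
    org: str
    site: str
    asset: str
    device: str
    metric: str


def parse_topic(topic: str) -> TopicPath:
    """Split an org/site/asset/device/metric topic into named fields."""
    parts = topic.split("/")
    if len(parts) != 5:
        raise ValueError(f"expected 5 topic levels, got {len(parts)}")
    return TopicPath(*parts)


if __name__ == "__main__":
    # Hypothetical asset and device IDs, for illustration only.
    topic = "org_001/sit_solar_01/ast_inverter_01/dev_meter_01/telemetry"
    print(topic_matches("org_001/sit_solar_01/+/+/telemetry", topic))  # True
    print(parse_topic(topic).asset)  # ast_inverter_01
```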

Usage Example

from qubit.connectors.mqtt import MQTTConnector
import asyncio

# Configuration
config = {
    "broker": "mqtt://localhost:1883",
    "topics": ["org_001/sit_solar_01/+/+/telemetry"],
    "username": "user",
    "password": "pass"
}

# Data handler
async def handle_solar_data(data):
    """Process incoming solar telemetry."""
    print(f"Solar generation: {data['value']} {data['unit']}")
    
    # Automatically normalized to TimeSeries schema
    timeseries = data.to_timeseries()
    return timeseries

# Main application
async def main():
    connector = MQTTConnector(config)

    # Connect and subscribe
    await connector.connect()
    try:
        await connector.subscribe(
            "org_001/sit_solar_01/+/+/telemetry",
            handle_solar_data
        )

        # Keep running for one hour
        await asyncio.sleep(3600)
    finally:
        # Always disconnect, even if subscribing fails or the task is cancelled
        await connector.disconnect()

asyncio.run(main())
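
The handler above reads `value` and `unit` from each message. As a hedged sketch of what JSON parsing with basic validation might look like before a handler runs, the following uses only the standard library. The function name `parse_telemetry` and the `REQUIRED_FIELDS` table are assumptions for illustration; the connector's real schema validation is not described in this guide.

```python
import json
from typing import Any, Dict

# Assumed minimal schema: the two fields the handler above reads.
REQUIRED_FIELDS = {"value": (int, float), "unit": str}


def parse_telemetry(raw: bytes) -> Dict[str, Any]:
    """Decode a JSON payload and check the required fields and their types."""
    try:
        payload = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise ValueError(f"payload is not valid JSON: {exc}") from exc
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    for field, expected in REQUIRED_FIELDS.items():
        if field not in payload:
            raise ValueError(f"missing field: {field}")
        value = payload[field]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            raise ValueError(f"field {field!r} has the wrong type")
    return payload


if __name__ == "__main__":
    print(parse_telemetry(b'{"value": 4.2, "unit": "kW"}'))
```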

OCPP Connector

OCPP (Open Charge Point Protocol) is the industry standard for EV charger communication. Qubit supports both OCPP 1.6J and 2.0.1.

Features

  • OCPP 1.6J and 2.0.1 protocol support
  • WebSocket transport with automatic reconnection
  • Message queuing for offline operation
  • Transaction management with session tracking
  • Smart charging capabilities
  • Firmware update management
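
For orientation, OCPP-J messages travel over the WebSocket as JSON arrays: a request (CALL) is `[2, uniqueId, action, payload]` and a successful response (CALLRESULT) is `[3, uniqueId, payload]`. The sketch below parses and builds such frames with the standard library only. The names `Call`, `parse_call`, and `build_call_result` are hypothetical and are not part of the Qubit connector API.

```python
import json
from typing import Any, Dict, NamedTuple

# OCPP-J message type identifiers.
CALL, CALL_RESULT, CALL_ERROR = 2, 3, 4


class Call(NamedTuple):
    unique_id: str
    action: str
    payload: Dict[str, Any]


def parse_call(frame: str) -> Call:
    """Parse a raw WebSocket text frame as an OCPP-J CALL."""
    message = json.loads(frame)
    if not isinstance(message, list) or len(message) != 4 or message[0] != CALL:
        raise ValueError("not an OCPP-J CALL frame")
    _, unique_id, action, payload = message
    return Call(unique_id, action, payload)


def build_call_result(unique_id: str, payload: Dict[str, Any]) -> str:
    """Serialize the CALLRESULT that answers the CALL with the same unique ID."""
    return json.dumps([CALL_RESULT, unique_id, payload])


if __name__ == "__main__":
    call = parse_call('[2, "19223201", "Heartbeat", {}]')
    print(call.action)  # Heartbeat
    print(build_call_result(call.unique_id, {"currentTime": "2024-01-15T20:00:00Z"}))
```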

Configuration Example

from qubit.connectors.ocpp import OCPPConnector

config = {
    "version": "2.0.1",
    "websocket_url": "wss://chargers.network.com/ocpp",
    "charge_point_id": "CP_001",
    "auth": {
        "basic_auth": {
            "username": "charger_001", 
            "password": "secure_password"
        }
    }
}

connector = OCPPConnector(config)

@connector.on_message("MeterValues")
async def handle_meter_values(message):
    # Convert OCPP message to TimeSeries
    # (TimeSeries must be imported separately; its module path is not shown here)
    return TimeSeries(
        id=f"ts_{message.charge_point_id}_{message.timestamp}",
        asset_id=message.charge_point_id,
        metric="charging_power",
        value=message.meter_value.sampled_value.value,
        unit="kW",
        timestamp=message.timestamp
    )
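
In an OCPP 2.0.1 `MeterValues` request, `meterValue` is an array, and each entry carries a `timestamp` plus an array of `sampledValue` items, so a single message can contain several readings. The following sketch flattens a raw payload into one dictionary per sample. The function name `iter_samples` and the output keys are assumptions chosen to mirror the TimeSeries fields used above. The fallback measurand and unit reflect the defaults in the OCPP 2.0.1 specification as I understand them.

```python
from typing import Any, Dict, Iterator


def iter_samples(charge_point_id: str, payload: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per sampled value in a MeterValues payload."""
    for meter_value in payload.get("meterValue", []):
        timestamp = meter_value["timestamp"]
        for sample in meter_value.get("sampledValue", []):
            yield {
                "asset_id": charge_point_id,
                # Optional fields fall back to the assumed spec defaults.
                "metric": sample.get("measurand", "Energy.Active.Import.Register"),
                "value": float(sample["value"]),
                "unit": sample.get("unitOfMeasure", {}).get("unit", "Wh"),
                "timestamp": timestamp,
            }


if __name__ == "__main__":
    payload = {
        "evseId": 1,
        "meterValue": [
            {
                "timestamp": "2024-01-15T20:00:00Z",
                "sampledValue": [
                    {
                        "value": 7.2,
                        "measurand": "Power.Active.Import",
                        "unitOfMeasure": {"unit": "kW"},
                    },
                    {"value": 1500},
                ],
            }
        ],
    }
    for record in iter_samples("CP_001", payload):
        print(record)
```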

Modbus Connector

Protocol Support

  • Modbus RTU
  • Modbus TCP

RTU Configuration

config = {
    "protocol": "rtu",
    "port": "/dev/ttyUSB0",
    "baudrate": 19200,
    "parity": "even",
    "stopbits": 1,
    "bytesize": 8,
    "timeout": 1.0
}
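
On a serial link configured like this, every Modbus RTU frame ends with a CRC-16 checksum, transmitted low byte first. As background on what an RTU connector has to do on the wire, here is a small sketch of the standard CRC-16/MODBUS calculation and a "read holding registers" (function code 0x03) request. The function names are illustrative and are not taken from the Qubit package.

```python
import struct


def crc16_modbus(data: bytes) -> int:
    """Compute CRC-16/MODBUS (initial value 0xFFFF, reflected polynomial 0xA001)."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def build_read_holding_request(unit_id: int, start_offset: int, count: int) -> bytes:
    """Build an RTU request frame for function code 0x03."""
    # Unit ID and function code are single bytes; offset and count are big-endian.
    body = struct.pack(">BBHH", unit_id, 0x03, start_offset, count)
    # The CRC is appended little-endian (low byte first).
    return body + struct.pack("<H", crc16_modbus(body))


if __name__ == "__main__":
    frame = build_read_holding_request(unit_id=1, start_offset=0, count=10)
    print(frame.hex(" "))  # 01 03 00 00 00 0a c5 cd
```

A receiver validates a frame by recomputing the CRC over everything except the last two bytes and comparing the result with the transmitted value.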

Register Mapping

{
  "modbus": {
    "host": "192.168.1.100",
    "port": 502,
    "registers": {
      "active_power": {
        "address": 40001,
        "type": "float32",
        "scale": 1000,
        "unit": "kW"
      },
      "energy_total": {
        "address": 40003,
        "type": "uint32",
        "scale": 100,
        "unit": "kWh"
      }
    },
    "polling_interval": 5
  }
}
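
Two details of this mapping are worth spelling out. Addresses such as 40001 follow the common convention in which holding registers are numbered from 40001, so 40001 corresponds to protocol offset 0. In addition, `float32` and `uint32` values each occupy two consecutive 16-bit registers, which is why the second entry starts at 40003. The sketch below decodes such values, assuming the high word comes first (devices differ, so check the device manual). The helper names are hypothetical, and the `scale` field is left unapplied because this guide does not state whether it multiplies or divides.

```python
import struct
from typing import Sequence


def register_offset(address: int) -> int:
    """Convert a 4xxxx-style holding register number to a zero-based offset."""
    if not 40001 <= address <= 49999:
        raise ValueError(f"not a 4xxxx holding register number: {address}")
    return address - 40001


def decode_float32(registers: Sequence[int]) -> float:
    """Decode two 16-bit registers (high word first) as an IEEE 754 float."""
    high, low = registers
    return struct.unpack(">f", struct.pack(">HH", high, low))[0]


def decode_uint32(registers: Sequence[int]) -> int:
    """Decode two 16-bit registers (high word first) as an unsigned integer."""
    high, low = registers
    return (high << 16) | low


if __name__ == "__main__":
    print(register_offset(40003))            # 2
    print(decode_float32([0x4148, 0x0000]))  # 12.5
    print(decode_uint32([0x0001, 0x0000]))   # 65536
```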

CSV Connector

Bulk Data Import

from qubit.connectors.csv import CSVConnector

config = {
    "file_path": "/data/historical_generation.csv",
    "mapping": {
        "timestamp": "DateTime",
        "asset_id": "InverterID", 
        "value": "Generation_kWh",
        "unit": "kWh"
    },
    "timestamp_format": "%Y-%m-%d %H:%M:%S",
    "timezone": "America/Los_Angeles"
}

connector = CSVConnector(config)

# Process large files with progress tracking
async def process_historical_data():
    async for timeseries_batch in connector.process():
        print(f"Processed {len(timeseries_batch)} records")
        # Send to Layer 2 for processing
        # (layer2_client is assumed to be created elsewhere in the application)
        await layer2_client.send(timeseries_batch)
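
To illustrate what the column mapping, `timestamp_format`, and `timezone` settings imply, here is a standard-library sketch that reads rows in batches, interprets timestamps in the configured local zone, and converts them to UTC. It is an approximation under stated assumptions, not the CSVConnector implementation: `read_batches` and `MAPPING` are hypothetical names, and `unit` is treated as a constant rather than a column. It requires Python 3.9+ and available time zone data for `zoneinfo`.

```python
import csv
import io
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, TextIO
from zoneinfo import ZoneInfo

# Column names taken from the configuration above.
MAPPING = {"timestamp": "DateTime", "asset_id": "InverterID", "value": "Generation_kWh"}


def read_batches(
    handle: TextIO,
    batch_size: int = 1000,
    timestamp_format: str = "%Y-%m-%d %H:%M:%S",
    tz_name: str = "America/Los_Angeles",
) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of normalized records, at most `batch_size` per list."""
    tz = ZoneInfo(tz_name)
    batch: List[Dict[str, Any]] = []
    for row in csv.DictReader(handle):
        # Attach the local zone to the naive timestamp, then convert to UTC.
        local = datetime.strptime(row[MAPPING["timestamp"]], timestamp_format)
        utc = local.replace(tzinfo=tz).astimezone(timezone.utc)
        batch.append({
            "timestamp": utc.isoformat(),
            "asset_id": row[MAPPING["asset_id"]],
            "value": float(row[MAPPING["value"]]),
            "unit": "kWh",
        })
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch


if __name__ == "__main__":
    sample = io.StringIO(
        "DateTime,InverterID,Generation_kWh\n"
        "2024-01-15 12:00:00,INV_01,3.5\n"
    )
    for records in read_batches(sample):
        print(records[0]["timestamp"])  # 2024-01-15T20:00:00+00:00
```

Local timestamps that fall in a daylight saving transition are ambiguous or nonexistent; a production import would need an explicit policy for those rows.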

Monitoring and Observability

Health Metrics

  • Connection status and uptime
  • Message throughput and latency
  • Error rates and retry counts
  • Queue depths and processing lag

Data Quality

  • Schema validation success rates
  • Data completeness metrics
  • Outlier detection and flagging
  • Source reliability scoring
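
As one possible reading of "data completeness", the sketch below computes the fraction of expected polling slots in a time window that contain at least one sample. The definition and the function name `completeness` are assumptions made for illustration; the platform's actual metric may be defined differently.

```python
from datetime import datetime, timedelta
from typing import Sequence


def completeness(
    timestamps: Sequence[datetime],
    start: datetime,
    end: datetime,
    interval: timedelta,
) -> float:
    """Fraction of `interval`-sized slots in [start, end) with at least one sample."""
    expected = int((end - start) / interval)
    if expected <= 0:
        raise ValueError("window must span at least one interval")
    # Map each in-window timestamp to its slot index; duplicates collapse in the set.
    seen = {int((ts - start) / interval) for ts in timestamps if start <= ts < end}
    return len(seen) / expected


if __name__ == "__main__":
    start = datetime(2024, 1, 15, 0, 0, 0)
    step = timedelta(seconds=5)
    # One minute of 5-second polling with two missed polls (slots 3 and 7).
    samples = [start + k * step for k in range(12) if k not in (3, 7)]
    ratio = completeness(samples, start, start + timedelta(minutes=1), step)
    print(f"{ratio:.3f}")  # 0.833
```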

Example Monitoring

from qubit.monitoring import ConnectorMetrics

metrics = ConnectorMetrics()

@connector.on_message
async def track_message_metrics(message):
    metrics.increment_counter("messages_received")
    metrics.record_histogram("message_size", len(message))
    
    # Process message (process_message is an application-defined coroutine)
    result = await process_message(message)
    
    if result.valid:
        metrics.increment_counter("messages_valid")
    else:
        metrics.increment_counter("messages_invalid")
        metrics.record_error("validation_failed", result.error)

Next Steps

1. Choose Your Protocol: Select the connector that matches your energy devices.
2. Configure Connection: Set up authentication and connection parameters.
3. Test Integration: Start with a small subset of devices to validate data flow.
4. Scale Deployment: Deploy across your full device fleet with monitoring.

Get Started

Ready to connect your first device? Follow our getting started guide.

Qubit Connectors handle the complexity of energy protocols so you can focus on building innovative applications on top of clean, standardized data.