Skip to main content

Protocol Adapters

Protocol adapters translate abstract Qubit commands into device-specific message formats. Each adapter handles the encoding, transport semantics, and response parsing for a specific protocol.

Adapter Pattern

All adapters extend the ProtocolAdapter base class:
from coordinator import ProtocolAdapter, Command, AssetState

class MyCustomAdapter(ProtocolAdapter):
    """Minimal adapter sketch: sends translated commands and parses device state."""

    def send_command(self, command: Command) -> bool:
        """Translate *command* and send the resulting payload.

        Returns whatever ``self._send`` reports (True on success).
        """
        return self._send(self._translate(command))

    def read_state(self, asset_id: str) -> AssetState:
        """Read the registers for *asset_id* and parse them into an AssetState."""
        return self._parse_state(self._read_registers(asset_id))

OCPP Adapter

For EV charging stations running OCPP 1.6 or 2.0:
from coordinator import OCPPAdapter

adapter = OCPPAdapter("ocpp_site_1")

Command Mapping

| Qubit Command | OCPP Action | Payload |
| --- | --- | --- |
| SET_CHARGE_RATE | SetChargingProfile | Charging schedule with power limit in watts |
| SET_POWER_LIMIT | SetChargingProfile | Same, used for dynamic power management |
| START_CHARGING | RemoteStartTransaction | Connector ID + ID tag |
| STOP_CHARGING | RemoteStopTransaction | Transaction ID |
| CHANGE_AVAILABILITY | ChangeAvailability | Operative / Inoperative |

Example: Set Charging Profile

from coordinator.base import Command, CommandType

# NOTE: `now` is not defined in this snippet; bind it to the intended
# schedule time before constructing the command.
cmd = Command(
    asset_id="ast_charger_001",
    command_type=CommandType.SET_CHARGE_RATE,
    parameters={"power_kw": 15.0, "connector_id": 1},
    scheduled_at=now
)

adapter.send_command(cmd)

# After send_command, the OCPP payload is attached:
print(cmd.parameters["ocpp_action"])
# "SetChargingProfile"

print(cmd.parameters["ocpp_payload"])
# {
#   "connectorId": 1,
#   "csChargingProfiles": {
#     "chargingSchedule": {
#       "chargingRateUnit": "W",
#       "chargingSchedulePeriod": [{"startPeriod": 0, "limit": 15000.0}]
#     }
#   }
# }

Modbus Adapter

For battery management systems and inverters using Modbus TCP/RTU:
from coordinator import ModbusAdapter

# Default register map
adapter = ModbusAdapter("modbus_bms_1")

# Custom register map for specific BMS
# NOTE(review): the default map documented on this page uses 40001-40004,
# while this override uses 30001-30004. In conventional Modbus numbering
# 3xxxx denotes read-only input registers — confirm these addresses are
# writable on the target device.
adapter = ModbusAdapter("modbus_bms_1", register_map={
    "charge_rate_kw": 30001,
    "discharge_rate_kw": 30002,
    "power_limit_kw": 30003,
    "soc_target_percent": 30004,
})

Register Mapping

| Qubit Command | Default Register | Value |
| --- | --- | --- |
| SET_CHARGE_RATE | 40001 | Power in kW |
| SET_DISCHARGE_RATE | 40002 | Power in kW |
| SET_POWER_LIMIT | 40003 | Power in kW |
| SET_SOC_TARGET | 40004 | Percent |

Example: Battery Discharge

# NOTE: `now` is not defined in this snippet; bind it to the intended
# schedule time before constructing the command.
cmd = Command(
    asset_id="ast_batt_001",
    command_type=CommandType.SET_DISCHARGE_RATE,
    parameters={"power_kw": 50.0},
    scheduled_at=now
)

adapter.send_command(cmd)

print(cmd.parameters["modbus_writes"])
# [(40002, 50.0)]

Adapter Registry

The AdapterRegistry maps asset IDs to their protocol adapters:
from coordinator import AdapterRegistry, OCPPAdapter, ModbusAdapter

registry = AdapterRegistry()

# Register adapters
registry.register("ast_charger_001", OCPPAdapter("ocpp_1"))
registry.register("ast_charger_002", OCPPAdapter("ocpp_1"))
registry.register("ast_batt_001", ModbusAdapter("modbus_1"))

# Look up by asset ID
adapter = registry.get("ast_charger_001")

# Check registration
"ast_charger_001" in registry  # True
len(registry)                   # 3

# Remove
registry.unregister("ast_charger_002")

Requesting an asset ID that is not registered raises a KeyError whose message lists the currently registered assets:
registry.get("ast_unknown")
# KeyError: "No protocol adapter registered for asset 'ast_unknown'.
#            Registered assets: ['ast_charger_001', 'ast_batt_001']"

Writing Custom Adapters

To add support for a new protocol (e.g., DNP3, IEC 61850, REST API):
from coordinator import ProtocolAdapter, Command, AssetState, AssetType

class DNP3Adapter(ProtocolAdapter):
    """Example adapter that packages commands as DNP3 direct-operate payloads."""

    def __init__(self, adapter_id: str, outstation_address: int):
        """Initialise the base adapter, then remember the outstation address."""
        super().__init__(adapter_id)
        self.outstation_address = outstation_address

    def send_command(self, command: Command) -> bool:
        """Attach a DIRECT_OPERATE payload to *command* and return True.

        The payload is stored under ``command.parameters["dnp3_payload"]``.
        Nothing is transmitted in this example, so the return value is
        always True.
        """
        params = command.parameters
        address = self.outstation_address
        # `_command_to_index` is not defined in this example — presumably
        # provided by the base class or the implementer; confirm.
        point_index = self._command_to_index(command.command_type)
        # A command without "power_kw" falls back to a value of 0.
        params["dnp3_payload"] = dict(
            function="DIRECT_OPERATE",
            address=address,
            index=point_index,
            value=params.get("power_kw", 0),
        )
        return True

    def read_state(self, asset_id: str) -> AssetState:
        """Return a fixed RUNNING inverter state for *asset_id*.

        No device is queried in this example.
        """
        state_fields = {
            "asset_id": asset_id,
            "asset_type": AssetType.INVERTER,
            "current_state": "RUNNING",
        }
        return AssetState(**state_fields)

# Register it
registry.register("ast_inverter_001", DNP3Adapter("dnp3_1", outstation_address=10))