from coordinator import ProtocolAdapter, Command, AssetState, AssetType
class DNP3Adapter(ProtocolAdapter):
    """Protocol adapter that expresses coordinator commands as DNP3 payloads.

    Each instance is bound to a single outstation address, which is stamped
    into every payload built by ``send_command``.
    """

    def __init__(self, adapter_id: str, outstation_address: int):
        """Create an adapter bound to one outstation.

        Args:
            adapter_id: Identifier forwarded unchanged to
                ``ProtocolAdapter.__init__``.
            outstation_address: Address written into the ``"address"`` field
                of every payload this adapter builds.
        """
        super().__init__(adapter_id)
        self.outstation_address = outstation_address

    def send_command(self, command: Command) -> bool:
        """Build a DIRECT_OPERATE payload and attach it to ``command``.

        The payload is stored in ``command.parameters`` under the key
        ``"dnp3_payload"``, so the caller's command object is mutated.
        Nothing is transmitted by the code in this method.

        Args:
            command: Command to translate. Its ``command_type`` selects the
                point index, and ``parameters["power_kw"]`` supplies the
                value (``0`` when that key is absent).

        Returns:
            Always ``True``.
        """
        # NOTE(review): `_command_to_index` is not defined in this class;
        # presumably it is inherited from ProtocolAdapter -- confirm.
        dnp3_payload = {
            "function": "DIRECT_OPERATE",
            "address": self.outstation_address,
            "index": self._command_to_index(command.command_type),
            "value": command.parameters.get("power_kw", 0),
        }
        command.parameters["dnp3_payload"] = dnp3_payload
        return True

    def read_state(self, asset_id: str) -> AssetState:
        """Return a fixed state record for ``asset_id``.

        No device is queried here: the result always reports
        ``AssetType.INVERTER`` with ``current_state`` set to ``"RUNNING"``.

        Args:
            asset_id: Identifier copied into the returned ``AssetState``.

        Returns:
            An ``AssetState`` for ``asset_id`` with the constant values above.
        """
        return AssetState(
            asset_id=asset_id,
            asset_type=AssetType.INVERTER,
            current_state="RUNNING",
        )
# Register a DNP3 adapter (outstation address 10) for asset "ast_inverter_001".
# NOTE(review): `registry` is not imported in the visible part of this file --
# confirm where it is expected to come from.
registry.register(
    "ast_inverter_001",
    DNP3Adapter(adapter_id="dnp3_1", outstation_address=10),
)