Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.qubit.energy/llms.txt

Use this file to discover all available pages before exploring further.

Dispatch Engine

The DispatchEngine is the core bridge between Layer 3’s optimization output and Layer 4’s command execution. It reads schedule DataFrames and produces a queue of timed Command objects, then manages their lifecycle through dispatch, response handling, retries, and timeout detection.

Schedule-to-Command Conversion

The dispatch engine parses Layer 3 schedule DataFrame columns using naming conventions:
Column PatternCommand TypeAsset Type
ev_{vehicle_id}_kwSET_CHARGE_RATEEV Charger
battery_charge_kwSET_CHARGE_RATEBattery
battery_discharge_kwSET_DISCHARGE_RATEBattery
from coordinator import DispatchEngine, EventBus

engine = DispatchEngine(event_bus=EventBus())

# From Layer 3 EV scheduler result
commands = engine.schedule_to_commands(
    result.schedule,
    asset_map={
        "ev_001": "ast_charger_bay_1",
        "ev_002": "ast_charger_bay_2",
        "battery": "ast_batt_001"
    }
)

Change Detection

The engine only emits commands when power values change between consecutive time slots. This avoids flooding assets with redundant setpoints:
Hour 8:  22 kW  → Command: SET_CHARGE_RATE 22 kW
Hour 9:  22 kW  → (skipped — same value)
Hour 10: 22 kW  → (skipped — same value)
Hour 11:  0 kW  → Command: SET_CHARGE_RATE 0 kW

Command Lifecycle

Dispatching Commands

from datetime import datetime, timezone

# Get commands ready for execution
now = datetime.now(timezone.utc)
pending = engine.get_pending_commands(now)

# Dispatch through protocol adapters
for cmd in pending:
    adapter = registry.get(cmd.asset_id)
    engine.dispatch_command(cmd, adapter)

Handling Responses

# Asset acknowledged and executed
engine.handle_response(cmd, success=True)

# Asset reported an error — will retry if under max_retries
engine.handle_response(cmd, success=False, error="connection timeout")

Timeout Detection

# Check for commands stuck in DISPATCHED state
timed_out = engine.check_timeouts(now)
for cmd in timed_out:
    print(f"Command {cmd.command_id} to {cmd.asset_id} timed out")

Command Object

from coordinator.base import Command, CommandType, CommandStatus

cmd = Command(
    asset_id="ast_charger_001",
    command_type=CommandType.SET_CHARGE_RATE,
    parameters={"power_kw": 22.0},
    scheduled_at=datetime(2025, 1, 15, 8, 0, tzinfo=timezone.utc),
    # Auto-generated:
    # command_id="cmd_a1b2c3d4e5f6"
    # status=CommandStatus.PENDING
    # max_retries=3
    # timeout_seconds=30.0
)

Command Types

TypeDescriptionTypical Target
SET_CHARGE_RATESet charging powerEV charger, battery
SET_DISCHARGE_RATESet discharging powerBattery
SET_POWER_LIMITSet power limitAny
START_CHARGINGBegin charging sessionEV charger
STOP_CHARGINGEnd charging sessionEV charger
CHANGE_AVAILABILITYEnable/disable assetEV charger
SET_SOC_TARGETSet target SOCBattery

Event Publishing

The dispatch engine publishes events at each lifecycle transition:
ActionEvent Type
Commands generatedSCHEDULE_RECEIVED
Command sentCOMMAND_DISPATCHED
Command succeededCOMMAND_COMPLETED
Command failedCOMMAND_FAILED
Command timed outCOMMAND_TIMED_OUT
# Monitor all dispatched commands
bus.subscribe(EventType.COMMAND_DISPATCHED, lambda e: log.info(
    f"Dispatched {e.payload['command_type']} to {e.payload['asset_id']}"
))

Lookup Methods

# Find a specific command
cmd = engine.get_command("cmd_a1b2c3d4e5f6")

# Get all commands for an asset
asset_cmds = engine.get_commands_for_asset("ast_charger_001")