Skip to main content

Dispatch Engine

The DispatchEngine is the core bridge between Layer 3’s optimization output and Layer 4’s command execution. It reads schedule DataFrames and produces a queue of timed Command objects, then manages their lifecycle through dispatch, response handling, retries, and timeout detection.

Schedule-to-Command Conversion

The dispatch engine parses Layer 3 schedule DataFrame columns using naming conventions:
Column PatternCommand TypeAsset Type
ev_{vehicle_id}_kwSET_CHARGE_RATEEV Charger
battery_charge_kwSET_CHARGE_RATEBattery
battery_discharge_kwSET_DISCHARGE_RATEBattery
from coordinator import DispatchEngine, EventBus

engine = DispatchEngine(event_bus=EventBus())

# From Layer 3 EV scheduler result
commands = engine.schedule_to_commands(
    result.schedule,
    asset_map={
        "ev_001": "ast_charger_bay_1",
        "ev_002": "ast_charger_bay_2",
        "battery": "ast_batt_001"
    }
)

Change Detection

The engine only emits commands when power values change between consecutive time slots. This avoids flooding assets with redundant setpoints:
Hour 8:  22 kW  → Command: SET_CHARGE_RATE 22 kW
Hour 9:  22 kW  → (skipped — same value)
Hour 10: 22 kW  → (skipped — same value)
Hour 11:  0 kW  → Command: SET_CHARGE_RATE 0 kW

Command Lifecycle

Dispatching Commands

from datetime import datetime, timezone

# Get commands ready for execution
now = datetime.now(timezone.utc)
pending = engine.get_pending_commands(now)

# Dispatch through protocol adapters
for cmd in pending:
    adapter = registry.get(cmd.asset_id)
    engine.dispatch_command(cmd, adapter)

Handling Responses

# Asset acknowledged and executed
engine.handle_response(cmd, success=True)

# Asset reported an error — will retry if under max_retries
engine.handle_response(cmd, success=False, error="connection timeout")

Timeout Detection

# Check for commands stuck in DISPATCHED state
timed_out = engine.check_timeouts(now)
for cmd in timed_out:
    print(f"Command {cmd.command_id} to {cmd.asset_id} timed out")

Command Object

from coordinator.base import Command, CommandType, CommandStatus

cmd = Command(
    asset_id="ast_charger_001",
    command_type=CommandType.SET_CHARGE_RATE,
    parameters={"power_kw": 22.0},
    scheduled_at=datetime(2025, 1, 15, 8, 0, tzinfo=timezone.utc),
    # Auto-generated:
    # command_id="cmd_a1b2c3d4e5f6"
    # status=CommandStatus.PENDING
    # max_retries=3
    # timeout_seconds=30.0
)

Command Types

TypeDescriptionTypical Target
SET_CHARGE_RATESet charging powerEV charger, battery
SET_DISCHARGE_RATESet discharging powerBattery
SET_POWER_LIMITSet power limitAny
START_CHARGINGBegin charging sessionEV charger
STOP_CHARGINGEnd charging sessionEV charger
CHANGE_AVAILABILITYEnable/disable assetEV charger
SET_SOC_TARGETSet target SOCBattery

Event Publishing

The dispatch engine publishes events at each lifecycle transition:
ActionEvent Type
Commands generatedSCHEDULE_RECEIVED
Command sentCOMMAND_DISPATCHED
Command succeededCOMMAND_COMPLETED
Command failedCOMMAND_FAILED
Command timed outCOMMAND_TIMED_OUT
# Monitor all dispatched commands
bus.subscribe(EventType.COMMAND_DISPATCHED, lambda e: log.info(
    f"Dispatched {e.payload['command_type']} to {e.payload['asset_id']}"
))

Lookup Methods

# Find a specific command
cmd = engine.get_command("cmd_a1b2c3d4e5f6")

# Get all commands for an asset
asset_cmds = engine.get_commands_for_asset("ast_charger_001")