Qubit Energy Adapters

Data transformation adapters that normalize, validate, and enrich energy data from diverse sources. Adapters bridge the gap between raw device data and the standardized Qubit Energy Schema format.

Overview

Energy data arrives in countless formats with inconsistent units, timezones, and quality indicators. Qubit Adapters solve this by providing:

Unit Conversion

Automatic conversion between measurement systems (SI, Imperial, custom)

Timezone Handling

Intelligent timezone detection and UTC normalization

Data Validation

Multi-layer validation with quality scoring

Enrichment

Add metadata, context, and calculated fields

Core Adapters

Unit Converter

Handles conversion between different measurement systems commonly used in energy applications.
from qubit.adapters.units import PowerConverter

# Convert various power units to standard kW
power_kw = PowerConverter.to_kilowatts(5000, "W")        # 5.0 kW
power_kw = PowerConverter.to_kilowatts(2.5, "MW")       # 2500.0 kW
power_kw = PowerConverter.to_kilowatts(3.4, "hp")       # 2.54 kW

# Automatic unit detection
normalized = PowerConverter.normalize("1500 Watts")      # (1.5, "kW")

Timezone Adapter

Handles the complexity of timezone conversion and DST transitions for global energy operations.
from qubit.adapters.timezone import TimezoneAdapter
import datetime

# Initialize adapter
tz_adapter = TimezoneAdapter()

# Convert local time to UTC
local_time = datetime.datetime(2024, 7, 15, 14, 30)  # 2:30 PM
utc_time = tz_adapter.to_utc(
    local_time, 
    timezone="America/Los_Angeles"
)  # 2024-07-15T21:30:00Z

# Handle DST transitions automatically
winter_time = datetime.datetime(2024, 1, 15, 14, 30)
utc_winter = tz_adapter.to_utc(
    winter_time, 
    timezone="America/Los_Angeles"
)  # 2024-01-15T22:30:00Z (PST)

# Bulk timezone conversion
timestamps = [
    "2024-01-15 14:30:00",
    "2024-07-15 14:30:00"
]
utc_timestamps = tz_adapter.batch_convert(
    timestamps,
    source_timezone="America/Los_Angeles",
    source_format="%Y-%m-%d %H:%M:%S"
)

Data Validation

Multi-layer validation ensures data quality and schema compliance:
1. Schema Validation

Strict JSON Schema validation against Qubit Energy Schemas
from qubit.adapters.validation import SchemaValidator

validator = SchemaValidator("timeseries", version="0.2")

result = validator.validate({
    "id": "ts_solar_001",
    "asset_id": "ast_inverter_001",
    "metric": "energy_generation",
    "value": 2500.0,
    "unit": "kWh",
    "timestamp": "2024-01-15T14:30:00Z"
})

if result.is_valid:
    print("✅ Data is valid")
else:
    print(f"❌ Validation errors: {result.errors}")
2. Range Validation

Check that values fall within expected ranges for the metric type
from qubit.adapters.validation import RangeValidator

# Define reasonable ranges for metrics
ranges = {
    "energy_generation": {"min": 0, "max": 10000},  # kWh
    "ambient_temperature": {"min": -40, "max": 60}   # °C
}

validator = RangeValidator(ranges)
result = validator.validate("energy_generation", 2500.0)
3. Quality Scoring

Assign confidence scores based on data characteristics
from qubit.adapters.validation import QualityScorer

scorer = QualityScorer()

quality = scorer.score_timeseries({
    "value": 2500.0,
    "timestamp": "2024-01-15T14:30:00Z",
    "source_reliability": 0.95,
    "measurement_uncertainty": 0.02
})

# Returns: {"score": 0.92, "category": "good"}

Advanced Features

Data Enrichment

Automatically add calculated fields and contextual metadata:
from qubit.adapters.enrichment import DataEnricher

enricher = DataEnricher()

# Add calculated fields
@enricher.calculator("solar_efficiency")
def calculate_efficiency(data):
    if "irradiance_wm2" in data.metadata and data.metric == "energy_generation":
        panel_area = data.asset.specifications.get("panel_area_m2", 1000)
        irradiance = data.metadata["irradiance_wm2"]
        
        theoretical_max = (irradiance * panel_area) / 1000  # kW
        efficiency = data.value / theoretical_max if theoretical_max > 0 else 0
        
        return {"solar_efficiency": round(efficiency, 3)}
    
    return {}

# Add weather context
@enricher.context_provider("weather")
async def add_weather_context(data):
    if hasattr(data, 'location'):
        weather = await weather_api.get_current(
            data.location.latitude,
            data.location.longitude
        )
        return {
            "weather_condition": weather.condition,
            "cloud_cover_percent": weather.cloud_cover
        }
    return {}

Anomaly Detection

Built-in anomaly detection for data quality assurance:
from qubit.adapters.validation import AnomalyDetector

detector = AnomalyDetector(
    metric="energy_generation",
    model="isolation_forest"
)

# Train on historical data
training_data = load_historical_generation_data()
detector.fit(training_data)

# Detect anomalies in real-time
@connector.on_message
async def detect_anomalies(data):
    anomaly_score = detector.predict(data.value)
    
    if anomaly_score > 0.8:  # High anomaly score
        data.metadata["anomaly_detected"] = True
        data.metadata["anomaly_score"] = anomaly_score
        data.quality = "questionable"
    
    return data

Configuration Management

Environment-based Configuration

# config/production.yaml
adapters:
  units:
    strict_validation: true
    decimal_precision: 6
  timezone:
    default_timezone: "UTC"
    validate_transitions: true
  validation:
    schema_version: "0.2"
    quality_threshold: 0.8
    enable_anomaly_detection: true

logging:
  level: "INFO"
  structured: true

Dynamic Configuration

from qubit.adapters.config import ConfigManager

# Load configuration with environment overrides
config = ConfigManager.load("production.yaml")

# Runtime configuration updates
config.update("adapters.validation.quality_threshold", 0.9)

# Environment variable overrides
# QUBIT_ADAPTERS_UNITS_STRICT_VALIDATION=true

Performance Optimization

For high-throughput applications, use batch processing and connection pooling to maximize performance.

Batch Processing

from qubit.adapters import BatchProcessor

processor = BatchProcessor(
    batch_size=1000,
    max_wait_time=1.0  # seconds
)

@processor.batch_handler
async def process_batch(data_batch):
    # Process 1000 records at once
    validated = validator.validate_batch(data_batch)
    normalized = unit_converter.convert_batch(validated)
    
    # Bulk insert to database
    await database.insert_many(normalized)

# Messages automatically batched
await processor.add(timeseries_data)

Caching

from qubit.adapters.cache import AdapterCache

# Cache expensive operations
cache = AdapterCache(ttl=300)  # 5-minute TTL

@cache.memoize
def expensive_validation(schema_name, data):
    return validator.validate(data)

@cache.memoize  
def timezone_lookup(timezone_name):
    return pytz.timezone(timezone_name)

Error Handling

Robust error handling ensures data integrity: adapters raise specific exceptions (such as UnitConversionError) that callers can catch and log, and questionable records are flagged via quality indicators rather than silently dropped.

Testing

Always test adapters with representative data from your actual devices before deploying to production.

Unit Testing

import pytest
from qubit.adapters.units import PowerConverter

def test_power_conversion():
    # Test standard conversions
    assert PowerConverter.to_kilowatts(1000, "W") == 1.0
    assert PowerConverter.to_kilowatts(2.5, "MW") == 2500.0
    
    # Test edge cases
    assert PowerConverter.to_kilowatts(0, "W") == 0.0
    
    # Test error handling
    with pytest.raises(UnitConversionError):
        PowerConverter.to_kilowatts(100, "invalid_unit")

def test_timezone_conversion():
    from qubit.adapters.timezone import TimezoneAdapter
    import datetime
    
    adapter = TimezoneAdapter()
    
    # Test DST transition: 2024-03-10 02:30 does not exist locally
    # (clocks jump from 02:00 directly to 03:00 on spring-forward day)
    local_time = datetime.datetime(2024, 3, 10, 2, 30)  # Spring forward
    utc_time = adapter.to_utc(local_time, "America/Los_Angeles")
    
    assert utc_time.hour == 10  # Nonexistent 2:30 AM resolved as PST (UTC-8) -> 10:30 UTC

Integration Testing

from qubit.connectors.mqtt import MQTTConnector
from qubit.adapters import AdapterPipeline

@pytest.mark.integration
async def test_mqtt_to_schema_pipeline():
    # Set up test MQTT broker
    test_broker = await setup_test_broker()
    
    # Configure connector with adapters
    connector = MQTTConnector({
        "broker": test_broker.url,
        "topics": ["test/device/+/readings"]
    })
    
    pipeline = AdapterPipeline([
        ("units", UnitConverter()),
        ("timezone", TimezoneAdapter()),
        ("validation", SchemaValidator("timeseries"))
    ])
    
    # Send test message
    await test_broker.publish(
        "test/device/001/readings",
        {"power": 1500, "unit": "W", "timestamp": "2024-01-15 14:30:00"}
    )
    
    # Verify normalized output
    message = await connector.receive()
    result = pipeline.process(message)
    
    assert result.value == 1.5  # Converted to kW
    assert result.unit == "kW"
    assert result.timestamp.endswith("Z")  # UTC format

Best Practices

Performance Guidelines

Batch Operations

Process data in batches of 100-1000 records for optimal throughput

Connection Pooling

Reuse database connections and validator instances

Async Processing

Use async/await for I/O bound operations

Memory Management

Stream large datasets instead of loading into memory

Data Quality

1. Validate Early

Perform validation as close to the data source as possible
2. Preserve Originals

Keep original values in metadata for debugging and auditing
3. Quality Scoring

Use quality indicators to flag questionable data for review
4. Monitor Trends

Track validation failure rates and common error patterns

Real-World Examples

Solar Inverter Data

# Raw inverter data (various units and timezone)
raw_data = {
    "inverter_id": "INV_001",
    "power_output": 1500,  # Watts
    "energy_today": 45.2,  # kWh
    "timestamp": "2024-01-15 14:30:00",  # Local time
    "timezone": "America/Los_Angeles",
    "temperature": 77.5,  # Fahrenheit
    "efficiency": 96.2     # Percent
}

# Process through adapters
adapter_pipeline = AdapterPipeline([
    PowerConverter(target_unit="kW"),
    TimezoneConverter(target_timezone="UTC"),
    TemperatureConverter(target_unit="C"),
    SchemaValidator("timeseries")
])

result = adapter_pipeline.process(raw_data)

# Normalized output
{
    "id": "ts_inv_001_2024_01_15_22_30",
    "asset_id": "ast_inverter_001", 
    "metric": "power_generation",
    "value": 1.5,
    "unit": "kW",
    "timestamp": "2024-01-15T22:30:00Z",  # UTC
    "quality": "good",
    "metadata": {
        "original_value": 1500,
        "original_unit": "W", 
        "original_timestamp": "2024-01-15 14:30:00 PST",
        "ambient_temperature_c": 25.3,
        "inverter_efficiency": 0.962
    }
}

EV Charging Data

# OCPP message processing
ocpp_message = {
    "messageType": "MeterValues",
    "chargePointId": "CP_001",
    "meterValue": {
        "timestamp": "2024-01-15T14:30:00.000Z",
        "sampledValue": {
            "value": "15000",  # String format
            "unit": "W",       # Watts
            "measurand": "Power.Active.Import"
        }
    }
}

# Process through OCPP adapter
ocpp_adapter = OCPPAdapter()
timeseries = ocpp_adapter.process(ocpp_message)

# Output
{
    "id": "ts_cp_001_power_2024_01_15_14_30",
    "asset_id": "ast_charger_cp_001",
    "metric": "charging_power", 
    "value": 15.0,
    "unit": "kW",
    "timestamp": "2024-01-15T14:30:00Z",
    "quality": "good",
    "metadata": {
        "ocpp_measurand": "Power.Active.Import",
        "charging_session": "active"
    }
}

Monitoring Adapters

Track adapter performance and data quality:
from qubit.adapters.monitoring import AdapterMetrics

metrics = AdapterMetrics()

class MonitoredUnitConverter(UnitConverter):
    def convert(self, value, from_unit, to_unit):
        start_time = time.time()
        
        try:
            result = super().convert(value, from_unit, to_unit)
            metrics.record_conversion_success(from_unit, to_unit)
            return result
        except Exception as e:
            metrics.record_conversion_failure(from_unit, to_unit, str(e))
            raise
        finally:
            duration = time.time() - start_time
            metrics.record_conversion_latency(duration)

# View metrics
print(f"Conversion success rate: {metrics.success_rate():.2%}")
print(f"Average latency: {metrics.avg_latency():.3f}s")

Next Steps

Getting Started

Ready to start processing energy data? Follow our step-by-step getting-started guide, or explore related components such as connectors and the Qubit Energy Schemas.
Qubit Adapters ensure that regardless of how your energy data arrives, it’s transformed into high-quality, standardized formats ready for analysis and optimization.