Qubit Energy Adapters
Data transformation adapters that normalize, validate, and enrich energy data from diverse sources. Adapters bridge the gap between raw device data and the standardized Qubit Energy Schema format.
Overview
Energy data arrives in countless formats with inconsistent units, timezones, and quality indicators. Qubit Adapters solve this by providing (combined in the sketch after this list):
Unit Conversion: Automatic conversion between measurement systems (SI, Imperial, custom)
Timezone Handling: Intelligent timezone detection and UTC normalization
Data Validation: Multi-layer validation with quality scoring
Enrichment: Addition of metadata, context, and calculated fields
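A minimal end-to-end sketch combining these four stages, using the AdapterPipeline API shown later on this page (treating DataEnricher as a pipeline stage is an assumption):

from qubit.adapters import AdapterPipeline
from qubit.adapters.units import UnitConverter
from qubit.adapters.timezone import TimezoneAdapter
from qubit.adapters.validation import SchemaValidator
from qubit.adapters.enrichment import DataEnricher

pipeline = AdapterPipeline([
    ("units", UnitConverter()),                     # normalize units
    ("timezone", TimezoneAdapter()),                # convert timestamps to UTC
    ("validation", SchemaValidator("timeseries")),  # validate against the schema
    ("enrichment", DataEnricher()),                 # add metadata and calculated fields
])

normalized = pipeline.process(raw_device_reading)  # raw_device_reading: any source payload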
Core Adapters
Unit Converter
Handles conversion between different measurement systems commonly used in energy applications.
Converters cover power, energy, temperature, and custom units; the examples below use PowerConverter.
from qubit.adapters.units import PowerConverter

# Convert various power units to standard kW
power_kw = PowerConverter.to_kilowatts(5000, "W")    # 5.0 kW
power_kw = PowerConverter.to_kilowatts(2.5, "MW")    # 2500.0 kW
power_kw = PowerConverter.to_kilowatts(3.4, "hp")    # 2.54 kW

# Automatic unit detection
normalized = PowerConverter.normalize("1500 Watts")  # (1.5, "kW")
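Energy and temperature conversions follow the same pattern; a minimal sketch, assuming hypothetical EnergyConverter and TemperatureConverter helpers that mirror PowerConverter (a TemperatureConverter also appears in the Real-World Examples below):

# Hypothetical classes mirroring PowerConverter; names are assumptions
from qubit.adapters.units import EnergyConverter, TemperatureConverter

# Convert energy readings to standard kWh (3.6 MJ = 1 kWh)
energy_kwh = EnergyConverter.to_kilowatt_hours(3_600_000, "J")  # 1.0 kWh
energy_kwh = EnergyConverter.to_kilowatt_hours(2.5, "MWh")      # 2500.0 kWh

# Convert temperatures to standard °C
temp_c = TemperatureConverter.to_celsius(77.5, "F")             # 25.3 °C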
Timezone Adapter
Handles the complexity of timezone conversion and DST transitions for global energy operations.
from qubit.adapters.timezone import TimezoneAdapter
import datetime
# Initialize adapter
tz_adapter = TimezoneAdapter()
# Convert local time to UTC
local_time = datetime.datetime(2024, 7, 15, 14, 30)  # 2:30 PM local
utc_time = tz_adapter.to_utc(
    local_time,
    timezone="America/Los_Angeles"
)  # 2024-07-15T21:30:00Z (PDT)

# Handle DST transitions automatically
winter_time = datetime.datetime(2024, 1, 15, 14, 30)
utc_winter = tz_adapter.to_utc(
    winter_time,
    timezone="America/Los_Angeles"
)  # 2024-01-15T22:30:00Z (PST)

# Bulk timezone conversion
timestamps = [
    "2024-01-15 14:30:00",
    "2024-07-15 14:30:00"
]
utc_timestamps = tz_adapter.batch_convert(
    timestamps,
    source_timezone="America/Los_Angeles",
    source_format="%Y-%m-%d %H:%M:%S"
)
Data Validation
Multi-layer validation ensures data quality and schema compliance:
Schema Validation
Strict JSON Schema validation against the Qubit Energy Schemas:
from qubit.adapters.validation import SchemaValidator
validator = SchemaValidator("timeseries", version="0.2")
result = validator.validate({
    "id": "ts_solar_001",
    "asset_id": "ast_inverter_001",
    "metric": "energy_generation",
    "value": 2500.0,
    "unit": "kWh",
    "timestamp": "2024-01-15T14:30:00Z"
})

if result.is_valid:
    print("✅ Data is valid")
else:
    print(f"❌ Validation errors: {result.errors}")
Range Validation
Check that values fall within expected ranges for the metric type:
from qubit.adapters.validation import RangeValidator
# Define reasonable ranges for metrics
ranges = {
    "energy_generation": {"min": 0, "max": 10000},    # kWh
    "ambient_temperature": {"min": -40, "max": 60}    # °C
}

validator = RangeValidator(ranges)
result = validator.validate("energy_generation", 2500.0)
Quality Scoring
Assign confidence scores based on data characteristics:
from qubit.adapters.validation import QualityScorer
scorer = QualityScorer()
quality = scorer.score_timeseries({
    "value": 2500.0,
    "timestamp": "2024-01-15T14:30:00Z",
    "source_reliability": 0.95,
    "measurement_uncertainty": 0.02
})
# Returns: {"score": 0.92, "category": "good"}
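A usage sketch for the returned score: attach the category and divert low-scoring records (review_queue is a hypothetical destination; the 0.8 cutoff mirrors the quality_threshold in the configuration section below):

async def triage(record):
    # Score the reading, attach the category, and flag questionable data
    quality = scorer.score_timeseries(record)
    record["quality"] = quality["category"]
    if quality["score"] < 0.8:  # mirrors quality_threshold below
        await review_queue.send(record)  # hypothetical review destination
    return record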
Advanced Features
Data Enrichment
Automatically add calculated fields and contextual metadata:
from qubit.adapters.enrichment import DataEnricher
enricher = DataEnricher()
# Add calculated fields
@enricher.calculator("solar_efficiency")
def calculate_efficiency(data):
    if "irradiance_wm2" in data.metadata and data.metric == "energy_generation":
        panel_area = data.asset.specifications.get("panel_area_m2", 1000)
        irradiance = data.metadata["irradiance_wm2"]
        theoretical_max = (irradiance * panel_area) / 1000  # kW
        efficiency = data.value / theoretical_max if theoretical_max > 0 else 0
        return {"solar_efficiency": round(efficiency, 3)}
    return {}

# Add weather context
@enricher.context_provider("weather")
async def add_weather_context(data):
    if hasattr(data, "location"):
        weather = await weather_api.get_current(
            data.location.latitude,
            data.location.longitude
        )
        return {
            "weather_condition": weather.condition,
            "cloud_cover_percent": weather.cloud_cover
        }
    return {}
Anomaly Detection
Built-in anomaly detection for data quality assurance:
from qubit.adapters.validation import AnomalyDetector

detector = AnomalyDetector(
    metric="energy_generation",
    model="isolation_forest"
)

# Train on historical data
training_data = load_historical_generation_data()
detector.fit(training_data)

# Detect anomalies in real time
@connector.on_message
async def detect_anomalies(data):
    anomaly_score = detector.predict(data.value)
    if anomaly_score > 0.8:  # High anomaly score
        data.metadata["anomaly_detected"] = True
        data.metadata["anomaly_score"] = anomaly_score
        data.quality = "questionable"
    return data
Configuration Management
Environment-based Configuration
# config/production.yaml
adapters:
  units:
    strict_validation: true
    decimal_precision: 6
  timezone:
    default_timezone: "UTC"
    validate_transitions: true
  validation:
    schema_version: "0.2"
    quality_threshold: 0.8
    enable_anomaly_detection: true
logging:
  level: "INFO"
  structured: true
Dynamic Configuration
from qubit.adapters.config import ConfigManager
# Load configuration with environment overrides
config = ConfigManager.load("production.yaml")

# Runtime configuration updates
config.update("adapters.validation.quality_threshold", 0.9)
# Environment variable overrides
# QUBIT_ADAPTERS_UNITS_STRICT_VALIDATION=true
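The override convention implied above (QUBIT_ prefix, dotted config path upper-cased and joined with underscores) can be exercised as follows; that ConfigManager applies overrides at load time, and the dotted get accessor, are assumptions:

import os

# QUBIT_ADAPTERS_VALIDATION_QUALITY_THRESHOLD maps to
# adapters.validation.quality_threshold (assumed naming convention)
os.environ["QUBIT_ADAPTERS_VALIDATION_QUALITY_THRESHOLD"] = "0.9"

config = ConfigManager.load("production.yaml")  # overrides applied at load time (assumed)
assert config.get("adapters.validation.quality_threshold") == 0.9  # assumed accessor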
Performance Optimization
For high-throughput applications, use batch processing and connection pooling to maximize performance.
Batch Processing
from qubit.adapters import BatchProcessor
processor = BatchProcessor(
    batch_size=1000,
    max_wait_time=1.0  # seconds
)

@processor.batch_handler
async def process_batch(data_batch):
    # Process up to 1000 records at once
    validated = validator.validate_batch(data_batch)
    normalized = unit_converter.convert_batch(validated)
    # Bulk insert to database
    await database.insert_many(normalized)

# Messages are batched automatically
await processor.add(timeseries_data)
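Connection pooling pairs naturally with batched inserts: create one pool at startup and reuse it for every batch. A minimal sketch using asyncpg, which is an illustrative driver choice rather than a Qubit requirement (table and column names are also illustrative):

import asyncpg

async def make_pool():
    # One shared pool for the whole process; sized for your write concurrency
    return await asyncpg.create_pool(
        dsn="postgresql://localhost/energy",  # illustrative DSN
        min_size=2,
        max_size=10
    )

async def insert_batch(pool, rows):
    # Borrow a connection, bulk-insert the batch, and return it to the pool
    async with pool.acquire() as conn:
        await conn.executemany(
            "INSERT INTO readings (asset_id, value, unit, ts) VALUES ($1, $2, $3, $4)",
            rows
        )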
Caching
from qubit.adapters.cache import AdapterCache
import pytz

# Cache expensive operations
cache = AdapterCache(ttl=300)  # 5-minute TTL

@cache.memoize
def expensive_validation(schema_name, data):
    # The memoization key is built from the arguments, so data must be hashable
    return validator.validate(data)

@cache.memoize
def timezone_lookup(timezone_name):
    return pytz.timezone(timezone_name)
Error Handling
Robust error handling ensures data integrity:
try:
    normalized_data = adapter.process(raw_data)
except UnitConversionError as e:
    # Log error but continue with original units
    logger.warning(f"Unit conversion failed: {e}")
    normalized_data = raw_data
    normalized_data.quality = "questionable"
except ValidationError as e:
    # Critical error: cannot process
    logger.error(f"Schema validation failed: {e}")
    raise

Retry transient failures with exponential backoff:
from qubit.adapters.retry import RetryAdapter

@RetryAdapter(
    max_attempts=3,
    backoff_factor=2.0,
    exceptions=[ValidationError, UnitConversionError]
)
async def process_with_retry(data):
    return adapter.process(data)

Route records that repeatedly fail to a dead-letter queue for manual review:
from qubit.adapters.queue import DeadLetterQueue

dlq = DeadLetterQueue("failed_validations")

try:
    result = adapter.process(data)
except Exception as e:
    # Send to DLQ for manual review
    await dlq.send(data, error=str(e))
    raise
Testing
Always test adapters with representative data from your actual devices before deploying to production.
Unit Testing
import pytest
from qubit.adapters.units import PowerConverter, UnitConversionError

def test_power_conversion():
    # Test standard conversions
    assert PowerConverter.to_kilowatts(1000, "W") == 1.0
    assert PowerConverter.to_kilowatts(2.5, "MW") == 2500.0
    # Test edge cases
    assert PowerConverter.to_kilowatts(0, "W") == 0.0
    # Test error handling
    with pytest.raises(UnitConversionError):
        PowerConverter.to_kilowatts(100, "invalid_unit")

def test_timezone_conversion():
    from qubit.adapters.timezone import TimezoneAdapter
    import datetime

    adapter = TimezoneAdapter()
    # Spring-forward gap: 2:30 AM does not exist locally on 2024-03-10;
    # the adapter resolves it as PST (UTC-8)
    local_time = datetime.datetime(2024, 3, 10, 2, 30)
    utc_time = adapter.to_utc(local_time, "America/Los_Angeles")
    assert utc_time.hour == 10  # 2:30 AM PST -> 10:30 UTC
Integration Testing
from qubit.connectors.mqtt import MQTTConnector
from qubit.adapters import AdapterPipeline

@pytest.mark.integration
async def test_mqtt_to_schema_pipeline():
    # Set up test MQTT broker
    test_broker = await setup_test_broker()

    # Configure connector with adapters
    connector = MQTTConnector({
        "broker": test_broker.url,
        "topics": ["test/device/+/readings"]
    })
    pipeline = AdapterPipeline([
        ("units", UnitConverter()),
        ("timezone", TimezoneAdapter()),
        ("validation", SchemaValidator("timeseries"))
    ])

    # Send test message
    await test_broker.publish(
        "test/device/001/readings",
        {"power": 1500, "unit": "W", "timestamp": "2024-01-15 14:30:00"}
    )

    # Verify normalized output
    message = await connector.receive()
    result = pipeline.process(message)
    assert result.value == 1.5  # Converted to kW
    assert result.unit == "kW"
    assert result.timestamp.endswith("Z")  # UTC format
Best Practices
Batch Operations: Process data in batches of 100-1000 records for optimal throughput
Connection Pooling: Reuse database connections and validator instances
Async Processing: Use async/await for I/O-bound operations
Memory Management: Stream large datasets instead of loading them into memory (see the sketch after this list)
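A minimal sketch of the memory-management practice: stream a large CSV in fixed-size chunks and feed each chunk to the batch APIs shown earlier, so only one chunk is ever in memory (file name and chunk size are illustrative):

import csv

def stream_readings(path, chunk_size=1000):
    # Yield fixed-size lists of rows instead of reading the whole file
    with open(path, newline="") as f:
        chunk = []
        for row in csv.DictReader(f):
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

for chunk in stream_readings("meter_readings.csv"):
    validated = validator.validate_batch(chunk)  # batch APIs from Batch Processing above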
Data Quality
Validate Early: Perform validation as close to the data source as possible
Preserve Originals: Keep original values in metadata for debugging and auditing (see the sketch after this list)
Quality Scoring: Use quality indicators to flag questionable data for review
Monitor Trends: Track validation failure rates and common error patterns
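A minimal sketch of the Preserve Originals practice: stash the raw value, unit, and timestamp in metadata before any conversion overwrites them. The field names follow the normalized output in the examples below:

def preserve_originals(record):
    # Record raw fields before unit/timezone conversion mutates them
    record.setdefault("metadata", {}).update({
        "original_value": record["value"],
        "original_unit": record["unit"],
        "original_timestamp": record["timestamp"],
    })
    return record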
Real-World Examples
Solar Inverter Data
# Raw inverter data (mixed units, local timezone)
raw_data = {
    "inverter_id": "INV_001",
    "power_output": 1500,                # Watts
    "energy_today": 45.2,                # kWh
    "timestamp": "2024-01-15 14:30:00",  # Local time
    "timezone": "America/Los_Angeles",
    "temperature": 77.5,                 # Fahrenheit
    "efficiency": 96.2                   # Percent
}

# Process through adapters
adapter_pipeline = AdapterPipeline([
    PowerConverter(target_unit="kW"),
    TimezoneConverter(target_timezone="UTC"),
    TemperatureConverter(target_unit="C"),
    SchemaValidator("timeseries")
])
result = adapter_pipeline.process(raw_data)

# Normalized output
{
    "id": "ts_inv_001_2024_01_15_22_30",
    "asset_id": "ast_inverter_001",
    "metric": "power_generation",
    "value": 1.5,
    "unit": "kW",
    "timestamp": "2024-01-15T22:30:00Z",  # UTC
    "quality": "good",
    "metadata": {
        "original_value": 1500,
        "original_unit": "W",
        "original_timestamp": "2024-01-15 14:30:00 PST",
        "ambient_temperature_c": 25.3,
        "inverter_efficiency": 0.962
    }
}
EV Charging Data
# OCPP message processing
ocpp_message = {
    "messageType": "MeterValues",
    "chargePointId": "CP_001",
    "meterValue": {
        "timestamp": "2024-01-15T14:30:00.000Z",
        "sampledValue": {
            "value": "15000",  # String format
            "unit": "W",       # Watts
            "measurand": "Power.Active.Import"
        }
    }
}

# Process through OCPP adapter
ocpp_adapter = OCPPAdapter()
timeseries = ocpp_adapter.process(ocpp_message)

# Output
{
    "id": "ts_cp_001_power_2024_01_15_14_30",
    "asset_id": "ast_charger_cp_001",
    "metric": "charging_power",
    "value": 15.0,
    "unit": "kW",
    "timestamp": "2024-01-15T14:30:00Z",
    "quality": "good",
    "metadata": {
        "ocpp_measurand": "Power.Active.Import",
        "charging_session": "active"
    }
}
Monitoring Adapters
Track adapter performance and data quality:
import time
from qubit.adapters.monitoring import AdapterMetrics

metrics = AdapterMetrics()

class MonitoredUnitConverter(UnitConverter):
    def convert(self, value, from_unit, to_unit):
        start_time = time.time()
        try:
            result = super().convert(value, from_unit, to_unit)
            metrics.record_conversion_success(from_unit, to_unit)
            return result
        except Exception as e:
            metrics.record_conversion_failure(from_unit, to_unit, str(e))
            raise
        finally:
            duration = time.time() - start_time
            metrics.record_conversion_latency(duration)

# View metrics
print(f"Conversion success rate: {metrics.success_rate():.2%}")
print(f"Average latency: {metrics.avg_latency():.3f}s")
Next Steps
Getting Started: Ready to start processing energy data? Follow our step-by-step guide.
Qubit Adapters ensure that regardless of how your energy data arrives, it’s transformed into high-quality, standardized formats ready for analysis and optimization.