> ## Documentation Index
> Fetch the complete documentation index at: https://docs.qubit.energy/llms.txt
> Use this file to discover all available pages before exploring further.

# Adapters

> Data transformation, normalization, and validation for energy data

# Qubit Energy Adapters

Data transformation adapters that normalize, validate, and enrich energy data from diverse sources. Adapters bridge the gap between raw device data and the standardized Qubit Energy Schema format.

## Overview

Energy data arrives in countless formats with inconsistent units, timezones, and quality indicators. Qubit Adapters solve this by providing:

<CardGroup cols={2}>
  <Card title="Unit Conversion" icon="ruler">
    Automatic conversion between measurement systems (SI, Imperial, custom)
  </Card>

  <Card title="Timezone Handling" icon="clock">
    Intelligent timezone detection and UTC normalization
  </Card>

  <Card title="Data Validation" icon="check">
    Multi-layer validation with quality scoring
  </Card>

  <Card title="Enrichment" icon="sparkles">
    Add metadata, context, and calculated fields
  </Card>
</CardGroup>

## Core Adapters

### Unit Converter

Handles conversion between different measurement systems commonly used in energy applications.

<Tabs>
  <Tab title="Power Units">
    ```python theme={null}
    from qubit.adapters.units import PowerConverter

    # Convert various power units to standard kW
    power_kw = PowerConverter.to_kilowatts(5000, "W")        # 5.0 kW
    power_kw = PowerConverter.to_kilowatts(2.5, "MW")       # 2500.0 kW
    power_kw = PowerConverter.to_kilowatts(3.4, "hp")       # 2.54 kW

    # Automatic unit detection
    normalized = PowerConverter.normalize("1500 Watts")      # (1.5, "kW")
    ```
  </Tab>

  <Tab title="Energy Units">
    ```python theme={null}
    from qubit.adapters.units import EnergyConverter

    # Convert to standard kWh
    energy_kwh = EnergyConverter.to_kilowatt_hours(5000, "Wh")     # 5.0 kWh
    energy_kwh = EnergyConverter.to_kilowatt_hours(2.5, "MWh")     # 2500.0 kWh
    energy_kwh = EnergyConverter.to_kilowatt_hours(1.2, "GJ")      # 333.33 kWh

    # Complex conversions with time periods
    energy_kwh = EnergyConverter.from_rate(150, "kW", "30min")     # 75.0 kWh
    ```
  </Tab>

  <Tab title="Temperature">
    ```python theme={null}
    from qubit.adapters.units import TemperatureConverter

    # Convert to Celsius
    temp_c = TemperatureConverter.to_celsius(77, "F")        # 25.0°C
    temp_c = TemperatureConverter.to_celsius(298.15, "K")    # 25.0°C

    # With precision control
    temp_c = TemperatureConverter.to_celsius(
        98.6, "F", precision=1
    )  # 37.0°C
    ```
  </Tab>

  <Tab title="Custom Units">
    ```python theme={null}
    from qubit.adapters.units import UnitConverter

    # Register custom unit conversion
    UnitConverter.register_conversion(
        "BTU", "kWh", lambda x: x * 0.000293071
    )

    # Use custom conversion
    energy_kwh = UnitConverter.convert(10000, "BTU", "kWh")  # 2.93 kWh
    ```
  </Tab>
</Tabs>

### Timezone Adapter

Handles the complexity of timezone conversion and DST transitions for global energy operations.

```python theme={null}
from qubit.adapters.timezone import TimezoneAdapter
import datetime

# Initialize adapter
tz_adapter = TimezoneAdapter()

# Convert local time to UTC
local_time = datetime.datetime(2024, 7, 15, 14, 30)  # 2:30 PM
utc_time = tz_adapter.to_utc(
    local_time, 
    timezone="America/Los_Angeles"
)  # 2024-07-15T21:30:00Z

# Handle DST transitions automatically
winter_time = datetime.datetime(2024, 1, 15, 14, 30)
utc_winter = tz_adapter.to_utc(
    winter_time, 
    timezone="America/Los_Angeles"
)  # 2024-01-15T22:30:00Z (PST)

# Bulk timezone conversion
timestamps = [
    "2024-01-15 14:30:00",
    "2024-07-15 14:30:00"
]
utc_timestamps = tz_adapter.batch_convert(
    timestamps,
    source_timezone="America/Los_Angeles",
    source_format="%Y-%m-%d %H:%M:%S"
)
```

### Data Validation

Multi-layer validation ensures data quality and schema compliance:

<Steps>
  <Step title="Schema Validation">
    Strict JSON Schema validation against Qubit Energy Schemas

    ```python theme={null}
    from qubit.adapters.validation import SchemaValidator

    validator = SchemaValidator("timeseries", version="0.2")

    result = validator.validate({
        "id": "ts_solar_001",
        "asset_id": "ast_inverter_001",
        "metric": "energy_generation",
        "value": 2500.0,
        "unit": "kWh",
        "timestamp": "2024-01-15T14:30:00Z"
    })

    if result.is_valid:
        print("✅ Data is valid")
    else:
        print(f"❌ Validation errors: {result.errors}")
    ```
  </Step>

  <Step title="Range Validation">
    Check that values fall within expected ranges for the metric type

    ```python theme={null}
    from qubit.adapters.validation import RangeValidator

    # Define reasonable ranges for metrics
    ranges = {
        "energy_generation": {"min": 0, "max": 10000},  # kWh
        "ambient_temperature": {"min": -40, "max": 60}   # °C
    }

    validator = RangeValidator(ranges)
    result = validator.validate("energy_generation", 2500.0)
    ```
  </Step>

  <Step title="Quality Scoring">
    Assign confidence scores based on data characteristics

    ```python theme={null}
    from qubit.adapters.validation import QualityScorer

    scorer = QualityScorer()

    quality = scorer.score_timeseries({
        "value": 2500.0,
        "timestamp": "2024-01-15T14:30:00Z",
        "source_reliability": 0.95,
        "measurement_uncertainty": 0.02
    })

    # Returns: {"score": 0.92, "category": "good"}
    ```
  </Step>
</Steps>

## Advanced Features

### Data Enrichment

Automatically add calculated fields and contextual metadata:

```python theme={null}
from qubit.adapters.enrichment import DataEnricher

enricher = DataEnricher()

# Add calculated fields
@enricher.calculator("solar_efficiency")
def calculate_efficiency(data):
    if "irradiance" in data.metadata and data.metric == "energy_generation":
        panel_area = data.asset.specifications.get("panel_area_m2", 1000)
        irradiance = data.metadata["irradiance_wm2"]
        
        theoretical_max = (irradiance * panel_area) / 1000  # kW
        efficiency = data.value / theoretical_max if theoretical_max > 0 else 0
        
        return {"solar_efficiency": round(efficiency, 3)}
    
    return {}

# Add weather context
@enricher.context_provider("weather")
async def add_weather_context(data):
    if hasattr(data, 'location'):
        weather = await weather_api.get_current(
            data.location.latitude,
            data.location.longitude
        )
        return {
            "weather_condition": weather.condition,
            "cloud_cover_percent": weather.cloud_cover
        }
    return {}
```

### Anomaly Detection

Built-in anomaly detection for data quality assurance:

```python theme={null}
from qubit.adapters.validation import AnomalyDetector

detector = AnomalyDetector(
    metric="energy_generation",
    model="isolation_forest"
)

# Train on historical data
training_data = load_historical_generation_data()
detector.fit(training_data)

# Detect anomalies in real-time
@connector.on_message
async def detect_anomalies(data):
    anomaly_score = detector.predict(data.value)
    
    if anomaly_score > 0.8:  # High anomaly score
        data.metadata["anomaly_detected"] = True
        data.metadata["anomaly_score"] = anomaly_score
        data.quality = "questionable"
    
    return data
```

## Configuration Management

### Environment-based Configuration

<CodeGroup>
  ```yaml Production theme={null}
  # config/production.yaml
  adapters:
    units:
      strict_validation: true
      decimal_precision: 6
    timezone:
      default_timezone: "UTC"
      validate_transitions: true
    validation:
      schema_version: "0.2"
      quality_threshold: 0.8
      enable_anomaly_detection: true

  logging:
    level: "INFO"
    structured: true
  ```

  ```yaml Development theme={null}
  # config/development.yaml
  adapters:
    units:
      strict_validation: false
      decimal_precision: 3
    timezone:
      default_timezone: "America/Los_Angeles"
      validate_transitions: false
    validation:
      schema_version: "0.2"
      quality_threshold: 0.5
      enable_anomaly_detection: false

  logging:
    level: "DEBUG"
    structured: false
  ```
</CodeGroup>

### Dynamic Configuration

```python theme={null}
from qubit.adapters.config import ConfigManager

# Load configuration with environment overrides
config = ConfigManager.load("production.yaml")

# Runtime configuration updates
config.update("adapters.validation.quality_threshold", 0.9)

# Environment variable overrides
# QUBIT_ADAPTERS_UNITS_STRICT_VALIDATION=true
```

## Performance Optimization

<Tip>
  For high-throughput applications, use batch processing and connection pooling to maximize performance.
</Tip>

### Batch Processing

```python theme={null}
from qubit.adapters import BatchProcessor

processor = BatchProcessor(
    batch_size=1000,
    max_wait_time=1.0  # seconds
)

@processor.batch_handler
async def process_batch(data_batch):
    # Process 1000 records at once
    validated = validator.validate_batch(data_batch)
    normalized = unit_converter.convert_batch(validated)
    
    # Bulk insert to database
    await database.insert_many(normalized)

# Messages automatically batched
await processor.add(timeseries_data)
```

### Caching

```python theme={null}
from qubit.adapters.cache import AdapterCache

# Cache expensive operations
cache = AdapterCache(ttl=300)  # 5-minute TTL

@cache.memoize
def expensive_validation(schema_name, data_hash):
    return validator.validate(data)

@cache.memoize  
def timezone_lookup(timezone_name):
    return pytz.timezone(timezone_name)
```

## Error Handling

Robust error handling ensures data integrity:

<AccordionGroup>
  <Accordion title="Graceful Degradation">
    ```python theme={null}
    try:
        normalized_data = adapter.process(raw_data)
    except UnitConversionError as e:
        # Log error but continue with original units
        logger.warning(f"Unit conversion failed: {e}")
        normalized_data = raw_data
        normalized_data.quality = "questionable"
    except ValidationError as e:
        # Critical error - cannot process
        logger.error(f"Schema validation failed: {e}")
        raise
    ```
  </Accordion>

  <Accordion title="Retry Logic">
    ```python theme={null}
    from qubit.adapters.retry import RetryAdapter

    @RetryAdapter(
        max_attempts=3,
        backoff_factor=2.0,
        exceptions=[ValidationError, UnitConversionError]
    )
    async def process_with_retry(data):
        return adapter.process(data)
    ```
  </Accordion>

  <Accordion title="Dead Letter Queues">
    ```python theme={null}
    from qubit.adapters.queue import DeadLetterQueue

    dlq = DeadLetterQueue("failed_validations")

    try:
        result = adapter.process(data)
    except Exception as e:
        # Send to DLQ for manual review
        await dlq.send(data, error=str(e))
        raise
    ```
  </Accordion>
</AccordionGroup>

## Testing

<Warning>
  Always test adapters with representative data from your actual devices before deploying to production.
</Warning>

### Unit Testing

```python theme={null}
import pytest
from qubit.adapters.units import PowerConverter

def test_power_conversion():
    # Test standard conversions
    assert PowerConverter.to_kilowatts(1000, "W") == 1.0
    assert PowerConverter.to_kilowatts(2.5, "MW") == 2500.0
    
    # Test edge cases
    assert PowerConverter.to_kilowatts(0, "W") == 0.0
    
    # Test error handling
    with pytest.raises(UnitConversionError):
        PowerConverter.to_kilowatts(100, "invalid_unit")

def test_timezone_conversion():
    from qubit.adapters.timezone import TimezoneAdapter
    import datetime
    
    adapter = TimezoneAdapter()
    
    # Test DST transition
    local_time = datetime.datetime(2024, 3, 10, 2, 30)  # Spring forward
    utc_time = adapter.to_utc(local_time, "America/Los_Angeles")
    
    assert utc_time.hour == 10  # 2:30 AM PST -> 10:30 UTC
```

### Integration Testing

```python theme={null}
from qubit.connectors.mqtt import MQTTConnector
from qubit.adapters import AdapterPipeline

@pytest.mark.integration
async def test_mqtt_to_schema_pipeline():
    # Set up test MQTT broker
    test_broker = await setup_test_broker()
    
    # Configure connector with adapters
    connector = MQTTConnector({
        "broker": test_broker.url,
        "topics": ["test/device/+/readings"]
    })
    
    pipeline = AdapterPipeline([
        ("units", UnitConverter()),
        ("timezone", TimezoneAdapter()),
        ("validation", SchemaValidator("timeseries"))
    ])
    
    # Send test message
    await test_broker.publish(
        "test/device/001/readings",
        {"power": 1500, "unit": "W", "timestamp": "2024-01-15 14:30:00"}
    )
    
    # Verify normalized output
    message = await connector.receive()
    result = pipeline.process(message)
    
    assert result.value == 1.5  # Converted to kW
    assert result.unit == "kW"
    assert result.timestamp.endswith("Z")  # UTC format
```

## Best Practices

### Performance Guidelines

<CardGroup cols={2}>
  <Card title="Batch Operations">
    Process data in batches of 100-1000 records for optimal throughput
  </Card>

  <Card title="Connection Pooling">
    Reuse database connections and validator instances
  </Card>

  <Card title="Async Processing">
    Use async/await for I/O bound operations
  </Card>

  <Card title="Memory Management">
    Stream large datasets instead of loading into memory
  </Card>
</CardGroup>

### Data Quality

<Steps>
  <Step title="Validate Early">
    Perform validation as close to the data source as possible
  </Step>

  <Step title="Preserve Originals">
    Keep original values in metadata for debugging and auditing
  </Step>

  <Step title="Quality Scoring">
    Use quality indicators to flag questionable data for review
  </Step>

  <Step title="Monitor Trends">
    Track validation failure rates and common error patterns
  </Step>
</Steps>

## Real-World Examples

### Solar Inverter Data

```python theme={null}
# Raw inverter data (various units and timezone)
raw_data = {
    "inverter_id": "INV_001",
    "power_output": 1500,  # Watts
    "energy_today": 45.2,  # kWh
    "timestamp": "2024-01-15 14:30:00",  # Local time
    "timezone": "America/Los_Angeles",
    "temperature": 77.5,  # Fahrenheit
    "efficiency": 96.2     # Percent
}

# Process through adapters
adapter_pipeline = AdapterPipeline([
    PowerConverter(target_unit="kW"),
    TimezoneConverter(target_timezone="UTC"),
    TemperatureConverter(target_unit="C"),
    SchemaValidator("timeseries")
])

result = adapter_pipeline.process(raw_data)

# Normalized output
{
    "id": "ts_inv_001_2024_01_15_22_30",
    "asset_id": "ast_inverter_001", 
    "metric": "power_generation",
    "value": 1.5,
    "unit": "kW",
    "timestamp": "2024-01-15T22:30:00Z",  # UTC
    "quality": "good",
    "metadata": {
        "original_value": 1500,
        "original_unit": "W", 
        "original_timestamp": "2024-01-15 14:30:00 PST",
        "ambient_temperature_c": 25.3,
        "inverter_efficiency": 0.962
    }
}
```

### EV Charging Data

```python theme={null}
# OCPP message processing
ocpp_message = {
    "messageType": "MeterValues",
    "chargePointId": "CP_001",
    "meterValue": {
        "timestamp": "2024-01-15T14:30:00.000Z",
        "sampledValue": {
            "value": "15000",  # String format
            "unit": "W",       # Watts
            "measurand": "Power.Active.Import"
        }
    }
}

# Process through OCPP adapter
ocpp_adapter = OCPPAdapter()
timeseries = ocpp_adapter.process(ocpp_message)

# Output
{
    "id": "ts_cp_001_power_2024_01_15_14_30",
    "asset_id": "ast_charger_cp_001",
    "metric": "charging_power", 
    "value": 15.0,
    "unit": "kW",
    "timestamp": "2024-01-15T14:30:00Z",
    "quality": "good",
    "metadata": {
        "ocpp_measurand": "Power.Active.Import",
        "charging_session": "active"
    }
}
```

## Monitoring Adapters

Track adapter performance and data quality:

```python theme={null}
from qubit.adapters.monitoring import AdapterMetrics

metrics = AdapterMetrics()

class MonitoredUnitConverter(UnitConverter):
    def convert(self, value, from_unit, to_unit):
        start_time = time.time()
        
        try:
            result = super().convert(value, from_unit, to_unit)
            metrics.record_conversion_success(from_unit, to_unit)
            return result
        except Exception as e:
            metrics.record_conversion_failure(from_unit, to_unit, str(e))
            raise
        finally:
            duration = time.time() - start_time
            metrics.record_conversion_latency(duration)

# View metrics
print(f"Conversion success rate: {metrics.success_rate():.2%}")
print(f"Average latency: {metrics.avg_latency():.3f}s")
```

## Next Steps

<Card title="Getting Started" icon="play" href="/layer-1/getting-started">
  Ready to start processing energy data? Follow our step-by-step guide.
</Card>

Or explore related components:

<CardGroup cols={2}>
  <Card title="Schemas" href="/layer-1/schemas">
    Understand the data models your adapters will produce
  </Card>

  <Card title="Connectors" href="/layer-1/connectors">
    Learn how connectors and adapters work together
  </Card>
</CardGroup>

***

*Qubit Adapters ensure that regardless of how your energy data arrives, it's transformed into high-quality, standardized formats ready for analysis and optimization.*
