# Getting Started

> Set up your first energy data connector in minutes

# Getting Started with Layer 1

Get your energy data flowing through the Qubit Foundation infrastructure in just a few steps. This guide walks you through connecting your first device and processing energy data.

## Prerequisites

<Info>
  This guide assumes basic familiarity with energy systems and Python programming. For production deployments, consult your system administrator.
</Info>

<Steps>
  <Step title="System Requirements">
    * **Python 3.8+** or **Node.js 16+**
    * **Git** for cloning repositories
    * **Docker** (optional, for containerized deployment)
    * **Access to an energy device** or historical data
  </Step>

  <Step title="Network Access">
    * MQTT broker connectivity (port 1883/8883)
    * OCPP WebSocket endpoints (port 80/443)
    * Modbus TCP connectivity (port 502)
    * Outbound HTTPS for schema validation
  </Step>
</Steps>
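
Before installing anything, it can help to confirm that the ports listed above are reachable from the machine that will run the connector. The sketch below uses only the Python standard library. The helper names `port_is_open` and `check_targets` are illustrative and are not part of any Qubit package.

```python
import socket
from typing import Dict, Iterable, Tuple


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS lookup failures
        return False


def check_targets(targets: Iterable[Tuple[str, int]]) -> Dict[Tuple[str, int], bool]:
    """Check each (host, port) pair and report whether it is reachable."""
    return {(host, port): port_is_open(host, port) for host, port in targets}
```

For example, `check_targets([("your-broker.example.com", 8883), ("your-modbus-device.example.com", 502)])` returns a dictionary keyed by endpoint, with placeholder host names standing in for your own. A successful TCP connection only shows that the port is open; it does not verify credentials or the protocol handshake.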

## Quick Start: 5-Minute Setup

### 1. Install Qubit Components

<Tabs>
  <Tab title="Python">
    ```bash theme={null}
    # Install schemas and connectors
    pip install qubit-energy-schemas "qubit-energy-connectors[mqtt]"  # quotes keep shells such as zsh from expanding the brackets

    # Clone examples
    git clone https://github.com/qubit-foundation/qubit-energy-schemas.git
    cd qubit-energy-schemas
    ```
  </Tab>

  <Tab title="Docker">
    ```bash theme={null}
    # Run pre-configured MQTT connector
    docker run -d \
      --name qubit-mqtt \
      -e MQTT_BROKER=mqtt://demo.qubit.energy:1883 \
      -e MQTT_TOPICS=demo/solar/+/+/readings \
      qubit/connectors:mqtt-latest
    ```
  </Tab>

  <Tab title="Node.js">
    ```bash theme={null}
    # Install Node.js packages
    npm install @qubit/energy-schemas @qubit/connectors

    # Clone examples
    git clone https://github.com/qubit-foundation/qubit-energy-schemas.git
    cd qubit-energy-schemas
    ```
  </Tab>
</Tabs>

### 2. Choose Your Data Source

<Tabs>
  <Tab title="Demo MQTT Data">
    Use our live demo data to get started immediately:

    ```python theme={null}
    from qubit.connectors.mqtt import MQTTConnector
    import asyncio

    # Demo configuration
    config = {
        "broker": "mqtt://demo.qubit.energy:1883",
        "topics": ["demo/solar/+/+/readings"],
        "client_id": "demo_client_001"
    }

    async def handle_demo_data(data):
        print(f"Demo data: {data}")
        return data

    async def main():
        connector = MQTTConnector(config)
        await connector.connect()
        await connector.subscribe("demo/solar/+/+/readings", handle_demo_data)
        await asyncio.sleep(60)  # Run for 1 minute
        await connector.disconnect()

    asyncio.run(main())
    ```
  </Tab>

  <Tab title="Your MQTT Broker">
    Connect to your existing MQTT infrastructure:

    ```python theme={null}
    config = {
        "broker": "mqtt://your-broker.example.com:1883",
        "topics": ["your/topic/pattern/+/readings"],
        "username": "your_username",
        "password": "your_password",
        "tls": {
            "ca_certs": "/path/to/ca.pem"  # If using TLS
        }
    }
    ```
  </Tab>

  <Tab title="Historical CSV Data">
    Start with historical data to understand the flow:

    ```python theme={null}
    from qubit.connectors.csv import CSVConnector

    config = {
        "file_path": "historical_generation.csv",
        "mapping": {
            "timestamp": "DateTime",
            "asset_id": "InverterID",
            "value": "Generation_kWh", 
            "unit": "kWh"
        },
        "timestamp_format": "%Y-%m-%d %H:%M:%S",
        "timezone": "America/Los_Angeles"
    }
    ```
  </Tab>
</Tabs>
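
To make the CSV `mapping` concrete, here is a standard-library sketch of what such a mapping might do to each row. It is not the `CSVConnector` implementation. The function name `map_rows` is hypothetical, and the sketch assumes that `timestamp`, `asset_id`, and `value` name source columns while `unit` is a literal applied to every row.

```python
import csv
import io
from datetime import datetime

MAPPING = {
    "timestamp": "DateTime",
    "asset_id": "InverterID",
    "value": "Generation_kWh",
    "unit": "kWh",
}
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def map_rows(csv_file, mapping=MAPPING, timestamp_format=TIMESTAMP_FORMAT):
    """Yield one normalized dict per CSV row (hypothetical helper)."""
    for row in csv.DictReader(csv_file):
        yield {
            # Parsed as naive local time; timezone handling is left to a later step
            "timestamp": datetime.strptime(row[mapping["timestamp"]], timestamp_format),
            "asset_id": row[mapping["asset_id"]],
            "value": float(row[mapping["value"]]),
            "unit": mapping["unit"],  # assumed to be a literal, not a column name
        }


# In-memory sample standing in for historical_generation.csv
sample = io.StringIO(
    "DateTime,InverterID,Generation_kWh\n"
    "2024-01-15 14:00:00,inv_001,12.5\n"
)
rows = list(map_rows(sample))
print(rows[0])
```

Keeping the mapping as data rather than code means a file with different column names needs only a different dictionary.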

### 3. Process and Validate Data

```python theme={null}
from qubit.adapters import AdapterPipeline
from qubit.adapters.units import UnitConverter
from qubit.adapters.timezone import TimezoneAdapter
from qubit.adapters.validation import SchemaValidator

# Create processing pipeline
pipeline = AdapterPipeline([
    UnitConverter(target_units={"power": "kW", "energy": "kWh"}),
    TimezoneAdapter(target_timezone="UTC"),
    SchemaValidator("timeseries", version="0.2")
])

# Process incoming data (`connector` is the MQTTConnector instance created in step 2)
@connector.on_message
async def process_energy_data(raw_message):
    try:
        # Run through adapter pipeline
        timeseries = pipeline.process(raw_message)
        
        print(f"✅ Processed: {timeseries.id}")
        print(f"   Asset: {timeseries.asset_id}")
        print(f"   Value: {timeseries.value} {timeseries.unit}")
        print(f"   Quality: {timeseries.quality}")
        
        # Ready for Layer 2 processing!
        return timeseries
        
    except Exception as e:
        print(f"❌ Processing failed: {e}")
        raise
```
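
Conceptually, an adapter pipeline applies a sequence of transformations to each reading in order. The sketch below illustrates that idea with plain functions. It is an illustration under assumed field names (`asset_id`, `timestamp`, `value`, `unit`), not the `AdapterPipeline` implementation, and every function name in it is hypothetical.

```python
from datetime import datetime, timezone

# Source unit -> (target unit, divisor), matching the kW / kWh targets above
TO_TARGET = {"W": ("kW", 1000), "kW": ("kW", 1), "Wh": ("kWh", 1000), "kWh": ("kWh", 1)}


def convert_units(reading: dict) -> dict:
    """Scale the value into the target unit for its quantity."""
    unit, divisor = TO_TARGET[reading["unit"]]
    return {**reading, "value": reading["value"] / divisor, "unit": unit}


def to_utc(reading: dict) -> dict:
    """Normalize an ISO 8601 timestamp that carries a UTC offset to UTC."""
    ts = datetime.fromisoformat(reading["timestamp"])
    if ts.tzinfo is None:
        raise ValueError("timestamp must include a UTC offset")
    return {**reading, "timestamp": ts.astimezone(timezone.utc).isoformat()}


def validate(reading: dict) -> dict:
    """Reject readings that lack any of the assumed required fields."""
    missing = {"asset_id", "timestamp", "value", "unit"} - reading.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return reading


def run_pipeline(reading: dict, steps=(convert_units, to_utc, validate)) -> dict:
    """Apply each step in order, feeding the output of one into the next."""
    for step in steps:
        reading = step(reading)
    return reading


raw = {"asset_id": "inv_001", "timestamp": "2024-01-15T06:00:00-08:00", "value": 1500, "unit": "W"}
print(run_pipeline(raw))
```

Each step returns a new dictionary instead of mutating its input, which makes individual steps easy to test in isolation and to reorder.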

## Complete Example: Solar Farm Monitoring

Let's build a complete solar farm monitoring system:

### 1. Project Structure

```
solar_monitoring/
├── config/
│   ├── production.yaml
│   └── development.yaml
├── src/
│   ├── main.py
│   ├── handlers.py
│   └── storage.py
├── requirements.txt
└── docker-compose.yml
```

### 2. Configuration

<CodeGroup>
  ```yaml config/production.yaml theme={null}
  # Solar farm monitoring configuration
  mqtt:
    broker: "mqtts://solar-farm-mqtt.example.com:8883"
    client_id: "solar_monitor_prod"
    topics:
      - "farm/inverter/+/telemetry"
      - "farm/weather/+/observations"
    qos: 1
    tls:
      ca_certs: "/certs/ca.pem"
      certfile: "/certs/client.crt" 
      keyfile: "/certs/client.key"
    auth:
      username: "${MQTT_USERNAME}"
      password: "${MQTT_PASSWORD}"

  adapters:
    units:
      power_target: "kW"
      energy_target: "kWh"
      temperature_target: "C"
    timezone:
      site_timezone: "America/Los_Angeles"
      target_timezone: "UTC"
    validation:
      schema_version: "0.2"
      quality_threshold: 0.8

  storage:
    timeseries_db: "postgresql://user:pass@db.example.com/energy"
    influxdb_url: "http://influx.example.com:8086"
  ```

  ```text requirements.txt theme={null}
  qubit-energy-schemas>=0.2.0
  qubit-energy-connectors[mqtt]>=1.0.0
  qubit-energy-adapters>=1.0.0
  pyyaml  # provides the yaml module imported in src/main.py
  pydantic
  influxdb-client
  psycopg2-binary
  ```
</CodeGroup>
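
`yaml.safe_load` returns `${MQTT_USERNAME}` and `${MQTT_PASSWORD}` as literal strings. If the connector does not resolve these placeholders itself (this guide does not say either way), one option is to expand them after loading the file. A minimal sketch using the standard library, where `expand_env` is a hypothetical helper name:

```python
import os


def expand_env(node):
    """Recursively expand ${VAR} and $VAR in every string of a loaded config."""
    if isinstance(node, dict):
        return {key: expand_env(value) for key, value in node.items()}
    if isinstance(node, list):
        return [expand_env(item) for item in node]
    if isinstance(node, str):
        expanded = os.path.expandvars(node)
        # expandvars leaves unknown variables untouched, so fail loudly instead
        if "${" in expanded:
            raise KeyError(f"unresolved environment variable in {node!r}")
        return expanded
    return node  # numbers, booleans, and None pass through unchanged


os.environ.setdefault("MQTT_USERNAME", "demo_user")  # demo value for this example only
config = {"mqtt": {"auth": {"username": "${MQTT_USERNAME}"}, "qos": 1}}
print(expand_env(config))
```

Failing on an unresolved placeholder surfaces a missing variable at startup rather than as an authentication error from the broker.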

### 3. Implementation

<CodeGroup>
  ```python src/main.py theme={null}
  import asyncio
  import os

  import yaml
  from qubit.connectors.mqtt import MQTTConnector
  from qubit.adapters import AdapterPipeline
  from handlers import SolarDataHandler
  from storage import TimeSeriesStore

  async def main():
      # Load configuration; CONFIG_FILE (set in docker-compose.yml) overrides the default path
      config_path = os.environ.get('CONFIG_FILE', 'config/production.yaml')
      with open(config_path) as f:
          config = yaml.safe_load(f)

      # Set up components
      connector = MQTTConnector(config['mqtt'])
      adapter_pipeline = AdapterPipeline.from_config(config['adapters'])
      storage = TimeSeriesStore(config['storage'])
      handler = SolarDataHandler(adapter_pipeline, storage)

      # Connect and start processing
      await connector.connect()

      for topic in config['mqtt']['topics']:
          await connector.subscribe(topic, handler.process_message)

      print("🌞 Solar farm monitoring started")

      try:
          # Run until the task is cancelled (asyncio.run cancels it on Ctrl+C)
          while True:
              await asyncio.sleep(1)
      except asyncio.CancelledError:
          print("👋 Shutting down...")
          raise
      finally:
          await connector.disconnect()
          await storage.close()

  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except KeyboardInterrupt:
          pass  # Ctrl+C: cleanup already ran in main()'s finally block
  ```

  ```python src/handlers.py theme={null}
  from qubit.adapters import AdapterPipeline
  from qubit.schemas import TimeSeries
  import logging

  logger = logging.getLogger(__name__)

  class SolarDataHandler:
      def __init__(self, adapter_pipeline, storage):
          self.pipeline = adapter_pipeline
          self.storage = storage
      
      async def process_message(self, topic: str, message: dict):
          """Process incoming MQTT message from solar equipment."""
          try:
              # Determine message type from topic
              topic_parts = topic.split('/')
              device_type = topic_parts[1]  # inverter, weather
              device_id = topic_parts[2]
              
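              if device_type == "inverter":
                  return await self._process_inverter_data(device_id, message)
              elif device_type == "weather":
                  return await self._process_weather_data(device_id, message)

              # Unrecognized device types are logged and skipped
              logger.warning(f"Ignoring message from unsupported topic {topic}")
              return None

          except Exception:
              # logger.exception records the traceback along with the message
              logger.exception(f"Failed to process message from {topic}")
              raise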
      
      async def _process_inverter_data(self, device_id: str, data: dict):
          """Process solar inverter telemetry."""
          # Run through adapter pipeline
          timeseries_list = self.pipeline.process_inverter_batch(data)
          
          # Store all metrics
          for ts in timeseries_list:
              await self.storage.insert(ts)
              logger.info(f"Stored {ts.metric}: {ts.value} {ts.unit}")
          
          return timeseries_list
      
      async def _process_weather_data(self, device_id: str, data: dict):
          """Process weather station data."""
          timeseries = self.pipeline.process_weather(data)
          await self.storage.insert(timeseries)
          
          return timeseries
  ```
</CodeGroup>
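
The subscriptions in this guide use the MQTT single-level wildcard `+`, and the handler then routes on individual topic segments. The sketch below shows how a topic filter is matched against a concrete topic, which can be handy for unit-testing routing logic without a broker. `topic_matches` is a hypothetical helper, and it ignores the special handling of topics that begin with `$`.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level, matches the rest
            return index == len(pattern_levels) - 1
        if index >= len(topic_levels):
            return False  # topic is shorter than the filter
        if level != "+" and level != topic_levels[index]:
            return False  # literal level differs ('+' accepts any single level)

    # Without '#', both must have the same number of levels
    return len(pattern_levels) == len(topic_levels)


print(topic_matches("farm/inverter/+/telemetry", "farm/inverter/inv_001/telemetry"))
print(topic_matches("farm/inverter/+/telemetry", "farm/weather/ws_01/observations"))
```

With the filters from `config/production.yaml`, the first call matches and the second does not, so weather observations reach the handler only through the `farm/weather/+/observations` subscription.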

### 4. Run and Monitor

<CodeGroup>
  ```bash Development theme={null}
  # Set environment variables
  export MQTT_USERNAME=your_username
  export MQTT_PASSWORD=your_password

  # Run locally
  python src/main.py
  ```

  ```bash Production theme={null}
  # Use Docker Compose
  docker-compose up -d

  # Check logs
  docker-compose logs -f solar-monitor

  # Monitor metrics
  curl http://localhost:8080/metrics
  ```

  ```yaml docker-compose.yml theme={null}
  version: '3.8'

  services:
    solar-monitor:
      build: .
      ports:
        - "8080:8080"  # publishes the port used by the /metrics check in the Production tab
      environment:
        - CONFIG_FILE=config/production.yaml
        - MQTT_USERNAME=${MQTT_USERNAME}
        - MQTT_PASSWORD=${MQTT_PASSWORD}
      volumes:
        - ./config:/app/config
        - ./certs:/certs
      depends_on:
        - influxdb
        - postgres
      restart: unless-stopped
    
    influxdb:
      image: influxdb:2.0
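      environment:
        # influxdb:2.x images are initialized through the DOCKER_INFLUXDB_INIT_* variables
        - DOCKER_INFLUXDB_INIT_MODE=setup
        - DOCKER_INFLUXDB_INIT_USERNAME=admin
        - DOCKER_INFLUXDB_INIT_PASSWORD=password
        - DOCKER_INFLUXDB_INIT_ORG=energy  # placeholder organization name
        - DOCKER_INFLUXDB_INIT_BUCKET=energy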
      volumes:
        - influx_data:/var/lib/influxdb2
    
    postgres:
      image: postgres:14
      environment:
        - POSTGRES_DB=energy_metadata
        - POSTGRES_USER=energy
        - POSTGRES_PASSWORD=secure_password
      volumes:
        - postgres_data:/var/lib/postgresql/data

  volumes:
    influx_data:
    postgres_data:
  ```
</CodeGroup>

## Verification

Once your system is running, verify data flow:

### 1. Check Data Quality

```python theme={null}
# Query recent data (run inside a coroutine or `python -m asyncio`, since it uses await)
from storage import TimeSeriesStore

store = TimeSeriesStore(config["storage"])  # same section that src/main.py passes in
recent_data = await store.query(
    asset_id="ast_inverter_001",
    metric="power_generation",
    start_time="2024-01-15T14:00:00Z",
    end_time="2024-01-15T15:00:00Z"
)

# Analyze quality
quality_stats = {
    "good": sum(1 for ts in recent_data if ts.quality == "good"),
    "questionable": sum(1 for ts in recent_data if ts.quality == "questionable"),
    "poor": sum(1 for ts in recent_data if ts.quality == "poor")
}

print(f"Data quality distribution: {quality_stats}")
```

### 2. Monitor Performance

```python theme={null}
# Check connector health
health_status = await connector.health_check()
print(f"Connector status: {health_status}")

# View processing metrics
metrics = pipeline.get_metrics()
print(f"Messages processed: {metrics['total_processed']}")
print(f"Validation success rate: {metrics['validation_success_rate']:.2%}")
print(f"Average processing time: {metrics['avg_processing_time']:.3f}s")
```

## Common Integration Patterns

### Pattern 1: Real-time Dashboard

```python theme={null}
# Stream processed data to dashboard
@connector.on_message
async def stream_to_dashboard(timeseries):
    if timeseries.metric in ["power_generation", "energy_consumption"]:
        await dashboard_websocket.send(timeseries.json())
```

### Pattern 2: Data Lake Storage

```python theme={null}
# Batch write to data lake
batch_writer = BatchWriter(
    destination="s3://energy-data-lake/timeseries/",
    batch_size=1000,
    flush_interval=60  # seconds
)

@connector.on_message  
async def store_in_data_lake(timeseries):
    await batch_writer.add(timeseries)
```
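
`BatchWriter` is not defined or imported in the snippet above. To illustrate the batching idea behind it, here is a minimal asyncio sketch that buffers items and hands them to a sink coroutine when either the batch size or the flush interval is reached. `BatchBuffer` and its parameters are hypothetical, and the interval is only checked when an item is added, so there is no background timer.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, List


class BatchBuffer:
    """Collect items and flush them in batches to an async sink (illustrative)."""

    def __init__(
        self,
        sink: Callable[[List[Any]], Awaitable[None]],
        batch_size: int = 1000,
        flush_interval: float = 60.0,  # seconds
    ):
        self._sink = sink
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._items: List[Any] = []
        self._last_flush = time.monotonic()

    async def add(self, item: Any) -> None:
        self._items.append(item)
        interval_elapsed = time.monotonic() - self._last_flush >= self._flush_interval
        if len(self._items) >= self._batch_size or interval_elapsed:
            await self.flush()

    async def flush(self) -> None:
        self._last_flush = time.monotonic()
        if not self._items:
            return
        # Swap the buffer before awaiting so items added meanwhile go to the next batch
        batch, self._items = self._items, []
        await self._sink(batch)


async def demo() -> List[List[int]]:
    written: List[List[int]] = []

    async def sink(batch: List[int]) -> None:
        written.append(batch)  # a real sink would upload the batch to object storage

    buffer = BatchBuffer(sink, batch_size=3, flush_interval=3600)
    for value in range(7):
        await buffer.add(value)
    await buffer.flush()  # flush the remainder on shutdown
    return written


print(asyncio.run(demo()))
```

Calling `flush()` during shutdown matters: without it, any items still in the buffer are lost when the process exits.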

### Pattern 3: Alert Generation

```python theme={null}
# Generate alerts for anomalies
from qubit.alerts import AlertManager

alert_manager = AlertManager()

@connector.on_message
async def check_for_alerts(timeseries):
    if timeseries.quality == "poor":
        await alert_manager.send_alert(
            level="warning",
            message=f"Poor data quality from {timeseries.asset_id}",
            metadata=timeseries.metadata
        )
```

## Troubleshooting

<AccordionGroup>
  <Accordion title="Connection Issues">
    ```python theme={null}
    # Test MQTT connection
    from qubit.connectors.mqtt import MQTTConnector

    connector = MQTTConnector(config)

    try:
        await connector.connect(timeout=10)
        print("✅ Connected successfully")
    except ConnectionError as e:
        print(f"❌ Connection failed: {e}")
        # Check firewall, credentials, broker URL
    ```
  </Accordion>

  <Accordion title="Schema Validation Errors">
    ```python theme={null}
    # Debug validation failures
    from qubit.adapters.validation import SchemaValidator

    validator = SchemaValidator("timeseries", version="0.2")

    result = validator.validate(data)
    if not result.is_valid:
        print("Validation errors:")
        for error in result.errors:
            print(f"  - {error.path}: {error.message}")
    ```
  </Accordion>

  <Accordion title="Unit Conversion Issues">
    ```python theme={null}
    # Debug unit conversions
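    # UnitConversionError is assumed to live alongside UnitConverter; adjust the import if it does not
    from qubit.adapters.units import UnitConverter, UnitConversionError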

    try:
        converted = UnitConverter.convert(1500, "W", "kW")
        print(f"Converted: {converted} kW")
    except UnitConversionError as e:
        print(f"Conversion error: {e}")
        print(f"Supported units: {UnitConverter.supported_units()}")
    ```
  </Accordion>

  <Accordion title="Performance Issues">
    ```python theme={null}
    # Monitor processing performance
    import logging
    import time

    logger = logging.getLogger(__name__)

    @connector.on_message
    async def monitor_performance(data):
        start_time = time.perf_counter()  # monotonic clock suited to measuring durations

        result = pipeline.process(data)  # called without await, as in step 3

        processing_time = time.perf_counter() - start_time
        if processing_time > 0.1:  # >100ms
            logger.warning(f"Slow processing: {processing_time:.3f}s")

        return result
  </Accordion>
</AccordionGroup>
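
Transient connection failures are often best handled by retrying with an increasing delay rather than failing on the first error. The sketch below shows a generic exponential backoff wrapper. `connect_with_backoff` is a hypothetical helper, and it assumes the wrapped coroutine raises `ConnectionError` or a timeout on failure, as the connection test above suggests.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def connect_with_backoff(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Call connect() until it succeeds, doubling the wait after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except (ConnectionError, asyncio.TimeoutError):
            if attempt == attempts:
                raise  # out of retries: surface the last error to the caller
            # Delays grow as base_delay * 1, 2, 4, ... and are capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(delay)
    raise ValueError("attempts must be at least 1")
```

With the connector from this guide, usage might look like `await connect_with_backoff(lambda: connector.connect(timeout=10))`. Adding random jitter to the delay is a common refinement when many clients reconnect at once.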

## Production Deployment

### High Availability Setup

<Tabs>
  <Tab title="Kubernetes">
    ```yaml theme={null}
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: qubit-energy-connector
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: energy-connector
      template:
        metadata:
          labels:
            app: energy-connector  # must match spec.selector.matchLabels
        spec:
          containers:
          - name: connector
            image: qubit/connectors:latest
            env:
            - name: CONFIG_FILE
              value: "/config/production.yaml"
            resources:
              requests:
                cpu: 100m
                memory: 128Mi
              limits:
                cpu: 500m
                memory: 512Mi
            livenessProbe:
              httpGet:
                path: /health
                port: 8080
              initialDelaySeconds: 30
              periodSeconds: 10
    ```
  </Tab>

  <Tab title="Docker Swarm">
    ```yaml theme={null}
    version: '3.8'
    services:
      energy-connector:
        image: qubit/connectors:latest
        deploy:
          replicas: 3
          update_config:
            parallelism: 1
            delay: 10s
          restart_policy:
            condition: on-failure
        environment:
          - MQTT_BROKER=mqtt://production-broker:1883
        volumes:
          - config:/app/config
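        networks:
          - energy_network

    # Named volumes and networks used by a service must be declared at the top level
    volumes:
      config:

    networks:
      energy_network:
        driver: overlay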
    ```
  </Tab>
</Tabs>
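
The Kubernetes manifest above probes `GET /health` on port 8080 and treats any response with a status from 200 to 399 as healthy. If your connector process does not already serve that path, the following standard-library sketch shows the minimum such an endpoint needs to do. `HealthHandler` and `start_health_server` are hypothetical names, and a real deployment would more likely report actual connector state than a fixed `ok`.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    """Answer GET /health with 200 and a small JSON body; 404 for anything else."""

    def do_GET(self):
        if self.path == "/health":
            body = json.dumps({"status": "ok"}).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        pass  # suppress per-request logging so probes do not flood the logs


def start_health_server(port: int = 8080) -> HTTPServer:
    """Serve the health endpoint from a daemon thread and return the server."""
    server = HTTPServer(("0.0.0.0", port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Calling `start_health_server()` once at startup is enough for the probe. Call `server.shutdown()` on the returned object during a graceful stop.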

### Monitoring and Alerting

```python theme={null}
# Comprehensive monitoring setup
from qubit.monitoring import PrometheusExporter, AlertManager

# Export metrics to Prometheus
prometheus = PrometheusExporter(port=8080)
prometheus.start()

# Set up alerts
alerts = AlertManager()

@connector.on_connect
async def on_connection():
    await alerts.send("info", "Connector started successfully")

@connector.on_disconnect  
async def on_disconnection():
    await alerts.send("warning", "Connector disconnected")

@pipeline.on_validation_failure
async def on_validation_failure(error, data):
    if error.severity == "critical":
        await alerts.send("critical", f"Schema validation failed: {error}")
```

## Next Steps

<CardGroup cols={2}>
  <Card title="Scale Your Deployment" icon="chart-line">
    Learn about production deployment patterns and scaling strategies
  </Card>

  <Card title="Layer 2: Predictions" href="/layer-2/overview">
    Add AI-powered forecasting to your energy data
  </Card>

  <Card title="Custom Connectors" icon="code">
    Build connectors for proprietary or emerging protocols
  </Card>

  <Card title="Advanced Monitoring" icon="gauge">
    Set up comprehensive observability and alerting
  </Card>
</CardGroup>

## Community and Support

<Steps>
  <Step title="Join the Community">
    Connect with other developers in our [GitHub Discussions](https://github.com/qubit-foundation/qubit-energy-schemas/discussions)
  </Step>

  <Step title="Report Issues">
    Found a bug? [Create an issue](https://github.com/qubit-foundation/qubit-energy-connectors/issues) on GitHub
  </Step>

  <Step title="Contribute">
    Help improve the connectors by [contributing code](https://github.com/qubit-foundation/qubit-energy-connectors/blob/main/CONTRIBUTING.md)
  </Step>
</Steps>

***

*You've now established the data foundation for your energy system. Layer 1 provides clean, standardized data ready for advanced processing, optimization, and coordination in the upper layers of the Qubit Foundation stack.*
