Source code for e3dc._e3dc

#!/usr/bin/env python
# Python class to connect to an E3/DC system.
#
# Copyright 2017 Francesco Santini <francesco.santini@gmail.com>
# Licensed under a MIT license. See LICENSE for details
import datetime
import hashlib
import struct
import time
import uuid
from calendar import monthrange
from typing import Any, Literal

from ._e3dc_rscp_local import (
    E3DC_RSCP_local,
    RSCPAuthenticationError,
    RSCPKeyError,
    RSCPNotAvailableError,
)
from ._e3dc_rscp_web import E3DC_RSCP_web
from ._rscpLib import rscpFindTag, rscpFindTagIndex
from ._rscpTags import RscpTag, RscpType, getStrPowermeterType, getStrPviType

REMOTE_ADDRESS = "https://s10.e3dc.com/s10/phpcmd/cmd.php"
REQUEST_INTERVAL_SEC = 10  # minimum interval between requests
REQUEST_INTERVAL_SEC_LOCAL = 1  # minimum interval between requests


[docs] class AuthenticationError(Exception): """Class for Authentication Error Exception.""" pass
[docs] class NotAvailableError(Exception): """Class for Not Available Error Exception.""" pass
[docs] class PollError(Exception): """Class for Poll Error Exception.""" pass
[docs] class SendError(Exception): """Class for Send Error Exception.""" pass
[docs] class E3DC: """A class describing an E3DC system.""" CONNECT_LOCAL = 1 CONNECT_WEB = 2 _IDLE_TYPE = {"idleCharge": 0, "idleDischarge": 1} def __init__(self, connectType: int, **kwargs: Any) -> None: """Constructor of an E3DC object. Args: connectType: can be one of the following E3DC.CONNECT_LOCAL use local rscp connection E3DC.CONNECT_WEB use web connection **kwargs: Arbitrary keyword argument Keyword Args: username (str): username password (str): password (plain text) ipAddress (str): IP address of the E3DC system - required for CONNECT_LOCAL key (str): encryption key as set in the E3DC settings - required for CONNECT_LOCAL serialNumber (str): the serial number of the system to monitor - required for CONNECT_WEB isPasswordMd5 (bool): indicates whether the password is already md5 digest (recommended, default = True) - required for CONNECT_WEB configuration (dict | None): dict containing details of the E3DC configuration. {"pvis": [{"index": 0, "strings": 2, "phases": 3}], "powermeters": [{"index": 0}], "batteries": [{"index": 0, "dcbs": 1}]} port (int, optional): port number for local connection. Defaults to None, which means default port 5033 is used. """ self.connectType = connectType self.username = kwargs["username"] self.serialNumber = None self.serialNumberPrefix = None self.jar = None self.guid = "GUID-" + str(uuid.uuid1()) self.lastRequestTime = -1 self.lastRequest = None self.connected = False # static values self.deratePercent = None self.deratePower = None self.installedPeakPower = None self.installedBatteryCapacity = None self.externalSourceAvailable = None self.macAddress = None self.model = None self.maxAcPower = None self.maxBatChargePower = None self.maxBatDischargePower = None self.startDischargeDefault = None self.powermeters: list[dict[str, Any]] = [] self.pvis: list[dict[str, Any]] = [] self.batteries: list[dict[str, Any]] = [] self.pmIndexExt = None if "configuration" in kwargs: configuration = kwargs["configuration"] if "pvis" in configuration and isinstance(configuration["pvis"], list): self.pvis = configuration["pvis"] if "powermeters" in configuration and isinstance( configuration["powermeters"], list ): self.powermeters = configuration["powermeters"] if "batteries" in configuration and isinstance( configuration["batteries"], list ): self.batteries = configuration["batteries"] if connectType == self.CONNECT_LOCAL: self.ip = kwargs["ipAddress"] self.key = kwargs["key"] self.password = kwargs["password"] self.port = kwargs.get("port", None) self.rscp = E3DC_RSCP_local( self.username, self.password, self.ip, self.key, self.port ) else: self._set_serial(kwargs["serialNumber"]) if "isPasswordMd5" in kwargs and not kwargs["isPasswordMd5"]: self.password = kwargs["password"] else: self.password = hashlib.md5( kwargs["password"].encode("utf-8") ).hexdigest() self.rscp = E3DC_RSCP_web( self.username, self.password, "{}{}".format(self.serialNumberPrefix, self.serialNumber), ) self.get_system_info_static(keepAlive=True) def _set_serial(self, serial: str): self.batteries = self.batteries or [{"index": 0}] self.pmIndexExt = 1 if serial[0].isdigit(): self.serialNumber = serial else: self.serialNumber = serial[4:] self.serialNumberPrefix = serial[:4] if self.serialNumber.startswith("4") or self.serialNumber.startswith("72"): self.model = "S10E" self.powermeters = self.powermeters or [{"index": 0}] self.pvis = self.pvis or [{"index": 0}] if not self.serialNumberPrefix: self.serialNumberPrefix = "S10-" elif self.serialNumber.startswith("74"): self.model = "S10E_Compact" self.powermeters = self.powermeters or [{"index": 0}] self.pvis = self.pvis or [{"index": 0}] if not self.serialNumberPrefix: self.serialNumberPrefix = "S10-" elif self.serialNumber.startswith("5"): self.model = "S10_Mini" self.powermeters = self.powermeters or [{"index": 6}] self.pvis = self.pvis or [{"index": 0, "phases": 1}] if not self.serialNumberPrefix: self.serialNumberPrefix = "S10-" elif self.serialNumber.startswith("6"): self.model = "Quattroporte" self.powermeters = self.powermeters or [{"index": 6}] self.pvis = self.pvis or [{"index": 0}] if not self.serialNumberPrefix: self.serialNumberPrefix = "Q10-" elif self.serialNumber.startswith("70"): self.model = "S10E_Pro" self.powermeters = self.powermeters or [{"index": 0}] self.pvis = self.pvis or [{"index": 0}] if not self.serialNumberPrefix: self.serialNumberPrefix = "P10-" elif self.serialNumber.startswith("75"): self.model = "S10E_Pro_Compact" self.powermeters = self.powermeters or [{"index": 0}] self.pvis = self.pvis or [{"index": 0}] if not self.serialNumberPrefix: self.serialNumberPrefix = "P10-" elif self.serialNumber.startswith("8"): self.model = "S10X" self.powermeters = self.powermeters or [{"index": 0}] self.pvis = self.pvis or [{"index": 0}] if not self.serialNumberPrefix: self.serialNumberPrefix = "H20-" else: self.model = "NA" self.powermeters = self.powermeters or [{"index": 0}] self.pvis = self.pvis or [{"index": 0}]
[docs] def sendRequest( self, request: tuple[str | int | RscpTag, str | int | RscpType, Any], retries: int = 3, keepAlive: bool = False, ) -> tuple[str | int | RscpTag, str | int | RscpType, Any]: """This function uses the RSCP interface to make a request. Does make retries in case of exceptions like Socket.Error Args: request: the request to send retries (int): number of retries. Defaults to 3. keepAlive (bool): True to keep connection alive. Defaults to False. Returns: An object with the received data Raises: e3dc.AuthenticationError: login error e3dc.SendError: if retries are reached """ retry = 0 while True: try: if not self.rscp.isConnected(): self.rscp.connect() result = self.rscp.sendRequest(request) break except RSCPAuthenticationError: raise AuthenticationError() except RSCPNotAvailableError: raise NotAvailableError() except RSCPKeyError: raise except Exception: retry += 1 if retry > retries: raise SendError("Max retries reached") if not keepAlive: self.rscp.disconnect() return result
[docs] def sendRequestTag( self, tag: str | int | RscpTag, retries: int = 3, keepAlive: bool = False ): """This function uses the RSCP interface to make a request for a single tag. Does make retries in case of exceptions like Socket.Error Args: tag (str): the request to send retries (int): number of retries. Defaults to 3. keepAlive (bool): True to keep connection alive. Defaults to False. Returns: An object with the received data Raises: e3dc.AuthenticationError: login error e3dc.SendError: if retries are reached """ return self.sendRequest( (tag, RscpType.NoneType, None), retries=retries, keepAlive=keepAlive )[2]
[docs] def disconnect(self): """This function does disconnect the connection.""" self.rscp.disconnect()
[docs] def poll(self, keepAlive: bool = False): """Polls via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the condensed status information structured as follows:: { "autarky": <autarky in %>, "consumption": { "battery": <power entering battery (positive: charging, negative: discharging)>, "house": <house consumption>, "wallbox": <wallbox consumption> } "production": { "solar" : <production from solar in W>, "add" : <additional external power in W>, "grid" : <absorption from grid in W> } "stateOfCharge": <battery charge status in %>, "selfConsumption": <self consumed power in %>, "time": <datetime object containing the timestamp> } """ if ( self.lastRequest is not None and (time.time() - self.lastRequestTime) < REQUEST_INTERVAL_SEC_LOCAL ): return self.lastRequest ts = self.sendRequestTag(RscpTag.INFO_REQ_UTC_TIME, keepAlive=True) soc = self.sendRequestTag(RscpTag.EMS_REQ_BAT_SOC, keepAlive=True) solar = self.sendRequestTag(RscpTag.EMS_REQ_POWER_PV, keepAlive=True) add = self.sendRequestTag(RscpTag.EMS_REQ_POWER_ADD, keepAlive=True) bat = self.sendRequestTag(RscpTag.EMS_REQ_POWER_BAT, keepAlive=True) home = self.sendRequestTag(RscpTag.EMS_REQ_POWER_HOME, keepAlive=True) grid = self.sendRequestTag(RscpTag.EMS_REQ_POWER_GRID, keepAlive=True) wb = self.sendRequestTag(RscpTag.EMS_REQ_POWER_WB_ALL, keepAlive=True) sc = self.sendRequestTag(RscpTag.EMS_REQ_SELF_CONSUMPTION, keepAlive=True) # last call, use keepAlive value autarky = self.sendRequestTag(RscpTag.EMS_REQ_AUTARKY, keepAlive=keepAlive) outObj = { "autarky": autarky, "consumption": {"battery": bat, "house": home, "wallbox": wb}, "production": {"solar": solar, "add": -add, "grid": grid}, "selfConsumption": sc, "stateOfCharge": soc, "time": datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc), } self.lastRequest = outObj self.lastRequestTime = time.time() return outObj
[docs] def poll_switches(self, keepAlive: bool = False): """This function uses the RSCP interface to poll the switch status. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: list of the switches:: [ { "id": <id>, "type": <type>, "name": <name>, "status": <status> } ] """ if not self.rscp.isConnected(): self.rscp.connect() switchDesc = self.sendRequest( (RscpTag.HA_REQ_DATAPOINT_LIST, RscpType.NoneType, None), keepAlive=True ) switchStatus = self.sendRequest( (RscpTag.HA_REQ_ACTUATOR_STATES, RscpType.NoneType, None), keepAlive=keepAlive, ) descList = switchDesc[2] # get the payload of the container statusList = switchStatus[2] switchList: list[dict[str, Any]] = [] for switch in range(len(descList)): switchID = rscpFindTagIndex(descList[switch], RscpTag.HA_DATAPOINT_INDEX) switchType = rscpFindTagIndex(descList[switch], RscpTag.HA_DATAPOINT_TYPE) switchName = rscpFindTagIndex(descList[switch], RscpTag.HA_DATAPOINT_NAME) switchStatus = rscpFindTagIndex( statusList[switch], RscpTag.HA_DATAPOINT_STATE ) switchList.append( { "id": switchID, "type": switchType, "name": switchName, "status": switchStatus, } ) return switchList
[docs] def set_switch_onoff( self, switchID: int, value: Literal["on", "off"], keepAlive: bool = False ): """This function uses the RSCP interface to turn a switch on or off. Args: switchID (int): id of the switch value (str): value keepAlive (bool): True to keep connection alive. Defaults to False. Returns: True/False """ result = self.sendRequest( ( RscpTag.HA_REQ_COMMAND_ACTUATOR, RscpType.Container, [ (RscpTag.HA_DATAPOINT_INDEX, RscpType.Uint16, switchID), (RscpTag.HA_REQ_COMMAND, RscpType.CString, value), ], ), keepAlive=keepAlive, ) if result[0] == RscpTag.HA_COMMAND_ACTUATOR and result[2]: return True else: return False # operation did not succeed
[docs] def get_idle_periods(self, keepAlive: bool = False): """Poll via rscp protocol to get idle periods. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the idle periods structured as follows:: { "idleCharge": [ { "day": <the week day from 0 to 6>, "start": ( <hour from 0 to 23>, <minute from 0 to 59> ), "end": ( <hour from 0 to 23>, <minute from 0 to 59> ), "active": <boolean of state> } ], "idleDischarge": [ { "day": <the week day from 0 to 6>, "start": ( <hour from 0 to 23>, <minute from 0 to 59> ), "end": ( <hour from 0 to 23>, <minute from 0 to 59> ), "active": <boolean of state> } ] } """ idlePeriodsRaw = self.sendRequest( (RscpTag.EMS_REQ_GET_IDLE_PERIODS, RscpType.NoneType, None), keepAlive=keepAlive, ) raw = idlePeriodsRaw[0] if isinstance(raw, RscpTag): tag = raw elif isinstance(raw, str): try: tag = RscpTag[raw] except KeyError: return None else: return None if tag != RscpTag.EMS_GET_IDLE_PERIODS: return None idlePeriods: dict[str, list[dict[str, Any]]] = { "idleCharge": [] * 7, "idleDischarge": [] * 7, } # initialize for period in idlePeriodsRaw[2]: active: bool = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_ACTIVE) typ = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_TYPE) day: int = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_DAY) start = rscpFindTag(period, RscpTag.EMS_IDLE_PERIOD_START) startHour: int = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_HOUR) startMin: int = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_MINUTE) end = rscpFindTag(period, RscpTag.EMS_IDLE_PERIOD_END) endHour: int = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_HOUR) endMin: int = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_MINUTE) periodObj = { "day": day, "start": (startHour, startMin), "end": (endHour, endMin), "active": active, } if typ == self._IDLE_TYPE["idleCharge"]: idlePeriods["idleCharge"][day] = periodObj else: idlePeriods["idleDischarge"][day] = periodObj return idlePeriods
[docs] def set_idle_periods( self, idlePeriods: dict[str, list[dict[str, Any]]], keepAlive: bool = False ): """Set idle periods via rscp protocol. Args: idlePeriods (dict): Dictionary containing one or many idle periods:: { "idleCharge": [ { "day": <the week day from 0 to 6>, "start": ( <hour from 0 to 23>, <minute from 0 to 59> ), "end": ( <hour from 0 to 23>, <minute from 0 to 59> ), "active": <boolean of state> } ], "idleDischarge": [ { "day": <the week day from 0 to 6>, "start": ( <hour from 0 to 23>, <minute from 0 to 59> ), "end": ( <hour from 0 to 23>, <minute from 0 to 59> ), "active": <boolean of state> } ] } keepAlive (bool): True to keep connection alive. Defaults to False. Returns: True if success False if error """ periodList: list[tuple[RscpTag, RscpType, Any]] = [] if "idleCharge" not in idlePeriods and "idleDischarge" not in idlePeriods: raise ValueError("neither key idleCharge nor idleDischarge in object") for idle_type in ["idleCharge", "idleDischarge"]: if idle_type in idlePeriods: for idlePeriod in idlePeriods[idle_type]: if "day" not in idlePeriod: raise ValueError("day key in " + idle_type + " missing") elif isinstance(idlePeriod["day"], bool): raise TypeError("day in " + idle_type + " not a bool") elif not (0 <= idlePeriod["day"] <= 6): raise ValueError("day in " + idle_type + " out of range") if idlePeriod.keys() & ["active", "start", "end"]: if "active" in idlePeriod: if isinstance(idlePeriod["active"], bool): idlePeriod["active"] = idlePeriod["active"] else: raise TypeError( "period " + str(idlePeriod["day"]) + " in " + idle_type + " not a bool" ) for key in ["start", "end"]: if key in idlePeriod: if ( isinstance(idlePeriod[key], list) and len(idlePeriod[key]) == 2 ): for i in range(2): if isinstance(idlePeriod[key][i], int): if idlePeriod[key][i] >= 0 and ( (i == 0 and idlePeriod[key][i] < 24) or (i == 1 and idlePeriod[key][i] < 60) ): idlePeriod[key][i] = idlePeriod[key][i] else: raise ValueError( key in " period " + str(idlePeriod["day"]) + " in " + idle_type + " is not between 00:00 and 23:59" ) if (idlePeriod["start"][0] * 60 + idlePeriod["start"][1]) < ( idlePeriod["end"][0] * 60 + idlePeriod["end"][1] ): periodList.append( ( RscpTag.EMS_IDLE_PERIOD, RscpType.Container, [ ( RscpTag.EMS_IDLE_PERIOD_TYPE, RscpType.UChar8, self._IDLE_TYPE[idle_type], ), ( RscpTag.EMS_IDLE_PERIOD_DAY, RscpType.UChar8, idlePeriod["day"], ), ( RscpTag.EMS_IDLE_PERIOD_ACTIVE, RscpType.Bool, idlePeriod["active"], ), ( RscpTag.EMS_IDLE_PERIOD_START, RscpType.Container, [ ( RscpTag.EMS_IDLE_PERIOD_HOUR, RscpType.UChar8, idlePeriod["start"][0], ), ( RscpTag.EMS_IDLE_PERIOD_MINUTE, RscpType.UChar8, idlePeriod["start"][1], ), ], ), ( RscpTag.EMS_IDLE_PERIOD_END, RscpType.Container, [ ( RscpTag.EMS_IDLE_PERIOD_HOUR, RscpType.UChar8, idlePeriod["end"][0], ), ( RscpTag.EMS_IDLE_PERIOD_MINUTE, RscpType.UChar8, idlePeriod["end"][1], ), ], ), ], ) ) else: raise ValueError( "end time is smaller than start time in period " + str(idlePeriod["day"]) + " in " + idle_type + " is not between 00:00 and 23:59" ) else: raise TypeError("period in " + idle_type + " is not a dict") else: raise TypeError(idle_type + " is not a dict") result = self.sendRequest( (RscpTag.EMS_REQ_SET_IDLE_PERIODS, RscpType.Container, periodList), keepAlive=keepAlive, ) if result[0] != RscpTag.EMS_SET_IDLE_PERIODS or result[2] != 1: return False return True
[docs] def get_db_data_timestamp( self, startTimestamp: int, timespanSeconds: int, keepAlive: bool = False ): """Reads DB data and summed up values for the given timespan via rscp protocol. Args: startTimestamp (int): UNIX timestampt from where the db data should be collected timespanSeconds (int): number of seconds for which the data should be collected keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the stored db information structured as follows:: { "autarky": <autarky in the period in %>, "bat_power_in": <power entering battery, charging>, "bat_power_out": <power leaving battery, discharging>, "consumed_production": <power directly consumed in %>, "consumption": <self consumed power>, "grid_power_in": <power sent into the grid (production)>, "grid_power_out": <power taken from the grid (consumption)>, "startTimestamp": <timestamp from which db data is fetched of>, "stateOfCharge": <battery charge level in %>, "solarProduction": <power production>, "pm0Production": <power production>, "pm1Production": <power production>, "timespanSeconds": <timespan in seconds of which db data is collected> } """ if timespanSeconds == 0: return None response = self.sendRequest( ( RscpTag.DB_REQ_HISTORY_DATA_DAY, RscpType.Container, [ ( RscpTag.DB_REQ_HISTORY_TIME_START, RscpType.Uint64, startTimestamp, ), ( RscpTag.DB_REQ_HISTORY_TIME_INTERVAL, RscpType.Uint64, timespanSeconds, ), ( RscpTag.DB_REQ_HISTORY_TIME_SPAN, RscpType.Uint64, timespanSeconds, ), ], ), keepAlive=keepAlive, ) outObj = { "autarky": rscpFindTagIndex(response[2][0], RscpTag.DB_AUTARKY), "bat_power_in": rscpFindTagIndex(response[2][0], RscpTag.DB_BAT_POWER_IN), "bat_power_out": rscpFindTagIndex(response[2][0], RscpTag.DB_BAT_POWER_OUT), "consumed_production": rscpFindTagIndex( response[2][0], RscpTag.DB_CONSUMED_PRODUCTION ), "consumption": rscpFindTagIndex(response[2][0], RscpTag.DB_CONSUMPTION), "grid_power_in": rscpFindTagIndex(response[2][0], RscpTag.DB_GRID_POWER_IN), "grid_power_out": rscpFindTagIndex( response[2][0], RscpTag.DB_GRID_POWER_OUT ), "startTimestamp": startTimestamp, "stateOfCharge": rscpFindTagIndex( response[2][0], RscpTag.DB_BAT_CHARGE_LEVEL ), "solarProduction": rscpFindTagIndex(response[2][0], RscpTag.DB_DC_POWER), "pm0Production": rscpFindTagIndex(response[2][0], RscpTag.DB_PM_0_POWER), "pm1Production": rscpFindTagIndex(response[2][0], RscpTag.DB_PM_1_POWER), "timespanSeconds": timespanSeconds, } return outObj
[docs] def get_db_data( self, startDate: datetime.date = datetime.date.today(), timespan: Literal["DAY", "MONTH", "YEAR"] = "DAY", keepAlive: bool = False, ): """Reads DB data and summed up values for the given timespan via rscp protocol. Args: startDate (datetime.date): start date for timespan, default today. Depending on timespan given, the startDate is automatically adjusted to the first of the month or the year timespan (str): string specifying the time span ["DAY", "MONTH", "YEAR"] keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the stored db information structured as follows:: { "autarky": <autarky in the period in %>, "bat_power_in": <power entering battery, charging>, "bat_power_out": <power leaving battery, discharging>, "consumed_production": <power directly consumed in %>, "consumption": <self consumed power>, "grid_power_in": <power sent into the grid (production)>, "grid_power_out": <power taken from the grid (consumption)>, "startDate": <date from which db data is fetched of>, "stateOfCharge": <battery charge level in %>, "solarProduction": <power production>, "pm0Production": <power production>, "pm1Production": <power production>, "timespan": <timespan of which db data is collected>, "timespanSeconds": <timespan in seconds of which db data is collected> } """ if "YEAR" == timespan: requestDate = startDate.replace(day=1, month=1) span = 365 * 24 * 60 * 60 elif "MONTH" == timespan: requestDate = startDate.replace(day=1) num_days = monthrange(requestDate.year, requestDate.month)[1] span = num_days * 24 * 60 * 60 else: requestDate = startDate span = 24 * 60 * 60 startTimestamp = int(time.mktime(requestDate.timetuple())) outObj = self.get_db_data_timestamp( startTimestamp=startTimestamp, timespanSeconds=span, keepAlive=keepAlive ) if outObj is not None: del outObj["startTimestamp"] outObj["startDate"] = requestDate outObj["timespan"] = timespan outObj = {k: v for k, v in sorted(outObj.items())} return outObj
[docs] def get_system_info_static(self, keepAlive: bool = False): """Polls the static system info via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. """ self.deratePercent = ( self.sendRequestTag(RscpTag.EMS_REQ_DERATE_AT_PERCENT_VALUE, keepAlive=True) * 100 ) self.deratePower = self.sendRequestTag( RscpTag.EMS_REQ_DERATE_AT_POWER_VALUE, keepAlive=True ) self.installedPeakPower = self.sendRequestTag( RscpTag.EMS_REQ_INSTALLED_PEAK_POWER, keepAlive=True ) self.externalSourceAvailable = self.sendRequestTag( RscpTag.EMS_REQ_EXT_SRC_AVAILABLE, keepAlive=True ) self.macAddress = self.sendRequestTag( RscpTag.INFO_REQ_MAC_ADDRESS, keepAlive=True ) if ( not self.serialNumber ): # do not send this for a web connection because it screws up the handshake! self._set_serial( self.sendRequestTag(RscpTag.INFO_REQ_SERIAL_NUMBER, keepAlive=True) ) sys_specs = self.sendRequestTag( RscpTag.EMS_REQ_GET_SYS_SPECS, keepAlive=keepAlive ) for item in sys_specs: if ( rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "installedBatteryCapacity" ): self.installedBatteryCapacity = rscpFindTagIndex( item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) elif rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "maxAcPower": self.maxAcPower = rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_VALUE_INT) elif ( rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "maxBatChargePower" ): self.maxBatChargePower = rscpFindTagIndex( item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) elif ( rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "maxBatDischargPower" ): self.maxBatDischargePower = rscpFindTagIndex( item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) elif ( rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "startDischargeDefault" ): self.startDischargeDefault = rscpFindTagIndex( item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) # EMS_REQ_SPECIFICATION_VALUES return True
[docs] def get_system_info(self, keepAlive: bool = False): """Polls the system info via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the system info structured as follows:: { "deratePercent": <% of installed peak power the feed in will be derated>, "deratePower": <W at which the feed in will be derated>, "externalSourceAvailable": <wether an additional power meter is installed>, "installedBatteryCapacity": <installed Battery Capacity in W>, "installedPeakPower": <installed peak power in W>, "maxAcPower": <max AC power>, "macAddress": <the mac address>, "maxBatChargePower": <max Battery charge power>, "maxBatDischargePower": <max Battery discharge power>, "model": <model connected to>, "release": <release version>, "serial": <serial number of the system> } """ # use keepAlive setting for last request sw = self.sendRequestTag(RscpTag.INFO_REQ_SW_RELEASE, keepAlive=keepAlive) # EMS_EMERGENCY_POWER_STATUS outObj = { "deratePercent": self.deratePercent, "deratePower": self.deratePower, "externalSourceAvailable": self.externalSourceAvailable, "installedBatteryCapacity": self.installedBatteryCapacity, "installedPeakPower": self.installedPeakPower, "maxAcPower": self.maxAcPower, "macAddress": self.macAddress, "maxBatChargePower": self.maxBatChargePower, "maxBatDischargePower": self.maxBatDischargePower, "model": self.model, "release": sw, "serial": self.serialNumber, } return outObj
[docs] def get_system_status(self, keepAlive: bool = False): """Polls the system status via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the system status structured as follows:: { "dcdcAlive": <dcdc alive>, "powerMeterAlive": <power meter alive>, "batteryModuleAlive": <battery module alive>, "pvModuleAlive": <pv module alive>, "pvInverterInited": <pv inverter inited>, "serverConnectionAlive": <server connection alive>, "pvDerated": <pv derated due to deratePower limit reached>, "emsAlive": <emd alive>, "acModeBlocked": <ad mode blocked>, "sysConfChecked": <sys conf checked>, "emergencyPowerStarted": <emergency power started>, "emergencyPowerOverride": <emergency power override>, "wallBoxAlive": <wall box alive>, "powerSaveEnabled": <power save enabled>, "chargeIdlePeriodActive": <charge idle period active>, "dischargeIdlePeriodActive": <discharge idle period active>, "waitForWeatherBreakthrough": <wait for weather breakthrouhgh>, "rescueBatteryEnabled": <rescue battery enabled>, "emergencyReserveReached": <emergencey reserve reached>, "socSyncRequested": <soc sync requested> } """ # use keepAlive setting for last request sw = self.sendRequestTag(RscpTag.EMS_REQ_SYS_STATUS, keepAlive=keepAlive) SystemStatusBools = [bool(int(i)) for i in reversed(list(f"{sw:022b}"))] outObj = { "dcdcAlive": 0, "powerMeterAlive": 1, "batteryModuleAlive": 2, "pvModuleAlive": 3, "pvInverterInited": 4, "serverConnectionAlive": 5, "pvDerated": 6, "emsAlive": 7, # "acCouplingMode:2; // 8-9 "acModeBlocked": 10, "sysConfChecked": 11, "emergencyPowerStarted": 12, "emergencyPowerOverride": 13, "wallBoxAlive": 14, "powerSaveEnabled": 15, "chargeIdlePeriodActive": 16, "dischargeIdlePeriodActive": 17, "waitForWeatherBreakthrough": 18, # this status bit shows if weather regulated charge is active and the system is waiting for the sun power breakthrough. (PV power > derating power) "rescueBatteryEnabled": 19, "emergencyReserveReached": 20, "socSyncRequested": 21, } outObj = {k: SystemStatusBools[v] for k, v in outObj.items()} return outObj
[docs] def get_wallbox_data(self, wbIndex: int = 0, keepAlive: bool = False): """Polls the wallbox status via rscp protocol locally. Args: wbIndex (int | None): Index of the wallbox to poll data for keepAlive (bool | None): True to keep connection alive Returns: dict: Dictionary containing the wallbox status structured as follows:: { "appSoftware": <version of the app>, "batteryToCar": <true if the wallbox may use the battery, otherwise false>, "chargingActive": <true if charging is currently active, otherwise false>, "chargingCanceled": <true if charging was manually canceled, otherwise false>, "consumptionNet": <power currently consumed by the wallbox, provided by the grid in watts>, "consumptionSun": <power currently consumed by the wallbox, provided by the solar panels in watts>, "energyAll": <total consumed energy this month in watthours>, "energyNet": <consumed net energy this month in watthours>, "energySun": <consumed solar energy this month in watthours>, "index": <index of the requested wallbox>, "keyState": <state of the key switch at the wallbox>, "maxChargeCurrent": <configured maximum charge current in A>, "phases": <number of phases used for charging>, "schukoOn": <true if the connected schuko of the wallbox is on, otherwise false>, "soc": <state of charge>, "sunModeOn": <true if sun-only-mode is active, false if mixed mode is active> } """ req = self.sendRequest( ( RscpTag.WB_REQ_DATA, RscpType.Container, [ (RscpTag.WB_INDEX, RscpType.UChar8, wbIndex), (RscpTag.WB_REQ_EXTERN_DATA_ALG, RscpType.NoneType, None), (RscpTag.WB_REQ_EXTERN_DATA_SUN, RscpType.NoneType, None), (RscpTag.WB_REQ_EXTERN_DATA_NET, RscpType.NoneType, None), (RscpTag.WB_REQ_APP_SOFTWARE, RscpType.NoneType, None), (RscpTag.WB_REQ_KEY_STATE, RscpType.NoneType, None), ], ), keepAlive=True, ) outObj = { "index": rscpFindTagIndex(req, RscpTag.WB_INDEX), "appSoftware": rscpFindTagIndex(req, RscpTag.WB_APP_SOFTWARE), } extern_data_alg = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_ALG) if extern_data_alg is not None: extern_data = rscpFindTagIndex(extern_data_alg, RscpTag.WB_EXTERN_DATA) status_byte = extern_data[2] outObj["sunModeOn"] = (status_byte & 128) != 0 outObj["chargingCanceled"] = (status_byte & 64) != 0 outObj["chargingActive"] = (status_byte & 32) != 0 outObj["plugLocked"] = (status_byte & 16) != 0 outObj["plugged"] = (status_byte & 8) != 0 outObj["soc"] = extern_data[0] outObj["phases"] = extern_data[1] outObj["maxChargeCurrent"] = extern_data[3] outObj["schukoOn"] = extern_data[5] != 0 extern_data_sun = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_SUN) if extern_data_sun is not None: extern_data = rscpFindTagIndex(extern_data_sun, RscpTag.WB_EXTERN_DATA) outObj["consumptionSun"] = struct.unpack("h", extern_data[0:2])[0] outObj["energySun"] = struct.unpack("i", extern_data[2:6])[0] extern_data_net = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_NET) if extern_data_net is not None: extern_data = rscpFindTagIndex(extern_data_net, RscpTag.WB_EXTERN_DATA) outObj["consumptionNet"] = struct.unpack("h", extern_data[0:2])[0] outObj["energyNet"] = struct.unpack("i", extern_data[2:6])[0] if "energySun" in outObj and "energyNet" in outObj: outObj["energyAll"] = outObj["energyNet"] + outObj["energySun"] key_state = rscpFindTag(req, RscpTag.WB_KEY_STATE) if key_state is not None: outObj["keyState"] = rscpFindTagIndex(key_state, RscpTag.WB_KEY_STATE) req = self.sendRequest( (RscpTag.EMS_REQ_BATTERY_TO_CAR_MODE, RscpType.NoneType, None), keepAlive=keepAlive, ) battery_to_car = rscpFindTag(req, RscpTag.EMS_BATTERY_TO_CAR_MODE) if battery_to_car is not None: outObj["batteryToCar"] = rscpFindTagIndex( battery_to_car, RscpTag.EMS_BATTERY_TO_CAR_MODE ) outObj = {k: v for k, v in sorted(outObj.items())} return outObj
[docs] def set_wallbox_sunmode( self, enable: bool, wbIndex: int = 0, keepAlive: bool = False ) -> bool: """Sets the sun mode of the wallbox via rscp protocol locally. Args: enable (bool): True to enable sun mode, otherwise false, wbIndex (int | None): index of the requested wallbox, keepAlive (bool | None): True to keep connection alive Returns: True if success False if error """ return self.sendWallboxSetRequest( dataIndex=0, value=1 if enable else 2, wbIndex=wbIndex, keepAlive=keepAlive )
[docs] def set_wallbox_schuko( self, on: bool, wbIndex: int = 0, keepAlive: bool = False ) -> bool: """Sets the Schuko of the wallbox via rscp protocol locally. Args: on (bool): True to activate the Schuko, otherwise false wbIndex (int | None): index of the requested wallbox, keepAlive (bool | None): True to keep connection alive Returns: True if success (wallbox has understood the request, but might have ignored an unsupported value) False if error """ return self.sendWallboxSetRequest( dataIndex=5, value=1 if on else 0, wbIndex=wbIndex, keepAlive=keepAlive )
[docs] def set_wallbox_max_charge_current( self, max_charge_current: int, wbIndex: int = 0, keepAlive: bool = False ) -> bool: """Sets the maximum charge current of the wallbox via rscp protocol locally. Args: max_charge_current (int): maximum allowed charge current in A wbIndex (int | None): index of the requested wallbox, keepAlive (bool | None): True to keep connection alive Returns: True if success (wallbox has understood the request, but might have clipped the value) False if error """ return self.sendWallboxSetRequest( dataIndex=2, value=max_charge_current, request=RscpTag.WB_REQ_SET_PARAM_1, wbIndex=wbIndex, keepAlive=keepAlive, )
[docs] def toggle_wallbox_charging( self, wbIndex: int = 0, keepAlive: bool = False ) -> bool: """Toggles charging of the wallbox via rscp protocol locally. Args: wbIndex (int | None): index of the requested wallbox, keepAlive (bool | None): True to keep connection alive Returns: True if success False if error """ return self.sendWallboxSetRequest( dataIndex=4, value=1, wbIndex=wbIndex, keepAlive=keepAlive )
[docs] def toggle_wallbox_phases(self, wbIndex: int = 0, keepAlive: bool = False) -> bool: """Toggles the number of phases used for charging by the wallbox between 1 and 3 via rscp protocol locally. Args: wbIndex (int | None): index of the requested wallbox, keepAlive (bool | None): True to keep connection alive Returns: True if success False if error """ return self.sendWallboxSetRequest( dataIndex=3, value=1, wbIndex=wbIndex, keepAlive=keepAlive )
[docs] def sendWallboxRequest( self, dataIndex: int, value: int, request: RscpTag = RscpTag.WB_REQ_SET_EXTERN, wbIndex: int = 0, keepAlive: bool = False, ) -> tuple[str | int | RscpTag, str | int | RscpType, Any]: """Sends a low-level request with WB_EXTERN_DATA to the wallbox via rscp protocol locally. Args: dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5) value (int): byte value to be set in the WB_EXTERN_DATA array at the given index request (RscpTag | None): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2), wbIndex (int | None): index of the requested wallbox, keepAlive (bool | None): True to keep connection alive Returns: An object with the received data """ dataArray = bytearray([0, 0, 0, 0, 0, 0]) dataArray[dataIndex] = value result = self.sendRequest( ( RscpTag.WB_REQ_DATA, RscpType.Container, [ (RscpTag.WB_INDEX, RscpType.UChar8, wbIndex), ( request, RscpType.Container, [ (RscpTag.WB_EXTERN_DATA, RscpType.ByteArray, dataArray), ( RscpTag.WB_EXTERN_DATA_LEN, RscpType.UChar8, len(dataArray), ), ], ), ], ), keepAlive=keepAlive, ) return result
[docs] def sendWallboxSetRequest( self, dataIndex: int, value: int, request: RscpTag = RscpTag.WB_REQ_SET_EXTERN, wbIndex: int = 0, keepAlive: bool = False, ) -> bool: """Sends a low-level set request with WB_EXTERN_DATA to the wallbox via rscp protocol locally and evaluates the response. Args: dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5) value (int): byte value to be set in the WB_EXTERN_DATA array at the given index request (RscpTag | None): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2), wbIndex (int | None): index of the requested wallbox, keepAlive (bool | None): True to keep connection alive Returns: True if success False if error """ response = self.sendWallboxRequest( dataIndex, value, request, wbIndex, keepAlive ) if response[0] != RscpTag.WB_DATA.name: return False responseData = response[2][-1] return ( responseData[0][2:] == request.name[6:] and responseData[1] != RscpType.Error.name )
[docs] def set_battery_to_car_mode(self, enabled: bool, keepAlive: bool = False): """Sets whether the wallbox may use the battery. Args: enabled (bool): True to enable charging the car using the battery keepAlive (bool | None): True to keep connection alive Returns: True if success False if error """ enabledValue = 1 if enabled else 0 response = self.sendRequest( (RscpTag.EMS_REQ_SET_BATTERY_TO_CAR_MODE, RscpType.UChar8, enabledValue), keepAlive=keepAlive, ) return response == ( RscpTag.EMS_SET_BATTERY_TO_CAR_MODE.name, RscpType.UChar8.name, enabledValue, )
[docs] def get_batteries(self, keepAlive: bool = False): """Scans for installed batteries via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: List containing the found batteries as follows.: [ {'index': 0, "dcbs": 3} ] """ maxBatteries = 8 outObj: list[dict[str, int]] = [] for batIndex in range(maxBatteries): try: req = self.sendRequest( ( RscpTag.BAT_REQ_DATA, RscpType.Container, [ (RscpTag.BAT_INDEX, RscpType.Uint16, batIndex), (RscpTag.BAT_REQ_DCB_COUNT, RscpType.NoneType, None), ], ), keepAlive=True if batIndex < (maxBatteries - 1) else keepAlive, ) except NotAvailableError: continue dcbCount = rscpFindTagIndex(req, RscpTag.BAT_DCB_COUNT) if dcbCount is not None: outObj.append( { "index": batIndex, "dcbs": dcbCount, } ) return outObj
[docs] def get_battery_data( self, batIndex: int | None = None, dcbs: list[int] | None = None, keepAlive: bool = False, ): """Polls the battery data via rscp protocol. Args: batIndex (int | None): battery index dcbs (list | None): dcb list keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the battery data structured as follows:: { "asoc": <absolute state of charge>, "chargeCycles": <charge cycles>, "current": <current>, "dcbCount": <dcb count>, "dcbs": {0: { "current": <current>, "currentAvg30s": <current average 30s>, "cycleCount": <cycle count>, "designCapacity": <design capacity>, "designVoltage": <design voltage>, "deviceName": <device name>, "endOfDischarge": <end of discharge>, "error": <error>, "fullChargeCapacity": <full charge capacity>, "fwVersion": <firmware version>, "manufactureDate": <manufacture date>, "manufactureName": <manufacture name>, "maxChargeCurrent": <max charge current>, "maxChargeTemperature": <max charge temperature>, "maxChargeVoltage": <max charge voltage>, "maxDischargeCurrent": <max discharge current>, "minChargeTemperature": <min charge temperature>, "parallelCellCount": <parallel cell count>, "sensorCount": <sensor countt>, "seriesCellCount": <cells in series count>, "pcbVersion": <pcb version>, "protocolVersion": <protocol version>, "remainingCapacity": <remaining capacity>, "serialCode": <serial code>, "serialNo": <serial no>, "soc": <state of charge>, "soh": <state of health>, "status": <status>, "temperatures": <temperatures>, "voltage": <voltage>, "voltageAvg30s": <voltage average 30s>, "voltages": <voltages>, "warning": <warning> } }, "designCapacity": <design capacity>, "deviceConnected": <device connected>, "deviceInService": <device in service>, "deviceName": <device name>, "deviceWorking": <device working>, "eodVoltage": <eod voltage>, "errorCode": <error code>, "fcc": <full charge capacity>, "index": <batIndex>, "maxBatVoltage": <max battery voltage>, "maxChargeCurrent": <max charge current>, "maxDischargeCurrent": <max discharge current>, "maxDcbCellTemp": <max DCB cell temp>, "minDcbCellTemp": <min DCB cell temp>, "moduleVoltage": <module voltage>, "rc": <rc>, "readyForShutdown": <ready for shutdown>, "rsoc": <relative state of charge>, "rsocReal": <real relative state of charge>, "statusCode": <status code>, "terminalVoltage": <terminal voltage>, "totalUseTime": <total use time>, "totalDischargeTime": <total discharge time>, "trainingMode": <training mode>, "usuableCapacity": <usuable capacity> "usuableRemainingCapacity": <usuable remaining capacity> } """ if batIndex is None: batIndex = self.batteries[0]["index"] req = self.sendRequest( ( RscpTag.BAT_REQ_DATA, RscpType.Container, [ (RscpTag.BAT_INDEX, RscpType.Uint16, batIndex), (RscpTag.BAT_REQ_ASOC, RscpType.NoneType, None), (RscpTag.BAT_REQ_CHARGE_CYCLES, RscpType.NoneType, None), (RscpTag.BAT_REQ_CURRENT, RscpType.NoneType, None), (RscpTag.BAT_REQ_DCB_COUNT, RscpType.NoneType, None), (RscpTag.BAT_REQ_DESIGN_CAPACITY, RscpType.NoneType, None), (RscpTag.BAT_REQ_DEVICE_NAME, RscpType.NoneType, None), (RscpTag.BAT_REQ_DEVICE_STATE, RscpType.NoneType, None), (RscpTag.BAT_REQ_EOD_VOLTAGE, RscpType.NoneType, None), (RscpTag.BAT_REQ_ERROR_CODE, RscpType.NoneType, None), (RscpTag.BAT_REQ_FCC, RscpType.NoneType, None), (RscpTag.BAT_REQ_MAX_BAT_VOLTAGE, RscpType.NoneType, None), (RscpTag.BAT_REQ_MAX_CHARGE_CURRENT, RscpType.NoneType, None), ( RscpTag.BAT_REQ_MAX_DISCHARGE_CURRENT, RscpType.NoneType, None, ), ( RscpTag.BAT_REQ_MAX_DCB_CELL_TEMPERATURE, RscpType.NoneType, None, ), ( RscpTag.BAT_REQ_MIN_DCB_CELL_TEMPERATURE, RscpType.NoneType, None, ), (RscpTag.BAT_REQ_INTERNALS, RscpType.NoneType, None), (RscpTag.BAT_REQ_MODULE_VOLTAGE, RscpType.NoneType, None), (RscpTag.BAT_REQ_RC, RscpType.NoneType, None), (RscpTag.BAT_REQ_READY_FOR_SHUTDOWN, RscpType.NoneType, None), (RscpTag.BAT_REQ_RSOC, RscpType.NoneType, None), (RscpTag.BAT_REQ_RSOC_REAL, RscpType.NoneType, None), (RscpTag.BAT_REQ_STATUS_CODE, RscpType.NoneType, None), (RscpTag.BAT_REQ_TERMINAL_VOLTAGE, RscpType.NoneType, None), (RscpTag.BAT_REQ_TOTAL_USE_TIME, RscpType.NoneType, None), (RscpTag.BAT_REQ_TOTAL_DISCHARGE_TIME, RscpType.NoneType, None), (RscpTag.BAT_REQ_TRAINING_MODE, RscpType.NoneType, None), (RscpTag.BAT_REQ_USABLE_CAPACITY, RscpType.NoneType, None), ( RscpTag.BAT_REQ_USABLE_REMAINING_CAPACITY, RscpType.NoneType, None, ), ], ), keepAlive=True, ) dcbCount = rscpFindTagIndex(req, RscpTag.BAT_DCB_COUNT) deviceStateContainer = rscpFindTag(req, RscpTag.BAT_DEVICE_STATE) outObj: dict[str, Any] = { "asoc": rscpFindTagIndex(req, RscpTag.BAT_ASOC), "chargeCycles": rscpFindTagIndex(req, RscpTag.BAT_CHARGE_CYCLES), "current": rscpFindTagIndex(req, RscpTag.BAT_CURRENT), "dcbCount": dcbCount, "dcbs": {}, "designCapacity": rscpFindTagIndex(req, RscpTag.BAT_DESIGN_CAPACITY), "deviceConnected": rscpFindTagIndex( deviceStateContainer, RscpTag.BAT_DEVICE_CONNECTED ), "deviceInService": rscpFindTagIndex( deviceStateContainer, RscpTag.BAT_DEVICE_IN_SERVICE ), "deviceName": rscpFindTagIndex(req, RscpTag.BAT_DEVICE_NAME), "deviceWorking": rscpFindTagIndex( deviceStateContainer, RscpTag.BAT_DEVICE_WORKING ), "eodVoltage": rscpFindTagIndex(req, RscpTag.BAT_EOD_VOLTAGE), "errorCode": rscpFindTagIndex(req, RscpTag.BAT_ERROR_CODE), "fcc": rscpFindTagIndex(req, RscpTag.BAT_FCC), "index": batIndex, "maxBatVoltage": rscpFindTagIndex(req, RscpTag.BAT_MAX_BAT_VOLTAGE), "maxChargeCurrent": rscpFindTagIndex(req, RscpTag.BAT_MAX_CHARGE_CURRENT), "maxDischargeCurrent": rscpFindTagIndex( req, RscpTag.BAT_MAX_DISCHARGE_CURRENT ), "maxDcbCellTemp": rscpFindTagIndex( req, RscpTag.BAT_MAX_DCB_CELL_TEMPERATURE ), "minDcbCellTemp": rscpFindTagIndex( req, RscpTag.BAT_MIN_DCB_CELL_TEMPERATURE ), "moduleVoltage": rscpFindTagIndex(req, RscpTag.BAT_MODULE_VOLTAGE), "rc": rscpFindTagIndex(req, RscpTag.BAT_RC), "readyForShutdown": rscpFindTagIndex(req, RscpTag.BAT_READY_FOR_SHUTDOWN), "rsoc": rscpFindTagIndex(req, RscpTag.BAT_RSOC), "rsocReal": rscpFindTagIndex(req, RscpTag.BAT_RSOC_REAL), "statusCode": rscpFindTagIndex(req, RscpTag.BAT_STATUS_CODE), "terminalVoltage": rscpFindTagIndex(req, RscpTag.BAT_TERMINAL_VOLTAGE), "totalUseTime": rscpFindTagIndex(req, RscpTag.BAT_TOTAL_USE_TIME), "totalDischargeTime": rscpFindTagIndex( req, RscpTag.BAT_TOTAL_DISCHARGE_TIME ), "trainingMode": rscpFindTagIndex(req, RscpTag.BAT_TRAINING_MODE), "usuableCapacity": rscpFindTagIndex(req, RscpTag.BAT_USABLE_CAPACITY), "usuableRemainingCapacity": rscpFindTagIndex( req, RscpTag.BAT_USABLE_REMAINING_CAPACITY ), } if dcbs is None: dcbs = list(range(0, dcbCount)) for dcb in dcbs: req = self.sendRequest( ( RscpTag.BAT_REQ_DATA, RscpType.Container, [ (RscpTag.BAT_INDEX, RscpType.Uint16, batIndex), ( RscpTag.BAT_REQ_DCB_ALL_CELL_TEMPERATURES, RscpType.Uint16, dcb, ), ( RscpTag.BAT_REQ_DCB_ALL_CELL_VOLTAGES, RscpType.Uint16, dcb, ), (RscpTag.BAT_REQ_DCB_INFO, RscpType.Uint16, dcb), ], ), keepAlive=( True if dcb != dcbs[-1] else keepAlive ), # last request should honor keepAlive ) info = rscpFindTag(req, RscpTag.BAT_DCB_INFO) # For some devices, no info for the DCBs exists. Skip those. if info is None or len(info) < 3 or info[1] == "Error": continue # Initialize default values for DCB sensorCount = 0 temperatures: list[float] = [] seriesCellCount = 0 voltages: list[float] = [] # Set temperatures, if available for the device temperatures_raw = rscpFindTag(req, RscpTag.BAT_DCB_ALL_CELL_TEMPERATURES) if ( temperatures_raw is not None and len(temperatures_raw) == 3 and temperatures_raw[1] != "Error" ): temperatures_data = rscpFindTagIndex(temperatures_raw, RscpTag.BAT_DATA) sensorCount = rscpFindTagIndex(info, RscpTag.BAT_DCB_NR_SENSOR) # As sensorCount can return bigger values than we have actual temperatures_data, # we use the smaller count for robustness. sensors = min(sensorCount, len(temperatures_data)) for sensor in range(0, sensors): temperatures.append(temperatures_data[sensor][2]) # Set voltages, if available for the device voltages_raw = rscpFindTag(req, RscpTag.BAT_DCB_ALL_CELL_VOLTAGES) if ( voltages_raw is not None and len(voltages_raw) == 3 and voltages_raw[1] != "Error" ): voltages_data = rscpFindTagIndex(voltages_raw, RscpTag.BAT_DATA) for cell_voltage in voltages_data: voltages.append(cell_voltage[2]) dcbobj: dict[str, Any] = { "current": rscpFindTagIndex(info, RscpTag.BAT_DCB_CURRENT), "currentAvg30s": rscpFindTagIndex( info, RscpTag.BAT_DCB_CURRENT_AVG_30S ), "cycleCount": rscpFindTagIndex(info, RscpTag.BAT_DCB_CYCLE_COUNT), "designCapacity": rscpFindTagIndex( info, RscpTag.BAT_DCB_DESIGN_CAPACITY ), "designVoltage": rscpFindTagIndex(info, RscpTag.BAT_DCB_DESIGN_VOLTAGE), "deviceName": rscpFindTagIndex(info, RscpTag.BAT_DCB_DEVICE_NAME), "endOfDischarge": rscpFindTagIndex( info, RscpTag.BAT_DCB_END_OF_DISCHARGE ), "error": rscpFindTagIndex(info, RscpTag.BAT_DCB_ERROR), "fullChargeCapacity": rscpFindTagIndex( info, RscpTag.BAT_DCB_FULL_CHARGE_CAPACITY ), "fwVersion": rscpFindTagIndex(info, RscpTag.BAT_DCB_FW_VERSION), "manufactureDate": rscpFindTagIndex( info, RscpTag.BAT_DCB_MANUFACTURE_DATE ), "manufactureName": rscpFindTagIndex( info, RscpTag.BAT_DCB_MANUFACTURE_NAME ), "maxChargeCurrent": rscpFindTagIndex( info, RscpTag.BAT_DCB_MAX_CHARGE_CURRENT ), "maxChargeTemperature": rscpFindTagIndex( info, RscpTag.BAT_DCB_CHARGE_HIGH_TEMPERATURE ), "maxChargeVoltage": rscpFindTagIndex( info, RscpTag.BAT_DCB_MAX_CHARGE_VOLTAGE ), "maxDischargeCurrent": rscpFindTagIndex( info, RscpTag.BAT_DCB_MAX_DISCHARGE_CURRENT ), "minChargeTemperature": rscpFindTagIndex( info, RscpTag.BAT_DCB_CHARGE_LOW_TEMPERATURE ), "parallelCellCount": rscpFindTagIndex( info, RscpTag.BAT_DCB_NR_PARALLEL_CELL ), "sensorCount": sensorCount, "seriesCellCount": seriesCellCount, "pcbVersion": rscpFindTagIndex(info, RscpTag.BAT_DCB_PCB_VERSION), "protocolVersion": rscpFindTagIndex( info, RscpTag.BAT_DCB_PROTOCOL_VERSION ), "remainingCapacity": rscpFindTagIndex( info, RscpTag.BAT_DCB_REMAINING_CAPACITY ), "serialCode": rscpFindTagIndex(info, RscpTag.BAT_DCB_SERIALCODE), "serialNo": rscpFindTagIndex(info, RscpTag.BAT_DCB_SERIALNO), "soc": rscpFindTagIndex(info, RscpTag.BAT_DCB_SOC), "soh": rscpFindTagIndex(info, RscpTag.BAT_DCB_SOH), "status": rscpFindTagIndex(info, RscpTag.BAT_DCB_STATUS), "temperatures": temperatures, "voltage": rscpFindTagIndex(info, RscpTag.BAT_DCB_VOLTAGE), "voltageAvg30s": rscpFindTagIndex( info, RscpTag.BAT_DCB_VOLTAGE_AVG_30S ), "voltages": voltages, "warning": rscpFindTagIndex(info, RscpTag.BAT_DCB_WARNING), } outObj["dcbs"].update({dcb: dcbobj}) # type: ignore return outObj
[docs] def get_batteries_data( self, batteries: list[dict[str, Any]] | None = None, keepAlive: bool = False ): """Polls the batteries data via rscp protocol. Args: batteries (dict | None): batteries dict keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: Returns a list of batteries data """ if batteries is None: batteries = self.batteries outObj: list[dict[str, Any]] = [] for battery in batteries: if "dcbs" in battery: dcbs = list(range(0, battery["dcbs"])) else: dcbs = None outObj.append( self.get_battery_data( batIndex=battery["index"], dcbs=dcbs, keepAlive=( True if battery["index"] != batteries[-1]["index"] else keepAlive ), # last request should honor keepAlive ) ) return outObj
[docs] def get_pvis(self, keepAlive: bool = False): """Scans for installed pvis via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: List containing the found pvis as follows.:: [ {'index': 0, "phases": 3, "strings": 2, 'type': 3, 'typeName': 'PVI_TYPE_E3DC_E'} ] """ maxPvis = 8 outObj: list[dict[str, Any]] = [] for pviIndex in range(maxPvis): req = self.sendRequest( ( RscpTag.PVI_REQ_DATA, "Container", [ (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), (RscpTag.PVI_REQ_TYPE, RscpType.NoneType, None), (RscpTag.PVI_REQ_USED_STRING_COUNT, RscpType.NoneType, None), (RscpTag.PVI_REQ_AC_MAX_PHASE_COUNT, RscpType.NoneType, None), ], ), keepAlive=True if pviIndex < (maxPvis - 1) else keepAlive, ) pviType = rscpFindTagIndex(req, RscpTag.PVI_TYPE) if pviType is not None: maxPhaseCount = int( rscpFindTagIndex(req, RscpTag.PVI_AC_MAX_PHASE_COUNT) ) usedStringCount = int( rscpFindTagIndex(req, RscpTag.PVI_USED_STRING_COUNT) ) outObj.append( { "index": pviIndex, "phases": maxPhaseCount, "strings": usedStringCount, "type": pviType, "typeName": getStrPviType(pviType), } ) return outObj
[docs] def get_pvi_data( self, pviIndex: int | None = None, strings: list[int] | None = None, phases: list[int] | None = None, keepAlive: bool = False, ): """Polls the inverter data via rscp protocol. Args: pviIndex (int): pv inverter index strings (list | None): string list phases (list | None): phase list keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the pvi data structured as follows:: { "acMaxApparentPower": <max apparent AC power>, "cosPhi": { "active": <active>, "value": <value>, "excited": <excited> }, "deviceState": { "connected": <connected>, "working": <working>, "inService": <in service> }, "frequency": { "under": <frequency under>, "over": <frequency over> }, "index": <pviIndex>, "lastError": <last error>, "maxPhaseCount": <max phase count>, "maxStringCount": <max string count>, "onGrid": <on grid>, "phases": { 0: { "power": <power>, "voltage": <voltage>, "current": <current>, "apparentPower": <apparent power>, "reactivePower": <reactive power>, "energyAll": <energy all>, "energyGridConsumption": <energy grid consumption> } }, "powerMode": <power mode>, "serialNumber": <serial number>, "state": <state>, "strings": { 0: { "power": <power>, "voltage": <voltage>, "current": <current>, "energyAll": <energy all> } }, "systemMode": <system mode>, "temperature": { "max": <max temperature>, "min": <min temperature>, "values": [<value>,<value>], }, "type": <type>, "version": <version>, "voltageMonitoring": { "thresholdTop": <voltage threshold top>, "thresholdBottom": <voltage threshold bottom>, "slopeUp": <voltage slope up>, "slopeDown": <voltage slope down>, } } """ if pviIndex is None: pviIndex = self.pvis[0]["index"] if phases is None and "phases" in self.pvis[0]: phases = list(range(0, self.pvis[0]["phases"])) req = self.sendRequest( ( RscpTag.PVI_REQ_DATA, RscpType.Container, [ (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), (RscpTag.PVI_REQ_AC_MAX_PHASE_COUNT, RscpType.NoneType, None), (RscpTag.PVI_REQ_TEMPERATURE_COUNT, RscpType.NoneType, None), (RscpTag.PVI_REQ_DC_MAX_STRING_COUNT, RscpType.NoneType, None), (RscpTag.PVI_REQ_USED_STRING_COUNT, RscpType.NoneType, None), (RscpTag.PVI_REQ_TYPE, RscpType.NoneType, None), (RscpTag.PVI_REQ_SERIAL_NUMBER, RscpType.NoneType, None), (RscpTag.PVI_REQ_VERSION, RscpType.NoneType, None), (RscpTag.PVI_REQ_ON_GRID, RscpType.NoneType, None), (RscpTag.PVI_REQ_STATE, RscpType.NoneType, None), (RscpTag.PVI_REQ_LAST_ERROR, RscpType.NoneType, None), (RscpTag.PVI_REQ_COS_PHI, RscpType.NoneType, None), (RscpTag.PVI_REQ_VOLTAGE_MONITORING, RscpType.NoneType, None), (RscpTag.PVI_REQ_POWER_MODE, RscpType.NoneType, None), (RscpTag.PVI_REQ_SYSTEM_MODE, RscpType.NoneType, None), (RscpTag.PVI_REQ_FREQUENCY_UNDER_OVER, RscpType.NoneType, None), (RscpTag.PVI_REQ_MAX_TEMPERATURE, RscpType.NoneType, None), (RscpTag.PVI_REQ_MIN_TEMPERATURE, RscpType.NoneType, None), (RscpTag.PVI_REQ_AC_MAX_APPARENTPOWER, RscpType.NoneType, None), (RscpTag.PVI_REQ_DEVICE_STATE, RscpType.NoneType, None), ], ), keepAlive=True, ) maxPhaseCount = int(rscpFindTagIndex(req, RscpTag.PVI_AC_MAX_PHASE_COUNT)) maxStringCount = int(rscpFindTagIndex(req, RscpTag.PVI_DC_MAX_STRING_COUNT)) usedStringCount = int(rscpFindTagIndex(req, RscpTag.PVI_USED_STRING_COUNT)) voltageMonitoring = rscpFindTag(req, RscpTag.PVI_VOLTAGE_MONITORING) cosPhi = rscpFindTag(req, RscpTag.PVI_COS_PHI) frequency = rscpFindTag(req, RscpTag.PVI_FREQUENCY_UNDER_OVER) deviceState = rscpFindTag(req, RscpTag.PVI_DEVICE_STATE) outObj: dict[str, Any] = { "acMaxApparentPower": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_MAX_APPARENTPOWER), RscpTag.PVI_VALUE ), "cosPhi": { "active": rscpFindTagIndex(cosPhi, RscpTag.PVI_COS_PHI_IS_AKTIV), "value": rscpFindTagIndex(cosPhi, RscpTag.PVI_COS_PHI_VALUE), "excited": rscpFindTagIndex(cosPhi, RscpTag.PVI_COS_PHI_EXCITED), }, "deviceState": { "connected": rscpFindTagIndex( deviceState, RscpTag.PVI_DEVICE_CONNECTED ), "working": rscpFindTagIndex(deviceState, RscpTag.PVI_DEVICE_WORKING), "inService": rscpFindTagIndex( deviceState, RscpTag.PVI_DEVICE_IN_SERVICE ), }, "frequency": { "under": rscpFindTagIndex(frequency, RscpTag.PVI_FREQUENCY_UNDER), "over": rscpFindTagIndex(frequency, RscpTag.PVI_FREQUENCY_OVER), }, "index": pviIndex, "lastError": rscpFindTagIndex(req, RscpTag.PVI_LAST_ERROR), "maxPhaseCount": maxPhaseCount, "maxStringCount": maxStringCount, "onGrid": rscpFindTagIndex(req, RscpTag.PVI_ON_GRID), "phases": {}, "powerMode": rscpFindTagIndex(req, RscpTag.PVI_POWER_MODE), "serialNumber": rscpFindTagIndex(req, RscpTag.PVI_SERIAL_NUMBER), "state": rscpFindTagIndex(req, RscpTag.PVI_STATE), "strings": {}, "systemMode": rscpFindTagIndex(req, RscpTag.PVI_SYSTEM_MODE), "temperature": { "max": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_MAX_TEMPERATURE), RscpTag.PVI_VALUE ), "min": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_MIN_TEMPERATURE), RscpTag.PVI_VALUE ), "values": [], }, "type": rscpFindTagIndex(req, RscpTag.PVI_TYPE), "version": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_VERSION), RscpTag.PVI_VERSION_MAIN ), "voltageMonitoring": { "thresholdTop": rscpFindTagIndex( voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_THRESHOLD_TOP ), "thresholdBottom": rscpFindTagIndex( voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_THRESHOLD_BOTTOM ), "slopeUp": rscpFindTagIndex( voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_SLOPE_UP ), "slopeDown": rscpFindTagIndex( voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_SLOPE_DOWN ), }, } temperatures = range( 0, int(rscpFindTagIndex(req, RscpTag.PVI_TEMPERATURE_COUNT)) ) for temperature in temperatures: req = self.sendRequest( ( RscpTag.PVI_REQ_DATA, RscpType.Container, [ (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), (RscpTag.PVI_REQ_TEMPERATURE, RscpType.Uint16, temperature), ], ), keepAlive=True, ) outObj["temperature"]["values"].append( # type: ignore rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_TEMPERATURE), RscpTag.PVI_VALUE ) ) if phases is None: phases = list(range(0, maxPhaseCount)) for phase in phases: req = self.sendRequest( ( RscpTag.PVI_REQ_DATA, RscpType.Container, [ (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), (RscpTag.PVI_REQ_AC_POWER, RscpType.Uint16, phase), (RscpTag.PVI_REQ_AC_VOLTAGE, RscpType.Uint16, phase), (RscpTag.PVI_REQ_AC_CURRENT, RscpType.Uint16, phase), (RscpTag.PVI_REQ_AC_APPARENTPOWER, RscpType.Uint16, phase), (RscpTag.PVI_REQ_AC_REACTIVEPOWER, RscpType.Uint16, phase), (RscpTag.PVI_REQ_AC_ENERGY_ALL, RscpType.Uint16, phase), ( RscpTag.PVI_REQ_AC_ENERGY_GRID_CONSUMPTION, RscpType.Uint16, phase, ), ], ), keepAlive=True, ) phaseobj = { "power": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_POWER), RscpTag.PVI_VALUE ), "voltage": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_VOLTAGE), RscpTag.PVI_VALUE ), "current": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_CURRENT), RscpTag.PVI_VALUE ), "apparentPower": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_APPARENTPOWER), RscpTag.PVI_VALUE, ), "reactivePower": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_REACTIVEPOWER), RscpTag.PVI_VALUE, ), "energyAll": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_ENERGY_ALL), RscpTag.PVI_VALUE ), "energyGridConsumption": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_ENERGY_GRID_CONSUMPTION), RscpTag.PVI_VALUE, ), } outObj["phases"].update({phase: phaseobj}) # type: ignore if strings is None: strings = list(range(0, usedStringCount)) for string in strings: req = self.sendRequest( ( RscpTag.PVI_REQ_DATA, RscpType.Container, [ (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), (RscpTag.PVI_REQ_DC_POWER, RscpType.Uint16, string), (RscpTag.PVI_REQ_DC_VOLTAGE, RscpType.Uint16, string), (RscpTag.PVI_REQ_DC_CURRENT, RscpType.Uint16, string), ( RscpTag.PVI_REQ_DC_STRING_ENERGY_ALL, RscpType.Uint16, string, ), ], ), keepAlive=( True if string != strings[-1] else keepAlive ), # last request should honor keepAlive ) stringobj = { "power": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_DC_POWER), RscpTag.PVI_VALUE ), "voltage": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_DC_VOLTAGE), RscpTag.PVI_VALUE ), "current": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_DC_CURRENT), RscpTag.PVI_VALUE ), "energyAll": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_DC_STRING_ENERGY_ALL), RscpTag.PVI_VALUE, ), } outObj["strings"].update({string: stringobj}) # type: ignore return outObj
[docs] def get_pvis_data( self, pvis: list[dict[str, Any]] | None = None, keepAlive: bool = False ): """Polls the inverters data via rscp protocol. Args: pvis (dict | None): pvis dict keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: Returns a list of pvi data """ if pvis is None: pvis = self.pvis outObj: list[dict[str, Any]] = [] for pvi in pvis: if "strings" in pvi: strings = list(range(0, pvi["strings"])) else: strings = None if "phases" in pvi: phases = list(range(0, pvi["phases"])) else: phases = None outObj.append( self.get_pvi_data( pviIndex=pvi["index"], strings=strings, phases=phases, keepAlive=( True if pvi["index"] != pvis[-1]["index"] else keepAlive ), # last request should honor keepAlive ) ) return outObj
[docs] def get_powermeters(self, keepAlive: bool = False): """Scans for installed power meters via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: List containing the found powermeters as follows.:: [ {'index': 0, 'type': 1, 'typeName': 'PM_TYPE_ROOT'}, {'index': 1, 'type': 4, 'typeName': 'PM_TYPE_ADDITIONAL_CONSUMPTION'} ] """ maxPowermeters = 8 outObj: list[dict[str, Any]] = [] for pmIndex in range( maxPowermeters ): # max 8 powermeters according to E3DC spec req = self.sendRequest( ( RscpTag.PM_REQ_DATA, RscpType.Container, [ (RscpTag.PM_INDEX, RscpType.Uint16, pmIndex), (RscpTag.PM_REQ_TYPE, RscpType.NoneType, None), ], ), keepAlive=True if pmIndex < (maxPowermeters - 1) else keepAlive, ) pmType = rscpFindTagIndex(req, RscpTag.PM_TYPE) if pmType is not None: outObj.append( { "index": pmIndex, "type": pmType, "typeName": getStrPowermeterType(pmType), } ) return outObj
[docs] def get_powermeter_data(self, pmIndex: int | None = None, keepAlive: bool = False): """Polls the power meter data via rscp protocol. Args: pmIndex (int | None): power meter index keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the power data structured as follows:: { "activePhases": <active phases>, "energy": { "L1": <L1 energy>, "L2": <L2 energy>, "L3": <L3 energy> }, "index": <pm index>, "maxPhasePower": <max phase power>, "mode": <mode>, "power": { "L1": <L1 power>, "L2": <L2 power>, "L3": <L3 power> }, "type": <type>, "voltage": { "L1": <L1 voltage>, "L2": <L1 voltage>, "L3": <L1 voltage> } } """ if pmIndex is None: pmIndex = self.powermeters[0]["index"] res = self.sendRequest( ( RscpTag.PM_REQ_DATA, RscpType.Container, [ (RscpTag.PM_INDEX, RscpType.Uint16, pmIndex), (RscpTag.PM_REQ_POWER_L1, RscpType.NoneType, None), (RscpTag.PM_REQ_POWER_L2, RscpType.NoneType, None), (RscpTag.PM_REQ_POWER_L3, RscpType.NoneType, None), (RscpTag.PM_REQ_VOLTAGE_L1, RscpType.NoneType, None), (RscpTag.PM_REQ_VOLTAGE_L2, RscpType.NoneType, None), (RscpTag.PM_REQ_VOLTAGE_L3, RscpType.NoneType, None), (RscpTag.PM_REQ_ENERGY_L1, RscpType.NoneType, None), (RscpTag.PM_REQ_ENERGY_L2, RscpType.NoneType, None), (RscpTag.PM_REQ_ENERGY_L3, RscpType.NoneType, None), (RscpTag.PM_REQ_MAX_PHASE_POWER, RscpType.NoneType, None), (RscpTag.PM_REQ_ACTIVE_PHASES, RscpType.NoneType, None), (RscpTag.PM_REQ_TYPE, RscpType.NoneType, None), (RscpTag.PM_REQ_MODE, RscpType.NoneType, None), ], ), keepAlive=keepAlive, ) activePhasesChar = rscpFindTagIndex(res, RscpTag.PM_ACTIVE_PHASES) activePhases = f"{activePhasesChar:03b}" outObj = { "activePhases": activePhases, "energy": { "L1": rscpFindTagIndex(res, RscpTag.PM_ENERGY_L1), "L2": rscpFindTagIndex(res, RscpTag.PM_ENERGY_L2), "L3": rscpFindTagIndex(res, RscpTag.PM_ENERGY_L3), }, "index": pmIndex, "maxPhasePower": rscpFindTagIndex(res, RscpTag.PM_MAX_PHASE_POWER), "mode": rscpFindTagIndex(res, RscpTag.PM_MODE), "power": { "L1": rscpFindTagIndex(res, RscpTag.PM_POWER_L1), "L2": rscpFindTagIndex(res, RscpTag.PM_POWER_L2), "L3": rscpFindTagIndex(res, RscpTag.PM_POWER_L3), }, "type": rscpFindTagIndex(res, RscpTag.PM_TYPE), "voltage": { "L1": rscpFindTagIndex(res, RscpTag.PM_VOLTAGE_L1), "L2": rscpFindTagIndex(res, RscpTag.PM_VOLTAGE_L2), "L3": rscpFindTagIndex(res, RscpTag.PM_VOLTAGE_L3), }, } return outObj
[docs] def get_powermeters_data( self, powermeters: list[dict[str, Any]] | None = None, keepAlive: bool = False ): """Polls the powermeters data via rscp protocol. Args: powermeters (dict | None): powermeters dict keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: Returns a list of powermeters data """ if powermeters is None: powermeters = self.powermeters outObj: list[dict[str, Any]] = [] for powermeter in powermeters: outObj.append( self.get_powermeter_data( pmIndex=powermeter["index"], keepAlive=( True if powermeter["index"] != powermeters[-1]["index"] else keepAlive ), # last request should honor keepAlive ) ) return outObj
[docs] def get_power_settings(self, keepAlive: bool = False): """Polls the power settings via rscp protocol. Args: keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the power settings structured as follows:: { "discharge_start_power": <minimum power requested to enable discharge>, "maxChargePower": <maximum charge power dependent on E3DC model>, "maxDischargePower": <maximum discharge power dependent on E3DC model>, "powerSaveEnabled": <status if power save is enabled>, "powerLimitsUsed": <status if power limites are enabled>, "weatherForecastMode": <Weather Forcast Mode>, "weatherRegulatedChargeEnabled": <status if weather regulated charge is enabled> } """ res = self.sendRequest( (RscpTag.EMS_REQ_GET_POWER_SETTINGS, RscpType.NoneType, None), keepAlive=keepAlive, ) dischargeStartPower = rscpFindTagIndex(res, RscpTag.EMS_DISCHARGE_START_POWER) maxChargePower = rscpFindTagIndex(res, RscpTag.EMS_MAX_CHARGE_POWER) maxDischargePower = rscpFindTagIndex(res, RscpTag.EMS_MAX_DISCHARGE_POWER) powerLimitsUsed = rscpFindTagIndex(res, RscpTag.EMS_POWER_LIMITS_USED) powerSaveEnabled = rscpFindTagIndex(res, RscpTag.EMS_POWERSAVE_ENABLED) weatherForecastMode = rscpFindTagIndex(res, RscpTag.EMS_WEATHER_FORECAST_MODE) weatherRegulatedChargeEnabled = rscpFindTagIndex( res, RscpTag.EMS_WEATHER_REGULATED_CHARGE_ENABLED ) outObj = { "dischargeStartPower": dischargeStartPower, "maxChargePower": maxChargePower, "maxDischargePower": maxDischargePower, "powerLimitsUsed": powerLimitsUsed, "powerSaveEnabled": powerSaveEnabled, "weatherForecastMode": weatherForecastMode, "weatherRegulatedChargeEnabled": weatherRegulatedChargeEnabled, } return outObj
[docs] def set_power_limits( self, enable: bool, max_charge: int | None = None, max_discharge: int | None = None, discharge_start: int | None = None, keepAlive: bool = False, ): """Setting the SmartPower power limits via rscp protocol. Args: enable (bool): True/False max_charge (int | None): maximum charge power max_discharge (int | None): maximum discharge power discharge_start (int | None): power where discharged is started keepAlive (bool): True to keep connection alive. Defaults to False. Returns: 0 if success -1 if error 1 if one value is nonoptimal """ if max_charge is None: max_charge = self.maxBatChargePower if max_discharge is None: max_discharge = self.maxBatDischargePower if discharge_start is None: discharge_start = self.startDischargeDefault if enable: res = self.sendRequest( ( RscpTag.EMS_REQ_SET_POWER_SETTINGS, RscpType.Container, [ (RscpTag.EMS_POWER_LIMITS_USED, RscpType.Bool, True), ( RscpTag.EMS_MAX_DISCHARGE_POWER, RscpType.Uint32, max_discharge, ), (RscpTag.EMS_MAX_CHARGE_POWER, RscpType.Uint32, max_charge), ( RscpTag.EMS_DISCHARGE_START_POWER, RscpType.Uint32, discharge_start, ), ], ), keepAlive=keepAlive, ) else: res = self.sendRequest( ( RscpTag.EMS_REQ_SET_POWER_SETTINGS, RscpType.Container, [(RscpTag.EMS_POWER_LIMITS_USED, RscpType.Bool, False)], ), keepAlive=keepAlive, ) # validate all return codes for each limit to be 0 for success, 1 for nonoptimal value and -1 for failure return_code = 0 for result in res[2]: if result[2] == -1: return_code = -1 elif result[2] == 1 and return_code == 0: return_code = 1 return return_code
[docs] def set_powersave(self, enable: bool, keepAlive: bool = False): """Setting the SmartPower power save via rscp protocol. Args: enable (bool): True/False keepAlive (bool): True to keep connection alive. Defaults to False. Returns: 0 if success -1 if error """ res = self.sendRequest( ( RscpTag.EMS_REQ_SET_POWER_SETTINGS, RscpType.Container, [(RscpTag.EMS_POWERSAVE_ENABLED, RscpType.UChar8, int(enable))], ), keepAlive=keepAlive, ) # Returns value of EMS_REQ_SET_POWER_SETTINGS, we get a success flag here, # that we normalize and push outside. # [ RscpTag.EMS_SET_POWER_SETTINGS, # RscpType.Container, # [ # [RscpTag.EMS_RES_POWERSAVE_ENABLED, "Char8", 0] # ] # ] if rscpFindTagIndex(res, RscpTag.EMS_RES_POWERSAVE_ENABLED) == 0: return 0 else: return -1
[docs] def set_weather_regulated_charge(self, enable: bool, keepAlive: bool = False): """Setting the SmartCharge weather regulated charge via rscp protocol. Args: enable (bool): True/False keepAlive (bool): True to keep connection alive. Defaults to False. Returns: 0 if success -1 if error """ if enable: res = self.sendRequest( ( RscpTag.EMS_REQ_SET_POWER_SETTINGS, RscpType.Container, [ ( RscpTag.EMS_WEATHER_REGULATED_CHARGE_ENABLED, RscpType.UChar8, 1, ) ], ), keepAlive=keepAlive, ) else: res = self.sendRequest( ( RscpTag.EMS_REQ_SET_POWER_SETTINGS, RscpType.Container, [ ( RscpTag.EMS_WEATHER_REGULATED_CHARGE_ENABLED, RscpType.UChar8, 0, ) ], ), keepAlive=keepAlive, ) # validate return code for EMS_RES_WEATHER_REGULATED_CHARGE_ENABLED is 0 if res[2][0][2] == 0: return 0 else: return -1