Source code for e3dc._e3dc_rscp_web

#!/usr/bin/env python
# Python class to connect to an E3/DC system.
#
# Copyright 2017 Francesco Santini <francesco.santini@gmail.com>
# Licensed under a MIT license. See LICENSE for details
import datetime
import hashlib
import struct
import threading
import time
from typing import Callable

import tzlocal
from websocket import ABNF, WebSocketApp

from ._rscpLib import (
    RscpMessage,
    rscpDecode,
    rscpEncode,
    rscpFindTag,
    rscpFindTagIndex,
    rscpFrame,
    rscpFrameDecode,
)
from ._rscpTags import RscpTag, RscpType, getRscpTag

"""
 The connection works the following way: (> outgoing, < incoming)

> [connect]
< SERVER_REGISTER_CONNECTION (conId, authLevel)
> SERVER_CONNECTION_REGISTERED
< SERVER_REQ_RSCP_CMD (information request from server)
> SERVER_REQ_RSCP_CMD (information back)
< INFO_SERIAL_NUMBER (webSerialno)
> SERVER_REQ_NEW_VIRTUAL_CONNECTION (username, password, sn)
< SERVER_REGISTER_CONNECTION (virtConId, virtAuthLevel)
> SERVER_CONNECTION_REGISTERED

 the connection is now established
 
 Communication is through SERVER_REQ_RSCP_CMD which is a container with:
 virtConId, virtAuthLevel, innerFrame
 
 and the innerframe is an RSCP frame with the appropriate command (HA_REQ_ACTUATOR_STATES for example)
 
"""

REMOTE_ADDRESS = "wss://s10.e3dc.com/ws"


[docs] class SocketNotReady(Exception): """Class for Socket Not Ready Exception.""" pass
[docs] class RequestTimeoutError(Exception): """Class for Request Timeout Error Exception.""" pass
def calcTimeZone(): """Method to calculate time zone. Returns: str: timezone string int: UTC diff """ localtz = tzlocal.get_localzone() naiveNow = datetime.datetime.now() utcDiff = localtz.utcoffset(naiveNow) # this is a timedelta utcDiffS = utcDiff.total_seconds() if utcDiff else 0 utcDiffH = int(utcDiffS / 60 / 60) # this is local - utc (eg 2 = UTC+2) TIMEZONE_STR = "GMT" + ("+" if utcDiffH > 0 else "-") + str(abs(utcDiffH)) return TIMEZONE_STR, utcDiffS def timestampEncode(ts: float): """Method to encode timestamp. Args: ts (time): timestamp """ sec = float(int(ts)) ms = int((ts - sec) * 1000) return struct.pack("<dI", sec, ms) class E3DC_RSCP_web: """A class describing an E3DC system connection using RSCP protocol over web.""" TIMEOUT = 10 # timeout in sec def __init__( self, username: str, password: str, serialNumberWithPrefix: str, isPasswordMd5: bool = True, ): """Constructor of an E3DC RSCP web object. Args: username (string): the user name to the E3DC portal password (string): the password (as md5 digest by default) serialNumberWithPrefix (string): the serial number of the system to monitor isPasswordMd5 (boolean, optional): indicates whether the password is already md5 digest (recommended, default = True) """ self.username = username.encode("utf-8") self.password = password.encode("utf-8") if isPasswordMd5: self.password = password else: self.password = hashlib.md5(bytes(password, "UTF-8")).hexdigest() self.serialNumberWithPrefix = serialNumberWithPrefix.encode("utf-8") self.ws = WebSocketApp( REMOTE_ADDRESS, on_message=lambda _, msg: self.on_message(msg), on_close=lambda _ws, _, __: self.reset(), on_error=lambda _ws, _: self.reset(), ) self.reset() def reset(self): """Method to reset E3DC rscp web instance.""" self.ws.close() # pyright: ignore [reportUnknownMemberType] self.conId: int = 0 self.authLevel = None self.virtConId = None self.virtAuthLevel = None self.webSerialno = None self.responseCallback: Callable[[RscpMessage], None] self.responseCallbackCalled = False self.requestResult: RscpMessage def buildVirtualConn(self): """Method to create Virtual Connection.""" virtualConn = rscpFrame( rscpEncode( RscpTag.SERVER_REQ_NEW_VIRTUAL_CONNECTION, RscpType.Container, [ (RscpTag.SERVER_USER, RscpType.CString, self.username), (RscpTag.SERVER_PASSWD, RscpType.CString, self.password), ( RscpTag.SERVER_IDENTIFIER, RscpType.CString, self.serialNumberWithPrefix, ), (RscpTag.SERVER_TYPE, RscpType.Int32, 4), (RscpTag.SERVER_HASH_CODE, RscpType.Int32, 1234567890), ], ) ) # print("--------------------- Sending virtual conn") self.ws.send(virtualConn, ABNF.OPCODE_BINARY) def respondToINFORequest(self, decoded: RscpMessage): """Create Response to INFO request.""" TIMEZONE_STR, utcDiffS = calcTimeZone() try: tag = getRscpTag(decoded[0]) except KeyError: # This is a tag unknown to this library return None if tag == RscpTag.INFO_REQ_IP_ADDRESS: return rscpEncode(RscpTag.INFO_IP_ADDRESS, RscpType.CString, "0.0.0.0") elif tag == RscpTag.INFO_REQ_SUBNET_MASK: return rscpEncode(RscpTag.INFO_SUBNET_MASK, RscpType.CString, "0.0.0.0") elif tag == RscpTag.INFO_REQ_GATEWAY: return rscpEncode(RscpTag.INFO_GATEWAY, RscpType.CString, "0.0.0.0") elif tag == RscpTag.INFO_REQ_DNS: return rscpEncode(RscpTag.INFO_DNS, RscpType.CString, "0.0.0.0") elif tag == RscpTag.INFO_REQ_DHCP_STATUS: return rscpEncode(RscpTag.INFO_DHCP_STATUS, RscpType.Bool, "false") elif tag == RscpTag.INFO_REQ_TIME: return rscpEncode( RscpTag.INFO_TIME, RscpType.ByteArray, timestampEncode(time.time()) ) elif tag == RscpTag.INFO_REQ_TIME_ZONE: return rscpEncode(RscpTag.INFO_TIME_ZONE, RscpType.CString, TIMEZONE_STR) elif tag == RscpTag.INFO_REQ_UTC_TIME: return rscpEncode( RscpTag.INFO_UTC_TIME, RscpType.ByteArray, timestampEncode(time.time() - utcDiffS), ) elif tag == RscpTag.INFO_REQ_A35_SERIAL_NUMBER: return rscpEncode( RscpTag.INFO_A35_SERIAL_NUMBER, RscpType.CString, "123456" ) elif tag == RscpTag.INFO_REQ_INFO: return rscpEncode( RscpTag.INFO_INFO, RscpType.Container, [ ( RscpTag.INFO_SERIAL_NUMBER, RscpType.CString, "WEB_" + hashlib.md5(self.username + bytes(self.conId)).hexdigest(), ), ( RscpTag.INFO_PRODUCTION_DATE, RscpType.CString, "570412800000", ), ( RscpTag.INFO_MAC_ADDRESS, RscpType.CString, "00:00:00:00:00:00", ), ], ) elif tag == RscpTag.INFO_SERIAL_NUMBER: self.webSerialno = decoded[2] self.buildVirtualConn() return "" return None # this is no standard request def registerConnectionHandler(self, decodedMsg: RscpMessage): """Registering Connection Handler.""" if self.conId == 0: self.conId = rscpFindTagIndex(decodedMsg, RscpTag.SERVER_CONNECTION_ID) self.authLevel = rscpFindTagIndex(decodedMsg, RscpTag.SERVER_AUTH_LEVEL) else: self.virtConId = rscpFindTagIndex(decodedMsg, RscpTag.SERVER_CONNECTION_ID) self.virtAuthLevel = rscpFindTagIndex(decodedMsg, RscpTag.SERVER_AUTH_LEVEL) # reply = rscpFrame(rscpEncode(RscpTag.SERVER_CONNECTION_REGISTERED, RscpType.Container, [decodedMsg[2][0], decodedMsg[2][1]])); reply = rscpFrame( rscpEncode( RscpTag.SERVER_CONNECTION_REGISTERED, RscpType.Container, [ rscpFindTag(decodedMsg, RscpTag.SERVER_CONNECTION_ID), rscpFindTag(decodedMsg, RscpTag.SERVER_AUTH_LEVEL), ], ) ) self.ws.send(reply, ABNF.OPCODE_BINARY) def on_message(self, message: bytes): """Method to handle a received message.""" # print "Received message", message if len(message) == 0: return decodedMsg = rscpDecode(message)[0] try: tag = getRscpTag(decodedMsg[0]) except KeyError: # This is a tag unknown to this library raise # print "Decoded received message", decodedMsg if tag == RscpTag.SERVER_REQ_PING: pingFrame = rscpFrame( rscpEncode(RscpTag.SERVER_PING, RscpType.NoneType, None) ) self.ws.send( pingFrame, ABNF.OPCODE_BINARY, ) return elif tag == RscpTag.SERVER_REGISTER_CONNECTION: self.registerConnectionHandler(decodedMsg) elif tag == RscpTag.SERVER_UNREGISTER_CONNECTION: # this signifies some error self.disconnect() elif tag == RscpTag.SERVER_REQ_RSCP_CMD: data = rscpFrameDecode( rscpFindTagIndex(decodedMsg, RscpTag.SERVER_RSCP_DATA) )[0] response = b"" self.responseCallbackCalled = False while len(data) > 0: ( decoded, size, # pyright: ignore [reportUnusedVariable] ) = rscpDecode(data) # print "Inner frame chunk decoded", decoded data = data[size:] responseChunk = self.respondToINFORequest(decoded) if responseChunk is None: # this is not a standard request: call the registered callback self.responseCallback( decoded ) # !!! Important!!! This is where the callback is called with the decoded inner frame self.responseCallbackCalled = True responseChunk = b"" if isinstance(responseChunk, str): responseChunk = responseChunk.encode("utf-8") response += responseChunk if len(response) == 0: return # do not send an empty response innerFrame = rscpFrame(response) responseContainer = rscpEncode( RscpTag.SERVER_REQ_RSCP_CMD, RscpType.Container, [ (RscpTag.SERVER_CONNECTION_ID, RscpType.Int64, self.conId), (RscpTag.SERVER_AUTH_LEVEL, RscpType.UChar8, self.authLevel), (RscpTag.SERVER_RSCP_DATA_LEN, RscpType.Int32, len(innerFrame)), (RscpTag.SERVER_RSCP_DATA, RscpType.ByteArray, innerFrame), ], ) self.ws.send( rscpFrame(responseContainer), ABNF.OPCODE_BINARY, ) def _defaultRequestCallback(self, msg: RscpMessage): self.requestResult = msg def sendRequest(self, message: RscpMessage) -> RscpMessage: """Send a request and wait for a response.""" self._sendRequest_internal(rscpFrame(rscpEncode(message))) for _ in range(self.TIMEOUT * 10): if self.responseCallbackCalled: break time.sleep(0.1) if not self.responseCallbackCalled: raise RequestTimeoutError return self.requestResult def sendCommand(self, message: RscpMessage): """Send a command.""" return self._sendRequest_internal(rscpFrame(rscpEncode(message))) def _sendRequest_internal( self, innerFrame: bytes | RscpMessage, callback: Callable[[RscpMessage], None] | None = None, ): """Internal send request method. Args: innerFrame (tuple | bytes): inner frame callback (str): callback method synchronous (bool): If True, the method waits for a response (i.e. exits after calling callback). If True and callback = None, the method returns the (last) response message """ if not self.isConnected(): raise SocketNotReady if isinstance(innerFrame, tuple): # if innerframe is a tuple then the message is not encoded innerFrame = rscpFrame(rscpEncode(*innerFrame)) # self.requestResult = None self.responseCallbackCalled = False if callback is None: self.responseCallback = lambda msg: self._defaultRequestCallback(msg) # type: ignore else: self.responseCallback = callback outerFrame = rscpFrame( rscpEncode( RscpTag.SERVER_REQ_RSCP_CMD, RscpType.Container, [ (RscpTag.SERVER_CONNECTION_ID, RscpType.Int64, self.virtConId), ( RscpTag.SERVER_AUTH_LEVEL, RscpType.UChar8, self.virtAuthLevel, ), (RscpTag.SERVER_RSCP_DATA_LEN, RscpType.Int32, len(innerFrame)), (RscpTag.SERVER_RSCP_DATA, RscpType.ByteArray, innerFrame), ], ) ) self.ws.send(outerFrame, ABNF.OPCODE_BINARY) def connect(self): """Connect to E3DC system.""" self.reset() self.thread = threading.Thread( target=self.ws.run_forever # pyright: ignore [reportUnknownMemberType, reportUnknownArgumentType] ) self.thread.start() for _ in range(self.TIMEOUT * 10): if self.isConnected(): break time.sleep(0.1) if not self.isConnected(): raise RequestTimeoutError def disconnect(self): """Disconnect from E3DC system.""" self.reset() def isConnected(self): """Validate connection status. Returns: bool: true if connected """ return self.virtConId is not None