Source code for e3dc._e3dc_rscp_local

#!/usr/bin/env python
# Python class to connect to an E3/DC system.
#
# Copyright 2017 Francesco Santini <francesco.santini@gmail.com>
# Licensed under a MIT license. See LICENSE for details

import socket

from ._RSCPEncryptDecrypt import RSCPEncryptDecrypt
from ._rscpLib import RscpMessage, rscpDecode, rscpEncode, rscpFrame
from ._rscpTags import RscpError, RscpTag, RscpType

DEFAULT_PORT = 5033
BUFFER_SIZE = 1024 * 32


[docs] class RSCPAuthenticationError(Exception): """Class for RSCP Authentication Error Exception.""" pass
class RSCPNotAvailableError(Exception): """Class for RSCP Not Available Error Exception.""" pass
[docs] class RSCPKeyError(Exception): """Class for RSCP Encryption Key Error Exception.""" pass
[docs] class CommunicationError(Exception): """Class for Communication Error Exception.""" pass
class E3DC_RSCP_local: """A class describing an E3DC system connection using RSCP protocol locally.""" def __init__( self, username: str, password: str, ip: str, key: str, port: int | None = None ): """Constructor of an E3DC RSCP local object. Args: username (str): username password (str): password (plain text) ip (str): IP address of the E3DC system key (str): encryption key as set in the E3DC settings port (int, optional): port number. Defaults to PORT. """ self.username = username.encode("utf-8") self.password = password.encode("utf-8") self.ip = ip self.port = port or DEFAULT_PORT self.key = key.encode("utf-8") self.socket: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connected: bool = False self.encdec: RSCPEncryptDecrypt self.processedData = None def _send(self, plainMsg: RscpMessage) -> None: sendData = rscpFrame(rscpEncode(plainMsg)) encData = self.encdec.encrypt(sendData) self.socket.send(encData) def _receive(self): data = self.socket.recv(BUFFER_SIZE) if len(data) == 0: raise RSCPKeyError decData = rscpDecode(self.encdec.decrypt(data))[0] return decData def sendCommand(self, plainMsg: RscpMessage) -> None: """Sending RSCP command. Args: plainMsg (tuple): plain message """ self.sendRequest(plainMsg) # same as sendRequest but doesn't return a value def sendRequest(self, plainMsg: RscpMessage) -> RscpMessage: """Sending RSCP request. Args: plainMsg (tuple): plain message Returns: tuple: received message """ try: self._send(plainMsg) receive = self._receive() except RSCPKeyError: self.disconnect() raise except Exception: self.disconnect() raise CommunicationError if receive[1] == "Error": self.disconnect() if receive[2] == RscpError.RSCP_ERR_ACCESS_DENIED.name: raise RSCPAuthenticationError elif receive[2] == RscpError.RSCP_ERR_NOT_AVAILABLE.name: raise RSCPNotAvailableError else: raise CommunicationError(receive[2]) return receive def connect(self) -> None: """Establishes connection to the E3DC system.""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(5) self.socket.connect((self.ip, self.port)) self.processedData = None self.connected = True except Exception: self.disconnect() raise CommunicationError self.encdec = RSCPEncryptDecrypt(self.key) self.sendRequest( ( RscpTag.RSCP_REQ_AUTHENTICATION, RscpType.Container, [ (RscpTag.RSCP_AUTHENTICATION_USER, RscpType.CString, self.username), ( RscpTag.RSCP_AUTHENTICATION_PASSWORD, RscpType.CString, self.password, ), ], ) ) def disconnect(self) -> None: """Disconnects from the E3DC system.""" self.socket.close() self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connected = False def isConnected(self) -> bool: """Validate connection status. Returns: bool: true if connected """ return self.connected