Source code for e3dc._rscpLib

#!/usr/bin/env python
# Python class to connect to an E3/DC system.
#
# Copyright 2017 Francesco Santini <francesco.santini@gmail.com>
# Licensed under a MIT license. See LICENSE for details

import math
import struct
import time
import zlib
from typing import Any, TypeAlias, cast

from ._rscpTags import (
    RscpTag,
    RscpType,
    getHexRscpTag,
    getHexRscpType,
    getRscpType,
    getStrRscpError,
    getStrRscpTag,
    getStrRscpType,
)

# Type alias for RSCP messages
RscpMessage: TypeAlias = tuple[str | int | RscpTag, str | int | RscpType, Any]

DEBUG_DICT = {"print_rscp": False}


def set_debug(debug: bool):
    """Turns debug on/off.

    Args:
        debug (bool): the status

    Returns:
        Nothing
    """
    DEBUG_DICT["print_rscp"] = debug


packFmtDict_FixedSize = {
    RscpType.Bool: "?",
    RscpType.Char8: "b",
    RscpType.UChar8: "B",
    RscpType.Int16: "h",
    RscpType.Uint16: "H",
    RscpType.Int32: "i",
    RscpType.Uint32: "I",
    RscpType.Int64: "q",
    RscpType.Uint64: "Q",
    RscpType.Float32: "f",
    RscpType.Double64: "d",
}

packFmtDict_VarSize = {
    RscpType.Bitfield: "s",
    RscpType.CString: "s",
    RscpType.Container: "s",
    RscpType.ByteArray: "s",
    RscpType.Error: "s",
}


def rscpFindTag(
    decodedMsg: RscpMessage | None,
    tag: int | str | RscpTag,
) -> RscpMessage | None:
    """Finds a submessage with a specific tag.

    Args:
    decodedMsg (tuple): the decoded message
    tag (RscpTag): the RSCP Tag to search for

    Returns:
        list: the found tag
    """
    try:
        tagStr = getStrRscpTag(tag)
    except KeyError:
        # Tag is unknown to this library
        return None

    if decodedMsg is None:
        return None
    if decodedMsg[0] == tagStr:
        return decodedMsg
    if isinstance(decodedMsg[2], list):
        msgList: list[RscpMessage] = cast(list[RscpMessage], decodedMsg[2])
        for msg in msgList:
            msgValue = rscpFindTag(msg, tag)
            if msgValue is not None:
                return msgValue
    return None


def rscpFindTagIndex(
    decodedMsg: RscpMessage | None,
    tag: int | str | RscpTag,
    index: int = 2,
) -> Any:
    """Finds a submessage with a specific tag and extracts an index.

    Args:
    decodedMsg (Tuple): the decoded message
    tag (RscpTag): the RSCP Tag to search for
    index (int): the index of the found tag to return. Default is 2, the value of the Tag.

    Returns:
        the content of the configured index for the tag.
    """
    res = rscpFindTag(decodedMsg, tag)
    if res is not None:
        return res[index]
    else:
        return None


def endianSwapUint16(val: int):
    """Endian swaps magic and ctrl."""
    return struct.unpack("<H", struct.pack(">H", val))[0]


[docs] class FrameError(Exception): """Class for Frame Error Exception.""" pass
def rscpEncode( tag: int | str | RscpTag | RscpMessage, rscptype: int | str | RscpType | None = None, data: Any = None, ) -> bytes: """RSCP encodes data.""" if isinstance(tag, tuple): rscptype = tag[1] data = tag[2] tag = tag[0] elif rscptype is None: raise TypeError("Second argument must not be none if first is not a tuple") tagHex = getHexRscpTag(tag) rscptypeHex = getHexRscpType(rscptype) rscptype = getRscpType(rscptype) if DEBUG_DICT["print_rscp"]: print(">", tag, rscptype, data) if isinstance(data, str): data = data.encode("utf-8") packFmt = ( "<IBH" # format of header: little-endian, Uint32 tag, Uint8 type, Uint16 length ) headerLen = struct.calcsize(packFmt) if rscptype == RscpType.NoneType: # special case: no content return struct.pack(packFmt, tagHex, rscptypeHex, 0) elif ( rscptype == RscpType.Timestamp ): # timestamp has a special format, divided into 32 bit integers ts = int(data / 1000) # this is int64 ms = (data - ts * 1000) * 1e6 # ms are multiplied by 10^6 hiword = ts >> 32 loword = ts & 0xFFFFFFFF packFmt += "iii" length = struct.calcsize(packFmt) - headerLen return struct.pack(packFmt, tagHex, rscptypeHex, length, hiword, loword, ms) elif rscptype == RscpType.Container: if isinstance(data, list): newData = b"" dataList: list[RscpMessage] = cast(list[RscpMessage], data) for dataChunk in dataList: newData += rscpEncode( dataChunk[0], dataChunk[1], dataChunk[2] ) # transform each dataChunk into byte array data = newData packFmt += str(len(data)) + packFmtDict_VarSize[rscptype] elif rscptype in packFmtDict_FixedSize: packFmt += packFmtDict_FixedSize[rscptype] elif rscptype in packFmtDict_VarSize: packFmt += str(len(data)) + packFmtDict_VarSize[rscptype] length = struct.calcsize(packFmt) - headerLen return struct.pack(packFmt, tagHex, rscptypeHex, length, data) def rscpFrame(data: bytes) -> bytes: """Generates RSCP frame.""" magic = endianSwapUint16(0xE3DC) ctrl = endianSwapUint16(0x11) t = time.time() sec1 = math.ceil(t) sec2 = 0 ns = round((t - int(t)) * 1000) length = len(data) packFmt = "<HHIIIH" + str(length) + "s" frame = struct.pack(packFmt, magic, ctrl, sec1, sec2, ns, length, data) crc = zlib.crc32(frame) % (1 << 32) # unsigned crc32 frame += struct.pack("<I", crc) return frame def rscpFrameDecode(frameData: bytes, returnFrameLen: bool = False): """Decodes RSCP Frame.""" headerFmt = "<HHIIIH" crcFmt = "I" crc = None magic, ctrl, sec1, _, ns, length = struct.unpack( headerFmt, frameData[: struct.calcsize(headerFmt)] ) magic = endianSwapUint16(magic) ctrl = endianSwapUint16(ctrl) if ctrl & 0x10: # crc enabled totalLen = struct.calcsize(headerFmt) + length + struct.calcsize(crcFmt) data, crc = struct.unpack( "<" + str(length) + "s" + crcFmt, frameData[struct.calcsize(headerFmt) : totalLen], ) else: totalLen = struct.calcsize(headerFmt) + length data = struct.unpack( "<" + str(length) + "s", frameData[struct.calcsize(headerFmt) : totalLen] )[0] # check crc if crc is not None: crcCalc = zlib.crc32(frameData[: -struct.calcsize("<" + crcFmt)]) % ( 1 << 32 ) # unsigned crc32 if crcCalc != crc: raise FrameError("CRC32 not validated") timestamp = sec1 + float(ns) / 1000 if returnFrameLen: return data, timestamp, totalLen else: return data, timestamp def rscpDecode( data: bytes, ) -> tuple[RscpMessage, int]: """Decodes RSCP data.""" headerFmt = ( "<IBH" # format of header: little-endian, Uint32 tag, Uint8 type, Uint16 length ) headerSize = struct.calcsize(headerFmt) magicCheckFmt = ">H" magic = struct.unpack(magicCheckFmt, data[: struct.calcsize(magicCheckFmt)])[0] if magic == 0xE3DC: # we have a frame: decode it # print "Decoding frame in rscpDecode" return rscpDecode(rscpFrameDecode(data)[0]) # decode header hexTag, hexType, length = struct.unpack( headerFmt, data[: struct.calcsize(headerFmt)] ) # print (hex(hexTag), hex(hexType), length, data[struct.calcsize(headerFmt):]) strTag = getStrRscpTag(hexTag) strType = getStrRscpType(hexType) type_ = getRscpType(hexType) if type_ == RscpType.Container: # this is a container: parse the inside dataList: list[RscpMessage] = [] curByte = headerSize while curByte < headerSize + length: innerData, usedLength = rscpDecode(data[curByte:]) curByte += usedLength dataList.append(innerData) return (strTag, strType, dataList), curByte elif type_ == RscpType.Timestamp: fmt = "<iii" hiword, loword, ms = struct.unpack( fmt, data[headerSize : headerSize + struct.calcsize(fmt)] ) # t = float((hiword << 32) + loword) + (float(ms)*1e-9) # this should work, but doesn't t = float(hiword + loword) + (float(ms) * 1e-9) # this seems to be correct return (strTag, strType, t), headerSize + struct.calcsize(fmt) elif type_ == RscpType.NoneType: return (strTag, strType, None), headerSize elif type_ in packFmtDict_FixedSize: fmt = "<" + packFmtDict_FixedSize[type_] elif type_ in packFmtDict_VarSize: fmt = "<" + str(length) + packFmtDict_VarSize[type_] else: raise Exception("data can't be decoded") val = struct.unpack(fmt, data[headerSize : headerSize + struct.calcsize(fmt)])[0] if type_ == RscpType.Error: val = getStrRscpError(int.from_bytes(val, "little")) elif isinstance(val, bytes) and type_ == RscpType.CString: # return string instead of bytes # ignore none utf-8 bytes val = val.decode("utf-8", "ignore") if DEBUG_DICT["print_rscp"]: print("<", strTag, strType, val) return (strTag, strType, val), headerSize + struct.calcsize(fmt)