from typing import *
import copy
import datetime
[docs]def handle_key_not_found(d: dict, key: str):
return d.get(key).items() if d.get(key) != None else []
[docs]def field(default_factory):
return default_factory()
[docs]def make_dict():
return {}
[docs]def normalize(xs: Dict[str, Any], key: Callable[[Any], str] = None):
""" Update xs dictionary keys according to key.
By default key is `lambda x : x.name`
:param xs: Dictionary containing spec node
:param key: Function that returns the key for a particular node
"""
if key == None:
key = lambda x: x.name
aux = []
for k, x in xs.items():
if k != key(x):
aux.append((k, key(x)))
for k, key in aux:
xs[key] = xs[k]
del xs[k]
[docs]class Log:
""" Log protocol node.
:param id: Log integer identifier.
:param name: Name of the Log node.
:param n_args: Number of arguments in the Log node.
:param comment: Description of the Log node
:param string: Display string for the Log node.
"""
def __init__(
self,
parent: "Spec" = None,
id: int = 0,
name: str = "",
n_args: int = 3,
comment: str = "",
string: str = "",
):
self.parent = parent
assert self.parent is not None
c = max([log.id for log in self.parent.logs.values()]+[0]) + 1
self.id = id or c
self.name = name
self.n_args = n_args
self.comment = comment
self.string = string
self.creation_date = datetime.datetime.now()
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
d = copy.deepcopy(self.__dict__)
del d["creation_date"]
del d["parent"]
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
self.__dict__.update(d)
[docs] def get_id(self) -> int:
return self.id
[docs] def get_name(self) -> str:
return self.name
[docs] def get_n_args(self) -> int:
return self.n_args
[docs] def get_string(self) -> str:
return self.string
[docs] def set_id(self, id: int) -> None:
try:
self.id = int(id)
except Exception as e:
return
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_n_args(self, n_args: int) -> None:
try:
self.n_args = int(n_args)
except Exception as e:
return
[docs] def set_string(self, string: str) -> None:
try:
self.string = string
except Exception as e:
return
def __hash__(self):
return hash((self.name, self.id, self.creation_date))
def __repr__(self):
return "name: {}, id: {}, string: {}, n_args: {}, comment: {}".format(self.name, self.id, self.string, self.n_args, self.comment)
[docs]class EnumValue:
""" Fcp EnumValue. C lookalike for FCP type definitions with name-value
associations.
"""
def __init__(self, parent: "Enum" = None) -> None:
self.parent = parent
assert self.parent is not None
c = max([value.value for value in self.parent.enumeration.values()] + [0])+1
self.name = ""
self.value = c
self.creation_date = datetime.datetime.now()
[docs] def compile(self) -> Dict[str, Any]:
d = copy.deepcopy(self.__dict__)
del d["creation_date"]
del d["parent"]
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
self.__dict__.update(d)
[docs] def get_name(self) -> str:
return self.name
[docs] def set_name(self, name: str) -> None:
self.name = name
[docs] def get_value(self) -> int:
return self.value
[docs] def set_value(self, value: int) -> None:
self.value = int(value)
def __hash__(self):
return hash((self.name, self.creation_date))
def __repr__(self):
return "name: {}".format(self.name)
[docs]class Enum:
""" Fcp Enum. C lookalike for FCP type definitions with name-value
associations.
"""
def __init__(self, parent: "Spec" = None) -> None:
self.parent = parent
self.name = ""
self.enumeration = {}
self.creation_date = datetime.datetime.now()
[docs] def compile(self) -> Dict[str, Any]:
enums = {}
for k, v in self.enumeration.items():
enums[k] = v.compile()
d = copy.deepcopy(self.__dict__)
d["enumeration"] = enums
del d["creation_date"]
del d["parent"]
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
enumeration = d["enumeration"]
del d["enumeration"]
self.__dict__.update(d)
for k,v in enumeration.items():
enum_value = EnumValue(self)
enum_value.decompile(v)
self.enumeration[k] = enum_value
[docs] def get_name(self) -> str:
return self.name
[docs] def set_name(self, name: str) -> None:
self.name = name
[docs] def get_value(self) -> int:
return int(self.value)
[docs] def set_value(self, name: int) -> None:
self.value = int(value)
[docs] def normalize(self):
normalize(self.enumeration)
def __hash__(self):
return hash((self.name, self.creation_date))
def __repr__(self):
return "name: {}".format(self.name)
[docs]class Spec:
""" FCP root node. Holds all Devices, Messages, Signals, Logs, Configs,
Commands and Arguments.
"""
def __init__(self) -> None:
self.devices = {}
self.common = Common()
self.logs = {}
self.enums = {}
self.version = "0.2"
self.parent = None
self.name = ""
[docs] def add_device(self, device: "Device") -> bool:
""" Add a Device to Spec.
:param device: Device to be added
:return: Operation success status: True - Success, False - Failure
"""
if device is None:
return False
if device.name in self.devices.keys():
return False
self.devices[device.name] = device
return True
[docs] def add_enum(self, enum: "Enum") -> bool:
""" Add a Enum to Spec.
:param enum: Enum to be added
:return: Operation success status: True - Success, False - Failure
"""
if enum == None:
return False
if enum.name in self.enums.keys():
return False
self.enums[enum.name] = enum
return True
[docs] def add_log(self, log: "Log") -> bool:
""" Add a Log to Spec.
:param log: Log to be added
:return: Operation success status: True - Success, False - Failure
"""
if log == None:
return False
if log.name in self.logs.keys():
return False
self.logs[log.name] = log
return True
[docs] def get_device(self, name: str) -> Optional["Device"]:
""" Get a Device from Spec by its name.
:param name: Device name.
:return: Device or None if not found.
"""
return self.devices.get(name)
[docs] def get_log(self, name: str) -> Optional["Log"]:
""" Get a Log from Spec by its name.
:param name: Log name.
:return: Log or None if not found.
"""
return self.logs.get(name)
[docs] def rm_node(self, node: Any) -> None:
""" Remove a node from Spec.
:param node: node to be removed.
"""
if type(node) == Device:
self.rm_device(node)
elif type(node) == Message:
self.rm_message(node)
elif type(node) == Signal:
self.rm_signal(node)
elif type(node) == Config:
self.rm_config(node)
elif type(node) == Command:
self.rm_cmd(node)
elif type(node) == Log:
self.rm_log(node)
elif type(node) == Enum:
self.rm_enum(node)
elif type(node) == EnumValue:
self.rm_enum_value(node)
[docs] def rm_device(self, device: "Device") -> None:
""" Remove a Device from Spec.
:param device: Device to be removed.
"""
devs = [dev.name for dev in self.devices.values() if dev == device]
for name in devs:
del self.devices[name]
[docs] def rm_message(self, message: "Message") -> None:
""" Remove a Message from Spec.
:param message: Message to be removed.
"""
for dev in self.devices.values():
msgs = []
for msg in dev.msgs.values():
if msg == message:
msgs.append(msg.name)
for name in msgs:
dev.rm_msg(name)
[docs] def rm_signal(self, signal: "Signal") -> None:
""" Remove a Signal from Spec.
:param signal: Signal to be removed.
"""
for dev in self.devices.values():
for msg in dev.msgs.values():
signals = []
for sig in msg.signals.values():
if sig is signal:
print("found signal")
signals.append(signal.name)
for name in signals:
msg.rm_signal(name)
[docs] def rm_config(self, config: "Config") -> None:
""" Remove a Config from Spec.
:param config: Config to be removed.
"""
for dev in self.devices.values():
cfgs = []
for cfg in dev.cfgs.values():
if cfg is config:
cfgs.append(cfg.name)
for name in cfgs:
dev.rm_cfg(name)
[docs] def rm_cmd(self, command: "Command") -> None:
""" Remove a Command from Spec.
:param command: Command to be removed.
"""
for dev in self.devices.values():
cmds = []
for cmd in dev.cfgs.values():
if cmd is command:
cmds.append(cmd)
for name in cmds:
dev.rm_cmd(name)
[docs] def rm_log(self, log):
""" Remove a Log from Spec.
:param log: Log to be removed.
"""
logs = []
for l in self.logs.values():
if log is l:
logs.append(l.name)
for name in logs:
del self.logs[name]
[docs] def rm_enum(self, enum):
""" Remove a Enum from Spec.
:param log: Enum to be removed.
"""
enums = []
for l in self.enums.values():
if enum is l:
enums.append(l.name)
for name in enums:
del self.enums[name]
[docs] def rm_enum_value(self, enum_value):
""" Remove a Enum from Spec.
:param log: EnumValue to be removed.
"""
for e in self.enums.values():
enum_values = []
for ev in e.enumeration.values():
if enum_value is ev:
enum_values.append(ev.name)
for name in enum_values:
del e.enumeration[name]
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
d = {"devices": {}, "logs": {}, "enums": {}}
for dev_k, dev_v in self.devices.items():
d["devices"][dev_k] = dev_v.compile()
for log_k, log_v in self.logs.items():
d["logs"][log_k] = log_v.compile()
for enum_k, enum_v in self.enums.items():
d["enums"][enum_k] = enum_v.compile()
d["common"] = self.common.compile()
d["version"] = self.version
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
d = copy.deepcopy(d)
self.devices = {}
self.logs = {}
self.common.decompile(d["common"])
for k, v in handle_key_not_found(d, "devices"):
dev = Device(self)
dev.decompile(v)
self.devices[k] = dev
for k, v in handle_key_not_found(d, "logs"):
log = Log(self)
log.decompile(v)
self.logs[k] = log
for k, v in handle_key_not_found(d, "enums"):
enum = Enum(self)
enum.decompile(v)
self.enums[k] = enum
self.versionn = d["version"]
[docs] def normalize(self):
""" Update devices and logs dictionary keys. """
normalize(self.devices)
normalize(self.logs)
normalize(self.enums)
for enum in self.enums.values():
enum.normalize()
for key, dev in self.devices.items():
dev.normalize()
def __repr__(self):
out = "Spec: \n"
for device in self.devices.values():
out += ""
out += str(device)
out += "\n"
return out
[docs]class Signal:
"""
Signal node. Represents a CAN signal, similar to a DBC signal.
:param name: Name of the Signal.
:param start: Start bit
:param length: Signal bit size.
:param scale: Scaling applied to the signal's data.
:param offset: Offset applied to the signal's data.
:param unit: Unit of the Signal after applying scaling and offset.
:param comment: Description of the Signal.
:param min_value: Minimum value allowed to the Signal's data.
:param max_value: Maximum value allowed to the Signal's data.
:param type: Type of the Signal's data.
:param mux: Name of the mux Signal. None if the Signal doesn't belong to a multiplexed Message.
:param mux_count: Number of signals that the mux can reference for this Muxed signal.
"""
def __init__(
self,
parent: "Message" = None,
name: str = "",
start: int = 0,
length: int = 0,
scale: float = 1,
offset: float = 0,
unit: str = "",
comment: str = "",
min_value: int = 0,
max_value: int = 0,
type: str = "unsigned",
byte_order: str = "little_endian",
mux: str = "",
mux_count: int = 1,
alias: str = "",
):
self.parent = parent
self.name = name
self.start = start
self.length = length
self.scale = scale
self.offset = offset
self.unit = unit
self.comment = comment
self.min_value = min_value
self.max_value = max_value
self.type = type
self.byte_order = byte_order
self.mux = mux
self.mux_count = mux_count
self.alias = alias
self.creation_date = datetime.datetime.now()
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_start(self, start: int) -> None:
try:
self.start = int(start)
except Exception as e:
return
[docs] def set_length(self, length: int) -> None:
try:
self.length = int(length)
except Exception as e:
return
[docs] def set_scale(self, scale: float) -> None:
try:
self.scale = float(scale)
except Exception as e:
return
[docs] def set_offset(self, offset: float) -> None:
try:
self.offset = float(offset)
except Exception as e:
return
[docs] def set_unit(self, unit: str) -> None:
try:
self.unit = unit
except Exception as e:
return
[docs] def set_min_value(self, min_value: float) -> None:
try:
self.min_value = float(min_value)
except Exception as e:
return
[docs] def set_max_value(self, max_value: float) -> None:
try:
self.max_value = float(max_value)
except Exception as e:
return
[docs] def set_type(self, type: str) -> None:
try:
self.type = type
except Exception as e:
return
[docs] def set_byte_order(self, byte_order: str) -> None:
try:
self.byte_order = byte_order
except Exception as e:
return
[docs] def set_mux(self, mux: str) -> None:
try:
self.mux = mux
except Exception as e:
return
[docs] def set_mux_count(self, mux_count: int) -> None:
try:
self.mux_count = int(mux_count)
except Exception as e:
return
[docs] def set_alias(self, alias: str) -> None:
try:
self.alias = alias
except Exception as e:
return
[docs] def get_name(self) -> str:
return self.name
[docs] def get_start(self) -> int:
return self.start
[docs] def get_length(self) -> int:
return self.length
[docs] def get_scale(self) -> float:
return self.scale
[docs] def get_offset(self) -> float:
return self.offset
[docs] def get_unit(self) -> str:
return self.unit
[docs] def get_min_value(self) -> float:
return self.min_value
[docs] def get_max_value(self) -> float:
return self.max_value
[docs] def get_type(self) -> str:
return self.type
[docs] def get_byte_order(self) -> str:
return self.byte_order
[docs] def get_mux(self) -> str:
return self.mux
[docs] def get_mux_count(self) -> int:
return self.mux_count
[docs] def get_alias(self) -> str:
return self.alias
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
d = copy.deepcopy(self.__dict__)
del d["creation_date"]
del d["parent"]
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
self.__dict__.update(d)
def __hash__(self):
return hash((self.name, self.start, self.length, self.creation_date))
# def __repr__(self):
# return """ {
# name: {},
# start: {},
# length: {},
# scale: {},
# offset: {},
# unit: {},
# comment: {},
# min: {},
# max: {},
# type: {},
# byte_order: {},
# mux: {},
# mux_count: {}
# }
# """.format(self.name, self.start, self.length, self.scale, self.offset, self.unit, self.comment, self.min_value, self.max_value, self.type, self.byte_order, self.mux,self.mux_count)
def __repr__(self):
return ""
[docs]class Message:
""" Message node, Represents a CAN message, similar to a DBC message.
:param name: Name of the Message.
:param id: FST Message identifier, highest 6 bits of the identifier.
:param dlc: Message DLC.
:param signals: Dictionary containing the Message signals.
:param frequency: Transmission period in millisecond. If 0 message
isn't automatically sent.
"""
def __init__(
self,
parent: "Device" = None,
name: str = "",
id: int = 0,
dlc: int = 8,
signals: Dict[str, Signal] = None,
frequency: int = 0,
description: str = "",
):
self.parent = parent
assert self.parent is not None
c = max([msg.id for msg in self.parent.msgs.values()] + [0]) + 1
self.name = name
self.id = id or c
self.dlc = dlc
self.signals = {} if signals == None else signals
self.frequency = frequency
self.description = description
self.creation_date = datetime.datetime.now()
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_id(self, id: int) -> None:
try:
self.id = int(id)
except Exception as e:
return
[docs] def set_dlc(self, dlc: int) -> None:
try:
self.dlc = int(dlc)
except Exception as e:
return
[docs] def set_frequency(self, frequency: int) -> None:
try:
self.frequency = int(frequency)
except Exception as e:
return
[docs] def set_description(self, description: str) -> None:
try:
self.description = description
except Exception as e:
return
[docs] def get_name(self) -> str:
return self.name
[docs] def get_id(self) -> int:
return self.id
[docs] def get_dlc(self) -> int:
return self.dlc
[docs] def get_frequency(self) -> int:
return self.frequency
[docs] def get_description(self) -> str:
return self.description
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
msgs = {}
for k, v in self.signals.items():
msgs[k] = v.compile()
d = copy.deepcopy(self.__dict__)
d["signals"] = msgs
del d["creation_date"]
del d["parent"]
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
signals = d["signals"]
del d["signals"]
self.__dict__.update(d)
for key, value in signals.items():
sig = Signal(self)
sig.decompile(value)
self.signals[key] = sig
[docs] def add_signal(self, signal: Signal) -> bool:
""" Add a Signal to Message.
:param signal: Signal to be added
:return: Operation success status: True - Success, False - Failure
"""
if signal == None:
return False
if signal.name in self.signals.keys():
self.signals[signal.name].mux_count += 1
return True
self.signals[signal.name] = signal
return True
[docs] def get_signal(self, name: str) -> Optional[Signal]:
""" Get a Signal from Message by its name.
:param name: Signal name.
:return: Signal or None if not found.
"""
return self.signals.get(name)
[docs] def rm_signal(self, name: str) -> bool:
""" Remove a Signal from Spec.
:param signal: Signal to be removed.
"""
if self.get_signal(name) is None:
print("Not found", name)
return False
print("deleting signal")
del self.signals[name]
return True
[docs] def normalize(self):
""" Update signals dictionary keys."""
normalize(self.signals)
def __hash__(self):
return hash((self.name, self.id, self.creation_date))
def __repr__(self):
return (
"{"
+ f"""
name: {self.name},
id: {self.id},
dlc: {self.dlc},
frequency: {self.frequency}
"""
+ "}"
)
[docs]class Argument:
""" Argument node. Represents a Command Argument.
:param name: Name of the Argument.
:param id: Argument identifier.
:param comment: description of the Argument.
"""
def __init__(self, parent: "Command" = None, name: str = "", id: int = 0, comment: str = ""):
self.parent = parent
self.name = name
self.id = id
self.comment = comment
self.creation_date = datetime.datetime.now()
[docs] def get_name(self) -> str:
return self.name
[docs] def get_id(self) -> int:
return self.id
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_id(self, id: int) -> None:
try:
self.id = int(id)
except Exception as e:
return
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
print("compiling argument")
d = copy.deepcopy(self.__dict__)
del d["creation_date"]
del d["parent"]
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
self.__dict__.update(d)
[docs]class Command:
""" Command node. Represents a Command.
:param name: Name of the Command.
:param n_args: Number of arguments in the Command.
:param comment: description of the Command.
:param id: Command identifier.
:param args: Dictionary containing the Command's input Arguments.
:param rets: Dictionary containing the Command's output Arguments.
"""
def __init__(
self,
parent: "Device" = None,
name: str = "",
n_args: int = 3,
comment: str = "",
id: int = 0,
args: Dict[str, Argument] = None,
rets: Dict[str, Argument] = None,
):
self.parent = parent
assert self.parent is not None
c = max([cmd.id for cmd in self.parent.cmds.values()] + [0]) + 1
self.name = name
self.n_args = n_args
self.comment = comment
self.id = int(id)
self.args = {} if args == None else args
self.rets = {} if rets == None else rets
self.creation_date = datetime.datetime.now()
[docs] def get_name(self) -> str:
return self.name
[docs] def get_n_args(self) -> int:
return self.n_args
[docs] def get_id(self) -> int:
return self.id
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_n_args(self, n_args: int) -> None:
try:
self.n_args = int(n_args)
except Exception as e:
return
[docs] def set_id(self, id: int) -> None:
try:
self.id = int(id)
except Exception as e:
return
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
args = {}
rets = {}
for k, v in self.args.items():
args[k] = v.compile()
for k, v in self.rets.items():
rets[k] = v.compile()
att = copy.deepcopy(self.__dict__)
att["args"] = args
att["rets"] = rets
del att["creation_date"]
del att["parent"]
return att
[docs] def add_arg(self, arg: Argument) -> None:
""" Add a input Argument to Command.
:param arg: Argument to be added
:return: Operation success status: True - Success, False - Failure
"""
self.args[arg.name] = arg
[docs] def add_ret(self, ret: Argument) -> None:
""" Add a output Argument to Command.
:param ret: Argument to be added
:return: Operation success status: True - Success, False - Failure
"""
self.rets[ret.name] = ret
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
args = d["args"]
rets = d["rets"]
del d["args"]
del d["rets"]
self.__dict__.update(d)
self.id = int(self.id)
self.n_args = int(self.n_args)
for arg_k, arg_v in args.items():
arg = Argument()
arg.decompile(arg_v)
self.args[arg_k] = arg
for ret_k, ret_v in rets.items():
ret = Argument()
ret.decompile(ret_v)
self.rets[ret_k] = ret
[docs] def normalize(self):
return
def __hash__(self):
return hash((self.name, self.id, self.creation_date))
[docs]class Config:
""" Config node. Represents a Config.
:param name: Name of the Config.
:param id: Config identifier.
:param comment: description of the Config.
"""
def __init__(self, parent: "Device" = None, name: str = "", id: int = 0, comment: str = ""):
self.parent = parent
self.name = name
self.id = int(id)
self.comment = comment
self.creation_date = datetime.datetime.now()
[docs] def get_name(self) -> str:
return self.name
[docs] def get_id(self) -> int:
return self.id
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_id(self, id: int) -> None:
try:
self.id = int(id)
except Exception as e:
return
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
d = copy.deepcopy(self.__dict__)
del d["creation_date"]
del d["parent"]
return d
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
self.__dict__.update(d)
self.id = int(self.id)
[docs] def normalize(self):
return
def __hash__(self):
return hash((self.name, self.id, self.creation_date))
[docs]class Device:
""" Device node, Represents a CAN device.
:param name: Name of the Device.
:param id: FST Device identifier, lowest 5 bits of the identifier.
:param msgs: Dictionary containing the Device messages.
:param cmds: Dictionary containing the Device commands.
:param cfgs: Dictionary containing the Device configs.
isn't automatically sent.
"""
def __init__(
self,
parent: "Spec" = None,
name: str = "default_name",
id: int = 0,
msgs: Dict[str, Message] = None,
cmds: Dict[str, Command] = None,
cfgs: Dict[str, Config] = None,
):
self.parent = parent
assert self.parent is not None
c = max([dev.id for dev in self.parent.devices.values()] + [0]) + 1
self.name = name
self.id = id or c
self.msgs = {} if msgs == None else msgs
self.cmds = {} if cmds == None else cmds
self.cfgs = {} if cfgs == None else cfgs
self.creation_date = datetime.datetime.now()
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_id(self, id: int) -> None:
try:
self.id = int(id)
except Exception as e:
return
[docs] def get_name(self) -> str:
return self.name
[docs] def get_id(self) -> int:
return self.id
[docs] def add_cmd(self, cmd: Command) -> None:
self.cmds[cmd.name] = cmd
[docs] def add_cfg(self, cfg: Config) -> None:
self.cfgs[cfg.name] = cfg
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
msgs = {}
cmds = {}
cfgs = {}
for msg_k, msg_v in self.msgs.items():
msgs[msg_k] = msg_v.compile()
for cmd_k, cmd_v in self.cmds.items():
cmds[cmd_k] = cmd_v.compile()
for cfg_k, cfg_v in self.cfgs.items():
cfgs[cfg_k] = cfg_v.compile()
att = copy.deepcopy(self.__dict__)
att["msgs"] = msgs
att["cmds"] = cmds
att["cfgs"] = cfgs
del att["creation_date"]
del att["parent"]
return att
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
msgs = d["msgs"]
cmds = d["cmds"]
cfgs = d["cfgs"]
del d["msgs"]
del d["cmds"]
del d["cfgs"]
self.__dict__.update(d)
for k, v in msgs.items():
msg = Message(self)
msg.decompile(v)
self.msgs[k] = msg
for k, v in cmds.items():
cmd = Command(self)
cmd.decompile(v)
self.cmds[k] = cmd
for k, v in cfgs.items():
cfg = Config(self)
cfg.decompile(v)
self.cfgs[k] = cfg
[docs] def add_msg(self, msg: Message) -> bool:
""" Add a Message to Device.
:param msg: Message to be added.
:return: Operation success status: True - Success, False - Failure
"""
if msg == None:
return False
if msg.name in self.msgs.keys():
return False
self.msgs[msg.name] = msg
return True
[docs] def get_msg(self, name: str) -> Optional[Message]:
""" Get a Message from Device by its name.
:param name: Message name.
:return: Message or None if not found.
"""
return self.msgs.get(name)
[docs] def rm_msg(self, name: str) -> bool:
""" Remove a Message from Device.
:param name: Name of the Message to be removed.
"""
if self.get_msg(name) is None:
return False
del self.msgs[name]
return True
[docs] def get_cmd(self, name: str) -> Optional[Command]:
""" Get a Command from Device by its name.
:param name: Command name.
:return: Command or None if not found.
"""
return self.cmds.get(name)
[docs] def rm_cmd(self, name: str) -> bool:
""" Remove a Command from Device.
:param name: Name of the Command to be removed.
"""
if self.get_cmd(name) is None:
return False
del self.cmds[name]
return True
[docs] def get_cfg(self, name: str) -> Optional[Config]:
""" Get a Config from Device by its name.
:param name: Config name.
:return: Config or None if not found.
"""
return self.cfgs.get(name)
[docs] def rm_cfg(self, name: str) -> bool:
""" Remove a Config from Device.
:param name: Name of the Config to be removed.
"""
if self.get_cfg(name) is None:
return False
del self.cfgs[name]
return True
[docs] def normalize(self):
""" Update messages, commands and configs dictionary keys. """
normalize(self.msgs)
normalize(self.cmds)
normalize(self.cfgs)
for key, msg in self.msgs.items():
msg.normalize()
for key, cmd in self.cmds.items():
cmd.normalize()
for key, cfg in self.cfgs.items():
cfg.normalize()
def __hash__(self):
return hash((self.name, self.id, self.creation_date))
def __repr__(self):
return (
#name: {self.name},
#id: {self.id}
"{"
+ f"""
"""
+ "}"
)
[docs]class Common:
def __init__(
self,
parent: Spec = None,
name: str = "common",
id: int = 0,
msgs: Dict[str, Message] = None,
cfgs: Dict[str, Config] = None,
cmds: Dict[str, Command] = None,
):
self.parent = parent
self.name = name
self.id = id
self.msgs = {} if msgs == None else msgs
self.cfgs = {} if cfgs == None else cfgs
self.cmds = {} if cmds == None else cmds
self.creation_date = datetime.datetime.now()
[docs] def set_name(self, name: str) -> None:
try:
self.name = name
except Exception as e:
return
[docs] def set_id(self, id: int) -> None:
try:
self.id = int(id)
except Exception as e:
return
[docs] def get_name(self) -> str:
return self.name
[docs] def get_id(self) -> int:
return self.id
[docs] def add_msg(self, msg: Message) -> bool:
if msg == None:
return False
if msg.name in self.msgs.keys():
return False
self.msgs[msg.name] = msg
return True
[docs] def compile(self) -> Dict[str, Any]:
""" Transform python class node to its dictionary representation.
:return: A dictionary containing the node parameters
"""
msgs = {}
for msg_k, msg_v in self.msgs.items():
msgs[msg_k] = msg_v.compile()
att = copy.deepcopy(self.__dict__)
att["msgs"] = msgs
del att["creation_date"]
del att["parent"]
return att
[docs] def decompile(self, d: Dict[str, Any]) -> None:
""" Transform node dictionary representation into a python class.
:param d: Node dictionary
"""
msgs = d["msgs"]
del d["msgs"]
self.__dict__.update(d)
for key, value in msgs.items():
msg = Message(self)
msg.decompile(value)
self.msgs[key] = msg
def __hash__(self):
return hash((self.name, self.id, self.creation_date))
[docs]def make_sid(dev_id: int, msg_id: int) -> int:
""" Find the sid from the dev_id and the msg_id """
return msg_id * 32 + dev_id
[docs]def decompose_id(sid: int) -> Tuple[int, int]:
""" Find the dev_id and the msg_id from the sid."""
return sid & 0x1F, (sid >> 5) & 0x3F