import json
from fcp.spec import *
from fcp.can import CANMessage
from math import *
def bitmask(n):
    """Return a mask with the ``n`` least-significant bits set.

    Computed as ``2 ** n - 1``, so ``bitmask(4)`` is ``0xF`` and
    ``bitmask(0)`` is ``0``.
    """
    all_ones = pow(2, n) - 1
    return all_ones
def test_bitmask():
    """Check that a 4-bit mask equals ``0xF``."""
    expected = 0xF
    assert expected == bitmask(4)
def encode_signal(signal, value):
    """Convert a physical value into its raw bit field, positioned in the payload.

    The raw value is ``(value - signal.offset) / signal.scale`` rounded to
    the nearest integer, masked to ``signal.length`` bits and shifted left
    by ``signal.start`` bits.

    Args:
        signal: Object exposing ``offset``, ``scale``, ``length`` and
            ``start`` attributes.
        value: Physical value to encode.

    Returns:
        An int holding the signal's bits at their position in the payload.
    """
    raw = (value - signal.offset) / signal.scale
    # Round instead of truncating: the float division is inexact
    # (0.3 / 0.1 == 2.9999999999999996), and truncation would silently
    # drop one LSB for such values.
    raw = int(round(raw))
    mask = 2 ** signal.length - 1
    return (raw & mask) << signal.start
def conv_endianess(value: int, signal: "Signal"):
    """Byte-swap ``value`` when ``signal`` is a big-endian 2-, 4- or 8-byte field.

    Args:
        value: Raw, already masked, non-negative field value.
        signal: Object exposing ``get_length()`` (length in bits) and
            ``get_byte_order()``.

    Returns:
        The byte-swapped value when ``signal.get_byte_order()`` is
        ``"big_endian"`` and the length is 16, 32 or 64 bits; otherwise
        ``value`` unchanged.

    Raises:
        OverflowError: If ``value`` is negative or does not fit in the
            signal's byte width (raised by ``int.to_bytes``).
    """
    if signal.get_byte_order() != "big_endian":
        return value

    bit_length = signal.get_length()
    # Only whole 2/4/8-byte fields are swapped.  Every other width (sub-byte,
    # single byte, odd sizes, zero, wider than 64 bits) is passed through
    # unchanged instead of falling off the end and returning None.
    if bit_length not in (16, 32, 64):
        return value

    n_bytes = int(bit_length) // 8
    swapped = value.to_bytes(n_bytes, byteorder='little')
    return int.from_bytes(swapped, byteorder='big', signed=False)
def decode_signal(signal, value):
    """Extract one signal from a raw payload and convert it to a physical value.

    The field is shifted down by ``signal.start`` bits, masked to
    ``signal.length`` bits and passed through ``conv_endianess``.  When
    ``signal.type`` is ``'signed'`` and the top bit of the field is set, the
    field is interpreted as a two's-complement negative number.  The result
    is ``raw * signal.scale + signal.offset``.
    """
    field = (value >> signal.start) & bitmask(signal.length)
    raw = conv_endianess(field, signal)

    if signal.type == 'signed':
        raw = int(raw)
        sign_bit_set = (raw >> (signal.length - 1)) == 1
        if sign_bit_set:
            # Two's complement: with the sign bit set, raw lies in
            # [2**(length-1), 2**length), so subtracting 2**length gives
            # the negative value.
            raw = raw - (bitmask(signal.length) + 1)

    return (raw * signal.scale) + signal.offset
class Fcp:
    """Encode and decode CAN messages according to an FCP spec.

    On construction, every message declared in the spec (per device and in
    ``spec.common``) is indexed by name in ``self.messages``.
    """

    def __init__(self, spec):
        """Store ``spec`` and build the name -> message index.

        Device messages are registered first and common messages afterwards,
        so a common message replaces a device message of the same name.
        """
        self.messages = {}
        self.spec = spec

        for device in spec.devices.values():
            for msg in device.msgs.values():
                self.messages[msg.name] = msg

        for msg in spec.common.msgs.values():
            self.messages[msg.name] = msg

    def encode_msg(self, sid: int, msg_name: str, signals: "Dict[str, float]") -> "CANMessage":
        """Build the CAN message ``msg_name`` from physical signal values.

        Args:
            sid: Value combined with the message id through ``make_sid``.
            msg_name: Name of a message present in ``self.messages``.
            signals: Mapping of signal name to the physical value to encode.

        Returns:
            A ``CANMessage`` carrying the OR of all encoded signals as its
            ``data64`` payload.
        """
        # NOTE(review): .get() yields None for an unknown msg_name, which
        # then fails on the attribute access just below.
        msg = self.messages.get(msg_name)
        sigs = msg.signals

        data = 0
        for name, value in signals.items():
            data |= encode_signal(sigs[name], value)

        sid = make_sid(sid, msg.id)
        # The third positional argument is passed as the constant 1; its
        # meaning is defined by CANMessage.
        return CANMessage(sid, msg.dlc, 1, data64=data)

    def find_msg(self, msg):
        """Return the spec message matching the id carried in ``msg.sid``.

        ``msg.sid`` is split into a device id and a message id with
        ``decompose_id``.  The messages of the matching device(s) are
        searched first, then the common messages.  Returns ``None`` when
        nothing matches.
        """
        dev_id, msg_id = decompose_id(msg.sid)

        for dev in self.spec.devices.values():
            if dev.id != dev_id:
                continue
            for candidate in dev.msgs.values():
                if candidate.id == msg_id:
                    return candidate

        for candidate in self.spec.common.msgs.values():
            if candidate.id == msg_id:
                return candidate

        return None

    def find_device(self, id):
        """Return the device whose id (int) or name (str) equals ``id``.

        Returns ``None`` when no device matches.
        """
        for dev in self.spec.devices.values():
            if isinstance(id, int) and dev.id == id:
                return dev
            if isinstance(id, str) and dev.name == id:
                return dev
        return None

    def find_config(self, dev, id):
        """Return the config of ``dev`` stored under the key ``id`` (a str).

        Returns ``None`` when ``id`` is not a str or no key matches.
        """
        for name, cfg in dev.cfgs.items():
            if isinstance(id, str) and name == id:
                return cfg
        return None

    def decode_msg(self, msg: "CANMessage"):
        """Decode every signal of a received CAN message.

        Returns:
            ``(message_name, {signal_name: value})``, or ``("", {})`` when
            the message is not found in the spec.
        """
        fcp_msg = self.find_msg(msg)
        if fcp_msg is None:
            return "", {}

        # Reassemble the payload from its 16-bit words, word 0 being the
        # least significant.
        data = 0
        for i, word in enumerate(msg.get_data16()):
            data += word << (16 * i)

        signals = {
            name: decode_signal(signal, data)
            for name, signal in fcp_msg.signals.items()
        }
        return fcp_msg.name, signals

    def decode_log(self, signal):
        """Return the string of the log whose id equals ``signal["id"]``.

        Returns ``""`` when no log matches.
        """
        for log in self.spec.logs.values():
            if log.id == signal["id"]:
                return log.string
        return ""

    def decode_get(self, signal):
        """Decode a "get" reply into ``{config_name: data}``.

        The device is looked up from ``signal['dst']`` and the config from
        ``signal['id']``.  Returns ``None`` when either is not found.
        """
        dev = self.find_device(signal['dst'])
        if dev is None:
            return None

        for config in dev.cfgs.values():
            if config.id == signal['id']:
                return {config.name: signal['data']}
        return None

    def encode_get(self, sid: int, dst: int, config: int):
        """Build a ``req_get`` message for config id ``config`` on device ``dst``."""
        return self.encode_msg(sid, "req_get", {"dst": dst, "id": config})

    def decode_set(self, signal):
        """Decode a "set" reply into ``{"device": name, config_name: data}``.

        The device is looked up from ``signal['dst']`` and the config from
        ``signal['id']``.  Returns ``None`` when either is not found.
        """
        dev = self.find_device(signal['dst'])
        if dev is None:
            return None

        for config in dev.cfgs.values():
            if config.id == signal['id']:
                return {"device": dev.name, config.name: signal['data']}
        return None

    def encode_set(self, sid: int, dst: int, config: int, value: int):
        """Build a ``req_set`` message writing ``value`` to config ``config`` on ``dst``."""
        return self.encode_msg(sid, "req_set", {"dst": dst, "id": config, "value": value})
#class FCPCom():
# def __init__(self, fcp, com, sid):
# self.fcp = fcp
# self.com = com
# self.sid = sid
#
# def get(self, device, config):
# dev = self.fcp.find_device(device)
# if dev == None:
# return None
#
# cfg = self.fcp.find_config(dev, config)
# if cfg == None:
# return None
#
# msg = self.fcp.encode_get(self.sid, dev.id, cfg.id)
#
# com.send(msg)