Source code for fcp.can

from typing import *
import json
import struct
import datetime

class CANMessage:
    """A CAN frame: identifier, length code, 64-bit payload and a timestamp.

    The payload is kept as a single integer in ``data64``. Word ``i`` of the
    16-bit view occupies bits ``16*i`` .. ``16*i + 15`` and byte ``i`` of the
    8-bit view occupies bits ``8*i`` .. ``8*i + 7``.
    """

    # Layout shared by encode_struct/decode_struct: sid, dlc, four 16-bit data
    # words and a 32-bit timestamp. No byte-order prefix is given, so the
    # struct module uses native byte order, size and alignment.
    _STRUCT_FORMAT = "HH4HI"

    def __init__(
        self,
        sid: int,
        dlc: int,
        timestamp: int,
        data16: Optional[List[int]] = None,
        data64=-1,
    ):
        """
        Build a message from either four 16-bit words or one 64-bit value.

        :param sid: message identifier, must be an ``int``.
        :param dlc: data length code.
        :param timestamp: timestamp attached to the message.
        :param data16: four 16-bit words, least significant first. Only used
            when ``data64`` is left at its default.
        :param data64: whole payload as one integer; ``-1`` means "not given,
            use ``data16`` instead".
        :raises TypeError: if ``sid`` is not an ``int``.
        :raises ValueError: if ``data64`` is not given and ``data16`` does not
            hold exactly four words.
        """
        # Explicit raises instead of asserts: asserts vanish under ``python -O``.
        if not isinstance(sid, int):
            raise TypeError("sid must be an int, got " + type(sid).__name__)

        self.sid = sid
        self.dlc = dlc

        if data64 == -1:
            if data16 is None or len(data16) != 4:
                raise ValueError(
                    "data16 must contain exactly 4 words when data64 is not given"
                )
            self.data64 = sum(word << (16 * i) for i, word in enumerate(data16))
        else:
            self.data64 = data64

        self.timestamp = timestamp

    def get_data8(self):
        """Return the payload as a list of 8 bytes, least significant first."""
        return [(self.data64 >> (8 * i)) & 0xFF for i in range(8)]

    def get_data16(self):
        """Return the payload as a list of 4 16-bit words, least significant first."""
        return [(self.data64 >> (16 * i)) & 0xFFFF for i in range(4)]

    def get_data64(self):
        """Return the payload as a single integer."""
        return self.data64

    def encode_json(self):
        """
        Encode message as a utf-8 byte string encoded JSON object.

        Example: {'sid': 100, 'dlc': 8, 'data': [1,2,3,4], 'timestamp': 1}
        """
        return bytes(
            json.dumps(
                {
                    "sid": self.sid,
                    "dlc": self.dlc,
                    "data": self.get_data16(),
                    "timestamp": self.timestamp,
                }
            ),
            "utf-8",
        )

    def encode_bstring(self):
        """
        Encode message as a utf-8 byte string encoded comma separated list of
        fields, terminated by a newline.

        Example: 100,8,1,2,3,4,1.
        Encodes sid: 100, dlc: 8, data[0]: 1, data[1]: 2, data[2]: 3,
        data[3]: 4, timestamp: 1
        """
        fields = [self.sid, self.dlc, *self.get_data16(), self.timestamp]
        return bytes(",".join(str(field) for field in fields) + "\n", "utf-8")

    def encode_struct(self):
        """
        Encode message as a CAN struct:

        .. code:: C

            typedef struct {
                union {
                    struct {
                        uint16_t dev_id:5; // Least significant
                        // First bit of msg_id determines if msg is reserved or not.
                        // 0 == reserved (higher priority) (0-31 decimal)
                        uint16_t msg_id:6; // Most significant
                    };
                    uint16_t sid;
                };
                uint16_t dlc:4;
                uint16_t data[4];
            } CANdata;

        The packed bytes hold sid, dlc and the four data words as unsigned
        16-bit values, followed by the timestamp as an unsigned 32-bit value,
        in native byte order and alignment.
        """
        return struct.pack(
            self._STRUCT_FORMAT,
            self.sid,
            self.dlc,
            *self.get_data16(),
            self.timestamp,
        )

    @staticmethod
    def decode_json(j):
        """
        Decode JSON encoded message into CANMessage.

        See encode_json for details on the JSON format used.

        :param j: utf-8 encoded JSON object (bytes).
        :raises ValueError: if the payload is not a JSON object or any of
            ``sid``, ``dlc``, ``data`` or ``timestamp`` is missing or null.
        """
        text = j.decode()
        d = json.loads(text)

        # Validate before constructing: a truthiness chain would let a
        # legitimate 0 mask a missing field, and the constructor would fail
        # with an unrelated error on a missing sid or data.
        if not isinstance(d, dict):
            raise ValueError("Received invalid message: " + text)

        sid = d.get("sid")
        dlc = d.get("dlc")
        timestamp = d.get("timestamp")
        data = d.get("data")
        if sid is None or dlc is None or timestamp is None or data is None:
            raise ValueError("Received invalid message: " + text)

        return CANMessage(sid, dlc, timestamp, data16=data)

    @staticmethod
    def decode_bstring(bstring):
        """
        Decode bytestring encoded message into CANMessage.

        See encode_bstring for details on the string format used.
        """
        sid, dlc, *data, timestamp = bstring.decode().split(",")
        return CANMessage(
            int(sid),
            int(dlc),
            int(timestamp),
            data16=[int(word) for word in data],
        )

    @staticmethod
    def decode_CANmessage(message):
        """
        Build a CANMessage from an object exposing ``sid``, ``dlc`` and an
        integer ``data`` attribute holding the whole payload.

        The payload is split into four 16-bit words, least significant first.

        NOTE(review): the source object's timestamp field is not visible from
        this file; ``message.timestamp`` is used when present, otherwise 0.
        Confirm against the caller.
        """
        words = [(message.data >> (16 * i)) & 0xFFFF for i in range(4)]
        return CANMessage(
            message.sid,
            message.dlc,
            getattr(message, "timestamp", 0),
            data16=words,
        )

    @staticmethod
    def decode_struct(s):
        """
        Decode C struct encoded message into CANMessage.

        See encode_struct for details on the struct format used.
        """
        sid, dlc, *data, timestamp = struct.unpack(CANMessage._STRUCT_FORMAT, s)
        return CANMessage(sid, dlc, timestamp, data16=data)

    @staticmethod
    def decode_kvaser_csv(csv, start_time):
        """
        Decode one comma separated log line into a CANMessage.

        Columns read: 0 = time offset in seconds, 2 = sid, 4 = dlc,
        5..12 = data bytes (least significant first, empty field = 0).

        :param csv: one line of the log, as a string.
        :param start_time: value the per-line offset is added to. The message
            timestamp is ``start_time + timedelta(seconds=offset)``, so it is
            not an integer when ``start_time`` is a ``datetime``.
        :return: the decoded CANMessage, or ``None`` if the line cannot be
            parsed.
        """
        try:
            fields = csv.split(",")
            timestamp = start_time + datetime.timedelta(seconds=float(fields[0]))
            sid = int(fields[2])
            dlc = int(fields[4])
            data64 = sum(
                (0 if byte == "" else int(byte)) << (8 * i)
                for i, byte in enumerate(fields[5:13])
            )
            words = [(data64 >> (16 * i)) & 0xFFFF for i in range(4)]
            return CANMessage(sid, dlc, timestamp, data16=words)
        except Exception:
            # Deliberate best effort: headers and malformed lines in a log
            # are reported as None rather than aborting the whole import.
            return None

    def __repr__(self):
        # repr() of the JSON bytes minus the leading "b": the JSON text
        # wrapped in quotes.
        return repr(self.encode_json())[1:]
def test_encode_json():
    """encode_json exposes every field of the message under its JSON key."""
    encoded = CANMessage(100, 8, 100, data16=[1, 2, 3, 4]).encode_json()
    decoded = json.loads(encoded)

    expected_fields = (
        ("sid", 100),
        ("dlc", 8),
        ("data", [1, 2, 3, 4]),
        ("timestamp", 100),
    )
    for key, expected in expected_fields:
        assert decoded[key] == expected
def test_encode_bstring():
    """encode_bstring emits sid, dlc, the four data words and the timestamp."""
    raw = CANMessage(100, 8, 100, data16=[1, 2, 3, 4]).encode_bstring()
    values = list(map(int, raw.split(b",")))

    # Field order: sid, dlc, data[0..3], timestamp.
    for position, expected in enumerate((100, 8, 1, 2, 3, 4, 100)):
        assert values[position] == expected
def test_encode_struct():
    """A message survives an encode_struct / decode_struct round trip."""
    original = CANMessage(100, 8, 100, data16=[1, 2, 3, 4])
    packed = original.encode_struct()
    restored = CANMessage.decode_struct(packed)

    assert restored.sid == 100
    assert restored.dlc == 8
    assert restored.get_data16() == [1, 2, 3, 4]
    assert restored.timestamp == 100
def test_decode_json():
    """A message survives an encode_json / decode_json round trip."""
    original = CANMessage(100, 8, 100, data16=[1, 2, 3, 4])
    payload = original.encode_json()
    restored = CANMessage.decode_json(payload)

    assert restored.sid == 100
    assert restored.dlc == 8
    assert restored.get_data16() == [1, 2, 3, 4]
    assert restored.timestamp == 100
def test_decode_bstring():
    """A message survives an encode_bstring / decode_bstring round trip."""
    original = CANMessage(100, 8, 100, data16=[1, 2, 3, 4])
    line = original.encode_bstring()
    restored = CANMessage.decode_bstring(line)

    assert restored.sid == 100
    assert restored.dlc == 8
    assert restored.get_data16() == [1, 2, 3, 4]
    assert restored.timestamp == 100
def test_decode_kvaser_csv():
    """decode_kvaser_csv reads the sid from the third column of a log line."""
    log_start = datetime.datetime(year=2019, month=9, day=3)
    row = "0.31947,1,1099,2,8,20,178,20,178,61,74,0,0,1"

    decoded = CANMessage.decode_kvaser_csv(row, log_start)

    assert decoded.sid == 1099