Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion extract_icsneo40_defines.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,20 @@ def extract():
# This removes anything after the equal sign as we don't need it
sline[0] = sline[0].split("=")[0]
value = sline[0].replace(",", "")
print(f"\tresult += PyModule_AddObjectRef(module, \"{value}\", PyLong_FromLongLong({value}));", file=f)
# Hex enumerators with the high bit set (e.g. 0xFFFFFFFF) get an
# implementation-defined underlying type: MSVC wraps them negative
# while GCC/Clang keep them unsigned. Normalize through unsigned
# int so every platform exposes the positive value.
unsigned32 = False
code_part = line.split("//")[0]
if "=" in code_part:
rhs = code_part.split("=", 1)[1].strip().rstrip(",").strip().strip("()")
if re.match(r"^0[xX][0-9a-fA-F]+$", rhs) and 0x7FFFFFFF < int(rhs, 16) <= 0xFFFFFFFF:
unsigned32 = True
if unsigned32:
print(f"\tresult += PyModule_AddObjectRef(module, \"{value}\", PyLong_FromUnsignedLongLong(static_cast<unsigned int>({value})));", file=f)
else:
print(f"\tresult += PyModule_AddObjectRef(module, \"{value}\", PyLong_FromLongLong({value}));", file=f)
continue
if "#define" in line:
sline = line.split("//")[0].split()
Expand Down
10 changes: 9 additions & 1 deletion generate_icsneo40_structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ def convert_to_ctype_object(data_type):
ctype_types[f"c_uint{d}"] = (f"uint{d}_t",)

# This is dirty but we don't parse typedefs...
ctype_types["c_uint16"] = ctype_types["c_uint16"] + ("descIdType",)
# descIdType is int16_t in icsnVC40.h (EXTERNAL_PROJECT build)
ctype_types["c_int16"] = ctype_types["c_int16"] + ("descIdType",)
ctype_types["c_uint64"] = ctype_types["c_uint64"] + ("_clock_identity",)

is_pointer = "*" in data_type and not "void" in data_type
Expand Down Expand Up @@ -818,6 +819,13 @@ def _write_member(f, member, is_struct_or_union=False):
# 4 byte integer on most systems (even 64-bit)
# This is a potential hole but nothing we can do here
data_type = "ctypes.c_int32"
# Use the unsigned variant when any enumerator needs the
# full 32 bits (e.g. DeviceSettingsNone = 0xFFFFFFFF) so
# the value round-trips through the field.
for enum_member in obj.members:
if isinstance(enum_member.enum_value, int) and enum_member.enum_value > 0x7FFFFFFF:
data_type = "ctypes.c_uint32"
break
else:
data_type = member.data_type
if member.bitfield_size:
Expand Down
13 changes: 8 additions & 5 deletions src/ics/py_neo_device_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,15 @@ def SerialNumber(self) -> int:
@property
def serial_number(self) -> str:
"""Return the serial number as a string."""
if self.SerialNumber <= ics.MIN_BASE36_SERIAL:
return str(self.SerialNumber)
elif self.SerialNumber <= ics.MAX_SERIAL:
return ics.base36enc(self.SerialNumber)
# SerialNumber is an int32_t in the NeoDevice struct, so serials
# above 0x7FFFFFFF ("ZIK0ZK".."ZZZZZZ") read back negative.
serial = self.SerialNumber & 0xFFFFFFFF
if serial < ics.MIN_BASE36_SERIAL:
return str(serial)
elif serial <= ics.MAX_SERIAL:
return ics.base36enc(serial)
else:
raise ValueError(f"Failed to convert SerialNumber {self.SerialNumber} to a valid serial number.")
raise ValueError(f"Failed to convert SerialNumber {serial} to a valid serial number.")

@property
def AutoHandleClose(self) -> bool:
Expand Down
130 changes: 96 additions & 34 deletions src/methods.cpp

Large diffs are not rendered by default.

181 changes: 96 additions & 85 deletions src/object_spy_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,38 @@ static void spy_message_object_dealloc(spy_message_object* self)
Py_TYPE(self)->tp_free((PyObject*)self);
}

// Validates that value is a tuple of at most max_length ints and copies the
// elements into dest. Returns the tuple length, or -1 with an exception set.
// dest is only written on success.
static Py_ssize_t _copy_byte_tuple(PyObject* value, PyObject* attr_name, unsigned char* dest, Py_ssize_t max_length)
{
if (!PyTuple_Check(value)) {
PyErr_Format(PyExc_AttributeError,
"'%.50s' object attribute '%U' needs to be a tuple",
MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME,
attr_name);
return -1;
}
Py_ssize_t length = PyTuple_Size(value);
if (length > max_length) {
PyErr_Format(
PyExc_ValueError, "'%U' accepts at most %zd bytes (got %zd)", attr_name, max_length, length);
return -1;
}
unsigned char temp[8] = { 0 };
for (Py_ssize_t i = 0; i < length; ++i) {
PyObject* item = PyTuple_GetItem(value, i);
if (!item)
return -1;
long byte = PyLong_AsLong(item);
if (byte == -1 && PyErr_Occurred())
return -1;
temp[i] = (unsigned char)byte;
}
memcpy(dest, temp, (size_t)length);
return length;
}

static PyObject* spy_message_object_getattr(PyObject* o, PyObject* attr_name)
{
#if PY_MAJOR_VERSION >= 3
Expand Down Expand Up @@ -97,7 +129,7 @@ static PyObject* spy_message_object_getattr(PyObject* o, PyObject* attr_name)
}
return tuple;
} else {
return Py_None;
Py_RETURN_NONE;
}
} else {
return PyObject_GenericGetAttr(o, attr_name);
Expand All @@ -108,108 +140,88 @@ static int spy_message_object_setattr(PyObject* o, PyObject* name, PyObject* val
{
spy_message_object* obj = (spy_message_object*)o;
if (PyUnicode_CompareWithASCIIString(name, "Data") == 0) {
// Make sure we are a tuple and len() == 8
if (!PyTuple_Check(value)) {
PyErr_Format(PyExc_AttributeError,
"'%.50s' object attribute '%.400s' needs to be a tuple",
MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME,
name);
Py_ssize_t length = _copy_byte_tuple(value, name, obj->msg.Data, sizeof(obj->msg.Data));
if (length < 0)
return -1;
}
// Get tuple items and place them in array, set as 0 if error.
for (int i = 0; i < 8 && i < PyObject_Length(value); ++i) {
PyObject* data = PyTuple_GetItem(value, i);
if (!data && !PyLong_Check(data)) {
obj->msg.Data[i] = 0;
} else {
obj->msg.Data[i] = (unsigned char)PyLong_AsLong(data);
}
}
obj->msg.NumberBytesData = static_cast<uint8_t>(PyObject_Length(value));
obj->msg.NumberBytesData = static_cast<uint8_t>(length);
return 0;
} else if (PyUnicode_CompareWithASCIIString(name, "AckBytes") == 0) {
// Make sure we are a tuple and len() == 8
if (!PyTuple_Check(value)) {
PyErr_Format(PyExc_AttributeError,
"'%.50s' object attribute '%.400s' needs to be a tuple",
MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME,
name);
if (_copy_byte_tuple(value, name, obj->msg.AckBytes, sizeof(obj->msg.AckBytes)) < 0)
return -1;
}
// Get tuple items and place them in array, set as 0 if error.
for (int i = 0; i < 8 && i < PyObject_Length(value); ++i) {
PyObject* data = PyTuple_GetItem(value, i);
if (!data && !PyLong_Check(data)) {
obj->msg.AckBytes[i] = 0;
} else {
obj->msg.AckBytes[i] = static_cast<uint8_t>(PyLong_AsLong(data));
}
}
return 0;
} else if (PyUnicode_CompareWithASCIIString(name, "Header") == 0) {
// Make sure we are a tuple and len() == 8
if (!PyTuple_Check(value)) {
PyErr_Format(PyExc_AttributeError,
"'%.50s' object attribute '%.400s' needs to be a tuple",
MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME,
name);
spy_message_j1850_object* j1850_obj = (spy_message_j1850_object*)obj;
Py_ssize_t length = _copy_byte_tuple(value, name, j1850_obj->msg.Header, sizeof(j1850_obj->msg.Header));
if (length < 0)
return -1;
}
// Get tuple items and place them in array, set as 0 if error.
for (int i = 0; i < 8 && i < PyObject_Length(value); ++i) {
PyObject* data = PyTuple_GetItem(value, i);
if (!data && !PyLong_Check(data)) {
((spy_message_j1850_object*)obj)->msg.Header[i] = 0;
} else {
((spy_message_j1850_object*)obj)->msg.Header[i] = (unsigned char)PyLong_AsLong(data);
}
}
obj->msg.NumberBytesHeader = static_cast<uint8_t>(PyObject_Length(value));
obj->msg.NumberBytesHeader = static_cast<uint8_t>(length);
return 0;
} else if (PyUnicode_CompareWithASCIIString(name, "Protocol") == 0) {
// Ethernet behavior is backward to CAN and will crash if enabled.
if (PyLong_AsLong(value) == SPY_PROTOCOL_ETHERNET)
long protocol = PyLong_AsLong(value);
if (protocol == -1 && PyErr_Occurred())
PyErr_Clear(); // let PyObject_GenericSetAttr report the type error
else if (protocol == SPY_PROTOCOL_ETHERNET)
obj->msg.ExtraDataPtrEnabled = 0;
return PyObject_GenericSetAttr(o, name, value);
} else if (PyUnicode_CompareWithASCIIString(name, "ExtraDataPtr") == 0) {
// Make sure we are a tuple and len() == 8
if (!PyTuple_Check(value)) {
PyErr_Format(PyExc_AttributeError,
"'%.50s' object attribute '%.400s' needs to be a tuple",
"'%.50s' object attribute '%U' needs to be a tuple",
MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME,
name);
return -1;
}
// Get tuple items and place them in array, set as 0 if error.
size_t length = static_cast<uint16_t>(PyObject_Length(value));
if (obj->msg.ExtraDataPtr != NULL)
delete[] (unsigned char*)obj->msg.ExtraDataPtr;
obj->msg.ExtraDataPtr = new unsigned char[length];
// Some newer protocols are packing the length into NumberBytesHeader also so lets handle it here...
if (obj->msg.Protocol == SPY_PROTOCOL_A2B || obj->msg.Protocol == SPY_PROTOCOL_ETHERNET ||
obj->msg.Protocol == SPY_PROTOCOL_SPI || obj->msg.Protocol == SPY_PROTOCOL_WBMS) {
obj->msg.NumberBytesHeader = static_cast<uint8_t>(length >> 8);
bool packs_length = obj->msg.Protocol == SPY_PROTOCOL_A2B || obj->msg.Protocol == SPY_PROTOCOL_ETHERNET ||
obj->msg.Protocol == SPY_PROTOCOL_SPI || obj->msg.Protocol == SPY_PROTOCOL_WBMS;
Py_ssize_t length = PyTuple_Size(value);
Py_ssize_t max_length = packs_length ? 0xFFFF : 0xFF;
if (length > max_length) {
PyErr_Format(PyExc_ValueError,
"'%U' accepts at most %zd bytes for this protocol (got %zd)",
name,
max_length,
length);
return -1;
}
obj->msg.NumberBytesData = length & 0xFF;
if (obj->msg.Protocol != SPY_PROTOCOL_ETHERNET)
obj->msg.ExtraDataPtrEnabled = 1;
unsigned char* ExtraDataPtr = (unsigned char*)(obj->msg.ExtraDataPtr);
for (size_t i = 0; i < length; ++i) {
PyObject* data = PyTuple_GetItem(value, static_cast<Py_ssize_t>(i));
if (!data && !PyLong_Check(data)) {
ExtraDataPtr[i] = (unsigned char)0;
} else {
ExtraDataPtr[i] = (unsigned char)PyLong_AsLong(data);
// Validate and copy into a fresh buffer before touching the message.
unsigned char* buffer = new unsigned char[length > 0 ? (size_t)length : 1];
for (Py_ssize_t i = 0; i < length; ++i) {
PyObject* data = PyTuple_GetItem(value, i);
long byte = data ? PyLong_AsLong(data) : -1;
if (!data || (byte == -1 && PyErr_Occurred())) {
delete[] buffer;
return -1;
}
buffer[i] = (unsigned char)byte;
}
// Never delete[] a buffer this extension didn't allocate (e.g. DLL-owned
// pointers on received messages flagged with noExtraDataPtrCleanup).
if (obj->msg.ExtraDataPtr != NULL && !obj->noExtraDataPtrCleanup)
delete[] (unsigned char*)obj->msg.ExtraDataPtr;
obj->msg.ExtraDataPtr = buffer;
if (packs_length)
obj->msg.NumberBytesHeader = static_cast<uint8_t>(length >> 8);
obj->msg.NumberBytesData = static_cast<uint8_t>(length & 0xFF);
if (obj->msg.Protocol != SPY_PROTOCOL_ETHERNET)
obj->msg.ExtraDataPtrEnabled = 1;
obj->noExtraDataPtrCleanup = false;
return 0;
} else if (PyUnicode_CompareWithASCIIString(name, "ExtraDataPtrEnabled") == 0) {
long enabled = PyLong_AsLong(value);
if (enabled == -1 && PyErr_Occurred()) {
PyErr_Clear(); // let PyObject_GenericSetAttr report the type error
return PyObject_GenericSetAttr(o, name, value);
}
// Make sure we clean up here so we don't memory leak
if ((!obj->noExtraDataPtrCleanup && PyLong_AsLong(value) != 1 && obj->msg.ExtraDataPtrEnabled == 1) ||
(!obj->noExtraDataPtrCleanup && PyLong_AsLong(value) != 1 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET)) {
if (obj->msg.ExtraDataPtr != NULL)
if ((!obj->noExtraDataPtrCleanup && enabled != 1 && obj->msg.ExtraDataPtrEnabled == 1) ||
(!obj->noExtraDataPtrCleanup && enabled != 1 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET)) {
if (obj->msg.ExtraDataPtr != NULL) {
delete[] (unsigned char*)obj->msg.ExtraDataPtr;
} else if (PyLong_AsLong(value) != 0 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET) {
obj->msg.ExtraDataPtr = NULL;
}
} else if (enabled != 0 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET) {
// Ethernet always needs to be set to 0
return 0;
}
Expand Down Expand Up @@ -285,11 +297,11 @@ static PyMemberDef spy_message_object_members[] = {
"This value is used to identify which network this message was received on." },
{ "DescriptionID", T_SHORT, offsetof(spy_message_object, msg.DescriptionID), 0, "Not Used" },
{ "ArbIDOrHeader", T_UINT, offsetof(spy_message_object, msg.ArbIDOrHeader), 0, "" },
{ "Data", T_OBJECT_EX, 0, 0, "" },
// Data, AckBytes and ExtraDataPtr are handled by tp_getattro/tp_setattro;
// a member entry here would expose a descriptor that reinterprets raw
// struct memory as a PyObject*.
{ "StatusBitField3", T_UINT, offsetof(spy_message_object, msg.StatusBitField3), 0, "StatusBitField3" },
{ "StatusBitField4", T_UINT, offsetof(spy_message_object, msg.StatusBitField4), 0, "StatusBitField4" },
{ "AckBytes", T_OBJECT_EX, 0, 0, "" },
{ "ExtraDataPtr", T_OBJECT_EX, offsetof(spy_message_object, msg.ExtraDataPtr), 0, "" },
{ "MiscData", T_UBYTE, offsetof(spy_message_object, msg.MiscData), 0, "" },
{ "noExtraDataPtrCleanup",
T_BOOL,
Expand Down Expand Up @@ -360,20 +372,19 @@ static PyMemberDef spy_message_j1850_object_members[] = {
"Holds the number of bytes in the Data(1 to 8) array or the number of bytes in a CAN remote frame (The DLC)." },
{ "NetworkID2",
T_UBYTE,
offsetof(spy_message_object, msg.NetworkID2),
offsetof(spy_message_j1850_object, msg.NetworkID2),
0,
"This value is used to identify which network this message was received on." },
{ "DescriptionID", T_SHORT, offsetof(spy_message_j1850_object, msg.DescriptionID), 0, "Not Used" },
{ "Header", T_OBJECT_EX, 0, 0, "" },
{ "Data", T_OBJECT_EX, 0, 0, "" },
// Header, Data, AckBytes and ExtraDataPtr are handled by
// tp_getattro/tp_setattro; a member entry here would expose a descriptor
// that reinterprets raw struct memory as a PyObject*.
{ "StatusBitField3", T_UINT, offsetof(spy_message_j1850_object, msg.StatusBitField3), 0, "StatusBitField3" },
{ "StatusBitField4", T_UINT, offsetof(spy_message_j1850_object, msg.StatusBitField4), 0, "StatusBitField4" },
{ "AckBytes", T_OBJECT_EX, 0, 0, "" },
{ "ExtraDataPtr", T_OBJECT_EX, offsetof(spy_message_j1850_object, msg.ExtraDataPtr), 0, "" },
{ "MiscData", T_UBYTE, offsetof(spy_message_j1850_object, msg.MiscData), 0, "" },
{ "noExtraDataPtrCleanup",
T_BOOL,
offsetof(spy_message_object, noExtraDataPtrCleanup),
offsetof(spy_message_j1850_object, noExtraDataPtrCleanup),
0,
"Tells Python to not clean up ExtraDataPtrMemory, If this is enabled. Ignore, if unsure." },
{ NULL, 0, 0, 0, NULL },
Expand Down
2 changes: 1 addition & 1 deletion src/setup_module_auto_defines.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ int setup_module_auto_defines(PyObject * module)
result += PyModule_AddObjectRef(module, "DeviceRADGeminiSettingsType", PyLong_FromLongLong(DeviceRADGeminiSettingsType));
result += PyModule_AddObjectRef(module, "DeviceFire3T1sLinSettingsType", PyLong_FromLongLong(DeviceFire3T1sLinSettingsType));
result += PyModule_AddObjectRef(module, "DeviceSettingsTypeMax", PyLong_FromLongLong(DeviceSettingsTypeMax));
result += PyModule_AddObjectRef(module, "DeviceSettingsNone", PyLong_FromLongLong(DeviceSettingsNone));
result += PyModule_AddObjectRef(module, "DeviceSettingsNone", PyLong_FromUnsignedLongLong(static_cast<unsigned int>(DeviceSettingsNone)));
// end of enum - } EDeviceSettingsType;

// enum
Expand Down
16 changes: 12 additions & 4 deletions tests/test_defines.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@

def test_unsigned_overflow():
"""Test that certain defines are correctly set to large unsigned values."""
# These values are defined in icsnVC40.h and should be interpreted as unsigned
# These values are defined in icsnVC40.h/cicsSpyStatusBits.h and should be
# interpreted as unsigned. No hasattr() guard: a missing name is a
# regression in the define generation and must fail loudly.
values = (
"SPY_STATUS_EXTENDED",
"SPY_STATUS2_ETHERNET_T1S_ETHERNET",
"SPY_STATUS2_LIN_NO_SLAVE_DATA",
"SPY_STATUS2_MOST_CHANGED_PAR"
"SPY_STATUS2_MOST_CHANGED_PAR",
"NEODEVICE_OBD2_SIM_DEPRECATED",
"ETHERNET_SETTINGS10G_FLAG_COMM_IN_USE",
)
for value in values:
if hasattr(ics, value):
assert getattr(ics, value) == 0x80000000
assert getattr(ics, value) == 0x80000000, value


def test_serial_range_defines():
assert ics.MIN_BASE36_SERIAL == int("A0000", 36)
assert ics.MAX_SERIAL == int("ZZZZZZ", 36)

if __name__ == "__main__":
import pytest
Expand Down
Loading
Loading