From a437244ebb8855862603c4ad1688b171b3dab351 Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Fri, 12 Jun 2026 11:22:46 -0400 Subject: [PATCH 1/4] Fix memory corruption and exception leaks in SpyMessage attribute handlers - Header setter wrote up to 8 bytes into the 4-byte J1850 Header array, spilling into Data[0..3]; tuple lengths are now validated against the real field sizes (ValueError instead of silent corruption) - ExtraDataPtr setter truncated the tuple length through uint16_t and through NumberBytesData for non length-packing protocols; lengths are now range checked per protocol - Broken null checks (!data && !PyLong_Check(data)) could dereference NULL and leaked pending TypeErrors into later calls (surfacing as SystemError); element conversion errors now propagate immediately - getattr("ExtraDataPtr") returned Py_None without incref, underflowing None's refcount on Python < 3.12 - ExtraDataPtr setter could delete[] DLL-owned buffers on received messages (noExtraDataPtrCleanup) and the ExtraDataPtrEnabled path left a dangling pointer behind after freeing (double free in dealloc) - Removed T_OBJECT_EX member entries that exposed descriptors reinterpreting raw struct memory as PyObject* (reachable via type(msg).__dict__) and fixed cross-type offsetof in the J1850 table Co-Authored-By: Claude Fable 5 --- src/object_spy_message.cpp | 181 ++++++++++++++++++++----------------- tests/test_spy_message.py | 122 +++++++++++++++++++++++++ 2 files changed, 218 insertions(+), 85 deletions(-) create mode 100644 tests/test_spy_message.py diff --git a/src/object_spy_message.cpp b/src/object_spy_message.cpp index 267c95aa0..0c1c4a7a8 100644 --- a/src/object_spy_message.cpp +++ b/src/object_spy_message.cpp @@ -23,6 +23,38 @@ static void spy_message_object_dealloc(spy_message_object* self) Py_TYPE(self)->tp_free((PyObject*)self); } +// Validates that value is a tuple of at most max_length ints and copies the +// elements into dest. Returns the tuple length, or -1 with an exception set. +// dest is only written on success. +static Py_ssize_t _copy_byte_tuple(PyObject* value, PyObject* attr_name, unsigned char* dest, Py_ssize_t max_length) +{ + if (!PyTuple_Check(value)) { + PyErr_Format(PyExc_AttributeError, + "'%.50s' object attribute '%U' needs to be a tuple", + MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME, + attr_name); + return -1; + } + Py_ssize_t length = PyTuple_Size(value); + if (length > max_length) { + PyErr_Format( + PyExc_ValueError, "'%U' accepts at most %zd bytes (got %zd)", attr_name, max_length, length); + return -1; + } + unsigned char temp[8] = { 0 }; + for (Py_ssize_t i = 0; i < length; ++i) { + PyObject* item = PyTuple_GetItem(value, i); + if (!item) + return -1; + long byte = PyLong_AsLong(item); + if (byte == -1 && PyErr_Occurred()) + return -1; + temp[i] = (unsigned char)byte; + } + memcpy(dest, temp, (size_t)length); + return length; +} + static PyObject* spy_message_object_getattr(PyObject* o, PyObject* attr_name) { #if PY_MAJOR_VERSION >= 3 @@ -97,7 +129,7 @@ static PyObject* spy_message_object_getattr(PyObject* o, PyObject* attr_name) } return tuple; } else { - return Py_None; + Py_RETURN_NONE; } } else { return PyObject_GenericGetAttr(o, attr_name); @@ -108,108 +140,88 @@ static int spy_message_object_setattr(PyObject* o, PyObject* name, PyObject* val { spy_message_object* obj = (spy_message_object*)o; if (PyUnicode_CompareWithASCIIString(name, "Data") == 0) { - // Make sure we are a tuple and len() == 8 - if (!PyTuple_Check(value)) { - PyErr_Format(PyExc_AttributeError, - "'%.50s' object attribute '%.400s' needs to be a tuple", - MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME, - name); + Py_ssize_t length = _copy_byte_tuple(value, name, obj->msg.Data, sizeof(obj->msg.Data)); + if (length < 0) return -1; - } - // Get tuple items and place them in array, set as 0 if error. - for (int i = 0; i < 8 && i < PyObject_Length(value); ++i) { - PyObject* data = PyTuple_GetItem(value, i); - if (!data && !PyLong_Check(data)) { - obj->msg.Data[i] = 0; - } else { - obj->msg.Data[i] = (unsigned char)PyLong_AsLong(data); - } - } - obj->msg.NumberBytesData = static_cast(PyObject_Length(value)); + obj->msg.NumberBytesData = static_cast(length); return 0; } else if (PyUnicode_CompareWithASCIIString(name, "AckBytes") == 0) { - // Make sure we are a tuple and len() == 8 - if (!PyTuple_Check(value)) { - PyErr_Format(PyExc_AttributeError, - "'%.50s' object attribute '%.400s' needs to be a tuple", - MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME, - name); + if (_copy_byte_tuple(value, name, obj->msg.AckBytes, sizeof(obj->msg.AckBytes)) < 0) return -1; - } - // Get tuple items and place them in array, set as 0 if error. - for (int i = 0; i < 8 && i < PyObject_Length(value); ++i) { - PyObject* data = PyTuple_GetItem(value, i); - if (!data && !PyLong_Check(data)) { - obj->msg.AckBytes[i] = 0; - } else { - obj->msg.AckBytes[i] = static_cast(PyLong_AsLong(data)); - } - } return 0; } else if (PyUnicode_CompareWithASCIIString(name, "Header") == 0) { - // Make sure we are a tuple and len() == 8 - if (!PyTuple_Check(value)) { - PyErr_Format(PyExc_AttributeError, - "'%.50s' object attribute '%.400s' needs to be a tuple", - MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME, - name); + spy_message_j1850_object* j1850_obj = (spy_message_j1850_object*)obj; + Py_ssize_t length = _copy_byte_tuple(value, name, j1850_obj->msg.Header, sizeof(j1850_obj->msg.Header)); + if (length < 0) return -1; - } - // Get tuple items and place them in array, set as 0 if error. - for (int i = 0; i < 8 && i < PyObject_Length(value); ++i) { - PyObject* data = PyTuple_GetItem(value, i); - if (!data && !PyLong_Check(data)) { - ((spy_message_j1850_object*)obj)->msg.Header[i] = 0; - } else { - ((spy_message_j1850_object*)obj)->msg.Header[i] = (unsigned char)PyLong_AsLong(data); - } - } - obj->msg.NumberBytesHeader = static_cast(PyObject_Length(value)); + obj->msg.NumberBytesHeader = static_cast(length); return 0; } else if (PyUnicode_CompareWithASCIIString(name, "Protocol") == 0) { // Ethernet behavior is backward to CAN and will crash if enabled. - if (PyLong_AsLong(value) == SPY_PROTOCOL_ETHERNET) + long protocol = PyLong_AsLong(value); + if (protocol == -1 && PyErr_Occurred()) + PyErr_Clear(); // let PyObject_GenericSetAttr report the type error + else if (protocol == SPY_PROTOCOL_ETHERNET) obj->msg.ExtraDataPtrEnabled = 0; return PyObject_GenericSetAttr(o, name, value); } else if (PyUnicode_CompareWithASCIIString(name, "ExtraDataPtr") == 0) { - // Make sure we are a tuple and len() == 8 if (!PyTuple_Check(value)) { PyErr_Format(PyExc_AttributeError, - "'%.50s' object attribute '%.400s' needs to be a tuple", + "'%.50s' object attribute '%U' needs to be a tuple", MODULE_NAME "." SPY_MESSAGE_OBJECT_NAME, name); return -1; } - // Get tuple items and place them in array, set as 0 if error. - size_t length = static_cast(PyObject_Length(value)); - if (obj->msg.ExtraDataPtr != NULL) - delete[] (unsigned char*)obj->msg.ExtraDataPtr; - obj->msg.ExtraDataPtr = new unsigned char[length]; // Some newer protocols are packing the length into NumberBytesHeader also so lets handle it here... - if (obj->msg.Protocol == SPY_PROTOCOL_A2B || obj->msg.Protocol == SPY_PROTOCOL_ETHERNET || - obj->msg.Protocol == SPY_PROTOCOL_SPI || obj->msg.Protocol == SPY_PROTOCOL_WBMS) { - obj->msg.NumberBytesHeader = static_cast(length >> 8); + bool packs_length = obj->msg.Protocol == SPY_PROTOCOL_A2B || obj->msg.Protocol == SPY_PROTOCOL_ETHERNET || + obj->msg.Protocol == SPY_PROTOCOL_SPI || obj->msg.Protocol == SPY_PROTOCOL_WBMS; + Py_ssize_t length = PyTuple_Size(value); + Py_ssize_t max_length = packs_length ? 0xFFFF : 0xFF; + if (length > max_length) { + PyErr_Format(PyExc_ValueError, + "'%U' accepts at most %zd bytes for this protocol (got %zd)", + name, + max_length, + length); + return -1; } - obj->msg.NumberBytesData = length & 0xFF; - if (obj->msg.Protocol != SPY_PROTOCOL_ETHERNET) - obj->msg.ExtraDataPtrEnabled = 1; - unsigned char* ExtraDataPtr = (unsigned char*)(obj->msg.ExtraDataPtr); - for (size_t i = 0; i < length; ++i) { - PyObject* data = PyTuple_GetItem(value, static_cast(i)); - if (!data && !PyLong_Check(data)) { - ExtraDataPtr[i] = (unsigned char)0; - } else { - ExtraDataPtr[i] = (unsigned char)PyLong_AsLong(data); + // Validate and copy into a fresh buffer before touching the message. + unsigned char* buffer = new unsigned char[length > 0 ? (size_t)length : 1]; + for (Py_ssize_t i = 0; i < length; ++i) { + PyObject* data = PyTuple_GetItem(value, i); + long byte = data ? PyLong_AsLong(data) : -1; + if (!data || (byte == -1 && PyErr_Occurred())) { + delete[] buffer; + return -1; } + buffer[i] = (unsigned char)byte; } + // Never delete[] a buffer this extension didn't allocate (e.g. DLL-owned + // pointers on received messages flagged with noExtraDataPtrCleanup). + if (obj->msg.ExtraDataPtr != NULL && !obj->noExtraDataPtrCleanup) + delete[] (unsigned char*)obj->msg.ExtraDataPtr; + obj->msg.ExtraDataPtr = buffer; + if (packs_length) + obj->msg.NumberBytesHeader = static_cast(length >> 8); + obj->msg.NumberBytesData = static_cast(length & 0xFF); + if (obj->msg.Protocol != SPY_PROTOCOL_ETHERNET) + obj->msg.ExtraDataPtrEnabled = 1; + obj->noExtraDataPtrCleanup = false; return 0; } else if (PyUnicode_CompareWithASCIIString(name, "ExtraDataPtrEnabled") == 0) { + long enabled = PyLong_AsLong(value); + if (enabled == -1 && PyErr_Occurred()) { + PyErr_Clear(); // let PyObject_GenericSetAttr report the type error + return PyObject_GenericSetAttr(o, name, value); + } // Make sure we clean up here so we don't memory leak - if ((!obj->noExtraDataPtrCleanup && PyLong_AsLong(value) != 1 && obj->msg.ExtraDataPtrEnabled == 1) || - (!obj->noExtraDataPtrCleanup && PyLong_AsLong(value) != 1 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET)) { - if (obj->msg.ExtraDataPtr != NULL) + if ((!obj->noExtraDataPtrCleanup && enabled != 1 && obj->msg.ExtraDataPtrEnabled == 1) || + (!obj->noExtraDataPtrCleanup && enabled != 1 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET)) { + if (obj->msg.ExtraDataPtr != NULL) { delete[] (unsigned char*)obj->msg.ExtraDataPtr; - } else if (PyLong_AsLong(value) != 0 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET) { + obj->msg.ExtraDataPtr = NULL; + } + } else if (enabled != 0 && obj->msg.Protocol == SPY_PROTOCOL_ETHERNET) { // Ethernet always needs to be set to 0 return 0; } @@ -285,11 +297,11 @@ static PyMemberDef spy_message_object_members[] = { "This value is used to identify which network this message was received on." }, { "DescriptionID", T_SHORT, offsetof(spy_message_object, msg.DescriptionID), 0, "Not Used" }, { "ArbIDOrHeader", T_UINT, offsetof(spy_message_object, msg.ArbIDOrHeader), 0, "" }, - { "Data", T_OBJECT_EX, 0, 0, "" }, + // Data, AckBytes and ExtraDataPtr are handled by tp_getattro/tp_setattro; + // a member entry here would expose a descriptor that reinterprets raw + // struct memory as a PyObject*. { "StatusBitField3", T_UINT, offsetof(spy_message_object, msg.StatusBitField3), 0, "StatusBitField3" }, { "StatusBitField4", T_UINT, offsetof(spy_message_object, msg.StatusBitField4), 0, "StatusBitField4" }, - { "AckBytes", T_OBJECT_EX, 0, 0, "" }, - { "ExtraDataPtr", T_OBJECT_EX, offsetof(spy_message_object, msg.ExtraDataPtr), 0, "" }, { "MiscData", T_UBYTE, offsetof(spy_message_object, msg.MiscData), 0, "" }, { "noExtraDataPtrCleanup", T_BOOL, @@ -360,20 +372,19 @@ static PyMemberDef spy_message_j1850_object_members[] = { "Holds the number of bytes in the Data(1 to 8) array or the number of bytes in a CAN remote frame (The DLC)." }, { "NetworkID2", T_UBYTE, - offsetof(spy_message_object, msg.NetworkID2), + offsetof(spy_message_j1850_object, msg.NetworkID2), 0, "This value is used to identify which network this message was received on." }, { "DescriptionID", T_SHORT, offsetof(spy_message_j1850_object, msg.DescriptionID), 0, "Not Used" }, - { "Header", T_OBJECT_EX, 0, 0, "" }, - { "Data", T_OBJECT_EX, 0, 0, "" }, + // Header, Data, AckBytes and ExtraDataPtr are handled by + // tp_getattro/tp_setattro; a member entry here would expose a descriptor + // that reinterprets raw struct memory as a PyObject*. { "StatusBitField3", T_UINT, offsetof(spy_message_j1850_object, msg.StatusBitField3), 0, "StatusBitField3" }, { "StatusBitField4", T_UINT, offsetof(spy_message_j1850_object, msg.StatusBitField4), 0, "StatusBitField4" }, - { "AckBytes", T_OBJECT_EX, 0, 0, "" }, - { "ExtraDataPtr", T_OBJECT_EX, offsetof(spy_message_j1850_object, msg.ExtraDataPtr), 0, "" }, { "MiscData", T_UBYTE, offsetof(spy_message_j1850_object, msg.MiscData), 0, "" }, { "noExtraDataPtrCleanup", T_BOOL, - offsetof(spy_message_object, noExtraDataPtrCleanup), + offsetof(spy_message_j1850_object, noExtraDataPtrCleanup), 0, "Tells Python to not clean up ExtraDataPtrMemory, If this is enabled. Ignore, if unsure." }, { NULL, 0, 0, 0, NULL }, diff --git a/tests/test_spy_message.py b/tests/test_spy_message.py new file mode 100644 index 000000000..f25371da5 --- /dev/null +++ b/tests/test_spy_message.py @@ -0,0 +1,122 @@ +import pytest +import ics + + +def test_data_roundtrip(): + msg = ics.SpyMessage() + msg.Data = (1, 2, 3, 4, 5, 6, 7, 8) + assert msg.Data == (1, 2, 3, 4, 5, 6, 7, 8) + assert msg.NumberBytesData == 8 + + +def test_data_partial_roundtrip(): + msg = ics.SpyMessage() + msg.Data = (0xAA, 0xBB) + assert msg.Data == (0xAA, 0xBB) + assert msg.NumberBytesData == 2 + + +def test_data_rejects_more_than_8_bytes(): + msg = ics.SpyMessage() + with pytest.raises(ValueError): + msg.Data = tuple(range(9)) + + +def test_data_rejects_non_int_elements(): + msg = ics.SpyMessage() + with pytest.raises(TypeError): + msg.Data = (1, "x", 3) + # No exception may be left pending after the failed assignment. + assert msg.NumberBytesData == 0 + + +def test_ack_bytes_rejects_more_than_8_bytes(): + msg = ics.SpyMessage() + with pytest.raises(ValueError): + msg.AckBytes = tuple(range(9)) + + +def test_ack_bytes_rejects_non_int_elements(): + msg = ics.SpyMessage() + with pytest.raises(TypeError): + msg.AckBytes = (None,) + + +def test_header_does_not_overflow_into_data(): + # icsSpyMessageJ1850.Header is only 4 bytes; a longer tuple used to + # spill into Data[0..3]. + msg = ics.SpyMessageJ1850() + msg.Data = (1, 2, 3, 4, 5, 6, 7, 8) + with pytest.raises(ValueError): + msg.Header = (0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22) + assert msg.Data == (1, 2, 3, 4, 5, 6, 7, 8) + + +def test_header_roundtrip(): + msg = ics.SpyMessageJ1850() + msg.Header = (0xAA, 0xBB, 0xCC, 0xDD) + assert msg.Header == (0xAA, 0xBB, 0xCC, 0xDD) + assert msg.NumberBytesHeader == 4 + + +def test_header_rejects_non_int_elements(): + msg = ics.SpyMessageJ1850() + with pytest.raises(TypeError): + msg.Header = ("a",) + + +def test_extra_data_ptr_roundtrip(): + msg = ics.SpyMessage() + msg.Protocol = ics.SPY_PROTOCOL_CANFD + payload = tuple(x & 0xFF for x in range(64)) + msg.ExtraDataPtr = payload + assert msg.ExtraDataPtr == payload + + +def test_extra_data_ptr_large_roundtrip_packing_protocol(): + # A2B/Ethernet/SPI/WBMS pack a 16-bit length into + # NumberBytesHeader:NumberBytesData. + msg = ics.SpyMessage() + msg.Protocol = ics.SPY_PROTOCOL_ETHERNET + payload = tuple(x & 0xFF for x in range(1500)) + msg.ExtraDataPtr = payload + assert msg.ExtraDataPtr == payload + + +def test_extra_data_ptr_rejects_unrepresentable_length(): + # Length is carried in 16 bits at most; longer payloads used to be + # silently truncated to (len & 0xFFFF) bytes. + msg = ics.SpyMessage() + msg.Protocol = ics.SPY_PROTOCOL_ETHERNET + with pytest.raises(ValueError): + msg.ExtraDataPtr = tuple(x & 0xFF for x in range(0x10000)) + + +def test_extra_data_ptr_rejects_unrepresentable_length_non_packing_protocol(): + # Non length-packing protocols only have the 8-bit NumberBytesData. + msg = ics.SpyMessage() + msg.Protocol = ics.SPY_PROTOCOL_CANFD + with pytest.raises(ValueError): + msg.ExtraDataPtr = tuple(x & 0xFF for x in range(0x100)) + + +def test_extra_data_ptr_rejects_non_int_elements(): + msg = ics.SpyMessage() + msg.Protocol = ics.SPY_PROTOCOL_CANFD + with pytest.raises(TypeError): + msg.ExtraDataPtr = (1, 2, "x") + + +def test_extra_data_ptr_none_when_unset(): + msg = ics.SpyMessage() + assert msg.ExtraDataPtr is None + + +def test_status_bit_field_high_bit_roundtrip(): + msg = ics.SpyMessage() + msg.StatusBitField = ics.SPY_STATUS_EXTENDED + assert msg.StatusBitField == 0x80000000 + + +if __name__ == "__main__": + pytest.main(args=[__file__, "--verbose", "-s"]) From 2b4c2b0a799b553841811daf2afda695a5f6fb00 Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Fri, 12 Jun 2026 11:23:04 -0400 Subject: [PATCH 2/4] Fix 32/64-bit type mismatches and integer overflows in methods.cpp - find_devices: scalar new unsigned int(N) allocated one element instead of an array (heap overflow with more than one device type, delete[] on scalar new); device type masks like NEODEVICE_ALL now convert through PyLong_AsUnsignedLongMask with error checks instead of overflowing PyLong_AsLong on Windows - get/set_device_settings: 'k' wrote a full unsigned long (8 bytes on Linux/macOS x64) into the 4-byte EPlasmaIonVnetChannel_t enum, corrupting the stack; parse with 'I' and cast - get_accessory_firmware_version: 'i' wrote 4 bytes into a 1-byte char index (now int with a 0..255 range check) - uart_write: 'p' wrote a full int into a 1-byte bool (now int); return bytesActuallySent via 'n' instead of truncating size_t through 'i' - get_serial_number: serials >= 0x80000000 ("ZIK0ZK".."ZZZZZZ") came back negative through Py_BuildValue("i"); use 'I' - script_get_script_status: sizeof(parameters)/sizeof(¶meters[0]) divided by pointer size, reporting 127 instead of 255 entries on Win64; elements returned via PyLong_FromUnsignedLong instead of a signed truncating 'i' (also fixes a per-element reference leak) - read_jupiter_firmware: 'i' wrote 4 of size_t's 8 bytes and rejected sizes >= 2GB; parse/return with 'n' - coremini load / read_bin: validate ftell() results (32-bit long on Windows) before malloc - get_rtc: return the __int64 offset via 'L' instead of casting through unsigned long and 'i' - open_device: check PyLong_AsUnsignedLong errors for integer and base36 serial arguments (previously left pending exceptions and an 0xFFFFFFFF serial), free the int() conversion result - reflash progress callback: pass unsigned long progress with 'k' - FlashDevice2 (internal builds): avoid sign-extending the -1 network sentinel to 64 bits on LP64 - byte-sized 'b' parse targets declared as int/unsigned int now use unsigned char (single byte writes into 4-byte stack slots) - get_bus_voltage / vnet channel getters: return unsigned long values via 'k' instead of 'i' Co-Authored-By: Claude Fable 5 --- src/methods.cpp | 130 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 96 insertions(+), 34 deletions(-) diff --git a/src/methods.cpp b/src/methods.cpp index c410b4591..ce42adaa8 100644 --- a/src/methods.cpp +++ b/src/methods.cpp @@ -15,6 +15,7 @@ #include "object_spy_message.h" #include "setup_module_auto_defines.h" +#include #include #include #include @@ -1054,9 +1055,14 @@ PyObject* meth_find_devices(PyObject* self, PyObject* args, PyObject* keywords) if (!_convertListOrTupleToArray(device_types, &device_type_vector)) return NULL; device_types_list_size = static_cast(device_type_vector.size()); - device_types_list.reset((new unsigned int(device_types_list_size))); - for (unsigned int i = 0; i < device_types_list_size; ++i) - device_types_list[i] = (unsigned int)PyLong_AsLong(device_type_vector[i]); + device_types_list.reset(new unsigned int[device_types_list_size]); + for (unsigned int i = 0; i < device_types_list_size; ++i) { + // Device type masks like NEODEVICE_ALL (0xFFFFFFFF) don't fit in a + // 32 bit signed long, so use the masked unsigned conversion. + device_types_list[i] = (unsigned int)PyLong_AsUnsignedLongMask(device_type_vector[i]); + if (PyErr_Occurred()) + return NULL; + } } // Lets finally call the icsneo40 function try { @@ -1163,8 +1169,11 @@ PyObject* meth_open_device(PyObject* self, PyObject* args, PyObject* keywords) return NULL; network_ids_list_size = static_cast(network_ids_vector.size()); network_ids_list.reset(new unsigned char[network_ids_list_size]); - for (unsigned int i = 0; i < network_ids_list_size; ++i) + for (unsigned int i = 0; i < network_ids_list_size; ++i) { network_ids_list[i] = (unsigned char)PyLong_AsLong(network_ids_vector[i]); + if (PyErr_Occurred()) + return NULL; + } use_network_ids = true; } @@ -1172,6 +1181,8 @@ PyObject* meth_open_device(PyObject* self, PyObject* args, PyObject* keywords) if (device && PyLong_CheckExact(device)) { // Device is a serial number in integer format serial_number = PyLong_AsUnsignedLong(device); + if (PyErr_Occurred()) + return NULL; } else if (device && PyUnicode_CheckExact(device)) { // Lets convert the base36 string into an integer PyObject* module_name = PyUnicode_FromString("builtins"); @@ -1190,6 +1201,9 @@ PyObject* meth_open_device(PyObject* self, PyObject* args, PyObject* keywords) return NULL; } else { serial_number = PyLong_AsUnsignedLong(return_value); + Py_DECREF(return_value); + if (PyErr_Occurred()) + return NULL; } } else { return set_ics_exception(exception_runtime_error(), "Failed to convert serial number string to integer."); @@ -1440,7 +1454,7 @@ PyObject* meth_get_rtc(PyObject* self, PyObject* args) device_utc_time.tm_year = ics_time.year; device_utc_time.tm_isdst = -1; device_utc_time.tm_year += 100; - unsigned long offset = (unsigned long)_tm_secs_offset(&device_utc_time, current_utc_time); + long long offset = _tm_secs_offset(&device_utc_time, current_utc_time); PyDateTime_IMPORT; if (!PyDateTimeAPI) { return set_ics_exception(exception_runtime_error(), "Failed to initialize PyDateTime"); @@ -1452,7 +1466,7 @@ PyObject* meth_get_rtc(PyObject* self, PyObject* args) device_utc_time.tm_min, device_utc_time.tm_sec, 0); - return Py_BuildValue("(O,i)", datetime, offset); + return Py_BuildValue("(O,L)", datetime, offset); } catch (ice::Exception& ex) { return set_ics_exception(exception_runtime_error(), (char*)ex.what()); } @@ -1555,6 +1569,12 @@ PyObject* meth_coremini_load(PyObject* self, PyObject* args) fseek(f, 0, SEEK_END); fsize = ftell(f); rewind(f); + // ftell() returns a 32 bit long on Windows; a negative or huge result + // means the file is too large to load through this int-sized API. + if (fsize < 0 || fsize > INT_MAX) { + fclose(f); + return set_ics_exception(exception_runtime_error(), "CoreMini script file size is invalid"); + } data = (unsigned char*)malloc(sizeof(char) * fsize); data_size = (int)fread(data, 1, static_cast(fsize), f); fclose(f); @@ -1809,7 +1829,8 @@ PyObject* meth_get_messages(PyObject* self, PyObject* args) { (void)self; double timeout = 0.1; - int use_j1850 = 0; + // 'b' writes a single byte, so the target must be byte sized + unsigned char use_j1850 = 0; PyObject* obj = NULL; if (!PyArg_ParseTuple(args, arg_parse("O|bd:", __FUNCTION__), &obj, &use_j1850, &timeout)) { return NULL; @@ -1920,7 +1941,7 @@ PyObject* meth_get_script_status(PyObject* self, PyObject* args) icsneoScriptGetScriptStatusEx(lib, "icsneoScriptGetScriptStatusEx"); auto gil = PyAllowThreads(); if (!icsneoScriptGetScriptStatusEx( - handle, parameters, sizeof(parameters) / sizeof(¶meters[0]), parameters_count)) { + handle, parameters, sizeof(parameters) / sizeof(parameters[0]), parameters_count)) { gil.restore(); return set_ics_exception(exception_runtime_error(), "icsneoScriptGetScriptStatusEx() Failed"); } @@ -1930,8 +1951,9 @@ PyObject* meth_get_script_status(PyObject* self, PyObject* args) } PyObject* list_object = PyList_New(0); for (unsigned long i = 0; i < parameters_count; ++i) { - PyObject* _obj = Py_BuildValue("i", parameters[i]); + PyObject* _obj = PyLong_FromUnsignedLong(parameters[i]); PyList_Append(list_object, _obj); + Py_XDECREF(_obj); } return list_object; } @@ -2094,7 +2116,16 @@ PyObject* meth_flash_devices(PyObject* self, PyObject* args) void (*MessageCallback)(const char* message, bool success))> FlashDevice2(lib, "FlashDevice2"); auto gil = PyAllowThreads(); - if (!FlashDevice2(0x3835C256, &nde->neoDevice, rc, reflash_count, network_id, iOptions, 0, &message_callback)) { + // Avoid sign-extending the -1 sentinel to 0xFFFFFFFFFFFFFFFF on LP64; + // the DLL expects the 32 bit value on every platform. + if (!FlashDevice2(0x3835C256, + &nde->neoDevice, + rc, + reflash_count, + static_cast(static_cast(network_id)), + iOptions, + 0, + &message_callback)) { gil.restore(); PyBuffer_Release(&buffer); return set_ics_exception(exception_runtime_error(), "FlashDevice2() Failed"); @@ -2116,9 +2147,9 @@ static void message_reflash_callback(const wchar_t* message, unsigned long progr if (!msg_reflash_callback) { PySys_WriteStdout("%ls -%ld\n", message, progress); } else if (PyObject_HasAttrString(msg_reflash_callback, "reflash_callback")) { - PyObject_CallMethod(msg_reflash_callback, "reflash_callback", "u,i", message, progress); + PyObject_CallMethod(msg_reflash_callback, "reflash_callback", "u,k", message, progress); } else { - PyObject_CallFunction(msg_reflash_callback, "u,i", message, progress); + PyObject_CallFunction(msg_reflash_callback, "u,k", message, progress); } // Unlock the GIL here again... PyGILState_Release(state); @@ -2163,10 +2194,13 @@ PyObject* meth_get_device_settings(PyObject* self, PyObject* args) (void)self; PyObject* obj = NULL; long device_type_override = -1; - EPlasmaIonVnetChannel_t vnet_slot = (EPlasmaIonVnetChannel_t)PlasmaIonVnetChannelMain; - if (!PyArg_ParseTuple(args, arg_parse("O|lk:", __FUNCTION__), &obj, &device_type_override, &vnet_slot)) { + // 'k' would write a full unsigned long (8 bytes on LP64) into a 4 byte + // enum, so parse into an unsigned int and cast at the call sites. + unsigned int vnet_slot_arg = (unsigned int)PlasmaIonVnetChannelMain; + if (!PyArg_ParseTuple(args, arg_parse("O|lI:", __FUNCTION__), &obj, &device_type_override, &vnet_slot_arg)) { return NULL; } + EPlasmaIonVnetChannel_t vnet_slot = static_cast(vnet_slot_arg); // Before we do anything, we need to grab the python s_device_settings ctype.Structure. PyObject* settings = _getPythonModuleObject("ics.structures.s_device_settings", "s_device_settings"); @@ -2236,10 +2270,13 @@ PyObject* meth_set_device_settings(PyObject* self, PyObject* args) PyObject* obj = NULL; PyObject* settings = NULL; int save_to_eeprom = 1; - EPlasmaIonVnetChannel_t vnet_slot = PlasmaIonVnetChannelMain; - if (!PyArg_ParseTuple(args, arg_parse("OO|ik:", __FUNCTION__), &obj, &settings, &save_to_eeprom, &vnet_slot)) { + // 'k' would write a full unsigned long (8 bytes on LP64) into a 4 byte + // enum, so parse into an unsigned int and cast at the call site. + unsigned int vnet_slot_arg = (unsigned int)PlasmaIonVnetChannelMain; + if (!PyArg_ParseTuple(args, arg_parse("OO|iI:", __FUNCTION__), &obj, &settings, &save_to_eeprom, &vnet_slot_arg)) { return NULL; } + EPlasmaIonVnetChannel_t vnet_slot = static_cast(vnet_slot_arg); if (!PyNeoDeviceEx_CheckExact(obj)) { return set_ics_exception(exception_runtime_error(), "Argument must be of type " MODULE_NAME ".PyNeoDeviceEx"); } @@ -2640,7 +2677,8 @@ PyObject* meth_coremini_read_tx_message(PyObject* self, PyObject* args) // Scrip (void)self; unsigned int index; PyObject* obj = NULL; - int j1850 = 0; + // 'b' writes a single byte, so the target must be byte sized + unsigned char j1850 = 0; if (!PyArg_ParseTuple(args, arg_parse("OI|b:", __FUNCTION__), &obj, &index, &j1850)) { return NULL; } @@ -2699,7 +2737,8 @@ PyObject* meth_coremini_read_rx_message(PyObject* self, PyObject* args) // Scrip (void)self; unsigned int index; PyObject* obj = NULL; - int j1850 = 0; + // 'b' writes a single byte, so the target must be byte sized + unsigned char j1850 = 0; if (!PyArg_ParseTuple(args, arg_parse("OI|b:", __FUNCTION__), &obj, &index, &j1850)) { return NULL; } @@ -2777,7 +2816,8 @@ PyObject* meth_coremini_write_tx_message(PyObject* self, PyObject* args) // icsn unsigned int index; PyObject* obj = NULL; PyObject* msg_obj = NULL; - int j1850 = 0; + // 'b' writes a single byte, so the target must be byte sized + unsigned char j1850 = 0; if (!PyArg_ParseTuple(args, arg_parse("OIO|b:", __FUNCTION__), &obj, &index, &msg_obj, &j1850)) { return NULL; } @@ -2831,7 +2871,8 @@ PyObject* meth_coremini_write_rx_message(PyObject* self, PyObject* args) // icsn PyObject* obj = NULL; PyObject* msg_obj = NULL; PyObject* msg_mask_obj = NULL; - int j1850 = 0; + // 'b' writes a single byte, so the target must be byte sized + unsigned char j1850 = 0; if (!PyArg_ParseTuple(args, arg_parse("OIOO|b:", __FUNCTION__), &obj, &index, &msg_obj, &msg_mask_obj, &j1850)) { return NULL; } @@ -3080,7 +3121,8 @@ PyObject* meth_get_serial_number(PyObject* self, PyObject* args) return set_ics_exception(exception_runtime_error(), "icsneoGetSerialNumber() Failed"); } gil.restore(); - return Py_BuildValue("i", serial); + // 'I': serials above 0x7FFFFFFF ("ZIK0ZK".."ZZZZZZ") must stay positive + return Py_BuildValue("I", serial); } catch (ice::Exception& ex) { return set_ics_exception(exception_runtime_error(), (char*)ex.what()); } @@ -3370,7 +3412,8 @@ PyObject* meth_set_backup_power_enabled(PyObject* self, PyObject* args) { (void)self; PyObject* obj = NULL; - unsigned int enabled = 1; + // 'b' writes a single byte, so the target must be byte sized + unsigned char enabled = 1; if (!PyArg_ParseTuple(args, arg_parse("O|b:", __FUNCTION__), &obj, &enabled)) { return NULL; } @@ -3475,6 +3518,12 @@ PyObject* meth_load_readbin(PyObject* self, PyObject* args) fseek(f, 0, SEEK_END); fsize = ftell(f); rewind(f); + // ftell() returns a 32 bit long on Windows; a negative or huge result + // means the file is too large to load through this int-sized API. + if (fsize < 0 || fsize > INT_MAX) { + fclose(f); + return set_ics_exception(exception_runtime_error(), "Readbin file size is invalid"); + } data = (unsigned char*)malloc(sizeof(char) * fsize); data_size = (int)fread(data, 1, static_cast(fsize), f); fclose(f); @@ -3714,7 +3763,7 @@ PyObject* meth_get_active_vnet_channel(PyObject* self, PyObject* args) return set_ics_exception(exception_runtime_error(), "icsneoGetActiveVNETChannel() Failed"); } gil.restore(); - return Py_BuildValue("i", channel); + return Py_BuildValue("k", channel); } catch (ice::Exception& ex) { return set_ics_exception(exception_runtime_error(), (char*)ex.what()); } @@ -3749,7 +3798,7 @@ PyObject* meth_set_active_vnet_channel(PyObject* self, PyObject* args) return set_ics_exception(exception_runtime_error(), "icsneoSetActiveVNETChannel() Failed"); } gil.restore(); - return Py_BuildValue("i", channel); + return Py_BuildValue("k", channel); } catch (ice::Exception& ex) { return set_ics_exception(exception_runtime_error(), (char*)ex.what()); } @@ -3906,7 +3955,8 @@ PyObject* meth_get_device_status(PyObject* self, PyObject* args) { (void)self; PyObject* obj = NULL; - int throw_exception_on_size_mismatch = 0; + // 'b' writes a single byte, so the target must be byte sized + unsigned char throw_exception_on_size_mismatch = 0; if (!PyArg_ParseTuple(args, arg_parse("O|b:", __FUNCTION__), &obj, &throw_exception_on_size_mismatch)) { return NULL; } @@ -4066,7 +4116,7 @@ PyObject* meth_get_bus_voltage(PyObject* self, PyObject* args) return set_ics_exception(exception_runtime_error(), "icsneoGetBusVoltage() Failed"); } gil.restore(); - return Py_BuildValue("i", mV); + return Py_BuildValue("k", mV); } catch (ice::Exception& ex) { return set_ics_exception(exception_runtime_error(), (char*)ex.what()); } @@ -4076,11 +4126,17 @@ PyObject* meth_read_jupiter_firmware(PyObject* self, PyObject* args) { (void)self; PyObject* obj = NULL; - size_t fileSize = 0; + // 'i' would write only 4 of size_t's 8 bytes (and reject sizes >= 2GB), + // so parse with 'n' into a Py_ssize_t. + Py_ssize_t file_size_arg = 0; EPlasmaIonVnetChannel_t channel = PlasmaIonVnetChannelMain; - if (!PyArg_ParseTuple(args, arg_parse("Oi|i:", __FUNCTION__), &obj, &fileSize, &channel)) { + if (!PyArg_ParseTuple(args, arg_parse("On|i:", __FUNCTION__), &obj, &file_size_arg, &channel)) { return NULL; } + if (file_size_arg < 0) { + return set_ics_exception(exception_runtime_error(), "file_size must not be negative"); + } + size_t fileSize = static_cast(file_size_arg); // Create the ByteArray PyObject* ba = PyObject_CallObject((PyObject*)&PyByteArray_Type, NULL); @@ -4123,7 +4179,7 @@ PyObject* meth_read_jupiter_firmware(PyObject* self, PyObject* args) } gil.restore(); PyBuffer_Release(&ba_buffer); - return Py_BuildValue("Oi", ba, fileSize); + return Py_BuildValue("On", ba, (Py_ssize_t)fileSize); } catch (ice::Exception& ex) { return set_ics_exception(exception_runtime_error(), (char*)ex.what()); } @@ -4295,8 +4351,9 @@ PyObject* meth_get_accessory_firmware_version(PyObject* self, PyObject* args) { (void)self; PyObject* obj = NULL; - char accessory_indx = 0; - bool check_success = true; + // 'i' writes a full int, so the target must be int sized + int accessory_indx = 0; + unsigned char check_success = 1; if (!PyArg_ParseTuple(args, arg_parse("Oi|b:", __FUNCTION__), &obj, &accessory_indx, &check_success)) { return NULL; } @@ -4320,10 +4377,14 @@ PyObject* meth_get_accessory_firmware_version(PyObject* self, PyObject* args) ice::Function icsneoGetAccessoryFirmwareVersion( lib, "icsneoGetAccessoryFirmwareVersion"); + if (accessory_indx < 0 || accessory_indx > 255) { + return set_ics_exception(exception_runtime_error(), "Accessory index must be between 0 and 255"); + } unsigned int accessory_version = 0; int function_error = 0; auto gil = PyAllowThreads(); - if (!icsneoGetAccessoryFirmwareVersion(handle, accessory_indx, &accessory_version, &function_error)) { + if (!icsneoGetAccessoryFirmwareVersion( + handle, static_cast(accessory_indx), &accessory_version, &function_error)) { gil.restore(); return set_ics_exception(exception_runtime_error(), "icsneoGetAccessoryFirmwareVersion() Failed"); } @@ -4962,7 +5023,8 @@ PyObject* meth_uart_write(PyObject* self, PyObject* args) EUartPort_t port = eUART0; Py_buffer data = {}; uint8_t flags = 0; - bool check_size = true; + // 'p' writes a full int, so the target must be int sized + int check_size = 1; if (!PyArg_ParseTuple(args, arg_parse("OIy*|Bp:", __FUNCTION__), &obj, &port, &data, &flags, &check_size)) { return NULL; } @@ -4996,7 +5058,7 @@ PyObject* meth_uart_write(PyObject* self, PyObject* args) return set_ics_exception(exception_runtime_error(), "Bytes actually sent didn't match bytes to send length"); } - return Py_BuildValue("i", bytesActuallySent); + return Py_BuildValue("n", (Py_ssize_t)bytesActuallySent); } catch (ice::Exception& ex) { return set_ics_exception(exception_runtime_error(), (char*)ex.what()); } From d77b95b68c25c0ca3afa55294d26df4bbba1fab2 Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Fri, 12 Jun 2026 11:23:20 -0400 Subject: [PATCH 3/4] Normalize DeviceSettingsNone and unsigned enum handling across platforms ics.DeviceSettingsNone was -1 on Windows (MSVC wraps the 0xFFFFFFFF enumerator to a signed int) but 4294967295 on Linux/macOS and in the generated Python enum, so settings.DeviceSettingType comparisons gave different answers per platform. - extract_icsneo40_defines.py: enum members whose header value is a hex literal above 0x7FFFFFFF are now emitted through PyLong_FromUnsignedLongLong(static_cast(...)) so every platform exposes the positive value; intentionally negative enumerators (= -1) are unaffected - generate_icsneo40_structs.py: struct fields of enum types with an enumerator above 0x7FFFFFFF are emitted as ctypes.c_uint32 instead of c_int32 so the value round-trips (s_device_settings.DeviceSettingType) - generate_icsneo40_structs.py: descIdType is int16_t in icsnVC40.h, not uint16_t; DescriptionID fields are now signed and consistent with ics_spy_message_vsb - regenerate src/setup_module_auto_defines.cpp - tests: pin DeviceSettingsNone, DeviceSettingType round-trip and DescriptionID signedness; drop the hasattr() guards in test_defines so missing constants fail loudly, and pin the remaining high-bit defines and the serial range constants Co-Authored-By: Claude Fable 5 --- extract_icsneo40_defines.py | 15 ++++++++++++- generate_icsneo40_structs.py | 10 ++++++++- src/setup_module_auto_defines.cpp | 2 +- tests/test_defines.py | 16 ++++++++++---- tests/test_structures.py | 35 +++++++++++++++++++++++++++++++ 5 files changed, 71 insertions(+), 7 deletions(-) create mode 100644 tests/test_structures.py diff --git a/extract_icsneo40_defines.py b/extract_icsneo40_defines.py index 868c16885..e15a2a18d 100644 --- a/extract_icsneo40_defines.py +++ b/extract_icsneo40_defines.py @@ -135,7 +135,20 @@ def extract(): # This removes anything after the equal sign as we don't need it sline[0] = sline[0].split("=")[0] value = sline[0].replace(",", "") - print(f"\tresult += PyModule_AddObjectRef(module, \"{value}\", PyLong_FromLongLong({value}));", file=f) + # Hex enumerators with the high bit set (e.g. 0xFFFFFFFF) get an + # implementation-defined underlying type: MSVC wraps them negative + # while GCC/Clang keep them unsigned. Normalize through unsigned + # int so every platform exposes the positive value. + unsigned32 = False + code_part = line.split("//")[0] + if "=" in code_part: + rhs = code_part.split("=", 1)[1].strip().rstrip(",").strip().strip("()") + if re.match(r"^0[xX][0-9a-fA-F]+$", rhs) and 0x7FFFFFFF < int(rhs, 16) <= 0xFFFFFFFF: + unsigned32 = True + if unsigned32: + print(f"\tresult += PyModule_AddObjectRef(module, \"{value}\", PyLong_FromUnsignedLongLong(static_cast({value})));", file=f) + else: + print(f"\tresult += PyModule_AddObjectRef(module, \"{value}\", PyLong_FromLongLong({value}));", file=f) continue if "#define" in line: sline = line.split("//")[0].split() diff --git a/generate_icsneo40_structs.py b/generate_icsneo40_structs.py index 41699bd85..4a466deaf 100644 --- a/generate_icsneo40_structs.py +++ b/generate_icsneo40_structs.py @@ -466,7 +466,8 @@ def convert_to_ctype_object(data_type): ctype_types[f"c_uint{d}"] = (f"uint{d}_t",) # This is dirty but we don't parse typedefs... - ctype_types["c_uint16"] = ctype_types["c_uint16"] + ("descIdType",) + # descIdType is int16_t in icsnVC40.h (EXTERNAL_PROJECT build) + ctype_types["c_int16"] = ctype_types["c_int16"] + ("descIdType",) ctype_types["c_uint64"] = ctype_types["c_uint64"] + ("_clock_identity",) is_pointer = "*" in data_type and not "void" in data_type @@ -818,6 +819,13 @@ def _write_member(f, member, is_struct_or_union=False): # 4 byte integer on most systems (even 64-bit) # This is a potential hole but nothing we can do here data_type = "ctypes.c_int32" + # Use the unsigned variant when any enumerator needs the + # full 32 bits (e.g. DeviceSettingsNone = 0xFFFFFFFF) so + # the value round-trips through the field. + for enum_member in obj.members: + if isinstance(enum_member.enum_value, int) and enum_member.enum_value > 0x7FFFFFFF: + data_type = "ctypes.c_uint32" + break else: data_type = member.data_type if member.bitfield_size: diff --git a/src/setup_module_auto_defines.cpp b/src/setup_module_auto_defines.cpp index fb90c6404..d53d7a605 100644 --- a/src/setup_module_auto_defines.cpp +++ b/src/setup_module_auto_defines.cpp @@ -1009,7 +1009,7 @@ int setup_module_auto_defines(PyObject * module) result += PyModule_AddObjectRef(module, "DeviceRADGeminiSettingsType", PyLong_FromLongLong(DeviceRADGeminiSettingsType)); result += PyModule_AddObjectRef(module, "DeviceFire3T1sLinSettingsType", PyLong_FromLongLong(DeviceFire3T1sLinSettingsType)); result += PyModule_AddObjectRef(module, "DeviceSettingsTypeMax", PyLong_FromLongLong(DeviceSettingsTypeMax)); - result += PyModule_AddObjectRef(module, "DeviceSettingsNone", PyLong_FromLongLong(DeviceSettingsNone)); + result += PyModule_AddObjectRef(module, "DeviceSettingsNone", PyLong_FromUnsignedLongLong(static_cast(DeviceSettingsNone))); // end of enum - } EDeviceSettingsType; // enum diff --git a/tests/test_defines.py b/tests/test_defines.py index fb8962744..71d756433 100644 --- a/tests/test_defines.py +++ b/tests/test_defines.py @@ -3,16 +3,24 @@ def test_unsigned_overflow(): """Test that certain defines are correctly set to large unsigned values.""" - # These values are defined in icsnVC40.h and should be interpreted as unsigned + # These values are defined in icsnVC40.h/cicsSpyStatusBits.h and should be + # interpreted as unsigned. No hasattr() guard: a missing name is a + # regression in the define generation and must fail loudly. values = ( "SPY_STATUS_EXTENDED", "SPY_STATUS2_ETHERNET_T1S_ETHERNET", "SPY_STATUS2_LIN_NO_SLAVE_DATA", - "SPY_STATUS2_MOST_CHANGED_PAR" + "SPY_STATUS2_MOST_CHANGED_PAR", + "NEODEVICE_OBD2_SIM_DEPRECATED", + "ETHERNET_SETTINGS10G_FLAG_COMM_IN_USE", ) for value in values: - if hasattr(ics, value): - assert getattr(ics, value) == 0x80000000 + assert getattr(ics, value) == 0x80000000, value + + +def test_serial_range_defines(): + assert ics.MIN_BASE36_SERIAL == int("A0000", 36) + assert ics.MAX_SERIAL == int("ZZZZZZ", 36) if __name__ == "__main__": import pytest diff --git a/tests/test_structures.py b/tests/test_structures.py new file mode 100644 index 000000000..2d4ca369a --- /dev/null +++ b/tests/test_structures.py @@ -0,0 +1,35 @@ +import ctypes + +import pytest +import ics +from ics.structures.e_device_settings_type import e_device_settings_type +from ics.structures.s_device_settings import s_device_settings +from ics.structures.ics_spy_message_flex_ray import ics_spy_message_flex_ray +from ics.structures.ics_spy_message_vsb import ics_spy_message_vsb + + +def test_device_settings_none_consistent(): + # The module constant, the generated enum, and the struct field must all + # agree on 0xFFFFFFFF on every platform. MSVC used to wrap the module + # constant to -1. + assert ics.DeviceSettingsNone == 0xFFFFFFFF + assert e_device_settings_type.DeviceSettingsNone.value == 0xFFFFFFFF + + +def test_device_setting_type_field_roundtrip(): + settings = s_device_settings() + settings.DeviceSettingType = 0xFFFFFFFF + assert settings.DeviceSettingType == ics.DeviceSettingsNone + + +def test_description_id_is_signed_int16(): + # descIdType is int16_t in icsnVC40.h (EXTERNAL_PROJECT build). + for struct in (ics_spy_message_flex_ray, ics_spy_message_vsb): + instance = struct() + instance.DescriptionID = -1 + assert instance.DescriptionID == -1, struct.__name__ + assert type(instance).DescriptionID.size == 2, struct.__name__ + + +if __name__ == "__main__": + pytest.main(args=[__file__, "--verbose", "-s"]) From aed1822549f657079d3aa5d5f122dcadc5882bcf Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Fri, 12 Jun 2026 11:23:20 -0400 Subject: [PATCH 4/4] Fix serial_number property for serials above 0x7FFFFFFF NeoDevice.SerialNumber is an int32_t, so serials "ZIK0ZK".."ZZZZZZ" read back negative and the property returned a negative decimal string. Mask to 32 bits before the range checks, and use < for the MIN_BASE36_SERIAL boundary so "A0000" (the first base36 serial) renders as base36 instead of decimal. Co-Authored-By: Claude Fable 5 --- src/ics/py_neo_device_ex.py | 13 ++++++----- tests/test_neo_device_ex.py | 43 +++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 tests/test_neo_device_ex.py diff --git a/src/ics/py_neo_device_ex.py b/src/ics/py_neo_device_ex.py index 0c4424bc2..e21c0270a 100644 --- a/src/ics/py_neo_device_ex.py +++ b/src/ics/py_neo_device_ex.py @@ -83,12 +83,15 @@ def SerialNumber(self) -> int: @property def serial_number(self) -> str: """Return the serial number as a string.""" - if self.SerialNumber <= ics.MIN_BASE36_SERIAL: - return str(self.SerialNumber) - elif self.SerialNumber <= ics.MAX_SERIAL: - return ics.base36enc(self.SerialNumber) + # SerialNumber is an int32_t in the NeoDevice struct, so serials + # above 0x7FFFFFFF ("ZIK0ZK".."ZZZZZZ") read back negative. + serial = self.SerialNumber & 0xFFFFFFFF + if serial < ics.MIN_BASE36_SERIAL: + return str(serial) + elif serial <= ics.MAX_SERIAL: + return ics.base36enc(serial) else: - raise ValueError(f"Failed to convert SerialNumber {self.SerialNumber} to a valid serial number.") + raise ValueError(f"Failed to convert SerialNumber {serial} to a valid serial number.") @property def AutoHandleClose(self) -> bool: diff --git a/tests/test_neo_device_ex.py b/tests/test_neo_device_ex.py new file mode 100644 index 000000000..bd969250d --- /dev/null +++ b/tests/test_neo_device_ex.py @@ -0,0 +1,43 @@ +import pytest +import ics + + +def _make_device(serial: int) -> ics.PyNeoDeviceEx: + device = ics.PyNeoDeviceEx() + # No handle was opened; don't try to close one on GC. + device._auto_handle_close = False + # SerialNumber is an int32_t in the NeoDevice struct, so values with the + # high bit set are stored as negative. + device.neoDevice.SerialNumber = serial - 0x100000000 if serial > 0x7FFFFFFF else serial + return device + + +def test_serial_number_decimal(): + assert _make_device(53123).serial_number == "53123" + + +def test_serial_number_first_base36(): + # MIN_BASE36_SERIAL is exactly "A0000" and must render as base36. + assert int("A0000", 36) == ics.MIN_BASE36_SERIAL + assert _make_device(ics.MIN_BASE36_SERIAL).serial_number == "A0000" + + +def test_serial_number_last_decimal(): + assert _make_device(ics.MIN_BASE36_SERIAL - 1).serial_number == str(ics.MIN_BASE36_SERIAL - 1) + + +def test_serial_number_high_bit_serial(): + # Serials ZIK0ZK..ZZZZZZ exceed 0x7FFFFFFF and read back negative from + # the int32 struct field; the property must still render them correctly. + assert int("ZZZZZZ", 36) == ics.MAX_SERIAL + assert _make_device(ics.MAX_SERIAL).serial_number == "ZZZZZZ" + assert _make_device(int("ZIK0ZK", 36)).serial_number == "ZIK0ZK" + + +def test_serial_number_out_of_range_raises(): + with pytest.raises(ValueError): + _make_device(ics.MAX_SERIAL + 1).serial_number + + +if __name__ == "__main__": + pytest.main(args=[__file__, "--verbose", "-s"])