From 6e87413a8f78fd6f552784feb6d3fdff7f9a963f Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Tue, 19 May 2026 16:33:26 -0700 Subject: [PATCH 01/12] tcp nodelay to avoid batching TCP packets --- axiomatic_adapter/src/axiomatic_adapter.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 8fb1491..725570d 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -112,6 +112,20 @@ class AxiomaticAdapter::AxiomaticAdapterImpl return false; } + // Disable Nagle's algorithm. The Axiomatic device's TCP stack uses delayed + // ACKs, which interacts with Nagle to produce 30-450 ms bursts of segments + // under sustained CAN traffic (observed: avg 33 ms / max 448 ms inter-segment + // gaps at 940 Hz). TCP_NODELAY forces every write to go on the wire + // immediately, which is what we want for streaming small CAN frames where + // latency matters more than per-segment overhead. Failure is non-fatal. + { + boost::system::error_code nd_ec; + tcp_socket_.set_option(boost::asio::ip::tcp::no_delay(true), nd_ec); + if (nd_ec) { + std::cerr << "[Axiomatic] Failed to set TCP_NODELAY: " << nd_ec.message() << std::endl; + } + } + socket_state_ = TCPSocketState::OPEN; return true; } catch (std::exception & e) { From 5c9b2c03800508084f05dfd5289cbbb6a80ca09a Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 10:58:29 -0700 Subject: [PATCH 02/12] adding multi-frame decode --- axiomatic_adapter/src/axiomatic_adapter.cpp | 64 +++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 725570d..2a766e2 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -196,6 +197,15 @@ class AxiomaticAdapter::AxiomaticAdapterImpl std::optional receive(polymath::socketcan::CanFrame & can_frame) { + // A previous TCP read may have decoded several CAN frames out of a single + // packed CAN Stream message — deliver those one at a time before doing + // another network read, so packed frames don't get silently dropped. + if (!pending_frames_.empty()) { + can_frame = pending_frames_.front(); + pending_frames_.pop_front(); + return std::nullopt; + } + std::vector data(1024, 0); std::atomic data_received(false); boost::system::error_code error_code; @@ -291,6 +301,56 @@ class AxiomaticAdapter::AxiomaticAdapterImpl can_frame.set_len(can_length); can_frame.set_data(can_data); + // Continue walking the rest of this CAN Stream message body. The Axiomatic + // device legally packs multiple CAN frames into one protocol message when + // bus traffic arrives in quick bursts — typical for UDS-style request / + // response patterns where several ECUs reply within a few ms. Any frames + // past the first must be queued so subsequent receive() calls deliver them + // before the next TCP read, otherwise they're silently dropped. + // + // The message body length is at header bytes 9-10 (declared Message Data + // Length). Each CAN/Notification Frame is variable-sized per its Control + // Byte: CB(1) + TS(0/1/2/4) + ID(2 or 4) + DLC bytes for CAN frames; + // fixed 5 bytes total for Notification frames (control bit 7 = 1). + static constexpr size_t TS_LENGTH_TABLE[4] = {0, 1, 2, 4}; + const size_t declared_data_len = static_cast(data[9]) | (static_cast(data[10]) << 8); + const size_t body_end = std::min(11 + declared_data_len, data.size()); + size_t walker = can_data_start + can_length; + while (walker + 1 <= body_end) { + const uint8_t cb = data[walker]; + if ((cb & 0x80) != 0) { + // Notification Frame — fixed 5-byte format, no CAN payload to deliver. + walker += 5; + continue; + } + const size_t ts_size = TS_LENGTH_TABLE[(cb >> 5) & 0b11]; + const bool ext_id = ((cb >> 4) & 0x01) != 0; + const size_t id_size = ext_id ? 4 : 2; + const size_t dlc = cb & 0x0F; + const size_t frame_bytes = 1 + ts_size + id_size + dlc; + if (walker + frame_bytes > body_end) { + // Truncated final frame in this read — abandon rather than misdecode. + break; + } + const size_t id_offset = walker + 1 + ts_size; + uint32_t cid = 0; + for (size_t i = 0; i < id_size; ++i) { + cid |= static_cast(data[id_offset + i]) << (8 * i); + } + std::array dbytes = {0}; + std::copy_n(data.begin() + id_offset + id_size, dlc, dbytes.begin()); + + polymath::socketcan::CanFrame extra; + extra.set_can_id(cid); + extra.set_len(static_cast(dlc)); + extra.set_data(dbytes); + if (ext_id) { + extra.set_id_as_extended(); + } + pending_frames_.push_back(extra); + walker += frame_bytes; + } + return std::nullopt; } @@ -378,6 +438,10 @@ class AxiomaticAdapter::AxiomaticAdapterImpl std::atomic thread_running_; std::atomic stop_thread_requested_; + // CAN frames decoded from a packed CAN Stream message but not yet delivered + // through receive(). Drained one at a time, ahead of the next TCP read. + std::deque pending_frames_; + // from construction std::string ip_address_; std::string port_; From 793d8ae571d98913b4765383c08050ad4897b046 Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 11:05:37 -0700 Subject: [PATCH 03/12] walk on both sides --- axiomatic_adapter/src/axiomatic_adapter.cpp | 65 +++++++++++++++------ 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 2a766e2..a4e1496 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -301,21 +301,54 @@ class AxiomaticAdapter::AxiomaticAdapterImpl can_frame.set_len(can_length); can_frame.set_data(can_data); - // Continue walking the rest of this CAN Stream message body. The Axiomatic - // device legally packs multiple CAN frames into one protocol message when - // bus traffic arrives in quick bursts — typical for UDS-style request / - // response patterns where several ECUs reply within a few ms. Any frames - // past the first must be queued so subsequent receive() calls deliver them - // before the next TCP read, otherwise they're silently dropped. - // - // The message body length is at header bytes 9-10 (declared Message Data - // Length). Each CAN/Notification Frame is variable-sized per its Control - // Byte: CB(1) + TS(0/1/2/4) + ID(2 or 4) + DLC bytes for CAN frames; - // fixed 5 bytes total for Notification frames (control bit 7 = 1). - static constexpr size_t TS_LENGTH_TABLE[4] = {0, 1, 2, 4}; + // Walk the rest of this CAN Stream message body for packed CAN frames. + // The Axiomatic device legally packs multiple CAN frames into one protocol + // message when bus traffic arrives in quick bursts — typical for UDS-style + // request / response patterns where several ECUs reply within a few ms. const size_t declared_data_len = static_cast(data[9]) | (static_cast(data[10]) << 8); - const size_t body_end = std::min(11 + declared_data_len, data.size()); - size_t walker = can_data_start + can_length; + const size_t first_msg_body_end = std::min(11 + declared_data_len, data.size()); + decodePackedCanFramesInto(data, can_data_start + can_length, first_msg_body_end); + + // Walk forward through any additional Axiomatic protocol messages that + // arrived in the same TCP read. Even with TCP_NODELAY on, the kernel + // receive buffer can hold multiple complete protocol messages by the time + // async_receive() fires — common at higher CAN bus rates. For each + // CAN Stream message we find, decode all its CAN frames; non-CAN-Stream + // messages (heartbeats, status responses, CAN FD stream, unknown IDs) are + // skipped using their declared Message Data Length. We compare the first + // 6 bytes of the header constant ("AXIO" + 0xBA 0x36) so the sync check + // doesn't require Message ID = 1 like the position-0 validation above. + size_t scan_pos = 11 + declared_data_len; + while (scan_pos + 11 <= data.size()) { + if (!std::equal( + AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.begin() + 6, data.begin() + scan_pos)) + { + // Garbage or padding bytes — give up rather than misframe. + break; + } + const uint16_t next_msg_id = + static_cast(data[scan_pos + 6]) | (static_cast(data[scan_pos + 7]) << 8); + const size_t next_decl_len = + static_cast(data[scan_pos + 9]) | (static_cast(data[scan_pos + 10]) << 8); + const size_t next_body_end = std::min(scan_pos + 11 + next_decl_len, data.size()); + if (next_msg_id == 1) { + decodePackedCanFramesInto(data, scan_pos + 11, next_body_end); + } + // Else: heartbeat / status response / CAN FD stream / unknown — skip silently. + scan_pos += 11 + next_decl_len; + } + + return std::nullopt; + } + + // Walks the body of a single CAN Stream message and pushes every CAN frame + // it finds onto pending_frames_. Each CAN/Notification Frame is variable- + // sized per its Control Byte: CB(1) + TS(0/1/2/4) + ID(2 or 4) + DLC for CAN + // frames; fixed 5 bytes for Notification frames (control bit 7 = 1). + void decodePackedCanFramesInto(const std::vector & data, size_t body_start, size_t body_end) + { + static constexpr size_t TS_LENGTH_TABLE[4] = {0, 1, 2, 4}; + size_t walker = body_start; while (walker + 1 <= body_end) { const uint8_t cb = data[walker]; if ((cb & 0x80) != 0) { @@ -329,7 +362,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl const size_t dlc = cb & 0x0F; const size_t frame_bytes = 1 + ts_size + id_size + dlc; if (walker + frame_bytes > body_end) { - // Truncated final frame in this read — abandon rather than misdecode. + // Truncated final frame — abandon rather than misdecode. break; } const size_t id_offset = walker + 1 + ts_size; @@ -350,8 +383,6 @@ class AxiomaticAdapter::AxiomaticAdapterImpl pending_frames_.push_back(extra); walker += frame_bytes; } - - return std::nullopt; } std::optional receive() From 1278ba0556a052044bb42c8c5e752bb0a198958d Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 11:41:17 -0700 Subject: [PATCH 04/12] cleanup --- axiomatic_adapter/src/axiomatic_adapter.cpp | 94 +++++++++++---------- 1 file changed, 51 insertions(+), 43 deletions(-) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index a4e1496..7420068 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -112,13 +112,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl socket_state_ = TCPSocketState::ERROR; return false; } - - // Disable Nagle's algorithm. The Axiomatic device's TCP stack uses delayed - // ACKs, which interacts with Nagle to produce 30-450 ms bursts of segments - // under sustained CAN traffic (observed: avg 33 ms / max 448 ms inter-segment - // gaps at 940 Hz). TCP_NODELAY forces every write to go on the wire - // immediately, which is what we want for streaming small CAN frames where - // latency matters more than per-segment overhead. Failure is non-fatal. + // Disable Nagle's algorithm. This removes batching TCP packets for low latency comms and + // keeps one CAN frame per TCP message { boost::system::error_code nd_ec; tcp_socket_.set_option(boost::asio::ip::tcp::no_delay(true), nd_ec); @@ -257,12 +252,10 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } uint8_t control_byte = data[11]; - // Extract timestamp size (bits 6 & 5) - size_t timestamp_size = (control_byte & 0x60) >> 5; - // Check if the frame is extended (bit 4) - bool is_can_extended = (control_byte & 0x10) >> 4; - // Extract CAN frame length (lower 4 bits) - size_t can_length = control_byte & 0x0F; + const uint8_t ts_bits = (control_byte & CONTROL_BYTE_TIMESTAMP_LENGTH_MASK) >> CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT; + size_t timestamp_size = TIMESTAMP_LENGTH_BYTES_TABLE[ts_bits]; + bool is_can_extended = (control_byte & CONTROL_BYTE_EXTENDED_ID_FLAG) != 0; + size_t can_length = control_byte & CONTROL_BYTE_CAN_DATA_LENGTH_MASK; // Determine where the CAN ID starts (after timestamp bytes) size_t can_id_start = 12 + timestamp_size; @@ -301,29 +294,19 @@ class AxiomaticAdapter::AxiomaticAdapterImpl can_frame.set_len(can_length); can_frame.set_data(can_data); - // Walk the rest of this CAN Stream message body for packed CAN frames. - // The Axiomatic device legally packs multiple CAN frames into one protocol - // message when bus traffic arrives in quick bursts — typical for UDS-style - // request / response patterns where several ECUs reply within a few ms. + // walk the rest of this CAN Stream message body for packed CAN frames -- one protocol message can have >1 CAN Frame + // The Axiomatic device packs multiple CAN frames into one protocol message const size_t declared_data_len = static_cast(data[9]) | (static_cast(data[10]) << 8); const size_t first_msg_body_end = std::min(11 + declared_data_len, data.size()); decodePackedCanFramesInto(data, can_data_start + can_length, first_msg_body_end); - // Walk forward through any additional Axiomatic protocol messages that - // arrived in the same TCP read. Even with TCP_NODELAY on, the kernel - // receive buffer can hold multiple complete protocol messages by the time - // async_receive() fires — common at higher CAN bus rates. For each - // CAN Stream message we find, decode all its CAN frames; non-CAN-Stream - // messages (heartbeats, status responses, CAN FD stream, unknown IDs) are - // skipped using their declared Message Data Length. We compare the first - // 6 bytes of the header constant ("AXIO" + 0xBA 0x36) so the sync check - // doesn't require Message ID = 1 like the position-0 validation above. + // walk forward through any additional Axiomatic protocol messages that arrived in the same TCP buffer size_t scan_pos = 11 + declared_data_len; while (scan_pos + 11 <= data.size()) { + // check for garbage -- non-axiomatic header if (!std::equal( AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.begin() + 6, data.begin() + scan_pos)) { - // Garbage or padding bytes — give up rather than misframe. break; } const uint16_t next_msg_id = @@ -334,35 +317,32 @@ class AxiomaticAdapter::AxiomaticAdapterImpl if (next_msg_id == 1) { decodePackedCanFramesInto(data, scan_pos + 11, next_body_end); } - // Else: heartbeat / status response / CAN FD stream / unknown — skip silently. + // continue on any unhandled frame: heartbeat / status response / CAN FD stream / unknown + // todo(david): find any unhandled status, heartbeat messages and take care of them scan_pos += 11 + next_decl_len; } return std::nullopt; } - // Walks the body of a single CAN Stream message and pushes every CAN frame - // it finds onto pending_frames_. Each CAN/Notification Frame is variable- - // sized per its Control Byte: CB(1) + TS(0/1/2/4) + ID(2 or 4) + DLC for CAN - // frames; fixed 5 bytes for Notification frames (control bit 7 = 1). + // walks the body of a single CAN Stream message and pushes every CAN frame onto pending_frames_ void decodePackedCanFramesInto(const std::vector & data, size_t body_start, size_t body_end) { - static constexpr size_t TS_LENGTH_TABLE[4] = {0, 1, 2, 4}; size_t walker = body_start; while (walker + 1 <= body_end) { const uint8_t cb = data[walker]; - if ((cb & 0x80) != 0) { - // Notification Frame — fixed 5-byte format, no CAN payload to deliver. - walker += 5; + if ((cb & CONTROL_BYTE_NOTIFICATION_FRAME_FLAG) != 0) { + walker += NOTIFICATION_FRAME_TOTAL_BYTES; continue; } - const size_t ts_size = TS_LENGTH_TABLE[(cb >> 5) & 0b11]; - const bool ext_id = ((cb >> 4) & 0x01) != 0; + const size_t ts_size = + TIMESTAMP_LENGTH_BYTES_TABLE[(cb & CONTROL_BYTE_TIMESTAMP_LENGTH_MASK) >> CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT]; + const bool ext_id = (cb & CONTROL_BYTE_EXTENDED_ID_FLAG) != 0; const size_t id_size = ext_id ? 4 : 2; - const size_t dlc = cb & 0x0F; + const size_t dlc = cb & CONTROL_BYTE_CAN_DATA_LENGTH_MASK; const size_t frame_bytes = 1 + ts_size + id_size + dlc; if (walker + frame_bytes > body_end) { - // Truncated final frame — abandon rather than misdecode. + // truncated final frame — abandon rather than misdecode. break; } const size_t id_offset = walker + 1 + ts_size; @@ -409,9 +389,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } size_t message_length = frame_data_length + control_timestamp_byte_length + frame_id_byte_length; - unsigned char control_byte = (1 << 6); - control_byte |= (is_extended ? (1 << 4) : 0); - control_byte |= (frame_data_length & 0x0F); + unsigned char control_byte = CONTROL_BYTE_TIMESTAMP_2_BYTE_VALUE; + control_byte |= (is_extended ? CONTROL_BYTE_EXTENDED_ID_FLAG : 0); + control_byte |= (frame_data_length & CONTROL_BYTE_CAN_DATA_LENGTH_MASK); // initialize the full message with the header, control bytes, timestamp bytes std::vector full_message; @@ -454,6 +434,34 @@ class AxiomaticAdapter::AxiomaticAdapterImpl static constexpr std::array AXIOMATIC_CAN_MESSAGE_HEADER = {'A', 'X', 'I', 'O', 0xBA, 0x36, 0x01}; static constexpr std::chrono::milliseconds TCP_IP_CONNECTION_TIMEOUT_MS{3000}; + // Control Byte (CB) field layout — first byte of every CAN/Notification + // Frame inside a CAN Stream message body. Per Axiomatic Communication + // Protocol spec v6, Section "Control Byte": + // bit 7 : C_Bit — 0 = CAN Frame, 1 = Notification Frame + // bits 6:5 : TS_Bit — Time Stamp length code (see TIMESTAMP_LENGTH_BYTES_TABLE) + // bit 4 : EID_Bit — 0 = standard 11-bit ID, 1 = extended 29-bit ID + // bits 3:0 : L_Bit — CAN Data Length (DLC), 0..8 valid + static constexpr uint8_t CONTROL_BYTE_NOTIFICATION_FRAME_FLAG = 0x80; + static constexpr uint8_t CONTROL_BYTE_TIMESTAMP_LENGTH_MASK = 0x60; + static constexpr int CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT = 5; + static constexpr uint8_t CONTROL_BYTE_EXTENDED_ID_FLAG = 0x10; + static constexpr uint8_t CONTROL_BYTE_CAN_DATA_LENGTH_MASK = 0x0F; + + // TS_Bit code → number of timestamp bytes that follow CB. Per Table 4 in + // the spec: 00→0 bytes, 01→1, 10→2, 11→4. Note this mapping is non-linear + // at index 3 (which is why a lookup table is needed instead of using the + // raw 2-bit value as the byte count directly). + static constexpr size_t TIMESTAMP_LENGTH_BYTES_TABLE[4] = {0, 1, 2, 4}; + + // TS_Bit code that send() writes into the Control Byte. We always include + // a 2-byte timestamp slot on outbound; firmware tolerates whatever bytes + // are there (spec says outbound TS is ignored by the converter). + static constexpr uint8_t CONTROL_BYTE_TIMESTAMP_2_BYTE_VALUE = static_cast(2) + << CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT; + + // Notification Frame fixed size: 1-byte NIDB + 4-byte NDB1..NDB4. + static constexpr size_t NOTIFICATION_FRAME_TOTAL_BYTES = 5; + /// @brief Socket connection state as a struct for the mutex during TCP Open Socket to update the variables together struct TCPSocketConnectionState { From 7089c48ba0130f3b5e7c0441c23d5f323253bf39 Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 15:46:24 -0700 Subject: [PATCH 05/12] adding logs and increasing buffer size --- axiomatic_adapter/src/axiomatic_adapter.cpp | 42 +++++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 7420068..fda77f8 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -201,7 +201,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl return std::nullopt; } - std::vector data(1024, 0); + std::vector data(RECEIVE_BUFFER_SIZE, 0); std::atomic data_received(false); boost::system::error_code error_code; @@ -245,9 +245,18 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // Check size, header info and message type if (data.size() < AXIOMATIC_CAN_MESSAGE_HEADER.size() + 5) { + std::cerr << "[Axiomatic parser] DROP: received " << data.size() + << " bytes, too short to contain a complete protocol header" << std::endl; return std::make_optional("Data too short for header and control byte."); } if (!std::equal(AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end(), data.begin())) { + std::cerr << "[Axiomatic parser] DROP: position-0 header mismatch in " << data.size() + << "-byte TCP read; first 11 bytes: " << std::hex; + for (size_t i = 0; i < std::min(11, data.size()); ++i) { + std::cerr << ' ' << static_cast(data[i]); + } + std::cerr << std::dec << " (likely a non-CAN-Stream message at position 0 — heartbeat, status, etc." + << " — entire TCP read dropped including any trailing CAN frames)" << std::endl; return std::make_optional("Not a valid CAN message."); } @@ -265,6 +274,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // Ensure data is large enough for CAN ID extraction size_t min_id_size = is_can_extended ? 4 : 2; if (data.size() < can_id_start + min_id_size) { + std::cerr << "[Axiomatic parser] DROP: TCP read truncated before CAN ID (need " << (can_id_start + min_id_size) + << " bytes, have " << data.size() << ")" << std::endl; return std::make_optional("Data too short for CAN ID."); } @@ -282,6 +293,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // Ensure data is large enough for CAN payload if (data.size() < can_data_start + can_length) { + std::cerr << "[Axiomatic parser] DROP: TCP read truncated before CAN payload (need " + << (can_data_start + can_length) << " bytes for " << can_length << "-byte payload, have " << data.size() + << ")" << std::endl; return std::make_optional("Data too short for CAN payload."); } @@ -307,6 +321,13 @@ class AxiomaticAdapter::AxiomaticAdapterImpl if (!std::equal( AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.begin() + 6, data.begin() + scan_pos)) { + std::cerr << "[Axiomatic parser] DROP: sync prefix mismatch at offset " << scan_pos << " of " << data.size() + << "-byte TCP read; first 6 bytes there: " << std::hex; + for (size_t i = 0; i < 6 && scan_pos + i < data.size(); ++i) { + std::cerr << ' ' << static_cast(data[scan_pos + i]); + } + std::cerr << std::dec << " (stopping scan; remaining " << (data.size() - scan_pos) + << " bytes ignored — possible truncated message or partial TCP read)" << std::endl; break; } const uint16_t next_msg_id = @@ -316,9 +337,11 @@ class AxiomaticAdapter::AxiomaticAdapterImpl const size_t next_body_end = std::min(scan_pos + 11 + next_decl_len, data.size()); if (next_msg_id == 1) { decodePackedCanFramesInto(data, scan_pos + 11, next_body_end); + } else { + std::cerr << "[Axiomatic parser] SKIP: non-CAN-Stream message (Message ID " << next_msg_id << ", " + << next_decl_len << "-byte body) at offset " << scan_pos + << " — heartbeat/status/FD/unknown; not delivered to caller" << std::endl; } - // continue on any unhandled frame: heartbeat / status response / CAN FD stream / unknown - // todo(david): find any unhandled status, heartbeat messages and take care of them scan_pos += 11 + next_decl_len; } @@ -332,6 +355,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl while (walker + 1 <= body_end) { const uint8_t cb = data[walker]; if ((cb & CONTROL_BYTE_NOTIFICATION_FRAME_FLAG) != 0) { + std::cerr << "[Axiomatic parser] SKIP: notification frame (CB=0x" << std::hex << static_cast(cb) + << std::dec << ") at offset " << walker << " — not delivered to caller" << std::endl; walker += NOTIFICATION_FRAME_TOTAL_BYTES; continue; } @@ -343,6 +368,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl const size_t frame_bytes = 1 + ts_size + id_size + dlc; if (walker + frame_bytes > body_end) { // truncated final frame — abandon rather than misdecode. + std::cerr << "[Axiomatic parser] DROP: truncated CAN frame at offset " << walker << " (CB=0x" << std::hex + << static_cast(cb) << std::dec << " declares " << frame_bytes << " bytes but only " + << (body_end - walker) << " bytes remain in message body) — frame and remainder dropped" << std::endl; break; } const size_t id_offset = walker + 1 + ts_size; @@ -434,6 +462,14 @@ class AxiomaticAdapter::AxiomaticAdapterImpl static constexpr std::array AXIOMATIC_CAN_MESSAGE_HEADER = {'A', 'X', 'I', 'O', 0xBA, 0x36, 0x01}; static constexpr std::chrono::milliseconds TCP_IP_CONNECTION_TIMEOUT_MS{3000}; + // Receive buffer size for each async_receive call. Larger than the + // protocol's per-message cap (256 bytes) by a wide margin so that bursts + // of protocol messages coalesced by the kernel into a single TCP read fit + // comfortably without truncating any message mid-body. 64 KiB is below + // the typical Linux TCP receive-buffer default (~85 KiB), so we drain + // everything the kernel had buffered in a single call. + static constexpr size_t RECEIVE_BUFFER_SIZE = 65536; + // Control Byte (CB) field layout — first byte of every CAN/Notification // Frame inside a CAN Stream message body. Per Axiomatic Communication // Protocol spec v6, Section "Control Byte": From f6134cd7394d7a2b9fd2cbbb58728446877078be Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 15:57:04 -0700 Subject: [PATCH 06/12] walk over full beffer --- axiomatic_adapter/src/axiomatic_adapter.cpp | 125 +++++++------------- 1 file changed, 44 insertions(+), 81 deletions(-) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index fda77f8..a0f0481 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -242,82 +242,34 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } // --- Process the received data --- - - // Check size, header info and message type - if (data.size() < AXIOMATIC_CAN_MESSAGE_HEADER.size() + 5) { + // + // Walk the buffer starting at position 0, dispatching every Axiomatic + // protocol message we find by its Message ID: + // - CAN Stream (ID 1): extract every CAN frame from its body into + // pending_frames_. This handles both frames packed inside one message + // and multiple CAN Stream messages coalesced into one TCP read. + // - Heartbeat / Status Response / CAN FD Stream / unknown ID: skip past + // the message using its declared Message Data Length. The frames are + // not surfaced to the caller, but trailing CAN frames in the same + // read are still recovered. + // + // We compare only the first 6 bytes of the header constant ("AXIO" + + // 0xBA 0x36) so the sync match doesn't require Message ID = 1 like the + // previous strict 7-byte check did. That earlier behavior would reject + // an entire TCP read whose first protocol message happened to be a + // heartbeat — including any CAN frames that followed it in the same + // buffer. Loss of those trailing CAN frames was the most likely cause + // of single-frame drops during UDS flashes (heartbeats land at the + // front of a TCP read once per second on average, and a flash takes + // long enough to make a coincidence with a critical response likely). + if (data.size() < 11) { std::cerr << "[Axiomatic parser] DROP: received " << data.size() << " bytes, too short to contain a complete protocol header" << std::endl; - return std::make_optional("Data too short for header and control byte."); - } - if (!std::equal(AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end(), data.begin())) { - std::cerr << "[Axiomatic parser] DROP: position-0 header mismatch in " << data.size() - << "-byte TCP read; first 11 bytes: " << std::hex; - for (size_t i = 0; i < std::min(11, data.size()); ++i) { - std::cerr << ' ' << static_cast(data[i]); - } - std::cerr << std::dec << " (likely a non-CAN-Stream message at position 0 — heartbeat, status, etc." - << " — entire TCP read dropped including any trailing CAN frames)" << std::endl; - return std::make_optional("Not a valid CAN message."); - } - - uint8_t control_byte = data[11]; - const uint8_t ts_bits = (control_byte & CONTROL_BYTE_TIMESTAMP_LENGTH_MASK) >> CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT; - size_t timestamp_size = TIMESTAMP_LENGTH_BYTES_TABLE[ts_bits]; - bool is_can_extended = (control_byte & CONTROL_BYTE_EXTENDED_ID_FLAG) != 0; - size_t can_length = control_byte & CONTROL_BYTE_CAN_DATA_LENGTH_MASK; - - // Determine where the CAN ID starts (after timestamp bytes) - size_t can_id_start = 12 + timestamp_size; - uint32_t can_id = 0; - size_t can_data_start = 0; - - // Ensure data is large enough for CAN ID extraction - size_t min_id_size = is_can_extended ? 4 : 2; - if (data.size() < can_id_start + min_id_size) { - std::cerr << "[Axiomatic parser] DROP: TCP read truncated before CAN ID (need " << (can_id_start + min_id_size) - << " bytes, have " << data.size() << ")" << std::endl; - return std::make_optional("Data too short for CAN ID."); - } - - // Extract CAN ID (little-endian) - if (!is_can_extended) { - can_id = static_cast(data[can_id_start] | (data[can_id_start + 1] << 8)); - can_data_start = can_id_start + 2; - } else { - can_id = static_cast( - data[can_id_start] | (data[can_id_start + 1] << 8) | (data[can_id_start + 2] << 16) | - (data[can_id_start + 3] << 24)); - can_data_start = can_id_start + 4; - can_frame.set_id_as_extended(); + return std::make_optional("Data too short for header."); } - // Ensure data is large enough for CAN payload - if (data.size() < can_data_start + can_length) { - std::cerr << "[Axiomatic parser] DROP: TCP read truncated before CAN payload (need " - << (can_data_start + can_length) << " bytes for " << can_length << "-byte payload, have " << data.size() - << ")" << std::endl; - return std::make_optional("Data too short for CAN payload."); - } - - // Extract CAN data (zero-padded to 8 bytes) - std::array can_data = {0}; - std::copy_n(data.begin() + can_data_start, can_length, can_data.begin()); - - // Set CAN frame properties - can_frame.set_can_id(can_id); - can_frame.set_len(can_length); - can_frame.set_data(can_data); - - // walk the rest of this CAN Stream message body for packed CAN frames -- one protocol message can have >1 CAN Frame - // The Axiomatic device packs multiple CAN frames into one protocol message - const size_t declared_data_len = static_cast(data[9]) | (static_cast(data[10]) << 8); - const size_t first_msg_body_end = std::min(11 + declared_data_len, data.size()); - decodePackedCanFramesInto(data, can_data_start + can_length, first_msg_body_end); - - // walk forward through any additional Axiomatic protocol messages that arrived in the same TCP buffer - size_t scan_pos = 11 + declared_data_len; + size_t scan_pos = 0; while (scan_pos + 11 <= data.size()) { - // check for garbage -- non-axiomatic header if (!std::equal( AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.begin() + 6, data.begin() + scan_pos)) { @@ -330,21 +282,32 @@ class AxiomaticAdapter::AxiomaticAdapterImpl << " bytes ignored — possible truncated message or partial TCP read)" << std::endl; break; } - const uint16_t next_msg_id = + const uint16_t msg_id = static_cast(data[scan_pos + 6]) | (static_cast(data[scan_pos + 7]) << 8); - const size_t next_decl_len = - static_cast(data[scan_pos + 9]) | (static_cast(data[scan_pos + 10]) << 8); - const size_t next_body_end = std::min(scan_pos + 11 + next_decl_len, data.size()); - if (next_msg_id == 1) { - decodePackedCanFramesInto(data, scan_pos + 11, next_body_end); + const size_t decl_len = static_cast(data[scan_pos + 9]) | (static_cast(data[scan_pos + 10]) << 8); + const size_t body_end = std::min(scan_pos + 11 + decl_len, data.size()); + if (msg_id == 1) { // CAN Stream (deprecated, but what V5.05 firmware emits) + decodePackedCanFramesInto(data, scan_pos + 11, body_end); } else { - std::cerr << "[Axiomatic parser] SKIP: non-CAN-Stream message (Message ID " << next_msg_id << ", " - << next_decl_len << "-byte body) at offset " << scan_pos - << " — heartbeat/status/FD/unknown; not delivered to caller" << std::endl; + std::cerr << "[Axiomatic parser] SKIP: non-CAN-Stream message (Message ID " << msg_id << ", " << decl_len + << "-byte body) at offset " << scan_pos << " — heartbeat/status/FD/unknown; not delivered to caller" + << std::endl; + } + scan_pos += 11 + decl_len; + } + + if (pending_frames_.empty()) { + // Either the buffer started with non-Axiomatic bytes (scan_pos still 0), + // or it contained only non-CAN-Stream protocol traffic (heartbeats etc.). + // Neither case is a real error; the default error_callback_ swallows it. + if (scan_pos == 0) { + return std::make_optional("Not a valid Axiomatic message."); } - scan_pos += 11 + next_decl_len; + return std::make_optional("No CAN frames in received protocol traffic."); } + can_frame = pending_frames_.front(); + pending_frames_.pop_front(); return std::nullopt; } From 03de341036d5278c919d982d62ccce46da2c9ea9 Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 16:16:53 -0700 Subject: [PATCH 07/12] cleanup comments --- axiomatic_adapter/src/axiomatic_adapter.cpp | 31 +++------------------ 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index a0f0481..97adc51 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -194,7 +194,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl { // A previous TCP read may have decoded several CAN frames out of a single // packed CAN Stream message — deliver those one at a time before doing - // another network read, so packed frames don't get silently dropped. + // another network read, so packed frames don't get silently dropped if (!pending_frames_.empty()) { can_frame = pending_frames_.front(); pending_frames_.pop_front(); @@ -242,26 +242,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } // --- Process the received data --- - // - // Walk the buffer starting at position 0, dispatching every Axiomatic - // protocol message we find by its Message ID: - // - CAN Stream (ID 1): extract every CAN frame from its body into - // pending_frames_. This handles both frames packed inside one message - // and multiple CAN Stream messages coalesced into one TCP read. - // - Heartbeat / Status Response / CAN FD Stream / unknown ID: skip past - // the message using its declared Message Data Length. The frames are - // not surfaced to the caller, but trailing CAN frames in the same - // read are still recovered. - // - // We compare only the first 6 bytes of the header constant ("AXIO" + - // 0xBA 0x36) so the sync match doesn't require Message ID = 1 like the - // previous strict 7-byte check did. That earlier behavior would reject - // an entire TCP read whose first protocol message happened to be a - // heartbeat — including any CAN frames that followed it in the same - // buffer. Loss of those trailing CAN frames was the most likely cause - // of single-frame drops during UDS flashes (heartbeats land at the - // front of a TCP read once per second on average, and a flash takes - // long enough to make a coincidence with a critical response likely). + + // walk the buffer starting at position 0, dispatching every Axiomatic protocol message we find by its Message ID if (data.size() < 11) { std::cerr << "[Axiomatic parser] DROP: received " << data.size() << " bytes, too short to contain a complete protocol header" << std::endl; @@ -297,9 +279,6 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } if (pending_frames_.empty()) { - // Either the buffer started with non-Axiomatic bytes (scan_pos still 0), - // or it contained only non-CAN-Stream protocol traffic (heartbeats etc.). - // Neither case is a real error; the default error_callback_ swallows it. if (scan_pos == 0) { return std::make_optional("Not a valid Axiomatic message."); } @@ -428,9 +407,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // Receive buffer size for each async_receive call. Larger than the // protocol's per-message cap (256 bytes) by a wide margin so that bursts // of protocol messages coalesced by the kernel into a single TCP read fit - // comfortably without truncating any message mid-body. 64 KiB is below - // the typical Linux TCP receive-buffer default (~85 KiB), so we drain - // everything the kernel had buffered in a single call. + // comfortably without truncating any message mid-body static constexpr size_t RECEIVE_BUFFER_SIZE = 65536; // Control Byte (CB) field layout — first byte of every CAN/Notification From e39116738defb1c9d170cb0fa6603db3aa70e223 Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 17:06:47 -0700 Subject: [PATCH 08/12] commenting fixes, buffer optimization, removal of magic numbers --- axiomatic_adapter/README.md | 4 +- axiomatic_adapter/src/axiomatic_adapter.cpp | 190 +++++++++++++------- 2 files changed, 128 insertions(+), 66 deletions(-) diff --git a/axiomatic_adapter/README.md b/axiomatic_adapter/README.md index e8f9b50..62f5db8 100644 --- a/axiomatic_adapter/README.md +++ b/axiomatic_adapter/README.md @@ -51,5 +51,5 @@ adapter.startReceptionThread(); ``` ## KNOWN ISSUES -1. There seems to be an issue with massive amounts of CAN data causing timing inconsistencies through the adapter library - 1. This has not been fully diagnosed +1. Heartbeat messages are not handled. Using TCP, this does not cause issues. In future updates heartbeat messages should be consumed and sent as needed +2. Only TCP mode is supported; no UDP support diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 97adc51..aa29b14 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -53,6 +53,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl , receive_callback_(receive_callback_function) , error_callback_(error_callback_function) , receive_timeout_ms_(receive_timeout_ms) + , rx_buffer_(RECEIVE_BUFFER_SIZE, 0) {} ~AxiomaticAdapterImpl() @@ -73,17 +74,17 @@ class AxiomaticAdapter::AxiomaticAdapterImpl TCPSocketConnectionState connection_state{false, boost::asio::error::would_block}; std::mutex connection_state_mutex; - // Asynchronously attempt to connect + // asynchronously attempt to connect boost::asio::async_connect( tcp_socket_, endpoints, [&](const boost::system::error_code & error, const boost::asio::ip::tcp::endpoint &) { std::lock_guard guard(connection_state_mutex); connection_state.error_code = error; connection_state.connected = !error; - // Cancel timeout if connected successfully + // cancel timeout if connected successfully timer.cancel(); }); - // Set up a timer to cancel the operation if it exceeds the timeout + // set up a timer to cancel the operation if it exceeds the timeout timer.async_wait([&](const boost::system::error_code & error) { if (!error) { std::lock_guard guard(connection_state_mutex); @@ -94,7 +95,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } }); - // Run the I/O context to handle events + // run the I/O context to handle events tcp_io_context_.restart(); tcp_io_context_.run(); @@ -112,7 +113,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl socket_state_ = TCPSocketState::ERROR; return false; } - // Disable Nagle's algorithm. This removes batching TCP packets for low latency comms and + // disable Nagle's algorithm. This removes batching TCP packets for low latency comms and // keeps one CAN frame per TCP message { boost::system::error_code nd_ec; @@ -180,10 +181,10 @@ class AxiomaticAdapter::AxiomaticAdapterImpl stop_thread_requested_ = true; if (tcp_receive_thread_.joinable()) { - // Use std::async to wait asynchronously for the thread to stop + // use std::async to wait asynchronously for the thread to stop std::future join_future = std::async(std::launch::async, [this] { tcp_receive_thread_.join(); }); - // Wait for the thread to stop within the timeout period + // wait for the thread to stop within the timeout period return join_future.wait_for(timeout_s) == std::future_status::ready; } @@ -201,39 +202,41 @@ class AxiomaticAdapter::AxiomaticAdapterImpl return std::nullopt; } - std::vector data(RECEIVE_BUFFER_SIZE, 0); + auto & data = rx_buffer_; + size_t bytes_received = 0; std::atomic data_received(false); boost::system::error_code error_code; - // Set up the timer for timeout + // set up the timer for timeout boost::asio::steady_timer timer(tcp_io_context_); timer.expires_after(receive_timeout_ms_); - // Start async receive operation + // start async receive operation tcp_socket_.async_receive( - boost::asio::buffer(data), [&](const boost::system::error_code & error, std::size_t bytes_transferred) { + boost::asio::buffer(data.data(), RECEIVE_BUFFER_SIZE), + [&](const boost::system::error_code & error, std::size_t bytes_transferred) { error_code = error; if (!error) { - data.resize(bytes_transferred); + bytes_received = bytes_transferred; data_received = true; } timer.cancel(); }); - // Set up the timer to handle timeout cancellation + // set up the timer to handle timeout cancellation timer.async_wait([&](const boost::system::error_code & error) { if (!error && !data_received.load()) { error_code = boost::asio::error::timed_out; - // Cancel the ongoing async receive operation on timeout (does not close socket) + // cancel the ongoing async receive operation on timeout (does not close socket) tcp_socket_.cancel(); } }); - // Run the I/O operations concurrently (this allows for new async operations in the future) + // run the I/O operations concurrently (this allows for new async operations in the future) tcp_io_context_.restart(); tcp_io_context_.run(); - // Check for timeout or other errors + // check for timeout or other errors if (error_code == boost::asio::error::timed_out) { return std::optional("Receive operation timed out"); } else if (error_code) { @@ -244,38 +247,43 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // --- Process the received data --- // walk the buffer starting at position 0, dispatching every Axiomatic protocol message we find by its Message ID - if (data.size() < 11) { - std::cerr << "[Axiomatic parser] DROP: received " << data.size() + if (bytes_received < PROTOCOL_HEADER_BYTES) { + std::cerr << "[Axiomatic parser] DROP: received " << bytes_received << " bytes, too short to contain a complete protocol header" << std::endl; return std::make_optional("Data too short for header."); } size_t scan_pos = 0; - while (scan_pos + 11 <= data.size()) { - if (!std::equal( - AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.begin() + 6, data.begin() + scan_pos)) - { - std::cerr << "[Axiomatic parser] DROP: sync prefix mismatch at offset " << scan_pos << " of " << data.size() - << "-byte TCP read; first 6 bytes there: " << std::hex; - for (size_t i = 0; i < 6 && scan_pos + i < data.size(); ++i) { + while (scan_pos + PROTOCOL_HEADER_BYTES <= bytes_received) { + auto protocol_header_match = std::equal( + AXIOMATIC_CAN_MESSAGE_HEADER.begin(), + AXIOMATIC_CAN_MESSAGE_HEADER.begin() + PROTOCOL_SYNC_PREFIX_BYTES, + data.begin() + scan_pos); + if (!protocol_header_match) { + std::cerr << "[Axiomatic parser] DROP: sync prefix mismatch at offset " << scan_pos << " of " << bytes_received + << "-byte TCP read; first " << PROTOCOL_SYNC_PREFIX_BYTES << " bytes there: " << std::hex; + for (size_t i = 0; i < PROTOCOL_SYNC_PREFIX_BYTES && scan_pos + i < bytes_received; ++i) { std::cerr << ' ' << static_cast(data[scan_pos + i]); } - std::cerr << std::dec << " (stopping scan; remaining " << (data.size() - scan_pos) + std::cerr << std::dec << " (stopping scan; remaining " << (bytes_received - scan_pos) << " bytes ignored — possible truncated message or partial TCP read)" << std::endl; break; } - const uint16_t msg_id = - static_cast(data[scan_pos + 6]) | (static_cast(data[scan_pos + 7]) << 8); - const size_t decl_len = static_cast(data[scan_pos + 9]) | (static_cast(data[scan_pos + 10]) << 8); - const size_t body_end = std::min(scan_pos + 11 + decl_len, data.size()); - if (msg_id == 1) { // CAN Stream (deprecated, but what V5.05 firmware emits) - decodePackedCanFramesInto(data, scan_pos + 11, body_end); + const auto msg_id = static_cast( + static_cast(data[scan_pos + MESSAGE_ID_HEADER_OFFSET]) | + (static_cast(data[scan_pos + MESSAGE_ID_HEADER_OFFSET + 1]) << BITS_PER_BYTE)); + const size_t decl_len = + static_cast(data[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET]) | + (static_cast(data[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET + 1]) << BITS_PER_BYTE); + const size_t body_end = std::min(scan_pos + PROTOCOL_HEADER_BYTES + decl_len, bytes_received); + if (msg_id == MessageId::CanStream) { + decodePackedCanFramesInto(data, scan_pos + PROTOCOL_HEADER_BYTES, body_end); } else { - std::cerr << "[Axiomatic parser] SKIP: non-CAN-Stream message (Message ID " << msg_id << ", " << decl_len - << "-byte body) at offset " << scan_pos << " — heartbeat/status/FD/unknown; not delivered to caller" - << std::endl; + std::cerr << "[Axiomatic parser] SKIP: non-CAN-Stream message (Message ID " << static_cast(msg_id) + << ", " << decl_len << "-byte body) at offset " << scan_pos + << " — heartbeat/status/FD/unknown; not delivered to caller" << std::endl; } - scan_pos += 11 + decl_len; + scan_pos += PROTOCOL_HEADER_BYTES + decl_len; } if (pending_frames_.empty()) { @@ -294,7 +302,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl void decodePackedCanFramesInto(const std::vector & data, size_t body_start, size_t body_end) { size_t walker = body_start; - while (walker + 1 <= body_end) { + while (walker + CONTROL_BYTE_BYTES <= body_end) { const uint8_t cb = data[walker]; if ((cb & CONTROL_BYTE_NOTIFICATION_FRAME_FLAG) != 0) { std::cerr << "[Axiomatic parser] SKIP: notification frame (CB=0x" << std::hex << static_cast(cb) @@ -305,9 +313,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl const size_t ts_size = TIMESTAMP_LENGTH_BYTES_TABLE[(cb & CONTROL_BYTE_TIMESTAMP_LENGTH_MASK) >> CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT]; const bool ext_id = (cb & CONTROL_BYTE_EXTENDED_ID_FLAG) != 0; - const size_t id_size = ext_id ? 4 : 2; + const size_t id_size = ext_id ? EXTENDED_CAN_ID_BYTES : STANDARD_CAN_ID_BYTES; const size_t dlc = cb & CONTROL_BYTE_CAN_DATA_LENGTH_MASK; - const size_t frame_bytes = 1 + ts_size + id_size + dlc; + const size_t frame_bytes = CONTROL_BYTE_BYTES + ts_size + id_size + dlc; if (walker + frame_bytes > body_end) { // truncated final frame — abandon rather than misdecode. std::cerr << "[Axiomatic parser] DROP: truncated CAN frame at offset " << walker << " (CB=0x" << std::hex @@ -315,10 +323,10 @@ class AxiomaticAdapter::AxiomaticAdapterImpl << (body_end - walker) << " bytes remain in message body) — frame and remainder dropped" << std::endl; break; } - const size_t id_offset = walker + 1 + ts_size; + const size_t id_offset = walker + CONTROL_BYTE_BYTES + ts_size; uint32_t cid = 0; for (size_t i = 0; i < id_size; ++i) { - cid |= static_cast(data[id_offset + i]) << (8 * i); + cid |= static_cast(data[id_offset + i]) << (BITS_PER_BYTE * i); } std::array dbytes = {0}; std::copy_n(data.begin() + id_offset + id_size, dlc, dbytes.begin()); @@ -346,19 +354,31 @@ class AxiomaticAdapter::AxiomaticAdapterImpl { auto frame_data = frame.get_data(); auto frame_data_length = frame.get_len(); - size_t control_timestamp_byte_length = 3; - // Determine the CAN frame ID length (extended or standard) + // the CAN Frame body emitted on the wire is laid out as: + // 1 byte Control Byte + 2 bytes Time Stamp + N bytes CAN ID + 8 bytes data + static constexpr size_t SEND_TIME_STAMP_BYTES = 2; + static constexpr size_t SEND_CONTROL_AND_TIMESTAMP_BYTES = CONTROL_BYTE_BYTES + SEND_TIME_STAMP_BYTES; + + // hard coded outbound time-stamp bytes (0x46C0) + static constexpr uint8_t SEND_TIME_STAMP_BYTE_0 = 192; + static constexpr uint8_t SEND_TIME_STAMP_BYTE_1 = 70; + + // reserved high bytes of the header that sit between Protocol ID and Message Data Length (spec calls this "Message ID high byte" + "Message + // version", both 0 for our outbound CAN Stream messages). + static constexpr uint8_t SEND_RESERVED_BYTE_0 = 0x00; + static constexpr uint8_t SEND_RESERVED_BYTE_1 = 0x00; + size_t frame_id_byte_length; bool is_extended = false; if (frame.get_id_type() == polymath::socketcan::IdType::EXTENDED) { - frame_id_byte_length = 4; + frame_id_byte_length = EXTENDED_CAN_ID_BYTES; is_extended = true; } else { - frame_id_byte_length = 2; + frame_id_byte_length = STANDARD_CAN_ID_BYTES; } - size_t message_length = frame_data_length + control_timestamp_byte_length + frame_id_byte_length; + size_t message_length = frame_data_length + SEND_CONTROL_AND_TIMESTAMP_BYTES + frame_id_byte_length; unsigned char control_byte = CONTROL_BYTE_TIMESTAMP_2_BYTE_VALUE; control_byte |= (is_extended ? CONTROL_BYTE_EXTENDED_ID_FLAG : 0); control_byte |= (frame_data_length & CONTROL_BYTE_CAN_DATA_LENGTH_MASK); @@ -366,18 +386,18 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // initialize the full message with the header, control bytes, timestamp bytes std::vector full_message; full_message.insert(full_message.end(), AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end()); - full_message.push_back(0x00); - full_message.push_back(0x00); + full_message.push_back(SEND_RESERVED_BYTE_0); + full_message.push_back(SEND_RESERVED_BYTE_1); full_message.push_back(static_cast(message_length & 0xFF)); - full_message.push_back(static_cast((message_length >> 8) & 0xFF)); + full_message.push_back(static_cast((message_length >> BITS_PER_BYTE) & 0xFF)); full_message.push_back(control_byte); - full_message.push_back(192); - full_message.push_back(70); + full_message.push_back(SEND_TIME_STAMP_BYTE_0); + full_message.push_back(SEND_TIME_STAMP_BYTE_1); // insert the can frame id auto can_id = frame.get_id(); for (size_t i = 0; i < frame_id_byte_length; ++i) { - full_message.push_back(static_cast((can_id >> (i * 8)) & 0xFF)); + full_message.push_back(static_cast((can_id >> (i * BITS_PER_BYTE)) & 0xFF)); } // insert the can frame data full_message.insert(full_message.end(), frame_data.begin(), frame_data.end()); @@ -404,15 +424,56 @@ class AxiomaticAdapter::AxiomaticAdapterImpl static constexpr std::array AXIOMATIC_CAN_MESSAGE_HEADER = {'A', 'X', 'I', 'O', 0xBA, 0x36, 0x01}; static constexpr std::chrono::milliseconds TCP_IP_CONNECTION_TIMEOUT_MS{3000}; - // Receive buffer size for each async_receive call. Larger than the - // protocol's per-message cap (256 bytes) by a wide margin so that bursts - // of protocol messages coalesced by the kernel into a single TCP read fit - // comfortably without truncating any message mid-body + // receive buffer size for each async_receive call. Larger than the + // protocol's per-message cap (256 bytes) by a wide margin static constexpr size_t RECEIVE_BUFFER_SIZE = 65536; + static constexpr int BITS_PER_BYTE = 8; + + // axiomatic Protocol Message Header layout (per spec v6, Table 1): + // bytes 0-3 : Axiomatic Tag "AXIO" + // bytes 4-5 : Protocol ID (0x36BA, LSB first → 0xBA 0x36 on wire) + // bytes 6-7 : Message ID (LSB first) + // byte 8 : Message Version + // bytes 9-10 : Message Data Length (LSB first) + // total header = 11 bytes; Message Data follows immediately after. + static constexpr size_t PROTOCOL_HEADER_BYTES = 11; + + // the first 6 bytes of every Axiomatic header are constant ("AXIO" + + // protocol ID). Useful when we only want to validate "this is some + // axiomatic protocol message" without dictating Message ID. + static constexpr size_t PROTOCOL_SYNC_PREFIX_BYTES = 6; + + // byte offset of the Message ID field (LSB) within a protocol header. + // message ID is a 2-byte little-endian value at offsets 6 and 7. + static constexpr size_t MESSAGE_ID_HEADER_OFFSET = 6; + + // byte offset of the Message Data Length field (LSB) within a protocol + // header. 2-byte little-endian value at offsets 9 and 10. Names the body + // length, not the total message length. + static constexpr size_t MESSAGE_DATA_LENGTH_HEADER_OFFSET = 9; + + // message IDs defined by the Axiomatic Communication Protocol (spec v6, table 3). + enum class MessageId : uint16_t + { + Undefined = 0, + CanStream = 1, + StatusRequest = 2, + StatusResponse = 3, + Heartbeat = 4, + CanFdStream = 5, + }; + + // CAN ID field width inside a CAN Stream message body + static constexpr size_t STANDARD_CAN_ID_BYTES = 2; + static constexpr size_t EXTENDED_CAN_ID_BYTES = 4; - // Control Byte (CB) field layout — first byte of every CAN/Notification - // Frame inside a CAN Stream message body. Per Axiomatic Communication - // Protocol spec v6, Section "Control Byte": + // the Control Byte is a single byte at the start of every CAN/Notification + // frame in a CAN Stream message body + static constexpr size_t CONTROL_BYTE_BYTES = 1; + + // control Byte (CB) field layout — first byte of every CAN/Notification + // frame inside a CAN Stream message body. Per Axiomatic Communication + // protocol spec v6, Section "Control Byte": // bit 7 : C_Bit — 0 = CAN Frame, 1 = Notification Frame // bits 6:5 : TS_Bit — Time Stamp length code (see TIMESTAMP_LENGTH_BYTES_TABLE) // bit 4 : EID_Bit — 0 = standard 11-bit ID, 1 = extended 29-bit ID @@ -429,16 +490,14 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // raw 2-bit value as the byte count directly). static constexpr size_t TIMESTAMP_LENGTH_BYTES_TABLE[4] = {0, 1, 2, 4}; - // TS_Bit code that send() writes into the Control Byte. We always include - // a 2-byte timestamp slot on outbound; firmware tolerates whatever bytes - // are there (spec says outbound TS is ignored by the converter). + // TS_Bit code that send() writes into the Control Byte static constexpr uint8_t CONTROL_BYTE_TIMESTAMP_2_BYTE_VALUE = static_cast(2) << CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT; - // Notification Frame fixed size: 1-byte NIDB + 4-byte NDB1..NDB4. + // notification Frame fixed size: 1-byte NIDB + 4-byte NDB1..NDB4. static constexpr size_t NOTIFICATION_FRAME_TOTAL_BYTES = 5; - /// @brief Socket connection state as a struct for the mutex during TCP Open Socket to update the variables together + /// @brief socket connection state as a struct for the mutex during TCP Open Socket to update the variables together struct TCPSocketConnectionState { bool connected{false}; @@ -463,6 +522,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl std::function frame)> receive_callback_; std::function error_callback_; std::chrono::milliseconds receive_timeout_ms_; + + // receive buffer for async_receive — allocated once at construction and reused across every receive() call + std::vector rx_buffer_; }; AxiomaticAdapter::AxiomaticAdapter( From 4dea64d8332db457203fa2ec114d3bcfd81387a2 Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 17:21:38 -0700 Subject: [PATCH 09/12] rename shortened acronyms --- axiomatic_adapter/src/axiomatic_adapter.cpp | 42 ++++++++++----------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index aa29b14..c48b22b 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -303,39 +303,39 @@ class AxiomaticAdapter::AxiomaticAdapterImpl { size_t walker = body_start; while (walker + CONTROL_BYTE_BYTES <= body_end) { - const uint8_t cb = data[walker]; - if ((cb & CONTROL_BYTE_NOTIFICATION_FRAME_FLAG) != 0) { - std::cerr << "[Axiomatic parser] SKIP: notification frame (CB=0x" << std::hex << static_cast(cb) + const uint8_t control_byte = data[walker]; + if ((control_byte & CONTROL_BYTE_NOTIFICATION_FRAME_FLAG) != 0) { + std::cerr << "[Axiomatic parser] SKIP: notification frame (CB=0x" << std::hex << static_cast(control_byte) << std::dec << ") at offset " << walker << " — not delivered to caller" << std::endl; walker += NOTIFICATION_FRAME_TOTAL_BYTES; continue; } - const size_t ts_size = - TIMESTAMP_LENGTH_BYTES_TABLE[(cb & CONTROL_BYTE_TIMESTAMP_LENGTH_MASK) >> CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT]; - const bool ext_id = (cb & CONTROL_BYTE_EXTENDED_ID_FLAG) != 0; - const size_t id_size = ext_id ? EXTENDED_CAN_ID_BYTES : STANDARD_CAN_ID_BYTES; - const size_t dlc = cb & CONTROL_BYTE_CAN_DATA_LENGTH_MASK; - const size_t frame_bytes = CONTROL_BYTE_BYTES + ts_size + id_size + dlc; + const size_t timestamp_size = TIMESTAMP_LENGTH_BYTES_TABLE + [(control_byte & CONTROL_BYTE_TIMESTAMP_LENGTH_MASK) >> CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT]; + const bool extended_id = (control_byte & CONTROL_BYTE_EXTENDED_ID_FLAG) != 0; + const size_t id_size = extended_id ? EXTENDED_CAN_ID_BYTES : STANDARD_CAN_ID_BYTES; + const size_t can_data_length = control_byte & CONTROL_BYTE_CAN_DATA_LENGTH_MASK; + const size_t frame_bytes = CONTROL_BYTE_BYTES + timestamp_size + id_size + can_data_length; if (walker + frame_bytes > body_end) { // truncated final frame — abandon rather than misdecode. std::cerr << "[Axiomatic parser] DROP: truncated CAN frame at offset " << walker << " (CB=0x" << std::hex - << static_cast(cb) << std::dec << " declares " << frame_bytes << " bytes but only " + << static_cast(control_byte) << std::dec << " declares " << frame_bytes << " bytes but only " << (body_end - walker) << " bytes remain in message body) — frame and remainder dropped" << std::endl; break; } - const size_t id_offset = walker + CONTROL_BYTE_BYTES + ts_size; - uint32_t cid = 0; + const size_t id_offset = walker + CONTROL_BYTE_BYTES + timestamp_size; + uint32_t can_id = 0; for (size_t i = 0; i < id_size; ++i) { - cid |= static_cast(data[id_offset + i]) << (BITS_PER_BYTE * i); + can_id |= static_cast(data[id_offset + i]) << (BITS_PER_BYTE * i); } - std::array dbytes = {0}; - std::copy_n(data.begin() + id_offset + id_size, dlc, dbytes.begin()); + std::array data_bytes = {0}; + std::copy_n(data.begin() + id_offset + id_size, can_data_length, data_bytes.begin()); polymath::socketcan::CanFrame extra; - extra.set_can_id(cid); - extra.set_len(static_cast(dlc)); - extra.set_data(dbytes); - if (ext_id) { + extra.set_can_id(can_id); + extra.set_len(static_cast(can_data_length)); + extra.set_data(data_bytes); + if (extended_id) { extra.set_id_as_extended(); } pending_frames_.push_back(extra); @@ -471,13 +471,13 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // frame in a CAN Stream message body static constexpr size_t CONTROL_BYTE_BYTES = 1; - // control Byte (CB) field layout — first byte of every CAN/Notification + // control byte (CB) field layout — first byte of every CAN/Notification // frame inside a CAN Stream message body. Per Axiomatic Communication // protocol spec v6, Section "Control Byte": // bit 7 : C_Bit — 0 = CAN Frame, 1 = Notification Frame // bits 6:5 : TS_Bit — Time Stamp length code (see TIMESTAMP_LENGTH_BYTES_TABLE) // bit 4 : EID_Bit — 0 = standard 11-bit ID, 1 = extended 29-bit ID - // bits 3:0 : L_Bit — CAN Data Length (DLC), 0..8 valid + // bits 3:0 : L_Bit — CAN Data Length (can_data_length), 0..8 valid static constexpr uint8_t CONTROL_BYTE_NOTIFICATION_FRAME_FLAG = 0x80; static constexpr uint8_t CONTROL_BYTE_TIMESTAMP_LENGTH_MASK = 0x60; static constexpr int CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT = 5; From 349007b68a6695759cfa853b8b75063741009eca Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Wed, 20 May 2026 17:24:05 -0700 Subject: [PATCH 10/12] fix datalength --- axiomatic_adapter/src/axiomatic_adapter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index c48b22b..3140462 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -477,7 +477,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // bit 7 : C_Bit — 0 = CAN Frame, 1 = Notification Frame // bits 6:5 : TS_Bit — Time Stamp length code (see TIMESTAMP_LENGTH_BYTES_TABLE) // bit 4 : EID_Bit — 0 = standard 11-bit ID, 1 = extended 29-bit ID - // bits 3:0 : L_Bit — CAN Data Length (can_data_length), 0..8 valid + // bits 3:0 : L_Bit — CAN Data Length (DLC), 0..8 valid static constexpr uint8_t CONTROL_BYTE_NOTIFICATION_FRAME_FLAG = 0x80; static constexpr uint8_t CONTROL_BYTE_TIMESTAMP_LENGTH_MASK = 0x60; static constexpr int CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT = 5; From 6b313b20aea501b95ef215b6255cf2b24ac6236c Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Thu, 21 May 2026 14:30:17 -0700 Subject: [PATCH 11/12] made tcp-nodelay an argument, cleaned up documentation and made rx_buffer_ easier to read --- axiomatic_adapter/README.md | 36 +++++++++++-- .../axiomatic_adapter/axiomatic_adapter.hpp | 14 ++++- .../axiomatic_socketcan_bridge.hpp | 9 +++- axiomatic_adapter/src/axiomatic_adapter.cpp | 53 ++++++++++++------- .../src/axiomatic_socketcan_bridge.cpp | 14 ++++- .../src/axiomatic_socketcan_bridge_node.cpp | 12 +++-- 6 files changed, 107 insertions(+), 31 deletions(-) diff --git a/axiomatic_adapter/README.md b/axiomatic_adapter/README.md index 62f5db8..56a3a8f 100644 --- a/axiomatic_adapter/README.md +++ b/axiomatic_adapter/README.md @@ -11,7 +11,7 @@ https://products.axiomatic.com/viewitems/connectivity/ethernet-can-converters ### Socketcan-Axiomatic Bridge ```bash -ros2 run axiomatic_adapter axiomatic_socketcan_bridge [CAN_INTERFACE_NAME] [IP_ADDRESS] [PORT] [OPTIONAL]--retry-connection[-r] [OPTIONAL]--max-retry-attempts [OPTIONAL]--verbose[-v] +ros2 run axiomatic_adapter axiomatic_socketcan_bridge [CAN_INTERFACE_NAME] [IP_ADDRESS] [PORT] [OPTIONAL]--retry-connection[-r] [OPTIONAL]--max-retry-attempts [OPTIONAL]--verbose[-v] [OPTIONAL]--no-tcp-nodelay # Examples # generic example to bridge vcan0 with axiomatic using 192.168.50.34:4000 @@ -22,6 +22,11 @@ ros2 run axiomatic_adapter axiomatic_socketcan_bridge vcan0 192.168.50.34 4000 - # this will attempt to reconnect a max number of 100 times before failing ros2 run axiomatic_adapter axiomatic_socketcan_bridge vcan0 192.168.50.34 4000 -r --max-retry-attempts 100 + +# disable TCP_NODELAY (re-enable Nagle's algorithm). Default is on; only use if +# you are pushing bulk traffic where throughput matters more than per-frame latency. +# See the Library section below for the full tradeoff discussion. +ros2 run axiomatic_adapter axiomatic_socketcan_bridge vcan0 192.168.50.34 4000 --no-tcp-nodelay ``` ### Library @@ -38,7 +43,8 @@ polymath::can::AxiomaticAdapter adapter( port, [](std::unique_ptr /*frame*/) { /* No-op */ }, [](polymath::can::AxiomaticAdapter::socket_error_string_t /*error*/) { /*do nothing*/ }, - receive_timeout_ms + receive_timeout_ms, + /*tcp_nodelay=*/true // optional, defaults to true; see below ); // open the socket @@ -50,6 +56,28 @@ adapter.startReceptionThread(); // on shutdown/destruction, thread will join and socket will close ``` +#### `tcp_nodelay` parameter + +Controls whether `TCP_NODELAY` is set on the socket after a successful connect. +Defaults to `true`. + +- **`true` (default)**: Nagle's algorithm is disabled. Each CAN frame leaves the + host as its own TCP segment. The converter's delayed-ACK behaviour otherwise + interacts with Nagle to produce 30–450 ms inter-segment stalls under sustained + CAN traffic, which breaks UDS-style request/response timing (flash sessions, + control loops). This is the right choice for any latency-sensitive workload. +- **`false`**: Nagle's algorithm stays enabled. The kernel may coalesce many + small CAN frames into fewer, larger TCP segments. This lowers + packets-per-second on the network and reduces per-segment header overhead + (~40 bytes IP+TCP per ~24 byte CAN frame payload), at the cost of much higher + worst-case latency per individual frame. Useful only if you are pushing bulk + data where overall throughput matters more than per-frame latency, or if the + network path / receiving device cannot keep up at high PPS. + +If you have any doubt, leave it at the default. + ## KNOWN ISSUES -1. Heartbeat messages are not handled. Using TCP, this does not cause issues. In future updates heartbeat messages should be consumed and sent as needed -2. Only TCP mode is supported; no UDP support +1. Axiomatic-specific heartbeat messages are not handled and are deliberately skipped. Using TCP, this does not cause issues. In future updates heartbeat messages should be consumed and sent as needed +2. Axiomatic-specific status messages are ignored and deliberately skipped. In future revisions this should be handled and reported as necessary. +3. CAN FD is not supported +4. Only TCP mode is supported; no UDP support (heartbeats are required for UDP support) diff --git a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp index 61a48e6..9c34752 100644 --- a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp +++ b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp @@ -54,7 +54,18 @@ class AxiomaticAdapter : public std::enable_shared_from_this /// @brief AxiomaticAdapter Class Init /// @param ip_address Axiomatic Device IP address to connect /// @param port Axiomatic Device Port to connect to + /// @param receive_callback_function called for each CAN frame received from the converter + /// @param error_callback_function called for socket errors and unrecoverable parser conditions /// @param receive_timeout_ms receive timeout in milliseconds + /// @param tcp_nodelay when true (default), sets TCP_NODELAY on the socket after connect to + /// disable Nagle's algorithm. This eliminates the device's delayed-ACK + + /// Nagle interaction that produces 30-450 ms inter-segment stalls under + /// sustained CAN traffic, at the cost of higher packet-per-second rate + /// on the network (each small CAN frame becomes its own TCP segment with + /// ~40 bytes of IP+TCP header overhead — meaning more interrupts/syscalls + /// on both sides, more switch/NIC PPS load, and worse bytes-on-wire + /// efficiency for bulk transfers). Better for real-time, but can, in constrained + /// resource environments, cause issues AxiomaticAdapter( const std::string & ip_address, const std::string & port, @@ -62,7 +73,8 @@ class AxiomaticAdapter : public std::enable_shared_from_this [](std::unique_ptr /*frame*/) { /*do nothing*/ }, const std::function && error_callback_function = [](socket_error_string_t /*error*/) { /*do nothing*/ }, - const std::chrono::milliseconds & receive_timeout_ms = AxiomaticAdapter::DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS); + const std::chrono::milliseconds & receive_timeout_ms = AxiomaticAdapter::DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS, + bool tcp_nodelay = true); /// @brief Destructor for AxiomaticAdapter virtual ~AxiomaticAdapter(); diff --git a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp index 5456b24..e2841cd 100644 --- a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp +++ b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp @@ -36,8 +36,15 @@ class AxiomaticSocketcanBridge /// @param ip IP address of the ethernet can device /// @param port port for the ethernet can device /// @param verbose enables printing of debug logs if true. Defaults to false + /// @param tcp_nodelay when true (default), sets TCP_NODELAY on the underlying + /// AxiomaticAdapter socket. See AxiomaticAdapter for the + /// full tradeoff discussion. AxiomaticSocketcanBridge( - const std::string & can_interface_name, const std::string & ip, const std::string & port, bool verbose = false); + const std::string & can_interface_name, + const std::string & ip, + const std::string & port, + bool verbose = false, + bool tcp_nodelay = true); /// @brief Destruct axiomatic socketcan bridge ~AxiomaticSocketcanBridge(); diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 3140462..2b65f92 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -45,7 +45,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl const std::string & port, const std::function frame)> && receive_callback_function, const std::function && error_callback_function, - const std::chrono::milliseconds & receive_timeout_ms) + const std::chrono::milliseconds & receive_timeout_ms, + bool tcp_nodelay) : tcp_io_context_() , tcp_socket_(tcp_io_context_) , ip_address_(ip_address) @@ -54,6 +55,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl , error_callback_(error_callback_function) , receive_timeout_ms_(receive_timeout_ms) , rx_buffer_(RECEIVE_BUFFER_SIZE, 0) + , tcp_nodelay_(tcp_nodelay) {} ~AxiomaticAdapterImpl() @@ -113,9 +115,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl socket_state_ = TCPSocketState::ERROR; return false; } - // disable Nagle's algorithm. This removes batching TCP packets for low latency comms and - // keeps one CAN frame per TCP message - { + // optionally disable Nagle's algorithm so each CAN frame becomes its + // own TCP segment instead of being coalesced + if (tcp_nodelay_) { boost::system::error_code nd_ec; tcp_socket_.set_option(boost::asio::ip::tcp::no_delay(true), nd_ec); if (nd_ec) { @@ -202,7 +204,6 @@ class AxiomaticAdapter::AxiomaticAdapterImpl return std::nullopt; } - auto & data = rx_buffer_; size_t bytes_received = 0; std::atomic data_received(false); boost::system::error_code error_code; @@ -213,7 +214,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // start async receive operation tcp_socket_.async_receive( - boost::asio::buffer(data.data(), RECEIVE_BUFFER_SIZE), + boost::asio::buffer(rx_buffer_.data(), RECEIVE_BUFFER_SIZE), [&](const boost::system::error_code & error, std::size_t bytes_transferred) { error_code = error; if (!error) { @@ -245,8 +246,11 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } // --- Process the received data --- - // walk the buffer starting at position 0, dispatching every Axiomatic protocol message we find by its Message ID + + // NOTE: given the axiomatic documentation claims a deliberate 256-byte design + the large buffer size used in + // rx_buffer_, a mid-tcp-message split is never supposed to occur. In testing this drop has never happened. + // it is possible that with non-standard very low MTU's or in future revisions, this assumption no longer holds if (bytes_received < PROTOCOL_HEADER_BYTES) { std::cerr << "[Axiomatic parser] DROP: received " << bytes_received << " bytes, too short to contain a complete protocol header" << std::endl; @@ -258,26 +262,26 @@ class AxiomaticAdapter::AxiomaticAdapterImpl auto protocol_header_match = std::equal( AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.begin() + PROTOCOL_SYNC_PREFIX_BYTES, - data.begin() + scan_pos); + rx_buffer_.begin() + scan_pos); if (!protocol_header_match) { std::cerr << "[Axiomatic parser] DROP: sync prefix mismatch at offset " << scan_pos << " of " << bytes_received << "-byte TCP read; first " << PROTOCOL_SYNC_PREFIX_BYTES << " bytes there: " << std::hex; for (size_t i = 0; i < PROTOCOL_SYNC_PREFIX_BYTES && scan_pos + i < bytes_received; ++i) { - std::cerr << ' ' << static_cast(data[scan_pos + i]); + std::cerr << ' ' << static_cast(rx_buffer_[scan_pos + i]); } std::cerr << std::dec << " (stopping scan; remaining " << (bytes_received - scan_pos) << " bytes ignored — possible truncated message or partial TCP read)" << std::endl; break; } const auto msg_id = static_cast( - static_cast(data[scan_pos + MESSAGE_ID_HEADER_OFFSET]) | - (static_cast(data[scan_pos + MESSAGE_ID_HEADER_OFFSET + 1]) << BITS_PER_BYTE)); + static_cast(rx_buffer_[scan_pos + MESSAGE_ID_HEADER_OFFSET]) | + (static_cast(rx_buffer_[scan_pos + MESSAGE_ID_HEADER_OFFSET + 1]) << BITS_PER_BYTE)); const size_t decl_len = - static_cast(data[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET]) | - (static_cast(data[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET + 1]) << BITS_PER_BYTE); + static_cast(rx_buffer_[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET]) | + (static_cast(rx_buffer_[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET + 1]) << BITS_PER_BYTE); const size_t body_end = std::min(scan_pos + PROTOCOL_HEADER_BYTES + decl_len, bytes_received); if (msg_id == MessageId::CanStream) { - decodePackedCanFramesInto(data, scan_pos + PROTOCOL_HEADER_BYTES, body_end); + decodePackedCanFramesInto(rx_buffer_, scan_pos + PROTOCOL_HEADER_BYTES, body_end); } else { std::cerr << "[Axiomatic parser] SKIP: non-CAN-Stream message (Message ID " << static_cast(msg_id) << ", " << decl_len << "-byte body) at offset " << scan_pos @@ -299,11 +303,11 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } // walks the body of a single CAN Stream message and pushes every CAN frame onto pending_frames_ - void decodePackedCanFramesInto(const std::vector & data, size_t body_start, size_t body_end) + void decodePackedCanFramesInto(const std::vector & data_buffer, size_t body_start, size_t body_end) { size_t walker = body_start; while (walker + CONTROL_BYTE_BYTES <= body_end) { - const uint8_t control_byte = data[walker]; + const uint8_t control_byte = data_buffer[walker]; if ((control_byte & CONTROL_BYTE_NOTIFICATION_FRAME_FLAG) != 0) { std::cerr << "[Axiomatic parser] SKIP: notification frame (CB=0x" << std::hex << static_cast(control_byte) << std::dec << ") at offset " << walker << " — not delivered to caller" << std::endl; @@ -326,10 +330,10 @@ class AxiomaticAdapter::AxiomaticAdapterImpl const size_t id_offset = walker + CONTROL_BYTE_BYTES + timestamp_size; uint32_t can_id = 0; for (size_t i = 0; i < id_size; ++i) { - can_id |= static_cast(data[id_offset + i]) << (BITS_PER_BYTE * i); + can_id |= static_cast(data_buffer[id_offset + i]) << (BITS_PER_BYTE * i); } std::array data_bytes = {0}; - std::copy_n(data.begin() + id_offset + id_size, can_data_length, data_bytes.begin()); + std::copy_n(data_buffer.begin() + id_offset + id_size, can_data_length, data_bytes.begin()); polymath::socketcan::CanFrame extra; extra.set_can_id(can_id); @@ -525,6 +529,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // receive buffer for async_receive — allocated once at construction and reused across every receive() call std::vector rx_buffer_; + + // when true, disable Nagle's algorithm on the TCP socket after connect + bool tcp_nodelay_; }; AxiomaticAdapter::AxiomaticAdapter( @@ -532,9 +539,15 @@ AxiomaticAdapter::AxiomaticAdapter( const std::string & port, const std::function frame)> && receive_callback_function, const std::function && error_callback_function, - const std::chrono::milliseconds & receive_timeout_ms) + const std::chrono::milliseconds & receive_timeout_ms, + bool tcp_nodelay) : pimpl_(std::make_unique( - ip_address, port, std::move(receive_callback_function), std::move(error_callback_function), receive_timeout_ms)) + ip_address, + port, + std::move(receive_callback_function), + std::move(error_callback_function), + receive_timeout_ms, + tcp_nodelay)) {} AxiomaticAdapter::~AxiomaticAdapter() diff --git a/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp b/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp index 79fdb37..01630e5 100644 --- a/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp +++ b/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp @@ -23,9 +23,19 @@ namespace can { AxiomaticSocketcanBridge::AxiomaticSocketcanBridge( - const std::string & can_interface_name, const std::string & ip, const std::string & port, bool verbose) + const std::string & can_interface_name, + const std::string & ip, + const std::string & port, + bool verbose, + bool tcp_nodelay) : socketcan_adapter_(can_interface_name) -, axiomatic_adapter_(ip, port, std::bind(&AxiomaticSocketcanBridge::ethcanReceiveCallback, this, std::placeholders::_1)) +, axiomatic_adapter_( + ip, + port, + std::bind(&AxiomaticSocketcanBridge::ethcanReceiveCallback, this, std::placeholders::_1), + [](AxiomaticAdapter::socket_error_string_t /*error*/) { /* no-op */ }, + AxiomaticAdapter::DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS, + tcp_nodelay) , verbose_(verbose) { socketcan_adapter_.setOnReceiveCallback( diff --git a/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp b/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp index b2536f4..80833e7 100644 --- a/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp +++ b/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp @@ -48,7 +48,8 @@ void configureArguments( std::string & port, bool & verbose, bool & retry_connection, - int & max_retry_attempts) + int & max_retry_attempts, + bool & tcp_nodelay) { app.add_option("can_interface", can_interface, "CAN interface to use (default: vcan0)")->default_val("vcan0"); app.add_option("ip", ip, "IP address of the bridge (default: 192.168.0.34)")->default_val("192.168.0.34"); @@ -58,6 +59,10 @@ void configureArguments( app .add_option("--max-retry-attempts", max_retry_attempts, "Maximum number of retry attempts (default: -1 = infinite)") ->default_val(-1); + app.add_flag( + "!--no-tcp-nodelay", + tcp_nodelay, + "Disable TCP_NODELAY (re-enable Nagle's algorithm). Default is on; see the AxiomaticAdapter docs for tradeoffs."); } int main(int argc, char * argv[]) @@ -65,11 +70,12 @@ int main(int argc, char * argv[]) std::signal(SIGINT, signalHandler); std::signal(SIGTERM, signalHandler); std::string can_interface, ip, port; + bool tcp_nodelay = true; CLI::App app{"Axiomatic SocketCAN Bridge"}; - configureArguments(app, can_interface, ip, port, verbose, retry_connection, max_retry_attempts); + configureArguments(app, can_interface, ip, port, verbose, retry_connection, max_retry_attempts, tcp_nodelay); CLI11_PARSE(app, argc, argv); - polymath::can::AxiomaticSocketcanBridge bridge(can_interface, ip, port, verbose); + polymath::can::AxiomaticSocketcanBridge bridge(can_interface, ip, port, verbose, tcp_nodelay); std::cout << "Axiomatic Socketcan Bridge configuring..." << std::endl; From cbaa4adf43f5adf227cc1de30d88718090c73881 Mon Sep 17 00:00:00 2001 From: David Tarazi Date: Thu, 21 May 2026 14:44:09 -0700 Subject: [PATCH 12/12] adding print statement for tcp-nodelay --- axiomatic_adapter/src/axiomatic_adapter.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 2b65f92..ceaa88a 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -122,7 +122,12 @@ class AxiomaticAdapter::AxiomaticAdapterImpl tcp_socket_.set_option(boost::asio::ip::tcp::no_delay(true), nd_ec); if (nd_ec) { std::cerr << "[Axiomatic] Failed to set TCP_NODELAY: " << nd_ec.message() << std::endl; + } else { + std::cout << "[Axiomatic] TCP_NODELAY enabled (Nagle's algorithm disabled)" << std::endl; } + } else { + std::cout << "[Axiomatic] TCP_NODELAY disabled (Nagle's algorithm active — kernel may coalesce small writes)" + << std::endl; } socket_state_ = TCPSocketState::OPEN;