diff --git a/axiomatic_adapter/README.md b/axiomatic_adapter/README.md index e8f9b50..56a3a8f 100644 --- a/axiomatic_adapter/README.md +++ b/axiomatic_adapter/README.md @@ -11,7 +11,7 @@ https://products.axiomatic.com/viewitems/connectivity/ethernet-can-converters ### Socketcan-Axiomatic Bridge ```bash -ros2 run axiomatic_adapter axiomatic_socketcan_bridge [CAN_INTERFACE_NAME] [IP_ADDRESS] [PORT] [OPTIONAL]--retry-connection[-r] [OPTIONAL]--max-retry-attempts [OPTIONAL]--verbose[-v] +ros2 run axiomatic_adapter axiomatic_socketcan_bridge [CAN_INTERFACE_NAME] [IP_ADDRESS] [PORT] [OPTIONAL]--retry-connection[-r] [OPTIONAL]--max-retry-attempts [OPTIONAL]--verbose[-v] [OPTIONAL]--no-tcp-nodelay # Examples # generic example to bridge vcan0 with axiomatic using 192.168.50.34:4000 @@ -22,6 +22,11 @@ ros2 run axiomatic_adapter axiomatic_socketcan_bridge vcan0 192.168.50.34 4000 - # this will attempt to reconnect a max number of 100 times before failing ros2 run axiomatic_adapter axiomatic_socketcan_bridge vcan0 192.168.50.34 4000 -r --max-retry-attempts 100 + +# disable TCP_NODELAY (re-enable Nagle's algorithm). Default is on; only use if +# you are pushing bulk traffic where throughput matters more than per-frame latency. +# See the Library section below for the full tradeoff discussion. +ros2 run axiomatic_adapter axiomatic_socketcan_bridge vcan0 192.168.50.34 4000 --no-tcp-nodelay ``` ### Library @@ -38,7 +43,8 @@ polymath::can::AxiomaticAdapter adapter( port, [](std::unique_ptr /*frame*/) { /* No-op */ }, [](polymath::can::AxiomaticAdapter::socket_error_string_t /*error*/) { /*do nothing*/ }, - receive_timeout_ms + receive_timeout_ms, + /*tcp_nodelay=*/true // optional, defaults to true; see below ); // open the socket @@ -50,6 +56,28 @@ adapter.startReceptionThread(); // on shutdown/destruction, thread will join and socket will close ``` +#### `tcp_nodelay` parameter + +Controls whether `TCP_NODELAY` is set on the socket after a successful connect. +Defaults to `true`. + +- **`true` (default)**: Nagle's algorithm is disabled. Each CAN frame leaves the + host as its own TCP segment. The converter's delayed-ACK behaviour otherwise + interacts with Nagle to produce 30–450 ms inter-segment stalls under sustained + CAN traffic, which breaks UDS-style request/response timing (flash sessions, + control loops). This is the right choice for any latency-sensitive workload. +- **`false`**: Nagle's algorithm stays enabled. The kernel may coalesce many + small CAN frames into fewer, larger TCP segments. This lowers + packets-per-second on the network and reduces per-segment header overhead + (~40 bytes IP+TCP per ~24 byte CAN frame payload), at the cost of much higher + worst-case latency per individual frame. Useful only if you are pushing bulk + data where overall throughput matters more than per-frame latency, or if the + network path / receiving device cannot keep up at high PPS. + +If you have any doubt, leave it at the default. + ## KNOWN ISSUES -1. There seems to be an issue with massive amounts of CAN data causing timing inconsistencies through the adapter library - 1. This has not been fully diagnosed +1. Axiomatic-specific heartbeat messages are not handled and are deliberately skipped. Using TCP, this does not cause issues. In future updates heartbeat messages should be consumed and sent as needed +2. Axiomatic-specific status messages are ignored and deliberately skipped. In future revisions this should be handled and reported as necessary. +3. CAN FD is not supported +4. Only TCP mode is supported; no UDP support (heartbeats are required for UDP support) diff --git a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp index 61a48e6..9c34752 100644 --- a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp +++ b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp @@ -54,7 +54,18 @@ class AxiomaticAdapter : public std::enable_shared_from_this /// @brief AxiomaticAdapter Class Init /// @param ip_address Axiomatic Device IP address to connect /// @param port Axiomatic Device Port to connect to + /// @param receive_callback_function called for each CAN frame received from the converter + /// @param error_callback_function called for socket errors and unrecoverable parser conditions /// @param receive_timeout_ms receive timeout in milliseconds + /// @param tcp_nodelay when true (default), sets TCP_NODELAY on the socket after connect to + /// disable Nagle's algorithm. This eliminates the device's delayed-ACK + + /// Nagle interaction that produces 30-450 ms inter-segment stalls under + /// sustained CAN traffic, at the cost of higher packet-per-second rate + /// on the network (each small CAN frame becomes its own TCP segment with + /// ~40 bytes of IP+TCP header overhead — meaning more interrupts/syscalls + /// on both sides, more switch/NIC PPS load, and worse bytes-on-wire + /// efficiency for bulk transfers). Better for real-time, but can, in constrained + /// resource environments, cause issues AxiomaticAdapter( const std::string & ip_address, const std::string & port, @@ -62,7 +73,8 @@ class AxiomaticAdapter : public std::enable_shared_from_this [](std::unique_ptr /*frame*/) { /*do nothing*/ }, const std::function && error_callback_function = [](socket_error_string_t /*error*/) { /*do nothing*/ }, - const std::chrono::milliseconds & receive_timeout_ms = AxiomaticAdapter::DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS); + const std::chrono::milliseconds & receive_timeout_ms = AxiomaticAdapter::DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS, + bool tcp_nodelay = true); /// @brief Destructor for AxiomaticAdapter virtual ~AxiomaticAdapter(); diff --git a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp index 5456b24..e2841cd 100644 --- a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp +++ b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_socketcan_bridge.hpp @@ -36,8 +36,15 @@ class AxiomaticSocketcanBridge /// @param ip IP address of the ethernet can device /// @param port port for the ethernet can device /// @param verbose enables printing of debug logs if true. Defaults to false + /// @param tcp_nodelay when true (default), sets TCP_NODELAY on the underlying + /// AxiomaticAdapter socket. See AxiomaticAdapter for the + /// full tradeoff discussion. AxiomaticSocketcanBridge( - const std::string & can_interface_name, const std::string & ip, const std::string & port, bool verbose = false); + const std::string & can_interface_name, + const std::string & ip, + const std::string & port, + bool verbose = false, + bool tcp_nodelay = true); /// @brief Destruct axiomatic socketcan bridge ~AxiomaticSocketcanBridge(); diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 8fb1491..ceaa88a 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -44,7 +45,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl const std::string & port, const std::function frame)> && receive_callback_function, const std::function && error_callback_function, - const std::chrono::milliseconds & receive_timeout_ms) + const std::chrono::milliseconds & receive_timeout_ms, + bool tcp_nodelay) : tcp_io_context_() , tcp_socket_(tcp_io_context_) , ip_address_(ip_address) @@ -52,6 +54,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl , receive_callback_(receive_callback_function) , error_callback_(error_callback_function) , receive_timeout_ms_(receive_timeout_ms) + , rx_buffer_(RECEIVE_BUFFER_SIZE, 0) + , tcp_nodelay_(tcp_nodelay) {} ~AxiomaticAdapterImpl() @@ -72,17 +76,17 @@ class AxiomaticAdapter::AxiomaticAdapterImpl TCPSocketConnectionState connection_state{false, boost::asio::error::would_block}; std::mutex connection_state_mutex; - // Asynchronously attempt to connect + // asynchronously attempt to connect boost::asio::async_connect( tcp_socket_, endpoints, [&](const boost::system::error_code & error, const boost::asio::ip::tcp::endpoint &) { std::lock_guard guard(connection_state_mutex); connection_state.error_code = error; connection_state.connected = !error; - // Cancel timeout if connected successfully + // cancel timeout if connected successfully timer.cancel(); }); - // Set up a timer to cancel the operation if it exceeds the timeout + // set up a timer to cancel the operation if it exceeds the timeout timer.async_wait([&](const boost::system::error_code & error) { if (!error) { std::lock_guard guard(connection_state_mutex); @@ -93,7 +97,7 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } }); - // Run the I/O context to handle events + // run the I/O context to handle events tcp_io_context_.restart(); tcp_io_context_.run(); @@ -111,6 +115,20 @@ class AxiomaticAdapter::AxiomaticAdapterImpl socket_state_ = TCPSocketState::ERROR; return false; } + // optionally disable Nagle's algorithm so each CAN frame becomes its + // own TCP segment instead of being coalesced + if (tcp_nodelay_) { + boost::system::error_code nd_ec; + tcp_socket_.set_option(boost::asio::ip::tcp::no_delay(true), nd_ec); + if (nd_ec) { + std::cerr << "[Axiomatic] Failed to set TCP_NODELAY: " << nd_ec.message() << std::endl; + } else { + std::cout << "[Axiomatic] TCP_NODELAY enabled (Nagle's algorithm disabled)" << std::endl; + } + } else { + std::cout << "[Axiomatic] TCP_NODELAY disabled (Nagle's algorithm active — kernel may coalesce small writes)" + << std::endl; + } socket_state_ = TCPSocketState::OPEN; return true; @@ -170,10 +188,10 @@ class AxiomaticAdapter::AxiomaticAdapterImpl stop_thread_requested_ = true; if (tcp_receive_thread_.joinable()) { - // Use std::async to wait asynchronously for the thread to stop + // use std::async to wait asynchronously for the thread to stop std::future join_future = std::async(std::launch::async, [this] { tcp_receive_thread_.join(); }); - // Wait for the thread to stop within the timeout period + // wait for the thread to stop within the timeout period return join_future.wait_for(timeout_s) == std::future_status::ready; } @@ -182,39 +200,49 @@ class AxiomaticAdapter::AxiomaticAdapterImpl std::optional receive(polymath::socketcan::CanFrame & can_frame) { - std::vector data(1024, 0); + // A previous TCP read may have decoded several CAN frames out of a single + // packed CAN Stream message — deliver those one at a time before doing + // another network read, so packed frames don't get silently dropped + if (!pending_frames_.empty()) { + can_frame = pending_frames_.front(); + pending_frames_.pop_front(); + return std::nullopt; + } + + size_t bytes_received = 0; std::atomic data_received(false); boost::system::error_code error_code; - // Set up the timer for timeout + // set up the timer for timeout boost::asio::steady_timer timer(tcp_io_context_); timer.expires_after(receive_timeout_ms_); - // Start async receive operation + // start async receive operation tcp_socket_.async_receive( - boost::asio::buffer(data), [&](const boost::system::error_code & error, std::size_t bytes_transferred) { + boost::asio::buffer(rx_buffer_.data(), RECEIVE_BUFFER_SIZE), + [&](const boost::system::error_code & error, std::size_t bytes_transferred) { error_code = error; if (!error) { - data.resize(bytes_transferred); + bytes_received = bytes_transferred; data_received = true; } timer.cancel(); }); - // Set up the timer to handle timeout cancellation + // set up the timer to handle timeout cancellation timer.async_wait([&](const boost::system::error_code & error) { if (!error && !data_received.load()) { error_code = boost::asio::error::timed_out; - // Cancel the ongoing async receive operation on timeout (does not close socket) + // cancel the ongoing async receive operation on timeout (does not close socket) tcp_socket_.cancel(); } }); - // Run the I/O operations concurrently (this allows for new async operations in the future) + // run the I/O operations concurrently (this allows for new async operations in the future) tcp_io_context_.restart(); tcp_io_context_.run(); - // Check for timeout or other errors + // check for timeout or other errors if (error_code == boost::asio::error::timed_out) { return std::optional("Receive operation timed out"); } else if (error_code) { @@ -223,63 +251,107 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } // --- Process the received data --- - - // Check size, header info and message type - if (data.size() < AXIOMATIC_CAN_MESSAGE_HEADER.size() + 5) { - return std::make_optional("Data too short for header and control byte."); - } - if (!std::equal(AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end(), data.begin())) { - return std::make_optional("Not a valid CAN message."); - } - - uint8_t control_byte = data[11]; - // Extract timestamp size (bits 6 & 5) - size_t timestamp_size = (control_byte & 0x60) >> 5; - // Check if the frame is extended (bit 4) - bool is_can_extended = (control_byte & 0x10) >> 4; - // Extract CAN frame length (lower 4 bits) - size_t can_length = control_byte & 0x0F; - - // Determine where the CAN ID starts (after timestamp bytes) - size_t can_id_start = 12 + timestamp_size; - uint32_t can_id = 0; - size_t can_data_start = 0; - - // Ensure data is large enough for CAN ID extraction - size_t min_id_size = is_can_extended ? 4 : 2; - if (data.size() < can_id_start + min_id_size) { - return std::make_optional("Data too short for CAN ID."); + // walk the buffer starting at position 0, dispatching every Axiomatic protocol message we find by its Message ID + + // NOTE: given the axiomatic documentation claims a deliberate 256-byte design + the large buffer size used in + // rx_buffer_, a mid-tcp-message split is never supposed to occur. In testing this drop has never happened. + // it is possible that with non-standard very low MTU's or in future revisions, this assumption no longer holds + if (bytes_received < PROTOCOL_HEADER_BYTES) { + std::cerr << "[Axiomatic parser] DROP: received " << bytes_received + << " bytes, too short to contain a complete protocol header" << std::endl; + return std::make_optional("Data too short for header."); } - // Extract CAN ID (little-endian) - if (!is_can_extended) { - can_id = static_cast(data[can_id_start] | (data[can_id_start + 1] << 8)); - can_data_start = can_id_start + 2; - } else { - can_id = static_cast( - data[can_id_start] | (data[can_id_start + 1] << 8) | (data[can_id_start + 2] << 16) | - (data[can_id_start + 3] << 24)); - can_data_start = can_id_start + 4; - can_frame.set_id_as_extended(); + size_t scan_pos = 0; + while (scan_pos + PROTOCOL_HEADER_BYTES <= bytes_received) { + auto protocol_header_match = std::equal( + AXIOMATIC_CAN_MESSAGE_HEADER.begin(), + AXIOMATIC_CAN_MESSAGE_HEADER.begin() + PROTOCOL_SYNC_PREFIX_BYTES, + rx_buffer_.begin() + scan_pos); + if (!protocol_header_match) { + std::cerr << "[Axiomatic parser] DROP: sync prefix mismatch at offset " << scan_pos << " of " << bytes_received + << "-byte TCP read; first " << PROTOCOL_SYNC_PREFIX_BYTES << " bytes there: " << std::hex; + for (size_t i = 0; i < PROTOCOL_SYNC_PREFIX_BYTES && scan_pos + i < bytes_received; ++i) { + std::cerr << ' ' << static_cast(rx_buffer_[scan_pos + i]); + } + std::cerr << std::dec << " (stopping scan; remaining " << (bytes_received - scan_pos) + << " bytes ignored — possible truncated message or partial TCP read)" << std::endl; + break; + } + const auto msg_id = static_cast( + static_cast(rx_buffer_[scan_pos + MESSAGE_ID_HEADER_OFFSET]) | + (static_cast(rx_buffer_[scan_pos + MESSAGE_ID_HEADER_OFFSET + 1]) << BITS_PER_BYTE)); + const size_t decl_len = + static_cast(rx_buffer_[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET]) | + (static_cast(rx_buffer_[scan_pos + MESSAGE_DATA_LENGTH_HEADER_OFFSET + 1]) << BITS_PER_BYTE); + const size_t body_end = std::min(scan_pos + PROTOCOL_HEADER_BYTES + decl_len, bytes_received); + if (msg_id == MessageId::CanStream) { + decodePackedCanFramesInto(rx_buffer_, scan_pos + PROTOCOL_HEADER_BYTES, body_end); + } else { + std::cerr << "[Axiomatic parser] SKIP: non-CAN-Stream message (Message ID " << static_cast(msg_id) + << ", " << decl_len << "-byte body) at offset " << scan_pos + << " — heartbeat/status/FD/unknown; not delivered to caller" << std::endl; + } + scan_pos += PROTOCOL_HEADER_BYTES + decl_len; } - // Ensure data is large enough for CAN payload - if (data.size() < can_data_start + can_length) { - return std::make_optional("Data too short for CAN payload."); + if (pending_frames_.empty()) { + if (scan_pos == 0) { + return std::make_optional("Not a valid Axiomatic message."); + } + return std::make_optional("No CAN frames in received protocol traffic."); } - // Extract CAN data (zero-padded to 8 bytes) - std::array can_data = {0}; - std::copy_n(data.begin() + can_data_start, can_length, can_data.begin()); - - // Set CAN frame properties - can_frame.set_can_id(can_id); - can_frame.set_len(can_length); - can_frame.set_data(can_data); - + can_frame = pending_frames_.front(); + pending_frames_.pop_front(); return std::nullopt; } + // walks the body of a single CAN Stream message and pushes every CAN frame onto pending_frames_ + void decodePackedCanFramesInto(const std::vector & data_buffer, size_t body_start, size_t body_end) + { + size_t walker = body_start; + while (walker + CONTROL_BYTE_BYTES <= body_end) { + const uint8_t control_byte = data_buffer[walker]; + if ((control_byte & CONTROL_BYTE_NOTIFICATION_FRAME_FLAG) != 0) { + std::cerr << "[Axiomatic parser] SKIP: notification frame (CB=0x" << std::hex << static_cast(control_byte) + << std::dec << ") at offset " << walker << " — not delivered to caller" << std::endl; + walker += NOTIFICATION_FRAME_TOTAL_BYTES; + continue; + } + const size_t timestamp_size = TIMESTAMP_LENGTH_BYTES_TABLE + [(control_byte & CONTROL_BYTE_TIMESTAMP_LENGTH_MASK) >> CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT]; + const bool extended_id = (control_byte & CONTROL_BYTE_EXTENDED_ID_FLAG) != 0; + const size_t id_size = extended_id ? EXTENDED_CAN_ID_BYTES : STANDARD_CAN_ID_BYTES; + const size_t can_data_length = control_byte & CONTROL_BYTE_CAN_DATA_LENGTH_MASK; + const size_t frame_bytes = CONTROL_BYTE_BYTES + timestamp_size + id_size + can_data_length; + if (walker + frame_bytes > body_end) { + // truncated final frame — abandon rather than misdecode. + std::cerr << "[Axiomatic parser] DROP: truncated CAN frame at offset " << walker << " (CB=0x" << std::hex + << static_cast(control_byte) << std::dec << " declares " << frame_bytes << " bytes but only " + << (body_end - walker) << " bytes remain in message body) — frame and remainder dropped" << std::endl; + break; + } + const size_t id_offset = walker + CONTROL_BYTE_BYTES + timestamp_size; + uint32_t can_id = 0; + for (size_t i = 0; i < id_size; ++i) { + can_id |= static_cast(data_buffer[id_offset + i]) << (BITS_PER_BYTE * i); + } + std::array data_bytes = {0}; + std::copy_n(data_buffer.begin() + id_offset + id_size, can_data_length, data_bytes.begin()); + + polymath::socketcan::CanFrame extra; + extra.set_can_id(can_id); + extra.set_len(static_cast(can_data_length)); + extra.set_data(data_bytes); + if (extended_id) { + extra.set_id_as_extended(); + } + pending_frames_.push_back(extra); + walker += frame_bytes; + } + } + std::optional receive() { polymath::socketcan::CanFrame can_frame = polymath::socketcan::CanFrame(); @@ -291,38 +363,50 @@ class AxiomaticAdapter::AxiomaticAdapterImpl { auto frame_data = frame.get_data(); auto frame_data_length = frame.get_len(); - size_t control_timestamp_byte_length = 3; - // Determine the CAN frame ID length (extended or standard) + // the CAN Frame body emitted on the wire is laid out as: + // 1 byte Control Byte + 2 bytes Time Stamp + N bytes CAN ID + 8 bytes data + static constexpr size_t SEND_TIME_STAMP_BYTES = 2; + static constexpr size_t SEND_CONTROL_AND_TIMESTAMP_BYTES = CONTROL_BYTE_BYTES + SEND_TIME_STAMP_BYTES; + + // hard coded outbound time-stamp bytes (0x46C0) + static constexpr uint8_t SEND_TIME_STAMP_BYTE_0 = 192; + static constexpr uint8_t SEND_TIME_STAMP_BYTE_1 = 70; + + // reserved high bytes of the header that sit between Protocol ID and Message Data Length (spec calls this "Message ID high byte" + "Message + // version", both 0 for our outbound CAN Stream messages). + static constexpr uint8_t SEND_RESERVED_BYTE_0 = 0x00; + static constexpr uint8_t SEND_RESERVED_BYTE_1 = 0x00; + size_t frame_id_byte_length; bool is_extended = false; if (frame.get_id_type() == polymath::socketcan::IdType::EXTENDED) { - frame_id_byte_length = 4; + frame_id_byte_length = EXTENDED_CAN_ID_BYTES; is_extended = true; } else { - frame_id_byte_length = 2; + frame_id_byte_length = STANDARD_CAN_ID_BYTES; } - size_t message_length = frame_data_length + control_timestamp_byte_length + frame_id_byte_length; - unsigned char control_byte = (1 << 6); - control_byte |= (is_extended ? (1 << 4) : 0); - control_byte |= (frame_data_length & 0x0F); + size_t message_length = frame_data_length + SEND_CONTROL_AND_TIMESTAMP_BYTES + frame_id_byte_length; + unsigned char control_byte = CONTROL_BYTE_TIMESTAMP_2_BYTE_VALUE; + control_byte |= (is_extended ? CONTROL_BYTE_EXTENDED_ID_FLAG : 0); + control_byte |= (frame_data_length & CONTROL_BYTE_CAN_DATA_LENGTH_MASK); // initialize the full message with the header, control bytes, timestamp bytes std::vector full_message; full_message.insert(full_message.end(), AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end()); - full_message.push_back(0x00); - full_message.push_back(0x00); + full_message.push_back(SEND_RESERVED_BYTE_0); + full_message.push_back(SEND_RESERVED_BYTE_1); full_message.push_back(static_cast(message_length & 0xFF)); - full_message.push_back(static_cast((message_length >> 8) & 0xFF)); + full_message.push_back(static_cast((message_length >> BITS_PER_BYTE) & 0xFF)); full_message.push_back(control_byte); - full_message.push_back(192); - full_message.push_back(70); + full_message.push_back(SEND_TIME_STAMP_BYTE_0); + full_message.push_back(SEND_TIME_STAMP_BYTE_1); // insert the can frame id auto can_id = frame.get_id(); for (size_t i = 0; i < frame_id_byte_length; ++i) { - full_message.push_back(static_cast((can_id >> (i * 8)) & 0xFF)); + full_message.push_back(static_cast((can_id >> (i * BITS_PER_BYTE)) & 0xFF)); } // insert the can frame data full_message.insert(full_message.end(), frame_data.begin(), frame_data.end()); @@ -349,7 +433,80 @@ class AxiomaticAdapter::AxiomaticAdapterImpl static constexpr std::array AXIOMATIC_CAN_MESSAGE_HEADER = {'A', 'X', 'I', 'O', 0xBA, 0x36, 0x01}; static constexpr std::chrono::milliseconds TCP_IP_CONNECTION_TIMEOUT_MS{3000}; - /// @brief Socket connection state as a struct for the mutex during TCP Open Socket to update the variables together + // receive buffer size for each async_receive call. Larger than the + // protocol's per-message cap (256 bytes) by a wide margin + static constexpr size_t RECEIVE_BUFFER_SIZE = 65536; + static constexpr int BITS_PER_BYTE = 8; + + // axiomatic Protocol Message Header layout (per spec v6, Table 1): + // bytes 0-3 : Axiomatic Tag "AXIO" + // bytes 4-5 : Protocol ID (0x36BA, LSB first → 0xBA 0x36 on wire) + // bytes 6-7 : Message ID (LSB first) + // byte 8 : Message Version + // bytes 9-10 : Message Data Length (LSB first) + // total header = 11 bytes; Message Data follows immediately after. + static constexpr size_t PROTOCOL_HEADER_BYTES = 11; + + // the first 6 bytes of every Axiomatic header are constant ("AXIO" + + // protocol ID). Useful when we only want to validate "this is some + // axiomatic protocol message" without dictating Message ID. + static constexpr size_t PROTOCOL_SYNC_PREFIX_BYTES = 6; + + // byte offset of the Message ID field (LSB) within a protocol header. + // message ID is a 2-byte little-endian value at offsets 6 and 7. + static constexpr size_t MESSAGE_ID_HEADER_OFFSET = 6; + + // byte offset of the Message Data Length field (LSB) within a protocol + // header. 2-byte little-endian value at offsets 9 and 10. Names the body + // length, not the total message length. + static constexpr size_t MESSAGE_DATA_LENGTH_HEADER_OFFSET = 9; + + // message IDs defined by the Axiomatic Communication Protocol (spec v6, table 3). + enum class MessageId : uint16_t + { + Undefined = 0, + CanStream = 1, + StatusRequest = 2, + StatusResponse = 3, + Heartbeat = 4, + CanFdStream = 5, + }; + + // CAN ID field width inside a CAN Stream message body + static constexpr size_t STANDARD_CAN_ID_BYTES = 2; + static constexpr size_t EXTENDED_CAN_ID_BYTES = 4; + + // the Control Byte is a single byte at the start of every CAN/Notification + // frame in a CAN Stream message body + static constexpr size_t CONTROL_BYTE_BYTES = 1; + + // control byte (CB) field layout — first byte of every CAN/Notification + // frame inside a CAN Stream message body. Per Axiomatic Communication + // protocol spec v6, Section "Control Byte": + // bit 7 : C_Bit — 0 = CAN Frame, 1 = Notification Frame + // bits 6:5 : TS_Bit — Time Stamp length code (see TIMESTAMP_LENGTH_BYTES_TABLE) + // bit 4 : EID_Bit — 0 = standard 11-bit ID, 1 = extended 29-bit ID + // bits 3:0 : L_Bit — CAN Data Length (DLC), 0..8 valid + static constexpr uint8_t CONTROL_BYTE_NOTIFICATION_FRAME_FLAG = 0x80; + static constexpr uint8_t CONTROL_BYTE_TIMESTAMP_LENGTH_MASK = 0x60; + static constexpr int CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT = 5; + static constexpr uint8_t CONTROL_BYTE_EXTENDED_ID_FLAG = 0x10; + static constexpr uint8_t CONTROL_BYTE_CAN_DATA_LENGTH_MASK = 0x0F; + + // TS_Bit code → number of timestamp bytes that follow CB. Per Table 4 in + // the spec: 00→0 bytes, 01→1, 10→2, 11→4. Note this mapping is non-linear + // at index 3 (which is why a lookup table is needed instead of using the + // raw 2-bit value as the byte count directly). + static constexpr size_t TIMESTAMP_LENGTH_BYTES_TABLE[4] = {0, 1, 2, 4}; + + // TS_Bit code that send() writes into the Control Byte + static constexpr uint8_t CONTROL_BYTE_TIMESTAMP_2_BYTE_VALUE = static_cast(2) + << CONTROL_BYTE_TIMESTAMP_LENGTH_SHIFT; + + // notification Frame fixed size: 1-byte NIDB + 4-byte NDB1..NDB4. + static constexpr size_t NOTIFICATION_FRAME_TOTAL_BYTES = 5; + + /// @brief socket connection state as a struct for the mutex during TCP Open Socket to update the variables together struct TCPSocketConnectionState { bool connected{false}; @@ -364,12 +521,22 @@ class AxiomaticAdapter::AxiomaticAdapterImpl std::atomic thread_running_; std::atomic stop_thread_requested_; + // CAN frames decoded from a packed CAN Stream message but not yet delivered + // through receive(). Drained one at a time, ahead of the next TCP read. + std::deque pending_frames_; + // from construction std::string ip_address_; std::string port_; std::function frame)> receive_callback_; std::function error_callback_; std::chrono::milliseconds receive_timeout_ms_; + + // receive buffer for async_receive — allocated once at construction and reused across every receive() call + std::vector rx_buffer_; + + // when true, disable Nagle's algorithm on the TCP socket after connect + bool tcp_nodelay_; }; AxiomaticAdapter::AxiomaticAdapter( @@ -377,9 +544,15 @@ AxiomaticAdapter::AxiomaticAdapter( const std::string & port, const std::function frame)> && receive_callback_function, const std::function && error_callback_function, - const std::chrono::milliseconds & receive_timeout_ms) + const std::chrono::milliseconds & receive_timeout_ms, + bool tcp_nodelay) : pimpl_(std::make_unique( - ip_address, port, std::move(receive_callback_function), std::move(error_callback_function), receive_timeout_ms)) + ip_address, + port, + std::move(receive_callback_function), + std::move(error_callback_function), + receive_timeout_ms, + tcp_nodelay)) {} AxiomaticAdapter::~AxiomaticAdapter() diff --git a/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp b/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp index 79fdb37..01630e5 100644 --- a/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp +++ b/axiomatic_adapter/src/axiomatic_socketcan_bridge.cpp @@ -23,9 +23,19 @@ namespace can { AxiomaticSocketcanBridge::AxiomaticSocketcanBridge( - const std::string & can_interface_name, const std::string & ip, const std::string & port, bool verbose) + const std::string & can_interface_name, + const std::string & ip, + const std::string & port, + bool verbose, + bool tcp_nodelay) : socketcan_adapter_(can_interface_name) -, axiomatic_adapter_(ip, port, std::bind(&AxiomaticSocketcanBridge::ethcanReceiveCallback, this, std::placeholders::_1)) +, axiomatic_adapter_( + ip, + port, + std::bind(&AxiomaticSocketcanBridge::ethcanReceiveCallback, this, std::placeholders::_1), + [](AxiomaticAdapter::socket_error_string_t /*error*/) { /* no-op */ }, + AxiomaticAdapter::DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS, + tcp_nodelay) , verbose_(verbose) { socketcan_adapter_.setOnReceiveCallback( diff --git a/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp b/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp index b2536f4..80833e7 100644 --- a/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp +++ b/axiomatic_adapter/src/axiomatic_socketcan_bridge_node.cpp @@ -48,7 +48,8 @@ void configureArguments( std::string & port, bool & verbose, bool & retry_connection, - int & max_retry_attempts) + int & max_retry_attempts, + bool & tcp_nodelay) { app.add_option("can_interface", can_interface, "CAN interface to use (default: vcan0)")->default_val("vcan0"); app.add_option("ip", ip, "IP address of the bridge (default: 192.168.0.34)")->default_val("192.168.0.34"); @@ -58,6 +59,10 @@ void configureArguments( app .add_option("--max-retry-attempts", max_retry_attempts, "Maximum number of retry attempts (default: -1 = infinite)") ->default_val(-1); + app.add_flag( + "!--no-tcp-nodelay", + tcp_nodelay, + "Disable TCP_NODELAY (re-enable Nagle's algorithm). Default is on; see the AxiomaticAdapter docs for tradeoffs."); } int main(int argc, char * argv[]) @@ -65,11 +70,12 @@ int main(int argc, char * argv[]) std::signal(SIGINT, signalHandler); std::signal(SIGTERM, signalHandler); std::string can_interface, ip, port; + bool tcp_nodelay = true; CLI::App app{"Axiomatic SocketCAN Bridge"}; - configureArguments(app, can_interface, ip, port, verbose, retry_connection, max_retry_attempts); + configureArguments(app, can_interface, ip, port, verbose, retry_connection, max_retry_attempts, tcp_nodelay); CLI11_PARSE(app, argc, argv); - polymath::can::AxiomaticSocketcanBridge bridge(can_interface, ip, port, verbose); + polymath::can::AxiomaticSocketcanBridge bridge(can_interface, ip, port, verbose, tcp_nodelay); std::cout << "Axiomatic Socketcan Bridge configuring..." << std::endl;