From 1c0e6d277a09594989fe1fb1ed530241322d5449 Mon Sep 17 00:00:00 2001 From: John Wilson Date: Thu, 14 May 2026 21:02:31 +0100 Subject: [PATCH] Add optional client_ip_address parameter to DoIPClient.get_entity function --- doipclient/client.py | 18 ++++++++++++------ tests/test_client.py | 13 +++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/doipclient/client.py b/doipclient/client.py index 7f35c38..7507eea 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -211,13 +211,15 @@ def __exit__(self, type, value, traceback): @staticmethod def _create_udp_socket( - ipv6=False, udp_port=UDP_DISCOVERY, timeout=None, source_interface=None + ipv6=False, udp_port=UDP_DISCOVERY, timeout=None, source_interface=None, ip_address=None ): if ipv6: sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - # IPv6 version always uses link-local scope multicast address (FF02 16 ::1) - sock.bind((LINK_LOCAL_MULTICAST_ADDRESS, udp_port)) + if ip_address is None: + # IPv6 version uses link-local scope multicast address (FF02 16 ::1) by default + ip_address = LINK_LOCAL_MULTICAST_ADDRESS + sock.bind((ip_address, udp_port)) if source_interface is None: # 0 is the "default multicast interface" which is unlikely to be correct, but it will do @@ -239,7 +241,9 @@ def _create_udp_socket( sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # IPv4, use INADDR_ANY to listen to all interfaces for broadcasts (not multicast) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.bind(("", udp_port)) + if ip_address is None: + ip_address = "" + sock.bind((ip_address, udp_port)) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if timeout is not None: @@ -328,7 +332,7 @@ def await_vehicle_announcement( @classmethod def get_entity( - cls, ecu_ip_address="255.255.255.255", protocol_version=0x02, eid=None, vin=None + cls, ecu_ip_address="255.255.255.255", protocol_version=0x02, eid=None, vin=None, client_ip_address="" ): """Sends a VehicleIdentificationRequest and awaits a VehicleIdentificationResponse from the ECU, either with a specified VIN, EIN, or nothing. Equivalent to the request_vehicle_identification() method @@ -345,12 +349,14 @@ def get_entity( :type eid: bytes, optional :param vin: VIN of the Vehicle :type vin: str, optional + :param client_ip_address: IP address to send VehicleIdentificationRequest from + :type client_ip_address: str, optional :return: The vehicle identification response message :rtype: VehicleIdentificationResponse """ # UDP_TEST_EQUIPMENT_REQUEST is dynamically assigned using udp_port=0 - sock = cls._create_udp_socket(udp_port=0, timeout=A_DOIP_CTRL) + sock = cls._create_udp_socket(udp_port=0, timeout=A_DOIP_CTRL, ip_address=client_ip_address) if eid: message = VehicleIdentificationRequestWithEID(eid) diff --git a/tests/test_client.py b/tests/test_client.py index 7ae9c4c..579bdac 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -810,6 +810,19 @@ def test_get_entity_with_vin(mock_socket): assert result.vin_sync_status == 0x00 +def test_get_entity_with_client_ip_address(mock_socket): + mock_socket.rx_queue.append(vehicle_identification_response) + _, result = DoIPClient.get_entity(client_ip_address="192.168.1.1") + assert mock_socket.tx_queue[-1] == vehicle_identification_request + assert result.vin == "1" * 17 + assert result.logical_address == 0x1234 + assert result.eid == b"1" * 6 + assert result.gid == b"2" * 6 + assert result.further_action_required == 0x00 + assert result.vin_sync_status == 0x00 + assert mock_socket._bound_ip == "192.168.1.1" + + def test_request_diagnostic_power_mode(mock_socket): sut = DoIPClient(test_ip, test_logical_address) mock_socket.rx_queue.append(diagnostic_power_mode_response)