diff --git a/bumble/controller.py b/bumble/controller.py index fcb35de5..2106c13f 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -241,7 +241,7 @@ class Controller: lmp_features: bytes = bytes.fromhex( '0000000060000000' ) # BR/EDR Not Supported, LE Supported (Controller) - manufacturer_name: int = 0xFFFF + manufacturer_company_identifier: int = 0xFFFF acl_data_packet_length: int = 27 total_num_acl_data_packets: int = 64 le_acl_data_packet_length: int = 27 @@ -416,15 +416,20 @@ def on_hci_packet(self, packet: hci.HCI_Packet) -> None: def on_hci_command_packet(self, command: hci.HCI_Command) -> None: handler_name = f'on_{command.name.lower()}' handler = getattr(self, handler_name, self.on_hci_command) - result: bytes | None = handler(command) - if isinstance(result, bytes): + result: hci.HCI_ReturnParameters | None = handler(command) + if isinstance(command, hci.HCI_SyncCommand): + if result is None: + logger.error("Sync command handlers should return parameters, got None") + return self.send_hci_packet( hci.HCI_Command_Complete_Event( num_hci_command_packets=1, command_opcode=command.op_code, - return_parameters=hci.HCI_GenericReturnParameters(data=result), + return_parameters=result, ) ) + elif result is not None: + logger.error("Async command handlers should return None, got %s", result) def on_hci_event_packet(self, _event: hci.HCI_Packet) -> None: logger.warning('!!! unexpected event packet') @@ -449,6 +454,13 @@ def send_hci_packet(self, packet: hci.HCI_Packet) -> None: if self.host: asyncio.get_running_loop().call_soon(self.host.on_packet, bytes(packet)) + def _send_hci_command_status(self, status: int, op_code: int) -> None: + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=status, num_hci_command_packets=1, command_opcode=op_code + ) + ) + # This method allows the controller to emulate the same API as a transport source async def wait_for_termination(self) -> None: await self.terminated @@ -586,7 +598,7 @@ def on_le_connect_ind(self, packet: ll.ConnectInd) -> None: # Then say that the connection has completed self.send_hci_packet( hci.HCI_LE_Connection_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=connection.handle, role=connection.role, peer_address_type=peer_address.address_type, @@ -601,7 +613,7 @@ def on_le_connect_ind(self, packet: ll.ConnectInd) -> None: if isinstance(advertiser, AdvertisingSet): self.send_hci_packet( hci.HCI_LE_Advertising_Set_Terminated_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, advertising_handle=advertiser.handle, connection_handle=connection.handle, num_completed_extended_advertising_events=0, @@ -613,7 +625,7 @@ def on_le_disconnected(self, connection: Connection, reason: int) -> None: # Send a disconnection complete event self.send_hci_packet( hci.HCI_Disconnection_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=connection.handle, reason=reason, ) @@ -678,7 +690,7 @@ def create_le_connection(self, peer_address: hci.Address) -> None: self.send_hci_packet( # pylint: disable=line-too-long hci.HCI_LE_Connection_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=connection.handle if connection else 0, role=hci.Role.CENTRAL, peer_address_type=peer_address.address_type, @@ -825,7 +837,7 @@ def on_le_cis_established(self, cig_id: int, cis_id: int) -> None: self.send_hci_packet( hci.HCI_LE_CIS_Established_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=cis_link.handle, # CIS parameters are ignored. cig_sync_delay=0, @@ -875,9 +887,9 @@ def on_le_cis_disconnected(self, cig_id: int, cis_id: int) -> None: self.send_hci_packet( hci.HCI_Disconnection_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=cis_link.handle, - reason=hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, + reason=hci.HCI_ErrorCode.REMOTE_USER_TERMINATED_CONNECTION_ERROR, ) ) @@ -896,13 +908,13 @@ def send_lmp_packet( ] = loop.create_future() return future - def on_lmp_packet(self, sender_address: hci.Address, packet: lmp.Packet): + def on_lmp_packet(self, sender_address: hci.Address, packet: lmp.Packet) -> None: match packet: case lmp.LmpAccepted() | lmp.LmpAcceptedExt(): if future := self.classic_pending_commands.setdefault( sender_address, {} ).get(packet.response_opcode): - future.set_result(hci.HCI_SUCCESS) + future.set_result(hci.HCI_ErrorCode.SUCCESS) else: logger.error("!!! Unhandled packet: %s", packet) case lmp.LmpNotAccepted() | lmp.LmpNotAcceptedExt(): @@ -926,7 +938,8 @@ def on_lmp_packet(self, sender_address: hci.Address, packet: lmp.Packet): ) case lmp.LmpDetach(): self.on_classic_disconnected( - sender_address, hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + sender_address, + hci.HCI_ErrorCode.REMOTE_USER_TERMINATED_CONNECTION_ERROR, ) case lmp.LmpSwitchReq(): self.on_classic_role_change_request(sender_address) @@ -976,7 +989,7 @@ def on_classic_connection_request( def on_classic_connection_complete( self, peer_address: hci.Address, status: int ) -> None: - if status == hci.HCI_SUCCESS: + if status == hci.HCI_ErrorCode.SUCCESS: # Allocate (or reuse) a connection handle peer_address = peer_address connection_handle = self.allocate_connection_handle() @@ -1023,7 +1036,7 @@ def on_classic_disconnected(self, peer_address: hci.Address, reason: int) -> Non if connection := self.classic_connections.pop(peer_address, None): self.send_hci_packet( hci.HCI_Disconnection_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=connection.handle, reason=reason, ) @@ -1038,7 +1051,7 @@ def on_classic_sco_disconnected( if sco_link := self.sco_links.pop(peer_address, None): self.send_hci_packet( hci.HCI_Disconnection_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=sco_link.handle, reason=reason, ) @@ -1052,7 +1065,8 @@ def on_classic_role_change_request(self, peer_address: hci.Address) -> None: self.send_lmp_packet( peer_address, lmp.LmpNotAccepted( - lmp.Opcode.LMP_SWITCH_REQ, hci.HCI_ROLE_CHANGE_NOT_ALLOWED_ERROR + lmp.Opcode.LMP_SWITCH_REQ, + hci.HCI_ErrorCode.ROLE_CHANGE_NOT_ALLOWED_ERROR, ), ) else: @@ -1071,7 +1085,7 @@ def classic_role_change(self, connection: Connection) -> None: connection.role = new_role self.send_hci_packet( hci.HCI_Role_Change_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, bd_addr=connection.peer_address, new_role=new_role, ) @@ -1080,7 +1094,7 @@ def classic_role_change(self, connection: Connection) -> None: def on_classic_sco_connection_complete( self, peer_address: hci.Address, status: int, link_type: int ) -> None: - if status == hci.HCI_SUCCESS: + if status == hci.HCI_ErrorCode.SUCCESS: # Allocate (or reuse) a connection handle connection_handle = self.allocate_connection_handle() sco_link = ScoLink( @@ -1110,7 +1124,7 @@ def on_classic_sco_connection_complete( def on_classic_remote_name_request( self, peer_address: hci.Address, name_offset: int - ): + ) -> None: self.send_lmp_packet( peer_address, lmp.LmpNameRes( @@ -1126,10 +1140,10 @@ def on_classic_remote_name_response( name_offset: int, name_length: int, name_fregment: bytes, - ): + ) -> None: self.send_hci_packet( hci.HCI_Remote_Name_Request_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, bd_addr=peer_address, remote_name=name_fregment, ) @@ -1148,13 +1162,17 @@ def is_advertising(self) -> bool: ############################################################ # HCI handlers ############################################################ - def on_hci_command(self, command: hci.HCI_Command) -> bytes | None: + def on_hci_command( + self, command: hci.HCI_Command + ) -> hci.HCI_StatusReturnParameters: logger.warning(color(f'--- Unsupported command {command}', 'red')) - return bytes([hci.HCI_UNKNOWN_HCI_COMMAND_ERROR]) + return hci.HCI_StatusReturnParameters( + hci.HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR + ) def on_hci_create_connection_command( self, command: hci.HCI_Create_Connection_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.1.5 Create Connection command ''' @@ -1165,12 +1183,8 @@ def on_hci_create_connection_command( # Check that we don't already have a pending connection if self.pending_le_connection: - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_CONTROLLER_BUSY_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.CONTROLLER_BUSY_ERROR, command.op_code ) return None @@ -1187,35 +1201,21 @@ def on_hci_create_connection_command( ) # Say that the connection is pending - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) future = self.send_lmp_packet(command.bd_addr, lmp.LmpHostConnectionReq()) - def on_response(future: asyncio.Future[int]): + def on_response(future: asyncio.Future[int]) -> None: self.on_classic_connection_complete(command.bd_addr, future.result()) future.add_done_callback(on_response) return None - def on_hci_disconnect_command( - self, command: hci.HCI_Disconnect_Command - ) -> bytes | None: + def on_hci_disconnect_command(self, command: hci.HCI_Disconnect_Command) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.1.6 Disconnect Command ''' # First, say that the disconnection is pending - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) # Notify the link of the disconnection handle = command.connection_handle @@ -1275,7 +1275,7 @@ def on_hci_disconnect_command( def on_hci_accept_connection_request_command( self, command: hci.HCI_Accept_Connection_Request_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.1.8 Accept Connection Request command ''' @@ -1284,28 +1284,18 @@ def on_hci_accept_connection_request_command( return None if not (connection := self.classic_connections.get(command.bd_addr)): - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code ) return None - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) if command.role == hci.Role.CENTRAL: # Perform role switching before accept. future = self.send_lmp_packet(command.bd_addr, lmp.LmpSwitchReq()) - def on_response(future: asyncio.Future[int]): - if (status := future.result()) == hci.HCI_SUCCESS: + def on_response(future: asyncio.Future[int]) -> None: + if (status := future.result()) == hci.HCI_ErrorCode.SUCCESS: self.classic_role_change(connection) # Continue connection setup. self.send_lmp_packet( @@ -1328,22 +1318,18 @@ def on_response(future: asyncio.Future[int]): command.bd_addr, lmp.LmpAccepted(lmp.Opcode.LMP_HOST_CONNECTION_REQ), ) - self.on_classic_connection_complete(command.bd_addr, hci.HCI_SUCCESS) + self.on_classic_connection_complete( + command.bd_addr, hci.HCI_ErrorCode.SUCCESS + ) return None def on_hci_remote_name_request_command( self, command: hci.HCI_Remote_Name_Request_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.1.19 Remote Name Request command ''' - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) self.send_lmp_packet(command.bd_addr, lmp.LmpNameReq(0)) @@ -1351,7 +1337,7 @@ def on_hci_remote_name_request_command( def on_hci_enhanced_setup_synchronous_connection_command( self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.1.45 Enhanced Setup Synchronous Connection command ''' @@ -1362,22 +1348,12 @@ def on_hci_enhanced_setup_synchronous_connection_command( if not ( connection := self.find_connection_by_handle(command.connection_handle) ): - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code ) return None - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) future = self.send_lmp_packet( connection.peer_address, lmp.LmpEscoLinkReq( @@ -1396,7 +1372,7 @@ def on_hci_enhanced_setup_synchronous_connection_command( ), ) - def on_response(future: asyncio.Future[int]): + def on_response(future: asyncio.Future[int]) -> None: self.on_classic_sco_connection_complete( connection.peer_address, future.result(), @@ -1408,7 +1384,7 @@ def on_response(future: asyncio.Future[int]): def on_hci_enhanced_accept_synchronous_connection_request_command( self, command: hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.1.46 Enhanced Accept Synchronous Connection Request command ''' @@ -1417,59 +1393,37 @@ def on_hci_enhanced_accept_synchronous_connection_request_command( return None if not (connection := self.classic_connections.get(command.bd_addr)): - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code ) return None - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) self.send_lmp_packet( connection.peer_address, lmp.LmpAcceptedExt(lmp.Opcode.LMP_ESCO_LINK_REQ), ) self.on_classic_sco_connection_complete( connection.peer_address, - hci.HCI_SUCCESS, + hci.HCI_ErrorCode.SUCCESS, hci.HCI_Connection_Complete_Event.LinkType.ESCO, ) return None - def on_hci_sniff_mode_command( - self, command: hci.HCI_Sniff_Mode_Command - ) -> bytes | None: + def on_hci_sniff_mode_command(self, command: hci.HCI_Sniff_Mode_Command) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.2.2 Sniff Mode command ''' if self.link is None: - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code ) return None - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) self.send_hci_packet( hci.HCI_Mode_Change_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=command.connection_handle, current_mode=hci.HCI_Mode_Change_Event.Mode.SNIFF, interval=2, @@ -1479,31 +1433,21 @@ def on_hci_sniff_mode_command( def on_hci_exit_sniff_mode_command( self, command: hci.HCI_Exit_Sniff_Mode_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.2.3 Exit Sniff Mode command ''' if self.link is None: - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code ) return None - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) self.send_hci_packet( hci.HCI_Mode_Change_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=command.connection_handle, current_mode=hci.HCI_Mode_Change_Event.Mode.ACTIVE, interval=2, @@ -1511,9 +1455,7 @@ def on_hci_exit_sniff_mode_command( ) return None - def on_hci_switch_role_command( - self, command: hci.HCI_Switch_Role_Command - ) -> bytes | None: + def on_hci_switch_role_command(self, command: hci.HCI_Switch_Role_Command) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.2.8 Switch hci.Role command ''' @@ -1523,21 +1465,11 @@ def on_hci_switch_role_command( if connection := self.classic_connections.get(command.bd_addr): current_role = connection.role - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) else: # Connection doesn't exist, reject. - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_DISALLOWED_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.COMMAND_DISALLOWED_ERROR, command.op_code ) return None @@ -1545,7 +1477,7 @@ def on_hci_switch_role_command( if current_role == command.role: self.send_hci_packet( hci.HCI_Role_Change_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, bd_addr=command.bd_addr, new_role=current_role, ) @@ -1553,8 +1485,8 @@ def on_hci_switch_role_command( else: future = self.send_lmp_packet(command.bd_addr, lmp.LmpSwitchReq()) - def on_response(future: asyncio.Future[int]): - if (status := future.result()) == hci.HCI_SUCCESS: + def on_response(future: asyncio.Future[int]) -> None: + if (status := future.result()) == hci.HCI_ErrorCode.SUCCESS: connection.role = hci.Role(command.role) self.send_hci_packet( hci.HCI_Role_Change_Event( @@ -1570,25 +1502,27 @@ def on_response(future: asyncio.Future[int]): def on_hci_set_event_mask_command( self, command: hci.HCI_Set_Event_Mask_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.1 Set Event Mask Command ''' self.event_mask = int.from_bytes( command.event_mask, byteorder='little', signed=False ) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) - def on_hci_reset_command(self, _command: hci.HCI_Reset_Command) -> bytes | None: + def on_hci_reset_command( + self, _command: hci.HCI_Reset_Command + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.2 Reset Command ''' # TODO: cleanup what needs to be reset - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_write_local_name_command( self, command: hci.HCI_Write_Local_Name_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.11 Write Local Name Command ''' @@ -1601,11 +1535,11 @@ def on_hci_write_local_name_command( self.local_name = str(local_name, 'utf-8') except UnicodeDecodeError: pass - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_read_local_name_command( self, _command: hci.HCI_Read_Local_Name_Command - ) -> bytes | None: + ) -> hci.HCI_Read_Local_Name_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.12 Read Local Name Command ''' @@ -1613,27 +1547,31 @@ def on_hci_read_local_name_command( if len(local_name) < 248: local_name = local_name + bytes(248 - len(local_name)) - return bytes([hci.HCI_SUCCESS]) + local_name + return hci.HCI_Read_Local_Name_ReturnParameters( + hci.HCI_ErrorCode.SUCCESS, local_name=local_name + ) def on_hci_read_class_of_device_command( self, _command: hci.HCI_Read_Class_Of_Device_Command - ) -> bytes | None: + ) -> hci.HCI_Read_Class_Of_Device_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command ''' - return bytes([hci.HCI_SUCCESS, 0, 0, 0]) + return hci.HCI_Read_Class_Of_Device_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, class_of_device=0 + ) def on_hci_write_class_of_device_command( self, _command: hci.HCI_Write_Class_Of_Device_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_read_synchronous_flow_control_enable_command( self, _command: hci.HCI_Read_Synchronous_Flow_Control_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_Read_Synchronous_Flow_Control_Enable_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.36 Read Synchronous Flow Control Enable Command @@ -1642,186 +1580,191 @@ def on_hci_read_synchronous_flow_control_enable_command( ret = 1 else: ret = 0 - return bytes([hci.HCI_SUCCESS, ret]) + return hci.HCI_Read_Synchronous_Flow_Control_Enable_ReturnParameters( + hci.HCI_ErrorCode.SUCCESS, ret + ) def on_hci_write_synchronous_flow_control_enable_command( self, command: hci.HCI_Write_Synchronous_Flow_Control_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.37 Write Synchronous Flow Control Enable Command ''' - ret = hci.HCI_SUCCESS + ret = hci.HCI_ErrorCode.SUCCESS if command.synchronous_flow_control_enable == 1: self.sync_flow_control = True elif command.synchronous_flow_control_enable == 0: self.sync_flow_control = False else: - ret = hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR - return bytes([ret]) + ret = hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR + return hci.HCI_StatusReturnParameters(ret) def on_hci_set_controller_to_host_flow_control_command( self, _command: hci.HCI_Set_Controller_To_Host_Flow_Control_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.38 Set Controller To Host Flow Control Command ''' # For now we just accept the command but ignore the values. # TODO: respect the passed in values. - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_host_buffer_size_command( self, _command: hci.HCI_Host_Buffer_Size_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.39 Host Buffer Size Command ''' # For now we just accept the command but ignore the values. # TODO: respect the passed in values. - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_write_extended_inquiry_response_command( self, _command: hci.HCI_Write_Extended_Inquiry_Response_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.56 Write Extended Inquiry Response Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_write_simple_pairing_mode_command( self, _command: hci.HCI_Write_Simple_Pairing_Mode_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_set_event_mask_page_2_command( self, command: hci.HCI_Set_Event_Mask_Page_2_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.69 Set Event Mask Page 2 Command ''' self.event_mask_page_2 = int.from_bytes( command.event_mask_page_2, byteorder='little', signed=False ) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_read_le_host_support_command( self, _command: hci.HCI_Read_LE_Host_Support_Command - ) -> bytes | None: + ) -> hci.HCI_Read_LE_Host_Support_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command ''' - return bytes([hci.HCI_SUCCESS, 1, 0]) + return hci.HCI_Read_LE_Host_Support_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, le_supported_host=1, unused=0 + ) def on_hci_write_le_host_support_command( self, _command: hci.HCI_Write_LE_Host_Support_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command ''' # TODO / Just ignore for now - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_write_authenticated_payload_timeout_command( self, command: hci.HCI_Write_Authenticated_Payload_Timeout_Command - ) -> bytes | None: + ) -> hci.HCI_StatusAndConnectionHandleReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.94 Write Authenticated Payload Timeout Command ''' # TODO - return struct.pack(' bytes | None: + ) -> hci.HCI_Read_Local_Version_Information_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.4.1 Read Local Version Information Command ''' - return struct.pack( - ' bytes | None: + ) -> hci.HCI_Read_Local_Supported_Commands_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command ''' - return bytes([hci.HCI_SUCCESS]) + self.supported_commands + return hci.HCI_Read_Local_Supported_Commands_ReturnParameters( + hci.HCI_ErrorCode.SUCCESS, supported_commands=self.supported_commands + ) def on_hci_read_local_supported_features_command( self, _command: hci.HCI_Read_Local_Supported_Features_Command - ) -> bytes | None: + ) -> hci.HCI_Read_Local_Supported_Features_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command ''' - return bytes([hci.HCI_SUCCESS]) + self.lmp_features[:8] + return hci.HCI_Read_Local_Supported_Features_ReturnParameters( + hci.HCI_ErrorCode.SUCCESS, lmp_features=self.lmp_features[:8] + ) def on_hci_read_local_extended_features_command( self, command: hci.HCI_Read_Local_Extended_Features_Command - ) -> bytes | None: + ) -> hci.HCI_Read_Local_Extended_Features_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command ''' if command.page_number * 8 > len(self.lmp_features): - return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) - return ( - bytes( - [ - # Status - hci.HCI_SUCCESS, - # Page number - command.page_number, - # Max page number - len(self.lmp_features) // 8 - 1, - ] - ) - # Features of the current page - + self.lmp_features[command.page_number * 8 : (command.page_number + 1) * 8] + return hci.HCI_Read_Local_Extended_Features_ReturnParameters( + status=hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, + page_number=command.page_number, + maximum_page_number=len(self.lmp_features) // 8 - 1, + extended_lmp_features=bytes(8), + ) + return hci.HCI_Read_Local_Extended_Features_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, + page_number=command.page_number, + maximum_page_number=len(self.lmp_features) // 8 - 1, + extended_lmp_features=self.lmp_features[ + command.page_number * 8 : (command.page_number + 1) * 8 + ], ) def on_hci_read_buffer_size_command( self, _command: hci.HCI_Read_Buffer_Size_Command - ) -> bytes | None: + ) -> hci.HCI_Read_Buffer_Size_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.4.5 Read Buffer Size Command ''' - return struct.pack( - ' bytes | None: + ) -> hci.HCI_Read_BD_ADDR_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command ''' - bd_addr = ( - bytes(self._public_address) - if self._public_address is not None - else bytes(6) + return hci.HCI_Read_BD_ADDR_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, + bd_addr=self._public_address or hci.Address.ANY, ) - return bytes([hci.HCI_SUCCESS]) + bd_addr def on_hci_le_set_default_subrate_command( self, command: hci.HCI_LE_Set_Default_Subrate_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 6, Part E - 7.8.123 LE Set Event Mask Command ''' @@ -1831,13 +1774,15 @@ def on_hci_le_set_default_subrate_command( or command.subrate_max < command.subrate_min or command.continuation_number >= command.subrate_max ): - return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + return hci.HCI_StatusReturnParameters( + hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR + ) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_subrate_request_command( self, command: hci.HCI_LE_Subrate_Request_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 6, Part E - 7.8.124 LE Subrate Request command ''' @@ -1847,19 +1792,15 @@ def on_hci_le_subrate_request_command( or command.subrate_max < command.subrate_min or command.continuation_number >= command.subrate_max ): - return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) - - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, + return self._send_hci_command_status( + hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, command.op_code ) - ) + + self._send_hci_command_status(hci.HCI_ErrorCode.SUCCESS, command.op_code) self.send_hci_packet( hci.HCI_LE_Subrate_Change_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=command.connection_handle, subrate_factor=2, peripheral_latency=2, @@ -1871,77 +1812,78 @@ def on_hci_le_subrate_request_command( def on_hci_le_set_event_mask_command( self, command: hci.HCI_LE_Set_Event_Mask_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.1 LE Set Event Mask Command ''' self.le_event_mask = int.from_bytes( command.le_event_mask, byteorder='little', signed=False ) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_read_buffer_size_command( self, _command: hci.HCI_LE_Read_Buffer_Size_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Buffer_Size_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command ''' - return struct.pack( - ' bytes | None: + ) -> hci.HCI_LE_Read_Buffer_Size_V2_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command ''' - return struct.pack( - ' bytes | None: + ) -> hci.HCI_LE_Read_Local_Supported_Features_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.3 LE Read Local Supported Features Command ''' - return bytes([hci.HCI_SUCCESS]) + self.le_features.value.to_bytes(8, 'little') + return hci.HCI_LE_Read_Local_Supported_Features_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, + le_features=self.le_features.value.to_bytes(8, 'little'), + ) def on_hci_le_read_all_local_supported_features_command( self, _command: hci.HCI_LE_Read_All_Local_Supported_Features_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_All_Local_Supported_Features_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.128 LE Read All Local Supported Features Command ''' - return ( - bytes([hci.HCI_SUCCESS]) - + bytes([0]) - + self.le_features.value.to_bytes(248, 'little') + return hci.HCI_LE_Read_All_Local_Supported_Features_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, + max_page=0, + le_features=self.le_features.value.to_bytes(248, 'little'), ) def on_hci_le_set_random_address_command( self, command: hci.HCI_LE_Set_Random_Address_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random hci.Address Command ''' self.random_address = command.random_address - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_advertising_parameters_command( self, command: hci.HCI_LE_Set_Advertising_Parameters_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.5 LE Set Advertising Parameters Command ''' @@ -1961,39 +1903,41 @@ def on_hci_le_set_advertising_parameters_command( self.le_legacy_advertiser.advertising_filter_policy = ( command.advertising_filter_policy ) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_read_advertising_physical_channel_tx_power_command( self, _command: hci.HCI_LE_Read_Advertising_Physical_Channel_Tx_Power_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Advertising_Physical_Channel_Tx_Power_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.6 LE Read Advertising Physical Channel Tx Power Command ''' - return bytes([hci.HCI_SUCCESS, self.advertising_channel_tx_power]) + return hci.HCI_LE_Read_Advertising_Physical_Channel_Tx_Power_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, tx_power_level=0 + ) def on_hci_le_set_advertising_data_command( self, command: hci.HCI_LE_Set_Advertising_Data_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.7 LE Set Advertising Data Command ''' self.le_legacy_advertiser.advertising_data = command.advertising_data - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_scan_response_data_command( self, command: hci.HCI_LE_Set_Scan_Response_Data_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.8 LE Set Scan Response Data Command ''' self.le_legacy_advertiser.scan_response_data = command.scan_response_data - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_advertising_enable_command( self, command: hci.HCI_LE_Set_Advertising_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.9 LE Set Advertising Enable Command ''' @@ -2002,37 +1946,39 @@ def on_hci_le_set_advertising_enable_command( else: self.le_legacy_advertiser.stop() - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_scan_parameters_command( self, command: hci.HCI_LE_Set_Scan_Parameters_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command ''' if self.le_scan_enable: - return bytes([hci.HCI_COMMAND_DISALLOWED_ERROR]) + return hci.HCI_StatusReturnParameters( + hci.HCI_ErrorCode.COMMAND_DISALLOWED_ERROR + ) self.le_scan_type = command.le_scan_type self.le_scan_interval = command.le_scan_interval self.le_scan_window = command.le_scan_window self.le_scan_own_address_type = hci.AddressType(command.own_address_type) self.le_scanning_filter_policy = command.scanning_filter_policy - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_scan_enable_command( self, command: hci.HCI_LE_Set_Scan_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.11 LE Set Scan Enable Command ''' self.le_scan_enable = bool(command.le_scan_enable) self.filter_duplicates = bool(command.filter_duplicates) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_create_connection_command( self, command: hci.HCI_LE_Create_Connection_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.8.12 LE Create Connection Command ''' @@ -2044,113 +1990,94 @@ def on_hci_le_create_connection_command( # Check that we don't already have a pending connection if self.pending_le_connection: - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_DISALLOWED_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.COMMAND_DISALLOWED_ERROR, command.op_code ) return None self.pending_le_connection = command # Say that the connection is pending - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) return None def on_hci_le_create_connection_cancel_command( self, _command: hci.HCI_LE_Create_Connection_Cancel_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.13 LE Create Connection Cancel Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_extended_create_connection_command( self, command: hci.HCI_LE_Extended_Create_Connection_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.8.66 LE Extended Create Connection Command ''' if not self.link: - return None + return # Check pending if self.pending_le_connection: - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_DISALLOWED_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.COMMAND_DISALLOWED_ERROR, command.op_code ) - return None + return self.pending_le_connection = command - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) - return None + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) def on_hci_le_read_filter_accept_list_size_command( self, _command: hci.HCI_LE_Read_Filter_Accept_List_Size_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Filter_Accept_List_Size_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.14 LE Read Filter Accept List Size Command ''' - return bytes([hci.HCI_SUCCESS, self.filter_accept_list_size]) + return hci.HCI_LE_Read_Filter_Accept_List_Size_ReturnParameters( + hci.HCI_ErrorCode.SUCCESS, self.filter_accept_list_size + ) def on_hci_le_clear_filter_accept_list_command( self, _command: hci.HCI_LE_Clear_Filter_Accept_List_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.15 LE Clear Filter Accept List Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_add_device_to_filter_accept_list_command( self, _command: hci.HCI_LE_Add_Device_To_Filter_Accept_List_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.16 LE Add Device To Filter Accept List Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_remove_device_from_filter_accept_list_command( self, _command: hci.HCI_LE_Remove_Device_From_Filter_Accept_List_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.17 LE Remove Device From Filter Accept List Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_write_scan_enable_command( self, command: hci.HCI_Write_Scan_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.3.18 Write Scan Enable Command ''' self.classic_scan_enable = command.scan_enable - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_read_remote_features_command( self, command: hci.HCI_LE_Read_Remote_Features_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command ''' @@ -2158,48 +2085,43 @@ def on_hci_le_read_remote_features_command( handle = command.connection_handle if not self.find_connection_by_handle(handle): - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) + self._send_hci_command_status( + hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, command.op_code ) return None # First, say that the command is pending - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) # Then send the remote features self.send_hci_packet( hci.HCI_LE_Read_Remote_Features_Complete_Event( - status=hci.HCI_SUCCESS, + status=hci.HCI_ErrorCode.SUCCESS, connection_handle=handle, le_features=bytes.fromhex('dd40000000000000'), ) ) return None - def on_hci_le_rand_command(self, _command: hci.HCI_LE_Rand_Command) -> bytes | None: + def on_hci_le_rand_command( + self, _command: hci.HCI_LE_Rand_Command + ) -> hci.HCI_LE_Rand_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.23 LE Rand Command ''' - return bytes([hci.HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64)) + return hci.HCI_LE_Rand_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, + random_number=struct.pack('Q', random.randint(0, 1 << 64)), + ) def on_hci_le_enable_encryption_command( self, command: hci.HCI_LE_Enable_Encryption_Command - ) -> bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.8.24 LE Enable Encryption Command ''' if not self.link: - return None + return # Check the parameters if ( @@ -2209,7 +2131,9 @@ def on_hci_le_enable_encryption_command( or connection.transport != PhysicalTransport.LE ): logger.warning('connection not found') - return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + return self._send_hci_command_status( + hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, command.op_code + ) connection.send_ll_control_pdu( ll.EncReq( @@ -2219,44 +2143,37 @@ def on_hci_le_enable_encryption_command( ), ) - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) # TODO: Handle authentication self.on_le_encrypted(connection) - return None - def on_hci_le_read_supported_states_command( self, _command: hci.HCI_LE_Read_Supported_States_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Supported_States_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.27 LE Read Supported States Command ''' - return bytes([hci.HCI_SUCCESS]) + self.le_states + return hci.HCI_LE_Read_Supported_States_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, le_states=self.le_states + ) def on_hci_le_read_suggested_default_data_length_command( self, _command: hci.HCI_LE_Read_Suggested_Default_Data_Length_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Suggested_Default_Data_Length_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.34 LE Read Suggested Default Data Length Command ''' - return struct.pack( - ' bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.35 LE Write Suggested Default Data Length Command @@ -2264,111 +2181,112 @@ def on_hci_le_write_suggested_default_data_length_command( self.suggested_max_tx_octets, self.suggested_max_tx_time = struct.unpack( ' bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.36 LE Read P-256 Public Key Command ''' # TODO create key and send hci.HCI_LE_Read_Local_P-256_Public_Key_Complete event - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_add_device_to_resolving_list_command( self, _command: hci.HCI_LE_Add_Device_To_Resolving_List_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.38 LE Add Device To Resolving List Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_clear_resolving_list_command( self, _command: hci.HCI_LE_Clear_Resolving_List_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.40 LE Clear Resolving List Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_read_resolving_list_size_command( self, _command: hci.HCI_LE_Read_Resolving_List_Size_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Resolving_List_Size_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.41 LE Read Resolving List Size Command ''' - return bytes([hci.HCI_SUCCESS, self.resolving_list_size]) + return hci.HCI_LE_Read_Resolving_List_Size_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, + resolving_list_size=self.resolving_list_size, + ) def on_hci_le_set_address_resolution_enable_command( self, command: hci.HCI_LE_Set_Address_Resolution_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set hci.Address Resolution Enable Command ''' - ret = hci.HCI_SUCCESS + ret = hci.HCI_ErrorCode.SUCCESS if command.address_resolution_enable == 1: self.le_address_resolution = True elif command.address_resolution_enable == 0: self.le_address_resolution = False else: - ret = hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR - return bytes([ret]) + ret = hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR + return hci.HCI_StatusReturnParameters(ret) def on_hci_le_set_resolvable_private_address_timeout_command( self, command: hci.HCI_LE_Set_Resolvable_Private_Address_Timeout_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private hci.Address Timeout Command ''' self.le_rpa_timeout = command.rpa_timeout - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_read_maximum_data_length_command( self, _command: hci.HCI_LE_Read_Maximum_Data_Length_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Maximum_Data_Length_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.46 LE Read Maximum Data Length Command ''' - return struct.pack( - ' bytes | None: + ) -> hci.HCI_LE_Read_PHY_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.47 LE Read PHY Command ''' - return struct.pack( - ' bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.48 LE Set Default PHY Command ''' self.default_phy['all_phys'] = command.all_phys self.default_phy['tx_phys'] = command.tx_phys self.default_phy['rx_phys'] = command.rx_phys - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_advertising_set_random_address_command( self, command: hci.HCI_LE_Set_Advertising_Set_Random_Address_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random hci.Address Command @@ -2379,11 +2297,11 @@ def on_hci_le_set_advertising_set_random_address_command( controller=self, handle=handle ) self.advertising_sets[handle].random_address = command.random_address - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_extended_advertising_parameters_command( self, command: hci.HCI_LE_Set_Extended_Advertising_Parameters_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Set_Extended_Advertising_Parameters_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters Command @@ -2395,18 +2313,22 @@ def on_hci_le_set_extended_advertising_parameters_command( ) self.advertising_sets[handle].parameters = command - return bytes([hci.HCI_SUCCESS, 0]) + return hci.HCI_LE_Set_Extended_Advertising_Parameters_ReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS, selected_tx_power=0 + ) def on_hci_le_set_extended_advertising_data_command( self, command: hci.HCI_LE_Set_Extended_Advertising_Data_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data Command ''' handle = command.advertising_handle if not (adv_set := self.advertising_sets.get(handle)): - return bytes([hci.HCI_UNKNOWN_ADVERTISING_IDENTIFIER_ERROR]) + return hci.HCI_StatusReturnParameters( + hci.HCI_ErrorCode.UNKNOWN_ADVERTISING_IDENTIFIER_ERROR + ) if command.operation in ( hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.FIRST_FRAGMENT, @@ -2419,18 +2341,20 @@ def on_hci_le_set_extended_advertising_data_command( ): adv_set.data.extend(command.advertising_data) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_extended_scan_response_data_command( self, command: hci.HCI_LE_Set_Extended_Scan_Response_Data_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data Command ''' handle = command.advertising_handle if not (adv_set := self.advertising_sets.get(handle)): - return bytes([hci.HCI_UNKNOWN_ADVERTISING_IDENTIFIER_ERROR]) + return hci.HCI_StatusReturnParameters( + hci.HCI_ErrorCode.UNKNOWN_ADVERTISING_IDENTIFIER_ERROR + ) if command.operation in ( hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.FIRST_FRAGMENT, @@ -2443,11 +2367,11 @@ def on_hci_le_set_extended_scan_response_data_command( ): adv_set.scan_response_data.extend(command.scan_response_data) - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_extended_advertising_enable_command( self, command: hci.HCI_LE_Set_Extended_Advertising_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable Command @@ -2464,86 +2388,92 @@ def on_hci_le_set_extended_advertising_enable_command( for handle in command.advertising_handles: if advertising_set := self.advertising_sets.get(handle): advertising_set.stop() - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_remove_advertising_set_command( self, command: hci.HCI_LE_Remove_Advertising_Set_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.59 LE Remove Advertising Set Command ''' handle = command.advertising_handle if advertising_set := self.advertising_sets.pop(handle, None): advertising_set.stop() - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_clear_advertising_sets_command( self, _command: hci.HCI_LE_Clear_Advertising_Sets_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.60 LE Clear Advertising Sets Command ''' for advertising_set in self.advertising_sets.values(): advertising_set.stop() self.advertising_sets.clear() - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_read_maximum_advertising_data_length_command( self, _command: hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Maximum_Advertising_Data_Length_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data Length Command ''' - return struct.pack(' bytes | None: + ) -> hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.58 LE Read Number of Supported Advertising Set Command ''' - return struct.pack(' bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_periodic_advertising_data_command( self, _command: hci.HCI_LE_Set_Periodic_Advertising_Data_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_set_periodic_advertising_enable_command( self, _command: hci.HCI_LE_Set_Periodic_Advertising_Enable_Command - ) -> bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable Command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) def on_hci_le_read_transmit_power_command( self, _command: hci.HCI_LE_Read_Transmit_Power_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Read_Transmit_Power_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command ''' - return struct.pack(' bytes | None: + ) -> hci.HCI_LE_Set_CIG_Parameters_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.97 LE Set CIG Parameter Command ''' @@ -2563,13 +2493,15 @@ def on_hci_le_set_cig_parameters_command( cig_id=command.cig_id, handle=handle, ) - return struct.pack( - ' bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.8.99 LE Create CIS Command ''' @@ -2581,11 +2513,17 @@ def on_hci_le_create_cis_command( ): if not (connection := self.find_connection_by_handle(acl_handle)): logger.error(f'Cannot find connection with handle={acl_handle}') - return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + self._send_hci_command_status( + hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, command.op_code + ) + return if not (cis_link := self.central_cis_links.get(cis_handle)): logger.error(f'Cannot find CIS with handle={cis_handle}') - return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + self._send_hci_command_status( + hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, command.op_code + ) + return cis_link.acl_connection = connection @@ -2593,35 +2531,28 @@ def on_hci_le_create_cis_command( ll.CisReq(cig_id=cis_link.cig_id, cis_id=cis_link.cis_id) ) - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) - return None + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) def on_hci_le_remove_cig_command( self, command: hci.HCI_LE_Remove_CIG_Command - ) -> bytes | None: + ) -> hci.HCI_LE_Remove_CIG_ReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.100 LE Remove CIG Command ''' - status = hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR + status = hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR cis_links = list(self.central_cis_links.items()) for cis_handle, cis_link in cis_links: if cis_link.cig_id == command.cig_id: self.central_cis_links.pop(cis_handle) - status = hci.HCI_SUCCESS + status = hci.HCI_ErrorCode.SUCCESS - return struct.pack(' bytes | None: + ) -> None: ''' See Bluetooth spec Vol 4, Part E - 7.8.101 LE Accept CIS Request Command ''' @@ -2632,53 +2563,48 @@ def on_hci_le_accept_cis_request_command( pending_cis_link := self.peripheral_cis_links.get(command.connection_handle) ): logger.error(f'Cannot find CIS with handle={command.connection_handle}') - return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + self._send_hci_command_status( + hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, command.op_code + ) + return assert pending_cis_link.acl_connection pending_cis_link.acl_connection.send_ll_control_pdu( ll.CisRsp(cig_id=pending_cis_link.cig_id, cis_id=pending_cis_link.cis_id), ) - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_COMMAND_STATUS_PENDING, - num_hci_command_packets=1, - command_opcode=command.op_code, - ) - ) + self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code) return None def on_hci_le_setup_iso_data_path_command( self, command: hci.HCI_LE_Setup_ISO_Data_Path_Command - ) -> bytes | None: + ) -> hci.HCI_StatusAndConnectionHandleReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command ''' if not (iso_link := self.find_iso_link_by_handle(command.connection_handle)): - return struct.pack( - ' bytes | None: + ) -> hci.HCI_StatusAndConnectionHandleReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.110 LE Remove ISO Data Path Command ''' if not (iso_link := self.find_iso_link_by_handle(command.connection_handle)): - return struct.pack( - ' bytes | None: + ) -> hci.HCI_StatusReturnParameters: ''' See Bluetooth spec Vol 4, Part E - 7.8.115 LE Set Host Feature command ''' - return bytes([hci.HCI_SUCCESS]) + return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) diff --git a/bumble/hci.py b/bumble/hci.py index 06671adf..0484cf2e 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -4715,7 +4715,7 @@ class HCI_LE_Clear_Resolving_List_Command(HCI_SyncCommand[HCI_StatusReturnParame # ----------------------------------------------------------------------------- @dataclasses.dataclass class HCI_LE_Read_Resolving_List_Size_ReturnParameters(HCI_StatusReturnParameters): - resolving_list_size: bytes = field(metadata=metadata(1)) + resolving_list_size: int = field(metadata=metadata(1)) @HCI_SyncCommand.sync_command(HCI_LE_Read_Resolving_List_Size_ReturnParameters)