diff --git a/src/nethsec/conntrack/__init__.py b/src/nethsec/conntrack/__init__.py index b55018f7..9baabd68 100644 --- a/src/nethsec/conntrack/__init__.py +++ b/src/nethsec/conntrack/__init__.py @@ -13,33 +13,6 @@ from xml.etree import ElementTree from xml.etree.ElementTree import Element -def __parse_meta_connection_tag(meta: Element) -> dict: - """ - From a meta tag, extract the connection information. - - Args: - - meta: ElementTree.Element with the meta tag. - - Returns: - dictionary with the connection information. - """ - result = {'src': '', 'dest': '', 'protocol': '', 'packets': '0', 'bytes': '0'} - layer3 = meta.find('layer3') - result['src'] = layer3.find('src').text - result['dest'] = layer3.find('dst').text - layer4 = meta.find('layer4') - result['protocol'] = layer4.get('protoname') - # start port and end port might not be present for some protocols, like ICMP. - if layer4.find('sport') is not None: - result['start_port'] = layer4.find('sport').text - if layer4.find('dport') is not None: - result['end_port'] = layer4.find('dport').text - counters = meta.find('counters') - if counters is not None: - result['packets'] = counters.find('packets').text - result['bytes'] = counters.find('bytes').text - return result - def __parse_connection_info(flow: Element) -> dict: """ @@ -54,28 +27,30 @@ def __parse_connection_info(flow: Element) -> dict: result = {} # expand meta tags for child in flow.findall('meta'): - # parse the meta tag using __parse_meta_connection_tag function - if child.get('direction') == 'original' or child.get('direction') == 'reply': - connection_info = __parse_meta_connection_tag(child) - if child.get('direction') == 'original': - result['source'] = connection_info['src'] - result['destination'] = connection_info['dest'] - result['protocol'] = connection_info['protocol'] - if 'start_port' in connection_info: - result['source_port'] = connection_info['start_port'] - if 'end_port' in connection_info: - result['destination_port'] = connection_info['end_port'] + if child.get('direction') == 'original': + layer3 = child.find('layer3') + result['source'] = layer3.find('src').text + result['destination'] = layer3.find('dst').text + layer4 = child.find('layer4') + result['protocol'] = layer4.get('protoname') + if layer4.find('sport') is not None: + result['source_port'] = layer4.find('sport').text + if layer4.find('dport') is not None: + result['destination_port'] = layer4.find('dport').text + counters = child.find('counters') + if counters is not None: result['source_stats'] = { - 'packets': connection_info['packets'], - 'bytes': connection_info['bytes'] + 'packets': int(counters.find('packets').text), + 'bytes': int(counters.find('bytes').text) } - else: + if child.get('direction') == 'reply': + counters = child.find('counters') + if counters is not None: result['destination_stats'] = { - 'packets': connection_info['packets'], - 'bytes': connection_info['bytes'] + 'packets': int(counters.find('packets').text), + 'bytes': int(counters.find('bytes').text) } - # not easily parsable, just add the values - else: + if child.get('direction') == 'independent': result['id'] = child.find('id').text if child.find('timeout') is not None: result['timeout'] = child.find('timeout').text @@ -83,6 +58,9 @@ def __parse_connection_info(flow: Element) -> dict: result['unreplied'] = True if child.find('state') is not None: result['state'] = child.find('state').text + result['labels'] = [] + for label in child.findall('labels/label'): + result['labels'].append(label.text) return result @@ -94,13 +72,10 @@ def list_connections(): Returns: dict of applications and their connections. """ - result = subprocess.run(["conntrack", "-L", "-o", "xml"], capture_output=True, text=True) + result = subprocess.run(["conntrack", "-L", "-o", "labels,xml"], capture_output=True, text=True) root = ElementTree.fromstring(result.stdout) result = [] for flow in root.findall('flow'): - # download - # upload - # wan result.append(__parse_connection_info(flow)) return result diff --git a/tests/test_conntrack.py b/tests/test_conntrack.py index 58c5b106..e9138f19 100644 --- a/tests/test_conntrack.py +++ b/tests/test_conntrack.py @@ -261,47 +261,21 @@ """ -def test_parse_meta_tag(): - # get first meta tag in first flow - meta_tag = ElementTree.fromstring(conntrack_response).find('flow/meta') - result = conntrack.__parse_meta_connection_tag(meta_tag) - assert result['src'] == '192.168.122.234' - assert result['dest'] == '31.14.133.122' - assert result['protocol'] == 'udp' - assert result['start_port'] == '41692' - assert result['end_port'] == '123' - assert result['packets'] == '1' - assert result['bytes'] == '76' - - -def test_parse_meta_tag_without_ports(): - # get the meta tag with the ICMP protocol - meta_tag = ElementTree.fromstring(conntrack_response).findall('flow')[1][1] - result = conntrack.__parse_meta_connection_tag(meta_tag) - assert result['src'] == '192.168.122.155' - assert result['dest'] == '192.168.122.1' - assert result['protocol'] == 'icmp' - assert 'start_port' not in result - assert 'end_port' not in result - assert result['packets'] == '2' - assert result['bytes'] == '168' - - def test_connection_info(): # get the first flow flow = ElementTree.fromstring(conntrack_response).findall('flow')[0] result = conntrack.__parse_connection_info(flow) original = flow[0] reply = flow[1] - assert result['source'] == conntrack.__parse_meta_connection_tag(original)['src'] - assert result['destination'] == conntrack.__parse_meta_connection_tag(original)['dest'] - assert result['protocol'] == conntrack.__parse_meta_connection_tag(original)['protocol'] - assert result['source_port'] == conntrack.__parse_meta_connection_tag(original)['start_port'] - assert result['destination_port'] == conntrack.__parse_meta_connection_tag(original)['end_port'] - assert result['source_stats']['packets'] == conntrack.__parse_meta_connection_tag(original)['packets'] - assert result['source_stats']['bytes'] == conntrack.__parse_meta_connection_tag(original)['bytes'] - assert result['destination_stats']['packets'] == conntrack.__parse_meta_connection_tag(reply)['packets'] - assert result['destination_stats']['bytes'] == conntrack.__parse_meta_connection_tag(reply)['bytes'] + assert result['source'] == '192.168.122.234' + assert result['destination'] == '31.14.133.122' + assert result['protocol'] == 'udp' + assert result['source_port'] == '41692' + assert result['destination_port'] == '123' + assert result['source_stats']['packets'] == 1 + assert result['source_stats']['bytes'] == 76 + assert result['destination_stats']['packets'] == 1 + assert result['destination_stats']['bytes'] == 76 assert result['timeout'] == '47' assert result['id'] == '1905826093' flow = ElementTree.fromstring(conntrack_response).findall('flow')[1]