Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 24 additions & 49 deletions src/nethsec/conntrack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,6 @@
from xml.etree import ElementTree
from xml.etree.ElementTree import Element

def __parse_meta_connection_tag(meta: Element) -> dict:
"""
From a meta tag, extract the connection information.

Args:
- meta: ElementTree.Element with the meta tag.

Returns:
dictionary with the connection information.
"""
result = {'src': '', 'dest': '', 'protocol': '', 'packets': '0', 'bytes': '0'}
layer3 = meta.find('layer3')
result['src'] = layer3.find('src').text
result['dest'] = layer3.find('dst').text
layer4 = meta.find('layer4')
result['protocol'] = layer4.get('protoname')
# start port and end port might not be present for some protocols, like ICMP.
if layer4.find('sport') is not None:
result['start_port'] = layer4.find('sport').text
if layer4.find('dport') is not None:
result['end_port'] = layer4.find('dport').text
counters = meta.find('counters')
if counters is not None:
result['packets'] = counters.find('packets').text
result['bytes'] = counters.find('bytes').text
return result


def __parse_connection_info(flow: Element) -> dict:
"""
Expand All @@ -54,35 +27,40 @@ def __parse_connection_info(flow: Element) -> dict:
result = {}
# expand meta tags
for child in flow.findall('meta'):
# parse the meta tag using __parse_meta_connection_tag function
if child.get('direction') == 'original' or child.get('direction') == 'reply':
connection_info = __parse_meta_connection_tag(child)
if child.get('direction') == 'original':
result['source'] = connection_info['src']
result['destination'] = connection_info['dest']
result['protocol'] = connection_info['protocol']
if 'start_port' in connection_info:
result['source_port'] = connection_info['start_port']
if 'end_port' in connection_info:
result['destination_port'] = connection_info['end_port']
if child.get('direction') == 'original':
layer3 = child.find('layer3')
result['source'] = layer3.find('src').text
result['destination'] = layer3.find('dst').text
layer4 = child.find('layer4')
result['protocol'] = layer4.get('protoname')
if layer4.find('sport') is not None:
result['source_port'] = layer4.find('sport').text
if layer4.find('dport') is not None:
result['destination_port'] = layer4.find('dport').text
counters = child.find('counters')
if counters is not None:
result['source_stats'] = {
'packets': connection_info['packets'],
'bytes': connection_info['bytes']
'packets': int(counters.find('packets').text),
'bytes': int(counters.find('bytes').text)
}
else:
if child.get('direction') == 'reply':
counters = child.find('counters')
if counters is not None:
result['destination_stats'] = {
'packets': connection_info['packets'],
'bytes': connection_info['bytes']
'packets': int(counters.find('packets').text),
'bytes': int(counters.find('bytes').text)
}
# not easily parsable, just add the values
else:
if child.get('direction') == 'independent':
result['id'] = child.find('id').text
if child.find('timeout') is not None:
result['timeout'] = child.find('timeout').text
if child.find('unreplied') is not None:
result['unreplied'] = True
if child.find('state') is not None:
result['state'] = child.find('state').text
result['labels'] = []
for label in child.findall('labels/label'):
result['labels'].append(label.text)

return result

Expand All @@ -94,13 +72,10 @@ def list_connections():
Returns:
dict of applications and their connections.
"""
result = subprocess.run(["conntrack", "-L", "-o", "xml"], capture_output=True, text=True)
result = subprocess.run(["conntrack", "-L", "-o", "labels,xml"], capture_output=True, text=True)
root = ElementTree.fromstring(result.stdout)
result = []
for flow in root.findall('flow'):
# download
# upload
# wan
result.append(__parse_connection_info(flow))

return result
Expand Down
44 changes: 9 additions & 35 deletions tests/test_conntrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,47 +261,21 @@
"""


def test_parse_meta_tag():
# get first meta tag in first flow
meta_tag = ElementTree.fromstring(conntrack_response).find('flow/meta')
result = conntrack.__parse_meta_connection_tag(meta_tag)
assert result['src'] == '192.168.122.234'
assert result['dest'] == '31.14.133.122'
assert result['protocol'] == 'udp'
assert result['start_port'] == '41692'
assert result['end_port'] == '123'
assert result['packets'] == '1'
assert result['bytes'] == '76'


def test_parse_meta_tag_without_ports():
# get the meta tag with the ICMP protocol
meta_tag = ElementTree.fromstring(conntrack_response).findall('flow')[1][1]
result = conntrack.__parse_meta_connection_tag(meta_tag)
assert result['src'] == '192.168.122.155'
assert result['dest'] == '192.168.122.1'
assert result['protocol'] == 'icmp'
assert 'start_port' not in result
assert 'end_port' not in result
assert result['packets'] == '2'
assert result['bytes'] == '168'


def test_connection_info():
# get the first flow
flow = ElementTree.fromstring(conntrack_response).findall('flow')[0]
result = conntrack.__parse_connection_info(flow)
original = flow[0]
reply = flow[1]
assert result['source'] == conntrack.__parse_meta_connection_tag(original)['src']
assert result['destination'] == conntrack.__parse_meta_connection_tag(original)['dest']
assert result['protocol'] == conntrack.__parse_meta_connection_tag(original)['protocol']
assert result['source_port'] == conntrack.__parse_meta_connection_tag(original)['start_port']
assert result['destination_port'] == conntrack.__parse_meta_connection_tag(original)['end_port']
assert result['source_stats']['packets'] == conntrack.__parse_meta_connection_tag(original)['packets']
assert result['source_stats']['bytes'] == conntrack.__parse_meta_connection_tag(original)['bytes']
assert result['destination_stats']['packets'] == conntrack.__parse_meta_connection_tag(reply)['packets']
assert result['destination_stats']['bytes'] == conntrack.__parse_meta_connection_tag(reply)['bytes']
assert result['source'] == '192.168.122.234'
assert result['destination'] == '31.14.133.122'
assert result['protocol'] == 'udp'
assert result['source_port'] == '41692'
assert result['destination_port'] == '123'
assert result['source_stats']['packets'] == 1
assert result['source_stats']['bytes'] == 76
assert result['destination_stats']['packets'] == 1
assert result['destination_stats']['bytes'] == 76
assert result['timeout'] == '47'
assert result['id'] == '1905826093'
flow = ElementTree.fromstring(conntrack_response).findall('flow')[1]
Expand Down