Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 183 additions & 89 deletions volatility3/framework/plugins/windows/strings.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,83 @@
# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
from __future__ import annotations

import logging
import re
from typing import Dict, Generator, List, Set, Tuple, Optional
from dataclasses import dataclass
from typing import Generator

from volatility3.framework import interfaces, renderers, exceptions, constants
from volatility3.framework import constants, exceptions, interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import intel, resources, linear
from volatility3.framework.layers import intel, linear, resources
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows import pslist

vollog = logging.getLogger(__name__)


@dataclass
class MappingNode:
physical_addr_start: int
physical_addr_end: int
virtual_addr_start: int
virtual_addr_end: int
process_id: int | str
region: str


@dataclass
class MappingTree:
root: MappingNode | None = None
left: MappingTree | None = None
right: MappingTree | None = None

def add(self, node: MappingNode, depth: int = 0) -> None:
# Iteratively add to avoid recursion issues
if not isinstance(node, MappingNode):
raise TypeError
parent_node: MappingTree | None = self
while parent_node is not None:
if parent_node.root is None:
parent_node.root = node
parent_node = None
elif node.physical_addr_start < parent_node.root.physical_addr_start:
if parent_node.left is None:
parent_node.left = MappingTree(node)
parent_node = None
else:
parent_node = parent_node.left
else:
if parent_node.right is None:
parent_node.right = MappingTree(node)
parent_node = None
else:
parent_node = parent_node.right

def at(self, point):
if self.root:
if self.root.physical_addr_start <= point <= self.root.physical_addr_end:
yield self.root
if point < self.root.physical_addr_start and self.left:
yield from self.left.at(point)
elif self.right:
yield from self.right.at(point)


class Strings(interfaces.plugins.PluginInterface):
"""Reads output from the strings command and indicates which process(es) each string belongs to."""

_required_framework_version = (2, 0, 0)

# 2.0.0 - change signature of `generate_mapping`
_version = (2, 0, 0)
# 3.0.0 - Interval mapping
_version = (3, 0, 0)

strings_pattern = re.compile(rb"^(?:\W*)([0-9]+)(?:\W*)(\w[\w\W]+)\n?")

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
def get_requirements(cls) -> list[interfaces.configuration.RequirementInterface]:
return [
requirements.ModuleRequirement(
name="kernel",
Expand All @@ -46,21 +97,27 @@
name="strings_file", description="Strings file"
),
]
# TODO: Make URLRequirement that can accept a file address which the framework can open

def run(self):
return renderers.TreeGrid(
[("String", str), ("Physical Address", format_hints.Hex), ("Result", str)],
[
("String", str),
("Region", str),
("PID", int),
("Physical Address", format_hints.Hex),
("Virtual Address", format_hints.Hex),
],
self._generator(),
)

def _generator(self) -> Generator[Tuple, None, None]:
def _generator(self) -> Generator[tuple, None, None]:
"""Generates results from a strings file."""
string_list: List[Tuple[int, bytes]] = []
string_list: list[tuple[int, bytes]] = []

# Test strings file format is accurate
accessor = resources.ResourceAccessor()
strings_fp = accessor.open(self.config["strings_file"], "rb")
strings_fp = resources.ResourceAccessor().open(
self.config["strings_file"], "rb"
)
Comment on lines +118 to +120

Check warning

Code scanning / CodeQL

File is not always closed Warning

File is opened but is not closed.

Copilot Autofix

AI 24 days ago

To fix this problem, we should ensure that the opened file is always closed, even if exceptions are raised during file processing. The best way to do this in Python is to use a with statement, which guarantees that the file is closed when the block is exited. Specifically, the assignment to strings_fp and all usages of strings_fp should be indented under a with block, replacing:

strings_fp = resources.ResourceAccessor().open(self.config["strings_file"], "rb")
line = strings_fp.readline()
...
line = strings_fp.readline()

with

with resources.ResourceAccessor().open(self.config["strings_file"], "rb") as strings_fp:
    line = strings_fp.readline()
    ...
    line = strings_fp.readline()

No external libraries are necessary for this fix. The only modification is to wrap the code that uses the file handle in a with block, ensuring the file is always closed.

Suggested changeset 1
volatility3/framework/plugins/windows/strings.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/volatility3/framework/plugins/windows/strings.py b/volatility3/framework/plugins/windows/strings.py
--- a/volatility3/framework/plugins/windows/strings.py
+++ b/volatility3/framework/plugins/windows/strings.py
@@ -115,19 +115,19 @@
         string_list: list[tuple[int, bytes]] = []
 
         # Test strings file format is accurate
-        strings_fp = resources.ResourceAccessor().open(
+        with resources.ResourceAccessor().open(
             self.config["strings_file"], "rb"
-        )
-        line = strings_fp.readline()
-        count: float = 0
-        while line:
-            count += 1
-            try:
-                offset, string = self._parse_line(line)
-                string_list.append((offset, string))
-            except ValueError:
-                vollog.error(f"Line in unrecognized format: line {count}")
+        ) as strings_fp:
             line = strings_fp.readline()
+            count: float = 0
+            while line:
+                count += 1
+                try:
+                    offset, string = self._parse_line(line)
+                    string_list.append((offset, string))
+                except ValueError:
+                    vollog.error(f"Line in unrecognized format: line {count}")
+                line = strings_fp.readline()
         kernel = self.context.modules[self.config["kernel"]]
 
         revmap_tree = self.generate_mapping(
EOF
@@ -115,19 +115,19 @@
string_list: list[tuple[int, bytes]] = []

# Test strings file format is accurate
strings_fp = resources.ResourceAccessor().open(
with resources.ResourceAccessor().open(
self.config["strings_file"], "rb"
)
line = strings_fp.readline()
count: float = 0
while line:
count += 1
try:
offset, string = self._parse_line(line)
string_list.append((offset, string))
except ValueError:
vollog.error(f"Line in unrecognized format: line {count}")
) as strings_fp:
line = strings_fp.readline()
count: float = 0
while line:
count += 1
try:
offset, string = self._parse_line(line)
string_list.append((offset, string))
except ValueError:
vollog.error(f"Line in unrecognized format: line {count}")
line = strings_fp.readline()
kernel = self.context.modules[self.config["kernel"]]

revmap_tree = self.generate_mapping(
Copilot is powered by AI and may make mistakes. Always verify output.
line = strings_fp.readline()
count: float = 0
while line:
Expand All @@ -71,39 +128,54 @@
except ValueError:
vollog.error(f"Line in unrecognized format: line {count}")
line = strings_fp.readline()
kernel = self.context.modules[self.config["kernel"]]

revmap = self.generate_mapping(
context=self.context,
kernel_module_name=self.config["kernel"],
revmap_tree = self.generate_mapping(
self.context,
kernel.layer_name,
kernel.symbol_table_name,
progress_callback=self._progress_callback,
pid_list=self.config["pid"],
)

last_prog: float = 0
_last_prog: float = 0
line_count: float = 0
num_strings = len(string_list)
for offset, string in string_list:
_num_strings = len(string_list)

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable _num_strings is not used.

Copilot Autofix

AI 24 days ago

The best way to fix the problem is to remove the unused assignment to _num_strings. This can be achieved by deleting the line that assigns len(string_list) to _num_strings (line 143). This change is safe because the result of len(string_list) is not used and the call to len() has no side effect, so nothing in the function's behavior is altered. No additional imports, methods, or other code changes are required.

Suggested changeset 1
volatility3/framework/plugins/windows/strings.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/volatility3/framework/plugins/windows/strings.py b/volatility3/framework/plugins/windows/strings.py
--- a/volatility3/framework/plugins/windows/strings.py
+++ b/volatility3/framework/plugins/windows/strings.py
@@ -140,7 +140,6 @@
 
         _last_prog: float = 0
         line_count: float = 0
-        _num_strings = len(string_list)
 
         for phys_offset, string in string_list:
             line_count += 1
EOF
@@ -140,7 +140,6 @@

_last_prog: float = 0
line_count: float = 0
_num_strings = len(string_list)

for phys_offset, string in string_list:
line_count += 1
Copilot is powered by AI and may make mistakes. Always verify output.

for phys_offset, string in string_list:
line_count += 1
try:
revmap_list = [
name + ":" + hex(offset) for (name, offset) in revmap[offset >> 12]
]
except (IndexError, KeyError):
revmap_list = ["FREE MEMORY"]
yield (
0,
(
str(string, "latin-1"),
format_hints.Hex(offset),
", ".join(revmap_list),
),
)
prog = line_count / num_strings * 100
if round(prog, 1) > last_prog:
last_prog = round(prog, 1)
self._progress_callback(prog, "Matching strings in memory")

def _parse_line(self, line: bytes) -> Tuple[int, bytes]:

matched_region = False
for node in revmap_tree.at(phys_offset):
matched_region = True

region_offset = phys_offset - node.physical_addr_start
offset = node.virtual_addr_start + region_offset
yield (
0,
(
str(string.strip(), "latin-1"),
node.region,
node.process_id,
format_hints.Hex(phys_offset),
format_hints.Hex(offset),
),
)

if not matched_region:
# no maps found for this offset
yield (
0,
(
str(string.strip(), "latin-1"),
"Unallocated",
-1,
format_hints.Hex(phys_offset),
format_hints.Hex(0x00),
),
)

def _parse_line(self, line: bytes) -> tuple[int, bytes]:
"""Parses a single line from a strings file.
Args:
Expand All @@ -123,77 +195,99 @@
def generate_mapping(
cls,
context: interfaces.context.ContextInterface,
kernel_module_name: str,
layer_name: str,
symbol_table: str,
progress_callback: constants.ProgressCallback = None,
pid_list: Optional[List[int]] = None,
) -> Dict[int, Set[Tuple[str, int]]]:
pid_list: list[int] | None = None,
) -> MappingTree:
"""Creates a reverse mapping between virtual addresses and physical
addresses.
Args:
context: the context for the method to run against
kernel_module_name: the name of the module forthe kernel
layer_name: the name of the windows intel layer to be scanned
symbol_table: the name of the kernel symbol table
progress_callback: an optional callable to display progress
pid_list: a lit of process IDs to consider when generating the reverse map
Returns:
A mapping of virtual offsets to strings and physical offsets
"""
filter = pslist.PsList.create_pid_filter(pid_list)
revmap_tree = MappingTree()

kernel = context.modules[kernel_module_name]

layer = context.layers[kernel.layer_name]
reverse_map: Dict[int, Set[Tuple[str, int]]] = dict()
# start with kernel mappings
layer: intel.Intel = context.layers[layer_name]
min_kernel_addr = 2 ** (layer._maxvirtaddr - 1)
if isinstance(layer, intel.Intel):
# We don't care about errors, we just wanted chunks that map correctly
for mapval in layer.mapping(0x0, layer.maximum_address, ignore_errors=True):
offset, _, mapped_offset, mapped_size, maplayer = mapval
for val in range(mapped_offset, mapped_offset + mapped_size, 0x1000):
cur_set = reverse_map.get(val >> 12, set())
cur_set.add(("kernel", offset))
reverse_map[val >> 12] = cur_set
for mapval in layer.mapping(
min_kernel_addr, layer.maximum_address, ignore_errors=True
):
(
virt_offset,
virt_size,
phy_offset,
phy_mapping_size,
_phy_layer_name,
) = mapval

node = MappingNode(
phy_offset,
phy_offset + phy_mapping_size,
virt_offset,
virt_offset + virt_size,
-1,
"Kernel",
)
revmap_tree.add(node)

if progress_callback:
progress_callback(
(offset * 100) / layer.maximum_address,
"Creating reverse kernel map",
(virt_offset * 100) / layer.maximum_address,
f"Creating custom tree mapping for kernel at offset : {virt_offset:x}",
)

# TODO: Include kernel modules
# now process normal processes, ignoring kernel addrs
for process in pslist.PsList.list_processes(context, layer_name, symbol_table):
if not filter(process):
proc_id = "Unknown"
try:
proc_id = process.UniqueProcessId
proc_layer_name = process.add_process_layer()
except exceptions.InvalidAddressException as excp:
vollog.debug(
f"Process {proc_id}: invalid address {excp.invalid_address} in layer {excp.layer_name}"
)
continue

for process in pslist.PsList.list_processes(
context=context, kernel_module_name=kernel_module_name
):
if not filter(process):
proc_id = "Unknown"
try:
proc_id = process.UniqueProcessId
proc_layer_name = process.add_process_layer()
except exceptions.InvalidAddressException as excp:
vollog.debug(
f"Process {proc_id}: invalid address {excp.invalid_address} in layer {excp.layer_name}"
proc_layer: intel.Intel = context.layers[proc_layer_name]
max_proc_addr = (2 ** (proc_layer._maxvirtaddr - 1)) - 1
if isinstance(proc_layer, linear.LinearlyMappedLayer):
for mapval in proc_layer.mapping(
0, max_proc_addr, ignore_errors=True
):
(
virt_offset,
virt_size,
phy_offset,
phy_mapping_size,
_phy_layer_name,
) = mapval

node = MappingNode(
phy_offset,
phy_offset + phy_mapping_size,
virt_offset,
virt_offset + virt_size,
process_id=proc_id,
region="Process",
)
continue

proc_layer = context.layers[proc_layer_name]
if isinstance(proc_layer, linear.LinearlyMappedLayer):
for mapval in proc_layer.mapping(
0x0, proc_layer.maximum_address, ignore_errors=True
):
mapped_offset, _, offset, mapped_size, _maplayer = mapval
for val in range(
mapped_offset, mapped_offset + mapped_size, 0x1000
):
cur_set = reverse_map.get(mapped_offset >> 12, set())
cur_set.add(
(f"Process {process.UniqueProcessId}", offset)
)
reverse_map[mapped_offset >> 12] = cur_set
# FIXME: make the progress for all processes, rather than per-process
if progress_callback:
progress_callback(
(offset * 100) / layer.maximum_address,
f"Creating mapping for task {process.UniqueProcessId}",
)

return reverse_map
revmap_tree.add(node)

if progress_callback:
progress_callback(
(virt_offset * 100) / max_proc_addr,
f"Creating custom tree mapping for task {proc_id}: {virt_offset:x}",
)
return revmap_tree
Loading