Skip to content

Commit 1c9be97

Browse files
committed
parser: lazy streaming to extract body and conditional extraction
Phase 2 of a memory usage optimization: the extract_parts_ranges/2 function first identifies part boundaries as lightweight {offset, size} tuples before extracting any content. Parts no larger than a configurable threshold (`:max_part_size`, default 10MB) are extracted and parsed; when `:skip_large_parts?` is enabled, larger parts are replaced with placeholder messages instead of being extracted.
1 parent 3728d22 commit 1c9be97

File tree

2 files changed

+182
-35
lines changed

2 files changed

+182
-35
lines changed

lib/mail/parsers/rfc_2822.ex

Lines changed: 135 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ defmodule Mail.Parsers.RFC2822 do
4545
4646
* `:charset_handler` - A function that takes a charset and binary and returns a binary. Defaults to return the string as is.
4747
* `:header_only` - Whether to parse only the headers. Defaults to false.
48+
* `:max_part_size` - The maximum size of a part in bytes. Defaults to 10MB (10_000_000 bytes). Only enforced when `:skip_large_parts?` is true.
49+
* `:skip_large_parts?` - Whether to skip parts larger than `:max_part_size`; skipped parts are replaced with a placeholder message instead of being parsed. Defaults to false.
4850
4951
"""
5052
@spec parse(binary() | nonempty_maybe_improper_list(), keyword()) :: Mail.Message.t()
@@ -714,65 +716,163 @@ defmodule Mail.Parsers.RFC2822 do
714716
defp remove_excess_whitespace(<<char::utf8, rest::binary>>),
715717
do: <<char::utf8, remove_excess_whitespace(rest)::binary>>
716718

717-
defp parse_body(%Mail.Message{multipart: true} = message, lines, opts) do
719+
defp parse_body_binary(%Mail.Message{multipart: true} = message, body_content, opts) do
718720
content_type = message.headers["content-type"]
719721
boundary = Mail.Proplist.get(content_type, "boundary")
722+
part_ranges = extract_parts_ranges(body_content, boundary)
720723

721-
parts =
722-
lines
723-
|> extract_parts(boundary)
724-
|> Enum.map(fn part ->
725-
parse(part, opts)
724+
size_threshold = Keyword.get(opts, :max_part_size, 10_000_000)
725+
skip_large_parts? = Keyword.get(opts, :skip_large_parts?, false)
726+
727+
parsed_parts =
728+
part_ranges
729+
|> Enum.map(fn {start, size} ->
730+
if skip_large_parts? and size > size_threshold do
731+
# Don't extract or parse large parts: return placeholder
732+
%Mail.Message{body: "[Part skipped: #{size} bytes - too large to parse]"}
733+
else
734+
String.trim_trailing(binary_part(body_content, start, size), "\r\n")
735+
|> parse(opts)
736+
end
726737
end)
727738

728-
case parts do
729-
[] -> parse_body(Map.put(message, :multipart, false), lines, opts)
730-
_ -> Map.put(message, :parts, parts)
739+
case parsed_parts do
740+
[] -> parse_body_binary(Map.put(message, :multipart, false), body_content, opts)
741+
_ -> Map.put(message, :parts, parsed_parts)
731742
end
732743
end
733744

734-
defp parse_body(%Mail.Message{} = message, [], _opts) do
735-
message
745+
# Empty body for non-multipart - set to empty string
746+
defp parse_body_binary(%Mail.Message{multipart: false} = message, "", _opts) do
747+
Map.put(message, :body, "")
736748
end
737749

738-
defp parse_body(%Mail.Message{} = message, lines, opts) do
739-
decoded =
740-
lines
741-
|> join_body()
742-
|> decode(message, opts)
750+
# Empty body for multipart (shouldn't happen normally, but leave as nil)
751+
defp parse_body_binary(%Mail.Message{} = message, "", _opts) do
752+
message
753+
end
743754

755+
# Simple (non-multipart) body
756+
defp parse_body_binary(%Mail.Message{} = message, body_content, opts) do
757+
# Normalize line endings to CRLF without splitting the body into a list of lines
758+
normalized_body = String.replace(body_content, ~r/\r?\n/, "\r\n")
759+
decoded = decode(normalized_body, message, opts)
744760
Map.put(message, :body, decoded)
745761
end
746762

747-
defp join_body(lines, acc \\ [])
748-
defp join_body([], acc), do: acc |> Enum.reverse() |> Enum.join("\r\n")
749-
defp join_body([""], acc), do: acc |> Enum.reverse() |> Enum.join("\r\n")
750-
defp join_body([head | tail], acc), do: join_body(tail, [head | acc])
763+
defp extract_parts_ranges(content, boundary) do
764+
start_boundary = "--" <> boundary
765+
end_boundary = "--" <> boundary <> "--"
766+
767+
# Stream through content tracking byte offsets for boundaries
768+
content
769+
|> Stream.unfold(fn remaining ->
770+
case remaining |> :binary.split("\n") do
771+
[""] -> nil
772+
[line, rest] -> {{line, byte_size(line) + 1}, rest}
773+
[line] -> {{line, byte_size(line)}, ""}
774+
end
775+
end)
776+
|> Stream.map(fn {line, size} -> {String.trim_trailing(line), size} end)
777+
|> Enum.reduce_while({[], 0, nil, nil}, fn {line, line_size}, {_, offset, _, _} = acc ->
778+
new_offset = offset + line_size
779+
accumulate_part_range({line, new_offset}, acc, start_boundary, end_boundary)
780+
end)
781+
|> extract_final_part_range(content)
782+
|> Enum.reverse()
783+
end
784+
785+
# End boundary found, but no part was started
786+
defp accumulate_part_range(
787+
{line, new_offset},
788+
{ranges, _offset, _state, nil},
789+
_start_boundary,
790+
end_boundary
791+
)
792+
when line == end_boundary do
793+
{:halt, {ranges, new_offset, :done, nil}}
794+
end
751795

752-
defp extract_parts(lines, boundary, acc \\ [], parts \\ nil)
796+
# End boundary found: append new [part_start -> offset] range
797+
defp accumulate_part_range(
798+
{line, new_offset},
799+
{ranges, offset, _state, part_start},
800+
_start_boundary,
801+
end_boundary
802+
)
803+
when line == end_boundary do
804+
ranges = [{part_start, offset - part_start} | ranges]
805+
{:halt, {ranges, new_offset, :done, nil}}
806+
end
753807

754-
defp extract_parts([], _boundary, _acc, parts),
755-
do: Enum.reverse(List.wrap(parts))
808+
# Start boundary found: first boundary
809+
defp accumulate_part_range(
810+
{line, new_offset},
811+
{ranges, _offset, nil, _part_start},
812+
start_boundary,
813+
_end_boundary
814+
)
815+
when line == start_boundary do
816+
{:cont, {ranges, new_offset, :collecting, new_offset}}
817+
end
756818

757-
defp extract_parts(["--" <> boundary | tail], boundary, acc, nil),
758-
do: extract_parts(tail, boundary, acc, [])
819+
# Start boundary found: subsequent boundary
820+
defp accumulate_part_range(
821+
{line, new_offset},
822+
{ranges, offset, _state, part_start},
823+
start_boundary,
824+
_end_boundary
825+
)
826+
when line == start_boundary do
827+
part_range = {part_start, offset - part_start}
828+
{:cont, {[part_range | ranges], new_offset, :collecting, new_offset}}
829+
end
759830

760-
defp extract_parts(["--" <> boundary | tail], boundary, acc, parts),
761-
do: extract_parts(tail, boundary, [], [Enum.reverse(acc) | parts])
831+
# Inside a part: just track offset
832+
defp accumulate_part_range(
833+
{_line, new_offset},
834+
{ranges, _offset, :collecting, part_start},
835+
_start_boundary,
836+
_end_boundary
837+
) do
838+
{:cont, {ranges, new_offset, :collecting, part_start}}
839+
end
840+
841+
# Before first boundary: ignore
842+
defp accumulate_part_range(
843+
{_line, new_offset},
844+
{ranges, _offset, state, part_start},
845+
_start_boundary,
846+
_end_boundary
847+
) do
848+
{:cont, {ranges, new_offset, state, part_start}}
849+
end
850+
851+
# Handle case where end boundary wasn't found (still :collecting)
852+
defp extract_final_part_range({ranges, _offset, :collecting, start}, content) do
853+
# Add the final part, from start to the end of content, but only if it has content (i.e. it is not empty or just whitespace)
854+
part_size = byte_size(content) - start
762855

763-
defp extract_parts([<<"--" <> rest>> = line | tail], boundary, acc, parts) do
764-
if rest == boundary <> "--" do
765-
extract_parts([], boundary, [], [Enum.reverse(acc) | parts])
856+
if part_size > 0 and contains_non_whitespace?(content, start, start + part_size) do
857+
[{start, part_size} | ranges]
766858
else
767-
extract_parts(tail, boundary, [line | acc], parts)
859+
ranges
768860
end
769861
end
770862

771-
defp extract_parts([_line | tail], boundary, acc, nil),
772-
do: extract_parts(tail, boundary, acc, nil)
863+
defp extract_final_part_range({ranges, _offset, _state, _start}, _content) do
864+
ranges
865+
end
866+
867+
@whitespaces [?\s, ?\t, ?\r, ?\n]
868+
defp contains_non_whitespace?(content, pos, limit) when pos < limit do
869+
case :binary.at(content, pos) do
870+
char when char in @whitespaces -> contains_non_whitespace?(content, pos + 1, limit)
871+
_ -> true
872+
end
873+
end
773874

774-
defp extract_parts([head | tail], boundary, acc, parts),
775-
do: extract_parts(tail, boundary, [head | acc], parts)
875+
defp contains_non_whitespace?(_, _, _), do: false
776876

777877
defp key_to_atom(key) do
778878
key

test/mail/parsers/rfc_2822_test.exs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,6 +1292,53 @@ defmodule Mail.Parsers.RFC2822Test do
12921292
end
12931293
end
12941294

1295+
describe "Large part handling" do
1296+
@multi_part_message """
1297+
To: Test User <[email protected]>, Other User <[email protected]>
1298+
CC: The Dude <[email protected]>, Batman <[email protected]>
1299+
From: Me <[email protected]>
1300+
Subject: Test email
1301+
Mime-Version: 1.0
1302+
Content-Type: multipart/alternative; boundary=foobar
1303+
1304+
This is a multi-part message in MIME format
1305+
--foobar
1306+
Content-Type: text/plain
1307+
1308+
This is some text
1309+
1310+
--foobar
1311+
Content-Type: text/html
1312+
1313+
<h1>This is some HTML</h1>
1314+
1315+
--foobar
1316+
x-my-header: no body!
1317+
1318+
--foobar--
1319+
"""
1320+
1321+
test "large parts are skipped" do
1322+
message =
1323+
parse_email(
1324+
@multi_part_message,
1325+
skip_large_parts?: true,
1326+
max_part_size: 1
1327+
)
1328+
1329+
assert message.body == nil
1330+
1331+
[text_part, html_part, headers_only_part] = message.parts
1332+
1333+
assert text_part.parts == []
1334+
assert text_part.headers == %{}
1335+
assert html_part.parts == []
1336+
assert html_part.headers == %{}
1337+
assert headers_only_part.parts == []
1338+
assert headers_only_part.headers == %{}
1339+
end
1340+
end
1341+
12951342
defp parse_email(email, opts \\ []),
12961343
do: email |> convert_crlf |> Mail.Parsers.RFC2822.parse(opts)
12971344

0 commit comments

Comments
 (0)