Skip to content

Conversation

@Noarkhh
Copy link
Contributor

@Noarkhh Noarkhh commented Nov 14, 2025

When using server-based endpoints (reader, writer, message), the server will now spawn the pipeline directly and communicate with elixir elements directly.

I'm not sure which approach is better for having push and pull elixir source and sink - using defdelegetes and have a common behavior module (the current one) or have a macro that generates push and pull element.

@Noarkhh Noarkhh self-assigned this Nov 14, 2025
@Noarkhh Noarkhh added this to Smackore Nov 14, 2025
@Noarkhh Noarkhh moved this to In Progress in Smackore Nov 14, 2025
@Noarkhh Noarkhh changed the base branch from master to message-endpoint November 14, 2025 17:23
@Noarkhh Noarkhh force-pushed the refactor-elixir-endpoints branch from 7cb399c to e85f6a1 Compare November 14, 2025 17:23
@Noarkhh Noarkhh force-pushed the message-endpoint branch 3 times, most recently from 88a148a to e44efe8 Compare November 26, 2025 15:17
@Noarkhh Noarkhh force-pushed the refactor-elixir-endpoints branch 2 times, most recently from 9ac398d to b729d70 Compare November 26, 2025 15:20
@Noarkhh Noarkhh requested review from mat-hek and varsill November 26, 2025 15:54
@Noarkhh Noarkhh force-pushed the refactor-elixir-endpoints branch 2 times, most recently from 4c07d31 to a053f6c Compare November 26, 2025 15:59
@Noarkhh Noarkhh moved this from In Progress to In Review in Smackore Nov 26, 2025
@mat-hek
Copy link
Member

mat-hek commented Nov 28, 2025

@Noarkhh I'd go for a macro that generates push & pull elements ;)

membrane_source_demand: non_neg_integer(),
pipeline_supervisor: pid() | nil,
pipeline: pid() | nil,
ghosted_client: GenServer.from() | Process.dest() | nil,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is a bit ambiguous, maybe client_to_reply or current_client?


RELEASES_URL = "https://github.com/membraneframework/boombox/releases"
PACKAGE_NAME = "boomboxlib"
PACKAGE_NAME = "boomboxlibb"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?


match self._call(request):
case Atom("ok"):
print("uuuu")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xd

Comment on lines 16 to 17
|> Map.update!(:input, &resolve_elixir_endpoint(&1, self()))
|> Map.update!(:output, &resolve_elixir_endpoint(&1, self()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endpoint resolution happens in the internal bin, can we move this there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really, because we need to call self() in the parent process, not the bin

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're passing this pid as a parent option. We can pass it to the internal bin as well

examples.livemd Outdated
Comment on lines 576 to 602
Stream.unfold(%{}, fn _state ->
{result1, packet1} = Boombox.read(reader1)
{result2, packet2} = Boombox.read(reader2)
case {Boombox.read(reader1), Boombox.read(reader2)} do
{:finished, :finished} ->
nil

joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)
{{:ok, _packet}, :finished} ->
Boombox.close(reader1)
nil

packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}
{:finished, {:ok, _packet}} ->
Boombox.close(reader2)
nil

Boombox.write(writer, packet)
{{:ok, packet1}, {:ok, packet2}} ->
joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)

if :finished in [result1, result2] do
if result1 == :ok, do:
Boombox.close(reader1)
if result2 == :ok, do:
Boombox.close(reader2)
nil
else
{nil, %{}}
packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}

Boombox.write(writer, packet)

{nil, %{}}
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream.unfold with an ignored state looks weird. Also, matching on all the possibilities seems redundant. I'd go for

Suggested change
Stream.unfold(%{}, fn _state ->
{result1, packet1} = Boombox.read(reader1)
{result2, packet2} = Boombox.read(reader2)
case {Boombox.read(reader1), Boombox.read(reader2)} do
{:finished, :finished} ->
nil
joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)
{{:ok, _packet}, :finished} ->
Boombox.close(reader1)
nil
packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}
{:finished, {:ok, _packet}} ->
Boombox.close(reader2)
nil
Boombox.write(writer, packet)
{{:ok, packet1}, {:ok, packet2}} ->
joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)
if :finished in [result1, result2] do
if result1 == :ok, do:
Boombox.close(reader1)
if result2 == :ok, do:
Boombox.close(reader2)
nil
else
{nil, %{}}
packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}
Boombox.write(writer, packet)
{nil, %{}}
end
Stream.repeatedly(fn ->
case {Boombox.read(reader1), Boombox.read(reader2)} do
{{:ok, packet1}, {:ok, packet2}} ->
joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)
packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}
:ok = Boombox.write(writer, packet)
_finished ->
:eos
end
end)
|> Enum.find(& &1 == :eos)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mat-hek it looks good we just need to remember to call Boombox.close()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, missed that. Maybe Boombox.close on already closed boombox could be a noop, so we could just close both boomboxes no matter what?


alias Boombox.InternalBin.ElixirEndpoints.Sink

def_input_pad :input,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many pads of this type can exist? Wouldn't max_instances be applicable here?

examples.livemd Outdated
Comment on lines 630 to 634
bb_states: %{
bb1: %{last_packet: nil, eos: false},
bb2: %{last_packet: nil, eos: false}
},
input_boomboxes: %{bb1 => :bb1, bb2 => :bb2},
bbs: %{bb1 => :bb1, bb2 => :bb2},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] I am not 100% encouraged by this change, bbs look quite funny to me :D

examples.livemd Outdated
Comment on lines 576 to 602
Stream.unfold(%{}, fn _state ->
{result1, packet1} = Boombox.read(reader1)
{result2, packet2} = Boombox.read(reader2)
case {Boombox.read(reader1), Boombox.read(reader2)} do
{:finished, :finished} ->
nil

joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)
{{:ok, _packet}, :finished} ->
Boombox.close(reader1)
nil

packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}
{:finished, {:ok, _packet}} ->
Boombox.close(reader2)
nil

Boombox.write(writer, packet)
{{:ok, packet1}, {:ok, packet2}} ->
joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)

if :finished in [result1, result2] do
if result1 == :ok, do:
Boombox.close(reader1)
if result2 == :ok, do:
Boombox.close(reader2)
nil
else
{nil, %{}}
packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}

Boombox.write(writer, packet)

{nil, %{}}
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mat-hek it looks good we just need to remember to call Boombox.close()

Comment on lines 563 to 568
Stream.unfold(:ok, fn
:ok ->
{result, packet} = Boombox.read(boombox)
{packet, result}

:finished ->
nil
case Boombox.read(boombox) do
{:ok, packet} -> {packet, :ok}
:finished -> nil
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] This "state" here is useless so you can remove it
And similarly to the case in examples.livemd, you could use Enum.repeatedly here

if not self._response.done():
self._response.set_result(response)
case (Atom("DOWN"), _, Atom("process"), _, Atom("normal")):
print(self._boombox_mode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can remove these :D


@impl true
def handle_info(:boombox_demand, _ctx, state) do
def handle_info({:boombox_demand, consumer}, _ctx, %{consumer: consumer} = state) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we buffer that demand up if consumer is not yet available?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually after looking at the whole module I am not sure if the consumer field in the state is defined anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the consumer is passed with options, but I agree, this needs better typing

def_input_pad :input,
accepted_format: any_of(Membrane.RawAudio, Membrane.RawVideo),
availability: :on_request,
flow_control: :auto
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why :auto instead of :push?

@Noarkhh Noarkhh force-pushed the refactor-elixir-endpoints branch 5 times, most recently from b46ae53 to e99f2d6 Compare December 3, 2025 17:05
@Noarkhh Noarkhh requested review from mat-hek and varsill December 4, 2025 15:21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I like the version with defdelegate more, but let it be :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a comment 🥰

Comment on lines 24 to 28
def_options(
consumer: [
spec: pid()
]
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove the parentheses.
I would also opt for some :description (just to mention that it's a pid of a process that will receive the {:boombox_packet, ...} messages) even though it's a private module :D

@Noarkhh Noarkhh force-pushed the refactor-elixir-endpoints branch from e99f2d6 to e680f73 Compare December 11, 2025 11:01
@Noarkhh Noarkhh force-pushed the refactor-elixir-endpoints branch from e680f73 to d71885f Compare December 11, 2025 11:14
@Noarkhh Noarkhh requested a review from varsill December 11, 2025 11:18
# handles demands from subsequent element by demanding packets from the producer
# process with `{:boombox_demand, self(), demand_amount}` messages.
# * PushSource - The element has `:push` flow control on output pads and expects
# the producer process to provide it packets without demanding them.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# the producer process to provide it packets without demanding them.
# the producer process to provide it with packets without demanding them.

examples.livemd Outdated

# MIX_INSTALL_CONFIG_BEGIN
boombox = {:boombox, github: "membraneframework/boombox"}
boombox = {:boombox, github: "membraneframework/boombox", branch: "refactor-elixir-endpoints"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember to update it before merging

lib/boombox.ex Outdated
Returns `:ok` if more packets can be provided, and
`:finished` when Boombox finished consuming and will not accept any more packets. Returns
synchronously once the packet has been processed by Boombox.
synchronously once the packet has been ingested Boombox is ready for more packets.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
synchronously once the packet has been ingested Boombox is ready for more packets.
synchronously once the packet has been ingested and Boombox is ready for more packets.

Comment on lines 53 to 61
def handle_demand(Pad.ref(:output, _id), _size, _unit, ctx, state) do
demands = Enum.map(ctx.pads, fn {_pad, %{demand: demand}} -> demand end)

if Enum.all?(demands, &(&1 > 0)) do
send(state.producer, {:boombox_demand, self(), Enum.sum(demands)})
end

{[], state}
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, handling demand requires changes

@Noarkhh Noarkhh requested a review from varsill December 18, 2025 11:12
Base automatically changed from message-endpoint to master December 18, 2025 11:44

# value, _state ->
# raise ArgumentError, "Expected Boombox.Packet.t(), got: #{inspect(value)}"
# end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leftover

packets and will begin terminating. No more messages will be sent.
```
Boombox.run(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example doesn't fit here

If the output is a `:reader` endpoint this function will return a `Boombox.Reader` struct,
which is used to read media packets from boombox with `read/1`.
Input endpoints with special behaviours:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add one sentence describing what's a 'normal' behaviour

Comment on lines +63 to +65
def handle_demand(Pad.ref(:output, id), size, _unit, _ctx, state) do
send(state.producer, {:boombox_demand, self(), id, size})
{[], state}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this approach, don't we have the same problem as before:

  • handle_demand is called
  • we send the demand to boombox
  • boombox receives the demand
  • handle_demand is called again, with bigger demand
  • boombox sends buffers to element
  • boombox receives the new demand and saves it, but the actual demand is smaller, as buffers were sent

We could use ctx.incoming_demand, send that to boombox and sum it in boombox to avoid this problem

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: In Review

Development

Successfully merging this pull request may close these issues.

4 participants