Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/ash/action_input.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ defmodule Ash.ActionInput do
resource: Ash.Resource.t(),
invalid_keys: MapSet.t(),
context: map(),
domain: Ash.Domain.t(),
domain: Ash.Domain.t() | nil,
valid?: boolean(),
errors: [Ash.Error.t()],
before_action: [before_action_fun],
Expand Down Expand Up @@ -132,7 +132,7 @@ defmodule Ash.ActionInput do
- `set_argument/3` for adding arguments
- `set_context/2` for adding context
"""
@spec new(Ash.Resource.t(), Ash.Domain.t()) :: t
@spec new(Ash.Resource.t(), Ash.Domain.t() | nil) :: t
def new(resource, domain \\ nil) do
%__MODULE__{resource: resource, domain: domain}
end
Expand Down
16 changes: 12 additions & 4 deletions lib/ash/actions/bulk_manual_action_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,29 @@ defmodule Ash.Actions.BulkManualActionHelpers do
case result do
{:ok, record} ->
record =
Ash.Resource.put_metadata(
record,
record
|> Ash.Resource.put_metadata(
metadata_index_name,
changeset.context[bulk_action_type].index
)
|> Ash.Resource.put_metadata(
:bulk_action_ref,
changeset.context[bulk_action_type].ref
)

{:ok, record}

{:ok, record, notifications} ->
record =
Ash.Resource.put_metadata(
record,
record
|> Ash.Resource.put_metadata(
metadata_index_name,
changeset.context[bulk_action_type].index
)
|> Ash.Resource.put_metadata(
:bulk_action_ref,
changeset.context[bulk_action_type].ref
)

{:ok, record, notifications}

Expand Down
52 changes: 37 additions & 15 deletions lib/ash/actions/create/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ defmodule Ash.Actions.Create.Bulk do
:bulk_create
)

changesets_by_index = index_changesets(batch)
{changesets_by_ref, changesets_by_index} = index_changesets(batch)

run_batch(
resource,
Expand All @@ -683,6 +683,7 @@ defmodule Ash.Actions.Create.Bulk do
ref,
attrs_to_require,
action_select,
changesets_by_ref,
changesets_by_index
)
|> run_after_action_hooks(opts, domain, ref)
Expand All @@ -691,6 +692,7 @@ defmodule Ash.Actions.Create.Bulk do
all_changes,
opts,
ref,
changesets_by_ref,
changesets_by_index,
batch,
domain,
Expand Down Expand Up @@ -720,7 +722,7 @@ defmodule Ash.Actions.Create.Bulk do
argument_names
) do
base
|> Ash.Changeset.put_context(:bulk_create, %{index: index})
|> Ash.Changeset.put_context(:bulk_create, %{index: index, ref: make_ref()})
|> Ash.Changeset.set_private_arguments_for_action(opts[:private_arguments] || %{})
|> handle_params(
Keyword.get(opts, :assume_casted?, false),
Expand Down Expand Up @@ -868,12 +870,14 @@ defmodule Ash.Actions.Create.Bulk do
end

defp index_changesets(batch) do
Enum.reduce(batch, %{}, fn changeset, changesets_by_index ->
Map.put(
changesets_by_index,
changeset.context.bulk_create.index,
changeset
)
Enum.reduce(batch, {%{}, %{}}, fn changeset, {by_ref, by_index} ->
ref = changeset.context.bulk_create.ref
index = changeset.context.bulk_create.index

{
Map.put(by_ref, ref, changeset),
Map.put(by_index, index, ref)
}
end)
end

Expand Down Expand Up @@ -1031,6 +1035,7 @@ defmodule Ash.Actions.Create.Bulk do
ref,
attrs_to_require,
action_select,
changesets_by_ref,
changesets_by_index
) do
batch
Expand Down Expand Up @@ -1092,7 +1097,7 @@ defmodule Ash.Actions.Create.Bulk do
end)
|> case do
[] ->
{[], changesets_by_index}
{[], changesets_by_ref, changesets_by_index}

batch ->
upsert_keys =
Expand Down Expand Up @@ -1122,7 +1127,7 @@ defmodule Ash.Actions.Create.Bulk do
end
end

changesets_by_index = index_changesets(batch)
{changesets_by_ref, changesets_by_index} = index_changesets(batch)

batch
|> Enum.group_by(&{&1.atomics, &1.filter})
Expand Down Expand Up @@ -1346,7 +1351,7 @@ defmodule Ash.Actions.Create.Bulk do
[]
end
end)
|> then(&{&1, changesets_by_index})
|> then(&{&1, changesets_by_ref, changesets_by_index})
end
end

Expand All @@ -1365,13 +1370,20 @@ defmodule Ash.Actions.Create.Bulk do
end

defp run_after_action_hooks(
{batch_results, changesets_by_index},
{batch_results, changesets_by_ref, changesets_by_index},
opts,
domain,
ref
) do
Enum.flat_map(batch_results, fn result ->
changeset = changesets_by_index[result.__metadata__.bulk_create_index]
changeset =
Ash.Actions.Helpers.lookup_changeset(
result,
changesets_by_ref,
changesets_by_index,
index_key: :bulk_create_index,
ref_key: :bulk_action_ref
)

case manage_relationships(result, domain, changeset,
upsert?: opts[:upsert?],
Expand Down Expand Up @@ -1413,6 +1425,7 @@ defmodule Ash.Actions.Create.Bulk do
all_changes,
opts,
ref,
changesets_by_ref,
changesets_by_index,
changesets,
domain,
Expand All @@ -1422,7 +1435,14 @@ defmodule Ash.Actions.Create.Bulk do
) do
results =
Enum.flat_map(batch, fn result ->
changeset = changesets_by_index[result.__metadata__.bulk_create_index]
changeset =
Ash.Actions.Helpers.lookup_changeset(
result,
changesets_by_ref,
changesets_by_index,
index_key: :bulk_create_index,
ref_key: :bulk_action_ref
)

if opts[:notify?] || opts[:return_notifications?] do
store_notification(ref, notification(changeset, result, opts), opts)
Expand Down Expand Up @@ -1455,12 +1475,14 @@ defmodule Ash.Actions.Create.Bulk do
|> Ash.Actions.Update.Bulk.run_bulk_after_changes(
all_changes,
results,
changesets_by_ref,
changesets_by_index,
changesets,
opts,
ref,
resource,
:bulk_create_index
:bulk_create_index,
:bulk_action_ref
)
|> then(fn records ->
if opts[:return_records?] do
Expand Down
55 changes: 41 additions & 14 deletions lib/ash/actions/destroy/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,8 @@ defmodule Ash.Actions.Destroy.Bulk do

[
Ash.Resource.set_metadata(result, %{
bulk_destroy_index: changeset.context.bulk_destroy.index
bulk_destroy_index: changeset.context.bulk_destroy.index,
bulk_action_ref: changeset.context.bulk_destroy.ref
})
]

Expand All @@ -1490,7 +1491,8 @@ defmodule Ash.Actions.Destroy.Bulk do

[
Ash.Resource.set_metadata(result, %{
bulk_destroy_index: changeset.context.bulk_destroy.index
bulk_destroy_index: changeset.context.bulk_destroy.index,
bulk_action_ref: changeset.context.bulk_destroy.ref
})
]

Expand Down Expand Up @@ -1628,7 +1630,7 @@ defmodule Ash.Actions.Destroy.Bulk do
:bulk_destroy
)

changesets_by_index = index_changesets(batch)
{changesets_by_ref, changesets_by_index} = index_changesets(batch)

run_batch(
resource,
Expand All @@ -1640,12 +1642,13 @@ defmodule Ash.Actions.Destroy.Bulk do
domain,
ref
)
|> run_after_action_hooks(opts, domain, ref, changesets_by_index)
|> run_after_action_hooks(opts, domain, ref, changesets_by_ref, changesets_by_index)
|> process_results(
changes,
all_changes,
opts,
ref,
changesets_by_ref,
changesets_by_index,
batch,
domain,
Expand Down Expand Up @@ -1678,7 +1681,7 @@ defmodule Ash.Actions.Destroy.Bulk do
|> Map.put(:domain, domain)
|> Ash.Changeset.prepare_changeset_for_action(action, opts)
|> Ash.Changeset.set_private_arguments_for_action(opts[:private_arguments] || %{})
|> Ash.Changeset.put_context(:bulk_destroy, %{index: index})
|> Ash.Changeset.put_context(:bulk_destroy, %{index: index, ref: make_ref()})
|> Ash.Changeset.set_context(opts[:context] || %{})
|> handle_params(
Keyword.get(opts, :assume_casted?, false),
Expand Down Expand Up @@ -1769,12 +1772,14 @@ defmodule Ash.Actions.Destroy.Bulk do
end

defp index_changesets(batch) do
Enum.reduce(batch, %{}, fn changeset, changesets_by_index ->
Map.put(
changesets_by_index,
changeset.context.bulk_destroy.index,
changeset
)
Enum.reduce(batch, {%{}, %{}}, fn changeset, {by_ref, by_index} ->
ref = changeset.context.bulk_destroy.ref
index = changeset.context.bulk_destroy.index

{
Map.put(by_ref, ref, changeset),
Map.put(by_index, index, ref)
}
end)
end

Expand Down Expand Up @@ -2072,6 +2077,10 @@ defmodule Ash.Actions.Destroy.Bulk do
:bulk_destroy_index,
changeset.context.bulk_destroy.index
)
|> Ash.Resource.put_metadata(
:bulk_action_ref,
changeset.context.bulk_destroy.ref
)
]}

{:error, error} ->
Expand Down Expand Up @@ -2116,10 +2125,18 @@ defmodule Ash.Actions.Destroy.Bulk do
opts,
domain,
ref,
changesets_by_ref,
changesets_by_index
) do
Enum.flat_map(batch_results, fn result ->
changeset = changesets_by_index[result.__metadata__.bulk_destroy_index]
changeset =
Ash.Actions.Helpers.lookup_changeset(
result,
changesets_by_ref,
changesets_by_index,
index_key: :bulk_destroy_index,
ref_key: :bulk_action_ref
)

case manage_relationships(result, domain, changeset,
actor: opts[:actor],
Expand Down Expand Up @@ -2160,6 +2177,7 @@ defmodule Ash.Actions.Destroy.Bulk do
all_changes,
opts,
ref,
changesets_by_ref,
changesets_by_index,
changesets,
domain,
Expand All @@ -2170,15 +2188,24 @@ defmodule Ash.Actions.Destroy.Bulk do
|> Ash.Actions.Update.Bulk.run_bulk_after_changes(
all_changes,
batch,
changesets_by_ref,
changesets_by_index,
changesets,
opts,
ref,
resource,
:bulk_destroy_index
:bulk_destroy_index,
:bulk_action_ref
)
|> Enum.flat_map(fn result ->
changeset = changesets_by_index[result.__metadata__[:bulk_destroy_index]]
changeset =
Ash.Actions.Helpers.lookup_changeset(
result,
changesets_by_ref,
changesets_by_index,
index_key: :bulk_destroy_index,
ref_key: :bulk_action_ref
)

if opts[:notify?] || opts[:return_notifications?] do
store_notification(ref, notification(changeset, result, opts), opts)
Expand Down
Loading