diff --git a/lib/que.ex b/lib/que.ex index 87b5eb5..3a19a86 100644 --- a/lib/que.ex +++ b/lib/que.ex @@ -102,8 +102,161 @@ defmodule Que do Que.add(App.Workers.SignupMailer, to: "some@email.com", message: "Thank you for Signing up!") #=> :ok + + # Add a job with custom retry configuration + Que.add(App.Workers.ImportantTask, task_data, max_retries: 5) + #=> :ok + + # Add a high priority job with timeout + Que.add(App.Workers.UrgentTask, urgent_data, priority: :high, timeout: 30_000) + #=> :ok + ``` + """ + @spec add(worker :: module, arguments :: term, opts :: Keyword.t()) :: {:ok, %Que.Job{}} + def add(worker, arguments, opts \\ []) do + Que.ServerSupervisor.add(worker, arguments, opts) + end + + + @doc """ + Schedules a Job to be processed by Que at a specific time. + + Accepts the worker module name, arguments to be passed to the worker, + and the time when the job should be executed. + + ## Example + + ``` + # Schedule a job to run in 1 hour + scheduled_time = NaiveDateTime.add(NaiveDateTime.utc_now(), 3600, :second) + Que.add_scheduled(App.Workers.SendReminder, user.email, scheduled_time) + #=> {:ok, %Que.Job{}} + + # Schedule a job to run at a specific datetime + scheduled_time = ~N[2024-12-31 23:59:59] + Que.add_scheduled(App.Workers.NewYearGreeting, user.id, scheduled_time) + #=> {:ok, %Que.Job{}} + ``` + """ + @spec add_scheduled(worker :: module, arguments :: term, scheduled_at :: NaiveDateTime.t(), opts :: Keyword.t()) :: {:ok, %Que.Job{}} + def add_scheduled(worker, arguments, scheduled_at, opts \\ []) do + Que.ServerSupervisor.add_scheduled(worker, arguments, scheduled_at, opts) + end + + + @doc """ + Schedules a Job to be processed by Que after a specific number of seconds. + + This is a convenience function that schedules a job to run after the specified + delay in seconds. + + ## Example + + ``` + # Schedule a job to run in 30 seconds + Que.add_in(App.Workers.SendReminder, user.email, 30) + #=> {:ok, %Que.Job{}} + + # Schedule a job to run in 1 hour (3600 seconds) + Que.add_in(App.Workers.CleanupExpiredTokens, [], 3600) + #=> {:ok, %Que.Job{}} + ``` + """ + @spec add_in(worker :: module, arguments :: term, delay_seconds :: integer, opts :: Keyword.t()) :: {:ok, %Que.Job{}} + def add_in(worker, arguments, delay_seconds, opts \\ []) do + scheduled_at = NaiveDateTime.add(NaiveDateTime.utc_now(), delay_seconds, :second) + add_scheduled(worker, arguments, scheduled_at, opts) + end + + + @doc """ + Cancels a Job by its ID. + + Only jobs with status `:scheduled` or `:queued` can be cancelled. + Returns `:ok` if the job was successfully cancelled, or an error tuple otherwise. + + ## Example + + ``` + {:ok, job} = Que.add_scheduled(App.Workers.SendReminder, user.email, scheduled_time) + + # Later, cancel the job + Que.cancel(job.id) + #=> :ok ``` """ - @spec add(worker :: module, arguments :: term) :: {:ok, %Que.Job{}} - defdelegate add(worker, arguments), to: Que.ServerSupervisor + @spec cancel(job_id :: integer) :: :ok | {:error, :not_found} | {:error, :not_cancellable} + def cancel(job_id) do + case Que.Persistence.find(job_id) do + nil -> + {:error, :not_found} + job -> + if Que.Job.cancellable?(job) do + job + |> Que.Job.cancel() + |> Que.Persistence.update() + :ok + else + {:error, :not_cancellable} + end + end + end + + + @doc """ + Cancels all cancellable jobs for a specific worker. + + Returns the number of jobs that were cancelled. + + ## Example + + ``` + cancelled_count = Que.cancel_all(App.Workers.SendReminder) + #=> 5 + ``` + """ + @spec cancel_all(worker :: module) :: integer + def cancel_all(worker) do + worker + |> Que.Persistence.cancellable() + |> Enum.map(&Que.Job.cancel/1) + |> Enum.map(&Que.Persistence.update/1) + |> length() + end + + + @doc """ + Adds a high priority job to the queue. + + This is a convenience function that sets priority to :high. + + ## Example + + ``` + Que.add_high_priority(App.Workers.UrgentTask, critical_data) + #=> {:ok, %Que.Job{}} + ``` + """ + @spec add_high_priority(worker :: module, arguments :: term, opts :: Keyword.t()) :: {:ok, %Que.Job{}} + def add_high_priority(worker, arguments, opts \\ []) do + add(worker, arguments, Keyword.put(opts, :priority, :high)) + end + + + @doc """ + Adds an urgent priority job to the queue. + + This is a convenience function that sets priority to :urgent. + + ## Example + + ``` + Que.add_urgent(App.Workers.CriticalTask, emergency_data) + #=> {:ok, %Que.Job{}} + ``` + """ + @spec add_urgent(worker :: module, arguments :: term, opts :: Keyword.t()) :: {:ok, %Que.Job{}} + def add_urgent(worker, arguments, opts \\ []) do + add(worker, arguments, Keyword.put(opts, :priority, :urgent)) + end end diff --git a/lib/que/job.ex b/lib/que/job.ex index f99a787..542c351 100644 --- a/lib/que/job.ex +++ b/lib/que/job.ex @@ -1,7 +1,7 @@ defmodule Que.Job do require Logger - defstruct [:id, :arguments, :worker, :status, :ref, :pid, :created_at, :updated_at] + defstruct [:id, :arguments, :worker, :status, :ref, :pid, :created_at, :updated_at, :scheduled_at, :retry_count, :max_retries, :last_error, :timeout, :timeout_ref, :priority] ## Note: Update Que.Persistence.Mnesia after changing these values @moduledoc """ @@ -12,25 +12,97 @@ defmodule Que.Job do unless you absolutely know what you're doing. """ - @statuses [:queued, :started, :failed, :completed] + @statuses [:scheduled, :queued, :started, :failed, :completed, :cancelled, :retrying, :timeout] @typedoc "One of the atoms in `#{inspect(@statuses)}`" @type status :: atom + # Priority levels - higher numbers = higher priority + @priorities %{low: 1, normal: 5, high: 10, urgent: 20} + @priority_levels Map.keys(@priorities) + @typedoc "One of the atoms in `#{inspect(@priority_levels)}`" + @type priority :: atom + @typedoc "A `Que.Job` struct" @type t :: %Que.Job{} @doc """ Returns a new Job struct with defaults """ - @spec new(worker :: Que.Worker.t(), args :: list) :: Que.Job.t() - def new(worker, args \\ nil) do + @spec new(worker :: Que.Worker.t(), args :: list, opts :: Keyword.t()) :: Que.Job.t() + def new(worker, args \\ nil, opts \\ []) do + max_retries = Keyword.get(opts, :max_retries, get_default_max_retries(worker)) + timeout = Keyword.get(opts, :timeout, get_default_timeout(worker)) + priority = get_priority_value(Keyword.get(opts, :priority, :normal)) + %Que.Job{ status: :queued, worker: worker, - arguments: args + arguments: args, + retry_count: 0, + max_retries: max_retries, + timeout: timeout, + priority: priority + } + end + + @doc """ + Returns a new scheduled Job struct with specified execution time + """ + @spec new_scheduled(worker :: Que.Worker.t(), args :: list, scheduled_at :: NaiveDateTime.t(), opts :: Keyword.t()) :: Que.Job.t() + def new_scheduled(worker, args, scheduled_at, opts \\ []) do + max_retries = Keyword.get(opts, :max_retries, get_default_max_retries(worker)) + timeout = Keyword.get(opts, :timeout, get_default_timeout(worker)) + priority = get_priority_value(Keyword.get(opts, :priority, :normal)) + + %Que.Job{ + status: :scheduled, + worker: worker, + arguments: args, + scheduled_at: scheduled_at, + retry_count: 0, + max_retries: max_retries, + timeout: timeout, + priority: priority } end + # Get default max retries for a worker, defaults to 3 + defp get_default_max_retries(worker) do + if function_exported?(worker, :max_retries, 0) do + worker.max_retries() + else + 3 + end + end + + # Get default timeout for a worker, defaults to 60 seconds (60000ms) + defp get_default_timeout(worker) do + if function_exported?(worker, :timeout, 0) do + worker.timeout() + else + 60_000 + end + end + + # Convert priority atom to numeric value + defp get_priority_value(priority) when priority in @priority_levels do + @priorities[priority] + end + defp get_priority_value(priority) when is_integer(priority) do + priority + end + defp get_priority_value(_), do: @priorities[:normal] + + @doc """ + Checks if the job is ready to be executed (for scheduled jobs) + """ + @spec ready?(job :: Que.Job.t()) :: boolean + def ready?(%Que.Job{status: :scheduled, scheduled_at: scheduled_at}) do + NaiveDateTime.compare(NaiveDateTime.utc_now(), scheduled_at) != :lt + end + def ready?(%Que.Job{status: :queued}), do: true + def ready?(_), do: false + @doc """ Update the Job status to one of the predefined values in `@statuses` """ @@ -39,10 +111,83 @@ defmodule Que.Job do %{job | status: status} end + @doc """ + Promotes a scheduled job to queued status when it's ready to run + """ + @spec promote_if_ready(job :: Que.Job.t()) :: Que.Job.t() + def promote_if_ready(%Que.Job{status: :scheduled} = job) do + if ready?(job) do + %{job | status: :queued} + else + job + end + end + def promote_if_ready(job), do: job + + @doc """ + Checks if the job can be cancelled (only scheduled and queued jobs can be cancelled) + """ + @spec cancellable?(job :: Que.Job.t()) :: boolean + def cancellable?(%Que.Job{status: status}) when status in [:scheduled, :queued], do: true + def cancellable?(_), do: false + + @doc """ + Cancels a job by setting its status to :cancelled + """ + @spec cancel(job :: Que.Job.t()) :: Que.Job.t() + def cancel(%Que.Job{} = job) do + if cancellable?(job) do + %{job | status: :cancelled} + else + job + end + end + + @doc """ + Checks if the job can be retried (has retries remaining) + """ + @spec retryable?(job :: Que.Job.t()) :: boolean + def retryable?(%Que.Job{retry_count: retry_count, max_retries: max_retries}) + when is_integer(retry_count) and is_integer(max_retries) do + retry_count < max_retries + end + def retryable?(_), do: false + + @doc """ + Calculates the delay before the next retry using exponential backoff + Base delay is 2^retry_count seconds with jitter + """ + @spec retry_delay(job :: Que.Job.t()) :: integer + def retry_delay(%Que.Job{retry_count: retry_count}) do + base_delay = :math.pow(2, retry_count) |> round() + jitter = :rand.uniform(1000) # 0-1000ms jitter + (base_delay * 1000) + jitter # Convert to milliseconds + end + + @doc """ + Schedules a job for retry with exponential backoff + """ + @spec schedule_retry(job :: Que.Job.t(), error :: term) :: Que.Job.t() + def schedule_retry(%Que.Job{} = job, error) do + if retryable?(job) do + delay_ms = retry_delay(job) + retry_at = NaiveDateTime.add(NaiveDateTime.utc_now(), delay_ms, :millisecond) + + %{job | + status: :scheduled, + scheduled_at: retry_at, + retry_count: job.retry_count + 1, + last_error: inspect(error) + } + else + %{job | status: :failed, last_error: inspect(error)} + end + end + @doc """ Updates the Job struct with new status and spawns & monitors a new Task under the TaskSupervisor which executes the perform method with supplied - arguments + arguments. Also sets up a timeout timer if configured. """ @spec perform(job :: Que.Job.t()) :: Que.Job.t() def perform(job) do @@ -55,7 +200,43 @@ defmodule Que.Job do job.worker.perform(job.arguments) end) - %{job | status: :started, pid: pid, ref: Process.monitor(pid)} + process_ref = Process.monitor(pid) + + # Set timeout timer if configured + timeout_ref = if job.timeout do + Process.send_after(self(), {:job_timeout, job.id}, job.timeout) + else + nil + end + + %{job | status: :started, pid: pid, ref: process_ref, timeout_ref: timeout_ref} + end + + @doc """ + Handles Job timeout by killing the job process and marking it as timed out + """ + @spec handle_timeout(job :: Que.Job.t()) :: Que.Job.t() + def handle_timeout(job) do + Que.Helpers.log("Timeout #{job} after #{job.timeout}ms") + + # Kill the job process if it's still running + if job.pid && Process.alive?(job.pid) do + Process.exit(job.pid, :timeout) + end + + # Cancel the timeout timer if it exists + if job.timeout_ref do + Process.cancel_timer(job.timeout_ref) + end + + # Run teardown callback + Que.Helpers.do_task(fn -> + Logger.metadata(job_id: job.id) + job.worker.on_failure(job.arguments, :timeout) + job.worker.on_teardown(job) + end) + + %{job | status: :timeout, pid: nil, ref: nil, timeout_ref: nil} end @doc """ @@ -66,30 +247,46 @@ defmodule Que.Job do def handle_success(job) do Que.Helpers.log("Completed #{job}") + # Cancel timeout timer if it exists + if job.timeout_ref do + Process.cancel_timer(job.timeout_ref) + end + Que.Helpers.do_task(fn -> Logger.metadata(job_id: job.id) job.worker.on_success(job.arguments) job.worker.on_teardown(job) end) - %{job | status: :completed, pid: nil, ref: nil} + %{job | status: :completed, pid: nil, ref: nil, timeout_ref: nil} end @doc """ - Handles Job Failure, Calls appropriate worker method and updates the job - status to :failed + Handles Job Failure, Calls appropriate worker method and schedules retry if possible, + otherwise updates the job status to :failed """ @spec handle_failure(job :: Que.Job.t(), error :: term) :: Que.Job.t() def handle_failure(job, error) do - Que.Helpers.log("Failed #{job}") + # Cancel timeout timer if it exists + if job.timeout_ref do + Process.cancel_timer(job.timeout_ref) + end - Que.Helpers.do_task(fn -> - Logger.metadata(job_id: job.id) - job.worker.on_failure(job.arguments, error) - job.worker.on_teardown(job) - end) + updated_job = schedule_retry(job, error) + + if updated_job.status == :scheduled do + Que.Helpers.log("Failed #{job}, retrying in #{retry_delay(job)}ms (attempt #{job.retry_count + 1}/#{job.max_retries})") + else + Que.Helpers.log("Failed #{job}, no more retries available") + + Que.Helpers.do_task(fn -> + Logger.metadata(job_id: job.id) + job.worker.on_failure(job.arguments, error) + job.worker.on_teardown(job) + end) + end - %{job | status: :failed, pid: nil, ref: nil} + %{updated_job | pid: nil, ref: nil, timeout_ref: nil} end end diff --git a/lib/que/persistence/mnesia/db.ex b/lib/que/persistence/mnesia/db.ex index 60bce48..bfd9ded 100644 --- a/lib/que/persistence/mnesia/db.ex +++ b/lib/que/persistence/mnesia/db.ex @@ -14,8 +14,8 @@ defmodule Que.Persistence.Mnesia.DB do defmodule Jobs do use Memento.Table, - attributes: [:id, :arguments, :worker, :status, :ref, :pid, :created_at, :updated_at], - index: [:worker, :status], + attributes: [:id, :arguments, :worker, :status, :ref, :pid, :created_at, :updated_at, :scheduled_at, :retry_count, :max_retries, :last_error, :timeout, :timeout_ref, :priority], + index: [:worker, :status, :priority], type: :ordered_set, autoincrement: true @@ -71,7 +71,10 @@ defmodule Que.Persistence.Mnesia.DB do run_query( {:or, {:==, :status, :queued}, - {:==, :status, :started} + {:or, + {:==, :status, :started}, + {:==, :status, :scheduled} + } } ) end @@ -85,7 +88,37 @@ defmodule Que.Persistence.Mnesia.DB do {:==, :worker, name}, {:or, {:==, :status, :queued}, - {:==, :status, :started} + {:or, + {:==, :status, :started}, + {:==, :status, :scheduled} + } + } + } + ) + end + + + @doc "Find ready scheduled Jobs" + def ready_scheduled_jobs do + current_time = NaiveDateTime.utc_now() + run_query( + {:and, + {:==, :status, :scheduled}, + {:=<, :scheduled_at, current_time} + } + ) + end + + + @doc "Find ready scheduled Jobs for worker" + def ready_scheduled_jobs(name) do + current_time = NaiveDateTime.utc_now() + run_query( + {:and, + {:==, :worker, name}, + {:and, + {:==, :status, :scheduled}, + {:=<, :scheduled_at, current_time} } } ) @@ -113,6 +146,88 @@ defmodule Que.Persistence.Mnesia.DB do end + @doc "Find Cancelled Jobs" + def cancelled_jobs do + run_query( + {:==, :status, :cancelled} + ) + end + + + @doc "Find Cancelled Jobs for worker" + def cancelled_jobs(name) do + run_query( + {:and, + {:==, :worker, name}, + {:==, :status, :cancelled} + } + ) + end + + + @doc "Find Cancellable Jobs (scheduled or queued)" + def cancellable_jobs do + run_query( + {:or, + {:==, :status, :scheduled}, + {:==, :status, :queued} + } + ) + end + + + @doc "Find Cancellable Jobs for worker" + def cancellable_jobs(name) do + run_query( + {:and, + {:==, :worker, name}, + {:or, + {:==, :status, :scheduled}, + {:==, :status, :queued} + } + } + ) + end + + + @doc "Find Retrying Jobs" + def retrying_jobs do + run_query( + {:==, :status, :retrying} + ) + end + + + @doc "Find Retrying Jobs for worker" + def retrying_jobs(name) do + run_query( + {:and, + {:==, :worker, name}, + {:==, :status, :retrying} + } + ) + end + + + @doc "Find Timeout Jobs" + def timeout_jobs do + run_query( + {:==, :status, :timeout} + ) + end + + + @doc "Find Timeout Jobs for worker" + def timeout_jobs(name) do + run_query( + {:and, + {:==, :worker, name}, + {:==, :status, :timeout} + } + ) + end + + @doc "Finds a Job in the DB" def find_job(job) do diff --git a/lib/que/persistence/mnesia/mnesia.ex b/lib/que/persistence/mnesia/mnesia.ex index 51d153f..963f65f 100644 --- a/lib/que/persistence/mnesia/mnesia.ex +++ b/lib/que/persistence/mnesia/mnesia.ex @@ -160,6 +160,36 @@ defmodule Que.Persistence.Mnesia do @doc false defdelegate failed(worker), to: @store, as: :failed_jobs + @doc false + defdelegate ready_scheduled, to: @store, as: :ready_scheduled_jobs + + @doc false + defdelegate ready_scheduled(worker), to: @store, as: :ready_scheduled_jobs + + @doc false + defdelegate cancelled, to: @store, as: :cancelled_jobs + + @doc false + defdelegate cancelled(worker), to: @store, as: :cancelled_jobs + + @doc false + defdelegate cancellable, to: @store, as: :cancellable_jobs + + @doc false + defdelegate cancellable(worker), to: @store, as: :cancellable_jobs + + @doc false + defdelegate retrying, to: @store, as: :retrying_jobs + + @doc false + defdelegate retrying(worker), to: @store, as: :retrying_jobs + + @doc false + defdelegate timeout, to: @store, as: :timeout_jobs + + @doc false + defdelegate timeout(worker), to: @store, as: :timeout_jobs + @doc false defdelegate find(job), to: @store, as: :find_job diff --git a/lib/que/persistence/persistence.ex b/lib/que/persistence/persistence.ex index 253205c..ca72d75 100644 --- a/lib/que/persistence/persistence.ex +++ b/lib/que/persistence/persistence.ex @@ -136,6 +136,78 @@ defmodule Que.Persistence do defdelegate failed(worker), to: @adapter + @doc """ + Returns scheduled `Que.Job`s that are ready to be executed. + """ + @callback ready_scheduled :: list(Que.Job.t) + defdelegate ready_scheduled, to: @adapter + + + @doc """ + Returns scheduled `Que.Job`s for the given worker that are ready to be executed. + """ + @callback ready_scheduled(worker :: Que.Worker.t) :: list(Que.Job.t) + defdelegate ready_scheduled(worker), to: @adapter + + + @doc """ + Returns cancelled `Que.Job`s from the database. + """ + @callback cancelled :: list(Que.Job.t) + defdelegate cancelled, to: @adapter + + + @doc """ + Returns cancelled `Que.Job`s for the given worker. + """ + @callback cancelled(worker :: Que.Worker.t) :: list(Que.Job.t) + defdelegate cancelled(worker), to: @adapter + + + @doc """ + Returns cancellable `Que.Job`s from the database. + + This includes all Jobs whose status is either `:scheduled` or `:queued`. + """ + @callback cancellable :: list(Que.Job.t) + defdelegate cancellable, to: @adapter + + + @doc """ + Returns cancellable `Que.Job`s for the given worker. + """ + @callback cancellable(worker :: Que.Worker.t) :: list(Que.Job.t) + defdelegate cancellable(worker), to: @adapter + + + @doc """ + Returns retrying `Que.Job`s from the database. + """ + @callback retrying :: list(Que.Job.t) + defdelegate retrying, to: @adapter + + + @doc """ + Returns retrying `Que.Job`s for the given worker. + """ + @callback retrying(worker :: Que.Worker.t) :: list(Que.Job.t) + defdelegate retrying(worker), to: @adapter + + + @doc """ + Returns timeout `Que.Job`s from the database. + """ + @callback timeout :: list(Que.Job.t) + defdelegate timeout, to: @adapter + + + @doc """ + Returns timeout `Que.Job`s for the given worker. + """ + @callback timeout(worker :: Que.Worker.t) :: list(Que.Job.t) + defdelegate timeout(worker), to: @adapter + + @doc """ diff --git a/lib/que/queue.ex b/lib/que/queue.ex index 869dba1..6498aee 100644 --- a/lib/que/queue.ex +++ b/lib/que/queue.ex @@ -43,6 +43,9 @@ defmodule Que.Queue do def process(%Que.Queue{running: running, worker: worker} = q) do Que.Worker.validate!(worker) + # First, promote any ready scheduled jobs to queued + q = promote_ready_scheduled_jobs(q) + if (length(running) < worker.concurrency) do case fetch(q) do {q, nil} -> @@ -66,16 +69,21 @@ defmodule Que.Queue do @doc """ - Adds one or more Jobs to the `queued` list + Adds one or more Jobs to the `queued` list, maintaining priority order """ @spec put(queue :: Que.Queue.t, jobs :: Que.Job.t | list(Que.Job.t)) :: Que.Queue.t def put(%Que.Queue{queued: queued} = q, jobs) when is_list(jobs) do - jobs = :queue.from_list(jobs) - %{ q | queued: :queue.join(queued, jobs) } + existing_jobs = :queue.to_list(queued) + all_jobs = existing_jobs ++ jobs + sorted_jobs = Enum.sort_by(all_jobs, & &1.priority, :desc) + %{ q | queued: :queue.from_list(sorted_jobs) } end def put(%Que.Queue{queued: queued} = q, job) do - %{ q | queued: :queue.in(job, queued) } + existing_jobs = :queue.to_list(queued) + all_jobs = existing_jobs ++ [job] + sorted_jobs = Enum.sort_by(all_jobs, & &1.priority, :desc) + %{ q | queued: :queue.from_list(sorted_jobs) } end @@ -83,12 +91,19 @@ defmodule Que.Queue do @doc """ Fetches the next Job in queue and returns a queue and Job tuple + Skips cancelled jobs automatically """ @spec fetch(queue :: Que.Queue.t) :: { Que.Queue.t, Que.Job.t | nil } def fetch(%Que.Queue{queued: queue} = q) do case :queue.out(queue) do - {{:value, job}, rest} -> { %{ q | queued: rest }, job } - {:empty, _} -> { q, nil } + {{:value, %Que.Job{status: :cancelled} = job}, rest} -> + # Skip cancelled jobs and update the database + Que.Persistence.update(job) + fetch(%{ q | queued: rest }) + {{:value, job}, rest} -> + { %{ q | queued: rest }, job } + {:empty, _} -> + { q, nil } end end @@ -182,4 +197,19 @@ defmodule Que.Queue do running end + + @doc """ + Promotes ready scheduled jobs to queued status and adds them to the queue + """ + @spec promote_ready_scheduled_jobs(queue :: Que.Queue.t) :: Que.Queue.t + defp promote_ready_scheduled_jobs(%Que.Queue{worker: worker} = q) do + ready_jobs = + worker + |> Que.Persistence.ready_scheduled() + |> Enum.map(&Que.Job.promote_if_ready/1) + |> Enum.map(&Que.Persistence.update/1) + + put(q, ready_jobs) + end + end diff --git a/lib/que/server.ex b/lib/que/server.ex index 3e01d6e..ec07a2f 100644 --- a/lib/que/server.ex +++ b/lib/que/server.ex @@ -49,8 +49,16 @@ defmodule Que.Server do # arguments. Use Que.add instead of directly calling this @doc false - def add(worker, arg) do - GenServer.call(via_worker(worker), {:add_job, worker, arg}) + def add(worker, arg, opts \\ []) do + GenServer.call(via_worker(worker), {:add_job, worker, arg, opts}) + end + + # Validates worker and creates a new scheduled job with the passed + # arguments. Use Que.add_scheduled instead of directly calling this + + @doc false + def add_scheduled(worker, arg, scheduled_at, opts \\ []) do + GenServer.call(via_worker(worker), {:add_scheduled_job, worker, arg, scheduled_at, opts}) end @@ -77,12 +85,12 @@ defmodule Que.Server do # Pushes a new Job to the queue and processes it @doc false - def handle_call({:add_job, worker, args}, _from, queue) do + def handle_call({:add_job, worker, args, opts}, _from, queue) do Que.Helpers.log("Queued new Job for #{ExUtils.Module.name(worker)}") job = worker - |> Que.Job.new(args) + |> Que.Job.new(args, opts) |> Que.Persistence.insert queue = @@ -93,6 +101,37 @@ defmodule Que.Server do {:reply, {:ok, job}, queue} end + # Backward compatibility - handle old API without opts + @doc false + def handle_call({:add_job, worker, args}, from, queue) do + handle_call({:add_job, worker, args, []}, from, queue) + end + + # Pushes a new scheduled Job to the queue + + @doc false + def handle_call({:add_scheduled_job, worker, args, scheduled_at, opts}, _from, queue) do + Que.Helpers.log("Scheduled new Job for #{ExUtils.Module.name(worker)} at #{scheduled_at}") + + job = + worker + |> Que.Job.new_scheduled(args, scheduled_at, opts) + |> Que.Persistence.insert + + queue = + queue + |> Que.Queue.put(job) + |> Que.Queue.process + + {:reply, {:ok, job}, queue} + end + + # Backward compatibility - handle old API without opts + @doc false + def handle_call({:add_scheduled_job, worker, args, scheduled_at}, from, queue) do + handle_call({:add_scheduled_job, worker, args, scheduled_at, []}, from, queue) + end + @@ -136,6 +175,30 @@ defmodule Que.Server do {:noreply, queue} end + # Job timeout - Kill job and handle timeout + + @doc false + def handle_info({:job_timeout, job_id}, queue) do + job = Que.Queue.find(queue, :id, job_id) + + if job && job.status == :started do + job = + job + |> Que.Job.handle_timeout() + |> Que.Persistence.update + + queue = + queue + |> Que.Queue.remove(job) + |> Que.Queue.process + + {:noreply, queue} + else + # Job already completed or not found, ignore timeout + {:noreply, queue} + end + end + diff --git a/lib/que/server_supervisor.ex b/lib/que/server_supervisor.ex index 2e8a29d..b9d699a 100644 --- a/lib/que/server_supervisor.ex +++ b/lib/que/server_supervisor.ex @@ -35,12 +35,22 @@ defmodule Que.ServerSupervisor do # If the server for the worker is running, add job to it. # If not, spawn a new server first and then add it. @doc false - def add(worker, args) do + def add(worker, args, opts \\ []) do unless Que.Server.exists?(worker) do start_server(worker) end - Que.Server.add(worker, args) + Que.Server.add(worker, args, opts) + end + + # Similar to add/2 but for scheduled jobs + @doc false + def add_scheduled(worker, args, scheduled_at, opts \\ []) do + unless Que.Server.exists?(worker) do + start_server(worker) + end + + Que.Server.add_scheduled(worker, args, scheduled_at, opts) end @doc false