Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions components/livekit/core/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
// MARK: - Constants
static const char* TAG = "livekit_engine";

/// Maximum time `engine_destroy` waits for the engine task to exit before
/// forcibly deleting it.
#define ENGINE_TASK_JOIN_TIMEOUT_MS 5000

// MARK: - Type definitions

/// Engine state machine state.
Expand All @@ -55,6 +59,7 @@ typedef enum {
EV_MAX_RETRIES_REACHED, /// Maximum number of retry attempts reached.
_EV_STATE_ENTER, /// State enter hook (internal).
_EV_STATE_EXIT, /// State exit hook (internal).
_EV_STOP, /// Wakes the engine task so it can observe shutdown (internal).
} engine_event_type_t;

/// An event processed by the engine state machine.
Expand Down Expand Up @@ -111,6 +116,7 @@ typedef struct {
session_state_t session;

TaskHandle_t task_handle;
SemaphoreHandle_t task_done_sem;
QueueHandle_t event_queue;
TimerHandle_t timer;
bool is_running;
Expand Down Expand Up @@ -1063,6 +1069,11 @@ static void engine_task(void *arg)

// Discard any remaining events in the queue before exiting.
flush_event_queue(eng);

// Signal that the task has finished so `engine_destroy` can join instead of
// deleting the task handle itself. Deleting from both sides races: the task
// self-deletes here while `engine_destroy` may still hold a now-stale handle.
xSemaphoreGive(eng->task_done_sem);
vTaskDelete(NULL);
}

Expand Down Expand Up @@ -1125,6 +1136,12 @@ engine_handle_t engine_init(const engine_options_t *options)
goto _init_failed;
}

// Created before the task so the task can always signal completion on exit.
eng->task_done_sem = xSemaphoreCreateBinary();
if (eng->task_done_sem == NULL) {
goto _init_failed;
}

if (xTaskCreate(
engine_task,
"engine_task",
Expand Down Expand Up @@ -1176,11 +1193,23 @@ engine_err_t engine_destroy(engine_handle_t handle)
engine_t *eng = (engine_t *)handle;
eng->is_running = false;
if (eng->task_handle != NULL) {
// TODO: Wait for disconnected state or timeout
vTaskDelay(pdMS_TO_TICKS(100));
vTaskDelete(eng->task_handle);
// The task may be blocked waiting for an event, so enqueue a stop event
// to wake it; it will then observe `is_running == false` and exit its loop.
event_enqueue(eng, &(engine_event_t){ .type = _EV_STOP }, true);

// Join the task rather than deleting it here: the task self-deletes via
// vTaskDelete(NULL), so deleting `task_handle` could act on a stale handle
// and crash. Only force deletion if the task fails to exit in time.
if (xSemaphoreTake(eng->task_done_sem, pdMS_TO_TICKS(ENGINE_TASK_JOIN_TIMEOUT_MS)) != pdTRUE) {
ESP_LOGW(TAG, "Engine task did not exit in time; forcing deletion");
vTaskDelete(eng->task_handle);
}
eng->task_handle = NULL;
}
if (eng->task_done_sem != NULL) {
vSemaphoreDelete(eng->task_done_sem);
eng->task_done_sem = NULL;
}
if (eng->timer != NULL) {
xTimerDelete(eng->timer, portMAX_DELAY);
eng->timer = NULL;
Expand Down
Loading