Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,11 @@ add_ccf_static_library(
# CCF task system library
add_ccf_static_library(
ccf_tasks
SRCS ${CCF_DIR}/src/tasks/task_system.cpp ${CCF_DIR}/src/tasks/job_board.cpp
SRCS ${CCF_DIR}/src/tasks/task_system.cpp
${CCF_DIR}/src/tasks/job_board.cpp
${CCF_DIR}/src/tasks/ordered_tasks.cpp
${CCF_DIR}/src/tasks/fan_in_tasks.cpp
${CCF_DIR}/src/tasks/thread_manager.cpp
)

# Common test args for Python scripts starting up CCF networks
Expand Down Expand Up @@ -572,6 +574,7 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/ordered_tasks.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/delayed_tasks.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/fan_in_tasks.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/tasks_api.cpp
)
target_link_libraries(task_system_test PRIVATE ccf_tasks)

Expand Down
6 changes: 3 additions & 3 deletions src/tasks/basic_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ namespace ccf::tasks
using Fn = std::function<void()>;

Fn fn;
const std::string name;
const std::string name = "BasicTask";

BasicTask(const Fn& _fn, const std::string& s = "[Anon]") : fn(_fn), name(s)
BasicTask(const Fn& fn_, const std::string& s = "[Anon]") : fn(fn_), name(s)
{}

void do_task_implementation() override
{
fn();
}

std::string_view get_name() const override
const std::string& get_name() const override
{
return name;
}
Expand Down
18 changes: 8 additions & 10 deletions src/tasks/fan_in_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ namespace ccf::tasks
{
struct FanInTasks::PImpl
{
std::string name;
IJobBoard& job_board;
JobBoard& job_board;

// Synchronise access to pending_tasks and next_expected_task_index
std::mutex pending_tasks_mutex;
Expand Down Expand Up @@ -71,16 +70,16 @@ namespace ccf::tasks

FanInTasks::FanInTasks(
[[maybe_unused]] FanInTasks::Private force_private_constructor,
IJobBoard& job_board_,
const std::string& name_) :
pimpl(std::make_unique<FanInTasks::PImpl>(name_, job_board_))
JobBoard& job_board_) :
pimpl(std::make_unique<FanInTasks::PImpl>(job_board_))
{}

FanInTasks::~FanInTasks() = default;

std::string_view FanInTasks::get_name() const
const std::string& FanInTasks::get_name() const
{
return pimpl->name;
static const std::string name = "FanInTasks";
return name;
}

void FanInTasks::add_task(size_t task_index, Task task)
Expand Down Expand Up @@ -124,9 +123,8 @@ namespace ccf::tasks
}
}

std::shared_ptr<FanInTasks> FanInTasks::create(
IJobBoard& job_board_, const std::string& name_)
std::shared_ptr<FanInTasks> FanInTasks::create(JobBoard& job_board_)
{
return std::make_shared<FanInTasks>(Private{}, job_board_, name_);
return std::make_shared<FanInTasks>(Private{}, job_board_);
}
}
9 changes: 4 additions & 5 deletions src/tasks/fan_in_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache 2.0 License.
#pragma once

#include "tasks/job_board_interface.h"
#include "tasks/job_board.h"
#include "tasks/task.h"

#include <memory>
Expand All @@ -27,13 +27,12 @@ namespace ccf::tasks
};

public:
FanInTasks(Private, IJobBoard& job_board_, const std::string& name_);
FanInTasks(Private, JobBoard& job_board_);
~FanInTasks();

static std::shared_ptr<FanInTasks> create(
IJobBoard& job_board_, const std::string& name_ = "[FanIn]");
static std::shared_ptr<FanInTasks> create(JobBoard& job_board_);

std::string_view get_name() const override;
const std::string& get_name() const override;

void add_task(size_t task_index, Task task);
};
Expand Down
221 changes: 193 additions & 28 deletions src/tasks/job_board.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,217 @@
// Licensed under the Apache 2.0 License.
#include "tasks/job_board.h"

#include <chrono>
#include <map>

namespace ccf::tasks
{
void JobBoard::add_task(Task&& task)
struct Consumer
{
// Temporary structure
// NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members)
Task& task;
std::condition_variable& cv;
// NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members)
};

struct Delayed
{
struct DelayedTask
{
std::lock_guard<std::mutex> lock(mutex);
queue.emplace(std::move(task));
}
work_beacon.notify_work_available();
}
Task task;
std::optional<std::chrono::milliseconds> repeat = std::nullopt;
};

Task JobBoard::get_task()
using DelayedTasks = std::vector<DelayedTask>;

using DelayedTasksByTime =
std::map<std::chrono::milliseconds, DelayedTasks>;

std::atomic<std::chrono::milliseconds> total_elapsed =
std::chrono::milliseconds(0);

std::mutex tasks_mutex;
DelayedTasksByTime tasks;
};

struct JobBoard::PImpl
{
std::lock_guard<std::mutex> lock(mutex);
if (queue.empty())
std::mutex mutex;
std::queue<Task> pending_tasks;
std::vector<Consumer*> waiting_consumers;

Delayed delayed;

void add_task(Task&& task)
{
std::unique_lock<std::mutex> lock(mutex);

for (Consumer* consumer : waiting_consumers)
{
// NB: Although waiting_consumers is modified under lock, it's possible
// that another call to add_task arrives the notified thread wakes up
// and removes itself from this collection. In this case we must avoid
// overwriting a previously-assigned task.
if (consumer->task == nullptr)
{
consumer->task = std::move(task);
consumer->cv.notify_one();
return;
}
}

// There are no waiting_consumers currently, or none waiting for a task
pending_tasks.emplace(std::move(task));
}

Task get_task()
{
return nullptr;
using namespace std::chrono_literals;
return wait_for_task(0ms);
}

Task task = queue.front();
queue.pop();
return task;
Task wait_for_task(const std::chrono::milliseconds& timeout)
{
Task to_return = nullptr;

{
std::unique_lock<std::mutex> lock(mutex);

if (pending_tasks.empty())
{
std::condition_variable cv;
auto consumer = std::make_unique<Consumer>(to_return, cv);
waiting_consumers.push_back(consumer.get());

cv.wait_for(lock, timeout);

auto it = std::find(
waiting_consumers.begin(), waiting_consumers.end(), consumer.get());
waiting_consumers.erase(it);
}
else
{
to_return = pending_tasks.front();
pending_tasks.pop();
}
}
return to_return;
}

void add_timed_task(
Task task,
std::chrono::milliseconds initial_delay,
std::optional<std::chrono::milliseconds> periodic_delay)
{
std::lock_guard<std::mutex> lock(delayed.tasks_mutex);

const auto trigger_time = delayed.total_elapsed.load() + initial_delay;
delayed.tasks[trigger_time].emplace_back(task, periodic_delay);
}

void tick(std::chrono::milliseconds elapsed)
{
elapsed += delayed.total_elapsed.load();

{
std::lock_guard<std::mutex> lock(delayed.tasks_mutex);
auto end_it = delayed.tasks.upper_bound(elapsed);

Delayed::DelayedTasksByTime repeats;

for (auto it = delayed.tasks.begin(); it != end_it; ++it)
{
Delayed::DelayedTasks& ready = it->second;

for (Delayed::DelayedTask& delayed_task : ready)
{
// Don't schedule (or repeat) cancelled tasks
if (delayed_task.task->is_cancelled())
{
continue;
}

Task task_copy(delayed_task.task);
add_task(std::move(task_copy));
if (delayed_task.repeat.has_value())
{
repeats[elapsed + delayed_task.repeat.value()].emplace_back(
delayed_task);
}
}
}

delayed.tasks.erase(delayed.tasks.begin(), end_it);

for (auto&& [repeat_time, repeated_tasks] : repeats)
{
Delayed::DelayedTasks& delayed_tasks_at_time =
delayed.tasks[repeat_time];
delayed_tasks_at_time.insert(
delayed_tasks_at_time.end(),
repeated_tasks.begin(),
repeated_tasks.end());
}
}

delayed.total_elapsed.store(elapsed);
}
};

void JobBoard::add_timed_task(
Task task,
std::chrono::milliseconds initial_delay,
std::optional<std::chrono::milliseconds> periodic_delay)
{
pimpl->add_timed_task(std::move(task), initial_delay, periodic_delay);
}

bool JobBoard::empty()
JobBoard::JobBoard() : pimpl(std::make_unique<PImpl>()) {}

JobBoard::~JobBoard() = default;

void JobBoard::add_task(Task task)
{
std::lock_guard<std::mutex> lock(mutex);
return queue.empty();
pimpl->add_task(std::move(task));
}

Task JobBoard::wait_for_task(const std::chrono::milliseconds& timeout)
Task JobBoard::get_task()
{
using TClock = std::chrono::system_clock;
return pimpl->get_task();
}

const auto start = TClock::now();
const auto until = start + timeout;
Task JobBoard::wait_for_task(const std::chrono::milliseconds& timeout)
{
return pimpl->wait_for_task(timeout);
}

while (true)
JobBoard::Summary JobBoard::get_summary()
{
Summary summary{};
{
auto task = get_task();
if (task != nullptr || TClock::now() >= until)
{
return task;
}

work_beacon.wait_for_work_with_timeout(timeout);
std::lock_guard<std::mutex> lock(pimpl->mutex);
summary.pending_tasks = pimpl->pending_tasks.size();
summary.idle_workers = pimpl->waiting_consumers.size();
}
return summary;
}

void JobBoard::add_delayed_task(Task task, std::chrono::milliseconds delay)
{
add_timed_task(task, delay, std::nullopt);
}

void JobBoard::add_periodic_task(
Task task,
std::chrono::milliseconds initial_delay,
std::chrono::milliseconds repeat_period)
{
add_timed_task(task, initial_delay, repeat_period);
}

void JobBoard::tick(std::chrono::milliseconds elapsed)
{
pimpl->tick(elapsed);
}
}
Loading