Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion SEFramework/SEFramework/Output/OutputRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class OutputRegistry {
return converter(source.getProperty<PropertyType>(i));
};
}
Euclid::Table::Row::cell_type operator()(const SourceInterface& source) {
Euclid::Table::Row::cell_type operator()(const SourceInterface& source) const {
return m_convert_func(source, index);
}
std::size_t index = 0;
Expand Down
19 changes: 12 additions & 7 deletions SEFramework/src/lib/Output/OutputRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,26 @@ auto OutputRegistry::getSourceToRowConverter(const std::vector<std::string>& ena
return [this, out_prop_list](const SourceInterface& source) {
std::vector<ColumnInfo::info_type> info_list {};
std::vector<Row::cell_type> cell_values {};

const auto& property_to_names_map = m_property_to_names_map;
const auto& name_to_col_info_map = m_name_to_col_info_map;
const auto& name_to_converter_map = m_name_to_converter_map;

for (const auto& property : out_prop_list) {
if (m_property_to_names_map.count(property) == 0) {
if (property_to_names_map.count(property) == 0) {
throw Elements::Exception() << "Missing column generator for " << property.name();
}
for (const auto& name : m_property_to_names_map.at(property)) {
auto& col_info = m_name_to_col_info_map.at(name);
info_list.emplace_back(name, m_name_to_converter_map.at(name).first,
col_info.unit, col_info.description);
cell_values.emplace_back(m_name_to_converter_map.at(name).second(source));
for (const auto& name : property_to_names_map.at(property)) {
auto& col_info = name_to_col_info_map.at(name);
info_list.push_back(ColumnInfo::info_type {name, name_to_converter_map.at(name).first,
col_info.unit, col_info.description});
cell_values.push_back(name_to_converter_map.at(name).second(source));
}
}
if (info_list.empty()) {
throw Elements::Exception() << "The given configuration would not generate any output";
}
return Row {std::move(cell_values), std::make_shared<ColumnInfo>(move(info_list))};
return Row {std::move(cell_values), std::make_shared<ColumnInfo>(std::move(info_list))};
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class MultithreadedMeasurement : public Measurement {

using SourceToRowConverter = std::function<Euclid::Table::Row(const SourceInterface&)>;
MultithreadedMeasurement(SourceToRowConverter source_to_row, const std::shared_ptr<Euclid::ThreadPool>& thread_pool,
unsigned max_queue_size)
unsigned /*max_queue_size*/)
: m_source_to_row(source_to_row),
m_thread_pool(thread_pool),
m_group_counter(0),
m_input_done(false), m_abort_raised(false), m_semaphore(max_queue_size) {}
m_input_done(false), m_abort_raised(false) {}

~MultithreadedMeasurement() override;

Expand All @@ -66,10 +66,8 @@ class MultithreadedMeasurement : public Measurement {
int m_group_counter;
std::atomic_bool m_input_done, m_abort_raised;

std::condition_variable m_new_output;
std::list<std::pair<int, std::unique_ptr<SourceGroupInterface>>> m_output_queue;
std::mutex m_output_queue_mutex;
Euclid::Semaphore m_semaphore;
mutable std::mutex m_output_queue_mutex;
};

}
Expand Down
34 changes: 17 additions & 17 deletions SEImplementation/src/lib/Measurement/MultithreadedMeasurement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static Elements::Logging logger = Elements::Logging::getLogger("Multithreading")


MultithreadedMeasurement::~MultithreadedMeasurement() {
if (m_output_thread->joinable()) {
if (m_output_thread && m_output_thread->joinable()) {
m_output_thread->join();
}
}
Expand All @@ -46,7 +46,9 @@ void MultithreadedMeasurement::startThreads() {
void MultithreadedMeasurement::stopThreads() {
m_input_done = true;
m_thread_pool->block();
m_output_thread->join();
if (m_output_thread && m_output_thread->joinable()) {
m_output_thread->join();
}
logger.debug() << "All worker threads done!";
}

Expand Down Expand Up @@ -81,22 +83,23 @@ void MultithreadedMeasurement::receiveSource(std::unique_ptr<SourceGroupInterfac

// Put the new SourceGroup into the input queue
auto order_number = m_group_counter;

auto lambda = [this, order_number, source_group = std::move(source_group)]() mutable {
// Trigger measurements
for (auto& source : *source_group) {
m_source_to_row(source);
}
// Pass to the output thread
{
std::unique_lock<std::mutex> output_lock(m_output_queue_mutex);
std::lock_guard<std::mutex> output_lock(m_output_queue_mutex);
m_output_queue.emplace_back(order_number, std::move(source_group));
}
m_new_output.notify_one();
};
auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(std::move(lambda))](){
(*lambda)();
};
m_thread_pool->submit(lambda_copyable);

++m_group_counter;
}

Expand All @@ -118,23 +121,20 @@ void MultithreadedMeasurement::outputThreadStatic(MultithreadedMeasurement *meas

void MultithreadedMeasurement::outputThreadLoop() {
while (m_thread_pool->activeThreads() > 0) {
std::unique_lock<std::mutex> output_lock(m_output_queue_mutex);

// Wait for something in the output queue
if (m_output_queue.empty()) {
m_new_output.wait_for(output_lock, std::chrono::milliseconds(100));
}
{
std::lock_guard<std::mutex> output_lock(m_output_queue_mutex);

// Process the output queue
while (!m_output_queue.empty()) {
sendSource(std::move(m_output_queue.front().second));
m_output_queue.pop_front();
}
while (!m_output_queue.empty()) {
sendSource(std::move(m_output_queue.front().second));
m_output_queue.pop_front();
}

if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
m_output_queue.empty()) {
break;
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

Expand Down