diff --git a/SEFramework/SEFramework/Output/OutputRegistry.h b/SEFramework/SEFramework/Output/OutputRegistry.h index 83c5b0ae0..1a6a0d889 100644 --- a/SEFramework/SEFramework/Output/OutputRegistry.h +++ b/SEFramework/SEFramework/Output/OutputRegistry.h @@ -169,7 +169,7 @@ class OutputRegistry { return converter(source.getProperty(i)); }; } - Euclid::Table::Row::cell_type operator()(const SourceInterface& source) { + Euclid::Table::Row::cell_type operator()(const SourceInterface& source) const { return m_convert_func(source, index); } std::size_t index = 0; diff --git a/SEFramework/src/lib/Output/OutputRegistry.cpp b/SEFramework/src/lib/Output/OutputRegistry.cpp index 6cef389b1..cf284cdf4 100644 --- a/SEFramework/src/lib/Output/OutputRegistry.cpp +++ b/SEFramework/src/lib/Output/OutputRegistry.cpp @@ -49,21 +49,26 @@ auto OutputRegistry::getSourceToRowConverter(const std::vector& ena return [this, out_prop_list](const SourceInterface& source) { std::vector info_list {}; std::vector cell_values {}; + + const auto& property_to_names_map = m_property_to_names_map; + const auto& name_to_col_info_map = m_name_to_col_info_map; + const auto& name_to_converter_map = m_name_to_converter_map; + for (const auto& property : out_prop_list) { - if (m_property_to_names_map.count(property) == 0) { + if (property_to_names_map.count(property) == 0) { throw Elements::Exception() << "Missing column generator for " << property.name(); } - for (const auto& name : m_property_to_names_map.at(property)) { - auto& col_info = m_name_to_col_info_map.at(name); - info_list.emplace_back(name, m_name_to_converter_map.at(name).first, - col_info.unit, col_info.description); - cell_values.emplace_back(m_name_to_converter_map.at(name).second(source)); + for (const auto& name : property_to_names_map.at(property)) { + auto& col_info = name_to_col_info_map.at(name); + info_list.push_back(ColumnInfo::info_type {name, name_to_converter_map.at(name).first, + col_info.unit, col_info.description}); + cell_values.push_back(name_to_converter_map.at(name).second(source)); } } if (info_list.empty()) { throw Elements::Exception() << "The given configuration would not generate any output"; } - return Row {std::move(cell_values), std::make_shared(move(info_list))}; + return Row {std::move(cell_values), std::make_shared(std::move(info_list))}; }; } diff --git a/SEImplementation/SEImplementation/Measurement/MultithreadedMeasurement.h b/SEImplementation/SEImplementation/Measurement/MultithreadedMeasurement.h index 7e46a1390..3524303d9 100644 --- a/SEImplementation/SEImplementation/Measurement/MultithreadedMeasurement.h +++ b/SEImplementation/SEImplementation/Measurement/MultithreadedMeasurement.h @@ -40,11 +40,11 @@ class MultithreadedMeasurement : public Measurement { using SourceToRowConverter = std::function; MultithreadedMeasurement(SourceToRowConverter source_to_row, const std::shared_ptr& thread_pool, - unsigned max_queue_size) + unsigned /*max_queue_size*/) : m_source_to_row(source_to_row), m_thread_pool(thread_pool), m_group_counter(0), - m_input_done(false), m_abort_raised(false), m_semaphore(max_queue_size) {} + m_input_done(false), m_abort_raised(false) {} ~MultithreadedMeasurement() override; @@ -66,10 +66,8 @@ class MultithreadedMeasurement : public Measurement { int m_group_counter; std::atomic_bool m_input_done, m_abort_raised; - std::condition_variable m_new_output; std::list>> m_output_queue; - std::mutex m_output_queue_mutex; - Euclid::Semaphore m_semaphore; + mutable std::mutex m_output_queue_mutex; }; } diff --git a/SEImplementation/src/lib/Measurement/MultithreadedMeasurement.cpp b/SEImplementation/src/lib/Measurement/MultithreadedMeasurement.cpp index b29fdecff..b0d7046aa 100644 --- a/SEImplementation/src/lib/Measurement/MultithreadedMeasurement.cpp +++ b/SEImplementation/src/lib/Measurement/MultithreadedMeasurement.cpp @@ -34,7 +34,7 @@ static Elements::Logging logger = Elements::Logging::getLogger("Multithreading") MultithreadedMeasurement::~MultithreadedMeasurement() { - if (m_output_thread->joinable()) { + if (m_output_thread && m_output_thread->joinable()) { m_output_thread->join(); } } @@ -46,7 +46,9 @@ void MultithreadedMeasurement::startThreads() { void MultithreadedMeasurement::stopThreads() { m_input_done = true; m_thread_pool->block(); - m_output_thread->join(); + if (m_output_thread && m_output_thread->joinable()) { + m_output_thread->join(); + } logger.debug() << "All worker threads done!"; } @@ -81,6 +83,7 @@ void MultithreadedMeasurement::receiveSource(std::unique_ptr output_lock(m_output_queue_mutex); + std::lock_guard output_lock(m_output_queue_mutex); m_output_queue.emplace_back(order_number, std::move(source_group)); } - m_new_output.notify_one(); }; auto lambda_copyable = [lambda = std::make_shared(std::move(lambda))](){ (*lambda)(); }; m_thread_pool->submit(lambda_copyable); + ++m_group_counter; } @@ -118,23 +121,20 @@ void MultithreadedMeasurement::outputThreadStatic(MultithreadedMeasurement *meas void MultithreadedMeasurement::outputThreadLoop() { while (m_thread_pool->activeThreads() > 0) { - std::unique_lock output_lock(m_output_queue_mutex); - - // Wait for something in the output queue - if (m_output_queue.empty()) { - m_new_output.wait_for(output_lock, std::chrono::milliseconds(100)); - } + { + std::lock_guard output_lock(m_output_queue_mutex); - // Process the output queue - while (!m_output_queue.empty()) { - sendSource(std::move(m_output_queue.front().second)); - m_output_queue.pop_front(); - } + while (!m_output_queue.empty()) { + sendSource(std::move(m_output_queue.front().second)); + m_output_queue.pop_front(); + } - if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 && + if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 && m_output_queue.empty()) { - break; + break; + } } + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } }