diff --git a/plugins/DataWriterModule.cpp b/plugins/DataWriterModule.cpp index b85ed63..96b3912 100644 --- a/plugins/DataWriterModule.cpp +++ b/plugins/DataWriterModule.cpp @@ -10,17 +10,17 @@ #include "dfmodules/CommonIssues.hpp" #include "dfmodules/opmon/DataWriter.pb.h" -#include "confmodel/Application.hpp" -#include "confmodel/Session.hpp" +#include "appmodel/DataStoreConf.hpp" #include "appmodel/DataWriterModule.hpp" #include "appmodel/TRBModule.hpp" -#include "appmodel/DataStoreConf.hpp" +#include "confmodel/Application.hpp" #include "confmodel/Connection.hpp" +#include "confmodel/Session.hpp" #include "daqdataformats/Fragment.hpp" #include "dfmessages/TriggerDecision.hpp" #include "dfmessages/TriggerRecord_serialization.hpp" -#include "logging/Logging.hpp" #include "iomanager/IOManager.hpp" +#include "logging/Logging.hpp" #include "rcif/cmd/Nljs.hpp" #include @@ -34,7 +34,7 @@ /** * @brief Name used by TRACE TLOG calls from this source file */ -//#define TRACE_NAME "DataWriterModule" // NOLINT This is the default +// #define TRACE_NAME "DataWriterModule" // NOLINT This is the default enum { TLVL_ENTER_EXIT_METHODS = 5, @@ -77,7 +77,7 @@ DataWriterModule::init(std::shared_ptr mcfg) } if (outputs.size() != 1) { throw appfwk::CommandFailed( - ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size())); + ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size())); } m_module_configuration = mcfg; @@ -85,10 +85,10 @@ DataWriterModule::init(std::shared_ptr mcfg) m_writer_identifier = mdal->get_writer_identifier(); if (inputs[0]->get_data_type() != datatype_to_string>()) { - throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue"); + throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue"); } if (outputs[0]->get_data_type() != datatype_to_string()) { - throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue"); + throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue"); } m_trigger_record_connection = inputs[0]->UID(); @@ -102,7 +102,7 @@ DataWriterModule::init(std::shared_ptr mcfg) break; } if (mod->class_name() == "TRMonRequestorModule") { - is_trmon = true; + is_trmon = true; break; } } @@ -120,29 +120,30 @@ DataWriterModule::init(std::shared_ptr mcfg) } // try to create the receiver to see test the connection anyway - m_tr_receiver = iom -> get_receiver>(m_trigger_record_connection); + m_tr_receiver = iom->get_receiver>(m_trigger_record_connection); m_token_output = iom->get_sender(outputs[0]->UID()); - + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; } void -DataWriterModule::generate_opmon_data() { +DataWriterModule::generate_opmon_data() +{ opmon::DataWriterInfo dwi; dwi.set_records_received(m_records_received_tot.load()); -// dwi.new_records_received = m_records_received.exchange(0); + // dwi.new_records_received = m_records_received.exchange(0); dwi.set_records_written(m_records_written_tot.load()); dwi.set_new_records_written(m_records_written.exchange(0)); -// dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage -// dwi.new_bytes_output = m_bytes_output.exchange(0); + // dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage + // dwi.new_bytes_output = m_bytes_output.exchange(0); dwi.set_writing_time_us(m_writing_us.exchange(0)); publish(std::move(dwi)); } - + void DataWriterModule::do_conf(const CommandData_t&) { @@ -150,7 +151,8 @@ DataWriterModule::do_conf(const CommandData_t&) m_data_storage_prescale = m_data_writer_conf->get_data_storage_prescale(); TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_storage_prescale is " << m_data_storage_prescale; - TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are " << m_data_writer_conf->get_data_store_params(); + TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are " + << m_data_writer_conf->get_data_store_params(); m_min_write_retry_time_usec = m_data_writer_conf->get_min_write_retry_time_ms() * 1000; if (m_min_write_retry_time_usec < 1) { m_min_write_retry_time_usec = 1; @@ -162,7 +164,8 @@ DataWriterModule::do_conf(const CommandData_t&) try { m_data_writer = make_data_store(m_data_writer_conf->get_data_store_params()->get_type(), m_data_writer_conf->get_data_store_params()->UID(), - m_module_configuration, m_writer_identifier); + m_module_configuration, + m_writer_identifier); register_node("data_writer", m_data_writer); } catch (const ers::Issue& excpt) { throw UnableToConfigure(ERS_HERE, get_name(), excpt); @@ -180,7 +183,7 @@ void DataWriterModule::do_start(const CommandData_t& payload) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; - + rcif::cmd::StartParams start_params = payload.get(); m_data_storage_is_enabled = (!start_params.disable_data_storage); m_run_number = start_params.run; @@ -204,7 +207,7 @@ DataWriterModule::do_start(const CommandData_t& payload) std::this_thread::sleep_for(std::chrono::microseconds(5000)); } } while (wasSentSuccessfully); - + // 04-Feb-2021, KAB: added this call to allow DataStore to prepare for the run. // I've put this call fairly early in this method because it could throw an // exception and abort the run start. And, it seems sensible to avoid starting @@ -217,7 +220,7 @@ DataWriterModule::do_start(const CommandData_t& payload) // in case the "start" has been called before the "conf" ers::fatal(InvalidDataWriterModule(ERS_HERE, get_name())); } - + try { m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST")); } catch (const ers::Issue& excpt) { @@ -226,7 +229,7 @@ DataWriterModule::do_start(const CommandData_t& payload) } m_seqno_counts.clear(); - + m_records_received = 0; m_records_received_tot = 0; m_records_written = 0; @@ -237,8 +240,8 @@ DataWriterModule::do_start(const CommandData_t& payload) m_running.store(true); m_thread.start_working_thread(get_name()); - //iomanager::IOManager::get()->add_callback>( m_trigger_record_connection, - // bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) ); + // iomanager::IOManager::get()->add_callback>( + // m_trigger_record_connection, bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) ); TLOG() << get_name() << " successfully started for run number " << m_run_number; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method"; @@ -250,8 +253,9 @@ DataWriterModule::do_stop(const CommandData_t& /*args*/) TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; m_running.store(false); - m_thread.stop_working_thread(); - //iomanager::IOManager::get()->remove_callback>( m_trigger_record_connection ); + m_thread.stop_working_thread(); + // iomanager::IOManager::get()->remove_callback>( + // m_trigger_record_connection ); // 04-Feb-2021, KAB: added this call to allow DataStore to finish up with this run. // I've put this call fairly late in this method so that any draining of queues @@ -280,21 +284,24 @@ DataWriterModule::do_scrap(const CommandData_t& /*payload*/) } void -DataWriterModule::receive_trigger_record(std::unique_ptr & trigger_record_ptr) +DataWriterModule::receive_trigger_record(std::unique_ptr& trigger_record_ptr) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": receiving a new TR ptr"; ++m_records_received; ++m_records_received_tot; TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Obtained the TriggerRecord for trigger number " - << trigger_record_ptr->get_header_ref().get_trigger_number() << "." - << trigger_record_ptr->get_header_ref().get_sequence_number() - << ", run number " << trigger_record_ptr->get_header_ref().get_run_number() - << " off the input connection"; + << trigger_record_ptr->get_header_ref().get_trigger_number() << "." + << trigger_record_ptr->get_header_ref().get_sequence_number() << ", run number " + << trigger_record_ptr->get_header_ref().get_run_number() << " off the input connection"; if (trigger_record_ptr->get_header_ref().get_run_number() != m_run_number) { - ers::error(InvalidRunNumber(ERS_HERE, get_name(), "TriggerRecord", trigger_record_ptr->get_header_ref().get_run_number(), - m_run_number, trigger_record_ptr->get_header_ref().get_trigger_number(), + ers::error(InvalidRunNumber(ERS_HERE, + get_name(), + "TriggerRecord", + trigger_record_ptr->get_header_ref().get_run_number(), + m_run_number, + trigger_record_ptr->get_header_ref().get_trigger_number(), trigger_record_ptr->get_header_ref().get_sequence_number())); return; } @@ -304,42 +311,42 @@ DataWriterModule::receive_trigger_record(std::unique_ptrwrite(*trigger_record_ptr); - ++m_records_written; - ++m_records_written_tot; - m_bytes_output += trigger_record_ptr->get_total_size_bytes(); - m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes(); - } catch (const RetryableDataStoreProblem& excpt) { - should_retry = true; - ers::error(DataWritingProblem(ERS_HERE, - get_name(), - trigger_record_ptr->get_header_ref().get_trigger_number(), - trigger_record_ptr->get_header_ref().get_sequence_number(), - trigger_record_ptr->get_header_ref().get_run_number(), - excpt)); - if (retry_wait_usec > m_max_write_retry_time_usec) { - retry_wait_usec = m_max_write_retry_time_usec; - } - usleep(retry_wait_usec); - retry_wait_usec *= m_write_retry_time_increase_factor; - } catch (const std::exception& excpt) { - ers::error(DataWritingProblem(ERS_HERE, - get_name(), - trigger_record_ptr->get_header_ref().get_trigger_number(), - trigger_record_ptr->get_header_ref().get_sequence_number(), - trigger_record_ptr->get_header_ref().get_run_number(), - excpt)); - } + should_retry = false; + try { + m_data_writer->write(*trigger_record_ptr); + ++m_records_written; + ++m_records_written_tot; + m_bytes_output += trigger_record_ptr->get_total_size_bytes(); + m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes(); + } catch (const RetryableDataStoreProblem& excpt) { + should_retry = true; + ers::error(DataWritingProblem(ERS_HERE, + get_name(), + trigger_record_ptr->get_header_ref().get_trigger_number(), + trigger_record_ptr->get_header_ref().get_sequence_number(), + trigger_record_ptr->get_header_ref().get_run_number(), + excpt)); + if (retry_wait_usec > m_max_write_retry_time_usec) { + retry_wait_usec = m_max_write_retry_time_usec; + } + usleep(retry_wait_usec); + retry_wait_usec *= m_write_retry_time_increase_factor; + } catch (const std::exception& excpt) { + ers::error(DataWritingProblem(ERS_HERE, + get_name(), + trigger_record_ptr->get_header_ref().get_trigger_number(), + trigger_record_ptr->get_header_ref().get_sequence_number(), + trigger_record_ptr->get_header_ref().get_run_number(), + excpt)); + } } while (should_retry && m_running.load()); std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); @@ -347,7 +354,7 @@ DataWriterModule::receive_trigger_record(std::unique_ptrget_header_ref().get_max_sequence_number() > 0) { daqdataformats::trigger_number_t trigno = trigger_record_ptr->get_header_ref().get_trigger_number(); @@ -362,50 +369,49 @@ DataWriterModule::receive_trigger_record(std::unique_ptrget_header_ref().get_trigger_number() - << " onto the relevant output queue"; + << trigger_record_ptr->get_header_ref().get_trigger_number() + << " onto the relevant output queue"; dfmessages::TriggerDecisionToken token; token.run_number = m_run_number; token.trigger_number = trigger_record_ptr->get_header_ref().get_trigger_number(); token.decision_destination = m_trigger_decision_connection; bool wasSentSuccessfully = false; - do { + do { try { - m_token_output -> send( std::move(token), m_queue_timeout ); - wasSentSuccessfully = true; + m_token_output->send(std::move(token), m_queue_timeout); + wasSentSuccessfully = true; } catch (const ers::Issue& excpt) { - std::ostringstream oss_warn; - oss_warn << "Send with sender \"" << m_token_output -> get_name() << "\" failed"; - ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt)); + std::ostringstream oss_warn; + oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed"; + ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt)); } } while (!wasSentSuccessfully && m_running.load()); - } - + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": operations completed for TR"; } // NOLINT(readability/fn_size) void -DataWriterModule::do_work(std::atomic& running_flag) { +DataWriterModule::do_work(std::atomic& running_flag) +{ while (running_flag.load()) { - try { - std::unique_ptr tr = m_tr_receiver-> receive(std::chrono::milliseconds(10)); - receive_trigger_record(tr); - } - catch(const iomanager::TimeoutExpired& excpt) { - } - catch(const ers::Issue & excpt) { - ers::warning(excpt); - } + try { + std::unique_ptr tr = m_tr_receiver->receive(std::chrono::milliseconds(10)); + receive_trigger_record(tr); + } catch (const iomanager::TimeoutExpired& excpt) { + } catch (const ers::Issue& excpt) { + ers::warning(excpt); + } } } diff --git a/plugins/DataWriterModule.hpp b/plugins/DataWriterModule.hpp index 3a4ebbd..5412710 100644 --- a/plugins/DataWriterModule.hpp +++ b/plugins/DataWriterModule.hpp @@ -17,8 +17,8 @@ #include "dfmessages/TriggerDecisionToken.hpp" #include "iomanager/Receiver.hpp" #include "iomanager/Sender.hpp" -#include "utilities/WorkerThread.hpp" #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG< #include @@ -98,7 +98,6 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule std::atomic m_bytes_output_tot = { 0 }; // NOLINT(build/unsigned) std::atomic m_writing_us = { 0 }; // NOLINT(build/unsigned) - // Other std::map m_seqno_counts; @@ -129,10 +128,9 @@ ERS_DECLARE_ISSUE_BASE(dfmodules, ERS_DECLARE_ISSUE_BASE(dfmodules, InvalidRunNumber, appfwk::GeneralDAQModuleIssue, - "An invalid run number was received in a " - << msg_type << " message, " - << "received=" << received << ", expected=" << expected << ", trig/seq_number=" << trnum << "." - << seqnum, + "An invalid run number was received in a " << msg_type << " message, " << "received=" << received + << ", expected=" << expected + << ", trig/seq_number=" << trnum << "." << seqnum, ((std::string)name), ((std::string)msg_type)((size_t)received)((size_t)expected)((size_t)trnum)((size_t)seqnum))