Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 93 additions & 87 deletions plugins/DataWriterModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
#include "dfmodules/CommonIssues.hpp"
#include "dfmodules/opmon/DataWriter.pb.h"

#include "confmodel/Application.hpp"
#include "confmodel/Session.hpp"
#include "appmodel/DataStoreConf.hpp"
#include "appmodel/DataWriterModule.hpp"
#include "appmodel/TRBModule.hpp"
#include "appmodel/DataStoreConf.hpp"
#include "confmodel/Application.hpp"
#include "confmodel/Connection.hpp"
#include "confmodel/Session.hpp"
#include "daqdataformats/Fragment.hpp"
#include "dfmessages/TriggerDecision.hpp"
#include "dfmessages/TriggerRecord_serialization.hpp"
#include "logging/Logging.hpp"
#include "iomanager/IOManager.hpp"
#include "logging/Logging.hpp"
#include "rcif/cmd/Nljs.hpp"

#include <algorithm>
Expand All @@ -34,7 +34,7 @@
/**
* @brief Name used by TRACE TLOG calls from this source file
*/
//#define TRACE_NAME "DataWriterModule" // NOLINT This is the default
// #define TRACE_NAME "DataWriterModule" // NOLINT This is the default
enum
{
TLVL_ENTER_EXIT_METHODS = 5,
Expand Down Expand Up @@ -77,18 +77,18 @@ DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
}
if (outputs.size() != 1) {
throw appfwk::CommandFailed(
ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size()));
ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size()));
}

m_module_configuration = mcfg;
m_data_writer_conf = mdal->get_configuration();
m_writer_identifier = mdal->get_writer_identifier();

if (inputs[0]->get_data_type() != datatype_to_string<std::unique_ptr<daqdataformats::TriggerRecord>>()) {
throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue");
throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue");
}
if (outputs[0]->get_data_type() != datatype_to_string<dfmessages::TriggerDecisionToken>()) {
throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue");
throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue");
}

m_trigger_record_connection = inputs[0]->UID();
Expand All @@ -102,7 +102,7 @@ DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
break;
}
if (mod->class_name() == "TRMonRequestorModule") {
is_trmon = true;
is_trmon = true;
break;
}
}
Expand All @@ -120,37 +120,39 @@ DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
}

// try to create the receiver to test the connection anyway
m_tr_receiver = iom -> get_receiver<std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);
m_tr_receiver = iom->get_receiver<std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);

m_token_output = iom->get_sender<dfmessages::TriggerDecisionToken>(outputs[0]->UID());

TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
}

void
DataWriterModule::generate_opmon_data() {
DataWriterModule::generate_opmon_data()
{

opmon::DataWriterInfo dwi;

dwi.set_records_received(m_records_received_tot.load());
// dwi.new_records_received = m_records_received.exchange(0);
// dwi.new_records_received = m_records_received.exchange(0);
dwi.set_records_written(m_records_written_tot.load());
dwi.set_new_records_written(m_records_written.exchange(0));
// dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
// dwi.new_bytes_output = m_bytes_output.exchange(0);
// dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
// dwi.new_bytes_output = m_bytes_output.exchange(0);
dwi.set_writing_time_us(m_writing_us.exchange(0));

publish(std::move(dwi));
}

void
DataWriterModule::do_conf(const CommandData_t&)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";

m_data_storage_prescale = m_data_writer_conf->get_data_storage_prescale();
TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_storage_prescale is " << m_data_storage_prescale;
TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are " << m_data_writer_conf->get_data_store_params();
TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are "
<< m_data_writer_conf->get_data_store_params();
m_min_write_retry_time_usec = m_data_writer_conf->get_min_write_retry_time_ms() * 1000;
if (m_min_write_retry_time_usec < 1) {
m_min_write_retry_time_usec = 1;
Expand All @@ -162,7 +164,8 @@ DataWriterModule::do_conf(const CommandData_t&)
try {
m_data_writer = make_data_store(m_data_writer_conf->get_data_store_params()->get_type(),
m_data_writer_conf->get_data_store_params()->UID(),
m_module_configuration, m_writer_identifier);
m_module_configuration,
m_writer_identifier);
register_node("data_writer", m_data_writer);
} catch (const ers::Issue& excpt) {
throw UnableToConfigure(ERS_HERE, get_name(), excpt);
Expand All @@ -180,7 +183,7 @@ void
DataWriterModule::do_start(const CommandData_t& payload)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";

rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
m_data_storage_is_enabled = (!start_params.disable_data_storage);
m_run_number = start_params.run;
Expand All @@ -204,7 +207,7 @@ DataWriterModule::do_start(const CommandData_t& payload)
std::this_thread::sleep_for(std::chrono::microseconds(5000));
}
} while (wasSentSuccessfully);

// 04-Feb-2021, KAB: added this call to allow DataStore to prepare for the run.
// I've put this call fairly early in this method because it could throw an
// exception and abort the run start. And, it seems sensible to avoid starting
Expand All @@ -217,7 +220,7 @@ DataWriterModule::do_start(const CommandData_t& payload)
// in case the "start" has been called before the "conf"
ers::fatal(InvalidDataWriterModule(ERS_HERE, get_name()));
}

try {
m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST"));
} catch (const ers::Issue& excpt) {
Expand All @@ -226,7 +229,7 @@ DataWriterModule::do_start(const CommandData_t& payload)
}

m_seqno_counts.clear();

m_records_received = 0;
m_records_received_tot = 0;
m_records_written = 0;
Expand All @@ -237,8 +240,8 @@ DataWriterModule::do_start(const CommandData_t& payload)
m_running.store(true);

m_thread.start_working_thread(get_name());
//iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection,
// bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );
// iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
// m_trigger_record_connection, bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );

TLOG() << get_name() << " successfully started for run number " << m_run_number;
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
Expand All @@ -250,8 +253,9 @@ DataWriterModule::do_stop(const CommandData_t& /*args*/)
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";

m_running.store(false);
m_thread.stop_working_thread();
//iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection );
m_thread.stop_working_thread();
// iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
// m_trigger_record_connection );

// 04-Feb-2021, KAB: added this call to allow DataStore to finish up with this run.
// I've put this call fairly late in this method so that any draining of queues
Expand Down Expand Up @@ -280,21 +284,24 @@ DataWriterModule::do_scrap(const CommandData_t& /*payload*/)
}

void
DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord> & trigger_record_ptr)
DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord>& trigger_record_ptr)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": receiving a new TR ptr";

++m_records_received;
++m_records_received_tot;
TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Obtained the TriggerRecord for trigger number "
<< trigger_record_ptr->get_header_ref().get_trigger_number() << "."
<< trigger_record_ptr->get_header_ref().get_sequence_number()
<< ", run number " << trigger_record_ptr->get_header_ref().get_run_number()
<< " off the input connection";
<< trigger_record_ptr->get_header_ref().get_trigger_number() << "."
<< trigger_record_ptr->get_header_ref().get_sequence_number() << ", run number "
<< trigger_record_ptr->get_header_ref().get_run_number() << " off the input connection";

if (trigger_record_ptr->get_header_ref().get_run_number() != m_run_number) {
ers::error(InvalidRunNumber(ERS_HERE, get_name(), "TriggerRecord", trigger_record_ptr->get_header_ref().get_run_number(),
m_run_number, trigger_record_ptr->get_header_ref().get_trigger_number(),
ers::error(InvalidRunNumber(ERS_HERE,
get_name(),
"TriggerRecord",
trigger_record_ptr->get_header_ref().get_run_number(),
m_run_number,
trigger_record_ptr->get_header_ref().get_trigger_number(),
trigger_record_ptr->get_header_ref().get_sequence_number()));
return;
}
Expand All @@ -304,50 +311,50 @@ DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::Trigger
// instead of zero, since I think that it would be nice to always get the first event
// written out.
if (m_data_storage_prescale <= 1 || ((m_records_received_tot.load() % m_data_storage_prescale) == 1)) {

if (m_data_storage_is_enabled) {

std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();

bool should_retry = true;
size_t retry_wait_usec = m_min_write_retry_time_usec;
do {
should_retry = false;
try {
m_data_writer->write(*trigger_record_ptr);
++m_records_written;
++m_records_written_tot;
m_bytes_output += trigger_record_ptr->get_total_size_bytes();
m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes();
} catch (const RetryableDataStoreProblem& excpt) {
should_retry = true;
ers::error(DataWritingProblem(ERS_HERE,
get_name(),
trigger_record_ptr->get_header_ref().get_trigger_number(),
trigger_record_ptr->get_header_ref().get_sequence_number(),
trigger_record_ptr->get_header_ref().get_run_number(),
excpt));
if (retry_wait_usec > m_max_write_retry_time_usec) {
retry_wait_usec = m_max_write_retry_time_usec;
}
usleep(retry_wait_usec);
retry_wait_usec *= m_write_retry_time_increase_factor;
} catch (const std::exception& excpt) {
ers::error(DataWritingProblem(ERS_HERE,
get_name(),
trigger_record_ptr->get_header_ref().get_trigger_number(),
trigger_record_ptr->get_header_ref().get_sequence_number(),
trigger_record_ptr->get_header_ref().get_run_number(),
excpt));
}
should_retry = false;
try {
m_data_writer->write(*trigger_record_ptr);
++m_records_written;
++m_records_written_tot;
m_bytes_output += trigger_record_ptr->get_total_size_bytes();
m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes();
} catch (const RetryableDataStoreProblem& excpt) {
should_retry = true;
ers::error(DataWritingProblem(ERS_HERE,
get_name(),
trigger_record_ptr->get_header_ref().get_trigger_number(),
trigger_record_ptr->get_header_ref().get_sequence_number(),
trigger_record_ptr->get_header_ref().get_run_number(),
excpt));
if (retry_wait_usec > m_max_write_retry_time_usec) {
retry_wait_usec = m_max_write_retry_time_usec;
}
usleep(retry_wait_usec);
retry_wait_usec *= m_write_retry_time_increase_factor;
} catch (const std::exception& excpt) {
ers::error(DataWritingProblem(ERS_HERE,
get_name(),
trigger_record_ptr->get_header_ref().get_trigger_number(),
trigger_record_ptr->get_header_ref().get_sequence_number(),
trigger_record_ptr->get_header_ref().get_run_number(),
excpt));
}
} while (should_retry && m_running.load());

std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now();
auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
m_writing_us += writing_time.count();
} // if m_data_storage_is_enabled
}

bool send_trigger_complete_message = m_running.load();
if (trigger_record_ptr->get_header_ref().get_max_sequence_number() > 0) {
daqdataformats::trigger_number_t trigno = trigger_record_ptr->get_header_ref().get_trigger_number();
Expand All @@ -362,50 +369,49 @@ DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::Trigger
m_seqno_counts.erase(trigno);
} else {
// Using const .count and .at to avoid reintroducing element to map
TLOG_DEBUG(TLVL_SEQNO_MAP_CONTENTS) << get_name() << ": the sequence number count for trigger number " << trigno
<< " is " << (m_seqno_counts.count(trigno) ? m_seqno_counts.at(trigno) : 0) << " (number of entries "
<< "in the seqno map is " << m_seqno_counts.size() << ").";
TLOG_DEBUG(TLVL_SEQNO_MAP_CONTENTS)
<< get_name() << ": the sequence number count for trigger number " << trigno << " is "
<< (m_seqno_counts.count(trigno) ? m_seqno_counts.at(trigno) : 0) << " (number of entries "
<< "in the seqno map is " << m_seqno_counts.size() << ").";
send_trigger_complete_message = false;
}
}
if (send_trigger_complete_message) {
TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Pushing the TriggerDecisionToken for trigger number "
<< trigger_record_ptr->get_header_ref().get_trigger_number()
<< " onto the relevant output queue";
<< trigger_record_ptr->get_header_ref().get_trigger_number()
<< " onto the relevant output queue";
dfmessages::TriggerDecisionToken token;
token.run_number = m_run_number;
token.trigger_number = trigger_record_ptr->get_header_ref().get_trigger_number();
token.decision_destination = m_trigger_decision_connection;

bool wasSentSuccessfully = false;
do {
do {
try {
m_token_output -> send( std::move(token), m_queue_timeout );
wasSentSuccessfully = true;
m_token_output->send(std::move(token), m_queue_timeout);
wasSentSuccessfully = true;
} catch (const ers::Issue& excpt) {
std::ostringstream oss_warn;
oss_warn << "Send with sender \"" << m_token_output -> get_name() << "\" failed";
ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
std::ostringstream oss_warn;
oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed";
ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
}
} while (!wasSentSuccessfully && m_running.load());

}

TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": operations completed for TR";
} // NOLINT(readability/fn_size)

void
DataWriterModule::do_work(std::atomic<bool>& running_flag) {
DataWriterModule::do_work(std::atomic<bool>& running_flag)
{
while (running_flag.load()) {
try {
std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver-> receive(std::chrono::milliseconds(10));
receive_trigger_record(tr);
}
catch(const iomanager::TimeoutExpired& excpt) {
}
catch(const ers::Issue & excpt) {
ers::warning(excpt);
}
try {
std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver->receive(std::chrono::milliseconds(10));
receive_trigger_record(tr);
} catch (const iomanager::TimeoutExpired& excpt) {
} catch (const ers::Issue& excpt) {
ers::warning(excpt);
}
}
}

Expand Down
10 changes: 4 additions & 6 deletions plugins/DataWriterModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "dfmessages/TriggerDecisionToken.hpp"
#include "iomanager/Receiver.hpp"
#include "iomanager/Sender.hpp"
#include "utilities/WorkerThread.hpp"
#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue won't work.
#include "utilities/WorkerThread.hpp"

#include <chrono>
#include <map>
Expand Down Expand Up @@ -98,7 +98,6 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule
std::atomic<uint64_t> m_bytes_output_tot = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_writing_us = { 0 }; // NOLINT(build/unsigned)


// Other
std::map<daqdataformats::trigger_number_t, size_t> m_seqno_counts;

Expand Down Expand Up @@ -129,10 +128,9 @@ ERS_DECLARE_ISSUE_BASE(dfmodules,
ERS_DECLARE_ISSUE_BASE(dfmodules,
InvalidRunNumber,
appfwk::GeneralDAQModuleIssue,
"An invalid run number was received in a "
<< msg_type << " message, "
<< "received=" << received << ", expected=" << expected << ", trig/seq_number=" << trnum << "."
<< seqnum,
"An invalid run number was received in a " << msg_type << " message, " << "received=" << received
<< ", expected=" << expected
<< ", trig/seq_number=" << trnum << "." << seqnum,
((std::string)name),
((std::string)msg_type)((size_t)received)((size_t)expected)((size_t)trnum)((size_t)seqnum))

Expand Down