Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions SEFramework/SEFramework/Pipeline/SourceGrouping.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class SourceGrouping : public SourceGroupingInterface {
std::shared_ptr<SourceGroupFactory> m_group_factory;
std::list<std::unique_ptr<SourceGroupInterface>> m_source_groups;
unsigned int m_hard_limit;
unsigned int m_total_sources_waiting = 0;

}; /* End of SourceGrouping class */

Expand Down
13 changes: 13 additions & 0 deletions SEFramework/src/lib/Pipeline/SourceGrouping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@
*/

#include "SEFramework/Pipeline/SourceGrouping.h"
#include <ElementsKernel/Logging.h>
#include <vector>


namespace SourceXtractor {

static Elements::Logging logger = Elements::Logging::getLogger("SourceGrouping");


SourceGrouping::SourceGrouping(std::shared_ptr<GroupingCriteria> grouping_criteria,
std::shared_ptr<SourceGroupFactory> group_factory,
unsigned int hard_limit)
: m_grouping_criteria(grouping_criteria), m_group_factory(group_factory), m_hard_limit(hard_limit) {
}

void SourceGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
m_total_sources_waiting++;
logger.debug() << "Receiving source, sources in grouping: " << m_total_sources_waiting;

// Pointer which points to the group of the source
SourceGroupInterface* matched_group = nullptr;

Expand Down Expand Up @@ -87,6 +94,8 @@ void SourceGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
void SourceGrouping::receiveProcessSignal(const ProcessSourcesEvent& process_event) {
std::vector<std::list<std::unique_ptr<SourceGroupInterface>>::iterator> groups_to_process;

logger.debug() << "Received processing signal, total sources waiting in grouping: " << m_total_sources_waiting;

// We iterate through all the SourceGroups we have
for (auto group_it = m_source_groups.begin(); group_it != m_source_groups.end(); ++group_it) {
// We look at its Sources and if we find at least one that needs to be processed we put it in groups_to_process
Expand All @@ -101,9 +110,13 @@ void SourceGrouping::receiveProcessSignal(const ProcessSourcesEvent& process_eve
// For each SourceGroup that we put in groups_to_process,
for (auto& group : groups_to_process) {
// we remove it from our list of stored SourceGroups and notify our observers
m_total_sources_waiting -= (*group)->size();
logger.debug() << "Sending group size " << (*group)->size() << ", sources remaining in grouping: " << m_total_sources_waiting;
sendSource(std::move(*group));
m_source_groups.erase(group);
}

logger.debug() << "Processing signal handled, total sources remaining in grouping: " << m_total_sources_waiting;
}

std::set<PropertyId> SourceGrouping::requiredProperties() const {
Expand Down
7 changes: 5 additions & 2 deletions SEFramework/src/lib/Source/EntangledSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

#include "SEFramework/Source/SourceGroupWithOnDemandProperties.h"
#include "SEFramework/Task/GroupTask.h"
#include "ElementsKernel/Logging.h"

namespace SourceXtractor {

static Elements::Logging logger = Elements::Logging::getLogger("EntangledSource");

SourceGroupWithOnDemandProperties::EntangledSource::EntangledSource(std::shared_ptr<SourceInterface> source, SourceGroupWithOnDemandProperties& group)
: m_source(source), m_group(group) {
// Normally, it should not be possible that the given source is of type
Expand All @@ -38,7 +41,6 @@ SourceGroupWithOnDemandProperties::EntangledSource::EntangledSource(std::shared_
}

const Property& SourceGroupWithOnDemandProperties::EntangledSource::getProperty(const PropertyId& property_id) const {

// If we already have the property stored in this object, returns it
if (m_property_holder.isPropertySet(property_id)) {
return m_property_holder.getProperty(property_id);
Expand All @@ -63,7 +65,8 @@ const Property& SourceGroupWithOnDemandProperties::EntangledSource::getProperty(
throw PropertyNotFoundException(property_id);
}

// Use the task to make the property
//logger.debug() << "Computing property " << property_id.getString();
// Use the task to make the property
group_task->computeProperties(m_group);

// The property should now be available either in this object or in the group object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const Property& SourceWithOnDemandProperties::getProperty(const PropertyId& prop
// if not, get the task that makes it and execute, we should have it then
auto task = m_task_provider->getTask<SourceTask>(property_id);
if (task) {
//logger.debug() << "Computing property " << property_id.getString();
task->computeProperties(const_cast<SourceWithOnDemandProperties&>(*this));
return m_property_holder.getProperty(property_id);
}
Expand Down
1 change: 1 addition & 0 deletions SEImplementation/SEImplementation/Grouping/AssocGrouping.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class AssocGrouping : public SourceGroupingInterface {
std::shared_ptr<SourceGroupFactory> m_group_factory;
std::map<unsigned int, std::unique_ptr<SourceGroupInterface>> m_source_groups;
unsigned int m_hard_limit;
unsigned int m_total_sources_waiting = 0;

};

Expand Down
2 changes: 2 additions & 0 deletions SEImplementation/SEImplementation/Grouping/MoffatGrouping.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class MoffatGrouping : public SourceGroupingInterface {
size_t m_group_counter;
std::map<unsigned int, std::shared_ptr<Group>> m_groups;
QuadTree<std::shared_ptr<SourceInfo>> m_tree;

unsigned int m_total_sources_waiting = 0;
};

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SplitSourcesGrouping : public SourceGroupingInterface {
std::shared_ptr<SourceGroupFactory> m_group_factory;
std::map<unsigned int, std::unique_ptr<SourceGroupInterface>> m_source_groups;
unsigned int m_hard_limit;

unsigned int m_total_sources_waiting = 0;
};

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class FlexibleModelFittingIterativeTask : public GroupTask {
std::vector<int> iterations_per_meta;
std::vector<SeFloat> fitting_areas_x;
std::vector<SeFloat> fitting_areas_y;

// Cached images
std::vector<std::shared_ptr<VectorImage<SeFloat>>> cached_image_copies;
std::vector<std::shared_ptr<VectorImage<SeFloat>>> cached_weight_images;
};

struct FittingState {
Expand All @@ -111,8 +115,8 @@ class FlexibleModelFittingIterativeTask : public GroupTask {
void updateCheckImages(SourceGroupInterface& group, double pixel_scale, FittingState& state) const;
SeFloat computeChiSquared(SourceGroupInterface& group, SourceInterface& source, int index,
double pixel_scale, FlexibleModelFittingParameterManager& manager, int& total_data_points, FittingState& state) const;
SeFloat computeChiSquaredForFrame(std::shared_ptr<const Image<SeFloat>> image,
std::shared_ptr<const Image<SeFloat>> model, std::shared_ptr<const Image<SeFloat>> weights, int& data_points) const;
SeFloat computeChiSquaredForFrame(std::shared_ptr<const VectorImage<SeFloat>> image,
std::shared_ptr<const VectorImage<SeFloat>> model, std::shared_ptr<const VectorImage<SeFloat>> weights, int& data_points) const;
int fitSourcePrepareParameters(FlexibleModelFittingParameterManager& parameter_manager,
ModelFitting::EngineParameterManager& engine_parameter_manager,
SourceInterface& source, int index, FittingState& state) const;
Expand Down
17 changes: 16 additions & 1 deletion SEImplementation/src/lib/Grouping/AssocGrouping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

#include "SEImplementation/Grouping/AssocGrouping.h"
#include "SEImplementation/Plugin/AssocMode/AssocMode.h"

#include <ElementsKernel/Logging.h>
namespace SourceXtractor {

static Elements::Logging logger = Elements::Logging::getLogger("AssocGrouping");

AssocGrouping::AssocGrouping(std::shared_ptr<SourceGroupFactory> group_factory, unsigned int hard_limit)
: m_group_factory(group_factory), m_hard_limit(hard_limit)
{
Expand All @@ -31,6 +33,9 @@ std::set<PropertyId> AssocGrouping::requiredProperties() const {

/// Handles a new Source
void AssocGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
m_total_sources_waiting++;
logger.debug() << "Receiving source, sources in grouping: " << m_total_sources_waiting;

auto source_id = source->getProperty<AssocMode>().getGroupId();

if (m_source_groups.find(source_id) == m_source_groups.end()) {
Expand All @@ -41,6 +46,9 @@ void AssocGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
if (m_hard_limit > 0 && m_source_groups.at(source_id)->size() >= m_hard_limit) {
// the stored group has reached the hard limit
// send the current group to processing
m_total_sources_waiting -= m_source_groups.at(source_id)->size();
logger.debug() << "Group " << source_id << " reached hard limit of " << m_hard_limit
<< " sources, sending group to processing, sources remaining in grouping: " << m_total_sources_waiting;
sendSource(std::move(m_source_groups.at(source_id)));

// and replace it with a new empty one
Expand All @@ -55,6 +63,8 @@ void AssocGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
void AssocGrouping::receiveProcessSignal(const ProcessSourcesEvent& event) {
std::vector<unsigned int> groups_to_process;

logger.debug() << "Received processing signal, total sources waiting in grouping: " << m_total_sources_waiting;

// We iterate through all the SourceGroups we have
for (auto const& it : m_source_groups) {
// We look at its Sources and if we find at least one that needs to be processed
Expand All @@ -69,9 +79,14 @@ void AssocGrouping::receiveProcessSignal(const ProcessSourcesEvent& event) {
// For each SourceGroup that we put in groups_to_process,
for (auto group_id : groups_to_process) {
// we remove it from our list of stored SourceGroups and notify our observers
m_total_sources_waiting -= m_source_groups[group_id]->size();
logger.debug() << "Sending group size " << m_source_groups[group_id]->size() << ", sources remaining in grouping: " << m_total_sources_waiting;

sendSource(std::move(m_source_groups[group_id]));
m_source_groups.erase(group_id);
}

logger.debug() << "Processing signal handled, total sources remaining in grouping: " << m_total_sources_waiting;
}

} // SourceXtractor namespace
Expand Down
13 changes: 13 additions & 0 deletions SEImplementation/src/lib/Grouping/MoffatGrouping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
#include "SEImplementation/Plugin/PixelCentroid/PixelCentroid.h"
#include "SEImplementation/Plugin/PeakValue/PeakValue.h"

#include <ElementsKernel/Logging.h>

namespace SourceXtractor {

static Elements::Logging logger = Elements::Logging::getLogger("MoffatGrouping");

template <>
struct QuadTreeTraits<std::shared_ptr<MoffatGrouping::SourceInfo>> {
static double getCoord(std::shared_ptr<MoffatGrouping::SourceInfo> t, size_t index) {
Expand Down Expand Up @@ -58,6 +62,8 @@ std::set<PropertyId> MoffatGrouping::requiredProperties() const {

/// Handles a new Source
void MoffatGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
m_total_sources_waiting++;
logger.debug() << "Receiving source, sources in grouping: " << m_total_sources_waiting;

// Encapsulates the source unique_ptr
auto& centroid = source->getProperty<PixelCentroid>();
Expand Down Expand Up @@ -110,6 +116,8 @@ void MoffatGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
void MoffatGrouping::receiveProcessSignal(const ProcessSourcesEvent& event) {
std::vector<size_t> groups_to_process;

logger.debug() << "Received processing signal, total sources waiting in grouping: " << m_total_sources_waiting;

// We iterate through all the SourceGroups we have
for (auto const& it : m_groups) {
// We look at its Sources and if we find at least one that needs to be processed
Expand All @@ -125,6 +133,8 @@ void MoffatGrouping::receiveProcessSignal(const ProcessSourcesEvent& event) {
for (auto group_id : groups_to_process) {
processGroup(group_id);
}

logger.debug() << "Processing signal handled, total sources remaining in grouping: " << m_total_sources_waiting;
}

void MoffatGrouping::processGroup(unsigned int group_id) {
Expand All @@ -136,6 +146,9 @@ void MoffatGrouping::processGroup(unsigned int group_id) {
m_tree.remove(source_info);
}

m_total_sources_waiting -= new_group->size();
logger.debug() << "Sending group size " << new_group->size() << ", sources remaining in grouping: " << m_total_sources_waiting;

sendSource(std::move(new_group));
m_groups.erase(group_id);
}
Expand Down
31 changes: 23 additions & 8 deletions SEImplementation/src/lib/Grouping/SplitSourcesGrouping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

#include "SEImplementation/Grouping/SplitSourcesGrouping.h"
#include "SEImplementation/Property/SourceId.h"

#include <ElementsKernel/Logging.h>
namespace SourceXtractor {

static Elements::Logging logger = Elements::Logging::getLogger("SplitSourcesGrouping");

SplitSourcesGrouping::SplitSourcesGrouping(std::shared_ptr<SourceGroupFactory> group_factory, unsigned int hard_limit)
: m_group_factory(group_factory), m_hard_limit(hard_limit)
{
Expand All @@ -31,6 +33,9 @@ std::set<PropertyId> SplitSourcesGrouping::requiredProperties() const {

/// Handles a new Source
void SplitSourcesGrouping::receiveSource(std::unique_ptr<SourceInterface> source) {
m_total_sources_waiting++;
logger.debug() << "Receiving source, sources in grouping: " << m_total_sources_waiting;

auto source_id = source->getProperty<SourceId>().getDetectionId();

if (m_source_groups.find(source_id) == m_source_groups.end()) {
Expand All @@ -39,14 +44,17 @@ void SplitSourcesGrouping::receiveSource(std::unique_ptr<SourceInterface> source
}

if (m_hard_limit > 0 && m_source_groups.at(source_id)->size() >= m_hard_limit) {
// the stored group has reached the hard limit
// send the current group to processing
sendSource(std::move(m_source_groups.at(source_id)));
// the stored group has reached the hard limit
// send the current group to processing
m_total_sources_waiting -= m_source_groups.at(source_id)->size();
logger.debug() << "Group " << source_id << " reached hard limit of " << m_hard_limit
<< " sources, sending group to processing, sources remaining in grouping: " << m_total_sources_waiting;
sendSource(std::move(m_source_groups.at(source_id)));

// and replace it with a new empty one
auto new_group = m_group_factory->createSourceGroup();
m_source_groups[source_id] = std::move(new_group);
}
// and replace it with a new empty one
auto new_group = m_group_factory->createSourceGroup();
m_source_groups[source_id] = std::move(new_group);
}

m_source_groups.at(source_id)->addSource(std::move(source));
}
Expand All @@ -55,6 +63,8 @@ void SplitSourcesGrouping::receiveSource(std::unique_ptr<SourceInterface> source
void SplitSourcesGrouping::receiveProcessSignal(const ProcessSourcesEvent& event) {
std::vector<unsigned int> groups_to_process;

logger.debug() << "Received processing signal, total sources waiting in grouping: " << m_total_sources_waiting;

// We iterate through all the SourceGroups we have
for (auto const& it : m_source_groups) {
// We look at its Sources and if we find at least one that needs to be processed
Expand All @@ -69,9 +79,14 @@ void SplitSourcesGrouping::receiveProcessSignal(const ProcessSourcesEvent& event
// For each SourceGroup that we put in groups_to_process,
for (auto group_id : groups_to_process) {
// we remove it from our list of stored SourceGroups and notify our observers
m_total_sources_waiting -= m_source_groups[group_id]->size();
logger.debug() << "Sending group size " << m_source_groups[group_id]->size() << ", sources remaining in grouping: " << m_total_sources_waiting;

sendSource(std::move(m_source_groups[group_id]));
m_source_groups.erase(group_id);
}

logger.debug() << "Processing signal handled, total sources remaining in grouping: " << m_total_sources_waiting;
}

} // SourceXtractor namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

using namespace SourceXtractor;

static Elements::Logging logger = Elements::Logging::getLogger("Multithreading");
static Elements::Logging logger = Elements::Logging::getLogger("MultithreadedMeasurement");


MultithreadedMeasurement::~MultithreadedMeasurement() {
Expand All @@ -44,13 +44,16 @@ void MultithreadedMeasurement::startThreads() {
}

void MultithreadedMeasurement::stopThreads() {
logger.debug() << "Stopping worker threads";
m_input_done = true;
m_thread_pool->block();
m_output_thread->join();
logger.debug() << "All worker threads done!";
}

void MultithreadedMeasurement::synchronizeThreads() {
logger.debug() << "Synchronizing worker threads";

// Wait until all worker threads are done
m_thread_pool->block();

Expand All @@ -74,6 +77,9 @@ void MultithreadedMeasurement::synchronizeThreads() {
}

void MultithreadedMeasurement::receiveSource(std::unique_ptr<SourceGroupInterface> source_group) {
logger.debug() << "Receiving source group with " << source_group->size() << " sources, job queue size: "
<< m_thread_pool->queued() << " output queue size: " << m_output_queue.size();

// Force computation of SourceID here, where the order is still deterministic
for (auto& source : *source_group) {
source.getProperty<SourceID>();
Expand Down Expand Up @@ -127,8 +133,14 @@ void MultithreadedMeasurement::outputThreadLoop() {

// Process the output queue
while (!m_output_queue.empty()) {
logger.debug() << "Output queue: " << m_output_queue.size() << " Sending group to output, group has "
<< m_output_queue.front().second->size() << " sources";

sendSource(std::move(m_output_queue.front().second));
m_output_queue.pop_front();

logger.debug() << "Thread pool: active threads: " << m_thread_pool->activeThreads() << ", running jobs: " << m_thread_pool->running()
<< " queued jobs: " << m_thread_pool->queued();
}

if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
Expand Down
Loading
Loading