diff --git a/CMakeLists.txt b/CMakeLists.txt index 6ef7bcf..bdf8afa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,8 @@ set(CMAKE_CXX_STANDARD 14) add_compile_options("-fno-omit-frame-pointer") add_compile_options("-Wall") add_compile_options("-fsanitize=address") -add_compile_options("-O3") +add_compile_options("-O1") +add_compile_options("-pthread") #No, don't do it, don't do it! #add_compile_options("-v") @@ -22,18 +23,11 @@ include_directories(src/util) include_directories(src/util/exceptions) include_directories(src/util/logger) - - add_executable(log_engine src/engine/adapter/Categorization/Categorization.cpp src/engine/adapter/Categorization/Categorization.h src/engine/adapter/Categorization/Result.h src/engine/adapter/Categorization/Rule.h - src/engine/adapter/ClientUpTime/ClientUpTime.cpp - src/engine/adapter/ClientUpTime/ClientUpTime.h - src/engine/adapter/ClientUpTime/Connection.h - src/engine/adapter/MultiIPChecker/MultiIPChecker.h - src/engine/adapter/OptimizeMe/OptimizeMe.h src/engine/adapter/TextSearch/TextSearch.cpp src/engine/adapter/TextSearch/TextSearch.h src/engine/model/EngineInput.h @@ -53,8 +47,18 @@ add_executable(log_engine src/util/Config.cpp src/engine/model/Payload.h src/engine/adapter/TextSearch/TextSearchPayload.h - src/engine/adapter/PatternAbuser/PatternAbuser.cpp - src/engine/adapter/PatternAbuser/PatternAbuser.h src/engine/OutputWrapper.cpp src/engine/OutputWrapper.h src/engine/interface/IJsonSerializable.h) + src/engine/OutputWrapper.cpp + src/engine/OutputWrapper.h + src/engine/interface/IJsonSerializable.h + src/engine/task/AsyncExecutionBuffer.cpp + src/engine/task/AsyncExecutionBuffer.h + src/engine/interface/IEngineOutputElement.h + src/engine/adapter/TextSearch/TextSearchBufferElement.h + src/engine/adapter/STDOutput.cpp + src/engine/adapter/STDOutput.h + src/engine/task/TaskScheduler.cpp + src/engine/task/TaskScheduler.h + src/engine/model/IExecutionTask.h) target_link_libraries(log_engine -fsanitize=address diff --git a/src/engine/Dispatcher.cpp b/src/engine/Dispatcher.cpp index 83fdec3..170e14e 100644 --- a/src/engine/Dispatcher.cpp +++ b/src/engine/Dispatcher.cpp @@ -3,11 +3,11 @@ #include #include #include "Dispatcher.h" - +#include +#include Dispatcher::Dispatcher(const EngineInput &eIN) : - engineInput(eIN) -{ + engineInput(eIN) { logger = new util::Logger("Dispatcher"); validate(eIN.payload); initEngines(); @@ -15,20 +15,17 @@ Dispatcher::Dispatcher(const EngineInput &eIN) : terminate(); } -void Dispatcher::validate(const std::string &payload) -{ +void Dispatcher::validate(const std::string &payload) { logger->info("Validating input"); - if (engineInput.serverLogFile.empty()) - { + if (engineInput.serverLogFile.empty()) { logger->error("No log file specified"); throw std::system_error(std::error_code(404, std::system_category()), "The log file needs to be specified with the -l/-log CLI parameter"); } - if (engineInput.function == -1) - { + if (engineInput.function == -1) { logger->error("No function was specified to be executed!"); throw std::system_error(std::error_code(404, std::system_category()), "The engine function has to be specified by using the -f/-function CLI parameter"); @@ -36,8 +33,7 @@ void Dispatcher::validate(const std::string &payload) engineInput.payload = payload; } -void Dispatcher::initEngines() -{ +void Dispatcher::initEngines() { logger->info("Initializing requested Engine Adapter"); //executionList.push_back(new adapter::OptimizeMe()); //optimize, do not sync standard input output @@ -45,50 +41,34 @@ void Dispatcher::initEngines() //optimize, detach console input std::cin.tie(nullptr); - switch (engineInput.function) - { - case ENGINE_FUNCTION::SEARCH: - { + switch (engineInput.function) { + case ENGINE_FUNCTION::SEARCH: { nlohmann::json jsonPayload = nlohmann::json::parse(engineInput.payload); TextSearchPayload stackAllocation; stackAllocation.fromJson(jsonPayload); - executionList.push_back(new adapter::TextSearch(engineInput.serverLogFile, stackAllocation)); + taskScheduler.registerTask(new adapter::TextSearch(engineInput.serverLogFile, stackAllocation)); break; } - case ENGINE_FUNCTION::SEARCH_AND_CATEGORIZE: - { - executionList.push_back(new adapter::Categorization()); + case ENGINE_FUNCTION::SEARCH_AND_CATEGORIZE: { + taskScheduler.registerTask(new adapter::Categorization()); break; } - case ENGINE_FUNCTION::CLIENT_UPTIME: - { + case ENGINE_FUNCTION::CLIENT_UPTIME: { //executionList.push_back(new adapter::ClientUpTime(engineInput.serverLogFile, engineInput.searchStrings)); break; } - default: - { + default: { logger->warn("Engine function " + std::to_string(engineInput.function) + " is not implemented"); break; } } - - for (IEngineAdapter *engine : executionList) - { - logger->info("Loaded " + engine->getName() + " into executionList"); - } } -void Dispatcher::run() -{ - for (IEngineAdapter *engine : executionList) - { - logger->info("Running " + engine->getName() + " Engine Adapter"); - engine->run(); - } +void Dispatcher::run() { + taskScheduler.start(); } -void Dispatcher::terminate() -{ +void Dispatcher::terminate() { logger->info("Terminating Engine Routine"); logger->info("Engine stopped"); } diff --git a/src/engine/Dispatcher.h b/src/engine/Dispatcher.h index ecec6ea..03053a9 100644 --- a/src/engine/Dispatcher.h +++ b/src/engine/Dispatcher.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "interface/IEngineAdapter.h" class Dispatcher @@ -22,8 +23,7 @@ class Dispatcher EngineInput engineInput; util::Logger *logger; nlohmann::json configuration; - - std::list executionList; + TaskScheduler taskScheduler; void validate(const std::string &payload); diff --git a/src/engine/adapter/Categorization/Categorization.cpp b/src/engine/adapter/Categorization/Categorization.cpp index b1c2c33..8d718b3 100644 --- a/src/engine/adapter/Categorization/Categorization.cpp +++ b/src/engine/adapter/Categorization/Categorization.cpp @@ -16,6 +16,10 @@ using namespace adapter; +void Categorization::terminate() { + +} + Categorization::Categorization() { //load rules from fileStream into JSON object @@ -40,7 +44,7 @@ Categorization::Categorization() Categorization::~Categorization() = default; -void Categorization::run() +void Categorization::run(AsyncExecutionBuffer &asyncExecutionBuffer) { } diff --git a/src/engine/adapter/Categorization/Categorization.h b/src/engine/adapter/Categorization/Categorization.h index d6bd3ad..7fccd76 100644 --- a/src/engine/adapter/Categorization/Categorization.h +++ b/src/engine/adapter/Categorization/Categorization.h @@ -25,7 +25,7 @@ namespace adapter ~Categorization() override; - void run() override; + void run(AsyncExecutionBuffer &asyncExecutionBuffer) override; int getEngineFunction() const override; @@ -36,6 +36,8 @@ namespace adapter return (rules); } + void terminate() override; + private: std::map rules; diff --git a/src/engine/adapter/ClientUpTime/ClientUpTime.cpp b/src/engine/adapter/ClientUpTime/ClientUpTime.cpp deleted file mode 100644 index 9038514..0000000 --- a/src/engine/adapter/ClientUpTime/ClientUpTime.cpp +++ /dev/null @@ -1,73 +0,0 @@ -#include - -/* Created by BridgeTroll - * https://bridgetroll.de - * https://steamcommunity.com/id/Bridge_Troll/ - * - * ClientUpTime.cpp - * - * Created on: 17.05.2019 - * Author: bt - */ - -#include -#include -#include -#include "ClientUpTime.h" - - -using namespace adapter; - -ClientUpTime::ClientUpTime(std::string filePath, std::string searchTerms) : - filePath(std::move(filePath)) -{ - playerGuids = MBKingdoms::Lib::explode(searchTerms, ';'); - if (playerGuids.empty()) - { - throw std::system_error(std::error_code(404, std::system_category()), "No guids to search for were specified"); - } - - Categorization categorisation = Categorization(); - Rule ruleConnect(categorisation.getRules().at("player_connected_client")); - Rule ruleDisconnect(categorisation.getRules().at("player_disconnect_message_guid")); - -} - -ClientUpTime::~ClientUpTime() -= default; - -void ClientUpTime::run() -{ - auto startTime = getEngineTime(); - int resultCounter = 0; - int lineCounter = 1; - std::string line; - std::ifstream fileInputStream = getFileInputStream(filePath); - - while (getline(fileInputStream, line)) - { - if (line.find("has GUID:") != std::string::npos) - { - - auto it = std::find_if(begin(playerGuids), end(playerGuids), [&](const std::string &s) -> bool - { - return (line.find(s) != std::string::npos); - }); - - if (it != end(playerGuids)) - { - std::cout << "Connected:: " << line << "\n"; - - } - - resultCounter++; - lineCounter++; - } - - } - auto endTime = getEngineTime(); - logger.info("ClientUpTime run finished with " + std::to_string(resultCounter) + " found results. Took " + - getDurationMS(startTime, endTime) + " ms"); - -} - diff --git a/src/engine/adapter/ClientUpTime/ClientUpTime.h b/src/engine/adapter/ClientUpTime/ClientUpTime.h deleted file mode 100644 index ac40ae0..0000000 --- a/src/engine/adapter/ClientUpTime/ClientUpTime.h +++ /dev/null @@ -1,51 +0,0 @@ -/* Created by BridgeTroll - * https://bridgetroll.de - * https://steamcommunity.com/id/Bridge_Troll/ - * - * ClientUpTime.h - * - * Created on: 16.05.2019 - * Author: bt - */ - -#ifndef SRC_ENGINE_ADAPTER_CLIENTUPTIME_CLIENTUPTIME_H_ -#define SRC_ENGINE_ADAPTER_CLIENTUPTIME_CLIENTUPTIME_H_ - - -#include -#include -#include "Connection.h" - -namespace adapter -{ - - class ClientUpTime : public IEngineAdapter - { - public: - ClientUpTime(std::string filePath, std::string searchTerms); - - ~ClientUpTime() override; - - void run() override; - - int getEngineFunction() const override - { - return Dispatcher::ENGINE_FUNCTION::CLIENT_UPTIME; - } - - std::string getName() const override - { - return ("ClientUpTime"); - } - - private: - std::vector playerGuids; - std::string filePath; - std::string regexString; - util::Logger logger = util::Logger(this->getName()); - std::map> connections; - - }; -} - -#endif /* SRC_ENGINE_ADAPTER_CLIENTUPTIME_CLIENTUPTIME_H_ */ diff --git a/src/engine/adapter/ClientUpTime/Connection.h b/src/engine/adapter/ClientUpTime/Connection.h deleted file mode 100644 index 3ddd10a..0000000 --- a/src/engine/adapter/ClientUpTime/Connection.h +++ /dev/null @@ -1,25 +0,0 @@ -/* Created by BridgeTroll - * https://bridgetroll.de - * https://steamcommunity.com/id/Bridge_Troll/ - * - * Connection.h - * - * Created on: 18.05.2019 - * Author: bt - */ - -#ifndef SRC_ENGINE_ADAPTER_CLIENTUPTIME_CONNECTION_H_ -#define SRC_ENGINE_ADAPTER_CLIENTUPTIME_CONNECTION_H_ - -#include - -struct Connection -{ - std::string guid; - std::string name; - std::string connectTimeStr; - std::string disconnectTimeStr = ""; - std::string total; -}; - -#endif /* SRC_ENGINE_ADAPTER_CLIENTUPTIME_CONNECTION_H_ */ diff --git a/src/engine/adapter/MultiIPChecker/MultiIPChecker.h b/src/engine/adapter/MultiIPChecker/MultiIPChecker.h deleted file mode 100644 index 976cbbe..0000000 --- a/src/engine/adapter/MultiIPChecker/MultiIPChecker.h +++ /dev/null @@ -1,28 +0,0 @@ -/* Created by BridgeTroll - * https://bridgetroll.de - * https://steamcommunity.com/id/Bridge_Troll/ - * - * MultiIPChecker.h - * - * Created on: 17.05.2019 - * Author: bt - */ - -#ifndef SRC_ENGINE_ADAPTER_MULTIIPCHECKER_MULTIIPCHECKER_H_ -#define SRC_ENGINE_ADAPTER_MULTIIPCHECKER_MULTIIPCHECKER_H_ - -#include "interface/IEngineAdapter.h" - - -namespace adapter -{ - class MultiIPChecker : public IEngineAdapter - { - //TODO: Check for multiple IPs that use the same GUID and vice versa - //TODO: Detect multiple GUIDs for one single IP - //TODO: GeoLocation checking via open web api in parallel(non-blocking) thread ??? - }; -} - - -#endif /* SRC_ENGINE_ADAPTER_MULTIIPCHECKER_MULTIIPCHECKER_H_ */ diff --git a/src/engine/adapter/OptimizeMe/OptimizeMe.h b/src/engine/adapter/OptimizeMe/OptimizeMe.h deleted file mode 100644 index 9c8724e..0000000 --- a/src/engine/adapter/OptimizeMe/OptimizeMe.h +++ /dev/null @@ -1,47 +0,0 @@ -/* Created by BridgeTroll - * https://bridgetroll.de - * https://steamcommunity.com/id/Bridge_Troll/ - * - * OptimizeMe.h - * - * Created on: 17.05.2019 - * Author: bt - */ - -#ifndef SRC_ENGINE_ADAPTER_OPTIMIZEME_OPTIMIZEME_H_ -#define SRC_ENGINE_ADAPTER_OPTIMIZEME_OPTIMIZEME_H_ - -#include -#include - -#include "../../Dispatcher.h" -#include "interface/IEngineAdapter.h" - - -namespace adapter -{ - class OptimizeMe : public IEngineAdapter - { - public: - OptimizeMe() - { - } - - void run() - { - - } - - int getEngineFunction() const - { - return (Dispatcher::ENGINE_FUNCTION::INTERNAL_OPTIMIZE_ME); - } - - std::string getName() const - { - return ("OptimizeMe"); - } - }; -} - -#endif /* SRC_ENGINE_ADAPTER_OPTIMIZEME_OPTIMIZEME_H_ */ diff --git a/src/engine/adapter/PatternAbuser/PatternAbuser.cpp b/src/engine/adapter/PatternAbuser/PatternAbuser.cpp deleted file mode 100644 index d6e7cc0..0000000 --- a/src/engine/adapter/PatternAbuser/PatternAbuser.cpp +++ /dev/null @@ -1,17 +0,0 @@ -// -// Created by bt on 02.06.19. -// - -#include "PatternAbuser.h" - -using namespace adapter; - -void PatternAbuser::run() -{ - -} - -PatternAbuser::~PatternAbuser() -{ - -} diff --git a/src/engine/adapter/PatternAbuser/PatternAbuser.h b/src/engine/adapter/PatternAbuser/PatternAbuser.h deleted file mode 100644 index d19114b..0000000 --- a/src/engine/adapter/PatternAbuser/PatternAbuser.h +++ /dev/null @@ -1,46 +0,0 @@ -// -// Created by bt on 02.06.19. -// - -#ifndef LOG_ENGINE_PATTERNABUSER_H -#define LOG_ENGINE_PATTERNABUSER_H - -#include -#include "interface/IEngineAdapter.h" - - -namespace adapter -{ - class PatternAbuser : public IEngineAdapter - { - public: - /** - * get Engine name as string - * immutable - */ - std::string getName() const override - { - return ("PatternAbuser"); - } - - /** - * get engine function as integer value - * immutable - * */ - int getEngineFunction() const override - { - return (Dispatcher::ENGINE_FUNCTION::PATTERN_ABUSER); - } - - void run() override; - - protected: - ~PatternAbuser() override; - - private: - util::Logger logger = util::Logger(this->getName()); - }; -} - - -#endif //LOG_ENGINE_PATTERNABUSER_H diff --git a/src/engine/adapter/STDOutput.cpp b/src/engine/adapter/STDOutput.cpp new file mode 100644 index 0000000..86aa3dc --- /dev/null +++ b/src/engine/adapter/STDOutput.cpp @@ -0,0 +1,42 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#include "STDOutput.h" +#include +#include + +using namespace adapter; + +STDOutput::STDOutput(std::condition_variable &conditionVariable) { + running = true; +} + +int STDOutput::getEngineFunction() const { + return 0; +} + +void STDOutput::run(AsyncExecutionBuffer &asyncExecutionBuffer) { + while (shouldRun(asyncExecutionBuffer)) { + if (!asyncExecutionBuffer.isEmpty()) { + logger.debug("doing output"); + IEngineOutputElement *output = asyncExecutionBuffer.pop(); + std::cout << output->getLineContent() << std::endl; + } + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + } + logger.debug("STDOutput killed"); +} + +void STDOutput::terminate() { + logger.debug("send shutdown to STDOutput Thread"); + running = false; +} + +bool STDOutput::shouldRun(AsyncExecutionBuffer &asyncExecutionBuffer) { + if (running && !asyncExecutionBuffer.isEmpty()) { return true; } + else if (!running && !asyncExecutionBuffer.isEmpty()) { return true; } + else if (running && asyncExecutionBuffer.isEmpty()) { return true; } + else if (!running && asyncExecutionBuffer.isEmpty()) { return false; } + return false; +} diff --git a/src/engine/adapter/STDOutput.h b/src/engine/adapter/STDOutput.h new file mode 100644 index 0000000..4013073 --- /dev/null +++ b/src/engine/adapter/STDOutput.h @@ -0,0 +1,37 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#ifndef LOG_ENGINE_STDOUTPUT_H +#define LOG_ENGINE_STDOUTPUT_H + + +#include + +namespace adapter { + class STDOutput : public IEngineAdapter { + private: + bool running; + util::Logger logger = util::Logger(getName()); + + bool shouldRun(AsyncExecutionBuffer &asyncExecutionBuffer); + + public: + explicit STDOutput(std::condition_variable &conditionVariable); + + ~STDOutput() override = default; + + int getEngineFunction() const override; + + void run(AsyncExecutionBuffer &asyncExecutionBuffer) override; + + std::string getName() const override + { + return "STDOutput"; + }; + + void terminate() override; + }; +} + +#endif //LOG_ENGINE_STDOUTPUT_H diff --git a/src/engine/adapter/TextSearch/TextSearch.cpp b/src/engine/adapter/TextSearch/TextSearch.cpp index f3282e7..fae7eee 100644 --- a/src/engine/adapter/TextSearch/TextSearch.cpp +++ b/src/engine/adapter/TextSearch/TextSearch.cpp @@ -12,6 +12,7 @@ #include "../../../lib/json.hpp" #include "../../../lib/MBLib.h" #include "TextSearchPayload.h" +#include "TextSearchBufferElement.h" namespace adapter @@ -59,7 +60,7 @@ namespace adapter TextSearch::~TextSearch() = default; - void TextSearch::run() + void TextSearch::run(AsyncExecutionBuffer &asyncExecutionBuffer) { auto searchStartTime = getEngineTime(); std::ifstream fileInputStream = getFileInputStream(filePath); @@ -68,10 +69,10 @@ namespace adapter int resultCounter = 0; nlohmann::json o; - std::list j; + //std::list j; - OutputWrapper outputWrapper{}; - outputWrapper.open(); + //OutputWrapper outputWrapper{}; + //outputWrapper.open(); while (getline(fileInputStream, line)) { lineCounter++; @@ -118,14 +119,16 @@ namespace adapter if (it != end(searchTerms)) { - o["lineNumber"] = lineCounter; - o["string"] = line; - outputWrapper.push(o.dump()); + TextSearchBufferElement* textSearchBufferElement = new TextSearchBufferElement(lineCounter, line); + asyncExecutionBuffer.push(textSearchBufferElement); + o["line"] = line; + o["number"] = lineCounter; + std::cout << o.dump(); resultCounter++; } } - outputWrapper.close(); + //outputWrapper.close(); auto searchEndTime = getEngineTime(); logger.info("Search run finished with " + std::to_string(resultCounter) + " found results. Took " + @@ -141,4 +144,8 @@ namespace adapter { return Dispatcher::ENGINE_FUNCTION::SEARCH; } + + void TextSearch::terminate() { + + } } diff --git a/src/engine/adapter/TextSearch/TextSearch.h b/src/engine/adapter/TextSearch/TextSearch.h index 329f55c..d7b366e 100644 --- a/src/engine/adapter/TextSearch/TextSearch.h +++ b/src/engine/adapter/TextSearch/TextSearch.h @@ -33,7 +33,9 @@ namespace adapter */ int getEngineFunction() const override; - void run() override; + void run(AsyncExecutionBuffer &asyncExecutionBuffer) override; + + void terminate() override; protected: ~TextSearch() override; diff --git a/src/engine/adapter/TextSearch/TextSearchBufferElement.h b/src/engine/adapter/TextSearch/TextSearchBufferElement.h new file mode 100644 index 0000000..3607c37 --- /dev/null +++ b/src/engine/adapter/TextSearch/TextSearchBufferElement.h @@ -0,0 +1,48 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#ifndef LOG_ENGINE_TEXTSEARCHBUFFERELEMENT_H +#define LOG_ENGINE_TEXTSEARCHBUFFERELEMENT_H + +#include +#include + +class TextSearchBufferElement : public IEngineOutputElement { +private: + int lineNumber; + std::string lineContent; + bool stale; +public: + + TextSearchBufferElement(const int lineNumber, const std::string &lineContent) { + TextSearchBufferElement::lineNumber = lineNumber; + TextSearchBufferElement::lineContent = lineContent; + TextSearchBufferElement::stale = false; + } + + std::string toJson() override { + nlohmann::json o; + o["lineNumber"] = lineNumber; + o["string"] = lineContent; + return o.dump(); + } + + std::string getLineContent() override { + return lineContent; + } + + int getLineNumber() override { + return lineNumber; + } + + void setStaleness(const bool &staleness) override { + TextSearchBufferElement::stale = staleness; + } + + bool isElementStale() override { + return stale; + } +}; + +#endif //LOG_ENGINE_TEXTSEARCHBUFFERELEMENT_H diff --git a/src/engine/interface/IEngineAdapter.h b/src/engine/interface/IEngineAdapter.h index 01ce561..e24045e 100644 --- a/src/engine/interface/IEngineAdapter.h +++ b/src/engine/interface/IEngineAdapter.h @@ -14,47 +14,36 @@ #include #include #include +#include +#include using engineTime = std::chrono::time_point; -class IEngineAdapter -{ +class IEngineAdapter : public IExecutionTask { public: virtual int getEngineFunction() const = 0; - virtual void run() = 0; - virtual std::string getName() const = 0; protected: - virtual ~IEngineAdapter() - { - //Todo cleanup lololololol just joking, - //this would trash the memory at some point if the interface is used wrongly, so JUST DO IT - } + virtual ~IEngineAdapter() = default; - std::ifstream getFileInputStream(std::string filePath) - { + static std::ifstream getFileInputStream(const std::string &filePath) { std::ifstream filePtr(filePath, std::ifstream::in); - if (!filePtr.is_open()) - { + if (!filePtr.is_open()) { throw std::system_error(std::error_code(404, std::system_category()), "File " + filePath + " cannot be opened"); - } - else - { + } else { return (filePtr); } } - engineTime getEngineTime() - { + static engineTime getEngineTime() { return (std::chrono::high_resolution_clock::now()); } - std::string getDurationMS(engineTime start, engineTime end) - { + static std::string getDurationMS(engineTime start, engineTime end) { return (std::to_string(std::chrono::duration_cast< std::chrono::milliseconds>(end - start).count())); } diff --git a/src/engine/interface/IEngineOutputElement.h b/src/engine/interface/IEngineOutputElement.h new file mode 100644 index 0000000..84e4cf9 --- /dev/null +++ b/src/engine/interface/IEngineOutputElement.h @@ -0,0 +1,21 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#ifndef LOG_ENGINE_IENGINEOUTPUTELEMENT_H +#define LOG_ENGINE_IENGINEOUTPUTELEMENT_H + +#include + +class IEngineOutputElement { +public: + virtual std::string toJson() = 0; + virtual std::string getLineContent() = 0; + virtual int getLineNumber() = 0; + + //Ready for output? + virtual void setStaleness(const bool &staleness) = 0; + virtual bool isElementStale() = 0; +}; + +#endif //LOG_ENGINE_IENGINEOUTPUTELEMENT_H diff --git a/src/engine/model/IExecutionTask.h b/src/engine/model/IExecutionTask.h new file mode 100644 index 0000000..ccf1e57 --- /dev/null +++ b/src/engine/model/IExecutionTask.h @@ -0,0 +1,18 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#ifndef LOG_ENGINE_IEXECUTIONTASK_H +#define LOG_ENGINE_IEXECUTIONTASK_H + + +#include + +class IExecutionTask { +public: + virtual void run(AsyncExecutionBuffer& asyncExecutionBuffer) = 0; + virtual void terminate() = 0; +}; + + +#endif //LOG_ENGINE_IEXECUTIONTASK_H diff --git a/src/engine/task/AsyncExecutionBuffer.cpp b/src/engine/task/AsyncExecutionBuffer.cpp new file mode 100644 index 0000000..1de8e65 --- /dev/null +++ b/src/engine/task/AsyncExecutionBuffer.cpp @@ -0,0 +1,30 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#include "AsyncExecutionBuffer.h" + +AsyncExecutionBuffer::AsyncExecutionBuffer(std::condition_variable &conditionVariable) { + logger.info("AsyncExecutionBuffer is initialized"); +} + +void AsyncExecutionBuffer::push(IEngineOutputElement *bufferInput) { + { + std::lock_guard guard(mtx); //RAII + buffer.emplace_back(bufferInput); + logger.debug("perform write to execution buffer. new size " + std::to_string(buffer.size())); + } + +} + +bool AsyncExecutionBuffer::isEmpty() { + return buffer.empty(); +} + +IEngineOutputElement *AsyncExecutionBuffer::pop() { + std::lock_guard guard(mtx); + IEngineOutputElement *firstElement = buffer.front(); + buffer.pop_front(); + logger.debug("perform pop to execution buffer. new size " + std::to_string(buffer.size())); + return firstElement; +} diff --git a/src/engine/task/AsyncExecutionBuffer.h b/src/engine/task/AsyncExecutionBuffer.h new file mode 100644 index 0000000..0913415 --- /dev/null +++ b/src/engine/task/AsyncExecutionBuffer.h @@ -0,0 +1,32 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#ifndef LOG_ENGINE_ASYNCEXECUTIONBUFFER_H +#define LOG_ENGINE_ASYNCEXECUTIONBUFFER_H + + +#include +#include +#include +#include +#include + +class AsyncExecutionBuffer { +private: + std::list buffer; + util::Logger logger = util::Logger("AsyncExecutionBuffer"); + std::mutex mtx; + std::condition_variable conditionVariable; +public: + explicit AsyncExecutionBuffer(std::condition_variable &conditionVariable); + + void push(IEngineOutputElement *bufferInput); + + IEngineOutputElement *pop(); + + bool isEmpty(); +}; + + +#endif //LOG_ENGINE_ASYNCEXECUTIONBUFFER_H diff --git a/src/engine/task/TaskScheduler.cpp b/src/engine/task/TaskScheduler.cpp new file mode 100644 index 0000000..79412a0 --- /dev/null +++ b/src/engine/task/TaskScheduler.cpp @@ -0,0 +1,35 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#include +#include "TaskScheduler.h" + +TaskScheduler::TaskScheduler() { + asyncExecutionBuffer = new AsyncExecutionBuffer(conditionVariable); +} + +void TaskScheduler::registerTask(IExecutionTask *const &executionTask) { + executionList.push_back(executionTask); +} + +void TaskScheduler::start() { + adapter::STDOutput stdOutput(conditionVariable); + std::thread outputThread([&stdOutput, this]() -> void { + stdOutput.run(*asyncExecutionBuffer); + }); + + for (auto &execution : executionList) { + threadList.emplace_back(std::thread([execution, this]() -> void { + execution->run(*this->asyncExecutionBuffer); + })); + } + + for (auto &thread: threadList) { + thread.join(); + } + stdOutput.terminate(); + outputThread.join(); +} + + diff --git a/src/engine/task/TaskScheduler.h b/src/engine/task/TaskScheduler.h new file mode 100644 index 0000000..2eb4957 --- /dev/null +++ b/src/engine/task/TaskScheduler.h @@ -0,0 +1,35 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#ifndef LOG_ENGINE_TASKSCHEDULER_H +#define LOG_ENGINE_TASKSCHEDULER_H + + +#include +#include +#include +#include "AsyncExecutionBuffer.h" + +class TaskScheduler { +private: + AsyncExecutionBuffer *asyncExecutionBuffer = nullptr; + + std::condition_variable conditionVariable; + std::mutex mutex; + + std::list executionList; + std::list threadList; + + util::Logger logger = util::Logger("TaskScheduler"); + +public: + explicit TaskScheduler(); + + void registerTask(IExecutionTask *const &executionTask); + + void start(); +}; + + +#endif //LOG_ENGINE_TASKSCHEDULER_H diff --git a/src/modules/StartRoutine/StartRoutine.cpp b/src/modules/StartRoutine/StartRoutine.cpp index 5e0f231..261e03e 100644 --- a/src/modules/StartRoutine/StartRoutine.cpp +++ b/src/modules/StartRoutine/StartRoutine.cpp @@ -17,7 +17,7 @@ StartRoutine::StartRoutine(const std::string &stdInput) { nlohmann::json jsonHND = nlohmann::json::parse(stdInput); EngineInput eIN = jsonHND.get(); - Config* c = new Config(eIN.configPath); + new Config(eIN.configPath); - Dispatcher d = Dispatcher(eIN); + Dispatcher d(eIN); } diff --git a/test/TestTaskScheduler.cpp b/test/TestTaskScheduler.cpp new file mode 100644 index 0000000..2f8264b --- /dev/null +++ b/test/TestTaskScheduler.cpp @@ -0,0 +1,32 @@ +// +// Created by bt LAPNUG-win10 on 10.08.2019. +// + +#include +#include + +class myTask : public IExecutionTask { +public: + bool stopped{false}; + + void run(AsyncExecutionBuffer* asyncExecutionBuffer) override { + for (int i = 0; i < 200; i++) { + if (stopped) { + return; + } + asyncExecutionBuffer->push(new TextSearchBufferElement(i, "some text at line " + std::to_string(i))); + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + + void terminate() override { + stopped = true; + } +}; + +int main() { + TaskScheduler taskScheduler; + taskScheduler.registerTask(new myTask); + + taskScheduler.start(); +} \ No newline at end of file