38 if (conn.
dest == ref_block) {
41 for (
const auto& temp_conn : this->connections[conn.
dest->
getID()]) {
42 if (ref_block == temp_conn.src
43 || this->find_cycle(temp_conn, ref_block) == -1) {
53 if (!(this->isRegistered(connection.
src)
54 && this->isRegistered(connection.
dest)))
57 "RT::Connector : source or destination blocks are not registered");
60 if (this->find_cycle(connection, connection.
src) == -1) {
64 if (!(this->connected(connection))) {
65 this->connections[connection.
src->
getID()].push_back(connection);
72 if (!(this->isRegistered(connection.
src)
73 && this->isRegistered(connection.
dest)))
77 const size_t src_id = connection.
src->
getID();
78 auto iter = std::find(this->connections[src_id].begin(),
79 this->connections[src_id].end(),
81 return iter != this->connections[src_id].end();
86 if (!(this->isRegistered(connection.
src)
87 && this->isRegistered(connection.
dest)))
91 const size_t src_id = connection.
src->
getID();
92 auto it = std::find(this->connections[src_id].begin(),
93 this->connections[src_id].end(),
95 if (it != this->connections[src_id].end()) {
96 this->connections[src_id].erase(it);
101 IO::Block* block, std::vector<RT::block_connection_t>& block_connections)
103 if (block ==
nullptr || this->isRegistered(block)) {
115 for (
id = 0;
id < this->block_registry.size();
id++) {
116 if (this->block_registry[
id] ==
nullptr) {
117 this->block_registry[id] = block;
119 this->connections[id].swap(block_connections);
126 block->
assignID(this->block_registry.size());
127 this->block_registry.push_back(block);
128 this->connections.emplace_back(std::move(block_connections));
134 if (block ==
nullptr || !(this->isRegistered(block))) {
138 this->block_registry[block->
getID()] =
nullptr;
144 if (block ==
nullptr) {
147 if (block->
getID() >= this->block_registry.size()) {
150 return block == this->block_registry[block->
getID()];
153 std::vector<RT::Thread*> RT::Connector::topological_sort()
155 auto processing_q = std::queue<IO::Block*>();
156 auto sorted_blocks = std::vector<IO::Block*>();
157 auto sources_per_block = std::unordered_map<IO::Block*, int>();
161 for (
auto* block : this->block_registry) {
162 if (block ==
nullptr) {
165 sources_per_block[block] = 0;
169 for (
const auto& entry : this->connections) {
170 for (
const auto& conn : entry) {
171 sources_per_block[conn.
dest] += 1;
176 for (
auto block_count : sources_per_block) {
177 if (block_count.second == 0) {
178 processing_q.push(block_count.first);
183 while (!processing_q.empty()) {
184 sorted_blocks.push_back(processing_q.front());
185 for (
const auto& entry : this->connections) {
186 for (
const auto& conn : entry) {
187 if (processing_q.front() == conn.
src) {
188 sources_per_block[conn.
dest] -= 1;
189 if (sources_per_block[conn.
dest] == 0) {
190 processing_q.push(conn.
dest);
199 std::vector<RT::Thread*> sorted_active_threads;
200 for (
auto* block : sorted_blocks) {
201 if (block->getActive() && block->dependent()) {
202 sorted_active_threads.push_back(
dynamic_cast<RT::Thread*
>(block));
205 return sorted_active_threads;
210 std::vector<RT::Device*> devices;
211 for (
auto* block : this->block_registry) {
212 if (block ==
nullptr) {
215 if (block->getActive() && !block->dependent()) {
216 devices.push_back(
dynamic_cast<RT::Device*
>(block));
224 return this->topological_sort();
229 if (!this->isRegistered(src)) {
232 return this->connections[src->
getID()];
237 for (
const auto& conn : this->connections[block->
getID()]) {
245 for (
auto& entry : this->connections) {
246 entry.erase(std::remove_if(entry.begin(),
249 { return conn.dest == block; }),
256 std::vector<IO::Block*> blocks;
257 for (
auto* block : this->block_registry) {
258 if (block !=
nullptr) {
259 blocks.push_back(block);
267 std::vector<RT::block_connection_t> all_connections;
268 for (
auto entry : this->connections) {
269 all_connections.insert(all_connections.end(), entry.begin(), entry.end());
271 return all_connections;
282 ERROR_MSG(
"RT::System::System : failed to create Fifo");
285 this->task = std::make_unique<RT::OS::Task>();
287 ERROR_MSG(
"RT::System::System : failed to create realtime thread\n");
291 this->threads.reserve(100);
292 this->devices.reserve(100);
298 this->task->task_finished =
true;
300 this->event_manager->unregisterHandler(
this);
301 this->telemitry_processing_thread_running =
false;
302 this->eventFifo->close();
303 if (this->telemitry_processing_thread.joinable()) {
304 this->telemitry_processing_thread.join();
310 return this->task->period;
322 eventLogger* logger = this->event_manager->getLogger();
323 std::vector<RT::Telemitry::Response> responses;
324 while (!this->task->task_finished
325 && this->telemitry_processing_thread_running) {
326 responses = this->getTelemitry();
327 for (
auto telem : responses) {
328 if (telem.cmd !=
nullptr) {
332 this->telemitry_processing_thread_running =
false;
339 this->telemitry_processing_thread = std::thread(proc);
341 std::string(
"TelemitryWorker"));
346 this->eventFifo->poll();
347 std::vector<RT::Telemitry::Response> responses;
351 responses.push_back(telemitry);
356 void RT::System::setPeriod(RT::System::CMD* cmd)
358 auto period = std::get<int64_t>(cmd->getRTParam(
"period"));
359 this->task->period = period;
361 this->postTelemitry(telem);
364 void RT::System::updateDeviceList(RT::System::CMD* cmd)
366 std::vector<RT::Device*>* vec_ptr =
367 std::get<std::vector<RT::Device*>*>(cmd->getRTParam(
"deviceList"));
368 this->devices.clear();
369 this->devices.assign(vec_ptr->begin(), vec_ptr->end());
371 auto* device = std::get<RT::Device*>(cmd->getRTParam(
"device"));
372 this->rt_connector->clearAllConnections(device);
377 this->postTelemitry(telem);
380 void RT::System::updateThreadList(RT::System::CMD* cmd)
382 std::vector<RT::Thread*>* vec_ptr =
383 std::get<std::vector<RT::Thread*>*>(cmd->getRTParam(
"threadList"));
384 this->threads.clear();
385 this->threads.assign(vec_ptr->begin(), vec_ptr->end());
387 auto* thread = std::get<RT::Thread*>(cmd->getRTParam(
"thread"));
388 this->rt_connector->clearAllConnections(thread);
393 this->postTelemitry(telem);
396 void RT::System::ioLinkUpdateCMD(RT::System::CMD* cmd)
398 auto conn = std::get<RT::block_connection_t>(cmd->getRTParam(
"connection"));
401 switch (cmd->getType()) {
403 this->rt_connector->connect(conn);
407 this->rt_connector->disconnect(conn);
414 this->postTelemitry(telem);
417 void RT::System::getPeriodTicksCMD(RT::System::CMD* cmd)
420 switch (cmd->getType()) {
422 value = &(this->periodStartTime);
423 cmd->setRTParam(
"pre-period", value);
426 value = &(this->periodEndTime);
427 cmd->setRTParam(
"post-period", value);
433 this->postTelemitry(telem);
436 void RT::System::changeWidgetParametersCMD(RT::System::CMD* cmd)
442 std::get<Widgets::Component*>(cmd->getRTParam(
"paramWidget"));
443 auto param_id = std::get<size_t>(cmd->getRTParam(
"paramID"));
444 auto param_type_num = std::get<uint64_t>(cmd->getRTParam(
"paramType"));
447 switch (param_type) {
449 component->setValue<
double>(param_id, std::get<double>(param_value_any));
452 component->setValue<int64_t>(param_id,
453 std::get<int64_t>(param_value_any));
456 component->setValue<uint64_t>(param_id,
457 std::get<uint64_t>(param_value_any));
463 std::get<State::state_t>(param_value_any)));
467 "Widget Parameter Change event does not contain expected parameter " 471 this->postTelemitry(telem);
474 void RT::System::changeWidgetStateCMD(RT::System::CMD* cmd)
479 auto* component = std::get<Widgets::Component*>(cmd->getRTParam(
"component"));
480 auto state = std::get<RT::State::state_t>(cmd->getRTParam(
"state"));
481 component->setState(state);
482 this->postTelemitry(telem);
485 void RT::System::executeCMD(RT::System::CMD* cmd)
489 switch (cmd->getType()) {
495 this->ioLinkUpdateCMD(cmd);
499 this->getPeriodTicksCMD(cmd);
505 this->updateDeviceList(cmd);
511 this->updateThreadList(cmd);
514 this->task->task_finished =
true;
517 this->postTelemitry(telem);
520 this->changeWidgetParametersCMD(cmd);
523 this->changeWidgetStateCMD(cmd);
528 this->postTelemitry(telem);
533 RT::System::postTelemitry(telem);
545 if (this->task->task_finished) {
554 this->blockInfoRequest(event);
557 this->connectionsInfoRequest(event);
560 this->allConnectionsInfoRequest(event);
564 this->ioLinkChange(event);
567 this->insertThread(event);
570 this->removeThread(event);
574 this->threadActivityChange(event);
578 this->deviceActivityChange(event);
581 this->insertDevice(event);
584 this->removeDevice(event);
587 this->changeWidgetParameters(event);
590 this->changeWidgetState(event);
597 this->provideTimetickPointers(event);
600 this->getPeriodValues(event);
613 RT::System::CMD cmd(event->
getType());
614 auto period = std::any_cast<int64_t>(event->
getParam(
"period"));
615 cmd.setRTParam(
"period", period);
616 RT::System::CMD* cmd_ptr = &cmd;
617 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
623 event->setParam(
"period", std::any(this->
getPeriod()));
629 RT::System::CMD* cmd_ptr = &cmd;
630 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
636 auto* device = std::any_cast<RT::Device*>(event->
getParam(
"device"));
637 std::vector<RT::block_connection_t> connections_memory;
638 connections_memory.reserve(10);
639 if (device ==
nullptr) {
640 ERROR_MSG(
"RT::System::insertDevice : invalid device pointer\n");
643 this->rt_connector->insertBlock(device, connections_memory);
644 std::vector<RT::Device*> device_list = this->rt_connector->getDevices();
645 RT::System::CMD cmd(event->
getType());
646 cmd.setRTParam(
"deviceList", &device_list);
647 RT::System::CMD* cmd_ptr = &cmd;
648 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
654 auto* device = std::any_cast<RT::Device*>(event->
getParam(
"device"));
655 if (device ==
nullptr) {
656 ERROR_MSG(
"RT::System::removeDevice : invalid device pointer\n");
660 this->rt_connector->removeBlock(device);
661 std::vector<RT::Device*> device_list = this->rt_connector->getDevices();
662 RT::System::CMD cmd(event->
getType());
663 cmd.setRTParam(
"deviceList", &device_list);
664 cmd.setRTParam(
"device", device);
665 RT::System::CMD* cmd_ptr = &cmd;
666 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
672 auto* thread = std::any_cast<RT::Thread*>(event->
getParam(
"thread"));
673 std::vector<RT::block_connection_t> connections_memory;
674 connections_memory.reserve(10);
675 if (thread ==
nullptr) {
676 ERROR_MSG(
"RT::System::removeDevice : invalid device pointer\n");
679 this->rt_connector->insertBlock(thread, connections_memory);
680 std::vector<RT::Thread*> thread_list = this->rt_connector->getThreads();
681 RT::System::CMD cmd(event->
getType());
682 cmd.setRTParam(
"threadList", &thread_list);
683 RT::System::CMD* cmd_ptr = &cmd;
684 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
691 auto* thread = std::any_cast<RT::Thread*>(event->
getParam(
"thread"));
692 if (thread ==
nullptr) {
693 ERROR_MSG(
"RT::System::removeDevice : invalid device pointer\n");
697 thread->setActive(
false);
698 this->rt_connector->removeBlock(thread);
699 std::vector<RT::Thread*> thread_list = this->rt_connector->getThreads();
700 RT::System::CMD cmd(event->
getType());
701 cmd.setRTParam(
"threadList", &thread_list);
702 cmd.setRTParam(
"thread", thread);
703 RT::System::CMD* cmd_ptr = &cmd;
704 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
711 RT::System::CMD cmd(event->
getType());
712 RT::System::CMD* cmd_ptr = &cmd;
713 auto* thread = std::any_cast<RT::Thread*>(event->
getParam(
"thread"));
714 thread->setActive(isactive);
715 auto thread_list = this->rt_connector->getThreads();
716 cmd.setRTParam(
"threadList", &thread_list);
717 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
724 RT::System::CMD cmd(event->
getType());
725 RT::System::CMD* cmd_ptr = &cmd;
726 auto* device = std::any_cast<RT::Device*>(event->
getParam(
"device"));
727 device->setActive(isactive);
728 auto device_list = this->rt_connector->getDevices();
729 cmd.setRTParam(
"deviceList", &device_list);
730 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
736 RT::System::CMD cmd(event->
getType());
739 std::any_cast<RT::block_connection_t>(event->
getParam(
"connection")));
740 RT::System::CMD* cmd_ptr = &cmd;
741 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
745 void RT::System::connectionsInfoRequest(
Event::Object* event)
747 auto* source = std::any_cast<IO::Block*>(event->
getParam(
"block"));
748 const std::vector<RT::block_connection_t> outputs =
749 this->rt_connector->getOutputs(source);
750 event->setParam(
"outputs", std::any(outputs));
753 void RT::System::allConnectionsInfoRequest(
Event::Object* event)
755 const std::vector<RT::block_connection_t> all_conn =
756 this->rt_connector->getAllConnections();
757 event->setParam(
"connections", std::any(all_conn));
762 const std::vector<IO::Block*> blocks =
763 this->rt_connector->getRegisteredBlocks();
764 event->setParam(
"blockList", std::any(blocks));
769 RT::System::CMD cmd(event->
getType());
770 RT::System::CMD* cmd_ptr = &cmd;
771 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
775 void RT::System::provideTimetickPointers(
Event::Object* event)
777 RT::System::CMD cmd(event->
getType());
778 RT::System::CMD* cmd_ptr = &cmd;
779 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
782 int64_t* startperiod =
nullptr;
783 int64_t* stopperiod =
nullptr;
787 startperiod = std::get<int64_t*>(cmd.getRTParam(
"pre-period"));
788 event->setParam(
"pre-period", std::any(startperiod));
791 stopperiod = std::get<int64_t*>(cmd.getRTParam(
"post-period"));
792 event->setParam(
"post-period", std::any(stopperiod));
799 void RT::System::changeWidgetParameters(
Event::Object* event)
802 RT::System::CMD cmd(event->
getType());
804 std::any_cast<Widgets::Component*>(event->
getParam(
"paramWidget"));
805 cmd.setRTParam(
"paramWidget", component);
806 auto param_id = std::any_cast<size_t>(event->
getParam(
"paramID"));
807 cmd.setRTParam(
"paramID", param_id);
808 auto param_type = std::any_cast<Widgets::Variable::variable_t>(
810 cmd.setRTParam(
"paramType", param_type);
811 std::any param_value_any =
event->getParam(
"paramValue");
812 switch (param_type) {
814 cmd.setRTParam(
"paramValue", std::any_cast<double>(param_value_any));
817 cmd.setRTParam(
"paramValue", std::any_cast<int64_t>(param_value_any));
821 cmd.setRTParam(
"paramValue", std::any_cast<uint64_t>(param_value_any));
824 cmd.setRTParam(
"paramValue",
825 std::any_cast<RT::State::state_t>(param_value_any));
829 "Widget Parameter Change event does not contain expected parameter " 833 RT::System::CMD* cmd_ptr = &cmd;
834 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
840 RT::System::CMD cmd(event->
getType());
842 std::any_cast<Widgets::Component*>(event->
getParam(
"component"));
843 auto state = std::any_cast<RT::State::state_t>(event->
getParam(
"state"));
844 cmd.setRTParam(
"component", component);
845 cmd.setRTParam(
"state", state);
846 RT::System::CMD* cmd_ptr = &cmd;
847 this->eventFifo->write(&cmd_ptr,
sizeof(RT::System::CMD*));
857 const std::string_view& param_name)
859 for (
auto& parameter : rt_params) {
860 if (parameter.name == param_name) {
861 return parameter.value;
864 return std::monostate();
867 void RT::System::CMD::setRTParam(
const std::string_view& param_name,
870 for (
auto& parameter : rt_params) {
871 if (parameter.name == param_name) {
872 parameter.value = value;
878 temp.name = param_name;
880 rt_params.push_back(temp);
883 void RT::System::execute(
void* sys)
886 RT::System::CMD* cmd =
nullptr;
890 "RT::System::execute : failed to set the initial period of the " 891 "realtime thread\n");
896 system->task->next_t = starttime + system->task->period;
897 while (!(system->task->task_finished)) {
901 system->periodStartTime = starttime;
902 system->periodEndTime = endtime;
908 for (
auto* iDevice : system->devices) {
910 system->rt_connector->propagateBlockConnections(iDevice);
913 for (
auto* iThread : system->threads) {
915 system->rt_connector->propagateBlockConnections(iThread);
918 for (
auto* iDevice : system->devices) {
922 while (system->eventFifo->readRT(&cmd,
sizeof(RT::System::CMD*)) > 0) {
923 system->executeCMD(cmd);
void registerHandler(Handler *handler)
Event::Type getType() const
std::any getParam(const std::string ¶m_name) const
void writeinput(size_t index, const double &data)
void assignID(size_t block_id)
const double & readPort(IO::flags_t direction, size_t index)
std::vector< RT::block_connection_t > getAllConnections()
std::vector< IO::Block * > getRegisteredBlocks()
std::vector< RT::Thread * > getThreads()
void propagateBlockConnections(IO::Block *block)
void disconnect(block_connection_t connection)
int connect(block_connection_t connection)
void insertBlock(IO::Block *block, std::vector< RT::block_connection_t > &block_connections)
bool isRegistered(IO::Block *block)
std::vector< RT::block_connection_t > getOutputs(IO::Block *src)
bool connected(block_connection_t connection)
void clearAllConnections(IO::Block *block)
std::vector< RT::Device * > getDevices()
void removeBlock(IO::Block *block)
std::vector< RT::Telemitry::Response > getTelemitry()
System(Event::Manager *em, RT::Connector *rtc)
void receiveEvent(Event::Object *event) override
void createTelemitryProcessor()
void log(Event::Object *event)
void ERROR_MSG(const std::string &errmsg, Args... args)
@ RT_WIDGET_PARAMETER_CHANGE_EVENT
@ RT_DEVICE_UNPAUSE_EVENT
@ RT_THREAD_UNPAUSE_EVENT
@ RT_WIDGET_STATE_CHANGE_EVENT
@ IO_ALL_CONNECTIONS_QUERY_EVENT
@ IO_BLOCK_OUTPUTS_QUERY_EVENT
constexpr size_t INVALID_BLOCK_ID
int createTask(Task *task, void(*func)(void *), void *arg)
const uint64_t DEFAULT_FIFO_SIZE
int setPeriod(Task *task, int64_t period)
int getFifo(std::unique_ptr< Fifo > &fifo, size_t fifo_size)
void shutdown(RT::OS::Task *task)
void sleepTimestep(Task *task)
void renameOSThread(std::thread &thread, const std::string &name)
void deleteTask(Task *task)
const int64_t DEFAULT_PERIOD
constexpr response_t RT_WIDGET_PARAM_UPDATE
constexpr response_t RT_DEVICE_LIST_UPDATE
constexpr response_t RT_NOOP
constexpr response_t IO_LINK_UPDATED
constexpr response_t RT_PERIOD_UPDATE
constexpr response_t RT_ERROR
constexpr response_t RT_THREAD_LIST_UPDATE
constexpr response_t RT_WIDGET_STATE_UPDATE
constexpr response_t RT_SHUTDOWN
std::variant< std::monostate, int64_t, int64_t *, uint64_t, double, RT::Thread *, std::vector< RT::Thread * > *, RT::Device *, std::vector< RT::Device * > *, IO::Block *, RT::block_connection_t, Widgets::Component *, State::state_t, std::string > command_param_t
IO::flags_t src_port_type