RTXI  3.0.0
The Real-Time eXperiment Interface Reference Manual
rt.cpp
Go to the documentation of this file.
1 /*
2  The Real-Time eXperiment Interface (RTXI)
3  Copyright (C) 2011 Georgia Institute of Technology, University of Utah,
4  Weill Cornell Medical College
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 
19  */
20 
21 #include <functional>
22 #include <iostream>
23 #include <queue>
24 
25 #include "rt.hpp"
26 
27 #include "debug.hpp"
28 #include "event.hpp"
29 #include "fifo.hpp"
30 #include "logger.hpp"
31 #include "rtos.hpp"
32 #include "widgets.hpp"
33 
34 // TODO: convert cycle detection into non-recursive version
35 int RT::Connector::find_cycle(RT::block_connection_t conn, IO::Block* ref_block)
36 {
37  // Cannot connect a block with itself
38  if (conn.dest == ref_block) {
39  return -1;
40  }
41  for (const auto& temp_conn : this->connections[conn.dest->getID()]) {
42  if (ref_block == temp_conn.src
43  || this->find_cycle(temp_conn, ref_block) == -1) {
44  return -1;
45  }
46  }
47  return 0;
48 }
49 
51 {
52  // Let's remind our users to register their block first
53  if (!(this->isRegistered(connection.src)
54  && this->isRegistered(connection.dest)))
55  {
56  ERROR_MSG(
57  "RT::Connector : source or destination blocks are not registered");
58  return -1;
59  }
60  if (this->find_cycle(connection, connection.src) == -1) {
61  return -1;
62  }
63 
64  if (!(this->connected(connection))) {
65  this->connections[connection.src->getID()].push_back(connection);
66  }
67  return 0;
68 }
69 
71 {
72  if (!(this->isRegistered(connection.src)
73  && this->isRegistered(connection.dest)))
74  {
75  return false;
76  }
77  const size_t src_id = connection.src->getID();
78  auto iter = std::find(this->connections[src_id].begin(),
79  this->connections[src_id].end(),
80  connection);
81  return iter != this->connections[src_id].end();
82 }
83 
85 {
86  if (!(this->isRegistered(connection.src)
87  && this->isRegistered(connection.dest)))
88  {
89  return;
90  }
91  const size_t src_id = connection.src->getID();
92  auto it = std::find(this->connections[src_id].begin(),
93  this->connections[src_id].end(),
94  connection);
95  if (it != this->connections[src_id].end()) {
96  this->connections[src_id].erase(it);
97  }
98 }
99 
101  IO::Block* block, std::vector<RT::block_connection_t>& block_connections)
102 {
103  if (block == nullptr || this->isRegistered(block)) {
104  return;
105  }
106 
107  // This insertion block algorithm makes use of move semantics to insert and
108  // delete vector items. This is mainly to avoid allocations in the real-time
109  // thread and std::move allows us to place data without having to allocate
110  // memory again. We should make sure to use reserve() member function on
111  // vectors in the non-rt thread so that further push_back calls are less
112  // likely to allocate memory.
113  size_t id = 0;
114  bool stored = false;
115  for (id = 0; id < this->block_registry.size(); id++) {
116  if (this->block_registry[id] == nullptr) {
117  this->block_registry[id] = block;
118  block->assignID(id);
119  this->connections[id].swap(block_connections);
120  stored = true;
121  break;
122  }
123  }
124 
125  if (!stored) {
126  block->assignID(this->block_registry.size());
127  this->block_registry.push_back(block);
128  this->connections.emplace_back(std::move(block_connections));
129  }
130 }
131 
133 {
134  if (block == nullptr || !(this->isRegistered(block))) {
135  return;
136  }
137  // remove block from registry
138  this->block_registry[block->getID()] = nullptr;
139  // block->assignID(IO::INVALID_BLOCK_ID);
140 }
141 
143 {
144  if (block == nullptr) {
145  return false;
146  }
147  if (block->getID() >= this->block_registry.size()) {
148  return false;
149  }
150  return block == this->block_registry[block->getID()];
151 }
152 
153 std::vector<RT::Thread*> RT::Connector::topological_sort()
154 {
155  auto processing_q = std::queue<IO::Block*>();
156  auto sorted_blocks = std::vector<IO::Block*>();
157  auto sources_per_block = std::unordered_map<IO::Block*, int>();
158  // auto valid_threads = std::vector<IO::Block*>();
159 
160  // initialize counts
161  for (auto* block : this->block_registry) {
162  if (block == nullptr) {
163  continue;
164  }
165  sources_per_block[block] = 0;
166  }
167 
168  // Calculate number of sources per block
169  for (const auto& entry : this->connections) {
170  for (const auto& conn : entry) {
171  sources_per_block[conn.dest] += 1;
172  }
173  }
174 
175  // Initialize queue for processing nodes in graph
176  for (auto block_count : sources_per_block) {
177  if (block_count.second == 0) {
178  processing_q.push(block_count.first);
179  }
180  }
181 
182  // Process the graph nodes.
183  while (!processing_q.empty()) {
184  sorted_blocks.push_back(processing_q.front());
185  for (const auto& entry : this->connections) {
186  for (const auto& conn : entry) {
187  if (processing_q.front() == conn.src) {
188  sources_per_block[conn.dest] -= 1;
189  if (sources_per_block[conn.dest] == 0) {
190  processing_q.push(conn.dest);
191  }
192  }
193  }
194  }
195  processing_q.pop();
196  }
197 
198  // System only cares about active threads
199  std::vector<RT::Thread*> sorted_active_threads;
200  for (auto* block : sorted_blocks) {
201  if (block->getActive() && block->dependent()) {
202  sorted_active_threads.push_back(dynamic_cast<RT::Thread*>(block));
203  }
204  }
205  return sorted_active_threads;
206 }
207 
208 std::vector<RT::Device*> RT::Connector::getDevices()
209 {
210  std::vector<RT::Device*> devices;
211  for (auto* block : this->block_registry) {
212  if (block == nullptr) {
213  continue;
214  }
215  if (block->getActive() && !block->dependent()) {
216  devices.push_back(dynamic_cast<RT::Device*>(block));
217  }
218  }
219  return devices;
220 }
221 
222 std::vector<RT::Thread*> RT::Connector::getThreads()
223 {
224  return this->topological_sort();
225 }
226 
227 std::vector<RT::block_connection_t> RT::Connector::getOutputs(IO::Block* src)
228 {
229  if (!this->isRegistered(src)) {
230  return {};
231  }
232  return this->connections[src->getID()];
233 }
234 
236 {
237  for (const auto& conn : this->connections[block->getID()]) {
238  conn.dest->writeinput(
239  conn.dest_port, conn.src->readPort(conn.src_port_type, conn.src_port));
240  }
241 }
242 
244 {
245  for (auto& entry : this->connections) {
246  entry.erase(std::remove_if(entry.begin(),
247  entry.end(),
248  [&](RT::block_connection_t conn)
249  { return conn.dest == block; }),
250  entry.end());
251  }
252 }
253 
254 std::vector<IO::Block*> RT::Connector::getRegisteredBlocks()
255 {
256  std::vector<IO::Block*> blocks;
257  for (auto* block : this->block_registry) {
258  if (block != nullptr) {
259  blocks.push_back(block);
260  }
261  }
262  return blocks;
263 }
264 
265 std::vector<RT::block_connection_t> RT::Connector::getAllConnections()
266 {
267  std::vector<RT::block_connection_t> all_connections;
268  for (auto entry : this->connections) {
269  all_connections.insert(all_connections.end(), entry.begin(), entry.end());
270  }
271  return all_connections;
272 }
273 
275  : event_manager(em)
276  , rt_connector(rtc)
277 {
278  if (RT::OS::getFifo(this->eventFifo,
280  != 0)
281  {
282  ERROR_MSG("RT::System::System : failed to create Fifo");
283  return;
284  }
285  this->task = std::make_unique<RT::OS::Task>();
286  if (RT::OS::createTask(this->task.get(), &RT::System::execute, this) != 0) {
287  ERROR_MSG("RT::System::System : failed to create realtime thread\n");
288  return;
289  }
290 
291  this->threads.reserve(100);
292  this->devices.reserve(100);
293  this->event_manager->registerHandler(this);
294 }
295 
297 {
298  this->task->task_finished = true;
299  RT::OS::deleteTask(this->task.get());
300  this->event_manager->unregisterHandler(this);
301  this->telemitry_processing_thread_running = false;
302  this->eventFifo->close();
303  if (this->telemitry_processing_thread.joinable()) {
304  this->telemitry_processing_thread.join();
305  }
306 }
307 
309 {
310  return this->task->period;
311 }
312 
313 void RT::System::postTelemitry(RT::Telemitry::Response telemitry)
314 {
315  this->eventFifo->writeRT(&telemitry, sizeof(RT::Telemitry::Response));
316 }
317 
319 {
320  auto proc = [&]()
321  {
322  eventLogger* logger = this->event_manager->getLogger();
323  std::vector<RT::Telemitry::Response> responses;
324  while (!this->task->task_finished
325  && this->telemitry_processing_thread_running) {
326  responses = this->getTelemitry();
327  for (auto telem : responses) {
328  if (telem.cmd != nullptr) {
329  telem.cmd->done();
330  }
331  if (telem.type == RT::Telemitry::RT_SHUTDOWN) {
332  this->telemitry_processing_thread_running = false;
333  }
334  // let's log this telemitry
335  logger->log(telem);
336  }
337  }
338  };
339  this->telemitry_processing_thread = std::thread(proc);
340  RT::OS::renameOSThread(this->telemitry_processing_thread,
341  std::string("TelemitryWorker"));
342 }
343 
344 std::vector<RT::Telemitry::Response> RT::System::getTelemitry()
345 {
346  this->eventFifo->poll();
347  std::vector<RT::Telemitry::Response> responses;
348  RT::Telemitry::Response telemitry;
349  while (this->eventFifo->read(&telemitry, sizeof(RT::Telemitry::Response)) > 0)
350  {
351  responses.push_back(telemitry);
352  }
353  return responses;
354 }
355 
356 void RT::System::setPeriod(RT::System::CMD* cmd)
357 {
358  auto period = std::get<int64_t>(cmd->getRTParam("period"));
359  this->task->period = period;
361  this->postTelemitry(telem);
362 }
363 
364 void RT::System::updateDeviceList(RT::System::CMD* cmd)
365 {
366  std::vector<RT::Device*>* vec_ptr =
367  std::get<std::vector<RT::Device*>*>(cmd->getRTParam("deviceList"));
368  this->devices.clear();
369  this->devices.assign(vec_ptr->begin(), vec_ptr->end());
370  if (cmd->getType() == Event::Type::RT_DEVICE_REMOVE_EVENT) {
371  auto* device = std::get<RT::Device*>(cmd->getRTParam("device"));
372  this->rt_connector->clearAllConnections(device);
373  device->assignID(IO::INVALID_BLOCK_ID);
374  }
376  cmd};
377  this->postTelemitry(telem);
378 }
379 
380 void RT::System::updateThreadList(RT::System::CMD* cmd)
381 {
382  std::vector<RT::Thread*>* vec_ptr =
383  std::get<std::vector<RT::Thread*>*>(cmd->getRTParam("threadList"));
384  this->threads.clear();
385  this->threads.assign(vec_ptr->begin(), vec_ptr->end());
386  if (cmd->getType() == Event::Type::RT_THREAD_REMOVE_EVENT) {
387  auto* thread = std::get<RT::Thread*>(cmd->getRTParam("thread"));
388  this->rt_connector->clearAllConnections(thread);
389  thread->assignID(IO::INVALID_BLOCK_ID);
390  }
392  cmd};
393  this->postTelemitry(telem);
394 }
395 
396 void RT::System::ioLinkUpdateCMD(RT::System::CMD* cmd)
397 {
398  auto conn = std::get<RT::block_connection_t>(cmd->getRTParam("connection"));
400  telem.cmd = cmd;
401  switch (cmd->getType()) {
403  this->rt_connector->connect(conn);
405  break;
407  this->rt_connector->disconnect(conn);
409  break;
410  default:
412  break;
413  }
414  this->postTelemitry(telem);
415 }
416 
417 void RT::System::getPeriodTicksCMD(RT::System::CMD* cmd)
418 {
419  RT::command_param_t value;
420  switch (cmd->getType()) {
422  value = &(this->periodStartTime);
423  cmd->setRTParam("pre-period", value);
424  break;
426  value = &(this->periodEndTime);
427  cmd->setRTParam("post-period", value);
428  break;
429  default:
430  return;
431  }
433  this->postTelemitry(telem);
434 }
435 
436 void RT::System::changeWidgetParametersCMD(RT::System::CMD* cmd)
437 {
439  telem.cmd = cmd;
441  auto* component =
442  std::get<Widgets::Component*>(cmd->getRTParam("paramWidget"));
443  auto param_id = std::get<size_t>(cmd->getRTParam("paramID"));
444  auto param_type_num = std::get<uint64_t>(cmd->getRTParam("paramType"));
445  auto param_type = static_cast<Widgets::Variable::variable_t>(param_type_num);
446  RT::command_param_t param_value_any = cmd->getRTParam("paramValue");
447  switch (param_type) {
449  component->setValue<double>(param_id, std::get<double>(param_value_any));
450  break;
452  component->setValue<int64_t>(param_id,
453  std::get<int64_t>(param_value_any));
454  break;
456  component->setValue<uint64_t>(param_id,
457  std::get<uint64_t>(param_value_any));
458  break;
460  component->setValue<RT::State::state_t>(
461  param_id,
462  static_cast<RT::State::state_t>(
463  std::get<State::state_t>(param_value_any)));
464  break;
465  default:
466  ERROR_MSG(
467  "Widget Parameter Change event does not contain expected parameter "
468  "types");
470  }
471  this->postTelemitry(telem);
472 }
473 
474 void RT::System::changeWidgetStateCMD(RT::System::CMD* cmd)
475 {
477  telem.cmd = cmd;
479  auto* component = std::get<Widgets::Component*>(cmd->getRTParam("component"));
480  auto state = std::get<RT::State::state_t>(cmd->getRTParam("state"));
481  component->setState(state);
482  this->postTelemitry(telem);
483 }
484 
485 void RT::System::executeCMD(RT::System::CMD* cmd)
486 {
488  telem.cmd = cmd;
489  switch (cmd->getType()) {
491  this->setPeriod(cmd);
492  break;
495  this->ioLinkUpdateCMD(cmd);
496  break;
499  this->getPeriodTicksCMD(cmd);
500  break;
505  this->updateDeviceList(cmd);
506  break;
511  this->updateThreadList(cmd);
512  break;
514  this->task->task_finished = true;
516  telem.cmd = cmd;
517  this->postTelemitry(telem);
518  break;
520  this->changeWidgetParametersCMD(cmd);
521  break;
523  this->changeWidgetStateCMD(cmd);
524  break;
525  case Event::Type::NOOP:
527  telem.cmd = cmd;
528  this->postTelemitry(telem);
529  break;
530  default:
532  telem.cmd = nullptr;
533  RT::System::postTelemitry(telem);
534  // make sure the command is handled so caller can continue
535  }
536 }
537 
539 {
540  // funnily enough it may be that real-time loop
541  // is already shutting down before unregistering
542  // system from the list of event handlers.
543  // In that case we shouldn't attempt to send commands
544  // or process events
545  if (this->task->task_finished) {
546  return;
547  }
548 
549  switch (event->getType()) {
551  this->setPeriod(event);
552  break;
554  this->blockInfoRequest(event);
555  break;
557  this->connectionsInfoRequest(event);
558  break;
560  this->allConnectionsInfoRequest(event);
561  break;
564  this->ioLinkChange(event);
565  break;
567  this->insertThread(event);
568  break;
570  this->removeThread(event);
571  break;
574  this->threadActivityChange(event);
575  break;
578  this->deviceActivityChange(event);
579  break;
581  this->insertDevice(event);
582  break;
584  this->removeDevice(event);
585  break;
587  this->changeWidgetParameters(event);
588  break;
590  this->changeWidgetState(event);
591  break;
593  this->shutdown(event);
594  break;
597  this->provideTimetickPointers(event);
598  break;
600  this->getPeriodValues(event);
601  break;
602  case Event::Type::NOOP:
603  this->NOOP(event);
604  break;
605  default:
606  return;
607  }
608 }
609 
610 void RT::System::setPeriod(Event::Object* event)
611 {
612  // auto period = std::any_cast<int64_t>(event->getParam("period"));
613  RT::System::CMD cmd(event->getType());
614  auto period = std::any_cast<int64_t>(event->getParam("period"));
615  cmd.setRTParam("period", period);
616  RT::System::CMD* cmd_ptr = &cmd;
617  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
618  cmd.wait();
619 }
620 
621 void RT::System::getPeriodValues(Event::Object* event)
622 {
623  event->setParam("period", std::any(this->getPeriod()));
624 }
625 
626 void RT::System::NOOP(Event::Object* /*event*/)
627 {
628  RT::System::CMD cmd(Event::Type::NOOP);
629  RT::System::CMD* cmd_ptr = &cmd;
630  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
631  cmd.wait();
632 }
633 
634 void RT::System::insertDevice(Event::Object* event)
635 {
636  auto* device = std::any_cast<RT::Device*>(event->getParam("device"));
637  std::vector<RT::block_connection_t> connections_memory;
638  connections_memory.reserve(10);
639  if (device == nullptr) {
640  ERROR_MSG("RT::System::insertDevice : invalid device pointer\n");
641  return;
642  }
643  this->rt_connector->insertBlock(device, connections_memory);
644  std::vector<RT::Device*> device_list = this->rt_connector->getDevices();
645  RT::System::CMD cmd(event->getType());
646  cmd.setRTParam("deviceList", &device_list);
647  RT::System::CMD* cmd_ptr = &cmd;
648  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
649  cmd.wait();
650 }
651 
652 void RT::System::removeDevice(Event::Object* event)
653 {
654  auto* device = std::any_cast<RT::Device*>(event->getParam("device"));
655  if (device == nullptr) {
656  ERROR_MSG("RT::System::removeDevice : invalid device pointer\n");
657  return;
658  }
659  // We have to make sure to deactivate device before removing
660  this->rt_connector->removeBlock(device);
661  std::vector<RT::Device*> device_list = this->rt_connector->getDevices();
662  RT::System::CMD cmd(event->getType());
663  cmd.setRTParam("deviceList", &device_list);
664  cmd.setRTParam("device", device);
665  RT::System::CMD* cmd_ptr = &cmd;
666  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
667  cmd.wait();
668 }
669 
670 void RT::System::insertThread(Event::Object* event)
671 {
672  auto* thread = std::any_cast<RT::Thread*>(event->getParam("thread"));
673  std::vector<RT::block_connection_t> connections_memory;
674  connections_memory.reserve(10);
675  if (thread == nullptr) {
676  ERROR_MSG("RT::System::removeDevice : invalid device pointer\n");
677  return;
678  }
679  this->rt_connector->insertBlock(thread, connections_memory);
680  std::vector<RT::Thread*> thread_list = this->rt_connector->getThreads();
681  RT::System::CMD cmd(event->getType());
682  cmd.setRTParam("threadList", &thread_list);
683  RT::System::CMD* cmd_ptr = &cmd;
684  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
685  cmd.wait();
686 }
687 
688 // TODO: come back after figuring out connection problems
689 void RT::System::removeThread(Event::Object* event)
690 {
691  auto* thread = std::any_cast<RT::Thread*>(event->getParam("thread"));
692  if (thread == nullptr) {
693  ERROR_MSG("RT::System::removeDevice : invalid device pointer\n");
694  return;
695  }
696  // We have to make sure to deactivate thread before removing
697  thread->setActive(/*act=*/false);
698  this->rt_connector->removeBlock(thread);
699  std::vector<RT::Thread*> thread_list = this->rt_connector->getThreads();
700  RT::System::CMD cmd(event->getType());
701  cmd.setRTParam("threadList", &thread_list);
702  cmd.setRTParam("thread", thread);
703  RT::System::CMD* cmd_ptr = &cmd;
704  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
705  cmd.wait();
706 }
707 
708 void RT::System::threadActivityChange(Event::Object* event)
709 {
710  auto isactive = event->getType() == Event::Type::RT_THREAD_UNPAUSE_EVENT;
711  RT::System::CMD cmd(event->getType());
712  RT::System::CMD* cmd_ptr = &cmd;
713  auto* thread = std::any_cast<RT::Thread*>(event->getParam("thread"));
714  thread->setActive(isactive);
715  auto thread_list = this->rt_connector->getThreads();
716  cmd.setRTParam("threadList", &thread_list);
717  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
718  cmd.wait();
719 }
720 
721 void RT::System::deviceActivityChange(Event::Object* event)
722 {
723  auto isactive = event->getType() == Event::Type::RT_DEVICE_UNPAUSE_EVENT;
724  RT::System::CMD cmd(event->getType());
725  RT::System::CMD* cmd_ptr = &cmd;
726  auto* device = std::any_cast<RT::Device*>(event->getParam("device"));
727  device->setActive(isactive);
728  auto device_list = this->rt_connector->getDevices();
729  cmd.setRTParam("deviceList", &device_list);
730  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
731  cmd.wait();
732 }
733 
734 void RT::System::ioLinkChange(Event::Object* event)
735 {
736  RT::System::CMD cmd(event->getType());
737  cmd.setRTParam(
738  "connection",
739  std::any_cast<RT::block_connection_t>(event->getParam("connection")));
740  RT::System::CMD* cmd_ptr = &cmd;
741  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
742  cmd.wait();
743 }
744 
745 void RT::System::connectionsInfoRequest(Event::Object* event)
746 {
747  auto* source = std::any_cast<IO::Block*>(event->getParam("block"));
748  const std::vector<RT::block_connection_t> outputs =
749  this->rt_connector->getOutputs(source);
750  event->setParam("outputs", std::any(outputs));
751 }
752 
753 void RT::System::allConnectionsInfoRequest(Event::Object* event)
754 {
755  const std::vector<RT::block_connection_t> all_conn =
756  this->rt_connector->getAllConnections();
757  event->setParam("connections", std::any(all_conn));
758 }
759 
760 void RT::System::blockInfoRequest(Event::Object* event)
761 {
762  const std::vector<IO::Block*> blocks =
763  this->rt_connector->getRegisteredBlocks();
764  event->setParam("blockList", std::any(blocks));
765 }
766 
767 void RT::System::shutdown(Event::Object* event)
768 {
769  RT::System::CMD cmd(event->getType());
770  RT::System::CMD* cmd_ptr = &cmd;
771  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
772  cmd.wait();
773 }
774 
775 void RT::System::provideTimetickPointers(Event::Object* event)
776 {
777  RT::System::CMD cmd(event->getType());
778  RT::System::CMD* cmd_ptr = &cmd;
779  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
780  cmd.wait();
781 
782  int64_t* startperiod = nullptr;
783  int64_t* stopperiod = nullptr;
784  // transfer values to event for poster to use
785  switch (event->getType()) {
787  startperiod = std::get<int64_t*>(cmd.getRTParam("pre-period"));
788  event->setParam("pre-period", std::any(startperiod));
789  break;
791  stopperiod = std::get<int64_t*>(cmd.getRTParam("post-period"));
792  event->setParam("post-period", std::any(stopperiod));
793  break;
794  default:
795  return;
796  }
797 }
798 
799 void RT::System::changeWidgetParameters(Event::Object* event)
800 {
801  // we must convert event object to cmd object
802  RT::System::CMD cmd(event->getType());
803  auto* component =
804  std::any_cast<Widgets::Component*>(event->getParam("paramWidget"));
805  cmd.setRTParam("paramWidget", component);
806  auto param_id = std::any_cast<size_t>(event->getParam("paramID"));
807  cmd.setRTParam("paramID", param_id);
808  auto param_type = std::any_cast<Widgets::Variable::variable_t>(
809  event->getParam("paramType"));
810  cmd.setRTParam("paramType", param_type);
811  std::any param_value_any = event->getParam("paramValue");
812  switch (param_type) {
814  cmd.setRTParam("paramValue", std::any_cast<double>(param_value_any));
815  break;
817  cmd.setRTParam("paramValue", std::any_cast<int64_t>(param_value_any));
818 
819  break;
821  cmd.setRTParam("paramValue", std::any_cast<uint64_t>(param_value_any));
822  break;
824  cmd.setRTParam("paramValue",
825  std::any_cast<RT::State::state_t>(param_value_any));
826  break;
827  default:
828  ERROR_MSG(
829  "Widget Parameter Change event does not contain expected parameter "
830  "types");
831  }
832 
833  RT::System::CMD* cmd_ptr = &cmd;
834  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
835  cmd_ptr->wait();
836 }
837 
838 void RT::System::changeWidgetState(Event::Object* event)
839 {
840  RT::System::CMD cmd(event->getType());
841  auto* component =
842  std::any_cast<Widgets::Component*>(event->getParam("component"));
843  auto state = std::any_cast<RT::State::state_t>(event->getParam("state"));
844  cmd.setRTParam("component", component);
845  cmd.setRTParam("state", state);
846  RT::System::CMD* cmd_ptr = &cmd;
847  this->eventFifo->write(&cmd_ptr, sizeof(RT::System::CMD*));
848  cmd.wait();
849 }
850 
851 RT::System::CMD::CMD(Event::Type et)
852  : Event::Object(et)
853 {
854 }
855 
856 RT::command_param_t RT::System::CMD::getRTParam(
857  const std::string_view& param_name)
858 {
859  for (auto& parameter : rt_params) {
860  if (parameter.name == param_name) {
861  return parameter.value;
862  }
863  }
864  return std::monostate();
865 }
866 
867 void RT::System::CMD::setRTParam(const std::string_view& param_name,
868  const RT::command_param_t& value)
869 {
870  for (auto& parameter : rt_params) {
871  if (parameter.name == param_name) {
872  parameter.value = value;
873  return;
874  }
875  }
876 
877  rt_param temp = {};
878  temp.name = param_name;
879  temp.value = value;
880  rt_params.push_back(temp);
881 }
882 
883 void RT::System::execute(void* sys)
884 {
885  auto* system = static_cast<RT::System*>(sys);
886  RT::System::CMD* cmd = nullptr;
887 
888  if (RT::OS::setPeriod(system->task.get(), RT::OS::DEFAULT_PERIOD) != 0) {
889  ERROR_MSG(
890  "RT::System::execute : failed to set the initial period of the "
891  "realtime thread\n");
892  return;
893  }
894  auto starttime = RT::OS::getTime();
895  int64_t endtime = 0;
896  system->task->next_t = starttime + system->task->period;
897  while (!(system->task->task_finished)) {
898  // storing timing information and placing it in local variables
899 
900  // store period timing values for previous period
901  system->periodStartTime = starttime;
902  system->periodEndTime = endtime;
903 
904  // sleep until next cycle
905  RT::OS::sleepTimestep(system->task.get());
906  starttime = RT::OS::getTime();
907 
908  for (auto* iDevice : system->devices) {
909  iDevice->read();
910  system->rt_connector->propagateBlockConnections(iDevice);
911  }
912 
913  for (auto* iThread : system->threads) {
914  iThread->execute();
915  system->rt_connector->propagateBlockConnections(iThread);
916  }
917 
918  for (auto* iDevice : system->devices) {
919  iDevice->write();
920  }
921 
922  while (system->eventFifo->readRT(&cmd, sizeof(RT::System::CMD*)) > 0) {
923  system->executeCMD(cmd);
924  }
925  endtime = RT::OS::getTime();
926  cmd = nullptr;
927  }
928 }
void registerHandler(Handler *handler)
Definition: event.cpp:336
Event::Type getType() const
Definition: event.cpp:228
std::any getParam(const std::string &param_name) const
Definition: event.cpp:170
Definition: io.hpp:79
size_t getID() const
Definition: io.hpp:202
void writeinput(size_t index, const double &data)
Definition: io.cpp:65
void assignID(size_t block_id)
Definition: io.hpp:195
const double & readPort(IO::flags_t direction, size_t index)
Definition: io.cpp:85
std::vector< RT::block_connection_t > getAllConnections()
Definition: rt.cpp:265
std::vector< IO::Block * > getRegisteredBlocks()
Definition: rt.cpp:254
std::vector< RT::Thread * > getThreads()
Definition: rt.cpp:222
void propagateBlockConnections(IO::Block *block)
Definition: rt.cpp:235
void disconnect(block_connection_t connection)
Definition: rt.cpp:84
int connect(block_connection_t connection)
Definition: rt.cpp:50
void insertBlock(IO::Block *block, std::vector< RT::block_connection_t > &block_connections)
Definition: rt.cpp:100
bool isRegistered(IO::Block *block)
Definition: rt.cpp:142
std::vector< RT::block_connection_t > getOutputs(IO::Block *src)
Definition: rt.cpp:227
bool connected(block_connection_t connection)
Definition: rt.cpp:70
void clearAllConnections(IO::Block *block)
Definition: rt.cpp:243
std::vector< RT::Device * > getDevices()
Definition: rt.cpp:208
void removeBlock(IO::Block *block)
Definition: rt.cpp:132
std::vector< RT::Telemitry::Response > getTelemitry()
Definition: rt.cpp:344
System(Event::Manager *em, RT::Connector *rtc)
Definition: rt.cpp:274
void receiveEvent(Event::Object *event) override
Definition: rt.cpp:538
int64_t getPeriod()
Definition: rt.cpp:308
void createTelemitryProcessor()
Definition: rt.cpp:318
~System() override
Definition: rt.cpp:296
void log(Event::Object *event)
Definition: logger.cpp:12
void ERROR_MSG(const std::string &errmsg, Args... args)
Definition: debug.hpp:36
Definition: event.hpp:49
Type
Definition: event.hpp:55
@ RT_WIDGET_PARAMETER_CHANGE_EVENT
Definition: event.hpp:68
@ RT_THREAD_PAUSE_EVENT
Definition: event.hpp:62
@ RT_GET_PERIOD_EVENT
Definition: event.hpp:59
@ RT_PERIOD_EVENT
Definition: event.hpp:56
@ RT_DEVICE_UNPAUSE_EVENT
Definition: event.hpp:67
@ RT_PREPERIOD_EVENT
Definition: event.hpp:57
@ RT_THREAD_UNPAUSE_EVENT
Definition: event.hpp:63
@ RT_WIDGET_STATE_CHANGE_EVENT
Definition: event.hpp:69
@ IO_LINK_INSERT_EVENT
Definition: event.hpp:71
@ IO_LINK_REMOVE_EVENT
Definition: event.hpp:72
@ RT_THREAD_INSERT_EVENT
Definition: event.hpp:60
@ NOOP
Definition: event.hpp:93
@ RT_DEVICE_PAUSE_EVENT
Definition: event.hpp:66
@ RT_DEVICE_INSERT_EVENT
Definition: event.hpp:64
@ RT_THREAD_REMOVE_EVENT
Definition: event.hpp:61
@ IO_BLOCK_QUERY_EVENT
Definition: event.hpp:73
@ IO_ALL_CONNECTIONS_QUERY_EVENT
Definition: event.hpp:76
@ RT_SHUTDOWN_EVENT
Definition: event.hpp:70
@ IO_BLOCK_OUTPUTS_QUERY_EVENT
Definition: event.hpp:74
@ RT_POSTPERIOD_EVENT
Definition: event.hpp:58
@ RT_DEVICE_REMOVE_EVENT
Definition: event.hpp:65
constexpr size_t INVALID_BLOCK_ID
Definition: io.hpp:42
int createTask(Task *task, void(*func)(void *), void *arg)
Definition: rtos_evl.cpp:86
const uint64_t DEFAULT_FIFO_SIZE
Definition: rtos.hpp:20
int64_t getPeriod()
Definition: rtos_evl.cpp:150
int setPeriod(Task *task, int64_t period)
Definition: rtos_evl.cpp:144
int getFifo(std::unique_ptr< Fifo > &fifo, size_t fifo_size)
Definition: fifo.cpp:194
void shutdown(RT::OS::Task *task)
Definition: rtos_evl.cpp:72
void sleepTimestep(Task *task)
Definition: rtos_evl.cpp:159
void renameOSThread(std::thread &thread, const std::string &name)
Definition: rtos_evl.cpp:175
int64_t getTime()
void deleteTask(Task *task)
Definition: rtos_evl.cpp:117
const int64_t DEFAULT_PERIOD
Definition: rtos.hpp:19
state_t
Definition: rt.hpp:53
constexpr response_t RT_WIDGET_PARAM_UPDATE
Definition: rt.hpp:76
constexpr response_t RT_DEVICE_LIST_UPDATE
Definition: rt.hpp:72
constexpr response_t RT_NOOP
Definition: rt.hpp:73
constexpr response_t IO_LINK_UPDATED
Definition: rt.hpp:78
constexpr response_t RT_PERIOD_UPDATE
Definition: rt.hpp:70
constexpr response_t RT_ERROR
Definition: rt.hpp:82
constexpr response_t RT_THREAD_LIST_UPDATE
Definition: rt.hpp:71
constexpr response_t RT_WIDGET_STATE_UPDATE
Definition: rt.hpp:80
constexpr response_t RT_SHUTDOWN
Definition: rt.hpp:74
std::variant< std::monostate, int64_t, int64_t *, uint64_t, double, RT::Thread *, std::vector< RT::Thread * > *, RT::Device *, std::vector< RT::Device * > *, IO::Block *, RT::block_connection_t, Widgets::Component *, State::state_t, std::string > command_param_t
Definition: rt.hpp:394
response_t type
Definition: rt.hpp:98
Event::Object * cmd
Definition: rt.hpp:99
IO::Block * dest
Definition: rt.hpp:190
IO::Block * src
Definition: rt.hpp:187
IO::flags_t src_port_type
Definition: rt.hpp:188