RTXI  3.0.0
The Real-Time eXperiment Interface Reference Manual
fifo.cpp
Go to the documentation of this file.
1 /*
2  The Real-Time eXperiment Interface (RTXI)
3  Copyright (C) 2011 Georgia Institute of Technology, University of Utah,
4  Weill Cornell Medical College
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 
19 */
20 
#include <array>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>

#include "fifo.hpp"

#include "debug.hpp"
28 
29 // Generic fifo built on two heap-allocated ring buffers (one per direction)
30 namespace RT::OS
31 {
32 class xbuffFifo : public RT::OS::Fifo
33 {
34 public:
35  explicit xbuffFifo(size_t size);
36  xbuffFifo(const xbuffFifo& fifo) = delete;
37  xbuffFifo& operator=(const xbuffFifo& fifo) = delete;
38  xbuffFifo(xbuffFifo&&) = default;
39  xbuffFifo& operator=(xbuffFifo&&) = default;
40  ~xbuffFifo() override;
41 
42  size_t getCapacity() override;
43  int64_t read(void* buf, size_t data_size) override;
44  int64_t write(void* buf, size_t data_size) override;
45  int64_t readRT(void* buf, size_t data_size) override;
46  int64_t writeRT(void* buf, size_t data_size) override;
47  void poll() override;
48  void close() override;
49 
50 private:
51  char* rt_to_ui; // 0 is read from rt; 1 is write to ui
52  char* ui_to_rt; // 0 is read from ui; 1 is write to rt
53  const uint64_t fifo_capacity;
54  std::mutex mutex_rt2ui;
55  std::mutex mutex_ui2rt;
56  uint64_t ui_rptr = 0;
57  uint64_t ui_wptr = 0;
58  uint64_t rt_rptr = 0;
59  uint64_t rt_wptr = 0;
60  uint64_t ui_available_read_bytes;
61  uint64_t ui_available_write_bytes;
62  uint64_t rt_available_read_bytes;
63  uint64_t rt_available_write_bytes;
64 
65  // synchronization primitives specific to poll function
66  std::mutex ui_read_mut;
67  std::condition_variable available_read_cond;
68  bool rt_closed = false;
69  void flush_buff();
70 };
71 } // namespace RT::OS
72 
74  : fifo_capacity(size)
75 {
76  rt_to_ui = new char[this->fifo_capacity];
77  ui_to_rt = new char[this->fifo_capacity];
78 }
79 
81 {
82  delete[] rt_to_ui;
83  delete[] ui_to_rt;
84 }
85 
86 int64_t RT::OS::xbuffFifo::read(void* buf, size_t data_size)
87 {
88  if (rt_wptr == ui_rptr) {
89  return -1;
90  }
91  std::unique_lock<std::mutex> lock(mutex_rt2ui);
92  // return if the caller requests more data than available
93  if (ui_available_read_bytes < data_size) {
94  return -1;
95  }
96  if (fifo_capacity - ui_rptr < data_size) {
97  uint64_t m = fifo_capacity - ui_rptr;
98  memcpy(buf, rt_to_ui + ui_rptr, m);
99  memcpy(reinterpret_cast<char*>(buf) + m, rt_to_ui, data_size - m);
100  } else {
101  memcpy(buf, rt_to_ui + ui_rptr, data_size);
102  }
103  ui_rptr = (ui_rptr + data_size) % this->fifo_capacity;
104  ui_available_read_bytes = (fifo_capacity + ui_rptr - rt_wptr) % fifo_capacity;
105  return static_cast<int64_t>(data_size);
106 }
107 
108 int64_t RT::OS::xbuffFifo::write(void* buf, size_t data_size)
109 {
110  if (data_size > fifo_capacity - rt_available_read_bytes) {
111  ERROR_MSG("FIFO::write : Fifo full, data lost\n");
112  return -1;
113  }
114 
115  if (data_size > fifo_capacity - ui_wptr) {
116  uint64_t m = fifo_capacity - ui_wptr;
117  memcpy(ui_to_rt + ui_wptr, buf, m);
118  memcpy(ui_to_rt, reinterpret_cast<const char*>(buf) + m, data_size - m);
119  } else {
120  memcpy(ui_to_rt + ui_wptr, buf, data_size);
121  }
122  ui_wptr = (ui_wptr + data_size) % fifo_capacity;
123  rt_available_read_bytes = (fifo_capacity + rt_rptr - ui_wptr) % fifo_capacity;
124  return static_cast<int64_t>(data_size);
125 }
126 
127 int64_t RT::OS::xbuffFifo::readRT(void* buf, size_t data_size)
128 {
129  if (ui_wptr == rt_rptr) {
130  return -1;
131  }
132  std::unique_lock<std::mutex> lock(mutex_rt2ui);
133  // return if the caller requests more data than available
134  if (rt_available_read_bytes < data_size) {
135  return -1;
136  }
137  if (fifo_capacity - rt_rptr < data_size) {
138  uint64_t m = fifo_capacity - rt_rptr;
139  memcpy(buf, ui_to_rt + rt_rptr, m);
140  memcpy(reinterpret_cast<char*>(buf) + m, ui_to_rt, data_size - m);
141  } else {
142  memcpy(buf, ui_to_rt + rt_rptr, data_size);
143  }
144  rt_rptr = (rt_rptr + data_size) % this->fifo_capacity;
145  rt_available_read_bytes = (fifo_capacity + rt_rptr - ui_wptr) % fifo_capacity;
146  return static_cast<int64_t>(data_size);
147 }
148 
149 int64_t RT::OS::xbuffFifo::writeRT(void* buf, size_t data_size)
150 {
151  if (data_size > fifo_capacity - ui_available_read_bytes) {
152  ERROR_MSG("FIFO::write : Fifo full, data lost\n");
153  return -1;
154  }
155 
156  if (data_size > fifo_capacity - rt_wptr) {
157  uint64_t m = fifo_capacity - rt_wptr;
158  memcpy(rt_to_ui + rt_wptr, buf, m);
159  memcpy(rt_to_ui, reinterpret_cast<const char*>(buf) + m, data_size - m);
160  } else {
161  memcpy(rt_to_ui + rt_wptr, buf, data_size);
162  }
163  rt_wptr = (rt_wptr + data_size) % fifo_capacity;
164  ui_available_read_bytes = (fifo_capacity + ui_rptr - rt_wptr) % fifo_capacity;
165  this->available_read_cond.notify_one();
166  return static_cast<int64_t>(data_size);
167 }
168 
170 {
171  std::unique_lock<std::mutex> lk(this->ui_read_mut);
172  if (ui_available_read_bytes == 0) {
173  this->available_read_cond.wait(
174  lk, [this]() { return ui_available_read_bytes != 0 || rt_closed; });
175  }
176 }
177 
178 void RT::OS::xbuffFifo::flush_buff()
179 {
180  this->rt_closed = true;
181  this->available_read_cond.notify_all();
182 }
183 
185 {
186  this->flush_buff();
187 }
188 
190 {
191  return this->fifo_capacity;
192 }
193 
194 int RT::OS::getFifo(std::unique_ptr<Fifo>& fifo, size_t fifo_size)
195 {
196  auto tmp_fifo = std::make_unique<RT::OS::xbuffFifo>(fifo_size);
197  fifo = std::move(tmp_fifo);
198  return 0;
199 }
int64_t readRT(void *buf, size_t data_size) override
Definition: fifo.cpp:127
void close() override
Definition: fifo.cpp:184
void poll() override
Definition: fifo.cpp:169
xbuffFifo & operator=(const xbuffFifo &fifo)=delete
xbuffFifo & operator=(xbuffFifo &&)=default
size_t getCapacity() override
Definition: fifo.cpp:189
xbuffFifo(size_t size)
Definition: fifo.cpp:73
int64_t writeRT(void *buf, size_t data_size) override
Definition: fifo.cpp:149
int64_t read(void *buf, size_t data_size) override
Definition: fifo.cpp:86
xbuffFifo(xbuffFifo &&)=default
~xbuffFifo() override
Definition: fifo.cpp:80
int64_t write(void *buf, size_t data_size) override
Definition: fifo.cpp:108
xbuffFifo(const xbuffFifo &fifo)=delete
void ERROR_MSG(const std::string &errmsg, Args... args)
Definition: debug.hpp:36
Definition: fifo.cpp:31
int getFifo(std::unique_ptr< Fifo > &fifo, size_t fifo_size)
Definition: fifo.cpp:194