AzmqFlatbuffer.hpp
Go to the documentation of this file.
1 #ifndef _AZMQ_FLATBUFFER_
2 #define _AZMQ_FLATBUFFER_
3 
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

9 #include <boost/asio.hpp>
10 #include <boost/asio/io_service.hpp>
11 #include <boost/asio/ip/tcp.hpp>
12 #include <boost/asio/spawn.hpp>
13 #include <boost/asio/steady_timer.hpp>
14 #include <boost/asio/write.hpp>
15 #include <boost/asio/signal_set.hpp>
16 #include <boost/circular_buffer.hpp>
17 #include <boost/container/static_vector.hpp>
18 
19 
20 #include <azmq/socket.hpp>
21 #include <flatbuffers/flatbuffers.h>
22 
23 
24 
25 /// @brief sends and receives flatbuffer data via AZMQ implementation of ZeroMQ plus manages the relevant buffers
26 ///
27 /// Sending: This class provides a mechanism to asynchronously send google flatbuffers.
28 /// It also stores a pool of these buffers so that they don't need to be reallocated.
29 ///
30 /// Rationale: FlatBufferBuilders are much faster if they are reused
31 /// so we create a small pool of them that you can get from this object
32 /// after you send a flat buffer, the builder is put into the pool
33 ///
34 /// @todo the buffer pools may be a premature optimization. Evaluate this.
35 class AzmqFlatbuffer : public std::enable_shared_from_this<AzmqFlatbuffer>
36 {
37 public:
38 
39 static const int default_circular_buffer_size = 10;
40 /// @todo Consider making this a simple std::vector so it is runtime configurable
41 typedef std::shared_ptr<boost::container::static_vector<uint8_t,256>> receive_buffer_type;
42 
43  /// Initialize AzmqFlatbuffer with a socket.
44  /// The socket should be fully configured
45  /// and ready to use when it is passed to this object.
46  /// We also recommend the user utilizes AzmqFlatbuffer(std::move(socket))
47  /// when calling this constructor.
48  /// @see AzmqFlatbufferTest for an example of usage.
49  ///
50  /// @todo consider making default_circular_buffer_size and receive_buffer_type configurable
51  explicit AzmqFlatbuffer(azmq::socket socket)
52  : socket_(std::move(socket)),
53  strand_(socket_.get_io_service()),
54  unusedFlatBufferBuilders_(default_circular_buffer_size),
55  unusedReceiveBuffers_(default_circular_buffer_size),
56  receiveBuffersWithData_(default_circular_buffer_size),
57  doneReceiving_(true)
58  {
59  for (int i = 0; i<default_circular_buffer_size; ++i) {
60  unusedFlatBufferBuilders_.push_back(std::make_shared<flatbuffers::FlatBufferBuilder>());
61  unusedReceiveBuffers_.push_back(std::make_shared<receive_buffer_type::element_type>());
62  }
63  }
64 
65  /// Send a FlatbufferBuilder to the destination specified in the socket.
66  /// @todo make it so FlatBufferBuilders can be used directly without shared_ptr overhead.
67  ///
68  /// @pre there are no other instances of fbbP (reference count of the shared_ptr should be 1)
69  ///
70  /// @note the FlatBufferBuilder is automatically put back into the pool internal to this class after sending
71  void async_send_flatbuffer(std::shared_ptr<flatbuffers::FlatBufferBuilder> fbbP)
72  {
73  auto self(shared_from_this());
74 
75 #if 0
76  socket_.async_send(boost::asio::buffer(fbbP->GetBufferPointer(), fbbP->GetSize()), [this,self,fbbP] (boost::system::error_code const& ec, size_t bytes_transferred) {
77  if(ec) std::cout << "SendFlatBuffer error! todo: figure out how to handle this\n";
78  std::lock_guard<std::mutex> lock(this->unusedFlatBufferBuildersLock_);
79  fbbP->Clear();
80  this->unusedFlatBufferBuilders_.push_back(fbbP);
81  });
82 #endif
83 
84  //Without +/-:
85 
86  //char test[] = {1,1,1,1,0};
87 
88  size_t res = socket_.send(boost::asio::buffer(fbbP->GetBufferPointer(), fbbP->GetSize()));
89 
90  //size_t res = socket_.send(boost::asio::buffer(test, 5));
91  if (not res) {
92  std::cerr << "empty\n";
93  }
94 
95  /*, [this,self,fbbP] (boost::system::error_code const& ec, size_t bytes_transferred) {
96  if(ec) std::cout << "SendFlatBuffer error! todo: figure out how to handle this\n";
97  std::lock_guard<std::mutex> lock(this->unusedFlatBufferBuildersLock_);
98  fbbP->Clear();
99  this->unusedFlatBufferBuilders_.push_back(fbbP);
100  });*/
101 
102  }
103 
104  void shutdown(azmq::socket::shutdown_type sd,boost::system::error_code& ec){
105  socket_.shutdown(sd,ec);
106  }
107 
108  void cancel(){
109  socket_.cancel();
110  }
111 
112  /// destructor
113  /// @todo maybe do something useful with the error code from shutting down the socket?
115  doneReceiving_=true;
116  boost::system::error_code ec;
117  socket_.shutdown(azmq::socket::shutdown_type::send,ec);
118  socket_.shutdown(azmq::socket::shutdown_type::receive,ec);
119  socket_.cancel();
120 
121  }
122 
123  /// get a google FlatBufferBuilder object from the pool of unused objects
124  std::shared_ptr<flatbuffers::FlatBufferBuilder> GetUnusedBufferBuilder(){
125  std::lock_guard<std::mutex> lock(this->unusedFlatBufferBuildersLock_);
126  std::shared_ptr<flatbuffers::FlatBufferBuilder> back;
127  if (!unusedFlatBufferBuilders_.empty()) {
128  back = unusedFlatBufferBuilders_.back();
129  unusedFlatBufferBuilders_.pop_back();
130  } else {
131  /// @todo eliminate the need for this extra allocation, though this may be premature optimization
132  back = std::make_shared<flatbuffers::FlatBufferBuilder>();
133  }
134 
135  return back;
136  }
137 
138  /// Initializes the process of receiving
139  /// buffers from the source specified
140  /// by the azmq::socket taht was provided.
141  /// This initializes a loop that will continuously
142  /// read data and fill out the internal ring buffer
143  /// for users to extract. This allows this class to
144  /// run asynchronously while interacting with
145  /// synchronous users.
147  if(doneReceiving_){
148  doneReceiving_=false;
149  next_async_receive_buffers();
150  }
151  }
152 
153 private:
154  /// read the next set of data from the zeromq interface in a loop.
155  /// This relies on the io_service for the loop, so that the stack
156  /// doesn't get used up.
157  ///
158  /// @todo When the receiveBuffersWithData buffer is full, consider moving the oldest buffer to the "unused" buffer to save allocations.
159  void next_async_receive_buffers(){
160  receive_buffer_type rbP;
161  { // this bracket is important so the lock is released ASAP
162  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
163  if (!unusedReceiveBuffers_.empty()) {
164  rbP = unusedReceiveBuffers_.back();
165  unusedReceiveBuffers_.pop_back();
166  } else {
167  /// @todo eliminate the need for this extra allocation
168  rbP = std::make_shared<receive_buffer_type::element_type>();
169  }
170 
171  }
172  // use the full capacity of the buffer
173  rbP->resize(rbP->capacity());
174 
175  auto self(shared_from_this());
176 
177  socket_.async_receive(boost::asio::buffer(&(rbP->begin()[0]),rbP->size()), [this,self,rbP](boost::system::error_code const ec, size_t bytes_transferred) {
178  if(ec) std::cout << "start_async_receive_buffers error! todo: figure out how to handle this\n";
179  // make rbp the size of the actual amount of data read
180  rbP->resize(std::min(bytes_transferred,rbP->capacity()));
181  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
182  self->receiveBuffersWithData_.push_back(rbP);
183 
184  // run this function again *after* it returns
185  if(!doneReceiving_) socket_.get_io_service().post(std::bind(&AzmqFlatbuffer::next_async_receive_buffers,this));
186  });
187  }
188 public:
189 
190  /// Stop receiving buffers
192  doneReceiving_ = true;
193  }
194 
195 
196  /// @return true if there are no buffers that have been received via the socket, false otherwise
198  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
199  return receiveBuffersWithData_.empty();
200  }
201 
202  /// @return The number of incoming buffers stored
203  std::size_t receive_buffers_size(){
204  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
205  return receiveBuffersWithData_.size();
206  }
207 
208  /// @return The number of possible incoming buffers that can be stored
210  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
211  return receiveBuffersWithData_.capacity();
212  }
213 
214  /// Inserts a range of values into the available receive buffers
215  ///
216  /// @pre p must be a valid iterator of *this in range [begin(), end()].
217  /// @pre distance(first, last) <= capacity()
218  /// @pre Iterator must meet the ForwardTraversalIterator concept.
219  template<typename T>
221  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
222  unusedReceiveBuffers_.insert(unusedFlatBufferBuilders_.end(), range.begin(), range.end());
223  }
224 
225  /// Put an unused buffer, or one that is no longer needed, back into the pool
226  void push_back_unused_receive_buffer(receive_buffer_type rb){
227  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
228  unusedReceiveBuffers_.push_back(rb);
229  }
230 
231  /// get the last, aka most chronologically recent, incoming buffer from the pool
232  receive_buffer_type get_back_receive_buffer_with_data(){
233  receive_buffer_type rbP;
234  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
235  if (!receiveBuffersWithData_.empty()) {
236  rbP = receiveBuffersWithData_.back();
237  receiveBuffersWithData_.pop_back();
238  }
239  return rbP;
240  }
241 
242 
243  /// get the first, aka the chronologically oldest, incoming buffer from the pool
244  receive_buffer_type get_front_receive_buffer_with_data(){
245  receive_buffer_type rbP;
246  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
247  if (!receiveBuffersWithData_.empty()) {
248  rbP = unusedReceiveBuffers_.front();
249  receiveBuffersWithData_.pop_front();
250  }
251  return rbP;
252  }
253 
254 
255  /// @brief Get all buffers from the pool at once and put them in an OutputIterator. This reduces locking/unlocking substantially.
256  template<typename OutputIterator>
257  void get_all_receive_buffers_with_data(OutputIterator it){
258  std::lock_guard<std::mutex> lock(receiveBuffersLock_);
259  std::copy(receiveBuffersWithData_.begin(), receiveBuffersWithData_.end(), it);
260  receiveBuffersWithData_.clear();
261  }
262 
263 private:
264 /// @todo may need to create io_service::work object here to keep io_service from exiting run() call
265  azmq::socket socket_;
266  boost::asio::io_service::strand strand_;
267 /// @todo it is a bit unsafe to use shared pointers, but I'm not sure if it is possible to std::move FlatBufferBuilders themselves, or unique ptrs into lambda functions
268  boost::circular_buffer<std::shared_ptr<flatbuffers::FlatBufferBuilder>> unusedFlatBufferBuilders_;
269  std::mutex unusedFlatBufferBuildersLock_;
270 
271 
272  boost::circular_buffer<receive_buffer_type> unusedReceiveBuffers_;
273  boost::circular_buffer<receive_buffer_type> receiveBuffersWithData_;
274  std::mutex receiveBuffersLock_;
275  std::atomic<bool> doneReceiving_;
276 };
277 
278 
279 #endif
std::size_t receive_buffers_capacity()
void insert_unused_receive_buffers(T &range)
STL namespace.
OutputIterator copy(std::string model, OutputIterator it, grl::revolute_joint_velocity_open_chain_state_constraint_tag)
copy vector of joint velocity limits in radians/s
Definition: Kuka.hpp:131
receive_buffer_type get_back_receive_buffer_with_data()
get the last, aka most chronologically recent, incoming buffer from the pool
void stop_async_receive_buffers()
Stop receiving buffers.
receive_buffer_type get_front_receive_buffer_with_data()
get the first, aka the chronologically oldest, incoming buffer from the pool
std::shared_ptr< flatbuffers::FlatBufferBuilder > GetUnusedBufferBuilder()
get a google FlatBufferBuilder object from the pool of unused objects
std::size_t receive_buffers_size()
void start_async_receive_buffers()
static const int default_circular_buffer_size
void shutdown(azmq::socket::shutdown_type sd, boost::system::error_code &ec)
bool receive_buffers_empty()
AzmqFlatbuffer(azmq::socket socket)
void get_all_receive_buffers_with_data(OutputIterator it)
Get all buffers from the pool at once and put them in an OutputIterator. This reduces locking/unlocki...
sends and receives flatbuffer data via AZMQ implementation of ZeroMQ plus manages the relevant buffer...
std::shared_ptr< boost::container::static_vector< uint8_t, 256 > > receive_buffer_type
void push_back_unused_receive_buffer(receive_buffer_type rb)
Put an unused buffer, or one that is no longer needed, back into the pool.
void async_send_flatbuffer(std::shared_ptr< flatbuffers::FlatBufferBuilder > fbbP)