1 #ifndef _AZMQ_FLATBUFFER_ 2 #define _AZMQ_FLATBUFFER_ 9 #include <boost/asio.hpp> 10 #include <boost/asio/io_service.hpp> 11 #include <boost/asio/ip/tcp.hpp> 12 #include <boost/asio/spawn.hpp> 13 #include <boost/asio/steady_timer.hpp> 14 #include <boost/asio/write.hpp> 15 #include <boost/asio/signal_set.hpp> 16 #include <boost/circular_buffer.hpp> 17 #include <boost/container/static_vector.hpp> 20 #include <azmq/socket.hpp> 21 #include <flatbuffers/flatbuffers.h> 52 : socket_(
std::move(socket)),
53 strand_(socket_.get_io_service()),
54 unusedFlatBufferBuilders_(default_circular_buffer_size),
55 unusedReceiveBuffers_(default_circular_buffer_size),
56 receiveBuffersWithData_(default_circular_buffer_size),
60 unusedFlatBufferBuilders_.push_back(std::make_shared<flatbuffers::FlatBufferBuilder>());
61 unusedReceiveBuffers_.push_back(std::make_shared<receive_buffer_type::element_type>());
// Hold a shared_ptr to this object so it stays alive until the completion handler below has run.
73 auto self(shared_from_this());
// Start an asynchronous send of the bytes exposed by the builder (GetBufferPointer()/GetSize()).
// fbbP is captured by the handler, so the builder outlives the send operation.
76 socket_.async_send(boost::asio::buffer(fbbP->GetBufferPointer(), fbbP->GetSize()), [
this,
self,fbbP] (boost::system::error_code
const& ec,
size_t bytes_transferred) {
// A send error is only reported on stdout here; no recovery is attempted in the visible code.
77 if(ec) std::cout <<
"SendFlatBuffer error! todo: figure out how to handle this\n";
// Hand the builder back to the pool of unused builders, guarded by the pool's mutex.
78 std::lock_guard<std::mutex> lock(this->unusedFlatBufferBuildersLock_);
80 this->unusedFlatBufferBuilders_.push_back(fbbP);
88 size_t res = socket_.send(boost::asio::buffer(fbbP->GetBufferPointer(), fbbP->GetSize()));
92 std::cerr <<
"empty\n";
// Forward a shutdown request of type `sd` to the underlying azmq socket, passing `ec` through to it.
104 void shutdown(azmq::socket::shutdown_type sd,boost::system::error_code& ec){
105 socket_.shutdown(sd,ec);
116 boost::system::error_code ec;
117 socket_.shutdown(azmq::socket::shutdown_type::send,ec);
118 socket_.shutdown(azmq::socket::shutdown_type::receive,ec);
// The builder pool is accessed under its own mutex.
125 std::lock_guard<std::mutex> lock(this->unusedFlatBufferBuildersLock_);
126 std::shared_ptr<flatbuffers::FlatBufferBuilder> back;
// Reuse the builder at the back of the pool when one is available.
127 if (!unusedFlatBufferBuilders_.empty()) {
128 back = unusedFlatBufferBuilders_.back();
129 unusedFlatBufferBuilders_.pop_back();
// NOTE(review): the lines between are not visible here; presumably this is the else-branch
// that allocates a fresh builder when the pool is empty - confirm against the full file.
132 back = std::make_shared<flatbuffers::FlatBufferBuilder>();
// Clear the stop flag, then start the first asynchronous receive.
148 doneReceiving_=
false;
149 next_async_receive_buffers();
// Start one asynchronous receive into a pooled buffer. When it completes, the buffer is
// queued in receiveBuffersWithData_ and, unless doneReceiving_ is set, the next receive is
// posted to the io_service.
159 void next_async_receive_buffers(){
160 receive_buffer_type rbP;
// Take a buffer from the unused pool when one is available (pool guarded by receiveBuffersLock_).
162 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
163 if (!unusedReceiveBuffers_.empty()) {
164 rbP = unusedReceiveBuffers_.back();
165 unusedReceiveBuffers_.pop_back();
168 rbP = std::make_shared<receive_buffer_type::element_type>();
// Expand to full capacity so the entire storage is offered to the receive call.
173 rbP->resize(rbP->capacity());
// Keep this object alive until the completion handler has run.
175 auto self(shared_from_this());
177 socket_.async_receive(boost::asio::buffer(&(rbP->begin()[0]),rbP->size()), [
this,
self,rbP](boost::system::error_code
const ec,
size_t bytes_transferred) {
178 if(ec) std::cout <<
"start_async_receive_buffers error! todo: figure out how to handle this\n";
// Shrink the buffer to the number of bytes actually received (never beyond its capacity).
180 rbP->resize(std::min(bytes_transferred,rbP->capacity()));
181 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
182 self->receiveBuffersWithData_.push_back(rbP);
// Bind the shared_ptr rather than raw `this`: the posted call runs after this handler has
// returned and released `self`, so it must carry its own ownership of the object.
185 if(!doneReceiving_) socket_.get_io_service().post(std::bind(&AzmqFlatbuffer::next_async_receive_buffers,
self));
// Set the flag that the receive completion handler checks before posting another receive.
192 doneReceiving_ =
true;
// Under the receive-buffer lock, report whether no received buffers are waiting.
198 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
199 return receiveBuffersWithData_.empty();
// Under the receive-buffer lock, report how many received buffers are waiting.
204 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
205 return receiveBuffersWithData_.size();
// Under the receive-buffer lock, report the capacity of the container of received buffers.
210 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
211 return receiveBuffersWithData_.capacity();
// Return a whole range of buffers to the pool of unused receive buffers in one locked operation.
221 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
// The insert position must be an iterator into unusedReceiveBuffers_ itself; an iterator from
// the FlatBufferBuilder pool refers to a different container with a different element type.
222 unusedReceiveBuffers_.insert(unusedReceiveBuffers_.end(), range.begin(), range.end());
// Under the receive-buffer lock, append `rb` to the pool of unused receive buffers.
227 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
228 unusedReceiveBuffers_.push_back(rb);
// rbP stays a null shared_ptr unless a received buffer is available below.
233 receive_buffer_type rbP;
234 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
// Remove and take the element at the back of the received-data container.
235 if (!receiveBuffersWithData_.empty()) {
236 rbP = receiveBuffersWithData_.back();
237 receiveBuffersWithData_.pop_back();
// rbP stays a null shared_ptr unless a received buffer is available below.
245 receive_buffer_type rbP;
246 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
// Remove and take the element at the front of the received-data container. The element read
// must come from the same container that is checked and popped, not from the unused pool.
247 if (!receiveBuffersWithData_.empty()) {
248 rbP = receiveBuffersWithData_.front();
249 receiveBuffersWithData_.pop_front();
// Copy every buffer currently holding received data to the output iterator `it`, then empty
// the container; both steps happen under a single acquisition of receiveBuffersLock_.
256 template<
typename OutputIterator>
258 std::lock_guard<std::mutex> lock(receiveBuffersLock_);
259 std::copy(receiveBuffersWithData_.begin(), receiveBuffersWithData_.end(), it);
260 receiveBuffersWithData_.clear();
// Socket used for all send, receive and shutdown calls.
265 azmq::socket socket_;
// Strand constructed from the socket's io_service.
266 boost::asio::io_service::strand strand_;
// Pool of FlatBufferBuilder objects available for reuse, and the mutex guarding it.
268 boost::circular_buffer<std::shared_ptr<flatbuffers::FlatBufferBuilder>> unusedFlatBufferBuilders_;
269 std::mutex unusedFlatBufferBuildersLock_;
// Pool of receive buffers available for reuse.
272 boost::circular_buffer<receive_buffer_type> unusedReceiveBuffers_;
// Buffers that have been filled by a completed receive and not yet handed out.
273 boost::circular_buffer<receive_buffer_type> receiveBuffersWithData_;
// Mutex guarding both receive-buffer containers above.
274 std::mutex receiveBuffersLock_;
// Set to stop posting further receives; checked in the receive completion handler.
275 std::atomic<bool> doneReceiving_;
std::size_t receive_buffers_capacity()
void insert_unused_receive_buffers(T &range)
OutputIterator copy(std::string model, OutputIterator it, grl::revolute_joint_velocity_open_chain_state_constraint_tag)
copy vector of joint velocity limits in radians/s
receive_buffer_type get_back_receive_buffer_with_data()
get the last, aka most chronologically recent, incoming buffer from the pool
void stop_async_receive_buffers()
Stop receiving buffers.
receive_buffer_type get_front_receive_buffer_with_data()
get the first, aka the chronologically oldest, incoming buffer from the pool
std::shared_ptr< flatbuffers::FlatBufferBuilder > GetUnusedBufferBuilder()
get a google FlatBufferBuilder object from the pool of unused objects
std::size_t receive_buffers_size()
void start_async_receive_buffers()
static const int default_circular_buffer_size
void shutdown(azmq::socket::shutdown_type sd, boost::system::error_code &ec)
bool receive_buffers_empty()
AzmqFlatbuffer(azmq::socket socket)
void get_all_receive_buffers_with_data(OutputIterator it)
Get all buffers from the pool at once and put them in an OutputIterator. This reduces locking/unlocki...
sends and receives flatbuffer data via AZMQ implementation of ZeroMQ plus manages the relevant buffer...
std::shared_ptr< boost::container::static_vector< uint8_t, 256 > > receive_buffer_type
void push_back_unused_receive_buffer(receive_buffer_type rb)
Put an unused buffer, or one that is no longer needed, back into the pool.
void async_send_flatbuffer(std::shared_ptr< flatbuffers::FlatBufferBuilder > fbbP)