ZMQManager.java
Go to the documentation of this file.
1 package grl;
2 
3 import java.nio.ByteBuffer;
4 
5 import org.zeromq.ZMQ;
6 
7 import com.kuka.task.ITaskLogger;
8 
9 /**
10  * @brief ZMQManager Handles the ZeroMQ/JeroMQ loading in flatbuffer data.
11  *
12  * @author Andrew Hundt
13  *
14  * @todo support sending data back to the controller PC
15  */
16 public class ZMQManager {
17  ZMQ.Context context = null;
18  ZMQ.Socket subscriber = null;
19  ITaskLogger logger;
20  String _ZMQ_MASTER_URI;
21 
22  int statesLength = 0;
23  long message_counter = 0;
24  long noMessageCounter = 0;
25  long noMessageCounterLimit = 9999999;
26  private grl.flatbuffer.KUKAiiwaStates _currentKUKAiiwaStates = null;
27  private grl.flatbuffer.KUKAiiwaState _currentKUKAiiwaState = null;
28  private grl.flatbuffer.KUKAiiwaState _previousKUKAiiwaState = null;
29  byte [] data = null;
30  ByteBuffer bb = null;
31  boolean stop;
32 
33  long startTime;
34  long elapsedTime;
35  long lastMessageStartTime;
36  long lastMessageElapsedTime;
37  long lastMessageTimeoutMilliseconds = 1000;
38 
39  int retriesAllowed = 3;
40  int retriesAttempted = 0;
41 
42  public ZMQManager(String ZMQ_MASTER_URI, ITaskLogger errorlogger) {
43  super();
44  this.logger = errorlogger;
45  _ZMQ_MASTER_URI = ZMQ_MASTER_URI;
46  }
47 
48  /**
49  * Blocks until a connection is established or stop() is called.
50  *
51  * @return error code: false on success, otherwise failure (or told to stop)
52  */
53  public boolean connect(){
54 
55  logger.info("Waiting for ZMQ connection initialization...");
56  this.context = ZMQ.context(1);
57  subscriber = context.socket(ZMQ.DEALER);
58  subscriber.connect(_ZMQ_MASTER_URI);
59  subscriber.setRcvHWM(100000);
60  startTime = System.currentTimeMillis();
61  elapsedTime = 0L;
62  grl.flatbuffer.KUKAiiwaStates newKUKAiiwaStates = null;
63  int newStatesLength = 0;
64 
65  while(newStatesLength<1 && newKUKAiiwaStates == null){
66  if((data = subscriber.recv(ZMQ.DONTWAIT))!=null){
67  bb = ByteBuffer.wrap(data);
68 
69  newKUKAiiwaStates = grl.flatbuffer.KUKAiiwaStates.getRootAsKUKAiiwaStates(bb);
70  newStatesLength = newKUKAiiwaStates.statesLength();
71  }
72 
73  if (stop) {
74  logger.info("Stopping program.");
75  return true; // asked to exit
76  }
77  }
78 
79  _currentKUKAiiwaStates = newKUKAiiwaStates;
80  statesLength = newStatesLength;
81 
82  logger.info("States initialized...");
83 
84  startTime = System.currentTimeMillis();
85  lastMessageStartTime = startTime;
86  lastMessageElapsedTime = System.currentTimeMillis() - lastMessageStartTime;
87  elapsedTime = 0L;
88 
89  return false; // no error
90  }
91 
92  /**
93  * Blocks until a connection is re-established or stop() is called.
94  *
95  * @return error code: false on success, otherwise failure (or told to stop)
96  */
97  public boolean reconnect(){
98  logger.info("Disconnecting...");
99  //subscriber.disconnect(_ZMQ_MASTER_URI);
100  //subscriber.close();
101  //context.close();
102  return false;// this.connect();
103  }
104 
106  {
107  boolean haveNextMessage = false;
108  while(!stop && !haveNextMessage) {
109 
110 // elapsedTime = System.currentTimeMillis() - startTime;
111 // lastMessageElapsedTime = System.currentTimeMillis() - lastMessageStartTime;
112 //
113 // if(lastMessageElapsedTime > lastMessageTimeoutMilliseconds)
114 // {
115 // retriesAttempted++;
116 // logger.error("Message rate timeout occurred... ZMQ connection may be dead. Retrying first. \nAttempting to restart connection...\n");
117 //
118 //
119 // if(retriesAttempted > retriesAllowed){
120 // logger.error("Attempting to restart connection...\n");
121 // this.reconnect();
122 // } else {
123 // lastMessageStartTime = System.currentTimeMillis();
124 // }
125 //
126 // }
127 //
128 // else if (noMessageCounter > noMessageCounterLimit)
129 // {
130 // logger.error("ZMQ connection seems dead, messages arrive empty.\nAttempting to restart connection...\n");
131 // this.reconnect();
132 // }
133 
134 
135  if((data = subscriber.recv(ZMQ.DONTWAIT))!=null){
136  /// TODO: BUG! noMessageCounter is always set to 0 here and only incremented below, so it will only ever be 0 or 1
137 
138  message_counter+=1;
139  bb = ByteBuffer.wrap(data);
140 
141  _currentKUKAiiwaStates = grl.flatbuffer.KUKAiiwaStates.getRootAsKUKAiiwaStates(bb, _currentKUKAiiwaStates);
142 
143  if(_currentKUKAiiwaStates.statesLength()>0) {
144  // initialize the fist state
145  grl.flatbuffer.KUKAiiwaState tmp = _currentKUKAiiwaStates.states(0);
146  if (tmp == null || tmp.armControlState() == null) {
147  noMessageCounter +=1;
148  if (message_counter % 100 == 0) {
149  logger.warn("NULL ArmControlState message, main ZMQ message is arriving but doesn't contain any data/commands!");
150  }
151  continue;
152  } else {
153  _previousKUKAiiwaState = _currentKUKAiiwaState;
154  _currentKUKAiiwaState = tmp;
155  }
156 
157  if (_currentKUKAiiwaState == null) {
158  noMessageCounter+=1;
159  logger.error("Missing current state message!");
160  continue;
161  }
162 
163  haveNextMessage=true;
164  noMessageCounter = 0;
165  lastMessageStartTime = System.currentTimeMillis();
166  } else {
167  logger.error("got a ZMQ message but it isn't a valid message, this is an unexpected state that shouldn't occur. please debug me.");
168  }
169  }
170 
171  }
172 
173  return _currentKUKAiiwaState;
174  }
175 
177  return _currentKUKAiiwaState;
178  }
179 
181  return _previousKUKAiiwaState;
182  }
183 
184 
185 
186  public boolean isStop() {
187  return stop;
188  }
189 
190  public void setStop(boolean stop) {
191  if(stop)
192  {
193  // done
194  subscriber.close();
195  context.term();
196  this.stop = stop;
197 
198  }
199  }
200 
201  public void stop(){
202  setStop(true);
203  }
204 
205 }
grl.flatbuffer.KUKAiiwaState waitForNextMessage()
boolean reconnect()
Definition: ZMQManager.java:97
void setStop(boolean stop)
grl.flatbuffer.KUKAiiwaState getCurrentMessage()
boolean isStop()
ZMQManager(String ZMQ_MASTER_URI, ITaskLogger errorlogger)
Definition: ZMQManager.java:42
grl.flatbuffer.KUKAiiwaState getPrevMessage()
boolean connect()
Definition: ZMQManager.java:53
ZMQManager Handles the ZeroMQ/JeroMQ loading in flatbuffer data.
Definition: ZMQManager.java:16