AERA
tcp_io_device.cpp
1 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
2 //_/_/
3 //_/_/ AERA
4 //_/_/ Autocatalytic Endogenous Reflective Architecture
5 //_/_/
6 //_/_/ Copyright (c) 2018-2025 Jeff Thompson
7 //_/_/ Copyright (c) 2018-2025 Kristinn R. Thorisson
8 //_/_/ Copyright (c) 2018-2025 Icelandic Institute for Intelligent Machines
9 //_/_/ Copyright (c) 2021 Leonard Eberding
10 //_/_/ http://www.iiim.is
11 //_/_/
12 //_/_/ Copyright (c) 2010-2012 Eric Nivel
13 //_/_/ Center for Analysis and Design of Intelligent Agents
14 //_/_/ Reykjavik University, Menntavegur 1, 102 Reykjavik, Iceland
15 //_/_/ http://cadia.ru.is
16 //_/_/
17 //_/_/ Part of this software was developed by Eric Nivel
18 //_/_/ in the HUMANOBS EU research project, which included
19 //_/_/ the following parties:
20 //_/_/
21 //_/_/ Autonomous Systems Laboratory
22 //_/_/ Technical University of Madrid, Spain
23 //_/_/ http://www.aslab.org/
24 //_/_/
25 //_/_/ Communicative Machines
26 //_/_/ Edinburgh, United Kingdom
27 //_/_/ http://www.cmlabs.com/
28 //_/_/
29 //_/_/ Istituto Dalle Molle di Studi sull'Intelligenza Artificiale
30 //_/_/ University of Lugano and SUPSI, Switzerland
31 //_/_/ http://www.idsia.ch/
32 //_/_/
33 //_/_/ Institute of Cognitive Sciences and Technologies
34 //_/_/ Consiglio Nazionale delle Ricerche, Italy
35 //_/_/ http://www.istc.cnr.it/
36 //_/_/
37 //_/_/ Dipartimento di Ingegneria Informatica
38 //_/_/ University of Palermo, Italy
39 //_/_/ http://diid.unipa.it/roboticslab/
40 //_/_/
41 //_/_/
42 //_/_/ --- HUMANOBS Open-Source BSD License, with CADIA Clause v 1.0 ---
43 //_/_/
44 //_/_/ Redistribution and use in source and binary forms, with or without
45 //_/_/ modification, is permitted provided that the following conditions
46 //_/_/ are met:
47 //_/_/ - Redistributions of source code must retain the above copyright
48 //_/_/ and collaboration notice, this list of conditions and the
49 //_/_/ following disclaimer.
50 //_/_/ - Redistributions in binary form must reproduce the above copyright
51 //_/_/ notice, this list of conditions and the following disclaimer
52 //_/_/ in the documentation and/or other materials provided with
53 //_/_/ the distribution.
54 //_/_/
55 //_/_/ - Neither the name of its copyright holders nor the names of its
56 //_/_/ contributors may be used to endorse or promote products
57 //_/_/ derived from this software without specific prior
58 //_/_/ written permission.
59 //_/_/
60 //_/_/ - CADIA Clause: The license granted in and to the software
61 //_/_/ under this agreement is a limited-use license.
62 //_/_/ The software may not be used in furtherance of:
63 //_/_/ (i) intentionally causing bodily injury or severe emotional
64 //_/_/ distress to any person;
65 //_/_/ (ii) invading the personal privacy or violating the human
66 //_/_/ rights of any person; or
67 //_/_/ (iii) committing or preparing for any act of war.
68 //_/_/
69 //_/_/ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
70 //_/_/ CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
71 //_/_/ INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
72 //_/_/ MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
73 //_/_/ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
74 //_/_/ CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
75 //_/_/ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
76 //_/_/ BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
77 //_/_/ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
78 //_/_/ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
79 //_/_/ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
80 //_/_/ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
81 //_/_/ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
82 //_/_/ OF SUCH DAMAGE.
83 //_/_/
84 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
85 
86 #include "tcp_io_device.h"
87 
88 using namespace std;
89 using namespace std::chrono;
90 using namespace r_code;
91 using namespace r_exec;
92 
93 
94 #ifndef ENABLE_PROTOBUF
95 namespace tcp_io_device {
96  template<class O, class S> TcpIoDevice<O, S>::TcpIoDevice() : MemExec<O, S>()
97  {
98  cout << "\n> ERROR: Trying to use the TcpIoDevice without setting ENABLE_PROTOBUF flag in the beginning of main.cpp" << endl;
99  }
100  template<class O, class S> TcpIoDevice<O, S>::TcpIoDevice(int number_of_servers, int number_of_clients, std::vector<std::pair<std::string, std::string> > server_configurations, std::vector<std::string> client_configurations) : MemExec<O, S>()
101  {
102  cout << "\n> ERROR: Trying to use the TcpIoDevice without setting ENABLE_PROTOBUF flag in the beginning of main.cpp" << endl;
103  }
104 
105  template<class O, class S>
106  TcpIoDevice<O, S>::~TcpIoDevice() {}
107 
108  template<class O, class S>
110  {
111  return 1;
112  }
113 
116 }
117 #else
118 
119 #include "AERA_Protobuf/utils.h"
120 #include "AERA_Protobuf/tcp_data_message.pb.h"
121 
122 namespace tcp_io_device {
123 
124  template<class O, class S> TcpIoDevice<O, S>::TcpIoDevice(
125  int number_of_servers,
126  int number_of_clients,
127  std::vector<std::pair<std::string, std::string> > server_configurations,
128  std::vector<std::string> client_configurations) :
129  MemExec<O, S>(),
130  number_of_servers_(number_of_servers),
131  number_of_clients_(number_of_clients),
132  server_configurations_(server_configurations),
133  client_configurations_(client_configurations)
134  {
135 
136  timeTickThread_ = 0;
137  lastInjectTime_ = Timestamp(seconds(0));
138  lastCommandTime_ = Timestamp(seconds(0));
139 
140  receive_queue_ = std::make_shared<SafeQueue>(number_of_servers + number_of_clients);
141  send_queue_ = std::make_shared<SafeQueue>(100);
142 
143  tcp_connections_.push_back(new TCPConnection(receive_queue_, send_queue_, 8));
144 
145  started_ = false;
146  }
147 
148  template<class O, class S>
149  TcpIoDevice<O, S>::~TcpIoDevice() {
150 
151  for (auto it = tcp_connections_.begin(); it != tcp_connections_.end();) {
152  delete *it;
153  it = tcp_connections_.erase(it);
154  }
155 
156  // Not sure if those are needed, I guess the ownership lies somewhere else, right?
157  // Delete entities
158  /*for (auto it = objects_.begin(); it != objects_.end(); ++it) {
159  delete it->second;
160  }
161 
162  // Delete objects
163  for (auto it = entities_.begin(); it != entities_.end(); ++it) {
164  delete it->second;
165  } */
166 
167  if (timeTickThread_) {
168  delete timeTickThread_;
169  }
170  }
171 
172  template<class O, class S>
174  {
175  int err = 0;
176  for (int i = 0; i < number_of_servers_; ++i) {
177  err = tcp_connections_[i]->establishConnection(server_configurations_[i].first, server_configurations_[i].second);
178  if (err != 0)
179  {
180  return err;
181  }
182  }
183  for (int i = number_of_servers_; i < number_of_servers_ + number_of_clients_; ++i) {
184  err = tcp_connections_[i]->listenAndAwaitConnection(client_configurations_[i-number_of_servers_]);
185  if (err != 0)
186  {
187  return err;
188  }
189  }
190  for (auto it = tcp_connections_.begin(); it != tcp_connections_.end(); ++it) {
191  (*it)->start();
192  }
193  // Wait for a SetupMessage from the client.
194  while (true) {
195  auto msg = std::move(receive_queue_->dequeue());
196  if (!msg) {
197  continue;
198  }
199  if (msg->messagetype() != msg->SETUP) {
200  continue;
201  }
202  handleSetupMessage(std::move(msg));
203  break;
204  }
205  return err;
206  }
207 
208 
209  template<class O, class S>
210  bool TcpIoDevice<O, S>::load(const vector<r_code::Code*>* objects, uint32 stdin_oid, uint32 stdout_oid, uint32 self_oid)
211  {
212  // Call the method in the parent class.
213  if (!MemExec<O, S>::load(objects, stdin_oid, stdout_oid, self_oid)) {
214  return false;
215  }
216 
217  // Load entities
218  cout << "> Loading entities:" << endl;
219  for (auto it = entities_.begin(); it != entities_.end(); ++it) {
220  it->second = _Mem::find_object(objects, &((it->first)[0]));
221  cout << it->first << ":\t" << it->second->get_oid() << endl;
222  }
223 
224  // Load objects
225  cout << "> Loading objects:" << endl;
226  for (auto it = objects_.begin(); it != objects_.end(); ++it) {
227  it->second = _Mem::find_object(objects, &((it->first)[0]));
228  cout << it->first << ":\t" << it->second->get_oid() << endl;
229  }
230 
231  // Load command op-codes
232  cout << "> Loading commands:" << endl;
233  for (auto it = commands_.begin(); it != commands_.end(); ++it) {
234  it->second = r_exec::GetOpcode(&((it->first)[0]));
235  cout << it->first << ":\t" << it->second << endl;
236  }
237 
238  return true;
239  }
240 
241 
242 
243  template<class O, class S>
244  r_code::Code* TcpIoDevice<O, S>::eject(r_code::Code* command)
245  {
246  uint16 function = (command->code(CMD_FUNCTION).atom_ >> 8) & 0x000000FF;
247 
248  uint16 args_set_index = command->code(CMD_ARGS).asIndex();
249  if (function == commands_["ready"]) {
250  if (command->code_size() < 2 || command->code(args_set_index + 1).getDescriptor() != Atom::I_PTR ||
251  command->code(command->code(args_set_index + 1).asIndex()).getDescriptor() != Atom::STRING) {
252  cout << "> WARNING: Cannot get identifier as string" << endl;
253  return NULL;
254  }
255 
256  string identifier = Utils::GetString(&command->code(command->code(args_set_index + 1).asIndex()));
257  if (!started_) {
258  cout << "I/O device is ready for " << identifier << endl;
259  startTimeTickThread();
260  }
261  return command;
262  }
263  // Not ready command, therefore go through all commands and find the appropriate one
264  for (auto cmd = commands_.begin(); cmd != commands_.end(); ++cmd) {
265  if (cmd->second != function) {
266  continue;
267  }
268  Code* obj = command->get_reference(command->code(args_set_index + 1).asIndex());
269  for (auto it = entities_.begin(); it != entities_.end(); ++it) {
270  if (it->second != obj) {
271  continue;
272  }
273  tcp_io_device::MsgData msg = constructMessageFromCommand(cmd->first, it->first, command);
274  if (!msg.isValid()) {
275  cout << "Could not create message from ejected command" << endl;
276  return NULL;
277  }
278  sendDataMessage(msg);
279  break;
280  }
281  return command;
282  }
283 
284  return NULL;
285  }
286 
287  template<class O, class S>
288  tcp_io_device::MsgData TcpIoDevice<O, S>::constructMessageFromCommand(string cmd_identifier, string entity, r_code::Code* cmd)
289  {
290  // Get the stored meta data received by the SetupMessage during establishConnection with the correct identifier.
291  std::map<string, MetaData>::iterator meta_data_it = meta_data_map_.find(cmd_identifier);
292  if (meta_data_it == meta_data_map_.end()) {
293  cout << "> WARNING: Could not find cmd identifier " << cmd_identifier << " in the MetaData of available commands!" << endl;
294  return tcp_io_device::MsgData::invalidMsgData();
295  }
296 
297  MetaData stored_meta_data = meta_data_it->second;
298  uint16 args_set_index = cmd->code(CMD_ARGS).asIndex();
299 
300  if (cmd->code(args_set_index).getAtomCount() == 1) {
301  // std::cout << "Cmd without values ejected." << std::endl;
302  return tcp_io_device::MsgData::createNewMsgData(stored_meta_data);
303  }
304 
305  tcp_io_device::MsgData msg_data = tcp_io_device::MsgData::invalidMsgData();
306  // Default values for OpCodeHandle == "" and dimensionality of 1.
307  int start = args_set_index + 2;
308  int end = args_set_index + 3;
309 
310  // Otherwise retrieve the start and end indices by de-constructing the r_code::Code object.
311  if (stored_meta_data.getOpCodeHandle() != "" ||
312  (stored_meta_data.getDimensions() != std::vector<uint64_t>({ 1 }) && stored_meta_data.getDimensions() != std::vector<uint64_t>({ 1, 1 }))) {
313  if (cmd->code(args_set_index + 2).getDescriptor() != Atom::I_PTR) {
314  std::cout << "ERROR: Ejected command with OpCodeHandle \"" << stored_meta_data.getOpCodeHandle() << "\" with dimensionality > 1 and r_code object without the necessary nesting." << std::endl;
315  return tcp_io_device::MsgData::invalidMsgData();
316  }
317  int set_index = cmd->code(args_set_index + 2).asIndex();
318  // @todo Check whether dimensionality of stored_meta_data fits the atom count of the set of the r_code Object.
319  start = set_index + 1;
320  end = start + cmd->code(set_index).getAtomCount();
321 #if 1
322  if (cmd->code(set_index).asOpcode() == GetOpcode("quat")) {
323  double _1 = cmd->code(start) < 0 ? -1 : 1;
324  for (int i = start; i < end; ++i) {
325  cmd->code(i) = Atom::Float(cmd->code(i).asFloat() * _1);
326  }
327  }
328 #endif
329  }
330 
331  std::cout << "Eject cmd: " << cmd_identifier << ", entity: " << entity << ", Value(s): "; // << cmd->code(args_set_index + 2).asFloat() << std::endl;
332  for (int i = start; i < end; ++i) {
333  std::cout << cmd->code(i).asFloat() << " ";
334  }
335  std::cout << std::endl;
336 
337  switch (auto t = stored_meta_data.getType()) {
338  case VariableDescription_DataType_BOOL:
339  {
340  std::vector<bool> data = getDataVec<bool>(cmd, start, end, t);
341  msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
342  break;
343  }
344  case VariableDescription_DataType_INT64:
345  {
346  std::vector<int64_t> data = getDataVec<int64_t>(cmd, start, end, t);
347  msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
348  break;
349  }
350  case VariableDescription_DataType_DOUBLE:
351  {
352  std::vector<double> data = getDataVec<double>(cmd, start, end, t);
353  msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
354  break;
355  }
356  case VariableDescription_DataType_COMMUNICATION_ID:
357  {
358  std::vector<communication_id_t> data = getDataVec<communication_id_t>(cmd, start, end, t);
359  msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
360  break;
361  }
362  case VariableDescription_DataType_STRING:
363  {
364  // std::vector<bool> data = getDataVec<bool>(cmd, start, end, t);
365  // msg_data.push_back(tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data));
366  // TODO: Using the .asIndex() function this can be done (See the Utils::GetString())
367  cout << "> WARNING: String type not implemented, yet" << endl;
368  return tcp_io_device::MsgData::invalidMsgData();
369  break;
370  }
371  }
372  return msg_data;
373  }
374 
375  template<class O, class S>
376  template<class T>
377  std::vector<T> TcpIoDevice<O, S>::getDataVec(r_code::Code* cmd, int start_index, int end_index, tcp_io_device::VariableDescription_DataType type) {
378  std::vector<T> data;
379  switch (type) {
380  case VariableDescription_DataType_BOOL:
381  for (int i = start_index; i < end_index; ++i) {
382  data.push_back(cmd->code(i).asBoolean());
383  }
384  break;
385  case VariableDescription_DataType_INT64:
386  for (int i = start_index; i < end_index; ++i) {
387  data.push_back((int)cmd->code(i).asFloat());
388  }
389  break;
390  case VariableDescription_DataType_DOUBLE:
391  for (int i = start_index; i < end_index; ++i) {
392  data.push_back(cmd->code(i).asFloat());
393  // std::cout << "Debug DataType Double trace:" << cmd->trace_string() << std::endl;
394  }
395  break;
396  case VariableDescription_DataType_COMMUNICATION_ID:
397  for (int i = start_index; i < end_index; ++i) {
398  // std::cout << "Start: " << start_index << " End:" << end_index << std::endl;
399  // std::cout << "Debug test of Communication_id. 1) cmd trace " << cmd->trace_string() << std::endl;
400  if (cmd->code(i).getDescriptor() != Atom::R_PTR) {
401  // @todo This isn't an R_PTR whysoever...
402  std::cout << "WARNING: Got command which should include a communicaiton id which points to another object without R_PTR. Ignoring it." << std::endl;;
403  continue;
404  }
405  r_code::Code* reference = cmd->get_reference(cmd->code(i).asIndex());
406  for (auto it = entities_.begin(); it != entities_.end(); ++it) {
407  if (it->second != reference) {
408  continue;
409  }
410  // @todo Need to implement this once the proper object is found earlier.
411  }
412  data.push_back(cmd->code(i).asFloat());
413  }
414  break;
415  case VariableDescription_DataType_STRING:
416  // for (int i = start_index; i < end_index; ++i) {
417  // data.push_back(cmd->code(i).asBoolean());
418  // }
419  break;
420  }
421  return data;
422  }
423 
424  template<class O, class S>
425  void TcpIoDevice<O, S>::on_time_tick()
426  {
427  auto now = r_exec::Now();
428  if (now < lastInjectTime_ + _Mem::get_sampling_period() * 8 / 10) {
429  return;
430  }
431  // Dequeue a msg from the receive_queue_.
432  auto msg = std::move(receive_queue_->dequeue());
433  // If in diagnostic mode wait for a new message to be received.
434  if (_Mem::reduction_core_count_ == 0 && _Mem::time_core_count_ == 0 && started_) {
435  while (!msg) {
436  msg = std::move(receive_queue_->dequeue());
437  }
438  }
439  else {
440  // If not in diagnostic mode simply step continue if no message was received
441  if (!msg) {
442  return;
443  }
444  }
445  std::cout << "Received message of type " << msg->messagetype() << std::endl;
446 
447  handleMessage(std::move(msg));
448  lastInjectTime_ = now;
449  }
450 
451  template<class O, class S>
452  void TcpIoDevice<O, S>::startTimeTickThread()
453  {
454 
455  // Send start message to environment
456  std::unique_ptr<TCPMessage> msg = std::make_unique<TCPMessage>();
457  msg->set_messagetype(TCPMessage_Type_START);
458  sendMessage(std::move(msg));
459  started_ = true;
460 
461  if (_Mem::reduction_core_count_ == 0 && _Mem::time_core_count_ == 0) {
462  // We don't need a timeTickThread for diagnostic time.
463  return;
464  }
465  if (timeTickThread_) {
466  cout << "timeTickThread already running" << endl;
467  // We already started the thread.
468  return;
469  }
470 
471  // We are running in real time. on_diagnostic_time_tick() will not be called.
472  // Set up a timer thread to call on_time_tick().
473  timeTickThread_ = Thread::New<_Thread>(timeTickRun, this);
474  }
475 
476  template<class O, class S>
477  thread_ret thread_function_call TcpIoDevice<O, S>::timeTickRun(void* args)
478  {
479  TcpIoDevice<O, S>* self = (TcpIoDevice*)args;
480 
481  auto sampling_period = _Mem::Get()->get_sampling_period();
482  auto tickTime = r_exec::Now();
483  // Call on_time_tick at the sampling period.
484  while (self->state_ == _Mem::RUNNING) {
485  self->on_time_tick();
486 
487  tickTime += sampling_period;
488  Thread::Sleep(tickTime - r_exec::Now());
489  }
490  for (auto it = self->tcp_connections_.begin(); it != self->tcp_connections_.end(); ++it) {
491  (*it)->stop();
492  }
493 
494  thread_ret_val(0);
495  }
496 
497 
498  template<class O, class S>
499  void TcpIoDevice<O, S>::sendDataMessage(tcp_io_device::MsgData msg_data) {
500  std::unique_ptr<tcp_io_device::TCPMessage> msg = std::make_unique<tcp_io_device::TCPMessage>();
501  msg->set_messagetype(tcp_io_device::TCPMessage::DATA);
502  tcp_io_device::DataMessage* data_msg = msg->mutable_datamessage();
503  tcp_io_device::ProtoVariable* var = data_msg->add_variables();
504  msg_data.toMutableProtoVariable(var);
505  sendMessage(std::move(msg));
506  }
507 
508  template<class O, class S>
509  void TcpIoDevice<O, S>::sendMessage(std::unique_ptr<TCPMessage> msg) {
510  // Simply enqueue the message to send and let the TCPConnection do the actual sending.
511  std::cout << "Sending Message of type " << msg->messagetype() << std::endl;
512  send_queue_->enqueue(std::move(msg));
513  }
514 
515 
516  template<class O, class S>
517  void TcpIoDevice<O, S>::handleMessage(std::unique_ptr<TCPMessage> msg)
518  {
519  switch (msg->messagetype()) {
520  case TCPMessage_Type_SETUP:
521  handleSetupMessage(std::move(msg));
522  break;
523  case TCPMessage_Type_DATA:
524  handleDataMessage(std::move(msg));
525  break;
526  default:
527  cout << "> WARNING: Received Message of different type than SETUP or DATA" << endl;
528  }
529  }
530 
531  template<class O, class S>
532  void TcpIoDevice<O, S>::handleDataMessage(std::unique_ptr<TCPMessage> data_msg)
533  {
534  // @todo: Need to change it to actual receive time
535  auto now = r_exec::Now();
536  auto data = data_msg->release_datamessage();
537  // std::cout << "Received data message. Injecting received data:" << std::endl;
538  for (int i = 0; i < data->variables_size(); ++i) {
539  MsgData var(&(data->variables(i)));
540  auto entity = entities_[id_mapping_[var.getMetaData().getEntityID()]];
541  auto obj = objects_[id_mapping_[var.getMetaData().getID()]];
542  // std::cout << "Variable " << i << ":" << std::endl;
543  // std::cout << "Entity: " << id_mapping_[var.getMetaData().getEntityID()] << std::endl;
544  // std::cout << "Property: " << id_mapping_[var.getMetaData().getID()] << std::endl;
545  // std::cout << "Value(s):" << std::endl;
546 
547  if (var.getMetaData().getType() == VariableDescription_DataType_DOUBLE)
548  {
549  if (var.getMetaData().getOpCodeHandle() == "") {
550  injectDefault<double>(entity, obj, var.getData<double>(), now);
551  continue;
552  }
553  else if (var.getMetaData().getOpCodeHandle() == "set") {
554  injectSet<double>(entity, obj, var.getData<double>(), now);
555  continue;
556  }
557  else {
558  injectOpCode<double>(entity, obj, var.getData<double>(), now, var.getMetaData().getOpCodeHandle());
559  continue;
560  }
561  }
562  else if (var.getMetaData().getType() == VariableDescription_DataType_INT64)
563  {
564  if (var.getMetaData().getOpCodeHandle() == "") {
565  injectDefault<int64_t>(entity, obj, var.getData<int64_t>(), now);
566  continue;
567  }
568  else if (var.getMetaData().getOpCodeHandle() == "set") {
569  injectSet<int64_t>(entity, obj, var.getData<int64_t>(), now);
570  continue;
571  }
572  else {
573  injectOpCode<int64_t>(entity, obj, var.getData<int64_t>(), now, var.getMetaData().getOpCodeHandle());
574  continue;
575  }
576  }
577  else if (var.getMetaData().getType() == VariableDescription_DataType_COMMUNICATION_ID) {
578  int64_t val = var.getData<int64_t>()[0];
579  std::vector<r_code::Code*> value;
580  if (val != -1) {
581  if (id_mapping_.find(val) == id_mapping_.end())
582  {
583  std::cout << "WARNING: Received message with unknown Communication ID" << std::endl;
584  continue;
585  }
586  std::string object_entity = id_mapping_[val];
587  // std::cout << object_entity << std::endl;
588  if (entities_.find(object_entity) == entities_.end())
589  {
590  std::cout << "WARNING: Received message with uninitalized entity, this should never happen!" << std::endl;
591  continue;
592  }
593  value.push_back(entities_[object_entity]);
594  }
595  _Mem::inject_marker_value_from_io_device(entity, obj, value, now, now + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
596  continue;
597  }
598  else if (var.getMetaData().getType() == VariableDescription_DataType_STRING) {
600  std::string val = var.getData<std::string>()[0];
601  _Mem::inject_marker_value_from_io_device(entity, obj, val, now, now + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
602 
603  }
604  }
605  }
606 
607  template<class O, class S>
608  template<class V>
609  void TcpIoDevice<O, S>::injectDefault(r_code::Code* entity, r_code::Code* object, std::vector<V> vals, core::Timestamp time) {
610  if (vals.size() == 0) {
611  // std::cout << "[]" << std::endl;
612  _Mem::inject_marker_value_from_io_device(entity, object, std::vector<r_code::Code*>(), time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
613  return;
614  }
615  if (vals.size() == 1) {
616  // std::cout << vals[0] << std::endl;
617  _Mem::inject_marker_value_from_io_device(entity, object, Atom::Float(vals[0]), time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
618  return;
619  }
620  injectSet<V>(entity, object, vals, time);
621  }
622 
623  template<class O, class S>
624  void TcpIoDevice<O, S>::injectDefault(r_code::Code* entity, r_code::Code* object, std::vector<r_code::Code*> vals, core::Timestamp time) {
625  if (vals.size() == 0) {
626  _Mem::inject_marker_value_from_io_device(entity, object, std::vector<r_code::Code*>(), time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
627  return;
628  }
629  if (vals.size() == 1) {
630  _Mem::inject_marker_value_from_io_device(entity, object, vals[0], time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
631  return;
632  }
633  injectSet(entity, object, vals, time);
634  }
635 
636  template<class O, class S>
637  template<class V>
638  void TcpIoDevice<O, S>::injectSet(r_code::Code* entity, r_code::Code* object, std::vector<V> vals, core::Timestamp time) {
639  std::vector<Atom> atom_vals;
640  for (auto it = vals.begin(); it != vals.end(); ++it) {
641  // std::cout << *it << std::endl;
642  atom_vals.push_back(Atom::Float(*it));
643  }
644  _Mem::inject_marker_value_from_io_device(entity, object, atom_vals, time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
645  }
646 
647  template<class O, class S>
648  void TcpIoDevice<O, S>::injectSet(r_code::Code* entity, r_code::Code* object, std::vector<r_code::Code*> vals, core::Timestamp time) {
649  _Mem::inject_marker_value_from_io_device(entity, object, vals, time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
650  }
651 
652  template<class O, class S>
653  template<class V>
654  void TcpIoDevice<O, S>::injectOpCode(r_code::Code* entity, r_code::Code* object, std::vector<V> vals, core::Timestamp time, std::string opcode_handle) {
655  core::uint16 op_code = r_exec::GetOpcode(opcode_handle.c_str());
656  if (op_code == 0xFFFF) {
657  std::cout << "ERROR: Received message with unknown opcode handle! Handle: " << opcode_handle << std::endl;
658  return;
659  }
660  std::vector<Atom> atom_vals;
661  for (auto it = vals.begin(); it != vals.end(); ++it) {
662  // std::cout << *it << std::endl;
663  atom_vals.push_back(Atom::Float(*it));
664  }
665  _Mem::inject_marker_value_from_io_device(entity, object, op_code, atom_vals, time, time);
666  }
667 
668  template<class O, class S>
669  void TcpIoDevice<O, S>::handleSetupMessage(std::unique_ptr<TCPMessage> setup_msg)
670  {
671  auto setup_message = setup_msg->release_setupmessage();
672  // Initialize all entities and store their communication ids in the id_mapping_ map.
673  cout << "Setup message received." << endl;
674  cout << "Parsing entities with communication ids:" << endl;
675  for (auto it = setup_message->entities().begin(); it != setup_message->entities().end(); ++it) {
676  id_mapping_[it->second] = it->first;
677  entities_[it->first] = NULL;
678  cout << it->first << " : " << it->second << endl;
679  }
680  // Initialize all commands and store their communication ids in the id_mapping_ map.
681  cout << "Parsing commands with communication ids:" << endl;
682  for (auto it = setup_message->commands().begin(); it != setup_message->commands().end(); ++it) {
683  id_mapping_[it->second] = it->first;
684  commands_[it->first] = 0xFFFF;
685  cout << it->first << " : " << it->second << endl;
686  }
687  // Initialize all properties and store their communication ids in the id_mapping_ map.
688  cout << "Parsing objects with communication ids:" << endl;
689  for (auto it = setup_message->objects().begin(); it != setup_message->objects().end(); ++it) {
690  id_mapping_[it->second] = it->first;
691  objects_[it->first] = NULL;
692  cout << it->first << " : " << it->second << endl;
693  }
694  // Add the ready-command, if not received from the environment simulation.
695  if (commands_.find("ready") == commands_.end()) {
696  commands_["ready"] = 0xFFFF;
697  }
698  // Get all command descriptions (their meta-data) and store them for later access during the eject calls.
699  for (int v = 0; v < setup_message->commanddescriptions_size(); ++v) {
700  auto command_desscription = setup_message->commanddescriptions(v);
701  if (commands_.find(command_desscription.name()) == commands_.end()) {
702  cout << "> WARNING: Variable Description found with different name than any commands. This will never be used!" << endl;
703  continue;
704  }
705  meta_data_map_.insert(std::map<string, MetaData>::value_type(command_desscription.name(), MetaData(&command_desscription.description())));
706  }
707 
708  if (started_) {
709  std::unique_ptr<TCPMessage> start_msg = std::make_unique<TCPMessage>();
710  start_msg->set_messagetype(TCPMessage_Type_START);
711  sendMessage(std::move(start_msg));
712  }
713  }
714 
715  // Instantiate this template class as needed by main(). (Needs C++11.)
716  template class TcpIoDevice<r_exec::LObject, r_exec::MemStatic>;
717  template class TcpIoDevice<r_exec::LObject, r_exec::MemVolatile>;
718 
719 } // namespace tcp_io_device
720 
721 #endif // !ENABLE_PROTOBUF
tcp_io_device::TcpIoDevice::initTCP
int initTCP()
Definition: tcp_io_device.cpp:109
tcp_io_device::TcpIoDevice
Definition: tcp_io_device.h:96
r_code::Code
Definition: r_code/object.h:224
tcp_io_device::ProtoVariable
Definition: tcp_data_message.pb.h:1692
tcp_io_device::MsgData
Definition: AERA/IODevices/TCP/AERA_Protobuf/utils.h:225
tcp_io_device::DataMessage
Definition: tcp_data_message.pb.h:1275
r_exec::MemExec
Definition: mem.h:979