86 #include "tcp_io_device.h"
89 using namespace std::chrono;
90 using namespace r_code;
91 using namespace r_exec;
94 #ifndef ENABLE_PROTOBUF
95 namespace tcp_io_device {
96 template<
class O,
class S> TcpIoDevice<O, S>::TcpIoDevice() :
MemExec<O, S>()
98 cout <<
"\n> ERROR: Trying to use the TcpIoDevice without setting ENABLE_PROTOBUF flag in the beginning of main.cpp" << endl;
100 template<
class O,
class S> TcpIoDevice<O, S>::TcpIoDevice(
int number_of_servers,
int number_of_clients, std::vector<std::pair<std::string, std::string> > server_configurations, std::vector<std::string> client_configurations) :
MemExec<O, S>()
102 cout <<
"\n> ERROR: Trying to use the TcpIoDevice without setting ENABLE_PROTOBUF flag in the beginning of main.cpp" << endl;
105 template<
class O,
class S>
106 TcpIoDevice<O, S>::~TcpIoDevice() {}
108 template<
class O,
class S>
119 #include "AERA_Protobuf/utils.h"
120 #include "AERA_Protobuf/tcp_data_message.pb.h"
122 namespace tcp_io_device {
124 template<
class O,
class S> TcpIoDevice<O, S>::TcpIoDevice(
125 int number_of_servers,
126 int number_of_clients,
127 std::vector<std::pair<std::string, std::string> > server_configurations,
128 std::vector<std::string> client_configurations) :
130 number_of_servers_(number_of_servers),
131 number_of_clients_(number_of_clients),
132 server_configurations_(server_configurations),
133 client_configurations_(client_configurations)
137 lastInjectTime_ = Timestamp(seconds(0));
138 lastCommandTime_ = Timestamp(seconds(0));
140 receive_queue_ = std::make_shared<SafeQueue>(number_of_servers + number_of_clients);
141 send_queue_ = std::make_shared<SafeQueue>(100);
143 tcp_connections_.push_back(
new TCPConnection(receive_queue_, send_queue_, 8));
148 template<
class O,
class S>
149 TcpIoDevice<O, S>::~TcpIoDevice() {
151 for (
auto it = tcp_connections_.begin(); it != tcp_connections_.end();) {
153 it = tcp_connections_.erase(it);
167 if (timeTickThread_) {
168 delete timeTickThread_;
172 template<
class O,
class S>
176 for (
int i = 0; i < number_of_servers_; ++i) {
177 err = tcp_connections_[i]->establishConnection(server_configurations_[i].first, server_configurations_[i].second);
183 for (
int i = number_of_servers_; i < number_of_servers_ + number_of_clients_; ++i) {
184 err = tcp_connections_[i]->listenAndAwaitConnection(client_configurations_[i-number_of_servers_]);
190 for (
auto it = tcp_connections_.begin(); it != tcp_connections_.end(); ++it) {
195 auto msg = std::move(receive_queue_->dequeue());
199 if (msg->messagetype() != msg->SETUP) {
202 handleSetupMessage(std::move(msg));
209 template<
class O,
class S>
210 bool TcpIoDevice<O, S>::load(
const vector<r_code::Code*>* objects, uint32 stdin_oid, uint32 stdout_oid, uint32 self_oid)
218 cout <<
"> Loading entities:" << endl;
219 for (
auto it = entities_.begin(); it != entities_.end(); ++it) {
220 it->second = _Mem::find_object(objects, &((it->first)[0]));
221 cout << it->first <<
":\t" << it->second->get_oid() << endl;
225 cout <<
"> Loading objects:" << endl;
226 for (
auto it = objects_.begin(); it != objects_.end(); ++it) {
227 it->second = _Mem::find_object(objects, &((it->first)[0]));
228 cout << it->first <<
":\t" << it->second->get_oid() << endl;
232 cout <<
"> Loading commands:" << endl;
233 for (
auto it = commands_.begin(); it != commands_.end(); ++it) {
234 it->second = r_exec::GetOpcode(&((it->first)[0]));
235 cout << it->first <<
":\t" << it->second << endl;
243 template<
class O,
class S>
246 uint16
function = (command->code(CMD_FUNCTION).atom_ >> 8) & 0x000000FF;
248 uint16 args_set_index = command->code(CMD_ARGS).asIndex();
249 if (
function == commands_[
"ready"]) {
250 if (command->code_size() < 2 || command->code(args_set_index + 1).getDescriptor() != Atom::I_PTR ||
251 command->code(command->code(args_set_index + 1).asIndex()).getDescriptor() != Atom::STRING) {
252 cout <<
"> WARNING: Cannot get identifier as string" << endl;
256 string identifier = Utils::GetString(&command->code(command->code(args_set_index + 1).asIndex()));
258 cout <<
"I/O device is ready for " << identifier << endl;
259 startTimeTickThread();
264 for (
auto cmd = commands_.begin(); cmd != commands_.end(); ++cmd) {
265 if (cmd->second !=
function) {
268 Code* obj = command->get_reference(command->code(args_set_index + 1).asIndex());
269 for (
auto it = entities_.begin(); it != entities_.end(); ++it) {
270 if (it->second != obj) {
274 if (!msg.isValid()) {
275 cout <<
"Could not create message from ejected command" << endl;
278 sendDataMessage(msg);
287 template<
class O,
class S>
291 std::map<string, MetaData>::iterator meta_data_it = meta_data_map_.find(cmd_identifier);
292 if (meta_data_it == meta_data_map_.end()) {
293 cout <<
"> WARNING: Could not find cmd identifier " << cmd_identifier <<
" in the MetaData of available commands!" << endl;
294 return tcp_io_device::MsgData::invalidMsgData();
297 MetaData stored_meta_data = meta_data_it->second;
298 uint16 args_set_index = cmd->code(CMD_ARGS).asIndex();
300 if (cmd->code(args_set_index).getAtomCount() == 1) {
302 return tcp_io_device::MsgData::createNewMsgData(stored_meta_data);
307 int start = args_set_index + 2;
308 int end = args_set_index + 3;
311 if (stored_meta_data.getOpCodeHandle() !=
"" ||
312 (stored_meta_data.getDimensions() != std::vector<uint64_t>({ 1 }) && stored_meta_data.getDimensions() != std::vector<uint64_t>({ 1, 1 }))) {
313 if (cmd->code(args_set_index + 2).getDescriptor() != Atom::I_PTR) {
314 std::cout <<
"ERROR: Ejected command with OpCodeHandle \"" << stored_meta_data.getOpCodeHandle() <<
"\" with dimensionality > 1 and r_code object without the necessary nesting." << std::endl;
315 return tcp_io_device::MsgData::invalidMsgData();
317 int set_index = cmd->code(args_set_index + 2).asIndex();
319 start = set_index + 1;
320 end = start + cmd->code(set_index).getAtomCount();
322 if (cmd->code(set_index).asOpcode() == GetOpcode(
"quat")) {
323 double _1 = cmd->code(start) < 0 ? -1 : 1;
324 for (
int i = start; i < end; ++i) {
325 cmd->code(i) = Atom::Float(cmd->code(i).asFloat() * _1);
331 std::cout <<
"Eject cmd: " << cmd_identifier <<
", entity: " << entity <<
", Value(s): ";
332 for (
int i = start; i < end; ++i) {
333 std::cout << cmd->code(i).asFloat() <<
" ";
335 std::cout << std::endl;
337 switch (
auto t = stored_meta_data.getType()) {
338 case VariableDescription_DataType_BOOL:
340 std::vector<bool> data = getDataVec<bool>(cmd, start, end, t);
341 msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
344 case VariableDescription_DataType_INT64:
346 std::vector<int64_t> data = getDataVec<int64_t>(cmd, start, end, t);
347 msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
350 case VariableDescription_DataType_DOUBLE:
352 std::vector<double> data = getDataVec<double>(cmd, start, end, t);
353 msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
356 case VariableDescription_DataType_COMMUNICATION_ID:
358 std::vector<communication_id_t> data = getDataVec<communication_id_t>(cmd, start, end, t);
359 msg_data = tcp_io_device::MsgData::createNewMsgData(stored_meta_data, data);
362 case VariableDescription_DataType_STRING:
367 cout <<
"> WARNING: String type not implemented, yet" << endl;
368 return tcp_io_device::MsgData::invalidMsgData();
375 template<
class O,
class S>
377 std::vector<T> TcpIoDevice<O, S>::getDataVec(
r_code::Code* cmd,
int start_index,
int end_index, tcp_io_device::VariableDescription_DataType type) {
380 case VariableDescription_DataType_BOOL:
381 for (
int i = start_index; i < end_index; ++i) {
382 data.push_back(cmd->code(i).asBoolean());
385 case VariableDescription_DataType_INT64:
386 for (
int i = start_index; i < end_index; ++i) {
387 data.push_back((
int)cmd->code(i).asFloat());
390 case VariableDescription_DataType_DOUBLE:
391 for (
int i = start_index; i < end_index; ++i) {
392 data.push_back(cmd->code(i).asFloat());
396 case VariableDescription_DataType_COMMUNICATION_ID:
397 for (
int i = start_index; i < end_index; ++i) {
400 if (cmd->code(i).getDescriptor() != Atom::R_PTR) {
402 std::cout <<
"WARNING: Got command which should include a communicaiton id which points to another object without R_PTR. Ignoring it." << std::endl;;
405 r_code::Code* reference = cmd->get_reference(cmd->code(i).asIndex());
406 for (
auto it = entities_.begin(); it != entities_.end(); ++it) {
407 if (it->second != reference) {
412 data.push_back(cmd->code(i).asFloat());
415 case VariableDescription_DataType_STRING:
424 template<
class O,
class S>
425 void TcpIoDevice<O, S>::on_time_tick()
427 auto now = r_exec::Now();
428 if (now < lastInjectTime_ + _Mem::get_sampling_period() * 8 / 10) {
432 auto msg = std::move(receive_queue_->dequeue());
434 if (_Mem::reduction_core_count_ == 0 && _Mem::time_core_count_ == 0 && started_) {
436 msg = std::move(receive_queue_->dequeue());
445 std::cout <<
"Received message of type " << msg->messagetype() << std::endl;
447 handleMessage(std::move(msg));
448 lastInjectTime_ = now;
451 template<
class O,
class S>
452 void TcpIoDevice<O, S>::startTimeTickThread()
456 std::unique_ptr<TCPMessage> msg = std::make_unique<TCPMessage>();
457 msg->set_messagetype(TCPMessage_Type_START);
458 sendMessage(std::move(msg));
461 if (_Mem::reduction_core_count_ == 0 && _Mem::time_core_count_ == 0) {
465 if (timeTickThread_) {
466 cout <<
"timeTickThread already running" << endl;
473 timeTickThread_ = Thread::New<_Thread>(timeTickRun,
this);
476 template<
class O,
class S>
477 thread_ret thread_function_call TcpIoDevice<O, S>::timeTickRun(
void* args)
479 TcpIoDevice<O, S>*
self = (TcpIoDevice*)args;
481 auto sampling_period = _Mem::Get()->get_sampling_period();
482 auto tickTime = r_exec::Now();
484 while (self->state_ == _Mem::RUNNING) {
485 self->on_time_tick();
487 tickTime += sampling_period;
488 Thread::Sleep(tickTime - r_exec::Now());
490 for (
auto it = self->tcp_connections_.begin(); it != self->tcp_connections_.end(); ++it) {
498 template<
class O,
class S>
500 std::unique_ptr<tcp_io_device::TCPMessage> msg = std::make_unique<tcp_io_device::TCPMessage>();
501 msg->set_messagetype(tcp_io_device::TCPMessage::DATA);
504 msg_data.toMutableProtoVariable(var);
505 sendMessage(std::move(msg));
508 template<
class O,
class S>
509 void TcpIoDevice<O, S>::sendMessage(std::unique_ptr<TCPMessage> msg) {
511 std::cout <<
"Sending Message of type " << msg->messagetype() << std::endl;
512 send_queue_->enqueue(std::move(msg));
516 template<
class O,
class S>
517 void TcpIoDevice<O, S>::handleMessage(std::unique_ptr<TCPMessage> msg)
519 switch (msg->messagetype()) {
520 case TCPMessage_Type_SETUP:
521 handleSetupMessage(std::move(msg));
523 case TCPMessage_Type_DATA:
524 handleDataMessage(std::move(msg));
527 cout <<
"> WARNING: Received Message of different type than SETUP or DATA" << endl;
531 template<
class O,
class S>
532 void TcpIoDevice<O, S>::handleDataMessage(std::unique_ptr<TCPMessage> data_msg)
535 auto now = r_exec::Now();
536 auto data = data_msg->release_datamessage();
538 for (
int i = 0; i < data->variables_size(); ++i) {
539 MsgData var(&(data->variables(i)));
540 auto entity = entities_[id_mapping_[var.getMetaData().getEntityID()]];
541 auto obj = objects_[id_mapping_[var.getMetaData().getID()]];
547 if (var.getMetaData().getType() == VariableDescription_DataType_DOUBLE)
549 if (var.getMetaData().getOpCodeHandle() ==
"") {
550 injectDefault<double>(entity, obj, var.getData<
double>(), now);
553 else if (var.getMetaData().getOpCodeHandle() ==
"set") {
554 injectSet<double>(entity, obj, var.getData<
double>(), now);
558 injectOpCode<double>(entity, obj, var.getData<
double>(), now, var.getMetaData().getOpCodeHandle());
562 else if (var.getMetaData().getType() == VariableDescription_DataType_INT64)
564 if (var.getMetaData().getOpCodeHandle() ==
"") {
565 injectDefault<int64_t>(entity, obj, var.getData<int64_t>(), now);
568 else if (var.getMetaData().getOpCodeHandle() ==
"set") {
569 injectSet<int64_t>(entity, obj, var.getData<int64_t>(), now);
573 injectOpCode<int64_t>(entity, obj, var.getData<int64_t>(), now, var.getMetaData().getOpCodeHandle());
577 else if (var.getMetaData().getType() == VariableDescription_DataType_COMMUNICATION_ID) {
578 int64_t val = var.getData<int64_t>()[0];
579 std::vector<r_code::Code*> value;
581 if (id_mapping_.find(val) == id_mapping_.end())
583 std::cout <<
"WARNING: Received message with unknown Communication ID" << std::endl;
586 std::string object_entity = id_mapping_[val];
588 if (entities_.find(object_entity) == entities_.end())
590 std::cout <<
"WARNING: Received message with uninitalized entity, this should never happen!" << std::endl;
593 value.push_back(entities_[object_entity]);
595 _Mem::inject_marker_value_from_io_device(entity, obj, value, now, now + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
598 else if (var.getMetaData().getType() == VariableDescription_DataType_STRING) {
600 std::string val = var.getData<std::string>()[0];
601 _Mem::inject_marker_value_from_io_device(entity, obj, val, now, now + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
607 template<
class O,
class S>
609 void TcpIoDevice<O, S>::injectDefault(
r_code::Code* entity,
r_code::Code*
object, std::vector<V> vals, core::Timestamp time) {
610 if (vals.size() == 0) {
612 _Mem::inject_marker_value_from_io_device(entity,
object, std::vector<r_code::Code*>(), time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
615 if (vals.size() == 1) {
617 _Mem::inject_marker_value_from_io_device(entity,
object, Atom::Float(vals[0]), time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
620 injectSet<V>(entity,
object, vals, time);
623 template<
class O,
class S>
624 void TcpIoDevice<O, S>::injectDefault(
r_code::Code* entity,
r_code::Code*
object, std::vector<r_code::Code*> vals, core::Timestamp time) {
625 if (vals.size() == 0) {
626 _Mem::inject_marker_value_from_io_device(entity,
object, std::vector<r_code::Code*>(), time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
629 if (vals.size() == 1) {
630 _Mem::inject_marker_value_from_io_device(entity,
object, vals[0], time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
633 injectSet(entity,
object, vals, time);
636 template<
class O,
class S>
638 void TcpIoDevice<O, S>::injectSet(
r_code::Code* entity,
r_code::Code*
object, std::vector<V> vals, core::Timestamp time) {
639 std::vector<Atom> atom_vals;
640 for (
auto it = vals.begin(); it != vals.end(); ++it) {
642 atom_vals.push_back(Atom::Float(*it));
644 _Mem::inject_marker_value_from_io_device(entity,
object, atom_vals, time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
647 template<
class O,
class S>
648 void TcpIoDevice<O, S>::injectSet(
r_code::Code* entity,
r_code::Code*
object, std::vector<r_code::Code*> vals, core::Timestamp time) {
649 _Mem::inject_marker_value_from_io_device(entity,
object, vals, time, time + _Mem::get_sampling_period(), r_exec::View::SYNC_PERIODIC, _Mem::get_stdin());
652 template<
class O,
class S>
654 void TcpIoDevice<O, S>::injectOpCode(
r_code::Code* entity,
r_code::Code*
object, std::vector<V> vals, core::Timestamp time, std::string opcode_handle) {
655 core::uint16 op_code = r_exec::GetOpcode(opcode_handle.c_str());
656 if (op_code == 0xFFFF) {
657 std::cout <<
"ERROR: Received message with unknown opcode handle! Handle: " << opcode_handle << std::endl;
660 std::vector<Atom> atom_vals;
661 for (
auto it = vals.begin(); it != vals.end(); ++it) {
663 atom_vals.push_back(Atom::Float(*it));
665 _Mem::inject_marker_value_from_io_device(entity,
object, op_code, atom_vals, time, time);
668 template<
class O,
class S>
669 void TcpIoDevice<O, S>::handleSetupMessage(std::unique_ptr<TCPMessage> setup_msg)
671 auto setup_message = setup_msg->release_setupmessage();
673 cout <<
"Setup message received." << endl;
674 cout <<
"Parsing entities with communication ids:" << endl;
675 for (
auto it = setup_message->entities().begin(); it != setup_message->entities().end(); ++it) {
676 id_mapping_[it->second] = it->first;
677 entities_[it->first] = NULL;
678 cout << it->first <<
" : " << it->second << endl;
681 cout <<
"Parsing commands with communication ids:" << endl;
682 for (
auto it = setup_message->commands().begin(); it != setup_message->commands().end(); ++it) {
683 id_mapping_[it->second] = it->first;
684 commands_[it->first] = 0xFFFF;
685 cout << it->first <<
" : " << it->second << endl;
688 cout <<
"Parsing objects with communication ids:" << endl;
689 for (
auto it = setup_message->objects().begin(); it != setup_message->objects().end(); ++it) {
690 id_mapping_[it->second] = it->first;
691 objects_[it->first] = NULL;
692 cout << it->first <<
" : " << it->second << endl;
695 if (commands_.find(
"ready") == commands_.end()) {
696 commands_[
"ready"] = 0xFFFF;
699 for (
int v = 0; v < setup_message->commanddescriptions_size(); ++v) {
700 auto command_desscription = setup_message->commanddescriptions(v);
701 if (commands_.find(command_desscription.name()) == commands_.end()) {
702 cout <<
"> WARNING: Variable Description found with different name than any commands. This will never be used!" << endl;
705 meta_data_map_.insert(std::map<string, MetaData>::value_type(command_desscription.name(), MetaData(&command_desscription.description())));
709 std::unique_ptr<TCPMessage> start_msg = std::make_unique<TCPMessage>();
710 start_msg->set_messagetype(TCPMessage_Type_START);
711 sendMessage(std::move(start_msg));
716 template class TcpIoDevice<r_exec::LObject, r_exec::MemStatic>;
717 template class TcpIoDevice<r_exec::LObject, r_exec::MemVolatile>;
721 #endif // !ENABLE_PROTOBUF