AERA
tcp_connection.cpp
1 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
2 //_/_/
3 //_/_/ AERA
4 //_/_/ Autocatalytic Endogenous Reflective Architecture
5 //_/_/
6 //_/_/ Copyright (c) 2018-2025 Jeff Thompson
7 //_/_/ Copyright (c) 2018-2025 Kristinn R. Thorisson
8 //_/_/ Copyright (c) 2018-2025 Icelandic Institute for Intelligent Machines
9 //_/_/ Copyright (c) 2021-2025 Leonard Eberding
10 //_/_/ http://www.iiim.is
11 //_/_/
12 //_/_/ --- Open-Source BSD License, with CADIA Clause v 1.0 ---
13 //_/_/
14 //_/_/ Redistribution and use in source and binary forms, with or without
15 //_/_/ modification, is permitted provided that the following conditions
16 //_/_/ are met:
17 //_/_/ - Redistributions of source code must retain the above copyright
18 //_/_/ and collaboration notice, this list of conditions and the
19 //_/_/ following disclaimer.
20 //_/_/ - Redistributions in binary form must reproduce the above copyright
21 //_/_/ notice, this list of conditions and the following disclaimer
22 //_/_/ in the documentation and/or other materials provided with
23 //_/_/ the distribution.
24 //_/_/
25 //_/_/ - Neither the name of its copyright holders nor the names of its
26 //_/_/ contributors may be used to endorse or promote products
27 //_/_/ derived from this software without specific prior
28 //_/_/ written permission.
29 //_/_/
30 //_/_/ - CADIA Clause: The license granted in and to the software
31 //_/_/ under this agreement is a limited-use license.
32 //_/_/ The software may not be used in furtherance of:
33 //_/_/ (i) intentionally causing bodily injury or severe emotional
34 //_/_/ distress to any person;
35 //_/_/ (ii) invading the personal privacy or violating the human
36 //_/_/ rights of any person; or
37 //_/_/ (iii) committing or preparing for any act of war.
38 //_/_/
39 //_/_/ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
40 //_/_/ CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
41 //_/_/ INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
42 //_/_/ MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
43 //_/_/ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
44 //_/_/ CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
45 //_/_/ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
46 //_/_/ BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
47 //_/_/ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
48 //_/_/ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
49 //_/_/ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
50 //_/_/ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
51 //_/_/ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
52 //_/_/ OF SUCH DAMAGE.
53 //_/_/
54 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
55 
56 #ifdef ENABLE_PROTOBUF
57 
58 #if !defined(_WIN32)
59 #include <errno.h>
60 #include <unistd.h>
61 #include <netdb.h>
62 #include <sys/types.h>
63 #include <netinet/in.h>
64 #include <sys/socket.h>
65 #include <sys/un.h>
66 #include <arpa/inet.h>
67 #include <poll.h>
68 #endif
69 
70 #include "tcp_connection.h"
71 
78 static bool
79 #if defined(_WIN32)
80 isValidSocket(const SOCKET sock) { return sock != INVALID_SOCKET; }
81 #else
82 isValidSocket(const int sock) { return sock >= 0; }
83 #endif
84 
90 static void
91 #if defined(_WIN32)
92 setSocketInvalid(SOCKET &sock) { sock = INVALID_SOCKET; }
93 #else
94 setSocketInvalid(int &sock) { sock = -1; }
95 #endif
96 
101 static int getLastError()
102 {
103 #if defined(_WIN32)
104  return WSAGetLastError();
105 #else
106  return errno;
107 #endif
108 }
109 
110 namespace tcp_io_device {
111 
112  TCPConnection::TCPConnection(std::shared_ptr<SafeQueue> receive_queue, std::shared_ptr<SafeQueue> send_queue, uint64_t msg_length_buf_size)
113  {
114  outgoing_queue_ = send_queue;
115  incoming_queue_ = receive_queue;
116  msg_length_buf_size_ = msg_length_buf_size;
117  state_ = NOT_STARTED;
118  setSocketInvalid(tcp_socket_);
119  setSocketInvalid(server_listen_socket_);
120  }
121 
122  TCPConnection::~TCPConnection()
123  {
124  std::cout << "> INFO: Shutting down TCP connection" << std::endl;
125  // Set state to STOPPED triggers end of while loop in the backgroundHandler.
126  // Wait for the background thread to join and close the socket, if necessary
127  state_ = STOPPED;
128  tcp_background_thread_->join();
129  if (isValidSocket(tcp_socket_)) {
130 #if defined(_WIN32)
131  int err = shutdown(tcp_socket_, SD_BOTH);
132 #else
133  int err = shutdown(tcp_socket_, SHUT_WR);
134 #endif
135  if (err != 0) {
136  std::cout << "ERROR: Shutdown of Client Socket failed with error: " << getLastError() << std::endl;
137  }
138 #if defined(_WIN32)
139  closesocket(tcp_socket_);
140  WSACleanup();
141 #else
142  close(tcp_socket_);
143 #endif
144  }
145  }
146 
147  int TCPConnection::listenAndAwaitConnection(std::string port)
148  {
149  port_ = port;
150 
151 #if defined(_WIN32)
152  WSADATA wsa_data;
153  int err;
154 
155  err = WSAStartup(MAKEWORD(2, 2), &wsa_data);
156  if (err != 0) {
157  std::cout << "ERROR: WSAStartup failed with error: " << err << std::endl;
158  return 1;
159  }
160  struct addrinfo* result = NULL;
161  struct addrinfo hints;
162 
163  ZeroMemory(&hints, sizeof(hints));
164  hints.ai_family = AF_INET;
165  hints.ai_socktype = SOCK_STREAM;
166  hints.ai_protocol = IPPROTO_TCP;
167  hints.ai_flags = AI_PASSIVE;
168 
169  std::cout << "> INFO: Resolving server address and port" << std::endl;
170  // Resolve the server address and port
171  err = getaddrinfo(NULL, port.c_str(), &hints, &result);
172  if (err != 0) {
173  std::cout << "ERROR: getaddrinfo failed with error: " << err << std::endl;
174  WSACleanup();
175  return 1;
176  }
177 
178  std::cout << "> INFO: Creating socket for connection to client" << std::endl;
179 
180  // Create a SOCKET for connecting to client
181  server_listen_socket_ = ::socket(result->ai_family, result->ai_socktype, result->ai_protocol);
182  if (!isValidSocket(server_listen_socket_)) {
183  std::cout << "ERROR: Socker failed with error: " << getLastError() << std::endl;
184  freeaddrinfo(result);
185  WSACleanup();
186  return 1;
187  }
188 
189  std::cout << "> INFO: Setting up TCP listening socket" << std::endl;
190  // Setup the TCP listening socket
191  err = ::bind(server_listen_socket_, result->ai_addr, (int)result->ai_addrlen);
192  if (err == SOCKET_ERROR) {
193  std::cout << "ERROR: Bind failed with error: " << getLastError() << std::endl;
194  freeaddrinfo(result);
195  closesocket(server_listen_socket_);
196  WSACleanup();
197  return 1;
198  }
199 
200  freeaddrinfo(result);
201 
202  // Wait for a client to conenct to the socket.
203  err = listen(server_listen_socket_, SOMAXCONN);
204  if (err == SOCKET_ERROR) {
205  std::cout << "ERROR: Listen failed with error: " << getLastError() << std::endl;
206  closesocket(server_listen_socket_);
207  WSACleanup();
208  return 1;
209  }
210 
211 
212  std::cout << "> INFO: Waiting to accept client socket on port " << port << std::endl;
213  // Accept a client socket
214  tcp_socket_ = ::accept(server_listen_socket_, NULL, NULL);
215  if (!isValidSocket(tcp_socket_)) {
216  std::cout << "ERROR: Accepting client failed with error: " << getLastError() << std::endl;
217  closesocket(server_listen_socket_);
218  WSACleanup();
219  return 1;
220  }
221 
222  std::cout << "> INFO: TCP connection successfully established" << std::endl;
223 
224  socket_type_ = SERVER;
225 #else
226  std::cout << "TODO: Implement TCPConnection::listenAndAwaitConnection for non-Windows" << std::endl;
227  return 1;
228 #endif
229 
230  return 0;
231  }
232 
233  int TCPConnection::establishConnection(std::string host, std::string port) {
234 
235  host_ = host;
236  port_ = port;
237 
238  struct addrinfo* result = NULL, hints;
239 
240  int err;
241 
242 #if defined(_WIN32)
243  // Initialize Winsock
244  std::cout << "> INFO: Initializing Winsock" << std::endl;
245  WSADATA wsaData;
246  err = WSAStartup(MAKEWORD(2, 2), &wsaData);
247  if (err != 0) {
248  std::cout << "ERROR: WSAStartup failed with error: " << err << std::endl;
249  return 1;
250  }
251 #endif
252 
253  memset(&hints, 0, sizeof(hints));
254  hints.ai_family = AF_INET;
255  hints.ai_socktype = SOCK_STREAM;
256  hints.ai_protocol = IPPROTO_TCP;
257  hints.ai_flags = AI_PASSIVE;
258 
259  std::cout << "> INFO: Resolving server address and port" << std::endl;
260  // Resolve the server address and port
261  err = getaddrinfo(host.c_str(), port.c_str(), &hints, &result);
262  if (err != 0) {
263  std::cout << "ERROR: getaddrinfo failed with error: " << err << std::endl;
264 #if defined(_WIN32)
265  WSACleanup();
266 #endif
267  return 1;
268  }
269 
270  std::cout << "> INFO: Creating socket for connection to server" << std::endl;
271  // Create a SOCKET for connecting to server
272  tcp_socket_ = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
273  if (!isValidSocket(tcp_socket_)) {
274  std::cout << "ERROR: Socker failed with error: " << getLastError() << std::endl;
275  freeaddrinfo(result);
276 #if defined(_WIN32)
277  WSACleanup();
278 #endif
279  return 1;
280  }
281 
282  std::cout << "> INFO: Connecting to TCP server" << std::endl;
283  // Connect to server.
284  while (true) {
285  std::cout << "Trying to connect to " << host << ":" << port << std::endl;
286  err = connect(tcp_socket_, result->ai_addr, (int)result->ai_addrlen);
287  if (err == 0) {
288  break;
289  }
290  std::cout << "Failed to connect to server with error: " << getLastError() << std::endl;
291  std::cout << "Trying to reconnect in 1 sec..." << std::endl;
292  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
293  }
294 
295  freeaddrinfo(result);
296 
297  if (!isValidSocket(tcp_socket_)) {
298  printf("Unable to connect to server!\n");
299 #if defined(_WIN32)
300  WSACleanup();
301 #else
302  close(tcp_socket_);
303 #endif
304  return 1;
305  }
306 
307  std::cout << "> INFO: TCP connection successfully established" << std::endl;
308 
309  socket_type_ = CLIENT;
310 
311  return 0;
312  }
313 
314  void TCPConnection::start() {
315  // Start the background thread to handle incoming and outgoing messages.
316  state_ = RUNNING;
317  tcp_background_thread_ = std::make_shared<std::thread>(&TCPConnection::tcpBackgroundHandler, this);
318  }
319 
320  void TCPConnection::stop()
321  {
322  state_ = STOPPED;
323  }
324 
326  {
327 
328  // Wait for the connection to become alive
329  std::this_thread::sleep_for(std::chrono::milliseconds(100));
330 
331  int error_code = 0;
332  while (state_ == RUNNING) {
333  if (!isValidSocket(tcp_socket_)) {
334  std::cout << "WARNING: Lost TCP connection. Trying to reconnect." << std::endl;
335  error_code = 0;
336  switch (socket_type_)
337  {
338  case SERVER:
339  if (!isValidSocket(server_listen_socket_)) {
340  error_code = listenAndAwaitConnection(port_);
341  break;
342  }
343  std::cout << "INFO: Accepting new client on socket, waiting for connection." << std::endl;
344  tcp_socket_ = ::accept(server_listen_socket_, NULL, NULL);
345  break;
346  case CLIENT:
347  error_code = establishConnection(host_, port_);
348  break;
349  default:
350  break;
351  }
352  if (error_code != 0 || !isValidSocket(tcp_socket_))
353  {
354  std::cout << "Unable to reconnect... Error: " << getLastError() << " Retrying in 1 sec..." << std::endl;
355  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
356  continue;
357  }
358  std::cout << "INFO: Reconnect successfull." << std::endl;
359  std::unique_ptr<TCPMessage> reconnect_msg = std::make_unique<TCPMessage>();
360  reconnect_msg->set_messagetype(TCPMessage::RECONNECT);
361  incoming_queue_->enqueue(std::move(reconnect_msg));
362  }
363  // First send all data from the queue
364  std::unique_ptr<TCPMessage> msg = outgoing_queue_->dequeue();
365  while (msg) {
366  std::cout << "Sending message of type " << msg->messagetype() << std::endl;
367  error_code = sendMessage(std::move(msg));
368  if (error_code <= 0) {
369  // Error occured while sending message, break the loop and end the thread.
370  break;
371  }
372  msg = std::move(outgoing_queue_->dequeue());
373  }
374 
375  // Yield to other threads while waiting for input.
376  std::this_thread::sleep_for(std::chrono::milliseconds(1));
377  bool got_error = false;
378  while (true) {
379  // Check if new data is on the TCP connection to receive
380  int ready = receiveIsReady(tcp_socket_);
381  if (ready == 0) {
382  // No messages on the socket, continue with the handler main loop.
383  break;
384  }
385  else if (ready < 0) {
386  // Something went wrong when receiving the message, break the handler, end the thread.
387  std::cout << "select() == SOCKET_ERROR error: " << getLastError() << std::endl;
388  // got_error = true;
389  //closesocket(tcp_socket_);
390  setSocketInvalid(tcp_socket_);
391  break;
392  }
393  auto in_msg = receiveMessage();
394  if (!in_msg) {
395  // Something went wrong when receiving the message, break the handler, end the thread.
396  // got_error = true;
397  //closesocket(tcp_socket_);
398  setSocketInvalid(tcp_socket_);
399  break;
400  }
401 
402  // Add it to the queue, let the main thread handle them
403  incoming_queue_->enqueue(std::move(in_msg));
404  }
405  if (got_error)
406  break;
407  }
408  // Clear all entries of the queues before shutting down.
409  incoming_queue_->clear();
410  outgoing_queue_->clear();
411 
412  // Close the socket
413  if (isValidSocket(tcp_socket_)) {
414 #if defined(_WIN32)
415  int err = shutdown(tcp_socket_, SD_SEND);
416 #else
417  int err = shutdown(tcp_socket_, SHUT_WR);
418 #endif
419  if (err != 0) {
420  std::cout << "ERROR: Shutdown of Client Socket failed with error: " << getLastError() << std::endl;
421  }
422 #if defined(_WIN32)
423  closesocket(tcp_socket_);
424  WSACleanup();
425 #else
426  close(tcp_socket_);
427 #endif
428  }
429  setSocketInvalid(tcp_socket_);
430  }
431 
432  std::unique_ptr<TCPMessage> TCPConnection::receiveMessage()
433  {
434  // Number of bytes received
435  int received_bytes = 0;
436 
437  // Length of read bytes in total (used to ensure split messages are read correctly)
438  uint64_t len_res = 0;
439 
440  // First read the length of the message to expect (8 byte uint64_t)
441  std::string tcp_msg_len_recv_buf;
442  tcp_msg_len_recv_buf.reserve(msg_length_buf_size_);
443  // To ensure split message is read correctly
444  while (len_res < msg_length_buf_size_) {
445  received_bytes = recv(tcp_socket_, &(tcp_msg_len_recv_buf[len_res]), msg_length_buf_size_ - len_res, 0);
446  if (received_bytes > 0) {
447  // All good
448  len_res += received_bytes;
449  }
450  else if (received_bytes == 0) {
451  // Client closed the connection
452  len_res = -1;
453  std::cout << "Connection closing..." << std::endl;
454  return NULL;
455  }
456  else {
457  // Error occured during receiving
458  std::cout << "recv failed during recv of data length with error: " << getLastError() << std::endl;
459 #if defined(_WIN32)
460  closesocket(tcp_socket_);
461  WSACleanup();
462 #else
463  close(tcp_socket_);
464 #endif
465  return NULL;
466  }
467  }
468 
469  // Convert the read bytes to uint64_t. Little Endian! Otherwise invert for loop - not implemented, yet.
470  uint64_t msg_len = 0;
471  for (int i = msg_length_buf_size_ - 1; i >= 0; --i)
472  {
473  msg_len <<= 8;
474  msg_len |= (unsigned char)tcp_msg_len_recv_buf[i];
475  }
476 
477  // Reset read bytes and total read bytes to 0
478  received_bytes = 0;
479  len_res = 0;
480  // Read as many packages as needed to fill the message buffer. Ensures split messages are received correctly.
481  char* buf = new char[msg_len]();
482  while (len_res < msg_len) {
483  received_bytes = recv(tcp_socket_, &buf[len_res], msg_len - len_res, 0);
484  if (received_bytes > 0) {
485  len_res += received_bytes;
486  }
487  else if (received_bytes == 0) {
488  return NULL;
489  }
490  else {
491  std::cout << "recv failed during recv of data message with error: " << getLastError() << std::endl;
492  return NULL;
493  }
494  }
495 
496  // Parse the byte-stream into a TCPMessage
497  std::unique_ptr<TCPMessage> msg = std::make_unique<TCPMessage>();
498  if (!msg->ParseFromArray(buf, msg_len)) {
499  std::cout << "ERROR: Parsing Message from String failed" << std::endl;
500  return NULL;
501  }
502 
503  delete[] buf;
504 
505  return msg;
506  }
507 
508  int TCPConnection::sendMessage(std::unique_ptr<TCPMessage> msg)
509  {
510  // Serialize the TCPMessage
511  std::string out;
512  out = msg->SerializeAsString();
513 
514  // First put the length of the message in the first 8 bytes of the output stream
515  std::string out_buffer = "";
516  for (int i = 0; i < 8; ++i) {
517  out_buffer += (unsigned char)((int)(((uint64_t)out.size() >> (i * 8)) & 0xFF));
518  }
519 
520  // Attach the serialized message to the byte-stream
521  out_buffer += out;
522 
523  // Send message length + message through the socket.
524  int i_send_result = ::send(tcp_socket_, &out_buffer[0], out_buffer.size(), 0);
525  if (i_send_result < 0) {
526  std::cout << "SendMessage failed with error: " << getLastError() << std::endl;
527 #if defined(_WIN32)
528  closesocket(tcp_socket_);
529  WSACleanup();
530 #else
531  close(tcp_socket_);
532 #endif
533  }
534 
535  return i_send_result;
536  }
537 
538 #if defined(_WIN32)
539  int TCPConnection::receiveIsReady(SOCKET fd)
540 #else
542 #endif
543  {
544  if (!isValidSocket(fd))
545  // The socket is not open. Just silently return.
546  return 0;
547 
548 #if defined(_WIN32)
549  timeval tv{ 0, 0 };
550  FD_SET tcp_client_fd_set;
551  FD_ZERO(&tcp_client_fd_set);
552  FD_SET(fd, &tcp_client_fd_set);
553  int rc = ::select(fd + 1, &tcp_client_fd_set, NULL, NULL, &tv);
554 #else
555  struct pollfd pollInfo[1];
556  pollInfo[0].fd = fd;
557  pollInfo[0].events = POLLIN;
558  int rc = ::poll(pollInfo, 1, 0);
559 #endif
560  if (rc < 0) {
561  return -1;
562  }
563  if (rc == 0) {
564  // No messages on the socket.
565  return 0;
566  }
567 #if !defined(_WIN32)
568  if (!(pollInfo[0].revents & POLLIN)) {
569  // No POLLIN flag.
570  return 0;
571  }
572 #endif
573 
574  return 1;
575  }
576 
577  std::map<int, std::string> TCPConnection::type_to_name_map_ =
578  { {TCPMessage_Type_DATA, "DATA"},
579  {TCPMessage_Type_SETUP, "SETUP"},
580  {TCPMessage_Type_START, "START"},
581  {TCPMessage_Type_STOP, "STOP"} };
582 
583 } // namespace tcp_io_device
584 
585 #endif
tcp_io_device::TCPConnection::receiveIsReady
static int receiveIsReady(int fd)
tcp_io_device::TCPConnection::sendMessage
int sendMessage(std::unique_ptr< TCPMessage > msg)
tcp_io_device::TCPConnection::TCPConnection
TCPConnection(std::shared_ptr< SafeQueue > receive_queue, std::shared_ptr< SafeQueue > send_queue, uint64_t msg_length_buf_size)
tcp_io_device::TCPConnection::stop
void stop()
tcp_io_device::TCPConnection::listenAndAwaitConnection
int listenAndAwaitConnection(std::string port)
tcp_io_device::TCPConnection::establishConnection
int establishConnection(std::string host, std::string port)
tcp_io_device::TCPConnection::start
void start()
tcp_io_device::TCPConnection::tcpBackgroundHandler
void tcpBackgroundHandler()
tcp_io_device::TCPConnection::receiveMessage
std::unique_ptr< TCPMessage > receiveMessage()