AERA
tcp_connection.h
1 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
2 //_/_/
3 //_/_/ AERA
4 //_/_/ Autocatalytic Endogenous Reflective Architecture
5 //_/_/
6 //_/_/ Copyright (c) 2018-2025 Jeff Thompson
7 //_/_/ Copyright (c) 2018-2025 Kristinn R. Thorisson
8 //_/_/ Copyright (c) 2018-2025 Icelandic Institute for Intelligent Machines
9 //_/_/ Copyright (c) 2021-2025 Leonard Eberding
10 //_/_/ http://www.iiim.is
11 //_/_/
12 //_/_/ --- Open-Source BSD License, with CADIA Clause v 1.0 ---
13 //_/_/
14 //_/_/ Redistribution and use in source and binary forms, with or without
15 //_/_/ modification, is permitted provided that the following conditions
16 //_/_/ are met:
17 //_/_/ - Redistributions of source code must retain the above copyright
18 //_/_/ and collaboration notice, this list of conditions and the
19 //_/_/ following disclaimer.
20 //_/_/ - Redistributions in binary form must reproduce the above copyright
21 //_/_/ notice, this list of conditions and the following disclaimer
22 //_/_/ in the documentation and/or other materials provided with
23 //_/_/ the distribution.
24 //_/_/
25 //_/_/ - Neither the name of its copyright holders nor the names of its
26 //_/_/ contributors may be used to endorse or promote products
27 //_/_/ derived from this software without specific prior
28 //_/_/ written permission.
29 //_/_/
30 //_/_/ - CADIA Clause: The license granted in and to the software
31 //_/_/ under this agreement is a limited-use license.
32 //_/_/ The software may not be used in furtherance of:
33 //_/_/ (i) intentionally causing bodily injury or severe emotional
34 //_/_/ distress to any person;
35 //_/_/ (ii) invading the personal privacy or violating the human
36 //_/_/ rights of any person; or
37 //_/_/ (iii) committing or preparing for any act of war.
38 //_/_/
39 //_/_/ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
40 //_/_/ CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
41 //_/_/ INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
42 //_/_/ MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
43 //_/_/ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
44 //_/_/ CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
45 //_/_/ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
46 //_/_/ BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
47 //_/_/ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
48 //_/_/ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
49 //_/_/ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
50 //_/_/ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
51 //_/_/ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
52 //_/_/ OF SUCH DAMAGE.
53 //_/_/
54 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
55 
56 #pragma once
57 
58 #pragma comment (lib, "Ws2_32.lib")
59 
60 #include <queue>
61 #include <mutex>
62 #include <condition_variable>
63 #if defined(_WIN32)
64 #include <winsock2.h>
65 #include <windows.h>
66 #include <ws2tcpip.h>
67 #endif
68 #include <string>
69 #include <stdlib.h>
70 #include <stdio.h>
71 #include <thread>
72 #include <bitset>
73 
74 #include "tcp_data_message.pb.h"
75 
76 namespace tcp_io_device {
77 
82  class SafeQueue
83  {
84  public:
89  : queue_()
90  , mutex_()
91  {
92  max_elements_ = 1;
93  }
94 
99  SafeQueue(int max_elements)
100  : queue_()
101  , mutex_()
102  {
103  max_elements_ = max_elements;
104  }
105 
106  ~SafeQueue()
107  {}
108 
113  void enqueue(std::unique_ptr<TCPMessage> t)
114  {
115  std::lock_guard<std::recursive_mutex> lock(mutex_);
116  while (queue_.size() >= max_elements_) {
117  if (queue_.front()->messagetype() == TCPMessage_Type_DATA) {
118  queue_.front()->release_datamessage();
119  }
120  queue_.pop();
121  }
122  queue_.push(std::move(t));
123  }
124 
125 
130  std::unique_ptr<TCPMessage> dequeue()
131  {
132  std::lock_guard<std::recursive_mutex> lock(mutex_);
133  if (queue_.empty()) {
134  return NULL;
135  }
136  std::unique_ptr<TCPMessage> val = std::move(queue_.front());
137  queue_.pop();
138  return val;
139  }
140 
144  void clear() {
145  std::lock_guard<std::recursive_mutex> lock(mutex_);
146  while (queue_.size() > 0) {
147  if (queue_.front()->messagetype() == TCPMessage_Type_DATA) {
148  queue_.front()->release_datamessage();
149  }
150  queue_.pop();
151  }
152  }
153 
154 
155 
156  private:
157  std::queue<std::unique_ptr<TCPMessage>> queue_;
158  mutable std::recursive_mutex mutex_;
159  int max_elements_;
160  };
161 
162 
168 
169  public:
170 
171  static std::map<int, std::string> type_to_name_map_;
172 
179  TCPConnection(std::shared_ptr<SafeQueue> receive_queue, std::shared_ptr<SafeQueue> send_queue, uint64_t msg_length_buf_size);
180  ~TCPConnection();
181 
187  int listenAndAwaitConnection(std::string port);
188 
195  int establishConnection(std::string host, std::string port);
196 
200  void start();
201 
205  void stop();
206 
211  bool isRunning() { return state_ == RUNNING; }
212 
218 #if defined(_WIN32)
219  static int receiveIsReady(SOCKET fd);
220 #else
221  static int receiveIsReady(int fd);
222 #endif
223 
224  protected:
225 
226  typedef enum {
227  NOT_STARTED = 0,
228  RUNNING = 1,
229  STOPPED = 2,
230  }State;
231 
232  typedef enum {
233  CLIENT = 0,
234  SERVER = 1,
235  }SocketType;
236 
237  State state_;
238  SocketType socket_type_;
239 
240  std::shared_ptr<std::thread> tcp_background_thread_;
241 
242 #if defined(_WIN32)
243  SOCKET tcp_socket_;
244  SOCKET server_listen_socket_;
245 #else
246  int tcp_socket_;
247  int server_listen_socket_;
248 #endif
249 
250  uint64_t msg_buf_size_;
251  uint64_t msg_length_buf_size_;
252 
253  std::shared_ptr<SafeQueue> incoming_queue_;
254  std::shared_ptr<SafeQueue> outgoing_queue_;
255 
256  std::string host_;
257  std::string port_;
258 
265 
270  std::unique_ptr<TCPMessage> receiveMessage();
271 
277  int sendMessage(std::unique_ptr<TCPMessage> msg);
278  };
279 
280 } // namespace tcp_io_device
tcp_io_device::SafeQueue
Definition: tcp_connection.h:83
tcp_io_device::TCPConnection::isRunning
bool isRunning()
Definition: tcp_connection.h:211
tcp_io_device::TCPConnection::receiveIsReady
static int receiveIsReady(int fd)
tcp_io_device::TCPConnection::sendMessage
int sendMessage(std::unique_ptr< TCPMessage > msg)
tcp_io_device::SafeQueue::dequeue
std::unique_ptr< TCPMessage > dequeue()
Definition: tcp_connection.h:130
tcp_io_device::SafeQueue::enqueue
void enqueue(std::unique_ptr< TCPMessage > t)
Definition: tcp_connection.h:113
tcp_io_device::TCPConnection::TCPConnection
TCPConnection(std::shared_ptr< SafeQueue > receive_queue, std::shared_ptr< SafeQueue > send_queue, uint64_t msg_length_buf_size)
tcp_io_device::SafeQueue::SafeQueue
SafeQueue()
Definition: tcp_connection.h:88
tcp_io_device::TCPConnection::stop
void stop()
tcp_io_device::SafeQueue::clear
void clear()
Definition: tcp_connection.h:144
tcp_io_device::SafeQueue::SafeQueue
SafeQueue(int max_elements)
Definition: tcp_connection.h:99
tcp_io_device::TCPConnection::listenAndAwaitConnection
int listenAndAwaitConnection(std::string port)
tcp_io_device::TCPConnection::establishConnection
int establishConnection(std::string host, std::string port)
tcp_io_device::TCPConnection::start
void start()
tcp_io_device::TCPConnection::tcpBackgroundHandler
void tcpBackgroundHandler()
tcp_io_device::TCPConnection::receiveMessage
std::unique_ptr< TCPMessage > receiveMessage()
tcp_io_device::TCPConnection
Definition: tcp_connection.h:167