56 #ifdef ENABLE_PROTOBUF
62 #include <sys/types.h>
63 #include <netinet/in.h>
64 #include <sys/socket.h>
66 #include <arpa/inet.h>
70 #include "tcp_connection.h"
// Validity check for a SOCKET handle: the handle counts as usable unless it
// equals the INVALID_SOCKET sentinel.
// NOTE(review): the return type and any platform #if guard selecting this
// variant are not visible in this excerpt -- presumably the Winsock flavour.
80 isValidSocket(
const SOCKET sock) {
return sock != INVALID_SOCKET; }
// Validity check for a plain int socket descriptor: any non-negative value
// counts as usable, negative values are treated as invalid.
// NOTE(review): the return type and any platform #if guard selecting this
// variant are not visible in this excerpt -- presumably the POSIX flavour.
82 isValidSocket(
const int sock) {
return sock >= 0; }
// Resets a SOCKET handle (passed by reference) to the INVALID_SOCKET sentinel
// so that isValidSocket() reports it as unusable afterwards.
// NOTE(review): the return type is not visible in this excerpt.
92 setSocketInvalid(SOCKET &sock) { sock = INVALID_SOCKET; }
// Resets an int socket descriptor (passed by reference) to -1, which the
// int overload of isValidSocket() treats as invalid.
// NOTE(review): the return type is not visible in this excerpt.
94 setSocketInvalid(
int &sock) { sock = -1; }
// File-local helper that reports the most recent socket error code.
// The visible branch forwards to WSAGetLastError().
// NOTE(review): braces and any alternative (non-Winsock) branch are not
// visible in this excerpt.
101 static int getLastError()
104 return WSAGetLastError();
110 namespace tcp_io_device {
112 TCPConnection::TCPConnection(std::shared_ptr<SafeQueue> receive_queue, std::shared_ptr<SafeQueue> send_queue, uint64_t msg_length_buf_size)
114 outgoing_queue_ = send_queue;
115 incoming_queue_ = receive_queue;
116 msg_length_buf_size_ = msg_length_buf_size;
117 state_ = NOT_STARTED;
118 setSocketInvalid(tcp_socket_);
119 setSocketInvalid(server_listen_socket_);
// Destructor: logs the shutdown, waits for the background thread to finish
// and, if the data socket is still valid, shuts it down and closes it.
// NOTE(review): several lines (braces, platform #if guards, the check of
// `err`) are not visible in this excerpt.
122 TCPConnection::~TCPConnection()
124 std::cout <<
"> INFO: Shutting down TCP connection" << std::endl;
// Blocks until the background thread has returned.
128 tcp_background_thread_->join();
129 if (isValidSocket(tcp_socket_)) {
// Two alternative shutdown calls follow; presumably exactly one is compiled,
// selected by a platform guard that is not visible here.
// NOTE(review): SD_BOTH shuts down both directions whereas SHUT_WR only shuts
// down the sending side -- confirm whether SHUT_RDWR was intended for parity.
131 int err = shutdown(tcp_socket_, SD_BOTH);
133 int err = shutdown(tcp_socket_, SHUT_WR);
// Reported when the shutdown call failed (the condition itself is not visible).
136 std::cout <<
"ERROR: Shutdown of Client Socket failed with error: " << getLastError() << std::endl;
139 closesocket(tcp_socket_);
// Server-side setup (identified as TCPConnection::listenAndAwaitConnection by
// the message at the end of this block): initialises Winsock, resolves a local
// address for `port`, creates/binds/listens on server_listen_socket_, accepts
// one client into tcp_socket_ and records the role as SERVER.
// NOTE(review): the signature, braces, early returns on the error paths and
// the platform #if guards are not visible in this excerpt.

// Request Winsock version 2.2.
155 err = WSAStartup(MAKEWORD(2, 2), &wsa_data);
157 std::cout <<
"ERROR: WSAStartup failed with error: " << err << std::endl;
160 struct addrinfo* result = NULL;
161 struct addrinfo hints;
// Lookup hints: IPv4, stream socket, TCP, passive (suitable for bind()).
163 ZeroMemory(&hints,
sizeof(hints));
164 hints.ai_family = AF_INET;
165 hints.ai_socktype = SOCK_STREAM;
166 hints.ai_protocol = IPPROTO_TCP;
167 hints.ai_flags = AI_PASSIVE;
169 std::cout <<
"> INFO: Resolving server address and port" << std::endl;
// No host name is passed, only the port; the result list is owned by this
// function and must be released with freeaddrinfo().
171 err = getaddrinfo(NULL, port.c_str(), &hints, &result);
173 std::cout <<
"ERROR: getaddrinfo failed with error: " << err << std::endl;
178 std::cout <<
"> INFO: Creating socket for connection to client" << std::endl;
// Create the listening socket from the first resolved address entry.
181 server_listen_socket_ = ::socket(result->ai_family, result->ai_socktype, result->ai_protocol);
182 if (!isValidSocket(server_listen_socket_)) {
// NOTE(review): "Socker" in the message below looks like a typo for "Socket".
183 std::cout <<
"ERROR: Socker failed with error: " << getLastError() << std::endl;
184 freeaddrinfo(result);
189 std::cout <<
"> INFO: Setting up TCP listening socket" << std::endl;
191 err = ::bind(server_listen_socket_, result->ai_addr, (
int)result->ai_addrlen);
192 if (err == SOCKET_ERROR) {
// Bind failed: release the address list and the half-configured socket.
193 std::cout <<
"ERROR: Bind failed with error: " << getLastError() << std::endl;
194 freeaddrinfo(result);
195 closesocket(server_listen_socket_);
// The address list is no longer needed once bind() has succeeded.
200 freeaddrinfo(result);
203 err = listen(server_listen_socket_, SOMAXCONN);
204 if (err == SOCKET_ERROR) {
205 std::cout <<
"ERROR: Listen failed with error: " << getLastError() << std::endl;
206 closesocket(server_listen_socket_);
212 std::cout <<
"> INFO: Waiting to accept client socket on port " << port << std::endl;
// Accept one client; the peer address is not requested (both out-params NULL).
214 tcp_socket_ = ::accept(server_listen_socket_, NULL, NULL);
215 if (!isValidSocket(tcp_socket_)) {
216 std::cout <<
"ERROR: Accepting client failed with error: " << getLastError() << std::endl;
217 closesocket(server_listen_socket_);
222 std::cout <<
"> INFO: TCP connection successfully established" << std::endl;
// Remember the role; the reconnect logic elsewhere switches on socket_type_.
224 socket_type_ = SERVER;
// Presumably the non-Windows branch: only prints a TODO notice.
226 std::cout <<
"TODO: Implement TCPConnection::listenAndAwaitConnection for non-Windows" << std::endl;
// Client-side setup: initialises Winsock, resolves `host`:`port`, creates
// tcp_socket_, connects to the server (sleeping one second after a failed
// attempt) and records the role as CLIENT.
// NOTE(review): the function signature, braces, the retry-loop header, early
// returns and platform #if guards are not visible in this excerpt.
238 struct addrinfo* result = NULL, hints;
244 std::cout <<
"> INFO: Initializing Winsock" << std::endl;
// Request Winsock version 2.2.
246 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
248 std::cout <<
"ERROR: WSAStartup failed with error: " << err << std::endl;
// Lookup hints: IPv4, stream socket, TCP.
253 memset(&hints, 0,
sizeof(hints));
254 hints.ai_family = AF_INET;
255 hints.ai_socktype = SOCK_STREAM;
256 hints.ai_protocol = IPPROTO_TCP;
// NOTE(review): AI_PASSIVE is normally meant for sockets that will bind();
// confirm it is intended for this outgoing (connect) lookup.
257 hints.ai_flags = AI_PASSIVE;
259 std::cout <<
"> INFO: Resolving server address and port" << std::endl;
// The result list is owned by this function and released with freeaddrinfo().
261 err = getaddrinfo(host.c_str(), port.c_str(), &hints, &result);
263 std::cout <<
"ERROR: getaddrinfo failed with error: " << err << std::endl;
270 std::cout <<
"> INFO: Creating socket for connection to server" << std::endl;
// Create the data socket from the first resolved address entry.
272 tcp_socket_ = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
273 if (!isValidSocket(tcp_socket_)) {
// NOTE(review): "Socker" in the message below looks like a typo for "Socket".
274 std::cout <<
"ERROR: Socker failed with error: " << getLastError() << std::endl;
275 freeaddrinfo(result);
282 std::cout <<
"> INFO: Connecting to TCP server" << std::endl;
285 std::cout <<
"Trying to connect to " << host <<
":" << port << std::endl;
286 err = connect(tcp_socket_, result->ai_addr, (
int)result->ai_addrlen);
// Failure path of a connect attempt: report, then wait one second.
// The loop construct that repeats the attempt is not visible here.
290 std::cout <<
"Failed to connect to server with error: " << getLastError() << std::endl;
291 std::cout <<
"Trying to reconnect in 1 sec..." << std::endl;
292 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// Address list is released once connecting is finished.
295 freeaddrinfo(result);
297 if (!isValidSocket(tcp_socket_)) {
298 printf(
"Unable to connect to server!\n");
307 std::cout <<
"> INFO: TCP connection successfully established" << std::endl;
// Remember the role; the reconnect logic elsewhere switches on socket_type_.
309 socket_type_ = CLIENT;
// Background worker: while state_ is RUNNING it (1) re-establishes a lost
// connection, (2) drains outgoing_queue_ to the peer, and (3) forwards
// received messages to incoming_queue_. After the loop it clears both queues
// and shuts down / closes the data socket.
// NOTE(review): the function signature, braces, the switch cases, the calls
// that produce `error_code`, `ready` and `in_msg`, and the platform #if guards
// are not visible in this excerpt.

// 100 ms pause; the construct enclosing it (e.g. a wait-for-start loop) is
// not visible here.
329 std::this_thread::sleep_for(std::chrono::milliseconds(100));
332 while (state_ == RUNNING) {
// --- Reconnect handling -----------------------------------------------------
333 if (!isValidSocket(tcp_socket_)) {
334 std::cout <<
"WARNING: Lost TCP connection. Trying to reconnect." << std::endl;
// How to reconnect depends on the role recorded when the link was set up.
336 switch (socket_type_)
339 if (!isValidSocket(server_listen_socket_)) {
343 std::cout <<
"INFO: Accepting new client on socket, waiting for connection." << std::endl;
// Accept a replacement client on the existing listening socket.
344 tcp_socket_ = ::accept(server_listen_socket_, NULL, NULL);
// Reconnect attempt failed: report, wait one second.
352 if (error_code != 0 || !isValidSocket(tcp_socket_))
354 std::cout <<
"Unable to reconnect... Error: " << getLastError() <<
" Retrying in 1 sec..." << std::endl;
355 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// NOTE(review): "successfull" in the message below is a typo ("successful").
358 std::cout <<
"INFO: Reconnect successfull." << std::endl;
// Tell the consumer of incoming_queue_ about the reconnect by enqueuing a
// locally created message of type RECONNECT.
359 std::unique_ptr<TCPMessage> reconnect_msg = std::make_unique<TCPMessage>();
360 reconnect_msg->set_messagetype(TCPMessage::RECONNECT);
361 incoming_queue_->enqueue(std::move(reconnect_msg));
// --- Outgoing messages --------------------------------------------------------
364 std::unique_ptr<TCPMessage> msg = outgoing_queue_->dequeue();
366 std::cout <<
"Sending message of type " << msg->messagetype() << std::endl;
// A non-positive result of the send step is treated as a failure.
368 if (error_code <= 0) {
// Fetch the next queued message.
// NOTE(review): std::move on the returned temporary is redundant.
372 msg = std::move(outgoing_queue_->dequeue());
// --- Incoming messages --------------------------------------------------------
// Short 1 ms pause per loop iteration.
376 std::this_thread::sleep_for(std::chrono::milliseconds(1));
377 bool got_error =
false;
// Negative readiness result: report and drop the socket so the next iteration
// enters the reconnect path above.
385 else if (ready < 0) {
387 std::cout <<
"select() == SOCKET_ERROR error: " << getLastError() << std::endl;
390 setSocketInvalid(tcp_socket_);
398 setSocketInvalid(tcp_socket_);
// Hand a received message over to the consumer.
403 incoming_queue_->enqueue(std::move(in_msg));
// --- Teardown after the loop --------------------------------------------------
409 incoming_queue_->clear();
410 outgoing_queue_->clear();
413 if (isValidSocket(tcp_socket_)) {
// Two alternative shutdown calls; presumably one per platform. Both stop only
// the sending direction.
415 int err = shutdown(tcp_socket_, SD_SEND);
417 int err = shutdown(tcp_socket_, SHUT_WR);
420 std::cout <<
"ERROR: Shutdown of Client Socket failed with error: " << getLastError() << std::endl;
423 closesocket(tcp_socket_);
429 setSocketInvalid(tcp_socket_);
// Receives one length-prefixed message from tcp_socket_: first reads
// msg_length_buf_size_ bytes of length prefix, decodes them into msg_len,
// then reads msg_len payload bytes and parses them into a TCPMessage.
// NOTE(review): the function signature, braces, return statements, the reset
// of len_res between the two phases, the shift step of the length decoding
// and any release of `buf` are not visible in this excerpt.
435 int received_bytes = 0;
// Number of bytes received so far in the current phase.
438 uint64_t len_res = 0;
441 std::string tcp_msg_len_recv_buf;
// NOTE(review): reserve() only grows capacity; size() stays 0, so the
// operator[] accesses below index past the string's size. resize() looks like
// what was intended -- confirm.
442 tcp_msg_len_recv_buf.reserve(msg_length_buf_size_);
// Phase 1: keep calling recv() until the whole length prefix has arrived.
444 while (len_res < msg_length_buf_size_) {
445 received_bytes = recv(tcp_socket_, &(tcp_msg_len_recv_buf[len_res]), msg_length_buf_size_ - len_res, 0);
446 if (received_bytes > 0) {
448 len_res += received_bytes;
// recv() returning 0 is reported as the connection closing.
450 else if (received_bytes == 0) {
453 std::cout <<
"Connection closing..." << std::endl;
// Remaining case (negative result): report the error and close the socket.
458 std::cout <<
"recv failed during recv of data length with error: " << getLastError() << std::endl;
460 closesocket(tcp_socket_);
// Decode the prefix into an integer, walking the bytes from the last index
// down to index 0 and OR-ing each one in.
470 uint64_t msg_len = 0;
471 for (
int i = msg_length_buf_size_ - 1; i >= 0; --i)
474 msg_len |= (
unsigned char)tcp_msg_len_recv_buf[i];
// Zero-initialised payload buffer sized by the received length.
// NOTE(review): msg_len comes straight from the peer and is not validated
// here; the raw new[] also needs a matching delete[] on every exit path.
481 char* buf =
new char[msg_len]();
// Phase 2: keep calling recv() until the whole payload has arrived.
482 while (len_res < msg_len) {
483 received_bytes = recv(tcp_socket_, &buf[len_res], msg_len - len_res, 0);
484 if (received_bytes > 0) {
485 len_res += received_bytes;
487 else if (received_bytes == 0) {
491 std::cout <<
"recv failed during recv of data message with error: " << getLastError() << std::endl;
// Parse the raw payload bytes into a protobuf message.
497 std::unique_ptr<TCPMessage> msg = std::make_unique<TCPMessage>();
498 if (!msg->ParseFromArray(buf, msg_len)) {
499 std::cout <<
"ERROR: Parsing Message from String failed" << std::endl;
// Serialises `msg`, builds an output buffer that starts with the payload size
// as 8 bytes (least significant byte first) and writes the buffer to
// tcp_socket_. Returns the result of ::send().
// NOTE(review): the function signature, braces, the declaration of `out` and
// the step that appends the payload to out_buffer are not visible here.
512 out = msg->SerializeAsString();
515 std::string out_buffer =
"";
// Emit the size one byte at a time, lowest byte first.
// NOTE(review): the prefix width is hard-coded to 8 here while the receive
// path uses msg_length_buf_size_ -- confirm both sides always agree.
516 for (
int i = 0; i < 8; ++i) {
517 out_buffer += (
unsigned char)((
int)(((uint64_t)out.size() >> (i * 8)) & 0xFF));
// NOTE(review): a single send() may transmit fewer bytes than requested; no
// retry loop for partial sends is visible here.
524 int i_send_result = ::send(tcp_socket_, &out_buffer[0], out_buffer.size(), 0);
525 if (i_send_result < 0) {
// Negative result: report the error and close the socket.
526 std::cout <<
"SendMessage failed with error: " << getLastError() << std::endl;
528 closesocket(tcp_socket_);
535 return i_send_result;
// Checks whether socket `fd` has data ready to read, using select() in one
// variant and poll() in the other.
// NOTE(review): the function signature, braces, return statements, the setup
// of `tv` and of pollInfo[0].fd, and the platform #if guards separating the
// two variants are not visible in this excerpt.
544 if (!isValidSocket(fd))
// Variant 1: select() on a read set containing only `fd`.
550 FD_SET tcp_client_fd_set;
551 FD_ZERO(&tcp_client_fd_set);
552 FD_SET(fd, &tcp_client_fd_set);
553 int rc = ::select(fd + 1, &tcp_client_fd_set, NULL, NULL, &tv);
// Variant 2: poll() on a single descriptor, interested in POLLIN only, with a
// timeout of 0 so the call does not block.
555 struct pollfd pollInfo[1];
557 pollInfo[0].events = POLLIN;
558 int rc = ::poll(pollInfo, 1, 0);
// POLLIN not set in the returned events: nothing to read.
568 if (!(pollInfo[0].revents & POLLIN)) {
577 std::map<int, std::string> TCPConnection::type_to_name_map_ =
578 { {TCPMessage_Type_DATA,
"DATA"},
579 {TCPMessage_Type_SETUP,
"SETUP"},
580 {TCPMessage_Type_START,
"START"},
581 {TCPMessage_Type_STOP,
"STOP"} };