Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <config.h>
21
#include <cstring>
22
#include <sstream>
23
#include <sys/socket.h>
24
#include <sys/poll.h>
25
#include <sys/types.h>
26
#include <arpa/inet.h>
27
#include <netinet/in.h>
28
#include <netinet/tcp.h>
29
#include <netdb.h>
30
#include <unistd.h>
31
#include <errno.h>
32
#include <fcntl.h>
33
 
34
#include "concurrency/Monitor.h"
35
#include "TSocket.h"
36
#include "TTransportException.h"
37
 
38
namespace apache { namespace thrift { namespace transport {
39
 
40
using namespace std;
41
 
42
// Global var to track total socket sys calls
43
uint32_t g_socket_syscalls = 0;
44
 
45
/**
46
 * TSocket implementation.
47
 *
48
 */
49
 
50
TSocket::TSocket(string host, int port) :
51
  host_(host),
52
  port_(port),
53
  socket_(-1),
54
  connTimeout_(0),
55
  sendTimeout_(0),
56
  recvTimeout_(0),
57
  lingerOn_(1),
58
  lingerVal_(0),
59
  noDelay_(1),
60
  maxRecvRetries_(5) {
61
  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
62
  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
63
}
64
 
65
TSocket::TSocket() :
66
  host_(""),
67
  port_(0),
68
  socket_(-1),
69
  connTimeout_(0),
70
  sendTimeout_(0),
71
  recvTimeout_(0),
72
  lingerOn_(1),
73
  lingerVal_(0),
74
  noDelay_(1),
75
  maxRecvRetries_(5) {
76
  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
77
  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
78
}
79
 
80
TSocket::TSocket(int socket) :
81
  host_(""),
82
  port_(0),
83
  socket_(socket),
84
  connTimeout_(0),
85
  sendTimeout_(0),
86
  recvTimeout_(0),
87
  lingerOn_(1),
88
  lingerVal_(0),
89
  noDelay_(1),
90
  maxRecvRetries_(5) {
91
  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
92
  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
93
}
94
 
95
TSocket::~TSocket() {
96
  close();
97
}
98
 
99
bool TSocket::isOpen() {
100
  return (socket_ >= 0);
101
}
102
 
103
bool TSocket::peek() {
104
  if (!isOpen()) {
105
    return false;
106
  }
107
  uint8_t buf;
108
  int r = recv(socket_, &buf, 1, MSG_PEEK);
109
  if (r == -1) {
110
    int errno_copy = errno;
111
    #if defined __FreeBSD__ || defined __MACH__
112
    /* shigin:
113
     * freebsd returns -1 and ECONNRESET if socket was closed by 
114
     * the other side
115
     */
116
    if (errno_copy == ECONNRESET)
117
    {
118
      close();
119
      return false;
120
    }
121
    #endif
122
    GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
123
    throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
124
  }
125
  return (r > 0);
126
}
127
 
128
void TSocket::openConnection(struct addrinfo *res) {
129
  if (isOpen()) {
130
    throw TTransportException(TTransportException::ALREADY_OPEN);
131
  }
132
 
133
  socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
134
  if (socket_ == -1) {
135
    int errno_copy = errno;
136
    GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
137
    throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
138
  }
139
 
140
  // Send timeout
141
  if (sendTimeout_ > 0) {
142
    setSendTimeout(sendTimeout_);
143
  }
144
 
145
  // Recv timeout
146
  if (recvTimeout_ > 0) {
147
    setRecvTimeout(recvTimeout_);
148
  }
149
 
150
  // Linger
151
  setLinger(lingerOn_, lingerVal_);
152
 
153
  // No delay
154
  setNoDelay(noDelay_);
155
 
156
  // Set the socket to be non blocking for connect if a timeout exists
157
  int flags = fcntl(socket_, F_GETFL, 0);
158
  if (connTimeout_ > 0) {
159
    if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {
160
      int errno_copy = errno;
161
      GlobalOutput.perror("TSocket::open() fcntl() " + getSocketInfo(), errno_copy);
162
      throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
163
    }
164
  } else {
165
    if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {
166
      int errno_copy = errno;
167
      GlobalOutput.perror("TSocket::open() fcntl " + getSocketInfo(), errno_copy);
168
      throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
169
    }
170
  }
171
 
172
  // Connect the socket
173
  int ret = connect(socket_, res->ai_addr, res->ai_addrlen);
174
 
175
  // success case
176
  if (ret == 0) {
177
    goto done;
178
  }
179
 
180
  if (errno != EINPROGRESS) {
181
    int errno_copy = errno;
182
    GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
183
    throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
184
  }
185
 
186
 
187
  struct pollfd fds[1];
188
  std::memset(fds, 0 , sizeof(fds));
189
  fds[0].fd = socket_;
190
  fds[0].events = POLLOUT;
191
  ret = poll(fds, 1, connTimeout_);
192
 
193
  if (ret > 0) {
194
    // Ensure the socket is connected and that there are no errors set
195
    int val;
196
    socklen_t lon;
197
    lon = sizeof(int);
198
    int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
199
    if (ret2 == -1) {
200
      int errno_copy = errno;
201
      GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
202
      throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
203
    }
204
    // no errors on socket, go to town
205
    if (val == 0) {
206
      goto done;
207
    }
208
    GlobalOutput.perror("TSocket::open() error on socket (after poll) " + getSocketInfo(), val);
209
    throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
210
  } else if (ret == 0) {
211
    // socket timed out
212
    string errStr = "TSocket::open() timed out " + getSocketInfo();
213
    GlobalOutput(errStr.c_str());
214
    throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
215
  } else {
216
    // error on poll()
217
    int errno_copy = errno;
218
    GlobalOutput.perror("TSocket::open() poll() " + getSocketInfo(), errno_copy);
219
    throw TTransportException(TTransportException::NOT_OPEN, "poll() failed", errno_copy);
220
  }
221
 
222
 done:
223
  // Set socket back to normal mode (blocking)
224
  fcntl(socket_, F_SETFL, flags);
225
}
226
 
227
void TSocket::open() {
228
  if (isOpen()) {
229
    throw TTransportException(TTransportException::ALREADY_OPEN);
230
  }
231
 
232
  // Validate port number
233
  if (port_ < 0 || port_ > 65536) {
234
    throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid");
235
  }
236
 
237
  struct addrinfo hints, *res, *res0;
238
  res = NULL;
239
  res0 = NULL;
240
  int error;
241
  char port[sizeof("65536")];
242
  std::memset(&hints, 0, sizeof(hints));
243
  hints.ai_family = PF_UNSPEC;
244
  hints.ai_socktype = SOCK_STREAM;
245
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
246
  sprintf(port, "%d", port_);
247
 
248
  error = getaddrinfo(host_.c_str(), port, &hints, &res0);
249
 
250
  if (error) {
251
    string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(gai_strerror(error));
252
    GlobalOutput(errStr.c_str());
253
    close();
254
    throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket.");
255
  }
256
 
257
  // Cycle through all the returned addresses until one
258
  // connects or push the exception up.
259
  for (res = res0; res; res = res->ai_next) {
260
    try {
261
      openConnection(res);
262
      break;
263
    } catch (TTransportException& ttx) {
264
      if (res->ai_next) {
265
        close();
266
      } else {
267
        close();
268
        freeaddrinfo(res0); // cleanup on failure
269
        throw;
270
      }
271
    }
272
  }
273
 
274
  // Free address structure memory
275
  freeaddrinfo(res0);
276
}
277
 
278
void TSocket::close() {
279
  if (socket_ >= 0) {
280
    shutdown(socket_, SHUT_RDWR);
281
    ::close(socket_);
282
  }
283
  socket_ = -1;
284
}
285
 
286
uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
287
  if (socket_ < 0) {
288
    throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
289
  }
290
 
291
  int32_t retries = 0;
292
 
293
  // EAGAIN can be signalled both when a timeout has occurred and when
294
  // the system is out of resources (an awesome undocumented feature).
295
  // The following is an approximation of the time interval under which
296
  // EAGAIN is taken to indicate an out of resources error.
297
  uint32_t eagainThresholdMicros = 0;
298
  if (recvTimeout_) {
299
    // if a readTimeout is specified along with a max number of recv retries, then
300
    // the threshold will ensure that the read timeout is not exceeded even in the
301
    // case of resource errors
302
    eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);
303
  }
304
 
305
 try_again:
306
  // Read from the socket
307
  struct timeval begin;
308
  gettimeofday(&begin, NULL);
309
  int got = recv(socket_, buf, len, 0);
310
  int errno_copy = errno; //gettimeofday can change errno
311
  struct timeval end;
312
  gettimeofday(&end, NULL);
313
  uint32_t readElapsedMicros =  (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
314
                                 + (((uint64_t)(end.tv_usec - begin.tv_usec))));
315
  ++g_socket_syscalls;
316
 
317
  // Check for error on read
318
  if (got < 0) {
319
    if (errno_copy == EAGAIN) {
320
      // check if this is the lack of resources or timeout case
321
      if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
322
        if (retries++ < maxRecvRetries_) {
323
          usleep(50);
324
          goto try_again;
325
        } else {
326
          throw TTransportException(TTransportException::TIMED_OUT,
327
                                    "EAGAIN (unavailable resources)");
328
        }
329
      } else {
330
        // infer that timeout has been hit
331
        throw TTransportException(TTransportException::TIMED_OUT,
332
                                  "EAGAIN (timed out)");
333
      }
334
    }
335
 
336
    // If interrupted, try again
337
    if (errno_copy == EINTR && retries++ < maxRecvRetries_) {
338
      goto try_again;
339
    }
340
 
341
    #if defined __FreeBSD__ || defined __MACH__
342
    if (errno_copy == ECONNRESET) {
343
      /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
344
       * ECONNRESET if peer performed shutdown 
345
       */
346
      close();
347
      return 0;
348
    }
349
    #endif
350
 
351
    // Now it's not a try again case, but a real probblez
352
    GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
353
 
354
    // If we disconnect with no linger time
355
    if (errno_copy == ECONNRESET) {
356
      throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET");
357
    }
358
 
359
    // This ish isn't open
360
    if (errno_copy == ENOTCONN) {
361
      throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN");
362
    }
363
 
364
    // Timed out!
365
    if (errno_copy == ETIMEDOUT) {
366
      throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT");
367
    }
368
 
369
    // Some other error, whatevz
370
    throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
371
  }
372
 
373
  // The remote host has closed the socket
374
  if (got == 0) {
375
    close();
376
    return 0;
377
  }
378
 
379
  // Pack data into string
380
  return got;
381
}
382
 
383
void TSocket::write(const uint8_t* buf, uint32_t len) {
384
  if (socket_ < 0) {
385
    throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
386
  }
387
 
388
  uint32_t sent = 0;
389
 
390
  while (sent < len) {
391
 
392
    int flags = 0;
393
    #ifdef MSG_NOSIGNAL
394
    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
395
    // check for the EPIPE return condition and close the socket in that case
396
    flags |= MSG_NOSIGNAL;
397
    #endif // ifdef MSG_NOSIGNAL
398
 
399
    int b = send(socket_, buf + sent, len - sent, flags);
400
    ++g_socket_syscalls;
401
 
402
    // Fail on a send error
403
    if (b < 0) {
404
      int errno_copy = errno;
405
      GlobalOutput.perror("TSocket::write() send() " + getSocketInfo(), errno_copy);
406
 
407
      if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) {
408
        close();
409
        throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
410
      }
411
 
412
      throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
413
    }
414
 
415
    // Fail on blocked send
416
    if (b == 0) {
417
      throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
418
    }
419
    sent += b;
420
  }
421
}
422
 
423
std::string TSocket::getHost() {
424
  return host_;
425
}
426
 
427
int TSocket::getPort() {
428
  return port_;
429
}
430
 
431
void TSocket::setHost(string host) {
432
  host_ = host;
433
}
434
 
435
void TSocket::setPort(int port) {
436
  port_ = port;
437
}
438
 
439
void TSocket::setLinger(bool on, int linger) {
440
  lingerOn_ = on;
441
  lingerVal_ = linger;
442
  if (socket_ < 0) {
443
    return;
444
  }
445
 
446
  struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
447
  int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
448
  if (ret == -1) {
449
    int errno_copy = errno;  // Copy errno because we're allocating memory.
450
    GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
451
  }
452
}
453
 
454
void TSocket::setNoDelay(bool noDelay) {
455
  noDelay_ = noDelay;
456
  if (socket_ < 0) {
457
    return;
458
  }
459
 
460
  // Set socket to NODELAY
461
  int v = noDelay_ ? 1 : 0;
462
  int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
463
  if (ret == -1) {
464
    int errno_copy = errno;  // Copy errno because we're allocating memory.
465
    GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
466
  }
467
}
468
 
469
void TSocket::setConnTimeout(int ms) {
470
  connTimeout_ = ms;
471
}
472
 
473
void TSocket::setRecvTimeout(int ms) {
474
  if (ms < 0) {
475
    char errBuf[512];
476
    sprintf(errBuf, "TSocket::setRecvTimeout with negative input: %d\n", ms);
477
    GlobalOutput(errBuf);
478
    return;
479
  }
480
  recvTimeout_ = ms;
481
 
482
  if (socket_ < 0) {
483
    return;
484
  }
485
 
486
  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
487
  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
488
 
489
  // Copy because poll may modify
490
  struct timeval r = recvTimeval_;
491
  int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
492
  if (ret == -1) {
493
    int errno_copy = errno;  // Copy errno because we're allocating memory.
494
    GlobalOutput.perror("TSocket::setRecvTimeout() setsockopt() " + getSocketInfo(), errno_copy);
495
  }
496
}
497
 
498
void TSocket::setSendTimeout(int ms) {
499
  if (ms < 0) {
500
    char errBuf[512];
501
    sprintf(errBuf, "TSocket::setSendTimeout with negative input: %d\n", ms);
502
    GlobalOutput(errBuf);
503
    return;
504
  }
505
  sendTimeout_ = ms;
506
 
507
  if (socket_ < 0) {
508
    return;
509
  }
510
 
511
  struct timeval s = {(int)(sendTimeout_/1000),
512
                      (int)((sendTimeout_%1000)*1000)};
513
  int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
514
  if (ret == -1) {
515
    int errno_copy = errno;  // Copy errno because we're allocating memory.
516
    GlobalOutput.perror("TSocket::setSendTimeout() setsockopt() " + getSocketInfo(), errno_copy);
517
  }
518
}
519
 
520
void TSocket::setMaxRecvRetries(int maxRecvRetries) {
521
  maxRecvRetries_ = maxRecvRetries;
522
}
523
 
524
string TSocket::getSocketInfo() {
525
  std::ostringstream oss;
526
  oss << "<Host: " << host_ << " Port: " << port_ << ">";
527
  return oss.str();
528
}
529
 
530
std::string TSocket::getPeerHost() {
531
  if (peerHost_.empty()) {
532
    struct sockaddr_storage addr;
533
    socklen_t addrLen = sizeof(addr);
534
 
535
    if (socket_ < 0) {
536
      return host_;
537
    }
538
 
539
    int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
540
 
541
    if (rv != 0) {
542
      return peerHost_;
543
    }
544
 
545
    char clienthost[NI_MAXHOST];
546
    char clientservice[NI_MAXSERV];
547
 
548
    getnameinfo((sockaddr*) &addr, addrLen,
549
                clienthost, sizeof(clienthost),
550
                clientservice, sizeof(clientservice), 0);
551
 
552
    peerHost_ = clienthost;
553
  }
554
  return peerHost_;
555
}
556
 
557
std::string TSocket::getPeerAddress() {
558
  if (peerAddress_.empty()) {
559
    struct sockaddr_storage addr;
560
    socklen_t addrLen = sizeof(addr);
561
 
562
    if (socket_ < 0) {
563
      return peerAddress_;
564
    }
565
 
566
    int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
567
 
568
    if (rv != 0) {
569
      return peerAddress_;
570
    }
571
 
572
    char clienthost[NI_MAXHOST];
573
    char clientservice[NI_MAXSERV];
574
 
575
    getnameinfo((sockaddr*) &addr, addrLen,
576
                clienthost, sizeof(clienthost),
577
                clientservice, sizeof(clientservice),
578
                NI_NUMERICHOST|NI_NUMERICSERV);
579
 
580
    peerAddress_ = clienthost;
581
    peerPort_ = std::atoi(clientservice);
582
  }
583
  return peerAddress_;
584
}
585
 
586
int TSocket::getPeerPort() {
587
  getPeerAddress();
588
  return peerPort_;
589
}
590
 
591
}}} // apache::thrift::transport