Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <cstring>
21
#include <sys/types.h>
22
#include <sys/socket.h>
23
#include <sys/poll.h>
24
#include <sys/types.h>
25
#include <netinet/in.h>
26
#include <netinet/tcp.h>
27
#include <netdb.h>
28
#include <fcntl.h>
29
#include <errno.h>
30
#include <unistd.h>
31
 
32
#include "TSocket.h"
33
#include "TServerSocket.h"
34
#include <boost/shared_ptr.hpp>
35
 
36
#ifndef AF_LOCAL
37
#define AF_LOCAL AF_UNIX
38
#endif
39
 
40
namespace apache { namespace thrift { namespace transport {
41
 
42
using namespace std;
43
using boost::shared_ptr;
44
 
45
TServerSocket::TServerSocket(int port) :
46
  port_(port),
47
  serverSocket_(-1),
48
  acceptBacklog_(1024),
49
  sendTimeout_(0),
50
  recvTimeout_(0),
51
  retryLimit_(0),
52
  retryDelay_(0),
53
  tcpSendBuffer_(0),
54
  tcpRecvBuffer_(0),
55
  intSock1_(-1),
56
  intSock2_(-1) {}
57
 
58
TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
59
  port_(port),
60
  serverSocket_(-1),
61
  acceptBacklog_(1024),
62
  sendTimeout_(sendTimeout),
63
  recvTimeout_(recvTimeout),
64
  retryLimit_(0),
65
  retryDelay_(0),
66
  tcpSendBuffer_(0),
67
  tcpRecvBuffer_(0),
68
  intSock1_(-1),
69
  intSock2_(-1) {}
70
 
71
TServerSocket::~TServerSocket() {
72
  close();
73
}
74
 
75
void TServerSocket::setSendTimeout(int sendTimeout) {
76
  sendTimeout_ = sendTimeout;
77
}
78
 
79
void TServerSocket::setRecvTimeout(int recvTimeout) {
80
  recvTimeout_ = recvTimeout;
81
}
82
 
83
void TServerSocket::setRetryLimit(int retryLimit) {
84
  retryLimit_ = retryLimit;
85
}
86
 
87
void TServerSocket::setRetryDelay(int retryDelay) {
88
  retryDelay_ = retryDelay;
89
}
90
 
91
void TServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
92
  tcpSendBuffer_ = tcpSendBuffer;
93
}
94
 
95
void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
96
  tcpRecvBuffer_ = tcpRecvBuffer;
97
}
98
 
99
void TServerSocket::listen() {
100
  int sv[2];
101
  if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
102
    GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno);
103
    intSock1_ = -1;
104
    intSock2_ = -1;
105
  } else {
106
    intSock1_ = sv[1];
107
    intSock2_ = sv[0];
108
  }
109
 
110
  struct addrinfo hints, *res, *res0;
111
  int error;
112
  char port[sizeof("65536") + 1];
113
  std::memset(&hints, 0, sizeof(hints));
114
  hints.ai_family = PF_UNSPEC;
115
  hints.ai_socktype = SOCK_STREAM;
116
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
117
  sprintf(port, "%d", port_);
118
 
119
  // Wildcard address
120
  error = getaddrinfo(NULL, port, &hints, &res0);
121
  if (error) {
122
    GlobalOutput.printf("getaddrinfo %d: %s", error, gai_strerror(error));
123
    close();
124
    throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
125
  }
126
 
127
  // Pick the ipv6 address first since ipv4 addresses can be mapped
128
  // into ipv6 space.
129
  for (res = res0; res; res = res->ai_next) {
130
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
131
      break;
132
  }
133
 
134
  serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
135
  if (serverSocket_ == -1) {
136
    int errno_copy = errno;
137
    GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
138
    close();
139
    throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy);
140
  }
141
 
142
  // Set reusaddress to prevent 2MSL delay on accept
143
  int one = 1;
144
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
145
                       &one, sizeof(one))) {
146
    int errno_copy = errno;
147
    GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_REUSEADDR ", errno_copy);
148
    close();
149
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR", errno_copy);
150
  }
151
 
152
  // Set TCP buffer sizes
153
  if (tcpSendBuffer_ > 0) {
154
    if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF,
155
                         &tcpSendBuffer_, sizeof(tcpSendBuffer_))) {
156
      int errno_copy = errno;
157
      GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
158
      close();
159
      throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy);
160
    }
161
  }
162
 
163
  if (tcpRecvBuffer_ > 0) {
164
    if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF,
165
                         &tcpRecvBuffer_, sizeof(tcpRecvBuffer_))) {
166
      int errno_copy = errno;
167
      GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
168
      close();
169
      throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy);
170
    }
171
  }
172
 
173
  // Defer accept
174
  #ifdef TCP_DEFER_ACCEPT
175
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
176
                       &one, sizeof(one))) {
177
    int errno_copy = errno;
178
    GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
179
    close();
180
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy);
181
  }
182
  #endif // #ifdef TCP_DEFER_ACCEPT
183
 
184
  #ifdef IPV6_V6ONLY
185
  int zero = 0;
186
  if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
187
                        &zero, sizeof(zero))) {
188
    GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno);
189
  }
190
  #endif // #ifdef IPV6_V6ONLY
191
 
192
  // Turn linger off, don't want to block on calls to close
193
  struct linger ling = {0, 0};
194
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
195
                       &ling, sizeof(ling))) {
196
    int errno_copy = errno;
197
    GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
198
    close();
199
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
200
  }
201
 
202
  // TCP Nodelay, speed over bandwidth
203
  if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
204
                       &one, sizeof(one))) {
205
    int errno_copy = errno;
206
    GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
207
    close();
208
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
209
  }
210
 
211
  // Set NONBLOCK on the accept socket
212
  int flags = fcntl(serverSocket_, F_GETFL, 0);
213
  if (flags == -1) {
214
    int errno_copy = errno;
215
    GlobalOutput.perror("TServerSocket::listen() fcntl() F_GETFL ", errno_copy);
216
    throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
217
  }
218
 
219
  if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) {
220
    int errno_copy = errno;
221
    GlobalOutput.perror("TServerSocket::listen() fcntl() O_NONBLOCK ", errno_copy);
222
    throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
223
  }
224
 
225
  // prepare the port information
226
  // we may want to try to bind more than once, since SO_REUSEADDR doesn't
227
  // always seem to work. The client can configure the retry variables.
228
  int retries = 0;
229
  do {
230
    if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
231
      break;
232
    }
233
 
234
    // use short circuit evaluation here to only sleep if we need to
235
  } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
236
 
237
  // free addrinfo
238
  freeaddrinfo(res0);
239
 
240
  // throw an error if we failed to bind properly
241
  if (retries > retryLimit_) {
242
    char errbuf[1024];
243
    sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
244
    GlobalOutput(errbuf);
245
    close();
246
    throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
247
  }
248
 
249
  // Call listen
250
  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
251
    int errno_copy = errno;
252
    GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
253
    close();
254
    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
255
  }
256
 
257
  // The socket is now listening!
258
}
259
 
260
shared_ptr<TTransport> TServerSocket::acceptImpl() {
261
  if (serverSocket_ < 0) {
262
    throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
263
  }
264
 
265
  struct pollfd fds[2];
266
 
267
  int maxEintrs = 5;
268
  int numEintrs = 0;
269
 
270
  while (true) {
271
    std::memset(fds, 0 , sizeof(fds));
272
    fds[0].fd = serverSocket_;
273
    fds[0].events = POLLIN;
274
    if (intSock2_ >= 0) {
275
      fds[1].fd = intSock2_;
276
      fds[1].events = POLLIN;
277
    }
278
    int ret = poll(fds, 2, -1);
279
 
280
    if (ret < 0) {
281
      // error cases
282
      if (errno == EINTR && (numEintrs++ < maxEintrs)) {
283
        // EINTR needs to be handled manually and we can tolerate
284
        // a certain number
285
        continue;
286
      }
287
      int errno_copy = errno;
288
      GlobalOutput.perror("TServerSocket::acceptImpl() poll() ", errno_copy);
289
      throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
290
    } else if (ret > 0) {
291
      // Check for an interrupt signal
292
      if (intSock2_ >= 0 && (fds[1].revents & POLLIN)) {
293
        int8_t buf;
294
        if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) {
295
          GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno);
296
        }
297
        throw TTransportException(TTransportException::INTERRUPTED);
298
      }
299
 
300
      // Check for the actual server socket being ready
301
      if (fds[0].revents & POLLIN) {
302
        break;
303
      }
304
    } else {
305
      GlobalOutput("TServerSocket::acceptImpl() poll 0");
306
      throw TTransportException(TTransportException::UNKNOWN);
307
    }
308
  }
309
 
310
  struct sockaddr_storage clientAddress;
311
  int size = sizeof(clientAddress);
312
  int clientSocket = ::accept(serverSocket_,
313
                              (struct sockaddr *) &clientAddress,
314
                              (socklen_t *) &size);
315
 
316
  if (clientSocket < 0) {
317
    int errno_copy = errno;
318
    GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
319
    throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
320
  }
321
 
322
  // Make sure client socket is blocking
323
  int flags = fcntl(clientSocket, F_GETFL, 0);
324
  if (flags == -1) {
325
    int errno_copy = errno;
326
    GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_GETFL ", errno_copy);
327
    throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_GETFL)", errno_copy);
328
  }
329
 
330
  if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
331
    int errno_copy = errno;
332
    GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_SETFL ~O_NONBLOCK ", errno_copy);
333
    throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_SETFL)", errno_copy);
334
  }
335
 
336
  shared_ptr<TSocket> client(new TSocket(clientSocket));
337
  if (sendTimeout_ > 0) {
338
    client->setSendTimeout(sendTimeout_);
339
  }
340
  if (recvTimeout_ > 0) {
341
    client->setRecvTimeout(recvTimeout_);
342
  }
343
 
344
  return client;
345
}
346
 
347
void TServerSocket::interrupt() {
348
  if (intSock1_ >= 0) {
349
    int8_t byte = 0;
350
    if (-1 == send(intSock1_, &byte, sizeof(int8_t), 0)) {
351
      GlobalOutput.perror("TServerSocket::interrupt() send() ", errno);
352
    }
353
  }
354
}
355
 
356
void TServerSocket::close() {
357
  if (serverSocket_ >= 0) {
358
    shutdown(serverSocket_, SHUT_RDWR);
359
    ::close(serverSocket_);
360
  }
361
  if (intSock1_ >= 0) {
362
    ::close(intSock1_);
363
  }
364
  if (intSock2_ >= 0) {
365
    ::close(intSock2_);
366
  }
367
  serverSocket_ = -1;
368
  intSock1_ = -1;
369
  intSock2_ = -1;
370
}
371
 
372
}}} // apache::thrift::transport