Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include "TNonblockingServer.h"
21
#include <concurrency/Exception.h>
22
 
23
#include <iostream>
24
#include <sys/socket.h>
25
#include <netinet/in.h>
26
#include <netinet/tcp.h>
27
#include <netdb.h>
28
#include <fcntl.h>
29
#include <errno.h>
30
#include <assert.h>
31
 
32
#ifndef AF_LOCAL
33
#define AF_LOCAL AF_UNIX
34
#endif
35
 
36
namespace apache { namespace thrift { namespace server {
37
 
38
using namespace apache::thrift::protocol;
39
using namespace apache::thrift::transport;
40
using namespace apache::thrift::concurrency;
41
using namespace std;
42
 
43
class TConnection::Task: public Runnable {
44
 public:
45
  Task(boost::shared_ptr<TProcessor> processor,
46
       boost::shared_ptr<TProtocol> input,
47
       boost::shared_ptr<TProtocol> output,
48
       int taskHandle) :
49
    processor_(processor),
50
    input_(input),
51
    output_(output),
52
    taskHandle_(taskHandle) {}
53
 
54
  void run() {
55
    try {
56
      while (processor_->process(input_, output_)) {
57
        if (!input_->getTransport()->peek()) {
58
          break;
59
        }
60
      }
61
    } catch (TTransportException& ttx) {
62
      cerr << "TNonblockingServer client died: " << ttx.what() << endl;
63
    } catch (TException& x) {
64
      cerr << "TNonblockingServer exception: " << x.what() << endl;
65
    } catch (...) {
66
      cerr << "TNonblockingServer uncaught exception." << endl;
67
    }
68
 
69
    // Signal completion back to the libevent thread via a socketpair
70
    int8_t b = 0;
71
    if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
72
      GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
73
    }
74
    if (-1 == ::close(taskHandle_)) {
75
      GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
76
    }
77
  }
78
 
79
 private:
80
  boost::shared_ptr<TProcessor> processor_;
81
  boost::shared_ptr<TProtocol> input_;
82
  boost::shared_ptr<TProtocol> output_;
83
  int taskHandle_;
84
};
85
 
86
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
87
  socket_ = socket;
88
  server_ = s;
89
  appState_ = APP_INIT;
90
  eventFlags_ = 0;
91
 
92
  readBufferPos_ = 0;
93
  readWant_ = 0;
94
 
95
  writeBuffer_ = NULL;
96
  writeBufferSize_ = 0;
97
  writeBufferPos_ = 0;
98
 
99
  socketState_ = SOCKET_RECV;
100
  appState_ = APP_INIT;
101
 
102
  taskHandle_ = -1;
103
 
104
  // Set flags, which also registers the event
105
  setFlags(eventFlags);
106
 
107
  // get input/transports
108
  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
109
  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
110
 
111
  // Create protocol
112
  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
113
  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
114
}
115
 
116
void TConnection::workSocket() {
117
  int flags=0, got=0, left=0, sent=0;
118
  uint32_t fetch = 0;
119
 
120
  switch (socketState_) {
121
  case SOCKET_RECV:
122
    // It is an error to be in this state if we already have all the data
123
    assert(readBufferPos_ < readWant_);
124
 
125
    // Double the buffer size until it is big enough
126
    if (readWant_ > readBufferSize_) {
127
      while (readWant_ > readBufferSize_) {
128
        readBufferSize_ *= 2;
129
      }
130
      readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
131
      if (readBuffer_ == NULL) {
132
        GlobalOutput("TConnection::workSocket() realloc");
133
        close();
134
        return;
135
      }
136
    }
137
 
138
    // Read from the socket
139
    fetch = readWant_ - readBufferPos_;
140
    got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
141
 
142
    if (got > 0) {
143
      // Move along in the buffer
144
      readBufferPos_ += got;
145
 
146
      // Check that we did not overdo it
147
      assert(readBufferPos_ <= readWant_);
148
 
149
      // We are done reading, move onto the next state
150
      if (readBufferPos_ == readWant_) {
151
        transition();
152
      }
153
      return;
154
    } else if (got == -1) {
155
      // Blocking errors are okay, just move on
156
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
157
        return;
158
      }
159
 
160
      if (errno != ECONNRESET) {
161
        GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
162
      }
163
    }
164
 
165
    // Whenever we get down here it means a remote disconnect
166
    close();
167
 
168
    return;
169
 
170
  case SOCKET_SEND:
171
    // Should never have position past size
172
    assert(writeBufferPos_ <= writeBufferSize_);
173
 
174
    // If there is no data to send, then let us move on
175
    if (writeBufferPos_ == writeBufferSize_) {
176
      GlobalOutput("WARNING: Send state with no data to send\n");
177
      transition();
178
      return;
179
    }
180
 
181
    flags = 0;
182
    #ifdef MSG_NOSIGNAL
183
    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
184
    // check for the EPIPE return condition and close the socket in that case
185
    flags |= MSG_NOSIGNAL;
186
    #endif // ifdef MSG_NOSIGNAL
187
 
188
    left = writeBufferSize_ - writeBufferPos_;
189
    sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
190
 
191
    if (sent <= 0) {
192
      // Blocking errors are okay, just move on
193
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
194
        return;
195
      }
196
      if (errno != EPIPE) {
197
        GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
198
      }
199
      close();
200
      return;
201
    }
202
 
203
    writeBufferPos_ += sent;
204
 
205
    // Did we overdo it?
206
    assert(writeBufferPos_ <= writeBufferSize_);
207
 
208
    // We are done!
209
    if (writeBufferPos_ == writeBufferSize_) {
210
      transition();
211
    }
212
 
213
    return;
214
 
215
  default:
216
    GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
217
    assert(0);
218
  }
219
}
220
 
221
/**
222
 * This is called when the application transitions from one state into
223
 * another. This means that it has finished writing the data that it needed
224
 * to, or finished receiving the data that it needed to.
225
 */
226
void TConnection::transition() {
227
 
228
  int sz = 0;
229
 
230
  // Switch upon the state that we are currently in and move to a new state
231
  switch (appState_) {
232
 
233
  case APP_READ_REQUEST:
234
    // We are done reading the request, package the read buffer into transport
235
    // and get back some data from the dispatch function
236
    // If we've used these transport buffers enough times, reset them to avoid bloating
237
 
238
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
239
    ++numReadsSinceReset_;
240
    if (numWritesSinceReset_ < 512) {
241
      outputTransport_->resetBuffer();
242
    } else {
243
      // reset the capacity of the output transport if we used it enough times that it might be bloated
244
      try {
245
        outputTransport_->resetBuffer(true);
246
        numWritesSinceReset_ = 0;
247
      } catch (TTransportException &ttx) {
248
        GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
249
        close();
250
        return;
251
      }
252
    }
253
 
254
    // Prepend four bytes of blank space to the buffer so we can
255
    // write the frame size there later.
256
    outputTransport_->getWritePtr(4);
257
    outputTransport_->wroteBytes(4);
258
 
259
    if (server_->isThreadPoolProcessing()) {
260
      // We are setting up a Task to do this work and we will wait on it
261
      int sv[2];
262
      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
263
        GlobalOutput.perror("TConnection::socketpair() failed ", errno);
264
        // Now we will fall through to the APP_WAIT_TASK block with no response
265
      } else {
266
        // Create task and dispatch to the thread manager
267
        boost::shared_ptr<Runnable> task =
268
          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
269
                                               inputProtocol_,
270
                                               outputProtocol_,
271
                                               sv[1]));
272
        // The application is now waiting on the task to finish
273
        appState_ = APP_WAIT_TASK;
274
 
275
        // Create an event to be notified when the task finishes
276
        event_set(&taskEvent_,
277
                  taskHandle_ = sv[0],
278
                  EV_READ,
279
                  TConnection::taskHandler,
280
                  this);
281
 
282
        // Attach to the base
283
        event_base_set(server_->getEventBase(), &taskEvent_);
284
 
285
        // Add the event and start up the server
286
        if (-1 == event_add(&taskEvent_, 0)) {
287
          GlobalOutput("TNonblockingServer::serve(): coult not event_add");
288
          return;
289
        }
290
        try {
291
          server_->addTask(task);
292
        } catch (IllegalStateException & ise) {
293
          // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
294
          GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
295
          close();
296
        }
297
 
298
        // Set this connection idle so that libevent doesn't process more
299
        // data on it while we're still waiting for the threadmanager to
300
        // finish this task
301
        setIdle();
302
        return;
303
      }
304
    } else {
305
      try {
306
        // Invoke the processor
307
        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
308
      } catch (TTransportException &ttx) {
309
        GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
310
        close();
311
        return;
312
      } catch (TException &x) {
313
        GlobalOutput.printf("TException: Server::process() %s", x.what());
314
        close();
315
        return;
316
      } catch (...) {
317
        GlobalOutput.printf("Server::process() unknown exception");
318
        close();
319
        return;
320
      }
321
    }
322
 
323
    // Intentionally fall through here, the call to process has written into
324
    // the writeBuffer_
325
 
326
  case APP_WAIT_TASK:
327
    // We have now finished processing a task and the result has been written
328
    // into the outputTransport_, so we grab its contents and place them into
329
    // the writeBuffer_ for actual writing by the libevent thread
330
 
331
    // Get the result of the operation
332
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
333
 
334
    // If the function call generated return data, then move into the send
335
    // state and get going
336
    // 4 bytes were reserved for frame size
337
    if (writeBufferSize_ > 4) {
338
 
339
      // Move into write state
340
      writeBufferPos_ = 0;
341
      socketState_ = SOCKET_SEND;
342
 
343
      // Put the frame size into the write buffer
344
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
345
      memcpy(writeBuffer_, &frameSize, 4);
346
 
347
      // Socket into write mode
348
      appState_ = APP_SEND_RESULT;
349
      setWrite();
350
 
351
      // Try to work the socket immediately
352
      // workSocket();
353
 
354
      return;
355
    }
356
 
357
    // In this case, the request was oneway and we should fall through
358
    // right back into the read frame header state
359
    goto LABEL_APP_INIT;
360
 
361
  case APP_SEND_RESULT:
362
 
363
    ++numWritesSinceReset_;
364
 
365
    // N.B.: We also intentionally fall through here into the INIT state!
366
 
367
  LABEL_APP_INIT:
368
  case APP_INIT:
369
 
370
    // reset the input buffer if we used it enough times that it might be bloated
371
    if (numReadsSinceReset_ > 512)
372
    {
373
      void * new_buffer = std::realloc(readBuffer_, 1024);
374
      if (new_buffer == NULL) {
375
        GlobalOutput("TConnection::transition() realloc");
376
        close();
377
        return;
378
      }
379
      readBuffer_ = (uint8_t*) new_buffer;
380
      readBufferSize_ = 1024;
381
      numReadsSinceReset_ = 0;
382
    }
383
 
384
    // Clear write buffer variables
385
    writeBuffer_ = NULL;
386
    writeBufferPos_ = 0;
387
    writeBufferSize_ = 0;
388
 
389
    // Set up read buffer for getting 4 bytes
390
    readBufferPos_ = 0;
391
    readWant_ = 4;
392
 
393
    // Into read4 state we go
394
    socketState_ = SOCKET_RECV;
395
    appState_ = APP_READ_FRAME_SIZE;
396
 
397
    // Register read event
398
    setRead();
399
 
400
    // Try to work the socket right away
401
    // workSocket();
402
 
403
    return;
404
 
405
  case APP_READ_FRAME_SIZE:
406
    // We just read the request length, deserialize it
407
    sz = *(int32_t*)readBuffer_;
408
    sz = (int32_t)ntohl(sz);
409
 
410
    if (sz <= 0) {
411
      GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
412
      close();
413
      return;
414
    }
415
 
416
    // Reset the read buffer
417
    readWant_ = (uint32_t)sz;
418
    readBufferPos_= 0;
419
 
420
    // Move into read request state
421
    appState_ = APP_READ_REQUEST;
422
 
423
    // Work the socket right away
424
    // workSocket();
425
 
426
    return;
427
 
428
  default:
429
    GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
430
    assert(0);
431
  }
432
}
433
 
434
void TConnection::setFlags(short eventFlags) {
435
  // Catch the do nothing case
436
  if (eventFlags_ == eventFlags) {
437
    return;
438
  }
439
 
440
  // Delete a previously existing event
441
  if (eventFlags_ != 0) {
442
    if (event_del(&event_) == -1) {
443
      GlobalOutput("TConnection::setFlags event_del");
444
      return;
445
    }
446
  }
447
 
448
  // Update in memory structure
449
  eventFlags_ = eventFlags;
450
 
451
  // Do not call event_set if there are no flags
452
  if (!eventFlags_) {
453
    return;
454
  }
455
 
456
  /**
457
   * event_set:
458
   *
459
   * Prepares the event structure &event to be used in future calls to
460
   * event_add() and event_del().  The event will be prepared to call the
461
   * eventHandler using the 'sock' file descriptor to monitor events.
462
   *
463
   * The events can be either EV_READ, EV_WRITE, or both, indicating
464
   * that an application can read or write from the file respectively without
465
   * blocking.
466
   *
467
   * The eventHandler will be called with the file descriptor that triggered
468
   * the event and the type of event which will be one of: EV_TIMEOUT,
469
   * EV_SIGNAL, EV_READ, EV_WRITE.
470
   *
471
   * The additional flag EV_PERSIST makes an event_add() persistent until
472
   * event_del() has been called.
473
   *
474
   * Once initialized, the &event struct can be used repeatedly with
475
   * event_add() and event_del() and does not need to be reinitialized unless
476
   * the eventHandler and/or the argument to it are to be changed.  However,
477
   * when an ev structure has been added to libevent using event_add() the
478
   * structure must persist until the event occurs (assuming EV_PERSIST
479
   * is not set) or is removed using event_del().  You may not reuse the same
480
   * ev structure for multiple monitored descriptors; each descriptor needs
481
   * its own ev.
482
   */
483
  event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
484
  event_base_set(server_->getEventBase(), &event_);
485
 
486
  // Add the event
487
  if (event_add(&event_, 0) == -1) {
488
    GlobalOutput("TConnection::setFlags(): could not event_add");
489
  }
490
}
491
 
492
/**
493
 * Closes a connection
494
 */
495
void TConnection::close() {
496
  // Delete the registered libevent
497
  if (event_del(&event_) == -1) {
498
    GlobalOutput("TConnection::close() event_del");
499
  }
500
 
501
  // Close the socket
502
  if (socket_ > 0) {
503
    ::close(socket_);
504
  }
505
  socket_ = 0;
506
 
507
  // close any factory produced transports
508
  factoryInputTransport_->close();
509
  factoryOutputTransport_->close();
510
 
511
  // Give this object back to the server that owns it
512
  server_->returnConnection(this);
513
}
514
 
515
void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
516
  if (readBufferSize_ > limit) {
517
    readBufferSize_ = limit;
518
    readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
519
    if (readBuffer_ == NULL) {
520
      GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
521
      close();
522
    }
523
  }
524
}
525
 
526
/**
527
 * Creates a new connection either by reusing an object off the stack or
528
 * by allocating a new one entirely
529
 */
530
TConnection* TNonblockingServer::createConnection(int socket, short flags) {
531
  // Check the stack
532
  if (connectionStack_.empty()) {
533
    return new TConnection(socket, flags, this);
534
  } else {
535
    TConnection* result = connectionStack_.top();
536
    connectionStack_.pop();
537
    result->init(socket, flags, this);
538
    return result;
539
  }
540
}
541
 
542
/**
543
 * Returns a connection to the stack
544
 */
545
void TNonblockingServer::returnConnection(TConnection* connection) {
546
  if (connectionStackLimit_ &&
547
      (connectionStack_.size() >= connectionStackLimit_)) {
548
    delete connection;
549
  } else {
550
    connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
551
    connectionStack_.push(connection);
552
  }
553
}
554
 
555
/**
556
 * Server socket had something happen.  We accept all waiting client
557
 * connections on fd and assign TConnection objects to handle those requests.
558
 */
559
void TNonblockingServer::handleEvent(int fd, short which) {
560
  // Make sure that libevent didn't fuck up the socket handles
561
  assert(fd == serverSocket_);
562
 
563
  // Server socket accepted a new connection
564
  socklen_t addrLen;
565
  struct sockaddr addr;
566
  addrLen = sizeof(addr);
567
 
568
  // Going to accept a new client socket
569
  int clientSocket;
570
 
571
  // Accept as many new clients as possible, even though libevent signaled only
572
  // one, this helps us to avoid having to go back into the libevent engine so
573
  // many times
574
  while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
575
 
576
    // Explicitly set this socket to NONBLOCK mode
577
    int flags;
578
    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
579
        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
580
      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
581
      close(clientSocket);
582
      return;
583
    }
584
 
585
    // Create a new TConnection for this client socket.
586
    TConnection* clientConnection =
587
      createConnection(clientSocket, EV_READ | EV_PERSIST);
588
 
589
    // Fail fast if we could not create a TConnection object
590
    if (clientConnection == NULL) {
591
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
592
      close(clientSocket);
593
      return;
594
    }
595
 
596
    // Put this client connection into the proper state
597
    clientConnection->transition();
598
 
599
    // addrLen is written by the accept() call, so needs to be set before the next call.
600
    addrLen = sizeof(addr);
601
  }
602
 
603
  // Done looping accept, now we have to make sure the error is due to
604
  // blocking. Any other error is a problem
605
  if (errno != EAGAIN && errno != EWOULDBLOCK) {
606
    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
607
  }
608
}
609
 
610
/**
611
 * Creates a socket to listen on and binds it to the local port.
612
 */
613
void TNonblockingServer::listenSocket() {
614
  int s;
615
  struct addrinfo hints, *res, *res0;
616
  int error;
617
 
618
  char port[sizeof("65536") + 1];
619
  memset(&hints, 0, sizeof(hints));
620
  hints.ai_family = PF_UNSPEC;
621
  hints.ai_socktype = SOCK_STREAM;
622
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
623
  sprintf(port, "%d", port_);
624
 
625
  // Wildcard address
626
  error = getaddrinfo(NULL, port, &hints, &res0);
627
  if (error) {
628
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
629
    GlobalOutput(errStr.c_str());
630
    return;
631
  }
632
 
633
  // Pick the ipv6 address first since ipv4 addresses can be mapped
634
  // into ipv6 space.
635
  for (res = res0; res; res = res->ai_next) {
636
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
637
      break;
638
  }
639
 
640
  // Create the server socket
641
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
642
  if (s == -1) {
643
    freeaddrinfo(res0);
644
    throw TException("TNonblockingServer::serve() socket() -1");
645
  }
646
 
647
  #ifdef IPV6_V6ONLY
648
  int zero = 0;
649
  if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
650
    GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
651
  }
652
  #endif // #ifdef IPV6_V6ONLY
653
 
654
 
655
  int one = 1;
656
 
657
  // Set reuseaddr to avoid 2MSL delay on server restart
658
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
659
 
660
  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
661
    close(s);
662
    freeaddrinfo(res0);
663
    throw TException("TNonblockingServer::serve() bind");
664
  }
665
 
666
  // Done with the addr info
667
  freeaddrinfo(res0);
668
 
669
  // Set up this file descriptor for listening
670
  listenSocket(s);
671
}
672
 
673
/**
674
 * Takes a socket created by listenSocket() and sets various options on it
675
 * to prepare for use in the server.
676
 */
677
void TNonblockingServer::listenSocket(int s) {
678
  // Set socket to nonblocking mode
679
  int flags;
680
  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
681
      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
682
    close(s);
683
    throw TException("TNonblockingServer::serve() O_NONBLOCK");
684
  }
685
 
686
  int one = 1;
687
  struct linger ling = {0, 0};
688
 
689
  // Keepalive to ensure full result flushing
690
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
691
 
692
  // Turn linger off to avoid hung sockets
693
  setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
694
 
695
  // Set TCP nodelay if available, MAC OS X Hack
696
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
697
  #ifndef TCP_NOPUSH
698
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
699
  #endif
700
 
701
  if (listen(s, LISTEN_BACKLOG) == -1) {
702
    close(s);
703
    throw TException("TNonblockingServer::serve() listen");
704
  }
705
 
706
  // Cool, this socket is good to go, set it as the serverSocket_
707
  serverSocket_ = s;
708
}
709
 
710
/**
711
 * Register the core libevent events onto the proper base.
712
 */
713
void TNonblockingServer::registerEvents(event_base* base) {
714
  assert(serverSocket_ != -1);
715
  assert(!eventBase_);
716
  eventBase_ = base;
717
 
718
  // Print some libevent stats
719
  GlobalOutput.printf("libevent %s method %s",
720
          event_get_version(),
721
          event_get_method());
722
 
723
  // Register the server event
724
  event_set(&serverEvent_,
725
            serverSocket_,
726
            EV_READ | EV_PERSIST,
727
            TNonblockingServer::eventHandler,
728
            this);
729
  event_base_set(eventBase_, &serverEvent_);
730
 
731
  // Add the event and start up the server
732
  if (-1 == event_add(&serverEvent_, 0)) {
733
    throw TException("TNonblockingServer::serve(): coult not event_add");
734
  }
735
}
736
 
737
/**
738
 * Main workhorse function, starts up the server listening on a port and
739
 * loops over the libevent handler.
740
 */
741
void TNonblockingServer::serve() {
742
  // Init socket
743
  listenSocket();
744
 
745
  // Initialize libevent core
746
  registerEvents(static_cast<event_base*>(event_init()));
747
 
748
  // Run the preServe event
749
  if (eventHandler_ != NULL) {
750
    eventHandler_->preServe();
751
  }
752
 
753
  // Run libevent engine, never returns, invokes calls to eventHandler
754
  event_base_loop(eventBase_, 0);
755
}
756
 
757
}}} // apache::thrift::server