Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
 
23
#include <Thrift.h>
24
#include <server/TServer.h>
25
#include <transport/TBufferTransports.h>
26
#include <concurrency/ThreadManager.h>
27
#include <stack>
28
#include <string>
29
#include <errno.h>
30
#include <cstdlib>
31
#include <unistd.h>
32
#include <event.h>
33
 
34
namespace apache { namespace thrift { namespace server {
35
 
36
using apache::thrift::transport::TMemoryBuffer;
37
using apache::thrift::protocol::TProtocol;
38
using apache::thrift::concurrency::Runnable;
39
using apache::thrift::concurrency::ThreadManager;
40
 
41
// Forward declaration of class
42
class TConnection;
43
 
44
/**
45
 * This is a non-blocking server in C++ for high performance that operates a
46
 * single IO thread. It assumes that all incoming requests are framed with a
47
 * 4 byte length indicator and writes out responses using the same framing.
48
 *
49
 * It does not use the TServerTransport framework, but rather has socket
50
 * operations hardcoded for use with select.
51
 *
52
 */
53
class TNonblockingServer : public TServer {
54
 private:
55
 
56
  // Listen backlog
57
  static const int LISTEN_BACKLOG = 1024;
58
 
59
  // Default limit on size of idle connection pool
60
  static const size_t CONNECTION_STACK_LIMIT = 1024;
61
 
62
  // Maximum size of buffer allocated to idle connection
63
  static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
64
 
65
  // Server socket file descriptor
66
  int serverSocket_;
67
 
68
  // Port server runs on
69
  int port_;
70
 
71
  // For processing via thread pool, may be NULL
72
  boost::shared_ptr<ThreadManager> threadManager_;
73
 
74
  // Is thread pool processing?
75
  bool threadPoolProcessing_;
76
 
77
  // The event base for libevent
78
  event_base* eventBase_;
79
 
80
  // Event struct, for use with eventBase_
81
  struct event serverEvent_;
82
 
83
  // Number of TConnection object we've created
84
  size_t numTConnections_;
85
 
86
  // Limit for how many TConnection objects to cache
87
  size_t connectionStackLimit_;
88
 
89
  /**
90
   * Max read buffer size for an idle connection.  When we place an idle
91
   * TConnection into connectionStack_, we insure that its read buffer is
92
   * reduced to this size to insure that idle connections don't hog memory.
93
   */
94
  uint32_t idleBufferMemLimit_;
95
 
96
  /**
97
   * This is a stack of all the objects that have been created but that
98
   * are NOT currently in use. When we close a connection, we place it on this
99
   * stack so that the object can be reused later, rather than freeing the
100
   * memory and reallocating a new object later.
101
   */
102
  std::stack<TConnection*> connectionStack_;
103
 
104
  void handleEvent(int fd, short which);
105
 
106
 public:
107
  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
108
                     int port) :
109
    TServer(processor),
110
    serverSocket_(-1),
111
    port_(port),
112
    threadPoolProcessing_(false),
113
    eventBase_(NULL),
114
    numTConnections_(0),
115
    connectionStackLimit_(CONNECTION_STACK_LIMIT),
116
    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
117
 
118
  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
119
                     boost::shared_ptr<TProtocolFactory> protocolFactory,
120
                     int port,
121
                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
122
    TServer(processor),
123
    serverSocket_(-1),
124
    port_(port),
125
    threadManager_(threadManager),
126
    eventBase_(NULL),
127
    numTConnections_(0),
128
    connectionStackLimit_(CONNECTION_STACK_LIMIT),
129
    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
130
    setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
131
    setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
132
    setInputProtocolFactory(protocolFactory);
133
    setOutputProtocolFactory(protocolFactory);
134
    setThreadManager(threadManager);
135
  }
136
 
137
  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
138
                     boost::shared_ptr<TTransportFactory> inputTransportFactory,
139
                     boost::shared_ptr<TTransportFactory> outputTransportFactory,
140
                     boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
141
                     boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
142
                     int port,
143
                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
144
    TServer(processor),
145
    serverSocket_(0),
146
    port_(port),
147
    threadManager_(threadManager),
148
    eventBase_(NULL),
149
    numTConnections_(0),
150
    connectionStackLimit_(CONNECTION_STACK_LIMIT),
151
    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
152
    setInputTransportFactory(inputTransportFactory);
153
    setOutputTransportFactory(outputTransportFactory);
154
    setInputProtocolFactory(inputProtocolFactory);
155
    setOutputProtocolFactory(outputProtocolFactory);
156
    setThreadManager(threadManager);
157
  }
158
 
159
  ~TNonblockingServer() {}
160
 
161
  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
162
    threadManager_ = threadManager;
163
    threadPoolProcessing_ = (threadManager != NULL);
164
  }
165
 
166
  boost::shared_ptr<ThreadManager> getThreadManager() {
167
    return threadManager_;
168
  }
169
 
170
  /**
171
   * Get the maximum number of unused TConnection we will hold in reserve.
172
   *
173
   * @return the current limit on TConnection pool size.
174
   */
175
  size_t getConnectionStackLimit() const {
176
    return connectionStackLimit_;
177
  }
178
 
179
  /**
180
   * Set the maximum number of unused TConnection we will hold in reserve.
181
   *
182
   * @param sz the new limit for TConnection pool size.
183
   */
184
  void setConnectionStackLimit(size_t sz) {
185
    connectionStackLimit_ = sz;
186
  }
187
 
188
  bool isThreadPoolProcessing() const {
189
    return threadPoolProcessing_;
190
  }
191
 
192
  void addTask(boost::shared_ptr<Runnable> task) {
193
    threadManager_->add(task);
194
  }
195
 
196
  event_base* getEventBase() const {
197
    return eventBase_;
198
  }
199
 
200
  void incrementNumConnections() {
201
    ++numTConnections_;
202
  }
203
 
204
  void decrementNumConnections() {
205
    --numTConnections_;
206
  }
207
 
208
  size_t getNumConnections() {
209
    return numTConnections_;
210
  }
211
 
212
  size_t getNumIdleConnections() {
213
    return connectionStack_.size();
214
  }
215
 
216
  /**
217
   * Get the maximum limit of memory allocated to idle TConnection objects.
218
   *
219
   * @return # bytes beyond which we will shrink buffers when idle.
220
   */
221
  size_t getIdleBufferMemLimit() const {
222
    return idleBufferMemLimit_;
223
  }
224
 
225
  /**
226
   * Set the maximum limit of memory allocated to idle TConnection objects.
227
   * If a TConnection object goes idle with more than this much memory
228
   * allocated to its buffer, we shrink it to this value.
229
   *
230
   * @param limit of bytes beyond which we will shrink buffers when idle.
231
   */
232
  void setIdleBufferMemLimit(size_t limit) {
233
    idleBufferMemLimit_ = limit;
234
  }
235
 
236
  TConnection* createConnection(int socket, short flags);
237
 
238
  void returnConnection(TConnection* connection);
239
 
240
  static void eventHandler(int fd, short which, void* v) {
241
    ((TNonblockingServer*)v)->handleEvent(fd, which);
242
  }
243
 
244
  void listenSocket();
245
 
246
  void listenSocket(int fd);
247
 
248
  void registerEvents(event_base* base);
249
 
250
  void serve();
251
};
252
 
253
/**
254
 * Two states for sockets, recv and send mode
255
 */
256
enum TSocketState {
  SOCKET_RECV,  // the connection is currently reading from its socket
  SOCKET_SEND   // the connection is currently writing to its socket
};
260
 
261
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for task completion (when using a thread pool)
 *  5) send back data (if any)
 */
268
enum TAppState {
  APP_INIT,             // freshly (re)initialized; nothing read yet
  APP_READ_FRAME_SIZE,  // reading the 4-byte frame length header
  APP_READ_REQUEST,     // reading the frame payload itself
  APP_WAIT_TASK,        // request queued on the thread pool; awaiting completion
  APP_SEND_RESULT       // writing the response frame back to the client
};
275
 
276
/**
277
 * Represents a connection that is handled via libevent. This connection
278
 * essentially encapsulates a socket that has some associated libevent state.
279
 */
280
class TConnection {
281
 private:
282
 
283
  class Task;
284
 
285
  // Server handle
286
  TNonblockingServer* server_;
287
 
288
  // Socket handle
289
  int socket_;
290
 
291
  // Libevent object
292
  struct event event_;
293
 
294
  // Libevent flags
295
  short eventFlags_;
296
 
297
  // Socket mode
298
  TSocketState socketState_;
299
 
300
  // Application state
301
  TAppState appState_;
302
 
303
  // How much data needed to read
304
  uint32_t readWant_;
305
 
306
  // Where in the read buffer are we
307
  uint32_t readBufferPos_;
308
 
309
  // Read buffer
310
  uint8_t* readBuffer_;
311
 
312
  // Read buffer size
313
  uint32_t readBufferSize_;
314
 
315
  // Write buffer
316
  uint8_t* writeBuffer_;
317
 
318
  // Write buffer size
319
  uint32_t writeBufferSize_;
320
 
321
  // How far through writing are we?
322
  uint32_t writeBufferPos_;
323
 
324
  // How many times have we read since our last buffer reset?
325
  uint32_t numReadsSinceReset_;
326
 
327
  // How many times have we written since our last buffer reset?
328
  uint32_t numWritesSinceReset_;
329
 
330
  // Task handle
331
  int taskHandle_;
332
 
333
  // Task event
334
  struct event taskEvent_;
335
 
336
  // Transport to read from
337
  boost::shared_ptr<TMemoryBuffer> inputTransport_;
338
 
339
  // Transport that processor writes to
340
  boost::shared_ptr<TMemoryBuffer> outputTransport_;
341
 
342
  // extra transport generated by transport factory (e.g. BufferedRouterTransport)
343
  boost::shared_ptr<TTransport> factoryInputTransport_;
344
  boost::shared_ptr<TTransport> factoryOutputTransport_;
345
 
346
  // Protocol decoder
347
  boost::shared_ptr<TProtocol> inputProtocol_;
348
 
349
  // Protocol encoder
350
  boost::shared_ptr<TProtocol> outputProtocol_;
351
 
352
  // Go into read mode
353
  void setRead() {
354
    setFlags(EV_READ | EV_PERSIST);
355
  }
356
 
357
  // Go into write mode
358
  void setWrite() {
359
    setFlags(EV_WRITE | EV_PERSIST);
360
  }
361
 
362
  // Set socket idle
363
  void setIdle() {
364
    setFlags(0);
365
  }
366
 
367
  // Set event flags
368
  void setFlags(short eventFlags);
369
 
370
  // Libevent handlers
371
  void workSocket();
372
 
373
  // Close this client and reset
374
  void close();
375
 
376
 public:
377
 
378
  // Constructor
379
  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
380
    readBuffer_ = (uint8_t*)std::malloc(1024);
381
    if (readBuffer_ == NULL) {
382
      throw new apache::thrift::TException("Out of memory.");
383
    }
384
    readBufferSize_ = 1024;
385
 
386
    numReadsSinceReset_ = 0;
387
    numWritesSinceReset_ = 0;
388
 
389
    // Allocate input and output tranpsorts
390
    // these only need to be allocated once per TConnection (they don't need to be
391
    // reallocated on init() call)
392
    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
393
    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
394
 
395
    init(socket, eventFlags, s);
396
    server_->incrementNumConnections();
397
  }
398
 
399
  ~TConnection() {
400
    server_->decrementNumConnections();
401
  }
402
 
403
  /**
404
   * Check read buffer against a given limit and shrink it if exceeded.
405
   *
406
   * @param limit we limit buffer size to.
407
   */
408
  void checkIdleBufferMemLimit(uint32_t limit);
409
 
410
  // Initialize
411
  void init(int socket, short eventFlags, TNonblockingServer *s);
412
 
413
  // Transition into a new state
414
  void transition();
415
 
416
  // Handler wrapper
417
  static void eventHandler(int fd, short /* which */, void* v) {
418
    assert(fd == ((TConnection*)v)->socket_);
419
    ((TConnection*)v)->workSocket();
420
  }
421
 
422
  // Handler wrapper for task block
423
  static void taskHandler(int fd, short /* which */, void* v) {
424
    assert(fd == ((TConnection*)v)->taskHandle_);
425
    if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
426
      GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
427
    }
428
    ((TConnection*)v)->transition();
429
  }
430
 
431
};
432
 
433
}}} // apache::thrift::server
434
 
435
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_