| 30 |
ashish |
1 |
/*
|
|
|
2 |
* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
3 |
* or more contributor license agreements. See the NOTICE file
|
|
|
4 |
* distributed with this work for additional information
|
|
|
5 |
* regarding copyright ownership. The ASF licenses this file
|
|
|
6 |
* to you under the Apache License, Version 2.0 (the
|
|
|
7 |
* "License"); you may not use this file except in compliance
|
|
|
8 |
* with the License. You may obtain a copy of the License at
|
|
|
9 |
*
|
|
|
10 |
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
11 |
*
|
|
|
12 |
* Unless required by applicable law or agreed to in writing,
|
|
|
13 |
* software distributed under the License is distributed on an
|
|
|
14 |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
15 |
* KIND, either express or implied. See the License for the
|
|
|
16 |
* specific language governing permissions and limitations
|
|
|
17 |
* under the License.
|
|
|
18 |
*/
|
|
|
19 |
|
|
|
20 |
#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
|
|
|
21 |
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
|
|
|
22 |
|
|
|
23 |
#include <Thrift.h>
|
|
|
24 |
#include <server/TServer.h>
|
|
|
25 |
#include <transport/TBufferTransports.h>
|
|
|
26 |
#include <concurrency/ThreadManager.h>
|
|
|
27 |
#include <stack>
|
|
|
28 |
#include <string>
|
|
|
29 |
#include <errno.h>
|
|
|
30 |
#include <cstdlib>
|
|
|
31 |
#include <unistd.h>
|
|
|
32 |
#include <event.h>
|
|
|
33 |
|
|
|
34 |
namespace apache { namespace thrift { namespace server {
|
|
|
35 |
|
|
|
36 |
using apache::thrift::transport::TMemoryBuffer;
|
|
|
37 |
using apache::thrift::protocol::TProtocol;
|
|
|
38 |
using apache::thrift::concurrency::Runnable;
|
|
|
39 |
using apache::thrift::concurrency::ThreadManager;
|
|
|
40 |
|
|
|
41 |
// Forward declaration of class
|
|
|
42 |
class TConnection;
|
|
|
43 |
|
|
|
44 |
/**
|
|
|
45 |
* This is a non-blocking server in C++ for high performance that operates a
|
|
|
46 |
* single IO thread. It assumes that all incoming requests are framed with a
|
|
|
47 |
* 4 byte length indicator and writes out responses using the same framing.
|
|
|
48 |
*
|
|
|
49 |
* It does not use the TServerTransport framework, but rather has socket
|
|
|
50 |
* operations hardcoded for use with select.
|
|
|
51 |
*
|
|
|
52 |
*/
|
|
|
53 |
class TNonblockingServer : public TServer {
|
|
|
54 |
private:
|
|
|
55 |
|
|
|
56 |
// Listen backlog
|
|
|
57 |
static const int LISTEN_BACKLOG = 1024;
|
|
|
58 |
|
|
|
59 |
// Default limit on size of idle connection pool
|
|
|
60 |
static const size_t CONNECTION_STACK_LIMIT = 1024;
|
|
|
61 |
|
|
|
62 |
// Maximum size of buffer allocated to idle connection
|
|
|
63 |
static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
|
|
|
64 |
|
|
|
65 |
// Server socket file descriptor
|
|
|
66 |
int serverSocket_;
|
|
|
67 |
|
|
|
68 |
// Port server runs on
|
|
|
69 |
int port_;
|
|
|
70 |
|
|
|
71 |
// For processing via thread pool, may be NULL
|
|
|
72 |
boost::shared_ptr<ThreadManager> threadManager_;
|
|
|
73 |
|
|
|
74 |
// Is thread pool processing?
|
|
|
75 |
bool threadPoolProcessing_;
|
|
|
76 |
|
|
|
77 |
// The event base for libevent
|
|
|
78 |
event_base* eventBase_;
|
|
|
79 |
|
|
|
80 |
// Event struct, for use with eventBase_
|
|
|
81 |
struct event serverEvent_;
|
|
|
82 |
|
|
|
83 |
// Number of TConnection object we've created
|
|
|
84 |
size_t numTConnections_;
|
|
|
85 |
|
|
|
86 |
// Limit for how many TConnection objects to cache
|
|
|
87 |
size_t connectionStackLimit_;
|
|
|
88 |
|
|
|
89 |
/**
|
|
|
90 |
* Max read buffer size for an idle connection. When we place an idle
|
|
|
91 |
* TConnection into connectionStack_, we insure that its read buffer is
|
|
|
92 |
* reduced to this size to insure that idle connections don't hog memory.
|
|
|
93 |
*/
|
|
|
94 |
uint32_t idleBufferMemLimit_;
|
|
|
95 |
|
|
|
96 |
/**
|
|
|
97 |
* This is a stack of all the objects that have been created but that
|
|
|
98 |
* are NOT currently in use. When we close a connection, we place it on this
|
|
|
99 |
* stack so that the object can be reused later, rather than freeing the
|
|
|
100 |
* memory and reallocating a new object later.
|
|
|
101 |
*/
|
|
|
102 |
std::stack<TConnection*> connectionStack_;
|
|
|
103 |
|
|
|
104 |
void handleEvent(int fd, short which);
|
|
|
105 |
|
|
|
106 |
public:
|
|
|
107 |
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
|
|
|
108 |
int port) :
|
|
|
109 |
TServer(processor),
|
|
|
110 |
serverSocket_(-1),
|
|
|
111 |
port_(port),
|
|
|
112 |
threadPoolProcessing_(false),
|
|
|
113 |
eventBase_(NULL),
|
|
|
114 |
numTConnections_(0),
|
|
|
115 |
connectionStackLimit_(CONNECTION_STACK_LIMIT),
|
|
|
116 |
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
|
|
|
117 |
|
|
|
118 |
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
|
|
|
119 |
boost::shared_ptr<TProtocolFactory> protocolFactory,
|
|
|
120 |
int port,
|
|
|
121 |
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
|
|
|
122 |
TServer(processor),
|
|
|
123 |
serverSocket_(-1),
|
|
|
124 |
port_(port),
|
|
|
125 |
threadManager_(threadManager),
|
|
|
126 |
eventBase_(NULL),
|
|
|
127 |
numTConnections_(0),
|
|
|
128 |
connectionStackLimit_(CONNECTION_STACK_LIMIT),
|
|
|
129 |
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
|
|
|
130 |
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
|
|
|
131 |
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
|
|
|
132 |
setInputProtocolFactory(protocolFactory);
|
|
|
133 |
setOutputProtocolFactory(protocolFactory);
|
|
|
134 |
setThreadManager(threadManager);
|
|
|
135 |
}
|
|
|
136 |
|
|
|
137 |
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
|
|
|
138 |
boost::shared_ptr<TTransportFactory> inputTransportFactory,
|
|
|
139 |
boost::shared_ptr<TTransportFactory> outputTransportFactory,
|
|
|
140 |
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
|
|
|
141 |
boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
|
|
|
142 |
int port,
|
|
|
143 |
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
|
|
|
144 |
TServer(processor),
|
|
|
145 |
serverSocket_(0),
|
|
|
146 |
port_(port),
|
|
|
147 |
threadManager_(threadManager),
|
|
|
148 |
eventBase_(NULL),
|
|
|
149 |
numTConnections_(0),
|
|
|
150 |
connectionStackLimit_(CONNECTION_STACK_LIMIT),
|
|
|
151 |
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
|
|
|
152 |
setInputTransportFactory(inputTransportFactory);
|
|
|
153 |
setOutputTransportFactory(outputTransportFactory);
|
|
|
154 |
setInputProtocolFactory(inputProtocolFactory);
|
|
|
155 |
setOutputProtocolFactory(outputProtocolFactory);
|
|
|
156 |
setThreadManager(threadManager);
|
|
|
157 |
}
|
|
|
158 |
|
|
|
159 |
~TNonblockingServer() {}
|
|
|
160 |
|
|
|
161 |
void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
|
|
|
162 |
threadManager_ = threadManager;
|
|
|
163 |
threadPoolProcessing_ = (threadManager != NULL);
|
|
|
164 |
}
|
|
|
165 |
|
|
|
166 |
boost::shared_ptr<ThreadManager> getThreadManager() {
|
|
|
167 |
return threadManager_;
|
|
|
168 |
}
|
|
|
169 |
|
|
|
170 |
/**
|
|
|
171 |
* Get the maximum number of unused TConnection we will hold in reserve.
|
|
|
172 |
*
|
|
|
173 |
* @return the current limit on TConnection pool size.
|
|
|
174 |
*/
|
|
|
175 |
size_t getConnectionStackLimit() const {
|
|
|
176 |
return connectionStackLimit_;
|
|
|
177 |
}
|
|
|
178 |
|
|
|
179 |
/**
|
|
|
180 |
* Set the maximum number of unused TConnection we will hold in reserve.
|
|
|
181 |
*
|
|
|
182 |
* @param sz the new limit for TConnection pool size.
|
|
|
183 |
*/
|
|
|
184 |
void setConnectionStackLimit(size_t sz) {
|
|
|
185 |
connectionStackLimit_ = sz;
|
|
|
186 |
}
|
|
|
187 |
|
|
|
188 |
bool isThreadPoolProcessing() const {
|
|
|
189 |
return threadPoolProcessing_;
|
|
|
190 |
}
|
|
|
191 |
|
|
|
192 |
void addTask(boost::shared_ptr<Runnable> task) {
|
|
|
193 |
threadManager_->add(task);
|
|
|
194 |
}
|
|
|
195 |
|
|
|
196 |
event_base* getEventBase() const {
|
|
|
197 |
return eventBase_;
|
|
|
198 |
}
|
|
|
199 |
|
|
|
200 |
void incrementNumConnections() {
|
|
|
201 |
++numTConnections_;
|
|
|
202 |
}
|
|
|
203 |
|
|
|
204 |
void decrementNumConnections() {
|
|
|
205 |
--numTConnections_;
|
|
|
206 |
}
|
|
|
207 |
|
|
|
208 |
size_t getNumConnections() {
|
|
|
209 |
return numTConnections_;
|
|
|
210 |
}
|
|
|
211 |
|
|
|
212 |
size_t getNumIdleConnections() {
|
|
|
213 |
return connectionStack_.size();
|
|
|
214 |
}
|
|
|
215 |
|
|
|
216 |
/**
|
|
|
217 |
* Get the maximum limit of memory allocated to idle TConnection objects.
|
|
|
218 |
*
|
|
|
219 |
* @return # bytes beyond which we will shrink buffers when idle.
|
|
|
220 |
*/
|
|
|
221 |
size_t getIdleBufferMemLimit() const {
|
|
|
222 |
return idleBufferMemLimit_;
|
|
|
223 |
}
|
|
|
224 |
|
|
|
225 |
/**
|
|
|
226 |
* Set the maximum limit of memory allocated to idle TConnection objects.
|
|
|
227 |
* If a TConnection object goes idle with more than this much memory
|
|
|
228 |
* allocated to its buffer, we shrink it to this value.
|
|
|
229 |
*
|
|
|
230 |
* @param limit of bytes beyond which we will shrink buffers when idle.
|
|
|
231 |
*/
|
|
|
232 |
void setIdleBufferMemLimit(size_t limit) {
|
|
|
233 |
idleBufferMemLimit_ = limit;
|
|
|
234 |
}
|
|
|
235 |
|
|
|
236 |
TConnection* createConnection(int socket, short flags);
|
|
|
237 |
|
|
|
238 |
void returnConnection(TConnection* connection);
|
|
|
239 |
|
|
|
240 |
static void eventHandler(int fd, short which, void* v) {
|
|
|
241 |
((TNonblockingServer*)v)->handleEvent(fd, which);
|
|
|
242 |
}
|
|
|
243 |
|
|
|
244 |
void listenSocket();
|
|
|
245 |
|
|
|
246 |
void listenSocket(int fd);
|
|
|
247 |
|
|
|
248 |
void registerEvents(event_base* base);
|
|
|
249 |
|
|
|
250 |
void serve();
|
|
|
251 |
};
|
|
|
252 |
|
|
|
253 |
/**
|
|
|
254 |
* Two states for sockets, recv and send mode
|
|
|
255 |
*/
|
|
|
256 |
enum TSocketState {
|
|
|
257 |
SOCKET_RECV,
|
|
|
258 |
SOCKET_SEND
|
|
|
259 |
};
|
|
|
260 |
|
|
|
261 |
/**
|
|
|
262 |
* Four states for the nonblocking servr:
|
|
|
263 |
* 1) initialize
|
|
|
264 |
* 2) read 4 byte frame size
|
|
|
265 |
* 3) read frame of data
|
|
|
266 |
* 4) send back data (if any)
|
|
|
267 |
*/
|
|
|
268 |
enum TAppState {
|
|
|
269 |
APP_INIT,
|
|
|
270 |
APP_READ_FRAME_SIZE,
|
|
|
271 |
APP_READ_REQUEST,
|
|
|
272 |
APP_WAIT_TASK,
|
|
|
273 |
APP_SEND_RESULT
|
|
|
274 |
};
|
|
|
275 |
|
|
|
276 |
/**
|
|
|
277 |
* Represents a connection that is handled via libevent. This connection
|
|
|
278 |
* essentially encapsulates a socket that has some associated libevent state.
|
|
|
279 |
*/
|
|
|
280 |
class TConnection {
|
|
|
281 |
private:
|
|
|
282 |
|
|
|
283 |
class Task;
|
|
|
284 |
|
|
|
285 |
// Server handle
|
|
|
286 |
TNonblockingServer* server_;
|
|
|
287 |
|
|
|
288 |
// Socket handle
|
|
|
289 |
int socket_;
|
|
|
290 |
|
|
|
291 |
// Libevent object
|
|
|
292 |
struct event event_;
|
|
|
293 |
|
|
|
294 |
// Libevent flags
|
|
|
295 |
short eventFlags_;
|
|
|
296 |
|
|
|
297 |
// Socket mode
|
|
|
298 |
TSocketState socketState_;
|
|
|
299 |
|
|
|
300 |
// Application state
|
|
|
301 |
TAppState appState_;
|
|
|
302 |
|
|
|
303 |
// How much data needed to read
|
|
|
304 |
uint32_t readWant_;
|
|
|
305 |
|
|
|
306 |
// Where in the read buffer are we
|
|
|
307 |
uint32_t readBufferPos_;
|
|
|
308 |
|
|
|
309 |
// Read buffer
|
|
|
310 |
uint8_t* readBuffer_;
|
|
|
311 |
|
|
|
312 |
// Read buffer size
|
|
|
313 |
uint32_t readBufferSize_;
|
|
|
314 |
|
|
|
315 |
// Write buffer
|
|
|
316 |
uint8_t* writeBuffer_;
|
|
|
317 |
|
|
|
318 |
// Write buffer size
|
|
|
319 |
uint32_t writeBufferSize_;
|
|
|
320 |
|
|
|
321 |
// How far through writing are we?
|
|
|
322 |
uint32_t writeBufferPos_;
|
|
|
323 |
|
|
|
324 |
// How many times have we read since our last buffer reset?
|
|
|
325 |
uint32_t numReadsSinceReset_;
|
|
|
326 |
|
|
|
327 |
// How many times have we written since our last buffer reset?
|
|
|
328 |
uint32_t numWritesSinceReset_;
|
|
|
329 |
|
|
|
330 |
// Task handle
|
|
|
331 |
int taskHandle_;
|
|
|
332 |
|
|
|
333 |
// Task event
|
|
|
334 |
struct event taskEvent_;
|
|
|
335 |
|
|
|
336 |
// Transport to read from
|
|
|
337 |
boost::shared_ptr<TMemoryBuffer> inputTransport_;
|
|
|
338 |
|
|
|
339 |
// Transport that processor writes to
|
|
|
340 |
boost::shared_ptr<TMemoryBuffer> outputTransport_;
|
|
|
341 |
|
|
|
342 |
// extra transport generated by transport factory (e.g. BufferedRouterTransport)
|
|
|
343 |
boost::shared_ptr<TTransport> factoryInputTransport_;
|
|
|
344 |
boost::shared_ptr<TTransport> factoryOutputTransport_;
|
|
|
345 |
|
|
|
346 |
// Protocol decoder
|
|
|
347 |
boost::shared_ptr<TProtocol> inputProtocol_;
|
|
|
348 |
|
|
|
349 |
// Protocol encoder
|
|
|
350 |
boost::shared_ptr<TProtocol> outputProtocol_;
|
|
|
351 |
|
|
|
352 |
// Go into read mode
|
|
|
353 |
void setRead() {
|
|
|
354 |
setFlags(EV_READ | EV_PERSIST);
|
|
|
355 |
}
|
|
|
356 |
|
|
|
357 |
// Go into write mode
|
|
|
358 |
void setWrite() {
|
|
|
359 |
setFlags(EV_WRITE | EV_PERSIST);
|
|
|
360 |
}
|
|
|
361 |
|
|
|
362 |
// Set socket idle
|
|
|
363 |
void setIdle() {
|
|
|
364 |
setFlags(0);
|
|
|
365 |
}
|
|
|
366 |
|
|
|
367 |
// Set event flags
|
|
|
368 |
void setFlags(short eventFlags);
|
|
|
369 |
|
|
|
370 |
// Libevent handlers
|
|
|
371 |
void workSocket();
|
|
|
372 |
|
|
|
373 |
// Close this client and reset
|
|
|
374 |
void close();
|
|
|
375 |
|
|
|
376 |
public:
|
|
|
377 |
|
|
|
378 |
// Constructor
|
|
|
379 |
TConnection(int socket, short eventFlags, TNonblockingServer *s) {
|
|
|
380 |
readBuffer_ = (uint8_t*)std::malloc(1024);
|
|
|
381 |
if (readBuffer_ == NULL) {
|
|
|
382 |
throw new apache::thrift::TException("Out of memory.");
|
|
|
383 |
}
|
|
|
384 |
readBufferSize_ = 1024;
|
|
|
385 |
|
|
|
386 |
numReadsSinceReset_ = 0;
|
|
|
387 |
numWritesSinceReset_ = 0;
|
|
|
388 |
|
|
|
389 |
// Allocate input and output tranpsorts
|
|
|
390 |
// these only need to be allocated once per TConnection (they don't need to be
|
|
|
391 |
// reallocated on init() call)
|
|
|
392 |
inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
|
|
|
393 |
outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
|
|
|
394 |
|
|
|
395 |
init(socket, eventFlags, s);
|
|
|
396 |
server_->incrementNumConnections();
|
|
|
397 |
}
|
|
|
398 |
|
|
|
399 |
~TConnection() {
|
|
|
400 |
server_->decrementNumConnections();
|
|
|
401 |
}
|
|
|
402 |
|
|
|
403 |
/**
|
|
|
404 |
* Check read buffer against a given limit and shrink it if exceeded.
|
|
|
405 |
*
|
|
|
406 |
* @param limit we limit buffer size to.
|
|
|
407 |
*/
|
|
|
408 |
void checkIdleBufferMemLimit(uint32_t limit);
|
|
|
409 |
|
|
|
410 |
// Initialize
|
|
|
411 |
void init(int socket, short eventFlags, TNonblockingServer *s);
|
|
|
412 |
|
|
|
413 |
// Transition into a new state
|
|
|
414 |
void transition();
|
|
|
415 |
|
|
|
416 |
// Handler wrapper
|
|
|
417 |
static void eventHandler(int fd, short /* which */, void* v) {
|
|
|
418 |
assert(fd == ((TConnection*)v)->socket_);
|
|
|
419 |
((TConnection*)v)->workSocket();
|
|
|
420 |
}
|
|
|
421 |
|
|
|
422 |
// Handler wrapper for task block
|
|
|
423 |
static void taskHandler(int fd, short /* which */, void* v) {
|
|
|
424 |
assert(fd == ((TConnection*)v)->taskHandle_);
|
|
|
425 |
if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
|
|
|
426 |
GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
|
|
|
427 |
}
|
|
|
428 |
((TConnection*)v)->transition();
|
|
|
429 |
}
|
|
|
430 |
|
|
|
431 |
};
|
|
|
432 |
|
|
|
433 |
}}} // apache::thrift::server
|
|
|
434 |
|
|
|
435 |
#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
|