// Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "TNonblockingServer.h"
#include <concurrency/Exception.h>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>

#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

namespace apache { namespace thrift { namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using namespace std;

/**
 * Runnable handed to the server's thread pool when thread-pool processing is
 * enabled. It runs the processor over the connection's protocols and then
 * writes one byte to, and closes, its end of a socketpair. The libevent thread
 * watches the other end to learn that the task has finished.
 */
class TConnection::Task: public Runnable {
 public:
  /**
   * @param processor  processor that handles the request(s)
   * @param input      protocol the request is read from
   * @param output     protocol the response is written to
   * @param taskHandle this task's end of the completion socketpair; run()
   *                   closes it when it finishes
   */
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       int taskHandle) :
    processor_(processor),
    input_(input),
    output_(output),
    taskHandle_(taskHandle) {}

  void run() {
    try {
      // Keep processing while the input transport still has data to peek at.
      while (processor_->process(input_, output_)) {
        if (!input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (TTransportException& ttx) {
      cerr << "TNonblockingServer client died: " << ttx.what() << endl;
    } catch (TException& x) {
      cerr << "TNonblockingServer exception: " << x.what() << endl;
    } catch (...) {
      cerr << "TNonblockingServer uncaught exception." << endl;
    }

    // Signal completion back to the libevent thread via a socketpair
    int8_t b = 0;
    if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
      GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
    }
    if (-1 == ::close(taskHandle_)) {
      GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
    }
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  int taskHandle_;
};

/**
 * Resets this connection's state for the given client socket, registers its
 * libevent event and builds its transports/protocols from the server's
 * factories. createConnection() calls this when reusing a pooled object.
 *
 * @param socket     accepted client socket
 * @param eventFlags libevent flags to register for the socket
 * @param s          the owning server
 */
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
  socket_ = socket;
  server_ = s;
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;

  socketState_ = SOCKET_RECV;
  taskHandle_ = -1;

  // Set flags, which also registers the event
  setFlags(eventFlags);

  // get input/transports
  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
}

/**
 * Performs the socket I/O for the current socket state.
 *
 * SOCKET_RECV: reads up to readWant_ bytes into readBuffer_, growing the
 * buffer by doubling when needed. SOCKET_SEND: writes the unsent remainder of
 * writeBuffer_. When the current read or write completes, transition() is
 * called. On a remote disconnect or a hard error the connection is close()d.
 */
void TConnection::workSocket() {
  int flags = 0;
  int got = 0;
  int left = 0;
  int sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      // Grow in a local so a failed realloc leaves both the buffer and its
      // recorded size intact (realloc does not free the block on failure).
      // Starting from at least 1 keeps the doubling loop from spinning forever
      // on an empty buffer.
      uint32_t newSize = (readBufferSize_ > 0) ? readBufferSize_ : 1;
      while (readWant_ > newSize) {
        newSize *= 2;
      }
      void* newBuffer = std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        GlobalOutput("TConnection::workSocket() realloc");
        close();
        return;
      }
      readBuffer_ = (uint8_t*)newBuffer;
      readBufferSize_ = newSize;
    }

    // Read from the socket
    fetch = readWant_ - readBufferPos_;
    got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    } else if (got == -1) {
      // Blocking errors are okay, just move on
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return;
      }

      if (errno != ECONNRESET) {
        GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
      }
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send\n");
      transition();
      return;
    }

    flags = 0;
#ifdef MSG_NOSIGNAL
    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
    // check for the EPIPE return condition and close the socket in that case
    flags |= MSG_NOSIGNAL;
#endif // ifdef MSG_NOSIGNAL

    left = writeBufferSize_ - writeBufferPos_;
    sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);

    if (sent <= 0) {
      // Blocking errors are okay, just move on
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return;
      }
      if (errno != EPIPE) {
        GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
      }
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
    assert(0);
  }
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TConnection::transition() {

  int sz = 0;
  uint32_t netFrameSize = 0;

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    ++numReadsSinceReset_;

    if (numWritesSinceReset_ < 512) {
      outputTransport_->resetBuffer();
    } else {
      // reset the capacity of the output transport if we used it enough times that it might be bloated
      try {
        outputTransport_->resetBuffer(true);
        numWritesSinceReset_ = 0;
      } catch (TTransportException &ttx) {
        GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
        close();
        return;
      }
    }

    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it
      int sv[2];
      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
        GlobalOutput.perror("TConnection::socketpair() failed ", errno);
        // Now we will fall through to the APP_WAIT_TASK block with no response
      } else {
        // Create task and dispatch to the thread manager
        boost::shared_ptr<Runnable> task =
          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
                                               inputProtocol_,
                                               outputProtocol_,
                                               sv[1]));
        // The application is now waiting on the task to finish
        appState_ = APP_WAIT_TASK;

        // Create an event to be notified when the task finishes
        event_set(&taskEvent_,
                  taskHandle_ = sv[0],
                  EV_READ,
                  TConnection::taskHandler,
                  this);

        // Attach to the base
        event_base_set(server_->getEventBase(), &taskEvent_);

        // Add the event and start up the server
        if (-1 == event_add(&taskEvent_, 0)) {
          GlobalOutput("TNonblockingServer::serve(): coult not event_add");
          // Nothing would ever observe the task's completion, so do not
          // dispatch it. Release both socketpair ends and drop the client
          // instead of leaving it stuck in APP_WAIT_TASK.
          ::close(sv[0]);
          ::close(sv[1]);
          taskHandle_ = -1;
          close();
          return;
        }

        try {
          server_->addTask(task);
        } catch (IllegalStateException & ise) {
          // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
          GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
          // The task was not accepted, so Task::run() will never close sv[1].
          // Unregister the completion event, release both ends and drop the
          // connection. close() hands this object back to the server, which
          // may delete it, so nothing on `this` may be touched afterwards.
          event_del(&taskEvent_);
          ::close(sv[0]);
          ::close(sv[1]);
          taskHandle_ = -1;
          close();
          return;
        }

        // Set this connection idle so that libevent doesn't process more
        // data on it while we're still waiting for the threadmanager to
        // finish this task
        setIdle();
        return;
      }
    } else {
      try {
        // Invoke the processor
        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
      } catch (TTransportException &ttx) {
        GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
        close();
        return;
      } catch (TException &x) {
        GlobalOutput.printf("TException: Server::process() %s", x.what());
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        close();
        return;
      }
    }

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:

    ++numWritesSinceReset_;

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // reset the input buffer if we used it enough times that it might be bloated
    if (numReadsSinceReset_ > 512) {
      void * new_buffer = std::realloc(readBuffer_, 1024);
      if (new_buffer == NULL) {
        GlobalOutput("TConnection::transition() realloc");
        close();
        return;
      }
      readBuffer_ = (uint8_t*) new_buffer;
      readBufferSize_ = 1024;
      numReadsSinceReset_ = 0;
    }

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Set up read buffer for getting 4 bytes
    readBufferPos_ = 0;
    readWant_ = 4;

    // Into read4 state we go
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_FRAME_SIZE;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length. Copy it out with memcpy instead of
    // dereferencing an int32_t* into the byte buffer (avoids strict-aliasing
    // and alignment assumptions), then convert from network byte order.
    memcpy(&netFrameSize, readBuffer_, sizeof(netFrameSize));
    sz = (int32_t)ntohl(netFrameSize);

    if (sz <= 0) {
      GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
      close();
      return;
    }

    // NOTE(review): the frame size comes from the peer and is only checked to
    // be positive, so a client can make workSocket() grow readBuffer_ up to
    // ~2 GiB. Consider enforcing a maximum frame size.

    // Reset the read buffer
    readWant_ = (uint32_t)sz;
    readBufferPos_ = 0;

    // Move into read request state
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  default:
    GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
    assert(0);
  }
}

/**
 * Replaces the libevent registration for this connection's socket with one
 * for `eventFlags`. No-op if the flags are unchanged. With flags == 0 the
 * existing event is removed and no new one is registered.
 */
void TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /**
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del().  The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed.  However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del().  You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
  event_base_set(server_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}

/**
 * Closes a connection: unregisters its socket event, closes the socket and
 * the factory-produced transports, and hands this object back to the server
 * via returnConnection() (which may pool or delete it). Callers must not
 * touch the object after this returns.
 */
void TConnection::close() {
  // Delete the registered libevent
  if (event_del(&event_) == -1) {
    GlobalOutput("TConnection::close() event_del");
  }

  // Close the socket. 0 is a valid descriptor, so -1 marks "no socket".
  if (socket_ >= 0) {
    ::close(socket_);
  }
  socket_ = -1;

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

/**
 * Shrinks the read buffer to `limit` bytes if it has grown beyond that.
 * Called from TNonblockingServer::returnConnection() before pooling.
 *
 * @param limit maximum idle read-buffer size; 0 disables shrinking
 */
void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
  // A zero limit would mean realloc(ptr, 0), which may free the buffer, and a
  // zero-sized buffer would stall the doubling loop in workSocket().
  if (limit > 0 && readBufferSize_ > limit) {
    void* newBuffer = std::realloc(readBuffer_, limit);
    if (newBuffer == NULL) {
      // Shrinking failed; realloc left the original buffer intact, so keep
      // using it. Do not close() here: this runs inside returnConnection(),
      // and close() would return this connection a second time.
      GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
      return;
    }
    readBuffer_ = (uint8_t*)newBuffer;
    readBufferSize_ = limit;
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TConnection* TNonblockingServer::createConnection(int socket, short flags) {
  // Check the stack
  if (connectionStack_.empty()) {
    return new TConnection(socket, flags, this);
  } else {
    TConnection* result = connectionStack_.top();
    connectionStack_.pop();
    result->init(socket, flags, this);
    return result;
  }
}

/**
 * Returns a connection to the stack, or deletes it if the stack is already
 * at connectionStackLimit_ (a limit of 0 means unbounded).
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  if (connectionStackLimit_ &&
      (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
  } else {
    connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen.  We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(int fd, short which) {
  // Make sure that libevent didn't mix up the socket handles
  assert(fd == serverSocket_);

  // Server socket accepted a new connection. sockaddr_storage is large enough
  // for any address family; a plain sockaddr truncates IPv6 peer addresses.
  socklen_t addrLen;
  struct sockaddr_storage addrStorage;
  addrLen = sizeof(addrStorage);

  // Going to accept a new client socket
  int clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = accept(fd, (struct sockaddr*)&addrStorage, &addrLen)) != -1) {

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
      ::close(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, EV_READ | EV_PERSIST);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      ::close(clientSocket);
      return;
    }

    // Put this client connection into the proper state
    clientConnection->transition();

    // addrLen is written by the accept() call, so needs to be set before the next call.
    addrLen = sizeof(addrStorage);
  }

  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (errno != EAGAIN && errno != EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * @throws TException if address resolution, socket creation or bind fails
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Large enough for any int, including a minus sign and the terminator.
  char port[sizeof("-2147483648")];

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Fail like the other setup errors below; returning here would leave
    // serverSocket_ unset for registerEvents().
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Accept IPv4-mapped connections too; the option only applies to IPv6 sockets.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    ::close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}

/**
 * Takes a socket created by listenSocket() and sets various options on it
 * to prepare for use in the server.
 *
 * @throws TException (after closing s) if O_NONBLOCK or listen() fails
 */
void TNonblockingServer::listenSocket(int s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
    ::close(s);
    throw TException("TNonblockingServer::serve() O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
#endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    ::close(s);
    throw TException("TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;
}

/**
 * Register the core libevent events onto the proper base.
 *
 * @throws TException if the server event cannot be added
 */
void TNonblockingServer::registerEvents(event_base* base) {
  assert(serverSocket_ != -1);
  assert(!eventBase_);
  eventBase_ = base;

  // Print some libevent stats
  GlobalOutput.printf("libevent %s method %s",
          event_get_version(),
          event_get_method());

  // Register the server event
  event_set(&serverEvent_,
            serverSocket_,
            EV_READ | EV_PERSIST,
            TNonblockingServer::eventHandler,
            this);
  event_base_set(eventBase_, &serverEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&serverEvent_, 0)) {
    throw TException("TNonblockingServer::serve(): coult not event_add");
  }
}

/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void TNonblockingServer::serve() {
  // Init socket
  listenSocket();

  // Initialize libevent core
  registerEvents(static_cast<event_base*>(event_init()));

  // Run the preServe event
  if (eventHandler_ != NULL) {
    eventHandler_->preServe();
  }

  // Run the libevent loop, which dispatches to the registered event handlers.
  event_base_loop(eventBase_, 0);
}

}}} // apache::thrift::server