Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include "server/TThreadPoolServer.h"
21
#include "transport/TTransportException.h"
22
#include "concurrency/Thread.h"
23
#include "concurrency/ThreadManager.h"
24
#include <string>
25
#include <iostream>
26
 
27
namespace apache { namespace thrift { namespace server {
28
 
29
using boost::shared_ptr;
30
using namespace std;
31
using namespace apache::thrift;
32
using namespace apache::thrift::concurrency;
33
using namespace apache::thrift::protocol;;
34
using namespace apache::thrift::transport;
35
 
36
class TThreadPoolServer::Task : public Runnable {
37
 
38
public:
39
 
40
  Task(TThreadPoolServer &server,
41
       shared_ptr<TProcessor> processor,
42
       shared_ptr<TProtocol> input,
43
       shared_ptr<TProtocol> output) :
44
    server_(server),
45
    processor_(processor),
46
    input_(input),
47
    output_(output) {
48
  }
49
 
50
  ~Task() {}
51
 
52
  void run() {
53
    boost::shared_ptr<TServerEventHandler> eventHandler =
54
      server_.getEventHandler();
55
    if (eventHandler != NULL) {
56
      eventHandler->clientBegin(input_, output_);
57
    }
58
    try {
59
      while (processor_->process(input_, output_)) {
60
        if (!input_->getTransport()->peek()) {
61
          break;
62
        }
63
      }
64
    } catch (TTransportException& ttx) {
65
      // This is reasonably expected, client didn't send a full request so just
66
      // ignore him
67
      // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
68
      // GlobalOutput(errStr.c_str());
69
    } catch (TException& x) {
70
      string errStr = string("TThreadPoolServer exception: ") + x.what();
71
      GlobalOutput(errStr.c_str());
72
    } catch (std::exception &x) {
73
      string errStr = string("TThreadPoolServer, std::exception: ") + x.what();
74
      GlobalOutput(errStr.c_str());
75
    }
76
 
77
    if (eventHandler != NULL) {
78
      eventHandler->clientEnd(input_, output_);
79
    }
80
 
81
    try {
82
      input_->getTransport()->close();
83
    } catch (TTransportException& ttx) {
84
      string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
85
      GlobalOutput(errStr.c_str());
86
    }
87
    try {
88
      output_->getTransport()->close();
89
    } catch (TTransportException& ttx) {
90
      string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
91
      GlobalOutput(errStr.c_str());
92
    }
93
 
94
  }
95
 
96
 private:
97
  TServer& server_;
98
  shared_ptr<TProcessor> processor_;
99
  shared_ptr<TProtocol> input_;
100
  shared_ptr<TProtocol> output_;
101
 
102
};
103
 
104
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
105
                                     shared_ptr<TServerTransport> serverTransport,
106
                                     shared_ptr<TTransportFactory> transportFactory,
107
                                     shared_ptr<TProtocolFactory> protocolFactory,
108
                                     shared_ptr<ThreadManager> threadManager) :
109
  TServer(processor, serverTransport, transportFactory, protocolFactory),
110
  threadManager_(threadManager),
111
  stop_(false), timeout_(0) {}
112
 
113
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
114
                                     shared_ptr<TServerTransport> serverTransport,
115
                                     shared_ptr<TTransportFactory> inputTransportFactory,
116
                                     shared_ptr<TTransportFactory> outputTransportFactory,
117
                                     shared_ptr<TProtocolFactory> inputProtocolFactory,
118
                                     shared_ptr<TProtocolFactory> outputProtocolFactory,
119
                                     shared_ptr<ThreadManager> threadManager) :
120
  TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
121
          inputProtocolFactory, outputProtocolFactory),
122
  threadManager_(threadManager),
123
  stop_(false), timeout_(0) {}
124
 
125
 
126
TThreadPoolServer::~TThreadPoolServer() {}
127
 
128
void TThreadPoolServer::serve() {
129
  shared_ptr<TTransport> client;
130
  shared_ptr<TTransport> inputTransport;
131
  shared_ptr<TTransport> outputTransport;
132
  shared_ptr<TProtocol> inputProtocol;
133
  shared_ptr<TProtocol> outputProtocol;
134
 
135
  try {
136
    // Start the server listening
137
    serverTransport_->listen();
138
  } catch (TTransportException& ttx) {
139
    string errStr = string("TThreadPoolServer::run() listen(): ") + ttx.what();
140
    GlobalOutput(errStr.c_str());
141
    return;
142
  }
143
 
144
  // Run the preServe event
145
  if (eventHandler_ != NULL) {
146
    eventHandler_->preServe();
147
  }
148
 
149
  while (!stop_) {
150
    try {
151
      client.reset();
152
      inputTransport.reset();
153
      outputTransport.reset();
154
      inputProtocol.reset();
155
      outputProtocol.reset();
156
 
157
      // Fetch client from server
158
      client = serverTransport_->accept();
159
 
160
      // Make IO transports
161
      inputTransport = inputTransportFactory_->getTransport(client);
162
      outputTransport = outputTransportFactory_->getTransport(client);
163
      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
164
      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
165
 
166
      // Add to threadmanager pool
167
      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
168
 
169
    } catch (TTransportException& ttx) {
170
      if (inputTransport != NULL) { inputTransport->close(); }
171
      if (outputTransport != NULL) { outputTransport->close(); }
172
      if (client != NULL) { client->close(); }
173
      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
174
        string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
175
        GlobalOutput(errStr.c_str());
176
      }
177
      continue;
178
    } catch (TException& tx) {
179
      if (inputTransport != NULL) { inputTransport->close(); }
180
      if (outputTransport != NULL) { outputTransport->close(); }
181
      if (client != NULL) { client->close(); }
182
      string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
183
      GlobalOutput(errStr.c_str());
184
      continue;
185
    } catch (string s) {
186
      if (inputTransport != NULL) { inputTransport->close(); }
187
      if (outputTransport != NULL) { outputTransport->close(); }
188
      if (client != NULL) { client->close(); }
189
      string errStr = "TThreadPoolServer: Unknown exception: " + s;
190
      GlobalOutput(errStr.c_str());
191
      break;
192
    }
193
  }
194
 
195
  // If stopped manually, join the existing threads
196
  if (stop_) {
197
    try {
198
      serverTransport_->close();
199
      threadManager_->join();
200
    } catch (TException &tx) {
201
      string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
202
      GlobalOutput(errStr.c_str());
203
    }
204
    stop_ = false;
205
  }
206
 
207
}
208
 
209
int64_t TThreadPoolServer::getTimeout() const {
210
  return timeout_;
211
}
212
 
213
void TThreadPoolServer::setTimeout(int64_t value) {
214
  timeout_ = value;
215
}
216
 
217
}}} // apache::thrift::server