Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include "server/TThreadedServer.h"
21
#include "transport/TTransportException.h"
22
#include "concurrency/PosixThreadFactory.h"
23
 
24
#include <string>
25
#include <iostream>
26
#include <pthread.h>
27
#include <unistd.h>
28
 
29
namespace apache { namespace thrift { namespace server {
30
 
31
using boost::shared_ptr;
32
using namespace std;
33
using namespace apache::thrift;
34
using namespace apache::thrift::protocol;
35
using namespace apache::thrift::transport;
36
using namespace apache::thrift::concurrency;
37
 
38
class TThreadedServer::Task: public Runnable {
39
 
40
public:
41
 
42
  Task(TThreadedServer& server,
43
       shared_ptr<TProcessor> processor,
44
       shared_ptr<TProtocol> input,
45
       shared_ptr<TProtocol> output) :
46
    server_(server),
47
    processor_(processor),
48
    input_(input),
49
    output_(output) {
50
  }
51
 
52
  ~Task() {}
53
 
54
  void run() {
55
    boost::shared_ptr<TServerEventHandler> eventHandler =
56
      server_.getEventHandler();
57
    if (eventHandler != NULL) {
58
      eventHandler->clientBegin(input_, output_);
59
    }
60
    try {
61
      while (processor_->process(input_, output_)) {
62
        if (!input_->getTransport()->peek()) {
63
          break;
64
        }
65
      }
66
    } catch (TTransportException& ttx) {
67
      string errStr = string("TThreadedServer client died: ") + ttx.what();
68
      GlobalOutput(errStr.c_str());
69
    } catch (TException& x) {
70
      string errStr = string("TThreadedServer exception: ") + x.what();
71
      GlobalOutput(errStr.c_str());
72
    } catch (...) {
73
      GlobalOutput("TThreadedServer uncaught exception.");
74
    }
75
    if (eventHandler != NULL) {
76
      eventHandler->clientEnd(input_, output_);
77
    }
78
 
79
    try {
80
      input_->getTransport()->close();
81
    } catch (TTransportException& ttx) {
82
      string errStr = string("TThreadedServer input close failed: ") + ttx.what();
83
      GlobalOutput(errStr.c_str());
84
    }
85
    try {
86
      output_->getTransport()->close();
87
    } catch (TTransportException& ttx) {
88
      string errStr = string("TThreadedServer output close failed: ") + ttx.what();
89
      GlobalOutput(errStr.c_str());
90
    }
91
 
92
    // Remove this task from parent bookkeeping
93
    {
94
      Synchronized s(server_.tasksMonitor_);
95
      server_.tasks_.erase(this);
96
      if (server_.tasks_.empty()) {
97
        server_.tasksMonitor_.notify();
98
      }
99
    }
100
 
101
  }
102
 
103
 private:
104
  TThreadedServer& server_;
105
  friend class TThreadedServer;
106
 
107
  shared_ptr<TProcessor> processor_;
108
  shared_ptr<TProtocol> input_;
109
  shared_ptr<TProtocol> output_;
110
};
111
 
112
 
113
TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
114
                                 shared_ptr<TServerTransport> serverTransport,
115
                                 shared_ptr<TTransportFactory> transportFactory,
116
                                 shared_ptr<TProtocolFactory> protocolFactory):
117
  TServer(processor, serverTransport, transportFactory, protocolFactory),
118
  stop_(false) {
119
  threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
120
}
121
 
122
TThreadedServer::TThreadedServer(boost::shared_ptr<TProcessor> processor,
123
                                 boost::shared_ptr<TServerTransport> serverTransport,
124
                                 boost::shared_ptr<TTransportFactory> transportFactory,
125
                                 boost::shared_ptr<TProtocolFactory> protocolFactory,
126
                                 boost::shared_ptr<ThreadFactory> threadFactory):
127
  TServer(processor, serverTransport, transportFactory, protocolFactory),
128
  threadFactory_(threadFactory),
129
  stop_(false) {
130
}
131
 
132
TThreadedServer::~TThreadedServer() {}
133
 
134
void TThreadedServer::serve() {
135
 
136
  shared_ptr<TTransport> client;
137
  shared_ptr<TTransport> inputTransport;
138
  shared_ptr<TTransport> outputTransport;
139
  shared_ptr<TProtocol> inputProtocol;
140
  shared_ptr<TProtocol> outputProtocol;
141
 
142
  try {
143
    // Start the server listening
144
    serverTransport_->listen();
145
  } catch (TTransportException& ttx) {
146
    string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
147
    GlobalOutput(errStr.c_str());
148
    return;
149
  }
150
 
151
  // Run the preServe event
152
  if (eventHandler_ != NULL) {
153
    eventHandler_->preServe();
154
  }
155
 
156
  while (!stop_) {
157
    try {
158
      client.reset();
159
      inputTransport.reset();
160
      outputTransport.reset();
161
      inputProtocol.reset();
162
      outputProtocol.reset();
163
 
164
      // Fetch client from server
165
      client = serverTransport_->accept();
166
 
167
      // Make IO transports
168
      inputTransport = inputTransportFactory_->getTransport(client);
169
      outputTransport = outputTransportFactory_->getTransport(client);
170
      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
171
      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
172
 
173
      TThreadedServer::Task* task = new TThreadedServer::Task(*this,
174
                                                              processor_,
175
                                                              inputProtocol,
176
                                                              outputProtocol);
177
 
178
      // Create a task
179
      shared_ptr<Runnable> runnable =
180
        shared_ptr<Runnable>(task);
181
 
182
      // Create a thread for this task
183
      shared_ptr<Thread> thread =
184
        shared_ptr<Thread>(threadFactory_->newThread(runnable));
185
 
186
      // Insert thread into the set of threads
187
      {
188
        Synchronized s(tasksMonitor_);
189
        tasks_.insert(task);
190
      }
191
 
192
      // Start the thread!
193
      thread->start();
194
 
195
    } catch (TTransportException& ttx) {
196
      if (inputTransport != NULL) { inputTransport->close(); }
197
      if (outputTransport != NULL) { outputTransport->close(); }
198
      if (client != NULL) { client->close(); }
199
      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
200
        string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
201
        GlobalOutput(errStr.c_str());
202
      }
203
      continue;
204
    } catch (TException& tx) {
205
      if (inputTransport != NULL) { inputTransport->close(); }
206
      if (outputTransport != NULL) { outputTransport->close(); }
207
      if (client != NULL) { client->close(); }
208
      string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
209
      GlobalOutput(errStr.c_str());
210
      continue;
211
    } catch (string s) {
212
      if (inputTransport != NULL) { inputTransport->close(); }
213
      if (outputTransport != NULL) { outputTransport->close(); }
214
      if (client != NULL) { client->close(); }
215
      string errStr = "TThreadedServer: Unknown exception: " + s;
216
      GlobalOutput(errStr.c_str());
217
      break;
218
    }
219
  }
220
 
221
  // If stopped manually, make sure to close server transport
222
  if (stop_) {
223
    try {
224
      serverTransport_->close();
225
    } catch (TException &tx) {
226
      string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
227
      GlobalOutput(errStr.c_str());
228
    }
229
    try {
230
      Synchronized s(tasksMonitor_);
231
      while (!tasks_.empty()) {
232
        tasksMonitor_.wait();
233
      }
234
    } catch (TException &tx) {
235
      string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
236
      GlobalOutput(errStr.c_str());
237
    }
238
    stop_ = false;
239
  }
240
 
241
}
242
 
243
}}} // apache::thrift::server