Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
 
21
package org.apache.thrift.server;
22
 
23
import java.util.concurrent.ExecutorService;
24
import java.util.concurrent.LinkedBlockingQueue;
25
import java.util.concurrent.ThreadPoolExecutor;
26
import java.util.concurrent.TimeUnit;
27
 
28
import org.apache.thrift.TProcessor;
29
import org.apache.thrift.TProcessorFactory;
30
import org.apache.thrift.protocol.TBinaryProtocol;
31
import org.apache.thrift.protocol.TProtocolFactory;
32
import org.apache.thrift.transport.TFramedTransport;
33
import org.apache.thrift.transport.TNonblockingServerTransport;
34
 
35
/**
36
 * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
37
 * Like TNonblockingServer, it relies on the use of TFramedTransport.
38
 */
39
public class THsHaServer extends TNonblockingServer {
40
 
41
  // This wraps all the functionality of queueing and thread pool management
42
  // for the passing of Invocations from the Selector to workers.
43
  private ExecutorService invoker;
44
 
45
  protected final int MIN_WORKER_THREADS;
46
  protected final int MAX_WORKER_THREADS;
47
  protected final int STOP_TIMEOUT_VAL;
48
  protected final TimeUnit STOP_TIMEOUT_UNIT;
49
 
50
  /**
51
   * Create server with given processor, and server transport. Default server
52
   * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on
53
   * both input and output transports. A TProcessorFactory will be created that
54
   * always returns the specified processor.
55
   */
56
  public THsHaServer( TProcessor processor,
57
                      TNonblockingServerTransport serverTransport) {
58
    this(processor, serverTransport, new Options());
59
  }
60
 
61
  /**
62
   * Create server with given processor, server transport, and server options
63
   * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on
64
   * both input and output transports. A TProcessorFactory will be created that
65
   * always returns the specified processor.
66
   */
67
  public THsHaServer( TProcessor processor,
68
                      TNonblockingServerTransport serverTransport,
69
                      Options options) {
70
    this(new TProcessorFactory(processor), serverTransport, options);
71
  }
72
 
73
  /**
74
   * Create server with specified processor factory and server transport. Uses
75
   * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is
76
   * used on both input and output transports.
77
   */
78
  public THsHaServer( TProcessorFactory processorFactory,
79
                      TNonblockingServerTransport serverTransport) {
80
    this(processorFactory, serverTransport, new Options());
81
  }
82
 
83
  /**
84
   * Create server with specified processor factory, server transport, and server
85
   * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on
86
   * both input and output transports.
87
   */
88
  public THsHaServer( TProcessorFactory processorFactory,
89
                      TNonblockingServerTransport serverTransport,
90
                      Options options) {
91
    this(processorFactory, serverTransport, new TFramedTransport.Factory(),
92
      new TBinaryProtocol.Factory(), options);
93
  }
94
 
95
  /**
96
   * Server with specified processor, server transport, and in/out protocol
97
   * factory. Defaults will be used for in/out transport factory and server
98
   * options.
99
   */
100
  public THsHaServer( TProcessor processor,
101
                      TNonblockingServerTransport serverTransport,
102
                      TProtocolFactory protocolFactory) {
103
    this(processor, serverTransport, protocolFactory, new Options());
104
  }
105
 
106
  /**
107
   * Server with specified processor, server transport, and in/out protocol
108
   * factory. Defaults will be used for in/out transport factory and server
109
   * options.
110
   */
111
  public THsHaServer( TProcessor processor,
112
                      TNonblockingServerTransport serverTransport,
113
                      TProtocolFactory protocolFactory,
114
                      Options options) {
115
    this(processor, serverTransport, new TFramedTransport.Factory(),
116
      protocolFactory);
117
  }
118
 
119
  /**
120
   * Create server with specified processor, server transport, in/out
121
   * transport factory, in/out protocol factory, and default server options. A
122
   * processor factory will be created that always returns the specified
123
   * processor.
124
   */
125
  public THsHaServer( TProcessor processor,
126
                      TNonblockingServerTransport serverTransport,
127
                      TFramedTransport.Factory transportFactory,
128
                      TProtocolFactory protocolFactory) {
129
    this(new TProcessorFactory(processor), serverTransport,
130
      transportFactory, protocolFactory);
131
  }
132
 
133
  /**
134
   * Create server with specified processor factory, server transport, in/out
135
   * transport factory, in/out protocol factory, and default server options.
136
   */
137
  public THsHaServer( TProcessorFactory processorFactory,
138
                      TNonblockingServerTransport serverTransport,
139
                      TFramedTransport.Factory transportFactory,
140
                      TProtocolFactory protocolFactory) {
141
    this(processorFactory, serverTransport,
142
      transportFactory, transportFactory,
143
      protocolFactory, protocolFactory, new Options());
144
  }
145
 
146
  /**
147
   * Create server with specified processor factory, server transport, in/out
148
   * transport factory, in/out protocol factory, and server options.
149
   */
150
  public THsHaServer( TProcessorFactory processorFactory,
151
                      TNonblockingServerTransport serverTransport,
152
                      TFramedTransport.Factory transportFactory,
153
                      TProtocolFactory protocolFactory,
154
                      Options options) {
155
    this(processorFactory, serverTransport,
156
      transportFactory, transportFactory,
157
      protocolFactory, protocolFactory,
158
      options);
159
  }
160
 
161
  /**
162
   * Create server with everything specified, except use default server options.
163
   */
164
  public THsHaServer( TProcessor processor,
165
                      TNonblockingServerTransport serverTransport,
166
                      TFramedTransport.Factory inputTransportFactory,
167
                      TFramedTransport.Factory outputTransportFactory,
168
                      TProtocolFactory inputProtocolFactory,
169
                      TProtocolFactory outputProtocolFactory) {
170
    this(new TProcessorFactory(processor), serverTransport,
171
      inputTransportFactory, outputTransportFactory,
172
      inputProtocolFactory, outputProtocolFactory);
173
  }
174
 
175
  /**
176
   * Create server with everything specified, except use default server options.
177
   */
178
  public THsHaServer( TProcessorFactory processorFactory,
179
                      TNonblockingServerTransport serverTransport,
180
                      TFramedTransport.Factory inputTransportFactory,
181
                      TFramedTransport.Factory outputTransportFactory,
182
                      TProtocolFactory inputProtocolFactory,
183
                      TProtocolFactory outputProtocolFactory)
184
  {
185
    this(processorFactory, serverTransport,
186
      inputTransportFactory, outputTransportFactory,
187
      inputProtocolFactory, outputProtocolFactory, new Options());
188
  }
189
 
190
  /**
191
   * Create server with every option fully specified.
192
   */
193
  public THsHaServer( TProcessorFactory processorFactory,
194
                      TNonblockingServerTransport serverTransport,
195
                      TFramedTransport.Factory inputTransportFactory,
196
                      TFramedTransport.Factory outputTransportFactory,
197
                      TProtocolFactory inputProtocolFactory,
198
                      TProtocolFactory outputProtocolFactory,
199
                      Options options)
200
  {
201
    super(processorFactory, serverTransport,
202
      inputTransportFactory, outputTransportFactory,
203
      inputProtocolFactory, outputProtocolFactory,
204
      options);
205
 
206
    MIN_WORKER_THREADS = options.minWorkerThreads;
207
    MAX_WORKER_THREADS = options.maxWorkerThreads;
208
    STOP_TIMEOUT_VAL = options.stopTimeoutVal;
209
    STOP_TIMEOUT_UNIT = options.stopTimeoutUnit;
210
  }
211
 
212
  /** @inheritDoc */
213
  @Override
214
  public void serve() {
215
    if (!startInvokerPool()) {
216
      return;
217
    }
218
 
219
    // start listening, or exit
220
    if (!startListening()) {
221
      return;
222
    }
223
 
224
    // start the selector, or exit
225
    if (!startSelectorThread()) {
226
      return;
227
    }
228
 
229
    // this will block while we serve
230
    joinSelector();
231
 
232
    gracefullyShutdownInvokerPool();
233
 
234
    // do a little cleanup
235
    stopListening();
236
 
237
    // ungracefully shut down the invoker pool?
238
  }
239
 
240
  protected boolean startInvokerPool() {
241
    // start the invoker pool
242
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
243
    invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS,
244
      STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue);
245
 
246
    return true;
247
  }
248
 
249
  protected void gracefullyShutdownInvokerPool() {
250
    // try to gracefully shut down the executor service
251
    invoker.shutdown();
252
 
253
    // Loop until awaitTermination finally does return without a interrupted
254
    // exception. If we don't do this, then we'll shut down prematurely. We want
255
    // to let the executorService clear it's task queue, closing client sockets
256
    // appropriately.
257
    long timeoutMS = 10000;
258
    long now = System.currentTimeMillis();
259
    while (timeoutMS >= 0) {
260
      try {
261
        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
262
        break;
263
      } catch (InterruptedException ix) {
264
        long newnow = System.currentTimeMillis();
265
        timeoutMS -= (newnow - now);
266
        now = newnow;
267
      }
268
    }
269
  }
270
 
271
  /**
272
   * We override the standard invoke method here to queue the invocation for
273
   * invoker service instead of immediately invoking. The thread pool takes care of the rest.
274
   */
275
  @Override
276
  protected void requestInvoke(FrameBuffer frameBuffer) {
277
    invoker.execute(new Invocation(frameBuffer));
278
  }
279
 
280
  /**
281
   * An Invocation represents a method call that is prepared to execute, given
282
   * an idle worker thread. It contains the input and output protocols the
283
   * thread's processor should use to perform the usual Thrift invocation.
284
   */
285
  private class Invocation implements Runnable {
286
 
287
    private final FrameBuffer frameBuffer;
288
 
289
    public Invocation(final FrameBuffer frameBuffer) {
290
      this.frameBuffer = frameBuffer;
291
    }
292
 
293
    public void run() {
294
      frameBuffer.invoke();
295
    }
296
  }
297
 
298
  public static class Options extends TNonblockingServer.Options {
299
    public int minWorkerThreads = 5;
300
    public int maxWorkerThreads = Integer.MAX_VALUE;
301
    public int stopTimeoutVal = 60;
302
    public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
303
  }
304
}