Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
# 
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
# 
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
# 
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
# 
19
 
20
require 'thread'
21
 
22
module Thrift
23
  class ThreadPoolServer < BaseServer
24
    def initialize(processor, server_transport, transport_factory=nil, protocol_factory=nil, num=20)
25
      super(processor, server_transport, transport_factory, protocol_factory)
26
      @thread_q = SizedQueue.new(num)
27
      @exception_q = Queue.new
28
      @running = false
29
    end
30
 
31
    ## exceptions that happen in worker threads will be relayed here and
32
    ## must be caught. 'retry' can be used to continue. (threads will
33
    ## continue to run while the exception is being handled.)
34
    def rescuable_serve
35
      Thread.new { serve } unless @running
36
      @running = true
37
      raise @exception_q.pop
38
    end
39
 
40
    ## exceptions that happen in worker threads simply cause that thread
41
    ## to die and another to be spawned in its place.
42
    def serve
43
      @server_transport.listen
44
 
45
      begin
46
        loop do
47
          @thread_q.push(:token)
48
          Thread.new do
49
            begin
50
              loop do
51
                client = @server_transport.accept
52
                trans = @transport_factory.get_transport(client)
53
                prot = @protocol_factory.get_protocol(trans)
54
                begin
55
                  loop do
56
                    @processor.process(prot, prot)
57
                  end
58
                rescue Thrift::TransportException, Thrift::ProtocolException => e
59
                ensure
60
                  trans.close
61
                end
62
              end
63
            rescue => e
64
              @exception_q.push(e)
65
            ensure
66
              @thread_q.pop # thread died!
67
            end
68
          end
69
        end
70
      ensure
71
        @server_transport.close
72
      end
73
    end
74
  end
75
end