Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
# 
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
# 
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
# 
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
# 
19
 
20
require 'logger'
21
require 'thread'
22
 
23
module Thrift
24
  # this class expects to always use a FramedTransport for reading messages
25
  class NonblockingServer < BaseServer
26
    def initialize(processor, server_transport, transport_factory=nil, protocol_factory=nil, num=20, logger=nil)
27
      super(processor, server_transport, transport_factory, protocol_factory)
28
      @num_threads = num
29
      if logger.nil?
30
        @logger = Logger.new(STDERR)
31
        @logger.level = Logger::WARN
32
      else
33
        @logger = logger
34
      end
35
      @shutdown_semaphore = Mutex.new
36
      @transport_semaphore = Mutex.new
37
    end
38
 
39
    def serve
40
      @logger.info "Starting #{self}"
41
      @server_transport.listen
42
      @io_manager = start_io_manager
43
 
44
      begin
45
        loop do
46
          break if @server_transport.closed?
47
          rd, = select([@server_transport], nil, nil, 0.1)
48
          next if rd.nil?
49
          socket = @server_transport.accept
50
          @logger.debug "Accepted socket: #{socket.inspect}"
51
          @io_manager.add_connection socket
52
        end
53
      rescue IOError => e
54
      end
55
      # we must be shutting down
56
      @logger.info "#{self} is shutting down, goodbye"
57
    ensure
58
      @transport_semaphore.synchronize do
59
        @server_transport.close
60
      end
61
      @io_manager.ensure_closed unless @io_manager.nil?
62
    end
63
 
64
    def shutdown(timeout = 0, block = true)
65
      @shutdown_semaphore.synchronize do
66
        return if @is_shutdown
67
        @is_shutdown = true
68
      end
69
      # nonblocking is intended for calling from within a Handler
70
      # but we can't change the order of operations here, so lets thread
71
      shutdown_proc = lambda do
72
        @io_manager.shutdown(timeout)
73
        @transport_semaphore.synchronize do
74
          @server_transport.close # this will break the accept loop
75
        end
76
      end
77
      if block
78
        shutdown_proc.call
79
      else
80
        Thread.new &shutdown_proc
81
      end
82
    end
83
 
84
    private
85
 
86
    def start_io_manager
87
      iom = IOManager.new(@processor, @server_transport, @transport_factory, @protocol_factory, @num_threads, @logger)
88
      iom.spawn
89
      iom
90
    end
91
 
92
    class IOManager # :nodoc:
93
      DEFAULT_BUFFER = 2**20
94
 
95
      def initialize(processor, server_transport, transport_factory, protocol_factory, num, logger)
96
        @processor = processor
97
        @server_transport = server_transport
98
        @transport_factory = transport_factory
99
        @protocol_factory = protocol_factory
100
        @num_threads = num
101
        @logger = logger
102
        @connections = []
103
        @buffers = Hash.new { |h,k| h[k] = '' }
104
        @signal_queue = Queue.new
105
        @signal_pipes = IO.pipe
106
        @signal_pipes[1].sync = true
107
        @worker_queue = Queue.new
108
        @shutdown_queue = Queue.new
109
      end
110
 
111
      def add_connection(socket)
112
        signal [:connection, socket]
113
      end
114
 
115
      def spawn
116
        @iom_thread = Thread.new do
117
          @logger.debug "Starting #{self}"
118
          run
119
        end
120
      end
121
 
122
      def shutdown(timeout = 0)
123
        @logger.debug "#{self} is shutting down workers"
124
        @worker_queue.clear
125
        @num_threads.times { @worker_queue.push [:shutdown] }
126
        signal [:shutdown, timeout]
127
        @shutdown_queue.pop
128
        @signal_pipes[0].close
129
        @signal_pipes[1].close
130
        @logger.debug "#{self} is shutting down, goodbye"
131
      end
132
 
133
      def ensure_closed
134
        kill_worker_threads if @worker_threads
135
        @iom_thread.kill
136
      end
137
 
138
      private
139
 
140
      def run
141
        spin_worker_threads
142
 
143
        loop do
144
          rd, = select([@signal_pipes[0], *@connections])
145
          if rd.delete @signal_pipes[0]
146
            break if read_signals == :shutdown
147
          end
148
          rd.each do |fd|
149
            if fd.handle.eof?
150
              remove_connection fd
151
            else
152
              read_connection fd
153
            end
154
          end
155
        end
156
        join_worker_threads(@shutdown_timeout)
157
      ensure
158
        @shutdown_queue.push :shutdown
159
      end
160
 
161
      def read_connection(fd)
162
        @buffers[fd] << fd.read(DEFAULT_BUFFER)
163
        frame = slice_frame!(@buffers[fd])
164
        if frame
165
          @logger.debug "#{self} is processing a frame"
166
          @worker_queue.push [:frame, fd, frame]
167
        end
168
      end
169
 
170
      def spin_worker_threads
171
        @logger.debug "#{self} is spinning up worker threads"
172
        @worker_threads = []
173
        @num_threads.times do
174
          @worker_threads << spin_thread
175
        end
176
      end
177
 
178
      def spin_thread
179
        Worker.new(@processor, @transport_factory, @protocol_factory, @logger, @worker_queue).spawn
180
      end
181
 
182
      def signal(msg)
183
        @signal_queue << msg
184
        @signal_pipes[1].write " "
185
      end
186
 
187
      def read_signals
188
        # clear the signal pipe
189
        # note that since read_nonblock is broken in jruby,
190
        # we can only read up to a set number of signals at once
191
        sigstr = @signal_pipes[0].readpartial(1024)
192
        # now read the signals
193
        begin
194
          sigstr.length.times do
195
            signal, obj = @signal_queue.pop(true)
196
            case signal
197
            when :connection
198
              @connections << obj
199
            when :shutdown
200
              @shutdown_timeout = obj
201
              return :shutdown
202
            end
203
          end
204
        rescue ThreadError
205
          # out of signals
206
          # note that in a perfect world this would never happen, since we're
207
          # only reading the number of signals pushed on the pipe, but given the lack
208
          # of locks, in theory we could clear the pipe/queue while a new signal is being
209
          # placed on the pipe, at which point our next read_signals would hit this error
210
        end
211
      end
212
 
213
      def remove_connection(fd)
214
        # don't explicitly close it, a thread may still be writing to it
215
        @connections.delete fd
216
        @buffers.delete fd
217
      end
218
 
219
      def join_worker_threads(shutdown_timeout)
220
        start = Time.now
221
        @worker_threads.each do |t|
222
          if shutdown_timeout > 0
223
            timeout = (start + shutdown_timeout) - Time.now
224
            break if timeout <= 0
225
            t.join(timeout)
226
          else
227
            t.join
228
          end
229
        end
230
        kill_worker_threads
231
      end
232
 
233
      def kill_worker_threads
234
        @worker_threads.each do |t|
235
          t.kill if t.status
236
        end
237
        @worker_threads.clear
238
      end
239
 
240
      def slice_frame!(buf)
241
        if buf.length >= 4
242
          size = buf.unpack('N').first
243
          if buf.length >= size + 4
244
            buf.slice!(0, size + 4)
245
          else
246
            nil
247
          end
248
        else
249
          nil
250
        end
251
      end
252
 
253
      class Worker # :nodoc:
254
        def initialize(processor, transport_factory, protocol_factory, logger, queue)
255
          @processor = processor
256
          @transport_factory = transport_factory
257
          @protocol_factory = protocol_factory
258
          @logger = logger
259
          @queue = queue
260
        end
261
 
262
        def spawn
263
          Thread.new do
264
            @logger.debug "#{self} is spawning"
265
            run
266
          end
267
        end
268
 
269
        private
270
 
271
        def run
272
          loop do
273
            cmd, *args = @queue.pop
274
            case cmd
275
            when :shutdown
276
              @logger.debug "#{self} is shutting down, goodbye"
277
              break
278
            when :frame
279
              fd, frame = args
280
              begin
281
                otrans = @transport_factory.get_transport(fd)
282
                oprot = @protocol_factory.get_protocol(otrans)
283
                membuf = MemoryBufferTransport.new(frame)
284
                itrans = @transport_factory.get_transport(membuf)
285
                iprot = @protocol_factory.get_protocol(itrans)
286
                @processor.process(iprot, oprot)
287
              rescue => e
288
                @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"
289
              end
290
            end
291
          end
292
        end
293
      end
294
    end
295
  end
296
end