# Subversion Repositories SmartDukaan
#
# Rev
#
# Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed

# 
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# 

require 'logger'
require 'thread'

module Thrift
  # this class expects to always use a FramedTransport for reading messages
  class NonblockingServer < BaseServer
    # Builds a nonblocking server.
    #
    # processor, server_transport, transport_factory and protocol_factory are
    # passed unchanged to the superclass constructor. +num+ is the number of
    # worker threads handed to the IO manager, and +logger+ is the logger to
    # report to; when it is nil a Logger writing WARN and above to STDERR is
    # created instead.
    def initialize(processor, server_transport, transport_factory=nil, protocol_factory=nil, num=20, logger=nil)
      super(processor, server_transport, transport_factory, protocol_factory)
      @shutdown_semaphore = Mutex.new
      @transport_semaphore = Mutex.new
      @num_threads = num
      @logger = logger
      if @logger.nil?
        @logger = Logger.new(STDERR)
        @logger.level = Logger::WARN
      end
    end

    # Runs the accept loop until the server transport is closed.
    #
    # Listens on the server transport, starts the IO manager, then polls the
    # transport with a 0.1 second select timeout and hands every accepted
    # socket to the IO manager. On the way out, whether normally or because
    # of an error, the server transport is closed and the IO manager is torn
    # down.
    def serve
      @logger.info "Starting #{self}"
      @server_transport.listen
      @io_manager = start_io_manager

      begin
        loop do
          break if @server_transport.closed?
          begin
            rd, = select([@server_transport], nil, nil, 0.1)
          rescue Errno::EBADF
            # select can raise EBADF when the transport's descriptor has been
            # closed while we were polling it (e.g. by #shutdown running on
            # another thread); that is a shutdown, not a failure
            break
          end
          next if rd.nil?
          socket = @server_transport.accept
          @logger.debug "Accepted socket: #{socket.inspect}"
          @io_manager.add_connection socket
        end
      rescue IOError
        # presumably the transport was closed underneath us; either way we
        # fall through to the regular shutdown path below
      end
      # we must be shutting down
      @logger.info "#{self} is shutting down, goodbye"
    ensure
      @transport_semaphore.synchronize do
        @server_transport.close
      end
      @io_manager.ensure_closed unless @io_manager.nil?
    end

    # Stops the server; only the first call does anything.
    #
    # timeout:: passed through to the IO manager's shutdown
    # block::   when true the shutdown runs on the calling thread, otherwise
    #           it runs on a newly started thread, which is returned
    def shutdown(timeout = 0, block = true)
      already_down = @shutdown_semaphore.synchronize do
        previous = @is_shutdown
        @is_shutdown = true
        previous
      end
      return if already_down

      # the nonblocking mode is meant for calls made from within a Handler;
      # the order of the two steps below cannot change, hence the thread
      stop_everything = lambda do
        @io_manager.shutdown(timeout)
        # closing the transport is what breaks the accept loop
        @transport_semaphore.synchronize { @server_transport.close }
      end

      return stop_everything.call if block
      Thread.new(&stop_everything)
    end

    private

    # Creates the IOManager for this server from the server's own
    # collaborators, starts it and returns the manager.
    def start_io_manager
      manager = IOManager.new(
        @processor,
        @server_transport,
        @transport_factory,
        @protocol_factory,
        @num_threads,
        @logger
      )
      manager.spawn
      manager
    end

    class IOManager # :nodoc:
      DEFAULT_BUFFER = 2**20
      
      # Stores the collaborators and creates the bookkeeping structures: the
      # list of watched connections, a per-connection read buffer, the signal
      # queue with its wake-up pipe, and the worker and shutdown queues.
      def initialize(processor, server_transport, transport_factory, protocol_factory, num, logger)
        @processor, @server_transport = processor, server_transport
        @transport_factory, @protocol_factory = transport_factory, protocol_factory
        @num_threads, @logger = num, logger

        @connections = []
        # each connection gets its own, initially empty, buffer on first use
        @buffers = Hash.new { |buffers, conn| buffers[conn] = '' }

        @signal_queue = Queue.new
        reader, writer = IO.pipe
        # unbuffered, so a written wake-up byte is visible to select at once
        writer.sync = true
        @signal_pipes = [reader, writer]

        @worker_queue = Queue.new
        @shutdown_queue = Queue.new
      end

      # Asks the IO thread to start watching +socket+. The socket itself is
      # not touched here; it is sent along as a :connection signal.
      def add_connection(socket)
        signal([:connection, socket])
      end

      # Starts the IO thread, which logs a debug line and then enters #run.
      # The thread is kept in @iom_thread and is also the return value.
      def spawn
        @iom_thread = Thread.new do
          @logger.debug("Starting #{self}")
          run
        end
      end

      # Stops the workers and the IO thread, then closes the signal pipe.
      #
      # Any queued work is discarded and one [:shutdown] command per worker is
      # queued in its place. The IO thread is then signalled with +timeout+,
      # and this call blocks until something arrives on the shutdown queue.
      def shutdown(timeout = 0)
        @logger.debug("#{self} is shutting down workers")
        @worker_queue.clear
        @num_threads.times { @worker_queue << [:shutdown] }
        signal([:shutdown, timeout])
        @shutdown_queue.pop
        reader, writer = @signal_pipes
        reader.close
        writer.close
        @logger.debug("#{self} is shutting down, goodbye")
      end

      # Forcibly tears the manager down: kills the workers, if any were ever
      # started, and then the IO thread itself.
      def ensure_closed
        if @worker_threads
          kill_worker_threads
        end
        @iom_thread.kill
      end

      private
      
      # Body of the IO thread.
      #
      # Starts the workers, then selects over the signal pipe and every
      # watched connection until a :shutdown signal is read. Connections at
      # EOF, or reset by their peer, are dropped; readable ones have their
      # data buffered. After the loop the workers are joined. Whatever
      # happens, :shutdown is pushed onto the shutdown queue on the way out
      # so that a caller waiting on that queue is released.
      def run
        spin_worker_threads

        loop do
          rd, = select([@signal_pipes[0], *@connections])
          if rd.delete @signal_pipes[0]
            break if read_signals == :shutdown
          end
          rd.each do |fd|
            begin
              if fd.handle.eof?
                remove_connection fd
              else
                read_connection fd
              end
            rescue Errno::ECONNRESET
              # the peer reset this connection; forget it instead of letting
              # the error end the IO loop for every other connection
              remove_connection fd
            end
          end
        end
        join_worker_threads(@shutdown_timeout)
      ensure
        @shutdown_queue.push :shutdown
      end

      # Reads up to DEFAULT_BUFFER bytes from +fd+ into its buffer and queues
      # every complete frame now held there for the workers.
      #
      # A single read can deliver several frames, so the buffer is drained in
      # a loop; otherwise the extra frames would sit unprocessed until more
      # data happened to arrive on the same connection. A trailing partial
      # frame stays buffered.
      def read_connection(fd)
        buffer = @buffers[fd]
        buffer << fd.read(DEFAULT_BUFFER)
        while (frame = slice_frame!(buffer))
          @logger.debug "#{self} is processing a frame"
          @worker_queue.push [:frame, fd, frame]
        end
      end

      # Starts @num_threads workers and collects what spin_thread returns for
      # each of them in @worker_threads.
      def spin_worker_threads
        @logger.debug("#{self} is spinning up worker threads")
        @worker_threads = []
        @num_threads.times { @worker_threads.push(spin_thread) }
      end

      # Builds one Worker that shares this manager's processor, factories,
      # logger and work queue, starts it, and returns the result of
      # Worker#spawn.
      def spin_thread
        worker = Worker.new(@processor, @transport_factory, @protocol_factory, @logger, @worker_queue)
        worker.spawn
      end

      # Hands +msg+ to the IO thread: the message goes onto the signal queue
      # and a single byte is written to the pipe so that a select watching
      # the read end wakes up.
      def signal(msg)
        @signal_queue.push(msg)
        _reader, writer = @signal_pipes
        writer.write(" ")
      end

      # Drains the wake-up pipe and handles one queued signal per byte read.
      #
      # A :connection signal adds its socket to the watched connections. A
      # :shutdown signal stores the requested timeout in @shutdown_timeout and
      # makes this method return :shutdown immediately.
      def read_signals
        # empty the signal pipe; because read_nonblock is broken in jruby we
        # can only pick up a bounded number of signals per call
        wakeups = @signal_pipes[0].readpartial(1024)
        # now take the matching number of entries off the signal queue
        begin
          wakeups.length.times do
            kind, payload = @signal_queue.pop(true)
            case kind
            when :connection then @connections << payload
            when :shutdown
              @shutdown_timeout = payload
              return :shutdown
            end
          end
        rescue ThreadError
          # the queue ran dry. Ideally this never happens, since we only pop
          # as many signals as bytes were read from the pipe; but nothing is
          # locked, so the pipe/queue could be cleared while a new signal is
          # still being placed on the pipe, and the following read_signals
          # would then end up here
        end
      end

      # Stops watching +fd+ and discards whatever was buffered for it.
      def remove_connection(fd)
        # the socket is deliberately left open: a worker thread may still be
        # writing to it
        @connections.delete(fd)
        @buffers.delete(fd)
      end

      # Waits for the workers to finish, then kills whichever are left.
      #
      # shutdown_timeout:: when greater than zero, the total time budget
      #                    shared by all joins, measured from the moment this
      #                    method starts; otherwise every worker is joined
      #                    without a limit
      def join_worker_threads(shutdown_timeout)
        began_at = Time.now
        @worker_threads.each do |worker|
          unless shutdown_timeout > 0
            worker.join
            next
          end
          remaining = (began_at + shutdown_timeout) - Time.now
          # budget used up: stop waiting and leave the rest to the kill below
          break if remaining <= 0
          worker.join(remaining)
        end
        kill_worker_threads
      end

      # Kills every worker whose status is still truthy and then empties the
      # worker list.
      def kill_worker_threads
        @worker_threads.each do |worker|
          next unless worker.status
          worker.kill
        end
        @worker_threads.clear
      end

      # Cuts the first complete frame off the front of +buf+.
      #
      # A frame is a 4-byte big-endian length followed by that many bytes of
      # payload. Returns the frame, header included, and removes it from
      # +buf+; returns nil and leaves the contents alone when +buf+ does not
      # yet hold a whole frame.
      #
      # The length prefix counts bytes, so +buf+ is switched to binary
      # encoding first. With a multibyte encoding, String#length and
      # String#slice! would count characters and cut the frame in the wrong
      # place.
      def slice_frame!(buf)
        if buf.respond_to?(:force_encoding) && buf.encoding != Encoding::BINARY
          buf.force_encoding(Encoding::BINARY)
        end
        return nil if buf.length < 4

        size = buf.unpack('N').first
        return nil if buf.length < size + 4

        buf.slice!(0, size + 4)
      end

      class Worker # :nodoc:
        # Keeps the processor, the two factories, the logger and the queue
        # this worker takes its commands from.
        def initialize(processor, transport_factory, protocol_factory, logger, queue)
          @processor, @logger, @queue = processor, logger, queue
          @transport_factory = transport_factory
          @protocol_factory = protocol_factory
        end

        # Starts the command loop on a new thread and returns that thread.
        def spawn
          Thread.new do
            @logger.debug("#{self} is spawning")
            run
          end
        end

        private

        # Pops commands off the queue until a :shutdown command arrives.
        # :frame commands are processed; anything else is ignored.
        def run
          loop do
            command, *payload = @queue.pop
            case command
            when :shutdown
              @logger.debug("#{self} is shutting down, goodbye")
              break
            when :frame
              fd, frame = payload
              process_frame(fd, frame)
            end
          end
        end

        # Runs the processor over one frame: input is read from an in-memory
        # transport wrapping +frame+, output goes to a transport wrapping
        # +fd+. Errors are logged and swallowed so that the worker keeps
        # serving later frames.
        def process_frame(fd, frame)
          out_transport = @transport_factory.get_transport(fd)
          out_protocol = @protocol_factory.get_protocol(out_transport)
          frame_buffer = MemoryBufferTransport.new(frame)
          in_transport = @transport_factory.get_transport(frame_buffer)
          in_protocol = @protocol_factory.get_protocol(in_transport)
          @processor.process(in_protocol, out_protocol)
        rescue => e
          @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"
        end
      end
    end
  end
end