Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#require 'logger'require 'thread'module Thrift# this class expects to always use a FramedTransport for reading messagesclass NonblockingServer < BaseServerdef initialize(processor, server_transport, transport_factory=nil, protocol_factory=nil, num=20, logger=nil)super(processor, server_transport, transport_factory, protocol_factory)@num_threads = numif logger.nil?@logger = Logger.new(STDERR)@logger.level = Logger::WARNelse@logger = loggerend@shutdown_semaphore = Mutex.new@transport_semaphore = Mutex.newenddef serve@logger.info "Starting #{self}"@server_transport.listen@io_manager = start_io_managerbeginloop dobreak if @server_transport.closed?rd, = select([@server_transport], nil, nil, 0.1)next if rd.nil?socket = @server_transport.accept@logger.debug "Accepted socket: #{socket.inspect}"@io_manager.add_connection socketendrescue IOError => eend# we must be shutting down@logger.info "#{self} is shutting down, goodbye"ensure@transport_semaphore.synchronize do@server_transport.closeend@io_manager.ensure_closed unless @io_manager.nil?enddef shutdown(timeout = 0, block = true)@shutdown_semaphore.synchronize doreturn if @is_shutdown@is_shutdown = trueend# nonblocking is intended for calling from within a Handler# but we can't change the order of operations here, so lets threadshutdown_proc = lambda do@io_manager.shutdown(timeout)@transport_semaphore.synchronize do@server_transport.close # this will break the accept loopendendif blockshutdown_proc.callelseThread.new &shutdown_procendendprivatedef start_io_manageriom = IOManager.new(@processor, @server_transport, @transport_factory, @protocol_factory, @num_threads, @logger)iom.spawniomendclass IOManager # :nodoc:DEFAULT_BUFFER = 2**20def initialize(processor, server_transport, transport_factory, protocol_factory, num, logger)@processor = processor@server_transport = server_transport@transport_factory = transport_factory@protocol_factory = protocol_factory@num_threads = num@logger = logger@connections = []@buffers = Hash.new { |h,k| h[k] = '' }@signal_queue = Queue.new@signal_pipes = IO.pipe@signal_pipes[1].sync = true@worker_queue = Queue.new@shutdown_queue = Queue.newenddef add_connection(socket)signal [:connection, socket]enddef spawn@iom_thread = Thread.new do@logger.debug "Starting #{self}"runendenddef shutdown(timeout = 0)@logger.debug "#{self} is shutting down workers"@worker_queue.clear@num_threads.times { @worker_queue.push [:shutdown] }signal [:shutdown, timeout]@shutdown_queue.pop@signal_pipes[0].close@signal_pipes[1].close@logger.debug "#{self} is shutting down, goodbye"enddef ensure_closedkill_worker_threads if @worker_threads@iom_thread.killendprivatedef runspin_worker_threadsloop dord, = select([@signal_pipes[0], *@connections])if rd.delete @signal_pipes[0]break if read_signals == :shutdownendrd.each do |fd|if fd.handle.eof?remove_connection fdelseread_connection fdendendendjoin_worker_threads(@shutdown_timeout)ensure@shutdown_queue.push :shutdownenddef read_connection(fd)@buffers[fd] << fd.read(DEFAULT_BUFFER)frame = slice_frame!(@buffers[fd])if frame@logger.debug "#{self} is processing a frame"@worker_queue.push [:frame, fd, frame]endenddef spin_worker_threads@logger.debug "#{self} is spinning up worker threads"@worker_threads = []@num_threads.times do@worker_threads << spin_threadendenddef spin_threadWorker.new(@processor, @transport_factory, @protocol_factory, @logger, @worker_queue).spawnenddef signal(msg)@signal_queue << msg@signal_pipes[1].write " "enddef read_signals# clear the signal pipe# note that since read_nonblock is broken in jruby,# we can only read up to a set number of signals at oncesigstr = @signal_pipes[0].readpartial(1024)# now read the signalsbeginsigstr.length.times dosignal, obj = @signal_queue.pop(true)case signalwhen :connection@connections << objwhen :shutdown@shutdown_timeout = objreturn :shutdownendendrescue ThreadError# out of signals# note that in a perfect world this would never happen, since we're# only reading the number of signals pushed on the pipe, but given the lack# of locks, in theory we could clear the pipe/queue while a new signal is being# placed on the pipe, at which point our next read_signals would hit this errorendenddef remove_connection(fd)# don't explicitly close it, a thread may still be writing to it@connections.delete fd@buffers.delete fdenddef join_worker_threads(shutdown_timeout)start = Time.now@worker_threads.each do |t|if shutdown_timeout > 0timeout = (start + shutdown_timeout) - Time.nowbreak if timeout <= 0t.join(timeout)elset.joinendendkill_worker_threadsenddef kill_worker_threads@worker_threads.each do |t|t.kill if t.statusend@worker_threads.clearenddef slice_frame!(buf)if buf.length >= 4size = buf.unpack('N').firstif buf.length >= size + 4buf.slice!(0, size + 4)elsenilendelsenilendendclass Worker # :nodoc:def initialize(processor, transport_factory, protocol_factory, logger, queue)@processor = processor@transport_factory = transport_factory@protocol_factory = protocol_factory@logger = logger@queue = queueenddef spawnThread.new do@logger.debug "#{self} is spawning"runendendprivatedef runloop docmd, *args = @queue.popcase cmdwhen :shutdown@logger.debug "#{self} is shutting down, goodbye"breakwhen :framefd, frame = argsbeginotrans = @transport_factory.get_transport(fd)oprot = @protocol_factory.get_protocol(otrans)membuf = MemoryBufferTransport.new(frame)itrans = @transport_factory.get_transport(membuf)iprot = @protocol_factory.get_protocol(itrans)@processor.process(iprot, oprot)rescue => e@logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"endendendendendendendend