Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#require File.dirname(__FILE__) + '/spec_helper'class ThriftNonblockingServerSpec < Spec::ExampleGroupinclude Thriftinclude SpecNamespaceclass Handlerdef initialize@queue = Queue.newendattr_accessor :serverdef greeting(english)if englishSpecNamespace::Hello.newelseSpecNamespace::Hello.new(:greeting => "Aloha!")endenddef block@queue.popenddef unblock(n)n.times { @queue.push true }enddef sleep(time)Kernel.sleep timeenddef shutdown@server.shutdown(0, false)endendclass SpecTransport < BaseTransportdef initialize(transport, queue)@transport = transport@queue = queue@flushed = falseenddef open?@transport.open?enddef open@transport.openenddef close@transport.closeenddef read(sz)@transport.read(sz)enddef write(buf,sz=nil)@transport.write(buf, sz)enddef flush@queue.push :flushed unless @flushed or @queue.nil?@flushed = true@transport.flushendendclass SpecServerSocket < ServerSocketdef initialize(host, port, queue)super(host, port)@queue = queueenddef listensuper@queue.push :listenendenddescribe Thrift::NonblockingServer dobefore(:each) do@port = 43251handler = Handler.newprocessor = NonblockingService::Processor.new(handler)queue = Queue.new@transport = SpecServerSocket.new('localhost', @port, queue)transport_factory = FramedTransportFactory.newlogger = Logger.new(STDERR)logger.level = Logger::WARN@server = NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)handler.server = @server@server_thread = Thread.new(Thread.current) do |master_thread|begin@server.serverescue => ep eputs e.backtrace * "\n"master_thread.raise eendendqueue.pop@clients = []@catch_exceptions = falseendafter(:each) do@clients.each { |client, trans| trans.close }# @server.shutdown(1)@server_thread.kill@transport.closeenddef setup_client(queue = nil)transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)protocol = BinaryProtocol.new(transport)client = NonblockingService::Client.new(protocol)transport.open@clients << [client, transport]clientenddef setup_client_thread(result)queue = Queue.newThread.new dobeginclient = setup_clientwhile (cmd = queue.pop)msg, *args = cmdcase msgwhen :blockresult << client.blockwhen :unblockclient.unblock(args.first)when :helloresult << client.greeting(true) # ignore resultwhen :sleepclient.sleep(args[0] || 0.5)result << :sleptwhen :shutdownclient.shutdownwhen :exitresult << :donebreakendend@clients.each { |c,t| t.close and break if c == client } #close the transportrescue => eraise e unless @catch_exceptionsendendqueueendit "should handle basic message passing" doclient = setup_clientclient.greeting(true).should == Hello.newclient.greeting(false).should == Hello.new(:greeting => 'Aloha!')@server.shutdownendit "should handle concurrent clients" doqueue = Queue.newtrans_queue = Queue.new4.times doThread.new(Thread.current) do |main_thread|beginqueue.push setup_client(trans_queue).blockrescue => emain_thread.raise eendendend4.times { trans_queue.pop }setup_client.unblock(4)4.times { queue.pop.should be_true }@server.shutdownendit "should handle messages from more than 5 long-lived connections" doqueues = []result = Queue.new7.times do |i|queues << setup_client_thread(result)Thread.pass if i == 4 # give the server time to accept connectionsendclient = setup_client# block 4 connections4.times { |i| queues[i] << :block }queues[4] << :helloqueues[5] << :helloqueues[6] << :hello3.times { result.pop.should == Hello.new }client.greeting(true).should == Hello.newqueues[5] << [:unblock, 4]4.times { result.pop.should be_true }queues[2] << :helloresult.pop.should == Hello.newclient.greeting(false).should == Hello.new(:greeting => 'Aloha!')7.times { queues.shift << :exit }client.greeting(true).should == Hello.new@server.shutdownendit "should shut down when asked" do# connect first to ensure it's runningclient = setup_clientclient.greeting(false) # force a message pass@server.shutdown@server_thread.join(2).should be_an_instance_of(Thread)endit "should continue processing active messages when shutting down" doresult = Queue.newclient = setup_client_thread(result)client << :sleepsleep 0.1 # give the server time to start processing the client's message@server.shutdown@server_thread.join(2).should be_an_instance_of(Thread)result.pop.should == :sleptendit "should kill active messages when they don't expire while shutting down" doresult = Queue.newclient = setup_client_thread(result)client << [:sleep, 10]sleep 0.1 # start processing the client's message@server.shutdown(1)@catch_exceptions = true@server_thread.join(3).should_not be_nilresult.should be_emptyendit "should allow shutting down in response to a message" doclient = setup_clientclient.greeting(true).should == Hello.newclient.shutdown@server_thread.join(2).should_not be_nilendendend