Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
19
 
20
require File.dirname(__FILE__) + '/spec_helper'
21
 
22
class ThriftNonblockingServerSpec < Spec::ExampleGroup
23
  include Thrift
24
  include SpecNamespace
25
 
26
  # Service handler plugged into NonblockingService::Processor by these specs.
  # Calls to #block wait on an internal queue until #unblock releases them,
  # which lets the examples hold server-side requests open on purpose.
  class Handler
    # Server that #shutdown acts on; assigned by the spec after construction.
    attr_accessor :server

    def initialize
      @queue = Queue.new
    end

    # Returns a default Hello when +english+ is truthy, otherwise a Hello
    # built with the greeting "Aloha!".
    def greeting(english)
      if english
        SpecNamespace::Hello.new
      else
        SpecNamespace::Hello.new(:greeting => "Aloha!")
      end
    end

    # Waits for a token pushed by #unblock and returns it.
    def block
      @queue.pop
    end

    # Pushes +n+ tokens (each +true+) so that up to +n+ pending #block calls
    # can return.
    def unblock(n)
      n.times { @queue.push true }
    end

    # Sleeps for +time+ seconds. Kernel.sleep is called explicitly because
    # this method shadows Kernel#sleep inside the handler.
    def sleep(time)
      Kernel.sleep time
    end

    # Asks the attached server to shut down by calling shutdown(0, false).
    # NOTE(review): presumably a zero timeout and a non-blocking call, so the
    # request that triggered it is not waited on -- confirm against
    # NonblockingServer#shutdown.
    def shutdown
      @server.shutdown(0, false)
    end
  end
57
 
58
  # Transport wrapper used by the specs. Every operation is forwarded to the
  # wrapped transport; in addition, the first call to #flush pushes :flushed
  # onto the supplied queue (when one was given) so an example can tell that
  # the client has sent its first message.
  class SpecTransport < BaseTransport
    # transport:: the transport to delegate to
    # queue::     queue notified on the first flush, or nil for no notification
    def initialize(transport, queue)
      @transport = transport
      @queue = queue
      @flushed = false
    end

    def open?
      @transport.open?
    end

    def open
      @transport.open
    end

    def close
      @transport.close
    end

    def read(sz)
      @transport.read(sz)
    end

    def write(buf, sz = nil)
      @transport.write(buf, sz)
    end

    # Flushes the wrapped transport, signalling the queue only on the first
    # flush and only when a queue was supplied.
    def flush
      @queue.push(:flushed) unless @flushed || @queue.nil?
      @flushed = true
      @transport.flush
    end
  end
91
 
92
  # ServerSocket that pushes :listen onto the given queue once #listen has
  # run, so the spec setup can wait until the server socket is listening
  # before connecting clients.
  class SpecServerSocket < ServerSocket
    def initialize(host, port, queue)
      super(host, port)
      @queue = queue
    end

    # Performs the normal listen, then signals the waiting spec.
    def listen
      super
      @queue.push :listen
    end
  end
103
 
104
  describe Thrift::NonblockingServer do
    # Starts a NonblockingServer on a background thread and waits until its
    # socket is listening before each example runs.
    before(:each) do
      @port = 43251
      handler = Handler.new
      processor = NonblockingService::Processor.new(handler)
      queue = Queue.new
      @transport = SpecServerSocket.new('localhost', @port, queue)
      transport_factory = FramedTransportFactory.new
      logger = Logger.new(STDERR)
      logger.level = Logger::WARN
      # NOTE(review): the 5 is presumably the worker-thread count that the
      # "more than 5 long-lived connections" example relies on -- confirm
      # against NonblockingServer#initialize.
      @server = NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
      handler.server = @server
      @server_thread = Thread.new(Thread.current) do |master_thread|
        begin
          @server.serve
        rescue => e
          # Print the failure and re-raise it in the spec's thread so the
          # example fails instead of hanging.
          p e
          puts e.backtrace * "\n"
          master_thread.raise e
        end
      end
      queue.pop # blocks until SpecServerSocket#listen pushes :listen

      @clients = []
      @catch_exceptions = false
    end

    # Closes every client transport opened by the example, then tears down
    # the server thread and the listening socket.
    after(:each) do
      @clients.each { |client, trans| trans.close }
      # @server.shutdown(1)
      @server_thread.kill
      @transport.close
    end

    # Opens a client connection to the server and registers it in @clients
    # for cleanup. When +queue+ is given it receives :flushed on the client's
    # first flush (see SpecTransport#flush).
    def setup_client(queue = nil)
      transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
      protocol = BinaryProtocol.new(transport)
      client = NonblockingService::Client.new(protocol)
      transport.open
      @clients << [client, transport]
      client
    end

    # Spawns a thread owning one client connection and returns the command
    # queue that drives it. Commands are a symbol or [symbol, *args]; outcomes
    # the examples assert on are pushed onto +result+.
    def setup_client_thread(result)
      queue = Queue.new
      Thread.new do
        begin
          client = setup_client
          while (cmd = queue.pop)
            msg, *args = cmd
            case msg
            when :block
              result << client.block
            when :unblock
              client.unblock(args.first)
            when :hello
              result << client.greeting(true) # pushed so the example can check it
            when :sleep
              client.sleep(args[0] || 0.5)
              result << :slept
            when :shutdown
              client.shutdown
            when :exit
              result << :done
              break
            end
          end
          @clients.each { |c,t| t.close and break if c == client } #close the transport
        rescue => e
          # Examples that expect the connection to be cut set @catch_exceptions.
          raise e unless @catch_exceptions
        end
      end
      queue
    end

    it "should handle basic message passing" do
      client = setup_client
      client.greeting(true).should == Hello.new
      client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
      @server.shutdown
    end

    it "should handle concurrent clients" do
      queue = Queue.new
      trans_queue = Queue.new
      4.times do
        Thread.new(Thread.current) do |main_thread|
          begin
            queue.push setup_client(trans_queue).block
          rescue => e
            main_thread.raise e
          end
        end
      end
      # Wait until all four clients have flushed their block request.
      4.times { trans_queue.pop }
      setup_client.unblock(4)
      4.times { queue.pop.should be_true }
      @server.shutdown
    end

    it "should handle messages from more than 5 long-lived connections" do
      queues = []
      result = Queue.new
      7.times do |i|
        queues << setup_client_thread(result)
        Thread.pass if i == 4 # give the server time to accept connections
      end
      client = setup_client
      # block 4 connections
      4.times { |i| queues[i] << :block }
      queues[4] << :hello
      queues[5] << :hello
      queues[6] << :hello
      3.times { result.pop.should == Hello.new }
      client.greeting(true).should == Hello.new
      queues[5] << [:unblock, 4]
      4.times { result.pop.should be_true }
      queues[2] << :hello
      result.pop.should == Hello.new
      client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
      7.times { queues.shift << :exit }
      client.greeting(true).should == Hello.new
      @server.shutdown
    end

    it "should shut down when asked" do
      # connect first to ensure it's running
      client = setup_client
      client.greeting(false) # force a message pass
      @server.shutdown
      @server_thread.join(2).should be_an_instance_of(Thread)
    end

    it "should continue processing active messages when shutting down" do
      result = Queue.new
      client = setup_client_thread(result)
      client << :sleep
      sleep 0.1 # give the server time to start processing the client's message
      @server.shutdown
      @server_thread.join(2).should be_an_instance_of(Thread)
      result.pop.should == :slept
    end

    it "should kill active messages when they don't expire while shutting down" do
      result = Queue.new
      client = setup_client_thread(result)
      client << [:sleep, 10]
      sleep 0.1 # start processing the client's message
      @server.shutdown(1)
      @catch_exceptions = true
      @server_thread.join(3).should_not be_nil
      result.should be_empty
    end

    it "should allow shutting down in response to a message" do
      client = setup_client
      client.greeting(true).should == Hello.new
      client.shutdown
      @server_thread.join(2).should_not be_nil
    end
  end
265
end