Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
 
20
require 'rubygems'
21
$:.unshift File.dirname(__FILE__) + '/../lib'
22
require 'thrift'
23
require 'stringio'
24
 
25
HOST = '127.0.0.1'
26
PORT = 42587
27
 
28
###############
29
## Server
30
###############
31
 
32
class Server
33
  attr_accessor :serverclass
34
  attr_accessor :interpreter
35
  attr_accessor :host
36
  attr_accessor :port
37
 
38
  def initialize(opts)
39
    @serverclass = opts.fetch(:class, Thrift::NonblockingServer)
40
    @interpreter = opts.fetch(:interpreter, "ruby")
41
    @host = opts.fetch(:host, ::HOST)
42
    @port = opts.fetch(:port, ::PORT)
43
  end
44
 
45
  def start
46
    return if @serverclass == Object
47
    args = (File.basename(@interpreter) == "jruby" ? "-J-server" : "")
48
    @pipe = IO.popen("#{@interpreter} #{args} #{File.dirname(__FILE__)}/server.rb #{@host} #{@port} #{@serverclass.name}", "r+")
49
    Marshal.load(@pipe) # wait until the server has started
50
    sleep 0.4 # give the server time to actually start spawning sockets
51
  end
52
 
53
  def shutdown
54
    return unless @pipe
55
    Marshal.dump(:shutdown, @pipe)
56
    begin
57
      @pipe.read(10) # block until the server shuts down
58
    rescue EOFError
59
    end
60
    @pipe.close
61
    @pipe = nil
62
  end
63
end
64
 
65
class BenchmarkManager
66
  def initialize(opts, server)
67
    @socket = opts.fetch(:socket) do
68
      @host = opts.fetch(:host, 'localhost')
69
      @port = opts.fetch(:port)
70
      nil
71
    end
72
    @num_processes = opts.fetch(:num_processes, 40)
73
    @clients_per_process = opts.fetch(:clients_per_process, 10)
74
    @calls_per_client = opts.fetch(:calls_per_client, 50)
75
    @interpreter = opts.fetch(:interpreter, "ruby")
76
    @server = server
77
    @log_exceptions = opts.fetch(:log_exceptions, false)
78
  end
79
 
80
  def run
81
    @pool = []
82
    @benchmark_start = Time.now
83
    puts "Spawning benchmark processes..."
84
    @num_processes.times do
85
      spawn
86
      sleep 0.02 # space out spawns
87
    end
88
    collect_output
89
    @benchmark_end = Time.now # we know the procs are done here
90
    translate_output
91
    analyze_output
92
    report_output
93
  end
94
 
95
  def spawn
96
    pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{"-log-exceptions" if @log_exceptions} #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}")
97
    @pool << pipe
98
  end
99
 
100
  def socket_class
101
    if @socket
102
      Thrift::UNIXSocket
103
    else
104
      Thrift::Socket
105
    end
106
  end
107
 
108
  def collect_output
109
    puts "Collecting output..."
110
    # read from @pool until all sockets are closed
111
    @buffers = Hash.new { |h,k| h[k] = '' }
112
    until @pool.empty?
113
      rd, = select(@pool)
114
      next if rd.nil?
115
      rd.each do |fd|
116
        begin
117
          @buffers[fd] << fd.readpartial(4096)
118
        rescue EOFError
119
          @pool.delete fd
120
        end
121
      end
122
    end
123
  end
124
 
125
  def translate_output
126
    puts "Translating output..."
127
    @output = []
128
    @buffers.each do |fd, buffer|
129
      strio = StringIO.new(buffer)
130
      logs = []
131
      begin
132
        loop do
133
          logs << Marshal.load(strio)
134
        end
135
      rescue EOFError
136
        @output << logs
137
      end
138
    end
139
  end
140
 
141
  def analyze_output
142
    puts "Analyzing output..."
143
    call_times = []
144
    client_times = []
145
    connection_failures = []
146
    connection_errors = []
147
    shortest_call = 0
148
    shortest_client = 0
149
    longest_call = 0
150
    longest_client = 0
151
    @output.each do |logs|
152
      cur_call, cur_client = nil
153
      logs.each do |tok, time|
154
        case tok
155
        when :start
156
          cur_client = time
157
        when :call_start
158
          cur_call = time
159
        when :call_end
160
          delta = time - cur_call
161
          call_times << delta
162
          longest_call = delta unless longest_call > delta
163
          shortest_call = delta if shortest_call == 0 or delta < shortest_call
164
          cur_call = nil
165
        when :end
166
          delta = time - cur_client
167
          client_times << delta
168
          longest_client = delta unless longest_client > delta
169
          shortest_client = delta if shortest_client == 0 or delta < shortest_client
170
          cur_client = nil
171
        when :connection_failure
172
          connection_failures << time
173
        when :connection_error
174
          connection_errors << time
175
        end
176
      end
177
    end
178
    @report = {}
179
    @report[:total_calls] = call_times.inject(0.0) { |a,t| a += t }
180
    @report[:avg_calls] = @report[:total_calls] / call_times.size
181
    @report[:total_clients] = client_times.inject(0.0) { |a,t| a += t }
182
    @report[:avg_clients] = @report[:total_clients] / client_times.size
183
    @report[:connection_failures] = connection_failures.size
184
    @report[:connection_errors] = connection_errors.size
185
    @report[:shortest_call] = shortest_call
186
    @report[:shortest_client] = shortest_client
187
    @report[:longest_call] = longest_call
188
    @report[:longest_client] = longest_client
189
    @report[:total_benchmark_time] = @benchmark_end - @benchmark_start
190
    @report[:fastthread] = $".include?('fastthread.bundle')
191
  end
192
 
193
  def report_output
194
    fmt = "%.4f seconds"
195
    puts
196
    tabulate "%d",
197
             [["Server class", "%s"], @server.serverclass == Object ? "" : @server.serverclass],
198
             [["Server interpreter", "%s"], @server.interpreter],
199
             [["Client interpreter", "%s"], @interpreter],
200
             [["Socket class", "%s"], socket_class],
201
             ["Number of processes", @num_processes],
202
             ["Clients per process", @clients_per_process],
203
             ["Calls per client", @calls_per_client],
204
             [["Using fastthread", "%s"], @report[:fastthread] ? "yes" : "no"]
205
    puts
206
    failures = (@report[:connection_failures] > 0)
207
    tabulate fmt,
208
             [["Connection failures", "%d", [:red, :bold]], @report[:connection_failures]],
209
             [["Connection errors", "%d", [:red, :bold]], @report[:connection_errors]],
210
             ["Average time per call", @report[:avg_calls]],
211
             ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
212
             ["Total time for all calls", @report[:total_calls]],
213
             ["Real time for benchmarking", @report[:total_benchmark_time]],
214
             ["Shortest call time", @report[:shortest_call]],
215
             ["Longest call time", @report[:longest_call]],
216
             ["Shortest client time (%d calls)" % @calls_per_client, @report[:shortest_client]],
217
             ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
218
  end
219
 
220
  ANSI = {
221
    :reset => 0,
222
    :bold => 1,
223
    :black => 30,
224
    :red => 31,
225
    :green => 32,
226
    :yellow => 33,
227
    :blue => 34,
228
    :magenta => 35,
229
    :cyan => 36,
230
    :white => 37
231
  }
232
 
233
  def tabulate(fmt, *labels_and_values)
234
    labels = labels_and_values.map { |l| Array === l ? l.first : l }
235
    label_width = labels.inject(0) { |w,l| l.size > w ? l.size : w }
236
    labels_and_values.each do |(l,v)|
237
      f = fmt
238
      l, f, c = l if Array === l
239
      fmtstr = "%-#{label_width+1}s #{f}"
240
      if STDOUT.tty? and c and v.to_i > 0
241
        fmtstr = "\e[#{[*c].map { |x| ANSI[x] } * ";"}m" + fmtstr + "\e[#{ANSI[:reset]}m"
242
      end
243
      puts fmtstr % [l+":", v]
244
    end
245
  end
246
end
247
 
248
def resolve_const(const)
249
  const and const.split('::').inject(Object) { |k,c| k.const_get(c) }
250
end
251
 
252
puts "Starting server..."
253
args = {}
254
args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
255
args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
256
args[:host] = ENV['THRIFT_HOST'] || HOST
257
args[:port] = (ENV['THRIFT_PORT'] || PORT).to_i
258
server = Server.new(args)
259
server.start
260
 
261
args = {}
262
args[:host] = ENV['THRIFT_HOST'] || HOST
263
args[:port] = (ENV['THRIFT_PORT'] || PORT).to_i
264
args[:num_processes] = (ENV['THRIFT_NUM_PROCESSES'] || 40).to_i
265
args[:clients_per_process] = (ENV['THRIFT_NUM_CLIENTS'] || 5).to_i
266
args[:calls_per_client] = (ENV['THRIFT_NUM_CALLS'] || 50).to_i
267
args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
268
args[:log_exceptions] = !!ENV['THRIFT_LOG_EXCEPTIONS']
269
BenchmarkManager.new(args, server).run
270
 
271
server.shutdown