| 30 |
ashish |
1 |
#
|
|
|
2 |
# Licensed to the Apache Software Foundation (ASF) under one
|
|
|
3 |
# or more contributor license agreements. See the NOTICE file
|
|
|
4 |
# distributed with this work for additional information
|
|
|
5 |
# regarding copyright ownership. The ASF licenses this file
|
|
|
6 |
# to you under the Apache License, Version 2.0 (the
|
|
|
7 |
# "License"); you may not use this file except in compliance
|
|
|
8 |
# with the License. You may obtain a copy of the License at
|
|
|
9 |
#
|
|
|
10 |
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
11 |
#
|
|
|
12 |
# Unless required by applicable law or agreed to in writing,
|
|
|
13 |
# software distributed under the License is distributed on an
|
|
|
14 |
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
15 |
# KIND, either express or implied. See the License for the
|
|
|
16 |
# specific language governing permissions and limitations
|
|
|
17 |
# under the License.
|
|
|
18 |
#
|
|
|
19 |
|
|
|
20 |
require 'rubygems'
|
|
|
21 |
$:.unshift File.dirname(__FILE__) + '/../lib'
|
|
|
22 |
require 'thrift'
|
|
|
23 |
require 'stringio'
|
|
|
24 |
|
|
|
25 |
# Default host used for the benchmark server and its clients when neither an
# explicit :host option nor the THRIFT_HOST environment variable is given.
HOST = '127.0.0.1'
|
|
|
26 |
# Default port used for the benchmark server and its clients when neither an
# explicit :port option nor the THRIFT_PORT environment variable is given.
PORT = 42587
|
|
|
27 |
|
|
|
28 |
###############
|
|
|
29 |
## Server
|
|
|
30 |
###############
|
|
|
31 |
|
|
|
32 |
class Server
|
|
|
33 |
attr_accessor :serverclass
|
|
|
34 |
attr_accessor :interpreter
|
|
|
35 |
attr_accessor :host
|
|
|
36 |
attr_accessor :port
|
|
|
37 |
|
|
|
38 |
def initialize(opts)
|
|
|
39 |
@serverclass = opts.fetch(:class, Thrift::NonblockingServer)
|
|
|
40 |
@interpreter = opts.fetch(:interpreter, "ruby")
|
|
|
41 |
@host = opts.fetch(:host, ::HOST)
|
|
|
42 |
@port = opts.fetch(:port, ::PORT)
|
|
|
43 |
end
|
|
|
44 |
|
|
|
45 |
def start
|
|
|
46 |
return if @serverclass == Object
|
|
|
47 |
args = (File.basename(@interpreter) == "jruby" ? "-J-server" : "")
|
|
|
48 |
@pipe = IO.popen("#{@interpreter} #{args} #{File.dirname(__FILE__)}/server.rb #{@host} #{@port} #{@serverclass.name}", "r+")
|
|
|
49 |
Marshal.load(@pipe) # wait until the server has started
|
|
|
50 |
sleep 0.4 # give the server time to actually start spawning sockets
|
|
|
51 |
end
|
|
|
52 |
|
|
|
53 |
def shutdown
|
|
|
54 |
return unless @pipe
|
|
|
55 |
Marshal.dump(:shutdown, @pipe)
|
|
|
56 |
begin
|
|
|
57 |
@pipe.read(10) # block until the server shuts down
|
|
|
58 |
rescue EOFError
|
|
|
59 |
end
|
|
|
60 |
@pipe.close
|
|
|
61 |
@pipe = nil
|
|
|
62 |
end
|
|
|
63 |
end
|
|
|
64 |
|
|
|
65 |
# Drives one benchmark run: spawns client processes (client.rb next to this
# file), gathers the Marshal-encoded event logs they write to their standard
# output, derives timing statistics from them and prints a report.
class BenchmarkManager
  # opts   - Hash of settings:
  #   :socket              - optional; when present the report shows
  #                          Thrift::UNIXSocket and :host/:port are not read.
  #   :host                - client target host (default 'localhost').
  #   :port                - client target port; required unless :socket is
  #                          given (a missing key raises from Hash#fetch).
  #   :num_processes       - client processes to spawn (default 40).
  #   :clients_per_process - passed through to client.rb (default 10).
  #   :calls_per_client    - passed through to client.rb (default 50).
  #   :interpreter         - interpreter command for client.rb (default "ruby").
  #   :log_exceptions      - pass -log-exceptions to client.rb (default false).
  # server - object responding to #serverclass and #interpreter; only used
  #          for the report header.
  def initialize(opts, server)
    @socket = opts.fetch(:socket) do
      @host = opts.fetch(:host, 'localhost')
      @port = opts.fetch(:port)
      nil
    end
    @num_processes = opts.fetch(:num_processes, 40)
    @clients_per_process = opts.fetch(:clients_per_process, 10)
    @calls_per_client = opts.fetch(:calls_per_client, 50)
    @interpreter = opts.fetch(:interpreter, "ruby")
    @server = server
    @log_exceptions = opts.fetch(:log_exceptions, false)
  end

  # Runs the whole benchmark: spawn, collect, decode, analyze, report.
  def run
    @pool = []
    @benchmark_start = Time.now
    puts "Spawning benchmark processes..."
    @num_processes.times do
      spawn
      sleep 0.02 # space out spawns
    end
    collect_output
    @benchmark_end = Time.now # we know the procs are done here
    translate_output
    analyze_output
    report_output
  end

  # Starts one client process and adds its output pipe to @pool.
  def spawn
    pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{"-log-exceptions" if @log_exceptions} #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}")
    @pool << pipe
  end

  # Socket class named in the report header.
  def socket_class
    if @socket
      Thrift::UNIXSocket
    else
      Thrift::Socket
    end
  end

  # Reads from every pipe in @pool until each one reaches end of file,
  # accumulating the raw bytes per pipe in @buffers.
  def collect_output
    puts "Collecting output..."
    # String.new gives every pipe its own mutable, binary buffer
    @buffers = Hash.new { |h, k| h[k] = String.new }
    until @pool.empty?
      rd, = select(@pool)
      next if rd.nil?
      rd.each do |fd|
        begin
          @buffers[fd] << fd.readpartial(4096)
        rescue EOFError
          @pool.delete fd
          # close the finished pipe so its descriptor is released and the
          # child process is reaped instead of lingering until exit
          fd.close
        end
      end
    end
  end

  # Decodes each raw buffer into the list of objects the client dumped and
  # stores one list per client process in @output.
  def translate_output
    puts "Translating output..."
    @output = []
    @buffers.each do |fd, buffer|
      strio = StringIO.new(buffer)
      logs = []
      begin
        # NOTE(review): Marshal.load is only appropriate because the data is
        # produced by the client processes this class spawned itself.
        loop do
          logs << Marshal.load(strio)
        end
      rescue EOFError
        # end of the buffer: everything from this process has been decoded
        @output << logs
      end
    end
  end

  # Walks the decoded [token, time] events and fills @report.
  #
  # :start/:end pairs produce client durations, :call_start/:call_end pairs
  # produce call durations, and :connection_failure / :connection_error
  # events are counted. Shortest/longest values come from the collected
  # durations themselves (0 when there are none), so a genuine zero-length
  # duration is reported correctly. Averages are 0.0 rather than NaN when
  # nothing completed.
  def analyze_output
    puts "Analyzing output..."
    call_times = []
    client_times = []
    connection_failures = []
    connection_errors = []
    @output.each do |logs|
      cur_call, cur_client = nil
      logs.each do |tok, time|
        case tok
        when :start
          cur_client = time
        when :call_start
          cur_call = time
        when :call_end
          call_times << (time - cur_call)
          cur_call = nil
        when :end
          client_times << (time - cur_client)
          cur_client = nil
        when :connection_failure
          connection_failures << time
        when :connection_error
          connection_errors << time
        end
      end
    end
    total_calls = call_times.inject(0.0) { |a, t| a + t }
    total_clients = client_times.inject(0.0) { |a, t| a + t }
    @report = {}
    @report[:total_calls] = total_calls
    @report[:avg_calls] = call_times.empty? ? 0.0 : total_calls / call_times.size
    @report[:total_clients] = total_clients
    @report[:avg_clients] = client_times.empty? ? 0.0 : total_clients / client_times.size
    @report[:connection_failures] = connection_failures.size
    @report[:connection_errors] = connection_errors.size
    @report[:shortest_call] = call_times.min || 0
    @report[:shortest_client] = client_times.min || 0
    @report[:longest_call] = call_times.max || 0
    @report[:longest_client] = client_times.max || 0
    @report[:total_benchmark_time] = @benchmark_end - @benchmark_start
    # loaded features may be recorded as absolute paths, so compare basenames
    @report[:fastthread] = $LOADED_FEATURES.any? { |f| File.basename(f) == 'fastthread.bundle' }
  end

  # Prints the configuration table followed by the results table.
  def report_output
    fmt = "%.4f seconds"
    puts
    tabulate "%d",
             [["Server class", "%s"], @server.serverclass == Object ? "" : @server.serverclass],
             [["Server interpreter", "%s"], @server.interpreter],
             [["Client interpreter", "%s"], @interpreter],
             [["Socket class", "%s"], socket_class],
             ["Number of processes", @num_processes],
             ["Clients per process", @clients_per_process],
             ["Calls per client", @calls_per_client],
             [["Using fastthread", "%s"], @report[:fastthread] ? "yes" : "no"]
    puts
    tabulate fmt,
             [["Connection failures", "%d", [:red, :bold]], @report[:connection_failures]],
             [["Connection errors", "%d", [:red, :bold]], @report[:connection_errors]],
             ["Average time per call", @report[:avg_calls]],
             ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
             ["Total time for all calls", @report[:total_calls]],
             ["Real time for benchmarking", @report[:total_benchmark_time]],
             ["Shortest call time", @report[:shortest_call]],
             ["Longest call time", @report[:longest_call]],
             ["Shortest client time (%d calls)" % @calls_per_client, @report[:shortest_client]],
             ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
  end

  # SGR parameter numbers used to build terminal escape sequences.
  ANSI = {
    :reset => 0,
    :bold => 1,
    :black => 30,
    :red => 31,
    :green => 32,
    :yellow => 33,
    :blue => 34,
    :magenta => 35,
    :cyan => 36,
    :white => 37
  }

  # Prints one "label: value" line per entry with the values aligned in a
  # single column.
  #
  # fmt               - default format string for values.
  # labels_and_values - [label, value] pairs. The label is either a String
  #                     or an Array [text, value_format, colours] that
  #                     overrides fmt for that row; colours (a Symbol or an
  #                     Array of ANSI keys) are applied only when STDOUT is
  #                     a terminal and the value converts to a positive
  #                     integer.
  #
  # The column width is taken from the label text of every row, including
  # rows whose label is given in the Array form.
  def tabulate(fmt, *labels_and_values)
    label_width = labels_and_values.map { |(l, _)| (Array === l ? l.first : l).size }.max || 0
    labels_and_values.each do |(l, v)|
      f = fmt
      l, f, c = l if Array === l
      fmtstr = "%-#{label_width + 1}s #{f}"
      if STDOUT.tty? && c && v.to_i > 0
        fmtstr = "\e[#{[*c].map { |x| ANSI[x] } * ";"}m" + fmtstr + "\e[#{ANSI[:reset]}m"
      end
      puts fmtstr % [l + ":", v]
    end
  end
end
|
|
|
247 |
|
|
|
248 |
# Resolves a "::"-separated constant path to the constant it names, looking
# each segment up in turn starting from Object.
#
# const - String such as "Thrift::NonblockingServer", or nil. Empty segments
#         are skipped, so an absolute path with a leading "::" is accepted.
#
# Returns the resolved constant; returns const itself when it is nil or
# false, and Object for an empty string.
# Raises NameError when a segment does not name a defined constant.
def resolve_const(const)
  return const unless const
  const.split('::').reject { |part| part.empty? }.inject(Object) { |scope, part| scope.const_get(part) }
end
|
|
|
251 |
|
|
|
252 |
# Script entry point. Starts the server, runs the benchmark against it and
# shuts the server down again. Configuration comes from the environment:
#   THRIFT_SERVER_INTERPRETER / THRIFT_CLIENT_INTERPRETER / THRIFT_INTERPRETER
#   THRIFT_SERVER, THRIFT_HOST, THRIFT_PORT
#   THRIFT_NUM_PROCESSES, THRIFT_NUM_CLIENTS, THRIFT_NUM_CALLS
#   THRIFT_LOG_EXCEPTIONS
puts "Starting server..."
args = {}
args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
args[:host] = ENV['THRIFT_HOST'] || HOST
args[:port] = (ENV['THRIFT_PORT'] || PORT).to_i
server = Server.new(args)
server.start

begin
  args = {}
  args[:host] = ENV['THRIFT_HOST'] || HOST
  args[:port] = (ENV['THRIFT_PORT'] || PORT).to_i
  args[:num_processes] = (ENV['THRIFT_NUM_PROCESSES'] || 40).to_i
  args[:clients_per_process] = (ENV['THRIFT_NUM_CLIENTS'] || 5).to_i
  args[:calls_per_client] = (ENV['THRIFT_NUM_CALLS'] || 50).to_i
  args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
  args[:log_exceptions] = !!ENV['THRIFT_LOG_EXCEPTIONS']
  BenchmarkManager.new(args, server).run
ensure
  # always stop the server process, even when the benchmark raised, so it is
  # not left running after this script exits
  server.shutdown
end
|