Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <concurrency/ThreadManager.h>
21
#include <concurrency/PosixThreadFactory.h>
22
#include <concurrency/Monitor.h>
23
#include <concurrency/Util.h>
24
#include <concurrency/Mutex.h>
25
#include <protocol/TBinaryProtocol.h>
26
#include <server/TSimpleServer.h>
27
#include <server/TThreadPoolServer.h>
28
#include <server/TThreadedServer.h>
29
#include <transport/TServerSocket.h>
30
#include <transport/TSocket.h>
31
#include <transport/TTransportUtils.h>
32
#include <transport/TFileTransport.h>
33
#include <TLogging.h>
34
 
35
#include "Service.h"
36
 
37
#include <iostream>
38
#include <set>
39
#include <stdexcept>
40
#include <sstream>
41
 
42
#include <map>
43
#include <ext/hash_map>
44
using __gnu_cxx::hash_map;
45
using __gnu_cxx::hash;
46
 
47
using namespace std;
48
using namespace boost;
49
 
50
using namespace apache::thrift;
51
using namespace apache::thrift::protocol;
52
using namespace apache::thrift::transport;
53
using namespace apache::thrift::server;
54
using namespace apache::thrift::concurrency;
55
 
56
using namespace test::stress;
57
 
58
struct eqstr {
59
  bool operator()(const char* s1, const char* s2) const {
60
    return strcmp(s1, s2) == 0;
61
  }
62
};
63
 
64
struct ltstr {
65
  bool operator()(const char* s1, const char* s2) const {
66
    return strcmp(s1, s2) < 0;
67
  }
68
};
69
 
70
 
71
// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
72
typedef map<const char*, int, ltstr> count_map;
73
 
74
class Server : public ServiceIf {
75
 public:
76
  Server() {}
77
 
78
  void count(const char* method) {
79
    Guard m(lock_);
80
    int ct = counts_[method];
81
    counts_[method] = ++ct;
82
  }
83
 
84
  void echoVoid() {
85
    count("echoVoid");
86
    return;
87
  }
88
 
89
  count_map getCount() {
90
    Guard m(lock_);
91
    return counts_;
92
  }
93
 
94
  int8_t echoByte(const int8_t arg) {return arg;}
95
  int32_t echoI32(const int32_t arg) {return arg;}
96
  int64_t echoI64(const int64_t arg) {return arg;}
97
  void echoString(string& out, const string &arg) {
98
    if (arg != "hello") {
99
      T_ERROR_ABORT("WRONG STRING!!!!");
100
    }
101
    out = arg;
102
  }
103
  void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; }
104
  void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; }
105
  void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; }
106
 
107
private:
108
  count_map counts_;
109
  Mutex lock_;
110
 
111
};
112
 
113
class ClientThread: public Runnable {
114
public:
115
 
116
  ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
117
    _transport(transport),
118
    _client(client),
119
    _monitor(monitor),
120
    _workerCount(workerCount),
121
    _loopCount(loopCount),
122
    _loopType(loopType)
123
  {}
124
 
125
  void run() {
126
 
127
    // Wait for all worker threads to start
128
 
129
    {Synchronized s(_monitor);
130
      while(_workerCount == 0) {
131
        _monitor.wait();
132
      }
133
    }
134
 
135
    _startTime = Util::currentTime();
136
 
137
    _transport->open();
138
 
139
    switch(_loopType) {
140
    case T_VOID: loopEchoVoid(); break;
141
    case T_BYTE: loopEchoByte(); break;
142
    case T_I32: loopEchoI32(); break;
143
    case T_I64: loopEchoI64(); break;
144
    case T_STRING: loopEchoString(); break;
145
    default: cerr << "Unexpected loop type" << _loopType << endl; break;
146
    }
147
 
148
    _endTime = Util::currentTime();
149
 
150
    _transport->close();
151
 
152
    _done = true;
153
 
154
    {Synchronized s(_monitor);
155
 
156
      _workerCount--;
157
 
158
      if (_workerCount == 0) {
159
 
160
        _monitor.notify();
161
      }
162
    }
163
  }
164
 
165
  void loopEchoVoid() {
166
    for (size_t ix = 0; ix < _loopCount; ix++) {
167
      _client->echoVoid();
168
    }
169
  }
170
 
171
  void loopEchoByte() {
172
    for (size_t ix = 0; ix < _loopCount; ix++) {
173
      int8_t arg = 1;
174
      int8_t result;
175
      result =_client->echoByte(arg);
176
      assert(result == arg);
177
    }
178
  }
179
 
180
  void loopEchoI32() {
181
    for (size_t ix = 0; ix < _loopCount; ix++) {
182
      int32_t arg = 1;
183
      int32_t result;
184
      result =_client->echoI32(arg);
185
      assert(result == arg);
186
    }
187
  }
188
 
189
  void loopEchoI64() {
190
    for (size_t ix = 0; ix < _loopCount; ix++) {
191
      int64_t arg = 1;
192
      int64_t result;
193
      result =_client->echoI64(arg);
194
      assert(result == arg);
195
    }
196
  }
197
 
198
  void loopEchoString() {
199
    for (size_t ix = 0; ix < _loopCount; ix++) {
200
      string arg = "hello";
201
      string result;
202
      _client->echoString(result, arg);
203
      assert(result == arg);
204
    }
205
  }
206
 
207
  shared_ptr<TTransport> _transport;
208
  shared_ptr<ServiceClient> _client;
209
  Monitor& _monitor;
210
  size_t& _workerCount;
211
  size_t _loopCount;
212
  TType _loopType;
213
  long long _startTime;
214
  long long _endTime;
215
  bool _done;
216
  Monitor _sleep;
217
};
218
 
219
 
220
int main(int argc, char **argv) {
221
 
222
  int port = 9091;
223
  string serverType = "thread-pool";
224
  string protocolType = "binary";
225
  size_t workerCount = 4;
226
  size_t clientCount = 20;
227
  size_t loopCount = 50000;
228
  TType loopType  = T_VOID;
229
  string callName = "echoVoid";
230
  bool runServer = true;
231
  bool logRequests = false;
232
  string requestLogPath = "./requestlog.tlog";
233
  bool replayRequests = false;
234
 
235
  ostringstream usage;
236
 
237
  usage <<
238
    argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
239
    "\tclients        Number of client threads to create - 0 implies no clients, i.e. server only.  Default is " << clientCount << endl <<
240
    "\thelp           Prints this help text." << endl <<
241
    "\tcall           Service method to call.  Default is " << callName << endl <<
242
    "\tloop           The number of remote thrift calls each client makes.  Default is " << loopCount << endl <<
243
    "\tport           The port the server and clients should bind to for thrift network connections.  Default is " << port << endl <<
244
    "\tserver         Run the Thrift server in this process.  Default is " << runServer << endl <<
245
    "\tserver-type    Type of server, \"simple\" or \"thread-pool\".  Default is " << serverType << endl <<
246
    "\tprotocol-type  Type of protocol, \"binary\", \"ascii\", or \"xml\".  Default is " << protocolType << endl <<
247
    "\tlog-request    Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
248
    "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
249
    "\tworkers        Number of thread pools workers.  Only valid for thread-pool server type.  Default is " << workerCount << endl;
250
 
251
 
252
  map<string, string>  args;
253
 
254
  for (int ix = 1; ix < argc; ix++) {
255
 
256
    string arg(argv[ix]);
257
 
258
    if (arg.compare(0,2, "--") == 0) {
259
 
260
      size_t end = arg.find_first_of("=", 2);
261
 
262
      string key = string(arg, 2, end - 2);
263
 
264
      if (end != string::npos) {
265
        args[key] = string(arg, end + 1);
266
      } else {
267
        args[key] = "true";
268
      }
269
    } else {
270
      throw invalid_argument("Unexcepted command line token: "+arg);
271
    }
272
  }
273
 
274
  try {
275
 
276
    if (!args["clients"].empty()) {
277
      clientCount = atoi(args["clients"].c_str());
278
    }
279
 
280
    if (!args["help"].empty()) {
281
      cerr << usage.str();
282
      return 0;
283
    }
284
 
285
    if (!args["loop"].empty()) {
286
      loopCount = atoi(args["loop"].c_str());
287
    }
288
 
289
    if (!args["call"].empty()) {
290
      callName = args["call"];
291
    }
292
 
293
    if (!args["port"].empty()) {
294
      port = atoi(args["port"].c_str());
295
    }
296
 
297
    if (!args["server"].empty()) {
298
      runServer = args["server"] == "true";
299
    }
300
 
301
    if (!args["log-request"].empty()) {
302
      logRequests = args["log-request"] == "true";
303
    }
304
 
305
    if (!args["replay-request"].empty()) {
306
      replayRequests = args["replay-request"] == "true";
307
    }
308
 
309
    if (!args["server-type"].empty()) {
310
      serverType = args["server-type"];
311
 
312
      if (serverType == "simple") {
313
 
314
      } else if (serverType == "thread-pool") {
315
 
316
      } else if (serverType == "threaded") {
317
 
318
      } else {
319
 
320
        throw invalid_argument("Unknown server type "+serverType);
321
      }
322
    }
323
 
324
    if (!args["workers"].empty()) {
325
      workerCount = atoi(args["workers"].c_str());
326
    }
327
 
328
  } catch(exception& e) {
329
    cerr << e.what() << endl;
330
    cerr << usage;
331
  }
332
 
333
  shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
334
 
335
  // Dispatcher
336
  shared_ptr<Server> serviceHandler(new Server());
337
 
338
  if (replayRequests) {
339
    shared_ptr<Server> serviceHandler(new Server());
340
    shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
341
 
342
    // Transports
343
    shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
344
    fileTransport->setChunkSize(2 * 1024 * 1024);
345
    fileTransport->setMaxEventSize(1024 * 16);
346
    fileTransport->seekToEnd();
347
 
348
    // Protocol Factory
349
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
350
 
351
    TFileProcessor fileProcessor(serviceProcessor,
352
                                 protocolFactory,
353
                                 fileTransport);
354
 
355
    fileProcessor.process(0, true);
356
    exit(0);
357
  }
358
 
359
 
360
  if (runServer) {
361
 
362
    shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
363
 
364
    // Transport
365
    shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
366
 
367
    // Transport Factory
368
    shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
369
 
370
    // Protocol Factory
371
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
372
 
373
    if (logRequests) {
374
      // initialize the log file
375
      shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
376
      fileTransport->setChunkSize(2 * 1024 * 1024);
377
      fileTransport->setMaxEventSize(1024 * 16);
378
 
379
      transportFactory =
380
        shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
381
    }
382
 
383
    shared_ptr<Thread> serverThread;
384
 
385
    if (serverType == "simple") {
386
 
387
      serverThread = threadFactory->newThread(shared_ptr<TServer>(new TSimpleServer(serviceProcessor, serverSocket, transportFactory, protocolFactory)));
388
 
389
    } else if (serverType == "threaded") {
390
 
391
      serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadedServer(serviceProcessor, serverSocket, transportFactory, protocolFactory)));
392
 
393
    } else if (serverType == "thread-pool") {
394
 
395
      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
396
 
397
      threadManager->threadFactory(threadFactory);
398
      threadManager->start();
399
      serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(serviceProcessor, serverSocket, transportFactory, protocolFactory, threadManager)));
400
    }
401
 
402
    cerr << "Starting the server on port " << port << endl;
403
 
404
    serverThread->start();
405
 
406
    // If we aren't running clients, just wait forever for external clients
407
 
408
    if (clientCount == 0) {
409
      serverThread->join();
410
    }
411
  }
412
 
413
  if (clientCount > 0) {
414
 
415
    Monitor monitor;
416
 
417
    size_t threadCount = 0;
418
 
419
    set<shared_ptr<Thread> > clientThreads;
420
 
421
    if (callName == "echoVoid") { loopType = T_VOID;}
422
    else if (callName == "echoByte") { loopType = T_BYTE;}
423
    else if (callName == "echoI32") { loopType = T_I32;}
424
    else if (callName == "echoI64") { loopType = T_I64;}
425
    else if (callName == "echoString") { loopType = T_STRING;}
426
    else {throw invalid_argument("Unknown service call "+callName);}
427
 
428
    for (size_t ix = 0; ix < clientCount; ix++) {
429
 
430
      shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
431
      shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
432
      shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
433
      shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
434
 
435
      clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
436
    }
437
 
438
    for (std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
439
      (*thread)->start();
440
    }
441
 
442
    long long time00;
443
    long long time01;
444
 
445
    {Synchronized s(monitor);
446
      threadCount = clientCount;
447
 
448
      cerr << "Launch "<< clientCount << " client threads" << endl;
449
 
450
      time00 =  Util::currentTime();
451
 
452
      monitor.notifyAll();
453
 
454
      while(threadCount > 0) {
455
        monitor.wait();
456
      }
457
 
458
      time01 =  Util::currentTime();
459
    }
460
 
461
    long long firstTime = 9223372036854775807LL;
462
    long long lastTime = 0;
463
 
464
    double averageTime = 0;
465
    long long minTime = 9223372036854775807LL;
466
    long long maxTime = 0;
467
 
468
    for (set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
469
 
470
      shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
471
 
472
      long long delta = client->_endTime - client->_startTime;
473
 
474
      assert(delta > 0);
475
 
476
      if (client->_startTime < firstTime) {
477
        firstTime = client->_startTime;
478
      }
479
 
480
      if (client->_endTime > lastTime) {
481
        lastTime = client->_endTime;
482
      }
483
 
484
      if (delta < minTime) {
485
        minTime = delta;
486
      }
487
 
488
      if (delta > maxTime) {
489
        maxTime = delta;
490
      }
491
 
492
      averageTime+= delta;
493
    }
494
 
495
    averageTime /= clientCount;
496
 
497
 
498
    cout <<  "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
499
 
500
    count_map count = serviceHandler->getCount();
501
    count_map::iterator iter;
502
    for (iter = count.begin(); iter != count.end(); ++iter) {
503
      printf("%s => %d\n", iter->first, iter->second);
504
    }
505
    cerr << "done." << endl;
506
  }
507
 
508
  return 0;
509
}