Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
 
20
#include <concurrency/ThreadManager.h>
21
#include <concurrency/PosixThreadFactory.h>
22
#include <concurrency/Monitor.h>
23
#include <concurrency/Util.h>
24
#include <concurrency/Mutex.h>
25
#include <protocol/TBinaryProtocol.h>
26
#include <server/TSimpleServer.h>
27
#include <server/TThreadPoolServer.h>
28
#include <server/TThreadedServer.h>
29
#include <server/TNonblockingServer.h>
30
#include <transport/TServerSocket.h>
31
#include <transport/TSocket.h>
32
#include <transport/TTransportUtils.h>
33
#include <transport/TFileTransport.h>
34
#include <TLogging.h>
35
 
36
#include "Service.h"
37
 
38
#include <unistd.h>
39
#include <boost/shared_ptr.hpp>
40
 
41
#include <iostream>
42
#include <set>
43
#include <stdexcept>
44
#include <sstream>
45
 
46
#include <map>
47
#include <ext/hash_map>
48
using __gnu_cxx::hash_map;
49
using __gnu_cxx::hash;
50
 
51
using namespace std;
52
using namespace boost;
53
 
54
using namespace apache::thrift;
55
using namespace apache::thrift::protocol;
56
using namespace apache::thrift::transport;
57
using namespace apache::thrift::server;
58
using namespace apache::thrift::concurrency;
59
 
60
using namespace test::stress;
61
 
62
/**
 * Equality predicate for NUL-terminated C strings.
 *
 * Compares the characters the two pointers refer to (via strcmp), not the
 * pointer values themselves. Both arguments are passed straight to strcmp,
 * so neither may be null.
 */
struct eqstr {
  bool operator()(const char* s1, const char* s2) const {
    return strcmp(s1, s2) == 0;
  }
};
67
 
68
/**
 * Strict "less than" ordering for NUL-terminated C strings.
 *
 * Orders by the characters the pointers refer to (via strcmp), not by pointer
 * value, which makes it usable as the comparator of an ordered container
 * keyed on const char*. Both arguments are passed straight to strcmp, so
 * neither may be null.
 */
struct ltstr {
  bool operator()(const char* s1, const char* s2) const {
    return strcmp(s1, s2) < 0;
  }
};
73
 
74
 
75
// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
76
typedef map<const char*, int, ltstr> count_map;
77
 
78
class Server : public ServiceIf {
79
 public:
80
  Server() {}
81
 
82
  void count(const char* method) {
83
    Guard m(lock_);
84
    int ct = counts_[method];
85
    counts_[method] = ++ct;
86
  }
87
 
88
  void echoVoid() {
89
    count("echoVoid");
90
    // Sleep to simulate work
91
    usleep(5000);
92
    return;
93
  }
94
 
95
  count_map getCount() {
96
    Guard m(lock_);
97
    return counts_;
98
  }
99
 
100
  int8_t echoByte(const int8_t arg) {return arg;}
101
  int32_t echoI32(const int32_t arg) {return arg;}
102
  int64_t echoI64(const int64_t arg) {return arg;}
103
  void echoString(string& out, const string &arg) {
104
    if (arg != "hello") {
105
      T_ERROR_ABORT("WRONG STRING!!!!");
106
    }
107
    out = arg;
108
  }
109
  void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; }
110
  void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; }
111
  void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; }
112
 
113
private:
114
  count_map counts_;
115
  Mutex lock_;
116
 
117
};
118
 
119
class ClientThread: public Runnable {
120
public:
121
 
122
  ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
123
    _transport(transport),
124
    _client(client),
125
    _monitor(monitor),
126
    _workerCount(workerCount),
127
    _loopCount(loopCount),
128
    _loopType(loopType)
129
  {}
130
 
131
  void run() {
132
 
133
    // Wait for all worker threads to start
134
 
135
    {Synchronized s(_monitor);
136
        while(_workerCount == 0) {
137
          _monitor.wait();
138
        }
139
    }
140
 
141
    _startTime = Util::currentTime();
142
 
143
    _transport->open();
144
 
145
    switch(_loopType) {
146
    case T_VOID: loopEchoVoid(); break;
147
    case T_BYTE: loopEchoByte(); break;
148
    case T_I32: loopEchoI32(); break;
149
    case T_I64: loopEchoI64(); break;
150
    case T_STRING: loopEchoString(); break;
151
    default: cerr << "Unexpected loop type" << _loopType << endl; break;
152
    }
153
 
154
    _endTime = Util::currentTime();
155
 
156
    _transport->close();
157
 
158
    _done = true;
159
 
160
    {Synchronized s(_monitor);
161
 
162
      _workerCount--;
163
 
164
      if (_workerCount == 0) {
165
 
166
        _monitor.notify();
167
      }
168
    }
169
  }
170
 
171
  void loopEchoVoid() {
172
    for (size_t ix = 0; ix < _loopCount; ix++) {
173
      _client->echoVoid();
174
    }
175
  }
176
 
177
  void loopEchoByte() {
178
    for (size_t ix = 0; ix < _loopCount; ix++) {
179
      int8_t arg = 1;
180
      int8_t result;
181
      result =_client->echoByte(arg);
182
      assert(result == arg);
183
    }
184
  }
185
 
186
  void loopEchoI32() {
187
    for (size_t ix = 0; ix < _loopCount; ix++) {
188
      int32_t arg = 1;
189
      int32_t result;
190
      result =_client->echoI32(arg);
191
      assert(result == arg);
192
    }
193
  }
194
 
195
  void loopEchoI64() {
196
    for (size_t ix = 0; ix < _loopCount; ix++) {
197
      int64_t arg = 1;
198
      int64_t result;
199
      result =_client->echoI64(arg);
200
      assert(result == arg);
201
    }
202
  }
203
 
204
  void loopEchoString() {
205
    for (size_t ix = 0; ix < _loopCount; ix++) {
206
      string arg = "hello";
207
      string result;
208
      _client->echoString(result, arg);
209
      assert(result == arg);
210
    }
211
  }
212
 
213
  shared_ptr<TTransport> _transport;
214
  shared_ptr<ServiceClient> _client;
215
  Monitor& _monitor;
216
  size_t& _workerCount;
217
  size_t _loopCount;
218
  TType _loopType;
219
  long long _startTime;
220
  long long _endTime;
221
  bool _done;
222
  Monitor _sleep;
223
};
224
 
225
 
226
int main(int argc, char **argv) {
227
 
228
  int port = 9091;
229
  string serverType = "simple";
230
  string protocolType = "binary";
231
  size_t workerCount = 4;
232
  size_t clientCount = 20;
233
  size_t loopCount = 50000;
234
  TType loopType  = T_VOID;
235
  string callName = "echoVoid";
236
  bool runServer = true;
237
  bool logRequests = false;
238
  string requestLogPath = "./requestlog.tlog";
239
  bool replayRequests = false;
240
 
241
  ostringstream usage;
242
 
243
  usage <<
244
    argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
245
    "\tclients        Number of client threads to create - 0 implies no clients, i.e. server only.  Default is " << clientCount << endl <<
246
    "\thelp           Prints this help text." << endl <<
247
    "\tcall           Service method to call.  Default is " << callName << endl <<
248
    "\tloop           The number of remote thrift calls each client makes.  Default is " << loopCount << endl <<
249
    "\tport           The port the server and clients should bind to for thrift network connections.  Default is " << port << endl <<
250
    "\tserver         Run the Thrift server in this process.  Default is " << runServer << endl <<
251
    "\tserver-type    Type of server, \"simple\" or \"thread-pool\".  Default is " << serverType << endl <<
252
    "\tprotocol-type  Type of protocol, \"binary\", \"ascii\", or \"xml\".  Default is " << protocolType << endl <<
253
    "\tlog-request    Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
254
    "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
255
    "\tworkers        Number of thread pools workers.  Only valid for thread-pool server type.  Default is " << workerCount << endl;
256
 
257
 
258
  map<string, string>  args;
259
 
260
  for (int ix = 1; ix < argc; ix++) {
261
 
262
    string arg(argv[ix]);
263
 
264
    if (arg.compare(0,2, "--") == 0) {
265
 
266
      size_t end = arg.find_first_of("=", 2);
267
 
268
      string key = string(arg, 2, end - 2);
269
 
270
      if (end != string::npos) {
271
        args[key] = string(arg, end + 1);
272
      } else {
273
        args[key] = "true";
274
      }
275
    } else {
276
      throw invalid_argument("Unexcepted command line token: "+arg);
277
    }
278
  }
279
 
280
  try {
281
 
282
    if (!args["clients"].empty()) {
283
      clientCount = atoi(args["clients"].c_str());
284
    }
285
 
286
    if (!args["help"].empty()) {
287
      cerr << usage.str();
288
      return 0;
289
    }
290
 
291
    if (!args["loop"].empty()) {
292
      loopCount = atoi(args["loop"].c_str());
293
    }
294
 
295
    if (!args["call"].empty()) {
296
      callName = args["call"];
297
    }
298
 
299
    if (!args["port"].empty()) {
300
      port = atoi(args["port"].c_str());
301
    }
302
 
303
    if (!args["server"].empty()) {
304
      runServer = args["server"] == "true";
305
    }
306
 
307
    if (!args["log-request"].empty()) {
308
      logRequests = args["log-request"] == "true";
309
    }
310
 
311
    if (!args["replay-request"].empty()) {
312
      replayRequests = args["replay-request"] == "true";
313
    }
314
 
315
    if (!args["server-type"].empty()) {
316
      serverType = args["server-type"];
317
    }
318
 
319
    if (!args["workers"].empty()) {
320
      workerCount = atoi(args["workers"].c_str());
321
    }
322
 
323
  } catch(exception& e) {
324
    cerr << e.what() << endl;
325
    cerr << usage;
326
  }
327
 
328
  shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
329
 
330
  // Dispatcher
331
  shared_ptr<Server> serviceHandler(new Server());
332
 
333
  if (replayRequests) {
334
    shared_ptr<Server> serviceHandler(new Server());
335
    shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
336
 
337
    // Transports
338
    shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
339
    fileTransport->setChunkSize(2 * 1024 * 1024);
340
    fileTransport->setMaxEventSize(1024 * 16);
341
    fileTransport->seekToEnd();
342
 
343
    // Protocol Factory
344
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
345
 
346
    TFileProcessor fileProcessor(serviceProcessor,
347
                                 protocolFactory,
348
                                 fileTransport);
349
 
350
    fileProcessor.process(0, true);
351
    exit(0);
352
  }
353
 
354
 
355
  if (runServer) {
356
 
357
    shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
358
 
359
    // Protocol Factory
360
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
361
 
362
    // Transport Factory
363
    shared_ptr<TTransportFactory>      transportFactory;
364
 
365
    if (logRequests) {
366
      // initialize the log file
367
      shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
368
      fileTransport->setChunkSize(2 * 1024 * 1024);
369
      fileTransport->setMaxEventSize(1024 * 16);
370
 
371
      transportFactory =
372
        shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
373
    }
374
 
375
    shared_ptr<Thread> serverThread;
376
    shared_ptr<Thread> serverThread2;
377
 
378
    if (serverType == "simple") {
379
 
380
      serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port)));
381
      serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1)));
382
 
383
    } else if (serverType == "thread-pool") {
384
 
385
      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
386
 
387
      threadManager->threadFactory(threadFactory);
388
      threadManager->start();
389
      serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager)));
390
      serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1, threadManager)));
391
    }
392
 
393
    cerr << "Starting the server on port " << port << " and " << (port + 1) << endl;
394
    serverThread->start();
395
    serverThread2->start();
396
 
397
    // If we aren't running clients, just wait forever for external clients
398
 
399
    if (clientCount == 0) {
400
      serverThread->join();
401
      serverThread2->join();
402
    }
403
  }
404
  sleep(1);
405
 
406
  if (clientCount > 0) {
407
 
408
    Monitor monitor;
409
 
410
    size_t threadCount = 0;
411
 
412
    set<shared_ptr<Thread> > clientThreads;
413
 
414
    if (callName == "echoVoid") { loopType = T_VOID;}
415
    else if (callName == "echoByte") { loopType = T_BYTE;}
416
    else if (callName == "echoI32") { loopType = T_I32;}
417
    else if (callName == "echoI64") { loopType = T_I64;}
418
    else if (callName == "echoString") { loopType = T_STRING;}
419
    else {throw invalid_argument("Unknown service call "+callName);}
420
 
421
    for (size_t ix = 0; ix < clientCount; ix++) {
422
 
423
      shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
424
      shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
425
      shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
426
      shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
427
 
428
      clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
429
    }
430
 
431
    for (std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
432
      (*thread)->start();
433
    }
434
 
435
    long long time00;
436
    long long time01;
437
 
438
    {Synchronized s(monitor);
439
      threadCount = clientCount;
440
 
441
      cerr << "Launch "<< clientCount << " client threads" << endl;
442
 
443
      time00 =  Util::currentTime();
444
 
445
      monitor.notifyAll();
446
 
447
      while(threadCount > 0) {
448
        monitor.wait();
449
      }
450
 
451
      time01 =  Util::currentTime();
452
    }
453
 
454
    long long firstTime = 9223372036854775807LL;
455
    long long lastTime = 0;
456
 
457
    double averageTime = 0;
458
    long long minTime = 9223372036854775807LL;
459
    long long maxTime = 0;
460
 
461
    for (set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
462
 
463
      shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
464
 
465
      long long delta = client->_endTime - client->_startTime;
466
 
467
      assert(delta > 0);
468
 
469
      if (client->_startTime < firstTime) {
470
        firstTime = client->_startTime;
471
      }
472
 
473
      if (client->_endTime > lastTime) {
474
        lastTime = client->_endTime;
475
      }
476
 
477
      if (delta < minTime) {
478
        minTime = delta;
479
      }
480
 
481
      if (delta > maxTime) {
482
        maxTime = delta;
483
      }
484
 
485
      averageTime+= delta;
486
    }
487
 
488
    averageTime /= clientCount;
489
 
490
 
491
    cout <<  "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
492
 
493
    count_map count = serviceHandler->getCount();
494
    count_map::iterator iter;
495
    for (iter = count.begin(); iter != count.end(); ++iter) {
496
      printf("%s => %d\n", iter->first, iter->second);
497
    }
498
    cerr << "done." << endl;
499
  }
500
 
501
  return 0;
502
}