Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <config.h>
21
#include <concurrency/ThreadManager.h>
22
#include <concurrency/PosixThreadFactory.h>
23
#include <concurrency/Monitor.h>
24
#include <concurrency/Util.h>
25
 
26
#include <assert.h>
27
#include <set>
28
#include <iostream>
29
#include <set>
30
#include <stdint.h>
31
 
32
namespace apache { namespace thrift { namespace concurrency { namespace test {
33
 
34
using namespace apache::thrift::concurrency;
35
 
36
/**
37
 * ThreadManagerTests class
38
 *
39
 * @version $Id:$
40
 */
41
class ThreadManagerTests {
42
 
43
public:
44
 
45
  static const double ERROR;
46
 
47
  class Task: public Runnable {
48
 
49
  public:
50
 
51
    Task(Monitor& monitor, size_t& count, int64_t timeout) :
52
      _monitor(monitor),
53
      _count(count),
54
      _timeout(timeout),
55
      _done(false) {}
56
 
57
    void run() {
58
 
59
      _startTime = Util::currentTime();
60
 
61
      {
62
        Synchronized s(_sleep);
63
 
64
        try {
65
          _sleep.wait(_timeout);
66
        } catch(TimedOutException& e) {
67
          ;
68
        }catch(...) {
69
          assert(0);
70
        }
71
      }
72
 
73
      _endTime = Util::currentTime();
74
 
75
      _done = true;
76
 
77
      {
78
        Synchronized s(_monitor);
79
 
80
        // std::cout << "Thread " << _count << " completed " << std::endl;
81
 
82
        _count--;
83
 
84
        if (_count == 0) {
85
 
86
          _monitor.notify();
87
        }
88
      }
89
    }
90
 
91
    Monitor& _monitor;
92
    size_t& _count;
93
    int64_t _timeout;
94
    int64_t _startTime;
95
    int64_t _endTime;
96
    bool _done;
97
    Monitor _sleep;
98
  };
99
 
100
  /**
101
   * Dispatch count tasks, each of which blocks for timeout milliseconds then
102
   * completes. Verify that all tasks completed and that thread manager cleans
103
   * up properly on delete.
104
   */
105
  bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
106
 
107
    Monitor monitor;
108
 
109
    size_t activeCount = count;
110
 
111
    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
112
 
113
    shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
114
 
115
    threadFactory->setPriority(PosixThreadFactory::HIGHEST);
116
 
117
    threadManager->threadFactory(threadFactory);
118
 
119
    threadManager->start();
120
 
121
    std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
122
 
123
    for (size_t ix = 0; ix < count; ix++) {
124
 
125
      tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
126
    }
127
 
128
    int64_t time00 = Util::currentTime();
129
 
130
    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
131
 
132
        threadManager->add(*ix);
133
    }
134
 
135
    {
136
      Synchronized s(monitor);
137
 
138
      while(activeCount > 0) {
139
 
140
        monitor.wait();
141
      }
142
    }
143
 
144
    int64_t time01 = Util::currentTime();
145
 
146
    int64_t firstTime = 9223372036854775807LL;
147
    int64_t lastTime = 0;
148
 
149
    double averageTime = 0;
150
    int64_t minTime = 9223372036854775807LL;
151
    int64_t maxTime = 0;
152
 
153
    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
154
 
155
      shared_ptr<ThreadManagerTests::Task> task = *ix;
156
 
157
      int64_t delta = task->_endTime - task->_startTime;
158
 
159
      assert(delta > 0);
160
 
161
      if (task->_startTime < firstTime) {
162
        firstTime = task->_startTime;
163
      }
164
 
165
      if (task->_endTime > lastTime) {
166
        lastTime = task->_endTime;
167
      }
168
 
169
      if (delta < minTime) {
170
        minTime = delta;
171
      }
172
 
173
      if (delta > maxTime) {
174
        maxTime = delta;
175
      }
176
 
177
      averageTime+= delta;
178
    }
179
 
180
    averageTime /= count;
181
 
182
    std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
183
 
184
    double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
185
 
186
    double error = ((time01 - time00) - expectedTime) / expectedTime;
187
 
188
    if (error < 0) {
189
      error*= -1.0;
190
    }
191
 
192
    bool success = error < ERROR;
193
 
194
    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
195
 
196
    return success;
197
  }
198
 
199
  class BlockTask: public Runnable {
200
 
201
  public:
202
 
203
    BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
204
      _monitor(monitor),
205
      _bmonitor(bmonitor),
206
      _count(count) {}
207
 
208
    void run() {
209
      {
210
        Synchronized s(_bmonitor);
211
 
212
        _bmonitor.wait();
213
 
214
      }
215
 
216
      {
217
        Synchronized s(_monitor);
218
 
219
        _count--;
220
 
221
        if (_count == 0) {
222
 
223
          _monitor.notify();
224
        }
225
      }
226
    }
227
 
228
    Monitor& _monitor;
229
    Monitor& _bmonitor;
230
    size_t& _count;
231
  };
232
 
233
  /**
234
   * Block test.  Create pendingTaskCountMax tasks.  Verify that we block adding the
235
   * pendingTaskCountMax + 1th task.  Verify that we unblock when a task completes */
236
 
237
  bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
238
 
239
    bool success = false;
240
 
241
    try {
242
 
243
      Monitor bmonitor;
244
      Monitor monitor;
245
 
246
      size_t pendingTaskMaxCount = workerCount;
247
 
248
      size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
249
 
250
      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
251
 
252
      shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
253
 
254
      threadFactory->setPriority(PosixThreadFactory::HIGHEST);
255
 
256
      threadManager->threadFactory(threadFactory);
257
 
258
      threadManager->start();
259
 
260
      std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
261
 
262
      for (size_t ix = 0; ix < workerCount; ix++) {
263
 
264
        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
265
      }
266
 
267
      for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
268
 
269
        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
270
      }
271
 
272
      for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
273
        threadManager->add(*ix);
274
      }
275
 
276
      if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
277
        throw TException("Unexpected pending task count");
278
      }
279
 
280
      shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
281
 
282
      try {
283
        threadManager->add(extraTask, 1);
284
        throw TException("Unexpected success adding task in excess of pending task count");
285
      } catch(TooManyPendingTasksException& e) {
286
        throw TException("Should have timed out adding task in excess of pending task count");
287
      } catch(TimedOutException& e) {
288
        // Expected result
289
      }
290
 
291
      try {
292
        threadManager->add(extraTask, -1);
293
        throw TException("Unexpected success adding task in excess of pending task count");
294
      } catch(TimedOutException& e) {
295
        throw TException("Unexpected timeout adding task in excess of pending task count");
296
      } catch(TooManyPendingTasksException& e) {
297
        // Expected result
298
      }
299
 
300
      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount()  << std::endl;
301
 
302
      {
303
        Synchronized s(bmonitor);
304
 
305
        bmonitor.notifyAll();
306
      }
307
 
308
      {
309
        Synchronized s(monitor);
310
 
311
        while(activeCounts[0] != 0) {
312
          monitor.wait();
313
        }
314
      }
315
 
316
      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
317
 
318
      try {
319
        threadManager->add(extraTask, 1);
320
      } catch(TimedOutException& e) {
321
        std::cout << "\t\t\t" << "add timed out unexpectedly"  << std::endl;
322
        throw TException("Unexpected timeout adding task");
323
 
324
      } catch(TooManyPendingTasksException& e) {
325
        std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
326
        throw TException("Unexpected timeout adding task");
327
      }
328
 
329
      // Wake up tasks that were pending before and wait for them to complete
330
 
331
      {
332
        Synchronized s(bmonitor);
333
 
334
        bmonitor.notifyAll();
335
      }
336
 
337
      {
338
        Synchronized s(monitor);
339
 
340
        while(activeCounts[1] != 0) {
341
          monitor.wait();
342
        }
343
      }
344
 
345
      // Wake up the extra task and wait for it to complete
346
 
347
      {
348
        Synchronized s(bmonitor);
349
 
350
        bmonitor.notifyAll();
351
      }
352
 
353
      {
354
        Synchronized s(monitor);
355
 
356
        while(activeCounts[2] != 0) {
357
          monitor.wait();
358
        }
359
      }
360
 
361
      if(!(success = (threadManager->totalTaskCount() == 0))) {
362
        throw TException("Unexpected pending task count");
363
      }
364
 
365
    } catch(TException& e) {
366
      std::cout << "ERROR: " << e.what() << std::endl;
367
    }
368
 
369
    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
370
    return success;
371
 }
372
};
373
 
374
// Largest relative timing error, |elapsed - expected| / expected, that
// loadTest still reports as success.
const double ThreadManagerTests::ERROR = .20;
375
 
376
}}}} // apache::thrift::concurrency
377
 
378
using namespace apache::thrift::concurrency::test;
379