Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <config.h>
21
#include <concurrency/Thread.h>
22
#include <concurrency/PosixThreadFactory.h>
23
#include <concurrency/Monitor.h>
24
#include <concurrency/Util.h>
25
 
26
#include <assert.h>
27
#include <unistd.h>
28
#include <iostream>
29
#include <set>
30
 
31
namespace apache { namespace thrift { namespace concurrency { namespace test {
32
 
33
using boost::shared_ptr;
34
using namespace apache::thrift::concurrency;
35
 
36
/**
37
 * ThreadManagerTests class
38
 *
39
 * @version $Id:$
40
 */
41
class ThreadFactoryTests {
42
 
43
public:
44
 
45
  static const double ERROR;
46
 
47
  class Task: public Runnable {
48
 
49
  public:
50
 
51
    Task() {}
52
 
53
    void run() {
54
      std::cout << "\t\t\tHello World" << std::endl;
55
    }
56
  };
57
 
58
  /**
59
   * Hello world test
60
   */
61
  bool helloWorldTest() {
62
 
63
    PosixThreadFactory threadFactory = PosixThreadFactory();
64
 
65
    shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
66
 
67
    shared_ptr<Thread> thread = threadFactory.newThread(task);
68
 
69
    thread->start();
70
 
71
    thread->join();
72
 
73
    std::cout << "\t\t\tSuccess!" << std::endl;
74
 
75
    return true;
76
  }
77
 
78
  /**
79
   * Reap N threads
80
   */
81
  class ReapNTask: public Runnable {
82
 
83
   public:
84
 
85
    ReapNTask(Monitor& monitor, int& activeCount) :
86
      _monitor(monitor),
87
      _count(activeCount) {}
88
 
89
    void run() {
90
      Synchronized s(_monitor);
91
 
92
      _count--;
93
 
94
      //std::cout << "\t\t\tthread count: " << _count << std::endl;
95
 
96
      if (_count == 0) {
97
        _monitor.notify();
98
      }
99
    }
100
 
101
    Monitor& _monitor;
102
 
103
    int& _count;
104
  };
105
 
106
  bool reapNThreads(int loop=1, int count=10) {
107
 
108
    PosixThreadFactory threadFactory =  PosixThreadFactory();
109
 
110
    Monitor* monitor = new Monitor();
111
 
112
    for(int lix = 0; lix < loop; lix++) {
113
 
114
      int* activeCount  = new int(count);
115
 
116
      std::set<shared_ptr<Thread> > threads;
117
 
118
      int tix;
119
 
120
      for (tix = 0; tix < count; tix++) {
121
        try {
122
          threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
123
        } catch(SystemResourceException& e) {
124
          std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl;
125
          throw e;
126
        }
127
      }
128
 
129
      tix = 0;
130
      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
131
 
132
        try {
133
          (*thread)->start();
134
        } catch(SystemResourceException& e) {
135
          std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
136
          throw e;
137
        }
138
      }
139
 
140
      {
141
        Synchronized s(*monitor);
142
        while (*activeCount > 0) {
143
          monitor->wait(1000);
144
        }
145
      }
146
 
147
      std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl;
148
    }
149
 
150
    std::cout << "\t\t\tSuccess!" << std::endl;
151
 
152
    return true;
153
  }
154
 
155
  class SynchStartTask: public Runnable {
156
 
157
   public:
158
 
159
    enum STATE {
160
      UNINITIALIZED,
161
      STARTING,
162
      STARTED,
163
      STOPPING,
164
      STOPPED
165
    };
166
 
167
    SynchStartTask(Monitor& monitor, volatile  STATE& state) :
168
      _monitor(monitor),
169
      _state(state) {}
170
 
171
    void run() {
172
      {
173
        Synchronized s(_monitor);
174
        if (_state == SynchStartTask::STARTING) {
175
          _state = SynchStartTask::STARTED;
176
          _monitor.notify();
177
        }
178
      }
179
 
180
      {
181
        Synchronized s(_monitor);
182
        while (_state == SynchStartTask::STARTED) {
183
          _monitor.wait();
184
        }
185
 
186
        if (_state == SynchStartTask::STOPPING) {
187
          _state = SynchStartTask::STOPPED;
188
          _monitor.notifyAll();
189
        }
190
      }
191
    }
192
 
193
   private:
194
    Monitor& _monitor;
195
    volatile  STATE& _state;
196
  };
197
 
198
  bool synchStartTest() {
199
 
200
    Monitor monitor;
201
 
202
    SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
203
 
204
    shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
205
 
206
    PosixThreadFactory threadFactory =  PosixThreadFactory();
207
 
208
    shared_ptr<Thread> thread = threadFactory.newThread(task);
209
 
210
    if (state == SynchStartTask::UNINITIALIZED) {
211
 
212
      state = SynchStartTask::STARTING;
213
 
214
      thread->start();
215
    }
216
 
217
    {
218
      Synchronized s(monitor);
219
      while (state == SynchStartTask::STARTING) {
220
        monitor.wait();
221
      }
222
    }
223
 
224
    assert(state != SynchStartTask::STARTING);
225
 
226
    {
227
      Synchronized s(monitor);
228
 
229
      try {
230
          monitor.wait(100);
231
      } catch(TimedOutException& e) {
232
      }
233
 
234
      if (state == SynchStartTask::STARTED) {
235
 
236
        state = SynchStartTask::STOPPING;
237
 
238
        monitor.notify();
239
      }
240
 
241
      while (state == SynchStartTask::STOPPING) {
242
        monitor.wait();
243
      }
244
    }
245
 
246
    assert(state == SynchStartTask::STOPPED);
247
 
248
    bool success = true;
249
 
250
    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
251
 
252
    return true;
253
  }
254
 
255
  /** See how accurate monitor timeout is. */
256
 
257
  bool monitorTimeoutTest(size_t count=1000, int64_t timeout=10) {
258
 
259
    Monitor monitor;
260
 
261
    int64_t startTime = Util::currentTime();
262
 
263
    for (size_t ix = 0; ix < count; ix++) {
264
      {
265
        Synchronized s(monitor);
266
        try {
267
            monitor.wait(timeout);
268
        } catch(TimedOutException& e) {
269
        }
270
      }
271
    }
272
 
273
    int64_t endTime = Util::currentTime();
274
 
275
    double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
276
 
277
    if (error < 0.0)  {
278
 
279
      error *= 1.0;
280
    }
281
 
282
    bool success = error < ThreadFactoryTests::ERROR;
283
 
284
    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl;
285
 
286
    return success;
287
  }
288
 
289
 
290
  class FloodTask : public Runnable {
291
  public:
292
 
293
    FloodTask(const size_t id) :_id(id) {}
294
    ~FloodTask(){
295
      if(_id % 1000 == 0) {
296
        std::cout << "\t\tthread " << _id << " done" << std::endl;
297
      }
298
    }
299
 
300
    void run(){
301
      if(_id % 1000 == 0) {
302
        std::cout << "\t\tthread " << _id << " started" << std::endl;
303
      }
304
 
305
      usleep(1);
306
    }
307
    const size_t _id;
308
  };
309
 
310
  void foo(PosixThreadFactory *tf) {
311
  }
312
 
313
  bool floodNTest(size_t loop=1, size_t count=100000) {
314
 
315
    bool success = false;
316
 
317
    for(size_t lix = 0; lix < loop; lix++) {
318
 
319
      PosixThreadFactory threadFactory = PosixThreadFactory();
320
      threadFactory.setDetached(true);
321
 
322
        for(size_t tix = 0; tix < count; tix++) {
323
 
324
          try {
325
 
326
            shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
327
 
328
            shared_ptr<Thread> thread = threadFactory.newThread(task);
329
 
330
            thread->start();
331
 
332
            usleep(1);
333
 
334
          } catch (TException& e) {
335
 
336
            std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
337
 
338
            return success;
339
          }
340
        }
341
 
342
        std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
343
 
344
        success = true;
345
    }
346
 
347
    return success;
348
  }
349
};
350
 
351
/** Relative timing error (20%) below which monitorTimeoutTest reports success. */
const double ThreadFactoryTests::ERROR = 0.20;
352
 
353
}}}} // apache::thrift::concurrency::test
354