Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include "ThreadManager.h"
21
#include "Exception.h"
22
#include "Monitor.h"
23
 
24
#include <boost/shared_ptr.hpp>
25
 
26
#include <assert.h>
27
#include <queue>
28
#include <set>
29
 
30
#if defined(DEBUG)
31
#include <iostream>
32
#endif //defined(DEBUG)
33
 
34
namespace apache { namespace thrift { namespace concurrency {
35
 
36
using boost::shared_ptr;
37
using boost::dynamic_pointer_cast;
38
 
39
/**
40
 * ThreadManager class
41
 *
42
 * This class manages a pool of threads. It uses a ThreadFactory to create
43
 * threads.  It never actually creates or destroys worker threads, rather
44
 * it maintains statistics on number of idle threads, number of active threads,
45
 * task backlog, and average wait and service times.
46
 *
47
 * @version $Id:$
48
 */
49
class ThreadManager::Impl : public ThreadManager  {
50
 
51
 public:
52
  Impl() :
53
    workerCount_(0),
54
    workerMaxCount_(0),
55
    idleCount_(0),
56
    pendingTaskCountMax_(0),
57
    state_(ThreadManager::UNINITIALIZED) {}
58
 
59
  ~Impl() { stop(); }
60
 
61
  void start();
62
 
63
  void stop() { stopImpl(false); }
64
 
65
  void join() { stopImpl(true); }
66
 
67
  const ThreadManager::STATE state() const {
68
    return state_;
69
  }
70
 
71
  shared_ptr<ThreadFactory> threadFactory() const {
72
    Synchronized s(monitor_);
73
    return threadFactory_;
74
  }
75
 
76
  void threadFactory(shared_ptr<ThreadFactory> value) {
77
    Synchronized s(monitor_);
78
    threadFactory_ = value;
79
  }
80
 
81
  void addWorker(size_t value);
82
 
83
  void removeWorker(size_t value);
84
 
85
  size_t idleWorkerCount() const {
86
    return idleCount_;
87
  }
88
 
89
  size_t workerCount() const {
90
    Synchronized s(monitor_);
91
    return workerCount_;
92
  }
93
 
94
  size_t pendingTaskCount() const {
95
    Synchronized s(monitor_);
96
    return tasks_.size();
97
  }
98
 
99
  size_t totalTaskCount() const {
100
    Synchronized s(monitor_);
101
    return tasks_.size() + workerCount_ - idleCount_;
102
  }
103
 
104
  size_t pendingTaskCountMax() const {
105
    Synchronized s(monitor_);
106
    return pendingTaskCountMax_;
107
  }
108
 
109
  void pendingTaskCountMax(const size_t value) {
110
    Synchronized s(monitor_);
111
    pendingTaskCountMax_ = value;
112
  }
113
 
114
  bool canSleep();
115
 
116
  void add(shared_ptr<Runnable> value, int64_t timeout);
117
 
118
  void remove(shared_ptr<Runnable> task);
119
 
120
private:
121
  void stopImpl(bool join);
122
 
123
  size_t workerCount_;
124
  size_t workerMaxCount_;
125
  size_t idleCount_;
126
  size_t pendingTaskCountMax_;
127
 
128
  ThreadManager::STATE state_;
129
  shared_ptr<ThreadFactory> threadFactory_;
130
 
131
 
132
  friend class ThreadManager::Task;
133
  std::queue<shared_ptr<Task> > tasks_;
134
  Monitor monitor_;
135
  Monitor workerMonitor_;
136
 
137
  friend class ThreadManager::Worker;
138
  std::set<shared_ptr<Thread> > workers_;
139
  std::set<shared_ptr<Thread> > deadWorkers_;
140
  std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
141
};
142
 
143
class ThreadManager::Task : public Runnable {
144
 
145
 public:
146
  enum STATE {
147
    WAITING,
148
    EXECUTING,
149
    CANCELLED,
150
    COMPLETE
151
  };
152
 
153
  Task(shared_ptr<Runnable> runnable) :
154
    runnable_(runnable),
155
    state_(WAITING) {}
156
 
157
  ~Task() {}
158
 
159
  void run() {
160
    if (state_ == EXECUTING) {
161
      runnable_->run();
162
      state_ = COMPLETE;
163
    }
164
  }
165
 
166
 private:
167
  shared_ptr<Runnable> runnable_;
168
  friend class ThreadManager::Worker;
169
  STATE state_;
170
};
171
 
172
class ThreadManager::Worker: public Runnable {
173
  enum STATE {
174
    UNINITIALIZED,
175
    STARTING,
176
    STARTED,
177
    STOPPING,
178
    STOPPED
179
  };
180
 
181
 public:
182
  Worker(ThreadManager::Impl* manager) :
183
    manager_(manager),
184
    state_(UNINITIALIZED),
185
    idle_(false) {}
186
 
187
  ~Worker() {}
188
 
189
 private:
190
  bool isActive() const {
191
    return
192
      (manager_->workerCount_ <= manager_->workerMaxCount_) ||
193
      (manager_->state_ == JOINING && !manager_->tasks_.empty());
194
  }
195
 
196
 public:
197
  /**
198
   * Worker entry point
199
   *
200
   * As long as worker thread is running, pull tasks off the task queue and
201
   * execute.
202
   */
203
  void run() {
204
    bool active = false;
205
    bool notifyManager = false;
206
 
207
    /**
208
     * Increment worker semaphore and notify manager if worker count reached
209
     * desired max
210
     *
211
     * Note: We have to release the monitor and acquire the workerMonitor
212
     * since that is what the manager blocks on for worker add/remove
213
     */
214
    {
215
      Synchronized s(manager_->monitor_);
216
      active = manager_->workerCount_ < manager_->workerMaxCount_;
217
      if (active) {
218
        manager_->workerCount_++;
219
        notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
220
      }
221
    }
222
 
223
    if (notifyManager) {
224
      Synchronized s(manager_->workerMonitor_);
225
      manager_->workerMonitor_.notify();
226
      notifyManager = false;
227
    }
228
 
229
    while (active) {
230
      shared_ptr<ThreadManager::Task> task;
231
 
232
      /**
233
       * While holding manager monitor block for non-empty task queue (Also
234
       * check that the thread hasn't been requested to stop). Once the queue
235
       * is non-empty, dequeue a task, release monitor, and execute. If the
236
       * worker max count has been decremented such that we exceed it, mark
237
       * ourself inactive, decrement the worker count and notify the manager
238
       * (technically we're notifying the next blocked thread but eventually
239
       * the manager will see it.
240
       */
241
      {
242
        Synchronized s(manager_->monitor_);
243
        active = isActive();
244
 
245
        while (active && manager_->tasks_.empty()) {
246
          manager_->idleCount_++;
247
          idle_ = true;
248
          manager_->monitor_.wait();
249
          active = isActive();
250
          idle_ = false;
251
          manager_->idleCount_--;
252
        }
253
 
254
        if (active) {
255
          if (!manager_->tasks_.empty()) {
256
            task = manager_->tasks_.front();
257
            manager_->tasks_.pop();
258
            if (task->state_ == ThreadManager::Task::WAITING) {
259
              task->state_ = ThreadManager::Task::EXECUTING;
260
            }
261
 
262
            /* If we have a pending task max and we just dropped below it, wakeup any
263
               thread that might be blocked on add. */
264
            if (manager_->pendingTaskCountMax_ != 0 &&
265
                manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
266
              manager_->monitor_.notify();
267
            }
268
          }
269
        } else {
270
          idle_ = true;
271
          manager_->workerCount_--;
272
          notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
273
        }
274
      }
275
 
276
      if (task != NULL) {
277
        if (task->state_ == ThreadManager::Task::EXECUTING) {
278
          try {
279
            task->run();
280
          } catch(...) {
281
            // XXX need to log this
282
          }
283
        }
284
      }
285
    }
286
 
287
    {
288
      Synchronized s(manager_->workerMonitor_);
289
      manager_->deadWorkers_.insert(this->thread());
290
      if (notifyManager) {
291
        manager_->workerMonitor_.notify();
292
      }
293
    }
294
 
295
    return;
296
  }
297
 
298
  private:
299
    ThreadManager::Impl* manager_;
300
    friend class ThreadManager::Impl;
301
    STATE state_;
302
    bool idle_;
303
};
304
 
305
 
306
  void ThreadManager::Impl::addWorker(size_t value) {
307
  std::set<shared_ptr<Thread> > newThreads;
308
  for (size_t ix = 0; ix < value; ix++) {
309
    class ThreadManager::Worker;
310
    shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
311
    newThreads.insert(threadFactory_->newThread(worker));
312
  }
313
 
314
  {
315
    Synchronized s(monitor_);
316
    workerMaxCount_ += value;
317
    workers_.insert(newThreads.begin(), newThreads.end());
318
  }
319
 
320
  for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
321
    shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
322
    worker->state_ = ThreadManager::Worker::STARTING;
323
    (*ix)->start();
324
    idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
325
  }
326
 
327
  {
328
    Synchronized s(workerMonitor_);
329
    while (workerCount_ != workerMaxCount_) {
330
      workerMonitor_.wait();
331
    }
332
  }
333
}
334
 
335
void ThreadManager::Impl::start() {
336
 
337
  if (state_ == ThreadManager::STOPPED) {
338
    return;
339
  }
340
 
341
  {
342
    Synchronized s(monitor_);
343
    if (state_ == ThreadManager::UNINITIALIZED) {
344
      if (threadFactory_ == NULL) {
345
        throw InvalidArgumentException();
346
      }
347
      state_ = ThreadManager::STARTED;
348
      monitor_.notifyAll();
349
    }
350
 
351
    while (state_ == STARTING) {
352
      monitor_.wait();
353
    }
354
  }
355
}
356
 
357
void ThreadManager::Impl::stopImpl(bool join) {
358
  bool doStop = false;
359
  if (state_ == ThreadManager::STOPPED) {
360
    return;
361
  }
362
 
363
  {
364
    Synchronized s(monitor_);
365
    if (state_ != ThreadManager::STOPPING &&
366
        state_ != ThreadManager::JOINING &&
367
        state_ != ThreadManager::STOPPED) {
368
      doStop = true;
369
      state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
370
    }
371
  }
372
 
373
  if (doStop) {
374
    removeWorker(workerCount_);
375
  }
376
 
377
  // XXX
378
  // should be able to block here for transition to STOPPED since we're no
379
  // using shared_ptrs
380
 
381
  {
382
    Synchronized s(monitor_);
383
    state_ = ThreadManager::STOPPED;
384
  }
385
 
386
}
387
 
388
void ThreadManager::Impl::removeWorker(size_t value) {
389
  std::set<shared_ptr<Thread> > removedThreads;
390
  {
391
    Synchronized s(monitor_);
392
    if (value > workerMaxCount_) {
393
      throw InvalidArgumentException();
394
    }
395
 
396
    workerMaxCount_ -= value;
397
 
398
    if (idleCount_ < value) {
399
      for (size_t ix = 0; ix < idleCount_; ix++) {
400
        monitor_.notify();
401
      }
402
    } else {
403
      monitor_.notifyAll();
404
    }
405
  }
406
 
407
  {
408
    Synchronized s(workerMonitor_);
409
 
410
    while (workerCount_ != workerMaxCount_) {
411
      workerMonitor_.wait();
412
    }
413
 
414
    for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
415
      workers_.erase(*ix);
416
      idMap_.erase((*ix)->getId());
417
    }
418
 
419
    deadWorkers_.clear();
420
  }
421
}
422
 
423
  bool ThreadManager::Impl::canSleep() {
424
    const Thread::id_t id = threadFactory_->getCurrentThreadId();
425
    return idMap_.find(id) == idMap_.end();
426
  }
427
 
428
  void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
429
    Synchronized s(monitor_);
430
 
431
    if (state_ != ThreadManager::STARTED) {
432
      throw IllegalStateException();
433
    }
434
 
435
    if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
436
      if (canSleep() && timeout >= 0) {
437
        while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
438
          monitor_.wait(timeout);
439
        }
440
      } else {
441
        throw TooManyPendingTasksException();
442
      }
443
    }
444
 
445
    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
446
 
447
    // If idle thread is available notify it, otherwise all worker threads are
448
    // running and will get around to this task in time.
449
    if (idleCount_ > 0) {
450
      monitor_.notify();
451
    }
452
  }
453
 
454
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
455
  Synchronized s(monitor_);
456
  if (state_ != ThreadManager::STARTED) {
457
    throw IllegalStateException();
458
  }
459
}
460
 
461
class SimpleThreadManager : public ThreadManager::Impl {
462
 
463
 public:
464
  SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
465
    workerCount_(workerCount),
466
    pendingTaskCountMax_(pendingTaskCountMax),
467
    firstTime_(true) {
468
  }
469
 
470
  void start() {
471
    ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
472
    ThreadManager::Impl::start();
473
    addWorker(workerCount_);
474
  }
475
 
476
 private:
477
  const size_t workerCount_;
478
  const size_t pendingTaskCountMax_;
479
  bool firstTime_;
480
  Monitor monitor_;
481
};
482
 
483
 
484
shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
485
  return shared_ptr<ThreadManager>(new ThreadManager::Impl());
486
}
487
 
488
shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
489
  return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
490
}
491
 
492
}}} // apache::thrift::concurrency
493