Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include "TimerManager.h"
21
#include "Exception.h"
22
#include "Util.h"
23
 
24
#include <assert.h>
25
#include <iostream>
26
#include <set>
27
 
28
namespace apache { namespace thrift { namespace concurrency {
29
 
30
using boost::shared_ptr;
31
 
32
/**
33
 * TimerManager class
34
 *
35
 * @version $Id:$
36
 */
37
class TimerManager::Task : public Runnable {
38
 
39
 public:
40
  enum STATE {
41
    WAITING,
42
    EXECUTING,
43
    CANCELLED,
44
    COMPLETE
45
  };
46
 
47
  Task(shared_ptr<Runnable> runnable) :
48
    runnable_(runnable),
49
    state_(WAITING) {}
50
 
51
  ~Task() {
52
  }
53
 
54
  void run() {
55
    if (state_ == EXECUTING) {
56
      runnable_->run();
57
      state_ = COMPLETE;
58
    }
59
  }
60
 
61
 private:
62
  shared_ptr<Runnable> runnable_;
63
  class TimerManager::Dispatcher;
64
  friend class TimerManager::Dispatcher;
65
  STATE state_;
66
};
67
 
68
/**
 * Runnable executed on the timer thread. It owns no tasks itself; it works
 * on the task map, state and monitor of the TimerManager it points back to.
 */
class TimerManager::Dispatcher: public Runnable {

 public:
  /**
   * @param manager  the TimerManager whose task map this dispatcher services
   */
  Dispatcher(TimerManager* manager) :
    manager_(manager) {}

  ~Dispatcher() {}

  /**
   * Dispatcher entry point
   *
   * As long as dispatcher thread is running, pull tasks off the task taskMap_
   * and execute.
   *
   * Sequence:
   *  1. Move the manager from STARTING to STARTED and wake any waiters.
   *  2. Repeatedly wait (under the monitor) until the earliest task has
   *     expired or the manager leaves STARTED, collect the expired tasks,
   *     then run them with the monitor released.
   *  3. On exit, move the manager from STOPPING to STOPPED and wake every
   *     thread waiting for that transition.
   */
  void run() {
    {
      Synchronized s(manager_->monitor_);
      if (manager_->state_ == TimerManager::STARTING) {
        manager_->state_ = TimerManager::STARTED;
        manager_->monitor_.notifyAll();
      }
    }

    // Snapshot of "manager is still STARTED", refreshed on every pass while
    // the monitor is held so the loop condition never reads state_ unlocked.
    bool running = false;

    do {
      // Note: the set is ordered by shared_ptr comparison, so tasks that
      // expire in the same pass are not necessarily run in expiration order.
      std::set<shared_ptr<TimerManager::Task> > expiredTasks;
      {
        Synchronized s(manager_->monitor_);
        task_iterator expiredTaskEnd;
        int64_t now = Util::currentTime();
        // upper_bound(now) == begin() means nothing has expired yet: either
        // the map is empty or the earliest key is still in the future.
        while (manager_->state_ == TimerManager::STARTED &&
               (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
          // Presumably a timeout of 0 means "wait until notified" -- TODO
          // confirm against Monitor::wait.
          int64_t timeout = 0LL;
          if (!manager_->taskMap_.empty()) {
            timeout = manager_->taskMap_.begin()->first - now;
          }
          assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
          try {
            manager_->monitor_.wait(timeout);
          } catch (const TimedOutException&) {
            // A timeout simply means the earliest task may now be due.
          }
          now = Util::currentTime();
        }

        running = (manager_->state_ == TimerManager::STARTED);
        if (running) {
          for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ++ix) {
            shared_ptr<TimerManager::Task> task = ix->second;
            expiredTasks.insert(task);
            if (task->state_ == TimerManager::Task::WAITING) {
              task->state_ = TimerManager::Task::EXECUTING;
            }
            manager_->taskCount_--;
          }
          manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
        }
      }

      // Run user code without holding the monitor.
      for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ++ix) {
        (*ix)->run();
      }

    } while (running);

    {
      Synchronized s(manager_->monitor_);
      if (manager_->state_ == TimerManager::STOPPING) {
        manager_->state_ = TimerManager::STOPPED;
        // More than one thread may be blocked in stop() waiting for STOPPED;
        // wake all of them rather than a single arbitrary waiter.
        manager_->monitor_.notifyAll();
      }
    }
    return;
  }

 private:
  TimerManager* manager_;
  friend class TimerManager;
};
143
 
144
/**
 * Creates a manager with a zero task count, in the UNINITIALIZED state, and
 * allocates the dispatcher runnable bound to this instance. No thread is
 * created here.
 */
TimerManager::TimerManager()
  : taskCount_(0),
    state_(TimerManager::UNINITIALIZED),
    dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this)))
{}
149
 
150
 
151
/**
 * Stops the manager if the owner did not do so explicitly.
 */
TimerManager::~TimerManager() {

  // If we haven't been explicitly stopped, do so now.  We don't need to grab
  // the monitor here, since stop already takes care of reentrancy.

  if (state_ != STOPPED) {
    try {
      stop();
    } catch(...) {
      // Deliberately swallowed: letting an exception escape a destructor
      // terminates the process, and there is nothing useful left to do
      // with the failure at this point.
    }
  }
}
165
 
166
void TimerManager::start() {
167
  bool doStart = false;
168
  {
169
    Synchronized s(monitor_);
170
    if (threadFactory_ == NULL) {
171
      throw InvalidArgumentException();
172
    }
173
    if (state_ == TimerManager::UNINITIALIZED) {
174
      state_ = TimerManager::STARTING;
175
      doStart = true;
176
    }
177
  }
178
 
179
  if (doStart) {
180
    dispatcherThread_ = threadFactory_->newThread(dispatcher_);
181
    dispatcherThread_->start();
182
  }
183
 
184
  {
185
    Synchronized s(monitor_);
186
    while (state_ == TimerManager::STARTING) {
187
      monitor_.wait();
188
    }
189
    assert(state_ != TimerManager::STARTING);
190
  }
191
}
192
 
193
void TimerManager::stop() {
194
  bool doStop = false;
195
  {
196
    Synchronized s(monitor_);
197
    if (state_ == TimerManager::UNINITIALIZED) {
198
      state_ = TimerManager::STOPPED;
199
    } else if (state_ != STOPPING &&  state_ != STOPPED) {
200
      doStop = true;
201
      state_ = STOPPING;
202
      monitor_.notifyAll();
203
    }
204
    while (state_ != STOPPED) {
205
      monitor_.wait();
206
    }
207
  }
208
 
209
  if (doStop) {
210
    // Clean up any outstanding tasks
211
    taskMap_.clear();
212
 
213
    // Remove dispatcher's reference to us.
214
    dispatcher_->manager_ = NULL;
215
  }
216
}
217
 
218
/**
 * Returns the thread factory currently configured, read under the monitor.
 */
shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
  Synchronized guard(monitor_);
  shared_ptr<const ThreadFactory> current = threadFactory_;
  return current;
}
222
 
223
/**
 * Replaces the thread factory, under the monitor.
 *
 * @param value  factory that start() will use to create the dispatcher thread
 */
void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value)
{
  Synchronized guard(monitor_);
  threadFactory_ = value;
}
227
 
228
size_t TimerManager::taskCount() const {
229
  return taskCount_;
230
}
231
 
232
/**
 * Schedules a task to fire after a relative delay.
 *
 * @param task     the runnable to execute on the dispatcher thread
 * @param timeout  delay, added to Util::currentTime() to form the map key
 *                 (presumably milliseconds -- TODO confirm against Util)
 * @throws IllegalStateException if the manager is not in the STARTED state
 */
void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
  int64_t now = Util::currentTime();
  timeout += now;

  {
    Synchronized s(monitor_);
    if (state_ != TimerManager::STARTED) {
      throw IllegalStateException();
    }

    // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
    // if the expiration time is shorter than the current value. Need to test before we insert,
    // because the new task might insert at the front. The map itself is
    // consulted (rather than the counter) so begin() is never dereferenced
    // on an empty map.
    bool notifyRequired = taskMap_.empty() || timeout < taskMap_.begin()->first;

    // Allocate and insert first, and only then bump the counter: if either
    // step throws, the counter still matches the contents of the map.
    shared_ptr<Task> timerTask(new Task(task));
    taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, timerTask));
    taskCount_++;

    // If the task map was empty, or if we have an expiration that is earlier
    // than any previously seen, kick the dispatcher so it can update its
    // timeout
    if (notifyRequired) {
      monitor_.notify();
    }
  }
}
258
 
259
/**
 * Schedules a task to fire at an absolute time.
 *
 * The timespec is converted with Util::toMilliseconds and compared with the
 * current time; the remaining delay is handed to the relative-timeout
 * overload of add().
 *
 * @param task   the runnable to execute
 * @param value  absolute expiration time
 * @throws InvalidArgumentException if the expiration is before the current time
 */
void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
  int64_t deadline;
  Util::toMilliseconds(deadline, value);

  const int64_t current = Util::currentTime();
  const bool alreadyPast = deadline < current;
  if (alreadyPast) {
    throw  InvalidArgumentException();
  }

  const int64_t delay = deadline - current;
  add(task, delay);
}
272
 
273
 
274
/**
 * Validates that the manager is running.
 *
 * NOTE(review): as written this does not touch the task map -- the supplied
 * task is never looked up or removed. Only the state check is performed.
 *
 * @param task  currently unused
 * @throws IllegalStateException if the manager is not in the STARTED state
 */
void TimerManager::remove(shared_ptr<Runnable> task) {
  Synchronized guard(monitor_);
  const bool started = (state_ == TimerManager::STARTED);
  if (!started) {
    throw IllegalStateException();
  }
}
280
 
281
/**
 * Returns the manager's current lifecycle state.
 * The value is read without taking the monitor.
 */
const TimerManager::STATE TimerManager::state() const
{
  return state_;
}
282
 
283
}}} // apache::thrift::concurrency
284