Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include "PosixThreadFactory.h"
21
#include "Exception.h"
22
 
23
#if GOOGLE_PERFTOOLS_REGISTER_THREAD
24
#  include <google/profiler.h>
25
#endif
26
 
27
#include <assert.h>
28
#include <pthread.h>
29
 
30
#include <iostream>
31
 
32
#include <boost/weak_ptr.hpp>
33
 
34
namespace apache { namespace thrift { namespace concurrency {
35
 
36
using boost::shared_ptr;
37
using boost::weak_ptr;
38
 
39
/**
40
 * The POSIX thread class.
41
 *
42
 * @version $Id:$
43
 */
44
class PthreadThread: public Thread {
45
 public:
46
 
47
  enum STATE {
48
    uninitialized,
49
    starting,
50
    started,
51
    stopping,
52
    stopped
53
  };
54
 
55
  static const int MB = 1024 * 1024;
56
 
57
  static void* threadMain(void* arg);
58
 
59
 private:
60
  pthread_t pthread_;
61
  STATE state_;
62
  int policy_;
63
  int priority_;
64
  int stackSize_;
65
  weak_ptr<PthreadThread> self_;
66
  bool detached_;
67
 
68
 public:
69
 
70
  PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
71
    pthread_(0),
72
    state_(uninitialized),
73
    policy_(policy),
74
    priority_(priority),
75
    stackSize_(stackSize),
76
    detached_(detached) {
77
 
78
    this->Thread::runnable(runnable);
79
  }
80
 
81
  ~PthreadThread() {
82
    /* Nothing references this thread, if is is not detached, do a join
83
       now, otherwise the thread-id and, possibly, other resources will
84
       be leaked. */
85
    if(!detached_) {
86
      try {
87
        join();
88
      } catch(...) {
89
        // We're really hosed.
90
      }
91
    }
92
  }
93
 
94
  void start() {
95
    if (state_ != uninitialized) {
96
      return;
97
    }
98
 
99
    pthread_attr_t thread_attr;
100
    if (pthread_attr_init(&thread_attr) != 0) {
101
        throw SystemResourceException("pthread_attr_init failed");
102
    }
103
 
104
    if(pthread_attr_setdetachstate(&thread_attr,
105
                                   detached_ ?
106
                                   PTHREAD_CREATE_DETACHED :
107
                                   PTHREAD_CREATE_JOINABLE) != 0) {
108
        throw SystemResourceException("pthread_attr_setdetachstate failed");
109
    }
110
 
111
    // Set thread stack size
112
    if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
113
      throw SystemResourceException("pthread_attr_setstacksize failed");
114
    }
115
 
116
    // Set thread policy
117
    if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
118
      throw SystemResourceException("pthread_attr_setschedpolicy failed");
119
    }
120
 
121
    struct sched_param sched_param;
122
    sched_param.sched_priority = priority_;
123
 
124
    // Set thread priority
125
    if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
126
      throw SystemResourceException("pthread_attr_setschedparam failed");
127
    }
128
 
129
    // Create reference
130
    shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
131
    *selfRef = self_.lock();
132
 
133
    state_ = starting;
134
 
135
    if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
136
      throw SystemResourceException("pthread_create failed");
137
    }
138
  }
139
 
140
  void join() {
141
    if (!detached_ && state_ != uninitialized) {
142
      void* ignore;
143
      /* XXX
144
         If join fails it is most likely due to the fact
145
         that the last reference was the thread itself and cannot
146
         join.  This results in leaked threads and will eventually
147
         cause the process to run out of thread resources.
148
         We're beyond the point of throwing an exception.  Not clear how
149
         best to handle this. */
150
      detached_ = pthread_join(pthread_, &ignore) == 0;
151
    }
152
  }
153
 
154
  Thread::id_t getId() {
155
    return (Thread::id_t)pthread_;
156
  }
157
 
158
  shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
159
 
160
  void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
161
 
162
  void weakRef(shared_ptr<PthreadThread> self) {
163
    assert(self.get() == this);
164
    self_ = weak_ptr<PthreadThread>(self);
165
  }
166
};
167
 
168
void* PthreadThread::threadMain(void* arg) {
169
  shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
170
  delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
171
 
172
  if (thread == NULL) {
173
    return (void*)0;
174
  }
175
 
176
  if (thread->state_ != starting) {
177
    return (void*)0;
178
  }
179
 
180
#if GOOGLE_PERFTOOLS_REGISTER_THREAD
181
  ProfilerRegisterThread();
182
#endif
183
 
184
  thread->state_ = starting;
185
  thread->runnable()->run();
186
  if (thread->state_ != stopping && thread->state_ != stopped) {
187
    thread->state_ = stopping;
188
  }
189
 
190
  return (void*)0;
191
}
192
 
193
/**
194
 * POSIX Thread factory implementation
195
 */
196
class PosixThreadFactory::Impl {
197
 
198
 private:
199
  POLICY policy_;
200
  PRIORITY priority_;
201
  int stackSize_;
202
  bool detached_;
203
 
204
  /**
205
   * Converts generic posix thread schedule policy enums into pthread
206
   * API values.
207
   */
208
  static int toPthreadPolicy(POLICY policy) {
209
    switch (policy) {
210
    case OTHER:
211
      return SCHED_OTHER;
212
    case FIFO:
213
      return SCHED_FIFO;
214
    case ROUND_ROBIN:
215
      return SCHED_RR;
216
    }
217
    return SCHED_OTHER;
218
  }
219
 
220
  /**
221
   * Converts relative thread priorities to absolute value based on posix
222
   * thread scheduler policy
223
   *
224
   *  The idea is simply to divide up the priority range for the given policy
225
   * into the correpsonding relative priority level (lowest..highest) and
226
   * then pro-rate accordingly.
227
   */
228
  static int toPthreadPriority(POLICY policy, PRIORITY priority) {
229
    int pthread_policy = toPthreadPolicy(policy);
230
    int min_priority = 0;
231
    int max_priority = 0;
232
#ifdef HAVE_SCHED_GET_PRIORITY_MIN
233
    min_priority = sched_get_priority_min(pthread_policy);
234
#endif
235
#ifdef HAVE_SCHED_GET_PRIORITY_MAX
236
    max_priority = sched_get_priority_max(pthread_policy);
237
#endif
238
    int quanta = (HIGHEST - LOWEST) + 1;
239
    float stepsperquanta = (max_priority - min_priority) / quanta;
240
 
241
    if (priority <= HIGHEST) {
242
      return (int)(min_priority + stepsperquanta * priority);
243
    } else {
244
      // should never get here for priority increments.
245
      assert(false);
246
      return (int)(min_priority + stepsperquanta * NORMAL);
247
    }
248
  }
249
 
250
 public:
251
 
252
  Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
253
    policy_(policy),
254
    priority_(priority),
255
    stackSize_(stackSize),
256
    detached_(detached) {}
257
 
258
  /**
259
   * Creates a new POSIX thread to run the runnable object
260
   *
261
   * @param runnable A runnable object
262
   */
263
  shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
264
    shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
265
    result->weakRef(result);
266
    runnable->thread(result);
267
    return result;
268
  }
269
 
270
  int getStackSize() const { return stackSize_; }
271
 
272
  void setStackSize(int value) { stackSize_ = value; }
273
 
274
  PRIORITY getPriority() const { return priority_; }
275
 
276
  /**
277
   * Sets priority.
278
   *
279
   *  XXX
280
   *  Need to handle incremental priorities properly.
281
   */
282
  void setPriority(PRIORITY value) { priority_ = value; }
283
 
284
  bool isDetached() const { return detached_; }
285
 
286
  void setDetached(bool value) { detached_ = value; }
287
 
288
  Thread::id_t getCurrentThreadId() const {
289
    return (Thread::id_t)pthread_self();
290
  }
291
 
292
};
293
 
294
/**
 * Constructs the factory; all settings are stored in a newly allocated Impl.
 */
PosixThreadFactory::PosixThreadFactory(POLICY policy,
                                       PRIORITY priority,
                                       int stackSize,
                                       bool detached)
  : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {
}
296
 
297
/** Delegates thread creation for the given runnable to the Impl. */
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
  return impl_->newThread(runnable);
}
298
 
299
/** Returns the stack size currently held by the Impl. */
int PosixThreadFactory::getStackSize() const {
  return impl_->getStackSize();
}
300
 
301
/** Stores a new stack size in the Impl. */
void PosixThreadFactory::setStackSize(int value) {
  impl_->setStackSize(value);
}
302
 
303
/** Returns the relative priority currently held by the Impl. */
PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const {
  return impl_->getPriority();
}
304
 
305
/** Stores a new relative priority in the Impl. */
void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) {
  impl_->setPriority(value);
}
306
 
307
/** Returns the detach flag currently held by the Impl. */
bool PosixThreadFactory::isDetached() const {
  return impl_->isDetached();
}
308
 
309
/** Stores a new detach flag in the Impl. */
void PosixThreadFactory::setDetached(bool value) {
  impl_->setDetached(value);
}
310
 
311
/** Returns the calling thread's id as reported by the Impl. */
Thread::id_t PosixThreadFactory::getCurrentThreadId() const {
  return impl_->getCurrentThreadId();
}
312
 
313
}}} // apache::thrift::concurrency