Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing,* software distributed under the License is distributed on an* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY* KIND, either express or implied. See the License for the* specific language governing permissions and limitations* under the License.*/#include <config.h>#include <concurrency/ThreadManager.h>#include <concurrency/PosixThreadFactory.h>#include <concurrency/Monitor.h>#include <concurrency/Util.h>#include <assert.h>#include <set>#include <iostream>#include <set>#include <stdint.h>namespace apache { namespace thrift { namespace concurrency { namespace test {using namespace apache::thrift::concurrency;/*** ThreadManagerTests class** @version $Id:$*/class ThreadManagerTests {public:static const double ERROR;class Task: public Runnable {public:Task(Monitor& monitor, size_t& count, int64_t timeout) :_monitor(monitor),_count(count),_timeout(timeout),_done(false) {}void run() {_startTime = Util::currentTime();{Synchronized s(_sleep);try {_sleep.wait(_timeout);} catch(TimedOutException& e) {;}catch(...) {assert(0);}}_endTime = Util::currentTime();_done = true;{Synchronized s(_monitor);// std::cout << "Thread " << _count << " completed " << std::endl;_count--;if (_count == 0) {_monitor.notify();}}}Monitor& _monitor;size_t& _count;int64_t _timeout;int64_t _startTime;int64_t _endTime;bool _done;Monitor _sleep;};/*** Dispatch count tasks, each of which blocks for timeout milliseconds then* completes. Verify that all tasks completed and that thread manager cleans* up properly on delete.*/bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {Monitor monitor;size_t activeCount = count;shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());threadFactory->setPriority(PosixThreadFactory::HIGHEST);threadManager->threadFactory(threadFactory);threadManager->start();std::set<shared_ptr<ThreadManagerTests::Task> > tasks;for (size_t ix = 0; ix < count; ix++) {tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));}int64_t time00 = Util::currentTime();for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {threadManager->add(*ix);}{Synchronized s(monitor);while(activeCount > 0) {monitor.wait();}}int64_t time01 = Util::currentTime();int64_t firstTime = 9223372036854775807LL;int64_t lastTime = 0;double averageTime = 0;int64_t minTime = 9223372036854775807LL;int64_t maxTime = 0;for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {shared_ptr<ThreadManagerTests::Task> task = *ix;int64_t delta = task->_endTime - task->_startTime;assert(delta > 0);if (task->_startTime < firstTime) {firstTime = task->_startTime;}if (task->_endTime > lastTime) {lastTime = task->_endTime;}if (delta < minTime) {minTime = delta;}if (delta > maxTime) {maxTime = delta;}averageTime+= delta;}averageTime /= count;std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;double error = ((time01 - time00) - expectedTime) / expectedTime;if (error < 0) {error*= -1.0;}bool success = error < ERROR;std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;return success;}class BlockTask: public Runnable {public:BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :_monitor(monitor),_bmonitor(bmonitor),_count(count) {}void run() {{Synchronized s(_bmonitor);_bmonitor.wait();}{Synchronized s(_monitor);_count--;if (_count == 0) {_monitor.notify();}}}Monitor& _monitor;Monitor& _bmonitor;size_t& _count;};/*** Block test. Create pendingTaskCountMax tasks. Verify that we block adding the* pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {bool success = false;try {Monitor bmonitor;Monitor monitor;size_t pendingTaskMaxCount = workerCount;size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());threadFactory->setPriority(PosixThreadFactory::HIGHEST);threadManager->threadFactory(threadFactory);threadManager->start();std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;for (size_t ix = 0; ix < workerCount; ix++) {tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));}for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));}for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {threadManager->add(*ix);}if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {throw TException("Unexpected pending task count");}shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));try {threadManager->add(extraTask, 1);throw TException("Unexpected success adding task in excess of pending task count");} catch(TooManyPendingTasksException& e) {throw TException("Should have timed out adding task in excess of pending task count");} catch(TimedOutException& e) {// Expected result}try {threadManager->add(extraTask, -1);throw TException("Unexpected success adding task in excess of pending task count");} catch(TimedOutException& e) {throw TException("Unexpected timeout adding task in excess of pending task count");} catch(TooManyPendingTasksException& e) {// Expected result}std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;{Synchronized s(bmonitor);bmonitor.notifyAll();}{Synchronized s(monitor);while(activeCounts[0] != 0) {monitor.wait();}}std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;try {threadManager->add(extraTask, 1);} catch(TimedOutException& e) {std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;throw TException("Unexpected timeout adding task");} catch(TooManyPendingTasksException& e) {std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;throw TException("Unexpected timeout adding task");}// Wake up tasks that were pending before and wait for them to complete{Synchronized s(bmonitor);bmonitor.notifyAll();}{Synchronized s(monitor);while(activeCounts[1] != 0) {monitor.wait();}}// Wake up the extra task and wait for it to complete{Synchronized s(bmonitor);bmonitor.notifyAll();}{Synchronized s(monitor);while(activeCounts[2] != 0) {monitor.wait();}}if(!(success = (threadManager->totalTaskCount() == 0))) {throw TException("Unexpected pending task count");}} catch(TException& e) {std::cout << "ERROR: " << e.what() << std::endl;}std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;return success;}};const double ThreadManagerTests::ERROR = .20;}}}} // apache::thrift::concurrencyusing namespace apache::thrift::concurrency::test;