Subversion Repositories SmartDukaan

Rev

Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <config.h>
#include <concurrency/ThreadManager.h>
#include <concurrency/PosixThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>

#include <assert.h>
#include <set>
#include <iostream>
#include <set>
#include <stdint.h>

namespace apache { namespace thrift { namespace concurrency { namespace test {

using namespace apache::thrift::concurrency;

/**
 * ThreadManagerTests class
 *
 * @version $Id:$
 */
class ThreadManagerTests {

public:

  static const double ERROR;

  class Task: public Runnable {

  public:

    Task(Monitor& monitor, size_t& count, int64_t timeout) :
      _monitor(monitor),
      _count(count),
      _timeout(timeout),
      _done(false) {}

    void run() {

      _startTime = Util::currentTime();

      {
        Synchronized s(_sleep);

        try {
          _sleep.wait(_timeout);
        } catch(TimedOutException& e) {
          ;
        }catch(...) {
          assert(0);
        }
      }

      _endTime = Util::currentTime();

      _done = true;

      {
        Synchronized s(_monitor);

        // std::cout << "Thread " << _count << " completed " << std::endl;

        _count--;

        if (_count == 0) {

          _monitor.notify();
        }
      }
    }

    Monitor& _monitor;
    size_t& _count;
    int64_t _timeout;
    int64_t _startTime;
    int64_t _endTime;
    bool _done;
    Monitor _sleep;
  };

  /**
   * Dispatch count tasks, each of which blocks for timeout milliseconds then
   * completes. Verify that all tasks completed and that thread manager cleans
   * up properly on delete.
   */
  bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {

    Monitor monitor;

    size_t activeCount = count;

    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);

    shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());

    threadFactory->setPriority(PosixThreadFactory::HIGHEST);

    threadManager->threadFactory(threadFactory);

    threadManager->start();

    std::set<shared_ptr<ThreadManagerTests::Task> > tasks;

    for (size_t ix = 0; ix < count; ix++) {

      tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
    }

    int64_t time00 = Util::currentTime();

    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {

        threadManager->add(*ix);
    }

    {
      Synchronized s(monitor);

      while(activeCount > 0) {

        monitor.wait();
      }
    }

    int64_t time01 = Util::currentTime();

    int64_t firstTime = 9223372036854775807LL;
    int64_t lastTime = 0;

    double averageTime = 0;
    int64_t minTime = 9223372036854775807LL;
    int64_t maxTime = 0;

    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {

      shared_ptr<ThreadManagerTests::Task> task = *ix;

      int64_t delta = task->_endTime - task->_startTime;

      assert(delta > 0);

      if (task->_startTime < firstTime) {
        firstTime = task->_startTime;
      }

      if (task->_endTime > lastTime) {
        lastTime = task->_endTime;
      }

      if (delta < minTime) {
        minTime = delta;
      }

      if (delta > maxTime) {
        maxTime = delta;
      }

      averageTime+= delta;
    }

    averageTime /= count;

    std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;

    double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;

    double error = ((time01 - time00) - expectedTime) / expectedTime;

    if (error < 0) {
      error*= -1.0;
    }

    bool success = error < ERROR;

    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;

    return success;
  }

  class BlockTask: public Runnable {

  public:

    BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
      _monitor(monitor),
      _bmonitor(bmonitor),
      _count(count) {}

    void run() {
      {
        Synchronized s(_bmonitor);

        _bmonitor.wait();

      }

      {
        Synchronized s(_monitor);

        _count--;

        if (_count == 0) {

          _monitor.notify();
        }
      }
    }

    Monitor& _monitor;
    Monitor& _bmonitor;
    size_t& _count;
  };

  /**
   * Block test.  Create pendingTaskCountMax tasks.  Verify that we block adding the
   * pendingTaskCountMax + 1th task.  Verify that we unblock when a task completes */

  bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {

    bool success = false;

    try {

      Monitor bmonitor;
      Monitor monitor;

      size_t pendingTaskMaxCount = workerCount;

      size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};

      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);

      shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());

      threadFactory->setPriority(PosixThreadFactory::HIGHEST);

      threadManager->threadFactory(threadFactory);

      threadManager->start();

      std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;

      for (size_t ix = 0; ix < workerCount; ix++) {

        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
      }

      for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {

        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
      }

      for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
        threadManager->add(*ix);
      }

      if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
        throw TException("Unexpected pending task count");
      }

      shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));

      try {
        threadManager->add(extraTask, 1);
        throw TException("Unexpected success adding task in excess of pending task count");
      } catch(TooManyPendingTasksException& e) {
        throw TException("Should have timed out adding task in excess of pending task count");
      } catch(TimedOutException& e) {
        // Expected result
      }

      try {
        threadManager->add(extraTask, -1);
        throw TException("Unexpected success adding task in excess of pending task count");
      } catch(TimedOutException& e) {
        throw TException("Unexpected timeout adding task in excess of pending task count");
      } catch(TooManyPendingTasksException& e) {
        // Expected result
      }

      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount()  << std::endl;

      {
        Synchronized s(bmonitor);

        bmonitor.notifyAll();
      }

      {
        Synchronized s(monitor);

        while(activeCounts[0] != 0) {
          monitor.wait();
        }
      }

      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;

      try {
        threadManager->add(extraTask, 1);
      } catch(TimedOutException& e) {
        std::cout << "\t\t\t" << "add timed out unexpectedly"  << std::endl;
        throw TException("Unexpected timeout adding task");

      } catch(TooManyPendingTasksException& e) {
        std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
        throw TException("Unexpected timeout adding task");
      }

      // Wake up tasks that were pending before and wait for them to complete

      {
        Synchronized s(bmonitor);

        bmonitor.notifyAll();
      }

      {
        Synchronized s(monitor);

        while(activeCounts[1] != 0) {
          monitor.wait();
        }
      }

      // Wake up the extra task and wait for it to complete

      {
        Synchronized s(bmonitor);

        bmonitor.notifyAll();
      }

      {
        Synchronized s(monitor);

        while(activeCounts[2] != 0) {
          monitor.wait();
        }
      }

      if(!(success = (threadManager->totalTaskCount() == 0))) {
        throw TException("Unexpected pending task count");
      }

    } catch(TException& e) {
      std::cout << "ERROR: " << e.what() << std::endl;
    }

    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
    return success;
 }
};

const double ThreadManagerTests::ERROR = .20;

}}}} // apache::thrift::concurrency

using namespace apache::thrift::concurrency::test;