ref: 757a596e97d91a3b55644ce5eca1d8e72e218a1d
dir: /codec/common/src/WelsThreadPool.cpp/
/*! * \copy * Copyright (c) 2009-2015, Cisco Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * * \file WelsThreadPool.cpp * * \brief functions for Thread Pool * * \date 5/09/2012 Created * ************************************************************************************* */ #include <list> #include <map> #include "typedefs.h" #include "WelsThreadPool.h" namespace WelsCommon { CWelsThreadPool::CWelsThreadPool (IWelsThreadPoolSink* pSink, int32_t iMaxThreadNum) : m_pSink (pSink) { m_iMaxThreadNum = 0; Init (iMaxThreadNum); } CWelsThreadPool::~CWelsThreadPool() { Uninit(); } WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask) { AddThreadToBusyMap (pThread); return WELS_THREAD_ERROR_OK; } WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask) { RemoveThreadFromBusyMap (pThread); AddThreadToIdleMap (pThread); if (m_pSink) { m_pSink->OnTaskExecuted (pTask); } //WELS_INFO_TRACE("ThreadPool: Task "<<(uint32_t)pTask<<" Finished, Thread "<<(uint32_t)pThread<<" put to idle list"); SignalThread(); return WELS_THREAD_ERROR_OK; } WELS_THREAD_ERROR_CODE CWelsThreadPool::Init (int32_t iMaxThreadNum) { CWelsAutoLock cLock (m_cLockPool); //WELS_INFO_TRACE("Enter WelsThreadPool Init"); int32_t i; if (iMaxThreadNum <= 0) iMaxThreadNum = 1; m_iMaxThreadNum = iMaxThreadNum; for (i = 0; i < m_iMaxThreadNum; i++) { if (WELS_THREAD_ERROR_OK != CreateIdleThread()) { return WELS_THREAD_ERROR_GENERAL; } } if (WELS_THREAD_ERROR_OK != Start()) { return WELS_THREAD_ERROR_GENERAL; } return WELS_THREAD_ERROR_OK; } WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() { WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK; CWelsAutoLock cLock (m_cLockPool); ClearWaitedTasks(); while (GetBusyThreadNum() > 0) { //WELS_INFO_TRACE ("CWelsThreadPool::Uninit - Waiting all thread to exit"); WelsSleep (10); } if (GetIdleThreadNum() != m_iMaxThreadNum) { iReturn = WELS_THREAD_ERROR_GENERAL; } m_cLockIdleTasks.Lock(); std::map<uintptr_t, CWelsTaskThread*>::iterator iter = m_cIdleThreads.begin(); while (iter != m_cIdleThreads.end()) { DestroyThread (iter->second); ++ iter; } m_cLockIdleTasks.Unlock(); m_iMaxThreadNum = 0; Kill(); return iReturn; } void CWelsThreadPool::ExecuteTask() { //WELS_INFO_TRACE("ThreadPool: schedule tasks"); CWelsTaskThread* pThread = NULL; IWelsTask* pTask = NULL; while (GetWaitedTaskNum() > 0) { pThread = GetIdleThread(); if (pThread == NULL) { break; } pTask = GetWaitedTask(); //WELS_INFO_TRACE("ThreadPool: ExecuteTask = "<<(uint32_t)(pTask)<<" at thread = "<<(uint32_t)(pThread)); pThread->SetTask (pTask); } } WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) { CWelsAutoLock cLock (m_cLockPool); //WELS_INFO_TRACE("ThreadPool: QueueTask = "<<(uint32_t)(pTask)); if (GetWaitedTaskNum() == 0) { CWelsTaskThread* pThread = GetIdleThread(); if (pThread != NULL) { //WELS_INFO_TRACE("ThreadPool: ExecuteTask = "<<(uint32_t)(pTask)); pThread->SetTask (pTask); return WELS_THREAD_ERROR_OK; } } AddTaskToWaitedList (pTask); SignalThread(); return WELS_THREAD_ERROR_OK; } WELS_THREAD_ERROR_CODE CWelsThreadPool::CreateIdleThread() { CWelsTaskThread* pThread = new CWelsTaskThread (this); if (NULL == pThread) { return WELS_THREAD_ERROR_GENERAL; } pThread->Start(); AddThreadToIdleMap (pThread); return WELS_THREAD_ERROR_OK; } void CWelsThreadPool::DestroyThread (CWelsTaskThread* pThread) { pThread->Kill(); delete pThread; return; } WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToIdleMap (CWelsTaskThread* pThread) { CWelsAutoLock cLock (m_cLockIdleTasks); uintptr_t id = pThread->GetID(); std::map<uintptr_t, CWelsTaskThread*>::iterator iter = m_cIdleThreads.find (id); if (iter != m_cIdleThreads.end()) { return WELS_THREAD_ERROR_GENERAL; } m_cIdleThreads[id] = pThread; return WELS_THREAD_ERROR_OK; } WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToBusyMap (CWelsTaskThread* pThread) { CWelsAutoLock cLock (m_cLockBusyTasks); uintptr_t id = pThread->GetID(); std::map<uintptr_t, CWelsTaskThread*>::iterator iter = m_cBusyThreads.find (id); if (iter != m_cBusyThreads.end()) { return WELS_THREAD_ERROR_GENERAL; } m_cBusyThreads[id] = pThread; return WELS_THREAD_ERROR_OK; } WELS_THREAD_ERROR_CODE CWelsThreadPool::RemoveThreadFromBusyMap (CWelsTaskThread* pThread) { CWelsAutoLock cLock (m_cLockBusyTasks); uintptr_t id = pThread->GetID(); std::map<uintptr_t, CWelsTaskThread*>::iterator iter = m_cBusyThreads.find (id); if (iter != m_cBusyThreads.end()) { m_cBusyThreads.erase (iter); } else { return WELS_THREAD_ERROR_GENERAL; } return WELS_THREAD_ERROR_OK; } void CWelsThreadPool::AddTaskToWaitedList (IWelsTask* pTask) { CWelsAutoLock cLock (m_cLockWaitedTasks); m_cWaitedTasks.push_back (pTask); return; } CWelsTaskThread* CWelsThreadPool::GetIdleThread() { CWelsAutoLock cLock (m_cLockIdleTasks); if (m_cIdleThreads.size() == 0) { return NULL; } std::map<uintptr_t, CWelsTaskThread*>::iterator it = m_cIdleThreads.begin(); CWelsTaskThread* pThread = it->second; m_cIdleThreads.erase (it); return pThread; } int32_t CWelsThreadPool::GetBusyThreadNum() { return static_cast<int32_t> (m_cBusyThreads.size()); } int32_t CWelsThreadPool::GetIdleThreadNum() { return static_cast<int32_t> (m_cIdleThreads.size()); } int32_t CWelsThreadPool::GetWaitedTaskNum() { return static_cast<int32_t> (m_cWaitedTasks.size()); } IWelsTask* CWelsThreadPool::GetWaitedTask() { CWelsAutoLock lock (m_cLockWaitedTasks); if (m_cWaitedTasks.size() == 0) { return NULL; } std::list<IWelsTask*>::iterator it = m_cWaitedTasks.begin(); IWelsTask* pTask = *it; m_cWaitedTasks.pop_front(); return pTask; } void CWelsThreadPool::ClearWaitedTasks() { CWelsAutoLock cLock (m_cLockWaitedTasks); std::list<IWelsTask*>::iterator iter = m_cWaitedTasks.begin(); if (m_pSink) { while (iter != m_cWaitedTasks.end()) { m_pSink->OnTaskCancelled (*iter); ++ iter; } } m_cWaitedTasks.clear(); } }