ref: 3d25c25aeafcf7c186ff449201ef7c061db00192
parent: 30328b85f2f4bb89d71e38b15148859a3fad14a6
parent: c12b88ae43a3a6c2293555b5abaeb9d60904b7f9
author: sijchen <[email protected]>
date: Mon Sep 5 18:41:07 EDT 2016
Merge pull request #2554 from ruil2/thread3: use pthread condition variables instead of semaphores on Apple platforms
--- a/codec/common/inc/WelsThread.h
+++ b/codec/common/inc/WelsThread.h
@@ -58,7 +58,7 @@
virtual void ExecuteTask() = 0;
virtual WELS_THREAD_ERROR_CODE Start();
virtual void Kill();
-
+ WELS_MUTEX m_hMutex;
protected:
static WELS_THREAD_ROUTINE_TYPE TheThread (void* pParam);
--- a/codec/common/inc/WelsThreadLib.h
+++ b/codec/common/inc/WelsThreadLib.h
@@ -79,7 +79,12 @@
typedef void* (*LPWELS_THREAD_ROUTINE) (void*);
typedef pthread_mutex_t WELS_MUTEX;
+
+#ifdef __APPLE__
+typedef pthread_cond_t WELS_EVENT;
+#else
typedef sem_t* WELS_EVENT;
+#endif
#define WELS_THREAD_ROUTINE_TYPE void *
#define WELS_THREAD_ROUTINE_RETURN(rc) return (void*)(intptr_t)rc;
@@ -108,12 +113,10 @@
WELS_THREAD_ERROR_CODE WelsEventClose (WELS_EVENT* event, const char* event_name = NULL);
WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event);
-WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event);
-WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds);
+WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event,WELS_MUTEX *pMutex = NULL);
+WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds,WELS_MUTEX *pMutex = NULL);
WELS_THREAD_ERROR_CODE WelsMultipleEventsWaitSingleBlocking (uint32_t nCount, WELS_EVENT* event_list,
- WELS_EVENT* master_event = NULL);
-WELS_THREAD_ERROR_CODE WelsMultipleEventsWaitAllBlocking (uint32_t nCount, WELS_EVENT* event_list,
- WELS_EVENT* master_event = NULL);
+ WELS_EVENT* master_event = NULL,WELS_MUTEX *pMutex = NULL);
WELS_THREAD_ERROR_CODE WelsThreadCreate (WELS_THREAD_HANDLE* thread, LPWELS_THREAD_ROUTINE routine,
void* arg, WELS_THREAD_ATTR attr);
--- a/codec/common/src/WelsTaskThread.cpp
+++ b/codec/common/src/WelsTaskThread.cpp
@@ -75,9 +75,9 @@
if (!GetRunning()) {
return WELS_THREAD_ERROR_GENERAL;
}
-
+ WelsMutexLock(&m_hMutex);
m_pTask = pTask;
-
+ WelsMutexUnlock(&m_hMutex);
SignalThread();
return WELS_THREAD_ERROR_OK;
--- a/codec/common/src/WelsThread.cpp
+++ b/codec/common/src/WelsThread.cpp
@@ -46,22 +46,21 @@
m_hThread (0),
m_bRunning (false),
m_bEndFlag (false) {
- WELS_THREAD_ERROR_CODE rc = WelsEventOpen (&m_hEvent);
- if (WELS_THREAD_ERROR_OK != rc) {
- m_hEvent = NULL;
- }
+ WelsEventOpen (&m_hEvent);
+ WelsMutexInit(&m_hMutex);
+
}
CWelsThread::~CWelsThread() {
Kill();
WelsEventClose (&m_hEvent);
- m_hEvent = NULL;
+ WelsMutexDestroy(&m_hMutex);
}
void CWelsThread::Thread() {
while (true) {
- WelsEventWait (&m_hEvent);
+ WelsEventWait (&m_hEvent,&m_hMutex);
if (GetEndFlag()) {
break;
@@ -74,10 +73,11 @@
}
WELS_THREAD_ERROR_CODE CWelsThread::Start() {
+#ifndef __APPLE__
if (NULL == m_hEvent) {
return WELS_THREAD_ERROR_GENERAL;
}
-
+#endif
if (GetRunning()) {
return WELS_THREAD_ERROR_OK;
}
--- a/codec/common/src/WelsThreadLib.cpp
+++ b/codec/common/src/WelsThreadLib.cpp
@@ -133,11 +133,10 @@
WELS_THREAD_ERROR_CODE WelsEventOpen (WELS_EVENT* event, const char* event_name) {
WELS_EVENT h = CreateEvent (NULL, FALSE, FALSE, NULL);
-
+ *event = h;
if (h == NULL) {
return WELS_THREAD_ERROR_GENERAL;
}
- *event = h;
return WELS_THREAD_ERROR_OK;
}
@@ -148,26 +147,20 @@
return WELS_THREAD_ERROR_GENERAL;
}
-WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event) {
+WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex) {
return WaitForSingleObject (*event, INFINITE);
}
-WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds) {
+WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds, WELS_MUTEX* pMutex) {
return WaitForSingleObject (*event, dwMilliseconds);
}
WELS_THREAD_ERROR_CODE WelsMultipleEventsWaitSingleBlocking (uint32_t nCount,
- WELS_EVENT* event_list, WELS_EVENT* master_event) {
+ WELS_EVENT* event_list, WELS_EVENT* master_even, WELS_MUTEX* pMutext) {
// Don't need/use the master event for anything, since windows has got WaitForMultipleObjects
return WaitForMultipleObjects (nCount, event_list, FALSE, INFINITE);
}
-WELS_THREAD_ERROR_CODE WelsMultipleEventsWaitAllBlocking (uint32_t nCount,
- WELS_EVENT* event_list, WELS_EVENT* master_event) {
- // Don't need/use the master event for anything, since windows has got WaitForMultipleObjects
- return WaitForMultipleObjects (nCount, event_list, TRUE, INFINITE);
-}
-
WELS_THREAD_ERROR_CODE WelsEventClose (WELS_EVENT* event, const char* event_name) {
CloseHandle (*event);
@@ -278,10 +271,10 @@
WELS_THREAD_ERROR_CODE WelsThreadSetName (const char* thread_name) {
#ifdef APPLE_IOS
- pthread_setname_np(thread_name);
+ pthread_setname_np (thread_name);
#endif
#if defined(__ANDROID__) && __ANDROID_API__ >= 9
- pthread_setname_np(pthread_self(), thread_name);
+ pthread_setname_np (pthread_self(), thread_name);
#endif
// do nothing
return WELS_THREAD_ERROR_OK;
@@ -299,27 +292,14 @@
WELS_THREAD_ERROR_CODE WelsEventOpen (WELS_EVENT* p_event, const char* event_name) {
#ifdef __APPLE__
- if (p_event == NULL) {
- return WELS_THREAD_ERROR_GENERAL;
- }
- char strSuffix[100] = { 0 };
- if (NULL == event_name) {
- sprintf (strSuffix, "WelsSem%ld_p%ld", (intptr_t)p_event, (long) (getpid()));
- event_name = &strSuffix[0];
- }
- *p_event = sem_open (event_name, O_CREAT, (S_IRUSR | S_IWUSR)/*0600*/, 0);
- if (*p_event == (sem_t*)SEM_FAILED) {
- sem_unlink (event_name);
- *p_event = NULL;
- return WELS_THREAD_ERROR_GENERAL;
- } else {
- //printf("event_open:%x, %s\n", p_event, event_name);
- return WELS_THREAD_ERROR_OK;
- }
+ WELS_THREAD_ERROR_CODE err= pthread_cond_init (p_event, NULL);
+ return err;
#else
WELS_EVENT event = (WELS_EVENT) malloc (sizeof (*event));
- if (event == NULL)
+ if (event == NULL){
+ *p_event = NULL;
return WELS_THREAD_ERROR_GENERAL;
+ }
WELS_THREAD_ERROR_CODE err = sem_init (event, 0, 0);
if (!err) {
*p_event = event;
@@ -326,6 +306,7 @@
return err;
}
free (event);
+ *p_event = NULL;
return err;
#endif
}
@@ -332,13 +313,12 @@
WELS_THREAD_ERROR_CODE WelsEventClose (WELS_EVENT* event, const char* event_name) {
//printf("event_close:%x, %s\n", event, event_name);
#ifdef __APPLE__
- WELS_THREAD_ERROR_CODE err = sem_close (*event); // match with sem_open
- if (event_name)
- sem_unlink (event_name);
+ WELS_THREAD_ERROR_CODE err = pthread_cond_destroy (event);
return err;
#else
WELS_THREAD_ERROR_CODE err = sem_destroy (*event); // match with sem_init
free (*event);
+ *event = NULL;
return err;
#endif
}
@@ -349,37 +329,37 @@
WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event) {
WELS_THREAD_ERROR_CODE err = 0;
+#ifdef __APPLE__
+ err = pthread_cond_signal (event);
+#else
// int32_t val = 0;
// sem_getvalue(event, &val);
// fprintf( stderr, "before signal it, val= %d..\n",val );
- err = sem_post (*event);
+ if (event != NULL)
+ err = sem_post (*event);
// sem_getvalue(event, &val);
// fprintf( stderr, "after signal it, val= %d..\n",val );
+#endif
return err;
}
-WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event) {
+WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex) {
+#ifdef __APPLE__
+ return pthread_cond_wait (event, pMutex);
+#else
return sem_wait (*event); // blocking until signaled
+#endif
}
-WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds) {
+WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds, WELS_MUTEX* pMutex) {
+
if (dwMilliseconds != (uint32_t) - 1) {
- return sem_wait (*event);
- } else {
#if defined(__APPLE__)
- int32_t err = 0;
- int32_t wait_count = 0;
- do {
- err = sem_trywait (*event);
- if (WELS_THREAD_ERROR_OK == err)
- break;// WELS_THREAD_ERROR_OK;
- else if (wait_count > 0)
- break;
- usleep (dwMilliseconds * 1000);
- ++ wait_count;
- } while (1);
- return err;
+ return pthread_cond_wait (event, pMutex);
#else
+ return sem_wait (*event);
+#endif
+ } else {
struct timespec ts;
struct timeval tv;
@@ -389,19 +369,23 @@
ts.tv_sec = tv.tv_sec + ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
+#if defined(__APPLE__)
+ return pthread_cond_timedwait (event, pMutex, &ts);
+#else
return sem_timedwait (*event, &ts);
-#endif//__APPLE__
+#endif
}
+
}
WELS_THREAD_ERROR_CODE WelsMultipleEventsWaitSingleBlocking (uint32_t nCount,
- WELS_EVENT* event_list, WELS_EVENT* master_event) {
+ WELS_EVENT* event_list, WELS_EVENT* master_event, WELS_MUTEX* pMutex) {
uint32_t nIdx = 0;
uint32_t uiAccessTime = 2; // 2 us once
if (nCount == 0)
return WELS_THREAD_ERROR_WAIT_FAILED;
-
+#if defined(__APPLE__)
if (master_event != NULL) {
// This design relies on the events actually being semaphores;
// if multiple events in the list have been signalled, the master
@@ -408,7 +392,7 @@
// event should have a similar count (events in windows can't keep
// track of the actual count, but the master event isn't needed there
// since it uses WaitForMultipleObjects).
- int32_t err = sem_wait (*master_event);
+ int32_t err = pthread_cond_wait (master_event, pMutex);
if (err != WELS_THREAD_ERROR_OK)
return err;
uiAccessTime = 0; // no blocking, just quickly loop through all to find the one that was signalled
@@ -425,7 +409,7 @@
* pthread_cond_timedwait() might be better choice if need
*/
do {
- err = sem_trywait (event_list[nIdx]);
+ err = pthread_cond_wait (&event_list[nIdx], pMutex);
if (WELS_THREAD_ERROR_OK == err)
return WELS_THREAD_ERROR_WAIT_OBJECT_0 + nIdx;
else if (wait_count > 0 || uiAccessTime == 0)
@@ -446,60 +430,53 @@
uiAccessTime = 2;
}
}
+#else
+ if (master_event != NULL) {
+ // This design relies on the events actually being semaphores;
+ // if multiple events in the list have been signalled, the master
+ // event should have a similar count (events in windows can't keep
+ // track of the actual count, but the master event isn't needed there
+ // since it uses WaitForMultipleObjects).
+ int32_t err = sem_wait (*master_event);
+ if (err != WELS_THREAD_ERROR_OK)
+ return err;
+ uiAccessTime = 0; // no blocking, just quickly loop through all to find the one that was signalled
+ }
- return WELS_THREAD_ERROR_WAIT_FAILED;
-}
-
-WELS_THREAD_ERROR_CODE WelsMultipleEventsWaitAllBlocking (uint32_t nCount,
- WELS_EVENT* event_list, WELS_EVENT* master_event) {
- uint32_t nIdx = 0;
- uint32_t uiCountSignals = 0;
- uint32_t uiSignalFlag = 0; // UGLY: suppose maximal event number up to 32
-
- if (nCount == 0 || nCount > (sizeof (uint32_t) << 3))
- return WELS_THREAD_ERROR_WAIT_FAILED;
-
while (1) {
nIdx = 0; // access each event by order
while (nIdx < nCount) {
- const uint32_t kuiBitwiseFlag = (1 << nIdx);
+ int32_t err = 0;
+ int32_t wait_count = 0;
- if ((uiSignalFlag & kuiBitwiseFlag) != kuiBitwiseFlag) { // non-blocking mode
- int32_t err = 0;
-// fprintf( stderr, "sem_wait(): start to wait event %d..\n", nIdx );
- if (master_event == NULL) {
- err = sem_wait (event_list[nIdx]);
- } else {
- err = sem_wait (*master_event);
- if (err == WELS_THREAD_ERROR_OK) {
- err = sem_wait (event_list[nIdx]);
- if (err != WELS_THREAD_ERROR_OK) {
- // We successfully waited for the master event,
- // but waiting for the individual event failed (e.g. EINTR?).
- // Increase the master event count so that the next retry will
- // work as intended.
- sem_post (*master_event);
- }
- }
- }
-// fprintf( stderr, "sem_wait(): wait event %d result %d errno %d..\n", nIdx, err, errno );
- if (WELS_THREAD_ERROR_OK == err) {
-// int32_t val = 0;
-// sem_getvalue(&event_list[nIdx], &val);
-// fprintf( stderr, "after sem_timedwait(), event_list[%d] semaphore value= %d..\n", nIdx, val);
-
- uiSignalFlag |= kuiBitwiseFlag;
- ++ uiCountSignals;
- if (uiCountSignals >= nCount) {
- return WELS_THREAD_ERROR_OK;
- }
- }
- }
+ /*
+ * although such interface is not used in __GNUC__ like platform, to use
+ * pthread_cond_timedwait() might be better choice if need
+ */
+ do {
+ err = sem_trywait (event_list[nIdx]);
+ if (WELS_THREAD_ERROR_OK == err)
+ return WELS_THREAD_ERROR_WAIT_OBJECT_0 + nIdx;
+ else if (wait_count > 0 || uiAccessTime == 0)
+ break;
+ usleep (uiAccessTime);
+ ++ wait_count;
+ } while (1);
// we do need access next event next time
++ nIdx;
}
+ usleep (1); // switch to working threads
+ if (master_event != NULL) {
+ // A master event was used and was signalled, but none of the events in the
+ // list was found to be signalled, thus wait a little more when rechecking
+ // the list to avoid busylooping here.
+ // If we ever hit this codepath it's mostly a bug in the code that signals
+ // the events.
+ uiAccessTime = 2;
+ }
}
+#endif
return WELS_THREAD_ERROR_WAIT_FAILED;
}
@@ -519,7 +496,7 @@
#else
int32_t count = 0;
for (int i = 0; i < CPU_SETSIZE; i++) {
- if (CPU_ISSET(i, &cpuset)) {
+ if (CPU_ISSET (i, &cpuset)) {
count++;
}
}
--- a/codec/encoder/core/inc/mt_defs.h
+++ b/codec/encoder/core/inc/mt_defs.h
@@ -89,7 +89,7 @@
uint8_t* pThreadBsBuffer[MAX_THREADS_NUM]; //actual memory for slice buffer
bool bThreadBsBufferUsage[MAX_THREADS_NUM];
WELS_MUTEX mutexThreadBsBufferUsage;
-
+WELS_MUTEX mutexEvent;
} SSliceThreading;
#endif//MULTIPLE_THREADING_DEFINES_H__
--- a/codec/encoder/core/inc/wels_task_management.h
+++ b/codec/encoder/core/inc/wels_task_management.h
@@ -105,7 +105,7 @@
int32_t m_iWaitTaskNum;
WELS_EVENT m_hTaskEvent;
-
+ WELS_MUTEX m_hEventMutex;
WelsCommon::CWelsLock m_cWaitTaskNumLock;
private:
--- a/codec/encoder/core/src/encoder_ext.cpp
+++ b/codec/encoder/core/src/encoder_ext.cpp
@@ -3804,24 +3804,6 @@
else if ((SM_SIZELIMITED_SLICE == pParam->sSliceArgument.uiSliceMode) && (pSvcParam->iMultipleThreadIdc > 1)) {
const int32_t kiPartitionCnt = pCtx->iActiveThreadsNum;
-#if 0 //TODO: temporarily use this to keep old codes for a while, will remove old codes later
- int32_t iRet = 0;
- // to fire slice coding threads
- iRet = FiredSliceThreads (pCtx, &pCtx->pSliceThreading->pThreadPEncCtx[0],
- &pCtx->pSliceThreading->pReadySliceCodingEvent[0],
- &pCtx->pSliceThreading->pThreadMasterEvent[0],
- pFbi, kiPartitionCnt, &pCtx->pCurDqLayer->sSliceEncCtx, true);
- if (iRet) {
- WelsLog (pLogCtx, WELS_LOG_ERROR,
- "[MT] WelsEncoderEncodeExt(), FiredSliceThreads return(%d) failed and exit encoding frame, iSliceCount= %d, uiSliceMode= %d, iMultipleThreadIdc= %d!!",
- iRet, iSliceCount, pParam->sSliceArgument.uiSliceMode, pSvcParam->iMultipleThreadIdc);
- return ENC_RETURN_UNEXPECTED;
- }
-
- WelsMultipleEventsWaitAllBlocking (kiPartitionCnt, &pCtx->pSliceThreading->pSliceCodedEvent[0],
- &pCtx->pSliceThreading->pSliceCodedMasterEvent);
- WELS_VERIFY_RETURN_IFNEQ (pCtx->iEncoderError, ENC_RETURN_SUCCESS)
-#else
int32_t iEndMbIdx = pCtx->pCurDqLayer->sSliceEncCtx.iMbNumInFrame;
for (int32_t iIdx = kiPartitionCnt - 1; iIdx >= 0; --iIdx) {
const int32_t iFirstMbIdx =
@@ -3857,7 +3839,6 @@
pParam->sSliceArgument.uiSliceMode, pCtx->iEncoderError);
return pCtx->iEncoderError;
}
-#endif
iLayerSize = AppendSliceToFrameBs (pCtx, pLayerBsInfo, kiPartitionCnt);
} else { // for non-dynamic-slicing mode single threading branch..
--- a/codec/encoder/core/src/slice_multi_threading.cpp
+++ b/codec/encoder/core/src/slice_multi_threading.cpp
@@ -361,7 +361,8 @@
memset (&pSmt->bThreadBsBufferUsage, 0, MAX_THREADS_NUM * sizeof (bool));
iReturn = WelsMutexInit (&pSmt->mutexThreadBsBufferUsage);
WELS_VERIFY_RETURN_PROC_IF (1, (WELS_THREAD_ERROR_OK != iReturn), FreeMemorySvc (ppCtx))
-
+ iReturn = WelsMutexInit (&pSmt->mutexEvent);
+ WELS_VERIFY_RETURN_PROC_IF (1, (WELS_THREAD_ERROR_OK != iReturn), FreeMemorySvc (ppCtx));
iReturn = WelsMutexInit (& (*ppCtx)->mutexEncoderError);
WELS_VERIFY_RETURN_PROC_IF (1, (WELS_THREAD_ERROR_OK != iReturn), FreeMemorySvc (ppCtx))
@@ -411,7 +412,7 @@
WelsMutexDestroy (&pSmt->mutexSliceNumUpdate);
WelsMutexDestroy (&pSmt->mutexThreadBsBufferUsage);
WelsMutexDestroy (& ((*ppCtx)->mutexEncoderError));
-
+ WelsMutexDestroy (&pSmt->mutexEvent);
if (pSmt->pThreadPEncCtx != NULL) {
pMa->WelsFree (pSmt->pThreadPEncCtx, "pThreadPEncCtx");
pSmt->pThreadPEncCtx = NULL;
@@ -585,7 +586,7 @@
pEventsList[0], pEventsList[1], pEventsList[1], (void*)pEncPEncCtx);
iWaitRet = WelsMultipleEventsWaitSingleBlocking (iEventCount,
&pEventsList[0],
- &pEncPEncCtx->pSliceThreading->pThreadMasterEvent[iEventIdx]); // blocking until at least one event is signalled
+ &pEncPEncCtx->pSliceThreading->pThreadMasterEvent[iEventIdx],&pEncPEncCtx->pSliceThreading->mutexEvent); // blocking until at least one event is signalled
if (WELS_THREAD_ERROR_WAIT_OBJECT_0 == iWaitRet) { // start pSlice coding signal waited
//int iLayerIndex = pEncPEncCtx->pOut->iLayerBsIndex;
//SFrameBSInfo* pFrameBsInfo = pPrivateData->pFrameBsInfo;
@@ -877,10 +878,8 @@
pPriData[iIdx].pFrameBsInfo = pFrameBsInfo;
pPriData[iIdx].iSliceIndex = iIdx;
SetOneSliceBsBufferUnderMultithread (pCtx, iIdx, iIdx);
- if (pEventsList[iIdx])
- WelsEventSignal (&pEventsList[iIdx]);
- if (pMasterEventsList[iIdx])
- WelsEventSignal (&pMasterEventsList[iIdx]);
+ WelsEventSignal (&pEventsList[iIdx]);
+ WelsEventSignal (&pMasterEventsList[iIdx]);
++ iIdx;
}
--- a/codec/encoder/core/src/wels_task_management.cpp
+++ b/codec/encoder/core/src/wels_task_management.cpp
@@ -85,6 +85,7 @@
}
WelsEventOpen (&m_hTaskEvent);
+ WelsMutexInit(&m_hEventMutex);
}
CWelsTaskManageBase::~CWelsTaskManageBase() {
@@ -131,6 +132,7 @@
WELS_DELETE_OP(m_cPreEncodingTaskList[iDid]);
}
WelsEventClose (&m_hTaskEvent);
+ WelsMutexDestroy(&m_hEventMutex);
}
WelsErrorType CWelsTaskManageBase::CreateTasks (sWelsEncCtx* pEncCtx, const int32_t kiCurDid) {
@@ -225,7 +227,7 @@
m_pThreadPool->QueueTask (pTargetTaskList->GetIndexNode (iIdx));
iIdx ++;
}
- WelsEventWait (&m_hTaskEvent);
+ WelsEventWait (&m_hTaskEvent,&m_hEventMutex);
return ENC_RETURN_SUCCESS;
}