/////////////////////////////////////////////////////////////////////////////// // Copyright (C) 2002-2016, Open Design Alliance (the "Alliance"). // All rights reserved. // // This software and its documentation and related materials are owned by // the Alliance. The software may only be incorporated into application // programs owned by members of the Alliance, subject to a signed // Membership Agreement and Supplemental Software License Agreement with the // Alliance. The structure and organization of this software are the valuable // trade secrets of the Alliance and its suppliers. The software is also // protected by copyright law and international treaty provisions. Application // programs incorporating this software must include the following statement // with their copyright notices: // // This application incorporates Teigha(R) software pursuant to a license // agreement with Open Design Alliance. // Teigha(R) Copyright (C) 2002-2016 by Open Design Alliance. // All rights reserved. // // By use of this software, its documentation or related materials, you // acknowledge and accept the above terms. /////////////////////////////////////////////////////////////////////////////// #ifndef _ODRXTHREADPOOLSERVICE_INCLUDED_ #define _ODRXTHREADPOOLSERVICE_INCLUDED_ /* { Secret } **/ #include "TD_PackPush.h" #include "RxModule.h" #include "ThreadsCounter.h" #include "OdArray.h" #include "StaticRxObject.h" #include class OdRxThreadPoolService; typedef ptrdiff_t OdApcParamType; typedef void (*OdApcEntryPointVoidParam)( OdApcParamType parameter ); typedef void (*OdApcEntryPointRxObjParam)( OdRxObject* parameter ); /** \details This interface represents APC thread object. Library: TD_Db */ class ODRX_ABSTRACT OdApcThread : public OdRxObject { public: /** \details Execute atomic portion of code encapsulated as OdApcEntryPointVoidParam and returns immediately. \param ep [in] Function pointer to execute. \param parameter [in] Argument which will be passed into executed function. */ virtual void asyncProcCall( OdApcEntryPointVoidParam ep, OdApcParamType parameter ) = 0; /** \details Execute atomic portion of code encapsulated as OdApcEntryPointRxObjParam and returns immediately. \param ep [in] Function pointer to execute. \param parameter [in] Argument which will be passed into executed function. */ virtual void asyncProcCall( OdApcEntryPointRxObjParam ep, OdRxObject* parameter ) = 0; /** \details Waits for completion of function execution in the current thread. */ virtual void wait() = 0; /** \details Returns thread's unique system identifier. */ virtual unsigned int getId() const = 0; }; /** \details This template class is a specialization of the OdSmartPtr class template for OdApcThread object pointers. */ typedef OdSmartPtr OdApcThreadPtr; /** \details This interface represents client's atomic portion of code to execute via asynchronous call. Library: TD_Db */ class ODRX_ABSTRACT OdApcAtom : public OdRxObject { public: /** \details Entry point for parallel thread. */ virtual void apcEntryPoint( OdRxObject* pMessage ) {} /** \details Entry point for parallel thread. */ virtual void apcEntryPoint( OdApcParamType pMessage ) {} }; /** \details This template class is a specialization of the OdSmartPtr class template for OdRxApcQueue object pointers. */ typedef OdSmartPtr OdApcAtomPtr; /** \details This interface represents APC object pool. Library: TD_Db */ class OdApcObjectPool : public OdRxObject { public: /** \details Reserve specified number of objects in object pool. */ virtual void reserve( OdUInt32 n ) = 0; /** \details Take object from object pool. */ virtual OdRxObjectPtr take() = 0; /** \details Put object into object pool. */ virtual void put( OdRxObject* pObj ) = 0; }; typedef OdSmartPtr OdApcObjectPoolPtr; class OdApcObjectPool; /** \details This interface represents APC framework queue. Library: TD_Db */ class ODRX_ABSTRACT OdApcQueue : public OdRxObject { public: /** \details Returns reference to the threading framework. */ virtual OdRxThreadPoolService& framework() = 0; /** \details Sets reference to helper atom pool object, where every atom object is being put after APC call is returned. */ virtual void setAtomPoolRef( OdApcObjectPool* pAtomPool ) = 0; /** \details Adds atomic portion of code encapsulated in OdRxAtom to queue and returns immediately. */ virtual void addEntryPoint( OdApcAtom* pRecipient, OdRxObject* pMessage = 0 ) = 0; /** \details Adds atomic portion of code encapsulated in OdRxAtom to queue and returns immediately. */ virtual void addEntryPoint( OdApcAtom* pRecipient, OdApcParamType pMessage ) = 0; /** \details Waits until all OdApcAtom objects in main and synchronized queues are processed. \remarks Calling thread also utilized to process queued asynchronous calls. */ virtual void wait() = 0; /** \details Execute action in the main thread (doesn't depend on multithread queues count). \param mtFunc [in] Function to execute in the main thread. \param pArg [in] Function argument to execute in the main thread. \remarks Simply redirects call into threading framework. */ virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0; }; /** \details This template class is a specialization of the OdSmartPtr class template for OdApcQueue object pointers. */ typedef OdSmartPtr OdApcQueuePtr; /** \details This interface represents APC framework gateway. It provides functionality used for protecting data that usually is being accessed by many reading threads and few writing threads. It allows access to protecting data simultaneously by reading threads until no thread gains write access. Library: TD_Db */ class OdApcDataReadWriteDispatcher : public OdRxObject { public: /** \details A calling thread waits until area is unlocked and enters it to read only. */ virtual void enter() = 0; /** \details A calling thread leaves protected area. */ virtual void leave() = 0; /** \details 1. Locks the entrance. 2. Waits until all threads exit read state. 3. Returns. \remarks IMPORTANT: If calling thread calls enter() without leave(), it should call lockFromInside() to be not deadlocked. */ virtual void lock() = 0; /** \details Unlocks the entrance. */ virtual void unlock() = 0; /** \details Locks the entrance while being in read state. \remarks To unlock protected area, the calling thread may either call unlock() or unlockFromInside() to stay in read state. */ virtual void lockFromInside() = 0; /** \details Unlocks the entrance while staying in read state. */ virtual void unlockFromInside() = 0; }; /** \details This template class is a specialization of the OdSmartPtr class template for OdApcDataReadWriteDispatcher object pointers. */ typedef OdSmartPtr OdApcDataReadWriteDispatcherPtr; /** \details This interface represents APC framework event. Library: TD_Db */ class ODRX_ABSTRACT OdApcEvent : public OdRxObject { public: /** \details Sets the event object to the signaled state. */ virtual void set() = 0; /** \details Sets the event object to the nonsignaled state. */ virtual void reset() = 0; /** \details Waits until event object doesn't set to the signaled state. */ virtual void wait() = 0; /** \details Waits until event object doesn't set to the signaled state and sets it to the nonsignaled state. */ virtual void waitAndReset() = 0; }; /** \details This template class is a specialization of the OdSmartPtr class template for OdApcEvent object pointers. */ typedef OdSmartPtr OdApcEventPtr; /** \details This interface represents APC framework check point. Library: TD_Db */ class ODRX_ABSTRACT OdApcGateway : public OdRxObject { public: /** \details Controlled thread locks while passing through check point. When specified number of controlled threads enters sync point, event is triggered. See OdApcSyncPoint::wait(). */ virtual void lockByMain( OdUInt32 numThreads ) = 0; /** \details Controlled thread waits until number of threads specified by last call to lock() enters sync point. */ virtual void waitByMain() = 0; /** \details Controlled thread unlocks while passing through sync point. */ virtual void unlockByMain() = 0; /////////////////////////////////////////////////////////////////////////////// /** \details Controlled thread enters sync point by calling this method. Controlled thread is blocked inside this method until controlling thread calls unlock(). */ virtual void passBySecondary() = 0; }; /** \details This template class is a specialization of the OdSmartPtr class template for OdApcGateway object pointers. */ typedef OdSmartPtr OdApcGatewayPtr; /** \details This interface represents APC framework looped gateway point. Library: TD_Db */ class ODRX_ABSTRACT OdApcLoopedGateway : public OdRxObject { public: /** \details */ virtual void init( OdUInt32 numThreads ) = 0; /** \details Controlled thread unlocks while passing through sync point. */ virtual void passByMain() = 0; /** \details Controlled thread locks while passing through check point. When specified number of controlled threads enters sync point, event is triggered. See OdApcSyncPoint::wait(). */ virtual void waitByMain() = 0; /** \details Controlled thread waits until number of threads specified by last call to lock() enters sync point. */ virtual void passByMainNoWait() = 0; /////////////////////////////////////////////////////////////////////////////// /** \details Controlled thread enters sync point by calling this method. Controlled thread is blocked inside this method until controlling thread calls unlock(). */ virtual void passBySecondary() = 0; }; /** \details This template class is a specialization of the OdSmartPtr class template for OdApcLoopedGateway object pointers. */ typedef OdSmartPtr OdApcLoopedGatewayPtr; /** \details Represents set of flags for new APC MtQueue construction. Library: TD_Db */ enum OdApcMtQueueFlags { kMtQueueNoFlags = 0, // Empty MtQueue flags. kMtQueueForceNewThreads = (1 << 0), // Spawn new threads and add them to the pool if there are not enough free threads. kMtQueueAllowExecByMain = (1 << 1), // Allow to use main thread for execution if there are no free threads. kMtQueueForceTopLevel = (1 << 2), // Make MtQueue top level even if other registered threads already run. kMtQueueLastFlag = kMtQueueForceTopLevel }; /** \details Library: TD_Db */ class FIRSTDLL_EXPORT ODRX_ABSTRACT OdRxThreadPoolService : public OdRxModule { public: ODRX_DECLARE_MEMBERS(OdRxThreadPoolService); /** \details Returns number of logical CPUs installed on this machine. */ virtual int numCPUs() const = 0; /** \details Returns number of physical CPU cores installed on this machine. */ virtual int numPhysicalCores() const = 0; /** \details Returns number of all threads in the pool. */ virtual int numThreads() const = 0; /** \details Returns number of currently available free threads in the pool. \remarks (numThreads() - numFreeThreads()) - count of currently running threads. */ virtual int numFreeThreads() const = 0; /** \details Creates APC thread object. */ virtual OdApcThreadPtr newThread() = 0; /** \details Creates synchronized (single thread) queue. \remarks Returned queue is processed by one thread at one moment. */ virtual OdApcQueuePtr newSTQueue() = 0; /** \details Creates mutiple thread processing queue. \param nThreadAttributes [in] Set of the attributes for the threads to run. \param numThreads [in] Number of threads for this queue. \param nFlags [in] Set of flags for new MtQueue construction. \remarks If numThreads argument is zero, then the queue uses all available free threads. */ virtual OdApcQueuePtr newMTQueue( unsigned nThreadAttributes = ThreadsCounter::kNoAttributes, int numThreads = 0, OdUInt32 nFlags = kMtQueueNoFlags ) = 0; /** \details Creates APC object pool object. */ virtual OdApcObjectPoolPtr newObjectPool() = 0; /** \details Creates APC event object. */ virtual OdApcEventPtr newEvent() = 0; /** \details Creates APC gateway object. */ virtual OdApcGatewayPtr newGateway() = 0; /** \details Creates APC sync point object. */ virtual OdApcLoopedGatewayPtr newLoopedGateway() = 0; /** \details Execute action in the main thread (doesn't depend on multithread queues count). \param mtFunc [in] Function to execute in the main thread. \param pArg [in] Function argument to execute in the main thread. */ virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0; /* Support for external threads manager */ /** \details Register externally executed threads. \param nThreads [in] Count of threads in aThreads array. \param aThreads [in] Thread ID's array. \param nThreadAttributes [in] Set of attributes for running threads. \remarks If application starts own threads, uses system threads, or executes threads accessed by newThread() without using Mt queue. it must register ID's of these threads to enable library Mt mode, initialization of local heaps, etc. */ virtual void registerExternalThreads( unsigned nThreads, const unsigned* aThreads, unsigned nThreadAttribs = ThreadsCounter::kNoAttributes ) = 0; /** \details Unregister externally executed threads. \param nThreads [in] Count of threads in aThreads array. \param aThreads [in] Thread ID's array. \remarks If application starts own threads, uses system threads, or executes threads accessed by newThread() without using Mt queue, it must register ID's of these threads to enable library Mt mode, initialization of local heaps and etc. */ virtual void unregisterExternalThreads( unsigned nThreads, const unsigned* aThreads ) = 0; /** \details Inform Teigha that external thread started. */ virtual void externalThreadStart() = 0; /** \details Inform Teigha that external thread stopped. */ virtual void externalThreadStop() = 0; /** \details Setup function for provide execution in the external main thread. \param execFunc [in] Main thread execution function. */ virtual void setExternalMainThreadFunc( ExecuteMainThreadFunc execFunc ) = 0; /** \details Returns current function for execution in the external main thread. */ virtual ExecuteMainThreadFunc getExternalMainThreadFunc() const = 0; }; typedef OdSmartPtr OdRxThreadPoolServicePtr; /** \details Library: TD_Db */ class OdApcQueueHelper : public OdSmartPtr { public: OdApcQueueHelper( ) {} OdApcQueueHelper( const OdApcQueue* pObject, OdRxObjMod m ) : OdSmartPtr( pObject, m ) {} OdApcQueueHelper( const OdApcQueue* pObject ) : OdSmartPtr( pObject ) {} OdApcQueueHelper( const OdRxObject* pObject ) : OdSmartPtr( pObject ) {} OdApcQueueHelper( OdRxObject* pObject, OdRxObjMod m ) : OdSmartPtr( pObject, m ) {} OdApcQueueHelper( const OdSmartPtr& pObject ) : OdSmartPtr( pObject ) {} OdApcQueueHelper( const OdRxObjectPtr& pObject ) : OdSmartPtr( pObject ) {} OdApcQueueHelper( const OdBaseObjectPtr& pObject ) : OdSmartPtr( pObject ) {} using OdSmartPtr::operator =; void initST( OdRxThreadPoolService* pThreadPool ) { *this = pThreadPool ? pThreadPool->newSTQueue().get() : 0; } void initMT( OdRxThreadPoolService* pThreadPool ) { *this = pThreadPool ? pThreadPool->newMTQueue().get() : 0; } void call( OdApcAtom* pAction, OdRxObject* pParam = 0 ) { if( m_pObject ) { get()->addEntryPoint( pAction, pParam ); } else { pAction->apcEntryPoint( pParam ); } } void call( OdApcAtom* pAction, OdApcParamType param ) { if( m_pObject ) { get()->addEntryPoint( pAction, param ); } else { pAction->apcEntryPoint( param ); } } void setAtomPoolRef( OdApcObjectPool* pAtomPool ) { if( m_pObject ) { get()->setAtomPoolRef( pAtomPool ); } } void wait() { if( m_pObject ) { get()->wait(); } } }; /** \details Library: TD_Db class OdApcDataReadWriteDispatcherHelper : public OdSmartPtr { public: OdApcDataReadWriteDispatcherHelper( ) {} OdApcDataReadWriteDispatcherHelper( const OdApcDataReadWriteDispatcher* pObject, OdRxObjMod m ) : OdSmartPtr( pObject, m ) {} OdApcDataReadWriteDispatcherHelper( const OdApcDataReadWriteDispatcher* pObject ) : OdSmartPtr( pObject ) {} OdApcDataReadWriteDispatcherHelper( const OdRxObject* pObject ) : OdSmartPtr( pObject ) {} OdApcDataReadWriteDispatcherHelper( OdRxObject* pObject, OdRxObjMod m ) : OdSmartPtr( pObject, m ) {} OdApcDataReadWriteDispatcherHelper( const OdSmartPtr& pObject ) : OdSmartPtr( pObject ) {} OdApcDataReadWriteDispatcherHelper( const OdRxObjectPtr& pObject ) : OdSmartPtr( pObject ) {} OdApcDataReadWriteDispatcherHelper( const OdBaseObjectPtr& pObject ) : OdSmartPtr( pObject ) {} using OdSmartPtr::operator =; void init( OdRxThreadPoolService* pThreadPool ) { *this = pThreadPool ? pThreadPool->newDataReadWriteDispatcher().get() : 0; } void enter() { if( m_pObject ) { get()->enter(); } } void leave() { if( m_pObject ) { get()->leave(); } } void lock() { if( m_pObject ) { get()->lock(); } } void unlock() { if( m_pObject ) { get()->unlock(); } } void lockFromInside() { if( m_pObject ) { get()->lockFromInside(); } } void unlockFromInside() { if( m_pObject ) { get()->unlockFromInside(); } } }; class OdReadWriteDispatcherAutoPass { public: OdReadWriteDispatcherAutoPass( OdApcDataReadWriteDispatcherHelper& lock ) : m_lock( lock ) { m_lock.enter(); } ~OdReadWriteDispatcherAutoPass() { m_lock.leave(); } private: OdApcDataReadWriteDispatcherHelper& m_lock; }; class OdReadWriteDispatcherAutoLock { public: OdReadWriteDispatcherAutoLock( OdApcDataReadWriteDispatcherHelper& lock ) : m_lock( lock ) { m_lock.lock(); } ~OdReadWriteDispatcherAutoLock() { m_lock.unlock(); } private: OdApcDataReadWriteDispatcherHelper& m_lock; }; class OdReadWriteDispatcherAutoLockFromInside { public: OdReadWriteDispatcherAutoLockFromInside( OdApcDataReadWriteDispatcherHelper& lock ) : m_lock( lock ) { m_lock.lockFromInside(); } ~OdReadWriteDispatcherAutoLockFromInside() { m_lock.unlockFromInside(); } private: OdApcDataReadWriteDispatcherHelper& m_lock; }; */ /** \details Library: TD_Db */ class OdApcQueueHelperArray : public OdArray { public: void initST( int n, OdRxThreadPoolService* pTP ) { resize( n ); if ( pTP ) { for( int i=0; inewSTQueue() ); } } } void wait() { size_type n = size(); for( size_type i=0; i */ class ODRX_ABSTRACT OdAsyncForEachBase : public OdApcAtom { void* sync_next( OdMutex* mutex, OdUInt32 threadIndex, OdUInt32& itemIndex ) { OdMutexAutoLock lock( *mutex ); return next( threadIndex, itemIndex ); } protected: virtual OdMutex* mutexForNext() = 0; virtual void* next( OdUInt32 threadIndex, OdUInt32& itemIndex ) = 0; virtual void doAction( OdUInt32 threadIndex, OdUInt32 itemIndex, void* pItem ) = 0; virtual void apcEntryPoint( OdApcParamType threadIndex ) { OdMutex* mutex = mutexForNext(); OdUInt32 itemIndex; while( void* pItem = sync_next( mutex, (OdUInt32)threadIndex, itemIndex ) ) { doAction( (OdUInt32)threadIndex, itemIndex, pItem ); } } public: void for_each( OdApcQueue* pQueue, OdUInt32 nThreads = 0 ) { if ( pQueue!=0 ) { OdUInt32 n = nThreads==0 ? pQueue->framework().numCPUs() : nThreads; if( n > 1 ) { while( --n ) { pQueue->addEntryPoint( this, (OdApcParamType)n ); } apcEntryPoint( (OdApcParamType)n ); pQueue->wait(); return; } } apcEntryPoint( 0 ); } }; /** */ template< class It, class Fn > class OdAsyncForEach : public OdStaticRxObject { OdMutex m_mutex; It m_cur, m_last; Fn m_fn; bool next( It& cur ) { OdMutexAutoLock lock( m_mutex ); if ( m_cur < m_last ) { cur = m_cur; ++m_cur; return true; } return false; } void apcEntryPoint( OdRxObject* ) { It cur; if( next( cur ) ) { do { m_fn( *cur ); } while( next( cur ) ); } } public: void for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) { unsigned long n = last-first; if( pQueue && n>1 ) { m_cur = first; m_last = last; m_fn = fn; unsigned long numCPUs = pQueue->framework().numCPUs(); n = ( n < numCPUs ) ? n : numCPUs; while( n-- ) { pQueue->addEntryPoint( this ); } pQueue->wait(); } else { std::for_each( first, last, fn ); } } }; template< class It, class Fn > void od_async_for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) { ODA_ASSERT_ONCE( first <= last ); if( first < last ) { OdAsyncForEach< It, Fn >().for_each( pQueue, first, last, fn ); } } /** */ template< class TObject > struct OdApcObjectPoolHelperDummyInitFn { void operator () ( TObject& ) const { // does nothing } }; /** */ template< class TObject, class TInitFn = OdApcObjectPoolHelperDummyInitFn > class OdApcObjectPoolHelper : public OdApcObjectPoolPtr { TObject* m_pEntries; OdUInt32 m_nCount; public: OdApcObjectPoolHelper() : m_pEntries( NULL ), m_nCount(0) {} ~OdApcObjectPoolHelper() { release(); ::delete [] m_pEntries; m_pEntries = NULL; m_nCount=0; } void init( OdRxThreadPoolService* pTP, int n = 0, const TInitFn* pInitFn = 0 ) { if ( pTP ) { if( n==0 ) { n = pTP->numCPUs(); } m_nCount = n; m_pEntries = ::new TObject[n]; (*(OdApcObjectPoolPtr*)this) = pTP->newObjectPool(); while( --n ) { TObject& obj = m_pEntries[ n ]; if( pInitFn ) { (*pInitFn)( obj ); } get()->put( &obj ); } } else { m_nCount = 1; m_pEntries = ::new TObject[1]; if( pInitFn ) { (*pInitFn)( *m_pEntries ); } } } TObject* take() { if( m_pObject ) { return static_cast( get()->take().get() ); } return m_pEntries; } OdUInt32 size() const { return m_nCount; } TObject& at( OdUInt32 i ) const { if( i < m_nCount ) { return m_pEntries[i]; } throw OdError_InvalidIndex(); } operator OdApcObjectPool* () { return get(); } operator const OdApcObjectPool* () const { return get(); } }; #include "TD_PackPop.h" #endif //_ODRXTHREADPOOLSERVICE_INCLUDED_