@ -18,6 +18,7 @@
# include "qemu/defer-call.h"
# include "qemu/queue.h"
# include "qemu/thread.h"
# include "qemu/atomic.h"
# include "qemu/coroutine.h"
# include "trace.h"
# include "block/thread-pool.h"
@ -39,9 +40,13 @@ struct ThreadPoolElementAio {
ThreadPoolFunc * func ;
void * arg ;
/* Moving state out of THREAD_QUEUED is protected by lock. After
* that , only the worker thread can write to it . Reads and writes
* of state and ret are ordered with memory barriers .
/*
* Accessed with atomics . Moving state out of THREAD_QUEUED is
* protected by pool - > lock and only the worker thread can move
* the state from THREAD_ACTIVE to THREAD_DONE .
*
* When state is THREAD_DONE , ret must have been written already .
* Use acquire / release ordering when reading / writing ret as well .
*/
enum ThreadState state ;
int ret ;
@ -105,15 +110,14 @@ static void *worker_thread(void *opaque)
req = QTAILQ_FIRST ( & pool - > request_list ) ;
QTAILQ_REMOVE ( & pool - > request_list , req , reqs ) ;
req - > state = THREAD_ACTIVE ;
qatomic_set ( & req - > state , THREAD_ACTIVE ) ;
qemu_mutex_unlock ( & pool - > lock ) ;
ret = req - > func ( req - > arg ) ;
req - > ret = ret ;
/* Write ret before state. */
smp_wmb ( ) ;
req - > state = THREAD_DONE ;
qatomic_set ( & req - > ret , ret ) ;
/* _release to write ret before state. */
qatomic_store_release ( & req - > state , THREAD_DONE ) ;
qemu_bh_schedule ( pool - > completion_bh ) ;
qemu_mutex_lock ( & pool - > lock ) ;
@ -180,7 +184,8 @@ static void thread_pool_completion_bh(void *opaque)
restart :
QLIST_FOREACH_SAFE ( elem , & pool - > head , all , next ) {
if ( elem - > state ! = THREAD_DONE ) {
/* _acquire to read state before ret. */
if ( qatomic_load_acquire ( & elem - > state ) ! = THREAD_DONE ) {
continue ;
}
@ -189,9 +194,6 @@ restart:
QLIST_REMOVE ( elem , all ) ;
if ( elem - > common . cb ) {
/* Read state before ret. */
smp_rmb ( ) ;
/* Schedule ourselves in case elem->common.cb() calls aio_poll() to
* wait for another request that completed at the same time .
*/
@ -223,12 +225,12 @@ static void thread_pool_cancel(BlockAIOCB *acb)
trace_thread_pool_cancel_aio ( elem , elem - > common . opaque ) ;
QEMU_LOCK_GUARD ( & pool - > lock ) ;
if ( elem - > state = = THREAD_QUEUED ) {
if ( qatomic_read ( & elem - > state ) = = THREAD_QUEUED ) {
QTAILQ_REMOVE ( & pool - > request_list , elem , reqs ) ;
qemu_bh_schedule ( pool - > completion_bh ) ;
elem - > state = THREAD_DONE ;
elem - > ret = - ECANCELED ;
qatomic_set ( & elem - > ret , - ECANCELED ) ;
qatomic_store_release ( & elem - > state , THREAD_DONE ) ;
}
}