@ -172,6 +172,13 @@ typedef struct {
*/
QmpCommandList * commands ;
bool qmp_caps [ QMP_CAPABILITY__MAX ] ;
/*
* Protects qmp request / response queue . Please take monitor_lock
* first when used together .
*/
QemuMutex qmp_queue_lock ;
/* Input queue that holds all the parsed QMP requests */
GQueue * qmp_requests ;
} MonitorQMP ;
/*
@ -218,6 +225,8 @@ struct Monitor {
/* Let's add monitor global variables to this struct. */
static struct {
IOThread * mon_iothread ;
/* Bottom half to dispatch the requests received from IO thread */
QEMUBH * qmp_dispatcher_bh ;
} mon_global ;
/* QMP checker flags */
@ -600,11 +609,13 @@ static void monitor_data_init(Monitor *mon, bool skip_flush,
{
memset ( mon , 0 , sizeof ( Monitor ) ) ;
qemu_mutex_init ( & mon - > out_lock ) ;
qemu_mutex_init ( & mon - > qmp . qmp_queue_lock ) ;
mon - > outbuf = qstring_new ( ) ;
/* Use *mon_cmds by default. */
mon - > cmd_table = mon_cmds ;
mon - > skip_flush = skip_flush ;
mon - > use_io_thr = use_io_thr ;
mon - > qmp . qmp_requests = g_queue_new ( ) ;
}
static void monitor_data_destroy ( Monitor * mon )
@ -617,6 +628,8 @@ static void monitor_data_destroy(Monitor *mon)
readline_free ( mon - > rs ) ;
QDECREF ( mon - > outbuf ) ;
qemu_mutex_destroy ( & mon - > out_lock ) ;
qemu_mutex_destroy ( & mon - > qmp . qmp_queue_lock ) ;
g_queue_free ( mon - > qmp . qmp_requests ) ;
}
char * qmp_human_monitor_command ( const char * command_line , bool has_cpu_index ,
@ -1059,6 +1072,16 @@ static void monitor_init_qmp_commands(void)
qmp_marshal_qmp_capabilities , QCO_NO_OPTIONS ) ;
}
static bool qmp_cap_enabled ( Monitor * mon , QMPCapability cap )
{
return mon - > qmp . qmp_caps [ cap ] ;
}
static bool qmp_oob_enabled ( Monitor * mon )
{
return qmp_cap_enabled ( mon , QMP_CAPABILITY_OOB ) ;
}
static void qmp_caps_check ( Monitor * mon , QMPCapabilityList * list ,
Error * * errp )
{
@ -3869,30 +3892,39 @@ static void monitor_qmp_respond(Monitor *mon, QObject *rsp,
qobject_decref ( rsp ) ;
}
static void handle_qmp_command ( JSONMessageParser * parser , GQueue * tokens )
struct QMPRequest {
/* Owner of the request */
Monitor * mon ;
/* "id" field of the request */
QObject * id ;
/* Request object to be handled */
QObject * req ;
/*
* Whether we need to resume the monitor afterward . This flag is
* used to emulate the old QMP server behavior that the current
* command must be completed before execution of the next one .
*/
bool need_resume ;
} ;
typedef struct QMPRequest QMPRequest ;
/*
* Dispatch one single QMP request . The function will free the req_obj
* and objects inside it before return .
*/
static void monitor_qmp_dispatch_one ( QMPRequest * req_obj )
{
QObject * req , * rsp = NULL , * id = NULL ;
Monitor * mon , * old_mon ;
QObject * req , * rsp = NULL , * id ;
QDict * qdict = NULL ;
MonitorQMP * mon_qmp = container_of ( parser , MonitorQMP , parser ) ;
Monitor * old_mon , * mon = container_of ( mon_qmp , Monitor , qmp ) ;
Error * err = NULL ;
bool need_resume ;
req = json_parser_parse_err ( tokens , NULL , & err ) ;
if ( ! req & & ! err ) {
/* json_parser_parse_err() sucks: can fail without setting @err */
error_setg ( & err , QERR_JSON_PARSING ) ;
}
if ( err ) {
goto err_out ;
}
req = req_obj - > req ;
mon = req_obj - > mon ;
id = req_obj - > id ;
need_resume = req_obj - > need_resume ;
qdict = qobject_to ( QDict , req ) ;
if ( qdict ) {
id = qdict_get ( qdict , " id " ) ;
qobject_incref ( id ) ;
qdict_del ( qdict , " id " ) ;
} /* else will fail qmp_dispatch() */
g_free ( req_obj ) ;
if ( trace_event_get_state_backends ( TRACE_HANDLE_QMP_COMMAND ) ) {
QString * req_json = qobject_to_json ( req ) ;
@ -3903,7 +3935,7 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
old_mon = cur_mon ;
cur_mon = mon ;
rsp = qmp_dispatch ( cur_ mon- > qmp . commands , req ) ;
rsp = qmp_dispatch ( mon - > qmp . commands , req ) ;
cur_mon = old_mon ;
@ -3919,12 +3951,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
}
}
err_out :
monitor_qmp_respond ( mon , rsp , err , id ) ;
/* Respond if necessary */
monitor_qmp_respond ( mon , rsp , NULL , id ) ;
/* This pairs with the monitor_suspend() in handle_qmp_command(). */
if ( need_resume ) {
monitor_resume ( mon ) ;
}
qobject_decref ( req ) ;
}
/*
* Pop one QMP request from monitor queues , return NULL if not found .
* We are using round - robin fashion to pop the request , to avoid
* processing commands only on a very busy monitor . To achieve that ,
* when we process one request on a specific monitor , we put that
* monitor to the end of mon_list queue .
*/
static QMPRequest * monitor_qmp_requests_pop_one ( void )
{
QMPRequest * req_obj = NULL ;
Monitor * mon ;
qemu_mutex_lock ( & monitor_lock ) ;
QTAILQ_FOREACH ( mon , & mon_list , entry ) {
qemu_mutex_lock ( & mon - > qmp . qmp_queue_lock ) ;
req_obj = g_queue_pop_head ( mon - > qmp . qmp_requests ) ;
qemu_mutex_unlock ( & mon - > qmp . qmp_queue_lock ) ;
if ( req_obj ) {
break ;
}
}
if ( req_obj ) {
/*
* We found one request on the monitor . Degrade this monitor ' s
* priority to lowest by re - inserting it to end of queue .
*/
QTAILQ_REMOVE ( & mon_list , mon , entry ) ;
QTAILQ_INSERT_TAIL ( & mon_list , mon , entry ) ;
}
qemu_mutex_unlock ( & monitor_lock ) ;
return req_obj ;
}
static void monitor_qmp_bh_dispatcher ( void * data )
{
QMPRequest * req_obj = monitor_qmp_requests_pop_one ( ) ;
if ( req_obj ) {
monitor_qmp_dispatch_one ( req_obj ) ;
/* Reschedule instead of looping so the main loop stays responsive */
qemu_bh_schedule ( mon_global . qmp_dispatcher_bh ) ;
}
}
static void handle_qmp_command ( JSONMessageParser * parser , GQueue * tokens )
{
QObject * req , * id = NULL ;
QDict * qdict = NULL ;
MonitorQMP * mon_qmp = container_of ( parser , MonitorQMP , parser ) ;
Monitor * mon = container_of ( mon_qmp , Monitor , qmp ) ;
Error * err = NULL ;
QMPRequest * req_obj ;
req = json_parser_parse_err ( tokens , NULL , & err ) ;
if ( ! req & & ! err ) {
/* json_parser_parse_err() sucks: can fail without setting @err */
error_setg ( & err , QERR_JSON_PARSING ) ;
}
if ( err ) {
monitor_qmp_respond ( mon , NULL , err , NULL ) ;
qobject_decref ( req ) ;
return ;
}
qdict = qobject_to ( QDict , req ) ;
if ( qdict ) {
id = qdict_get ( qdict , " id " ) ;
qobject_incref ( id ) ;
qdict_del ( qdict , " id " ) ;
} /* else will fail qmp_dispatch() */
req_obj = g_new0 ( QMPRequest , 1 ) ;
req_obj - > mon = mon ;
req_obj - > id = id ;
req_obj - > req = req ;
req_obj - > need_resume = false ;
/*
* If OOB is not enabled on the current monitor , we ' ll emulate the
* old behavior that we won ' t process the current monitor any more
* until it has responded . This helps make sure that as long as
* OOB is not enabled , the server will never drop any command .
*/
if ( ! qmp_oob_enabled ( mon ) ) {
monitor_suspend ( mon ) ;
req_obj - > need_resume = true ;
}
/*
* Put the request to the end of queue so that requests will be
* handled in time order . Ownership for req_obj , req , id ,
* etc . will be delivered to the handler side .
*/
qemu_mutex_lock ( & mon - > qmp . qmp_queue_lock ) ;
g_queue_push_tail ( mon - > qmp . qmp_requests , req_obj ) ;
qemu_mutex_unlock ( & mon - > qmp . qmp_queue_lock ) ;
/* Kick the dispatcher routine */
qemu_bh_schedule ( mon_global . qmp_dispatcher_bh ) ;
}
static void monitor_qmp_read ( void * opaque , const uint8_t * buf , int size )
{
Monitor * mon = opaque ;
@ -4137,6 +4279,15 @@ static void monitor_iothread_init(void)
{
mon_global . mon_iothread = iothread_create ( " mon_iothread " ,
& error_abort ) ;
/*
* This MUST be on main loop thread since we have commands that
* have assumption to be run on main loop thread . It would be
* nice that one day we can remove this assumption in the future .
*/
mon_global . qmp_dispatcher_bh = aio_bh_new ( qemu_get_aio_context ( ) ,
monitor_qmp_bh_dispatcher ,
NULL ) ;
}
void monitor_init_globals ( void )
@ -4288,6 +4439,10 @@ void monitor_cleanup(void)
}
qemu_mutex_unlock ( & monitor_lock ) ;
/* QEMUBHs needs to be deleted before destroying the IOThread. */
qemu_bh_delete ( mon_global . qmp_dispatcher_bh ) ;
mon_global . qmp_dispatcher_bh = NULL ;
iothread_destroy ( mon_global . mon_iothread ) ;
mon_global . mon_iothread = NULL ;
}