@ -13,9 +13,14 @@
# include "qemu/osdep.h"
# include "qemu/module.h"
# include "qapi/error.h"
# include "qapi/qapi-types-migration.h"
# include "exec/ramblock.h"
# include "multifd.h"
# include "qpl/qpl.h"
/* Maximum number of retries to resubmit a job if IAA work queues are full */
# define MAX_SUBMIT_RETRY_NUM (3)
typedef struct {
/* the QPL hardware path job */
qpl_job * job ;
@ -260,6 +265,225 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
p - > iov = NULL ;
}
/**
* multifd_qpl_prepare_job : prepare the job
*
* Set the QPL job parameters and properties .
*
* @ job : pointer to the qpl_job structure
* @ is_compression : indicates compression and decompression
* @ input : pointer to the input data buffer
* @ input_len : the length of the input data
* @ output : pointer to the output data buffer
* @ output_len : the length of the output data
*/
static void multifd_qpl_prepare_job ( qpl_job * job , bool is_compression ,
uint8_t * input , uint32_t input_len ,
uint8_t * output , uint32_t output_len )
{
job - > op = is_compression ? qpl_op_compress : qpl_op_decompress ;
job - > next_in_ptr = input ;
job - > next_out_ptr = output ;
job - > available_in = input_len ;
job - > available_out = output_len ;
job - > flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY ;
/* only supports compression level 1 */
job - > level = 1 ;
}
/**
* multifd_qpl_prepare_comp_job : prepare the compression job
*
* Set the compression job parameters and properties .
*
* @ job : pointer to the qpl_job structure
* @ input : pointer to the input data buffer
* @ output : pointer to the output data buffer
* @ size : the page size
*/
static void multifd_qpl_prepare_comp_job ( qpl_job * job , uint8_t * input ,
uint8_t * output , uint32_t size )
{
/*
* Set output length to less than the page size to force the job to
* fail in case it compresses to a larger size . We ' ll send that page
* without compression and skip the decompression operation on the
* destination .
*/
multifd_qpl_prepare_job ( job , true , input , size , output , size - 1 ) ;
}
/**
* multifd_qpl_prepare_decomp_job : prepare the decompression job
*
* Set the decompression job parameters and properties .
*
* @ job : pointer to the qpl_job structure
* @ input : pointer to the input data buffer
* @ len : the length of the input data
* @ output : pointer to the output data buffer
* @ size : the page size
*/
static void multifd_qpl_prepare_decomp_job ( qpl_job * job , uint8_t * input ,
uint32_t len , uint8_t * output ,
uint32_t size )
{
multifd_qpl_prepare_job ( job , false , input , len , output , size ) ;
}
/**
* multifd_qpl_fill_iov : fill in the IOV
*
* Fill in the QPL packet IOV
*
* @ p : Params for the channel being used
* @ data : pointer to the IOV data
* @ len : The length of the IOV data
*/
static void multifd_qpl_fill_iov ( MultiFDSendParams * p , uint8_t * data ,
uint32_t len )
{
p - > iov [ p - > iovs_num ] . iov_base = data ;
p - > iov [ p - > iovs_num ] . iov_len = len ;
p - > iovs_num + + ;
p - > next_packet_size + = len ;
}
/**
* multifd_qpl_fill_packet : fill the compressed page into the QPL packet
*
* Fill the compressed page length and IOV into the QPL packet
*
* @ idx : The index of the compressed length array
* @ p : Params for the channel being used
* @ data : pointer to the compressed page buffer
* @ len : The length of the compressed page
*/
static void multifd_qpl_fill_packet ( uint32_t idx , MultiFDSendParams * p ,
uint8_t * data , uint32_t len )
{
QplData * qpl = p - > compress_data ;
qpl - > zlen [ idx ] = cpu_to_be32 ( len ) ;
multifd_qpl_fill_iov ( p , data , len ) ;
}
/**
* multifd_qpl_submit_job : submit a job to the hardware
*
* Submit a QPL hardware job to the IAA device
*
* Returns true if the job is submitted successfully , otherwise false .
*
* @ job : pointer to the qpl_job structure
*/
static bool multifd_qpl_submit_job ( qpl_job * job )
{
qpl_status status ;
uint32_t num = 0 ;
retry :
status = qpl_submit_job ( job ) ;
if ( status = = QPL_STS_QUEUES_ARE_BUSY_ERR ) {
if ( num < MAX_SUBMIT_RETRY_NUM ) {
num + + ;
goto retry ;
}
}
return ( status = = QPL_STS_OK ) ;
}
/**
* multifd_qpl_compress_pages_slow_path : compress pages using slow path
*
* Compress the pages using software . If compression fails , the uncompressed
* page will be sent .
*
* @ p : Params for the channel being used
*/
static void multifd_qpl_compress_pages_slow_path ( MultiFDSendParams * p )
{
QplData * qpl = p - > compress_data ;
uint32_t size = p - > page_size ;
qpl_job * job = qpl - > sw_job ;
uint8_t * zbuf = qpl - > zbuf ;
uint8_t * buf ;
for ( int i = 0 ; i < p - > pages - > normal_num ; i + + ) {
buf = p - > pages - > block - > host + p - > pages - > offset [ i ] ;
multifd_qpl_prepare_comp_job ( job , buf , zbuf , size ) ;
if ( qpl_execute_job ( job ) = = QPL_STS_OK ) {
multifd_qpl_fill_packet ( i , p , zbuf , job - > total_out ) ;
} else {
/* send the uncompressed page */
multifd_qpl_fill_packet ( i , p , buf , size ) ;
}
zbuf + = size ;
}
}
/**
* multifd_qpl_compress_pages : compress pages
*
* Submit the pages to the IAA hardware for compression . If hardware
* compression fails , it falls back to software compression . If software
* compression also fails , the uncompressed page is sent .
*
* @ p : Params for the channel being used
*/
static void multifd_qpl_compress_pages ( MultiFDSendParams * p )
{
QplData * qpl = p - > compress_data ;
MultiFDPages_t * pages = p - > pages ;
uint32_t size = p - > page_size ;
QplHwJob * hw_job ;
uint8_t * buf ;
uint8_t * zbuf ;
for ( int i = 0 ; i < pages - > normal_num ; i + + ) {
buf = pages - > block - > host + pages - > offset [ i ] ;
zbuf = qpl - > zbuf + ( size * i ) ;
hw_job = & qpl - > hw_jobs [ i ] ;
multifd_qpl_prepare_comp_job ( hw_job - > job , buf , zbuf , size ) ;
if ( multifd_qpl_submit_job ( hw_job - > job ) ) {
hw_job - > fallback_sw_path = false ;
} else {
/*
* The IAA work queue is full , any immediate subsequent job
* submission is likely to fail , sending the page via the QPL
* software path at this point gives us a better chance of
* finding the queue open for the next pages .
*/
hw_job - > fallback_sw_path = true ;
multifd_qpl_prepare_comp_job ( qpl - > sw_job , buf , zbuf , size ) ;
if ( qpl_execute_job ( qpl - > sw_job ) = = QPL_STS_OK ) {
hw_job - > sw_output = zbuf ;
hw_job - > sw_output_len = qpl - > sw_job - > total_out ;
} else {
hw_job - > sw_output = buf ;
hw_job - > sw_output_len = size ;
}
}
}
for ( int i = 0 ; i < pages - > normal_num ; i + + ) {
buf = pages - > block - > host + pages - > offset [ i ] ;
zbuf = qpl - > zbuf + ( size * i ) ;
hw_job = & qpl - > hw_jobs [ i ] ;
if ( hw_job - > fallback_sw_path ) {
multifd_qpl_fill_packet ( i , p , hw_job - > sw_output ,
hw_job - > sw_output_len ) ;
continue ;
}
if ( qpl_wait_job ( hw_job - > job ) = = QPL_STS_OK ) {
multifd_qpl_fill_packet ( i , p , zbuf , hw_job - > job - > total_out ) ;
} else {
/* send the uncompressed page */
multifd_qpl_fill_packet ( i , p , buf , size ) ;
}
}
}
/**
* multifd_qpl_send_prepare : prepare data to be able to send
*
@ -273,8 +497,26 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
*/
static int multifd_qpl_send_prepare ( MultiFDSendParams * p , Error * * errp )
{
/* Implement in next patch */
return - 1 ;
QplData * qpl = p - > compress_data ;
uint32_t len = 0 ;
if ( ! multifd_send_prepare_common ( p ) ) {
goto out ;
}
/* The first IOV is used to store the compressed page lengths */
len = p - > pages - > normal_num * sizeof ( uint32_t ) ;
multifd_qpl_fill_iov ( p , ( uint8_t * ) qpl - > zlen , len ) ;
if ( qpl - > hw_avail ) {
multifd_qpl_compress_pages ( p ) ;
} else {
multifd_qpl_compress_pages_slow_path ( p ) ;
}
out :
p - > flags | = MULTIFD_FLAG_QPL ;
multifd_send_fill_packet ( p ) ;
return 0 ;
}
/**
@ -312,6 +554,140 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
p - > compress_data = NULL ;
}
/**
* multifd_qpl_process_and_check_job : process and check a QPL job
*
* Process the job and check whether the job output length is the
* same as the specified length
*
* Returns true if the job execution succeeded and the output length
* is equal to the specified length , otherwise false .
*
* @ job : pointer to the qpl_job structure
* @ is_hardware : indicates whether the job is a hardware job
* @ len : Specified output length
* @ errp : pointer to an error
*/
static bool multifd_qpl_process_and_check_job ( qpl_job * job , bool is_hardware ,
uint32_t len , Error * * errp )
{
qpl_status status ;
status = ( is_hardware ? qpl_wait_job ( job ) : qpl_execute_job ( job ) ) ;
if ( status ! = QPL_STS_OK ) {
error_setg ( errp , " qpl job failed with error %d " , status ) ;
return false ;
}
if ( job - > total_out ! = len ) {
error_setg ( errp , " qpl decompressed len %u, expected len %u " ,
job - > total_out , len ) ;
return false ;
}
return true ;
}
/**
* multifd_qpl_decompress_pages_slow_path : decompress pages using slow path
*
* Decompress the pages using software
*
* Returns 0 on success or - 1 on error
*
* @ p : Params for the channel being used
* @ errp : pointer to an error
*/
static int multifd_qpl_decompress_pages_slow_path ( MultiFDRecvParams * p ,
Error * * errp )
{
QplData * qpl = p - > compress_data ;
uint32_t size = p - > page_size ;
qpl_job * job = qpl - > sw_job ;
uint8_t * zbuf = qpl - > zbuf ;
uint8_t * addr ;
uint32_t len ;
for ( int i = 0 ; i < p - > normal_num ; i + + ) {
len = qpl - > zlen [ i ] ;
addr = p - > host + p - > normal [ i ] ;
/* the page is uncompressed, load it */
if ( len = = size ) {
memcpy ( addr , zbuf , size ) ;
zbuf + = size ;
continue ;
}
multifd_qpl_prepare_decomp_job ( job , zbuf , len , addr , size ) ;
if ( ! multifd_qpl_process_and_check_job ( job , false , size , errp ) ) {
return - 1 ;
}
zbuf + = len ;
}
return 0 ;
}
/**
* multifd_qpl_decompress_pages : decompress pages
*
* Decompress the pages using the IAA hardware . If hardware
* decompression fails , it falls back to software decompression .
*
* Returns 0 on success or - 1 on error
*
* @ p : Params for the channel being used
* @ errp : pointer to an error
*/
static int multifd_qpl_decompress_pages ( MultiFDRecvParams * p , Error * * errp )
{
QplData * qpl = p - > compress_data ;
uint32_t size = p - > page_size ;
uint8_t * zbuf = qpl - > zbuf ;
uint8_t * addr ;
uint32_t len ;
qpl_job * job ;
for ( int i = 0 ; i < p - > normal_num ; i + + ) {
addr = p - > host + p - > normal [ i ] ;
len = qpl - > zlen [ i ] ;
/* the page is uncompressed if received length equals the page size */
if ( len = = size ) {
memcpy ( addr , zbuf , size ) ;
zbuf + = size ;
continue ;
}
job = qpl - > hw_jobs [ i ] . job ;
multifd_qpl_prepare_decomp_job ( job , zbuf , len , addr , size ) ;
if ( multifd_qpl_submit_job ( job ) ) {
qpl - > hw_jobs [ i ] . fallback_sw_path = false ;
} else {
/*
* The IAA work queue is full , any immediate subsequent job
* submission is likely to fail , sending the page via the QPL
* software path at this point gives us a better chance of
* finding the queue open for the next pages .
*/
qpl - > hw_jobs [ i ] . fallback_sw_path = true ;
job = qpl - > sw_job ;
multifd_qpl_prepare_decomp_job ( job , zbuf , len , addr , size ) ;
if ( ! multifd_qpl_process_and_check_job ( job , false , size , errp ) ) {
return - 1 ;
}
}
zbuf + = len ;
}
for ( int i = 0 ; i < p - > normal_num ; i + + ) {
/* ignore pages that have already been processed */
if ( qpl - > zlen [ i ] = = size | | qpl - > hw_jobs [ i ] . fallback_sw_path ) {
continue ;
}
job = qpl - > hw_jobs [ i ] . job ;
if ( ! multifd_qpl_process_and_check_job ( job , true , size , errp ) ) {
return - 1 ;
}
}
return 0 ;
}
/**
* multifd_qpl_recv : read the data from the channel into actual pages
*
@ -325,8 +701,48 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
*/
static int multifd_qpl_recv ( MultiFDRecvParams * p , Error * * errp )
{
/* Implement in next patch */
return - 1 ;
QplData * qpl = p - > compress_data ;
uint32_t in_size = p - > next_packet_size ;
uint32_t flags = p - > flags & MULTIFD_FLAG_COMPRESSION_MASK ;
uint32_t len = 0 ;
uint32_t zbuf_len = 0 ;
int ret ;
if ( flags ! = MULTIFD_FLAG_QPL ) {
error_setg ( errp , " multifd %u: flags received %x flags expected %x " ,
p - > id , flags , MULTIFD_FLAG_QPL ) ;
return - 1 ;
}
multifd_recv_zero_page_process ( p ) ;
if ( ! p - > normal_num ) {
assert ( in_size = = 0 ) ;
return 0 ;
}
/* read compressed page lengths */
len = p - > normal_num * sizeof ( uint32_t ) ;
assert ( len < in_size ) ;
ret = qio_channel_read_all ( p - > c , ( void * ) qpl - > zlen , len , errp ) ;
if ( ret ! = 0 ) {
return ret ;
}
for ( int i = 0 ; i < p - > normal_num ; i + + ) {
qpl - > zlen [ i ] = be32_to_cpu ( qpl - > zlen [ i ] ) ;
assert ( qpl - > zlen [ i ] < = p - > page_size ) ;
zbuf_len + = qpl - > zlen [ i ] ;
}
/* read compressed pages */
assert ( in_size = = len + zbuf_len ) ;
ret = qio_channel_read_all ( p - > c , ( void * ) qpl - > zbuf , zbuf_len , errp ) ;
if ( ret ! = 0 ) {
return ret ;
}
if ( qpl - > hw_avail ) {
return multifd_qpl_decompress_pages ( p , errp ) ;
}
return multifd_qpl_decompress_pages_slow_path ( p , errp ) ;
}
static MultiFDMethods multifd_qpl_ops = {