You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

363 lines
14 KiB

/*****************************************************************************
* rist.c: RIST (Reliable Internet Stream Transport) input module
*****************************************************************************
* Copyright (C) 2021, SipRadius LLC
*
* Authors: Sergio Ammirata <sergio@ammirata.net>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <vlc_common.h>
#include <vlc_interrupt.h>
#include <vlc_plugin.h>
#include <vlc_access.h>
#include <vlc_block.h>
#define RIST_CFG_PREFIX "rist-"
#include "rist.h"
#define NACK_FMT_RANGE 0
#define NACK_FMT_BITMASK 1
static const int nack_type_values[] = {
NACK_FMT_RANGE, NACK_FMT_BITMASK,
};
static const char *const nack_type_names[] = {
N_("Range"), N_("Bitmask"),
};
typedef struct
{
struct rist_ctx *receiver_ctx;
int gre_filter_dst_port;
uint32_t cumulative_loss;
uint32_t flow_id;
bool eof;
int i_recovery_buffer;
int i_maximum_jitter;
struct rist_logging_settings logging_settings;
vlc_mutex_t lock;
struct rist_data_block *rist_items[RIST_MAX_QUEUE_BUFFERS];
} stream_sys_t;
static int cb_stats(void *arg, const struct rist_stats *stats_container)
{
stream_t *p_access = (stream_t*)arg;
stream_sys_t *p_sys = p_access->p_sys;
msg_Dbg(p_access, "[RIST-STATS]: %s", stats_container->stats_json);
const struct rist_stats_receiver_flow *stats_receiver_flow = &stats_container->stats.receiver_flow;
p_sys->cumulative_loss += stats_receiver_flow->lost;
msg_Dbg(p_access, "[RIST-STATS]: received %"PRIu64", missing %"PRIu32", reordered %"PRIu32", recovered %"PRIu32", lost %"PRIu32", Q %.2f, max jitter (us) %"PRIu64", rtt %"PRIu32"ms, cumulative loss %"PRIu32"",
stats_receiver_flow->received,
stats_receiver_flow->missing,
stats_receiver_flow->reordered,
stats_receiver_flow->recovered,
stats_receiver_flow->lost,
stats_receiver_flow->quality,
stats_receiver_flow->max_inter_packet_spacing,
stats_receiver_flow->rtt,
p_sys->cumulative_loss
);
if ((int)stats_receiver_flow->max_inter_packet_spacing > p_sys->i_recovery_buffer * 1000)
{
msg_Err(p_access, "The IP network jitter exceeded your recovery buffer size, %d > %d us, you should increase the recovery buffer size or fix your source/network jitter",
(int)stats_receiver_flow->max_inter_packet_spacing, p_sys->i_recovery_buffer *1000);
}
if ((int)stats_receiver_flow->rtt > (p_sys->i_recovery_buffer))
{
msg_Err(p_access, "The RTT between us and the sender is higher than the configured recovery buffer size, %"PRIu32" > %d ms, you should increase the recovery buffer size",
stats_receiver_flow->rtt, p_sys->i_recovery_buffer);
}
vlc_mutex_lock( &p_sys->lock );
/* Trigger the appropriate response when there is no more data */
/* status of 1 is no data for one buffer length */
/* status of 2 is no data for 60 seconds, i.e. session timeout */
if (p_sys->flow_id == stats_receiver_flow->flow_id && stats_receiver_flow->status == 2) {
p_sys->eof = true;
}
vlc_mutex_unlock( &p_sys->lock );
rist_stats_free(stats_container);
return 0;
}
static int Control(stream_t *p_access, int i_query, va_list args)
{
switch( i_query )
{
case STREAM_CAN_SEEK:
case STREAM_CAN_FASTSEEK:
case STREAM_CAN_PAUSE:
case STREAM_CAN_CONTROL_PACE:
*va_arg( args, bool * ) = false;
break;
case STREAM_GET_PTS_DELAY:
*va_arg( args, vlc_tick_t * ) = VLC_TICK_FROM_MS(
var_InheritInteger(p_access, "network-caching") );
break;
default:
return VLC_EGENERIC;
}
return VLC_SUCCESS;
}
static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
{
stream_sys_t *p_sys = p_access->p_sys;
block_t *pktout = NULL;
struct rist_data_block *rist_buffer = NULL;
size_t i_total_size, i_rist_items_index;
int i_flags, ret;
*eof = false;
i_rist_items_index = i_flags = i_total_size = 0;
int i_read_timeout_ms = p_sys->i_maximum_jitter;
while ((ret = rist_receiver_data_read2(p_sys->receiver_ctx, &rist_buffer, i_read_timeout_ms)) > 0)
{
if (p_sys->gre_filter_dst_port > 0 && rist_buffer->virt_dst_port != p_sys->gre_filter_dst_port) {
rist_receiver_data_block_free2(&rist_buffer);
continue;
}
i_read_timeout_ms = 0;
p_sys->rist_items[i_rist_items_index++] = rist_buffer;
i_total_size += rist_buffer->payload_len;
vlc_mutex_lock( &p_sys->lock );
if (p_sys->flow_id != rist_buffer->flow_id ||
rist_buffer->flags == RIST_DATA_FLAGS_DISCONTINUITY ||
rist_buffer->flags == RIST_DATA_FLAGS_FLOW_BUFFER_START) {
if (p_sys->flow_id != rist_buffer->flow_id) {
msg_Info(p_access, "New flow detected with id %"PRIu32"", rist_buffer->flow_id);
p_sys->flow_id = rist_buffer->flow_id;
}
i_flags = BLOCK_FLAG_DISCONTINUITY;
vlc_mutex_unlock( &p_sys->lock );
break;
}
vlc_mutex_unlock( &p_sys->lock );
// Make sure we never read more than our array size
if (i_rist_items_index == (RIST_MAX_QUEUE_BUFFERS -1))
break;
}
if (ret > 50)
msg_Dbg(p_access, "Falling behind reading rist buffer by %d packets", ret);
if (ret < 0) {
msg_Err(p_access, "Unrecoverable error %i while reading from rist, ending stream", ret);
*eof = true;
goto failed_cleanup;
}
if (i_total_size == 0) {
return NULL;
}
// Prepare one large buffer (when we are behing in reading, otherwise it is the same size as what is being read)
pktout = block_Alloc(i_total_size);
if ( unlikely(pktout == NULL) ) {
goto failed_cleanup;
}
size_t block_offset = 0;
for(size_t i = 0; i < i_rist_items_index; ++i) {
memcpy(pktout->p_buffer + block_offset, p_sys->rist_items[i]->payload, p_sys->rist_items[i]->payload_len);
block_offset += p_sys->rist_items[i]->payload_len;
rist_receiver_data_block_free2(& p_sys->rist_items[i]);
}
pktout->i_flags = i_flags;
return pktout;
failed_cleanup:
if (i_total_size > 0) {
for (size_t i = 0; i < i_rist_items_index; i++) {
rist_receiver_data_block_free2(&p_sys->rist_items[i]);
}
}
return NULL;
}
static void Close(vlc_object_t *p_this)
{
stream_t *p_access = (stream_t*)p_this;
stream_sys_t *p_sys = p_access->p_sys;
rist_destroy(p_sys->receiver_ctx);
}
static int Open(vlc_object_t *p_this)
{
stream_t *p_access = (stream_t*)p_this;
stream_sys_t *p_sys = NULL;
p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
if( unlikely( p_sys == NULL ) )
return VLC_ENOMEM;
p_access->p_sys = p_sys;
vlc_mutex_init( &p_sys->lock );
int rist_profile = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_PROFILE);
p_sys->i_maximum_jitter = var_InheritInteger(p_access, RIST_CFG_PREFIX "maximum-jitter");
p_sys->gre_filter_dst_port = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_DST_PORT);
if (p_sys->gre_filter_dst_port % 2 != 0) {
msg_Err(p_access, "Virtual destination port must be an even number.");
return VLC_EGENERIC;
}
if (rist_profile == RIST_PROFILE_SIMPLE)
p_sys->gre_filter_dst_port = 0;
int i_recovery_length = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_CFG_LATENCY);
if (i_recovery_length == 0) {
// Auto-configure the recovery buffer
i_recovery_length = 1000;//1 Second, libRIST default
}
p_sys->i_recovery_buffer = i_recovery_length;
int i_verbose_level = var_InheritInteger( p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VERBOSE_LEVEL );
//This simply disables the global logs, which are only used by the udpsocket functions provided
//by libRIST. When called by the library logs are anyway generated (though perhaps less accurate).
//This prevents all sorts of complications wrt other rist modules running and other the like.
struct rist_logging_settings rist_global_logging_settings = LOGGING_SETTINGS_INITIALIZER;
if (rist_logging_set_global(&rist_global_logging_settings) != 0) {
msg_Err(p_access,"Could not set logging\n");
return VLC_EGENERIC;
}
struct rist_logging_settings *logging_settings = &p_sys->logging_settings;
logging_settings->log_socket = -1;
logging_settings->log_stream = NULL;
logging_settings->log_level = i_verbose_level;
logging_settings->log_cb = log_cb;
logging_settings->log_cb_arg = p_access;
if (rist_receiver_create(&p_sys->receiver_ctx, rist_profile, logging_settings) != 0) {
msg_Err(p_access, "Could not create rist receiver context");
return VLC_EGENERIC;
}
int nack_type = var_InheritInteger( p_access, RIST_CFG_PREFIX "nack-type" );
if (rist_receiver_nack_type_set(p_sys->receiver_ctx, nack_type)) {
msg_Err(p_access, "Could not set nack type");
goto failed;
}
// Enable stats data
if (rist_stats_callback_set(p_sys->receiver_ctx, 1000, cb_stats, (void *)p_access) == -1) {
msg_Err(p_access, "Could not enable stats callback");
goto failed;
}
if( !rist_add_peers(VLC_OBJECT(p_access), p_sys->receiver_ctx, p_access->psz_url, 0, RIST_DEFAULT_VIRT_DST_PORT, i_recovery_length) )
goto failed;
/* Start the rist protocol thread */
if (rist_start(p_sys->receiver_ctx)) {
msg_Err(p_access, "Could not start rist receiver");
goto failed;
}
p_access->pf_block = BlockRIST;
p_access->pf_control = Control;
return VLC_SUCCESS;
failed:
rist_destroy(p_sys->receiver_ctx);
msg_Err(p_access, "Failed to open rist module");
return VLC_EGENERIC;
}
#define DST_PORT_TEXT N_("Virtual Destination Port Filter")
#define DST_PORT_LONGTEXT N_( \
"Destination port to be used inside the reduced-mode of the main profile "\
"to filter incoming data. Use zero to allow all." )
/* Module descriptor */
vlc_module_begin ()
set_shortname( N_("RIST") )
set_description( N_("RIST input") )
set_subcategory( SUBCAT_INPUT_ACCESS )
add_integer( RIST_CFG_PREFIX "maximum-jitter", 5,
N_("RIST demux/decode maximum jitter (default is 5ms)"),
N_("This controls the maximum jitter that will be passed to the demux/decode chain. "
"The lower the value, the more CPU cycles the module will consume"))
add_integer( RIST_CFG_PREFIX "nack-type", NACK_FMT_RANGE,
N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL)
change_integer_list( nack_type_values, nack_type_names )
add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_DST_PORT, 0,
DST_PORT_TEXT, DST_PORT_LONGTEXT )
add_integer( RIST_CFG_PREFIX RIST_CFG_MAX_PACKET_SIZE, RIST_MAX_PACKET_SIZE,
RIST_PACKET_SIZE_TEXT, NULL )
add_string( RIST_CFG_PREFIX RIST_CFG_URL2, NULL, RIST_URL2_TEXT, NULL )
add_string( RIST_CFG_PREFIX RIST_CFG_URL3, NULL, RIST_URL3_TEXT, NULL )
add_string( RIST_CFG_PREFIX RIST_CFG_URL4, NULL, RIST_URL4_TEXT, NULL )
add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_BANDWIDTH, RIST_DEFAULT_RECOVERY_MAXBITRATE,
RIST_MAX_BITRATE_TEXT, RIST_MAX_BITRATE_LONGTEXT )
add_integer( RIST_CFG_PREFIX RIST_CFG_RETRY_INTERVAL, RIST_DEFAULT_RECOVERY_RTT_MIN,
RIST_RETRY_INTERVAL_TEXT, NULL )
add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_REORDER_BUFFER, RIST_DEFAULT_RECOVERY_REORDER_BUFFER,
RIST_REORDER_BUFFER_TEXT, NULL )
add_integer( RIST_CFG_PREFIX RIST_CFG_MAX_RETRIES, RIST_DEFAULT_MAX_RETRIES,
RIST_MAX_RETRIES_TEXT, NULL )
add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_VERBOSE_LEVEL, RIST_DEFAULT_VERBOSE_LEVEL,
RIST_VERBOSE_LEVEL_TEXT, RIST_VERBOSE_LEVEL_LONGTEXT )
change_integer_list( verbose_level_type, verbose_level_type_names )
add_integer( RIST_CFG_PREFIX RIST_CFG_LATENCY, 0,
BUFFER_TEXT, BUFFER_LONGTEXT )
add_string( RIST_CFG_PREFIX RIST_URL_PARAM_CNAME, NULL, RIST_CNAME_TEXT,
RIST_CNAME_LONGTEXT )
add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_PROFILE, RIST_DEFAULT_PROFILE,
RIST_PROFILE_TEXT, RIST_PROFILE_LONGTEXT )
add_password( RIST_CFG_PREFIX RIST_URL_PARAM_SECRET, "",
RIST_SHARED_SECRET_TEXT, NULL )
add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_AES_TYPE, 0,
RIST_ENCRYPTION_TYPE_TEXT, NULL )
change_integer_list( rist_encryption_type, rist_encryption_type_names )
add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_TIMING_MODE, RIST_DEFAULT_TIMING_MODE,
RIST_TIMING_MODE_TEXT, NULL )
change_integer_list( rist_timing_mode_type, rist_timing_mode_names )
add_string( RIST_CFG_PREFIX RIST_URL_PARAM_SRP_USERNAME, "",
RIST_SRP_USERNAME_TEXT, NULL )
add_password( RIST_CFG_PREFIX RIST_URL_PARAM_SRP_PASSWORD, "",
RIST_SRP_PASSWORD_TEXT, NULL )
set_capability( "access", 10 )
add_shortcut( "librist", "rist", "tr06" )
set_callbacks( Open, Close )
vlc_module_end ()