avformat/udp: Add a delay between packets for streaming to clients with short buffer
authorPavel Nikiforov <nikiforov.pavel@gmail.com>
Tue, 8 Mar 2016 20:27:45 +0000 (23:27 +0300)
committerMichael Niedermayer <michael@niedermayer.cc>
Tue, 24 May 2016 23:39:22 +0000 (01:39 +0200)
This commit enables sending UDP packets in a background thread with specified delay.
When sending packets without a delay some devices with small RX buffer
( MAG200 STB, for example) will drop tail packets in bursts causing
decoding errors.

To use it specify "fifo_size" with "packet_gap" .

The output url will looks like udp://xxx:yyy?fifo_size=<output fifo
size>&packet_gap=<delay in usecs>

Signed-off-by: Michael Niedermayer <michael@niedermayer.cc>
doc/protocols.texi
libavformat/udp.c

index a1084bd..a9c9d0c 100644 (file)
@@ -1285,6 +1285,9 @@ Set the UDP maximum socket buffer size in bytes. This is used to set either
 the receive or send buffer size, depending on what the socket is used for.
 Default is 64KB.  See also @var{fifo_size}.
 
+@item packet_gap=@var{seconds}
+Delay between packets
+
 @item localport=@var{port}
 Override the local UDP port to bind with.
 
index e42b911..70dc98e 100644 (file)
@@ -92,6 +92,7 @@ typedef struct UDPContext {
     int circular_buffer_size;
     AVFifoBuffer *fifo;
     int circular_buffer_error;
+    int64_t packet_gap; /* delay between transmitted packets */
 #if HAVE_PTHREAD_CANCEL
     pthread_t circular_buffer_thread;
     pthread_mutex_t mutex;
@@ -112,6 +113,7 @@ typedef struct UDPContext {
 #define E AV_OPT_FLAG_ENCODING_PARAM
 static const AVOption options[] = {
     { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
+    { "packet_gap",     "Delay between packets",                           OFFSET(packet_gap),     AV_OPT_TYPE_DURATION,    { .i64 = 0  },     0, INT_MAX, .flags = E },
     { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
     { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
     { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
@@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h)
 }
 
 #if HAVE_PTHREAD_CANCEL
-static void *circular_buffer_task( void *_URLContext)
+static void *circular_buffer_task_rx( void *_URLContext)
 {
     URLContext *h = _URLContext;
     UDPContext *s = h->priv_data;
@@ -542,6 +544,81 @@ end:
     pthread_mutex_unlock(&s->mutex);
     return NULL;
 }
+
+static void do_udp_write(void *arg, void *buf, int size) {
+    URLContext *h = arg;
+    UDPContext *s = h->priv_data;
+
+    int ret;
+
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = ff_network_wait_fd(s->udp_fd, 1);
+        if (ret < 0) {
+            s->circular_buffer_error = ret;
+            return;
+        }
+    }
+
+    if (!s->is_connected) {
+        ret = sendto (s->udp_fd, buf, size, 0,
+                      (struct sockaddr *) &s->dest_addr,
+                      s->dest_addr_len);
+    } else
+        ret = send(s->udp_fd, buf, size, 0);
+
+    s->circular_buffer_error=ret;
+}
+
+static void *circular_buffer_task_tx( void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    UDPContext *s = h->priv_data;
+    int old_cancelstate;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+
+    for(;;) {
+        int len;
+        uint8_t tmp[4];
+
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+        av_usleep(s->packet_gap);
+
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+
+        pthread_mutex_lock(&s->mutex);
+
+        len=av_fifo_size(s->fifo);
+
+        while (len<4) {
+            if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
+                goto end;
+            }
+            len=av_fifo_size(s->fifo);
+        }
+
+        av_fifo_generic_peek(s->fifo, tmp, 4, NULL);
+        len=AV_RL32(tmp);
+
+        if (len>0 && av_fifo_size(s->fifo)>=len+4) {
+            av_fifo_drain(s->fifo, 4); /* skip packet length */
+            av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */
+            if (s->circular_buffer_error == len) {
+                /* all ok - reset error */
+                s->circular_buffer_error=0;
+            }
+        }
+
+        pthread_mutex_unlock(&s->mutex);
+    }
+
+end:
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+
+
 #endif
 
 static int parse_source_list(char *buf, char **sources, int *num_sources,
@@ -650,6 +727,16 @@ static int udp_open(URLContext *h, const char *uri, int flags)
                        "'circular_buffer_size' option was set but it is not supported "
                        "on this build (pthread support is required)\n");
         }
+        if (av_find_info_tag(buf, sizeof(buf), "packet_gap", p)) {
+            if (av_parse_time(&s->packet_gap, buf, 1)<0) {
+                av_log(h, AV_LOG_ERROR, "Can't parse 'packet_gap'");
+                goto fail;
+            }
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'packet_gap' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
             av_strlcpy(localaddr, buf, sizeof(localaddr));
         }
@@ -829,7 +916,18 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     s->udp_fd = udp_fd;
 
 #if HAVE_PTHREAD_CANCEL
-    if (!is_output && s->circular_buffer_size) {
+    /*
+      Create thread in case of:
+      1. Input and circular_buffer_size is set
+      2. Output and packet_gap and circular_buffer_size is set
+    */
+
+    if (is_output && s->packet_gap && !s->circular_buffer_size) {
+        /* Warn user in case of 'circular_buffer_size' is not set */
+        av_log(h, AV_LOG_WARNING,"'packet_gap' option was set but 'circular_buffer_size' is not, but required\n");
+    }
+
+    if ((!is_output && s->circular_buffer_size) || (is_output && s->packet_gap && s->circular_buffer_size)) {
         int ret;
 
         /* start the task going */
@@ -844,7 +942,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
             goto cond_fail;
         }
-        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
+        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
         if (ret != 0) {
             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
             goto thread_fail;
@@ -945,6 +1043,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
     UDPContext *s = h->priv_data;
     int ret;
 
+#if HAVE_PTHREAD_CANCEL
+    if (s->fifo) {
+        uint8_t tmp[4];
+
+        pthread_mutex_lock(&s->mutex);
+
+        /*
+          Return error if last tx failed.
+          Here we can't know on which packet error was, but it needs to know that error exists.
+        */
+        if (s->circular_buffer_error<0) {
+            int err=s->circular_buffer_error;
+            s->circular_buffer_error=0;
+            pthread_mutex_unlock(&s->mutex);
+            return err;
+        }
+
+        if(av_fifo_space(s->fifo) < size + 4) {
+            /* What about a partial packet tx ? */
+            pthread_mutex_unlock(&s->mutex);
+            return AVERROR(ENOMEM);
+        }
+        AV_WL32(tmp, size);
+        av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
+        av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
+        return size;
+    }
+#endif
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = ff_network_wait_fd(s->udp_fd, 1);
         if (ret < 0)