avformat/rt*p: Joining a SSM multicast group using an SDP (Issue #2171)
authorEd Torbett <ed.torbett@simulation-systems.co.uk>
Thu, 27 Jun 2013 07:53:00 +0000 (08:53 +0100)
committerMichael Niedermayer <michaelni@gmx.at>
Thu, 18 Jul 2013 16:01:31 +0000 (18:01 +0200)
Passes Source-Specific Multicast parameters read from an sdp file through to the UDP socket code,
allowing source-specific multicast streams to be correctly received. As an integral part of this
change, additional checking (currently only enabled in the case of SSM streams, but probably
useful in similar scenarios) has been added to the RTP protocol handler to distinguish UDP packets
arriving from multiple sources to the same port and process only the expected packets
(those transmitted from the expected UDP source address). This resolves an issue identified
when multiple instances of FFmpeg subscribe to different Source-Specific Multicast streams
but with each sharing the same destination port.

Signed-off-by: Edward Torbett <ed.torbett@simulation-systems.co.uk>
Signed-off-by: Michael Niedermayer <michaelni@gmx.at>
libavformat/rtpproto.c
libavformat/rtsp.c
libavformat/rtsp.h

index 62ec4c9..000bd1e 100644 (file)
@@ -42,7 +42,8 @@
 
 typedef struct RTPContext {
     URLContext *rtp_hd, *rtcp_hd;
-    int rtp_fd, rtcp_fd;
+    int rtp_fd, rtcp_fd, ssm;
+    struct sockaddr_storage ssm_addr;
 } RTPContext;
 
 /**
@@ -75,6 +76,31 @@ int ff_rtp_set_remote_url(URLContext *h, const char *uri)
     return 0;
 }
 
+static struct addrinfo* rtp_resolve_host(const char *hostname, int port,
+                                         int type, int family, int flags)
+{
+    struct addrinfo hints = { 0 }, *res = 0;
+    int error;
+    char sport[16];
+    const char *node = 0, *service = "0";
+
+    if (port > 0) {
+        snprintf(sport, sizeof(sport), "%d", port);
+        service = sport;
+    }
+    if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) {
+        node = hostname;
+    }
+    hints.ai_socktype = type;
+    hints.ai_family   = family;
+    hints.ai_flags = flags;
+    if ((error = getaddrinfo(node, service, &hints, &res))) {
+        res = NULL;
+        av_log(NULL, AV_LOG_ERROR, "rtp_resolve_host: %s\n", gai_strerror(error));
+    }
+
+    return res;
+}
 
 /**
  * add option to url of the form:
@@ -98,6 +124,7 @@ static av_printf_format(3, 4) void url_add_option(char *buf, int buf_size, const
 
 static void build_udp_url(char *buf, int buf_size,
                           const char *hostname, int port,
+                          int ssm, const char* source_addr,
                           int local_port, int ttl,
                           int max_packet_size, int connect)
 {
@@ -108,6 +135,8 @@ static void build_udp_url(char *buf, int buf_size,
         url_add_option(buf, buf_size, "ttl=%d", ttl);
     if (max_packet_size >=0)
         url_add_option(buf, buf_size, "pkt_size=%d", max_packet_size);
+    if (ssm)
+        url_add_option(buf, buf_size, "sources=%s", source_addr);
     if (connect)
         url_add_option(buf, buf_size, "connect=1");
     url_add_option(buf, buf_size, "fifo_size=0");
@@ -123,6 +152,7 @@ static void build_udp_url(char *buf, int buf_size,
  *         'connect=0/1'      : do a connect() on the UDP socket
  * deprecated option:
  *         'localport=n'      : set the local port to n
+ *         'ssm=ip'           : use ip as source-specific multicast address
  *
  * if rtcpport isn't set the rtcp port will be the rtp port + 1
  * if local rtp port isn't set any available port will be used for the local
@@ -136,10 +166,11 @@ static int rtp_open(URLContext *h, const char *uri, int flags)
     int rtp_port, rtcp_port,
         ttl, connect,
         local_rtp_port, local_rtcp_port, max_packet_size;
-    char hostname[256];
+    char hostname[256],source_ip[50];
     char buf[1024];
     char path[1024];
     const char *p;
+    struct addrinfo *sourceaddr;
 
     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &rtp_port,
                  path, sizeof(path), uri);
@@ -149,6 +180,7 @@ static int rtp_open(URLContext *h, const char *uri, int flags)
     local_rtp_port = -1;
     local_rtcp_port = -1;
     max_packet_size = -1;
+    s->ssm = 0;
     connect = 0;
 
     p = strchr(uri, '?');
@@ -174,10 +206,21 @@ static int rtp_open(URLContext *h, const char *uri, int flags)
         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
             connect = strtol(buf, NULL, 10);
         }
+        if (av_find_info_tag(buf, sizeof(buf), "ssm", p)) {
+            s->ssm = 1;
+            snprintf(source_ip, sizeof(source_ip), "%s", buf);
+
+            sourceaddr = rtp_resolve_host(source_ip, 0,
+                                          SOCK_DGRAM, AF_UNSPEC,
+                                          AI_NUMERICHOST);
+
+            memcpy(&s->ssm_addr, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
+            freeaddrinfo(sourceaddr);
+        }
     }
 
     build_udp_url(buf, sizeof(buf),
-                  hostname, rtp_port, local_rtp_port, ttl, max_packet_size,
+                  hostname, rtp_port, s->ssm, source_ip, local_rtp_port, ttl, max_packet_size,
                   connect);
     if (ffurl_open(&s->rtp_hd, buf, flags, &h->interrupt_callback, NULL) < 0)
         goto fail;
@@ -185,7 +228,7 @@ static int rtp_open(URLContext *h, const char *uri, int flags)
         local_rtcp_port = ff_udp_get_local_port(s->rtp_hd) + 1;
 
     build_udp_url(buf, sizeof(buf),
-                  hostname, rtcp_port, local_rtcp_port, ttl, max_packet_size,
+                  hostname, rtcp_port, s->ssm, source_ip, local_rtcp_port, ttl, max_packet_size,
                   connect);
     if (ffurl_open(&s->rtcp_hd, buf, flags, &h->interrupt_callback, NULL) < 0)
         goto fail;
@@ -232,6 +275,27 @@ static int rtp_read(URLContext *h, uint8_t *buf, int size)
                         continue;
                     return AVERROR(EIO);
                 }
+                if (s->ssm) {
+                    if (from.ss_family == AF_INET && s->ssm_addr.ss_family == AF_INET) {
+                        uint32_t intended_source = ((struct sockaddr_in *)&s->ssm_addr)->sin_addr.s_addr;
+                        uint32_t actual_source = ((struct sockaddr_in *)&from)->sin_addr.s_addr;
+                        if (intended_source != actual_source) {
+                            // discard the packet without any processing
+                            continue;
+                        }
+                    }
+
+#if defined(IPPROTO_IPV6)
+                    if (from.ss_family == AF_INET6 && s->ssm_addr.ss_family == AF_INET6) {
+                        unsigned char* intended_source = ((struct sockaddr_in6 *)&s->ssm_addr)->sin6_addr.s6_addr;
+                        unsigned char* actual_source = ((struct sockaddr_in6 *)&from)->sin6_addr.s6_addr;
+                        if (memcmp(intended_source, actual_source, 16) != 0) {
+                            // discard the packet without any processing
+                            continue;
+                        }
+                    }
+#endif
+                }
                 break;
             }
             /* then RTP */
@@ -245,6 +309,27 @@ static int rtp_read(URLContext *h, uint8_t *buf, int size)
                         continue;
                     return AVERROR(EIO);
                 }
+                if (s->ssm) {
+                    if (from.ss_family == AF_INET && s->ssm_addr.ss_family == AF_INET) {
+                        uint32_t intended_source = ((struct sockaddr_in *)&s->ssm_addr)->sin_addr.s_addr;
+                        uint32_t actual_source = ((struct sockaddr_in *)&from)->sin_addr.s_addr;
+                        if (intended_source != actual_source) {
+                            // discard the packet without any processing
+                            continue;
+                        }
+                    }
+
+#if defined(IPPROTO_IPV6)
+                    if (from.ss_family == AF_INET6 && s->ssm_addr.ss_family == AF_INET6) {
+                        unsigned char* intended_source = ((struct sockaddr_in6 *)&s->ssm_addr)->sin6_addr.s6_addr;
+                        unsigned char* actual_source = ((struct sockaddr_in6 *)&from)->sin6_addr.s6_addr;
+                        if (memcmp(intended_source, actual_source, 16) != 0) {
+                            // discard the packet without any processing
+                            continue;
+                        }
+                    }
+#endif
+                }
                 break;
             }
         } else if (n < 0) {
index 3a07e9c..ff3b740 100644 (file)
@@ -498,6 +498,23 @@ static void sdp_parse_line(AVFormatContext *s, SDPParseState *s1,
             p += strspn(p, SPACE_CHARS);
             if (av_strstart(p, "inline:", &p))
                 get_word(rtsp_st->crypto_params, sizeof(rtsp_st->crypto_params), &p);
+        } else if (av_strstart(p, "source-filter", &p) && s->nb_streams > 0) {
+            get_word(buf1, sizeof(buf1), &p); // ignore tag
+
+            get_word(buf1, sizeof(buf1), &p);
+            if (strcmp(buf1, "IN") != 0)
+                    return;
+            get_word(buf1, sizeof(buf1), &p);
+            if (strcmp(buf1, "IP4") && strcmp(buf1, "IP6"))
+                    return;
+            get_word(buf1, sizeof(buf1), &p); //ignore repeated multicast address
+            get_word(buf1, sizeof(buf1), &p);
+            if (get_sockaddr(buf1, &sdp_ip))
+                    return;
+
+            rtsp_st = rt->rtsp_streams[rt->nb_rtsp_streams - 1];
+            rtsp_st->ssm = 1;
+            rtsp_st->source_ip = sdp_ip;
         } else {
             if (rt->server_type == RTSP_SERVER_WMS)
                 ff_wms_parse_sdp_a_line(s, p);
@@ -2084,6 +2101,11 @@ static int sdp_read_header(AVFormatContext *s)
                         "?localport=%d&ttl=%d&connect=%d", rtsp_st->sdp_port,
                         rtsp_st->sdp_ttl,
                         rt->rtsp_flags & RTSP_FLAG_FILTER_SRC ? 1 : 0);
+            if (rtsp_st->ssm) {
+                getnameinfo((struct sockaddr*) &rtsp_st->source_ip, sizeof(rtsp_st->source_ip),
+                            namebuf, sizeof(namebuf), NULL, 0, NI_NUMERICHOST);
+                av_strlcatf(url, sizeof(url), "&ssm=%s", namebuf);
+            }
             if (ffurl_open(&rtsp_st->rtp_handle, url, AVIO_FLAG_READ_WRITE,
                            &s->interrupt_callback, NULL) < 0) {
                 err = AVERROR_INVALIDDATA;
index 9e6e237..a68b204 100644 (file)
@@ -435,6 +435,8 @@ typedef struct RTSPStream {
     //@{
     int sdp_port;             /**< port (from SDP content) */
     struct sockaddr_storage sdp_ip; /**< IP address (from SDP content) */
+    int ssm;                    /**< Whether the stream should use source-specific multicast or not (from SDP content) */
+    struct sockaddr_storage source_ip; /**< Source-specific multicast source IP address (from SDP content) */
     int sdp_ttl;              /**< IP Time-To-Live (from SDP content) */
     int sdp_payload_type;     /**< payload type */
     //@}