summaryrefslogtreecommitdiffstats
path: root/src/modules/rtp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 16:03:18 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 16:03:18 +0000
commit2dd5bc6a074165ddfbd57c4bd52c2d2dac8f47a1 (patch)
tree465b29cb405d3af0b0ad50c78e1dccc636594fec /src/modules/rtp
parentInitial commit. (diff)
downloadpulseaudio-2dd5bc6a074165ddfbd57c4bd52c2d2dac8f47a1.tar.xz
pulseaudio-2dd5bc6a074165ddfbd57c4bd52c2d2dac8f47a1.zip
Adding upstream version 14.2.upstream/14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/modules/rtp')
-rw-r--r--src/modules/rtp/headerlist.c173
-rw-r--r--src/modules/rtp/headerlist.h44
-rw-r--r--src/modules/rtp/meson.build35
-rw-r--r--src/modules/rtp/module-rtp-recv.c774
-rw-r--r--src/modules/rtp/module-rtp-send.c547
-rw-r--r--src/modules/rtp/rtp-common.c97
-rw-r--r--src/modules/rtp/rtp-gstreamer.c665
-rw-r--r--src/modules/rtp/rtp-native.c404
-rw-r--r--src/modules/rtp/rtp.h56
-rw-r--r--src/modules/rtp/rtsp_client.c643
-rw-r--r--src/modules/rtp/rtsp_client.h83
-rw-r--r--src/modules/rtp/sap.c235
-rw-r--r--src/modules/rtp/sap.h44
-rw-r--r--src/modules/rtp/sdp.c256
-rw-r--r--src/modules/rtp/sdp.h48
15 files changed, 4104 insertions, 0 deletions
diff --git a/src/modules/rtp/headerlist.c b/src/modules/rtp/headerlist.c
new file mode 100644
index 0000000..1fb0b41
--- /dev/null
+++ b/src/modules/rtp/headerlist.c
@@ -0,0 +1,173 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2008 Colin Guthrie
+ Copyright 2007 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of the
+ License, or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <string.h>
+
+#include <pulse/xmalloc.h>
+
+#include <pulsecore/hashmap.h>
+#include <pulsecore/strbuf.h>
+#include <pulsecore/core-util.h>
+
+#include "headerlist.h"
+
+struct header {
+ char *key;
+ void *value;
+ size_t nbytes;
+};
+
+#define MAKE_HASHMAP(p) ((pa_hashmap*) (p))
+#define MAKE_HEADERLIST(p) ((pa_headerlist*) (p))
+
+static void header_free(struct header *hdr) {
+ pa_assert(hdr);
+
+ pa_xfree(hdr->key);
+ pa_xfree(hdr->value);
+ pa_xfree(hdr);
+}
+
+pa_headerlist* pa_headerlist_new(void) {
+ return MAKE_HEADERLIST(pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) header_free));
+}
+
+void pa_headerlist_free(pa_headerlist* p) {
+ pa_hashmap_free(MAKE_HASHMAP(p));
+}
+
+int pa_headerlist_puts(pa_headerlist *p, const char *key, const char *value) {
+ struct header *hdr;
+ bool add = false;
+
+ pa_assert(p);
+ pa_assert(key);
+
+ if (!(hdr = pa_hashmap_get(MAKE_HASHMAP(p), key))) {
+ hdr = pa_xnew(struct header, 1);
+ hdr->key = pa_xstrdup(key);
+ add = true;
+ } else
+ pa_xfree(hdr->value);
+
+ hdr->value = pa_xstrdup(value);
+ hdr->nbytes = strlen(value)+1;
+
+ if (add)
+ pa_hashmap_put(MAKE_HASHMAP(p), hdr->key, hdr);
+
+ return 0;
+}
+
+int pa_headerlist_putsappend(pa_headerlist *p, const char *key, const char *value) {
+ struct header *hdr;
+ bool add = false;
+
+ pa_assert(p);
+ pa_assert(key);
+
+ if (!(hdr = pa_hashmap_get(MAKE_HASHMAP(p), key))) {
+ hdr = pa_xnew(struct header, 1);
+ hdr->key = pa_xstrdup(key);
+ hdr->value = pa_xstrdup(value);
+ add = true;
+ } else {
+ void *newval = pa_sprintf_malloc("%s%s", (char*)hdr->value, value);
+ pa_xfree(hdr->value);
+ hdr->value = newval;
+ }
+ hdr->nbytes = strlen(hdr->value)+1;
+
+ if (add)
+ pa_hashmap_put(MAKE_HASHMAP(p), hdr->key, hdr);
+
+ return 0;
+}
+
+const char *pa_headerlist_gets(pa_headerlist *p, const char *key) {
+ struct header *hdr;
+
+ pa_assert(p);
+ pa_assert(key);
+
+ if (!(hdr = pa_hashmap_get(MAKE_HASHMAP(p), key)))
+ return NULL;
+
+ if (hdr->nbytes <= 0)
+ return NULL;
+
+ if (((char*) hdr->value)[hdr->nbytes-1] != 0)
+ return NULL;
+
+ if (strlen((char*) hdr->value) != hdr->nbytes-1)
+ return NULL;
+
+ return (char*) hdr->value;
+}
+
+int pa_headerlist_remove(pa_headerlist *p, const char *key) {
+ pa_assert(p);
+ pa_assert(key);
+
+ return pa_hashmap_remove_and_free(MAKE_HASHMAP(p), key);
+}
+
+const char *pa_headerlist_iterate(pa_headerlist *p, void **state) {
+ struct header *hdr;
+
+ if (!(hdr = pa_hashmap_iterate(MAKE_HASHMAP(p), state, NULL)))
+ return NULL;
+
+ return hdr->key;
+}
+
+char *pa_headerlist_to_string(pa_headerlist *p) {
+ const char *key;
+ void *state = NULL;
+ pa_strbuf *buf;
+
+ pa_assert(p);
+
+ buf = pa_strbuf_new();
+
+ while ((key = pa_headerlist_iterate(p, &state))) {
+
+ const char *v;
+
+ if ((v = pa_headerlist_gets(p, key)))
+ pa_strbuf_printf(buf, "%s: %s\r\n", key, v);
+ }
+
+ return pa_strbuf_to_string_free(buf);
+}
+
+int pa_headerlist_contains(pa_headerlist *p, const char *key) {
+ pa_assert(p);
+ pa_assert(key);
+
+ if (!(pa_hashmap_get(MAKE_HASHMAP(p), key)))
+ return 0;
+
+ return 1;
+}
diff --git a/src/modules/rtp/headerlist.h b/src/modules/rtp/headerlist.h
new file mode 100644
index 0000000..f38fb78
--- /dev/null
+++ b/src/modules/rtp/headerlist.h
@@ -0,0 +1,44 @@
+#ifndef foopulseheaderlisthfoo
+#define foopulseheaderlisthfoo
+
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2008 Colin Guthrie
+ Copyright 2007 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of the
+ License, or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <pulsecore/macro.h>
+
+typedef struct pa_headerlist pa_headerlist;
+
+pa_headerlist* pa_headerlist_new(void);
+void pa_headerlist_free(pa_headerlist* p);
+
+int pa_headerlist_puts(pa_headerlist *p, const char *key, const char *value);
+int pa_headerlist_putsappend(pa_headerlist *p, const char *key, const char *value);
+
+const char *pa_headerlist_gets(pa_headerlist *p, const char *key);
+
+int pa_headerlist_remove(pa_headerlist *p, const char *key);
+
+const char *pa_headerlist_iterate(pa_headerlist *p, void **state);
+
+char *pa_headerlist_to_string(pa_headerlist *p);
+
+int pa_headerlist_contains(pa_headerlist *p, const char *key);
+
+#endif
diff --git a/src/modules/rtp/meson.build b/src/modules/rtp/meson.build
new file mode 100644
index 0000000..119cf08
--- /dev/null
+++ b/src/modules/rtp/meson.build
@@ -0,0 +1,35 @@
+librtp_sources = [
+ 'rtp-common.c',
+ 'sdp.c',
+ 'sap.c',
+ 'rtsp_client.c',
+ 'headerlist.c',
+]
+
+librtp_headers = [
+ 'rtp.h',
+ 'sdp.h',
+ 'sap.h',
+ 'rtsp_client.h',
+ 'headerlist.h',
+]
+
+if have_gstreamer
+ librtp_sources += 'rtp-gstreamer.c'
+else
+ librtp_sources += 'rtp-native.c'
+endif
+
+librtp = shared_library('rtp',
+ librtp_sources,
+ librtp_headers,
+ c_args : [pa_c_args, server_c_args],
+ link_args : [nodelete_link_args],
+ include_directories : [configinc, topinc],
+ dependencies : [libpulse_dep, libpulsecommon_dep, libpulsecore_dep, libatomic_ops_dep, gst_dep, gstapp_dep, gstrtp_dep, gio_dep],
+ install : true,
+ install_rpath : privlibdir,
+ install_dir : modlibexecdir,
+)
+
+librtp_dep = declare_dependency(link_with: librtp)
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
new file mode 100644
index 0000000..a9b42bb
--- /dev/null
+++ b/src/modules/rtp/module-rtp-recv.c
@@ -0,0 +1,774 @@
+
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdio.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+#include <math.h>
+
+#include <pulse/rtclock.h>
+#include <pulse/timeval.h>
+#include <pulse/xmalloc.h>
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/module.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/sink.h>
+#include <pulsecore/sink-input.h>
+#include <pulsecore/memblockq.h>
+#include <pulsecore/log.h>
+#include <pulsecore/core-rtclock.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/modargs.h>
+#include <pulsecore/namereg.h>
+#include <pulsecore/sample-util.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/socket-util.h>
+#include <pulsecore/atomic.h>
+#include <pulsecore/once.h>
+#include <pulsecore/poll.h>
+#include <pulsecore/arpa-inet.h>
+
+#include "rtp.h"
+#include "sdp.h"
+#include "sap.h"
+
+PA_MODULE_AUTHOR("Lennart Poettering");
+PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
+PA_MODULE_VERSION(PACKAGE_VERSION);
+PA_MODULE_LOAD_ONCE(false);
+PA_MODULE_USAGE(
+ "sink=<name of the sink> "
+ "sap_address=<multicast address to listen on> "
+ "latency_msec=<latency in ms> "
+);
+
+#define SAP_PORT 9875
+#define DEFAULT_SAP_ADDRESS "224.0.0.56"
+#define DEFAULT_LATENCY_MSEC 500
+#define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
+#define MAX_SESSIONS 16
+#define DEATH_TIMEOUT 20
+#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
+
+static const char* const valid_modargs[] = {
+ "sink",
+ "sap_address",
+ "latency_msec",
+ NULL
+};
+
+struct session {
+ struct userdata *userdata;
+ PA_LLIST_FIELDS(struct session);
+
+ pa_sink_input *sink_input;
+ pa_memblockq *memblockq;
+
+ bool first_packet;
+ uint32_t offset;
+
+ struct pa_sdp_info sdp_info;
+
+ pa_rtp_context *rtp_context;
+
+ pa_rtpoll_item *rtpoll_item;
+
+ pa_atomic_t timestamp;
+
+ pa_usec_t intended_latency;
+ pa_usec_t sink_latency;
+
+ unsigned int base_rate;
+ pa_usec_t last_rate_update;
+ pa_usec_t last_latency;
+ double estimated_rate;
+ double avg_estimated_rate;
+};
+
+struct userdata {
+ pa_module *module;
+ pa_core *core;
+
+ pa_sap_context sap_context;
+ pa_io_event* sap_event;
+
+ pa_time_event *check_death_event;
+
+ char *sink_name;
+
+ PA_LLIST_HEAD(struct session, sessions);
+ pa_hashmap *by_origin;
+ int n_sessions;
+
+ pa_usec_t latency;
+};
+
+static void session_free(struct session *s);
+
+/* Called from I/O thread context */
+static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ struct session *s = PA_SINK_INPUT(o)->userdata;
+
+ switch (code) {
+ case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
+ *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
+
+ /* Fall through, the default handler will add in the extra
+ * latency added by the resampler */
+ break;
+ }
+
+ return pa_sink_input_process_msg(o, code, data, offset, chunk);
+}
+
+/* Called from I/O thread context */
+static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
+ struct session *s;
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(s = i->userdata);
+
+ if (pa_memblockq_peek(s->memblockq, chunk) < 0)
+ return -1;
+
+ pa_memblockq_drop(s->memblockq, chunk->length);
+
+ return 0;
+}
+
+/* Called from I/O thread context */
+static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
+ struct session *s;
+
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(s = i->userdata);
+
+ pa_memblockq_rewind(s->memblockq, nbytes);
+}
+
+/* Called from I/O thread context */
+static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
+ struct session *s;
+
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(s = i->userdata);
+
+ pa_memblockq_set_maxrewind(s->memblockq, nbytes);
+}
+
+/* Called from main context */
+static void sink_input_kill(pa_sink_input* i) {
+ struct session *s;
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(s = i->userdata);
+
+ pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
+}
+
+/* Called from IO context */
+static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
+ struct session *s;
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(s = i->userdata);
+
+ if (b)
+ pa_memblockq_flush_read(s->memblockq);
+ else
+ s->first_packet = false;
+}
+
+/* Called from I/O thread context */
+static int rtpoll_work_cb(pa_rtpoll_item *i) {
+ pa_memchunk chunk;
+ uint32_t timestamp;
+ int64_t k, j, delta;
+ struct timeval now = { 0, 0 };
+ struct session *s;
+ struct pollfd *p;
+
+ pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
+
+ p = pa_rtpoll_item_get_pollfd(i, NULL);
+
+ if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
+ pa_log("poll() signalled bad revents.");
+ return -1;
+ }
+
+ if ((p->revents & POLLIN) == 0)
+ return 0;
+
+ p->revents = 0;
+
+ if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
+ return 0;
+
+ if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
+ pa_memblock_unref(chunk.memblock);
+ return 0;
+ }
+
+ if (!s->first_packet) {
+ s->first_packet = true;
+ s->offset = timestamp;
+ }
+
+ /* Check whether there was a timestamp overflow */
+ k = (int64_t) timestamp - (int64_t) s->offset;
+ j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
+
+ if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
+ delta = k;
+ else
+ delta = j;
+
+ pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
+ true);
+
+ if (now.tv_sec == 0) {
+ PA_ONCE_BEGIN {
+ pa_log_warn("Using artificial time instead of timestamp");
+ } PA_ONCE_END;
+ pa_rtclock_get(&now);
+ } else
+ pa_rtclock_from_wallclock(&now);
+
+ if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
+ pa_log_warn("Queue overrun");
+ pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
+ }
+
+/* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
+
+ pa_memblock_unref(chunk.memblock);
+
+ /* The next timestamp we expect */
+ s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
+
+ pa_atomic_store(&s->timestamp, (int) now.tv_sec);
+
+ if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
+ pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
+ uint32_t current_rate = s->sink_input->sample_spec.rate;
+ uint32_t new_rate;
+ double estimated_rate, alpha = 0.02;
+
+ pa_log_debug("Updating sample rate");
+
+ wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
+ ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
+
+ pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
+
+ sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
+ render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
+
+ if (ri > render_delay+sink_delay)
+ ri -= render_delay+sink_delay;
+ else
+ ri = 0;
+
+ if (wi < ri)
+ latency = 0;
+ else
+ latency = wi - ri;
+
+ pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
+
+ /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
+ * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
+ * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
+ * T
+ * R̂ = ─────────────── Rⁿ . (1)
+ * T - (Lⁿ - Lⁿ⁻ⁱ)
+ *
+ * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
+ * is correct). But there is also the requirement to keep the buffer at a predefined target
+ * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
+ * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
+ * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
+ * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
+ * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
+ * ʲ⁼ⁱ R̂ a a
+ * Solving for Rⁿ⁺ⁱ gives
+ * T - ²∕ₐ₊₁(L̂ - Lⁿ)
+ * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
+ * T
+ * In the code below a = 7 is used.
+ *
+ * Equation (1) is not directly used in (2), but instead an exponentially weighted average
+ * of the estimated rate R̂ is used. This average R̅ is defined as
+ * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
+ * Because it is difficult to find a fixed value for the coefficient α such that the
+ * averaging is without significant lag but oscillations are filtered out, a heuristic is
+ * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
+ * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
+ */
+ estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
+ if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
+ double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
+ alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
+ }
+ s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
+ s->estimated_rate = estimated_rate;
+ pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
+ new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
+ s->last_latency = latency;
+
+ if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
+ pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
+ new_rate = s->base_rate;
+ } else {
+ if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
+ new_rate = s->base_rate;
+ /* Do the adjustment in small steps; 2‰ can be considered inaudible */
+ if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
+ pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
+ new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
+ }
+ }
+ s->sink_input->sample_spec.rate = new_rate;
+
+ pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
+
+ pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
+
+ pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
+
+ s->last_rate_update = pa_timeval_load(&now);
+ }
+
+ if (pa_memblockq_is_readable(s->memblockq) &&
+ s->sink_input->thread_info.underrun_for > 0) {
+ pa_log_debug("Requesting rewind due to end of underrun");
+ pa_sink_input_request_rewind(s->sink_input,
+ (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
+ false, true, false);
+ }
+
+ return 1;
+}
+
+/* Called from I/O thread context */
+static void sink_input_attach(pa_sink_input *i) {
+ struct session *s;
+
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(s = i->userdata);
+
+ pa_assert(!s->rtpoll_item);
+ s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
+
+ pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
+}
+
+/* Called from I/O thread context */
+static void sink_input_detach(pa_sink_input *i) {
+ struct session *s;
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(s = i->userdata);
+
+ pa_assert(s->rtpoll_item);
+ pa_rtpoll_item_free(s->rtpoll_item);
+ s->rtpoll_item = NULL;
+}
+
+static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
+ int af, fd = -1, r, one;
+
+ pa_assert(sa);
+ pa_assert(salen > 0);
+
+ af = sa->sa_family;
+ if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
+ pa_log("Failed to create socket: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ pa_make_udp_socket_low_delay(fd);
+
+#ifdef SO_TIMESTAMP
+ one = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
+ pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+#else
+ pa_log("SO_TIMESTAMP unsupported on this platform");
+ goto fail;
+#endif
+
+ one = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
+ pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ r = 0;
+ if (af == AF_INET) {
+ /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
+ static const uint32_t ipv4_mcast_mask = 0xe0000000;
+
+ if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
+ struct ip_mreq mr4;
+ memset(&mr4, 0, sizeof(mr4));
+ mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
+ r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
+ }
+#ifdef HAVE_IPV6
+ } else if (af == AF_INET6) {
+ /* IPv6 multicast addresses have 255 as the most significant byte */
+ if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
+ struct ipv6_mreq mr6;
+ memset(&mr6, 0, sizeof(mr6));
+ mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
+ r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
+ }
+#endif
+ } else
+ pa_assert_not_reached();
+
+ if (r < 0) {
+ pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ if (bind(fd, sa, salen) < 0) {
+ pa_log("bind() failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ return fd;
+
+fail:
+ if (fd >= 0)
+ close(fd);
+
+ return -1;
+}
+
+static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
+ struct session *s = NULL;
+ pa_sink *sink;
+ int fd = -1;
+ pa_memchunk silence;
+ pa_sink_input_new_data data;
+ struct timeval now;
+
+ pa_assert(u);
+ pa_assert(sdp_info);
+
+ if (u->n_sessions >= MAX_SESSIONS) {
+ pa_log("Session limit reached.");
+ goto fail;
+ }
+
+ if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
+ pa_log("Sink does not exist.");
+ goto fail;
+ }
+
+ pa_rtclock_get(&now);
+
+ s = pa_xnew0(struct session, 1);
+ s->userdata = u;
+ s->first_packet = false;
+ s->sdp_info = *sdp_info;
+ s->rtpoll_item = NULL;
+ s->intended_latency = u->latency;
+ s->last_rate_update = pa_timeval_load(&now);
+ s->last_latency = u->latency;
+ pa_atomic_store(&s->timestamp, (int) now.tv_sec);
+
+ if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
+ goto fail;
+
+ pa_sink_input_new_data_init(&data);
+ pa_sink_input_new_data_set_sink(&data, sink, false, true);
+ data.driver = __FILE__;
+ pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
+ pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
+ "RTP Stream%s%s%s",
+ sdp_info->session_name ? " (" : "",
+ sdp_info->session_name ? sdp_info->session_name : "",
+ sdp_info->session_name ? ")" : "");
+
+ if (sdp_info->session_name)
+ pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
+ pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
+ pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
+ data.module = u->module;
+ pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
+ data.flags = PA_SINK_INPUT_VARIABLE_RATE;
+
+ pa_sink_input_new(&s->sink_input, u->module->core, &data);
+ pa_sink_input_new_data_done(&data);
+
+ if (!s->sink_input) {
+ pa_log("Failed to create sink input.");
+ goto fail;
+ }
+
+ s->base_rate = (double) s->sink_input->sample_spec.rate;
+ s->estimated_rate = (double) s->sink_input->sample_spec.rate;
+ s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
+
+ s->sink_input->userdata = s;
+
+ s->sink_input->parent.process_msg = sink_input_process_msg;
+ s->sink_input->pop = sink_input_pop_cb;
+ s->sink_input->process_rewind = sink_input_process_rewind_cb;
+ s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
+ s->sink_input->kill = sink_input_kill;
+ s->sink_input->attach = sink_input_attach;
+ s->sink_input->detach = sink_input_detach;
+ s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
+
+ pa_sink_input_get_silence(s->sink_input, &silence);
+
+ s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
+
+ if (s->intended_latency < s->sink_latency*2)
+ s->intended_latency = s->sink_latency*2;
+
+ s->memblockq = pa_memblockq_new(
+ "module-rtp-recv memblockq",
+ 0,
+ MEMBLOCKQ_MAXLENGTH,
+ MEMBLOCKQ_MAXLENGTH,
+ &s->sink_input->sample_spec,
+ pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
+ 0,
+ 0,
+ &silence);
+
+ pa_memblock_unref(silence.memblock);
+
+ if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec)))
+ goto fail;
+
+ pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
+ u->n_sessions++;
+ PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
+
+ pa_sink_input_put(s->sink_input);
+
+ pa_log_info("New session '%s'", s->sdp_info.session_name);
+
+ return s;
+
+fail:
+ pa_xfree(s);
+
+ if (fd >= 0)
+ pa_close(fd);
+
+ return NULL;
+}
+
+static void session_free(struct session *s) {
+ pa_assert(s);
+
+ pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
+
+ pa_sink_input_unlink(s->sink_input);
+ pa_sink_input_unref(s->sink_input);
+
+ PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
+ pa_assert(s->userdata->n_sessions >= 1);
+ s->userdata->n_sessions--;
+
+ pa_memblockq_free(s->memblockq);
+ pa_sdp_info_destroy(&s->sdp_info);
+ pa_rtp_context_free(s->rtp_context);
+
+ pa_xfree(s);
+}
+
+static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
+ struct userdata *u = userdata;
+ bool goodbye = false;
+ pa_sdp_info info;
+ struct session *s;
+
+ pa_assert(m);
+ pa_assert(e);
+ pa_assert(u);
+ pa_assert(fd == u->sap_context.fd);
+ pa_assert(flags == PA_IO_EVENT_INPUT);
+
+ if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
+ return;
+
+ if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
+ return;
+
+ if (goodbye) {
+ pa_hashmap_remove_and_free(u->by_origin, info.origin);
+ pa_sdp_info_destroy(&info);
+ } else {
+
+ if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
+ if (!session_new(u, &info))
+ pa_sdp_info_destroy(&info);
+
+ } else {
+ struct timeval now;
+ pa_rtclock_get(&now);
+ pa_atomic_store(&s->timestamp, (int) now.tv_sec);
+
+ pa_sdp_info_destroy(&info);
+ }
+ }
+}
+
+static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
+ struct session *s, *n;
+ struct userdata *u = userdata;
+ struct timeval now;
+
+ pa_assert(m);
+ pa_assert(t);
+ pa_assert(u);
+
+ pa_rtclock_get(&now);
+
+ pa_log_debug("Checking for dead streams ...");
+
+ for (s = u->sessions; s; s = n) {
+ int k;
+ n = s->next;
+
+ k = pa_atomic_load(&s->timestamp);
+
+ if (k + DEATH_TIMEOUT < now.tv_sec)
+ pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin);
+ }
+
+ /* Restart timer */
+ pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
+}
+
+int pa__init(pa_module*m) {
+ struct userdata *u;
+ pa_modargs *ma = NULL;
+ struct sockaddr_in sa4;
+#ifdef HAVE_IPV6
+ struct sockaddr_in6 sa6;
+#endif
+ struct sockaddr *sa;
+ socklen_t salen;
+ const char *sap_address;
+ uint32_t latency_msec;
+ int fd = -1;
+
+ pa_assert(m);
+
+ if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
+ pa_log("failed to parse module arguments");
+ goto fail;
+ }
+
+ sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
+
+ if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
+ sa4.sin_family = AF_INET;
+ sa4.sin_port = htons(SAP_PORT);
+ sa = (struct sockaddr*) &sa4;
+ salen = sizeof(sa4);
+#ifdef HAVE_IPV6
+ } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
+ sa6.sin6_family = AF_INET6;
+ sa6.sin6_port = htons(SAP_PORT);
+ sa = (struct sockaddr*) &sa6;
+ salen = sizeof(sa6);
+#endif
+ } else {
+ pa_log("Invalid SAP address '%s'", sap_address);
+ goto fail;
+ }
+
+ latency_msec = DEFAULT_LATENCY_MSEC;
+ if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
+ pa_log("Invalid latency specification");
+ goto fail;
+ }
+
+ if ((fd = mcast_socket(sa, salen)) < 0)
+ goto fail;
+
+ m->userdata = u = pa_xnew(struct userdata, 1);
+ u->module = m;
+ u->core = m->core;
+ u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
+ u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
+
+ u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
+ pa_sap_context_init_recv(&u->sap_context, fd);
+
+ PA_LLIST_HEAD_INIT(struct session, u->sessions);
+ u->n_sessions = 0;
+ u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
+
+ u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
+
+ pa_modargs_free(ma);
+
+ return 0;
+
+fail:
+ if (ma)
+ pa_modargs_free(ma);
+
+ if (fd >= 0)
+ pa_close(fd);
+
+ return -1;
+}
+
+void pa__done(pa_module*m) {
+ struct userdata *u;
+
+ pa_assert(m);
+
+ if (!(u = m->userdata))
+ return;
+
+ if (u->sap_event)
+ m->core->mainloop->io_free(u->sap_event);
+
+ if (u->check_death_event)
+ m->core->mainloop->time_free(u->check_death_event);
+
+ pa_sap_context_destroy(&u->sap_context);
+
+ if (u->by_origin)
+ pa_hashmap_free(u->by_origin);
+
+ pa_xfree(u->sink_name);
+ pa_xfree(u);
+}
diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c
new file mode 100644
index 0000000..5a4c6fc
--- /dev/null
+++ b/src/modules/rtp/module-rtp-send.c
@@ -0,0 +1,547 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdio.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <unistd.h>
+
+#include <pulse/rtclock.h>
+#include <pulse/timeval.h>
+#include <pulse/util.h>
+#include <pulse/xmalloc.h>
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/module.h>
+#include <pulsecore/source.h>
+#include <pulsecore/source-output.h>
+#include <pulsecore/memblockq.h>
+#include <pulsecore/log.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/modargs.h>
+#include <pulsecore/namereg.h>
+#include <pulsecore/sample-util.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/socket-util.h>
+#include <pulsecore/arpa-inet.h>
+
+#include "rtp.h"
+#include "sdp.h"
+#include "sap.h"
+
+PA_MODULE_AUTHOR("Lennart Poettering");
+PA_MODULE_DESCRIPTION("Read data from source and send it to the network via RTP/SAP/SDP");
+PA_MODULE_VERSION(PACKAGE_VERSION);
+PA_MODULE_LOAD_ONCE(false);
+PA_MODULE_USAGE(
+ "source=<name of the source> "
+ "format=<sample format> "
+ "channels=<number of channels> "
+ "rate=<sample rate> "
+ "destination_ip=<destination IP address> "
+ "source_ip=<source IP address> "
+ "port=<port number> "
+ "mtu=<maximum transfer unit> "
+ "loop=<loopback to local host?> "
+ "ttl=<ttl value> "
+ "inhibit_auto_suspend=<always|never|only_with_non_monitor_sources>"
+ "stream_name=<name of the stream>"
+);
+
+#define DEFAULT_PORT 46000
+#define DEFAULT_TTL 1
+#define SAP_PORT 9875
+#define DEFAULT_SOURCE_IP "0.0.0.0"
+#define DEFAULT_DESTINATION_IP "224.0.0.56"
+#define MEMBLOCKQ_MAXLENGTH (1024*170)
+#define DEFAULT_MTU 1280
+#define SAP_INTERVAL (5*PA_USEC_PER_SEC)
+
+static const char* const valid_modargs[] = {
+ "source",
+ "format",
+ "channels",
+ "rate",
+ "destination", /* Compatbility */
+ "destination_ip",
+ "source_ip",
+ "port",
+ "mtu" ,
+ "loop",
+ "ttl",
+ "inhibit_auto_suspend",
+ "stream_name",
+ NULL
+};
+
+enum inhibit_auto_suspend {
+ INHIBIT_AUTO_SUSPEND_ALWAYS,
+ INHIBIT_AUTO_SUSPEND_NEVER,
+ INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES
+};
+
+struct userdata {
+ pa_module *module;
+
+ pa_source_output *source_output;
+ pa_memblockq *memblockq;
+
+ pa_rtp_context *rtp_context;
+ pa_sap_context sap_context;
+
+ pa_time_event *sap_event;
+
+ enum inhibit_auto_suspend inhibit_auto_suspend;
+};
+
+/* Called from I/O thread context */
+static int source_output_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ struct userdata *u;
+ pa_assert_se(u = PA_SOURCE_OUTPUT(o)->userdata);
+
+ switch (code) {
+ case PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY:
+ *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->source_output->sample_spec);
+
+ /* Fall through, the default handler will add in the extra
+ * latency added by the resampler */
+ break;
+ }
+
+ return pa_source_output_process_msg(o, code, data, offset, chunk);
+}
+
+/* Called from I/O thread context */
+static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
+ struct userdata *u;
+ pa_source_output_assert_ref(o);
+ pa_assert_se(u = o->userdata);
+
+ if (pa_memblockq_push(u->memblockq, chunk) < 0) {
+ pa_log_warn("Failed to push chunk into memblockq.");
+ return;
+ }
+
+ pa_rtp_send(u->rtp_context, u->memblockq);
+}
+
+static pa_source_output_flags_t get_dont_inhibit_auto_suspend_flag(pa_source *source,
+ enum inhibit_auto_suspend inhibit_auto_suspend) {
+ pa_assert(source);
+
+ switch (inhibit_auto_suspend) {
+ case INHIBIT_AUTO_SUSPEND_ALWAYS:
+ return 0;
+
+ case INHIBIT_AUTO_SUSPEND_NEVER:
+ return PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND;
+
+ case INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES:
+ return source->monitor_of ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0;
+ }
+
+ pa_assert_not_reached();
+}
+
+/* Called from the main thread. */
+static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
+ struct userdata *u;
+
+ pa_assert(o);
+
+ u = o->userdata;
+
+ if (!dest)
+ return;
+
+ o->flags &= ~PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND;
+ o->flags |= get_dont_inhibit_auto_suspend_flag(dest, u->inhibit_auto_suspend);
+}
+
+/* Called from main context */
+static void source_output_kill_cb(pa_source_output* o) {
+ struct userdata *u;
+ pa_source_output_assert_ref(o);
+ pa_assert_se(u = o->userdata);
+
+ pa_module_unload_request(u->module, true);
+
+ pa_source_output_unlink(u->source_output);
+ pa_source_output_unref(u->source_output);
+ u->source_output = NULL;
+}
+
+static void sap_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
+ struct userdata *u = userdata;
+
+ pa_assert(m);
+ pa_assert(t);
+ pa_assert(u);
+
+ pa_sap_send(&u->sap_context, 0);
+
+ pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + SAP_INTERVAL);
+}
+
+int pa__init(pa_module*m) {
+ struct userdata *u;
+ pa_modargs *ma = NULL;
+ const char *dst_addr;
+ const char *src_addr;
+ uint32_t port = DEFAULT_PORT, mtu;
+ uint32_t ttl = DEFAULT_TTL;
+ sa_family_t af;
+ int fd = -1, sap_fd = -1;
+ pa_source *s;
+ pa_sample_spec ss;
+ pa_channel_map cm;
+ struct sockaddr_in dst_sa4, dst_sap_sa4, src_sa4, src_sap_sa4;
+#ifdef HAVE_IPV6
+ struct sockaddr_in6 dst_sa6, dst_sap_sa6, src_sa6, src_sap_sa6;
+#endif
+ struct sockaddr_storage sa_dst;
+ pa_source_output *o = NULL;
+ uint8_t payload;
+ char *p;
+ int r, j;
+ socklen_t k;
+ char hn[128], *n;
+ bool loop = false;
+ enum inhibit_auto_suspend inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES;
+ const char *inhibit_auto_suspend_str;
+ pa_source_output_new_data data;
+
+ pa_assert(m);
+
+ if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
+ pa_log("Failed to parse module arguments");
+ goto fail;
+ }
+
+ if (!(s = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source", NULL), PA_NAMEREG_SOURCE))) {
+ pa_log("Source does not exist.");
+ goto fail;
+ }
+
+ if (pa_modargs_get_value_boolean(ma, "loop", &loop) < 0) {
+ pa_log("Failed to parse \"loop\" parameter.");
+ goto fail;
+ }
+
+ if ((inhibit_auto_suspend_str = pa_modargs_get_value(ma, "inhibit_auto_suspend", NULL))) {
+ if (pa_streq(inhibit_auto_suspend_str, "always"))
+ inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ALWAYS;
+ else if (pa_streq(inhibit_auto_suspend_str, "never"))
+ inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_NEVER;
+ else if (pa_streq(inhibit_auto_suspend_str, "only_with_non_monitor_sources"))
+ inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES;
+ else {
+ pa_log("Failed to parse the \"inhibit_auto_suspend\" parameter.");
+ goto fail;
+ }
+ }
+
+ ss = s->sample_spec;
+ pa_rtp_sample_spec_fixup(&ss);
+ cm = s->channel_map;
+ if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
+ pa_log("Failed to parse sample specification");
+ goto fail;
+ }
+
+ if (!pa_rtp_sample_spec_valid(&ss)) {
+ pa_log("Specified sample type not compatible with RTP");
+ goto fail;
+ }
+
+ if (ss.channels != cm.channels)
+ pa_channel_map_init_auto(&cm, ss.channels, PA_CHANNEL_MAP_AIFF);
+
+ payload = pa_rtp_payload_from_sample_spec(&ss);
+
+ mtu = (uint32_t) pa_frame_align(DEFAULT_MTU, &ss);
+
+ if (pa_modargs_get_value_u32(ma, "mtu", &mtu) < 0 || mtu < 1 || mtu % pa_frame_size(&ss) != 0) {
+ pa_log("Invalid MTU.");
+ goto fail;
+ }
+
+ port = DEFAULT_PORT + ((uint32_t) (rand() % 512) << 1);
+ if (pa_modargs_get_value_u32(ma, "port", &port) < 0 || port < 1 || port > 0xFFFF) {
+ pa_log("port= expects a numerical argument between 1 and 65535.");
+ goto fail;
+ }
+
+ if (port & 1)
+ pa_log_warn("Port number not even as suggested in RFC3550!");
+
+ if (pa_modargs_get_value_u32(ma, "ttl", &ttl) < 0 || ttl < 1 || ttl > 0xFF) {
+ pa_log("ttl= expects a numerical argument between 1 and 255.");
+ goto fail;
+ }
+
+ src_addr = pa_modargs_get_value(ma, "source_ip", DEFAULT_SOURCE_IP);
+
+ if (inet_pton(AF_INET, src_addr, &src_sa4.sin_addr) > 0) {
+ src_sa4.sin_family = af = AF_INET;
+ src_sa4.sin_port = htons(0);
+ memset(&src_sa4.sin_zero, 0, sizeof(src_sa4.sin_zero));
+ src_sap_sa4 = src_sa4;
+#ifdef HAVE_IPV6
+ } else if (inet_pton(AF_INET6, src_addr, &src_sa6.sin6_addr) > 0) {
+ src_sa6.sin6_family = af = AF_INET6;
+ src_sa6.sin6_port = htons(0);
+ src_sa6.sin6_flowinfo = 0;
+ src_sa6.sin6_scope_id = 0;
+ src_sap_sa6 = src_sa6;
+#endif
+ } else {
+ pa_log("Invalid source address '%s'", src_addr);
+ goto fail;
+ }
+
+ dst_addr = pa_modargs_get_value(ma, "destination", NULL);
+ if (dst_addr == NULL)
+ dst_addr = pa_modargs_get_value(ma, "destination_ip", DEFAULT_DESTINATION_IP);
+
+ if (inet_pton(AF_INET, dst_addr, &dst_sa4.sin_addr) > 0) {
+ dst_sa4.sin_family = af = AF_INET;
+ dst_sa4.sin_port = htons((uint16_t) port);
+ memset(&dst_sa4.sin_zero, 0, sizeof(dst_sa4.sin_zero));
+ dst_sap_sa4 = dst_sa4;
+ dst_sap_sa4.sin_port = htons(SAP_PORT);
+#ifdef HAVE_IPV6
+ } else if (inet_pton(AF_INET6, dst_addr, &dst_sa6.sin6_addr) > 0) {
+ dst_sa6.sin6_family = af = AF_INET6;
+ dst_sa6.sin6_port = htons((uint16_t) port);
+ dst_sa6.sin6_flowinfo = 0;
+ dst_sa6.sin6_scope_id = 0;
+ dst_sap_sa6 = dst_sa6;
+ dst_sap_sa6.sin6_port = htons(SAP_PORT);
+#endif
+ } else {
+ pa_log("Invalid destination '%s'", dst_addr);
+ goto fail;
+ }
+
+ if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
+ pa_log("socket() failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ if (af == AF_INET && bind(fd, (struct sockaddr*) &src_sa4, sizeof(src_sa4)) < 0) {
+ pa_log("bind() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#ifdef HAVE_IPV6
+ } else if (af == AF_INET6 && bind(fd, (struct sockaddr*) &src_sa6, sizeof(src_sa6)) < 0) {
+ pa_log("bind() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#endif
+ }
+
+ if (af == AF_INET && connect(fd, (struct sockaddr*) &dst_sa4, sizeof(dst_sa4)) < 0) {
+ pa_log("connect() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#ifdef HAVE_IPV6
+ } else if (af == AF_INET6 && connect(fd, (struct sockaddr*) &dst_sa6, sizeof(dst_sa6)) < 0) {
+ pa_log("connect() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#endif
+ }
+
+ if ((sap_fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
+ pa_log("socket() failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ if (af == AF_INET && bind(sap_fd, (struct sockaddr*) &src_sap_sa4, sizeof(src_sap_sa4)) < 0) {
+ pa_log("bind() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#ifdef HAVE_IPV6
+ } else if (af == AF_INET6 && bind(sap_fd, (struct sockaddr*) &src_sap_sa6, sizeof(src_sap_sa6)) < 0) {
+ pa_log("bind() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#endif
+ }
+
+ if (af == AF_INET && connect(sap_fd, (struct sockaddr*) &dst_sap_sa4, sizeof(dst_sap_sa4)) < 0) {
+ pa_log("connect() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#ifdef HAVE_IPV6
+ } else if (af == AF_INET6 && connect(sap_fd, (struct sockaddr*) &dst_sap_sa6, sizeof(dst_sap_sa6)) < 0) {
+ pa_log("connect() failed: %s", pa_cstrerror(errno));
+ goto fail;
+#endif
+ }
+
+ j = loop;
+ if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &j, sizeof(j)) < 0 ||
+ setsockopt(sap_fd, IPPROTO_IP, IP_MULTICAST_LOOP, &j, sizeof(j)) < 0) {
+ pa_log("IP_MULTICAST_LOOP failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ if (ttl != DEFAULT_TTL) {
+ int _ttl = (int) ttl;
+
+ if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &_ttl, sizeof(_ttl)) < 0) {
+ pa_log("IP_MULTICAST_TTL failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ if (setsockopt(sap_fd, IPPROTO_IP, IP_MULTICAST_TTL, &_ttl, sizeof(_ttl)) < 0) {
+ pa_log("IP_MULTICAST_TTL (sap) failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+ }
+
+ /* If the socket queue is full, let's drop packets */
+ pa_make_fd_nonblock(fd);
+ pa_make_udp_socket_low_delay(fd);
+
+ pa_source_output_new_data_init(&data);
+ pa_proplist_sets(data.proplist, PA_PROP_MEDIA_NAME, "RTP Monitor Stream");
+ pa_proplist_sets(data.proplist, "rtp.source", src_addr);
+ pa_proplist_sets(data.proplist, "rtp.destination", dst_addr);
+ pa_proplist_setf(data.proplist, "rtp.mtu", "%lu", (unsigned long) mtu);
+ pa_proplist_setf(data.proplist, "rtp.port", "%lu", (unsigned long) port);
+ pa_proplist_setf(data.proplist, "rtp.ttl", "%lu", (unsigned long) ttl);
+ data.driver = __FILE__;
+ data.module = m;
+ pa_source_output_new_data_set_source(&data, s, false, true);
+ pa_source_output_new_data_set_sample_spec(&data, &ss);
+ pa_source_output_new_data_set_channel_map(&data, &cm);
+ data.flags |= get_dont_inhibit_auto_suspend_flag(s, inhibit_auto_suspend);
+
+ pa_source_output_new(&o, m->core, &data);
+ pa_source_output_new_data_done(&data);
+
+ if (!o) {
+ pa_log("failed to create source output.");
+ goto fail;
+ }
+
+ o->parent.process_msg = source_output_process_msg;
+ o->push = source_output_push_cb;
+ o->moving = source_output_moving_cb;
+ o->kill = source_output_kill_cb;
+
+ pa_log_info("Configured source latency of %llu ms.",
+ (unsigned long long) pa_source_output_set_requested_latency(o, pa_bytes_to_usec(mtu, &o->sample_spec)) / PA_USEC_PER_MSEC);
+
+ m->userdata = o->userdata = u = pa_xnew(struct userdata, 1);
+ u->module = m;
+ u->source_output = o;
+
+ u->memblockq = pa_memblockq_new(
+ "module-rtp-send memblockq",
+ 0,
+ MEMBLOCKQ_MAXLENGTH,
+ MEMBLOCKQ_MAXLENGTH,
+ &ss,
+ 1,
+ 0,
+ 0,
+ NULL);
+
+ k = sizeof(sa_dst);
+ pa_assert_se((r = getsockname(fd, (struct sockaddr*) &sa_dst, &k)) >= 0);
+
+ n = pa_xstrdup(pa_modargs_get_value(ma, "stream_name", NULL));
+ if (n == NULL)
+ n = pa_sprintf_malloc("PulseAudio RTP Stream on %s", pa_get_fqdn(hn, sizeof(hn)));
+
+ if (af == AF_INET) {
+ p = pa_sdp_build(af,
+ (void*) &((struct sockaddr_in*) &sa_dst)->sin_addr,
+ (void*) &dst_sa4.sin_addr,
+ n, (uint16_t) port, payload, &ss);
+#ifdef HAVE_IPV6
+ } else {
+ p = pa_sdp_build(af,
+ (void*) &((struct sockaddr_in6*) &sa_dst)->sin6_addr,
+ (void*) &dst_sa6.sin6_addr,
+ n, (uint16_t) port, payload, &ss);
+#endif
+ }
+
+ pa_xfree(n);
+
+ if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, &ss)))
+ goto fail;
+ pa_sap_context_init_send(&u->sap_context, sap_fd, p);
+
+ pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, payload=%u",
+ mtu, dst_addr, port, src_addr, ttl, payload);
+ pa_log_info("SDP-Data:\n%s\nEOF", p);
+
+ pa_sap_send(&u->sap_context, 0);
+
+ u->sap_event = pa_core_rttime_new(m->core, pa_rtclock_now() + SAP_INTERVAL, sap_event_cb, u);
+ u->inhibit_auto_suspend = inhibit_auto_suspend;
+
+ pa_source_output_put(u->source_output);
+
+ pa_modargs_free(ma);
+
+ return 0;
+
+fail:
+ if (ma)
+ pa_modargs_free(ma);
+
+ if (fd >= 0)
+ pa_close(fd);
+
+ if (sap_fd >= 0)
+ pa_close(sap_fd);
+
+ return -1;
+}
+
+void pa__done(pa_module*m) {
+ struct userdata *u;
+ pa_assert(m);
+
+ if (!(u = m->userdata))
+ return;
+
+ if (u->sap_event)
+ m->core->mainloop->time_free(u->sap_event);
+
+ if (u->source_output) {
+ pa_source_output_unlink(u->source_output);
+ pa_source_output_unref(u->source_output);
+ }
+
+ pa_rtp_context_free(u->rtp_context);
+
+ pa_sap_send(&u->sap_context, 1);
+ pa_sap_context_destroy(&u->sap_context);
+
+ if (u->memblockq)
+ pa_memblockq_free(u->memblockq);
+
+ pa_xfree(u);
+}
diff --git a/src/modules/rtp/rtp-common.c b/src/modules/rtp/rtp-common.c
new file mode 100644
index 0000000..65e2c7a
--- /dev/null
+++ b/src/modules/rtp/rtp-common.c
@@ -0,0 +1,97 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "rtp.h"
+
+#include <pulsecore/core-util.h>
+
+uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) {
+ pa_assert(ss);
+
+ if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 2)
+ return 10;
+ if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 1)
+ return 11;
+
+ return 127;
+}
+
+pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss) {
+ pa_assert(ss);
+
+ switch (payload) {
+ case 10:
+ ss->channels = 2;
+ ss->format = PA_SAMPLE_S16BE;
+ ss->rate = 44100;
+ break;
+
+ case 11:
+ ss->channels = 1;
+ ss->format = PA_SAMPLE_S16BE;
+ ss->rate = 44100;
+ break;
+
+ default:
+ return NULL;
+ }
+
+ return ss;
+}
+
+pa_sample_spec *pa_rtp_sample_spec_fixup(pa_sample_spec * ss) {
+ pa_assert(ss);
+
+ if (!pa_rtp_sample_spec_valid(ss))
+ ss->format = PA_SAMPLE_S16BE;
+
+ pa_assert(pa_rtp_sample_spec_valid(ss));
+ return ss;
+}
+
+int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) {
+ pa_assert(ss);
+
+ if (!pa_sample_spec_valid(ss))
+ return 0;
+
+ return ss->format == PA_SAMPLE_S16BE;
+}
+
+const char* pa_rtp_format_to_string(pa_sample_format_t f) {
+ switch (f) {
+ case PA_SAMPLE_S16BE:
+ return "L16";
+ default:
+ return NULL;
+ }
+}
+
+pa_sample_format_t pa_rtp_string_to_format(const char *s) {
+ pa_assert(s);
+
+ if (pa_streq(s, "L16"))
+ return PA_SAMPLE_S16BE;
+ else
+ return PA_SAMPLE_INVALID;
+}
diff --git a/src/modules/rtp/rtp-gstreamer.c b/src/modules/rtp/rtp-gstreamer.c
new file mode 100644
index 0000000..28d367b
--- /dev/null
+++ b/src/modules/rtp/rtp-gstreamer.c
@@ -0,0 +1,665 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2016 Arun Raghavan <mail@arunraghavan.net>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <pulse/timeval.h>
+#include <pulsecore/fdsem.h>
+#include <pulsecore/core-rtclock.h>
+
+#include "rtp.h"
+
+#include <gio/gio.h>
+
+#include <gst/gst.h>
+#include <gst/app/gstappsrc.h>
+#include <gst/app/gstappsink.h>
+#include <gst/base/gstadapter.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#define MAKE_ELEMENT_NAMED(v, e, n) \
+ v = gst_element_factory_make(e, n); \
+ if (!v) { \
+ pa_log("Could not create %s element", e); \
+ goto fail; \
+ }
+
+#define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL)
+#define RTP_HEADER_SIZE 12
+
+struct pa_rtp_context {
+ pa_fdsem *fdsem;
+ pa_sample_spec ss;
+
+ GstElement *pipeline;
+ GstElement *appsrc;
+ GstElement *appsink;
+ GstCaps *meta_reference;
+
+ bool first_buffer;
+ uint32_t last_timestamp;
+
+ uint8_t *send_buf;
+ size_t mtu;
+};
+
+static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) {
+ if (ss->format != PA_SAMPLE_S16BE)
+ return NULL;
+
+ return gst_caps_new_simple("audio/x-raw",
+ "format", G_TYPE_STRING, "S16BE",
+ "rate", G_TYPE_INT, (int) ss->rate,
+ "channels", G_TYPE_INT, (int) ss->channels,
+ "layout", G_TYPE_STRING, "interleaved",
+ NULL);
+}
+
+static bool init_send_pipeline(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+ GstElement *appsrc = NULL, *pay = NULL, *capsf = NULL, *rtpbin = NULL, *sink = NULL;
+ GstCaps *caps;
+ GSocket *socket;
+ GInetSocketAddress *addr;
+ GInetAddress *iaddr;
+ guint16 port;
+ gchar *addr_str;
+
+ MAKE_ELEMENT(appsrc, "appsrc");
+ MAKE_ELEMENT(pay, "rtpL16pay");
+ MAKE_ELEMENT(capsf, "capsfilter");
+ MAKE_ELEMENT(rtpbin, "rtpbin");
+ MAKE_ELEMENT(sink, "udpsink");
+
+ c->pipeline = gst_pipeline_new(NULL);
+
+ gst_bin_add_many(GST_BIN(c->pipeline), appsrc, pay, capsf, rtpbin, sink, NULL);
+
+ caps = caps_from_sample_spec(ss);
+ if (!caps) {
+ pa_log("Unsupported format to payload");
+ goto fail;
+ }
+
+ socket = g_socket_new_from_fd(fd, NULL);
+ if (!socket) {
+ pa_log("Failed to create socket");
+ goto fail;
+ }
+
+ addr = G_INET_SOCKET_ADDRESS(g_socket_get_remote_address(socket, NULL));
+ iaddr = g_inet_socket_address_get_address(addr);
+ addr_str = g_inet_address_to_string(iaddr);
+ port = g_inet_socket_address_get_port(addr);
+
+ g_object_set(appsrc, "caps", caps, "is-live", TRUE, "blocksize", mtu, "format", 3 /* time */, NULL);
+ g_object_set(pay, "mtu", mtu, NULL);
+ g_object_set(sink, "socket", socket, "host", addr_str, "port", port,
+ "enable-last-sample", FALSE, "sync", FALSE, "loop",
+ g_socket_get_multicast_loopback(socket), "ttl",
+ g_socket_get_ttl(socket), "ttl-mc",
+ g_socket_get_multicast_ttl(socket), "auto-multicast", FALSE,
+ NULL);
+
+ g_free(addr_str);
+ g_object_unref(addr);
+ g_object_unref(socket);
+
+ gst_caps_unref(caps);
+
+ /* Force the payload type that we want */
+ caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) payload, NULL);
+ g_object_set(capsf, "caps", caps, NULL);
+ gst_caps_unref(caps);
+
+ if (!gst_element_link(appsrc, pay) ||
+ !gst_element_link(pay, capsf) ||
+ !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") ||
+ !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) {
+
+ pa_log("Could not set up send pipeline");
+ goto fail;
+ }
+
+ if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+ pa_log("Could not start pipeline");
+ goto fail;
+ }
+
+ c->appsrc = gst_object_ref(appsrc);
+
+ return true;
+
+fail:
+ if (c->pipeline) {
+ gst_object_unref(c->pipeline);
+ } else {
+ /* These weren't yet added to pipeline, so we still have a ref */
+ if (appsrc)
+ gst_object_unref(appsrc);
+ if (pay)
+ gst_object_unref(pay);
+ if (capsf)
+ gst_object_unref(capsf);
+ if (rtpbin)
+ gst_object_unref(rtpbin);
+ if (sink)
+ gst_object_unref(sink);
+ }
+
+ return false;
+}
+
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+ pa_rtp_context *c = NULL;
+ GError *error = NULL;
+
+ pa_assert(fd >= 0);
+
+ pa_log_info("Initialising GStreamer RTP backend for send");
+
+ c = pa_xnew0(pa_rtp_context, 1);
+
+ c->ss = *ss;
+ c->mtu = mtu - RTP_HEADER_SIZE;
+ c->send_buf = pa_xmalloc(c->mtu);
+
+ if (!gst_init_check(NULL, NULL, &error)) {
+ pa_log_error("Could not initialise GStreamer: %s", error->message);
+ g_error_free(error);
+ goto fail;
+ }
+
+ if (!init_send_pipeline(c, fd, payload, mtu, ss))
+ goto fail;
+
+ return c;
+
+fail:
+ pa_rtp_context_free(c);
+ return NULL;
+}
+
+/* Called from I/O thread context */
+static bool process_bus_messages(pa_rtp_context *c) {
+ GstBus *bus;
+ GstMessage *message;
+ bool ret = true;
+
+ bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
+
+ while (ret && (message = gst_bus_pop(bus))) {
+ if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
+ GError *error = NULL;
+
+ ret = false;
+
+ gst_message_parse_error(message, &error, NULL);
+ pa_log("Got an error: %s", error->message);
+
+ g_error_free(error);
+ }
+
+ gst_message_unref(message);
+ }
+
+ gst_object_unref(bus);
+
+ return ret;
+}
+
+/* Called from I/O thread context */
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
+ GstBuffer *buf;
+ size_t n = 0;
+
+ pa_assert(c);
+ pa_assert(q);
+
+ if (!process_bus_messages(c))
+ return -1;
+
+ /*
+ * While we check here for atleast MTU worth of data being available in
+ * memblockq, we might not have exact equivalent to MTU. Hence, we walk
+ * over the memchunks in memblockq and accumulate MTU bytes next.
+ */
+ if (pa_memblockq_get_length(q) < c->mtu)
+ return 0;
+
+ for (;;) {
+ pa_memchunk chunk;
+ int r;
+
+ pa_memchunk_reset(&chunk);
+
+ if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
+ /*
+ * Accumulate MTU bytes of data before sending. If the current
+ * chunk length + accumulated bytes exceeds MTU, we drop bytes
+ * considered for transfer in this iteration from memblockq.
+ *
+ * The remaining bytes will be available in the next iteration,
+ * as these will be tracked and maintained by memblockq.
+ */
+ size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
+
+ pa_assert(chunk.memblock);
+
+ memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k);
+ pa_memblock_release(chunk.memblock);
+ pa_memblock_unref(chunk.memblock);
+
+ n += k;
+ pa_memblockq_drop(q, k);
+ }
+
+ if (r < 0 || n >= c->mtu) {
+ GstClock *clock;
+ GstClockTime timestamp, clock_time;
+ GstMapInfo info;
+
+ if (n > 0) {
+ clock = gst_element_get_clock(c->pipeline);
+ clock_time = gst_clock_get_time(clock);
+ gst_object_unref(clock);
+
+ timestamp = gst_element_get_base_time(c->pipeline);
+ if (timestamp > clock_time)
+ timestamp -= clock_time;
+ else
+ timestamp = 0;
+
+ buf = gst_buffer_new_allocate(NULL, n, NULL);
+ pa_assert(buf);
+
+ GST_BUFFER_PTS(buf) = timestamp;
+
+ pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE));
+
+ memcpy(info.data, c->send_buf, n);
+ gst_buffer_unmap(buf, &info);
+
+ if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
+ pa_log_error("Could not push buffer");
+ return -1;
+ }
+ }
+
+ if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
+ break;
+
+ n = 0;
+ }
+ }
+
+ return 0;
+}
+
+static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) {
+ if (ss->format != PA_SAMPLE_S16BE)
+ return NULL;
+
+ return gst_caps_new_simple("application/x-rtp",
+ "media", G_TYPE_STRING, "audio",
+ "encoding-name", G_TYPE_STRING, "L16",
+ "clock-rate", G_TYPE_INT, (int) ss->rate,
+ "payload", G_TYPE_INT, (int) pa_rtp_payload_from_sample_spec(ss),
+ "layout", G_TYPE_STRING, "interleaved",
+ NULL);
+}
+
+static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
+ pa_rtp_context *c = (pa_rtp_context *) userdata;
+ GstElement *depay;
+ GstPad *sinkpad;
+ GstPadLinkReturn ret;
+
+ depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
+ pa_assert(depay);
+
+ sinkpad = gst_element_get_static_pad(depay, "sink");
+
+ ret = gst_pad_link(pad, sinkpad);
+ if (ret != GST_PAD_LINK_OK) {
+ GstBus *bus;
+ GError *error;
+
+ bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
+ error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
+ gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));
+
+ /* Actually cause the I/O thread to wake up and process the error */
+ pa_fdsem_post(c->fdsem);
+
+ g_error_free(error);
+ gst_object_unref(bus);
+ }
+
+ gst_object_unref(sinkpad);
+ gst_object_unref(depay);
+}
+
+static GstPadProbeReturn udpsrc_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer userdata) {
+ struct timeval tv;
+ pa_usec_t timestamp;
+ pa_rtp_context *c = (pa_rtp_context *) userdata;
+
+ pa_assert(info->type & GST_PAD_PROBE_TYPE_BUFFER);
+
+ pa_gettimeofday(&tv);
+ timestamp = pa_timeval_load(&tv);
+
+ gst_buffer_add_reference_timestamp_meta(GST_BUFFER(info->data), c->meta_reference, timestamp * GST_USECOND,
+ GST_CLOCK_TIME_NONE);
+
+ return GST_PAD_PROBE_OK;
+}
+
+static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
+ GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
+ GstCaps *caps;
+ GstPad *pad;
+ GSocket *socket;
+ GError *error = NULL;
+
+ MAKE_ELEMENT(udpsrc, "udpsrc");
+ MAKE_ELEMENT(rtpbin, "rtpbin");
+ MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
+ MAKE_ELEMENT(appsink, "appsink");
+
+ c->pipeline = gst_pipeline_new(NULL);
+
+ gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
+
+ socket = g_socket_new_from_fd(fd, &error);
+ if (error) {
+ pa_log("Could not create socket: %s", error->message);
+ g_error_free(error);
+ goto fail;
+ }
+
+ caps = rtp_caps_from_sample_spec(ss);
+ if (!caps) {
+ pa_log("Unsupported format to payload");
+ goto fail;
+ }
+
+ g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
+ g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
+ g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
+
+ gst_caps_unref(caps);
+ g_object_unref(socket);
+
+ if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
+ !gst_element_link(depay, appsink)) {
+
+ pa_log("Could not set up receive pipeline");
+ goto fail;
+ }
+
+ g_signal_connect(G_OBJECT(rtpbin), "pad-added", G_CALLBACK(on_pad_added), c);
+
+ /* This logic should go into udpsrc, and we should be populating the
+ * receive timestamp using SCM_TIMESTAMP, but until we have that ... */
+ c->meta_reference = gst_caps_new_empty_simple("timestamp/x-pulseaudio-wallclock");
+
+ pad = gst_element_get_static_pad(udpsrc, "src");
+ gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, udpsrc_buffer_probe, c, NULL);
+ gst_object_unref(pad);
+
+ if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+ pa_log("Could not start pipeline");
+ goto fail;
+ }
+
+ c->appsink = gst_object_ref(appsink);
+
+ return true;
+
+fail:
+ if (c->pipeline) {
+ gst_object_unref(c->pipeline);
+ } else {
+ /* These weren't yet added to pipeline, so we still have a ref */
+ if (udpsrc)
+ gst_object_unref(udpsrc);
+ if (depay)
+ gst_object_unref(depay);
+ if (rtpbin)
+ gst_object_unref(rtpbin);
+ if (appsink)
+ gst_object_unref(appsink);
+ }
+
+ return false;
+}
+
+/* Called from the GStreamer streaming thread */
+static void appsink_eos(GstAppSink *appsink, gpointer userdata) {
+ pa_rtp_context *c = (pa_rtp_context *) userdata;
+
+ pa_fdsem_post(c->fdsem);
+}
+
+/* Called from the GStreamer streaming thread */
+static GstFlowReturn appsink_new_sample(GstAppSink *appsink, gpointer userdata) {
+ pa_rtp_context *c = (pa_rtp_context *) userdata;
+
+ pa_fdsem_post(c->fdsem);
+
+ return GST_FLOW_OK;
+}
+
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) {
+ pa_rtp_context *c = NULL;
+ GstAppSinkCallbacks callbacks = { 0, };
+ GError *error = NULL;
+
+ pa_assert(fd >= 0);
+
+ pa_log_info("Initialising GStreamer RTP backend for receive");
+
+ c = pa_xnew0(pa_rtp_context, 1);
+
+ c->fdsem = pa_fdsem_new();
+ c->ss = *ss;
+ c->send_buf = NULL;
+ c->first_buffer = true;
+
+ if (!gst_init_check(NULL, NULL, &error)) {
+ pa_log_error("Could not initialise GStreamer: %s", error->message);
+ g_error_free(error);
+ goto fail;
+ }
+
+ if (!init_receive_pipeline(c, fd, ss))
+ goto fail;
+
+ callbacks.eos = appsink_eos;
+ callbacks.new_sample = appsink_new_sample;
+ gst_app_sink_set_callbacks(GST_APP_SINK(c->appsink), &callbacks, c, NULL);
+
+ return c;
+
+fail:
+ pa_rtp_context_free(c);
+ return NULL;
+}
+
+/* Called from I/O thread context */
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
+ GstSample *sample = NULL;
+ GstBufferList *buf_list;
+ GstAdapter *adapter;
+ GstBuffer *buf;
+ GstMapInfo info;
+ GstClockTime timestamp = GST_CLOCK_TIME_NONE;
+ uint8_t *data;
+ uint64_t data_len = 0;
+
+ if (!process_bus_messages(c))
+ goto fail;
+
+ adapter = gst_adapter_new();
+ pa_assert(adapter);
+
+ while (true) {
+ sample = gst_app_sink_try_pull_sample(GST_APP_SINK(c->appsink), 0);
+ if (!sample)
+ break;
+
+ buf = gst_sample_get_buffer(sample);
+
+ /* Get the timestamp from the first buffer */
+ if (timestamp == GST_CLOCK_TIME_NONE) {
+ GstReferenceTimestampMeta *meta = gst_buffer_get_reference_timestamp_meta(buf, c->meta_reference);
+
+ /* Use the meta if we were able to insert it and it came through,
+ * else try to fallback to the DTS, which is only available in
+ * GStreamer 1.16 and earlier. */
+ if (meta)
+ timestamp = meta->timestamp;
+ else if (GST_BUFFER_DTS(buf) != GST_CLOCK_TIME_NONE)
+ timestamp = GST_BUFFER_DTS(buf);
+ else
+ timestamp = 0;
+ }
+
+ if (GST_BUFFER_IS_DISCONT(buf))
+ pa_log_info("Discontinuity detected, possibly lost some packets");
+
+ if (!gst_buffer_map(buf, &info, GST_MAP_READ)) {
+ pa_log_info("Failed to map buffer");
+ gst_sample_unref(sample);
+ goto fail;
+ }
+
+ data_len += info.size;
+ /* We need the buffer to be valid longer than the sample, which will
+ * be valid only for the duration of this loop.
+ *
+ * To do this, increase the ref count. Ownership is transferred to the
+ * adapter in gst_adapter_push.
+ */
+ gst_buffer_ref(buf);
+ gst_adapter_push(adapter, buf);
+ gst_buffer_unmap(buf, &info);
+
+ gst_sample_unref(sample);
+ }
+
+ buf_list = gst_adapter_take_buffer_list(adapter, data_len);
+ pa_assert(buf_list);
+
+ pa_assert(pa_mempool_block_size_max(pool) >= data_len);
+
+ chunk->memblock = pa_memblock_new(pool, data_len);
+ chunk->index = 0;
+ chunk->length = data_len;
+
+ data = (uint8_t *) pa_memblock_acquire_chunk(chunk);
+
+ for (int i = 0; i < gst_buffer_list_length(buf_list); i++) {
+ buf = gst_buffer_list_get(buf_list, i);
+
+ if (!gst_buffer_map(buf, &info, GST_MAP_READ)) {
+ gst_buffer_list_unref(buf_list);
+ goto fail;
+ }
+
+ memcpy(data, info.data, info.size);
+ data += info.size;
+ gst_buffer_unmap(buf, &info);
+ }
+
+ pa_memblock_release(chunk->memblock);
+
+ /* When buffer-mode = none, the buffer PTS is the RTP timestamp, converted
+ * to time units (instead of clock-rate units as is in the header) and
+ * wraparound-corrected. */
+ *rtp_tstamp = gst_util_uint64_scale_int(GST_BUFFER_PTS(gst_buffer_list_get(buf_list, 0)), c->ss.rate, GST_SECOND) & 0xFFFFFFFFU;
+ if (timestamp != GST_CLOCK_TIME_NONE)
+ pa_timeval_rtstore(tstamp, timestamp / PA_NSEC_PER_USEC, false);
+
+ if (c->first_buffer) {
+ c->first_buffer = false;
+ c->last_timestamp = *rtp_tstamp;
+ } else {
+ /* The RTP clock -> time domain -> RTP clock transformation above might
+ * add a ±1 rounding error, so let's get rid of that */
+ uint32_t expected = c->last_timestamp + (uint32_t) (data_len / pa_rtp_context_get_frame_size(c));
+ int delta = *rtp_tstamp - expected;
+
+ if (delta == 1 || delta == -1)
+ *rtp_tstamp -= delta;
+
+ c->last_timestamp = *rtp_tstamp;
+ }
+
+ gst_buffer_list_unref(buf_list);
+ gst_object_unref(adapter);
+
+ return 0;
+
+fail:
+ if (adapter)
+ gst_object_unref(adapter);
+
+ if (chunk->memblock)
+ pa_memblock_unref(chunk->memblock);
+
+ return -1;
+}
+
+void pa_rtp_context_free(pa_rtp_context *c) {
+ pa_assert(c);
+
+ if (c->meta_reference)
+ gst_caps_unref(c->meta_reference);
+
+ if (c->appsrc) {
+ gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc));
+ gst_object_unref(c->appsrc);
+ pa_xfree(c->send_buf);
+ }
+
+ if (c->appsink)
+ gst_object_unref(c->appsink);
+
+ if (c->pipeline) {
+ gst_element_set_state(c->pipeline, GST_STATE_NULL);
+ gst_object_unref(c->pipeline);
+ }
+
+ if (c->fdsem)
+ pa_fdsem_free(c->fdsem);
+
+ pa_xfree(c);
+}
+
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
+ return pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_LATE, c->fdsem);
+}
+
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
+ return pa_frame_size(&c->ss);
+}
diff --git a/src/modules/rtp/rtp-native.c b/src/modules/rtp/rtp-native.c
new file mode 100644
index 0000000..01e668c
--- /dev/null
+++ b/src/modules/rtp/rtp-native.c
@@ -0,0 +1,404 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/ioctl.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#ifdef HAVE_SYS_UIO_H
+#include <sys/uio.h>
+#endif
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/arpa-inet.h>
+#include <pulsecore/poll.h>
+
+#include "rtp.h"
+
+typedef struct pa_rtp_context {
+ int fd;
+ uint16_t sequence;
+ uint32_t timestamp;
+ uint32_t ssrc;
+ uint8_t payload;
+ size_t frame_size;
+ size_t mtu;
+
+ uint8_t *recv_buf;
+ size_t recv_buf_size;
+ pa_memchunk memchunk;
+} pa_rtp_context;
+
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+ pa_rtp_context *c;
+
+ pa_assert(fd >= 0);
+
+ pa_log_info("Initialising native RTP backend for send");
+
+ c = pa_xnew0(pa_rtp_context, 1);
+
+ c->fd = fd;
+ c->sequence = (uint16_t) (rand()*rand());
+ c->timestamp = 0;
+ c->ssrc = (uint32_t) (rand()*rand());
+ c->payload = (uint8_t) (payload & 127U);
+ c->frame_size = pa_frame_size(ss);
+ c->mtu = mtu;
+
+ c->recv_buf = NULL;
+ c->recv_buf_size = 0;
+ pa_memchunk_reset(&c->memchunk);
+
+ return c;
+}
+
+#define MAX_IOVECS 16
+
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
+ struct iovec iov[MAX_IOVECS];
+ pa_memblock* mb[MAX_IOVECS];
+ int iov_idx = 1;
+ size_t n = 0;
+
+ pa_assert(c);
+ pa_assert(q);
+
+ if (pa_memblockq_get_length(q) < c->mtu)
+ return 0;
+
+ for (;;) {
+ int r;
+ pa_memchunk chunk;
+
+ pa_memchunk_reset(&chunk);
+
+ if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
+
+ size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
+
+ pa_assert(chunk.memblock);
+
+ iov[iov_idx].iov_base = pa_memblock_acquire_chunk(&chunk);
+ iov[iov_idx].iov_len = k;
+ mb[iov_idx] = chunk.memblock;
+ iov_idx ++;
+
+ n += k;
+ pa_memblockq_drop(q, k);
+ }
+
+ pa_assert(n % c->frame_size == 0);
+
+ if (r < 0 || n >= c->mtu || iov_idx >= MAX_IOVECS) {
+ uint32_t header[3];
+ struct msghdr m;
+ ssize_t k;
+ int i;
+
+ if (n > 0) {
+ header[0] = htonl(((uint32_t) 2 << 30) | ((uint32_t) c->payload << 16) | ((uint32_t) c->sequence));
+ header[1] = htonl(c->timestamp);
+ header[2] = htonl(c->ssrc);
+
+ iov[0].iov_base = (void*)header;
+ iov[0].iov_len = sizeof(header);
+
+ m.msg_name = NULL;
+ m.msg_namelen = 0;
+ m.msg_iov = iov;
+ m.msg_iovlen = (size_t) iov_idx;
+ m.msg_control = NULL;
+ m.msg_controllen = 0;
+ m.msg_flags = 0;
+
+ k = sendmsg(c->fd, &m, MSG_DONTWAIT);
+
+ for (i = 1; i < iov_idx; i++) {
+ pa_memblock_release(mb[i]);
+ pa_memblock_unref(mb[i]);
+ }
+
+ c->sequence++;
+ } else
+ k = 0;
+
+ c->timestamp += (unsigned) (n/c->frame_size);
+
+ if (k < 0) {
+ if (errno != EAGAIN && errno != EINTR) /* If the queue is full, just ignore it */
+ pa_log("sendmsg() failed: %s", pa_cstrerror(errno));
+ return -1;
+ }
+
+ if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
+ break;
+
+ n = 0;
+ iov_idx = 1;
+ }
+ }
+
+ return 0;
+}
+
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) {
+ pa_rtp_context *c;
+
+ pa_log_info("Initialising native RTP backend for receive");
+
+ c = pa_xnew0(pa_rtp_context, 1);
+
+ c->fd = fd;
+ c->payload = payload;
+ c->frame_size = pa_frame_size(ss);
+
+ c->recv_buf_size = 2000;
+ c->recv_buf = pa_xmalloc(c->recv_buf_size);
+ pa_memchunk_reset(&c->memchunk);
+
+ return c;
+}
+
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
+ int size;
+ size_t audio_length;
+ size_t metadata_length;
+ struct msghdr m;
+ struct cmsghdr *cm;
+ struct iovec iov;
+ uint32_t header;
+ uint32_t ssrc;
+ uint8_t payload;
+ unsigned cc;
+ ssize_t r;
+ uint8_t aux[1024];
+ bool found_tstamp = false;
+
+ pa_assert(c);
+ pa_assert(chunk);
+
+ pa_memchunk_reset(chunk);
+
+ if (ioctl(c->fd, FIONREAD, &size) < 0) {
+ pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ if (size <= 0) {
+ /* size can be 0 due to any of the following reasons:
+ *
+ * 1. Somebody sent us a perfectly valid zero-length UDP packet.
+ * 2. Somebody sent us a UDP packet with a bad CRC.
+ *
+ * It is unknown whether size can actually be less than zero.
+ *
+ * In the first case, the packet has to be read out, otherwise the
+ * kernel will tell us again and again about it, thus preventing
+ * reception of any further packets. So let's just read it out
+ * now and discard it later, when comparing the number of bytes
+ * received (0) with the number of bytes wanted (1, see below).
+ *
+ * In the second case, recvmsg() will fail, thus allowing us to
+ * return the error.
+ *
+ * Just to avoid passing zero-sized memchunks and NULL pointers to
+ * recvmsg(), let's force allocation of at least one byte by setting
+ * size to 1.
+ */
+ size = 1;
+ }
+
+ if (c->recv_buf_size < (size_t) size) {
+ do
+ c->recv_buf_size *= 2;
+ while (c->recv_buf_size < (size_t) size);
+
+ c->recv_buf = pa_xrealloc(c->recv_buf, c->recv_buf_size);
+ }
+
+ pa_assert(c->recv_buf_size >= (size_t) size);
+
+ iov.iov_base = c->recv_buf;
+ iov.iov_len = (size_t) size;
+
+ m.msg_name = NULL;
+ m.msg_namelen = 0;
+ m.msg_iov = &iov;
+ m.msg_iovlen = 1;
+ m.msg_control = aux;
+ m.msg_controllen = sizeof(aux);
+ m.msg_flags = 0;
+
+ r = recvmsg(c->fd, &m, 0);
+
+ if (r != size) {
+ if (r < 0 && errno != EAGAIN && errno != EINTR)
+ pa_log_warn("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch");
+
+ goto fail;
+ }
+
+ if (size < 12) {
+ pa_log_warn("RTP packet too short.");
+ goto fail;
+ }
+
+ memcpy(&header, iov.iov_base, sizeof(uint32_t));
+ memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
+ memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
+
+ header = ntohl(header);
+ *rtp_tstamp = ntohl(*rtp_tstamp);
+ ssrc = ntohl(c->ssrc);
+
+ if ((header >> 30) != 2) {
+ pa_log_warn("Unsupported RTP version.");
+ goto fail;
+ }
+
+ if ((header >> 29) & 1) {
+ pa_log_warn("RTP padding not supported.");
+ goto fail;
+ }
+
+ if ((header >> 28) & 1) {
+ pa_log_warn("RTP header extensions not supported.");
+ goto fail;
+ }
+
+ if (ssrc != c->ssrc) {
+ pa_log_debug("Got unexpected SSRC");
+ goto fail;
+ }
+
+ cc = (header >> 24) & 0xF;
+ payload = (uint8_t) ((header >> 16) & 127U);
+ c->sequence = (uint16_t) (header & 0xFFFFU);
+
+ metadata_length = 12 + cc * 4;
+
+ if (payload != c->payload) {
+ pa_log_debug("Got unexpected payload: %u", payload);
+ goto fail;
+ }
+
+ if (metadata_length > (unsigned) size) {
+ pa_log_warn("RTP packet too short. (CSRC)");
+ goto fail;
+ }
+
+ audio_length = size - metadata_length;
+
+ if (audio_length % c->frame_size != 0) {
+ pa_log_warn("Bad RTP packet size.");
+ goto fail;
+ }
+
+ if (c->memchunk.length < (unsigned) audio_length) {
+ size_t l;
+
+ if (c->memchunk.memblock)
+ pa_memblock_unref(c->memchunk.memblock);
+
+ l = PA_MAX((size_t) audio_length, pa_mempool_block_size_max(pool));
+
+ c->memchunk.memblock = pa_memblock_new(pool, l);
+ c->memchunk.index = 0;
+ c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock);
+ }
+
+ memcpy(pa_memblock_acquire_chunk(&c->memchunk), c->recv_buf + metadata_length, audio_length);
+ pa_memblock_release(c->memchunk.memblock);
+
+ chunk->memblock = pa_memblock_ref(c->memchunk.memblock);
+ chunk->index = c->memchunk.index;
+ chunk->length = audio_length;
+
+ c->memchunk.index += audio_length;
+ c->memchunk.length -= audio_length;
+
+ if (c->memchunk.length <= 0) {
+ pa_memblock_unref(c->memchunk.memblock);
+ pa_memchunk_reset(&c->memchunk);
+ }
+
+ for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm))
+ if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_TIMESTAMP) {
+ memcpy(tstamp, CMSG_DATA(cm), sizeof(struct timeval));
+ found_tstamp = true;
+ break;
+ }
+
+ if (!found_tstamp) {
+ pa_log_warn("Couldn't find SCM_TIMESTAMP data in auxiliary recvmsg() data!");
+ pa_zero(*tstamp);
+ }
+
+ return 0;
+
+fail:
+ if (chunk->memblock)
+ pa_memblock_unref(chunk->memblock);
+
+ return -1;
+}
+
+void pa_rtp_context_free(pa_rtp_context *c) {
+ pa_assert(c);
+
+ pa_assert_se(pa_close(c->fd) == 0);
+
+ if (c->memchunk.memblock)
+ pa_memblock_unref(c->memchunk.memblock);
+
+ pa_xfree(c->recv_buf);
+ pa_xfree(c);
+}
+
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
+ return c->frame_size;
+}
+
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
+ pa_rtpoll_item *item;
+ struct pollfd *p;
+
+ item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1);
+
+ p = pa_rtpoll_item_get_pollfd(item, NULL);
+ p->fd = c->fd;
+ p->events = POLLIN;
+ p->revents = 0;
+
+ return item;
+}
diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h
new file mode 100644
index 0000000..372df75
--- /dev/null
+++ b/src/modules/rtp/rtp.h
@@ -0,0 +1,56 @@
+#ifndef foortphfoo
+#define foortphfoo
+
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <inttypes.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <pulsecore/memblockq.h>
+#include <pulsecore/memchunk.h>
+#include <pulsecore/rtpoll.h>
+
+typedef struct pa_rtp_context pa_rtp_context;
+
+int pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, size_t frame_size);
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss);
+
+/* If the memblockq doesn't have a silence memchunk set, then the caller must
+ * guarantee that the current read index doesn't point to a hole. */
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q);
+
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss);
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp);
+
+void pa_rtp_context_free(pa_rtp_context *c);
+
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c);
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll);
+
+pa_sample_spec* pa_rtp_sample_spec_fixup(pa_sample_spec *ss);
+int pa_rtp_sample_spec_valid(const pa_sample_spec *ss);
+
+uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss);
+pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss);
+
+const char* pa_rtp_format_to_string(pa_sample_format_t f);
+pa_sample_format_t pa_rtp_string_to_format(const char *s);
+
+#endif
diff --git a/src/modules/rtp/rtsp_client.c b/src/modules/rtp/rtsp_client.c
new file mode 100644
index 0000000..9fd386a
--- /dev/null
+++ b/src/modules/rtp/rtsp_client.c
@@ -0,0 +1,643 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2008 Colin Guthrie
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <pulse/rtclock.h>
+#include <pulse/timeval.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#include <pulse/xmalloc.h>
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/strbuf.h>
+#include <pulsecore/ioline.h>
+#include <pulsecore/arpa-inet.h>
+#include <pulsecore/random.h>
+#include <pulsecore/core-rtclock.h>
+
+#include "rtsp_client.h"
+
+#define RECONNECT_INTERVAL (5 * PA_USEC_PER_SEC)
+
+struct pa_rtsp_client {
+ pa_mainloop_api *mainloop;
+ char *hostname;
+ uint16_t port;
+
+ pa_socket_client *sc;
+ pa_ioline *ioline;
+
+ pa_rtsp_cb_t callback;
+
+ void *userdata;
+ const char *useragent;
+
+ pa_rtsp_state_t state;
+ pa_rtsp_status_t status;
+ uint8_t waiting;
+
+ pa_headerlist* headers;
+ char *last_header;
+ pa_strbuf *header_buffer;
+ pa_headerlist* response_headers;
+
+ char *localip;
+ char *url;
+ uint16_t rtp_port;
+ uint32_t cseq;
+ char *session;
+ char *transport;
+ pa_time_event *reconnect_event;
+ bool autoreconnect;
+};
+
+pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect) {
+ pa_rtsp_client *c;
+
+ pa_assert(mainloop);
+ pa_assert(hostname);
+ pa_assert(port > 0);
+
+ c = pa_xnew0(pa_rtsp_client, 1);
+ c->mainloop = mainloop;
+ c->hostname = pa_xstrdup(hostname);
+ c->port = port;
+ c->headers = pa_headerlist_new();
+
+ if (useragent)
+ c->useragent = useragent;
+ else
+ c->useragent = "PulseAudio RTSP Client";
+
+ c->autoreconnect = autoreconnect;
+ return c;
+}
+
+static void free_events(pa_rtsp_client *c) {
+ pa_assert(c);
+
+ if (c->reconnect_event) {
+ c->mainloop->time_free(c->reconnect_event);
+ c->reconnect_event = NULL;
+ }
+}
+
+void pa_rtsp_client_free(pa_rtsp_client *c) {
+ pa_assert(c);
+
+ free_events(c);
+ if (c->sc)
+ pa_socket_client_unref(c->sc);
+
+ pa_rtsp_disconnect(c);
+
+ pa_xfree(c->hostname);
+ pa_xfree(c->url);
+ pa_xfree(c->localip);
+ pa_xfree(c->session);
+ pa_xfree(c->transport);
+ pa_xfree(c->last_header);
+ if (c->header_buffer)
+ pa_strbuf_free(c->header_buffer);
+ if (c->response_headers)
+ pa_headerlist_free(c->response_headers);
+ pa_headerlist_free(c->headers);
+
+ pa_xfree(c);
+}
+
+static void headers_read(pa_rtsp_client *c) {
+ char delimiters[] = ";";
+ char* token;
+
+ pa_assert(c);
+ pa_assert(c->response_headers);
+ pa_assert(c->callback);
+
+ /* Deal with a SETUP response */
+ if (STATE_SETUP == c->state) {
+ const char* token_state = NULL;
+ const char* pc = NULL;
+ c->session = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Session"));
+ c->transport = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Transport"));
+
+ if (!c->session || !c->transport) {
+ pa_log("Invalid SETUP response.");
+ return;
+ }
+
+ /* Now parse out the server port component of the response. */
+ while ((token = pa_split(c->transport, delimiters, &token_state))) {
+ if ((pc = strchr(token, '='))) {
+ if (0 == strncmp(token, "server_port", 11)) {
+ uint32_t p;
+
+ if (pa_atou(pc + 1, &p) < 0 || p <= 0 || p > 0xffff) {
+ pa_log("Invalid SETUP response (invalid server_port).");
+ pa_xfree(token);
+ return;
+ }
+
+ c->rtp_port = p;
+ pa_xfree(token);
+ break;
+ }
+ }
+ pa_xfree(token);
+ }
+ if (0 == c->rtp_port) {
+ /* Error no server_port in response */
+ pa_log("Invalid SETUP response (no port number).");
+ return;
+ }
+ }
+
+ /* Call our callback */
+ c->callback(c, c->state, c->status, c->response_headers, c->userdata);
+}
+
+static void line_callback(pa_ioline *line, const char *s, void *userdata) {
+ pa_rtsp_client *c = userdata;
+ char *delimpos;
+ char *s2, *s2p;
+
+ pa_assert(line);
+ pa_assert(c);
+ pa_assert(c->callback);
+
+ if (!s) {
+ /* Keep the ioline/iochannel open as they will be freed automatically */
+ c->ioline = NULL;
+ c->callback(c, STATE_DISCONNECTED, STATUS_NO_RESPONSE, NULL, c->userdata);
+ return;
+ }
+
+ s2 = pa_xstrdup(s);
+ /* Trim trailing carriage returns */
+ s2p = s2 + strlen(s2) - 1;
+ while (s2p >= s2 && '\r' == *s2p) {
+ *s2p = '\0';
+ s2p -= 1;
+ }
+
+ if (c->waiting && pa_streq(s2, "RTSP/1.0 200 OK")) {
+ if (c->response_headers)
+ pa_headerlist_free(c->response_headers);
+ c->response_headers = pa_headerlist_new();
+
+ c->status = STATUS_OK;
+ c->waiting = 0;
+ goto exit;
+ } else if (c->waiting && pa_streq(s2, "RTSP/1.0 401 Unauthorized")) {
+ if (c->response_headers)
+ pa_headerlist_free(c->response_headers);
+ c->response_headers = pa_headerlist_new();
+
+ c->status = STATUS_UNAUTHORIZED;
+ c->waiting = 0;
+ goto exit;
+ } else if (c->waiting) {
+ pa_log_warn("Unexpected/Unhandled response: %s", s2);
+
+ if (pa_streq(s2, "RTSP/1.0 400 Bad Request"))
+ c->status = STATUS_BAD_REQUEST;
+ else if (pa_streq(s2, "RTSP/1.0 500 Internal Server Error"))
+ c->status = STATUS_INTERNAL_ERROR;
+ else
+ c->status = STATUS_NO_RESPONSE;
+ goto exit;
+ }
+
+ if (!strlen(s2)) {
+ /* End of headers */
+ /* We will have a header left from our looping iteration, so add it in :) */
+ if (c->last_header) {
+ char *tmp = pa_strbuf_to_string_free(c->header_buffer);
+ /* This is not a continuation header so let's dump it into our proplist */
+ pa_headerlist_puts(c->response_headers, c->last_header, tmp);
+ pa_xfree(tmp);
+ pa_xfree(c->last_header);
+ c->last_header = NULL;
+ c->header_buffer = NULL;
+ }
+
+ pa_log_debug("Full response received. Dispatching");
+ headers_read(c);
+ goto exit;
+ }
+
+ /* Read and parse a header (we know it's not empty) */
+ /* TODO: Move header reading into the headerlist. */
+
+ /* If the first character is a space, it's a continuation header */
+ if (c->last_header && ' ' == s2[0]) {
+ pa_assert(c->header_buffer);
+
+ /* Add this line to the buffer (sans the space) */
+ pa_strbuf_puts(c->header_buffer, &(s2[1]));
+ goto exit;
+ }
+
+ if (c->last_header) {
+ char *tmp = pa_strbuf_to_string_free(c->header_buffer);
+ /* This is not a continuation header so let's dump the full
+ header/value into our proplist */
+ pa_headerlist_puts(c->response_headers, c->last_header, tmp);
+ pa_xfree(tmp);
+ pa_xfree(c->last_header);
+ c->last_header = NULL;
+ c->header_buffer = NULL;
+ }
+
+ delimpos = strstr(s2, ":");
+ if (!delimpos) {
+ pa_log_warn("Unexpected response when expecting header: %s", s);
+ goto exit;
+ }
+
+ pa_assert(!c->header_buffer);
+ pa_assert(!c->last_header);
+
+ c->header_buffer = pa_strbuf_new();
+ if (strlen(delimpos) > 1) {
+ /* Cut our line off so we can copy the header name out */
+ *delimpos++ = '\0';
+
+ /* Trim the front of any spaces */
+ while (' ' == *delimpos)
+ ++delimpos;
+
+ pa_strbuf_puts(c->header_buffer, delimpos);
+ } else {
+ /* Cut our line off so we can copy the header name out */
+ *delimpos = '\0';
+ }
+
+ /* Save the header name */
+ c->last_header = pa_xstrdup(s2);
+
+ exit:
+ pa_xfree(s2);
+}
+
+static void reconnect_cb(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
+ if (userdata) {
+ pa_rtsp_client *c = userdata;
+ pa_rtsp_connect(c);
+ }
+}
+
+static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
+ pa_rtsp_client *c = userdata;
+ union {
+ struct sockaddr sa;
+ struct sockaddr_in in;
+ struct sockaddr_in6 in6;
+ } sa;
+ socklen_t sa_len = sizeof(sa);
+
+ pa_assert(sc);
+ pa_assert(c);
+ pa_assert(STATE_CONNECT == c->state);
+ pa_assert(c->sc == sc);
+ pa_socket_client_unref(c->sc);
+ c->sc = NULL;
+
+ if (!io) {
+ if (c->autoreconnect) {
+ struct timeval tv;
+
+ pa_log_warn("Connection to server %s:%d failed: %s - will try later", c->hostname, c->port, pa_cstrerror(errno));
+
+ if (!c->reconnect_event)
+ c->reconnect_event = c->mainloop->time_new(c->mainloop, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true), reconnect_cb, c);
+ else
+ c->mainloop->time_restart(c->reconnect_event, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true));
+ } else {
+ pa_log("Connection to server %s:%d failed: %s", c->hostname, c->port, pa_cstrerror(errno));
+ }
+ return;
+ }
+ pa_assert(!c->ioline);
+
+ c->ioline = pa_ioline_new(io);
+ pa_ioline_set_callback(c->ioline, line_callback, c);
+
+ /* Get the local IP address for use externally */
+ if (0 == getsockname(pa_iochannel_get_recv_fd(io), &sa.sa, &sa_len)) {
+ char buf[INET6_ADDRSTRLEN];
+ const char *res = NULL;
+
+ if (AF_INET == sa.sa.sa_family) {
+ if ((res = inet_ntop(sa.sa.sa_family, &sa.in.sin_addr, buf, sizeof(buf)))) {
+ c->localip = pa_xstrdup(res);
+ }
+ } else if (AF_INET6 == sa.sa.sa_family) {
+ if ((res = inet_ntop(AF_INET6, &sa.in6.sin6_addr, buf, sizeof(buf)))) {
+ c->localip = pa_xstrdup(res);
+ }
+ }
+ }
+ pa_log_debug("Established RTSP connection from local ip %s", c->localip);
+
+ if (c->callback)
+ c->callback(c, c->state, STATUS_OK, NULL, c->userdata);
+}
+
+int pa_rtsp_connect(pa_rtsp_client *c) {
+ pa_assert(c);
+ pa_assert(!c->sc);
+
+ pa_xfree(c->session);
+ c->session = NULL;
+
+ pa_log_debug("Attempting to connect to server '%s:%d'", c->hostname, c->port);
+ if (!(c->sc = pa_socket_client_new_string(c->mainloop, true, c->hostname, c->port))) {
+ pa_log("failed to connect to server '%s:%d'", c->hostname, c->port);
+ return -1;
+ }
+
+ pa_socket_client_set_callback(c->sc, on_connection, c);
+ c->waiting = 1;
+ c->state = STATE_CONNECT;
+ c->status = STATUS_NO_RESPONSE;
+ return 0;
+}
+
+void pa_rtsp_set_callback(pa_rtsp_client *c, pa_rtsp_cb_t callback, void *userdata) {
+ pa_assert(c);
+
+ c->callback = callback;
+ c->userdata = userdata;
+}
+
+void pa_rtsp_disconnect(pa_rtsp_client *c) {
+ pa_assert(c);
+
+ if (c->ioline) {
+ pa_ioline_close(c->ioline);
+ pa_ioline_unref(c->ioline);
+ }
+ c->ioline = NULL;
+}
+
+const char* pa_rtsp_localip(pa_rtsp_client *c) {
+ pa_assert(c);
+
+ return c->localip;
+}
+
+uint32_t pa_rtsp_serverport(pa_rtsp_client *c) {
+ pa_assert(c);
+
+ return c->rtp_port;
+}
+
+bool pa_rtsp_exec_ready(const pa_rtsp_client *c) {
+ pa_assert(c);
+
+ return c->url != NULL && c->ioline != NULL;
+}
+
+void pa_rtsp_set_url(pa_rtsp_client *c, const char *url) {
+ pa_assert(c);
+
+ c->url = pa_xstrdup(url);
+}
+
+bool pa_rtsp_has_header(pa_rtsp_client *c, const char *key) {
+ pa_assert(c);
+ pa_assert(key);
+
+ return pa_headerlist_contains(c->headers, key);
+}
+
+void pa_rtsp_add_header(pa_rtsp_client *c, const char *key, const char *value) {
+ pa_assert(c);
+ pa_assert(key);
+ pa_assert(value);
+
+ pa_headerlist_puts(c->headers, key, value);
+}
+
+const char* pa_rtsp_get_header(pa_rtsp_client *c, const char *key) {
+ pa_assert(c);
+ pa_assert(key);
+
+ return pa_headerlist_gets(c->headers, key);
+}
+
+void pa_rtsp_remove_header(pa_rtsp_client *c, const char *key) {
+ pa_assert(c);
+ pa_assert(key);
+
+ pa_headerlist_remove(c->headers, key);
+}
+
+static int rtsp_exec(pa_rtsp_client *c, const char *cmd,
+ const char *content_type, const char *content,
+ int expect_response,
+ pa_headerlist *headers) {
+ pa_strbuf *buf;
+ char *hdrs;
+
+ pa_assert(c);
+ pa_assert(c->url);
+ pa_assert(cmd);
+ pa_assert(c->ioline);
+
+ pa_log_debug("Sending command: %s", cmd);
+
+ buf = pa_strbuf_new();
+ pa_strbuf_printf(buf, "%s %s RTSP/1.0\r\nCSeq: %d\r\n", cmd, c->url, ++c->cseq);
+ if (c->session)
+ pa_strbuf_printf(buf, "Session: %s\r\n", c->session);
+
+ /* Add the headers */
+ if (headers) {
+ hdrs = pa_headerlist_to_string(headers);
+ pa_strbuf_puts(buf, hdrs);
+ pa_xfree(hdrs);
+ }
+
+ if (content_type && content) {
+ pa_strbuf_printf(buf, "Content-Type: %s\r\nContent-Length: %d\r\n",
+ content_type, (int)strlen(content));
+ }
+
+ pa_strbuf_printf(buf, "User-Agent: %s\r\n", c->useragent);
+
+ if (c->headers) {
+ hdrs = pa_headerlist_to_string(c->headers);
+ pa_strbuf_puts(buf, hdrs);
+ pa_xfree(hdrs);
+ }
+
+ pa_strbuf_puts(buf, "\r\n");
+
+ if (content_type && content) {
+ pa_strbuf_puts(buf, content);
+ }
+
+ /* Our packet is created... now we can send it :) */
+ hdrs = pa_strbuf_to_string_free(buf);
+ /*pa_log_debug("Submitting request:");
+ pa_log_debug(hdrs);*/
+ pa_ioline_puts(c->ioline, hdrs);
+ pa_xfree(hdrs);
+ /* The command is sent we can configure the rtsp client structure to handle a new answer */
+ c->waiting = 1;
+ return 0;
+}
+
+int pa_rtsp_options(pa_rtsp_client *c) {
+ char *url;
+ int rv;
+
+ pa_assert(c);
+
+ url = c->url;
+ c->state = STATE_OPTIONS;
+
+ c->url = (char *)"*";
+ rv = rtsp_exec(c, "OPTIONS", NULL, NULL, 0, NULL);
+
+ c->url = url;
+ return rv;
+}
+
+int pa_rtsp_announce(pa_rtsp_client *c, const char *sdp) {
+ int rv;
+
+ pa_assert(c);
+
+ if (!sdp)
+ return -1;
+
+ c->state = STATE_ANNOUNCE;
+ rv = rtsp_exec(c, "ANNOUNCE", "application/sdp", sdp, 1, NULL);
+
+ return rv;
+}
+
+int pa_rtsp_setup(pa_rtsp_client *c, const char *transport) {
+ pa_headerlist *headers;
+ int rv;
+
+ pa_assert(c);
+
+ headers = pa_headerlist_new();
+ if (!transport)
+ pa_headerlist_puts(headers, "Transport", "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record");
+ else
+ pa_headerlist_puts(headers, "Transport", transport);
+
+ c->state = STATE_SETUP;
+ rv = rtsp_exec(c, "SETUP", NULL, NULL, 1, headers);
+
+ pa_headerlist_free(headers);
+ return rv;
+}
+
+int pa_rtsp_record(pa_rtsp_client *c, uint16_t *seq, uint32_t *rtptime) {
+ pa_headerlist *headers;
+ char *info;
+ int rv;
+
+ pa_assert(c);
+
+ if (!c->session) {
+ /* No session in progress */
+ return -1;
+ }
+
+ pa_random(seq, sizeof(*seq));
+ pa_random(rtptime, sizeof(*rtptime));
+
+ headers = pa_headerlist_new();
+ pa_headerlist_puts(headers, "Range", "npt=0-");
+ info = pa_sprintf_malloc("seq=%u;rtptime=%u", *seq, *rtptime);
+ pa_headerlist_puts(headers, "RTP-Info", info);
+ pa_xfree(info);
+
+ c->state = STATE_RECORD;
+ rv = rtsp_exec(c, "RECORD", NULL, NULL, 1, headers);
+
+ pa_headerlist_free(headers);
+ return rv;
+}
+
+int pa_rtsp_setparameter(pa_rtsp_client *c, const char *param) {
+ int rv;
+
+ pa_assert(c);
+
+ if (!param)
+ return -1;
+
+ c->state = STATE_SET_PARAMETER;
+ rv = rtsp_exec(c, "SET_PARAMETER", "text/parameters", param, 1, NULL);
+
+ return rv;
+}
+
+int pa_rtsp_flush(pa_rtsp_client *c, uint16_t seq, uint32_t rtptime) {
+ pa_headerlist* headers;
+ char *info;
+ int rv;
+
+ pa_assert(c);
+
+ headers = pa_headerlist_new();
+ info = pa_sprintf_malloc("seq=%u;rtptime=%u", seq, rtptime);
+ pa_headerlist_puts(headers, "RTP-Info", info);
+ pa_xfree(info);
+
+ c->state = STATE_FLUSH;
+ rv = rtsp_exec(c, "FLUSH", NULL, NULL, 1, headers);
+
+ pa_headerlist_free(headers);
+ return rv;
+}
+
+int pa_rtsp_teardown(pa_rtsp_client *c) {
+ int rv;
+
+ pa_assert(c);
+
+ c->state = STATE_TEARDOWN;
+ rv = rtsp_exec(c, "TEARDOWN", NULL, NULL, 0, NULL);
+
+ return rv;
+}
diff --git a/src/modules/rtp/rtsp_client.h b/src/modules/rtp/rtsp_client.h
new file mode 100644
index 0000000..2593085
--- /dev/null
+++ b/src/modules/rtp/rtsp_client.h
@@ -0,0 +1,83 @@
+#ifndef foortspclienthfoo
+#define foortspclienthfoo
+
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2008 Colin Guthrie
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <inttypes.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netdb.h>
+
+#include <pulsecore/socket-client.h>
+#include <pulse/mainloop-api.h>
+
+#include "headerlist.h"
+
+typedef struct pa_rtsp_client pa_rtsp_client;
+
+typedef enum pa_rtsp_state {
+ STATE_CONNECT,
+ STATE_OPTIONS,
+ STATE_ANNOUNCE,
+ STATE_SETUP,
+ STATE_RECORD,
+ STATE_SET_PARAMETER,
+ STATE_FLUSH,
+ STATE_TEARDOWN,
+ STATE_DISCONNECTED
+} pa_rtsp_state_t;
+
+typedef enum pa_rtsp_status {
+ STATUS_OK = 200,
+ STATUS_BAD_REQUEST = 400,
+ STATUS_UNAUTHORIZED = 401,
+ STATUS_NO_RESPONSE = 444,
+ STATUS_INTERNAL_ERROR = 500
+} pa_rtsp_status_t;
+
+typedef void (*pa_rtsp_cb_t)(pa_rtsp_client *c, pa_rtsp_state_t state, pa_rtsp_status_t code, pa_headerlist *headers, void *userdata);
+
+pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect);
+void pa_rtsp_client_free(pa_rtsp_client *c);
+
+int pa_rtsp_connect(pa_rtsp_client *c);
+void pa_rtsp_set_callback(pa_rtsp_client *c, pa_rtsp_cb_t callback, void *userdata);
+void pa_rtsp_disconnect(pa_rtsp_client *c);
+
+const char* pa_rtsp_localip(pa_rtsp_client *c);
+uint32_t pa_rtsp_serverport(pa_rtsp_client *c);
+bool pa_rtsp_exec_ready(const pa_rtsp_client *c);
+
+void pa_rtsp_set_url(pa_rtsp_client *c, const char *url);
+
+bool pa_rtsp_has_header(pa_rtsp_client *c, const char *key);
+void pa_rtsp_add_header(pa_rtsp_client *c, const char *key, const char *value);
+const char* pa_rtsp_get_header(pa_rtsp_client *c, const char *key);
+void pa_rtsp_remove_header(pa_rtsp_client *c, const char *key);
+
+int pa_rtsp_options(pa_rtsp_client *c);
+int pa_rtsp_announce(pa_rtsp_client *c, const char *sdp);
+int pa_rtsp_setup(pa_rtsp_client *c, const char *transport);
+int pa_rtsp_record(pa_rtsp_client *c, uint16_t *seq, uint32_t *rtptime);
+int pa_rtsp_setparameter(pa_rtsp_client *c, const char *param);
+int pa_rtsp_flush(pa_rtsp_client *c, uint16_t seq, uint32_t rtptime);
+int pa_rtsp_teardown(pa_rtsp_client *c);
+
+#endif
diff --git a/src/modules/rtp/sap.c b/src/modules/rtp/sap.c
new file mode 100644
index 0000000..7fb1a38
--- /dev/null
+++ b/src/modules/rtp/sap.c
@@ -0,0 +1,235 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/ioctl.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#ifdef HAVE_SYS_UIO_H
+#include <sys/uio.h>
+#endif
+
+#include <pulse/xmalloc.h>
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/arpa-inet.h>
+
+#include "sap.h"
+#include "sdp.h"
+
+#define MIME_TYPE "application/sdp"
+
+pa_sap_context* pa_sap_context_init_send(pa_sap_context *c, int fd, char *sdp_data) {
+ pa_assert(c);
+ pa_assert(fd >= 0);
+ pa_assert(sdp_data);
+
+ c->fd = fd;
+ c->sdp_data = sdp_data;
+ c->msg_id_hash = (uint16_t) (rand()*rand());
+
+ return c;
+}
+
+void pa_sap_context_destroy(pa_sap_context *c) {
+ pa_assert(c);
+
+ pa_close(c->fd);
+ pa_xfree(c->sdp_data);
+}
+
+int pa_sap_send(pa_sap_context *c, bool goodbye) {
+ uint32_t header;
+ struct sockaddr_storage sa_buf;
+ struct sockaddr *sa = (struct sockaddr*) &sa_buf;
+ socklen_t salen = sizeof(sa_buf);
+ struct iovec iov[4];
+ struct msghdr m;
+ ssize_t k;
+
+ if (getsockname(c->fd, sa, &salen) < 0) {
+ pa_log("getsockname() failed: %s\n", pa_cstrerror(errno));
+ return -1;
+ }
+
+#ifdef HAVE_IPV6
+ pa_assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6);
+#else
+ pa_assert(sa->sa_family == AF_INET);
+#endif
+
+ header = htonl(((uint32_t) 1 << 29) |
+#ifdef HAVE_IPV6
+ (sa->sa_family == AF_INET6 ? (uint32_t) 1 << 28 : 0) |
+#endif
+ (goodbye ? (uint32_t) 1 << 26 : 0) |
+ (c->msg_id_hash));
+
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof(header);
+
+ if (sa->sa_family == AF_INET) {
+ iov[1].iov_base = (void*) &((struct sockaddr_in*) sa)->sin_addr;
+ iov[1].iov_len = 4U;
+#ifdef HAVE_IPV6
+ } else {
+ iov[1].iov_base = (void*) &((struct sockaddr_in6*) sa)->sin6_addr;
+ iov[1].iov_len = 16U;
+#endif
+ }
+
+ iov[2].iov_base = (char*) MIME_TYPE;
+ iov[2].iov_len = sizeof(MIME_TYPE);
+
+ iov[3].iov_base = c->sdp_data;
+ iov[3].iov_len = strlen(c->sdp_data);
+
+ m.msg_name = NULL;
+ m.msg_namelen = 0;
+ m.msg_iov = iov;
+ m.msg_iovlen = 4;
+ m.msg_control = NULL;
+ m.msg_controllen = 0;
+ m.msg_flags = 0;
+
+ if ((k = sendmsg(c->fd, &m, MSG_DONTWAIT)) < 0)
+ pa_log_warn("sendmsg() failed: %s\n", pa_cstrerror(errno));
+
+ return (int) k;
+}
+
+pa_sap_context* pa_sap_context_init_recv(pa_sap_context *c, int fd) {
+ pa_assert(c);
+ pa_assert(fd >= 0);
+
+ c->fd = fd;
+ c->sdp_data = NULL;
+ return c;
+}
+
+int pa_sap_recv(pa_sap_context *c, bool *goodbye) {
+ struct msghdr m;
+ struct iovec iov;
+ int size;
+ char *buf = NULL, *e;
+ uint32_t header;
+ unsigned six, ac, k;
+ ssize_t r;
+
+ pa_assert(c);
+ pa_assert(goodbye);
+
+ if (ioctl(c->fd, FIONREAD, &size) < 0) {
+ pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ buf = pa_xnew(char, (unsigned) size+1);
+ buf[size] = 0;
+
+ iov.iov_base = buf;
+ iov.iov_len = (size_t) size;
+
+ m.msg_name = NULL;
+ m.msg_namelen = 0;
+ m.msg_iov = &iov;
+ m.msg_iovlen = 1;
+ m.msg_control = NULL;
+ m.msg_controllen = 0;
+ m.msg_flags = 0;
+
+ if ((r = recvmsg(c->fd, &m, 0)) != size) {
+ pa_log_warn("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch");
+ goto fail;
+ }
+
+ if (size < 4) {
+ pa_log_warn("SAP packet too short.");
+ goto fail;
+ }
+
+ memcpy(&header, buf, sizeof(uint32_t));
+ header = ntohl(header);
+
+ if (header >> 29 != 1) {
+ pa_log_warn("Unsupported SAP version.");
+ goto fail;
+ }
+
+ if ((header >> 25) & 1) {
+ pa_log_warn("Encrypted SAP not supported.");
+ goto fail;
+ }
+
+ if ((header >> 24) & 1) {
+ pa_log_warn("Compressed SAP not supported.");
+ goto fail;
+ }
+
+ six = (header >> 28) & 1U;
+ ac = (header >> 16) & 0xFFU;
+
+ k = 4 + (six ? 16U : 4U) + ac*4U;
+ if ((unsigned) size < k) {
+ pa_log_warn("SAP packet too short (AD).");
+ goto fail;
+ }
+
+ e = buf + k;
+ size -= (int) k;
+
+ if ((unsigned) size >= sizeof(MIME_TYPE) && pa_streq(e, MIME_TYPE)) {
+ e += sizeof(MIME_TYPE);
+ size -= (int) sizeof(MIME_TYPE);
+ } else if ((unsigned) size < sizeof(PA_SDP_HEADER)-1 || strncmp(e, PA_SDP_HEADER, sizeof(PA_SDP_HEADER)-1)) {
+ pa_log_warn("Invalid SDP header.");
+ goto fail;
+ }
+
+ if (c->sdp_data)
+ pa_xfree(c->sdp_data);
+
+ c->sdp_data = pa_xstrndup(e, (unsigned) size);
+ pa_xfree(buf);
+
+ *goodbye = !!((header >> 26) & 1);
+
+ return 0;
+
+fail:
+ pa_xfree(buf);
+
+ return -1;
+}
diff --git a/src/modules/rtp/sap.h b/src/modules/rtp/sap.h
new file mode 100644
index 0000000..becb4ec
--- /dev/null
+++ b/src/modules/rtp/sap.h
@@ -0,0 +1,44 @@
+#ifndef foosaphfoo
+#define foosaphfoo
+
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <inttypes.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <pulsecore/memblockq.h>
+#include <pulsecore/memchunk.h>
+
+typedef struct pa_sap_context {
+ int fd;
+ char *sdp_data;
+
+ uint16_t msg_id_hash;
+} pa_sap_context;
+
+pa_sap_context* pa_sap_context_init_send(pa_sap_context *c, int fd, char *sdp_data);
+void pa_sap_context_destroy(pa_sap_context *c);
+
+int pa_sap_send(pa_sap_context *c, bool goodbye);
+
+pa_sap_context* pa_sap_context_init_recv(pa_sap_context *c, int fd);
+int pa_sap_recv(pa_sap_context *c, bool *goodbye);
+
+#endif
diff --git a/src/modules/rtp/sdp.c b/src/modules/rtp/sdp.c
new file mode 100644
index 0000000..6a2e0c9
--- /dev/null
+++ b/src/modules/rtp/sdp.c
@@ -0,0 +1,256 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <time.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <string.h>
+
+#include <pulse/xmalloc.h>
+#include <pulse/util.h>
+
+#include <pulsecore/core-util.h>
+#include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/arpa-inet.h>
+
+#include "sdp.h"
+#include "rtp.h"
+
+char *pa_sdp_build(int af, const void *src, const void *dst, const char *name, uint16_t port, uint8_t payload, const pa_sample_spec *ss) {
+ uint32_t ntp;
+ char buf_src[64], buf_dst[64], un[64];
+ const char *u, *f;
+
+ pa_assert(src);
+ pa_assert(dst);
+
+#ifdef HAVE_IPV6
+ pa_assert(af == AF_INET || af == AF_INET6);
+#else
+ pa_assert(af == AF_INET);
+#endif
+
+ pa_assert_se(f = pa_rtp_format_to_string(ss->format));
+
+ if (!(u = pa_get_user_name(un, sizeof(un))))
+ u = "-";
+
+ ntp = (uint32_t) time(NULL) + 2208988800U;
+
+ pa_assert_se(inet_ntop(af, src, buf_src, sizeof(buf_src)));
+ pa_assert_se(inet_ntop(af, dst, buf_dst, sizeof(buf_dst)));
+
+ return pa_sprintf_malloc(
+ PA_SDP_HEADER
+ "o=%s %lu 0 IN %s %s\n"
+ "s=%s\n"
+ "c=IN %s %s\n"
+ "t=%lu 0\n"
+ "a=recvonly\n"
+ "m=audio %u RTP/AVP %i\n"
+ "a=rtpmap:%i %s/%u/%u\n"
+ "a=type:broadcast\n",
+ u, (unsigned long) ntp, af == AF_INET ? "IP4" : "IP6", buf_src,
+ name,
+ af == AF_INET ? "IP4" : "IP6", buf_dst,
+ (unsigned long) ntp,
+ port, payload,
+ payload, f, ss->rate, ss->channels);
+}
+
+static pa_sample_spec *parse_sdp_sample_spec(pa_sample_spec *ss, char *c) {
+ unsigned rate, channels;
+ pa_assert(ss);
+ pa_assert(c);
+
+ if (pa_startswith(c, "L16/")) {
+ ss->format = PA_SAMPLE_S16BE;
+ c += 4;
+ } else
+ return NULL;
+
+ if (sscanf(c, "%u/%u", &rate, &channels) == 2) {
+ ss->rate = (uint32_t) rate;
+ ss->channels = (uint8_t) channels;
+ } else if (sscanf(c, "%u", &rate) == 2) {
+ ss->rate = (uint32_t) rate;
+ ss->channels = 1;
+ } else
+ return NULL;
+
+ if (!pa_sample_spec_valid(ss))
+ return NULL;
+
+ return ss;
+}
+
+pa_sdp_info *pa_sdp_parse(const char *t, pa_sdp_info *i, int is_goodbye) {
+ uint16_t port = 0;
+ bool ss_valid = false;
+
+ pa_assert(t);
+ pa_assert(i);
+
+ i->origin = i->session_name = NULL;
+ i->salen = 0;
+ i->payload = 255;
+
+ if (!pa_startswith(t, PA_SDP_HEADER)) {
+ pa_log("Failed to parse SDP data: invalid header.");
+ goto fail;
+ }
+
+ t += sizeof(PA_SDP_HEADER)-1;
+
+ while (*t) {
+ size_t l;
+
+ l = strcspn(t, "\n");
+
+ if (l <= 2) {
+ pa_log("Failed to parse SDP data: line too short: >%s<.", t);
+ goto fail;
+ }
+
+ if (pa_startswith(t, "o="))
+ i->origin = pa_xstrndup(t+2, l-2);
+ else if (pa_startswith(t, "s="))
+ i->session_name = pa_xstrndup(t+2, l-2);
+ else if (pa_startswith(t, "c=IN IP4 ")) {
+ char a[64];
+ size_t k;
+
+ k = l-8 > sizeof(a) ? sizeof(a) : l-8;
+
+ pa_strlcpy(a, t+9, k);
+ a[strcspn(a, "/")] = 0;
+
+ if (inet_pton(AF_INET, a, &((struct sockaddr_in*) &i->sa)->sin_addr) <= 0) {
+ pa_log("Failed to parse SDP data: bad address: >%s<.", a);
+ goto fail;
+ }
+
+ ((struct sockaddr_in*) &i->sa)->sin_family = AF_INET;
+ ((struct sockaddr_in*) &i->sa)->sin_port = 0;
+ i->salen = sizeof(struct sockaddr_in);
+#ifdef HAVE_IPV6
+ } else if (pa_startswith(t, "c=IN IP6 ")) {
+ char a[64];
+ size_t k;
+
+ k = l-8 > sizeof(a) ? sizeof(a) : l-8;
+
+ pa_strlcpy(a, t+9, k);
+ a[strcspn(a, "/")] = 0;
+
+ if (inet_pton(AF_INET6, a, &((struct sockaddr_in6*) &i->sa)->sin6_addr) <= 0) {
+ pa_log("Failed to parse SDP data: bad address: >%s<.", a);
+ goto fail;
+ }
+
+ ((struct sockaddr_in6*) &i->sa)->sin6_family = AF_INET6;
+ ((struct sockaddr_in6*) &i->sa)->sin6_port = 0;
+ i->salen = sizeof(struct sockaddr_in6);
+#endif
+ } else if (pa_startswith(t, "m=audio ")) {
+
+ if (i->payload > 127) {
+ int _port, _payload;
+
+ if (sscanf(t+8, "%i RTP/AVP %i", &_port, &_payload) == 2) {
+
+ if (_port <= 0 || _port > 0xFFFF) {
+ pa_log("Failed to parse SDP data: invalid port %i.", _port);
+ goto fail;
+ }
+
+ if (_payload < 0 || _payload > 127) {
+ pa_log("Failed to parse SDP data: invalid payload %i.", _payload);
+ goto fail;
+ }
+
+ port = (uint16_t) _port;
+ i->payload = (uint8_t) _payload;
+
+ if (pa_rtp_sample_spec_from_payload(i->payload, &i->sample_spec))
+ ss_valid = true;
+ }
+ }
+ } else if (pa_startswith(t, "a=rtpmap:")) {
+
+ if (i->payload <= 127) {
+ char c[64];
+ int _payload;
+ int len;
+
+ if (sscanf(t + 9, "%i %n", &_payload, &len) == 1) {
+ if (_payload < 0 || _payload > 127) {
+ pa_log("Failed to parse SDP data: invalid payload %i.", _payload);
+ goto fail;
+ }
+ if (_payload == i->payload) {
+ strncpy(c, t + 9 + len, 63);
+ c[63] = 0;
+ c[strcspn(c, "\n")] = 0;
+
+ if (parse_sdp_sample_spec(&i->sample_spec, c))
+ ss_valid = true;
+ }
+ }
+ }
+ }
+
+ t += l;
+
+ if (*t == '\n')
+ t++;
+ }
+
+ if (!i->origin || (!is_goodbye && (!i->salen || i->payload > 127 || !ss_valid || port == 0))) {
+ pa_log("Failed to parse SDP data: missing data.");
+ goto fail;
+ }
+
+ if (((struct sockaddr*) &i->sa)->sa_family == AF_INET)
+ ((struct sockaddr_in*) &i->sa)->sin_port = htons(port);
+ else
+ ((struct sockaddr_in6*) &i->sa)->sin6_port = htons(port);
+
+ return i;
+
+fail:
+ pa_xfree(i->origin);
+ pa_xfree(i->session_name);
+
+ return NULL;
+}
+
+void pa_sdp_info_destroy(pa_sdp_info *i) {
+ pa_assert(i);
+
+ pa_xfree(i->origin);
+ pa_xfree(i->session_name);
+}
diff --git a/src/modules/rtp/sdp.h b/src/modules/rtp/sdp.h
new file mode 100644
index 0000000..5e9b8fe
--- /dev/null
+++ b/src/modules/rtp/sdp.h
@@ -0,0 +1,48 @@
+#ifndef foosdphfoo
+#define foosdphfoo
+
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2006 Lennart Poettering
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <inttypes.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <pulse/sample.h>
+
+#define PA_SDP_HEADER "v=0\n"
+
+typedef struct pa_sdp_info {
+ char *origin;
+ char *session_name;
+
+ struct sockaddr_storage sa;
+ socklen_t salen;
+
+ pa_sample_spec sample_spec;
+ uint8_t payload;
+} pa_sdp_info;
+
+char *pa_sdp_build(int af, const void *src, const void *dst, const char *name, uint16_t port, uint8_t payload, const pa_sample_spec *ss);
+
+pa_sdp_info *pa_sdp_parse(const char *t, pa_sdp_info *info, int is_goodbye);
+
+void pa_sdp_info_destroy(pa_sdp_info *i);
+
+#endif