diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 16:03:18 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 16:03:18 +0000 |
commit | 2dd5bc6a074165ddfbd57c4bd52c2d2dac8f47a1 (patch) | |
tree | 465b29cb405d3af0b0ad50c78e1dccc636594fec /src/modules/rtp | |
parent | Initial commit. (diff) | |
download | pulseaudio-2dd5bc6a074165ddfbd57c4bd52c2d2dac8f47a1.tar.xz pulseaudio-2dd5bc6a074165ddfbd57c4bd52c2d2dac8f47a1.zip |
Adding upstream version 14.2.upstream/14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/modules/rtp/headerlist.c | 173 | ||||
-rw-r--r-- | src/modules/rtp/headerlist.h | 44 | ||||
-rw-r--r-- | src/modules/rtp/meson.build | 35 | ||||
-rw-r--r-- | src/modules/rtp/module-rtp-recv.c | 774 | ||||
-rw-r--r-- | src/modules/rtp/module-rtp-send.c | 547 | ||||
-rw-r--r-- | src/modules/rtp/rtp-common.c | 97 | ||||
-rw-r--r-- | src/modules/rtp/rtp-gstreamer.c | 665 | ||||
-rw-r--r-- | src/modules/rtp/rtp-native.c | 404 | ||||
-rw-r--r-- | src/modules/rtp/rtp.h | 56 | ||||
-rw-r--r-- | src/modules/rtp/rtsp_client.c | 643 | ||||
-rw-r--r-- | src/modules/rtp/rtsp_client.h | 83 | ||||
-rw-r--r-- | src/modules/rtp/sap.c | 235 | ||||
-rw-r--r-- | src/modules/rtp/sap.h | 44 | ||||
-rw-r--r-- | src/modules/rtp/sdp.c | 256 | ||||
-rw-r--r-- | src/modules/rtp/sdp.h | 48 |
15 files changed, 4104 insertions, 0 deletions
diff --git a/src/modules/rtp/headerlist.c b/src/modules/rtp/headerlist.c new file mode 100644 index 0000000..1fb0b41 --- /dev/null +++ b/src/modules/rtp/headerlist.c @@ -0,0 +1,173 @@ +/*** + This file is part of PulseAudio. + + Copyright 2008 Colin Guthrie + Copyright 2007 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation; either version 2.1 of the + License, or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <string.h> + +#include <pulse/xmalloc.h> + +#include <pulsecore/hashmap.h> +#include <pulsecore/strbuf.h> +#include <pulsecore/core-util.h> + +#include "headerlist.h" + +struct header { + char *key; + void *value; + size_t nbytes; +}; + +#define MAKE_HASHMAP(p) ((pa_hashmap*) (p)) +#define MAKE_HEADERLIST(p) ((pa_headerlist*) (p)) + +static void header_free(struct header *hdr) { + pa_assert(hdr); + + pa_xfree(hdr->key); + pa_xfree(hdr->value); + pa_xfree(hdr); +} + +pa_headerlist* pa_headerlist_new(void) { + return MAKE_HEADERLIST(pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) header_free)); +} + +void pa_headerlist_free(pa_headerlist* p) { + pa_hashmap_free(MAKE_HASHMAP(p)); +} + +int pa_headerlist_puts(pa_headerlist *p, const char *key, const char *value) { + struct header *hdr; + bool add = false; + + pa_assert(p); + pa_assert(key); + + if (!(hdr = pa_hashmap_get(MAKE_HASHMAP(p), key))) { + hdr = pa_xnew(struct header, 1); + hdr->key = pa_xstrdup(key); + add = true; + } else + pa_xfree(hdr->value); + + hdr->value = pa_xstrdup(value); + hdr->nbytes = strlen(value)+1; + + if (add) + pa_hashmap_put(MAKE_HASHMAP(p), hdr->key, hdr); + + return 0; +} + +int pa_headerlist_putsappend(pa_headerlist *p, const char *key, const char *value) { + struct header *hdr; + bool add = false; + + pa_assert(p); + pa_assert(key); + + if (!(hdr = pa_hashmap_get(MAKE_HASHMAP(p), key))) { + hdr = pa_xnew(struct header, 1); + hdr->key = pa_xstrdup(key); + hdr->value = pa_xstrdup(value); + add = true; + } else { + void *newval = pa_sprintf_malloc("%s%s", (char*)hdr->value, value); + pa_xfree(hdr->value); + hdr->value = newval; + } + hdr->nbytes = strlen(hdr->value)+1; + + if (add) + pa_hashmap_put(MAKE_HASHMAP(p), hdr->key, hdr); + + return 0; +} + +const char *pa_headerlist_gets(pa_headerlist *p, const char *key) { + struct header *hdr; + + pa_assert(p); + pa_assert(key); + + if (!(hdr = pa_hashmap_get(MAKE_HASHMAP(p), key))) + return NULL; + + if (hdr->nbytes <= 0) + return NULL; + + if (((char*) hdr->value)[hdr->nbytes-1] != 0) + return NULL; + + if (strlen((char*) hdr->value) != hdr->nbytes-1) + return NULL; + + return (char*) hdr->value; +} + +int pa_headerlist_remove(pa_headerlist *p, const char *key) { + pa_assert(p); + pa_assert(key); + + return pa_hashmap_remove_and_free(MAKE_HASHMAP(p), key); +} + +const char *pa_headerlist_iterate(pa_headerlist *p, void **state) { + struct header *hdr; + + if (!(hdr = pa_hashmap_iterate(MAKE_HASHMAP(p), state, NULL))) + return NULL; + + return hdr->key; +} + +char *pa_headerlist_to_string(pa_headerlist *p) { + const char *key; + void *state = NULL; + pa_strbuf *buf; + + pa_assert(p); + + buf = pa_strbuf_new(); + + while ((key = pa_headerlist_iterate(p, &state))) { + + const char *v; + + if ((v = pa_headerlist_gets(p, key))) + pa_strbuf_printf(buf, "%s: %s\r\n", key, v); + } + + return pa_strbuf_to_string_free(buf); +} + +int pa_headerlist_contains(pa_headerlist *p, const char *key) { + pa_assert(p); + pa_assert(key); + + if (!(pa_hashmap_get(MAKE_HASHMAP(p), key))) + return 0; + + return 1; +} diff --git a/src/modules/rtp/headerlist.h b/src/modules/rtp/headerlist.h new file mode 100644 index 0000000..f38fb78 --- /dev/null +++ b/src/modules/rtp/headerlist.h @@ -0,0 +1,44 @@ +#ifndef foopulseheaderlisthfoo +#define foopulseheaderlisthfoo + +/*** + This file is part of PulseAudio. + + Copyright 2008 Colin Guthrie + Copyright 2007 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation; either version 2.1 of the + License, or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#include <pulsecore/macro.h> + +typedef struct pa_headerlist pa_headerlist; + +pa_headerlist* pa_headerlist_new(void); +void pa_headerlist_free(pa_headerlist* p); + +int pa_headerlist_puts(pa_headerlist *p, const char *key, const char *value); +int pa_headerlist_putsappend(pa_headerlist *p, const char *key, const char *value); + +const char *pa_headerlist_gets(pa_headerlist *p, const char *key); + +int pa_headerlist_remove(pa_headerlist *p, const char *key); + +const char *pa_headerlist_iterate(pa_headerlist *p, void **state); + +char *pa_headerlist_to_string(pa_headerlist *p); + +int pa_headerlist_contains(pa_headerlist *p, const char *key); + +#endif diff --git a/src/modules/rtp/meson.build b/src/modules/rtp/meson.build new file mode 100644 index 0000000..119cf08 --- /dev/null +++ b/src/modules/rtp/meson.build @@ -0,0 +1,35 @@ +librtp_sources = [ + 'rtp-common.c', + 'sdp.c', + 'sap.c', + 'rtsp_client.c', + 'headerlist.c', +] + +librtp_headers = [ + 'rtp.h', + 'sdp.h', + 'sap.h', + 'rtsp_client.h', + 'headerlist.h', +] + +if have_gstreamer + librtp_sources += 'rtp-gstreamer.c' +else + librtp_sources += 'rtp-native.c' +endif + +librtp = shared_library('rtp', + librtp_sources, + librtp_headers, + c_args : [pa_c_args, server_c_args], + link_args : [nodelete_link_args], + include_directories : [configinc, topinc], + dependencies : [libpulse_dep, libpulsecommon_dep, libpulsecore_dep, libatomic_ops_dep, gst_dep, gstapp_dep, gstrtp_dep, gio_dep], + install : true, + install_rpath : privlibdir, + install_dir : modlibexecdir, +) + +librtp_dep = declare_dependency(link_with: librtp) diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c new file mode 100644 index 0000000..a9b42bb --- /dev/null +++ b/src/modules/rtp/module-rtp-recv.c @@ -0,0 +1,774 @@ + +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdio.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> +#include <math.h> + +#include <pulse/rtclock.h> +#include <pulse/timeval.h> +#include <pulse/xmalloc.h> + +#include <pulsecore/core-error.h> +#include <pulsecore/module.h> +#include <pulsecore/llist.h> +#include <pulsecore/sink.h> +#include <pulsecore/sink-input.h> +#include <pulsecore/memblockq.h> +#include <pulsecore/log.h> +#include <pulsecore/core-rtclock.h> +#include <pulsecore/core-util.h> +#include <pulsecore/modargs.h> +#include <pulsecore/namereg.h> +#include <pulsecore/sample-util.h> +#include <pulsecore/macro.h> +#include <pulsecore/socket-util.h> +#include <pulsecore/atomic.h> +#include <pulsecore/once.h> +#include <pulsecore/poll.h> +#include <pulsecore/arpa-inet.h> + +#include "rtp.h" +#include "sdp.h" +#include "sap.h" + +PA_MODULE_AUTHOR("Lennart Poettering"); +PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP"); +PA_MODULE_VERSION(PACKAGE_VERSION); +PA_MODULE_LOAD_ONCE(false); +PA_MODULE_USAGE( + "sink=<name of the sink> " + "sap_address=<multicast address to listen on> " + "latency_msec=<latency in ms> " +); + +#define SAP_PORT 9875 +#define DEFAULT_SAP_ADDRESS "224.0.0.56" +#define DEFAULT_LATENCY_MSEC 500 +#define MEMBLOCKQ_MAXLENGTH (1024*1024*40) +#define MAX_SESSIONS 16 +#define DEATH_TIMEOUT 20 +#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC) + +static const char* const valid_modargs[] = { + "sink", + "sap_address", + "latency_msec", + NULL +}; + +struct session { + struct userdata *userdata; + PA_LLIST_FIELDS(struct session); + + pa_sink_input *sink_input; + pa_memblockq *memblockq; + + bool first_packet; + uint32_t offset; + + struct pa_sdp_info sdp_info; + + pa_rtp_context *rtp_context; + + pa_rtpoll_item *rtpoll_item; + + pa_atomic_t timestamp; + + pa_usec_t intended_latency; + pa_usec_t sink_latency; + + unsigned int base_rate; + pa_usec_t last_rate_update; + pa_usec_t last_latency; + double estimated_rate; + double avg_estimated_rate; +}; + +struct userdata { + pa_module *module; + pa_core *core; + + pa_sap_context sap_context; + pa_io_event* sap_event; + + pa_time_event *check_death_event; + + char *sink_name; + + PA_LLIST_HEAD(struct session, sessions); + pa_hashmap *by_origin; + int n_sessions; + + pa_usec_t latency; +}; + +static void session_free(struct session *s); + +/* Called from I/O thread context */ +static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + struct session *s = PA_SINK_INPUT(o)->userdata; + + switch (code) { + case PA_SINK_INPUT_MESSAGE_GET_LATENCY: + *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec); + + /* Fall through, the default handler will add in the extra + * latency added by the resampler */ + break; + } + + return pa_sink_input_process_msg(o, code, data, offset, chunk); +} + +/* Called from I/O thread context */ +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { + struct session *s; + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + if (pa_memblockq_peek(s->memblockq, chunk) < 0) + return -1; + + pa_memblockq_drop(s->memblockq, chunk->length); + + return 0; +} + +/* Called from I/O thread context */ +static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { + struct session *s; + + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + pa_memblockq_rewind(s->memblockq, nbytes); +} + +/* Called from I/O thread context */ +static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { + struct session *s; + + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + pa_memblockq_set_maxrewind(s->memblockq, nbytes); +} + +/* Called from main context */ +static void sink_input_kill(pa_sink_input* i) { + struct session *s; + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin); +} + +/* Called from IO context */ +static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) { + struct session *s; + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + if (b) + pa_memblockq_flush_read(s->memblockq); + else + s->first_packet = false; +} + +/* Called from I/O thread context */ +static int rtpoll_work_cb(pa_rtpoll_item *i) { + pa_memchunk chunk; + uint32_t timestamp; + int64_t k, j, delta; + struct timeval now = { 0, 0 }; + struct session *s; + struct pollfd *p; + + pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i)); + + p = pa_rtpoll_item_get_pollfd(i, NULL); + + if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) { + pa_log("poll() signalled bad revents."); + return -1; + } + + if ((p->revents & POLLIN) == 0) + return 0; + + p->revents = 0; + + if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, ×tamp, &now) < 0) + return 0; + + if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) { + pa_memblock_unref(chunk.memblock); + return 0; + } + + if (!s->first_packet) { + s->first_packet = true; + s->offset = timestamp; + } + + /* Check whether there was a timestamp overflow */ + k = (int64_t) timestamp - (int64_t) s->offset; + j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp; + + if ((k < 0 ? -k : k) < (j < 0 ? -j : j)) + delta = k; + else + delta = j; + + pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE, + true); + + if (now.tv_sec == 0) { + PA_ONCE_BEGIN { + pa_log_warn("Using artificial time instead of timestamp"); + } PA_ONCE_END; + pa_rtclock_get(&now); + } else + pa_rtclock_from_wallclock(&now); + + if (pa_memblockq_push(s->memblockq, &chunk) < 0) { + pa_log_warn("Queue overrun"); + pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true); + } + +/* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */ + + pa_memblock_unref(chunk.memblock); + + /* The next timestamp we expect */ + s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context)); + + pa_atomic_store(&s->timestamp, (int) now.tv_sec); + + if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) { + pa_usec_t wi, ri, render_delay, sink_delay = 0, latency; + uint32_t current_rate = s->sink_input->sample_spec.rate; + uint32_t new_rate; + double estimated_rate, alpha = 0.02; + + pa_log_debug("Updating sample rate"); + + wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec); + ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec); + + pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri); + + sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false); + render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec); + + if (ri > render_delay+sink_delay) + ri -= render_delay+sink_delay; + else + ri = 0; + + if (wi < ri) + latency = 0; + else + latency = wi - ri; + + pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC); + + /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in + * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that + * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate + * T + * R̂ = ─────────────── Rⁿ . (1) + * T - (Lⁿ - Lⁿ⁻ⁱ) + * + * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂ + * is correct). But there is also the requirement to keep the buffer at a predefined target + * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R + * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time + * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements + * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1 + * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ . + * ʲ⁼ⁱ R̂ a a + * Solving for Rⁿ⁺ⁱ gives + * T - ²∕ₐ₊₁(L̂ - Lⁿ) + * Rⁿ⁺ⁱ = ───────────────── R̂ . (2) + * T + * In the code below a = 7 is used. + * + * Equation (1) is not directly used in (2), but instead an exponentially weighted average + * of the estimated rate R̂ is used. This average R̅ is defined as + * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ . + * Because it is difficult to find a fixed value for the coefficient α such that the + * averaging is without significant lag but oscillations are filtered out, a heuristic is + * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a + * sudden spike in the estimated rate α→0, such that the deviation is given little weight. + */ + estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency); + if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) { + double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate); + alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8); + } + s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate; + s->estimated_rate = estimated_rate; + pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha); + new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate); + s->last_latency = latency; + + if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) { + pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate); + new_rate = s->base_rate; + } else { + if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20) + new_rate = s->base_rate; + /* Do the adjustment in small steps; 2‰ can be considered inaudible */ + if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) { + pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate); + new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002)); + } + } + s->sink_input->sample_spec.rate = new_rate; + + pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec)); + + pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate); + + pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate); + + s->last_rate_update = pa_timeval_load(&now); + } + + if (pa_memblockq_is_readable(s->memblockq) && + s->sink_input->thread_info.underrun_for > 0) { + pa_log_debug("Requesting rewind due to end of underrun"); + pa_sink_input_request_rewind(s->sink_input, + (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for), + false, true, false); + } + + return 1; +} + +/* Called from I/O thread context */ +static void sink_input_attach(pa_sink_input *i) { + struct session *s; + + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + pa_assert(!s->rtpoll_item); + s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll); + + pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s); +} + +/* Called from I/O thread context */ +static void sink_input_detach(pa_sink_input *i) { + struct session *s; + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + pa_assert(s->rtpoll_item); + pa_rtpoll_item_free(s->rtpoll_item); + s->rtpoll_item = NULL; +} + +static int mcast_socket(const struct sockaddr* sa, socklen_t salen) { + int af, fd = -1, r, one; + + pa_assert(sa); + pa_assert(salen > 0); + + af = sa->sa_family; + if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { + pa_log("Failed to create socket: %s", pa_cstrerror(errno)); + goto fail; + } + + pa_make_udp_socket_low_delay(fd); + +#ifdef SO_TIMESTAMP + one = 1; + if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) { + pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno)); + goto fail; + } +#else + pa_log("SO_TIMESTAMP unsupported on this platform"); + goto fail; +#endif + + one = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) { + pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno)); + goto fail; + } + + r = 0; + if (af == AF_INET) { + /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */ + static const uint32_t ipv4_mcast_mask = 0xe0000000; + + if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) { + struct ip_mreq mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr; + r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4)); + } +#ifdef HAVE_IPV6 + } else if (af == AF_INET6) { + /* IPv6 multicast addresses have 255 as the most significant byte */ + if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) { + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr; + r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6)); + } +#endif + } else + pa_assert_not_reached(); + + if (r < 0) { + pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (bind(fd, sa, salen) < 0) { + pa_log("bind() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + return fd; + +fail: + if (fd >= 0) + close(fd); + + return -1; +} + +static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) { + struct session *s = NULL; + pa_sink *sink; + int fd = -1; + pa_memchunk silence; + pa_sink_input_new_data data; + struct timeval now; + + pa_assert(u); + pa_assert(sdp_info); + + if (u->n_sessions >= MAX_SESSIONS) { + pa_log("Session limit reached."); + goto fail; + } + + if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) { + pa_log("Sink does not exist."); + goto fail; + } + + pa_rtclock_get(&now); + + s = pa_xnew0(struct session, 1); + s->userdata = u; + s->first_packet = false; + s->sdp_info = *sdp_info; + s->rtpoll_item = NULL; + s->intended_latency = u->latency; + s->last_rate_update = pa_timeval_load(&now); + s->last_latency = u->latency; + pa_atomic_store(&s->timestamp, (int) now.tv_sec); + + if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0) + goto fail; + + pa_sink_input_new_data_init(&data); + pa_sink_input_new_data_set_sink(&data, sink, false, true); + data.driver = __FILE__; + pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream"); + pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME, + "RTP Stream%s%s%s", + sdp_info->session_name ? " (" : "", + sdp_info->session_name ? sdp_info->session_name : "", + sdp_info->session_name ? ")" : ""); + + if (sdp_info->session_name) + pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name); + pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin); + pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload); + data.module = u->module; + pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec); + data.flags = PA_SINK_INPUT_VARIABLE_RATE; + + pa_sink_input_new(&s->sink_input, u->module->core, &data); + pa_sink_input_new_data_done(&data); + + if (!s->sink_input) { + pa_log("Failed to create sink input."); + goto fail; + } + + s->base_rate = (double) s->sink_input->sample_spec.rate; + s->estimated_rate = (double) s->sink_input->sample_spec.rate; + s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate; + + s->sink_input->userdata = s; + + s->sink_input->parent.process_msg = sink_input_process_msg; + s->sink_input->pop = sink_input_pop_cb; + s->sink_input->process_rewind = sink_input_process_rewind_cb; + s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; + s->sink_input->kill = sink_input_kill; + s->sink_input->attach = sink_input_attach; + s->sink_input->detach = sink_input_detach; + s->sink_input->suspend_within_thread = sink_input_suspend_within_thread; + + pa_sink_input_get_silence(s->sink_input, &silence); + + s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2); + + if (s->intended_latency < s->sink_latency*2) + s->intended_latency = s->sink_latency*2; + + s->memblockq = pa_memblockq_new( + "module-rtp-recv memblockq", + 0, + MEMBLOCKQ_MAXLENGTH, + MEMBLOCKQ_MAXLENGTH, + &s->sink_input->sample_spec, + pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec), + 0, + 0, + &silence); + + pa_memblock_unref(silence.memblock); + + if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec))) + goto fail; + + pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s); + u->n_sessions++; + PA_LLIST_PREPEND(struct session, s->userdata->sessions, s); + + pa_sink_input_put(s->sink_input); + + pa_log_info("New session '%s'", s->sdp_info.session_name); + + return s; + +fail: + pa_xfree(s); + + if (fd >= 0) + pa_close(fd); + + return NULL; +} + +static void session_free(struct session *s) { + pa_assert(s); + + pa_log_info("Freeing session '%s'", s->sdp_info.session_name); + + pa_sink_input_unlink(s->sink_input); + pa_sink_input_unref(s->sink_input); + + PA_LLIST_REMOVE(struct session, s->userdata->sessions, s); + pa_assert(s->userdata->n_sessions >= 1); + s->userdata->n_sessions--; + + pa_memblockq_free(s->memblockq); + pa_sdp_info_destroy(&s->sdp_info); + pa_rtp_context_free(s->rtp_context); + + pa_xfree(s); +} + +static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) { + struct userdata *u = userdata; + bool goodbye = false; + pa_sdp_info info; + struct session *s; + + pa_assert(m); + pa_assert(e); + pa_assert(u); + pa_assert(fd == u->sap_context.fd); + pa_assert(flags == PA_IO_EVENT_INPUT); + + if (pa_sap_recv(&u->sap_context, &goodbye) < 0) + return; + + if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye)) + return; + + if (goodbye) { + pa_hashmap_remove_and_free(u->by_origin, info.origin); + pa_sdp_info_destroy(&info); + } else { + + if (!(s = pa_hashmap_get(u->by_origin, info.origin))) { + if (!session_new(u, &info)) + pa_sdp_info_destroy(&info); + + } else { + struct timeval now; + pa_rtclock_get(&now); + pa_atomic_store(&s->timestamp, (int) now.tv_sec); + + pa_sdp_info_destroy(&info); + } + } +} + +static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) { + struct session *s, *n; + struct userdata *u = userdata; + struct timeval now; + + pa_assert(m); + pa_assert(t); + pa_assert(u); + + pa_rtclock_get(&now); + + pa_log_debug("Checking for dead streams ..."); + + for (s = u->sessions; s; s = n) { + int k; + n = s->next; + + k = pa_atomic_load(&s->timestamp); + + if (k + DEATH_TIMEOUT < now.tv_sec) + pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin); + } + + /* Restart timer */ + pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC); +} + +int pa__init(pa_module*m) { + struct userdata *u; + pa_modargs *ma = NULL; + struct sockaddr_in sa4; +#ifdef HAVE_IPV6 + struct sockaddr_in6 sa6; +#endif + struct sockaddr *sa; + socklen_t salen; + const char *sap_address; + uint32_t latency_msec; + int fd = -1; + + pa_assert(m); + + if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { + pa_log("failed to parse module arguments"); + goto fail; + } + + sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS); + + if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) { + sa4.sin_family = AF_INET; + sa4.sin_port = htons(SAP_PORT); + sa = (struct sockaddr*) &sa4; + salen = sizeof(sa4); +#ifdef HAVE_IPV6 + } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) { + sa6.sin6_family = AF_INET6; + sa6.sin6_port = htons(SAP_PORT); + sa = (struct sockaddr*) &sa6; + salen = sizeof(sa6); +#endif + } else { + pa_log("Invalid SAP address '%s'", sap_address); + goto fail; + } + + latency_msec = DEFAULT_LATENCY_MSEC; + if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) { + pa_log("Invalid latency specification"); + goto fail; + } + + if ((fd = mcast_socket(sa, salen)) < 0) + goto fail; + + m->userdata = u = pa_xnew(struct userdata, 1); + u->module = m; + u->core = m->core; + u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); + u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC; + + u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u); + pa_sap_context_init_recv(&u->sap_context, fd); + + PA_LLIST_HEAD_INIT(struct session, u->sessions); + u->n_sessions = 0; + u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free); + + u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u); + + pa_modargs_free(ma); + + return 0; + +fail: + if (ma) + pa_modargs_free(ma); + + if (fd >= 0) + pa_close(fd); + + return -1; +} + +void pa__done(pa_module*m) { + struct userdata *u; + + pa_assert(m); + + if (!(u = m->userdata)) + return; + + if (u->sap_event) + m->core->mainloop->io_free(u->sap_event); + + if (u->check_death_event) + m->core->mainloop->time_free(u->check_death_event); + + pa_sap_context_destroy(&u->sap_context); + + if (u->by_origin) + pa_hashmap_free(u->by_origin); + + pa_xfree(u->sink_name); + pa_xfree(u); +} diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c new file mode 100644 index 0000000..5a4c6fc --- /dev/null +++ b/src/modules/rtp/module-rtp-send.c @@ -0,0 +1,547 @@ +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdio.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <errno.h> +#include <unistd.h> + +#include <pulse/rtclock.h> +#include <pulse/timeval.h> +#include <pulse/util.h> +#include <pulse/xmalloc.h> + +#include <pulsecore/core-error.h> +#include <pulsecore/module.h> +#include <pulsecore/source.h> +#include <pulsecore/source-output.h> +#include <pulsecore/memblockq.h> +#include <pulsecore/log.h> +#include <pulsecore/core-util.h> +#include <pulsecore/modargs.h> +#include <pulsecore/namereg.h> +#include <pulsecore/sample-util.h> +#include <pulsecore/macro.h> +#include <pulsecore/socket-util.h> +#include <pulsecore/arpa-inet.h> + +#include "rtp.h" +#include "sdp.h" +#include "sap.h" + +PA_MODULE_AUTHOR("Lennart Poettering"); +PA_MODULE_DESCRIPTION("Read data from source and send it to the network via RTP/SAP/SDP"); +PA_MODULE_VERSION(PACKAGE_VERSION); +PA_MODULE_LOAD_ONCE(false); +PA_MODULE_USAGE( + "source=<name of the source> " + "format=<sample format> " + "channels=<number of channels> " + "rate=<sample rate> " + "destination_ip=<destination IP address> " + "source_ip=<source IP address> " + "port=<port number> " + "mtu=<maximum transfer unit> " + "loop=<loopback to local host?> " + "ttl=<ttl value> " + "inhibit_auto_suspend=<always|never|only_with_non_monitor_sources>" + "stream_name=<name of the stream>" +); + +#define DEFAULT_PORT 46000 +#define DEFAULT_TTL 1 +#define SAP_PORT 9875 +#define DEFAULT_SOURCE_IP "0.0.0.0" +#define DEFAULT_DESTINATION_IP "224.0.0.56" +#define MEMBLOCKQ_MAXLENGTH (1024*170) +#define DEFAULT_MTU 1280 +#define SAP_INTERVAL (5*PA_USEC_PER_SEC) + +static const char* const valid_modargs[] = { + "source", + "format", + "channels", + "rate", + "destination", /* Compatbility */ + "destination_ip", + "source_ip", + "port", + "mtu" , + "loop", + "ttl", + "inhibit_auto_suspend", + "stream_name", + NULL +}; + +enum inhibit_auto_suspend { + INHIBIT_AUTO_SUSPEND_ALWAYS, + INHIBIT_AUTO_SUSPEND_NEVER, + INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES +}; + +struct userdata { + pa_module *module; + + pa_source_output *source_output; + pa_memblockq *memblockq; + + pa_rtp_context *rtp_context; + pa_sap_context sap_context; + + pa_time_event *sap_event; + + enum inhibit_auto_suspend inhibit_auto_suspend; +}; + +/* Called from I/O thread context */ +static int source_output_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + struct userdata *u; + pa_assert_se(u = PA_SOURCE_OUTPUT(o)->userdata); + + switch (code) { + case PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY: + *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->source_output->sample_spec); + + /* Fall through, the default handler will add in the extra + * latency added by the resampler */ + break; + } + + return pa_source_output_process_msg(o, code, data, offset, chunk); +} + +/* Called from I/O thread context */ +static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { + struct userdata *u; + pa_source_output_assert_ref(o); + pa_assert_se(u = o->userdata); + + if (pa_memblockq_push(u->memblockq, chunk) < 0) { + pa_log_warn("Failed to push chunk into memblockq."); + return; + } + + pa_rtp_send(u->rtp_context, u->memblockq); +} + +static pa_source_output_flags_t get_dont_inhibit_auto_suspend_flag(pa_source *source, + enum inhibit_auto_suspend inhibit_auto_suspend) { + pa_assert(source); + + switch (inhibit_auto_suspend) { + case INHIBIT_AUTO_SUSPEND_ALWAYS: + return 0; + + case INHIBIT_AUTO_SUSPEND_NEVER: + return PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; + + case INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES: + return source->monitor_of ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0; + } + + pa_assert_not_reached(); +} + +/* Called from the main thread. */ +static void source_output_moving_cb(pa_source_output *o, pa_source *dest) { + struct userdata *u; + + pa_assert(o); + + u = o->userdata; + + if (!dest) + return; + + o->flags &= ~PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; + o->flags |= get_dont_inhibit_auto_suspend_flag(dest, u->inhibit_auto_suspend); +} + +/* Called from main context */ +static void source_output_kill_cb(pa_source_output* o) { + struct userdata *u; + pa_source_output_assert_ref(o); + pa_assert_se(u = o->userdata); + + pa_module_unload_request(u->module, true); + + pa_source_output_unlink(u->source_output); + pa_source_output_unref(u->source_output); + u->source_output = NULL; +} + +static void sap_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) { + struct userdata *u = userdata; + + pa_assert(m); + pa_assert(t); + pa_assert(u); + + pa_sap_send(&u->sap_context, 0); + + pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + SAP_INTERVAL); +} + +int pa__init(pa_module*m) { + struct userdata *u; + pa_modargs *ma = NULL; + const char *dst_addr; + const char *src_addr; + uint32_t port = DEFAULT_PORT, mtu; + uint32_t ttl = DEFAULT_TTL; + sa_family_t af; + int fd = -1, sap_fd = -1; + pa_source *s; + pa_sample_spec ss; + pa_channel_map cm; + struct sockaddr_in dst_sa4, dst_sap_sa4, src_sa4, src_sap_sa4; +#ifdef HAVE_IPV6 + struct sockaddr_in6 dst_sa6, dst_sap_sa6, src_sa6, src_sap_sa6; +#endif + struct sockaddr_storage sa_dst; + pa_source_output *o = NULL; + uint8_t payload; + char *p; + int r, j; + socklen_t k; + char hn[128], *n; + bool loop = false; + enum inhibit_auto_suspend inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES; + const char *inhibit_auto_suspend_str; + pa_source_output_new_data data; + + pa_assert(m); + + if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { + pa_log("Failed to parse module arguments"); + goto fail; + } + + if (!(s = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source", NULL), PA_NAMEREG_SOURCE))) { + pa_log("Source does not exist."); + goto fail; + } + + if (pa_modargs_get_value_boolean(ma, "loop", &loop) < 0) { + pa_log("Failed to parse \"loop\" parameter."); + goto fail; + } + + if ((inhibit_auto_suspend_str = pa_modargs_get_value(ma, "inhibit_auto_suspend", NULL))) { + if (pa_streq(inhibit_auto_suspend_str, "always")) + inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ALWAYS; + else if (pa_streq(inhibit_auto_suspend_str, "never")) + inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_NEVER; + else if (pa_streq(inhibit_auto_suspend_str, "only_with_non_monitor_sources")) + inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES; + else { + pa_log("Failed to parse the \"inhibit_auto_suspend\" parameter."); + goto fail; + } + } + + ss = s->sample_spec; + pa_rtp_sample_spec_fixup(&ss); + cm = s->channel_map; + if (pa_modargs_get_sample_spec(ma, &ss) < 0) { + pa_log("Failed to parse sample specification"); + goto fail; + } + + if (!pa_rtp_sample_spec_valid(&ss)) { + pa_log("Specified sample type not compatible with RTP"); + goto fail; + } + + if (ss.channels != cm.channels) + pa_channel_map_init_auto(&cm, ss.channels, PA_CHANNEL_MAP_AIFF); + + payload = pa_rtp_payload_from_sample_spec(&ss); + + mtu = (uint32_t) pa_frame_align(DEFAULT_MTU, &ss); + + if (pa_modargs_get_value_u32(ma, "mtu", &mtu) < 0 || mtu < 1 || mtu % pa_frame_size(&ss) != 0) { + pa_log("Invalid MTU."); + goto fail; + } + + port = DEFAULT_PORT + ((uint32_t) (rand() % 512) << 1); + if (pa_modargs_get_value_u32(ma, "port", &port) < 0 || port < 1 || port > 0xFFFF) { + pa_log("port= expects a numerical argument between 1 and 65535."); + goto fail; + } + + if (port & 1) + pa_log_warn("Port number not even as suggested in RFC3550!"); + + if (pa_modargs_get_value_u32(ma, "ttl", &ttl) < 0 || ttl < 1 || ttl > 0xFF) { + pa_log("ttl= expects a numerical argument between 1 and 255."); + goto fail; + } + + src_addr = pa_modargs_get_value(ma, "source_ip", DEFAULT_SOURCE_IP); + + if (inet_pton(AF_INET, src_addr, &src_sa4.sin_addr) > 0) { + src_sa4.sin_family = af = AF_INET; + src_sa4.sin_port = htons(0); + memset(&src_sa4.sin_zero, 0, sizeof(src_sa4.sin_zero)); + src_sap_sa4 = src_sa4; +#ifdef HAVE_IPV6 + } else if (inet_pton(AF_INET6, src_addr, &src_sa6.sin6_addr) > 0) { + src_sa6.sin6_family = af = AF_INET6; + src_sa6.sin6_port = htons(0); + src_sa6.sin6_flowinfo = 0; + src_sa6.sin6_scope_id = 0; + src_sap_sa6 = src_sa6; +#endif + } else { + pa_log("Invalid source address '%s'", src_addr); + goto fail; + } + + dst_addr = pa_modargs_get_value(ma, "destination", NULL); + if (dst_addr == NULL) + dst_addr = pa_modargs_get_value(ma, "destination_ip", DEFAULT_DESTINATION_IP); + + if (inet_pton(AF_INET, dst_addr, &dst_sa4.sin_addr) > 0) { + dst_sa4.sin_family = af = AF_INET; + dst_sa4.sin_port = htons((uint16_t) port); + memset(&dst_sa4.sin_zero, 0, sizeof(dst_sa4.sin_zero)); + dst_sap_sa4 = dst_sa4; + dst_sap_sa4.sin_port = htons(SAP_PORT); +#ifdef HAVE_IPV6 + } else if (inet_pton(AF_INET6, dst_addr, &dst_sa6.sin6_addr) > 0) { + dst_sa6.sin6_family = af = AF_INET6; + dst_sa6.sin6_port = htons((uint16_t) port); + dst_sa6.sin6_flowinfo = 0; + dst_sa6.sin6_scope_id = 0; + dst_sap_sa6 = dst_sa6; + dst_sap_sa6.sin6_port = htons(SAP_PORT); +#endif + } else { + pa_log("Invalid destination '%s'", dst_addr); + goto fail; + } + + if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { + pa_log("socket() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (af == AF_INET && bind(fd, (struct sockaddr*) &src_sa4, sizeof(src_sa4)) < 0) { + pa_log("bind() failed: %s", pa_cstrerror(errno)); + goto fail; +#ifdef HAVE_IPV6 + } else if (af == AF_INET6 && bind(fd, (struct sockaddr*) &src_sa6, sizeof(src_sa6)) < 0) { + pa_log("bind() failed: %s", pa_cstrerror(errno)); + goto fail; +#endif + } + + if (af == AF_INET && connect(fd, (struct sockaddr*) &dst_sa4, sizeof(dst_sa4)) < 0) { + pa_log("connect() failed: %s", pa_cstrerror(errno)); + goto fail; +#ifdef HAVE_IPV6 + } else if (af == AF_INET6 && connect(fd, (struct sockaddr*) &dst_sa6, sizeof(dst_sa6)) < 0) { + pa_log("connect() failed: %s", pa_cstrerror(errno)); + goto fail; +#endif + } + + if ((sap_fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { + pa_log("socket() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (af == AF_INET && bind(sap_fd, (struct sockaddr*) &src_sap_sa4, sizeof(src_sap_sa4)) < 0) { + pa_log("bind() failed: %s", pa_cstrerror(errno)); + goto fail; +#ifdef HAVE_IPV6 + } else if (af == AF_INET6 && bind(sap_fd, (struct sockaddr*) &src_sap_sa6, sizeof(src_sap_sa6)) < 0) { + pa_log("bind() failed: %s", pa_cstrerror(errno)); + goto fail; +#endif + } + + if (af == AF_INET && connect(sap_fd, (struct sockaddr*) &dst_sap_sa4, sizeof(dst_sap_sa4)) < 0) { + pa_log("connect() failed: %s", pa_cstrerror(errno)); + goto fail; +#ifdef HAVE_IPV6 + } else if (af == AF_INET6 && connect(sap_fd, (struct sockaddr*) &dst_sap_sa6, sizeof(dst_sap_sa6)) < 0) { + pa_log("connect() failed: %s", pa_cstrerror(errno)); + goto fail; +#endif + } + + j = loop; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &j, sizeof(j)) < 0 || + setsockopt(sap_fd, IPPROTO_IP, IP_MULTICAST_LOOP, &j, sizeof(j)) < 0) { + pa_log("IP_MULTICAST_LOOP failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (ttl != DEFAULT_TTL) { + int _ttl = (int) ttl; + + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &_ttl, sizeof(_ttl)) < 0) { + pa_log("IP_MULTICAST_TTL failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (setsockopt(sap_fd, IPPROTO_IP, IP_MULTICAST_TTL, &_ttl, sizeof(_ttl)) < 0) { + pa_log("IP_MULTICAST_TTL (sap) failed: %s", pa_cstrerror(errno)); + goto fail; + } + } + + /* If the socket queue is full, let's drop packets */ + pa_make_fd_nonblock(fd); + pa_make_udp_socket_low_delay(fd); + + pa_source_output_new_data_init(&data); + pa_proplist_sets(data.proplist, PA_PROP_MEDIA_NAME, "RTP Monitor Stream"); + pa_proplist_sets(data.proplist, "rtp.source", src_addr); + pa_proplist_sets(data.proplist, "rtp.destination", dst_addr); + pa_proplist_setf(data.proplist, "rtp.mtu", "%lu", (unsigned long) mtu); + pa_proplist_setf(data.proplist, "rtp.port", "%lu", (unsigned long) port); + pa_proplist_setf(data.proplist, "rtp.ttl", "%lu", (unsigned long) ttl); + data.driver = __FILE__; + data.module = m; + pa_source_output_new_data_set_source(&data, s, false, true); + pa_source_output_new_data_set_sample_spec(&data, &ss); + pa_source_output_new_data_set_channel_map(&data, &cm); + data.flags |= get_dont_inhibit_auto_suspend_flag(s, inhibit_auto_suspend); + + pa_source_output_new(&o, m->core, &data); + pa_source_output_new_data_done(&data); + + if (!o) { + pa_log("failed to create source output."); + goto fail; + } + + o->parent.process_msg = source_output_process_msg; + o->push = source_output_push_cb; + o->moving = source_output_moving_cb; + o->kill = source_output_kill_cb; + + pa_log_info("Configured source latency of %llu ms.", + (unsigned long long) pa_source_output_set_requested_latency(o, pa_bytes_to_usec(mtu, &o->sample_spec)) / PA_USEC_PER_MSEC); + + m->userdata = o->userdata = u = pa_xnew(struct userdata, 1); + u->module = m; + u->source_output = o; + + u->memblockq = pa_memblockq_new( + "module-rtp-send memblockq", + 0, + MEMBLOCKQ_MAXLENGTH, + MEMBLOCKQ_MAXLENGTH, + &ss, + 1, + 0, + 0, + NULL); + + k = sizeof(sa_dst); + pa_assert_se((r = getsockname(fd, (struct sockaddr*) &sa_dst, &k)) >= 0); + + n = pa_xstrdup(pa_modargs_get_value(ma, "stream_name", NULL)); + if (n == NULL) + n = pa_sprintf_malloc("PulseAudio RTP Stream on %s", pa_get_fqdn(hn, sizeof(hn))); + + if (af == AF_INET) { + p = pa_sdp_build(af, + (void*) &((struct sockaddr_in*) &sa_dst)->sin_addr, + (void*) &dst_sa4.sin_addr, + n, (uint16_t) port, payload, &ss); +#ifdef HAVE_IPV6 + } else { + p = pa_sdp_build(af, + (void*) &((struct sockaddr_in6*) &sa_dst)->sin6_addr, + (void*) &dst_sa6.sin6_addr, + n, (uint16_t) port, payload, &ss); +#endif + } + + pa_xfree(n); + + if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, &ss))) + goto fail; + pa_sap_context_init_send(&u->sap_context, sap_fd, p); + + pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, payload=%u", + mtu, dst_addr, port, src_addr, ttl, payload); + pa_log_info("SDP-Data:\n%s\nEOF", p); + + pa_sap_send(&u->sap_context, 0); + + u->sap_event = pa_core_rttime_new(m->core, pa_rtclock_now() + SAP_INTERVAL, sap_event_cb, u); + u->inhibit_auto_suspend = inhibit_auto_suspend; + + pa_source_output_put(u->source_output); + + pa_modargs_free(ma); + + return 0; + +fail: + if (ma) + pa_modargs_free(ma); + + if (fd >= 0) + pa_close(fd); + + if (sap_fd >= 0) + pa_close(sap_fd); + + return -1; +} + +void pa__done(pa_module*m) { + struct userdata *u; + pa_assert(m); + + if (!(u = m->userdata)) + return; + + if (u->sap_event) + m->core->mainloop->time_free(u->sap_event); + + if (u->source_output) { + pa_source_output_unlink(u->source_output); + pa_source_output_unref(u->source_output); + } + + pa_rtp_context_free(u->rtp_context); + + pa_sap_send(&u->sap_context, 1); + pa_sap_context_destroy(&u->sap_context); + + if (u->memblockq) + pa_memblockq_free(u->memblockq); + + pa_xfree(u); +} diff --git a/src/modules/rtp/rtp-common.c b/src/modules/rtp/rtp-common.c new file mode 100644 index 0000000..65e2c7a --- /dev/null +++ b/src/modules/rtp/rtp-common.c @@ -0,0 +1,97 @@ +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include "rtp.h" + +#include <pulsecore/core-util.h> + +uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) { + pa_assert(ss); + + if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 2) + return 10; + if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 1) + return 11; + + return 127; +} + +pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss) { + pa_assert(ss); + + switch (payload) { + case 10: + ss->channels = 2; + ss->format = PA_SAMPLE_S16BE; + ss->rate = 44100; + break; + + case 11: + ss->channels = 1; + ss->format = PA_SAMPLE_S16BE; + ss->rate = 44100; + break; + + default: + return NULL; + } + + return ss; +} + +pa_sample_spec *pa_rtp_sample_spec_fixup(pa_sample_spec * ss) { + pa_assert(ss); + + if (!pa_rtp_sample_spec_valid(ss)) + ss->format = PA_SAMPLE_S16BE; + + pa_assert(pa_rtp_sample_spec_valid(ss)); + return ss; +} + +int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) { + pa_assert(ss); + + if (!pa_sample_spec_valid(ss)) + return 0; + + return ss->format == PA_SAMPLE_S16BE; +} + +const char* pa_rtp_format_to_string(pa_sample_format_t f) { + switch (f) { + case PA_SAMPLE_S16BE: + return "L16"; + default: + return NULL; + } +} + +pa_sample_format_t pa_rtp_string_to_format(const char *s) { + pa_assert(s); + + if (pa_streq(s, "L16")) + return PA_SAMPLE_S16BE; + else + return PA_SAMPLE_INVALID; +} diff --git a/src/modules/rtp/rtp-gstreamer.c b/src/modules/rtp/rtp-gstreamer.c new file mode 100644 index 0000000..28d367b --- /dev/null +++ b/src/modules/rtp/rtp-gstreamer.c @@ -0,0 +1,665 @@ +/*** + This file is part of PulseAudio. + + Copyright 2016 Arun Raghavan <mail@arunraghavan.net> + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <pulse/timeval.h> +#include <pulsecore/fdsem.h> +#include <pulsecore/core-rtclock.h> + +#include "rtp.h" + +#include <gio/gio.h> + +#include <gst/gst.h> +#include <gst/app/gstappsrc.h> +#include <gst/app/gstappsink.h> +#include <gst/base/gstadapter.h> +#include <gst/rtp/gstrtpbuffer.h> + +#define MAKE_ELEMENT_NAMED(v, e, n) \ + v = gst_element_factory_make(e, n); \ + if (!v) { \ + pa_log("Could not create %s element", e); \ + goto fail; \ + } + +#define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL) +#define RTP_HEADER_SIZE 12 + +struct pa_rtp_context { + pa_fdsem *fdsem; + pa_sample_spec ss; + + GstElement *pipeline; + GstElement *appsrc; + GstElement *appsink; + GstCaps *meta_reference; + + bool first_buffer; + uint32_t last_timestamp; + + uint8_t *send_buf; + size_t mtu; +}; + +static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) { + if (ss->format != PA_SAMPLE_S16BE) + return NULL; + + return gst_caps_new_simple("audio/x-raw", + "format", G_TYPE_STRING, "S16BE", + "rate", G_TYPE_INT, (int) ss->rate, + "channels", G_TYPE_INT, (int) ss->channels, + "layout", G_TYPE_STRING, "interleaved", + NULL); +} + +static bool init_send_pipeline(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) { + GstElement *appsrc = NULL, *pay = NULL, *capsf = NULL, *rtpbin = NULL, *sink = NULL; + GstCaps *caps; + GSocket *socket; + GInetSocketAddress *addr; + GInetAddress *iaddr; + guint16 port; + gchar *addr_str; + + MAKE_ELEMENT(appsrc, "appsrc"); + MAKE_ELEMENT(pay, "rtpL16pay"); + MAKE_ELEMENT(capsf, "capsfilter"); + MAKE_ELEMENT(rtpbin, "rtpbin"); + MAKE_ELEMENT(sink, "udpsink"); + + c->pipeline = gst_pipeline_new(NULL); + + gst_bin_add_many(GST_BIN(c->pipeline), appsrc, pay, capsf, rtpbin, sink, NULL); + + caps = caps_from_sample_spec(ss); + if (!caps) { + pa_log("Unsupported format to payload"); + goto fail; + } + + socket = g_socket_new_from_fd(fd, NULL); + if (!socket) { + pa_log("Failed to create socket"); + goto fail; + } + + addr = G_INET_SOCKET_ADDRESS(g_socket_get_remote_address(socket, NULL)); + iaddr = g_inet_socket_address_get_address(addr); + addr_str = g_inet_address_to_string(iaddr); + port = g_inet_socket_address_get_port(addr); + + g_object_set(appsrc, "caps", caps, "is-live", TRUE, "blocksize", mtu, "format", 3 /* time */, NULL); + g_object_set(pay, "mtu", mtu, NULL); + g_object_set(sink, "socket", socket, "host", addr_str, "port", port, + "enable-last-sample", FALSE, "sync", FALSE, "loop", + g_socket_get_multicast_loopback(socket), "ttl", + g_socket_get_ttl(socket), "ttl-mc", + g_socket_get_multicast_ttl(socket), "auto-multicast", FALSE, + NULL); + + g_free(addr_str); + g_object_unref(addr); + g_object_unref(socket); + + gst_caps_unref(caps); + + /* Force the payload type that we want */ + caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) payload, NULL); + g_object_set(capsf, "caps", caps, NULL); + gst_caps_unref(caps); + + if (!gst_element_link(appsrc, pay) || + !gst_element_link(pay, capsf) || + !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") || + !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) { + + pa_log("Could not set up send pipeline"); + goto fail; + } + + if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { + pa_log("Could not start pipeline"); + goto fail; + } + + c->appsrc = gst_object_ref(appsrc); + + return true; + +fail: + if (c->pipeline) { + gst_object_unref(c->pipeline); + } else { + /* These weren't yet added to pipeline, so we still have a ref */ + if (appsrc) + gst_object_unref(appsrc); + if (pay) + gst_object_unref(pay); + if (capsf) + gst_object_unref(capsf); + if (rtpbin) + gst_object_unref(rtpbin); + if (sink) + gst_object_unref(sink); + } + + return false; +} + +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) { + pa_rtp_context *c = NULL; + GError *error = NULL; + + pa_assert(fd >= 0); + + pa_log_info("Initialising GStreamer RTP backend for send"); + + c = pa_xnew0(pa_rtp_context, 1); + + c->ss = *ss; + c->mtu = mtu - RTP_HEADER_SIZE; + c->send_buf = pa_xmalloc(c->mtu); + + if (!gst_init_check(NULL, NULL, &error)) { + pa_log_error("Could not initialise GStreamer: %s", error->message); + g_error_free(error); + goto fail; + } + + if (!init_send_pipeline(c, fd, payload, mtu, ss)) + goto fail; + + return c; + +fail: + pa_rtp_context_free(c); + return NULL; +} + +/* Called from I/O thread context */ +static bool process_bus_messages(pa_rtp_context *c) { + GstBus *bus; + GstMessage *message; + bool ret = true; + + bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); + + while (ret && (message = gst_bus_pop(bus))) { + if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) { + GError *error = NULL; + + ret = false; + + gst_message_parse_error(message, &error, NULL); + pa_log("Got an error: %s", error->message); + + g_error_free(error); + } + + gst_message_unref(message); + } + + gst_object_unref(bus); + + return ret; +} + +/* Called from I/O thread context */ +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { + GstBuffer *buf; + size_t n = 0; + + pa_assert(c); + pa_assert(q); + + if (!process_bus_messages(c)) + return -1; + + /* + * While we check here for atleast MTU worth of data being available in + * memblockq, we might not have exact equivalent to MTU. Hence, we walk + * over the memchunks in memblockq and accumulate MTU bytes next. + */ + if (pa_memblockq_get_length(q) < c->mtu) + return 0; + + for (;;) { + pa_memchunk chunk; + int r; + + pa_memchunk_reset(&chunk); + + if ((r = pa_memblockq_peek(q, &chunk)) >= 0) { + /* + * Accumulate MTU bytes of data before sending. If the current + * chunk length + accumulated bytes exceeds MTU, we drop bytes + * considered for transfer in this iteration from memblockq. + * + * The remaining bytes will be available in the next iteration, + * as these will be tracked and maintained by memblockq. + */ + size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length; + + pa_assert(chunk.memblock); + + memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k); + pa_memblock_release(chunk.memblock); + pa_memblock_unref(chunk.memblock); + + n += k; + pa_memblockq_drop(q, k); + } + + if (r < 0 || n >= c->mtu) { + GstClock *clock; + GstClockTime timestamp, clock_time; + GstMapInfo info; + + if (n > 0) { + clock = gst_element_get_clock(c->pipeline); + clock_time = gst_clock_get_time(clock); + gst_object_unref(clock); + + timestamp = gst_element_get_base_time(c->pipeline); + if (timestamp > clock_time) + timestamp -= clock_time; + else + timestamp = 0; + + buf = gst_buffer_new_allocate(NULL, n, NULL); + pa_assert(buf); + + GST_BUFFER_PTS(buf) = timestamp; + + pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE)); + + memcpy(info.data, c->send_buf, n); + gst_buffer_unmap(buf, &info); + + if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { + pa_log_error("Could not push buffer"); + return -1; + } + } + + if (r < 0 || pa_memblockq_get_length(q) < c->mtu) + break; + + n = 0; + } + } + + return 0; +} + +static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) { + if (ss->format != PA_SAMPLE_S16BE) + return NULL; + + return gst_caps_new_simple("application/x-rtp", + "media", G_TYPE_STRING, "audio", + "encoding-name", G_TYPE_STRING, "L16", + "clock-rate", G_TYPE_INT, (int) ss->rate, + "payload", G_TYPE_INT, (int) pa_rtp_payload_from_sample_spec(ss), + "layout", G_TYPE_STRING, "interleaved", + NULL); +} + +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) { + pa_rtp_context *c = (pa_rtp_context *) userdata; + GstElement *depay; + GstPad *sinkpad; + GstPadLinkReturn ret; + + depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay"); + pa_assert(depay); + + sinkpad = gst_element_get_static_pad(depay, "sink"); + + ret = gst_pad_link(pad, sinkpad); + if (ret != GST_PAD_LINK_OK) { + GstBus *bus; + GError *error; + + bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); + error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader"); + gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL)); + + /* Actually cause the I/O thread to wake up and process the error */ + pa_fdsem_post(c->fdsem); + + g_error_free(error); + gst_object_unref(bus); + } + + gst_object_unref(sinkpad); + gst_object_unref(depay); +} + +static GstPadProbeReturn udpsrc_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer userdata) { + struct timeval tv; + pa_usec_t timestamp; + pa_rtp_context *c = (pa_rtp_context *) userdata; + + pa_assert(info->type & GST_PAD_PROBE_TYPE_BUFFER); + + pa_gettimeofday(&tv); + timestamp = pa_timeval_load(&tv); + + gst_buffer_add_reference_timestamp_meta(GST_BUFFER(info->data), c->meta_reference, timestamp * GST_USECOND, + GST_CLOCK_TIME_NONE); + + return GST_PAD_PROBE_OK; +} + +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) { + GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL; + GstCaps *caps; + GstPad *pad; + GSocket *socket; + GError *error = NULL; + + MAKE_ELEMENT(udpsrc, "udpsrc"); + MAKE_ELEMENT(rtpbin, "rtpbin"); + MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay"); + MAKE_ELEMENT(appsink, "appsink"); + + c->pipeline = gst_pipeline_new(NULL); + + gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL); + + socket = g_socket_new_from_fd(fd, &error); + if (error) { + pa_log("Could not create socket: %s", error->message); + g_error_free(error); + goto fail; + } + + caps = rtp_caps_from_sample_spec(ss); + if (!caps) { + pa_log("Unsupported format to payload"); + goto fail; + } + + g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL); + g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL); + g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL); + + gst_caps_unref(caps); + g_object_unref(socket); + + if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") || + !gst_element_link(depay, appsink)) { + + pa_log("Could not set up receive pipeline"); + goto fail; + } + + g_signal_connect(G_OBJECT(rtpbin), "pad-added", G_CALLBACK(on_pad_added), c); + + /* This logic should go into udpsrc, and we should be populating the + * receive timestamp using SCM_TIMESTAMP, but until we have that ... */ + c->meta_reference = gst_caps_new_empty_simple("timestamp/x-pulseaudio-wallclock"); + + pad = gst_element_get_static_pad(udpsrc, "src"); + gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, udpsrc_buffer_probe, c, NULL); + gst_object_unref(pad); + + if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { + pa_log("Could not start pipeline"); + goto fail; + } + + c->appsink = gst_object_ref(appsink); + + return true; + +fail: + if (c->pipeline) { + gst_object_unref(c->pipeline); + } else { + /* These weren't yet added to pipeline, so we still have a ref */ + if (udpsrc) + gst_object_unref(udpsrc); + if (depay) + gst_object_unref(depay); + if (rtpbin) + gst_object_unref(rtpbin); + if (appsink) + gst_object_unref(appsink); + } + + return false; +} + +/* Called from the GStreamer streaming thread */ +static void appsink_eos(GstAppSink *appsink, gpointer userdata) { + pa_rtp_context *c = (pa_rtp_context *) userdata; + + pa_fdsem_post(c->fdsem); +} + +/* Called from the GStreamer streaming thread */ +static GstFlowReturn appsink_new_sample(GstAppSink *appsink, gpointer userdata) { + pa_rtp_context *c = (pa_rtp_context *) userdata; + + pa_fdsem_post(c->fdsem); + + return GST_FLOW_OK; +} + +pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) { + pa_rtp_context *c = NULL; + GstAppSinkCallbacks callbacks = { 0, }; + GError *error = NULL; + + pa_assert(fd >= 0); + + pa_log_info("Initialising GStreamer RTP backend for receive"); + + c = pa_xnew0(pa_rtp_context, 1); + + c->fdsem = pa_fdsem_new(); + c->ss = *ss; + c->send_buf = NULL; + c->first_buffer = true; + + if (!gst_init_check(NULL, NULL, &error)) { + pa_log_error("Could not initialise GStreamer: %s", error->message); + g_error_free(error); + goto fail; + } + + if (!init_receive_pipeline(c, fd, ss)) + goto fail; + + callbacks.eos = appsink_eos; + callbacks.new_sample = appsink_new_sample; + gst_app_sink_set_callbacks(GST_APP_SINK(c->appsink), &callbacks, c, NULL); + + return c; + +fail: + pa_rtp_context_free(c); + return NULL; +} + +/* Called from I/O thread context */ +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) { + GstSample *sample = NULL; + GstBufferList *buf_list; + GstAdapter *adapter; + GstBuffer *buf; + GstMapInfo info; + GstClockTime timestamp = GST_CLOCK_TIME_NONE; + uint8_t *data; + uint64_t data_len = 0; + + if (!process_bus_messages(c)) + goto fail; + + adapter = gst_adapter_new(); + pa_assert(adapter); + + while (true) { + sample = gst_app_sink_try_pull_sample(GST_APP_SINK(c->appsink), 0); + if (!sample) + break; + + buf = gst_sample_get_buffer(sample); + + /* Get the timestamp from the first buffer */ + if (timestamp == GST_CLOCK_TIME_NONE) { + GstReferenceTimestampMeta *meta = gst_buffer_get_reference_timestamp_meta(buf, c->meta_reference); + + /* Use the meta if we were able to insert it and it came through, + * else try to fallback to the DTS, which is only available in + * GStreamer 1.16 and earlier. */ + if (meta) + timestamp = meta->timestamp; + else if (GST_BUFFER_DTS(buf) != GST_CLOCK_TIME_NONE) + timestamp = GST_BUFFER_DTS(buf); + else + timestamp = 0; + } + + if (GST_BUFFER_IS_DISCONT(buf)) + pa_log_info("Discontinuity detected, possibly lost some packets"); + + if (!gst_buffer_map(buf, &info, GST_MAP_READ)) { + pa_log_info("Failed to map buffer"); + gst_sample_unref(sample); + goto fail; + } + + data_len += info.size; + /* We need the buffer to be valid longer than the sample, which will + * be valid only for the duration of this loop. + * + * To do this, increase the ref count. Ownership is transferred to the + * adapter in gst_adapter_push. + */ + gst_buffer_ref(buf); + gst_adapter_push(adapter, buf); + gst_buffer_unmap(buf, &info); + + gst_sample_unref(sample); + } + + buf_list = gst_adapter_take_buffer_list(adapter, data_len); + pa_assert(buf_list); + + pa_assert(pa_mempool_block_size_max(pool) >= data_len); + + chunk->memblock = pa_memblock_new(pool, data_len); + chunk->index = 0; + chunk->length = data_len; + + data = (uint8_t *) pa_memblock_acquire_chunk(chunk); + + for (int i = 0; i < gst_buffer_list_length(buf_list); i++) { + buf = gst_buffer_list_get(buf_list, i); + + if (!gst_buffer_map(buf, &info, GST_MAP_READ)) { + gst_buffer_list_unref(buf_list); + goto fail; + } + + memcpy(data, info.data, info.size); + data += info.size; + gst_buffer_unmap(buf, &info); + } + + pa_memblock_release(chunk->memblock); + + /* When buffer-mode = none, the buffer PTS is the RTP timestamp, converted + * to time units (instead of clock-rate units as is in the header) and + * wraparound-corrected. */ + *rtp_tstamp = gst_util_uint64_scale_int(GST_BUFFER_PTS(gst_buffer_list_get(buf_list, 0)), c->ss.rate, GST_SECOND) & 0xFFFFFFFFU; + if (timestamp != GST_CLOCK_TIME_NONE) + pa_timeval_rtstore(tstamp, timestamp / PA_NSEC_PER_USEC, false); + + if (c->first_buffer) { + c->first_buffer = false; + c->last_timestamp = *rtp_tstamp; + } else { + /* The RTP clock -> time domain -> RTP clock transformation above might + * add a ±1 rounding error, so let's get rid of that */ + uint32_t expected = c->last_timestamp + (uint32_t) (data_len / pa_rtp_context_get_frame_size(c)); + int delta = *rtp_tstamp - expected; + + if (delta == 1 || delta == -1) + *rtp_tstamp -= delta; + + c->last_timestamp = *rtp_tstamp; + } + + gst_buffer_list_unref(buf_list); + gst_object_unref(adapter); + + return 0; + +fail: + if (adapter) + gst_object_unref(adapter); + + if (chunk->memblock) + pa_memblock_unref(chunk->memblock); + + return -1; +} + +void pa_rtp_context_free(pa_rtp_context *c) { + pa_assert(c); + + if (c->meta_reference) + gst_caps_unref(c->meta_reference); + + if (c->appsrc) { + gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc)); + gst_object_unref(c->appsrc); + pa_xfree(c->send_buf); + } + + if (c->appsink) + gst_object_unref(c->appsink); + + if (c->pipeline) { + gst_element_set_state(c->pipeline, GST_STATE_NULL); + gst_object_unref(c->pipeline); + } + + if (c->fdsem) + pa_fdsem_free(c->fdsem); + + pa_xfree(c); +} + +pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) { + return pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_LATE, c->fdsem); +} + +size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) { + return pa_frame_size(&c->ss); +} diff --git a/src/modules/rtp/rtp-native.c b/src/modules/rtp/rtp-native.c new file mode 100644 index 0000000..01e668c --- /dev/null +++ b/src/modules/rtp/rtp-native.c @@ -0,0 +1,404 @@ +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> +#include <sys/ioctl.h> + +#ifdef HAVE_SYS_FILIO_H +#include <sys/filio.h> +#endif + +#ifdef HAVE_SYS_UIO_H +#include <sys/uio.h> +#endif + +#include <pulsecore/core-error.h> +#include <pulsecore/log.h> +#include <pulsecore/macro.h> +#include <pulsecore/core-util.h> +#include <pulsecore/arpa-inet.h> +#include <pulsecore/poll.h> + +#include "rtp.h" + +typedef struct pa_rtp_context { + int fd; + uint16_t sequence; + uint32_t timestamp; + uint32_t ssrc; + uint8_t payload; + size_t frame_size; + size_t mtu; + + uint8_t *recv_buf; + size_t recv_buf_size; + pa_memchunk memchunk; +} pa_rtp_context; + +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) { + pa_rtp_context *c; + + pa_assert(fd >= 0); + + pa_log_info("Initialising native RTP backend for send"); + + c = pa_xnew0(pa_rtp_context, 1); + + c->fd = fd; + c->sequence = (uint16_t) (rand()*rand()); + c->timestamp = 0; + c->ssrc = (uint32_t) (rand()*rand()); + c->payload = (uint8_t) (payload & 127U); + c->frame_size = pa_frame_size(ss); + c->mtu = mtu; + + c->recv_buf = NULL; + c->recv_buf_size = 0; + pa_memchunk_reset(&c->memchunk); + + return c; +} + +#define MAX_IOVECS 16 + +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { + struct iovec iov[MAX_IOVECS]; + pa_memblock* mb[MAX_IOVECS]; + int iov_idx = 1; + size_t n = 0; + + pa_assert(c); + pa_assert(q); + + if (pa_memblockq_get_length(q) < c->mtu) + return 0; + + for (;;) { + int r; + pa_memchunk chunk; + + pa_memchunk_reset(&chunk); + + if ((r = pa_memblockq_peek(q, &chunk)) >= 0) { + + size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length; + + pa_assert(chunk.memblock); + + iov[iov_idx].iov_base = pa_memblock_acquire_chunk(&chunk); + iov[iov_idx].iov_len = k; + mb[iov_idx] = chunk.memblock; + iov_idx ++; + + n += k; + pa_memblockq_drop(q, k); + } + + pa_assert(n % c->frame_size == 0); + + if (r < 0 || n >= c->mtu || iov_idx >= MAX_IOVECS) { + uint32_t header[3]; + struct msghdr m; + ssize_t k; + int i; + + if (n > 0) { + header[0] = htonl(((uint32_t) 2 << 30) | ((uint32_t) c->payload << 16) | ((uint32_t) c->sequence)); + header[1] = htonl(c->timestamp); + header[2] = htonl(c->ssrc); + + iov[0].iov_base = (void*)header; + iov[0].iov_len = sizeof(header); + + m.msg_name = NULL; + m.msg_namelen = 0; + m.msg_iov = iov; + m.msg_iovlen = (size_t) iov_idx; + m.msg_control = NULL; + m.msg_controllen = 0; + m.msg_flags = 0; + + k = sendmsg(c->fd, &m, MSG_DONTWAIT); + + for (i = 1; i < iov_idx; i++) { + pa_memblock_release(mb[i]); + pa_memblock_unref(mb[i]); + } + + c->sequence++; + } else + k = 0; + + c->timestamp += (unsigned) (n/c->frame_size); + + if (k < 0) { + if (errno != EAGAIN && errno != EINTR) /* If the queue is full, just ignore it */ + pa_log("sendmsg() failed: %s", pa_cstrerror(errno)); + return -1; + } + + if (r < 0 || pa_memblockq_get_length(q) < c->mtu) + break; + + n = 0; + iov_idx = 1; + } + } + + return 0; +} + +pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) { + pa_rtp_context *c; + + pa_log_info("Initialising native RTP backend for receive"); + + c = pa_xnew0(pa_rtp_context, 1); + + c->fd = fd; + c->payload = payload; + c->frame_size = pa_frame_size(ss); + + c->recv_buf_size = 2000; + c->recv_buf = pa_xmalloc(c->recv_buf_size); + pa_memchunk_reset(&c->memchunk); + + return c; +} + +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) { + int size; + size_t audio_length; + size_t metadata_length; + struct msghdr m; + struct cmsghdr *cm; + struct iovec iov; + uint32_t header; + uint32_t ssrc; + uint8_t payload; + unsigned cc; + ssize_t r; + uint8_t aux[1024]; + bool found_tstamp = false; + + pa_assert(c); + pa_assert(chunk); + + pa_memchunk_reset(chunk); + + if (ioctl(c->fd, FIONREAD, &size) < 0) { + pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (size <= 0) { + /* size can be 0 due to any of the following reasons: + * + * 1. Somebody sent us a perfectly valid zero-length UDP packet. + * 2. Somebody sent us a UDP packet with a bad CRC. + * + * It is unknown whether size can actually be less than zero. + * + * In the first case, the packet has to be read out, otherwise the + * kernel will tell us again and again about it, thus preventing + * reception of any further packets. So let's just read it out + * now and discard it later, when comparing the number of bytes + * received (0) with the number of bytes wanted (1, see below). + * + * In the second case, recvmsg() will fail, thus allowing us to + * return the error. + * + * Just to avoid passing zero-sized memchunks and NULL pointers to + * recvmsg(), let's force allocation of at least one byte by setting + * size to 1. + */ + size = 1; + } + + if (c->recv_buf_size < (size_t) size) { + do + c->recv_buf_size *= 2; + while (c->recv_buf_size < (size_t) size); + + c->recv_buf = pa_xrealloc(c->recv_buf, c->recv_buf_size); + } + + pa_assert(c->recv_buf_size >= (size_t) size); + + iov.iov_base = c->recv_buf; + iov.iov_len = (size_t) size; + + m.msg_name = NULL; + m.msg_namelen = 0; + m.msg_iov = &iov; + m.msg_iovlen = 1; + m.msg_control = aux; + m.msg_controllen = sizeof(aux); + m.msg_flags = 0; + + r = recvmsg(c->fd, &m, 0); + + if (r != size) { + if (r < 0 && errno != EAGAIN && errno != EINTR) + pa_log_warn("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch"); + + goto fail; + } + + if (size < 12) { + pa_log_warn("RTP packet too short."); + goto fail; + } + + memcpy(&header, iov.iov_base, sizeof(uint32_t)); + memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t)); + memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t)); + + header = ntohl(header); + *rtp_tstamp = ntohl(*rtp_tstamp); + ssrc = ntohl(c->ssrc); + + if ((header >> 30) != 2) { + pa_log_warn("Unsupported RTP version."); + goto fail; + } + + if ((header >> 29) & 1) { + pa_log_warn("RTP padding not supported."); + goto fail; + } + + if ((header >> 28) & 1) { + pa_log_warn("RTP header extensions not supported."); + goto fail; + } + + if (ssrc != c->ssrc) { + pa_log_debug("Got unexpected SSRC"); + goto fail; + } + + cc = (header >> 24) & 0xF; + payload = (uint8_t) ((header >> 16) & 127U); + c->sequence = (uint16_t) (header & 0xFFFFU); + + metadata_length = 12 + cc * 4; + + if (payload != c->payload) { + pa_log_debug("Got unexpected payload: %u", payload); + goto fail; + } + + if (metadata_length > (unsigned) size) { + pa_log_warn("RTP packet too short. (CSRC)"); + goto fail; + } + + audio_length = size - metadata_length; + + if (audio_length % c->frame_size != 0) { + pa_log_warn("Bad RTP packet size."); + goto fail; + } + + if (c->memchunk.length < (unsigned) audio_length) { + size_t l; + + if (c->memchunk.memblock) + pa_memblock_unref(c->memchunk.memblock); + + l = PA_MAX((size_t) audio_length, pa_mempool_block_size_max(pool)); + + c->memchunk.memblock = pa_memblock_new(pool, l); + c->memchunk.index = 0; + c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock); + } + + memcpy(pa_memblock_acquire_chunk(&c->memchunk), c->recv_buf + metadata_length, audio_length); + pa_memblock_release(c->memchunk.memblock); + + chunk->memblock = pa_memblock_ref(c->memchunk.memblock); + chunk->index = c->memchunk.index; + chunk->length = audio_length; + + c->memchunk.index += audio_length; + c->memchunk.length -= audio_length; + + if (c->memchunk.length <= 0) { + pa_memblock_unref(c->memchunk.memblock); + pa_memchunk_reset(&c->memchunk); + } + + for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm)) + if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_TIMESTAMP) { + memcpy(tstamp, CMSG_DATA(cm), sizeof(struct timeval)); + found_tstamp = true; + break; + } + + if (!found_tstamp) { + pa_log_warn("Couldn't find SCM_TIMESTAMP data in auxiliary recvmsg() data!"); + pa_zero(*tstamp); + } + + return 0; + +fail: + if (chunk->memblock) + pa_memblock_unref(chunk->memblock); + + return -1; +} + +void pa_rtp_context_free(pa_rtp_context *c) { + pa_assert(c); + + pa_assert_se(pa_close(c->fd) == 0); + + if (c->memchunk.memblock) + pa_memblock_unref(c->memchunk.memblock); + + pa_xfree(c->recv_buf); + pa_xfree(c); +} + +size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) { + return c->frame_size; +} + +pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) { + pa_rtpoll_item *item; + struct pollfd *p; + + item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1); + + p = pa_rtpoll_item_get_pollfd(item, NULL); + p->fd = c->fd; + p->events = POLLIN; + p->revents = 0; + + return item; +} diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h new file mode 100644 index 0000000..372df75 --- /dev/null +++ b/src/modules/rtp/rtp.h @@ -0,0 +1,56 @@ +#ifndef foortphfoo +#define foortphfoo + +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <pulsecore/memblockq.h> +#include <pulsecore/memchunk.h> +#include <pulsecore/rtpoll.h> + +typedef struct pa_rtp_context pa_rtp_context; + +int pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, size_t frame_size); +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss); + +/* If the memblockq doesn't have a silence memchunk set, then the caller must + * guarantee that the current read index doesn't point to a hole. */ +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q); + +pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss); +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp); + +void pa_rtp_context_free(pa_rtp_context *c); + +size_t pa_rtp_context_get_frame_size(pa_rtp_context *c); +pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll); + +pa_sample_spec* pa_rtp_sample_spec_fixup(pa_sample_spec *ss); +int pa_rtp_sample_spec_valid(const pa_sample_spec *ss); + +uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss); +pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss); + +const char* pa_rtp_format_to_string(pa_sample_format_t f); +pa_sample_format_t pa_rtp_string_to_format(const char *s); + +#endif diff --git a/src/modules/rtp/rtsp_client.c b/src/modules/rtp/rtsp_client.c new file mode 100644 index 0000000..9fd386a --- /dev/null +++ b/src/modules/rtp/rtsp_client.c @@ -0,0 +1,643 @@ +/*** + This file is part of PulseAudio. + + Copyright 2008 Colin Guthrie + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> +#include <sys/ioctl.h> +#include <netinet/in.h> +#include <pulse/rtclock.h> +#include <pulse/timeval.h> + +#ifdef HAVE_SYS_FILIO_H +#include <sys/filio.h> +#endif + +#include <pulse/xmalloc.h> + +#include <pulsecore/core-error.h> +#include <pulsecore/core-util.h> +#include <pulsecore/log.h> +#include <pulsecore/macro.h> +#include <pulsecore/strbuf.h> +#include <pulsecore/ioline.h> +#include <pulsecore/arpa-inet.h> +#include <pulsecore/random.h> +#include <pulsecore/core-rtclock.h> + +#include "rtsp_client.h" + +#define RECONNECT_INTERVAL (5 * PA_USEC_PER_SEC) + +struct pa_rtsp_client { + pa_mainloop_api *mainloop; + char *hostname; + uint16_t port; + + pa_socket_client *sc; + pa_ioline *ioline; + + pa_rtsp_cb_t callback; + + void *userdata; + const char *useragent; + + pa_rtsp_state_t state; + pa_rtsp_status_t status; + uint8_t waiting; + + pa_headerlist* headers; + char *last_header; + pa_strbuf *header_buffer; + pa_headerlist* response_headers; + + char *localip; + char *url; + uint16_t rtp_port; + uint32_t cseq; + char *session; + char *transport; + pa_time_event *reconnect_event; + bool autoreconnect; +}; + +pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect) { + pa_rtsp_client *c; + + pa_assert(mainloop); + pa_assert(hostname); + pa_assert(port > 0); + + c = pa_xnew0(pa_rtsp_client, 1); + c->mainloop = mainloop; + c->hostname = pa_xstrdup(hostname); + c->port = port; + c->headers = pa_headerlist_new(); + + if (useragent) + c->useragent = useragent; + else + c->useragent = "PulseAudio RTSP Client"; + + c->autoreconnect = autoreconnect; + return c; +} + +static void free_events(pa_rtsp_client *c) { + pa_assert(c); + + if (c->reconnect_event) { + c->mainloop->time_free(c->reconnect_event); + c->reconnect_event = NULL; + } +} + +void pa_rtsp_client_free(pa_rtsp_client *c) { + pa_assert(c); + + free_events(c); + if (c->sc) + pa_socket_client_unref(c->sc); + + pa_rtsp_disconnect(c); + + pa_xfree(c->hostname); + pa_xfree(c->url); + pa_xfree(c->localip); + pa_xfree(c->session); + pa_xfree(c->transport); + pa_xfree(c->last_header); + if (c->header_buffer) + pa_strbuf_free(c->header_buffer); + if (c->response_headers) + pa_headerlist_free(c->response_headers); + pa_headerlist_free(c->headers); + + pa_xfree(c); +} + +static void headers_read(pa_rtsp_client *c) { + char delimiters[] = ";"; + char* token; + + pa_assert(c); + pa_assert(c->response_headers); + pa_assert(c->callback); + + /* Deal with a SETUP response */ + if (STATE_SETUP == c->state) { + const char* token_state = NULL; + const char* pc = NULL; + c->session = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Session")); + c->transport = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Transport")); + + if (!c->session || !c->transport) { + pa_log("Invalid SETUP response."); + return; + } + + /* Now parse out the server port component of the response. */ + while ((token = pa_split(c->transport, delimiters, &token_state))) { + if ((pc = strchr(token, '='))) { + if (0 == strncmp(token, "server_port", 11)) { + uint32_t p; + + if (pa_atou(pc + 1, &p) < 0 || p <= 0 || p > 0xffff) { + pa_log("Invalid SETUP response (invalid server_port)."); + pa_xfree(token); + return; + } + + c->rtp_port = p; + pa_xfree(token); + break; + } + } + pa_xfree(token); + } + if (0 == c->rtp_port) { + /* Error no server_port in response */ + pa_log("Invalid SETUP response (no port number)."); + return; + } + } + + /* Call our callback */ + c->callback(c, c->state, c->status, c->response_headers, c->userdata); +} + +static void line_callback(pa_ioline *line, const char *s, void *userdata) { + pa_rtsp_client *c = userdata; + char *delimpos; + char *s2, *s2p; + + pa_assert(line); + pa_assert(c); + pa_assert(c->callback); + + if (!s) { + /* Keep the ioline/iochannel open as they will be freed automatically */ + c->ioline = NULL; + c->callback(c, STATE_DISCONNECTED, STATUS_NO_RESPONSE, NULL, c->userdata); + return; + } + + s2 = pa_xstrdup(s); + /* Trim trailing carriage returns */ + s2p = s2 + strlen(s2) - 1; + while (s2p >= s2 && '\r' == *s2p) { + *s2p = '\0'; + s2p -= 1; + } + + if (c->waiting && pa_streq(s2, "RTSP/1.0 200 OK")) { + if (c->response_headers) + pa_headerlist_free(c->response_headers); + c->response_headers = pa_headerlist_new(); + + c->status = STATUS_OK; + c->waiting = 0; + goto exit; + } else if (c->waiting && pa_streq(s2, "RTSP/1.0 401 Unauthorized")) { + if (c->response_headers) + pa_headerlist_free(c->response_headers); + c->response_headers = pa_headerlist_new(); + + c->status = STATUS_UNAUTHORIZED; + c->waiting = 0; + goto exit; + } else if (c->waiting) { + pa_log_warn("Unexpected/Unhandled response: %s", s2); + + if (pa_streq(s2, "RTSP/1.0 400 Bad Request")) + c->status = STATUS_BAD_REQUEST; + else if (pa_streq(s2, "RTSP/1.0 500 Internal Server Error")) + c->status = STATUS_INTERNAL_ERROR; + else + c->status = STATUS_NO_RESPONSE; + goto exit; + } + + if (!strlen(s2)) { + /* End of headers */ + /* We will have a header left from our looping iteration, so add it in :) */ + if (c->last_header) { + char *tmp = pa_strbuf_to_string_free(c->header_buffer); + /* This is not a continuation header so let's dump it into our proplist */ + pa_headerlist_puts(c->response_headers, c->last_header, tmp); + pa_xfree(tmp); + pa_xfree(c->last_header); + c->last_header = NULL; + c->header_buffer = NULL; + } + + pa_log_debug("Full response received. Dispatching"); + headers_read(c); + goto exit; + } + + /* Read and parse a header (we know it's not empty) */ + /* TODO: Move header reading into the headerlist. */ + + /* If the first character is a space, it's a continuation header */ + if (c->last_header && ' ' == s2[0]) { + pa_assert(c->header_buffer); + + /* Add this line to the buffer (sans the space) */ + pa_strbuf_puts(c->header_buffer, &(s2[1])); + goto exit; + } + + if (c->last_header) { + char *tmp = pa_strbuf_to_string_free(c->header_buffer); + /* This is not a continuation header so let's dump the full + header/value into our proplist */ + pa_headerlist_puts(c->response_headers, c->last_header, tmp); + pa_xfree(tmp); + pa_xfree(c->last_header); + c->last_header = NULL; + c->header_buffer = NULL; + } + + delimpos = strstr(s2, ":"); + if (!delimpos) { + pa_log_warn("Unexpected response when expecting header: %s", s); + goto exit; + } + + pa_assert(!c->header_buffer); + pa_assert(!c->last_header); + + c->header_buffer = pa_strbuf_new(); + if (strlen(delimpos) > 1) { + /* Cut our line off so we can copy the header name out */ + *delimpos++ = '\0'; + + /* Trim the front of any spaces */ + while (' ' == *delimpos) + ++delimpos; + + pa_strbuf_puts(c->header_buffer, delimpos); + } else { + /* Cut our line off so we can copy the header name out */ + *delimpos = '\0'; + } + + /* Save the header name */ + c->last_header = pa_xstrdup(s2); + + exit: + pa_xfree(s2); +} + +static void reconnect_cb(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) { + if (userdata) { + pa_rtsp_client *c = userdata; + pa_rtsp_connect(c); + } +} + +static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) { + pa_rtsp_client *c = userdata; + union { + struct sockaddr sa; + struct sockaddr_in in; + struct sockaddr_in6 in6; + } sa; + socklen_t sa_len = sizeof(sa); + + pa_assert(sc); + pa_assert(c); + pa_assert(STATE_CONNECT == c->state); + pa_assert(c->sc == sc); + pa_socket_client_unref(c->sc); + c->sc = NULL; + + if (!io) { + if (c->autoreconnect) { + struct timeval tv; + + pa_log_warn("Connection to server %s:%d failed: %s - will try later", c->hostname, c->port, pa_cstrerror(errno)); + + if (!c->reconnect_event) + c->reconnect_event = c->mainloop->time_new(c->mainloop, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true), reconnect_cb, c); + else + c->mainloop->time_restart(c->reconnect_event, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true)); + } else { + pa_log("Connection to server %s:%d failed: %s", c->hostname, c->port, pa_cstrerror(errno)); + } + return; + } + pa_assert(!c->ioline); + + c->ioline = pa_ioline_new(io); + pa_ioline_set_callback(c->ioline, line_callback, c); + + /* Get the local IP address for use externally */ + if (0 == getsockname(pa_iochannel_get_recv_fd(io), &sa.sa, &sa_len)) { + char buf[INET6_ADDRSTRLEN]; + const char *res = NULL; + + if (AF_INET == sa.sa.sa_family) { + if ((res = inet_ntop(sa.sa.sa_family, &sa.in.sin_addr, buf, sizeof(buf)))) { + c->localip = pa_xstrdup(res); + } + } else if (AF_INET6 == sa.sa.sa_family) { + if ((res = inet_ntop(AF_INET6, &sa.in6.sin6_addr, buf, sizeof(buf)))) { + c->localip = pa_xstrdup(res); + } + } + } + pa_log_debug("Established RTSP connection from local ip %s", c->localip); + + if (c->callback) + c->callback(c, c->state, STATUS_OK, NULL, c->userdata); +} + +int pa_rtsp_connect(pa_rtsp_client *c) { + pa_assert(c); + pa_assert(!c->sc); + + pa_xfree(c->session); + c->session = NULL; + + pa_log_debug("Attempting to connect to server '%s:%d'", c->hostname, c->port); + if (!(c->sc = pa_socket_client_new_string(c->mainloop, true, c->hostname, c->port))) { + pa_log("failed to connect to server '%s:%d'", c->hostname, c->port); + return -1; + } + + pa_socket_client_set_callback(c->sc, on_connection, c); + c->waiting = 1; + c->state = STATE_CONNECT; + c->status = STATUS_NO_RESPONSE; + return 0; +} + +void pa_rtsp_set_callback(pa_rtsp_client *c, pa_rtsp_cb_t callback, void *userdata) { + pa_assert(c); + + c->callback = callback; + c->userdata = userdata; +} + +void pa_rtsp_disconnect(pa_rtsp_client *c) { + pa_assert(c); + + if (c->ioline) { + pa_ioline_close(c->ioline); + pa_ioline_unref(c->ioline); + } + c->ioline = NULL; +} + +const char* pa_rtsp_localip(pa_rtsp_client *c) { + pa_assert(c); + + return c->localip; +} + +uint32_t pa_rtsp_serverport(pa_rtsp_client *c) { + pa_assert(c); + + return c->rtp_port; +} + +bool pa_rtsp_exec_ready(const pa_rtsp_client *c) { + pa_assert(c); + + return c->url != NULL && c->ioline != NULL; +} + +void pa_rtsp_set_url(pa_rtsp_client *c, const char *url) { + pa_assert(c); + + c->url = pa_xstrdup(url); +} + +bool pa_rtsp_has_header(pa_rtsp_client *c, const char *key) { + pa_assert(c); + pa_assert(key); + + return pa_headerlist_contains(c->headers, key); +} + +void pa_rtsp_add_header(pa_rtsp_client *c, const char *key, const char *value) { + pa_assert(c); + pa_assert(key); + pa_assert(value); + + pa_headerlist_puts(c->headers, key, value); +} + +const char* pa_rtsp_get_header(pa_rtsp_client *c, const char *key) { + pa_assert(c); + pa_assert(key); + + return pa_headerlist_gets(c->headers, key); +} + +void pa_rtsp_remove_header(pa_rtsp_client *c, const char *key) { + pa_assert(c); + pa_assert(key); + + pa_headerlist_remove(c->headers, key); +} + +static int rtsp_exec(pa_rtsp_client *c, const char *cmd, + const char *content_type, const char *content, + int expect_response, + pa_headerlist *headers) { + pa_strbuf *buf; + char *hdrs; + + pa_assert(c); + pa_assert(c->url); + pa_assert(cmd); + pa_assert(c->ioline); + + pa_log_debug("Sending command: %s", cmd); + + buf = pa_strbuf_new(); + pa_strbuf_printf(buf, "%s %s RTSP/1.0\r\nCSeq: %d\r\n", cmd, c->url, ++c->cseq); + if (c->session) + pa_strbuf_printf(buf, "Session: %s\r\n", c->session); + + /* Add the headers */ + if (headers) { + hdrs = pa_headerlist_to_string(headers); + pa_strbuf_puts(buf, hdrs); + pa_xfree(hdrs); + } + + if (content_type && content) { + pa_strbuf_printf(buf, "Content-Type: %s\r\nContent-Length: %d\r\n", + content_type, (int)strlen(content)); + } + + pa_strbuf_printf(buf, "User-Agent: %s\r\n", c->useragent); + + if (c->headers) { + hdrs = pa_headerlist_to_string(c->headers); + pa_strbuf_puts(buf, hdrs); + pa_xfree(hdrs); + } + + pa_strbuf_puts(buf, "\r\n"); + + if (content_type && content) { + pa_strbuf_puts(buf, content); + } + + /* Our packet is created... now we can send it :) */ + hdrs = pa_strbuf_to_string_free(buf); + /*pa_log_debug("Submitting request:"); + pa_log_debug(hdrs);*/ + pa_ioline_puts(c->ioline, hdrs); + pa_xfree(hdrs); + /* The command is sent we can configure the rtsp client structure to handle a new answer */ + c->waiting = 1; + return 0; +} + +int pa_rtsp_options(pa_rtsp_client *c) { + char *url; + int rv; + + pa_assert(c); + + url = c->url; + c->state = STATE_OPTIONS; + + c->url = (char *)"*"; + rv = rtsp_exec(c, "OPTIONS", NULL, NULL, 0, NULL); + + c->url = url; + return rv; +} + +int pa_rtsp_announce(pa_rtsp_client *c, const char *sdp) { + int rv; + + pa_assert(c); + + if (!sdp) + return -1; + + c->state = STATE_ANNOUNCE; + rv = rtsp_exec(c, "ANNOUNCE", "application/sdp", sdp, 1, NULL); + + return rv; +} + +int pa_rtsp_setup(pa_rtsp_client *c, const char *transport) { + pa_headerlist *headers; + int rv; + + pa_assert(c); + + headers = pa_headerlist_new(); + if (!transport) + pa_headerlist_puts(headers, "Transport", "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"); + else + pa_headerlist_puts(headers, "Transport", transport); + + c->state = STATE_SETUP; + rv = rtsp_exec(c, "SETUP", NULL, NULL, 1, headers); + + pa_headerlist_free(headers); + return rv; +} + +int pa_rtsp_record(pa_rtsp_client *c, uint16_t *seq, uint32_t *rtptime) { + pa_headerlist *headers; + char *info; + int rv; + + pa_assert(c); + + if (!c->session) { + /* No session in progress */ + return -1; + } + + pa_random(seq, sizeof(*seq)); + pa_random(rtptime, sizeof(*rtptime)); + + headers = pa_headerlist_new(); + pa_headerlist_puts(headers, "Range", "npt=0-"); + info = pa_sprintf_malloc("seq=%u;rtptime=%u", *seq, *rtptime); + pa_headerlist_puts(headers, "RTP-Info", info); + pa_xfree(info); + + c->state = STATE_RECORD; + rv = rtsp_exec(c, "RECORD", NULL, NULL, 1, headers); + + pa_headerlist_free(headers); + return rv; +} + +int pa_rtsp_setparameter(pa_rtsp_client *c, const char *param) { + int rv; + + pa_assert(c); + + if (!param) + return -1; + + c->state = STATE_SET_PARAMETER; + rv = rtsp_exec(c, "SET_PARAMETER", "text/parameters", param, 1, NULL); + + return rv; +} + +int pa_rtsp_flush(pa_rtsp_client *c, uint16_t seq, uint32_t rtptime) { + pa_headerlist* headers; + char *info; + int rv; + + pa_assert(c); + + headers = pa_headerlist_new(); + info = pa_sprintf_malloc("seq=%u;rtptime=%u", seq, rtptime); + pa_headerlist_puts(headers, "RTP-Info", info); + pa_xfree(info); + + c->state = STATE_FLUSH; + rv = rtsp_exec(c, "FLUSH", NULL, NULL, 1, headers); + + pa_headerlist_free(headers); + return rv; +} + +int pa_rtsp_teardown(pa_rtsp_client *c) { + int rv; + + pa_assert(c); + + c->state = STATE_TEARDOWN; + rv = rtsp_exec(c, "TEARDOWN", NULL, NULL, 0, NULL); + + return rv; +} diff --git a/src/modules/rtp/rtsp_client.h b/src/modules/rtp/rtsp_client.h new file mode 100644 index 0000000..2593085 --- /dev/null +++ b/src/modules/rtp/rtsp_client.h @@ -0,0 +1,83 @@ +#ifndef foortspclienthfoo +#define foortspclienthfoo + +/*** + This file is part of PulseAudio. + + Copyright 2008 Colin Guthrie + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netdb.h> + +#include <pulsecore/socket-client.h> +#include <pulse/mainloop-api.h> + +#include "headerlist.h" + +typedef struct pa_rtsp_client pa_rtsp_client; + +typedef enum pa_rtsp_state { + STATE_CONNECT, + STATE_OPTIONS, + STATE_ANNOUNCE, + STATE_SETUP, + STATE_RECORD, + STATE_SET_PARAMETER, + STATE_FLUSH, + STATE_TEARDOWN, + STATE_DISCONNECTED +} pa_rtsp_state_t; + +typedef enum pa_rtsp_status { + STATUS_OK = 200, + STATUS_BAD_REQUEST = 400, + STATUS_UNAUTHORIZED = 401, + STATUS_NO_RESPONSE = 444, + STATUS_INTERNAL_ERROR = 500 +} pa_rtsp_status_t; + +typedef void (*pa_rtsp_cb_t)(pa_rtsp_client *c, pa_rtsp_state_t state, pa_rtsp_status_t code, pa_headerlist *headers, void *userdata); + +pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect); +void pa_rtsp_client_free(pa_rtsp_client *c); + +int pa_rtsp_connect(pa_rtsp_client *c); +void pa_rtsp_set_callback(pa_rtsp_client *c, pa_rtsp_cb_t callback, void *userdata); +void pa_rtsp_disconnect(pa_rtsp_client *c); + +const char* pa_rtsp_localip(pa_rtsp_client *c); +uint32_t pa_rtsp_serverport(pa_rtsp_client *c); +bool pa_rtsp_exec_ready(const pa_rtsp_client *c); + +void pa_rtsp_set_url(pa_rtsp_client *c, const char *url); + +bool pa_rtsp_has_header(pa_rtsp_client *c, const char *key); +void pa_rtsp_add_header(pa_rtsp_client *c, const char *key, const char *value); +const char* pa_rtsp_get_header(pa_rtsp_client *c, const char *key); +void pa_rtsp_remove_header(pa_rtsp_client *c, const char *key); + +int pa_rtsp_options(pa_rtsp_client *c); +int pa_rtsp_announce(pa_rtsp_client *c, const char *sdp); +int pa_rtsp_setup(pa_rtsp_client *c, const char *transport); +int pa_rtsp_record(pa_rtsp_client *c, uint16_t *seq, uint32_t *rtptime); +int pa_rtsp_setparameter(pa_rtsp_client *c, const char *param); +int pa_rtsp_flush(pa_rtsp_client *c, uint16_t seq, uint32_t rtptime); +int pa_rtsp_teardown(pa_rtsp_client *c); + +#endif diff --git a/src/modules/rtp/sap.c b/src/modules/rtp/sap.c new file mode 100644 index 0000000..7fb1a38 --- /dev/null +++ b/src/modules/rtp/sap.c @@ -0,0 +1,235 @@ +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> +#include <sys/ioctl.h> + +#ifdef HAVE_SYS_FILIO_H +#include <sys/filio.h> +#endif + +#ifdef HAVE_SYS_UIO_H +#include <sys/uio.h> +#endif + +#include <pulse/xmalloc.h> + +#include <pulsecore/core-error.h> +#include <pulsecore/core-util.h> +#include <pulsecore/log.h> +#include <pulsecore/macro.h> +#include <pulsecore/arpa-inet.h> + +#include "sap.h" +#include "sdp.h" + +#define MIME_TYPE "application/sdp" + +pa_sap_context* pa_sap_context_init_send(pa_sap_context *c, int fd, char *sdp_data) { + pa_assert(c); + pa_assert(fd >= 0); + pa_assert(sdp_data); + + c->fd = fd; + c->sdp_data = sdp_data; + c->msg_id_hash = (uint16_t) (rand()*rand()); + + return c; +} + +void pa_sap_context_destroy(pa_sap_context *c) { + pa_assert(c); + + pa_close(c->fd); + pa_xfree(c->sdp_data); +} + +int pa_sap_send(pa_sap_context *c, bool goodbye) { + uint32_t header; + struct sockaddr_storage sa_buf; + struct sockaddr *sa = (struct sockaddr*) &sa_buf; + socklen_t salen = sizeof(sa_buf); + struct iovec iov[4]; + struct msghdr m; + ssize_t k; + + if (getsockname(c->fd, sa, &salen) < 0) { + pa_log("getsockname() failed: %s\n", pa_cstrerror(errno)); + return -1; + } + +#ifdef HAVE_IPV6 + pa_assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6); +#else + pa_assert(sa->sa_family == AF_INET); +#endif + + header = htonl(((uint32_t) 1 << 29) | +#ifdef HAVE_IPV6 + (sa->sa_family == AF_INET6 ? (uint32_t) 1 << 28 : 0) | +#endif + (goodbye ? (uint32_t) 1 << 26 : 0) | + (c->msg_id_hash)); + + iov[0].iov_base = &header; + iov[0].iov_len = sizeof(header); + + if (sa->sa_family == AF_INET) { + iov[1].iov_base = (void*) &((struct sockaddr_in*) sa)->sin_addr; + iov[1].iov_len = 4U; +#ifdef HAVE_IPV6 + } else { + iov[1].iov_base = (void*) &((struct sockaddr_in6*) sa)->sin6_addr; + iov[1].iov_len = 16U; +#endif + } + + iov[2].iov_base = (char*) MIME_TYPE; + iov[2].iov_len = sizeof(MIME_TYPE); + + iov[3].iov_base = c->sdp_data; + iov[3].iov_len = strlen(c->sdp_data); + + m.msg_name = NULL; + m.msg_namelen = 0; + m.msg_iov = iov; + m.msg_iovlen = 4; + m.msg_control = NULL; + m.msg_controllen = 0; + m.msg_flags = 0; + + if ((k = sendmsg(c->fd, &m, MSG_DONTWAIT)) < 0) + pa_log_warn("sendmsg() failed: %s\n", pa_cstrerror(errno)); + + return (int) k; +} + +pa_sap_context* pa_sap_context_init_recv(pa_sap_context *c, int fd) { + pa_assert(c); + pa_assert(fd >= 0); + + c->fd = fd; + c->sdp_data = NULL; + return c; +} + +int pa_sap_recv(pa_sap_context *c, bool *goodbye) { + struct msghdr m; + struct iovec iov; + int size; + char *buf = NULL, *e; + uint32_t header; + unsigned six, ac, k; + ssize_t r; + + pa_assert(c); + pa_assert(goodbye); + + if (ioctl(c->fd, FIONREAD, &size) < 0) { + pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno)); + goto fail; + } + + buf = pa_xnew(char, (unsigned) size+1); + buf[size] = 0; + + iov.iov_base = buf; + iov.iov_len = (size_t) size; + + m.msg_name = NULL; + m.msg_namelen = 0; + m.msg_iov = &iov; + m.msg_iovlen = 1; + m.msg_control = NULL; + m.msg_controllen = 0; + m.msg_flags = 0; + + if ((r = recvmsg(c->fd, &m, 0)) != size) { + pa_log_warn("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch"); + goto fail; + } + + if (size < 4) { + pa_log_warn("SAP packet too short."); + goto fail; + } + + memcpy(&header, buf, sizeof(uint32_t)); + header = ntohl(header); + + if (header >> 29 != 1) { + pa_log_warn("Unsupported SAP version."); + goto fail; + } + + if ((header >> 25) & 1) { + pa_log_warn("Encrypted SAP not supported."); + goto fail; + } + + if ((header >> 24) & 1) { + pa_log_warn("Compressed SAP not supported."); + goto fail; + } + + six = (header >> 28) & 1U; + ac = (header >> 16) & 0xFFU; + + k = 4 + (six ? 16U : 4U) + ac*4U; + if ((unsigned) size < k) { + pa_log_warn("SAP packet too short (AD)."); + goto fail; + } + + e = buf + k; + size -= (int) k; + + if ((unsigned) size >= sizeof(MIME_TYPE) && pa_streq(e, MIME_TYPE)) { + e += sizeof(MIME_TYPE); + size -= (int) sizeof(MIME_TYPE); + } else if ((unsigned) size < sizeof(PA_SDP_HEADER)-1 || strncmp(e, PA_SDP_HEADER, sizeof(PA_SDP_HEADER)-1)) { + pa_log_warn("Invalid SDP header."); + goto fail; + } + + if (c->sdp_data) + pa_xfree(c->sdp_data); + + c->sdp_data = pa_xstrndup(e, (unsigned) size); + pa_xfree(buf); + + *goodbye = !!((header >> 26) & 1); + + return 0; + +fail: + pa_xfree(buf); + + return -1; +} diff --git a/src/modules/rtp/sap.h b/src/modules/rtp/sap.h new file mode 100644 index 0000000..becb4ec --- /dev/null +++ b/src/modules/rtp/sap.h @@ -0,0 +1,44 @@ +#ifndef foosaphfoo +#define foosaphfoo + +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <pulsecore/memblockq.h> +#include <pulsecore/memchunk.h> + +typedef struct pa_sap_context { + int fd; + char *sdp_data; + + uint16_t msg_id_hash; +} pa_sap_context; + +pa_sap_context* pa_sap_context_init_send(pa_sap_context *c, int fd, char *sdp_data); +void pa_sap_context_destroy(pa_sap_context *c); + +int pa_sap_send(pa_sap_context *c, bool goodbye); + +pa_sap_context* pa_sap_context_init_recv(pa_sap_context *c, int fd); +int pa_sap_recv(pa_sap_context *c, bool *goodbye); + +#endif diff --git a/src/modules/rtp/sdp.c b/src/modules/rtp/sdp.c new file mode 100644 index 0000000..6a2e0c9 --- /dev/null +++ b/src/modules/rtp/sdp.c @@ -0,0 +1,256 @@ +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <time.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <string.h> + +#include <pulse/xmalloc.h> +#include <pulse/util.h> + +#include <pulsecore/core-util.h> +#include <pulsecore/log.h> +#include <pulsecore/macro.h> +#include <pulsecore/arpa-inet.h> + +#include "sdp.h" +#include "rtp.h" + +char *pa_sdp_build(int af, const void *src, const void *dst, const char *name, uint16_t port, uint8_t payload, const pa_sample_spec *ss) { + uint32_t ntp; + char buf_src[64], buf_dst[64], un[64]; + const char *u, *f; + + pa_assert(src); + pa_assert(dst); + +#ifdef HAVE_IPV6 + pa_assert(af == AF_INET || af == AF_INET6); +#else + pa_assert(af == AF_INET); +#endif + + pa_assert_se(f = pa_rtp_format_to_string(ss->format)); + + if (!(u = pa_get_user_name(un, sizeof(un)))) + u = "-"; + + ntp = (uint32_t) time(NULL) + 2208988800U; + + pa_assert_se(inet_ntop(af, src, buf_src, sizeof(buf_src))); + pa_assert_se(inet_ntop(af, dst, buf_dst, sizeof(buf_dst))); + + return pa_sprintf_malloc( + PA_SDP_HEADER + "o=%s %lu 0 IN %s %s\n" + "s=%s\n" + "c=IN %s %s\n" + "t=%lu 0\n" + "a=recvonly\n" + "m=audio %u RTP/AVP %i\n" + "a=rtpmap:%i %s/%u/%u\n" + "a=type:broadcast\n", + u, (unsigned long) ntp, af == AF_INET ? "IP4" : "IP6", buf_src, + name, + af == AF_INET ? "IP4" : "IP6", buf_dst, + (unsigned long) ntp, + port, payload, + payload, f, ss->rate, ss->channels); +} + +static pa_sample_spec *parse_sdp_sample_spec(pa_sample_spec *ss, char *c) { + unsigned rate, channels; + pa_assert(ss); + pa_assert(c); + + if (pa_startswith(c, "L16/")) { + ss->format = PA_SAMPLE_S16BE; + c += 4; + } else + return NULL; + + if (sscanf(c, "%u/%u", &rate, &channels) == 2) { + ss->rate = (uint32_t) rate; + ss->channels = (uint8_t) channels; + } else if (sscanf(c, "%u", &rate) == 2) { + ss->rate = (uint32_t) rate; + ss->channels = 1; + } else + return NULL; + + if (!pa_sample_spec_valid(ss)) + return NULL; + + return ss; +} + +pa_sdp_info *pa_sdp_parse(const char *t, pa_sdp_info *i, int is_goodbye) { + uint16_t port = 0; + bool ss_valid = false; + + pa_assert(t); + pa_assert(i); + + i->origin = i->session_name = NULL; + i->salen = 0; + i->payload = 255; + + if (!pa_startswith(t, PA_SDP_HEADER)) { + pa_log("Failed to parse SDP data: invalid header."); + goto fail; + } + + t += sizeof(PA_SDP_HEADER)-1; + + while (*t) { + size_t l; + + l = strcspn(t, "\n"); + + if (l <= 2) { + pa_log("Failed to parse SDP data: line too short: >%s<.", t); + goto fail; + } + + if (pa_startswith(t, "o=")) + i->origin = pa_xstrndup(t+2, l-2); + else if (pa_startswith(t, "s=")) + i->session_name = pa_xstrndup(t+2, l-2); + else if (pa_startswith(t, "c=IN IP4 ")) { + char a[64]; + size_t k; + + k = l-8 > sizeof(a) ? sizeof(a) : l-8; + + pa_strlcpy(a, t+9, k); + a[strcspn(a, "/")] = 0; + + if (inet_pton(AF_INET, a, &((struct sockaddr_in*) &i->sa)->sin_addr) <= 0) { + pa_log("Failed to parse SDP data: bad address: >%s<.", a); + goto fail; + } + + ((struct sockaddr_in*) &i->sa)->sin_family = AF_INET; + ((struct sockaddr_in*) &i->sa)->sin_port = 0; + i->salen = sizeof(struct sockaddr_in); +#ifdef HAVE_IPV6 + } else if (pa_startswith(t, "c=IN IP6 ")) { + char a[64]; + size_t k; + + k = l-8 > sizeof(a) ? sizeof(a) : l-8; + + pa_strlcpy(a, t+9, k); + a[strcspn(a, "/")] = 0; + + if (inet_pton(AF_INET6, a, &((struct sockaddr_in6*) &i->sa)->sin6_addr) <= 0) { + pa_log("Failed to parse SDP data: bad address: >%s<.", a); + goto fail; + } + + ((struct sockaddr_in6*) &i->sa)->sin6_family = AF_INET6; + ((struct sockaddr_in6*) &i->sa)->sin6_port = 0; + i->salen = sizeof(struct sockaddr_in6); +#endif + } else if (pa_startswith(t, "m=audio ")) { + + if (i->payload > 127) { + int _port, _payload; + + if (sscanf(t+8, "%i RTP/AVP %i", &_port, &_payload) == 2) { + + if (_port <= 0 || _port > 0xFFFF) { + pa_log("Failed to parse SDP data: invalid port %i.", _port); + goto fail; + } + + if (_payload < 0 || _payload > 127) { + pa_log("Failed to parse SDP data: invalid payload %i.", _payload); + goto fail; + } + + port = (uint16_t) _port; + i->payload = (uint8_t) _payload; + + if (pa_rtp_sample_spec_from_payload(i->payload, &i->sample_spec)) + ss_valid = true; + } + } + } else if (pa_startswith(t, "a=rtpmap:")) { + + if (i->payload <= 127) { + char c[64]; + int _payload; + int len; + + if (sscanf(t + 9, "%i %n", &_payload, &len) == 1) { + if (_payload < 0 || _payload > 127) { + pa_log("Failed to parse SDP data: invalid payload %i.", _payload); + goto fail; + } + if (_payload == i->payload) { + strncpy(c, t + 9 + len, 63); + c[63] = 0; + c[strcspn(c, "\n")] = 0; + + if (parse_sdp_sample_spec(&i->sample_spec, c)) + ss_valid = true; + } + } + } + } + + t += l; + + if (*t == '\n') + t++; + } + + if (!i->origin || (!is_goodbye && (!i->salen || i->payload > 127 || !ss_valid || port == 0))) { + pa_log("Failed to parse SDP data: missing data."); + goto fail; + } + + if (((struct sockaddr*) &i->sa)->sa_family == AF_INET) + ((struct sockaddr_in*) &i->sa)->sin_port = htons(port); + else + ((struct sockaddr_in6*) &i->sa)->sin6_port = htons(port); + + return i; + +fail: + pa_xfree(i->origin); + pa_xfree(i->session_name); + + return NULL; +} + +void pa_sdp_info_destroy(pa_sdp_info *i) { + pa_assert(i); + + pa_xfree(i->origin); + pa_xfree(i->session_name); +} diff --git a/src/modules/rtp/sdp.h b/src/modules/rtp/sdp.h new file mode 100644 index 0000000..5e9b8fe --- /dev/null +++ b/src/modules/rtp/sdp.h @@ -0,0 +1,48 @@ +#ifndef foosdphfoo +#define foosdphfoo + +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <sys/socket.h> +#include <sys/types.h> + +#include <pulse/sample.h> + +#define PA_SDP_HEADER "v=0\n" + +typedef struct pa_sdp_info { + char *origin; + char *session_name; + + struct sockaddr_storage sa; + socklen_t salen; + + pa_sample_spec sample_spec; + uint8_t payload; +} pa_sdp_info; + +char *pa_sdp_build(int af, const void *src, const void *dst, const char *name, uint16_t port, uint8_t payload, const pa_sample_spec *ss); + +pa_sdp_info *pa_sdp_parse(const char *t, pa_sdp_info *info, int is_goodbye); + +void pa_sdp_info_destroy(pa_sdp_info *i); + +#endif |