summaryrefslogtreecommitdiffstats
path: root/src/modules/module-protocol-pulse/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/module-protocol-pulse/stream.c')
-rw-r--r--src/modules/module-protocol-pulse/stream.c428
1 files changed, 428 insertions, 0 deletions
diff --git a/src/modules/module-protocol-pulse/stream.c b/src/modules/module-protocol-pulse/stream.c
new file mode 100644
index 0000000..59fb8a3
--- /dev/null
+++ b/src/modules/module-protocol-pulse/stream.c
@@ -0,0 +1,428 @@
+/* PipeWire
+ *
+ * Copyright © 2020 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <spa/utils/hook.h>
+#include <spa/utils/ringbuffer.h>
+#include <pipewire/log.h>
+#include <pipewire/loop.h>
+#include <pipewire/map.h>
+#include <pipewire/properties.h>
+#include <pipewire/stream.h>
+#include <pipewire/work-queue.h>
+
+#include "client.h"
+#include "commands.h"
+#include "defs.h"
+#include "internal.h"
+#include "log.h"
+#include "message.h"
+#include "reply.h"
+#include "stream.h"
+
+static int parse_frac(struct pw_properties *props, const char *key,
+ const struct spa_fraction *def, struct spa_fraction *res)
+{
+ const char *str;
+ if (props == NULL ||
+ (str = pw_properties_get(props, key)) == NULL ||
+ sscanf(str, "%u/%u", &res->num, &res->denom) != 2 ||
+ res->denom == 0) {
+ *res = *def;
+ }
+ return 0;
+}
+
+struct stream *stream_new(struct client *client, enum stream_type type, uint32_t create_tag,
+ const struct sample_spec *ss, const struct channel_map *map,
+ const struct buffer_attr *attr)
+{
+ int res;
+ struct defs *defs = &client->impl->defs;
+ const char *str;
+
+ struct stream *stream = calloc(1, sizeof(*stream));
+ if (stream == NULL)
+ return NULL;
+
+ stream->channel = pw_map_insert_new(&client->streams, stream);
+ if (stream->channel == SPA_ID_INVALID)
+ goto error_errno;
+
+ stream->impl = client->impl;
+ stream->client = client;
+ stream->type = type;
+ stream->create_tag = create_tag;
+ stream->ss = *ss;
+ stream->map = *map;
+ stream->attr = *attr;
+ spa_ringbuffer_init(&stream->ring);
+
+ stream->peer_index = SPA_ID_INVALID;
+
+ parse_frac(client->props, "pulse.min.req", &defs->min_req, &stream->min_req);
+ parse_frac(client->props, "pulse.min.frag", &defs->min_frag, &stream->min_frag);
+ parse_frac(client->props, "pulse.min.quantum", &defs->min_quantum, &stream->min_quantum);
+ parse_frac(client->props, "pulse.default.req", &defs->default_req, &stream->default_req);
+ parse_frac(client->props, "pulse.default.frag", &defs->default_frag, &stream->default_frag);
+ parse_frac(client->props, "pulse.default.tlength", &defs->default_tlength, &stream->default_tlength);
+ stream->idle_timeout_sec = defs->idle_timeout;
+ if ((str = pw_properties_get(client->props, "pulse.idle.timeout")) != NULL)
+ spa_atou32(str, &stream->idle_timeout_sec, 0);
+
+ switch (type) {
+ case STREAM_TYPE_RECORD:
+ stream->direction = PW_DIRECTION_INPUT;
+ break;
+ case STREAM_TYPE_PLAYBACK:
+ case STREAM_TYPE_UPLOAD:
+ stream->direction = PW_DIRECTION_OUTPUT;
+ break;
+ default:
+ spa_assert_not_reached();
+ }
+
+ return stream;
+
+error_errno:
+ res = errno;
+ free(stream);
+ errno = res;
+
+ return NULL;
+}
+
+void stream_free(struct stream *stream)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+
+ pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel);
+
+ if (stream->pending)
+ spa_list_remove(&stream->link);
+
+ if (stream->drain_tag)
+ reply_error(client, -1, stream->drain_tag, -ENOENT);
+
+ if (stream->killed)
+ stream_send_killed(stream);
+
+ if (stream->stream) {
+ spa_hook_remove(&stream->stream_listener);
+ pw_stream_disconnect(stream->stream);
+
+ /* force processing of all pending messages before we destroy
+ * the stream */
+ pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client);
+
+ pw_stream_destroy(stream->stream);
+ }
+ if (stream->channel != SPA_ID_INVALID)
+ pw_map_remove(&client->streams, stream->channel);
+
+ pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID);
+
+ if (stream->buffer)
+ free(stream->buffer);
+
+ pw_properties_free(stream->props);
+
+ free(stream);
+}
+
+void stream_flush(struct stream *stream)
+{
+ pw_stream_flush(stream->stream, false);
+
+ if (stream->type == STREAM_TYPE_PLAYBACK) {
+ stream->ring.writeindex = stream->ring.readindex;
+ stream->write_index = stream->read_index;
+
+ if (stream->attr.prebuf > 0)
+ stream->in_prebuf = true;
+
+ stream->playing_for = 0;
+ stream->underrun_for = -1;
+ stream->is_underrun = true;
+
+ stream_send_request(stream);
+ } else {
+ stream->ring.readindex = stream->ring.writeindex;
+ stream->read_index = stream->write_index;
+ }
+}
+
+static bool stream_prebuf_active(struct stream *stream, int32_t avail)
+{
+ if (stream->in_prebuf) {
+ if (avail >= (int32_t) stream->attr.prebuf)
+ stream->in_prebuf = false;
+ } else {
+ if (stream->attr.prebuf > 0 && avail <= 0)
+ stream->in_prebuf = true;
+ }
+ return stream->in_prebuf;
+}
+
+uint32_t stream_pop_missing(struct stream *stream)
+{
+ int64_t missing, avail;
+
+ avail = stream->write_index - stream->read_index;
+
+ missing = stream->attr.tlength;
+ missing -= stream->requested;
+ missing -= avail;
+
+ if (missing <= 0)
+ return 0;
+
+ if (missing < stream->attr.minreq && !stream_prebuf_active(stream, avail))
+ return 0;
+
+ stream->requested += missing;
+
+ return missing;
+}
+
+void stream_set_paused(struct stream *stream, bool paused, const char *reason)
+{
+ if (stream->is_paused == paused)
+ return;
+
+ if (reason && stream->client)
+ pw_log_info("%p: [%s] %s because of %s",
+ stream, stream->client->name,
+ paused ? "paused" : "resumed", reason);
+
+ stream->is_paused = paused;
+ pw_stream_set_active(stream->stream, !paused);
+}
+
+int stream_send_underflow(struct stream *stream, int64_t offset)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+ struct message *reply;
+
+ if (ratelimit_test(&impl->rate_limit, stream->timestamp, SPA_LOG_LEVEL_INFO)) {
+ pw_log_info("[%s]: UNDERFLOW channel:%u offset:%" PRIi64,
+ client->name, stream->channel, offset);
+ }
+
+ reply = message_alloc(impl, -1, 0);
+ message_put(reply,
+ TAG_U32, COMMAND_UNDERFLOW,
+ TAG_U32, -1,
+ TAG_U32, stream->channel,
+ TAG_INVALID);
+
+ if (client->version >= 23) {
+ message_put(reply,
+ TAG_S64, offset,
+ TAG_INVALID);
+ }
+
+ return client_queue_message(client, reply);
+}
+
+int stream_send_overflow(struct stream *stream)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+ struct message *reply;
+
+ pw_log_warn("client %p [%s]: stream %p OVERFLOW channel:%u",
+ client, client->name, stream, stream->channel);
+
+ reply = message_alloc(impl, -1, 0);
+ message_put(reply,
+ TAG_U32, COMMAND_OVERFLOW,
+ TAG_U32, -1,
+ TAG_U32, stream->channel,
+ TAG_INVALID);
+
+ return client_queue_message(client, reply);
+}
+
+int stream_send_killed(struct stream *stream)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+ struct message *reply;
+ uint32_t command;
+
+ command = stream->direction == PW_DIRECTION_OUTPUT ?
+ COMMAND_PLAYBACK_STREAM_KILLED :
+ COMMAND_RECORD_STREAM_KILLED;
+
+ pw_log_info("[%s]: %s channel:%u",
+ client->name, commands[command].name, stream->channel);
+
+ if (client->version < 23)
+ return 0;
+
+ reply = message_alloc(impl, -1, 0);
+ message_put(reply,
+ TAG_U32, command,
+ TAG_U32, -1,
+ TAG_U32, stream->channel,
+ TAG_INVALID);
+
+ return client_queue_message(client, reply);
+}
+
+int stream_send_started(struct stream *stream)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+ struct message *reply;
+
+ pw_log_debug("client %p [%s]: stream %p STARTED channel:%u",
+ client, client->name, stream, stream->channel);
+
+ reply = message_alloc(impl, -1, 0);
+ message_put(reply,
+ TAG_U32, COMMAND_STARTED,
+ TAG_U32, -1,
+ TAG_U32, stream->channel,
+ TAG_INVALID);
+
+ return client_queue_message(client, reply);
+}
+
+int stream_send_request(struct stream *stream)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+ struct message *msg;
+ uint32_t size;
+
+ size = stream_pop_missing(stream);
+ pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size);
+
+ if (size == 0)
+ return 0;
+
+ msg = message_alloc(impl, -1, 0);
+ message_put(msg,
+ TAG_U32, COMMAND_REQUEST,
+ TAG_U32, -1,
+ TAG_U32, stream->channel,
+ TAG_U32, size,
+ TAG_INVALID);
+
+ return client_queue_message(client, msg);
+}
+
+int stream_update_minreq(struct stream *stream, uint32_t minreq)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+ uint32_t old_tlength = stream->attr.tlength;
+ uint32_t new_tlength = minreq + 2 * stream->attr.minreq;
+ uint64_t lat_usec;
+
+ if (new_tlength <= old_tlength)
+ return 0;
+
+ if (new_tlength > MAXLENGTH)
+ new_tlength = MAXLENGTH;
+
+ stream->attr.tlength = new_tlength;
+ if (stream->attr.tlength > stream->attr.maxlength)
+ stream->attr.maxlength = stream->attr.tlength;
+
+ if (client->version >= 15) {
+ struct message *msg;
+
+ lat_usec = minreq * SPA_USEC_PER_SEC / stream->ss.rate;
+
+ msg = message_alloc(impl, -1, 0);
+ message_put(msg,
+ TAG_U32, COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED,
+ TAG_U32, -1,
+ TAG_U32, stream->channel,
+ TAG_U32, stream->attr.maxlength,
+ TAG_U32, stream->attr.tlength,
+ TAG_U32, stream->attr.prebuf,
+ TAG_U32, stream->attr.minreq,
+ TAG_USEC, lat_usec,
+ TAG_INVALID);
+ return client_queue_message(client, msg);
+ }
+ return 0;
+}
+
+int stream_send_moved(struct stream *stream, uint32_t peer_index, const char *peer_name)
+{
+ struct client *client = stream->client;
+ struct impl *impl = client->impl;
+ struct message *reply;
+ uint32_t command;
+
+ command = stream->direction == PW_DIRECTION_OUTPUT ?
+ COMMAND_PLAYBACK_STREAM_MOVED :
+ COMMAND_RECORD_STREAM_MOVED;
+
+ pw_log_info("client %p [%s]: stream %p %s channel:%u",
+ client, client->name, stream, commands[command].name,
+ stream->channel);
+
+ if (client->version < 12)
+ return 0;
+
+ reply = message_alloc(impl, -1, 0);
+ message_put(reply,
+ TAG_U32, command,
+ TAG_U32, -1,
+ TAG_U32, stream->channel,
+ TAG_U32, peer_index,
+ TAG_STRING, peer_name,
+ TAG_BOOLEAN, false, /* suspended */
+ TAG_INVALID);
+
+ if (client->version >= 13) {
+ if (command == COMMAND_PLAYBACK_STREAM_MOVED) {
+ message_put(reply,
+ TAG_U32, stream->attr.maxlength,
+ TAG_U32, stream->attr.tlength,
+ TAG_U32, stream->attr.prebuf,
+ TAG_U32, stream->attr.minreq,
+ TAG_USEC, stream->lat_usec,
+ TAG_INVALID);
+ } else {
+ message_put(reply,
+ TAG_U32, stream->attr.maxlength,
+ TAG_U32, stream->attr.fragsize,
+ TAG_USEC, stream->lat_usec,
+ TAG_INVALID);
+ }
+ }
+ return client_queue_message(client, reply);
+}