/* PipeWire * * Copyright © 2020 Wim Taymans * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice (including the next * paragraph) shall be included in all copies or substantial portions of the * Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. */ #include #include #include #include #include #include #include #include #include #include #include #include "client.h" #include "commands.h" #include "defs.h" #include "internal.h" #include "log.h" #include "message.h" #include "reply.h" #include "stream.h" static int parse_frac(struct pw_properties *props, const char *key, const struct spa_fraction *def, struct spa_fraction *res) { const char *str; if (props == NULL || (str = pw_properties_get(props, key)) == NULL || sscanf(str, "%u/%u", &res->num, &res->denom) != 2 || res->denom == 0) { *res = *def; } return 0; } struct stream *stream_new(struct client *client, enum stream_type type, uint32_t create_tag, const struct sample_spec *ss, const struct channel_map *map, const struct buffer_attr *attr) { int res; struct defs *defs = &client->impl->defs; const char *str; struct stream *stream = calloc(1, sizeof(*stream)); if (stream == NULL) return NULL; stream->channel = pw_map_insert_new(&client->streams, stream); if (stream->channel == SPA_ID_INVALID) goto error_errno; stream->impl = client->impl; stream->client = client; stream->type = type; stream->create_tag = create_tag; stream->ss = *ss; stream->map = *map; stream->attr = *attr; spa_ringbuffer_init(&stream->ring); stream->peer_index = SPA_ID_INVALID; parse_frac(client->props, "pulse.min.req", &defs->min_req, &stream->min_req); parse_frac(client->props, "pulse.min.frag", &defs->min_frag, &stream->min_frag); parse_frac(client->props, "pulse.min.quantum", &defs->min_quantum, &stream->min_quantum); parse_frac(client->props, "pulse.default.req", &defs->default_req, &stream->default_req); parse_frac(client->props, "pulse.default.frag", &defs->default_frag, &stream->default_frag); parse_frac(client->props, "pulse.default.tlength", &defs->default_tlength, &stream->default_tlength); stream->idle_timeout_sec = defs->idle_timeout; if ((str = pw_properties_get(client->props, "pulse.idle.timeout")) != NULL) spa_atou32(str, &stream->idle_timeout_sec, 0); switch (type) { case STREAM_TYPE_RECORD: stream->direction = PW_DIRECTION_INPUT; break; case STREAM_TYPE_PLAYBACK: case STREAM_TYPE_UPLOAD: stream->direction = PW_DIRECTION_OUTPUT; break; default: spa_assert_not_reached(); } return stream; error_errno: res = errno; free(stream); errno = res; return NULL; } void stream_free(struct stream *stream) { struct client *client = stream->client; struct impl *impl = client->impl; pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel); if (stream->pending) spa_list_remove(&stream->link); if (stream->drain_tag) reply_error(client, -1, stream->drain_tag, -ENOENT); if (stream->killed) stream_send_killed(stream); if (stream->stream) { spa_hook_remove(&stream->stream_listener); pw_stream_disconnect(stream->stream); /* force processing of all pending messages before we destroy * the stream */ pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client); pw_stream_destroy(stream->stream); } if (stream->channel != SPA_ID_INVALID) pw_map_remove(&client->streams, stream->channel); pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID); if (stream->buffer) free(stream->buffer); pw_properties_free(stream->props); free(stream); } void stream_flush(struct stream *stream) { pw_stream_flush(stream->stream, false); if (stream->type == STREAM_TYPE_PLAYBACK) { stream->ring.writeindex = stream->ring.readindex; stream->write_index = stream->read_index; if (stream->attr.prebuf > 0) stream->in_prebuf = true; stream->playing_for = 0; stream->underrun_for = -1; stream->is_underrun = true; stream_send_request(stream); } else { stream->ring.readindex = stream->ring.writeindex; stream->read_index = stream->write_index; } } static bool stream_prebuf_active(struct stream *stream, int32_t avail) { if (stream->in_prebuf) { if (avail >= (int32_t) stream->attr.prebuf) stream->in_prebuf = false; } else { if (stream->attr.prebuf > 0 && avail <= 0) stream->in_prebuf = true; } return stream->in_prebuf; } uint32_t stream_pop_missing(struct stream *stream) { int64_t missing, avail; avail = stream->write_index - stream->read_index; missing = stream->attr.tlength; missing -= stream->requested; missing -= avail; if (missing <= 0) return 0; if (missing < stream->attr.minreq && !stream_prebuf_active(stream, avail)) return 0; stream->requested += missing; return missing; } void stream_set_paused(struct stream *stream, bool paused, const char *reason) { if (stream->is_paused == paused) return; if (reason && stream->client) pw_log_info("%p: [%s] %s because of %s", stream, stream->client->name, paused ? "paused" : "resumed", reason); stream->is_paused = paused; pw_stream_set_active(stream->stream, !paused); } int stream_send_underflow(struct stream *stream, int64_t offset) { struct client *client = stream->client; struct impl *impl = client->impl; struct message *reply; if (ratelimit_test(&impl->rate_limit, stream->timestamp, SPA_LOG_LEVEL_INFO)) { pw_log_info("[%s]: UNDERFLOW channel:%u offset:%" PRIi64, client->name, stream->channel, offset); } reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, COMMAND_UNDERFLOW, TAG_U32, -1, TAG_U32, stream->channel, TAG_INVALID); if (client->version >= 23) { message_put(reply, TAG_S64, offset, TAG_INVALID); } return client_queue_message(client, reply); } int stream_send_overflow(struct stream *stream) { struct client *client = stream->client; struct impl *impl = client->impl; struct message *reply; pw_log_warn("client %p [%s]: stream %p OVERFLOW channel:%u", client, client->name, stream, stream->channel); reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, COMMAND_OVERFLOW, TAG_U32, -1, TAG_U32, stream->channel, TAG_INVALID); return client_queue_message(client, reply); } int stream_send_killed(struct stream *stream) { struct client *client = stream->client; struct impl *impl = client->impl; struct message *reply; uint32_t command; command = stream->direction == PW_DIRECTION_OUTPUT ? COMMAND_PLAYBACK_STREAM_KILLED : COMMAND_RECORD_STREAM_KILLED; pw_log_info("[%s]: %s channel:%u", client->name, commands[command].name, stream->channel); if (client->version < 23) return 0; reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, command, TAG_U32, -1, TAG_U32, stream->channel, TAG_INVALID); return client_queue_message(client, reply); } int stream_send_started(struct stream *stream) { struct client *client = stream->client; struct impl *impl = client->impl; struct message *reply; pw_log_debug("client %p [%s]: stream %p STARTED channel:%u", client, client->name, stream, stream->channel); reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, COMMAND_STARTED, TAG_U32, -1, TAG_U32, stream->channel, TAG_INVALID); return client_queue_message(client, reply); } int stream_send_request(struct stream *stream) { struct client *client = stream->client; struct impl *impl = client->impl; struct message *msg; uint32_t size; size = stream_pop_missing(stream); pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size); if (size == 0) return 0; msg = message_alloc(impl, -1, 0); message_put(msg, TAG_U32, COMMAND_REQUEST, TAG_U32, -1, TAG_U32, stream->channel, TAG_U32, size, TAG_INVALID); return client_queue_message(client, msg); } int stream_update_minreq(struct stream *stream, uint32_t minreq) { struct client *client = stream->client; struct impl *impl = client->impl; uint32_t old_tlength = stream->attr.tlength; uint32_t new_tlength = minreq + 2 * stream->attr.minreq; uint64_t lat_usec; if (new_tlength <= old_tlength) return 0; if (new_tlength > MAXLENGTH) new_tlength = MAXLENGTH; stream->attr.tlength = new_tlength; if (stream->attr.tlength > stream->attr.maxlength) stream->attr.maxlength = stream->attr.tlength; if (client->version >= 15) { struct message *msg; lat_usec = minreq * SPA_USEC_PER_SEC / stream->ss.rate; msg = message_alloc(impl, -1, 0); message_put(msg, TAG_U32, COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED, TAG_U32, -1, TAG_U32, stream->channel, TAG_U32, stream->attr.maxlength, TAG_U32, stream->attr.tlength, TAG_U32, stream->attr.prebuf, TAG_U32, stream->attr.minreq, TAG_USEC, lat_usec, TAG_INVALID); return client_queue_message(client, msg); } return 0; } int stream_send_moved(struct stream *stream, uint32_t peer_index, const char *peer_name) { struct client *client = stream->client; struct impl *impl = client->impl; struct message *reply; uint32_t command; command = stream->direction == PW_DIRECTION_OUTPUT ? COMMAND_PLAYBACK_STREAM_MOVED : COMMAND_RECORD_STREAM_MOVED; pw_log_info("client %p [%s]: stream %p %s channel:%u", client, client->name, stream, commands[command].name, stream->channel); if (client->version < 12) return 0; reply = message_alloc(impl, -1, 0); message_put(reply, TAG_U32, command, TAG_U32, -1, TAG_U32, stream->channel, TAG_U32, peer_index, TAG_STRING, peer_name, TAG_BOOLEAN, false, /* suspended */ TAG_INVALID); if (client->version >= 13) { if (command == COMMAND_PLAYBACK_STREAM_MOVED) { message_put(reply, TAG_U32, stream->attr.maxlength, TAG_U32, stream->attr.tlength, TAG_U32, stream->attr.prebuf, TAG_U32, stream->attr.minreq, TAG_USEC, stream->lat_usec, TAG_INVALID); } else { message_put(reply, TAG_U32, stream->attr.maxlength, TAG_U32, stream->attr.fragsize, TAG_USEC, stream->lat_usec, TAG_INVALID); } } return client_queue_message(client, reply); }