summaryrefslogtreecommitdiffstats
path: root/src/modules/module-protocol-pulse/client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/module-protocol-pulse/client.c')
-rw-r--r--src/modules/module-protocol-pulse/client.c390
1 files changed, 390 insertions, 0 deletions
diff --git a/src/modules/module-protocol-pulse/client.c b/src/modules/module-protocol-pulse/client.c
new file mode 100644
index 0000000..1e6d202
--- /dev/null
+++ b/src/modules/module-protocol-pulse/client.c
@@ -0,0 +1,390 @@
+/* PipeWire
+ *
+ * Copyright © 2020 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+
+#include <spa/utils/defs.h>
+#include <spa/utils/hook.h>
+#include <spa/utils/list.h>
+#include <pipewire/core.h>
+#include <pipewire/log.h>
+#include <pipewire/loop.h>
+#include <pipewire/map.h>
+#include <pipewire/properties.h>
+
+#include "client.h"
+#include "commands.h"
+#include "defs.h"
+#include "internal.h"
+#include "log.h"
+#include "manager.h"
+#include "message.h"
+#include "operation.h"
+#include "pending-sample.h"
+#include "server.h"
+#include "stream.h"
+
+#define client_emit_disconnect(c) spa_hook_list_call(&(c)->listener_list, struct client_events, disconnect, 0)
+
+struct client *client_new(struct server *server)
+{
+ struct client *client = calloc(1, sizeof(*client));
+ if (client == NULL)
+ return NULL;
+
+ client->ref = 1;
+ client->server = server;
+ client->impl = server->impl;
+ client->connect_tag = SPA_ID_INVALID;
+
+ pw_map_init(&client->streams, 16, 16);
+ spa_list_init(&client->out_messages);
+ spa_list_init(&client->operations);
+ spa_list_init(&client->pending_samples);
+ spa_list_init(&client->pending_streams);
+ spa_hook_list_init(&client->listener_list);
+
+ spa_list_append(&server->clients, &client->link);
+ server->n_clients++;
+
+ return client;
+}
+
+static int client_free_stream(void *item, void *data)
+{
+ struct stream *s = item;
+
+ stream_free(s);
+ return 0;
+}
+
+/*
+ * tries to detach the client from the server,
+ * but it does not drop the server's reference
+ */
+bool client_detach(struct client *client)
+{
+ struct impl *impl = client->impl;
+ struct server *server = client->server;
+
+ if (server == NULL)
+ return false;
+
+ pw_log_debug("client %p: detaching from server %p", client, server);
+
+ /* remove from the `server->clients` list */
+ spa_list_remove(&client->link);
+ spa_list_append(&impl->cleanup_clients, &client->link);
+
+ server->n_clients--;
+ if (server->wait_clients > 0 && --server->wait_clients == 0) {
+ int mask = server->source->mask;
+ SPA_FLAG_SET(mask, SPA_IO_IN);
+ pw_loop_update_io(impl->loop, server->source, mask);
+ }
+
+ client->server = NULL;
+
+ return true;
+}
+
+void client_disconnect(struct client *client)
+{
+ struct impl *impl = client->impl;
+
+ if (client->disconnect)
+ return;
+
+ client_emit_disconnect(client);
+
+ /* the client must be detached from the server to disconnect */
+ spa_assert(client->server == NULL);
+
+ client->disconnect = true;
+
+ pw_map_for_each(&client->streams, client_free_stream, client);
+
+ if (client->source) {
+ pw_loop_destroy_source(impl->loop, client->source);
+ client->source = NULL;
+ }
+
+ if (client->manager) {
+ pw_manager_destroy(client->manager);
+ client->manager = NULL;
+ }
+}
+
+void client_free(struct client *client)
+{
+ struct impl *impl = client->impl;
+ struct pending_sample *p;
+ struct message *msg;
+ struct operation *o;
+
+ pw_log_debug("client %p: free", client);
+
+ client_detach(client);
+ client_disconnect(client);
+
+ /* remove from the `impl->cleanup_clients` list */
+ spa_list_remove(&client->link);
+
+ spa_list_consume(p, &client->pending_samples, link)
+ pending_sample_free(p);
+
+ if (client->message)
+ message_free(client->message, false, false);
+
+ spa_list_consume(msg, &client->out_messages, link)
+ message_free(msg, true, false);
+
+ spa_list_consume(o, &client->operations, link)
+ operation_free(o);
+
+ if (client->core)
+ pw_core_disconnect(client->core);
+
+ pw_map_clear(&client->streams);
+
+ pw_work_queue_cancel(impl->work_queue, client, SPA_ID_INVALID);
+
+ free(client->default_sink);
+ free(client->default_source);
+
+ free(client->temporary_default_sink);
+ free(client->temporary_default_source);
+
+ pw_properties_free(client->props);
+ pw_properties_free(client->routes);
+
+ spa_hook_list_clean(&client->listener_list);
+
+ free(client);
+}
+
+int client_queue_message(struct client *client, struct message *msg)
+{
+ struct impl *impl = client->impl;
+ int res;
+
+ if (msg == NULL)
+ return -EINVAL;
+
+ if (client->disconnect) {
+ res = -ENOTCONN;
+ goto error;
+ }
+
+ if (msg->length == 0) {
+ res = 0;
+ goto error;
+ } else if (msg->length > msg->allocated) {
+ res = -ENOMEM;
+ goto error;
+ }
+
+ msg->offset = 0;
+ spa_list_append(&client->out_messages, &msg->link);
+
+ uint32_t mask = client->source->mask;
+ if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
+ SPA_FLAG_SET(mask, SPA_IO_OUT);
+ pw_loop_update_io(impl->loop, client->source, mask);
+ }
+
+ client->new_msg_since_last_flush = true;
+
+ return 0;
+
+error:
+ message_free(msg, false, false);
+ return res;
+}
+
+static int client_try_flush_messages(struct client *client)
+{
+ pw_log_trace("client %p: flushing", client);
+
+ spa_assert(!client->disconnect);
+
+ while (!spa_list_is_empty(&client->out_messages)) {
+ struct message *m = spa_list_first(&client->out_messages, struct message, link);
+ struct descriptor desc;
+ const void *data;
+ size_t size;
+
+ if (client->out_index < sizeof(desc)) {
+ desc.length = htonl(m->length);
+ desc.channel = htonl(m->channel);
+ desc.offset_hi = 0;
+ desc.offset_lo = 0;
+ desc.flags = 0;
+
+ data = SPA_PTROFF(&desc, client->out_index, void);
+ size = sizeof(desc) - client->out_index;
+ } else if (client->out_index < m->length + sizeof(desc)) {
+ uint32_t idx = client->out_index - sizeof(desc);
+ data = m->data + idx;
+ size = m->length - idx;
+ } else {
+ if (debug_messages && m->channel == SPA_ID_INVALID)
+ message_dump(SPA_LOG_LEVEL_INFO, m);
+ message_free(m, true, false);
+ client->out_index = 0;
+ continue;
+ }
+
+ while (true) {
+ ssize_t sent = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (sent < 0) {
+ int res = -errno;
+ if (res == -EINTR)
+ continue;
+ return res;
+ }
+ client->out_index += sent;
+ break;
+ }
+ }
+ return 0;
+}
+
+int client_flush_messages(struct client *client)
+{
+ client->new_msg_since_last_flush = false;
+
+ int res = client_try_flush_messages(client);
+ if (res >= 0) {
+ uint32_t mask = client->source->mask;
+
+ if (SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
+ SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
+ pw_loop_update_io(client->impl->loop, client->source, mask);
+ }
+ } else {
+ if (res != -EAGAIN && res != -EWOULDBLOCK)
+ return res;
+ }
+ return 0;
+}
+
+static bool drop_from_out_queue(struct client *client, struct message *m)
+{
+ spa_assert(!spa_list_is_empty(&client->out_messages));
+
+ struct message *first = spa_list_first(&client->out_messages, struct message, link);
+ if (m == first && client->out_index > 0)
+ return false;
+
+ message_free(m, true, false);
+
+ return true;
+}
+
+/* returns true if an event with the (mask, event, index) triplet should be dropped because it is redundant */
+static bool client_prune_subscribe_events(struct client *client, uint32_t event, uint32_t index)
+{
+ struct message *m, *t;
+
+ if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW)
+ return false;
+
+ /* NOTE: reverse iteration */
+ spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) {
+ if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT)
+ continue;
+ if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK)
+ continue;
+ if (m->extra[2] != index)
+ continue;
+
+ if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) {
+ /* This object is being removed, hence there is
+ * point in keeping the old events regarding
+ * entry in the queue. */
+
+ bool is_new = (m->extra[1] & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW;
+
+ if (drop_from_out_queue(client, m)) {
+ pw_log_debug("client %p: dropped redundant event due to remove event for object %u",
+ client, index);
+
+ /* if the NEW event for the current object could successfully be dropped,
+ there is no need to deliver the REMOVE event */
+ if (is_new)
+ goto drop;
+ }
+
+ /* stop if the NEW event for the current object is reached */
+ if (is_new)
+ break;
+ }
+
+ if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) {
+ /* This object has changed. If a "new" or "change" event for
+ * this object is still in the queue we can exit. */
+ goto drop;
+ }
+ }
+
+ return false;
+
+drop:
+ pw_log_debug("client %p: dropped redundant event for object %u", client, index);
+
+ return true;
+}
+
+int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t index)
+{
+ if (client->disconnect)
+ return -ENOTCONN;
+
+ if (!(client->subscribed & mask))
+ return 0;
+
+ pw_log_debug("client %p: SUBSCRIBE event:%08x index:%u", client, event, index);
+
+ if (client_prune_subscribe_events(client, event, index))
+ return 0;
+
+ struct message *reply = message_alloc(client->impl, -1, 0);
+ reply->extra[0] = COMMAND_SUBSCRIBE_EVENT;
+ reply->extra[1] = event;
+ reply->extra[2] = index;
+
+ message_put(reply,
+ TAG_U32, COMMAND_SUBSCRIBE_EVENT,
+ TAG_U32, -1,
+ TAG_U32, event,
+ TAG_U32, index,
+ TAG_INVALID);
+
+ return client_queue_message(client, reply);
+}