diff options
Diffstat (limited to '')
-rw-r--r-- | src/modules/module-client-node/v0/client-node.c | 1447 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/client-node.h | 101 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/ext-client-node.h | 414 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/protocol-native.c | 534 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/transport.c | 262 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/transport.h | 59 |
6 files changed, 2817 insertions, 0 deletions
diff --git a/src/modules/module-client-node/v0/client-node.c b/src/modules/module-client-node/v0/client-node.c new file mode 100644 index 0000000..e70a7c2 --- /dev/null +++ b/src/modules/module-client-node/v0/client-node.c @@ -0,0 +1,1447 @@ +/* PipeWire + * + * Copyright © 2015 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <string.h> +#include <stddef.h> +#include <stdio.h> +#include <errno.h> +#include <unistd.h> +#include <fcntl.h> +#include <dlfcn.h> +#include <sys/socket.h> +#include <sys/mman.h> + +#include <spa/node/node.h> +#include <spa/node/utils.h> +#include <spa/node/io.h> +#include <spa/pod/filter.h> +#include <spa/utils/keys.h> + +#define PW_ENABLE_DEPRECATED + +#include "pipewire/pipewire.h" +#include "pipewire/private.h" + +#include "pipewire/context.h" +#include "modules/spa/spa-node.h" +#include "client-node.h" +#include "transport.h" + +PW_LOG_TOPIC_EXTERN(mod_topic); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +/** \cond */ + +#define MAX_INPUTS 64 +#define MAX_OUTPUTS 64 + +#define MAX_BUFFERS 64 + +#define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS) +#define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS) +#define CHECK_PORT_ID(this,d,p) (CHECK_IN_PORT_ID(this,d,p) || CHECK_OUT_PORT_ID(this,d,p)) +#define CHECK_FREE_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && !(this)->in_ports[p].valid) +#define CHECK_FREE_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && !(this)->out_ports[p].valid) +#define CHECK_FREE_PORT(this,d,p) (CHECK_FREE_IN_PORT (this,d,p) || CHECK_FREE_OUT_PORT (this,d,p)) +#define CHECK_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && (this)->in_ports[p].valid) +#define CHECK_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->out_ports[p].valid) +#define CHECK_PORT(this,d,p) (CHECK_IN_PORT (this,d,p) || CHECK_OUT_PORT (this,d,p)) + +#define GET_IN_PORT(this,p) (&this->in_ports[p]) +#define GET_OUT_PORT(this,p) (&this->out_ports[p]) +#define GET_PORT(this,d,p) (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(this,p) : GET_OUT_PORT(this,p)) + +#define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers) + +extern uint32_t pw_protocol_native0_type_from_v2(struct pw_impl_client *client, uint32_t type); +extern uint32_t pw_protocol_native0_name_to_v2(struct pw_impl_client *client, const char *name); + +struct mem { + uint32_t id; + int ref; + int fd; + uint32_t type; + uint32_t flags; +}; + +struct buffer { + struct spa_buffer *outbuf; + struct spa_buffer buffer; + struct spa_meta metas[4]; + struct spa_data datas[4]; + bool outstanding; + uint32_t memid; +}; + +struct port { + uint32_t id; + enum spa_direction direction; + + bool valid; + struct spa_port_info info; + struct pw_properties *properties; + + bool have_format; + uint32_t n_params; + struct spa_pod **params; + struct spa_io_buffers *io; + + uint32_t n_buffers; + struct buffer buffers[MAX_BUFFERS]; +}; + +struct node { + struct spa_node node; + + struct impl *impl; + + struct spa_log *log; + struct spa_loop *data_loop; + struct spa_system *data_system; + + struct spa_hook_list hooks; + struct spa_callbacks callbacks; + + struct spa_io_position *position; + + struct pw_resource *resource; + + struct spa_source data_source; + int writefd; + + struct spa_node_info info; + + uint32_t n_inputs; + uint32_t n_outputs; + struct port in_ports[MAX_INPUTS]; + struct port out_ports[MAX_OUTPUTS]; + + uint32_t n_params; + struct spa_pod **params; + + uint32_t seq; + uint32_t init_pending; +}; + +struct impl { + struct pw_impl_client_node0 this; + + bool client_reuse; + + struct pw_context *context; + + struct node node; + + struct pw_client_node0_transport *transport; + + struct spa_hook node_listener; + struct spa_hook resource_listener; + struct spa_hook object_listener; + + struct pw_array mems; + + int fds[2]; + int other_fds[2]; + + uint32_t input_ready; + bool out_pending; +}; + +/** \endcond */ + +static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t flags) +{ + struct mem *m, *f = NULL; + + pw_array_for_each(m, &impl->mems) { + if (m->ref <= 0) + f = m; + else if (m->fd == fd) + goto found; + } + + if (f == NULL) { + m = pw_array_add(&impl->mems, sizeof(struct mem)); + m->id = pw_array_get_len(&impl->mems, struct mem) - 1; + m->ref = 0; + } + else { + m = f; + } + m->fd = fd; + m->type = type; + m->flags = flags; + + pw_client_node0_resource_add_mem(impl->node.resource, + m->id, + type, + m->fd, + m->flags); + found: + m->ref++; + return m; +} + + +static int clear_buffers(struct node *this, struct port *port) +{ + uint32_t i, j; + struct impl *impl = this->impl; + + for (i = 0; i < port->n_buffers; i++) { + struct buffer *b = &port->buffers[i]; + struct mem *m; + + spa_log_debug(this->log, "node %p: clear buffer %d", this, i); + + for (j = 0; j < b->buffer.n_datas; j++) { + struct spa_data *d = &b->datas[j]; + + if (d->type == SPA_DATA_DmaBuf || + d->type == SPA_DATA_MemFd) { + uint32_t id; + + id = SPA_PTR_TO_UINT32(b->buffer.datas[j].data); + m = pw_array_get_unchecked(&impl->mems, id, struct mem); + m->ref--; + } + } + m = pw_array_get_unchecked(&impl->mems, b->memid, struct mem); + m->ref--; + } + port->n_buffers = 0; + return 0; +} + +static void emit_port_info(struct node *this, struct port *port) +{ + spa_node_emit_port_info(&this->hooks, + port->direction, port->id, &port->info); +} + +static int impl_node_add_listener(void *object, + struct spa_hook *listener, + const struct spa_node_events *events, + void *data) +{ + struct node *this = object; + struct spa_hook_list save; + uint32_t i; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + spa_hook_list_isolate(&this->hooks, &save, listener, events, data); + + for (i = 0; i < MAX_INPUTS; i++) { + if (this->in_ports[i].valid) + emit_port_info(this, &this->in_ports[i]); + } + for (i = 0; i < MAX_OUTPUTS; i++) { + if (this->out_ports[i].valid) + emit_port_info(this, &this->out_ports[i]); + } + spa_hook_list_join(&this->hooks, &save); + + return 0; +} + +static int impl_node_enum_params(void *object, int seq, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + uint8_t buffer[1024]; + struct spa_pod_builder b = { 0 }; + struct spa_result_node_params result; + uint32_t count = 0; + bool found = false; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + + result.id = id; + result.next = 0; + + while (true) { + struct spa_pod *param; + + result.index = result.next++; + if (result.index >= this->n_params) + break; + + param = this->params[result.index]; + + if (param == NULL || !spa_pod_is_object_id(param, id)) + continue; + + found = true; + + if (result.index < start) + continue; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if (spa_pod_filter(&b, &result.param, param, filter) != 0) + continue; + + pw_log_debug("%p: %d param %u", this, seq, result.index); + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count == num) + break; + } + return found ? 0 : -ENOENT; +} + +static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + if (this->resource == NULL) + return -EIO; + + pw_client_node0_resource_set_param(this->resource, this->seq, id, flags, param); + + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) +{ + struct node *this = object; + int res = 0; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + switch(id) { + case SPA_IO_Position: + this->position = data; + break; + default: + res = -ENOTSUP; + break; + } + return res; +} + +static inline void do_flush(struct node *this) +{ + if (spa_system_eventfd_write(this->data_system, this->writefd, 1) < 0) + spa_log_warn(this->log, "node %p: error flushing : %s", this, strerror(errno)); + +} + +static int send_clock_update(struct node *this) +{ + struct pw_impl_client *client = this->resource->client; + uint32_t type = pw_protocol_native0_name_to_v2(client, SPA_TYPE_INFO_NODE_COMMAND_BASE "ClockUpdate"); + struct timespec ts; + int64_t now; + + clock_gettime(CLOCK_MONOTONIC, &ts); + now = SPA_TIMESPEC_TO_NSEC(&ts); + pw_log_trace("%p: now %"PRIi64, this, now); + + struct spa_command_node0_clock_update cu = + SPA_COMMAND_NODE0_CLOCK_UPDATE_INIT(type, + SPA_COMMAND_NODE0_CLOCK_UPDATE_TIME | + SPA_COMMAND_NODE0_CLOCK_UPDATE_SCALE | + SPA_COMMAND_NODE0_CLOCK_UPDATE_STATE | + SPA_COMMAND_NODE0_CLOCK_UPDATE_LATENCY, /* change_mask */ + SPA_USEC_PER_SEC, /* rate */ + now / SPA_NSEC_PER_USEC, /* ticks */ + now, /* monotonic_time */ + 0, /* offset */ + (1 << 16) | 1, /* scale */ + SPA_CLOCK0_STATE_RUNNING, /* state */ + SPA_COMMAND_NODE0_CLOCK_UPDATE_FLAG_LIVE, /* flags */ + 0); /* latency */ + + pw_client_node0_resource_command(this->resource, this->seq, (const struct spa_command*)&cu); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int impl_node_send_command(void *object, const struct spa_command *command) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(command != NULL, -EINVAL); + + if (this->resource == NULL) + return -EIO; + + if (SPA_NODE_COMMAND_ID(command) == SPA_NODE_COMMAND_Start) { + send_clock_update(this); + } + + pw_client_node0_resource_command(this->resource, this->seq, command); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_set_callbacks(void *object, + const struct spa_node_callbacks *callbacks, + void *data) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + this->callbacks = SPA_CALLBACKS_INIT(callbacks, data); + + return 0; +} + +static int +impl_node_sync(void *object, int seq) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + pw_log_debug("%p: sync %p", this, this->resource); + + if (this->resource == NULL) + return -EIO; + + this->init_pending = SPA_RESULT_RETURN_ASYNC(this->seq++); + + return this->init_pending; +} + + +extern struct spa_pod *pw_protocol_native0_pod_from_v2(struct pw_impl_client *client, const struct spa_pod *pod); +extern int pw_protocol_native0_pod_to_v2(struct pw_impl_client *client, const struct spa_pod *pod, + struct spa_pod_builder *b); + +static void +do_update_port(struct node *this, + enum spa_direction direction, + uint32_t port_id, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info) +{ + struct port *port; + + port = GET_PORT(this, direction, port_id); + + if (!port->valid) { + spa_log_debug(this->log, "node %p: adding port %d, direction %d", + this, port_id, direction); + port->id = port_id; + port->direction = direction; + port->have_format = false; + port->valid = true; + + if (direction == SPA_DIRECTION_INPUT) + this->n_inputs++; + else + this->n_outputs++; + } + + if (change_mask & PW_CLIENT_NODE0_PORT_UPDATE_PARAMS) { + uint32_t i; + + port->have_format = false; + + spa_log_debug(this->log, "node %p: port %u update %d params", this, port_id, n_params); + for (i = 0; i < port->n_params; i++) + free(port->params[i]); + port->n_params = n_params; + if (port->n_params == 0) { + free(port->params); + port->params = NULL; + } else { + void *p; + p = pw_reallocarray(port->params, port->n_params, sizeof(struct spa_pod *)); + if (p == NULL) { + pw_log_error("%p: port %u can't realloc: %m", this, port_id); + free(port->params); + port->n_params = 0; + } + port->params = p; + } + for (i = 0; i < port->n_params; i++) { + port->params[i] = params[i] ? + pw_protocol_native0_pod_from_v2(this->resource->client, params[i]) : NULL; + + if (port->params[i] && spa_pod_is_object_id(port->params[i], SPA_PARAM_Format)) + port->have_format = true; + } + } + + if (change_mask & PW_CLIENT_NODE0_PORT_UPDATE_INFO) { + pw_properties_free(port->properties); + port->properties = NULL; + port->info.props = NULL; + port->info.n_params = 0; + port->info.params = NULL; + + if (info) { + port->info = *info; + if (info->props) { + port->properties = pw_properties_new_dict(info->props); + port->info.props = &port->properties->dict; + } + } + spa_node_emit_port_info(&this->hooks, direction, port_id, info); + } +} + +static void +clear_port(struct node *this, + struct port *port, enum spa_direction direction, uint32_t port_id) +{ + do_update_port(this, + direction, + port_id, + PW_CLIENT_NODE0_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE0_PORT_UPDATE_INFO, 0, NULL, NULL); + clear_buffers(this, port); +} + +static void do_uninit_port(struct node *this, enum spa_direction direction, uint32_t port_id) +{ + struct port *port; + + spa_log_debug(this->log, "node %p: removing port %d", this, port_id); + + if (direction == SPA_DIRECTION_INPUT) { + port = GET_IN_PORT(this, port_id); + this->n_inputs--; + } else { + port = GET_OUT_PORT(this, port_id); + this->n_outputs--; + } + clear_port(this, port, direction, port_id); + port->valid = false; + spa_node_emit_port_info(&this->hooks, direction, port_id, NULL); +} + +static int +impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id, + const struct spa_dict *props) +{ + struct node *this = object; + struct port *port; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_FREE_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + clear_port(this, port, direction, port_id); + + return 0; +} + +static int +impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + do_uninit_port(this, direction, port_id); + + return 0; +} + +static int +impl_node_port_enum_params(void *object, int seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + struct port *port; + uint8_t buffer[1024]; + struct spa_pod_builder b = { 0 }; + struct spa_result_node_params result; + uint32_t count = 0; + bool found = false; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + + pw_log_debug("%p: %d port %d.%d %u %u %u", this, seq, + direction, port_id, id, start, num); + + result.id = id; + result.next = 0; + + while (true) { + struct spa_pod *param; + + result.index = result.next++; + if (result.index >= port->n_params) + break; + + param = port->params[result.index]; + + if (param == NULL || !spa_pod_is_object_id(param, id)) + continue; + + found = true; + + if (result.index < start) + continue; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if (spa_pod_filter(&b, &result.param, param, filter) < 0) + continue; + + pw_log_debug("%p: %d param %u", this, seq, result.index); + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count == num) + break; + } + return found ? 0 : -ENOENT; +} + +static int +impl_node_port_set_param(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + if (this->resource == NULL) + return -EIO; + + pw_client_node0_resource_port_set_param(this->resource, + this->seq, + direction, port_id, + id, flags, + param); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_port_set_io(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + void *data, size_t size) +{ + struct node *this = object; + struct impl *impl; + struct pw_memblock *mem; + struct mem *m; + uint32_t memid, mem_offset, mem_size; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + impl = this->impl; + + spa_log_debug(this->log, "node %p: port %d.%d set io %d %p", this, + direction, port_id, id, data); + + if (id == SPA_IO_Buffers) { + struct port *port = GET_PORT(this, direction, port_id); + port->io = data; + } + + if (this->resource == NULL) + return -EIO; + + + if (data) { + if ((mem = pw_mempool_find_ptr(impl->context->pool, data)) == NULL) + return -EINVAL; + + mem_offset = SPA_PTRDIFF(data, mem->map->ptr); + mem_size = mem->size; + if (mem_size - mem_offset < size) + return -EINVAL; + + mem_offset += mem->map->offset; + m = ensure_mem(impl, mem->fd, SPA_DATA_MemFd, mem->flags); + memid = m->id; + } + else { + memid = SPA_ID_INVALID; + mem_offset = mem_size = 0; + } + + pw_client_node0_resource_port_set_io(this->resource, + this->seq, + direction, port_id, + id, + memid, + mem_offset, mem_size); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_port_use_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t flags, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct node *this = object; + struct impl *impl; + struct port *port; + uint32_t i, j; + struct pw_client_node0_buffer *mb; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + impl = this->impl; + spa_log_debug(this->log, "node %p: use buffers %p %u", this, buffers, n_buffers); + + port = GET_PORT(this, direction, port_id); + + if (!port->have_format) + return -EIO; + + clear_buffers(this, port); + + if (n_buffers > 0) { + mb = alloca(n_buffers * sizeof(struct pw_client_node0_buffer)); + } else { + mb = NULL; + } + + port->n_buffers = n_buffers; + + if (this->resource == NULL) + return -EIO; + + for (i = 0; i < n_buffers; i++) { + struct buffer *b = &port->buffers[i]; + struct pw_memblock *mem; + struct mem *m; + size_t data_size; + void *baseptr; + + b->outbuf = buffers[i]; + memcpy(&b->buffer, buffers[i], sizeof(struct spa_buffer)); + b->buffer.datas = b->datas; + b->buffer.metas = b->metas; + + if (buffers[i]->n_metas > 0) + baseptr = buffers[i]->metas[0].data; + else if (buffers[i]->n_datas > 0) + baseptr = buffers[i]->datas[0].chunk; + else + return -EINVAL; + + if ((mem = pw_mempool_find_ptr(impl->context->pool, baseptr)) == NULL) + return -EINVAL; + + data_size = 0; + for (j = 0; j < buffers[i]->n_metas; j++) { + data_size += buffers[i]->metas[j].size; + } + for (j = 0; j < buffers[i]->n_datas; j++) { + struct spa_data *d = buffers[i]->datas; + data_size += sizeof(struct spa_chunk); + if (d->type == SPA_DATA_MemPtr) + data_size += d->maxsize; + } + + m = ensure_mem(impl, mem->fd, SPA_DATA_MemFd, mem->flags); + b->memid = m->id; + + mb[i].buffer = &b->buffer; + mb[i].mem_id = b->memid; + mb[i].offset = SPA_PTRDIFF(baseptr, SPA_PTROFF(mem->map->ptr, mem->map->offset, void)); + mb[i].size = data_size; + + for (j = 0; j < buffers[i]->n_metas; j++) + memcpy(&b->buffer.metas[j], &buffers[i]->metas[j], sizeof(struct spa_meta)); + b->buffer.n_metas = j; + + for (j = 0; j < buffers[i]->n_datas; j++) { + struct spa_data *d = &buffers[i]->datas[j]; + + memcpy(&b->buffer.datas[j], d, sizeof(struct spa_data)); + + if (d->type == SPA_DATA_DmaBuf || + d->type == SPA_DATA_MemFd) { + m = ensure_mem(impl, d->fd, d->type, d->flags); + b->buffer.datas[j].data = SPA_UINT32_TO_PTR(m->id); + } else if (d->type == SPA_DATA_MemPtr) { + b->buffer.datas[j].data = SPA_INT_TO_PTR(SPA_PTRDIFF(d->data, baseptr)); + } else { + b->buffer.datas[j].type = SPA_ID_INVALID; + b->buffer.datas[j].data = 0; + spa_log_error(this->log, "invalid memory type %d", d->type); + } + } + } + + pw_client_node0_resource_port_use_buffers(this->resource, + this->seq, + direction, port_id, + n_buffers, mb); + + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_OUT_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL); + + impl = this->impl; + + spa_log_trace(this->log, "reuse buffer %d", buffer_id); + + pw_client_node0_transport_add_message(impl->transport, (struct pw_client_node0_message *) + &PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id, buffer_id)); + do_flush(this); + + return 0; +} + +static int impl_node_process_input(struct spa_node *node) +{ + struct node *this = SPA_CONTAINER_OF(node, struct node, node); + struct impl *impl = this->impl; +// bool client_reuse = impl->client_reuse; + uint32_t i; + int res; + + if (impl->input_ready == 0) { + /* the client is not ready to receive our buffers, recycle them */ + pw_log_trace("node not ready, recycle buffers"); + for (i = 0; i < MAX_INPUTS; i++) { + struct port *p = &this->in_ports[i]; + struct spa_io_buffers *io = p->io; + + if (!p->valid || io == NULL) + continue; + + io->status = SPA_STATUS_NEED_DATA; + } + res = SPA_STATUS_NEED_DATA; + } + else { + for (i = 0; i < MAX_INPUTS; i++) { + struct port *p = &this->in_ports[i]; + struct spa_io_buffers *io = p->io; + + if (!p->valid || io == NULL) + continue; + + pw_log_trace("set io status to %d %d", io->status, io->buffer_id); + impl->transport->inputs[p->id] = *io; + + /* explicitly recycle buffers when the client is not going to do it */ +// if (!client_reuse && (pp = p->peer)) +// spa_node_port_reuse_buffer(pp->node->implementation, +// pp->port_id, io->buffer_id); + } + pw_client_node0_transport_add_message(impl->transport, + &PW_CLIENT_NODE0_MESSAGE_INIT(PW_CLIENT_NODE0_MESSAGE_PROCESS_INPUT)); + do_flush(this); + + impl->input_ready--; + res = SPA_STATUS_OK; + } + return res; +} + +#if 0 +/** this is used for clients providing data to pipewire and currently + * not supported in the compat layer */ +static int impl_node_process_output(struct spa_node *node) +{ + struct node *this; + struct impl *impl; + uint32_t i; + + this = SPA_CONTAINER_OF(node, struct node, node); + impl = this->impl; + + if (impl->out_pending) + goto done; + + impl->out_pending = true; + + for (i = 0; i < MAX_OUTPUTS; i++) { + struct port *p = &this->out_ports[i]; + struct spa_io_buffers *io = p->io; + + if (!p->valid || io == NULL) + continue; + + impl->transport->outputs[p->id] = *io; + + pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, + impl->transport->outputs[p->id].status, + impl->transport->outputs[p->id].buffer_id); + } + + done: + pw_client_node0_transport_add_message(impl->transport, + &PW_CLIENT_NODE0_MESSAGE_INIT(PW_CLIENT_NODE0_MESSAGE_PROCESS_OUTPUT)); + do_flush(this); + + return SPA_STATUS_OK; +} +#endif + +static int impl_node_process(void *object) +{ + struct node *this = object; + struct impl *impl = this->impl; + struct pw_impl_node *n = impl->this.node; + + return impl_node_process_input(n->node); +} + +static int handle_node_message(struct node *this, struct pw_client_node0_message *message) +{ + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, node); + uint32_t i; + + switch (PW_CLIENT_NODE0_MESSAGE_TYPE(message)) { + case PW_CLIENT_NODE0_MESSAGE_HAVE_OUTPUT: + for (i = 0; i < MAX_OUTPUTS; i++) { + struct port *p = &this->out_ports[i]; + struct spa_io_buffers *io = p->io; + if (!p->valid || io == NULL) + continue; + *io = impl->transport->outputs[p->id]; + pw_log_trace("have output %d %d", io->status, io->buffer_id); + } + impl->out_pending = false; + spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); + break; + + case PW_CLIENT_NODE0_MESSAGE_NEED_INPUT: + for (i = 0; i < MAX_INPUTS; i++) { + struct port *p = &this->in_ports[i]; + struct spa_io_buffers *io = p->io; + if (!p->valid || io == NULL) + continue; + pw_log_trace("need input %d %d", i, p->id); + *io = impl->transport->inputs[p->id]; + pw_log_trace("need input %d %d", io->status, io->buffer_id); + } + impl->input_ready++; + spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); + break; + + case PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER: + if (impl->client_reuse) { + struct pw_client_node0_message_port_reuse_buffer *p = + (struct pw_client_node0_message_port_reuse_buffer *) message; + spa_node_call_reuse_buffer(&this->callbacks, p->body.port_id.value, + p->body.buffer_id.value); + } + break; + + default: + pw_log_warn("unhandled message %d", PW_CLIENT_NODE0_MESSAGE_TYPE(message)); + return -ENOTSUP; + } + return 0; +} + +static void setup_transport(struct impl *impl) +{ + struct node *this = &impl->node; + uint32_t max_inputs = 0, max_outputs = 0, n_inputs = 0, n_outputs = 0; + struct spa_dict_item items[1]; + + n_inputs = this->n_inputs; + max_inputs = this->info.max_input_ports == 0 ? this->n_inputs : this->info.max_input_ports; + n_outputs = this->n_outputs; + max_outputs = this->info.max_output_ports == 0 ? this->n_outputs : this->info.max_output_ports; + + impl->transport = pw_client_node0_transport_new(impl->context, max_inputs, max_outputs); + impl->transport->area->n_input_ports = n_inputs; + impl->transport->area->n_output_ports = n_outputs; + + if (n_inputs > 0) { + items[0] = SPA_DICT_ITEM_INIT(SPA_KEY_MEDIA_CLASS, "Stream/Input/Video"); + } else { + items[0] = SPA_DICT_ITEM_INIT(SPA_KEY_MEDIA_CLASS, "Stream/Output/Video"); + } + pw_impl_node_update_properties(impl->this.node, &SPA_DICT_INIT(items, 1)); +} + +static void +client_node0_done(void *data, int seq, int res) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + if (seq == 0 && res == 0 && impl->transport == NULL) + setup_transport(impl); + + pw_log_debug("seq:%d res:%d pending:%d", seq, res, this->init_pending); + spa_node_emit_result(&this->hooks, seq, res, 0, NULL); + + if (this->init_pending != SPA_ID_INVALID) { + spa_node_emit_result(&this->hooks, this->init_pending, res, 0, NULL); + this->init_pending = SPA_ID_INVALID; + } +} + +static void +client_node0_update(void *data, + uint32_t change_mask, + uint32_t max_input_ports, + uint32_t max_output_ports, + uint32_t n_params, + const struct spa_pod **params) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + if (change_mask & PW_CLIENT_NODE0_UPDATE_MAX_INPUTS) + this->info.max_input_ports = max_input_ports; + if (change_mask & PW_CLIENT_NODE0_UPDATE_MAX_OUTPUTS) + this->info.max_output_ports = max_output_ports; + if (change_mask & PW_CLIENT_NODE0_UPDATE_PARAMS) { + uint32_t i; + spa_log_debug(this->log, "node %p: update %d params", this, n_params); + + for (i = 0; i < this->n_params; i++) + free(this->params[i]); + this->n_params = n_params; + if (this->n_params == 0) { + free(this->params); + this->params = NULL; + } else { + void *p; + p = pw_reallocarray(this->params, this->n_params, sizeof(struct spa_pod *)); + if (p == NULL) { + pw_log_error("%p: can't realloc: %m", this); + free(this->params); + this->n_params = 0; + } + this->params = p; + } + for (i = 0; i < this->n_params; i++) + this->params[i] = params[i] ? spa_pod_copy(params[i]) : NULL; + } + if (change_mask & (PW_CLIENT_NODE0_UPDATE_MAX_INPUTS | PW_CLIENT_NODE0_UPDATE_MAX_OUTPUTS)) { + spa_node_emit_info(&this->hooks, &this->info); + } + + spa_log_debug(this->log, "node %p: got node update max_in %u, max_out %u", this, + this->info.max_input_ports, this->info.max_output_ports); +} + +static void +client_node0_port_update(void *data, + enum spa_direction direction, + uint32_t port_id, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info) +{ + struct impl *impl = data; + struct node *this = &impl->node; + bool remove; + + spa_log_debug(this->log, "node %p: got port update", this); + if (!CHECK_PORT_ID(this, direction, port_id)) + return; + + remove = (change_mask == 0); + + if (remove) { + do_uninit_port(this, direction, port_id); + } else { + do_update_port(this, + direction, + port_id, + change_mask, + n_params, params, info); + } +} + +static void client_node0_set_active(void *data, bool active) +{ + struct impl *impl = data; + pw_impl_node_set_active(impl->this.node, active); +} + +static void client_node0_event(void *data, struct spa_event *event) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + switch (SPA_EVENT_TYPE(event)) { + case SPA_NODE0_EVENT_RequestClockUpdate: + send_clock_update(this); + break; + default: + spa_node_emit_event(&this->hooks, event); + } +} + +static const struct pw_client_node0_methods client_node0_methods = { + PW_VERSION_CLIENT_NODE0_METHODS, + .done = client_node0_done, + .update = client_node0_update, + .port_update = client_node0_port_update, + .set_active = client_node0_set_active, + .event = client_node0_event, +}; + +static void node_on_data_fd_events(struct spa_source *source) +{ + struct node *this = source->data; + struct impl *impl = this->impl; + + if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) { + spa_log_warn(this->log, "node %p: got error", this); + return; + } + + if (source->rmask & SPA_IO_IN) { + struct pw_client_node0_message message; + uint64_t cmd; + + if (spa_system_eventfd_read(this->data_system, this->data_source.fd, &cmd) < 0) + spa_log_warn(this->log, "node %p: error reading message: %s", + this, strerror(errno)); + + while (pw_client_node0_transport_next_message(impl->transport, &message) == 1) { + struct pw_client_node0_message *msg = alloca(SPA_POD_SIZE(&message)); + pw_client_node0_transport_parse_message(impl->transport, msg); + handle_node_message(this, msg); + } + } +} + +static const struct spa_node_methods impl_node = { + SPA_VERSION_NODE_METHODS, + .add_listener = impl_node_add_listener, + .set_callbacks = impl_node_set_callbacks, + .sync = impl_node_sync, + .enum_params = impl_node_enum_params, + .set_param = impl_node_set_param, + .set_io = impl_node_set_io, + .send_command = impl_node_send_command, + .add_port = impl_node_add_port, + .remove_port = impl_node_remove_port, + .port_enum_params = impl_node_port_enum_params, + .port_set_param = impl_node_port_set_param, + .port_use_buffers = impl_node_port_use_buffers, + .port_set_io = impl_node_port_set_io, + .port_reuse_buffer = impl_node_port_reuse_buffer, + .process = impl_node_process, +}; + +static int +node_init(struct node *this, + struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) +{ + this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); + this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop); + this->data_system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem); + + if (this->data_loop == NULL) { + spa_log_error(this->log, "a data-loop is needed"); + return -EINVAL; + } + + this->node.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Node, + SPA_VERSION_NODE, + &impl_node, this); + spa_hook_list_init(&this->hooks); + + this->data_source.func = node_on_data_fd_events; + this->data_source.data = this; + this->data_source.fd = -1; + this->data_source.mask = SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP; + this->data_source.rmask = 0; + + this->seq = 1; + this->init_pending = SPA_ID_INVALID; + + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int node_clear(struct node *this) +{ + uint32_t i; + + for (i = 0; i < MAX_INPUTS; i++) { + if (this->in_ports[i].valid) + clear_port(this, &this->in_ports[i], SPA_DIRECTION_INPUT, i); + } + for (i = 0; i < MAX_OUTPUTS; i++) { + if (this->out_ports[i].valid) + clear_port(this, &this->out_ports[i], SPA_DIRECTION_OUTPUT, i); + } + + return 0; +} + +static int do_remove_source(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct spa_source *source = user_data; + spa_loop_remove_source(loop, source); + return 0; +} + +static void client_node0_resource_destroy(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node0 *this = &impl->this; + struct node *node = &impl->node; + + pw_log_debug("client-node %p: destroy", impl); + + impl->node.resource = this->resource = NULL; + spa_hook_remove(&impl->resource_listener); + spa_hook_remove(&impl->object_listener); + + if (node->data_source.fd != -1) { + spa_loop_invoke(node->data_loop, + do_remove_source, + SPA_ID_INVALID, + NULL, + 0, + true, + &node->data_source); + } + if (this->node) + pw_impl_node_destroy(this->node); +} + +static void node_initialized(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node0 *this = &impl->this; + struct pw_impl_node *node = this->node; + struct spa_system *data_system = impl->node.data_system; + + if (this->resource == NULL) + return; + + impl->fds[0] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + impl->fds[1] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + impl->node.data_source.fd = impl->fds[0]; + impl->node.writefd = impl->fds[1]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; + + spa_loop_add_source(impl->node.data_loop, &impl->node.data_source); + pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); + + pw_client_node0_resource_transport(this->resource, + pw_global_get_id(pw_impl_node_get_global(node)), + impl->other_fds[0], + impl->other_fds[1], + impl->transport); +} + +static void node_free(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node0 *this = &impl->this; + struct spa_system *data_system = impl->node.data_system; + + this->node = NULL; + + pw_log_debug("client-node %p: free", &impl->this); + node_clear(&impl->node); + + if (impl->transport) + pw_client_node0_transport_destroy(impl->transport); + + spa_hook_remove(&impl->node_listener); + + if (this->resource) + pw_resource_destroy(this->resource); + + pw_array_clear(&impl->mems); + + if (impl->fds[0] != -1) + spa_system_close(data_system, impl->fds[0]); + if (impl->fds[1] != -1) + spa_system_close(data_system, impl->fds[1]); + free(impl); +} + +static const struct pw_impl_node_events node_events = { + PW_VERSION_IMPL_NODE_EVENTS, + .free = node_free, + .initialized = node_initialized, +}; + +static const struct pw_resource_events resource_events = { + PW_VERSION_RESOURCE_EVENTS, + .destroy = client_node0_resource_destroy, +}; + +static void convert_properties(struct pw_properties *properties) +{ + static const struct { + const char *from, *to; + } props[] = { + { "pipewire.autoconnect", PW_KEY_NODE_AUTOCONNECT, }, + /* XXX deprecated */ + { "pipewire.target.node", PW_KEY_NODE_TARGET, } + }; + + const char *str; + + SPA_FOR_EACH_ELEMENT_VAR(props, p) { + if ((str = pw_properties_get(properties, p->from)) != NULL) { + pw_properties_set(properties, p->to, str); + pw_properties_set(properties, p->from, NULL); + } + } +} + +/** Create a new client node + * \param client an owner \ref pw_client + * \param id an id + * \param name a name + * \param properties extra properties + * \return a newly allocated client node + * + * Create a new \ref pw_impl_node. + * + * \memberof pw_impl_client_node + */ +struct pw_impl_client_node0 *pw_impl_client_node0_new(struct pw_resource *resource, + struct pw_properties *properties) +{ + struct impl *impl; + struct pw_impl_client_node0 *this; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct pw_context *context = pw_impl_client_get_context(client); + const struct spa_support *support; + uint32_t n_support; + const char *name; + int res; + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return NULL; + + this = &impl->this; + + if (properties == NULL) + properties = pw_properties_new(NULL, NULL); + if (properties == NULL) { + res = -errno; + goto error_exit_free; + } + convert_properties(properties); + + pw_properties_setf(properties, PW_KEY_CLIENT_ID, "%d", client->global->id); + + impl->context = context; + impl->fds[0] = impl->fds[1] = -1; + pw_log_debug("client-node %p: new", impl); + + support = pw_context_get_support(impl->context, &n_support); + + node_init(&impl->node, NULL, support, n_support); + impl->node.impl = impl; + + pw_array_init(&impl->mems, 64); + + if ((name = pw_properties_get(properties, "node.name")) == NULL) + name = "client-node"; + pw_properties_set(properties, PW_KEY_MEDIA_TYPE, "Video"); + + impl->node.resource = resource; + this->resource = resource; + this->node = pw_spa_node_new(context, + PW_SPA_NODE_FLAG_ASYNC, + &impl->node.node, + NULL, + properties, 0); + if (this->node == NULL) { + res = -errno; + goto error_no_node; + } + + impl->client_reuse = pw_properties_get_bool(properties, "pipewire.client.reuse", false); + + pw_resource_add_listener(this->resource, + &impl->resource_listener, + &resource_events, + impl); + pw_resource_add_object_listener(this->resource, + &impl->object_listener, + &client_node0_methods, + impl); + + + pw_impl_node_add_listener(this->node, &impl->node_listener, &node_events, impl); + + return this; + +error_no_node: + pw_resource_destroy(this->resource); + node_clear(&impl->node); +error_exit_free: + free(impl); + errno = -res; + return NULL; +} + +/** Destroy a client node + * \param node the client node to destroy + * \memberof pw_impl_client_node + */ +void pw_impl_client_node0_destroy(struct pw_impl_client_node0 *node) +{ + pw_resource_destroy(node->resource); +} diff --git a/src/modules/module-client-node/v0/client-node.h b/src/modules/module-client-node/v0/client-node.h new file mode 100644 index 0000000..04b77d4 --- /dev/null +++ b/src/modules/module-client-node/v0/client-node.h @@ -0,0 +1,101 @@ +/* PipeWire + * + * Copyright © 2015 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PIPEWIRE_CLIENT_NODE0_H +#define PIPEWIRE_CLIENT_NODE0_H + +#include <pipewire/impl.h> + +#include "ext-client-node.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** The state of the clock */ +enum spa_clock0_state { + SPA_CLOCK0_STATE_STOPPED, /*< the clock is stopped */ + SPA_CLOCK0_STATE_PAUSED, /*< the clock is paused */ + SPA_CLOCK0_STATE_RUNNING, /*< the clock is running */ +}; + +struct spa_command_node0_clock_update_body { + struct spa_pod_object_body body; +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_TIME (1 << 0) +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_SCALE (1 << 1) +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_STATE (1 << 2) +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_LATENCY (1 << 3) + struct spa_pod_int change_mask SPA_ALIGNED(8); + struct spa_pod_int rate SPA_ALIGNED(8); + struct spa_pod_long ticks SPA_ALIGNED(8); + struct spa_pod_long monotonic_time SPA_ALIGNED(8); + struct spa_pod_long offset SPA_ALIGNED(8); + struct spa_pod_int scale SPA_ALIGNED(8); + struct spa_pod_int state SPA_ALIGNED(8); +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_FLAG_LIVE (1 << 0) + struct spa_pod_int flags SPA_ALIGNED(8); + struct spa_pod_long latency SPA_ALIGNED(8); +}; + +struct spa_command_node0_clock_update { + struct spa_pod pod; + struct spa_command_node0_clock_update_body body; +}; + +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_INIT(type,change_mask,rate,ticks,monotonic_time,offset,scale,state,flags,latency) \ + SPA_COMMAND_INIT_FULL(struct spa_command_node0_clock_update, \ + sizeof(struct spa_command_node0_clock_update_body), 0, type, \ + SPA_POD_INIT_Int(change_mask), \ + SPA_POD_INIT_Int(rate), \ + SPA_POD_INIT_Long(ticks), \ + SPA_POD_INIT_Long(monotonic_time), \ + SPA_POD_INIT_Long(offset), \ + SPA_POD_INIT_Int(scale), \ + SPA_POD_INIT_Int(state), \ + SPA_POD_INIT_Int(flags), \ + SPA_POD_INIT_Long(latency)) + + +/** \class pw_impl_client_node0 + * + * PipeWire client node interface + */ +struct pw_impl_client_node0 { + struct pw_impl_node *node; + + struct pw_resource *resource; +}; + +struct pw_impl_client_node0 * +pw_impl_client_node0_new(struct pw_resource *resource, + struct pw_properties *properties); + +void +pw_impl_client_node0_destroy(struct pw_impl_client_node0 *node); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_CLIENT_NODE0_H */ diff --git a/src/modules/module-client-node/v0/ext-client-node.h b/src/modules/module-client-node/v0/ext-client-node.h new file mode 100644 index 0000000..21ba597 --- /dev/null +++ b/src/modules/module-client-node/v0/ext-client-node.h @@ -0,0 +1,414 @@ +/* PipeWire + * + * Copyright © 2016 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __PIPEWIRE_EXT_CLIENT_NODE0_H__ +#define __PIPEWIRE_EXT_CLIENT_NODE0_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <spa/utils/defs.h> +#include <spa/param/param.h> +#include <spa/node/node.h> + +#include <pipewire/proxy.h> + +#define PW_TYPE_INTERFACE_ClientNode PW_TYPE_INFO_INTERFACE_BASE "ClientNode" + +#define PW_VERSION_CLIENT_NODE0 0 + +struct pw_client_node0_message; + +/** Shared structure between client and server \memberof pw_client_node */ +struct pw_client_node0_area { + uint32_t max_input_ports; /**< max input ports of the node */ + uint32_t n_input_ports; /**< number of input ports of the node */ + uint32_t max_output_ports; /**< max output ports of the node */ + uint32_t n_output_ports; /**< number of output ports of the node */ +}; + +/** \class pw_client_node0_transport + * + * \brief Transport object + * + * The transport object contains shared data and ringbuffers to exchange + * events and data between the server and the client in a low-latency and + * lockfree way. + */ +struct pw_client_node0_transport { + struct pw_client_node0_area *area; /**< the transport area */ + struct spa_io_buffers *inputs; /**< array of buffer input io */ + struct spa_io_buffers *outputs; /**< array of buffer output io */ + void *input_data; /**< input memory for ringbuffer */ + struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */ + void *output_data; /**< output memory for ringbuffer */ + struct spa_ringbuffer *output_buffer; /**< ringbuffer for output memory */ + + /** Destroy a transport + * \param trans a transport to destroy + * \memberof pw_client_node0_transport + */ + void (*destroy) (struct pw_client_node0_transport *trans); + + /** Add a message to the transport + * \param trans the transport to send the message on + * \param message the message to add + * \return 0 on success, < 0 on error + * + * Write \a message to the shared ringbuffer. + */ + int (*add_message) (struct pw_client_node0_transport *trans, struct pw_client_node0_message *message); + + /** Get next message from a transport + * \param trans the transport to get the message of + * \param[out] message the message to read + * \return < 0 on error, 1 when a message is available, + * 0 when no more messages are available. + * + * Get the skeleton next message from \a trans into \a message. This function will + * only read the head and object body of the message. + * + * After the complete size of the message has been calculated, you should call + * \ref parse_message() to read the complete message contents. + */ + int (*next_message) (struct pw_client_node0_transport *trans, struct pw_client_node0_message *message); + + /** Parse the complete message on transport + * \param trans the transport to read from + * \param[out] message memory that can hold the complete message + * \return 0 on success, < 0 on error + * + * Use this function after \ref next_message(). + */ + int (*parse_message) (struct pw_client_node0_transport *trans, void *message); +}; + +#define pw_client_node0_transport_destroy(t) ((t)->destroy((t))) +#define pw_client_node0_transport_add_message(t,m) ((t)->add_message((t), (m))) +#define pw_client_node0_transport_next_message(t,m) ((t)->next_message((t), (m))) +#define pw_client_node0_transport_parse_message(t,m) ((t)->parse_message((t), (m))) + +enum pw_client_node0_message_type { + PW_CLIENT_NODE0_MESSAGE_HAVE_OUTPUT, /*< signal that the node has output */ + PW_CLIENT_NODE0_MESSAGE_NEED_INPUT, /*< signal that the node needs input */ + PW_CLIENT_NODE0_MESSAGE_PROCESS_INPUT, /*< instruct the node to process input */ + PW_CLIENT_NODE0_MESSAGE_PROCESS_OUTPUT, /*< instruct the node output is processed */ + PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER, /*< reuse a buffer */ +}; + +struct pw_client_node0_message_body { + struct spa_pod_int type SPA_ALIGNED(8); /*< one of enum pw_client_node0_message_type */ +}; + +struct pw_client_node0_message { + struct spa_pod_struct pod; + struct pw_client_node0_message_body body; +}; + +struct pw_client_node0_message_port_reuse_buffer_body { + struct spa_pod_int type SPA_ALIGNED(8); /*< PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER */ + struct spa_pod_int port_id SPA_ALIGNED(8); /*< port id */ + struct spa_pod_int buffer_id SPA_ALIGNED(8); /*< buffer id to reuse */ +}; + +struct pw_client_node0_message_port_reuse_buffer { + struct spa_pod_struct pod; + struct pw_client_node0_message_port_reuse_buffer_body body; +}; + +#define PW_CLIENT_NODE0_MESSAGE_TYPE(message) (((struct pw_client_node0_message*)(message))->body.type.value) + +#define PW_CLIENT_NODE0_MESSAGE_INIT(message) ((struct pw_client_node0_message) \ + { { { sizeof(struct pw_client_node0_message_body), SPA_TYPE_Struct } }, \ + { SPA_POD_INIT_Int(message) } }) + +#define PW_CLIENT_NODE0_MESSAGE_INIT_FULL(type,size,message,...) (type) \ + { { { size, SPA_TYPE_Struct } }, \ + { SPA_POD_INIT_Int(message), ##__VA_ARGS__ } } \ + +#define PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id,buffer_id) \ + PW_CLIENT_NODE0_MESSAGE_INIT_FULL(struct pw_client_node0_message_port_reuse_buffer, \ + sizeof(struct pw_client_node0_message_port_reuse_buffer_body), \ + PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER, \ + SPA_POD_INIT_Int(port_id), \ + SPA_POD_INIT_Int(buffer_id)) + +/** information about a buffer */ +struct pw_client_node0_buffer { + uint32_t mem_id; /**< the memory id for the metadata */ + uint32_t offset; /**< offset in memory */ + uint32_t size; /**< size in memory */ + struct spa_buffer *buffer; /**< buffer describing metadata and buffer memory */ +}; + +#define PW_CLIENT_NODE0_METHOD_DONE 0 +#define PW_CLIENT_NODE0_METHOD_UPDATE 1 +#define PW_CLIENT_NODE0_METHOD_PORT_UPDATE 2 +#define PW_CLIENT_NODE0_METHOD_SET_ACTIVE 3 +#define PW_CLIENT_NODE0_METHOD_EVENT 4 +#define PW_CLIENT_NODE0_METHOD_DESTROY 5 +#define PW_CLIENT_NODE0_METHOD_NUM 6 + +/** \ref pw_client_node methods */ +struct pw_client_node0_methods { +#define PW_VERSION_CLIENT_NODE0_METHODS 0 + uint32_t version; + + /** Complete an async operation */ + void (*done) (void *object, int seq, int res); + + /** + * Update the node ports and properties + * + * Update the maximum number of ports and the params of the + * client node. + * \param change_mask bitfield with changed parameters + * \param max_input_ports new max input ports + * \param max_output_ports new max output ports + * \param params new params + */ + void (*update) (void *object, +#define PW_CLIENT_NODE0_UPDATE_MAX_INPUTS (1 << 0) +#define PW_CLIENT_NODE0_UPDATE_MAX_OUTPUTS (1 << 1) +#define PW_CLIENT_NODE0_UPDATE_PARAMS (1 << 2) + uint32_t change_mask, + uint32_t max_input_ports, + uint32_t max_output_ports, + uint32_t n_params, + const struct spa_pod **params); + + /** + * Update a node port + * + * Update the information of one port of a node. + * \param direction the direction of the port + * \param port_id the port id to update + * \param change_mask a bitfield of changed items + * \param n_params number of port parameters + * \param params array of port parameters + * \param info port information + */ + void (*port_update) (void *object, + enum spa_direction direction, + uint32_t port_id, +#define PW_CLIENT_NODE0_PORT_UPDATE_PARAMS (1 << 0) +#define PW_CLIENT_NODE0_PORT_UPDATE_INFO (1 << 1) + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info); + /** + * Activate or deactivate the node + */ + void (*set_active) (void *object, bool active); + /** + * Send an event to the node + * \param event the event to send + */ + void (*event) (void *object, struct spa_event *event); + /** + * Destroy the client_node + */ + void (*destroy) (void *object); +}; + +#define PW_CLIENT_NODE0_EVENT_ADD_MEM 0 +#define PW_CLIENT_NODE0_EVENT_TRANSPORT 1 +#define PW_CLIENT_NODE0_EVENT_SET_PARAM 2 +#define PW_CLIENT_NODE0_EVENT_EVENT 3 +#define PW_CLIENT_NODE0_EVENT_COMMAND 4 +#define PW_CLIENT_NODE0_EVENT_ADD_PORT 5 +#define PW_CLIENT_NODE0_EVENT_REMOVE_PORT 6 +#define PW_CLIENT_NODE0_EVENT_PORT_SET_PARAM 7 +#define PW_CLIENT_NODE0_EVENT_PORT_USE_BUFFERS 8 +#define PW_CLIENT_NODE0_EVENT_PORT_COMMAND 9 +#define PW_CLIENT_NODE0_EVENT_PORT_SET_IO 10 +#define PW_CLIENT_NODE0_EVENT_NUM 11 + +/** \ref pw_client_node events */ +struct pw_client_node0_events { +#define PW_VERSION_CLIENT_NODE0_EVENTS 0 + uint32_t version; + /** + * Memory was added to a node + * + * \param mem_id the id of the memory + * \param type the memory type + * \param memfd the fd of the memory + * \param flags flags for the \a memfd + */ + void (*add_mem) (void *data, + uint32_t mem_id, + uint32_t type, + int memfd, + uint32_t flags); + /** + * Notify of a new transport area + * + * The transport area is used to exchange real-time commands between + * the client and the server. + * + * \param node_id the node id created for this client node + * \param readfd fd for signal data can be read + * \param writefd fd for signal data can be written + * \param transport the shared transport area + */ + void (*transport) (void *data, + uint32_t node_id, + int readfd, + int writefd, + struct pw_client_node0_transport *transport); + /** + * Notify of a property change + * + * When the server configures the properties on the node + * this event is sent + * + * \param seq a sequence number + * \param id the id of the parameter + * \param flags parameter flags + * \param param the param to set + */ + void (*set_param) (void *data, uint32_t seq, + uint32_t id, uint32_t flags, + const struct spa_pod *param); + /** + * Receive an event from the client node + * \param event the received event */ + void (*event) (void *data, const struct spa_event *event); + /** + * Notify of a new node command + * + * \param seq a sequence number + * \param command the command + */ + void (*command) (void *data, uint32_t seq, const struct spa_command *command); + /** + * A new port was added to the node + * + * The server can at any time add a port to the node when there + * are free ports available. + * + * \param seq a sequence number + * \param direction the direction of the port + * \param port_id the new port id + */ + void (*add_port) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id); + /** + * A port was removed from the node + * + * \param seq a sequence number + * \param direction a port direction + * \param port_id the remove port id + */ + void (*remove_port) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id); + /** + * A parameter was configured on the port + * + * \param seq a sequence number + * \param direction a port direction + * \param port_id the port id + * \param id the id of the parameter + * \param flags flags used when setting the param + * \param param the new param + */ + void (*port_set_param) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param); + /** + * Notify the port of buffers + * + * \param seq a sequence number + * \param direction a port direction + * \param port_id the port id + * \param n_buffer the number of buffers + * \param buffers and array of buffer descriptions + */ + void (*port_use_buffers) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t n_buffers, + struct pw_client_node0_buffer *buffers); + /** + * Notify of a new port command + * + * \param direction a port direction + * \param port_id the port id + * \param command the command + */ + void (*port_command) (void *data, + enum spa_direction direction, + uint32_t port_id, + const struct spa_command *command); + + /** + * Configure the io area with \a id of \a port_id. + * + * \param seq a sequence number + * \param direction the direction of the port + * \param port_id the port id + * \param id the id of the io area to set + * \param mem_id the id of the memory to use + * \param offset offset of io area in memory + * \param size size of the io area + */ + void (*port_set_io) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + uint32_t mem_id, + uint32_t offset, + uint32_t size); +}; +#define pw_client_node0_resource(r,m,v,...) pw_resource_call(r, struct pw_client_node0_events, m, v, ##__VA_ARGS__) + +#define pw_client_node0_resource_add_mem(r,...) pw_client_node0_resource(r,add_mem,0,__VA_ARGS__) +#define pw_client_node0_resource_transport(r,...) pw_client_node0_resource(r,transport,0,__VA_ARGS__) +#define pw_client_node0_resource_set_param(r,...) pw_client_node0_resource(r,set_param,0,__VA_ARGS__) +#define pw_client_node0_resource_event(r,...) pw_client_node0_resource(r,event,0,__VA_ARGS__) +#define pw_client_node0_resource_command(r,...) pw_client_node0_resource(r,command,0,__VA_ARGS__) +#define pw_client_node0_resource_add_port(r,...) pw_client_node0_resource(r,add_port,0,__VA_ARGS__) +#define pw_client_node0_resource_remove_port(r,...) pw_client_node0_resource(r,remove_port,0,__VA_ARGS__) +#define pw_client_node0_resource_port_set_param(r,...) pw_client_node0_resource(r,port_set_param,0,__VA_ARGS__) +#define pw_client_node0_resource_port_use_buffers(r,...) pw_client_node0_resource(r,port_use_buffers,0,__VA_ARGS__) +#define pw_client_node0_resource_port_command(r,...) pw_client_node0_resource(r,port_command,0,__VA_ARGS__) +#define pw_client_node0_resource_port_set_io(r,...) pw_client_node0_resource(r,port_set_io,0,__VA_ARGS__) + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __PIPEWIRE_EXT_CLIENT_NODE0_H__ */ diff --git a/src/modules/module-client-node/v0/protocol-native.c b/src/modules/module-client-node/v0/protocol-native.c new file mode 100644 index 0000000..86d8fe9 --- /dev/null +++ b/src/modules/module-client-node/v0/protocol-native.c @@ -0,0 +1,534 @@ +/* PipeWire + * + * Copyright © 2017 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <errno.h> + +#include <spa/pod/parser.h> +#include <spa/pod/builder.h> +#include <spa/utils/type-info.h> + +#include "pipewire/impl.h" + +#include "pipewire/extensions/protocol-native.h" + +#include "ext-client-node.h" + +#include "transport.h" + +#define PW_PROTOCOL_NATIVE_FLAG_REMAP (1<<0) + +extern uint32_t pw_protocol_native0_find_type(struct pw_impl_client *client, const char *type); +extern int pw_protocol_native0_pod_to_v2(struct pw_impl_client *client, const struct spa_pod *pod, + struct spa_pod_builder *b); +extern struct spa_pod * pw_protocol_native0_pod_from_v2(struct pw_impl_client *client, + const struct spa_pod *pod); +extern uint32_t pw_protocol_native0_type_to_v2(struct pw_impl_client *client, + const struct spa_type_info *info, uint32_t type); + +static void +client_node_marshal_add_mem(void *data, + uint32_t mem_id, + uint32_t type, + int memfd, uint32_t flags) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + const char *typename; + + switch (type) { + case SPA_DATA_MemFd: + typename = "Spa:Enum:DataType:Fd:MemFd"; + break; + case SPA_DATA_DmaBuf: + typename = "Spa:Enum:DataType:Fd:DmaBuf"; + break; + default: + case SPA_DATA_MemPtr: + return; + + } + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_ADD_MEM, NULL); + + spa_pod_builder_add_struct(b, + "i", mem_id, + "I", pw_protocol_native0_find_type(client, typename), + "i", pw_protocol_native_add_resource_fd(resource, memfd), + "i", flags); + + pw_protocol_native_end_resource(resource, b); +} + +static void client_node_marshal_transport(void *data, uint32_t node_id, int readfd, int writefd, + struct pw_client_node0_transport *transport) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + struct pw_client_node0_transport_info info; + + pw_client_node0_transport_get_info(transport, &info); + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_TRANSPORT, NULL); + + spa_pod_builder_add_struct(b, + "i", node_id, + "i", pw_protocol_native_add_resource_fd(resource, readfd), + "i", pw_protocol_native_add_resource_fd(resource, writefd), + "i", pw_protocol_native_add_resource_fd(resource, info.memfd), + "i", info.offset, + "i", info.size); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_set_param(void *data, uint32_t seq, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_SET_PARAM, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "I", id, + "i", flags, + "P", param); + + pw_protocol_native_end_resource(resource, b); +} + +static void client_node_marshal_event_event(void *data, const struct spa_event *event) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_EVENT, NULL); + + spa_pod_builder_add_struct(b, "P", event); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_command(void *data, uint32_t seq, const struct spa_command *command) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_COMMAND, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, "i", seq, NULL); + if (SPA_COMMAND_TYPE(command) == 0) + spa_pod_builder_add(b, "P", command, NULL); + else + pw_protocol_native0_pod_to_v2(client, (struct spa_pod *)command, b); + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_add_port(void *data, + uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_ADD_PORT, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "i", direction, + "i", port_id); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_remove_port(void *data, + uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_REMOVE_PORT, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "i", direction, + "i", port_id); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_set_param(void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + uint32_t flags, + const struct spa_pod *param) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + const char *typename; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_SET_PARAM, NULL); + + switch (id) { + case SPA_PARAM_Props: + typename = "Spa:Enum:ParamId:Props"; + break; + case SPA_PARAM_Format: + typename = "Spa:Enum:ParamId:Format"; + break; + default: + return; + } + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + "i", seq, + "i", direction, + "i", port_id, + "I", pw_protocol_native0_find_type(client, typename), + "i", flags, NULL); + pw_protocol_native0_pod_to_v2(client, param, b); + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_use_buffers(void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t n_buffers, struct pw_client_node0_buffer *buffers) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + uint32_t i, j; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_USE_BUFFERS, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + "i", seq, + "i", direction, + "i", port_id, + "i", n_buffers, NULL); + + for (i = 0; i < n_buffers; i++) { + struct spa_buffer *buf = buffers[i].buffer; + + spa_pod_builder_add(b, + "i", buffers[i].mem_id, + "i", buffers[i].offset, + "i", buffers[i].size, + "i", i, + "i", buf->n_metas, NULL); + + for (j = 0; j < buf->n_metas; j++) { + struct spa_meta *m = &buf->metas[j]; + spa_pod_builder_add(b, + "I", pw_protocol_native0_type_to_v2(client, spa_type_meta_type, m->type), + "i", m->size, NULL); + } + spa_pod_builder_add(b, "i", buf->n_datas, NULL); + for (j = 0; j < buf->n_datas; j++) { + struct spa_data *d = &buf->datas[j]; + spa_pod_builder_add(b, + "I", pw_protocol_native0_type_to_v2(client, spa_type_data_type, d->type), + "i", SPA_PTR_TO_UINT32(d->data), + "i", d->flags, + "i", d->mapoffset, + "i", d->maxsize, NULL); + } + } + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_command(void *data, + uint32_t direction, + uint32_t port_id, + const struct spa_command *command) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_COMMAND, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + "i", direction, + "i", port_id, NULL); + pw_protocol_native0_pod_to_v2(client, (struct spa_pod *)command, b); + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_set_io(void *data, + uint32_t seq, + uint32_t direction, + uint32_t port_id, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_SET_IO, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "i", direction, + "i", port_id, + "I", id, + "i", memid, + "i", offset, + "i", size); + + pw_protocol_native_end_resource(resource, b); +} + + +static int client_node_demarshal_done(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + uint32_t seq, res; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + "i", &seq, + "i", &res) < 0) + return -EINVAL; + + return pw_resource_notify(resource, struct pw_client_node0_methods, done, 0, seq, res); +} + +static int client_node_demarshal_update(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + struct spa_pod_frame f; + uint32_t change_mask, max_input_ports, max_output_ports, n_params; + const struct spa_pod **params; + uint32_t i; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f) < 0 || + spa_pod_parser_get(&prs, + "i", &change_mask, + "i", &max_input_ports, + "i", &max_output_ports, + "i", &n_params, NULL) < 0) + return -EINVAL; + + params = alloca(n_params * sizeof(struct spa_pod *)); + for (i = 0; i < n_params; i++) + if (spa_pod_parser_get(&prs, "O", ¶ms[i], NULL) < 0) + return -EINVAL; + + return pw_resource_notify(resource, struct pw_client_node0_methods, update, 0, change_mask, + max_input_ports, + max_output_ports, + n_params, + params); +} + +static int client_node_demarshal_port_update(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + struct spa_pod_frame f[2]; + uint32_t i, direction, port_id, change_mask, n_params; + const struct spa_pod **params = NULL; + struct spa_port_info info = { 0 }, *infop = NULL; + struct spa_dict props; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f[0]) < 0 || + spa_pod_parser_get(&prs, + "i", &direction, + "i", &port_id, + "i", &change_mask, + "i", &n_params, NULL) < 0) + return -EINVAL; + + params = alloca(n_params * sizeof(struct spa_pod *)); + for (i = 0; i < n_params; i++) + if (spa_pod_parser_get(&prs, "O", ¶ms[i], NULL) < 0) + return -EINVAL; + + + if (spa_pod_parser_push_struct(&prs, &f[1]) >= 0) { + infop = &info; + + if (spa_pod_parser_get(&prs, + "i", &info.flags, + "i", &info.rate, + "i", &props.n_items, NULL) < 0) + return -EINVAL; + + if (props.n_items > 0) { + info.props = &props; + + props.items = alloca(props.n_items * sizeof(struct spa_dict_item)); + for (i = 0; i < props.n_items; i++) { + if (spa_pod_parser_get(&prs, + "s", &props.items[i].key, + "s", &props.items[i].value, + NULL) < 0) + return -EINVAL; + } + } + } + + return pw_resource_notify(resource, struct pw_client_node0_methods, port_update, 0, direction, + port_id, + change_mask, + n_params, + params, infop); +} + +static int client_node_demarshal_set_active(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + int active; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + "b", &active) < 0) + return -EINVAL; + + return pw_resource_notify(resource, struct pw_client_node0_methods, set_active, 0, active); +} + +static int client_node_demarshal_event_method(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_parser prs; + struct spa_event *event; + int res; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + "O", &event) < 0) + return -EINVAL; + + event = (struct spa_event*)pw_protocol_native0_pod_from_v2(client, (struct spa_pod *)event); + + res = pw_resource_notify(resource, struct pw_client_node0_methods, event, 0, event); + free(event); + + return res; +} + +static int client_node_demarshal_destroy(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + int res; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, NULL) < 0) + return -EINVAL; + + res = pw_resource_notify(resource, struct pw_client_node0_methods, destroy, 0); + pw_resource_destroy(resource); + return res; +} + +static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_method_demarshal[] = { + { &client_node_demarshal_done, 0, 0 }, + { &client_node_demarshal_update, 0, PW_PROTOCOL_NATIVE_FLAG_REMAP }, + { &client_node_demarshal_port_update, 0, PW_PROTOCOL_NATIVE_FLAG_REMAP }, + { &client_node_demarshal_set_active, 0, 0 }, + { &client_node_demarshal_event_method, 0, PW_PROTOCOL_NATIVE_FLAG_REMAP }, + { &client_node_demarshal_destroy, 0, 0 }, +}; + +static const struct pw_client_node0_events pw_protocol_native_client_node_event_marshal = { + PW_VERSION_CLIENT_NODE0_EVENTS, + &client_node_marshal_add_mem, + &client_node_marshal_transport, + &client_node_marshal_set_param, + &client_node_marshal_event_event, + &client_node_marshal_command, + &client_node_marshal_add_port, + &client_node_marshal_remove_port, + &client_node_marshal_port_set_param, + &client_node_marshal_port_use_buffers, + &client_node_marshal_port_command, + &client_node_marshal_port_set_io, +}; + +static const struct pw_protocol_marshal pw_protocol_native_client_node_marshal = { + PW_TYPE_INTERFACE_ClientNode, + PW_VERSION_CLIENT_NODE0, + PW_CLIENT_NODE0_METHOD_NUM, + PW_CLIENT_NODE0_EVENT_NUM, + 0, + NULL, + .server_demarshal = &pw_protocol_native_client_node_method_demarshal, + .server_marshal = &pw_protocol_native_client_node_event_marshal, + NULL, +}; + +struct pw_protocol *pw_protocol_native_ext_client_node0_init(struct pw_context *context) +{ + struct pw_protocol *protocol; + + protocol = pw_context_find_protocol(context, PW_TYPE_INFO_PROTOCOL_Native); + + if (protocol == NULL) + return NULL; + + pw_protocol_add_marshal(protocol, &pw_protocol_native_client_node_marshal); + + return protocol; +} diff --git a/src/modules/module-client-node/v0/transport.c b/src/modules/module-client-node/v0/transport.c new file mode 100644 index 0000000..fa84795 --- /dev/null +++ b/src/modules/module-client-node/v0/transport.c @@ -0,0 +1,262 @@ +/* PipeWire + * + * Copyright © 2016 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <unistd.h> +#include <errno.h> +#include <sys/mman.h> + +#include <spa/utils/ringbuffer.h> +#include <spa/node/io.h> + +#include <pipewire/impl.h> +#include <pipewire/private.h> + +#include "ext-client-node.h" + +#include "transport.h" + +/** \cond */ + +#define INPUT_BUFFER_SIZE (1<<12) +#define OUTPUT_BUFFER_SIZE (1<<12) + +struct transport { + struct pw_client_node0_transport trans; + + struct pw_memblock *mem; + size_t offset; + + struct pw_client_node0_message current; + uint32_t current_index; +}; +/** \endcond */ + +static size_t area_get_size(struct pw_client_node0_area *area) +{ + size_t size; + size = sizeof(struct pw_client_node0_area); + size += area->max_input_ports * sizeof(struct spa_io_buffers); + size += area->max_output_ports * sizeof(struct spa_io_buffers); + size += sizeof(struct spa_ringbuffer); + size += INPUT_BUFFER_SIZE; + size += sizeof(struct spa_ringbuffer); + size += OUTPUT_BUFFER_SIZE; + return size; +} + +static void transport_setup_area(void *p, struct pw_client_node0_transport *trans) +{ + struct pw_client_node0_area *a; + + trans->area = a = p; + p = SPA_PTROFF(p, sizeof(struct pw_client_node0_area), struct spa_io_buffers); + + trans->inputs = p; + p = SPA_PTROFF(p, a->max_input_ports * sizeof(struct spa_io_buffers), void); + + trans->outputs = p; + p = SPA_PTROFF(p, a->max_output_ports * sizeof(struct spa_io_buffers), void); + + trans->input_buffer = p; + p = SPA_PTROFF(p, sizeof(struct spa_ringbuffer), void); + + trans->input_data = p; + p = SPA_PTROFF(p, INPUT_BUFFER_SIZE, void); + + trans->output_buffer = p; + p = SPA_PTROFF(p, sizeof(struct spa_ringbuffer), void); + + trans->output_data = p; + p = SPA_PTROFF(p, OUTPUT_BUFFER_SIZE, void); +} + +static void transport_reset_area(struct pw_client_node0_transport *trans) +{ + uint32_t i; + struct pw_client_node0_area *a = trans->area; + + for (i = 0; i < a->max_input_ports; i++) { + trans->inputs[i] = SPA_IO_BUFFERS_INIT; + } + for (i = 0; i < a->max_output_ports; i++) { + trans->outputs[i] = SPA_IO_BUFFERS_INIT; + } + spa_ringbuffer_init(trans->input_buffer); + spa_ringbuffer_init(trans->output_buffer); +} + +static void destroy(struct pw_client_node0_transport *trans) +{ + struct transport *impl = (struct transport *) trans; + + pw_log_debug("transport %p: destroy", trans); + + pw_memblock_free(impl->mem); + free(impl); +} + +static int add_message(struct pw_client_node0_transport *trans, struct pw_client_node0_message *message) +{ + struct transport *impl = (struct transport *) trans; + int32_t filled, avail; + uint32_t size, index; + + if (impl == NULL || message == NULL) + return -EINVAL; + + filled = spa_ringbuffer_get_write_index(trans->output_buffer, &index); + avail = OUTPUT_BUFFER_SIZE - filled; + size = SPA_POD_SIZE(message); + if (avail < (int)size) + return -ENOSPC; + + spa_ringbuffer_write_data(trans->output_buffer, + trans->output_data, OUTPUT_BUFFER_SIZE, + index & (OUTPUT_BUFFER_SIZE - 1), message, size); + spa_ringbuffer_write_update(trans->output_buffer, index + size); + + return 0; +} + +static int next_message(struct pw_client_node0_transport *trans, struct pw_client_node0_message *message) +{ + struct transport *impl = (struct transport *) trans; + int32_t avail; + + if (impl == NULL || message == NULL) + return -EINVAL; + + avail = spa_ringbuffer_get_read_index(trans->input_buffer, &impl->current_index); + if (avail < (int) sizeof(struct pw_client_node0_message)) + return 0; + + spa_ringbuffer_read_data(trans->input_buffer, + trans->input_data, INPUT_BUFFER_SIZE, + impl->current_index & (INPUT_BUFFER_SIZE - 1), + &impl->current, sizeof(struct pw_client_node0_message)); + + if (avail < (int) SPA_POD_SIZE(&impl->current)) + return 0; + + *message = impl->current; + + return 1; +} + +static int parse_message(struct pw_client_node0_transport *trans, void *message) +{ + struct transport *impl = (struct transport *) trans; + uint32_t size; + + if (impl == NULL || message == NULL) + return -EINVAL; + + size = SPA_POD_SIZE(&impl->current); + + spa_ringbuffer_read_data(trans->input_buffer, + trans->input_data, INPUT_BUFFER_SIZE, + impl->current_index & (INPUT_BUFFER_SIZE - 1), message, size); + spa_ringbuffer_read_update(trans->input_buffer, impl->current_index + size); + + return 0; +} + +/** Create a new transport + * \param max_input_ports maximum number of input_ports + * \param max_output_ports maximum number of output_ports + * \return a newly allocated \ref pw_client_node0_transport + * \memberof pw_client_node0_transport + */ +struct pw_client_node0_transport * +pw_client_node0_transport_new(struct pw_context *context, + uint32_t max_input_ports, uint32_t max_output_ports) +{ + struct transport *impl; + struct pw_client_node0_transport *trans; + struct pw_client_node0_area area = { 0 }; + + area.max_input_ports = max_input_ports; + area.n_input_ports = 0; + area.max_output_ports = max_output_ports; + area.n_output_ports = 0; + + impl = calloc(1, sizeof(struct transport)); + if (impl == NULL) + return NULL; + + pw_log_debug("transport %p: new %d %d", impl, max_input_ports, max_output_ports); + + trans = &impl->trans; + impl->offset = 0; + + impl->mem = pw_mempool_alloc(context->pool, + PW_MEMBLOCK_FLAG_READWRITE | + PW_MEMBLOCK_FLAG_MAP | + PW_MEMBLOCK_FLAG_SEAL, + SPA_DATA_MemFd, area_get_size(&area)); + if (impl->mem == NULL) { + free(impl); + return NULL; + } + + memcpy(impl->mem->map->ptr, &area, sizeof(struct pw_client_node0_area)); + transport_setup_area(impl->mem->map->ptr, trans); + transport_reset_area(trans); + + trans->destroy = destroy; + trans->add_message = add_message; + trans->next_message = next_message; + trans->parse_message = parse_message; + + return trans; +} + +struct pw_client_node0_transport * +pw_client_node0_transport_new_from_info(struct pw_client_node0_transport_info *info) +{ + errno = ENOTSUP; + return NULL; +} + +/** Get transport info + * \param trans the transport to get info of + * \param[out] info transport info + * \return 0 on success + * + * Fill \a info with the transport info of \a trans. This information can be + * passed to the client to set up the shared transport. + * + * \memberof pw_client_node0_transport + */ +int pw_client_node0_transport_get_info(struct pw_client_node0_transport *trans, + struct pw_client_node0_transport_info *info) +{ + struct transport *impl = (struct transport *) trans; + + info->memfd = impl->mem->fd; + info->offset = impl->offset; + info->size = impl->mem->size; + + return 0; +} diff --git a/src/modules/module-client-node/v0/transport.h b/src/modules/module-client-node/v0/transport.h new file mode 100644 index 0000000..3abcd6d --- /dev/null +++ b/src/modules/module-client-node/v0/transport.h @@ -0,0 +1,59 @@ +/* PipeWire + * + * Copyright © 2016 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __PIPEWIRE_CLIENT_NODE0_TRANSPORT_H__ +#define __PIPEWIRE_CLIENT_NODE0_TRANSPORT_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <string.h> + +#include <spa/utils/defs.h> + +#include <pipewire/mem.h> + +/** information about the transport region \memberof pw_client_node */ +struct pw_client_node0_transport_info { + int memfd; /**< the memfd of the transport area */ + uint32_t offset; /**< offset to map \a memfd at */ + uint32_t size; /**< size of memfd mapping */ +}; + +struct pw_client_node0_transport * +pw_client_node0_transport_new(struct pw_context *context, uint32_t max_input_ports, uint32_t max_output_ports); + +struct pw_client_node0_transport * +pw_client_node0_transport_new_from_info(struct pw_client_node0_transport_info *info); + +int +pw_client_node0_transport_get_info(struct pw_client_node0_transport *trans, + struct pw_client_node0_transport_info *info); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __PIPEWIRE_CLIENT_NODE0_TRANSPORT_H__ */ |