diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:28:17 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:28:17 +0000 |
commit | 7a46c07230b8d8108c0e8e80df4522d0ac116538 (patch) | |
tree | d483300dab478b994fe199a5d19d18d74153718a /src/modules/module-client-node | |
parent | Initial commit. (diff) | |
download | pipewire-upstream.tar.xz pipewire-upstream.zip |
Adding upstream version 0.3.65.upstream/0.3.65upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/modules/module-client-node.c | 229 | ||||
-rw-r--r-- | src/modules/module-client-node/client-node.c | 1777 | ||||
-rw-r--r-- | src/modules/module-client-node/client-node.h | 60 | ||||
-rw-r--r-- | src/modules/module-client-node/protocol-native.c | 1259 | ||||
-rw-r--r-- | src/modules/module-client-node/remote-node.c | 1339 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/client-node.c | 1447 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/client-node.h | 101 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/ext-client-node.h | 414 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/protocol-native.c | 534 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/transport.c | 262 | ||||
-rw-r--r-- | src/modules/module-client-node/v0/transport.h | 59 |
11 files changed, 7481 insertions, 0 deletions
diff --git a/src/modules/module-client-node.c b/src/modules/module-client-node.c new file mode 100644 index 0000000..4dc9bf8 --- /dev/null +++ b/src/modules/module-client-node.c @@ -0,0 +1,229 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <string.h> +#include <stdio.h> +#include <errno.h> +#include <dlfcn.h> + +#include "config.h" + +#include <spa/utils/result.h> + +#include <pipewire/impl.h> + +#include "module-client-node/v0/client-node.h" +#include "module-client-node/client-node.h" + +/** \page page_module_client_node PipeWire Module: Client Node + */ + +#define NAME "client-node" + +PW_LOG_TOPIC(mod_topic, "mod." NAME); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +static const struct spa_dict_item module_props[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" }, + { PW_KEY_MODULE_DESCRIPTION, "Allow clients to create and control remote nodes" }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct pw_proxy *pw_core_node_export(struct pw_core *core, + const char *type, const struct spa_dict *props, void *object, size_t user_data_size); +struct pw_proxy *pw_core_spa_node_export(struct pw_core *core, + const char *type, const struct spa_dict *props, void *object, size_t user_data_size); + +struct pw_protocol *pw_protocol_native_ext_client_node_init(struct pw_context *context); +struct pw_protocol *pw_protocol_native_ext_client_node0_init(struct pw_context *context); + +struct factory_data { + struct pw_impl_factory *factory; + struct spa_hook factory_listener; + + struct pw_impl_module *module; + struct spa_hook module_listener; + + struct pw_export_type export_node; + struct pw_export_type export_spanode; +}; + +static void *create_object(void *_data, + struct pw_resource *resource, + const char *type, + uint32_t version, + struct pw_properties *properties, + uint32_t new_id) +{ + void *result; + struct pw_resource *node_resource; + struct pw_impl_client *client; + int res; + + if (resource == NULL) { + res = -EINVAL; + goto error_exit; + } + + client = pw_resource_get_client(resource); + node_resource = pw_resource_new(client, new_id, PW_PERM_ALL, type, version, 0); + if (node_resource == NULL) { + res = -errno; + goto error_resource; + } + + if (version == 0) { + result = pw_impl_client_node0_new(node_resource, properties); + } else { + result = pw_impl_client_node_new(node_resource, properties, true); + } + if (result == NULL) { + res = -errno; + goto error_node; + } + return result; + +error_resource: + pw_log_error("can't create resource: %s", spa_strerror(res)); + pw_resource_errorf_id(resource, new_id, res, "can't create resource: %s", spa_strerror(res)); + goto error_exit; +error_node: + pw_log_error("can't create node: %s", spa_strerror(res)); + pw_resource_errorf_id(resource, new_id, res, "can't create node: %s", spa_strerror(res)); + goto error_exit; +error_exit: + errno = -res; + return NULL; +} + +static const struct pw_impl_factory_implementation impl_factory = { + PW_VERSION_IMPL_FACTORY_IMPLEMENTATION, + .create_object = create_object, +}; + +static void factory_destroy(void *data) +{ + struct factory_data *d = data; + spa_hook_remove(&d->factory_listener); + d->factory = NULL; + if (d->module) + pw_impl_module_destroy(d->module); +} + +static const struct pw_impl_factory_events factory_events = { + PW_VERSION_IMPL_FACTORY_EVENTS, + .destroy = factory_destroy, +}; + +static void module_destroy(void *data) +{ + struct factory_data *d = data; + + spa_hook_remove(&d->module_listener); + spa_list_remove(&d->export_node.link); + spa_list_remove(&d->export_spanode.link); + + d->module = NULL; + if (d->factory) + pw_impl_factory_destroy(d->factory); +} + +static void module_registered(void *data) +{ + struct factory_data *d = data; + struct pw_impl_module *module = d->module; + struct pw_impl_factory *factory = d->factory; + struct spa_dict_item items[1]; + char id[16]; + int res; + + snprintf(id, sizeof(id), "%d", pw_global_get_id(pw_impl_module_get_global(module))); + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_MODULE_ID, id); + pw_impl_factory_update_properties(factory, &SPA_DICT_INIT(items, 1)); + + if ((res = pw_impl_factory_register(factory, NULL)) < 0) { + pw_log_error("%p: can't register factory: %s", factory, spa_strerror(res)); + } +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, + .registered = module_registered, +}; + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct pw_impl_factory *factory; + struct factory_data *data; + int res; + + PW_LOG_TOPIC_INIT(mod_topic); + + factory = pw_context_create_factory(context, + "client-node", + PW_TYPE_INTERFACE_ClientNode, + PW_VERSION_CLIENT_NODE, + NULL, + sizeof(*data)); + if (factory == NULL) + return -errno; + + data = pw_impl_factory_get_user_data(factory); + data->factory = factory; + data->module = module; + + pw_log_debug("module %p: new", module); + + pw_impl_factory_set_implementation(factory, + &impl_factory, + data); + + data->export_node.type = PW_TYPE_INTERFACE_Node; + data->export_node.func = pw_core_node_export; + if ((res = pw_context_register_export_type(context, &data->export_node)) < 0) + goto error; + + data->export_spanode.type = SPA_TYPE_INTERFACE_Node; + data->export_spanode.func = pw_core_spa_node_export; + if ((res = pw_context_register_export_type(context, &data->export_spanode)) < 0) + goto error_remove; + + pw_protocol_native_ext_client_node_init(context); + pw_protocol_native_ext_client_node0_init(context); + + pw_impl_factory_add_listener(factory, &data->factory_listener, &factory_events, data); + pw_impl_module_add_listener(module, &data->module_listener, &module_events, data); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props)); + + return 0; +error_remove: + spa_list_remove(&data->export_node.link); +error: + pw_impl_factory_destroy(data->factory); + return res; +} diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c new file mode 100644 index 0000000..d5c133a --- /dev/null +++ b/src/modules/module-client-node/client-node.c @@ -0,0 +1,1777 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <string.h> +#include <stddef.h> +#include <stdio.h> +#include <errno.h> +#include <unistd.h> +#include <time.h> + +#include <spa/support/system.h> +#include <spa/node/node.h> +#include <spa/node/utils.h> +#include <spa/pod/filter.h> +#include <spa/pod/dynamic.h> +#include <spa/pod/parser.h> +#include <spa/debug/types.h> + +#include <pipewire/pipewire.h> +#include "pipewire/private.h" + +#include "modules/spa/spa-node.h" +#include "client-node.h" + +PW_LOG_TOPIC_EXTERN(mod_topic); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +/** \cond */ + +#define MAX_BUFFERS 64 +#define MAX_METAS 16u +#define MAX_DATAS 64u +#define MAX_AREAS 2048 + +#define CHECK_FREE_PORT(this,d,p) (p <= pw_map_get_size(&this->ports[d]) && !CHECK_PORT(this,d,p)) +#define CHECK_PORT(this,d,p) (pw_map_lookup(&this->ports[d], p) != NULL) +#define GET_PORT(this,d,p) (pw_map_lookup(&this->ports[d], p)) + +#define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers) + +struct buffer { + struct spa_buffer *outbuf; + struct spa_buffer buffer; + struct spa_meta metas[MAX_METAS]; + struct spa_data datas[MAX_DATAS]; + struct pw_memblock *mem; +}; + +struct mix { + unsigned int valid:1; + uint32_t id; + struct port *port; + uint32_t peer_id; + uint32_t n_buffers; + struct buffer buffers[MAX_BUFFERS]; +}; + +struct params { + uint32_t n_params; + struct spa_pod **params; +}; + +struct port { + struct pw_impl_port *port; + struct node *node; + struct impl *impl; + + enum spa_direction direction; + uint32_t id; + + struct spa_node mix_node; + + struct spa_port_info info; + struct pw_properties *properties; + + struct params params; + + unsigned int removed:1; + unsigned int destroyed:1; + + struct pw_array mix; +}; + +struct node { + struct spa_node node; + + struct impl *impl; + + struct spa_log *log; + struct spa_loop *data_loop; + struct spa_system *data_system; + + struct spa_hook_list hooks; + struct spa_callbacks callbacks; + + struct pw_resource *resource; + struct pw_impl_client *client; + + struct spa_source data_source; + int writefd; + + struct pw_map ports[2]; + + struct port dummy; + + struct params params; +}; + +struct impl { + struct pw_impl_client_node this; + + struct pw_context *context; + + struct node node; + + struct pw_map io_map; + struct pw_memblock *io_areas; + + struct pw_memblock *activation; + + struct spa_hook node_listener; + struct spa_hook resource_listener; + struct spa_hook object_listener; + + uint32_t node_id; + + uint32_t bind_node_version; + uint32_t bind_node_id; + + int fds[2]; + int other_fds[2]; +}; + +#define pw_client_node_resource(r,m,v,...) \ + pw_resource_call_res(r,struct pw_client_node_events,m,v,__VA_ARGS__) + +#define pw_client_node_resource_transport(r,...) \ + pw_client_node_resource(r,transport,0,__VA_ARGS__) +#define pw_client_node_resource_set_param(r,...) \ + pw_client_node_resource(r,set_param,0,__VA_ARGS__) +#define pw_client_node_resource_set_io(r,...) \ + pw_client_node_resource(r,set_io,0,__VA_ARGS__) +#define pw_client_node_resource_event(r,...) \ + pw_client_node_resource(r,event,0,__VA_ARGS__) +#define pw_client_node_resource_command(r,...) \ + pw_client_node_resource(r,command,0,__VA_ARGS__) +#define pw_client_node_resource_add_port(r,...) \ + pw_client_node_resource(r,add_port,0,__VA_ARGS__) +#define pw_client_node_resource_remove_port(r,...) \ + pw_client_node_resource(r,remove_port,0,__VA_ARGS__) +#define pw_client_node_resource_port_set_param(r,...) \ + pw_client_node_resource(r,port_set_param,0,__VA_ARGS__) +#define pw_client_node_resource_port_use_buffers(r,...) \ + pw_client_node_resource(r,port_use_buffers,0,__VA_ARGS__) +#define pw_client_node_resource_port_set_io(r,...) \ + pw_client_node_resource(r,port_set_io,0,__VA_ARGS__) +#define pw_client_node_resource_set_activation(r,...) \ + pw_client_node_resource(r,set_activation,0,__VA_ARGS__) +#define pw_client_node_resource_port_set_mix_info(r,...) \ + pw_client_node_resource(r,port_set_mix_info,1,__VA_ARGS__) + +static int update_params(struct params *p, uint32_t n_params, const struct spa_pod **params) +{ + uint32_t i; + for (i = 0; i < p->n_params; i++) + free(p->params[i]); + p->n_params = n_params; + if (p->n_params == 0) { + free(p->params); + p->params = NULL; + } else { + struct spa_pod **np; + np = pw_reallocarray(p->params, p->n_params, sizeof(struct spa_pod *)); + if (np == NULL) { + pw_log_error("%p: can't realloc: %m", p); + free(p->params); + p->params = NULL; + p->n_params = 0; + return -errno; + } + p->params = np; + } + for (i = 0; i < p->n_params; i++) + p->params[i] = params[i] ? spa_pod_copy(params[i]) : NULL; + return 0; +} + +static int +do_port_use_buffers(struct impl *impl, + enum spa_direction direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t flags, + struct spa_buffer **buffers, + uint32_t n_buffers); + +/** \endcond */ + +static struct mix *find_mix(struct port *p, uint32_t mix_id) +{ + struct mix *mix; + size_t len; + + if (mix_id == SPA_ID_INVALID) + mix_id = 0; + else + mix_id++; + + len = pw_array_get_len(&p->mix, struct mix); + if (mix_id >= len) { + size_t need = sizeof(struct mix) * (mix_id + 1 - len); + void *ptr = pw_array_add(&p->mix, need); + if (ptr == NULL) + return NULL; + memset(ptr, 0, need); + } + mix = pw_array_get_unchecked(&p->mix, mix_id, struct mix); + return mix; +} + +static void mix_init(struct mix *mix, struct port *p, uint32_t id) +{ + mix->valid = true; + mix->id = id; + mix->port = p; + mix->n_buffers = 0; +} + +static struct mix *ensure_mix(struct impl *impl, struct port *p, uint32_t mix_id) +{ + struct mix *mix; + + if ((mix = find_mix(p, mix_id)) == NULL) + return NULL; + if (mix->valid) + return mix; + mix_init(mix, p, mix_id); + return mix; +} + +static void clear_data(struct node *this, struct spa_data *d) +{ + struct impl *impl = this->impl; + + switch (d->type) { + case SPA_DATA_MemId: + { + uint32_t id; + struct pw_memblock *m; + + id = SPA_PTR_TO_UINT32(d->data); + m = pw_mempool_find_id(this->client->pool, id); + if (m) { + pw_log_debug("%p: mem %d", impl, m->id); + pw_memblock_unref(m); + } + break; + } + case SPA_DATA_MemFd: + case SPA_DATA_DmaBuf: + pw_log_debug("%p: close fd:%d", impl, (int)d->fd); + close(d->fd); + break; + default: + break; + } +} + +static int clear_buffers(struct node *this, struct mix *mix) +{ + uint32_t i, j; + + for (i = 0; i < mix->n_buffers; i++) { + struct buffer *b = &mix->buffers[i]; + + spa_log_debug(this->log, "%p: clear buffer %d", this, i); + + for (j = 0; j < b->buffer.n_datas; j++) { + struct spa_data *d = &b->datas[j]; + clear_data(this, d); + } + pw_memblock_unref(b->mem); + } + mix->n_buffers = 0; + return 0; +} + +static void mix_clear(struct node *this, struct mix *mix) +{ + struct port *port = mix->port; + + if (!mix->valid) + return; + do_port_use_buffers(this->impl, port->direction, port->id, + mix->id, 0, NULL, 0); + mix->valid = false; +} + +static int impl_node_enum_params(void *object, int seq, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + uint8_t buffer[1024]; + struct spa_pod_dynamic_builder b; + struct spa_result_node_params result; + uint32_t count = 0; + bool found = false; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + + result.id = id; + result.next = 0; + + while (true) { + struct spa_pod *param; + + result.index = result.next++; + if (result.index >= this->params.n_params) + break; + + param = this->params.params[result.index]; + + if (param == NULL || !spa_pod_is_object_id(param, id)) + continue; + + found = true; + + if (result.index < start) + continue; + + spa_pod_dynamic_builder_init(&b, buffer, sizeof(buffer), 4096); + if (spa_pod_filter(&b.b, &result.param, param, filter) == 0) { + pw_log_debug("%p: %d param %u", this, seq, result.index); + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + count++; + } + spa_pod_dynamic_builder_clean(&b); + + if (count == num) + break; + } + return found ? 0 : -ENOENT; +} + +static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + if (this->resource == NULL) + return param == NULL ? 0 : -EIO; + + return pw_client_node_resource_set_param(this->resource, id, flags, param); +} + +static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) +{ + struct node *this = object; + struct impl *impl = this->impl; + struct pw_memmap *mm, *old; + uint32_t memid, mem_offset, mem_size; + uint32_t tag[5] = { impl->node_id, id, }; + + if (impl->this.flags & 1) + return 0; + + old = pw_mempool_find_tag(this->client->pool, tag, sizeof(tag)); + + if (data) { + mm = pw_mempool_import_map(this->client->pool, + impl->context->pool, data, size, tag); + if (mm == NULL) + return -errno; + + mem_offset = mm->offset; + memid = mm->block->id; + mem_size = size; + } + else { + memid = SPA_ID_INVALID; + mem_offset = mem_size = 0; + } + pw_memmap_free(old); + + if (this->resource == NULL) + return data == NULL ? 0 : -EIO; + + return pw_client_node_resource_set_io(this->resource, + id, + memid, + mem_offset, mem_size); +} + +static int impl_node_send_command(void *object, const struct spa_command *command) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(command != NULL, -EINVAL); + + pw_log_debug("%p: send command %d", this, SPA_COMMAND_TYPE(command)); + + if (this->resource == NULL) + return -EIO; + + return pw_client_node_resource_command(this->resource, command); +} + + +static void emit_port_info(struct node *this, struct port *port) +{ + spa_node_emit_port_info(&this->hooks, + port->direction, port->id, &port->info); +} + +static int impl_node_add_listener(void *object, + struct spa_hook *listener, + const struct spa_node_events *events, + void *data) +{ + struct node *this = object; + struct spa_hook_list save; + union pw_map_item *item; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + spa_hook_list_isolate(&this->hooks, &save, listener, events, data); + + pw_array_for_each(item, &this->ports[SPA_DIRECTION_INPUT].items) { + if (item->data) + emit_port_info(this, item->data); + } + pw_array_for_each(item, &this->ports[SPA_DIRECTION_OUTPUT].items) { + if (item->data) + emit_port_info(this, item->data); + } + spa_hook_list_join(&this->hooks, &save); + + return 0; +} + +static int +impl_node_set_callbacks(void *object, + const struct spa_node_callbacks *callbacks, + void *data) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + this->callbacks = SPA_CALLBACKS_INIT(callbacks, data); + + return 0; +} + +static int +impl_node_sync(void *object, int seq) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + pw_log_debug("%p: sync", this); + + if (this->resource == NULL) + return -EIO; + + return pw_resource_ping(this->resource, seq); +} + +static void +do_update_port(struct node *this, + struct port *port, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info) +{ + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { + spa_log_debug(this->log, "%p: port %u update %d params", this, port->id, n_params); + update_params(&port->params, n_params, params); + } + + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_INFO) { + pw_properties_free(port->properties); + port->properties = NULL; + port->info.props = NULL; + port->info.n_params = 0; + port->info.params = NULL; + + if (info) { + port->info = *info; + if (info->props) { + port->properties = pw_properties_new_dict(info->props); + port->info.props = &port->properties->dict; + } + port->info.n_params = 0; + port->info.params = NULL; + spa_node_emit_port_info(&this->hooks, port->direction, port->id, info); + } + } +} + +static void +clear_port(struct node *this, struct port *port) +{ + struct mix *mix; + + spa_log_debug(this->log, "%p: clear port %p", this, port); + + do_update_port(this, port, + PW_CLIENT_NODE_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE_PORT_UPDATE_INFO, 0, NULL, NULL); + + pw_array_for_each(mix, &port->mix) + mix_clear(this, mix); + pw_array_clear(&port->mix); + pw_array_init(&port->mix, sizeof(struct mix) * 2); + + pw_map_insert_at(&this->ports[port->direction], port->id, NULL); + + if (!port->removed) + spa_node_emit_port_info(&this->hooks, port->direction, port->id, NULL); +} + +static int +impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id, + const struct spa_dict *props) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_FREE_PORT(this, direction, port_id), -EINVAL); + + if (this->resource == NULL) + return -EIO; + + return pw_client_node_resource_add_port(this->resource, direction, port_id, props); +} + +static int +impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + if (this->resource == NULL) + return -EIO; + + return pw_client_node_resource_remove_port(this->resource, direction, port_id); +} + +static int +impl_node_port_enum_params(void *object, int seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + struct port *port; + uint8_t buffer[1024]; + struct spa_pod_dynamic_builder b; + struct spa_result_node_params result; + uint32_t count = 0; + bool found = false; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + + port = GET_PORT(this, direction, port_id); + spa_return_val_if_fail(port != NULL, -EINVAL); + + pw_log_debug("%p: seq:%d port %d.%d id:%u start:%u num:%u n_params:%d", + this, seq, direction, port_id, id, start, num, port->params.n_params); + + result.id = id; + result.next = 0; + + while (true) { + struct spa_pod *param; + + result.index = result.next++; + if (result.index >= port->params.n_params) + break; + + param = port->params.params[result.index]; + + if (param == NULL || !spa_pod_is_object_id(param, id)) + continue; + + found = true; + + if (result.index < start) + continue; + + spa_pod_dynamic_builder_init(&b, buffer, sizeof(buffer), 4096); + if (spa_pod_filter(&b.b, &result.param, param, filter) == 0) { + pw_log_debug("%p: %d param %u", this, seq, result.index); + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + count++; + } + spa_pod_dynamic_builder_clean(&b); + + if (count == num) + break; + } + return found ? 0 : -ENOENT; +} + +static int +impl_node_port_set_param(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node *this = object; + struct port *port; + struct mix *mix; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + port = GET_PORT(this, direction, port_id); + if(port == NULL) + return param == NULL ? 0 : -EINVAL; + + pw_log_debug("%p: port %d.%d set param %s %d", this, + direction, port_id, + spa_debug_type_find_name(spa_type_param, id), id); + + if (id == SPA_PARAM_Format) { + pw_array_for_each(mix, &port->mix) + clear_buffers(this, mix); + } + if (this->resource == NULL) + return param == NULL ? 0 : -EIO; + + return pw_client_node_resource_port_set_param(this->resource, + direction, port_id, + id, flags, + param); +} + +static int do_port_set_io(struct impl *impl, + enum spa_direction direction, uint32_t port_id, + uint32_t mix_id, + uint32_t id, void *data, size_t size) +{ + struct node *this = &impl->node; + uint32_t memid, mem_offset, mem_size; + struct port *port; + struct mix *mix; + uint32_t tag[5] = { impl->node_id, direction, port_id, mix_id, id }; + struct pw_memmap *mm, *old; + + pw_log_debug("%p: %s port %d.%d set io %p %zd", this, + direction == SPA_DIRECTION_INPUT ? "input" : "output", + port_id, mix_id, data, size); + + port = GET_PORT(this, direction, port_id); + if (port == NULL) + return data == NULL ? 0 : -EINVAL; + + if ((mix = find_mix(port, mix_id)) == NULL || !mix->valid) + return -EINVAL; + + old = pw_mempool_find_tag(this->client->pool, tag, sizeof(tag)); + + if (data) { + mm = pw_mempool_import_map(this->client->pool, + impl->context->pool, data, size, tag); + if (mm == NULL) + return -errno; + + mem_offset = mm->offset; + memid = mm->block->id; + mem_size = size; + } + else { + memid = SPA_ID_INVALID; + mem_offset = mem_size = 0; + } + pw_memmap_free(old); + + if (this->resource == NULL) + return data == NULL ? 0 : -EIO; + + return pw_client_node_resource_port_set_io(this->resource, + direction, port_id, + mix_id, + id, + memid, + mem_offset, mem_size); +} + +static int +impl_node_port_set_io(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + void *data, size_t size) +{ + /* ignore io on the node itself, we only care about the io on the + * port mixers, the io on the node ports itself is handled on the + * client side */ + return -EINVAL; +} + +static int +do_port_use_buffers(struct impl *impl, + enum spa_direction direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t flags, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct node *this = &impl->node; + struct port *p; + struct mix *mix; + uint32_t i, j; + struct pw_client_node_buffer *mb; + + p = GET_PORT(this, direction, port_id); + if (p == NULL) + return n_buffers == 0 ? 0 : -EINVAL; + + if (n_buffers > MAX_BUFFERS) + return -ENOSPC; + + spa_log_debug(this->log, "%p: %s port %d.%d use buffers %p %u flags:%08x", this, + direction == SPA_DIRECTION_INPUT ? "input" : "output", + port_id, mix_id, buffers, n_buffers, flags); + + if ((mix = find_mix(p, mix_id)) == NULL || !mix->valid) + return -EINVAL; + + if (direction == SPA_DIRECTION_OUTPUT) { + mix_id = SPA_ID_INVALID; + if ((mix = find_mix(p, mix_id)) == NULL || !mix->valid) + return -EINVAL; + } + + clear_buffers(this, mix); + + if (n_buffers > 0) { + mb = alloca(n_buffers * sizeof(struct pw_client_node_buffer)); + } else { + mb = NULL; + } + + if (this->resource == NULL) + return n_buffers == 0 ? 0 : -EIO; + + if (p->destroyed) + return 0; + + for (i = 0; i < n_buffers; i++) { + struct buffer *b = &mix->buffers[i]; + struct pw_memblock *mem, *m; + void *baseptr, *endptr; + + b->outbuf = buffers[i]; + memcpy(&b->buffer, buffers[i], sizeof(struct spa_buffer)); + b->buffer.datas = b->datas; + b->buffer.metas = b->metas; + + if (buffers[i]->n_metas > 0) + baseptr = buffers[i]->metas[0].data; + else if (buffers[i]->n_datas > 0) + baseptr = buffers[i]->datas[0].chunk; + else + return -EINVAL; + + if ((mem = pw_mempool_find_ptr(impl->context->pool, baseptr)) == NULL) + return -EINVAL; + + endptr = SPA_PTROFF(baseptr, buffers[i]->n_datas * sizeof(struct spa_chunk), void); + for (j = 0; j < buffers[i]->n_metas; j++) { + endptr = SPA_PTROFF(endptr, SPA_ROUND_UP_N(buffers[i]->metas[j].size, 8), void); + } + for (j = 0; j < buffers[i]->n_datas; j++) { + struct spa_data *d = &buffers[i]->datas[j]; + if (d->type == SPA_DATA_MemPtr) { + if ((m = pw_mempool_find_ptr(impl->context->pool, d->data)) == NULL || + m != mem) + return -EINVAL; + endptr = SPA_MAX(endptr, SPA_PTROFF(d->data, d->maxsize, void)); + } + } + if (endptr > SPA_PTROFF(baseptr, mem->size, void)) + return -EINVAL; + + m = pw_mempool_import_block(this->client->pool, mem); + if (m == NULL) + return -errno; + + b->mem = m; + + mb[i].buffer = &b->buffer; + mb[i].mem_id = m->id; + mb[i].offset = SPA_PTRDIFF(baseptr, mem->map->ptr); + mb[i].size = SPA_PTRDIFF(endptr, baseptr); + spa_log_debug(this->log, "%p: buffer %d %d %d %d", this, i, mb[i].mem_id, + mb[i].offset, mb[i].size); + + b->buffer.n_metas = SPA_MIN(buffers[i]->n_metas, MAX_METAS); + for (j = 0; j < b->buffer.n_metas; j++) + memcpy(&b->buffer.metas[j], &buffers[i]->metas[j], sizeof(struct spa_meta)); + + b->buffer.n_datas = SPA_MIN(buffers[i]->n_datas, MAX_DATAS); + for (j = 0; j < b->buffer.n_datas; j++) { + struct spa_data *d = &buffers[i]->datas[j]; + + memcpy(&b->datas[j], d, sizeof(struct spa_data)); + + if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) + continue; + + switch (d->type) { + case SPA_DATA_DmaBuf: + case SPA_DATA_MemFd: + { + uint32_t flags = PW_MEMBLOCK_FLAG_DONT_CLOSE; + + if (d->flags & SPA_DATA_FLAG_READABLE) + flags |= PW_MEMBLOCK_FLAG_READABLE; + if (d->flags & SPA_DATA_FLAG_WRITABLE) + flags |= PW_MEMBLOCK_FLAG_WRITABLE; + + spa_log_debug(this->log, "mem %d type:%d fd:%d", j, d->type, (int)d->fd); + m = pw_mempool_import(this->client->pool, + flags, d->type, d->fd); + if (m == NULL) + return -errno; + + b->datas[j].type = SPA_DATA_MemId; + b->datas[j].data = SPA_UINT32_TO_PTR(m->id); + break; + } + case SPA_DATA_MemPtr: + spa_log_debug(this->log, "mem %d %zd", j, SPA_PTRDIFF(d->data, baseptr)); + b->datas[j].data = SPA_INT_TO_PTR(SPA_PTRDIFF(d->data, baseptr)); + break; + default: + b->datas[j].type = SPA_ID_INVALID; + b->datas[j].data = NULL; + spa_log_error(this->log, "invalid memory type %d", d->type); + break; + } + } + } + mix->n_buffers = n_buffers; + + return pw_client_node_resource_port_use_buffers(this->resource, + direction, port_id, mix_id, flags, + n_buffers, mb); +} + +static int +impl_node_port_use_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t flags, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + return do_port_use_buffers(impl, direction, port_id, + SPA_ID_INVALID, flags, buffers, n_buffers); +} + +static int +impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL); + + spa_log_trace_fp(this->log, "reuse buffer %d", buffer_id); + + return -ENOTSUP; +} + +static int impl_node_process(void *object) +{ + struct node *this = object; + struct impl *impl = this->impl; + struct pw_impl_node *n = impl->this.node; + struct timespec ts; + + spa_log_trace_fp(this->log, "%p: send process driver:%p", this, impl->this.node->driver_node); + + if (SPA_UNLIKELY(spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &ts) < 0)) + spa_zero(ts); + + n->rt.activation->status = PW_NODE_ACTIVATION_TRIGGERED; + n->rt.activation->signal_time = SPA_TIMESPEC_TO_NSEC(&ts); + + if (SPA_UNLIKELY(spa_system_eventfd_write(this->data_system, this->writefd, 1) < 0)) + spa_log_warn(this->log, "%p: error %m", this); + + return SPA_STATUS_OK; +} + +static struct pw_node * +client_node_get_node(void *data, + uint32_t version, + size_t user_data_size) +{ + struct impl *impl = data; + struct node *this = &impl->node; + uint32_t new_id = user_data_size; + + pw_log_debug("%p: bind %u/%u", this, new_id, version); + + impl->bind_node_version = version; + impl->bind_node_id = new_id; + pw_map_insert_at(&this->client->objects, new_id, NULL); + + return NULL; +} + +static int +client_node_update(void *data, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_node_info *info) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + if (change_mask & PW_CLIENT_NODE_UPDATE_PARAMS) { + pw_log_debug("%p: update %d params", this, n_params); + update_params(&this->params, n_params, params); + } + if (change_mask & PW_CLIENT_NODE_UPDATE_INFO) { + spa_node_emit_info(&this->hooks, info); + } + pw_log_debug("%p: got node update", this); + return 0; +} + +static int +client_node_port_update(void *data, + enum spa_direction direction, + uint32_t port_id, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info) +{ + struct impl *impl = data; + struct node *this = &impl->node; + struct port *port; + bool remove; + + spa_log_debug(this->log, "%p: got port update change:%08x params:%d", + this, change_mask, n_params); + + remove = (change_mask == 0); + + port = GET_PORT(this, direction, port_id); + + if (remove) { + if (port == NULL) + return 0; + port->destroyed = true; + clear_port(this, port); + } else { + struct port *target; + + if (port == NULL) { + if (!CHECK_FREE_PORT(this, direction, port_id)) + return -EINVAL; + + target = &this->dummy; + spa_zero(this->dummy); + target->direction = direction; + target->id = port_id; + } else + target = port; + + do_update_port(this, + target, + change_mask, + n_params, params, + info); + } + return 0; +} + +static int client_node_set_active(void *data, bool active) +{ + struct impl *impl = data; + struct node *this = &impl->node; + spa_log_debug(this->log, "%p: active:%d", this, active); + return pw_impl_node_set_active(impl->this.node, active); +} + +static int client_node_event(void *data, const struct spa_event *event) +{ + struct impl *impl = data; + struct node *this = &impl->node; + spa_node_emit_event(&this->hooks, event); + return 0; +} + +static int client_node_port_buffers(void *data, + enum spa_direction direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t n_buffers, + struct spa_buffer **buffers) +{ + struct impl *impl = data; + struct node *this = &impl->node; + struct port *p; + struct mix *mix; + uint32_t i, j; + + spa_log_debug(this->log, "%p: %s port %d.%d buffers %p %u", this, + direction == SPA_DIRECTION_INPUT ? "input" : "output", + port_id, mix_id, buffers, n_buffers); + + p = GET_PORT(this, direction, port_id); + spa_return_val_if_fail(p != NULL, -EINVAL); + + if (direction == SPA_DIRECTION_OUTPUT) + mix_id = SPA_ID_INVALID; + + if ((mix = find_mix(p, mix_id)) == NULL || !mix->valid) + return -EINVAL; + + if (mix->n_buffers != n_buffers) + return -EINVAL; + + for (i = 0; i < n_buffers; i++) { + struct spa_buffer *oldbuf, *newbuf; + struct buffer *b = &mix->buffers[i]; + + oldbuf = b->outbuf; + newbuf = buffers[i]; + + spa_log_debug(this->log, "buffer %d n_datas:%d", i, newbuf->n_datas); + + if (oldbuf->n_datas != newbuf->n_datas) + return -EINVAL; + + for (j = 0; j < b->buffer.n_datas; j++) { + struct spa_chunk *oldchunk = oldbuf->datas[j].chunk; + struct spa_data *d = &newbuf->datas[j]; + + /* overwrite everything except the chunk */ + oldbuf->datas[j] = *d; + oldbuf->datas[j].chunk = oldchunk; + + b->datas[j].type = d->type; + b->datas[j].fd = d->fd; + + spa_log_debug(this->log, " data %d type:%d fl:%08x fd:%d, offs:%d max:%d", + j, d->type, d->flags, (int) d->fd, d->mapoffset, + d->maxsize); + } + } + mix->n_buffers = n_buffers; + + return 0; +} + +static const struct pw_client_node_methods client_node_methods = { + PW_VERSION_CLIENT_NODE_METHODS, + .get_node = client_node_get_node, + .update = client_node_update, + .port_update = client_node_port_update, + .set_active = client_node_set_active, + .event = client_node_event, + .port_buffers = client_node_port_buffers, +}; + +static void node_on_data_fd_events(struct spa_source *source) +{ + struct node *this = source->data; + + if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) { + spa_log_warn(this->log, "%p: got error", this); + return; + } + + if (source->rmask & SPA_IO_IN) { + uint64_t cmd; + struct pw_impl_node *node = this->impl->this.node; + + if (SPA_UNLIKELY(spa_system_eventfd_read(this->data_system, + this->data_source.fd, &cmd) < 0)) + pw_log_warn("%p: read failed %m", this); + else if (SPA_UNLIKELY(cmd > 1)) + pw_log_info("(%s-%u) client missed %"PRIu64" wakeups", + node->name, node->info.id, cmd - 1); + + spa_log_trace_fp(this->log, "%p: got ready", this); + spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); + } +} + +static const struct spa_node_methods impl_node = { + SPA_VERSION_NODE_METHODS, + .add_listener = impl_node_add_listener, + .set_callbacks = impl_node_set_callbacks, + .sync = impl_node_sync, + .enum_params = impl_node_enum_params, + .set_param = impl_node_set_param, + .set_io = impl_node_set_io, + .send_command = impl_node_send_command, + .add_port = impl_node_add_port, + .remove_port = impl_node_remove_port, + .port_enum_params = impl_node_port_enum_params, + .port_set_param = impl_node_port_set_param, + .port_use_buffers = impl_node_port_use_buffers, + .port_set_io = impl_node_port_set_io, + .port_reuse_buffer = impl_node_port_reuse_buffer, + .process = impl_node_process, +}; + +static int +node_init(struct node *this, + struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) +{ + this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); + this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop); + this->data_system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem); + + if (this->data_loop == NULL) { + spa_log_error(this->log, "a data-loop is needed"); + return -EINVAL; + } + if (this->data_system == NULL) { + spa_log_error(this->log, "a data-system is needed"); + return -EINVAL; + } + + this->node.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Node, + SPA_VERSION_NODE, + &impl_node, this); + spa_hook_list_init(&this->hooks); + + this->data_source.func = node_on_data_fd_events; + this->data_source.data = this; + this->data_source.fd = -1; + this->data_source.mask = SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP; + this->data_source.rmask = 0; + + return 0; +} + +static int node_clear(struct node *this) +{ + update_params(&this->params, 0, NULL); + return 0; +} + +static int do_remove_source(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct spa_source *source = user_data; + spa_loop_remove_source(loop, source); + return 0; +} + +static void client_node_resource_destroy(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node *this = &impl->this; + struct node *node = &impl->node; + + pw_log_debug("%p: destroy", node); + + impl->node.resource = this->resource = NULL; + spa_hook_remove(&impl->resource_listener); + spa_hook_remove(&impl->object_listener); + + if (node->data_source.fd != -1) { + spa_loop_invoke(node->data_loop, + do_remove_source, + SPA_ID_INVALID, + NULL, + 0, + true, + &node->data_source); + } + if (this->node) + pw_impl_node_destroy(this->node); +} + +static void client_node_resource_error(void *data, int seq, int res, const char *message) +{ + struct impl *impl = data; + struct node *this = &impl->node; + struct spa_result_node_error result; + + pw_log_error("%p: error seq:%d %d (%s)", this, seq, res, message); + result.message = message; + spa_node_emit_result(&this->hooks, seq, res, SPA_RESULT_TYPE_NODE_ERROR, &result); +} + +static void client_node_resource_pong(void *data, int seq) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + pw_log_debug("%p: got pong, emit result %d", this, seq); + spa_node_emit_result(&this->hooks, seq, 0, 0, NULL); +} + +void pw_impl_client_node_registered(struct pw_impl_client_node *this, struct pw_global *global) +{ + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + struct pw_impl_node *node = this->node; + struct pw_impl_client *client = impl->node.client; + uint32_t node_id = global->id; + + pw_log_debug("%p: %d", &impl->node, node_id); + + impl->activation = pw_mempool_import_block(client->pool, node->activation); + if (impl->activation == NULL) { + pw_log_debug("%p: can't import block: %m", &impl->node); + return; + } + impl->node_id = node_id; + + if (this->resource == NULL) + return; + + pw_resource_set_bound_id(this->resource, node_id); + + pw_client_node_resource_transport(this->resource, + impl->other_fds[0], + impl->other_fds[1], + impl->activation->id, + 0, + sizeof(struct pw_node_activation)); + + if (impl->bind_node_id) { + pw_global_bind(global, client, PW_PERM_ALL, + impl->bind_node_version, impl->bind_node_id); + } +} + +static void node_initialized(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node *this = &impl->this; + struct node *node = &impl->node; + struct pw_global *global; + struct spa_system *data_system = impl->node.data_system; + size_t size; + + impl->fds[0] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + impl->fds[1] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; + node->data_source.fd = impl->fds[0]; + node->writefd = impl->fds[1]; + + spa_loop_add_source(node->data_loop, &node->data_source); + pw_log_debug("%p: transport read-fd:%d write-fd:%d", node, impl->fds[0], impl->fds[1]); + + size = sizeof(struct spa_io_buffers) * MAX_AREAS; + + impl->io_areas = pw_mempool_alloc(impl->context->pool, + PW_MEMBLOCK_FLAG_READWRITE | + PW_MEMBLOCK_FLAG_MAP | + PW_MEMBLOCK_FLAG_SEAL, + SPA_DATA_MemFd, size); + if (impl->io_areas == NULL) + return; + + pw_log_debug("%p: io areas %p", node, impl->io_areas->map->ptr); + + if ((global = pw_impl_node_get_global(this->node)) != NULL) + pw_impl_client_node_registered(this, global); +} + +static void node_free(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node *this = &impl->this; + struct node *node = &impl->node; + struct spa_system *data_system = node->data_system; + uint32_t tag[5] = { impl->node_id, }; + struct pw_memmap *mm; + + this->node = NULL; + + pw_log_debug("%p: free", node); + node_clear(node); + + spa_hook_remove(&impl->node_listener); + + while ((mm = pw_mempool_find_tag(node->client->pool, tag, sizeof(uint32_t))) != NULL) + pw_memmap_free(mm); + + if (this->resource) + pw_resource_destroy(this->resource); + + if (impl->activation) + pw_memblock_unref(impl->activation); + if (impl->io_areas) + pw_memblock_unref(impl->io_areas); + + pw_map_clear(&impl->node.ports[0]); + pw_map_clear(&impl->node.ports[1]); + pw_map_clear(&impl->io_map); + + if (impl->fds[0] != -1) + spa_system_close(data_system, impl->fds[0]); + if (impl->fds[1] != -1) + spa_system_close(data_system, impl->fds[1]); + free(impl); +} + +static int port_init_mix(void *data, struct pw_impl_port_mix *mix) +{ + struct port *port = data; + struct impl *impl = port->impl; + struct mix *m; + + if ((m = ensure_mix(impl, port, mix->port.port_id)) == NULL) + return -ENOMEM; + + mix->id = pw_map_insert_new(&impl->io_map, NULL); + if (mix->id == SPA_ID_INVALID) { + m->valid = false; + return -errno; + } + if (mix->id > MAX_AREAS) { + pw_map_remove(&impl->io_map, mix->id); + m->valid = false; + return -ENOMEM; + } + + mix->io = SPA_PTROFF(impl->io_areas->map->ptr, + mix->id * sizeof(struct spa_io_buffers), void); + *mix->io = SPA_IO_BUFFERS_INIT; + + m->peer_id = mix->peer_id; + + pw_log_debug("%p: init mix id:%d io:%p base:%p", impl, + mix->id, mix->io, impl->io_areas->map->ptr); + + return 0; +} + +static int port_release_mix(void *data, struct pw_impl_port_mix *mix) +{ + struct port *port = data; + struct impl *impl = port->impl; + struct node *this = &impl->node; + struct mix *m; + + pw_log_debug("%p: remove mix id:%d io:%p base:%p", + this, mix->id, mix->io, impl->io_areas->map->ptr); + + if ((m = find_mix(port, mix->port.port_id)) == NULL || !m->valid) + return -EINVAL; + + pw_map_remove(&impl->io_map, mix->id); + m->valid = false; + + return 0; +} + +static const struct pw_impl_port_implementation port_impl = { + PW_VERSION_PORT_IMPLEMENTATION, + .init_mix = port_init_mix, + .release_mix = port_release_mix, +}; + +static int +impl_mix_port_enum_params(void *object, int seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct port *port = object; + + if (port->direction != direction) + return -ENOTSUP; + + return impl_node_port_enum_params(&port->node->node, seq, direction, port->id, + id, start, num, filter); +} + +static int +impl_mix_port_set_param(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + return -ENOTSUP; +} + +static int +impl_mix_add_port(void *object, enum spa_direction direction, uint32_t mix_id, + const struct spa_dict *props) +{ + struct port *port = object; + pw_log_debug("%p: add port %d:%d.%d", object, direction, port->id, mix_id); + return 0; +} + +static int +impl_mix_remove_port(void *object, enum spa_direction direction, uint32_t mix_id) +{ + struct port *port = object; + pw_log_debug("%p: remove port %d:%d.%d", object, direction, port->id, mix_id); + return 0; +} + +static int +impl_mix_port_use_buffers(void *object, + enum spa_direction direction, + uint32_t mix_id, + uint32_t flags, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct port *port = object; + struct impl *impl = port->impl; + + return do_port_use_buffers(impl, direction, port->id, mix_id, flags, buffers, n_buffers); +} + +static int impl_mix_port_set_io(void *object, + enum spa_direction direction, uint32_t mix_id, + uint32_t id, void *data, size_t size) +{ + struct port *p = object; + struct pw_impl_port *port = p->port; + struct impl *impl = port->owner_data; + struct node *this = &impl->node; + struct pw_impl_port_mix *mix; + + mix = pw_map_lookup(&port->mix_port_map, mix_id); + if (mix == NULL) + return -EINVAL; + + if (id == SPA_IO_Buffers) { + if (data && size >= sizeof(struct spa_io_buffers)) + mix->io = data; + else + mix->io = NULL; + + if (mix->io != NULL && this->resource && this->resource->version >= 4) + pw_client_node_resource_port_set_mix_info(this->resource, + direction, port->port_id, + mix->port.port_id, mix->peer_id, NULL); + } + + return do_port_set_io(impl, + direction, port->port_id, mix->port.port_id, + id, data, size); +} + +static int +impl_mix_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) +{ + struct port *p = object; + return impl_node_port_reuse_buffer(&p->node->node, p->id, buffer_id); +} + +static int impl_mix_process(void *object) +{ + return SPA_STATUS_HAVE_DATA; +} + +static const struct spa_node_methods impl_port_mix = { + SPA_VERSION_NODE_METHODS, + .port_enum_params = impl_mix_port_enum_params, + .port_set_param = impl_mix_port_set_param, + .add_port = impl_mix_add_port, + .remove_port = impl_mix_remove_port, + .port_use_buffers = impl_mix_port_use_buffers, + .port_set_io = impl_mix_port_set_io, + .port_reuse_buffer = impl_mix_port_reuse_buffer, + .process = impl_mix_process, +}; + +static void node_port_init(void *data, struct pw_impl_port *port) +{ + struct impl *impl = data; + struct port *p = pw_impl_port_get_user_data(port); + struct node *this = &impl->node; + + pw_log_debug("%p: port %p init", this, port); + + *p = this->dummy; + p->port = port; + p->node = this; + p->direction = port->direction; + p->id = port->port_id; + p->impl = impl; + pw_array_init(&p->mix, sizeof(struct mix) * 2); + p->mix_node.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Node, + SPA_VERSION_NODE, + &impl_port_mix, p); + ensure_mix(impl, p, SPA_ID_INVALID); + + pw_map_insert_at(&this->ports[p->direction], p->id, p); + return; +} + +static void node_port_added(void *data, struct pw_impl_port *port) +{ + struct impl *impl = data; + struct port *p = pw_impl_port_get_user_data(port); + + port->flags |= PW_IMPL_PORT_FLAG_NO_MIXER; + + port->impl = SPA_CALLBACKS_INIT(&port_impl, p); + port->owner_data = impl; + + pw_impl_port_set_mix(port, &p->mix_node, + PW_IMPL_PORT_MIX_FLAG_MULTI | + PW_IMPL_PORT_MIX_FLAG_MIX_ONLY); +} + +static void node_port_removed(void *data, struct pw_impl_port *port) +{ + struct impl *impl = data; + struct node *this = &impl->node; + struct port *p = pw_impl_port_get_user_data(port); + + pw_log_debug("%p: port %p remove", this, port); + + p->removed = true; + clear_port(this, p); +} + +static void node_peer_added(void *data, struct pw_impl_node *peer) +{ + struct impl *impl = data; + struct node *this = &impl->node; + struct pw_memblock *m; + + if (peer == impl->this.node) + return; + + m = pw_mempool_import_block(this->client->pool, peer->activation); + if (m == NULL) { + pw_log_debug("%p: can't ensure mem: %m", this); + return; + } + pw_log_debug("%p: peer %p id:%u added mem_id:%u", &impl->this, peer, + peer->info.id, m->id); + + if (this->resource == NULL) + return; + + pw_client_node_resource_set_activation(this->resource, + peer->info.id, + peer->source.fd, + m->id, + 0, + sizeof(struct pw_node_activation)); +} + +static void node_peer_removed(void *data, struct pw_impl_node *peer) +{ + struct impl *impl = data; + struct node *this = &impl->node; + struct pw_memblock *m; + + if (peer == impl->this.node) + return; + + m = pw_mempool_find_fd(this->client->pool, peer->activation->fd); + if (m == NULL) { + pw_log_warn("%p: unknown peer %p fd:%d", this, peer, + peer->source.fd); + return; + } + pw_log_debug("%p: peer %p %u removed", this, peer, + peer->info.id); + + if (this->resource != NULL) { + pw_client_node_resource_set_activation(this->resource, + peer->info.id, + -1, + SPA_ID_INVALID, + 0, + 0); + } + + pw_memblock_unref(m); +} + +static void node_driver_changed(void *data, struct pw_impl_node *old, struct pw_impl_node *driver) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + pw_log_debug("%p: driver changed %p -> %p", this, old, driver); + + node_peer_removed(data, old); + node_peer_added(data, driver); +} + +static const struct pw_impl_node_events node_events = { + PW_VERSION_IMPL_NODE_EVENTS, + .free = node_free, + .initialized = node_initialized, + .port_init = node_port_init, + .port_added = node_port_added, + .port_removed = node_port_removed, + .peer_added = node_peer_added, + .peer_removed = node_peer_removed, + .driver_changed = node_driver_changed, +}; + +static const struct pw_resource_events resource_events = { + PW_VERSION_RESOURCE_EVENTS, + .destroy = client_node_resource_destroy, + .error = client_node_resource_error, + .pong = client_node_resource_pong, +}; + +/** Create a new client node + * \param client an owner \ref pw_client + * \param id an id + * \param name a name + * \param properties extra properties + * \return a newly allocated client node + * + * Create a new \ref pw_impl_node. + * + * \memberof pw_impl_client_node + */ +struct pw_impl_client_node *pw_impl_client_node_new(struct pw_resource *resource, + struct pw_properties *properties, + bool do_register) +{ + struct impl *impl; + struct pw_impl_client_node *this; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct pw_context *context = pw_impl_client_get_context(client); + const struct spa_support *support; + uint32_t n_support; + int res; + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) { + res = -errno; + goto error_exit_cleanup; + } + + if (properties == NULL) + properties = pw_properties_new(NULL, NULL); + if (properties == NULL) { + res = -errno; + goto error_exit_free; + } + + pw_properties_setf(properties, PW_KEY_CLIENT_ID, "%d", client->global->id); + + this = &impl->this; + + impl->context = context; + impl->fds[0] = impl->fds[1] = -1; + pw_log_debug("%p: new", &impl->node); + + support = pw_context_get_support(impl->context, &n_support); + node_init(&impl->node, NULL, support, n_support); + impl->node.impl = impl; + impl->node.resource = resource; + impl->node.client = client; + this->flags = do_register ? 0 : 1; + + pw_map_init(&impl->node.ports[0], 64, 64); + pw_map_init(&impl->node.ports[1], 64, 64); + pw_map_init(&impl->io_map, 64, 64); + + this->resource = resource; + this->node = pw_spa_node_new(context, + PW_SPA_NODE_FLAG_ASYNC | + (do_register ? 0 : PW_SPA_NODE_FLAG_NO_REGISTER), + (struct spa_node *)&impl->node.node, + NULL, + properties, 0); + + if (this->node == NULL) + goto error_no_node; + + this->node->remote = true; + this->flags = 0; + + pw_resource_add_listener(this->resource, + &impl->resource_listener, + &resource_events, + impl); + pw_resource_add_object_listener(this->resource, + &impl->object_listener, + &client_node_methods, + impl); + + this->node->port_user_data_size = sizeof(struct port); + + pw_impl_node_add_listener(this->node, &impl->node_listener, &node_events, impl); + + return this; + +error_no_node: + res = -errno; + node_clear(&impl->node); + properties = NULL; + goto error_exit_free; + +error_exit_free: + free(impl); +error_exit_cleanup: + if (resource) + pw_resource_destroy(resource); + pw_properties_free(properties); + errno = -res; + return NULL; +} + +/** Destroy a client node + * \param node the client node to destroy + * \memberof pw_impl_client_node + */ +void pw_impl_client_node_destroy(struct pw_impl_client_node *node) +{ + pw_resource_destroy(node->resource); +} diff --git a/src/modules/module-client-node/client-node.h b/src/modules/module-client-node/client-node.h new file mode 100644 index 0000000..f7e060a --- /dev/null +++ b/src/modules/module-client-node/client-node.h @@ -0,0 +1,60 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PIPEWIRE_CLIENT_NODE_H +#define PIPEWIRE_CLIENT_NODE_H + +#include <pipewire/impl.h> +#include <pipewire/extensions/client-node.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** \class pw_impl_client_node + * + * PipeWire client node interface + */ +struct pw_impl_client_node { + struct pw_impl_node *node; + + struct pw_resource *resource; + uint32_t flags; +}; + +struct pw_impl_client_node * +pw_impl_client_node_new(struct pw_resource *resource, + struct pw_properties *properties, + bool do_register); + +void +pw_impl_client_node_destroy(struct pw_impl_client_node *node); + +void pw_impl_client_node_registered(struct pw_impl_client_node *node, struct pw_global *global); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_CLIENT_NODE_H */ diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c new file mode 100644 index 0000000..dd51692 --- /dev/null +++ b/src/modules/module-client-node/protocol-native.c @@ -0,0 +1,1259 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <errno.h> + +#include <spa/pod/builder.h> +#include <spa/pod/parser.h> +#include <spa/utils/result.h> + +#include <pipewire/impl.h> + +#include <pipewire/extensions/protocol-native.h> +#include <pipewire/extensions/client-node.h> + +#define MAX_DICT 1024 +#define MAX_PARAMS 4096 +#define MAX_PARAM_INFO 128 +#define MAX_BUFFERS 64 +#define MAX_METAS 16u +#define MAX_DATAS 64u + +PW_LOG_TOPIC_EXTERN(mod_topic); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +static inline void push_item(struct spa_pod_builder *b, const struct spa_dict_item *item) +{ + const char *str; + spa_pod_builder_string(b, item->key); + str = item->value; + if (spa_strstartswith(str, "pointer:")) + str = ""; + spa_pod_builder_string(b, str); +} + +static void push_dict(struct spa_pod_builder *b, const struct spa_dict *dict) +{ + uint32_t i, n_items; + struct spa_pod_frame f; + + n_items = dict ? dict->n_items : 0; + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_int(b, n_items); + for (i = 0; i < n_items; i++) + push_item(b, &dict->items[i]); + spa_pod_builder_pop(b, &f); +} + +static inline int parse_item(struct spa_pod_parser *prs, struct spa_dict_item *item) +{ + int res; + if ((res = spa_pod_parser_get(prs, + SPA_POD_String(&item->key), + SPA_POD_String(&item->value), + NULL)) < 0) + return res; + if (spa_strstartswith(item->value, "pointer:")) + item->value = ""; + return 0; +} + +#define parse_dict(prs,d) \ +do { \ + uint32_t i; \ + if (spa_pod_parser_get(prs, \ + SPA_POD_Int(&(d)->n_items), NULL) < 0) \ + return -EINVAL; \ + (d)->items = NULL; \ + if ((d)->n_items > 0) { \ + if ((d)->n_items > MAX_DICT) \ + return -ENOSPC; \ + (d)->items = alloca((d)->n_items * sizeof(struct spa_dict_item)); \ + for (i = 0; i < (d)->n_items; i++) { \ + if (parse_item(prs, (struct spa_dict_item *) &(d)->items[i]) < 0) \ + return -EINVAL; \ + } \ + } \ +} while(0) + +#define parse_dict_struct(prs,f,dict) \ +do { \ + if (spa_pod_parser_push_struct(prs, f) < 0) \ + return -EINVAL; \ + parse_dict(prs, dict); \ + spa_pod_parser_pop(prs, f); \ +} while(0) + +#define parse_params(prs,n_params,params) \ +do { \ + uint32_t i; \ + if (spa_pod_parser_get(prs, \ + SPA_POD_Int(&n_params), NULL) < 0) \ + return -EINVAL; \ + params = NULL; \ + if (n_params > 0) { \ + if (n_params > MAX_PARAMS) \ + return -ENOSPC; \ + params = alloca(n_params * sizeof(struct spa_pod *)); \ + for (i = 0; i < n_params; i++) { \ + if (spa_pod_parser_get(prs, \ + SPA_POD_PodObject(¶ms[i]), NULL) < 0) \ + return -EINVAL; \ + } \ + } \ +} while(0) + +#define parse_param_info(prs,n_params,params) \ +do { \ + uint32_t i; \ + if (spa_pod_parser_get(prs, \ + SPA_POD_Int(&(n_params)), NULL) < 0) \ + return -EINVAL; \ + params = NULL; \ + if (n_params > 0) { \ + if (n_params > MAX_PARAM_INFO) \ + return -ENOSPC; \ + params = alloca(n_params * sizeof(struct spa_param_info)); \ + for (i = 0; i < n_params; i++) { \ + if (spa_pod_parser_get(prs, \ + SPA_POD_Id(&(params[i]).id), \ + SPA_POD_Int(&(params[i]).flags), NULL) < 0) \ + return -EINVAL; \ + } \ + } \ +} while(0) + +static int client_node_marshal_add_listener(void *object, + struct spa_hook *listener, + const struct pw_client_node_events *events, + void *data) +{ + struct pw_proxy *proxy = object; + pw_proxy_add_object_listener(proxy, listener, events, data); + return 0; +} + +static struct pw_node * +client_node_marshal_get_node(void *object, uint32_t version, size_t user_data_size) +{ + struct pw_proxy *proxy = object; + struct spa_pod_builder *b; + struct pw_proxy *res; + uint32_t new_id; + + res = pw_proxy_new(object, PW_TYPE_INTERFACE_Node, version, user_data_size); + if (res == NULL) + return NULL; + + new_id = pw_proxy_get_id(res); + + b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_METHOD_GET_NODE, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Int(version), + SPA_POD_Int(new_id)); + + pw_protocol_native_end_proxy(proxy, b); + + return (struct pw_node *) res; +} + +static int +client_node_marshal_update(void *object, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_node_info *info) +{ + struct pw_proxy *proxy = object; + struct spa_pod_builder *b; + struct spa_pod_frame f[2]; + uint32_t i, n_items, n_info_params; + + b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_METHOD_UPDATE, NULL); + + spa_pod_builder_push_struct(b, &f[0]); + spa_pod_builder_add(b, + SPA_POD_Int(change_mask), + SPA_POD_Int(n_params), NULL); + + for (i = 0; i < n_params; i++) + spa_pod_builder_add(b, SPA_POD_Pod(params[i]), NULL); + + if (info) { + uint64_t change_mask = info->change_mask; + + change_mask &= SPA_NODE_CHANGE_MASK_FLAGS | + SPA_NODE_CHANGE_MASK_PROPS | + SPA_NODE_CHANGE_MASK_PARAMS; + + n_items = info->props && (change_mask & SPA_NODE_CHANGE_MASK_PROPS) ? + info->props->n_items : 0; + n_info_params = (change_mask & SPA_NODE_CHANGE_MASK_PARAMS) ? + info->n_params : 0; + + spa_pod_builder_push_struct(b, &f[1]); + spa_pod_builder_add(b, + SPA_POD_Int(info->max_input_ports), + SPA_POD_Int(info->max_output_ports), + SPA_POD_Long(change_mask), + SPA_POD_Long(info->flags), + SPA_POD_Int(n_items), NULL); + for (i = 0; i < n_items; i++) + push_item(b, &info->props->items[i]); + spa_pod_builder_add(b, + SPA_POD_Int(n_info_params), NULL); + for (i = 0; i < n_info_params; i++) { + spa_pod_builder_add(b, + SPA_POD_Id(info->params[i].id), + SPA_POD_Int(info->params[i].flags), NULL); + } + spa_pod_builder_pop(b, &f[1]); + + } else { + spa_pod_builder_add(b, + SPA_POD_Pod(NULL), NULL); + } + spa_pod_builder_pop(b, &f[0]); + + return pw_protocol_native_end_proxy(proxy, b); +} + +static int +client_node_marshal_port_update(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info) +{ + struct pw_proxy *proxy = object; + struct spa_pod_builder *b; + struct spa_pod_frame f[2]; + uint32_t i, n_items; + + b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_METHOD_PORT_UPDATE, NULL); + + spa_pod_builder_push_struct(b, &f[0]); + spa_pod_builder_add(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id), + SPA_POD_Int(change_mask), + SPA_POD_Int(n_params), NULL); + + for (i = 0; i < n_params; i++) + spa_pod_builder_add(b, + SPA_POD_Pod(params[i]), NULL); + + if (info) { + uint64_t change_mask = info->change_mask; + + n_items = info->props ? info->props->n_items : 0; + + change_mask &= SPA_PORT_CHANGE_MASK_FLAGS | + SPA_PORT_CHANGE_MASK_RATE | + SPA_PORT_CHANGE_MASK_PROPS | + SPA_PORT_CHANGE_MASK_PARAMS; + + spa_pod_builder_push_struct(b, &f[1]); + spa_pod_builder_add(b, + SPA_POD_Long(change_mask), + SPA_POD_Long(info->flags), + SPA_POD_Int(info->rate.num), + SPA_POD_Int(info->rate.denom), + SPA_POD_Int(n_items), NULL); + for (i = 0; i < n_items; i++) + push_item(b, &info->props->items[i]); + spa_pod_builder_add(b, + SPA_POD_Int(info->n_params), NULL); + for (i = 0; i < info->n_params; i++) { + spa_pod_builder_add(b, + SPA_POD_Id(info->params[i].id), + SPA_POD_Int(info->params[i].flags), NULL); + } + spa_pod_builder_pop(b, &f[1]); + + } else { + spa_pod_builder_add(b, + SPA_POD_Pod(NULL), NULL); + } + spa_pod_builder_pop(b, &f[0]); + + return pw_protocol_native_end_proxy(proxy, b); +} + +static int client_node_marshal_set_active(void *object, bool active) +{ + struct pw_proxy *proxy = object; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_METHOD_SET_ACTIVE, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Bool(active)); + + return pw_protocol_native_end_proxy(proxy, b); +} + +static int client_node_marshal_event_method(void *object, const struct spa_event *event) +{ + struct pw_proxy *proxy = object; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_METHOD_EVENT, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Pod(event)); + + return pw_protocol_native_end_proxy(proxy, b); +} + +static int +client_node_marshal_port_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t n_buffers, + struct spa_buffer **buffers) +{ + struct pw_proxy *proxy = object; + struct spa_pod_builder *b; + struct spa_pod_frame f[2]; + uint32_t i, j; + + b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_METHOD_PORT_BUFFERS, NULL); + + spa_pod_builder_push_struct(b, &f[0]); + spa_pod_builder_add(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id), + SPA_POD_Int(mix_id), + SPA_POD_Int(n_buffers), NULL); + + for (i = 0; i < n_buffers; i++) { + struct spa_buffer *buf = buffers[i]; + + spa_pod_builder_add(b, + SPA_POD_Int(buf->n_datas), NULL); + + for (j = 0; j < buf->n_datas; j++) { + struct spa_data *d = &buf->datas[j]; + spa_pod_builder_add(b, + SPA_POD_Id(d->type), + SPA_POD_Fd(pw_protocol_native_add_proxy_fd(proxy, d->fd)), + SPA_POD_Int(d->flags), + SPA_POD_Int(d->mapoffset), + SPA_POD_Int(d->maxsize), NULL); + } + } + spa_pod_builder_pop(b, &f[0]); + + return pw_protocol_native_end_proxy(proxy, b); +} + +static int client_node_demarshal_transport(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + uint32_t mem_id, offset, sz; + int64_t ridx, widx; + int readfd, writefd; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Fd(&ridx), + SPA_POD_Fd(&widx), + SPA_POD_Int(&mem_id), + SPA_POD_Int(&offset), + SPA_POD_Int(&sz)) < 0) + return -EINVAL; + + readfd = pw_protocol_native_get_proxy_fd(proxy, ridx); + writefd = pw_protocol_native_get_proxy_fd(proxy, widx); + + if (readfd < 0 || writefd < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, transport, 0, + readfd, writefd, mem_id, + offset, sz); + return 0; +} + +static int client_node_demarshal_set_param(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + uint32_t id, flags; + const struct spa_pod *param = NULL; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Id(&id), + SPA_POD_Int(&flags), + SPA_POD_PodObject(¶m)) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, set_param, 0, id, flags, param); + return 0; +} + +static int client_node_demarshal_event_event(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + const struct spa_event *event; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_PodObject(&event)) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, event, 0, event); + return 0; +} + +static int client_node_demarshal_command(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + const struct spa_command *command; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_PodObject(&command)) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, command, 0, command); + return 0; +} + +static int client_node_demarshal_add_port(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + struct spa_pod_frame f[2]; + int32_t direction, port_id; + struct spa_dict props = SPA_DICT_INIT(NULL, 0); + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f[0]) < 0) + return -EINVAL; + + if (spa_pod_parser_get(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id), NULL) < 0) + return -EINVAL; + + parse_dict_struct(&prs, &f[1], &props); + + pw_proxy_notify(proxy, struct pw_client_node_events, add_port, 0, direction, port_id, + props.n_items ? &props : NULL); + return 0; +} + +static int client_node_demarshal_remove_port(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + int32_t direction, port_id; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id)) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, remove_port, 0, direction, port_id); + return 0; +} + +static int client_node_demarshal_port_set_param(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + uint32_t direction, port_id, id, flags; + const struct spa_pod *param = NULL; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id), + SPA_POD_Id(&id), + SPA_POD_Int(&flags), + SPA_POD_PodObject(¶m)) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, port_set_param, 0, + direction, port_id, id, flags, param); + return 0; +} + +static int client_node_demarshal_port_use_buffers(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + struct spa_pod_frame f; + uint32_t direction, port_id, mix_id, flags, n_buffers, data_id; + struct pw_client_node_buffer *buffers; + uint32_t i, j; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f) < 0 || + spa_pod_parser_get(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id), + SPA_POD_Int(&mix_id), + SPA_POD_Int(&flags), + SPA_POD_Int(&n_buffers), NULL) < 0) + return -EINVAL; + + if (n_buffers > MAX_BUFFERS) + return -ENOSPC; + + buffers = alloca(sizeof(struct pw_client_node_buffer) * n_buffers); + for (i = 0; i < n_buffers; i++) { + struct spa_buffer *buf = buffers[i].buffer = alloca(sizeof(struct spa_buffer)); + + if (spa_pod_parser_get(&prs, + SPA_POD_Int(&buffers[i].mem_id), + SPA_POD_Int(&buffers[i].offset), + SPA_POD_Int(&buffers[i].size), + SPA_POD_Int(&buf->n_metas), NULL) < 0) + return -EINVAL; + + if (buf->n_metas > MAX_METAS) + return -ENOSPC; + + buf->metas = alloca(sizeof(struct spa_meta) * buf->n_metas); + for (j = 0; j < buf->n_metas; j++) { + struct spa_meta *m = &buf->metas[j]; + + if (spa_pod_parser_get(&prs, + SPA_POD_Id(&m->type), + SPA_POD_Int(&m->size), NULL) < 0) + return -EINVAL; + + m->data = NULL; + } + if (spa_pod_parser_get(&prs, + SPA_POD_Int(&buf->n_datas), NULL) < 0) + return -EINVAL; + + if (buf->n_datas > MAX_DATAS) + return -ENOSPC; + + buf->datas = alloca(sizeof(struct spa_data) * buf->n_datas); + for (j = 0; j < buf->n_datas; j++) { + struct spa_data *d = &buf->datas[j]; + + if (spa_pod_parser_get(&prs, + SPA_POD_Id(&d->type), + SPA_POD_Int(&data_id), + SPA_POD_Int(&d->flags), + SPA_POD_Int(&d->mapoffset), + SPA_POD_Int(&d->maxsize), NULL) < 0) + return -EINVAL; + + d->fd = -1; + d->data = SPA_UINT32_TO_PTR(data_id); + d->chunk = NULL; + } + } + pw_proxy_notify(proxy, struct pw_client_node_events, port_use_buffers, 0, + direction, + port_id, + mix_id, + flags, + n_buffers, buffers); + return 0; +} + +static int client_node_demarshal_port_set_io(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + uint32_t direction, port_id, mix_id, id, memid, off, sz; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id), + SPA_POD_Int(&mix_id), + SPA_POD_Id(&id), + SPA_POD_Int(&memid), + SPA_POD_Int(&off), + SPA_POD_Int(&sz)) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, port_set_io, 0, + direction, port_id, mix_id, + id, memid, + off, sz); + return 0; +} + +static int client_node_demarshal_set_activation(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + uint32_t node_id, memid, off, sz; + int64_t sigidx; + int signalfd; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Int(&node_id), + SPA_POD_Fd(&sigidx), + SPA_POD_Int(&memid), + SPA_POD_Int(&off), + SPA_POD_Int(&sz)) < 0) + return -EINVAL; + + signalfd = pw_protocol_native_get_proxy_fd(proxy, sigidx); + + pw_proxy_notify(proxy, struct pw_client_node_events, set_activation, 0, + node_id, + signalfd, + memid, + off, sz); + return 0; +} + +static int client_node_demarshal_port_set_mix_info(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + uint32_t direction, port_id, mix_id, peer_id; + struct spa_pod_frame f[2]; + struct spa_dict props = SPA_DICT_INIT(NULL, 0); + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f[0]) < 0 || + spa_pod_parser_get(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id), + SPA_POD_Int(&mix_id), + SPA_POD_Int(&peer_id), NULL) < 0) + return -EINVAL; + + parse_dict_struct(&prs, &f[1], &props); + + pw_proxy_notify(proxy, struct pw_client_node_events, port_set_mix_info, 1, + direction, port_id, mix_id, + peer_id, &props); + return 0; +} + +static int client_node_demarshal_set_io(void *data, const struct pw_protocol_native_message *msg) +{ + struct pw_proxy *proxy = data; + struct spa_pod_parser prs; + uint32_t id, memid, off, sz; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Id(&id), + SPA_POD_Int(&memid), + SPA_POD_Int(&off), + SPA_POD_Int(&sz)) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_events, set_io, 0, + id, memid, off, sz); + return 0; +} + +static int client_node_marshal_transport(void *data, int readfd, int writefd, + uint32_t mem_id, uint32_t offset, uint32_t size) +{ + struct pw_protocol_native_message *msg; + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_TRANSPORT, &msg); + + spa_pod_builder_add_struct(b, + SPA_POD_Fd(pw_protocol_native_add_resource_fd(resource, readfd)), + SPA_POD_Fd(pw_protocol_native_add_resource_fd(resource, writefd)), + SPA_POD_Int(mem_id), + SPA_POD_Int(offset), + SPA_POD_Int(size)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_set_param(void *data, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_SET_PARAM, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Id(id), + SPA_POD_Int(flags), + SPA_POD_Pod(param)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int client_node_marshal_event_event(void *data, const struct spa_event *event) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_EVENT, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Pod(event)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_command(void *data, const struct spa_command *command) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_COMMAND, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Pod(command)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_add_port(void *data, + enum spa_direction direction, uint32_t port_id, + const struct spa_dict *props) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + struct spa_pod_frame f; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_ADD_PORT, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id), NULL); + push_dict(b, props); + spa_pod_builder_pop(b, &f); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_remove_port(void *data, + enum spa_direction direction, uint32_t port_id) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_REMOVE_PORT, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_port_set_param(void *data, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + uint32_t flags, + const struct spa_pod *param) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_PORT_SET_PARAM, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id), + SPA_POD_Id(id), + SPA_POD_Int(flags), + SPA_POD_Pod(param)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_port_use_buffers(void *data, + enum spa_direction direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t flags, + uint32_t n_buffers, struct pw_client_node_buffer *buffers) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + struct spa_pod_frame f; + uint32_t i, j; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_PORT_USE_BUFFERS, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id), + SPA_POD_Int(mix_id), + SPA_POD_Int(flags), + SPA_POD_Int(n_buffers), NULL); + + for (i = 0; i < n_buffers; i++) { + struct spa_buffer *buf = buffers[i].buffer; + + spa_pod_builder_add(b, + SPA_POD_Int(buffers[i].mem_id), + SPA_POD_Int(buffers[i].offset), + SPA_POD_Int(buffers[i].size), + SPA_POD_Int(buf->n_metas), NULL); + + for (j = 0; j < buf->n_metas; j++) { + struct spa_meta *m = &buf->metas[j]; + spa_pod_builder_add(b, + SPA_POD_Id(m->type), + SPA_POD_Int(m->size), NULL); + } + spa_pod_builder_add(b, + SPA_POD_Int(buf->n_datas), NULL); + for (j = 0; j < buf->n_datas; j++) { + struct spa_data *d = &buf->datas[j]; + spa_pod_builder_add(b, + SPA_POD_Id(d->type), + SPA_POD_Int(SPA_PTR_TO_UINT32(d->data)), + SPA_POD_Int(d->flags), + SPA_POD_Int(d->mapoffset), + SPA_POD_Int(d->maxsize), NULL); + } + } + spa_pod_builder_pop(b, &f); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_port_set_io(void *data, + uint32_t direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_PORT_SET_IO, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id), + SPA_POD_Int(mix_id), + SPA_POD_Id(id), + SPA_POD_Int(memid), + SPA_POD_Int(offset), + SPA_POD_Int(size)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_set_activation(void *data, + uint32_t node_id, + int signalfd, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_protocol_native_message *msg; + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_SET_ACTIVATION, &msg); + + spa_pod_builder_add_struct(b, + SPA_POD_Int(node_id), + SPA_POD_Fd(pw_protocol_native_add_resource_fd(resource, signalfd)), + SPA_POD_Int(memid), + SPA_POD_Int(offset), + SPA_POD_Int(size)); + + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_set_io(void *data, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_SET_IO, NULL); + spa_pod_builder_add_struct(b, + SPA_POD_Id(id), + SPA_POD_Int(memid), + SPA_POD_Int(offset), + SPA_POD_Int(size)); + return pw_protocol_native_end_resource(resource, b); +} + +static int +client_node_marshal_port_set_mix_info(void *data, + uint32_t direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t peer_id, + const struct spa_dict *props) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + struct spa_pod_frame f; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_EVENT_PORT_SET_MIX_INFO, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + SPA_POD_Int(direction), + SPA_POD_Int(port_id), + SPA_POD_Int(mix_id), + SPA_POD_Int(peer_id), NULL); + push_dict(b, props); + spa_pod_builder_pop(b, &f); + + return pw_protocol_native_end_resource(resource, b); +} + + +static int client_node_demarshal_get_node(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + int32_t version, new_id; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Int(&version), + SPA_POD_Int(&new_id)) < 0) + return -EINVAL; + + return pw_resource_notify(resource, struct pw_client_node_methods, get_node, 0, + version, new_id); +} + +static int client_node_demarshal_update(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + struct spa_pod_frame f[2]; + uint32_t change_mask, n_params; + const struct spa_pod **params = NULL; + struct spa_node_info info = SPA_NODE_INFO_INIT(), *infop = NULL; + struct spa_pod *ipod; + struct spa_dict props = SPA_DICT_INIT(NULL, 0); + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f[0]) < 0 || + spa_pod_parser_get(&prs, + SPA_POD_Int(&change_mask), NULL) < 0) + return -EINVAL; + + parse_params(&prs, n_params, params); + + if (spa_pod_parser_get(&prs, + SPA_POD_PodStruct(&ipod), NULL) < 0) + return -EINVAL; + + if (ipod) { + struct spa_pod_parser p2; + struct spa_pod_frame f2; + infop = &info; + + spa_pod_parser_pod(&p2, ipod); + if (spa_pod_parser_push_struct(&p2, &f2) < 0 || + spa_pod_parser_get(&p2, + SPA_POD_Int(&info.max_input_ports), + SPA_POD_Int(&info.max_output_ports), + SPA_POD_Long(&info.change_mask), + SPA_POD_Long(&info.flags), NULL) < 0) + return -EINVAL; + + info.change_mask &= SPA_NODE_CHANGE_MASK_FLAGS | + SPA_NODE_CHANGE_MASK_PROPS | + SPA_NODE_CHANGE_MASK_PARAMS; + + parse_dict(&p2, &props); + if (props.n_items > 0) + info.props = &props; + + parse_param_info(&p2, info.n_params, info.params); + } + + pw_resource_notify(resource, struct pw_client_node_methods, update, 0, change_mask, + n_params, + params, infop); + return 0; +} + +static int client_node_demarshal_port_update(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + struct spa_pod_frame f; + uint32_t direction, port_id, change_mask, n_params; + const struct spa_pod **params = NULL; + struct spa_port_info info = SPA_PORT_INFO_INIT(), *infop = NULL; + struct spa_pod *ipod; + struct spa_dict props = SPA_DICT_INIT(NULL, 0); + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f) < 0 || + spa_pod_parser_get(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id), + SPA_POD_Int(&change_mask), NULL) < 0) + return -EINVAL; + + parse_params(&prs, n_params, params); + + if (spa_pod_parser_get(&prs, + SPA_POD_PodStruct(&ipod), NULL) < 0) + return -EINVAL; + + if (ipod) { + struct spa_pod_parser p2; + struct spa_pod_frame f2; + infop = &info; + + spa_pod_parser_pod(&p2, ipod); + if (spa_pod_parser_push_struct(&p2, &f2) < 0 || + spa_pod_parser_get(&p2, + SPA_POD_Long(&info.change_mask), + SPA_POD_Long(&info.flags), + SPA_POD_Int(&info.rate.num), + SPA_POD_Int(&info.rate.denom), NULL) < 0) + return -EINVAL; + + info.change_mask &= SPA_PORT_CHANGE_MASK_FLAGS | + SPA_PORT_CHANGE_MASK_RATE | + SPA_PORT_CHANGE_MASK_PROPS | + SPA_PORT_CHANGE_MASK_PARAMS; + + parse_dict(&p2, &props); + if (props.n_items > 0) + info.props = &props; + + parse_param_info(&p2, info.n_params, info.params); + } + + pw_resource_notify(resource, struct pw_client_node_methods, port_update, 0, direction, + port_id, + change_mask, + n_params, + params, infop); + return 0; +} + +static int client_node_demarshal_set_active(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + bool active; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Bool(&active)) < 0) + return -EINVAL; + + pw_resource_notify(resource, struct pw_client_node_methods, set_active, 0, active); + return 0; +} + +static int client_node_demarshal_event_method(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + const struct spa_event *event; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_PodObject(&event)) < 0) + return -EINVAL; + + pw_resource_notify(resource, struct pw_client_node_methods, event, 0, event); + return 0; +} + +static int client_node_demarshal_port_buffers(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + struct spa_pod_frame f; + uint32_t i, j, direction, port_id, mix_id, n_buffers; + int64_t data_fd; + struct spa_buffer **buffers = NULL; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f) < 0 || + spa_pod_parser_get(&prs, + SPA_POD_Int(&direction), + SPA_POD_Int(&port_id), + SPA_POD_Int(&mix_id), + SPA_POD_Int(&n_buffers), NULL) < 0) + return -EINVAL; + + if (n_buffers > MAX_BUFFERS) + return -ENOSPC; + + buffers = alloca(sizeof(struct spa_buffer*) * n_buffers); + for (i = 0; i < n_buffers; i++) { + struct spa_buffer *buf = buffers[i] = alloca(sizeof(struct spa_buffer)); + + spa_zero(*buf); + if (spa_pod_parser_get(&prs, + SPA_POD_Int(&buf->n_datas), NULL) < 0) + return -EINVAL; + + if (buf->n_datas > MAX_DATAS) + return -ENOSPC; + + buf->datas = alloca(sizeof(struct spa_data) * buf->n_datas); + for (j = 0; j < buf->n_datas; j++) { + struct spa_data *d = &buf->datas[j]; + + if (spa_pod_parser_get(&prs, + SPA_POD_Id(&d->type), + SPA_POD_Fd(&data_fd), + SPA_POD_Int(&d->flags), + SPA_POD_Int(&d->mapoffset), + SPA_POD_Int(&d->maxsize), NULL) < 0) + return -EINVAL; + + d->fd = pw_protocol_native_get_resource_fd(resource, data_fd); + } + } + + pw_resource_notify(resource, struct pw_client_node_methods, port_buffers, 0, + direction, port_id, mix_id, n_buffers, buffers); + + return 0; +} + +static const struct pw_client_node_methods pw_protocol_native_client_node_method_marshal = { + PW_VERSION_CLIENT_NODE_METHODS, + .add_listener = &client_node_marshal_add_listener, + .get_node = &client_node_marshal_get_node, + .update = &client_node_marshal_update, + .port_update = &client_node_marshal_port_update, + .set_active = &client_node_marshal_set_active, + .event = &client_node_marshal_event_method, + .port_buffers = &client_node_marshal_port_buffers +}; + +static const struct pw_protocol_native_demarshal +pw_protocol_native_client_node_method_demarshal[PW_CLIENT_NODE_METHOD_NUM] = +{ + [PW_CLIENT_NODE_METHOD_ADD_LISTENER] = { NULL, 0 }, + [PW_CLIENT_NODE_METHOD_GET_NODE] = { &client_node_demarshal_get_node, 0 }, + [PW_CLIENT_NODE_METHOD_UPDATE] = { &client_node_demarshal_update, 0 }, + [PW_CLIENT_NODE_METHOD_PORT_UPDATE] = { &client_node_demarshal_port_update, 0 }, + [PW_CLIENT_NODE_METHOD_SET_ACTIVE] = { &client_node_demarshal_set_active, 0 }, + [PW_CLIENT_NODE_METHOD_EVENT] = { &client_node_demarshal_event_method, 0 }, + [PW_CLIENT_NODE_METHOD_PORT_BUFFERS] = { &client_node_demarshal_port_buffers, 0 } +}; + +static const struct pw_client_node_events pw_protocol_native_client_node_event_marshal = { + PW_VERSION_CLIENT_NODE_EVENTS, + .transport = &client_node_marshal_transport, + .set_param = &client_node_marshal_set_param, + .set_io = &client_node_marshal_set_io, + .event = &client_node_marshal_event_event, + .command = &client_node_marshal_command, + .add_port = &client_node_marshal_add_port, + .remove_port = &client_node_marshal_remove_port, + .port_set_param = &client_node_marshal_port_set_param, + .port_use_buffers = &client_node_marshal_port_use_buffers, + .port_set_io = &client_node_marshal_port_set_io, + .set_activation = &client_node_marshal_set_activation, + .port_set_mix_info = &client_node_marshal_port_set_mix_info, +}; + +static const struct pw_protocol_native_demarshal +pw_protocol_native_client_node_event_demarshal[PW_CLIENT_NODE_EVENT_NUM] = +{ + [PW_CLIENT_NODE_EVENT_TRANSPORT] = { &client_node_demarshal_transport, 0 }, + [PW_CLIENT_NODE_EVENT_SET_PARAM] = { &client_node_demarshal_set_param, 0 }, + [PW_CLIENT_NODE_EVENT_SET_IO] = { &client_node_demarshal_set_io, 0 }, + [PW_CLIENT_NODE_EVENT_EVENT] = { &client_node_demarshal_event_event, 0 }, + [PW_CLIENT_NODE_EVENT_COMMAND] = { &client_node_demarshal_command, 0 }, + [PW_CLIENT_NODE_EVENT_ADD_PORT] = { &client_node_demarshal_add_port, 0 }, + [PW_CLIENT_NODE_EVENT_REMOVE_PORT] = { &client_node_demarshal_remove_port, 0 }, + [PW_CLIENT_NODE_EVENT_PORT_SET_PARAM] = { &client_node_demarshal_port_set_param, 0 }, + [PW_CLIENT_NODE_EVENT_PORT_USE_BUFFERS] = { &client_node_demarshal_port_use_buffers, 0 }, + [PW_CLIENT_NODE_EVENT_PORT_SET_IO] = { &client_node_demarshal_port_set_io, 0 }, + [PW_CLIENT_NODE_EVENT_SET_ACTIVATION] = { &client_node_demarshal_set_activation, 0 }, + [PW_CLIENT_NODE_EVENT_PORT_SET_MIX_INFO] = { &client_node_demarshal_port_set_mix_info, 0 } +}; + +static const struct pw_protocol_marshal pw_protocol_native_client_node_marshal = { + PW_TYPE_INTERFACE_ClientNode, + PW_VERSION_CLIENT_NODE, + 0, + PW_CLIENT_NODE_METHOD_NUM, + PW_CLIENT_NODE_EVENT_NUM, + .client_marshal = &pw_protocol_native_client_node_method_marshal, + .server_demarshal = &pw_protocol_native_client_node_method_demarshal, + .server_marshal = &pw_protocol_native_client_node_event_marshal, + .client_demarshal = pw_protocol_native_client_node_event_demarshal, +}; + +struct pw_protocol *pw_protocol_native_ext_client_node_init(struct pw_context *context) +{ + struct pw_protocol *protocol; + + protocol = pw_context_find_protocol(context, PW_TYPE_INFO_PROTOCOL_Native); + + if (protocol == NULL) + return NULL; + + pw_protocol_add_marshal(protocol, &pw_protocol_native_client_node_marshal); + + return protocol; +} diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c new file mode 100644 index 0000000..051ab07 --- /dev/null +++ b/src/modules/module-client-node/remote-node.c @@ -0,0 +1,1339 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <stdio.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> +#include <time.h> +#include <sys/mman.h> + +#include <spa/pod/parser.h> +#include <spa/pod/dynamic.h> +#include <spa/node/utils.h> +#include <spa/utils/result.h> +#include <spa/debug/types.h> + +#include "pipewire/pipewire.h" +#include "pipewire/private.h" + +#include "pipewire/extensions/protocol-native.h" +#include "pipewire/extensions/client-node.h" + +#define MAX_BUFFERS 64 + +PW_LOG_TOPIC_EXTERN(mod_topic); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +/** \cond */ +static bool mlock_warned = false; + +struct buffer { + uint32_t id; + struct spa_buffer *buf; + struct pw_memmap *mem; +}; + +struct mix { + struct spa_list link; + struct pw_impl_port *port; + uint32_t mix_id; + struct pw_impl_port_mix mix; + struct pw_array buffers; + bool active; +}; + +struct node_data { + struct pw_context *context; + + struct pw_mempool *pool; + + uint32_t remote_id; + int rtwritefd; + struct pw_memmap *activation; + + struct spa_list mix[2]; + struct spa_list free_mix; + + struct pw_impl_node *node; + struct spa_hook node_listener; + unsigned int do_free:1; + unsigned int have_transport:1; + unsigned int allow_mlock:1; + unsigned int warn_mlock:1; + + struct pw_client_node *client_node; + struct spa_hook client_node_listener; + struct spa_hook proxy_client_node_listener; + + struct spa_list links; +}; + +struct link { + struct spa_list link; + struct node_data *data; + struct pw_memmap *map; + struct pw_node_target target; + uint32_t node_id; + int signalfd; +}; + +/** \endcond */ + +static struct link *find_activation(struct spa_list *links, uint32_t node_id) +{ + struct link *l; + + spa_list_for_each(l, links, link) { + if (l->node_id == node_id) + return l; + } + return NULL; +} + +static int +do_deactivate_link(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct link *link = user_data; + pw_log_trace("link %p deactivate", link); + spa_list_remove(&link->target.link); + return 0; +} + +static void clear_link(struct node_data *data, struct link *link) +{ + struct pw_context *context = data->context; + pw_log_debug("link %p", link); + pw_loop_invoke(context->data_loop, + do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, link); + pw_memmap_free(link->map); + spa_system_close(context->data_system, link->signalfd); + spa_list_remove(&link->link); + free(link); +} + +static void clean_transport(struct node_data *data) +{ + struct link *l; + uint32_t tag[5] = { data->remote_id, }; + struct pw_memmap *mm; + + if (!data->have_transport) + return; + + spa_list_consume(l, &data->links, link) + clear_link(data, l); + + while ((mm = pw_mempool_find_tag(data->pool, tag, sizeof(uint32_t))) != NULL) { + if (mm->tag[1] == SPA_ID_INVALID) + spa_node_set_io(data->node->node, mm->tag[2], NULL, 0); + + pw_memmap_free(mm); + } + + pw_memmap_free(data->activation); + data->node->rt.activation = data->node->activation->map->ptr; + + spa_system_close(data->context->data_system, data->rtwritefd); + data->have_transport = false; +} + +static void mix_init(struct mix *mix, struct pw_impl_port *port, uint32_t mix_id) +{ + pw_log_debug("port %p: mix init %d.%d", port, port->port_id, mix_id); + mix->port = port; + mix->mix_id = mix_id; + pw_impl_port_init_mix(port, &mix->mix); + mix->active = false; + pw_array_init(&mix->buffers, 32); + pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64); +} + +static int +do_deactivate_mix(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct mix *mix = user_data; + spa_list_remove(&mix->mix.rt_link); + return 0; +} + +static int +deactivate_mix(struct node_data *data, struct mix *mix) +{ + if (mix->active) { + pw_log_debug("node %p: mix %p deactivate", data, mix); + pw_loop_invoke(data->context->data_loop, + do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix); + mix->active = false; + } + return 0; +} + +static int +do_activate_mix(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct mix *mix = user_data; + + spa_list_append(&mix->port->rt.mix_list, &mix->mix.rt_link); + return 0; +} + +static int +activate_mix(struct node_data *data, struct mix *mix) +{ + if (!mix->active) { + pw_log_debug("node %p: mix %p activate", data, mix); + pw_loop_invoke(data->context->data_loop, + do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix); + mix->active = true; + } + return 0; +} + +static struct mix *find_mix(struct node_data *data, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id) +{ + struct mix *mix; + + spa_list_for_each(mix, &data->mix[direction], link) { + if (mix->port->port_id == port_id && + mix->mix_id == mix_id) { + pw_log_debug("port %p: found mix %d:%d.%d", mix->port, + direction, port_id, mix_id); + return mix; + } + } + return NULL; +} + +static struct mix *ensure_mix(struct node_data *data, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id) +{ + struct mix *mix; + struct pw_impl_port *port; + + if ((mix = find_mix(data, direction, port_id, mix_id))) + return mix; + + port = pw_impl_node_find_port(data->node, direction, port_id); + if (port == NULL) + return NULL; + + if (spa_list_is_empty(&data->free_mix)) { + if ((mix = calloc(1, sizeof(*mix))) == NULL) + return NULL; + } else { + mix = spa_list_first(&data->free_mix, struct mix, link); + spa_list_remove(&mix->link); + } + + mix_init(mix, port, mix_id); + spa_list_append(&data->mix[direction], &mix->link); + + return mix; +} + + +static int client_node_transport(void *_data, + int readfd, int writefd, uint32_t mem_id, uint32_t offset, uint32_t size) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + + clean_transport(data); + + data->activation = pw_mempool_map_id(data->pool, mem_id, + PW_MEMMAP_FLAG_READWRITE, offset, size, NULL); + if (data->activation == NULL) { + pw_log_warn("remote-node %p: can't map activation: %m", proxy); + return -errno; + } + + data->node->rt.activation = data->activation->ptr; + + pw_log_debug("remote-node %p: fds:%d %d node:%u activation:%p", + proxy, readfd, writefd, data->remote_id, data->activation->ptr); + + data->rtwritefd = writefd; + spa_system_close(data->context->data_system, data->node->source.fd); + data->node->source.fd = readfd; + + data->have_transport = true; + + if (data->node->active) + pw_client_node_set_active(data->client_node, true); + + return 0; +} + +static int add_node_update(struct node_data *data, uint32_t change_mask, uint32_t info_mask) +{ + struct pw_impl_node *node = data->node; + struct spa_node_info ni = SPA_NODE_INFO_INIT(); + uint32_t n_params = 0; + struct spa_pod **params = NULL; + int res; + + if (change_mask & PW_CLIENT_NODE_UPDATE_PARAMS) { + uint32_t i, idx, id; + uint8_t buf[4096]; + struct spa_pod_dynamic_builder b; + + for (i = 0; i < node->info.n_params; i++) { + struct spa_pod *param; + + id = node->info.params[i].id; + if (id == SPA_PARAM_Invalid) + continue; + + for (idx = 0;;) { + spa_pod_dynamic_builder_init(&b, buf, sizeof(buf), 4096); + + res = spa_node_enum_params_sync(node->node, + id, &idx, NULL, ¶m, &b.b); + if (res == 1) { + void *p; + p = pw_reallocarray(params, n_params + 1, sizeof(struct spa_pod *)); + if (p == NULL) { + res = -errno; + pw_log_error("realloc failed: %m"); + } else { + params = p; + params[n_params++] = spa_pod_copy(param); + } + } + spa_pod_dynamic_builder_clean(&b); + if (res != 1) + break; + } + } + } + if (change_mask & PW_CLIENT_NODE_UPDATE_INFO) { + ni.max_input_ports = node->info.max_input_ports; + ni.max_output_ports = node->info.max_output_ports; + ni.change_mask = info_mask; + ni.flags = node->spa_flags; + ni.props = node->info.props; + ni.params = node->info.params; + ni.n_params = node->info.n_params; + } + + res = pw_client_node_update(data->client_node, + change_mask, + n_params, + (const struct spa_pod **)params, + &ni); + + if (params) { + while (n_params > 0) + free(params[--n_params]); + free(params); + } + return res; +} + +static int add_port_update(struct node_data *data, struct pw_impl_port *port, uint32_t change_mask) +{ + struct spa_port_info pi = SPA_PORT_INFO_INIT(); + uint32_t n_params = 0; + struct spa_pod **params = NULL; + int res; + + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { + uint32_t i, idx, id; + uint8_t buf[4096]; + struct spa_pod_dynamic_builder b; + + for (i = 0; i < port->info.n_params; i++) { + struct spa_pod *param; + + id = port->info.params[i].id; + if (id == SPA_PARAM_Invalid) + continue; + + for (idx = 0;;) { + spa_pod_dynamic_builder_init(&b, buf, sizeof(buf), 4096); + + res = spa_node_port_enum_params_sync(port->node->node, + port->direction, port->port_id, + id, &idx, NULL, ¶m, &b.b); + if (res == 1) { + void *p; + p = pw_reallocarray(params, n_params + 1, sizeof(struct spa_pod*)); + if (p == NULL) { + res = -errno; + pw_log_error("realloc failed: %m"); + } else { + params = p; + params[n_params++] = spa_pod_copy(param); + } + } + spa_pod_dynamic_builder_clean(&b); + + if (res != 1) + break; + + } + } + } + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_INFO) { + pi.change_mask = SPA_PORT_CHANGE_MASK_FLAGS | + SPA_PORT_CHANGE_MASK_RATE | + SPA_PORT_CHANGE_MASK_PROPS | + SPA_PORT_CHANGE_MASK_PARAMS; + pi.flags = port->spa_flags; + pi.rate = SPA_FRACTION(0, 1); + pi.props = &port->properties->dict; + SPA_FLAG_CLEAR(pi.flags, SPA_PORT_FLAG_DYNAMIC_DATA); + pi.n_params = port->info.n_params; + pi.params = port->info.params; + } + + res = pw_client_node_port_update(data->client_node, + port->direction, + port->port_id, + change_mask, + n_params, + (const struct spa_pod **)params, + &pi); + if (params) { + while (n_params > 0) + free(params[--n_params]); + free(params); + } + return res; +} + +static int +client_node_set_param(void *_data, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + int res; + + pw_log_debug("node %p: set_param %s:", proxy, + spa_debug_type_find_name(spa_type_param, id)); + + res = spa_node_set_param(data->node->node, id, flags, param); + + if (res < 0) { + pw_log_error("node %p: set_param %s (%d) %p: %s", proxy, + spa_debug_type_find_name(spa_type_param, id), + id, param, spa_strerror(res)); + pw_proxy_errorf(proxy, res, "node_set_param(%s) failed: %s", + spa_debug_type_find_name(spa_type_param, id), + spa_strerror(res)); + } + return res; +} + +static int +client_node_set_io(void *_data, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + struct pw_memmap *old, *mm; + void *ptr; + uint32_t tag[5] = { data->remote_id, SPA_ID_INVALID, id, }; + int res; + + old = pw_mempool_find_tag(data->pool, tag, sizeof(tag)); + + if (memid == SPA_ID_INVALID) { + mm = ptr = NULL; + size = 0; + } else { + mm = pw_mempool_map_id(data->pool, memid, + PW_MEMMAP_FLAG_READWRITE, offset, size, tag); + if (mm == NULL) { + pw_log_warn("can't map memory id %u: %m", memid); + res = -errno; + goto exit; + } + ptr = mm->ptr; + } + + pw_log_debug("node %p: set io %s %p", proxy, + spa_debug_type_find_name(spa_type_io, id), ptr); + + res = spa_node_set_io(data->node->node, id, ptr, size); + + pw_memmap_free(old); +exit: + if (res < 0) { + pw_log_error("node %p: set_io: %s", proxy, spa_strerror(res)); + pw_proxy_errorf(proxy, res, "node_set_io failed: %s", spa_strerror(res)); + } + return res; +} + +static int client_node_event(void *data, const struct spa_event *event) +{ + pw_log_warn("unhandled node event %d", SPA_EVENT_TYPE(event)); + return -ENOTSUP; +} + +static int client_node_command(void *_data, const struct spa_command *command) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + int res; + + switch (SPA_NODE_COMMAND_ID(command)) { + case SPA_NODE_COMMAND_Pause: + pw_log_debug("node %p: pause", proxy); + + if ((res = pw_impl_node_set_state(data->node, PW_NODE_STATE_IDLE)) < 0) { + pw_log_warn("node %p: pause failed", proxy); + pw_proxy_error(proxy, res, "pause failed"); + } + + break; + case SPA_NODE_COMMAND_Start: + pw_log_debug("node %p: start", proxy); + + if ((res = pw_impl_node_set_state(data->node, PW_NODE_STATE_RUNNING)) < 0) { + pw_log_warn("node %p: start failed", proxy); + pw_proxy_error(proxy, res, "start failed"); + } + break; + + case SPA_NODE_COMMAND_Suspend: + pw_log_debug("node %p: suspend", proxy); + if ((res = pw_impl_node_set_state(data->node, PW_NODE_STATE_SUSPENDED)) < 0) { + pw_log_warn("node %p: suspend failed", proxy); + pw_proxy_error(proxy, res, "suspend failed"); + } + break; + case SPA_NODE_COMMAND_RequestProcess: + res = pw_impl_node_send_command(data->node, command); + break; + default: + pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command)); + res = -ENOTSUP; + pw_proxy_errorf(proxy, res, "command %d not supported", SPA_NODE_COMMAND_ID(command)); + } + return res; +} + +static int +client_node_add_port(void *_data, enum spa_direction direction, uint32_t port_id, + const struct spa_dict *props) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + pw_log_warn("add port not supported"); + pw_proxy_error(proxy, -ENOTSUP, "add port not supported"); + return -ENOTSUP; +} + +static int +client_node_remove_port(void *_data, enum spa_direction direction, uint32_t port_id) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + pw_log_warn("remove port not supported"); + pw_proxy_error(proxy, -ENOTSUP, "remove port not supported"); + return -ENOTSUP; +} + +static int clear_buffers(struct node_data *data, struct mix *mix) +{ + struct pw_impl_port *port = mix->port; + struct buffer *b; + int res; + + pw_log_debug("port %p: clear %zd buffers mix:%d", port, + pw_array_get_len(&mix->buffers, struct buffer *), + mix->mix_id); + + if ((res = pw_impl_port_use_buffers(port, &mix->mix, 0, NULL, 0)) < 0) { + pw_log_error("port %p: error clear buffers %s", port, spa_strerror(res)); + return res; + } + + pw_array_for_each(b, &mix->buffers) { + pw_log_debug("port %p: clear buffer %d map %p %p", + port, b->id, b->mem, b->buf); + pw_memmap_free(b->mem); + free(b->buf); + } + mix->buffers.size = 0; + return 0; +} + +static int +client_node_port_set_param(void *_data, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + struct pw_impl_port *port; + int res; + + port = pw_impl_node_find_port(data->node, direction, port_id); + if (port == NULL) { + res = -EINVAL; + goto error_exit; + } + + pw_log_debug("port %p: set_param %s %p", port, + spa_debug_type_find_name(spa_type_param, id), param); + + res = pw_impl_port_set_param(port, id, flags, param); + if (res < 0) + goto error_exit; + + if (id == SPA_PARAM_Format) { + struct mix *mix; + spa_list_for_each(mix, &data->mix[direction], link) { + if (mix->port->port_id == port_id) + clear_buffers(data, mix); + } + } + return res; + +error_exit: + pw_log_error("port %p: set_param %d %p: %s", port, id, param, spa_strerror(res)); + pw_proxy_errorf(proxy, res, "port_set_param(%s) failed: %s", + spa_debug_type_find_name(spa_type_param, id), + spa_strerror(res)); + return res; +} + +static int +client_node_port_use_buffers(void *_data, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id, + uint32_t flags, + uint32_t n_buffers, struct pw_client_node_buffer *buffers) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + struct buffer *bid; + uint32_t i, j; + struct spa_buffer *b, **bufs; + struct mix *mix; + int res, prot; + + mix = ensure_mix(data, direction, port_id, mix_id); + if (mix == NULL) { + res = -ENOENT; + goto error_exit; + } + + if (n_buffers > MAX_BUFFERS) + return -ENOSPC; + + prot = PW_MEMMAP_FLAG_READWRITE; + + /* clear previous buffers */ + clear_buffers(data, mix); + + bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); + + for (i = 0; i < n_buffers; i++) { + size_t size; + off_t offset; + struct pw_memmap *mm; + + mm = pw_mempool_map_id(data->pool, buffers[i].mem_id, + prot, buffers[i].offset, buffers[i].size, NULL); + if (mm == NULL) { + res = -errno; + goto error_exit_cleanup; + } + + bid = pw_array_add(&mix->buffers, sizeof(struct buffer)); + if (bid == NULL) { + res = -errno; + goto error_exit_cleanup; + } + bid->id = i; + bid->mem = mm; + + if (data->allow_mlock && mlock(mm->ptr, mm->size) < 0) + if (errno != ENOMEM || !mlock_warned) { + pw_log(data->warn_mlock ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG, + "Failed to mlock memory %p %u: %s", + mm->ptr, mm->size, + errno == ENOMEM ? + "This is not a problem but for best performance, " + "consider increasing RLIMIT_MEMLOCK" : strerror(errno)); + mlock_warned |= errno == ENOMEM; + } + + size = sizeof(struct spa_buffer); + for (j = 0; j < buffers[i].buffer->n_metas; j++) + size += sizeof(struct spa_meta); + for (j = 0; j < buffers[i].buffer->n_datas; j++) + size += sizeof(struct spa_data); + + b = bid->buf = malloc(size); + if (b == NULL) { + res = -errno; + goto error_exit_cleanup; + } + memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); + + b->metas = SPA_PTROFF(b, sizeof(struct spa_buffer), struct spa_meta); + b->datas = SPA_PTROFF(b->metas, sizeof(struct spa_meta) * b->n_metas, + struct spa_data); + + pw_log_debug("add buffer mem:%d id:%d offset:%u size:%u %p", mm->block->id, + bid->id, buffers[i].offset, buffers[i].size, bid->buf); + + offset = 0; + for (j = 0; j < b->n_metas; j++) { + struct spa_meta *m = &b->metas[j]; + memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); + m->data = SPA_PTROFF(mm->ptr, offset, void); + offset += SPA_ROUND_UP_N(m->size, 8); + } + + for (j = 0; j < b->n_datas; j++) { + struct spa_data *d = &b->datas[j]; + + memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); + d->chunk = + SPA_PTROFF(mm->ptr, offset + sizeof(struct spa_chunk) * j, + struct spa_chunk); + + if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) + continue; + + if (d->type == SPA_DATA_MemId) { + uint32_t mem_id = SPA_PTR_TO_UINT32(d->data); + struct pw_memblock *bm; + + bm = pw_mempool_find_id(data->pool, mem_id); + if (bm == NULL) { + pw_log_error("unknown buffer mem %u", mem_id); + res = -ENODEV; + goto error_exit_cleanup; + } + + d->fd = bm->fd; + d->type = bm->type; + d->data = NULL; + + pw_log_debug(" data %d %u -> fd %d maxsize %d", + j, bm->id, bm->fd, d->maxsize); + } else if (d->type == SPA_DATA_MemPtr) { + int offs = SPA_PTR_TO_INT(d->data); + d->data = SPA_PTROFF(mm->ptr, offs, void); + d->fd = -1; + pw_log_debug(" data %d id:%u -> mem:%p offs:%d maxsize:%d", + j, bid->id, d->data, offs, d->maxsize); + } else { + pw_log_warn("unknown buffer data type %d", d->type); + } + } + bufs[i] = b; + } + + if ((res = pw_impl_port_use_buffers(mix->port, &mix->mix, flags, bufs, n_buffers)) < 0) + goto error_exit_cleanup; + + if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) { + pw_client_node_port_buffers(data->client_node, + direction, port_id, mix_id, + n_buffers, + bufs); + } + return res; + +error_exit_cleanup: + clear_buffers(data, mix); +error_exit: + pw_log_error("port %p: use_buffers: %d %s", mix, res, spa_strerror(res)); + pw_proxy_errorf(proxy, res, "port_use_buffers error: %s", spa_strerror(res)); + return res; +} + +static int +client_node_port_set_io(void *_data, + uint32_t direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + struct mix *mix; + struct pw_memmap *mm, *old; + void *ptr; + int res = 0; + uint32_t tag[5] = { data->remote_id, direction, port_id, mix_id, id }; + + mix = ensure_mix(data, direction, port_id, mix_id); + if (mix == NULL) { + res = -ENOENT; + goto exit; + } + + old = pw_mempool_find_tag(data->pool, tag, sizeof(tag)); + + if (memid == SPA_ID_INVALID) { + mm = ptr = NULL; + size = 0; + } + else { + mm = pw_mempool_map_id(data->pool, memid, + PW_MEMMAP_FLAG_READWRITE, offset, size, tag); + if (mm == NULL) { + pw_log_warn("can't map memory id %u: %m", memid); + res = -errno; + goto exit; + } + ptr = mm->ptr; + } + + pw_log_debug("port %p: set io:%s new:%p old:%p", mix->port, + spa_debug_type_find_name(spa_type_io, id), ptr, mix->mix.io); + + if (id == SPA_IO_Buffers) { + if (ptr == NULL && mix->mix.io) + deactivate_mix(data, mix); + } + + if ((res = spa_node_port_set_io(mix->port->mix, + direction, mix->mix.port.port_id, id, ptr, size)) < 0) { + if (res == -ENOTSUP) + res = 0; + else + goto exit_free; + } + if (id == SPA_IO_Buffers) { + mix->mix.io = ptr; + if (ptr) + activate_mix(data, mix); + } +exit_free: + pw_memmap_free(old); +exit: + if (res < 0) { + pw_log_error("port %p: set_io: %s", mix, spa_strerror(res)); + pw_proxy_errorf(proxy, res, "port_set_io failed: %s", spa_strerror(res)); + } + return res; +} + +static int link_signal_func(void *user_data) +{ + struct link *link = user_data; + struct spa_system *data_system = link->data->context->data_system; + + pw_log_trace_fp("link %p: signal %p", link, link->target.activation); + if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, link->signalfd, 1) < 0)) + pw_log_warn("link %p: write failed %m", link); + + return 0; +} + +static int +do_activate_link(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct link *link = user_data; + struct node_data *d = link->data; + pw_log_trace("link %p activate", link); + spa_list_append(&d->node->rt.target_list, &link->target.link); + return 0; +} + +static int +client_node_set_activation(void *_data, + uint32_t node_id, + int signalfd, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct node_data *data = _data; + struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; + struct pw_impl_node *node = data->node; + struct pw_memmap *mm; + void *ptr; + struct link *link; + int res = 0; + + if (data->remote_id == node_id) { + pw_log_debug("node %p: our activation %u: %u %u %u", node, node_id, + memid, offset, size); + spa_system_close(data->context->data_system, signalfd); + return 0; + } + + if (memid == SPA_ID_INVALID) { + mm = ptr = NULL; + size = 0; + } else { + mm = pw_mempool_map_id(data->pool, memid, + PW_MEMMAP_FLAG_READWRITE, offset, size, NULL); + if (mm == NULL) { + res = -errno; + goto error_exit; + } + ptr = mm->ptr; + } + pw_log_debug("node %p: set activation %d %p %u %u", node, node_id, ptr, offset, size); + + if (ptr) { + link = calloc(1, sizeof(struct link)); + if (link == NULL) { + res = -errno; + goto error_exit; + } + link->data = data; + link->node_id = node_id; + link->map = mm; + link->target.activation = ptr; + link->signalfd = signalfd; + link->target.signal_func = link_signal_func; + link->target.data = link; + link->target.node = NULL; + spa_list_append(&data->links, &link->link); + + pw_loop_invoke(data->context->data_loop, + do_activate_link, SPA_ID_INVALID, NULL, 0, false, link); + + pw_log_debug("node %p: link %p: fd:%d id:%u state %p required %d, pending %d", + node, link, signalfd, + link->target.activation->position.clock.id, + &link->target.activation->state[0], + link->target.activation->state[0].required, + link->target.activation->state[0].pending); + } else { + link = find_activation(&data->links, node_id); + if (link == NULL) { + res = -ENOENT; + goto error_exit; + } + clear_link(data, link); + } + return res; + +error_exit: + pw_log_error("node %p: set activation %d: %s", node, node_id, spa_strerror(res)); + pw_proxy_errorf(proxy, res, "set_activation: %s", spa_strerror(res)); + return res; +} + +static const struct pw_client_node_events client_node_events = { + PW_VERSION_CLIENT_NODE_EVENTS, + .transport = client_node_transport, + .set_param = client_node_set_param, + .set_io = client_node_set_io, + .event = client_node_event, + .command = client_node_command, + .add_port = client_node_add_port, + .remove_port = client_node_remove_port, + .port_set_param = client_node_port_set_param, + .port_use_buffers = client_node_port_use_buffers, + .port_set_io = client_node_port_set_io, + .set_activation = client_node_set_activation, +}; + +static void do_node_init(struct node_data *data) +{ + struct pw_impl_port *port; + + pw_log_debug("%p: node %p init", data, data->node); + add_node_update(data, PW_CLIENT_NODE_UPDATE_PARAMS | + PW_CLIENT_NODE_UPDATE_INFO, + SPA_NODE_CHANGE_MASK_FLAGS | + SPA_NODE_CHANGE_MASK_PROPS | + SPA_NODE_CHANGE_MASK_PARAMS); + + spa_list_for_each(port, &data->node->input_ports, link) { + add_port_update(data, port, + PW_CLIENT_NODE_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + } + spa_list_for_each(port, &data->node->output_ports, link) { + add_port_update(data, port, + PW_CLIENT_NODE_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + } +} + +static void clear_mix(struct node_data *data, struct mix *mix) +{ + pw_log_debug("port %p: mix clear %d.%d", mix->port, mix->port->port_id, mix->mix_id); + + deactivate_mix(data, mix); + + spa_list_remove(&mix->link); + + clear_buffers(data, mix); + pw_array_clear(&mix->buffers); + + spa_list_append(&data->free_mix, &mix->link); + pw_impl_port_release_mix(mix->port, &mix->mix); +} + +static void clean_node(struct node_data *d) +{ + struct mix *mix; + + if (d->have_transport) { + spa_list_consume(mix, &d->mix[SPA_DIRECTION_INPUT], link) + clear_mix(d, mix); + spa_list_consume(mix, &d->mix[SPA_DIRECTION_OUTPUT], link) + clear_mix(d, mix); + } + spa_list_consume(mix, &d->free_mix, link) { + spa_list_remove(&mix->link); + free(mix); + } + clean_transport(d); +} + +static void node_destroy(void *data) +{ + struct node_data *d = data; + + pw_log_debug("%p: destroy", d); + + clean_node(d); +} + +static void node_free(void *data) +{ + struct node_data *d = data; + pw_log_debug("%p: free", d); + d->node = NULL; +} + +static void node_info_changed(void *data, const struct pw_node_info *info) +{ + struct node_data *d = data; + uint32_t change_mask, info_mask; + + pw_log_debug("info changed %p", d); + + if (d->client_node == NULL) + return; + + change_mask = PW_CLIENT_NODE_UPDATE_INFO; + info_mask = SPA_NODE_CHANGE_MASK_FLAGS; + if (info->change_mask & PW_NODE_CHANGE_MASK_PROPS) { + info_mask |= SPA_NODE_CHANGE_MASK_PROPS; + } + if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) { + change_mask |= PW_CLIENT_NODE_UPDATE_PARAMS; + info_mask |= SPA_NODE_CHANGE_MASK_PARAMS; + } + add_node_update(d, change_mask, info_mask); +} + +static void node_port_info_changed(void *data, struct pw_impl_port *port, + const struct pw_port_info *info) +{ + struct node_data *d = data; + uint32_t change_mask = 0; + + pw_log_debug("info changed %p", d); + + if (d->client_node == NULL) + return; + + if (info->change_mask & PW_PORT_CHANGE_MASK_PROPS) + change_mask |= PW_CLIENT_NODE_PORT_UPDATE_INFO; + if (info->change_mask & PW_PORT_CHANGE_MASK_PARAMS) { + change_mask |= PW_CLIENT_NODE_PORT_UPDATE_PARAMS; + change_mask |= PW_CLIENT_NODE_PORT_UPDATE_INFO; + } + add_port_update(d, port, change_mask); +} + +static void node_port_removed(void *data, struct pw_impl_port *port) +{ + struct node_data *d = data; + struct mix *mix, *tmp; + + pw_log_debug("removed %p", d); + + if (d->client_node == NULL) + return; + + pw_client_node_port_update(d->client_node, + port->direction, + port->port_id, + 0, 0, NULL, NULL); + + spa_list_for_each_safe(mix, tmp, &d->mix[port->direction], link) { + if (mix->port == port) + clear_mix(d, mix); + } +} + +static void node_active_changed(void *data, bool active) +{ + struct node_data *d = data; + pw_log_debug("active %d", active); + + if (d->client_node == NULL) + return; + + pw_client_node_set_active(d->client_node, active); +} + +static void node_event(void *data, const struct spa_event *event) +{ + struct node_data *d = data; + pw_log_debug("%p", d); + + if (d->client_node == NULL) + return; + pw_client_node_event(d->client_node, event); +} + +static const struct pw_impl_node_events node_events = { + PW_VERSION_IMPL_NODE_EVENTS, + .destroy = node_destroy, + .free = node_free, + .info_changed = node_info_changed, + .port_info_changed = node_port_info_changed, + .port_removed = node_port_removed, + .active_changed = node_active_changed, + .event = node_event, +}; + +static void client_node_removed(void *_data) +{ + struct node_data *data = _data; + pw_log_debug("%p: removed", data); + + spa_hook_remove(&data->proxy_client_node_listener); + spa_hook_remove(&data->client_node_listener); + + if (data->node) { + spa_hook_remove(&data->node_listener); + pw_impl_node_set_state(data->node, PW_NODE_STATE_SUSPENDED); + + clean_node(data); + + if (data->do_free) + pw_impl_node_destroy(data->node); + } + data->client_node = NULL; +} + +static void client_node_destroy(void *_data) +{ + struct node_data *data = _data; + + pw_log_debug("%p: destroy", data); + client_node_removed(_data); +} + +static void client_node_bound(void *_data, uint32_t global_id) +{ + struct node_data *data = _data; + pw_log_debug("%p: bound %u", data, global_id); + data->remote_id = global_id; +} + +static const struct pw_proxy_events proxy_client_node_events = { + PW_VERSION_PROXY_EVENTS, + .removed = client_node_removed, + .destroy = client_node_destroy, + .bound = client_node_bound, +}; + +static int node_ready(void *d, int status) +{ + struct node_data *data = d; + struct pw_impl_node *node = data->node; + struct pw_node_activation *a = node->rt.activation; + struct spa_system *data_system = data->context->data_system; + struct timespec ts; + struct pw_impl_port *p; + + pw_log_trace_fp("node %p: ready driver:%d exported:%d status:%d", node, + node->driver, node->exported, status); + + if (status & SPA_STATUS_HAVE_DATA) { + spa_list_for_each(p, &node->rt.output_mix, rt.node_link) + spa_node_process(p->mix); + } + + spa_system_clock_gettime(data_system, CLOCK_MONOTONIC, &ts); + a->status = PW_NODE_ACTIVATION_TRIGGERED; + a->signal_time = SPA_TIMESPEC_TO_NSEC(&ts); + + if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, data->rtwritefd, 1) < 0)) + pw_log_warn("node %p: write failed %m", node); + + return 0; +} + +static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id) +{ + return 0; +} + +static int node_xrun(void *d, uint64_t trigger, uint64_t delay, struct spa_pod *info) +{ + struct node_data *data = d; + struct pw_impl_node *node = data->node; + struct pw_node_activation *a = node->rt.activation; + + a->xrun_count++; + a->xrun_time = trigger; + a->xrun_delay = delay; + a->max_delay = SPA_MAX(a->max_delay, delay); + + pw_log_debug("node %p: XRun! count:%u time:%"PRIu64" delay:%"PRIu64" max:%"PRIu64, + node, a->xrun_count, trigger, delay, a->max_delay); + + pw_context_driver_emit_xrun(data->context, node); + + return 0; +} + +static const struct spa_node_callbacks node_callbacks = { + SPA_VERSION_NODE_CALLBACKS, + .ready = node_ready, + .reuse_buffer = node_reuse_buffer, + .xrun = node_xrun +}; + +static struct pw_proxy *node_export(struct pw_core *core, void *object, bool do_free, + size_t user_data_size) +{ + struct pw_impl_node *node = object; + struct pw_proxy *client_node; + struct node_data *data; + + user_data_size = SPA_ROUND_UP_N(user_data_size, __alignof__(struct node_data)); + + client_node = pw_core_create_object(core, + "client-node", + PW_TYPE_INTERFACE_ClientNode, + PW_VERSION_CLIENT_NODE, + &node->properties->dict, + user_data_size + sizeof(struct node_data)); + if (client_node == NULL) + goto error; + + data = pw_proxy_get_user_data(client_node); + data = SPA_PTROFF(data, user_data_size, struct node_data); + data->pool = pw_core_get_mempool(core); + data->node = node; + data->do_free = do_free; + data->context = pw_impl_node_get_context(node); + data->client_node = (struct pw_client_node *)client_node; + data->remote_id = SPA_ID_INVALID; + + + data->allow_mlock = pw_properties_get_bool(node->properties, "mem.allow-mlock", + data->context->settings.mem_allow_mlock); + + data->warn_mlock = pw_properties_get_bool(node->properties, "mem.warn-mlock", + data->context->settings.mem_warn_mlock); + + node->exported = true; + + spa_list_init(&data->free_mix); + spa_list_init(&data->mix[0]); + spa_list_init(&data->mix[1]); + + spa_list_init(&data->links); + + pw_proxy_add_listener(client_node, + &data->proxy_client_node_listener, + &proxy_client_node_events, data); + + spa_node_set_callbacks(node->node, &node_callbacks, data); + pw_impl_node_add_listener(node, &data->node_listener, &node_events, data); + + pw_client_node_add_listener(data->client_node, + &data->client_node_listener, + &client_node_events, + data); + do_node_init(data); + + return client_node; +error: + if (do_free) + pw_impl_node_destroy(node); + return NULL; + +} + +struct pw_proxy *pw_core_node_export(struct pw_core *core, + const char *type, const struct spa_dict *props, void *object, + size_t user_data_size) +{ + struct pw_impl_node *node = object; + + if (props) + pw_impl_node_update_properties(node, props); + return node_export(core, object, false, user_data_size); +} + +struct pw_proxy *pw_core_spa_node_export(struct pw_core *core, + const char *type, const struct spa_dict *props, void *object, + size_t user_data_size) +{ + struct pw_impl_node *node; + struct pw_proxy *proxy; + const char *str; + bool do_register; + + str = props ? spa_dict_lookup(props, PW_KEY_OBJECT_REGISTER) : NULL; + do_register = str ? pw_properties_parse_bool(str) : true; + + node = pw_context_create_node(pw_core_get_context(core), + props ? pw_properties_new_dict(props) : NULL, 0); + if (node == NULL) + return NULL; + + pw_impl_node_set_implementation(node, (struct spa_node*)object); + + if (do_register) + pw_impl_node_register(node, NULL); + + proxy = node_export(core, node, true, user_data_size); + if (proxy) + pw_impl_node_set_active(node, true); + + return proxy; +} diff --git a/src/modules/module-client-node/v0/client-node.c b/src/modules/module-client-node/v0/client-node.c new file mode 100644 index 0000000..e70a7c2 --- /dev/null +++ b/src/modules/module-client-node/v0/client-node.c @@ -0,0 +1,1447 @@ +/* PipeWire + * + * Copyright © 2015 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <string.h> +#include <stddef.h> +#include <stdio.h> +#include <errno.h> +#include <unistd.h> +#include <fcntl.h> +#include <dlfcn.h> +#include <sys/socket.h> +#include <sys/mman.h> + +#include <spa/node/node.h> +#include <spa/node/utils.h> +#include <spa/node/io.h> +#include <spa/pod/filter.h> +#include <spa/utils/keys.h> + +#define PW_ENABLE_DEPRECATED + +#include "pipewire/pipewire.h" +#include "pipewire/private.h" + +#include "pipewire/context.h" +#include "modules/spa/spa-node.h" +#include "client-node.h" +#include "transport.h" + +PW_LOG_TOPIC_EXTERN(mod_topic); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +/** \cond */ + +#define MAX_INPUTS 64 +#define MAX_OUTPUTS 64 + +#define MAX_BUFFERS 64 + +#define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS) +#define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS) +#define CHECK_PORT_ID(this,d,p) (CHECK_IN_PORT_ID(this,d,p) || CHECK_OUT_PORT_ID(this,d,p)) +#define CHECK_FREE_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && !(this)->in_ports[p].valid) +#define CHECK_FREE_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && !(this)->out_ports[p].valid) +#define CHECK_FREE_PORT(this,d,p) (CHECK_FREE_IN_PORT (this,d,p) || CHECK_FREE_OUT_PORT (this,d,p)) +#define CHECK_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && (this)->in_ports[p].valid) +#define CHECK_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->out_ports[p].valid) +#define CHECK_PORT(this,d,p) (CHECK_IN_PORT (this,d,p) || CHECK_OUT_PORT (this,d,p)) + +#define GET_IN_PORT(this,p) (&this->in_ports[p]) +#define GET_OUT_PORT(this,p) (&this->out_ports[p]) +#define GET_PORT(this,d,p) (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(this,p) : GET_OUT_PORT(this,p)) + +#define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers) + +extern uint32_t pw_protocol_native0_type_from_v2(struct pw_impl_client *client, uint32_t type); +extern uint32_t pw_protocol_native0_name_to_v2(struct pw_impl_client *client, const char *name); + +struct mem { + uint32_t id; + int ref; + int fd; + uint32_t type; + uint32_t flags; +}; + +struct buffer { + struct spa_buffer *outbuf; + struct spa_buffer buffer; + struct spa_meta metas[4]; + struct spa_data datas[4]; + bool outstanding; + uint32_t memid; +}; + +struct port { + uint32_t id; + enum spa_direction direction; + + bool valid; + struct spa_port_info info; + struct pw_properties *properties; + + bool have_format; + uint32_t n_params; + struct spa_pod **params; + struct spa_io_buffers *io; + + uint32_t n_buffers; + struct buffer buffers[MAX_BUFFERS]; +}; + +struct node { + struct spa_node node; + + struct impl *impl; + + struct spa_log *log; + struct spa_loop *data_loop; + struct spa_system *data_system; + + struct spa_hook_list hooks; + struct spa_callbacks callbacks; + + struct spa_io_position *position; + + struct pw_resource *resource; + + struct spa_source data_source; + int writefd; + + struct spa_node_info info; + + uint32_t n_inputs; + uint32_t n_outputs; + struct port in_ports[MAX_INPUTS]; + struct port out_ports[MAX_OUTPUTS]; + + uint32_t n_params; + struct spa_pod **params; + + uint32_t seq; + uint32_t init_pending; +}; + +struct impl { + struct pw_impl_client_node0 this; + + bool client_reuse; + + struct pw_context *context; + + struct node node; + + struct pw_client_node0_transport *transport; + + struct spa_hook node_listener; + struct spa_hook resource_listener; + struct spa_hook object_listener; + + struct pw_array mems; + + int fds[2]; + int other_fds[2]; + + uint32_t input_ready; + bool out_pending; +}; + +/** \endcond */ + +static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t flags) +{ + struct mem *m, *f = NULL; + + pw_array_for_each(m, &impl->mems) { + if (m->ref <= 0) + f = m; + else if (m->fd == fd) + goto found; + } + + if (f == NULL) { + m = pw_array_add(&impl->mems, sizeof(struct mem)); + m->id = pw_array_get_len(&impl->mems, struct mem) - 1; + m->ref = 0; + } + else { + m = f; + } + m->fd = fd; + m->type = type; + m->flags = flags; + + pw_client_node0_resource_add_mem(impl->node.resource, + m->id, + type, + m->fd, + m->flags); + found: + m->ref++; + return m; +} + + +static int clear_buffers(struct node *this, struct port *port) +{ + uint32_t i, j; + struct impl *impl = this->impl; + + for (i = 0; i < port->n_buffers; i++) { + struct buffer *b = &port->buffers[i]; + struct mem *m; + + spa_log_debug(this->log, "node %p: clear buffer %d", this, i); + + for (j = 0; j < b->buffer.n_datas; j++) { + struct spa_data *d = &b->datas[j]; + + if (d->type == SPA_DATA_DmaBuf || + d->type == SPA_DATA_MemFd) { + uint32_t id; + + id = SPA_PTR_TO_UINT32(b->buffer.datas[j].data); + m = pw_array_get_unchecked(&impl->mems, id, struct mem); + m->ref--; + } + } + m = pw_array_get_unchecked(&impl->mems, b->memid, struct mem); + m->ref--; + } + port->n_buffers = 0; + return 0; +} + +static void emit_port_info(struct node *this, struct port *port) +{ + spa_node_emit_port_info(&this->hooks, + port->direction, port->id, &port->info); +} + +static int impl_node_add_listener(void *object, + struct spa_hook *listener, + const struct spa_node_events *events, + void *data) +{ + struct node *this = object; + struct spa_hook_list save; + uint32_t i; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + spa_hook_list_isolate(&this->hooks, &save, listener, events, data); + + for (i = 0; i < MAX_INPUTS; i++) { + if (this->in_ports[i].valid) + emit_port_info(this, &this->in_ports[i]); + } + for (i = 0; i < MAX_OUTPUTS; i++) { + if (this->out_ports[i].valid) + emit_port_info(this, &this->out_ports[i]); + } + spa_hook_list_join(&this->hooks, &save); + + return 0; +} + +static int impl_node_enum_params(void *object, int seq, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + uint8_t buffer[1024]; + struct spa_pod_builder b = { 0 }; + struct spa_result_node_params result; + uint32_t count = 0; + bool found = false; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + + result.id = id; + result.next = 0; + + while (true) { + struct spa_pod *param; + + result.index = result.next++; + if (result.index >= this->n_params) + break; + + param = this->params[result.index]; + + if (param == NULL || !spa_pod_is_object_id(param, id)) + continue; + + found = true; + + if (result.index < start) + continue; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if (spa_pod_filter(&b, &result.param, param, filter) != 0) + continue; + + pw_log_debug("%p: %d param %u", this, seq, result.index); + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count == num) + break; + } + return found ? 0 : -ENOENT; +} + +static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + if (this->resource == NULL) + return -EIO; + + pw_client_node0_resource_set_param(this->resource, this->seq, id, flags, param); + + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) +{ + struct node *this = object; + int res = 0; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + switch(id) { + case SPA_IO_Position: + this->position = data; + break; + default: + res = -ENOTSUP; + break; + } + return res; +} + +static inline void do_flush(struct node *this) +{ + if (spa_system_eventfd_write(this->data_system, this->writefd, 1) < 0) + spa_log_warn(this->log, "node %p: error flushing : %s", this, strerror(errno)); + +} + +static int send_clock_update(struct node *this) +{ + struct pw_impl_client *client = this->resource->client; + uint32_t type = pw_protocol_native0_name_to_v2(client, SPA_TYPE_INFO_NODE_COMMAND_BASE "ClockUpdate"); + struct timespec ts; + int64_t now; + + clock_gettime(CLOCK_MONOTONIC, &ts); + now = SPA_TIMESPEC_TO_NSEC(&ts); + pw_log_trace("%p: now %"PRIi64, this, now); + + struct spa_command_node0_clock_update cu = + SPA_COMMAND_NODE0_CLOCK_UPDATE_INIT(type, + SPA_COMMAND_NODE0_CLOCK_UPDATE_TIME | + SPA_COMMAND_NODE0_CLOCK_UPDATE_SCALE | + SPA_COMMAND_NODE0_CLOCK_UPDATE_STATE | + SPA_COMMAND_NODE0_CLOCK_UPDATE_LATENCY, /* change_mask */ + SPA_USEC_PER_SEC, /* rate */ + now / SPA_NSEC_PER_USEC, /* ticks */ + now, /* monotonic_time */ + 0, /* offset */ + (1 << 16) | 1, /* scale */ + SPA_CLOCK0_STATE_RUNNING, /* state */ + SPA_COMMAND_NODE0_CLOCK_UPDATE_FLAG_LIVE, /* flags */ + 0); /* latency */ + + pw_client_node0_resource_command(this->resource, this->seq, (const struct spa_command*)&cu); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int impl_node_send_command(void *object, const struct spa_command *command) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(command != NULL, -EINVAL); + + if (this->resource == NULL) + return -EIO; + + if (SPA_NODE_COMMAND_ID(command) == SPA_NODE_COMMAND_Start) { + send_clock_update(this); + } + + pw_client_node0_resource_command(this->resource, this->seq, command); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_set_callbacks(void *object, + const struct spa_node_callbacks *callbacks, + void *data) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + this->callbacks = SPA_CALLBACKS_INIT(callbacks, data); + + return 0; +} + +static int +impl_node_sync(void *object, int seq) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + pw_log_debug("%p: sync %p", this, this->resource); + + if (this->resource == NULL) + return -EIO; + + this->init_pending = SPA_RESULT_RETURN_ASYNC(this->seq++); + + return this->init_pending; +} + + +extern struct spa_pod *pw_protocol_native0_pod_from_v2(struct pw_impl_client *client, const struct spa_pod *pod); +extern int pw_protocol_native0_pod_to_v2(struct pw_impl_client *client, const struct spa_pod *pod, + struct spa_pod_builder *b); + +static void +do_update_port(struct node *this, + enum spa_direction direction, + uint32_t port_id, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info) +{ + struct port *port; + + port = GET_PORT(this, direction, port_id); + + if (!port->valid) { + spa_log_debug(this->log, "node %p: adding port %d, direction %d", + this, port_id, direction); + port->id = port_id; + port->direction = direction; + port->have_format = false; + port->valid = true; + + if (direction == SPA_DIRECTION_INPUT) + this->n_inputs++; + else + this->n_outputs++; + } + + if (change_mask & PW_CLIENT_NODE0_PORT_UPDATE_PARAMS) { + uint32_t i; + + port->have_format = false; + + spa_log_debug(this->log, "node %p: port %u update %d params", this, port_id, n_params); + for (i = 0; i < port->n_params; i++) + free(port->params[i]); + port->n_params = n_params; + if (port->n_params == 0) { + free(port->params); + port->params = NULL; + } else { + void *p; + p = pw_reallocarray(port->params, port->n_params, sizeof(struct spa_pod *)); + if (p == NULL) { + pw_log_error("%p: port %u can't realloc: %m", this, port_id); + free(port->params); + port->n_params = 0; + } + port->params = p; + } + for (i = 0; i < port->n_params; i++) { + port->params[i] = params[i] ? + pw_protocol_native0_pod_from_v2(this->resource->client, params[i]) : NULL; + + if (port->params[i] && spa_pod_is_object_id(port->params[i], SPA_PARAM_Format)) + port->have_format = true; + } + } + + if (change_mask & PW_CLIENT_NODE0_PORT_UPDATE_INFO) { + pw_properties_free(port->properties); + port->properties = NULL; + port->info.props = NULL; + port->info.n_params = 0; + port->info.params = NULL; + + if (info) { + port->info = *info; + if (info->props) { + port->properties = pw_properties_new_dict(info->props); + port->info.props = &port->properties->dict; + } + } + spa_node_emit_port_info(&this->hooks, direction, port_id, info); + } +} + +static void +clear_port(struct node *this, + struct port *port, enum spa_direction direction, uint32_t port_id) +{ + do_update_port(this, + direction, + port_id, + PW_CLIENT_NODE0_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE0_PORT_UPDATE_INFO, 0, NULL, NULL); + clear_buffers(this, port); +} + +static void do_uninit_port(struct node *this, enum spa_direction direction, uint32_t port_id) +{ + struct port *port; + + spa_log_debug(this->log, "node %p: removing port %d", this, port_id); + + if (direction == SPA_DIRECTION_INPUT) { + port = GET_IN_PORT(this, port_id); + this->n_inputs--; + } else { + port = GET_OUT_PORT(this, port_id); + this->n_outputs--; + } + clear_port(this, port, direction, port_id); + port->valid = false; + spa_node_emit_port_info(&this->hooks, direction, port_id, NULL); +} + +static int +impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id, + const struct spa_dict *props) +{ + struct node *this = object; + struct port *port; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_FREE_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + clear_port(this, port, direction, port_id); + + return 0; +} + +static int +impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + do_uninit_port(this, direction, port_id); + + return 0; +} + +static int +impl_node_port_enum_params(void *object, int seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + struct port *port; + uint8_t buffer[1024]; + struct spa_pod_builder b = { 0 }; + struct spa_result_node_params result; + uint32_t count = 0; + bool found = false; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + + pw_log_debug("%p: %d port %d.%d %u %u %u", this, seq, + direction, port_id, id, start, num); + + result.id = id; + result.next = 0; + + while (true) { + struct spa_pod *param; + + result.index = result.next++; + if (result.index >= port->n_params) + break; + + param = port->params[result.index]; + + if (param == NULL || !spa_pod_is_object_id(param, id)) + continue; + + found = true; + + if (result.index < start) + continue; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if (spa_pod_filter(&b, &result.param, param, filter) < 0) + continue; + + pw_log_debug("%p: %d param %u", this, seq, result.index); + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count == num) + break; + } + return found ? 0 : -ENOENT; +} + +static int +impl_node_port_set_param(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + if (this->resource == NULL) + return -EIO; + + pw_client_node0_resource_port_set_param(this->resource, + this->seq, + direction, port_id, + id, flags, + param); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_port_set_io(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + void *data, size_t size) +{ + struct node *this = object; + struct impl *impl; + struct pw_memblock *mem; + struct mem *m; + uint32_t memid, mem_offset, mem_size; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + impl = this->impl; + + spa_log_debug(this->log, "node %p: port %d.%d set io %d %p", this, + direction, port_id, id, data); + + if (id == SPA_IO_Buffers) { + struct port *port = GET_PORT(this, direction, port_id); + port->io = data; + } + + if (this->resource == NULL) + return -EIO; + + + if (data) { + if ((mem = pw_mempool_find_ptr(impl->context->pool, data)) == NULL) + return -EINVAL; + + mem_offset = SPA_PTRDIFF(data, mem->map->ptr); + mem_size = mem->size; + if (mem_size - mem_offset < size) + return -EINVAL; + + mem_offset += mem->map->offset; + m = ensure_mem(impl, mem->fd, SPA_DATA_MemFd, mem->flags); + memid = m->id; + } + else { + memid = SPA_ID_INVALID; + mem_offset = mem_size = 0; + } + + pw_client_node0_resource_port_set_io(this->resource, + this->seq, + direction, port_id, + id, + memid, + mem_offset, mem_size); + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_port_use_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t flags, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct node *this = object; + struct impl *impl; + struct port *port; + uint32_t i, j; + struct pw_client_node0_buffer *mb; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + impl = this->impl; + spa_log_debug(this->log, "node %p: use buffers %p %u", this, buffers, n_buffers); + + port = GET_PORT(this, direction, port_id); + + if (!port->have_format) + return -EIO; + + clear_buffers(this, port); + + if (n_buffers > 0) { + mb = alloca(n_buffers * sizeof(struct pw_client_node0_buffer)); + } else { + mb = NULL; + } + + port->n_buffers = n_buffers; + + if (this->resource == NULL) + return -EIO; + + for (i = 0; i < n_buffers; i++) { + struct buffer *b = &port->buffers[i]; + struct pw_memblock *mem; + struct mem *m; + size_t data_size; + void *baseptr; + + b->outbuf = buffers[i]; + memcpy(&b->buffer, buffers[i], sizeof(struct spa_buffer)); + b->buffer.datas = b->datas; + b->buffer.metas = b->metas; + + if (buffers[i]->n_metas > 0) + baseptr = buffers[i]->metas[0].data; + else if (buffers[i]->n_datas > 0) + baseptr = buffers[i]->datas[0].chunk; + else + return -EINVAL; + + if ((mem = pw_mempool_find_ptr(impl->context->pool, baseptr)) == NULL) + return -EINVAL; + + data_size = 0; + for (j = 0; j < buffers[i]->n_metas; j++) { + data_size += buffers[i]->metas[j].size; + } + for (j = 0; j < buffers[i]->n_datas; j++) { + struct spa_data *d = buffers[i]->datas; + data_size += sizeof(struct spa_chunk); + if (d->type == SPA_DATA_MemPtr) + data_size += d->maxsize; + } + + m = ensure_mem(impl, mem->fd, SPA_DATA_MemFd, mem->flags); + b->memid = m->id; + + mb[i].buffer = &b->buffer; + mb[i].mem_id = b->memid; + mb[i].offset = SPA_PTRDIFF(baseptr, SPA_PTROFF(mem->map->ptr, mem->map->offset, void)); + mb[i].size = data_size; + + for (j = 0; j < buffers[i]->n_metas; j++) + memcpy(&b->buffer.metas[j], &buffers[i]->metas[j], sizeof(struct spa_meta)); + b->buffer.n_metas = j; + + for (j = 0; j < buffers[i]->n_datas; j++) { + struct spa_data *d = &buffers[i]->datas[j]; + + memcpy(&b->buffer.datas[j], d, sizeof(struct spa_data)); + + if (d->type == SPA_DATA_DmaBuf || + d->type == SPA_DATA_MemFd) { + m = ensure_mem(impl, d->fd, d->type, d->flags); + b->buffer.datas[j].data = SPA_UINT32_TO_PTR(m->id); + } else if (d->type == SPA_DATA_MemPtr) { + b->buffer.datas[j].data = SPA_INT_TO_PTR(SPA_PTRDIFF(d->data, baseptr)); + } else { + b->buffer.datas[j].type = SPA_ID_INVALID; + b->buffer.datas[j].data = 0; + spa_log_error(this->log, "invalid memory type %d", d->type); + } + } + } + + pw_client_node0_resource_port_use_buffers(this->resource, + this->seq, + direction, port_id, + n_buffers, mb); + + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int +impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_OUT_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL); + + impl = this->impl; + + spa_log_trace(this->log, "reuse buffer %d", buffer_id); + + pw_client_node0_transport_add_message(impl->transport, (struct pw_client_node0_message *) + &PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id, buffer_id)); + do_flush(this); + + return 0; +} + +static int impl_node_process_input(struct spa_node *node) +{ + struct node *this = SPA_CONTAINER_OF(node, struct node, node); + struct impl *impl = this->impl; +// bool client_reuse = impl->client_reuse; + uint32_t i; + int res; + + if (impl->input_ready == 0) { + /* the client is not ready to receive our buffers, recycle them */ + pw_log_trace("node not ready, recycle buffers"); + for (i = 0; i < MAX_INPUTS; i++) { + struct port *p = &this->in_ports[i]; + struct spa_io_buffers *io = p->io; + + if (!p->valid || io == NULL) + continue; + + io->status = SPA_STATUS_NEED_DATA; + } + res = SPA_STATUS_NEED_DATA; + } + else { + for (i = 0; i < MAX_INPUTS; i++) { + struct port *p = &this->in_ports[i]; + struct spa_io_buffers *io = p->io; + + if (!p->valid || io == NULL) + continue; + + pw_log_trace("set io status to %d %d", io->status, io->buffer_id); + impl->transport->inputs[p->id] = *io; + + /* explicitly recycle buffers when the client is not going to do it */ +// if (!client_reuse && (pp = p->peer)) +// spa_node_port_reuse_buffer(pp->node->implementation, +// pp->port_id, io->buffer_id); + } + pw_client_node0_transport_add_message(impl->transport, + &PW_CLIENT_NODE0_MESSAGE_INIT(PW_CLIENT_NODE0_MESSAGE_PROCESS_INPUT)); + do_flush(this); + + impl->input_ready--; + res = SPA_STATUS_OK; + } + return res; +} + +#if 0 +/** this is used for clients providing data to pipewire and currently + * not supported in the compat layer */ +static int impl_node_process_output(struct spa_node *node) +{ + struct node *this; + struct impl *impl; + uint32_t i; + + this = SPA_CONTAINER_OF(node, struct node, node); + impl = this->impl; + + if (impl->out_pending) + goto done; + + impl->out_pending = true; + + for (i = 0; i < MAX_OUTPUTS; i++) { + struct port *p = &this->out_ports[i]; + struct spa_io_buffers *io = p->io; + + if (!p->valid || io == NULL) + continue; + + impl->transport->outputs[p->id] = *io; + + pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, + impl->transport->outputs[p->id].status, + impl->transport->outputs[p->id].buffer_id); + } + + done: + pw_client_node0_transport_add_message(impl->transport, + &PW_CLIENT_NODE0_MESSAGE_INIT(PW_CLIENT_NODE0_MESSAGE_PROCESS_OUTPUT)); + do_flush(this); + + return SPA_STATUS_OK; +} +#endif + +static int impl_node_process(void *object) +{ + struct node *this = object; + struct impl *impl = this->impl; + struct pw_impl_node *n = impl->this.node; + + return impl_node_process_input(n->node); +} + +static int handle_node_message(struct node *this, struct pw_client_node0_message *message) +{ + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, node); + uint32_t i; + + switch (PW_CLIENT_NODE0_MESSAGE_TYPE(message)) { + case PW_CLIENT_NODE0_MESSAGE_HAVE_OUTPUT: + for (i = 0; i < MAX_OUTPUTS; i++) { + struct port *p = &this->out_ports[i]; + struct spa_io_buffers *io = p->io; + if (!p->valid || io == NULL) + continue; + *io = impl->transport->outputs[p->id]; + pw_log_trace("have output %d %d", io->status, io->buffer_id); + } + impl->out_pending = false; + spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); + break; + + case PW_CLIENT_NODE0_MESSAGE_NEED_INPUT: + for (i = 0; i < MAX_INPUTS; i++) { + struct port *p = &this->in_ports[i]; + struct spa_io_buffers *io = p->io; + if (!p->valid || io == NULL) + continue; + pw_log_trace("need input %d %d", i, p->id); + *io = impl->transport->inputs[p->id]; + pw_log_trace("need input %d %d", io->status, io->buffer_id); + } + impl->input_ready++; + spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA); + break; + + case PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER: + if (impl->client_reuse) { + struct pw_client_node0_message_port_reuse_buffer *p = + (struct pw_client_node0_message_port_reuse_buffer *) message; + spa_node_call_reuse_buffer(&this->callbacks, p->body.port_id.value, + p->body.buffer_id.value); + } + break; + + default: + pw_log_warn("unhandled message %d", PW_CLIENT_NODE0_MESSAGE_TYPE(message)); + return -ENOTSUP; + } + return 0; +} + +static void setup_transport(struct impl *impl) +{ + struct node *this = &impl->node; + uint32_t max_inputs = 0, max_outputs = 0, n_inputs = 0, n_outputs = 0; + struct spa_dict_item items[1]; + + n_inputs = this->n_inputs; + max_inputs = this->info.max_input_ports == 0 ? this->n_inputs : this->info.max_input_ports; + n_outputs = this->n_outputs; + max_outputs = this->info.max_output_ports == 0 ? this->n_outputs : this->info.max_output_ports; + + impl->transport = pw_client_node0_transport_new(impl->context, max_inputs, max_outputs); + impl->transport->area->n_input_ports = n_inputs; + impl->transport->area->n_output_ports = n_outputs; + + if (n_inputs > 0) { + items[0] = SPA_DICT_ITEM_INIT(SPA_KEY_MEDIA_CLASS, "Stream/Input/Video"); + } else { + items[0] = SPA_DICT_ITEM_INIT(SPA_KEY_MEDIA_CLASS, "Stream/Output/Video"); + } + pw_impl_node_update_properties(impl->this.node, &SPA_DICT_INIT(items, 1)); +} + +static void +client_node0_done(void *data, int seq, int res) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + if (seq == 0 && res == 0 && impl->transport == NULL) + setup_transport(impl); + + pw_log_debug("seq:%d res:%d pending:%d", seq, res, this->init_pending); + spa_node_emit_result(&this->hooks, seq, res, 0, NULL); + + if (this->init_pending != SPA_ID_INVALID) { + spa_node_emit_result(&this->hooks, this->init_pending, res, 0, NULL); + this->init_pending = SPA_ID_INVALID; + } +} + +static void +client_node0_update(void *data, + uint32_t change_mask, + uint32_t max_input_ports, + uint32_t max_output_ports, + uint32_t n_params, + const struct spa_pod **params) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + if (change_mask & PW_CLIENT_NODE0_UPDATE_MAX_INPUTS) + this->info.max_input_ports = max_input_ports; + if (change_mask & PW_CLIENT_NODE0_UPDATE_MAX_OUTPUTS) + this->info.max_output_ports = max_output_ports; + if (change_mask & PW_CLIENT_NODE0_UPDATE_PARAMS) { + uint32_t i; + spa_log_debug(this->log, "node %p: update %d params", this, n_params); + + for (i = 0; i < this->n_params; i++) + free(this->params[i]); + this->n_params = n_params; + if (this->n_params == 0) { + free(this->params); + this->params = NULL; + } else { + void *p; + p = pw_reallocarray(this->params, this->n_params, sizeof(struct spa_pod *)); + if (p == NULL) { + pw_log_error("%p: can't realloc: %m", this); + free(this->params); + this->n_params = 0; + } + this->params = p; + } + for (i = 0; i < this->n_params; i++) + this->params[i] = params[i] ? spa_pod_copy(params[i]) : NULL; + } + if (change_mask & (PW_CLIENT_NODE0_UPDATE_MAX_INPUTS | PW_CLIENT_NODE0_UPDATE_MAX_OUTPUTS)) { + spa_node_emit_info(&this->hooks, &this->info); + } + + spa_log_debug(this->log, "node %p: got node update max_in %u, max_out %u", this, + this->info.max_input_ports, this->info.max_output_ports); +} + +static void +client_node0_port_update(void *data, + enum spa_direction direction, + uint32_t port_id, + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info) +{ + struct impl *impl = data; + struct node *this = &impl->node; + bool remove; + + spa_log_debug(this->log, "node %p: got port update", this); + if (!CHECK_PORT_ID(this, direction, port_id)) + return; + + remove = (change_mask == 0); + + if (remove) { + do_uninit_port(this, direction, port_id); + } else { + do_update_port(this, + direction, + port_id, + change_mask, + n_params, params, info); + } +} + +static void client_node0_set_active(void *data, bool active) +{ + struct impl *impl = data; + pw_impl_node_set_active(impl->this.node, active); +} + +static void client_node0_event(void *data, struct spa_event *event) +{ + struct impl *impl = data; + struct node *this = &impl->node; + + switch (SPA_EVENT_TYPE(event)) { + case SPA_NODE0_EVENT_RequestClockUpdate: + send_clock_update(this); + break; + default: + spa_node_emit_event(&this->hooks, event); + } +} + +static const struct pw_client_node0_methods client_node0_methods = { + PW_VERSION_CLIENT_NODE0_METHODS, + .done = client_node0_done, + .update = client_node0_update, + .port_update = client_node0_port_update, + .set_active = client_node0_set_active, + .event = client_node0_event, +}; + +static void node_on_data_fd_events(struct spa_source *source) +{ + struct node *this = source->data; + struct impl *impl = this->impl; + + if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) { + spa_log_warn(this->log, "node %p: got error", this); + return; + } + + if (source->rmask & SPA_IO_IN) { + struct pw_client_node0_message message; + uint64_t cmd; + + if (spa_system_eventfd_read(this->data_system, this->data_source.fd, &cmd) < 0) + spa_log_warn(this->log, "node %p: error reading message: %s", + this, strerror(errno)); + + while (pw_client_node0_transport_next_message(impl->transport, &message) == 1) { + struct pw_client_node0_message *msg = alloca(SPA_POD_SIZE(&message)); + pw_client_node0_transport_parse_message(impl->transport, msg); + handle_node_message(this, msg); + } + } +} + +static const struct spa_node_methods impl_node = { + SPA_VERSION_NODE_METHODS, + .add_listener = impl_node_add_listener, + .set_callbacks = impl_node_set_callbacks, + .sync = impl_node_sync, + .enum_params = impl_node_enum_params, + .set_param = impl_node_set_param, + .set_io = impl_node_set_io, + .send_command = impl_node_send_command, + .add_port = impl_node_add_port, + .remove_port = impl_node_remove_port, + .port_enum_params = impl_node_port_enum_params, + .port_set_param = impl_node_port_set_param, + .port_use_buffers = impl_node_port_use_buffers, + .port_set_io = impl_node_port_set_io, + .port_reuse_buffer = impl_node_port_reuse_buffer, + .process = impl_node_process, +}; + +static int +node_init(struct node *this, + struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) +{ + this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); + this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop); + this->data_system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem); + + if (this->data_loop == NULL) { + spa_log_error(this->log, "a data-loop is needed"); + return -EINVAL; + } + + this->node.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Node, + SPA_VERSION_NODE, + &impl_node, this); + spa_hook_list_init(&this->hooks); + + this->data_source.func = node_on_data_fd_events; + this->data_source.data = this; + this->data_source.fd = -1; + this->data_source.mask = SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP; + this->data_source.rmask = 0; + + this->seq = 1; + this->init_pending = SPA_ID_INVALID; + + return SPA_RESULT_RETURN_ASYNC(this->seq++); +} + +static int node_clear(struct node *this) +{ + uint32_t i; + + for (i = 0; i < MAX_INPUTS; i++) { + if (this->in_ports[i].valid) + clear_port(this, &this->in_ports[i], SPA_DIRECTION_INPUT, i); + } + for (i = 0; i < MAX_OUTPUTS; i++) { + if (this->out_ports[i].valid) + clear_port(this, &this->out_ports[i], SPA_DIRECTION_OUTPUT, i); + } + + return 0; +} + +static int do_remove_source(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct spa_source *source = user_data; + spa_loop_remove_source(loop, source); + return 0; +} + +static void client_node0_resource_destroy(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node0 *this = &impl->this; + struct node *node = &impl->node; + + pw_log_debug("client-node %p: destroy", impl); + + impl->node.resource = this->resource = NULL; + spa_hook_remove(&impl->resource_listener); + spa_hook_remove(&impl->object_listener); + + if (node->data_source.fd != -1) { + spa_loop_invoke(node->data_loop, + do_remove_source, + SPA_ID_INVALID, + NULL, + 0, + true, + &node->data_source); + } + if (this->node) + pw_impl_node_destroy(this->node); +} + +static void node_initialized(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node0 *this = &impl->this; + struct pw_impl_node *node = this->node; + struct spa_system *data_system = impl->node.data_system; + + if (this->resource == NULL) + return; + + impl->fds[0] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + impl->fds[1] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + impl->node.data_source.fd = impl->fds[0]; + impl->node.writefd = impl->fds[1]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; + + spa_loop_add_source(impl->node.data_loop, &impl->node.data_source); + pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); + + pw_client_node0_resource_transport(this->resource, + pw_global_get_id(pw_impl_node_get_global(node)), + impl->other_fds[0], + impl->other_fds[1], + impl->transport); +} + +static void node_free(void *data) +{ + struct impl *impl = data; + struct pw_impl_client_node0 *this = &impl->this; + struct spa_system *data_system = impl->node.data_system; + + this->node = NULL; + + pw_log_debug("client-node %p: free", &impl->this); + node_clear(&impl->node); + + if (impl->transport) + pw_client_node0_transport_destroy(impl->transport); + + spa_hook_remove(&impl->node_listener); + + if (this->resource) + pw_resource_destroy(this->resource); + + pw_array_clear(&impl->mems); + + if (impl->fds[0] != -1) + spa_system_close(data_system, impl->fds[0]); + if (impl->fds[1] != -1) + spa_system_close(data_system, impl->fds[1]); + free(impl); +} + +static const struct pw_impl_node_events node_events = { + PW_VERSION_IMPL_NODE_EVENTS, + .free = node_free, + .initialized = node_initialized, +}; + +static const struct pw_resource_events resource_events = { + PW_VERSION_RESOURCE_EVENTS, + .destroy = client_node0_resource_destroy, +}; + +static void convert_properties(struct pw_properties *properties) +{ + static const struct { + const char *from, *to; + } props[] = { + { "pipewire.autoconnect", PW_KEY_NODE_AUTOCONNECT, }, + /* XXX deprecated */ + { "pipewire.target.node", PW_KEY_NODE_TARGET, } + }; + + const char *str; + + SPA_FOR_EACH_ELEMENT_VAR(props, p) { + if ((str = pw_properties_get(properties, p->from)) != NULL) { + pw_properties_set(properties, p->to, str); + pw_properties_set(properties, p->from, NULL); + } + } +} + +/** Create a new client node + * \param client an owner \ref pw_client + * \param id an id + * \param name a name + * \param properties extra properties + * \return a newly allocated client node + * + * Create a new \ref pw_impl_node. + * + * \memberof pw_impl_client_node + */ +struct pw_impl_client_node0 *pw_impl_client_node0_new(struct pw_resource *resource, + struct pw_properties *properties) +{ + struct impl *impl; + struct pw_impl_client_node0 *this; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct pw_context *context = pw_impl_client_get_context(client); + const struct spa_support *support; + uint32_t n_support; + const char *name; + int res; + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return NULL; + + this = &impl->this; + + if (properties == NULL) + properties = pw_properties_new(NULL, NULL); + if (properties == NULL) { + res = -errno; + goto error_exit_free; + } + convert_properties(properties); + + pw_properties_setf(properties, PW_KEY_CLIENT_ID, "%d", client->global->id); + + impl->context = context; + impl->fds[0] = impl->fds[1] = -1; + pw_log_debug("client-node %p: new", impl); + + support = pw_context_get_support(impl->context, &n_support); + + node_init(&impl->node, NULL, support, n_support); + impl->node.impl = impl; + + pw_array_init(&impl->mems, 64); + + if ((name = pw_properties_get(properties, "node.name")) == NULL) + name = "client-node"; + pw_properties_set(properties, PW_KEY_MEDIA_TYPE, "Video"); + + impl->node.resource = resource; + this->resource = resource; + this->node = pw_spa_node_new(context, + PW_SPA_NODE_FLAG_ASYNC, + &impl->node.node, + NULL, + properties, 0); + if (this->node == NULL) { + res = -errno; + goto error_no_node; + } + + impl->client_reuse = pw_properties_get_bool(properties, "pipewire.client.reuse", false); + + pw_resource_add_listener(this->resource, + &impl->resource_listener, + &resource_events, + impl); + pw_resource_add_object_listener(this->resource, + &impl->object_listener, + &client_node0_methods, + impl); + + + pw_impl_node_add_listener(this->node, &impl->node_listener, &node_events, impl); + + return this; + +error_no_node: + pw_resource_destroy(this->resource); + node_clear(&impl->node); +error_exit_free: + free(impl); + errno = -res; + return NULL; +} + +/** Destroy a client node + * \param node the client node to destroy + * \memberof pw_impl_client_node + */ +void pw_impl_client_node0_destroy(struct pw_impl_client_node0 *node) +{ + pw_resource_destroy(node->resource); +} diff --git a/src/modules/module-client-node/v0/client-node.h b/src/modules/module-client-node/v0/client-node.h new file mode 100644 index 0000000..04b77d4 --- /dev/null +++ b/src/modules/module-client-node/v0/client-node.h @@ -0,0 +1,101 @@ +/* PipeWire + * + * Copyright © 2015 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PIPEWIRE_CLIENT_NODE0_H +#define PIPEWIRE_CLIENT_NODE0_H + +#include <pipewire/impl.h> + +#include "ext-client-node.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** The state of the clock */ +enum spa_clock0_state { + SPA_CLOCK0_STATE_STOPPED, /*< the clock is stopped */ + SPA_CLOCK0_STATE_PAUSED, /*< the clock is paused */ + SPA_CLOCK0_STATE_RUNNING, /*< the clock is running */ +}; + +struct spa_command_node0_clock_update_body { + struct spa_pod_object_body body; +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_TIME (1 << 0) +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_SCALE (1 << 1) +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_STATE (1 << 2) +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_LATENCY (1 << 3) + struct spa_pod_int change_mask SPA_ALIGNED(8); + struct spa_pod_int rate SPA_ALIGNED(8); + struct spa_pod_long ticks SPA_ALIGNED(8); + struct spa_pod_long monotonic_time SPA_ALIGNED(8); + struct spa_pod_long offset SPA_ALIGNED(8); + struct spa_pod_int scale SPA_ALIGNED(8); + struct spa_pod_int state SPA_ALIGNED(8); +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_FLAG_LIVE (1 << 0) + struct spa_pod_int flags SPA_ALIGNED(8); + struct spa_pod_long latency SPA_ALIGNED(8); +}; + +struct spa_command_node0_clock_update { + struct spa_pod pod; + struct spa_command_node0_clock_update_body body; +}; + +#define SPA_COMMAND_NODE0_CLOCK_UPDATE_INIT(type,change_mask,rate,ticks,monotonic_time,offset,scale,state,flags,latency) \ + SPA_COMMAND_INIT_FULL(struct spa_command_node0_clock_update, \ + sizeof(struct spa_command_node0_clock_update_body), 0, type, \ + SPA_POD_INIT_Int(change_mask), \ + SPA_POD_INIT_Int(rate), \ + SPA_POD_INIT_Long(ticks), \ + SPA_POD_INIT_Long(monotonic_time), \ + SPA_POD_INIT_Long(offset), \ + SPA_POD_INIT_Int(scale), \ + SPA_POD_INIT_Int(state), \ + SPA_POD_INIT_Int(flags), \ + SPA_POD_INIT_Long(latency)) + + +/** \class pw_impl_client_node0 + * + * PipeWire client node interface + */ +struct pw_impl_client_node0 { + struct pw_impl_node *node; + + struct pw_resource *resource; +}; + +struct pw_impl_client_node0 * +pw_impl_client_node0_new(struct pw_resource *resource, + struct pw_properties *properties); + +void +pw_impl_client_node0_destroy(struct pw_impl_client_node0 *node); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_CLIENT_NODE0_H */ diff --git a/src/modules/module-client-node/v0/ext-client-node.h b/src/modules/module-client-node/v0/ext-client-node.h new file mode 100644 index 0000000..21ba597 --- /dev/null +++ b/src/modules/module-client-node/v0/ext-client-node.h @@ -0,0 +1,414 @@ +/* PipeWire + * + * Copyright © 2016 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __PIPEWIRE_EXT_CLIENT_NODE0_H__ +#define __PIPEWIRE_EXT_CLIENT_NODE0_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <spa/utils/defs.h> +#include <spa/param/param.h> +#include <spa/node/node.h> + +#include <pipewire/proxy.h> + +#define PW_TYPE_INTERFACE_ClientNode PW_TYPE_INFO_INTERFACE_BASE "ClientNode" + +#define PW_VERSION_CLIENT_NODE0 0 + +struct pw_client_node0_message; + +/** Shared structure between client and server \memberof pw_client_node */ +struct pw_client_node0_area { + uint32_t max_input_ports; /**< max input ports of the node */ + uint32_t n_input_ports; /**< number of input ports of the node */ + uint32_t max_output_ports; /**< max output ports of the node */ + uint32_t n_output_ports; /**< number of output ports of the node */ +}; + +/** \class pw_client_node0_transport + * + * \brief Transport object + * + * The transport object contains shared data and ringbuffers to exchange + * events and data between the server and the client in a low-latency and + * lockfree way. + */ +struct pw_client_node0_transport { + struct pw_client_node0_area *area; /**< the transport area */ + struct spa_io_buffers *inputs; /**< array of buffer input io */ + struct spa_io_buffers *outputs; /**< array of buffer output io */ + void *input_data; /**< input memory for ringbuffer */ + struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */ + void *output_data; /**< output memory for ringbuffer */ + struct spa_ringbuffer *output_buffer; /**< ringbuffer for output memory */ + + /** Destroy a transport + * \param trans a transport to destroy + * \memberof pw_client_node0_transport + */ + void (*destroy) (struct pw_client_node0_transport *trans); + + /** Add a message to the transport + * \param trans the transport to send the message on + * \param message the message to add + * \return 0 on success, < 0 on error + * + * Write \a message to the shared ringbuffer. + */ + int (*add_message) (struct pw_client_node0_transport *trans, struct pw_client_node0_message *message); + + /** Get next message from a transport + * \param trans the transport to get the message of + * \param[out] message the message to read + * \return < 0 on error, 1 when a message is available, + * 0 when no more messages are available. + * + * Get the skeleton next message from \a trans into \a message. This function will + * only read the head and object body of the message. + * + * After the complete size of the message has been calculated, you should call + * \ref parse_message() to read the complete message contents. + */ + int (*next_message) (struct pw_client_node0_transport *trans, struct pw_client_node0_message *message); + + /** Parse the complete message on transport + * \param trans the transport to read from + * \param[out] message memory that can hold the complete message + * \return 0 on success, < 0 on error + * + * Use this function after \ref next_message(). + */ + int (*parse_message) (struct pw_client_node0_transport *trans, void *message); +}; + +#define pw_client_node0_transport_destroy(t) ((t)->destroy((t))) +#define pw_client_node0_transport_add_message(t,m) ((t)->add_message((t), (m))) +#define pw_client_node0_transport_next_message(t,m) ((t)->next_message((t), (m))) +#define pw_client_node0_transport_parse_message(t,m) ((t)->parse_message((t), (m))) + +enum pw_client_node0_message_type { + PW_CLIENT_NODE0_MESSAGE_HAVE_OUTPUT, /*< signal that the node has output */ + PW_CLIENT_NODE0_MESSAGE_NEED_INPUT, /*< signal that the node needs input */ + PW_CLIENT_NODE0_MESSAGE_PROCESS_INPUT, /*< instruct the node to process input */ + PW_CLIENT_NODE0_MESSAGE_PROCESS_OUTPUT, /*< instruct the node output is processed */ + PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER, /*< reuse a buffer */ +}; + +struct pw_client_node0_message_body { + struct spa_pod_int type SPA_ALIGNED(8); /*< one of enum pw_client_node0_message_type */ +}; + +struct pw_client_node0_message { + struct spa_pod_struct pod; + struct pw_client_node0_message_body body; +}; + +struct pw_client_node0_message_port_reuse_buffer_body { + struct spa_pod_int type SPA_ALIGNED(8); /*< PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER */ + struct spa_pod_int port_id SPA_ALIGNED(8); /*< port id */ + struct spa_pod_int buffer_id SPA_ALIGNED(8); /*< buffer id to reuse */ +}; + +struct pw_client_node0_message_port_reuse_buffer { + struct spa_pod_struct pod; + struct pw_client_node0_message_port_reuse_buffer_body body; +}; + +#define PW_CLIENT_NODE0_MESSAGE_TYPE(message) (((struct pw_client_node0_message*)(message))->body.type.value) + +#define PW_CLIENT_NODE0_MESSAGE_INIT(message) ((struct pw_client_node0_message) \ + { { { sizeof(struct pw_client_node0_message_body), SPA_TYPE_Struct } }, \ + { SPA_POD_INIT_Int(message) } }) + +#define PW_CLIENT_NODE0_MESSAGE_INIT_FULL(type,size,message,...) (type) \ + { { { size, SPA_TYPE_Struct } }, \ + { SPA_POD_INIT_Int(message), ##__VA_ARGS__ } } \ + +#define PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id,buffer_id) \ + PW_CLIENT_NODE0_MESSAGE_INIT_FULL(struct pw_client_node0_message_port_reuse_buffer, \ + sizeof(struct pw_client_node0_message_port_reuse_buffer_body), \ + PW_CLIENT_NODE0_MESSAGE_PORT_REUSE_BUFFER, \ + SPA_POD_INIT_Int(port_id), \ + SPA_POD_INIT_Int(buffer_id)) + +/** information about a buffer */ +struct pw_client_node0_buffer { + uint32_t mem_id; /**< the memory id for the metadata */ + uint32_t offset; /**< offset in memory */ + uint32_t size; /**< size in memory */ + struct spa_buffer *buffer; /**< buffer describing metadata and buffer memory */ +}; + +#define PW_CLIENT_NODE0_METHOD_DONE 0 +#define PW_CLIENT_NODE0_METHOD_UPDATE 1 +#define PW_CLIENT_NODE0_METHOD_PORT_UPDATE 2 +#define PW_CLIENT_NODE0_METHOD_SET_ACTIVE 3 +#define PW_CLIENT_NODE0_METHOD_EVENT 4 +#define PW_CLIENT_NODE0_METHOD_DESTROY 5 +#define PW_CLIENT_NODE0_METHOD_NUM 6 + +/** \ref pw_client_node methods */ +struct pw_client_node0_methods { +#define PW_VERSION_CLIENT_NODE0_METHODS 0 + uint32_t version; + + /** Complete an async operation */ + void (*done) (void *object, int seq, int res); + + /** + * Update the node ports and properties + * + * Update the maximum number of ports and the params of the + * client node. + * \param change_mask bitfield with changed parameters + * \param max_input_ports new max input ports + * \param max_output_ports new max output ports + * \param params new params + */ + void (*update) (void *object, +#define PW_CLIENT_NODE0_UPDATE_MAX_INPUTS (1 << 0) +#define PW_CLIENT_NODE0_UPDATE_MAX_OUTPUTS (1 << 1) +#define PW_CLIENT_NODE0_UPDATE_PARAMS (1 << 2) + uint32_t change_mask, + uint32_t max_input_ports, + uint32_t max_output_ports, + uint32_t n_params, + const struct spa_pod **params); + + /** + * Update a node port + * + * Update the information of one port of a node. + * \param direction the direction of the port + * \param port_id the port id to update + * \param change_mask a bitfield of changed items + * \param n_params number of port parameters + * \param params array of port parameters + * \param info port information + */ + void (*port_update) (void *object, + enum spa_direction direction, + uint32_t port_id, +#define PW_CLIENT_NODE0_PORT_UPDATE_PARAMS (1 << 0) +#define PW_CLIENT_NODE0_PORT_UPDATE_INFO (1 << 1) + uint32_t change_mask, + uint32_t n_params, + const struct spa_pod **params, + const struct spa_port_info *info); + /** + * Activate or deactivate the node + */ + void (*set_active) (void *object, bool active); + /** + * Send an event to the node + * \param event the event to send + */ + void (*event) (void *object, struct spa_event *event); + /** + * Destroy the client_node + */ + void (*destroy) (void *object); +}; + +#define PW_CLIENT_NODE0_EVENT_ADD_MEM 0 +#define PW_CLIENT_NODE0_EVENT_TRANSPORT 1 +#define PW_CLIENT_NODE0_EVENT_SET_PARAM 2 +#define PW_CLIENT_NODE0_EVENT_EVENT 3 +#define PW_CLIENT_NODE0_EVENT_COMMAND 4 +#define PW_CLIENT_NODE0_EVENT_ADD_PORT 5 +#define PW_CLIENT_NODE0_EVENT_REMOVE_PORT 6 +#define PW_CLIENT_NODE0_EVENT_PORT_SET_PARAM 7 +#define PW_CLIENT_NODE0_EVENT_PORT_USE_BUFFERS 8 +#define PW_CLIENT_NODE0_EVENT_PORT_COMMAND 9 +#define PW_CLIENT_NODE0_EVENT_PORT_SET_IO 10 +#define PW_CLIENT_NODE0_EVENT_NUM 11 + +/** \ref pw_client_node events */ +struct pw_client_node0_events { +#define PW_VERSION_CLIENT_NODE0_EVENTS 0 + uint32_t version; + /** + * Memory was added to a node + * + * \param mem_id the id of the memory + * \param type the memory type + * \param memfd the fd of the memory + * \param flags flags for the \a memfd + */ + void (*add_mem) (void *data, + uint32_t mem_id, + uint32_t type, + int memfd, + uint32_t flags); + /** + * Notify of a new transport area + * + * The transport area is used to exchange real-time commands between + * the client and the server. + * + * \param node_id the node id created for this client node + * \param readfd fd for signal data can be read + * \param writefd fd for signal data can be written + * \param transport the shared transport area + */ + void (*transport) (void *data, + uint32_t node_id, + int readfd, + int writefd, + struct pw_client_node0_transport *transport); + /** + * Notify of a property change + * + * When the server configures the properties on the node + * this event is sent + * + * \param seq a sequence number + * \param id the id of the parameter + * \param flags parameter flags + * \param param the param to set + */ + void (*set_param) (void *data, uint32_t seq, + uint32_t id, uint32_t flags, + const struct spa_pod *param); + /** + * Receive an event from the client node + * \param event the received event */ + void (*event) (void *data, const struct spa_event *event); + /** + * Notify of a new node command + * + * \param seq a sequence number + * \param command the command + */ + void (*command) (void *data, uint32_t seq, const struct spa_command *command); + /** + * A new port was added to the node + * + * The server can at any time add a port to the node when there + * are free ports available. + * + * \param seq a sequence number + * \param direction the direction of the port + * \param port_id the new port id + */ + void (*add_port) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id); + /** + * A port was removed from the node + * + * \param seq a sequence number + * \param direction a port direction + * \param port_id the remove port id + */ + void (*remove_port) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id); + /** + * A parameter was configured on the port + * + * \param seq a sequence number + * \param direction a port direction + * \param port_id the port id + * \param id the id of the parameter + * \param flags flags used when setting the param + * \param param the new param + */ + void (*port_set_param) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param); + /** + * Notify the port of buffers + * + * \param seq a sequence number + * \param direction a port direction + * \param port_id the port id + * \param n_buffer the number of buffers + * \param buffers and array of buffer descriptions + */ + void (*port_use_buffers) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t n_buffers, + struct pw_client_node0_buffer *buffers); + /** + * Notify of a new port command + * + * \param direction a port direction + * \param port_id the port id + * \param command the command + */ + void (*port_command) (void *data, + enum spa_direction direction, + uint32_t port_id, + const struct spa_command *command); + + /** + * Configure the io area with \a id of \a port_id. + * + * \param seq a sequence number + * \param direction the direction of the port + * \param port_id the port id + * \param id the id of the io area to set + * \param mem_id the id of the memory to use + * \param offset offset of io area in memory + * \param size size of the io area + */ + void (*port_set_io) (void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + uint32_t mem_id, + uint32_t offset, + uint32_t size); +}; +#define pw_client_node0_resource(r,m,v,...) pw_resource_call(r, struct pw_client_node0_events, m, v, ##__VA_ARGS__) + +#define pw_client_node0_resource_add_mem(r,...) pw_client_node0_resource(r,add_mem,0,__VA_ARGS__) +#define pw_client_node0_resource_transport(r,...) pw_client_node0_resource(r,transport,0,__VA_ARGS__) +#define pw_client_node0_resource_set_param(r,...) pw_client_node0_resource(r,set_param,0,__VA_ARGS__) +#define pw_client_node0_resource_event(r,...) pw_client_node0_resource(r,event,0,__VA_ARGS__) +#define pw_client_node0_resource_command(r,...) pw_client_node0_resource(r,command,0,__VA_ARGS__) +#define pw_client_node0_resource_add_port(r,...) pw_client_node0_resource(r,add_port,0,__VA_ARGS__) +#define pw_client_node0_resource_remove_port(r,...) pw_client_node0_resource(r,remove_port,0,__VA_ARGS__) +#define pw_client_node0_resource_port_set_param(r,...) pw_client_node0_resource(r,port_set_param,0,__VA_ARGS__) +#define pw_client_node0_resource_port_use_buffers(r,...) pw_client_node0_resource(r,port_use_buffers,0,__VA_ARGS__) +#define pw_client_node0_resource_port_command(r,...) pw_client_node0_resource(r,port_command,0,__VA_ARGS__) +#define pw_client_node0_resource_port_set_io(r,...) pw_client_node0_resource(r,port_set_io,0,__VA_ARGS__) + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __PIPEWIRE_EXT_CLIENT_NODE0_H__ */ diff --git a/src/modules/module-client-node/v0/protocol-native.c b/src/modules/module-client-node/v0/protocol-native.c new file mode 100644 index 0000000..86d8fe9 --- /dev/null +++ b/src/modules/module-client-node/v0/protocol-native.c @@ -0,0 +1,534 @@ +/* PipeWire + * + * Copyright © 2017 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <errno.h> + +#include <spa/pod/parser.h> +#include <spa/pod/builder.h> +#include <spa/utils/type-info.h> + +#include "pipewire/impl.h" + +#include "pipewire/extensions/protocol-native.h" + +#include "ext-client-node.h" + +#include "transport.h" + +#define PW_PROTOCOL_NATIVE_FLAG_REMAP (1<<0) + +extern uint32_t pw_protocol_native0_find_type(struct pw_impl_client *client, const char *type); +extern int pw_protocol_native0_pod_to_v2(struct pw_impl_client *client, const struct spa_pod *pod, + struct spa_pod_builder *b); +extern struct spa_pod * pw_protocol_native0_pod_from_v2(struct pw_impl_client *client, + const struct spa_pod *pod); +extern uint32_t pw_protocol_native0_type_to_v2(struct pw_impl_client *client, + const struct spa_type_info *info, uint32_t type); + +static void +client_node_marshal_add_mem(void *data, + uint32_t mem_id, + uint32_t type, + int memfd, uint32_t flags) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + const char *typename; + + switch (type) { + case SPA_DATA_MemFd: + typename = "Spa:Enum:DataType:Fd:MemFd"; + break; + case SPA_DATA_DmaBuf: + typename = "Spa:Enum:DataType:Fd:DmaBuf"; + break; + default: + case SPA_DATA_MemPtr: + return; + + } + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_ADD_MEM, NULL); + + spa_pod_builder_add_struct(b, + "i", mem_id, + "I", pw_protocol_native0_find_type(client, typename), + "i", pw_protocol_native_add_resource_fd(resource, memfd), + "i", flags); + + pw_protocol_native_end_resource(resource, b); +} + +static void client_node_marshal_transport(void *data, uint32_t node_id, int readfd, int writefd, + struct pw_client_node0_transport *transport) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + struct pw_client_node0_transport_info info; + + pw_client_node0_transport_get_info(transport, &info); + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_TRANSPORT, NULL); + + spa_pod_builder_add_struct(b, + "i", node_id, + "i", pw_protocol_native_add_resource_fd(resource, readfd), + "i", pw_protocol_native_add_resource_fd(resource, writefd), + "i", pw_protocol_native_add_resource_fd(resource, info.memfd), + "i", info.offset, + "i", info.size); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_set_param(void *data, uint32_t seq, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_SET_PARAM, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "I", id, + "i", flags, + "P", param); + + pw_protocol_native_end_resource(resource, b); +} + +static void client_node_marshal_event_event(void *data, const struct spa_event *event) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_EVENT, NULL); + + spa_pod_builder_add_struct(b, "P", event); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_command(void *data, uint32_t seq, const struct spa_command *command) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_COMMAND, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, "i", seq, NULL); + if (SPA_COMMAND_TYPE(command) == 0) + spa_pod_builder_add(b, "P", command, NULL); + else + pw_protocol_native0_pod_to_v2(client, (struct spa_pod *)command, b); + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_add_port(void *data, + uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_ADD_PORT, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "i", direction, + "i", port_id); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_remove_port(void *data, + uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_REMOVE_PORT, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "i", direction, + "i", port_id); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_set_param(void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + uint32_t flags, + const struct spa_pod *param) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + const char *typename; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_SET_PARAM, NULL); + + switch (id) { + case SPA_PARAM_Props: + typename = "Spa:Enum:ParamId:Props"; + break; + case SPA_PARAM_Format: + typename = "Spa:Enum:ParamId:Format"; + break; + default: + return; + } + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + "i", seq, + "i", direction, + "i", port_id, + "I", pw_protocol_native0_find_type(client, typename), + "i", flags, NULL); + pw_protocol_native0_pod_to_v2(client, param, b); + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_use_buffers(void *data, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + uint32_t n_buffers, struct pw_client_node0_buffer *buffers) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + uint32_t i, j; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_USE_BUFFERS, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + "i", seq, + "i", direction, + "i", port_id, + "i", n_buffers, NULL); + + for (i = 0; i < n_buffers; i++) { + struct spa_buffer *buf = buffers[i].buffer; + + spa_pod_builder_add(b, + "i", buffers[i].mem_id, + "i", buffers[i].offset, + "i", buffers[i].size, + "i", i, + "i", buf->n_metas, NULL); + + for (j = 0; j < buf->n_metas; j++) { + struct spa_meta *m = &buf->metas[j]; + spa_pod_builder_add(b, + "I", pw_protocol_native0_type_to_v2(client, spa_type_meta_type, m->type), + "i", m->size, NULL); + } + spa_pod_builder_add(b, "i", buf->n_datas, NULL); + for (j = 0; j < buf->n_datas; j++) { + struct spa_data *d = &buf->datas[j]; + spa_pod_builder_add(b, + "I", pw_protocol_native0_type_to_v2(client, spa_type_data_type, d->type), + "i", SPA_PTR_TO_UINT32(d->data), + "i", d->flags, + "i", d->mapoffset, + "i", d->maxsize, NULL); + } + } + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_command(void *data, + uint32_t direction, + uint32_t port_id, + const struct spa_command *command) +{ + struct pw_resource *resource = data; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_builder *b; + struct spa_pod_frame f; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_COMMAND, NULL); + + spa_pod_builder_push_struct(b, &f); + spa_pod_builder_add(b, + "i", direction, + "i", port_id, NULL); + pw_protocol_native0_pod_to_v2(client, (struct spa_pod *)command, b); + spa_pod_builder_pop(b, &f); + + pw_protocol_native_end_resource(resource, b); +} + +static void +client_node_marshal_port_set_io(void *data, + uint32_t seq, + uint32_t direction, + uint32_t port_id, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_resource *resource = data; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE0_EVENT_PORT_SET_IO, NULL); + + spa_pod_builder_add_struct(b, + "i", seq, + "i", direction, + "i", port_id, + "I", id, + "i", memid, + "i", offset, + "i", size); + + pw_protocol_native_end_resource(resource, b); +} + + +static int client_node_demarshal_done(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + uint32_t seq, res; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + "i", &seq, + "i", &res) < 0) + return -EINVAL; + + return pw_resource_notify(resource, struct pw_client_node0_methods, done, 0, seq, res); +} + +static int client_node_demarshal_update(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + struct spa_pod_frame f; + uint32_t change_mask, max_input_ports, max_output_ports, n_params; + const struct spa_pod **params; + uint32_t i; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f) < 0 || + spa_pod_parser_get(&prs, + "i", &change_mask, + "i", &max_input_ports, + "i", &max_output_ports, + "i", &n_params, NULL) < 0) + return -EINVAL; + + params = alloca(n_params * sizeof(struct spa_pod *)); + for (i = 0; i < n_params; i++) + if (spa_pod_parser_get(&prs, "O", ¶ms[i], NULL) < 0) + return -EINVAL; + + return pw_resource_notify(resource, struct pw_client_node0_methods, update, 0, change_mask, + max_input_ports, + max_output_ports, + n_params, + params); +} + +static int client_node_demarshal_port_update(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + struct spa_pod_frame f[2]; + uint32_t i, direction, port_id, change_mask, n_params; + const struct spa_pod **params = NULL; + struct spa_port_info info = { 0 }, *infop = NULL; + struct spa_dict props; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_push_struct(&prs, &f[0]) < 0 || + spa_pod_parser_get(&prs, + "i", &direction, + "i", &port_id, + "i", &change_mask, + "i", &n_params, NULL) < 0) + return -EINVAL; + + params = alloca(n_params * sizeof(struct spa_pod *)); + for (i = 0; i < n_params; i++) + if (spa_pod_parser_get(&prs, "O", ¶ms[i], NULL) < 0) + return -EINVAL; + + + if (spa_pod_parser_push_struct(&prs, &f[1]) >= 0) { + infop = &info; + + if (spa_pod_parser_get(&prs, + "i", &info.flags, + "i", &info.rate, + "i", &props.n_items, NULL) < 0) + return -EINVAL; + + if (props.n_items > 0) { + info.props = &props; + + props.items = alloca(props.n_items * sizeof(struct spa_dict_item)); + for (i = 0; i < props.n_items; i++) { + if (spa_pod_parser_get(&prs, + "s", &props.items[i].key, + "s", &props.items[i].value, + NULL) < 0) + return -EINVAL; + } + } + } + + return pw_resource_notify(resource, struct pw_client_node0_methods, port_update, 0, direction, + port_id, + change_mask, + n_params, + params, infop); +} + +static int client_node_demarshal_set_active(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + int active; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + "b", &active) < 0) + return -EINVAL; + + return pw_resource_notify(resource, struct pw_client_node0_methods, set_active, 0, active); +} + +static int client_node_demarshal_event_method(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct pw_impl_client *client = pw_resource_get_client(resource); + struct spa_pod_parser prs; + struct spa_event *event; + int res; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, + "O", &event) < 0) + return -EINVAL; + + event = (struct spa_event*)pw_protocol_native0_pod_from_v2(client, (struct spa_pod *)event); + + res = pw_resource_notify(resource, struct pw_client_node0_methods, event, 0, event); + free(event); + + return res; +} + +static int client_node_demarshal_destroy(void *object, const struct pw_protocol_native_message *msg) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + int res; + + spa_pod_parser_init(&prs, msg->data, msg->size); + if (spa_pod_parser_get_struct(&prs, NULL) < 0) + return -EINVAL; + + res = pw_resource_notify(resource, struct pw_client_node0_methods, destroy, 0); + pw_resource_destroy(resource); + return res; +} + +static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_method_demarshal[] = { + { &client_node_demarshal_done, 0, 0 }, + { &client_node_demarshal_update, 0, PW_PROTOCOL_NATIVE_FLAG_REMAP }, + { &client_node_demarshal_port_update, 0, PW_PROTOCOL_NATIVE_FLAG_REMAP }, + { &client_node_demarshal_set_active, 0, 0 }, + { &client_node_demarshal_event_method, 0, PW_PROTOCOL_NATIVE_FLAG_REMAP }, + { &client_node_demarshal_destroy, 0, 0 }, +}; + +static const struct pw_client_node0_events pw_protocol_native_client_node_event_marshal = { + PW_VERSION_CLIENT_NODE0_EVENTS, + &client_node_marshal_add_mem, + &client_node_marshal_transport, + &client_node_marshal_set_param, + &client_node_marshal_event_event, + &client_node_marshal_command, + &client_node_marshal_add_port, + &client_node_marshal_remove_port, + &client_node_marshal_port_set_param, + &client_node_marshal_port_use_buffers, + &client_node_marshal_port_command, + &client_node_marshal_port_set_io, +}; + +static const struct pw_protocol_marshal pw_protocol_native_client_node_marshal = { + PW_TYPE_INTERFACE_ClientNode, + PW_VERSION_CLIENT_NODE0, + PW_CLIENT_NODE0_METHOD_NUM, + PW_CLIENT_NODE0_EVENT_NUM, + 0, + NULL, + .server_demarshal = &pw_protocol_native_client_node_method_demarshal, + .server_marshal = &pw_protocol_native_client_node_event_marshal, + NULL, +}; + +struct pw_protocol *pw_protocol_native_ext_client_node0_init(struct pw_context *context) +{ + struct pw_protocol *protocol; + + protocol = pw_context_find_protocol(context, PW_TYPE_INFO_PROTOCOL_Native); + + if (protocol == NULL) + return NULL; + + pw_protocol_add_marshal(protocol, &pw_protocol_native_client_node_marshal); + + return protocol; +} diff --git a/src/modules/module-client-node/v0/transport.c b/src/modules/module-client-node/v0/transport.c new file mode 100644 index 0000000..fa84795 --- /dev/null +++ b/src/modules/module-client-node/v0/transport.c @@ -0,0 +1,262 @@ +/* PipeWire + * + * Copyright © 2016 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <unistd.h> +#include <errno.h> +#include <sys/mman.h> + +#include <spa/utils/ringbuffer.h> +#include <spa/node/io.h> + +#include <pipewire/impl.h> +#include <pipewire/private.h> + +#include "ext-client-node.h" + +#include "transport.h" + +/** \cond */ + +#define INPUT_BUFFER_SIZE (1<<12) +#define OUTPUT_BUFFER_SIZE (1<<12) + +struct transport { + struct pw_client_node0_transport trans; + + struct pw_memblock *mem; + size_t offset; + + struct pw_client_node0_message current; + uint32_t current_index; +}; +/** \endcond */ + +static size_t area_get_size(struct pw_client_node0_area *area) +{ + size_t size; + size = sizeof(struct pw_client_node0_area); + size += area->max_input_ports * sizeof(struct spa_io_buffers); + size += area->max_output_ports * sizeof(struct spa_io_buffers); + size += sizeof(struct spa_ringbuffer); + size += INPUT_BUFFER_SIZE; + size += sizeof(struct spa_ringbuffer); + size += OUTPUT_BUFFER_SIZE; + return size; +} + +static void transport_setup_area(void *p, struct pw_client_node0_transport *trans) +{ + struct pw_client_node0_area *a; + + trans->area = a = p; + p = SPA_PTROFF(p, sizeof(struct pw_client_node0_area), struct spa_io_buffers); + + trans->inputs = p; + p = SPA_PTROFF(p, a->max_input_ports * sizeof(struct spa_io_buffers), void); + + trans->outputs = p; + p = SPA_PTROFF(p, a->max_output_ports * sizeof(struct spa_io_buffers), void); + + trans->input_buffer = p; + p = SPA_PTROFF(p, sizeof(struct spa_ringbuffer), void); + + trans->input_data = p; + p = SPA_PTROFF(p, INPUT_BUFFER_SIZE, void); + + trans->output_buffer = p; + p = SPA_PTROFF(p, sizeof(struct spa_ringbuffer), void); + + trans->output_data = p; + p = SPA_PTROFF(p, OUTPUT_BUFFER_SIZE, void); +} + +static void transport_reset_area(struct pw_client_node0_transport *trans) +{ + uint32_t i; + struct pw_client_node0_area *a = trans->area; + + for (i = 0; i < a->max_input_ports; i++) { + trans->inputs[i] = SPA_IO_BUFFERS_INIT; + } + for (i = 0; i < a->max_output_ports; i++) { + trans->outputs[i] = SPA_IO_BUFFERS_INIT; + } + spa_ringbuffer_init(trans->input_buffer); + spa_ringbuffer_init(trans->output_buffer); +} + +static void destroy(struct pw_client_node0_transport *trans) +{ + struct transport *impl = (struct transport *) trans; + + pw_log_debug("transport %p: destroy", trans); + + pw_memblock_free(impl->mem); + free(impl); +} + +static int add_message(struct pw_client_node0_transport *trans, struct pw_client_node0_message *message) +{ + struct transport *impl = (struct transport *) trans; + int32_t filled, avail; + uint32_t size, index; + + if (impl == NULL || message == NULL) + return -EINVAL; + + filled = spa_ringbuffer_get_write_index(trans->output_buffer, &index); + avail = OUTPUT_BUFFER_SIZE - filled; + size = SPA_POD_SIZE(message); + if (avail < (int)size) + return -ENOSPC; + + spa_ringbuffer_write_data(trans->output_buffer, + trans->output_data, OUTPUT_BUFFER_SIZE, + index & (OUTPUT_BUFFER_SIZE - 1), message, size); + spa_ringbuffer_write_update(trans->output_buffer, index + size); + + return 0; +} + +static int next_message(struct pw_client_node0_transport *trans, struct pw_client_node0_message *message) +{ + struct transport *impl = (struct transport *) trans; + int32_t avail; + + if (impl == NULL || message == NULL) + return -EINVAL; + + avail = spa_ringbuffer_get_read_index(trans->input_buffer, &impl->current_index); + if (avail < (int) sizeof(struct pw_client_node0_message)) + return 0; + + spa_ringbuffer_read_data(trans->input_buffer, + trans->input_data, INPUT_BUFFER_SIZE, + impl->current_index & (INPUT_BUFFER_SIZE - 1), + &impl->current, sizeof(struct pw_client_node0_message)); + + if (avail < (int) SPA_POD_SIZE(&impl->current)) + return 0; + + *message = impl->current; + + return 1; +} + +static int parse_message(struct pw_client_node0_transport *trans, void *message) +{ + struct transport *impl = (struct transport *) trans; + uint32_t size; + + if (impl == NULL || message == NULL) + return -EINVAL; + + size = SPA_POD_SIZE(&impl->current); + + spa_ringbuffer_read_data(trans->input_buffer, + trans->input_data, INPUT_BUFFER_SIZE, + impl->current_index & (INPUT_BUFFER_SIZE - 1), message, size); + spa_ringbuffer_read_update(trans->input_buffer, impl->current_index + size); + + return 0; +} + +/** Create a new transport + * \param max_input_ports maximum number of input_ports + * \param max_output_ports maximum number of output_ports + * \return a newly allocated \ref pw_client_node0_transport + * \memberof pw_client_node0_transport + */ +struct pw_client_node0_transport * +pw_client_node0_transport_new(struct pw_context *context, + uint32_t max_input_ports, uint32_t max_output_ports) +{ + struct transport *impl; + struct pw_client_node0_transport *trans; + struct pw_client_node0_area area = { 0 }; + + area.max_input_ports = max_input_ports; + area.n_input_ports = 0; + area.max_output_ports = max_output_ports; + area.n_output_ports = 0; + + impl = calloc(1, sizeof(struct transport)); + if (impl == NULL) + return NULL; + + pw_log_debug("transport %p: new %d %d", impl, max_input_ports, max_output_ports); + + trans = &impl->trans; + impl->offset = 0; + + impl->mem = pw_mempool_alloc(context->pool, + PW_MEMBLOCK_FLAG_READWRITE | + PW_MEMBLOCK_FLAG_MAP | + PW_MEMBLOCK_FLAG_SEAL, + SPA_DATA_MemFd, area_get_size(&area)); + if (impl->mem == NULL) { + free(impl); + return NULL; + } + + memcpy(impl->mem->map->ptr, &area, sizeof(struct pw_client_node0_area)); + transport_setup_area(impl->mem->map->ptr, trans); + transport_reset_area(trans); + + trans->destroy = destroy; + trans->add_message = add_message; + trans->next_message = next_message; + trans->parse_message = parse_message; + + return trans; +} + +struct pw_client_node0_transport * +pw_client_node0_transport_new_from_info(struct pw_client_node0_transport_info *info) +{ + errno = ENOTSUP; + return NULL; +} + +/** Get transport info + * \param trans the transport to get info of + * \param[out] info transport info + * \return 0 on success + * + * Fill \a info with the transport info of \a trans. This information can be + * passed to the client to set up the shared transport. + * + * \memberof pw_client_node0_transport + */ +int pw_client_node0_transport_get_info(struct pw_client_node0_transport *trans, + struct pw_client_node0_transport_info *info) +{ + struct transport *impl = (struct transport *) trans; + + info->memfd = impl->mem->fd; + info->offset = impl->offset; + info->size = impl->mem->size; + + return 0; +} diff --git a/src/modules/module-client-node/v0/transport.h b/src/modules/module-client-node/v0/transport.h new file mode 100644 index 0000000..3abcd6d --- /dev/null +++ b/src/modules/module-client-node/v0/transport.h @@ -0,0 +1,59 @@ +/* PipeWire + * + * Copyright © 2016 Wim Taymans <wim.taymans@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __PIPEWIRE_CLIENT_NODE0_TRANSPORT_H__ +#define __PIPEWIRE_CLIENT_NODE0_TRANSPORT_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <string.h> + +#include <spa/utils/defs.h> + +#include <pipewire/mem.h> + +/** information about the transport region \memberof pw_client_node */ +struct pw_client_node0_transport_info { + int memfd; /**< the memfd of the transport area */ + uint32_t offset; /**< offset to map \a memfd at */ + uint32_t size; /**< size of memfd mapping */ +}; + +struct pw_client_node0_transport * +pw_client_node0_transport_new(struct pw_context *context, uint32_t max_input_ports, uint32_t max_output_ports); + +struct pw_client_node0_transport * +pw_client_node0_transport_new_from_info(struct pw_client_node0_transport_info *info); + +int +pw_client_node0_transport_get_info(struct pw_client_node0_transport *trans, + struct pw_client_node0_transport_info *info); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __PIPEWIRE_CLIENT_NODE0_TRANSPORT_H__ */ |