summaryrefslogtreecommitdiffstats
path: root/spa/plugins/avb
diff options
context:
space:
mode:
Diffstat (limited to 'spa/plugins/avb')
-rw-r--r--spa/plugins/avb/avb-pcm-sink.c910
-rw-r--r--spa/plugins/avb/avb-pcm-source.c910
-rw-r--r--spa/plugins/avb/avb-pcm.c1227
-rw-r--r--spa/plugins/avb/avb-pcm.h343
-rw-r--r--spa/plugins/avb/avb.c54
-rw-r--r--spa/plugins/avb/avb.h39
-rw-r--r--spa/plugins/avb/avbtp/packets.h220
-rw-r--r--spa/plugins/avb/meson.build14
8 files changed, 3717 insertions, 0 deletions
diff --git a/spa/plugins/avb/avb-pcm-sink.c b/spa/plugins/avb/avb-pcm-sink.c
new file mode 100644
index 0000000..f453494
--- /dev/null
+++ b/spa/plugins/avb/avb-pcm-sink.c
@@ -0,0 +1,910 @@
+/* Spa AVB PCM Sink
+ *
+ * Copyright © 2022 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include <stddef.h>
+
+#include <spa/node/node.h>
+#include <spa/node/utils.h>
+#include <spa/node/keys.h>
+#include <spa/monitor/device.h>
+#include <spa/utils/keys.h>
+#include <spa/utils/names.h>
+#include <spa/utils/string.h>
+#include <spa/param/audio/format.h>
+#include <spa/pod/filter.h>
+
+#include "avb-pcm.h"
+
+#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0)
+#define GET_PORT(this,d,p) (&this->ports[p])
+
+static void reset_props(struct props *props)
+{
+ snprintf(props->ifname, sizeof(props->ifname), "%s", DEFAULT_IFNAME);
+ parse_addr(props->addr, DEFAULT_ADDR);
+ props->prio = DEFAULT_PRIO;
+ parse_streamid(&props->streamid, DEFAULT_STREAMID);
+ props->mtt = DEFAULT_MTT;
+ props->t_uncertainty = DEFAULT_TU;
+ props->frames_per_pdu = DEFAULT_FRAMES_PER_PDU;
+}
+
+static void emit_node_info(struct state *this, bool full)
+{
+ uint64_t old = full ? this->info.change_mask : 0;
+
+ if (full)
+ this->info.change_mask = this->info_all;
+ if (this->info.change_mask) {
+ struct spa_dict_item items[4];
+ uint32_t i, n_items = 0;
+
+ items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_DEVICE_API, "avb");
+ items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_MEDIA_CLASS, "Audio/Sink");
+ items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_NODE_DRIVER, "true");
+ this->info.props = &SPA_DICT_INIT(items, n_items);
+
+ if (this->info.change_mask & SPA_NODE_CHANGE_MASK_PARAMS) {
+ for (i = 0; i < this->info.n_params; i++) {
+ if (this->params[i].user > 0) {
+ this->params[i].flags ^= SPA_PARAM_INFO_SERIAL;
+ this->params[i].user = 0;
+ }
+ }
+ }
+ spa_node_emit_info(&this->hooks, &this->info);
+
+ this->info.change_mask = old;
+ }
+}
+
+static void emit_port_info(struct state *this, struct port *port, bool full)
+{
+ uint64_t old = full ? port->info.change_mask : 0;
+
+ if (full)
+ port->info.change_mask = port->info_all;
+ if (port->info.change_mask) {
+ uint32_t i;
+
+ if (port->info.change_mask & SPA_PORT_CHANGE_MASK_PARAMS) {
+ for (i = 0; i < port->info.n_params; i++) {
+ if (port->params[i].user > 0) {
+ port->params[i].flags ^= SPA_PARAM_INFO_SERIAL;
+ port->params[i].user = 0;
+ }
+ }
+ }
+ spa_node_emit_port_info(&this->hooks,
+ port->direction, port->id, &port->info);
+ port->info.change_mask = old;
+ }
+}
+
+static int impl_node_enum_params(void *object, int seq,
+ uint32_t id, uint32_t start, uint32_t num,
+ const struct spa_pod *filter)
+{
+ struct state *this = object;
+ struct spa_pod *param;
+ struct spa_pod_builder b = { 0 };
+ uint8_t buffer[4096];
+ struct spa_result_node_params result;
+ uint32_t count = 0;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+
+ result.id = id;
+ result.next = start;
+ next:
+ result.index = result.next++;
+
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
+
+ switch (id) {
+ case SPA_PARAM_PropInfo:
+ {
+ switch (result.index) {
+ default:
+ param = spa_avb_enum_propinfo(this, result.index, &b);
+ if (param == NULL)
+ return 0;
+ }
+ break;
+ }
+ case SPA_PARAM_Props:
+ {
+ struct spa_pod_frame f;
+
+ switch (result.index) {
+ case 0:
+ spa_pod_builder_push_object(&b, &f,
+ SPA_TYPE_OBJECT_Props, id);
+ spa_pod_builder_add(&b,
+ SPA_PROP_latencyOffsetNsec, SPA_POD_Long(this->process_latency.ns),
+ 0);
+ spa_avb_add_prop_params(this, &b);
+ param = spa_pod_builder_pop(&b, &f);
+ break;
+ default:
+ return 0;
+ }
+ break;
+ }
+ case SPA_PARAM_IO:
+ switch (result.index) {
+ case 0:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Clock),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_clock)));
+ break;
+ case 1:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Position),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_position)));
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ case SPA_PARAM_ProcessLatency:
+ switch (result.index) {
+ case 0:
+ param = spa_process_latency_build(&b, id, &this->process_latency);
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ default:
+ return -ENOENT;
+ }
+
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ goto next;
+
+ spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
+
+ if (++count != num)
+ goto next;
+
+ return 0;
+}
+
+static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
+{
+ struct state *this = object;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ switch (id) {
+ case SPA_IO_Clock:
+ this->clock = data;
+ break;
+ case SPA_IO_Position:
+ this->position = data;
+ break;
+ default:
+ return -ENOENT;
+ }
+ spa_avb_reassign_follower(this);
+
+ return 0;
+}
+
+static void handle_process_latency(struct state *this,
+ const struct spa_process_latency_info *info)
+{
+ bool ns_changed = this->process_latency.ns != info->ns;
+ struct port *port = &this->ports[0];
+
+ if (this->process_latency.quantum == info->quantum &&
+ this->process_latency.rate == info->rate &&
+ !ns_changed)
+ return;
+
+ this->process_latency = *info;
+
+ this->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS;
+ if (ns_changed)
+ this->params[NODE_Props].user++;
+ this->params[NODE_ProcessLatency].user++;
+
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
+ port->params[PORT_Latency].user++;
+}
+
+static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
+ const struct spa_pod *param)
+{
+ struct state *this = object;
+ int res;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ switch (id) {
+ case SPA_PARAM_Props:
+ {
+ struct props *p = &this->props;
+ struct spa_pod *params = NULL;
+ int64_t lat_ns = -1;
+
+ if (param == NULL) {
+ reset_props(p);
+ return 0;
+ }
+
+ spa_pod_parse_object(param,
+ SPA_TYPE_OBJECT_Props, NULL,
+ SPA_PROP_latencyOffsetNsec, SPA_POD_OPT_Long(&lat_ns),
+ SPA_PROP_params, SPA_POD_OPT_Pod(&params));
+
+ spa_avb_parse_prop_params(this, params);
+ if (lat_ns != -1) {
+ struct spa_process_latency_info info;
+ info = this->process_latency;
+ info.ns = lat_ns;
+ handle_process_latency(this, &info);
+ }
+ emit_node_info(this, false);
+ emit_port_info(this, &this->ports[0], false);
+ break;
+ }
+ case SPA_PARAM_ProcessLatency:
+ {
+ struct spa_process_latency_info info;
+ if ((res = spa_process_latency_parse(param, &info)) < 0)
+ return res;
+
+ handle_process_latency(this, &info);
+
+ emit_node_info(this, false);
+ emit_port_info(this, &this->ports[0], false);
+ break;
+ }
+ default:
+ return -ENOENT;
+ }
+ return 0;
+}
+
+static int impl_node_send_command(void *object, const struct spa_command *command)
+{
+ struct state *this = object;
+ int res;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+ spa_return_val_if_fail(command != NULL, -EINVAL);
+
+ switch (SPA_NODE_COMMAND_ID(command)) {
+ case SPA_NODE_COMMAND_ParamBegin:
+ break;
+ case SPA_NODE_COMMAND_ParamEnd:
+ break;
+ case SPA_NODE_COMMAND_Start:
+ if (!this->ports[0].have_format)
+ return -EIO;
+ if (this->ports[0].n_buffers == 0)
+ return -EIO;
+ if ((res = spa_avb_start(this)) < 0)
+ return res;
+ break;
+ case SPA_NODE_COMMAND_Suspend:
+ case SPA_NODE_COMMAND_Pause:
+ if ((res = spa_avb_pause(this)) < 0)
+ return res;
+ break;
+ default:
+ return -ENOTSUP;
+ }
+ return 0;
+}
+
+
+static int
+impl_node_add_listener(void *object,
+ struct spa_hook *listener,
+ const struct spa_node_events *events,
+ void *data)
+{
+ struct state *this = object;
+ struct spa_hook_list save;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_hook_list_isolate(&this->hooks, &save, listener, events, data);
+
+ emit_node_info(this, true);
+ emit_port_info(this, &this->ports[0], true);
+
+ spa_hook_list_join(&this->hooks, &save);
+
+ return 0;
+}
+
+static int
+impl_node_set_callbacks(void *object,
+ const struct spa_node_callbacks *callbacks,
+ void *data)
+{
+ struct state *this = object;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ this->callbacks = SPA_CALLBACKS_INIT(callbacks, data);
+
+ return 0;
+}
+
+static int
+impl_node_sync(void *object, int seq)
+{
+ struct state *this = object;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_node_emit_result(&this->hooks, seq, 0, 0, NULL);
+
+ return 0;
+}
+
+static int impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id,
+ const struct spa_dict *props)
+{
+ return -ENOTSUP;
+}
+
+static int impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id)
+{
+ return -ENOTSUP;
+}
+
+static int
+impl_node_port_enum_params(void *object, int seq,
+ enum spa_direction direction, uint32_t port_id,
+ uint32_t id, uint32_t start, uint32_t num,
+ const struct spa_pod *filter)
+{
+
+ struct state *this = object;
+ struct spa_pod *param;
+ struct spa_pod_builder b = { 0 };
+ uint8_t buffer[1024];
+ struct spa_result_node_params result;
+ uint32_t count = 0;
+ struct port *port;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ result.id = id;
+ result.next = start;
+ next:
+ result.index = result.next++;
+
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
+
+ switch (id) {
+ case SPA_PARAM_EnumFormat:
+ return spa_avb_enum_format(this, seq, start, num, filter);
+
+ case SPA_PARAM_Format:
+ if (!port->have_format)
+ return -EIO;
+ if (result.index > 0)
+ return 0;
+
+ param = spa_format_audio_raw_build(&b, id,
+ &port->current_format.info.raw);
+ break;
+
+ case SPA_PARAM_Buffers:
+ if (!port->have_format)
+ return -EIO;
+ if (result.index > 0)
+ return 0;
+
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamBuffers, id,
+ SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS),
+ SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(this->blocks),
+ SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
+ this->quantum_limit * this->stride,
+ 16 * this->stride,
+ INT32_MAX),
+ SPA_PARAM_BUFFERS_stride, SPA_POD_Int(this->stride));
+ break;
+
+ case SPA_PARAM_Meta:
+ switch (result.index) {
+ case 0:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamMeta, id,
+ SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
+ SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_header)));
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ case SPA_PARAM_IO:
+ switch (result.index) {
+ case 0:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
+ break;
+ case 1:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match)));
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ case SPA_PARAM_Latency:
+ switch (result.index) {
+ case 0: case 1:
+ {
+ struct spa_latency_info latency = this->latency[result.index];
+ if (latency.direction == SPA_DIRECTION_INPUT)
+ spa_process_latency_info_add(&this->process_latency, &latency);
+ param = spa_latency_build(&b, id, &latency);
+ break;
+ }
+ default:
+ return 0;
+ }
+ break;
+
+ default:
+ return -ENOENT;
+ }
+
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ goto next;
+
+ spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
+
+ if (++count != num)
+ goto next;
+
+ return 0;
+}
+
+static int clear_buffers(struct state *this, struct port *port)
+{
+ if (port->n_buffers > 0) {
+ spa_list_init(&port->ready);
+ port->n_buffers = 0;
+ }
+ return 0;
+}
+
+static int port_set_format(void *object, struct port *port,
+ uint32_t flags, const struct spa_pod *format)
+{
+ struct state *this = object;
+ int err;
+
+ if (format == NULL) {
+ if (!port->have_format)
+ return 0;
+
+ spa_log_debug(this->log, "clear format");
+ port->have_format = false;
+ spa_avb_clear_format(this);
+ clear_buffers(this, port);
+ } else {
+ struct spa_audio_info info = { 0 };
+
+ if ((err = spa_format_parse(format, &info.media_type, &info.media_subtype)) < 0)
+ return err;
+
+ if (info.media_type != SPA_MEDIA_TYPE_audio ||
+ info.media_subtype != SPA_MEDIA_SUBTYPE_raw)
+ return -EINVAL;
+
+ if (spa_format_audio_raw_parse(format, &info.info.raw) < 0)
+ return -EINVAL;
+
+ if ((err = spa_avb_set_format(this, &info, flags)) < 0)
+ return err;
+
+ port->current_format = info;
+ port->have_format = true;
+ }
+
+ this->info.change_mask |= SPA_NODE_CHANGE_MASK_PROPS;
+ emit_node_info(this, false);
+
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_RATE;
+ port->info.rate = SPA_FRACTION(1, this->rate);
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
+ if (port->have_format) {
+ port->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_READWRITE);
+ port->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, SPA_PARAM_INFO_READ);
+ port->params[PORT_Latency].user++;
+ } else {
+ port->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
+ port->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
+ }
+ emit_port_info(this, port, false);
+
+ return 0;
+}
+
+static int
+impl_node_port_set_param(void *object,
+ enum spa_direction direction, uint32_t port_id,
+ uint32_t id, uint32_t flags,
+ const struct spa_pod *param)
+{
+ struct state *this = object;
+ struct port *port;
+ int res;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ switch (id) {
+ case SPA_PARAM_Format:
+ res = port_set_format(this, port, flags, param);
+ break;
+ case SPA_PARAM_Latency:
+ {
+ struct spa_latency_info info;
+ if ((res = spa_latency_parse(param, &info)) < 0)
+ return res;
+ if (direction == info.direction)
+ return -EINVAL;
+
+ this->latency[info.direction] = info;
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
+ port->params[PORT_Latency].user++;
+ emit_port_info(this, port, false);
+ break;
+ }
+ default:
+ res = -ENOENT;
+ break;
+ }
+ return res;
+}
+
+static int
+impl_node_port_use_buffers(void *object,
+ enum spa_direction direction, uint32_t port_id,
+ uint32_t flags,
+ struct spa_buffer **buffers, uint32_t n_buffers)
+{
+ struct state *this = object;
+ struct port *port;
+ uint32_t i;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ spa_log_debug(this->log, "%p: use %d buffers", this, n_buffers);
+
+ if (port->n_buffers > 0) {
+ spa_avb_pause(this);
+ clear_buffers(this, port);
+ }
+ if (n_buffers > 0 && !port->have_format)
+ return -EIO;
+ if (n_buffers > MAX_BUFFERS)
+ return -ENOSPC;
+
+ for (i = 0; i < n_buffers; i++) {
+ struct buffer *b = &port->buffers[i];
+ struct spa_data *d = buffers[i]->datas;
+
+ b->buf = buffers[i];
+ b->id = i;
+ b->flags = BUFFER_FLAG_OUT;
+
+ b->h = spa_buffer_find_meta_data(b->buf, SPA_META_Header, sizeof(*b->h));
+
+ if (d[0].data == NULL) {
+ spa_log_error(this->log, "%p: need mapped memory", this);
+ return -EINVAL;
+ }
+ spa_log_debug(this->log, "%p: %d %p data:%p", this, i, b->buf, d[0].data);
+ }
+ port->n_buffers = n_buffers;
+
+ return 0;
+}
+
+static int
+impl_node_port_set_io(void *object,
+ enum spa_direction direction,
+ uint32_t port_id,
+ uint32_t id,
+ void *data, size_t size)
+{
+ struct state *this = object;
+ struct port *port;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ spa_log_debug(this->log, "%p: io %d %p %zd", this, id, data, size);
+
+ switch (id) {
+ case SPA_IO_Buffers:
+ port->io = data;
+ break;
+ case SPA_IO_RateMatch:
+ port->rate_match = data;
+ break;
+ default:
+ return -ENOENT;
+ }
+ return 0;
+}
+
+static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
+{
+ return -ENOTSUP;
+}
+
+static int impl_node_process(void *object)
+{
+ struct state *this = object;
+ struct port *port;
+ struct spa_io_buffers *io;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ port = GET_PORT(this, SPA_DIRECTION_INPUT, 0);
+ if ((io = port->io) == NULL)
+ return -EIO;
+
+ spa_log_trace_fp(this->log, "%p: process %d %d/%d", this, io->status,
+ io->buffer_id, port->n_buffers);
+
+ if (this->position && this->position->clock.flags & SPA_IO_CLOCK_FLAG_FREEWHEEL) {
+ io->status = SPA_STATUS_NEED_DATA;
+ return SPA_STATUS_HAVE_DATA;
+ }
+ if (io->status == SPA_STATUS_HAVE_DATA &&
+ io->buffer_id < port->n_buffers) {
+ struct buffer *b = &port->buffers[io->buffer_id];
+
+ if (!SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_OUT)) {
+ spa_log_warn(this->log, "%p: buffer %u in use",
+ this, io->buffer_id);
+ io->status = -EINVAL;
+ return -EINVAL;
+ }
+ spa_log_trace_fp(this->log, "%p: queue buffer %u", this, io->buffer_id);
+ spa_list_append(&port->ready, &b->link);
+ SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT);
+ io->buffer_id = SPA_ID_INVALID;
+
+ spa_avb_write(this);
+
+ io->status = SPA_STATUS_OK;
+ }
+ return SPA_STATUS_HAVE_DATA;
+}
+
+static const struct spa_node_methods impl_node = {
+ SPA_VERSION_NODE_METHODS,
+ .add_listener = impl_node_add_listener,
+ .set_callbacks = impl_node_set_callbacks,
+ .sync = impl_node_sync,
+ .enum_params = impl_node_enum_params,
+ .set_param = impl_node_set_param,
+ .set_io = impl_node_set_io,
+ .send_command = impl_node_send_command,
+ .add_port = impl_node_add_port,
+ .remove_port = impl_node_remove_port,
+ .port_enum_params = impl_node_port_enum_params,
+ .port_set_param = impl_node_port_set_param,
+ .port_use_buffers = impl_node_port_use_buffers,
+ .port_set_io = impl_node_port_set_io,
+ .port_reuse_buffer = impl_node_port_reuse_buffer,
+ .process = impl_node_process,
+};
+
+static int impl_get_interface(struct spa_handle *handle, const char *type, void **interface)
+{
+ struct state *this;
+
+ spa_return_val_if_fail(handle != NULL, -EINVAL);
+ spa_return_val_if_fail(interface != NULL, -EINVAL);
+
+ this = (struct state *) handle;
+
+ if (spa_streq(type, SPA_TYPE_INTERFACE_Node))
+ *interface = &this->node;
+ else
+ return -ENOENT;
+
+ return 0;
+}
+
+static int impl_clear(struct spa_handle *handle)
+{
+ struct state *this;
+ spa_return_val_if_fail(handle != NULL, -EINVAL);
+ this = (struct state *) handle;
+ spa_avb_clear(this);
+ return 0;
+}
+
+static size_t
+impl_get_size(const struct spa_handle_factory *factory,
+ const struct spa_dict *params)
+{
+ return sizeof(struct state);
+}
+
+static int
+impl_init(const struct spa_handle_factory *factory,
+ struct spa_handle *handle, const struct spa_dict *info, const struct spa_support *support, uint32_t n_support)
+{
+ struct state *this;
+ struct port *port;
+
+ spa_return_val_if_fail(factory != NULL, -EINVAL);
+ spa_return_val_if_fail(handle != NULL, -EINVAL);
+
+ handle->get_interface = impl_get_interface;
+ handle->clear = impl_clear;
+
+ this = (struct state *) handle;
+
+ this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
+ avb_log_topic_init(this->log);
+
+ this->data_system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem);
+ this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop);
+
+ if (this->data_loop == NULL) {
+ spa_log_error(this->log, "a data loop is needed");
+ return -EINVAL;
+ }
+ if (this->data_system == NULL) {
+ spa_log_error(this->log, "a data system is needed");
+ return -EINVAL;
+ }
+
+ this->node.iface = SPA_INTERFACE_INIT(
+ SPA_TYPE_INTERFACE_Node,
+ SPA_VERSION_NODE,
+ &impl_node, this);
+
+ spa_hook_list_init(&this->hooks);
+
+
+ this->info_all = SPA_NODE_CHANGE_MASK_FLAGS |
+ SPA_NODE_CHANGE_MASK_PROPS |
+ SPA_NODE_CHANGE_MASK_PARAMS;
+ this->info = SPA_NODE_INFO_INIT();
+ this->info.max_input_ports = 1;
+ this->info.flags = SPA_NODE_FLAG_RT;
+ this->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ);
+ this->params[NODE_Props] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_READWRITE);
+ this->params[NODE_IO] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ);
+ this->params[NODE_ProcessLatency] = SPA_PARAM_INFO(SPA_PARAM_ProcessLatency, SPA_PARAM_INFO_READWRITE);
+ this->info.params = this->params;
+ this->info.n_params = N_NODE_PARAMS;
+
+ reset_props(&this->props);
+
+ port = GET_PORT(this, SPA_DIRECTION_INPUT, 0);
+ port->direction = SPA_DIRECTION_INPUT;
+
+ port->info_all = SPA_PORT_CHANGE_MASK_FLAGS |
+ SPA_PORT_CHANGE_MASK_PARAMS;
+ port->info = SPA_PORT_INFO_INIT();
+ port->info.flags = SPA_PORT_FLAG_LIVE |
+ SPA_PORT_FLAG_PHYSICAL |
+ SPA_PORT_FLAG_TERMINAL;
+ port->params[PORT_EnumFormat] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ);
+ port->params[PORT_Meta] = SPA_PARAM_INFO(SPA_PARAM_Meta, SPA_PARAM_INFO_READ);
+ port->params[PORT_IO] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ);
+ port->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
+ port->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
+ port->params[PORT_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_READWRITE);
+ port->info.params = port->params;
+ port->info.n_params = N_PORT_PARAMS;
+
+ spa_list_init(&port->ready);
+
+ this->latency[port->direction] = SPA_LATENCY_INFO(
+ port->direction,
+ .min_quantum = 1.0f,
+ .max_quantum = 1.0f);
+ this->latency[SPA_DIRECTION_OUTPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT);
+
+ return spa_avb_init(this, info);
+}
+
+static const struct spa_interface_info impl_interfaces[] = {
+ {SPA_TYPE_INTERFACE_Node,},
+};
+
+static int
+impl_enum_interface_info(const struct spa_handle_factory *factory,
+ const struct spa_interface_info **info, uint32_t *index)
+{
+ spa_return_val_if_fail(factory != NULL, -EINVAL);
+ spa_return_val_if_fail(info != NULL, -EINVAL);
+ spa_return_val_if_fail(index != NULL, -EINVAL);
+
+ switch (*index) {
+ case 0:
+ *info = &impl_interfaces[*index];
+ break;
+ default:
+ return 0;
+ }
+ (*index)++;
+ return 1;
+}
+
+static const struct spa_dict_item info_items[] = {
+ { SPA_KEY_FACTORY_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
+ { SPA_KEY_FACTORY_DESCRIPTION, "Play audio with AVB" },
+ { SPA_KEY_FACTORY_USAGE, "[]" },
+};
+
+static const struct spa_dict info = SPA_DICT_INIT_ARRAY(info_items);
+
+const struct spa_handle_factory spa_avb_sink_factory = {
+ SPA_VERSION_HANDLE_FACTORY,
+ "avb.pcm.sink",
+ &info,
+ impl_get_size,
+ impl_init,
+ impl_enum_interface_info,
+};
diff --git a/spa/plugins/avb/avb-pcm-source.c b/spa/plugins/avb/avb-pcm-source.c
new file mode 100644
index 0000000..9ff9b21
--- /dev/null
+++ b/spa/plugins/avb/avb-pcm-source.c
@@ -0,0 +1,910 @@
+/* Spa AVB PCM Source
+ *
+ * Copyright © 2022 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include <stddef.h>
+
+#include <spa/node/node.h>
+#include <spa/node/utils.h>
+#include <spa/node/keys.h>
+#include <spa/monitor/device.h>
+#include <spa/utils/keys.h>
+#include <spa/utils/names.h>
+#include <spa/utils/string.h>
+#include <spa/param/audio/format.h>
+#include <spa/pod/filter.h>
+
+#include "avb-pcm.h"
+
+#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0)
+#define GET_PORT(this,d,p) (&this->ports[p])
+
+static void reset_props(struct props *props)
+{
+ snprintf(props->ifname, sizeof(props->ifname), "%s", DEFAULT_IFNAME);
+ parse_addr(props->addr, DEFAULT_ADDR);
+ props->prio = DEFAULT_PRIO;
+ parse_streamid(&props->streamid, DEFAULT_STREAMID);
+ props->mtt = DEFAULT_MTT;
+ props->t_uncertainty = DEFAULT_TU;
+ props->frames_per_pdu = DEFAULT_FRAMES_PER_PDU;
+}
+
+static void emit_node_info(struct state *this, bool full)
+{
+ uint64_t old = full ? this->info.change_mask : 0;
+
+ if (full)
+ this->info.change_mask = this->info_all;
+ if (this->info.change_mask) {
+ struct spa_dict_item items[4];
+ uint32_t i, n_items = 0;
+
+ items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_DEVICE_API, "avb");
+ items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_MEDIA_CLASS, "Audio/Source");
+ items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_NODE_DRIVER, "true");
+ this->info.props = &SPA_DICT_INIT(items, n_items);
+
+ if (this->info.change_mask & SPA_NODE_CHANGE_MASK_PARAMS) {
+ for (i = 0; i < this->info.n_params; i++) {
+ if (this->params[i].user > 0) {
+ this->params[i].flags ^= SPA_PARAM_INFO_SERIAL;
+ this->params[i].user = 0;
+ }
+ }
+ }
+ spa_node_emit_info(&this->hooks, &this->info);
+
+ this->info.change_mask = old;
+ }
+}
+
+static void emit_port_info(struct state *this, struct port *port, bool full)
+{
+ uint64_t old = full ? port->info.change_mask : 0;
+
+ if (full)
+ port->info.change_mask = port->info_all;
+ if (port->info.change_mask) {
+ uint32_t i;
+
+ if (port->info.change_mask & SPA_PORT_CHANGE_MASK_PARAMS) {
+ for (i = 0; i < port->info.n_params; i++) {
+ if (port->params[i].user > 0) {
+ port->params[i].flags ^= SPA_PARAM_INFO_SERIAL;
+ port->params[i].user = 0;
+ }
+ }
+ }
+ spa_node_emit_port_info(&this->hooks,
+ port->direction, port->id, &port->info);
+ port->info.change_mask = old;
+ }
+}
+
+static int impl_node_enum_params(void *object, int seq,
+ uint32_t id, uint32_t start, uint32_t num,
+ const struct spa_pod *filter)
+{
+ struct state *this = object;
+ struct spa_pod *param;
+ struct spa_pod_builder b = { 0 };
+ uint8_t buffer[4096];
+ struct spa_result_node_params result;
+ uint32_t count = 0;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+
+ result.id = id;
+ result.next = start;
+ next:
+ result.index = result.next++;
+
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
+
+ switch (id) {
+ case SPA_PARAM_PropInfo:
+ {
+ switch (result.index) {
+ default:
+ param = spa_avb_enum_propinfo(this, result.index, &b);
+ if (param == NULL)
+ return 0;
+ }
+ break;
+ }
+ case SPA_PARAM_Props:
+ {
+ struct spa_pod_frame f;
+
+ switch (result.index) {
+ case 0:
+ spa_pod_builder_push_object(&b, &f,
+ SPA_TYPE_OBJECT_Props, id);
+ spa_pod_builder_add(&b,
+ SPA_PROP_latencyOffsetNsec, SPA_POD_Long(this->process_latency.ns),
+ 0);
+ spa_avb_add_prop_params(this, &b);
+ param = spa_pod_builder_pop(&b, &f);
+ break;
+ default:
+ return 0;
+ }
+ break;
+ }
+ case SPA_PARAM_IO:
+ switch (result.index) {
+ case 0:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Clock),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_clock)));
+ break;
+ case 1:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Position),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_position)));
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ case SPA_PARAM_ProcessLatency:
+ switch (result.index) {
+ case 0:
+ param = spa_process_latency_build(&b, id, &this->process_latency);
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ default:
+ return -ENOENT;
+ }
+
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ goto next;
+
+ spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
+
+ if (++count != num)
+ goto next;
+
+ return 0;
+}
+
+static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
+{
+ struct state *this = object;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ switch (id) {
+ case SPA_IO_Clock:
+ this->clock = data;
+ break;
+ case SPA_IO_Position:
+ this->position = data;
+ break;
+ default:
+ return -ENOENT;
+ }
+ spa_avb_reassign_follower(this);
+
+ return 0;
+}
+
+static void handle_process_latency(struct state *this,
+ const struct spa_process_latency_info *info)
+{
+ bool ns_changed = this->process_latency.ns != info->ns;
+ struct port *port = &this->ports[0];
+
+ if (this->process_latency.quantum == info->quantum &&
+ this->process_latency.rate == info->rate &&
+ !ns_changed)
+ return;
+
+ this->process_latency = *info;
+
+ this->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS;
+ if (ns_changed)
+ this->params[NODE_Props].user++;
+ this->params[NODE_ProcessLatency].user++;
+
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
+ port->params[PORT_Latency].user++;
+}
+
+static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
+ const struct spa_pod *param)
+{
+ struct state *this = object;
+ int res;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ switch (id) {
+ case SPA_PARAM_Props:
+ {
+ struct props *p = &this->props;
+ struct spa_pod *params = NULL;
+ int64_t lat_ns = -1;
+
+ if (param == NULL) {
+ reset_props(p);
+ return 0;
+ }
+
+ spa_pod_parse_object(param,
+ SPA_TYPE_OBJECT_Props, NULL,
+ SPA_PROP_latencyOffsetNsec, SPA_POD_OPT_Long(&lat_ns),
+ SPA_PROP_params, SPA_POD_OPT_Pod(&params));
+
+ spa_avb_parse_prop_params(this, params);
+ if (lat_ns != -1) {
+ struct spa_process_latency_info info;
+ info = this->process_latency;
+ info.ns = lat_ns;
+ handle_process_latency(this, &info);
+ }
+ emit_node_info(this, false);
+ emit_port_info(this, &this->ports[0], false);
+ break;
+ }
+ case SPA_PARAM_ProcessLatency:
+ {
+ struct spa_process_latency_info info;
+ if ((res = spa_process_latency_parse(param, &info)) < 0)
+ return res;
+
+ handle_process_latency(this, &info);
+
+ emit_node_info(this, false);
+ emit_port_info(this, &this->ports[0], false);
+ break;
+ }
+ default:
+ return -ENOENT;
+ }
+ return 0;
+}
+
+static int impl_node_send_command(void *object, const struct spa_command *command)
+{
+ struct state *this = object;
+ int res;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+ spa_return_val_if_fail(command != NULL, -EINVAL);
+
+ switch (SPA_NODE_COMMAND_ID(command)) {
+ case SPA_NODE_COMMAND_ParamBegin:
+ break;
+ case SPA_NODE_COMMAND_ParamEnd:
+ break;
+ case SPA_NODE_COMMAND_Start:
+ if (!this->ports[0].have_format)
+ return -EIO;
+ if (this->ports[0].n_buffers == 0)
+ return -EIO;
+ if ((res = spa_avb_start(this)) < 0)
+ return res;
+ break;
+ case SPA_NODE_COMMAND_Suspend:
+ case SPA_NODE_COMMAND_Pause:
+ if ((res = spa_avb_pause(this)) < 0)
+ return res;
+ break;
+ default:
+ return -ENOTSUP;
+ }
+ return 0;
+}
+
+
+static int
+impl_node_add_listener(void *object,
+ struct spa_hook *listener,
+ const struct spa_node_events *events,
+ void *data)
+{
+ struct state *this = object;
+ struct spa_hook_list save;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_hook_list_isolate(&this->hooks, &save, listener, events, data);
+
+ emit_node_info(this, true);
+ emit_port_info(this, &this->ports[0], true);
+
+ spa_hook_list_join(&this->hooks, &save);
+
+ return 0;
+}
+
+static int
+impl_node_set_callbacks(void *object,
+ const struct spa_node_callbacks *callbacks,
+ void *data)
+{
+ struct state *this = object;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ this->callbacks = SPA_CALLBACKS_INIT(callbacks, data);
+
+ return 0;
+}
+
+static int
+impl_node_sync(void *object, int seq)
+{
+ struct state *this = object;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_node_emit_result(&this->hooks, seq, 0, 0, NULL);
+
+ return 0;
+}
+
+static int impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id,
+ const struct spa_dict *props)
+{
+ return -ENOTSUP;
+}
+
+static int impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id)
+{
+ return -ENOTSUP;
+}
+
+static int
+impl_node_port_enum_params(void *object, int seq,
+ enum spa_direction direction, uint32_t port_id,
+ uint32_t id, uint32_t start, uint32_t num,
+ const struct spa_pod *filter)
+{
+
+ struct state *this = object;
+ struct spa_pod *param;
+ struct spa_pod_builder b = { 0 };
+ uint8_t buffer[1024];
+ struct spa_result_node_params result;
+ uint32_t count = 0;
+ struct port *port;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ result.id = id;
+ result.next = start;
+ next:
+ result.index = result.next++;
+
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
+
+ switch (id) {
+ case SPA_PARAM_EnumFormat:
+ return spa_avb_enum_format(this, seq, start, num, filter);
+
+ case SPA_PARAM_Format:
+ if (!port->have_format)
+ return -EIO;
+ if (result.index > 0)
+ return 0;
+
+ param = spa_format_audio_raw_build(&b, id,
+ &port->current_format.info.raw);
+ break;
+
+ case SPA_PARAM_Buffers:
+ if (!port->have_format)
+ return -EIO;
+ if (result.index > 0)
+ return 0;
+
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamBuffers, id,
+ SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS),
+ SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(this->blocks),
+ SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
+ this->quantum_limit * this->stride,
+ 16 * this->stride,
+ INT32_MAX),
+ SPA_PARAM_BUFFERS_stride, SPA_POD_Int(this->stride));
+ break;
+
+ case SPA_PARAM_Meta:
+ switch (result.index) {
+ case 0:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamMeta, id,
+ SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
+ SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_header)));
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ case SPA_PARAM_IO:
+ switch (result.index) {
+ case 0:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
+ break;
+ case 1:
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_ParamIO, id,
+ SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch),
+ SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match)));
+ break;
+ default:
+ return 0;
+ }
+ break;
+
+ case SPA_PARAM_Latency:
+ switch (result.index) {
+ case 0: case 1:
+ {
+ struct spa_latency_info latency = this->latency[result.index];
+ if (latency.direction == SPA_DIRECTION_OUTPUT)
+ spa_process_latency_info_add(&this->process_latency, &latency);
+ param = spa_latency_build(&b, id, &latency);
+ break;
+ }
+ default:
+ return 0;
+ }
+ break;
+
+ default:
+ return -ENOENT;
+ }
+
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ goto next;
+
+ spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
+
+ if (++count != num)
+ goto next;
+
+ return 0;
+}
+
+static int clear_buffers(struct state *this, struct port *port)
+{
+ if (port->n_buffers > 0) {
+ spa_list_init(&port->ready);
+ port->n_buffers = 0;
+ }
+ return 0;
+}
+
+static int port_set_format(void *object, struct port *port,
+ uint32_t flags, const struct spa_pod *format)
+{
+ struct state *this = object;
+ int err;
+
+ if (format == NULL) {
+ if (!port->have_format)
+ return 0;
+
+ spa_log_debug(this->log, "clear format");
+ port->have_format = false;
+ spa_avb_clear_format(this);
+ clear_buffers(this, port);
+ } else {
+ struct spa_audio_info info = { 0 };
+
+ if ((err = spa_format_parse(format, &info.media_type, &info.media_subtype)) < 0)
+ return err;
+
+ if (info.media_type != SPA_MEDIA_TYPE_audio ||
+ info.media_subtype != SPA_MEDIA_SUBTYPE_raw)
+ return -EINVAL;
+
+ if (spa_format_audio_raw_parse(format, &info.info.raw) < 0)
+ return -EINVAL;
+
+ if ((err = spa_avb_set_format(this, &info, flags)) < 0)
+ return err;
+
+ port->current_format = info;
+ port->have_format = true;
+ }
+
+ this->info.change_mask |= SPA_NODE_CHANGE_MASK_PROPS;
+ emit_node_info(this, false);
+
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_RATE;
+ port->info.rate = SPA_FRACTION(1, this->rate);
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
+ if (port->have_format) {
+ port->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_READWRITE);
+ port->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, SPA_PARAM_INFO_READ);
+ port->params[PORT_Latency].user++;
+ } else {
+ port->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
+ port->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
+ }
+ emit_port_info(this, port, false);
+
+ return 0;
+}
+
+static int
+impl_node_port_set_param(void *object,
+ enum spa_direction direction, uint32_t port_id,
+ uint32_t id, uint32_t flags,
+ const struct spa_pod *param)
+{
+ struct state *this = object;
+ struct port *port;
+ int res;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ switch (id) {
+ case SPA_PARAM_Format:
+ res = port_set_format(this, port, flags, param);
+ break;
+ case SPA_PARAM_Latency:
+ {
+ struct spa_latency_info info;
+ if ((res = spa_latency_parse(param, &info)) < 0)
+ return res;
+ if (direction == info.direction)
+ return -EINVAL;
+
+ this->latency[info.direction] = info;
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
+ port->params[PORT_Latency].user++;
+ emit_port_info(this, port, false);
+ break;
+ }
+ default:
+ res = -ENOENT;
+ break;
+ }
+ return res;
+}
+
+static int
+impl_node_port_use_buffers(void *object,
+ enum spa_direction direction, uint32_t port_id,
+ uint32_t flags,
+ struct spa_buffer **buffers, uint32_t n_buffers)
+{
+ struct state *this = object;
+ struct port *port;
+ uint32_t i;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ spa_log_debug(this->log, "%p: use %d buffers", this, n_buffers);
+
+ if (port->n_buffers > 0) {
+ spa_avb_pause(this);
+ clear_buffers(this, port);
+ }
+ if (n_buffers > 0 && !port->have_format)
+ return -EIO;
+ if (n_buffers > MAX_BUFFERS)
+ return -ENOSPC;
+
+ for (i = 0; i < n_buffers; i++) {
+ struct buffer *b = &port->buffers[i];
+ struct spa_data *d = buffers[i]->datas;
+
+ b->buf = buffers[i];
+ b->id = i;
+ b->flags = BUFFER_FLAG_OUT;
+
+ b->h = spa_buffer_find_meta_data(b->buf, SPA_META_Header, sizeof(*b->h));
+
+ if (d[0].data == NULL) {
+ spa_log_error(this->log, "%p: need mapped memory", this);
+ return -EINVAL;
+ }
+ spa_log_debug(this->log, "%p: %d %p data:%p", this, i, b->buf, d[0].data);
+ }
+ port->n_buffers = n_buffers;
+
+ return 0;
+}
+
+static int
+impl_node_port_set_io(void *object,
+ enum spa_direction direction,
+ uint32_t port_id,
+ uint32_t id,
+ void *data, size_t size)
+{
+ struct state *this = object;
+ struct port *port;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ port = GET_PORT(this, direction, port_id);
+
+ spa_log_debug(this->log, "%p: io %d %p %zd", this, id, data, size);
+
+ switch (id) {
+ case SPA_IO_Buffers:
+ port->io = data;
+ break;
+ case SPA_IO_RateMatch:
+ port->rate_match = data;
+ break;
+ default:
+ return -ENOENT;
+ }
+ return 0;
+}
+
+static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
+{
+ return -ENOTSUP;
+}
+
+static int impl_node_process(void *object)
+{
+ struct state *this = object;
+ struct port *port;
+ struct spa_io_buffers *io;
+ struct buffer *b;
+
+ spa_return_val_if_fail(this != NULL, -EINVAL);
+
+ port = GET_PORT(this, SPA_DIRECTION_OUTPUT, 0);
+ if ((io = port->io) == NULL)
+ return -EIO;
+
+ spa_log_trace_fp(this->log, "%p: process %d %d/%d %d", this, io->status,
+ io->buffer_id, port->n_buffers, this->following);
+
+ if (io->status == SPA_STATUS_HAVE_DATA)
+ return SPA_STATUS_HAVE_DATA;
+
+ if (io->buffer_id < port->n_buffers) {
+ spa_avb_recycle_buffer(this, port, io->buffer_id);
+ io->buffer_id = SPA_ID_INVALID;
+ }
+
+ if (spa_list_is_empty(&port->ready) && this->following) {
+ spa_avb_read(this);
+ }
+ if (spa_list_is_empty(&port->ready) || !this->following)
+ return SPA_STATUS_OK;
+
+ b = spa_list_first(&port->ready, struct buffer, link);
+ spa_list_remove(&b->link);
+ SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
+
+ spa_log_trace_fp(this->log, "%p: dequeue buffer %d", this, b->id);
+
+ io->buffer_id = b->id;
+ io->status = SPA_STATUS_HAVE_DATA;
+
+ return SPA_STATUS_HAVE_DATA;
+}
+
+static const struct spa_node_methods impl_node = {
+ SPA_VERSION_NODE_METHODS,
+ .add_listener = impl_node_add_listener,
+ .set_callbacks = impl_node_set_callbacks,
+ .sync = impl_node_sync,
+ .enum_params = impl_node_enum_params,
+ .set_param = impl_node_set_param,
+ .set_io = impl_node_set_io,
+ .send_command = impl_node_send_command,
+ .add_port = impl_node_add_port,
+ .remove_port = impl_node_remove_port,
+ .port_enum_params = impl_node_port_enum_params,
+ .port_set_param = impl_node_port_set_param,
+ .port_use_buffers = impl_node_port_use_buffers,
+ .port_set_io = impl_node_port_set_io,
+ .port_reuse_buffer = impl_node_port_reuse_buffer,
+ .process = impl_node_process,
+};
+
+static int impl_get_interface(struct spa_handle *handle, const char *type, void **interface)
+{
+ struct state *this;
+
+ spa_return_val_if_fail(handle != NULL, -EINVAL);
+ spa_return_val_if_fail(interface != NULL, -EINVAL);
+
+ this = (struct state *) handle;
+
+ if (spa_streq(type, SPA_TYPE_INTERFACE_Node))
+ *interface = &this->node;
+ else
+ return -ENOENT;
+
+ return 0;
+}
+
+static int impl_clear(struct spa_handle *handle)
+{
+ struct state *this;
+ spa_return_val_if_fail(handle != NULL, -EINVAL);
+ this = (struct state *) handle;
+ spa_avb_clear(this);
+ return 0;
+}
+
+static size_t
+impl_get_size(const struct spa_handle_factory *factory,
+ const struct spa_dict *params)
+{
+ return sizeof(struct state);
+}
+
+static int
+impl_init(const struct spa_handle_factory *factory,
+ struct spa_handle *handle, const struct spa_dict *info, const struct spa_support *support, uint32_t n_support)
+{
+ struct state *this;
+ struct port *port;
+
+ spa_return_val_if_fail(factory != NULL, -EINVAL);
+ spa_return_val_if_fail(handle != NULL, -EINVAL);
+
+ handle->get_interface = impl_get_interface;
+ handle->clear = impl_clear;
+
+ this = (struct state *) handle;
+
+ this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
+ avb_log_topic_init(this->log);
+
+ this->data_system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataSystem);
+ this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop);
+
+ if (this->data_loop == NULL) {
+ spa_log_error(this->log, "a data loop is needed");
+ return -EINVAL;
+ }
+ if (this->data_system == NULL) {
+ spa_log_error(this->log, "a data system is needed");
+ return -EINVAL;
+ }
+
+ this->node.iface = SPA_INTERFACE_INIT(
+ SPA_TYPE_INTERFACE_Node,
+ SPA_VERSION_NODE,
+ &impl_node, this);
+
+ spa_hook_list_init(&this->hooks);
+
+ this->info_all = SPA_NODE_CHANGE_MASK_FLAGS |
+ SPA_NODE_CHANGE_MASK_PROPS |
+ SPA_NODE_CHANGE_MASK_PARAMS;
+ this->info = SPA_NODE_INFO_INIT();
+ this->info.max_output_ports = 1;
+ this->info.flags = SPA_NODE_FLAG_RT;
+ this->params[NODE_PropInfo] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ);
+ this->params[NODE_Props] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_READWRITE);
+ this->params[NODE_IO] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ);
+ this->params[NODE_ProcessLatency] = SPA_PARAM_INFO(SPA_PARAM_ProcessLatency, SPA_PARAM_INFO_READWRITE);
+ this->info.params = this->params;
+ this->info.n_params = N_NODE_PARAMS;
+
+ reset_props(&this->props);
+
+ port = GET_PORT(this, SPA_DIRECTION_OUTPUT, 0);
+ port->direction = SPA_DIRECTION_OUTPUT;
+
+ port->info_all = SPA_PORT_CHANGE_MASK_FLAGS |
+ SPA_PORT_CHANGE_MASK_PARAMS;
+ port->info = SPA_PORT_INFO_INIT();
+ port->info.flags = SPA_PORT_FLAG_LIVE |
+ SPA_PORT_FLAG_PHYSICAL |
+ SPA_PORT_FLAG_TERMINAL;
+ port->params[PORT_EnumFormat] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ);
+ port->params[PORT_Meta] = SPA_PARAM_INFO(SPA_PARAM_Meta, SPA_PARAM_INFO_READ);
+ port->params[PORT_IO] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ);
+ port->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
+ port->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
+ port->params[PORT_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_READWRITE);
+ port->info.params = port->params;
+ port->info.n_params = N_PORT_PARAMS;
+
+ spa_list_init(&port->ready);
+
+ this->latency[port->direction] = SPA_LATENCY_INFO(
+ port->direction,
+ .min_quantum = 1.0f,
+ .max_quantum = 1.0f);
+ this->latency[SPA_DIRECTION_INPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_INPUT);
+
+ return spa_avb_init(this, info);
+}
+
+static const struct spa_interface_info impl_interfaces[] = {
+ {SPA_TYPE_INTERFACE_Node,},
+};
+
+static int
+impl_enum_interface_info(const struct spa_handle_factory *factory,
+ const struct spa_interface_info **info, uint32_t *index)
+{
+ spa_return_val_if_fail(factory != NULL, -EINVAL);
+ spa_return_val_if_fail(info != NULL, -EINVAL);
+ spa_return_val_if_fail(index != NULL, -EINVAL);
+
+ switch (*index) {
+ case 0:
+ *info = &impl_interfaces[*index];
+ break;
+ default:
+ return 0;
+ }
+ (*index)++;
+ return 1;
+}
+
+static const struct spa_dict_item info_items[] = {
+ { SPA_KEY_FACTORY_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
+ { SPA_KEY_FACTORY_DESCRIPTION, "Play audio with AVB" },
+ { SPA_KEY_FACTORY_USAGE, "[]" },
+};
+
+static const struct spa_dict info = SPA_DICT_INIT_ARRAY(info_items);
+
+const struct spa_handle_factory spa_avb_source_factory = {
+ SPA_VERSION_HANDLE_FACTORY,
+ "avb.pcm.source",
+ &info,
+ impl_get_size,
+ impl_init,
+ impl_enum_interface_info,
+};
diff --git a/spa/plugins/avb/avb-pcm.c b/spa/plugins/avb/avb-pcm.c
new file mode 100644
index 0000000..484adc6
--- /dev/null
+++ b/spa/plugins/avb/avb-pcm.c
@@ -0,0 +1,1227 @@
+/* Spa AVB PCM
+ *
+ * Copyright © 2022 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sched.h>
+#include <errno.h>
+#include <getopt.h>
+#include <sys/time.h>
+#include <math.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/ioctl.h>
+#include <arpa/inet.h>
+
+#include <spa/pod/filter.h>
+#include <spa/utils/result.h>
+#include <spa/utils/string.h>
+#include <spa/support/system.h>
+#include <spa/utils/keys.h>
+
+#include "avb-pcm.h"
+
+#define TAI_OFFSET (37ULL * SPA_NSEC_PER_SEC)
+#define TAI_TO_UTC(t) (t - TAI_OFFSET)
+
+static int avb_set_param(struct state *state, const char *k, const char *s)
+{
+ struct props *p = &state->props;
+ int fmt_change = 0;
+ if (spa_streq(k, SPA_KEY_AUDIO_CHANNELS)) {
+ state->default_channels = atoi(s);
+ fmt_change++;
+ } else if (spa_streq(k, SPA_KEY_AUDIO_RATE)) {
+ state->default_rate = atoi(s);
+ fmt_change++;
+ } else if (spa_streq(k, SPA_KEY_AUDIO_FORMAT)) {
+ state->default_format = spa_avb_format_from_name(s, strlen(s));
+ fmt_change++;
+ } else if (spa_streq(k, SPA_KEY_AUDIO_POSITION)) {
+ spa_avb_parse_position(&state->default_pos, s, strlen(s));
+ fmt_change++;
+ } else if (spa_streq(k, SPA_KEY_AUDIO_ALLOWED_RATES)) {
+ state->n_allowed_rates = spa_avb_parse_rates(state->allowed_rates,
+ MAX_RATES, s, strlen(s));
+ fmt_change++;
+ } else if (spa_streq(k, "avb.ifname")) {
+ snprintf(p->ifname, sizeof(p->ifname), "%s", s);
+ } else if (spa_streq(k, "avb.macaddr")) {
+ parse_addr(p->addr, s);
+ } else if (spa_streq(k, "avb.prio")) {
+ p->prio = atoi(s);
+ } else if (spa_streq(k, "avb.streamid")) {
+ parse_streamid(&p->streamid, s);
+ } else if (spa_streq(k, "avb.mtt")) {
+ p->mtt = atoi(s);
+ } else if (spa_streq(k, "avb.time-uncertainty")) {
+ p->t_uncertainty = atoi(s);
+ } else if (spa_streq(k, "avb.frames-per-pdu")) {
+ p->frames_per_pdu = atoi(s);
+ } else if (spa_streq(k, "avb.ptime-tolerance")) {
+ p->ptime_tolerance = atoi(s);
+ } else if (spa_streq(k, "latency.internal.rate")) {
+ state->process_latency.rate = atoi(s);
+ } else if (spa_streq(k, "latency.internal.ns")) {
+ state->process_latency.ns = atoi(s);
+ } else if (spa_streq(k, "clock.name")) {
+ spa_scnprintf(state->clock_name,
+ sizeof(state->clock_name), "%s", s);
+ } else
+ return 0;
+
+ if (fmt_change > 0) {
+ struct port *port = &state->ports[0];
+ port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
+ port->params[PORT_EnumFormat].user++;
+ }
+ return 1;
+}
+
+static int position_to_string(struct channel_map *map, char *val, size_t len)
+{
+ uint32_t i, o = 0;
+ int r;
+ o += snprintf(val, len, "[ ");
+ for (i = 0; i < map->channels; i++) {
+ r = snprintf(val+o, len-o, "%s%s", i == 0 ? "" : ", ",
+ spa_debug_type_find_short_name(spa_type_audio_channel,
+ map->pos[i]));
+ if (r < 0 || o + r >= len)
+ return -ENOSPC;
+ o += r;
+ }
+ if (len > o)
+ o += snprintf(val+o, len-o, " ]");
+ return 0;
+}
+
+static int uint32_array_to_string(uint32_t *vals, uint32_t n_vals, char *val, size_t len)
+{
+ uint32_t i, o = 0;
+ int r;
+ o += snprintf(val, len, "[ ");
+ for (i = 0; i < n_vals; i++) {
+ r = snprintf(val+o, len-o, "%s%d", i == 0 ? "" : ", ", vals[i]);
+ if (r < 0 || o + r >= len)
+ return -ENOSPC;
+ o += r;
+ }
+ if (len > o)
+ o += snprintf(val+o, len-o, " ]");
+ return 0;
+}
+
+struct spa_pod *spa_avb_enum_propinfo(struct state *state,
+ uint32_t idx, struct spa_pod_builder *b)
+{
+ struct spa_pod *param;
+ struct props *p = &state->props;
+ char tmp[128];
+
+ switch (idx) {
+ case 0:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String(SPA_KEY_AUDIO_CHANNELS),
+ SPA_PROP_INFO_description, SPA_POD_String("Audio Channels"),
+ SPA_PROP_INFO_type, SPA_POD_Int(state->default_channels),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 1:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String(SPA_KEY_AUDIO_RATE),
+ SPA_PROP_INFO_description, SPA_POD_String("Audio Rate"),
+ SPA_PROP_INFO_type, SPA_POD_Int(state->default_rate),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 2:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String(SPA_KEY_AUDIO_FORMAT),
+ SPA_PROP_INFO_description, SPA_POD_String("Audio Format"),
+ SPA_PROP_INFO_type, SPA_POD_String(
+ spa_debug_type_find_short_name(spa_type_audio_format,
+ state->default_format)),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 3:
+ {
+ char buf[1024];
+ position_to_string(&state->default_pos, buf, sizeof(buf));
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String(SPA_KEY_AUDIO_POSITION),
+ SPA_PROP_INFO_description, SPA_POD_String("Audio Position"),
+ SPA_PROP_INFO_type, SPA_POD_String(buf),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ }
+ case 4:
+ {
+ char buf[1024];
+ uint32_array_to_string(state->allowed_rates, state->n_allowed_rates, buf, sizeof(buf));
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String(SPA_KEY_AUDIO_ALLOWED_RATES),
+ SPA_PROP_INFO_description, SPA_POD_String("Audio Allowed Rates"),
+ SPA_PROP_INFO_type, SPA_POD_String(buf),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ }
+ case 5:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.ifname"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB interface name"),
+ SPA_PROP_INFO_type, SPA_POD_Stringn(p->ifname, sizeof(p->ifname)),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 6:
+ format_addr(tmp, sizeof(tmp), p->addr);
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.macaddr"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB MAC address"),
+ SPA_PROP_INFO_type, SPA_POD_String(tmp),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 7:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.prio"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB stream priority"),
+ SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->prio, 0, INT32_MAX),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 8:
+ format_streamid(tmp, sizeof(tmp), p->streamid);
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.streamid"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB stream id"),
+ SPA_PROP_INFO_type, SPA_POD_String(tmp),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 9:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.mtt"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB mtt"),
+ SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->mtt, 0, INT32_MAX),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 10:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.time-uncertainty"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB time uncertainty"),
+ SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->t_uncertainty, 0, INT32_MAX),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 11:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.frames-per-pdu"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB frames per packet"),
+ SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->frames_per_pdu, 0, INT32_MAX),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 12:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("avb.ptime-tolerance"),
+ SPA_PROP_INFO_description, SPA_POD_String("The AVB packet tolerance"),
+ SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->ptime_tolerance, 0, INT32_MAX),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 13:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("latency.internal.rate"),
+ SPA_PROP_INFO_description, SPA_POD_String("Internal latency in samples"),
+ SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(state->process_latency.rate,
+ 0, 65536),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 14:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("latency.internal.ns"),
+ SPA_PROP_INFO_description, SPA_POD_String("Internal latency in nanoseconds"),
+ SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Long(state->process_latency.ns,
+ 0, 2 * SPA_NSEC_PER_SEC),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ case 15:
+ param = spa_pod_builder_add_object(b,
+ SPA_TYPE_OBJECT_PropInfo, SPA_PARAM_PropInfo,
+ SPA_PROP_INFO_name, SPA_POD_String("clock.name"),
+ SPA_PROP_INFO_description, SPA_POD_String("The name of the clock"),
+ SPA_PROP_INFO_type, SPA_POD_String(state->clock_name),
+ SPA_PROP_INFO_params, SPA_POD_Bool(true));
+ break;
+ default:
+ return NULL;
+ }
+ return param;
+}
+
+int spa_avb_add_prop_params(struct state *state, struct spa_pod_builder *b)
+{
+ struct props *p = &state->props;
+ struct spa_pod_frame f[1];
+ char buf[1024];
+
+ spa_pod_builder_prop(b, SPA_PROP_params, 0);
+ spa_pod_builder_push_struct(b, &f[0]);
+
+ spa_pod_builder_string(b, SPA_KEY_AUDIO_CHANNELS);
+ spa_pod_builder_int(b, state->default_channels);
+
+ spa_pod_builder_string(b, SPA_KEY_AUDIO_RATE);
+ spa_pod_builder_int(b, state->default_rate);
+
+ spa_pod_builder_string(b, SPA_KEY_AUDIO_FORMAT);
+ spa_pod_builder_string(b,
+ spa_debug_type_find_short_name(spa_type_audio_format,
+ state->default_format));
+
+ position_to_string(&state->default_pos, buf, sizeof(buf));
+ spa_pod_builder_string(b, SPA_KEY_AUDIO_POSITION);
+ spa_pod_builder_string(b, buf);
+
+ uint32_array_to_string(state->allowed_rates, state->n_allowed_rates,
+ buf, sizeof(buf));
+ spa_pod_builder_string(b, SPA_KEY_AUDIO_ALLOWED_RATES);
+ spa_pod_builder_string(b, buf);
+
+ spa_pod_builder_string(b, "avb.ifname");
+ spa_pod_builder_string(b, p->ifname);
+
+ format_addr(buf, sizeof(buf), p->addr);
+ spa_pod_builder_string(b, "avb.macadr");
+ spa_pod_builder_string(b, buf);
+
+ spa_pod_builder_string(b, "avb.prio");
+ spa_pod_builder_int(b, p->prio);
+
+ format_streamid(buf, sizeof(buf), p->streamid);
+ spa_pod_builder_string(b, "avb.streamid");
+ spa_pod_builder_string(b, buf);
+ spa_pod_builder_string(b, "avb.mtt");
+ spa_pod_builder_int(b, p->mtt);
+ spa_pod_builder_string(b, "avb.time-uncertainty");
+ spa_pod_builder_int(b, p->t_uncertainty);
+ spa_pod_builder_string(b, "avb.frames-per-pdu");
+ spa_pod_builder_int(b, p->frames_per_pdu);
+ spa_pod_builder_string(b, "avb.ptime-tolerance");
+ spa_pod_builder_int(b, p->ptime_tolerance);
+
+ spa_pod_builder_string(b, "latency.internal.rate");
+ spa_pod_builder_int(b, state->process_latency.rate);
+
+ spa_pod_builder_string(b, "latency.internal.ns");
+ spa_pod_builder_long(b, state->process_latency.ns);
+
+ spa_pod_builder_string(b, "clock.name");
+ spa_pod_builder_string(b, state->clock_name);
+
+ spa_pod_builder_pop(b, &f[0]);
+ return 0;
+}
+
+int spa_avb_parse_prop_params(struct state *state, struct spa_pod *params)
+{
+ struct spa_pod_parser prs;
+ struct spa_pod_frame f;
+ int changed = 0;
+
+ if (params == NULL)
+ return 0;
+
+ spa_pod_parser_pod(&prs, params);
+ if (spa_pod_parser_push_struct(&prs, &f) < 0)
+ return 0;
+
+ while (true) {
+ const char *name;
+ struct spa_pod *pod;
+ char value[512];
+
+ if (spa_pod_parser_get_string(&prs, &name) < 0)
+ break;
+
+ if (spa_pod_parser_get_pod(&prs, &pod) < 0)
+ break;
+ if (spa_pod_is_string(pod)) {
+ spa_pod_copy_string(pod, sizeof(value), value);
+ } else if (spa_pod_is_int(pod)) {
+ snprintf(value, sizeof(value), "%d",
+ SPA_POD_VALUE(struct spa_pod_int, pod));
+ } else if (spa_pod_is_long(pod)) {
+ snprintf(value, sizeof(value), "%"PRIi64,
+ SPA_POD_VALUE(struct spa_pod_long, pod));
+ } else if (spa_pod_is_bool(pod)) {
+ snprintf(value, sizeof(value), "%s",
+ SPA_POD_VALUE(struct spa_pod_bool, pod) ?
+ "true" : "false");
+ } else
+ continue;
+
+ spa_log_info(state->log, "key:'%s' val:'%s'", name, value);
+ avb_set_param(state, name, value);
+ changed++;
+ }
+ if (changed > 0) {
+ state->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS;
+ state->params[NODE_Props].user++;
+ }
+ return changed;
+}
+
+int spa_avb_init(struct state *state, const struct spa_dict *info)
+{
+ uint32_t i;
+
+ state->quantum_limit = 8192;
+ for (i = 0; info && i < info->n_items; i++) {
+ const char *k = info->items[i].key;
+ const char *s = info->items[i].value;
+ if (spa_streq(k, "clock.quantum-limit")) {
+ spa_atou32(s, &state->quantum_limit, 0);
+ } else {
+ avb_set_param(state, k, s);
+ }
+ }
+
+ state->ringbuffer_size = state->quantum_limit * 64;
+ state->ringbuffer_data = calloc(1, state->ringbuffer_size * 4);
+ spa_ringbuffer_init(&state->ring);
+ return 0;
+}
+
+int spa_avb_clear(struct state *state)
+{
+ return 0;
+}
+
+static int spa_format_to_aaf(uint32_t format)
+{
+ switch(format) {
+ case SPA_AUDIO_FORMAT_F32_BE: return SPA_AVBTP_AAF_FORMAT_FLOAT_32BIT;
+ case SPA_AUDIO_FORMAT_S32_BE: return SPA_AVBTP_AAF_FORMAT_INT_32BIT;
+ case SPA_AUDIO_FORMAT_S24_BE: return SPA_AVBTP_AAF_FORMAT_INT_24BIT;
+ case SPA_AUDIO_FORMAT_S16_BE: return SPA_AVBTP_AAF_FORMAT_INT_16BIT;
+ default: return SPA_AVBTP_AAF_FORMAT_USER;
+ }
+}
+
+static int calc_frame_size(uint32_t format)
+{
+ switch(format) {
+ case SPA_AUDIO_FORMAT_F32_BE:
+ case SPA_AUDIO_FORMAT_S32_BE: return 4;
+ case SPA_AUDIO_FORMAT_S24_BE: return 3;
+ case SPA_AUDIO_FORMAT_S16_BE: return 2;
+ default: return 0;
+ }
+}
+
+static int spa_rate_to_aaf(uint32_t rate)
+{
+ switch(rate) {
+ case 8000: return SPA_AVBTP_AAF_PCM_NSR_8KHZ;
+ case 16000: return SPA_AVBTP_AAF_PCM_NSR_16KHZ;
+ case 24000: return SPA_AVBTP_AAF_PCM_NSR_24KHZ;
+ case 32000: return SPA_AVBTP_AAF_PCM_NSR_32KHZ;
+ case 44100: return SPA_AVBTP_AAF_PCM_NSR_44_1KHZ;
+ case 48000: return SPA_AVBTP_AAF_PCM_NSR_48KHZ;
+ case 88200: return SPA_AVBTP_AAF_PCM_NSR_88_2KHZ;
+ case 96000: return SPA_AVBTP_AAF_PCM_NSR_96KHZ;
+ case 176400: return SPA_AVBTP_AAF_PCM_NSR_176_4KHZ;
+ case 192000: return SPA_AVBTP_AAF_PCM_NSR_192KHZ;
+ default: return SPA_AVBTP_AAF_PCM_NSR_USER;
+ }
+}
+
+int
+spa_avb_enum_format(struct state *state, int seq, uint32_t start, uint32_t num,
+ const struct spa_pod *filter)
+{
+ uint8_t buffer[4096];
+ struct spa_pod_builder b = { 0 };
+ struct spa_pod_frame f[2];
+ struct spa_pod *fmt;
+ int res = 0;
+ struct spa_result_node_params result;
+ uint32_t count = 0;
+
+ result.id = SPA_PARAM_EnumFormat;
+ result.next = start;
+
+next:
+ result.index = result.next++;
+
+ if (result.index > 0)
+ return 0;
+
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
+
+ spa_pod_builder_push_object(&b, &f[0], SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat);
+ spa_pod_builder_add(&b,
+ SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio),
+ SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_raw),
+ 0);
+
+ spa_pod_builder_prop(&b, SPA_FORMAT_AUDIO_format, 0);
+ if (state->default_format != 0) {
+ spa_pod_builder_id(&b, state->default_format);
+ } else {
+ spa_pod_builder_push_choice(&b, &f[1], SPA_CHOICE_Enum, 0);
+ spa_pod_builder_id(&b, SPA_AUDIO_FORMAT_F32_BE);
+ spa_pod_builder_id(&b, SPA_AUDIO_FORMAT_F32_BE);
+ spa_pod_builder_id(&b, SPA_AUDIO_FORMAT_S32_BE);
+ spa_pod_builder_id(&b, SPA_AUDIO_FORMAT_S24_BE);
+ spa_pod_builder_id(&b, SPA_AUDIO_FORMAT_S16_BE);
+ spa_pod_builder_pop(&b, &f[1]);
+ }
+ spa_pod_builder_prop(&b, SPA_FORMAT_AUDIO_rate, 0);
+ if (state->default_rate != 0) {
+ spa_pod_builder_int(&b, state->default_rate);
+ } else {
+ spa_pod_builder_push_choice(&b, &f[1], SPA_CHOICE_Enum, 0);
+ spa_pod_builder_int(&b, 48000);
+ spa_pod_builder_int(&b, 8000);
+ spa_pod_builder_int(&b, 16000);
+ spa_pod_builder_int(&b, 24000);
+ spa_pod_builder_int(&b, 32000);
+ spa_pod_builder_int(&b, 44100);
+ spa_pod_builder_int(&b, 48000);
+ spa_pod_builder_int(&b, 88200);
+ spa_pod_builder_int(&b, 96000);
+ spa_pod_builder_int(&b, 176400);
+ spa_pod_builder_int(&b, 192000);
+ spa_pod_builder_pop(&b, &f[1]);
+ }
+ spa_pod_builder_prop(&b, SPA_FORMAT_AUDIO_channels, 0);
+ if (state->default_channels != 0) {
+ spa_pod_builder_int(&b, state->default_channels);
+ } else {
+ spa_pod_builder_push_choice(&b, &f[1], SPA_CHOICE_Range, 0);
+ spa_pod_builder_int(&b, 8);
+ spa_pod_builder_int(&b, 2);
+ spa_pod_builder_int(&b, 32);
+ spa_pod_builder_pop(&b, &f[1]);
+ }
+ fmt = spa_pod_builder_pop(&b, &f[0]);
+
+ if (spa_pod_filter(&b, &result.param, fmt, filter) < 0)
+ goto next;
+
+ spa_node_emit_result(&state->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
+
+ if (++count != num)
+ goto next;
+
+ return res;
+}
+
+static int setup_socket(struct state *state)
+{
+ int fd, res;
+ struct ifreq req;
+ struct props *p = &state->props;
+
+ fd = socket(AF_PACKET, SOCK_DGRAM|SOCK_NONBLOCK, htons(ETH_P_TSN));
+ if (fd < 0) {
+ spa_log_error(state->log, "socket() failed: %m");
+ return -errno;
+ }
+
+ snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", p->ifname);
+ res = ioctl(fd, SIOCGIFINDEX, &req);
+ if (res < 0) {
+ spa_log_error(state->log, "SIOCGIFINDEX %s failed: %m", p->ifname);
+ res = -errno;
+ goto error_close;
+ }
+
+ state->sock_addr.sll_family = AF_PACKET;
+ state->sock_addr.sll_protocol = htons(ETH_P_TSN);
+ state->sock_addr.sll_halen = ETH_ALEN;
+ state->sock_addr.sll_ifindex = req.ifr_ifindex;
+ memcpy(&state->sock_addr.sll_addr, p->addr, ETH_ALEN);
+
+ if (state->ports[0].direction == SPA_DIRECTION_INPUT) {
+ struct sock_txtime txtime_cfg;
+
+ res = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &p->prio,
+ sizeof(p->prio));
+ if (res < 0) {
+ spa_log_error(state->log, "setsockopt(SO_PRIORITY %d) failed: %m", p->prio);
+ res = -errno;
+ goto error_close;
+ }
+
+ txtime_cfg.clockid = CLOCK_TAI;
+ txtime_cfg.flags = 0;
+ res = setsockopt(fd, SOL_SOCKET, SO_TXTIME, &txtime_cfg,
+ sizeof(txtime_cfg));
+ if (res < 0) {
+ spa_log_error(state->log, "setsockopt(SO_TXTIME) failed: %m");
+ res = -errno;
+ goto error_close;
+ }
+ } else {
+ struct packet_mreq mreq = { 0 };
+
+ res = bind(fd, (struct sockaddr *) &state->sock_addr,
+ sizeof(state->sock_addr));
+ if (res < 0) {
+ spa_log_error(state->log, "bind() failed: %m");
+ res = -errno;
+ goto error_close;
+ }
+
+ mreq.mr_ifindex = req.ifr_ifindex;
+ mreq.mr_type = PACKET_MR_MULTICAST;
+ mreq.mr_alen = ETH_ALEN;
+ memcpy(&mreq.mr_address, p->addr, ETH_ALEN);
+ res = setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
+ &mreq, sizeof(struct packet_mreq));
+ if (res < 0) {
+ spa_log_error(state->log, "setsockopt(ADD_MEMBERSHIP) failed: %m");
+ res = -errno;
+ goto error_close;
+ }
+ }
+ state->sockfd = fd;
+ return 0;
+
+error_close:
+ close(fd);
+ return res;
+}
+
+static int setup_packet(struct state *state, struct spa_audio_info *fmt)
+{
+ struct spa_avbtp_packet_aaf *pdu;
+ struct props *p = &state->props;
+ ssize_t payload_size, hdr_size, pdu_size;
+
+ hdr_size = sizeof(*pdu);
+ payload_size = state->stride * p->frames_per_pdu;
+ pdu_size = hdr_size + payload_size;
+ if ((pdu = calloc(1, pdu_size)) == NULL)
+ return -errno;
+
+ SPA_AVBTP_PACKET_AAF_SET_SUBTYPE(pdu, SPA_AVBTP_SUBTYPE_AAF);
+
+ if (state->ports[0].direction == SPA_DIRECTION_INPUT) {
+ SPA_AVBTP_PACKET_AAF_SET_SV(pdu, 1);
+ SPA_AVBTP_PACKET_AAF_SET_STREAM_ID(pdu, p->streamid);
+ SPA_AVBTP_PACKET_AAF_SET_TV(pdu, 1);
+ SPA_AVBTP_PACKET_AAF_SET_FORMAT(pdu, spa_format_to_aaf(state->format));
+ SPA_AVBTP_PACKET_AAF_SET_NSR(pdu, spa_rate_to_aaf(state->rate));
+ SPA_AVBTP_PACKET_AAF_SET_CHAN_PER_FRAME(pdu, state->channels);
+ SPA_AVBTP_PACKET_AAF_SET_BIT_DEPTH(pdu, calc_frame_size(state->format)*8);
+ SPA_AVBTP_PACKET_AAF_SET_DATA_LEN(pdu, payload_size);
+ SPA_AVBTP_PACKET_AAF_SET_SP(pdu, SPA_AVBTP_AAF_PCM_SP_NORMAL);
+ }
+ state->pdu = pdu;
+ state->hdr_size = hdr_size;
+ state->payload_size = payload_size;
+ state->pdu_size = pdu_size;
+ return 0;
+}
+
+static int setup_msg(struct state *state)
+{
+ state->iov[0].iov_base = state->pdu;
+ state->iov[0].iov_len = state->hdr_size;
+ state->iov[1].iov_base = state->pdu->payload;
+ state->iov[1].iov_len = state->payload_size;
+ state->iov[2].iov_base = state->pdu->payload;
+ state->iov[2].iov_len = 0;
+ state->msg.msg_name = &state->sock_addr;
+ state->msg.msg_namelen = sizeof(state->sock_addr);
+ state->msg.msg_iov = state->iov;
+ state->msg.msg_iovlen = 3;
+ state->msg.msg_control = state->control;
+ state->msg.msg_controllen = sizeof(state->control);
+ state->cmsg = CMSG_FIRSTHDR(&state->msg);
+ state->cmsg->cmsg_level = SOL_SOCKET;
+ state->cmsg->cmsg_type = SCM_TXTIME;
+ state->cmsg->cmsg_len = CMSG_LEN(sizeof(__u64));
+ return 0;
+}
+
+int spa_avb_clear_format(struct state *state)
+{
+ close(state->sockfd);
+ close(state->timerfd);
+ free(state->pdu);
+
+ return 0;
+}
+
+int spa_avb_set_format(struct state *state, struct spa_audio_info *fmt, uint32_t flags)
+{
+ int res, frame_size;
+ struct props *p = &state->props;
+
+ frame_size = calc_frame_size(fmt->info.raw.format);
+ if (frame_size == 0)
+ return -EINVAL;
+
+ if (fmt->info.raw.rate == 0 ||
+ fmt->info.raw.channels == 0)
+ return -EINVAL;
+
+ state->format = fmt->info.raw.format;
+ state->rate = fmt->info.raw.rate;
+ state->channels = fmt->info.raw.channels;
+ state->blocks = 1;
+ state->stride = state->channels * frame_size;
+
+ if ((res = setup_socket(state)) < 0)
+ return res;
+
+ if ((res = spa_system_timerfd_create(state->data_system,
+ CLOCK_REALTIME, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
+ goto error_close_sockfd;
+
+ state->timerfd = res;
+
+ if ((res = setup_packet(state, fmt)) < 0)
+ return res;
+
+ if ((res = setup_msg(state)) < 0)
+ return res;
+
+ state->pdu_period = SPA_NSEC_PER_SEC * p->frames_per_pdu /
+ state->rate;
+
+ return 0;
+
+error_close_sockfd:
+ close(state->sockfd);
+ return res;
+}
+
+void spa_avb_recycle_buffer(struct state *this, struct port *port, uint32_t buffer_id)
+{
+ struct buffer *b = &port->buffers[buffer_id];
+
+ if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_OUT)) {
+ spa_log_trace_fp(this->log, "%p: recycle buffer %u", this, buffer_id);
+ spa_list_append(&port->free, &b->link);
+ SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT);
+ }
+}
+
+static void reset_buffers(struct state *this, struct port *port)
+{
+ uint32_t i;
+
+ spa_list_init(&port->free);
+ spa_list_init(&port->ready);
+
+ for (i = 0; i < port->n_buffers; i++) {
+ struct buffer *b = &port->buffers[i];
+ if (port->direction == SPA_DIRECTION_INPUT) {
+ SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
+ spa_node_call_reuse_buffer(&this->callbacks, 0, b->id);
+ } else {
+ spa_list_append(&port->free, &b->link);
+ SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_OUT);
+ }
+ }
+}
+
+static inline bool is_pdu_valid(struct state *state)
+{
+ uint8_t seq_num;
+
+ seq_num = SPA_AVBTP_PACKET_AAF_GET_SEQ_NUM(state->pdu);
+
+ if (state->prev_seq != 0 && (uint8_t)(state->prev_seq + 1) != seq_num) {
+ spa_log_warn(state->log, "dropped packets %d != %d", state->prev_seq + 1, seq_num);
+ }
+ state->prev_seq = seq_num;
+ return true;
+}
+
+static inline void
+set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size,
+ uint32_t offset, struct iovec *iov, uint32_t len)
+{
+ iov[0].iov_len = SPA_MIN(len, size - offset);
+ iov[0].iov_base = SPA_PTROFF(buffer, offset, void);
+ iov[1].iov_len = len - iov[0].iov_len;
+ iov[1].iov_base = buffer;
+}
+
+static void avb_on_socket_event(struct spa_source *source)
+{
+ struct state *state = source->data;
+ ssize_t n;
+ int32_t filled;
+ uint32_t subtype, index;
+ struct spa_avbtp_packet_aaf *pdu = state->pdu;
+ bool overrun = false;
+
+ filled = spa_ringbuffer_get_write_index(&state->ring, &index);
+ overrun = filled > (int32_t) state->ringbuffer_size;
+ if (overrun) {
+ state->iov[1].iov_base = state->pdu->payload;
+ state->iov[1].iov_len = state->payload_size;
+ state->iov[2].iov_len = 0;
+ } else {
+ set_iovec(&state->ring,
+ state->ringbuffer_data,
+ state->ringbuffer_size,
+ index % state->ringbuffer_size,
+ &state->iov[1], state->payload_size);
+ }
+
+ n = recvmsg(state->sockfd, &state->msg, 0);
+ if (n < 0) {
+ spa_log_error(state->log, "recv() failed: %m");
+ return;
+ }
+ if (n != (ssize_t)state->pdu_size) {
+ spa_log_error(state->log, "AVB packet dropped: Invalid size");
+ return;
+ }
+
+ subtype = SPA_AVBTP_PACKET_AAF_GET_SUBTYPE(pdu);
+ if (subtype != SPA_AVBTP_SUBTYPE_AAF) {
+ spa_log_error(state->log, "non supported subtype %d", subtype);
+ return;
+ }
+ if (!is_pdu_valid(state)) {
+ spa_log_error(state->log, "AAF PDU invalid");
+ return;
+ }
+ if (overrun) {
+ spa_log_warn(state->log, "overrun %d", filled);
+ return;
+ }
+ index += state->payload_size;
+ spa_ringbuffer_write_update(&state->ring, index);
+}
+
+static void set_timeout(struct state *state, uint64_t next_time)
+{
+ struct itimerspec ts;
+ uint64_t time_utc;
+
+ spa_log_trace(state->log, "set timeout %"PRIu64, next_time);
+
+ time_utc = next_time > TAI_OFFSET ? TAI_TO_UTC(next_time) : 0;
+ ts.it_value.tv_sec = time_utc / SPA_NSEC_PER_SEC;
+ ts.it_value.tv_nsec = time_utc % SPA_NSEC_PER_SEC;
+ ts.it_interval.tv_sec = 0;
+ ts.it_interval.tv_nsec = 0;
+ spa_system_timerfd_settime(state->data_system,
+ state->timer_source.fd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
+}
+
+static int flush_write(struct state *state, uint64_t current_time)
+{
+ int32_t avail, wanted;
+ uint32_t index;
+ uint64_t ptime, txtime;
+ int pdu_count;
+ struct props *p = &state->props;
+ struct spa_avbtp_packet_aaf *pdu = state->pdu;
+ ssize_t n;
+
+ avail = spa_ringbuffer_get_read_index(&state->ring, &index);
+ wanted = state->duration * state->stride;
+ if (avail < wanted) {
+ spa_log_warn(state->log, "underrun %d < %d", avail, wanted);
+ return -EPIPE;
+ }
+
+ pdu_count = state->duration / p->frames_per_pdu;
+
+ txtime = current_time + p->t_uncertainty;
+ ptime = txtime + p->mtt;
+
+ while (pdu_count--) {
+ *(__u64 *)CMSG_DATA(state->cmsg) = txtime;
+
+ set_iovec(&state->ring,
+ state->ringbuffer_data,
+ state->ringbuffer_size,
+ index % state->ringbuffer_size,
+ &state->iov[1], state->payload_size);
+
+ SPA_AVBTP_PACKET_AAF_SET_SEQ_NUM(pdu, state->pdu_seq++);
+ SPA_AVBTP_PACKET_AAF_SET_TIMESTAMP(pdu, ptime);
+
+ n = sendmsg(state->sockfd, &state->msg, MSG_NOSIGNAL);
+ if (n < 0 || n != (ssize_t)state->pdu_size) {
+ spa_log_error(state->log, "sendmdg() failed: %m");
+ }
+ txtime += state->pdu_period;
+ ptime += state->pdu_period;
+ index += state->payload_size;
+ }
+ spa_ringbuffer_read_update(&state->ring, index);
+ return 0;
+}
+
+int spa_avb_write(struct state *state)
+{
+ int32_t filled;
+ uint32_t index, to_write;
+ struct port *port = &state->ports[0];
+
+ filled = spa_ringbuffer_get_write_index(&state->ring, &index);
+ if (filled < 0) {
+ spa_log_warn(state->log, "underrun %d", filled);
+ } else if (filled > (int32_t)state->ringbuffer_size) {
+ spa_log_warn(state->log, "overrun %d", filled);
+ }
+ to_write = state->ringbuffer_size - filled;
+
+ while (!spa_list_is_empty(&port->ready) && to_write > 0) {
+ size_t n_bytes;
+ struct buffer *b;
+ struct spa_data *d;
+ uint32_t offs, avail, size;
+
+ b = spa_list_first(&port->ready, struct buffer, link);
+ d = b->buf->datas;
+
+ offs = SPA_MIN(d[0].chunk->offset + port->ready_offset, d[0].maxsize);
+ size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs);
+ avail = size - offs;
+
+ n_bytes = SPA_MIN(avail, to_write);
+ if (n_bytes == 0)
+ break;
+
+ spa_ringbuffer_write_data(&state->ring,
+ state->ringbuffer_data,
+ state->ringbuffer_size,
+ index % state->ringbuffer_size,
+ SPA_PTROFF(d[0].data, offs, void),
+ n_bytes);
+
+ port->ready_offset += n_bytes;
+
+ if (port->ready_offset >= size || avail == 0) {
+ spa_list_remove(&b->link);
+ SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
+ port->io->buffer_id = b->id;
+ spa_log_trace_fp(state->log, "%p: reuse buffer %u", state, b->id);
+
+ spa_node_call_reuse_buffer(&state->callbacks, 0, b->id);
+
+ port->ready_offset = 0;
+ }
+ to_write -= n_bytes;
+ index += n_bytes;
+ }
+ spa_ringbuffer_write_update(&state->ring, index);
+
+ if (state->following) {
+ flush_write(state, state->position->clock.nsec);
+ }
+ return 0;
+}
+
+static int handle_play(struct state *state, uint64_t current_time)
+{
+ flush_write(state, current_time);
+ spa_node_call_ready(&state->callbacks, SPA_STATUS_NEED_DATA);
+ return 0;
+}
+
+int spa_avb_read(struct state *state)
+{
+ int32_t avail, wanted;
+ uint32_t index;
+ struct port *port = &state->ports[0];
+ struct buffer *b;
+ struct spa_data *d;
+ uint32_t n_bytes;
+
+ if (state->position)
+ state->duration = state->position->clock.duration;
+
+ avail = spa_ringbuffer_get_read_index(&state->ring, &index);
+ wanted = state->duration * state->stride;
+
+ if (spa_list_is_empty(&port->free)) {
+ spa_log_warn(state->log, "out of buffers");
+ return -EPIPE;
+ }
+
+ b = spa_list_first(&port->free, struct buffer, link);
+ d = b->buf->datas;
+
+ n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted);
+
+ if (avail < wanted) {
+ spa_log_warn(state->log, "capture underrun %d < %d", avail, wanted);
+ memset(d[0].data, 0, n_bytes);
+ } else {
+ spa_ringbuffer_read_data(&state->ring,
+ state->ringbuffer_data,
+ state->ringbuffer_size,
+ index % state->ringbuffer_size,
+ d[0].data, n_bytes);
+ index += n_bytes;
+ spa_ringbuffer_read_update(&state->ring, index);
+ }
+
+ d[0].chunk->offset = 0;
+ d[0].chunk->size = n_bytes;
+ d[0].chunk->stride = state->stride;
+ d[0].chunk->flags = 0;
+
+ spa_list_remove(&b->link);
+ spa_list_append(&port->ready, &b->link);
+
+ return 0;
+}
+
+static int handle_capture(struct state *state, uint64_t current_time)
+{
+ struct port *port = &state->ports[0];
+ struct spa_io_buffers *io;
+ struct buffer *b;
+
+ spa_avb_read(state);
+
+ if (spa_list_is_empty(&port->ready))
+ return 0;
+
+ io = port->io;
+ if (io != NULL &&
+ (io->status != SPA_STATUS_HAVE_DATA || port->rate_match != NULL)) {
+ if (io->buffer_id < port->n_buffers)
+ spa_avb_recycle_buffer(state, port, io->buffer_id);
+
+ b = spa_list_first(&port->ready, struct buffer, link);
+ spa_list_remove(&b->link);
+ SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
+
+ io->buffer_id = b->id;
+ io->status = SPA_STATUS_HAVE_DATA;
+ spa_log_trace_fp(state->log, "%p: output buffer:%d", state, b->id);
+ }
+ spa_node_call_ready(&state->callbacks, SPA_STATUS_HAVE_DATA);
+ return 0;
+}
+
+static void avb_on_timeout_event(struct spa_source *source)
+{
+ struct state *state = source->data;
+ uint64_t expirations, current_time, duration;
+ uint32_t rate;
+ int res;
+
+ spa_log_trace(state->log, "timeout");
+
+ if ((res = spa_system_timerfd_read(state->data_system,
+ state->timer_source.fd, &expirations)) < 0) {
+ if (res != -EAGAIN)
+ spa_log_error(state->log, "read timerfd: %s", spa_strerror(res));
+ return;
+ }
+
+ current_time = state->next_time;
+ if (SPA_LIKELY(state->position)) {
+ duration = state->position->clock.duration;
+ rate = state->position->clock.rate.denom;
+ } else {
+ duration = 1024;
+ rate = 48000;
+ }
+ state->duration = duration;
+
+ if (state->ports[0].direction == SPA_DIRECTION_INPUT)
+ handle_play(state, current_time);
+ else
+ handle_capture(state, current_time);
+
+ state->next_time = current_time + duration * SPA_NSEC_PER_SEC / rate;
+
+ if (SPA_LIKELY(state->clock)) {
+ state->clock->nsec = current_time;
+ state->clock->position += duration;
+ state->clock->duration = duration;
+ state->clock->delay = 0;
+ state->clock->rate_diff = 1.0;
+ state->clock->next_nsec = state->next_time;
+ }
+
+ set_timeout(state, state->next_time);
+}
+
+static int set_timers(struct state *state)
+{
+ struct timespec now;
+ int res;
+
+ if ((res = spa_system_clock_gettime(state->data_system, CLOCK_TAI, &now)) < 0)
+ return res;
+
+ state->next_time = SPA_TIMESPEC_TO_NSEC(&now);
+
+ if (state->following) {
+ set_timeout(state, 0);
+ } else {
+ set_timeout(state, state->next_time);
+ }
+ return 0;
+}
+
+static inline bool is_following(struct state *state)
+{
+ return state->position && state->clock && state->position->clock.id != state->clock->id;
+}
+
+static int do_reassign_follower(struct spa_loop *loop,
+ bool async,
+ uint32_t seq,
+ const void *data,
+ size_t size,
+ void *user_data)
+{
+ struct state *state = user_data;
+ spa_dll_init(&state->dll);
+ set_timers(state);
+ return 0;
+}
+
+int spa_avb_reassign_follower(struct state *state)
+{
+ bool following, freewheel;
+
+ if (!state->started)
+ return 0;
+
+ following = is_following(state);
+ if (following != state->following) {
+ spa_log_debug(state->log, "%p: reassign follower %d->%d", state, state->following, following);
+ state->following = following;
+ spa_loop_invoke(state->data_loop, do_reassign_follower, 0, NULL, 0, true, state);
+ }
+
+ freewheel = state->position &&
+ SPA_FLAG_IS_SET(state->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL);
+
+ if (state->freewheel != freewheel) {
+ spa_log_debug(state->log, "%p: freewheel %d->%d", state, state->freewheel, freewheel);
+ state->freewheel = freewheel;
+ }
+ return 0;
+}
+
+int spa_avb_start(struct state *state)
+{
+ if (state->started)
+ return 0;
+
+ if (state->position) {
+ state->duration = state->position->clock.duration;
+ state->rate_denom = state->position->clock.rate.denom;
+ } else {
+ state->duration = 1024;
+ state->rate_denom = state->rate;
+ }
+
+ spa_dll_init(&state->dll);
+ state->max_error = (256.0 * state->rate) / state->rate_denom;
+
+ state->following = is_following(state);
+
+ state->timer_source.func = avb_on_timeout_event;
+ state->timer_source.data = state;
+ state->timer_source.fd = state->timerfd;
+ state->timer_source.mask = SPA_IO_IN;
+ state->timer_source.rmask = 0;
+ spa_loop_add_source(state->data_loop, &state->timer_source);
+
+ state->pdu_seq = 0;
+
+ if (state->ports[0].direction == SPA_DIRECTION_OUTPUT) {
+ state->sock_source.func = avb_on_socket_event;
+ state->sock_source.data = state;
+ state->sock_source.fd = state->sockfd;
+ state->sock_source.mask = SPA_IO_IN;
+ state->sock_source.rmask = 0;
+ spa_loop_add_source(state->data_loop, &state->sock_source);
+ }
+
+ reset_buffers(state, &state->ports[0]);
+
+ set_timers(state);
+
+ state->started = true;
+
+ return 0;
+}
+
+static int do_remove_source(struct spa_loop *loop,
+ bool async,
+ uint32_t seq,
+ const void *data,
+ size_t size,
+ void *user_data)
+{
+ struct state *state = user_data;
+
+ spa_loop_remove_source(state->data_loop, &state->timer_source);
+
+ if (state->ports[0].direction == SPA_DIRECTION_OUTPUT) {
+ spa_loop_remove_source(state->data_loop, &state->sock_source);
+ }
+ return 0;
+}
+
+int spa_avb_pause(struct state *state)
+{
+ if (!state->started)
+ return 0;
+
+ spa_log_debug(state->log, "%p: pause", state);
+
+ spa_loop_invoke(state->data_loop, do_remove_source, 0, NULL, 0, true, state);
+
+ state->started = false;
+ set_timeout(state, 0);
+
+ return 0;
+}
diff --git a/spa/plugins/avb/avb-pcm.h b/spa/plugins/avb/avb-pcm.h
new file mode 100644
index 0000000..bb3bce6
--- /dev/null
+++ b/spa/plugins/avb/avb-pcm.h
@@ -0,0 +1,343 @@
+/* Spa AVB PCM
+ *
+ * Copyright © 2022 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef SPA_AVB_PCM_H
+#define SPA_AVB_PCM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+#include <math.h>
+#include <linux/if_ether.h>
+#include <linux/if_packet.h>
+#include <linux/net_tstamp.h>
+#include <limits.h>
+#include <net/if.h>
+
+#include <avbtp/packets.h>
+
+#include <spa/support/plugin.h>
+#include <spa/support/loop.h>
+#include <spa/utils/list.h>
+#include <spa/utils/json.h>
+#include <spa/utils/dll.h>
+
+#include <spa/node/node.h>
+#include <spa/node/utils.h>
+#include <spa/node/io.h>
+#include <spa/debug/types.h>
+#include <spa/utils/ringbuffer.h>
+#include <spa/param/param.h>
+#include <spa/param/latency-utils.h>
+#include <spa/param/audio/format-utils.h>
+
+#include "avb.h"
+
+#define MAX_RATES 16
+
+#define DEFAULT_IFNAME "eth0"
+#define DEFAULT_ADDR "01:AA:AA:AA:AA:AA"
+#define DEFAULT_PRIO 0
+#define DEFAULT_STREAMID "AA:BB:CC:DD:EE:FF:0000"
+#define DEFAULT_MTT 5000000
+#define DEFAULT_TU 1000000
+#define DEFAULT_FRAMES_PER_PDU 8
+
+#define DEFAULT_PERIOD 1024u
+#define DEFAULT_RATE 48000u
+#define DEFAULT_CHANNELS 8u
+
+struct props {
+ char ifname[IFNAMSIZ];
+ unsigned char addr[ETH_ALEN];
+ int prio;
+ uint64_t streamid;
+ int mtt;
+ int t_uncertainty;
+ uint32_t frames_per_pdu;
+ int ptime_tolerance;
+};
+
+static inline int parse_addr(unsigned char addr[ETH_ALEN], const char *str)
+{
+ unsigned char ad[ETH_ALEN];
+ if (sscanf(str, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx",
+ &ad[0], &ad[1], &ad[2], &ad[3], &ad[4], &ad[5]) != 6)
+ return -EINVAL;
+ memcpy(addr, ad, sizeof(ad));
+ return 0;
+}
+static inline char *format_addr(char *str, size_t size, const unsigned char addr[ETH_ALEN])
+{
+ snprintf(str, size, "%02x:%02x:%02x:%02x:%02x:%02x",
+ addr[0], addr[1], addr[2],
+ addr[3], addr[4], addr[5]);
+ return str;
+}
+
+static inline int parse_streamid(uint64_t *streamid, const char *str)
+{
+ unsigned char addr[6];
+ unsigned short unique_id;
+ if (sscanf(str, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hx",
+ &addr[0], &addr[1], &addr[2], &addr[3],
+ &addr[4], &addr[5], &unique_id) != 7)
+ return -EINVAL;
+ *streamid = (uint64_t) addr[0] << 56 |
+ (uint64_t) addr[1] << 48 |
+ (uint64_t) addr[2] << 40 |
+ (uint64_t) addr[3] << 32 |
+ (uint64_t) addr[4] << 24 |
+ (uint64_t) addr[5] << 16 |
+ unique_id;
+ return 0;
+}
+static inline char *format_streamid(char *str, size_t size, const uint64_t streamid)
+{
+ snprintf(str, size, "%02x:%02x:%02x:%02x:%02x:%02x:%04x",
+ (uint8_t)(streamid >> 56),
+ (uint8_t)(streamid >> 48),
+ (uint8_t)(streamid >> 40),
+ (uint8_t)(streamid >> 32),
+ (uint8_t)(streamid >> 24),
+ (uint8_t)(streamid >> 16),
+ (uint16_t)(streamid));
+ return str;
+}
+
+#define MAX_BUFFERS 32
+
+struct buffer {
+ uint32_t id;
+#define BUFFER_FLAG_OUT (1<<0)
+ uint32_t flags;
+ struct spa_buffer *buf;
+ struct spa_meta_header *h;
+ struct spa_list link;
+};
+
+#define BW_MAX 0.128
+#define BW_MED 0.064
+#define BW_MIN 0.016
+#define BW_PERIOD (3 * SPA_NSEC_PER_SEC)
+
+struct channel_map {
+ uint32_t channels;
+ uint32_t pos[SPA_AUDIO_MAX_CHANNELS];
+};
+
+struct port {
+ enum spa_direction direction;
+ uint32_t id;
+
+ uint64_t info_all;
+ struct spa_port_info info;
+#define PORT_EnumFormat 0
+#define PORT_Meta 1
+#define PORT_IO 2
+#define PORT_Format 3
+#define PORT_Buffers 4
+#define PORT_Latency 5
+#define N_PORT_PARAMS 6
+ struct spa_param_info params[N_PORT_PARAMS];
+
+ bool have_format;
+ struct spa_audio_info current_format;
+
+ struct spa_io_buffers *io;
+ struct spa_io_rate_match *rate_match;
+ struct buffer buffers[MAX_BUFFERS];
+ unsigned int n_buffers;
+
+ struct spa_list free;
+ struct spa_list ready;
+ uint32_t ready_offset;
+};
+
+struct state {
+ struct spa_handle handle;
+ struct spa_node node;
+
+ struct spa_log *log;
+ struct spa_system *data_system;
+ struct spa_loop *data_loop;
+
+ struct spa_hook_list hooks;
+ struct spa_callbacks callbacks;
+
+ uint64_t info_all;
+ struct spa_node_info info;
+#define NODE_PropInfo 0
+#define NODE_Props 1
+#define NODE_IO 2
+#define NODE_ProcessLatency 3
+#define N_NODE_PARAMS 4
+ struct spa_param_info params[N_NODE_PARAMS];
+ struct props props;
+
+ uint32_t default_period_size;
+ uint32_t default_format;
+ unsigned int default_channels;
+ unsigned int default_rate;
+ uint32_t allowed_rates[MAX_RATES];
+ uint32_t n_allowed_rates;
+ struct channel_map default_pos;
+ char clock_name[64];
+ uint32_t quantum_limit;
+
+ uint32_t format;
+ uint32_t rate;
+ uint32_t channels;
+ uint32_t stride;
+ uint32_t blocks;
+ uint32_t rate_denom;
+
+ struct spa_io_clock *clock;
+ struct spa_io_position *position;
+
+ struct port ports[1];
+
+ uint32_t duration;
+ unsigned int following:1;
+ unsigned int matching:1;
+ unsigned int resample:1;
+ unsigned int started:1;
+ unsigned int freewheel:1;
+
+ int timerfd;
+ struct spa_source timer_source;
+ uint64_t next_time;
+
+ int sockfd;
+ struct spa_source sock_source;
+ struct sockaddr_ll sock_addr;
+
+ struct spa_avbtp_packet_aaf *pdu;
+ size_t hdr_size;
+ size_t payload_size;
+ size_t pdu_size;
+ int64_t pdu_period;
+ uint8_t pdu_seq;
+ uint8_t prev_seq;
+
+ struct iovec iov[3];
+ struct msghdr msg;
+ char control[CMSG_SPACE(sizeof(__u64))];
+ struct cmsghdr *cmsg;
+
+ uint8_t *ringbuffer_data;
+ uint32_t ringbuffer_size;
+ struct spa_ringbuffer ring;
+
+ struct spa_dll dll;
+ double max_error;
+
+ struct spa_latency_info latency[2];
+ struct spa_process_latency_info process_latency;
+};
+
+struct spa_pod *spa_avb_enum_propinfo(struct state *state,
+ uint32_t idx, struct spa_pod_builder *b);
+int spa_avb_add_prop_params(struct state *state, struct spa_pod_builder *b);
+int spa_avb_parse_prop_params(struct state *state, struct spa_pod *params);
+
+int spa_avb_enum_format(struct state *state, int seq,
+ uint32_t start, uint32_t num,
+ const struct spa_pod *filter);
+
+int spa_avb_clear_format(struct state *state);
+int spa_avb_set_format(struct state *state, struct spa_audio_info *info, uint32_t flags);
+
+int spa_avb_init(struct state *state, const struct spa_dict *info);
+int spa_avb_clear(struct state *state);
+
+int spa_avb_start(struct state *state);
+int spa_avb_reassign_follower(struct state *state);
+int spa_avb_pause(struct state *state);
+
+int spa_avb_write(struct state *state);
+int spa_avb_read(struct state *state);
+int spa_avb_skip(struct state *state);
+
+void spa_avb_recycle_buffer(struct state *state, struct port *port, uint32_t buffer_id);
+
+static inline uint32_t spa_avb_format_from_name(const char *name, size_t len)
+{
+ int i;
+ for (i = 0; spa_type_audio_format[i].name; i++) {
+ if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0)
+ return spa_type_audio_format[i].type;
+ }
+ return SPA_AUDIO_FORMAT_UNKNOWN;
+}
+
+static inline uint32_t spa_avb_channel_from_name(const char *name)
+{
+ int i;
+ for (i = 0; spa_type_audio_channel[i].name; i++) {
+ if (strcmp(name, spa_debug_type_short_name(spa_type_audio_channel[i].name)) == 0)
+ return spa_type_audio_channel[i].type;
+ }
+ return SPA_AUDIO_CHANNEL_UNKNOWN;
+}
+
+static inline void spa_avb_parse_position(struct channel_map *map, const char *val, size_t len)
+{
+ struct spa_json it[2];
+ char v[256];
+
+ spa_json_init(&it[0], val, len);
+ if (spa_json_enter_array(&it[0], &it[1]) <= 0)
+ spa_json_init(&it[1], val, len);
+
+ map->channels = 0;
+ while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 &&
+ map->channels < SPA_AUDIO_MAX_CHANNELS) {
+ map->pos[map->channels++] = spa_avb_channel_from_name(v);
+ }
+}
+
+static inline uint32_t spa_avb_parse_rates(uint32_t *rates, uint32_t max, const char *val, size_t len)
+{
+ struct spa_json it[2];
+ char v[256];
+ uint32_t count;
+
+ spa_json_init(&it[0], val, len);
+ if (spa_json_enter_array(&it[0], &it[1]) <= 0)
+ spa_json_init(&it[1], val, len);
+
+ count = 0;
+ while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && count < max)
+ rates[count++] = atoi(v);
+ return count;
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif /* SPA_AVB_PCM_H */
diff --git a/spa/plugins/avb/avb.c b/spa/plugins/avb/avb.c
new file mode 100644
index 0000000..8f67310
--- /dev/null
+++ b/spa/plugins/avb/avb.c
@@ -0,0 +1,54 @@
+/* Spa AVB support
+ *
+ * Copyright © 2022 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include <errno.h>
+
+#include <spa/support/plugin.h>
+#include <spa/support/log.h>
+
+extern const struct spa_handle_factory spa_avb_sink_factory;
+extern const struct spa_handle_factory spa_avb_source_factory;
+
+struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.avb");
+struct spa_log_topic *avb_log_topic = &log_topic;
+
+SPA_EXPORT
+int spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t *index)
+{
+ spa_return_val_if_fail(factory != NULL, -EINVAL);
+ spa_return_val_if_fail(index != NULL, -EINVAL);
+
+ switch (*index) {
+ case 0:
+ *factory = &spa_avb_sink_factory;
+ break;
+ case 1:
+ *factory = &spa_avb_source_factory;
+ break;
+ default:
+ return 0;
+ }
+ (*index)++;
+ return 1;
+}
diff --git a/spa/plugins/avb/avb.h b/spa/plugins/avb/avb.h
new file mode 100644
index 0000000..a99a0fe
--- /dev/null
+++ b/spa/plugins/avb/avb.h
@@ -0,0 +1,39 @@
+/* Spa AVB
+ *
+ * Copyright © 2022 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef SPA_AVB_H
+#define SPA_AVB_H
+
+#include <spa/support/log.h>
+
+#undef SPA_LOG_TOPIC_DEFAULT
+#define SPA_LOG_TOPIC_DEFAULT avb_log_topic
+extern struct spa_log_topic *avb_log_topic;
+
+static inline void avb_log_topic_init(struct spa_log *log)
+{
+ spa_log_topic_init(log, avb_log_topic);
+}
+
+#endif /* SPA_AVB_H */
diff --git a/spa/plugins/avb/avbtp/packets.h b/spa/plugins/avb/avbtp/packets.h
new file mode 100644
index 0000000..3d4a652
--- /dev/null
+++ b/spa/plugins/avb/avbtp/packets.h
@@ -0,0 +1,220 @@
+/* Spa AVB support
+ *
+ * Copyright © 2022 Wim Taymans
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice (including the next
+ * paragraph) shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef SPA_AVB_PACKETS_H
+#define SPA_AVB_PACKETS_H
+
+#define SPA_AVBTP_SUBTYPE_61883_IIDC 0x00
+#define SPA_AVBTP_SUBTYPE_MMA_STREAM 0x01
+#define SPA_AVBTP_SUBTYPE_AAF 0x02
+#define SPA_AVBTP_SUBTYPE_CVF 0x03
+#define SPA_AVBTP_SUBTYPE_CRF 0x04
+#define SPA_AVBTP_SUBTYPE_TSCF 0x05
+#define SPA_AVBTP_SUBTYPE_SVF 0x06
+#define SPA_AVBTP_SUBTYPE_RVF 0x07
+#define SPA_AVBTP_SUBTYPE_AEF_CONTINUOUS 0x6E
+#define SPA_AVBTP_SUBTYPE_VSF_STREAM 0x6F
+#define SPA_AVBTP_SUBTYPE_EF_STREAM 0x7F
+#define SPA_AVBTP_SUBTYPE_NTSCF 0x82
+#define SPA_AVBTP_SUBTYPE_ESCF 0xEC
+#define SPA_AVBTP_SUBTYPE_EECF 0xED
+#define SPA_AVBTP_SUBTYPE_AEF_DISCRETE 0xEE
+#define SPA_AVBTP_SUBTYPE_ADP 0xFA
+#define SPA_AVBTP_SUBTYPE_AECP 0xFB
+#define SPA_AVBTP_SUBTYPE_ACMP 0xFC
+#define SPA_AVBTP_SUBTYPE_MAAP 0xFE
+#define SPA_AVBTP_SUBTYPE_EF_CONTROL 0xFF
+
+struct spa_avbtp_packet_common {
+ uint8_t subtype;
+#if __BYTE_ORDER == __BIG_ENDIAN
+ unsigned sv:1; /* stream_id valid */
+ unsigned version:3;
+ unsigned subtype_data1:4;
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+ unsigned subtype_data1:4;
+ unsigned version:3;
+ unsigned sv:1;
+#elif
+#error "Unknown byte order"
+#endif
+ uint16_t subtype_data2;
+ uint64_t stream_id;
+ uint8_t payload[0];
+} __attribute__ ((__packed__));
+
+#define SPA_AVBTP_PACKET_SET_SUBTYPE(p,v) ((p)->subtype = (v))
+#define SPA_AVBTP_PACKET_SET_SV(p,v) ((p)->sv = (v))
+#define SPA_AVBTP_PACKET_SET_VERSION(p,v) ((p)->version = (v))
+#define SPA_AVBTP_PACKET_SET_STREAM_ID(p,v) ((p)->stream_id = htobe64(v))
+
+#define SPA_AVBTP_PACKET_GET_SUBTYPE(p) ((p)->subtype)
+#define SPA_AVBTP_PACKET_GET_SV(p) ((p)->sv)
+#define SPA_AVBTP_PACKET_GET_VERSION(p) ((p)->version)
+#define SPA_AVBTP_PACKET_GET_STREAM_ID(p) be64toh((p)->stream_id)
+
+struct spa_avbtp_packet_cc {
+ uint8_t subtype;
+#if __BYTE_ORDER == __BIG_ENDIAN
+ unsigned sv:1;
+ unsigned version:3;
+ unsigned control_data1:4;
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+ unsigned control_data1:4;
+ unsigned version:3;
+ unsigned sv:1;
+#endif
+ uint8_t status;
+ uint16_t control_frame_length;
+ uint64_t stream_id;
+ uint8_t payload[0];
+} __attribute__ ((__packed__));
+
+#define SPA_AVBTP_PACKET_CC_SET_SUBTYPE(p,v) ((p)->subtype = (v))
+#define SPA_AVBTP_PACKET_CC_SET_SV(p,v) ((p)->sv = (v))
+#define SPA_AVBTP_PACKET_CC_SET_VERSION(p,v) ((p)->version = (v))
+#define SPA_AVBTP_PACKET_CC_SET_STREAM_ID(p,v) ((p)->stream_id = htobe64(v))
+#define SPA_AVBTP_PACKET_CC_SET_STATUS(p,v) ((p)->status = (v))
+#define SPA_AVBTP_PACKET_CC_SET_LENGTH(p,v) ((p)->control_frame_length = htons(v))
+
+#define SPA_AVBTP_PACKET_CC_GET_SUBTYPE(p) ((p)->subtype)
+#define SPA_AVBTP_PACKET_CC_GET_SV(p) ((p)->sv)
+#define SPA_AVBTP_PACKET_CC_GET_VERSION(p) ((p)->version)
+#define SPA_AVBTP_PACKET_CC_GET_STREAM_ID(p) be64toh((p)->stream_id)
+#define SPA_AVBTP_PACKET_CC_GET_STATUS(p) ((p)->status)
+#define SPA_AVBTP_PACKET_CC_GET_LENGTH(p) ntohs((p)->control_frame_length)
+
+/* AAF */
+struct spa_avbtp_packet_aaf {
+ uint8_t subtype;
+#if __BYTE_ORDER == __BIG_ENDIAN
+ unsigned sv:1;
+ unsigned version:3;
+ unsigned mr:1;
+ unsigned _r1:1;
+ unsigned gv:1;
+ unsigned tv:1;
+
+ uint8_t seq_num;
+
+ unsigned _r2:7;
+ unsigned tu:1;
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+ unsigned tv:1;
+ unsigned gv:1;
+ unsigned _r1:1;
+ unsigned mr:1;
+ unsigned version:3;
+ unsigned sv:1;
+
+ uint8_t seq_num;
+
+ unsigned tu:1;
+ unsigned _r2:7;
+#endif
+ uint64_t stream_id;
+ uint32_t timestamp;
+#define SPA_AVBTP_AAF_FORMAT_USER 0x00
+#define SPA_AVBTP_AAF_FORMAT_FLOAT_32BIT 0x01
+#define SPA_AVBTP_AAF_FORMAT_INT_32BIT 0x02
+#define SPA_AVBTP_AAF_FORMAT_INT_24BIT 0x03
+#define SPA_AVBTP_AAF_FORMAT_INT_16BIT 0x04
+#define SPA_AVBTP_AAF_FORMAT_AES3_32BIT 0x05
+ uint8_t format;
+
+#define SPA_AVBTP_AAF_PCM_NSR_USER 0x00
+#define SPA_AVBTP_AAF_PCM_NSR_8KHZ 0x01
+#define SPA_AVBTP_AAF_PCM_NSR_16KHZ 0x02
+#define SPA_AVBTP_AAF_PCM_NSR_32KHZ 0x03
+#define SPA_AVBTP_AAF_PCM_NSR_44_1KHZ 0x04
+#define SPA_AVBTP_AAF_PCM_NSR_48KHZ 0x05
+#define SPA_AVBTP_AAF_PCM_NSR_88_2KHZ 0x06
+#define SPA_AVBTP_AAF_PCM_NSR_96KHZ 0x07
+#define SPA_AVBTP_AAF_PCM_NSR_176_4KHZ 0x08
+#define SPA_AVBTP_AAF_PCM_NSR_192KHZ 0x09
+#define SPA_AVBTP_AAF_PCM_NSR_24KHZ 0x0A
+#if __BYTE_ORDER == __BIG_ENDIAN
+ unsigned nsr:4;
+ unsigned _r3:4;
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+ unsigned _r3:4;
+ unsigned nsr:4;
+#endif
+ uint8_t chan_per_frame;
+ uint8_t bit_depth;
+ uint16_t data_len;
+
+#define SPA_AVBTP_AAF_PCM_SP_NORMAL 0x00
+#define SPA_AVBTP_AAF_PCM_SP_SPARSE 0x01
+#if __BYTE_ORDER == __BIG_ENDIAN
+ unsigned _r4:3;
+ unsigned sp:1;
+ unsigned event:4;
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+ unsigned event:4;
+ unsigned sp:1;
+ unsigned _r4:3;
+#endif
+ uint8_t _r5;
+ uint8_t payload[0];
+} __attribute__ ((__packed__));
+
+#define SPA_AVBTP_PACKET_AAF_SET_SUBTYPE(p,v) ((p)->subtype = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_SV(p,v) ((p)->sv = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_VERSION(p,v) ((p)->version = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_MR(p,v) ((p)->mr = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_GV(p,v) ((p)->gv = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_TV(p,v) ((p)->tv = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_SEQ_NUM(p,v) ((p)->seq_num = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_TU(p,v) ((p)->tu = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_STREAM_ID(p,v) ((p)->stream_id = htobe64(v))
+#define SPA_AVBTP_PACKET_AAF_SET_TIMESTAMP(p,v) ((p)->timestamp = htonl(v))
+#define SPA_AVBTP_PACKET_AAF_SET_DATA_LEN(p,v) ((p)->data_len = htons(v))
+#define SPA_AVBTP_PACKET_AAF_SET_FORMAT(p,v) ((p)->format = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_NSR(p,v) ((p)->nsr = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_CHAN_PER_FRAME(p,v) ((p)->chan_per_frame = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_BIT_DEPTH(p,v) ((p)->bit_depth = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_SP(p,v) ((p)->sp = (v))
+#define SPA_AVBTP_PACKET_AAF_SET_EVENT(p,v) ((p)->event = (v))
+
+#define SPA_AVBTP_PACKET_AAF_GET_SUBTYPE(p) ((p)->subtype)
+#define SPA_AVBTP_PACKET_AAF_GET_SV(p) ((p)->sv)
+#define SPA_AVBTP_PACKET_AAF_GET_VERSION(p) ((p)->version)
+#define SPA_AVBTP_PACKET_AAF_GET_MR(p) ((p)->mr)
+#define SPA_AVBTP_PACKET_AAF_GET_GV(p) ((p)->gv)
+#define SPA_AVBTP_PACKET_AAF_GET_TV(p) ((p)->tv)
+#define SPA_AVBTP_PACKET_AAF_GET_SEQ_NUM(p) ((p)->seq_num)
+#define SPA_AVBTP_PACKET_AAF_GET_TU(p) ((p)->tu)
+#define SPA_AVBTP_PACKET_AAF_GET_STREAM_ID(p) be64toh((p)->stream_id)
+#define SPA_AVBTP_PACKET_AAF_GET_TIMESTAMP(p) ntohl((p)->timestamp)
+#define SPA_AVBTP_PACKET_AAF_GET_DATA_LEN(p) ntohs((p)->data_len)
+#define SPA_AVBTP_PACKET_AAF_GET_FORMAT(p) ((p)->format)
+#define SPA_AVBTP_PACKET_AAF_GET_NSR(p) ((p)->nsr)
+#define SPA_AVBTP_PACKET_AAF_GET_CHAN_PER_FRAME(p) ((p)->chan_per_frame)
+#define SPA_AVBTP_PACKET_AAF_GET_BIT_DEPTH(p) ((p)->bit_depth)
+#define SPA_AVBTP_PACKET_AAF_GET_SP(p) ((p)->sp)
+#define SPA_AVBTP_PACKET_AAF_GET_EVENT(p) ((p)->event)
+
+
+#endif /* SPA_AVB_PACKETS_H */
diff --git a/spa/plugins/avb/meson.build b/spa/plugins/avb/meson.build
new file mode 100644
index 0000000..2d9759e
--- /dev/null
+++ b/spa/plugins/avb/meson.build
@@ -0,0 +1,14 @@
+spa_avb_sources = ['avb.c',
+ 'avb.h',
+ 'avb-pcm-sink.c',
+ 'avb-pcm-source.c',
+ 'avb-pcm.c' ]
+
+spa_avb = shared_library(
+ 'spa-avb',
+ [ spa_avb_sources ],
+ include_directories : [configinc],
+ dependencies : [ spa_dep, mathlib, epoll_shim_dep ],
+ install : true,
+ install_dir : spa_plugindir / 'avb'
+)