diff options
Diffstat (limited to 'src/modules/module-avb/acmp.c')
-rw-r--r-- | src/modules/module-avb/acmp.c | 476 |
1 files changed, 476 insertions, 0 deletions
diff --git a/src/modules/module-avb/acmp.c b/src/modules/module-avb/acmp.c new file mode 100644 index 0000000..a7a409a --- /dev/null +++ b/src/modules/module-avb/acmp.c @@ -0,0 +1,476 @@ +/* AVB support + * + * Copyright © 2022 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <spa/utils/json.h> +#include <spa/debug/mem.h> + +#include <pipewire/pipewire.h> + +#include "acmp.h" +#include "msrp.h" +#include "internal.h" +#include "stream.h" + +static const uint8_t mac[6] = AVB_BROADCAST_MAC; + +struct pending { + struct spa_list link; + uint64_t last_time; + uint64_t timeout; + uint16_t old_sequence_id; + uint16_t sequence_id; + uint16_t retry; + size_t size; + void *ptr; +}; + +struct acmp { + struct server *server; + struct spa_hook server_listener; + +#define PENDING_TALKER 0 +#define PENDING_LISTENER 1 +#define PENDING_CONTROLLER 2 + struct spa_list pending[3]; + uint16_t sequence_id[3]; +}; + +static void *pending_new(struct acmp *acmp, uint32_t type, uint64_t now, uint32_t timeout_ms, + const void *m, size_t size) +{ + struct pending *p; + struct avb_ethernet_header *h; + struct avb_packet_acmp *pm; + + p = calloc(1, sizeof(*p) + size); + if (p == NULL) + return NULL; + p->last_time = now; + p->timeout = timeout_ms * SPA_NSEC_PER_MSEC; + p->sequence_id = acmp->sequence_id[type]++; + p->size = size; + p->ptr = SPA_PTROFF(p, sizeof(*p), void); + memcpy(p->ptr, m, size); + + h = p->ptr; + pm = SPA_PTROFF(h, sizeof(*h), void); + p->old_sequence_id = ntohs(pm->sequence_id); + pm->sequence_id = htons(p->sequence_id); + spa_list_append(&acmp->pending[type], &p->link); + + return p->ptr; +} + +static struct pending *pending_find(struct acmp *acmp, uint32_t type, uint16_t sequence_id) +{ + struct pending *p; + spa_list_for_each(p, &acmp->pending[type], link) + if (p->sequence_id == sequence_id) + return p; + return NULL; +} + +static void pending_free(struct acmp *acmp, struct pending *p) +{ + spa_list_remove(&p->link); + free(p); +} + +struct msg_info { + uint16_t type; + const char *name; + int (*handle) (struct acmp *acmp, uint64_t now, const void *m, int len); +}; + +static int reply_not_supported(struct acmp *acmp, uint8_t type, const void *m, int len) +{ + struct server *server = acmp->server; + uint8_t buf[len]; + struct avb_ethernet_header *h = (void*)buf; + struct avb_packet_acmp *reply = SPA_PTROFF(h, sizeof(*h), void); + + memcpy(h, m, len); + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, type); + AVB_PACKET_ACMP_SET_STATUS(reply, AVB_ACMP_STATUS_NOT_SUPPORTED); + + return avb_server_send_packet(server, h->src, AVB_TSN_ETH, buf, len); +} + +static int retry_pending(struct acmp *acmp, uint64_t now, struct pending *p) +{ + struct server *server = acmp->server; + struct avb_ethernet_header *h = p->ptr; + p->retry++; + p->last_time = now; + return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, p->ptr, p->size); +} + +static int handle_connect_tx_command(struct acmp *acmp, uint64_t now, const void *m, int len) +{ + struct server *server = acmp->server; + uint8_t buf[len]; + struct avb_ethernet_header *h = (void*)buf; + struct avb_packet_acmp *reply = SPA_PTROFF(h, sizeof(*h), void); + const struct avb_packet_acmp *p = SPA_PTROFF(m, sizeof(*h), void); + int status = AVB_ACMP_STATUS_SUCCESS; + struct stream *stream; + + if (be64toh(p->talker_guid) != server->entity_id) + return 0; + + memcpy(buf, m, len); + stream = server_find_stream(server, SPA_DIRECTION_OUTPUT, + reply->talker_unique_id); + if (stream == NULL) { + status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX; + goto done; + } + + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_RESPONSE); + reply->stream_id = htobe64(stream->id); + + stream_activate(stream, now); + + memcpy(reply->stream_dest_mac, stream->addr, 6); + reply->connection_count = htons(1); + reply->stream_vlan_id = htons(stream->vlan_id); + +done: + AVB_PACKET_ACMP_SET_STATUS(reply, status); + return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, buf, len); +} + +static int handle_connect_tx_response(struct acmp *acmp, uint64_t now, const void *m, int len) +{ + struct server *server = acmp->server; + struct avb_ethernet_header *h; + const struct avb_packet_acmp *resp = SPA_PTROFF(m, sizeof(*h), void); + struct avb_packet_acmp *reply; + struct pending *pending; + uint16_t sequence_id; + struct stream *stream; + int res; + + if (be64toh(resp->listener_guid) != server->entity_id) + return 0; + + sequence_id = ntohs(resp->sequence_id); + + pending = pending_find(acmp, PENDING_TALKER, sequence_id); + if (pending == NULL) + return 0; + + h = pending->ptr; + pending->size = SPA_MIN((int)pending->size, len); + memcpy(h, m, pending->size); + + reply = SPA_PTROFF(h, sizeof(*h), void); + reply->sequence_id = htons(pending->old_sequence_id); + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_CONNECT_RX_RESPONSE); + + stream = server_find_stream(server, SPA_DIRECTION_INPUT, + ntohs(reply->listener_unique_id)); + if (stream == NULL) + return 0; + + stream->peer_id = be64toh(reply->stream_id); + memcpy(stream->addr, reply->stream_dest_mac, 6); + stream_activate(stream, now); + + res = avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, pending->size); + + pending_free(acmp, pending); + + return res; +} + +static int handle_disconnect_tx_command(struct acmp *acmp, uint64_t now, const void *m, int len) +{ + struct server *server = acmp->server; + uint8_t buf[len]; + struct avb_ethernet_header *h = (void*)buf; + struct avb_packet_acmp *reply = SPA_PTROFF(h, sizeof(*h), void); + const struct avb_packet_acmp *p = SPA_PTROFF(m, sizeof(*h), void); + int status = AVB_ACMP_STATUS_SUCCESS; + struct stream *stream; + + if (be64toh(p->talker_guid) != server->entity_id) + return 0; + + memcpy(buf, m, len); + stream = server_find_stream(server, SPA_DIRECTION_OUTPUT, + reply->talker_unique_id); + if (stream == NULL) { + status = AVB_ACMP_STATUS_TALKER_NO_STREAM_INDEX; + goto done; + } + + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_TX_RESPONSE); + + stream_deactivate(stream, now); + +done: + AVB_PACKET_ACMP_SET_STATUS(reply, status); + return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, buf, len); +} + +static int handle_disconnect_tx_response(struct acmp *acmp, uint64_t now, const void *m, int len) +{ + struct server *server = acmp->server; + struct avb_ethernet_header *h; + struct avb_packet_acmp *reply; + const struct avb_packet_acmp *resp = SPA_PTROFF(m, sizeof(*h), void); + struct pending *pending; + uint16_t sequence_id; + struct stream *stream; + int res; + + if (be64toh(resp->listener_guid) != server->entity_id) + return 0; + + sequence_id = ntohs(resp->sequence_id); + + pending = pending_find(acmp, PENDING_TALKER, sequence_id); + if (pending == NULL) + return 0; + + h = pending->ptr; + pending->size = SPA_MIN((int)pending->size, len); + memcpy(h, m, pending->size); + + reply = SPA_PTROFF(h, sizeof(*h), void); + reply->sequence_id = htons(pending->old_sequence_id); + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(reply, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_RX_RESPONSE); + + stream = server_find_stream(server, SPA_DIRECTION_INPUT, + reply->listener_unique_id); + if (stream == NULL) + return 0; + + stream_deactivate(stream, now); + + res = avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, pending->size); + + pending_free(acmp, pending); + + return res; +} + +static int handle_connect_rx_command(struct acmp *acmp, uint64_t now, const void *m, int len) +{ + struct server *server = acmp->server; + struct avb_ethernet_header *h; + const struct avb_packet_acmp *p = SPA_PTROFF(m, sizeof(*h), void); + struct avb_packet_acmp *cmd; + + if (be64toh(p->listener_guid) != server->entity_id) + return 0; + + h = pending_new(acmp, PENDING_TALKER, now, + AVB_ACMP_TIMEOUT_CONNECT_TX_COMMAND_MS, m, len); + if (h == NULL) + return -errno; + + cmd = SPA_PTROFF(h, sizeof(*h), void); + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(cmd, AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_COMMAND); + AVB_PACKET_ACMP_SET_STATUS(cmd, AVB_ACMP_STATUS_SUCCESS); + + return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, len); +} + +static int handle_ignore(struct acmp *acmp, uint64_t now, const void *m, int len) +{ + return 0; +} + +static int handle_disconnect_rx_command(struct acmp *acmp, uint64_t now, const void *m, int len) +{ + struct server *server = acmp->server; + struct avb_ethernet_header *h; + const struct avb_packet_acmp *p = SPA_PTROFF(m, sizeof(*h), void); + struct avb_packet_acmp *cmd; + + if (be64toh(p->listener_guid) != server->entity_id) + return 0; + + h = pending_new(acmp, PENDING_TALKER, now, + AVB_ACMP_TIMEOUT_DISCONNECT_TX_COMMAND_MS, m, len); + if (h == NULL) + return -errno; + + cmd = SPA_PTROFF(h, sizeof(*h), void); + AVB_PACKET_ACMP_SET_MESSAGE_TYPE(cmd, AVB_ACMP_MESSAGE_TYPE_DISCONNECT_TX_COMMAND); + AVB_PACKET_ACMP_SET_STATUS(cmd, AVB_ACMP_STATUS_SUCCESS); + + return avb_server_send_packet(server, h->dest, AVB_TSN_ETH, h, len); +} + +static const struct msg_info msg_info[] = { + { AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_COMMAND, "connect-tx-command", handle_connect_tx_command, }, + { AVB_ACMP_MESSAGE_TYPE_CONNECT_TX_RESPONSE, "connect-tx-response", handle_connect_tx_response, }, + { AVB_ACMP_MESSAGE_TYPE_DISCONNECT_TX_COMMAND, "disconnect-tx-command", handle_disconnect_tx_command, }, + { AVB_ACMP_MESSAGE_TYPE_DISCONNECT_TX_RESPONSE, "disconnect-tx-response", handle_disconnect_tx_response, }, + { AVB_ACMP_MESSAGE_TYPE_GET_TX_STATE_COMMAND, "get-tx-state-command", NULL, }, + { AVB_ACMP_MESSAGE_TYPE_GET_TX_STATE_RESPONSE, "get-tx-state-response", handle_ignore, }, + { AVB_ACMP_MESSAGE_TYPE_CONNECT_RX_COMMAND, "connect-rx-command", handle_connect_rx_command, }, + { AVB_ACMP_MESSAGE_TYPE_CONNECT_RX_RESPONSE, "connect-rx-response", handle_ignore, }, + { AVB_ACMP_MESSAGE_TYPE_DISCONNECT_RX_COMMAND, "disconnect-rx-command", handle_disconnect_rx_command, }, + { AVB_ACMP_MESSAGE_TYPE_DISCONNECT_RX_RESPONSE, "disconnect-rx-response", handle_ignore, }, + { AVB_ACMP_MESSAGE_TYPE_GET_RX_STATE_COMMAND, "get-rx-state-command", NULL, }, + { AVB_ACMP_MESSAGE_TYPE_GET_RX_STATE_RESPONSE, "get-rx-state-response", handle_ignore, }, + { AVB_ACMP_MESSAGE_TYPE_GET_TX_CONNECTION_COMMAND, "get-tx-connection-command", NULL, }, + { AVB_ACMP_MESSAGE_TYPE_GET_TX_CONNECTION_RESPONSE, "get-tx-connection-response", handle_ignore, }, +}; + +static inline const struct msg_info *find_msg_info(uint16_t type, const char *name) +{ + SPA_FOR_EACH_ELEMENT_VAR(msg_info, i) { + if ((name == NULL && type == i->type) || + (name != NULL && spa_streq(name, i->name))) + return i; + } + return NULL; +} + +static int acmp_message(void *data, uint64_t now, const void *message, int len) +{ + struct acmp *acmp = data; + struct server *server = acmp->server; + const struct avb_ethernet_header *h = message; + const struct avb_packet_acmp *p = SPA_PTROFF(h, sizeof(*h), void); + const struct msg_info *info; + int message_type; + + if (ntohs(h->type) != AVB_TSN_ETH) + return 0; + if (memcmp(h->dest, mac, 6) != 0 && + memcmp(h->dest, server->mac_addr, 6) != 0) + return 0; + + if (AVB_PACKET_GET_SUBTYPE(&p->hdr) != AVB_SUBTYPE_ACMP) + return 0; + + message_type = AVB_PACKET_ACMP_GET_MESSAGE_TYPE(p); + + info = find_msg_info(message_type, NULL); + if (info == NULL) + return 0; + + pw_log_info("got ACMP message %s", info->name); + + if (info->handle == NULL) + return reply_not_supported(acmp, message_type | 1, message, len); + + return info->handle(acmp, now, message, len); +} + +static void acmp_destroy(void *data) +{ + struct acmp *acmp = data; + spa_hook_remove(&acmp->server_listener); + free(acmp); +} + +static void check_timeout(struct acmp *acmp, uint64_t now, uint16_t type) +{ + struct pending *p, *t; + + spa_list_for_each_safe(p, t, &acmp->pending[type], link) { + if (p->last_time + p->timeout > now) + continue; + + if (p->retry == 0) { + pw_log_info("%p: pending timeout, retry", p); + retry_pending(acmp, now, p); + } else { + pw_log_info("%p: pending timeout, fail", p); + pending_free(acmp, p); + } + } +} +static void acmp_periodic(void *data, uint64_t now) +{ + struct acmp *acmp = data; + check_timeout(acmp, now, PENDING_TALKER); + check_timeout(acmp, now, PENDING_LISTENER); + check_timeout(acmp, now, PENDING_CONTROLLER); +} + +static int do_help(struct acmp *acmp, const char *args, FILE *out) +{ + fprintf(out, "{ \"type\": \"help\"," + "\"text\": \"" + "/adp/help: this help \\n" + "\" }"); + return 0; +} + +static int acmp_command(void *data, uint64_t now, const char *command, const char *args, FILE *out) +{ + struct acmp *acmp = data; + int res; + + if (!spa_strstartswith(command, "/acmp/")) + return 0; + + command += strlen("/acmp/"); + + if (spa_streq(command, "help")) + res = do_help(acmp, args, out); + else + res = -ENOTSUP; + + return res; +} + +static const struct server_events server_events = { + AVB_VERSION_SERVER_EVENTS, + .destroy = acmp_destroy, + .message = acmp_message, + .periodic = acmp_periodic, + .command = acmp_command +}; + +struct avb_acmp *avb_acmp_register(struct server *server) +{ + struct acmp *acmp; + + acmp = calloc(1, sizeof(*acmp)); + if (acmp == NULL) + return NULL; + + acmp->server = server; + spa_list_init(&acmp->pending[PENDING_TALKER]); + spa_list_init(&acmp->pending[PENDING_LISTENER]); + spa_list_init(&acmp->pending[PENDING_CONTROLLER]); + + avdecc_server_add_listener(server, &acmp->server_listener, &server_events, acmp); + + return (struct avb_acmp*)acmp; +} + +void avb_acmp_unregister(struct avb_acmp *acmp) +{ + acmp_destroy(acmp); +} |