diff options
Diffstat (limited to 'src/gst/gstpipewirecore.c')
-rw-r--r-- | src/gst/gstpipewirecore.c | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/src/gst/gstpipewirecore.c b/src/gst/gstpipewirecore.c new file mode 100644 index 0000000..6910f4e --- /dev/null +++ b/src/gst/gstpipewirecore.c @@ -0,0 +1,202 @@ +/* GStreamer + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include "config.h" +#include <unistd.h> +#include <fcntl.h> + +#include <spa/utils/result.h> + +#include "gstpipewirecore.h" + +/* a list of global cores indexed by fd. */ +G_LOCK_DEFINE_STATIC (cores_lock); +static GList *cores; + +static void +on_core_error(void *data, uint32_t id, int seq, int res, const char *message) +{ + GstPipeWireCore *core = data; + + pw_log_warn("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE) { + core->last_error = res; + } + pw_thread_loop_signal(core->loop, FALSE); +} + +static void on_core_done (void *data, uint32_t id, int seq) +{ + GstPipeWireCore * core = data; + if (id == PW_ID_CORE) { + core->last_seq = seq; + pw_thread_loop_signal (core->loop, FALSE); + } +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, + .error = on_core_error, +}; + +static GstPipeWireCore *make_core (int fd) +{ + GstPipeWireCore *core; + + core = g_new (GstPipeWireCore, 1); + core->refcount = 1; + core->fd = fd; + core->loop = pw_thread_loop_new ("pipewire-main-loop", NULL); + core->context = pw_context_new (pw_thread_loop_get_loop(core->loop), NULL, 0); + core->last_seq = -1; + core->last_error = 0; + GST_DEBUG ("loop %p context %p", core->loop, core->context); + + if (pw_thread_loop_start (core->loop) < 0) + goto mainloop_failed; + + pw_thread_loop_lock (core->loop); + + if (fd == -1) + core->core = pw_context_connect (core->context, NULL, 0); + else + core->core = pw_context_connect_fd (core->context, fcntl(fd, F_DUPFD_CLOEXEC, 3), NULL, 0); + + if (core->core == NULL) + goto connection_failed; + + pw_core_add_listener(core->core, + &core->core_listener, + &core_events, + core); + + pw_thread_loop_unlock (core->loop); + + return core; + +mainloop_failed: + { + GST_ERROR ("error starting mainloop"); + pw_context_destroy (core->context); + pw_thread_loop_destroy (core->loop); + g_free (core); + return NULL; + } +connection_failed: + { + GST_ERROR ("error connect: %m"); + pw_thread_loop_unlock (core->loop); + pw_context_destroy (core->context); + pw_thread_loop_destroy (core->loop); + g_free (core); + return NULL; + } +} + +typedef struct { + int fd; +} FindData; + +static gint +core_find (GstPipeWireCore * core, FindData * data) +{ + /* fd's must match */ + if (core->fd == data->fd) + return 0; + return 1; +} + +GstPipeWireCore *gst_pipewire_core_get (int fd) +{ + GstPipeWireCore *core; + FindData data; + GList *found; + + data.fd = fd; + + G_LOCK (cores_lock); + found = g_list_find_custom (cores, &data, (GCompareFunc) core_find); + if (found != NULL) { + core = (GstPipeWireCore *) found->data; + core->refcount++; + GST_DEBUG ("found core %p", core); + } else { + core = make_core(fd); + if (core != NULL) { + GST_DEBUG ("created core %p", core); + /* add to list on success */ + cores = g_list_prepend (cores, core); + } else { + GST_WARNING ("could not create core"); + } + } + G_UNLOCK (cores_lock); + + return core; +} + +static void do_sync(GstPipeWireCore * core) +{ + struct timespec abstime; + core->pending_seq = pw_core_sync(core->core, 0, core->pending_seq); + pw_thread_loop_get_time (core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + while (true) { + if (core->last_seq == core->pending_seq || core->last_error < 0) + break; + if (pw_thread_loop_timed_wait_full (core->loop, &abstime) < 0) + break; + } +} + +void gst_pipewire_core_release (GstPipeWireCore *core) +{ + gboolean zero; + + G_LOCK (cores_lock); + core->refcount--; + if ((zero = (core->refcount == 0))) { + GST_DEBUG ("closing core %p", core); + /* remove from list, we can release the mutex after removing the connection + * from the list because after that, nobody can access the connection anymore. */ + cores = g_list_remove (cores, core); + } + G_UNLOCK (cores_lock); + + if (zero) { + pw_thread_loop_lock (core->loop); + do_sync(core); + + pw_core_disconnect (core->core); + pw_thread_loop_unlock (core->loop); + pw_thread_loop_stop (core->loop); + pw_context_destroy (core->context); + pw_thread_loop_destroy (core->loop); + + free(core); + } +} |