diff options
Diffstat (limited to 'src/gst')
-rw-r--r-- | src/gst/.editorconfig | 7 | ||||
-rw-r--r-- | src/gst/gstpipewire.c | 69 | ||||
-rw-r--r-- | src/gst/gstpipewireclock.c | 127 | ||||
-rw-r--r-- | src/gst/gstpipewireclock.h | 71 | ||||
-rw-r--r-- | src/gst/gstpipewirecore.c | 202 | ||||
-rw-r--r-- | src/gst/gstpipewirecore.h | 60 | ||||
-rw-r--r-- | src/gst/gstpipewiredeviceprovider.c | 754 | ||||
-rw-r--r-- | src/gst/gstpipewiredeviceprovider.h | 110 | ||||
-rw-r--r-- | src/gst/gstpipewireformat.c | 981 | ||||
-rw-r--r-- | src/gst/gstpipewireformat.h | 42 | ||||
-rw-r--r-- | src/gst/gstpipewirepool.c | 276 | ||||
-rw-r--r-- | src/gst/gstpipewirepool.h | 92 | ||||
-rw-r--r-- | src/gst/gstpipewiresink.c | 924 | ||||
-rw-r--r-- | src/gst/gstpipewiresink.h | 110 | ||||
-rw-r--r-- | src/gst/gstpipewiresrc.c | 1439 | ||||
-rw-r--r-- | src/gst/gstpipewiresrc.h | 110 | ||||
-rw-r--r-- | src/gst/meson.build | 33 |
17 files changed, 5407 insertions, 0 deletions
diff --git a/src/gst/.editorconfig b/src/gst/.editorconfig new file mode 100644 index 0000000..b6330b2 --- /dev/null +++ b/src/gst/.editorconfig @@ -0,0 +1,7 @@ +[*] +indent_style = space +indent_size = 2 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true diff --git a/src/gst/gstpipewire.c b/src/gst/gstpipewire.c new file mode 100644 index 0000000..b87f8a3 --- /dev/null +++ b/src/gst/gstpipewire.c @@ -0,0 +1,69 @@ +/* PipeWire GStreamer Elements + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +/** + * SECTION:element-pipewiresrc + * + * <refsect2> + * <title>Example launch line</title> + * |[ + * gst-launch -v pipewiresrc ! ximagesink + * ]| Shows PipeWire output in an X window. + * </refsect2> + */ + +#include "config.h" + +#include "gstpipewiresrc.h" +#include "gstpipewiresink.h" +#include "gstpipewiredeviceprovider.h" + +GST_DEBUG_CATEGORY (pipewire_debug); + +static gboolean +plugin_init (GstPlugin *plugin) +{ + pw_init (NULL, NULL); + + gst_element_register (plugin, "pipewiresrc", GST_RANK_PRIMARY + 1, + GST_TYPE_PIPEWIRE_SRC); + gst_element_register (plugin, "pipewiresink", GST_RANK_NONE, + GST_TYPE_PIPEWIRE_SINK); + +#ifdef HAVE_GSTREAMER_DEVICE_PROVIDER + if (!gst_device_provider_register (plugin, "pipewiredeviceprovider", + GST_RANK_PRIMARY + 1, GST_TYPE_PIPEWIRE_DEVICE_PROVIDER)) + return FALSE; +#endif + + GST_DEBUG_CATEGORY_INIT (pipewire_debug, "pipewire", 0, "PipeWire elements"); + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + pipewire, + "Uses PipeWire to handle media streams", + plugin_init, PACKAGE_VERSION, "MIT/X11", "pipewire", "pipewire.org") diff --git a/src/gst/gstpipewireclock.c b/src/gst/gstpipewireclock.c new file mode 100644 index 0000000..44b1158 --- /dev/null +++ b/src/gst/gstpipewireclock.c @@ -0,0 +1,127 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include "config.h" + +#include <gst/gst.h> + +#include "gstpipewireclock.h" + +GST_DEBUG_CATEGORY_STATIC (gst_pipewire_clock_debug_category); +#define GST_CAT_DEFAULT gst_pipewire_clock_debug_category + +G_DEFINE_TYPE (GstPipeWireClock, gst_pipewire_clock, GST_TYPE_SYSTEM_CLOCK); + +GstClock * +gst_pipewire_clock_new (struct pw_stream *stream, GstClockTime last_time) +{ + GstPipeWireClock *clock; + + clock = g_object_new (GST_TYPE_PIPEWIRE_CLOCK, NULL); + clock->stream = stream; + clock->last_time = last_time; + clock->time_offset = last_time; + + return GST_CLOCK_CAST (clock); +} + +static GstClockTime +gst_pipewire_clock_get_internal_time (GstClock * clock) +{ + GstPipeWireClock *pclock = (GstPipeWireClock *) clock; + GstClockTime result; + struct timespec ts; + + clock_gettime(CLOCK_MONOTONIC, &ts); +#if 0 + struct pw_time t; + if (pclock->stream == NULL || + pw_stream_get_time (pclock->stream, &t) < 0 || + t.rate.denom == 0) + return pclock->last_time; + + result = gst_util_uint64_scale_int (t.ticks, GST_SECOND * t.rate.num, t.rate.denom); + result += SPA_TIMESPEC_TO_NSEC(&ts) - t.now; + + result += pclock->time_offset; + pclock->last_time = result; + + GST_DEBUG ("%"PRId64", %d/%d %"PRId64" %"PRId64, + t.ticks, t.rate.num, t.rate.denom, t.now, result); +#else + result = SPA_TIMESPEC_TO_NSEC(&ts); + result += pclock->time_offset; + pclock->last_time = result; +#endif + + return result; +} + +static void +gst_pipewire_clock_finalize (GObject * object) +{ + GstPipeWireClock *clock = GST_PIPEWIRE_CLOCK (object); + + GST_DEBUG_OBJECT (clock, "finalize"); + + G_OBJECT_CLASS (gst_pipewire_clock_parent_class)->finalize (object); +} + +static void +gst_pipewire_clock_class_init (GstPipeWireClockClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstClockClass *gstclock_class = GST_CLOCK_CLASS (klass); + + gobject_class->finalize = gst_pipewire_clock_finalize; + + gstclock_class->get_internal_time = gst_pipewire_clock_get_internal_time; + + GST_DEBUG_CATEGORY_INIT (gst_pipewire_clock_debug_category, "pipewireclock", 0, + "debug category for pipewireclock object"); +} + +static void +gst_pipewire_clock_init (GstPipeWireClock * clock) +{ + GST_OBJECT_FLAG_SET (clock, GST_CLOCK_FLAG_CAN_SET_MASTER); +} + +void +gst_pipewire_clock_reset (GstPipeWireClock * clock, GstClockTime time) +{ + GstClockTimeDiff time_offset; + + if (clock->last_time >= time) + time_offset = clock->last_time - time; + else + time_offset = -(time - clock->last_time); + + clock->time_offset = time_offset; + + GST_DEBUG_OBJECT (clock, + "reset clock to %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT + ", offset %" GST_STIME_FORMAT, GST_TIME_ARGS (time), + GST_TIME_ARGS (clock->last_time), GST_STIME_ARGS (time_offset)); +} diff --git a/src/gst/gstpipewireclock.h b/src/gst/gstpipewireclock.h new file mode 100644 index 0000000..0a3e447 --- /dev/null +++ b/src/gst/gstpipewireclock.h @@ -0,0 +1,71 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __GST_PIPEWIRE_CLOCK_H__ +#define __GST_PIPEWIRE_CLOCK_H__ + +#include <gst/gst.h> + +#include <pipewire/pipewire.h> + +G_BEGIN_DECLS + +#define GST_TYPE_PIPEWIRE_CLOCK \ + (gst_pipewire_clock_get_type()) +#define GST_PIPEWIRE_CLOCK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PIPEWIRE_CLOCK,GstPipeWireClock)) +#define GST_PIPEWIRE_CLOCK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PIPEWIRE_CLOCK,GstPipeWireClockClass)) +#define GST_IS_PIPEWIRE_CLOCK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PIPEWIRE_CLOCK)) +#define GST_IS_PIPEWIRE_CLOCK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PIPEWIRE_CLOCK)) +#define GST_PIPEWIRE_CLOCK_GET_CLASS(klass) \ + (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_PIPEWIRE_CLOCK, GstPipeWireClockClass)) + +typedef struct _GstPipeWireClock GstPipeWireClock; +typedef struct _GstPipeWireClockClass GstPipeWireClockClass; + +struct _GstPipeWireClock { + GstSystemClock parent; + + struct pw_stream *stream; + GstClockTime last_time; + GstClockTimeDiff time_offset; +}; + +struct _GstPipeWireClockClass { + GstSystemClockClass parent_class; +}; + +GType gst_pipewire_clock_get_type (void); + +GstClock * gst_pipewire_clock_new (struct pw_stream *stream, + GstClockTime last_time); +void gst_pipewire_clock_reset (GstPipeWireClock *clock, + GstClockTime time); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_CLOCK_H__ */ diff --git a/src/gst/gstpipewirecore.c b/src/gst/gstpipewirecore.c new file mode 100644 index 0000000..6910f4e --- /dev/null +++ b/src/gst/gstpipewirecore.c @@ -0,0 +1,202 @@ +/* GStreamer + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include "config.h" +#include <unistd.h> +#include <fcntl.h> + +#include <spa/utils/result.h> + +#include "gstpipewirecore.h" + +/* a list of global cores indexed by fd. */ +G_LOCK_DEFINE_STATIC (cores_lock); +static GList *cores; + +static void +on_core_error(void *data, uint32_t id, int seq, int res, const char *message) +{ + GstPipeWireCore *core = data; + + pw_log_warn("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE) { + core->last_error = res; + } + pw_thread_loop_signal(core->loop, FALSE); +} + +static void on_core_done (void *data, uint32_t id, int seq) +{ + GstPipeWireCore * core = data; + if (id == PW_ID_CORE) { + core->last_seq = seq; + pw_thread_loop_signal (core->loop, FALSE); + } +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, + .error = on_core_error, +}; + +static GstPipeWireCore *make_core (int fd) +{ + GstPipeWireCore *core; + + core = g_new (GstPipeWireCore, 1); + core->refcount = 1; + core->fd = fd; + core->loop = pw_thread_loop_new ("pipewire-main-loop", NULL); + core->context = pw_context_new (pw_thread_loop_get_loop(core->loop), NULL, 0); + core->last_seq = -1; + core->last_error = 0; + GST_DEBUG ("loop %p context %p", core->loop, core->context); + + if (pw_thread_loop_start (core->loop) < 0) + goto mainloop_failed; + + pw_thread_loop_lock (core->loop); + + if (fd == -1) + core->core = pw_context_connect (core->context, NULL, 0); + else + core->core = pw_context_connect_fd (core->context, fcntl(fd, F_DUPFD_CLOEXEC, 3), NULL, 0); + + if (core->core == NULL) + goto connection_failed; + + pw_core_add_listener(core->core, + &core->core_listener, + &core_events, + core); + + pw_thread_loop_unlock (core->loop); + + return core; + +mainloop_failed: + { + GST_ERROR ("error starting mainloop"); + pw_context_destroy (core->context); + pw_thread_loop_destroy (core->loop); + g_free (core); + return NULL; + } +connection_failed: + { + GST_ERROR ("error connect: %m"); + pw_thread_loop_unlock (core->loop); + pw_context_destroy (core->context); + pw_thread_loop_destroy (core->loop); + g_free (core); + return NULL; + } +} + +typedef struct { + int fd; +} FindData; + +static gint +core_find (GstPipeWireCore * core, FindData * data) +{ + /* fd's must match */ + if (core->fd == data->fd) + return 0; + return 1; +} + +GstPipeWireCore *gst_pipewire_core_get (int fd) +{ + GstPipeWireCore *core; + FindData data; + GList *found; + + data.fd = fd; + + G_LOCK (cores_lock); + found = g_list_find_custom (cores, &data, (GCompareFunc) core_find); + if (found != NULL) { + core = (GstPipeWireCore *) found->data; + core->refcount++; + GST_DEBUG ("found core %p", core); + } else { + core = make_core(fd); + if (core != NULL) { + GST_DEBUG ("created core %p", core); + /* add to list on success */ + cores = g_list_prepend (cores, core); + } else { + GST_WARNING ("could not create core"); + } + } + G_UNLOCK (cores_lock); + + return core; +} + +static void do_sync(GstPipeWireCore * core) +{ + struct timespec abstime; + core->pending_seq = pw_core_sync(core->core, 0, core->pending_seq); + pw_thread_loop_get_time (core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + while (true) { + if (core->last_seq == core->pending_seq || core->last_error < 0) + break; + if (pw_thread_loop_timed_wait_full (core->loop, &abstime) < 0) + break; + } +} + +void gst_pipewire_core_release (GstPipeWireCore *core) +{ + gboolean zero; + + G_LOCK (cores_lock); + core->refcount--; + if ((zero = (core->refcount == 0))) { + GST_DEBUG ("closing core %p", core); + /* remove from list, we can release the mutex after removing the connection + * from the list because after that, nobody can access the connection anymore. */ + cores = g_list_remove (cores, core); + } + G_UNLOCK (cores_lock); + + if (zero) { + pw_thread_loop_lock (core->loop); + do_sync(core); + + pw_core_disconnect (core->core); + pw_thread_loop_unlock (core->loop); + pw_thread_loop_stop (core->loop); + pw_context_destroy (core->context); + pw_thread_loop_destroy (core->loop); + + free(core); + } +} diff --git a/src/gst/gstpipewirecore.h b/src/gst/gstpipewirecore.h new file mode 100644 index 0000000..c64d466 --- /dev/null +++ b/src/gst/gstpipewirecore.h @@ -0,0 +1,60 @@ +/* GStreamer + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __GST_PIPEWIRE_CORE_H__ +#define __GST_PIPEWIRE_CORE_H__ + +#include <gst/gst.h> + +#include <pipewire/pipewire.h> + +G_BEGIN_DECLS + +typedef struct _GstPipeWireCore GstPipeWireCore; + +#define GST_PIPEWIRE_DEFAULT_TIMEOUT 30 + +/** + * GstPipeWireCore: + * + * Opaque data structure. + */ +struct _GstPipeWireCore { + gint refcount; + int fd; + struct pw_thread_loop *loop; + struct pw_context *context; + struct pw_core *core; + struct spa_hook core_listener; + int last_error; + int last_seq; + int pending_seq; +}; + +GstPipeWireCore *gst_pipewire_core_get (int fd); +void gst_pipewire_core_release (GstPipeWireCore *core); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_CORE_H__ */ diff --git a/src/gst/gstpipewiredeviceprovider.c b/src/gst/gstpipewiredeviceprovider.c new file mode 100644 index 0000000..598a7c5 --- /dev/null +++ b/src/gst/gstpipewiredeviceprovider.c @@ -0,0 +1,754 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include "config.h" + +#include <string.h> + +#include <spa/utils/result.h> +#include <spa/utils/string.h> + +#include <gst/gst.h> + +#include "gstpipewireformat.h" +#include "gstpipewiredeviceprovider.h" +#include "gstpipewiresrc.h" +#include "gstpipewiresink.h" + +GST_DEBUG_CATEGORY_EXTERN (pipewire_debug); +#define GST_CAT_DEFAULT pipewire_debug + +G_DEFINE_TYPE (GstPipeWireDevice, gst_pipewire_device, GST_TYPE_DEVICE); + +enum +{ + PROP_ID = 1, + PROP_SERIAL, + PROP_FD_DEVICE, +}; + +static GstElement * +gst_pipewire_device_create_element (GstDevice * device, const gchar * name) +{ + GstPipeWireDevice *pipewire_dev = GST_PIPEWIRE_DEVICE (device); + GstElement *elem; + gchar *serial_str; + + elem = gst_element_factory_make (pipewire_dev->element, name); + + serial_str = g_strdup_printf ("%"PRIu64, pipewire_dev->serial); + g_object_set (elem, "target-object", serial_str, + "fd", pipewire_dev->fd, NULL); + g_free (serial_str); + + return elem; +} + +static gboolean +gst_pipewire_device_reconfigure_element (GstDevice * device, GstElement * element) +{ + GstPipeWireDevice *pipewire_dev = GST_PIPEWIRE_DEVICE (device); + gchar *serial_str; + + if (spa_streq(pipewire_dev->element, "pipewiresrc")) { + if (!GST_IS_PIPEWIRE_SRC (element)) + return FALSE; + } else if (spa_streq(pipewire_dev->element, "pipewiresink")) { + if (!GST_IS_PIPEWIRE_SINK (element)) + return FALSE; + } else { + g_assert_not_reached (); + } + + serial_str = g_strdup_printf ("%"PRIu64, pipewire_dev->serial); + g_object_set (element, "target-object", serial_str, + "fd", pipewire_dev->fd, NULL); + g_free (serial_str); + + return TRUE; +} + + +static void +gst_pipewire_device_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstPipeWireDevice *device; + + device = GST_PIPEWIRE_DEVICE_CAST (object); + + switch (prop_id) { + case PROP_ID: + g_value_set_uint (value, device->id); + break; + case PROP_SERIAL: + g_value_set_uint64 (value, device->serial); + break; + case PROP_FD_DEVICE: + g_value_set_int (value, device->fd); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pipewire_device_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstPipeWireDevice *device; + + device = GST_PIPEWIRE_DEVICE_CAST (object); + + switch (prop_id) { + case PROP_ID: + device->id = g_value_get_uint (value); + break; + case PROP_SERIAL: + device->serial = g_value_get_uint64 (value); + break; + case PROP_FD_DEVICE: + device->fd = g_value_get_int (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pipewire_device_finalize (GObject * object) +{ + G_OBJECT_CLASS (gst_pipewire_device_parent_class)->finalize (object); +} + +static void +gst_pipewire_device_class_init (GstPipeWireDeviceClass * klass) +{ + GstDeviceClass *dev_class = GST_DEVICE_CLASS (klass); + GObjectClass *object_class = G_OBJECT_CLASS (klass); + + dev_class->create_element = gst_pipewire_device_create_element; + dev_class->reconfigure_element = gst_pipewire_device_reconfigure_element; + + object_class->get_property = gst_pipewire_device_get_property; + object_class->set_property = gst_pipewire_device_set_property; + object_class->finalize = gst_pipewire_device_finalize; + + g_object_class_install_property (object_class, PROP_ID, + g_param_spec_uint ("id", "Id", + "The internal id of the PipeWire device", 0, G_MAXUINT32, SPA_ID_INVALID, + G_PARAM_STATIC_STRINGS | G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); + + g_object_class_install_property (object_class, PROP_SERIAL, + g_param_spec_uint64 ("serial", "Serial", + "The internal serial of the PipeWire device", 0, G_MAXUINT64, SPA_ID_INVALID, + G_PARAM_STATIC_STRINGS | G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); + + g_object_class_install_property (object_class, + PROP_FD_DEVICE, + g_param_spec_int ("fd", "Fd", "The fd to connect with", -1, G_MAXINT, -1, + G_PARAM_STATIC_STRINGS | G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); +} + +static void +gst_pipewire_device_init (GstPipeWireDevice * device) +{ +} + +G_DEFINE_TYPE (GstPipeWireDeviceProvider, gst_pipewire_device_provider, + GST_TYPE_DEVICE_PROVIDER); + +enum +{ + PROP_0, + PROP_CLIENT_NAME, + PROP_FD, + PROP_LAST +}; + +struct node_data { + struct spa_list link; + GstPipeWireDeviceProvider *self; + struct pw_node *proxy; + struct spa_hook proxy_listener; + uint32_t id; + uint64_t serial; + struct spa_hook node_listener; + struct pw_node_info *info; + GstCaps *caps; + GstDevice *dev; +}; + +struct port_data { + struct node_data *node_data; + struct pw_port *proxy; + struct spa_hook proxy_listener; + uint32_t id; + uint64_t serial; + struct spa_hook port_listener; +}; + +static struct node_data *find_node_data(struct spa_list *nodes, uint32_t id) +{ + struct node_data *n; + spa_list_for_each(n, nodes, link) { + if (n->id == id) + return n; + } + return NULL; +} + +static GstDevice * +new_node (GstPipeWireDeviceProvider *self, struct node_data *data) +{ + GstStructure *props; + const gchar *klass = NULL, *name = NULL; + GstPipeWireDeviceType type; + const struct pw_node_info *info = data->info; + const gchar *element = NULL; + GstPipeWireDevice *gstdev; + + if (info->max_input_ports > 0 && info->max_output_ports == 0) { + type = GST_PIPEWIRE_DEVICE_TYPE_SINK; + element = "pipewiresink"; + } else if (info->max_output_ports > 0 && info->max_input_ports == 0) { + type = GST_PIPEWIRE_DEVICE_TYPE_SOURCE; + element = "pipewiresrc"; + } else { + return NULL; + } + + props = gst_structure_new_empty ("pipewire-proplist"); + if (info->props) { + const struct spa_dict_item *item; + spa_dict_for_each (item, info->props) + gst_structure_set (props, item->key, G_TYPE_STRING, item->value, NULL); + + klass = spa_dict_lookup (info->props, PW_KEY_MEDIA_CLASS); + name = spa_dict_lookup (info->props, PW_KEY_NODE_DESCRIPTION); + } + if (klass == NULL) + klass = "unknown/unknown"; + if (name == NULL) + name = "unknown"; + + gstdev = g_object_new (GST_TYPE_PIPEWIRE_DEVICE, + "display-name", name, "caps", data->caps, "device-class", klass, + "id", data->id, "serial", data->serial, "fd", self->fd, + "properties", props, NULL); + + gstdev->id = data->id; + gstdev->serial = data->serial; + gstdev->type = type; + gstdev->element = element; + if (props) + gst_structure_free (props); + + return GST_DEVICE (gstdev); +} + +static void do_add_nodes(GstPipeWireDeviceProvider *self) +{ + struct node_data *nd; + + spa_list_for_each(nd, &self->nodes, link) { + if (nd->dev != NULL) + continue; + pw_log_info("add node %d", nd->id); + nd->dev = new_node (self, nd); + if (nd->dev) { + if(self->list_only) + self->devices = g_list_prepend (self->devices, gst_object_ref_sink (nd->dev)); + else + gst_device_provider_device_add (GST_DEVICE_PROVIDER (self), nd->dev); + } + } +} + +static void resync(GstPipeWireDeviceProvider *self) +{ + self->seq = pw_core_sync(self->core->core, PW_ID_CORE, self->seq); + pw_log_debug("resync %d", self->seq); +} + +static void +on_core_done (void *data, uint32_t id, int seq) +{ + GstPipeWireDeviceProvider *self = data; + + pw_log_debug("check %d %d", seq, self->seq); + if (id == PW_ID_CORE && seq == self->seq) { + do_add_nodes(self); + self->end = true; + if (self->core) + pw_thread_loop_signal (self->core->loop, FALSE); + } +} + + +static void +on_core_error(void *data, uint32_t id, int seq, int res, const char *message) +{ + GstPipeWireDeviceProvider *self = data; + + pw_log_warn("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE) { + self->error = res; + } + pw_thread_loop_signal(self->core->loop, FALSE); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, + .error = on_core_error, +}; + +static void port_event_info(void *data, const struct pw_port_info *info) +{ + struct port_data *port_data = data; + struct node_data *node_data = port_data->node_data; + uint32_t i; + + pw_log_debug("%p", port_data); + + if (info->change_mask & PW_PORT_CHANGE_MASK_PARAMS) { + for (i = 0; i < info->n_params; i++) { + uint32_t id = info->params[i].id; + + if (id == SPA_PARAM_EnumFormat && + info->params[i].flags & SPA_PARAM_INFO_READ && + node_data->caps == NULL) { + node_data->caps = gst_caps_new_empty (); + pw_port_enum_params(port_data->proxy, 0, id, 0, UINT32_MAX, NULL); + resync(node_data->self); + } + } + } +} + +static void port_event_param(void *data, int seq, uint32_t id, + uint32_t index, uint32_t next, const struct spa_pod *param) +{ + struct port_data *port_data = data; + struct node_data *node_data = port_data->node_data; + GstCaps *c1; + + c1 = gst_caps_from_format (param); + if (c1 && node_data->caps) + gst_caps_append (node_data->caps, c1); +} + +static const struct pw_port_events port_events = { + PW_VERSION_PORT_EVENTS, + .info = port_event_info, + .param = port_event_param +}; + +static void node_event_info(void *data, const struct pw_node_info *info) +{ + struct node_data *node_data = data; + uint32_t i; + + pw_log_debug("%p", node_data->proxy); + + info = node_data->info = pw_node_info_update(node_data->info, info); + + if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) { + for (i = 0; i < info->n_params; i++) { + uint32_t id = info->params[i].id; + + if (id == SPA_PARAM_EnumFormat && + info->params[i].flags & SPA_PARAM_INFO_READ && + node_data->caps == NULL) { + node_data->caps = gst_caps_new_empty (); + pw_node_enum_params(node_data->proxy, 0, id, 0, UINT32_MAX, NULL); + resync(node_data->self); + } + } + } +} + +static void node_event_param(void *data, int seq, uint32_t id, + uint32_t index, uint32_t next, const struct spa_pod *param) +{ + struct node_data *node_data = data; + GstCaps *c1; + + c1 = gst_caps_from_format (param); + if (c1 && node_data->caps) + gst_caps_append (node_data->caps, c1); +} + +static const struct pw_node_events node_events = { + PW_VERSION_NODE_EVENTS, + .info = node_event_info, + .param = node_event_param +}; + +static void +removed_node (void *data) +{ + struct node_data *nd = data; + pw_proxy_destroy((struct pw_proxy*)nd->proxy); +} + +static void +destroy_node (void *data) +{ + struct node_data *nd = data; + GstPipeWireDeviceProvider *self = nd->self; + GstDeviceProvider *provider = GST_DEVICE_PROVIDER (self); + + pw_log_debug("destroy %p", nd); + + if (nd->dev != NULL) { + gst_device_provider_device_remove (provider, GST_DEVICE (nd->dev)); + } + if (nd->caps) + gst_caps_unref(nd->caps); + if (nd->info) + pw_node_info_free(nd->info); + + spa_list_remove(&nd->link); +} + +static const struct pw_proxy_events proxy_node_events = { + PW_VERSION_PROXY_EVENTS, + .removed = removed_node, + .destroy = destroy_node, +}; + +static void +removed_port (void *data) +{ + struct port_data *pd = data; + pw_proxy_destroy((struct pw_proxy*)pd->proxy); +} + +static void +destroy_port (void *data) +{ + struct port_data *pd = data; + pw_log_debug("destroy %p", pd); +} + +static const struct pw_proxy_events proxy_port_events = { + PW_VERSION_PROXY_EVENTS, + .removed = removed_port, + .destroy = destroy_port, +}; + +static void registry_event_global(void *data, uint32_t id, uint32_t permissions, + const char *type, uint32_t version, + const struct spa_dict *props) +{ + GstPipeWireDeviceProvider *self = data; + GstDeviceProvider *provider = (GstDeviceProvider*)self; + struct node_data *nd; + const char *str; + + if (spa_streq(type, PW_TYPE_INTERFACE_Node)) { + struct pw_node *node; + + node = pw_registry_bind(self->registry, + id, type, PW_VERSION_NODE, sizeof(*nd)); + if (node == NULL) + goto no_mem; + + if (props != NULL) { + str = spa_dict_lookup(props, PW_KEY_OBJECT_PATH); + if (str != NULL) { + if (g_str_has_prefix(str, "alsa:")) + gst_device_provider_hide_provider (provider, "pulsedeviceprovider"); + else if (g_str_has_prefix(str, "v4l2:")) + gst_device_provider_hide_provider (provider, "v4l2deviceprovider"); + else if (g_str_has_prefix(str, "libcamera:")) + gst_device_provider_hide_provider (provider, "libcameraprovider"); + } + } + + nd = pw_proxy_get_user_data((struct pw_proxy*)node); + nd->self = self; + nd->proxy = node; + nd->id = id; + if (!props || !spa_atou64(spa_dict_lookup(props, PW_KEY_OBJECT_SERIAL), &nd->serial, 0)) + nd->serial = SPA_ID_INVALID; + spa_list_append(&self->nodes, &nd->link); + pw_node_add_listener(node, &nd->node_listener, &node_events, nd); + pw_proxy_add_listener((struct pw_proxy*)node, &nd->proxy_listener, &proxy_node_events, nd); + resync(self); + } + else if (spa_streq(type, PW_TYPE_INTERFACE_Port)) { + struct pw_port *port; + struct port_data *pd; + + if ((str = spa_dict_lookup(props, PW_KEY_NODE_ID)) == NULL) + return; + + if ((nd = find_node_data(&self->nodes, atoi(str))) == NULL) + return; + + port = pw_registry_bind(self->registry, + id, type, PW_VERSION_PORT, sizeof(*pd)); + if (port == NULL) + goto no_mem; + + pd = pw_proxy_get_user_data((struct pw_proxy*)port); + pd->node_data = nd; + pd->proxy = port; + pd->id = id; + if (!props || !spa_atou64(spa_dict_lookup(props, PW_KEY_OBJECT_SERIAL), &pd->serial, 0)) + pd->serial = SPA_ID_INVALID; + pw_port_add_listener(port, &pd->port_listener, &port_events, pd); + pw_proxy_add_listener((struct pw_proxy*)port, &pd->proxy_listener, &proxy_port_events, pd); + resync(self); + } + + return; + +no_mem: + GST_ERROR_OBJECT(self, "failed to create proxy"); + return; +} + +static void registry_event_global_remove(void *data, uint32_t id) +{ +} + +static const struct pw_registry_events registry_events = { + PW_VERSION_REGISTRY_EVENTS, + .global = registry_event_global, + .global_remove = registry_event_global_remove, +}; + +static GList * +gst_pipewire_device_provider_probe (GstDeviceProvider * provider) +{ + GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (provider); + + GST_DEBUG_OBJECT (self, "starting probe"); + + self->core = gst_pipewire_core_get(self->fd); + if (self->core == NULL) { + GST_ERROR_OBJECT (self, "Failed to connect"); + goto failed; + } + + GST_DEBUG_OBJECT (self, "connected"); + + pw_thread_loop_lock (self->core->loop); + + spa_list_init(&self->nodes); + spa_list_init(&self->pending); + self->end = FALSE; + self->error = 0; + self->list_only = TRUE; + self->devices = NULL; + self->registry = pw_core_get_registry(self->core->core, PW_VERSION_REGISTRY, 0); + + pw_core_add_listener(self->core->core, &self->core_listener, &core_events, self); + pw_registry_add_listener(self->registry, &self->registry_listener, ®istry_events, self); + + resync(self); + + for (;;) { + if (self->error < 0) + break; + if (self->end) + break; + pw_thread_loop_wait (self->core->loop); + } + + GST_DEBUG_OBJECT (self, "disconnect"); + + g_clear_pointer ((struct pw_proxy**)&self->registry, pw_proxy_destroy); + pw_thread_loop_unlock (self->core->loop); + g_clear_pointer (&self->core, gst_pipewire_core_release); + + return self->devices; + +failed: + return NULL; +} + +static gboolean +gst_pipewire_device_provider_start (GstDeviceProvider * provider) +{ + GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (provider); + + GST_DEBUG_OBJECT (self, "starting provider"); + + self->core = gst_pipewire_core_get(self->fd); + if (self->core == NULL) { + GST_ERROR_OBJECT (self, "Failed to connect"); + goto failed; + } + + GST_DEBUG_OBJECT (self, "connected"); + + pw_thread_loop_lock (self->core->loop); + + spa_list_init(&self->nodes); + spa_list_init(&self->pending); + self->end = FALSE; + self->error = 0; + self->list_only = FALSE; + self->registry = pw_core_get_registry(self->core->core, PW_VERSION_REGISTRY, 0); + + pw_core_add_listener(self->core->core, &self->core_listener, &core_events, self); + pw_registry_add_listener(self->registry, &self->registry_listener, ®istry_events, self); + + resync(self); + + for (;;) { + if (self->error < 0) + break; + if (self->end) + break; + pw_thread_loop_wait (self->core->loop); + } + + GST_DEBUG_OBJECT (self, "started"); + + pw_thread_loop_unlock (self->core->loop); + + return TRUE; + +failed: + return TRUE; +} + +static void +gst_pipewire_device_provider_stop (GstDeviceProvider * provider) +{ + GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (provider); + + GST_DEBUG_OBJECT (self, "stopping provider"); + + g_clear_pointer ((struct pw_proxy**)&self->registry, pw_proxy_destroy); + g_clear_pointer (&self->core, gst_pipewire_core_release); +} + +static void +gst_pipewire_device_provider_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (object); + + switch (prop_id) { + case PROP_CLIENT_NAME: + g_free (self->client_name); + if (!g_value_get_string (value)) { + GST_WARNING_OBJECT (self, + "Empty PipeWire client name not allowed. " + "Resetting to default value"); + self->client_name = g_strdup(pw_get_client_name ()); + } else + self->client_name = g_value_dup_string (value); + break; + + case PROP_FD: + self->fd = g_value_get_int (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pipewire_device_provider_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (object); + + switch (prop_id) { + case PROP_CLIENT_NAME: + g_value_set_string (value, self->client_name); + break; + + case PROP_FD: + g_value_set_int (value, self->fd); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pipewire_device_provider_finalize (GObject * object) +{ + GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (object); + + g_free (self->client_name); + + G_OBJECT_CLASS (gst_pipewire_device_provider_parent_class)->finalize (object); +} + +static void +gst_pipewire_device_provider_class_init (GstPipeWireDeviceProviderClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstDeviceProviderClass *dm_class = GST_DEVICE_PROVIDER_CLASS (klass); + + gobject_class->set_property = gst_pipewire_device_provider_set_property; + gobject_class->get_property = gst_pipewire_device_provider_get_property; + gobject_class->finalize = gst_pipewire_device_provider_finalize; + + dm_class->probe = gst_pipewire_device_provider_probe; + dm_class->start = gst_pipewire_device_provider_start; + dm_class->stop = gst_pipewire_device_provider_stop; + + g_object_class_install_property (gobject_class, + PROP_CLIENT_NAME, + g_param_spec_string ("client-name", "Client Name", + "The PipeWire client_name_to_use", pw_get_client_name (), + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + + g_object_class_install_property (gobject_class, + PROP_FD, + g_param_spec_int ("fd", "Fd", "The fd to connect with", -1, G_MAXINT, -1, + G_PARAM_STATIC_STRINGS | G_PARAM_READWRITE)); + + gst_device_provider_class_set_static_metadata (dm_class, + "PipeWire Device Provider", "Sink/Source/Audio/Video", + "List and provide PipeWire source and sink devices", + "Wim Taymans <wim.taymans@gmail.com>"); +} + +static void +gst_pipewire_device_provider_init (GstPipeWireDeviceProvider * self) +{ + self->client_name = g_strdup(pw_get_client_name ()); + self->fd = -1; +} diff --git a/src/gst/gstpipewiredeviceprovider.h b/src/gst/gstpipewiredeviceprovider.h new file mode 100644 index 0000000..c940ba4 --- /dev/null +++ b/src/gst/gstpipewiredeviceprovider.h @@ -0,0 +1,110 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + + +#ifndef __GST_PIPEWIRE_DEVICE_PROVIDER_H__ +#define __GST_PIPEWIRE_DEVICE_PROVIDER_H__ + +#include "config.h" + +#include <gst/gst.h> + +#include <pipewire/pipewire.h> +#include <gst/gstpipewirecore.h> + +G_BEGIN_DECLS + +typedef struct _GstPipeWireDevice GstPipeWireDevice; +typedef struct _GstPipeWireDeviceClass GstPipeWireDeviceClass; + +#define GST_TYPE_PIPEWIRE_DEVICE (gst_pipewire_device_get_type()) +#define GST_IS_PIPEWIRE_DEVICE(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_PIPEWIRE_DEVICE)) +#define GST_IS_PIPEWIRE_DEVICE_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_PIPEWIRE_DEVICE)) +#define GST_PIPEWIRE_DEVICE_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PIPEWIRE_DEVICE, GstPipeWireDeviceClass)) +#define GST_PIPEWIRE_DEVICE(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_PIPEWIRE_DEVICE, GstPipeWireDevice)) +#define GST_PIPEWIRE_DEVICE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_DEVICE, GstPipeWireDeviceClass)) +#define GST_PIPEWIRE_DEVICE_CAST(obj) ((GstPipeWireDevice *)(obj)) + +typedef enum { + GST_PIPEWIRE_DEVICE_TYPE_UNKNOWN, + GST_PIPEWIRE_DEVICE_TYPE_SOURCE, + GST_PIPEWIRE_DEVICE_TYPE_SINK, +} GstPipeWireDeviceType; + +struct _GstPipeWireDevice { + GstDevice parent; + + GstPipeWireDeviceType type; + uint32_t id; + uint64_t serial; + int fd; + const gchar *element; +}; + +struct _GstPipeWireDeviceClass { + GstDeviceClass parent_class; +}; + +GType gst_pipewire_device_get_type (void); + +typedef struct _GstPipeWireDeviceProvider GstPipeWireDeviceProvider; +typedef struct _GstPipeWireDeviceProviderClass GstPipeWireDeviceProviderClass; + +#define GST_TYPE_PIPEWIRE_DEVICE_PROVIDER (gst_pipewire_device_provider_get_type()) +#define GST_IS_PIPEWIRE_DEVICE_PROVIDER(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_PIPEWIRE_DEVICE_PROVIDER)) +#define GST_IS_PIPEWIRE_DEVICE_PROVIDER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_PIPEWIRE_DEVICE_PROVIDER)) +#define GST_PIPEWIRE_DEVICE_PROVIDER_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PIPEWIRE_DEVICE_PROVIDER, GstPipeWireDeviceProviderClass)) +#define GST_PIPEWIRE_DEVICE_PROVIDER(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_PIPEWIRE_DEVICE_PROVIDER, GstPipeWireDeviceProvider)) +#define GST_PIPEWIRE_DEVICE_PROVIDER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_DEVICE_PROVIDER, GstPipeWireDeviceProviderClass)) +#define GST_PIPEWIRE_DEVICE_PROVIDER_CAST(obj) ((GstPipeWireDeviceProvider *)(obj)) + +struct _GstPipeWireDeviceProvider { + GstDeviceProvider parent; + + gchar *client_name; + int fd; + + GstPipeWireCore *core; + struct spa_hook core_listener; + struct pw_registry *registry; + struct spa_hook registry_listener; + struct spa_list nodes; + struct spa_list pending; + int seq; + + int error; + gboolean end; + gboolean list_only; + GList *devices; +}; + +struct _GstPipeWireDeviceProviderClass { + GstDeviceProviderClass parent_class; +}; + +GType gst_pipewire_device_provider_get_type (void); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_DEVICE_PROVIDER_H__ */ diff --git a/src/gst/gstpipewireformat.c b/src/gst/gstpipewireformat.c new file mode 100644 index 0000000..006c499 --- /dev/null +++ b/src/gst/gstpipewireformat.c @@ -0,0 +1,981 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include <stdio.h> + +#include <gst/gst.h> +#include <gst/allocators/gstdmabuf.h> +#include <gst/video/video.h> +#include <gst/audio/audio.h> + +#include <spa/utils/string.h> +#include <spa/utils/type.h> +#include <spa/param/video/format-utils.h> +#include <spa/param/audio/format-utils.h> +#include <spa/pod/builder.h> + +#include "gstpipewireformat.h" + +#ifndef DRM_FORMAT_MOD_INVALID +#define DRM_FORMAT_MOD_INVALID ((1ULL << 56) - 1) +#endif + +#ifndef DRM_FORMAT_MOD_LINEAR +#define DRM_FORMAT_MOD_LINEAR 0 +#endif + +struct media_type { + const char *name; + uint32_t media_type; + uint32_t media_subtype; +}; + +static const struct media_type media_type_map[] = { + { "video/x-raw", SPA_MEDIA_TYPE_video, SPA_MEDIA_SUBTYPE_raw }, + { "audio/x-raw", SPA_MEDIA_TYPE_audio, SPA_MEDIA_SUBTYPE_raw }, + { "image/jpeg", SPA_MEDIA_TYPE_video, SPA_MEDIA_SUBTYPE_mjpg }, + { "video/x-jpeg", SPA_MEDIA_TYPE_video, SPA_MEDIA_SUBTYPE_mjpg }, + { "video/x-h264", SPA_MEDIA_TYPE_video, SPA_MEDIA_SUBTYPE_h264 }, + { "audio/x-mulaw", SPA_MEDIA_TYPE_audio, SPA_MEDIA_SUBTYPE_raw }, + { "audio/x-alaw", SPA_MEDIA_TYPE_audio, SPA_MEDIA_SUBTYPE_raw }, + { "audio/mpeg", SPA_MEDIA_TYPE_audio, SPA_MEDIA_SUBTYPE_mp3 }, + { "audio/x-flac", SPA_MEDIA_TYPE_audio, SPA_MEDIA_SUBTYPE_flac }, + { NULL, } +}; + +static const uint32_t video_format_map[] = { + SPA_VIDEO_FORMAT_UNKNOWN, + SPA_VIDEO_FORMAT_ENCODED, + SPA_VIDEO_FORMAT_I420, + SPA_VIDEO_FORMAT_YV12, + SPA_VIDEO_FORMAT_YUY2, + SPA_VIDEO_FORMAT_UYVY, + SPA_VIDEO_FORMAT_AYUV, + SPA_VIDEO_FORMAT_RGBx, + SPA_VIDEO_FORMAT_BGRx, + SPA_VIDEO_FORMAT_xRGB, + SPA_VIDEO_FORMAT_xBGR, + SPA_VIDEO_FORMAT_RGBA, + SPA_VIDEO_FORMAT_BGRA, + SPA_VIDEO_FORMAT_ARGB, + SPA_VIDEO_FORMAT_ABGR, + SPA_VIDEO_FORMAT_RGB, + SPA_VIDEO_FORMAT_BGR, + SPA_VIDEO_FORMAT_Y41B, + SPA_VIDEO_FORMAT_Y42B, + SPA_VIDEO_FORMAT_YVYU, + SPA_VIDEO_FORMAT_Y444, + SPA_VIDEO_FORMAT_v210, + SPA_VIDEO_FORMAT_v216, + SPA_VIDEO_FORMAT_NV12, + SPA_VIDEO_FORMAT_NV21, + SPA_VIDEO_FORMAT_GRAY8, + SPA_VIDEO_FORMAT_GRAY16_BE, + SPA_VIDEO_FORMAT_GRAY16_LE, + SPA_VIDEO_FORMAT_v308, + SPA_VIDEO_FORMAT_RGB16, + SPA_VIDEO_FORMAT_BGR16, + SPA_VIDEO_FORMAT_RGB15, + SPA_VIDEO_FORMAT_BGR15, + SPA_VIDEO_FORMAT_UYVP, + SPA_VIDEO_FORMAT_A420, + SPA_VIDEO_FORMAT_RGB8P, + SPA_VIDEO_FORMAT_YUV9, + SPA_VIDEO_FORMAT_YVU9, + SPA_VIDEO_FORMAT_IYU1, + SPA_VIDEO_FORMAT_ARGB64, + SPA_VIDEO_FORMAT_AYUV64, + SPA_VIDEO_FORMAT_r210, + SPA_VIDEO_FORMAT_I420_10BE, + SPA_VIDEO_FORMAT_I420_10LE, + SPA_VIDEO_FORMAT_I422_10BE, + SPA_VIDEO_FORMAT_I422_10LE, + SPA_VIDEO_FORMAT_Y444_10BE, + SPA_VIDEO_FORMAT_Y444_10LE, + SPA_VIDEO_FORMAT_GBR, + SPA_VIDEO_FORMAT_GBR_10BE, + SPA_VIDEO_FORMAT_GBR_10LE, + SPA_VIDEO_FORMAT_NV16, + SPA_VIDEO_FORMAT_NV24, + SPA_VIDEO_FORMAT_NV12_64Z32, + SPA_VIDEO_FORMAT_A420_10BE, + SPA_VIDEO_FORMAT_A420_10LE, + SPA_VIDEO_FORMAT_A422_10BE, + SPA_VIDEO_FORMAT_A422_10LE, + SPA_VIDEO_FORMAT_A444_10BE, + SPA_VIDEO_FORMAT_A444_10LE, + SPA_VIDEO_FORMAT_NV61, + SPA_VIDEO_FORMAT_P010_10BE, + SPA_VIDEO_FORMAT_P010_10LE, + SPA_VIDEO_FORMAT_IYU2, + SPA_VIDEO_FORMAT_VYUY, + SPA_VIDEO_FORMAT_GBRA, + SPA_VIDEO_FORMAT_GBRA_10BE, + SPA_VIDEO_FORMAT_GBRA_10LE, + SPA_VIDEO_FORMAT_GBR_12BE, + SPA_VIDEO_FORMAT_GBR_12LE, + SPA_VIDEO_FORMAT_GBRA_12BE, + SPA_VIDEO_FORMAT_GBRA_12LE, + SPA_VIDEO_FORMAT_I420_12BE, + SPA_VIDEO_FORMAT_I420_12LE, + SPA_VIDEO_FORMAT_I422_12BE, + SPA_VIDEO_FORMAT_I422_12LE, + SPA_VIDEO_FORMAT_Y444_12BE, + SPA_VIDEO_FORMAT_Y444_12LE, +}; + +#if __BYTE_ORDER == __BIG_ENDIAN +#define _FORMAT_LE(fmt) SPA_AUDIO_FORMAT_ ## fmt ## _OE +#define _FORMAT_BE(fmt) SPA_AUDIO_FORMAT_ ## fmt +#elif __BYTE_ORDER == __LITTLE_ENDIAN +#define _FORMAT_LE(fmt) SPA_AUDIO_FORMAT_ ## fmt +#define _FORMAT_BE(fmt) SPA_AUDIO_FORMAT_ ## fmt ## _OE +#endif + +static const uint32_t audio_format_map[] = { + SPA_AUDIO_FORMAT_UNKNOWN, + SPA_AUDIO_FORMAT_ENCODED, + SPA_AUDIO_FORMAT_S8, + SPA_AUDIO_FORMAT_U8, + _FORMAT_LE (S16), + _FORMAT_BE (S16), + _FORMAT_LE (U16), + _FORMAT_BE (U16), + _FORMAT_LE (S24_32), + _FORMAT_BE (S24_32), + _FORMAT_LE (U24_32), + _FORMAT_BE (U24_32), + _FORMAT_LE (S32), + _FORMAT_BE (S32), + _FORMAT_LE (U32), + _FORMAT_BE (U32), + _FORMAT_LE (S24), + _FORMAT_BE (S24), + _FORMAT_LE (U24), + _FORMAT_BE (U24), + _FORMAT_LE (S20), + _FORMAT_BE (S20), + _FORMAT_LE (U20), + _FORMAT_BE (U20), + _FORMAT_LE (S18), + _FORMAT_BE (S18), + _FORMAT_LE (U18), + _FORMAT_BE (U18), + _FORMAT_LE (F32), + _FORMAT_BE (F32), + _FORMAT_LE (F64), + _FORMAT_BE (F64), +}; + +typedef struct { + struct spa_pod_builder b; + const struct media_type *type; + uint32_t id; + const GstCapsFeatures *cf; + const GstStructure *cs; + GPtrArray *array; +} ConvertData; + +static const struct media_type * +find_media_types (const char *name) +{ + int i; + for (i = 0; media_type_map[i].name; i++) { + if (spa_streq(media_type_map[i].name, name)) + return &media_type_map[i]; + } + return NULL; +} + +static int find_index(const uint32_t *items, int n_items, uint32_t id) +{ + int i; + for (i = 0; i < n_items; i++) + if (items[i] == id) + return i; + return -1; +} + +static const char * +get_nth_string (const GValue *val, int idx) +{ + const GValue *v = NULL; + GType type = G_VALUE_TYPE (val); + + if (type == G_TYPE_STRING && idx == 0) + v = val; + else if (type == GST_TYPE_LIST) { + GArray *array = g_value_peek_pointer (val); + if (idx < (int)(array->len + 1)) { + v = &g_array_index (array, GValue, SPA_MAX (idx - 1, 0)); + } + } + if (v) + return g_value_get_string (v); + + return NULL; +} + +static bool +get_nth_int (const GValue *val, int idx, int *res) +{ + const GValue *v = NULL; + GType type = G_VALUE_TYPE (val); + + if (type == G_TYPE_INT && idx == 0) { + v = val; + } else if (type == GST_TYPE_INT_RANGE) { + if (idx == 0 || idx == 1) { + *res = gst_value_get_int_range_min (val); + return true; + } else if (idx == 2) { + *res = gst_value_get_int_range_max (val); + return true; + } + } else if (type == GST_TYPE_LIST) { + GArray *array = g_value_peek_pointer (val); + if (idx < (int)(array->len + 1)) { + v = &g_array_index (array, GValue, SPA_MAX (idx - 1, 0)); + } + } + if (v) { + *res = g_value_get_int (v); + return true; + } + return false; +} + +static gboolean +get_nth_fraction (const GValue *val, int idx, struct spa_fraction *f) +{ + const GValue *v = NULL; + GType type = G_VALUE_TYPE (val); + + if (type == GST_TYPE_FRACTION && idx == 0) { + v = val; + } else if (type == GST_TYPE_FRACTION_RANGE) { + if (idx == 0 || idx == 1) { + v = gst_value_get_fraction_range_min (val); + } else if (idx == 2) { + v = gst_value_get_fraction_range_max (val); + } + } else if (type == GST_TYPE_LIST) { + GArray *array = g_value_peek_pointer (val); + if (idx < (int)(array->len + 1)) { + v = &g_array_index (array, GValue, SPA_MAX (idx-1, 0)); + } + } + if (v) { + f->num = gst_value_get_fraction_numerator (v); + f->denom = gst_value_get_fraction_denominator (v); + return true; + } + return false; +} + +static gboolean +get_nth_rectangle (const GValue *width, const GValue *height, int idx, struct spa_rectangle *r) +{ + const GValue *w = NULL, *h = NULL; + GType wt = G_VALUE_TYPE (width); + GType ht = G_VALUE_TYPE (height); + + if (wt == G_TYPE_INT && ht == G_TYPE_INT && idx == 0) { + w = width; + h = height; + } else if (wt == GST_TYPE_INT_RANGE && ht == GST_TYPE_INT_RANGE) { + if (idx == 0 || idx == 1) { + r->width = gst_value_get_int_range_min (width); + r->height = gst_value_get_int_range_min (height); + return true; + } else if (idx == 2) { + r->width = gst_value_get_int_range_max (width); + r->height = gst_value_get_int_range_max (height); + return true; + } else if (idx == 3) { + r->width = gst_value_get_int_range_step (width); + r->height = gst_value_get_int_range_step (height); + if (r->width > 1 || r->height > 1) + return true; + else + return false; + } + } else if (wt == GST_TYPE_LIST && ht == GST_TYPE_LIST) { + GArray *wa = g_value_peek_pointer (width); + GArray *ha = g_value_peek_pointer (height); + if (idx < (int)(wa->len + 1)) + w = &g_array_index (wa, GValue, SPA_MAX (idx-1, 0)); + if (idx < (int)(ha->len + 1)) + h = &g_array_index (ha, GValue, SPA_MAX (idx-1, 0)); + } + if (w && h) { + r->width = g_value_get_int (w); + r->height = g_value_get_int (h); + return true; + } + return false; +} + +static uint32_t +get_range_type (const GValue *val) +{ + GType type = G_VALUE_TYPE (val); + + if (type == GST_TYPE_LIST) + return SPA_CHOICE_Enum; + if (type == GST_TYPE_DOUBLE_RANGE || type == GST_TYPE_FRACTION_RANGE) + return SPA_CHOICE_Range; + if (type == GST_TYPE_INT_RANGE) { + if (gst_value_get_int_range_step (val) == 1) + return SPA_CHOICE_Range; + else + return SPA_CHOICE_Step; + } + if (type == GST_TYPE_INT64_RANGE) { + if (gst_value_get_int64_range_step (val) == 1) + return SPA_CHOICE_Range; + else + return SPA_CHOICE_Step; + } + return SPA_CHOICE_None; +} + +static uint32_t +get_range_type2 (const GValue *v1, const GValue *v2) +{ + uint32_t r1, r2; + + r1 = get_range_type (v1); + r2 = get_range_type (v2); + + if (r1 == r2) + return r1; + if (r1 == SPA_CHOICE_Step || r2 == SPA_CHOICE_Step) + return SPA_CHOICE_Step; + if (r1 == SPA_CHOICE_Range || r2 == SPA_CHOICE_Range) + return SPA_CHOICE_Range; + return SPA_CHOICE_Range; +} + +static gboolean +handle_video_fields (ConvertData *d) +{ + const GValue *value, *value2; + int i; + struct spa_pod_choice *choice; + struct spa_pod_frame f; + + value = gst_structure_get_value (d->cs, "format"); + if (value) { + const char *v; + int idx; + for (i = 0; (v = get_nth_string (value, i)); i++) { + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_VIDEO_format, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type (value), 0); + } + + idx = gst_video_format_from_string (v); + if (idx != GST_VIDEO_FORMAT_UNKNOWN && idx < (int)SPA_N_ELEMENTS (video_format_map)) + spa_pod_builder_id (&d->b, video_format_map[idx]); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) + choice->body.type = SPA_CHOICE_None; + } + } + value = gst_structure_get_value (d->cs, "width"); + value2 = gst_structure_get_value (d->cs, "height"); + if (value && value2) { + struct spa_rectangle v; + for (i = 0; get_nth_rectangle (value, value2, i, &v); i++) { + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_VIDEO_size, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type2 (value, value2), 0); + } + + spa_pod_builder_rectangle (&d->b, v.width, v.height); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) + choice->body.type = SPA_CHOICE_None; + } + } + + value = gst_structure_get_value (d->cs, "framerate"); + if (value) { + struct spa_fraction v; + for (i = 0; get_nth_fraction (value, i, &v); i++) { + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_VIDEO_framerate, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type (value), 0); + } + + spa_pod_builder_fraction (&d->b, v.num, v.denom); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) + choice->body.type = SPA_CHOICE_None; + } + } + + value = gst_structure_get_value (d->cs, "max-framerate"); + if (value) { + struct spa_fraction v; + for (i = 0; get_nth_fraction (value, i, &v); i++) { + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_VIDEO_maxFramerate, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type (value), 0); + } + + spa_pod_builder_fraction (&d->b, v.num, v.denom); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) + choice->body.type = SPA_CHOICE_None; + } + } + return TRUE; +} + +static void +set_default_channels (struct spa_pod_builder *b, uint32_t channels) +{ + uint32_t position[SPA_AUDIO_MAX_CHANNELS] = {0}; + gboolean ok = TRUE; + + switch (channels) { + case 8: + position[6] = SPA_AUDIO_CHANNEL_SL; + position[7] = SPA_AUDIO_CHANNEL_SR; + SPA_FALLTHROUGH + case 6: + position[5] = SPA_AUDIO_CHANNEL_LFE; + SPA_FALLTHROUGH + case 5: + position[4] = SPA_AUDIO_CHANNEL_FC; + SPA_FALLTHROUGH + case 4: + position[2] = SPA_AUDIO_CHANNEL_RL; + position[3] = SPA_AUDIO_CHANNEL_RR; + SPA_FALLTHROUGH + case 2: + position[0] = SPA_AUDIO_CHANNEL_FL; + position[1] = SPA_AUDIO_CHANNEL_FR; + break; + case 1: + position[0] = SPA_AUDIO_CHANNEL_MONO; + break; + default: + ok = FALSE; + break; + } + + if (ok) + spa_pod_builder_add (b, SPA_FORMAT_AUDIO_position, + SPA_POD_Array(sizeof(uint32_t), SPA_TYPE_Id, channels, position), 0); +} + +static gboolean +handle_audio_fields (ConvertData *d) +{ + const GValue *value; + struct spa_pod_choice *choice; + struct spa_pod_frame f; + int i = 0; + + value = gst_structure_get_value (d->cs, "format"); + if (value) { + const char *v; + int idx; + for (i = 0; (v = get_nth_string (value, i)); i++) { + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_format, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type (value), 0); + } + + idx = gst_audio_format_from_string (v); + if (idx < (int)SPA_N_ELEMENTS (audio_format_map)) + spa_pod_builder_id (&d->b, audio_format_map[idx]); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) + choice->body.type = SPA_CHOICE_None; + } + } else if (strcmp(d->type->name, "audio/x-mulaw") == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_format, 0); + spa_pod_builder_id (&d->b, SPA_AUDIO_FORMAT_ULAW); + } else if (strcmp(d->type->name, "audio/x-alaw") == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_format, 0); + spa_pod_builder_id (&d->b, SPA_AUDIO_FORMAT_ALAW); + } else if (strcmp(d->type->name, "audio/mpeg") == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_format, 0); + spa_pod_builder_id (&d->b, SPA_AUDIO_FORMAT_ENCODED); + } else if (strcmp(d->type->name, "audio/x-flac") == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_format, 0); + spa_pod_builder_id (&d->b, SPA_AUDIO_FORMAT_ENCODED); + } + +#if 0 + value = gst_structure_get_value (d->cs, "layout"); + if (value) { + const char *v; + for (i = 0; (v = get_nth_string (value, i)); i++) { + enum spa_audio_layout layout; + + if (spa_streq(v, "interleaved")) + layout = SPA_AUDIO_LAYOUT_INTERLEAVED; + else if (spa_streq(v, "non-interleaved")) + layout = SPA_AUDIO_LAYOUT_NON_INTERLEAVED; + else + break; + + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_layout, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type (value), 0); + } + + spa_pod_builder_id (&d->b, layout); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) + choice->body.type = SPA_CHOICE_None; + } + } +#endif + value = gst_structure_get_value (d->cs, "rate"); + if (value) { + int v; + for (i = 0; get_nth_int (value, i, &v); i++) { + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_rate, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type (value), 0); + } + + spa_pod_builder_int (&d->b, v); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) + choice->body.type = SPA_CHOICE_None; + } + } + value = gst_structure_get_value (d->cs, "channels"); + if (value) { + int v; + for (i = 0; get_nth_int (value, i, &v); i++) { + if (i == 0) { + spa_pod_builder_prop (&d->b, SPA_FORMAT_AUDIO_channels, 0); + spa_pod_builder_push_choice(&d->b, &f, get_range_type (value), 0); + } + + spa_pod_builder_int (&d->b, v); + } + if (i > 0) { + choice = spa_pod_builder_pop(&d->b, &f); + if (i == 1) { + choice->body.type = SPA_CHOICE_None; + set_default_channels (&d->b, v); + } + } + } + return TRUE; +} + +static int +builder_overflow (void *event_data, uint32_t size) +{ + struct spa_pod_builder *b = event_data; + b->size = SPA_ROUND_UP_N (size, 512); + b->data = realloc (b->data, b->size); + if (b->data == NULL) + return -errno; + return 0; +} + +static const struct spa_pod_builder_callbacks builder_callbacks = { + SPA_VERSION_POD_BUILDER_CALLBACKS, + .overflow = builder_overflow +}; + +static struct spa_pod * +convert_1 (ConvertData *d) +{ + struct spa_pod_frame f; + + if (!(d->type = find_media_types (gst_structure_get_name (d->cs)))) + return NULL; + + spa_pod_builder_set_callbacks(&d->b, &builder_callbacks, &d->b); + + spa_pod_builder_push_object (&d->b, &f, SPA_TYPE_OBJECT_Format, d->id); + + spa_pod_builder_prop (&d->b, SPA_FORMAT_mediaType, 0); + spa_pod_builder_id(&d->b, d->type->media_type); + + spa_pod_builder_prop (&d->b, SPA_FORMAT_mediaSubtype, 0); + spa_pod_builder_id(&d->b, d->type->media_subtype); + + if (d->cf && gst_caps_features_contains (d->cf, GST_CAPS_FEATURE_MEMORY_DMABUF)) { + struct spa_pod_frame f2; + + spa_pod_builder_prop (&d->b, SPA_FORMAT_VIDEO_modifier, + (SPA_POD_PROP_FLAG_MANDATORY | SPA_POD_PROP_FLAG_DONT_FIXATE)); + spa_pod_builder_push_choice (&d->b, &f2, SPA_CHOICE_Enum, 0); + spa_pod_builder_long (&d->b, DRM_FORMAT_MOD_INVALID); + spa_pod_builder_long (&d->b, DRM_FORMAT_MOD_INVALID); + spa_pod_builder_long (&d->b, DRM_FORMAT_MOD_LINEAR); + spa_pod_builder_pop (&d->b, &f2); + } + + if (d->type->media_type == SPA_MEDIA_TYPE_video) + handle_video_fields (d); + else if (d->type->media_type == SPA_MEDIA_TYPE_audio) + handle_audio_fields (d); + + spa_pod_builder_pop (&d->b, &f); + + return SPA_PTROFF (d->b.data, 0, struct spa_pod); +} + +struct spa_pod * +gst_caps_to_format (GstCaps *caps, guint index, uint32_t id) +{ + ConvertData d; + struct spa_pod *res; + + g_return_val_if_fail (GST_IS_CAPS (caps), NULL); + g_return_val_if_fail (gst_caps_is_fixed (caps), NULL); + + spa_zero (d); + d.cf = gst_caps_get_features (caps, index); + d.cs = gst_caps_get_structure (caps, index); + d.id = id; + + res = convert_1 (&d); + + return res; +} + +static gboolean +foreach_func (GstCapsFeatures *features, + GstStructure *structure, + ConvertData *d) +{ + struct spa_pod *fmt; + int idx; + + spa_zero(d->b); + d->cf = features; + d->cs = structure; + + if (d->cf && gst_caps_features_contains (d->cf, GST_CAPS_FEATURE_MEMORY_DMABUF)) + idx = 0; + else + idx = -1; + + if ((fmt = convert_1 (d))) + g_ptr_array_insert (d->array, idx, fmt); + + return TRUE; +} + + +GPtrArray * +gst_caps_to_format_all (GstCaps *caps, uint32_t id) +{ + ConvertData d; + + spa_zero (d); + d.id = id; + d.array = g_ptr_array_new_full (gst_caps_get_size (caps), (GDestroyNotify)g_free); + + gst_caps_foreach (caps, (GstCapsForeachFunc) foreach_func, &d); + + return d.array; +} + +typedef const char *(*id_to_string_func)(uint32_t id); + +static const char *video_id_to_string(uint32_t id) +{ + int idx; + if ((idx = find_index(video_format_map, SPA_N_ELEMENTS(video_format_map), id)) == -1) + return NULL; + return gst_video_format_to_string(idx); +} + +static const char *audio_id_to_string(uint32_t id) +{ + int idx; + if ((idx = find_index(audio_format_map, SPA_N_ELEMENTS(audio_format_map), id)) == -1) + return NULL; + return gst_audio_format_to_string(idx); +} + +static void +handle_id_prop (const struct spa_pod_prop *prop, const char *key, id_to_string_func func, GstCaps *res) +{ + const char * str; + struct spa_pod *val; + uint32_t *id; + uint32_t i, n_items, choice; + + val = spa_pod_get_values(&prop->value, &n_items, &choice); + if (val->type != SPA_TYPE_Id) + return; + + id = SPA_POD_BODY(val); + + switch (choice) { + case SPA_CHOICE_None: + if (!(str = func(id[0]))) + return; + gst_caps_set_simple (res, key, G_TYPE_STRING, str, NULL); + break; + case SPA_CHOICE_Enum: + { + GValue list = { 0 }, v = { 0 }; + + g_value_init (&list, GST_TYPE_LIST); + for (i = 1; i < n_items; i++) { + if (!(str = func(id[i]))) + continue; + + g_value_init (&v, G_TYPE_STRING); + g_value_set_string (&v, str); + gst_value_list_append_and_take_value (&list, &v); + } + gst_caps_set_value (res, key, &list); + g_value_unset (&list); + break; + } + default: + break; + } +} + +static void +handle_int_prop (const struct spa_pod_prop *prop, const char *key, GstCaps *res) +{ + struct spa_pod *val; + uint32_t *ints; + uint32_t i, n_items, choice; + + val = spa_pod_get_values(&prop->value, &n_items, &choice); + if (val->type != SPA_TYPE_Int) + return; + + ints = SPA_POD_BODY(val); + + switch (choice) { + case SPA_CHOICE_None: + gst_caps_set_simple (res, key, G_TYPE_INT, ints[0], NULL); + break; + case SPA_CHOICE_Range: + case SPA_CHOICE_Step: + { + if (n_items < 3) + return; + gst_caps_set_simple (res, key, GST_TYPE_INT_RANGE, ints[1], ints[2], NULL); + break; + } + case SPA_CHOICE_Enum: + { + GValue list = { 0 }, v = { 0 }; + + g_value_init (&list, GST_TYPE_LIST); + for (i = 1; i < n_items; i++) { + g_value_init (&v, G_TYPE_INT); + g_value_set_int (&v, ints[i]); + gst_value_list_append_and_take_value (&list, &v); + } + gst_caps_set_value (res, key, &list); + g_value_unset (&list); + break; + } + default: + break; + } +} + +static void +handle_rect_prop (const struct spa_pod_prop *prop, const char *width, const char *height, GstCaps *res) +{ + struct spa_pod *val; + struct spa_rectangle *rect; + uint32_t i, n_items, choice; + + val = spa_pod_get_values(&prop->value, &n_items, &choice); + if (val->type != SPA_TYPE_Rectangle) + return; + + rect = SPA_POD_BODY(val); + + switch (choice) { + case SPA_CHOICE_None: + gst_caps_set_simple (res, width, G_TYPE_INT, rect[0].width, + height, G_TYPE_INT, rect[0].height, NULL); + break; + case SPA_CHOICE_Range: + case SPA_CHOICE_Step: + { + if (n_items < 3) + return; + gst_caps_set_simple (res, width, GST_TYPE_INT_RANGE, rect[1].width, rect[2].width, + height, GST_TYPE_INT_RANGE, rect[1].height, rect[2].height, NULL); + break; + } + case SPA_CHOICE_Enum: + { + GValue l1 = { 0 }, l2 = { 0 }, v1 = { 0 }, v2 = { 0 }; + + g_value_init (&l1, GST_TYPE_LIST); + g_value_init (&l2, GST_TYPE_LIST); + for (i = 1; i < n_items; i++) { + g_value_init (&v1, G_TYPE_INT); + g_value_set_int (&v1, rect[i].width); + gst_value_list_append_and_take_value (&l1, &v1); + + g_value_init (&v2, G_TYPE_INT); + g_value_set_int (&v2, rect[i].height); + gst_value_list_append_and_take_value (&l2, &v2); + } + gst_caps_set_value (res, width, &l1); + gst_caps_set_value (res, height, &l2); + g_value_unset (&l1); + g_value_unset (&l2); + break; + } + default: + break; + } +} + +static void +handle_fraction_prop (const struct spa_pod_prop *prop, const char *key, GstCaps *res) +{ + struct spa_pod *val; + struct spa_fraction *fract; + uint32_t i, n_items, choice; + + val = spa_pod_get_values(&prop->value, &n_items, &choice); + if (val->type != SPA_TYPE_Fraction) + return; + + fract = SPA_POD_BODY(val); + + switch (choice) { + case SPA_CHOICE_None: + gst_caps_set_simple (res, key, GST_TYPE_FRACTION, fract[0].num, fract[0].denom, NULL); + break; + case SPA_CHOICE_Range: + case SPA_CHOICE_Step: + { + if (n_items < 3) + return; + gst_caps_set_simple (res, key, GST_TYPE_FRACTION_RANGE, fract[1].num, fract[1].denom, + fract[2].num, fract[2].denom, NULL); + break; + } + case SPA_CHOICE_Enum: + { + GValue l1 = { 0 }, v1 = { 0 }; + + g_value_init (&l1, GST_TYPE_LIST); + for (i = 1; i < n_items; i++) { + g_value_init (&v1, GST_TYPE_FRACTION); + gst_value_set_fraction (&v1, fract[i].num, fract[i].denom); + gst_value_list_append_and_take_value (&l1, &v1); + } + gst_caps_set_value (res, key, &l1); + g_value_unset (&l1); + break; + } + default: + break; + } +} +GstCaps * +gst_caps_from_format (const struct spa_pod *format) +{ + GstCaps *res = NULL; + uint32_t media_type, media_subtype; + const struct spa_pod_prop *prop = NULL; + const struct spa_pod_object *obj = (const struct spa_pod_object *) format; + + if (spa_format_parse(format, &media_type, &media_subtype) < 0) + return res; + + if (media_type == SPA_MEDIA_TYPE_video) { + if (media_subtype == SPA_MEDIA_SUBTYPE_raw) { + res = gst_caps_new_empty_simple ("video/x-raw"); + if ((prop = spa_pod_object_find_prop (obj, prop, SPA_FORMAT_VIDEO_format))) { + handle_id_prop (prop, "format", video_id_to_string, res); + } + } + else if (media_subtype == SPA_MEDIA_SUBTYPE_mjpg) { + res = gst_caps_new_empty_simple ("image/jpeg"); + } + else if (media_subtype == SPA_MEDIA_SUBTYPE_h264) { + res = gst_caps_new_simple ("video/x-h264", + "stream-format", G_TYPE_STRING, "byte-stream", + "alignment", G_TYPE_STRING, "au", + NULL); + } else { + return NULL; + } + if ((prop = spa_pod_object_find_prop (obj, prop, SPA_FORMAT_VIDEO_size))) { + handle_rect_prop (prop, "width", "height", res); + } + if ((prop = spa_pod_object_find_prop (obj, prop, SPA_FORMAT_VIDEO_framerate))) { + handle_fraction_prop (prop, "framerate", res); + } + if ((prop = spa_pod_object_find_prop (obj, prop, SPA_FORMAT_VIDEO_maxFramerate))) { + handle_fraction_prop (prop, "max-framerate", res); + } + } else if (media_type == SPA_MEDIA_TYPE_audio) { + if (media_subtype == SPA_MEDIA_SUBTYPE_raw) { + res = gst_caps_new_simple ("audio/x-raw", + "layout", G_TYPE_STRING, "interleaved", + NULL); + if ((prop = spa_pod_object_find_prop (obj, prop, SPA_FORMAT_AUDIO_format))) { + handle_id_prop (prop, "format", audio_id_to_string, res); + } + if ((prop = spa_pod_object_find_prop (obj, prop, SPA_FORMAT_AUDIO_rate))) { + handle_int_prop (prop, "rate", res); + } + if ((prop = spa_pod_object_find_prop (obj, prop, SPA_FORMAT_AUDIO_channels))) { + handle_int_prop (prop, "channels", res); + } + } + } + return res; +} diff --git a/src/gst/gstpipewireformat.h b/src/gst/gstpipewireformat.h new file mode 100644 index 0000000..e077986 --- /dev/null +++ b/src/gst/gstpipewireformat.h @@ -0,0 +1,42 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef _GST_PIPEWIRE_FORMAT_H_ +#define _GST_PIPEWIRE_FORMAT_H_ + +#include <gst/gst.h> + +#include <spa/pod/pod.h> + +G_BEGIN_DECLS + +struct spa_pod * gst_caps_to_format (GstCaps *caps, + guint index, uint32_t id); +GPtrArray * gst_caps_to_format_all (GstCaps *caps, uint32_t id); + +GstCaps * gst_caps_from_format (const struct spa_pod *format); + +G_END_DECLS + +#endif diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c new file mode 100644 index 0000000..7ecb802 --- /dev/null +++ b/src/gst/gstpipewirepool.c @@ -0,0 +1,276 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include "config.h" + +#include <unistd.h> + +#include <gst/gst.h> + +#include <gst/allocators/gstfdmemory.h> +#include <gst/allocators/gstdmabuf.h> + +#include <gst/video/gstvideometa.h> + +#include "gstpipewirepool.h" + +GST_DEBUG_CATEGORY_STATIC (gst_pipewire_pool_debug_category); +#define GST_CAT_DEFAULT gst_pipewire_pool_debug_category + +G_DEFINE_TYPE (GstPipeWirePool, gst_pipewire_pool, GST_TYPE_BUFFER_POOL); + +enum +{ + ACTIVATED, + /* FILL ME */ + LAST_SIGNAL +}; + + +static guint pool_signals[LAST_SIGNAL] = { 0 }; + +static GQuark pool_data_quark; + +GstPipeWirePool * +gst_pipewire_pool_new (void) +{ + GstPipeWirePool *pool; + + pool = g_object_new (GST_TYPE_PIPEWIRE_POOL, NULL); + + return pool; +} + +static void +pool_data_destroy (gpointer user_data) +{ + GstPipeWirePoolData *data = user_data; + + gst_object_unref (data->pool); + g_slice_free (GstPipeWirePoolData, data); +} + +void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) +{ + GstBuffer *buf; + uint32_t i; + GstPipeWirePoolData *data; + + GST_LOG_OBJECT (pool, "wrap buffer"); + + data = g_slice_new (GstPipeWirePoolData); + + buf = gst_buffer_new (); + + for (i = 0; i < b->buffer->n_datas; i++) { + struct spa_data *d = &b->buffer->datas[i]; + GstMemory *gmem = NULL; + + GST_LOG_OBJECT (pool, "wrap buffer %d %d", d->mapoffset, d->maxsize); + if (d->type == SPA_DATA_MemFd) { + GST_LOG_OBJECT (pool, "memory type MemFd"); + gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd), + d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (gmem, d->mapoffset, d->maxsize); + } + else if(d->type == SPA_DATA_DmaBuf) { + GST_LOG_OBJECT (pool, "memory type DmaBuf"); + gmem = gst_fd_allocator_alloc (pool->dmabuf_allocator, dup(d->fd), + d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (gmem, d->mapoffset, d->maxsize); + } + else if (d->type == SPA_DATA_MemPtr) { + GST_LOG_OBJECT (pool, "memory type MemPtr"); + gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, + d->maxsize, NULL, NULL); + } + if (gmem) + gst_buffer_insert_memory (buf, i, gmem); + } + + data->pool = gst_object_ref (pool); + data->owner = NULL; + data->header = spa_buffer_find_meta_data (b->buffer, SPA_META_Header, sizeof(*data->header)); + data->flags = GST_BUFFER_FLAGS (buf); + data->b = b; + data->buf = buf; + data->crop = spa_buffer_find_meta_data (b->buffer, SPA_META_VideoCrop, sizeof(*data->crop)); + if (data->crop) + gst_buffer_add_video_crop_meta(buf); + data->videotransform = + spa_buffer_find_meta_data (b->buffer, SPA_META_VideoTransform, sizeof(*data->videotransform)); + + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), + pool_data_quark, + data, + pool_data_destroy); + b->user_data = data; +} + +GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer) +{ + return gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), pool_data_quark); +} + +#if 0 +gboolean +gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer) +{ + g_return_val_if_fail (GST_IS_PIPEWIRE_POOL (pool), FALSE); + g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE); + + GST_OBJECT_LOCK (pool); + g_queue_push_tail (&pool->available, buffer); + g_cond_signal (&pool->cond); + GST_OBJECT_UNLOCK (pool); + + return TRUE; +} + +gboolean +gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer) +{ + gboolean res; + + g_return_val_if_fail (GST_IS_PIPEWIRE_POOL (pool), FALSE); + g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE); + + GST_OBJECT_LOCK (pool); + res = g_queue_remove (&pool->available, buffer); + GST_OBJECT_UNLOCK (pool); + + return res; +} +#endif + +static GstFlowReturn +acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, + GstBufferPoolAcquireParams * params) +{ + GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool); + GstPipeWirePoolData *data; + struct pw_buffer *b; + + GST_OBJECT_LOCK (pool); + while (TRUE) { + if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool))) + goto flushing; + + if ((b = pw_stream_dequeue_buffer(p->stream))) + break; + + if (params && (params->flags & GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT)) + goto no_more_buffers; + + GST_WARNING ("queue empty"); + g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool)); + } + + data = b->user_data; + *buffer = data->buf; + + GST_OBJECT_UNLOCK (pool); + GST_DEBUG ("acquire buffer %p", *buffer); + + return GST_FLOW_OK; + +flushing: + { + GST_OBJECT_UNLOCK (pool); + return GST_FLOW_FLUSHING; + } +no_more_buffers: + { + GST_LOG_OBJECT (pool, "no more buffers"); + GST_OBJECT_UNLOCK (pool); + return GST_FLOW_EOS; + } +} + +static void +flush_start (GstBufferPool * pool) +{ + GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool); + + GST_DEBUG ("flush start"); + GST_OBJECT_LOCK (pool); + g_cond_signal (&p->cond); + GST_OBJECT_UNLOCK (pool); +} + +static void +release_buffer (GstBufferPool * pool, GstBuffer *buffer) +{ + GST_DEBUG ("release buffer %p", buffer); +} + +static gboolean +do_start (GstBufferPool * pool) +{ + g_signal_emit (pool, pool_signals[ACTIVATED], 0, NULL); + return TRUE; +} + +static void +gst_pipewire_pool_finalize (GObject * object) +{ + GstPipeWirePool *pool = GST_PIPEWIRE_POOL (object); + + GST_DEBUG_OBJECT (pool, "finalize"); + g_object_unref (pool->fd_allocator); + g_object_unref (pool->dmabuf_allocator); + + G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object); +} + +static void +gst_pipewire_pool_class_init (GstPipeWirePoolClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBufferPoolClass *bufferpool_class = GST_BUFFER_POOL_CLASS (klass); + + gobject_class->finalize = gst_pipewire_pool_finalize; + + bufferpool_class->start = do_start; + bufferpool_class->flush_start = flush_start; + bufferpool_class->acquire_buffer = acquire_buffer; + bufferpool_class->release_buffer = release_buffer; + + pool_signals[ACTIVATED] = + g_signal_new ("activated", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0, G_TYPE_NONE); + + GST_DEBUG_CATEGORY_INIT (gst_pipewire_pool_debug_category, "pipewirepool", 0, + "debug category for pipewirepool object"); + + pool_data_quark = g_quark_from_static_string ("GstPipeWirePoolDataQuark"); +} + +static void +gst_pipewire_pool_init (GstPipeWirePool * pool) +{ + pool->fd_allocator = gst_fd_allocator_new (); + pool->dmabuf_allocator = gst_dmabuf_allocator_new (); + g_cond_init (&pool->cond); +} diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h new file mode 100644 index 0000000..acf8106 --- /dev/null +++ b/src/gst/gstpipewirepool.h @@ -0,0 +1,92 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __GST_PIPEWIRE_POOL_H__ +#define __GST_PIPEWIRE_POOL_H__ + +#include <gst/gst.h> + +#include <pipewire/pipewire.h> + +G_BEGIN_DECLS + +#define GST_TYPE_PIPEWIRE_POOL \ + (gst_pipewire_pool_get_type()) +#define GST_PIPEWIRE_POOL(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PIPEWIRE_POOL,GstPipeWirePool)) +#define GST_PIPEWIRE_POOL_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PIPEWIRE_POOL,GstPipeWirePoolClass)) +#define GST_IS_PIPEWIRE_POOL(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PIPEWIRE_POOL)) +#define GST_IS_PIPEWIRE_POOL_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PIPEWIRE_POOL)) +#define GST_PIPEWIRE_POOL_GET_CLASS(klass) \ + (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_PIPEWIRE_POOL, GstPipeWirePoolClass)) + +typedef struct _GstPipeWirePoolData GstPipeWirePoolData; +typedef struct _GstPipeWirePool GstPipeWirePool; +typedef struct _GstPipeWirePoolClass GstPipeWirePoolClass; + +struct _GstPipeWirePoolData { + GstPipeWirePool *pool; + void *owner; + struct spa_meta_header *header; + guint flags; + struct pw_buffer *b; + GstBuffer *buf; + gboolean queued; + struct spa_meta_region *crop; + struct spa_meta_videotransform *videotransform; +}; + +struct _GstPipeWirePool { + GstBufferPool parent; + + struct pw_stream *stream; + struct pw_type *t; + + GstAllocator *fd_allocator; + GstAllocator *dmabuf_allocator; + + GCond cond; +}; + +struct _GstPipeWirePoolClass { + GstBufferPoolClass parent_class; +}; + +GType gst_pipewire_pool_get_type (void); + +GstPipeWirePool * gst_pipewire_pool_new (void); + +void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *buffer); + +GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer); + +//gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer); +//gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_POOL_H__ */ diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c new file mode 100644 index 0000000..91bdfcc --- /dev/null +++ b/src/gst/gstpipewiresink.c @@ -0,0 +1,924 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +/** + * SECTION:element-pipewiresink + * + * <refsect2> + * <title>Example launch line</title> + * |[ + * gst-launch -v videotestsrc ! pipewiresink + * ]| Sends a test video source to PipeWire + * </refsect2> + */ + +#define PW_ENABLE_DEPRECATED + +#include "config.h" +#include "gstpipewiresink.h" + +#include <string.h> +#include <stdlib.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/socket.h> + +#include <spa/pod/builder.h> +#include <spa/utils/result.h> + +#include <gst/video/video.h> + +#include "gstpipewireformat.h" + +GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); +#define GST_CAT_DEFAULT pipewire_sink_debug + +#define DEFAULT_PROP_MODE GST_PIPEWIRE_SINK_MODE_DEFAULT + +#define MIN_BUFFERS 8u + +enum +{ + PROP_0, + PROP_PATH, + PROP_TARGET_OBJECT, + PROP_CLIENT_NAME, + PROP_CLIENT_PROPERTIES, + PROP_STREAM_PROPERTIES, + PROP_MODE, + PROP_FD +}; + +GType +gst_pipewire_sink_mode_get_type (void) +{ + static gsize mode_type = 0; + static const GEnumValue mode[] = { + {GST_PIPEWIRE_SINK_MODE_DEFAULT, "GST_PIPEWIRE_SINK_MODE_DEFAULT", "default"}, + {GST_PIPEWIRE_SINK_MODE_RENDER, "GST_PIPEWIRE_SINK_MODE_RENDER", "render"}, + {GST_PIPEWIRE_SINK_MODE_PROVIDE, "GST_PIPEWIRE_SINK_MODE_PROVIDE", "provide"}, + {0, NULL, NULL}, + }; + + if (g_once_init_enter (&mode_type)) { + GType tmp = + g_enum_register_static ("GstPipeWireSinkMode", mode); + g_once_init_leave (&mode_type, tmp); + } + + return (GType) mode_type; +} + + +static GstStaticPadTemplate gst_pipewire_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY + ); + +#define gst_pipewire_sink_parent_class parent_class +G_DEFINE_TYPE (GstPipeWireSink, gst_pipewire_sink, GST_TYPE_BASE_SINK); + +static void gst_pipewire_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_pipewire_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static GstStateChangeReturn +gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition); + +static gboolean gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps); +static GstCaps *gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, + GstCaps * caps); + +static GstFlowReturn gst_pipewire_sink_render (GstBaseSink * psink, + GstBuffer * buffer); +static gboolean gst_pipewire_sink_start (GstBaseSink * basesink); +static gboolean gst_pipewire_sink_stop (GstBaseSink * basesink); + +static void +gst_pipewire_sink_finalize (GObject * object) +{ + GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (object); + + g_object_unref (pwsink->pool); + + if (pwsink->stream_properties) + gst_structure_free (pwsink->stream_properties); + if (pwsink->client_properties) + gst_structure_free (pwsink->client_properties); + g_free (pwsink->path); + g_free (pwsink->target_object); + g_free (pwsink->client_name); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static gboolean +gst_pipewire_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query) +{ + GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (bsink); + + gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->pool), 0, 0, 0); + return TRUE; +} + +static void +gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + GstBaseSinkClass *gstbasesink_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + gstbasesink_class = (GstBaseSinkClass *) klass; + + gobject_class->finalize = gst_pipewire_sink_finalize; + gobject_class->set_property = gst_pipewire_sink_set_property; + gobject_class->get_property = gst_pipewire_sink_get_property; + + g_object_class_install_property (gobject_class, + PROP_PATH, + g_param_spec_string ("path", + "Path", + "The sink path to connect to (NULL = default)", + NULL, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS | + G_PARAM_DEPRECATED)); + + g_object_class_install_property (gobject_class, + PROP_TARGET_OBJECT, + g_param_spec_string ("target-object", + "Target object", + "The sink name/serial to connect to (NULL = default)", + NULL, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_CLIENT_NAME, + g_param_spec_string ("client-name", + "Client Name", + "The client name to use (NULL = default)", + NULL, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_CLIENT_PROPERTIES, + g_param_spec_boxed ("client-properties", + "Client properties", + "List of PipeWire client properties", + GST_TYPE_STRUCTURE, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_STREAM_PROPERTIES, + g_param_spec_boxed ("stream-properties", + "Stream properties", + "List of PipeWire stream properties", + GST_TYPE_STRUCTURE, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MODE, + g_param_spec_enum ("mode", + "Mode", + "The mode to operate in", + GST_TYPE_PIPEWIRE_SINK_MODE, + DEFAULT_PROP_MODE, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_FD, + g_param_spec_int ("fd", + "Fd", + "The fd to connect with", + -1, G_MAXINT, -1, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + gstelement_class->change_state = gst_pipewire_sink_change_state; + + gst_element_class_set_static_metadata (gstelement_class, + "PipeWire sink", "Sink/Video", + "Send video to PipeWire", "Wim Taymans <wim.taymans@gmail.com>"); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_pipewire_sink_template)); + + gstbasesink_class->set_caps = gst_pipewire_sink_setcaps; + gstbasesink_class->fixate = gst_pipewire_sink_sink_fixate; + gstbasesink_class->propose_allocation = gst_pipewire_sink_propose_allocation; + gstbasesink_class->start = gst_pipewire_sink_start; + gstbasesink_class->stop = gst_pipewire_sink_stop; + gstbasesink_class->render = gst_pipewire_sink_render; + + GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0, + "PipeWire Sink"); +} + +static void +pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink) +{ + GstStructure *config; + GstCaps *caps; + guint size; + guint min_buffers; + guint max_buffers; + const struct spa_pod *port_params[3]; + struct spa_pod_builder b = { NULL }; + uint8_t buffer[1024]; + struct spa_pod_frame f; + + config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pool)); + gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_push_object (&b, &f, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers); + if (size == 0) + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), + 0); + else + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX), + 0); + + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int( + SPA_MAX(MIN_BUFFERS, min_buffers), + SPA_MAX(MIN_BUFFERS, min_buffers), + max_buffers ? max_buffers : INT32_MAX), + SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int( + (1<<SPA_DATA_MemFd) | + (1<<SPA_DATA_MemPtr)), + 0); + port_params[0] = spa_pod_builder_pop (&b, &f); + + port_params[1] = spa_pod_builder_add_object (&b, + SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, + SPA_PARAM_META_type, SPA_POD_Int(SPA_META_Header), + SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header))); + + port_params[2] = spa_pod_builder_add_object (&b, + SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, + SPA_PARAM_META_type, SPA_POD_Int(SPA_META_VideoCrop), + SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_region))); + + pw_thread_loop_lock (sink->core->loop); + pw_stream_update_params (sink->stream, port_params, 3); + pw_thread_loop_unlock (sink->core->loop); +} + +static void +gst_pipewire_sink_init (GstPipeWireSink * sink) +{ + sink->pool = gst_pipewire_pool_new (); + sink->client_name = g_strdup(pw_get_client_name()); + sink->mode = DEFAULT_PROP_MODE; + sink->fd = -1; + + g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink); +} + +static GstCaps * +gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, GstCaps * caps) +{ + GstStructure *structure; + + caps = gst_caps_make_writable (caps); + + structure = gst_caps_get_structure (caps, 0); + + if (gst_structure_has_name (structure, "video/x-raw")) { + gst_structure_fixate_field_nearest_int (structure, "width", 320); + gst_structure_fixate_field_nearest_int (structure, "height", 240); + gst_structure_fixate_field_nearest_fraction (structure, "framerate", 30, 1); + + if (gst_structure_has_field (structure, "pixel-aspect-ratio")) + gst_structure_fixate_field_nearest_fraction (structure, + "pixel-aspect-ratio", 1, 1); + else + gst_structure_set (structure, "pixel-aspect-ratio", GST_TYPE_FRACTION, 1, 1, + NULL); + + if (gst_structure_has_field (structure, "colorimetry")) + gst_structure_fixate_field_string (structure, "colorimetry", "bt601"); + if (gst_structure_has_field (structure, "chroma-site")) + gst_structure_fixate_field_string (structure, "chroma-site", "mpeg2"); + + if (gst_structure_has_field (structure, "interlace-mode")) + gst_structure_fixate_field_string (structure, "interlace-mode", + "progressive"); + else + gst_structure_set (structure, "interlace-mode", G_TYPE_STRING, + "progressive", NULL); + } else if (gst_structure_has_name (structure, "audio/x-raw")) { + gst_structure_fixate_field_string (structure, "format", "S16LE"); + gst_structure_fixate_field_nearest_int (structure, "channels", 2); + gst_structure_fixate_field_nearest_int (structure, "rate", 44100); + } else if (gst_structure_has_name (structure, "audio/mpeg")) { + gst_structure_fixate_field_string (structure, "format", "Encoded"); + gst_structure_fixate_field_nearest_int (structure, "channels", 2); + gst_structure_fixate_field_nearest_int (structure, "rate", 44100); + } else if (gst_structure_has_name (structure, "audio/x-flac")) { + gst_structure_fixate_field_string (structure, "format", "Encoded"); + gst_structure_fixate_field_nearest_int (structure, "channels", 2); + gst_structure_fixate_field_nearest_int (structure, "rate", 44100); + } + + caps = GST_BASE_SINK_CLASS (parent_class)->fixate (bsink, caps); + + return caps; +} + +static void +gst_pipewire_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (object); + + switch (prop_id) { + case PROP_PATH: + g_free (pwsink->path); + pwsink->path = g_value_dup_string (value); + break; + + case PROP_TARGET_OBJECT: + g_free (pwsink->target_object); + pwsink->target_object = g_value_dup_string (value); + break; + + case PROP_CLIENT_NAME: + g_free (pwsink->client_name); + pwsink->client_name = g_value_dup_string (value); + break; + + case PROP_CLIENT_PROPERTIES: + if (pwsink->client_properties) + gst_structure_free (pwsink->client_properties); + pwsink->client_properties = + gst_structure_copy (gst_value_get_structure (value)); + break; + + case PROP_STREAM_PROPERTIES: + if (pwsink->stream_properties) + gst_structure_free (pwsink->stream_properties); + pwsink->stream_properties = + gst_structure_copy (gst_value_get_structure (value)); + break; + + case PROP_MODE: + pwsink->mode = g_value_get_enum (value); + break; + + case PROP_FD: + pwsink->fd = g_value_get_int (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pipewire_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (object); + + switch (prop_id) { + case PROP_PATH: + g_value_set_string (value, pwsink->path); + break; + + case PROP_TARGET_OBJECT: + g_value_set_string (value, pwsink->target_object); + break; + + case PROP_CLIENT_NAME: + g_value_set_string (value, pwsink->client_name); + break; + + case PROP_CLIENT_PROPERTIES: + gst_value_set_structure (value, pwsink->client_properties); + break; + + case PROP_STREAM_PROPERTIES: + gst_value_set_structure (value, pwsink->stream_properties); + break; + + case PROP_MODE: + g_value_set_enum (value, pwsink->mode); + break; + + case PROP_FD: + g_value_set_int (value, pwsink->fd); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +on_add_buffer (void *_data, struct pw_buffer *b) +{ + GstPipeWireSink *pwsink = _data; + gst_pipewire_pool_wrap_buffer (pwsink->pool, b); +} + +static void +on_remove_buffer (void *_data, struct pw_buffer *b) +{ + GstPipeWireSink *pwsink = _data; + GstPipeWirePoolData *data = b->user_data; + + GST_LOG_OBJECT (pwsink, "remove buffer"); + + gst_buffer_unref (data->buf); +} + +static void +do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) +{ + GstPipeWirePoolData *data; + gboolean res; + guint i; + struct spa_buffer *b; + + data = gst_pipewire_pool_get_data(buffer); + + b = data->b->buffer; + + if (data->header) { + data->header->seq = GST_BUFFER_OFFSET (buffer); + data->header->pts = GST_BUFFER_PTS (buffer); + data->header->dts_offset = GST_BUFFER_DTS (buffer); + } + if (data->crop) { + GstVideoCropMeta *meta = gst_buffer_get_video_crop_meta (buffer); + if (meta) { + data->crop->region.position.x = meta->x; + data->crop->region.position.y = meta->y; + data->crop->region.size.width = meta->width; + data->crop->region.size.height = meta->width; + } + } + for (i = 0; i < b->n_datas; i++) { + struct spa_data *d = &b->datas[i]; + GstMemory *mem = gst_buffer_peek_memory (buffer, i); + d->chunk->offset = mem->offset; + d->chunk->size = mem->size; + d->chunk->stride = 0; + } + + if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) { + g_warning ("can't send buffer %s", spa_strerror(res)); + } +} + + +static void +on_process (void *data) +{ + GstPipeWireSink *pwsink = data; + GST_DEBUG ("signal"); + g_cond_signal (&pwsink->pool->cond); +} + +static void +on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error) +{ + GstPipeWireSink *pwsink = data; + + GST_DEBUG ("got stream state %d", state); + + switch (state) { + case PW_STREAM_STATE_UNCONNECTED: + case PW_STREAM_STATE_CONNECTING: + case PW_STREAM_STATE_PAUSED: + break; + case PW_STREAM_STATE_STREAMING: + if (pw_stream_is_driving (pwsink->stream)) + pw_stream_trigger_process (pwsink->stream); + break; + case PW_STREAM_STATE_ERROR: + GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, + ("stream error: %s", error), (NULL)); + break; + } + pw_thread_loop_signal (pwsink->core->loop, FALSE); +} + +static void +on_param_changed (void *data, uint32_t id, const struct spa_pod *param) +{ + GstPipeWireSink *pwsink = data; + + if (param == NULL || id != SPA_PARAM_Format) + return; + + if (gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool))) + pool_activated (pwsink->pool, pwsink); +} + +static gboolean +gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) +{ + GstPipeWireSink *pwsink; + GPtrArray *possible; + enum pw_stream_state state; + const char *error = NULL; + gboolean res = FALSE; + GstStructure *config; + guint size; + guint min_buffers; + guint max_buffers; + struct timespec abstime; + + pwsink = GST_PIPEWIRE_SINK (bsink); + + possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat); + + pw_thread_loop_lock (pwsink->core->loop); + state = pw_stream_get_state (pwsink->stream, &error); + + if (state == PW_STREAM_STATE_ERROR) + goto start_error; + + if (state == PW_STREAM_STATE_UNCONNECTED) { + enum pw_stream_flags flags = 0; + uint32_t target_id; + + if (pwsink->mode != GST_PIPEWIRE_SINK_MODE_PROVIDE) + flags |= PW_STREAM_FLAG_AUTOCONNECT; + else + flags |= PW_STREAM_FLAG_DRIVER; + + target_id = pwsink->path ? (uint32_t)atoi(pwsink->path) : PW_ID_ANY; + + if (pwsink->target_object) { + struct spa_dict_item items[2] = { + SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsink->target_object), + /* XXX deprecated but the portal and some example apps only + * provide the object id */ + SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL), + }; + struct spa_dict dict = SPA_DICT_INIT_ARRAY(items); + uint64_t serial; + + /* If target.object is a name, set it also to node.target */ + if (spa_atou64(pwsink->target_object, &serial, 0)) { + dict.n_items = 1; + } else { + target_id = PW_ID_ANY; + items[1].value = pwsink->target_object; + } + + pw_stream_update_properties (pwsink->stream, &dict); + } + + pw_stream_connect (pwsink->stream, + PW_DIRECTION_OUTPUT, + target_id, + flags, + (const struct spa_pod **) possible->pdata, + possible->len); + + pw_thread_loop_get_time (pwsink->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while (TRUE) { + state = pw_stream_get_state (pwsink->stream, &error); + + if (state >= PW_STREAM_STATE_PAUSED) + break; + + if (state == PW_STREAM_STATE_ERROR) + goto start_error; + + if (pw_thread_loop_timed_wait_full (pwsink->core->loop, &abstime) < 0) { + error = "timeout"; + goto start_error; + } + } + } + res = TRUE; + + config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->pool)); + gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers); + gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); + gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->pool), config); + + pw_thread_loop_unlock (pwsink->core->loop); + + pwsink->negotiated = res; + + return res; + +start_error: + { + GST_ERROR ("could not start stream: %s", error); + pw_thread_loop_unlock (pwsink->core->loop); + g_ptr_array_unref (possible); + return FALSE; + } +} + +static GstFlowReturn +gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) +{ + GstPipeWireSink *pwsink; + GstFlowReturn res = GST_FLOW_OK; + const char *error = NULL; + gboolean unref_buffer = FALSE; + + pwsink = GST_PIPEWIRE_SINK (bsink); + + if (!pwsink->negotiated) + goto not_negotiated; + + if (buffer->pool != GST_BUFFER_POOL_CAST (pwsink->pool) && + !gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool))) { + GstStructure *config; + GstCaps *caps; + guint size, min_buffers, max_buffers; + + config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->pool)); + gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + + size = (size == 0) ? gst_buffer_get_size (buffer) : size; + + gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); + gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->pool), config); + + gst_buffer_pool_set_active (GST_BUFFER_POOL_CAST (pwsink->pool), TRUE); + } + + pw_thread_loop_lock (pwsink->core->loop); + if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING) + goto done_unlock; + + if (buffer->pool != GST_BUFFER_POOL_CAST (pwsink->pool)) { + GstBuffer *b = NULL; + GstMapInfo info = { 0, }; + GstBufferPoolAcquireParams params = { 0, }; + + pw_thread_loop_unlock (pwsink->core->loop); + + if ((res = gst_buffer_pool_acquire_buffer (GST_BUFFER_POOL_CAST (pwsink->pool), &b, ¶ms)) != GST_FLOW_OK) + goto done; + + gst_buffer_map (b, &info, GST_MAP_WRITE); + gst_buffer_extract (buffer, 0, info.data, info.maxsize); + gst_buffer_unmap (b, &info); + gst_buffer_resize (b, 0, gst_buffer_get_size (buffer)); + buffer = b; + unref_buffer = TRUE; + + pw_thread_loop_lock (pwsink->core->loop); + if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING) + goto done_unlock; + } + + GST_DEBUG ("push buffer"); + do_send_buffer (pwsink, buffer); + if (unref_buffer) + gst_buffer_unref (buffer); + + if (pw_stream_is_driving (pwsink->stream)) + pw_stream_trigger_process (pwsink->stream); + +done_unlock: + pw_thread_loop_unlock (pwsink->core->loop); +done: + return res; + +not_negotiated: + { + return GST_FLOW_NOT_NEGOTIATED; + } +} + +static gboolean +copy_properties (GQuark field_id, + const GValue *value, + gpointer user_data) +{ + struct pw_properties *properties = user_data; + GValue dst = { 0 }; + + if (g_value_type_transformable (G_VALUE_TYPE(value), G_TYPE_STRING)) { + g_value_init(&dst, G_TYPE_STRING); + if (g_value_transform(value, &dst)) { + pw_properties_set (properties, + g_quark_to_string (field_id), + g_value_get_string (&dst)); + } + g_value_unset(&dst); + } + return TRUE; +} + +static const struct pw_stream_events stream_events = { + PW_VERSION_STREAM_EVENTS, + .state_changed = on_state_changed, + .param_changed = on_param_changed, + .add_buffer = on_add_buffer, + .remove_buffer = on_remove_buffer, + .process = on_process, +}; + +static gboolean +gst_pipewire_sink_start (GstBaseSink * basesink) +{ + GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink); + struct pw_properties *props; + + pwsink->negotiated = FALSE; + + pw_thread_loop_lock (pwsink->core->loop); + + props = pw_properties_new (NULL, NULL); + if (pwsink->client_name) { + pw_properties_set (props, PW_KEY_NODE_NAME, pwsink->client_name); + pw_properties_set (props, PW_KEY_NODE_DESCRIPTION, pwsink->client_name); + } + if (pwsink->stream_properties) { + gst_structure_foreach (pwsink->stream_properties, copy_properties, props); + } + + if ((pwsink->stream = pw_stream_new (pwsink->core->core, pwsink->client_name, props)) == NULL) + goto no_stream; + + pwsink->pool->stream = pwsink->stream; + + pw_stream_add_listener(pwsink->stream, + &pwsink->stream_listener, + &stream_events, + pwsink); + + pw_thread_loop_unlock (pwsink->core->loop); + + return TRUE; + +no_stream: + { + GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, ("can't create stream"), (NULL)); + pw_thread_loop_unlock (pwsink->core->loop); + return FALSE; + } +} + +static gboolean +gst_pipewire_sink_stop (GstBaseSink * basesink) +{ + GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink); + + pw_thread_loop_lock (pwsink->core->loop); + if (pwsink->stream) { + pw_stream_destroy (pwsink->stream); + pwsink->stream = NULL; + pwsink->pool->stream = NULL; + } + pw_thread_loop_unlock (pwsink->core->loop); + + pwsink->negotiated = FALSE; + + return TRUE; +} + +static gboolean +gst_pipewire_sink_open (GstPipeWireSink * pwsink) +{ + struct pw_properties *props; + + GST_DEBUG_OBJECT (pwsink, "open"); + + pwsink->core = gst_pipewire_core_get(pwsink->fd); + if (pwsink->core == NULL) + goto connect_error; + + pw_thread_loop_lock (pwsink->core->loop); + + props = pw_properties_new (NULL, NULL); + if (pwsink->client_properties) { + gst_structure_foreach (pwsink->client_properties, copy_properties, props); + pw_core_update_properties (pwsink->core->core, &props->dict); + } + pw_properties_free(props); + pw_thread_loop_unlock (pwsink->core->loop); + + return TRUE; + + /* ERRORS */ +connect_error: + { + GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, + ("Failed to connect"), (NULL)); + return FALSE; + } +} + +static gboolean +gst_pipewire_sink_close (GstPipeWireSink * pwsink) +{ + pw_thread_loop_lock (pwsink->core->loop); + if (pwsink->stream) { + pw_stream_destroy (pwsink->stream); + pwsink->stream = NULL; + } + pw_thread_loop_unlock (pwsink->core->loop); + + if (pwsink->core) { + gst_pipewire_core_release (pwsink->core); + pwsink->core = NULL; + } + return TRUE; +} + +static GstStateChangeReturn +gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstPipeWireSink *this = GST_PIPEWIRE_SINK_CAST (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + if (!gst_pipewire_sink_open (this)) + goto open_failed; + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + /* uncork and start play */ + pw_thread_loop_lock (this->core->loop); + pw_stream_set_active(this->stream, true); + pw_thread_loop_unlock (this->core->loop); + gst_buffer_pool_set_flushing(GST_BUFFER_POOL_CAST(this->pool), FALSE); + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + /* stop play ASAP by corking */ + pw_thread_loop_lock (this->core->loop); + pw_stream_set_active(this->stream, false); + pw_thread_loop_unlock (this->core->loop); + gst_buffer_pool_set_flushing(GST_BUFFER_POOL_CAST(this->pool), TRUE); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_buffer_pool_set_active(GST_BUFFER_POOL_CAST(this->pool), FALSE); + break; + case GST_STATE_CHANGE_READY_TO_NULL: + gst_pipewire_sink_close (this); + break; + default: + break; + } + return ret; + + /* ERRORS */ +open_failed: + { + return GST_STATE_CHANGE_FAILURE; + } +} diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h new file mode 100644 index 0000000..703cd0f --- /dev/null +++ b/src/gst/gstpipewiresink.h @@ -0,0 +1,110 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __GST_PIPEWIRE_SINK_H__ +#define __GST_PIPEWIRE_SINK_H__ + +#include <gst/gst.h> +#include <gst/base/gstbasesink.h> + +#include <pipewire/pipewire.h> +#include <gst/gstpipewirepool.h> +#include <gst/gstpipewirecore.h> + +G_BEGIN_DECLS + +#define GST_TYPE_PIPEWIRE_SINK \ + (gst_pipewire_sink_get_type()) +#define GST_PIPEWIRE_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PIPEWIRE_SINK,GstPipeWireSink)) +#define GST_PIPEWIRE_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PIPEWIRE_SINK,GstPipeWireSinkClass)) +#define GST_IS_PIPEWIRE_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PIPEWIRE_SINK)) +#define GST_IS_PIPEWIRE_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PIPEWIRE_SINK)) +#define GST_PIPEWIRE_SINK_CAST(obj) \ + ((GstPipeWireSink *) (obj)) + +typedef struct _GstPipeWireSink GstPipeWireSink; +typedef struct _GstPipeWireSinkClass GstPipeWireSinkClass; + + +/** + * GstPipeWireSinkMode: + * @GST_PIPEWIRE_SINK_MODE_DEFAULT: the default mode as configured in the server + * @GST_PIPEWIRE_SINK_MODE_RENDER: try to render the media + * @GST_PIPEWIRE_SINK_MODE_PROVIDE: provide the media + * + * Different modes of operation. + */ +typedef enum +{ + GST_PIPEWIRE_SINK_MODE_DEFAULT, + GST_PIPEWIRE_SINK_MODE_RENDER, + GST_PIPEWIRE_SINK_MODE_PROVIDE, +} GstPipeWireSinkMode; + +#define GST_TYPE_PIPEWIRE_SINK_MODE (gst_pipewire_sink_mode_get_type ()) + +/** + * GstPipeWireSink: + * + * Opaque data structure. + */ +struct _GstPipeWireSink { + GstBaseSink element; + + /*< private >*/ + gchar *path; + gchar *target_object; + gchar *client_name; + int fd; + + /* video state */ + gboolean negotiated; + + GstPipeWireCore *core; + struct spa_hook core_listener; + GstStructure *client_properties; + + struct pw_stream *stream; + struct spa_hook stream_listener; + + GstStructure *stream_properties; + GstPipeWireSinkMode mode; + + GstPipeWirePool *pool; +}; + +struct _GstPipeWireSinkClass { + GstBaseSinkClass parent_class; +}; + +GType gst_pipewire_sink_get_type (void); +GType gst_pipewire_sink_mode_get_type (void); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_SINK_H__ */ diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c new file mode 100644 index 0000000..7243cc1 --- /dev/null +++ b/src/gst/gstpipewiresrc.c @@ -0,0 +1,1439 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +/** + * SECTION:element-pipewiresrc + * + * <refsect2> + * <title>Example launch line</title> + * |[ + * gst-launch -v pipewiresrc ! videoconvert ! ximagesink + * ]| Shows pipewire output in an X window. + * </refsect2> + */ + +#define PW_ENABLE_DEPRECATED + +#include "config.h" +#include "gstpipewiresrc.h" +#include "gstpipewireformat.h" + +#include <string.h> +#include <stdlib.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <unistd.h> + +#include <spa/param/video/format.h> +#include <spa/pod/builder.h> +#include <spa/utils/result.h> + +#include <gst/net/gstnetclientclock.h> +#include <gst/allocators/gstfdmemory.h> +#include <gst/allocators/gstdmabuf.h> +#include <gst/video/video.h> + +#include "gstpipewireclock.h" + +static GQuark process_mem_data_quark; + +GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); +#define GST_CAT_DEFAULT pipewire_src_debug + +#define DEFAULT_ALWAYS_COPY false +#define DEFAULT_MIN_BUFFERS 8 +#define DEFAULT_MAX_BUFFERS INT32_MAX +#define DEFAULT_RESEND_LAST false +#define DEFAULT_KEEPALIVE_TIME 0 + +enum +{ + PROP_0, + PROP_PATH, + PROP_TARGET_OBJECT, + PROP_CLIENT_NAME, + PROP_CLIENT_PROPERTIES, + PROP_STREAM_PROPERTIES, + PROP_ALWAYS_COPY, + PROP_MIN_BUFFERS, + PROP_MAX_BUFFERS, + PROP_FD, + PROP_RESEND_LAST, + PROP_KEEPALIVE_TIME, +}; + + +static GstStaticPadTemplate gst_pipewire_src_template = +GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY + ); + +#define gst_pipewire_src_parent_class parent_class +G_DEFINE_TYPE (GstPipeWireSrc, gst_pipewire_src, GST_TYPE_PUSH_SRC); + +static GstStateChangeReturn +gst_pipewire_src_change_state (GstElement * element, GstStateChange transition); + +static gboolean gst_pipewire_src_send_event (GstElement * elem, GstEvent * event); + +static gboolean gst_pipewire_src_negotiate (GstBaseSrc * basesrc); + +static GstFlowReturn gst_pipewire_src_create (GstPushSrc * psrc, + GstBuffer ** buffer); +static gboolean gst_pipewire_src_unlock (GstBaseSrc * basesrc); +static gboolean gst_pipewire_src_unlock_stop (GstBaseSrc * basesrc); +static gboolean gst_pipewire_src_start (GstBaseSrc * basesrc); +static gboolean gst_pipewire_src_stop (GstBaseSrc * basesrc); +static gboolean gst_pipewire_src_event (GstBaseSrc * src, GstEvent * event); +static gboolean gst_pipewire_src_query (GstBaseSrc * src, GstQuery * query); + +static void +gst_pipewire_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (object); + + switch (prop_id) { + case PROP_PATH: + g_free (pwsrc->path); + pwsrc->path = g_value_dup_string (value); + break; + + case PROP_TARGET_OBJECT: + g_free (pwsrc->target_object); + pwsrc->target_object = g_value_dup_string (value); + break; + + case PROP_CLIENT_NAME: + g_free (pwsrc->client_name); + pwsrc->client_name = g_value_dup_string (value); + break; + + case PROP_CLIENT_PROPERTIES: + if (pwsrc->client_properties) + gst_structure_free (pwsrc->client_properties); + pwsrc->client_properties = + gst_structure_copy (gst_value_get_structure (value)); + break; + + case PROP_STREAM_PROPERTIES: + if (pwsrc->stream_properties) + gst_structure_free (pwsrc->stream_properties); + pwsrc->stream_properties = + gst_structure_copy (gst_value_get_structure (value)); + break; + + case PROP_ALWAYS_COPY: + pwsrc->always_copy = g_value_get_boolean (value); + break; + + case PROP_MIN_BUFFERS: + pwsrc->min_buffers = g_value_get_int (value); + break; + + case PROP_MAX_BUFFERS: + pwsrc->max_buffers = g_value_get_int (value); + break; + + case PROP_FD: + pwsrc->fd = g_value_get_int (value); + break; + + case PROP_RESEND_LAST: + pwsrc->resend_last = g_value_get_boolean (value); + break; + + case PROP_KEEPALIVE_TIME: + pwsrc->keepalive_time = g_value_get_int (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_pipewire_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (object); + + switch (prop_id) { + case PROP_PATH: + g_value_set_string (value, pwsrc->path); + break; + + case PROP_TARGET_OBJECT: + g_value_set_string (value, pwsrc->target_object); + break; + + case PROP_CLIENT_NAME: + g_value_set_string (value, pwsrc->client_name); + break; + + case PROP_CLIENT_PROPERTIES: + gst_value_set_structure (value, pwsrc->client_properties); + break; + + case PROP_STREAM_PROPERTIES: + gst_value_set_structure (value, pwsrc->stream_properties); + break; + + case PROP_ALWAYS_COPY: + g_value_set_boolean (value, pwsrc->always_copy); + break; + + case PROP_MIN_BUFFERS: + g_value_set_int (value, pwsrc->min_buffers); + break; + + case PROP_MAX_BUFFERS: + g_value_set_int (value, pwsrc->max_buffers); + break; + + case PROP_FD: + g_value_set_int (value, pwsrc->fd); + break; + + case PROP_RESEND_LAST: + g_value_set_boolean (value, pwsrc->resend_last); + break; + + case PROP_KEEPALIVE_TIME: + g_value_set_int (value, pwsrc->keepalive_time); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstClock * +gst_pipewire_src_provide_clock (GstElement * elem) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (elem); + GstClock *clock; + + GST_OBJECT_LOCK (pwsrc); + if (!GST_OBJECT_FLAG_IS_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK)) + goto clock_disabled; + + if (pwsrc->clock && pwsrc->is_live) + clock = GST_CLOCK_CAST (gst_object_ref (pwsrc->clock)); + else + clock = NULL; + GST_OBJECT_UNLOCK (pwsrc); + + return clock; + + /* ERRORS */ +clock_disabled: + { + GST_DEBUG_OBJECT (pwsrc, "clock provide disabled"); + GST_OBJECT_UNLOCK (pwsrc); + return NULL; + } +} + +static void +gst_pipewire_src_finalize (GObject * object) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (object); + + if (pwsrc->stream_properties) + gst_structure_free (pwsrc->stream_properties); + if (pwsrc->client_properties) + gst_structure_free (pwsrc->client_properties); + if (pwsrc->clock) + gst_object_unref (pwsrc->clock); + g_free (pwsrc->path); + g_free (pwsrc->target_object); + g_free (pwsrc->client_name); + g_object_unref(pwsrc->pool); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + GstBaseSrcClass *gstbasesrc_class; + GstPushSrcClass *gstpushsrc_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + gstbasesrc_class = (GstBaseSrcClass *) klass; + gstpushsrc_class = (GstPushSrcClass *) klass; + + gobject_class->finalize = gst_pipewire_src_finalize; + gobject_class->set_property = gst_pipewire_src_set_property; + gobject_class->get_property = gst_pipewire_src_get_property; + + g_object_class_install_property (gobject_class, + PROP_PATH, + g_param_spec_string ("path", + "Path", + "The source path to connect to (NULL = default)", + NULL, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS | + G_PARAM_DEPRECATED)); + + g_object_class_install_property (gobject_class, + PROP_TARGET_OBJECT, + g_param_spec_string ("target-object", + "Target object", + "The source name/serial to connect to (NULL = default)", + NULL, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_CLIENT_NAME, + g_param_spec_string ("client-name", + "Client Name", + "The client name to use (NULL = default)", + NULL, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_CLIENT_PROPERTIES, + g_param_spec_boxed ("client-properties", + "client properties", + "list of PipeWire client properties", + GST_TYPE_STRUCTURE, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_STREAM_PROPERTIES, + g_param_spec_boxed ("stream-properties", + "stream properties", + "list of PipeWire stream properties", + GST_TYPE_STRUCTURE, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_ALWAYS_COPY, + g_param_spec_boolean ("always-copy", + "Always copy", + "Always copy the buffer and data", + DEFAULT_ALWAYS_COPY, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MIN_BUFFERS, + g_param_spec_int ("min-buffers", + "Min Buffers", + "Minimum number of buffers to negotiate with PipeWire", + 1, G_MAXINT, DEFAULT_MIN_BUFFERS, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MAX_BUFFERS, + g_param_spec_int ("max-buffers", + "Max Buffers", + "Maximum number of buffers to negotiate with PipeWire", + 1, G_MAXINT, DEFAULT_MAX_BUFFERS, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_FD, + g_param_spec_int ("fd", + "Fd", + "The fd to connect with", + -1, G_MAXINT, -1, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_RESEND_LAST, + g_param_spec_boolean ("resend-last", + "Resend last", + "Resend last buffer on EOS", + DEFAULT_RESEND_LAST, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_KEEPALIVE_TIME, + g_param_spec_int ("keepalive-time", + "Keepalive Time", + "Periodically send last buffer (in milliseconds, 0 = disabled)", + 0, G_MAXINT, DEFAULT_KEEPALIVE_TIME, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + gstelement_class->provide_clock = gst_pipewire_src_provide_clock; + gstelement_class->change_state = gst_pipewire_src_change_state; + gstelement_class->send_event = gst_pipewire_src_send_event; + + gst_element_class_set_static_metadata (gstelement_class, + "PipeWire source", "Source/Video", + "Uses PipeWire to create video", "Wim Taymans <wim.taymans@gmail.com>"); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_pipewire_src_template)); + + gstbasesrc_class->negotiate = gst_pipewire_src_negotiate; + gstbasesrc_class->unlock = gst_pipewire_src_unlock; + gstbasesrc_class->unlock_stop = gst_pipewire_src_unlock_stop; + gstbasesrc_class->start = gst_pipewire_src_start; + gstbasesrc_class->stop = gst_pipewire_src_stop; + gstbasesrc_class->event = gst_pipewire_src_event; + gstbasesrc_class->query = gst_pipewire_src_query; + gstpushsrc_class->create = gst_pipewire_src_create; + + GST_DEBUG_CATEGORY_INIT (pipewire_src_debug, "pipewiresrc", 0, + "PipeWire Source"); + + process_mem_data_quark = g_quark_from_static_string ("GstPipeWireSrcProcessMemQuark"); +} + +static void +gst_pipewire_src_init (GstPipeWireSrc * src) +{ + /* we operate in time */ + gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); + + /* we're a live source, unless explicitly requested not to be */ + gst_base_src_set_live (GST_BASE_SRC (src), TRUE); + + GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + + src->always_copy = DEFAULT_ALWAYS_COPY; + src->min_buffers = DEFAULT_MIN_BUFFERS; + src->max_buffers = DEFAULT_MAX_BUFFERS; + src->fd = -1; + src->resend_last = DEFAULT_RESEND_LAST; + src->keepalive_time = DEFAULT_KEEPALIVE_TIME; + + src->client_name = g_strdup(pw_get_client_name ()); + + src->pool = gst_pipewire_pool_new (); +} + +static gboolean +buffer_recycle (GstMiniObject *obj) +{ + GstPipeWireSrc *src; + GstPipeWirePoolData *data; + int res; + + data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + + GST_OBJECT_LOCK (data->pool); + if (!obj->dispose) { + GST_OBJECT_UNLOCK (data->pool); + return TRUE; + } + + GST_BUFFER_FLAGS (obj) = data->flags; + src = data->owner; + + pw_thread_loop_lock (src->core->loop); + if (!obj->dispose) { + pw_thread_loop_unlock (src->core->loop); + GST_OBJECT_UNLOCK (data->pool); + return TRUE; + } + + gst_mini_object_ref (obj); + + data->queued = TRUE; + + if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0) + GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); + else + GST_LOG_OBJECT (src, "recycle buffer %p", obj); + + pw_thread_loop_unlock (src->core->loop); + + GST_OBJECT_UNLOCK (data->pool); + + return FALSE; +} + +static void +on_add_buffer (void *_data, struct pw_buffer *b) +{ + GstPipeWireSrc *pwsrc = _data; + GstPipeWirePoolData *data; + + gst_pipewire_pool_wrap_buffer (pwsrc->pool, b); + data = b->user_data; + GST_DEBUG_OBJECT (pwsrc, "add buffer %p", data->buf); + data->owner = pwsrc; + data->queued = TRUE; + GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle; +} + +static void +on_remove_buffer (void *_data, struct pw_buffer *b) +{ + GstPipeWireSrc *pwsrc = _data; + GstPipeWirePoolData *data = b->user_data; + GstBuffer *buf = data->buf; + int res; + + GST_DEBUG_OBJECT (pwsrc, "remove buffer %p", buf); + + GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + + if (data->queued) { + gst_buffer_unref (buf); + } else { + if ((res = pw_stream_queue_buffer (pwsrc->stream, b)) < 0) + GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); + } +} + +static const char * const transform_map[] = { + [SPA_META_TRANSFORMATION_None] = "rotate-0", + [SPA_META_TRANSFORMATION_90] = "rotate-90", + [SPA_META_TRANSFORMATION_180] = "rotate-180", + [SPA_META_TRANSFORMATION_270] = "rotate-270", + [SPA_META_TRANSFORMATION_Flipped] = "flip-rotate-0", + [SPA_META_TRANSFORMATION_Flipped90] = "flip-rotate-270", + [SPA_META_TRANSFORMATION_Flipped180] = "flip-rotate-180", + [SPA_META_TRANSFORMATION_Flipped270] = "flip-rotate-90", +}; + +static const char *spa_transform_value_to_gst_image_orientation(uint32_t transform_value) +{ + if (transform_value >= SPA_N_ELEMENTS(transform_map)) + transform_value = SPA_META_TRANSFORMATION_None; + + return transform_map[transform_value]; +} + +static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) +{ + struct pw_buffer *b; + GstBuffer *buf; + GstPipeWirePoolData *data; + struct spa_meta_header *h; + struct spa_meta_region *crop; + struct spa_meta_videotransform *videotransform; + guint i; + + b = pw_stream_dequeue_buffer (pwsrc->stream); + if (b == NULL) + return NULL; + + data = b->user_data; + + if (!GST_IS_BUFFER (data->buf)) { + GST_ERROR_OBJECT (pwsrc, "stream buffer %p is missing", data->buf); + return NULL; + } + + if (!data->queued) { + GST_ERROR_OBJECT (pwsrc, "buffer %p was not recycled", data->buf); + return NULL; + } + + GST_LOG_OBJECT (pwsrc, "got new buffer %p", data->buf); + + buf = gst_buffer_new (); + + data->queued = FALSE; + GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE; + GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE; + + h = data->header; + if (h) { + GST_LOG_OBJECT (pwsrc, "pts %" G_GUINT64_FORMAT ", dts_offset %" G_GUINT64_FORMAT, h->pts, h->dts_offset); + + if (GST_CLOCK_TIME_IS_VALID (h->pts)) { + GST_BUFFER_PTS (buf) = h->pts + GST_PIPEWIRE_CLOCK (pwsrc->clock)->time_offset; + if (GST_BUFFER_PTS (buf) + h->dts_offset > 0) + GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + h->dts_offset; + } + GST_BUFFER_OFFSET (buf) = h->seq; + } + crop = data->crop; + if (crop) { + GstVideoCropMeta *meta = gst_buffer_get_video_crop_meta(buf); + if (meta) { + meta->x = crop->region.position.x; + meta->y = crop->region.position.y; + meta->width = crop->region.size.width; + meta->height = crop->region.size.height; + } + } + + videotransform = data->videotransform; + if (videotransform) { + if (pwsrc->transform_value != videotransform->transform) { + GstEvent *tag_event; + const char* tag_string; + + tag_string = + spa_transform_value_to_gst_image_orientation(videotransform->transform); + + GST_LOG_OBJECT (pwsrc, "got new videotransform: %u / %s", + videotransform->transform, tag_string); + + tag_event = gst_event_new_tag(gst_tag_list_new(GST_TAG_IMAGE_ORIENTATION, + tag_string, NULL)); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), tag_event); + + pwsrc->transform_value = videotransform->transform; + } + } + + for (i = 0; i < b->buffer->n_datas; i++) { + struct spa_data *d = &b->buffer->datas[i]; + GstMemory *pmem = gst_buffer_peek_memory (data->buf, i); + if (pmem) { + GstMemory *mem; + if (!pwsrc->always_copy) + mem = gst_memory_share (pmem, d->chunk->offset, d->chunk->size); + else + mem = gst_memory_copy (pmem, d->chunk->offset, d->chunk->size); + gst_buffer_insert_memory (buf, i, mem); + } + if (d->chunk->flags & SPA_CHUNK_FLAG_CORRUPTED) + GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_CORRUPTED); + } + if (!pwsrc->always_copy) + gst_buffer_add_parent_buffer_meta (buf, data->buf); + gst_buffer_unref (data->buf); + return buf; +} + +static void +on_process (void *_data) +{ + GstPipeWireSrc *pwsrc = _data; + pw_thread_loop_signal (pwsrc->core->loop, FALSE); +} + +static void +on_state_changed (void *data, + enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + GstPipeWireSrc *pwsrc = data; + + GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state)); + + switch (state) { + case PW_STREAM_STATE_UNCONNECTED: + case PW_STREAM_STATE_CONNECTING: + case PW_STREAM_STATE_PAUSED: + case PW_STREAM_STATE_STREAMING: + break; + case PW_STREAM_STATE_ERROR: + GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, + ("stream error: %s", error), (NULL)); + break; + } + pw_thread_loop_signal (pwsrc->core->loop, FALSE); +} + +static void +parse_stream_properties (GstPipeWireSrc *pwsrc, const struct pw_properties *props) +{ + const gchar *var; + gboolean is_live; + + GST_OBJECT_LOCK (pwsrc); + var = pw_properties_get (props, PW_KEY_STREAM_IS_LIVE); + is_live = pwsrc->is_live = var ? pw_properties_parse_bool(var) : TRUE; + + var = pw_properties_get (props, PW_KEY_STREAM_LATENCY_MIN); + pwsrc->min_latency = var ? (GstClockTime) atoi (var) : 0; + + var = pw_properties_get (props, PW_KEY_STREAM_LATENCY_MAX); + pwsrc->max_latency = var ? (GstClockTime) atoi (var) : GST_CLOCK_TIME_NONE; + GST_OBJECT_UNLOCK (pwsrc); + + GST_DEBUG_OBJECT (pwsrc, "live %d", is_live); + + gst_base_src_set_live (GST_BASE_SRC (pwsrc), is_live); +} + +static gboolean +gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) +{ + const char *error = NULL; + struct timespec abstime; + + pw_thread_loop_lock (pwsrc->core->loop); + GST_DEBUG_OBJECT (pwsrc, "doing stream start"); + + pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while (TRUE) { + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + + GST_DEBUG_OBJECT (pwsrc, "waiting for STREAMING, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_STREAMING) + break; + + if (state == PW_STREAM_STATE_ERROR) + goto start_error; + + if (pwsrc->flushing) { + error = "flushing"; + goto start_error; + } + + if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) { + error = "timeout"; + goto start_error; + } + } + + parse_stream_properties (pwsrc, pw_stream_get_properties (pwsrc->stream)); + GST_DEBUG_OBJECT (pwsrc, "signal started"); + pwsrc->started = TRUE; + pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->core->loop); + + return TRUE; + +start_error: + { + GST_DEBUG_OBJECT (pwsrc, "error starting stream: %s", error); + pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->core->loop); + return FALSE; + } +} + +static enum pw_stream_state +wait_started (GstPipeWireSrc *this) +{ + enum pw_stream_state state; + const char *error = NULL; + struct timespec abstime; + + pw_thread_loop_lock (this->core->loop); + + pw_thread_loop_get_time (this->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while (TRUE) { + state = pw_stream_get_state (this->stream, &error); + + GST_DEBUG_OBJECT (this, "waiting for started signal, state now %s", + pw_stream_state_as_string (state)); + + if (state == PW_STREAM_STATE_ERROR) + break; + + if (this->flushing) { + state = PW_STREAM_STATE_ERROR; + break; + } + + if (this->started) + break; + + if (pw_thread_loop_timed_wait_full (this->core->loop, &abstime) < 0) { + state = PW_STREAM_STATE_ERROR; + break; + } + } + GST_DEBUG_OBJECT (this, "got started signal: %s", + pw_stream_state_as_string (state)); + pw_thread_loop_unlock (this->core->loop); + + return state; +} + +static gboolean +gst_pipewire_src_negotiate (GstBaseSrc * basesrc) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); + GstCaps *thiscaps; + GstCaps *caps = NULL; + GstCaps *peercaps = NULL; + gboolean result = FALSE; + GPtrArray *possible; + const char *error = NULL; + struct timespec abstime; + uint32_t target_id; + + /* first see what is possible on our source pad */ + thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL); + GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps); + /* nothing or anything is allowed, we're done */ + if (thiscaps == NULL) + goto no_nego_needed; + + if (G_UNLIKELY (gst_caps_is_empty (thiscaps))) + goto no_caps; + + /* get the peer caps */ + peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps); + GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps); + if (peercaps) { + /* The result is already a subset of our caps */ + caps = peercaps; + gst_caps_unref (thiscaps); + } else { + /* no peer, work with our own caps then */ + caps = thiscaps; + } + if (caps == NULL || gst_caps_is_empty (caps)) + goto no_common_caps; + + GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps); + + /* open a connection with these caps */ + possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat); + gst_caps_unref (caps); + + /* first disconnect */ + pw_thread_loop_lock (pwsrc->core->loop); + if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) { + GST_DEBUG_OBJECT (basesrc, "disconnect capture"); + pw_stream_disconnect (pwsrc->stream); + while (TRUE) { + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + + GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_UNCONNECTED) + break; + + if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) { + g_ptr_array_unref (possible); + goto connect_error; + } + + pw_thread_loop_wait (pwsrc->core->loop); + } + } + + target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY; + + if (pwsrc->target_object) { + struct spa_dict_item items[2] = { + SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object), + /* XXX deprecated but the portal and some example apps only + * provide the object id */ + SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL), + }; + struct spa_dict dict = SPA_DICT_INIT_ARRAY(items); + uint64_t serial; + + /* If target.object is a name, set it also to node.target */ + if (spa_atou64(pwsrc->target_object, &serial, 0)) { + dict.n_items = 1; + } else { + target_id = PW_ID_ANY; + items[1].value = pwsrc->target_object; + } + + pw_stream_update_properties (pwsrc->stream, &dict); + } + + GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", + pwsrc->path, pwsrc->target_object); + pwsrc->negotiated = FALSE; + pw_stream_connect (pwsrc->stream, + PW_DIRECTION_INPUT, + target_id, + PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT, + (const struct spa_pod **)possible->pdata, + possible->len); + g_ptr_array_free (possible, TRUE); + + pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while (TRUE) { + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + + GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) + goto connect_error; + + if (pwsrc->negotiated) + break; + + if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) + goto connect_error; + } + caps = pwsrc->caps; + pwsrc->caps = NULL; + pw_thread_loop_unlock (pwsrc->core->loop); + + if (caps == NULL) + goto no_caps; + + gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0); + + GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps); + result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps); + gst_caps_unref (caps); + + result = gst_pipewire_src_stream_start (pwsrc); + + pwsrc->started = result; + + return result; + +no_nego_needed: + { + GST_DEBUG_OBJECT (basesrc, "no negotiation needed"); + if (thiscaps) + gst_caps_unref (thiscaps); + return TRUE; + } +no_caps: + { + GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, + ("No supported formats found"), + ("This element did not produce valid caps")); + if (thiscaps) + gst_caps_unref (thiscaps); + return FALSE; + } +no_common_caps: + { + GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, + ("No supported formats found"), + ("This element does not have formats in common with the peer")); + if (caps) + gst_caps_unref (caps); + return FALSE; + } +connect_error: + { + GST_DEBUG_OBJECT (basesrc, "connect error"); + pw_thread_loop_unlock (pwsrc->core->loop); + return FALSE; + } +} + +static void +on_param_changed (void *data, uint32_t id, + const struct spa_pod *param) +{ + GstPipeWireSrc *pwsrc = data; + + if (param == NULL || id != SPA_PARAM_Format) { + GST_DEBUG_OBJECT (pwsrc, "clear format"); + return; + } + if (pwsrc->caps) + gst_caps_unref(pwsrc->caps); + pwsrc->caps = gst_caps_from_format (param); + + pwsrc->negotiated = pwsrc->caps != NULL; + + if (pwsrc->negotiated) { + const struct spa_pod *params[4]; + struct spa_pod_builder b = { NULL }; + uint8_t buffer[512]; + uint32_t buffers = CLAMP (16, pwsrc->min_buffers, pwsrc->max_buffers); + int buffertypes; + + buffertypes = (1<<SPA_DATA_DmaBuf); + if (spa_pod_find_prop (param, NULL, SPA_FORMAT_VIDEO_modifier)) { + gst_caps_features_remove (gst_caps_get_features (pwsrc->caps, 0), + GST_CAPS_FEATURE_MEMORY_SYSTEM_MEMORY); + gst_caps_features_add (gst_caps_get_features (pwsrc->caps, 0), + GST_CAPS_FEATURE_MEMORY_DMABUF); + } else { + buffertypes |= ((1<<SPA_DATA_MemFd) | (1<<SPA_DATA_MemPtr)); + } + + GST_DEBUG_OBJECT (pwsrc, "we got format %" GST_PTR_FORMAT, pwsrc->caps); + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + params[0] = spa_pod_builder_add_object (&b, + SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(buffers, + pwsrc->min_buffers, + pwsrc->max_buffers), + SPA_PARAM_BUFFERS_blocks, SPA_POD_CHOICE_RANGE_Int(0, 1, INT32_MAX), + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), + SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), + SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int(buffertypes)); + + params[1] = spa_pod_builder_add_object (&b, + SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, + SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header), + SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header))); + params[2] = spa_pod_builder_add_object (&b, + SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, + SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoCrop), + SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_region))); + params[3] = spa_pod_builder_add_object (&b, + SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, + SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoTransform), + SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_videotransform))); + + GST_DEBUG_OBJECT (pwsrc, "doing finish format"); + pw_stream_update_params (pwsrc->stream, params, SPA_N_ELEMENTS(params)); + } else { + GST_WARNING_OBJECT (pwsrc, "finish format with error"); + pw_stream_set_error (pwsrc->stream, -EINVAL, "unhandled format"); + } + pw_thread_loop_signal (pwsrc->core->loop, FALSE); +} + +static gboolean +gst_pipewire_src_unlock (GstBaseSrc * basesrc) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); + + pw_thread_loop_lock (pwsrc->core->loop); + GST_DEBUG_OBJECT (pwsrc, "setting flushing"); + pwsrc->flushing = TRUE; + pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->core->loop); + + return TRUE; +} + +static gboolean +gst_pipewire_src_unlock_stop (GstBaseSrc * basesrc) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); + + pw_thread_loop_lock (pwsrc->core->loop); + GST_DEBUG_OBJECT (pwsrc, "unsetting flushing"); + pwsrc->flushing = FALSE; + pw_thread_loop_unlock (pwsrc->core->loop); + + return TRUE; +} + +static gboolean +gst_pipewire_src_event (GstBaseSrc * src, GstEvent * event) +{ + gboolean res = FALSE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + if (gst_video_event_is_force_key_unit (event)) { + GstClockTime running_time; + gboolean all_headers; + guint count; + + gst_video_event_parse_upstream_force_key_unit (event, + &running_time, &all_headers, &count); + + res = TRUE; + } else { + res = GST_BASE_SRC_CLASS (parent_class)->event (src, event); + } + break; + default: + res = GST_BASE_SRC_CLASS (parent_class)->event (src, event); + break; + } + return res; +} + +static gboolean +gst_pipewire_src_query (GstBaseSrc * src, GstQuery * query) +{ + gboolean res = FALSE; + GstPipeWireSrc *pwsrc; + + pwsrc = GST_PIPEWIRE_SRC (src); + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_LATENCY: + GST_OBJECT_LOCK (pwsrc); + pwsrc->min_latency = 10000000; + pwsrc->max_latency = GST_CLOCK_TIME_NONE; + gst_query_set_latency (query, pwsrc->is_live, pwsrc->min_latency, pwsrc->max_latency); + GST_OBJECT_UNLOCK (pwsrc); + res = TRUE; + break; + default: + res = GST_BASE_SRC_CLASS (parent_class)->query (src, query); + break; + } + return res; +} + +static GstFlowReturn +gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) +{ + GstPipeWireSrc *pwsrc; + GstClockTime pts, dts, base_time; + const char *error = NULL; + GstBuffer *buf; + gboolean update_time = FALSE, timeout = FALSE; + + pwsrc = GST_PIPEWIRE_SRC (psrc); + + pw_thread_loop_lock (pwsrc->core->loop); + if (!pwsrc->negotiated) + goto not_negotiated; + + while (TRUE) { + enum pw_stream_state state; + + if (pwsrc->flushing) + goto streaming_stopped; + + if (pwsrc->stream == NULL) + goto streaming_error; + + state = pw_stream_get_state (pwsrc->stream, &error); + if (state == PW_STREAM_STATE_ERROR) + goto streaming_error; + + if (state != PW_STREAM_STATE_STREAMING) + goto streaming_stopped; + + if (pwsrc->eos) { + if (pwsrc->last_buffer == NULL) + goto streaming_eos; + buf = pwsrc->last_buffer; + pwsrc->last_buffer = NULL; + update_time = TRUE; + GST_LOG_OBJECT (pwsrc, "EOS, send last buffer"); + break; + } else if (timeout) { + if (pwsrc->last_buffer != NULL) { + update_time = TRUE; + buf = gst_buffer_ref(pwsrc->last_buffer); + GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer"); + break; + } + } else { + buf = dequeue_buffer (pwsrc); + GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); + if (buf != NULL) { + if (pwsrc->resend_last || pwsrc->keepalive_time > 0) + gst_buffer_replace (&pwsrc->last_buffer, buf); + break; + } + } + timeout = FALSE; + if (pwsrc->keepalive_time > 0) { + struct timespec abstime; + pw_thread_loop_get_time(pwsrc->core->loop, &abstime, + pwsrc->keepalive_time * SPA_NSEC_PER_MSEC); + if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) == -ETIMEDOUT) + timeout = TRUE; + } else { + pw_thread_loop_wait (pwsrc->core->loop); + } + } + pw_thread_loop_unlock (pwsrc->core->loop); + + *buffer = buf; + + if (pwsrc->is_live) + base_time = GST_ELEMENT_CAST (psrc)->base_time; + else + base_time = 0; + + if (update_time) { + GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (pwsrc)); + if (clock != NULL) { + pts = dts = gst_clock_get_time (clock); + gst_object_unref (clock); + } else { + pts = dts = GST_CLOCK_TIME_NONE; + } + } else { + pts = GST_BUFFER_PTS (*buffer); + dts = GST_BUFFER_DTS (*buffer); + } + + if (GST_CLOCK_TIME_IS_VALID (pts)) + pts = (pts >= base_time ? pts - base_time : 0); + if (GST_CLOCK_TIME_IS_VALID (dts)) + dts = (dts >= base_time ? dts - base_time : 0); + + GST_LOG_OBJECT (pwsrc, + "pts %" G_GUINT64_FORMAT ", dts %" G_GUINT64_FORMAT + ", base-time %" GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT, + GST_BUFFER_PTS (*buffer), GST_BUFFER_DTS (*buffer), GST_TIME_ARGS (base_time), + GST_TIME_ARGS (pts), GST_TIME_ARGS (dts)); + + GST_BUFFER_PTS (*buffer) = pts; + GST_BUFFER_DTS (*buffer) = dts; + + return GST_FLOW_OK; + +not_negotiated: + { + pw_thread_loop_unlock (pwsrc->core->loop); + return GST_FLOW_NOT_NEGOTIATED; + } +streaming_eos: + { + pw_thread_loop_unlock (pwsrc->core->loop); + return GST_FLOW_EOS; + } +streaming_error: + { + pw_thread_loop_unlock (pwsrc->core->loop); + return GST_FLOW_ERROR; + } +streaming_stopped: + { + pw_thread_loop_unlock (pwsrc->core->loop); + return GST_FLOW_FLUSHING; + } +} + +static gboolean +gst_pipewire_src_start (GstBaseSrc * basesrc) +{ + return TRUE; +} + +static gboolean +gst_pipewire_src_stop (GstBaseSrc * basesrc) +{ + GstPipeWireSrc *pwsrc; + + pwsrc = GST_PIPEWIRE_SRC (basesrc); + + pw_thread_loop_lock (pwsrc->core->loop); + pwsrc->eos = false; + gst_buffer_replace (&pwsrc->last_buffer, NULL); + gst_caps_replace(&pwsrc->caps, NULL); + pw_thread_loop_unlock (pwsrc->core->loop); + + return TRUE; +} + +static gboolean +copy_properties (GQuark field_id, + const GValue *value, + gpointer user_data) +{ + struct pw_properties *properties = user_data; + GValue dst = { 0 }; + + if (g_value_type_transformable (G_VALUE_TYPE(value), G_TYPE_STRING)) { + g_value_init(&dst, G_TYPE_STRING); + if (g_value_transform(value, &dst)) { + pw_properties_set (properties, + g_quark_to_string (field_id), + g_value_get_string (&dst)); + } + g_value_unset(&dst); + } + return TRUE; +} + +static const struct pw_stream_events stream_events = { + PW_VERSION_STREAM_EVENTS, + .state_changed = on_state_changed, + .param_changed = on_param_changed, + .add_buffer = on_add_buffer, + .remove_buffer = on_remove_buffer, + .process = on_process, +}; + +static gboolean +gst_pipewire_src_open (GstPipeWireSrc * pwsrc) +{ + struct pw_properties *props; + + GST_DEBUG_OBJECT (pwsrc, "open"); + + pwsrc->core = gst_pipewire_core_get(pwsrc->fd); + if (pwsrc->core == NULL) + goto connect_error; + + pw_thread_loop_lock (pwsrc->core->loop); + + props = pw_properties_new (NULL, NULL); + if (pwsrc->client_properties) { + gst_structure_foreach (pwsrc->client_properties, copy_properties, props); + pw_core_update_properties (pwsrc->core->core, &props->dict); + pw_properties_clear(props); + } + if (pwsrc->client_name) { + pw_properties_set (props, PW_KEY_NODE_NAME, pwsrc->client_name); + pw_properties_set (props, PW_KEY_NODE_DESCRIPTION, pwsrc->client_name); + } + if (pwsrc->stream_properties) { + gst_structure_foreach (pwsrc->stream_properties, copy_properties, props); + } + + if ((pwsrc->stream = pw_stream_new (pwsrc->core->core, + pwsrc->client_name, props)) == NULL) + goto no_stream; + + + pw_stream_add_listener(pwsrc->stream, + &pwsrc->stream_listener, + &stream_events, + pwsrc); + + pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time); + pw_thread_loop_unlock (pwsrc->core->loop); + + return TRUE; + + /* ERRORS */ +connect_error: + { + GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't connect"), (NULL)); + return FALSE; + } +no_stream: + { + GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't create stream"), (NULL)); + pw_thread_loop_unlock (pwsrc->core->loop); + gst_pipewire_core_release (pwsrc->core); + pwsrc->core = NULL; + return FALSE; + } +} + +static void +gst_pipewire_src_close (GstPipeWireSrc * pwsrc) +{ + pwsrc->last_time = gst_clock_get_time (pwsrc->clock); + + GST_DEBUG_OBJECT (pwsrc, "close"); + + gst_element_post_message (GST_ELEMENT (pwsrc), + gst_message_new_clock_lost (GST_OBJECT_CAST (pwsrc), pwsrc->clock)); + + GST_OBJECT_LOCK (pwsrc); + GST_PIPEWIRE_CLOCK (pwsrc->clock)->stream = NULL; + g_clear_object (&pwsrc->clock); + GST_OBJECT_UNLOCK (pwsrc); + + GST_OBJECT_LOCK (pwsrc->pool); + pw_thread_loop_lock (pwsrc->core->loop); + if (pwsrc->stream) { + pw_stream_destroy (pwsrc->stream); + pwsrc->stream = NULL; + } + pw_thread_loop_unlock (pwsrc->core->loop); + GST_OBJECT_UNLOCK (pwsrc->pool); + + if (pwsrc->core) { + gst_pipewire_core_release (pwsrc->core); + pwsrc->core = NULL; + } +} + +static gboolean +gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) +{ + GstPipeWireSrc *this = GST_PIPEWIRE_SRC_CAST (elem); + gboolean ret; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_DEBUG_OBJECT (this, "got EOS"); + pw_thread_loop_lock (this->core->loop); + this->eos = true; + pw_thread_loop_signal (this->core->loop, FALSE); + pw_thread_loop_unlock (this->core->loop); + ret = TRUE; + break; + default: + ret = GST_ELEMENT_CLASS (parent_class)->send_event (elem, event); + break; + } + return ret; +} + +static GstStateChangeReturn +gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstPipeWireSrc *this = GST_PIPEWIRE_SRC_CAST (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + if (!gst_pipewire_src_open (this)) + goto open_failed; + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + /* uncork and start recording */ + pw_thread_loop_lock (this->core->loop); + pw_stream_set_active(this->stream, true); + pw_thread_loop_unlock (this->core->loop); + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + /* stop recording ASAP by corking */ + pw_thread_loop_lock (this->core->loop); + pw_stream_set_active(this->stream, false); + pw_thread_loop_unlock (this->core->loop); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + if (wait_started (this) == PW_STREAM_STATE_ERROR) + goto open_failed; + + if (gst_base_src_is_live (GST_BASE_SRC (element))) + ret = GST_STATE_CHANGE_NO_PREROLL; + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + pw_thread_loop_lock (this->core->loop); + this->negotiated = FALSE; + pw_thread_loop_unlock (this->core->loop); + break; + case GST_STATE_CHANGE_READY_TO_NULL: + gst_pipewire_src_close (this); + break; + default: + break; + } + return ret; + + /* ERRORS */ +open_failed: + { + return GST_STATE_CHANGE_FAILURE; + } +} diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h new file mode 100644 index 0000000..e1def85 --- /dev/null +++ b/src/gst/gstpipewiresrc.h @@ -0,0 +1,110 @@ +/* GStreamer + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __GST_PIPEWIRE_SRC_H__ +#define __GST_PIPEWIRE_SRC_H__ + +#include <gst/gst.h> +#include <gst/base/gstpushsrc.h> + +#include <pipewire/pipewire.h> +#include <gst/gstpipewirepool.h> +#include <gst/gstpipewirecore.h> + +G_BEGIN_DECLS + +#define GST_TYPE_PIPEWIRE_SRC \ + (gst_pipewire_src_get_type()) +#define GST_PIPEWIRE_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PIPEWIRE_SRC,GstPipeWireSrc)) +#define GST_PIPEWIRE_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PIPEWIRE_SRC,GstPipeWireSrcClass)) +#define GST_IS_PIPEWIRE_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PIPEWIRE_SRC)) +#define GST_IS_PIPEWIRE_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PIPEWIRE_SRC)) +#define GST_PIPEWIRE_SRC_CAST(obj) \ + ((GstPipeWireSrc *) (obj)) + +typedef struct _GstPipeWireSrc GstPipeWireSrc; +typedef struct _GstPipeWireSrcClass GstPipeWireSrcClass; + +/** + * GstPipeWireSrc: + * + * Opaque data structure. + */ +struct _GstPipeWireSrc { + GstPushSrc element; + + /*< private >*/ + gchar *path; + gchar *target_object; + gchar *client_name; + gboolean always_copy; + gint min_buffers; + gint max_buffers; + int fd; + gboolean resend_last; + gint keepalive_time; + + GstCaps *caps; + + gboolean negotiated; + gboolean flushing; + gboolean started; + gboolean eos; + + gboolean is_live; + GstClockTime min_latency; + GstClockTime max_latency; + + GstStructure *client_properties; + GstPipeWireCore *core; + struct spa_hook core_listener; + int last_seq; + int pending_seq; + + struct pw_stream *stream; + struct spa_hook stream_listener; + + GstBuffer *last_buffer; + GstStructure *stream_properties; + + GstPipeWirePool *pool; + GstClock *clock; + GstClockTime last_time; + + enum spa_meta_videotransform_value transform_value; +}; + +struct _GstPipeWireSrcClass { + GstPushSrcClass parent_class; +}; + +GType gst_pipewire_src_get_type (void); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_SRC_H__ */ diff --git a/src/gst/meson.build b/src/gst/meson.build new file mode 100644 index 0000000..fd552f6 --- /dev/null +++ b/src/gst/meson.build @@ -0,0 +1,33 @@ +pipewire_gst_sources = [ + 'gstpipewire.c', + 'gstpipewirecore.c', + 'gstpipewireclock.c', + 'gstpipewireformat.c', + 'gstpipewirepool.c', + 'gstpipewiresink.c', + 'gstpipewiresrc.c', +] + +if get_option('gstreamer-device-provider').allowed() + pipewire_gst_sources += [ 'gstpipewiredeviceprovider.c' ] +endif + +pipewire_gst_headers = [ + 'gstpipewireclock.h', + 'gstpipewirecore.h', + 'gstpipewiredeviceprovider.h', + 'gstpipewireformat.h', + 'gstpipewirepool.h', + 'gstpipewiresink.h', + 'gstpipewiresrc.h', +] + +pipewire_gst = shared_library('gstpipewire', + pipewire_gst_sources, + include_directories : [ configinc ], + dependencies : [ spa_dep, gst_dep, pipewire_dep ], + install : true, + install_dir : '@0@/gstreamer-1.0'.format(get_option('libdir')), +) + +plugins = [pipewire_gst] |