author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-15 20:36:56 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-15 20:36:56 +0000
commit     51de1d8436100f725f3576aefa24a2bd2057bc28 (patch)
tree       c6d1d5264b6d40a8d7ca34129f36b7d61e188af3 /demux/demux.c
parent     Initial commit. (diff)
download   mpv-51de1d8436100f725f3576aefa24a2bd2057bc28.tar.xz
           mpv-51de1d8436100f725f3576aefa24a2bd2057bc28.zip

Adding upstream version 0.37.0. (upstream/0.37.0)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'demux/demux.c')
-rw-r--r--  demux/demux.c  4624
1 file changed, 4624 insertions, 0 deletions
diff --git a/demux/demux.c b/demux/demux.c
new file mode 100644
index 0000000..256e1b6
--- /dev/null
+++ b/demux/demux.c
@@ -0,0 +1,4624 @@
+/*
+ * This file is part of mpv.
+ *
+ * mpv is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * mpv is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with mpv. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <assert.h>
+#include <float.h>
+#include <limits.h>
+#include <math.h>
+#include <stdatomic.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "cache.h"
+#include "config.h"
+#include "options/m_config.h"
+#include "options/m_option.h"
+#include "mpv_talloc.h"
+#include "common/av_common.h"
+#include "common/msg.h"
+#include "common/global.h"
+#include "common/recorder.h"
+#include "common/stats.h"
+#include "misc/charset_conv.h"
+#include "misc/thread_tools.h"
+#include "osdep/timer.h"
+#include "osdep/threads.h"
+
+#include "stream/stream.h"
+#include "demux.h"
+#include "timeline.h"
+#include "stheader.h"
+#include "cue.h"
+
+// Demuxer list
+extern const struct demuxer_desc demuxer_desc_edl;
+extern const struct demuxer_desc demuxer_desc_cue;
+extern const demuxer_desc_t demuxer_desc_rawaudio;
+extern const demuxer_desc_t demuxer_desc_rawvideo;
+extern const demuxer_desc_t demuxer_desc_mf;
+extern const demuxer_desc_t demuxer_desc_matroska;
+extern const demuxer_desc_t demuxer_desc_lavf;
+extern const demuxer_desc_t demuxer_desc_playlist;
+extern const demuxer_desc_t demuxer_desc_disc;
+extern const demuxer_desc_t demuxer_desc_rar;
+extern const demuxer_desc_t demuxer_desc_libarchive;
+extern const demuxer_desc_t demuxer_desc_null;
+extern const demuxer_desc_t demuxer_desc_timeline;
+
+static const demuxer_desc_t *const demuxer_list[] = {
+ &demuxer_desc_disc,
+ &demuxer_desc_edl,
+ &demuxer_desc_cue,
+ &demuxer_desc_rawaudio,
+ &demuxer_desc_rawvideo,
+ &demuxer_desc_matroska,
+#if HAVE_LIBARCHIVE
+ &demuxer_desc_libarchive,
+#endif
+ &demuxer_desc_lavf,
+ &demuxer_desc_mf,
+ &demuxer_desc_playlist,
+ &demuxer_desc_null,
+ NULL
+};
+
+#define OPT_BASE_STRUCT struct demux_opts
+
+static bool get_demux_sub_opts(int index, const struct m_sub_options **sub);
+
+const struct m_sub_options demux_conf = {
+ .opts = (const struct m_option[]){
+ {"cache", OPT_CHOICE(enable_cache,
+ {"no", 0}, {"auto", -1}, {"yes", 1})},
+ {"cache-on-disk", OPT_BOOL(disk_cache)},
+ {"demuxer-readahead-secs", OPT_DOUBLE(min_secs), M_RANGE(0, DBL_MAX)},
+ {"demuxer-hysteresis-secs", OPT_DOUBLE(hyst_secs), M_RANGE(0, DBL_MAX)},
+ {"demuxer-max-bytes", OPT_BYTE_SIZE(max_bytes),
+ M_RANGE(0, M_MAX_MEM_BYTES)},
+ {"demuxer-max-back-bytes", OPT_BYTE_SIZE(max_bytes_bw),
+ M_RANGE(0, M_MAX_MEM_BYTES)},
+ {"demuxer-donate-buffer", OPT_BOOL(donate_fw)},
+ {"force-seekable", OPT_BOOL(force_seekable)},
+ {"cache-secs", OPT_DOUBLE(min_secs_cache), M_RANGE(0, DBL_MAX)},
+ {"access-references", OPT_BOOL(access_references)},
+ {"demuxer-seekable-cache", OPT_CHOICE(seekable_cache,
+ {"auto", -1}, {"no", 0}, {"yes", 1})},
+ {"index", OPT_CHOICE(index_mode, {"default", 1}, {"recreate", 0})},
+ {"mf-fps", OPT_DOUBLE(mf_fps)},
+ {"mf-type", OPT_STRING(mf_type)},
+ {"sub-create-cc-track", OPT_BOOL(create_ccs)},
+ {"stream-record", OPT_STRING(record_file)},
+ {"video-backward-overlap", OPT_CHOICE(video_back_preroll, {"auto", -1}),
+ M_RANGE(0, 1024)},
+ {"audio-backward-overlap", OPT_CHOICE(audio_back_preroll, {"auto", -1}),
+ M_RANGE(0, 1024)},
+ {"video-backward-batch", OPT_INT(back_batch[STREAM_VIDEO]),
+ M_RANGE(0, 1024)},
+ {"audio-backward-batch", OPT_INT(back_batch[STREAM_AUDIO]),
+ M_RANGE(0, 1024)},
+ {"demuxer-backward-playback-step", OPT_DOUBLE(back_seek_size),
+ M_RANGE(0, DBL_MAX)},
+ {"metadata-codepage", OPT_STRING(meta_cp)},
+ {0}
+ },
+ .size = sizeof(struct demux_opts),
+ .defaults = &(const struct demux_opts){
+ .enable_cache = -1, // auto
+ .max_bytes = 150 * 1024 * 1024,
+ .max_bytes_bw = 50 * 1024 * 1024,
+ .donate_fw = true,
+ .min_secs = 1.0,
+ .min_secs_cache = 1000.0 * 60 * 60,
+ .seekable_cache = -1,
+ .index_mode = 1,
+ .mf_fps = 1.0,
+ .access_references = true,
+ .video_back_preroll = -1,
+ .audio_back_preroll = -1,
+ .back_seek_size = 60,
+ .back_batch = {
+ [STREAM_VIDEO] = 1,
+ [STREAM_AUDIO] = 10,
+ },
+ .meta_cp = "auto",
+ },
+ .get_sub_options = get_demux_sub_opts,
+};
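+
+// (The quoted names above are the user-visible option names; e.g.
+// "demuxer-max-bytes" is what --demuxer-max-bytes sets, and each OPT_*
+// macro names the struct demux_opts field the value is stored in.)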
+
+struct demux_internal {
+ struct mp_log *log;
+ struct mpv_global *global;
+ struct stats_ctx *stats;
+
+ bool can_cache; // not a slave demuxer; caching makes sense
+ bool can_record; // stream recording is allowed
+
+ // The demuxer runs potentially in another thread, so we keep two demuxer
+ // structs; the real demuxer can access the shadow struct only.
+ struct demuxer *d_thread; // accessed by demuxer impl. (producer)
+ struct demuxer *d_user; // accessed by player (consumer)
+
+ // The lock protects the packet queues (struct demux_stream),
+ // and the fields below.
+ mp_mutex lock;
+ mp_cond wakeup;
+ mp_thread thread;
+
+ // -- All the following fields are protected by lock.
+
+ bool thread_terminate;
+ bool threading;
+ bool shutdown_async;
+ void (*wakeup_cb)(void *ctx);
+ void *wakeup_cb_ctx;
+
+ struct sh_stream **streams;
+ int num_streams;
+
+ char *meta_charset;
+
+ // If non-NULL, a stream which is used for global (timed) metadata. It will
+ // be an arbitrary stream, which hopefully will happen to work.
+ struct sh_stream *metadata_stream;
+
+ int events;
+
+ struct demux_cache *cache;
+
+ bool warned_queue_overflow;
+ bool eof; // whether we're in EOF state
+ double min_secs;
+ double hyst_secs; // stop reading till there's hyst_secs remaining
+ bool hyst_active;
+ size_t max_bytes;
+ size_t max_bytes_bw;
+ bool seekable_cache;
+ bool using_network_cache_opts;
+ char *record_filename;
+
+ // Whether the demuxer thread should prefetch packets. This is set to false
+ // if EOF was reached or the demuxer cache is full. This is also important
+ // in the initial state: the decoder thread needs to select streams before
+ // the first packet is read, so this is set to true by packet reading only.
+ // Reset to false again on EOF or if prefetching is done.
+ bool reading;
+
+ // Set if we just performed a seek, without reading packets yet. Used to
+ // avoid a redundant initial seek after enabling streams. We could just
+ // allow it, but to avoid buggy seeking affecting normal playback, we don't.
+ bool after_seek;
+ // Set in addition to after_seek if we think we seeked to the start of the
+ // file (or if the demuxer was just opened).
+ bool after_seek_to_start;
+
+ // Demuxing backwards. Since demuxer implementations don't support this
+ // directly, it is emulated by seeking backwards for every packet run. Also,
+ // packets between keyframes are demuxed forwards (you can't decode that
+ // stuff otherwise), which adds complexity on top of it.
+ bool back_demuxing;
+
+ // For backward demuxing:
+ bool need_back_seek; // back-step seek needs to be triggered
+ bool back_any_need_recheck; // at least 1 ds->back_need_recheck set
+
+ bool tracks_switched; // thread needs to inform demuxer of this
+
+ bool seeking; // there's a seek queued
+ int seek_flags; // flags for next seek (if seeking==true)
+ double seek_pts;
+
+ // (fields for debugging)
+ double seeking_in_progress; // low level seek state
+ int low_level_seeks; // number of started low level seeks
+ double demux_ts; // last demuxed DTS or PTS
+
+ double ts_offset; // timestamp offset to apply to everything
+
+ // (sorted by least recent use: index 0 is least recently used)
+ struct demux_cached_range **ranges;
+ int num_ranges;
+
+ size_t total_bytes; // total sum of packet data buffered
+ // Range from which decoder is reading, and to which demuxer is appending.
+ // This is normally never NULL, and always equals ranges[num_ranges - 1].
+ // It can be NULL only during initialization or deinitialization.
+ struct demux_cached_range *current_range;
+
+ double highest_av_pts; // highest non-subtitle PTS seen - for duration
+
+ bool blocked;
+
+ // Transient state.
+ double duration;
+ // Cached state.
+ int64_t stream_size;
+ int64_t last_speed_query;
+ double speed_query_prev_sample;
+ uint64_t bytes_per_second;
+ int64_t next_cache_update;
+
+ // demux user state (user thread, somewhat similar to reader/decoder state)
+ double last_playback_pts; // last playback_pts from demux_update()
+ bool force_metadata_update;
+ int cached_metadata_index; // speed up repeated lookups
+
+ struct mp_recorder *dumper;
+ int dumper_status;
+
+ bool owns_stream;
+
+ // -- Access from demuxer thread only
+ bool enable_recording;
+ struct mp_recorder *recorder;
+ int64_t slave_unbuffered_read_bytes; // value reported from demuxer impl.
+ int64_t hack_unbuffered_read_bytes; // for demux_get_bytes_read_hack()
+ int64_t cache_unbuffered_read_bytes; // for demux_reader_state.bytes_per_second
+ int64_t byte_level_seeks; // for demux_reader_state.byte_level_seeks
+};
+
+struct timed_metadata {
+ double pts;
+ struct mp_tags *tags;
+ bool from_stream;
+};
+
+// A continuous range of cached packets for all enabled streams.
+// (One demux_queue for each known stream.)
+struct demux_cached_range {
+ // streams[] is indexed by demux_stream->index
+ struct demux_queue **streams;
+ int num_streams;
+
+ // Computed from the stream queues' values. These fields (unlike their
+ // counterparts in demux_queue) are always either NOPTS, or fully valid.
+ double seek_start, seek_end;
+
+ bool is_bof; // set if the file begins with this range
+ bool is_eof; // set if the file ends with this range
+
+ struct timed_metadata **metadata;
+ int num_metadata;
+};
+
+#define QUEUE_INDEX_SIZE_MASK(queue) ((queue)->index_size - 1)
+
+// Access the idx-th entry in the given demux_queue.
+// Requirement: idx >= 0 && idx < queue->num_index
+#define QUEUE_INDEX_ENTRY(queue, idx) \
+ ((queue)->index[((queue)->index0 + (idx)) & QUEUE_INDEX_SIZE_MASK(queue)])
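+
+// Example (a sketch): iterating all index entries in logical order, oldest
+// first. Like all cached queue state, this requires holding in->lock.
+//
+//   for (size_t i = 0; i < queue->num_index; i++) {
+//       struct index_entry *e = &QUEUE_INDEX_ENTRY(queue, i);
+//       // e->pts is the keyframe timestamp, e->pkt the indexed packet
+//   }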
+
+// Don't index packets whose timestamps are within this amount of time of the
+// last index entry (it's better to search for them manually).
+#define INDEX_STEP_SIZE 1.0
+
+struct index_entry {
+ double pts;
+ struct demux_packet *pkt;
+};
+
+// A continuous list of cached packets for a single stream/range. There is one
+// for each stream and range. Also contains some state for use during demuxing
+// (keeping it across seeks makes it easier to resume demuxing).
+struct demux_queue {
+ struct demux_stream *ds;
+ struct demux_cached_range *range;
+
+ struct demux_packet *head;
+ struct demux_packet *tail;
+
+ uint64_t tail_cum_pos; // cumulative size including tail packet
+
+ bool correct_dts; // packet DTS is strictly monotonically increasing
+ bool correct_pos; // packet pos is strictly monotonically increasing
+ int64_t last_pos; // for determining correct_pos
+ int64_t last_pos_fixup; // for filling in unset dp->pos values
+ double last_dts; // for determining correct_dts
+ double last_ts; // timestamp of the last packet added to queue
+
+ // for incrementally determining seek PTS range
+ struct demux_packet *keyframe_latest;
+ struct demux_packet *keyframe_first; // cached value of first KF packet
+
+ // incrementally maintained seek range, possibly invalid
+ double seek_start, seek_end;
+ double last_pruned; // timestamp of last pruned keyframe
+
+ bool is_bof; // started demuxing at beginning of file
+ bool is_eof; // received true EOF here
+
+ // Complete index, though it may skip some entries to reduce density.
+ struct index_entry *index; // ring buffer
+ size_t index_size; // size of index[] (0 or a power of 2)
+ size_t index0; // first index entry
+ size_t num_index; // number of index entries (wraps on index_size)
+};
+
+struct demux_stream {
+ struct demux_internal *in;
+ struct sh_stream *sh; // ds->sh->ds == ds
+ enum stream_type type; // equals to sh->type
+ int index; // equals to sh->index
+ // --- all fields are protected by in->lock
+
+ void (*wakeup_cb)(void *ctx);
+ void *wakeup_cb_ctx;
+
+ // demuxer state
+ bool selected; // user wants packets from this stream
+ bool eager; // try to keep at least 1 packet queued
+ // if false, this stream is disabled, or passively
+ // read (like subtitles)
+ bool still_image; // stream has still video images
+ bool refreshing; // finding old position after track switches
+ bool eof; // end of demuxed stream? (true if no more packets)
+
+ bool global_correct_dts;// all observed so far
+ bool global_correct_pos;
+
+ // current queue - used both for reading and demuxing (this is never NULL)
+ struct demux_queue *queue;
+
+ // reader (decoder) state (bitrate calculations are part of it because we
+ // want to return the bitrate closest to the "current position")
+ double base_ts; // timestamp of the last packet returned to decoder
+ double last_br_ts; // timestamp of last packet bitrate was calculated
+ size_t last_br_bytes; // summed packet sizes since last bitrate calculation
+ double bitrate;
+ struct demux_packet *reader_head; // points at current decoder position
+ bool skip_to_keyframe;
+ bool attached_picture_added;
+ bool need_wakeup; // call wakeup_cb on next reader_head state change
+ double force_read_until;// eager=false streams (subs): force read-ahead
+
+ // For demux_internal.dumper. Currently, this is used only temporarily
+ // during blocking dumping.
+ struct demux_packet *dump_pos;
+
+ // for refresh seeks: pos/dts of last packet returned to reader
+ int64_t last_ret_pos;
+ double last_ret_dts;
+
+ // Backwards demuxing.
+ bool back_need_recheck; // flag for incremental find_backward_restart_pos work
+ // pos/dts of the previous keyframe packet returned; always valid if back-
+ // demuxing is enabled, and back_restart_eof/back_restart_next are false.
+ int64_t back_restart_pos;
+ double back_restart_dts;
+ bool back_restart_eof; // restart position is at EOF; overrides pos/dts
+ bool back_restart_next; // restart before next keyframe; overrides above
+ bool back_restarting; // searching keyframe before restart pos
+ // Current PTS lower bound for back demuxing.
+ double back_seek_pos;
+ // pos/dts of the packet to resume demuxing from when another stream caused
+ // a seek backward to get more packets. reader_head will be reset to this
+ // packet as soon as it's encountered again.
+ int64_t back_resume_pos;
+ double back_resume_dts;
+ bool back_resuming; // resuming mode (above fields are valid/used)
+ // Set to true if the first packet (keyframe) of a range was returned.
+ bool back_range_started;
+ // Number of KF packets at start of range yet to return. -1 is used for BOF.
+ int back_range_count;
+ // Number of KF packets yet to return that are marked as preroll.
+ int back_range_preroll;
+ // Static packet preroll count.
+ int back_preroll;
+
+ // for closed captions (demuxer_feed_caption)
+ struct sh_stream *cc;
+ bool ignore_eof; // ignore stream in underrun detection
+};
+
+static void switch_to_fresh_cache_range(struct demux_internal *in);
+static void demuxer_sort_chapters(demuxer_t *demuxer);
+static MP_THREAD_VOID demux_thread(void *pctx);
+static void update_cache(struct demux_internal *in);
+static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp);
+static struct demux_packet *advance_reader_head(struct demux_stream *ds);
+static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
+ bool clear_back_state);
+static struct demux_packet *compute_keyframe_times(struct demux_packet *pkt,
+ double *out_kf_min,
+ double *out_kf_max);
+static void find_backward_restart_pos(struct demux_stream *ds);
+static struct demux_packet *find_seek_target(struct demux_queue *queue,
+ double pts, int flags);
+static void prune_old_packets(struct demux_internal *in);
+static void dumper_close(struct demux_internal *in);
+static void demux_convert_tags_charset(struct demuxer *demuxer);
+
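+// Number of bytes buffered ahead of the reader position on this stream: the
+// summed size of all packets from ds->reader_head to the end of the queue.
+// Each packet stores its cumulative byte position (cum_pos), so this is O(1).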
+static uint64_t get_forward_buffered_bytes(struct demux_stream *ds)
+{
+ if (!ds->reader_head)
+ return 0;
+ return ds->queue->tail_cum_pos - ds->reader_head->cum_pos;
+}
+
+#if 0
+// very expensive check for redundant cached queue state
+static void check_queue_consistency(struct demux_internal *in)
+{
+ uint64_t total_bytes = 0;
+
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+ int range_num_packets = 0;
+
+ assert(range->num_streams == in->num_streams);
+
+ for (int i = 0; i < range->num_streams; i++) {
+ struct demux_queue *queue = range->streams[i];
+
+ assert(queue->range == range);
+
+ size_t fw_bytes = 0;
+ bool is_forward = false;
+ bool kf_found = false;
+ bool kf1_found = false;
+ size_t next_index = 0;
+ uint64_t queue_total_bytes = 0;
+ for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
+ is_forward |= dp == queue->ds->reader_head;
+ kf_found |= dp == queue->keyframe_latest;
+ kf1_found |= dp == queue->keyframe_first;
+
+ size_t bytes = demux_packet_estimate_total_size(dp);
+ total_bytes += bytes;
+ queue_total_bytes += bytes;
+ if (is_forward) {
+ fw_bytes += bytes;
+ assert(range == in->current_range);
+ assert(queue->ds->queue == queue);
+ }
+ range_num_packets += 1;
+
+ if (!dp->next)
+ assert(queue->tail == dp);
+
+ if (next_index < queue->num_index &&
+ QUEUE_INDEX_ENTRY(queue, next_index).pkt == dp)
+ next_index += 1;
+ }
+ if (!queue->head)
+ assert(!queue->tail);
+ assert(next_index == queue->num_index);
+
+ uint64_t queue_total_bytes2 = 0;
+ if (queue->head)
+ queue_total_bytes2 = queue->tail_cum_pos - queue->head->cum_pos;
+
+ assert(queue_total_bytes == queue_total_bytes2);
+
+ // If the queue is currently used...
+ if (queue->ds->queue == queue) {
+ // ...reader_head and others must be in the queue.
+ assert(is_forward == !!queue->ds->reader_head);
+ assert(kf_found == !!queue->keyframe_latest);
+ uint64_t fw_bytes2 = get_forward_buffered_bytes(queue->ds);
+ assert(fw_bytes == fw_bytes2);
+ }
+
+ assert(kf1_found == !!queue->keyframe_first);
+
+ if (range != in->current_range) {
+ assert(fw_bytes == 0);
+ }
+
+ if (queue->keyframe_latest)
+ assert(queue->keyframe_latest->keyframe);
+
+ total_bytes += queue->index_size * sizeof(struct index_entry);
+ }
+
+ // Invariant needed by pruning; violation has worse effects than just
+ // e.g. broken seeking due to incorrect seek ranges.
+ if (range->seek_start != MP_NOPTS_VALUE)
+ assert(range_num_packets > 0);
+ }
+
+ assert(in->total_bytes == total_bytes);
+}
+#endif
+
+// (this doesn't do most of the things a switch requires, like updating ds->queue)
+static void set_current_range(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ in->current_range = range;
+
+ // Move to in->ranges[in->num_ranges-1] (for LRU sorting/invariant)
+ for (int n = 0; n < in->num_ranges; n++) {
+ if (in->ranges[n] == range) {
+ MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
+ break;
+ }
+ }
+ MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
+}
+
+static void prune_metadata(struct demux_cached_range *range)
+{
+ int first_needed = 0;
+
+ if (range->seek_start == MP_NOPTS_VALUE) {
+ first_needed = range->num_metadata;
+ } else {
+ for (int n = 0; n < range->num_metadata; n++) {
+ if (range->metadata[n]->pts > range->seek_start)
+ break;
+ first_needed = n;
+ }
+ }
+
+ // Always preserve the last entry.
+ first_needed = MPMIN(first_needed, range->num_metadata - 1);
+
+ // (Could make this significantly more efficient for large first_needed,
+ // however that might be very rare and even then it might not matter.)
+ for (int n = 0; n < first_needed; n++) {
+ talloc_free(range->metadata[0]);
+ MP_TARRAY_REMOVE_AT(range->metadata, range->num_metadata, 0);
+ }
+}
+
+// Refresh range->seek_start/end. Idempotent.
+static void update_seek_ranges(struct demux_cached_range *range)
+{
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+ range->is_bof = true;
+ range->is_eof = true;
+
+ double min_start_pts = MP_NOPTS_VALUE;
+ double max_end_pts = MP_NOPTS_VALUE;
+
+ for (int n = 0; n < range->num_streams; n++) {
+ struct demux_queue *queue = range->streams[n];
+
+ if (queue->ds->selected && queue->ds->eager) {
+ if (queue->is_bof) {
+ min_start_pts = MP_PTS_MIN(min_start_pts, queue->seek_start);
+ } else {
+ range->seek_start =
+ MP_PTS_MAX(range->seek_start, queue->seek_start);
+ }
+
+ if (queue->is_eof) {
+ max_end_pts = MP_PTS_MAX(max_end_pts, queue->seek_end);
+ } else {
+ range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
+ }
+
+ range->is_eof &= queue->is_eof;
+ range->is_bof &= queue->is_bof;
+
+ bool empty = queue->is_eof && !queue->head;
+ if (queue->seek_start >= queue->seek_end && !empty &&
+ !(queue->seek_start == queue->seek_end &&
+ queue->seek_start != MP_NOPTS_VALUE))
+ goto broken;
+ }
+ }
+
+ if (range->is_eof)
+ range->seek_end = max_end_pts;
+ if (range->is_bof)
+ range->seek_start = min_start_pts;
+
+ // Sparse (subtitle) stream behavior is not very clearly defined, but
+ // usually we don't want it to restrict the range of other streams. For
+ // example, if there are subtitle packets at position 5 and 10 seconds, and
+ // the demuxer demuxed the other streams until position 7 seconds, the seek
+ // range end position is 7.
+ // Assume that reading a non-sparse (audio/video) packet gets all sparse
+ // packets that are needed before that non-sparse packet.
+ // This is incorrect in any of these cases:
+ // - sparse streams only (it's unknown how to determine an accurate range)
+ // - if sparse streams have non-keyframe packets (we set queue->last_pruned
+ // to the start of the pruned keyframe range - we'd need the end or so)
+ // We also assume that ds->eager is equivalent to a stream not being sparse
+ // (usually true, except if only sparse streams are selected).
+ // We also rely on the fact that the demuxer position will always be ahead
+ // of the seek_end for audio/video, because they need to prefetch at least
+ // 1 packet to detect the end of a keyframe range. This means that there's
+ // a relatively high guarantee to have all sparse (subtitle) packets within
+ // the seekable range.
+ // As a consequence, the code _never_ checks queue->seek_end for a sparse
+ // queue, as the end of it is implied by the highest PTS of a non-sparse
+ // stream (i.e. the latest demuxer position).
+ // On the other hand, if a sparse packet was pruned, and that packet has
+ // a higher PTS than seek_start for non-sparse queues, that packet is
+ // missing. So the range's seek_start needs to be adjusted accordingly.
+ for (int n = 0; n < range->num_streams; n++) {
+ struct demux_queue *queue = range->streams[n];
+ if (queue->ds->selected && !queue->ds->eager &&
+ queue->last_pruned != MP_NOPTS_VALUE &&
+ range->seek_start != MP_NOPTS_VALUE)
+ {
+ // (last_pruned is _exclusive_ to the seekable range, so add a small
+ // value to exclude it from the valid range.)
+ range->seek_start =
+ MP_PTS_MAX(range->seek_start, queue->last_pruned + 0.1);
+ }
+ }
+
+ if (range->seek_start >= range->seek_end)
+ goto broken;
+
+ prune_metadata(range);
+ return;
+
+broken:
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+ prune_metadata(range);
+}
+
+// Remove queue->head from the queue.
+static void remove_head_packet(struct demux_queue *queue)
+{
+ struct demux_packet *dp = queue->head;
+
+ assert(queue->ds->reader_head != dp);
+ if (queue->keyframe_first == dp)
+ queue->keyframe_first = NULL;
+ if (queue->keyframe_latest == dp)
+ queue->keyframe_latest = NULL;
+ queue->is_bof = false;
+
+ uint64_t end_pos = dp->next ? dp->next->cum_pos : queue->tail_cum_pos;
+ queue->ds->in->total_bytes -= end_pos - dp->cum_pos;
+
+ if (queue->num_index && queue->index[queue->index0].pkt == dp) {
+ queue->index0 = (queue->index0 + 1) & QUEUE_INDEX_SIZE_MASK(queue);
+ queue->num_index -= 1;
+ }
+
+ queue->head = dp->next;
+ if (!queue->head)
+ queue->tail = NULL;
+
+ talloc_free(dp);
+}
+
+static void free_index(struct demux_queue *queue)
+{
+ struct demux_stream *ds = queue->ds;
+ struct demux_internal *in = ds->in;
+
+ in->total_bytes -= queue->index_size * sizeof(queue->index[0]);
+ queue->index_size = 0;
+ queue->index0 = 0;
+ queue->num_index = 0;
+ TA_FREEP(&queue->index);
+}
+
+static void clear_queue(struct demux_queue *queue)
+{
+ struct demux_stream *ds = queue->ds;
+ struct demux_internal *in = ds->in;
+
+ if (queue->head)
+ in->total_bytes -= queue->tail_cum_pos - queue->head->cum_pos;
+
+ free_index(queue);
+
+ struct demux_packet *dp = queue->head;
+ while (dp) {
+ struct demux_packet *dn = dp->next;
+ assert(ds->reader_head != dp);
+ talloc_free(dp);
+ dp = dn;
+ }
+ queue->head = queue->tail = NULL;
+ queue->keyframe_first = NULL;
+ queue->keyframe_latest = NULL;
+ queue->seek_start = queue->seek_end = queue->last_pruned = MP_NOPTS_VALUE;
+
+ queue->correct_dts = queue->correct_pos = true;
+ queue->last_pos = -1;
+ queue->last_ts = queue->last_dts = MP_NOPTS_VALUE;
+ queue->last_pos_fixup = -1;
+
+ queue->is_eof = false;
+ queue->is_bof = false;
+}
+
+static void clear_cached_range(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ for (int n = 0; n < range->num_streams; n++)
+ clear_queue(range->streams[n]);
+
+ for (int n = 0; n < range->num_metadata; n++)
+ talloc_free(range->metadata[n]);
+ range->num_metadata = 0;
+
+ update_seek_ranges(range);
+}
+
+// Remove ranges with no data (except in->current_range). Also remove excessive
+// ranges.
+static void free_empty_cached_ranges(struct demux_internal *in)
+{
+ while (1) {
+ struct demux_cached_range *worst = NULL;
+
+ int end = in->num_ranges - 1;
+
+ // (Not set during early init or late destruction.)
+ if (in->current_range) {
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+ end -= 1;
+ }
+
+ for (int n = end; n >= 0; n--) {
+ struct demux_cached_range *range = in->ranges[n];
+ if (range->seek_start == MP_NOPTS_VALUE || !in->seekable_cache) {
+ clear_cached_range(in, range);
+ MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
+ for (int i = 0; i < range->num_streams; i++)
+ talloc_free(range->streams[i]);
+ talloc_free(range);
+ } else {
+ if (!worst || (range->seek_end - range->seek_start <
+ worst->seek_end - worst->seek_start))
+ worst = range;
+ }
+ }
+
+ if (in->num_ranges <= MAX_SEEK_RANGES || !worst)
+ break;
+
+ clear_cached_range(in, worst);
+ }
+}
+
+static void ds_clear_reader_queue_state(struct demux_stream *ds)
+{
+ ds->reader_head = NULL;
+ ds->eof = false;
+ ds->need_wakeup = true;
+}
+
+static void ds_clear_reader_state(struct demux_stream *ds,
+ bool clear_back_state)
+{
+ ds_clear_reader_queue_state(ds);
+
+ ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
+ ds->last_br_bytes = 0;
+ ds->bitrate = -1;
+ ds->skip_to_keyframe = false;
+ ds->attached_picture_added = false;
+ ds->last_ret_pos = -1;
+ ds->last_ret_dts = MP_NOPTS_VALUE;
+ ds->force_read_until = MP_NOPTS_VALUE;
+
+ if (clear_back_state) {
+ ds->back_restart_pos = -1;
+ ds->back_restart_dts = MP_NOPTS_VALUE;
+ ds->back_restart_eof = false;
+ ds->back_restart_next = ds->in->back_demuxing;
+ ds->back_restarting = ds->in->back_demuxing && ds->eager;
+ ds->back_seek_pos = MP_NOPTS_VALUE;
+ ds->back_resume_pos = -1;
+ ds->back_resume_dts = MP_NOPTS_VALUE;
+ ds->back_resuming = false;
+ ds->back_range_started = false;
+ ds->back_range_count = 0;
+ ds->back_range_preroll = 0;
+ }
+}
+
+// called locked, from user thread only
+static void clear_reader_state(struct demux_internal *in,
+ bool clear_back_state)
+{
+ for (int n = 0; n < in->num_streams; n++)
+ ds_clear_reader_state(in->streams[n]->ds, clear_back_state);
+ in->warned_queue_overflow = false;
+ in->d_user->filepos = -1; // implicitly synchronized
+ in->blocked = false;
+ in->need_back_seek = false;
+}
+
+// Call if the observed reader state on this stream somehow changes. The wakeup
+// is skipped if the reader successfully read a packet, because that means we
+// expect it to come back and ask for more.
+static void wakeup_ds(struct demux_stream *ds)
+{
+ if (ds->need_wakeup) {
+ if (ds->wakeup_cb) {
+ ds->wakeup_cb(ds->wakeup_cb_ctx);
+ } else if (ds->in->wakeup_cb) {
+ ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
+ }
+ ds->need_wakeup = false;
+ mp_cond_signal(&ds->in->wakeup);
+ }
+}
+
+static void update_stream_selection_state(struct demux_internal *in,
+ struct demux_stream *ds)
+{
+ ds->eof = false;
+ ds->refreshing = false;
+
+ // We still have to go over the whole stream list to update ds->eager for
+ // other streams too, because they depend on other stream's selections.
+
+ bool any_av_streams = false;
+ bool any_streams = false;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *s = in->streams[n]->ds;
+
+ s->still_image = s->sh->still_image;
+ s->eager = s->selected && !s->sh->attached_picture;
+ if (s->eager && !s->still_image)
+ any_av_streams |= s->type != STREAM_SUB;
+ any_streams |= s->selected;
+ }
+
+ // Subtitles are only eagerly read if there are no other eagerly read
+ // streams.
+ if (any_av_streams) {
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *s = in->streams[n]->ds;
+
+ if (s->type == STREAM_SUB)
+ s->eager = false;
+ }
+ }
+
+ if (!any_streams)
+ in->blocked = false;
+
+ ds_clear_reader_state(ds, true);
+
+ // Make sure any stream reselection or addition is reflected in the seek
+ // ranges, and also get rid of data that is not needed anymore (or
+ // rather, which can't be kept consistent). This has to happen after we've
+ // updated all the subtle state (like s->eager).
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ if (!ds->selected)
+ clear_queue(range->streams[ds->index]);
+
+ update_seek_ranges(range);
+ }
+
+ free_empty_cached_ranges(in);
+
+ wakeup_ds(ds);
+}
+
+void demux_set_ts_offset(struct demuxer *demuxer, double offset)
+{
+ struct demux_internal *in = demuxer->in;
+ mp_mutex_lock(&in->lock);
+ in->ts_offset = offset;
+ mp_mutex_unlock(&in->lock);
+}
+
+static void add_missing_streams(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ for (int n = range->num_streams; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ struct demux_queue *queue = talloc_ptrtype(NULL, queue);
+ *queue = (struct demux_queue){
+ .ds = ds,
+ .range = range,
+ };
+ clear_queue(queue);
+ MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
+ assert(range->streams[ds->index] == queue);
+ }
+}
+
+// Allocate a new sh_stream of the given type. It either has to be released
+// with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You
+// cannot add or read packets from the stream before it has been added.
+// type may be changed later, but only before demux_add_sh_stream().
+struct sh_stream *demux_alloc_sh_stream(enum stream_type type)
+{
+ struct sh_stream *sh = talloc_ptrtype(NULL, sh);
+ *sh = (struct sh_stream) {
+ .type = type,
+ .index = -1,
+ .ff_index = -1, // may be overwritten by demuxer
+ .demuxer_id = -1, // ... same
+ .program_id = -1, // ... same
+ .codec = talloc_zero(sh, struct mp_codec_params),
+ .tags = talloc_zero(sh, struct mp_tags),
+ };
+ sh->codec->type = type;
+ return sh;
+}
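+
+// Typical use by a demuxer implementation (a sketch; the codec string is
+// just an example):
+//
+//   struct sh_stream *sh = demux_alloc_sh_stream(STREAM_AUDIO);
+//   sh->codec->codec = "mp3";         // fill codec params before adding
+//   demux_add_sh_stream(demuxer, sh); // demuxer owns the stream from now on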
+
+// Add a new sh_stream to the demuxer. Note that as soon as the stream has been
+// added, it must be immutable, and must not be released (this will happen when
+// the demuxer is destroyed).
+static void demux_add_sh_stream_locked(struct demux_internal *in,
+ struct sh_stream *sh)
+{
+ assert(!sh->ds); // must not be added yet
+
+ sh->index = in->num_streams;
+
+ sh->ds = talloc(sh, struct demux_stream);
+ *sh->ds = (struct demux_stream) {
+ .in = in,
+ .sh = sh,
+ .type = sh->type,
+ .index = sh->index,
+ .global_correct_dts = true,
+ .global_correct_pos = true,
+ };
+
+ struct demux_stream *ds = sh->ds;
+
+ if (!sh->codec->codec)
+ sh->codec->codec = "";
+
+ if (sh->ff_index < 0)
+ sh->ff_index = sh->index;
+
+ MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
+ assert(in->streams[sh->index] == sh);
+
+ if (in->current_range) {
+ for (int n = 0; n < in->num_ranges; n++)
+ add_missing_streams(in, in->ranges[n]);
+
+ sh->ds->queue = in->current_range->streams[sh->ds->index];
+ }
+
+ update_stream_selection_state(in, sh->ds);
+
+ switch (ds->type) {
+ case STREAM_AUDIO:
+ ds->back_preroll = in->d_user->opts->audio_back_preroll;
+ if (ds->back_preroll < 0) { // auto
+ ds->back_preroll = mp_codec_is_lossless(sh->codec->codec) ? 0 : 1;
+ if (sh->codec->codec && (strcmp(sh->codec->codec, "opus") == 0 ||
+ strcmp(sh->codec->codec, "vorbis") == 0 ||
+ strcmp(sh->codec->codec, "mp3") == 0))
+ ds->back_preroll = 2;
+ }
+ break;
+ case STREAM_VIDEO:
+ ds->back_preroll = in->d_user->opts->video_back_preroll;
+ if (ds->back_preroll < 0)
+ ds->back_preroll = 0; // auto
+ break;
+ }
+
+ if (!ds->sh->attached_picture) {
+ // Typically this is used for webradio, so any stream will do.
+ if (!in->metadata_stream)
+ in->metadata_stream = sh;
+ }
+
+ in->events |= DEMUX_EVENT_STREAMS;
+ if (in->wakeup_cb)
+ in->wakeup_cb(in->wakeup_cb_ctx);
+}
+
+// For demuxer implementations only.
+void demux_add_sh_stream(struct demuxer *demuxer, struct sh_stream *sh)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_thread);
+ mp_mutex_lock(&in->lock);
+ demux_add_sh_stream_locked(in, sh);
+ mp_mutex_unlock(&in->lock);
+}
+
+// Return a stream with the given index. Since streams can only be added during
+// the lifetime of the demuxer, it is guaranteed that an index within the valid
+// range [0, demux_get_num_stream()) always returns a valid sh_stream pointer,
+// which will be valid until the demuxer is destroyed.
+struct sh_stream *demux_get_stream(struct demuxer *demuxer, int index)
+{
+ struct demux_internal *in = demuxer->in;
+ mp_mutex_lock(&in->lock);
+ assert(index >= 0 && index < in->num_streams);
+ struct sh_stream *r = in->streams[index];
+ mp_mutex_unlock(&in->lock);
+ return r;
+}
+
+// See demux_get_stream().
+int demux_get_num_stream(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ mp_mutex_lock(&in->lock);
+ int r = in->num_streams;
+ mp_mutex_unlock(&in->lock);
+ return r;
+}
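+
+// Example (a sketch): enumerating all streams known so far. Since streams
+// are only ever added, the indices and pointers stay valid afterwards.
+//
+//   for (int n = 0; n < demux_get_num_stream(demuxer); n++) {
+//       struct sh_stream *sh = demux_get_stream(demuxer, n);
+//       // inspect sh->type, sh->codec, ...
+//   }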
+
+// It's UB to call anything but demux_dealloc() on the demuxer after this.
+static void demux_shutdown(struct demux_internal *in)
+{
+ struct demuxer *demuxer = in->d_user;
+
+ if (in->recorder) {
+ mp_recorder_destroy(in->recorder);
+ in->recorder = NULL;
+ }
+
+ dumper_close(in);
+
+ if (demuxer->desc->close)
+ demuxer->desc->close(in->d_thread);
+ demuxer->priv = NULL;
+ in->d_thread->priv = NULL;
+
+ demux_flush(demuxer);
+ assert(in->total_bytes == 0);
+
+ in->current_range = NULL;
+ free_empty_cached_ranges(in);
+
+ talloc_free(in->cache);
+ in->cache = NULL;
+
+ if (in->owns_stream)
+ free_stream(demuxer->stream);
+ demuxer->stream = NULL;
+}
+
+static void demux_dealloc(struct demux_internal *in)
+{
+ for (int n = 0; n < in->num_streams; n++)
+ talloc_free(in->streams[n]);
+ mp_mutex_destroy(&in->lock);
+ mp_cond_destroy(&in->wakeup);
+ talloc_free(in->d_user);
+}
+
+void demux_free(struct demuxer *demuxer)
+{
+ if (!demuxer)
+ return;
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ demux_stop_thread(demuxer);
+ demux_shutdown(in);
+ demux_dealloc(in);
+}
+
+// Start closing the demuxer, eventually freeing it asynchronously.
+// You must not access the demuxer once this has been started. Once the demuxer
+// is shutdown, the wakeup callback is invoked. Then you need to call
+// demux_free_async_finish() to end the operation (it must not be called from
+// the wakeup callback).
+// This can return NULL. Then the demuxer cannot be freed asynchronously, and
+// you need to call demux_free() instead.
+struct demux_free_async_state *demux_free_async(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ if (!in->threading)
+ return NULL;
+
+ mp_mutex_lock(&in->lock);
+ in->thread_terminate = true;
+ in->shutdown_async = true;
+ mp_cond_signal(&in->wakeup);
+ mp_mutex_unlock(&in->lock);
+
+ return (struct demux_free_async_state *)demuxer->in; // lies
+}
+
+// As long as state is valid, you can call this to request immediate abort.
+// Roughly behaves as demux_cancel_and_free(), except you still need to wait
+// for the result.
+void demux_free_async_force(struct demux_free_async_state *state)
+{
+ struct demux_internal *in = (struct demux_internal *)state; // reverse lies
+
+ mp_cancel_trigger(in->d_user->cancel);
+}
+
+// Check whether the demuxer is shutdown yet. If not, return false, and you
+// need to call this again in the future (preferably after you were notified by
+// the wakeup callback). If yes, deallocate all state, and return true (in
+// particular, the state ptr becomes invalid, and the wakeup callback will never
+// be called again).
+bool demux_free_async_finish(struct demux_free_async_state *state)
+{
+ struct demux_internal *in = (struct demux_internal *)state; // reverse lies
+
+ mp_mutex_lock(&in->lock);
+ bool busy = in->shutdown_async;
+ mp_mutex_unlock(&in->lock);
+
+ if (busy)
+ return false;
+
+ demux_stop_thread(in->d_user);
+ demux_dealloc(in);
+ return true;
+}
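+
+// A sketch of the full async teardown protocol described above (the wait
+// would normally be driven by the wakeup callback; wait_for_wakeup() is a
+// hypothetical helper):
+//
+//   struct demux_free_async_state *s = demux_free_async(demuxer);
+//   if (!s) {
+//       demux_free(demuxer);    // no thread; must free synchronously
+//   } else {
+//       while (!demux_free_async_finish(s))
+//           wait_for_wakeup();  // must not be the wakeup callback itself
+//   }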
+
+// Like demux_free(), but trigger an abort, which will force the demuxer to
+// terminate immediately. If this wasn't opened with demux_open_url(), there is
+// some chance this will accidentally abort other things via demuxer->cancel.
+void demux_cancel_and_free(struct demuxer *demuxer)
+{
+ if (!demuxer)
+ return;
+ mp_cancel_trigger(demuxer->cancel);
+ demux_free(demuxer);
+}
+
+// Start the demuxer thread, which reads ahead packets on its own.
+void demux_start_thread(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ if (!in->threading) {
+ in->threading = true;
+ if (mp_thread_create(&in->thread, demux_thread, in))
+ in->threading = false;
+ }
+}
+
+void demux_stop_thread(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ if (in->threading) {
+ mp_mutex_lock(&in->lock);
+ in->thread_terminate = true;
+ mp_cond_signal(&in->wakeup);
+ mp_mutex_unlock(&in->lock);
+ mp_thread_join(in->thread);
+ in->threading = false;
+ in->thread_terminate = false;
+ }
+}
+
+// The demuxer thread will call cb(ctx) if there's a new packet, or EOF is reached.
+void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx)
+{
+ struct demux_internal *in = demuxer->in;
+ mp_mutex_lock(&in->lock);
+ in->wakeup_cb = cb;
+ in->wakeup_cb_ctx = ctx;
+ mp_mutex_unlock(&in->lock);
+}
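+
+// Example (a sketch; on_demux_wakeup and my_ctx are hypothetical): the
+// callback can fire from the demuxer thread, so it should only signal the
+// player's event loop instead of doing real work.
+//
+//   static void on_demux_wakeup(void *ctx) {
+//       struct my_ctx *p = ctx;
+//       // wake up the thread that calls the demux_* read functions
+//   }
+//
+//   demux_set_wakeup_cb(demuxer, on_demux_wakeup, my_ctx);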
+
+void demux_start_prefetch(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ mp_mutex_lock(&in->lock);
+ in->reading = true;
+ mp_cond_signal(&in->wakeup);
+ mp_mutex_unlock(&in->lock);
+}
+
+const char *stream_type_name(enum stream_type type)
+{
+ switch (type) {
+ case STREAM_VIDEO: return "video";
+ case STREAM_AUDIO: return "audio";
+ case STREAM_SUB: return "sub";
+ default: return "unknown";
+ }
+}
+
+static struct sh_stream *demuxer_get_cc_track_locked(struct sh_stream *stream)
+{
+ struct sh_stream *sh = stream->ds->cc;
+
+ if (!sh) {
+ sh = demux_alloc_sh_stream(STREAM_SUB);
+ if (!sh)
+ return NULL;
+ sh->codec->codec = "eia_608";
+ sh->default_track = true;
+ sh->hls_bitrate = stream->hls_bitrate;
+ sh->program_id = stream->program_id;
+ stream->ds->cc = sh;
+ demux_add_sh_stream_locked(stream->ds->in, sh);
+ sh->ds->ignore_eof = true;
+ }
+
+ return sh;
+}
+
+void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp)
+{
+ struct demux_internal *in = stream->ds->in;
+
+ mp_mutex_lock(&in->lock);
+ struct sh_stream *sh = demuxer_get_cc_track_locked(stream);
+ if (!sh) {
+ mp_mutex_unlock(&in->lock);
+ talloc_free(dp);
+ return;
+ }
+
+ dp->keyframe = true;
+ dp->pts = MP_ADD_PTS(dp->pts, -in->ts_offset);
+ dp->dts = MP_ADD_PTS(dp->dts, -in->ts_offset);
+ dp->stream = sh->index;
+ add_packet_locked(sh, dp);
+ mp_mutex_unlock(&in->lock);
+}
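+
+// (A video demuxer extracting EIA-608 closed captions would call the above
+// with the video stream and a packet holding the raw CC data; the matching
+// "eia_608" subtitle track is created lazily on first use.)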
+
+static void error_on_backward_demuxing(struct demux_internal *in)
+{
+ if (!in->back_demuxing)
+ return;
+ MP_ERR(in, "Disabling backward demuxing.\n");
+ in->back_demuxing = false;
+ clear_reader_state(in, true);
+}
+
+static void perform_backward_seek(struct demux_internal *in)
+{
+ double target = MP_NOPTS_VALUE;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ if (ds->reader_head && !ds->back_restarting && !ds->back_resuming &&
+ ds->eager)
+ {
+ ds->back_resuming = true;
+ ds->back_resume_pos = ds->reader_head->pos;
+ ds->back_resume_dts = ds->reader_head->dts;
+ }
+
+ target = MP_PTS_MIN(target, ds->back_seek_pos);
+ }
+
+ target = MP_PTS_OR_DEF(target, in->d_thread->start_time);
+
+ MP_VERBOSE(in, "triggering backward seek to get more packets\n");
+ queue_seek(in, target, SEEK_SATAN | SEEK_HR, false);
+ in->reading = true;
+
+ // Don't starve other threads.
+ mp_mutex_unlock(&in->lock);
+ mp_mutex_lock(&in->lock);
+}
+
+// For incremental backward demuxing search work.
+static void check_backward_seek(struct demux_internal *in)
+{
+ in->back_any_need_recheck = false;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ if (ds->back_need_recheck)
+ find_backward_restart_pos(ds);
+ }
+}
+
+// Search for a packet to resume demuxing from.
+// The implementation of this function is quite awkward, because the packet
+// queue is a singly linked list without back links, while it needs to search
+// backwards.
+// This is the core of backward demuxing.
+static void find_backward_restart_pos(struct demux_stream *ds)
+{
+ struct demux_internal *in = ds->in;
+
+ ds->back_need_recheck = false;
+ if (!ds->back_restarting)
+ return;
+
+ struct demux_packet *first = ds->reader_head;
+ struct demux_packet *last = ds->queue->tail;
+
+ if (first && !first->keyframe)
+ MP_WARN(in, "Queue not starting on keyframe.\n");
+
+ // Packet at back_restart_pos. (Note: we don't actually need it, only the
+ // packet immediately before it. But same effort.)
+ // If this is NULL, look for EOF (resume from very last keyframe).
+ struct demux_packet *back_restart = NULL;
+
+ if (ds->back_restart_next) {
+ // Initial state. Switch to one of the other modes.
+
+ for (struct demux_packet *cur = first; cur; cur = cur->next) {
+ // Restart for next keyframe after reader_head.
+ if (cur != first && cur->keyframe) {
+ ds->back_restart_dts = cur->dts;
+ ds->back_restart_pos = cur->pos;
+ ds->back_restart_eof = false;
+ ds->back_restart_next = false;
+ break;
+ }
+ }
+
+ if (ds->back_restart_next && ds->eof) {
+ // Restart from end if nothing was found.
+ ds->back_restart_eof = true;
+ ds->back_restart_next = false;
+ }
+
+ if (ds->back_restart_next)
+ return;
+ }
+
+ if (ds->back_restart_eof) {
+ // We're trying to find EOF (without discarding packets). Only continue
+ // if we really reach EOF.
+ if (!ds->eof)
+ return;
+ } else if (!first && ds->eof) {
+ // Reached EOF during normal backward demuxing. We probably returned the
+ // last keyframe range to user. Need to resume at an earlier position.
+ // Fall through, hit the no-keyframe case (and possibly the BOF check
+ // if there are no packets at all), and then resume_earlier.
+ } else if (!first) {
+ return; // no packets yet
+ } else {
+ assert(last);
+
+ if ((ds->global_correct_dts && last->dts < ds->back_restart_dts) ||
+ (ds->global_correct_pos && last->pos < ds->back_restart_pos))
+ return; // restart pos not reached yet
+
+ // The target we're searching for is apparently before the start of the
+ // queue.
+ if ((ds->global_correct_dts && first->dts > ds->back_restart_dts) ||
+ (ds->global_correct_pos && first->pos > ds->back_restart_pos))
+ goto resume_earlier; // current position is too late; seek back
+
+ for (struct demux_packet *cur = first; cur; cur = cur->next) {
+ if ((ds->global_correct_dts && cur->dts == ds->back_restart_dts) ||
+ (ds->global_correct_pos && cur->pos == ds->back_restart_pos))
+ {
+ back_restart = cur;
+ break;
+ }
+ }
+
+ if (!back_restart) {
+ // The packet should have been in the searched range; maybe dts/pos
+ // determinism assumptions were broken.
+ MP_ERR(in, "Demuxer not cooperating.\n");
+ error_on_backward_demuxing(in);
+ return;
+ }
+ }
+
+ // Find where to restart demuxing. It's usually the last keyframe packet
+ // before restart_pos, but might be up to back_preroll + batch keyframe
+ // packets earlier.
+
+ // (Normally, we'd just iterate backwards, but no back links.)
+ int num_kf = 0;
+ struct demux_packet *pre_1 = NULL; // idiotic "optimization" for total=1
+ for (struct demux_packet *dp = first; dp != back_restart; dp = dp->next) {
+ if (dp->keyframe) {
+ num_kf++;
+ pre_1 = dp;
+ }
+ }
+
+ // Number of renderable keyframes to return to user.
+ // (Excludes preroll, which is decoded by user, but then discarded.)
+ int batch = MPMAX(in->d_user->opts->back_batch[ds->type], 1);
+ // Number of keyframes to return to the user in total.
+ int total = batch + ds->back_preroll;
+
+ assert(total >= 1);
+
+ bool is_bof = ds->queue->is_bof &&
+ (first == ds->queue->head || ds->back_seek_pos < ds->queue->seek_start);
+
+ struct demux_packet *target = NULL; // resume pos
+ // nr. of keyframes, incl. target, excl. restart_pos
+ int got_total = num_kf < total && is_bof ? num_kf : total;
+ int got_preroll = MPMAX(got_total - batch, 0);
+
+ if (got_total == 1) {
+ target = pre_1;
+ } else if (got_total <= num_kf) {
+ int cur_kf = 0;
+ for (struct demux_packet *dp = first; dp != back_restart; dp = dp->next) {
+ if (dp->keyframe) {
+ if (num_kf - cur_kf == got_total) {
+ target = dp;
+ break;
+ }
+ cur_kf++;
+ }
+ }
+ }
+
+ if (!target) {
+ if (is_bof) {
+ MP_VERBOSE(in, "BOF for stream %d\n", ds->index);
+ ds->back_restarting = false;
+ ds->back_range_started = false;
+ ds->back_range_count = -1;
+ ds->back_range_preroll = 0;
+ ds->need_wakeup = true;
+ wakeup_ds(ds);
+ return;
+ }
+ goto resume_earlier;
+ }
+
+ // Skip reader_head from previous keyframe to current one.
+ // Or if preroll is involved, the first preroll packet.
+ while (ds->reader_head != target) {
+ if (!advance_reader_head(ds))
+ MP_ASSERT_UNREACHABLE(); // target must be in list
+ }
+
+ double seek_pts;
+ compute_keyframe_times(target, &seek_pts, NULL);
+ if (seek_pts != MP_NOPTS_VALUE)
+ ds->back_seek_pos = seek_pts;
+
+ // For next backward adjust action.
+ struct demux_packet *restart_pkt = NULL;
+ int kf_pos = 0;
+ for (struct demux_packet *dp = target; dp; dp = dp->next) {
+ if (dp->keyframe) {
+ if (kf_pos == got_preroll) {
+ restart_pkt = dp;
+ break;
+ }
+ kf_pos++;
+ }
+ }
+ assert(restart_pkt);
+ ds->back_restart_dts = restart_pkt->dts;
+ ds->back_restart_pos = restart_pkt->pos;
+
+ ds->back_restarting = false;
+ ds->back_range_started = false;
+ ds->back_range_count = got_total;
+ ds->back_range_preroll = got_preroll;
+ ds->need_wakeup = true;
+ wakeup_ds(ds);
+ return;
+
+resume_earlier:
+ // We want to seek back to get earlier packets. But before we do this, we
+ // must be sure that other streams have initialized their state. The only
+ // time when this state is not initialized is right after the seek that
+ // started backward demuxing (not any subsequent backstep seek). If this
+ // initialization is omitted, the stream would try to start demuxing from
+ // the "current" position. If another stream backstepped before that, the
+ // other stream will miss the original seek target, and start playback from
+ // a position that is too early.
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds2 = in->streams[n]->ds;
+ if (ds2 == ds || !ds2->eager)
+ continue;
+
+ if (ds2->back_restarting && ds2->back_restart_next) {
+ MP_VERBOSE(in, "delaying stream %d for %d\n", ds->index, ds2->index);
+ return;
+ }
+ }
+
+ if (ds->back_seek_pos != MP_NOPTS_VALUE) {
+ struct demux_packet *t =
+ find_seek_target(ds->queue, ds->back_seek_pos - 0.001, 0);
+ if (t && t != ds->reader_head) {
+ double pts;
+ compute_keyframe_times(t, &pts, NULL);
+ ds->back_seek_pos = MP_PTS_MIN(ds->back_seek_pos, pts);
+ ds_clear_reader_state(ds, false);
+ ds->reader_head = t;
+ ds->back_need_recheck = true;
+ in->back_any_need_recheck = true;
+ mp_cond_signal(&in->wakeup);
+ } else {
+ ds->back_seek_pos -= in->d_user->opts->back_seek_size;
+ in->need_back_seek = true;
+ }
+ }
+}
+
+// Process that one or multiple packets were added.
+static void back_demux_see_packets(struct demux_stream *ds)
+{
+ struct demux_internal *in = ds->in;
+
+ if (!ds->selected || !in->back_demuxing || !ds->eager)
+ return;
+
+ assert(!(ds->back_resuming && ds->back_restarting));
+
+ if (!ds->global_correct_dts && !ds->global_correct_pos) {
+ MP_ERR(in, "Can't demux backward due to demuxer problems.\n");
+ error_on_backward_demuxing(in);
+ return;
+ }
+
+ while (ds->back_resuming && ds->reader_head) {
+ struct demux_packet *head = ds->reader_head;
+ if ((ds->global_correct_dts && head->dts == ds->back_resume_dts) ||
+ (ds->global_correct_pos && head->pos == ds->back_resume_pos))
+ {
+ ds->back_resuming = false;
+ ds->need_wakeup = true;
+ wakeup_ds(ds); // probably
+ break;
+ }
+ advance_reader_head(ds);
+ }
+
+ if (ds->back_restarting)
+ find_backward_restart_pos(ds);
+}
+
+// Add the keyframe to the end of the index. Not all packets are actually added.
+static void add_index_entry(struct demux_queue *queue, struct demux_packet *dp,
+ double pts)
+{
+ struct demux_internal *in = queue->ds->in;
+
+ assert(dp->keyframe && pts != MP_NOPTS_VALUE);
+
+ if (queue->num_index > 0) {
+ struct index_entry *last = &QUEUE_INDEX_ENTRY(queue, queue->num_index - 1);
+ if (pts - last->pts < INDEX_STEP_SIZE)
+ return;
+ }
+
+ if (queue->num_index == queue->index_size) {
+ // Needs to honor power-of-2 requirement.
+ size_t new_size = MPMAX(128, queue->index_size * 2);
+ assert(!(new_size & (new_size - 1)));
+ MP_DBG(in, "stream %d: resize index to %zu\n", queue->ds->index,
+ new_size);
+ // Note: we could tolerate allocation failure, and just discard the
+ // entire index (and prevent the index from being recreated).
+ MP_RESIZE_ARRAY(NULL, queue->index, new_size);
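+ // The ring may wrap around the end of the old array. After doubling
+ // its size, copy the wrapped-around entries (which logically follow
+ // index0) into the newly added space, so that the live entries are
+ // contiguous again under the new, larger mask.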
+ size_t highest_index = queue->index0 + queue->num_index;
+ for (size_t n = queue->index_size; n < highest_index; n++)
+ queue->index[n] = queue->index[n - queue->index_size];
+ in->total_bytes +=
+ (new_size - queue->index_size) * sizeof(queue->index[0]);
+ queue->index_size = new_size;
+ }
+
+ assert(queue->num_index < queue->index_size);
+
+ queue->num_index += 1;
+
+ QUEUE_INDEX_ENTRY(queue, queue->num_index - 1) = (struct index_entry){
+ .pts = pts,
+ .pkt = dp,
+ };
+}
+
+// Check whether another cached range overlaps with the end of the current
+// range, and if one appears to, try joining the two into a single range.
+static void attempt_range_joining(struct demux_internal *in)
+{
+ struct demux_cached_range *current = in->current_range;
+ struct demux_cached_range *next = NULL;
+ double next_dist = INFINITY;
+
+ assert(current && in->num_ranges > 0);
+ assert(current == in->ranges[in->num_ranges - 1]);
+
+ for (int n = 0; n < in->num_ranges - 1; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ if (current->seek_start <= range->seek_start) {
+ // This uses ">" to get some non-0 overlap.
+ double dist = current->seek_end - range->seek_start;
+ if (dist > 0 && dist < next_dist) {
+ next = range;
+ next_dist = dist;
+ }
+ }
+ }
+
+ if (!next)
+ return;
+
+ MP_VERBOSE(in, "going to join ranges %f-%f + %f-%f\n",
+ current->seek_start, current->seek_end,
+ next->seek_start, next->seek_end);
+
+ // Try to find a join point, where packets obviously overlap. (It would be
+ // better and faster to do this incrementally, but probably too complex.)
+ // The current range can overlap arbitrarily with the next one, not only by
+ // the seek overlap, but for arbitrary packet readahead as well.
+ // We also drop the overlapping packets (if joining fails, we discard the
+ // entire next range anyway, so this does no harm).
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ struct demux_queue *q1 = current->streams[n];
+ struct demux_queue *q2 = next->streams[n];
+
+ if (!ds->global_correct_pos && !ds->global_correct_dts) {
+ MP_WARN(in, "stream %d: ranges unjoinable\n", n);
+ goto failed;
+ }
+
+ struct demux_packet *end = q1->tail;
+ bool join_point_found = !end; // no packets yet -> joining will work
+ if (end) {
+ while (q2->head) {
+ struct demux_packet *dp = q2->head;
+
+ // Some weird corner-case. We'd have to search the equivalent
+ // packet in q1 to update it correctly. Better just give up.
+ if (dp == q2->keyframe_latest) {
+ MP_VERBOSE(in, "stream %d: not enough keyframes for join\n", n);
+ goto failed;
+ }
+
+ if ((ds->global_correct_dts && dp->dts == end->dts) ||
+ (ds->global_correct_pos && dp->pos == end->pos))
+ {
+ // Do some additional checks as an (imperfect) sanity check
+ // in case pos/dts are not "correct" across the ranges (we
+ // never actually check that).
+ if (dp->dts != end->dts || dp->pos != end->pos ||
+ dp->pts != end->pts)
+ {
+ MP_WARN(in,
+ "stream %d: non-repeatable demuxer behavior\n", n);
+ goto failed;
+ }
+
+ remove_head_packet(q2);
+ join_point_found = true;
+ break;
+ }
+
+ // This happens if the next range misses the end packet. For
+ // normal streams (ds->eager==true), this is a failure to find
+ // an overlap. For subtitles, this can mean the current_range
+ // has a subtitle somewhere before the end of its range, and
+ // next has another subtitle somewhere after the start of its
+ // range.
+ if ((ds->global_correct_dts && dp->dts > end->dts) ||
+ (ds->global_correct_pos && dp->pos > end->pos))
+ break;
+
+ remove_head_packet(q2);
+ }
+ }
+
+ // For enabled non-sparse streams, always require an overlap packet.
+ if (ds->eager && !join_point_found) {
+ MP_WARN(in, "stream %d: no join point found\n", n);
+ goto failed;
+ }
+ }
+
+ // Actually join the ranges. Now that we think it will work, mutate the
+ // data associated with the current range.
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_queue *q1 = current->streams[n];
+ struct demux_queue *q2 = next->streams[n];
+
+ struct demux_stream *ds = in->streams[n]->ds;
+ assert(ds->queue == q1);
+
+ // First new packet that is appended to the current range.
+ struct demux_packet *join_point = q2->head;
+
+ if (q2->head) {
+ if (q1->head) {
+ q1->tail->next = q2->head;
+ } else {
+ q1->head = q2->head;
+ }
+ q1->tail = q2->tail;
+ }
+
+ q1->seek_end = q2->seek_end;
+ q1->correct_dts &= q2->correct_dts;
+ q1->correct_pos &= q2->correct_pos;
+ q1->last_pos = q2->last_pos;
+ q1->last_dts = q2->last_dts;
+ q1->last_ts = q2->last_ts;
+ q1->keyframe_latest = q2->keyframe_latest;
+ q1->is_eof = q2->is_eof;
+
+ q1->last_pos_fixup = -1;
+
+ q2->head = q2->tail = NULL;
+ q2->keyframe_first = NULL;
+ q2->keyframe_latest = NULL;
+
+ if (ds->selected && !ds->reader_head)
+ ds->reader_head = join_point;
+ ds->skip_to_keyframe = false;
+
+ // Make the cum_pos values in all q2 packets continuous.
+ for (struct demux_packet *dp = join_point; dp; dp = dp->next) {
+ uint64_t next_pos = dp->next ? dp->next->cum_pos : q2->tail_cum_pos;
+ uint64_t size = next_pos - dp->cum_pos;
+ dp->cum_pos = q1->tail_cum_pos;
+ q1->tail_cum_pos += size;
+ }
+
+ // And update the index with packets from q2.
+ for (size_t i = 0; i < q2->num_index; i++) {
+ struct index_entry *e = &QUEUE_INDEX_ENTRY(q2, i);
+ add_index_entry(q1, e->pkt, e->pts);
+ }
+ free_index(q2);
+
+ // For moving demuxer position.
+ ds->refreshing = ds->selected;
+ }
+
+ for (int n = 0; n < next->num_metadata; n++) {
+ MP_TARRAY_APPEND(current, current->metadata, current->num_metadata,
+ next->metadata[n]);
+ }
+ next->num_metadata = 0;
+
+ update_seek_ranges(current);
+
+ // Move demuxing position to after the current range.
+ in->seeking = true;
+ in->seek_flags = SEEK_HR;
+ in->seek_pts = next->seek_end - 1.0;
+
+ MP_VERBOSE(in, "ranges joined!\n");
+
+ for (int n = 0; n < in->num_streams; n++)
+ back_demux_see_packets(in->streams[n]->ds);
+
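+    // On success, we fall through: "next" is empty at this point, so the
+    // cleanup below merely disposes of it.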
+failed:
+ clear_cached_range(in, next);
+ free_empty_cached_ranges(in);
+}
+
+// Compute the assumed first and last frame timestamps for the keyframe range
+// starting at pkt. To get valid results, pkt->keyframe must be true; otherwise
+// nonsense will be returned.
+// Always sets *out_kf_min and *out_kf_max without reading them. Both are set
+// to NOPTS if there are no timestamps at all in the stream. *out_kf_max is not
+// set to the actual end time of the decoded output, just to the last frame's
+// timestamp (audio will typically end up with kf_min==kf_max).
+// Either of out_kf_min and out_kf_max can be NULL, which discards the result.
+// Return the next keyframe packet after pkt, or NULL if there's none.
+static struct demux_packet *compute_keyframe_times(struct demux_packet *pkt,
+ double *out_kf_min,
+ double *out_kf_max)
+{
+ struct demux_packet *start = pkt;
+ double min = MP_NOPTS_VALUE;
+ double max = MP_NOPTS_VALUE;
+
+ while (pkt) {
+ if (pkt->keyframe && pkt != start)
+ break;
+
+ double ts = MP_PTS_OR_DEF(pkt->pts, pkt->dts);
+ if (pkt->segmented && ((pkt->start != MP_NOPTS_VALUE && ts < pkt->start) ||
+ (pkt->end != MP_NOPTS_VALUE && ts > pkt->end)))
+ ts = MP_NOPTS_VALUE;
+
+ min = MP_PTS_MIN(min, ts);
+ max = MP_PTS_MAX(max, ts);
+
+ pkt = pkt->next;
+ }
+
+ if (out_kf_min)
+ *out_kf_min = min;
+ if (out_kf_max)
+ *out_kf_max = max;
+ return pkt;
+}
+
+// Determine seekable range when a packet is added. If dp==NULL, treat it as
+// EOF (i.e. closes the current block).
+// This has to deal with a number of corner cases, such as demuxers potentially
+// starting output at non-keyframes.
+// Can join seek ranges, which messes with in->current_range and all.
+static void adjust_seek_range_on_packet(struct demux_stream *ds,
+ struct demux_packet *dp)
+{
+ struct demux_queue *queue = ds->queue;
+
+ if (!ds->in->seekable_cache)
+ return;
+
+ bool new_eof = !dp;
+ bool update_ranges = queue->is_eof != new_eof;
+ queue->is_eof = new_eof;
+
+ if (!dp || dp->keyframe) {
+ if (queue->keyframe_latest) {
+ double kf_min, kf_max;
+ compute_keyframe_times(queue->keyframe_latest, &kf_min, &kf_max);
+
+ if (kf_min != MP_NOPTS_VALUE) {
+ add_index_entry(queue, queue->keyframe_latest, kf_min);
+
+ // Initialize the queue's start if it's unset.
+ if (queue->seek_start == MP_NOPTS_VALUE) {
+ update_ranges = true;
+ queue->seek_start = kf_min + ds->sh->seek_preroll;
+ }
+ }
+
+ if (kf_max != MP_NOPTS_VALUE &&
+ (queue->seek_end == MP_NOPTS_VALUE || kf_max > queue->seek_end))
+ {
+ // If the queue was past the current range's end even before
+ // this update, it means _other_ streams are not there yet,
+ // and the seek range doesn't need to be updated. This means
+ // if the _old_ queue->seek_end was already after the range end,
+ // then the new seek_end won't extend the range either.
+ if (queue->range->seek_end == MP_NOPTS_VALUE ||
+ queue->seek_end <= queue->range->seek_end)
+ {
+ update_ranges = true;
+ }
+
+ queue->seek_end = kf_max;
+ }
+ }
+
+ queue->keyframe_latest = dp;
+ }
+
+ // Adding a sparse packet never changes the seek range.
+ if (update_ranges && ds->eager) {
+ update_seek_ranges(queue->range);
+ attempt_range_joining(ds->in);
+ }
+}
+
+static struct mp_recorder *recorder_create(struct demux_internal *in,
+ const char *dst)
+{
+ struct sh_stream **streams = NULL;
+ int num_streams = 0;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct sh_stream *stream = in->streams[n];
+ if (stream->ds->selected)
+ MP_TARRAY_APPEND(NULL, streams, num_streams, stream);
+ }
+
+ struct demuxer *demuxer = in->d_thread;
+ struct demux_attachment **attachments = talloc_array(NULL, struct demux_attachment*, demuxer->num_attachments);
+ for (int n = 0; n < demuxer->num_attachments; n++) {
+ attachments[n] = &demuxer->attachments[n];
+ }
+
+ struct mp_recorder *res = mp_recorder_create(in->d_thread->global, dst,
+ streams, num_streams,
+ attachments, demuxer->num_attachments);
+ talloc_free(streams);
+ talloc_free(attachments);
+ return res;
+}
+
+static void write_dump_packet(struct demux_internal *in, struct demux_packet *dp)
+{
+ assert(in->dumper);
+ assert(in->dumper_status == CONTROL_TRUE);
+
+ struct mp_recorder_sink *sink =
+ mp_recorder_get_sink(in->dumper, in->streams[dp->stream]);
+ if (sink) {
+ mp_recorder_feed_packet(sink, dp);
+ } else {
+ MP_ERR(in, "New stream appeared; stopping recording.\n");
+ in->dumper_status = CONTROL_ERROR;
+ }
+}
+
+static void record_packet(struct demux_internal *in, struct demux_packet *dp)
+{
+ // (should preferably be outside of the lock)
+ if (in->enable_recording && !in->recorder &&
+ in->d_user->opts->record_file && in->d_user->opts->record_file[0])
+ {
+ // Later failures shouldn't make it retry and overwrite the previously
+ // recorded file.
+ in->enable_recording = false;
+
+ in->recorder = recorder_create(in, in->d_user->opts->record_file);
+ if (!in->recorder)
+ MP_ERR(in, "Disabling recording.\n");
+ }
+
+ if (in->recorder) {
+ struct mp_recorder_sink *sink =
+ mp_recorder_get_sink(in->recorder, in->streams[dp->stream]);
+ if (sink) {
+ mp_recorder_feed_packet(sink, dp);
+ } else {
+ MP_ERR(in, "New stream appeared; stopping recording.\n");
+ mp_recorder_destroy(in->recorder);
+ in->recorder = NULL;
+ }
+ }
+
+ if (in->dumper_status == CONTROL_OK)
+ write_dump_packet(in, dp);
+}
+
+static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
+{
+ struct demux_stream *ds = stream ? stream->ds : NULL;
+ assert(ds && ds->in);
+ if (!dp->len || demux_cancel_test(ds->in->d_thread)) {
+ talloc_free(dp);
+ return;
+ }
+
+ assert(dp->stream == stream->index);
+ assert(!dp->next);
+
+ struct demux_internal *in = ds->in;
+
+ in->after_seek = false;
+ in->after_seek_to_start = false;
+
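+    // Track the most recently demuxed timestamp; this is e.g. used to decide
+    // how far lazily read (subtitle) streams have been read ahead.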
+ double ts = dp->dts == MP_NOPTS_VALUE ? dp->pts : dp->dts;
+ if (dp->segmented)
+ ts = MP_PTS_MIN(ts, dp->end);
+
+ if (ts != MP_NOPTS_VALUE)
+ in->demux_ts = ts;
+
+ struct demux_queue *queue = ds->queue;
+
+ bool drop = !ds->selected || in->seeking || ds->sh->attached_picture;
+
+ if (!drop) {
+        // If libavformat splits packets, some packets will have pos unset, so
+        // synthesize one from the preceding packet's pos => makes refresh
+        // seeks work.
+ if ((dp->pos < 0 || dp->pos == queue->last_pos_fixup) &&
+ !dp->keyframe && queue->last_pos_fixup >= 0)
+ dp->pos = queue->last_pos_fixup + 1;
+ queue->last_pos_fixup = dp->pos;
+ }
+
+ if (!drop && ds->refreshing) {
+ // Resume reading once the old position was reached (i.e. we start
+ // returning packets where we left off before the refresh).
+ // If it's the same position, drop, but continue normally next time.
+ if (queue->correct_dts) {
+ ds->refreshing = dp->dts < queue->last_dts;
+ } else if (queue->correct_pos) {
+ ds->refreshing = dp->pos < queue->last_pos;
+ } else {
+ ds->refreshing = false; // should not happen
+ MP_WARN(in, "stream %d: demux refreshing failed\n", ds->index);
+ }
+ drop = true;
+ }
+
+ if (drop) {
+ talloc_free(dp);
+ return;
+ }
+
+ record_packet(in, dp);
+
+ if (in->cache && in->d_user->opts->disk_cache) {
+ int64_t pos = demux_cache_write(in->cache, dp);
+ if (pos >= 0) {
+ demux_packet_unref_contents(dp);
+ dp->is_cached = true;
+ dp->cached_data.pos = pos;
+ }
+ }
+
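+    // pos/dts correctness is sticky: a single non-monotonic packet permanently
+    // disables using that field for refresh seeks on this queue.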
+ queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos;
+ queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts;
+ queue->last_pos = dp->pos;
+ queue->last_dts = dp->dts;
+ ds->global_correct_pos &= queue->correct_pos;
+ ds->global_correct_dts &= queue->correct_dts;
+
+ // (keep in mind that even if the reader went out of data, the queue is not
+ // necessarily empty due to the backbuffer)
+ if (!ds->reader_head && (!ds->skip_to_keyframe || dp->keyframe)) {
+ ds->reader_head = dp;
+ ds->skip_to_keyframe = false;
+ }
+
+ size_t bytes = demux_packet_estimate_total_size(dp);
+ in->total_bytes += bytes;
+ dp->cum_pos = queue->tail_cum_pos;
+ queue->tail_cum_pos += bytes;
+
+ if (queue->tail) {
+ // next packet in stream
+ queue->tail->next = dp;
+ queue->tail = dp;
+ } else {
+ // first packet in stream
+ queue->head = queue->tail = dp;
+ }
+
+ if (!ds->ignore_eof) {
+ // obviously not true anymore
+ ds->eof = false;
+ in->eof = false;
+ }
+
+ // For video, PTS determination is not trivial, but for other media types
+ // distinguishing PTS and DTS is not useful.
+ if (stream->type != STREAM_VIDEO && dp->pts == MP_NOPTS_VALUE)
+ dp->pts = dp->dts;
+
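+    // Update the queue's end time; tolerate small backsteps, but reset on
+    // backward jumps of more than 10 seconds.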
+ if (ts != MP_NOPTS_VALUE && (ts > queue->last_ts || ts + 10 < queue->last_ts))
+ queue->last_ts = ts;
+ if (ds->base_ts == MP_NOPTS_VALUE)
+ ds->base_ts = queue->last_ts;
+
+ const char *num_pkts = queue->head == queue->tail ? "1" : ">1";
+ uint64_t fw_bytes = get_forward_buffered_bytes(ds);
+ MP_TRACE(in, "append packet to %s: size=%zu pts=%f dts=%f pos=%"PRIi64" "
+ "[num=%s size=%zd]\n", stream_type_name(stream->type),
+ dp->len, dp->pts, dp->dts, dp->pos, num_pkts, (size_t)fw_bytes);
+
+ adjust_seek_range_on_packet(ds, dp);
+
+ // May need to reduce backward cache.
+ prune_old_packets(in);
+
+ // Possibly update duration based on highest TS demuxed (but ignore subs).
+ if (stream->type != STREAM_SUB) {
+ if (dp->segmented)
+ ts = MP_PTS_MIN(ts, dp->end);
+ if (ts > in->highest_av_pts) {
+ in->highest_av_pts = ts;
+ double duration = in->highest_av_pts - in->d_thread->start_time;
+ if (duration > in->d_thread->duration) {
+ in->d_thread->duration = duration;
+ // (Don't wakeup user thread, would be too noisy.)
+ in->events |= DEMUX_EVENT_DURATION;
+ in->duration = duration;
+ }
+ }
+ }
+
+ // Don't process the packet further if it's skipped by the previous seek
+ // (see reader_head check/assignment above).
+ if (!ds->reader_head)
+ return;
+
+ back_demux_see_packets(ds);
+
+ wakeup_ds(ds);
+}
+
+static void mark_stream_eof(struct demux_stream *ds)
+{
+ if (!ds->eof) {
+ ds->eof = true;
+ adjust_seek_range_on_packet(ds, NULL);
+ back_demux_see_packets(ds);
+ wakeup_ds(ds);
+ }
+}
+
+static bool lazy_stream_needs_wait(struct demux_stream *ds)
+{
+ struct demux_internal *in = ds->in;
+ // Attempt to read until force_read_until was reached, or reading has
+ // stopped for some reason (true EOF, queue overflow).
+ return !ds->eager && !in->back_demuxing &&
+ !in->eof && ds->force_read_until != MP_NOPTS_VALUE &&
+ (in->demux_ts == MP_NOPTS_VALUE ||
+ in->demux_ts <= ds->force_read_until);
+}
+
+// Returns true if there was "progress" (lock was released temporarily).
+static bool read_packet(struct demux_internal *in)
+{
+ bool was_reading = in->reading;
+ in->reading = false;
+
+ if (!was_reading || in->blocked || demux_cancel_test(in->d_thread))
+ return false;
+
+ // Check if we need to read a new packet. We do this if all queues are below
+ // the minimum, or if a stream explicitly needs new packets. Also includes
+    // safeguards against packet queue overflow.
+ bool read_more = false, prefetch_more = false, refresh_more = false;
+ uint64_t total_fw_bytes = 0;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ if (ds->eager) {
+ read_more |= !ds->reader_head;
+ if (in->back_demuxing)
+ read_more |= ds->back_restarting || ds->back_resuming;
+ } else {
+ if (lazy_stream_needs_wait(ds)) {
+ read_more = true;
+ } else {
+ mark_stream_eof(ds); // let playback continue
+ }
+ }
+ refresh_more |= ds->refreshing;
+ if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE &&
+ in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE &&
+ ds->queue->last_ts >= ds->base_ts &&
+ !in->back_demuxing)
+ {
+ if (ds->queue->last_ts - ds->base_ts <= in->hyst_secs)
+ in->hyst_active = false;
+ if (!in->hyst_active)
+ prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs;
+ }
+ total_fw_bytes += get_forward_buffered_bytes(ds);
+ }
+
+ MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n",
+ (size_t)total_fw_bytes, read_more, prefetch_more, refresh_more);
+ if (total_fw_bytes >= in->max_bytes) {
+ // if we hit the limit just by prefetching, simply stop prefetching
+ if (!read_more) {
+ in->hyst_active = !!in->hyst_secs;
+ return false;
+ }
+ if (!in->warned_queue_overflow) {
+ in->warned_queue_overflow = true;
+ MP_WARN(in, "Too many packets in the demuxer packet queues:\n");
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ if (ds->selected) {
+ size_t num_pkts = 0;
+ for (struct demux_packet *dp = ds->reader_head;
+ dp; dp = dp->next)
+ num_pkts++;
+ uint64_t fw_bytes = get_forward_buffered_bytes(ds);
+ MP_WARN(in, " %s/%d: %zd packets, %zd bytes%s%s\n",
+ stream_type_name(ds->type), n,
+ num_pkts, (size_t)fw_bytes,
+ ds->eager ? "" : " (lazy)",
+ ds->refreshing ? " (refreshing)" : "");
+ }
+ }
+ if (in->back_demuxing)
+ MP_ERR(in, "Backward playback is likely stuck/broken now.\n");
+ }
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ if (!ds->reader_head)
+ mark_stream_eof(ds);
+ }
+ return false;
+ }
+
+ if (!read_more && !prefetch_more && !refresh_more) {
+ in->hyst_active = !!in->hyst_secs;
+ return false;
+ }
+
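+    // Directly after a seek to the start, every selected stream that is not
+    // being refreshed is known to begin at BOF.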
+ if (in->after_seek_to_start) {
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ in->current_range->streams[n]->is_bof =
+ ds->selected && !ds->refreshing;
+ }
+ }
+
+ // Actually read a packet. Drop the lock while doing so, because waiting
+ // for disk or network I/O can take time.
+ in->reading = true;
+ in->after_seek = false;
+ in->after_seek_to_start = false;
+ mp_mutex_unlock(&in->lock);
+
+ struct demuxer *demux = in->d_thread;
+ struct demux_packet *pkt = NULL;
+
+ bool eof = true;
+ if (demux->desc->read_packet && !demux_cancel_test(demux))
+ eof = !demux->desc->read_packet(demux, &pkt);
+
+ mp_mutex_lock(&in->lock);
+ update_cache(in);
+
+ if (pkt) {
+ assert(pkt->stream >= 0 && pkt->stream < in->num_streams);
+ add_packet_locked(in->streams[pkt->stream], pkt);
+ }
+
+ if (!in->seeking) {
+ if (eof) {
+ for (int n = 0; n < in->num_streams; n++)
+ mark_stream_eof(in->streams[n]->ds);
+ // If we had EOF previously, then don't wakeup (avoids wakeup loop)
+ if (!in->eof) {
+ if (in->wakeup_cb)
+ in->wakeup_cb(in->wakeup_cb_ctx);
+ mp_cond_signal(&in->wakeup);
+ MP_VERBOSE(in, "EOF reached.\n");
+ }
+ }
+ in->eof = eof;
+ in->reading = !eof;
+ }
+ return true;
+}
+
+static void prune_old_packets(struct demux_internal *in)
+{
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
+ // It's not clear what the ideal way to prune old packets is. For now, we
+ // prune the oldest packet runs, as long as the total cache amount is too
+ // big.
+ while (1) {
+ uint64_t fw_bytes = 0;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ fw_bytes += get_forward_buffered_bytes(ds);
+ }
+ uint64_t max_avail = in->max_bytes_bw;
+ // Backward cache (if enabled at all) can use unused forward cache.
+ // Still leave 1 byte free, so the read_packet logic doesn't get stuck.
+ if (max_avail && in->max_bytes > (fw_bytes + 1) && in->d_user->opts->donate_fw)
+ max_avail += in->max_bytes - (fw_bytes + 1);
+ if (in->total_bytes - fw_bytes <= max_avail)
+ break;
+
+ // (Start from least recently used range.)
+ struct demux_cached_range *range = in->ranges[0];
+ double earliest_ts = MP_NOPTS_VALUE;
+ struct demux_stream *earliest_stream = NULL;
+
+ for (int n = 0; n < range->num_streams; n++) {
+ struct demux_queue *queue = range->streams[n];
+ struct demux_stream *ds = queue->ds;
+
+ if (queue->head && queue->head != ds->reader_head) {
+ struct demux_packet *dp = queue->head;
+ double ts = queue->seek_start;
+ // If the ts is NOPTS, the queue has no retainable packets, so
+ // delete them all. This code is not run when there's enough
+ // free space, so normally the queue gets the chance to build up.
+ bool prune_always =
+ !in->seekable_cache || ts == MP_NOPTS_VALUE || !dp->keyframe;
+ if (prune_always || !earliest_stream || ts < earliest_ts) {
+ earliest_ts = ts;
+ earliest_stream = ds;
+ if (prune_always)
+ break;
+ }
+ }
+ }
+
+ // In some cases (like when the seek index became huge), there aren't
+ // any backwards packets, even if the total cache size is exceeded.
+ if (!earliest_stream)
+ break;
+
+ struct demux_stream *ds = earliest_stream;
+ struct demux_queue *queue = range->streams[ds->index];
+
+ bool non_kf_prune = queue->head && !queue->head->keyframe;
+ bool kf_was_pruned = false;
+
+ while (queue->head && queue->head != ds->reader_head) {
+ if (queue->head->keyframe) {
+                // If the cache is seekable, only delete up to the next
+                // keyframe. This is not always efficient, but ensures we
+                // prune all streams fairly.
+                // Also, if the first packet was _not_ a keyframe, we want to
+                // remove all preceding non-keyframe packets first, before
+                // re-evaluating what to prune next.
+ if ((kf_was_pruned || non_kf_prune) && in->seekable_cache)
+ break;
+ kf_was_pruned = true;
+ }
+
+ remove_head_packet(queue);
+ }
+
+ // Need to update the seekable time range.
+ if (kf_was_pruned) {
+ assert(!queue->keyframe_first); // it was just deleted, supposedly
+
+ queue->keyframe_first = queue->head;
+ // (May happen if reader_head stopped pruning the range, and there's
+ // no next range.)
+ while (queue->keyframe_first && !queue->keyframe_first->keyframe)
+ queue->keyframe_first = queue->keyframe_first->next;
+
+ if (queue->seek_start != MP_NOPTS_VALUE)
+ queue->last_pruned = queue->seek_start;
+
+ double kf_min;
+ compute_keyframe_times(queue->keyframe_first, &kf_min, NULL);
+
+ bool update_range = true;
+
+ queue->seek_start = kf_min;
+
+ if (queue->seek_start != MP_NOPTS_VALUE) {
+ queue->seek_start += ds->sh->seek_preroll;
+
+ // Don't need to update if the new start is still before the
+ // range's start (or if the range was undefined anyway).
+ if (range->seek_start == MP_NOPTS_VALUE ||
+ queue->seek_start <= range->seek_start)
+ {
+ update_range = false;
+ }
+ }
+
+ if (update_range)
+ update_seek_ranges(range);
+ }
+
+ if (range != in->current_range && range->seek_start == MP_NOPTS_VALUE)
+ free_empty_cached_ranges(in);
+ }
+}
+
+static void execute_trackswitch(struct demux_internal *in)
+{
+ in->tracks_switched = false;
+
+ mp_mutex_unlock(&in->lock);
+
+ if (in->d_thread->desc->switched_tracks)
+ in->d_thread->desc->switched_tracks(in->d_thread);
+
+ mp_mutex_lock(&in->lock);
+}
+
+static void execute_seek(struct demux_internal *in)
+{
+ int flags = in->seek_flags;
+ double pts = in->seek_pts;
+ in->eof = false;
+ in->seeking = false;
+ in->seeking_in_progress = pts;
+ in->demux_ts = MP_NOPTS_VALUE;
+ in->low_level_seeks += 1;
+ in->after_seek = true;
+ in->after_seek_to_start =
+ !(flags & (SEEK_FORWARD | SEEK_FACTOR)) &&
+ pts <= in->d_thread->start_time;
+
+ for (int n = 0; n < in->num_streams; n++)
+ in->streams[n]->ds->queue->last_pos_fixup = -1;
+
+ if (in->recorder)
+ mp_recorder_mark_discontinuity(in->recorder);
+
+ mp_mutex_unlock(&in->lock);
+
+ MP_VERBOSE(in, "execute seek (to %f flags %d)\n", pts, flags);
+
+ if (in->d_thread->desc->seek)
+ in->d_thread->desc->seek(in->d_thread, pts, flags);
+
+ MP_VERBOSE(in, "seek done\n");
+
+ mp_mutex_lock(&in->lock);
+
+ in->seeking_in_progress = MP_NOPTS_VALUE;
+}
+
+static void update_opts(struct demuxer *demuxer)
+{
+ struct demux_opts *opts = demuxer->opts;
+ struct demux_internal *in = demuxer->in;
+
+ in->min_secs = opts->min_secs;
+ in->hyst_secs = opts->hyst_secs;
+ in->max_bytes = opts->max_bytes;
+ in->max_bytes_bw = opts->max_bytes_bw;
+
+ int seekable = opts->seekable_cache;
+ bool is_streaming = in->d_thread->is_streaming;
+ bool use_cache = is_streaming;
+ if (opts->enable_cache >= 0)
+ use_cache = opts->enable_cache == 1;
+
+ if (use_cache) {
+ in->min_secs = MPMAX(in->min_secs, opts->min_secs_cache);
+ if (seekable < 0)
+ seekable = 1;
+ }
+ in->seekable_cache = seekable == 1;
+ in->using_network_cache_opts = is_streaming && use_cache;
+
+ if (!in->seekable_cache)
+ in->max_bytes_bw = 0;
+
+ if (!in->can_cache) {
+ in->seekable_cache = false;
+ in->min_secs = 0;
+ in->max_bytes = 1;
+ in->max_bytes_bw = 0;
+ in->using_network_cache_opts = false;
+ }
+
+ if (in->seekable_cache && opts->disk_cache && !in->cache) {
+ in->cache = demux_cache_create(in->global, in->log);
+ if (!in->cache)
+ MP_ERR(in, "Failed to create file cache.\n");
+ }
+
+ // The filename option really decides whether recording should be active.
+ // So if the filename changes, act upon it.
+ char *old = in->record_filename ? in->record_filename : "";
+ char *new = opts->record_file ? opts->record_file : "";
+ if (strcmp(old, new) != 0) {
+ if (in->recorder) {
+ MP_WARN(in, "Stopping recording.\n");
+ mp_recorder_destroy(in->recorder);
+ in->recorder = NULL;
+ }
+ talloc_free(in->record_filename);
+ in->record_filename = talloc_strdup(in, opts->record_file);
+ // Note: actual recording only starts once packets are read. It may be
+ // important to delay creating in->recorder to that point, because the
+ // demuxer might detect more streams until finding the first packet.
+ in->enable_recording = in->can_record;
+ }
+
+ // In case the cache was reduced in size.
+ prune_old_packets(in);
+
+ // In case the seekable cache was disabled.
+ free_empty_cached_ranges(in);
+}
+
+// Make demuxing progress. Return whether progress was made.
+static bool thread_work(struct demux_internal *in)
+{
+ if (m_config_cache_update(in->d_user->opts_cache))
+ update_opts(in->d_user);
+ if (in->tracks_switched) {
+ execute_trackswitch(in);
+ return true;
+ }
+ if (in->need_back_seek) {
+ perform_backward_seek(in);
+ return true;
+ }
+ if (in->back_any_need_recheck) {
+ check_backward_seek(in);
+ return true;
+ }
+ if (in->seeking) {
+ execute_seek(in);
+ return true;
+ }
+ if (read_packet(in))
+ return true; // read_packet unlocked, so recheck conditions
+ if (mp_time_ns() >= in->next_cache_update) {
+ update_cache(in);
+ return true;
+ }
+ return false;
+}
+
+static MP_THREAD_VOID demux_thread(void *pctx)
+{
+ struct demux_internal *in = pctx;
+ mp_thread_set_name("demux");
+ mp_mutex_lock(&in->lock);
+
+ stats_register_thread_cputime(in->stats, "thread");
+
+ while (!in->thread_terminate) {
+ if (thread_work(in))
+ continue;
+ mp_cond_signal(&in->wakeup);
+ mp_cond_timedwait_until(&in->wakeup, &in->lock, in->next_cache_update);
+ }
+
+ if (in->shutdown_async) {
+ mp_mutex_unlock(&in->lock);
+ demux_shutdown(in);
+ mp_mutex_lock(&in->lock);
+ in->shutdown_async = false;
+ if (in->wakeup_cb)
+ in->wakeup_cb(in->wakeup_cb_ctx);
+ }
+
+ stats_unregister_thread(in->stats, "thread");
+
+ mp_mutex_unlock(&in->lock);
+ MP_THREAD_RETURN();
+}
+
+// Low-level part of dequeueing a packet.
+static struct demux_packet *advance_reader_head(struct demux_stream *ds)
+{
+ struct demux_packet *pkt = ds->reader_head;
+ if (!pkt)
+ return NULL;
+
+ ds->reader_head = pkt->next;
+
+ ds->last_ret_pos = pkt->pos;
+ ds->last_ret_dts = pkt->dts;
+
+ return pkt;
+}
+
+// Return a newly allocated packet. The pkt parameter may be either an
+// in-memory packet (then a new reference is made), or a reference to a
+// packet in the disk cache (then the packet is read from disk).
+static struct demux_packet *read_packet_from_cache(struct demux_internal *in,
+ struct demux_packet *pkt)
+{
+ if (!pkt)
+ return NULL;
+
+ if (pkt->is_cached) {
+ assert(in->cache);
+ struct demux_packet *meta = pkt;
+ pkt = demux_cache_read(in->cache, pkt->cached_data.pos);
+ if (pkt) {
+ demux_packet_copy_attribs(pkt, meta);
+ } else {
+ MP_ERR(in, "Failed to retrieve packet from cache.\n");
+ }
+ } else {
+ // The returned packet is mutated etc. and will be owned by the user.
+ pkt = demux_copy_packet(pkt);
+ }
+
+ return pkt;
+}
+
+// Returns:
+// < 0: EOF was reached, *res is not set
+// == 0: no new packet yet, wait, *res is not set
+// > 0: new packet is moved to *res
+static int dequeue_packet(struct demux_stream *ds, double min_pts,
+ struct demux_packet **res)
+{
+ struct demux_internal *in = ds->in;
+
+ if (!ds->selected)
+ return -1;
+ if (in->blocked)
+ return 0;
+
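+    // Attached pictures (e.g. cover art) consist of a single packet, which is
+    // returned exactly once; after that, the stream is at EOF.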
+ if (ds->sh->attached_picture) {
+ ds->eof = true;
+ if (ds->attached_picture_added)
+ return -1;
+ ds->attached_picture_added = true;
+ struct demux_packet *pkt = demux_copy_packet(ds->sh->attached_picture);
+ MP_HANDLE_OOM(pkt);
+ pkt->stream = ds->sh->index;
+ *res = pkt;
+ return 1;
+ }
+
+ if (!in->reading && !in->eof) {
+ in->reading = true; // enable demuxer thread prefetching
+ mp_cond_signal(&in->wakeup);
+ }
+
+ ds->force_read_until = min_pts;
+
+ if (ds->back_resuming || ds->back_restarting) {
+ assert(in->back_demuxing);
+ return 0;
+ }
+
+ bool eof = !ds->reader_head && ds->eof;
+
+ if (in->back_demuxing) {
+ // Subtitles not supported => EOF.
+ if (!ds->eager)
+ return -1;
+
+ // Next keyframe (or EOF) was reached => step back.
+ if (ds->back_range_started && !ds->back_range_count &&
+ ((ds->reader_head && ds->reader_head->keyframe) || eof))
+ {
+ ds->back_restarting = true;
+ ds->back_restart_eof = false;
+ ds->back_restart_next = false;
+
+ find_backward_restart_pos(ds);
+
+ if (ds->back_restarting)
+ return 0;
+ }
+
+ eof = ds->back_range_count < 0;
+ }
+
+ ds->need_wakeup = !ds->reader_head;
+ if (!ds->reader_head || eof) {
+ if (!ds->eager) {
+ // Non-eager streams temporarily return EOF. If they returned 0,
+ // the reader would have to wait for new packets, which does not
+ // make sense due to the sparseness and passiveness of non-eager
+ // streams.
+ // Unless the min_pts feature is used: then EOF is only signaled
+ // if read-ahead went above min_pts.
+ if (!lazy_stream_needs_wait(ds))
+ ds->eof = eof = true;
+ }
+ return eof ? -1 : 0;
+ }
+
+ struct demux_packet *pkt = advance_reader_head(ds);
+ assert(pkt);
+ pkt = read_packet_from_cache(in, pkt);
+ if (!pkt)
+ return 0;
+
+ if (in->back_demuxing) {
+ if (pkt->keyframe) {
+ assert(ds->back_range_count > 0);
+ ds->back_range_count -= 1;
+ if (ds->back_range_preroll >= 0)
+ ds->back_range_preroll -= 1;
+ }
+
+ if (ds->back_range_preroll >= 0)
+ pkt->back_preroll = true;
+
+ if (!ds->back_range_started) {
+ pkt->back_restart = true;
+ ds->back_range_started = true;
+ }
+ }
+
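+    // base_ts reflects the reader position; prefetching is measured relative
+    // to it.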
+ double ts = MP_PTS_OR_DEF(pkt->dts, pkt->pts);
+ if (ts != MP_NOPTS_VALUE)
+ ds->base_ts = ts;
+
+ if (pkt->keyframe && ts != MP_NOPTS_VALUE) {
+ // Update bitrate - only at keyframe points, because we use the
+ // (possibly) reordered packet timestamps instead of realtime.
+ double d = ts - ds->last_br_ts;
+ if (ds->last_br_ts == MP_NOPTS_VALUE || d < 0) {
+ ds->bitrate = -1;
+ ds->last_br_ts = ts;
+ ds->last_br_bytes = 0;
+        } else if (d >= 0.5) { // a window of at least 500ms for UI purposes
+ ds->bitrate = ds->last_br_bytes / d;
+ ds->last_br_ts = ts;
+ ds->last_br_bytes = 0;
+ }
+ }
+ ds->last_br_bytes += pkt->len;
+
+ // This implies this function is actually called from "the" user thread.
+ if (pkt->pos >= in->d_user->filepos)
+ in->d_user->filepos = pkt->pos;
+ in->d_user->filesize = in->stream_size;
+
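+    // Translate the timestamps into the user-visible time base.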
+ pkt->pts = MP_ADD_PTS(pkt->pts, in->ts_offset);
+ pkt->dts = MP_ADD_PTS(pkt->dts, in->ts_offset);
+
+ if (pkt->segmented) {
+ pkt->start = MP_ADD_PTS(pkt->start, in->ts_offset);
+ pkt->end = MP_ADD_PTS(pkt->end, in->ts_offset);
+ }
+
+ prune_old_packets(in);
+ *res = pkt;
+ return 1;
+}
+
+// Poll the demuxer queue, and if there's a packet, return it. Otherwise, just
+// make the demuxer thread read packets for this stream, and if there's at
+// least one packet, call the wakeup callback.
+// This enables readahead if it wasn't yet (except for interleaved subtitles).
+// Returns:
+// < 0: EOF was reached, *out_pkt=NULL
+// == 0: no new packet yet, but maybe later, *out_pkt=NULL
+// > 0: new packet read, *out_pkt is set
+// Note: when reading interleaved subtitles, the demuxer won't try to forcibly
+// read ahead to get the next subtitle packet (as the next packet could be
+// minutes away). In this situation, this function will just return -1.
+int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
+{
+ return demux_read_packet_async_until(sh, MP_NOPTS_VALUE, out_pkt);
+}
+
+// Like demux_read_packet_async(); both behave identically for
+// min_pts==MP_NOPTS_VALUE.
+// If min_pts is set, and the stream is lazily read (eager=false, interleaved
+// subtitles), then return 0 until demuxing has reached min_pts, or the queue
+// overflowed, or EOF was reached, or a packet was read for this stream.
+int demux_read_packet_async_until(struct sh_stream *sh, double min_pts,
+ struct demux_packet **out_pkt)
+{
+ struct demux_stream *ds = sh ? sh->ds : NULL;
+ *out_pkt = NULL;
+ if (!ds)
+ return -1;
+ struct demux_internal *in = ds->in;
+
+ mp_mutex_lock(&in->lock);
+ int r = -1;
+ while (1) {
+ r = dequeue_packet(ds, min_pts, out_pkt);
+ if (in->threading || in->blocked || r != 0)
+ break;
+        // Need to actually read packets until we get a packet or EOF.
+ thread_work(in);
+ }
+ mp_mutex_unlock(&in->lock);
+ return r;
+}
+
+// Read and return any packet we find. NULL means EOF.
+// Does not work with threading (don't call demux_start_thread()).
+struct demux_packet *demux_read_any_packet(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ mp_mutex_lock(&in->lock);
+ assert(!in->threading); // doesn't work with threading
+ struct demux_packet *out_pkt = NULL;
+ bool read_more = true;
+ while (read_more && !in->blocked) {
+ bool all_eof = true;
+ for (int n = 0; n < in->num_streams; n++) {
+ int r = dequeue_packet(in->streams[n]->ds, MP_NOPTS_VALUE, &out_pkt);
+ if (r > 0)
+ goto done;
+ if (r == 0)
+ all_eof = false;
+ }
+ // retry after calling this
+ read_more = thread_work(in);
+ read_more &= !all_eof;
+ }
+done:
+ mp_mutex_unlock(&in->lock);
+ return out_pkt;
+}
+
+int demuxer_help(struct mp_log *log, const m_option_t *opt, struct bstr name)
+{
+    mp_info(log, "Available demuxers:\n");
+    mp_info(log, " demuxer: info:\n");
+    for (int i = 0; demuxer_list[i]; i++) {
+ mp_info(log, "%10s %s\n",
+ demuxer_list[i]->name, demuxer_list[i]->desc);
+ }
+ mp_info(log, "\n");
+
+ return M_OPT_EXIT;
+}
+
+static const char *d_level(enum demux_check level)
+{
+ switch (level) {
+ case DEMUX_CHECK_FORCE: return "force";
+ case DEMUX_CHECK_UNSAFE: return "unsafe";
+ case DEMUX_CHECK_REQUEST:return "request";
+ case DEMUX_CHECK_NORMAL: return "normal";
+ }
+ MP_ASSERT_UNREACHABLE();
+}
+
+static int decode_float(char *str, float *out)
+{
+ char *rest;
+ float dec_val;
+
+ dec_val = strtod(str, &rest);
+ if (!rest || (rest == str) || !isfinite(dec_val))
+ return -1;
+
+ *out = dec_val;
+ return 0;
+}
+
+static int decode_gain(struct mp_log *log, struct mp_tags *tags,
+ const char *tag, float *out)
+{
+ char *tag_val = NULL;
+ float dec_val;
+
+ tag_val = mp_tags_get_str(tags, tag);
+ if (!tag_val)
+ return -1;
+
+ if (decode_float(tag_val, &dec_val) < 0) {
+ mp_msg(log, MSGL_ERR, "Invalid replaygain value\n");
+ return -1;
+ }
+
+ *out = dec_val;
+ return 0;
+}
+
+static int decode_peak(struct mp_log *log, struct mp_tags *tags,
+ const char *tag, float *out)
+{
+ char *tag_val = NULL;
+ float dec_val;
+
+ *out = 1.0;
+
+ tag_val = mp_tags_get_str(tags, tag);
+ if (!tag_val)
+ return 0;
+
+ if (decode_float(tag_val, &dec_val) < 0 || dec_val <= 0.0)
+ return -1;
+
+ *out = dec_val;
+ return 0;
+}
+
+static struct replaygain_data *decode_rgain(struct mp_log *log,
+ struct mp_tags *tags)
+{
+ struct replaygain_data rg = {0};
+
+ // Set values in *rg, using track gain as a fallback for album gain if the
+ // latter is not present. This behavior matches that in demux/demux_lavf.c's
+ // export_replaygain; if you change this, please make equivalent changes
+ // there too.
+ if (decode_gain(log, tags, "REPLAYGAIN_TRACK_GAIN", &rg.track_gain) >= 0 &&
+ decode_peak(log, tags, "REPLAYGAIN_TRACK_PEAK", &rg.track_peak) >= 0)
+ {
+ if (decode_gain(log, tags, "REPLAYGAIN_ALBUM_GAIN", &rg.album_gain) < 0 ||
+ decode_peak(log, tags, "REPLAYGAIN_ALBUM_PEAK", &rg.album_peak) < 0)
+ {
+ // Album gain is undefined; fall back to track gain.
+ rg.album_gain = rg.track_gain;
+ rg.album_peak = rg.track_peak;
+ }
+ return talloc_dup(NULL, &rg);
+ }
+
+ if (decode_gain(log, tags, "REPLAYGAIN_GAIN", &rg.track_gain) >= 0 &&
+ decode_peak(log, tags, "REPLAYGAIN_PEAK", &rg.track_peak) >= 0)
+ {
+ rg.album_gain = rg.track_gain;
+ rg.album_peak = rg.track_peak;
+ return talloc_dup(NULL, &rg);
+ }
+
+    // The R128 replaygain tags declared in RFC 7845 for Opus files. The tags
+    // are generated with EBU R128, which does not use peak meters, and the
+    // values are stored as Q7.8 fixed-point numbers in dB.
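+    // (E.g. a stored R128_TRACK_GAIN of -3072 decodes to -3072/256 = -12 dB.)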
+ if (decode_gain(log, tags, "R128_TRACK_GAIN", &rg.track_gain) >= 0) {
+ if (decode_gain(log, tags, "R128_ALBUM_GAIN", &rg.album_gain) < 0) {
+ // Album gain is undefined; fall back to track gain.
+ rg.album_gain = rg.track_gain;
+ }
+ rg.track_gain /= 256.;
+ rg.album_gain /= 256.;
+
+ // Add 5dB to compensate for the different reference levels between
+ // our reference of ReplayGain 2 (-18 LUFS) and EBU R128 (-23 LUFS).
+ rg.track_gain += 5.;
+ rg.album_gain += 5.;
+ return talloc_dup(NULL, &rg);
+ }
+
+ return NULL;
+}
+
+static void demux_update_replaygain(demuxer_t *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct sh_stream *sh = in->streams[n];
+ if (sh->type == STREAM_AUDIO && !sh->codec->replaygain_data) {
+ struct replaygain_data *rg = decode_rgain(demuxer->log, sh->tags);
+ if (!rg)
+ rg = decode_rgain(demuxer->log, demuxer->metadata);
+ if (rg)
+ sh->codec->replaygain_data = talloc_steal(in, rg);
+ }
+ }
+}
+
+// Copy some fields from src to dst (for initialization).
+static void demux_copy(struct demuxer *dst, struct demuxer *src)
+{
+    // Note that we make the copies as shallow as possible. We expect the data
+    // that is not copied (only referenced) to be immutable.
+ // This implies e.g. that no chapters are added after initialization.
+ dst->chapters = src->chapters;
+ dst->num_chapters = src->num_chapters;
+ dst->editions = src->editions;
+ dst->num_editions = src->num_editions;
+ dst->edition = src->edition;
+ dst->attachments = src->attachments;
+ dst->num_attachments = src->num_attachments;
+ dst->matroska_data = src->matroska_data;
+ dst->playlist = src->playlist;
+ dst->seekable = src->seekable;
+ dst->partially_seekable = src->partially_seekable;
+ dst->filetype = src->filetype;
+ dst->ts_resets_possible = src->ts_resets_possible;
+ dst->fully_read = src->fully_read;
+ dst->start_time = src->start_time;
+ dst->duration = src->duration;
+ dst->is_network = src->is_network;
+ dst->is_streaming = src->is_streaming;
+ dst->stream_origin = src->stream_origin;
+ dst->priv = src->priv;
+ dst->metadata = mp_tags_dup(dst, src->metadata);
+}
+
+// Update metadata after initialization. If sh==NULL, it's global metadata,
+// otherwise it's bound to the stream. If pts==NOPTS, use the highest known pts
+// in the stream. Caller retains ownership of tags ptr. Called locked.
+static void add_timed_metadata(struct demux_internal *in, struct mp_tags *tags,
+ struct sh_stream *sh, double pts)
+{
+ struct demux_cached_range *r = in->current_range;
+ if (!r)
+ return;
+
+ // We don't expect this, nor do we find it useful.
+ if (sh && sh != in->metadata_stream)
+ return;
+
+ if (pts == MP_NOPTS_VALUE) {
+ for (int n = 0; n < r->num_streams; n++)
+ pts = MP_PTS_MAX(pts, r->streams[n]->last_ts);
+
+ // Tends to happen when doing the initial icy update.
+ if (pts == MP_NOPTS_VALUE)
+ pts = in->d_thread->start_time;
+ }
+
+ struct timed_metadata *tm = talloc_zero(NULL, struct timed_metadata);
+ *tm = (struct timed_metadata){
+ .pts = pts,
+ .tags = mp_tags_dup(tm, tags),
+ .from_stream = !!sh,
+ };
+ MP_TARRAY_APPEND(r, r->metadata, r->num_metadata, tm);
+}
+
+// This is called by demuxer implementations if sh->tags changed. Note that
+// sh->tags itself is never actually changed (it's immutable, because sh->tags
+// can be accessed by the playback thread, and there is no synchronization).
+// pts is the time at/after which the metadata becomes effective. You're
+// supposed to call this ordered by time, and only while a packet is being
+// read.
+// Ownership of tags goes to the function.
+void demux_stream_tags_changed(struct demuxer *demuxer, struct sh_stream *sh,
+ struct mp_tags *tags, double pts)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_thread);
+ struct demux_stream *ds = sh ? sh->ds : NULL;
+ assert(!sh || ds); // stream must have been added
+
+ mp_mutex_lock(&in->lock);
+
+ if (pts == MP_NOPTS_VALUE) {
+ MP_WARN(in, "Discarding timed metadata without timestamp.\n");
+ } else {
+ add_timed_metadata(in, tags, sh, pts);
+ }
+ talloc_free(tags);
+
+ mp_mutex_unlock(&in->lock);
+}
+
+// This is called by demuxer implementations if demuxer->metadata changed.
+// (It will be propagated to the user as timed metadata.)
+void demux_metadata_changed(demuxer_t *demuxer)
+{
+ assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only
+ struct demux_internal *in = demuxer->in;
+
+ mp_mutex_lock(&in->lock);
+ add_timed_metadata(in, demuxer->metadata, NULL, MP_NOPTS_VALUE);
+ mp_mutex_unlock(&in->lock);
+}
+
+// Called locked, with user demuxer.
+static void update_final_metadata(demuxer_t *demuxer, struct timed_metadata *tm)
+{
+ assert(demuxer == demuxer->in->d_user);
+ struct demux_internal *in = demuxer->in;
+
+ struct mp_tags *dyn_tags = NULL;
+
+    // Often useful for audio-only files, which carry their metadata in the
+    // audio track's tags instead of the global metadata, but may also contain
+    // cover art (which libavformat likes to treat as video streams).
+ int astreams = 0;
+ int astream_id = -1;
+ int vstreams = 0;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct sh_stream *sh = in->streams[n];
+ if (sh->type == STREAM_VIDEO && !sh->attached_picture)
+ vstreams += 1;
+ if (sh->type == STREAM_AUDIO) {
+ astreams += 1;
+ astream_id = n;
+ }
+ }
+
+ // Use the metadata_stream tags only if this really seems to be an audio-
+ // only stream. Otherwise it will happen too often that "uninteresting"
+ // stream metadata will trash the actual file tags.
+ if (vstreams == 0 && astreams == 1 &&
+ in->streams[astream_id] == in->metadata_stream)
+ {
+ dyn_tags = in->metadata_stream->tags;
+ if (tm && tm->from_stream)
+ dyn_tags = tm->tags;
+ }
+
+ // Global metadata updates.
+ if (tm && !tm->from_stream)
+ dyn_tags = tm->tags;
+
+ if (dyn_tags)
+ mp_tags_merge(demuxer->metadata, dyn_tags);
+}
+
+static struct timed_metadata *lookup_timed_metadata(struct demux_internal *in,
+ double pts)
+{
+ struct demux_cached_range *r = in->current_range;
+
+ if (!r || !r->num_metadata || pts == MP_NOPTS_VALUE)
+ return NULL;
+
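+    // Reuse the cached lookup position if it still lies at or before the
+    // target pts; this avoids rescanning the metadata list on every call.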
+ int start = 1;
+ int i = in->cached_metadata_index;
+ if (i >= 0 && i < r->num_metadata && r->metadata[i]->pts <= pts)
+ start = i + 1;
+
+ in->cached_metadata_index = r->num_metadata - 1;
+ for (int n = start; n < r->num_metadata; n++) {
+ if (r->metadata[n]->pts >= pts) {
+ in->cached_metadata_index = n - 1;
+ break;
+ }
+ }
+
+ return r->metadata[in->cached_metadata_index];
+}
+
+// Called by the user thread (i.e. player) to update metadata and other things
+// from the demuxer thread.
+// The pts parameter is the current playback position.
+void demux_update(demuxer_t *demuxer, double pts)
+{
+ assert(demuxer == demuxer->in->d_user);
+ struct demux_internal *in = demuxer->in;
+
+ mp_mutex_lock(&in->lock);
+
+ if (!in->threading)
+ update_cache(in);
+
+ // This implies this function is actually called from "the" user thread.
+ in->d_user->filesize = in->stream_size;
+
+ pts = MP_ADD_PTS(pts, -in->ts_offset);
+
+ struct timed_metadata *prev = lookup_timed_metadata(in, in->last_playback_pts);
+ struct timed_metadata *cur = lookup_timed_metadata(in, pts);
+ if (prev != cur || in->force_metadata_update) {
+ in->force_metadata_update = false;
+ update_final_metadata(demuxer, cur);
+ demuxer->events |= DEMUX_EVENT_METADATA;
+ }
+
+ in->last_playback_pts = pts;
+
+ demuxer->events |= in->events;
+ in->events = 0;
+ if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS))
+ demux_update_replaygain(demuxer);
+ if (demuxer->events & DEMUX_EVENT_DURATION)
+ demuxer->duration = in->duration;
+
+ mp_mutex_unlock(&in->lock);
+}
+
+static void demux_init_cuesheet(struct demuxer *demuxer)
+{
+ char *cue = mp_tags_get_str(demuxer->metadata, "cuesheet");
+ if (cue && !demuxer->num_chapters) {
+ struct cue_file *f = mp_parse_cue(bstr0(cue));
+ if (f) {
+ if (mp_check_embedded_cue(f) < 0) {
+ MP_WARN(demuxer, "Embedded cue sheet references more than one file. "
+ "Ignoring it.\n");
+ } else {
+ for (int n = 0; n < f->num_tracks; n++) {
+ struct cue_track *t = &f->tracks[n];
+ int idx = demuxer_add_chapter(demuxer, "", t->start, -1);
+ mp_tags_merge(demuxer->chapters[idx].metadata, t->tags);
+ }
+ }
+ }
+ talloc_free(f);
+ }
+}
+
+// A demuxer can use this during opening if all data was read from the stream.
+// Calling this after opening was completed is not allowed. Also, if opening
+// failed, this must not be called (or trying another demuxer would fail).
+// Useful so that e.g. subtitles don't keep the file or socket open.
+// If there's ever the situation where we can't allow the demuxer to close
+// the stream, this function could ignore the request.
+void demux_close_stream(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(!in->threading && demuxer == in->d_thread);
+
+ if (!demuxer->stream || !in->owns_stream)
+ return;
+
+ MP_VERBOSE(demuxer, "demuxer read all data; closing stream\n");
+ free_stream(demuxer->stream);
+ demuxer->stream = NULL;
+ in->d_user->stream = NULL;
+}
+
+static void demux_init_ccs(struct demuxer *demuxer, struct demux_opts *opts)
+{
+ struct demux_internal *in = demuxer->in;
+ if (!opts->create_ccs)
+ return;
+ mp_mutex_lock(&in->lock);
+ for (int n = 0; n < in->num_streams; n++) {
+ struct sh_stream *sh = in->streams[n];
+ if (sh->type == STREAM_VIDEO && !sh->attached_picture)
+ demuxer_get_cc_track_locked(sh);
+ }
+ mp_mutex_unlock(&in->lock);
+}
+
+// Return whether "heavy" caching on this stream is enabled. By default, this
+// corresponds to whether the source stream is considered in the network. The
+// only effect should be adjusting display behavior (of cache stats etc.), and
+// possibly switching between which set of options influence cache settings.
+bool demux_is_network_cached(demuxer_t *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ mp_mutex_lock(&in->lock);
+ bool r = in->using_network_cache_opts;
+ mp_mutex_unlock(&in->lock);
+ return r;
+}
+
+struct parent_stream_info {
+ bool seekable;
+ bool is_network;
+ bool is_streaming;
+ int stream_origin;
+ struct mp_cancel *cancel;
+ char *filename;
+};
+
+static struct demuxer *open_given_type(struct mpv_global *global,
+ struct mp_log *log,
+ const struct demuxer_desc *desc,
+ struct stream *stream,
+ struct parent_stream_info *sinfo,
+ struct demuxer_params *params,
+ enum demux_check check)
+{
+ if (mp_cancel_test(sinfo->cancel))
+ return NULL;
+
+ struct demuxer *demuxer = talloc_ptrtype(NULL, demuxer);
+ struct m_config_cache *opts_cache =
+ m_config_cache_alloc(demuxer, global, &demux_conf);
+ struct demux_opts *opts = opts_cache->opts;
+ *demuxer = (struct demuxer) {
+ .desc = desc,
+ .stream = stream,
+ .cancel = sinfo->cancel,
+ .seekable = sinfo->seekable,
+ .filepos = -1,
+ .global = global,
+ .log = mp_log_new(demuxer, log, desc->name),
+ .glog = log,
+ .filename = talloc_strdup(demuxer, sinfo->filename),
+ .is_network = sinfo->is_network,
+ .is_streaming = sinfo->is_streaming,
+ .stream_origin = sinfo->stream_origin,
+ .access_references = opts->access_references,
+ .opts = opts,
+ .opts_cache = opts_cache,
+ .events = DEMUX_EVENT_ALL,
+ .duration = -1,
+ };
+
+ struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in);
+ *in = (struct demux_internal){
+ .global = global,
+ .log = demuxer->log,
+ .stats = stats_ctx_create(in, global, "demuxer"),
+ .can_cache = params && params->is_top_level,
+ .can_record = params && params->stream_record,
+ .d_thread = talloc(demuxer, struct demuxer),
+ .d_user = demuxer,
+ .after_seek = true, // (assumed identical to initial demuxer state)
+ .after_seek_to_start = true,
+ .highest_av_pts = MP_NOPTS_VALUE,
+ .seeking_in_progress = MP_NOPTS_VALUE,
+ .demux_ts = MP_NOPTS_VALUE,
+ .owns_stream = !params->external_stream,
+ };
+ mp_mutex_init(&in->lock);
+ mp_cond_init(&in->wakeup);
+
+ *in->d_thread = *demuxer;
+
+ in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags);
+
+ mp_dbg(log, "Trying demuxer: %s (force-level: %s)\n",
+ desc->name, d_level(check));
+
+ if (stream)
+ stream_seek(stream, 0);
+
+ in->d_thread->params = params; // temporary during open()
+ int ret = demuxer->desc->open(in->d_thread, check);
+ if (ret >= 0) {
+ in->d_thread->params = NULL;
+ if (in->d_thread->filetype)
+ mp_verbose(log, "Detected file format: %s (%s)\n",
+ in->d_thread->filetype, desc->desc);
+ else
+ mp_verbose(log, "Detected file format: %s\n", desc->desc);
+ if (!in->d_thread->seekable)
+ mp_verbose(log, "Stream is not seekable.\n");
+ if (!in->d_thread->seekable && opts->force_seekable) {
+ mp_warn(log, "Not seekable, but enabling seeking on user request.\n");
+ in->d_thread->seekable = true;
+ in->d_thread->partially_seekable = true;
+ }
+ demux_init_cuesheet(in->d_thread);
+ demux_init_ccs(demuxer, opts);
+ demux_convert_tags_charset(in->d_thread);
+ demux_copy(in->d_user, in->d_thread);
+ in->duration = in->d_thread->duration;
+ demuxer_sort_chapters(demuxer);
+ in->events = DEMUX_EVENT_ALL;
+
+ struct demuxer *sub = NULL;
+ if (!(params && params->disable_timeline)) {
+ struct timeline *tl = timeline_load(global, log, demuxer);
+ if (tl) {
+ struct demuxer_params params2 = {0};
+ params2.timeline = tl;
+ params2.is_top_level = params && params->is_top_level;
+ params2.stream_record = params && params->stream_record;
+ sub =
+ open_given_type(global, log, &demuxer_desc_timeline,
+ NULL, sinfo, &params2, DEMUX_CHECK_FORCE);
+ if (sub) {
+ in->can_cache = false;
+ in->can_record = false;
+ } else {
+ timeline_destroy(tl);
+ }
+ }
+ }
+
+ switch_to_fresh_cache_range(in);
+
+ update_opts(demuxer);
+
+ demux_update(demuxer, MP_NOPTS_VALUE);
+
+ demuxer = sub ? sub : demuxer;
+ return demuxer;
+ }
+
+ demuxer->stream = NULL;
+ demux_free(demuxer);
+ return NULL;
+}
+
+static const int d_normal[] = {DEMUX_CHECK_NORMAL, DEMUX_CHECK_UNSAFE, -1};
+static const int d_request[] = {DEMUX_CHECK_REQUEST, -1};
+static const int d_force[] = {DEMUX_CHECK_FORCE, -1};
+
+// params can be NULL
+// This may free the stream parameter on success.
+static struct demuxer *demux_open(struct stream *stream,
+ struct mp_cancel *cancel,
+ struct demuxer_params *params,
+ struct mpv_global *global)
+{
+ const int *check_levels = d_normal;
+ const struct demuxer_desc *check_desc = NULL;
+ struct mp_log *log = mp_log_new(NULL, global->log, "!demux");
+ struct demuxer *demuxer = NULL;
+ char *force_format = params ? params->force_format : NULL;
+
+ struct parent_stream_info sinfo = {
+ .seekable = stream->seekable,
+ .is_network = stream->is_network,
+ .is_streaming = stream->streaming,
+ .stream_origin = stream->stream_origin,
+ .cancel = cancel,
+ .filename = talloc_strdup(NULL, stream->url),
+ };
+
+ if (!force_format)
+ force_format = stream->demuxer;
+
+ if (force_format && force_format[0] && !stream->is_directory) {
+ check_levels = d_request;
+ if (force_format[0] == '+') {
+ force_format += 1;
+ check_levels = d_force;
+ }
+ for (int n = 0; demuxer_list[n]; n++) {
+ if (strcmp(demuxer_list[n]->name, force_format) == 0) {
+ check_desc = demuxer_list[n];
+ break;
+ }
+ }
+ if (!check_desc) {
+ mp_err(log, "Demuxer %s does not exist.\n", force_format);
+ goto done;
+ }
+ }
+
+ // Test demuxers from first to last, one pass for each check_levels[] entry
+ for (int pass = 0; check_levels[pass] != -1; pass++) {
+ enum demux_check level = check_levels[pass];
+ mp_verbose(log, "Trying demuxers for level=%s.\n", d_level(level));
+ for (int n = 0; demuxer_list[n]; n++) {
+ const struct demuxer_desc *desc = demuxer_list[n];
+ if (!check_desc || desc == check_desc) {
+ demuxer = open_given_type(global, log, desc, stream, &sinfo,
+ params, level);
+ if (demuxer) {
+ talloc_steal(demuxer, log);
+ log = NULL;
+ goto done;
+ }
+ }
+ }
+ }
+
+done:
+ talloc_free(sinfo.filename);
+ talloc_free(log);
+ return demuxer;
+}
+
+static struct stream *create_webshit_concat_stream(struct mpv_global *global,
+ struct mp_cancel *c,
+ bstr init, struct stream *real)
+{
+ struct stream *mem = stream_memory_open(global, init.start, init.len);
+ assert(mem);
+
+ struct stream *streams[2] = {mem, real};
+ struct stream *concat = stream_concat_open(global, c, streams, 2);
+ if (!concat) {
+ free_stream(mem);
+ free_stream(real);
+ }
+ return concat;
+}
+
+// Convenience function: open the stream, enable the cache (according to params
+// and global opts), and open the demuxer.
+// Note that the opened stream may be closed again if the demuxer does not need
+// it anymore (see demux_close_stream()).
+// demuxer->cancel is not the cancel parameter, but its own object that will be
+// a slave (mp_cancel_set_parent()) to the provided cancel object.
+// demuxer->cancel is automatically freed.
+struct demuxer *demux_open_url(const char *url,
+ struct demuxer_params *params,
+ struct mp_cancel *cancel,
+ struct mpv_global *global)
+{
+ if (!params)
+ return NULL;
+ struct mp_cancel *priv_cancel = mp_cancel_new(NULL);
+ if (cancel)
+ mp_cancel_set_parent(priv_cancel, cancel);
+ struct stream *s = params->external_stream;
+ if (!s) {
+ s = stream_create(url, STREAM_READ | params->stream_flags,
+ priv_cancel, global);
+ if (s && params->init_fragment.len) {
+ s = create_webshit_concat_stream(global, priv_cancel,
+ params->init_fragment, s);
+ }
+ }
+ if (!s) {
+ talloc_free(priv_cancel);
+ return NULL;
+ }
+ struct demuxer *d = demux_open(s, priv_cancel, params, global);
+ if (d) {
+ talloc_steal(d->in, priv_cancel);
+ assert(d->cancel);
+ } else {
+ params->demuxer_failed = true;
+ if (!params->external_stream)
+ free_stream(s);
+ talloc_free(priv_cancel);
+ }
+ return d;
+}
+
+// Clear the packet queues.
+void demux_flush(demuxer_t *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ mp_mutex_lock(&in->lock);
+ clear_reader_state(in, true);
+ for (int n = 0; n < in->num_ranges; n++)
+ clear_cached_range(in, in->ranges[n]);
+ free_empty_cached_ranges(in);
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ ds->refreshing = false;
+ ds->eof = false;
+ }
+ in->eof = false;
+ in->seeking = false;
+ mp_mutex_unlock(&in->lock);
+}
+
+// Does some (but not all) things for switching to another range.
+static void switch_current_range(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ struct demux_cached_range *old = in->current_range;
+ assert(old != range);
+
+ set_current_range(in, range);
+
+ if (old) {
+ // Remove packets which can't be used when seeking back to the range.
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_queue *queue = old->streams[n];
+
+ // Remove all packets which cannot be involved in seeking.
+ while (queue->head && !queue->head->keyframe)
+ remove_head_packet(queue);
+ }
+
+ // Exclude weird corner cases that break resuming.
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ // This is needed to resume or join the range at all.
+ if (ds->selected && !(ds->global_correct_dts ||
+ ds->global_correct_pos))
+ {
+ MP_VERBOSE(in, "discarding unseekable range due to stream %d\n", n);
+ clear_cached_range(in, old);
+ break;
+ }
+ }
+ }
+
+ // Set up reading from new range (as well as writing to it).
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ ds->queue = range->streams[n];
+ ds->refreshing = false;
+ ds->eof = false;
+ }
+
+ // No point in keeping any junk (especially if old current_range is empty).
+ free_empty_cached_ranges(in);
+
+ // The change detection doesn't work across ranges.
+ in->force_metadata_update = true;
+}
+
+// Search for the last index entry that satisfies entry.pts <= pts.
+static struct demux_packet *search_index(struct demux_queue *queue, double pts)
+{
+ size_t a = 0;
+ size_t b = queue->num_index;
+
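+    // Binary search for the last index entry with entry.pts <= pts.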
+ while (a < b) {
+ size_t m = a + (b - a) / 2;
+ struct index_entry *e = &QUEUE_INDEX_ENTRY(queue, m);
+
+ bool m_ok = e->pts <= pts;
+
+ if (a + 1 == b)
+ return m_ok ? e->pkt : NULL;
+
+ if (m_ok) {
+ a = m;
+ } else {
+ b = m;
+ }
+ }
+
+ return NULL;
+}
+
+static struct demux_packet *find_seek_target(struct demux_queue *queue,
+ double pts, int flags)
+{
+ pts -= queue->ds->sh->seek_preroll;
+
+ struct demux_packet *start = search_index(queue, pts);
+ if (!start)
+ start = queue->head;
+
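+    // Walk the keyframe ranges from the index hit (or the queue head) and pick
+    // the best target according to the seek direction.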
+ struct demux_packet *target = NULL;
+ struct demux_packet *next = NULL;
+ for (struct demux_packet *dp = start; dp; dp = next) {
+ next = dp->next;
+ if (!dp->keyframe)
+ continue;
+
+ double range_pts;
+ next = compute_keyframe_times(dp, &range_pts, NULL);
+
+ if (range_pts == MP_NOPTS_VALUE)
+ continue;
+
+ if (flags & SEEK_FORWARD) {
+ // Stop on the first packet that is >= pts.
+ if (target)
+ break;
+ if (range_pts < pts)
+ continue;
+ } else {
+ // Stop before the first packet that is > pts.
+ // This still returns a packet with > pts if there's no better one.
+ if (target && range_pts > pts)
+ break;
+ }
+
+ target = dp;
+ }
+
+ return target;
+}
+
+// Return a cache range for the given pts/flags, or NULL if none available.
+// must be called locked
+static struct demux_cached_range *find_cache_seek_range(struct demux_internal *in,
+ double pts, int flags)
+{
+ // Note about queued low level seeks: in->seeking can be true here, and it
+ // might come from a previous resume seek to the current range. If we end
+ // up seeking into the current range (i.e. just changing time offset), the
+ // seek needs to continue. Otherwise, we override the queued seek anyway.
+ if ((flags & SEEK_FACTOR) || !in->seekable_cache)
+ return NULL;
+
+ struct demux_cached_range *res = NULL;
+
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *r = in->ranges[n];
+ if (r->seek_start != MP_NOPTS_VALUE) {
+ MP_VERBOSE(in, "cached range %d: %f <-> %f (bof=%d, eof=%d)\n",
+ n, r->seek_start, r->seek_end, r->is_bof, r->is_eof);
+
+ if ((pts >= r->seek_start || r->is_bof) &&
+ (pts <= r->seek_end || r->is_eof))
+ {
+ MP_VERBOSE(in, "...using this range for in-cache seek.\n");
+ res = r;
+ break;
+ }
+ }
+ }
+
+ return res;
+}
+
+// Adjust the seek target to the found video keyframes. Otherwise, the
+// video would undershoot the seek target, while audio would be closer to it.
+// The player frontend will play the extra video without audio, so you get
+// silence for the duration of the "undershoot". Adjusting the seek target
+// makes the audio seek to the video target or before it.
+// (If hr-seeks are used, it's better to skip this, as it would only mean
+// that more audio data than necessary would have to be decoded.)
+static void adjust_cache_seek_target(struct demux_internal *in,
+ struct demux_cached_range *range,
+ double *pts, int *flags)
+{
+ if (*flags & SEEK_HR)
+ return;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ struct demux_queue *queue = range->streams[n];
+ if (ds->selected && ds->type == STREAM_VIDEO) {
+ struct demux_packet *target = find_seek_target(queue, *pts, *flags);
+ if (target) {
+ double target_pts;
+ compute_keyframe_times(target, &target_pts, NULL);
+ if (target_pts != MP_NOPTS_VALUE) {
+ MP_VERBOSE(in, "adjust seek target %f -> %f\n",
+ *pts, target_pts);
+ // (We assume the find_seek_target() call will return
+ // the same target for the video stream.)
+ *pts = target_pts;
+ *flags &= ~SEEK_FORWARD;
+ }
+ }
+ break;
+ }
+ }
+}
+
+// must be called locked
+// range must be non-NULL and from find_cache_seek_range() using the same pts
+// and flags, before any other changes to the cached state
+static void execute_cache_seek(struct demux_internal *in,
+ struct demux_cached_range *range,
+ double pts, int flags)
+{
+ adjust_cache_seek_target(in, range, &pts, &flags);
+
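+    // Point each stream's reader head at the seek target within this range.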
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ struct demux_queue *queue = range->streams[n];
+
+ struct demux_packet *target = find_seek_target(queue, pts, flags);
+ ds->reader_head = target;
+ ds->skip_to_keyframe = !target;
+ if (ds->reader_head)
+ ds->base_ts = MP_PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts);
+
+ MP_VERBOSE(in, "seeking stream %d (%s) to ",
+ n, stream_type_name(ds->type));
+
+ if (target) {
+ MP_VERBOSE(in, "packet %f/%f\n", target->pts, target->dts);
+ } else {
+ MP_VERBOSE(in, "nothing\n");
+ }
+ }
+
+    // If we seek into another range, we want to seek the low level demuxer
+    // there as well, because the reader and the demuxer queue must match.
+ if (in->current_range != range) {
+ switch_current_range(in, range);
+
+ in->seeking = true;
+ in->seek_flags = SEEK_HR;
+ in->seek_pts = range->seek_end - 1.0;
+
+        // When new packets are being appended, they could overlap with the
+        // old range due to demuxer seek imprecisions, or because the queue
+        // contains packets past the seek target but before the next seek
+        // target. Don't append them twice; instead, skip them until new
+        // packets are found.
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ ds->refreshing = ds->selected;
+ }
+
+ MP_VERBOSE(in, "resuming demuxer to end of cached range\n");
+ }
+}
+
+// Create a new blank cache range, and back up the old one. If the seekable
+// demuxer cache is disabled, merely reset the current range to a blank state.
+static void switch_to_fresh_cache_range(struct demux_internal *in)
+{
+ if (!in->seekable_cache && in->current_range) {
+ clear_cached_range(in, in->current_range);
+ return;
+ }
+
+ struct demux_cached_range *range = talloc_ptrtype(NULL, range);
+ *range = (struct demux_cached_range){
+ .seek_start = MP_NOPTS_VALUE,
+ .seek_end = MP_NOPTS_VALUE,
+ };
+ MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
+ add_missing_streams(in, range);
+
+ switch_current_range(in, range);
+}
+
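+// Usage sketch (editor's illustration, not from upstream): a frontend that
+// prefers in-cache seeks can try SEEK_CACHED first, and fall back to a real
+// seek on failure:
+//
+//   if (!demux_seek(demuxer, target_pts, SEEK_CACHED))
+//       demux_seek(demuxer, target_pts, 0);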
+int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ mp_mutex_lock(&in->lock);
+
+ if (!(flags & SEEK_FACTOR))
+ seek_pts = MP_ADD_PTS(seek_pts, -in->ts_offset);
+
+ int res = queue_seek(in, seek_pts, flags, true);
+
+ mp_cond_signal(&in->wakeup);
+ mp_mutex_unlock(&in->lock);
+
+ return res;
+}
+
+static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
+ bool clear_back_state)
+{
+ if (seek_pts == MP_NOPTS_VALUE)
+ return false;
+
+ MP_VERBOSE(in, "queuing seek to %f%s\n", seek_pts,
+ in->seeking ? " (cascade)" : "");
+
+ bool require_cache = flags & SEEK_CACHED;
+ flags &= ~(unsigned)SEEK_CACHED;
+
+ bool set_backwards = flags & SEEK_SATAN;
+ flags &= ~(unsigned)SEEK_SATAN;
+
+ bool force_seek = flags & SEEK_FORCE;
+ flags &= ~(unsigned)SEEK_FORCE;
+
+ bool block = flags & SEEK_BLOCK;
+ flags &= ~(unsigned)SEEK_BLOCK;
+
+ struct demux_cached_range *cache_target =
+ find_cache_seek_range(in, seek_pts, flags);
+
+ if (!cache_target) {
+ if (require_cache) {
+ MP_VERBOSE(in, "Cached seek not possible.\n");
+ return false;
+ }
+ if (!in->d_thread->seekable && !force_seek) {
+ MP_WARN(in, "Cannot seek in this file.\n");
+ return false;
+ }
+ }
+
+ in->eof = false;
+ in->reading = false;
+ in->back_demuxing = set_backwards;
+
+ clear_reader_state(in, clear_back_state);
+
+ in->blocked = block;
+
+ if (cache_target) {
+ execute_cache_seek(in, cache_target, seek_pts, flags);
+ } else {
+ switch_to_fresh_cache_range(in);
+
+ in->seeking = true;
+ in->seek_flags = flags;
+ in->seek_pts = seek_pts;
+ }
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ if (in->back_demuxing) {
+ if (ds->back_seek_pos == MP_NOPTS_VALUE)
+ ds->back_seek_pos = seek_pts;
+ // Process possibly cached packets.
+ back_demux_see_packets(in->streams[n]->ds);
+ }
+
+ wakeup_ds(ds);
+ }
+
+ if (!in->threading && in->seeking)
+ execute_seek(in);
+
+ return true;
+}
+
+struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d,
+ enum stream_type t, int id)
+{
+ if (id < 0)
+ return NULL;
+ int num = demux_get_num_stream(d);
+ for (int n = 0; n < num; n++) {
+ struct sh_stream *s = demux_get_stream(d, n);
+ if (s->type == t && s->demuxer_id == id)
+ return s;
+ }
+ return NULL;
+}
+
+// An obscure mechanism to get stream switching executed "faster" (as
+// perceived by the user), by making the stream return packets from the
+// current playback position.
+// On a switch, it seeks back, and then grabs all packets that were
+// "missing" from the packet queue of the newly selected stream.
+static void initiate_refresh_seek(struct demux_internal *in,
+ struct demux_stream *stream,
+ double start_ts)
+{
+ struct demuxer *demux = in->d_thread;
+ bool seekable = demux->desc->seek && demux->seekable &&
+ !demux->partially_seekable;
+
+ bool normal_seek = true;
+ bool refresh_possible = true;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ if (!ds->selected)
+ continue;
+
+ if (ds->type == STREAM_VIDEO || ds->type == STREAM_AUDIO)
+ start_ts = MP_PTS_MIN(start_ts, ds->base_ts);
+
+        // If no other stream is selected, we can use a normal seek.
+ normal_seek &= stream == ds;
+
+ refresh_possible &= ds->queue->correct_dts || ds->queue->correct_pos;
+ }
+
+ if (start_ts == MP_NOPTS_VALUE || !seekable)
+ return;
+
+ if (!normal_seek) {
+ if (!refresh_possible) {
+ MP_VERBOSE(in, "can't issue refresh seek\n");
+ return;
+ }
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ bool correct_pos = ds->queue->correct_pos;
+ bool correct_dts = ds->queue->correct_dts;
+
+            // We need to re-read all packets anyway, so discard the buffered
+            // data. (In theory, we could keep the packets and use them for
+            // seeking if partially read streams are deselected again, but
+            // this causes other problems like queue overflows when selecting
+            // a new stream.)
+ ds_clear_reader_queue_state(ds);
+ clear_queue(ds->queue);
+
+            // Streams which didn't have any packets yet will return all
+            // packets, while other streams return packets only starting
+            // from the last position.
+ if (ds->selected && (ds->last_ret_pos != -1 ||
+ ds->last_ret_dts != MP_NOPTS_VALUE))
+ {
+ ds->refreshing = true;
+ ds->queue->correct_dts = correct_dts;
+ ds->queue->correct_pos = correct_pos;
+ ds->queue->last_pos = ds->last_ret_pos;
+ ds->queue->last_dts = ds->last_ret_dts;
+ }
+
+ update_seek_ranges(in->current_range);
+ }
+
+ start_ts -= 1.0; // small offset to get correct overlap
+ }
+
+ MP_VERBOSE(in, "refresh seek to %f\n", start_ts);
+ in->seeking = true;
+ in->seek_flags = SEEK_HR;
+ in->seek_pts = start_ts;
+}
+
+// Set whether the given stream should return packets.
+// ref_pts is used only if the stream is enabled. Then it serves as the
+// approximate start pts for this stream (in the worst case, it is ignored).
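+// Usage sketch (editor's illustration; sh_sub and playback_pts are
+// hypothetical): enabling a subtitle track during playback, using the
+// current position as the approximate resume point:
+//
+//   demuxer_select_track(demuxer, sh_sub, playback_pts, true);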
+void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream,
+ double ref_pts, bool selected)
+{
+ struct demux_internal *in = demuxer->in;
+ struct demux_stream *ds = stream->ds;
+ mp_mutex_lock(&in->lock);
+ ref_pts = MP_ADD_PTS(ref_pts, -in->ts_offset);
+ // don't flush buffers if stream is already selected / unselected
+ if (ds->selected != selected) {
+ MP_VERBOSE(in, "%sselect track %d\n", selected ? "" : "de", stream->index);
+ ds->selected = selected;
+ update_stream_selection_state(in, ds);
+ in->tracks_switched = true;
+ if (ds->selected) {
+ if (in->back_demuxing)
+ ds->back_seek_pos = ref_pts;
+ if (!in->after_seek)
+ initiate_refresh_seek(in, ds, ref_pts);
+ }
+ if (in->threading) {
+ mp_cond_signal(&in->wakeup);
+ } else {
+ execute_trackswitch(in);
+ }
+ }
+ mp_mutex_unlock(&in->lock);
+}
+
+// Execute a refresh seek on the given stream.
+// ref_pts has the same meaning as with demuxer_select_track()
+void demuxer_refresh_track(struct demuxer *demuxer, struct sh_stream *stream,
+ double ref_pts)
+{
+ struct demux_internal *in = demuxer->in;
+ struct demux_stream *ds = stream->ds;
+ mp_mutex_lock(&in->lock);
+ ref_pts = MP_ADD_PTS(ref_pts, -in->ts_offset);
+ if (ds->selected) {
+ MP_VERBOSE(in, "refresh track %d\n", stream->index);
+ update_stream_selection_state(in, ds);
+ if (in->back_demuxing)
+ ds->back_seek_pos = ref_pts;
+ if (!in->after_seek)
+ initiate_refresh_seek(in, ds, ref_pts);
+ }
+ mp_mutex_unlock(&in->lock);
+}
+
+// This is for demuxer implementations only. demuxer_select_track() sets the
+// logical state, while this function returns the actual state (in case the
+// demuxer attempts to cache even unselected packets for track switching - this
+// will potentially be done in the future).
+bool demux_stream_is_selected(struct sh_stream *stream)
+{
+ if (!stream)
+ return false;
+ bool r = false;
+ mp_mutex_lock(&stream->ds->in->lock);
+ r = stream->ds->selected;
+ mp_mutex_unlock(&stream->ds->in->lock);
+ return r;
+}
+
+void demux_set_stream_wakeup_cb(struct sh_stream *sh,
+ void (*cb)(void *ctx), void *ctx)
+{
+ mp_mutex_lock(&sh->ds->in->lock);
+ sh->ds->wakeup_cb = cb;
+ sh->ds->wakeup_cb_ctx = ctx;
+ sh->ds->need_wakeup = true;
+ mp_mutex_unlock(&sh->ds->in->lock);
+}
+
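+// Usage sketch (editor's illustration; the name and MIME type are examples):
+// a demuxer registering an embedded font as an attachment:
+//
+//   demuxer_add_attachment(demuxer, "subfont.ttf",
+//                          "application/x-truetype-font", buf, buf_size);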
+int demuxer_add_attachment(demuxer_t *demuxer, char *name, char *type,
+ void *data, size_t data_size)
+{
+ if (!(demuxer->num_attachments % 32))
+ demuxer->attachments = talloc_realloc(demuxer, demuxer->attachments,
+ struct demux_attachment,
+ demuxer->num_attachments + 32);
+
+ struct demux_attachment *att = &demuxer->attachments[demuxer->num_attachments];
+ att->name = talloc_strdup(demuxer->attachments, name);
+ att->type = talloc_strdup(demuxer->attachments, type);
+ att->data = talloc_memdup(demuxer->attachments, data, data_size);
+ att->data_size = data_size;
+
+ return demuxer->num_attachments++;
+}
+
+static int chapter_compare(const void *p1, const void *p2)
+{
+ struct demux_chapter *c1 = (void *)p1;
+ struct demux_chapter *c2 = (void *)p2;
+
+ if (c1->pts > c2->pts)
+ return 1;
+ else if (c1->pts < c2->pts)
+ return -1;
+    return c1->original_index > c2->original_index ? 1 : -1; // never equal
+}
+
+static void demuxer_sort_chapters(demuxer_t *demuxer)
+{
+ if (demuxer->num_chapters) {
+ qsort(demuxer->chapters, demuxer->num_chapters,
+ sizeof(struct demux_chapter), chapter_compare);
+ }
+}
+
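+// Usage sketch (editor's illustration; titles, times and ids hypothetical):
+// demuxers may add chapters in file order and sort them afterwards:
+//
+//   demuxer_add_chapter(demuxer, "Intro", 0.0, 1);
+//   demuxer_add_chapter(demuxer, "Credits", 3300.0, 2);
+//   demuxer_sort_chapters(demuxer); // stable order via original_index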
+int demuxer_add_chapter(demuxer_t *demuxer, char *name,
+ double pts, uint64_t demuxer_id)
+{
+ struct demux_chapter new = {
+ .original_index = demuxer->num_chapters,
+ .pts = pts,
+ .metadata = talloc_zero(demuxer, struct mp_tags),
+ .demuxer_id = demuxer_id,
+ };
+ mp_tags_set_str(new.metadata, "TITLE", name);
+ MP_TARRAY_APPEND(demuxer, demuxer->chapters, demuxer->num_chapters, new);
+ return demuxer->num_chapters - 1;
+}
+
+// Disallow reading any packets and make readers think there is no new data
+// yet, until a seek is issued.
+void demux_block_reading(struct demuxer *demuxer, bool block)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ mp_mutex_lock(&in->lock);
+ in->blocked = block;
+ for (int n = 0; n < in->num_streams; n++) {
+ in->streams[n]->ds->need_wakeup = true;
+ wakeup_ds(in->streams[n]->ds);
+ }
+ mp_cond_signal(&in->wakeup);
+ mp_mutex_unlock(&in->lock);
+}
+
+static void update_bytes_read(struct demux_internal *in)
+{
+ struct demuxer *demuxer = in->d_thread;
+
+ int64_t new = in->slave_unbuffered_read_bytes;
+ in->slave_unbuffered_read_bytes = 0;
+
+ int64_t new_seeks = 0;
+
+ struct stream *stream = demuxer->stream;
+ if (stream) {
+ new += stream->total_unbuffered_read_bytes;
+ stream->total_unbuffered_read_bytes = 0;
+ new_seeks += stream->total_stream_seeks;
+ stream->total_stream_seeks = 0;
+ }
+
+ in->cache_unbuffered_read_bytes += new;
+ in->hack_unbuffered_read_bytes += new;
+ in->byte_level_seeks += new_seeks;
+}
+
+// must be called locked, temporarily unlocks
+static void update_cache(struct demux_internal *in)
+{
+ struct demuxer *demuxer = in->d_thread;
+ struct stream *stream = demuxer->stream;
+
+ int64_t now = mp_time_ns();
+ int64_t diff = now - in->last_speed_query;
+ bool do_update = diff >= MP_TIME_S_TO_NS(1) || !in->last_speed_query;
+
+ // Don't lock while querying the stream.
+ mp_mutex_unlock(&in->lock);
+
+ int64_t stream_size = -1;
+ struct mp_tags *stream_metadata = NULL;
+ if (stream) {
+ if (do_update)
+ stream_size = stream_get_size(stream);
+ stream_control(stream, STREAM_CTRL_GET_METADATA, &stream_metadata);
+ }
+
+ mp_mutex_lock(&in->lock);
+
+ update_bytes_read(in);
+
+ if (do_update)
+ in->stream_size = stream_size;
+ if (stream_metadata) {
+ add_timed_metadata(in, stream_metadata, NULL, MP_NOPTS_VALUE);
+ talloc_free(stream_metadata);
+ }
+
+ in->next_cache_update = INT64_MAX;
+
+ if (do_update) {
+ uint64_t bytes = in->cache_unbuffered_read_bytes;
+ in->cache_unbuffered_read_bytes = 0;
+ in->last_speed_query = now;
+ double speed = bytes / (diff / (double)MP_TIME_S_TO_NS(1));
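+        // Smooth the result by averaging the current sample with the
+        // previous raw sample (a simple two-tap average, not a running EMA).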
+ in->bytes_per_second = 0.5 * in->speed_query_prev_sample +
+ 0.5 * speed;
+ in->speed_query_prev_sample = speed;
+ }
+ // The idea is to update as long as there is "activity".
+ if (in->bytes_per_second)
+ in->next_cache_update = now + MP_TIME_S_TO_NS(1) + MP_TIME_US_TO_NS(1);
+}
+
+static void dumper_close(struct demux_internal *in)
+{
+ if (in->dumper)
+ mp_recorder_destroy(in->dumper);
+ in->dumper = NULL;
+ if (in->dumper_status == CONTROL_TRUE)
+ in->dumper_status = CONTROL_FALSE; // make abort equal to success
+}
+
+static int range_time_compare(const void *p1, const void *p2)
+{
+ struct demux_cached_range *r1 = *((struct demux_cached_range **)p1);
+ struct demux_cached_range *r2 = *((struct demux_cached_range **)p2);
+
+ if (r1->seek_start == r2->seek_start)
+ return 0;
+ return r1->seek_start < r2->seek_start ? -1 : 1;
+}
+
+static void dump_cache(struct demux_internal *in, double start, double end)
+{
+ in->dumper_status = in->dumper ? CONTROL_TRUE : CONTROL_ERROR;
+ if (!in->dumper)
+ return;
+
+    // (Only in pathological cases can there be more ranges than allowed.)
+ struct demux_cached_range *ranges[MAX_SEEK_RANGES];
+ int num_ranges = 0;
+ for (int n = 0; n < MPMIN(MP_ARRAY_SIZE(ranges), in->num_ranges); n++)
+ ranges[num_ranges++] = in->ranges[n];
+ qsort(ranges, num_ranges, sizeof(ranges[0]), range_time_compare);
+
+ for (int n = 0; n < num_ranges; n++) {
+ struct demux_cached_range *r = ranges[n];
+ if (r->seek_start == MP_NOPTS_VALUE)
+ continue;
+ if (r->seek_end <= start)
+ continue;
+ if (end != MP_NOPTS_VALUE && r->seek_start >= end)
+ continue;
+
+ mp_recorder_mark_discontinuity(in->dumper);
+
+ double pts = start;
+ int flags = 0;
+ adjust_cache_seek_target(in, r, &pts, &flags);
+
+ for (int i = 0; i < r->num_streams; i++) {
+ struct demux_queue *q = r->streams[i];
+ struct demux_stream *ds = q->ds;
+
+ ds->dump_pos = find_seek_target(q, pts, flags);
+ }
+
+ // We need to reinterleave the separate streams somehow, which makes
+ // everything more complex.
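+        // (Effectively a k-way merge: each iteration scans all streams,
+        // picks the one whose next cached packet has the smallest dts/pts,
+        // and writes that packet to the dump.)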
+ while (1) {
+ struct demux_packet *next = NULL;
+ double next_dts = MP_NOPTS_VALUE;
+
+ for (int i = 0; i < r->num_streams; i++) {
+ struct demux_stream *ds = r->streams[i]->ds;
+ struct demux_packet *dp = ds->dump_pos;
+
+ if (!dp)
+ continue;
+ assert(dp->stream == ds->index);
+
+ double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);
+
+ // Check for stream EOF. Note that we don't try to EOF
+ // streams at the same point (e.g. video can take longer
+ // to finish than audio, so the output file will have no
+ // audio for the last part of the video). Too much effort.
+ if (pdts != MP_NOPTS_VALUE && end != MP_NOPTS_VALUE &&
+ pdts >= end && dp->keyframe)
+ {
+ ds->dump_pos = NULL;
+ continue;
+ }
+
+ if (pdts == MP_NOPTS_VALUE || next_dts == MP_NOPTS_VALUE ||
+ pdts < next_dts)
+ {
+ next_dts = pdts;
+ next = dp;
+ }
+ }
+
+ if (!next)
+ break;
+
+ struct demux_stream *ds = in->streams[next->stream]->ds;
+ ds->dump_pos = next->next;
+
+ struct demux_packet *dp = read_packet_from_cache(in, next);
+ if (!dp) {
+ in->dumper_status = CONTROL_ERROR;
+ break;
+ }
+
+ write_dump_packet(in, dp);
+
+ talloc_free(dp);
+ }
+
+ if (in->dumper_status != CONTROL_OK)
+ break;
+ }
+
+ // (strictly speaking unnecessary; for clarity)
+ for (int n = 0; n < in->num_streams; n++)
+ in->streams[n]->ds->dump_pos = NULL;
+
+ // If dumping (in end==NOPTS mode) doesn't continue at the range that
+ // was written last, we have a discontinuity.
+ if (num_ranges && ranges[num_ranges - 1] != in->current_range)
+ mp_recorder_mark_discontinuity(in->dumper);
+
+ // end=NOPTS means the demuxer output continues to be written to the
+ // dump file.
+ if (end != MP_NOPTS_VALUE || in->dumper_status != CONTROL_OK)
+ dumper_close(in);
+}
+
+// Set the current cache dumping mode. There is only at most 1 dump process
+// active, so calling this aborts the previous dumping. Passing file==NULL
+// stops dumping.
+// This is synchronous with demux_cache_dump_get_status() (i.e. starting or
+// aborting is not asynchronous). On status change, the demuxer wakeup callback
+// is invoked (except for this call).
+// Returns whether dumping was logically started.
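+// Usage sketch (editor's illustration; the path is hypothetical): dump the
+// cache from the current position up to one minute later:
+//
+//   double start = demux_probe_cache_dump_target(demuxer, pos, false);
+//   double end = demux_probe_cache_dump_target(demuxer, pos + 60, true);
+//   if (start != MP_NOPTS_VALUE)
+//       demux_cache_dump_set(demuxer, start, end, "/tmp/clip.mkv");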
+bool demux_cache_dump_set(struct demuxer *demuxer, double start, double end,
+ char *file)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ bool res = false;
+
+ mp_mutex_lock(&in->lock);
+
+ start = MP_ADD_PTS(start, -in->ts_offset);
+ end = MP_ADD_PTS(end, -in->ts_offset);
+
+ dumper_close(in);
+
+ if (file && file[0] && start != MP_NOPTS_VALUE) {
+ res = true;
+
+ in->dumper = recorder_create(in, file);
+
+        // This is not asynchronous, and will block everything for a while
+        // if the user is unlucky. It could be moved to a thread with some
+        // effort.
+ // General idea: iterate over all cache ranges, dump what intersects.
+ // After that, and if the user requested it, make it dump all newly
+ // received packets, even if it's awkward (consider the case if the
+ // current range is not the last range).
+ dump_cache(in, start, end);
+ }
+
+ mp_mutex_unlock(&in->lock);
+
+ return res;
+}
+
+// Returns one of CONTROL_*. CONTROL_TRUE means dumping is in progress.
+int demux_cache_dump_get_status(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ mp_mutex_lock(&in->lock);
+ int status = in->dumper_status;
+ mp_mutex_unlock(&in->lock);
+ return status;
+}
+
+// Return what range demux_cache_dump_set() would (probably) yield. This is a
+// conservative estimate (in addition to the internal consistency of this
+// code, it depends on what a player will do with the resulting file).
+// Use for_end==true to get the end of dumping, otherwise the start.
+// Returns NOPTS if nothing was found.
+double demux_probe_cache_dump_target(struct demuxer *demuxer, double pts,
+ bool for_end)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ double res = MP_NOPTS_VALUE;
+ if (pts == MP_NOPTS_VALUE)
+ return pts;
+
+ mp_mutex_lock(&in->lock);
+
+ pts = MP_ADD_PTS(pts, -in->ts_offset);
+
+ // (When determining the end, look before the keyframe at pts, so subtract
+ // an arbitrary amount to round down.)
+ double seek_pts = for_end ? pts - 0.001 : pts;
+ int flags = 0;
+ struct demux_cached_range *r = find_cache_seek_range(in, seek_pts, flags);
+ if (r) {
+ if (!for_end)
+ adjust_cache_seek_target(in, r, &pts, &flags);
+
+ double t[STREAM_TYPE_COUNT];
+ for (int n = 0; n < STREAM_TYPE_COUNT; n++)
+ t[n] = MP_NOPTS_VALUE;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ struct demux_queue *q = r->streams[n];
+
+ struct demux_packet *dp = find_seek_target(q, pts, flags);
+ if (dp) {
+ if (for_end) {
+ while (dp) {
+ double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);
+
+ if (pdts != MP_NOPTS_VALUE && pdts >= pts && dp->keyframe)
+ break;
+
+ t[ds->type] = MP_PTS_MAX(t[ds->type], pdts);
+
+ dp = dp->next;
+ }
+ } else {
+ double start;
+ compute_keyframe_times(dp, &start, NULL);
+ start = MP_PTS_MAX(start, r->seek_start);
+ t[ds->type] = MP_PTS_MAX(t[ds->type], start);
+ }
+ }
+ }
+
+ res = t[STREAM_VIDEO];
+ if (res == MP_NOPTS_VALUE)
+ res = t[STREAM_AUDIO];
+ if (res == MP_NOPTS_VALUE) {
+ for (int n = 0; n < STREAM_TYPE_COUNT; n++) {
+ res = t[n];
+ if (res != MP_NOPTS_VALUE)
+ break;
+ }
+ }
+ }
+
+ res = MP_ADD_PTS(res, in->ts_offset);
+
+ mp_mutex_unlock(&in->lock);
+
+ return res;
+}
+
+// Used by demuxers to report the amount of transferred bytes. This is for
+// streams which circumvent demuxer->stream (stream statistics are handled by
+// demux.c itself).
+void demux_report_unbuffered_read_bytes(struct demuxer *demuxer, int64_t new)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_thread);
+
+ in->slave_unbuffered_read_bytes += new;
+}
+
+// Return bytes read since last query. It's a hack because it works only if
+// the demuxer thread is disabled.
+int64_t demux_get_bytes_read_hack(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+
+ // Required because demuxer==in->d_user, and we access in->d_thread.
+ // Locking won't solve this, because we also need to access struct stream.
+ assert(!in->threading);
+
+ update_bytes_read(in);
+
+ int64_t res = in->hack_unbuffered_read_bytes;
+ in->hack_unbuffered_read_bytes = 0;
+ return res;
+}
+
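+// Fill rates[] (one entry per stream type, indexed by STREAM_*) with the
+// summed bitrate of the selected streams of that type; -1 means unknown.
+// Usage sketch (editor's illustration):
+//
+//   double rates[STREAM_TYPE_COUNT];
+//   demux_get_bitrate_stats(demuxer, rates);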
+void demux_get_bitrate_stats(struct demuxer *demuxer, double *rates)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ mp_mutex_lock(&in->lock);
+
+ for (int n = 0; n < STREAM_TYPE_COUNT; n++)
+ rates[n] = -1;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ if (ds->selected && ds->bitrate >= 0)
+ rates[ds->type] = MPMAX(0, rates[ds->type]) + ds->bitrate;
+ }
+
+ mp_mutex_unlock(&in->lock);
+}
+
+void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *r)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ mp_mutex_lock(&in->lock);
+
+ *r = (struct demux_reader_state){
+ .eof = in->eof,
+ .ts_reader = MP_NOPTS_VALUE,
+ .ts_end = MP_NOPTS_VALUE,
+ .ts_duration = -1,
+ .total_bytes = in->total_bytes,
+ .seeking = in->seeking_in_progress,
+ .low_level_seeks = in->low_level_seeks,
+ .ts_last = in->demux_ts,
+ .bytes_per_second = in->bytes_per_second,
+ .byte_level_seeks = in->byte_level_seeks,
+ .file_cache_bytes = in->cache ? demux_cache_get_size(in->cache) : -1,
+ };
+ bool any_packets = false;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ if (ds->eager && !(!ds->queue->head && ds->eof) && !ds->ignore_eof) {
+ r->underrun |= !ds->reader_head && !ds->eof && !ds->still_image;
+ r->ts_reader = MP_PTS_MAX(r->ts_reader, ds->base_ts);
+ r->ts_end = MP_PTS_MAX(r->ts_end, ds->queue->last_ts);
+ any_packets |= !!ds->reader_head;
+ }
+ r->fw_bytes += get_forward_buffered_bytes(ds);
+ }
+ r->idle = (!in->reading && !r->underrun) || r->eof;
+ r->underrun &= !r->idle && in->threading;
+ r->ts_reader = MP_ADD_PTS(r->ts_reader, in->ts_offset);
+ r->ts_end = MP_ADD_PTS(r->ts_end, in->ts_offset);
+ if (r->ts_reader != MP_NOPTS_VALUE && r->ts_reader <= r->ts_end)
+ r->ts_duration = r->ts_end - r->ts_reader;
+ if (in->seeking || !any_packets)
+ r->ts_duration = 0;
+ for (int n = 0; n < MPMIN(in->num_ranges, MAX_SEEK_RANGES); n++) {
+ struct demux_cached_range *range = in->ranges[n];
+ if (range->seek_start != MP_NOPTS_VALUE) {
+ r->seek_ranges[r->num_seek_ranges++] =
+ (struct demux_seek_range){
+ .start = MP_ADD_PTS(range->seek_start, in->ts_offset),
+ .end = MP_ADD_PTS(range->seek_end, in->ts_offset),
+ };
+ r->bof_cached |= range->is_bof;
+ r->eof_cached |= range->is_eof;
+ }
+ }
+
+ mp_mutex_unlock(&in->lock);
+}
+
+bool demux_cancel_test(struct demuxer *demuxer)
+{
+ return mp_cancel_test(demuxer->cancel);
+}
+
+struct demux_chapter *demux_copy_chapter_data(struct demux_chapter *c, int num)
+{
+ struct demux_chapter *new = talloc_array(NULL, struct demux_chapter, num);
+ for (int n = 0; n < num; n++) {
+ new[n] = c[n];
+ new[n].metadata = mp_tags_dup(new, new[n].metadata);
+ }
+ return new;
+}
+
+static void visit_tags(void *ctx, void (*visit)(void *ctx, void *ta, char **s),
+ struct mp_tags *tags)
+{
+ for (int n = 0; n < (tags ? tags->num_keys : 0); n++)
+ visit(ctx, tags, &tags->values[n]);
+}
+
+static void visit_meta(struct demuxer *demuxer, void *ctx,
+ void (*visit)(void *ctx, void *ta, char **s))
+{
+ struct demux_internal *in = demuxer->in;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct sh_stream *sh = in->streams[n];
+
+ visit(ctx, sh, &sh->title);
+ visit_tags(ctx, visit, sh->tags);
+ }
+
+ for (int n = 0; n < demuxer->num_chapters; n++)
+ visit_tags(ctx, visit, demuxer->chapters[n].metadata);
+
+ visit_tags(ctx, visit, demuxer->metadata);
+}
+
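+// Append every metadata string to one big buffer (ctx points to a talloc
+// string), so the charset of all tags can be guessed over the whole set.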
+static void visit_detect(void *ctx, void *ta, char **s)
+{
+ char **all = ctx;
+
+ if (*s)
+ *all = talloc_asprintf_append_buffer(*all, "%s\n", *s);
+}
+
+static void visit_convert(void *ctx, void *ta, char **s)
+{
+ struct demuxer *demuxer = ctx;
+ struct demux_internal *in = demuxer->in;
+
+ if (!*s)
+ return;
+
+ bstr data = bstr0(*s);
+ bstr conv = mp_iconv_to_utf8(in->log, data, in->meta_charset,
+ MP_ICONV_VERBOSE);
+ if (conv.start && conv.start != data.start) {
+ char *ns = conv.start; // 0-termination is guaranteed
+ // (The old string might not be an alloc, but if it is, it's a talloc
+ // child, and will not leak, even if it stays allocated uselessly.)
+ *s = ns;
+ talloc_steal(ta, *s);
+ }
+}
+
+static void demux_convert_tags_charset(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+
+ char *cp = demuxer->opts->meta_cp;
+ if (!cp || mp_charset_is_utf8(cp))
+ return;
+
+ char *data = talloc_strdup(NULL, "");
+ visit_meta(demuxer, &data, visit_detect);
+
+ in->meta_charset = (char *)mp_charset_guess(in, in->log, bstr0(data), cp, 0);
+ if (in->meta_charset && !mp_charset_is_utf8(in->meta_charset)) {
+ MP_INFO(demuxer, "Using tag charset: %s\n", in->meta_charset);
+ visit_meta(demuxer, demuxer, visit_convert);
+ }
+
+ talloc_free(data);
+}
+
+static bool get_demux_sub_opts(int index, const struct m_sub_options **sub)
+{
+ if (!demuxer_list[index])
+ return false;
+ *sub = demuxer_list[index]->options;
+ return true;
+}