diff options
Diffstat (limited to 'src/backend/backup/basebackup_throttle.c')
-rw-r--r-- | src/backend/backup/basebackup_throttle.c | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/src/backend/backup/basebackup_throttle.c b/src/backend/backup/basebackup_throttle.c new file mode 100644 index 0000000..62ba732 --- /dev/null +++ b/src/backend/backup/basebackup_throttle.c @@ -0,0 +1,199 @@ +/*------------------------------------------------------------------------- + * + * basebackup_throttle.c + * Basebackup sink implementing throttling. Data is forwarded to the + * next base backup sink in the chain at a rate no greater than the + * configured maximum. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/backup/basebackup_throttle.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "backup/basebackup_sink.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" +#include "utils/timestamp.h" + +typedef struct bbsink_throttle +{ + /* Common information for all types of sink. */ + bbsink base; + + /* The actual number of bytes, transfer of which may cause sleep. */ + uint64 throttling_sample; + + /* Amount of data already transferred but not yet throttled. */ + int64 throttling_counter; + + /* The minimum time required to transfer throttling_sample bytes. */ + TimeOffset elapsed_min_unit; + + /* The last check of the transfer rate. */ + TimestampTz throttled_last; +} bbsink_throttle; + +static void bbsink_throttle_begin_backup(bbsink *sink); +static void bbsink_throttle_archive_contents(bbsink *sink, size_t len); +static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len); +static void throttle(bbsink_throttle *sink, size_t increment); + +static const bbsink_ops bbsink_throttle_ops = { + .begin_backup = bbsink_throttle_begin_backup, + .begin_archive = bbsink_forward_begin_archive, + .archive_contents = bbsink_throttle_archive_contents, + .end_archive = bbsink_forward_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_throttle_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_forward_end_backup, + .cleanup = bbsink_forward_cleanup +}; + +/* + * How frequently to throttle, as a fraction of the specified rate-second. + */ +#define THROTTLING_FREQUENCY 8 + +/* + * Create a new basebackup sink that performs throttling and forwards data + * to a successor sink. + */ +bbsink * +bbsink_throttle_new(bbsink *next, uint32 maxrate) +{ + bbsink_throttle *sink; + + Assert(next != NULL); + Assert(maxrate > 0); + + sink = palloc0(sizeof(bbsink_throttle)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops; + sink->base.bbs_next = next; + + sink->throttling_sample = + (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY; + + /* + * The minimum amount of time for throttling_sample bytes to be + * transferred. + */ + sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; + + return &sink->base; +} + +/* + * There's no real work to do here, but we need to record the current time so + * that it can be used for future calculations. + */ +static void +bbsink_throttle_begin_backup(bbsink *sink) +{ + bbsink_throttle *mysink = (bbsink_throttle *) sink; + + bbsink_forward_begin_backup(sink); + + /* The 'real data' starts now (header was ignored). */ + mysink->throttled_last = GetCurrentTimestamp(); +} + +/* + * First throttle, and then pass archive contents to next sink. + */ +static void +bbsink_throttle_archive_contents(bbsink *sink, size_t len) +{ + throttle((bbsink_throttle *) sink, len); + + bbsink_forward_archive_contents(sink, len); +} + +/* + * First throttle, and then pass manifest contents to next sink. + */ +static void +bbsink_throttle_manifest_contents(bbsink *sink, size_t len) +{ + throttle((bbsink_throttle *) sink, len); + + bbsink_forward_manifest_contents(sink, len); +} + +/* + * Increment the network transfer counter by the given number of bytes, + * and sleep if necessary to comply with the requested network transfer + * rate. + */ +static void +throttle(bbsink_throttle *sink, size_t increment) +{ + TimeOffset elapsed_min; + + Assert(sink->throttling_counter >= 0); + + sink->throttling_counter += increment; + if (sink->throttling_counter < sink->throttling_sample) + return; + + /* How much time should have elapsed at minimum? */ + elapsed_min = sink->elapsed_min_unit * + (sink->throttling_counter / sink->throttling_sample); + + /* + * Since the latch could be set repeatedly because of concurrently WAL + * activity, sleep in a loop to ensure enough time has passed. + */ + for (;;) + { + TimeOffset elapsed, + sleep; + int wait_result; + + /* Time elapsed since the last measurement (and possible wake up). */ + elapsed = GetCurrentTimestamp() - sink->throttled_last; + + /* sleep if the transfer is faster than it should be */ + sleep = elapsed_min - elapsed; + if (sleep <= 0) + break; + + ResetLatch(MyLatch); + + /* We're eating a potentially set latch, so check for interrupts */ + CHECK_FOR_INTERRUPTS(); + + /* + * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be + * the maximum time to sleep. Thus the cast to long is safe. + */ + wait_result = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + (long) (sleep / 1000), + WAIT_EVENT_BASE_BACKUP_THROTTLE); + + if (wait_result & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + + /* Done waiting? */ + if (wait_result & WL_TIMEOUT) + break; + } + + /* + * As we work with integers, only whole multiple of throttling_sample was + * processed. The rest will be done during the next call of this function. + */ + sink->throttling_counter %= sink->throttling_sample; + + /* + * Time interval for the remaining amount and possible next increments + * starts now. + */ + sink->throttled_last = GetCurrentTimestamp(); +} |