1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
|
/*-------------------------------------------------------------------------
*
* walreceiver.h
* Exports from replication/walreceiverfuncs.c.
*
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
*
* src/include/replication/walreceiver.h
*
*-------------------------------------------------------------------------
*/
#ifndef _WALRECEIVER_H
#define _WALRECEIVER_H
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "getaddrinfo.h" /* for NI_MAXHOST */
#include "pgtime.h"
#include "port/atomics.h"
#include "replication/logicalproto.h"
#include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "utils/tuplestore.h"
/* user-settable parameters */
extern PGDLLIMPORT int wal_receiver_status_interval;
extern PGDLLIMPORT int wal_receiver_timeout;
extern PGDLLIMPORT bool hot_standby_feedback;
/*
* MAXCONNINFO: maximum size of a connection string.
*
* XXX: Should this move to pg_config_manual.h?
*/
#define MAXCONNINFO 1024
/* Can we allow the standby to accept replication connection from another standby? */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
/*
* Values for WalRcv->walRcvState.
*/
typedef enum
{
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
WALRCV_STREAMING, /* walreceiver is streaming */
WALRCV_WAITING, /* stopped streaming, waiting for orders */
WALRCV_RESTARTING, /* asked to restart streaming */
WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState;
/* Shared memory area for management of walreceiver process */
typedef struct
{
/*
* PID of currently active walreceiver process, its current state and
* start time (actually, the time at which it was requested to be
* started).
*/
pid_t pid;
WalRcvState walRcvState;
ConditionVariable walRcvStoppedCV;
pg_time_t startTime;
/*
* receiveStart and receiveStartTLI indicate the first byte position and
* timeline that will be received. When startup process starts the
* walreceiver, it sets these to the point where it wants the streaming to
* begin.
*/
XLogRecPtr receiveStart;
TimeLineID receiveStartTLI;
/*
* flushedUpto-1 is the last byte position that has already been received,
* and receivedTLI is the timeline it came from. At the first startup of
* walreceiver, these are set to receiveStart and receiveStartTLI. After
* that, walreceiver updates these whenever it flushes the received WAL to
* disk.
*/
XLogRecPtr flushedUpto;
TimeLineID receivedTLI;
/*
* latestChunkStart is the starting byte position of the current "batch"
* of received WAL. It's actually the same as the previous value of
* flushedUpto before the last flush to disk. Startup process can use
* this to detect whether it's keeping up or not.
*/
XLogRecPtr latestChunkStart;
/*
* Time of send and receive of any message received.
*/
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
/*
* Latest reported end of WAL on the sender
*/
XLogRecPtr latestWalEnd;
TimestampTz latestWalEndTime;
/*
* connection string; initially set to connect to the primary, and later
* clobbered to hide security-sensitive fields.
*/
char conninfo[MAXCONNINFO];
/*
* Host name (this can be a host name, an IP address, or a directory path)
* and port number of the active replication connection.
*/
char sender_host[NI_MAXHOST];
int sender_port;
/*
* replication slot name; is also used for walreceiver to connect with the
* primary
*/
char slotname[NAMEDATALEN];
/*
* If it's a temporary replication slot, it needs to be recreated when
* connecting.
*/
bool is_temp_slot;
/* set true once conninfo is ready to display (obfuscated pwds etc) */
bool ready_to_display;
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
* receiveStartTLI), and also to tell it to send apply feedback to the
* primary whenever specially marked commit records are applied. This is
* normally mapped to procLatch when walreceiver is running.
*/
Latch *latch;
slock_t mutex; /* locks shared variables shown above */
/*
* Like flushedUpto, but advanced after writing and before flushing,
* without the need to acquire the spin lock. Data can be read by another
* process up to this point, but shouldn't be used for data integrity
* purposes.
*/
pg_atomic_uint64 writtenUpto;
/*
* force walreceiver reply? This doesn't need to be locked; memory
* barriers for ordering are sufficient. But we do need atomic fetch and
* store semantics, so use sig_atomic_t.
*/
sig_atomic_t force_reply; /* used as a bool */
} WalRcvData;
extern PGDLLIMPORT WalRcvData *WalRcv;
typedef struct
{
bool logical; /* True if this is logical replication stream,
* false if physical stream. */
char *slotname; /* Name of the replication slot or NULL. */
XLogRecPtr startpoint; /* LSN of starting point. */
union
{
struct
{
TimeLineID startpointTLI; /* Starting timeline */
} physical;
struct
{
uint32 proto_version; /* Logical protocol version */
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
bool streaming; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
} logical;
} proto;
} WalRcvStreamOptions;
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
/*
* Status of walreceiver query execution.
*
* We only define statuses that are currently used.
*/
typedef enum
{
WALRCV_ERROR, /* There was error when executing the query. */
WALRCV_OK_COMMAND, /* Query executed utility or replication
* command. */
WALRCV_OK_TUPLES, /* Query returned tuples. */
WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication
* protocol. */
} WalRcvExecStatus;
/*
* Return value for walrcv_exec, returns the status of the execution and
* tuples if any.
*/
typedef struct WalRcvExecResult
{
WalRcvExecStatus status;
int sqlstate;
char *err;
Tuplestorestate *tuplestore;
TupleDesc tupledesc;
} WalRcvExecResult;
/* WAL receiver - libpqwalreceiver hooks */
/*
* walrcv_connect_fn
*
* Establish connection to a cluster. 'logical' is true if the
* connection is logical, and false if the connection is physical.
* 'appname' is a name associated to the connection, to use for example
* with fallback_application_name or application_name. Returns the
* details about the connection established, as defined by
* WalReceiverConn for each WAL receiver module. On error, NULL is
* returned with 'err' including the error generated.
*/
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
bool logical,
const char *appname,
char **err);
/*
* walrcv_check_conninfo_fn
*
* Parse and validate the connection string given as of 'conninfo'.
*/
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
/*
* walrcv_get_conninfo_fn
*
* Returns a user-displayable conninfo string. Note that any
* security-sensitive fields should be obfuscated.
*/
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
/*
* walrcv_get_senderinfo_fn
*
* Provide information of the WAL sender this WAL receiver is connected
* to, as of 'sender_host' for the host of the sender and 'sender_port'
* for its port.
*/
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
char **sender_host,
int *sender_port);
/*
* walrcv_identify_system_fn
*
* Run IDENTIFY_SYSTEM on the cluster connected to and validate the
* identity of the cluster. Returns the system ID of the cluster
* connected to. 'primary_tli' is the timeline ID of the sender.
*/
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
TimeLineID *primary_tli);
/*
* walrcv_server_version_fn
*
* Returns the version number of the cluster connected to.
*/
typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
/*
* walrcv_readtimelinehistoryfile_fn
*
* Fetch from cluster the timeline history file for timeline 'tli'.
* Returns the name of the timeline history file as of 'filename', its
* contents as of 'content' and its 'size'.
*/
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
TimeLineID tli,
char **filename,
char **content,
int *size);
/*
* walrcv_startstreaming_fn
*
* Start streaming WAL data from given streaming options. Returns true
* if the connection has switched successfully to copy-both mode and false
* if the server received the command and executed it successfully, but
* didn't switch to copy-mode.
*/
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
const WalRcvStreamOptions *options);
/*
* walrcv_endstreaming_fn
*
* Stop streaming of WAL data. Returns the next timeline ID of the cluster
* connected to in 'next_tli', or 0 if there was no report.
*/
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
TimeLineID *next_tli);
/*
* walrcv_receive_fn
*
* Receive a message available from the WAL stream. 'buffer' is a pointer
* to a buffer holding the message received. Returns the length of the data,
* 0 if no data is available yet ('wait_fd' is a socket descriptor which can
* be waited on before a retry), and -1 if the cluster ended the COPY.
*/
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
char **buffer,
pgsocket *wait_fd);
/*
* walrcv_send_fn
*
* Send a message of size 'nbytes' to the WAL stream with 'buffer' as
* contents.
*/
typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
const char *buffer,
int nbytes);
/*
* walrcv_create_slot_fn
*
* Create a new replication slot named 'slotname'. 'temporary' defines
* if the slot is temporary. 'snapshot_action' defines the behavior wanted
* for an exported snapshot (see replication protocol for more details).
* 'lsn' includes the LSN position at which the created slot became
* consistent. Returns the name of the exported snapshot for a logical
* slot, or NULL for a physical slot.
*/
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
/*
* walrcv_get_backend_pid_fn
*
* Returns the PID of the remote backend process.
*/
typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
/*
* walrcv_exec_fn
*
* Send generic queries (and commands) to the remote cluster. 'nRetTypes'
* is the expected number of returned attributes, and 'retTypes' an array
* including their type OIDs. Returns the status of the execution and
* tuples if any.
*/
typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
const char *query,
const int nRetTypes,
const Oid *retTypes);
/*
* walrcv_disconnect_fn
*
* Disconnect with the cluster.
*/
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
typedef struct WalReceiverFunctionsType
{
walrcv_connect_fn walrcv_connect;
walrcv_check_conninfo_fn walrcv_check_conninfo;
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system;
walrcv_server_version_fn walrcv_server_version;
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
walrcv_startstreaming_fn walrcv_startstreaming;
walrcv_endstreaming_fn walrcv_endstreaming;
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot;
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
#define walrcv_connect(conninfo, logical, appname, err) \
WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
#define walrcv_check_conninfo(conninfo) \
WalReceiverFunctions->walrcv_check_conninfo(conninfo)
#define walrcv_get_conninfo(conn) \
WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli) \
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
#define walrcv_server_version(conn) \
WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options) \
WalReceiverFunctions->walrcv_startstreaming(conn, options)
#define walrcv_endstreaming(conn, next_tli) \
WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd) \
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
#define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
WalReceiverFunctions->walrcv_disconnect(conn)
static inline void
walrcv_clear_result(WalRcvExecResult *walres)
{
if (!walres)
return;
if (walres->err)
pfree(walres->err);
if (walres->tuplestore)
tuplestore_end(walres->tuplestore);
if (walres->tupledesc)
FreeTupleDesc(walres->tupledesc);
pfree(walres);
}
/* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) pg_attribute_noreturn();
extern void ProcessWalRcvInterrupts(void);
/* prototypes for functions in walreceiverfuncs.c */
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
const char *conninfo, const char *slotname,
bool create_temp_slot);
extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
extern void WalRcvForceReply(void);
#endif /* _WALRECEIVER_H */
|