流复制的原理:
物理复制也叫流复制,流复制的原理是主库把wal发送给备库,备库接收wal后,进行重放。
逻辑复制的原理:
逻辑复制也是基于wal文件,在逻辑复制中把主库称为源端库,备库称为目标端数据库,源端数据库根据预先指定好的逻辑解析规则对wal文件进行解析,把dml操作解析成一定的逻辑变化信息(标准sql语句),源端数据库把标准sql语句发给目标端数据库,目标端数据库接收到之后进行应用,从而实现数据同步。
流复制和逻辑复制的区别:
流复制主库上的事务提交不需要等待备库接收到wal文件后的确认,逻辑复制相反。
流复制要求主备库的大版本一致,逻辑复制可以跨大版本的数据同步,也可以实现异构数据库的数据同步。
流复制的主库可读写,从库只允许读,逻辑复制的目标端数据库要求可读写
流复制是对实例级别的复制(整个postgresql数据库),逻辑复制是选择性的复制一些表,所以是对表级别的复制。
流复制有主库的ddl、dml操作,逻辑复制只有dml操作。
补充:postgresql 同步流复制原理和代码浅析
背景
数据库acid中的持久化如何实现
数据库acid里面的d,持久化。 指的是对于用户来说提交的事务,数据是可靠的,即使数据库crash了,在硬件完好的情况下,也能恢复回来。
postgresql是怎么做到的呢,看一幅图,画得比较丑,凑合看吧。
假设一个事务,对数据库做了一些操作,并且产生了一些脏数据,首先这些脏数据会在数据库的shared buffer中。
同时,产生这些脏数据的同时也会产生对应的redo信息,产生的redo会有对应的lsn号(你可以理解为redo 的虚拟地址空间的一个唯一的offset,每一笔redo都有),这个lsn号也会记录到shared buffer中对应的脏页中。
walwriter是负责将wal buffer flush到持久化设备的进程,同时它会更新一个全局变量,记录已经flush的最大的lsn号。
bgwriter是负责将shared buffer的脏页持久化到持久化设备的进程,它在flush时,除了要遵循lru算法之外,还要通过lsn全局变量的比对,来保证脏页对应的redo记录已经flush到持久化设备了,如果发现还对应的redo没有持久化,会触发wal writer去flush wal buffer。 (即确保日志比脏数据先落盘)
当用户提交事务时,也会产生一笔提交事务的redo,这笔redo也携带了lsn号。backend process 同样需要等待对应lsn flush到磁盘后才会返回给用户提交成功的信号。(保证日志先落盘,然后返回给用户)
数据库同步复制原理浅析
同步流复制,即保证standby节点和本地节点的日志双双落盘。
postgresql使用另一组全局变量,记录同步流复制节点已经接收到的xlog lsn,以及已经持久化的xlog lsn。
用户在发起提交请求后,backend process除了要判断本地wal有没有持久化,同时还需要判断同步流复制节点的xlog有没有接收到或持久化(通过synchronous_commit参数控制)。
如果同步流复制节点的xlog还没有接收或持久化,backend process会进入等待状态。
数据库同步复制代码浅析
对应的代码和解释如下:
committransaction @ src/backend/access/transam/xact.c recordtransactioncommit @ src/backend/access/transam/xact.c
/* * if we didn't create xlog entries, we're done here; otherwise we * should trigger flushing those entries the same as a commit record * would. this will primarily happen for hot pruning and the like; we * want these to be flushed to disk in due time. */ if (!wrote_xlog) // 没有产生redo的事务,直接返回 goto cleanup; if (wrote_xlog && markxidcommitted) // 如果产生了redo, 等待同步流复制 syncrepwaitforlsn(xactlastrecend);
syncrepwaitforlsn @ src/backend/replication/syncrep.c
/*
* wait for synchronous replication, if requested by user.
*
* initially backends start in state sync_rep_not_waiting and then
* change that state to sync_rep_waiting before adding ourselves
* to the wait queue. during syncrepwakequeue() a walsender changes
* the state to sync_rep_wait_complete once replication is confirmed.
* this backend then resets its state to sync_rep_not_waiting.
*/
void
syncrepwaitforlsn(xlogrecptr xactcommitlsn)
{
...
/*
* fast exit if user has not requested sync replication, or there are no
* sync replication standby names defined. note that those standbys don't
* need to be connected.
*/
if (!syncreprequested() || !syncstandbysdefined()) // 如果不是同步事务或者没有定义同步流复制节点,直接返回
return;
...
/*
* we don't wait for sync rep if walsndctl->sync_standbys_defined is not
* set. see syncrepupdatesyncstandbysdefined.
*
* also check that the standby hasn't already replied. unlikely race
* condition but we'll be fetching that cache line anyway so it's likely
* to be a low cost check.
*/
if (!walsndctl->sync_standbys_defined ||
xactcommitlsn <= walsndctl->lsn[mode]) // 如果没有定义同步流复制节点,或者判断到commit lsn小于已同步的lsn,说明xlog已经flush了,直接返回。
{
lwlockrelease(syncreplock);
return;
}
...
// 进入循环等待状态,说明本地的xlog已经flush了,只是等待同步流复制节点的redo同步状态。
/*
* wait for specified lsn to be confirmed.
*
* each proc has its own wait latch, so we perform a normal latch
* check/wait loop here.
*/
for (;;) // 进入等待状态,检查latch是否满足释放等待的条件(wal sender会根据redo的同步情况,实时更新对应的latch)
{
int syncrepstate;
/* must reset the latch before testing state. */
resetlatch(&myproc->proclatch);
syncrepstate = myproc->syncrepstate;
if (syncrepstate == sync_rep_waiting)
{
lwlockacquire(syncreplock, lw_shared);
syncrepstate = myproc->syncrepstate;
lwlockrelease(syncreplock);
}
if (syncrepstate == sync_rep_wait_complete) // 说明xlog同步完成,退出等待
break;
// 如果本地进程挂了,输出的消息内容是,本地事务信息已持久化,但是远程也许还没有持久化
if (procdiepending)
{
ereport(warning,
(errcode(errcode_admin_shutdown),
errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
errdetail("the transaction has already committed locally, but might not have been replicated to the standby.")));
wheretosendoutput = destnone;
syncrepcancelwait();
break;
}
// 如果用户主动cancel query,输出的消息内容是,本地事务信息已持久化,但是远程也许还没有持久化
if (querycancelpending)
{
querycancelpending = false;
ereport(warning,
(errmsg("canceling wait for synchronous replication due to user request"),
errdetail("the transaction has already committed locally, but might not have been replicated to the standby.")));
syncrepcancelwait();
break;
}
// 如果postgres主进程挂了,进入退出流程。
if (!postmasterisalive())
{
procdiepending = true;
wheretosendoutput = destnone;
syncrepcancelwait();
break;
}
// 等待wal sender来修改对应的latch
/*
* wait on latch. any condition that should wake us up will set the
* latch, so no need for timeout.
*/
waitlatch(&myproc->proclatch, wl_latch_set | wl_postmaster_death, -1);
注意用户进入等待状态后,只有主动cancel , 或者kill(terminate) , 或者主进程die才能退出无限的等待状态。后面会讲到如何将同步级别降级为异步。
前面提到了,用户端需要等待latch的释放信号。
那么谁来给它这个信号了,是wal sender进程,源码和解释如下 :
src/backend/replication/walsender.c
startreplication
walsndloop
processrepliesifany
processstandbymessage
processstandbyreplymessage
if (!am_cascading_walsender) // 非级联流复制节点,那么它将调用syncrepreleasewaiters修改backend process等待队列中它们对应的 latch。
syncrepreleasewaiters();
syncrepreleasewaiters @ src/backend/replication/syncrep.c
/*
* update the lsns on each queue based upon our latest state. this
* implements a simple policy of first-valid-standby-releases-waiter.
*
* other policies are possible, which would change what we do here and what
* perhaps also which information we store as well.
*/
void
syncrepreleasewaiters(void)
{
...
// 释放满足条件的等待队列
/*
* set the lsn first so that when we wake backends they will release up to
* this location.
*/
if (walsndctl->lsn[sync_rep_wait_write] < mywalsnd->write)
{
walsndctl->lsn[sync_rep_wait_write] = mywalsnd->write;
numwrite = syncrepwakequeue(false, sync_rep_wait_write);
}
if (walsndctl->lsn[sync_rep_wait_flush] < mywalsnd->flush)
{
walsndctl->lsn[sync_rep_wait_flush] = mywalsnd->flush;
numflush = syncrepwakequeue(false, sync_rep_wait_flush);
}
...
syncrepwakequeue @ src/backend/replication/syncrep.c
/*
* walk the specified queue from head. set the state of any backends that
* need to be woken, remove them from the queue, and then wake them.
* pass all = true to wake whole queue; otherwise, just wake up to
* the walsender's lsn.
*
* must hold syncreplock.
*/
static int
syncrepwakequeue(bool all, int mode)
{
...
while (proc) // 修改对应的backend process 的latch
{
/*
* assume the queue is ordered by lsn
*/
if (!all && walsndctl->lsn[mode] < proc->waitlsn)
return numprocs;
/*
* move to next proc, so we can delete thisproc from the queue.
* thisproc is valid, proc may be null after this.
*/
thisproc = proc;
proc = (pgproc *) shmqueuenext(&(walsndctl->syncrepqueue[mode]),
&(proc->syncreplinks),
offsetof(pgproc, syncreplinks));
/*
* set state to complete; see syncrepwaitforlsn() for discussion of
* the various states.
*/
thisproc->syncrepstate = sync_rep_wait_complete; // 满足条件时,改成sync_rep_wait_complete
....
如何设置事务可靠性级别
postgresql 支持在会话中设置事务的可靠性级别。
off 表示commit 时不需要等待wal 持久化。
local 表示commit 是只需要等待本地数据库的wal 持久化。
remote_write 表示commit 需要等待本地数据库的wal 持久化,同时需要等待sync standby节点wal write buffer完成(不需要持久化)。
on 表示commit 需要等待本地数据库的wal 持久化,同时需要等待sync standby节点wal持久化。
提醒一点, synchronous_commit 的任何一种设置,都不影响wal日志持久化必须先于shared buffer脏数据持久化。 所以不管你怎么设置,都不好影响数据的一致性。
synchronous_commit = off # synchronization level;
# off, local, remote_write, or on
如何实现同步复制降级
从前面的代码解析可以得知,如果 backend process 进入了等待循环,只接受几种信号降级。 并且降级后会告警,表示本地wal已持久化,但是sync standby节点不确定wal有没有持久化。
如果你只配置了1个standby,并且将它配置为同步流复制节点。一旦出现网络抖动,或者sync standby节点故障,将导致同步事务进入等待状态。
怎么降级呢?
方法1.
修改配置文件并重置
$ vi postgresql.conf synchronous_commit = local $ pg_ctl reload
然后cancel 所有query .
postgres=# select pg_cancel_backend(pid) from pg_stat_activity where pid<>pg_backend_pid();
收到这样的信号,表示事务成功提交,同时表示wal不知道有没有同步到sync standby。
warning: canceling wait for synchronous replication due to user request detail: the transaction has already committed locally, but might not have been replicated to the standby. commit postgres=# show synchronous_commit ; synchronous_commit -------------------- off (1 row)
同时它会读到全局变量synchronous_commit 已经是 local了。
这样就完成了降级的动作。
方法2.
方法1的降级需要对已有的正在等待wal sync的pid使用cancel进行处理,有点不人性化。
可以通过修改代码的方式,做到更人性化。
syncrepwaitforlsn for循环中,加一个判断,如果发现全局变量sync commit变成local, off了,则告警并退出。这样就不需要人为的去cancel query了.
warning: canceling wait for synchronous replication due to user request
detail: the transaction has already committed locally, but might not have been replicated to the standby.
以上为个人经验,希望能给大家一个参考,也希望大家多多支持www.887551.com。如有错误或未考虑完全的地方,望不吝赐教。