Author:
Logical Decoding是9.4里面的一个主要功能,是向最终实现逻辑复制迈出的一大步。简言之,它的功能是从PG的WAL日志中,读取数据库更新信息,然后“翻译”(Decode)成逻辑的形式,可发送到远程从库做数据同步。这个功能还可以用于,DBA在数据库宕机,并发生主从切换后,检查原主库有哪些更新宕机前未同步到从库,并手动同步来弥补丢失的(已提交)的更新。这里我们探索一下它的使用和实现原理。
使用
1)首先需要将 wal_level这个配置参数设置为logical,并保证max_replication_slots至少为1。
2)创建Logical Replication Slot。Logical Decoding利用了Logical Replication Slot来获取和Decode日志。关于Physical Replication Slot我们在上期中有详细介绍,而Logical Replication Slot与Physical Replication Slot的数据结构类似。创建一个Logical Replication Slot的命令如下:
SELECT * FROM pg_create_logical_replication_slot('my_rep_slot', 'test_decoding');
slot_name | xlog_position
-------------+---------------
my_rep_slot | 0/7FE68E8
select * from pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
---------------+---------------+-----------+--------+----------+--------+------+--------------+-------------
my_rep_slot | test_decoding | logical | 13003 | postgres | f | | 9735 | 0/7FE6828
注意,创建Logical Replication Slot,需要指定一个输出插件(Output Plugin)。这个插件要提供一些回调函数,用于格式化输出日志。就是说,内核中的Logical Decoding先读取WAL日志,将其Decode成一种半成品式的格式(已包含所有有效信息,比如被更新的表名,更新类型,更新前后的数据记录即Tuple),然后交由输出插件最终呈现给用户。这里我们使用了系统自带的一个插件,即test_decoding。上面的输出中,创建命令返回的xlog_position的值是当前系统中,最后被写入磁盘的日志记录的LSN。此LSN之后的日志,都可以通过Logical Decoding进行解析了。
3)解析日志。
--先做一个执行插入操作的事务:
begin;
insert into test values(2);
commit;
--使用pg_logical_slot_peek_changes,Decode对应的WAL日志。
SELECT * FROM pg_logical_slot_peek_changes('my_rep_slot', NULL, NULL, 'include-timestamp', 'on', 'include-xids', 'on');
location | xid | data
-----------+------+------------------------------------------------
0/7FE6C80 | 9736 | BEGIN 9736
0/7FE6C80 | 9736 | table public.test: INSERT: col[integer]:2
0/7FE6DE0 | 9736 | COMMIT 9736 (at 2015-03-13 15:03:49.803582+08)
pg_logical_slot_peek_changes返回数据中的第二行记录了我们所做的INSERT操作(只有在事务提交后,才能看到这些修改)。而我们通过pg_xlogdump可以看到原来的WAL日志记录为:
<PG installdir>/bin/pg_xlogdump -s 0/7FE6C80 -n 1
rmgr: Heap len (rec/tot): 31/ 219, tx: 9736, lsn: 0/07FE6C80, prev 0/07FE6C48, bkp: 1000, desc: insert: rel 1663/13003/16507; tid 0/3
也就是说,Logical Decoding把这条日志,反解析成一个“table public.test: INSERT: col[integer]:2”字符串。其实如果对输出插件稍作修改,可以直接解析成可执行的SQL语句:“INSERT INTO public.test (col) VALUES(2)“
那么这是如何做到的呢?下面我们看看其中原理。
原理
追踪一下pg_logical_slot_peek_changes的调用链,不难看到Decoding的整个过程。在pg_logical_slot_get_changes_guts中,从restart_lsn(即上次的最后读取后,剩下的事务中最先开始的事务对应的LSN)开始,先用XLogReadRecord函数(注意,会先从cache里面读取日志,如果cache里面没有,则会到磁盘中的日志段里面读取)获取一个日志记录,存入结构体XLogRecord,紧接着用LogicalDecodingProcessRecord做Decode。如此循环,直到读完日志或到达指定点。
LogicalDecodingProcessRecord是解析日志的关键。它在内存中维护一个哈希表(LogicalDecodingContext->reorder->by_txn),存放正在处理的事务信息。在处理每个日志记录时,如果遇到一个BEGIN操作,就在哈希表中插入相应事务。而只有在遇到COMMIT操作的时候,才会把整个事务的所有语句解析出来(调用ReorderBufferCommit)。这个过程中,它要为每个事务维护一个快照(Snapshot)。每次有事务做COMMIT都要更新一下这个快照。这样,等到事务COMMIT时,它的快照是最新的,可以用来访问系统表,得到如relation node id与relation名字之间的对应关系等信息,从而完成Decode。需要说明的是,LogicalDecodingProcessRecord在维护快照时做了优化:因为Decode过程只需要访问系统表,所以快照中只保留了那些更新了系统表的事务。
另外,Replication Slot的xmin信息会影响系统的Vacuum,使其保留仍然需要的数据版本。而SnapBuildProcessRunningXacts会不断更新Replication Slot中的xmin信息,避免使Vacuum停滞。
输出Decode后日志的过程,都在DecodeCommit调用的ReorderBufferCommit函数中。在ReorderBufferCommit中,调用了输出插件的apply_change等回调函数,会将日志信息打印成我们最终看到的字符串,这样就完成了Decode。