博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PostgreSQL 9.4 logical replicatoin patch: logical changeset generation v5
阅读量:6388 次
发布时间:2019-06-23

本文共 22420 字,大约阅读时间需要 74 分钟。

PostgreSQL 的xlog逻辑解析补丁, 可以从xlog中解出数据库中执行的SQL, 但是从测试情况来看目前不支持DDL的解析, 只有DML的解析.
未来PostgreSQL可以基于此增加基于SQL复制的功能. 
补丁比较多, 有些是单独提交的, 本文将简单的测试一下.

下载PostgreSQL源码, 注意最好使用补丁释放前一天的版本 : 

下载补丁
mkdir patchcd patchwget http://www.postgresql.org/message-id/attachment/29336/0001-Add-support-for-multiple-kinds-of-external-toast-dat.patch.gzwget http://www.postgresql.org/message-id/attachment/29337/0002-wal_decoding-Add-pg_xlog_wait_remote_-apply-receive-.patch.gzwget http://www.postgresql.org/message-id/attachment/29338/0003-wal_decoding-Add-a-new-RELFILENODE-syscache-to-fetch.patch.gzwget http://www.postgresql.org/message-id/attachment/29339/0004-wal_decoding-Add-RelationMapFilenodeToOid-function-t.patch.gzwget http://www.postgresql.org/message-id/attachment/29340/0005-wal_decoding-Add-pg_relation_by_filenode-to-lookup-u.patch.gzwget http://www.postgresql.org/message-id/attachment/29341/0006-wal_decoding-Introduce-InvalidCommandId-and-declare-.patch.gzwget http://www.postgresql.org/message-id/attachment/29342/0007-wal_decoding-Adjust-all-Satisfies-routines-to-take-a.patch.gzwget http://www.postgresql.org/message-id/attachment/29343/0008-wal_decoding-Allow-walsender-s-to-connect-to-a-speci.patch.gzwget http://www.postgresql.org/message-id/attachment/29344/0009-wal_decoding-Add-alreadyLocked-parameter-to-GetOldes.patch.gzwget http://www.postgresql.org/message-id/attachment/29345/0010-wal_decoding-Log-xl_running_xact-s-at-a-higher-frequ.patch.gzwget http://www.postgresql.org/message-id/attachment/29346/0011-wal_decoding-copydir-make-fsync_fname-public.patch.gzwget http://www.postgresql.org/message-id/attachment/29347/0012-wal_decoding-Add-information-about-a-tables-primary-.patch.gzwget http://www.postgresql.org/message-id/attachment/29348/0013-wal_decoding-Introduce-wal-decoding-via-catalog-time.patch.gzwget http://www.postgresql.org/message-id/attachment/29349/0014-wal_decoding-test_decoding-Add-a-simple-decoding-mod.patch.gzwget http://www.postgresql.org/message-id/attachment/29350/0015-wal_decoding-pg_receivellog-Introduce-pg_receivexlog.patch.gzwget http://www.postgresql.org/message-id/attachment/29351/0016-wal_decoding-test_logical_decoding-Add-extension-for.patch.gzwget http://www.postgresql.org/message-id/attachment/29352/0017-wal_decoding-design-document-v2.4-and-snapshot-build.patch.gzwget http://www.postgresql.org/message-id/attachment/29390/0001-wal_decoding-mergme-Fix-pg_basebackup-makefile.patchwget http://www.postgresql.org/message-id/attachment/29391/0002-wal_decoding-mergme-Fix-test_logical_decoding-Makefi.patchwget http://www.postgresql.org/message-id/attachment/29564/0001-wal_decoding-mergme-Don-t-use-out-of-scope-local-var.patchgunzip *.gz
打补丁
tar -zxvf postgresql-bab54e3.tar.gzcd postgresql-bab54e3patch -p1 < ../patch/0001-Add-support-for-multiple-kinds-of-external-toast-dat.patchpatch -p1 < ../patch/0002-wal_decoding-Add-pg_xlog_wait_remote_-apply-receive-.patchpatch -p1 < ../patch/0003-wal_decoding-Add-a-new-RELFILENODE-syscache-to-fetch.patchpatch -p1 < ../patch/0004-wal_decoding-Add-RelationMapFilenodeToOid-function-t.patchpatch -p1 < ../patch/0005-wal_decoding-Add-pg_relation_by_filenode-to-lookup-u.patchpatch -p1 < ../patch/0006-wal_decoding-Introduce-InvalidCommandId-and-declare-.patchpatch -p1 < ../patch/0007-wal_decoding-Adjust-all-Satisfies-routines-to-take-a.patchpatch -p1 < ../patch/0008-wal_decoding-Allow-walsender-s-to-connect-to-a-speci.patchpatch -p1 < ../patch/0009-wal_decoding-Add-alreadyLocked-parameter-to-GetOldes.patchpatch -p1 < ../patch/0010-wal_decoding-Log-xl_running_xact-s-at-a-higher-frequ.patchpatch -p1 < ../patch/0011-wal_decoding-copydir-make-fsync_fname-public.patchpatch -p1 < ../patch/0012-wal_decoding-Add-information-about-a-tables-primary-.patchpatch -p1 < ../patch/0013-wal_decoding-Introduce-wal-decoding-via-catalog-time.patchpatch -p1 < ../patch/0014-wal_decoding-test_decoding-Add-a-simple-decoding-mod.patchpatch -p1 < ../patch/0015-wal_decoding-pg_receivellog-Introduce-pg_receivexlog.patchpatch -p1 < ../patch/0016-wal_decoding-test_logical_decoding-Add-extension-for.patchpatch -p1 < ../patch/0017-wal_decoding-design-document-v2.4-and-snapshot-build.patchpatch -p1 < ../patch/0001-wal_decoding-mergme-Fix-pg_basebackup-makefile.patchpatch -p1 < ../patch/0002-wal_decoding-mergme-Fix-test_logical_decoding-Makefi.patchpatch -p1 < ../patch/0001-wal_decoding-mergme-Don-t-use-out-of-scope-local-var.patch
确保没有FAILED的输出.

编译安装
cd postgresql-bab54e3./configure --prefix=/home/pg94/pgsql9.4devel --with-pgport=1921 --with-perl --with-tcl --with-python --with-openssl --with-pam --without-ldap --with-libxml --with-libxslt --enable-thread-safety --with-wal-blocksize=16 && gmake && gmake install
报错
snapbuild.c:206: error: redefinition of typedef ‘SnapBuild’../../../../src/include/replication/snapbuild.h:54: error: previous declaration of ‘SnapBuild’ was heregmake[4]: *** [snapbuild.o] Error 1
修复错误
将src/backend/replication/logical/snapbuild.c中的
SnapBuild结构定义复制到
src/include/replication/snapbuild.h, 
同时删除
src/backend/replication/logical/snapbuild.c中的
SnapBuild结构定义.
如下
typedef struct SnapBuild{        /* how far are we along building our first full snapshot */        SnapBuildState state;        /* private memory context used to allocate memory for this module. */        MemoryContext context;        /* all transactions < than this have committed/aborted */        TransactionId xmin;        /* all transactions >= than this are uncommitted */        TransactionId xmax;        /*         * Don't replay commits from an LSN <= this LSN. This can be set         * externally but it will also be advanced (never retreat) from within         * snapbuild.c.         */        XLogRecPtr      transactions_after;        /*         * Don't start decoding WAL until the "xl_running_xacts" information         * indicates there are no running xids with a xid smaller than this.         */        TransactionId initial_xmin_horizon;        /*         * Snapshot thats valid to see all currently committed transactions that         * see catalog modifications.         */        Snapshot        snapshot;        /*         * LSN of the last location we are sure a snapshot has been serialized to.         */        XLogRecPtr      last_serialized_snapshot;        ReorderBuffer *reorder;        /* variable length data */        /*         * Information about initially running transactions         *         * When we start building a snapshot there already may be transactions in         * progress.  Those are stored in running.xip.  We don't have enough         * information about those to decode their contents, so until they are         * finished (xcnt=0) we cannot switch to a CONSISTENT state.         */        struct        {                /*                 * As long as running.xcnt all XIDs < running.xmin and > running.xmax                 * have to be checked whether they still are running.                 */                TransactionId xmin;                TransactionId xmax;                size_t          xcnt;           /* number of used xip entries */                size_t          xcnt_space; /* allocated size of xip */                TransactionId *xip;             /* running xacts array, xidComparator-sorted */        }                       running;        /*         * Array of transactions which could have catalog changes that committed         * between xmin and xmax         */        struct        {                /* number of committed transactions */                size_t          xcnt;                /* available space for committed transactions */                size_t          xcnt_space;                /*                 * Until we reach a CONSISTENT state, we record commits of all                 * transactions, not just the catalog changing ones. Record when that                 * changes so we know we cannot export a snapshot safely anymore.                 */                bool            includes_all_transactions;                /*                 * Array of committed transactions that have modified the catalog.                 *                 * As this array is frequently modified we do *not* keep it in                 * xidComparator order. Instead we sort the array when building &                 * distributing a snapshot.                 *                 * XXX: That doesn't seem to be good reasoning anymore. Everytime we                 * add something here after becoming consistent will also require                 * distributing a snapshot. Storing them sorted would potentially make                 * it easier to purge as well (but more complicated wrt wraparound?).                 */                TransactionId *xip;        }                       committed;} SnapBuild;
注释src/include/replication/snapbuild.h中的
SnapBuild
定义 : 
// struct SnapBuild;// typedef struct SnapBuild SnapBuild;
重新执行gmake, 正常.
cd postgresql-bab54e3gmakegmake installcd contribgmakegmake install
初始化数据库
initdb -E UTF8 -D $PGDATA --locale=C -W -U postgres
逻辑复制的相关参数的变化 : 
postgresql.conf
#wal_level = minimal                    # minimal, archive, logical or hot_standby                                        # (change requires restart)#max_wal_senders = 0            # max number of walsender processes, including                                # both physical and logical replication senders.                                # (change requires restart)#max_logical_slots = 0          # max number of logical replication sender                                # and receiver processes. Logical senders                                # (but not receivers) also consume a                                # max_wal_senders slot.                                # (change requires restart)
将以上参数配置为 : 
wal_level = logicalmax_wal_senders = 16max_logical_slots = 8
逻辑复制新增的命令
pg_receivellog
pg_receivellog receives PostgreSQL logical change stream.Usage:  pg_receivellog [OPTION]...Options:  -f, --file=FILE        receive log into this file. - for stdout  -n, --no-loop          do not loop on connection lost  -v, --verbose          output verbose messages  -V, --version          output version information, then exit  -?, --help             show this help, then exitConnection options:  -d, --database=DBNAME  database to connect to  -h, --host=HOSTNAME    database server host or socket directory  -p, --port=PORT        database server port number  -U, --username=NAME    connect as specified database user  -w, --no-password      never prompt for password  -W, --password         force password prompt (should happen automatically)Replication options:  -P, --plugin=PLUGIN    use output plugin PLUGIN (defaults to test_decoding)  -s, --status-interval=INTERVAL                         time between status packets sent to server (in seconds)  -S, --slot=SLOT        use existing replication slot SLOT instead of starting a new oneAction to be performed:      --init             initiate a new replication slot (for the slotname see --slot)      --start            start streaming in a replication slot (for the slotname see --slot)      --stop             stop the replication slot (for the slotname see --slot)Report bugs to 
.
新增的几个函数 : 
digoal=# \df *.*replica*                                                             List of functions   Schema   |           Name           | Result data type |                          Argument data types                          |  Type  ------------+--------------------------+------------------+-----------------------------------------------------------------------+-------- pg_catalog | init_logical_replication | record           | slotname name, plugin name, OUT slotname text, OUT xlog_position text | normal pg_catalog | stop_logical_replication | integer          | name                                                                  | normal(2 rows)
新增的extension  : 
cat test_logical_decoding--1.0.sql-- complain if script is sourced in psql, rather than via CREATE EXTENSION\echo Use "CREATE EXTENSION test_logical_decoding" to load this file. \quitCREATE FUNCTION start_logical_replication (slotname name, pos text, VARIADIC options text[] DEFAULT '{}', OUT location text, OUT xid bigint, OUT data text) RETURNS SETOF recordAS 'MODULE_PATHNAME', 'start_logical_replication'LANGUAGE C IMMUTABLE STRICT;
启动数据库
pg_ctl start
测试 : 
pg94@db-192-168-100-216-> psqlpsql (9.4devel)Type "help" for help.digoal=# create extension test_logical_decoding;CREATE EXTENSIONdigoal=# SELECT * FROM init_logical_replication('regression_slot', 'test_decoding');    slotname     | xlog_position -----------------+--------------- regression_slot | 0/18402B8(1 row)digoal=# CREATE TABLE foo(id serial primary key, data text);CREATE TABLEdigoal=# INSERT INTO foo(data) VALUES(1);INSERT 0 1digoal=# UPDATE foo SET id = -id, data = ':'||data;UPDATE 1digoal=# DELETE FROM foo;DELETE 1digoal=# DROP TABLE foo;DROP TABLEdigoal=# SELECT * FROM start_logical_replication('regression_slot', 'now', 'hide-xids', '0'); location  | xid  |                                      data                                      -----------+------+-------------------------------------------------------------------------------- 0/18402E8 | 1686 | BEGIN 0/18402E8 | 1686 | COMMIT 0/185EEB0 | 1687 | BEGIN 0/185EEB0 | 1687 | table "foo": INSERT: id[int4]:1 data[text]:1 0/185EEB0 | 1687 | COMMIT 0/185EFC8 | 1688 | BEGIN 0/185EFC8 | 1688 | table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[text]::1 0/185EFC8 | 1688 | COMMIT 0/185F0A0 | 1689 | BEGIN 0/185F0A0 | 1689 | table "foo": DELETE: id[int4]:-1 0/185F0A0 | 1689 | COMMIT 0/185F2A8 | 1690 | BEGIN 0/185F2A8 | 1690 | COMMIT(13 rows)digoal=# SELECT * FROM pg_stat_logical_decoding ;    slot_name    |    plugin     | database | active | xmin | restart_decoding_lsn -----------------+---------------+----------+--------+------+---------------------- regression_slot | test_decoding |    16384 | f      | 1686 | 0/1840280(1 row)digoal=# SELECT * FROM stop_logical_replication('regression_slot'); stop_logical_replication --------------------------                        0(1 row)
pg_receivellog测试
配置pg_hba.conf
local   replication     postgres                                trusthost    replication     postgres        127.0.0.1/32            trustpg_ctl reload
pg94@db-192-168-100-216-> pg_receivellog -f ./test.logical -d digoal -h $PGDATA -p 1921 -U postgres -P test_decoding -s 1 -S test --initWARNING:  Initiating logical rep from 0/1861E10pg94@db-192-168-100-216-> pg_receivellog -f ./test.logical -d digoal -h $PGDATA -p 1921 -U postgres -P test_decoding -s 1 -S test --startWARNING:  Starting logical replication
查询pg_stat_replication
digoal=# select * from pg_stat_replication ;  pid  | usesysid | usename  | application_name | client_addr | client_hostname | client_port |         backend_start         |   state   | sent_location | write_location | flush_location | replay_location | sync_priority | sync_state -------+----------+----------+------------------+-------------+-----------------+-------------+-------------------------------+-----------+---------------+----------------+----------------+-----------------+---------------+------------ 30794 |       10 | postgres | pg_receivellog   |             |                 |          -1 | 2013-07-30 16:38:56.100306+08 | streaming | 0/1865AF8     |                |                |                 |             0 | async(1 row)
DDL未记录 : 
digoal=# create table t(id int);CREATE TABLEpg94@db-192-168-100-216-> cat test.logical BEGIN 1692COMMIT 1692
DML有记录
digoal=# insert into t values (1);INSERT 0 1pg94@db-192-168-100-216-> cat test.logical BEGIN 1692COMMIT 1692BEGIN 1693table "t": INSERT: id[int4]:1COMMIT 1693
digoal=# create table t1(id int);CREATE TABLEpg94@db-192-168-100-216-> cat test.logical BEGIN 1692COMMIT 1692BEGIN 1693table "t": INSERT: id[int4]:1COMMIT 1693BEGIN 1694COMMIT 1694digoal=# insert into t1 values (1);INSERT 0 1pg94@db-192-168-100-216-> cat test.logical BEGIN 1692COMMIT 1692BEGIN 1693table "t": INSERT: id[int4]:1COMMIT 1693BEGIN 1694COMMIT 1694BEGIN 1695table "t1": INSERT: id[int4]:1COMMIT 1695
[参考]
1. 
2. 
3. 补丁介绍
By Andres FreundHi!I am rather pleased to announce the next version of the changesetextraction patchset. Thanks to help from a large number of people Ithink we are slowly getting to the point where it is gettingcommittable.Since the last submitted version(20121115002746(dot)GA7692(at)awork2(dot)anarazel(dot)de) a large number of fixes andthe result of good amount of review has been added to the tree. Allbugs known to me have been fixed.Fixes include:* synchronous replication support* don't peg the xmin for user tables, do it only for catalog ones.* arbitrarily large transaction support by spilling large transactions  to disk* spill snapshots to disk, so we can restart without waiting for a new  snapshot to be built* Don't read all WAL from the establishment of a logical slot* tests via SQL interface to changeset extractionThe todo list includes:* morph the "logical slot" interface into being "replication slots" that  can also be used by streaming replication* move some more code from snapbuild.c to decode.c to remove a largely  duplicated switch* do some more header/comment cleanup & clarification* move pg_receivellog into its own directory in src/bin or contrib/.* user/developer level documentationThe patch series currently has two interfaces to logical decoding. One -which is primarily useful for pg_regress style tests and playing around- is SQL based, the other one uses a walsender replication connection.A quick demonstration of the SQL interface (server needs to be startedwith wal_level = logical and max_logical_slots > 0):=# CREATE EXTENSION test_logical_decoding;=# SELECT * FROM init_logical_replication('regression_slot', 'test_decoding');    slotname     | xlog_position -----------------+--------------- regression_slot | 0/17D5908(1 row)=# CREATE TABLE foo(id serial primary key, data text);=# INSERT INTO foo(data) VALUES(1);=# UPDATE foo SET id = -id, data = ':'||data;=# DELETE FROM foo;=# DROP TABLE foo;=# SELECT * FROM start_logical_replication('regression_slot', 'now', 'hide-xids', '0'); location  | xid |                                      data-----------+-----+-------------------------------------------------------------------------------- 0/17D59B8 | 695 | BEGIN 0/17D59B8 | 695 | COMMIT 0/17E8B58 | 696 | BEGIN 0/17E8B58 | 696 | table "foo": INSERT: id[int4]:1 data[text]:1 0/17E8B58 | 696 | COMMIT 0/17E8CA8 | 697 | BEGIN 0/17E8CA8 | 697 | table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[text]::1 0/17E8CA8 | 697 | COMMIT 0/17E8E50 | 698 | BEGIN 0/17E8E50 | 698 | table "foo": DELETE: id[int4]:-1 0/17E8E50 | 698 | COMMIT 0/17E9058 | 699 | BEGIN 0/17E9058 | 699 | COMMIT(13 rows)=# SELECT * FROM pg_stat_logical_decoding ;    slot_name    |    plugin     | database | active | xmin | restart_decoding_lsn -----------------+---------------+----------+--------+------+---------------------- regression_slot | test_decoding |    12042 | f      |  695 | 0/17D58D0(1 row)=# SELECT * FROM stop_logical_replication('regression_slot'); stop_logical_replication--------------------------                        0The walsender interface has the same callsINIT_LOGICAL_REPLICATION 'slot' 'plugin';START_LOGICAL_REPLICATION 'slot' restart_lsn [(option value)*];STOP_LOGICAL_REPLICATION 'slot';The only difference is that START_LOGICAL_REPLICATION can stream changesand it can support synchronous replication.The output seen in the 'data' column is produced by a so called 'outputplugin' which users of the facility can write to suit their needs. Theycan be written by implementing 5 functions in the shared object that'spassed to init_logical_replication() above:* pg_decode_init (optional)* pg_decode_begin_txn* pg_decode_change* pg_decode_commit_txn* pg_decode_cleanup (optional)The most interesting function pg_decode_change get's passed a structurecontaining old/new versions of the row, the 'struct Relation' belongingto it and metainformation about the transaction.The output plugin can rely on syscache lookups et al. to decode thechanged tuple in whatever fashion it wants.I'd like to invite reviewers to first look at:* the output plugin interface* the walsender/SRF interface* patch 12 which contains most of the codeWhen reading the code, the information flow during decoding might beinteresting:---------------          +---------------+          | XLogReader    |          +---------------+                  |            XLOG Records                  |                  v          +---------------+          | decode.c      |          +---------------+             |       |             |       |             v       |+---------------+    || snapbuild.c   |  HeapTupleData+---------------+    |             |       |  catalog snapshots  |             |       |             v       v	  +---------------+	  |reorderbuffer.c|	  +---------------+                 |        HeapTuple & Metadata                 |                 v          +---------------+	  | Output Plugin |	  +---------------+                 |          Whatever you want                 |                 v          +---------------+	  | Output Handler|	  |               |	  |WalSnd or SRF  |	  +---------------+---------------Overview of the attached patches:0001: indirect toast tuples; required but submitted independently0002: functions for testing; not required,0003: (tablespace, filenode) syscache; required0004: RelationMapFilenodeToOid: required, simple0005: pg_relation_by_filenode() function; not required but useful0006: Introduce InvalidCommandId: required, simple0007: Adjust Satisfies* interface: required, mechanical,0008: Allow walsender to attach to a database: required, needs review0009: New GetOldestXmin() parameter; required, pretty boring0010: Log xl_running_xact regularly in the bgwriter: required0011: make fsync_fname() public; required, needs to be in a different file0012: Relcache support for an Relation's primary key: required0013: Actual changeset extraction; required0014: Output plugin demo; not required (except for testing) but useful0015: Add pg_receivellog program: not required but useful0016: Add test_logical_decoding extension; not required, but contains      the tests for the feature. Uses 00140017: Snapshot building docs; not requiredGreetings,Andres Freund

转载地址:http://mscha.baihongyu.com/

你可能感兴趣的文章
解决time_wait过多的问题
查看>>
技术转载:Jni学习一:了解Jni
查看>>
vue教程2-07 自定义指令
查看>>
Node.js之循环依赖
查看>>
python3调用阿里云短信服务
查看>>
Linux-百度云之AccleriderMini使用
查看>>
bootstrapTable refresh 方法使用简单举例
查看>>
2、TestNG+Maven+IDEA环境搭建
查看>>
maven插件运行过程中自动执行sql文件
查看>>
New UWP Community Toolkit - XAML Brushes
查看>>
C# ==、Equals、ReferenceEquals 区别与联系 (转载)
查看>>
layer弹出层的关闭问题
查看>>
LeetCode——3Sum &amp; 3Sum Closest
查看>>
netstat详解
查看>>
微信小程序 --- e.currentTarget.dataset.id 获取不到值
查看>>
Introducing stapbpf – SystemTap’s new BPF backend
查看>>
详细介绍MySQL/MariaDB的锁
查看>>
0603-Zuul构建API Gateway-通过Zuul上传文件,禁用Zuul的Filter
查看>>
cocos2dx-2.x CCFileUtils文件管理分析(2)
查看>>
Emacs中多个golang项目的配置方法
查看>>