Download the patches:

mkdir patch
cd patch
wget http://www.postgresql.org/message-id/attachment/29336/0001-Add-support-for-multiple-kinds-of-external-toast-dat.patch.gz
wget http://www.postgresql.org/message-id/attachment/29337/0002-wal_decoding-Add-pg_xlog_wait_remote_-apply-receive-.patch.gz
wget http://www.postgresql.org/message-id/attachment/29338/0003-wal_decoding-Add-a-new-RELFILENODE-syscache-to-fetch.patch.gz
wget http://www.postgresql.org/message-id/attachment/29339/0004-wal_decoding-Add-RelationMapFilenodeToOid-function-t.patch.gz
wget http://www.postgresql.org/message-id/attachment/29340/0005-wal_decoding-Add-pg_relation_by_filenode-to-lookup-u.patch.gz
wget http://www.postgresql.org/message-id/attachment/29341/0006-wal_decoding-Introduce-InvalidCommandId-and-declare-.patch.gz
wget http://www.postgresql.org/message-id/attachment/29342/0007-wal_decoding-Adjust-all-Satisfies-routines-to-take-a.patch.gz
wget http://www.postgresql.org/message-id/attachment/29343/0008-wal_decoding-Allow-walsender-s-to-connect-to-a-speci.patch.gz
wget http://www.postgresql.org/message-id/attachment/29344/0009-wal_decoding-Add-alreadyLocked-parameter-to-GetOldes.patch.gz
wget http://www.postgresql.org/message-id/attachment/29345/0010-wal_decoding-Log-xl_running_xact-s-at-a-higher-frequ.patch.gz
wget http://www.postgresql.org/message-id/attachment/29346/0011-wal_decoding-copydir-make-fsync_fname-public.patch.gz
wget http://www.postgresql.org/message-id/attachment/29347/0012-wal_decoding-Add-information-about-a-tables-primary-.patch.gz
wget http://www.postgresql.org/message-id/attachment/29348/0013-wal_decoding-Introduce-wal-decoding-via-catalog-time.patch.gz
wget http://www.postgresql.org/message-id/attachment/29349/0014-wal_decoding-test_decoding-Add-a-simple-decoding-mod.patch.gz
wget http://www.postgresql.org/message-id/attachment/29350/0015-wal_decoding-pg_receivellog-Introduce-pg_receivexlog.patch.gz
wget http://www.postgresql.org/message-id/attachment/29351/0016-wal_decoding-test_logical_decoding-Add-extension-for.patch.gz
wget http://www.postgresql.org/message-id/attachment/29352/0017-wal_decoding-design-document-v2.4-and-snapshot-build.patch.gz
wget http://www.postgresql.org/message-id/attachment/29390/0001-wal_decoding-mergme-Fix-pg_basebackup-makefile.patch
wget http://www.postgresql.org/message-id/attachment/29391/0002-wal_decoding-mergme-Fix-test_logical_decoding-Makefi.patch
wget http://www.postgresql.org/message-id/attachment/29564/0001-wal_decoding-mergme-Don-t-use-out-of-scope-local-var.patch
gunzip *.gz

Apply the patches:
tar -zxvf postgresql-bab54e3.tar.gz
cd postgresql-bab54e3
patch -p1 < ../patch/0001-Add-support-for-multiple-kinds-of-external-toast-dat.patch
patch -p1 < ../patch/0002-wal_decoding-Add-pg_xlog_wait_remote_-apply-receive-.patch
patch -p1 < ../patch/0003-wal_decoding-Add-a-new-RELFILENODE-syscache-to-fetch.patch
patch -p1 < ../patch/0004-wal_decoding-Add-RelationMapFilenodeToOid-function-t.patch
patch -p1 < ../patch/0005-wal_decoding-Add-pg_relation_by_filenode-to-lookup-u.patch
patch -p1 < ../patch/0006-wal_decoding-Introduce-InvalidCommandId-and-declare-.patch
patch -p1 < ../patch/0007-wal_decoding-Adjust-all-Satisfies-routines-to-take-a.patch
patch -p1 < ../patch/0008-wal_decoding-Allow-walsender-s-to-connect-to-a-speci.patch
patch -p1 < ../patch/0009-wal_decoding-Add-alreadyLocked-parameter-to-GetOldes.patch
patch -p1 < ../patch/0010-wal_decoding-Log-xl_running_xact-s-at-a-higher-frequ.patch
patch -p1 < ../patch/0011-wal_decoding-copydir-make-fsync_fname-public.patch
patch -p1 < ../patch/0012-wal_decoding-Add-information-about-a-tables-primary-.patch
patch -p1 < ../patch/0013-wal_decoding-Introduce-wal-decoding-via-catalog-time.patch
patch -p1 < ../patch/0014-wal_decoding-test_decoding-Add-a-simple-decoding-mod.patch
patch -p1 < ../patch/0015-wal_decoding-pg_receivellog-Introduce-pg_receivexlog.patch
patch -p1 < ../patch/0016-wal_decoding-test_logical_decoding-Add-extension-for.patch
patch -p1 < ../patch/0017-wal_decoding-design-document-v2.4-and-snapshot-build.patch
patch -p1 < ../patch/0001-wal_decoding-mergme-Fix-pg_basebackup-makefile.patch
patch -p1 < ../patch/0002-wal_decoding-mergme-Fix-test_logical_decoding-Makefi.patch
patch -p1 < ../patch/0001-wal_decoding-mergme-Don-t-use-out-of-scope-local-var.patch
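The follow-up fixes reuse the 0001 and 0002 prefixes, so sorting the files by name (for example with a `*.patch` glob) would not reproduce the order used above. One way to keep the order explicit is a small series file, in the spirit of quilt. The sketch below is only an illustration: the `apply_series` function and the `series` file are hypothetical helpers, not part of the patch set.

```shell
#!/bin/sh
# Hypothetical helper: apply patches in the order listed in a series file.
# $1 = series file (one patch file name per line; blank lines and lines
#      starting with '#' are skipped)
# $2 = directory that contains the patch files
# Run it from the top of the source tree, like the manual commands above.
apply_series() {
    series_file=$1
    patch_dir=$2
    while IFS= read -r name; do
        case $name in
            ''|'#'*) continue ;;
        esac
        echo "applying $name"
        # Stop at the first patch that cannot be read or does not apply,
        # so later patches are never applied on top of a broken tree.
        patch -p1 < "$patch_dir/$name" || {
            echo "failed on $name; stopping" >&2
            return 1
        }
    done < "$series_file"
}

# Example (not executed here):
#   cd postgresql-bab54e3
#   apply_series ../patch/series ../patch
```

The series file would list the twenty file names in exactly the order shown above, with the three "mergme" fixes last.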
Configure and build:

cd postgresql-bab54e3
./configure --prefix=/home/pg94/pgsql9.4devel --with-pgport=1921 --with-perl --with-tcl --with-python --with-openssl --with-pam --without-ldap --with-libxml --with-libxslt --enable-thread-safety --with-wal-blocksize=16 && gmake && gmake install

The build fails with:
snapbuild.c:206: error: redefinition of typedef ‘SnapBuild’
../../../../src/include/replication/snapbuild.h:54: error: previous declaration of ‘SnapBuild’ was here
gmake[4]: *** [snapbuild.o] Error 1
The conflicting definition in snapbuild.c:

typedef struct SnapBuild
{
    /* how far are we along building our first full snapshot */
    SnapBuildState state;

    /* private memory context used to allocate memory for this module. */
    MemoryContext context;

    /* all transactions < than this have committed/aborted */
    TransactionId xmin;

    /* all transactions >= than this are uncommitted */
    TransactionId xmax;

    /*
     * Don't replay commits from an LSN <= this LSN. This can be set
     * externally but it will also be advanced (never retreat) from within
     * snapbuild.c.
     */
    XLogRecPtr transactions_after;

    /*
     * Don't start decoding WAL until the "xl_running_xacts" information
     * indicates there are no running xids with a xid smaller than this.
     */
    TransactionId initial_xmin_horizon;

    /*
     * Snapshot thats valid to see all currently committed transactions that
     * see catalog modifications.
     */
    Snapshot snapshot;

    /*
     * LSN of the last location we are sure a snapshot has been serialized to.
     */
    XLogRecPtr last_serialized_snapshot;

    ReorderBuffer *reorder;

    /* variable length data */

    /*
     * Information about initially running transactions
     *
     * When we start building a snapshot there already may be transactions in
     * progress. Those are stored in running.xip. We don't have enough
     * information about those to decode their contents, so until they are
     * finished (xcnt=0) we cannot switch to a CONSISTENT state.
     */
    struct
    {
        /*
         * As long as running.xcnt all XIDs < running.xmin and > running.xmax
         * have to be checked whether they still are running.
         */
        TransactionId xmin;
        TransactionId xmax;

        size_t xcnt;            /* number of used xip entries */
        size_t xcnt_space;      /* allocated size of xip */
        TransactionId *xip;     /* running xacts array, xidComparator-sorted */
    } running;

    /*
     * Array of transactions which could have catalog changes that committed
     * between xmin and xmax
     */
    struct
    {
        /* number of committed transactions */
        size_t xcnt;

        /* available space for committed transactions */
        size_t xcnt_space;

        /*
         * Until we reach a CONSISTENT state, we record commits of all
         * transactions, not just the catalog changing ones. Record when that
         * changes so we know we cannot export a snapshot safely anymore.
         */
        bool includes_all_transactions;

        /*
         * Array of committed transactions that have modified the catalog.
         *
         * As this array is frequently modified we do *not* keep it in
         * xidComparator order. Instead we sort the array when building &
         * distributing a snapshot.
         *
         * XXX: That doesn't seem to be good reasoning anymore. Everytime we
         * add something here after becoming consistent will also require
         * distributing a snapshot. Storing them sorted would potentially make
         * it easier to purge as well (but more complicated wrt wraparound?).
         */
        TransactionId *xip;
    } committed;
} SnapBuild;

Comment out the SnapBuild declaration in src/include/replication/snapbuild.h:
// struct SnapBuild;
// typedef struct SnapBuild SnapBuild;

After this change, re-running gmake succeeds.
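The header edit above can also be scripted, which helps when the tree is unpacked and patched repeatedly. The following is a minimal sketch, assuming the two declarations each start at the beginning of a line in snapbuild.h exactly as shown; the function name `comment_out_snapbuild_typedef` is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: comment out the forward declaration and the typedef
# of SnapBuild in the given header. A backup copy is kept as <header>.bak.
# Assumes both declarations begin in column 1, exactly as quoted above.
comment_out_snapbuild_typedef() {
    header=$1
    sed -i.bak \
        -e 's|^struct SnapBuild;|// &|' \
        -e 's|^typedef struct SnapBuild SnapBuild;|// &|' \
        "$header"
}

# Example (not executed here):
#   comment_out_snapbuild_typedef src/include/replication/snapbuild.h
```

Lines that do not match either pattern are left untouched, so running the function a second time changes nothing.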
cd postgresql-bab54e3
gmake
gmake install
cd contrib
gmake
gmake install

Initialize the database:
initdb -E UTF8 -D $PGDATA --locale=C -W -U postgres

The relevant defaults in postgresql.conf:

#wal_level = minimal            # minimal, archive, logical or hot_standby
                                # (change requires restart)
#max_wal_senders = 0            # max number of walsender processes, including
                                # both physical and logical replication senders.
                                # (change requires restart)
#max_logical_slots = 0          # max number of logical replication sender
                                # and receiver processes. Logical senders
                                # (but not receivers) also consume a
                                # max_wal_senders slot.
                                # (change requires restart)

Change these parameters to:
wal_level = logical
max_wal_senders = 16
max_logical_slots = 8
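All three settings require a restart, so it is worth confirming what the file actually says before starting the server. The sketch below reads the effective value of a parameter from a postgresql.conf-style file, taking the last uncommented assignment (the last occurrence of a setting in the file is the one that takes effect). The `conf_value` function is a hypothetical helper, and it only understands simple `name = value` lines without quoting.

```shell
#!/bin/sh
# Hypothetical helper: print the last uncommented value assigned to a
# parameter in a postgresql.conf-style file.
# $1 = parameter name, $2 = path to the configuration file
# Limitation: handles only plain "name = value" lines; quoted values that
# contain '#' and the "name value" form without '=' are not supported.
conf_value() {
    awk -v key="$1" '
        {
            line = $0
            sub(/#.*/, "", line)            # drop whole-line and trailing comments
            i = index(line, "=")
            if (i == 0) next                # not an assignment
            k = substr(line, 1, i - 1)
            v = substr(line, i + 1)
            gsub(/[ \t]/, "", k)            # strip whitespace around the name
            gsub(/^[ \t]+|[ \t]+$/, "", v)  # trim the value
            if (k == key) { val = v; found = 1 }
        }
        END { if (found) print val; else exit 1 }
    ' "$2"
}

# Example (not executed here):
#   conf_value wal_level "$PGDATA/postgresql.conf"
```

The function exits with a non-zero status when the parameter is not set in the file, which makes it usable in an `if` before `pg_ctl start`.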

Help output of the new pg_receivellog program:

pg_receivellog receives PostgreSQL logical change stream.

Usage:
  pg_receivellog [OPTION]...

Options:
  -f, --file=FILE        receive log into this file. - for stdout
  -n, --no-loop          do not loop on connection lost
  -v, --verbose          output verbose messages
  -V, --version          output version information, then exit
  -?, --help             show this help, then exit

Connection options:
  -d, --database=DBNAME  database to connect to
  -h, --host=HOSTNAME    database server host or socket directory
  -p, --port=PORT        database server port number
  -U, --username=NAME    connect as specified database user
  -w, --no-password      never prompt for password
  -W, --password         force password prompt (should happen automatically)

Replication options:
  -P, --plugin=PLUGIN    use output plugin PLUGIN (defaults to test_decoding)
  -s, --status-interval=INTERVAL
                         time between status packets sent to server (in seconds)
  -S, --slot=SLOT        use existing replication slot SLOT instead of starting a new one

Action to be performed:
  --init                 initiate a new replication slot (for the slotname see --slot)
  --start                start streaming in a replication slot (for the slotname see --slot)
  --stop                 stop the replication slot (for the slotname see --slot)

Report bugs to

New functions:
digoal=# \df *.*replica*
                                                      List of functions
   Schema   |           Name           | Result data type |                          Argument data types                          |  Type
------------+--------------------------+------------------+-----------------------------------------------------------------------+--------
 pg_catalog | init_logical_replication | record           | slotname name, plugin name, OUT slotname text, OUT xlog_position text | normal
 pg_catalog | stop_logical_replication | integer          | name                                                                  | normal
(2 rows)

New extension:
cat test_logical_decoding--1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_logical_decoding" to load this file. \quit

CREATE FUNCTION start_logical_replication (slotname name, pos text, VARIADIC options text[] DEFAULT '{}', OUT location text, OUT xid bigint, OUT data text) RETURNS SETOF record
AS 'MODULE_PATHNAME', 'start_logical_replication'
LANGUAGE C IMMUTABLE STRICT;

Start the database:
pg_ctl start

Test:
pg94@db-192-168-100-216-> psql
psql (9.4devel)
Type "help" for help.
digoal=# create extension test_logical_decoding;
CREATE EXTENSION
digoal=# SELECT * FROM init_logical_replication('regression_slot', 'test_decoding');
    slotname     | xlog_position
-----------------+---------------
 regression_slot | 0/18402B8
(1 row)
digoal=# CREATE TABLE foo(id serial primary key, data text);
CREATE TABLE
digoal=# INSERT INTO foo(data) VALUES(1);
INSERT 0 1
digoal=# UPDATE foo SET id = -id, data = ':'||data;
UPDATE 1
digoal=# DELETE FROM foo;
DELETE 1
digoal=# DROP TABLE foo;
DROP TABLE
digoal=# SELECT * FROM start_logical_replication('regression_slot', 'now', 'hide-xids', '0');
 location  | xid  |                                      data
-----------+------+--------------------------------------------------------------------------------
 0/18402E8 | 1686 | BEGIN
 0/18402E8 | 1686 | COMMIT
 0/185EEB0 | 1687 | BEGIN
 0/185EEB0 | 1687 | table "foo": INSERT: id[int4]:1 data[text]:1
 0/185EEB0 | 1687 | COMMIT
 0/185EFC8 | 1688 | BEGIN
 0/185EFC8 | 1688 | table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[text]::1
 0/185EFC8 | 1688 | COMMIT
 0/185F0A0 | 1689 | BEGIN
 0/185F0A0 | 1689 | table "foo": DELETE: id[int4]:-1
 0/185F0A0 | 1689 | COMMIT
 0/185F2A8 | 1690 | BEGIN
 0/185F2A8 | 1690 | COMMIT
(13 rows)
digoal=# SELECT * FROM pg_stat_logical_decoding ;
    slot_name    |    plugin     | database | active | xmin | restart_decoding_lsn
-----------------+---------------+----------+--------+------+----------------------
 regression_slot | test_decoding |    16384 | f      | 1686 | 0/1840280
(1 row)
digoal=# SELECT * FROM stop_logical_replication('regression_slot');
 stop_logical_replication
--------------------------
                        0
(1 row)

To test pg_receivellog, allow replication connections in pg_hba.conf and reload:

local   replication     postgres                        trust
host    replication     postgres        127.0.0.1/32    trust

pg_ctl reload

pg94@db-192-168-100-216-> pg_receivellog -f ./test.logical -d digoal -h $PGDATA -p 1921 -U postgres -P test_decoding -s 1 -S test --init
WARNING: Initiating logical rep from 0/1861E10
pg94@db-192-168-100-216-> pg_receivellog -f ./test.logical -d digoal -h $PGDATA -p 1921 -U postgres -P test_decoding -s 1 -S test --start
WARNING: Starting logical replication

Query pg_stat_replication:
digoal=# select * from pg_stat_replication ;
  pid  | usesysid | usename  | application_name | client_addr | client_hostname | client_port |         backend_start         |   state   | sent_location | write_location | flush_location | replay_location | sync_priority | sync_state
-------+----------+----------+------------------+-------------+-----------------+-------------+-------------------------------+-----------+---------------+----------------+----------------+-----------------+---------------+------------
 30794 |       10 | postgres | pg_receivellog   |             |                 |          -1 | 2013-07-30 16:38:56.100306+08 | streaming | 0/1865AF8     |                |                |                 |             0 | async
(1 row)

DDL is not recorded (only an empty transaction appears):
digoal=# create table t(id int);
CREATE TABLE
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692

DML is recorded:
digoal=# insert into t values (1);
INSERT 0 1
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692
BEGIN 1693
table "t": INSERT: id[int4]:1
COMMIT 1693
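The text that test_decoding writes to test.logical is line-oriented, so it can be summarized with ordinary text tools. The sketch below counts changes per table and operation; it is only an illustration based on the output format seen in this test (`table "name": OPERATION: ...`), and the `summarize_changes` function is a hypothetical helper, not something shipped with the patch set.

```shell
#!/bin/sh
# Hypothetical helper: count changes per table and operation in a file
# written by pg_receivellog with the test_decoding plugin.
# $1 = path to the output file
# Assumes lines of the form:  table "name": OPERATION: column data...
# BEGIN/COMMIT lines (and therefore DDL-only transactions) are ignored.
summarize_changes() {
    awk '
        /^table "/ {
            split($0, q, "\"")      # q[2] = table name, q[3] = rest of the line
            split(q[3], f, " ")     # f[2] = operation followed by a colon
            op = f[2]
            sub(/:$/, "", op)
            count[q[2] " " op]++
        }
        END { for (k in count) print k, count[k] }
    ' "$1" | LC_ALL=C sort
}

# Example (not executed here):
#   summarize_changes ./test.logical
```

Each output line has the form `table operation count`, sorted bytewise so the result is stable across locales.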

digoal=# create table t1(id int);
CREATE TABLE
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692
BEGIN 1693
table "t": INSERT: id[int4]:1
COMMIT 1693
BEGIN 1694
COMMIT 1694
digoal=# insert into t1 values (1);
INSERT 0 1
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692
BEGIN 1693
table "t": INSERT: id[int4]:1
COMMIT 1693
BEGIN 1694
COMMIT 1694
BEGIN 1695
table "t1": INSERT: id[int4]:1
COMMIT 1695

[Reference]
By Andres Freund

Hi!

I am rather pleased to announce the next version of the changeset extraction patchset. Thanks to help from a large number of people I think we are slowly getting to the point where it is getting committable.

Since the last submitted version (20121115002746(dot)GA7692(at)awork2(dot)anarazel(dot)de) a large number of fixes and the result of good amount of review has been added to the tree. All bugs known to me have been fixed.

Fixes include:
* synchronous replication support
* don't peg the xmin for user tables, do it only for catalog ones.
* arbitrarily large transaction support by spilling large transactions to disk
* spill snapshots to disk, so we can restart without waiting for a new snapshot to be built
* Don't read all WAL from the establishment of a logical slot
* tests via SQL interface to changeset extraction

The todo list includes:
* morph the "logical slot" interface into being "replication slots" that can also be used by streaming replication
* move some more code from snapbuild.c to decode.c to remove a largely duplicated switch
* do some more header/comment cleanup & clarification
* move pg_receivellog into its own directory in src/bin or contrib/.
* user/developer level documentation

The patch series currently has two interfaces to logical decoding. One -which is primarily useful for pg_regress style tests and playing around- is SQL based, the other one uses a walsender replication connection.

A quick demonstration of the SQL interface (server needs to be started with wal_level = logical and max_logical_slots > 0):

=# CREATE EXTENSION test_logical_decoding;
=# SELECT * FROM init_logical_replication('regression_slot', 'test_decoding');
    slotname     | xlog_position
-----------------+---------------
 regression_slot | 0/17D5908
(1 row)
=# CREATE TABLE foo(id serial primary key, data text);
=# INSERT INTO foo(data) VALUES(1);
=# UPDATE foo SET id = -id, data = ':'||data;
=# DELETE FROM foo;
=# DROP TABLE foo;
=# SELECT * FROM start_logical_replication('regression_slot', 'now', 'hide-xids', '0');
 location  | xid |                                      data
-----------+-----+--------------------------------------------------------------------------------
 0/17D59B8 | 695 | BEGIN
 0/17D59B8 | 695 | COMMIT
 0/17E8B58 | 696 | BEGIN
 0/17E8B58 | 696 | table "foo": INSERT: id[int4]:1 data[text]:1
 0/17E8B58 | 696 | COMMIT
 0/17E8CA8 | 697 | BEGIN
 0/17E8CA8 | 697 | table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[text]::1
 0/17E8CA8 | 697 | COMMIT
 0/17E8E50 | 698 | BEGIN
 0/17E8E50 | 698 | table "foo": DELETE: id[int4]:-1
 0/17E8E50 | 698 | COMMIT
 0/17E9058 | 699 | BEGIN
 0/17E9058 | 699 | COMMIT
(13 rows)
=# SELECT * FROM pg_stat_logical_decoding ;
    slot_name    |    plugin     | database | active | xmin | restart_decoding_lsn
-----------------+---------------+----------+--------+------+----------------------
 regression_slot | test_decoding |    12042 | f      |  695 | 0/17D58D0
(1 row)
=# SELECT * FROM stop_logical_replication('regression_slot');
 stop_logical_replication
--------------------------
                        0

The walsender interface has the same calls:
INIT_LOGICAL_REPLICATION 'slot' 'plugin';
START_LOGICAL_REPLICATION 'slot' restart_lsn [(option value)*];
STOP_LOGICAL_REPLICATION 'slot';

The only difference is that START_LOGICAL_REPLICATION can stream changes and it can support synchronous replication.

The output seen in the 'data' column is produced by a so called 'output plugin' which users of the facility can write to suit their needs. They can be written by implementing 5 functions in the shared object that's passed to init_logical_replication() above:
* pg_decode_init (optional)
* pg_decode_begin_txn
* pg_decode_change
* pg_decode_commit_txn
* pg_decode_cleanup (optional)

The most interesting function pg_decode_change gets passed a structure containing old/new versions of the row, the 'struct Relation' belonging to it and metainformation about the transaction.

The output plugin can rely on syscache lookups et al. to decode the changed tuple in whatever fashion it wants.

I'd like to invite reviewers to first look at:
* the output plugin interface
* the walsender/SRF interface
* patch 12 which contains most of the code

When reading the code, the information flow during decoding might be interesting:
---------------
               +---------------+
               | XLogReader    |
               +---------------+
                       |
                  XLOG Records
                       |
                       v
               +---------------+
               | decode.c      |
               +---------------+
                  |         |
                  |         |
                  v         |
+---------------+           |
| snapbuild.c   |     HeapTupleData
+---------------+           |
        |                   |
 catalog snapshots          |
        |                   |
        v                   v
       +---------------+
       |reorderbuffer.c|
       +---------------+
               |
      HeapTuple & Metadata
               |
               v
       +---------------+
       | Output Plugin |
       +---------------+
               |
       Whatever you want
               |
               v
       +---------------+
       | Output Handler|
       |               |
       |WalSnd or SRF  |
       +---------------+
---------------

Overview of the attached patches:
0001: indirect toast tuples; required but submitted independently
0002: functions for testing; not required,
0003: (tablespace, filenode) syscache; required
0004: RelationMapFilenodeToOid: required, simple
0005: pg_relation_by_filenode() function; not required but useful
0006: Introduce InvalidCommandId: required, simple
0007: Adjust Satisfies* interface: required, mechanical,
0008: Allow walsender to attach to a database: required, needs review
0009: New GetOldestXmin() parameter; required, pretty boring
0010: Log xl_running_xact regularly in the bgwriter: required
0011: make fsync_fname() public; required, needs to be in a different file
0012: Relcache support for an Relation's primary key: required
0013: Actual changeset extraction; required
0014: Output plugin demo; not required (except for testing) but useful
0015: Add pg_receivellog program: not required but useful
0016: Add test_logical_decoding extension; not required, but contains the tests for the feature. Uses 0014
0017: Snapshot building docs; not required

Greetings,
Andres Freund

Reposted from: http://mscha.baihongyu.com/