可以在 PostgreSQL 源码树的
contrib/test_decoding
子目录中找到一个输出插件的例子。
一个输出插件是通过动态载入一个以输出插件名称作为基础名称的共享库来载入的。
将使用普通的库搜索路径来定位该库。为了提供所要求的输出插件回调并且指示该
库确实是一个输出插件,需要提供一个名为
_PG_output_plugin_init
的函数。这个函数会被传入一个
结构,其中被填充了各个动作的回调函数指针。
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
回调函数begin_cb
、change_cb
以及commit_cb
是必需的,而
startup_cb
、filter_by_origin_cb
、truncate_cb
和shutdown_cb
是可选的。如果没有设置truncate_cb
但是要对一个TRUNCATE
进行编码,则该动作将被忽略。
输出插件也可以定义支持大的、在处理事务的流的函数。
stream_start_cb
、stream_stop_cb
、stream_abort_cb
、stream_commit_cb
、stream_change_cb
、 和 stream_prepare_cb
是需要的, stream_message_cb
和 stream_truncate_cb
是可选的。
输出插件还可以定义支持两阶段提交的函数,允许活动在PREPARE TRANSACTION
上被解码。
begin_prepare_cb
, prepare_cb
、stream_prepare_cb
、commit_prepared_cb
和 rollback_prepared_cb
回调是需要的,filter_prepare_cb
是可选的。
要解码、格式化并且输出更改,输出插件可以使用大部分后端的标准功能,包括调用
输出函数。只要访问的关系是initdb
在
pg_catalog
模式中创建的或者被使用
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
注意要访问输出插件中的用户目录表或常规系统目录表,只能通过systable_*
扫描 APIs完成。
通过heap_*
扫描 APIs访问将出错
此外标记为用户提供的系统表,就允许对关系的只读访问。
任何导致事务 ID 分配的动作都被禁止。
其中包括写表、执行 DDL 更改以及调用pg_current_xact_id()
。
输出插件回调可以以近乎任意格式向消费者传递数据。对于某些用例,例如通过 SQL
查看更改,以可能包含任何数据的数据类型(例如bytea
)返回数据
可能会很麻烦。如果输出插件只输出服务器编码的文本数据,它可以在
启动回调中通过把OutputPluginOptions.output_type
设
置为OUTPUT_PLUGIN_TEXTUAL_OUTPUT
替代
OUTPUT_PLUGIN_BINARY_OUTPUT
来声明这一点。在这种情况下,
所有的数据必须是属于服务器的编码,这样一个text
数据就能包含它。在
启用了断言的编译中会检查这一点。
一个输出插件需要提供一些回调,它通过它们得到有关更改发生的通知。
并发事务以提交顺序被解码,并且只有属于特定事务的更改会在 begin
和commit
回调之间被解码。
被显式或隐式回滚的事务不会被解码。
成功的检查点被折叠到包含它们的事务中,并且保持它们在该事务中被执行的顺序。
如果提供了解码它们所需要的输出插件回调,那么使用PREPARE TRANSACTION
为两阶段提交准备的事务也将被解码。
有可能通过ROLLBACK PREPARED
命令并发地中止正在解码的当前准备好的事务。
在这种情况下,该事务的逻辑解码也将被中止。
一旦检测到中止并且调用prepare_cb
回调,就会跳过此事务的所有更改。
因此即使在并发中止的情况下,也会向输出插件提供足够的信息,以便一旦解码后它可以正确应对ROLLBACK PREPARED
。
只有已经被安全地刷入磁盘的事务将会被解码。当
synchronous_commit
被设置为off
时,这会导致一个COMMIT
在随后的
pg_logical_slot_get_changes()
中不会立即被解码。
只要一个复制槽被创建或者被要求流式传送更改,可选的
startup_cb
回调就会被调用,不管有多少更改准备输出。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
当复制槽被创建时,is_init
参数将为真,否则为假。
options
指向一个输出插件可以设置的选项
的结构:
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
必须被设置为
OUTPUT_PLUGIN_TEXTUAL_OUTPUT
或者OUTPUT_PLUGIN_BINARY_OUTPUT
。另见
第 49.6.3 节。如果receive_rewrites
为真,还将为在某些DDL操作期间的堆重写造成的更改调用输出插件。这些是处理DDL复制的插件感兴趣的事情,但是它们要求特殊的处理。
启动回调应该验证出现在
ctx->output_plugin_options
中的选项。如果输出插件
需要有一个状态,它可以使用
ctx->output_plugin_private
来存储之。
只要一个之前活跃的复制槽不再使用,就会调用可选的
shutdown_cb
回调,它可以被用来释放输出插件
私有的资源。该槽并不一定需要被删除,只要其中的流被停止即可。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
只要一个已提交事务的开始动作被解码,就会调用必须提供的
begin_cb
回调。被中止的事务及其内容不会被解码。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
txn
参数包含有关该事务的元信息,例如该
事务被提交的时间戳以及该事务的 XID。
只要一个已提交事务的提交动作被解码,就会调用必须提供的
commit_cb
回调。在此之前,如果有任何被修改
的行,将为所有被修改的行调用change_cb
回调。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
对于一个事务中的每一个行修改,都将调用必须提供的change_cb
回调,这种修改可能是一个INSERT
、UPDATE
或者DELETE
。
即使原始命令一次修改了多行,该回调也会为其中的每一行调用一次。
change_cb
回调可以访问系统或用户目录表,以帮助输出行修改细节的过程。
在解码一个准备好的(但仍未提交)事务或解码一个未提交的事务的情况下,这个更改回调也可能由于同时回滚这一相同事务而出错。
在这种情况下,对这个中止事务的逻辑解码被优雅的停止。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
ctx
和txn
参数与
begin_cb
和commit_cb
回调具有相同的内容,但是额外多出一个关系描述符
relation
指向该行所属的关系以及一个结构
change
描述被传入的行修改。
只有没有被标记为“不做日志”(见
UNLOGGED
)并且非临时(见
TEMPORARY
or TEMP
)的用户定义表中的
更改才能用逻辑解码抽取。
truncate_cb
回调会为一个TRUNCATE
命令被调用。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
参数类似于change_cb
回调。不过,由于通过外键连接起来的表上的TRUNCATE
动作需要一起被执行,这个回调会接收到一个关系的数组而不是单个关系。详情请见对TRUNCATE语句的介绍。
可选的filter_by_origin_cb
回调被用来
决定从origin_id
重放的数据是否是
输出插件感兴趣的数据。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
ctx
参数具有和其他回调相同的内容。
对这个回调只有复制源的信息可用。要标志传进来的节点上发生的
更改是无关的,返回真,这会导致这些更改被过滤掉,否则返回假。
对于被过滤掉的事务和更改将不会调用其他回调。
在实现级联或者多向复制方案时,这个回调可以派上用场。用源头 过滤允许阻止在这样的设置下来回地复制同样的更改。虽然事务和 更改也携带了有关源头的信息,通过这个回调过滤明显更有效些。
只要一个逻辑解码消息被解码出来,可选的message_cb
回调就会被调用。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
txn
参数包含关于该事务的元信息,如被提交的时间戳和 XID。
不过要注意,当消息是非事务性的并且记录该消息的事务中还没有被分配 XID 时,这个参数可以为 NULL。
lsn
是该消息的 WAL 位置。
transactional
说明该消息是否为事务性的。
类似于变更回调,在解码一个准备好的(但仍未提交)事务或解码一个未提交的事务的情况下,此消息回调也可能由于同时回滚这一相同事务而出错。
在这种情况下,这个中止事务的逻辑解码被优雅的停止。
prefix
是一个任意的空终结的前缀,它当前插件被用来标识感兴趣的消息。
最后的message
参数保存着大小为message_size
的消息。
应该格外小心确保输出插件用于标识感兴趣消息的前缀是唯一的。建议使用扩展或者输出插件本身的名称。
可选的filter_prepare_cb
回调被调用,以决定作为当前两阶段提交事务一部分的数据是否考虑在这个准备阶段进行解码,还是以后在COMMIT PREPARED
时作为常规的一阶段事务。
要表示要跳过解码,返回true
;否则是false
。
如果回调没有被定义,则假定false
(也就是说,没有过滤,所有使用两阶段提交的事务也在两个阶段进行解码)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
ctx
参数与其他回调具有相同的内容。
参数xid
和gid
提供了两种不同的方式以标识事务。
后面的COMMIT PREPARED
或ROLLBACK PREPARED
携带这两个标识符,提供了输出插件可用的选项。
每个事务可以多次调用回调来解码,并且在每次它被调用时,必须为给定的xid
和 gid
对提供相同的静态答案。
必需的begin_prepare_cb
回调函数在已解码准备事务的开始时被调用。
gid
字段是txn
参数的一部分,可以在此回调函数中使用,
以检查插件是否已经接收到此PREPARE
,在这种情况下,它可以出错或跳过事务的其余更改。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
所需的prepare_cb
回调被调用,当为两阶段提交准备的事务被解码的时候。
如果有任何修改的行,那么所有修改行的change_cb
回调将在这之前被调用。
gid
字段,是txn
参数的一部分,可以在这个回调中使用。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
所需要的commit_prepared_cb
回调被调用,当事务COMMIT PREPARED
被解码时。
gid
字段,是txn
参数的一部分,可以在这个回调中使用。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
所需要的rollback_prepared_cb
回调被调用,当事务ROLLBACK PREPARED
被解码时。
gid
字段,是txn
参数的一部分,可以在这个回调中使用。
参数prepare_end_lsn
和 prepare_time
可用于检查插件是否已经收到这个PREPARE TRANSACTION
,在这种情况下,它可以应用回滚,否则,它可以跳过回滚操作。
单独的gid
是不够的,因为下游节点可以有一个具有相同标识符的准备事务。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
stream_start_cb
回调被调用,在打开一个正在进行的事务中的流更改的块的时候。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
stream_stop_cb
回调被调用,在关闭正在进行的事务中的流更改的块时。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
stream_abort_cb
被调用,以中止先前的流事务。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
stream_prepare_cb
回调被调用,以准备一个先前的流事务,作为两阶段提交的一部分。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
stream_commit_cb
回调被调用,以提交一个先前的流事务。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
stream_change_cb
回调被调用,在发送流更改块中的更改时(由stream_start_cb
和 stream_stop_cb
调用划分)。
实际的更改不会显示,因为事务可以在稍后的点及时中止,并且我们不会解码已中止的事务的更改。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
stream_message_cb
回调被调用,在流更改块中发送通用消息时(由stream_start_cb
和 stream_stop_cb
调用划分)。
事务性消息的消息内容不会显示,因为事务可以在稍后的点及时中止,而且我们不会解码已中止事务的更改。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
stream_truncate_cb
回调被调用,用于在流更改的块中的 TRUNCATE
命令(由stream_start_cb
和 stream_stop_cb
调用划分)。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
此参数与stream_change_cb
回调相似。
但是,因为对由外键连接的表的TRUNCATE
操作需要一起执行,因此该回调接收一个关系数组,而不仅是单个关系。
详细信息请参见TRUNCATE语句的描述。
在begin_cb
、commit_cb
或者
change_cb
回调中,为了实际产生输出,
输出插件可以把数据写入到ctx->out
中的
StringInfo
输出缓冲区中。在写出到输出缓冲区之前,必须先
调用OutputPluginPrepareWrite(ctx, last_write)
,在完
成写入到缓冲区后,必须调用
OutputPluginWrite(ctx, last_write)
来执行写出。
last_write
指出一次特定的写出是否为该回调的最后
一次写出。
下面的例子展示了如何把数据输出给一个输出插件的消费者:
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);