Предварительная настройка

Создадим базу данных.

α=> CREATE DATABASE replica_logical;
CREATE DATABASE
α=> \c replica_logical
You are now connected to database "replica_logical" as user "student".

Сначала создадим второй сервер как копию первого. Для этого выполним резервное копирование в каталог PGDATA второго сервера.

postgres$ rm -rf /var/lib/postgresql/10/beta/*
postgres$ pg_basebackup -U student --pgdata=/var/lib/postgresql/10/beta

Меняем порт и запускаем сервер:

postgres$ echo 'port = 5433' >> /var/lib/postgresql/10/beta/postgresql.auto.conf
student$ sudo pg_ctlcluster 10 beta start

Теперь на первом сервере создадим таблицу и заполним ее данными.

α=> CREATE TABLE test(id serial PRIMARY KEY, descr text);
CREATE TABLE
α=> INSERT INTO test(descr) VALUES ('Раз'), ('Два'), ('Три');
INSERT 0 3

Логическая репликация

Мы хотим настроить между серверами логическую репликацию таблицы test. Для этого нам понадобится изменить уровень журнала.

student$ psql -U postgres -c "ALTER SYSTEM SET wal_level = logical"
ALTER SYSTEM
student$ sudo pg_ctlcluster 10 alpha restart

На втором сервере таблицы test нет. Поскольку команды DDL не реплицируются, таблицу необходимо создать вручную. При этом таблица подписчика может содержать и дополнительные столбцы, если это необъодимо.

student$ psql -p 5433 -d replica_logical
β=> CREATE TABLE test(id serial PRIMARY KEY, descr text, additional text);
CREATE TABLE

На первом сервере создаем публикацию для таблицы test. Публикация относится к конкретной базе данных; в нее можно включить и несколько таблиц, а можно даже все таблицы сразу (FOR ALL TABLES).

student$ psql -d replica_logical
α=> CREATE PUBLICATION test_pub FOR TABLE test;
CREATE PUBLICATION
α=> \dRp+
                Publication test_pub
  Owner  | All tables | Inserts | Updates | Deletes 
---------+------------+---------+---------+---------
 student | f          | t       | t       | t
Tables:
    "public.test"


На втором сервере подписываемся на публикацию. При этом на публикующем сервере будет создан слот логической репликации.

Подписку может создать только суперпользователь. А роль для подключения к публикующему серверу долна иметь атрибуты REPLICAION и LOGIN, и должна иметь право чтения публикуемых таблиц - роль student подходит под эти требования.

β=> \c - postgres
You are now connected to database "replica_logical" as user "postgres".
β=> CREATE SUBSCRIPTION test_sub
CONNECTION 'port=5432 user=student dbname=replica_logical'
PUBLICATION test_pub;

NOTICE:  created replication slot "test_sub" on publisher
CREATE SUBSCRIPTION

β=> \c - student
You are now connected to database "replica_logical" as user "student".
β=> \dRs
            List of subscriptions
   Name   |  Owner   | Enabled | Publication 
----------+----------+---------+-------------
 test_sub | postgres | t       | {test_pub}
(1 row)


По умолчанию данные сначала синхронизируются между серверами, и только после этого запускается процесс репликации. Это выполняется "бесшовно" с гарантией того, что никакие изменения не будут потеряны.

β=> SELECT * FROM test;
 id | descr | additional 
----+-------+------------
  1 | Раз   | 
  2 | Два   | 
  3 | Три   | 
(3 rows)


Проверим, как работает репликация изменений.

α=> INSERT INTO test(descr) VALUES ('Четыре');
INSERT 0 1

β=> SELECT * FROM test;
 id | descr  | additional 
----+--------+------------
  1 | Раз    | 
  2 | Два    | 
  3 | Три    | 
  4 | Четыре | 
(4 rows)


Состояние подписки можно посмотреть в представлении:

β=> SELECT * FROM pg_stat_subscription \gx
-[ RECORD 1 ]---------+------------------------------
subid                 | 41010
subname               | test_sub
pid                   | 4004
relid                 | 
received_lsn          | 0/11030C6C
last_msg_send_time    | 2018-06-13 19:59:55.353334+03
last_msg_receipt_time | 2018-06-13 19:59:55.353694+03
latest_end_lsn        | 0/11030C6C
latest_end_time       | 2018-06-13 19:59:55.353334+03


К процессам сервера добавился logical replication worker (его номер указан в pg_stat_subscription.pid):

postgres$ ps -o pid,command --ppid `head -n 1 /var/lib/postgresql/10/beta/postmaster.pid`
  PID COMMAND
 3726 postgres: 10/beta: checkpointer process   
 3727 postgres: 10/beta: writer process   
 3728 postgres: 10/beta: wal writer process   
 3729 postgres: 10/beta: autovacuum launcher process   
 3730 postgres: 10/beta: stats collector process   
 3731 postgres: 10/beta: bgworker: logical replication launcher   
 4004 postgres: 10/beta: bgworker: logical replication worker for subscription 41010   
 4023 postgres: 10/beta: student replica_logical [local] idle

Заметим, что последовательности не реплицируются. На втором сервере создалась своя собственная последовательность:

β=> INSERT INTO test(descr) VALUES ('Пять - локально');
ERROR:  duplicate key value violates unique constraint "test_pkey"
DETAIL:  Key (id)=(1) already exists.

А вот так получится:

β=> INSERT INTO test VALUES (5, 'Пять - локально');
INSERT 0 1

Конфликты

Что произойдет, если значение с таким же ключом (5) появится на публикующем сервере?

α=> INSERT INTO test(descr) VALUES ('Пять');
INSERT 0 1
α=> INSERT INTO test(descr) VALUES ('Шесть');
INSERT 0 1

При репликации возникнет конфликт, и она будет приостановлена.

β=> SELECT * FROM test;
 id |      descr      | additional 
----+-----------------+------------
  1 | Раз             | 
  2 | Два             | 
  3 | Три             | 
  4 | Четыре          | 
  5 | Пять - локально | 
(5 rows)


Фактически, процесс logical replication worker будет периодически перезапускаться, проверяя, не устранен ли конфликт. Поэтому информация в pg_stat_subscription пропадает:

β=> SELECT * FROM pg_stat_subscription \gx
-[ RECORD 1 ]---------+---------
subid                 | 41010
subname               | test_sub
pid                   | 
relid                 | 
received_lsn          | 
last_msg_send_time    | 
last_msg_receipt_time | 
latest_end_lsn        | 
latest_end_time       | 


В журнал сообщений будут попадать записи о нарушении ограничений целостности:

postgres$ tail -n 3 /var/log/postgresql/postgresql-10-beta.log
2018-06-13 19:59:56.666 MSK [4004] ERROR:  duplicate key value violates unique constraint "test_pkey"
2018-06-13 19:59:56.666 MSK [4004] DETAIL:  Key (id)=(5) already exists.
2018-06-13 19:59:56.668 MSK [3724] LOG:  worker process: logical replication worker for subscription 41010 (PID 4004) exited with exit code 1

Чтобы разрешить этот конфликт, надо удалить конфликтующую строку из таблицы:

β=> DELETE FROM test WHERE id = 5;
DELETE 1

...и немного подождем.


Проверим:

β=> SELECT * FROM test;
 id | descr  | additional 
----+--------+------------
  1 | Раз    | 
  2 | Два    | 
  3 | Три    | 
  4 | Четыре | 
  5 | Пять   | 
  6 | Шесть  | 
(6 rows)

Данные появились, репликация восстановлена.


Триггеры на подписчике

На подписчике могут выполняться триггеры, но если просто создать триггер, то он не отработает. Это удобно, если на обоих серверах созданы одинаковые таблицы с одинаковым набором триггеров: в таком случае триггер уже отработал на публикующем сервере, его не надо выполнять на подписчике.

Попробуем.

β=> CREATE FUNCTION change_descr() RETURNS trigger AS $$
BEGIN
  NEW.additional := 'получено из публикации';
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION

β=> CREATE TRIGGER test_before_row
BEFORE INSERT OR UPDATE ON test
FOR EACH ROW
EXECUTE PROCEDURE change_descr();
CREATE TRIGGER

α=> INSERT INTO test(descr) VALUES ('Семь');
INSERT 0 1
β=> SELECT * FROM test;
 id | descr  | additional 
----+--------+------------
  1 | Раз    | 
  2 | Два    | 
  3 | Три    | 
  4 | Четыре | 
  5 | Пять   | 
  6 | Шесть  | 
  7 | Семь   | 
(7 rows)


Чтобы триггер работал, надо изменить таблицу:

β=> ALTER TABLE test ENABLE REPLICA TRIGGER test_before_row;
ALTER TABLE

α=> INSERT INTO test(descr) VALUES ('Восемь');
INSERT 0 1
β=> SELECT * FROM test;
 id | descr  |       additional       
----+--------+------------------------
  1 | Раз    | 
  2 | Два    | 
  3 | Три    | 
  4 | Четыре | 
  5 | Пять   | 
  6 | Шесть  | 
  7 | Семь   | 
  8 | Восемь | получено из публикации
(8 rows)

Можно сделать и так, чтобы триггеры срабатывали не только для репликации, но и локально (ENABLE ALWAYS TRIGGER). Различить эти ситуации можно с помощью функции pg_replication_origin_session_is_setup, но она требует прав суперпользователя.


Удаление подписки

Если репликация больше не нужна, надо удалить подписку - иначе на публикующем сервере останется открытым репликационный слот.

β=> \c - postgres
You are now connected to database "replica_logical" as user "postgres".
β=> DROP SUBSCRIPTION test_sub;
NOTICE:  dropped replication slot "test_sub" on publisher
DROP SUBSCRIPTION

Конец демонстрации.