A Guide to Building an Active-Active PostgreSQL Cluster

Dave Cramer

14 min read

One of the toughest challenges facing database engineers today is ensuring their data is always accessible so they can meet the high-availability requirements for their applications.

While this problem may seem trivial in the world of applications where one can have many instances behind geographically distributed load balancers, in the database world where there is only one version of the data globally, the problem faces many obstacles.

PostgreSQL replication has advanced considerably in recent major releases, including continuous improvements to streaming replication and the addition of logical replication in PostgreSQL 10. While these capabilities represent important enhancements to PostgreSQL, enabling users to address a wide variety of clustered architectures, they do not (easily) address the use cases where the application requires access to an updatable database in two or more geographic regions - often referred to as an "active-active" cluster.

More specifically, an active-active cluster is one where the application can write to any instance in the cluster and the data will be written to all of the instances in the cluster, enabling each instance in the cluster to be used to:

  • Provide near zero downtime as the new instance is already in a read/write state; there is no need to reconfigure it.

  • Provide near zero downtime upgrades from one version to another

  • Improve latency for users in geographically distributed clusters. By providing an instance physically closer to the user latency is reduced.

While there are a number of proprietary solutions that attempt to address active-active PostgreSQL requirements, this post and a series to follow provides users with potential reference architectures and configurations that enable active-active PostgreSQL configurations using entirely open source software.

This post is of course only one approach to deploying an active-active PostgreSQL cluster.  There are other ways to deploy an active-active setup with PostgreSQL.  I will cover some ways to do this in the future - stay tuned!

SymmetricDS

One open source solution that enables active-active database configurations is SymmetricDS.

SymmetricDS is an open source solution that provides data replication and synchronization for a variety of databases, including PostgreSQL.

Some of the more interesting features of SymmetricDS are:

  • A web-based transport layer. This is of particular interest for many cloud offerings that do not allow arbitrary ports to be opened.
  • Cross platform. The system is built using JDBC and Java so any host/database which supports Java can be synchronized
  • Cross database platform. Since it is using JDBC changes from different database vendors can be synchronized. This is particularly interesting for doing near zero downtime migrations to PostgreSQL from other database systems.

SymmetricDS is free software licensed under the GNU General Public License (GPL) version 3.0.

This guide will look into getting you started with SymmetricDS with two PostgreSQL databases in an active-active configuration.

Requirements

As mentioned above SymmetricDS is implemented in Java and will require a Java runtime. For the demonstrated environment, I used openjdk-8-jdk on Ubuntu 16.04 (Xenial).

Of course, we will require SymmetricDS - it can be downloaded from here: https://www.symmetricds.org/download

The resulting file at the time of writing was symmetric-server-3.9.15.zip, unzipping in my home directory resulted in

$ ls symmetric-server-3.9.15
bin change-log.txt conf databases engines lib logs patches samples security sql tmp web

Last but not least, you will need to have PostgreSQL installed. SymmetricDS supports some fairly old versions of PostgreSQL. I tested this setup on both PostgreSQL 11 and PostgreSQL 10.

Getting Started

To get started, first create a database called sales on your PostgreSQL instance:

createdb -U postgres -O rep sales

To be overly specific, this command creates a database named sales that is owned by the user rep.

Now we need some tables to synchronize. In the sales database, let's create two tables that will help track sales:

CREATE TABLE item (
    id serial PRIMARY KEY,
    description text,
    price numeric (8,2)
);

and

CREATE TABLE sale (
    id serial PRIMARY KEY,
    item_id int REFERENCES item(id),
    price numeric(8,2)
);

SymmetricDS has the concept of engines. Each engine drives synchronization for a database. Think of the engine as the "identifier" for the database you want to synchronize with SymmetricDS.

In order to configure an engine we need to create a properties file in the engines subdirectory. At a minimum it requires the following configuration:

sync.url=http\://192.168.1.24\:31415/sync/sales
group.id=primary
db.init.sql=
registration.url=
db.driver=org.postgresql.Driver
db.user=rep
db.password=foo
db.url=jdbc\:postgresql\://192.168.1.24/sales?protocolVersion\=3&stringtype\=unspecified&socketTimeout\=300&tcpKeepAlive\=true
engine.name=sales
external.id=1
db.validation.query=select 1
cluster.lock.enabled=false

There are settings to connect to the database (specifically in this case, the JDBC driver): the user, password and the database that you wish to replicate

Additionally we have the name of the engine sales, the groupID and the externalID. We will explain the uses of the latter two options below.

At this point we can start the system using the sym_service command. You can execute this command from the directory you installed the SymmetricDS binaries into:

bin/sym_service start

If you correctly SymmetricDS, you should see output in your console similar to this:

Waiting for server to start

......

Started

Make sure you check the logs to see if there are any issues

_____ __ _ ____ _____
/ ___/ __ _____ __ ___ __ ___ _/ /_ ____(_)___ / __ | / ___/
__ \ / / / / _ \`_ \/ _ \`_ \/ _ \/_ __// __/ / __/ / / / / __ \
___/ // /_/ / // // / // // / __// / / / / / /_ / /_/ / ___/ /
/____/ __ /_//_//_/_//_//_/___/ _/ /_/ /_/__/ /_____/ /____/
/____/

+-----------------------------------------------------------------+

| Copyright (C) 2007-2018 JumpMind, Inc. |
|
| Licensed under the GNU General Public License version 3. |
| This software comes with ABSOLUTELY NO WARRANTY. |
| See http://www.gnu.org/licenses/gpl.html |
+-----------------------------------------------------------------+

2018-12-05 10:34:49,937 INFO [startup] [SymmetricWebServer] [main] About to start SymmetricDS web server on host:port 0.0.0.0:31415
2018-12-05 10:34:50,373 INFO [sales] [AbstractSymmetricEngine] [symmetric-engine-startup-1] Initializing connection to database
2018-12-05 10:34:50,658 INFO [sales] [JdbcDatabasePlatformFactory] [symmetric-engine-startup-1] Detected database 'PostgreSQL', version '11', protocol 'postgresql'
2018-12-05 10:34:50,672 INFO [sales] [JdbcDatabasePlatformFactory] [symmetric-engine-startup-1] The IDatabasePlatform being used is org.jumpmind.db.platform.postgresql.PostgreSqlDatabasePlatform
2018-12-05 10:34:50,798 INFO [sales] [PostgreSqlSymmetricDialect] [symmetric-engine-startup-1] The DbDialect being used is org.jumpmind.symmetric.db.postgresql.PostgreSqlSymmetricDialect
2018-12-05 10:34:50,856 INFO [sales] [ExtensionService] [symmetric-engine-startup-1] Found 0 extension points from the database that will be registered
2018-12-05 10:34:50,861 INFO [sales] [StagingManager] [symmetric-engine-startup-1] The staging directory was initialized at the following location: tmp/sales
2018-12-05 10:34:51,730 INFO [sales] [ExtensionService] [symmetric-engine-startup-1] Found 0 extension points from the database that will be registered
2018-12-05 10:34:51,731 INFO [sales] [ClientExtensionService] [symmetric-engine-startup-1] Found 7 extension points from spring that will be registered
2018-12-05 10:34:51,740 INFO [sales] [AbstractSymmetricEngine] [symmetric-engine-startup-1] Initializing SymmetricDS database
2018-12-05 10:34:51,740 INFO [sales] [PostgreSqlSymmetricDialect] [symmetric-engine-startup-1] Checking if SymmetricDS tables need created or altered

When SymmetricDS starts up, it connects to the database specified in the properties file and the first time will create the tables it requires for synchronization.

For reference, the relevant tables are:

  • sym_node : Identifies the node and configures things like node id, node group, external id, sync url
  • sym_node_identity : unique identity for this node
  • sym_trigger : specify which tables are being replicated and which router to use
  • sym_router : create a “router” to route tables to synchronize

We will need to add some data to these tables in order to have our instances synchronize correctly with each other. First, we create a node group link:

INSERT INTO sym_node_group_link (source_node_group_id,target_node_group_id,data_event_action)
VALUES ('primary','primary','P');

Then, we create a route:

INSERT INTO sym_router (router_id,source_node_group_id,target_node_group_id,router_type,router_expression,sync_on_update,sync_on_insert,sync_on_delete,use_source_catalog_schema,create_time,last_update_by,last_update_time)
VALUES ('primary_2_primary', 'primary', 'primary', 'default', NULL, 1, 1, 1, 0, CURRENT_TIMESTAMP, 'console', CURRENT_TIMESTAMP);

Finally we add some of the parameters required to manage the synchronization:

INSERT INTO sym_parameter (external_id, node_group_id, param_key, param_value, create_time, last_update_by, last_update_time)
VALUES ('ALL', 'ALL', 'push.thread.per.server.count', '10', CURRENT_TIMESTAMP, 'console', CURRENT_TIMESTAMP);

INSERT INTO sym_parameter (external_id, node_group_id, param_key, param_value)
VALUES ('ALL', 'ALL', 'job.pull.period.time.ms', 2000);

INSERT INTO sym_parameter (external_id, node_group_id, param_key, param_value)
VALUES ('ALL', 'ALL', 'job.push.period.time.ms', 2000);

At this point the router is setup. We then need to add some triggers so SymmetricDS understands what it should do when the targets tables receive modifications:

INSERT INTO sym_trigger (trigger_id, source_schema_name, source_table_name, channel_id, sync_on_update, sync_on_insert, sync_on_delete, sync_on_update_condition, sync_on_insert_condition, sync_on_delete_condition, last_update_time, create_time)
VALUES ('public.item', 'public', 'item', 'default', 1, 1, 1, '1=1', '1=1', '1=1', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);

INSERT INTO sym_trigger (trigger_id, source_schema_name, source_table_name, channel_id, sync_on_update, sync_on_insert, sync_on_delete, sync_on_update_condition, sync_on_insert_condition, sync_on_delete_condition, last_update_time, create_time)
VALUES ('public.sale', 'public', 'sale', 'default', 1, 1, 1, '1=1', '1=1', '1=1', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);

INSERT INTO sym_trigger_router (trigger_id, router_id, enabled, initial_load_order, create_time, last_update_time)
VALUES ('public.item', 'primary_2_primary', 1, 10, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);

INSERT INTO sym_trigger_router (trigger_id, router_id, enabled, initial_load_order, create_time, last_update_time)
VALUES ('public.sale', 'primary_2_primary', 1, 10, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);

We can see there are now triggers on the item, and sale table

\d item

Table "public.item"
Column       | Type         | Collation | Nullable | Default
-------------+--------------+-----------+----------+----------------------------------
id           | integer      |           | not null | nextval('item_id_seq'::regclass)
description  | text         |           |          |
price        | numeric(8,2) |           |          |

Indexes:
    "item_pkey" PRIMARY KEY, btree (id)
Referenced by:
    TABLE "sale" CONSTRAINT "sale_item_id_fkey" FOREIGN KEY (item_id) REFERENCES item(id)
Triggers:
    sym_on_d_for_pblctm_prmry AFTER DELETE ON item FOR EACH ROW EXECUTE PROCEDURE fsym_on_d_for_pblctm_prmry()
    sym_on_i_for_pblctm_prmry AFTER INSERT ON item FOR EACH ROW EXECUTE PROCEDURE fsym_on_i_for_pblctm_prmry()
    sym_on_u_for_pblctm_prmry AFTER UPDATE ON item FOR EACH ROW EXECUTE PROCEDURE fsym_on_u_for_pblctm_prmry()

\d sale

Table "public.sale"
Column   | Type         | Collation | Nullable | Default
---------+--------------+-----------+----------+----------------------------------
id       | integer      |           | not null | nextval('sale_id_seq'::regclass)
item_id  | integer      |           |          |
price    | numeric(8,2) |           |          |

Indexes:
    "sale_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
    "sale_item_id_fkey" FOREIGN KEY (item_id) REFERENCES item(id)
Triggers:
    sym_on_d_for_pblcsl_prmry AFTER DELETE ON sale FOR EACH ROW EXECUTE PROCEDURE fsym_on_d_for_pblcsl_prmry()
    sym_on_i_for_pblcsl_prmry AFTER INSERT ON sale FOR EACH ROW EXECUTE PROCEDURE fsym_on_i_for_pblcsl_prmry()
    sym_on_u_for_pblcsl_prmry AFTER UPDATE ON sale FOR EACH ROW EXECUTE PROCEDURE fsym_on_u_for_pblcsl_prmry()

Great! We now have our first active database set up.

Setting Up the Second Active Database

Let's create a second active database so we can create our active-active environment. Let's create a database called sales2:

createdb -U postgres -O rep sales2

We will then create a sales2.properties file with the following configuration:

db.connection.properties=
db.password=foo
sync.url=http\://192.168.1.27\:31415/sync/sales2
group.id=primary
db.init.sql=
db.driver=org.postgresql.Driver
db.user=rep
engine.name=sales2
external.id=sales2
db.validation.query=select 1
cluster.lock.enabled=false
registration.url=http\://192.168.1.24\:31415/sync/sales2
db.url=jdbc\:postgresql\://192.168.1.27/sales2?protocolVersion\=3&stringtype\=unspecified&socketTimeout\=300&tcpKeepAlive\=true

Just like before, you can start SymmetricDS for the second instance by running the following command:

bin/sym_service start

When inspecting the logs this time, you should find a line that looks something like:

Using registration URL of http://192.168.1.24:31415/sync/sales2/registration?nodeGroupId=primary&externalId=sales2&syncURL=http%3A%2F%2F192.168.1.27%3A31415%2Fsync%2Fsales2&schemaVersion=%3F&databaseType=PostgreSQL&databaseVersion=10.6&symmetricVersion=3.9.15&deploymentType=server&hostName=ubuntu2&ipAddress=192.168.1.27

This URL provides an identifier that allows the two PostgreSQL instances to become aware of each other via SymmetricDS. On the first host (sales) you will execute the following command:

bin/symadmin -e sales2 open-registration primary sales2

This command instructs the node to open registration for a node with an external id of sales2 to register into the primary group. When we check the logs, we can see that the triggers have been synchronized from the first server to the second and our node has been registered.

2018-12-06 09:36:29,856 INFO [sales2] [TriggerRouterService] [sales2-job-4] Done synchronizing triggers
2018-12-06 09:36:29,889 INFO [sales2] [RegistrationService] [sales2-job-4] Successfully registered node [id=sales2]

Further confirmation can be seen in the sym_node table on the first server.

When you execute the following query on the sales2 instance:

SELECT node_id, node_group_id, external_id FROM sym_node;

You should see the following:

node_id  | node_group_id | external_id

---------+---------------+-------------
1        | primary       | 1
sales    | primary       | sales

At this point, all of the sym_* tables are synced, but the item and sale table need to be moved over

On the first node, we can execute:

bin/symadmin -e sales2 send-schema -n sales2

After that executes, go to the sales2 node and you can see the schema has been imported. For example, here is the item table:

Table "public.item"
Column       | Type         | Collation | Nullable | Default
-------------+--------------+-----------+----------+----------------------------------
id           | integer      |           | not null | nextval('item_id_seq'::regclass)
description  | text         |           |          |
price        | numeric(8,2) |           |          |

Indexes:
    "item_pkey" PRIMARY KEY, btree (id)
Referenced by:
    TABLE "sale" CONSTRAINT "sale_item_id_fkey" FOREIGN KEY (item_id) REFERENCES item(id)
Triggers:
    sym_on_d_for_pblctm_prmry AFTER DELETE ON item FOR EACH ROW EXECUTE PROCEDURE fsym_on_d_for_pblctm_prmry()
    sym_on_i_for_pblctm_prmry AFTER INSERT ON item FOR EACH ROW EXECUTE PROCEDURE fsym_on_i_for_pblctm_prmry()
    sym_on_u_for_pblctm_prmry AFTER UPDATE ON item FOR EACH ROW EXECUTE PROCEDURE fsym_on_u_for_pblctm_prmry()

Now we are ready to see if we can replicate data from the sales node to the sales2 node.

Testing Active-Active Replication

Let's see if we can write data on the sales node and have it appear in the sales2 instance. On the sales node, execute the following query:

INSERT INTO item (description, price) VALUES ('screw', .05);

If everything is setup and in working order, on the sales2 node, you should see the following when you execute the query below:

TABLE item;

id  | description | price
----+-------------+-------
1   | screw       | 0.05

(If you are newer to PostgreSQL, TABLE is another way to say SELECT * FROM)

So the big question, can we INSERT data into sales2 and have it appear in sales?

On sales2, execute the following command:

INSERT INTO item (description, price) VALUES ('hammer', 10);

First, let's inspect what data is presents in sales2:

TABLE item;

id  | description | price
----+-------------+-------
1   | screw       | 0.05
2   | hammer      | 10.00

Now, execute the following on sales:

TABLE item;

and you should see:

id  | description | price
----+-------------+-------
1   | screw       | 0.05
2   | hammer      | 10.00

And there you have it folks we now have simple active-active replication!

Managing Conflicting Writes

One of the challenges of running an active-active database systems is dealing with conflicting writes, e.g. when two nodes try to update the same row at the same time with different values. Fortunately, SymmetricDS has multiple strategies for conflict resolution.

There are a number of detection mechanisms:

  • USE_PK_DATA, this is only used for inserts and if the primary key exists in the insert the conflict will be detected.
  • USE_CHANGED_DATA

as well as manual mechanism:

  • MANUAL: puts the error information in the sym_incoming_error table

Let's test what happens If we set the detection mechanism to USE_PK_DATA, and resolution to MANUAL and then attempt to insert on one instance. On sales, running the following query:

INSERT INTO item(id, description, price) VALUES (5, 'nut', 4);

And on sales2, run the following query:

INSERT INTO item(id, description, price) VALUES (5, 'bolt', 5);

In both sales and sales2, we see a row get inserted into the sym_incoming_error table. On the sales instance we see the data that was inserted into sales2. On sales, run the following query:

TABLE sym_incoming_error;

The output should look like:

 batch_id | node_id | failed_row_number | failed_line_number | target_catalog_name | target_schema_name | target_table_name | event_type | binary_encoding | column_names         | pk_column_names | row_data          | old_data | cur_data | resolve_data | resolve_ignore | conflict_id | create_time | last_update_by | last_update_time

----------+---------+-------------------+--------------------+---------------------+--------------------+-------------------+------------+-----------------+----------------------+-----------------+-------------------+----------+----------+--------------+----------------+-------------+-------------------------+----------------+-------------------------
       27 | sales2  | 1                 | 1                  |                     |                    | item              | I          | BASE64          | id,description,price | id              | "5","bolt","5.00" |          |          |              | 0              | item

And on the second instance we see the first instances data.

 batch_id | node_id | failed_row_number | failed_line_number | target_catalog_name | target_schema_name | target_table_name | event_type | binary_encoding | column_names         | pk_column_names | row_data          | old_data | cur_data | resolve_data | resolve_ignore | conflict_id | create_time | last_update_by | last_update_time

----------+---------+-------------------+--------------------+---------------------+--------------------+-------------------+------------+-----------------+----------------------+-----------------+-------------------+----------+----------+--------------+----------------+-------------+-------------------------+----------------+-------------------------
       67 | 1       | 1                 | 1                  |                     |                    | item              | I          | BASE64          | id,description,price | id              | "5","nut","4.00"  |          |          |              | 0              | item

At this point it is up to the user to resolve the conflict. No more data will be synchronized on this table until the conflict is resolved. This is done by manually updating the row in question and then setting the value in the resolve_ignore to 1.

Current Limitations

One problem that I glossed over here is that sequences are not replicated so it is possible to get:

insert into item (description, price) values ('hammer', 10);
ERROR: duplicate key value violates unique constraint "item_pkey"DETAIL: Key (id)=(4) already exists.

The reason this occurs is that on the other node an entry has been made with an id of 4.

A very simplistic solution to this is to set the sequences so that they do not overlap. For instance, in the sales database, you can do the following, assuming we want to start at 4 on item_id_seq :

ALTER SEQUENCE item_id_seq RESTART 4 INCREMENT 2;

At this point, the next value in the sequence will 6 only return even numbers.

And on the sales2 node execute:

ALTER SEQUENCE item_id_seq RESTART 5 INCREMENT 2;

The next value will be 7 and only give us odd numbers.

You will also notice that in this setup, by default the sym_* tables are in the public schema with all of the rest of the tables. The recommended way to change this is to alter the replication user to use a different schema. You can do so by running the following command on each instance:

ALTER USER {user name} SET search_path to {schema name};

and adjust {user name} and {schema name} to be your replication user and schema where your sym_* tables live, respectively.

NOTE: it is important to set use_source_catalog_schema to 1 in the router to make sure that any data routed goes to the same schema as the source. By default, the above replication user will not see any schema but the sym schema set above.

For more information about active-active PostgreSQL clusters and other ways to build out high-availability environments, please visit: https://www.crunchydata.com/products/crunchy-high-availability-postgresql

Avatar for Dave Cramer

Written by

Dave Cramer

January 29, 2019 More by this author