
PostgreSQL Blog

Postgres Miktzoanim
  • 13 Jul

    PostgreSQL and RabbitMQ.

    Introduction

    I want to share our experience of using PostgreSQL together with RabbitMQ. We recently needed guaranteed, transactional delivery of messages from the database to a RabbitMQ queue, and it had to be as fast as possible. As you probably know, PostgreSQL is remarkably extensible, and one of its extensions can work with RabbitMQ. It is called pg_amqp.

    Installation

    pg_amqp provides stored procedures in PostgreSQL for publishing to AMQP. The extension fits well at the application logic level: if the PostgreSQL transaction is rolled back, the data never reaches AMQP; if it commits, the data is sent.

    Install the extension:

    git clone https://github.com/omniti-labs/pg_amqp.git
    cd pg_amqp
    make
    make install

    Add this line to the postgresql.conf file (a server restart is needed for it to take effect):

    shared_preload_libraries = 'pg_amqp.so'
    
    

    Then create the extension:

    CREATE EXTENSION amqp;

    This adds the schema “amqp” to the database. The schema contains one table and four functions:

    CREATE TABLE amqp.broker
    (broker_id serial NOT NULL,
     host text NOT NULL,
     port integer NOT NULL DEFAULT 5672,
     vhost text,
     username text,
     password text,
     CONSTRAINT broker_pkey PRIMARY KEY (broker_id, host, port));

    broker_id – serial number

    host – RabbitMQ host

    port – RabbitMQ port

    vhost – virtual host

    username – RabbitMQ user name

    password – RabbitMQ user password

    amqp.publish(broker_id integer, exchange character varying, routing_key character varying,
     message character varying, delivery_mode integer DEFAULT NULL::integer, content_type character varying DEFAULT NULL::character varying, reply_to character varying DEFAULT NULL::character varying,
     correlation_id character varying DEFAULT NULL::character varying)
    
    amqp.autonomous_publish(broker_id integer, exchange character varying, routing_key character varying,
     message character varying, delivery_mode integer DEFAULT NULL::integer, content_type character varying DEFAULT NULL::character varying, reply_to character varying DEFAULT NULL::character varying,
     correlation_id character varying DEFAULT NULL::character varying)
    
    amqp.exchange_declare(broker_id integer, exchange character varying, exchange_type character varying,
     passive boolean, durable boolean, auto_delete boolean DEFAULT false)
    
    amqp.disconnect(broker_id integer)

    Add a broker:

    INSERT INTO amqp.broker(host, port, vhost, username, password)
     VALUES ('10.0.0.100', 5672, '/', 'user', 'pass');
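
    Before publishing, it can help to check which broker_id the serial column assigned and to make sure the exchange exists. The sketch below uses only the amqp.exchange_declare signature listed above; the exchange type 'fanout' and broker_id 1 are assumptions chosen to match the examples in this post, which publish with an empty routing key.

```sql
-- Look up the id that the serial column assigned to the broker.
SELECT broker_id, host, port, vhost FROM amqp.broker;

-- Declare the exchange used in the examples (assumed type: fanout).
-- Arguments: broker_id, exchange, exchange_type, passive, durable, auto_delete.
SELECT amqp.exchange_declare(1, 'pushDBEx', 'fanout', false, true, false);
```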

    We will use pg_amqp for push notifications. For example:

    BEGIN;
    UPDATE products SET name = 'milk', quantity = 10, price = 10
      RETURNING amqp.publish(1, 'pushDBEx', '', CONCAT(name, ' now costs ', price::TEXT, ' dollars'));
    SELECT amqp.disconnect(1);
    COMMIT;
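
    If every price change should produce a notification, the publish call can live in a trigger instead of in each statement. This is a minimal sketch, not something pg_amqp ships: the function name notify_price_change and the trigger name products_price_change are hypothetical, and the products table with its name and price columns is assumed from the example above.

```sql
-- Hypothetical trigger function: queues a message for each changed row.
-- amqp.publish only delivers it if the surrounding transaction commits.
CREATE OR REPLACE FUNCTION notify_price_change() RETURNS trigger AS $$
BEGIN
    PERFORM amqp.publish(1, 'pushDBEx', '',
        CONCAT(NEW.name, ' now costs ', NEW.price::text, ' dollars'));
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Fire only when the price actually changes.
CREATE TRIGGER products_price_change
AFTER UPDATE OF price ON products
FOR EACH ROW
WHEN (OLD.price IS DISTINCT FROM NEW.price)
EXECUTE PROCEDURE notify_price_change();
```

    With the trigger in place, a plain UPDATE products SET price = 12 WHERE name = 'milk' is enough; the application does not have to call amqp.publish itself.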

    pg_amqp can also be used for autonomous transactions, through amqp.autonomous_publish. For example:

    BEGIN;
    INSERT INTO products (name, quantity, price) VALUES ('milk', 10, 10)
      RETURNING amqp.autonomous_publish(1, 'pushDBEx', '', CONCAT(name, ' now costs ', price::TEXT, ' dollars'));
    SELECT amqp.disconnect(1);
    ROLLBACK;

    Despite the rollback, the message is still sent to the queue.

    In fact, the extension does not guarantee that the message reaches AMQP. Internally it only performs two commits in sequence: first in PostgreSQL, then in AMQP. If the connection to AMQP is lost between the two commits, the message is lost. The probability of such an event is small, but some messages will be lost.

    For those who cannot afford to lose even 0.01% of messages, we advise additionally saving the messages in a PostgreSQL table and sending them again to another queue after a slight delay. By comparing the data from several queues, you can reduce the probability of message loss.
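
    The approach described above could be sketched roughly as follows. Everything here except amqp.publish is an assumption made for illustration: the table amqp_outbox, its columns, the second exchange 'pushDBExRetry' and the 30-second delay are all hypothetical. The resend goes through the same two-commit sequence, so this lowers the chance of loss rather than removing it.

```sql
-- Hypothetical outbox table: one row per message handed to amqp.publish.
CREATE TABLE amqp_outbox (
    id          bigserial PRIMARY KEY,
    exchange    text NOT NULL,
    routing_key text NOT NULL DEFAULT '',
    message     text NOT NULL,
    created_at  timestamptz NOT NULL DEFAULT now(),
    resent_at   timestamptz
);

-- In the business transaction: store the message and publish it together.
BEGIN;
WITH m AS (
    INSERT INTO amqp_outbox (exchange, message)
    VALUES ('pushDBEx', 'milk now costs 10 dollars')
    RETURNING exchange, routing_key, message
)
SELECT amqp.publish(1, exchange, routing_key, message) FROM m;
COMMIT;

-- Run periodically: resend messages older than 30 seconds to a second exchange.
BEGIN;
WITH due AS (
    UPDATE amqp_outbox
       SET resent_at = now()
     WHERE resent_at IS NULL
       AND created_at < now() - interval '30 seconds'
    RETURNING id, routing_key, message
)
SELECT id, amqp.publish(1, 'pushDBExRetry', routing_key, message) FROM due;
COMMIT;
```

    A consumer that reads both queues then has to tolerate duplicates, for example by including the outbox id in the message body and ignoring ids it has already processed.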

    Conclusion

    PostgreSQL has an interesting extension, pg_amqp, for working with RabbitMQ queues. It can be useful when the results of database commands must be published together with the transaction that produced them, as well as for autonomous transactions (such as auditing) in PostgreSQL.
