Skytools database scripting framework & PgQ


In this post we will look at the skytools scripting framework in general and also at writing simple queue consumers. There are quite a lot of database tasks that don't need immediate completion and can just as well run in the background. A simple example would be sending out e-mails for user creation, password reminders etc. Usually these kinds of batch jobs are done with a queue table that a script fetches data from, removing each row after it has been processed. PgQ lets us do this even more conveniently and is a lot more effective performance-wise.

PgQ is the queueing solution that powers londiste replication. If you have londiste installed in your database, you already have PgQ installed too.
So first of all, let's bring out a few key points on why it's better to use PgQ instead of plain queue tables:

  • Performance: cleanup is done by rotating between 3 tables and using TRUNCATE to get rid of old data, so no DELETE queries are needed
  • Scalability: one PgQ queue can have a practically unlimited number of consumers that each keep their own high watermark and share the data
  • Retry queue: if a queue message cannot be processed instantly, it can be moved to the retry queue, which will automatically reinsert the event into the main queue later

We will not look at any of the above topics in this post, but I promise I will do so sometime in the future…


To set things up:

  • If you already have londiste installed you don't have to do anything. If not, locate the txid.sql and pgq.sql files inside the skytools package and load them into the database in that order.
  • Create a configuration file for the ticker and get the ticker daemon running. If you don't know what I'm talking about, look at the ticker setup section in clustering with plproxy part II.
  • Create the queue:

select * from pgq.create_queue('mailer');

This should return '1', indicating that Marko's brain was in set_no_documentation = 1 mode while writing the code :P
More informative is a select on pgq.queue (or, in older versions, the pgq.queue_config table):

queries=# select * from pgq.queue;
-[ RECORD 1 ]------------+------------------------------
queue_id                 | 1
queue_name               | mailer
queue_ntables            | 3
queue_cur_table          | 0
queue_rotation_period    | 02:00:00
queue_switch_step1       | 1057688
queue_switch_step2       | 1057688
queue_switch_time        | 2007-10-19 18:57:55.775194+03
queue_external_ticker    | f
queue_ticker_max_count   | 500
queue_ticker_max_lag     | 00:00:03
queue_ticker_idle_period | 00:01:00
queue_data_pfx           | pgq.event_1
queue_event_seq          | pgq.event_1_id_seq
queue_tick_seq           | pgq.event_1_tick_seq

A peek inside the queue

The queue data itself is kept in 3+1 tables: pgq.event_X is the parent table that the 3 rotated tables inherit from:

queries=# \dt pgq.event_*
pgq.event_1 pgq.event_1_0 pgq.event_1_1 pgq.event_1_2 pgq.event_template

So if you want to look at the data inside the queue, the parent table is enough.
The event itself is simple: it consists of 2 fields filled in by the user, ev_type and ev_data, both of which are text fields.
An example of ev_type field values could be 'I', 'U', 'D' for replication actions (Insert, Update, Delete).
Ev_data contains all the data that you want to send. It can be a single value, but we usually go for urlencoded strings of the form column1=value1&column2=value2, as the tools in the skytools framework support this format.
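To illustrate the format, here is a sketch using the standard library of modern Python (the skytools module itself ships db_urlencode / db_urldecode helpers for the same job):

```python
from urllib.parse import urlencode, parse_qsl

# An event payload as column/value pairs, like a table row.
event = {"username": "kristokaiv", "firstname": "Kristo", "lastname": "Kaiv"}

# Encode into the column1=value1&column2=value2 form stored in ev_data.
ev_data = urlencode(event)

# Decode it back on the consumer side.
decoded = dict(parse_qsl(ev_data))
assert decoded == event
```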

Putting messages to queue

For inserting events we usually use the following trick:
create a dummy table that is used only to define the event's structure

queries=# create schema queue;
queries=# create table queue.welcome_email(username text, language text, firstname text, lastname text);

add a pgq.logutriga trigger to the table. What pgq.logutriga does is urlencode the inserted column/value pairs into the form column1=value1&column2=value2…
and insert the result into the queue given as the parameter. The other parameter is either 'SKIP' or 'OK': 'SKIP' means the row is discarded after the trigger has processed it, 'OK' means the row will actually be inserted into the table. Both are useful, but mostly you don't actually need the data for anything other than debugging.

queries=# CREATE TRIGGER ins_to_queue BEFORE INSERT ON queue.welcome_email FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('mailer', 'welcome_email', 'SKIP');

or if you don’t have the latest PgQ version then:

queries=# CREATE TRIGGER ins_to_queue BEFORE INSERT ON queue.welcome_email FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('mailer','SKIP');

So finally we come to the actual event creation:

queries=# insert into queue.welcome_email (username, language, firstname, lastname) values ('kristokaiv','Kristo','Kaiv','');
queries=# select * from pgq.event_1 where ev_data like '%kristokaiv%';
-[ RECORD 1 ]------------------------------------------------------------------------------------
ev_id     | 2
ev_time   | 2007-10-19 19:56:53.353656+03
ev_txid   | 1057748
ev_owner  |
ev_retry  |
ev_type   | I:
ev_data   | username=kristokaiv&language=Kristo&firstname=Kaiv&lastname=
ev_extra1 | queue.welcome_email
ev_extra2 |
ev_extra3 |
ev_extra4 |

As you can see, for every column in our dummy table we have the inserted value, and everything is nicely encoded. Also, the name of the table the trigger resides on is added to one of the extra columns. This enables us to use multiple dummy queue tables that all insert into one queue.
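The ev_extra1 column makes routing straightforward on the consumer side; a minimal sketch (the second table name and the handlers are hypothetical, made up for illustration):

```python
def route_event(extra1, ev_data, handlers):
    """Pick a handler based on the dummy table name stored in ev_extra1."""
    handler = handlers.get(extra1)
    if handler is None:
        raise KeyError("no handler for events from %s" % extra1)
    return handler(ev_data)

# Hypothetical handlers for two dummy tables feeding the same queue.
handlers = {
    "queue.welcome_email": lambda d: ("welcome", d),
    "queue.password_reminder": lambda d: ("reminder", d),
}

kind, payload = route_event("queue.welcome_email",
                            {"username": "kristokaiv"}, handlers)
```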

If you don't like this solution much, or would like to use a different encoding, you are absolutely free to do so. When dummy tables are not your favourite weapon of choice but urlencoding is fine, you could for example create the following function:

CREATE OR REPLACE FUNCTION public.urlencode (text, text) RETURNS text AS $$
    import skytools
    key_value = {args[0]: args[1]}
    return skytools.db_urlencode(key_value)
$$ LANGUAGE plpythonu;

And later on explicitly concatenate the key/value pairs together:

perform pgq.insert_event('mailer','welcome_email',urlencode('username','kristo.kaiv')||'&'||urlencode('email',''));

But this dummy-table-based solution is IMHO quite easy to maintain.
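For completeness, events can also be enqueued straight from application code through pgq.insert_event. A sketch (the database connection is an assumption, so the actual call is wrapped in a function and not executed here):

```python
from urllib.parse import urlencode

def build_ev_data(fields):
    """Encode event fields into the column1=value1&column2=value2 form,
    the same shape pgq.logutriga produces."""
    return urlencode(fields)

def enqueue_welcome_email(conn, fields):
    """Insert an event directly with pgq.insert_event.
    Requires a live connection to the PgQ database (assumed)."""
    cur = conn.cursor()
    cur.execute(
        "select pgq.insert_event(%s, %s, %s)",
        ("mailer", "welcome_email", build_ev_data(fields)),
    )

payload = build_ev_data({"username": "kristokaiv", "firstname": "Kristo"})
```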

Consuming the event

This is what our consumer looks like:

import sys, os, pgq, skytools

class Mailer(pgq.Consumer):
    def sendWelcomeMail(self, params):
        """try to send mail, return true on success, false on failure"""
        return True

    def process_batch(self, src_db, batch_id, ev_list):
        for ev in ev_list:
            # decode the urlencoded ev_data back into a dict
            d = skytools.db_urldecode(ev.data)
            self.log.debug("event : %s | type : %s | inserted by : %s" % (d, ev.type, ev.extra1))
            if not self.sendWelcomeMail(d):
                ev.tag_retry()   # let PgQ redeliver the event later
            else:
                ev.tag_done()

if __name__ == '__main__':
    script = Mailer("mailer_daemon", "src_db", sys.argv[1:])
    script.start()

Quite short, isn't it?
I didn't actually remove the mail-sending part; it was never there. I admit I have absolutely no idea how to send an e-mail from Python. Sending them probably isn't the topic you are interested in anyway, so let's skip it.
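For the curious, a sketch of the mail-sending part with Python's standard smtplib, where the sender address, the recipient address scheme, and the local SMTP server are all assumptions:

```python
import smtplib
from email.message import EmailMessage

def build_welcome_mail(params):
    """Build a welcome e-mail from the decoded event payload."""
    msg = EmailMessage()
    msg["Subject"] = "Welcome, %s!" % params.get("firstname", "")
    msg["From"] = "noreply@example.com"                 # assumed sender
    msg["To"] = "%s@example.com" % params["username"]   # assumed address scheme
    msg.set_content("Hello %s, welcome aboard!" % params.get("firstname", ""))
    return msg

def send_welcome_mail(params, host="localhost"):
    """Send via a local SMTP server (assumed to exist);
    return True on success, False on failure."""
    try:
        with smtplib.SMTP(host) as smtp:
            smtp.send_message(build_welcome_mail(params))
        return True
    except OSError:
        return False
```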
The parameters given to pgq.Consumer are:

  • name of the configuration section in the config file: "mailer_daemon"
  • name of the config parameter that contains the connection string to the PgQ database: "src_db"
  • the command line arguments

Our configuration file looks like this (note the section header matching the name given to pgq.Consumer):

[mailer_daemon]
job_name = mailer_daemon
src_db = dbname=queries
pgq_queue_name = mailer
logfile = %(job_name)s.log
pidfile = %(job_name)s.pid

Running the script & monitoring

Start the script (assuming the consumer code is saved as mailer.py) with:

python mailer.py mailer.conf -v

-v means all self.log.debug output will also be displayed, which is useful for debugging what your script does. To run the script as a background process use the -d switch. If you start the process in the background, be sure you have a pidfile defined in the script's config.
The script will start producing output that looks like this:

mbpro:~/temp kristokaiv$ python mailer.py mailer.conf -v
2007-10-19 20:59:59,968 7634 DEBUG Attaching
2007-10-19 21:00:00,002 7634 DEBUG event : {'hi!': None} | type : 4 | inserted by : None
2007-10-19 21:00:00,002 7634 DEBUG event : {'username': 'kristokaiv', 'lastname': '', 'firstname': 'Kaiv', 'language': 'Kristo'} | type : I: | inserted by : queue.welcome_email
2007-10-19 21:00:00,011 7634 INFO {count: 2, duration: 0.0260310173035}
2007-10-19 21:00:00,018 7634 INFO {count: 0, duration: 0.00694489479065}

You can monitor the queue status in the following way:

mbpro:~/temp kristokaiv$ pgqadm.py ticker2.ini status
Postgres version: 8.2.3   PgQ version: 2.1.5

Event queue                                    Rotation        Ticker   TLag
mailer                                          3/7200s    500/3s/60s     2s

If you need to automate queue status reporting, you can get the same information by calling

select * from pgq.get_queue_info();

in the database. This concludes the short introduction to writing queue-based batch jobs using PgQ and the skytools framework.
