Saturday, December 1, 2012

Audit trail with the PostgreSQL hstore data type

Building an audit trail on Postgres is a walk in the park with the hstore data type.


test=# SELECT 'apple=>1,orange=>6,guava=>8'::hstore;
                  hstore                   
-------------------------------------------
 "apple"=>"1", "guava"=>"8", "orange"=>"6"
(1 row)



So what is hstore? Here is the explanation from the Postgres documentation:

This module implements the hstore data type for storing sets of key/value pairs within a single PostgreSQL value. This can be useful in various scenarios, such as rows with many attributes that are rarely examined, or semi-structured data. Keys and values are simply text strings.
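
As a small illustration of the key/value behaviour described in that excerpt, the following sketch reads one value and tests for a key. It assumes the hstore extension is already installed in the current database; the literal is the same sample used above, and the column aliases are only illustrative.

```sql
-- Assumes the hstore extension is installed (create extension hstore;).
select
    h -> 'orange' as orange_value,   -- text value stored under the key 'orange'
    h ? 'guava'   as has_guava,      -- true when the key exists
    h -> 'banana' as missing_value   -- NULL, because the key is absent
from (select 'apple=>1,orange=>6,guava=>8'::hstore as h) as sample;
```

Note that the value comes back as text, so it needs a cast (for example ::int) before it can be used as a number.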


To enumerate those key/value pairs, use the each function:

test=# select * from each(
(SELECT 'apple=>1,orange=>6,guava=>8'::hstore));
  key   | value 
--------+-------
 apple  | 1
 guava  | 8
 orange | 6
(3 rows)


Did a light bulb just light up over your head? Mine too: I'm going to use hstore to build an audit trail in a Postgres trigger. Before we get to that, note that you cannot reduce a table's columns to an hstore value by passing * directly:

test=# with sample_row as 
(select 'Lennon'::text as lastname, 'John'::text as firstname, 1940 as birth_year) 
select hstore(*) from sample_row;

ERROR:  function hstore() does not exist
LINE 1: ...n'::text as firstname, 1940 as birth_year) select hstore(*) ...
                                                             ^
HINT:  No function matches the given name and argument types. You might need to add explicit type casts.

To reduce all the columns of a row to a single hstore value, pass the table name as the parameter to hstore:

test=# with sample_row as 
(select 'Lennon'::text as lastname, 'John'::text as firstname, 1940 as birth_year) 
select hstore(sample_row) from sample_row;
                             hstore                              
-----------------------------------------------------------------
 "lastname"=>"Lennon", "firstname"=>"John", "birth_year"=>"1940"
(1 row)

Better yet, give the table an alias:
test=# with sample_row as 
(select 'Lennon'::text as lastname, 'John'::text as firstname, 1940 as birth_year) 
select hstore(p) from sample_row p;
                             hstore                              
-----------------------------------------------------------------
 "lastname"=>"Lennon", "firstname"=>"John", "birth_year"=>"1940"
(1 row)


Now that the row's columns are reduced to a single hstore value, we can enumerate them with the each function:

test=# with sample_row as 
(select 'Lennon'::text as lastname, 'John'::text as firstname, 1940 as birth_year) 
select * from each((select hstore(p) from sample_row p));
    key     | value  
------------+--------
 lastname   | Lennon
 firstname  | John
 birth_year | 1940
(3 rows)


Armed with this knowledge, we can now build an audit trail for a table.


First, we create the audit trail table:

create extension hstore; 
create sequence group_op;

create table data_trail
(
data_trail_id serial not null primary key,
table_name text not null,
table_op text not null,
group_op bigint not null,
field_name text not null,
current_value text,
old_value text
);

Then create a sample table:
create table person
(
 person_id serial not null primary key,
 lastname text not null,
 firstname text not null,
 nickname text null,
 favorite_number int null
);

The group_op column in data_trail groups the rows logged by a single operation on a table; each trigger call draws one value from the group_op sequence.


Now let's create the audit trail triggers. The insert trigger is the easiest of the bunch; here's how to handle the INSERT audit:

create or replace function log_insert() returns trigger
as
$$
declare s bigint;
begin
 s := nextval('group_op');

 insert into data_trail(table_name, table_op, group_op, field_name, current_value)
 select TG_TABLE_NAME, TG_OP, s, x.key, x.value 
 from each(hstore(new.*)) as x;
 return new;
end;
$$ language 'plpgsql';

I prefer to write new.* rather than plain new, to make it explicit that the whole row is meant; your table may have a field named new.

To wire that trigger to the person table:
create trigger log_insert_trigger
after insert on person
for each row
execute procedure log_insert();

Finally, to test:
insert into person(lastname,firstname,nickname, favorite_number) values
('lennon','john winston','john',default),
('mccartney','james paul','paul',default),
('harrison','george',default,default),
('starr','richard','ringo', 10);

Here's the content of the person table:
test=# select * from person;
 person_id | lastname  |  firstname   | nickname | favorite_number 
-----------+-----------+--------------+----------+-----------------
         1 | lennon    | john winston | john     |                
         2 | mccartney | james paul   | paul     |                
         3 | harrison  | george       |          |                
         4 | starr     | richard      | ringo    |              10
(4 rows)

Here's the content of the data_trail table:
test=# select * from data_trail;
 data_trail_id | table_name | table_op | group_op |   field_name    | current_value | old_value 
---------------+------------+----------+----------+-----------------+---------------+-----------
             1 | person     | INSERT   |        1 | lastname        | lennon        | 
             2 | person     | INSERT   |        1 | nickname        | john          | 
             3 | person     | INSERT   |        1 | firstname       | john winston  | 
             4 | person     | INSERT   |        1 | person_id       | 1             | 
             5 | person     | INSERT   |        1 | favorite_number |               | 
             6 | person     | INSERT   |        2 | lastname        | mccartney     | 
             7 | person     | INSERT   |        2 | nickname        | paul          | 
             8 | person     | INSERT   |        2 | firstname       | james paul    | 
             9 | person     | INSERT   |        2 | person_id       | 2             | 
            10 | person     | INSERT   |        2 | favorite_number |               | 
            11 | person     | INSERT   |        3 | lastname        | harrison      | 
            12 | person     | INSERT   |        3 | nickname        |               | 
            13 | person     | INSERT   |        3 | firstname       | george        | 
            14 | person     | INSERT   |        3 | person_id       | 3             | 
            15 | person     | INSERT   |        3 | favorite_number |               | 
            16 | person     | INSERT   |        4 | lastname        | starr         | 
            17 | person     | INSERT   |        4 | nickname        | ringo         | 
            18 | person     | INSERT   |        4 | firstname       | richard       | 
            19 | person     | INSERT   |        4 | person_id       | 4             | 
            20 | person     | INSERT   |        4 | favorite_number | 10            | 
(20 rows)

As we can see, all those related inserts can be identified by the group_op.
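
Because every row written by one trigger call shares a group_op value, the logged fields can be folded back into one hstore value per operation. This is only a sketch of one way to read the trail, not part of the triggers themselves; the row_snapshot alias is illustrative, and the two-array form of the hstore constructor is assumed to be available (it ships with the hstore extension).

```sql
-- One output row per logged operation, with its fields folded into an hstore.
select group_op,
       table_name,
       table_op,
       hstore(array_agg(field_name), array_agg(current_value)) as row_snapshot
from data_trail
group by group_op, table_name, table_op
order by group_op;
```

For INSERT operations the snapshot contains the whole inserted row. For operations that log only some columns, it contains just those columns.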


Now let's try to make an audit trail for UPDATE:
create or replace function log_update() returns trigger
as
$$
declare 
 s bigint;


begin
 s := nextval('group_op');

        
 -- changes: the columns whose value differs between the old and the new row
 with changes as
 (
  select n.key, n.value as new_value, o.value as old_value
  from each(hstore(new.*)) as n
  join each(hstore(old.*)) as o using(key)
  where n.value is distinct from o.value 
 )
 insert into data_trail(table_name, table_op, group_op, field_name, current_value, old_value)
 select TG_TABLE_NAME, TG_OP, s, key, new_value, old_value
 from changes

 union

 -- also log the primary key column(s), but only when at least one column changed
 select TG_TABLE_NAME, TG_OP, s, key, n.value, o.value
 from each(hstore(new.*)) as n
 join each(hstore(old.*)) as o using(key)
 where 
  exists(select * from changes)
  and 
  key in 
   (select column_name 
   from information_schema.key_column_usage
   where constraint_name = 
    (select constraint_name 
    from information_schema.table_constraints 
    where (table_schema,table_name,constraint_type) = (TG_TABLE_SCHEMA,TG_TABLE_NAME,'PRIMARY KEY')
    )
   );
   
  
 return new;
end;
$$ language 'plpgsql';

We log only the columns that changed, and we always include the primary key, whether or not its value changed; hence the lookup in information_schema.key_column_usage.
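
The changed-columns test can also be expressed with hstore's subtraction operator, which removes from the left operand every pair that also appears in the right operand. This is a sketch of an alternative, not the method used in the function above; the two literals stand in for hstore(new.*) and hstore(old.*) and use made-up sample values.

```sql
-- Pairs present in the "new" hstore but not identically present in the "old" one.
select c.key, c.value as new_value
from each(
    'lastname=>lennon,firstname=>"john ono",favorite_number=>9'::hstore          -- stands in for hstore(new.*)
  - 'lastname=>lennon,firstname=>"john winston",favorite_number=>NULL'::hstore   -- stands in for hstore(old.*)
) as c;
```

This should return only the firstname and favorite_number pairs. Subtraction yields the new values only, so the join used in the trigger function is still needed when the old value has to be logged as well.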

Then wire the trigger to the person table:
create trigger log_update_trigger
after update on person
for each row
execute procedure log_update();

To test if our logic works, issue this command:
update person set lastname = 'lennon', firstname = 'john ono', nickname = 'john', favorite_number = 9 where person_id = 1;

This changes only firstname and favorite_number; lastname and nickname are set to the values they already had.

Then check the audit trail:
test=# select * from data_trail;
 data_trail_id | table_name | table_op | group_op |   field_name    | current_value |  old_value   
---------------+------------+----------+----------+-----------------+---------------+--------------
             1 | person     | INSERT   |        1 | lastname        | lennon        | 
             2 | person     | INSERT   |        1 | nickname        | john          | 
             3 | person     | INSERT   |        1 | firstname       | john winston  | 
             4 | person     | INSERT   |        1 | person_id       | 1             | 
             5 | person     | INSERT   |        1 | favorite_number |               | 
             6 | person     | INSERT   |        2 | lastname        | mccartney     | 
             7 | person     | INSERT   |        2 | nickname        | paul          | 
             8 | person     | INSERT   |        2 | firstname       | james paul    | 
             9 | person     | INSERT   |        2 | person_id       | 2             | 
            10 | person     | INSERT   |        2 | favorite_number |               | 
            11 | person     | INSERT   |        3 | lastname        | harrison      | 
            12 | person     | INSERT   |        3 | nickname        |               | 
            13 | person     | INSERT   |        3 | firstname       | george        | 
            14 | person     | INSERT   |        3 | person_id       | 3             | 
            15 | person     | INSERT   |        3 | favorite_number |               | 
            16 | person     | INSERT   |        4 | lastname        | starr         | 
            17 | person     | INSERT   |        4 | nickname        | ringo         | 
            18 | person     | INSERT   |        4 | firstname       | richard       | 
            19 | person     | INSERT   |        4 | person_id       | 4             | 
            20 | person     | INSERT   |        4 | favorite_number | 10            | 
            21 | person     | UPDATE   |        5 | person_id       | 1             | 1
            22 | person     | UPDATE   |        5 | favorite_number | 9             | 
            23 | person     | UPDATE   |        5 | firstname       | john ono      | john winston
(23 rows)

As we can see, the lastname and nickname columns of person_id 1 are not logged even though they are included in the UPDATE command. This is desirable in an audit trail trigger, because many ORMs set every field of the table whether or not the corresponding property of the mapped class changed.

Also, try re-issuing the same update command above: nothing will be logged in the audit trail table. The update trigger above was designed to handle that scenario.
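
A no-op update can also be filtered out before the function runs at all, using a WHEN condition on the trigger. The sketch below is an alternative wiring, not what this post uses; the trigger name is hypothetical, and it should replace log_update_trigger rather than sit beside it, otherwise every change is logged twice.

```sql
-- Hypothetical alternative to log_update_trigger: skip rows that did not change.
create trigger log_update_when_changed_trigger
after update on person
for each row
when (old.* is distinct from new.*)
execute procedure log_update();
```

With this condition, log_update() is not called for an unchanged row, so no value is drawn from the group_op sequence either. The function as written calls nextval before it knows whether anything changed.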


Finally, the delete trigger:
create or replace function log_delete() returns trigger
as
$$
declare s bigint;
begin
 s := nextval('group_op');
 insert into data_trail(table_name, table_op, group_op, field_name, current_value)
 select TG_TABLE_NAME, TG_OP, s, x.key, x.value 
 from each(hstore(old.*)) as x
 where x.key in 
  (select column_name 
  from information_schema.key_column_usage
  where constraint_name = 
   (select constraint_name 
   from information_schema.table_constraints 
   where (table_schema,table_name,constraint_type) = (TG_TABLE_SCHEMA,TG_TABLE_NAME,'PRIMARY KEY')
   )
  );

 return old;
end;
$$ language 'plpgsql';


create trigger log_delete_trigger
after delete on person
for each row
execute procedure log_delete();

We log only the key of the deleted row; every other column is excluded. To test, issue this command:
delete from person where person_id = 1;

Output:
test=# select * from data_trail;
 data_trail_id | table_name | table_op | group_op |   field_name    | current_value |  old_value   
---------------+------------+----------+----------+-----------------+---------------+--------------
             1 | person     | INSERT   |        1 | lastname        | lennon        | 
             2 | person     | INSERT   |        1 | nickname        | john          | 
             3 | person     | INSERT   |        1 | firstname       | john winston  | 
             4 | person     | INSERT   |        1 | person_id       | 1             | 
             5 | person     | INSERT   |        1 | favorite_number |               | 
             6 | person     | INSERT   |        2 | lastname        | mccartney     | 
             7 | person     | INSERT   |        2 | nickname        | paul          | 
             8 | person     | INSERT   |        2 | firstname       | james paul    | 
             9 | person     | INSERT   |        2 | person_id       | 2             | 
            10 | person     | INSERT   |        2 | favorite_number |               | 
            11 | person     | INSERT   |        3 | lastname        | harrison      | 
            12 | person     | INSERT   |        3 | nickname        |               | 
            13 | person     | INSERT   |        3 | firstname       | george        | 
            14 | person     | INSERT   |        3 | person_id       | 3             | 
            15 | person     | INSERT   |        3 | favorite_number |               | 
            16 | person     | INSERT   |        4 | lastname        | starr         | 
            17 | person     | INSERT   |        4 | nickname        | ringo         | 
            18 | person     | INSERT   |        4 | firstname       | richard       | 
            19 | person     | INSERT   |        4 | person_id       | 4             | 
            20 | person     | INSERT   |        4 | favorite_number | 10            | 
            21 | person     | UPDATE   |        5 | person_id       | 1             | 1
            22 | person     | UPDATE   |        5 | favorite_number | 9             | 
            23 | person     | UPDATE   |        5 | firstname       | john ono      | john winston
            24 | person     | DELETE   |        6 | person_id       | 1             | 
(24 rows)
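
The primary key lookup that both log_update and log_delete embed can also be run on its own, to check which columns the triggers will treat as the key of a table. This is a sketch that assumes the person table lives in the public schema. Unlike the subquery in the functions, it also matches on constraint schema and table name, in case the same constraint name exists elsewhere.

```sql
-- Lists the primary key column(s) of public.person, in key order.
select kcu.column_name
from information_schema.table_constraints as tc
join information_schema.key_column_usage as kcu
  on  kcu.constraint_schema = tc.constraint_schema
  and kcu.constraint_name   = tc.constraint_name
  and kcu.table_name        = tc.table_name
where tc.table_schema    = 'public'      -- assumption: default schema
  and tc.table_name      = 'person'
  and tc.constraint_type = 'PRIMARY KEY'
order by kcu.ordinal_position;
```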



Finally, it may already be obvious, but it's worth pointing out that the triggers above are not tied to a single table:

create table product
(
 product_id serial not null primary key,
 product_name text not null,
 product_description text not null
);


create trigger log_insert_trigger
after insert on product
for each row
execute procedure log_insert();


create trigger log_update_trigger
after update on product
for each row
execute procedure log_update();



create trigger log_delete_trigger
after delete on product
for each row
execute procedure log_delete();
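
The three create trigger statements follow the same pattern for every table, so they can also be generated in a loop. This is a sketch under stated assumptions: the table names customer and invoice are hypothetical and must already exist, no triggers with these names may exist on them yet, and the three log functions from this post must be in place.

```sql
do $$
declare
    t  text;
    op text;
begin
    -- Hypothetical list of tables to audit; replace with your own.
    foreach t in array array['customer', 'invoice'] loop
        foreach op in array array['insert', 'update', 'delete'] loop
            -- Builds e.g.: create trigger log_insert_trigger after insert on customer ...
            execute format(
                'create trigger %I after %s on %I for each row execute procedure %I()',
                'log_' || op || '_trigger', op, t, 'log_' || op);
        end loop;
    end loop;
end;
$$;
```

The %I placeholders quote the trigger, table, and function names as identifiers, while %s inserts the operation keyword unquoted.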



insert into product(product_name, product_description) values
('keyboard','interface between you and the machine'),
('mouse','interface between you and Angry Bird');


Output on the audit trail table (rows 13 onward):
            13 | person     | INSERT   |        3 | firstname           | george                                | 
            14 | person     | INSERT   |        3 | person_id           | 3                                     | 
            15 | person     | INSERT   |        3 | favorite_number     |                                       | 
            16 | person     | INSERT   |        4 | lastname            | starr                                 | 
            17 | person     | INSERT   |        4 | nickname            | ringo                                 | 
            18 | person     | INSERT   |        4 | firstname           | richard                               | 
            19 | person     | INSERT   |        4 | person_id           | 4                                     | 
            20 | person     | INSERT   |        4 | favorite_number     | 10                                    | 
            21 | person     | UPDATE   |        5 | person_id           | 1                                     | 1
            22 | person     | UPDATE   |        5 | favorite_number     | 9                                     | 
            23 | person     | UPDATE   |        5 | firstname           | john ono                              | john winston
            24 | person     | DELETE   |        6 | person_id           | 1                                     | 
            25 | product    | INSERT   |        7 | product_id          | 5                                     | 
            26 | product    | INSERT   |        7 | product_name        | keyboard                              | 
            27 | product    | INSERT   |        7 | product_description | interface between you and the machine | 
            28 | product    | INSERT   |        8 | product_id          | 6                                     | 
            29 | product    | INSERT   |        8 | product_name        | mouse                                 | 
            30 | product    | INSERT   |        8 | product_description | interface between you and Angry Bird  | 
(30 rows)

3 comments:

  1. Hi,
    I'm also currently working on an audit trail trigger. My trigger works perfectly when I make entries in the DB manually, but it fails for the UPDATE event when testing from the front end.

    Any suggestions for this issue?

  2. Hey.. can you please change the table name from Product to Person

    create trigger log_insert_trigger
    after insert on product
    for each row
    execute procedure log_insert();
