Node

Node.js multiple query transactions

When using relational databases it’s a common pattern to perform several data manipulation commands (insert/update/delete) within a transaction. A typical use case of this is to insert into two tables related by foreign keys, where you need the auto-incremented “id” value from the first insert to use in the second. Often people will use stored procedures to do this, but what if you wanted to do it in Node? This can get complex due to the nested callbacks that naive node.js code can generate, and shockingly is even the recommended way to do things from the node-pg documentation. Here’s how it generally looks:

db.begin_transaction(function (err, client) { // assume we have implemented a "begin_transaction" function
    if (err) return cb(err);
    client.query("INSERT INTO Table1 (col1, col2) VALUES ($1, $2) RETURNING id", table1_values, function (err, results) {
        if (err) {
            return client.rollback_transaction(function () {
                cb(err);
            }
        }
        var table1_id = results.rows[0].id; 
        var values = [ table1_id, table2_values[0], table2_values[1] ];
        client.query("INSERT INTO Table2 (table1_id, col1, col2) VALUES ($1, $2, $3) RETURNING id", values, function (err, results) {
            if (err) {
                return client.rollback_transaction(function () {
                    cb(err);
                }
            }
            client.commit_transaction(function (err) {
                if (err) {
                    return cb(err);
                }
                cb(null, table1_id, results.rows[0].id);
            });
        });
    });
});

As you can imagine this gets much worse when you get to 3 queries or more.

But let’s break it down – what are we looking at here? It’s effectively a “waterfall”, very much like async.waterfall. So let’s implement the equivalent to async.waterfall in our database library:

exports.waterfall = function waterfall (tasks, cb) {
    pg.connect(connstring, function (err, client, done) {
        if (err) {
            return cb(err);
        }

        client.query(begin_transaction, function (err) {
            if (err) {
                done();
                return cb(err);
            }
            
            var wrapIterator = function (iterator) {
                return function (err) {
                    if (err) {
                        client.query(rollback_transaction, function () {
                            done();
                            cb(err);
                        });
                    }
                    else {
                        var args = Array.prototype.slice.call(arguments, 1);
                        var next = iterator.next();
                        if (next) {
                            args.unshift(client);
                            args.push(wrapIterator(next));
                        }
                        else {
                            args.unshift(client);
                            args.push(function (err, results) {
                                var args = Array.prototype.slice.call(arguments, 0);
                                if (err) {
                                    client.query(rollback_transaction, function () {
                                        done();
                                        cb(err);
                                    });
                                }
                                else {
                                    client.query(commit_transaction, function () {
                                        done();
                                        cb.apply(null, args);
                                    })
                                }
                            })
                        }
                        async.setImmediate(function () {
                            iterator.apply(null, args);
                        });
                    }
                };
            };
            wrapIterator(async.iterator(tasks))();
        });
    });
}

While it looks complicated, most of the code was cut-and-paste from the internal implementation of async.waterfall.

So now we have that, what does it look like in use?

db.waterfall([
        function (client, cb) {
            client.query("INSERT INTO Table1 (col1, col2) VALUES ($1, $2) RETURNING id", table1_values, cb);
        },
        function (client, results, cb) {
            var table1_id = results.rows[0].id;
            var values = [ table1_id, table2_values[0], table2_values[1] ];
            client.query("INSERT INTO Table2 (table1_id, col1, col2) VALUES ($1, $2, $3) RETURNING id", values, cb);
        },
    ], cb);

Much simpler and easier to debug.

Let me know if you find this useful.

Standard

One thought on “Node.js multiple query transactions

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s