diff --git a/lib/db/index.js b/lib/db/index.js index 5eaa2833..0d5c0988 100644 --- a/lib/db/index.js +++ b/lib/db/index.js @@ -101,6 +101,21 @@ db.connect = (function() { } })() +// Verifies that we can form a connection. Useful if it's necessary to make +// sure that a handler doesn't run at all if the database is on a break. In +// normal operation connections are formed lazily. In particular, this was +// an issue with the processor unit, as it started processing messages before +// it was actually truly able to save anything to the database. This lead to +// lost messages in certain situations. +db.ensureConnectivity = function(fn) { + return function() { + var args = [].slice.call(arguments) + return db.connect().then(function() { + return fn.apply(null, args) + }) + } +} + // Close connection, we don't really care if it hasn't been created yet or not db.close = function() { return db.connect().then(function(conn) { diff --git a/lib/units/processor/index.js b/lib/units/processor/index.js index dfe464c1..d2a866c1 100644 --- a/lib/units/processor/index.js +++ b/lib/units/processor/index.js @@ -4,12 +4,13 @@ var logger = require('../../util/logger') var wire = require('../../wire') var wirerouter = require('../../wire/router') var wireutil = require('../../wire/util') +var db = require('../../db') var dbapi = require('../../db/api') var lifecycle = require('../../util/lifecycle') var srv = require('../../util/srv') var zmqutil = require('../../util/zmqutil') -module.exports = function(options) { +module.exports = db.ensureConnectivity(function(options) { var log = logger.createLogger('processor') if (options.name) { @@ -241,4 +242,4 @@ module.exports = function(options) { } }) }) -} +})