diff --git a/lib/units/reaper/index.js b/lib/units/reaper/index.js index 1b33b9d3..c99eb690 100644 --- a/lib/units/reaper/index.js +++ b/lib/units/reaper/index.js @@ -56,18 +56,6 @@ module.exports = function(options) { lifecycle.fatal() }) - sub.on('message', wirerouter() - .on(wire.DevicePresentMessage, function(channel, message) { - ttlset.bump(message.serial, Date.now()) - }) - .on(wire.DeviceHeartbeatMessage, function(channel, message) { - ttlset.bump(message.serial, Date.now()) - }) - .on(wire.DeviceAbsentMessage, function(channel, message) { - ttlset.drop(message.serial) - }) - .handler()) - ttlset.on('timeout', function(serial) { log.info('Reaping device "%s" due to heartbeat timeout', serial) push.send([ @@ -78,20 +66,32 @@ module.exports = function(options) { ]) }) - dbapi.loadPresentDevices() - .then(function(cursor) { - return Promise.promisify(cursor.toArray, cursor)() - .then(function(list) { - var now = Date.now() - list.forEach(function(device) { - ttlset.bump(device.serial, now) + function loadInitialState() { + return dbapi.loadPresentDevices() + .then(function(cursor) { + return Promise.promisify(cursor.toArray, cursor)() + .then(function(list) { + var now = Date.now() + list.forEach(function(device) { + ttlset.bump(device.serial, now) + }) }) - }) - }) - .catch(function(err) { - log.error('Failed to load device list: ', err.stack) - lifecycle.fatal() - }) + }) + } + + function listenToChanges() { + sub.on('message', wirerouter() + .on(wire.DevicePresentMessage, function(channel, message) { + ttlset.bump(message.serial, Date.now()) + }) + .on(wire.DeviceHeartbeatMessage, function(channel, message) { + ttlset.bump(message.serial, Date.now()) + }) + .on(wire.DeviceAbsentMessage, function(channel, message) { + ttlset.drop(message.serial) + }) + .handler()) + } log.info('Reaping devices with no heartbeat') @@ -107,4 +107,6 @@ module.exports = function(options) { ttlset.stop() }) + + loadInitialState().then(listenToChanges) }