var Promise = require('bluebird') var uuid = require('node-uuid') module.exports = function TransactionServiceFactory(socket, TransactionError) { var transactionService = {} function createChannel() { return 'tx.' + uuid.v4() } function MultiTargetTransaction(targets, options) { var pending = Object.create(null) , results = [] , channel = createChannel() function doneListener(someChannel, data) { if (someChannel === channel) { pending[data.source].done(data) } } function progressListener(someChannel, data) { if (someChannel === channel) { pending[data.source].progress(data) } } function cancelListener(someChannel, data) { if (someChannel === channel) { Object.keys(pending).forEach(function(source) { pending[source].cancel(data) }) } } socket.on('tx.done', doneListener) socket.on('tx.progress', progressListener) socket.on('tx.cancel', cancelListener) this.channel = channel this.results = results this.promise = Promise.settle(targets.map(function(target) { var result = new options.result(target) var pendingResult = new PendingTransactionResult(result) pending[options.id ? target[options.id] : target.id] = pendingResult results.push(result) return pendingResult.promise })) .finally(function() { socket.removeListener('tx.done', doneListener) socket.removeListener('tx.progress', progressListener) socket.removeListener('tx.cancel', cancelListener) socket.emit('tx.cleanup', channel) }) .progressed(function() { return results }) .then(function() { return results }) } function SingleTargetTransaction(target, options) { var result = new options.result(target) , pending = new PendingTransactionResult(result) , channel = createChannel() function doneListener(someChannel, data) { if (someChannel === channel) { pending.done(data) } } function progressListener(someChannel, data) { if (someChannel === channel) { pending.progress(data) } } function cancelListener(someChannel, data) { if (someChannel === channel) { pending.cancel(data) } } socket.on('tx.done', doneListener) socket.on('tx.progress', progressListener) socket.on('tx.cancel', cancelListener) this.channel = channel this.result = result this.results = [result] this.promise = pending.promise .finally(function() { socket.removeListener('tx.done', doneListener) socket.removeListener('tx.progress', progressListener) socket.removeListener('tx.cancel', cancelListener) socket.emit('tx.cleanup', channel) }) .progressed(function() { return result }) .then(function() { return result }) } function PendingTransactionResult(result) { var resolver = Promise.defer() , seq = 0 , last = Infinity , unplaced = [] function readQueue() { var message , foundAny = false while (seq <= last && (message = unplaced[seq])) { unplaced[seq] = void 0 if (seq === last) { result.success = message.success if (message.body) { result.body = JSON.parse(message.body) } if (result.success) { if (message.data) { result.lastData = result.data[seq] = message.data } resolver.resolve(result) } else { result.lastData = result.error = message.data resolver.reject(new TransactionError(result)) } return } else { if (message.progress) { result.progress = message.progress } } foundAny = true result.lastData = result.data[seq++] = message.data } if (foundAny) { resolver.progress(result) } } this.progress = function(message) { unplaced[message.seq] = message readQueue() } this.done = function(message) { last = message.seq unplaced[message.seq] = message readQueue() } this.cancel = function(message) { if (!result.settled) { last = message.seq = seq unplaced[message.seq] = message readQueue() } } this.result = result this.promise = resolver.promise.finally(function() { result.settled = true result.progress = 100 }) } function TransactionResult(source) { this.source = source this.settled = false this.success = false this.progress = 0 this.error = null this.data = [] this.lastData = null this.body = null } function DeviceTransactionResult(device) { TransactionResult.call(this, device) this.device = this.source } DeviceTransactionResult.prototype = Object.create(TransactionResult) DeviceTransactionResult.constructor = DeviceTransactionResult transactionService.create = function(target, options) { if (options && !options.result) { options.result = TransactionResult } if (Array.isArray(target)) { return new MultiTargetTransaction(target, options || { result: DeviceTransactionResult , id: 'serial' }) } else { return new SingleTargetTransaction(target, options || { result: DeviceTransactionResult , id: 'serial' }) } } transactionService.punch = function(channel) { var resolver = Promise.defer() function punchListener(someChannel) { if (channel === someChannel) { resolver.resolve(channel) } } socket.on('tx.punch', punchListener) socket.emit('tx.punch', channel) return resolver.promise .timeout(5000) .finally(function() { socket.removeListener('tx.punch', punchListener) }) } return transactionService }