diff --git a/lib/roles/app.js b/lib/roles/app.js index 2c9fa71a..0e5a8c3b 100644 --- a/lib/roles/app.js +++ b/lib/roles/app.js @@ -478,13 +478,12 @@ module.exports = function(options) { } }) .catch(function(err) { - var reply = wireutil.reply('storage') log.error('Storage upload had an error', err.stack) leaveChannel(responseChannel) - push.send([ - channel - , reply.fail('fail') - ]) + socket.emit('tx.cancel', responseChannel, { + success: false + , data: 'fail_upload' + }) }) }) .on('forward.test', function(channel, responseChannel, data) { diff --git a/res/app/components/stf/control/transaction-service.js b/res/app/components/stf/control/transaction-service.js index 96197c25..c0bec94f 100644 --- a/res/app/components/stf/control/transaction-service.js +++ b/res/app/components/stf/control/transaction-service.js @@ -25,8 +25,17 @@ module.exports = function TransactionServiceFactory(socket) { } } + function cancelListener(someChannel, data) { + if (someChannel === channel) { + Object.keys(pending).forEach(function(source) { + pending[source].cancel(data) + }) + } + } + socket.on('tx.done', doneListener) socket.on('tx.progress', progressListener) + socket.on('tx.cancel', cancelListener) this.channel = channel this.results = results @@ -40,6 +49,7 @@ module.exports = function TransactionServiceFactory(socket) { .finally(function() { socket.removeListener('tx.done', doneListener) socket.removeListener('tx.progress', progressListener) + socket.removeListener('tx.cancel', cancelListener) socket.emit('tx.cleanup', channel) }) .progressed(function() { @@ -67,8 +77,15 @@ module.exports = function TransactionServiceFactory(socket) { } } + function cancelListener(someChannel, data) { + if (someChannel === channel) { + pending.cancel(data) + } + } + socket.on('tx.done', doneListener) socket.on('tx.progress', progressListener) + socket.on('tx.cancel', cancelListener) this.channel = channel this.result = result @@ -77,6 +94,7 @@ module.exports = function TransactionServiceFactory(socket) { .finally(function() { socket.removeListener('tx.done', doneListener) socket.removeListener('tx.progress', progressListener) + socket.removeListener('tx.cancel', cancelListener) socket.emit('tx.cleanup', channel) }) .progressed(function() { @@ -90,7 +108,7 @@ module.exports = function TransactionServiceFactory(socket) { function PendingTransactionResult(result) { var resolver = Promise.defer() , seq = 0 - , last = null + , last = Infinity , unplaced = [] resolver.promise.finally(function() { @@ -102,7 +120,7 @@ module.exports = function TransactionServiceFactory(socket) { var message , foundAny = false - while ((message = unplaced[seq])) { + while (seq <= last && (message = unplaced[seq])) { unplaced[seq] = void 0 if (seq === last) { @@ -149,6 +167,14 @@ module.exports = function TransactionServiceFactory(socket) { readQueue() } + this.cancel = function(message) { + if (!result.settled) { + last = message.seq = seq + unplaced[message.seq] = message + readQueue() + } + } + this.result = result this.promise = resolver.promise }