diff --git a/lib/units/device/plugins/screen/stream.js b/lib/units/device/plugins/screen/stream.js index be1eb02b..6a95de4f 100644 --- a/lib/units/device/plugins/screen/stream.js +++ b/lib/units/device/plugins/screen/stream.js @@ -15,6 +15,7 @@ var FrameParser = require('./util/frameparser') var FrameConfig = require('./util/frameconfig') var BroadcastSet = require('./util/broadcastset') var StateQueue = require('./util/statequeue') +var RiskyStream = require('./util/riskystream') module.exports = syrup.serial() .dependency(require('../../support/adb')) @@ -53,15 +54,17 @@ module.exports = syrup.serial() this.runningState = FrameProducer.STATE_STARTING this._startService().bind(this) .then(function(out) { - this.output = out - return this._readOutput(out) + this.output = new RiskyStream(out) + .on('unexpectedEnd', this._outputEnded.bind(this)) + return this._readOutput(this.output.stream) }) .then(function() { return this._connectService() }) .then(function(socket) { - this.socket = socket - return this._readBanner(socket) + this.socket = new RiskyStream(socket) + .on('unexpectedEnd', this._socketEnded.bind(this)) + return this._readBanner(this.socket.stream) }) .then(function(banner) { this.banner = banner @@ -83,10 +86,10 @@ module.exports = syrup.serial() case FrameProducer.STATE_STARTED: if (this.desiredState.next() === FrameProducer.STATE_STOPPED) { this.runningState = FrameProducer.STATE_STOPPING - this._disconnectService().bind(this) + this._disconnectService(this.socket).bind(this) .timeout(2000) .then(function() { - return this._stopService().timeout(10000) + return this._stopService(this.output).timeout(10000) }) .then(function() { this.runningState = FrameProducer.STATE_STOPPED @@ -123,6 +126,17 @@ module.exports = syrup.serial() this._ensureState() } + FrameProducer.prototype.restart = function() { + switch (this.runningState) { + case FrameProducer.STATE_STARTED: + case FrameProducer.STATE_STARTING: + this.desiredState.push(FrameProducer.STATE_STOPPED) + this.desiredState.push(FrameProducer.STATE_STARTED) + this._ensureState() + break + } + } + FrameProducer.prototype.updateRotation = function(rotation) { if (this.frameConfig.rotation === rotation) { log.info('Keeping %d as current frame producer rotation', rotation) @@ -149,14 +163,17 @@ module.exports = syrup.serial() } FrameProducer.prototype._configChanged = function() { - switch (this.runningState) { - case FrameProducer.STATE_STARTED: - case FrameProducer.STATE_STARTING: - this.desiredState.push(FrameProducer.STATE_STOPPED) - this.desiredState.push(FrameProducer.STATE_STARTED) - this._ensureState() - break - } + this.restart() + } + + FrameProducer.prototype._socketEnded = function() { + log.warn('Connection to minicap ended unexpectedly') + this.restart() + } + + FrameProducer.prototype._outputEnded = function() { + log.warn('Shell keeping minicap running ended unexpectedly') + this.restart() } FrameProducer.prototype._startService = function() { @@ -203,10 +220,12 @@ module.exports = syrup.serial() return tryConnect(5, 100) } - FrameProducer.prototype._disconnectService = function() { + FrameProducer.prototype._disconnectService = function(socket) { log.info('Disconnecting from minicap service') - var socket = this.socket + if (!socket || socket.ended) { + return Promise.resolve(true) + } var endListener return new Promise(function(resolve/*, reject*/) { @@ -221,16 +240,19 @@ module.exports = syrup.serial() }) } - FrameProducer.prototype._stopService = function() { + FrameProducer.prototype._stopService = function(output) { log.info('Stopping minicap service') + if (!output || output.ended) { + return Promise.resolve(true) + } + var pid = this.banner ? this.banner.pid : -1 - var output = this.output function waitForEnd() { var endListener return new Promise(function(resolve/*, reject*/) { - output.on('end', endListener = function() { + output.expectEnd().on('end', endListener = function() { resolve(true) }) }) @@ -286,7 +308,7 @@ module.exports = syrup.serial() } FrameProducer.prototype._readFrames = function() { - var parser = this.socket.pipe(new FrameParser()) + var parser = this.socket.stream.pipe(new FrameParser()) var emit = this.emit.bind(this) function tryRead() { diff --git a/lib/units/device/plugins/screen/util/riskystream.js b/lib/units/device/plugins/screen/util/riskystream.js new file mode 100644 index 00000000..b953e922 --- /dev/null +++ b/lib/units/device/plugins/screen/util/riskystream.js @@ -0,0 +1,35 @@ +var util = require('util') + +var EventEmitter = require('eventemitter3').EventEmitter + +function RiskyStream(stream) { + this.endListener = function() { + this.ended = true + this.stream.removeListener('end', this.endListener) + + if (!this.expectingEnd) { + this.emit('unexpectedEnd') + } + + this.emit('end') + }.bind(this) + + this.stream = stream + .on('end', this.endListener) + this.expectingEnd = false + this.ended = false +} + +util.inherits(RiskyStream, EventEmitter) + +RiskyStream.prototype.end = function() { + this.expectEnd() + return this.stream.end() +} + +RiskyStream.prototype.expectEnd = function() { + this.expectingEnd = true + return this +} + +module.exports = RiskyStream