Restart FrameProducer if it dies unexpectedly.

This commit is contained in:
Simo Kinnunen
2015-04-16 16:34:58 +09:00
parent 53bd852d48
commit 88162e8a33
2 changed files with 77 additions and 20 deletions

View File

@@ -15,6 +15,7 @@ var FrameParser = require('./util/frameparser')
var FrameConfig = require('./util/frameconfig')
var BroadcastSet = require('./util/broadcastset')
var StateQueue = require('./util/statequeue')
var RiskyStream = require('./util/riskystream')
module.exports = syrup.serial()
.dependency(require('../../support/adb'))
@@ -53,15 +54,17 @@ module.exports = syrup.serial()
this.runningState = FrameProducer.STATE_STARTING
this._startService().bind(this)
.then(function(out) {
this.output = out
return this._readOutput(out)
this.output = new RiskyStream(out)
.on('unexpectedEnd', this._outputEnded.bind(this))
return this._readOutput(this.output.stream)
})
.then(function() {
return this._connectService()
})
.then(function(socket) {
this.socket = socket
return this._readBanner(socket)
this.socket = new RiskyStream(socket)
.on('unexpectedEnd', this._socketEnded.bind(this))
return this._readBanner(this.socket.stream)
})
.then(function(banner) {
this.banner = banner
@@ -83,10 +86,10 @@ module.exports = syrup.serial()
case FrameProducer.STATE_STARTED:
if (this.desiredState.next() === FrameProducer.STATE_STOPPED) {
this.runningState = FrameProducer.STATE_STOPPING
this._disconnectService().bind(this)
this._disconnectService(this.socket).bind(this)
.timeout(2000)
.then(function() {
return this._stopService().timeout(10000)
return this._stopService(this.output).timeout(10000)
})
.then(function() {
this.runningState = FrameProducer.STATE_STOPPED
@@ -123,6 +126,17 @@ module.exports = syrup.serial()
this._ensureState()
}
FrameProducer.prototype.restart = function() {
switch (this.runningState) {
case FrameProducer.STATE_STARTED:
case FrameProducer.STATE_STARTING:
this.desiredState.push(FrameProducer.STATE_STOPPED)
this.desiredState.push(FrameProducer.STATE_STARTED)
this._ensureState()
break
}
}
FrameProducer.prototype.updateRotation = function(rotation) {
if (this.frameConfig.rotation === rotation) {
log.info('Keeping %d as current frame producer rotation', rotation)
@@ -149,14 +163,17 @@ module.exports = syrup.serial()
}
FrameProducer.prototype._configChanged = function() {
switch (this.runningState) {
case FrameProducer.STATE_STARTED:
case FrameProducer.STATE_STARTING:
this.desiredState.push(FrameProducer.STATE_STOPPED)
this.desiredState.push(FrameProducer.STATE_STARTED)
this._ensureState()
break
}
this.restart()
}
FrameProducer.prototype._socketEnded = function() {
log.warn('Connection to minicap ended unexpectedly')
this.restart()
}
FrameProducer.prototype._outputEnded = function() {
log.warn('Shell keeping minicap running ended unexpectedly')
this.restart()
}
FrameProducer.prototype._startService = function() {
@@ -203,10 +220,12 @@ module.exports = syrup.serial()
return tryConnect(5, 100)
}
FrameProducer.prototype._disconnectService = function() {
FrameProducer.prototype._disconnectService = function(socket) {
log.info('Disconnecting from minicap service')
var socket = this.socket
if (!socket || socket.ended) {
return Promise.resolve(true)
}
var endListener
return new Promise(function(resolve/*, reject*/) {
@@ -221,16 +240,19 @@ module.exports = syrup.serial()
})
}
FrameProducer.prototype._stopService = function() {
FrameProducer.prototype._stopService = function(output) {
log.info('Stopping minicap service')
if (!output || output.ended) {
return Promise.resolve(true)
}
var pid = this.banner ? this.banner.pid : -1
var output = this.output
function waitForEnd() {
var endListener
return new Promise(function(resolve/*, reject*/) {
output.on('end', endListener = function() {
output.expectEnd().on('end', endListener = function() {
resolve(true)
})
})
@@ -286,7 +308,7 @@ module.exports = syrup.serial()
}
FrameProducer.prototype._readFrames = function() {
var parser = this.socket.pipe(new FrameParser())
var parser = this.socket.stream.pipe(new FrameParser())
var emit = this.emit.bind(this)
function tryRead() {

View File

@@ -0,0 +1,35 @@
var util = require('util')
var EventEmitter = require('eventemitter3').EventEmitter
function RiskyStream(stream) {
this.endListener = function() {
this.ended = true
this.stream.removeListener('end', this.endListener)
if (!this.expectingEnd) {
this.emit('unexpectedEnd')
}
this.emit('end')
}.bind(this)
this.stream = stream
.on('end', this.endListener)
this.expectingEnd = false
this.ended = false
}
util.inherits(RiskyStream, EventEmitter)
RiskyStream.prototype.end = function() {
this.expectEnd()
return this.stream.end()
}
RiskyStream.prototype.expectEnd = function() {
this.expectingEnd = true
return this
}
module.exports = RiskyStream