Integrate new minicap along with a moderate rewrite. What's currently missing is recovering from socket death.

This commit is contained in:
Simo Kinnunen
2015-04-15 18:55:46 +09:00
parent 6fe4f8ae1b
commit 95e9dd0b82
43 changed files with 1138 additions and 438 deletions

View File

@@ -0,0 +1,81 @@
var util = require('util')
var syrup = require('stf-syrup')
var adbkit = require('adbkit')
var logger = require('../../../../util/logger')
var wire = require('../../../../wire')
var wireutil = require('../../../../wire/util')
/*jshint maxlen:90*/
module.exports = syrup.serial()
.dependency(require('../../support/adb'))
.dependency(require('../../support/router'))
.dependency(require('../../support/push'))
.dependency(require('../../support/storage'))
.dependency(require('../../resources/minicap'))
.dependency(require('../display'))
.define(function(options, adb, router, push, storage, minicap, display) {
var log = logger.createLogger('device:plugins:screen:capture')
var plugin = Object.create(null)
function projectionFormat() {
return util.format(
'%dx%d@%dx%d/%d'
, display.width
, display.height
, display.width
, display.height
, display.rotation
)
}
plugin.capture = function() {
log.info('Capturing screenshot')
var file = util.format('/data/local/tmp/minicap_%d.jpg', Date.now())
return minicap.run(util.format('-P %s -s >%s', projectionFormat(), file))
.then(adbkit.util.readAll)
.then(function() {
return adb.stat(options.serial, file)
})
.then(function(stats) {
if (stats.size === 0) {
throw new Error('Empty screenshot; possibly secure screen?')
}
return adb.pull(options.serial, file)
.then(function(transfer) {
return storage.store('image', transfer, {
filename: util.format('%s.jpg', options.serial)
, contentType: 'image/jpeg'
, knownLength: stats.size
})
})
})
.finally(function() {
return adb.shell(options.serial, ['rm', '-f', file])
.then(adbkit.util.readAll)
})
}
router.on(wire.ScreenCaptureMessage, function(channel) {
var reply = wireutil.reply(options.serial)
plugin.capture()
.then(function(file) {
push.send([
channel
, reply.okay('success', file)
])
})
.catch(function(err) {
log.error('Screen capture failed', err.stack)
push.send([
channel
, reply.fail(err.message)
])
})
})
return plugin
})

View File

@@ -0,0 +1,17 @@
var syrup = require('stf-syrup')
var _ = require('lodash')
module.exports = syrup.serial()
.define(function(options) {
var plugin = Object.create(null)
plugin.devicePort = 9002
plugin.publicPort = options.ports.pop()
plugin.publicUrl = _.template(options.screenWsUrlPattern)({
publicIp: options.publicIp
, publicPort: plugin.publicPort
, serial: options.serial
})
return plugin
})

View File

@@ -0,0 +1,394 @@
var util = require('util')
var Promise = require('bluebird')
var syrup = require('stf-syrup')
var WebSocketServer = require('ws').Server
var uuid = require('node-uuid')
var EventEmitter = require('eventemitter3').EventEmitter
var split = require('split')
var adbkit = require('adbkit')
var logger = require('../../../../util/logger')
var lifecycle = require('../../../../util/lifecycle')
var bannerutil = require('./util/banner')
var FrameParser = require('./util/frameparser')
var FrameConfig = require('./util/frameconfig')
var BroadcastSet = require('./util/broadcastset')
var StateQueue = require('./util/statequeue')
module.exports = syrup.serial()
.dependency(require('../../support/adb'))
.dependency(require('../../resources/minicap'))
.dependency(require('../display'))
.dependency(require('./options'))
.define(function(options, adb, minicap, display, screenOptions) {
var log = logger.createLogger('device:plugins:screen:stream')
var plugin = Object.create(null)
function FrameProducer(config) {
this.actionQueue = []
this.runningState = FrameProducer.STATE_STOPPED
this.desiredState = new StateQueue()
this.output = null
this.socket = null
this.banner = null
this.frameConfig = config
}
util.inherits(FrameProducer, EventEmitter)
FrameProducer.STATE_STOPPED = 1
FrameProducer.STATE_STARTING = 2
FrameProducer.STATE_STARTED = 3
FrameProducer.STATE_STOPPING = 4
FrameProducer.prototype._ensureState = function() {
switch (this.runningState) {
case FrameProducer.STATE_STARTING:
case FrameProducer.STATE_STOPPING:
// Just wait.
break
case FrameProducer.STATE_STOPPED:
if (this.desiredState.next() === FrameProducer.STATE_STARTED) {
this.runningState = FrameProducer.STATE_STARTING
this._startService().bind(this)
.then(function(out) {
this.output = out
return this._readOutput(out)
})
.then(function() {
return this._connectService()
})
.then(function(socket) {
this.socket = socket
return this._readBanner(socket)
})
.then(function(banner) {
this.banner = banner
return this._readFrames()
})
.then(function() {
this.runningState = FrameProducer.STATE_STARTED
this.emit('start')
})
.catch(function(err) {
this.runningState = FrameProducer.STATE_STOPPED
this.emit('error', err)
})
.finally(function() {
this._ensureState()
})
}
break
case FrameProducer.STATE_STARTED:
if (this.desiredState.next() === FrameProducer.STATE_STOPPED) {
this.runningState = FrameProducer.STATE_STOPPING
this._disconnectService().bind(this)
.timeout(2000)
.then(function() {
return this._stopService().timeout(10000)
})
.then(function() {
this.runningState = FrameProducer.STATE_STOPPED
this.emit('stop')
})
.catch(function(err) {
// In practice we _should_ never get here due to _stopService()
// being quite aggressive. But if we do, well... assume it
// stopped anyway for now.
this.runningState = FrameProducer.STATE_STOPPED
this.emit('error', err)
this.emit('stop')
})
.finally(function() {
this.output = null
this.socket = null
this.banner = null
this._ensureState()
})
}
break
}
}
FrameProducer.prototype.start = function() {
log.info('Requesting frame producer to start')
this.desiredState.push(FrameProducer.STATE_STARTED)
this._ensureState()
}
FrameProducer.prototype.stop = function() {
log.info('Requesting frame producer to stop')
this.desiredState.push(FrameProducer.STATE_STOPPED)
this._ensureState()
}
FrameProducer.prototype.updateRotation = function(rotation) {
if (this.frameConfig.rotation === rotation) {
log.info('Keeping %d as current frame producer rotation', rotation)
return
}
log.info('Setting frame producer rotation to %d', rotation)
this.frameConfig.rotation = rotation
this._configChanged()
}
FrameProducer.prototype.updateProjection = function(width, height) {
if (this.frameConfig.virtualWidth === width &&
this.frameConfig.virtualHeight === height) {
log.info(
'Keeping %dx%d as current frame producer projection', width, height)
return
}
log.info('Setting frame producer projection to %dx%d', width, height)
this.frameConfig.virtualWidth = width
this.frameConfig.virtualHeight = height
this._configChanged()
}
FrameProducer.prototype._configChanged = function() {
switch (this.runningState) {
case FrameProducer.STATE_STARTED:
case FrameProducer.STATE_STARTING:
this.desiredState.push(FrameProducer.STATE_STOPPED)
this.desiredState.push(FrameProducer.STATE_STARTED)
this._ensureState()
break
}
}
FrameProducer.prototype._startService = function() {
log.info('Launching screen service')
return minicap.run(util.format('-P %s', this.frameConfig.toString()))
.timeout(10000)
}
FrameProducer.prototype._readOutput = function(out) {
out.pipe(split()).on('data', function(line) {
var trimmed = line.toString().trim()
if (trimmed === '') {
return
}
if (/ERROR/.test(line)) {
log.fatal('minicap error: "%s"', line)
return lifecycle.fatal()
}
log.info('minicap says: "%s"', line)
})
}
FrameProducer.prototype._connectService = function() {
function tryConnect(times, delay) {
return adb.openLocal(options.serial, 'localabstract:minicap')
.timeout(10000)
.then(function(out) {
return out
})
.catch(function(err) {
if (/closed/.test(err.message) && times > 1) {
return Promise.delay(delay)
.then(function() {
return tryConnect(--times, delay * 2)
})
}
return Promise.reject(err)
})
}
log.info('Connecting to minicap service')
return tryConnect(5, 100)
}
FrameProducer.prototype._disconnectService = function() {
log.info('Disconnecting from minicap service')
var socket = this.socket
var endListener
return new Promise(function(resolve/*, reject*/) {
socket.on('end', endListener = function() {
resolve(true)
})
socket.end()
})
.finally(function() {
socket.removeListener('end', endListener)
})
}
FrameProducer.prototype._stopService = function() {
log.info('Stopping minicap service')
var pid = this.banner ? this.banner.pid : -1
var output = this.output
function waitForEnd() {
var endListener
return new Promise(function(resolve/*, reject*/) {
output.on('end', endListener = function() {
resolve(true)
})
})
.finally(function() {
output.removeListener('end', endListener)
})
}
function kindKill() {
if (pid <= 0) {
return Promise.reject(new Error('Minicap service pid is unknown'))
}
log.info('Sending SIGTERM to minicap')
return Promise.all([
waitForEnd()
, adb.shell(options.serial, ['kill', pid])
.then(adbkit.util.readAll)
.timeout(2000)
.return(true)
])
}
function forceKill() {
if (pid <= 0) {
return Promise.reject(new Error('Minicap service pid is unknown'))
}
log.info('Sending SIGKILL to minicap')
return Promise.all([
waitForEnd()
, adb.shell(options.serial, ['kill', '-9', pid])
.then(adbkit.util.readAll)
.timeout(2000)
.return(true)
])
}
function forceEnd() {
log.info('Ending minicap I/O as a last resort')
output.end()
return Promise.resolve(true)
}
return kindKill()
.catch(Promise.TimeoutError, forceKill)
.catch(forceEnd)
}
FrameProducer.prototype._readBanner = function(out) {
log.info('Reading minicap banner')
return bannerutil.read(out).timeout(2000)
}
FrameProducer.prototype._readFrames = function() {
var parser = this.socket.pipe(new FrameParser())
var emit = this.emit.bind(this)
function tryRead() {
for (var frame; (frame = parser.read());) {
emit('frame', frame)
}
}
tryRead()
parser.on('readable', tryRead)
}
function createServer() {
log.info('Starting WebSocket server on port %d', screenOptions.publicPort)
var wss = new WebSocketServer({
port: screenOptions.publicPort
, perMessageDeflate: false
})
var listeningListener, errorListener
return new Promise(function(resolve, reject) {
listeningListener = function() {
return resolve(wss)
}
errorListener = function(err) {
return reject(err)
}
wss.on('listening', listeningListener)
wss.on('error', errorListener)
})
.finally(function() {
wss.removeListener('listening', listeningListener)
wss.removeListener('error', errorListener)
})
}
return createServer()
.then(function(wss) {
var broadcastSet = new BroadcastSet()
var frameProducer = new FrameProducer(
new FrameConfig(display.properties, display.properties))
broadcastSet.on('nonempty', function() {
frameProducer.start()
})
broadcastSet.on('empty', function() {
frameProducer.stop()
})
display.on('rotationChange', function(newRotation) {
frameProducer.updateRotation(newRotation)
})
frameProducer.on('frame', function(frame) {
broadcastSet.each(function(ws) {
ws.send(frame, {
binary: true
})
})
})
frameProducer.on('error', function(err) {
log.fatal('Frame producer had an error', err.stack)
lifecycle.fatal()
})
wss.on('connection', function(ws) {
var id = uuid.v4()
ws.on('message', function(data) {
var match
if ((match = /^(on|off|(size) ([0-9]+)x([0-9]+))$/.exec(data))) {
switch (match[2] || match[1]) {
case 'on':
broadcastSet.insert(id, ws)
break
case 'off':
broadcastSet.remove(id)
break
case 'size':
frameProducer.updateProjection(+match[3], +match[4])
break
}
}
})
ws.on('close', function() {
broadcastSet.remove(id)
})
})
lifecycle.observe(function() {
wss.close()
})
lifecycle.observe(function() {
frameProducer.stop()
})
})
.return(plugin)
})

View File

@@ -0,0 +1,108 @@
var Promise = require('bluebird')
module.exports.read = function parseBanner(out) {
var tryRead
return new Promise(function(resolve, reject) {
var readBannerBytes = 0
var needBannerBytes = 2
var banner = out.banner = {
version: 0
, length: 0
, pid: 0
, realWidth: 0
, realHeight: 0
, virtualWidth: 0
, virtualHeight: 0
, orientation: 0
, quirks: 0
}
tryRead = function() {
for (var chunk; (chunk = out.read(needBannerBytes - readBannerBytes));) {
for (var cursor = 0, len = chunk.length; cursor < len;) {
if (readBannerBytes < needBannerBytes) {
switch (readBannerBytes) {
case 0:
// version
banner.version = chunk[cursor]
break
case 1:
// length
banner.length = needBannerBytes = chunk[cursor]
break
case 2:
case 3:
case 4:
case 5:
// pid
banner.pid +=
(chunk[cursor] << ((readBannerBytes - 2) * 8)) >>> 0
break
case 6:
case 7:
case 8:
case 9:
// real width
banner.realWidth +=
(chunk[cursor] << ((readBannerBytes - 6) * 8)) >>> 0
break
case 10:
case 11:
case 12:
case 13:
// real height
banner.realHeight +=
(chunk[cursor] << ((readBannerBytes - 10) * 8)) >>> 0
break
case 14:
case 15:
case 16:
case 17:
// virtual width
banner.virtualWidth +=
(chunk[cursor] << ((readBannerBytes - 14) * 8)) >>> 0
break
case 18:
case 19:
case 20:
case 21:
// virtual height
banner.virtualHeight +=
(chunk[cursor] << ((readBannerBytes - 18) * 8)) >>> 0
break
case 22:
// orientation
banner.orientation += chunk[cursor] * 90
break
case 23:
// quirks
banner.quirks = chunk[cursor]
break
}
cursor += 1
readBannerBytes += 1
if (readBannerBytes === needBannerBytes) {
return resolve(banner)
}
}
else {
reject(new Error(
'Supposedly impossible error parsing banner'
))
}
}
}
}
tryRead()
out.on('readable', tryRead)
})
.finally(function() {
out.removeListener('readable', tryRead)
})
}

View File

@@ -0,0 +1,40 @@
var util = require('util')
var EventEmitter = require('eventemitter3').EventEmitter
function BroadcastSet() {
this.set = Object.create(null)
this.count = 0
}
util.inherits(BroadcastSet, EventEmitter)
BroadcastSet.prototype.insert = function(id, ws) {
if (!(id in this.set)) {
this.set[id] = ws
this.count += 1
this.emit('insert', id)
if (this.count === 1) {
this.emit('nonempty')
}
}
}
BroadcastSet.prototype.remove = function(id) {
if (id in this.set) {
delete this.set[id]
this.count -= 1
this.emit('remove', id)
if (this.count === 0) {
this.emit('empty')
}
}
}
BroadcastSet.prototype.each = function(fn) {
return Object.keys(this.set).forEach(function(id) {
return fn(this.set[id])
}, this)
}
module.exports = BroadcastSet

View File

@@ -0,0 +1,22 @@
var util = require('util')
function FrameConfig(real, virtual) {
this.realWidth = real.width
this.realHeight = real.height
this.virtualWidth = virtual.width
this.virtualHeight = virtual.height
this.rotation = virtual.rotation
}
FrameConfig.prototype.toString = function() {
return util.format(
'%dx%d@%dx%d/%d'
, this.realWidth
, this.realHeight
, this.virtualWidth
, this.virtualHeight
, this.rotation
)
}
module.exports = FrameConfig

View File

@@ -0,0 +1,56 @@
var stream = require('stream')
var util = require('util')
function FrameParser() {
this.readFrameBytes = 0
this.frameBodyLength = 0
this.frameBody = new Buffer(0)
stream.Transform.call(this)
}
util.inherits(FrameParser, stream.Transform)
FrameParser.prototype._transform = function(chunk, encoding, done) {
var cursor, len, bytesLeft
for (cursor = 0, len = chunk.length; cursor < len;) {
if (this.readFrameBytes < 4) {
this.frameBodyLength +=
(chunk[cursor] << (this.readFrameBytes * 8)) >>> 0
cursor += 1
this.readFrameBytes += 1
}
else {
bytesLeft = len - cursor
if (bytesLeft >= this.frameBodyLength) {
this.frameBody = Buffer.concat([
this.frameBody
, chunk.slice(cursor, cursor + this.frameBodyLength)
])
this.push(this.frameBody)
cursor += this.frameBodyLength
this.frameBodyLength = this.readFrameBytes = 0
this.frameBody = new Buffer(0)
}
else {
// @todo Consider/benchmark continuation frames to prevent
// potential Buffer thrashing.
this.frameBody = Buffer.concat([
this.frameBody
, chunk.slice(cursor, len)
])
this.frameBodyLength -= bytesLeft
this.readFrameBytes += bytesLeft
cursor = len
}
}
}
return done()
}
module.exports = FrameParser

View File

@@ -0,0 +1,26 @@
function StateQueue() {
this.queue = []
}
StateQueue.prototype.next = function() {
return this.queue.shift()
}
StateQueue.prototype.push = function(state) {
var found = false
// Not super efficient, but this shouldn't be running all the time anyway.
for (var i = 0, l = this.queue.length; i < l; ++i) {
if (this.queue[i] === state) {
this.queue.splice(i + 1)
found = true
break
}
}
if (!found) {
this.queue.push(state)
}
}
module.exports = StateQueue