Separate device functionality into plugins.

This commit is contained in:
Simo Kinnunen
2014-03-14 20:18:53 +09:00
parent d61a70fda5
commit 3d20b06f7f
24 changed files with 1277 additions and 700 deletions

View File

@@ -0,0 +1,25 @@
var syrup = require('syrup')
var adbkit = require('adbkit')
var logger = require('../../../util/logger')
var promiseutil = require('../../../util/promiseutil')
module.exports = syrup()
.define(function(options) {
var log = logger.createLogger('device:plugins:adb')
var adb = adbkit.createClient()
function ensureBootComplete() {
return promiseutil.periodicNotify(
adb.waitBootComplete(options.serial)
, 1000
)
.progressed(function() {
log.info('Waiting for boot to complete')
})
}
return ensureBootComplete()
.return(adb)
})

View File

@@ -0,0 +1,14 @@
var syrup = require('syrup')
var logger = require('../../../util/logger')
var ChannelManager = require('../../../wire/channelmanager')
module.exports = syrup()
.define(function(options, router) {
var log = logger.createLogger('device:plugins:channels')
var channels = new ChannelManager()
channels.on('timeout', function(channel) {
log.info('Channel "%s" timed out', channel)
})
return channels
})

View File

@@ -0,0 +1,16 @@
var syrup = require('syrup')
var logger = require('../../../util/logger')
module.exports = syrup()
.dependency(require('./http'))
.define(function(options, http) {
var log = logger.createLogger('device:plugins:display')
function fetch() {
log.info('Fetching display info')
return http.getDisplay(0)
}
return fetch()
})

View File

@@ -0,0 +1,20 @@
var syrup = require('syrup')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
module.exports = syrup()
.dependency(require('./push'))
.define(function(options, push) {
function heartbeat() {
push.send([
wireutil.heartbeat
, wireutil.envelope(new wire.DeviceHeartbeatMessage(
options.serial
))
])
setTimeout(heartbeat, options.heartbeatInterval)
}
heartbeat()
})

View File

@@ -0,0 +1,144 @@
var util = require('util')
var assert = require('assert')
var http = require('http')
var Promise = require('bluebird')
var syrup = require('syrup')
var request = Promise.promisifyAll(require('request'))
var httpProxy = require('http-proxy')
var split = require('split')
var logger = require('../../../util/logger')
var devutil = require('../../../util/devutil')
module.exports = syrup()
.dependency(require('./adb'))
.dependency(require('./quit'))
.dependency(require('../resources/remote'))
.define(function(options, adb, quit, remote) {
var log = logger.createLogger('device:plugins:http')
var service = {
port: 2870
, privateUrl: null
, publicUrl: null
}
function openService() {
log.info('Launching HTTP API')
return devutil.ensureUnusedPort(adb, options.serial, service.port)
.then(function() {
var log = logger.createLogger('device:remote:http')
return adb.shell(options.serial, [
remote.bin
, '--lib', remote.lib
, '--listen-http', service.port
])
.then(function(out) {
out.pipe(split())
.on('data', function(chunk) {
log.info('Remote says: "%s"', chunk)
})
.on('error', function(err) {
log.fatal('Remote shell had an error', err.stack)
quit.fatal()
})
.on('end', function() {
log.fatal('Remote shell ended')
quit.fatal()
})
})
.then(function() {
return devutil.waitForPort(adb, options.serial, service.port)
})
.then(function(conn) {
var ours = options.ports.pop()
, everyones = options.ports.pop()
, url = util.format('http://127.0.0.1:%d', ours)
// Don't need the connection
conn.end()
log.info('Opening device HTTP API forwarder on "%s"', url)
service.privateUrl = url
service.publicUrl = util.format(
'http://%s:%s'
, options.publicIp
, everyones
)
return adb.forward(
options.serial
, util.format('tcp:%d', ours)
, util.format('tcp:%d', service.port)
)
.then(function() {
log.info(
'Opening HTTP API proxy on "http://%s:%s"'
, options.publicIp
, everyones
)
var resolver = Promise.defer()
function resolve() {
proxyServer
.on('error', function(err) {
log.fatal('Proxy server had an error', err.stack)
quit.fatal()
})
resolver.resolve()
}
function reject(err) {
resolver.reject(err)
}
var proxy = httpProxy.createProxyServer({
target: url
, ws: false
, xfwd: false
})
var proxyServer = http.createServer(proxy.web)
.listen(everyones)
proxyServer.on('listening', resolve)
proxyServer.on('error', reject)
return resolver.promise.finally(function() {
proxyServer.removeListener('listening', resolve)
proxyServer.removeListener('error', reject)
})
})
})
})
}
return openService()
.then(function() {
return {
getDisplay: function(id) {
return request.getAsync({
url: util.format(
'%s/api/v1/displays/%d'
, service.privateUrl
, id
)
, json: true
})
.then(function(args) {
var display = args[1]
assert.ok('id' in display, 'Invalid response from HTTP API')
display.url = util.format(
'%s/api/v1/displays/%d/screenshot.jpg'
, service.publicUrl
, id
)
return display
})
}
}
})
})

View File

@@ -0,0 +1,20 @@
var syrup = require('syrup')
var devutil = require('../../../util/devutil')
var logger = require('../../../util/logger')
module.exports = syrup()
.dependency(require('./adb'))
.define(function(options, adb) {
var log = logger.createLogger('device:plugins:identity')
function solve() {
log.info('Solving identity')
return adb.getProperties(options.serial)
.then(function(properties) {
return devutil.makeIdentity(options.serial, properties)
})
}
return solve()
})

View File

@@ -0,0 +1,224 @@
var util = require('util')
var syrup = require('syrup')
var split = require('split')
var ByteBuffer = require('protobufjs/node_modules/bytebuffer')
var wire = require('../../../wire')
var devutil = require('../../../util/devutil')
var keyutil = require('../../../util/keyutil')
var streamutil = require('../../../util/streamutil')
var logger = require('../../../util/logger')
module.exports = syrup()
.dependency(require('./adb'))
.dependency(require('./router'))
.dependency(require('./quit'))
.dependency(require('../resources/inputagent'))
.define(function(options, adb, router, quit, apk) {
var log = logger.createLogger('device:plugins:input')
var agent = {
socket: null
, port: 1090
}
var service = {
socket: null
, port: 1100
, startAction: 'jp.co.cyberagent.stf.input.agent.InputService.ACTION_START'
, stopAction: 'jp.co.cyberagent.stf.input.agent.InputService.ACTION_STOP'
}
function openAgent() {
log.info('Launching input agent')
return stopAgent()
.then(function() {
return devutil.ensureUnusedPort(adb, options.serial, agent.port)
})
.then(function() {
return adb.shell(options.serial, util.format(
"export CLASSPATH='%s';"
+ " exec app_process /system/bin '%s'"
, apk.path
, apk.main
))
})
.then(function(out) {
out.pipe(split())
.on('data', function(chunk) {
log.info('Agent says: "%s"', chunk)
})
.on('error', function(err) {
log.fatal('InputAgent shell had an error', err.stack)
quit.fatal()
})
.on('end', function() {
log.fatal('InputAgent shell ended')
quit.fatal()
})
})
.then(function() {
return devutil.waitForPort(adb, options.serial, agent.port)
})
.then(function(conn) {
agent.socket = conn
conn.on('error', function(err) {
log.fatal('InputAgent socket had an error', err.stack)
quit.fatal()
})
conn.on('end', function() {
log.fatal('InputAgent socket ended')
quit.fatal()
})
})
}
function stopAgent() {
return devutil.killProcsByComm(
adb
, options.serial
, 'app_process'
, 'app_process'
)
}
function callService(intent) {
return adb.shell(options.serial, util.format(
'am startservice --user 0 %s'
, intent
))
.then(function(out) {
return streamutil.findLine(out, /^Error/)
.finally(function() {
out.end()
})
.then(function(line) {
if (line.indexOf('--user') !== -1) {
return adb.shell(options.serial, util.format(
'am startservice %s'
, intent
))
.then(function() {
return streamutil.findLine(out, /^Error/)
.finally(function() {
out.end()
})
.then(function(line) {
throw new Error(util.format(
'Service had an error: "%s"'
, line
))
})
.catch(streamutil.NoSuchLineError, function() {
return true
})
})
}
else {
throw new Error(util.format(
'Service had an error: "%s"'
, line
))
}
})
.catch(streamutil.NoSuchLineError, function() {
return true
})
})
}
function openService() {
log.info('Launching input service')
return stopService()
.then(function() {
return devutil.waitForPortToFree(adb, options.serial, service.port)
})
.then(function() {
return callService(util.format("-a '%s'", service.startAction))
})
.then(function() {
return devutil.waitForPort(adb, options.serial, service.port)
})
.then(function(conn) {
service.socket = conn
conn.on('error', function(err) {
log.fatal('InputService socket had an error', err.stack)
quit.fatal()
})
conn.on('end', function() {
log.fatal('InputService socket ended')
quit.fatal()
})
})
}
function stopService() {
return callService(util.format("-a '%s'", service.stopAction))
}
function sendInputEvent(event) {
var lengthBuffer = new ByteBuffer()
, messageBuffer = new resource.proto.InputEvent(event).encode()
// Delimiter
lengthBuffer.writeVarint32(messageBuffer.length)
agent.socket.write(Buffer.concat([
lengthBuffer.toBuffer()
, messageBuffer.toBuffer()
]))
}
return openAgent()
.then(openService)
.then(function() {
router
.on(wire.KeyDownMessage, function(channel, message) {
sendInputEvent({
action: 0
, keyCode: keyutil.unwire(message.keyCode)
})
})
.on(wire.KeyUpMessage, function(channel, message) {
sendInputEvent({
action: 1
, keyCode: keyutil.unwire(message.keyCode)
})
})
.on(wire.KeyPressMessage, function(channel, message) {
sendInputEvent({
action: 2
, keyCode: keyutil.unwire(message.keyCode)
})
})
.on(wire.TypeMessage, function(channel, message) {
sendInputEvent({
action: 3
, keyCode: 0
, text: message.text
})
})
return {
unlock: function() {
service.socket.write('unlock\n')
}
, lock: function() {
service.socket.write('lock\n')
}
, acquireWakeLock: function() {
service.socket.write('acquire wake lock\n')
}
, releaseWakeLock: function() {
service.socket.write('release wake lock\n')
}
, identity: function() {
service.socket.write(util.format(
'show identity %s\n'
, options.serial
))
}
}
})
})

View File

@@ -0,0 +1,67 @@
var syrup = require('syrup')
var logger = require('../../../util/logger')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
module.exports = syrup()
.dependency(require('./adb'))
.dependency(require('./router'))
.dependency(require('./quit'))
.define(function(options, adb, router, quit) {
var log = logger.createLogger('device:plugins:logcat')
function openService() {
log.info('Launching logcat service')
return adb.openLogcat(options.serial)
.then(function(logcat) {
return logcat
.on('error', function(err) {
log.fatal('Logcat had an error', err)
quit.fatal()
})
.on('end', function() {
log.fatal('Logcat ended')
quit.fatal()
})
})
}
return openService()
.then(function(logcat) {
function reset() {
logcat
.resetFilters()
.excludeAll()
}
function entryListener(entry) {
push.send([
owner.group
, wireutil.envelope(new wire.DeviceLogcatEntryMessage(
options.serial
, entry.date.getTime() / 1000
, entry.pid
, entry.tid
, entry.priority
, entry.tag
, entry.message
))
])
}
logcat.on('entry', entryListener)
router
.on(wire.LogcatApplyFiltersMessage, function(channel, message) {
reset()
message.filters.forEach(function(filter) {
logcat.include(filter.tag, filter.priority)
})
})
reset()
return logcat
})
})

View File

@@ -0,0 +1,27 @@
var syrup = require('syrup')
var logger = require('../../../util/logger')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
module.exports = syrup()
.dependency(require('./push'))
.define(function(options, push) {
// Forward all logs
logger.on('entry', function(entry) {
push.send([
wireutil.log
, wireutil.envelope(new wire.DeviceLogMessage(
options.serial
, entry.timestamp / 1000
, entry.priority
, entry.tag
, entry.pid
, entry.message
, entry.identifier
))
])
})
return logger
})

View File

@@ -0,0 +1,142 @@
var Promise = require('bluebird')
var syrup = require('syrup')
var logger = require('../../../util/logger')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
var devutil = require('../../../util/devutil')
module.exports = syrup()
.dependency(require('./router'))
.dependency(require('./identity'))
.dependency(require('./push'))
.dependency(require('./sub'))
.dependency(require('./channels'))
.dependency(require('./input'))
.dependency(require('./quit'))
.define(function(options, router, identity, push, sub, channels, input, quit) {
var log = logger.createLogger('device:plugins:owner')
var owner = null
function isGrouped() {
return !!owner
}
function isOwnedBy(someOwner) {
return owner && owner.group == someOwner.group
}
function joinGroup(newOwner, timeout) {
log.info('Now owned by "%s"', newOwner.email)
log.info('Subscribing to group channel "%s"', newOwner.group)
channels.register(newOwner.group, timeout)
sub.subscribe(newOwner.group)
push.send([
wireutil.global
, wireutil.envelope(new wire.JoinGroupMessage(
options.serial
, newOwner
))
])
input.acquireWakeLock(services.inputServiceSocket)
input.unlock(services.inputServiceSocket)
owner = newOwner
}
function leaveGroup() {
log.info('No longer owned by "%s"', owner.email)
log.info('Unsubscribing from group channel "%s"', owner.group)
channels.unregister(owner.group)
sub.unsubscribe(owner.group)
push.send([
wireutil.global
, wireutil.envelope(new wire.LeaveGroupMessage(
options.serial
, owner
))
])
input.releaseWakeLock(services.inputServiceSocket)
input.lock(services.inputServiceSocket)
owner = null
}
channels.on('timeout', function(channel) {
if (owner && channel === owner.group) {
leaveGroup()
}
})
router
.on(wire.GroupMessage, function(channel, message) {
var seq = 0
if (devutil.matchesRequirements(identity, message.requirements)) {
if (!isGrouped()) {
joinGroup(message.owner, message.timeout)
push.send([
channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, true
))
])
}
else if (isOwnedBy(message.owner)) {
push.send([
channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, true
))
])
}
else {
push.send([
channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, false
))
])
}
}
})
.on(wire.UngroupMessage, function(channel, message) {
var seq = 0
if (devutil.matchesRequirements(identity, message.requirements)) {
if (isGrouped()) {
leaveGroup()
push.send([
channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, true
))
])
}
else {
push.send([
channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, true
))
])
}
}
})
quit.observe(function() {
if (isGrouped()) {
leaveGroup()
return Promise.delay(500)
}
else {
return true
}
})
})

View File

@@ -0,0 +1,19 @@
var syrup = require('syrup')
var zmq = require('zmq')
var logger = require('../../../util/logger')
module.exports = syrup()
.define(function(options) {
var log = logger.createLogger('device:plugins:push')
// Output
var push = zmq.socket('push')
options.endpoints.push.forEach(function(endpoint) {
log.info('Sending output to %s', endpoint)
push.connect(endpoint)
})
return push
})

View File

@@ -0,0 +1,38 @@
var Promise = require('bluebird')
var syrup = require('syrup')
var logger = require('../../../util/logger')
module.exports = syrup()
.define(function(options) {
var log = logger.createLogger('device:plugins:quit')
var cleanup = []
function graceful() {
log.info('Winding down for graceful exit')
var wait = Promise.all(cleanup.map(function(fn) {
return fn()
}))
return wait.then(function() {
process.exit(0)
})
}
function fatal() {
log.fatal('Shutting down due to fatal error')
process.exit(1)
}
process.on('SIGINT', graceful)
process.on('SIGTERM', graceful)
return {
graceful: graceful
, fatal: fatal
, observe: function(promise) {
cleanup.push(promise)
}
}
})

View File

@@ -0,0 +1,15 @@
var syrup = require('syrup')
var wirerouter = require('../../../wire/router')
module.exports = syrup()
.dependency(require('./sub'))
.dependency(require('./channels'))
.define(function(options, sub, channels) {
var router = wirerouter()
sub.on('message', router.handler())
router.on('message', function(channel) {
channels.keepalive(channel)
})
return router
})

View File

@@ -0,0 +1,96 @@
var Promise = require('bluebird')
var syrup = require('syrup')
var logger = require('../../../util/logger')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
module.exports = syrup()
.dependency(require('./adb'))
.dependency(require('./router'))
.dependency(require('./push'))
.dependency(require('./sub'))
.define(function(options, adb, router, push, sub) {
var log = logger.createLogger('device:plugins:shell')
router.on(wire.ShellCommandMessage, function(channel, message) {
var seq = 0
log.info('Running shell command "%s"', message.command)
adb.shell(options.serial, message.command)
.then(function(stream) {
var resolver = Promise.defer()
, timer
function keepAliveListener(channel, message) {
clearTimeout(timer)
timer = setTimeout(forceStop, message.timeout)
}
function readableListener() {
var chunk
while (chunk = stream.read()) {
push.send([
channel
, wireutil.envelope(new wire.TransactionProgressMessage(
options.serial
, seq++
, chunk
))
])
}
}
function endListener() {
push.send([
channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, true
))
])
resolver.resolve()
}
function errorListener(err) {
resolver.reject(err)
}
function forceStop() {
stream.end()
}
stream.on('readable', readableListener)
stream.on('end', endListener)
stream.on('error', errorListener)
sub.subscribe(channel)
router.on(wire.ShellKeepAliveMessage, keepAliveListener)
timer = setTimeout(forceStop, message.timeout)
return resolver.promise.finally(function() {
stream.removeListener('readable', readableListener)
stream.removeListener('end', endListener)
stream.removeListener('error', errorListener)
sub.unsubscribe(channel)
router.removeListener(wire.ShellKeepAliveMessage, keepAliveListener)
clearTimeout(timer)
})
})
.error(function(err) {
log.error('Shell command "%s" failed', message.command, err.stack)
push.send([
channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, false
, err.message
))
])
})
})
})

View File

@@ -0,0 +1,32 @@
var syrup = require('syrup')
var logger = require('../../../util/logger')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
module.exports = syrup()
.dependency(require('./sub'))
.dependency(require('./push'))
.dependency(require('./channels'))
.define(function(options, sub, push, channels) {
var log = logger.createLogger('device:plugins:solo')
var channel = wireutil.makePrivateChannel()
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
channels.register(channel, Infinity)
return {
channel: channel
, poke: function() {
push.send([
wireutil.global
, wireutil.envelope(new wire.DevicePokeMessage(
options.serial
, channel
))
])
}
}
})

View File

@@ -0,0 +1,61 @@
var syrup = require('syrup')
var split = require('split')
var logger = require('../../../util/logger')
var devutil = require('../../../util/devutil')
module.exports = syrup()
.dependency(require('./adb'))
.dependency(require('./quit'))
.dependency(require('../resources/remote'))
.define(function(options, adb, quit, remote) {
var log = logger.createLogger('device:plugins:stats')
var service = {
port: 2830
}
function openService() {
return devutil.ensureUnusedPort(adb, options.serial, service.port)
.then(function(port) {
return adb.shell(options.serial, [
remote.bin
, '--lib', remote.lib
, '--listen-stats', service.port
])
.then(function(out) {
out.pipe(split())
.on('data', function(chunk) {
log.info('Remote says: "%s"', chunk)
})
.on('error', function(err) {
log.fatal('Remote shell had an error', err.stack)
quit.fatal()
})
.on('end', function() {
log.fatal('Remote shell ended')
quit.fatal()
})
})
})
.then(function() {
return devutil.waitForPort(adb, options.serial, service.port)
})
.then(function(conn) {
conn.pipe(split())
.on('error', function(err) {
log.fatal('Remote had an error', err.stack)
quit.fatal()
})
.on('end', function() {
log.fatal('Remote ended')
quit.fatal()
})
})
}
return openService()
.then(function() {
return {}
})
})

View File

@@ -0,0 +1,28 @@
var syrup = require('syrup')
var zmq = require('zmq')
var logger = require('../../../util/logger')
var wireutil = require('../../../wire/util')
module.exports = syrup()
.dependency(require('./channels'))
.define(function(options, channels) {
var log = logger.createLogger('device:plugins:sub')
// Input
var sub = zmq.socket('sub')
options.endpoints.sub.forEach(function(endpoint) {
log.info('Receiving input from %s', endpoint)
sub.connect(endpoint)
})
// Establish always-on channels
;[wireutil.global].forEach(function(channel) {
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
channels.register(channel, Infinity)
})
return sub
})

View File

@@ -0,0 +1,95 @@
var Promise = require('bluebird')
var syrup = require('syrup')
var split = require('split')
var monkey = require('adbkit-monkey')
var wire = require('../../../wire')
var devutil = require('../../../util/devutil')
var logger = require('../../../util/logger')
module.exports = syrup()
.dependency(require('./adb'))
.dependency(require('./router'))
.dependency(require('./quit'))
.dependency(require('../resources/remote'))
.define(function(options, adb, router, quit, remote) {
var log = logger.createLogger('device:plugins:touch')
var service = {
port: 2820
}
function openService() {
log.info('Launching touch service')
return devutil.ensureUnusedPort(adb, options.serial, service.port)
.then(function() {
return adb.shell(options.serial, [
remote.bin
, '--lib', remote.lib
, '--listen-input', service.port
])
})
.then(function(out) {
out.pipe(split())
.on('data', function(chunk) {
log.info('Remote says: "%s"', chunk)
})
.on('error', function(err) {
log.fatal('Remote had an error', err.stack)
quit.fatal()
})
.on('end', function() {
log.fatal('Remote ended')
quit.fatal()
})
})
.then(function() {
return devutil.waitForPort(adb, options.serial, service.port)
})
.then(function(conn) {
return Promise.promisifyAll(monkey.connectStream(conn))
})
.then(function(monkey) {
monkey
.on('error', function(err) {
log.fatal('Monkey had an error', err.stack)
quit.fatal()
})
.on('end', function() {
log.fatal('Monkey ended')
quit.fatal()
})
})
}
return openService()
.then(function(monkey) {
router
.on(wire.TouchDownMessage, function(channel, message) {
monkey.touchDownAsync(message.x, message.y)
.catch(function(err) {
log.error('touchDown failed', err.stack)
})
})
.on(wire.TouchMoveMessage, function(channel, message) {
monkey.touchMoveAsync(message.x, message.y)
.catch(function(err) {
log.error('touchMove failed', err.stack)
})
})
.on(wire.TouchUpMessage, function(channel, message) {
monkey.touchUpAsync(message.x, message.y)
.catch(function(err) {
log.error('touchUp failed', err.stack)
})
})
.on(wire.TapMessage, function(channel, message) {
monkey.tapAsync(message.x, message.y)
.catch(function(err) {
log.error('tap failed', err.stack)
})
})
return {}
})
})