diff --git a/lib/db/api.js b/lib/db/api.js index f9a17b2b..a9ac7a9e 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -84,6 +84,12 @@ dbapi.setDeviceAbsent = function(serial) { })) } +dbapi.setDeviceChannel = function(serial, channel) { + return db.run(r.table('devices').get(serial).update({ + channel: channel + })) +} + dbapi.saveDeviceIdentity = function(serial, identity) { return db.run(r.table('devices').get(serial).update({ platform: identity.platform @@ -105,16 +111,4 @@ dbapi.loadDevice = function(serial) { return db.run(r.table('devices').get(serial)) } -dbapi.ensureDeviceSaved = function(serial) { - return dbapi.loadDevice(serial) - .then(function(device) { - if (!device) { - return Promise.delay(100).then(function() { - return dbapi.loadDevice(serial) - }) - } - return device - }) -} - module.exports = dbapi diff --git a/lib/roles/device.js b/lib/roles/device.js index e7a6d7f1..720e4966 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -431,8 +431,13 @@ module.exports = function(options) { .handler()) function poke() { - push.send([wireutil.global, - wireutil.makeDevicePokeMessage(options.serial, solo)]) + push.send([ + wireutil.global + , wireutil.envelope(new wire.DevicePokeMessage( + options.serial + , solo + )) + ]) } function isGrouped() { diff --git a/lib/roles/processor.js b/lib/roles/processor.js index f13a9eeb..5cadb5ab 100644 --- a/lib/roles/processor.js +++ b/lib/roles/processor.js @@ -32,10 +32,20 @@ module.exports = function(options) { }) devDealer.on('message', wirerouter() + // Initial device message .on(wire.DevicePresentMessage, function(channel, message, data) { dbapi.saveDevice(message.serial, message) - appDealer.send([channel, data]) + .then(function() { + devDealer.send([ + message.provider.channel + , wireutil.envelope(new wire.DeviceRegisteredMessage( + message.serial + )) + ]) + appDealer.send([channel, data]) + }) }) + // Workerless messages .on(wire.DeviceAbsentMessage, function(channel, message, data) { dbapi.setDeviceAbsent(message.serial) appDealer.send([channel, data]) @@ -44,6 +54,17 @@ module.exports = function(options) { dbapi.saveDeviceStatus(message.serial, message.status) appDealer.send([channel, data]) }) + // Worker initialized + .on(wire.DevicePokeMessage, function(channel, message) { + dbapi.setDeviceChannel(message.serial, message.channel) + .then(function() { + devDealer.send([ + message.channel + , wireutil.envelope(new wire.ProbeMessage()) + ]) + }) + }) + // Worker messages .on(wire.JoinGroupMessage, function(channel, message, data) { dbapi.setDeviceOwner(message.serial, message.owner) appDealer.send([channel, data]) @@ -56,12 +77,6 @@ module.exports = function(options) { dbapi.saveDeviceLog(message.serial, message) appDealer.send([channel, data]) }) - .on(wire.DevicePokeMessage, function(channel, message) { - dbapi.ensureDeviceSaved(message.serial) - .then(function() { - devDealer.send([message.channel, wireutil.makeProbeMessage()]) - }) - }) .on(wire.DeviceIdentityMessage, function(channel, message, data) { dbapi.saveDeviceIdentity(message.serial, message) appDealer.send([channel, data]) diff --git a/lib/roles/provider.js b/lib/roles/provider.js index 491c5427..10d99e45 100644 --- a/lib/roles/provider.js +++ b/lib/roles/provider.js @@ -9,12 +9,14 @@ var _ = require('lodash') var logger = require('../util/logger') var wire = require('../wire') var wireutil = require('../wire/util') +var wirerouter = require('../wire/router') var procutil = require('../util/procutil') module.exports = function(options) { var log = logger.createLogger('provider') var client = Promise.promisifyAll(adb.createClient()) var workers = {} + var solo = wireutil.makePrivateChannel() var lists = { all: [] , ready: [] @@ -58,6 +60,19 @@ module.exports = function(options) { push.connect(endpoint) }) + // Input + var sub = zmq.socket('sub') + options.endpoints.sub.forEach(function(endpoint) { + log.info('Receiving input from %s', endpoint) + sub.connect(endpoint) + }) + + // Establish always-on channels + ;[solo].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + }) + // Track and manage devices client.trackDevicesAsync().then(function(tracker) { log.info('Tracking devices') @@ -82,85 +97,119 @@ module.exports = function(options) { tracker.on('add', filterDevice(function(device) { log.info('Found device "%s" (%s)', device.id, device.type) - // Tell others we found a device - push.send([ - wireutil.global - , wireutil.envelope(new wire.DevicePresentMessage( - device.id - , options.name - , wireutil.toDeviceStatus(device.type) - )) - ]) + var privateTracker = new events.EventEmitter() + , timer + , worker + + // Wait for others to acknowledge the device + var register = new Promise(function(resolve, reject) { + // Tell others we found a device + push.send([ + wireutil.global + , wireutil.envelope(new wire.DevicePresentMessage( + device.id + , wireutil.toDeviceStatus(device.type) + , new wire.ProviderMessage( + solo + , options.name + ) + )) + ]) + + privateTracker.once('register', resolve) + }) + + register.then(function() { + log.info('Registered device "%s"', device.id) + check() + }) // Statistics lists.all.push(device.id) delayedTotals() - var privateTracker = new events.EventEmitter() - , resolver = Promise.defer() - , timer - , worker + // The device object will be kept up to date by the tracker, except + // our custom "present" property + _.assign(device, { + present: true + }) // When any event occurs on the added device - function deviceListener(type, device) { + function deviceListener(type) { // Okay, this is a bit unnecessary but it allows us to get rid of an // ugly switch statement and return to the original style. - privateTracker.emit(type, device) + privateTracker.emit(type) } // When the added device changes - function changeListener(device) { - log.info('Device "%s" is now "%s"', device.id, device.type) + function changeListener() { + register.then(function() { + log.info('Device "%s" is now "%s"', device.id, device.type) - // Tell others the device changed - push.send([ - wireutil.global - , wireutil.envelope(new wire.DeviceStatusMessage( - device.id - , wireutil.toDeviceStatus(device.type) - )) - ]) + // Tell others the device changed + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceStatusMessage( + device.id + , wireutil.toDeviceStatus(device.type) + )) + ]) - check(device) + check() + }) } // When the added device gets removed - function removeListener(device) { - log.info('Lost device "%s" (%s)', device.id, device.type) + function removeListener() { + register.then(function() { + log.info('Lost device "%s" (%s)', device.id, device.type) - clearTimeout(timer) - flippedTracker.removeListener(device.id, deviceListener) - _.pull(lists.all, device.id) - delayedTotals() + clearTimeout(timer) + flippedTracker.removeListener(device.id, deviceListener) + _.pull(lists.all, device.id) + delayedTotals() - // Tell others the device is gone - push.send([ - wireutil.global - , wireutil.envelope(new wire.DeviceAbsentMessage( - device.id - )) - ]) + // Tell others the device is gone + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceAbsentMessage( + device.id + )) + ]) - stop() + _.assign(device, { + present: false + }) + + check() + }) } // Check if we can do anything with the device - function check(device) { + function check() { clearTimeout(timer) - switch (device.type) { - case 'device': - case 'emulator': - timer = setTimeout(work, 100) - break - default: - stop() - break + + if (device.present) { + // We might get multiple status updates in rapid succession, + // so let's wait for a while + switch (device.type) { + case 'device': + case 'emulator': + timer = setTimeout(work, 100) + break + default: + timer = setTimeout(stop, 100) + break + } + } + else { + stop() } } // Starts a device worker and keeps it alive function work() { - return worker = workers[device.id] = spawn(device) + return worker = workers[device.id] = spawn() .then(function() { log.info('Device worker "%s" has retired', device.id) worker = workers[device.id] = null @@ -188,7 +237,7 @@ module.exports = function(options) { } // Spawn a device worker - function spawn(device) { + function spawn() { var ports = options.ports.splice(0, 2) , proc = options.fork(device, ports) , resolver = Promise.defer() @@ -272,7 +321,6 @@ module.exports = function(options) { flippedTracker.on(device.id, deviceListener) privateTracker.on('change', changeListener) privateTracker.on('remove', removeListener) - check(device) })) tracker.on('change', filterDevice(function(device) { @@ -282,6 +330,12 @@ module.exports = function(options) { tracker.on('remove', filterDevice(function(device) { flippedTracker.emit(device.id, 'remove', device) })) + + sub.on('message', wirerouter() + .on(wire.DeviceRegisteredMessage, function(channel, message) { + flippedTracker.emit(message.serial, 'register') + }) + .handler()) }) function gracefullyExit() { diff --git a/lib/wire/util.js b/lib/wire/util.js index 9e683ddc..1a9179b9 100644 --- a/lib/wire/util.js +++ b/lib/wire/util.js @@ -40,12 +40,6 @@ var wireutil = { , entry.identifier )) } -, makeDevicePokeMessage: function(serial, channel) { - return wireutil.envelope(new wire.DevicePokeMessage( - serial - , channel - )) - } , makeDeviceIdentityMessage: function(serial, identity) { return wireutil.envelope(new wire.DeviceIdentityMessage( serial @@ -78,9 +72,6 @@ var wireutil = { }) )) } -, makeProbeMessage: function() { - return wireutil.envelope(new wire.ProbeMessage()) - } , makeShellCommandMessage: function(channel, command) { return wireutil.envelope(new wire.ShellCommandMessage( channel diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index 04f70375..eee4ef71 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -25,6 +25,7 @@ enum MessageType { KeyDownMessage = 23; KeyUpMessage = 24; KeyPressMessage = 25; + DeviceRegisteredMessage = 26; } message Envelope { @@ -46,10 +47,19 @@ message DeviceLogMessage { // Introductions +message ProviderMessage { + required string channel = 1; + required string name = 2; +} + message DevicePresentMessage { required string serial = 1; - required string provider = 2; - required DeviceStatus status = 4; + required DeviceStatus status = 2; + required ProviderMessage provider = 3; +} + +message DeviceRegisteredMessage { + required string serial = 1; } message DeviceAbsentMessage {