diff --git a/lib/roles/app.js b/lib/roles/app.js index caea5d8a..a77989a8 100644 --- a/lib/roles/app.js +++ b/lib/roles/app.js @@ -168,18 +168,14 @@ module.exports = function(options) { var channels = [] , group = socket.handshake.user.group - function messageListener(channel, wrapper) { - switch (wrapper.type) { - case wire.MessageType.JOIN_GROUP: - var message = wire.JoinGroupMessage.decode(wrapper.message) - socket.emit('join', message) - break - case wire.MessageType.LEAVE_GROUP: - var message = wire.LeaveGroupMessage.decode(wrapper.message) - socket.emit('leave', message) - break - } - } + var messageListener = wirerouter() + .on(wire.MessageType.JOIN_GROUP, function(channel, message) { + socket.emit('join', message) + }) + .on(wire.MessageType.LEAVE_GROUP, function(channel, message) { + socket.emit('leave', message) + }) + .handler() // Global messages // diff --git a/lib/roles/device.js b/lib/roles/device.js index 6c8c718b..cc000147 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -13,6 +13,7 @@ var split = require('split') var logger = require('../util/logger') var wire = require('../wire') var wireutil = require('../util/wireutil')(wire) +var wirerouter = require('../wire/router') var devutil = require('../util/devutil') var pathutil = require('../util/pathutil') var promiseutil = require('../util/promiseutil') @@ -326,90 +327,86 @@ module.exports = function(options) { selfDestruct() }) - sub.on('message', function(channel, data) { - var wrapper = wire.Envelope.decode(data) - channels.keepalive(channel) - switch (wrapper.type) { - case wire.MessageType.PROBE: - var message = wire.ProbeMessage.decode(wrapper.message) - push.send([channel, - wireutil.makeDeviceIdentityMessage(options.serial, identity)]) - break - case wire.MessageType.GROUP: - var message = wire.GroupMessage.decode(wrapper.message) - , groupChannel = message.channel - if (devutil.matchesRequirements(identity, message.requirements)) { - channels.register(groupChannel, message.timeout) - log.info('Subscribing to group channel "%s"', groupChannel) - sub.subscribe(groupChannel) - push.send([groupChannel, - wireutil.makeJoinGroupMessage(options.serial)]) - } - break - case wire.MessageType.UNGROUP: - var message = wire.UngroupMessage.decode(wrapper.message) - , groupChannel = message.channel - if (devutil.matchesRequirements(identity, message.requirements)) { - channels.unregister(groupChannel) - log.info('Unsubscribing from group channel "%s"', groupChannel) - sub.unsubscribe(groupChannel) - push.send([groupChannel, - wireutil.makeLeaveGroupMessage(options.serial)]) - } - break - case wire.MessageType.SHELL_COMMAND: - var message = wire.ShellCommandMessage.decode(wrapper.message) - log.info('Running shell command "%s"', message.command.join(' ')) - adb.shellAsync(options.serial, message.command) - .then(function(stream) { - var resolver = Promise.defer() - , seq = 0 + sub.on('message', wirerouter() + .on(wire.MessageType.ProbeMessage, function(channel, message) { + push.send([channel, + wireutil.makeDeviceIdentityMessage(options.serial, identity)]) + channels.keepalive(channel) + }) + .on(wire.MessageType.GroupMessage, function(channel, message) { + var groupChannel = message.channel + if (devutil.matchesRequirements(identity, message.requirements)) { + channels.register(groupChannel, message.timeout) + log.info('Subscribing to group channel "%s"', groupChannel) + sub.subscribe(groupChannel) + push.send([groupChannel, + wireutil.makeJoinGroupMessage(options.serial)]) + } + channels.keepalive(channel) + }) + .on(wire.MessageType.UngroupMessage, function(channel, message) { + var groupChannel = message.channel + if (devutil.matchesRequirements(identity, message.requirements)) { + channels.unregister(groupChannel) + log.info('Unsubscribing from group channel "%s"', groupChannel) + sub.unsubscribe(groupChannel) + push.send([groupChannel, + wireutil.makeLeaveGroupMessage(options.serial)]) + } + channels.keepalive(channel) + }) + .on(wire.MessageType.ShellCommandMessage, function(channel, message) { + log.info('Running shell command "%s"', message.command.join(' ')) + adb.shellAsync(options.serial, message.command) + .then(function(stream) { + var resolver = Promise.defer() + , seq = 0 - function dataListener(chunk) { - push.send([message.channel, - wireutil.makeDeviceDataMessage( - options.serial - , seq++ - , chunk - )]) - } - - function endListener() { - push.send([message.channel, - wireutil.makeDeviceDoneMessage(options.serial)]) - resolver.resolve() - } - - function errorListener(err) { - log.error('Shell command "%s" failed due to "%s"' - , message.command.join(' '), err.message) - resolver.reject(err) - push.send([message.channel, - wireutil.makeDeviceFailMessage( - options.serial - , err.message - )]) - } - - stream.on('data', dataListener) - stream.on('end', endListener) - stream.on('error', errorListener) - - return resolver.promise.finally(function() { - stream.removeListener('data', dataListener) - stream.removeListener('end', endListener) - stream.removeListener('error', errorListener) - }) - }) - .error(function(err) { - log.error('Shell command "%s" failed due to "%s"' - , message.command.join(' '), err.message) + function dataListener(chunk) { push.send([message.channel, - wire.makeDeviceFailMessage(options.serial, err.message)]) + wireutil.makeShellCommandDataMessage( + options.serial + , seq++ + , chunk + )]) + } + + function endListener() { + push.send([message.channel, + wireutil.makeShellCommandDoneMessage(options.serial)]) + resolver.resolve() + } + + function errorListener(err) { + log.error('Shell command "%s" failed due to "%s"' + , message.command.join(' '), err.message) + resolver.reject(err) + push.send([message.channel, + wireutil.makeShellCommandFailMessage( + options.serial + , err.message + )]) + } + + stream.on('data', dataListener) + stream.on('end', endListener) + stream.on('error', errorListener) + + return resolver.promise.finally(function() { + stream.removeListener('data', dataListener) + stream.removeListener('end', endListener) + stream.removeListener('error', errorListener) }) - break - } - }) + }) + .error(function(err) { + log.error('Shell command "%s" failed due to "%s"' + , message.command.join(' '), err.message) + push.send([message.channel, + wire.makeShellCommandFailMessage(options.serial, err.message)]) + }) + channels.keepalive(channel) + }) + .handler()) function poke() { push.send([wireutil.global, diff --git a/lib/roles/notify/hipchat.js b/lib/roles/notify/hipchat.js index d87be864..223e3a20 100644 --- a/lib/roles/notify/hipchat.js +++ b/lib/roles/notify/hipchat.js @@ -6,6 +6,7 @@ var zmq = require('zmq') var logger = require('../../util/logger') var wire = require('../../wire') +var wirerouter = require('../../wire/router') var wireutil = require('../../util/wireutil')(wire) module.exports = function(options) { @@ -27,19 +28,15 @@ module.exports = function(options) { sub.subscribe(channel) }) - sub.on('message', function(channel, data) { - var wrapper = wire.Envelope.decode(data) - switch (wrapper.type) { - case wire.MessageType.DEVICE_LOG: - var message = wire.DeviceLogMessage.decode(wrapper.message) - if (message.priority >= options.priority) { - buffer.push(message) - clearTimeout(timer) - timer = setTimeout(push, 1000) - } - break - } - }) + sub.on('message', wirerouter() + .on(wire.MessageType.DeviceLogMessage, function(channel, message) { + if (message.priority >= options.priority) { + buffer.push(message) + clearTimeout(timer) + timer = setTimeout(push, 1000) + } + }) + .handler()) function push() { var messages = buffer.splice(0).map(function(entry) { diff --git a/lib/roles/processor.js b/lib/roles/processor.js index 04037089..e2943cdd 100644 --- a/lib/roles/processor.js +++ b/lib/roles/processor.js @@ -2,6 +2,7 @@ var zmq = require('zmq') var logger = require('../util/logger') var wire = require('../wire') +var wirerouter = require('../wire/router') var wireutil = require('../util/wireutil')(wire) var dbapi = require('../db/api') @@ -30,49 +31,34 @@ module.exports = function(options) { devDealer.connect(endpoint) }) - devDealer.on('message', function(channel, data) { - var wrapper = wire.Envelope.decode(data) - switch (wrapper.type) { - case wire.MessageType.JOIN_GROUP: - var message = wire.JoinGroupMessage.decode(wrapper.message) - appDealer.send([channel, data]) - break - case wire.MessageType.LEAVE_GROUP: - var message = wire.LeaveGroupMessage.decode(wrapper.message) - appDealer.send([channel, data]) - break - case wire.MessageType.DEVICE_LOG: - var message = wire.DeviceLogMessage.decode(wrapper.message) - dbapi.saveDeviceLog(message.serial, message) - appDealer.send([channel, data]) - break - case wire.MessageType.DEVICE_POKE: - var message = wire.DevicePokeMessage.decode(wrapper.message) - devDealer.send([message.channel, wireutil.makeProbeMessage()]) - break - case wire.MessageType.DEVICE_IDENTITY: - var message = wire.DeviceIdentityMessage.decode(wrapper.message) - dbapi.saveDeviceIdentity(message.serial, message) - break - case wire.MessageType.DEVICE_STATUS: - var message = wire.DeviceStatusMessage.decode(wrapper.message) - dbapi.saveDeviceStatus(message.serial, message) - break - case wire.MessageType.DEVICE_PROPERTIES: - var message = wire.DevicePropertiesMessage.decode(wrapper.message) - break - case wire.MessageType.DEVICE_DATA: - var message = wire.DeviceDataMessage.decode(wrapper.message) - appDealer.send([channel, data]) - break - case wire.MessageType.DEVICE_DONE: - var message = wire.DeviceDoneMessage.decode(wrapper.message) - appDealer.send([channel, data]) - break - case wire.MessageType.DEVICE_FAIL: - var message = wire.DeviceFailMessage.decode(wrapper.message) - appDealer.send([channel, data]) - break - } - }) + devDealer.on('message', wirerouter() + .on(wire.MessageType.JoinGroupMessage, function(channel, message, data) { + appDealer.send([channel, data]) + }) + .on(wire.MessageType.LeaveGroupMessage, function(channel, message, data) { + appDealer.send([channel, data]) + }) + .on(wire.MessageType.DeviceLogMessage, function(channel, message, data) { + dbapi.saveDeviceLog(message.serial, message) + appDealer.send([channel, data]) + }) + .on(wire.MessageType.DevicePokeMessage, function(channel, message) { + devDealer.send([message.channel, wireutil.makeProbeMessage()]) + }) + .on(wire.MessageType.DeviceIdentityMessage, function(channel, message) { + dbapi.saveDeviceIdentity(message.serial, message) + }) + .on(wire.MessageType.DeviceStatusMessage, function(channel, message) { + dbapi.saveDeviceStatus(message.serial, message) + }) + .on(wire.MessageType.DeviceShellCommandDataMessage, function(channel, message, data) { + appDealer.send([channel, data]) + }) + .on(wire.MessageType.DeviceShellCommandDoneMessage, function(channel, message, data) { + appDealer.send([channel, data]) + }) + .on(wire.MessageType.DeviceShellCommandFailMessage, function(channel, message, data) { + appDealer.send([channel, data]) + }) + .handler()) } diff --git a/lib/util/wireutil.js b/lib/util/wireutil.js index 7d5ee731..0fbf50dc 100644 --- a/lib/util/wireutil.js +++ b/lib/util/wireutil.js @@ -30,7 +30,7 @@ module.exports = function(wire) { , entry.identifier ) - return wireutil.envelope(wire.MessageType.DEVICE_LOG, message) + return wireutil.envelope(wire.MessageType.DeviceLogMessage, message) } , makeGroupMessage: function(channel, timeout, requirements) { var message = new wire.GroupMessage( @@ -39,23 +39,23 @@ module.exports = function(wire) { , requirements ) - return wireutil.envelope(wire.MessageType.GROUP, message) + return wireutil.envelope(wire.MessageType.GroupMessage, message) } , makeUngroupMessage: function(requirements) { var message = new wire.UngroupMessage(requirements) - return wireutil.envelope(wire.MessageType.UNGROUP, message) + return wireutil.envelope(wire.MessageType.UngroupMessage, message) } , makeJoinGroupMessage: function(serial) { var message = new wire.JoinGroupMessage(serial) - return wireutil.envelope(wire.MessageType.JOIN_GROUP, message) + return wireutil.envelope(wire.MessageType.JoinGroupMessage, message) } , makeLeaveGroupMessage: function(serial) { var message = new wire.LeaveGroupMessage(serial) - return wireutil.envelope(wire.MessageType.LEAVE_GROUP, message) + return wireutil.envelope(wire.MessageType.LeaveGroupMessage, message) } , makeDevicePokeMessage: function(serial, channel) { var message = new wire.DevicePokeMessage(serial, channel) - return wireutil.envelope(wire.MessageType.DEVICE_POKE, message) + return wireutil.envelope(wire.MessageType.DevicePokeMessage, message) } , makeDeviceIdentityMessage: function(serial, identity) { var message = new wire.DeviceIdentityMessage( @@ -81,7 +81,7 @@ module.exports = function(wire) { ) ) - return wireutil.envelope(wire.MessageType.DEVICE_IDENTITY, message) + return wireutil.envelope(wire.MessageType.DeviceIdentityMessage, message) } , makeDevicePropertiesMessage: function(serial, properties) { var message = new wire.DevicePropertiesMessage( @@ -91,7 +91,10 @@ module.exports = function(wire) { }) ) - return wireutil.envelope(wire.MessageType.DEVICE_PROPERTIES, message) + return wireutil.envelope( + wire.MessageType.DevicePropertiesMessage + , message + ) } , makeDeviceStatusMessage: function(serial, type, provider) { var message = new wire.DeviceStatusMessage( @@ -100,27 +103,36 @@ module.exports = function(wire) { , provider ) - return wireutil.envelope(wire.MessageType.DEVICE_STATUS, message) + return wireutil.envelope(wire.MessageType.DeviceStatusMessage, message) } , makeProbeMessage: function() { var message = new wire.ProbeMessage() - return wireutil.envelope(wire.MessageType.PROBE, message) + return wireutil.envelope(wire.MessageType.ProbeMessage, message) } , makeShellCommandMessage: function(channel, command) { var message = new wire.ShellCommandMessage(channel, command) - return wireutil.envelope(wire.MessageType.SHELL_COMMAND, message) + return wireutil.envelope(wire.MessageType.ShellCommandMessage, message) } - , makeDeviceDataMessage: function(serial, seq, chunk) { - var message = new wire.DeviceDataMessage(serial, seq, chunk) - return wireutil.envelope(wire.MessageType.DEVICE_DATA, message) + , makeShellCommandDataMessage: function(serial, seq, chunk) { + var message = new wire.ShellCommandDataMessage(serial, seq, chunk) + return wireutil.envelope( + wire.MessageType.ShellCommandDataMessage + , message + ) } - , makeDeviceDoneMessage: function(serial) { - var message = new wire.DeviceDoneMessage(serial) - return wireutil.envelope(wire.MessageType.DEVICE_DONE, message) + , makeShellCommandDoneMessage: function(serial) { + var message = new wire.ShellCommandDoneMessage(serial) + return wireutil.envelope( + wire.MessageType.ShellCommandDoneMessage + , message + ) } - , makeDeviceFailMessage: function(serial, reason) { - var message = new wire.DeviceFailMessage(serial, reason) - return wireutil.envelope(wire.MessageType.DEVICE_FAIL, message) + , makeShellCommandFailMessage: function(serial, reason) { + var message = new wire.ShellCommandFailMessage(serial, reason) + return wireutil.envelope( + wire.MessageType.ShellCommandFailMessage + , message + ) } } diff --git a/lib/wire/index.js b/lib/wire/index.js index d82bc80d..c2d08f03 100644 --- a/lib/wire/index.js +++ b/lib/wire/index.js @@ -2,5 +2,15 @@ var path = require('path') var ProtoBuf = require('protobufjs') -module.exports = - ProtoBuf.loadProtoFile(path.join(__dirname, 'wire.proto')).build() +var wire = ProtoBuf.loadProtoFile(path.join(__dirname, 'wire.proto')).build() + +wire.ReverseMessageType = Object.keys(wire.MessageType) + .reduce( + function(acc, type) { + acc[wire.MessageType[type]] = type + return acc + } + , Object.create(null) + ) + +module.exports = wire diff --git a/lib/wire/router.js b/lib/wire/router.js new file mode 100644 index 00000000..bc808aa2 --- /dev/null +++ b/lib/wire/router.js @@ -0,0 +1,39 @@ +var events = require('events') +var util = require('util') + +var wire = require('./') +var log = require('../util/logger').createLogger('wire:router') + +function Router() { + if (!(this instanceof Router)) { + return new Router() + } + + events.EventEmitter.call(this) +} + +util.inherits(Router, events.EventEmitter) + +Router.prototype.handler = function() { + return function(channel, data) { + var wrapper = wire.Envelope.decode(data) + , type = wire.ReverseMessageType[wrapper.type] + + if (type) { + this.emit( + wrapper.type + , channel + , wire[type].decode(wrapper.message) + , data + ) + } + else { + log.warn( + 'Unknown message type "%d", perhaps we need an update?' + , wrapper.type + ) + } + }.bind(this) +} + +module.exports = Router diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index 3f9698b1..53952b77 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -1,21 +1,20 @@ // Message wrapper enum MessageType { - DEVICE_POKE = 1; - DEVICE_STATUS = 2; - DEVICE_TYPE = 3; - DEVICE_PROPERTIES = 4; - GROUP = 5; - UNGROUP = 15; - JOIN_GROUP = 6; - LEAVE_GROUP = 7; - PROBE = 8; - SHELL_COMMAND = 9; - DEVICE_DATA = 10; - DEVICE_DONE = 11; - DEVICE_FAIL = 12; - DEVICE_IDENTITY = 13; - DEVICE_LOG = 14; + DevicePokeMessage = 1; + DeviceStatusMessage = 2; + DevicePropertiesMessage = 4; + GroupMessage = 5; + UngroupMessage = 15; + JoinGroupMessage = 6; + LeaveGroupMessage = 7; + ProbeMessage = 8; + ShellCommandMessage = 9; + ShellCommandDataMessage = 10; + ShellCommandDoneMessage = 11; + ShellCommandFailMessage = 12; + DeviceIdentityMessage = 13; + DeviceLogMessage = 14; } message Envelope {