diff --git a/lib/units/device/plugins/screen/stream.js b/lib/units/device/plugins/screen/stream.js index ab75417b..024c018f 100644 --- a/lib/units/device/plugins/screen/stream.js +++ b/lib/units/device/plugins/screen/stream.js @@ -33,7 +33,10 @@ module.exports = syrup.serial() this.output = null this.socket = null this.banner = null + this.parser = null this.frameConfig = config + this.readable = false + this.needsReadable = false } util.inherits(FrameProducer, EventEmitter) @@ -62,13 +65,14 @@ module.exports = syrup.serial() return this._connectService() }) .then(function(socket) { + this.parser = new FrameParser() this.socket = new RiskyStream(socket) .on('unexpectedEnd', this._socketEnded.bind(this)) return this._readBanner(this.socket.stream) }) .then(function(banner) { this.banner = banner - return this._readFrames() + return this._readFrames(this.socket.stream) }) .then(function() { this.runningState = FrameProducer.STATE_STARTED @@ -107,6 +111,7 @@ module.exports = syrup.serial() this.output = null this.socket = null this.banner = null + this.parser = null this._ensureState() }) } @@ -162,6 +167,29 @@ module.exports = syrup.serial() this._configChanged() } + FrameProducer.prototype.nextFrame = function() { + var frame = null, chunk + + if (this.parser) { + while ((frame = this.parser.nextFrame()) === null) { + if ((chunk = this.socket.stream.read())) { + this.parser.push(chunk) + } + else { + this.readable = false + break + } + } + } + + return frame + } + + FrameProducer.prototype.needFrame = function() { + this.needsReadable = true + this._maybeEmitReadable() + } + FrameProducer.prototype._configChanged = function() { this.restart() } @@ -302,23 +330,25 @@ module.exports = syrup.serial() .catch(forceEnd) } - FrameProducer.prototype._readBanner = function(out) { + FrameProducer.prototype._readBanner = function(socket) { log.info('Reading minicap banner') - return bannerutil.read(out).timeout(2000) + return bannerutil.read(socket).timeout(2000) } - FrameProducer.prototype._readFrames = function() { - var parser = this.socket.stream.pipe(new FrameParser()) - var emit = this.emit.bind(this) + FrameProducer.prototype._readFrames = function(socket) { + this.needsReadable = true - function tryRead() { - for (var frame; (frame = parser.read());) { - emit('frame', frame) - } + socket.on('readable', function() { + this.readable = true + this._maybeEmitReadable() + }.bind(this)) + } + + FrameProducer.prototype._maybeEmitReadable = function() { + if (this.readable && this.needsReadable) { + this.needsReadable = false + this.emit('readable') } - - tryRead() - parser.on('readable', tryRead) } function createServer() { @@ -366,12 +396,21 @@ module.exports = syrup.serial() frameProducer.updateRotation(newRotation) }) - frameProducer.on('frame', function(frame) { - broadcastSet.each(function(ws) { - ws.send(frame, { - binary: true - }) - }) + frameProducer.on('readable', function next() { + console.log('NEXT') + var frame + if ((frame = frameProducer.nextFrame())) { + Promise.settle([broadcastSet.values().map(function(ws) { + return new Promise(function(resolve/*, reject*/) { + ws.send(frame, { + binary: true + }, resolve) + }) + })]).then(next) + } + else { + frameProducer.needFrame() + } }) frameProducer.on('error', function(err) { diff --git a/lib/units/device/plugins/screen/util/broadcastset.js b/lib/units/device/plugins/screen/util/broadcastset.js index 0eef8626..1b1cf78e 100644 --- a/lib/units/device/plugins/screen/util/broadcastset.js +++ b/lib/units/device/plugins/screen/util/broadcastset.js @@ -31,9 +31,9 @@ BroadcastSet.prototype.remove = function(id) { } } -BroadcastSet.prototype.each = function(fn) { - return Object.keys(this.set).forEach(function(id) { - return fn(this.set[id]) +BroadcastSet.prototype.values = function() { + return Object.keys(this.set).map(function(id) { + return this.set[id] }, this) } diff --git a/lib/units/device/plugins/screen/util/frameparser.js b/lib/units/device/plugins/screen/util/frameparser.js index 7260905f..585a7554 100644 --- a/lib/units/device/plugins/screen/util/frameparser.js +++ b/lib/units/device/plugins/screen/util/frameparser.js @@ -1,56 +1,66 @@ -var stream = require('stream') -var util = require('util') - function FrameParser() { this.readFrameBytes = 0 this.frameBodyLength = 0 - this.frameBody = new Buffer(0) - stream.Transform.call(this) + this.frameBody = null + this.cursor = 0 + this.chunk = null } -util.inherits(FrameParser, stream.Transform) +FrameParser.prototype.push = function(chunk) { + if (this.chunk) { + throw new Error('Must consume pending frames before pushing more chunks') + } -FrameParser.prototype._transform = function(chunk, encoding, done) { - var cursor, len, bytesLeft + this.chunk = chunk +} - for (cursor = 0, len = chunk.length; cursor < len;) { +FrameParser.prototype.nextFrame = function() { + if (!this.chunk) { + return null + } + + for (var len = this.chunk.length; this.cursor < len;) { if (this.readFrameBytes < 4) { this.frameBodyLength += - (chunk[cursor] << (this.readFrameBytes * 8)) >>> 0 - cursor += 1 + (this.chunk[this.cursor] << (this.readFrameBytes * 8)) >>> 0 + this.cursor += 1 this.readFrameBytes += 1 } else { - bytesLeft = len - cursor + var bytesLeft = len - this.cursor if (bytesLeft >= this.frameBodyLength) { - this.frameBody = Buffer.concat([ - this.frameBody - , chunk.slice(cursor, cursor + this.frameBodyLength) - ]) + var completeBody = this.frameBody + ? Buffer.concat([ + this.frameBody + , this.chunk.slice(this.cursor, this.cursor + this.frameBodyLength) + ]) + : this.chunk.slice(this.cursor, this.cursor + this.frameBodyLength) - this.push(this.frameBody) - - cursor += this.frameBodyLength + this.cursor += this.frameBodyLength this.frameBodyLength = this.readFrameBytes = 0 - this.frameBody = new Buffer(0) + this.frameBody = null + + return completeBody } else { // @todo Consider/benchmark continuation frames to prevent // potential Buffer thrashing. - this.frameBody = Buffer.concat([ - this.frameBody - , chunk.slice(cursor, len) - ]) + this.frameBody = this.frameBody + ? Buffer.concat([this.frameBody, this.chunk.slice(this.cursor, len)]) + : this.chunk.slice(this.cursor, len) this.frameBodyLength -= bytesLeft this.readFrameBytes += bytesLeft - cursor = len + this.cursor = len } } } - return done() + this.cursor = 0 + this.chunk = null + + return null } module.exports = FrameParser