mirror of
https://github.com/DeviceFarmer/stf.git
synced 2026-04-18 13:53:22 +02:00
Give forward plugin its own folder. Should make it easier to clean it up.
This commit is contained in:
269
lib/units/device/plugins/forward/index.js
Normal file
269
lib/units/device/plugins/forward/index.js
Normal file
@@ -0,0 +1,269 @@
|
||||
var net = require('net')
|
||||
|
||||
var Promise = require('bluebird')
|
||||
var syrup = require('syrup')
|
||||
|
||||
var wire = require('../../../../wire')
|
||||
var logger = require('../../../../util/logger')
|
||||
var lifecycle = require('../../../../util/lifecycle')
|
||||
var streamutil = require('../../../../util/streamutil')
|
||||
var wireutil = require('../../../../wire/util')
|
||||
|
||||
var ForwardReader = require('./util/reader')
|
||||
var ForwardWriter = require('./util/writer')
|
||||
|
||||
module.exports = syrup.serial()
|
||||
.dependency(require('../../support/adb'))
|
||||
.dependency(require('../../support/router'))
|
||||
.dependency(require('../../support/push'))
|
||||
.dependency(require('../../resources/minirev'))
|
||||
.dependency(require('../group'))
|
||||
.define(function(options, adb, router, push, minirev, group) {
|
||||
var log = logger.createLogger('device:plugins:forward')
|
||||
var plugin = Object.create(null)
|
||||
|
||||
function ForwardManager() {
|
||||
var forwards = Object.create(null)
|
||||
|
||||
function Forward(conn, to) {
|
||||
var proxies = Object.create(null)
|
||||
|
||||
function Proxy(fd) {
|
||||
function maybeSend() {
|
||||
var chunk
|
||||
while ((chunk = this.read())) {
|
||||
if (!conn.write(chunk)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function killListeners() {
|
||||
src.removeListener('readable', maybeSend)
|
||||
conn.removeListener('drain', maybeSend)
|
||||
conn.removeListener('end', killListeners)
|
||||
}
|
||||
|
||||
var src = new ForwardWriter(fd)
|
||||
.on('readable', maybeSend)
|
||||
.on('error', function(err) {
|
||||
log.error('Proxy writer %d had an error', fd, to, err.stack)
|
||||
})
|
||||
|
||||
conn.on('drain', maybeSend)
|
||||
conn.on('end', killListeners)
|
||||
|
||||
this.dest = net.connect(to)
|
||||
.once('end', function() {
|
||||
delete proxies[fd]
|
||||
killListeners()
|
||||
})
|
||||
.on('error', function(err) {
|
||||
log.error('Proxy reader %d had an error', fd, to, err.stack)
|
||||
})
|
||||
|
||||
this.dest.pipe(src)
|
||||
|
||||
this.stop = function() {
|
||||
this.dest.end()
|
||||
}
|
||||
}
|
||||
|
||||
conn.pipe(new ForwardReader())
|
||||
.on('packet', function(fd, packet) {
|
||||
var proxy = proxies[fd]
|
||||
|
||||
if (packet) {
|
||||
if (!proxy) {
|
||||
// New connection
|
||||
proxy = proxies[fd] = new Proxy(fd)
|
||||
}
|
||||
|
||||
proxy.dest.write(packet)
|
||||
}
|
||||
else {
|
||||
// The connection ended
|
||||
if (proxy) {
|
||||
proxy.stop()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
this.end = function() {
|
||||
conn.end()
|
||||
}
|
||||
}
|
||||
|
||||
this.add = function(port, conn, to) {
|
||||
forwards[port] = new Forward(conn, to)
|
||||
}
|
||||
|
||||
this.remove = function(port) {
|
||||
if (forwards[port]) {
|
||||
forwards[port].end()
|
||||
}
|
||||
}
|
||||
|
||||
this.removeAll = function() {
|
||||
Object.keys(forwards).forEach(function(port) {
|
||||
forwards[port].end()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var manager = new ForwardManager()
|
||||
|
||||
function startService() {
|
||||
log.info('Launching reverse port forwarding service')
|
||||
return adb.shell(options.serial, [
|
||||
'exec'
|
||||
, minirev.bin
|
||||
])
|
||||
.timeout(10000)
|
||||
.then(function(out) {
|
||||
lifecycle.share('Forward shell', out)
|
||||
streamutil.talk(log, 'Forward shell says: "%s"', out)
|
||||
})
|
||||
}
|
||||
|
||||
function connectService(times) {
|
||||
function tryConnect(times, delay) {
|
||||
return adb.openLocal(options.serial, 'localabstract:minirev')
|
||||
.timeout(10000)
|
||||
.catch(function(err) {
|
||||
if (/closed/.test(err.message) && times > 1) {
|
||||
return Promise.delay(delay)
|
||||
.then(function() {
|
||||
return tryConnect(--times, delay * 2)
|
||||
})
|
||||
}
|
||||
return Promise.reject(err)
|
||||
})
|
||||
}
|
||||
log.info('Connecting to reverse port forwarding service')
|
||||
return tryConnect(times, 100)
|
||||
}
|
||||
|
||||
function awaitServer() {
|
||||
return connectService(5)
|
||||
.then(function(conn) {
|
||||
conn.end()
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
plugin.createForward = function(port, to) {
|
||||
log.info(
|
||||
'Creating reverse port forward from ":%d" to "%s:%d"'
|
||||
, port
|
||||
, to.host
|
||||
, to.port
|
||||
)
|
||||
return connectService(1)
|
||||
.then(function(out) {
|
||||
var header = new Buffer(4)
|
||||
header.writeUInt16LE(0, 0)
|
||||
header.writeUInt16LE(port, 2)
|
||||
out.write(header)
|
||||
return manager.add(port, out, to)
|
||||
})
|
||||
}
|
||||
|
||||
plugin.removeForward = function(port) {
|
||||
log.info('Removing reverse port forward ":%d"', port)
|
||||
manager.remove(port)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
plugin.connect = function(options) {
|
||||
var resolver = Promise.defer()
|
||||
|
||||
var conn = net.connect(options)
|
||||
|
||||
function connectListener() {
|
||||
resolver.resolve(conn)
|
||||
}
|
||||
|
||||
function errorListener(err) {
|
||||
resolver.reject(err)
|
||||
}
|
||||
|
||||
conn.on('connect', connectListener)
|
||||
conn.on('error', errorListener)
|
||||
|
||||
return resolver.promise.finally(function() {
|
||||
conn.removeListener('connect', connectListener)
|
||||
conn.removeListener('error', errorListener)
|
||||
})
|
||||
}
|
||||
|
||||
plugin.reset = function() {
|
||||
manager.removeAll()
|
||||
}
|
||||
|
||||
group.on('leave', plugin.reset)
|
||||
|
||||
return startService()
|
||||
.then(awaitServer)
|
||||
.then(function() {
|
||||
router
|
||||
.on(wire.ForwardTestMessage, function(channel, message) {
|
||||
var reply = wireutil.reply(options.serial)
|
||||
plugin.connect({
|
||||
host: message.targetHost
|
||||
, port: message.targetPort
|
||||
})
|
||||
.then(function(conn) {
|
||||
conn.end()
|
||||
push.send([
|
||||
channel
|
||||
, reply.okay('success')
|
||||
])
|
||||
})
|
||||
.catch(function() {
|
||||
push.send([
|
||||
channel
|
||||
, reply.fail('fail_connect')
|
||||
])
|
||||
})
|
||||
})
|
||||
.on(wire.ForwardCreateMessage, function(channel, message) {
|
||||
var reply = wireutil.reply(options.serial)
|
||||
plugin.createForward(message.devicePort, {
|
||||
host: message.targetHost
|
||||
, port: message.targetPort
|
||||
})
|
||||
.then(function() {
|
||||
push.send([
|
||||
channel
|
||||
, reply.okay('success')
|
||||
])
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.error('Reverse port forwarding failed', err.stack)
|
||||
push.send([
|
||||
channel
|
||||
, reply.fail('fail_forward')
|
||||
])
|
||||
})
|
||||
})
|
||||
.on(wire.ForwardRemoveMessage, function(channel, message) {
|
||||
var reply = wireutil.reply(options.serial)
|
||||
plugin.removeForward(message.devicePort)
|
||||
.then(function() {
|
||||
push.send([
|
||||
channel
|
||||
, reply.okay('success')
|
||||
])
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.error('Reverse port unforwarding failed', err.stack)
|
||||
push.send([
|
||||
channel
|
||||
, reply.fail('fail')
|
||||
])
|
||||
})
|
||||
})
|
||||
})
|
||||
.return(plugin)
|
||||
})
|
||||
75
lib/units/device/plugins/forward/util/reader.js
Normal file
75
lib/units/device/plugins/forward/util/reader.js
Normal file
@@ -0,0 +1,75 @@
|
||||
var util = require('util')
|
||||
var stream = require('stream')
|
||||
|
||||
var HEADER_SIZE = 4
|
||||
|
||||
function ForwardReader() {
|
||||
stream.Transform.call(this)
|
||||
this._header = new Buffer(HEADER_SIZE)
|
||||
this._needLength = -HEADER_SIZE
|
||||
this._target = 0
|
||||
}
|
||||
|
||||
util.inherits(ForwardReader, stream.Transform)
|
||||
|
||||
ForwardReader.prototype._transform = function(chunk, encoding, done) {
|
||||
var cursor = 0
|
||||
|
||||
while (cursor < chunk.length) {
|
||||
var diff = chunk.length - cursor
|
||||
|
||||
// Do we need a header?
|
||||
if (this._needLength < 0) {
|
||||
// Still missing a header?
|
||||
if (chunk.length < -this._needLength) {
|
||||
// Save what we're received so far.
|
||||
chunk.copy(
|
||||
this._header
|
||||
, HEADER_SIZE + this._needLength
|
||||
, cursor
|
||||
, cursor + -this._needLength
|
||||
)
|
||||
break
|
||||
}
|
||||
|
||||
// Combine previous and current chunk in case the header was split.
|
||||
chunk.copy(
|
||||
this._header
|
||||
, HEADER_SIZE + this._needLength
|
||||
, cursor
|
||||
, cursor + -this._needLength
|
||||
)
|
||||
|
||||
cursor += -this._needLength
|
||||
|
||||
this._target = this._header.readUInt16LE(0)
|
||||
this._needLength = this._header.readUInt16LE(2)
|
||||
|
||||
if (this._needLength === 0) {
|
||||
// This is a fin packet
|
||||
this.emit('packet', this._target, null)
|
||||
this._needLength = -HEADER_SIZE
|
||||
}
|
||||
}
|
||||
// Do we have a full data packet?
|
||||
else if (diff >= this._needLength) {
|
||||
this.emit(
|
||||
'packet'
|
||||
, this._target
|
||||
, chunk.slice(cursor, cursor + this._needLength)
|
||||
)
|
||||
cursor += this._needLength
|
||||
this._needLength = -HEADER_SIZE
|
||||
}
|
||||
// We have a partial data packet.
|
||||
else {
|
||||
this.emit('packet', this._target, chunk.slice(cursor, cursor + diff))
|
||||
this._needLength -= diff
|
||||
cursor += diff
|
||||
}
|
||||
}
|
||||
|
||||
done()
|
||||
}
|
||||
|
||||
module.exports = ForwardReader
|
||||
45
lib/units/device/plugins/forward/util/writer.js
Normal file
45
lib/units/device/plugins/forward/util/writer.js
Normal file
@@ -0,0 +1,45 @@
|
||||
var util = require('util')
|
||||
var stream = require('stream')
|
||||
|
||||
var HEADER_SIZE = 4
|
||||
var MAX_PACKET_SIZE = 0xFFFF
|
||||
|
||||
function ForwardWriter(target) {
|
||||
stream.Transform.call(this)
|
||||
this._target = target
|
||||
}
|
||||
|
||||
util.inherits(ForwardWriter, stream.Transform)
|
||||
|
||||
ForwardWriter.prototype._transform = function(chunk, encoding, done) {
|
||||
var header
|
||||
, length
|
||||
|
||||
do {
|
||||
length = Math.min(MAX_PACKET_SIZE, chunk.length)
|
||||
|
||||
header = new Buffer(HEADER_SIZE)
|
||||
header.writeUInt16LE(this._target, 0)
|
||||
header.writeUInt16LE(length, 2)
|
||||
|
||||
this.push(header)
|
||||
this.push(chunk.slice(0, length))
|
||||
|
||||
chunk = chunk.slice(length)
|
||||
}
|
||||
while (chunk.length)
|
||||
|
||||
done()
|
||||
}
|
||||
|
||||
ForwardWriter.prototype._flush = function(done) {
|
||||
var header = new Buffer(HEADER_SIZE)
|
||||
header.writeUInt16LE(this._target, 0)
|
||||
header.writeUInt16LE(0, 2)
|
||||
|
||||
this.push(header)
|
||||
|
||||
done()
|
||||
}
|
||||
|
||||
module.exports = ForwardWriter
|
||||
Reference in New Issue
Block a user