diff --git a/lib/cli.js b/lib/cli.js index 878faa30..5212ec0c 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -755,6 +755,7 @@ program , profile: options.profile , Bucket: options.bucket , endpoint: options.endpoint + , expires: options.expires }) }) @@ -922,10 +923,18 @@ program , 'websocket port' , Number , 7110) + .option('--storage-type ' + , 'storage type' + , String + , 'temp') .option('--storage-port ' , 'storage port' , Number , 7102) + .option('--storage-options ' + , 'array of options to pass to the storage implementation' + , String + , '[]') .option('--storage-plugin-image-port ' , 'storage image plugin port' , Number @@ -972,7 +981,7 @@ program .option('--use-s3' , 'use amazon s3') .option('--s3-bucket ' - , 'your bucket name' + , 'your Bucket name' , String) .option('--s3-profile ' , 'aws credential Profile name' @@ -991,7 +1000,7 @@ program // Each forked process waits for signals to stop, and so we run over the // default limit of 10. So, it's not a leak, but a refactor wouldn't hurt. process.setMaxListeners(20) - + function run() { var procs = [ // app triproxy @@ -1098,22 +1107,11 @@ program ]) // storage - , (function() { - if (!options.useS3) { - return procutil.fork(__filename, [ - 'storage-temp' - , '--port', options.storagePort - ]); - } else { - return procutil.fork(__filename, [ - 'storage-s3' - , '--port', options.storagePort - , '--bucket', options.s3Bucket - , '--profile', options.s3Profile - , '--endpoint', options.s3Endpoint - ]) - } - })() + , procutil.fork(__filename, [ + util.format('storage-%s', options.storageType) + , '--port', options.storagePort + ].concat(JSON.parse(options.storageOptions))) + // image processor , procutil.fork(__filename, [ 'storage-plugin-image' diff --git a/lib/units/storage/amazons3.js b/lib/units/storage/amazons3.js index 48bae43d..d801c228 100644 --- a/lib/units/storage/amazons3.js +++ b/lib/units/storage/amazons3.js @@ -10,23 +10,23 @@ var Promise = require('bluebird') var uuid = require('node-uuid') var fs = require('fs') +var lifecycle = require('../../util/lifecycle') var logger = require('../../util/logger') -var Storage = require('../../util/storage') var requtil = require('../../util/requtil') -var download = require('../../util/download') var AWS = require('aws-sdk') -var fs = require('fs'); module.exports = function(options) { var log = logger.createLogger('storage:s3') - , app = express() - , server = http.createServer(app) - , credentials = new AWS.SharedIniFileCredentials({ - profile: options.profile}) - , bucket = options.Bucket - , s3 + , app = express() + , server = http.createServer(app) + , credentials = new AWS.SharedIniFileCredentials({ + profile: options.profile + }) + , bucket = options.Bucket + , s3 + // TODO: remove later log.debug(options) AWS.config.credentials = credentials; @@ -39,151 +39,72 @@ module.exports = function(options) { app.use(bodyParser.json()) app.use(validator()) - function makePutParams(id, path, name) { - var key = id + name - ? util.format('/%s', name) - : '' - , rs = fs.createReadStream(path) - return { - Bucket: bucket - , Key: key - , Body: rs - } - } + function putObject(plugin, uriorpath, name) { + return new Promise(function(resolve, reject) { + var id = uuid.v4() + var key = id + var filename = name? name: path.basename(uriorpath) + var rs = fs.createReadStream(uriorpath) + s3.putObject({ + Key: key + , Body: rs + , Bucket: bucket + , Metadata: { + plugin: plugin + , name: filename + } + }, function(err, data) { + if (err) { + log.error('failed to store "%s" bucket:"%s"', key, bucket) + log.error(err); + reject(err); + } else { + log.info('Stored "%s" to %s/%s', name, bucket, key) + resolve({ + id: id + , name: filename + }) + } + }) + }) + } + function getHref(plugin, id, name) { + return util.format( + '/s/%s/%s%s' + , plugin + , id + , name ? '/' + path.basename(name) : '' + ) + } app.post('/s/download/:plugin', function(req, res) { requtil.validate(req, function() { req.checkBody('url').notEmpty() }) - .then(function() { - return download(req.body.url, { - dir: options.cahedir + .then(function() { + return putObject(req.params.plugin, req.body.url) }) - }) - .then(function(file){ - return new Promise(function(resolve, reject) { - var id = uuid.v4() - , params = makePutParams(id, file.path) - s3.putObject(params, function(err, data) { - if (err) { - log.err('failed to store "%s"', params.Key); - reject(err) - } else { - resolve({ - id: id - , name: file.name - }) - } - }) - }) - }) - .then(function(file){ - res.status(201) - .json({ - success: true + .then(function(obj) { + var plugin = req.params.plugin + res.status(201) + .json({ + success: true , resource: { - date: new Date() - , plugin: plugin - , id: file.id - , name: file.name - , href: util.format( - '/s/%s/%s%s' - , plugin - , file.id - , file.name - ? util.format('/%s', path.basename(file.name)) - : '' - ) - } - }) - - }) - .catch(requtil.ValidationError, function(err) { - res.status(400) - .json({ - success: false + date: new Date() + , plugin: plugin + , id: obj.id + , name: obj.name + , href: getHref(plugin, obj.id, obj.name) + } + }) + }) + .catch(requtil.ValidationError, function(err) { + res.status(400) + .json({ + success: false , error: 'ValidationError' , validationErrors: err.errors - }) - }) - .catch(function(err) { - log.error('Error storing resource', err.stack) - res.status(500) - .json({ - success: false - , error: 'ServerError' - }) - }) - }) - - app.post('/s/upload/:plugin', function(req, res) { - var form = new formidable.IncomingForm() - var plugin = req.params.plugin - Promise.promisify(form.parse, form)(req) - .spread(function(fields, files) { - var requests = Object.keys(files).map(function(field) { - var file = files[field] - log.info('Uploaded "%s" to "%s"', file.name, file.path) - - return new Promise(function(resolve, reject) { - var id = uuid.v4() - , key = id + '/' + file.name - , rs = fs.createReadStream(file.path) - , params = { - Key: key - , Body: rs - , Bucket: bucket - , Metadata: { - field: field - , plugin: plugin - } - }; - s3.putObject(params, function(err, data) { - if (err) { - log.error('failed to store "%s" bucket:"%s"', - key, bucket) - log.error(err); - reject(err); - } else { - log.info('Stored "%s" to %s/%s/%s', file.name, - bucket, id, file.name) - resolve({ - field: field - , id: id - , name: file.name - }); - } - }); - }); - }); - return Promise.all(requests) - }) - .then(function(storedFiles) { - res.status(201) - .json({ - success: true - , resources: (function() { - var mapped = Object.create(null) - storedFiles.forEach(function(file) { - var plugin = req.params.plugin - mapped[file.field] = { - date: new Date() - , plugin: plugin - , id: file.id - , name: file.name - , href: util.format( - '/s/%s/%s%s' - , plugin - , file.id - , file.name - ? util.format('/%s', path.basename(file.name)) - : '' - ) - } - }) - return mapped; - })() - }) + }) }) .catch(function(err) { log.error('Error storing resource', err.stack) @@ -193,29 +114,84 @@ module.exports = function(options) { , error: 'ServerError' }) }) + }) + + app.post('/s/upload/:plugin', function(req, res, next) { + var form = new formidable.IncomingForm() + var plugin = req.params.plugin + Promise.promisify(form.parse, form)(req) + .spread(function(fields, files) { + var requests = Object.keys(files).map(function(field) { + var file = files[field] + log.info('Uploaded "%s" to "%s"', file.name, file.path) + return putObject(plugin, file.path, file.name) + .then(function (obj) { + return { + field: field + , id: obj.id + , name: obj.name + , temppath: file.path + } + }) + }) + return Promise.all(requests) + }) + .then(function(storedFiles) { + res.status(201).json({ + success: true, + resources: (function() { + var mapped = Object.create(null) + storedFiles.forEach(function(file) { + var plugin = req.params.plugin + mapped[file.field] = { + date: new Date() + , plugin: plugin + , id: file.id + , name: file.name + , href: getHref(plugin, file.id, file.name) + } + }) + return mapped; + })() + }) + return storedFiles + }) + .then(function (storedFiles){ + storedFiles.forEach(function (file){ + log.debug('cleaned up: %s', file.temppath) + fs.unlink(file.temppath) + }) + }) + .catch(function(err) { + log.error('Error storing resource', err.stack) + res.status(500) + .json({ + success: false, + error: 'ServerError' + }) + }) }); app.get('/s/blob/:id/:name', function(req, res) { - var path = util.format('%s/%s', req.params.id, req.params.name) var params = { - Key: path - , Bucket: bucket + Key: req.params.id, + Bucket: bucket }; s3.getObject(params, function(err, data) { - if (err) { - log.error('failed to retreive[' + path + ']') - log.error(err, err.stack); - res.sendStatus(404) + if (err) { + log.error('failed to retreive[' + path + ']') + log.error(err, err.stack); + res.sendStatus(404) } else { - res.set({ - 'Content-type': data.ContentType - }); - res.send(data.Body); + res.set({ + 'Content-type': data.ContentType + }); + res.send(data.Body); } - }); - }); + }) + }) + // initialize server.listen(options.port) console.log('Listening on port %d', options.port) } - diff --git a/package.json b/package.json index 4f12fe76..20bb3f4c 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "adbkit": "^2.3.1", "adbkit-apkreader": "^1.0.0", "adbkit-monkey": "^1.0.1", + "aws-sdk": "^2.2.3", "bluebird": "^2.9.34", "body-parser": "^1.13.3", "chalk": "~1.0.0",