Allows for multiple types of load-balancing methods to be selected and allows for instances to be load-balanced by weight. Allows for instances to be named. Fixes bug with instance cleanup

This commit is contained in:
Zachary Boyd 2018-05-09 21:09:38 -07:00
parent 4bf03cb299
commit 6878081797
10 changed files with 82 additions and 18 deletions

View file

@ -32,7 +32,7 @@ The following command line switches and their environment variable equivalents a
|-h, --httpPort |HTTP_PORT |Port the HTTP proxy will bind to|
|-l, --logLevel |LOG_LEVEL |Log level (defaults to "info") set to "null" to disable logging. To see a log of all network traffic set logLevel to "verbose"|
|-p, --parentDataDirectory|PARENT_DATA_DIRECTORY |Parent directory that will contain the data directories for the instances|
|-b, --loadBalanceMethod|LOAD_BALANCE_METHOD|Method that will be used to sort the instances between each request. Currently supports "round_robin" and "weighted".|
For example: `tor-router -j 3 -s 9050` would start the proxy with 3 tor instances and listen for SOCKS connections on 9050.
## Configuration

View file

@ -62,10 +62,15 @@ let argv_config = require('yargs')
alias: 'parentDataDirectory',
describe: 'Parent directory that will contain the data directories for the instances',
demand: false
},
b: {
alias: "loadBalanceMethod",
describe: 'Method that will be used to sort the instances between each request. Currently supports "round_robin" and "weighted". [default: round_robin]',
demand: false
}
});
let env_whitelist = [ "CONTROL_PORT", "INSTANCES", "SOCKS_PORT", "DNS_PORT", "HTTP_PORT", "LOG_LEVEL", 'PARENT_DATA_DIRECTORIES', "controlPort", "instances", "socksPort", "dnsPort", "httpPort", "logLevel", 'parentDataDirectories' ];
let env_whitelist = [ "CONTROL_PORT", "INSTANCES", "SOCKS_PORT", "DNS_PORT", "HTTP_PORT", "LOG_LEVEL", 'PARENT_DATA_DIRECTORIES', 'LOAD_BALANCE_METHOD', "controlPort", "instances", "socksPort", "dnsPort", "httpPort", "logLevel", 'parentDataDirectories', 'loadBalanceMethod'];
nconf
.env({
@ -115,6 +120,10 @@ let dns_port = nconf.get('dnsPort');
let http_port = nconf.get('httpPort');
let control_port = nconf.get('controlPort');
if (typeof(control_port) === 'boolean') {
control_port = 9077;
nconf.set('controlPort', 9077);
}
let control = new ControlServer(logger, nconf);
@ -123,14 +132,26 @@ process.on('SIGHUP', () => {
});
if (socks_port) {
if (typeof(socks_port) === 'boolean') {
socks_port = 9050;
nconf.set('socksPort', 9077);
}
let socks = control.createSOCKSServer(socks_port);
}
if (http_port) {
if (typeof(http_port) === 'boolean') {
http_port = 9080;
nconf.set('httpPort', 9077);
}
let http = control.createHTTPServer(http_port);
}
if (dns_port) {
if (typeof(dns_port) === 'boolean') {
dns_port = 9053;
nconf.set('dnsPort', 9077);
}
let dns = control.createDNSServer(dns_port);
}
@ -149,6 +170,12 @@ function cleanUp(error) {
async.each(control.torPool.instances, (instance, next) => {
instance.exit(next);
}, (exitError) => {
if (error) {
console.error(error.stack);
}
if (exitError){
console.error(exitError);
}
process.exit(Number(Boolean(error || exitError)));
});
}

5
package-lock.json generated
View file

@ -739,6 +739,11 @@
}
}
},
"js-weighted-list": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/js-weighted-list/-/js-weighted-list-0.1.1.tgz",
"integrity": "sha1-iIhN0JGgScjltGMKFWSB8xe9Sq8="
},
"jsbn": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/jsbn/-/jsbn-0.1.1.tgz",

View file

@ -25,6 +25,7 @@
"eventemitter2": "^3.0.0",
"get-port": "^2.1.0",
"jrpc2": "^1.0.5",
"js-weighted-list": "^0.1.1",
"lodash": "^4.17.4",
"nanoid": "^1.0.2",
"native-dns": "git+https://github.com/znetstar/node-dns.git",

View file

@ -8,6 +8,7 @@ class ControlServer {
constructor(logger, nconf) {
this.torPool = new TorPool(null, null, logger, nconf);
this.logger = logger;
this.nconf = nconf;
let server = this.server = new rpc.Server();
server.expose('createTorPool', this.createTorPool.bind(this));
@ -20,7 +21,7 @@ class ControlServer {
if (!this.torPool)
return reject({ message: 'No pool created' });
resolve(this.torPool.instances.map((i) => ( { dns_port: i.dns_port, socks_port: i.socks_port, process_id: i.process.pid } )) );
resolve(this.torPool.instances.map((i) => ( { dns_port: i.dns_port, socks_port: i.socks_port, process_id: i.process.pid, config: i.definition.Config, weight: i.definition.weight } )) );
});
}).bind(this));
@ -51,7 +52,7 @@ class ControlServer {
});
}).bind(this) );
server.expose('removeInstancesAt', (function (instance_index) {
server.expose('removeInstanceAt', (function (instance_index) {
return new Promise((resolve, reject) => {
this.torPool.remove_at(instance_index, (error) => {
if (error) reject(error);
@ -80,6 +81,14 @@ class ControlServer {
return Promise.resolve();
}).bind(this) );
server.expose('setTorConfig', (function (config) {
this.nconf.set('torConfig', config);
return Promise.resolve();
}).bind(this));
server.expose('getTorConfig', (function (config) {
return Promise.resolve(this.nconf.get('torConfig'));
}).bind(this));
}
listen(port, callback) {

View file

@ -3,7 +3,7 @@ const UDPServer = require('native-dns').UDPServer;
class DNSServer extends UDPServer {
constructor(tor_pool, logger, nconf) {
super(this.nconf.get('dns:options'));
super(nconf.get('dns:options'));
this.logger = logger;
this.tor_pool = tor_pool;
@ -21,7 +21,7 @@ class DNSServer extends UDPServer {
if (!err && answer) {
for (let a of answer.answer){
res.answer.push(a);
this.logger && this.logger.verbose(`[dns]: ${question.name} type ${dns.consts.QTYPE_TO_NAME[question.type]} → 127.0.0.1:${dns_port}${a.address}`)
this.logger && this.logger.verbose(`[dns]: ${question.name} type ${dns.consts.QTYPE_TO_NAME[question.type]} → 127.0.0.1:${dns_port}${tor_instance.definition.Name ? ' ('+tor_instance.definition.Name+')' : '' }${a.address}`)
}
}
});

View file

@ -96,7 +96,7 @@ class HTTPServer extends Server {
let connect = (tor_instance) => {
let socks_port = tor_instance.socks_port;
logger && logger.verbose(`[http-connect]: ${req.connection.remoteAddress}:${req.connection.remotePort} → 127.0.0.1:${socks_port}${hostname}:${port}`)
logger && logger.verbose(`[http-connect]: ${req.connection.remoteAddress}:${req.connection.remotePort} → 127.0.0.1:${socks_port}${tor_instance.definition.Name ? ' ('+tor_instance.definition.Name+')' : '' }${hostname}:${port}`)
var outbound_socket;
let onClose = (error) => {

View file

@ -34,7 +34,7 @@ class SOCKSServer extends SOCKS5Server{
let connect = (tor_instance) => {
let socks_port = tor_instance.socks_port;
logger && logger.verbose(`[socks]: ${info.srcAddr}:${info.srcPort} → 127.0.0.1:${socks_port}${info.dstAddr}:${info.dstPort}`)
logger && logger.verbose(`[socks]: ${info.srcAddr}:${info.srcPort} → 127.0.0.1:${socks_port}${tor_instance.definition.Name ? ' ('+tor_instance.definition.Name+')' : '' }${info.dstAddr}:${info.dstPort}`)
d.on('error', onClose);

View file

@ -24,17 +24,34 @@ const _ = require('lodash');
const path = require('path');
const nanoid = require('nanoid');
const fs = require('fs');
const WeightedList = require('js-weighted-list');
const load_balance_methods = {
round_robin: function (instances) {
return instances.rotate(1);
},
weighted: function (instances) {
if (!instances._weighted_list) {
instances._weighted_list = new WeightedList(
instances.map((instance) => {
return [ instance.id, instance.definition.Weight, instance ]
})
);
}
return instances._weighted_list.peek(instances.length).map((element) => element.data);
}
};
class TorPool extends EventEmitter {
constructor(tor_path, default_config, logger, nconf) {
constructor(tor_path, default_config, logger, nconf, data_directory, load_balance_method) {
super();
this._instances = [];
default_config = _.extend({}, (default_config || {}), nconf.get('torConfig'));
this.default_tor_config = default_config;
this.data_directory = nconf.get('parentDataDirectory');
this.data_directory = data_directory || nconf.get('parentDataDirectory');
this.load_balance_method = load_balance_method || nconf.get('loadBalanceMethod');
!fs.existsSync(this.data_directory) && fs.mkdirSync(this.data_directory);
this.tor_path = tor_path || 'tor';
this._instances = [];
this.logger = logger;
this.nconf = nconf;
}
@ -47,7 +64,7 @@ class TorPool extends EventEmitter {
instance_definition.Config = instance_definition.Config || {};
instance_definition.Config = _.extend(instance_definition.Config, this.default_tor_config);
let instance_id = nanoid();
instance_definition.Config.DataDirectory = instance_definition.Config.DataDirectory || path.join(this.data_directory, instance_id);
instance_definition.Config.DataDirectory = instance_definition.Config.DataDirectory || path.join(this.data_directory, (instance_definition.Name || instance_id));
let instance = new TorProcess(this.tor_path, instance_definition.Config, this.logger, this.nconf);
instance.id = instance_id;
instance.definition = instance_definition;
@ -95,7 +112,7 @@ class TorPool extends EventEmitter {
}
next() {
this._instances = this._instances.rotate(1);
this._instances = load_balance_methods[this.nconf.get('loadBalanceMethod')](this._instances);
return this.instances[0];
}
@ -116,4 +133,6 @@ class TorPool extends EventEmitter {
}
};
TorPool.LoadBalanceMethods = load_balance_methods;
module.exports = TorPool;

View file

@ -22,10 +22,10 @@ class TorProcess extends EventEmitter {
}
exit(callback) {
this.process.once('exit', (code) => {
this.once('process_exit', (code) => {
callback && callback(null, code);
});
this.process.kill('SIGKILL');
this.process.kill('SIGINT');
}
new_ip() {
@ -73,8 +73,11 @@ class TorProcess extends EventEmitter {
shell: '/bin/bash'
});
tor.on('exit', () => {
del.sync(this.tor_config.DataDirectory);
tor.on('close', (code) => {
this.emit('process_exit', code);
if (this.definition && !this.definition.Name) {
del.sync(this.tor_config.DataDirectory);
}
});
tor.stderr.on('data', (data) => {