Replaces callbacks with Promises throughout the rest of the application.

This commit is contained in:
Zachary Boyd 2018-09-09 01:45:12 -04:00
parent d1265253d2
commit a74409ddb7
7 changed files with 215 additions and 354 deletions

View file

@ -3,7 +3,8 @@
## [4.0.0] - 2018-09-09
### Changes
- All methods now return promises instead of accepting callbacks. Methods now take advantage of async/await resulting in clearer code. Compatibility with node < v8 will be broken.
- All methods now return promises instead of accepting callbacks. Methods now take advantage of async/await to increase readability.
- The `logger` argument to the constructor's of all classes is now optional
### Removes
- The `new_ips` and `new_ip_at` TorPool and `new_ip` TorProcess have been removed. Use `new_identites`, `new_identity_at` and `new_identity` instead.

View file

@ -3,12 +3,11 @@ const SOCKSServer = require('./SOCKSServer');
const DNSServer = require('./DNSServer');
const HTTPServer = require('./HTTPServer');
const rpc = require('jrpc2');
const async = require('async');
class ControlServer {
constructor(logger, nconf) {
this.torPool = new TorPool(nconf.get('torPath'), null, nconf.get('parentDataDirectory'), nconf.get('loadBalanceMethod'), nconf.get('granaxOptions'),logger);
this.logger = logger;
this.logger = logger || require('./winston-silent-logger');
this.nconf = nconf;
let server = this.server = new rpc.Server();
@ -21,247 +20,99 @@ class ControlServer {
return { name: i.instance_name, dns_port: i.dns_port, socks_port: i.socks_port, process_id: i.process.pid, config: i.definition.Config, weight: i.definition.weight };
};
server.expose('queryInstances', (function () {
return new Promise((resolve, reject) => {
if (!this.torPool)
return reject({ message: 'No pool created' });
resolve(this.torPool.instances.map(instance_info) );
});
server.expose('queryInstances', (async () => {
if (!this.torPool)
throw new Error('No instances created');
this.torPool.instances.map(instance_info);
}).bind(this));
server.expose('queryInstanceByName', (function (instance_name) {
return new Promise((resolve, reject) => {
if (!this.torPool)
return reject({ message: 'No pool created' });
server.expose('queryInstanceByName', (async (instance_name) => {
if (!this.torPool)
throw new Error('No pool created');
let i = this.torPool.instance_by_name(instance_name);
let instance = this.torPool.instance_by_name(instance_name);
if (!i)
return reject({ message: `Instance "${instance_name}"" does not exist` });
if (!instance)
throw new Error(`Instance "${instance_name}"" does not exist`);
resolve(instance_info(i));
});
return instance_info(i)
}).bind(this));
server.expose('queryInstanceAt', (function (index) {
return new Promise((resolve, reject) => {
if (!this.torPool)
return reject({ message: 'No pool created' });
server.expose('queryInstanceAt', (async (index) => {
if (!this.torPool)
throw new Error('No pool created');
let i = this.torPool.instance_at(index);
let instance = this.torPool.instance_at(index);
if (!i)
return reject({ message: `Instance at "${i}"" does not exist` });
if (!instance)
throw new Error(`Instance at "${i}"" does not exist`);
resolve(instance_info(this.torPool.instance_at(index)));
});
return instance_info(this.torPool.instance_at(index));
}).bind(this));
server.expose('createInstances', (function (instances) {
return new Promise((resolve, reject) => {
this.torPool.create(instances, (error, instances) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this) );
server.expose('createInstances', this.torPool.create.bind(this.torPool));
server.expose('addInstances', this.torPool.add.bind(this.torPool));
server.expose('addInstances', (function (instances) {
return new Promise((resolve, reject) => {
this.torPool.add(instances, (error, instances) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this) );
server.expose('removeInstances', this.torPool.remove.bind(this.torPool));
server.expose('removeInstances', (function (instances) {
return new Promise((resolve, reject) => {
this.torPool.remove(instances, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this) );
server.expose('removeInstanceAt', this.torPool.remove_at.bind(this.torPool));
server.expose('removeInstanceAt', (function (instance_index) {
return new Promise((resolve, reject) => {
this.torPool.remove_at(instance_index, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this) );
server.expose('removeInstanceByName', this.torPool.remove_by_name.bind(this.torPool));
server.expose('removeInstanceByName', (function (instance_name) {
return new Promise((resolve, reject) => {
this.torPool.remove_by_name(instance_name, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this) );
server.expose('newIdentites', this.torPool.new_identites.bind(this.torPool));
server.expose('newIdentites', (function() {
return new Promise((resolve, reject) => {
this.torPool.new_identites((error) => {
if (error) reject(error);
else resolve();
});
});
server.expose('newIdentityAt', this.torPool.new_identity_at.bind(this.torPool));
server.expose('newIdentityByName', this.torPool.new_identity_by_name.bind(this.torPool));
server.expose('nextInstance', (async () => this.torPool.next()).bind(this));
server.expose('closeInstances', (async () => this.torPool.exit()).bind(this));
server.expose('getDefaultTorConfig', (async () => {
return this.nconf.get('torConfig');
}).bind(this));
server.expose('newIdentityAt', (function(index) {
return new Promise((resolve, reject) => {
this.torPool.new_identity_at(index, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
server.expose('newIdentityByName', (function(name) {
return new Promise((resolve, reject) => {
this.torPool.new_identity_by_name(name, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
/* Begin Deprecated */
server.expose('newIps', (function() {
return new Promise((resolve, reject) => {
this.torPool.new_ips((error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
server.expose('newIpAt', (function(index) {
return new Promise((resolve, reject) => {
this.torPool.new_ip_at(index, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
/* End Deprecated */
server.expose('nextInstance', (function () {
this.torPool.next();
return Promise.resolve();
}).bind(this) );
server.expose('closeInstances', (function () {
this.torPool.exit();
return Promise.resolve();
}).bind(this) );
server.expose('getDefaultTorConfig', (function () {
return Promise.resolve(this.nconf.get('torConfig'));
}).bind(this));
server.expose('setDefaultTorConfig', (function (config) {
server.expose('setDefaultTorConfig', (async (config) => {
this.nconf.set('torConfig', config);
return Promise.resolve();
}).bind(this));
server.expose('setTorConfig', (function (config) {
return new Promise((resolve, reject) => {
async.each(Object.keys(config), function (key, next) {
var value = config[key];
server.expose('setTorConfig', (async (config) => {
await Promise.all(Object.keys(config).map((key) => {
var value = config[key];
this.torPool.set_config_all(key, value, next);
}, function (error){
if (error) reject(error);
resolve();
});
});
return this.torPool.set_config_all(key, value);
}));
}).bind(this));
server.expose('getLoadBalanceMethod', (function () {
return Promise.resolve(this.torPool.load_balance_method);
server.expose('getLoadBalanceMethod', (async () => {
return this.torPool.load_balance_method;
}).bind(this));
server.expose('setLoadBalanceMethod', (function (loadBalanceMethod) {
server.expose('setLoadBalanceMethod', (async (loadBalanceMethod) => {
this.torPool.load_balance_method = loadBalanceMethod;
return Promise.resolve();
}).bind(this));
server.expose('getInstanceConfigByName', (function (name, keyword) {
return new Promise((resolve, reject) => {
this.torPool.get_config_by_name(name, keyword, (error, value) => {
if (error) reject(error);
else resolve(value);
});
});
}).bind(this));
server.expose('getInstanceConfigByName', this.torPool.get_config_by_name.bind(this.torPool));
server.expose('getInstanceConfigAt', (function (index, keyword) {
return new Promise((resolve, reject) => {
this.torPool.get_config_at(index, keyword, (error, value) => {
if (error) reject(error);
else resolve(value);
});
});
}).bind(this));
server.expose('getInstanceConfigAt', this.torPool.get_config_at.bind(this.torPool));
server.expose('setInstanceConfigByName', (function (name, keyword, value) {
return new Promise((resolve, reject) => {
this.torPool.set_config_by_name(name, keyword, value, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
server.expose('setInstanceConfigByName', this.torPool.set_config_by_name.bind(this.torPool));
server.expose('setInstanceConfigAt', (function (index, keyword, value) {
return new Promise((resolve, reject) => {
this.torPool.set_config_at(index, keyword, value, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
server.expose('setInstanceConfigAt', this.torPool.set_config_at.bind(this.torPool));
server.expose('signalAllInstances', this.torPool.signal_all.bind(this.torPool));
server.expose('signalAllInstances', (function (signal) {
return new Promise((resolve, reject) => {
this.torPool.signal_all(signal, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
server.expose('signalInstanceAt', this.torPool.signal_at.bind(this.torPool));
server.expose('signalInstanceAt', (function (index, signal, callback) {
return new Promise((resolve, reject) => {
this.torPool.signal_at(index, signal, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
server.expose('signalInstanceByName', (function (name, signal, callback) {
return new Promise((resolve, reject) => {
this.torPool.signal_by_name(name, signal, (error) => {
if (error) reject(error);
else resolve();
});
});
}).bind(this));
server.expose('signalInstanceByName', this.torPool.signal_by_name.bind(this.torPool));
}
listen(port, callback) {
listen(port) {
this.tcpTransport = new rpc.tcpTransport({ port });
this.tcpTransport.listen(this.server);
callback && callback();
}
close() {
@ -273,28 +124,25 @@ class ControlServer {
return this.torPool;
}
createSOCKSServer(port, callback) {
createSOCKSServer(port) {
this.socksServer = new SOCKSServer(this.torPool, this.logger);
this.socksServer.listen(port || 9050);
this.logger.info(`[socks]: Listening on ${port}`);
this.socksServer;
callback && callback();
}
createHTTPServer(port, callback) {
createHTTPServer(port) {
this.httpServer = new HTTPServer(this.torPool, this.logger);
this.httpServer.listen(port || 9080);
this.logger.info(`[http]: Listening on ${port}`);
this.httpServer;
callback && callback();
}
createDNSServer(port, callback) {
createDNSServer(port) {
this.dnsServer = new DNSServer(this.torPool, this.nconf.get('dns:options'), this.nconf.get('dns:timeout'), this.logger);
this.dnsServer.serve(port || 9053);
this.logger.info(`[dns]: Listening on ${port}`);
this.dnsServer;
callback && callback();
}
};

View file

@ -1,13 +1,13 @@
const dns = require('native-dns');
const UDPServer = require('native-dns').UDPServer;
const { UDPServer } = require('native-dns');
class DNSServer extends UDPServer {
constructor(tor_pool, dns_options, dns_timeout, logger) {
super(dns_options);
this.logger = logger;
this.logger = logger || require('./winston-silent-logger');
this.tor_pool = tor_pool;
var handle_dns_request = (req, res) => {
const handle_dns_request = (req, res) => {
let connect = (tor_instance) => {
for (let question of req.question) {
let dns_port = (tor_instance.dns_port);
@ -27,7 +27,7 @@ class DNSServer extends UDPServer {
});
outbound_req.on('error', (err) => {
this.logger.error(`[dns]: an error occured while handling ar request: ${err.message}`);
});

View file

@ -1,16 +1,19 @@
const http = require('http');
const Server = http.Server;
const socks = require('socksv5');
const URL = require('url');
const { Server } = http;
const socks = require('socksv5');
const SocksProxyAgent = require('socks-proxy-agent');
const TOR_ROUTER_PROXY_AGENT = 'tor-router';
class HTTPServer extends Server {
constructor(tor_pool, logger) {
let handle_http_connections = (req, res) => {
let url = URL.parse(req.url);
url.port = url.port || 80;
var buffer = [];
let buffer = [];
function onIncomingData(chunk) {
buffer.push(chunk);
@ -23,7 +26,7 @@ class HTTPServer extends Server {
req.on('data', onIncomingData);
req.on('end', preConnectClosed);
req.on('error', function (err) {
logger.error("[http-proxy]: an error occured\n"+err.stack);
logger.error("[http-proxy]: an error occured: "+err.message);
});
let connect = (tor_instance) => {
@ -112,7 +115,7 @@ class HTTPServer extends Server {
outbound_socket && outbound_socket.on('close', onClose);
outbound_socket && outbound_socket.on('error', onClose);
inbound_socket.write('HTTP/1.1 200 Connection Established\r\n'+'Proxy-agent: tor-router\r\n' +'\r\n');
inbound_socket.write(`HTTP/1.1 200 Connection Established\r\n'+'Proxy-agent: ${TOR_ROUTER_PROXY_AGENT}\r\n` +'\r\n');
outbound_socket.write(head);
outbound_socket.pipe(inbound_socket);
@ -130,7 +133,7 @@ class HTTPServer extends Server {
super(handle_http_connections);
this.on('connect', handle_connect_connections);
this.logger = logger;
this.logger = logger || require('./winston-silent-logger');
this.tor_pool = tor_pool;
}
};

View file

@ -1,7 +1,7 @@
const socks = require('socksv5');
const SOCKS5Server = socks.Server;
const { Server } = socks;
class SOCKSServer extends SOCKS5Server{
class SOCKSServer extends Server{
constructor(tor_pool, logger) {
let handleConnection = (info, accept, deny) => {
let inbound_socket = accept(true);
@ -67,7 +67,7 @@ class SOCKSServer extends SOCKS5Server{
super(handleConnection);
this.logger = logger;
this.logger = logger || require('./winston-silent-logger');;
this.useAuth(socks.auth.None());
}

View file

@ -101,19 +101,20 @@ class TorProcess extends EventEmitter {
let controlPort = this._control_port = await getPort();
let options = {
DNSPort: `127.0.0.1:${context.dnsPort}`,
SocksPort: `127.0.0.1:${context.socksPort}`,
ControlPort: `127.0.0.1:${context.controlPort}`,
DNSPort: `127.0.0.1:${dnsPort}`,
SocksPort: `127.0.0.1:${socksPort}`,
ControlPort: `127.0.0.1:${controlPort}`,
HashedControlPassword: shell.exec(`${this.tor_path} --quiet --hash-password "${this.control_password}"`, { async: false, silent: true }).stdout.trim()
};
let config = _.extend(_.extend({}, this.tor_config), options);
let text = Object.keys(config).map((key) => `${key} ${config[key]}`).join(os.EOL);
let configFilePath = await temp.openAsync('tor-router').path;
fs.writeFileAsync(configFilePath, text);
let configFile = await temp.openAsync('tor-router');
let configPath = configFile.path;
fs.writeFileAsync(configPath, text);
let tor = spawn(this.tor_path, ['-f', context.configPath], {
let tor = spawn(this.tor_path, ['-f', configPath], {
stdio: ['ignore', 'pipe', 'pipe'],
detached: false
});
@ -121,7 +122,7 @@ class TorProcess extends EventEmitter {
tor.on('close', (code) => {
this.emit('process_exit', code);
if (this.definition && !this.definition.Name) {
del.sync(this.tor_config.DataDirectory, { force: true });
del(this.tor_config.DataDirectory, { force: true });
}
});
@ -139,7 +140,7 @@ class TorProcess extends EventEmitter {
this.on('control_listen', () => {
this._controller = new TorController(connect(this._control_port), _.extend({ authOnConnect: false }, this.granax_options));
Promise.promisifyAll(this._controller);
this.controller.on('ready', () => {
this.logger.debug(`[tor-${this.instance_name}]: connected to tor control port`);
this.controller.authenticate(`"${this.control_password}"`, (err) => {

View file

@ -1,21 +1,16 @@
var nconf = require('nconf');
var yargs = require('yargs');
var fs = require('fs');
var TorRouter = require('../');
var SOCKSServer = TorRouter.SOCKSServer;
var DNSServer = TorRouter.DNSServer;
var TorPool = TorRouter.TorPool;
var ControlServer = TorRouter.ControlServer;
var winston = require('winston');
var async = require('async');
const fs = require('fs');
process.title = 'tor-router';
const nconf = require('nconf');
const yargs = require('yargs');
const winston = require('winston')
const Promise = require('bluebird');
let package_json = JSON.parse(fs.readFileSync(`${__dirname}/../package.json`, 'utf8'));
const { ControlServer } = require('./');
function main(nconf, logger) {
const package_json = JSON.parse(fs.readFileSync(`${__dirname}/../package.json`, 'utf8'));
async function main(nconf, logger) {
let instances = nconf.get('instances');
let log_level = logLevel;
let socks_port = nconf.get('socksPort');
let dns_port = nconf.get('dnsPort');
let http_port = nconf.get('httpPort');
@ -28,124 +23,137 @@ function main(nconf, logger) {
let control = new ControlServer(logger, nconf);
process.on('SIGHUP', () => {
control.torPool.new_ips();
try {
if (socks_port) {
if (typeof(socks_port) === 'boolean') {
socks_port = 9050;
nconf.set('socksPort', socks_port);
}
control.createSOCKSServer(socks_port);
}
if (http_port) {
if (typeof(http_port) === 'boolean') {
http_port = 9080;
nconf.set('httpPort', http_port);
}
control.createHTTPServer(http_port);
}
if (dns_port) {
if (typeof(dns_port) === 'boolean') {
dns_port = 9053;
nconf.set('dnsPort', dns_port);
}
control.createDNSServer(dns_port);
}
if (instances) {
logger.info(`[tor]: starting ${Array.isArray(instances) ? instances.length : instances} tor instances...`)
await control.torPool.create(instances);
logger.info('[tor]: tor started');
}
await control.listen(control_port);
logger.info(`[control]: control Server listening on ${control_port}`);
} catch (error) {
logger.error(`[global]: error starting application: ${error.stack}`);
process.exit(1);
}
const cleanUp = (async (error) => {
let thereWasAnExitError = false;
let { handleError } = this;
try {
await Promise.all(control.torPool.instances.map((instance) => instance.exit()));
} catch (exitError) {
logger.error(`[global]: error closing tor instances: ${exitError.message}`);
thereWasAnExitError = true;
}
if (handleError && error) {
console.log(error)
logger.error(`[global]: error shutting down: ${error.message}`);
}
process.exit(Number(Boolean(error || thereWasAnExitError)));
});
if (socks_port) {
if (typeof(socks_port) === 'boolean') {
socks_port = 9050;
nconf.set('socksPort', socks_port);
}
control.createSOCKSServer(socks_port);
}
process.title = 'tor-router';
if (http_port) {
if (typeof(http_port) === 'boolean') {
http_port = 9080;
nconf.set('httpPort', http_port);
}
control.createHTTPServer(http_port);
}
process.on('SIGHUP', () => {
control.torPool.new_identites();
});
if (dns_port) {
if (typeof(dns_port) === 'boolean') {
dns_port = 9053;
nconf.set('dnsPort', dns_port);
}
control.createDNSServer(dns_port);
}
if (instances) {
logger.info(`[tor]: starting ${Array.isArray(instances) ? instances.length : instances} tor instances...`)
control.torPool.create(instances, (err) => {
logger.info('[tor]: tor started');
});
}
control.listen(control_port, () => {
logger.info(`[control]: Control Server listening on ${control_port}`);
})
function cleanUp(error) {
async.each(control.torPool.instances, (instance, next) => {
instance.exit(next);
}, (exitError) => {
if (error) {
console.error(error.stack);
}
if (exitError){
console.error(exitError);
}
process.exit(Number(Boolean(error || exitError)));
});
}
process.on('exit', cleanUp);
process.on('SIGINT', cleanUp);
process.on('uncaughtException', cleanUp);
process.on('uncaughtException', cleanUp.bind({ handleError: true }));
}
let argv_config = require('yargs')
.version(package_json.version)
.usage('Usage: tor-router [arguments]')
.options({
f: {
alias: 'config',
describe: 'Path to a config file to use',
demand: false
},
c: {
alias: 'controlPort',
describe: 'Port the control server will bind to [default: 9077]',
demand: false
// ,default: 9077
},
j: {
alias: 'instances',
describe: 'Number of instances using the default config',
demand: false
// ,default: 1
},
s: {
alias: 'socksPort',
describe: 'Port the SOCKS5 Proxy server will bind to',
demand: false,
// ,default: 9050
},
d: {
alias: 'dnsPort',
describe: 'Port the DNS Proxy server will bind to',
demand: false
},
h: {
alias: 'httpPort',
describe: 'Port the HTTP Proxy server will bind to',
demand: false
},
l: {
alias: 'logLevel',
describe: 'Controls the verbosity of console log output. Default level is "info". Set to "verbose" to see all network traffic logged or "null" to disable logging completely [default: info]',
demand: false
// ,default: "info"
},
p: {
alias: 'parentDataDirectory',
describe: 'Parent directory that will contain the data directories for the instances',
demand: false
},
b: {
alias: "loadBalanceMethod",
describe: 'Method that will be used to sort the instances between each request. Currently supports "round_robin" and "weighted". [default: round_robin]',
demand: false
},
t: {
alias: "torPath",
describe: "Provide the path for the Tor executable that will be used",
demand: false
}
});
let argv_config =
yargs
.version(package_json.version)
.usage('Usage: tor-router [arguments]')
.options({
f: {
alias: 'config',
describe: 'Path to a config file to use',
demand: false
},
c: {
alias: 'controlPort',
describe: 'Port the control server will bind to [default: 9077]',
demand: false
// ,default: 9077
},
j: {
alias: 'instances',
describe: 'Number of instances using the default config',
demand: false
// ,default: 1
},
s: {
alias: 'socksPort',
describe: 'Port the SOCKS5 Proxy server will bind to',
demand: false,
// ,default: 9050
},
d: {
alias: 'dnsPort',
describe: 'Port the DNS Proxy server will bind to',
demand: false
},
h: {
alias: 'httpPort',
describe: 'Port the HTTP Proxy server will bind to',
demand: false
},
l: {
alias: 'logLevel',
describe: 'Controls the verbosity of console log output. Default level is "info". Set to "verbose" to see all network traffic logged or "null" to disable logging completely [default: info]',
demand: false
// ,default: "info"
},
p: {
alias: 'parentDataDirectory',
describe: 'Parent directory that will contain the data directories for the instances',
demand: false
},
b: {
alias: "loadBalanceMethod",
describe: 'Method that will be used to sort the instances between each request. Currently supports "round_robin" and "weighted". [default: round_robin]',
demand: false
},
t: {
alias: "torPath",
describe: "Provide the path for the Tor executable that will be used",
demand: false
}
});
nconf = require(`${__dirname}/../src/nconf_load_env.js`)(nconf);
require(`${__dirname}/../src/nconf_load_env.js`)(nconf);
nconf
.argv(argv_config);