Uses bluebird for Promises. All functions in TorProcess return Promises instead of callbacks

This commit is contained in:
Zachary Boyd 2018-09-09 00:44:39 -04:00
parent 723a227f26
commit d1265253d2
4 changed files with 146 additions and 148 deletions

5
package-lock.json generated
View file

@ -105,6 +105,11 @@
"tweetnacl": "^0.14.3" "tweetnacl": "^0.14.3"
} }
}, },
"bluebird": {
"version": "3.5.2",
"resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.5.2.tgz",
"integrity": "sha512-dhHTWMI7kMx5whMQntl7Vr9C6BvV10lFXDAasnqnrMYhXVCzzk6IO9Fo2L75jXHT07WrOngL1WDXOp+yYS91Yg=="
},
"boolbase": { "boolbase": {
"version": "1.0.0", "version": "1.0.0",
"resolved": "https://registry.npmjs.org/boolbase/-/boolbase-1.0.0.tgz", "resolved": "https://registry.npmjs.org/boolbase/-/boolbase-1.0.0.tgz",

View file

@ -20,6 +20,7 @@
}, },
"dependencies": { "dependencies": {
"async": "^2.1.4", "async": "^2.1.4",
"bluebird": "^3.5.2",
"del": "^3.0.0", "del": "^3.0.0",
"eventemitter3": "^3.1.0", "eventemitter3": "^3.1.0",
"get-port": "^2.1.0", "get-port": "^2.1.0",

View file

@ -17,15 +17,18 @@ Array.prototype.rotate = (function() {
}; };
})(); })();
const { EventEmitter } = require('eventemitter3');
const async = require('async');
const TorProcess = require('./TorProcess');
const _ = require('lodash');
const path = require('path'); const path = require('path');
const nanoid = require('nanoid');
const fs = require('fs'); const fs = require('fs');
const { EventEmitter } = require('eventemitter3');
const Promise = require("bluebird");
const _ = require('lodash');
const nanoid = require('nanoid');
const WeightedList = require('js-weighted-list'); const WeightedList = require('js-weighted-list');
const TorProcess = require('./TorProcess');
class TorPool extends EventEmitter { class TorPool extends EventEmitter {
constructor(tor_path, default_config, data_directory, load_balance_method, granax_options, logger) { constructor(tor_path, default_config, data_directory, load_balance_method, granax_options, logger) {
super(); super();
@ -80,7 +83,7 @@ class TorPool extends EventEmitter {
this._instances.push(instance); this._instances.push(instance);
return new Promise((resolve, reject) => { return await new Promise((resolve, reject) => {
instance.once('error', reject); instance.once('error', reject);
instance.once('ready', () => { instance.once('ready', () => {

View file

@ -1,16 +1,22 @@
const spawn = require('child_process').spawn; const spawn = require('child_process').spawn;
const _ = require('lodash');
const async = require('async');
const fs = require('fs'); const fs = require('fs');
const crypto = require('crypto');
const { connect } = require('net');
const os = require('os');
const Promise = require('bluebird');
const _ = require('lodash');
const { EventEmitter } = require('eventemitter3');
const shell = require('shelljs');
const getPort = require('get-port'); const getPort = require('get-port');
const del = require('del'); const del = require('del');
const { EventEmitter } = require('eventemitter3');
const temp = require('temp'); const temp = require('temp');
const { TorController } = require('granax'); const { TorController } = require('granax');
const { connect } = require('net');
const shell = require('shelljs'); Promise.promisifyAll(temp);
const crypto = require('crypto'); Promise.promisifyAll(fs);
const os = require('os');
temp.track(); temp.track();
class TorProcess extends EventEmitter { class TorProcess extends EventEmitter {
@ -26,11 +32,15 @@ class TorProcess extends EventEmitter {
this.tor_config = config; this.tor_config = config;
} }
exit(callback) { async exit() {
this.once('process_exit', (code) => { let p = new Promise((resolve, reject) => {
callback && callback(null, code); this.once('process_exit', (code) => {
resolve();
});
}); });
this.process.kill('SIGINT'); this.process.kill('SIGINT');
await p;
} }
@ -56,156 +66,135 @@ class TorProcess extends EventEmitter {
/* Passthrough to granax */ /* Passthrough to granax */
new_identity(callback) { async new_identity() {
this.logger.info(`[tor-${this.instance_name}]: requested a new identity`); this.logger.info(`[tor-${this.instance_name}]: requested a new identity`);
this.controller.cleanCircuits(callback || (() => {}));
await this.controller.cleanCircuitsAsync();
} }
get_config(keyword, callback) { async get_config(keyword) {
if (!this.controller)
throw new Error(`Controller is not connected`);
return await this.controller.getConfigAsync(keyword);
}
async set_config(keyword, value) {
if (!this.controller) { if (!this.controller) {
return callback(new Error(`Controller is not connected`)); return new Error(`Controller is not connected`);
} }
return this.controller.getConfig(keyword, callback); return await this.controller.setConfigAsync(keyword, value);
} }
set_config(keyword, value, callback) { async signal(signal) {
if (!this.controller) { if (!this.controller) {
return callback(new Error(`Controller is not connected`)); throw new Error(`Controller is not connected`);
} }
return this.controller.setConfig(keyword, value, callback); return await this.controller.signal(signal);
} }
signal(signal, callback) { async create() {
if (!this.controller) { let dnsPort = this._dns_port = await getPort();
return callback(new Error(`Controller is not connected`)); let socksPort = this._socks_port = await getPort();
} let controlPort = this._control_port = await getPort();
return this.controller.signal(signal, callback); let options = {
} DNSPort: `127.0.0.1:${context.dnsPort}`,
SocksPort: `127.0.0.1:${context.socksPort}`,
ControlPort: `127.0.0.1:${context.controlPort}`,
HashedControlPassword: shell.exec(`${this.tor_path} --quiet --hash-password "${this.control_password}"`, { async: false, silent: true }).stdout.trim()
};
/* Begin Deprecated */ let config = _.extend(_.extend({}, this.tor_config), options);
let text = Object.keys(config).map((key) => `${key} ${config[key]}`).join(os.EOL);
new_ip(callback) { let configFilePath = await temp.openAsync('tor-router').path;
this.logger.warn(`TorProcess.new_ip is deprecated, use TorProcess.new_identity`); fs.writeFileAsync(configFilePath, text);
return this.new_identity(callback);
}
/* End Deprecated */ let tor = spawn(this.tor_path, ['-f', context.configPath], {
stdio: ['ignore', 'pipe', 'pipe'],
create(callback) { detached: false
async.auto({
dnsPort: (callback) => getPort().then(port => callback(null, port)),
socksPort: (callback) => getPort().then(port => callback(null, port)),
controlPort: (callback) => getPort().then(port => callback(null, port)),
configPath: ['dnsPort', 'socksPort', 'controlPort', (context, callback) => {
let options = {
DNSPort: `127.0.0.1:${context.dnsPort}`,
SocksPort: `127.0.0.1:${context.socksPort}`,
ControlPort: `127.0.0.1:${context.controlPort}`,
HashedControlPassword: shell.exec(`${this.tor_path} --quiet --hash-password "${this.control_password}"`, { async: false, silent: true }).stdout.trim()
};
let config = _.extend(_.extend({}, this.tor_config), options);
let text = Object.keys(config).map((key) => `${key} ${config[key]}`).join(os.EOL);
temp.open('tor-router', (err, info) => {
if (err) return callback(err);
fs.writeFile(info.path, text, (err) => {
callback(err, info.path);
});
});
}]
}, (error, context) => {
if (error)
return callback && callback(error);
this._dns_port = context.dnsPort;
this._socks_port = context.socksPort;
this._control_port = context.controlPort;
let tor = spawn(this.tor_path, ['-f', context.configPath], {
stdio: ['ignore', 'pipe', 'pipe'],
detached: false
});
tor.on('close', (code) => {
this.emit('process_exit', code);
if (this.definition && !this.definition.Name) {
del.sync(this.tor_config.DataDirectory, { force: true });
}
});
tor.stderr.on('data', (data) => {
let error_message = Buffer.from(data).toString('utf8');
this.emit('error', new Error(error_message));
});
this.once('ready', () => {
this.ready = true;
this.logger.info(`[tor-${this.instance_name}]: tor is ready`);
});
this.on('control_listen', () => {
this._controller = new TorController(connect(this._control_port), _.extend({ authOnConnect: false }, this.granax_options));
this.controller.on('ready', () => {
this.logger.debug(`[tor-${this.instance_name}]: connected to tor control port`);
this.controller.authenticate(`"${this.control_password}"`, (err) => {
if (err) {
this.logger.error(`[tor-${this.instance_name}]: ${err.stack}`);
this.emit('error', err);
} else {
this.logger.debug(`[tor-${this.instance_name}]: authenticated with tor instance via the control port`);
this.control_port_connected = true;
this.emit('controller_ready');
}
});
});
});
tor.stdout.on('data', (data) => {
let text = Buffer.from(data).toString('utf8');
let msg = text.split('] ').pop();
if (text.indexOf('Bootstrapped 100%: Done') !== -1){
this.bootstrapped = true;
this.emit('ready');
}
if (text.indexOf('Opening Control listener on') !== -1) {
this.control_port_listening = true;
this.emit('control_listen');
}
if (text.indexOf('Opening Socks listener on') !== -1) {
this.socks_port_listening = true;
this.emit('socks_listen');
}
if (text.indexOf('Opening DNS listener on') !== -1) {
this.dns_port_listening = true;
this.emit('dns_listen');
}
if (text.indexOf('[err]') !== -1) {
this.emit('error', new Error(msg));
this.logger.error(`[tor-${this.instance_name}]: ${msg}`);
}
else if (text.indexOf('[notice]') !== -1) {
this.logger.debug(`[tor-${this.instance_name}]: ${msg}`);
}
else if (text.indexOf('[warn]') !== -1) {
this.logger.warn(`[tor-${this.instance_name}]: ${msg}`);
}
});
this.process = tor;
callback && callback(null, tor);
}); });
tor.on('close', (code) => {
this.emit('process_exit', code);
if (this.definition && !this.definition.Name) {
del.sync(this.tor_config.DataDirectory, { force: true });
}
});
tor.stderr.on('data', (data) => {
let error_message = Buffer.from(data).toString('utf8');
this.emit('error', new Error(error_message));
});
this.once('ready', () => {
this.ready = true;
this.logger.info(`[tor-${this.instance_name}]: tor is ready`);
});
this.on('control_listen', () => {
this._controller = new TorController(connect(this._control_port), _.extend({ authOnConnect: false }, this.granax_options));
Promise.promisifyAll(this._controller);
this.controller.on('ready', () => {
this.logger.debug(`[tor-${this.instance_name}]: connected to tor control port`);
this.controller.authenticate(`"${this.control_password}"`, (err) => {
if (err) {
this.logger.error(`[tor-${this.instance_name}]: ${err.stack}`);
this.emit('error', err);
} else {
this.logger.debug(`[tor-${this.instance_name}]: authenticated with tor instance via the control port`);
this.control_port_connected = true;
this.emit('controller_ready');
}
});
});
});
tor.stdout.on('data', (data) => {
let text = Buffer.from(data).toString('utf8');
let msg = text.split('] ').pop();
if (text.indexOf('Bootstrapped 100%: Done') !== -1){
this.bootstrapped = true;
this.emit('ready');
}
if (text.indexOf('Opening Control listener on') !== -1) {
this.control_port_listening = true;
this.emit('control_listen');
}
if (text.indexOf('Opening Socks listener on') !== -1) {
this.socks_port_listening = true;
this.emit('socks_listen');
}
if (text.indexOf('Opening DNS listener on') !== -1) {
this.dns_port_listening = true;
this.emit('dns_listen');
}
if (text.indexOf('[err]') !== -1) {
this.emit('error', new Error(msg));
this.logger.error(`[tor-${this.instance_name}]: ${msg}`);
}
else if (text.indexOf('[notice]') !== -1) {
this.logger.debug(`[tor-${this.instance_name}]: ${msg}`);
}
else if (text.indexOf('[warn]') !== -1) {
this.logger.warn(`[tor-${this.instance_name}]: ${msg}`);
}
});
this.process = tor;
return tor;
} }
}; };