Add files via upload

This commit is contained in:
halil ibrahim deniz 2023-12-06 01:34:47 +03:00 committed by GitHub
parent fb4a8f7ca2
commit d3b99f2911
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 177 additions and 0 deletions

0
privacynet/__init__.py Normal file
View File

177
privacynet/privacynet.py Normal file
View File

@ -0,0 +1,177 @@
from subprocess import call, check_call, CalledProcessError
from os.path import isfile, basename
from os import devnull
from sys import exit, stdout, stderr
from atexit import register
from argparse import ArgumentParser
from json import load
from urllib.request import urlopen
from urllib.error import URLError
from time import sleep
import subprocess
import requests
class TorIptables(object):
def __init__(self):
self.local_dnsport = "53" # DNSPort
self.virtual_net = "10.0.0.0/10" # VirtualAddrNetwork
self.local_loopback = "127.0.0.1" # Local loopback
self.non_tor_net = ["192.168.0.0/16", "172.16.0.0/12"]
self.non_tor = ["127.0.0.0/9", "127.128.0.0/10", "127.0.0.0/8"]
self.tor_uid = subprocess.getoutput("id -ur debian-tor") # Tor user uid
self.trans_port = "9040" # Tor port
self.tor_config_file = '/etc/tor/torrc'
self.torrc = r'''
## Inserted by %s for tor iptables rules set
## Transparently route all traffic thru tor on port %s
VirtualAddrNetwork %s
AutomapHostsOnResolve 1
TransPort %s
DNSPort %s
''' % (basename(__file__), self.trans_port, self.virtual_net, self.trans_port, self.local_dnsport)
self.log_file = "../privacynet.log" # Günlük dosyasının adı
self.log = open(self.log_file, "a") # Günlük dosyasını oluşturmak ve açmak
def __del__(self):
if self.log:
self.log.close() # Program sonlandığında günlük dosyasını kapat
def write_log(self, message):
if self.log:
self.log.write(message + "\n") # Günlük dosyasına yaz
def flush_iptables_rules(self):
call(["iptables", "-F"])
call(["iptables", "-t", "nat", "-F"])
self.write_log("[+] Flushed iptables rules") # Günlük
def load_iptables_rules(self):
self.flush_iptables_rules()
self.non_tor.extend(self.non_tor_net)
@register
def restart_tor():
fnull = open(devnull, 'w')
try:
tor_restart = check_call(
["service", "tor", "restart"],
stdout=fnull, stderr=fnull)
if tor_restart == 0:
print(" {0}".format(
"[+] Anonymizer status [ON]"))
self.get_ip()
except CalledProcessError as err:
print("[!] Command failed: %s" % ' '.join(err.cmd))
# See https://trac.torproject.org/projects/tor/wiki/doc/TransparentProxy#WARNING
# See https://lists.torproject.org/pipermail/tor-talk/2014-March/032503.html
call(["iptables", "-I", "OUTPUT", "!", "-o", "lo", "!", "-d", self.local_loopback, "!", "-s", self.local_loopback, "-p", "tcp", "-m", "tcp", "--tcp-flags", "ACK,FIN", "ACK,FIN", "-j", "DROP"])
call(["iptables", "-I", "OUTPUT", "!", "-o", "lo", "!", "-d", self.local_loopback, "!", "-s", self.local_loopback, "-p", "tcp", "-m", "tcp", "--tcp-flags", "ACK,RST", "ACK,RST", "-j", "DROP"])
call(["iptables", "-t", "nat", "-A", "OUTPUT", "-m", "owner", "--uid-owner", "%s" % self.tor_uid, "-j", "RETURN"])
call(["iptables", "-t", "nat", "-A", "OUTPUT", "-p", "udp", "--dport", self.local_dnsport, "-j", "REDIRECT", "--to-ports", self.local_dnsport])
for net in self.non_tor:
call(["iptables", "-t", "nat", "-A", "OUTPUT", "-d", "%s" % net, "-j", "RETURN"])
call(["iptables", "-t", "nat", "-A", "OUTPUT", "-p", "tcp", "--syn", "-j", "REDIRECT", "--to-ports", "%s" % self.trans_port])
call(["iptables", "-A", "OUTPUT", "-m", "state", "--state", "ESTABLISHED,RELATED", "-j", "ACCEPT"])
for net in self.non_tor:
call(["iptables", "-A", "OUTPUT", "-d", "%s" % net, "-j", "ACCEPT"])
call(["iptables", "-A", "OUTPUT", "-m", "owner", "--uid-owner", "%s" % self.tor_uid, "-j", "ACCEPT"])
call(["iptables", "-A", "OUTPUT", "-j", "REJECT"])
self.write_log("[+] Loaded iptables rules") # Günlük
def geolocate_ip(self, ip):
try:
response = requests.get(f"http://ip-api.com/json/{ip}")
data = response.json()
country = data["country"]
city = data["city"]
return country, city
except Exception as e:
print(f"Error geolocating IP: {e}")
return None, None
def get_ip(self):
print(" [\033[92m*\033[0m] \033[93mGetting public IP, please wait...\033[0m")
retries = 0
my_public_ip = None
while retries < 12 and not my_public_ip:
retries += 1
try:
my_public_ip = load(urlopen('https://check.torproject.org/api/ip'))['IP']
except URLError:
sleep(5)
print(" [\033[93m?\033[0m] Still waiting for IP address...")
except ValueError:
break
if not my_public_ip:
my_public_ip = subprocess.getoutput('wget -qO - ifconfig.me')
if not my_public_ip:
exit(" \033[91m[!]\033[0m Can't get public ip address!")
country, city = self.geolocate_ip(my_public_ip)
if country and city:
print(" {0}".format("[\033[92m+\033[0m] Your IP is \033[92m%s\033[0m" % my_public_ip))
print(" {0}".format("[\033[92m+\033[0m] Country: \033[92m%s\033[0m" % country))
print(" {0}".format("[\033[92m+\033[0m] City: \033[92m%s\033[0m" % city))
self.write_log(f"[+] Your IP is {my_public_ip}\n[+] Country: {country}\n[+] City: {city}") # Günlük
else:
print(" {0}".format("[\033[92m+\033[0m] Your IP is \033[92m%s\033[0m" % my_public_ip))
print(" {0}".format("[\033[93m!\033[0m] Error geolocating IP"))
self.write_log(f"[+] Your IP is {my_public_ip}\n[!] Error geolocating IP") # Günlük
def change_ip(self):
call(['kill', '-HUP', '%s' % subprocess.getoutput('pidof tor')])
self.get_ip()
if __name__ == '__main__':
parser = ArgumentParser(
description=
'PrivacyNet is an anonymization source for loading and unloading iptables rules')
parser.add_argument('-l', '--load', action='store_true', help='This option will load tor iptables rules')
parser.add_argument('-f', '--flush', action='store_true', help='This option flushes the iptables rules to default')
parser.add_argument('-r', '--refresh', action='store_true', help='This option will change the circuit and gives new IP')
parser.add_argument('-i', '--ip', action='store_true', help='This option will output the current public IP address')
parser.add_argument('-a', '--auto', action='store_true', help='This option enables automatic IP change every X seconds')
parser.add_argument('-t', '--interval', type=int, default=3600, help='Interval for automatic IP change in seconds (default: 3600)')
args = parser.parse_args()
try:
privacy_net = TorIptables()
if isfile(privacy_net.tor_config_file):
if not 'VirtualAddrNetwork' in open(privacy_net.tor_config_file).read():
with open(privacy_net.tor_config_file, 'a+') as torrconf:
torrconf.write(privacy_net.torrc)
if args.load:
privacy_net.load_iptables_rules()
elif args.flush:
privacy_net.flush_iptables_rules()
print(" {0}".format("[\033[93m!\033[0m] Anonymizer status \033[91m[OFF]\033[0m"))
privacy_net.write_log("[!] Anonymizer status [OFF]") # Günlük
elif args.ip:
privacy_net.get_ip()
elif args.refresh:
privacy_net.change_ip()
elif args.auto:
interval = args.interval
try:
while True:
privacy_net.change_ip()
print(" {0}".format("[\033[92m*\033[0m] IP changed successfully\n"))
sleep(interval)
except KeyboardInterrupt:
print("\n[\033[91m!\033[0m] Program terminated by user")
else:
parser.print_help()
except Exception as err:
print(f"[!] Run as super user: {err[1]}")
privacy_net.write_log(f"[!] Run as super user: {err[1]}") # Günlük