Skip to content

Instantly share code, notes, and snippets.

@fqrouter fqrouter/janus.py
Created Mar 4, 2013

Embed
What would you like to do?
An implementation of https://github.com/evilaliv3/janus in Python for Android
import subprocess
import shlex
import atexit
import signal
import dpkt
import contextlib
import binascii
from scapy.all import *
# Root logger (getLogger() is called without a name).
# NOTE(review): `logging` is not imported explicitly above; presumably it is
# re-exported by `from scapy.all import *` -- confirm.
LOGGER = logging.getLogger()
# Placeholder MAC written into the local ARP cache in place of the gateway's
# real MAC; sniff() treats frames addressed to it as outgoing traffic.
GATEWAY_FAKE_MAC = '12:34:56:78:9a:bc'
def main():
    """Set up the gateway interception and relay frames forever.

    Steps, in order:
      1. Look up the route towards 8.8.8.8 to learn the gateway ip.
      2. Read the gateway's MAC from the ARP cache and log it.
      3. Point the ARP cache entry of the gateway at GATEWAY_FAKE_MAC.
      4. Install iptables rules dropping input from the real gateway MAC.
      5. Open a raw IP socket plus a layer-2 send and a layer-2 listen socket
         on wlan0 and hand them to sniff() in an endless loop.

    The function only returns by way of an exception; the three sockets are
    closed when that happens.
    """
    # The route lookup yields three values; only the gateway ip is used here.
    _interface, _local_ip, gateway_ip = conf.route.route('8.8.8.8')
    gateway_mac = query_mac_from_arp_cache(gateway_ip)
    LOGGER.info('gateway found at %s, %s' % (gateway_ip, gateway_mac))

    setup_fake_mac_in_arp_cache(gateway_ip, gateway_mac, GATEWAY_FAKE_MAC)
    setup_iptables_dropping_input(gateway_mac)

    # Raw IP socket with IP_HDRINCL set; inject_incoming() writes complete
    # IP packets (header included) to it.
    ip_injector = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
    ip_injector.setsockopt(socket.SOL_IP, socket.IP_HDRINCL, 1)

    with contextlib.closing(ip_injector), \
            contextlib.closing(conf.L2socket(iface='wlan0')) as l2_sender, \
            contextlib.closing(conf.L2listen(iface='wlan0')) as l2_listener:
        while True:
            sniff(ip_injector, l2_sender.outs, l2_listener.ins, gateway_mac)
def sniff(raw_socket, send_socket, receive_socket, gateway_real_mac):
    """Receive one ethernet frame and relay it if it belongs to the gateway path.

    A frame addressed to GATEWAY_FAKE_MAC is passed to inject_outgoing(),
    which re-addresses it to the real gateway. An IP frame whose source is
    the real gateway MAC is passed to inject_incoming(). Every other frame
    is ignored.

    Args:
        raw_socket: socket handed to inject_incoming() for IP packets.
        send_socket: socket handed to inject_outgoing() for ethernet frames.
        receive_socket: socket the frame is read from with recvfrom().
        gateway_real_mac: colon separated hex MAC string of the real gateway.
    """
    # Ask for much more than 1500 bytes: an ethernet frame is a 14 byte
    # header plus its payload, so a full 1500 byte IP packet did not fit in
    # recvfrom(1500) and was silently truncated before being relayed.
    frame_bytes, _sender = receive_socket.recvfrom(65535)
    ethernet_frame = dpkt.ethernet.Ethernet(frame_bytes)
    ip_packet = ethernet_frame.data
    is_ip_packet = isinstance(ip_packet, dpkt.ip.IP)
    if is_ip_packet:
        # NOTE(review): the checksums are zeroed presumably so that dpkt
        # recomputes them when the packet is serialized -- confirm against dpkt.
        ip_packet.sum = 0
        # Only touch the inner checksum when the payload actually has one;
        # a payload without a `sum` attribute (e.g. one left as a raw byte
        # string) used to raise AttributeError here and abort the relay loop.
        if hasattr(ip_packet.data, 'sum'):
            ip_packet.data.sum = 0

    fake_mac_bytes = binascii.unhexlify(GATEWAY_FAKE_MAC.replace(':', ''))
    if ethernet_frame.dst == fake_mac_bytes:
        inject_outgoing(send_socket, ethernet_frame, gateway_real_mac)
    elif is_ip_packet and ethernet_frame.src == binascii.unhexlify(gateway_real_mac.replace(':', '')):
        inject_incoming(raw_socket, ip_packet)
def inject_outgoing(send_socket, ethernet_frame, gateway_real_mac):
    """Re-address ``ethernet_frame`` to the real gateway and send it.

    Args:
        send_socket: object whose ``send`` method receives the serialized frame.
        ethernet_frame: frame object; its ``dst`` attribute is overwritten.
        gateway_real_mac: colon separated hex MAC string of the real gateway.
    """
    real_mac_hex = gateway_real_mac.replace(':', '')
    ethernet_frame.dst = binascii.unhexlify(real_mac_hex)
    # Serialize only after dst has been rewritten so the new address is sent.
    serialized_frame = str(ethernet_frame)
    send_socket.send(serialized_frame)
def inject_incoming(raw_socket, ip_packet):
    """Print ``ip_packet`` and send it through ``raw_socket``.

    Args:
        raw_socket: object whose ``sendto`` method receives the serialized packet.
        ip_packet: packet object; ``dst`` holds the packed destination address
            that is converted to dotted-quad form for the sendto() address.
    """
    print(repr(ip_packet))
    payload = str(ip_packet)
    # The port part of the address is always 0.
    destination = (socket.inet_ntoa(ip_packet.dst), 0)
    raw_socket.sendto(payload, destination)
def setup_fake_mac_in_arp_cache(ip, real_mac, fake_mac):
    """Point the ARP cache entry for ``ip`` at ``fake_mac`` until the process exits.

    The current entry is first checked against ``real_mac``. A clean-up that
    restores ``real_mac`` is registered with ``atexit`` before the entry is
    replaced.

    Args:
        ip: ip address whose ARP cache entry is replaced.
        real_mac: MAC the entry is expected to hold now; restored on exit.
        fake_mac: MAC written into the entry.

    Raises:
        Exception: if the ARP cache does not currently map ``ip`` to ``real_mac``.
    """
    import sys  # local import: sys is not among the imports visible at the top of the file

    current_mac = query_mac_from_arp_cache(ip)
    if current_mac != real_mac:
        raise Exception('expected mac for %s is %s, actuall is %s' % (ip, real_mac, current_mac))

    def clean_up():
        # Restore the real MAC; only warn if something else changed the entry meanwhile.
        current_mac = query_mac_from_arp_cache(ip)
        if current_mac != fake_mac:
            LOGGER.warning('expected mac for %s is %s, actuall is %s' % (ip, fake_mac, current_mac))
        change_arp_cache(ip, real_mac)

    atexit.register(clean_up)
    # Turn SIGTERM into a normal interpreter exit so the atexit clean-up runs
    # exactly once. Calling clean_up() directly from the handler left the
    # process running afterwards and ran the clean-up a second time at exit.
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(128 + signal.SIGTERM))
    change_arp_cache(ip, fake_mac)
def query_mac_from_arp_cache(ip):
    """Return the MAC address that ``arp -a <ip> -n`` reports for ``ip``.

    The MAC is taken from the fourth space separated field of the command
    output and must consist of six colon separated two-digit hex octets.

    Args:
        ip: ip address to look up.

    Returns:
        The MAC address string exactly as printed by ``arp``.

    Raises:
        subprocess.CalledProcessError: if ``arp`` exits with a non-zero status.
        Exception: if the fourth field of the output is not a MAC address
            (for example when there is no complete entry for ``ip``).
    """
    import re  # local import: re is not among the imports visible at the top of the file

    output = subprocess.check_output(shlex.split('arp -a %s -n' % ip))
    if not isinstance(output, str):
        # check_output() yields bytes on Python 3; splitting on ' ' needs text.
        output = output.decode('ascii', 'replace')
    fields = output.split(' ')
    mac = fields[3] if len(fields) > 3 else ''
    # Validate instead of returning whatever the fourth word happens to be.
    if not re.match(r'[0-9A-Fa-f]{2}(?::[0-9A-Fa-f]{2}){5}\Z', mac):
        raise Exception('no mac found for %s in arp output: %r' % (ip, output))
    return mac
def change_arp_cache(ip, mac):
    """Replace the ARP cache entry of ``ip`` with ``mac``.

    Logs the change, then runs ``arp -d <ip>`` followed by
    ``arp -s <ip> <mac>``; each command must exit with status zero.

    Args:
        ip: ip address whose entry is replaced.
        mac: MAC address to store for ``ip``.

    Raises:
        subprocess.CalledProcessError: if either ``arp`` call fails.
    """
    LOGGER.info('change arp cache %s, %s' % (ip, mac))
    commands = (
        'arp -d %s' % ip,
        'arp -s %s %s' % (ip, mac),
    )
    for command in commands:
        subprocess.check_call(shlex.split(command))
def setup_iptables_dropping_input(gateway_real_mac):
    """Install iptables rules that drop wlan0 input coming from ``gateway_real_mac``.

    A DROP rule matching ``--mac-source gateway_real_mac`` on wlan0 is
    inserted into the INPUT chain and into the FORWARD chain. A clean-up that
    deletes both rules is registered with ``atexit`` before they are inserted.

    Args:
        gateway_real_mac: MAC address string passed to ``--mac-source``.

    Raises:
        subprocess.CalledProcessError: if an ``iptables -I`` call fails.
    """
    import sys  # local import: sys is not among the imports visible at the top of the file

    rule_template = 'iptables %s %s -i wlan0 -m mac --mac-source %s -j DROP'
    chains = ('INPUT', 'FORWARD')

    def run_iptables(action, chain):
        # action is '-I' (insert) or '-D' (delete) for the same rule text.
        subprocess.check_call(shlex.split(rule_template % (action, chain, gateway_real_mac)))

    def clean_up():
        for chain in chains:
            run_iptables('-D', chain)

    atexit.register(clean_up)
    # Turn SIGTERM into a normal interpreter exit so the atexit clean-up runs
    # exactly once. Calling clean_up() directly from the handler left the
    # process running without its rules, and the second deletion at exit failed.
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(128 + signal.SIGTERM))
    for chain in chains:
        run_iptables('-I', chain)
if __name__ == '__main__':
    # Script entry point: log at INFO level, then start the relay.
    logging.basicConfig(level=logging.INFO)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.