Skip to content

Instantly share code, notes, and snippets.

@athoune
Last active May 29, 2020 08:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save athoune/8190f92fe954563cf520225319019708 to your computer and use it in GitHub Desktop.
Save athoune/8190f92fe954563cf520225319019708 to your computer and use it in GitHub Desktop.
nia nia nia ssh public key
#!/usr/bin/env python3
import pwd
import sys
import re
import subprocess
import tempfile
import os
import io
from collections import namedtuple
import socket
# Message part of an sshd "Accepted publickey" log entry.
# Groups: 1 = user, 2 = source address, 3 = source port,
#         4 = key algorithm word, 5 = key fingerprint.
ACCEPTED = re.compile(
    r"Accepted publickey for ([^ ]+) from ([^ ]+) port (\d+) ssh2: (\w+) ([^ ]+)"
)

# Leading part of a line up to and including a single space.  Anchored at the
# start, it accepts one or more repetitions of either `name="value"` (name made
# of lowercase letters and hyphens, value non-empty and free of double quotes)
# or a run of non-space characters.  The whole match (group 0) includes the
# trailing space.
# NOTE(review): the second alternative also consumes commas and quotes, so a
# list such as `no-pty,command="a b"` only matches up to the space inside the
# quotes -- confirm whether that is intended.
OPTIONS = re.compile(
    r'^((([a-z\-]+="[^"]+")|([^ ]+))+) '
)

# One parsed public-key line: key type, key data, leading options (or None)
# and trailing comment (or None).
Key = namedtuple('Key', 'type key options comment')

# One key fingerprint: key size, fingerprint hash string and key type name.
Fingerprint = namedtuple('Fingerprint', 'size hash type')
# Key type names accepted as the first field of a public key.
_KEY_TYPES = frozenset((
    'sk-ecdsa-sha2-nistp256@openssh.com',
    'ecdsa-sha2-nistp256',
    'ecdsa-sha2-nistp384',
    'ecdsa-sha2-nistp521',
    'sk-ssh-ed25519@openssh.com',
    'ssh-ed25519',
    'ssh-dss',
    'ssh-rsa',
))


def _split_options(entry):
    """Cut *entry* at its first blank that is not inside double quotes.

    Returns ``(options, rest)`` where ``options`` still carries the separator
    character, or ``None`` when *entry* contains no unquoted space or tab.
    A backslash directly followed by a double quote is skipped as a pair so
    that an escaped quote does not toggle the quoted state.
    """
    quoted = False
    pos = 0
    while pos < len(entry):
        char = entry[pos]
        if char == '\\' and entry[pos + 1:pos + 2] == '"':
            pos += 2
            continue
        if char == '"':
            quoted = not quoted
        elif char in ' \t' and not quoted:
            return entry[:pos + 1], entry[pos + 1:]
        pos += 1
    return None


def ssh_key(line):
    """Parse one authorized_keys style line into a ``Key``.

    The line is ``[options] keytype keydata [comment]``; fields are separated
    by spaces or tabs, and blanks inside double-quoted option values do not
    end the option list.

    Args:
        line: the text of a single key line (surrounding whitespace is
            ignored).

    Returns:
        ``Key(type, key, options, comment)``.  ``options`` is ``None`` when
        the line starts directly with a key type; otherwise it is the option
        text followed by the one separator character that ended it (kept for
        backward compatibility).  ``comment`` is ``None`` when absent.

    Raises:
        ValueError: if the key type is not one of the known types or the key
            data is missing.
    """
    entry = line.strip()
    options = None

    first = entry.split(None, 1)
    if first and first[0] not in _KEY_TYPES:
        # Anything in front of the key type must be an option list.
        parts = _split_options(entry)
        if parts is not None:
            options, entry = parts

    slugs = entry.split(None, 2)
    # Explicit checks instead of `assert`, which is removed under `python -O`.
    if not slugs or slugs[0] not in _KEY_TYPES:
        raise ValueError('unknown key type: %r' % (slugs[0] if slugs else ''))
    if len(slugs) < 2:
        raise ValueError('missing key data after key type %r' % slugs[0])

    comment = slugs[2] if len(slugs) == 3 else None
    return Key(slugs[0], slugs[1], options, comment)
def _fingerprint(line):
    """Return the ``Fingerprint`` that ``ssh-keygen -l`` reports for *line*.

    The line is written to a temporary file because ``ssh-keygen -f`` expects
    a file name; the file is removed again whatever happens.

    Raises:
        RuntimeError: if ssh-keygen exits with a non-zero status; the
            arguments are the offending line and ssh-keygen's stderr bytes.
    """
    fd, tmp = tempfile.mkstemp()
    try:
        with open(fd, 'w') as out:
            out.write(line)
        with subprocess.Popen(['ssh-keygen', '-l', '-f', tmp],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as proc:
            # communicate() drains both pipes while waiting, so a chatty
            # child cannot block on a full pipe the way wait() allows.
            stdout, stderr = proc.communicate()
    finally:
        os.unlink(tmp)
    if proc.returncode != 0:
        raise RuntimeError(line, stderr)
    # First field is the size, second the hash; the last field is taken as
    # the key type with its surrounding pair of characters removed
    # (presumably "(RSA)" style parentheses -- see ssh-keygen(1)).
    fields = stdout.strip().decode('utf8').split(' ')
    return Fingerprint(int(fields[0]), fields[1], fields[-1][1:-1])


def keys(user):
    """Yield ``(Fingerprint, comment)`` for each key *user* has authorized.

    Reads ``<home>/.ssh/authorized_keys`` of the given account, skipping
    blank lines and ``#`` comment lines.

    Args:
        user: login name, looked up with ``pwd.getpwnam``.

    Yields:
        A ``(Fingerprint, comment)`` pair per key line; ``comment`` is the
        comment parsed by ``ssh_key`` (``None`` when the line has none).

    Raises:
        KeyError: if the user does not exist (from ``pwd.getpwnam``).
        OSError: if the authorized_keys file cannot be opened.
        RuntimeError: if ssh-keygen rejects a line.
    """
    home = pwd.getpwnam(user).pw_dir
    path = os.path.join(home, '.ssh', 'authorized_keys')
    with open(path, 'r') as authorized:
        for line in authorized:
            entry = line.strip()
            if not entry or entry.startswith('#'):
                continue
            key = ssh_key(line)
            yield _fingerprint(line), key.comment
class Keys:
    """Per-user cache mapping key fingerprint hashes to key comments.

    Calling an instance with ``(user, fingerprint)`` returns the comment of
    that user's authorized key whose fingerprint hash equals *fingerprint*.
    A user's keys are loaded with ``keys(user)`` on first use and reused
    afterwards.
    """

    def __init__(self):
        # user name -> {fingerprint hash: comment}
        self._users = dict()

    def __call__(self, user, fingerprint):
        """Return the comment of *user*'s key identified by *fingerprint*.

        Raises:
            KeyError: if the user has no key with that fingerprint.
            Exception: whatever ``keys(user)`` raises while loading; in that
                case nothing is cached, so the next call tries again.
        """
        if user not in self._users:
            # Build the whole mapping before storing it: if loading fails
            # half-way, no empty or partial entry is left behind in the cache.
            self._users[user] = {
                fp.hash: comment for fp, comment in keys(user)
            }
        return self._users[user][fingerprint]
class Abuse:
    """Cache of abuse contact addresses looked up with ``whois -b``.

    Calling an instance with an IP address returns the last whitespace
    separated field of the last whois output line that starts with
    ``abuse-mailbox``, or ``None`` when there is no such line.  Each address
    is queried at most once, including addresses without a result.
    """

    def __init__(self):
        # ip -> mailbox string, or None when whois reported no mailbox
        self._abuses = dict()

    @staticmethod
    def _parse_mailbox(output):
        """Extract the abuse mailbox from raw whois output bytes, or None."""
        mailbox = None
        for raw in output.splitlines():
            # whois servers do not promise UTF-8; never fail on odd bytes.
            line = raw.decode('utf8', errors='replace')
            if line.startswith("abuse-mailbox"):
                mailbox = line.split()[-1]
        return mailbox

    def __call__(self, ip):
        """Return the abuse mailbox for *ip*, or ``None`` if none is listed.

        Raises:
            RuntimeError: if whois exits with a non-zero status; the
                arguments are the address and whois' stderr bytes.
            OSError: if the whois program cannot be started.
        """
        if ip not in self._abuses:
            with subprocess.Popen(['whois', '-b', ip],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                # communicate() reads both pipes while waiting, avoiding the
                # dead-lock wait() can hit once a pipe buffer fills up.
                stdout, stderr = proc.communicate()
            if proc.returncode != 0:
                raise RuntimeError(ip, stderr)
            # Store misses (None) too so the same address is not queried again.
            self._abuses[ip] = self._parse_mailbox(stdout)
        return self._abuses.get(ip)
def read_auth_log():
    """Report which key was used for every accepted sshd public-key login.

    Reads syslog style lines from standard input.  A line is expected to have
    five whitespace separated header fields (the fifth starting with
    ``sshd``) followed by the message.  For every message matching
    ``ACCEPTED`` this prints the split line, then the comment of the matching
    authorized key together with the client's reverse DNS name (or, when that
    lookup fails, its whois abuse mailbox).

    Lines that are too short, come from another program or do not match
    ``ACCEPTED`` are skipped.
    """
    key_comment = Keys()
    abuse = Abuse()
    for line in sys.stdin:
        # Split on runs of whitespace: the day of month may be space-padded
        # ("May  9"), which a single-space split would turn into an extra
        # empty field and shift the program name out of position.
        slugs = line.rstrip("\n").split(None, 5)
        if len(slugs) < 6 or not slugs[4].startswith("sshd"):
            continue
        comment = slugs[5]
        if not comment.startswith("Accepted publickey"):
            continue
        m = ACCEPTED.match(comment)
        if m is None:
            # Message without the expected "ssh2: <type> <fingerprint>" tail.
            continue
        user = m.group(1)
        address = m.group(2)
        fp = m.group(5)
        print(slugs)
        name = None
        try:
            name = socket.gethostbyaddr(address)[0]
        except (OSError, UnicodeError):
            # No usable reverse DNS: fall back to the abuse contact.  A whois
            # failure must not abort the whole report.
            try:
                name = abuse(address)
            except Exception as e:
                print("whois failed for", address, e, file=sys.stderr)
        try:
            print(key_comment(user, fp), name)
        except Exception as e:
            print("aaaaaaah", e)
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    read_auth_log()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment