Created
May 24, 2021 11:56
-
-
Save wtnb75/9d8cb5d16f1fbc2697f54384416d6560 to your computer and use it in GitHub Desktop.
guess x86 architecture from binary
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import functools
import os
import re
import subprocess
import tarfile
import tempfile
import zipfile
from logging import getLogger

import click
import requests
# Module-level logger used by the commands in this file.
_log = getLogger(__name__)


# Root command group. When it is invoked without a subcommand it prints the
# group's help text. (A comment is used instead of a docstring because click
# would display a docstring as the command's help.)
@click.group(invoke_without_command=True)
@click.pass_context
@click.version_option(version="0.1", prog_name="guess-arch")
def cli(ctx):
    if ctx.invoked_subcommand is not None:
        return
    print(ctx.get_help())
def set_verbose(flag):
    """Configure logging via ``logging.basicConfig``.

    Args:
        flag: truthy selects the DEBUG level, falsy selects INFO.
    """
    import logging

    logging.basicConfig(
        level=logging.DEBUG if flag else logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s',
    )
# Click option decorators shared by the subcommands (applied by cli_option).
_common_option = [click.option("--verbose/--no-verbose", default=False, show_default=True)]
def common_option(decs):
    """Combine a sequence of decorators into a single decorator.

    The decorators are applied from the last element of *decs* to the first,
    which gives the same result as stacking them with ``@`` in list order.

    Args:
        decs: reversible sequence of decorators.

    Returns:
        A decorator that applies every entry of *decs* to its argument.
    """
    def apply_all(target):
        decorated = target
        for decorator in reversed(decs):
            decorated = decorator(decorated)
        return decorated

    return apply_all
def cli_option(func):
    """Give *func* the shared command-line options.

    The returned callable takes a ``verbose`` argument, passes it to
    ``set_verbose`` and then calls *func* with the remaining arguments.
    The decorators listed in ``_common_option`` are applied on top of it.
    """
    @functools.wraps(func)
    def with_logging(verbose, *args, **kwargs):
        set_verbose(verbose)
        return func(*args, **kwargs)

    attach_options = common_option(_common_option)
    return attach_options(with_logging)
class GuessArch:
    """Extract instruction fields from the output of ``objdump -d``."""

    # Executable that is run as ``<objdump_bin> -d <file>``; can be replaced
    # per instance through the constructor.
    objdump_bin = "objdump"

    # Pattern for one disassembly line: address, raw bytes, mnemonic and,
    # optionally, operands and a ``<symbol>`` reference.
    #
    # * ``binary`` is captured inside a lookahead and re-matched through a
    #   backreference. Lookaheads are atomic, so the engine cannot give back
    #   trailing bytes; without this a byte-only continuation line such as
    #   "ff ff ff" would be reported as an instruction with opcode "ff".
    # * The pattern is not anchored at the end of the line: an operand that
    #   contains a character outside the operand class (for example ``$``)
    #   ends the match early, and the opcode is still reported.
    regexp = (
        r"\s*(?P<address>[0-9a-f]*):\s*"
        r"(?=(?P<binary>(?:[0-9a-f]{2}\s)+))(?P=binary)\s*"
        r"(?P<opcode>[a-z][a-z0-9]+)\s+"
        r"(?P<operand>[%,a-z0-9,\)\( ]*)"
        r"(?P<reference><[^>]+>)?"
    )

    def __init__(self, objdump_bin=None, regexp=None):
        """Optionally override the class-level objdump binary and pattern.

        Args:
            objdump_bin: name or path of the disassembler to run; a falsy
                value keeps the class default.
            regexp: pattern (with named groups) applied to every output
                line; a falsy value keeps the class default.
        """
        if objdump_bin:
            self.objdump_bin = objdump_bin
        if regexp:
            self.regexp = regexp

    def parse_lines(self, lines):
        """Yield one dict per line of *lines* that matches ``self.regexp``.

        Args:
            lines: iterable of text lines (disassembler output).

        Yields:
            The named groups of the match. Non-empty values are stripped of
            surrounding whitespace; groups that did not take part in the
            match stay ``None``.
        """
        pattern = re.compile(self.regexp)
        for line in lines:
            found = pattern.match(line)
            if found is None:
                continue
            yield {
                key: value.strip() if value else value
                for key, value in found.groupdict().items()
            }

    def parse(self, binpath):
        """Disassemble *binpath* and yield its instructions as dicts.

        The disassembler's stderr is discarded and its exit status is not
        checked, so a file it cannot handle simply yields nothing.

        Args:
            binpath: path of the file passed to ``<objdump_bin> -d``.

        Yields:
            Dicts as produced by :meth:`parse_lines`.
        """
        cmd = [self.objdump_bin, "-d", binpath]
        with subprocess.Popen(
                cmd, text=True,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL) as pipe:
            yield from self.parse_lines(pipe.stdout)
class ExecInArchive:
    """Collect the opcodes used by executables stored in a tar or zip archive.

    Members whose name matches a pattern are written to a temporary file and
    disassembled with ``GuessArch``. Instances can be used as context
    managers so that the underlying archive is closed.
    """

    def __init__(self, fname, type="tar"):
        """Open the archive *fname*.

        Args:
            fname: path of the archive file.
            type: ``"tar"`` (default) or ``"zip"``.

        Raises:
            ValueError: if *type* is neither ``"tar"`` nor ``"zip"``.
        """
        self.type = type
        if type == "tar":
            # "r:*" lets tarfile detect the compression itself, so plain
            # tars are accepted as well as gzip-compressed ones.
            self.tf = tarfile.open(fname, mode="r:*")
        elif type == "zip":
            self.zf = zipfile.ZipFile(fname, mode="r")
        else:
            raise ValueError(f"unsupported archive type: {type!r}")

    def close(self):
        """Close the underlying archive file."""
        for attr in ("tf", "zf"):
            archive = getattr(self, attr, None)
            if archive is not None:
                archive.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @staticmethod
    def _opcodes_of(ga, data):
        """Return the set of opcodes *ga* finds in the bytes *data*."""
        # The disassembler reads from a path, so the member is spilled to a
        # temporary file that is removed again when the block exits.
        with tempfile.NamedTemporaryFile("wb") as tmp:
            tmp.write(data)
            tmp.flush()
            return {insn.get("opcode") for insn in ga.parse(tmp.name)}

    def _tar_members(self, regexp):
        """Yield ``(name, content)`` for regular tar members matching *regexp*."""
        for ti in self.tf:
            if ti.isreg() and regexp.search(ti.name):
                yield ti.name, self.tf.extractfile(ti).read()

    def _zip_members(self, regexp):
        """Yield ``(name, content)`` for non-directory zip members matching *regexp*."""
        for zi in self.zf.infolist():
            if zi.is_dir() or not regexp.search(zi.filename):
                continue
            yield zi.filename, self.zf.read(zi)

    def parse_all_tar(self, pattern):
        """Yield ``(member name, opcode set)`` for matching tar members.

        Args:
            pattern: regular expression searched in each member name.
        """
        regexp = re.compile(pattern)
        ga = GuessArch()
        for name, data in self._tar_members(regexp):
            yield name, self._opcodes_of(ga, data)

    def parse_all_zip(self, pattern):
        """Yield ``(member name, opcode set)`` for matching zip members.

        Args:
            pattern: regular expression searched in each member name.
        """
        regexp = re.compile(pattern)
        ga = GuessArch()
        for name, data in self._zip_members(regexp):
            yield name, self._opcodes_of(ga, data)

    def parse_all(self, pattern=r"(bin/|lib/lib.*.so)"):
        """Dispatch to the parser that fits the archive type.

        Args:
            pattern: regular expression searched in each member name.

        Returns:
            A generator of ``(member name, opcode set)`` pairs.

        Raises:
            ValueError: if ``self.type`` is not a supported archive type.
        """
        if self.type == "tar":
            return self.parse_all_tar(pattern)
        if self.type == "zip":
            return self.parse_all_zip(pattern)
        raise ValueError(f"unsupported archive type: {self.type!r}")
class Opcode2Arch:
    """Translate opcode mnemonics into the names listed for them in a table."""

    def __init__(self, opcode_table=None):
        """Store the lookup table.

        Args:
            opcode_table: mapping of opcode -> set of names, as returned by
                :meth:`read_opcode_table`. Defaults to an empty table.
        """
        # A fresh dict per instance: a literal ``{}`` default would be one
        # object shared by every instance created without a table.
        self.opcode_table = {} if opcode_table is None else opcode_table

    @classmethod
    def read_opcode_table(cls, input):
        """Read a two-column CSV (``opcode,names``) into a dict of sets.

        The second column is split on whitespace; rows repeating an opcode
        are merged. Blank lines are ignored.

        Args:
            input: iterable of CSV text lines, e.g. an open text file.

        Returns:
            dict mapping each opcode to the set of names listed for it.

        Raises:
            ValueError: if a non-empty row does not have exactly two columns.
        """
        res = {}
        for lineno, row in enumerate(csv.reader(input), start=1):
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            if len(row) != 2:
                raise ValueError(
                    f"row {lineno}: expected 2 columns, got {len(row)}")
            opcode, optypes = row
            res.setdefault(opcode, set()).update(optypes.split())
        return res

    def convert(self, opcodes):
        """Return the union of the table entries of *opcodes*.

        Opcodes that are missing from the table are ignored.

        Args:
            opcodes: iterable of opcode mnemonics.

        Returns:
            set of names collected from the table.
        """
        res = set()
        for opcode in opcodes:
            res.update(self.opcode_table.get(opcode, ()))
        return res
# Command: open INPUT with ExecInArchive's default archive type and print
# every matching member together with its opcode set. Members for which no
# opcode was found are not printed.
@cli.command()
@cli_option
@click.argument("input", type=click.Path(exists=True, readable=True, file_okay=True, dir_okay=False))
def parse_tar(input):
    archive = ExecInArchive(input)
    for member, found in archive.parse_all():
        if found:
            print(member, found)
# Command: like parse_tar, but each member's opcode set is translated through
# the table read from OPCODE-TABLE before it is printed.
@cli.command()
@cli_option
@click.argument("input", type=click.Path(exists=True, readable=True, file_okay=True, dir_okay=False))
@click.argument("opcode-table", type=click.File('r'))
def parse_tar_arch(input, opcode_table):
    table = Opcode2Arch.read_opcode_table(opcode_table)
    translator = Opcode2Arch(table)
    archive = ExecInArchive(input)
    for member, found in archive.parse_all():
        if not found:
            continue
        print(member, translator.convert(found))
# Command: disassemble the single file INPUT and print its path followed by
# the set of opcodes that were found.
@cli.command()
@cli_option
@click.argument("input", type=click.Path(exists=True, readable=True, file_okay=True, dir_okay=False))
def parse_bin(input):
    found = set()
    for insn in GuessArch().parse(input):
        found.add(insn.get("opcode"))
    print(input, found)
# Command: run `docker export` into a temporary file and scan the result the
# same way parse_tar does.
# NOTE(review): `docker export` is documented as operating on containers;
# confirm whether IMAGE-NAME is expected to be a container name/ID.
@cli.command()
@cli_option
@click.argument("image-name")
def parse_docker_image(image_name):
    with tempfile.NamedTemporaryFile("r+") as export_file:
        # docker writes to the file by name; the handle only keeps the path
        # reserved and removes the file when the block exits.
        subprocess.check_call(
            ["docker", "export", "-o", export_file.name, image_name])
        archive = ExecInArchive(export_file.name)
        for member, found in archive.parse_all():
            if not found:
                continue
            print(member, found)
# Default download location of the opcode table.
table_url = "https://sourceware.org/git/?p=binutils-gdb.git;a=blob_plain;f=opcodes/i386-opc.tbl"


# Command: download --url into the current directory, using the last path
# component of the URL as the file name. An existing file is left untouched.
@cli.command()
@cli_option
@click.option("--url", type=str, default=table_url)
def download_table(url):
    outfile = os.path.basename(url)
    if not outfile:
        # e.g. a URL ending in "/": there is no name to save the file under.
        raise click.BadParameter(f"cannot derive a file name from {url!r}")
    if os.path.exists(outfile):
        _log.info("file exists: %s", outfile)
        return
    # Fetch before creating the output file: a failed request must not leave
    # an empty file behind that the exists-check above would then accept.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    # Response.content is bytes, so the file has to be opened in binary mode.
    with open(outfile, "wb") as ofp:
        ofp.write(response.content)
# Command: read a comma-separated opcode table from INPUT and print one
# "opcode,name name ..." line per opcode. Column 0 is used as the opcode and
# column 3 as a "|"-separated list of CPU flags (presumably the layout of
# binutils' i386-opc.tbl; verify against the table version in use).
@cli.command()
@cli_option
@click.argument("input", type=click.File("r"))
def create_table(input):
    # Compiled once; used by conv() for every CPU-flag token.
    cpu_macro = re.compile("<(?P<cputype>[0-9a-z]*):cpu>")

    def conv(s):
        # Normalise one CPU-flag token: "0" -> "x86", "CpuXYZ" -> "XYZ",
        # "<xyz:cpu>" -> "XYZ"; anything else is returned unchanged.
        if s == "0":
            return "x86"
        if s.startswith("Cpu"):
            return s[3:]
        m = cpu_macro.match(s)
        if m is not None:
            return m.group("cputype").upper()
        return s

    def convop(s):
        # Return the opcode name of column 0, or None when the entry should
        # be skipped (starts with "<" or "{", or still contains ":" after
        # everything from the first "<" on has been cut off).
        if s.startswith(("<", "{")):
            return None
        s = s.split("<")[0]
        if ":" in s:
            return None
        return s

    out = {}
    for line in input:
        line = line.strip()
        if not line or line.startswith(("//", "#")):
            continue
        ls = [x.strip() for x in line.split(",")]
        if len(ls) < 4:
            # Also covers lines without any comma.
            continue
        opcode = convop(ls[0])
        if opcode is None:
            continue
        out.setdefault(opcode, set()).update(
            conv(x.strip()) for x in ls[3].split("|"))
    for k, v in out.items():
        # Sorted so that the output does not depend on set iteration order.
        click.echo(f"{k},{' '.join(sorted(v))}")
# Command: parse the opcode table CSV given as INPUT and print the resulting
# mapping.
@cli.command()
@cli_option
@click.argument("input", type=click.File("r"))
def read_table(input):
    table = Opcode2Arch.read_opcode_table(input)
    print(table)


# Run the command group when the file is executed as a script.
if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment