Skip to content

Instantly share code, notes, and snippets.

@williballenthin
Created August 26, 2021 18:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save williballenthin/8566a1043532479c51ebb22a5b590db1 to your computer and use it in GitHub Desktop.
Save williballenthin/8566a1043532479c51ebb22a5b590db1 to your computer and use it in GitHub Desktop.
compare vivisect analysis results across versions
#!/usr/bin/env python3
'''
compare vivisect analysis results across versions.
pip install devtools[pygments] pydantic viv-utils termcolor
'''
import sys
import time
import os.path
import logging
import argparse
from typing import List, Literal, Optional, Union
import viv_utils
import termcolor
from devtools import debug
from pydantic import BaseModel, Field
# Module-level logger; its effective level comes from the root logger that
# main() configures according to the --verbose / --quiet flags.
logger = logging.getLogger(__name__)
class MemoryMapEntry(BaseModel):
    """One entry of a workspace memory map: a start address and a size."""

    address: int
    size: int

    def __str__(self) -> str:
        """Render the entry as two zero-padded, upper-case hex numbers."""
        values = (self.address, self.size)
        return "0x%08X 0x%06X" % values
# A memory map is an ordered list of entries; this alias is the type of
# CoverageResult.memory_map.
MemoryMap = List[MemoryMapEntry]
class CoverageResult(BaseModel):
    """
    Metrics recorded for one analysis run of the sample under one version label.

    A failed run carries only `version`, `status`, and `duration`;
    every other field stays None.
    """

    # label the result was recorded under.
    version: str
    # "ok" for a successful run, otherwise free-form text describing the failure.
    status: Union[Literal["ok"], str]
    # elapsed time of the run; displayed with an "s" (seconds) suffix.
    duration: float

    # all following fields are present only if status == "ok".
    #
    # the explicit `= None` defaults matter: pydantic v1 treats a bare
    # Optional[...] annotation as "defaults to None", but pydantic v2 treats it
    # as a *required* field, which would reject failed results that only carry
    # the three fields above.
    memory_map: Optional[MemoryMap] = None
    discovered_count: Optional[int] = None
    undiscovered_count: Optional[int] = None
    xref_count: Optional[int] = None
    location_count: Optional[int] = None
    function_count: Optional[int] = None
    block_count: Optional[int] = None
    instruction_count: Optional[int] = None
    unicode_count: Optional[int] = None
    ascii_count: Optional[int] = None
    number_count: Optional[int] = None
    pointer_count: Optional[int] = None
    vtable_count: Optional[int] = None
    import_count: Optional[int] = None
    export_count: Optional[int] = None
class Spec(BaseModel):
    """A sample plus the results recorded for it, one per version."""

    # path to the sample as stored in the spec file;
    # resolve_sample_path() interprets it relative to the spec's directory.
    sample: str
    results: List[CoverageResult] = Field(default_factory=list)

    def resolve_sample_path(self, spec_path):
        """
        Return the normalized path to the sample.

        Args:
          spec_path: path to the spec file; `sample` is joined onto its directory.
        """
        spec_directory = os.path.dirname(spec_path)
        joined = os.path.join(spec_directory, self.sample)
        return os.path.normpath(joined)
def tuple_get(t, index, default=None):
    """
    Return `t[index]`, or `default` when `index` is out of range for `t`.

    Negative indices count from the end, as with normal indexing; a negative
    index that reaches before the first element also yields `default`
    rather than raising IndexError.

    Args:
      t: the sequence to read from.
      index: position of the wanted element.
      default: value returned when `t` has no element at `index`.
    """
    if -len(t) <= index < len(t):
        return t[index]
    return default
def compute_coverage_result(version, vw, duration, status):
    """
    Build a CoverageResult describing the given workspace.

    Args:
      version: label to record the result under.
      vw: workspace object; must provide getDiscoveredInfo(), getMemoryMaps(),
        getImports(), and getExports().
      duration: elapsed analysis time, stored as-is.
      status: status string, stored as-is.

    Returns:
      CoverageResult: the populated result.
    """
    info = vw.getDiscoveredInfo()

    # each memory map is unpacked as a 4-tuple; only the first two items are kept.
    memory_map = []
    for va, size, _, _ in vw.getMemoryMaps():
        memory_map.append(MemoryMapEntry(address=va, size=size))

    # the discovered-info tuple is read positionally;
    # a position missing from the tuple is recorded as zero.
    positional_fields = (
        "discovered_count",
        "undiscovered_count",
        "xref_count",
        "location_count",
        "function_count",
        "block_count",
        "instruction_count",
        "unicode_count",
        "ascii_count",
        "number_count",
        "pointer_count",
        "vtable_count",
    )
    counts = {}
    for position, field_name in enumerate(positional_fields):
        counts[field_name] = tuple_get(info, position, 0)

    counts["import_count"] = len(vw.getImports())
    counts["export_count"] = len(vw.getExports())

    return CoverageResult(
        version=version,
        status=status,
        duration=duration,
        memory_map=memory_map,
        **counts,
    )
def blue(s: str) -> str:
    """Return `s` colored blue via termcolor."""
    return termcolor.colored(s, color="blue")
def red(s: str) -> str:
    """Return `s` colored red via termcolor."""
    return termcolor.colored(s, color="red")
def green(s: str) -> str:
    """Return `s` colored green via termcolor."""
    return termcolor.colored(s, color="green")
def _write_spec(path, spec):
    """Serialize `spec` to JSON and write it to `path`, replacing any existing file."""
    # serialize before opening the file, so that a serialization failure
    # cannot leave behind a truncated spec.
    payload = spec.json().encode("utf-8")
    with open(path, "wb") as f:
        f.write(payload)
    logger.info("wrote spec to: %s", path)


def _count_field_names(cov):
    """Return the names of the `*_count` fields of `cov`, in declaration order."""
    return [name for name in cov.__fields__ if name.endswith("_count")]


def _print_result(cov):
    """Print the duration, memory map, and all counts of one successful result."""
    print(blue(cov.version) + " in %.2fs" % cov.duration)
    print(" memory map:")
    for entry in cov.memory_map:
        print(" %s" % (str(entry)))
    for name in _count_field_names(cov):
        print(" %s %d" % ((name + ":").ljust(20), getattr(cov, name)))


def _print_duration_diff(cov, prior):
    """Print the header line for `cov`, annotated with its speed-up or slow-down versus `prior`."""
    header = blue(cov.version) + " in %.2fs" % cov.duration
    delta = cov.duration - prior.duration
    if abs(delta) < 0.1:
        # within tolerance: treat as no change.
        print(header)
    elif delta < 0:
        # got faster
        print(header + " (%s)" % green("-%.2fs" % abs(delta)))
    else:
        # got slower. this branch also takes a delta of exactly 0.1,
        # which must not be treated as an impossible case.
        print(header + " (%s)" % red("+%.2fs" % abs(delta)))


def _print_memory_map_diff(cov, prior):
    """Print the memory map of `cov` with change markers; return True if it differs from `prior`."""
    from itertools import zip_longest

    if cov.memory_map == prior.memory_map:
        return False

    print(" memory map:")
    # zip_longest, rather than zip, so that entries present on only one side
    # (maps of different lengths) are reported instead of silently dropped.
    for entry, prior_entry in zip_longest(cov.memory_map or [], prior.memory_map or []):
        if entry is None:
            print((" %s" % (str(prior_entry))).ljust(28) + "\t(" + red("removed") + ")")
        elif prior_entry is None:
            print((" %s" % (str(entry))).ljust(28) + "\t(" + green("added") + ")")
        elif entry != prior_entry:
            print((" %s" % (str(entry))).ljust(28) + "\t(" + red("changed") + ")")
        else:
            print(" %s" % (str(entry)))
    return True


def _print_count_diff(cov, prior):
    """Print each count of `cov` that differs from `prior`; return True if any did."""
    found_change = False
    for name in _count_field_names(cov):
        val = getattr(cov, name)
        delta = val - getattr(prior, name)
        if delta == 0:
            # same results
            continue
        found_change = True
        if delta > 0:
            # more results
            marker = green("+" + str(delta))
        else:
            # fewer results
            marker = red("-" + str(abs(delta)))
        print(" %s %d\t(%s)" % ((name + ":").ljust(20), val, marker))
    return found_change


def main(argv=None):
    """
    Command line entry point.

    Actions:
      create:  write a new, empty spec that references a sample.
      record:  analyze the spec's sample and append the result under a version label.
      display: print each recorded result, diffed against the prior successful one.
      clear:   drop all recorded results from a spec.

    Args:
      argv: argument list, without the program name; defaults to sys.argv[1:].

    Returns:
      int: 0 on success, -1 when no action is given or a precondition fails
        (spec already exists, version already recorded, spec does not exist).

    Raises:
      NotImplementedError: if the parser yields an action that is not handled here.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="A program.")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")

    action = parser.add_subparsers(dest="action")

    action_create = action.add_parser("create", help="create a new spec")
    action_create.add_argument("spec", type=str, help="Path to spec file")
    action_create.add_argument("sample", type=str, help="Path to sample")

    action_record = action.add_parser("record", help="record results for a version")
    action_record.add_argument("spec", type=str, help="Path to spec file")
    action_record.add_argument("version", type=str, help="Name of version to record")

    action_display = action.add_parser("display", help="display results across versions")
    action_display.add_argument("spec", type=str, help="Path to spec file")

    action_clear = action.add_parser("clear", help="clear all results from spec")
    action_clear.add_argument("spec", type=str, help="Path to spec file")

    args = parser.parse_args(args=argv)

    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    # only errors are wanted from the analysis libraries, whatever the level above.
    for noisy in ("vivisect", "vivisect.base", "vivisect.impemu", "vtrace", "envi", "envi.codeflow"):
        logging.getLogger(noisy).setLevel(logging.ERROR)

    if args.action is None:
        parser.print_help()
        return -1

    elif args.action == "create":
        logger.info("action: create")

        # explicit check rather than `assert`: asserts are stripped under -O,
        # which would let this action overwrite an existing spec.
        if os.path.exists(args.spec):
            logger.error("spec already exists: %s", args.spec)
            return -1

        spec = Spec(sample=os.path.relpath(args.sample, os.path.dirname(args.spec)))
        _write_spec(args.spec, spec)

    elif args.action == "record":
        logger.info("action: record")

        spec = Spec.parse_file(args.spec)
        if any(result.version == args.version for result in spec.results):
            logger.error("version already recorded: %s", args.version)
            return -1

        sample_path = spec.resolve_sample_path(args.spec)
        logger.info("spec: %s", os.path.abspath(args.spec))
        logger.info("sample: %s", os.path.abspath(sample_path))

        # monotonic clock: the measured duration is unaffected by wall-clock adjustments.
        t0 = time.monotonic()
        try:
            vw = viv_utils.getWorkspaceFromFile(sample_path)
        except Exception as e:
            # deliberately broad: a failed analysis is itself a result worth recording.
            duration = float(time.monotonic() - t0)
            status = "error: %s" % (str(e))
            cov = CoverageResult(
                version=args.version,
                status=status,
                duration=duration,
            )
            print(blue(cov.version) + " in %.2fs" % cov.duration)
            print(red(status))
        else:
            duration = float(time.monotonic() - t0)
            cov = compute_coverage_result(args.version, vw, duration, "ok")
            _print_result(cov)

        spec.results.append(cov)
        _write_spec(args.spec, spec)

    elif args.action == "display":
        logger.info("action: display")

        spec = Spec.parse_file(args.spec)
        cov_by_version = {cov.version: cov for cov in spec.results}

        # the most recent successful result seen so far, in sorted version order;
        # failed results are never used as a baseline to diff against.
        prior = None
        for version in sorted(cov_by_version):
            cov = cov_by_version[version]

            if cov.status != "ok":
                print(blue(cov.version) + " in %.2fs" % cov.duration)
                print(" " + red(cov.status))
                print()
                continue

            if prior is None:
                # no prior version to diff against
                _print_result(cov)
            else:
                # diff against prior
                _print_duration_diff(cov, prior)
                map_changed = _print_memory_map_diff(cov, prior)
                counts_changed = _print_count_diff(cov, prior)
                if not (map_changed or counts_changed):
                    print(" no change.")

            print()
            prior = cov

    elif args.action == "clear":
        logger.info("action: clear")

        if not os.path.exists(args.spec):
            logger.error("spec does not exist: %s", args.spec)
            return -1

        spec = Spec.parse_file(args.spec)
        _write_spec(args.spec, Spec(sample=spec.sample))

    else:
        raise NotImplementedError(args.action)

    return 0
if __name__ == "__main__":
    # run the CLI only when executed as a script; main()'s return value
    # becomes the process exit status.
    raise SystemExit(main())
@williballenthin
Copy link
Author

for reference, commands for bulk recording results:

for F in tests/data/*.elf_; do B="$(basename "$F")"; echo "$B"; python scripts/compare-viv-analysis.py create regression/"$B".json "$F"; done
for TAG in v1.0.0 v1.0.1 v1.0.2 v1.0.3 v1.0.4 master; do cd ~/code/vivisect/; git checkout "$TAG"; cd ~/code/capa-pub; for SPEC in regression/*; do echo "$TAG $SPEC"; python scripts/compare-viv-analysis.py record "$SPEC" "$TAG"; done; done; sed -i 's/"master/"zmaster/g' regression/*.json

python scripts/compare-viv-analysis.py display regression/7351f8a40c5450557b24622417fc478d.elf_.json
python scripts/compare-viv-analysis.py display regression/9fd556b58393b71d64da277c8f782ef4d1dccffd007495709375d2efa85d1248.elf_.json

@williballenthin
Copy link
Author

example output:

image

@williballenthin
Copy link
Author

williballenthin commented Aug 26, 2021

record results from each commit since v1.0.0

git checkout master && git log --decorate=full v1.0.0..HEAD | grep "^commit" | choose 1 | tac | /bin/cat --number | sed -e 's/^    //g' -e 's/^  /00/g' -e 's/^ /0/g' | while read -r LINE; do I=$(echo "$LINE" | choose 0); COMMIT=$(echo "$LINE" | choose 1); echo "$COMMIT $I"; git checkout "$COMMIT"; for SPEC in regression/*; do python ~/code/capa-pub/scripts/compare-viv-analysis.py record "$SPEC" "$I $COMMIT"; done; done;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment