@mrmekon
Created November 8, 2011 15:40
PGP Key Extractor -- A (partial) Python implementation of OpenPGP
#!/usr/bin/env python2.7
#
# PGP Key Extractor -- A (partial) Python implementation of OpenPGP
#
# Copyright 2011 Trevor Bentley
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# DESCRIPTION:
#
# A quick-n-dirty python script that breaks a PGP message up into packets,
# and decodes the secret key message.
#
# This was just a research project into how PGP messages are formatted. It
# is full of bad design and magic numbers, and supports very few of the
# permitted PGP combinations.
#
# The only combination expected to work is a secret key using the DSA algorithm,
# encrypted with 3DES, and hashing the passphrase with 'iterated+salted' SHA1.
# Compressed data must be ZIP or ZLIB format.
#
# There is nothing secure about this program. Your secret key will probably
# end up stored in RAM unencrypted after running this. Don't run it on
# a key you care about if you think infiltrators are scouring your RAM.
#
# Export your secret key to a file with:
# $ gpg --export-secret-key 'Your Name' > seckey.pgp
#
# Run script with:
# $ ./pgp_key_extract.py seckey.pgp file_to_decrypt.pgp
#
# Depends on PyCrypto
# Also depends on a non-portable C extension to interact with libgcrypt.
#
import sys
import struct
import math
from Crypto.Hash import SHA
from Crypto.Cipher import DES3
import getpass
import spam
import zlib
import tempfile
global packetList
packetList = []
class PacketHeader:
    '''Represents the header of a PGP packet'''
    packetTagStrings = {
        0:"Reserved",
        1:"PUB ENC Session",
        2:"Signature",
        3:"SYM ENC Session",
        4:"One-Pass Signature",
        5:"Secret Key",
        6:"Public Key",
        7:"Secret Subkey",
        8:"Compressed Data",
        9:"SYM ENC Data",
        10:"Marker",
        11:"Literal Data",
        12:"Trust",
        13:"User ID",
        14:"Public Subkey",
        17:"User Attribute",
        18:"SYM ENC INTEG Data",
        19:"Modification Detection Code",
        }
    '''Packet tag identifies the type of packet'''
    def __init__(self):
        '''Set required fields to None'''
        self.rawPacketTagByte = None
        self.newStyle = None
        self.tag = None
        self.headerLength = None
        self.length = None
        self.isPartial = None
    def loadHeaderFromFile(self, f):
        '''Load a packet header from an open file'''
        tagByte = f.read(1)
        if (len(tagByte) == 0):
            return False
        tagByte = ord(tagByte)
        if (not (tagByte & 0x80)):
            raise Exception("INVALID TAG BYTE: 0x%x" % tagByte)
        self.rawPacketTagByte = tagByte
        self.newStyle = tagByte & 0x40
        if (self.newStyle):
            # New-format tags occupy bits 5-0 (RFC 4880 section 4.2)
            self.tag = (tagByte) & 0x3F
        else:
            self.tag = (tagByte >> 2) & 0x0F
        self.loadLengthFromFile(f)
        return True
    def loadLengthFromFile(self, f):
        '''Call appropriate function to read length (variable length)'''
        if (self.newStyle):
            self.length = self.loadNewLengthFromFile(f)
        else:
            lentype = self.rawPacketTagByte & 0x03
            if (lentype == 0):
                self.headerLength = 2
            elif (lentype == 1):
                self.headerLength = 3
            elif (lentype == 2):
                self.headerLength = 5
            else:
                # Indeterminate length: packet runs to the end of the file
                self.headerLength = 1
                self.length = 0
                return
            self.length = self.loadOldLengthFromFile(f)
    def loadNewLengthFromFile(self, f):
        '''For new-style packets, value of each byte tells us how many more to read'''
        self.isPartial = False
        bytes = f.read(1)
        val = ord(bytes[0])
        if (val <= 191): # one byte length
            self.headerLength = 2
            return val
        elif (val >= 192 and val <= 223): # two byte length
            self.headerLength = 3
            bytes += f.read(1)
            # Second octet is bytes[1]; bytes[0] is the octet already in val
            val = ((val-192)<<8)+ord(bytes[1])+192
            return val
        elif (val == 255): # 4 byte length
            self.headerLength = 6
            bytes = f.read(4)
            val = ord(bytes[0])<<24 | ord(bytes[1])<<16 | ord(bytes[2])<<8 | ord(bytes[3])
            #val = ord(bytes[0])<<0 | ord(bytes[1])<<8 | ord(bytes[2])<<16 | ord(bytes[3])<<24
            return val
        else:
            # Oh joy -- "partial length header".
            self.headerLength = 2
            self.isPartial = True
            bytes = 1 << (val & 0x1F)
            return bytes
    def loadOldLengthFromFile(self, f):
        '''For old style packets, bits in tag tell us how many bytes to read'''
        numbytes = self.headerLength - 1
        bytes = f.read(numbytes)
        val = 0
        for i in range(numbytes):
            val <<= 8
            val += ord(bytes[i])
        return val
    def tagString(self):
        '''Return string description of header tag'''
        try:
            return PacketHeader.packetTagStrings[self.tag]
        except KeyError:
            return "UNKNOWN"
    def __str__(self):
        '''Return formatted description of this header'''
        return "HEADER TYPE (%s) HEADER SIZE (%d) DATA LEN (%d)" % (self.tagString(),
                                                                    self.headerLength,
                                                                    self.length)
class Packet:
'''Stores content of a PGP packet, and a copy of its header'''
algorithmStrings = {
1:"RSA",
2:"RSA Encrypt-Only",
3:"RSA Sign-Only",
16:"Elgamal",
17:"DSA",
18:"Elliptic Curve",
19:"ECDSA",
20:"Elgamal OLD",
21:"Diffie-Hellman",
}
'''Asymmetric ciphers'''
encryptionStrings = {
0:"Plaintext",
1:"IDEA",
2:"TripleDES",
3:"CAST5",
4:"Blowfish",
7:"AES-128",
8:"AES-192",
9:"AES-256",
10:"Twofish",
}
'''Symmetric ciphers'''
hashStrings = {
1:"MD5",
2:"SHA-1",
3:"RIPE-MD/160",
8:"SHA256",
9:"SHA384",
10:"SHA512",
11:"SHA224",
}
'''Hash algorithms'''
compressedStrings = {
0:"Uncompressed",
1:"ZIP",
2:"ZLIB",
3:"BZip2"
}
def __init__(self):
'''Create empty packet with an empty header'''
self.header = PacketHeader()
self.data = None
def loadPacketFromFile(self, f):
'''Fills in a packet from contents of a file. Starts with the header.'''
if (not self.header.loadHeaderFromFile(f)):
return False
if (self.header.length > 0):
self.data = f.read(self.header.length)
while (self.header.isPartial):
bytes = self.header.loadNewLengthFromFile(f)
self.header.length += bytes
self.data += f.read(bytes)
else:
self.data = f.read(1024*1024*1024)
print self.header
if (self.header.tag == 5 or self.header.tag == 7): # Secret key
self.loadSecretKeyFromPacket(self.data)
elif (self.header.tag == 1): # Pub key encrypted session key
self.loadSessionKey(self.data)
elif (self.header.tag == 18): # sym enc int data packet
self.loadEncryptedDataPacket(self.data)
elif (self.header.tag == 8): # Compressed
self.loadCompressedPacket(self.data)
elif (self.header.tag == 11): # Literal data
self.loadLiteralDataPacket(self.data)
return True
def loadLiteralDataPacket(self,data):
print "Literal Data packet"
idx = 0
self.format = ord(data[idx])
idx += 1
print data[0:64]
f = open("./pgp_extract.out", "wb")
f.write(data)
f.close()
def loadCompressedPacket(self,data):
print "Compressed packet"
idx = 0
self.algo = ord(data[idx])
idx += 1
print self.compressedString()
uncompressed = None
if (self.algo == 1): # ZIP
# Magic "wbits=-15" tells it to do a raw decompress without
# ZIP headers and that nonsense
uncompressed = zlib.decompress(data[idx:], -15)
elif (self.algo == 2): # ZLIB
uncompressed = zlib.decompress(data[idx:])
if (uncompressed != None):
tfile = tempfile.TemporaryFile()
tfile.write(uncompressed)
tfile.seek(0)
loadPacketsFromFileIntoList(tfile)
def loadEncryptedDataPacket(self, data):
idx = 0
self.version = ord(data[idx])
idx += 1
self.encdata = data[idx:]
idx += len(self.encdata)
# find symmetric key from session packet
p = getPacketWithSessionKey()
if (p == None):
raise Exception("No valid session key found! Password wrong?")
result = spam.decryptData('\x00'*16, p.sessionkey, self.encdata, p.algo)
if (result[14:16] != result[16:18]):
raise Exception("Decrypted data invalid!");
result = result[18:]
tfile = tempfile.TemporaryFile()
tfile.write(result)
tfile.seek(0)
loadPacketsFromFileIntoList(tfile)
def loadSessionKey(self, data):
self.sessionkey = None
idx = 0
self.version = ord(data[idx])
idx += 1
self.keyid = data[idx:idx+8]
idx += 8
print " * Encrypted with key: 0x%X" % struct.unpack(">Q", self.keyid)
self.algo = ord(data[idx])
idx += 1
print " * Algorithm: %s" % self.algoString()
p = getPacketWithKeyId(self.keyid)
if (p and p.isValid):
print " * Found matching secret key!"
else:
print " * No matching key."
return
self.encdata1 = self.readMPIFromBuffer(data[idx:])
idx += len(self.encdata1)
self.encdata2 = self.readMPIFromBuffer(data[idx:])
idx += len(self.encdata2)
if p.algo != 16:
raise Exception("Only supporting Elgamal sessions at the moment...")
if p.algo == 16: # Elgamal
self.frame = spam.decryptElgamalSessionKey(p.p,p.g,p.y,p.x,self.encdata1, self.encdata2)
n = 0
n += 2 # skip the length bytes of MPI
# Later versions encode the DEK like this:
# 0 2 RND(n bytes) 0 A DEK(k bytes) CSUM(2 bytes)
# Preceding 0 is stripped by libgcrypt already
if (ord(self.frame[n]) != 2):
raise Exception("Invalid session key!")
# Determine 'n' by counting until we hit the first 0
n += 1
while (n < len(self.frame) and ord(self.frame[n]) != 0):
n += 1
n += 1
# Keylength is frame size minus:
# * 1 (algorithm)
# * 2 (checksum)
self.keylen = len(self.frame) - n - 3
self.algo = ord(self.frame[n])
n += 1
self.sessionkey = self.frame[n:-2]
self.keychecksum = struct.unpack(">H",self.frame[-2:])[0]
checksum = sum([ord(x) for x in self.sessionkey]) % 65536
if (self.keychecksum != checksum):
raise Exception("Session key checksum mismatch!")
print " * Session key: ",
for x in self.sessionkey: print "%.2x"%ord(x),
print ""
def loadSecretKeyFromPacket(self, data):
'''Load contents of a secret key -- decrypt encrypted contents.'''
self.isValid = False
# Key version
idx = 0
if (ord(data[idx]) != 4):
raise Exception("VERSION %d KEYS UNSUPPORTED!" % ord(data[idx]))
self.version = ord(data[idx])
idx += 1
# Creation time
self.creationTime = struct.unpack(">L", data[idx:idx+4])[0]
idx += 4
# Public key algorithm
self.algo = ord(data[idx])
idx += 1
print " * Algorithm: %s" % self.algoString()
# Read MPIs for the algorithm
if (self.algo == 17): #DSA
# prime p
print "p idx: %u" % idx
self.p = self.readMPIFromBuffer(data[idx:])
idx += len(self.p)
# order q
print "q idx: %u" % idx
self.q = self.readMPIFromBuffer(data[idx:])
idx += len(self.q)
# generator g
self.g = self.readMPIFromBuffer(data[idx:])
idx += len(self.g)
# value y
self.y = self.readMPIFromBuffer(data[idx:])
idx += len(self.y)
pass
elif (self.algo == 1): #RSA
# mod n
# exp e
pass
elif (self.algo == 16): #Elgamal
# prime p
self.p = self.readMPIFromBuffer(data[idx:])
idx += len(self.p)
# generator g
self.g = self.readMPIFromBuffer(data[idx:])
idx += len(self.g)
# pub key val y
self.y = self.readMPIFromBuffer(data[idx:])
idx += len(self.y)
else:
raise Exception("Unsupported key algorithm %s (%d)" % (self.algoString(), self.algo))
# String-to-Key usage (tells whether key is encrypted)
self.s2k = ord(data[idx])
idx += 1
if (self.s2k == 0):
print " * Encryption: None"
self.encryption = 0
elif (self.s2k == 255 or self.s2k == 254):
# Symmetric encryption algorithm
self.encryption = ord(data[idx])
idx += 1
print " * Encryption: %s" % self.encryptionString()
else:
self.encryption = self.s2k
print " * Encryption: %s" % self.encryptionString()
if (self.encryption != 0): # Key is encrypted -- decrypt
# String-to-key specifier type
self.specifier = ord(data[idx])
idx += 1
print " * Specifier: %d" % self.specifier
# S2K hash algorithm
self.hash = ord(data[idx])
idx += 1
print " * Hash: %s" % self.hashString()
if (self.specifier == 1): # salted
self.salt = struct.unpack(">Q", data[idx:idx+8])[0]
idx += 8
print " * Salt: 0x%X" % self.salt
elif (self.specifier == 3): # salted and iterated
# Read salt
self.salt = struct.unpack(">Q", data[idx:idx+8])[0]
idx += 8
print " * Salt: 0x%X" % self.salt
# Read salt count (number of bytes to hash)
self.s2k_count = ord(data[idx])
idx += 1
print " * Salt count: %d" % (self.s2k_count)
# Apply s2k formula to convert count to number of bytes
self.hash_bytes = (16+(self.s2k_count&15)) << ((self.s2k_count>>4) + 6)
print " * Hash bytes: %d" % self.hash_bytes
if (self.hash == 2): # sha-1
self.iv = struct.unpack(">Q", data[idx:idx+8])[0]
idx += 8
print " * IV: 0x%X" % self.iv
# Get password from user
password = getpass.getpass(" * Enter secret key passphrase: ")
# Concatenate salt and passphrase
hashchunk = struct.pack(">Q",self.salt) + password
# Copy hashchunk until we reach the right number of bytes to hash
hashval = hashchunk*(self.hash_bytes/len(hashchunk))
hashval += hashchunk[0:self.hash_bytes%len(hashchunk)]
# Perform SHA hash
self.hashresult = SHA.new(hashval).digest()
if (self.encryption == 3): #CAST5
self.hashresult = self.hashresult[0:16]
elif (self.encryption == 2): # 3DES
# Documented in RFC 4880 3.7.1.1
# Append '0' byte to the front of the full
# hash value, hash it, and take the first 4 bytes to pad the 20-byte
# SHA-1 result up to 24-bytes for DES3's key.
hashval = '\x00' + hashval
self.hashresult += SHA.new(hashval).digest()[0:4]
print " * Cipher Key: ",
for x in self.hashresult: print "%.2x"%ord(x),
print ""
# Read secret MPIs
if (self.algo == 17 or self.algo == 16): # DSA or Elgamal
# exponent x
if (self.encryption == 0): # not encrypted, just read
self.x = self.readMPIFromBuffer(data[idx:])
idx += len(self.x)
else: # encrypted, must decrypt
enc_x = data[idx:]
idx += len(enc_x)
# Decrypt in Python. This does NOT work
#des = DES3.new(self.hashresult, DES3.MODE_CFB, struct.pack(">Q",self.iv))
#plaintext = des.decrypt(enc_x)
#print " * Plaintext: ",
#for x in plaintext: print "%.2x"%ord(x),
#print ""
#self.x = self.readMPIFromBuffer(plaintext)
#print " * Unencrypted x: ",
#for x in self.x: print "%.2x"%ord(x),
#print ""
#checkhash = SHA.new(plaintext[:-20]).digest()
#decrypted_hash = plaintext[-20:]
#if (checkhash != decrypted_hash):
# print "ERROR: Encrypted hash does not match!"
# Decrypt in libgcrypt (c extension). This DOES work.
plaintext = spam.decryptSecretKey(struct.pack(">Q",self.iv),self.hashresult,enc_x,self.encryption)
print " * Plaintext: ",
for x in plaintext: print "%.2x"%ord(x),
print ""
self.x = self.readMPIFromBuffer(plaintext)
print " * Unencrypted x: ",
for x in self.x: print "%.2x"%ord(x),
print ""
checkhash = SHA.new(plaintext[:-20]).digest()
decrypted_hash = plaintext[-20:]
if (checkhash != decrypted_hash):
print "ERROR: Encrypted hash does not match!"
else:
self.isValid = True
if (idx != len(data)):
raise Exception("Finished decoding, but packet still has data!")
else:
raise Exception("Algorithm %d not supported" % self.algo)
# Calculate fingerprint
# version, creation time, algorithm, length of all mpis
fingerLength = None
fingerData = None
if self.algo == 17:
fingerLength = 6 + \
len(self.p) + \
len(self.q) + \
len(self.g) + \
len(self.y)
fingerData = '\x99' + chr((fingerLength >> 8)&0xFF) + \
chr(fingerLength & 0xFF) + \
chr(self.version) + \
struct.pack(">L",self.creationTime) + \
chr(self.algo) + \
self.p + self.q + self.g + self.y
elif self.algo == 16:
fingerLength = 6 + \
len(self.p) + \
len(self.g) + \
len(self.y)
fingerData = '\x99' + chr((fingerLength >> 8)&0xFF) + \
chr(fingerLength & 0xFF) + \
chr(self.version) + \
struct.pack(">L",self.creationTime) + \
chr(self.algo) + \
self.p + self.g + self.y
if fingerData != None:
self.fingerprint = SHA.new(fingerData).digest()
print " * Fingerprint: ",
for x in self.fingerprint: print "%.2x"%ord(x),
print ""
def readMPIFromBuffer(self, data):
'''Reads a multi-precision integer from a buffer of bytes.'''
# First two bytes are number of bits to read
bits = struct.unpack(">H", data[0:2])[0]
print " * MPI bits: %d" % bits
# Convert bits to bytes, add 2 for the header
bytes = int((bits+7)/8)+2
return data[0:bytes]
def algoString(self):
'''Convert asymmetric algorithm index to string'''
try:
return Packet.algorithmStrings[self.algo]
except Exception:
return "UNKNOWN - %d" % self.algo
def encryptionString(self):
'''Convert symmetric algorithm index to string'''
try:
return Packet.encryptionStrings[self.encryption]
except Exception:
return "UNKNOWN - %d" % self.encryption
def hashString(self):
'''Convert hash algorithm index to string'''
try:
return Packet.hashStrings[self.hash]
except Exception:
return "UNKNOWN - %d" % self.hash
def compressedString(self):
'''Convert compression algorithm index to string'''
try:
return Packet.compressedStrings[self.algo]
except Exception:
return "UNKNOWN - %d" % self.algo
def getPacketWithKeyId(keyid):
    global packetList
    for p in packetList:
        if p.header.tag == 5 or p.header.tag == 7:
            if (p.fingerprint[-8:] == keyid):
                return p
    return None
def getPacketWithSessionKey():
    global packetList
    for p in packetList:
        if p.header.tag == 1:
            if (p.sessionkey != None):
                return p
    return None
def loadPacketsFromFileIntoList(f):
    global packetList
    print "Packets:"
    while (True):
        p = Packet()
        if (p.loadPacketFromFile(f) == False):
            break
        packetList.append(p)
    print "Bytes read: %d" % f.tell()
    print "DONE!"
def readPacketsFromFile(filename):
    global packetList
    # Open in binary mode: PGP packets are raw bytes, not text.
    # open() raises IOError itself if the file cannot be opened.
    f = open(filename, "rb")
    print "Analyzing file: %s" % filename
    print ""
    # Iterate over packets in the PGP message
    loadPacketsFromFileIntoList(f)
    f.close()
if __name__ == "__main__":
    print "-----------------------------------------"
    print "PGP Key Extractor -- Trevor Bentley, 2011"
    print "-----------------------------------------"
    # Takes a file containing a secret key as its first argument
    if (len(sys.argv) < 2):
        raise Exception("No PGP key file given!")
    for i in range(len(sys.argv)-1):
        readPacketsFromFile(sys.argv[i+1])
#!/usr/bin/env python2.7
#
# PGP Key Extractor -- A (partial) Python implementation of OpenPGP
#
# Copyright 2011 Trevor Bentley
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from distutils.core import setup, Extension
module1 = Extension('spam',
include_dirs = ['/opt/local/include'],
libraries = ['gcrypt', 'gpg-error'],
library_dirs = ['/opt/local/lib'],
sources = ['spam.c'])
setup (name = 'PackageName',
version = '1.0',
description = 'This is a demo package',
ext_modules = [module1])
/* Passes buffers from Python to libgcrypt's gcry_cipher_decrypt() function.
*
* This is a nasty hack to tie Python OpenPGP implementation to
* libgcrypt with C. Named stupidly, uncommented, no error checking,
* and only supports very specific combinations.
*
* Use with pgp_key_extract.py
*
* PGP Key Extractor -- A (partial) Python implementation of OpenPGP
*
* Copyright 2011 Trevor Bentley
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Code copied from Python C-extension example. Didn't bother renaming
* anything. Probably not portable. Tested on OS X 10.6.1.
*/
#include <Python.h>
#include <gcrypt.h>
#include <gpg-error.h>
#include <stdio.h>
static PyObject *decryptSecretKey(PyObject *self, PyObject *args);
static PyObject* decryptElgamalSessionKey(PyObject *self,
PyObject *args);
static PyObject *decryptData(PyObject *self, PyObject *args);
static PyMethodDef SpamMethods[] = {
{"decryptSecretKey", decryptSecretKey, METH_VARARGS,
"Decrypt a secret key packet with S2K cipher."},
{"decryptElgamalSessionKey", decryptElgamalSessionKey,
METH_VARARGS, "Decrypt an ElGamal encrypted session key."},
{"decryptData", decryptData,
METH_VARARGS, "Decrypt data packet"},
{NULL, NULL, 0, NULL} /* Sentinel */
};
PyMODINIT_FUNC
initspam(void)
{
(void) Py_InitModule("spam", SpamMethods);
}
static PyObject* decryptElgamalSessionKey(PyObject *self,
PyObject *args) {
const Py_buffer p,g,y,x,d1,d2;
gcry_sexp_t key, data, result;
gcry_mpi_t mpi_p,mpi_g,mpi_y,mpi_x,mpi_dat1,mpi_dat2;
char *retval;
unsigned long retval_len = 0;
gcry_mpi_t cipher_key;
int i;
if (!PyArg_ParseTuple(args, "s*s*s*s*s*s*", &p, &g, &y, &x, &d1, &d2))
return NULL;
gcry_mpi_scan (&mpi_p, GCRYMPI_FMT_PGP, p.buf, p.len, NULL);
gcry_mpi_scan (&mpi_g, GCRYMPI_FMT_PGP, g.buf, g.len, NULL);
gcry_mpi_scan (&mpi_y, GCRYMPI_FMT_PGP, y.buf, y.len, NULL);
gcry_mpi_scan (&mpi_x, GCRYMPI_FMT_PGP, x.buf, x.len, NULL);
gcry_mpi_scan (&mpi_dat1, GCRYMPI_FMT_PGP, d1.buf, d1.len, NULL);
gcry_mpi_scan (&mpi_dat2, GCRYMPI_FMT_PGP, d2.buf, d2.len, NULL);
gcry_sexp_build(&key, NULL,
"(private-key(elg(p%m)(g%m)(y%m)(x%m)))",
mpi_p, mpi_g, mpi_y, mpi_x);
gcry_sexp_build (&data, NULL,
"(enc-val(elg(a%m)(b%m)))", mpi_dat1, mpi_dat2);
gcry_pk_decrypt (&result, data, key);
cipher_key = gcry_sexp_nth_mpi (result, 0, GCRYMPI_FMT_STD);
for (i=0;i<64;i++) {
if (i&&i%16==0)printf("\n");
printf("%.2X ",((unsigned char*)result)[i]);
}
printf("\n\n");
for (i=0;i<64;i++) {
if (i&&i%16==0)printf("\n");
printf("%.2X ",((unsigned char*)cipher_key)[i]);
}
printf("\n");
gcry_mpi_print(GCRYMPI_FMT_PGP, NULL, 0, &retval_len, cipher_key);
retval = malloc(retval_len);
gcry_mpi_print(GCRYMPI_FMT_PGP, retval, retval_len, NULL, cipher_key);
gcry_sexp_release(key);
gcry_sexp_release(data);
gcry_sexp_release(result);
return Py_BuildValue("s#", retval, retval_len);
}
static PyObject *decryptData(PyObject *self, PyObject *args)
{
const Py_buffer iv;
const Py_buffer key;
const Py_buffer encdata;
unsigned char *data;
unsigned long i;
unsigned long algo;
int sts;
gcry_cipher_hd_t cipher_hd;
gcry_error_t err;
if (!PyArg_ParseTuple(args, "s*s*s*l", &iv, &key, &encdata, &algo))
return NULL;
/* Open a cipher object. AES-256 is hard-coded here; the algo argument is ignored. */
err = gcry_cipher_open (&cipher_hd,
GCRY_CIPHER_AES256,
GCRY_CIPHER_MODE_CFB,
(GCRY_CIPHER_SECURE | GCRY_CIPHER_ENABLE_SYNC));
if (err != GPG_ERR_NO_ERROR)
printf("Error opening cipher!\n");
printf("Opened cipher\n");
/* Set AES-256 key */
err = gcry_cipher_setkey (cipher_hd, key.buf, key.len);
if (err != GPG_ERR_NO_ERROR) {
printf("Error setting key\n");
}
printf("Set key\n");
/* Set AES-256 IV */
err = gcry_cipher_setiv ( cipher_hd, iv.buf, iv.len );
if (err != GPG_ERR_NO_ERROR) {
printf("Error setting iv\n");
}
printf("Set IV\n");
for (i=0;i<64;i++) {
if (i&&i%16==0)printf("\n");
printf("%.2X ",((unsigned char*)encdata.buf)[i]);
}
printf("\n");
/* Run decryption */
data = malloc(encdata.len);
if (data == NULL) return NULL;
//err = gcry_cipher_decrypt ( cipher_hd, data, encdata.len, encdata.buf, encdata.len );
err = gcry_cipher_decrypt ( cipher_hd, data, encdata.len, encdata.buf, 18 );
if (err != GPG_ERR_NO_ERROR) {
printf("Error decrypting\n");
}
//gcry_cipher_sync (cipher_hd);
for (i=0;i<64;i++) {
if (i&&i%16==0)printf("\n");
printf("%.2X ",((unsigned char*)(encdata.buf+18))[i]);
}
printf("\n");
err = gcry_cipher_decrypt ( cipher_hd, data+18, encdata.len-18, encdata.buf+18, encdata.len-18 );
if (err != GPG_ERR_NO_ERROR) {
printf("Error decrypting\n");
}
printf("Ran decrypt\n");
printf("Data count: %zd\n", encdata.len);
for (i=0;i<64;i++) {
if (i&&i%16==0)printf("\n");
printf("%.2X ",((unsigned char*)data)[i]);
}
printf("\n");
/* Return decrypted data */
return Py_BuildValue("s#", data, encdata.len);
}
static PyObject *
decryptSecretKey(PyObject *self, PyObject *args)
{
const Py_buffer iv;
const Py_buffer key;
const Py_buffer encdata;
unsigned char data[2048];
unsigned long pgp_cipher, gcrypt_cipher;
unsigned long i;
int sts;
gcry_cipher_hd_t cipher_hd;
gcry_error_t err;
if (!PyArg_ParseTuple(args, "s*s*s*l", &iv, &key, &encdata, &pgp_cipher))
return NULL;
switch (pgp_cipher) {
case 1: gcrypt_cipher = GCRY_CIPHER_IDEA; break;
case 2: gcrypt_cipher = GCRY_CIPHER_3DES; break;
case 3: gcrypt_cipher = GCRY_CIPHER_CAST5; break;
case 4: gcrypt_cipher = GCRY_CIPHER_BLOWFISH; break;
case 7: gcrypt_cipher = GCRY_CIPHER_AES128; break;
case 8: gcrypt_cipher = GCRY_CIPHER_AES192; break;
case 9: gcrypt_cipher = GCRY_CIPHER_AES256; break;
case 10: gcrypt_cipher = GCRY_CIPHER_TWOFISH; break;
default:
printf("ERROR: Invalid symmetric cipher!\n");
return NULL;
}
/* Open a cipher object for the selected algorithm in CFB mode */
err = gcry_cipher_open (&cipher_hd,
gcrypt_cipher,
GCRY_CIPHER_MODE_CFB,
(GCRY_CIPHER_SECURE | GCRY_CIPHER_ENABLE_SYNC));
if (err != GPG_ERR_NO_ERROR)
printf("Error opening cipher!\n");
/* Set cipher key */
err = gcry_cipher_setkey (cipher_hd, key.buf, key.len);
if (err != GPG_ERR_NO_ERROR) {
printf("Error setting key\n");
}
/* Set cipher IV */
err = gcry_cipher_setiv ( cipher_hd, iv.buf, iv.len );
if (err != GPG_ERR_NO_ERROR) {
printf("Error setting iv\n");
}
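/* Run decryption; refuse input that would overflow the fixed 2048-byte buffer */
if (encdata.len > (Py_ssize_t)sizeof(data)) {
PyErr_SetString(PyExc_ValueError, "encrypted data too large");
return NULL;
}
err = gcry_cipher_decrypt ( cipher_hd, (char*)data, encdata.len, encdata.buf, encdata.len );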
if (err != GPG_ERR_NO_ERROR) {
printf("Error decrypting\n");
}
/* Return decrypted data */
return Py_BuildValue("s#", data,encdata.len);
}
mrmekon commented Nov 8, 2011

Build with:

python setup.py build
sudo python setup.py install

Export PGP key with:

gpg --export-secret-key "Your Full Name" > seckey.pgp

Encrypt a file with:

gpg --encrypt --recipient "Your Full Name" testfile.txt

Run with:

./pgp_key_extract.py seckey.pgp testfile.txt.gpg
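
For readers who want to follow the packet-splitting step without building the C extension, here is a minimal Python 3 sketch of the header parsing that `PacketHeader` performs. It is not the script's own code: `read_header` and its return tuple are names made up for this illustration, and the length rules follow RFC 4880 section 4.2 as understood here. An indeterminate old-format length is reported as `None`, where the script stores 0.

```python
import io


def read_header(stream):
    """Return (tag, header_len, body_len, is_partial), or None at end of input.

    Hypothetical helper for illustration only; not part of the gist.
    """
    first = stream.read(1)
    if not first:
        return None
    tag_byte = first[0]
    if not tag_byte & 0x80:
        raise ValueError("invalid tag byte: 0x%02x" % tag_byte)

    if tag_byte & 0x40:
        # New format: tag is bits 5-0, length is encoded in 1, 2 or 5 octets.
        tag = tag_byte & 0x3F
        o1 = stream.read(1)[0]
        if o1 < 192:
            return tag, 2, o1, False
        if o1 < 224:
            o2 = stream.read(1)[0]
            return tag, 3, ((o1 - 192) << 8) + o2 + 192, False
        if o1 == 255:
            return tag, 6, int.from_bytes(stream.read(4), "big"), False
        # 224..254: partial body length, always a power of two.
        return tag, 2, 1 << (o1 & 0x1F), True

    # Old format: tag is bits 5-2, length type is bits 1-0.
    tag = (tag_byte >> 2) & 0x0F
    length_type = tag_byte & 0x03
    if length_type == 3:
        # Indeterminate length: the body runs to the end of the input.
        return tag, 1, None, False
    nbytes = 1 << length_type  # 1, 2 or 4 length octets
    return tag, 1 + nbytes, int.from_bytes(stream.read(nbytes), "big"), False


# Old-format One-Pass Signature header with a one-octet length of 13.
print(read_header(io.BytesIO(b"\x90\x0d")))  # (4, 2, 13, False)
```

The two-octet case reads the second octet separately from the first, which is the detail that is easiest to get wrong when the bytes are accumulated in one buffer.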

vsoch commented Mar 30, 2021

hey @mrmekon! I stumbled on your gist and have found it very helpful to walk through the reference document and understand what is going on. I have a question that you might be able to point me in the right direction for - given that we sign an object, e.g.,

$ gpg --sign tacos.txt

And then we provide it as an input to your script (modified to just read one, for example)

python modified_pgp_key_extract.py tacos.txt.gpg

what would be the process for doing the equivalent of gpg verify? It looks like I have the following packets:

HEADER TYPE (One-Pass Signature) HEADER SIZE (2) DATA LEN (13)
HEADER TYPE (Literal Data) HEADER SIZE (2) DATA LEN (22)
HEADER TYPE (Signature) HEADER SIZE (3) DATA LEN (563)
HEADER TYPE (Compressed Data) HEADER SIZE (1) DATA LEN (0)

It's definitely hard to find good documentation about the process (I've been looking at the source code for gpg but I'm not a C++/C programmer so I don't absorb a lot) so thank you for your help!
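
As a partial pointer for the question above, the hashing half of verification can be sketched in Python 3. This is not from the gist and is not a full `gpg --verify`: it assumes a version 4 binary-document signature (what `gpg --sign` normally produces), follows RFC 4880 section 5.2.4 as understood here, and stops before the public-key check of the signature MPIs. The names `literal_body` and `v4_signature_digest` are hypothetical; their inputs are the raw bodies of the Literal Data and Signature packets listed above.

```python
import hashlib
import struct

# OpenPGP hash algorithm IDs mapped to hashlib names (subset).
HASH_NAMES = {2: "sha1", 8: "sha256", 9: "sha384", 10: "sha512", 11: "sha224"}


def literal_body(packet_data):
    """Strip the Literal Data packet prefix and return the signed contents.

    Layout: format (1) | filename length (1) | filename | date (4) | contents
    """
    name_len = packet_data[1]
    return packet_data[2 + name_len + 4:]


def v4_signature_digest(sig_body, document):
    """Return (digest, quick_check_ok) for a v4 Signature packet body."""
    if sig_body[0] != 4:
        raise ValueError("only version 4 signatures are handled in this sketch")
    hash_algo = sig_body[3]
    hashed_len = struct.unpack(">H", sig_body[4:6])[0]
    # Version, type, pubkey algo, hash algo, hashed subpacket length and data.
    hashed_part = sig_body[:6 + hashed_len]
    # Six-octet trailer: 0x04, 0xFF, then the big-endian length of hashed_part.
    trailer = b"\x04\xff" + struct.pack(">L", len(hashed_part))

    h = hashlib.new(HASH_NAMES[hash_algo])
    h.update(document)
    h.update(hashed_part)
    h.update(trailer)
    digest = h.digest()

    # Skip the unhashed subpackets to reach the stored left 16 bits of the hash.
    pos = 6 + hashed_len
    unhashed_len = struct.unpack(">H", sig_body[pos:pos + 2])[0]
    pos += 2 + unhashed_len
    return digest, digest[:2] == sig_body[pos:pos + 2]
```

If the quick check passes, the remaining step is to verify the signature MPIs that follow those two octets against the signer's public key using this digest, which needs an RSA or DSA implementation and is left out here.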

NadAlaba commented Sep 21, 2022

@mrmekon When decrypting secret key, your code didn't work in python because the segment size of CFB mode in PyCryptodome (which is the new PyCrypto) defaults to 8 bits (CFB8). You can change this behavior by adding a segment_size parameter, and make it equal to the block size of the cipher. For example in AES it would be:
aes = AES.new(self.hashresult, AES.MODE_CFB, struct.pack(">Q",self.iv), segment_size=128)
For DES/3DES, whose block size is 64 bits, I guess it should be 64.
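
The suggestion above can be sketched for the 3DES case the script targets. This is a Python 3 illustration, not a tested patch to the gist: it assumes PyCryptodome is installed (the import is guarded so the key-derivation part runs without it), and the function names are hypothetical. The key derivation mirrors what the script does for iterated+salted SHA-1, and `segment_size=64` matches the 64-bit DES block so that CFB behaves like libgcrypt's full-block CFB rather than CFB8.

```python
import hashlib

try:
    from Crypto.Cipher import DES3  # PyCryptodome; assumed, optional for this sketch
except ImportError:
    DES3 = None


def s2k_count_to_bytes(coded):
    """Decode the one-octet S2K count into a number of bytes to hash."""
    return (16 + (coded & 15)) << ((coded >> 4) + 6)


def s2k_sha1_3des_key(passphrase, salt, coded_count):
    """Iterated+salted SHA-1 S2K, stretched to the 24 bytes 3DES needs."""
    chunk = salt + passphrase
    # At least one full copy of salt+passphrase is always hashed.
    total = max(s2k_count_to_bytes(coded_count), len(chunk))
    reps, rest = divmod(total, len(chunk))
    stream = chunk * reps + chunk[:rest]
    first = hashlib.sha1(stream).digest()
    # A second hash context preloaded with one zero byte supplies 4 more bytes.
    second = hashlib.sha1(b"\x00" + stream).digest()
    return first + second[:4]


def decrypt_3des_cfb64(key, iv, ciphertext):
    """Decrypt with 3DES in CFB mode using full 64-bit segments."""
    if DES3 is None:
        raise RuntimeError("PyCryptodome is required for the decryption step")
    cipher = DES3.new(key, DES3.MODE_CFB, iv=iv, segment_size=64)
    return cipher.decrypt(ciphertext)


# A coded count of 96 (0x60) expands to 65536 bytes of hash input.
print(s2k_count_to_bytes(96))  # 65536
```

In the script's terms, `key` corresponds to `self.hashresult` and `iv` to `struct.pack(">Q", self.iv)`. Whether this reproduces the libgcrypt output on a real key has not been checked here.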

mrmekon commented Dec 8, 2022

I'm afraid I haven't thought about this thing in over a decade. I'll certainly leave it up for reference, but I can't remember anything about how it works and I'm not going to update it. Feel free to post any patches here if you make them, though.
