Skip to content

Instantly share code, notes, and snippets.

@vishvananda
Created May 17, 2011 19:15
Show Gist options
  • Save vishvananda/977159 to your computer and use it in GitHub Desktop.
Save vishvananda/977159 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3.1
import struct
class BootRecord:
    """Parsed FAT boot sector (BIOS parameter block plus its extensions).

    ``unpack`` fills the attributes from the raw bytes of the boot sector
    and ``pack`` serialises them back using the same layout.
    """

    # Fields shared by every FAT variant (jump code .. 32-bit sector count).
    _COMMON_FORMAT = "<2sB8sHBHBHHBHHHLL"
    # FAT32-only fields followed by 12 pad bytes.  The repeat count has to
    # come before the format code ("12x"); "x12" is rejected by struct.
    _FAT32_FORMAT = "<LHHLHH12x"
    # Extended boot record that follows on every variant.
    _EXTENDED_FORMAT = "<BxBL11s8s"

    def __init__(self):
        self.rootdir_cluster = 0
        self.num_fat_sectors = 0

    def unpack(self, data):
        """Populate this record from ``data``, the raw boot sector bytes.

        Raises ``struct.error`` when ``data`` is too short for the layout.
        """
        offset = 0
        (self.boot_instructions,
         self.int13_flag,
         self.oem_id,
         self.bytes_per_sector,
         self.sectors_per_cluster,
         self.num_reserved_sectors,
         self.num_fats,
         self.num_rootdirs,
         self.num_sectors,
         self.media_descriptor,
         self.sectors_per_fat,
         self.sectors_per_head,
         self.heads_per_cylinder,
         self.hidden_sectors,
         self.total_sectors
         ) = struct.unpack_from(self._COMMON_FORMAT, data)
        print("num sectors", self.num_sectors)
        offset += struct.calcsize(self._COMMON_FORMAT)
        if self.is32():
            (self.num_fat_sectors,
             self.active_fat,
             self.fat_version,
             self.rootdir_cluster,
             self.fsinfo_sector,
             self.boot_sector_backup
             ) = struct.unpack_from(self._FAT32_FORMAT, data, offset)
            offset += struct.calcsize(self._FAT32_FORMAT)
        else:
            # Do not keep FAT32 values from a previously unpacked sector.
            self.rootdir_cluster = 0
            self.num_fat_sectors = 0
        (self.drive_num,
         self.boot_signature,
         self.serial_number,
         self.volume_label,
         self.file_system_id
         ) = struct.unpack_from(self._EXTENDED_FORMAT, data, offset)
        print('offset', offset, 'filesystem', self.file_system_id)
        offset += struct.calcsize(self._EXTENDED_FORMAT)
        # Everything between the parsed fields and the 2-byte signature is
        # kept verbatim so that pack() can reproduce the sector.
        self.rest_of_boot_sector = data[offset:-2]
        (self.bootable_signature, ) = struct.unpack_from("<H", data, -2)

    def pack(self):
        """Return the boot sector as bytes, mirroring ``unpack``."""
        data = bytearray(struct.pack(self._COMMON_FORMAT,
                                     self.boot_instructions,
                                     self.int13_flag,
                                     self.oem_id,
                                     self.bytes_per_sector,
                                     self.sectors_per_cluster,
                                     self.num_reserved_sectors,
                                     self.num_fats,
                                     self.num_rootdirs,
                                     self.num_sectors,
                                     self.media_descriptor,
                                     self.sectors_per_fat,
                                     self.sectors_per_head,
                                     self.heads_per_cylinder,
                                     self.hidden_sectors,
                                     self.total_sectors,
                                     ))
        if self.is32():
            data.extend(struct.pack(self._FAT32_FORMAT,
                                    self.num_fat_sectors,
                                    self.active_fat,
                                    self.fat_version,
                                    self.rootdir_cluster,
                                    self.fsinfo_sector,
                                    self.boot_sector_backup
                                    ))
        data.extend(struct.pack(self._EXTENDED_FORMAT,
                                self.drive_num,
                                self.boot_signature,
                                self.serial_number,
                                self.volume_label,
                                self.file_system_id
                                ))
        data.extend(self.rest_of_boot_sector)
        data.extend(struct.pack("<H", self.bootable_signature))
        return bytes(data)

    def is32(self):
        """Return True when the FAT32-only fields are present.

        The 16-bit sectors-per-FAT field is zero only on FAT32.  The 16-bit
        total-sector field is not a reliable indicator because it is also
        zero on FAT16 volumes too large for a 16-bit sector count.
        """
        return self.sectors_per_fat == 0

    def getNumSectors(self):
        """Return the 16-bit sector count, or the 32-bit one when it is 0."""
        return self.num_sectors if self.num_sectors else self.total_sectors

    def getRootdirCluster(self):
        """Return the root directory's first cluster (0 unless FAT32)."""
        return self.rootdir_cluster

    def getSectorsPerFat(self):
        """Return the size of one FAT in sectors for any FAT variant."""
        if self.sectors_per_fat:
            return self.sectors_per_fat
        # FAT32 stores the FAT size in its extended 32-bit field instead.
        return self.num_fat_sectors
class Fat:
    """File allocation table held as a list of per-cluster entries.

    ``self.offsets[n]`` is the entry for cluster ``n``: 0 means free, a value
    at or above ``self.max`` ends a chain, anything else is the next cluster.
    Subclasses set ``self.max`` and provide ``unpack``/``pack``.
    """

    # Data clusters are numbered from 2; entries 0 and 1 are never handed out.
    FIRST_DATA_CLUSTER = 2

    def __init__(self):
        self.offsets = []

    def getNextCluster(self, cluster):
        """Return the cluster following ``cluster``, or 0 at the chain end."""
        next_cluster = self.offsets[cluster]
        if next_cluster >= self.max:
            return 0
        return next_cluster

    def setNextCluster(self, cluster, next_cluster):
        """Store ``next_cluster`` as the entry for ``cluster``."""
        self.offsets[cluster] = next_cluster

    def setLastCluster(self, cluster):
        """Mark ``cluster`` as the final cluster of its chain."""
        self.setNextCluster(cluster, self.max)

    def isAllocated(self, cluster):
        """Return True when the entry for ``cluster`` is non-zero."""
        return self.offsets[cluster] != 0

    def deallocateCluster(self, cluster):
        """Mark ``cluster`` as free."""
        self.setNextCluster(cluster, 0)

    def allocateClusters(self, num_clusters, cluster = 0):
        """Make the chain starting at ``cluster`` exactly ``num_clusters`` long.

        With ``cluster == 0`` a new chain is created.  An existing chain is
        extended with free clusters or truncated (the surplus clusters are
        freed) as required.

        Returns the first cluster of the resulting chain, or 0 when the
        resulting chain is empty.  Raises ``IOError`` when the table does
        not hold enough free clusters; nothing is modified in that case.
        """
        num_clusters = max(num_clusters, 0)

        # Collect the existing chain before changing any entry.
        chain = []
        cur_cluster = cluster
        while cur_cluster:
            chain.append(cur_cluster)
            cur_cluster = self.getNextCluster(cur_cluster)

        needed = num_clusters - len(chain)
        if needed == 0:
            return cluster

        if needed < 0:
            for surplus in chain[num_clusters:]:
                self.deallocateCluster(surplus)
            chain = chain[:num_clusters]
            if not chain:
                return 0
            self.setLastCluster(chain[-1])
            return chain[0]

        if not self.hasEnoughFreeClusters(needed):
            raise IOError("Fat doesn't have %d available" % needed)

        first_cluster = cluster
        prev_cluster = chain[-1] if chain else 0
        for _ in range(needed):
            cur_cluster = self.getFreeClusterOffset()
            # Claim the cluster straight away so that the next search for a
            # free cluster cannot return it again.
            self.setLastCluster(cur_cluster)
            if prev_cluster:
                self.setNextCluster(prev_cluster, cur_cluster)
            else:
                first_cluster = cur_cluster
            prev_cluster = cur_cluster
        return first_cluster

    def getFreeClusterOffset(self):
        """Return the lowest free data cluster, or None when none is free."""
        for cluster in range(self.FIRST_DATA_CLUSTER, len(self.offsets)):
            if not self.isAllocated(cluster):
                return cluster
        return None

    def hasEnoughFreeClusters(self, num_clusters):
        """Return True when at least ``num_clusters`` data clusters are free."""
        if num_clusters <= 0:
            return True
        free_clusters = 0
        for cluster in range(self.FIRST_DATA_CLUSTER, len(self.offsets)):
            if not self.isAllocated(cluster):
                free_clusters += 1
                if free_clusters >= num_clusters:
                    return True
        return False
class Fat12(Fat):
    """File allocation table with 12-bit entries (two entries per 3 bytes)."""

    def __init__(self):
        # Every value from 0xff8 upwards ends a chain; 0xff9 as the
        # threshold would treat a 0xff8 terminator as a next-cluster link.
        self.max = 0xff8
        # Nibbles left over after the last whole entry, kept so that pack()
        # returns as many bytes as unpack() was given.
        self.trailing_nibbles = []
        Fat.__init__(self)

    def unpack(self, data):
        """Replace the table with the 12-bit entries decoded from ``data``."""
        nibbles = []
        for entry in data:
            # Low nibble first: the table is a little-endian nibble stream.
            nibbles.append(entry & 0xf)
            nibbles.append((entry >> 4) & 0xf)
        usable = len(nibbles) - len(nibbles) % 3
        self.offsets = [(h << 8) | (m << 4) | l for
                        l, m, h in zip(nibbles[0:usable:3],
                                       nibbles[1:usable:3],
                                       nibbles[2:usable:3])]
        self.trailing_nibbles = nibbles[usable:]

    def pack(self):
        """Return the table encoded as bytes, mirroring ``unpack``."""
        nibbles = []
        for entry in self.offsets:
            nibbles.append(entry & 0xf)
            nibbles.append((entry >> 4) & 0xf)
            nibbles.append((entry >> 8) & 0xf)
        nibbles.extend(self.trailing_nibbles)
        if len(nibbles) % 2:
            # Complete the final byte instead of dropping half an entry.
            nibbles.append(0)
        return bytes([(h << 4) | l
                      for l, h in zip(nibbles[0::2],
                                      nibbles[1::2])])
class Fat16(Fat):
    """File allocation table with 16-bit little-endian entries."""

    def __init__(self):
        # Every value from 0xfff8 upwards ends a chain.
        self.max = 0xfff8
        Fat.__init__(self)

    def unpack(self, data):
        """Replace the table with the entries decoded from ``data``.

        A trailing odd byte, if any, is ignored.
        """
        count = len(data) // 2
        # Assign rather than append so that unpacking twice does not
        # accumulate entries (this matches Fat12.unpack).
        self.offsets = list(struct.unpack_from("<%dH" % count, data))

    def pack(self):
        """Return the table encoded as bytes."""
        return struct.pack("<%dH" % len(self.offsets), *self.offsets)
class Fat32(Fat):
    """File allocation table with 32-bit slots holding 28-bit entries."""

    # Only the low 28 bits of each slot are the cluster value.
    ENTRY_MASK = 0x0fffffff

    def __init__(self):
        # Every 28-bit value from 0x0ffffff8 upwards ends a chain.
        self.max = 0x0ffffff8
        # High 4 bits of each slot as read, written back unchanged by pack().
        self.reserved_bits = []
        Fat.__init__(self)

    def unpack(self, data):
        """Replace the table with the entries decoded from ``data``.

        Trailing bytes that do not fill a whole slot are ignored.
        """
        count = len(data) // 4
        raw = struct.unpack_from("<%dL" % count, data)
        self.offsets = [value & self.ENTRY_MASK for value in raw]
        self.reserved_bits = [value & ~self.ENTRY_MASK & 0xffffffff
                              for value in raw]

    def pack(self):
        """Return the table encoded as bytes, restoring the reserved bits."""
        buffer = bytearray()
        for index, offset in enumerate(self.offsets):
            reserved = 0
            if index < len(self.reserved_bits):
                reserved = self.reserved_bits[index]
            buffer.extend(struct.pack("<L",
                                      (offset & self.ENTRY_MASK) | reserved))
        return bytes(buffer)
class BaseEntry:
    """Common part of a 32-byte directory slot: first byte and attributes."""

    @staticmethod
    def create(data, offset = 0):
        """Build the entry stored at ``offset`` in ``data``.

        Returns an ``LfnEntry`` when the attribute byte (offset 11 within
        the slot) equals 0x0f and a ``DirEntry`` otherwise.
        """
        if data[offset + 11] == 0x0f:
            return LfnEntry(data, offset)
        return DirEntry(data, offset)

    def __init__(self, data = None, offset = 0):
        # Without data the entry is built from an all-zero slot.
        if data is None:
            data = b'\x00' * FatDir.ENTRY_SIZE
        self.unpack(data, offset)

    def isLast(self):
        """Return True when the first byte is 0x00."""
        return self.firstbyte == 0x00

    def isFree(self):
        """Return True when the first byte is 0xe5 or 0x00."""
        return self.firstbyte == 0xe5 or self.isLast()

    def isHidden(self):
        """Return the hidden attribute bit (0x02); non-zero when set."""
        return self.attributes & 0x02

    def isDirectory(self):
        """Return the directory attribute bit (0x10); non-zero when set."""
        return self.attributes & 0x10

    def setDirectory(self):
        """Set the directory attribute bit (0x10)."""
        self.attributes |= 0x10

    def isLfn(self):
        """Return True when the attribute byte is exactly 0x0f."""
        return self.attributes == 0x0f

    def unpack(self, data, offset):
        """Read the first byte and the attribute byte of the slot."""
        (self.firstbyte,
         self.attributes
         ) = struct.unpack_from("<B10xB20x", data, offset)
class DirEntry(BaseEntry):
    """Short (8.3) directory entry plus the long-name entries attached to it.

    ``self.lfns`` holds the long-name entries in sequence order (1 first).
    """

    _FORMAT = "<8s3sBxBHHHHHHHL"

    def __init__(self, data = None, offset = 0):
        self.lfns = []
        BaseEntry.__init__(self, data, offset)

    def setDosname(self, dosname, extension):
        """Set the 8-byte name and 3-byte extension (both ``bytes``)."""
        self.dosname = dosname
        self.firstbyte = self.dosname[0]
        self.extension = extension

    def getDosname(self):
        """Return ``(dosname, extension)`` as stored."""
        return self.dosname, self.extension

    def setFilename(self, filename):
        """Replace the long-name entries with ones spelling ``filename``.

        The name is split into 13-character pieces, one per entry.  Every
        entry carries the checksum of the current short name, so the short
        name has to be set before calling this.  An empty ``filename``
        leaves the entry without long-name entries.
        """
        checksum = self.crc()
        # Start afresh so that a second call does not duplicate sequences.
        self.lfns = []
        parts = [filename[i:i+13] for i in range(0, len(filename), 13)]
        for sequence, characters in enumerate(parts, 1):
            if len(characters) < 13 and not characters.endswith('\x00'):
                characters += '\x00'
            lfn = LfnEntry()
            lfn.setSequence(sequence)
            lfn.setLongCharacters(characters)
            lfn.checksum = checksum
            self.lfns.append(lfn)
        if self.lfns:
            self.lfns[-1].setEnd()

    def getFilename(self):
        """Return the long name when a valid one is attached.

        Falls back to the lower-cased short name (``name.ext``) when the
        long-name entries are missing, out of sequence, free, or carry a
        checksum that does not match the short name.
        """
        short_name = self.dosname.rstrip()
        extension = self.extension.rstrip()
        if len(extension):
            short_name += b'.' + extension
        # "replace" keeps one non-ASCII short name from aborting the
        # listing of a whole directory.
        long_file_name = short_name.decode("ascii", "replace").lower()
        checksum = self.crc()
        sequence = 1
        name = ''
        for lfn in self.lfns:
            if (not lfn.isLfn() or
                    lfn.isFree() or
                    lfn.checksum != checksum or
                    lfn.getSequence() != sequence):
                break
            name += lfn.getLongCharacters()
            sequence += 1
            if lfn.isEnd():
                long_file_name = name
                break
        return long_file_name

    def crc(self):
        """Return the 8-bit checksum of the 11-byte short name.

        Each step rotates the running value right by one bit and adds the
        next byte.
        """
        shortname = self.dosname + self.extension
        crc = 0
        for char in shortname:
            crc = (((crc << 7) | (crc >> 1)) + char) & 0xff
        return crc

    def getCluster(self):
        """Return the first cluster, combining the high and low halves."""
        return (self.first_cluster_hi << 16) | self.first_cluster_lo

    def setCluster(self, cluster):
        """Store ``cluster`` split into its low and high 16-bit halves."""
        self.first_cluster_lo = cluster & 0xffff
        self.first_cluster_hi = cluster >> 16

    def unpack(self, data, offset):
        """Read all fields of the 32-byte slot at ``offset`` in ``data``."""
        (self.dosname,
         self.extension,
         self.attributes,
         self.creation_ms,
         self.creation_time,
         self.creation_date,
         self.last_access_date,
         self.first_cluster_hi,
         self.last_write_time,
         self.last_write_date,
         self.first_cluster_lo,
         self.file_size
         ) = struct.unpack_from(self._FORMAT, data, offset)
        self.firstbyte = self.dosname[0]

    def pack(self):
        """Return the long-name entries followed by this entry, as bytes.

        Long-name entries are emitted highest sequence first.
        """
        pieces = [lfn.pack() for lfn in reversed(self.lfns)]
        pieces.append(struct.pack(self._FORMAT,
                                  self.dosname,
                                  self.extension,
                                  self.attributes,
                                  self.creation_ms,
                                  self.creation_time,
                                  self.creation_date,
                                  self.last_access_date,
                                  self.first_cluster_hi,
                                  self.last_write_time,
                                  self.last_write_date,
                                  self.first_cluster_lo,
                                  self.file_size
                                  ))
        return b''.join(pieces)
class LfnEntry(BaseEntry):
    """Long-file-name slot carrying up to 13 UTF-16 characters of a name.

    The characters are stored in three byte fields of 10, 12 and 4 bytes.
    """

    _FORMAT = "<B10sBxB12sxx4s"
    # Total size of the three character fields (10 + 12 + 4 bytes).
    _NAME_BYTES = 26

    def __init__(self, data=None, offset = 0):
        BaseEntry.__init__(self, data, offset)
        self.attributes |= 0x0f

    def getSequence(self):
        """Return the sequence number without the end flag (0x40)."""
        return self.firstbyte & ~0x40

    def setSequence(self, sequence):
        """Set the sequence number, keeping the end flag if it is set."""
        # isEnd has to be called: the bare method object is always truthy
        # and would flag every entry as the final one.
        if self.isEnd():
            sequence |= 0x40
        self.firstbyte = sequence

    def isEnd(self):
        """Return the end flag (0x40); non-zero when set."""
        return self.firstbyte & 0x40

    def setEnd(self):
        """Set the end flag (0x40) in the first byte."""
        self.firstbyte |= 0x40

    def getLongCharacters(self):
        """Return the characters of this slot as a ``str``.

        The text stops at the first NUL; trailing 0xffff filler is removed.
        """
        raw = self.unicode1 + self.unicode2 + self.unicode3
        # The stored data has no byte-order mark, so name the byte order
        # explicitly instead of letting "utf-16" guess it.
        text = raw.decode("utf-16-le")
        return text.partition("\x00")[0].rstrip("\uffff")

    def setLongCharacters(self, characters):
        """Store ``characters`` (a ``str``) in the three character fields.

        The text is encoded as UTF-16-LE, cut to 26 bytes and padded with
        0xffff units when shorter.
        """
        encoded = characters.encode("utf-16-le")[:self._NAME_BYTES]
        encoded += b'\xff\xff' * ((self._NAME_BYTES - len(encoded)) // 2)
        self.unicode1 = encoded[0:10]
        self.unicode2 = encoded[10:22]
        self.unicode3 = encoded[22:26]

    def unpack(self, data, offset):
        """Read all fields of the 32-byte slot at ``offset`` in ``data``."""
        (self.firstbyte,
         self.unicode1,
         self.attributes,
         self.checksum,
         self.unicode2,
         self.unicode3
         ) = struct.unpack_from(self._FORMAT, data, offset)

    def pack(self):
        """Return the slot as 32 bytes."""
        return struct.pack(self._FORMAT,
                           self.firstbyte,
                           self.unicode1,
                           self.attributes,
                           self.checksum,
                           self.unicode2,
                           self.unicode3
                           )
class FatDir:
    """A directory: its entries in order plus a name -> entry mapping.

    ``entries`` holds the short entries (each with its long-name entries
    attached), ``files`` maps file names to the entries that are not free,
    and ``dangling_lfns`` keeps long-name entries that were not followed by
    a short entry.
    """

    ENTRY_SIZE = 32

    def __init__(self, data = None, cluster=0, num_dirs=None):
        # Set every attribute first so that an empty directory is usable
        # and unpack() results are not overwritten afterwards.
        self.cluster = cluster
        self.entries = []
        self.dangling_lfns = []
        self.files = {}
        if data is not None:
            self.unpack(data, num_dirs)

    def unpack(self, data, num_dirs=None):
        """Parse up to ``num_dirs`` 32-byte slots from ``data``.

        ``num_dirs`` defaults to, and is capped at, the number of whole
        slots in ``data``.  Parsing stops at the first slot whose first
        byte is 0x00.
        """
        available = len(data) // FatDir.ENTRY_SIZE
        if num_dirs is None or num_dirs > available:
            num_dirs = available
        self.entries = []
        pending_lfns = []
        for index in range(num_dirs):
            entry = BaseEntry.create(data, index * FatDir.ENTRY_SIZE)
            if entry.isLast():
                break
            if entry.isLfn():
                pending_lfns.append(entry)
                continue
            # Long-name entries precede their short entry in reverse
            # sequence order; store them with sequence 1 first.
            pending_lfns.reverse()
            entry.lfns = pending_lfns
            pending_lfns = []
            self.entries.append(entry)
        self.dangling_lfns = pending_lfns
        self.files = {}
        for entry in self.entries:
            if not entry.isFree():
                self.files[entry.getFilename()] = entry

    def pack(self):
        """Return all entries, then the dangling long-name entries, as bytes."""
        pieces = [entry.pack() for entry in self.entries]
        pieces.extend(entry.pack() for entry in self.dangling_lfns)
        return b''.join(pieces)

    def dosParts(self, filename):
        """Return an unused ``(dosname, extension)`` pair for ``filename``.

        Both parts are upper-cased, cut and space-padded to 8 and 3 bytes.
        When the pair is already used in this directory, the name is
        shortened and given a ``~N`` suffix with the first free ``N``.
        """
        stem, dot, suffix = filename.rpartition('.')
        if not dot:
            # No extension: rpartition puts the whole name in the last slot.
            stem, suffix = filename, ''
        base = stem.upper()
        extension = suffix.upper()[:3].ljust(3).encode()
        dosname = base[:8].ljust(8).encode()
        taken = set(entry.getDosname() for entry in self.entries)
        num = 1
        while (dosname, extension) in taken:
            extra = '~' + str(num)
            # Append the suffix to the unpadded base so that no spaces end
            # up between the name and the "~N" part.
            dosname = (base[:8 - len(extra)] + extra).ljust(8).encode()
            num += 1
        return dosname, extension

    def addEntry(self, filename, cluster, size, dir=False):
        """Append a new entry for ``filename`` and register it in ``files``.

        ``cluster`` is the first cluster, ``size`` the file size in bytes;
        ``dir`` marks the entry as a directory.
        """
        entry = DirEntry()
        entry.setDosname(*self.dosParts(filename))
        if dir:
            entry.setDirectory()
        entry.setCluster(cluster)
        entry.file_size = size
        self.entries.append(entry)
        self.files[filename] = entry
class FatImage:
    """Reader/writer for a FAT volume image held in a seekable binary buffer.

    Cluster contents are cached in ``self.clusters``, keyed by the zero-based
    data cluster index (logical cluster number minus 2).
    """

    def __init__(self, buffer = None, fat = Fat12,
                 fat_number=0, bytes_per_sector=512):
        """Create an image; the metadata is read at once if ``buffer`` is given.

        ``fat`` is the table class used until ``readMeta`` selects one from
        the cluster count, ``fat_number`` selects which FAT copy is read.
        """
        self.buffer = buffer
        self.fat_number = fat_number
        self.bytes_per_sector = bytes_per_sector
        self.bootRecord = BootRecord()
        self.fat = fat()
        self.rootdir = FatDir()
        # Retained for compatibility only; see the note in readMeta.
        self.hidden_sectors = {}
        self.clusters = {}
        self.first_data_sector = 0
        self.num_clusters = 0
        if self.buffer is not None:
            self.readMeta()

    def read(self, buffer=None):
        """Read the metadata and every data cluster from ``buffer``."""
        self.readMeta(buffer=buffer)
        self.readClusters(buffer=buffer)

    def readMeta(self, buffer=None):
        """Read the boot record, the selected FAT and the root directory."""
        self.bytes_per_sector = self._getSectorSize(buffer=buffer)
        boot = self.bootRecord
        boot.unpack(self._readSectors(0, 1, buffer=buffer))
        sectors_per_fat = boot.getSectorsPerFat()
        fat_area_end = (boot.num_reserved_sectors +
                        sectors_per_fat * boot.num_fats)
        rootdir_cluster = boot.getRootdirCluster()
        # A root directory kept in the cluster area occupies no fixed region.
        rootdir_sectors = 0 if rootdir_cluster else self._getRootdirSectors()
        # NOTE: the boot record's hidden-sector count describes sectors that
        # precede the volume on the disk, so it is not part of the layout.
        self.first_data_sector = fat_area_end + rootdir_sectors
        self.num_clusters = ((boot.getNumSectors() -
                              self.first_data_sector)
                             // boot.sectors_per_cluster)
        if self.num_clusters < 4085:
            self.fat = Fat12()
        elif self.num_clusters < 65525:
            self.fat = Fat16()
        else:
            self.fat = Fat32()
        self.fat_offset = (boot.num_reserved_sectors +
                           self.fat_number * sectors_per_fat)
        self.fat.unpack(self._readSectors(self.fat_offset,
                                          sectors_per_fat,
                                          buffer=buffer))
        if rootdir_cluster:
            # The chain can only be followed once the FAT has been read.
            data = self._loadFromCluster(rootdir_cluster, buffer=buffer)
            self.rootdir = FatDir(data, rootdir_cluster)
        else:
            print(rootdir_sectors)
            self.rootdir.unpack(self._readSectors(fat_area_end,
                                                  rootdir_sectors,
                                                  buffer=buffer),
                                boot.num_rootdirs)

    def readClusters(self, buffer=None):
        """Load every data cluster into the cache."""
        spc = self.bootRecord.sectors_per_cluster
        for offset in range(self.num_clusters):
            self.clusters[offset] = self._readSectors(self.first_data_sector +
                                                      offset * spc,
                                                      spc,
                                                      buffer=buffer)

    def write(self, buffer=None):
        """Write the metadata and the cached clusters to ``buffer``."""
        self.writeMeta(buffer=buffer)
        self.writeClusters(buffer=buffer)

    def writeMeta(self, buffer=None):
        """Write the boot record, every FAT copy and a fixed root directory."""
        boot = self.bootRecord
        self._writeSectors(boot.pack(), 0, 1, buffer=buffer)
        sectors_per_fat = boot.getSectorsPerFat()
        fat_data = self.fat.pack()
        for offset in range(boot.num_fats):
            self._writeSectors(fat_data,
                               boot.num_reserved_sectors +
                               offset * sectors_per_fat,
                               sectors_per_fat,
                               buffer=buffer)
        if not boot.getRootdirCluster():
            rootdir_offset = (boot.num_reserved_sectors +
                              sectors_per_fat * boot.num_fats)
            self._writeSectors(self.rootdir.pack(),
                               rootdir_offset,
                               self._getRootdirSectors(),
                               buffer=buffer)

    def writeClusters(self, buffer=None):
        """Write every cached cluster to ``buffer``."""
        spc = self.bootRecord.sectors_per_cluster
        for offset, data in self.clusters.items():
            self._writeSectors(data,
                               self.first_data_sector + offset * spc,
                               spc,
                               buffer=buffer)
        # make sure to write the last cluster so buffer is proper size
        last_cluster = self.num_clusters - 1
        if last_cluster >= 0 and last_cluster not in self.clusters:
            empty_cluster = b'\x00' * self.bytes_per_sector * spc
            self._writeSectors(empty_cluster,
                               self.first_data_sector + last_cluster * spc,
                               spc,
                               buffer=buffer)

    def getAllFiles(self, dir=None):
        """Return ``{name: contents}`` for ``dir`` (default: root directory).

        Sub-directories appear as nested dictionaries; the ``.`` and ``..``
        entries are skipped.
        """
        if dir is None:
            dir = self.rootdir
        files = {}
        for filename, entry in dir.files.items():
            if entry.isDirectory():
                if filename not in ['.', '..']:
                    cluster = entry.getCluster()
                    d = FatDir(self._loadFromCluster(cluster), cluster)
                    files[filename] = self.getAllFiles(d)
            else:
                data = self._loadFromCluster(entry.getCluster())
                # Drop the padding of the final cluster, as _getFile does.
                files[filename] = data[:entry.file_size]
        return files

    def _getRootdirSectors(self):
        """Return the sectors needed for the fixed root directory, rounded up."""
        rootdir_bytes = self.bootRecord.num_rootdirs * FatDir.ENTRY_SIZE
        return ((rootdir_bytes + self.bytes_per_sector - 1)
                // self.bytes_per_sector)

    def _getLogicalCluster(self, logical_cluster, buffer=None):
        """Return the bytes of ``logical_cluster``, reading them on a cache miss."""
        # logical cluster 2 is the first actual data cluster
        real_cluster = logical_cluster - 2
        if real_cluster not in self.clusters:
            spc = self.bootRecord.sectors_per_cluster
            sectors = self._readSectors(self.first_data_sector +
                                        real_cluster * spc,
                                        spc,
                                        buffer=buffer)
            self.clusters[real_cluster] = sectors
        return self.clusters[real_cluster]

    def _setLogicalCluster(self, logical_cluster, data):
        """Cache ``data`` for ``logical_cluster``, zero-padded to a full cluster."""
        real_cluster = logical_cluster - 2
        clen = self.bootRecord.sectors_per_cluster * self.bytes_per_sector
        data = data + b'\x00' * (clen - len(data))
        self.clusters[real_cluster] = data

    def _loadFromCluster(self, cluster, buffer=None):
        """Return the concatenated contents of the chain starting at ``cluster``.

        Raises ``IOError`` when the chain revisits a cluster.
        """
        chunks = []
        visited = set()
        while cluster != 0:
            if cluster in visited:
                raise IOError("Cluster chain loops at cluster %d" % cluster)
            visited.add(cluster)
            chunks.append(self._getLogicalCluster(cluster, buffer=buffer))
            cluster = self.fat.getNextCluster(cluster)
        return b''.join(chunks)

    def _writeToCluster(self, cluster, data):
        """Spread ``data`` over the chain starting at ``cluster``.

        Raises ``IOError`` when the chain revisits a cluster.
        """
        clen = self.bootRecord.sectors_per_cluster * self.bytes_per_sector
        offset = 0
        visited = set()
        while cluster != 0:
            if cluster in visited:
                raise IOError("Cluster chain loops at cluster %d" % cluster)
            visited.add(cluster)
            self._setLogicalCluster(cluster, data[offset * clen:
                                                  (offset + 1) * clen])
            offset += 1
            cluster = self.fat.getNextCluster(cluster)

    def _getFile(self, filename, parent_dir=None, dir=False):
        """Resolve ``filename`` (``/``-separated) relative to ``parent_dir``.

        Returns a ``FatDir`` when ``dir`` is true, otherwise the file
        contents cut to the recorded size.  Raises ``IOError`` when the name
        is missing or is not of the requested kind.
        """
        if dir and filename == '':
            return self.rootdir
        if '/' in filename:
            parts = filename.partition('/')
            return self._getFile(parts[2],
                                 self._getFile(parts[0], parent_dir, True),
                                 dir)
        if parent_dir is None:
            parent_dir = self.rootdir
        if filename not in parent_dir.files:
            raise IOError("Directory has no member " + filename)
        entry = parent_dir.files[filename]
        cluster = entry.getCluster()
        if dir and not entry.isDirectory():
            raise IOError(filename + " is a file, not a directory")
        if not dir and entry.isDirectory():
            raise IOError(filename + " is a directory, not a file")
        if dir and cluster == 0:
            return self.rootdir
        output = self._loadFromCluster(cluster)
        if dir:
            return FatDir(output, cluster)
        return output[:entry.file_size]

    def getFile(self, filename):
        """Return the contents of the file at path ``filename``."""
        return self._getFile(filename)

    def putFile(self, filename, buffer):
        """Store ``buffer`` as the file at path ``filename``.

        The cluster chain is created or resized to fit, and the parent
        directory is rewritten when its entry for the file changes.
        """
        parent, _, filename = filename.rpartition('/')
        directory = self._getFile(parent, self.rootdir, True)
        entry = directory.files.get(filename)
        cluster = entry.getCluster() if entry is not None else 0
        size = len(buffer)
        num_clusters = self._getNumClusters(size)
        cluster = self.fat.allocateClusters(num_clusters, cluster)
        self._writeToCluster(cluster, buffer)
        if entry is None:
            # New entry, so the parent directory needs to be updated
            directory.addEntry(filename, cluster, size)
            self._writeDir(parent, directory.pack())
            return
        # This file has a directory entry already, so only update the
        # parent directory if the entry itself has changed
        changed = False
        if entry.getCluster() != cluster:
            entry.setCluster(cluster)
            changed = True
        # _writeDir stores directories through putFile; their entries keep
        # the size they were read with.
        if not entry.isDirectory() and entry.file_size != size:
            entry.file_size = size
            changed = True
        if changed:
            self._writeDir(parent, directory.pack())

    def _getNumClusters(self, size):
        """Return the number of clusters needed for ``size`` bytes."""
        clen = self.bootRecord.sectors_per_cluster * self.bytes_per_sector
        # Ceiling division: an exact multiple must not get an extra cluster.
        return (size + clen - 1) // clen

    def _writeDir(self, directory, buffer):
        """Store the packed directory ``buffer`` for the path ``directory``."""
        if directory == '':
            # The root directory should only be written if it is stored in
            # its own cluster (i.e. fat32)
            cluster = self.bootRecord.getRootdirCluster()
            if cluster:
                num_clusters = self._getNumClusters(len(buffer))
                cluster = self.fat.allocateClusters(num_clusters, cluster)
                self._writeToCluster(cluster, buffer)
        else:
            # Not the root directory, so just write it like a normal file
            self.putFile(directory, buffer)

    def listFiles(self, dirname=None):
        """Return the names in ``dirname`` (default: the root directory)."""
        if dirname is None:
            dir = self.rootdir
        else:
            dir = self._getFile(dirname, dir = True)
        return dir.files.keys()

    def _getSectorSize(self, buffer=None):
        """Return the 16-bit value stored at byte offset 11 of the image."""
        if buffer is None:
            buffer = self.buffer
        buffer.seek(11)
        (size,) = struct.unpack("<H", buffer.read(2))
        return size

    def _readSectors(self, sector, count=1, num_bytes=None, buffer=None):
        """Read ``count`` sectors (or ``num_bytes``) starting at ``sector``.

        Raises ``EOFError`` when the buffer ends early.
        """
        if buffer is None:
            buffer = self.buffer
        buffer.seek(self.bytes_per_sector * sector)
        if num_bytes is None:
            num_bytes = self.bytes_per_sector * count
        data = buffer.read(num_bytes)
        if len(data) < num_bytes:
            raise EOFError
        return data

    def _writeSectors(self, data, sector, count=1, num_bytes=None, buffer=None):
        """Write at most ``count`` sectors (or ``num_bytes``) of ``data``."""
        if buffer is None:
            buffer = self.buffer
        buffer.seek(self.bytes_per_sector * sector)
        if num_bytes is None:
            num_bytes = self.bytes_per_sector * count
        buffer.write(data[:num_bytes])
if __name__ == '__main__':
    # Smoke test: copy a file inside "fat.img" and write the result out.
    import getopt
    import sys
    from io import BytesIO

    def usage():
        print("Usage: [-o outputfile] [-b bootsector] [-d directory]")

    bootsector = "boot.bin"
    directory = "floppy"
    outputfile = "floppy.dsk"
    try:
        # The long option names have to match the ones tested below.
        opts, args = getopt.getopt(sys.argv[1:],
                                   "ho:b:d:",
                                   ["help",
                                    "outputfile=",
                                    "bootsector=",
                                    "directory="])
    except getopt.GetoptError as err:
        print(err)
        usage()
        sys.exit(2)
    for o, a in opts:
        print(o)
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-o", "--outputfile"):
            outputfile = a
        elif o in ("-b", "--bootsector"):
            bootsector = a
        elif o in ("-d", "--directory"):
            directory = a

    with open("fat.img", "rb") as f:
        # The image reads from f on demand, so f stays open until the
        # image has been written to the in-memory buffer.
        J = FatImage(f)
        print(J.listFiles('floppy'))
        test = J.getFile('floppy/onyx.bin')
        print(len(test), test[:100])
        J.readClusters()
        J.putFile('floppy/another.bin', test)
        print(J.listFiles())
        print(J.listFiles('floppy'))
        print(J.listFiles('floppy/floppy'))
        other = J.getFile('floppy/another.bin')
        print('comparison', test == other)
        image = bytearray(512 * 2880)
        result = BytesIO(image)
        J.write(result)
    result = result.getvalue()
    print(len(result), result[:100])
    with open("image.img", "wb") as w:
        w.write(result)
    with open("one.bin", "wb") as w:
        w.write(test)
    with open("two.bin", "wb") as w:
        w.write(other)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment