Created
May 17, 2011 19:15
-
-
Save vishvananda/977159 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3.1
import struct
class BootRecord:
    """Boot sector of a FAT volume: jump code, BIOS parameter block,
    optional FAT32 extension, extended fields and the trailing 0xaa55
    signature.  unpack()/pack() round-trip a full sector."""

    def __init__(self):
        # FAT12/16 volumes have no root-directory cluster field.
        self.rootdir_cluster = 0

    def unpack(self, data):
        """Parse one raw boot sector (data is the whole first sector)."""
        offset = 0
        (self.boot_instructions,
         self.int13_flag,
         self.oem_id,
         self.bytes_per_sector,
         self.sectors_per_cluster,
         self.num_reserved_sectors,
         self.num_fats,
         self.num_rootdirs,
         self.num_sectors,        # 16-bit total; 0 means "use total_sectors"
         self.media_descriptor,
         self.sectors_per_fat,    # 16-bit FAT size; 0 on FAT32
         self.sectors_per_head,
         self.heads_per_cylinder,
         self.hidden_sectors,
         self.total_sectors
         ) = struct.unpack_from("<2sB8sHBHBHHBHHHLL", data)
        offset += struct.calcsize("<2sB8sHBHBHHBHHHLL")
        if self.is32():
            # FAT32 extension block; "12x" skips the 12 reserved bytes.
            (self.num_fat_sectors,
             self.active_fat,
             self.fat_version,
             self.rootdir_cluster,
             self.fsinfo_sector,
             self.boot_sector_backup
             ) = struct.unpack_from("<LHHLHH12x", data, offset)
            offset += struct.calcsize("<LHHLHH12x")
        (self.drive_num,
         self.boot_signature,
         self.serial_number,
         self.volume_label,
         self.file_system_id
         ) = struct.unpack_from("<BxBL11s8s", data, offset)
        offset += struct.calcsize("<BxBL11s8s")
        # Keep the boot code verbatim so pack() reproduces the sector.
        self.rest_of_boot_sector = data[offset:-2]
        (self.bootable_signature, ) = struct.unpack_from("<H", data,
                                                         len(data) - 2)

    def pack(self):
        """Serialize back into a full boot sector (inverse of unpack)."""
        data = bytearray(struct.pack("<2sB8sHBHBHHBHHHLL",
                                     self.boot_instructions,
                                     self.int13_flag,
                                     self.oem_id,
                                     self.bytes_per_sector,
                                     self.sectors_per_cluster,
                                     self.num_reserved_sectors,
                                     self.num_fats,
                                     self.num_rootdirs,
                                     self.num_sectors,
                                     self.media_descriptor,
                                     self.sectors_per_fat,
                                     self.sectors_per_head,
                                     self.heads_per_cylinder,
                                     self.hidden_sectors,
                                     self.total_sectors,
                                     ))
        if self.is32():
            data.extend(struct.pack("<LHHLHH12x",
                                    self.num_fat_sectors,
                                    self.active_fat,
                                    self.fat_version,
                                    self.rootdir_cluster,
                                    self.fsinfo_sector,
                                    self.boot_sector_backup
                                    ))
        data.extend(struct.pack("<BxBL11s8s",
                                self.drive_num,
                                self.boot_signature,
                                self.serial_number,
                                self.volume_label,
                                self.file_system_id
                                ))
        data.extend(self.rest_of_boot_sector)
        data.extend(struct.pack("<H", self.bootable_signature))
        return bytes(data)

    def is32(self):
        """FAT32 is indicated by a zero 16-bit sector count."""
        return self.num_sectors == 0

    def getNumSectors(self):
        """Total sectors, preferring the 16-bit field when nonzero."""
        return self.num_sectors if self.num_sectors else self.total_sectors

    def getRootdirCluster(self):
        """First cluster of the root directory (0 on FAT12/16)."""
        return self.rootdir_cluster

    def getSectorsPerFat(self):
        """Sectors per FAT; FAT32 stores this in the extension block and
        zeroes the 16-bit field."""
        if self.sectors_per_fat:
            return self.sectors_per_fat
        return self.num_fat_sectors if self.is32() else 0
class Fat:
    """Base file-allocation table: a list of per-cluster chain links.

    Subclasses set self.max (the smallest end-of-chain marker) and
    implement pack()/unpack() for their entry width.
    """

    def __init__(self):
        self.offsets = []

    def getNextCluster(self, cluster):
        """Next cluster in the chain, or 0 at end of chain."""
        if self.offsets[cluster] >= self.max:
            return 0
        return self.offsets[cluster]

    def setNextCluster(self, cluster, next_cluster):
        self.offsets[cluster] = next_cluster

    def setLastCluster(self, cluster):
        # The end-of-chain marker also marks the cluster as allocated.
        self.setNextCluster(cluster, self.max)

    def isAllocated(self, cluster):
        return self.offsets[cluster] != 0

    def deallocateCluster(self, cluster):
        self.setNextCluster(cluster, 0)

    def allocateClusters(self, num_clusters, cluster = 0):
        """Resize the chain starting at cluster to exactly num_clusters
        clusters, reusing what exists, freeing any surplus and
        allocating extras.  Returns the first cluster of the chain.

        Raises IOError when the table has too few free clusters.
        """
        cur_cluster = cluster
        prev_cluster = 0
        while cur_cluster:
            num_clusters -= 1
            next_cluster = self.getNextCluster(cur_cluster)
            if num_clusters == 0:
                # Existing chain is exactly long enough: end it here.
                self.setLastCluster(cur_cluster)
            elif num_clusters < 0:
                # Surplus cluster beyond the requested length: free it.
                self.deallocateCluster(cur_cluster)
            else:
                # Still too short: remember the tail so we can extend it.
                prev_cluster = cur_cluster
            cur_cluster = next_cluster
        if not self.hasEnoughFreeClusters(num_clusters):
            raise IOError("Fat doesn't have " + str(num_clusters) +
                          " available")
        for _ in range(num_clusters):
            cur_cluster = self.getFreeClusterOffset()
            # Claim it immediately so the next search skips this cluster.
            self.setLastCluster(cur_cluster)
            if prev_cluster:
                self.setNextCluster(prev_cluster, cur_cluster)
            else:
                cluster = cur_cluster
            prev_cluster = cur_cluster
        return cluster

    def getFreeClusterOffset(self):
        """Index of the first free cluster, or None when none is free."""
        for cluster in range(len(self.offsets)):
            if not self.isAllocated(cluster):
                return cluster

    def hasEnoughFreeClusters(self, num_clusters):
        """True when at least num_clusters table entries are free."""
        if num_clusters <= 0:
            return True
        free_clusters = 0
        for cluster in range(len(self.offsets)):
            if not self.isAllocated(cluster):
                free_clusters += 1
                if free_clusters >= num_clusters:
                    return True
        return False
class Fat12(Fat):
    """FAT with 12-bit entries, packed three nibbles per entry."""

    def __init__(self):
        # Values >= 0xff9 terminate a chain.
        self.max = 0xff9
        Fat.__init__(self)

    def unpack(self, data):
        """Decode the packed little-endian 12-bit table from data."""
        halves = []
        for byte in data:
            halves.extend((byte & 0x0f, (byte >> 4) & 0x0f))
        # Every three consecutive nibbles (low, mid, high) form one entry.
        self.offsets = [low | (mid << 4) | (high << 8)
                        for low, mid, high in zip(halves[0::3],
                                                  halves[1::3],
                                                  halves[2::3])]

    def pack(self):
        """Re-encode the table into packed 12-bit little-endian bytes."""
        halves = []
        for value in self.offsets:
            halves.extend((value & 0xf, (value >> 4) & 0xf, (value >> 8) & 0xf))
        # Pair nibbles back up into bytes, low nibble first.
        return bytes((high << 4) | low
                     for low, high in zip(halves[0::2], halves[1::2]))
class Fat16(Fat):
    """FAT with 16-bit little-endian entries."""

    def __init__(self):
        # Values >= 0xfff9 terminate a chain.
        self.max = 0xfff9
        Fat.__init__(self)

    def unpack(self, data):
        """Decode the 16-bit table from data, replacing any previous
        contents (matches Fat12.unpack, which rebuilds the table)."""
        self.offsets = []
        for index in range(len(data) // 2):
            (value, ) = struct.unpack_from("<H", data, index * 2)
            self.offsets.append(value)

    def pack(self):
        """Re-encode the table as 16-bit little-endian bytes."""
        out = bytearray()
        for value in self.offsets:
            out.extend(struct.pack("<H", value))
        return bytes(out)
class Fat32(Fat):
    """FAT with 32-bit little-endian entries."""

    def __init__(self):
        # Values >= 0xfffffff9 terminate a chain.
        self.max = 0xfffffff9
        Fat.__init__(self)

    def unpack(self, data):
        """Decode the 32-bit table from data, replacing any previous
        contents (matches Fat12.unpack, which rebuilds the table)."""
        self.offsets = []
        for index in range(len(data) // 4):
            (value, ) = struct.unpack_from("<L", data, index * 4)
            self.offsets.append(value)

    def pack(self):
        """Re-encode the table as 32-bit little-endian bytes."""
        out = bytearray()
        for value in self.offsets:
            out.extend(struct.pack("<L", value))
        return bytes(out)
class BaseEntry:
    """Common behaviour for 32-byte FAT directory entries."""

    @staticmethod
    def create(data, offset = 0):
        """Factory: return an LfnEntry or DirEntry depending on the
        attribute byte at offset + 11 (0x0f marks a long-name entry)."""
        if data[offset + 11] == 0x0f:
            return LfnEntry(data, offset)
        return DirEntry(data, offset)

    def __init__(self, data = None, offset = 0):
        if data is None:
            # Start from an all-zero entry when no raw bytes are given.
            data = b'\x00' * FatDir.ENTRY_SIZE
        self.unpack(data, offset)

    def isLast(self):
        # A first byte of 0x00 terminates the directory listing.
        return self.firstbyte == 0x00

    def isFree(self):
        # 0xe5 marks a deleted entry; the terminator also counts as free.
        return self.firstbyte == 0xe5 or self.isLast()

    def isHidden(self):
        return self.attributes & 0x02

    def isDirectory(self):
        return self.attributes & 0x10

    def setDirectory(self):
        self.attributes |= 0x10

    def isLfn(self):
        return self.attributes == 0x0f

    def unpack(self, data, offset):
        # Only the first byte and the attribute byte (offset 11) matter
        # at this level; subclasses decode the full entry.
        (self.firstbyte,
         self.attributes
         ) = struct.unpack_from("<B10xB20x", data, offset)
class DirEntry(BaseEntry):
    """A standard 8.3 directory entry plus any attached LFN entries."""

    def __init__(self, data = None, offset = 0):
        # LfnEntry chain holding the long name, first part first.
        self.lfns = []
        BaseEntry.__init__(self, data, offset)

    def setDosname(self, dosname, extension):
        """Set the 8.3 name (dosname: 8 bytes, extension: 3 bytes)."""
        self.dosname = dosname
        self.firstbyte = self.dosname[0]
        self.extension = extension

    def getDosname(self):
        return self.dosname, self.extension

    def setFilename(self, filename):
        """Attach LFN entries carrying filename, 13 characters apiece."""
        checksum = self.crc()
        sequence = 1
        parts = [filename[i:i + 13] for i in range(0, len(filename), 13)]
        lfn = None
        for characters in parts:
            if len(characters) < 13 and characters[-1] != '\x00':
                # A short final chunk must be NUL-terminated.
                characters += '\x00'
            lfn = LfnEntry()
            lfn.setSequence(sequence)
            lfn.checksum = checksum
            # Store the actual characters in the entry.
            lfn.setLongCharacters(characters)
            self.lfns.append(lfn)
            sequence += 1
        if lfn is not None:
            # Mark the final entry as the end of the LFN chain.
            lfn.setEnd()

    def getFilename(self):
        """Return the long filename when a valid LFN chain is attached,
        otherwise the lowercased 8.3 name (e.g. 'foo.bin')."""
        long_file_name = (self.dosname.rstrip() +
                          ((b'.' + self.extension.rstrip())
                           if len(self.extension.rstrip()) else b'')
                          ).decode("ascii").lower()
        checksum = self.crc()
        sequence = 1
        name = ''
        for lfn in self.lfns:
            # Trust the chain only while every entry is a live LFN record
            # with the right checksum and consecutive sequence numbers.
            if (not lfn.isLfn() or
                    lfn.isFree() or
                    lfn.checksum != checksum or
                    lfn.getSequence() != sequence):
                break
            name += lfn.getLongCharacters()
            sequence += 1
            if lfn.isEnd():
                long_file_name = name
                break
        return long_file_name

    def crc(self):
        """The LFN checksum of the 8.3 name: rotate right, add byte."""
        shortname = self.dosname + self.extension
        crc = 0
        for char in shortname:
            crc = (((crc << 7) | (crc >> 1)) + char) & 0xff
        return crc

    def getCluster(self):
        """First cluster, joined from the split hi/lo 16-bit fields."""
        return (self.first_cluster_hi << 16) | self.first_cluster_lo

    def setCluster(self, cluster):
        self.first_cluster_lo = cluster & 0xffff
        self.first_cluster_hi = cluster >> 16

    def unpack(self, data, offset):
        """Decode the full 32-byte entry at offset."""
        (self.dosname,
         self.extension,
         self.attributes,
         self.creation_ms,
         self.creation_time,
         self.creation_date,
         self.last_access_date,
         self.first_cluster_hi,
         self.last_write_time,
         self.last_write_date,
         self.first_cluster_lo,
         self.file_size
         ) = struct.unpack_from("<8s3sBxBHHHHHHHL", data, offset)
        self.firstbyte = self.dosname[0]

    def pack(self):
        """Serialize the LFN chain (reversed, as stored on disk) followed
        by this 8.3 entry."""
        data = b''
        for lfn in reversed(self.lfns):
            data += lfn.pack()
        return data + struct.pack("<8s3sBxBHHHHHHHL", self.dosname,
                                  self.extension,
                                  self.attributes,
                                  self.creation_ms,
                                  self.creation_time,
                                  self.creation_date,
                                  self.last_access_date,
                                  self.first_cluster_hi,
                                  self.last_write_time,
                                  self.last_write_date,
                                  self.first_cluster_lo,
                                  self.file_size
                                  )
class LfnEntry(BaseEntry):
    """A long-file-name directory entry holding up to 13 UTF-16 chars."""

    def __init__(self, data=None, offset = 0):
        BaseEntry.__init__(self, data, offset)
        # LFN entries are identified by attributes == 0x0f.
        self.attributes |= 0x0f

    def getSequence(self):
        # Mask off the 0x40 "last entry in chain" flag.
        return self.firstbyte & ~0x40

    def setSequence(self, sequence):
        # Preserve the end-of-chain flag if it is already set.
        if self.isEnd():
            sequence |= 0x40
        self.firstbyte = sequence

    def isEnd(self):
        return self.firstbyte & 0x40

    def setEnd(self):
        self.firstbyte |= 0x40

    def getLongCharacters(self):
        # Name fields are raw UTF-16-LE (no BOM); strip the NUL
        # terminator and 0xffff padding.
        return (self.unicode1 + self.unicode2 + self.unicode3
                ).decode("utf-16-le").rstrip("\x00\uffff")

    def setLongCharacters(self, characters):
        # Split the characters across the three name fields and pad the
        # unused bytes with 0xff (0xffff words), per the LFN convention.
        self.unicode1 = characters[:5].encode("utf-16-le").ljust(10, b'\xff')
        self.unicode2 = characters[5:11].encode("utf-16-le").ljust(12, b'\xff')
        self.unicode3 = characters[11:13].encode("utf-16-le").ljust(4, b'\xff')

    def unpack(self, data, offset):
        """Decode the full 32-byte LFN entry at offset."""
        (self.firstbyte,
         self.unicode1,
         self.attributes,
         self.checksum,
         self.unicode2,
         self.unicode3
         ) = struct.unpack_from("<B10sBxB12sxx4s", data, offset)

    def pack(self):
        """Serialize back into a 32-byte LFN entry."""
        return struct.pack("<B10sBxB12sxx4s", self.firstbyte,
                           self.unicode1,
                           self.attributes,
                           self.checksum,
                           self.unicode2,
                           self.unicode3
                           )
class FatDir:
    """A FAT directory: a packed table of 32-byte entries."""

    ENTRY_SIZE = 32

    def __init__(self, data = None, cluster=0, num_dirs=None):
        # Initialize unconditionally so an empty FatDir() is usable
        # (addEntry/pack work before any unpack).
        self.cluster = cluster
        self.entries = []
        self.files = {}
        self.dangling_lfns = []
        if data is not None:
            if num_dirs is None:
                num_dirs = len(data) // FatDir.ENTRY_SIZE
            self.unpack(data, num_dirs)

    def unpack(self, data, num_dirs):
        """Parse num_dirs entries, attaching each run of LFN entries to
        the directory entry that follows it."""
        self.entries = []
        lfns = []
        for n in range(num_dirs):
            entry = BaseEntry.create(data, n * FatDir.ENTRY_SIZE)
            if entry.isLast():
                break
            if entry.isLfn():
                lfns.append(entry)
            else:
                # LFN entries are stored last-part-first on disk.
                lfns.reverse()
                entry.lfns = lfns
                lfns = []
                self.entries.append(entry)
        # LFN entries with no following directory entry.
        self.dangling_lfns = lfns
        self.files = {}
        for entry in self.entries:
            if not entry.isFree():
                self.files[entry.getFilename()] = entry

    def pack(self):
        """Serialize every entry (plus dangling LFNs) back to bytes."""
        data = b''
        for entry in self.entries:
            data += entry.pack()
        for entry in self.dangling_lfns:
            data += entry.pack()
        return data

    def dosParts(self, filename):
        """Derive a unique, space-padded 8.3 (dosname, extension) pair."""
        stem, sep, ext = filename.rpartition('.')
        if not sep:
            # No dot: rpartition puts everything in ext, but the whole
            # name is really the stem.
            stem, ext = filename, ''
        dosname = stem.upper()[:8].ljust(8).encode()
        extension = ext.upper()[:3].ljust(3).encode()
        num = 1
        valid = False
        while not valid:
            for entry in self.entries:
                if (dosname, extension) == entry.getDosname():
                    # Collision: splice in a ~N suffix and retry.
                    extra = b'~' + str(num).encode()
                    dosname = dosname[:-len(extra)] + extra
                    num += 1
                    break
            else:
                valid = True
        return dosname, extension

    def addEntry(self, filename, cluster, size, dir=False):
        """Create a directory entry for filename and register it.

        NOTE(review): the long name is kept only in self.files; no LFN
        entries are written, so a reloaded directory exposes the 8.3 name.
        """
        entry = DirEntry()
        entry.setDosname(*self.dosParts(filename))
        if dir:
            entry.setDirectory()
        entry.setCluster(cluster)
        entry.file_size = size
        self.entries.append(entry)
        self.files[filename] = entry
class FatImage:
    """A FAT12/16/32 filesystem image stored in a seekable binary buffer."""

    def __init__(self, buffer = None, fat = Fat12,
                 fat_number=0, bytes_per_sector=512):
        self.buffer = buffer
        self.fat_number = fat_number            # which FAT copy to read
        self.bytes_per_sector = bytes_per_sector
        self.bootRecord = BootRecord()
        self.fat = fat()                        # replaced by readMeta()
        self.rootdir = FatDir()
        self.hidden_sectors = {}                # offset -> raw sector data
        self.clusters = {}                      # real cluster index -> data
        if self.buffer:
            self.readMeta()

    def read(self, buffer=None):
        """Load metadata and all data clusters from buffer."""
        self.readMeta(buffer=buffer)
        self.readClusters(buffer=buffer)

    def _rootdirSectors(self):
        """Sectors occupied by a fixed (FAT12/16) root directory,
        rounded up so a partially filled sector still counts."""
        entries_per_sector = self.bytes_per_sector // FatDir.ENTRY_SIZE
        return -(-self.bootRecord.num_rootdirs // entries_per_sector)

    def readMeta(self, buffer=None):
        """Parse the boot record, FAT and root directory."""
        self.bytes_per_sector = self._getSectorSize(buffer=buffer)
        self.bootRecord.unpack(self._readSectors(0, 1, buffer=buffer))
        rootdir_cluster = self.bootRecord.getRootdirCluster()
        self.first_data_sector = (self.bootRecord.num_reserved_sectors +
                                  self.bootRecord.getSectorsPerFat() *
                                  self.bootRecord.num_fats)
        rootdir_offset = self.first_data_sector
        rootdir_sectors = 0
        if not rootdir_cluster:
            # FAT12/16: fixed root directory right after the FATs.
            rootdir_sectors = self._rootdirSectors()
            self.first_data_sector += rootdir_sectors
        if self.bootRecord.hidden_sectors:
            for offset in range(self.bootRecord.hidden_sectors):
                sector = self.first_data_sector + offset
                self.hidden_sectors[offset] = self._readSectors(sector, 1,
                                                                buffer=buffer)
            self.first_data_sector += self.bootRecord.hidden_sectors
        self.num_clusters = ((self.bootRecord.getNumSectors() -
                              self.first_data_sector)
                             // self.bootRecord.sectors_per_cluster)
        # FAT type is determined by cluster count alone.
        if self.num_clusters < 4085:
            self.fat = Fat12()
        elif self.num_clusters < 65525:
            self.fat = Fat16()
        else:
            self.fat = Fat32()
        self.fat_offset = (self.bootRecord.num_reserved_sectors +
                           self.fat_number * self.bootRecord.getSectorsPerFat())
        self.fat.unpack(self._readSectors(self.fat_offset,
                                          self.bootRecord.getSectorsPerFat(),
                                          buffer=buffer))
        # Parse the root directory now that the FAT is loaded (FAT32
        # keeps it in a cluster chain we must follow).
        if rootdir_cluster:
            self.rootdir.cluster = rootdir_cluster
            data = self._loadFromCluster(rootdir_cluster)
            self.rootdir.unpack(data, len(data) // FatDir.ENTRY_SIZE)
        else:
            self.rootdir.unpack(self._readSectors(rootdir_offset,
                                                  rootdir_sectors,
                                                  buffer=buffer),
                                self.bootRecord.num_rootdirs)

    def readClusters(self, buffer=None):
        """Cache every data cluster of the image in self.clusters."""
        for offset in range(self.num_clusters):
            spc = self.bootRecord.sectors_per_cluster
            self.clusters[offset] = self._readSectors(self.first_data_sector +
                                                      offset * spc,
                                                      spc,
                                                      buffer=buffer)

    def write(self, buffer=None):
        """Write metadata and all cached clusters to buffer."""
        self.writeMeta(buffer=buffer)
        self.writeClusters(buffer=buffer)

    def writeMeta(self, buffer=None):
        """Write boot record, every FAT copy, the fixed root directory
        (FAT12/16) and any hidden sectors."""
        self._writeSectors(self.bootRecord.pack(), 0, 1, buffer=buffer)
        sectors_per_fat = self.bootRecord.getSectorsPerFat()
        for offset in range(self.bootRecord.num_fats):
            self._writeSectors(self.fat.pack(),
                               self.bootRecord.num_reserved_sectors +
                               offset * sectors_per_fat,
                               sectors_per_fat,
                               buffer=buffer)
        if not self.bootRecord.getRootdirCluster():
            rootdir_offset = (self.bootRecord.num_reserved_sectors +
                              sectors_per_fat * self.bootRecord.num_fats)
            self._writeSectors(self.rootdir.pack(),
                               rootdir_offset,
                               self._rootdirSectors(),
                               buffer=buffer)
        if self.bootRecord.hidden_sectors:
            # readMeta read these *before* adding the hidden count to
            # first_data_sector, so write them back at the same base.
            base = self.first_data_sector - self.bootRecord.hidden_sectors
            for offset in range(self.bootRecord.hidden_sectors):
                self._writeSectors(self.hidden_sectors[offset],
                                   base + offset,
                                   1, buffer=buffer)

    def writeClusters(self, buffer=None):
        """Write every cached cluster; pad out to the full image size."""
        spc = self.bootRecord.sectors_per_cluster
        for offset, data in self.clusters.items():
            self._writeSectors(data,
                               self.first_data_sector + offset * spc,
                               spc,
                               buffer=buffer)
        # Make sure the last cluster is written so buffer is proper size.
        last_cluster = self.num_clusters - 1
        if last_cluster not in self.clusters:
            empty_cluster = (b'\x00' * self.bytes_per_sector *
                             self.bootRecord.sectors_per_cluster)
            self._writeSectors(empty_cluster,
                               self.first_data_sector + last_cluster * spc,
                               spc,
                               buffer=buffer)

    def getAllFiles(self, dir=None):
        """Recursively collect {name: bytes-or-subdict} from dir."""
        if dir is None:
            dir = self.rootdir
        files = {}
        for filename, entry in dir.files.items():
            if entry.isDirectory():
                if filename not in ['.', '..']:
                    cluster = entry.getCluster()
                    d = FatDir(self._loadFromCluster(cluster), cluster)
                    files[filename] = self.getAllFiles(d)
            else:
                files[filename] = self._loadFromCluster(entry.getCluster())
        return files

    def _getLogicalCluster(self, logical_cluster):
        """Raw data of one cluster, read lazily from the buffer."""
        # Logical cluster 2 is the first actual data cluster.
        real_cluster = logical_cluster - 2
        if real_cluster not in self.clusters:
            spc = self.bootRecord.sectors_per_cluster
            sectors = self._readSectors(self.first_data_sector +
                                        real_cluster * spc,
                                        spc)
            self.clusters[real_cluster] = sectors
        return self.clusters[real_cluster]

    def _setLogicalCluster(self, logical_cluster, data):
        """Store data (zero-padded to a full cluster) in the cache."""
        real_cluster = logical_cluster - 2
        clen = self.bootRecord.sectors_per_cluster * self.bytes_per_sector
        data = data + b'\x00' * (clen - len(data))
        self.clusters[real_cluster] = data

    def _loadFromCluster(self, cluster):
        """Concatenate the whole chain starting at cluster."""
        output = b''
        while cluster != 0:
            output += self._getLogicalCluster(cluster)
            cluster = self.fat.getNextCluster(cluster)
        return output

    def _writeToCluster(self, cluster, data):
        """Spread data over the chain starting at cluster."""
        clen = self.bootRecord.sectors_per_cluster * self.bytes_per_sector
        offset = 0
        while cluster != 0:
            self._setLogicalCluster(cluster, data[offset * clen:
                                                  (offset + 1) * clen])
            offset += 1
            cluster = self.fat.getNextCluster(cluster)

    def _getFile(self, filename, parent_dir=None, dir=False):
        """Resolve a /-separated path to file bytes or a FatDir."""
        if dir and filename == '':
            return self.rootdir
        if '/' in filename:
            # Resolve the leading component, then recurse on the rest.
            parts = filename.partition('/')
            return self._getFile(parts[2],
                                 self._getFile(parts[0], parent_dir, True),
                                 dir)
        if parent_dir is None:
            parent_dir = self.rootdir
        if filename not in parent_dir.files:
            raise IOError("Directory has no member " + filename)
        entry = parent_dir.files[filename]
        cluster = entry.getCluster()
        if dir and not entry.isDirectory():
            raise IOError(filename + " is a file, not a directory")
        if not dir and entry.isDirectory():
            raise IOError(filename + " is a directory, not a file")
        if dir and cluster == 0:
            # '..' entries that point at the root use cluster 0.
            return self.rootdir
        output = self._loadFromCluster(cluster)
        if dir:
            return FatDir(output, cluster)
        return output[:entry.file_size]

    def getFile(self, filename):
        """Return the contents of filename ('/'-separated path)."""
        return self._getFile(filename)

    def putFile(self, filename, buffer):
        """Write buffer as filename, creating or replacing the file."""
        parts = filename.rpartition('/')
        parent = parts[0]
        filename = parts[2]
        dir = self._getFile(parent, self.rootdir, True)
        cluster = 0
        if filename in dir.files:
            # Reuse (and resize) the file's existing cluster chain.
            cluster = dir.files[filename].getCluster()
        size = len(buffer)
        num_clusters = self._getNumClusters(size)
        cluster = self.fat.allocateClusters(num_clusters, cluster)
        self._writeToCluster(cluster, buffer)
        if filename in dir.files:
            # This file has a directory entry already, so only update
            # the parent directory if our file size has changed.
            if dir.files[filename].file_size != size:
                dir.files[filename].file_size = size
                self._writeDir(parent, dir.pack())
        else:
            # New entry, so the parent directory needs to be updated.
            dir.addEntry(filename, cluster, size)
            self._writeDir(parent, dir.pack())

    def _getNumClusters(self, size):
        """Clusters needed to hold size bytes (ceiling division)."""
        clen = self.bootRecord.sectors_per_cluster * self.bytes_per_sector
        return (size + clen - 1) // clen

    def _writeDir(self, directory, buffer):
        """Persist a packed directory table back into the image."""
        if directory == '':
            # The root directory is only written here when it lives in
            # its own cluster chain (i.e. FAT32); on FAT12/16 it goes out
            # with writeMeta().
            cluster = self.bootRecord.getRootdirCluster()
            if cluster:
                num_clusters = self._getNumClusters(len(buffer))
                cluster = self.fat.allocateClusters(num_clusters, cluster)
                self._writeToCluster(cluster, buffer)
        else:
            # Not the root directory, so write it like a normal file.
            self.putFile(directory, buffer)

    def listFiles(self, dirname=None):
        """Filenames in dirname (the root directory when omitted)."""
        if dirname is None:
            dir = self.rootdir
        else:
            dir = self._getFile(dirname, dir=True)
        return dir.files.keys()

    def _getSectorSize(self, buffer=None):
        """Read bytes-per-sector straight from the BPB at offset 11."""
        if buffer is None:
            buffer = self.buffer
        buffer.seek(11)
        (size,) = struct.unpack("<H", buffer.read(2))
        return size

    def _readSectors(self, sector, count=1, num_bytes=None, buffer=None):
        """Read num_bytes (default: count whole sectors) at sector."""
        if buffer is None:
            buffer = self.buffer
        buffer.seek(self.bytes_per_sector * sector)
        if num_bytes is None:
            num_bytes = self.bytes_per_sector * count
        data = buffer.read(num_bytes)
        if len(data) < num_bytes:
            raise EOFError
        return data

    def _writeSectors(self, data, sector, count=1, num_bytes=None, buffer=None):
        """Write data (truncated to num_bytes) at sector."""
        if buffer is None:
            buffer = self.buffer
        buffer.seek(self.bytes_per_sector * sector)
        if num_bytes is None:
            num_bytes = self.bytes_per_sector * count
        buffer.write(data[:num_bytes])
if __name__ == '__main__':
    # Demo/driver: read fat.img, copy a file within it, write image.img.
    def usage():
        print("Usage: [-o outputfile] [-b bootsector] [-d directory]")

    bootsector = "boot.bin"
    directory = "floppy"
    outputfile = "floppy.dsk"
    import sys, getopt
    try:
        opts, args = getopt.getopt(sys.argv[1:],
                                   "ho:b:d:",
                                   ["help",
                                    # long name matches the -o branch below
                                    "outputfile=",
                                    "bootsector=",
                                    "directory="])
    except getopt.GetoptError as err:
        print(err)
        usage()
        sys.exit(2)
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-o", "--outputfile"):
            outputfile = a
        elif o in ("-b", "--bootsector"):
            bootsector = a
        elif o in ("-d", "--directory"):
            # was "-o" here too, which the outputfile branch shadowed
            directory = a
    from io import BytesIO
    with open("fat.img", "rb") as f:
        J = FatImage(f)
        print(J.listFiles('floppy'))
        test = J.getFile('floppy/onyx.bin')
        print(len(test), test[:100])
        # Cache every cluster so the image can be modified and written
        # out after the source file is closed.
        J.readClusters()
    J.putFile('floppy/another.bin', test)
    print(J.listFiles())
    print(J.listFiles('floppy'))
    print(J.listFiles('floppy/floppy'))
    other = J.getFile('floppy/another.bin')
    print('comparison', test == other)
    image = bytearray(512 * 2880)
    result = BytesIO(image)
    J.write(result)
    result = result.getvalue()
    print(len(result), result[:100])
    with open("image.img", "wb") as w: w.write(result)
    with open("one.bin", "wb") as w: w.write(test)
    with open("two.bin", "wb") as w: w.write(other)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment