Skip to content

Instantly share code, notes, and snippets.

@danizen
Created July 5, 2022 21:40
Show Gist options
  • Save danizen/e4ca984fa3fead2ca1da491a0347290d to your computer and use it in GitHub Desktop.
Index a MARC file so you can read only the parts you need
import io
import logging
import mmap
import os
import time
from collections.abc import Sequence
from struct import Struct, pack, unpack
LOG = logging.getLogger(__name__)
def parse_offset_id(f):
    """
    Bypass pymarc and parse raw MARC to determine the offset and control_id
    of each record.

    :param f: binary file object positioned at the start of a MARC record
    :return: ``(offset, control_id)`` tuple; ``control_id`` is ``None`` at
        clean end of file
    :raises ValueError: if the record is truncated or its first field is
        not the 001 control number field
    """
    offset = f.tell()
    ldr = f.read(24)
    if not ldr:
        # clean end of file: no more records
        return offset, None
    if len(ldr) < 24:
        raise ValueError('truncated leader at offset %d' % offset)
    # leader bytes 0-4 hold the total record length (including the
    # 24-byte leader); resid is the remaining length of the record
    resid = int(ldr[:5]) - 24
    dir_plus_data = f.read(resid)
    if len(dir_plus_data) != resid:
        # a leader with a short (or missing) body is corruption, not EOF
        raise ValueError('truncated record at offset %d' % offset)
    # leader bytes 12-16 hold the base address of data
    base_of_data = int(ldr[12:17]) - 24
    directory = dir_plus_data[:base_of_data]
    data = dir_plus_data[base_of_data:]
    # the first directory entry must describe the 001 controlfield
    if directory[:3] != b'001':
        raise ValueError('first field is not 001 at offset %d' % offset)
    # directory bytes 3-6 give the field length, which counts the field
    # terminator; subtract 1 to drop it
    field_len = int(directory[3:7]) - 1
    control_id = data[:field_len]
    return offset, control_id
def yield_offset_id(path, buffering=io.DEFAULT_BUFFER_SIZE):
    """
    Generate ``(offset, control_id)`` pairs for every record in a MARC file.

    :param path: path to the MARC file
    :param buffering: buffer size passed to :func:`open`
    """
    with open(path, 'rb', buffering=buffering) as marc_file:
        pair = parse_offset_id(marc_file)
        # a control_id of None marks clean end of file
        while pair[1] is not None:
            yield pair
            pair = parse_offset_id(marc_file)
class MARCIndex(Sequence):
    """
    Read-only, memory-mapped, sorted index over a MARC file, so individual
    records can be located by control number without reading the whole file.

    Each index entry is a fixed 32-byte struct: a length-prefixed
    ("pascal") control_id of up to 23 bytes followed by a native
    unsigned 64-bit file offset.
    """

    # pre-compiled entry layout: 24-byte pascal string + uint64 offset
    _ENTRY = Struct('24pQ')

    def __init__(self, path, index_path=None, buffering=io.DEFAULT_BUFFER_SIZE):
        """
        :param path: path to the MARC file
        :param index_path: path of the index file; defaults to *path* with
            its ``.mrc`` suffix replaced by ``.idx``
        :param buffering: buffer size used when scanning the MARC file
        """
        self.path = path
        if index_path is None:
            if path.endswith('.mrc'):
                index_path = path[:-4] + '.idx'
            else:
                index_path = path + '.idx'
        self.index_path = index_path
        self.buffering = buffering
        try:
            self.open()
        except FileNotFoundError:
            # no index built yet; build() will create it
            self.__mbuf = None
            self.__size = 0

    def __len__(self):
        # number of fixed-size entries in the index
        return self.__size

    def __del__(self):
        # getattr: __del__ can run even when __init__ raised before the
        # attribute was set (name-mangled form needed outside a method body
        # that uses the double-underscore directly)
        mbuf = getattr(self, '_MARCIndex__mbuf', None)
        if mbuf is not None:
            mbuf.close()

    def __getitem__(self, key):
        """
        Return the ``(control_id, offset)`` entry at position *key*.

        Supports negative indices and slices; raises IndexError when out of
        range so the Sequence mixin methods (iteration, ``in``) work.
        """
        if isinstance(key, slice):
            return [self[ii] for ii in range(*key.indices(len(self)))]
        pos = int(key)
        if pos < 0:
            pos += self.__size
        if not 0 <= pos < self.__size:
            raise IndexError('MARCIndex index out of range')
        entry_size = self._ENTRY.size
        off = pos * entry_size
        return self._ENTRY.unpack(self.__mbuf[off:off + entry_size])

    def __setitem__(self, key, value):
        raise TypeError('this object is immutable')

    def __delitem__(self, key):
        raise TypeError('this object is immutable')

    def open(self):
        """
        Memory-map the index file.

        :raises FileNotFoundError: if the index file does not exist
        """
        # "with" guarantees the descriptor is closed even if mmap raises
        with open(self.index_path, 'rb') as f:
            index_size = os.fstat(f.fileno()).st_size
            if index_size == 0:
                # mmap cannot map an empty file; treat as an empty index
                self.__mbuf = None
                self.__size = 0
            else:
                self.__mbuf = mmap.mmap(f.fileno(), index_size,
                                        access=mmap.ACCESS_READ)
                self.__size = index_size // self._ENTRY.size

    def close(self):
        """Release the memory map; the index is empty until reopened."""
        if self.__mbuf is not None:
            self.__mbuf.close()
            self.__mbuf = None
        # keep len() consistent with the entries actually accessible
        self.__size = 0

    def build(self):
        """
        Builds the index path based on the MARC Path.
        Re-opens the index when completed
        :return: None
        :raises ValueError: if a control_id exceeds 23 bytes (it would be
            silently truncated by struct 'p', corrupting lookups)
        """
        stime = time.perf_counter()
        records = [
            (control_id, offset)
            for offset, control_id in yield_offset_id(self.path, self.buffering)
        ]
        duration = time.perf_counter() - stime
        LOG.info('Read %d records in %.1f seconds', len(records), duration)
        stime = time.perf_counter()
        # sort by control_id so lookup() can binary-search
        records.sort(key=lambda pair: pair[0])
        duration = time.perf_counter() - stime
        LOG.info('Sorted %d records in %.1f seconds', len(records), duration)
        # each entry is an up-to-23-byte pascal control_id + an 8 byte integer
        stime = time.perf_counter()
        with open(self.index_path, 'wb') as f:
            for control_id, offset in records:
                if len(control_id) > 23:
                    raise ValueError('control_id too long: %r' % control_id)
                f.write(self._ENTRY.pack(control_id, offset))
        duration = time.perf_counter() - stime
        LOG.info('Wrote %d records in %.1f seconds', len(records), duration)
        # drop any previously-mapped index before mapping the fresh one
        self.close()
        self.open()

    def lookup(self, control_id):
        """
        Binary search for the position of the control_id over the index

        :param control_id: byte or str representation of the control_id
        :return: position in the index, or None if not found
        """
        # convert control_id into bytes for comparison with stored entries
        if isinstance(control_id, str):
            control_id = control_id.encode('ascii')
        low = 0
        high = len(self) - 1
        while low <= high:
            mid = (low + high) // 2
            middle_id, _ = self[mid]
            if middle_id < control_id:
                low = mid + 1
            elif middle_id > control_id:
                high = mid - 1
            else:
                # not < and not > means equal (removed unreachable branch)
                return mid
        return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment