Skip to content

Instantly share code, notes, and snippets.

@shoghicp
Last active February 7, 2023 04:51
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save shoghicp/6ce05806ffc805929667ec2d4c62aba2 to your computer and use it in GitHub Desktop.
Save shoghicp/6ce05806ffc805929667ec2d4c62aba2 to your computer and use it in GitHub Desktop.
Dockerfile to extract zstd-compressed WARC files (.warc.zst) from the Parler dump
FROM python:3.7-buster
# Usage:
# 1. Download repo to folder (Download ZIP, or git clone)
# 2. Build via: $ docker build . -t warc_zstd
# 3. Point to folder of WARCs: $ docker run --volume ~/warcs/:/data warc_zstd
# 4. This will extract any unextracted .warc.zst
# From https://hastebin.com/raw/werapevufe

# Install the zstd command-line tool used by entrypoint.sh.
# apt-get is used instead of apt because apt's CLI is not meant for scripts;
# recommended packages are skipped and the package lists are removed in the
# same layer so they do not end up in the image.
RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
    DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends zstd && \
    rm -rf /var/lib/apt/lists/*

# Python bindings imported by xtract.py (pinned version).
RUN python -m pip install --no-cache-dir --upgrade pip && \
    python -m pip install --no-cache-dir zstandard==0.10.2

# add xtract.py from source
COPY xtract.py /usr/bin/
RUN chmod +x /usr/bin/xtract.py

# add entrypoint.sh, loops through all /data/*.warc.zst
COPY entrypoint.sh /
RUN chmod +x /entrypoint.sh

# The WARC folder is expected to be mounted here (see step 3 above).
WORKDIR /data
ENTRYPOINT ["/entrypoint.sh"]
#!/bin/bash
# Entry point: decompress every *.warc.zst in the current directory that has
# not been extracted yet, using the dictionary that xtract.py dumps from it.

# Without nullglob, a directory with no matches would run the loop once with
# the literal pattern "*.warc.zst" as the file name.
shopt -s nullglob

ls -lah
for f in *.warc.zst; do
    # Strip the last extension (".zst") to get the target .warc name.
    WARC_BNAME="${f%.*}"
    # If .warc doesn't exist already
    if [ ! -f "${WARC_BNAME}" ]; then
        # Write the embedded dictionary next to the archive, then decompress
        # the archive with that dictionary.
        python3 /usr/bin/xtract.py "${f}" > "${WARC_BNAME}.dict"
        zstd -d "${f}" -D "${WARC_BNAME}.dict"
    fi
done
#!/usr/bin/python3
# Copied-and-pasted parts of zstdstream.py from https://github.com/internetarchive/CDX-Writer (AGPL)
import sys
import zstandard as zstd
from _zstd_cffi import ffi, lib
# Open the file named by the first command-line argument in binary mode;
# entrypoint.sh passes one *.warc.zst path per invocation.
f = open(sys.argv[1], "rb")
class FrameParameters:
    """Plain-Python copy of selected fields of a zstd frame header.

    Args:
        fparams: Object exposing ``frameContentSize``, ``windowSize``,
            ``dictID`` and ``checksumFlag`` attributes.
    """

    def __init__(self, fparams):
        self.content_size = fparams.frameContentSize
        self.window_size = fparams.windowSize
        self.dict_id = fparams.dictID
        # Normalize the flag to a real bool.
        self.has_checksum = bool(fparams.checksumFlag)

    def __repr__(self):
        return (
            '%s(content_size=%r, window_size=%r, dict_id=%r, has_checksum=%r)'
            % (type(self).__name__, self.content_size, self.window_size,
               self.dict_id, self.has_checksum)
        )
class FrameParametersEx(FrameParameters):
    """``FrameParameters`` extended with the frame type.

    Args:
        fparams: Object exposing the attributes read by ``FrameParameters``
            plus ``frameType``.
    """

    def __init__(self, fparams):
        super().__init__(fparams)
        self.frame_type = fparams.frameType
        # headerSize is always 0 for skippable frames, so it is not copied.
def _get_frame_parameters(data):
    """Parse the zstd frame header found at the start of *data*.

    Args:
        data: Bytes-like object holding the beginning of a frame.

    Returns:
        A ``FrameParametersEx`` built from the decoded ``ZSTD_frameHeader``.

    Raises:
        zstd.ZstdError: If libzstd reports an error, or if *data* is too
            short to hold the complete frame header.
    """
    params = ffi.new('ZSTD_frameHeader *')
    data_buffer = ffi.from_buffer(data)
    zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer))
    if lib.ZSTD_isError(zresult):
        # Resolve the error code to its name here; this file defines no
        # helper for it.
        error_name = ffi.string(lib.ZSTD_getErrorName(zresult)).decode('utf-8')
        raise zstd.ZstdError('cannot get frame parameters: %s' % error_name)
    if zresult:
        # A non-zero, non-error result is the number of bytes needed.
        raise zstd.ZstdError(
            'not enough data for frame parameters; need %d bytes' % zresult)
    return FrameParametersEx(params[0])
def get_zstd_dictionary(fobj):
    """Return the zstd dictionary stored in the first skippable frame.

    Args:
        fobj: Seekable binary file object positioned at the start of the
            stream. ``peek()`` is used when available; otherwise the header
            bytes are read and the position is restored.

    Returns:
        The dictionary bytes (beginning with ``37 A4 30 EC``), or ``b''``
        when the stream does not start with a usable dictionary frame.

    Raises:
        EOFError: If the frame announces more dictionary bytes than the
            file contains.
    """
    # method 1: the first skippable frame
    # frame header is 2 to 14 bytes.
    if hasattr(fobj, 'peek'):
        data = fobj.peek(4 + 14)
    else:
        data = fobj.read(4 + 14)
        fobj.seek(-len(data), 1)
    try:
        frame_params = _get_frame_parameters(data)
        # A dictionary frame must be a skippable frame (frame_type == 1)
        # and must not itself reference a dictionary.
        if frame_params.frame_type != 1 or frame_params.dict_id != 0:
            return b''
        content_size = frame_params.content_size
        if content_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
            return b''
        # getFrameHeader() does not set headerSize. Assume fixed length 8
        fobj.seek(8, 1)
        zdict = fobj.read(content_size)
        if len(zdict) != content_size:
            # Explicit check instead of assert: asserts vanish under -O and
            # a truncated dictionary must never be returned as valid.
            raise EOFError(
                'truncated dictionary frame: expected %d bytes, got %d'
                % (content_size, len(zdict)))
        if frame_params.has_checksum:
            fobj.seek(4, 1)
        if zdict[:4] == b'\x28\xb5\x2f\xfd':
            # zstd compressed
            zdict = zstd.ZstdDecompressor().decompress(zdict)
        if zdict[:4] == b'\x37\xa4\x30\xec':
            return zdict
        return b''
    except zstd.ZstdError:
        # Header or payload could not be decoded: treat as "no dictionary".
        return b''
# Extract the dictionary, closing the input file even if extraction fails,
# then write the raw dictionary bytes to stdout (nothing is written when no
# dictionary was found).
try:
    zdict = get_zstd_dictionary(f)
finally:
    f.close()
sys.stdout.buffer.write(zdict)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment