Skip to content

Instantly share code, notes, and snippets.

Created July 13, 2021 07:25
Show Gist options
  • Save enijkamp/fe62ec220c3cb8e6a5df098c9e470036 to your computer and use it in GitHub Desktop.
Save enijkamp/fe62ec220c3cb8e6a5df098c9e470036 to your computer and use it in GitHub Desktop.
import sys
import os
import gzip
import json
import io
import argparse
import concurrent.futures
from tokenizers import Tokenizer
import tensorflow as tf
from import storage
def create_args(args=None):
    """Populate and return the run configuration.

    Args:
        args: Optional namespace to fill in. When omitted, a fresh
            ``argparse.Namespace`` is created for this call, so separate
            calls never share (and mutate) the same object.

    Returns:
        The namespace, with the data source, tokenizer, parallelism and
        output settings assigned as attributes.
    """
    if args is None:
        args = argparse.Namespace()

    # input: bucket, path prefix inside the bucket, and per-language sub-folders
    args.data_bucket = 'sfr-tpu-prem2-bucket'
    args.data_bucket_path = 'enijkamp/bigquery_json'
    args.data_langs = ['c', 'cpp', 'cs', 'css', 'go', 'html', 'java', 'js', 'obj', 'perl', 'php', 'py', 'ruby']

    args.tokenizer_file = '/export/home/gptc/byte-level-bpe.tokenizer.json'

    args.run_num_processes = 96

    # dataset size
    # 50,000 samples -> 500MB
    # 2 TB / 500 MB * 50,000 samples = (2 * 1000000) MB / 500 MB * 50,000 samples = 200,000,000 total samples
    # 200,000,000 total samples / 13 languages = 15,000,000 samples per lang
    # 15,000,000 / 50,000 = 300 files per language
    args.out_gs = False
    args.out_bucket_path = '/export/home/gptc/bigcode_bpe'
    args.out_records_per_lang = 15_000_000

    # records whose token count falls outside [out_min_len, out_max_len] are dropped
    args.out_min_len = 128
    args.out_max_len = 16_384

    return args
def process_files(args):
    """Tokenize the input files of every configured language.

    For each language in ``args.data_langs`` the matching blobs are listed,
    grouped into chunks of ``args.run_num_processes`` files, and each chunk
    is handed to ``process_file`` (serially or through a process pool).

    Args:
        args: Run configuration as produced by ``create_args``.
    """

    def to_gs_out_file(f):
        # Mirror the input path (minus the input prefix) under the output prefix in the bucket.
        return f'gs://{args.data_bucket}/{args.out_bucket_path}/{f[len(args.data_bucket_path)+1:]}'

    def to_local_out_file(f):
        # Same mirroring on the local filesystem; the parent directory is created on demand.
        path = f'{args.out_bucket_path}/{f[len(args.data_bucket_path)+1:]}'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    def process(lang, files_chunks, do_map):
        to_out_file = to_gs_out_file if args.out_gs else to_local_out_file
        total_num_records_for_lang = 0
        for files_chunk in files_chunks:
            # The budget is checked once per chunk, so the final total may
            # exceed the limit by up to one chunk's worth of records.
            if total_num_records_for_lang > args.out_records_per_lang:
                break
            jobs = [(args, f, to_out_file(f)) for f in files_chunk]
            for f, (num_records, mean_len_record) in zip(files_chunk, do_map(process_file, jobs)):
                print(f, num_records, mean_len_record)
                print(f'lang={lang} file={f} mean_len_record={int(mean_len_record)} num_records={num_records} total_num_records_for_lang={total_num_records_for_lang}', flush=True)
                total_num_records_for_lang += num_records
        print(f'finished {lang} with {total_num_records_for_lang} total records', flush=True)

    def process_chunks(lang, files_chunks):
        if args.run_num_processes == 1:
            # Serial path: call the worker in-process, one argument tuple at a time.
            def do_map(f, args_list):
                for call_args in args_list:
                    yield f(*call_args)
            process(lang, files_chunks, do_map)
        else:
            with concurrent.futures.ProcessPoolExecutor(args.run_num_processes) as executor:
                def map_with_zip_args(f, args_list):
                    # executor.map takes one iterable per positional parameter,
                    # so the list of argument tuples is transposed first.
                    for result in executor.map(f, *zip(*args_list)):
                        yield result
                process(lang, files_chunks, map_with_zip_args)

    def yield_chunks(lang):
        storage_client = storage.Client()
        files = []
        for blob in storage_client.list_blobs(args.data_bucket, prefix=f'{args.data_bucket_path}/{lang}/'):
            if '.json' in blob.name:
                files.append(blob.name)
        for files_chunk in partition(l=files, n=args.run_num_processes):
            yield files_chunk

    for lang in args.data_langs:
        print(f'processing {lang}', flush=True)
        process_chunks(lang, yield_chunks(lang))
def process_file(args, in_file, out_file):
    """Tokenize one input file into a TFRecord file.

    Samples are read from ``in_file`` in ``args.data_bucket``, encoded with
    the tokenizer at ``args.tokenizer_file``, and every record whose length
    lies within ``[args.out_min_len, args.out_max_len]`` is written. Output
    goes to ``<out_file>.tmp`` first and is renamed to ``out_file`` once the
    whole input has been consumed.

    Args:
        args: Run configuration as produced by ``create_args``.
        in_file: Blob name of the input file inside ``args.data_bucket``.
        out_file: Destination path of the finished TFRecord file.

    Returns:
        ``(n, mean_len)``: the number of records written and their mean
        token count, or ``(0, 0.)`` when ``out_file`` already exists.

    Raises:
        NotImplementedError: When ``args.out_gs`` is set, because moving the
            temporary file is only implemented for local output.
    """
    if args.out_gs:
        storage_client = storage.Client()
        target_blob = storage.Blob(bucket=storage_client.bucket(args.out_bucket_path), name=out_file)
        if target_blob.exists(storage_client):
            print(f'skipping {out_file}', flush=True)
            return 0, 0.

        def mv(p1, p2):
            raise NotImplementedError()

        def rm(f):
            raise NotImplementedError()
    else:
        if os.path.exists(out_file):
            print(f'skipping {out_file}', flush=True)
            return 0, 0.

        def mv(f1, f2):
            os.rename(f1, f2)

        def rm(f):
            # NOTE(review): rm is defined for both backends but not called
            # anywhere in this function.
            if os.path.exists(f):
                os.remove(f)

    out_file_tmp = f'{out_file}.tmp'
    print(f'processing {in_file} -> {out_file_tmp}', flush=True)

    tokenizer = create_tokenizer(args.tokenizer_file)
    data_iter = create_gs_data_iter(args.data_bucket, in_file)

    n, mean_len = 0, 0.
    with tf.io.TFRecordWriter(out_file_tmp) as writer:
        for sample in data_iter:
            record = tokenizer.encode(sample).ids
            length = len(record)
            if args.out_min_len <= length <= args.out_max_len:
                write_to_file(writer, record)
                n += 1
                # incremental mean: avoids keeping a running sum of all lengths
                mean_len += (length - mean_len) / n
                if n % 10_000 == 0:
                    print(n, int(mean_len), in_file)

    # Rename only after the writer is closed so a partial file never appears under out_file.
    print(f'finalizing {out_file_tmp} -> {out_file}', flush=True)
    mv(out_file_tmp, out_file)
    return n, mean_len
def create_tokenizer(file):
    """Load and return the ``Tokenizer`` stored in the file at ``file``."""
    tokenizer = Tokenizer.from_file(file)
    return tokenizer
def create_gs_data_iter(data_bucket, data_file):
    """Yield the ``content`` field of every JSON line in a gzipped blob.

    The blob is downloaded fully into memory, decompressed as a gzip
    stream, and parsed one JSON document per line.

    Args:
        data_bucket: Name of the bucket that holds the blob.
        data_file: Name of the blob inside ``data_bucket``.

    Yields:
        ``j['content']`` for each parsed line ``j`` that has a ``content``
        key; lines without that key are skipped.
    """
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(data_bucket)
    blob = bucket.blob(data_file)
    data = io.BytesIO(blob.download_as_string())
    # gzip.open accepts an already-open binary file object as well as a path.
    with gzip.open(data) as f:
        for line in f:
            j = json.loads(line)
            if 'content' in j:
                yield j['content']
def _int64_feature(value):
    """Return a ``tf.train.Feature`` whose ``int64_list`` is built from ``value``."""
    int64_list = tf.train.Int64List(value=value)
    return tf.train.Feature(int64_list=int64_list)
def write_to_file(writer, data):
    """Serialize ``data`` as a ``tf.train.Example`` and write it to ``writer``.

    Args:
        writer: Record writer exposing ``write(bytes)``.
        data: Integer values stored under the ``'text'`` feature.
    """
    feature = {'text': _int64_feature(data)}
    tf_example = tf.train.Example(features=tf.train.Features(feature=feature))
    # Without this call the example is built and then discarded.
    writer.write(tf_example.SerializeToString())
def partition(l, n):
    """Split ``l`` into consecutive slices of at most ``n`` items.

    The last slice holds the remainder and may be shorter; an empty ``l``
    produces an empty list.
    """
    chunks = []
    for start in range(0, len(l), n):
        stop = start + n
        chunks.append(l[start:stop])
    return chunks
def main():
    """Build the run configuration and process every configured language."""
    args = create_args()
    process_files(args)
    print('done.', flush=True)
# Run the pipeline only when executed as a script, not when imported
# (for example by worker processes that re-import this module).
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment