@enijkamp · Created July 13, 2021 07:25
create_tf_records.py
import sys
import os
import gzip
import json
import io
import argparse
import concurrent.futures

from tokenizers import Tokenizer
import tensorflow as tf
from google.cloud import storage


def create_args(args=argparse.Namespace()):
    # static configuration (no command-line flags are parsed)
    args.data_bucket = 'sfr-tpu-prem2-bucket'
    args.data_bucket_path = 'enijkamp/bigquery_json'
    args.data_langs = ['c', 'cpp', 'cs', 'css', 'go', 'html', 'java', 'js', 'obj', 'perl', 'php', 'py', 'ruby']
    args.tokenizer_file = '/export/home/gptc/byte-level-bpe.tokenizer.json'
    args.run_num_processes = 96
    # dataset size:
    # 50,000 samples -> 500 MB
    # 2 TB / 500 MB * 50,000 samples = 2,000,000 MB / 500 MB * 50,000 samples = 200,000,000 total samples
    # 200,000,000 total samples / 13 languages ≈ 15,000,000 samples per language
    # 15,000,000 samples / 50,000 samples per file = 300 files per language
    args.out_gs = False
    args.out_bucket_path = '/export/home/gptc/bigcode_bpe'
    args.out_records_per_lang = 15_000_000
    args.out_min_len = 128
    args.out_max_len = 16_384
    return args


def process_files(args):
    # For each language: list the gzipped JSON shards in GCS, tokenize them
    # (optionally with a process pool), and write one TFRecord file per shard.

    def to_gs_out_file(f):
        return f'gs://{args.data_bucket}/{args.out_bucket_path}/{f[len(args.data_bucket_path)+1:]}'

    def to_local_out_file(f):
        path = f'{args.out_bucket_path}/{f[len(args.data_bucket_path)+1:]}'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    def process(lang, files_chunks, do_map):
        to_out_file = to_gs_out_file if args.out_gs else to_local_out_file
        total_num_records_for_lang = 0
        for files_chunk in files_chunks:
            # stop once enough records have been written for this language
            if total_num_records_for_lang > args.out_records_per_lang:
                break
            for f, (num_records, mean_len_record) in zip(files_chunk, do_map(process_file, [(args, f, to_out_file(f)) for f in files_chunk])):
                print(f, num_records, mean_len_record)
                print(f'lang={lang} file={f} mean_len_record={int(mean_len_record)} num_records={num_records} total_num_records_for_lang={total_num_records_for_lang}', flush=True)
                total_num_records_for_lang += num_records
        print(f'finished {lang} with {total_num_records_for_lang} total records', flush=True)

    def process_chunks(lang, files_chunks):
        if args.run_num_processes == 1:
            # sequential fallback for debugging
            def do_map(f, args_list):
                for f_args in args_list:
                    yield f(*f_args)
            process(lang, files_chunks, do_map)
        else:
            with concurrent.futures.ProcessPoolExecutor(args.run_num_processes) as executor:
                def map_with_zip_args(f, args_list):
                    for result in executor.map(f, *zip(*args_list)):
                        yield result
                process(lang, files_chunks, map_with_zip_args)

    def yield_chunks(lang):
        # list the shard names for one language and group them into chunks of run_num_processes files
        storage_client = storage.Client()
        files = []
        for blob in storage_client.list_blobs(args.data_bucket, prefix=f'{args.data_bucket_path}/{lang}/'):
            if '.json' in blob.name:
                files.append(blob.name)
        for files_chunk in partition(l=files, n=args.run_num_processes):
            yield files_chunk

    for lang in args.data_langs:
        print(f'processing {lang}', flush=True)
        process_chunks(lang, yield_chunks(lang))


def process_file(args, in_file, out_file):
    # Tokenizes one gzipped JSON shard and writes all records within the length
    # bounds to a TFRecord file; returns (num_records, mean_record_length).
    if args.out_gs:
        # note: GCS output is unfinished (mv/rm are not implemented)
        storage_client = storage.Client()
        target_blob = storage.Blob(bucket=storage_client.bucket(args.out_bucket_path), name=out_file)
        if target_blob.exists(storage_client):
            print(f'skipping {out_file}', flush=True)
            return 0, 0.

        def mv(p1, p2):
            raise NotImplementedError()

        def rm(f):
            raise NotImplementedError()
    else:
        if os.path.exists(out_file):
            print(f'skipping {out_file}', flush=True)
            return 0, 0.

        def mv(f1, f2):
            os.rename(f1, f2)

        def rm(f):
            if os.path.exists(f):
                os.remove(f)

    # write to a temporary file first, then rename once the shard is complete
    out_file_tmp = f'{out_file}.tmp'
    rm(out_file_tmp)

    print(f'processing {in_file} -> {out_file_tmp}', flush=True)

    tokenizer = create_tokenizer(args.tokenizer_file)
    data_iter = create_gs_data_iter(args.data_bucket, in_file)

    n, mean_len = 0, 0.
    with tf.io.TFRecordWriter(out_file_tmp) as writer:
        for sample in data_iter:
            record = tokenizer.encode(sample).ids
            if len(record) >= args.out_min_len and len(record) <= args.out_max_len:
                write_to_file(writer, record)
                n += 1
                # incremental running mean of record length
                mean_len += (len(record) - mean_len) / n
                if n % 10_000 == 0:
                    print(n, int(mean_len), in_file)

    print(f'finalizing {out_file_tmp} -> {out_file}', flush=True)
    mv(out_file_tmp, out_file)

    return n, mean_len


def create_tokenizer(file):
    return Tokenizer.from_file(file)


def create_gs_data_iter(data_bucket, data_file):
    # streams the 'content' field of each JSON line from a gzipped blob in GCS
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(data_bucket)
    blob = bucket.blob(data_file)
    data = io.BytesIO(blob.download_as_string())
    with gzip.open(data) as f:
        for line in f:
            j = json.loads(line)
            if 'content' in j:
                yield j['content']


def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def write_to_file(writer, data):
    # each sample is stored as a single int64 list feature named 'text'
    feature = {'text': _int64_feature(data)}
    tf_example = tf.train.Example(features=tf.train.Features(feature=feature))
    writer.write(tf_example.SerializeToString())


def partition(l, n):
    # split l into consecutive chunks of size n
    return [l[i:i + n] for i in range(0, len(l), n)]


def main():
    args = create_args()
    process_files(args)
    print('done.', flush=True)


if __name__ == '__main__':
    main()
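
For context on the record layout: the sketch below is not part of the original gist; it shows one way the resulting files could be read back with tf.data, assuming TF 2.x and the local output directory from create_args. The 'text' feature name matches what write_to_file emits, while the glob pattern and the helper names (parse_example, load_records) are placeholders.

# hypothetical reader sketch, not part of the gist: parses the variable-length
# int64 feature 'text' written by write_to_file above
import tensorflow as tf

def parse_example(serialized):
    # 'text' holds a variable number of token ids per record, hence VarLenFeature
    parsed = tf.io.parse_single_example(serialized, {'text': tf.io.VarLenFeature(tf.int64)})
    return tf.sparse.to_dense(parsed['text'])

def load_records(pattern):
    files = tf.data.Dataset.list_files(pattern)
    ds = tf.data.TFRecordDataset(files)
    return ds.map(parse_example, num_parallel_calls=tf.data.AUTOTUNE)

# e.g. inspect a few tokenized samples of one language (placeholder path):
# for tokens in load_records('/export/home/gptc/bigcode_bpe/py/*').take(3):
#     print(len(tokens), tokens[:10].numpy())

Since the writer stores raw token id lists of varying length (between out_min_len and out_max_len), any packing or truncation to a fixed context length would have to happen on the reader side.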