Generate millions of rows of classification data with a large number of numeric columns in parallel in Python, and upload the batches to S3
batch_generator.py
import numpy as np
from pathlib import Path
import boto3
from boto3.exceptions import S3UploadFailedError
from joblib import Parallel, delayed

# boto3 clients are thread-safe but not picklable, so share one
# module-level client across the worker threads.
s3_client = boto3.client('s3')
def generate_hypercube(samples, dimensions, rng):
    """Return binary samples of length ``dimensions``.

    Note: sampling uses ``rng.choice`` with replacement, so rows are not
    guaranteed to be distinct.
    """
    if dimensions > 30:
        return np.hstack([rng.randint(2, size=(samples, dimensions - 30)),
                          generate_hypercube(samples, 30, rng)])
    out = rng.choice(2 ** dimensions, samples).astype(dtype='>u4', copy=False)
    out = np.unpackbits(out.view('>u1')).reshape((-1, 32))[:, -dimensions:]
    return out
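
# Illustration of the unpackbits trick above (a sketch; exact rows depend on
# the RNG state): each sampled integer in [0, 2**dimensions) is viewed as
# four big-endian bytes, unpacked to 32 bits MSB-first, and the lowest
# `dimensions` bits become one vertex, e.g.
#   rng = np.random.RandomState(0)
#   generate_hypercube(4, 3, rng)   # -> shape (4, 3) array of 0/1 rows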
def randomly_generate_dataset_values(
        generator, n_features, n_informative, n_redundant, n_repeated,
        n_classes, n_clusters_per_class, weights, class_sep,
        hypercube, shift, scale):
    n_clusters = n_classes * n_clusters_per_class
    if weights and len(weights) == (n_classes - 1):
        weights = weights + [1.0 - sum(weights)]
    if weights is None:
        weights = [1.0 / n_classes] * n_classes
        weights[-1] = 1.0 - sum(weights[:-1])
    # Build the polytope whose vertices become cluster centroids
    centroids = generate_hypercube(n_clusters, n_informative,
                                   generator).astype(float, copy=False)
    centroids *= 2 * class_sep
    centroids -= class_sep
    if not hypercube:
        centroids *= generator.rand(n_clusters, 1)
        centroids *= generator.rand(1, n_informative)
    # Generate random covariances
    cov_list = [2 * generator.rand(n_informative, n_informative) - 1
                for _ in range(len(centroids))]
    # Generate random redundancy
    rand_redundant = 2 * generator.rand(n_informative, n_redundant) - 1
    # Generate repeat features
    rand_repeated = generator.rand(n_repeated)
    # Random shift and scale
    if shift is None:
        shift = (2 * generator.rand(n_features) - 1) * class_sep
    if scale is None:
        scale = 1 + 100 * generator.rand(n_features)
    return weights, centroids, cov_list, rand_redundant, rand_repeated, shift, scale
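
# Everything returned above is generated once in save_classification_dataset
# and shared by all batches, so every batch is drawn from the same underlying
# distribution; only the per-batch RNG seed differs.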
def make_batch(
        generator, n_samples, n_features, n_informative, n_redundant,
        n_repeated, n_classes, n_clusters_per_class, weights, flip_y,
        centroids, cov_list, rand_redundant, rand_repeated, shift, scale):
    n_clusters = n_classes * n_clusters_per_class
    # Distribute samples among clusters by weight
    n_samples_per_cluster = [
        int(n_samples * weights[k % n_classes] / n_clusters_per_class)
        for k in range(n_clusters)]
    for i in range(n_samples - sum(n_samples_per_cluster)):
        n_samples_per_cluster[i % n_clusters] += 1
    n_useless = n_features - n_informative - n_redundant - n_repeated
    # Initialize X and y
    X = np.zeros((n_samples, n_features))
    y = np.zeros(n_samples, dtype=int)
    # Initially draw informative features from the standard normal
    X[:, :n_informative] = generator.randn(n_samples, n_informative)
    # Create each cluster; a variant of make_blobs
    stop = 0
    for k, centroid in enumerate(centroids):
        start, stop = stop, stop + n_samples_per_cluster[k]
        y[start:stop] = k % n_classes  # assign labels
        X_k = X[start:stop, :n_informative]  # slice a view of the cluster
        X_k[...] = np.dot(X_k, cov_list[k])  # introduce random covariance
        X_k += centroid  # shift the cluster to a vertex
    # Create redundant features
    if n_redundant > 0:
        X[:, n_informative:n_informative + n_redundant] = \
            np.dot(X[:, :n_informative], rand_redundant)
    # Repeat some features
    if n_repeated > 0:
        n = n_informative + n_redundant
        indices = ((n - 1) * rand_repeated + 0.5).astype(np.intp)
        X[:, n:n + n_repeated] = X[:, indices]
    # Fill useless features
    if n_useless > 0:
        X[:, -n_useless:] = generator.randn(n_samples, n_useless)
    # Randomly replace labels
    if flip_y >= 0.0:
        flip_mask = generator.rand(n_samples) < flip_y
        y[flip_mask] = generator.randint(n_classes, size=flip_mask.sum())
    # Randomly shift and scale
    X += shift
    X *= scale
    return X, y
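
# Minimal standalone sketch of the two functions above (illustrative values,
# bypassing the disk/S3 plumbing in save_classification_dataset below):
#   gen = np.random.RandomState(0)
#   params = randomly_generate_dataset_values(
#       gen, n_features=10, n_informative=4, n_redundant=2, n_repeated=1,
#       n_classes=2, n_clusters_per_class=2, weights=None, class_sep=1.0,
#       hypercube=True, shift=0.0, scale=1.0)
#   weights, centroids, cov_list, rand_red, rand_rep, shift, scale = params
#   X, y = make_batch(gen, 100, 10, 4, 2, 1, 2, 2, weights, 0.01,
#                     centroids, cov_list, rand_red, rand_rep, shift, scale)
#   # X.shape == (100, 10), y.shape == (100,)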
def save_classification_dataset(
        output_directory, s3_bucket, s3_prefix, num_batches, batch_samples, num_dirs,
        n_features, n_classes=2, n_clusters_per_class=2, weights=None,
        flip_y=0.01, class_sep=1.0, hypercube=True, shift=0.0, scale=1.0,
        generator=None, num_threads=1):
    """Generate a random n-class classification problem and save it in batches.

    This initially creates clusters of points normally distributed (std=1)
    about vertices of an ``n_informative``-dimensional hypercube with sides of
    length ``2*class_sep`` and assigns an equal number of clusters to each
    class. It introduces interdependence between these features and adds
    various types of further noise to the data.

    ``X`` horizontally stacks features in the following order: the primary
    ``n_informative`` features, followed by ``n_redundant`` linear
    combinations of the informative features, followed by ``n_repeated``
    duplicates, drawn randomly with replacement from the informative and
    redundant features. The remaining features are filled with random noise.
    Thus, all useful features are contained in the columns
    ``X[:, :n_informative + n_redundant + n_repeated]``.

    Parameters
    ----------
    output_directory : string or Path
        The local directory to save to.
    s3_bucket : string or None
        If not None, the S3 bucket to upload results to. In this case
        ``output_directory`` is only used for temporary storage: each batch
        file is deleted after it has been uploaded.
    s3_prefix : string or None
        The key prefix under which batches are uploaded. Must be given
        together with ``s3_bucket``.
    num_batches : int
        The number of batches.
    batch_samples : int
        The number of samples per batch.
    num_dirs : int
        The number of subdirectories the batch files are spread across.
    n_features : int
        The total number of features. These comprise ``n_informative``
        informative features, ``n_redundant`` redundant features,
        ``n_repeated`` duplicated features and
        ``n_features-n_informative-n_redundant-n_repeated`` useless features
        drawn at random.
    n_classes : int, optional (default=2)
        The number of classes (or labels) of the classification problem.
    n_clusters_per_class : int, optional (default=2)
        The number of clusters per class.
    weights : list of floats or None (default=None)
        The proportions of samples assigned to each class. If None, then
        classes are balanced. Note that if ``len(weights) == n_classes - 1``,
        then the last class weight is automatically inferred.
    flip_y : float, optional (default=0.01)
        The fraction of samples whose class is randomly exchanged. Larger
        values introduce noise in the labels and make the classification
        task harder.
    class_sep : float, optional (default=1.0)
        The factor multiplying the hypercube size. Larger values spread
        out the clusters/classes and make the classification task easier.
    hypercube : boolean, optional (default=True)
        If True, the clusters are put on the vertices of a hypercube. If
        False, the clusters are put on the vertices of a random polytope.
    shift : float, array of shape [n_features] or None, optional (default=0.0)
        Shift features by the specified value. If None, then features
        are shifted by a random value drawn in [-class_sep, class_sep].
    scale : float, array of shape [n_features] or None, optional (default=1.0)
        Multiply features by the specified value. If None, then features
        are scaled by a random value drawn in [1, 100]. Note that scaling
        happens after shifting.
    generator : RandomState instance or None (default)
        Determines random number generation for dataset creation.
    num_threads : int, optional (default=1)
        The number of worker threads used to generate and upload batches.

    Returns
    -------
    None
        Each batch is written under ``output_directory`` as a CSV file whose
        first column is the integer class label ``target``, followed by the
        ``n_features`` feature columns. If ``s3_bucket`` is set, the files
        are uploaded to S3 and removed locally after upload.

    Notes
    -----
    The algorithm is adapted from Guyon [1] and was designed to generate
    the "Madelon" dataset.

    References
    ----------
    .. [1] I. Guyon, "Design of experiments for the NIPS 2003 variable
           selection benchmark", 2003.
    """
    assert (s3_bucket is None) == (s3_prefix is None), \
        'Gave only one of S3 bucket and prefix'
    batch_folder = '{problem}_{rows}_x_{cols}_fragmented'.format(
        problem='binary' if n_classes == 2 else 'multiclass',
        rows=batch_samples * num_batches, cols=n_features)
    Path(output_directory, batch_folder).mkdir(parents=True, exist_ok=True)
    # Assign an equal share of batches to each directory. This assumes
    # num_batches is divisible by num_dirs; otherwise the zip with the
    # seeds below silently drops the trailing batches.
    num_dirs_list = ["dir_" + str(index)
                     for index in sorted(list(range(num_dirs)) * (num_batches // num_dirs))]
    print(num_dirs_list)
    if generator is None:
        generator = np.random.RandomState()
    n_informative = int(round(n_features * 0.4))
    n_redundant = int(round(n_features * 0.2))
    n_repeated = int(round(n_features * 0.1))
    n_useless = n_features - n_informative - n_redundant - n_repeated
    columns = ['target'] + \
        [f'informative_{i}' for i in range(1, n_informative + 1)] + \
        [f'redundant_{i}' for i in range(1, n_redundant + 1)] + \
        [f'repeated_{i}' for i in range(1, n_repeated + 1)] + \
        [f'useless_{i}' for i in range(1, n_useless + 1)]
    assert len(columns) == n_features + 1, 'Incorrect number of column names'
    header = ','.join(columns)
    # Count features, clusters and samples
    if n_informative + n_redundant + n_repeated > n_features:
        raise ValueError("Number of informative, redundant and repeated "
                         "features must not exceed the total number of "
                         "features")
    # Use log2 to avoid overflow errors
    if n_informative < np.log2(n_classes * n_clusters_per_class):
        raise ValueError("n_classes * n_clusters_per_class must be at most "
                         "2 ** n_informative")
    if weights and len(weights) not in [n_classes, n_classes - 1]:
        raise ValueError("Weights specified but incompatible with number "
                         "of classes.")
    data_info = randomly_generate_dataset_values(
        generator, n_features, n_informative, n_redundant, n_repeated,
        n_classes, n_clusters_per_class, weights, class_sep,
        hypercube, shift, scale)
    weights, centroids, cov_list, rand_redundant, rand_repeated, shift, scale = data_info
    def make_save_batch(batch_num_seed_tuple):
        batch_num, (dir_name, batch_seed) = batch_num_seed_tuple
        print("tuple", batch_num_seed_tuple)
        # Each batch gets its own RNG so results are reproducible regardless
        # of the order in which threads run.
        batch_gen = np.random.RandomState(batch_seed)
        X, y = make_batch(
            batch_gen, batch_samples, n_features, n_informative, n_redundant,
            n_repeated, n_classes, n_clusters_per_class, weights, flip_y,
            centroids, cov_list, rand_redundant, rand_repeated, shift, scale)
        data = np.concatenate([np.expand_dims(y, axis=1), X], axis=1)
        base_path = f'batch_{batch_num}.csv'
        dir_path = Path(output_directory, batch_folder, dir_name)
        dir_path.mkdir(parents=True, exist_ok=True)
        fn = Path(dir_path, base_path)
        np.savetxt(fn, data, delimiter=',', header=header, comments='')
        if s3_bucket:
            key = f'{s3_prefix}/{batch_folder}/{dir_name}/{base_path}'
            try:
                s3_client.upload_file(str(fn), s3_bucket, key)
            except S3UploadFailedError as e:
                print(e)
            fn.unlink()

    seeds = generator.choice(np.iinfo(np.int32).max, num_batches, replace=True)
    dir_with_seeds_list = list(zip(num_dirs_list, seeds))
    # Threads (rather than processes) let the workers share the module-level
    # s3_client; NumPy releases the GIL for the heavy array work.
    par = Parallel(n_jobs=num_threads, prefer='threads')
    par(delayed(make_save_batch)(tup) for tup in enumerate(dir_with_seeds_list))
A driver script that imports batch_generator.py and generates a small example dataset:
import numpy as np
from pathlib import Path
from batch_generator import save_classification_dataset
import time
import os

# Total number of rows = NUM_BATCHES * BATCH_SAMPLES
NUM_BATCHES = 60
NUM_DIRS = 3
BATCH_SAMPLES = 100
N_FEATURES = 40
N_CLASSES = 4
SEED = 12345
THREADS = None  # None -> use os.cpu_count(); originally run on a c5n.18xlarge
OUTPUT_DIRECTORY_BASE = 'synthetic_datasets_run01'
UPLOAD_TO_S3 = False
S3_BUCKET = 'bucketname'
S3_PREFIX = 'data/test_data_run01'
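
# These demo values produce 60 * 100 = 6,000 rows. For the millions of rows
# in the gist title, scale NUM_BATCHES and/or BATCH_SAMPLES up, e.g.
# NUM_BATCHES=120 and BATCH_SAMPLES=10_000 for 1.2 million rows (keeping
# NUM_BATCHES divisible by NUM_DIRS).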
def main():
    directory_name = Path(OUTPUT_DIRECTORY_BASE)
    directory_name.mkdir(parents=True, exist_ok=True)
    if UPLOAD_TO_S3:
        s3_bucket = S3_BUCKET
        s3_prefix = S3_PREFIX
    else:
        s3_bucket = None
        s3_prefix = None
    generator = np.random.RandomState(SEED)
    start = time.time()
    save_classification_dataset(
        directory_name, s3_bucket, s3_prefix, NUM_BATCHES, BATCH_SAMPLES, NUM_DIRS,
        N_FEATURES, N_CLASSES, generator=generator, n_clusters_per_class=4,
        weights=None, flip_y=0.03, class_sep=0.7, hypercube=True, shift=0.0,
        scale=1.0, num_threads=os.cpu_count() if THREADS is None else THREADS)
    dur = time.time() - start
    print(f'Time taken in seconds: {dur}')


if __name__ == '__main__':
    main()
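
To sanity-check the output, here is a minimal read-back sketch. It assumes pandas is installed and the default configuration above (local run, no S3), under which the first batch lands at synthetic_datasets_run01/multiclass_6000_x_40_fragmented/dir_0/batch_0.csv:

import pandas as pd
from pathlib import Path

# batch_folder pattern: '{problem}_{rows}_x_{cols}_fragmented' with
# rows = NUM_BATCHES * BATCH_SAMPLES = 6000 and cols = N_FEATURES = 40
sample = Path('synthetic_datasets_run01',
              'multiclass_6000_x_40_fragmented', 'dir_0', 'batch_0.csv')
df = pd.read_csv(sample)
print(df.shape)  # expect (100, 41): 'target' plus 40 feature columns
print(df['target'].value_counts(normalize=True))  # rough class balance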