Skip to content

Instantly share code, notes, and snippets.

@dineshdharme
Created June 4, 2021 01:57
Show Gist options
  • Save dineshdharme/c14355042bc5ba72ad24ef8d0a26f34c to your computer and use it in GitHub Desktop.
Save dineshdharme/c14355042bc5ba72ad24ef8d0a26f34c to your computer and use it in GitHub Desktop.
Parallel generate million rows of large number of numeric columns in python for classification and upload to S3
import numpy as np
from pathlib import Path
import boto3
from boto3.exceptions import S3UploadFailedError
from joblib import Parallel, delayed
import sys
# boto3 clients not thread-safe, but also not serializable, so making it global
s3_client = boto3.client('s3')
def generate_hypercube(samples, dimensions, rng):
"""Returns distinct binary samples of length dimensions
"""
if dimensions > 30:
return np.hstack([rng.randint(2, size=(samples, dimensions - 30)),
generate_hypercube(samples, 30, rng)])
out = rng.choice(2 ** dimensions, samples).astype(dtype='>u4',
copy=False)
out = np.unpackbits(out.view('>u1')).reshape((-1, 32))[:, -dimensions:]
return out
def randomly_generate_dataset_values(
generator, n_features, n_informative, n_redundant, n_repeated,
n_classes, n_clusters_per_class, weights, class_sep,
hypercube, shift, scale):
n_clusters = n_classes * n_clusters_per_class
if weights and len(weights) == (n_classes - 1):
weights = weights + [1.0 - sum(weights)]
if weights is None:
weights = [1.0 / n_classes] * n_classes
weights[-1] = 1.0 - sum(weights[:-1])
# Build the polytope whose vertices become cluster centroids
centroids = generate_hypercube(n_clusters, n_informative,
generator).astype(float, copy=False)
centroids *= 2 * class_sep
centroids -= class_sep
if not hypercube:
centroids *= generator.rand(n_clusters, 1)
centroids *= generator.rand(1, n_informative)
# Generate random covariances
cov_list = [2 * generator.rand(n_informative, n_informative) - 1
for _ in range(len(centroids))]
# Generate random redundancy
rand_redundant = 2 * generator.rand(n_informative, n_redundant) - 1
# Generate repeat features
rand_repeated = generator.rand(n_repeated)
# Random shift and scale
if shift is None:
shift = (2 * generator.rand(n_features) - 1) * class_sep
if scale is None:
scale = 1 + 100 * generator.rand(n_features)
return weights, centroids, cov_list, rand_redundant, rand_repeated, shift, scale
def make_batch(
generator, n_samples, n_features, n_informative, n_redundant,
n_repeated, n_classes, n_clusters_per_class, weights, flip_y,
centroids, cov_list, rand_redundant, rand_repeated, shift, scale):
n_clusters = n_classes * n_clusters_per_class
# Distribute samples among clusters by weight
n_samples_per_cluster = [
int(n_samples * weights[k % n_classes] / n_clusters_per_class)
for k in range(n_clusters)]
for i in range(n_samples - sum(n_samples_per_cluster)):
n_samples_per_cluster[i % n_clusters] += 1
n_useless = n_features - n_informative - n_redundant - n_repeated
# Initialize X and y
X = np.zeros((n_samples, n_features))
y = np.zeros(n_samples, dtype=np.int)
# Initially draw informative features from the standard normal
X[:, :n_informative] = generator.randn(n_samples, n_informative)
# Create each cluster; a variant of make_blobs
stop = 0
for k, centroid in enumerate(centroids):
start, stop = stop, stop + n_samples_per_cluster[k]
y[start:stop] = k % n_classes # assign labels
X_k = X[start:stop, :n_informative] # slice a view of the cluster
X_k[...] = np.dot(X_k, cov_list[k]) # introduce random covariance
X_k += centroid # shift the cluster to a vertex
# Create redundant features
if n_redundant > 0:
X[:, n_informative:n_informative + n_redundant] = \
np.dot(X[:, :n_informative], rand_redundant)
# Repeat some features
if n_repeated > 0:
n = n_informative + n_redundant
indices = ((n - 1) * rand_repeated + 0.5).astype(np.intp)
X[:, n:n + n_repeated] = X[:, indices]
# Fill useless features
if n_useless > 0:
X[:, -n_useless:] = generator.randn(n_samples, n_useless)
# Randomly replace labels
if flip_y >= 0.0:
flip_mask = generator.rand(n_samples) < flip_y
y[flip_mask] = generator.randint(n_classes, size=flip_mask.sum())
# Randomly shift and scale
X += shift
X *= scale
return X, y
def save_classification_dataset(
output_directory, s3_bucket, s3_prefix, num_batches, batch_samples, num_dirs,
n_features, n_classes=2, n_clusters_per_class=2, weights=None,
flip_y=0.01, class_sep=1.0, hypercube=True, shift=0.0, scale=1.0,
generator=None, num_threads=1):
"""Generate a random n-class classification problem.
This initially creates clusters of points normally distributed (std=1)
about vertices of an ``n_informative``-dimensional hypercube with sides of
length ``2*class_sep`` and assigns an equal number of clusters to each
class. It introduces interdependence between these features and adds
various types of further noise to the data.
Without shuffling, ``X`` horizontally stacks features in the following
order: the primary ``n_informative`` features, followed by ``n_redundant``
linear combinations of the informative features, followed by ``n_repeated``
duplicates, drawn randomly with replacement from the informative and
redundant features. The remaining features are filled with random noise.
Thus, without shuffling, all useful features are contained in the columns
``X[:, :n_informative + n_redundant + n_repeated]``.
Read more in the :ref:`User Guide <sample_generators>`.
Parameters
----------
output_directory : string or Path
The local directory to save to
s3_bucket : string or None
If not None, the S3 bucket to upload results to. output_directory is
only used for temporary storage in this case. Each batch deleted before
the next is created.
num_batches : int
The number of batches.
batch_samples : int
The number of samples per batch.
n_features : int
The total number of features. These comprise ``n_informative``
informative features, ``n_redundant`` redundant features,
``n_repeated`` duplicated features and
``n_features-n_informative-n_redundant-n_repeated`` useless features
drawn at random.
n_classes : int, optional (default=2)
The number of classes (or labels) of the classification problem.
n_clusters_per_class : int, optional (default=2)
The number of clusters per class.
weights : list of floats or None (default=None)
The proportions of samples assigned to each class. If None, then
classes are balanced. Note that if ``len(weights) == n_classes - 1``,
then the last class weight is automatically inferred.
More than ``n_samples`` samples may be returned if the sum of
``weights`` exceeds 1.
flip_y : float, optional (default=0.01)
The fraction of samples whose class are randomly exchanged. Larger
values introduce noise in the labels and make the classification
task harder.
class_sep : float, optional (default=1.0)
The factor multiplying the hypercube size. Larger values spread
out the clusters/classes and make the classification task easier.
hypercube : boolean, optional (default=True)
If True, the clusters are put on the vertices of a hypercube. If
False, the clusters are put on the vertices of a random polytope.
shift : float, array of shape [n_features] or None, optional (default=0.0)
Shift features by the specified value. If None, then features
are shifted by a random value drawn in [-class_sep, class_sep].
scale : float, array of shape [n_features] or None, optional (default=1.0)
Multiply features by the specified value. If None, then features
are scaled by a random value drawn in [1, 100]. Note that scaling
happens after shifting.
generator : RandomState instance or None (default)
Determines random number generation for dataset creation.
Returns
-------
X : array of shape [n_samples, n_features]
The generated samples.
y : array of shape [n_samples]
The integer labels for class membership of each sample.
Notes
-----
The algorithm is adapted from Guyon [1] and was designed to generate
the "Madelon" dataset.
References
----------
.. [1] I. Guyon, "Design of experiments for the NIPS 2003 variable
selection benchmark", 2003.
"""
assert (s3_bucket is None) == (s3_prefix is None), \
'Gave only one of S3 bucket and prefix'
batch_folder = '{problem}_{rows}_x_{cols}_fragmented'.format(
problem='binary' if n_classes == 2 else 'multiclass',
rows=batch_samples * num_batches, cols=n_features)
Path(output_directory, batch_folder).mkdir(parents=True, exist_ok=True)
num_dirs_list = ["dir_"+str(index) for index in sorted(list(range(num_dirs)) * (num_batches // num_dirs))]
print(num_dirs_list)
if generator is None:
generator = np.random.RandomState()
n_informative = int(round(n_features * 0.4))
n_redundant = int(round(n_features * 0.2))
n_repeated = int(round(n_features * 0.1))
n_useless = n_features - n_informative - n_redundant - n_repeated
columns = ['target'] + \
[f'informative_{i}' for i in range(1, n_informative + 1)] + \
[f'redundant_{i}' for i in range(1, n_redundant + 1)] + \
[f'repeated_{i}' for i in range(1, n_repeated + 1)] + \
[f'useless_{i}' for i in range(1, n_useless + 1)]
assert len(columns) == n_features + 1, f'Incorrect number of column names'
header = ','.join(columns)
# Count features, clusters and samples
if n_informative + n_redundant + n_repeated > n_features:
raise ValueError("Number of informative, redundant and repeated "
"features must sum to less than the number of total"
" features")
# Use log2 to avoid overflow errors
if n_informative < np.log2(n_classes * n_clusters_per_class):
raise ValueError("n_classes * n_clusters_per_class must"
" be smaller or equal 2 ** n_informative")
if weights and len(weights) not in [n_classes, n_classes - 1]:
raise ValueError("Weights specified but incompatible with number "
"of classes.")
data_info = randomly_generate_dataset_values(
generator, n_features, n_informative, n_redundant, n_repeated,
n_classes, n_clusters_per_class, weights, class_sep,
hypercube, shift, scale)
weights, centroids, cov_list, rand_redundant, rand_repeated, shift, scale = data_info
def makes_save_batch(batch_num_seed_tuple):
batch_num, (dir_name, batch_seed) = batch_num_seed_tuple
print("tuple", batch_num_seed_tuple)
batch_gen = np.random.RandomState(batch_seed)
X, y = make_batch(
batch_gen, batch_samples, n_features, n_informative, n_redundant,
n_repeated, n_classes, n_clusters_per_class, weights, flip_y,
centroids, cov_list, rand_redundant, rand_repeated, shift, scale)
data = np.concatenate([np.expand_dims(y, axis=1), X], axis=1)
base_path = f'batch_{batch_num}.csv'
dir_path = Path(output_directory, batch_folder, dir_name)
dir_path.mkdir(parents=True, exist_ok=True)
fn = Path(dir_path, base_path)
np.savetxt(fn, data, delimiter=',', header=header, comments='')
if s3_bucket:
key = f'{s3_prefix}/{batch_folder}/{dir_name}/{base_path}'
try:
s3_client.upload_file(str(fn), s3_bucket, key)
except S3UploadFailedError as e:
print(e)
fn.unlink()
seeds = generator.choice(np.iinfo(np.int32).max, num_batches, replace=True)
dir_with_seeds_list = list(zip(num_dirs_list, seeds))
par = Parallel(n_jobs=num_threads, prefer='threads')
par(delayed(makes_save_batch)(tup) for tup in enumerate(dir_with_seeds_list))
import numpy as np
from pathlib import Path
from batch_generator import save_classification_dataset
import time
import os
# Total number of rows = NUM_BATCHES * BATCH_SAMPLES
NUM_BATCHES = 60
NUM_DIRS = 3
BATCH_SAMPLES = 100
N_FEATURES = 40
N_CLASSES = 4
SEED = 12345
THREADS = None # Used for a c5n.18xlarge
OUTPUT_DIRECTORY_BASE = 'synthetic_datasets_run01'
UPLOAD_TO_S3 = False
S3_BUCKET = 'bucketname'
S3_PREFIX = 'data/test_data_run01'
def main():
directory_name = Path(OUTPUT_DIRECTORY_BASE.format(
problem='binary' if N_CLASSES == 2 else 'multiclass',
rows=BATCH_SAMPLES * NUM_BATCHES, cols=N_FEATURES))
directory_name.mkdir(parents=True, exist_ok=True)
if UPLOAD_TO_S3:
s3_bucket = S3_BUCKET
s3_prefix = S3_PREFIX
else:
s3_bucket = None
s3_prefix = None
generator = np.random.RandomState(SEED)
start = time.time()
save_classification_dataset(
directory_name, s3_bucket, s3_prefix, NUM_BATCHES, BATCH_SAMPLES, NUM_DIRS,
N_FEATURES, N_CLASSES, generator=generator, n_clusters_per_class=4,
weights=None, flip_y=0.03, class_sep=0.7, hypercube=True, shift=0.0,
scale=1.0, num_threads=os.cpu_count() if THREADS is None else THREADS)
dur = time.time() - start
print(f'Time Taken in seconds: {dur}')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment