#!/anaconda/bin/python3
import os
import sys
import array
import joblib
import numbers
import numpy as np
import scipy.sparse as sp
from os.path import join
from sys import getsizeof
from itertools import chain
from itertools import islice
from collections import Counter
from scipy.sparse import issparse
from collections import defaultdict
from sklearn.utils.fixes import frombuffer_empty
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.hashing import FeatureHasher
# private helper used by the FeatureHasher transforms below (dict input type)
from sklearn.feature_extraction.hashing import _iteritems
from sklearn.feature_extraction import _hashing
from sklearn.externals import six
from sklearn.utils.fixes import bincount
from sklearn.preprocessing import normalize

mega = 1000000.0  # bytes per (decimal) megabyte


def csr_vappend(X, Y):
    # Append the rows of Y below X by concatenating the raw CSR buffers
    # (data, indices, indptr) and patching the shape; mutates and returns X.
    X.data = np.hstack((X.data, Y.data))
    X.indices = np.hstack((X.indices, Y.indices))
    X.indptr = np.hstack((X.indptr, (Y.indptr + X.nnz)[1:]))
    X._shape = (X.shape[0] + Y.shape[0], Y.shape[1])
    return X
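

# The sketch below is not part of the original gist; it is a minimal sanity
# check, assuming only the numpy/scipy imports above, that csr_vappend yields
# the same matrix as scipy.sparse.vstack. Call it manually if wanted.
def _check_csr_vappend():
    A = sp.csr_matrix(np.array([[1, 0, 2], [0, 0, 3]]))
    B = sp.csr_matrix(np.array([[4, 5, 0], [0, 6, 0]]))
    stacked = csr_vappend(A.copy(), B)      # mutates its first argument
    reference = sp.vstack([A, B]).tocsr()   # scipy's own row stacking
    assert (stacked != reference).nnz == 0  # no differing entries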


def array_size(arr, fac=mega):
    # Return the size of a (sparse or dense) array in megabytes, i.e. nbytes / fac.
    if issparse(arr):
        return (arr.data.nbytes + arr.indptr.nbytes + arr.indices.nbytes) / fac
    else:
        return arr.nbytes / fac


def _make_int_array():
    """Construct an array.array of a type suitable for scipy.sparse indices."""
    return array.array(str("i"))


def _document_frequency(X):
    """Count the number of non-zero values for each feature in sparse X."""
    if sp.isspmatrix_csr(X):
        return bincount(X.indices, minlength=X.shape[1])
    else:
        return np.diff(sp.csc_matrix(X, copy=False).indptr)


class DebugCountVectorizer(CountVectorizer):
    # CountVectorizer whose _count_vocab prints the size (in MB) of every
    # intermediate buffer, to show where the memory goes.
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__

        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            for feature in analyze(doc):
                try:
                    j_indices.append(vocabulary[feature])
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            indptr.append(len(j_indices))

        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")

        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = np.ones(len(j_indices))

        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))

        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        print('X before sum duplicates size: {}'.format(array_size(X)))
        X.sum_duplicates()
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X


class DebugFeatureHasher(FeatureHasher):
    # FeatureHasher whose transform prints the size (in MB) of the hashed
    # index/value buffers and of the resulting CSR matrix.
    def transform(self, raw_X, y=None):
        """Transform a sequence of instances to a scipy.sparse matrix.

        Parameters
        ----------
        raw_X : iterable over iterable over raw features, length = n_samples
            Samples. Each sample must be an iterable (e.g., a list or tuple)
            containing/generating feature names (and optionally values, see
            the input_type constructor argument) which will be hashed.
            raw_X need not support the len function, so it can be the result
            of a generator; n_samples is determined on the fly.
        y : (ignored)

        Returns
        -------
        X : scipy.sparse matrix, shape = (n_samples, self.n_features)
            Feature matrix, for use with estimators or further transformers.
        """
        raw_X = iter(raw_X)
        if self.input_type == "dict":
            raw_X = (_iteritems(d) for d in raw_X)
        elif self.input_type == "string":
            raw_X = (((f, 1) for f in x) for x in raw_X)

        indices, indptr, values = \
            _hashing.transform(raw_X, self.n_features, self.dtype)
        n_samples = indptr.shape[0] - 1
        if n_samples == 0:
            raise ValueError("Cannot vectorize empty sequence.")

        print('indices size: {}'.format(array_size(indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))

        X = sp.csr_matrix((values, indices, indptr), dtype=self.dtype,
                          shape=(n_samples, self.n_features))
        print('X before sum duplicates size: {}'.format(array_size(X)))
        X.sum_duplicates()  # also sorts the indices
        print('final X size: {}'.format(array_size(X)))
        if self.non_negative:
            np.abs(X.data, X.data)
        return X


class DebugHashingVectorizer(HashingVectorizer):
    # HashingVectorizer wired up to the DebugFeatureHasher above.
    def _get_hasher(self):
        return DebugFeatureHasher(
            n_features=self.n_features, input_type='string',
            dtype=self.dtype, non_negative=self.non_negative)

    def transform(self, X, y=None):
        analyzer = self.build_analyzer()
        X = self._get_hasher().transform(analyzer(doc) for doc in X)
        if self.binary:
            X.data.fill(1)
        if self.norm is not None:
            #X = X.astype(np.double)
            print('X dtype: {}'.format(X.dtype))
            X = normalize(X, norm=self.norm, copy=False)
        return X

    # Alias transform to fit_transform for convenience
    fit_transform = transform
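

# Hedged usage sketch (not in the original script): feed a tiny made-up corpus
# through the two Debug* vectorizers above to see the per-buffer size prints.
# The corpus and the n_features value are illustrative assumptions only.
def _demo_debug_vectorizers():
    toy_corpus = ['free cash now', 'meeting at noon', 'free meeting now']
    count_X = DebugCountVectorizer().fit_transform(toy_corpus)
    hash_X = DebugHashingVectorizer(n_features=2 ** 10).fit_transform(toy_corpus)
    return count_X, hash_X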


class LeanFeatureHasher(FeatureHasher):
    # FeatureHasher that hashes the input in chunks of 10000 documents and
    # appends each chunk's CSR matrix with csr_vappend, so only one chunk of
    # index/value buffers has to be held in memory at a time.
    def in_chunks(self, iterable, size):
        it = iter(iterable)
        while True:
            chunk = tuple(islice(it, size))
            if not chunk:
                return
            yield chunk

    def transform(self, raw_X, y=None):
        X = sp.csr_matrix((0, 0), dtype=self.dtype)
        for raw_X_chunk in self.in_chunks(raw_X, 10000):
            raw_X_chunk = iter(raw_X_chunk)
            if self.input_type == "dict":
                raw_X_chunk = (_iteritems(d) for d in raw_X_chunk)
            elif self.input_type == "string":
                raw_X_chunk = (((f, 1) for f in x) for x in raw_X_chunk)

            indices, indptr, values = \
                _hashing.transform(raw_X_chunk, self.n_features, self.dtype)
            n_samples = indptr.shape[0] - 1
            Y = sp.csr_matrix((values, indices, indptr), dtype=self.dtype,
                              shape=(n_samples, self.n_features))
            print('part Y size: {}'.format(array_size(Y)))
            Y.sum_duplicates()  # also sorts the indices
            print('part Y size (sum duplicates): {}'.format(array_size(Y)))
            X = csr_vappend(X, Y)
            print('part X size: {}'.format(array_size(X)))

        if self.non_negative:
            np.abs(X.data, X.data)
        n_samples = X.indptr.shape[0] - 1
        if n_samples == 0:
            raise ValueError("Cannot vectorize empty sequence.")
        print('final X size: {}'.format(array_size(X)))
        return X
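

# Tiny illustration (not part of the original gist) of the chunking helper:
# transform() consumes documents 10000 at a time, so only one chunk's hash
# buffers are alive at any point. Values below are made up for the example.
def _demo_in_chunks():
    chunks = list(LeanFeatureHasher().in_chunks(range(7), 3))
    assert chunks == [(0, 1, 2), (3, 4, 5), (6,)]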


class LeanHashingVectorizer(HashingVectorizer):
    def _get_hasher(self):
        return LeanFeatureHasher(
            n_features=self.n_features, input_type='string',
            dtype=self.dtype, non_negative=self.non_negative)


class LeanCountVectorizer(CountVectorizer):
    # CountVectorizer that builds the document-term matrix in chunks of 10000
    # documents (appended with csr_vappend) instead of holding the index
    # arrays for the whole corpus at once.
    def _get_sparse_matrix_from_indices(self, j_indices, indptr, vocabulary):
        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = np.ones(len(j_indices))
        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        X.sum_duplicates()
        return X

    def _get_empty_arrays(self):
        j_indices = _make_int_array()
        indptr = _make_int_array()
        indptr.append(0)
        return j_indices, indptr

    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__

        self.chunksize = 10000
        analyze = self.build_analyzer()
        X = sp.csr_matrix((0, 0), dtype=self.dtype)
        j_indices, indptr = self._get_empty_arrays()
        for num, doc in enumerate(raw_documents):
            for feature in analyze(doc):
                try:
                    j_indices.append(vocabulary[feature])
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            indptr.append(len(j_indices))
            if num > 0 and num % self.chunksize == 0:
                Y = self._get_sparse_matrix_from_indices(j_indices, indptr, vocabulary)
                X = csr_vappend(X, Y)
                del Y
                j_indices, indptr = self._get_empty_arrays()

        # flush the last (partial) chunk
        Y = self._get_sparse_matrix_from_indices(j_indices, indptr, vocabulary)
        X = csr_vappend(X, Y)
        print('j_indices size: {}'.format(getsizeof(j_indices) / mega))
        print('indptr size: {}'.format(getsizeof(indptr) / mega))
        del Y
        del j_indices, indptr
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))

        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")

        joblib.dump(X, 'enron.features')
        joblib.dump(vocabulary, 'enron.vocabulary')
        return vocabulary, X

    def _sort_features(self, X, vocabulary):
        """Sort features by name

        Returns a reordered matrix and modifies the vocabulary in place
        """
        sorted_features = sorted(six.iteritems(vocabulary))
        map_index = np.empty(len(sorted_features), dtype=np.int32)
        for new_val, (term, old_val) in enumerate(sorted_features):
            vocabulary[term] = new_val
            map_index[old_val] = new_val

        # swap columns in place
        indices = X.indices
        for idx, val in enumerate(X.indices):
            indices[idx] = map_index[val]
        X.indices = indices
        return X

    def _limit_features(self, X, vocabulary, high=None, low=None,
                        limit=None):
        high_not_set = high is None or int(high) == X.shape[0]
        low_not_set = low is None or int(low) == 1
        limit_not_set = limit is None or int(limit) == X.shape[1]
        if high_not_set and low_not_set and limit_not_set:
            return X, set()

        # Calculate a mask based on document frequencies
        dfs = _document_frequency(X)
        tfs = np.asarray(X.sum(axis=0)).ravel()
        mask = np.ones(len(dfs), dtype=bool)
        if high is not None:
            mask &= dfs <= high
        if low is not None:
            mask &= dfs >= low
        if limit is not None and mask.sum() > limit:
            mask_inds = (-tfs[mask]).argsort()[:limit]
            new_mask = np.zeros(len(dfs), dtype=bool)
            new_mask[np.where(mask)[0][mask_inds]] = True
            mask = new_mask

        new_indices = np.cumsum(mask) - 1  # maps old indices to new
        removed_terms = set()
        for term, old_index in list(six.iteritems(vocabulary)):
            if mask[old_index]:
                vocabulary[term] = new_indices[old_index]
            else:
                del vocabulary[term]
                removed_terms.add(term)
        kept_indices = np.where(mask)[0]
        if len(kept_indices) == 0:
            raise ValueError("After pruning, no terms remain. Try a lower"
                             " min_df or a higher max_df.")
        return X[:, kept_indices], removed_terms

    # def _count_vocab(self, raw_documents, fixed_vocab):
    #     # shortcut: reuse the matrix and vocabulary dumped by a previous run
    #     vocabulary = joblib.load('enron.vocabulary')
    #     X = joblib.load('enron.features')
    #     return vocabulary, X

    def fit_transform(self, raw_documents, y=None):
        self._validate_vocabulary()
        max_df = self.max_df
        min_df = self.min_df
        max_features = self.max_features

        vocabulary, X = self._count_vocab(raw_documents,
                                          self.fixed_vocabulary_)
        if self.binary:
            X.data.fill(1)

        if not self.fixed_vocabulary_:
            X = self._sort_features(X, vocabulary)

            n_doc = X.shape[0]
            max_doc_count = (max_df
                             if isinstance(max_df, numbers.Integral)
                             else max_df * n_doc)
            min_doc_count = (min_df
                             if isinstance(min_df, numbers.Integral)
                             else min_df * n_doc)
            if max_doc_count < min_doc_count:
                raise ValueError(
                    "max_df corresponds to < documents than min_df")
            print('limit features args: {} {} {}'.format(max_doc_count, min_doc_count, max_features))
            X, self.stop_words_ = self._limit_features(X, vocabulary,
                                                       max_doc_count,
                                                       min_doc_count,
                                                       max_features)

            self.vocabulary_ = vocabulary

        return X


class AupiffCountVectorizer(LeanCountVectorizer):
    # Variant that counts each document's features in a dict first and stores
    # real counts in `values`, so no duplicate (row, column) entries are
    # created and X.sum_duplicates() is not needed afterwards.
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__

        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        values = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            j_idx_to_count_dict = {}
            for feature in analyze(doc):
                try:
                    vocab_idx = vocabulary[feature]
                    word_occurrences = j_idx_to_count_dict.get(vocab_idx, 0)
                    j_idx_to_count_dict[vocab_idx] = word_occurrences + 1
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            for (vocab_idx, word_count) in six.iteritems(j_idx_to_count_dict):
                values.append(word_count)
                j_indices.append(vocab_idx)
            indptr.append(len(j_indices))

        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")

        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = frombuffer_empty(values, dtype=np.intc)

        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))

        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        print('X before sort_indices size: {}'.format(array_size(X)))
        X.sort_indices()
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X


class FastAupiffCountVectorizer(LeanCountVectorizer):
    # Variant that collects each document's integer feature indices in an
    # array, then uses numpy to deduplicate them and derive the counts,
    # instead of a per-document Python dict.
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__

        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        values = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            doc_indices = _make_int_array()
            for feature in analyze(doc):
                try:
                    doc_indices.append(vocabulary[feature])
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            # sum duplicates for doc features (skip documents without features)
            if len(doc_indices):
                doc_indices = np.frombuffer(doc_indices, dtype=np.intc)
                doc_indices.sort()
                diff = np.concatenate(([1], np.diff(doc_indices)))
                idx = np.concatenate((np.where(diff)[0], [len(doc_indices)]))
                # append doc to global indices and values
                j_indices.extend(doc_indices[idx[:-1]])
                values.extend(np.diff(idx))
            indptr.append(len(j_indices))

        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")

        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = frombuffer_empty(values, dtype=np.intc)

        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))

        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        # indices were appended per document in sorted order, so no explicit
        # X.sort_indices() call is needed here
        print('X before sort_indices size: {}'.format(array_size(X)))
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X


class FastAupiffCountVectorizer2(LeanCountVectorizer):
    # Second variant (the one main() below instantiates): per-document counts
    # are collected in a plain dict keyed by vocabulary index, then flushed
    # into the global index/value arrays once per document.
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__

        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        values = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            feature_counter = {}
            for feature in analyze(doc):
                try:
                    feature_idx = vocabulary[feature]
                    if feature_idx not in feature_counter:
                        feature_counter[feature_idx] = 1
                    else:
                        feature_counter[feature_idx] += 1
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            j_indices.extend(feature_counter.keys())
            values.extend(feature_counter.values())
            del feature_counter
            indptr.append(len(j_indices))

        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")

        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = frombuffer_empty(values, dtype=np.intc)

        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))

        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        print('X before sort_indices size: {}'.format(array_size(X)))
        X.sort_indices()
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X


def iterate_enron_docs(path):
    # Walk the Enron maildir and yield the raw text of each mail, skipping
    # files that cannot be decoded with the default text encoding.
    num = 0
    for root, dirs, files in os.walk(path):
        for name in files:
            num += 1
            if num % 10000 == 0:
                print(num)
            try:
                with open(join(root, name), 'r') as fh:
                    text = fh.read()
            except UnicodeDecodeError:
                continue
            yield text
            # if num > 20000:
            #     break


def main(args):
    in_path, out_path = args[0], args[1]
    # Pick one of the vectorizer variants above to profile:
    #vectorizer = DebugCountVectorizer()
    #vectorizer = DebugHashingVectorizer()
    #vectorizer = LeanHashingVectorizer()
    #vectorizer = LeanCountVectorizer(dtype=np.int32)
    #vectorizer = AupiffCountVectorizer()
    #vectorizer = FastAupiffCountVectorizer()
    vectorizer = FastAupiffCountVectorizer2()
    X = vectorizer.fit_transform(iterate_enron_docs(in_path))
    #joblib.dump(X, out_path)


if __name__ == '__main__':
    main(sys.argv[1:])
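
# Example invocation, with placeholder script name and paths (the gist does
# not fix a file name; adjust to wherever the Enron maildir lives):
#
#     python vectorize_enron.py /path/to/enron/maildir enron_features_out
#
# The second argument is only consumed by the commented-out joblib.dump call
# in main(); which vectorizer variant runs is hard-coded in main() above.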