#!/anaconda/bin/python3
import os
import sys
import array
import joblib
import numbers
import numpy as np
import scipy.sparse as sp
from os.path import join
from sys import getsizeof
from itertools import chain
from itertools import islice
from collections import Counter
from scipy.sparse import issparse
from collections import defaultdict
from sklearn.utils.fixes import frombuffer_empty
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.hashing import FeatureHasher
from sklearn.feature_extraction import _hashing
# _iteritems is the private helper FeatureHasher.transform relies on; in this
# scikit-learn version it is defined next to FeatureHasher in feature_extraction.hashing
from sklearn.feature_extraction.hashing import _iteritems
from sklearn.externals import six
from sklearn.utils.fixes import bincount
from sklearn.preprocessing import normalize
mega = 1000000.0
def csr_vappend(X, Y):
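    """Stack CSR matrix Y below CSR matrix X by mutating X in place.

    Concatenates the data and indices arrays and shifts Y's indptr by X.nnz,
    avoiding the extra intermediate copies scipy.sparse.vstack would make.
    X may start out as an empty csr_matrix((0, 0)); the result takes its
    column count from Y.
    """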
    X.data = np.hstack((X.data, Y.data))
    X.indices = np.hstack((X.indices, Y.indices))
    X.indptr = np.hstack((X.indptr, (Y.indptr + X.nnz)[1:]))
    X._shape = (X.shape[0] + Y.shape[0], Y.shape[1])
    return X
def array_size(arr, fac=mega):
    # Return the array size in megabytes (i.e. nbytes / fac).
    if issparse(arr):
        return (arr.data.nbytes + arr.indptr.nbytes + arr.indices.nbytes) / fac
    else:
        return arr.nbytes / fac
def _make_int_array():
"""Construct an array.array of a type suitable for scipy.sparse indices."""
return array.array(str("i"))
def _document_frequency(X):
"""Count the number of non-zero values for each feature in sparse X."""
if sp.isspmatrix_csr(X):
return bincount(X.indices, minlength=X.shape[1])
else:
return np.diff(sp.csc_matrix(X, copy=False).indptr)
class DebugCountVectorizer(CountVectorizer):
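    """CountVectorizer whose _count_vocab prints how many megabytes the
    intermediate j_indices/indptr/values arrays, the CSR matrix before and
    after sum_duplicates(), and the vocabulary dict occupy; the counting
    logic itself is unchanged."""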
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__
        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            for feature in analyze(doc):
                try:
                    j_indices.append(vocabulary[feature])
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            indptr.append(len(j_indices))
        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")
        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = np.ones(len(j_indices))
        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))
        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        print('X before sum duplicates size: {}'.format(array_size(X)))
        X.sum_duplicates()
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X
class DebugFeatureHasher(FeatureHasher):
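    """FeatureHasher whose transform() prints the sizes of the hashed
    indices/indptr/values arrays and of the CSR matrix before and after
    sum_duplicates(); otherwise it follows the stock transform() logic."""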
    def transform(self, raw_X, y=None):
        """Transform a sequence of instances to a scipy.sparse matrix.

        Parameters
        ----------
        raw_X : iterable over iterable over raw features, length = n_samples
            Samples. Each sample must be an iterable (e.g., a list or tuple)
            containing/generating feature names (and optionally values, see
            the input_type constructor argument) which will be hashed.
            raw_X need not support the len function, so it can be the result
            of a generator; n_samples is determined on the fly.
        y : (ignored)

        Returns
        -------
        X : scipy.sparse matrix, shape = (n_samples, self.n_features)
            Feature matrix, for use with estimators or further transformers.
        """
        raw_X = iter(raw_X)
        if self.input_type == "dict":
            raw_X = (_iteritems(d) for d in raw_X)
        elif self.input_type == "string":
            raw_X = (((f, 1) for f in x) for x in raw_X)
        indices, indptr, values = \
            _hashing.transform(raw_X, self.n_features, self.dtype)
        n_samples = indptr.shape[0] - 1
        if n_samples == 0:
            raise ValueError("Cannot vectorize empty sequence.")
        print('indices size: {}'.format(array_size(indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))
        X = sp.csr_matrix((values, indices, indptr), dtype=self.dtype,
                          shape=(n_samples, self.n_features))
        print('X before sum duplicates size: {}'.format(array_size(X)))
        X.sum_duplicates()  # also sorts the indices
        print('final X size: {}'.format(array_size(X)))
        if self.non_negative:
            np.abs(X.data, X.data)
        return X
class DebugHashingVectorizer(HashingVectorizer):
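    """HashingVectorizer wired to DebugFeatureHasher so the memory use of the
    hashing step is printed; transform() also reports the dtype of the matrix
    handed to normalize()."""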
    def _get_hasher(self):
        return DebugFeatureHasher(
            n_features=self.n_features, input_type='string',
            dtype=self.dtype, non_negative=self.non_negative)
    def transform(self, X, y=None):
        analyzer = self.build_analyzer()
        X = self._get_hasher().transform(analyzer(doc) for doc in X)
        if self.binary:
            X.data.fill(1)
        if self.norm is not None:
            #X = X.astype(np.double)
            print('X dtype: {}'.format(X.dtype))
            X = normalize(X, norm=self.norm, copy=False)
        return X
    # Alias transform to fit_transform for convenience
    fit_transform = transform
class LeanFeatureHasher(FeatureHasher):
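    """FeatureHasher that processes the input in chunks of 10000 samples:
    each chunk is hashed, its duplicates are summed, and the partial CSR
    matrix is appended with csr_vappend. Intended to keep the unsummed
    triplet arrays for the whole corpus from ever being in memory at once."""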
    def in_chunks(self, iterable, size):
        it = iter(iterable)
        while True:
            chunk = tuple(islice(it, size))
            if not chunk:
                return
            yield chunk
    def transform(self, raw_X, y=None):
        X = sp.csr_matrix((0, 0), dtype=self.dtype)
        for raw_X_chunk in self.in_chunks(raw_X, 10000):
            raw_X_chunk = iter(raw_X_chunk)
            if self.input_type == "dict":
                raw_X_chunk = (_iteritems(d) for d in raw_X_chunk)
            elif self.input_type == "string":
                raw_X_chunk = (((f, 1) for f in x) for x in raw_X_chunk)
            indices, indptr, values = \
                _hashing.transform(raw_X_chunk, self.n_features, self.dtype)
            n_samples = indptr.shape[0] - 1
            Y = sp.csr_matrix((values, indices, indptr), dtype=self.dtype,
                              shape=(n_samples, self.n_features))
            print('part Y size: {}'.format(array_size(Y)))
            Y.sum_duplicates()  # also sorts the indices
            print('part Y size (sum duplicates): {}'.format(array_size(Y)))
            X = csr_vappend(X, Y)
            print('part X size: {}'.format(array_size(X)))
        if self.non_negative:
            np.abs(X.data, X.data)
        n_samples = X.indptr.shape[0] - 1
        if n_samples == 0:
            raise ValueError("Cannot vectorize empty sequence.")
        print('final X size: {}'.format(array_size(X)))
        return X
class LeanHashingVectorizer(HashingVectorizer):
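    """HashingVectorizer that plugs in LeanFeatureHasher for a chunked,
    lower-memory transform()."""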
    def _get_hasher(self):
        return LeanFeatureHasher(
            n_features=self.n_features, input_type='string',
            dtype=self.dtype, non_negative=self.non_negative)
class LeanCountVectorizer(CountVectorizer):
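    """CountVectorizer that builds the document-term matrix incrementally:
    every 10000 documents the collected indices are turned into a partial CSR
    matrix (duplicates summed) and appended via csr_vappend, instead of
    materializing one huge triplet of arrays at the end. _sort_features
    remaps column indices in place rather than fancy-indexing a copy;
    _limit_features and fit_transform follow the scikit-learn originals with
    extra debug output. _count_vocab also dumps the matrix and vocabulary to
    'enron.features'/'enron.vocabulary'."""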
    def _get_sparse_matrix_from_indices(self, j_indices, indptr, vocabulary):
        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = np.ones(len(j_indices))
        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        X.sum_duplicates()
        return X
    def _get_empty_arrays(self):
        j_indices = _make_int_array()
        indptr = _make_int_array()
        indptr.append(0)
        return j_indices, indptr
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__
        self.chunksize = 10000
        analyze = self.build_analyzer()
        X = sp.csr_matrix((0, 0), dtype=self.dtype)
        j_indices, indptr = self._get_empty_arrays()
        for num, doc in enumerate(raw_documents):
            for feature in analyze(doc):
                try:
                    j_indices.append(vocabulary[feature])
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            indptr.append(len(j_indices))
            if num > 0 and num % self.chunksize == 0:
                Y = self._get_sparse_matrix_from_indices(j_indices, indptr, vocabulary)
                X = csr_vappend(X, Y)
                del Y
                j_indices, indptr = self._get_empty_arrays()
        Y = self._get_sparse_matrix_from_indices(j_indices, indptr, vocabulary)
        X = csr_vappend(X, Y)
        print('j_indices size: {}'.format(getsizeof(j_indices) / mega))
        print('indptr size: {}'.format(getsizeof(indptr) / mega))
        del Y
        del j_indices, indptr
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")
        joblib.dump(X, 'enron.features')
        joblib.dump(vocabulary, 'enron.vocabulary')
        return vocabulary, X
    def _sort_features(self, X, vocabulary):
        """Sort features by name
        Returns a reordered matrix and modifies the vocabulary in place
        """
        sorted_features = sorted(six.iteritems(vocabulary))
        map_index = np.empty(len(sorted_features), dtype=np.int32)
        for new_val, (term, old_val) in enumerate(sorted_features):
            vocabulary[term] = new_val
            map_index[old_val] = new_val
        # swap columns in place
        indices = X.indices
        for idx, val in enumerate(X.indices):
            indices[idx] = map_index[val]
        X.indices = indices
        return X
    def _limit_features(self, X, vocabulary, high=None, low=None,
                        limit=None):
        high_not_set = high is None or int(high) == X.shape[0]
        low_not_set = low is None or int(low) == 1
        limit_not_set = limit is None or int(limit) == X.shape[1]
        if high_not_set and low_not_set and limit_not_set:
            return X, set()
        # Calculate a mask based on document frequencies
        dfs = _document_frequency(X)
        tfs = np.asarray(X.sum(axis=0)).ravel()
        mask = np.ones(len(dfs), dtype=bool)
        if high is not None:
            mask &= dfs <= high
        if low is not None:
            mask &= dfs >= low
        if limit is not None and mask.sum() > limit:
            mask_inds = (-tfs[mask]).argsort()[:limit]
            new_mask = np.zeros(len(dfs), dtype=bool)
            new_mask[np.where(mask)[0][mask_inds]] = True
            mask = new_mask
        new_indices = np.cumsum(mask) - 1  # maps old indices to new
        removed_terms = set()
        for term, old_index in list(six.iteritems(vocabulary)):
            if mask[old_index]:
                vocabulary[term] = new_indices[old_index]
            else:
                del vocabulary[term]
                removed_terms.add(term)
        kept_indices = np.where(mask)[0]
        if len(kept_indices) == 0:
            raise ValueError("After pruning, no terms remain. Try a lower"
                             " min_df or a higher max_df.")
        return X[:, kept_indices], removed_terms
    # def _count_vocab(self, raw_documents, fixed_vocab):
    #     vocabulary = joblib.load('enron.vocabulary')
    #     X = joblib.load('enron.features')
    #     return vocabulary, X
    def fit_transform(self, raw_documents, y=None):
        self._validate_vocabulary()
        max_df = self.max_df
        min_df = self.min_df
        max_features = self.max_features
        vocabulary, X = self._count_vocab(raw_documents,
                                          self.fixed_vocabulary_)
        if self.binary:
            X.data.fill(1)
        if not self.fixed_vocabulary_:
            X = self._sort_features(X, vocabulary)
            n_doc = X.shape[0]
            max_doc_count = (max_df
                             if isinstance(max_df, numbers.Integral)
                             else max_df * n_doc)
            min_doc_count = (min_df
                             if isinstance(min_df, numbers.Integral)
                             else min_df * n_doc)
            if max_doc_count < min_doc_count:
                raise ValueError(
                    "max_df corresponds to < documents than min_df")
            print('limit features args: {} {} {}'.format(max_doc_count, min_doc_count, max_features))
            X, self.stop_words_ = self._limit_features(X, vocabulary,
                                                       max_doc_count,
                                                       min_doc_count,
                                                       max_features)
            self.vocabulary_ = vocabulary
        return X
class AupiffCountVectorizer(LeanCountVectorizer):
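    """LeanCountVectorizer variant (presumably named after the GitHub user who
    suggested the approach) that counts each document's term frequencies in a
    dict and appends one (index, count) pair per distinct term, so the CSR
    matrix is built without duplicate entries and only needs sort_indices()
    instead of sum_duplicates()."""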
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__
        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        values = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            j_idx_to_count_dict = {}
            for feature in analyze(doc):
                try:
                    vocab_idx = vocabulary[feature]
                    word_occurences = j_idx_to_count_dict.get(vocab_idx, 0)
                    j_idx_to_count_dict[vocab_idx] = word_occurences + 1
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            for (vocabIdx, wordCount) in six.iteritems(j_idx_to_count_dict):
                values.append(wordCount)
                j_indices.append(vocabIdx)
            indptr.append(len(j_indices))
        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")
        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = frombuffer_empty(values, dtype=np.intc)
        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))
        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        print('X before sort_indices size: {}'.format(array_size(X)))
        X.sort_indices()
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X
class FastAupiffCountVectorizer(LeanCountVectorizer):
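    """Variant that first collects every token index of a document and then
    sums duplicates with vectorized numpy (sort + np.diff) before extending
    the global arrays; since each row's indices end up sorted, no
    sort_indices() call is needed afterwards."""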
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__
        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        values = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            doc_indices = _make_int_array()
            for feature in analyze(doc):
                try:
                    doc_indices.append(vocabulary[feature])
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            # sum duplicates for doc features
            doc_indices = np.frombuffer(doc_indices, dtype=np.intc)
            doc_indices.sort()
            diff = np.concatenate(([1], np.diff(doc_indices)))
            idx = np.concatenate((np.where(diff)[0], [len(doc_indices)]))
            # append doc to global indices and values
            j_indices.extend(doc_indices[idx[:-1]])
            values.extend(np.diff(idx))
            indptr.append(len(j_indices))
        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")
        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = frombuffer_empty(values, dtype=np.intc)
        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))
        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        print('X before sort_indices size: {}'.format(array_size(X)))
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X
class FastAupiffCountVectorizer2(LeanCountVectorizer):
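    """Second variant: counts term frequencies per document in a plain dict
    keyed by feature index and extends the global index/value arrays from it;
    this is the vectorizer main() runs."""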
    def _count_vocab(self, raw_documents, fixed_vocab):
        """Create sparse feature matrix, and vocabulary where fixed_vocab=False
        """
        if fixed_vocab:
            vocabulary = self.vocabulary_
        else:
            # Add a new value when a new vocabulary item is seen
            vocabulary = defaultdict()
            vocabulary.default_factory = vocabulary.__len__
        analyze = self.build_analyzer()
        j_indices = _make_int_array()
        indptr = _make_int_array()
        values = _make_int_array()
        indptr.append(0)
        for doc in raw_documents:
            feature_counter = {}
            for feature in analyze(doc):
                try:
                    feature_idx = vocabulary[feature]
                    if feature_idx not in feature_counter:
                        feature_counter[feature_idx] = 1
                    else:
                        feature_counter[feature_idx] += 1
                except KeyError:
                    # Ignore out-of-vocabulary items for fixed_vocab=True
                    continue
            j_indices.extend(feature_counter.keys())
            values.extend(feature_counter.values())
            del feature_counter
            indptr.append(len(j_indices))
        if not fixed_vocab:
            # disable defaultdict behaviour
            vocabulary = dict(vocabulary)
            if not vocabulary:
                raise ValueError("empty vocabulary; perhaps the documents only"
                                 " contain stop words")
        j_indices = frombuffer_empty(j_indices, dtype=np.intc)
        indptr = np.frombuffer(indptr, dtype=np.intc)
        values = frombuffer_empty(values, dtype=np.intc)
        print('j_indices size: {}'.format(array_size(j_indices)))
        print('indptr size: {}'.format(array_size(indptr)))
        print('values size: {}'.format(array_size(values)))
        X = sp.csr_matrix((values, j_indices, indptr),
                          shape=(len(indptr) - 1, len(vocabulary)),
                          dtype=self.dtype)
        print('X before sort_indices size: {}'.format(array_size(X)))
        X.sort_indices()
        print('final X size: {}'.format(array_size(X)))
        print('vocabulary size: {}'.format(getsizeof(vocabulary) / mega))
        return vocabulary, X
def iterate_enron_docs(path):
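    """Walk `path` (e.g. an unpacked Enron maildir) and yield the contents of
    every file that can be decoded, printing a progress count every 10000
    files; files that raise UnicodeDecodeError are skipped."""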
    num = 0
    for root, dirs, files in os.walk(path):
        for name in files:
            num += 1
            if num % 10000 == 0:
                print(num)
            with open(join(root, name), 'r') as fh:
                try:
                    yield fh.read()
                except UnicodeDecodeError:
                    pass
            # if num > 20000:
            #     break
def main(args):
    in_path, out_path = args[0], args[1]
    #vectorizer = DebugCountVectorizer()
    #vectorizer = DebugHashingVectorizer()
    #vectorizer = LeanHashingVectorizer()
    #vectorizer = LeanCountVectorizer(dtype=np.int32)
    #vectorizer = AupiffCountVectorizer()
    #vectorizer = FastAupiffCountVectorizer()
    vectorizer = FastAupiffCountVectorizer2()
    #vectorizer = DebugHashingVectorizer()
    X = vectorizer.fit_transform(iterate_enron_docs(in_path))
    #joblib.dump(X, out_path)
if __name__ == '__main__':
    main(sys.argv[1:])
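# Example invocation (paths are placeholders; out_path is only used if the
# joblib.dump call in main() is re-enabled):
#     python3 this_script.py /path/to/enron/maildir /tmp/enron.features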