Skip to content

Instantly share code, notes, and snippets.

@conartist6
Last active March 10, 2019 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save conartist6/e3e7e19fde884b920fca7359163640ec to your computer and use it in GitHub Desktop.
Save conartist6/e3e7e19fde884b920fca7359163640ec to your computer and use it in GitHub Desktop.
iter-tools groupBy
import { iterableCurry } from './internal/iterable'
import { WeakExchange } from './internal/queues'
import consume from './consume'
/**
 * Lazily groups consecutive items of `iterable` that share the same key.
 *
 * Yields `[key, group]` pairs, where `group` is an iterator over the values of one run of
 * equal keys. Items are pulled from the source on demand and buffered in a shared queue, so a
 * group may be read while the outer iterator is paused or after it has been exhausted.
 *
 * @param {Function} [getKey] Called as `getKey(value, index)`. Consecutive items whose keys are
 *   strictly equal (`===`) land in the same group. Defaults to the identity function.
 * @param {Iterable} iterable The source to group.
 * @returns {Iterator} Generator of `[key, group]` pairs.
 */
function groupBy (getKey = (k, i) => k, iterable) {
  let iterator
  let idx = 0
  const weakExchange = new WeakExchange()
  let consumer
  let done = false
  let item
  let nGroups = 0
  let groupsConsumed = false

  // Pulls one item from the source and buffers it. The default `expectedKey` is a fresh object,
  // which is never `===` to a real key, so the very first fetch always starts a new group.
  function fetch (expectedKey = {}) {
    const { done: _done, value } = iterator.next()
    done = _done
    if (!done) {
      const key = getKey(value, idx++)
      item = { value, key }
      if (expectedKey !== key) {
        // Spawn before pushing so the new group's consumer starts at the item about to be queued.
        consumer = weakExchange.spawnConsumer()
      }
      weakExchange.push(item)
    }
  }

  // Buffers the remainder of the group identified by `key`, plus the first item of the next one.
  function fetchGroup (key) {
    while (!done && item.key === key) fetch(key)
  }

  function * generateGroup (key, groupConsumer, groupIdx) {
    try {
      // Yielded once up front so that the generator is started and `finally` is guaranteed to
      // run if the caller later calls `return()` on the group.
      yield '__ENSURE_FINALLY__'
      // Loop on the buffer rather than on `done`: once the source is exhausted, a group may
      // still have several buffered items left to hand out.
      while (!groupConsumer.isEmpty()) {
        if (groupConsumer.peek().key !== key) {
          break
        }
        const cachedItem = groupConsumer.shift()
        // Look one item ahead, but never poke an already exhausted source again.
        if (!done && groupConsumer.isEmpty()) {
          fetch(key)
        }
        yield cachedItem.value
      }
    } finally {
      // Only the most recently created group can still need the source; when it finishes after
      // the outer iterator was abandoned, nothing will read the source again, so close it.
      // `groupIdx` is zero-based, hence the comparison with `nGroups - 1`.
      if (groupsConsumed && !done && groupIdx === nGroups - 1 && typeof iterator.return === 'function') {
        iterator.return()
      }
    }
  }

  function * generateGroups () {
    try {
      iterator = iterable[Symbol.iterator]()
      consumer = weakExchange.spawnConsumer()
      fetch()
      while (!done) {
        const { key } = item
        const group = generateGroup(key, consumer, nGroups++)
        group.next() // ensure finally
        yield [key, group]
        fetchGroup(key)
      }
    } finally {
      groupsConsumed = true
    }
  }

  return generateGroups()
}
// Default export: groupBy wrapped by iterableCurry.
// NOTE(review): the meaning of the (false, 0, 1) arguments is defined in ./internal/iterable,
// which is not visible here — confirm before changing them.
export default iterableCurry(groupBy, false, 0, 1)
/**
 * One node of a singly linked list: holds a payload in `data` and a pointer to the following
 * node in `next` (initially `null`).
 */
class QueueItem {
  constructor (data) {
    Object.assign(this, { data, next: null })
  }
}
/**
 * A FIFO queue backed by a singly linked list.
 *
 * `head` always points at a placeholder node; the first real item is `head.next`. The queue
 * starts with a single placeholder that is both `head` and `tail`.
 */
export class Queue {
  constructor () {
    this.head = this.tail = new QueueItem(null)
  }

  /**
   * Returns the oldest item without removing it.
   * @throws {Error} When the queue is empty.
   */
  peek () {
    if (this.isEmpty()) throw new Error('Cannot peek empty queue')
    return this.head.next.data
  }

  /**
   * Removes and returns the oldest item.
   * @throws {Error} When the queue is empty.
   */
  shift () {
    if (this.isEmpty()) throw new Error('Cannot shift empty queue')
    const { data } = this.head.next
    // The node that held the item becomes the new placeholder head.
    this.head = this.head.next
    return data
  }

  /** Appends `data` at the end of the queue. */
  push (data) {
    const newItem = new QueueItem(data)
    this.tail.next = newItem
    this.tail = newItem
  }

  /** Returns `true` when there is nothing after the placeholder head. */
  isEmpty () {
    return !this.head.next
  }
}
/**
 * A read cursor over a linked list of queue nodes.
 *
 * `head` is the node just before the next item to read; every item linked after it is visible
 * to this consumer, including items appended after the consumer was created.
 */
class Consumer {
  constructor (head) {
    this.head = head
  }

  /**
   * Returns the next item without advancing.
   * @throws {Error} When there is no item after the cursor.
   */
  peek () {
    if (this.isEmpty()) throw new Error('Cannot peek empty queue')
    return this.head.next.data
  }

  /**
   * Returns the next item and advances the cursor past it.
   * @throws {Error} When there is no item after the cursor.
   */
  shift () {
    if (this.isEmpty()) throw new Error('Cannot shift empty queue')
    const { data } = this.head.next
    this.head = this.head.next
    return data
  }

  /** Returns `true` when no item is linked after the cursor. */
  isEmpty () {
    return !this.head.next
  }
}
/**
 * Exchanges are a specialized kind of singly linked queue. Conceptually they represent a single
 * queue, but they employ many consumers (heads), each of which consumes the queue data at its own
 * pace. A reference to the original head (the root) is kept until `noMoreConsumers` is called, at
 * which point the items that every consumer has already passed can be garbage collected.
 *
 *   0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9
 *   ^            |      ^         ^              ^
 *   Root       <-|      Head 1    Head 2         Tail
 *   (items left of the bar are GC'd once there is no root)
 */
export class Exchange extends Queue {
  /** Items are read through consumers, never removed from the exchange itself. */
  shift () {
    throw new Error('Unsupported')
  }

  /**
   * Creates a consumer that starts at the root, i.e. sees every item pushed so far.
   * @throws {Error} When `noMoreConsumers` has already been called.
   */
  spawnConsumer () {
    if (!this.head) throw new Error('You cannot spawn a new consumer after calling noMoreConsumers')
    return new Consumer(this.head)
  }

  /** Drops the root reference; no further consumers can be spawned afterwards. */
  noMoreConsumers () {
    // Allow the GC to collect items
    this.head = null
  }
}
/**
 * A WeakExchange is like an exchange but it never has a root, and spawned consumers will not have
 * access to items pushed to the exchange prior to the consumers' spawning.
 *
 * Because there is no root, the exchange itself cannot be read: `shift`, `peek` and `isEmpty`
 * are all unsupported. Use `push` to add items and `spawnConsumer` to read them.
 */
export class WeakExchange extends Queue {
  constructor () {
    super()
    // Drop the root so that only consumers keep already-pushed items alive.
    this.head = null
  }

  shift () {
    throw new Error('Unsupported')
  }

  // The inherited implementations dereference `this.head`, which is always null here, and would
  // fail with an opaque TypeError; fail with the same explicit error as `shift` instead.
  peek () {
    throw new Error('Unsupported')
  }

  isEmpty () {
    throw new Error('Unsupported')
  }

  /** Creates a consumer positioned at the current tail: it only sees items pushed from now on. */
  spawnConsumer () {
    return new Consumer(this.tail)
  }
}
/** Stand-in object exposing only a `push` method, which ignores its arguments and does nothing. */
export const fakeQueue = {
  push () {}
}
import { iterableCurry } from './internal/iterable'
import { WeakExchange } from './internal/queues'
import consume from './consume'
// Unique sentinel: splitBy's fetch() stores this in place of any value the predicate matches, and
// the group generators compare against it by identity to detect the end of a group.
const SPLIT = {}
/**
 * Yields each item of `iterable` wrapped in its own one-element array, so every item forms a
 * separate group.
 */
function * splitAll (iterable) {
  for (const element of iterable) {
    yield Array.of(element)
  }
}
/**
 * Lazily splits `iterable` into groups, using every item for which `predicate` returns a truthy
 * value as a separator. Separators are not part of any group.
 *
 * Yields one iterator per group. Items are pulled from the source on demand and buffered in a
 * shared queue, so groups can be read while the outer iterator is paused or after it has
 * finished. At least one (possibly empty) group is always yielded.
 *
 * @param {Function} predicate Called as `predicate(value, index)` for each source item. When
 *   falsy, every item becomes its own single-element group (see `splitAll`).
 * @param {Iterable} iterable The source to split.
 * @returns {Iterator} Generator of group iterators.
 */
function splitBy (predicate, iterable) {
  if (!predicate) {
    return splitAll(iterable)
  }

  let iterator
  let idx = 0
  const weakExchange = new WeakExchange()
  let done = false
  let nGroups = 0
  let groupsConsumed = false

  // Pulls one result from the source and buffers it, replacing separators with SPLIT.
  function fetch () {
    const { done: _done, value } = iterator.next()
    done = _done
    const newItem = {
      done,
      // Only real items are shown to the predicate, never the value of the final `done` result.
      value: !done && predicate(value, idx++) ? SPLIT : value
    }
    weakExchange.push(newItem)
    return newItem
  }

  // Buffers everything up to and including the next separator (or the end of the source).
  function fetchGroup () {
    let item
    do {
      item = fetch()
    } while (!item.done && item.value !== SPLIT)
  }

  function * generateGroup (consumer, groupIdx, status) {
    try {
      // Yielded once up front so that the generator is started and `finally` is guaranteed to
      // run if the caller later calls `return()` on the group.
      yield '__ENSURE_FINALLY__'
      while (true) {
        if (consumer.isEmpty()) fetch()
        const item = consumer.shift()
        if (item.value === SPLIT || item.done) break
        yield item.value
      }
      // Recorded per group: a shared flag could be flipped by an older group finishing late,
      // which would make the outer generator start the next group at the wrong position.
      status.finished = true
    } finally {
      // Only the most recently created group can still need the source; when it finishes after
      // the outer iterator was abandoned, nothing will read the source again, so close it.
      // `groupIdx` is zero-based, hence the comparison with `nGroups - 1`.
      if (groupsConsumed && !done && groupIdx === nGroups - 1 && typeof iterator.return === 'function') {
        iterator.return()
      }
    }
  }

  function * generateGroups () {
    try {
      iterator = iterable[Symbol.iterator]()
      do {
        const consumer = weakExchange.spawnConsumer()
        const status = { finished: false }
        const group = generateGroup(consumer, nGroups++, status)
        group.next() // ensure finally
        yield group
        // If the caller did not read this group to its end, buffer the rest of it so the next
        // group's consumer starts right after the separator.
        if (!status.finished) fetchGroup()
      } while (!done)
    } finally {
      groupsConsumed = true
    }
  }

  return generateGroups()
}
// Default export: splitBy wrapped by iterableCurry.
// NOTE(review): the meaning of the (false, 0, 1) arguments is defined in ./internal/iterable,
// which is not visible here — confirm before changing them.
export default iterableCurry(splitBy, false, 0, 1)
/* eslint-env node, jest */
const { splitBy, asyncSplitBy, map, toArray, asyncToArray } = require('..')
// Specs for splitBy: the single-argument "split everything" form, empty groups around a lone
// separator, and reading several groups obtained through destructuring.
describe('splitBy', () => {
  const isX = char => char === 'x'

  it('splits all', () => {
    const groups = splitBy('ABBC')
    const expected = [['A'], ['B'], ['B'], ['C']]
    expect(Array.from(groups)).toEqual(expected)
  })

  it('creates empty iterables', () => {
    const groups = splitBy(isX, 'x')
    const asArrays = Array.from(map(toArray, groups))
    expect(asArrays).toEqual([[], []])
  })

  it('splits', () => {
    const [first, second, third] = splitBy(isX, 'AAAxBCDExF')
    expect(Array.from(first)).toEqual(['A', 'A', 'A'])
    expect(Array.from(second)).toEqual(['B', 'C', 'D', 'E'])
    expect(Array.from(third)).toEqual(['F'])
  })
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment