This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Starts a watcher loop that runs `saga` for every action taken from
 * `patternOrChannel`, keeping only the most recently forked run.
 *
 * Each time an action is taken, the task forked for the previous action
 * (if there is one) is cancelled before the next one is forked.
 *
 * @param patternOrChannel - forwarded unchanged to `take` to obtain actions.
 * @param saga - forked once per taken action.
 * @param args - extra arguments passed to `saga` ahead of the action.
 * @returns whatever the outer `fork` call returns for the watcher loop.
 */
export const takeLatest = (patternOrChannel, saga, ...args) =>
  fork(function* () {
    let lastTask;
    while (true) {
      const action = yield* take(patternOrChannel);
      if (lastTask) {
        // cancel is no-op if the task has already terminated
        // NOTE(review): `cancel` is yielded directly while `take` and `fork`
        // are delegated to with `yield*` - confirm this mix is intended.
        yield cancel(lastTask);
      }
      // The triggering action is appended as the last argument to `saga`.
      lastTask = yield* fork(saga, ...args.concat(action));
    }
  });
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Shape of a heartbeat action.
 */
export interface HeartbeatAction {
  /** Discriminant tag; always the literal `"heartbeat"`. */
  type: "heartbeat";
  /** Numeric count carried by the action (presumably a running tick count - confirm with the producer). */
  count: number;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Lifts a plain synchronous function into a function that returns an
 * operation.
 *
 * The returned function is a generator function: calling it only creates the
 * generator, and the wrapped function runs (with the same arguments) when
 * that generator is first advanced. Its return value becomes the generator's
 * return value.
 *
 * @param fn - the synchronous function to wrap.
 * @returns a function with the same parameters that yields nothing and
 *   returns the result of `fn`.
 *
 * @example
 * // => map(fn(data => data.type))
 */
export function fn<TArgs extends unknown[], TReturn>(
  fn: (...args: TArgs) => TReturn,
): (...args: TArgs) => Operation<TReturn> {
  return function* (...args: TArgs) {
    return fn(...args);
  };
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { spawn, pipe, type Operation } from "./mod.ts"; | |
/**
 * A stream of `T` items that finishes with a `TClose` value.
 *
 * A stream is an operation whose result is a `Subscription`; the items
 * themselves are pulled from that subscription.
 */
export type Stream<T, TClose> = Operation<Subscription<T, TClose>>;
/**
 * Pull-based handle on a sequence of `T` items that ends with a `TClose`
 * value.
 */
export interface Subscription<T, TClose> {
  /**
   * Returns an operation whose result is the next `IteratorResult`: either
   * an item of type `T`, or the final `TClose` value.
   */
  next(): Operation<IteratorResult<T, TClose>>;
}
export interface Buffer<T, TClose> extends Subscription<T, TClose> { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// `map` is used in the pipeline below, so it has to be imported as well.
// NOTE(review): assumes `map` is exported next to `filter` - confirm.
import { buffer, on, flow, filter, map } from "effection";

// Pipeline composed with `flow`, applied in the order listed:
//   1. `on('message')`          - presumably subscribes to `message` events
//   2. `buffer({ size: 100 })`  - presumably buffers up to 100 pending items
//   3. `map(...)`               - parses each message's `data` as JSON
//   4. `filter(...)`            - keeps parsed values whose `type` is "result"
let results = flow(
  on('message'),
  buffer({ size: 100 }),
  map(message => JSON.parse(message.data)),
  filter(data => data.type === "result"),
);

let websocket = new WebSocket('wss://localhost:9000');
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { Operation, Stream } from "https://esm.sh/effection@3.0.0-alpha.7"; | |
import { | |
action, | |
createChannel, | |
type Resolve, | |
resource, | |
suspend, | |
useScope, | |
} from "https://esm.sh/effection@3.0.0-alpha.7"; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { evaluate, reset, shift, type Computation } from "https://deno.land/x/continuation@0.1.5/mod.ts"; | |
function async<T>(op: () => Computation<T>): Computation<Promise<T>> { | |
return reset(function*() { | |
let { resolve, reject } = yield* shift(function*(k) { | |
return new Promise((resolve, reject) => k({ resolve, reject })); | |
}); | |
try { | |
resolve(yield* op()); | |
} catch (error) { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// could be really expensive if the enumerable is really long: the spread
// copies every item into an array before `find` inspects the first one.
[...worlds].find(x => x.name === "Arcadia");
// this will return early the moment that the predicate matches | |
function* find(enumerable, predicate) { | |
return yield* shift(function*(k) { | |
for (let item of enumerable) { | |
if (predicate(item) { | |
k(item); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { Stream, Task, Operation, Result } from "./types.ts"; | |
/**
 * A scope in which operations are run as tasks.
 *
 * `Scope` extends `Operation<void>`, so a scope can itself be used wherever
 * an operation is expected. The stream members expose what the scope is
 * doing; each of them closes with `void`.
 */
export interface Scope extends Operation<void> {
  /** Runs the operation produced by `operation` and returns its `Task`. */
  run<T>(operation: () => Operation<T>): Task<T>;

  /** Returns an operation that closes this scope. */
  close(): Operation<void>;

  /** Stream of tasks (presumably one per task created in this scope - confirm). */
  created: Stream<Task<unknown>, void>;

  /** Stream of operations (presumably those queued to run - confirm). */
  enqueued: Stream<Operation<unknown>, void>;

  /** Stream of operations (presumably those discarded without running - confirm). */
  dropped: Stream<Operation<unknown>, void>;

  /** Stream pairing each task with its `Result`. */
  results: Stream<{ task: Task<unknown>, result: Result<unknown> }, void>;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { Operation, Task, Result, Port } from "https://deno.land/x/effection/mod.ts"; | |
import { action, spawn, createContext } from "https://deno.land/x/effection/mod.ts"; | |
// Context created under the name "error"; its value type is `Port<Error, never>`.
const ErrorContext = createContext<Port<Error, never>>("error");
export function* go<T>(op: () => Operation<T>): Operation<Task<Result<T>>> { | |
return yield* spawn(function*() { | |
try { | |
return Ok(yield* call(op)); | |
} catch (error) { |