@cowboyd
cowboyd / 01-take-latest.ts
Created May 16, 2023 15:12
Explicit drop vs. implicit drop.
export const takeLatest = (patternOrChannel, saga, ...args) => fork(function*() {
  let lastTask
  while (true) {
    const action = yield* take(patternOrChannel)
    if (lastTask) {
      yield* cancel(lastTask) // cancel is a no-op if the task has already terminated
    }
    lastTask = yield* fork(saga, ...args.concat(action))
  }
})
export interface HeartbeatAction {
  type: "heartbeat";
  count: number;
}
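The "latest wins" behaviour above can be illustrated without any saga library. The following is a minimal sketch, assuming plain synchronous generators as a stand-in for tasks; `takeLatestSim`, `worker`, and `Job` are hypothetical names used only for illustration and are not part of redux-saga or Effection. Calling `return()` on the previous generator plays the role of the explicit `cancel`, and its `finally` block shows where cleanup would run.

```typescript
// Hypothetical stand-in for a running task: a generator that can be stepped and cancelled.
type Job = Generator<void, void, void>;

// Hypothetical worker: starts, suspends once, then finishes unless it is cancelled first.
function* worker(action: string, log: string[]): Job {
  log.push(`start ${action}`);
  try {
    yield; // suspended here until resumed or cancelled
    log.push(`done ${action}`);
  } finally {
    log.push(`cleanup ${action}`); // runs on normal completion and on cancellation
  }
}

// For every incoming action, explicitly drop the previous job before starting a new one.
function takeLatestSim(actions: string[], log: string[]): void {
  let last: Job | undefined;
  for (const action of actions) {
    if (last) {
      last.return(undefined); // explicit drop: unwinds the suspended job through its finally block
    }
    last = worker(action, log);
    last.next(); // run the new job up to its first suspension point
  }
  // No more actions arrive, so the most recent job is allowed to finish.
  if (last) {
    let step = last.next();
    while (!step.done) {
      step = last.next();
    }
  }
}
```

Only the job for the final action reaches its `done` entry; every earlier job records just `start` and `cleanup`.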
cowboyd / lift.ts
Created April 21, 2023 15:38
Lift a simple function into an Effection operation
export function fn<TArgs extends unknown[], TReturn>(fn: (...args: TArgs) => TReturn): (...args: TArgs) => Operation<TReturn> {
  return function* (...args: TArgs) {
    return fn(...args);
  };
}
// => map(fn(data => data.type))
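A self-contained sketch of how a lifted function might be consumed with `yield*`. The `Operation` interface here is a hypothetical minimal stand-in (anything iterable whose iterator returns a `T`), not Effection's actual type, and `lift`, `main`, and `runSync` are illustrative names.

```typescript
// Hypothetical minimal stand-in for an operation: an iterable whose iterator returns T.
interface Operation<T> {
  [Symbol.iterator](): Iterator<unknown, T, unknown>;
}

// Same shape as the gist's `fn`, renamed to avoid shadowing its own parameter.
function lift<TArgs extends unknown[], TReturn>(
  f: (...args: TArgs) => TReturn,
): (...args: TArgs) => Operation<TReturn> {
  return function* (...args: TArgs) {
    return f(...args); // no yield: the operation completes immediately
  };
}

// A lifted function composes with other operations through yield*.
function* main(): Generator<unknown, number, unknown> {
  const add = lift((a: number, b: number) => a + b);
  const sum = yield* add(2, 3);
  return sum * 2;
}

// Hypothetical synchronous runner: steps an operation until it returns.
function runSync<T>(op: Operation<T>): T {
  const iterator = op[Symbol.iterator]();
  let step = iterator.next();
  while (!step.done) {
    step = iterator.next();
  }
  return step.value as T;
}
```

With this runner, `runSync(main())` evaluates to `10`.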
cowboyd / stream-operations.ts
Created April 19, 2023 19:39
Hypothetical composable stream operations
import { spawn, pipe, type Operation } from "./mod.ts";
export type Stream<T, TClose> = Operation<Subscription<T, TClose>>;
export interface Subscription<T, TClose> {
  next(): Operation<IteratorResult<T, TClose>>;
}
export interface Buffer<T, TClose> extends Subscription<T, TClose> {
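To make the `Stream` and `Subscription` shapes above concrete, here is a minimal sketch that subscribes to a stream and drains it. It assumes a simplified `Operation` type (an iterable whose iterator returns a `T`); `fromArray`, `collect`, and `runSync` are hypothetical helpers, not part of the gist or of Effection.

```typescript
// Simplified stand-ins for the types in the gist.
interface Operation<T> {
  [Symbol.iterator](): Iterator<unknown, T, unknown>;
}
interface Subscription<T, TClose> {
  next(): Operation<IteratorResult<T, TClose>>;
}
type Stream<T, TClose> = Operation<Subscription<T, TClose>>;

// Hypothetical helper: a stream that replays an array, then closes with `close`.
function fromArray<T, TClose>(items: T[], close: TClose): Stream<T, TClose> {
  return {
    *[Symbol.iterator](): Generator<never, Subscription<T, TClose>, unknown> {
      let index = 0; // each subscription gets its own cursor
      return {
        next: () => ({
          *[Symbol.iterator](): Generator<never, IteratorResult<T, TClose>, unknown> {
            if (index < items.length) {
              return { done: false, value: items[index++] };
            }
            return { done: true, value: close };
          },
        }),
      };
    },
  };
}

// Subscribe, then pull items until the subscription reports that it is closed.
function* collect<T, TClose>(
  stream: Stream<T, TClose>,
): Generator<unknown, { items: T[]; close: TClose }, unknown> {
  const subscription = yield* stream;
  const items: T[] = [];
  let next = yield* subscription.next();
  while (!next.done) {
    items.push(next.value);
    next = yield* subscription.next();
  }
  return { items, close: next.value as TClose };
}

// Hypothetical synchronous runner for operations that never actually suspend.
function runSync<T>(op: Operation<T>): T {
  const iterator = op[Symbol.iterator]();
  let step = iterator.next();
  while (!step.done) {
    step = iterator.next();
  }
  return step.value as T;
}
```

Because subscribing is itself an operation, every `yield* stream` produces an independent subscription with its own position.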
cowboyd / 01-buffer-flow.ts
Last active April 19, 2023 17:38
Piping, buffering, filtering, mapping, and general hypothetical combinations API.
import { buffer, on, flow, filter, map } from "effection";
let results = flow(
  on('message'),
  buffer({ size: 100 }),
  map(message => JSON.parse(message.data)),
  filter(data => data.type === "result"),
);
let websocket = new WebSocket('wss://localhost:9000');
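The pipeline above reads as left-to-right composition of stream transformers. A minimal sketch of that idea follows, assuming synchronous iterables in place of Effection streams and leaving buffering out entirely. `Transform`, `Message`, and this three-argument `flow` are hypothetical simplifications, not the proposed API.

```typescript
// A transformer turns one iterable into another.
type Transform<A, B> = (source: Iterable<A>) => Iterable<B>;

// Hypothetical message shape used only for this example.
interface Message {
  type: string;
  value?: number;
}

function map<A, B>(f: (item: A) => B): Transform<A, B> {
  return function* (source: Iterable<A>): Generator<B> {
    for (const item of source) {
      yield f(item);
    }
  };
}

function filter<A>(predicate: (item: A) => boolean): Transform<A, A> {
  return function* (source: Iterable<A>): Generator<A> {
    for (const item of source) {
      if (predicate(item)) {
        yield item;
      }
    }
  };
}

// Simplified flow: feed a source through two transformers, left to right.
function flow<A, B, C>(
  source: Iterable<A>,
  first: Transform<A, B>,
  second: Transform<B, C>,
): Iterable<C> {
  return second(first(source));
}

// Parse each raw message and keep only the ones tagged as results.
function resultsOf(raw: string[]): Message[] {
  return Array.from(
    flow(
      raw,
      map((text: string) => JSON.parse(text) as Message),
      filter((data: Message) => data.type === "result"),
    ),
  );
}
```

Each stage is lazy, so an item passes through `map` and `filter` before the next one is read from the source.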
cowboyd / events.ts
Created April 10, 2023 17:02
Event Listeners in Effection
import type { Operation, Stream } from "https://esm.sh/effection@3.0.0-alpha.7";
import {
  action,
  createChannel,
  type Resolve,
  resource,
  suspend,
  useScope,
} from "https://esm.sh/effection@3.0.0-alpha.7";
cowboyd / 01-async.ts
Created April 6, 2023 14:00
An "async" function to turn any `Computation` into a `Promise`
import { evaluate, reset, shift, type Computation } from "https://deno.land/x/continuation@0.1.5/mod.ts";
function async<T>(op: () => Computation<T>): Computation<Promise<T>> {
  return reset(function*() {
    let { resolve, reject } = yield* shift(function*(k) {
      return new Promise((resolve, reject) => k({ resolve, reject }));
    });
    try {
      resolve(yield* op());
    } catch (error) {
cowboyd / finders.ts
Last active April 4, 2023 20:25
"Early Return" using delimited continuations
// spreading copies the entire enumerable into an array before searching,
// which could be really expensive if the enumerable is really long.
[...worlds].find(x => x.name === "Arcadia");
// this will return early the moment that the predicate matches
function* find(enumerable, predicate) {
  return yield* shift(function*(k) {
    for (let item of enumerable) {
      if (predicate(item)) {
        k(item);
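The cost difference described in the comments can be observed directly. The sketch below deliberately uses a plain loop with `return` rather than delimited continuations, so it only illustrates the early-exit behaviour, not the `shift` technique itself. `findEarly`, `eagerFind`, and the recording `worlds` generator are hypothetical, as are the world names other than "Arcadia".

```typescript
interface World {
  name: string;
}

// Lazy source that records every item it is asked to produce.
function* worlds(visited: string[]): Generator<World> {
  for (const name of ["Midgard", "Arcadia", "Elysium"]) {
    visited.push(name);
    yield { name };
  }
}

// Stops pulling from the enumerable as soon as the predicate matches.
function findEarly<T>(enumerable: Iterable<T>, predicate: (item: T) => boolean): T | undefined {
  for (const item of enumerable) {
    if (predicate(item)) {
      return item; // leaving the loop also closes the underlying iterator
    }
  }
  return undefined;
}

// Eager variant for comparison: copies everything into an array before searching.
function eagerFind<T>(enumerable: Iterable<T>, predicate: (item: T) => boolean): T | undefined {
  return Array.from(enumerable).find(predicate);
}
```

With the lazy version only "Midgard" and "Arcadia" are ever produced, while the eager version produces all three before the search begins.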
cowboyd / 01-scope.ts
Last active March 29, 2023 16:22
Beefed up `Scope`
import type { Stream, Task, Operation, Result } from "./types.ts";
export interface Scope extends Operation<void> {
  run<T>(operation: () => Operation<T>): Task<T>;
  close(): Operation<void>;
  created: Stream<Task<unknown>, void>;
  enqueued: Stream<Operation<unknown>, void>;
  dropped: Stream<Operation<unknown>, void>;
  results: Stream<{task: Task<unknown>, result: Result<unknown>}, void>;
}
cowboyd / go.ts
Created March 25, 2023 00:17
`go` in terms of `spawn`
import type { Operation, Task, Result, Port } from "https://deno.land/x/effection/mod.ts";
import { action, spawn, createContext } from "https://deno.land/x/effection/mod.ts";
const ErrorContext = createContext<Port<Error, never>>("error");
export function* go<T>(op: () => Operation<T>): Operation<Task<Result<T>>> {
  return yield* spawn(function*() {
    try {
      return Ok(yield* call(op));
    } catch (error) {
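The core idea of `go`, capturing an operation's outcome as a value instead of letting an error propagate, can be sketched without tasks or `spawn`. In the following, `Result`, `Ok`, `Err`, and `runSync` are hypothetical local definitions for illustration, and this simplified `go` runs the operation inline rather than in a spawned task.

```typescript
// Hypothetical result type: either a value or an error, never a throw.
type Result<T> = { ok: true; value: T } | { ok: false; error: Error };

const Ok = <T>(value: T): Result<T> => ({ ok: true, value });
const Err = <T>(error: Error): Result<T> => ({ ok: false, error });

// Simplified go: run the operation inline and convert any throw into an Err.
function* go<T>(
  op: () => Generator<unknown, T, unknown>,
): Generator<unknown, Result<T>, unknown> {
  try {
    return Ok(yield* op());
  } catch (error) {
    // Normalise non-Error throwables so callers always receive an Error.
    return Err(error instanceof Error ? error : new Error(String(error)));
  }
}

// Hypothetical synchronous runner: steps a generator until it returns.
function runSync<T>(generator: Generator<unknown, T, unknown>): T {
  let step = generator.next();
  while (!step.done) {
    step = generator.next();
  }
  return step.value as T;
}
```

A caller can then branch on `result.ok` instead of wrapping the call in its own `try`/`catch`.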