Skip to content

Instantly share code, notes, and snippets.

@bvalosek
Created March 2, 2025 15:54
Show Gist options
  • Save bvalosek/6734cb967c01d833e26d7e84ebd6994d to your computer and use it in GitHub Desktop.
Save bvalosek/6734cb967c01d833e26d7e84ebd6994d to your computer and use it in GitHub Desktop.
import { jsonArrayFrom } from 'kysely/helpers/postgres';
import { range } from 'lodash';
import { appendToCollectorRank, recalculateAndUpdateCollectorRank } from './collector-rank';
import { ActiveTransaction, CoreDependencies } from './core-dependencies';
import { CollectedRecord } from './database/tables';
import {
createDate,
createDateMinutesFromNow,
notMaterialized,
toNumberOnly,
toNumberOnlyAsString,
toNumberOrNull,
toValueOnly,
} from './database/values';
import { assertInvariant, InvariantViolationError } from './errors';
import {
RecordReservationCanceled,
RecordReservationExpired,
RecordsDelivered,
RecordsReserved,
} from './events';
import {
RecordReservationBatchStatus,
RPCInvalidInput,
RPCInvalidRequest,
RPCNotFound,
} from './lib-core';
import { generateId } from './lib/id';
import {
followGroupAndSubscribeToNewReleaseNotifications,
resolveDoesFollowGroup,
} from './subscriptions';
/*
This module deals with all supply business logic for records
- reserving records during checkout
- delivering reserved records
- recalling record supply (for refunds or free collect recalls)
- transfering record ownership to another user
*/
/**
* Statuses that are considered "released" and can be re-used
*
* released statuses will be picked off during reservation out of older batches
*/
export const ReleasedReservationStatuses: RecordReservationBatchStatus[] = [
'canceled',
'expired',
'recalled',
];
/**
* Reserve specific records for a later deliver
*
* Listing must be active , supply checks are throws are done here
*
* This is where all the heavy lifting on assigning and reserving serial
* numbers occurs
*/
export const createRecordVariantReservations = async (input: {
recordId: string;
variantId: string;
quantity: number;
expiresInMinutes: number;
collectorDID?: string;
orderId?: string;
core: CoreDependencies;
trx: ActiveTransaction;
}) => {
// lock the variant
const variant = await input.trx.db
.selectFrom('record_variant as rv')
.innerJoin('record_listing as rl', 'rv.currentListingId', 'rl.id')
.where('rv.recordId', '=', input.recordId)
.where('rv.variantId', '=', input.variantId)
.selectAll()
.select(['rl.maxSupply'])
.forUpdate()
.executeTakeFirst();
if (!variant) throw new RPCNotFound('Record or variant not found');
if (!variant.isActive) throw new RPCInvalidRequest('Collection is not active');
const maxSupply = toNumberOrNull(variant.maxSupply);
const currentSupply = toNumberOnly(variant.currentSupply);
const currentReserved = toNumberOnly(variant.pendingReservations);
const nextSerialNumber = toNumberOnly(variant.nextSerialNumber);
// find and lock any released reservations for this variant that we could
// re-use before allocating more serial numbers
const freeReservations = await input.trx.db
.selectFrom('record_variant_reservation as rvr')
.innerJoin('record_reservation_batch as rrb', 'rvr.batchId', 'rrb.id')
.innerJoin('record_listing as rl', 'rrb.listingId', 'rl.id')
.where('rl.recordId', '=', input.recordId)
.where('rl.variantId', '=', input.variantId)
.where('rrb.status', 'in', ReleasedReservationStatuses)
.orderBy('rvr.serialNumber', 'asc') // prioritize earlier serial numbers
.limit(input.quantity) // we need at most the quantity we are trying to reserve
.select(['rvr.id', 'rvr.serialNumber'])
.forUpdate()
.execute();
// ensure that if this variant has limited supply , we arent about to reserve
// too many -- critical checks here that need to be clean error messages for
// the user
if (maxSupply !== null) {
const available = maxSupply - currentSupply - currentReserved;
assertInvariant(available >= 0, 'available supply is not negative');
// when all records in the edition are fully collected, no chance to get
// any more so 100p fail with no suggestion to retry
if (currentSupply === maxSupply) {
throw new RPCInvalidRequest('All records have been collected');
}
// reservation is going to fail because we cannot allocate enough, work to
// get best error message we can for the user
if (available < input.quantity) {
if (available === 0) {
throw new RPCInvalidRequest('Pending orders for all remaining records, try again later');
}
const lowerTo = `to ${available}${available > 1 ? ' or fewer' : ''}`;
// would have enough if not for pending reservations
if (input.quantity < maxSupply - currentSupply) {
const adjust = input.quantity === 1 ? '' : ` or lower order quantity ${lowerTo}`;
throw new RPCInvalidRequest(`Too many pending record orders, try again later${adjust}`);
}
// else, hitting hard supply limit
throw new RPCInvalidRequest(`Not enough remaining records, lower order quantity ${lowerTo}`);
}
}
// at this point the reservation is going to succeed , determine which serial
// numbers to allocate
let nextNextSerialNumber = nextSerialNumber;
const toReserve = new Set(freeReservations.map((r) => toNumberOnly(r.serialNumber)));
const needToAllocateCount = input.quantity - toReserve.size;
if (needToAllocateCount > 0) {
assertInvariant(
maxSupply === null || nextSerialNumber <= maxSupply,
'Next serial number not over max supply'
);
range(needToAllocateCount).map((n) => toReserve.add(nextSerialNumber + n));
nextNextSerialNumber = nextSerialNumber + needToAllocateCount;
}
// double check we allocated the right amount
assertInvariant(toReserve.size === input.quantity, 'Correct reserved serial count');
// write back to variant row
await input.trx.db
.updateTable('record_variant')
.where('recordId', '=', input.recordId)
.where('variantId', '=', input.variantId)
.set({
// okay to write like this because rv row is locked above
pendingReservations: currentReserved + input.quantity,
nextSerialNumber: nextNextSerialNumber,
})
.execute();
// remove the reservations we are about to re-use to adhere to the unique
// constraint - we will always use all of the found reservations because of
// the limit we set in the initial query
const reservationIdsToDelete = freeReservations.map((r) => r.id);
if (reservationIdsToDelete.length > 0) {
await input.trx.db
.deleteFrom('record_variant_reservation as rvr')
.where('rvr.id', 'in', reservationIdsToDelete)
.execute();
}
// print the new batch and reservations
const batchId = generateId('record_batch');
await input.trx.db
.insertInto('record_reservation_batch')
.values({
id: batchId,
listingId: variant.currentListingId,
collectorDID: input.collectorDID,
orderId: input.orderId,
status: 'pending',
expiresAt: createDateMinutesFromNow(input.expiresInMinutes),
createdAt: createDate(),
})
.execute();
const reservations = [...toReserve].map((serialNumber) => ({
id: generateId('record_variant_reservation'),
batchId,
serialNumber,
_recordId: input.recordId,
_variantId: input.variantId,
}));
await input.trx.db.insertInto('record_variant_reservation').values(reservations).execute();
input.trx.registerSideEffect(() =>
input.core.publishEvent(RecordsReserved, { batchId: batchId })
);
return { batchId, reservations };
};
/**
* Complete the reservation lifecycle as delivered / canceled or expired
*
* Will succeed so long as all invariants pass , all checking happens earlier in
* the reservation process
*/
export const unsafeFulfillRecordVariantReservations = async (input: {
batchId: string;
outcome: 'delivered' | 'canceled' | 'expired';
core: CoreDependencies;
trx: ActiveTransaction;
}) => {
// resolve variant and batch infromation
const batch = await input.trx.db
.selectFrom('record_reservation_batch as rrb')
.where('rrb.id', '=', input.batchId)
.innerJoin('record_listing as rl', 'rrb.listingId', 'rl.id')
.innerJoin('record_variant as rv', (j) =>
j.onRef('rl.recordId', '=', 'rv.recordId').onRef('rl.variantId', '=', 'rv.variantId')
)
.innerJoin('record_listing as current_rl', 'rv.currentListingId', 'current_rl.id')
// not enforcing at most 1 publisher per record here exactly
.innerJoin('catalog_publisher as cp', 'rl.recordId', 'cp.catalogEntryId')
.forUpdate()
.selectAll('rrb')
.select((q) => [
'rv.recordId',
'rv.variantId',
'rv.currentSupply as rv_currentSupply',
'rv.pendingReservations as rv_pendingReservations',
'current_rl.maxSupply as current_rl_maxSupply',
'cp.publisherDID as publisherDID',
q // subquery to avoid issue with locking for udpate on left join
.selectFrom('record_reservation_batch as irrb')
.where('irrb.id', '=', input.batchId)
.leftJoin('order as o', 'irrb.orderId', 'o.id')
.leftJoin('account as a', 'irrb.collectorDID', 'a.did')
.select((q) => [q.fn.coalesce('a.email', 'o.email').as('collectorEmail')])
.as('collectorEmail'),
jsonArrayFrom(
q
.selectFrom('record_variant_reservation as rvr')
.where('rvr.batchId', '=', input.batchId)
.selectAll('rvr')
).as('reservations'),
])
.executeTakeFirst();
if (!batch) throw new InvariantViolationError('Batch exists');
assertInvariant(batch.reservations.length > 0, 'Batch has reservations');
const countInBatch = batch.reservations.length;
const currentSupply = toNumberOnly(batch.rv_currentSupply);
const currentReserved = toNumberOnly(batch.rv_pendingReservations);
const maxSupply = toNumberOrNull(batch.current_rl_maxSupply);
const nextSupplyCount =
input.outcome === 'delivered' ? currentSupply + countInBatch : currentSupply;
const nextReserveCount = currentReserved - countInBatch;
// assert invariants
assertInvariant(batch.status === 'pending', 'Batch is pending');
assertInvariant(batch.deliveredAt === null, 'Batch not delivered');
assertInvariant(batch.canceledAt === null, 'Batch not canceled');
if (input.outcome === 'delivered') {
assertInvariant(batch.expiresAt > createDate(), 'Batch not expired');
}
assertInvariant(nextReserveCount >= 0, 'Next reserve count is not negative');
assertInvariant(
maxSupply === null || nextSupplyCount <= maxSupply,
'Next supply count is not over max'
);
// update variant row
await input.trx.db
.updateTable('record_variant as rv')
.where('rv.recordId', '=', batch.recordId)
.where('rv.variantId', '=', batch.variantId)
.set({ pendingReservations: nextReserveCount, currentSupply: nextSupplyCount })
.execute();
// mark batch as fulfilled
const column =
input.outcome === 'delivered'
? 'deliveredAt'
: input.outcome === 'canceled'
? 'canceledAt'
: 'expiresAt';
const event =
input.outcome === 'delivered'
? RecordsDelivered
: input.outcome === 'canceled'
? RecordReservationCanceled
: RecordReservationExpired;
await input.trx.db
.updateTable('record_reservation_batch')
.where('id', '=', input.batchId)
.set({ [column]: createDate(), status: input.outcome })
.execute();
input.trx.registerSideEffect(() => input.core.publishEvent(event, { batchId: input.batchId }));
// deliver records
let collected: CollectedRecord[] = [];
let collectorRank: number | undefined;
if (input.outcome === 'delivered') {
// where the actual records are created
const now = createDate();
collected = batch.reservations.map((r) => ({
id: generateId('collected'),
editionNumber: r.serialNumber,
listingId: batch.listingId,
orderId: batch.orderId,
collectorDID: batch.collectorDID,
createdAt: now,
}));
await input.trx.db.insertInto('collected_record').values(collected).execute();
// update collector ranks
const append = await appendToCollectorRank({
recordId: batch.recordId,
variantId: toNumberOnlyAsString(batch.variantId),
collectorEmail: toValueOnly(batch.collectorEmail),
core: input.core,
trx: input.trx,
});
collectorRank = append.rank;
// auto follow the group iff not already following AND not unfollowed
const subscriber =
batch.collectorDID !== null
? { userDID: batch.collectorDID }
: { email: toValueOnly(batch.collectorEmail) };
const followStatus = await resolveDoesFollowGroup({
subscriber,
groupDID: batch.publisherDID,
core: input.core,
trx: input.trx,
});
if (followStatus === 'not following') {
await followGroupAndSubscribeToNewReleaseNotifications({
subscriber,
source: 'collect',
groupIdentifier: batch.publisherDID,
core: input.core,
trx: input.trx,
});
}
}
return { collected, collectorRank };
};
export const unsafeRecallCollectedRecordsEx = async (input: {
collectedRecordIds: string[];
core: CoreDependencies;
trx: ActiveTransaction;
}) => {
// resolve batch etc
const collected = await input.trx.db
.with(notMaterialized('res'), (q) =>
q
.selectFrom('record_variant_reservation as rvr')
.innerJoin('record_reservation_batch as rrb', 'rvr.batchId', 'rrb.id')
.innerJoin('record_listing as rl', 'rrb.listingId', 'rl.id')
.where('rrb.status', '=', 'delivered')
.select(['rl.recordId', 'rl.variantId', 'rvr.serialNumber', 'rrb.id as batchId'])
)
.selectFrom('collected_record as cr')
.where('cr.id', 'in', input.collectedRecordIds)
.innerJoin('record_listing as rl', 'cr.listingId', 'rl.id')
.innerJoin('res', (j) =>
j //
.onRef('rl.recordId', '=', 'res.recordId')
.onRef('rl.variantId', '=', 'res.variantId')
.onRef('cr.editionNumber', '=', 'res.serialNumber')
)
.selectAll('cr')
.select(['rl.recordId', 'rl.variantId', 'res.batchId'])
.orderBy('cr.editionNumber', 'asc')
.execute();
// check invariants
const batchIds = new Set(collected.map((c) => c.batchId));
const recordIds = new Set(collected.map((c) => c.recordId));
const variantIds = new Set(collected.map((c) => c.variantId));
if (collected.length !== input.collectedRecordIds.length) {
throw new RPCInvalidRequest('Not all collected records found');
}
if (recordIds.size !== 1) {
throw new RPCInvalidRequest('Collected records must be from same release');
}
if (variantIds.size !== 1) {
throw new RPCInvalidRequest('Collected records must be from same variant');
}
if (batchIds.size !== 1) {
throw new RPCInvalidRequest('Collected records must be from same reservation batch');
}
const batchId = [...batchIds][0];
const reserved = await input.trx.db
.selectFrom('record_variant_reservation')
.forUpdate()
.where('batchId', '=', batchId)
.execute();
assertInvariant(reserved.length === collected.length, 'all records from batch being recalled');
// mark the batch as recalled
await input.trx.db
.updateTable('record_reservation_batch')
.where('id', '=', batchId)
.set({ status: 'recalled' })
.execute();
// reduce denormalized supply record variant
const recordId = [...recordIds][0];
const variantId = [...variantIds][0];
const update = await input.trx.db
.updateTable('record_variant as rv')
.where('rv.recordId', '=', recordId)
.where('rv.variantId', '=', variantId)
.set({ currentSupply: (q) => q('rv.currentSupply', '-', reserved.length) })
.returning(['rv.currentSupply'])
.execute();
assertInvariant(update.length === 1, 'record variant updated');
assertInvariant(toNumberOnly(update[0].currentSupply) >= 0, 'current supply is not negative');
// log the recalled supply
const recalled = collected.map((cr) => ({
id: generateId('recalled'),
listingId: cr.listingId,
collectedRecordId: cr.id,
recordReservationBatchId: batchId,
orderId: cr.orderId,
// TODO: change to serial number
editionNumber: cr.editionNumber,
collectorDID: cr.collectorDID,
collectedAt: cr.createdAt,
createdAt: createDate(),
}));
await input.trx.db.insertInto('recalled_supply').values(recalled).execute();
// remove the collected records
const deleted = await input.trx.db
.deleteFrom('collected_record as cr')
.where('cr.id', 'in', input.collectedRecordIds)
.returning('cr.id')
.execute();
assertInvariant(deleted.length === recalled.length, 'collected records deleted');
// recompute collector rank
await recalculateAndUpdateCollectorRank({
recordId,
variantId: toNumberOnlyAsString(variantId),
core: input.core,
trx: input.trx,
});
return { recordId, variantId, batchId, collects: collected, recalled };
};
/**
* Transfer a record from a guest collector to a user
*/
export const unsafeTransferGuestCollectedRecord = async (input: {
collectedRecordIds: string[];
collectorDID: string;
core: CoreDependencies;
trx: ActiveTransaction;
}) => {
const existing = await input.trx.db
.selectFrom('collected_record as cr')
.innerJoin('record_listing as rl', 'cr.listingId', 'rl.id')
.innerJoin('record_variant as rv', (j) =>
j.onRef('rl.recordId', '=', 'rv.recordId').onRef('rl.variantId', '=', 'rv.variantId')
)
.where('cr.id', 'in', input.collectedRecordIds)
.selectAll('cr')
.select(['rl.recordId', 'rl.variantId'])
.execute();
if (existing.length !== input.collectedRecordIds.length) {
throw new RPCInvalidInput('Not all collected records found');
}
if (existing.some((cr) => cr.collectorDID)) {
throw new RPCInvalidInput('Collected records must not already be owned');
}
const recordIds = new Set(existing.map((cr) => cr.recordId));
const variantIds = new Set(existing.map((cr) => toNumberOnlyAsString(cr.variantId)));
if (recordIds.size !== 1) {
throw new RPCInvalidInput('Collected records must be from same release');
}
if (variantIds.size !== 1) {
throw new RPCInvalidInput('Collected records must be from same variant');
}
const recordId = [...recordIds][0];
const variantId = [...variantIds][0];
const update = await input.trx.db
.updateTable('collected_record as cr')
.where('cr.id', 'in', input.collectedRecordIds)
.set({ collectorDID: input.collectorDID })
.returning('cr.id')
.execute();
assertInvariant(update.length === input.collectedRecordIds.length, 'collected records updated');
// recompute collector rank
await recalculateAndUpdateCollectorRank({
recordId,
variantId,
core: input.core,
trx: input.trx,
});
// TODO: events
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment