Created
March 2, 2025 15:54
-
-
Save bvalosek/6734cb967c01d833e26d7e84ebd6994d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { jsonArrayFrom } from 'kysely/helpers/postgres'; | |
import { range } from 'lodash'; | |
import { appendToCollectorRank, recalculateAndUpdateCollectorRank } from './collector-rank'; | |
import { ActiveTransaction, CoreDependencies } from './core-dependencies'; | |
import { CollectedRecord } from './database/tables'; | |
import { | |
createDate, | |
createDateMinutesFromNow, | |
notMaterialized, | |
toNumberOnly, | |
toNumberOnlyAsString, | |
toNumberOrNull, | |
toValueOnly, | |
} from './database/values'; | |
import { assertInvariant, InvariantViolationError } from './errors'; | |
import { | |
RecordReservationCanceled, | |
RecordReservationExpired, | |
RecordsDelivered, | |
RecordsReserved, | |
} from './events'; | |
import { | |
RecordReservationBatchStatus, | |
RPCInvalidInput, | |
RPCInvalidRequest, | |
RPCNotFound, | |
} from './lib-core'; | |
import { generateId } from './lib/id'; | |
import { | |
followGroupAndSubscribeToNewReleaseNotifications, | |
resolveDoesFollowGroup, | |
} from './subscriptions'; | |
/* | |
This module deals with all supply business logic for records | |
- reserving records during checkout | |
- delivering reserved records | |
- recalling record supply (for refunds or free collect recalls) | |
- transfering record ownership to another user | |
*/ | |
/** | |
* Statuses that are considered "released" and can be re-used | |
* | |
* released statuses will be picked off during reservation out of older batches | |
*/ | |
export const ReleasedReservationStatuses: RecordReservationBatchStatus[] = [ | |
'canceled', | |
'expired', | |
'recalled', | |
]; | |
/** | |
* Reserve specific records for a later deliver | |
* | |
* Listing must be active , supply checks are throws are done here | |
* | |
* This is where all the heavy lifting on assigning and reserving serial | |
* numbers occurs | |
*/ | |
export const createRecordVariantReservations = async (input: { | |
recordId: string; | |
variantId: string; | |
quantity: number; | |
expiresInMinutes: number; | |
collectorDID?: string; | |
orderId?: string; | |
core: CoreDependencies; | |
trx: ActiveTransaction; | |
}) => { | |
// lock the variant | |
const variant = await input.trx.db | |
.selectFrom('record_variant as rv') | |
.innerJoin('record_listing as rl', 'rv.currentListingId', 'rl.id') | |
.where('rv.recordId', '=', input.recordId) | |
.where('rv.variantId', '=', input.variantId) | |
.selectAll() | |
.select(['rl.maxSupply']) | |
.forUpdate() | |
.executeTakeFirst(); | |
if (!variant) throw new RPCNotFound('Record or variant not found'); | |
if (!variant.isActive) throw new RPCInvalidRequest('Collection is not active'); | |
const maxSupply = toNumberOrNull(variant.maxSupply); | |
const currentSupply = toNumberOnly(variant.currentSupply); | |
const currentReserved = toNumberOnly(variant.pendingReservations); | |
const nextSerialNumber = toNumberOnly(variant.nextSerialNumber); | |
// find and lock any released reservations for this variant that we could | |
// re-use before allocating more serial numbers | |
const freeReservations = await input.trx.db | |
.selectFrom('record_variant_reservation as rvr') | |
.innerJoin('record_reservation_batch as rrb', 'rvr.batchId', 'rrb.id') | |
.innerJoin('record_listing as rl', 'rrb.listingId', 'rl.id') | |
.where('rl.recordId', '=', input.recordId) | |
.where('rl.variantId', '=', input.variantId) | |
.where('rrb.status', 'in', ReleasedReservationStatuses) | |
.orderBy('rvr.serialNumber', 'asc') // prioritize earlier serial numbers | |
.limit(input.quantity) // we need at most the quantity we are trying to reserve | |
.select(['rvr.id', 'rvr.serialNumber']) | |
.forUpdate() | |
.execute(); | |
// ensure that if this variant has limited supply , we arent about to reserve | |
// too many -- critical checks here that need to be clean error messages for | |
// the user | |
if (maxSupply !== null) { | |
const available = maxSupply - currentSupply - currentReserved; | |
assertInvariant(available >= 0, 'available supply is not negative'); | |
// when all records in the edition are fully collected, no chance to get | |
// any more so 100p fail with no suggestion to retry | |
if (currentSupply === maxSupply) { | |
throw new RPCInvalidRequest('All records have been collected'); | |
} | |
// reservation is going to fail because we cannot allocate enough, work to | |
// get best error message we can for the user | |
if (available < input.quantity) { | |
if (available === 0) { | |
throw new RPCInvalidRequest('Pending orders for all remaining records, try again later'); | |
} | |
const lowerTo = `to ${available}${available > 1 ? ' or fewer' : ''}`; | |
// would have enough if not for pending reservations | |
if (input.quantity < maxSupply - currentSupply) { | |
const adjust = input.quantity === 1 ? '' : ` or lower order quantity ${lowerTo}`; | |
throw new RPCInvalidRequest(`Too many pending record orders, try again later${adjust}`); | |
} | |
// else, hitting hard supply limit | |
throw new RPCInvalidRequest(`Not enough remaining records, lower order quantity ${lowerTo}`); | |
} | |
} | |
// at this point the reservation is going to succeed , determine which serial | |
// numbers to allocate | |
let nextNextSerialNumber = nextSerialNumber; | |
const toReserve = new Set(freeReservations.map((r) => toNumberOnly(r.serialNumber))); | |
const needToAllocateCount = input.quantity - toReserve.size; | |
if (needToAllocateCount > 0) { | |
assertInvariant( | |
maxSupply === null || nextSerialNumber <= maxSupply, | |
'Next serial number not over max supply' | |
); | |
range(needToAllocateCount).map((n) => toReserve.add(nextSerialNumber + n)); | |
nextNextSerialNumber = nextSerialNumber + needToAllocateCount; | |
} | |
// double check we allocated the right amount | |
assertInvariant(toReserve.size === input.quantity, 'Correct reserved serial count'); | |
// write back to variant row | |
await input.trx.db | |
.updateTable('record_variant') | |
.where('recordId', '=', input.recordId) | |
.where('variantId', '=', input.variantId) | |
.set({ | |
// okay to write like this because rv row is locked above | |
pendingReservations: currentReserved + input.quantity, | |
nextSerialNumber: nextNextSerialNumber, | |
}) | |
.execute(); | |
// remove the reservations we are about to re-use to adhere to the unique | |
// constraint - we will always use all of the found reservations because of | |
// the limit we set in the initial query | |
const reservationIdsToDelete = freeReservations.map((r) => r.id); | |
if (reservationIdsToDelete.length > 0) { | |
await input.trx.db | |
.deleteFrom('record_variant_reservation as rvr') | |
.where('rvr.id', 'in', reservationIdsToDelete) | |
.execute(); | |
} | |
// print the new batch and reservations | |
const batchId = generateId('record_batch'); | |
await input.trx.db | |
.insertInto('record_reservation_batch') | |
.values({ | |
id: batchId, | |
listingId: variant.currentListingId, | |
collectorDID: input.collectorDID, | |
orderId: input.orderId, | |
status: 'pending', | |
expiresAt: createDateMinutesFromNow(input.expiresInMinutes), | |
createdAt: createDate(), | |
}) | |
.execute(); | |
const reservations = [...toReserve].map((serialNumber) => ({ | |
id: generateId('record_variant_reservation'), | |
batchId, | |
serialNumber, | |
_recordId: input.recordId, | |
_variantId: input.variantId, | |
})); | |
await input.trx.db.insertInto('record_variant_reservation').values(reservations).execute(); | |
input.trx.registerSideEffect(() => | |
input.core.publishEvent(RecordsReserved, { batchId: batchId }) | |
); | |
return { batchId, reservations }; | |
}; | |
/** | |
* Complete the reservation lifecycle as delivered / canceled or expired | |
* | |
* Will succeed so long as all invariants pass , all checking happens earlier in | |
* the reservation process | |
*/ | |
export const unsafeFulfillRecordVariantReservations = async (input: { | |
batchId: string; | |
outcome: 'delivered' | 'canceled' | 'expired'; | |
core: CoreDependencies; | |
trx: ActiveTransaction; | |
}) => { | |
// resolve variant and batch infromation | |
const batch = await input.trx.db | |
.selectFrom('record_reservation_batch as rrb') | |
.where('rrb.id', '=', input.batchId) | |
.innerJoin('record_listing as rl', 'rrb.listingId', 'rl.id') | |
.innerJoin('record_variant as rv', (j) => | |
j.onRef('rl.recordId', '=', 'rv.recordId').onRef('rl.variantId', '=', 'rv.variantId') | |
) | |
.innerJoin('record_listing as current_rl', 'rv.currentListingId', 'current_rl.id') | |
// not enforcing at most 1 publisher per record here exactly | |
.innerJoin('catalog_publisher as cp', 'rl.recordId', 'cp.catalogEntryId') | |
.forUpdate() | |
.selectAll('rrb') | |
.select((q) => [ | |
'rv.recordId', | |
'rv.variantId', | |
'rv.currentSupply as rv_currentSupply', | |
'rv.pendingReservations as rv_pendingReservations', | |
'current_rl.maxSupply as current_rl_maxSupply', | |
'cp.publisherDID as publisherDID', | |
q // subquery to avoid issue with locking for udpate on left join | |
.selectFrom('record_reservation_batch as irrb') | |
.where('irrb.id', '=', input.batchId) | |
.leftJoin('order as o', 'irrb.orderId', 'o.id') | |
.leftJoin('account as a', 'irrb.collectorDID', 'a.did') | |
.select((q) => [q.fn.coalesce('a.email', 'o.email').as('collectorEmail')]) | |
.as('collectorEmail'), | |
jsonArrayFrom( | |
q | |
.selectFrom('record_variant_reservation as rvr') | |
.where('rvr.batchId', '=', input.batchId) | |
.selectAll('rvr') | |
).as('reservations'), | |
]) | |
.executeTakeFirst(); | |
if (!batch) throw new InvariantViolationError('Batch exists'); | |
assertInvariant(batch.reservations.length > 0, 'Batch has reservations'); | |
const countInBatch = batch.reservations.length; | |
const currentSupply = toNumberOnly(batch.rv_currentSupply); | |
const currentReserved = toNumberOnly(batch.rv_pendingReservations); | |
const maxSupply = toNumberOrNull(batch.current_rl_maxSupply); | |
const nextSupplyCount = | |
input.outcome === 'delivered' ? currentSupply + countInBatch : currentSupply; | |
const nextReserveCount = currentReserved - countInBatch; | |
// assert invariants | |
assertInvariant(batch.status === 'pending', 'Batch is pending'); | |
assertInvariant(batch.deliveredAt === null, 'Batch not delivered'); | |
assertInvariant(batch.canceledAt === null, 'Batch not canceled'); | |
if (input.outcome === 'delivered') { | |
assertInvariant(batch.expiresAt > createDate(), 'Batch not expired'); | |
} | |
assertInvariant(nextReserveCount >= 0, 'Next reserve count is not negative'); | |
assertInvariant( | |
maxSupply === null || nextSupplyCount <= maxSupply, | |
'Next supply count is not over max' | |
); | |
// update variant row | |
await input.trx.db | |
.updateTable('record_variant as rv') | |
.where('rv.recordId', '=', batch.recordId) | |
.where('rv.variantId', '=', batch.variantId) | |
.set({ pendingReservations: nextReserveCount, currentSupply: nextSupplyCount }) | |
.execute(); | |
// mark batch as fulfilled | |
const column = | |
input.outcome === 'delivered' | |
? 'deliveredAt' | |
: input.outcome === 'canceled' | |
? 'canceledAt' | |
: 'expiresAt'; | |
const event = | |
input.outcome === 'delivered' | |
? RecordsDelivered | |
: input.outcome === 'canceled' | |
? RecordReservationCanceled | |
: RecordReservationExpired; | |
await input.trx.db | |
.updateTable('record_reservation_batch') | |
.where('id', '=', input.batchId) | |
.set({ [column]: createDate(), status: input.outcome }) | |
.execute(); | |
input.trx.registerSideEffect(() => input.core.publishEvent(event, { batchId: input.batchId })); | |
// deliver records | |
let collected: CollectedRecord[] = []; | |
let collectorRank: number | undefined; | |
if (input.outcome === 'delivered') { | |
// where the actual records are created | |
const now = createDate(); | |
collected = batch.reservations.map((r) => ({ | |
id: generateId('collected'), | |
editionNumber: r.serialNumber, | |
listingId: batch.listingId, | |
orderId: batch.orderId, | |
collectorDID: batch.collectorDID, | |
createdAt: now, | |
})); | |
await input.trx.db.insertInto('collected_record').values(collected).execute(); | |
// update collector ranks | |
const append = await appendToCollectorRank({ | |
recordId: batch.recordId, | |
variantId: toNumberOnlyAsString(batch.variantId), | |
collectorEmail: toValueOnly(batch.collectorEmail), | |
core: input.core, | |
trx: input.trx, | |
}); | |
collectorRank = append.rank; | |
// auto follow the group iff not already following AND not unfollowed | |
const subscriber = | |
batch.collectorDID !== null | |
? { userDID: batch.collectorDID } | |
: { email: toValueOnly(batch.collectorEmail) }; | |
const followStatus = await resolveDoesFollowGroup({ | |
subscriber, | |
groupDID: batch.publisherDID, | |
core: input.core, | |
trx: input.trx, | |
}); | |
if (followStatus === 'not following') { | |
await followGroupAndSubscribeToNewReleaseNotifications({ | |
subscriber, | |
source: 'collect', | |
groupIdentifier: batch.publisherDID, | |
core: input.core, | |
trx: input.trx, | |
}); | |
} | |
} | |
return { collected, collectorRank }; | |
}; | |
export const unsafeRecallCollectedRecordsEx = async (input: { | |
collectedRecordIds: string[]; | |
core: CoreDependencies; | |
trx: ActiveTransaction; | |
}) => { | |
// resolve batch etc | |
const collected = await input.trx.db | |
.with(notMaterialized('res'), (q) => | |
q | |
.selectFrom('record_variant_reservation as rvr') | |
.innerJoin('record_reservation_batch as rrb', 'rvr.batchId', 'rrb.id') | |
.innerJoin('record_listing as rl', 'rrb.listingId', 'rl.id') | |
.where('rrb.status', '=', 'delivered') | |
.select(['rl.recordId', 'rl.variantId', 'rvr.serialNumber', 'rrb.id as batchId']) | |
) | |
.selectFrom('collected_record as cr') | |
.where('cr.id', 'in', input.collectedRecordIds) | |
.innerJoin('record_listing as rl', 'cr.listingId', 'rl.id') | |
.innerJoin('res', (j) => | |
j // | |
.onRef('rl.recordId', '=', 'res.recordId') | |
.onRef('rl.variantId', '=', 'res.variantId') | |
.onRef('cr.editionNumber', '=', 'res.serialNumber') | |
) | |
.selectAll('cr') | |
.select(['rl.recordId', 'rl.variantId', 'res.batchId']) | |
.orderBy('cr.editionNumber', 'asc') | |
.execute(); | |
// check invariants | |
const batchIds = new Set(collected.map((c) => c.batchId)); | |
const recordIds = new Set(collected.map((c) => c.recordId)); | |
const variantIds = new Set(collected.map((c) => c.variantId)); | |
if (collected.length !== input.collectedRecordIds.length) { | |
throw new RPCInvalidRequest('Not all collected records found'); | |
} | |
if (recordIds.size !== 1) { | |
throw new RPCInvalidRequest('Collected records must be from same release'); | |
} | |
if (variantIds.size !== 1) { | |
throw new RPCInvalidRequest('Collected records must be from same variant'); | |
} | |
if (batchIds.size !== 1) { | |
throw new RPCInvalidRequest('Collected records must be from same reservation batch'); | |
} | |
const batchId = [...batchIds][0]; | |
const reserved = await input.trx.db | |
.selectFrom('record_variant_reservation') | |
.forUpdate() | |
.where('batchId', '=', batchId) | |
.execute(); | |
assertInvariant(reserved.length === collected.length, 'all records from batch being recalled'); | |
// mark the batch as recalled | |
await input.trx.db | |
.updateTable('record_reservation_batch') | |
.where('id', '=', batchId) | |
.set({ status: 'recalled' }) | |
.execute(); | |
// reduce denormalized supply record variant | |
const recordId = [...recordIds][0]; | |
const variantId = [...variantIds][0]; | |
const update = await input.trx.db | |
.updateTable('record_variant as rv') | |
.where('rv.recordId', '=', recordId) | |
.where('rv.variantId', '=', variantId) | |
.set({ currentSupply: (q) => q('rv.currentSupply', '-', reserved.length) }) | |
.returning(['rv.currentSupply']) | |
.execute(); | |
assertInvariant(update.length === 1, 'record variant updated'); | |
assertInvariant(toNumberOnly(update[0].currentSupply) >= 0, 'current supply is not negative'); | |
// log the recalled supply | |
const recalled = collected.map((cr) => ({ | |
id: generateId('recalled'), | |
listingId: cr.listingId, | |
collectedRecordId: cr.id, | |
recordReservationBatchId: batchId, | |
orderId: cr.orderId, | |
// TODO: change to serial number | |
editionNumber: cr.editionNumber, | |
collectorDID: cr.collectorDID, | |
collectedAt: cr.createdAt, | |
createdAt: createDate(), | |
})); | |
await input.trx.db.insertInto('recalled_supply').values(recalled).execute(); | |
// remove the collected records | |
const deleted = await input.trx.db | |
.deleteFrom('collected_record as cr') | |
.where('cr.id', 'in', input.collectedRecordIds) | |
.returning('cr.id') | |
.execute(); | |
assertInvariant(deleted.length === recalled.length, 'collected records deleted'); | |
// recompute collector rank | |
await recalculateAndUpdateCollectorRank({ | |
recordId, | |
variantId: toNumberOnlyAsString(variantId), | |
core: input.core, | |
trx: input.trx, | |
}); | |
return { recordId, variantId, batchId, collects: collected, recalled }; | |
}; | |
/** | |
* Transfer a record from a guest collector to a user | |
*/ | |
export const unsafeTransferGuestCollectedRecord = async (input: { | |
collectedRecordIds: string[]; | |
collectorDID: string; | |
core: CoreDependencies; | |
trx: ActiveTransaction; | |
}) => { | |
const existing = await input.trx.db | |
.selectFrom('collected_record as cr') | |
.innerJoin('record_listing as rl', 'cr.listingId', 'rl.id') | |
.innerJoin('record_variant as rv', (j) => | |
j.onRef('rl.recordId', '=', 'rv.recordId').onRef('rl.variantId', '=', 'rv.variantId') | |
) | |
.where('cr.id', 'in', input.collectedRecordIds) | |
.selectAll('cr') | |
.select(['rl.recordId', 'rl.variantId']) | |
.execute(); | |
if (existing.length !== input.collectedRecordIds.length) { | |
throw new RPCInvalidInput('Not all collected records found'); | |
} | |
if (existing.some((cr) => cr.collectorDID)) { | |
throw new RPCInvalidInput('Collected records must not already be owned'); | |
} | |
const recordIds = new Set(existing.map((cr) => cr.recordId)); | |
const variantIds = new Set(existing.map((cr) => toNumberOnlyAsString(cr.variantId))); | |
if (recordIds.size !== 1) { | |
throw new RPCInvalidInput('Collected records must be from same release'); | |
} | |
if (variantIds.size !== 1) { | |
throw new RPCInvalidInput('Collected records must be from same variant'); | |
} | |
const recordId = [...recordIds][0]; | |
const variantId = [...variantIds][0]; | |
const update = await input.trx.db | |
.updateTable('collected_record as cr') | |
.where('cr.id', 'in', input.collectedRecordIds) | |
.set({ collectorDID: input.collectorDID }) | |
.returning('cr.id') | |
.execute(); | |
assertInvariant(update.length === input.collectedRecordIds.length, 'collected records updated'); | |
// recompute collector rank | |
await recalculateAndUpdateCollectorRank({ | |
recordId, | |
variantId, | |
core: input.core, | |
trx: input.trx, | |
}); | |
// TODO: events | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment