Skip to content

Instantly share code, notes, and snippets.

@trvswgnr
Last active August 7, 2023 07:31
Show Gist options
  • Save trvswgnr/8e6857a35910bdbd0dab3663e22f1e97 to your computer and use it in GitHub Desktop.
Save trvswgnr/8e6857a35910bdbd0dab3663e22f1e97 to your computer and use it in GitHub Desktop.
daxxer challenge 2
import type {
UserId,
User,
Todo,
IBService,
IQueueService,
ICacheService,
ILogService,
IMonitorService,
} from './types';
/**
 * In-memory stand-in for "Service B".
 *
 * Users and todos are kept in plain public arrays so that tests can inspect
 * exactly what was written.
 */
export class BService implements IBService {
  /** Every user created so far, in creation order. */
  public users: NonNullable<User>[] = [];
  /** Every todo created so far, in creation order. */
  public todos: Todo[] = [];

  constructor() { }

  /**
   * Looks a user up by id.
   *
   * @returns the stored user, or `null` when no stored user has that id.
   */
  async getUser(userId: UserId): Promise<User> {
    const match = this.users.find(({ id }) => id === userId);
    return match || null;
  }

  /**
   * Stores a new user with a generated name and a fixed e-mail address.
   * No uniqueness check is performed on `userId`.
   *
   * @returns the user object that was stored.
   */
  async createUser(userId: UserId): Promise<User> {
    const created = {
      id: userId,
      name: `travvy${userId}`,
      email: 'l33th4xxx0r@travvyr00lz.lol',
    };
    this.users.push(created);
    return created;
  }

  /**
   * Stores a todo for an existing user.
   *
   * @returns the same todo object that was passed in.
   * @throws Error when the owning user is unknown or a todo with the same id
   *   has already been stored.
   */
  async createTodo(todo: Todo): Promise<Todo> {
    // Read both ids before awaiting so later mutation of `todo` cannot affect the checks.
    const todoId = todo.id;
    const ownerId = todo.userId;

    const owner = await this.getUser(ownerId);
    if (!owner) {
      throw new Error(`user does not exist: ${ownerId}`);
    }

    const isDuplicate = this.todos.some(stored => stored.id === todoId);
    if (isDuplicate) {
      throw new Error(`todo already exists: ${todoId}`);
    }

    this.todos.push(todo);
    return todo;
  }
}
/**
 * Logger that records every message in memory and, when `showOutput` is
 * truthy, also forwards it to the console.
 */
export class LogService implements ILogService {
  /** Recorded entries, each formatted as `"<LEVEL>: <space-joined messages>"`. */
  public logs: any[] = [];

  /**
   * @param showOutput when truthy, messages are echoed to `console` in
   *   addition to being recorded.
   */
  constructor(private showOutput?: boolean) { }

  /** Records the messages under the `LOG` level. */
  log(...messages: any[]): void {
    if (this.showOutput) console.log(...messages);
    this.record('LOG', messages);
  }

  /** Records the messages under the `ERROR` level. */
  error(...messages: any[]): void {
    if (this.showOutput) console.error(...messages);
    this.record('ERROR', messages);
  }

  /** Appends one entry: the level prefix followed by the messages joined with spaces. */
  private record(level: 'LOG' | 'ERROR', messages: any[]): void {
    this.logs.push(`${level}: ${messages.join(' ')}`);
  }
}
/**
 * Set-backed cache of keys: a key is either present or absent, no values are
 * stored alongside it.
 */
export class CacheService<T> implements ICacheService<T> {
  private readonly entries: Set<T> = new Set<T>();

  /** Number of distinct keys currently held. */
  get size(): number {
    return this.entries.size;
  }

  /** Marks `key` as present; adding an existing key has no further effect. */
  add(key: T): void {
    this.entries.add(key);
  }

  /** Reports whether `key` is currently present. */
  has(key: T): boolean {
    return this.entries.has(key);
  }

  /** Removes `key`; removing an absent key is a no-op. */
  delete(key: T): void {
    this.entries.delete(key);
  }
}
/**
 * First-in, first-out queue backed by an array.
 */
export class QueueService<T> implements IQueueService<T> {
  private readonly items: T[] = [];

  /** Number of items waiting in the queue. */
  get length(): number {
    return this.items.length;
  }

  /** Appends `item` to the back of the queue. */
  push(item: T): void {
    this.items.push(item);
  }

  /** Removes and returns the item at the front, or `undefined` when the queue is empty. */
  shift(): T | undefined {
    return this.items.shift();
  }
}
/**
 * Runs monitoring callbacks on a fixed interval and counts how many times
 * they have completed.
 *
 * The interval handles are retained so that `stop()` can cancel them;
 * otherwise every registered timer would stay scheduled for the lifetime of
 * the process.
 */
export class MonitorService implements IMonitorService {
  /** Number of callback invocations that have completed without throwing. */
  public monitorCount = 0;

  /** Handles of all intervals started by `monitor()` and not yet stopped. */
  private readonly timers: Array<ReturnType<typeof setInterval>> = [];

  /**
   * Schedules `callback` to run every `interval` milliseconds.
   *
   * @param callback function invoked on every tick.
   * @param interval delay between ticks, passed straight to `setInterval`.
   */
  monitor(callback: () => void, interval: number): void {
    const timer = setInterval(() => {
      callback();
      // Incremented after the callback so a throwing callback is not counted.
      this.monitorCount++;
    }, interval);
    this.timers.push(timer);
  }

  /**
   * Cancels every interval started through `monitor()`.
   * Safe to call repeatedly; later calls find nothing left to cancel.
   */
  stop(): void {
    for (const timer of this.timers.splice(0)) {
      clearInterval(timer);
    }
  }
}
import { jest, describe, it, expect, beforeEach } from '@jest/globals';
import { SyncService } from './SyncService';
import { EventFromA, ILogService, IQueueService } from './types';
import { QueueService, LogService, BService, MonitorService, CacheService } from './mock-services';
// NOTE(review): the helpers used below are imported from './mock-services', not
// './services' — confirm that this automock targets a module that exists and is needed.
jest.mock('./services');

/**
 * Guards against the service reporting its "race condition" error while a
 * burst of events is being handled.
 */
describe('no race conditions', () => {
  let syncService: SyncService;
  let mockQueue: IQueueService<EventFromA>;
  let mockLogService: ILogService;

  beforeEach(() => {
    mockQueue = new QueueService();
    mockLogService = new LogService();
    // `toHaveBeenCalledWith` only accepts mocks/spies, so wrap the real method.
    // spyOn keeps the original implementation, so messages are still recorded.
    jest.spyOn(mockLogService, 'error');
    syncService = new SyncService(
      new BService(),
      new MonitorService(),
      mockLogService,
      new CacheService(),
      new CacheService(),
      mockQueue
    );
  });

  it('should handle race condition when processing events', async () => {
    const events = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map(id => ({
      id,
      userId: 1,
      type: 'todo.created' as const,
      todo: {
        id,
        userId: 1,
        title: `Todo ${id}`,
        description: `Description ${id}`,
      },
    }));

    // simulate a delay when processing events
    jest.spyOn(syncService, 'process' as any).mockImplementation(() => new Promise(resolve => setTimeout(resolve, 500)));

    const handleEvents = events.map(event => syncService.handleEvent(event));

    // wait for all events to be handled
    await Promise.all(handleEvents);

    // wait for the queue to be processed
    // NOTE(review): each mocked `process` call takes 500ms, so after 100ms the first
    // event is still in flight; this assertion only covers the start of the burst.
    await new Promise(resolve => setTimeout(resolve, 100));

    // check that the 'processQueue' method was not called concurrently
    expect(mockLogService.error).not.toHaveBeenCalledWith('the queue is empty, this means we have a race condition');
  });
});
import { SyncService } from './SyncService';
import type {
EventId,
UserId,
EventFromA,
} from './types';
import { describe, beforeEach, it, expect } from '@jest/globals';
import { QueueService, LogService, BService, MonitorService, CacheService } from './mock-services';
/**
 * Behavioural tests for SyncService wired to the in-memory mock services.
 *
 * Events are handled in the background, so every test sleeps for a fixed
 * amount of real time before asserting on the resulting state.
 */
describe('SyncService', () => {
  let bService: BService;
  let syncService: SyncService;
  let queue: QueueService<EventFromA>;
  let userCache: CacheService<UserId>;
  let logService: LogService;
  let monitorService: MonitorService;
  let idempotencyKeys: CacheService<EventId>;

  /** Resolves after `ms` milliseconds of real time. */
  const wait = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

  beforeEach(() => {
    bService = new BService();
    queue = new QueueService();
    userCache = new CacheService<UserId>();
    logService = new LogService();
    monitorService = new MonitorService();
    idempotencyKeys = new CacheService<EventId>();
    syncService = new SyncService(
      bService,
      monitorService,
      logService,
      userCache,
      idempotencyKeys,
      queue,
    );
  });

  it('should create a user and cache it', async () => {
    const event: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    void syncService.handleEvent(event);
    await wait(100); // wait for processing
    expect(userCache.has(1)).toBe(true);
  });

  it('should create a todo and cache it', async () => {
    const userEvent: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const todoEvent: EventFromA = { id: 2, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    void syncService.handleEvent(userEvent);
    void syncService.handleEvent(todoEvent);
    await wait(100); // wait for processing
    expect(userCache.has(1)).toBe(true);
    // The test is about the todo, so check that it actually reached Service B.
    expect(bService.todos.length).toBe(1);
  });

  it('should not create a user if it already exists', async () => {
    const event1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const event2: EventFromA = { id: 2, userId: 1, type: 'user.created' };
    void syncService.handleEvent(event1);
    void syncService.handleEvent(event2);
    await wait(500); // wait for processing
    expect(userCache.size).toBe(1);
    expect(bService.users.length).toBe(1);
    expect(logService.logs).toContain("LOG: user already exists in cache: 1, skipping...");
  });

  it('should not create a todo if it already exists', async () => {
    const userEvent: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const todoEvent1: EventFromA = { id: 2, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    const todoEvent2: EventFromA = { id: 3, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    void syncService.handleEvent(userEvent);
    void syncService.handleEvent(todoEvent1);
    void syncService.handleEvent(todoEvent2);
    await wait(100); // wait for processing
    expect(userCache.size).toBe(1);
    expect(bService.users.length).toBe(1);
    expect(logService.logs).toContain("ERROR: failed to process event: 3 - todo already exists: 1");
    expect(bService.todos.length).toBe(1);
  });

  it('should create a new user if it does not exist before creating a todo', async () => {
    const todoEvent: EventFromA = { id: 1, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    void syncService.handleEvent(todoEvent);
    await wait(100); // wait for processing
    expect(userCache.has(1)).toBe(true);
    expect(bService.users.length).toBe(1);
    expect(bService.todos.length).toBe(1);
  });

  it('should handle multiple events concurrently', async () => {
    const userEvent1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const userEvent2: EventFromA = { id: 2, userId: 2, type: 'user.created' };
    const todoEvent1: EventFromA = { id: 3, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    const todoEvent2: EventFromA = { id: 4, userId: 2, type: 'todo.created', todo: { id: 2, userId: 2, title: 'Todo 2', description: 'Description 2' } };

    // Hand each event to the service after a random delay of up to one second,
    // so the arrival order differs from run to run.
    const deliverLater = (event: EventFromA) =>
      new Promise((resolve) => setTimeout(() => resolve(syncService.handleEvent(event)), Math.random() * 1000));
    const eventPromises = [userEvent1, userEvent2, todoEvent1, todoEvent2].map(deliverLater);

    // Wait until every event has been delivered, not just the first one.
    await Promise.all(eventPromises);
    await wait(5000); // wait for processing
    expect(userCache.size).toBe(2);
    expect(bService.users.length).toBe(2);
    expect(bService.todos.length).toBe(2);
  }, 10000);

  it('should retry if a network error occurs', async () => {
    // mock a network error: only the first lookup fails
    let calls = 0;
    bService.getUser = async () => {
      calls++;
      await wait(500); // simulated latency
      if (calls === 1) {
        throw new Error('network error');
      }
      return { id: 1, name: 'travvy1', email: 'l33th4xxx0r@travvyr00lz.lol' }
    };
    const event: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    void syncService.handleEvent(event);
    await wait(1000); // wait for processing
    expect(logService.logs).toContain("ERROR: failed to process event: 1 - network error - adding event back to queue...");
    await wait(500); // wait for processing
    expect(userCache.size).toBe(1);
  });

  it('should not retry if a non-network error occurs', async () => {
    // mock a non-network error
    bService.getUser = async () => {
      throw new Error('some other error');
    };
    const event: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    void syncService.handleEvent(event);
    await wait(1000); // wait for processing
    expect(logService.logs).toContain("ERROR: failed to process event: 1 - some other error");
    expect(userCache.size).toBe(0);
  });

  it('should process all events even if they come in at different times', async () => {
    const user1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const todo1: EventFromA = { id: 2, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    const user2: EventFromA = { id: 3, userId: 2, type: 'user.created' };
    const todo2: EventFromA = { id: 4, userId: 2, type: 'todo.created', todo: { id: 2, userId: 2, title: 'Todo 2', description: 'Description 2' } };
    // The second pair arrives two seconds after the first.
    setTimeout(() => {
      void syncService.handleEvent(user2);
      void syncService.handleEvent(todo2);
    }, 2000);
    void syncService.handleEvent(user1);
    void syncService.handleEvent(todo1);
    await wait(3000); // wait for processing
    expect(queue.length).toBe(0); // all events should be processed
    expect(userCache.size).toBe(2); // both users should be created
  }, 10000);

  it('should process all events independently', async () => {
    const event1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const event2: EventFromA = { id: 2, userId: 2, type: 'user.created' };
    void syncService.handleEvent(event1);
    void syncService.handleEvent(event2);
    // wait some time to simulate waiting for another event
    await wait(5000);
    expect(userCache.has(1)).toBe(true);
    expect(userCache.has(2)).toBe(true);
    const event3: EventFromA = { id: 3, userId: 3, type: 'user.created' };
    void syncService.handleEvent(event3);
    // wait for the third event to be processed
    await wait(1000);
    expect(userCache.has(3)).toBe(true);
  }, 10000);
});
import type {
EventFromA,
EventId,
UserId,
IBService,
IQueueService,
ICacheService,
ILogService,
IMonitorService,
} from './types';
/**
 * Replays events received from Service A against Service B.
 *
 * Events are appended to a queue and drained strictly one at a time, so the
 * work for one event (for example creating a user) has finished before the
 * next event is looked at. Event ids are remembered in an idempotency cache so
 * that a redelivered event is skipped, and known user ids are remembered in a
 * user cache to avoid repeated lookups in Service B.
 */
export class SyncService {
  /** Tail of the promise chain that drains run on; each drain is appended to it. */
  private processing: Promise<void> = Promise.resolve();

  /**
   * True from the moment a drain has been scheduled until its loop has emptied
   * the queue. While set, newly queued events are picked up by that drain.
   */
  private draining = false;

  constructor(
    private bService: IBService,
    private monitorService: IMonitorService,
    private logService: ILogService,
    private userCache: ICacheService<UserId>,
    private idempotencyCache: ICacheService<EventId>,
    private eventQueue: IQueueService<EventFromA>,
  ) {
    this.monitor();
  }

  /**
   * Queues an event and makes sure a drain is scheduled for it.
   *
   * The returned promise resolves once the event has been queued; it does NOT
   * wait for the event to be processed.
   *
   * @param event the event received from Service A.
   */
  async handleEvent(event: EventFromA): Promise<void> {
    this.logService.log(`received event: ${event.type}`);
    this.eventQueue.push(event);

    // A scheduled or running drain empties the whole queue, this event included.
    // Scheduling one drain per event would only produce extra drains that find
    // the queue empty and report a race condition that never happened.
    if (this.draining) return;
    this.draining = true;

    const drain = () => this.processQueue();
    this.processing = this.processing
      // Run the drain even if the previous tail rejected (e.g. the logger threw);
      // otherwise `draining` would stay set and no event would ever be processed.
      .then(drain, drain)
      .catch((error: unknown) => {
        const msg = error instanceof Error ? error.message : 'unknown error';
        return this.logService.error(`failed to process queue - ${msg}`);
      });
  }

  /**
   * Drains the queue, processing one event at a time until it is empty.
   *
   * Failures of individual events are logged and do not stop the drain.
   * Events whose failure matches `shouldRetry` are put back on the queue.
   * NOTE(review): retries have no attempt limit or back-off, so an error that
   * keeps matching the retry policy keeps this loop running.
   */
  private async processQueue(): Promise<void> {
    try {
      // A drain is only scheduled after an event has been queued, so an empty
      // queue here means something else consumed it.
      if (this.eventQueue.length === 0) {
        this.logService.error('the queue is empty, this means we have a race condition');
      }

      while (this.eventQueue.length > 0) {
        const event = this.eventQueue.shift();
        if (event === undefined) break;

        if (this.idempotencyCache.has(event.id)) {
          this.logService.log(`event already processed: ${event.id}`);
          continue;
        }

        // Recorded before processing so a duplicate queued meanwhile is skipped.
        this.idempotencyCache.add(event.id);
        this.logService.log(`processing event: ${event.id}`);

        try {
          await this.process(event);
        } catch (error: unknown) {
          if (!(error instanceof Error)) {
            // Nothing usable to report; log and keep draining so the events
            // queued behind this one are not left stranded.
            this.logService.error('failed to process queue - unknown error');
            continue;
          }

          let message = `failed to process event: ${event.id} - ${error.message}`;
          // retry if matches our retry policy
          if (this.shouldRetry(error)) {
            this.eventQueue.push(event);
            // Forget the id again, otherwise the retry would be skipped as a duplicate.
            this.idempotencyCache.delete(event.id);
            message += ' - adding event back to queue...';
          }
          this.logService.error(message);
        }
      }
    } finally {
      // Reset in the same synchronous step in which the loop ends, so an event
      // queued right afterwards always schedules a new drain.
      this.draining = false;
    }
  }

  /**
   * Applies a single event to Service B.
   *
   * @throws Error when a `todo.created` event carries no todo, when the event
   *   type is unknown, or when Service B rejects the lookup or the todo.
   */
  private async process(event: EventFromA): Promise<void> {
    switch (event.type) {
      case 'user.created':
        await this.ensureUserExists(event.userId);
        break;
      case 'todo.created': {
        // Captured once so the value checked is the value that is sent and logged.
        const { todo } = event;
        if (!todo) throw new Error(`missing todo: ${event.id}`);
        await this.ensureUserExists(event.userId);
        await this.bService.createTodo(todo);
        await this.logService.log(`todo created: ${todo.id}`);
        break;
      }
      default: {
        // Not reachable for well-typed events; guards against malformed payloads.
        const invalidEvent = event as { type: string };
        throw new Error(`unknown event type: ${invalidEvent.type}`);
      }
    }
  }

  /**
   * Makes sure the user exists in Service B, creating it when necessary, and
   * records the id in the user cache once the user is known to exist.
   *
   * A failed creation is logged and not rethrown.
   * NOTE(review): because of that, a creation failure never reaches the retry
   * policy in `processQueue` — confirm this is intended.
   */
  private async ensureUserExists(userId: UserId): Promise<void> {
    if (this.userCache.has(userId)) {
      this.logService.log(`user already exists in cache: ${userId}, skipping...`);
      return;
    }

    const user = await this.bService.getUser(userId);
    if (user) {
      this.userCache.add(userId);
      this.logService.log(`user already exists in Service B: ${userId}, skipping...`);
      return;
    }

    this.logService.log(`user does not exist in Service B: ${userId}, creating...`);
    try {
      await this.bService.createUser(userId);
      this.userCache.add(userId);
      this.logService.log(`user created: ${userId}`);
    } catch (error: unknown) {
      if (!(error instanceof Error)) throw error;
      this.logService.error(`failed to create user: ${userId} - ${error.message}`);
    }
  }

  /** Registers a callback that logs the queue length and both cache sizes. */
  private monitor(): void {
    this.monitorService.monitor(() => {
      this.logService.log(`queue length: ${this.eventQueue.length}`);
      this.logService.log(`user cache size: ${this.userCache.size}`);
      this.logService.log(`idempotency keys size: ${this.idempotencyCache.size}`);
    }, 60000); // monitor every minute
  }

  /**
   * Retry policy: an error is retried when its message contains `network error`.
   *
   * @param error the failure raised while processing an event.
   * @returns `true` when the event should be put back on the queue.
   */
  shouldRetry(error: Error): boolean {
    return error.message.includes('network error');
  }
}
/**
 * Contract for the component that runs periodic monitoring callbacks.
 */
export interface IMonitorService {
// The signature is left untyped here; SyncService calls it with a callback and
// an interval, and MonitorService forwards that interval to `setInterval`.
monitor(...args: any[]): any;
}
/**
 * Contract for the logger. Both methods accept any number of values and may
 * complete either synchronously or through a promise.
 */
export interface ILogService {
// Informational messages.
log(...messages: any[]): Promise<void> | void;
// Failure messages.
error(...messages: any[]): Promise<void> | void;
}
/**
 * Contract for the client of Service B. Every operation is asynchronous.
 */
export interface IBService {
// Resolves with the user, or with `null` (the `User` type includes `null`).
getUser(userId: UserId): Promise<User>;
// Creates a user for the given id and resolves with the result.
createUser(userId: UserId): Promise<User>;
// Creates the given todo and resolves with the result.
createTodo(todo: Todo): Promise<Todo>;
}
/**
 * Contract for a cache of keys of type `T`; only the keys themselves are
 * tracked, no values are associated with them.
 */
export interface ICacheService<T> {
// Adds a key.
add(key: T): void;
// Reports whether a key is present.
has(key: T): boolean;
// Removes a key.
delete(key: T): void;
// Number of keys; SyncService reports it from its monitoring callback.
size: number;
}
/**
 * Contract for the queue that holds items of type `T` until they are taken off.
 */
export interface IQueueService<T> {
// Adds an item to the queue.
push(item: T): void;
// Removes and returns one item, or `undefined` when there is none.
shift(): T | undefined;
// Number of items currently queued.
length: number;
}
// Numeric identifiers; plain aliases of `number`, so they are interchangeable
// as far as the type checker is concerned.
export type UserId = number;
export type TodoId = number;
export type EventId = number;
/**
 * A user record, or `null`. Because `null` is part of the type itself, use
 * `NonNullable<User>` where a user object is required.
 */
export type User = {
id: UserId;
name: string;
email: string;
} | null;
/**
 * A todo item; `userId` is the id of the user the todo belongs to.
 */
export type Todo = {
id: TodoId;
userId: UserId;
title: string;
description: string;
}
/**
 * Fields shared by every event; `T` is the literal string that identifies the
 * event type.
 */
export type BaseEvent<T> = {
id: EventId;
userId: UserId;
type: T;
};
/**
 * The events handled by SyncService, discriminated by `type`.
 * `&` binds tighter than `|`, so only the `'todo.created'` variant carries `todo`.
 */
export type EventFromA =
| BaseEvent<'user.created'>
| BaseEvent<'todo.created'> & { todo: Todo };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment