- PostgresQl
- Typescript
- Redis
- TypeORM
Last active
May 29, 2024 15:00
-
-
Save jesster2k10/4fb4a6742a2099429cd8185c4f91b528 to your computer and use it in GitHub Desktop.
Node.js Social Activity Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Entity, | |
PrimaryGeneratedColumn, | |
Column, | |
ManyToOne, | |
BaseEntity, | |
} from 'typeorm' | |
import { Profile } from 'entities' | |
import { ActivityTargetType, ActivityVerb } from './enums' | |
@Entity() | |
export class Activity extends BaseEntity { | |
@PrimaryGeneratedColumn() | |
id: number | |
@Column('enum', { enum: ActivityTargetType }) | |
targetType: ActivityTargetType | |
@Column('smallint') | |
targetId: number | |
@Column('enum', { enum: ActivityTargetType, nullable: true }) | |
parentType?: ActivityTargetType | |
@Column('smallint', { nullable: true }) | |
parentId?: number | |
@ManyToOne(() => Profile) | |
actor: Profile | |
@Column({ nullable: true }) | |
actorId: number | |
@Column('enum', { enum: ActivityVerb }) | |
verb: ActivityVerb | |
@Column('timestamp', { default: 'now()' }) | |
date: number | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export enum ActivityTargetType { | |
LISTING = 'LISTING', // this could be a Photo, Post, Tweet etc. | |
COMMENT = 'COMMENT', | |
NOTICE = 'NOTICE', | |
} | |
export enum ActivityVerb { | |
ADD = 'ADD', | |
UPDATE = 'UPDATE', | |
LIKED = 'LIKED', | |
BOOKMARK = 'BOOKMARK', | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const john = {id: 1} // fetch profile from database | |
const jacob = {id: 2} | |
const { ADD } = ActivityVerb | |
const { LISTING } = ActivityTargetType | |
await feedService.follow(john.id, jacob.id) | |
await feedService.createActivity( | |
john, | |
ADD, | |
LISTING, | |
1, | |
) | |
await feedService.createActivity( | |
jacob, | |
ADD, | |
LISTING, | |
2, | |
) | |
const jacobFeed = await feedService.getFeed(jacob.id) // [{..., item: { id: 1 }}] | |
const johnFeed = await feedService.getFeed(john.id) // [{..., item: { id: 1 }}, {..., item: { id: 2 }}] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* This file is responsible for | |
* 1. storing activities in PostGres/Redis | |
* 2. fetching the users feed from redis | |
* 3. storing following/unfollowing in redis | |
*/ | |
import { | |
ActivityTargetType, | |
ActivityVerb, | |
Activity, | |
Profile, | |
Listing, | |
} from 'entities' | |
import { groupBy, isEmpty } from 'lodash' | |
import { redis } from 'store/redis' // ioredis | |
import { ger } from 'store/ger' // look up NPM package ger | |
import { In } from 'typeorm' | |
import batch from 'batch-promises' | |
interface CreateActivityConfig { | |
parent?: number | |
parentType?: ActivityTargetType | |
} | |
const MAX_BATCH = 1000 // Limit the number of Promise.all at once to 1000, that way we dont overload the server say | |
// if a user with 1,000,000 followers makes a post. We wont be making the 1,000,000 writes at once but in batches of 1,000 | |
export class FeedService { | |
// Safely parse the data from redis | |
parseArray(str: string | string[]): any[] { | |
try { | |
if (Array.isArray(str)) { | |
return str.map(item => JSON.parse(item)) | |
} | |
return JSON.parse(str) | |
} catch (error) { | |
return null | |
} | |
} | |
// Users can turn on post notifications for other users. | |
async enablePostNotifications(other: number, current: number) { | |
const follower = String(current) | |
const timestamp = String(new Date().getTime()) | |
return redis.zadd(`post_not:${other}`, timestamp, follower) | |
} | |
// Store the follower ids in redis so we can access them later without touching the DB | |
async follow(other: number, current: number) { | |
const follower = String(current) | |
const timestamp = String(new Date().getTime()) | |
await ger.event('users', current, 'follows', other) | |
return redis.zadd(`followers:${other}`, timestamp, follower) | |
} | |
async unfollow(other: number, current: number) { | |
const follower = String(current) | |
await ger.delete_events('users', current, 'follows', other) | |
return redis.zrem(`followers:${other}`, follower) | |
} | |
// This function taks the activity stored in redis and gets the associated target object from the database. | |
// I only have on clause set up right now but it can be used for much more. | |
// My application has two type of feeds one for the home page and one called "activity" (think the "Following" tab in Instagram) | |
// The only difference is that the home page returns only one of each target object that is no duplicates for an activity | |
// Where as the "activity" page will return it as it is. | |
private extractTargetObjects = (type: 'feed' | 'activity' = 'feed') => < | |
T extends Activity | |
>( | |
feed: T[] | |
) => { | |
const mapItemToFeedItem = <T>(item: T, index: number) => ({ | |
...feed[index], // the stored redis activity | |
item, // this is the item form the database | |
}) | |
if (type !== 'feed') { | |
return Promise.all( | |
feed.map(item => { | |
const id = item.targetId | |
switch (item.targetType) { | |
case 'LISTING': | |
return Listing.findOne({ where: { id } }) // TypeORM query | |
default: | |
return null | |
} | |
}) | |
).then(data => data.map(mapItemToFeedItem)) | |
} | |
const grouped = groupBy(feed, item => item.targetType) | |
const promises = Object.entries(grouped).map(([targetType, items]) => { | |
const ids = items.map(item => item.targetId) | |
switch (targetType) { | |
case 'LISTING': | |
return Listing.find({ where: { id: In(ids) } }) // batch query | |
default: | |
return [] | |
} | |
}) | |
return Promise.all(promises) | |
.then(data => data.reduce((left, right) => left.concat(right), [])) | |
.then(data => data.map(mapItemToFeedItem)) | |
} | |
// get the users feed | |
getFeed(profile: number, limit: number = 6, offset: number = 0) { | |
return redis | |
.zrange(`feed:${profile}`, offset, limit) // query the stored feed in redis | |
.then(this.parseArray) // parse the redis string | |
.then(this.extractTargetObjects('feed')) // extract the target objects | |
} | |
getActivityFeed(profile: number, limit: number = 6, offset: number = 0) { | |
return redis | |
.zrange(`activity:${2}`, offset, limit) // query the stored feed in redis | |
.then(this.parseArray) // parse the redis string | |
.then(this.extractTargetObjects('activity'))// extract the target objects | |
} | |
// Just saves the activity in PostGres and returns a JSON string of it | |
private async generateActivity( | |
actor: Profile, | |
verb: ActivityVerb, | |
targetType: ActivityTargetType, | |
target: number, | |
config: CreateActivityConfig = {} | |
): Promise<[Activity, string]> { | |
let activity = new Activity() | |
activity.actor = actor | |
activity.targetType = targetType | |
activity.targetId = target | |
activity.verb = verb | |
activity.parentId = config.parent | |
activity.parentType = config.parentType | |
activity = await activity.save() | |
const serializedActivity = JSON.stringify({ | |
...activity, | |
actor: actor.id, | |
}) | |
return [activity, serializedActivity] | |
} | |
// This stores the activity in postgres and each of the users followers feeds/activity feeds | |
// It also stores the activity as a notification for a user if they have enabled post notifications | |
async createActivity( | |
actor: Profile, | |
verb: ActivityVerb, | |
targetType: ActivityTargetType, | |
target: number, | |
config: CreateActivityConfig = {} | |
) { | |
const followers: string[] = await redis.zrange( | |
`followers:${actor.id}`, | |
0, | |
-1 | |
) // get followers | |
const subscribers: string[] = | |
(await redis.zrange(`post_not:${actor.id}`, 0, -1)) || [] // get those who have enabled push notifications | |
const timestamp = String(new Date().getTime()) | |
const isListing = | |
targetType === ActivityTargetType.LISTING && verb === ActivityVerb.ADD // can omit this check | |
const [activity, serializedActivity] = await this.generateActivity( | |
actor, | |
verb, | |
targetType, | |
target, | |
config | |
) | |
if (!isEmpty(subscribers)) { | |
// make a notification for people who have enable post notifications | |
batch(MAX_BATCH, subscribers, subscriber => | |
redis.zadd(`notifications:${subscriber}`, timestamp, serializedActivity) | |
) | |
} | |
if (!isEmpty(followers)) { | |
// Publish to followers feed if it is a listing else to their activity feed | |
batch(MAX_BATCH, followers, async follower => | |
isListing | |
? redis.zadd(`feed:${follower}`, timestamp, serializedActivity) | |
: redis.zadd(`activity:${follower}`, timestamp, serializedActivity) | |
) | |
} | |
if (isListing) { | |
// Add to actors feed | |
await redis.zadd(`feed:${actor.id}`, timestamp, serializedActivity) | |
} | |
return activity | |
} | |
} | |
export const feedService = new FeedService() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment