Skip to content

Instantly share code, notes, and snippets.

@jesster2k10
Last active August 27, 2023 11:10
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jesster2k10/4fb4a6742a2099429cd8185c4f91b528 to your computer and use it in GitHub Desktop.
Save jesster2k10/4fb4a6742a2099429cd8185c4f91b528 to your computer and use it in GitHub Desktop.
Node.js Social Activity Stream
  • PostgreSQL
  • TypeScript
  • Redis
  • TypeORM
import {
Entity,
PrimaryGeneratedColumn,
Column,
ManyToOne,
BaseEntity,
} from 'typeorm'
import { Profile } from 'entities'
import { ActivityTargetType, ActivityVerb } from './enums'
/**
 * One entry of the activity stream: "<actor> <verb> <target>", optionally
 * nested under a parent object (parentType/parentId).
 *
 * Rows are written by the feed service and a serialized copy of each row is
 * fanned out to Redis.
 */
@Entity()
export class Activity extends BaseEntity {
  @PrimaryGeneratedColumn()
  id: number

  /** Kind of object the activity was performed on. */
  @Column('enum', { enum: ActivityTargetType })
  targetType: ActivityTargetType

  /**
   * Primary key of the target object.
   * Stored as `int`: `smallint` tops out at 32767, which a generated primary
   * key of any real table exceeds quickly, making inserts fail.
   */
  @Column('int')
  targetId: number

  /** Kind of the parent object, when the target is nested under one. */
  @Column('enum', { enum: ActivityTargetType, nullable: true })
  parentType?: ActivityTargetType

  /** Primary key of the parent object (same width as `targetId`). */
  @Column('int', { nullable: true })
  parentId?: number

  /** Profile that performed the activity. */
  @ManyToOne(() => Profile)
  actor: Profile

  /** Foreign-key column backing `actor`. */
  @Column({ nullable: true })
  actorId: number

  /** What the actor did to the target. */
  @Column('enum', { enum: ActivityVerb })
  verb: ActivityVerb

  /**
   * Creation time, filled in by the database.
   * The default is passed as a function so TypeORM emits the raw SQL
   * expression `now()` (evaluated per insert) rather than a quoted string
   * literal. The property is a `Date` because that is what a `timestamp`
   * column is hydrated to.
   */
  @Column('timestamp', { default: () => 'now()' })
  date: Date
}
/**
 * Kinds of objects an activity can point at (as its target or its parent).
 * The string values are what gets persisted in the enum column and in the
 * serialized activity.
 */
export enum ActivityTargetType {
  /** The main content object; this could be a Photo, Post, Tweet etc. */
  LISTING = 'LISTING',
  /** A comment. */
  COMMENT = 'COMMENT',
  /** A notice. */
  NOTICE = 'NOTICE',
}
/**
 * Actions an actor can perform on a target.
 * The string values are what gets persisted in the enum column and in the
 * serialized activity.
 */
export enum ActivityVerb {
  /** The actor created the target. */
  ADD = 'ADD',
  /** The actor changed the target. */
  UPDATE = 'UPDATE',
  /** The actor liked the target. */
  LIKED = 'LIKED',
  /** The actor bookmarked the target. */
  BOOKMARK = 'BOOKMARK',
}
// Usage example: two profiles, one follow relationship, one listing each.
const john = {id: 1} // fetch profile from database
const jacob = {id: 2}
const { ADD } = ActivityVerb
const { LISTING } = ActivityTargetType
// follow(other, current) records `current` as a follower of `other`,
// so after this call jacob follows john (jacob is stored in followers:1).
await feedService.follow(john.id, jacob.id)
// john adds listing 1: it is published to john's own feed and to the feed of his follower jacob.
await feedService.createActivity(
john,
ADD,
LISTING,
1,
)
// jacob adds listing 2: jacob has no followers, so it only lands in jacob's own feed.
await feedService.createActivity(
jacob,
ADD,
LISTING,
2,
)
// Expected contents once the fan-out writes have landed:
const jacobFeed = await feedService.getFeed(jacob.id) // [{..., item: { id: 1 }}, {..., item: { id: 2 }}]
const johnFeed = await feedService.getFeed(john.id) // [{..., item: { id: 1 }}]
/**
* This file is responsible for
* 1. storing activities in Postgres/Redis
* 2. fetching the user's feed from Redis
* 3. storing follow/unfollow relationships in Redis
*/
import {
ActivityTargetType,
ActivityVerb,
Activity,
Profile,
Listing,
} from 'entities'
import { groupBy, isEmpty } from 'lodash'
import { redis } from 'store/redis' // ioredis
import { ger } from 'store/ger' // look up NPM package ger
import { In } from 'typeorm'
import batch from 'batch-promises'
// Optional extras accepted when creating an activity: a reference to the
// parent object the target belongs to. Both fields are copied onto the
// Activity row (parent -> parentId, parentType -> parentType).
interface CreateActivityConfig {
// id of the parent object
parent?: number
// kind of the parent object
parentType?: ActivityTargetType
}
// Batch size handed to `batch(...)` when fanning an activity out to Redis.
// It caps how many writes are issued at once, so the server is not overloaded
// when, say, a user with 1,000,000 followers makes a post: the 1,000,000 writes
// are made in batches of 1,000 rather than all at the same time.
const MAX_BATCH = 1000
/**
 * Activity-stream service.
 *
 * Activities are persisted through the `Activity` entity and a JSON copy of
 * each one is pushed into per-profile Redis sorted sets, scored by a
 * millisecond timestamp:
 *   - `followers:<id>`      ids of the profiles following <id>
 *   - `post_not:<id>`       ids of the profiles that enabled post notifications for <id>
 *   - `feed:<id>`           home feed of <id>
 *   - `activity:<id>`       activity feed of <id>
 *   - `notifications:<id>`  notifications of <id>
 */
export class FeedService {
  /**
   * Safely parses data read from Redis.
   *
   * @param str Either a JSON string, or a list of JSON strings (one per entry).
   * @returns The parsed array. An empty array is returned when anything fails
   *          to parse or when the parsed value is not an array, so callers can
   *          always iterate the result.
   */
  parseArray(str: string | string[]): any[] {
    try {
      if (Array.isArray(str)) {
        return str.map(item => JSON.parse(item))
      }
      const parsed = JSON.parse(str)
      return Array.isArray(parsed) ? parsed : []
    } catch (error) {
      return []
    }
  }

  /**
   * Users can turn on post notifications for other users.
   *
   * @param other   Profile whose posts should trigger notifications.
   * @param current Profile that wants to be notified.
   */
  async enablePostNotifications(other: number, current: number) {
    const follower = String(current)
    const timestamp = String(new Date().getTime())
    return redis.zadd(`post_not:${other}`, timestamp, follower)
  }

  /**
   * Makes `current` a follower of `other`.
   * The follower id is stored in Redis so it can be read later without
   * touching the database; the event is also recorded in ger.
   *
   * @param other   Profile being followed.
   * @param current Profile that starts following.
   */
  async follow(other: number, current: number) {
    const follower = String(current)
    const timestamp = String(new Date().getTime())
    await ger.event('users', current, 'follows', other)
    return redis.zadd(`followers:${other}`, timestamp, follower)
  }

  /**
   * Reverts `follow`: removes the ger event and drops `current` from the
   * follower set of `other`.
   *
   * @param other   Profile being unfollowed.
   * @param current Profile that stops following.
   */
  async unfollow(other: number, current: number) {
    const follower = String(current)
    await ger.delete_events('users', current, 'follows', other)
    return redis.zrem(`followers:${other}`, follower)
  }

  /**
   * Batch-loads the target objects of one target type.
   * Only listings are wired up right now; add a case per type to extend it.
   */
  private async loadTargets(
    targetType: ActivityTargetType,
    ids: number[]
  ): Promise<Listing[]> {
    switch (targetType) {
      case ActivityTargetType.LISTING:
        return Listing.find({ where: { id: In(ids) } }) // batch query
      default:
        return []
    }
  }

  /**
   * Builds a function that takes the activities stored in Redis and attaches
   * the associated target object from the database as `item`.
   *
   * - 'activity' (think the "Following" tab in Instagram): every activity is
   *   returned as it is, in order; `item` is whatever the lookup yields, and
   *   `null` for target types that have no lookup.
   * - 'feed' (home page): one entry per distinct target, i.e. no duplicates.
   *   The first activity of each target is kept, in feed order, and
   *   activities whose target could not be loaded are dropped.
   */
  private extractTargetObjects = (type: 'feed' | 'activity' = 'feed') => async <
    T extends Activity
  >(
    feed: T[]
  ) => {
    if (!feed || feed.length === 0) {
      return []
    }

    if (type !== 'feed') {
      const items = await Promise.all(
        feed.map(activity => {
          switch (activity.targetType) {
            case ActivityTargetType.LISTING:
              return Listing.findOne({ where: { id: activity.targetId } }) // TypeORM query
            default:
              return null
          }
        })
      )
      // Promise.all preserves order, so items[index] belongs to feed[index].
      return feed.map((activity, index) => ({ ...activity, item: items[index] }))
    }

    // Collect the distinct ids per target type so each type costs one query.
    const idsByType = new Map<ActivityTargetType, Set<number>>()
    for (const activity of feed) {
      const ids = idsByType.get(activity.targetType) || new Set<number>()
      ids.add(activity.targetId)
      idsByType.set(activity.targetType, ids)
    }

    // Index the loaded objects by "<type>:<id>". A batched `In` query does not
    // return rows in the order of the ids, nor one row per id, so the results
    // must never be matched to activities by array position.
    const targetsByKey = new Map<string, Listing>()
    await Promise.all(
      Array.from(idsByType.entries()).map(async ([targetType, ids]) => {
        const targets = await this.loadTargets(targetType, Array.from(ids))
        for (const target of targets) {
          targetsByKey.set(`${targetType}:${target.id}`, target)
        }
      })
    )

    const seen = new Set<string>()
    const entries: Array<T & { item: Listing }> = []
    for (const activity of feed) {
      const key = `${activity.targetType}:${activity.targetId}`
      const item = targetsByKey.get(key)
      if (item === undefined || seen.has(key)) {
        continue
      }
      seen.add(key)
      entries.push({ ...activity, item })
    }
    return entries
  }

  /**
   * Reads one page of a sorted set.
   * ZRANGE takes inclusive start/stop indexes, not an offset and a count, so
   * the page is translated to [offset, offset + limit - 1].
   */
  private readPage(key: string, limit: number, offset: number): Promise<string[]> {
    if (limit <= 0) {
      // A stop index of -1 would mean "up to the last element", not "nothing".
      return Promise.resolve([])
    }
    const start = Math.max(offset, 0)
    return redis.zrange(key, start, start + limit - 1)
  }

  /**
   * Gets the user's home feed.
   *
   * @param profile Id of the profile whose feed is read.
   * @param limit   Maximum number of stored activities to read (default 6).
   * @param offset  Number of stored activities to skip (default 0).
   * @returns The stored activities with their target object attached as
   *          `item`, without duplicate targets. Entries come back in ZRANGE
   *          order, i.e. ascending timestamp.
   *          NOTE(review): a feed usually shows newest first - confirm whether
   *          ZREVRANGE was intended.
   */
  getFeed(profile: number, limit: number = 6, offset: number = 0) {
    return this.readPage(`feed:${profile}`, limit, offset) // query the stored feed in redis
      .then(this.parseArray) // parse the redis strings
      .then(this.extractTargetObjects('feed')) // attach the target objects
  }

  /**
   * Gets the user's activity feed.
   * Same paging as `getFeed`, but reads `activity:<profile>` and returns every
   * activity as it is.
   */
  getActivityFeed(profile: number, limit: number = 6, offset: number = 0) {
    return this.readPage(`activity:${profile}`, limit, offset) // query the stored feed in redis
      .then(this.parseArray) // parse the redis strings
      .then(this.extractTargetObjects('activity')) // attach the target objects
  }

  /**
   * Saves the activity through the `Activity` entity and returns it together
   * with a JSON string of it. In the JSON the `actor` relation is replaced by
   * the actor's id.
   */
  private async generateActivity(
    actor: Profile,
    verb: ActivityVerb,
    targetType: ActivityTargetType,
    target: number,
    config: CreateActivityConfig = {}
  ): Promise<[Activity, string]> {
    let activity = new Activity()
    activity.actor = actor
    activity.targetType = targetType
    activity.targetId = target
    activity.verb = verb
    activity.parentId = config.parent
    activity.parentType = config.parentType
    activity = await activity.save()
    const serializedActivity = JSON.stringify({
      ...activity,
      actor: actor.id,
    })
    return [activity, serializedActivity]
  }

  /**
   * Stores the activity and publishes it:
   *  - to the notifications of everyone who enabled post notifications for the actor,
   *  - to each follower's home feed when a listing was added, otherwise to
   *    each follower's activity feed,
   *  - to the actor's own home feed when a listing was added.
   *
   * The returned promise settles only after all of those writes have
   * finished, so a failed write rejects this call instead of surfacing as an
   * unhandled rejection, and the feeds are readable once it resolves.
   *
   * @returns The saved activity.
   */
  async createActivity(
    actor: Profile,
    verb: ActivityVerb,
    targetType: ActivityTargetType,
    target: number,
    config: CreateActivityConfig = {}
  ) {
    const followers: string[] =
      (await redis.zrange(`followers:${actor.id}`, 0, -1)) || [] // get followers
    const subscribers: string[] =
      (await redis.zrange(`post_not:${actor.id}`, 0, -1)) || [] // get those who have enabled post notifications
    const timestamp = String(new Date().getTime())
    const isListing =
      targetType === ActivityTargetType.LISTING && verb === ActivityVerb.ADD
    const [activity, serializedActivity] = await this.generateActivity(
      actor,
      verb,
      targetType,
      target,
      config
    )

    const writes: Array<Promise<unknown>> = []
    if (subscribers.length > 0) {
      // make a notification for people who have enabled post notifications
      writes.push(
        batch(MAX_BATCH, subscribers, subscriber =>
          redis.zadd(`notifications:${subscriber}`, timestamp, serializedActivity)
        )
      )
    }
    if (followers.length > 0) {
      // Publish to followers' feed if it is a listing, else to their activity feed
      writes.push(
        batch(MAX_BATCH, followers, follower =>
          isListing
            ? redis.zadd(`feed:${follower}`, timestamp, serializedActivity)
            : redis.zadd(`activity:${follower}`, timestamp, serializedActivity)
        )
      )
    }
    if (isListing) {
      // Add to the actor's own feed
      writes.push(redis.zadd(`feed:${actor.id}`, timestamp, serializedActivity))
    }
    await Promise.all(writes)
    return activity
  }
}
// Ready-made FeedService instance exported for callers to share.
export const feedService = new FeedService()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment