Created
June 6, 2023 12:23
-
-
Save Jalson1982/66ad3a22f8d6be755d3841d444e78ab3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Message as MessageClass} from '../../interfaces&classes/message-classes'; | |
import {store} from '../../store'; | |
import { | |
FETCH_FILTER_MESSAGES, | |
GET_LOCAL_DB_DATA, | |
S_UPDATE_UNREADS, | |
TABLE_SPINNER, | |
} from '../../helpers/constants'; | |
import {IObjectNumber} from '../../interfaces&classes/common'; | |
import {sequenceMessages} from '../../helpers/helpers'; | |
import {Database, Model} from '@nozbe/watermelondb'; | |
import SQLiteAdapter from '@nozbe/watermelondb/adapters/sqlite'; | |
import {schemaMigrations} from '@nozbe/watermelondb/Schema/migrations'; | |
import { | |
Message, | |
EncryptionKey, | |
schema, | |
MessageRecord, | |
EncryptionKeyRecord, | |
messageColumnsRawSelectQueryFragment, | |
TableKeyRecord, | |
Table, | |
} from './watermelon-model-definitions'; | |
import {Q} from '@nozbe/watermelondb'; | |
import {sanitizedRaw} from '@nozbe/watermelondb/RawRecord'; | |
const migrations = schemaMigrations({ | |
migrations: [], | |
}); | |
const adapter = new SQLiteAdapter({ | |
schema, | |
migrations, | |
dbName: '8seatchatTable-watermelondb', | |
jsi: true, | |
onSetUpError: error => { | |
if (DEBUG) { | |
customLogger('error', executionId, error); | |
} | |
}, | |
}); | |
const DEBUG = true; | |
let executionId = 0; | |
export const customLogger = ( | |
level: 'error' | 'debug', | |
id: number, | |
...args: any[] | |
) => { | |
if (DEBUG) { | |
console?.debug( | |
`[wm-db-${level}(${id})]`, | |
...args, | |
level === 'error' ? args[0].stack : '', | |
); | |
} | |
}; | |
export let watermelonDatabase: Database | null; | |
export const getDBConnection = async () => { | |
if (!watermelonDatabase) { | |
watermelonDatabase = new Database({ | |
adapter, | |
modelClasses: [Message, EncryptionKey, Table], | |
}); | |
} | |
return watermelonDatabase; | |
}; | |
function getParentFunctionName() { | |
try { | |
throw new Error(); | |
} catch (e) { | |
let parent = ''; | |
const allMatches: string = e.stack.match(/(\w+)@|at (\w+) \(/g).join(' '); | |
Object.keys(databaseClient).forEach(name => { | |
if (allMatches.includes(name)) { | |
parent = name; | |
} | |
}); | |
return parent; | |
} | |
} | |
const execute = async <T>(callback: (database: Database, id: number) => T) => { | |
const id = executionId + 1; | |
executionId = id; | |
try { | |
customLogger('debug', id, `executing: ${getParentFunctionName()}`); | |
const database = await getDBConnection(); | |
return await callback(database, executionId); | |
} catch (error) { | |
customLogger('error', id, error, callback.name); | |
} | |
customLogger('debug', id, 'is completed'); | |
}; | |
export const deleteDb = () => | |
execute(async database => { | |
await database.write(() => { | |
return database.unsafeResetDatabase(); | |
}); | |
watermelonDatabase = null; | |
}); | |
export const createTable = async () => { | |
await getDBConnection(); | |
}; | |
const bulkUpsertMessages = (messages: MessageRecord[]) => | |
execute(async database => { | |
const collection = await database.collections.get<MessageRecord & Model>( | |
'message', | |
); | |
// Fetch all existing messages in a single query | |
const existingMessages = await collection | |
.query(Q.where('_id', Q.oneOf(messages.map(message => message._id)))) | |
.fetch(); | |
// Create a map of existing message IDs for quick lookup | |
const existingMessageIds = new Map( | |
existingMessages.map(message => [message._id, message]), | |
); | |
const updates: (MessageRecord & Model)[] = []; | |
const creates: (MessageRecord & Model)[] = []; | |
for (const message of messages) { | |
const existing = existingMessageIds.get(message._id); | |
if (existing) { | |
updates.push( | |
existing.prepareUpdate(toUpdate => Object.assign(toUpdate, message)), | |
); | |
} else { | |
creates.push( | |
collection.prepareCreate(toCreate => { | |
toCreate._raw = sanitizedRaw({_id: message._id}, collection.schema); | |
Object.assign(toCreate, message); | |
}), | |
); | |
} | |
} | |
await database.write(async () => { | |
await database.batch(...updates, ...creates); | |
}); | |
}); | |
export const bulkSaveChatData = async (messages: MessageRecord[]) => { | |
await bulkUpsertMessages(messages); | |
getConvesations(); | |
}; | |
export const getConvesations = async () => { | |
const allMessages = await database.collections | |
.get('message') | |
.query(Q.sortBy('time', Q.desc)) | |
.fetch(); | |
const conversations = allMessages.reduce((acc, message) => { | |
const key = `${message.tableId}_${message.code}`; | |
if (!acc[key]) { | |
acc[key] = {...message, unread: message.isRead ? 1 : 0}; | |
} else { | |
if (message.isRead) { | |
acc[key].unread += 1; | |
} | |
// Check if current message is newer | |
if (new Date(message.time) > new Date(acc[key].time)) { | |
// Replace message but keep the calculated unread count | |
const {unread} = acc[key]; | |
acc[key] = {...message, unread}; | |
} | |
} | |
return acc; | |
}, {}); | |
const conversationsArray = Object.values(conversations).sort( | |
(a, b) => new Date(b.time) - new Date(a.time), | |
); | |
const sequenced = sequenceMessages(conversationsArray); | |
store.dispatch({ | |
type: GET_LOCAL_DB_DATA, | |
payload: sequenced, | |
}); | |
}; | |
export const saveChatData = async (message: MessageRecord) => { | |
bulkSaveChatData([message]); | |
const {tableId} = message; | |
getUnreads(tableId); | |
}; | |
export const getUnreads = async (tableId: string) => | |
execute(async database => { | |
const messages = await database | |
.get('message') | |
.query( | |
Q.unsafeSqlQuery( | |
`SELECT code, sum(is_read) as unread | |
FROM message | |
WHERE table_id = $0 | |
GROUP BY code`, | |
[tableId], | |
), | |
) | |
.unsafeFetchRaw(); | |
let unreads: IObjectNumber = {}; | |
messages.forEach(message => { | |
unreads[message.code] = message.unread; | |
}); | |
if (unreads.count) { | |
store.dispatch({ | |
type: S_UPDATE_UNREADS, | |
payload: {unreads, tableId}, | |
}); | |
} | |
}); | |
export const getLastMessageTime = async () => | |
execute(async database => { | |
const messages = await database | |
.get('message') | |
.query( | |
Q.unsafeSqlQuery( | |
`SELECT time, table_id as "tableId" | |
FROM (SELECT * FROM message ORDER BY time desc) m | |
GROUP BY table_id | |
ORDER BY time DESC | |
;`, | |
), | |
) | |
.unsafeFetchRaw(); | |
return messages; | |
}); | |
export const updateChatData = async (tableId: string, code: number) => | |
execute(async database => { | |
const collection = await database.collections.get<MessageRecord & Model>( | |
'message', | |
); | |
const updates: (MessageRecord & Model)[] = []; | |
const messages = await collection | |
.query(Q.and(Q.where('code', code), Q.where('table_id', tableId))) | |
.fetch(); | |
if (messages?.length) { | |
messages.forEach(message => { | |
updates.push( | |
message.prepareUpdate(toUpdate => { | |
toUpdate.isRead = false; | |
}), | |
); | |
}); | |
} | |
await database.write(async () => { | |
await database.batch(...updates); | |
}); | |
}); | |
export const deleteChatData = async (tableId: string) => | |
execute(async database => { | |
const collection = database.collections.get('message'); | |
const recordsToDelete = await collection | |
.query(Q.where('table_id', tableId)) | |
.fetch(); | |
await database.action(async () => { | |
await Promise.all( | |
recordsToDelete.map(record => record.destroyPermanently()), | |
); | |
}); | |
getConvesations(); | |
}); | |
export const createKeysTable = async () => {}; | |
export const upsertTables = async (tables: TableKeyRecord[]) => { | |
execute(async database => { | |
const collection = database.collections.get<TableKeyRecord & Model>( | |
'table', | |
); | |
const updates: (TableKeyRecord & Model)[] = []; | |
const creates: (TableKeyRecord & Model)[] = []; | |
for (const table of tables) { | |
const existing = await collection.find(table._id).catch(() => {}); | |
if (existing) { | |
updates.push( | |
existing.prepareUpdate(toUpdate => Object.assign(toUpdate, table)), | |
); | |
} else { | |
creates.push( | |
collection.prepareCreate(toCreate => { | |
toCreate._raw = sanitizedRaw({_id: table._id}, collection.schema); | |
Object.assign(toCreate, table); | |
}), | |
); | |
} | |
} | |
await database.write(async () => { | |
await database.batch(...updates, ...creates); | |
}); | |
}); | |
}; | |
export const saveUserKeys = async (encryptionKey: EncryptionKeyRecord) => | |
execute(async database => { | |
const collection = database.collections.get<EncryptionKeyRecord & Model>( | |
'encryption_key', | |
); | |
await database.write(async () => { | |
return await collection.create(key => { | |
key._id = encryptionKey._id; | |
key.userId = encryptionKey.userId; | |
key.code = encryptionKey.code; | |
key.userSecretKey = encryptionKey.userSecretKey; | |
key.senderPublicKey = encryptionKey.senderPublicKey; | |
}); | |
}); | |
}); | |
export const getKeys = async ( | |
tableId: string, | |
code: string, | |
callback: (result: EncryptionKeyRecord) => void, | |
) => | |
execute(async database => { | |
const result = await database | |
.get<Model & EncryptionKeyRecord>('encryption_key') | |
.query( | |
Q.and(Q.where('_id', tableId), Q.where('code', code)), | |
Q.sortBy('created_at', 'desc'), | |
) | |
.fetch(); | |
if (result.length) { | |
callback(result[0]); | |
} | |
}); | |
export const filterData = async ( | |
text: string, | |
callback: (result: MessageRecord[]) => void, | |
) => | |
execute(async database => { | |
const messages = await database | |
.get<Model & MessageRecord>('message') | |
.query( | |
Q.where('text', Q.like(`%${Q.sanitizeLikeString(text)}%`)), | |
Q.take(100), | |
Q.sortBy('time', Q.desc), | |
) | |
.fetch(); | |
callback(messages); | |
}); | |
export const fetchFilterMessage = async (message: MessageClass) => | |
execute(async database => { | |
const {code, tableId, time} = message; | |
const messages = await database | |
.get<Model & MessageRecord>('message') | |
.query( | |
Q.unsafeSqlQuery( | |
` | |
SELECT ${messageColumnsRawSelectQueryFragment} | |
FROM message | |
WHERE tableId = ? | |
AND code = ? | |
AND time <= ? | |
ORDER BY time desc | |
LIMIT 30;`, | |
[tableId, code, time], | |
), | |
) | |
.unsafeFetchRaw(); | |
store.dispatch({ | |
database: 'wm', | |
type: FETCH_FILTER_MESSAGES, | |
payload: {messages: messages, data: message}, | |
}); | |
return messages; | |
}); | |
export const fetchMoreMessage = async ( | |
tableId: string, | |
code: number, | |
index = 0, | |
lastMessageTime: number, | |
) => | |
execute(async database => { | |
const timeInMs = new Date(lastMessageTime).getTime(); | |
const messages = await database | |
.get<Model & MessageRecord>('message') | |
.query( | |
Q.where('code', code), | |
Q.where('table_id', tableId), | |
Q.where('time', Q.gt(timeInMs)), | |
Q.sortBy('time', Q.asc), | |
) | |
.unsafeFetchRaw(); | |
store.dispatch({type: TABLE_SPINNER, payload: false}); | |
if (messages.length > 0) { | |
store.dispatch({ | |
database: 'wm', | |
type: 'ADD_BOTTOM_MESSAGE', | |
payload: {messages, index}, | |
}); | |
} | |
return messages; | |
}); | |
export const getMoreChatData = async ( | |
tableId: string, | |
code: number, | |
index = 0, | |
lastMessageTime: number, | |
) => | |
execute(async database => { | |
const timeInMs = new Date(lastMessageTime).getTime(); | |
const messages = await database | |
.get<Model & MessageRecord>('message') | |
.query( | |
Q.where('code', code), | |
Q.where('table_id', tableId), | |
Q.where('time', Q.lt(timeInMs)), | |
Q.sortBy('time', Q.desc), | |
) | |
.unsafeFetchRaw(); | |
store.dispatch({type: TABLE_SPINNER, payload: false}); | |
if (messages.length > 0) { | |
store.dispatch({ | |
database: 'wm', | |
type: 'ADD_BOTTOM_MESSAGE', | |
payload: {messages, index}, | |
}); | |
} | |
return messages; | |
}); | |
export const getDistinctMessagesByCode = async (tableId: string) => | |
execute(async database => { | |
const messages = await database | |
.get<Model & MessageRecord>('message') | |
.query(Q.where('table_id', tableId), Q.sortBy('code', Q.asc), Q.take(30)) | |
.fetch(); | |
return messages; | |
}); | |
export const getMoreMessagesByCode = async ( | |
tableId: string, | |
code: number, | |
page: number, | |
pageSize: number, | |
) => | |
execute(async database => { | |
const messages = await database | |
.get<Model & MessageRecord>('message') | |
.query( | |
Q.where('code', code), | |
Q.where('table_id', tableId), | |
Q.sortBy('time', Q.desc), | |
Q.skip((page - 1) * pageSize), | |
Q.take(pageSize), | |
) | |
.fetch(); | |
return messages; | |
}); | |
const databaseClient = { | |
getDBConnection, | |
deleteDb, | |
createTable, | |
bulkSaveChatData, | |
getConvesations, | |
saveChatData, | |
getUnreads, | |
getLastMessageTime, | |
updateChatData, | |
deleteChatData, | |
createKeysTable, | |
saveUserKeys, | |
getKeys, | |
filterData, | |
fetchFilterMessage, | |
fetchMoreMessage, | |
getMoreChatData, | |
upsertTables, | |
getDistinctMessagesByCode, | |
}; | |
export default databaseClient; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment