Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save Jalson1982/66ad3a22f8d6be755d3841d444e78ab3 to your computer and use it in GitHub Desktop.
Save Jalson1982/66ad3a22f8d6be755d3841d444e78ab3 to your computer and use it in GitHub Desktop.
import {Message as MessageClass} from '../../interfaces&classes/message-classes';
import {store} from '../../store';
import {
FETCH_FILTER_MESSAGES,
GET_LOCAL_DB_DATA,
S_UPDATE_UNREADS,
TABLE_SPINNER,
} from '../../helpers/constants';
import {IObjectNumber} from '../../interfaces&classes/common';
import {sequenceMessages} from '../../helpers/helpers';
import {Database, Model} from '@nozbe/watermelondb';
import SQLiteAdapter from '@nozbe/watermelondb/adapters/sqlite';
import {schemaMigrations} from '@nozbe/watermelondb/Schema/migrations';
import {
Message,
EncryptionKey,
schema,
MessageRecord,
EncryptionKeyRecord,
messageColumnsRawSelectQueryFragment,
TableKeyRecord,
Table,
} from './watermelon-model-definitions';
import {Q} from '@nozbe/watermelondb';
import {sanitizedRaw} from '@nozbe/watermelondb/RawRecord';
/**
 * Schema migration set handed to the SQLite adapter.
 * The list of migrations is currently empty.
 */
const migrations = schemaMigrations({migrations: []});
/**
 * SQLite adapter used by the module's WatermelonDB instance.
 * Set-up failures are forwarded to `customLogger` when `DEBUG` is enabled.
 */
const adapter = new SQLiteAdapter({
  schema,
  migrations,
  dbName: '8seatchatTable-watermelondb',
  jsi: true,
  onSetUpError: setUpError => {
    if (!DEBUG) {
      return;
    }
    customLogger('error', executionId, setUpError);
  },
});
/** Master switch for this module's diagnostic logging. */
const DEBUG = true;

/** Counter used to tag log lines with the id of a database operation. */
let executionId = 0;

/**
 * Writes a tagged diagnostic line through `console.debug` when `DEBUG` is on.
 *
 * @param level - Tag placed in the prefix. For `'error'`, the `stack` of the
 *   first extra argument is appended when it has one.
 * @param id - Execution id placed in the prefix.
 * @param args - Values logged after the prefix.
 */
export const customLogger = (
  level: 'error' | 'debug',
  id: number,
  ...args: any[]
) => {
  if (!DEBUG) {
    return;
  }
  // Anything can be thrown (including `undefined`), and callers may pass no
  // extra argument at all, so `stack` is read defensively instead of assuming
  // the first argument is an Error.
  const stack = level === 'error' ? (args[0]?.stack ?? '') : '';
  console?.debug(`[wm-db-${level}(${id})]`, ...args, stack);
};
/** Cached database instance; assigned by `getDBConnection`. */
export let watermelonDatabase: Database | null;

/**
 * Returns the cached database, constructing it on first use (or after the
 * cache has been cleared) from the module's adapter and model classes.
 */
export const getDBConnection = async () => {
  if (watermelonDatabase) {
    return watermelonDatabase;
  }
  watermelonDatabase = new Database({
    adapter,
    modelClasses: [Message, EncryptionKey, Table],
  });
  return watermelonDatabase;
};
/**
 * Best-effort lookup of which `databaseClient` method is on the current call
 * stack, used only to label log lines.
 *
 * The stack text of a freshly thrown Error is scanned for frame names in the
 * `name@` and `at name (` formats, and each key of `databaseClient` is checked
 * against that text.
 *
 * @returns The last `databaseClient` key that appears in the stack text, or
 *   `''` when none does or when no stack text is available.
 */
function getParentFunctionName(): string {
  let stack = '';
  try {
    throw new Error();
  } catch (e) {
    stack = e instanceof Error ? e.stack ?? '' : '';
  }
  // `match` yields null when no frame has a recognizable name. That must not
  // throw: this helper runs inside `execute`'s try block, so an exception here
  // would abort the database operation it is merely trying to label.
  const frames = stack.match(/(\w+)@|at (\w+) \(/g) ?? [];
  const allMatches = frames.join(' ');
  let parent = '';
  Object.keys(databaseClient).forEach(name => {
    if (allMatches.includes(name)) {
      parent = name;
    }
  });
  return parent;
}
/**
 * Runs `callback` against the shared database connection with logging.
 *
 * Failures are logged and swallowed (best-effort): the returned promise then
 * resolves to `undefined` instead of rejecting.
 *
 * @param callback - Receives the database and the id assigned to this run.
 * @returns The callback's (awaited) result, or `undefined` if anything threw.
 */
const execute = async <T>(callback: (database: Database, id: number) => T) => {
  executionId += 1;
  const id = executionId;
  try {
    customLogger('debug', id, `executing: ${getParentFunctionName()}`);
    const database = await getDBConnection();
    // Hand over this run's own id: other runs may have bumped the shared
    // counter while the connection was being awaited.
    return await callback(database, id);
  } catch (error) {
    customLogger('error', id, error, callback.name);
    return undefined;
  } finally {
    // Logged on both paths; previously the early `return` skipped it on success.
    customLogger('debug', id, 'is completed');
  }
};
/**
 * Resets the database through `unsafeResetDatabase` inside a writer, then
 * clears the cached instance so the next `getDBConnection` call builds a new one.
 */
export const deleteDb = () =>
  execute(async database => {
    const resetEverything = () => database.unsafeResetDatabase();
    await database.write(resetEverything);
    watermelonDatabase = null;
  });
/**
 * Makes sure the database instance exists by requesting the connection;
 * resolves once `getDBConnection` has resolved.
 */
export const createTable = async (): Promise<void> => {
  await getDBConnection();
};
/**
 * Inserts or updates a batch of messages, matching on the `_id` column.
 *
 * The existence check, the prepared operations and the batch all happen inside
 * one writer. No other writer can slip in between the lookup and the write,
 * and every prepared record is handed to `batch` in the same synchronous
 * stretch in which it was prepared.
 *
 * @param messages - Records to store. When the same `_id` appears more than
 *   once, the last occurrence wins.
 */
const bulkUpsertMessages = (messages: MessageRecord[]) =>
  execute(async database => {
    if (messages.length === 0) {
      return;
    }
    const collection = database.collections.get<MessageRecord & Model>(
      'message',
    );
    // Collapse repeated `_id`s so a single row never gets two prepared
    // operations (or two inserts) within the same batch.
    const incomingById = new Map(
      messages.map(message => [message._id, message]),
    );

    await database.write(async () => {
      // Fetch all already-stored messages in a single query.
      const existingMessages = await collection
        .query(Q.where('_id', Q.oneOf(Array.from(incomingById.keys()))))
        .fetch();
      const existingById = new Map(
        existingMessages.map(message => [message._id, message]),
      );

      const updates: (MessageRecord & Model)[] = [];
      const creates: (MessageRecord & Model)[] = [];
      incomingById.forEach((message, messageId) => {
        const existing = existingById.get(messageId);
        if (existing) {
          updates.push(
            existing.prepareUpdate(toUpdate =>
              Object.assign(toUpdate, message),
            ),
          );
        } else {
          creates.push(
            collection.prepareCreate(toCreate => {
              toCreate._raw = sanitizedRaw(
                {_id: message._id},
                collection.schema,
              );
              Object.assign(toCreate, message);
            }),
          );
        }
      });

      await database.batch(...updates, ...creates);
    });
  });
/**
 * Stores a batch of messages and then refreshes the conversation list.
 *
 * The refresh is awaited so callers can rely on it having run once this
 * promise resolves. A refresh failure is logged rather than left as an
 * unhandled rejection, in line with the module's best-effort error handling.
 *
 * @param messages - Messages to upsert.
 */
export const bulkSaveChatData = async (messages: MessageRecord[]) => {
  await bulkUpsertMessages(messages);
  try {
    await getConvesations();
  } catch (error) {
    customLogger('error', executionId, error);
  }
};
/**
 * Rebuilds the conversation list from all stored messages and dispatches it
 * with `GET_LOCAL_DB_DATA`.
 *
 * Messages are grouped by `tableId` + `code`. Each group keeps its newest
 * message plus an `unread` counter that is incremented for every message whose
 * `isRead` flag is truthy. The groups are then ordered newest-first and passed
 * through `sequenceMessages`.
 *
 * Runs through `execute`, so the database comes from the shared connection and
 * failures are logged instead of thrown.
 */
export const getConvesations = async () =>
  execute(async database => {
    const allMessages = await database.collections
      .get<MessageRecord & Model>('message')
      .query(Q.sortBy('time', Q.desc))
      .fetch();

    const conversations: Record<string, MessageRecord & {unread: number}> = {};
    for (const message of allMessages) {
      const key = `${message.tableId}_${message.code}`;
      const increment = message.isRead ? 1 : 0;
      const current = conversations[key];
      if (!current) {
        // NOTE(review): spreading a model instance copies only its own
        // enumerable properties - confirm that `time`, `tableId` and `code`
        // survive the spread for this model class.
        conversations[key] = {...message, unread: increment};
        continue;
      }
      current.unread += increment;
      // Keep the newest message of the group but preserve the running counter.
      if (
        new Date(message.time).getTime() > new Date(current.time).getTime()
      ) {
        conversations[key] = {...message, unread: current.unread};
      }
    }

    const conversationsArray = Object.values(conversations).sort(
      (a, b) => new Date(b.time).getTime() - new Date(a.time).getTime(),
    );
    const sequenced = sequenceMessages(conversationsArray);
    store.dispatch({
      type: GET_LOCAL_DB_DATA,
      payload: sequenced,
    });
  });
/**
 * Stores a single message, then recomputes the unread counters of its table.
 *
 * Both steps are awaited in order so the unread query sees the message that
 * was just written.
 *
 * @param message - Message to store; its `tableId` selects the table whose
 *   unread counters are refreshed.
 */
export const saveChatData = async (message: MessageRecord) => {
  await bulkSaveChatData([message]);
  await getUnreads(message.tableId);
};
/**
 * Computes per-`code` unread counters (`sum(is_read)`) for one table and
 * dispatches them with `S_UPDATE_UNREADS`.
 *
 * Nothing is dispatched when the table has no messages.
 *
 * @param tableId - Value matched against the `table_id` column.
 */
export const getUnreads = async (tableId: string) =>
  execute(async database => {
    const rows = await database
      .get('message')
      .query(
        Q.unsafeSqlQuery(
          `SELECT code, sum(is_read) as unread
FROM message
WHERE table_id = $0
GROUP BY code`,
          [tableId],
        ),
      )
      .unsafeFetchRaw();

    const unreads: IObjectNumber = {};
    for (const row of rows) {
      unreads[row.code] = row.unread;
    }

    // The map is keyed by conversation code, so it has no `count` entry to
    // test; the number of grouped rows says whether there is anything to send.
    if (rows.length > 0) {
      store.dispatch({
        type: S_UPDATE_UNREADS,
        payload: {unreads, tableId},
      });
    }
  });
/**
 * Returns, for every table, the time of its most recent message.
 *
 * `MAX(time)` is used so the newest time per table is selected explicitly
 * rather than depending on which row of each group the engine happens to pick
 * for a non-aggregated column.
 *
 * @returns Raw rows of the shape `{time, tableId}`, newest table first
 *   (or `undefined` if the query failed).
 */
export const getLastMessageTime = async () =>
  execute(async database => {
    const messages = await database
      .get('message')
      .query(
        Q.unsafeSqlQuery(
          `SELECT MAX(time) as time, table_id as "tableId"
FROM message
GROUP BY table_id
ORDER BY MAX(time) DESC
;`,
        ),
      )
      .unsafeFetchRaw();
    return messages;
  });
/**
 * Sets `isRead` to `false` on every message of one conversation.
 *
 * The lookup and the prepared updates run inside the writer, so the records
 * are prepared and batched in the same synchronous stretch. Nothing is written
 * when no message matches.
 *
 * @param tableId - Value matched against the `table_id` column.
 * @param code - Value matched against the `code` column.
 */
export const updateChatData = async (tableId: string, code: number) =>
  execute(async database => {
    const collection = database.collections.get<MessageRecord & Model>(
      'message',
    );
    await database.write(async () => {
      const messages = await collection
        .query(Q.and(Q.where('code', code), Q.where('table_id', tableId)))
        .fetch();
      if (messages.length === 0) {
        return;
      }
      const updates = messages.map(message =>
        message.prepareUpdate(toUpdate => {
          toUpdate.isRead = false;
        }),
      );
      await database.batch(...updates);
    });
  });
/**
 * Permanently deletes every message of one table, then refreshes the
 * conversation list.
 *
 * Uses `database.write` like the rest of this module and removes the records
 * in a single batch. The refresh is awaited inside `execute`, so a failure is
 * logged there instead of surfacing as an unhandled rejection.
 *
 * @param tableId - Value matched against the `table_id` column.
 */
export const deleteChatData = async (tableId: string) =>
  execute(async database => {
    const collection = database.collections.get('message');
    await database.write(async () => {
      const recordsToDelete = await collection
        .query(Q.where('table_id', tableId))
        .fetch();
      if (recordsToDelete.length === 0) {
        return;
      }
      await database.batch(
        ...recordsToDelete.map(record => record.prepareDestroyPermanently()),
      );
    });
    await getConvesations();
  });
/**
 * Intentionally empty: performs no work and resolves immediately.
 * NOTE(review): presumably kept so existing callers keep working - confirm.
 */
export const createKeysTable = async (): Promise<void> => {};
/**
 * Inserts or updates a batch of table records, matching on the `_id` column.
 *
 * The returned promise now resolves only after the work has finished;
 * previously the inner operation was started but never awaited.
 * Existing rows are looked up by `_id`, the same column that new rows are
 * seeded with, and the lookup is one query instead of one per record.
 * NOTE(review): assumes the `table` schema declares an `_id` column, as the
 * `sanitizedRaw({_id: ...})` seed suggests - confirm against the schema.
 *
 * @param tables - Records to store. When the same `_id` appears more than
 *   once, the last occurrence wins.
 */
export const upsertTables = async (tables: TableKeyRecord[]) => {
  await execute(async database => {
    if (tables.length === 0) {
      return;
    }
    const collection = database.collections.get<TableKeyRecord & Model>(
      'table',
    );
    const incomingById = new Map(tables.map(table => [table._id, table]));

    await database.write(async () => {
      const existingTables = await collection
        .query(Q.where('_id', Q.oneOf(Array.from(incomingById.keys()))))
        .fetch();
      const existingById = new Map(
        existingTables.map(table => [table._id, table]),
      );

      const updates: (TableKeyRecord & Model)[] = [];
      const creates: (TableKeyRecord & Model)[] = [];
      incomingById.forEach((table, tableKey) => {
        const existing = existingById.get(tableKey);
        if (existing) {
          updates.push(
            existing.prepareUpdate(toUpdate => Object.assign(toUpdate, table)),
          );
        } else {
          creates.push(
            collection.prepareCreate(toCreate => {
              toCreate._raw = sanitizedRaw(
                {_id: table._id},
                collection.schema,
              );
              Object.assign(toCreate, table);
            }),
          );
        }
      });

      await database.batch(...updates, ...creates);
    });
  });
};
/**
 * Creates a new `encryption_key` record from the given key data.
 * A record is always created; nothing is looked up or updated.
 *
 * @param encryptionKey - Source of the `_id`, `userId`, `code`,
 *   `userSecretKey` and `senderPublicKey` values copied onto the new record.
 */
export const saveUserKeys = async (encryptionKey: EncryptionKeyRecord) =>
  execute(async database => {
    const keyCollection = database.collections.get<
      EncryptionKeyRecord & Model
    >('encryption_key');
    await database.write(() =>
      keyCollection.create(record => {
        record._id = encryptionKey._id;
        record.userId = encryptionKey.userId;
        record.code = encryptionKey.code;
        record.userSecretKey = encryptionKey.userSecretKey;
        record.senderPublicKey = encryptionKey.senderPublicKey;
      }),
    );
  });
/**
 * Looks up encryption keys whose `_id` equals `tableId` and whose `code`
 * equals `code`, ordered by `created_at` descending, and passes the first
 * match to `callback`.
 *
 * @param tableId - Value matched against the `_id` column.
 * @param code - Value matched against the `code` column.
 * @param callback - Invoked with the first result; not invoked when the query
 *   returns nothing.
 */
export const getKeys = async (
  tableId: string,
  code: string,
  callback: (result: EncryptionKeyRecord) => void,
) =>
  execute(async database => {
    const keyCollection = database.get<Model & EncryptionKeyRecord>(
      'encryption_key',
    );
    const matchesTableAndCode = Q.and(
      Q.where('_id', tableId),
      Q.where('code', code),
    );
    const newestFirst = Q.sortBy('created_at', 'desc');
    const found = await keyCollection
      .query(matchesTableAndCode, newestFirst)
      .fetch();
    if (!found.length) {
      return;
    }
    callback(found[0]);
  });
/**
 * Searches messages whose `text` contains the given string (LIKE match on the
 * sanitized input), limited to 100 rows and sorted by `time` descending, and
 * hands the result to `callback`.
 *
 * @param text - Search string; sanitized with `Q.sanitizeLikeString`.
 * @param callback - Invoked with the fetched messages, including when the
 *   list is empty.
 */
export const filterData = async (
  text: string,
  callback: (result: MessageRecord[]) => void,
) =>
  execute(async database => {
    const containsText = `%${Q.sanitizeLikeString(text)}%`;
    const messageCollection = database.get<Model & MessageRecord>('message');
    const matches = await messageCollection
      .query(
        Q.where('text', Q.like(containsText)),
        Q.take(100),
        Q.sortBy('time', Q.desc),
      )
      .fetch();
    callback(matches);
  });
/**
 * Loads up to 30 messages of the given message's conversation with a time at
 * or before that message (newest first) and dispatches them with
 * `FETCH_FILTER_MESSAGES`.
 *
 * The filter uses the `table_id` column, matching every other query in this
 * module, instead of the `tableId` name.
 * NOTE(review): `tableId` could only have resolved through a select alias in
 * `messageColumnsRawSelectQueryFragment` - confirm nothing depended on that.
 *
 * @param message - Supplies `tableId`, `code` and `time` for the query and is
 *   echoed back as `data` in the dispatched payload.
 * @returns The raw rows that were dispatched (or `undefined` on failure).
 */
export const fetchFilterMessage = async (message: MessageClass) =>
  execute(async database => {
    const {code, tableId, time} = message;
    const messages = await database
      .get<Model & MessageRecord>('message')
      .query(
        Q.unsafeSqlQuery(
          `
SELECT ${messageColumnsRawSelectQueryFragment}
FROM message
WHERE table_id = ?
AND code = ?
AND time <= ?
ORDER BY time desc
LIMIT 30;`,
          [tableId, code, time],
        ),
      )
      .unsafeFetchRaw();
    store.dispatch({
      database: 'wm',
      type: FETCH_FILTER_MESSAGES,
      payload: {messages: messages, data: message},
    });
    return messages;
  });
/**
 * Loads the messages of one conversation that are strictly newer than
 * `lastMessageTime`, oldest first.
 *
 * Always dispatches `TABLE_SPINNER` with `false`; dispatches
 * `ADD_BOTTOM_MESSAGE` only when at least one row was found.
 *
 * @param tableId - Value matched against the `table_id` column.
 * @param code - Value matched against the `code` column.
 * @param index - Forwarded unchanged in the dispatched payload.
 * @param lastMessageTime - Lower bound (exclusive) for the `time` column.
 * @returns The raw rows (or `undefined` on failure).
 */
export const fetchMoreMessage = async (
  tableId: string,
  code: number,
  index = 0,
  lastMessageTime: number,
) =>
  execute(async database => {
    const afterMs = new Date(lastMessageTime).getTime();
    const messageCollection = database.get<Model & MessageRecord>('message');
    const newerMessages = await messageCollection
      .query(
        Q.where('code', code),
        Q.where('table_id', tableId),
        Q.where('time', Q.gt(afterMs)),
        Q.sortBy('time', Q.asc),
      )
      .unsafeFetchRaw();

    store.dispatch({type: TABLE_SPINNER, payload: false});
    if (newerMessages.length <= 0) {
      return newerMessages;
    }
    store.dispatch({
      database: 'wm',
      type: 'ADD_BOTTOM_MESSAGE',
      payload: {messages: newerMessages, index},
    });
    return newerMessages;
  });
/**
 * Loads the messages of one conversation that are strictly older than
 * `lastMessageTime`, newest first.
 *
 * Always dispatches `TABLE_SPINNER` with `false`; dispatches
 * `ADD_BOTTOM_MESSAGE` only when at least one row was found.
 *
 * @param tableId - Value matched against the `table_id` column.
 * @param code - Value matched against the `code` column.
 * @param index - Forwarded unchanged in the dispatched payload.
 * @param lastMessageTime - Upper bound (exclusive) for the `time` column.
 * @returns The raw rows (or `undefined` on failure).
 */
export const getMoreChatData = async (
  tableId: string,
  code: number,
  index = 0,
  lastMessageTime: number,
) =>
  execute(async database => {
    const beforeMs = new Date(lastMessageTime).getTime();
    const messageCollection = database.get<Model & MessageRecord>('message');
    const olderMessages = await messageCollection
      .query(
        Q.where('code', code),
        Q.where('table_id', tableId),
        Q.where('time', Q.lt(beforeMs)),
        Q.sortBy('time', Q.desc),
      )
      .unsafeFetchRaw();

    store.dispatch({type: TABLE_SPINNER, payload: false});
    if (olderMessages.length <= 0) {
      return olderMessages;
    }
    store.dispatch({
      database: 'wm',
      type: 'ADD_BOTTOM_MESSAGE',
      payload: {messages: olderMessages, index},
    });
    return olderMessages;
  });
/**
 * Fetches up to 30 messages of one table, sorted by `code` ascending.
 * Despite the name, the query applies no de-duplication.
 *
 * @param tableId - Value matched against the `table_id` column.
 * @returns The fetched message records (or `undefined` on failure).
 */
export const getDistinctMessagesByCode = async (tableId: string) =>
  execute(database =>
    database
      .get<Model & MessageRecord>('message')
      .query(
        Q.where('table_id', tableId),
        Q.sortBy('code', Q.asc),
        Q.take(30),
      )
      .fetch(),
  );
/**
 * Fetches one page of a conversation's messages, newest first.
 *
 * @param tableId - Value matched against the `table_id` column.
 * @param code - Value matched against the `code` column.
 * @param page - Page number; `(page - 1) * pageSize` rows are skipped.
 * @param pageSize - Maximum number of rows returned.
 * @returns The fetched message records (or `undefined` on failure).
 */
export const getMoreMessagesByCode = async (
  tableId: string,
  code: number,
  page: number,
  pageSize: number,
) =>
  execute(database => {
    const offset = (page - 1) * pageSize;
    return database
      .get<Model & MessageRecord>('message')
      .query(
        Q.where('code', code),
        Q.where('table_id', tableId),
        Q.sortBy('time', Q.desc),
        Q.skip(offset),
        Q.take(pageSize),
      )
      .fetch();
  });
/**
 * Default export bundling the module's public database operations.
 *
 * The keys of this object are also what `getParentFunctionName` scans the call
 * stack for, so an operation missing here is logged without a name.
 * `getMoreMessagesByCode` is now included for that reason and so that it is
 * reachable through the default export like its siblings.
 */
const databaseClient = {
  getDBConnection,
  deleteDb,
  createTable,
  bulkSaveChatData,
  getConvesations,
  saveChatData,
  getUnreads,
  getLastMessageTime,
  updateChatData,
  deleteChatData,
  createKeysTable,
  saveUserKeys,
  getKeys,
  filterData,
  fetchFilterMessage,
  fetchMoreMessage,
  getMoreChatData,
  upsertTables,
  getDistinctMessagesByCode,
  getMoreMessagesByCode,
};
export default databaseClient;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment