Skip to content

Instantly share code, notes, and snippets.

@slavahatnuke
Created October 10, 2019 17:02
Show Gist options
  • Save slavahatnuke/14b46cbf60f4d61dbb6fd05fcbd1c627 to your computer and use it in GitHub Desktop.
Save slavahatnuke/14b46cbf60f4d61dbb6fd05fcbd1c627 to your computer and use it in GitHub Desktop.
// Module dependencies and debug loggers used by the identity-resolution worker.
const { Debug } = require('dataflow');
// Root debug namespace for this worker.
const debug = Debug('dataflow:debug:dedupe:worker');
// Logger used throughout the identity code below; `child` presumably derives a
// nested namespace (e.g. '...:identity') — confirm against dataflow's Debug API.
const debugIdentity = debug.child('identity');
const { groupBy, flattenDeep } = require('lodash');
const {
EntityId, Entity, Indexer, IdentityCollection, IndexEntity,
} = require('dedupe');
/**
 * Convert raw dedupe work items into EntityId objects.
 *
 * Each item's `recordData` becomes an Entity owned by `<owner>/<dataType>`;
 * the originating item is kept under `meta.item` so later stages can write the
 * resolved id back onto it. A truthy `master_id` is deleted from `recordData`
 * (in place) before the entity is built.
 *
 * @param {Array<object>} items - work items carrying owner, keys, recordData,
 *   dataType, id and tokenType
 * @returns {Array<object>} one EntityId per item, in input order
 */
function buildEntityIds(items) {
  const toEntityId = (item) => {
    const { recordData } = item;

    // NOTE: mutates the caller's recordData on purpose; processIdentityItems
    // later writes recordData back to the record and sets master_id separately.
    if (recordData.master_id) {
      delete recordData.master_id;
    }

    const entity = Entity(recordData, {
      owner: `${item.owner}/${item.dataType}`,
      id: item.id,
      meta: { item },
    });

    return EntityId({
      entity,
      keys: item.keys,
      tokenType: item.tokenType,
      owner: item.owner,
    });
  };

  return items.map(toEntityId);
}
/**
 * Bucket EntityIds by the owner of their entity.
 *
 * @param {Array<object>} entityIds - objects exposing `entity.owner`
 * @returns {Array<{owner: string, entityIds: Array<object>}>} one group per
 *   distinct `entity.owner`, in the key order produced by lodash `groupBy`
 */
function groupEntityIdsByOwner(entityIds) {
  const byOwner = groupBy(entityIds, (entityId) => entityId.entity.owner);
  const groups = [];
  for (const [owner, members] of Object.entries(byOwner)) {
    groups.push({ owner, entityIds: members });
  }
  return groups;
}
/**
 * Build a finder that looks up already-indexed candidate documents matching
 * the identity tokens of the given EntityIds.
 *
 * EntityIds are grouped by entity owner; each group queries the collection
 * returned by `mongo.getCollection(owner)` with an `$or` holding one
 * `{ tokens: { $all: tokenGroup } }` clause per token group produced by the
 * indexer. Groups are queried concurrently.
 *
 * @param {{ mongo: { getCollection: (owner: string) => Promise<object> } }} deps
 * @returns {(entityIds: Array<object>) => Promise<Array<object>>} resolves to
 *   the candidates found across ALL owner groups, concatenated in group order
 */
function FindCandidatesByEntityIds({ mongo }) {
  const indexer = Indexer();

  return async function findCandidatesByEntityIds(entityIds) {
    const ownerEntityIdsGroups = groupEntityIdsByOwner(entityIds);

    const promises = ownerEntityIdsGroups.map(async ({ owner, entityIds: groupEntityIds }) => {
      // One `$all` clause per token group, across every entity in the group.
      const tokensQuery = flattenDeep(groupEntityIds.map((entityId) => {
        const { ids: idTokens } = indexer.index({ entity: entityId });
        return (idTokens || []).map((tokenGroup) => ({
          tokens: { $all: tokenGroup },
        }));
      }));

      const collection = await mongo.getCollection(owner);
      if (!tokensQuery.length) {
        return [];
      }

      const $query = {
        owner,
        $or: tokensQuery,
      };
      debugIdentity.child('query')({ $query });
      return collection.find($query).toArray();
    });

    // Each group resolves to its own result list; concatenate them rather than
    // letting the last group to finish overwrite the others' results.
    const resultsPerGroup = await Promise.all(promises);
    return [].concat(...resultsPerGroup);
  };
}
/**
 * Build an indexer that upserts index documents for EntityIds (used for those
 * that were not matched to an existing candidate).
 *
 * For every EntityId the indexer yields an index entity (read via
 * `toObject()` as `tokens`, `tokenType`, `id`, `owner`) plus token groups.
 * One ordered bulk of upserts is executed per owner collection.
 *
 * @param {object} [options]
 * @param {object} options.mongo - exposes `getCollection(owner)`
 * @param {boolean} [options.force=false] - replace stored tokens (`$set`)
 *   instead of merging them (`$addToSet`)
 * @param {boolean} [options.indexByTokens=false] - select the document to
 *   upsert by its token groups (`$or` of `$all`) instead of by id; falls back
 *   to id when the entity has no token groups
 * @returns {(entityIds: Array<object>) => Promise<void>}
 */
function IndexEntityIds({ mongo, force = false, indexByTokens = false } = {}) {
  const indexer = Indexer();

  // Selector for the document to upsert.
  const buildCondition = ({ id, owner, idTokens }) => {
    if (indexByTokens && idTokens.length) {
      return {
        owner,
        $or: idTokens.map((tokenGroup) => ({
          tokens: {
            $all: tokenGroup.map((token) => ({ $elemMatch: { $eq: token } })),
          },
        })),
      };
    }
    return { owner, id };
  };

  // Upsert document: identity fields only on insert, tokens merged or replaced.
  const buildUpdate = ({ id, owner, tokenType, tokens }) => {
    const now = new Date();
    // Never store an empty token list. Use a fresh array instead of pushing,
    // so the array returned by indexEntity.toObject() is not mutated.
    const storedTokens = tokens.length ? tokens : ['no-tokens'];
    const update = {
      $setOnInsert: {
        id,
        owner,
        createdAt: now,
        tokenType,
      },
    };
    if (force) {
      update.$set = {
        tokens: storedTokens,
        updatedAt: now,
      };
    } else {
      update.$addToSet = { tokens: { $each: storedTokens } };
      update.$set = { updatedAt: now };
    }
    return update;
  };

  return async function indexEntityIds(entityIds) {
    const ownerEntityIdsGroups = groupEntityIdsByOwner(entityIds);

    const promises = ownerEntityIdsGroups.map(async ({ owner, entityIds: groupEntityIds }) => {
      const indexed = groupEntityIds.map((entityId) => {
        const { ids, indexEntity } = indexer.index({ entity: entityId });
        return { indexEntity, idTokens: ids || [] };
      });

      const collection = await mongo.getCollection(owner);

      const updates = indexed.map(({ indexEntity, idTokens }) => {
        // The index entity's own owner goes into the query document.
        const {
          tokens, tokenType, id, owner: indexOwner,
        } = indexEntity.toObject();
        const condition = buildCondition({ id, owner: indexOwner, idTokens });
        const update = buildUpdate({
          id, owner: indexOwner, tokenType, tokens,
        });
        debugIdentity.child('query.bulk')({
          condition,
          update,
        });
        return { condition, update };
      });

      if (!updates.length) {
        return;
      }

      const bulk = collection.initializeOrderedBulkOp();
      updates.forEach(({ condition, update }) => {
        bulk
          .find(condition)
          .upsert()
          .updateOne(update);
      });
      await bulk.execute();
    });

    await Promise.all(promises);
  };
}
/**
 * Parse `field:value` string tokens into `[field, value]` pairs.
 *
 * The split happens at the FIRST colon, so `field` never contains ':' while
 * `value` keeps any further colons and line breaks verbatim. (The former
 * `/^([^:]*):(.*)$/igm` regex, because of the `m` flag, cut values off at the
 * first newline, so patches referenced truncated tokens.) Tokens without a
 * colon are dropped.
 *
 * @param {Array<string>} stringTokens
 * @returns {Array<[string, string]>}
 */
function toArrayTokens(stringTokens) {
  const pairs = [];
  for (const rawToken of stringTokens) {
    const token = String(rawToken);
    const separatorIndex = token.indexOf(':');
    if (separatorIndex === -1) {
      continue;
    }
    pairs.push([token.slice(0, separatorIndex), token.slice(separatorIndex + 1)]);
  }
  return pairs;
}
/**
 * Render `[field, value]` pairs as `field:value` strings.
 *
 * @param {Array<[string, string]>} arrayTokens
 * @returns {Array<string>} one string per pair, in input order
 */
function arrayTokensToStringTokens(arrayTokens) {
  const rendered = [];
  for (const [field, value] of arrayTokens) {
    rendered.push(`${field}:${value}`);
  }
  return rendered;
}
/**
 * Compute the token changes that bring a stored candidate's tokens in line
 * with a freshly computed entity's tokens.
 *
 * - `insert`: entity tokens (exact `field:value`) the candidate lacks.
 * - `remove`: superseded candidate tokens, i.e.
 *   - an empty-valued candidate token when the candidate also has a non-empty
 *     value for the same field, or
 *   - a candidate token whose field has a non-empty value on the entity but
 *     whose exact `field:value` pair the entity does not carry.
 *   Checking the exact pair (rather than only the entity's first value for the
 *   field) keeps multi-valued fields intact: a candidate value the entity
 *   still carries is never removed.
 *
 * @param {object} [args]
 * @param {Array<string>} [args.entityTokens=[]]
 * @param {Array<string>} [args.candidateTokens=[]]
 * @returns {{ remove: Array<string>, insert: Array<string> }}
 */
function buildPatch({ entityTokens = [], candidateTokens = [] } = {}) {
  const candidateArrayTokens = toArrayTokens(candidateTokens);
  const entityArrayTokens = toArrayTokens(entityTokens);

  // `field` never contains ':' after parsing, so `field:value` is an unambiguous key.
  const pairKey = ([field, value]) => `${field}:${value}`;
  const fieldsWithValue = (pairs) => new Set(
    pairs.filter(([, value]) => value).map(([field]) => field),
  );

  const candidatePairs = new Set(candidateArrayTokens.map(pairKey));
  const entityPairs = new Set(entityArrayTokens.map(pairKey));
  const candidateValuedFields = fieldsWithValue(candidateArrayTokens);
  const entityValuedFields = fieldsWithValue(entityArrayTokens);

  const insert = entityArrayTokens.filter((pair) => !candidatePairs.has(pairKey(pair)));

  const remove = candidateArrayTokens.filter((pair) => {
    const [field, value] = pair;
    // An empty placeholder is obsolete once the same field has a real value.
    if (!value && candidateValuedFields.has(field)) {
      return true;
    }
    return entityValuedFields.has(field) && !entityPairs.has(pairKey(pair));
  });

  const patch = {
    remove: arrayTokensToStringTokens(remove),
    insert: arrayTokensToStringTokens(insert),
  };
  debugIdentity({
    candidateArrayTokens,
    entityArrayTokens,
    patch,
  });
  return patch;
}
/**
 * Build an updater that applies token patches to already-indexed documents.
 *
 * Each entry of `indexUpdates` carries `{ owner, id, patch: { insert, remove } }`.
 * Entries are grouped by owner. Per owner, one ordered bulk is executed in which
 * each patch contributes an `$addToSet` op (insert) followed by a `$pullAll` op
 * (remove). The two stay separate ops because MongoDB rejects two update
 * operators on the same path (`tokens`) in a single update. Entries whose patch
 * is empty are skipped. No upsert is used: only existing documents are touched.
 *
 * @param {{ mongo: { getCollection: (owner: string) => Promise<object> } }} deps
 * @returns {(indexUpdates: Array<object>) => Promise<void>}
 */
function UpdateIndex({ mongo }) {
  const logPatchQuery = debugIdentity.child('query.bulk.patch');

  return async function updateIndexByEntityIds(indexUpdates) {
    const groupsByOwner = Object.entries(groupBy(indexUpdates, ({ owner }) => owner));

    const promises = groupsByOwner.map(async ([groupOwner, ownerUpdates]) => {
      const updates = [];

      for (const { owner, id, patch } of ownerUpdates) {
        const condition = {
          owner,
          id,
        };

        if (patch.insert.length) {
          const update = { $addToSet: { tokens: { $each: patch.insert } } };
          logPatchQuery({ condition, update });
          updates.push({ condition, update });
        }

        if (patch.remove.length) {
          const update = { $pullAll: { tokens: patch.remove } };
          // Log the bare update document, consistent with the insert branch.
          logPatchQuery({ condition, update });
          updates.push({ condition, update });
        }
      }

      if (updates.length) {
        const collection = await mongo.getCollection(groupOwner);
        const bulk = collection.initializeOrderedBulkOp();
        updates.forEach(({ condition, update }) => {
          bulk
            .find(condition)
            .updateOne(update);
        });
        await bulk.execute();
      }

      debugIdentity({ updates });
    });

    await Promise.all(promises);
  };
}
/**
 * Collapse entities in the current batch that identify as the same record
 * onto a single "lead" id.
 *
 * EntityIds are visited in order. For each one, the matching entities from
 * `identityCollection.getEntities(entityId)` plus the entity itself are
 * collected. The smallest id under the default (string) `Array#sort` becomes
 * the lead id. It is written to the entity, its `meta.item`, and every matching
 * entity. Ids written in earlier iterations are visible to later ones.
 *
 * @param {Array<object>} entityIds
 * @returns {Array<object>} the same array, with ids mutated in place
 */
function identifySameEntities(entityIds) {
  const identityCollection = IdentityCollection(entityIds.map((entityId) => entityId.entity));
  const debugLead = debugIdentity.child('lead');

  for (const entityId of entityIds) {
    const matches = identityCollection.getEntities(entityId);
    const { entity } = entityId;

    const candidatesToBeLeadId = [...matches.map((match) => match.id), entity.id];
    // In-place default sort (string comparison); the logged array is the sorted one.
    candidatesToBeLeadId.sort();
    const leadId = candidatesToBeLeadId[0];

    debugLead({
      leadId,
      candidatesToBeLeadId,
    });

    entity.id = leadId;
    entity.meta.item.id = leadId;
    for (const match of matches) {
      match.id = leadId;
      match.meta.item.id = leadId;
    }
  }

  return entityIds;
}
/**
 * Build the identity-resolution step for a batch of dedupe items.
 *
 * Pipeline:
 *  1. build EntityIds from the items and collapse in-batch duplicates onto a
 *     lead id;
 *  2. look up stored candidates for those EntityIds;
 *  3. EntityIds with a matching candidate adopt the candidate's id and get a
 *     token patch (applied via `updateIndex`); the rest are indexed as new
 *     (via `indexEntityIds`);
 *  4. every item's record receives its `recordData`, then the resolved id as
 *     `master_id`.
 *
 * @param {{ mongo: object }} deps - passed through to the finder/indexer/updater
 * @returns {(items: Array<object>) => Promise<Array<object>>} resolves to the
 *   same item objects, mutated
 */
function ProcessIdentityItems({ mongo }) {
  const findCandidatesByEntityIds = FindCandidatesByEntityIds({ mongo });
  const indexEntityIds = IndexEntityIds({ mongo });
  const updateIndex = UpdateIndex({ mongo });

  return async function processIdentityItems(items) {
    const entityIds = identifySameEntities(buildEntityIds(items));
    const candidates = await findCandidatesByEntityIds(entityIds);
    debugIdentity({ candidates });

    // Wrap in an arrow so IndexEntity receives only the document, not
    // Array#map's extra (index, array) arguments.
    const identityCollection = IdentityCollection(
      candidates.map((candidate) => IndexEntity(candidate)),
    );

    // identification: resolve every EntityId against the candidates BEFORE
    // any id is rewritten below.
    const identification = entityIds.map((entityId) => {
      const candidateEntity = identityCollection.getEntity(entityId);
      return {
        entityId,
        candidateEntity,
      };
    });

    // not identified case
    const entityIdsToIndex = identification
      .filter(({ candidateEntity }) => !candidateEntity)
      .map(({ entityId }) => entityId);

    // identified case
    const entityIdUpdates = identification
      .filter(({ candidateEntity }) => candidateEntity)
      .map(({ candidateEntity, entityId }) => {
        // identify / update entity and item
        const { id } = candidateEntity;
        entityId.entity.id = id;
        entityId.entity.meta.item.id = id;
        debugIdentity({ identified: id });

        const candidateTokens = entityId.toTokens(candidateEntity);
        const entityTokens = entityId.toTokens(entityId.entity);
        const patch = buildPatch({
          candidateTokens,
          entityTokens,
        });
        debugIdentity({ patch });

        return {
          entityId,
          owner: entityId.entity.owner,
          id,
          patch,
        };
      });

    await indexEntityIds(entityIdsToIndex);
    await updateIndex(entityIdUpdates);

    return items.map((item) => {
      const { record, id, recordData } = item;
      record.updateValues(recordData);
      record.updateValues({ master_id: id });
      return item;
    });
  };
}
// Public API of this worker module: only the identity-processing factory is
// exported; all other functions above are module-private.
module.exports = {
ProcessIdentityItems,
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment