Created
October 10, 2019 17:02
-
-
Save slavahatnuke/14b46cbf60f4d61dbb6fd05fcbd1c627 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Debug } = require('dataflow'); | |
const debug = Debug('dataflow:debug:dedupe:worker'); | |
const debugIdentity = debug.child('identity'); | |
const { groupBy, flattenDeep } = require('lodash'); | |
const { | |
EntityId, Entity, Indexer, IdentityCollection, IndexEntity, | |
} = require('dedupe'); | |
/**
 * Converts raw identity items into EntityId objects.
 *
 * For every item an Entity is created from its `recordData`, owned by
 * `${owner}/${dataType}` and carrying the item itself under `meta.item`.
 * That entity is then wrapped by EntityId together with the item's `keys`,
 * `tokenType` and plain `owner`.
 *
 * NOTE: a truthy `recordData.master_id` is deleted from the item's own
 * `recordData` object, i.e. the caller's data is mutated.
 *
 * @param {Array<Object>} items - items exposing owner, keys, recordData,
 *   dataType, id and tokenType.
 * @returns {Array} one EntityId per item, in input order.
 */
function buildEntityIds(items) {
  const toEntityId = (item) => {
    const data = item.recordData;
    if (data.master_id) {
      delete data.master_id;
    }

    const entity = Entity(data, {
      owner: `${item.owner}/${item.dataType}`,
      id: item.id,
      meta: { item },
    });

    return EntityId({
      entity,
      keys: item.keys,
      tokenType: item.tokenType,
      owner: item.owner,
    });
  };

  return items.map((item) => toEntityId(item));
}
/**
 * Buckets entity ids by the owner of their wrapped entity.
 *
 * @param {Array} entityIds - objects exposing `entity.owner`.
 * @returns {Array<{owner: string, entityIds: Array}>} one record per distinct
 *   owner, holding the entity ids that share it.
 */
function groupEntityIdsByOwner(entityIds) {
  const byOwner = groupBy(entityIds, (entityId) => {
    const { owner } = entityId.entity;
    return owner;
  });

  return Object.keys(byOwner).map((owner) => ({
    owner,
    entityIds: byOwner[owner],
  }));
}
/**
 * Factory for the candidate lookup used during identification.
 *
 * @param {Object} deps
 * @param {Object} deps.mongo - must expose `getCollection(owner)` resolving to
 *   a collection with `find(query).toArray()`.
 * @returns {function(Array): Promise<Array>} async `findCandidatesByEntityIds`.
 *
 * The returned function groups the entity ids by owner, asks the indexer for
 * the id-token groups of every entity id, and runs one query per owner:
 * `{ owner, $or: [{ tokens: { $all: <tokenGroup> } }, ...] }`.
 * Owners whose entity ids yield no token groups are not queried.
 *
 * The documents found for ALL owners are returned in a single flat array,
 * ordered by owner group. Each owner's result is returned from its own
 * promise rather than written to a shared variable, so concurrent owner
 * queries can no longer overwrite each other's results.
 */
function FindCandidatesByEntityIds({ mongo }) {
  const indexer = Indexer();

  return async function findCandidatesByEntityIds(entityIds) {
    const ownerEntityIdsGroups = groupEntityIdsByOwner(entityIds);

    const promises = ownerEntityIdsGroups.map(async ({ owner, entityIds: ownerEntityIds }) => {
      // One `$all` clause per token group of every entity id of this owner.
      const clausesPerEntityId = ownerEntityIds.map((entityId) => {
        const { ids: idTokens } = indexer.index({ entity: entityId });
        return (idTokens || []).map((tokenGroup) => ({
          tokens: { $all: tokenGroup },
        }));
      });
      const tokensQuery = flattenDeep(clausesPerEntityId);

      // Kept ahead of the emptiness check, as before, in case obtaining the
      // collection has side effects callers rely on.
      const collection = await mongo.getCollection(owner);

      if (!tokensQuery.length) {
        return [];
      }

      const $query = {
        owner,
        $or: tokensQuery,
      };
      debugIdentity.child('query')({ $query });

      return collection.find($query).toArray();
    });

    const candidatesPerOwner = await Promise.all(promises);
    return candidatesPerOwner.reduce((all, found) => all.concat(found), []);
  };
}
/**
 * Factory for the upsert step that writes entity ids into the index.
 *
 * @param {Object} [options]
 * @param {Object} options.mongo - must expose `getCollection(owner)` resolving
 *   to a collection with `initializeOrderedBulkOp()`.
 * @param {boolean} [options.force=false] - when true the stored `tokens` are
 *   replaced via `$set`; otherwise new tokens are merged via `$addToSet`.
 * @param {boolean} [options.indexByTokens=false] - when true (and the indexer
 *   produced id-token groups) documents are matched by those token groups
 *   instead of by `id`.
 * @returns {function(Array): Promise<void>} async `indexEntityIds`.
 */
function IndexEntityIds({ mongo, force = false, indexByTokens = false } = {}) {
  const indexer = Indexer();

  // Filter used to locate the document to upsert.
  const buildCondition = (owner, id, idTokens) => {
    const tokenQuery = indexByTokens
      ? idTokens.map((group) => ({
        tokens: {
          $all: group.map((token) => ({ $elemMatch: { $eq: token } })),
        },
      }))
      : [];

    if (tokenQuery.length) {
      return { owner, $or: tokenQuery };
    }
    return { owner, id };
  };

  // Update document; identity fields are only written when inserting.
  const buildUpdate = ({
    tokens, tokenType, id, owner,
  }) => {
    const now = new Date();
    const update = {
      $setOnInsert: {
        id,
        owner,
        createdAt: now,
        tokenType,
      },
    };

    // An entity without tokens is stored with a placeholder token.
    if (!tokens.length) {
      tokens.push('no-tokens');
    }

    if (force) {
      update.$set = { tokens, updatedAt: now };
      return update;
    }

    update.$addToSet = { tokens: { $each: tokens } };
    update.$set = { updatedAt: now };
    return update;
  };

  return async function indexEntityIds(entityIds) {
    const jobs = groupEntityIdsByOwner(entityIds).map(async (group) => {
      const indexed = group.entityIds.map((entityId) => {
        const result = indexer.index({ entity: entityId });
        return {
          entityId,
          indexEntity: result.indexEntity,
          idTokens: result.ids || [],
        };
      });

      const collection = await mongo.getCollection(group.owner);

      const operations = indexed.map(({ indexEntity, idTokens }) => {
        const plain = indexEntity.toObject();
        const condition = buildCondition(plain.owner, plain.id, idTokens);
        const update = buildUpdate(plain);

        debugIdentity.child('query.bulk')({
          condition,
          update,
        });

        return { condition, update };
      });

      if (!operations.length) {
        return;
      }

      const bulk = collection.initializeOrderedBulkOp();
      operations.forEach((operation) => {
        bulk
          .find(operation.condition)
          .upsert()
          .updateOne(operation.update);
      });
      await bulk.execute();
    });

    await Promise.all(jobs);
  };
}
/**
 * Splits `field:value` string tokens into `[field, value]` pairs.
 *
 * The split happens at the FIRST colon, so values may themselves contain
 * colons. Tokens without any colon are dropped. An empty value yields `''`.
 *
 * The pattern deliberately carries no flags: the previous `m` flag let `$`
 * match at the first line break, which silently truncated multi-line values
 * (the pair then no longer round-tripped to the original token), and `g`/`i`
 * had no effect on this pattern.
 *
 * @param {Array<string>} stringTokens
 * @returns {Array<[string, string]>} pairs in input order.
 */
function toArrayTokens(stringTokens) {
  // `[\s\S]*` captures the whole remainder, line breaks included.
  const tokenPattern = /^([^:]*):([\s\S]*)$/;

  return stringTokens
    .map((token) => {
      const match = tokenPattern.exec(token);
      return match ? [match[1], match[2]] : null;
    })
    .filter((pair) => pair !== null);
}
/**
 * Joins `[field, value]` pairs back into `field:value` string tokens.
 *
 * @param {Array<Array>} arrayTokens - pairs whose first two entries are the
 *   field and the value.
 * @returns {Array<string>} one string token per pair, in input order.
 */
function arrayTokensToStringTokens(arrayTokens) {
  const joinPair = (pair) => {
    const [field, value] = pair;
    return `${field}:${value}`;
  };

  return arrayTokens.map((pair) => joinPair(pair));
}
/**
 * Computes which tokens must be added to / removed from an indexed candidate
 * so that it reflects the incoming entity.
 *
 * Rules, as implemented below:
 * - insert: every entity token that the candidate does not already hold.
 * - remove: a candidate token with an empty value when the candidate also
 *   holds a non-empty token for the same field;
 * - remove: a candidate token whose field the entity also carries with a
 *   non-empty, different value - unless the entity holds that exact token too.
 *   That exception matters when the entity has several values for one field:
 *   a token shared by both sides is no longer scheduled for removal.
 *
 * @param {Object} [params]
 * @param {Array<string>} [params.entityTokens=[]] - `field:value` tokens of the entity.
 * @param {Array<string>} [params.candidateTokens=[]] - `field:value` tokens of the candidate.
 * @returns {{remove: Array<string>, insert: Array<string>}}
 */
function buildPatch({ entityTokens = [], candidateTokens = [] } = {}) {
  const candidateArrayTokens = toArrayTokens(candidateTokens);
  const entityArrayTokens = toArrayTokens(entityTokens);

  const hasToken = (arrayTokens, field, value) => arrayTokens
    .some(([otherField, otherValue]) => otherField === field && otherValue === value);

  const insert = entityArrayTokens
    .filter(([entityField, entityValue]) => !hasToken(candidateArrayTokens, entityField, entityValue));

  const remove = candidateArrayTokens.filter(([candidateField, candidateValue]) => {
    if (!candidateValue) {
      // An empty placeholder is obsolete once the field has a real value.
      const hasNonEmptySibling = candidateArrayTokens
        .some(([field, value]) => candidateField === field && value);
      if (hasNonEmptySibling) {
        return true;
      }
    }

    // Never remove a token the entity itself carries.
    if (hasToken(entityArrayTokens, candidateField, candidateValue)) {
      return false;
    }

    const found = entityArrayTokens.find(([entityField]) => candidateField === entityField);
    if (!found) {
      return false;
    }

    const entityValue = found[1];
    // An empty entity value must not wipe an existing candidate value.
    return Boolean(entityValue) && entityValue !== candidateValue;
  });

  const patch = {
    remove: arrayTokensToStringTokens(remove),
    insert: arrayTokensToStringTokens(insert),
  };

  debugIdentity({
    candidateArrayTokens,
    entityArrayTokens,
    patch,
  });

  return patch;
}
/**
 * Factory for the step that applies token patches to already indexed documents.
 *
 * @param {Object} deps
 * @param {Object} deps.mongo - must expose `getCollection(owner)` resolving to
 *   a collection with `initializeOrderedBulkOp()`.
 * @returns {function(Array): Promise<void>} async `updateIndexByEntityIds`.
 *
 * The returned function takes records shaped `{ owner, id, patch }` where
 * `patch` is `{ insert: string[], remove: string[] }`. Records are grouped by
 * owner; per owner one ordered bulk operation is executed against documents
 * matched by `{ owner, id }`. A non-empty `insert` becomes an `$addToSet`
 * update and a non-empty `remove` becomes a separate `$pullAll` update
 * (presumably separate because both target the same `tokens` path). Records
 * with an empty patch produce no operation, and owners without operations do
 * not touch the database.
 */
function UpdateIndex({ mongo }) {
  return async function updateIndexByEntityIds(indexUpdates) {
    const debugPatch = debugIdentity.child('query.bulk.patch');
    const groupsByOwner = Object.entries(groupBy(indexUpdates, ({ owner }) => owner));

    const promises = groupsByOwner.map(async ([owner, ownerUpdates]) => {
      const updates = [];

      ownerUpdates.forEach(({ owner: entityOwner, id, patch }) => {
        const condition = {
          owner: entityOwner,
          id,
        };

        // Logs and queues one bulk operation; the debug payload always has
        // the same `{ condition, update }` shape as the queued operation.
        const enqueue = (update) => {
          debugPatch({ condition, update });
          updates.push({ condition, update });
        };

        if (patch.insert.length) {
          enqueue({ $addToSet: { tokens: { $each: patch.insert } } });
        }
        if (patch.remove.length) {
          enqueue({ $pullAll: { tokens: patch.remove } });
        }
      });

      if (updates.length) {
        const collection = await mongo.getCollection(owner);
        const bulk = collection.initializeOrderedBulkOp();
        updates.forEach(({ condition, update }) => {
          bulk
            .find(condition)
            .updateOne(update);
        });
        await bulk.execute();
      }

      debugIdentity({ updates });
    });

    await Promise.all(promises);
  };
}
/**
 * Gives entities of the same batch that the identity collection reports as
 * related one shared ("lead") id.
 *
 * For each entity id, the ids of the entities returned by
 * `identityCollection.getEntities(entityId)` plus the entity's own id are
 * sorted with the default `Array.prototype.sort` order; the first one becomes
 * the lead id. The lead id is written to `entity.id` and `entity.meta.item.id`
 * of the entity itself and of every returned entity (objects are mutated in
 * place, so later iterations see the already rewritten ids).
 *
 * @param {Array} entityIds - EntityId objects exposing `entity`.
 * @returns {Array} the same `entityIds` array.
 */
function identifySameEntities(entityIds) {
  const identityCollection = IdentityCollection(entityIds.map((entityId) => entityId.entity));
  const debugLead = debugIdentity.child('lead');

  const assignId = (entity, id) => {
    entity.id = id;
    entity.meta.item.id = id;
  };

  entityIds.forEach((entityId) => {
    const relatedEntities = identityCollection.getEntities(entityId);

    const ids = relatedEntities.map((related) => related.id);
    ids.push(entityId.entity.id);
    ids.sort();
    const leadId = ids[0];

    debugLead({
      leadId,
      candidatesToBeLeadId: ids,
    });

    assignId(entityId.entity, leadId);
    relatedEntities.forEach((related) => {
      assignId(related, leadId);
    });
  });

  return entityIds;
}
/**
 * Factory for the end-to-end identification step.
 *
 * @param {Object} deps
 * @param {Object} deps.mongo - handed to the lookup, indexing and patching
 *   factories defined in this file.
 * @returns {function(Array): Promise<Array>} async `processIdentityItems`.
 *
 * The returned function:
 * 1. builds entity ids for the items and unifies ids inside the batch;
 * 2. loads stored candidates and asks an identity collection built from them
 *    for a match per entity id;
 * 3. entity ids without a match are indexed as new entries;
 * 4. entity ids with a match take over the candidate's id (on the entity and
 *    on the originating item) and a token patch is applied to the index;
 * 5. every item's record receives `recordData` and then `master_id` set to the
 *    item's (possibly rewritten) id. The same item objects are returned.
 */
function ProcessIdentityItems({ mongo }) {
  const findCandidatesByEntityIds = FindCandidatesByEntityIds({ mongo });
  const indexEntityIds = IndexEntityIds({ mongo });
  const updateIndex = UpdateIndex({ mongo });

  // Adopts the matched candidate's id and derives the index patch.
  const toIndexUpdate = ({ candidateEntity, entityId }) => {
    const { id } = candidateEntity;
    entityId.entity.id = id;
    entityId.entity.meta.item.id = id;
    debugIdentity({ identified: id });

    const candidateTokens = entityId.toTokens(candidateEntity);
    const entityTokens = entityId.toTokens(entityId.entity);
    const patch = buildPatch({
      candidateTokens,
      entityTokens,
    });
    debugIdentity({ patch });

    return {
      entityId,
      owner: entityId.entity.owner,
      id,
      patch,
    };
  };

  // Pushes the processed data and the resolved master id back to the record.
  const writeBack = (item) => {
    const { record, id, recordData } = item;
    record.updateValues(recordData);
    record.updateValues({ master_id: id });
    return item;
  };

  return async function processIdentityItems(items) {
    const entityIds = identifySameEntities(buildEntityIds(items));

    const candidates = await findCandidatesByEntityIds(entityIds);
    debugIdentity({ candidates });

    const identityCollection = IdentityCollection(candidates.map(IndexEntity));

    // Resolve every entity id first; ids are only rewritten afterwards.
    const resolved = entityIds.map((entityId) => ({
      entityId,
      candidateEntity: identityCollection.getEntity(entityId),
    }));

    const unmatched = [];
    const matched = [];
    resolved.forEach((resolution) => {
      if (resolution.candidateEntity) {
        matched.push(resolution);
      } else {
        unmatched.push(resolution);
      }
    });

    const entityIdsToIndex = unmatched.map((resolution) => resolution.entityId);
    const entityIdUpdates = matched.map((resolution) => toIndexUpdate(resolution));

    await indexEntityIds(entityIdsToIndex);
    await updateIndex(entityIdUpdates);

    return items.map((item) => writeBack(item));
  };
}
module.exports = { | |
ProcessIdentityItems, | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment