Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save meddulla/3770662 to your computer and use it in GitHub Desktop.
Save meddulla/3770662 to your computer and use it in GitHub Desktop.
Tiny recommendation engine using Map/Reduce, Kaffeine, NodeJS and Redis
sys = require "sys";
http = require "http";
url = require "url";
connect = require "connect";
redis = require "redis";
hashRing = require "hash_ring";
// Sharding: resolves keys to nodes through a hash ring built from `servers`.
//
// Every ring entry is one string of the form `host http_port redis_port`.
// The value stored next to each entry is handed to HashRing unchanged
// (presumably a weight - TODO confirm against the hash_ring docs).
//
// Exposes getRedisNodeForKey, getHTTPNodeForKey, getLocalRedisNode and
// splitToShards.
Sharding = {
  servers = { "127.0.0.1 10000 6378": 1 };
  ring = new hashRing.HashRing servers;

  // Split a ring entry into host, http_port and redis_port.
  // Ports are parsed as base 10 integers.
  function getComponentsForEntry(entry) {
    entry_ = entry.split " ";
    return {
      host: entry_[0],
      http_port: parseInt(entry_[1], 10),
      redis_port: parseInt(entry_[2], 10)
    };
  }

  // Look the key up on the ring and return the parsed entry.
  function getNodeForKey(key) {
    getComponentsForEntry ring.getNode key;
  }

  // Returns { host, port } of the redis instance responsible for key.
  function getRedisNodeForKey(key) {
    entry = getNodeForKey key;
    return {
      host: entry.host,
      port: entry.redis_port
    };
  }

  // Returns { host, port } of the HTTP endpoint responsible for key.
  function getHTTPNodeForKey(key) {
    entry = getNodeForKey key;
    return {
      host: entry.host,
      port: entry.http_port
    };
  }

  // Returns the redis node for a single key or for a list of keys.
  // Only the first key of a list is consulted; presumably callers pass keys
  // that already share one shard (see splitToShards) - verify against callers.
  // Throws an Error when given an empty list.
  function getLocalRedisNode(keys) {
    // typeof never yields the string Array, so the previous typeof test
    // wrapped real arrays a second time; Array.isArray is the reliable check.
    Array.isArray(keys) || (keys = [keys]);
    if keys.length <= 0, throw new Error("keys.length <= 0");
    getRedisNodeForKey keys[0];
  }

  // Group items by the ring entry they hash to.
  // Returns an array of arrays, one inner array per distinct entry.
  function splitToShards(items) {
    shards = {};
    items.forEach((item) {
      entry = ring.getNode item;
      shards[entry] ||= [];
      shards[entry].push(item);
    });
    shards_ = [];
    for entry in shards {
      shards_.push shards[entry];
      delete shards[entry];
    }
    shards_;
  }

  return {
    "getRedisNodeForKey": getRedisNodeForKey,
    "getHTTPNodeForKey": getHTTPNodeForKey,
    "getLocalRedisNode": getLocalRedisNode,
    "splitToShards": splitToShards
  };
}();
// RedisPool: keeps one redis client per host:port pair and reuses it on
// every later lookup for the same node.
//
// Exposes getRedisClientForKey and getLocalRedisClient.
RedisPool = {
  clients = { };

  // Return the client cached for node, creating and caching it on first use.
  function clientForNode(node) {
    cacheKey = "#{node.host}:#{node.port}";
    clients[cacheKey] ||= redis.createClient(node.port, node.host);
    return clients[cacheKey];
  }

  // Client for the redis node that Sharding assigns to a single key.
  function getRedisClientForKey(key) {
    return clientForNode(Sharding.getRedisNodeForKey(key));
  }

  // Client for the redis node that Sharding.getLocalRedisNode picks for keys.
  function getLocalRedisClient(keys) {
    return clientForNode(Sharding.getLocalRedisNode(keys));
  }

  return {
    "getRedisClientForKey": getRedisClientForKey,
    "getLocalRedisClient": getLocalRedisClient
  };
}();
// Recommendation: the map and reduce steps for one user.
//   userId          - the user the recommendations are computed for
//   relatedUsersIds - ids already related to userId; they are never counted
// Returns { getIRelatedUsersIds, reduce }.
function Recommendation(userId, relatedUsersIds) {
  iRelatedAccumulator = { };
  // Ids must be seen more than `cutoff` times to be recommended.
  cutoff = 1;
  // Maximum number of ids passed to the reduce callback.
  limit = 50;

  // Reduce step: turn an id-to-count map into a ranked list of ids.
  // Keeps ids whose count is above cutoff, orders them by count (highest
  // first) and passes at most `limit` ids to cb.
  function reduce(iRelatedAccumulator, cb) {
    contenders = [];
    for iRelatedUserId in iRelatedAccumulator {
      count = iRelatedAccumulator[iRelatedUserId];
      if count <= cutoff, continue;
      contenders.push [iRelatedUserId, count];
    }
    contenders.sort (a, b) { b[1] - a[1] };
    // slice excludes its end index, so `limit` (not limit - 1) yields
    // exactly `limit` entries.
    contenders = contenders.slice(0, limit).map((a) { a[0] });
    cb contenders;
  }

  // Count every id found in the rows of sres that is not already in
  // relatedUsersIds, drop userId itself, and pass the counts to cb.
  function countIRelatedUsersIds(sres, cb) {
    sres.forEach((row) {
      row.forEach((iRelatedUserId) {
        if relatedUsersIds.indexOf(iRelatedUserId) === -1 {
          iRelatedAccumulator[iRelatedUserId] ||= 0;
          iRelatedAccumulator[iRelatedUserId]++;
        }
      });
    });
    delete iRelatedAccumulator[userId];
    cb iRelatedAccumulator;
  }

  // Map step: queue one sort command per related id on a single redis
  // multi, then count the ids in the replies. cb receives the id-to-count
  // map. With no related ids redis is not contacted at all.
  function getIRelatedUsersIds(cb) {
    if relatedUsersIds.length <= 0 {
      countIRelatedUsersIds [], cb;
      return;
    }
    redisClient = RedisPool.getLocalRedisClient relatedUsersIds;
    multi = redisClient.multi();
    relatedUsersIds.forEach((relatedUserId) {
      multi.sort relatedUserId;
    });
    multi.exec((err, sres) {
      if err {
        // Log the failure and still answer the caller with an empty count
        // map, so the request that triggered this map step can complete.
        console.error err;
        countIRelatedUsersIds [], cb;
        return;
      }
      countIRelatedUsersIds sres, cb;
    });
  }

  return {
    "getIRelatedUsersIds": getIRelatedUsersIds,
    "reduce": reduce
  };
}
// MapClient: sends one shard of related ids to the map route of the HTTP
// node that owns userId and hands the raw response body to cb.
//   userId          - selects the HTTP node and is placed in the request path
//   relatedUsersIds - the shard, sent as a JSON request body
//   cb              - called once with the response body as a string; when
//                     the request itself fails it receives an empty JSON
//                     object literal instead
function MapClient(userId, relatedUsersIds, cb) {
  data = "";
  delivered = 0;

  // Hand a body to cb at most once, whichever of end or error comes first.
  function finish(body) {
    if delivered > 0, return;
    delivered++;
    cb body;
  }

  node = Sharding.getHTTPNodeForKey userId;
  req = http.request({
    host: node.host,
    port: node.port,
    path: "/related/map/#{encodeURIComponent userId}.json",
    method: "GET"
  }, (cres) {
    // Decode as UTF-8 so a character split across two chunks is not garbled
    // by the string concatenation below.
    cres.setEncoding "utf8";
    cres.on "data", (chunk) { data += chunk };
    cres.on "end", { finish data };
  });
  // Without an error listener a failed connection is thrown as an uncaught
  // exception; log it and report an empty result for this shard instead.
  req.on("error", (err) {
    console.error err;
    finish "{}";
  });
  j = JSON.stringify relatedUsersIds;
  // Content-Length counts bytes, and string length counts UTF-16 units, so
  // the two differ as soon as an id contains a non-ASCII character.
  req.setHeader("Content-Length", Buffer.byteLength(j));
  req.write j;
  req.end();
}
// Aggregation: fans the map step out over every shard that holds ids related
// to userId, sums the per-shard counts and reduces them to recommendations.
// Returns { handleAggRelated }.
function Aggregation(userId) {
  // Run one MapClient per shard of relatedUsersIds, merge the replies and
  // pass the reduced list of ids to cb.
  function handleAggRelated_(userId, relatedUsersIds, cb) {
    requestCount = 0;
    resultCount = 0;
    iAggRelatedAccumulator = { };

    // Every shard has answered: reduce the merged counts.
    function onAggAllResults() {
      recommendation = new Recommendation userId, relatedUsersIds;
      recommendation.reduce iAggRelatedAccumulator, cb;
    }

    // Merge the reply of one shard into the running totals.
    function onAggOneResult(data) {
      var iRelatedAccumulator;
      try {
        iRelatedAccumulator = JSON.parse data;
      } catch e {
        console.log "#{e}: [#{data}]";
        // An unreadable reply still counts as answered; otherwise the
        // counters never meet and cb is never called.
        iRelatedAccumulator = { };
      }
      for iRelatedUserId in iRelatedAccumulator {
        iAggRelatedAccumulator[iRelatedUserId] ||= 0;
        iAggRelatedAccumulator[iRelatedUserId] +=
          iRelatedAccumulator[iRelatedUserId];
      }
      resultCount++;
      resultCount >= requestCount && onAggAllResults();
    }

    shards = Sharding.splitToShards relatedUsersIds;
    if shards.length <= 0 {
      // Nothing to ask the shards for: reduce the empty totals right away.
      onAggAllResults();
      return;
    }
    // Fix the expected reply count before the first request goes out.
    requestCount = shards.length;
    shards.forEach((shard) {
      new MapClient userId, shard, onAggOneResult;
    });
  }

  // Fetch the ids stored under userId with a redis sort command.
  // cb receives (err, ids) straight from the redis client.
  function getRelatedUsersIdsTo(userId, cb) {
    redisClient = RedisPool.getRedisClientForKey userId;
    redisClient.sort userId, cb;
  }

  // Entry point: compute the recommendations for userId and pass them to cb.
  function handleAggRelated(cb) {
    getRelatedUsersIdsTo(userId, (err, relatedUsersIds) {
      // A failed lookup is logged and treated as no related ids, so the
      // caller still gets an answer (an empty list).
      err && console.error(err);
      handleAggRelated_(userId, relatedUsersIds || [], cb);
    });
  }

  return {
    "handleAggRelated": handleAggRelated
  };
}
// ClientContext: bundles one HTTP exchange so handlers can pass the request
// and the response around as a single value.
//   req - the incoming request object
//   res - the response object paired with req
// Returns a plain object with the fields req and res. Because the object is
// returned explicitly, calling with or without `new` gives the same result.
function ClientContext(req, res) {
  var context = { };
  context.req = req;
  context.res = res;
  return context;
}
// Server: registers the two HTTP routes and starts listening on `port`.
//   /related/map/:userId - map step for one shard; the request body is a
//                          JSON list of related ids
//   /related/:userId     - full aggregation for one user
function Server(port) {
  // Map step: count the ids for this shard and send the counts back as JSON.
  function handleRelated(clientContext, userId, relatedUsersIds) {
    recommendation = new Recommendation userId, relatedUsersIds;
    // Bang call: the statements after it run once getIRelatedUsersIds passes
    // its result to the callback (Kaffeine async form - TODO confirm against
    // the Kaffeine docs).
    iRelatedAccumulator = recommendation.getIRelatedUsersIds!();
    // application/json is the registered media type; a bare json is not a
    // valid Content-Type value.
    clientContext.res.writeHead 200, {"Content-Type": "application/json"};
    clientContext.res.end(JSON.stringify iRelatedAccumulator);
  }

  // Route handler for the map step: collect the request body, parse it as
  // JSON and hand it to handleRelated. An unparsable body gets a 400 reply.
  function routeForRelated(req, res) {
    clientContext = new ClientContext req, res;
    data = "";
    req.addListener "data", (chunk) { data += chunk };
    // Bang call: everything below waits for the end event of the request.
    req.addListener! "end";
    var relatedUsersIds;
    try {
      relatedUsersIds = JSON.parse data;
    } catch e {
      console.log data;
      clientContext.res.writeHead 400, {"Content-Type": "text/plain"};
      clientContext.res.end "Bogus relatedsUsersIds: [#{data}]";
      return;
    }
    handleRelated clientContext, clientContext.req.params.userId,
      relatedUsersIds;
  }

  // Route handler for the aggregation: run it for the userId taken from the
  // route and send the resulting list back as JSON.
  function routeForAggRelated(req, res) {
    clientContext = new ClientContext req, res;
    aggregation = new Aggregation clientContext.req.params.userId;
    // Bang call: the reply below is written once handleAggRelated calls back.
    contenders = aggregation.handleAggRelated!();
    clientContext.res.writeHead 200, {"Content-Type": "application/json"};
    clientContext.res.end(JSON.stringify contenders);
  }

  // Register both routes on the connect router.
  function routesForRoot(app) {
    app.get "/related/map/:userId.:format?", routeForRelated;
    app.get "/related/:userId.:format?", routeForAggRelated;
  }

  server = connect.createServer();
  server.use "/", connect.router routesForRoot;
  server.listen port;
}
// Start the service on port 10000, the same port the Sharding ring entry
// lists as its HTTP port.
new Server 10000;
sys = require "sys";
http = require "http";
url = require "url";
connect = require "connect";
redis = require "redis";
hashRing = require "hash_ring";
// Sharding: resolves keys to nodes through a hash ring built from `servers`.
//
// Every ring entry is one string of the form `host http_port redis_port`.
// The value stored next to each entry is handed to HashRing unchanged
// (presumably a weight - TODO confirm against the hash_ring docs).
//
// Exposes getRedisNodeForKey, getHTTPNodeForKey, getLocalRedisNode and
// splitToShards.
Sharding = {
  servers = { "127.0.0.1 10000 6378": 1 };
  ring = new hashRing.HashRing servers;

  // Split a ring entry into host, http_port and redis_port.
  // Ports are parsed as base 10 integers.
  function getComponentsForEntry(entry) {
    entry_ = entry.split " ";
    return {
      host: entry_[0],
      http_port: parseInt(entry_[1], 10),
      redis_port: parseInt(entry_[2], 10)
    };
  }

  // Look the key up on the ring and return the parsed entry.
  function getNodeForKey(key) {
    getComponentsForEntry ring.getNode key;
  }

  // Returns { host, port } of the redis instance responsible for key.
  function getRedisNodeForKey(key) {
    entry = getNodeForKey key;
    return {
      host: entry.host,
      port: entry.redis_port
    };
  }

  // Returns { host, port } of the HTTP endpoint responsible for key.
  function getHTTPNodeForKey(key) {
    entry = getNodeForKey key;
    return {
      host: entry.host,
      port: entry.http_port
    };
  }

  // Returns the redis node for a single key or for a list of keys.
  // Only the first key of a list is consulted; presumably callers pass keys
  // that already share one shard (see splitToShards) - verify against callers.
  // Throws an Error when given an empty list.
  function getLocalRedisNode(keys) {
    // typeof never yields the string Array, so the previous typeof test
    // wrapped real arrays a second time; Array.isArray is the reliable check.
    Array.isArray(keys) || (keys = [keys]);
    if keys.length <= 0, throw new Error("keys.length <= 0");
    getRedisNodeForKey keys[0];
  }

  // Group items by the ring entry they hash to.
  // Returns an array of arrays, one inner array per distinct entry.
  function splitToShards(items) {
    shards = {};
    items.forEach((item) {
      entry = ring.getNode item;
      shards[entry] ||= [];
      shards[entry].push(item);
    });
    shards_ = [];
    for entry in shards {
      shards_.push shards[entry];
      delete shards[entry];
    }
    shards_;
  }

  return {
    "getRedisNodeForKey": getRedisNodeForKey,
    "getHTTPNodeForKey": getHTTPNodeForKey,
    "getLocalRedisNode": getLocalRedisNode,
    "splitToShards": splitToShards
  };
}();
// RedisPool: keeps one redis client per host:port pair and reuses it on
// every later lookup for the same node.
//
// Exposes getRedisClientForKey and getLocalRedisClient.
RedisPool = {
  clients = { };

  // Return the client cached for node, creating and caching it on first use.
  function clientForNode(node) {
    cacheKey = "#{node.host}:#{node.port}";
    clients[cacheKey] ||= redis.createClient(node.port, node.host);
    return clients[cacheKey];
  }

  // Client for the redis node that Sharding assigns to a single key.
  function getRedisClientForKey(key) {
    return clientForNode(Sharding.getRedisNodeForKey(key));
  }

  // Client for the redis node that Sharding.getLocalRedisNode picks for keys.
  function getLocalRedisClient(keys) {
    return clientForNode(Sharding.getLocalRedisNode(keys));
  }

  return {
    "getRedisClientForKey": getRedisClientForKey,
    "getLocalRedisClient": getLocalRedisClient
  };
}();
// Recommendation: the map and reduce steps for one user.
//   userId          - the user the recommendations are computed for
//   relatedUsersIds - ids already related to userId; they are never counted
// Returns { getIRelatedUsersIds, reduce }.
function Recommendation(userId, relatedUsersIds) {
  iRelatedAccumulator = { };
  // Ids must be seen more than `cutoff` times to be recommended.
  cutoff = 1;
  // Maximum number of ids passed to the reduce callback.
  limit = 50;

  // Reduce step: turn an id-to-count map into a ranked list of ids.
  // Keeps ids whose count is above cutoff, orders them by count (highest
  // first) and passes at most `limit` ids to cb.
  function reduce(iRelatedAccumulator, cb) {
    contenders = [];
    for iRelatedUserId in iRelatedAccumulator {
      count = iRelatedAccumulator[iRelatedUserId];
      if count <= cutoff, continue;
      contenders.push [iRelatedUserId, count];
    }
    contenders.sort (a, b) { b[1] - a[1] };
    // slice excludes its end index, so `limit` (not limit - 1) yields
    // exactly `limit` entries.
    contenders = contenders.slice(0, limit).map((a) { a[0] });
    cb contenders;
  }

  // Count every id found in the rows of sres that is not already in
  // relatedUsersIds, drop userId itself, and pass the counts to cb.
  function countIRelatedUsersIds(sres, cb) {
    sres.forEach((row) {
      row.forEach((iRelatedUserId) {
        if relatedUsersIds.indexOf(iRelatedUserId) === -1 {
          iRelatedAccumulator[iRelatedUserId] ||= 0;
          iRelatedAccumulator[iRelatedUserId]++;
        }
      });
    });
    delete iRelatedAccumulator[userId];
    cb iRelatedAccumulator;
  }

  // Map step: queue one sort command per related id on a single redis
  // multi, then count the ids in the replies. cb receives the id-to-count
  // map. With no related ids redis is not contacted at all.
  function getIRelatedUsersIds(cb) {
    if relatedUsersIds.length <= 0 {
      countIRelatedUsersIds [], cb;
      return;
    }
    redisClient = RedisPool.getLocalRedisClient relatedUsersIds;
    multi = redisClient.multi();
    relatedUsersIds.forEach((relatedUserId) {
      multi.sort relatedUserId;
    });
    multi.exec((err, sres) {
      if err {
        // Log the failure and still answer the caller with an empty count
        // map, so the request that triggered this map step can complete.
        console.error err;
        countIRelatedUsersIds [], cb;
        return;
      }
      countIRelatedUsersIds sres, cb;
    });
  }

  return {
    "getIRelatedUsersIds": getIRelatedUsersIds,
    "reduce": reduce
  };
}
// MapClient: sends one shard of related ids to the map route of the HTTP
// node that owns userId and hands the raw response body to cb.
//   userId          - selects the HTTP node and is placed in the request path
//   relatedUsersIds - the shard, sent as a JSON request body
//   cb              - called once with the response body as a string; when
//                     the request itself fails it receives an empty JSON
//                     object literal instead
function MapClient(userId, relatedUsersIds, cb) {
  data = "";
  delivered = 0;

  // Hand a body to cb at most once, whichever of end or error comes first.
  function finish(body) {
    if delivered > 0, return;
    delivered++;
    cb body;
  }

  node = Sharding.getHTTPNodeForKey userId;
  req = http.request({
    host: node.host,
    port: node.port,
    path: "/related/map/#{encodeURIComponent userId}.json",
    method: "GET"
  }, (cres) {
    // Decode as UTF-8 so a character split across two chunks is not garbled
    // by the string concatenation below.
    cres.setEncoding "utf8";
    cres.on "data", (chunk) { data += chunk };
    cres.on "end", { finish data };
  });
  // Without an error listener a failed connection is thrown as an uncaught
  // exception; log it and report an empty result for this shard instead.
  req.on("error", (err) {
    console.error err;
    finish "{}";
  });
  j = JSON.stringify relatedUsersIds;
  // Content-Length counts bytes, and string length counts UTF-16 units, so
  // the two differ as soon as an id contains a non-ASCII character.
  req.setHeader("Content-Length", Buffer.byteLength(j));
  req.write j;
  req.end();
}
// Aggregation: fans the map step out over every shard that holds ids related
// to userId, sums the per-shard counts and reduces them to recommendations.
// Returns { handleAggRelated }.
function Aggregation(userId) {
  // Run one MapClient per shard of relatedUsersIds, merge the replies and
  // pass the reduced list of ids to cb.
  function handleAggRelated_(userId, relatedUsersIds, cb) {
    requestCount = 0;
    resultCount = 0;
    iAggRelatedAccumulator = { };

    // Every shard has answered: reduce the merged counts.
    function onAggAllResults() {
      recommendation = new Recommendation userId, relatedUsersIds;
      recommendation.reduce iAggRelatedAccumulator, cb;
    }

    // Merge the reply of one shard into the running totals.
    function onAggOneResult(data) {
      var iRelatedAccumulator;
      try {
        iRelatedAccumulator = JSON.parse data;
      } catch e {
        console.log "#{e}: [#{data}]";
        // An unreadable reply still counts as answered; otherwise the
        // counters never meet and cb is never called.
        iRelatedAccumulator = { };
      }
      for iRelatedUserId in iRelatedAccumulator {
        iAggRelatedAccumulator[iRelatedUserId] ||= 0;
        iAggRelatedAccumulator[iRelatedUserId] +=
          iRelatedAccumulator[iRelatedUserId];
      }
      resultCount++;
      resultCount >= requestCount && onAggAllResults();
    }

    shards = Sharding.splitToShards relatedUsersIds;
    if shards.length <= 0 {
      // Nothing to ask the shards for: reduce the empty totals right away.
      onAggAllResults();
      return;
    }
    // Fix the expected reply count before the first request goes out.
    requestCount = shards.length;
    shards.forEach((shard) {
      new MapClient userId, shard, onAggOneResult;
    });
  }

  // Fetch the ids stored under userId with a redis sort command.
  // cb receives (err, ids) straight from the redis client.
  function getRelatedUsersIdsTo(userId, cb) {
    redisClient = RedisPool.getRedisClientForKey userId;
    redisClient.sort userId, cb;
  }

  // Entry point: compute the recommendations for userId and pass them to cb.
  function handleAggRelated(cb) {
    getRelatedUsersIdsTo(userId, (err, relatedUsersIds) {
      // A failed lookup is logged and treated as no related ids, so the
      // caller still gets an answer (an empty list).
      err && console.error(err);
      handleAggRelated_(userId, relatedUsersIds || [], cb);
    });
  }

  return {
    "handleAggRelated": handleAggRelated
  };
}
// ClientContext: bundles one HTTP exchange so handlers can pass the request
// and the response around as a single value.
//   req - the incoming request object
//   res - the response object paired with req
// Returns a plain object with the fields req and res. Because the object is
// returned explicitly, calling with or without `new` gives the same result.
function ClientContext(req, res) {
  var context = { };
  context.req = req;
  context.res = res;
  return context;
}
// Server: registers the two HTTP routes and starts listening on `port`.
//   /related/map/:userId - map step for one shard; the request body is a
//                          JSON list of related ids
//   /related/:userId     - full aggregation for one user
function Server(port) {
  // Map step: count the ids for this shard and send the counts back as JSON.
  function handleRelated(clientContext, userId, relatedUsersIds) {
    recommendation = new Recommendation userId, relatedUsersIds;
    // Bang call: the statements after it run once getIRelatedUsersIds passes
    // its result to the callback (Kaffeine async form - TODO confirm against
    // the Kaffeine docs).
    iRelatedAccumulator = recommendation.getIRelatedUsersIds!();
    // application/json is the registered media type; a bare json is not a
    // valid Content-Type value.
    clientContext.res.writeHead 200, {"Content-Type": "application/json"};
    clientContext.res.end(JSON.stringify iRelatedAccumulator);
  }

  // Route handler for the map step: collect the request body, parse it as
  // JSON and hand it to handleRelated. An unparsable body gets a 400 reply.
  function routeForRelated(req, res) {
    clientContext = new ClientContext req, res;
    data = "";
    req.addListener "data", (chunk) { data += chunk };
    // Bang call: everything below waits for the end event of the request.
    req.addListener! "end";
    var relatedUsersIds;
    try {
      relatedUsersIds = JSON.parse data;
    } catch e {
      console.log data;
      clientContext.res.writeHead 400, {"Content-Type": "text/plain"};
      clientContext.res.end "Bogus relatedsUsersIds: [#{data}]";
      return;
    }
    handleRelated clientContext, clientContext.req.params.userId,
      relatedUsersIds;
  }

  // Route handler for the aggregation: run it for the userId taken from the
  // route and send the resulting list back as JSON.
  function routeForAggRelated(req, res) {
    clientContext = new ClientContext req, res;
    aggregation = new Aggregation clientContext.req.params.userId;
    // Bang call: the reply below is written once handleAggRelated calls back.
    contenders = aggregation.handleAggRelated!();
    clientContext.res.writeHead 200, {"Content-Type": "application/json"};
    clientContext.res.end(JSON.stringify contenders);
  }

  // Register both routes on the connect router.
  function routesForRoot(app) {
    app.get "/related/map/:userId.:format?", routeForRelated;
    app.get "/related/:userId.:format?", routeForAggRelated;
  }

  server = connect.createServer();
  server.use "/", connect.router routesForRoot;
  server.listen port;
}
// Start the service on port 10000, the same port the Sharding ring entry
// lists as its HTTP port.
// NOTE(review): this listing appears twice in the file; if both copies run
// in one process the second listen on port 10000 would presumably fail
// because the port is already in use - confirm whether the copy is intended.
new Server 10000;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment