Skip to content

Instantly share code, notes, and snippets.

@itissid
Created November 24, 2010 02:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itissid/712976 to your computer and use it in GitHub Desktop.
Save itissid/712976 to your computer and use it in GitHub Desktop.
Sample code
// Send every document in this.docs to the inference server listening on
// `socketPath`, keeping at most MAX_OPEN_CONNECTIONS sockets open at once so the
// server is not drowned by too many connections formed too quickly.
// Documents that arrive while all slots are busy are queued in
// self.processingQueue and sent oldest-first as earlier connections close.
var MAX_OPEN_CONNECTIONS = 300;
var self = this;
for (var doc = 0; doc < this.docs.length; doc++) {
  (function testServer(data, docnum) {
    if (self.openConn < MAX_OPEN_CONNECTIONS) {
      var s = new net.Stream();
      // The server replies with one newline-terminated line per document; the
      // line may arrive split over several 'data' chunks, so accumulate it.
      var response = '';
      var received = false;
      s.setEncoding('utf8');
      s.on('connect', function () {
        console.log('***Sending data for document:' + docnum);
        console.log(data.length);
        // Pass the encoding positionally (the old `encoding='utf8'` leaked a global).
        s.write(data + '\n', 'utf8');
      });
      // Receive the inference results.
      s.on('data', function (chunk) {
        if (received) {
          return;
        }
        response += chunk;
        if (response.indexOf('\n') !== -1) {
          received = true;
          console.log('*******DATA RECIEVED FOR:' + docnum);
          console.log(response);
          s.end();
        }
      });
      // Without an 'error' listener Node throws on socket errors; 'close'
      // still fires afterwards, so the slot bookkeeping below stays correct.
      s.on('error', function (err) {
        console.log('Socket error for document ' + docnum + ': ' + err.message);
      });
      s.on('close', function (had_error) {
        self.openConn--;
        // A slot is free: take the oldest queued document and connect for it.
        if (self.openConn < MAX_OPEN_CONNECTIONS && self.processingQueue.length > 0) {
          var arr_t = self.processingQueue.shift();
          console.log('Processing queue doc ID:', arr_t[1]);
          console.log('#open connections:' + self.openConn);
          testServer(arr_t[0], arr_t[1]);
        }
      });
      self.openConn++;
      s.connect(socketPath);
    } else {
      console.log('Pushing docnum: ' + docnum + ' to queue');
      self.processingQueue.push([data, docnum]);
    }
  })(this.docs[doc], doc);
}
/*
* server.cpp
*
* Created on: Nov 15, 2010
* Author: sid
*/
#include <fcntl.h>
#include <assert.h>
#include <string>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <iostream>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <event2/event.h>
#include <event2/util.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
//
#include <fstream>
#include <map>
#include <set>
#include <sstream>
#include <vector>
#include "common.h"
#include "document.h"
#include "model.h"
#include "sampler.h"
#include "cmd_flags.h"
/* This will become the base class that accepts
 * connections and acts as the server.
 * */
using namespace std;
using learning_lda::LDACorpus;
using learning_lda::LDAModel;
using learning_lda::LDAAccumulativeModel;
using learning_lda::LDASampler;
using learning_lda::LDADocument;
using learning_lda::LDACmdLineFlags;
using learning_lda::DocumentWordTopicsPB;
using learning_lda::RandInt;
// Filesystem path of the UNIX domain socket the server listens on.
// Stored as a char array: binding a string literal to a plain `char*` is
// deprecated in C++03 and ill-formed since C++11. The array still decays to
// `char*` wherever the old pointer was used.
static char _SOCKNAME[] = "/tmp/ts1";
// Bundles the model, its word -> index vocabulary and the parsed command-line
// flags so the static libevent callbacks can reach them through one pointer.
// Only references are held: the referenced objects must outlive this instance.
class LDAModelCommon{
public:
    LDAModel& model;                              // model the sampler infers against
    std::map<std::string, int>& word_index_map;   // vocabulary: word -> index
    LDACmdLineFlags& flags;                       // alpha/beta and iteration settings

    LDAModelCommon(LDAModel& model,
                   std::map<std::string, int>& word_index_map,
                   LDACmdLineFlags& flags)
        : model(model),
          word_index_map(word_index_map),
          flags(flags)
    {
    }
};
class MainListener{
public:
static LDAModelCommon* commonData;
MainListener(){
/*TODO: Check if file exists*/
struct sockaddr_un name;
int sock;
printf("Running Topic model service on UNIX DOMAIN SOCKET: %s\n",_SOCKNAME);
char* filename = _SOCKNAME;
unlink(filename);
/* Create the STREAM socket. */
sock = socket (AF_UNIX, SOCK_STREAM, 0);
if (sock < 0){
perror ("socket file does not exist");
exit (EXIT_FAILURE);
}
/* Bind a name to the socket. */
name.sun_family = AF_UNIX;
strcpy(name.sun_path, filename);
int size = strlen(name.sun_path)
+ sizeof(name.sun_family) ;
//Bind now.. bind!
if (bind (sock, (struct sockaddr *)&name, size) < 0){
perror ("Unable to bind socket at this time");
exit (EXIT_FAILURE);
}
if (listen(sock, 16)<0) {
perror("listen");
return;
}
evutil_make_socket_nonblocking(sock);
#ifndef WIN32
{
int one = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif
//Set up libevent
struct event *listener_event;
struct event_base* base;
base = event_base_new();
listener_event = event_new(base, sock, EV_READ|EV_PERSIST, MainListener::handleIncoming, (void*)base);
event_add(listener_event, NULL);
event_base_dispatch(base);
}
/*This will listen to incoming FD's and
* then read what is on. this is a test fnction for now...
* Need to make the recieved FD's work with libevent as well..
* */
void static handleIncoming(evutil_socket_t listener, short event, void *arg){
//recieve data from node.js
struct event_base *base = (struct event_base*)arg;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0) {
perror("accept");
} else if (fd > FD_SETSIZE) {
close(fd);
} else {
struct bufferevent *bev;
evutil_make_socket_nonblocking(fd);
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, MainListener::readDocumentAndInfer, NULL, MainListener::errorcb, NULL);
//bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); Not now...
bufferevent_enable(bev, EV_READ|EV_WRITE);
}
}
//Event handler for reading a document and inferring from the model
void static readDocumentAndInfer(struct bufferevent *bev, void *ctx){
struct evbuffer *input, *output;
char* line;
size_t n;
//Not thread safe...
input = bufferevent_get_input(bev);
output = bufferevent_get_output(bev);
//Create temp references.
map<string, int>& word_index_map = MainListener::commonData->word_index_map;
LDAModel& model = MainListener::commonData->model;
LDACmdLineFlags& flags = MainListener::commonData->flags;
LDASampler sampler(flags.alpha_, flags.beta_, &model, NULL);
//Read a line representing a document run inference
//1) Topic distribution
//2) K Representative words from top 2 topics.
while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
using std::istringstream;
//std::cout<<"Received Document string of len "<<strlen(line)<<std::endl;//<<line;
istringstream ss((string(line)));
//Event/thread specifc datastructure
DocumentWordTopicsPB document_topics;
string word;
int count;
while (ss >> word >> count) {
// Load and init a document.
vector<int32> topics;
for (int i = 0; i < count; ++i) {
topics.push_back(RandInt(model.num_topics()));
}
map<string, int>::const_iterator iter = word_index_map.find(word);
if (iter != word_index_map.end()) {
//Mapping of words to there indices and each occurance to random topics.
document_topics.add_wordtopics(word, iter->second, topics);
}
}
LDADocument document(document_topics, model.num_topics());
TopicProbDistribution prob_dist(model.num_topics(), 0);
//Sampling is read only operations. So I dont need to worry about concurrency...
for (int iter = 0; iter < flags.total_iterations_; ++iter) {
sampler.SampleNewTopicsForDocument(&document, false);
if (iter >= flags.burn_in_iterations_) {
const vector<int64>& document_distribution =
document.topic_distribution();
for (int i = 0; i < document_distribution.size(); ++i) {
prob_dist[i] += document_distribution[i];
}
}
}
//50 most probable words... from 2 topics..
//Example implementtaion.
/*std::string mostprobTopK =
sampler.GenerateTopKTopicWords( prob_dist, word_index_map, 50).str();
*/
std::ostringstream s;
string topicdist("");
//Topic distribution line.
for (int topic = 0; topic < prob_dist.size(); ++topic) {
s << (prob_dist[topic] /
(flags.total_iterations_ - flags.burn_in_iterations_))<<
((topic < prob_dist.size() - 1) ? " " : " ");
}
std::string topic_dist_op = s.str();
topic_dist_op = topic_dist_op ;////+":::"+ mostprobTopK;
const char* finalop = topic_dist_op.c_str();
//cout<<finalop<<endl;
int ret = evbuffer_add(output, finalop, topic_dist_op.size());
ret = evbuffer_add(output, "\n", 1);
//cout<<"RET"<<ret<<endl;
}
//
}
//Error call back
void static errorcb(struct bufferevent *bev, short error, void *ctx){
if (error & BEV_EVENT_EOF) {
/* connection has been closed, do any clean up here */
/* ... */
//printf("%d",EVUTIL_SOCKET_ERROR());
std::cout<<"Connection is closed"<<std::endl;
} else if (error & BEV_EVENT_ERROR) {
/* check errno to see what error occurred */
/* ... */
EVUTIL_SOCKET_ERROR();
std::cout<<"Error:"<<error<<std::endl;
} else if (error & BEV_EVENT_TIMEOUT) {
/* must be a timeout event handle, handle it */
/* ... */
std::cout<<"Time out"<<std::endl;
}
bufferevent_free(bev);//freeing the buffered event
}
};
// Out-of-class definition of MainListener's shared model pointer.
// It starts as a null pointer; main() points it at the loaded model data
// before starting the server, and the libevent callbacks read through it.
LDAModelCommon* MainListener::commonData = 0;
int main(int argc, char** argv){
//create the model...
//CReate an event base and drop into do accept
using std::ifstream;
using std::ofstream;
srand(time(NULL));
LDACmdLineFlags flags;
flags.ParseCmdFlags(argc, argv);
if (!flags.CheckInferringValidity()) {
return -1;
}
srand(time(NULL));
map<string, int> word_index_map;
ifstream model_fin(flags.model_file_.c_str());
LDAModel model(model_fin, &word_index_map);
LDAModelCommon* l_ModelCommon = new LDAModelCommon(model,word_index_map, flags );
MainListener::commonData = l_ModelCommon;
//Now start the server
new MainListener();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment