Created
November 24, 2010 02:08
-
-
Save itissid/712976 to your computer and use it in GitHub Desktop.
Sample code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Send every document in this.docs to the inference server listening on the
// UNIX socket `socketPath`, keeping at most MAX_OPEN_CONNECTIONS sockets open.
// A document that arrives while every slot is busy is parked on
// this.processingQueue and is sent when an earlier connection closes.
// NOTE(review): relies on `net`, `socketPath`, `this.docs`, `this.openConn`
// and `this.processingQueue` being provided by the enclosing code.
var self = this;
// Too many simultaneous connections drown the server, so cap them.
var MAX_OPEN_CONNECTIONS = 300;
for (var doc = 0; doc < this.docs.length; doc++) {
  (function testServer(data, docnum) {
    if (self.openConn >= MAX_OPEN_CONNECTIONS) {
      console.log('Pushing docnum: ' + docnum + ' to queue');
      self.processingQueue.push([data, docnum]);
      return;
    }
    var s = new net.Stream();
    var response = '';
    var answered = false;
    s.setEncoding('utf8');
    s.on('connect', function () {
      console.log('***Sending data for document:' + docnum);
      console.log(data.length);
      // The server reads one newline-terminated document per request.
      s.write(data + '\n', 'utf8');
    });
    // Receive the inference result. A stream may deliver it in several
    // chunks, so buffer until the server's terminating newline arrives
    // before logging it and closing the connection.
    s.on('data', function (chunk) {
      if (answered) {
        return;
      }
      response += chunk;
      if (response.indexOf('\n') !== -1) {
        answered = true;
        console.log('*******DATA RECIEVED FOR:' + docnum);
        console.log(response);
        s.end();
      }
    });
    // Without an 'error' listener a failed connection throws and kills the
    // process; 'close' still follows, so the slot bookkeeping stays correct.
    s.on('error', function (err) {
      console.log('Connection error for document ' + docnum + ': ' + err);
    });
    s.on('close', function () {
      self.openConn--;
      // A slot has been freed: start the next queued document, if any.
      if (self.openConn < MAX_OPEN_CONNECTIONS && self.processingQueue.length > 0) {
        // pop() takes the most recently queued document (LIFO order).
        var next = self.processingQueue.pop();
        console.log('Processing queue doc ID:', next[1]);
        console.log('#open connections:' + self.openConn);
        testServer(next[0], next[1]);
      }
    });
    self.openConn++;
    s.connect(socketPath);
  })(this.docs[doc], doc);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* server.cpp | |
* | |
* Created on: Nov 15, 2010 | |
* Author: sid | |
*/ | |
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
//
#include <fstream>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include <event2/event.h>
#include <event2/util.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include "common.h"
#include "document.h"
#include "model.h"
#include "sampler.h"
#include "cmd_flags.h"
/* The MainListener class below accepts client connections on a UNIX domain
 * socket and acts as the topic-inference server. It is intended to become
 * the base class for that server.
 */
using namespace std; | |
using learning_lda::LDACorpus; | |
using learning_lda::LDAModel; | |
using learning_lda::LDAAccumulativeModel; | |
using learning_lda::LDASampler; | |
using learning_lda::LDADocument; | |
using learning_lda::LDACmdLineFlags; | |
using learning_lda::DocumentWordTopicsPB; | |
using learning_lda::RandInt; | |
// Filesystem path of the UNIX domain socket the server binds to.
// Declared as a char array (not a char* to a literal): string literals are
// const in C++, and binding one to a plain char* is ill-formed since C++11.
// The array still decays to char*, so existing `char* x = _SOCKNAME` users compile.
// NOTE(review): names starting with "_" + uppercase are reserved; consider renaming.
static char _SOCKNAME[] = "/tmp/ts1";
class LDAModelCommon{ | |
public: | |
LDAModel& model; | |
map<string, int>& word_index_map; | |
LDACmdLineFlags& flags; | |
LDAModelCommon(LDAModel& model, map<string, int>& word_index_map, LDACmdLineFlags& flags): | |
model(model),word_index_map(word_index_map),flags(flags){}; | |
}; | |
class MainListener{ | |
public: | |
static LDAModelCommon* commonData; | |
MainListener(){ | |
/*TODO: Check if file exists*/ | |
struct sockaddr_un name; | |
int sock; | |
printf("Running Topic model service on UNIX DOMAIN SOCKET: %s\n",_SOCKNAME); | |
char* filename = _SOCKNAME; | |
unlink(filename); | |
/* Create the STREAM socket. */ | |
sock = socket (AF_UNIX, SOCK_STREAM, 0); | |
if (sock < 0){ | |
perror ("socket file does not exist"); | |
exit (EXIT_FAILURE); | |
} | |
/* Bind a name to the socket. */ | |
name.sun_family = AF_UNIX; | |
strcpy(name.sun_path, filename); | |
int size = strlen(name.sun_path) | |
+ sizeof(name.sun_family) ; | |
//Bind now.. bind! | |
if (bind (sock, (struct sockaddr *)&name, size) < 0){ | |
perror ("Unable to bind socket at this time"); | |
exit (EXIT_FAILURE); | |
} | |
if (listen(sock, 16)<0) { | |
perror("listen"); | |
return; | |
} | |
evutil_make_socket_nonblocking(sock); | |
#ifndef WIN32 | |
{ | |
int one = 1; | |
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); | |
} | |
#endif | |
//Set up libevent | |
struct event *listener_event; | |
struct event_base* base; | |
base = event_base_new(); | |
listener_event = event_new(base, sock, EV_READ|EV_PERSIST, MainListener::handleIncoming, (void*)base); | |
event_add(listener_event, NULL); | |
event_base_dispatch(base); | |
} | |
/*This will listen to incoming FD's and | |
* then read what is on. this is a test fnction for now... | |
* Need to make the recieved FD's work with libevent as well.. | |
* */ | |
void static handleIncoming(evutil_socket_t listener, short event, void *arg){ | |
//recieve data from node.js | |
struct event_base *base = (struct event_base*)arg; | |
struct sockaddr_storage ss; | |
socklen_t slen = sizeof(ss); | |
int fd = accept(listener, (struct sockaddr*)&ss, &slen); | |
if (fd < 0) { | |
perror("accept"); | |
} else if (fd > FD_SETSIZE) { | |
close(fd); | |
} else { | |
struct bufferevent *bev; | |
evutil_make_socket_nonblocking(fd); | |
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); | |
bufferevent_setcb(bev, MainListener::readDocumentAndInfer, NULL, MainListener::errorcb, NULL); | |
//bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); Not now... | |
bufferevent_enable(bev, EV_READ|EV_WRITE); | |
} | |
} | |
//Event handler for reading a document and inferring from the model | |
void static readDocumentAndInfer(struct bufferevent *bev, void *ctx){ | |
struct evbuffer *input, *output; | |
char* line; | |
size_t n; | |
//Not thread safe... | |
input = bufferevent_get_input(bev); | |
output = bufferevent_get_output(bev); | |
//Create temp references. | |
map<string, int>& word_index_map = MainListener::commonData->word_index_map; | |
LDAModel& model = MainListener::commonData->model; | |
LDACmdLineFlags& flags = MainListener::commonData->flags; | |
LDASampler sampler(flags.alpha_, flags.beta_, &model, NULL); | |
//Read a line representing a document run inference | |
//1) Topic distribution | |
//2) K Representative words from top 2 topics. | |
while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { | |
using std::istringstream; | |
//std::cout<<"Received Document string of len "<<strlen(line)<<std::endl;//<<line; | |
istringstream ss((string(line))); | |
//Event/thread specifc datastructure | |
DocumentWordTopicsPB document_topics; | |
string word; | |
int count; | |
while (ss >> word >> count) { | |
// Load and init a document. | |
vector<int32> topics; | |
for (int i = 0; i < count; ++i) { | |
topics.push_back(RandInt(model.num_topics())); | |
} | |
map<string, int>::const_iterator iter = word_index_map.find(word); | |
if (iter != word_index_map.end()) { | |
//Mapping of words to there indices and each occurance to random topics. | |
document_topics.add_wordtopics(word, iter->second, topics); | |
} | |
} | |
LDADocument document(document_topics, model.num_topics()); | |
TopicProbDistribution prob_dist(model.num_topics(), 0); | |
//Sampling is read only operations. So I dont need to worry about concurrency... | |
for (int iter = 0; iter < flags.total_iterations_; ++iter) { | |
sampler.SampleNewTopicsForDocument(&document, false); | |
if (iter >= flags.burn_in_iterations_) { | |
const vector<int64>& document_distribution = | |
document.topic_distribution(); | |
for (int i = 0; i < document_distribution.size(); ++i) { | |
prob_dist[i] += document_distribution[i]; | |
} | |
} | |
} | |
//50 most probable words... from 2 topics.. | |
//Example implementtaion. | |
/*std::string mostprobTopK = | |
sampler.GenerateTopKTopicWords( prob_dist, word_index_map, 50).str(); | |
*/ | |
std::ostringstream s; | |
string topicdist(""); | |
//Topic distribution line. | |
for (int topic = 0; topic < prob_dist.size(); ++topic) { | |
s << (prob_dist[topic] / | |
(flags.total_iterations_ - flags.burn_in_iterations_))<< | |
((topic < prob_dist.size() - 1) ? " " : " "); | |
} | |
std::string topic_dist_op = s.str(); | |
topic_dist_op = topic_dist_op ;////+":::"+ mostprobTopK; | |
const char* finalop = topic_dist_op.c_str(); | |
//cout<<finalop<<endl; | |
int ret = evbuffer_add(output, finalop, topic_dist_op.size()); | |
ret = evbuffer_add(output, "\n", 1); | |
//cout<<"RET"<<ret<<endl; | |
} | |
// | |
} | |
//Error call back | |
void static errorcb(struct bufferevent *bev, short error, void *ctx){ | |
if (error & BEV_EVENT_EOF) { | |
/* connection has been closed, do any clean up here */ | |
/* ... */ | |
//printf("%d",EVUTIL_SOCKET_ERROR()); | |
std::cout<<"Connection is closed"<<std::endl; | |
} else if (error & BEV_EVENT_ERROR) { | |
/* check errno to see what error occurred */ | |
/* ... */ | |
EVUTIL_SOCKET_ERROR(); | |
std::cout<<"Error:"<<error<<std::endl; | |
} else if (error & BEV_EVENT_TIMEOUT) { | |
/* must be a timeout event handle, handle it */ | |
/* ... */ | |
std::cout<<"Time out"<<std::endl; | |
} | |
bufferevent_free(bev);//freeing the buffered event | |
} | |
}; | |
// Storage for the shared model state used by MainListener's callbacks.
// Starts out null and is assigned by main() before the server is constructed.
// The callbacks only read through it, and no threads are started, so no locking is used.
LDAModelCommon *MainListener::commonData(NULL);
int main(int argc, char** argv){ | |
//create the model... | |
//CReate an event base and drop into do accept | |
using std::ifstream; | |
using std::ofstream; | |
srand(time(NULL)); | |
LDACmdLineFlags flags; | |
flags.ParseCmdFlags(argc, argv); | |
if (!flags.CheckInferringValidity()) { | |
return -1; | |
} | |
srand(time(NULL)); | |
map<string, int> word_index_map; | |
ifstream model_fin(flags.model_file_.c_str()); | |
LDAModel model(model_fin, &word_index_map); | |
LDAModelCommon* l_ModelCommon = new LDAModelCommon(model,word_index_map, flags ); | |
MainListener::commonData = l_ModelCommon; | |
//Now start the server | |
new MainListener(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment