Skip to content

Instantly share code, notes, and snippets.

@jszaday
Last active June 9, 2020 01:24
Show Gist options
  • Save jszaday/614ab2705f504d277a096556c22505dd to your computer and use it in GitHub Desktop.
Save jszaday/614ab2705f504d277a096556c22505dd to your computer and use it in GitHub Desktop.
Straightforward Channels API for Charm++
#include "channels.h"
#include "channels.def.h"
Main::Main(CkArgMsg*) {
// Initialize read only variables
nElems = 4;
mainProxy = thisProxy;
// Create chares and chare arrays
channel = CProxy_Channel<int>::ckNew();
ring = CProxy_Ring::ckNew(channel, nElems);
// Setup a callback when a value is sent to the channel
CkCallback cb(CkIndex_Main::recvFromChannel(0), mainProxy);
channel.receive(cb);
// And send a value to the channel
channel.send(0);
}
void Main::recvFromChannel(int value) {
CkPrintf("[MAIN] Received the value %d from the channel.\n", value);
ring.run(value);
}
void Main::done() {
if (!hasRun) {
hasRun = true;
ring.run(nElems);
} else {
CkExit();
}
}
void Ring::run(int offset) {
thisProxy[thisIndex].exchange(thisIndex + offset);
}
void Ring::exchange(int i) {
if (i % 2 == 0) {
CkPrintf("[%d] Sending the value %d via the channel.\n", thisIndex, i);
channel.send(i);
} else {
CkPrintf("[%d] Received the value %d from the channel.\n", thisIndex, channel.receive());
}
CkCallback cb(CkIndex_Main::done(), mainProxy);
contribute(cb);
}
mainmodule channels {
readonly int nElems;
readonly CProxy_Main mainProxy;
mainchare Main {
entry Main(CkArgMsg*);
entry [reductiontarget] void recvFromChannel(int);
entry [reductiontarget] void done();
};
template <typename T>
chare Channel {
entry Channel();
entry void send(T);
entry [sync] T receive();
entry [threaded] void receive(CkCallback cb);
};
array [1D] Ring {
entry Ring(CProxy_Channel<int>);
entry void run(int);
entry [threaded] void exchange(int);
};
chare Channel<int>;
}
#include "channels.decl.h"
#include <queue>
int nElems;
CProxy_Main mainProxy;
class Main : public CBase_Main
{
public:
Main(CkArgMsg*);
Main(CkMigrateMessage*) {}
void done();
void recvFromChannel(int);
private:
CProxy_Ring ring;
CProxy_Channel<int> channel;
bool hasRun;
};
template<typename T>
class Channel : public CBase_Channel<T>
{
public:
Channel() {}
Channel(CkMigrateMessage*) {}
void send(T value) {
data.push(value);
if (!waiting.empty()) {
auto front = waiting.front();
waiting.pop();
CthAwaken(front);
}
}
inline T waitForData() {
if (data.empty()) {
waiting.push(CthSelf());
CthSuspend();
}
auto front = data.front();
data.pop();
return front;
}
T receive() {
return waitForData();
}
void receive(CkCallback cb) {
auto value = waitForData();
PUP::sizer szr;
szr | value;
CkMarshallMsg *msg = CkAllocateMarshallMsg(szr.size(), NULL);
PUP::toMem ppr((void *)msg->msgBuf);
ppr | value;
cb.send(msg);
}
private:
std::queue<CthThread> waiting;
std::queue<T> data;
};
class Ring : public CBase_Ring
{
public:
Ring(CProxy_Channel<int> channel_) : channel(channel_) { }
Ring(CkMigrateMessage*) {}
void run(int);
void exchange(int);
private:
CProxy_Channel<int> channel;
};
#define CK_TEMPLATES_ONLY
#include "channels.def.h"
#undef CK_TEMPLATES_ONLY
CHARMC = $(CHARM_HOME)/bin/charmc
BINARY = channels
CHARMCFLAGS = $(OPTS)
CHARMCLINKFLAGS = -language charm++ $(OPTS)
TESTFLAGS = ++local +p2
%.o: %.C
all: $(BINARY)
$(BINARY): channels.o
$(CHARMC) $(CHARMCLINKFLAGS) -o $(BINARY) channels.o
channels.decl.h: channels.ci
$(CHARMC) $(CHARMCFLAGS) channels.ci
channels.o: channels.C channels.h channels.decl.h
$(CHARMC) $(CHARMCFLAGS) -c channels.C
test: $(BINARY)
./charmrun $(TESTFLAGS) $(BINARY)
clean:
rm -f *.o *.decl.h *.def.h charmrun $(BINARY)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment