Skip to content

Instantly share code, notes, and snippets.

@tinnefeld
Created February 15, 2012 12:23
Show Gist options
  • Save tinnefeld/1835351 to your computer and use it in GitHub Desktop.
Save tinnefeld/1835351 to your computer and use it in GitHub Desktop.
Increment operator for RAMCloud (patch file for 7d19140)
From fa6b3028135f535393ea67ca4b6db9bec8bc48d0 Mon Sep 17 00:00:00 2001
From: Christian Tinnefeld <tinnefeld@gmail.com>
Date: Tue, 14 Feb 2012 12:17:02 -0500
Subject: [PATCH] Added increment operator
---
GNUmakefile | 2 +-
scripts/rawmetrics.py | 1 +
src/MasterClient.cc | 30 ++++++++++++++++++++++++
src/MasterClient.h | 3 ++
src/MasterService.cc | 60 +++++++++++++++++++++++++++++++++++++++++++++++++
src/MasterService.h | 3 ++
src/RamCloud.cc | 10 ++++++++
src/RamCloud.h | 3 ++
src/Rpc.cc | 1 +
src/Rpc.h | 21 ++++++++++++++++-
src/RpcTest.cc | 2 +-
src/ServerMain.cc | 2 +-
12 files changed, 134 insertions(+), 4 deletions(-)
diff --git a/GNUmakefile b/GNUmakefile
index 21b1435..c090711 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -83,7 +83,7 @@ DOXYGEN ?= doxygen
# Test whether Infiniband support is available. Avoids using $(COMFLAGS)
# (particularly, -MD) which results in bad interactions with mergedeps.
-INFINIBAND = $(shell $(CXX) $(INCLUDES) $(EXTRACXXFLAGS) $(LIBS) -libverbs \
+INFINIBAND = no #$(shell $(CXX) $(INCLUDES) $(EXTRACXXFLAGS) $(LIBS) -libverbs \
-o /dev/null src/HaveInfiniband.cc \
>/dev/null 2>&1 \
&& echo yes || echo no)
diff --git a/scripts/rawmetrics.py b/scripts/rawmetrics.py
index 3bf23ce..5ceee9f 100755
--- a/scripts/rawmetrics.py
+++ b/scripts/rawmetrics.py
@@ -327,6 +327,7 @@ rpc.metric('openTableTicks', 'time spent executing OPEN_TABLE RPC')
rpc.metric('dropTableTicks', 'time spent executing DROP_TABLE RPC')
rpc.metric('createTicks', 'time spent executing CREATE RPC')
rpc.metric('readTicks', 'time spent executing READ RPC')
+rpc.metric('incrementTicks', 'time spent executing INCREMENT RPC')
rpc.metric('writeTicks', 'time spent executing WRITE RPC')
rpc.metric('removeTicks', 'time spent executing REMOVE RPC')
rpc.metric('enlistServerTicks', 'time spent executing ENLIST_SERVER RPC')
diff --git a/src/MasterClient.cc b/src/MasterClient.cc
index c2d162c..acf93c0 100644
--- a/src/MasterClient.cc
+++ b/src/MasterClient.cc
@@ -318,6 +318,36 @@ MasterClient::read(uint32_t tableId, uint64_t id, Buffer* value,
Read(*this, tableId, id, value, rejectRules, version)();
}
+/**
+ * Issue an INCREMENT RPC for a single object and collect the reply.
+ *
+ * \param tableId
+ *      Copied into the tableId field of the request header.
+ * \param id
+ *      Copied into the id field of the request header.
+ * \param[out] value
+ *      Reset before the request is sent and then used as the reply buffer;
+ *      on return the response header has been stripped from its front.
+ * \param rejectRules
+ *      Copied into the request header; defaultRejectRules is used when NULL.
+ * \param[out] version
+ *      If non-NULL, receives the version field of the response header.
+ */
+void
+MasterClient::increment(uint32_t tableId, uint64_t id, Buffer* value,
+                        const RejectRules* rejectRules, uint64_t* version)
+{
+    Buffer request;
+    IncrementRpc::Request& header(allocHeader<IncrementRpc>(request));
+    header.tableId = tableId;
+    header.id = id;
+    if (rejectRules != NULL)
+        header.rejectRules = *rejectRules;
+    else
+        header.rejectRules = defaultRejectRules;
+
+    // The reply is received into the caller's buffer, so clear it first.
+    value->reset();
+    AsyncState state = send<IncrementRpc>(session, request, *value);
+    const IncrementRpc::Response& response(recv<IncrementRpc>(state));
+
+    if (version != NULL)
+        *version = response.version;
+
+    // Strip the response header so that only the payload is left in value.
+    value->truncateFront(sizeof(response));
+    assert(response.length == value->getTotalLength());
+
+    checkStatus(HERE);
+}
+
+
+
+
/**
* Read the current contents of multiple objects.
*
diff --git a/src/MasterClient.h b/src/MasterClient.h
index 358ddba..73d409b 100644
--- a/src/MasterClient.h
+++ b/src/MasterClient.h
@@ -196,6 +196,9 @@ class MasterClient : public Client {
uint64_t create(uint32_t tableId, const void* buf, uint32_t length,
uint64_t* version = NULL, bool async = false);
void fillWithTestData(uint32_t numObjects, uint32_t objectSize);
+ void increment(uint32_t tableId, uint64_t id, Buffer* value,
+ const RejectRules* rejectRules = NULL,
+ uint64_t* version = NULL);
void multiRead(std::vector<ReadObject*> requests);
void read(uint32_t tableId, uint64_t id, Buffer* value,
const RejectRules* rejectRules = NULL,
diff --git a/src/MasterService.cc b/src/MasterService.cc
index 6c11f0c..8ab9770 100644
--- a/src/MasterService.cc
+++ b/src/MasterService.cc
@@ -153,6 +153,10 @@ MasterService::dispatch(RpcOpcode opcode, Rpc& rpc)
callHandler<FillWithTestDataRpc, MasterService,
&MasterService::fillWithTestData>(rpc);
break;
+ case IncrementRpc::opcode:
+ callHandler<IncrementRpc, MasterService,
+ &MasterService::increment>(rpc);
+ break;
case MultiReadRpc::opcode:
callHandler<MultiReadRpc, MasterService,
&MasterService::multiRead>(rpc);
@@ -286,6 +290,62 @@ MasterService::fillWithTestData(const FillWithTestDataRpc::Request& reqHdr,
}
/**
+ * Top-level server method to handle the INCREMENT request.
+ *
+ * Looks up the object named by (reqHdr.tableId, reqHdr.id). If its data is
+ * exactly 8 bytes long, the data is read as a 64-bit integer, one is added,
+ * and the result is written back with storeData(). The resulting value is
+ * returned in the reply payload. If the object has any other length it is
+ * left untouched and a value of 0 is returned.
+ *
+ * \param reqHdr
+ *      Request header; tableId, id and rejectRules are read from it.
+ * \param[out] respHdr
+ *      Response header; common.status, version and length are filled in.
+ * \param[out] rpc
+ *      The 8-byte value is appended to rpc.replyPayload.
+ */
+void
+MasterService::increment(const IncrementRpc::Request& reqHdr,
+                         IncrementRpc::Response& respHdr,
+                         Rpc& rpc)
+{
+    // We must return table doesn't exist if the table does not exist. Also, we
+    // might have an entry in the hash table that's invalid because its tablet
+    // no longer lives here.
+    if (getTable(reqHdr.tableId, reqHdr.id) == NULL) {
+        respHdr.common.status = STATUS_TABLE_DOESNT_EXIST;
+        return;
+    }
+
+    LogEntryHandle handle = objectMap.lookup(reqHdr.tableId, reqHdr.id);
+    if (handle == NULL || handle->type() != LOG_ENTRY_TYPE_OBJ) {
+        respHdr.common.status = STATUS_OBJECT_DOESNT_EXIST;
+        return;
+    }
+
+    const Object* obj = handle->userData<Object>();
+    respHdr.version = obj->version;
+    Status status = rejectOperation(reqHdr.rejectRules, obj->version);
+    if (status != STATUS_OK) {
+        respHdr.common.status = status;
+        return;
+    }
+
+    // Unsigned arithmetic: incrementing past the maximum wraps around
+    // instead of being undefined behaviour.
+    uint64_t value = 0;
+    if (obj->dataLength(handle->length()) == sizeof(value)) {
+        value = *reinterpret_cast<const uint64_t*>(obj->data);
+        value++;
+
+        // tmpBuffer only references 'value'; both live until the end of
+        // this scope, which covers the storeData() call.
+        Buffer tmpBuffer;
+        Buffer::Chunk::appendToBuffer(&tmpBuffer, &value, sizeof(value));
+
+        status = storeData(reqHdr.tableId, reqHdr.id, &reqHdr.rejectRules,
+                           &tmpBuffer, 0,
+                           static_cast<uint32_t>(sizeof(value)),
+                           &respHdr.version, true);
+        if (status != STATUS_OK) {
+            respHdr.common.status = status;
+            return;
+        }
+    }
+    // NOTE(review): an object whose data is not 8 bytes long is silently
+    // reported as 0 without being modified; a dedicated error status would
+    // probably serve callers better.
+
+    // Copy the value into the reply rather than adding a Chunk that points
+    // at the local variable: the reply is presumably sent after this handler
+    // returns, by which time 'value' no longer exists.
+    // NOTE(review): assumes Buffer's placement-new APPEND form is available
+    // here, as used elsewhere in RAMCloud -- confirm against Buffer.h.
+    new(&rpc.replyPayload, APPEND) uint64_t(value);
+
+    // Report the size of what was actually appended. 'obj' and 'handle' are
+    // deliberately not consulted again: storeData() has presumably
+    // superseded the entry they refer to.
+    respHdr.length = static_cast<uint32_t>(sizeof(value));
+}
+
+/**
* Top-level server method to handle the MULTIREAD request.
*
* \copydetails Service::ping
diff --git a/src/MasterService.h b/src/MasterService.h
index 2cb5232..f93d190 100644
--- a/src/MasterService.h
+++ b/src/MasterService.h
@@ -91,6 +91,9 @@ class MasterService : public Service {
void fillWithTestData(const FillWithTestDataRpc::Request& reqHdr,
FillWithTestDataRpc::Response& respHdr,
Rpc& rpc);
+ void increment(const IncrementRpc::Request& reqHdr,
+ IncrementRpc::Response& respHdr,
+ Rpc& rpc);
void multiRead(const MultiReadRpc::Request& reqHdr,
MultiReadRpc::Response& respHdr,
Rpc& rpc);
diff --git a/src/RamCloud.cc b/src/RamCloud.cc
index 9b46f92..1493fc5 100644
--- a/src/RamCloud.cc
+++ b/src/RamCloud.cc
@@ -213,6 +213,16 @@ RamCloud::read(uint32_t tableId, uint64_t id, Buffer* value,
return Read(*this, tableId, id, value, rejectRules, version)();
}
+/**
+ * Forward an increment request for one object to a MasterClient constructed
+ * from objectFinder.lookupHead(tableId).
+ *
+ * \param tableId
+ *      Passed to objectFinder.lookupHead() and on to the master client.
+ * \param id
+ *      Passed through to MasterClient::increment.
+ * \param[out] value
+ *      Passed through to MasterClient::increment.
+ * \param rejectRules
+ *      Passed through to MasterClient::increment.
+ * \param[out] version
+ *      Passed through to MasterClient::increment.
+ */
+void
+RamCloud::increment(uint32_t tableId, uint64_t id, Buffer* value,
+                    const RejectRules* rejectRules, uint64_t* version)
+{
+    Context::Guard contextGuard(clientContext);
+    MasterClient target(objectFinder.lookupHead(tableId));
+    target.increment(tableId, id, value, rejectRules, version);
+}
+
/**
* Read the current contents of multiple objects.
*
diff --git a/src/RamCloud.h b/src/RamCloud.h
index db2649f..0e1af99 100644
--- a/src/RamCloud.h
+++ b/src/RamCloud.h
@@ -178,6 +178,9 @@ class RamCloud {
void read(uint32_t tableId, uint64_t id, Buffer* value,
const RejectRules* rejectRules = NULL,
uint64_t* version = NULL);
+ void increment(uint32_t tableId, uint64_t id, Buffer* value,
+ const RejectRules* rejectRules = NULL,
+ uint64_t* version = NULL);
void multiRead(MasterClient::ReadObject* requests[], uint32_t numRequests);
void remove(uint32_t tableId, uint64_t id,
const RejectRules* rejectRules = NULL,
diff --git a/src/Rpc.cc b/src/Rpc.cc
index f5a93bd..6f7cade 100644
--- a/src/Rpc.cc
+++ b/src/Rpc.cc
@@ -84,6 +84,7 @@ Rpc::opcodeSymbol(uint32_t opcode)
case REQUEST_SERVER_LIST: return "REQUEST_SERVER_LIST";
case GET_SERVER_ID: return "GET_SERVER_ID";
case ILLEGAL_RPC_TYPE: return "ILLEGAL_RPC_TYPE";
+ case INCREMENT: return "INCREMENT";
}
// Never heard of this RPC; return the numeric value. The shared buffer
diff --git a/src/Rpc.h b/src/Rpc.h
index 2d331d6..59198f9 100644
--- a/src/Rpc.h
+++ b/src/Rpc.h
@@ -96,7 +96,8 @@ enum RpcOpcode {
UPDATE_SERVER_LIST = 38,
REQUEST_SERVER_LIST = 39,
GET_SERVER_ID = 40,
- ILLEGAL_RPC_TYPE = 41, // 1 + the highest legitimate RpcOpcode
+ INCREMENT = 41,
+ ILLEGAL_RPC_TYPE = 42, // 1 + the highest legitimate RpcOpcode
};
/**
@@ -161,6 +162,24 @@ struct FillWithTestDataRpc {
} __attribute__((packed));
};
+/**
+ * Request and response headers for the INCREMENT RPC, which is served by
+ * MASTER_SERVICE. Both structs are packed.
+ * NOTE(review): field order and sizes presumably define the on-wire layout,
+ * so fields should not be reordered or resized -- confirm.
+ */
+struct IncrementRpc {
+ static const RpcOpcode opcode = INCREMENT;
+ static const ServiceType service = MASTER_SERVICE;
+ // Sent by the client: names the object and carries the reject rules.
+ struct Request {
+ RpcRequestCommon common;
+ uint32_t tableId;
+ uint64_t id;
+ RejectRules rejectRules;
+ } __attribute__((packed));
+ // Returned by the master; the payload follows directly after this header.
+ struct Response {
+ RpcResponseCommon common;
+ uint64_t version;
+ uint32_t length; // Length in bytes of the data that follows
+ // this header; MasterClient::increment asserts
+ // it equals the remaining reply size.
+ } __attribute__((packed));
+};
+
struct MultiReadRpc {
static const RpcOpcode opcode = MULTI_READ;
static const ServiceType service = MASTER_SERVICE;
diff --git a/src/RpcTest.cc b/src/RpcTest.cc
index 7bb3716..87325cc 100644
--- a/src/RpcTest.cc
+++ b/src/RpcTest.cc
@@ -44,7 +44,7 @@ TEST_F(RpcTest, opcodeSymbol_integer) {
EXPECT_STREQ("ILLEGAL_RPC_TYPE", Rpc::opcodeSymbol(ILLEGAL_RPC_TYPE));
// Test out-of-range values.
- EXPECT_STREQ("unknown(42)", Rpc::opcodeSymbol(ILLEGAL_RPC_TYPE+1));
+ EXPECT_STREQ("unknown(43)", Rpc::opcodeSymbol(ILLEGAL_RPC_TYPE+1));
// Make sure the next-to-last value is defined (this will fail if
// someone adds a new opcode and doesn't update opcodeSymbol).
diff --git a/src/ServerMain.cc b/src/ServerMain.cc
index d71ced5..177ff69 100644
--- a/src/ServerMain.cc
+++ b/src/ServerMain.cc
@@ -118,7 +118,7 @@ main(int argc, char *argv[])
const string localLocator = optionParser.options.getLocalLocator();
- InfRcTransport<>::setName(localLocator.c_str());
+ //InfRcTransport<>::setName(localLocator.c_str());
Context::get().transportManager->setTimeout(
optionParser.options.getTransportTimeout());
Context::get().transportManager->initialize(localLocator.c_str());
--
1.7.3.2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment