Created
September 10, 2012 17:58
-
-
Save amattn/3692552 to your computer and use it in GitHub Desktop.
Rough draft of a golang Riak driver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package riak is an opinionated driver for the Riak datastore | |
// Developed against Riak 1.2 | |
// | |
// Design decisions were made to prioritize safety over absolute ease of use. | |
// This is a work in progress, but could be good starting point for your own custom driver | |
// Eventually, there will be a real repo. | |
// Also on the roadmap is protocol buffer support, multi-node clusters, better sibling | |
// resolution and even client-side CRDTs | |
// | |
// If you have any feedback, please let me know. | |
package riak | |
import ( | |
"bytes" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"strings" | |
) | |
const HTTPHeaderKeyContentType = "Content-Type" | |
const HTTPHeaderKeyETag = "ETag" | |
const RiakHeaderKeyVectorClock = "X-Riak-Vclock" | |
// Cluster is a "write-once" struct. After being created with MakeCluster, it | |
// can be cheaply passed around and used safely accross threads/channels. | |
type Cluster struct { | |
scheme string | |
address string | |
port uint16 | |
} | |
// scheme must be one of http or https | |
func MakeCluster(scheme string, address string, port uint16) (clusterToReturn Cluster, err error) { | |
if scheme == "http" || scheme == "https" { | |
clusterToReturn = Cluster{scheme, address, port} | |
} else { | |
err = MakeRiakError(92275497, "MakeCluster invalid scheme, expected http|https, got"+scheme, nil) | |
} | |
return | |
} | |
// abstract base struct, only used as an anonymous field | |
// make sure to create ExtraHeaders = make(http.Header) | |
// before using it. | |
type RiakRequest struct { | |
Parameters map[string]string // url query paramters | |
Header http.Header // http headers | |
} | |
func (rReq *RiakRequest) ContentType() string { | |
return rReq.Header.Get(HTTPHeaderKeyContentType) | |
} | |
func (rReq *RiakRequest) SetContentType(value string) { | |
rReq.Header.Set(HTTPHeaderKeyContentType, value) | |
} | |
func (rReq *RiakRequest) VectorClock() string { | |
return rReq.Header.Get(RiakHeaderKeyVectorClock) | |
} | |
func (rReq *RiakRequest) SetVectorClock(value string) { | |
rReq.Header.Set(RiakHeaderKeyVectorClock, value) | |
} | |
// Abstract base struct, only used as an anonymous field | |
// A typically riak response does not have a traditional success/fail result | |
// The DidSucceed paramater is a loosely defined summary or general suggestion/ | |
// Error may be set even when DidSucceed is true. | |
// For example, a Fetch operation 200 and 304 are clear successes, but 300,404 | |
// are "kinda" successes. 300 won't return an error, but usually requires | |
// sibling resolution on the part of the client. 404 will return an error, | |
// even though DidSucceed is set to true. | |
// | |
// Typically, a client will check DidSucceed first. If DidSucceed is false, | |
// usually the error is passed on to the error handler. When DidSucced is true, | |
// either the body is processed or the client does a switch on StatusCode to figure | |
// out which success path needs to be processed. | |
type RiakResponse struct { | |
Status string // e.g. "200 OK" | |
StatusCode int // e.g. 200 | |
DidSucceed bool | |
Error error | |
Header http.Header // http headers | |
BodyContent []byte | |
} | |
func (rRsp *RiakResponse) ContentType() string { | |
return rRsp.Header.Get(HTTPHeaderKeyContentType) | |
} | |
func (rRsp *RiakResponse) ETag() string { | |
return rRsp.Header.Get(HTTPHeaderKeyETag) | |
} | |
func (rRsp *RiakResponse) VectorClock() string { | |
return rRsp.Header.Get(RiakHeaderKeyVectorClock) | |
} | |
// List Keys | |
// http://wiki.basho.com/HTTP-List-Keys.html | |
type RiakAllKeysRequest struct { | |
RiakRequest | |
Bucket string | |
} | |
type RiakAllKeysResponse struct { | |
RiakResponse | |
BodyContent []byte | |
} | |
func NewAllKeysRequest() (req *RiakAllKeysRequest) { | |
req = new(RiakAllKeysRequest) | |
req.Parameters = make(map[string]string) | |
req.Header = make(http.Header) | |
return | |
} | |
// Set Bucket Properties | |
// http://wiki.basho.com/HTTP-Set-Bucket-Properties.html | |
type RiakSetBucketPropertiesRequest struct { | |
RiakRequest | |
Bucket string | |
BodyContent []byte | |
} | |
type RiakSetBucketPropertiesResponse struct { | |
RiakResponse | |
BodyContent []byte | |
} | |
func NewSetBucketPropertiesRequest() (req *RiakSetBucketPropertiesRequest) { | |
req = new(RiakSetBucketPropertiesRequest) | |
req.Parameters = make(map[string]string) | |
req.Header = make(http.Header) | |
return | |
} | |
// Fetch Object | |
// http://wiki.basho.com/HTTP-Fetch-Object.html | |
type RiakFetchObjectRequest struct { | |
RiakRequest | |
Bucket string | |
Key string | |
AcceptMultipartMixed bool // allow multipart sibling results | |
} | |
type RiakFetchObjectResponse struct { | |
RiakResponse | |
Mixed []MultipartMixedContent | |
} | |
type MultipartMixedContent struct { | |
Headers map[string]string | |
BodyContent []byte | |
} | |
func NewFetchObjectRequest() (req *RiakFetchObjectRequest) { | |
req = new(RiakFetchObjectRequest) | |
req.Parameters = make(map[string]string) | |
req.Header = make(http.Header) | |
return | |
} | |
// Store Object | |
// http://wiki.basho.com/HTTP-Store-Object.html | |
type RiakStoreObjectRequest struct { | |
RiakRequest | |
Bucket string | |
Key string // technically key is optional, but we don't support that yet. | |
BodyContent []byte | |
ReturnBody bool // if set to true, Value | |
} | |
type RiakStoreObjectResponse struct { | |
RiakResponse | |
BodyContent []byte | |
} | |
func NewStoreObjectRequest() (req *RiakStoreObjectRequest) { | |
req = new(RiakStoreObjectRequest) | |
req.Parameters = make(map[string]string) | |
req.Header = make(http.Header) | |
return | |
} | |
// Delete Object | |
// http://wiki.basho.com/HTTP-Delete-Object.html | |
type RiakDeleteObjectRequest struct { | |
RiakRequest | |
Bucket string | |
Key string | |
} | |
type RiakDeleteObjectResponse struct { | |
RiakResponse | |
} | |
func NewDeleteObjectRequest() (req *RiakDeleteObjectRequest) { | |
req = new(RiakDeleteObjectRequest) | |
req.Parameters = make(map[string]string) | |
req.Header = make(http.Header) | |
return | |
} | |
// Map Reduce | |
// NOT FULLY IMPLEMENTED YET | |
// http://wiki.basho.com/HTTP-MapReduce.html | |
type RiakMapReduceRequest struct { | |
RiakRequest | |
Inputs []string // list of buckets | |
Queries []map[string]string // list of query phases. each phase has a string type | |
} | |
type RiakMapReduceResponse struct { | |
RiakResponse | |
BodyContent []byte | |
} | |
func NewMapReduceRequest() (req *RiakMapReduceRequest) { | |
req = new(RiakMapReduceRequest) | |
req.Parameters = make(map[string]string) | |
req.Header = make(http.Header) | |
req.Inputs = make([]string, 0) | |
req.Queries = make([]map[string]string, 0) | |
return | |
} | |
// Ping | |
// http://wiki.basho.com/HTTP-Ping.html | |
type RiakPingRequest struct { | |
RiakRequest | |
} | |
type RiakPingResponse struct { | |
RiakResponse | |
BodyContent []byte | |
} | |
func NewPingRequest() (req *RiakPingRequest) { | |
req = new(RiakPingRequest) | |
req.Parameters = make(map[string]string) | |
req.Header = make(http.Header) | |
return | |
} | |
// ******************************************************************************** | |
// ******************************************************************************** | |
// ******************************************************************************** | |
// ******************************************************************************** | |
// ******************************************************************************** | |
// List Keys | |
// ******************************************************************************** | |
func (cluster *Cluster) AllKeys(clusterRequestPtr *RiakAllKeysRequest) (clusterResponsePtr *RiakAllKeysResponse) { | |
clusterResponsePtr = new(RiakAllKeysResponse) | |
if clusterRequestPtr == nil { | |
clusterResponsePtr.Error = MakeRiakError(2737046061, "AllKeys expected non-nil request pointer", nil) | |
return | |
} | |
urlStr := buildURLForCluster(cluster) | |
urlStr = urlStr + "/buckets/" + clusterRequestPtr.Bucket + "/keys?keys=true" | |
httpRequestPtr, err := http.NewRequest("GET", urlStr, nil) | |
for key, value := range clusterRequestPtr.Header { | |
httpRequestPtr.Header[key] = value | |
} | |
if len(clusterRequestPtr.ContentType()) == 0 { | |
httpRequestPtr.Header.Set(HTTPHeaderKeyContentType, "text/plain") | |
} | |
httpClient := http.DefaultClient | |
httpResponse, err := httpClient.Do(httpRequestPtr) | |
if err != nil { | |
// handle error | |
clusterResponsePtr.Error = err | |
return | |
} | |
defer httpResponse.Body.Close() | |
clusterResponsePtr.Status = httpResponse.Status | |
clusterResponsePtr.StatusCode = httpResponse.StatusCode | |
clusterResponsePtr.Header = httpResponse.Header | |
if clusterResponsePtr.StatusCode == 200 { | |
clusterResponsePtr.DidSucceed = true | |
body, err := ioutil.ReadAll(httpResponse.Body) | |
if err != nil { | |
clusterResponsePtr.Error = err | |
return | |
} | |
clusterResponsePtr.BodyContent = body | |
} | |
return | |
} | |
// ******************************************************************************** | |
// SetBucketProperties | |
// ******************************************************************************** | |
func (cluster *Cluster) SetBucketProperties(clusterRequestPtr *RiakSetBucketPropertiesRequest) (clusterResponsePtr *RiakSetBucketPropertiesResponse) { | |
clusterResponsePtr = new(RiakSetBucketPropertiesResponse) | |
if clusterRequestPtr == nil { | |
clusterResponsePtr.Error = MakeRiakError(2737046061, "SetBucketProperties expected non-nil request pointer", nil) | |
return | |
} | |
urlStr := buildURLForClusterBucket(cluster, clusterRequestPtr.Bucket) | |
urlStr = urlStr + "/props" | |
// log.Println("SetBucketProperties riak urlStr", urlStr) | |
byteReadBuffer := bytes.NewBuffer(clusterRequestPtr.BodyContent) | |
httpClient := http.DefaultClient | |
httpRequestPtr, err := http.NewRequest("PUT", urlStr, byteReadBuffer) | |
for key, value := range clusterRequestPtr.Header { | |
httpRequestPtr.Header[key] = value | |
} | |
if len(clusterRequestPtr.ContentType()) == 0 { | |
httpRequestPtr.Header.Set(HTTPHeaderKeyContentType, "application/json") | |
} | |
httpResponse, err := httpClient.Do(httpRequestPtr) | |
if err != nil { | |
clusterResponsePtr.Error = err | |
return | |
} | |
defer httpResponse.Body.Close() | |
clusterResponsePtr.Status = httpResponse.Status | |
clusterResponsePtr.StatusCode = httpResponse.StatusCode | |
clusterResponsePtr.Header = httpResponse.Header | |
// we expect a 204, no content here... | |
if clusterResponsePtr.StatusCode == 204 { | |
clusterResponsePtr.DidSucceed = true | |
} else { | |
clusterResponsePtr.DidSucceed = false | |
switch clusterResponsePtr.StatusCode { | |
case 400: | |
clusterResponsePtr.Error = MakeRiakError(3141343314, "Failed to set bucket properites, Bad Request, submitted JSON is invalid", nil) | |
case 415: | |
clusterResponsePtr.Error = MakeRiakError(3141343314, "Failed to set bucket properites, Unsupported Media Type, Content-Type was not set to application/json", nil) | |
default: | |
clusterResponsePtr.Error = MakeRiakError(3141343314, "Failed to set bucket properites, unknown error", nil) | |
} | |
log.Println("SaveValue clusterResponse, unexpected status", clusterResponsePtr) | |
} | |
return | |
} | |
// ******************************************************************************** | |
// FetchObject | |
// ******************************************************************************** | |
func (cluster *Cluster) FetchObject(fetchRequestPtr *RiakFetchObjectRequest) (clusterResponsePtr *RiakFetchObjectResponse) { | |
clusterResponsePtr = new(RiakFetchObjectResponse) | |
if fetchRequestPtr == nil { | |
clusterResponsePtr.Error = MakeRiakError(2737046061, "FetchObject expected non-nil request pointer", nil) | |
return | |
} | |
if len(fetchRequestPtr.Bucket) == 0 || len(fetchRequestPtr.Key) == 0 { | |
clusterResponsePtr.Error = MakeRiakError(2221401309, "FetchObject incomplete fetchRequestPtr", nil) | |
return | |
} | |
urlStr := buildURLForClusterBucketKey(cluster, fetchRequestPtr.Bucket, fetchRequestPtr.Key) | |
urlStr = appendParametersToURL(fetchRequestPtr.Parameters, urlStr) | |
// log.Println("FetchObject:urlStr", urlStr) | |
httpClient := http.DefaultClient | |
httpRequestPtr, err := http.NewRequest("GET", urlStr, nil) | |
for key, value := range fetchRequestPtr.Header { | |
httpRequestPtr.Header[key] = value | |
} | |
// httpRequestPtr.Header | |
if fetchRequestPtr.AcceptMultipartMixed { | |
httpRequestPtr.Header.Add("Accept", "multipart/mixed") | |
httpRequestPtr.Header.Add("Accept", "*/*") | |
} | |
httpResponse, err := httpClient.Do(httpRequestPtr) | |
if err != nil { | |
// handle error | |
clusterResponsePtr.Error = err | |
return | |
} | |
defer httpResponse.Body.Close() | |
clusterResponsePtr.Status = httpResponse.Status | |
clusterResponsePtr.StatusCode = httpResponse.StatusCode | |
clusterResponsePtr.Header = httpResponse.Header | |
getBody := func() { | |
clusterResponsePtr.DidSucceed = true | |
body, err := ioutil.ReadAll(httpResponse.Body) | |
if err != nil { | |
clusterResponsePtr.Error = err | |
return | |
} | |
clusterResponsePtr.BodyContent = body | |
} | |
switch clusterResponsePtr.StatusCode { | |
case 200: // OK | |
getBody() | |
case 300: // Multiple Choices | |
getBody() | |
err := parseMultipartMixed(clusterResponsePtr) | |
if err != nil { | |
clusterResponsePtr.Error = MakeRiakError(4078424640, "300 multiple choices, Unknown error returned from parseMultipartMixed", err) | |
} | |
case 304: // not modified (when using conditional request semantics) | |
clusterResponsePtr.DidSucceed = true | |
case 404: // Not Found - the object could not be found on enough partitions | |
// In practice, 404 is a "successful" result... | |
clusterResponsePtr.DidSucceed = true | |
clusterResponsePtr.Error = MakeRiakError(4078424641, "404 Not Found", err) | |
case 400: // Bad Request - e.g. when r parameter is invalid (> N) | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(4078424642, "400 Bad Request", err) | |
case 406: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(4078424643, "406 Not acceptable", err) | |
case 503: // Service Unavailable - the request timed out internally | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(4078424644, "503 Service Unavailable - the request timed out internally", err) | |
default: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(4078424645, fmt.Sprintf("%d", clusterResponsePtr.StatusCode)+" Unknown error during fetch object", err) | |
} | |
return | |
} | |
func parseMultipartMixed(fetchResponsePtr *RiakFetchObjectResponse) (err error) { | |
// defer util.Un(util.Trace("parseMultipartMixed")) | |
contentTypeParts := strings.Split(fetchResponsePtr.ContentType(), "boundary=") | |
if len(contentTypeParts) != 2 { | |
log.Println("fetchResponsePtr.ContentType", fetchResponsePtr.ContentType()) // error only | |
log.Println("fetchResponsePtr", fetchResponsePtr) // error only | |
return MakeRiakError(1908241166, "Cannot parse multipart boundary", nil) | |
} | |
var crlf []byte = []byte{'\r', '\n'} | |
var crlfcrlf []byte = []byte{'\r', '\n', '\r', '\n'} | |
boundary := string(crlf) + "--" + contentTypeParts[1] + string(crlf) | |
endMarker := string(crlf) + "--" + contentTypeParts[1] + "--" + string(crlf) | |
parts := bytes.Split(fetchResponsePtr.BodyContent, []byte(boundary)) | |
allMultipartContent := make([]MultipartMixedContent, 0, len(parts)) | |
for i, part := range parts { | |
// log.Println("part", part) | |
// log.Println("string(part)", string(part)) | |
bodyStart := bytes.Index(part, crlfcrlf) | |
if bodyStart > 0 { | |
var multipartContent MultipartMixedContent | |
multipartContent.Headers = make(map[string]string) | |
partHeader := part[:bodyStart] | |
partBody := part[bodyStart+len(crlfcrlf):] | |
if i == len(parts)-1 { | |
if bytes.HasSuffix(partBody, []byte(endMarker)) { | |
partBody = partBody[:len(partBody)-len(endMarker)] | |
} | |
} | |
// parse header | |
for _, headerLine := range bytes.Split(partHeader, crlf) { | |
headerParts := strings.SplitN(string(headerLine), ": ", 2) | |
multipartContent.Headers[headerParts[0]] = headerParts[1] | |
} | |
multipartContent.BodyContent = partBody | |
allMultipartContent = append(allMultipartContent, multipartContent) | |
} | |
} | |
fetchResponsePtr.Mixed = allMultipartContent | |
return MakeRiakError(1908241166, "TODO", nil) | |
} | |
// ******************************************************************************** | |
// StoreObject | |
// ******************************************************************************** | |
func (cluster *Cluster) StoreObject(clusterRequestPtr *RiakStoreObjectRequest) (clusterResponsePtr *RiakStoreObjectResponse) { | |
// defer util.Un(util.Trace("StoreObject")) | |
clusterResponsePtr = new(RiakStoreObjectResponse) | |
if clusterRequestPtr == nil { | |
clusterResponsePtr.Error = MakeRiakError(2737046061, "StoreObject expected non-nil request pointer", nil) | |
return | |
} | |
if len(clusterRequestPtr.Bucket) == 0 || len(clusterRequestPtr.Key) == 0 { | |
clusterResponsePtr.Error = MakeRiakError(1606074292, "StoreObject incomplete clusterRequestPtr"+"\n"+fmt.Sprintf("%v", clusterRequestPtr), nil) | |
return | |
} | |
urlStr := buildURLForClusterBucketKey(cluster, clusterRequestPtr.Bucket, clusterRequestPtr.Key) | |
if clusterRequestPtr.ReturnBody { | |
clusterRequestPtr.Parameters["returnbody"] = "true" | |
} | |
urlStr = appendParametersToURL(clusterRequestPtr.Parameters, urlStr) | |
// log.Println("StoreObject:urlStr", urlStr) | |
byteReadBuffer := bytes.NewBuffer(clusterRequestPtr.BodyContent) | |
httpClient := http.DefaultClient | |
httpRequestPtr, err := http.NewRequest("PUT", urlStr, byteReadBuffer) | |
for key, value := range clusterRequestPtr.Header { | |
httpRequestPtr.Header[key] = value | |
} | |
// make sure content type is set | |
if len(clusterRequestPtr.ContentType()) == 0 { | |
httpRequestPtr.Header.Set(HTTPHeaderKeyContentType, "text/plain") | |
} | |
if len(clusterRequestPtr.VectorClock()) > 0 && clusterRequestPtr.VectorClock() != "0" { | |
httpRequestPtr.Header.Set(RiakHeaderKeyVectorClock, clusterRequestPtr.VectorClock()) | |
} else { | |
httpRequestPtr.Header.Del(RiakHeaderKeyVectorClock) | |
} | |
httpResponse, err := httpClient.Do(httpRequestPtr) | |
if err != nil && httpResponse == nil { | |
msg := fmt.Sprintf("Unexpected error returned from httpClient.Do()\nhttpRequestPtr: %v\nhttpResponse:%v", httpRequestPtr, httpResponse) | |
clusterResponsePtr.Error = MakeRiakError(814750928, msg, err) | |
return | |
} | |
defer httpResponse.Body.Close() | |
clusterResponsePtr.Status = httpResponse.Status | |
clusterResponsePtr.StatusCode = httpResponse.StatusCode | |
clusterResponsePtr.Header = httpResponse.Header | |
switch clusterResponsePtr.StatusCode { | |
case 201: | |
fallthrough | |
case 200: | |
fallthrough | |
case 204: | |
fallthrough | |
case 300: | |
clusterResponsePtr.DidSucceed = true | |
if clusterRequestPtr.ReturnBody { | |
body, err := ioutil.ReadAll(httpResponse.Body) | |
if err != nil { | |
clusterResponsePtr.Error = MakeRiakError(3859484060, "Error reading response body content", err) | |
return | |
} | |
clusterResponsePtr.BodyContent = body | |
} | |
case 400: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(3859484060, "400 Bad Request", err) | |
case 406: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(3859484060, "406 Not acceptable", err) | |
case 412: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(3859484060, "412 Precondition Failed, one of the conditional request headers failed to match", err) | |
case 500: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(3859484060, "500", err) | |
body, err := ioutil.ReadAll(httpResponse.Body) | |
if err != nil { | |
clusterResponsePtr.Error = MakeRiakError(3859484060, "Error reading response body content", err) | |
return | |
} | |
clusterResponsePtr.BodyContent = body | |
default: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(3859484060, fmt.Sprintf("%d", clusterResponsePtr.StatusCode)+"Unknown error during store object", err) | |
} | |
return | |
} | |
// ******************************************************************************** | |
// DeleteObject | |
// ******************************************************************************** | |
func (cluster *Cluster) DeleteObject(clusterRequestPtr *RiakDeleteObjectRequest) (clusterResponsePtr *RiakDeleteObjectResponse) { | |
clusterResponsePtr = new(RiakDeleteObjectResponse) | |
if clusterRequestPtr == nil { | |
clusterResponsePtr.Error = MakeRiakError(2737046061, "DeleteObject expected non-nil request pointer", nil) | |
return | |
} | |
if len(clusterRequestPtr.Bucket) == 0 || len(clusterRequestPtr.Key) == 0 { | |
clusterResponsePtr.Error = MakeRiakError(6415850092, | |
"DeleteObject incomplete clusterRequestPtr Bucket:"+clusterRequestPtr.Bucket+" Key:"+clusterRequestPtr.Key, | |
nil) | |
return | |
} | |
urlStr := buildURLForClusterBucketKey(cluster, clusterRequestPtr.Bucket, clusterRequestPtr.Key) | |
// log.Println("DeleteObject", urlStr) | |
httpClient := http.DefaultClient | |
httpRequestPtr, err := http.NewRequest("DELETE", urlStr, nil) | |
httpResponse, err := httpClient.Do(httpRequestPtr) | |
if err != nil { | |
clusterResponsePtr.Error = err | |
return | |
} | |
defer httpResponse.Body.Close() | |
clusterResponsePtr.Status = httpResponse.Status | |
clusterResponsePtr.StatusCode = httpResponse.StatusCode | |
clusterResponsePtr.Header = httpResponse.Header | |
// we expect a 204 when object was found and deleted | |
// or a 404 when object was not found | |
switch clusterResponsePtr.StatusCode { | |
case 204: | |
clusterResponsePtr.DidSucceed = true | |
case 404: | |
clusterResponsePtr.DidSucceed = true | |
case 405: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(3526715237, clusterResponsePtr.Status+"\n"+urlStr, err) | |
case 400: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError(3526715238, clusterResponsePtr.Status+"\n"+urlStr, err) | |
default: | |
clusterResponsePtr.DidSucceed = false | |
clusterResponsePtr.Error = MakeRiakError( | |
3526715239, | |
"Unknown error during delete object: "+clusterResponsePtr.Status+"\n"+urlStr, | |
err, | |
) | |
} | |
return | |
} | |
// ******************************************************************************** | |
// MapReduce | |
// ******************************************************************************** | |
func (cluster *Cluster) MapReduce(clusterRequestPtr *RiakMapReduceRequest) (clusterResponsePtr *RiakMapReduceResponse) { | |
clusterResponsePtr = new(RiakMapReduceResponse) | |
if clusterRequestPtr == nil { | |
clusterResponsePtr.Error = MakeRiakError(2737046061, "MapReduce expected non-nil request pointer", nil) | |
return | |
} | |
urlStr := buildURLForCluster(cluster) | |
urlStr = urlStr + "/mapred" | |
log.Println("MapReduce Riak urlStr", urlStr) | |
clusterRequestPtr.SetContentType("application/json") | |
clusterResponsePtr.Error = MakeRiakError(4178877316, "MapReduce TODO", nil) | |
return | |
} | |
// ******************************************************************************** | |
// Ping | |
// ******************************************************************************** | |
func (cluster *Cluster) Ping(RiakPingRequest) (clusterResponse RiakPingResponse) { | |
urlStr := buildURLForCluster(cluster) | |
urlStr = urlStr + "/ping" | |
// log.Println("Ping riak urlStr", urlStr) | |
httpResponse, err := http.Get(urlStr) | |
if err != nil { | |
// handle error | |
clusterResponse.Error = err | |
return | |
} | |
defer httpResponse.Body.Close() | |
clusterResponse.Status = httpResponse.Status | |
clusterResponse.StatusCode = httpResponse.StatusCode | |
clusterResponse.Header = httpResponse.Header | |
switch clusterResponse.StatusCode { | |
case 200: | |
clusterResponse.DidSucceed = true | |
body, err := ioutil.ReadAll(httpResponse.Body) | |
if err != nil { | |
clusterResponse.Error = err | |
return | |
} | |
clusterResponse.BodyContent = body | |
default: | |
clusterResponse.DidSucceed = false | |
clusterResponse.Error = MakeRiakError(297128426, "Ping Failed, unknown error", err) | |
} | |
return | |
} | |
// ******************************************************************************** | |
// Utilities | |
// ******************************************************************************** | |
// URL Building functions | |
func buildURLForCluster(cluster *Cluster) (urlToReturn string) { | |
urlToReturn = fmt.Sprintf("%s://%s:%d", cluster.scheme, cluster.address, cluster.port) | |
return | |
} | |
func buildURLForClusterBucket(cluster *Cluster, bucket string) (urlToReturn string) { | |
urlToReturn = fmt.Sprintf("%s/buckets/%s", buildURLForCluster(cluster), bucket) | |
return | |
} | |
func buildURLForClusterBucketKey(cluster *Cluster, bucket string, key string) (urlToReturn string) { | |
slash := "/" | |
if len(key) == 0 { | |
slash = "" | |
} | |
urlToReturn = fmt.Sprintf("%s/keys%s%s", buildURLForClusterBucket(cluster, bucket), slash, key) | |
return | |
} | |
func appendParametersToURL(parameters map[string]string, urlStr string) string { | |
parametersStr := "" | |
for key, value := range parameters { | |
if len(parametersStr) > 0 { | |
parametersStr = fmt.Sprintf("%s&") | |
} | |
parametersStr = fmt.Sprintf("%s%s=%s", parametersStr, key, value) | |
} | |
if len(parametersStr) > 0 { | |
urlStr = fmt.Sprintf("%s?%s", urlStr, parametersStr) | |
} | |
return urlStr | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment