Skip to content

Instantly share code, notes, and snippets.

@dragnot
Created March 5, 2015 04:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dragnot/06ef5d01df8cb181555a to your computer and use it in GitHub Desktop.
Save dragnot/06ef5d01df8cb181555a to your computer and use it in GitHub Desktop.
package com.emc.benchmark.ycsb;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import org.msgpack.MessagePack;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
 * YCSB binding that stores each record in Redis as one MessagePack-encoded value.
 *
 * <p>A record is packed as a flat list {@code [id0, value0, id1, value1, ...]}, where
 * {@code idN} is the field name with the {@code "field"} prefix removed and entries are
 * ordered by numeric id. Updates are applied server-side by {@link #UPDATE_LUA}, which
 * overwrites value slots by position.
 *
 * <p>Created by Guy on 1/19/2015.
 */
public class RedisClient extends DB {
    protected Jedis jedis;

    public static final String HOST_PROPERTY = "redis.host";
    public static final String PORT_PROPERTY = "redis.port";
    public static final String PASSWORD_PROPERTY = "redis.password";

    /** MessagePack codec used to pack records on insert and unpack them on read. */
    final MessagePack messagePack = new MessagePack();

    /**
     * Lua script that updates fields of a packed record in place.
     *
     * <p>KEYS[1] is the record key; ARGV holds {@code (fieldId, newValue)} pairs. The
     * record unpacks to a 1-based Lua table {@code [id0, val0, id1, val1, ...]}, so the
     * value slot for field id {@code k} is {@code (k + 1) * 2}. The script returns nil
     * both when the key is absent and after a successful SET.
     */
    public final String UPDATE_LUA = "local mv = redis.call('GET', KEYS[1])\n" +
            "if not mv then return end\n" +
            "\n" +
            "local v = cmsgpack.unpack(mv)\n" +
            "local args = #ARGV / 2\n" +
            "\n" +
            "for i=1, args do\n" +
            " v[(tonumber(ARGV[i*2-1])+1)*2] = ARGV[i*2]\n" +
            "end\n" +
            "\n" +
            "mv = cmsgpack.pack(v)\n" +
            "redis.call('SET', KEYS[1], mv)\n";

    /** Prefix stripped from field names to obtain the id stored in the record. */
    private static final String FIELD_NAME = "field";

    /**
     * Orders field names by their numeric id so that list position matches the id the
     * update script computes; names without a numeric id sort last, lexicographically.
     */
    private static final Comparator<String> FIELD_ORDER = new Comparator<String>() {
        @Override
        public int compare(String left, String right) {
            Integer leftId = fieldIndex(left);
            Integer rightId = fieldIndex(right);
            if (leftId != null && rightId != null) {
                return leftId.compareTo(rightId);
            }
            if (leftId != null) {
                return -1;
            }
            if (rightId != null) {
                return 1;
            }
            return left.compareTo(right);
        }
    };

    /**
     * SHA of {@link #UPDATE_LUA} as returned by SCRIPT LOAD. Kept per instance and loaded
     * on every {@link #init()} so client instances share no mutable static state.
     */
    private String updateScriptSha;

    /**
     * Connects to Redis, authenticates when a password is configured, and registers the
     * update script.
     *
     * @throws DBException if the host property is missing or the port is not an integer
     */
    @Override
    public void init() throws DBException {
        Properties props = getProperties();

        String host = props.getProperty(HOST_PROPERTY);
        if (host == null) {
            throw new DBException("Missing required property: " + HOST_PROPERTY);
        }

        int port = Protocol.DEFAULT_PORT;
        String portString = props.getProperty(PORT_PROPERTY);
        if (portString != null) {
            try {
                port = Integer.parseInt(portString);
            } catch (NumberFormatException e) {
                throw new DBException("Invalid " + PORT_PROPERTY + ": " + portString, e);
            }
        }

        // TODO: use a connection pool instead of one raw connection per client.
        jedis = new Jedis(host, port);
        jedis.connect();

        String password = props.getProperty(PASSWORD_PROPERTY);
        if (password != null) {
            jedis.auth(password);
        }

        updateScriptSha = jedis.scriptLoad(UPDATE_LUA);
    }

    /**
     * Fetches the packed record stored under {@code key} and decodes it.
     *
     * <p>NOTE(review): the decoded value is discarded; {@code fields} is ignored and
     * {@code result} is never populated. Confirm whether callers need the data.
     *
     * @return 0 when the record exists and decodes cleanly, 1 when the key is absent or
     *         the stored bytes cannot be decoded
     */
    @Override
    public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
        byte[] packed = jedis.get(key.getBytes(StandardCharsets.UTF_8));
        if (packed == null) {
            return 1;
        }
        try {
            // Unpack so the decode cost is part of the read.
            messagePack.read(packed);
        } catch (IOException e) {
            e.printStackTrace();
            return 1;
        }
        return 0;
    }

    /**
     * Not implemented: performs no work and always returns 0.
     *
     * <p>NOTE(review): a no-op scan is reported as success; confirm this is intended.
     */
    @Override
    public int scan(String table, String startkey, int recordcount, Set<String> fields,
                    Vector<HashMap<String, ByteIterator>> result) {
        return 0;
    }

    /**
     * Overwrites the given fields of an existing record through the server-side script.
     *
     * <p>NOTE(review): the script returns nil whether or not the key exists, so a
     * missing key is not reported as a failure here.
     *
     * @return always 0
     */
    @Override
    public int update(String table, String key, HashMap<String, ByteIterator> values) {
        // Script arguments are (fieldId, newValue) pairs.
        List<String> scriptArgs = new ArrayList<>(values.size() * 2);
        for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
            scriptArgs.add(entry.getKey().replace(FIELD_NAME, ""));
            scriptArgs.add(entry.getValue().toString());
        }
        jedis.evalsha(updateScriptSha, Collections.singletonList(key), scriptArgs);
        return 0;
    }

    /**
     * Packs {@code values} as {@code [id0, value0, id1, value1, ...]} ordered by numeric
     * field id and stores the result under {@code key}.
     *
     * @return 0 when Redis replies OK, 1 when packing fails or Redis replies otherwise
     */
    @Override
    public int insert(String table, String key, HashMap<String, ByteIterator> values) {
        // Numeric order keeps each value at the slot the update script addresses;
        // a plain string sort would place "field10" before "field2".
        List<String> fieldNames = new ArrayList<>(values.keySet());
        Collections.sort(fieldNames, FIELD_ORDER);

        List<String> packValues = new ArrayList<>(fieldNames.size() * 2);
        for (String fieldName : fieldNames) {
            packValues.add(fieldName.replace(FIELD_NAME, ""));
            packValues.add(values.get(fieldName).toString());
        }

        byte[] raw;
        try {
            raw = messagePack.write(packValues);
        } catch (IOException e) {
            // Do not store a placeholder value when serialization fails.
            e.printStackTrace();
            return 1;
        }

        return "OK".equals(jedis.set(key.getBytes(StandardCharsets.UTF_8), raw)) ? 0 : 1;
    }

    /**
     * Deletes the record stored under {@code key}.
     *
     * @return 0 when at least one key was removed, 1 otherwise
     */
    @Override
    public int delete(String table, String key) {
        return jedis.del(key) > 0 ? 0 : 1;
    }

    /** Closes the Redis connection if {@link #init()} created one. */
    @Override
    public void cleanup() throws DBException {
        if (jedis != null) {
            jedis.disconnect();
        }
    }

    /** Returns the numeric id of a field name, or {@code null} when it has none. */
    private static Integer fieldIndex(String fieldName) {
        try {
            return Integer.valueOf(fieldName.replace(FIELD_NAME, ""));
        } catch (NumberFormatException ignored) {
            // Not a "field<N>" style name; the comparator sorts these after numbered fields.
            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment