Skip to content

Instantly share code, notes, and snippets.

@mishok13
Last active August 29, 2015 14:00
Show Gist options
  • Save mishok13/11141275 to your computer and use it in GitHub Desktop.
Save mishok13/11141275 to your computer and use it in GitHub Desktop.
package io.screen6.cascading.redis;
import java.io.IOException;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class RedisBaseScheme<Config, Intermediate, Value> extends Scheme<Config, Void, RedisSchemeCollector, Object[], Void> {
private static final Logger logger = LoggerFactory.getLogger(RedisBaseScheme.class);
RedisBaseScheme(Fields sinkFields) {
setSinkFields(sinkFields);
}
@Override
public boolean isSource() {
return false;
}
@Override
public void sourceConfInit(FlowProcess<Config> flowProcess, Tap<Config, Void, RedisSchemeCollector> output, Config conf) {}
@Override
public void sinkConfInit(FlowProcess<Config> flowProcess, Tap<Config, Void, RedisSchemeCollector> output, Config conf) {}
@Override
public boolean source( FlowProcess<Config> flowProcess, SourceCall<Object[], Void> voidSourceCall ) throws IOException {
throw new IllegalStateException("Can't be used as a source");
}
protected abstract Intermediate getIntermediate(TupleEntry tupleEntry);
protected abstract String getKey(Intermediate tupleEntry);
protected abstract Value getValue(Intermediate tupleEntry);
protected abstract String getCommand();
@Override
public void sink(FlowProcess<Config> flowProcess, SinkCall<Void, RedisSchemeCollector> sinkCall) throws IOException {
Intermediate entry = getIntermediate(sinkCall.getOutgoingEntry());
String key = getKey(entry);
Value value = getValue(entry);
String command = getCommand();
sinkCall.getOutput().collect(command, key, value);
}
}
...
import redis.clients.jedis.exceptions.JedisConnectionException;
public class RedisSchemeCollector<Config, Value> extends TupleEntrySchemeCollector<Config, TupleEntrySchemeCollector> {
...
public void collect(String command, String key, Value value) {
Jedis client = pool.getResource();
try {
client.set(key, (String) value);
} catch (JedisConnectionException exc) {
if (client != null) {
this.pool.returnBrokenResource(client);
}
} finally {
if (client != null) {
this.pool.returnResource(client);
}
}
}
}
package io.screen6.cascading.redis;
import cascading.flow.FlowProcess;
import cascading.tuple.TupleEntrySchemeCollector;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisSchemeCollector<Config, Value> extends TupleEntrySchemeCollector<Config, TupleEntrySchemeCollector> {
private JedisPool pool;
public RedisSchemeCollector(FlowProcess<Config> flowProcess, RedisBaseScheme scheme, String hostname, int port, int db) {
super(flowProcess, scheme);
this.pool = new JedisPool(new JedisPoolConfig(), hostname, port);
setOutput(this);
}
@Override
public void close() {
this.pool.destroy();
}
}
package io.screen6.cascading.redis;
import java.io.IOException;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedisScheme<Config> extends RedisBaseScheme<Config, TupleEntry, String> {
private static final Logger logger = LoggerFactory.getLogger(RedisScheme.class);
public static final String DEFAULT_KEY_DELIMITER = ":";
public static final String DEFAULT_VALUE_DELIMITER = ":";
public static final String DEFAULT_COMMAND = "set";
private Fields keyFields;
private Fields valueFields;
private String keyDelimiter = DEFAULT_KEY_DELIMITER;
private String valueDelimiter = DEFAULT_VALUE_DELIMITER;
private String command = DEFAULT_COMMAND;
public RedisScheme(Fields keyFields, Fields valueFields) {
this(keyFields, valueFields, DEFAULT_COMMAND);
}
public RedisScheme(Fields keyFields, Fields valueFields, String command) {
this(keyFields, valueFields, command, DEFAULT_KEY_DELIMITER, DEFAULT_VALUE_DELIMITER);
}
public RedisScheme(Fields keyFields, Fields valueFields, String command, String keyDelimiter, String valueDelimiter) {
super(Fields.merge(keyFields, valueFields));
this.keyFields = keyFields;
this.valueFields = valueFields;
this.command = command;
this.keyDelimiter = keyDelimiter;
this.valueDelimiter = valueDelimiter;
logger.debug("Created {}", this);
}
public String toString() {
return String.format("<RedisScheme %s; %s. Command: %s>", this.keyFields, this.valueFields, this.command);
}
@Override
protected TupleEntry getIntermediate(TupleEntry entry) {
logger.debug("Getting intermediate value of {}", entry);
return entry;
}
@Override
protected String getValue(TupleEntry entry) {
logger.debug("Getting value of {} with {}", entry, valueFields);
return entry.selectTuple(this.valueFields).toString(this.valueDelimiter, false);
}
@Override
protected String getKey(TupleEntry entry) {
logger.debug("Getting key of {} with {}", entry, keyFields);
return entry.selectTuple(this.keyFields).toString(this.keyDelimiter, false);
}
@Override
protected String getCommand() {
return this.command;
}
}
package io.screen6.cascading.redis;
import java.io.IOException;
import cascading.tap.SinkTap;
import cascading.flow.FlowProcess;
import cascading.tuple.TupleEntryCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedisSinkTap<Config> extends SinkTap<Config, Object> {
private static final Logger logger = LoggerFactory.getLogger(RedisSinkTap.class);
private static final String HOSTNAME = "localhost";
private static final int PORT = 6379;
private static final int DATABASE = 0;
private String hostname;
private int port;
private int db;
public RedisSinkTap(RedisBaseScheme scheme) {
this(HOSTNAME, PORT, DATABASE, scheme);
}
public RedisSinkTap(String hostname, RedisBaseScheme scheme) {
this(hostname, PORT, DATABASE, scheme);
}
public RedisSinkTap(String hostname, int port, RedisBaseScheme scheme) {
this(hostname, port, DATABASE, scheme);
}
public RedisSinkTap(String hostname, int port, int db, RedisBaseScheme scheme) {
super(scheme);
logger.info("Creating tap at {}:{}@{}", hostname, port, db);
this.hostname = hostname;
this.port = port;
this.db = db;
}
public TupleEntryCollector openForWrite(FlowProcess<Config> flowProcess, Object output) throws IOException {
return new RedisSchemeCollector(flowProcess,
(RedisBaseScheme) getScheme(),
hostname,
port,
db);
}
@Override
public boolean deleteResource(Config conf) throws IOException {
return true;
}
@Override
public boolean createResource(Config conf) throws IOException {
return true;
}
@Override
public boolean resourceExists(Config conf) throws IOException {
return true;
}
@Override
public long getModifiedTime(Config conf) throws IOException {
// We are always assuming stale data in database
return 0;
}
@Override
public String getIdentifier() {
return String.format("redis://%s:%s@%s", this.hostname, this.port, this.db);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment