Last active
August 29, 2015 14:00
-
-
Save mishok13/11141275 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.screen6.cascading.redis; | |
import java.io.IOException; | |
import cascading.flow.FlowProcess; | |
import cascading.scheme.Scheme; | |
import cascading.scheme.SinkCall; | |
import cascading.scheme.SourceCall; | |
import cascading.tap.Tap; | |
import cascading.tuple.Fields; | |
import cascading.tuple.TupleEntry; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public abstract class RedisBaseScheme<Config, Intermediate, Value> extends Scheme<Config, Void, RedisSchemeCollector, Object[], Void> { | |
private static final Logger logger = LoggerFactory.getLogger(RedisBaseScheme.class); | |
RedisBaseScheme(Fields sinkFields) { | |
setSinkFields(sinkFields); | |
} | |
@Override | |
public boolean isSource() { | |
return false; | |
} | |
@Override | |
public void sourceConfInit(FlowProcess<Config> flowProcess, Tap<Config, Void, RedisSchemeCollector> output, Config conf) {} | |
@Override | |
public void sinkConfInit(FlowProcess<Config> flowProcess, Tap<Config, Void, RedisSchemeCollector> output, Config conf) {} | |
@Override | |
public boolean source( FlowProcess<Config> flowProcess, SourceCall<Object[], Void> voidSourceCall ) throws IOException { | |
throw new IllegalStateException("Can't be used as a source"); | |
} | |
protected abstract Intermediate getIntermediate(TupleEntry tupleEntry); | |
protected abstract String getKey(Intermediate tupleEntry); | |
protected abstract Value getValue(Intermediate tupleEntry); | |
protected abstract String getCommand(); | |
@Override | |
public void sink(FlowProcess<Config> flowProcess, SinkCall<Void, RedisSchemeCollector> sinkCall) throws IOException { | |
Intermediate entry = getIntermediate(sinkCall.getOutgoingEntry()); | |
String key = getKey(entry); | |
Value value = getValue(entry); | |
String command = getCommand(); | |
sinkCall.getOutput().collect(command, key, value); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
import redis.clients.jedis.exceptions.JedisConnectionException; | |
public class RedisSchemeCollector<Config, Value> extends TupleEntrySchemeCollector<Config, TupleEntrySchemeCollector> { | |
... | |
public void collect(String command, String key, Value value) { | |
Jedis client = pool.getResource(); | |
try { | |
client.set(key, (String) value); | |
} catch (JedisConnectionException exc) { | |
if (client != null) { | |
this.pool.returnBrokenResource(client); | |
} | |
} finally { | |
if (client != null) { | |
this.pool.returnResource(client); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.screen6.cascading.redis; | |
import cascading.flow.FlowProcess; | |
import cascading.tuple.TupleEntrySchemeCollector; | |
import redis.clients.jedis.Jedis; | |
import redis.clients.jedis.JedisPool; | |
import redis.clients.jedis.JedisPoolConfig; | |
public class RedisSchemeCollector<Config, Value> extends TupleEntrySchemeCollector<Config, TupleEntrySchemeCollector> { | |
private JedisPool pool; | |
public RedisSchemeCollector(FlowProcess<Config> flowProcess, RedisBaseScheme scheme, String hostname, int port, int db) { | |
super(flowProcess, scheme); | |
this.pool = new JedisPool(new JedisPoolConfig(), hostname, port); | |
setOutput(this); | |
} | |
@Override | |
public void close() { | |
this.pool.destroy(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.screen6.cascading.redis; | |
import java.io.IOException; | |
import cascading.flow.FlowProcess; | |
import cascading.scheme.Scheme; | |
import cascading.scheme.SinkCall; | |
import cascading.scheme.SourceCall; | |
import cascading.tap.Tap; | |
import cascading.tuple.Fields; | |
import cascading.tuple.TupleEntry; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class RedisScheme<Config> extends RedisBaseScheme<Config, TupleEntry, String> { | |
private static final Logger logger = LoggerFactory.getLogger(RedisScheme.class); | |
public static final String DEFAULT_KEY_DELIMITER = ":"; | |
public static final String DEFAULT_VALUE_DELIMITER = ":"; | |
public static final String DEFAULT_COMMAND = "set"; | |
private Fields keyFields; | |
private Fields valueFields; | |
private String keyDelimiter = DEFAULT_KEY_DELIMITER; | |
private String valueDelimiter = DEFAULT_VALUE_DELIMITER; | |
private String command = DEFAULT_COMMAND; | |
public RedisScheme(Fields keyFields, Fields valueFields) { | |
this(keyFields, valueFields, DEFAULT_COMMAND); | |
} | |
public RedisScheme(Fields keyFields, Fields valueFields, String command) { | |
this(keyFields, valueFields, command, DEFAULT_KEY_DELIMITER, DEFAULT_VALUE_DELIMITER); | |
} | |
public RedisScheme(Fields keyFields, Fields valueFields, String command, String keyDelimiter, String valueDelimiter) { | |
super(Fields.merge(keyFields, valueFields)); | |
this.keyFields = keyFields; | |
this.valueFields = valueFields; | |
this.command = command; | |
this.keyDelimiter = keyDelimiter; | |
this.valueDelimiter = valueDelimiter; | |
logger.debug("Created {}", this); | |
} | |
public String toString() { | |
return String.format("<RedisScheme %s; %s. Command: %s>", this.keyFields, this.valueFields, this.command); | |
} | |
@Override | |
protected TupleEntry getIntermediate(TupleEntry entry) { | |
logger.debug("Getting intermediate value of {}", entry); | |
return entry; | |
} | |
@Override | |
protected String getValue(TupleEntry entry) { | |
logger.debug("Getting value of {} with {}", entry, valueFields); | |
return entry.selectTuple(this.valueFields).toString(this.valueDelimiter, false); | |
} | |
@Override | |
protected String getKey(TupleEntry entry) { | |
logger.debug("Getting key of {} with {}", entry, keyFields); | |
return entry.selectTuple(this.keyFields).toString(this.keyDelimiter, false); | |
} | |
@Override | |
protected String getCommand() { | |
return this.command; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.screen6.cascading.redis; | |
import java.io.IOException; | |
import cascading.tap.SinkTap; | |
import cascading.flow.FlowProcess; | |
import cascading.tuple.TupleEntryCollector; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class RedisSinkTap<Config> extends SinkTap<Config, Object> { | |
private static final Logger logger = LoggerFactory.getLogger(RedisSinkTap.class); | |
private static final String HOSTNAME = "localhost"; | |
private static final int PORT = 6379; | |
private static final int DATABASE = 0; | |
private String hostname; | |
private int port; | |
private int db; | |
public RedisSinkTap(RedisBaseScheme scheme) { | |
this(HOSTNAME, PORT, DATABASE, scheme); | |
} | |
public RedisSinkTap(String hostname, RedisBaseScheme scheme) { | |
this(hostname, PORT, DATABASE, scheme); | |
} | |
public RedisSinkTap(String hostname, int port, RedisBaseScheme scheme) { | |
this(hostname, port, DATABASE, scheme); | |
} | |
public RedisSinkTap(String hostname, int port, int db, RedisBaseScheme scheme) { | |
super(scheme); | |
logger.info("Creating tap at {}:{}@{}", hostname, port, db); | |
this.hostname = hostname; | |
this.port = port; | |
this.db = db; | |
} | |
public TupleEntryCollector openForWrite(FlowProcess<Config> flowProcess, Object output) throws IOException { | |
return new RedisSchemeCollector(flowProcess, | |
(RedisBaseScheme) getScheme(), | |
hostname, | |
port, | |
db); | |
} | |
@Override | |
public boolean deleteResource(Config conf) throws IOException { | |
return true; | |
} | |
@Override | |
public boolean createResource(Config conf) throws IOException { | |
return true; | |
} | |
@Override | |
public boolean resourceExists(Config conf) throws IOException { | |
return true; | |
} | |
@Override | |
public long getModifiedTime(Config conf) throws IOException { | |
// We are always assuming stale data in database | |
return 0; | |
} | |
@Override | |
public String getIdentifier() { | |
return String.format("redis://%s:%s@%s", this.hostname, this.port, this.db); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment