Skip to content

Instantly share code, notes, and snippets.

@jsmorph
Last active December 19, 2015 02:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsmorph/5881142 to your computer and use it in GitHub Desktop.
Save jsmorph/5881142 to your computer and use it in GitHub Desktop.
Accumulo mutation hook that publishes mutations to Redis
package org.apache.accumulo.examples.simple.constraints;
import java.util.Collections;
import java.util.List;
import org.apache.accumulo.core.constraints.Constraint;
import org.apache.accumulo.core.data.Mutation;
/**
 * A do-nothing {@link Constraint} intended as a base class for "hooks":
 * subclasses override {@link #check} to observe each mutation, while this
 * base implementation never reports a violation code.
 */
public class MutationHook implements Constraint {

  /** Shared immutable empty result, returned whenever no violation is reported. */
  static final List<Short> empty = Collections.<Short>emptyList();

  /**
   * Inspects a mutation. This base implementation ignores both arguments.
   *
   * @param env the constraint environment (unused here)
   * @param mutation the mutation being checked (unused here)
   * @return the shared empty list, i.e. no violation codes
   */
  @Override
  public List<Short> check(Environment env, Mutation mutation) {
    return empty;
  }

  /**
   * Describes a violation code. Every code maps to the same fixed text,
   * since this class itself never emits any code.
   *
   * @param violationCode the code to describe (ignored)
   * @return the constant description string
   */
  @Override
  public String getViolationDescription(short violationCode) {
    return "Just a hook.";
  }
}
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* An Accumulo Constraint that is really a hook to publish mutations to Redis.
*
* Example:
*
* Start up Redis (wherever it needs to be -- see below), and start 'redis-cli':
*
* subscribe accumulo
*
* Then at the Accumulo shell:
*
* createtable foo
* constraint -a org.apache.accumulo.examples.simple.constraints.RedisPub
* insert b c x d
* insert b c x e
* insert b c x f
* delete b c x
*
* From redis-cli, you should see
*
* 1) "message"
* 2) "accumulo"
* 3) "800162090163017800000001640100"
* 1) "message"
* 2) "accumulo"
* 3) "800162090163017800000001650100"
* 1) "message"
* 2) "accumulo"
* 3) "800162090163017800000001660100"
* 1) "message"
* 2) "accumulo"
* 3) "8001620801630178000001000100"
* 1) "message"
* 2) "accumulo"
* 3) "8001620801630178000001000100"
*
* You can turn any of those hex strings back into a Mutation with
* 'deserialize(unhex(s))' below.
*
* Plenty of ToDo's in the code below.
*
* To compile and run, put
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.0.0</version>
</dependency>
* and maybe
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.5.5</version>
</dependency>
* in 'examples/simple/pom.xml'.
*
*/
package org.apache.accumulo.examples.simple.constraints;
import java.util.Collections;
import java.util.List;
import org.apache.accumulo.core.constraints.Constraint;
import org.apache.accumulo.core.data.Mutation;
import java.io.DataOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.ByteArrayInputStream;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * A {@link MutationHook} that publishes every mutation it is asked to check
 * to a Redis pub/sub channel.
 *
 * <p>Each mutation is serialized with {@code Mutation.write}, encoded as an
 * upper-case hexadecimal string, and published on {@link #channel} using a
 * lazily created {@link JedisPool} pointed at {@link #redisHost}. The hook
 * itself never reports a constraint violation.
 */
public class RedisPub extends MutationHook {

  // ToDo: Parameterize somehow.
  // ToDo: Parameters: redisHost, redisPort, channel, timeouts, etc.
  // ToDo: Expose many JedisPoolConfig properties.
  // ToDo: Use Zookeeper to supply these parameters.

  /** Host name of the Redis server to publish to. */
  protected static final String redisHost = "localhost";

  /** Redis channel that mutations are published on. */
  protected static final String channel = "accumulo"; // And host/server name?

  /** When true, do an expensive serialization round-trip test before each pub. */
  protected static boolean paranoid = true;

  /** Connection pool; created on first use by {@link #ensure()}. */
  protected JedisPool pool = null;

  /** Digits used by {@link #hex}; upper-case to match the original output format. */
  private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

  /**
   * Returns the connection pool, creating it on the first call.
   * Synchronized so that concurrent first calls create only one pool.
   *
   * @return the (possibly newly created) pool
   */
  protected synchronized JedisPool ensure () {
    if (pool == null) {
      pool = new JedisPool(new JedisPoolConfig(), redisHost);
    }
    return pool;
  }

  // Maybe use Thrift?
  /**
   * Serializes a mutation to bytes via {@code Mutation.write}.
   *
   * @param m the mutation to serialize
   * @return the serialized bytes
   * @throws java.io.IOException if writing the mutation fails
   */
  public static byte[] serialize (final Mutation m) throws java.io.IOException {
    // Purely in-memory streams: closing only flushes, nothing can leak.
    final ByteArrayOutputStream bs = new ByteArrayOutputStream();
    final DataOutputStream out = new DataOutputStream(bs);
    m.write(out);
    out.close();
    return bs.toByteArray();
  }

  /**
   * Rebuilds a mutation from bytes produced by {@link #serialize}.
   *
   * @param bs the serialized bytes
   * @return a new mutation populated via {@code Mutation.readFields}
   * @throws java.io.IOException if reading the mutation fails
   */
  public static Mutation deserialize (final byte[] bs) throws java.io.IOException {
    // Purely in-memory stream: nothing can leak if readFields throws.
    final Mutation m = new Mutation();
    final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bs));
    m.readFields(in);
    in.close();
    return m;
  }

  /**
   * Encodes bytes as an upper-case hexadecimal string, two digits per byte.
   *
   * <p>Implemented locally rather than with
   * {@code javax.xml.bind.DatatypeConverter}, which is no longer part of the
   * JDK from Java 11 onwards.
   *
   * @param bs the bytes to encode
   * @return the hexadecimal representation (empty for an empty array)
   */
  public static String hex (final byte[] bs) {
    final char[] out = new char[bs.length * 2];
    for (int i = 0; i < bs.length; i++) {
      final int b = bs[i] & 0xFF; // mask to avoid sign extension
      out[2 * i] = HEX_DIGITS[b >>> 4];
      out[2 * i + 1] = HEX_DIGITS[b & 0x0F];
    }
    return new String(out);
  }

  /**
   * Decodes a hexadecimal string (either letter case) into bytes.
   *
   * @param s the hexadecimal string; must have even length
   * @return the decoded bytes
   * @throws IllegalArgumentException if {@code s} has odd length or contains
   *         a character that is not a hexadecimal digit
   */
  public static byte[] unhex (final String s) {
    final int len = s.length();
    if (len % 2 != 0) {
      throw new IllegalArgumentException("hexBinary needs to be even-length: " + s);
    }
    final byte[] out = new byte[len / 2];
    for (int i = 0; i < len; i += 2) {
      final int hi = hexValue(s.charAt(i));
      final int lo = hexValue(s.charAt(i + 1));
      if (hi < 0 || lo < 0) {
        throw new IllegalArgumentException("contains illegal character for hexBinary: " + s);
      }
      out[i / 2] = (byte) ((hi << 4) | lo);
    }
    return out;
  }

  /** Returns the value of an ASCII hexadecimal digit, or -1 if {@code c} is not one. */
  private static int hexValue (final char c) {
    if (c >= '0' && c <= '9') {
      return c - '0';
    }
    if (c >= 'A' && c <= 'F') {
      return c - 'A' + 10;
    }
    if (c >= 'a' && c <= 'f') {
      return c - 'a' + 10;
    }
    return -1;
  }

  // Test serialization.
  /**
   * Checks that a mutation survives a serialize/hex/unhex/deserialize round trip.
   *
   * @param m the mutation to test
   * @return true if the round-tripped mutation equals {@code m}
   * @throws java.io.IOException if serialization or deserialization fails
   */
  public static boolean serialized (final Mutation m) throws java.io.IOException {
    return m.equals(deserialize(unhex(hex(serialize(m)))));
  }

  /**
   * Publishes the hex-encoded mutation on the Redis channel and reports no
   * violation codes.
   *
   * @param env the constraint environment (unused)
   * @param mutation the mutation to publish
   * @return the shared empty list inherited from {@link MutationHook}
   * @throws IllegalStateException if {@link #paranoid} is set and the
   *         mutation does not survive a serialization round trip
   * @throws RuntimeException wrapping any {@code IOException} raised while
   *         serializing
   */
  @Override
  public List<Short> check(Environment env, Mutation mutation) {
    final String s;
    try {
      s = hex(serialize(mutation));
      // Reuse the string just computed instead of serializing a second time.
      if (paranoid && !mutation.equals(deserialize(unhex(s)))) {
        throw new IllegalStateException("Failed to serialize mutation: " + s);
      }
    } catch (java.io.IOException e) {
      throw new RuntimeException(e);
    }

    // Hold the pool in a local so the finally block does not depend on the
    // mutable 'pool' field.
    final JedisPool p = ensure();
    final Jedis jedis = p.getResource();
    boolean published = false;
    try {
      jedis.publish(channel, s);
      published = true;
    } finally {
      if (published) {
        p.returnResource(jedis);
      } else {
        // publish threw: the connection may be unusable, so do not hand it
        // back to the pool as a healthy one.
        p.returnBrokenResource(jedis);
      }
    }
    return empty;
  }
}
// Local Variables:
// c-basic-offset: 2
// indent-tabs-mode: nil
// End:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment