Skip to content

Instantly share code, notes, and snippets.

@afex
Created March 4, 2010 00:36
Show Gist options
  • Save afex/321256 to your computer and use it in GitHub Desktop.
Save afex/321256 to your computer and use it in GitHub Desktop.
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.pig.tutorial;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import org.apache.cassandra.db.*;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.LoadFunc;
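/*
 * Example usage from a Pig script. The LOAD location is the comma-separated list
 * of column names to fetch; the constructor argument is 'keyspace/column_family'
 * (an optional 'cassandra://' prefix is accepted):
 *
 * raw = LOAD 'col1,col2' USING org.apache.pig.tutorial.CassandraLoader('keyspace/column_family') AS (key: chararray, col1, col2);
 */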
/**
 * Pig {@link LoadFunc} that reads rows of a Cassandra column family through
 * {@link ColumnFamilyInputFormat}.
 *
 * <p>The constructor argument names the keyspace and column family as
 * {@code keyspace/column_family} (optionally prefixed with {@code cassandra://}).
 * The Pig LOAD location string is interpreted as a comma-separated list of the
 * column names to fetch.
 *
 * <p>Each tuple produced by {@link #getNext()} holds the row key in field 0,
 * followed by one field per requested column, in the order the columns were
 * listed.
 */
public class CassandraLoader extends LoadFunc {
    private static final Log LOG = LogFactory.getLog(CassandraLoader.class);

    /** Optional scheme prefix accepted (and stripped) in the constructor argument. */
    private static final String URI_PREFIX = "cassandra://";

    /**
     * Charset used to turn column names into bytes. Fixed explicitly so the bytes
     * sent to Cassandra do not depend on the JVM's platform default encoding.
     */
    private static final String COLUMN_NAME_CHARSET = "UTF-8";

    private RecordReader reader;
    private final ColumnFamilyInputFormat _inputFormat;
    private final String _keyspace;
    private final String _column_family;
    private final List<byte[]> _column_names;

    /**
     * Creates a loader for one column family.
     *
     * @param target {@code keyspace/column_family}, optionally prefixed with
     *               {@code cassandra://}; path segments after the second are ignored
     * @throws IllegalArgumentException if {@code target} is null or does not contain
     *                                  both a non-empty keyspace and column family
     */
    public CassandraLoader(String target) {
        if (target == null) {
            throw new IllegalArgumentException("target must not be null; expected 'keyspace/column_family'");
        }
        String path = target.startsWith(URI_PREFIX) ? target.substring(URI_PREFIX.length()) : target;
        String[] bits = path.split("/");
        if (bits.length < 2 || bits[0].length() == 0 || bits[1].length() == 0) {
            throw new IllegalArgumentException(
                    "Expected 'keyspace/column_family' but got '" + target + "'");
        }
        _inputFormat = new ColumnFamilyInputFormat();
        _keyspace = bits[0];
        _column_family = bits[1];
        _column_names = new ArrayList<byte[]>();
    }

    /**
     * Reads the next row from the underlying record reader.
     *
     * @return a tuple of {@code (key, col1, col2, ...)}, or {@code null} when the
     *         reader has no more rows; a requested column that is absent from the
     *         row yields a {@code null} field instead of failing
     * @throws IOException if the reader fails or the thread is interrupted
     */
    @Override
    @SuppressWarnings("unchecked")
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            String key = (String) reader.getCurrentKey();
            SortedMap<byte[], IColumn> map = (SortedMap<byte[], IColumn>) reader.getCurrentValue();
            int columnCount = _column_names.size();
            Tuple tuple = TupleFactory.getInstance().newTuple(columnCount + 1);
            tuple.set(0, new DataByteArray(key));
            for (int i = 0; i < columnCount; ++i) {
                // NOTE(review): lookup by byte[] relies on the map's comparator
                // comparing array contents -- confirm against the input format.
                IColumn col = map.get(_column_names.get(i));
                // Rows are sparse: a row may simply not have this column.
                tuple.set(i + 1, col == null ? null : new DataByteArray(col.value()));
            }
            return tuple;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * @return the {@link ColumnFamilyInputFormat} instance created in the constructor
     */
    @Override
    public InputFormat getInputFormat() {
        return _inputFormat;
    }

    /**
     * Stores the record reader that {@link #getNext()} will pull rows from.
     *
     * @param reader record reader for the current split
     * @param split  the split being read (unused)
     */
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        this.reader = reader;
    }

    /**
     * Parses the column list and configures the job's column family and slice
     * predicate.
     *
     * @param columns comma-separated column names (the Pig LOAD location)
     * @param job     job whose configuration is updated
     * @throws IOException if a column name cannot be encoded
     */
    @Override
    public void setLocation(String columns, Job job) throws IOException {
        // Parse the names only while the list is still empty, so repeated
        // calls do not append duplicates.
        if (_column_names.isEmpty()) {
            // split() already returns a single element when there is no comma.
            for (String col : columns.split(",")) {
                _column_names.add(col.getBytes(COLUMN_NAME_CHARSET));
            }
        }
        _inputFormat.setColumnFamily(job, _keyspace, _column_family);
        SlicePredicate predicate = new SlicePredicate().setColumn_names(_column_names);
        _inputFormat.setSlicePredicate(job, predicate);
    }

    /**
     * Returns the location unchanged: it is a column list, not a filesystem path,
     * so it must not be resolved against the current directory.
     *
     * @param location the LOAD location string
     * @param curDir   current working directory (ignored)
     * @return {@code location} as given
     */
    @Override
    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
        return location;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment