Skip to content

Instantly share code, notes, and snippets.

@tcz
Created December 3, 2012 00:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tcz/4191668 to your computer and use it in GitHub Desktop.
Save tcz/4191668 to your computer and use it in GitHub Desktop.
Xuggler integration with BytesWritable
package hu.tcz.udder;
import com.xuggle.xuggler.io.IURLProtocolHandler;
import com.xuggle.xuggler.io.IURLProtocolHandlerFactory;
import org.apache.hadoop.io.BytesWritable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.UUID;
public class BytesWritableUrlHandler implements IURLProtocolHandlerFactory {
private static HashMap<String, BytesWritable> writables = new HashMap<String, BytesWritable>();
public final static String PROTOCOL_NAME = "byteswritable";
public IURLProtocolHandler getHandler(String protocol, String url, int flags) {
if ( !writables.containsKey(url) )
{
// File not found;
return null;
}
BytesWritable writable = writables.get(url);
return new BytesWritableHandler(writable);
}
public static String addWritable( BytesWritable writable )
{
String name = PROTOCOL_NAME + ":" + UUID.randomUUID().toString();
writables.put(name, writable);
return name;
}
private static class BytesWritableHandler implements IURLProtocolHandler {
private BytesWritable writable;
private int bytePointer = 0;
public BytesWritableHandler( BytesWritable writable )
{
this.writable = writable;
}
public int open(String url, int flags) {
return 0;
}
public int read(byte[] bytes, int size) {
int readableSize;
readableSize = Math.min(size, (this.writable.getLength() - bytePointer));
bytes = Arrays.copyOfRange(this.writable.getBytes(), bytePointer, readableSize);
bytePointer += readableSize;
return readableSize;
}
public int write(byte[] bytes, int size) {
this.writable.set(bytes, bytePointer, size);
bytePointer += size;
return size;
}
public long seek(long offset, int whence) {
long writableLength = this.writable.getLength();
long maxSeek = (writableLength - bytePointer);
long seek = 0;
switch(whence)
{
case SEEK_CUR:
seek = Math.min(maxSeek, offset);
bytePointer += seek;
break;
case SEEK_SET:
seek = Math.min((bytePointer+maxSeek), offset);
bytePointer = (int) seek;
case SEEK_END:
offset = Math.min(0, offset);
seek = Math.max(-writableLength, offset);
bytePointer = (int) (writableLength - offset);
break;
case SEEK_SIZE:
break;
}
return seek;
}
public int close() {
return 0;
}
public boolean isStreamed(String url, int flags) {
return false;
}
}
}
java.lang.RuntimeException: could not open: byteswritable:d68ce8fa-c56d-4ff5-bade-a4cfb3f666fe
at com.xuggle.mediatool.MediaReader.open(MediaReader.java:637)
at com.xuggle.mediatool.MediaReader.readPacket(MediaReader.java:434)
at hu.tcz.udder.format.mp3.WaveToMp3Mapper.convertToMP3(WaveToMp3Mapper.java:70)
at hu.tcz.udder.format.mp3.WaveToMp3Mapper.map(WaveToMp3Mapper.java:38)
at hu.tcz.udder.format.mp3.WaveToMp3Mapper.map(WaveToMp3Mapper.java:25)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
package hu.tcz.udder.format.mp3;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import com.xuggle.mediatool.ToolFactory;
import com.xuggle.mediatool.IMediaReader;
import com.xuggle.mediatool.IMediaWriter;
import com.xuggle.mediatool.MediaListenerAdapter;
import com.xuggle.mediatool.event.IAddStreamEvent;
import com.xuggle.xuggler.IStreamCoder;
import com.xuggle.xuggler.io.URLProtocolManager;
import hu.tcz.udder.BytesWritableUrlHandler;
public class WaveToMp3Mapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
@Override
public void setup(Mapper.Context context)
{
URLProtocolManager protocol_manager = URLProtocolManager.getManager();
protocol_manager.registerFactory(
BytesWritableUrlHandler.PROTOCOL_NAME,
new BytesWritableUrlHandler());
}
@Override
public void map(LongWritable key, BytesWritable value, Mapper.Context context) throws IOException, InterruptedException {
BytesWritable converted = new BytesWritable();
convertToMP3(value, converted);
context.write(key,converted);
}
public void convertToMP3(BytesWritable input, BytesWritable output) { //modify on your convenience
String inputPath = BytesWritableUrlHandler.addWritable(input);
String outputPath = BytesWritableUrlHandler.addWritable(output);
IMediaReader mediaReader = ToolFactory.makeReader(inputPath);
IMediaWriter mediaWriter = ToolFactory.makeWriter(outputPath, mediaReader);
mediaReader.addListener(mediaWriter);
mediaWriter.addListener(new MediaListenerAdapter() {
@Override
public void onAddStream(IAddStreamEvent event) {
IStreamCoder streamCoder = event.getSource().getContainer().getStream(event.getStreamIndex()).getStreamCoder();
streamCoder.setFlag(IStreamCoder.Flags.FLAG_QSCALE, false);
streamCoder.setBitRate(320000);
streamCoder.setBitRateTolerance(0);
}
}
);
while (mediaReader.readPacket() == null);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment