Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/*
複数のプロセスから排他的にデータを読み書きします。
また、メタデータとしてバージョンやデータのダイジェストを保存することで 更新チェックを高速に行えるようにしています。
- 読み書きの際はメタデータにjavaのFileLock が取得されます。
- メタデータの読み書きは javaの MappedByteBuffer.force が使われます。
データの読み込みには byte[] load_if_update() を使います。
まだデータを読んだことがないか、最後に読んだあとに変更されていればデータを読み込みます。
データの書き出しには byte[] TransactionProc インタフェースの実装クラスをユーザが作成して、
transaction(TransactionProc) を呼び出します。
TransactionProc#update では 古いデータを加工して新しいデータを返すようなコードを実装してください。
注意点
- このクラスは実際にはメタデータ、データファイル、データファイルの一時ファイルの3つのファイルを使います。
- このクラスはプロセス間の排他を行いますが、スレッド間の排他は行なっていません。
- MappedByteBuffer は明示的に unmapを行うことができません。gcまかせです。
- MappedByteBuffer.force() の結果が別プロセス上のマッピングにすぐに伝達されるかどうかは未定義です。
*/
package jp.juggler.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import android.util.Log;
public class ExclusiveDataFile {
static final String TAG="ExclusiveDataFile";
public static boolean debug = false;
// ダイジェスト計算に使うアルゴリズム
static final String digest_type = "SHA-1";
// 仮想メモリのページサイズ。実際には FileChannel#map が適当に調整してくれるはず…
static final int pagesize = 4096;
// トランザクションのデータ計算インタフェース
public interface TransactionProc{
byte[] update(byte[] old) throws IOException;
}
////////////////////////////////////////
// コンストラクタ
// データファイル
final File datafile;
// データファイルの一時ファイル
final File datafile_tmp;
// メタデータを格納するファイル
final File metafile;
// ファイルのパーミッション
final int permission;
public ExclusiveDataFile(File datafile, File datafile_tmp, File metafile, int permission,boolean bOpen) throws IOException{
this.datafile = datafile;
this.datafile_tmp = datafile_tmp;
this.metafile = metafile;
this.permission = permission;
if(bOpen) open();
}
/////////////////////////////////////////////////////////////
RandomAccessFile meta_file_handle;
FileChannel meta_channel;
FileLock meta_lock;
MappedByteBuffer meta_buffer;
byte[] tmp = new byte[pagesize];
// メタデータを開く
public void open() throws IOException{
meta_file_handle = new RandomAccessFile(metafile, "rw");
set_permission(metafile.getPath(),permission);
//
meta_channel = meta_file_handle.getChannel();
if( meta_channel.size() < pagesize ){
meta_channel.position(0);
byte[] tmp = new byte[pagesize];
for(int i=0;i<pagesize;++i) tmp[i]=0;
ByteBuffer buffer = ByteBuffer.wrap(tmp);
buffer.position(0);
int nWrite = 0;
while( nWrite<pagesize ){
int delta = meta_channel.write(buffer);
nWrite += delta;
}
meta_channel.force(true);
meta_channel.position(0);
}
meta_buffer_map();
}
// メタデータを閉じる
public void close(){
unlock();
meta_buffer_unmap();
if( meta_channel != null ){
try{ meta_channel.close(); }catch(Throwable ex){}
meta_channel = null;
}
if( meta_file_handle != null ){
try{ meta_file_handle.close(); } catch(Throwable ex){}
meta_file_handle = null;
}
}
// 閉じて、データとメタデータを削除して、開き直す
public void create() throws IOException {
close();
metafile.delete();
datafile.delete();
last_version = 0;
last_hash_length = -1;
open();
}
//////////////////////////////////////////////////
// mmap
public void meta_buffer_map() throws IOException{
if( meta_buffer == null ){
meta_buffer = meta_channel.map(FileChannel.MapMode.READ_WRITE,0,pagesize);
meta_buffer.load();
}
}
public void meta_buffer_unmap(){
if( meta_buffer != null ){
meta_buffer =null;
System.gc();
// unmap を明示的に行うメソッドがない
// GCがファイナライズを省略したらリークしてしまう?
}
}
///////////////////////////////////////////////////
// flock
public void lock() {
if( meta_lock == null ){
try{
meta_lock = meta_channel.lock();
}catch(Throwable ex){
ex.printStackTrace();
throw new RuntimeException("lock failed.",ex);
}
}
}
public void unlock(){
if( meta_lock != null ){
try{ meta_lock.release(); }catch(Throwable ex){}
meta_lock = null;
}
}
///////////////////////////////////////////////////
// メタデータの更新
private void meta_write(int data_len,int version,int digest_len,byte[] digest){
meta_buffer.position(0);
meta_buffer.putInt( data_len );
meta_buffer.putInt( version );
meta_buffer.putInt( digest_len );
meta_buffer.put( digest );
meta_buffer.force();
}
private boolean isMetaChanged(){
int version = meta_buffer.getInt(4);
if( version != last_version ) return true;
int hash_length = meta_buffer.getInt(8);
if( hash_length != last_hash_length ) return true;
meta_buffer.position(12);
for(int i=0;i<hash_length;++i){
if( last_hash[i] == meta_buffer.get() ) continue;
return true;
}
return false;
}
//////////////////////////////////////////////////////
private int last_version = -1;
private int last_hash_length = -1;
private byte[] last_hash = new byte[pagesize];
private byte[] last_data;
// データのセーブ(内部処理のみで、ロックを含まない)
private void save_sub(byte[] new_data) throws IOException{
try{
// save
FileOutputStream fo = new FileOutputStream(datafile_tmp);
try{
fo.write(new_data);
}finally{
fo.close();
}
//
set_permission( datafile_tmp.getPath() ,permission );
//
datafile_tmp.renameTo(datafile);
}finally{
datafile_tmp.delete();
}
// ダイジェストを計算する
byte[] digest = check_digest(new_data);
// バージョン番号を計算する
int new_version;
if( last_version == Integer.MAX_VALUE
|| last_version <= 0
){
new_version = 1;
}else{
new_version = last_version +1;
}
// メタデータを書き込む
if(debug) Log.d(TAG,String.format( "save: datalen=%d,version=%d,digestlen=%d"
,new_data.length
,new_version
,digest.length
));
meta_write(
new_data.length,
new_version,
digest.length,
digest
);
}
// データのロード(内部処理のみで、ロックを行わない)
private byte[] load_sub() throws IOException{
// load metadata
meta_buffer.position(0);
int length_data = meta_buffer.getInt();
last_version = meta_buffer.getInt();
last_hash_length = meta_buffer.getInt();
meta_buffer.get(last_hash,0,last_hash_length);
if(debug) Log.d(TAG,String.format( "load: datalen=%d,version=%d,digestlen=%d"
,length_data
,last_version
,last_hash_length
));
// read main data
FileInputStream in = new FileInputStream(datafile);
try{
byte[] data = new byte[length_data];
int nRead = 0;
while(nRead<length_data){
int delta = in.read(data,nRead,length_data-nRead);
if(delta <= 0 ) throw new RuntimeException(String.format("unexpected EOF (read=%d,remain=%d)",nRead,length_data-nRead));
nRead += delta;
}
if( length_data > 0 ){
// データがカラではない場合はダイジェストを確認する
byte[] digest = check_digest(data);
if( digest.length != last_hash_length ) throw new RuntimeException("datafile is broken. digest size not match.");
for(int i=0;i<last_hash_length;++i){
if( last_hash[i] != digest[i] ) throw new RuntimeException("datafile is broken. digest not match.");
}
}
last_data = data;
return data;
}finally{
in.close();
}
}
// データの強制ロード
public byte[] load() throws IOException{
lock();
try{
return load_sub();
}finally{
unlock();
}
}
// データが更新されていればロード,でなければnullを返す
public byte[] load_if_update() throws IOException{
// ロック前にversionだけ見て大雑把に確認する
int version = meta_buffer.getInt(4);
if( version == last_version ) return null;
// ロックして
lock();
try{
return isMetaChanged() ? load_sub() : null;
}finally{
unlock();
}
}
// 最後に読んだデータを取得
public byte[] getLastLoad(){
return last_data;
}
// transaction update
public void transaction(TransactionProc proc) throws IOException {
lock();
try{
byte[] old_data = last_data;
try{
if( isMetaChanged() ) old_data = load_sub();
}catch(FileNotFoundException ex){
old_data = null; // ファイルがない場合
}
// update
byte[] new_data = proc.update(old_data);
save_sub(new_data);
}finally{
datafile_tmp.delete();
unlock();
}
}
///////////////////////////////////////////////////
// ユーティリティ
// Androidの非公開APIを使ってファイルパーミッションを設定する
public static final int set_permission(String path,int perms){
return set_permission(path,perms,-1,-1);
}
// Androidの非公開APIを使ってファイルパーミッションを設定する
public static final int set_permission(String path,int perms,int uid,int gid){
try{
//
Class<?> clazz = Class.forName("android.os.FileUtils");
Method method= clazz.getMethod("setPermissions",String.class ,int.class ,int.class ,int.class);
//
return ((Integer)(method.invoke(null,path,perms,uid,gid))).intValue();
// returns 0 or errno
}catch(Throwable ex){
ex.printStackTrace();
return -1;
}
}
// ダイジェストの計算
public static final byte[] check_digest(byte[] data){
try{
MessageDigest digest_maker = MessageDigest.getInstance(digest_type);
digest_maker.update(data);
return digest_maker.digest();
}catch(NoSuchAlgorithmException ex){
throw new RuntimeException(ex);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.