|
import ch.qos.logback.classic.Level |
|
import com.google.common.base.Stopwatch |
|
import org.apache.jackrabbit.oak.api.Blob |
|
import org.apache.jackrabbit.oak.api.PropertyState |
|
import org.apache.jackrabbit.oak.api.Type |
|
import org.apache.jackrabbit.oak.commons.PathUtils |
|
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry |
|
import org.apache.jackrabbit.oak.spi.state.NodeState |
|
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils |
|
import org.apache.jackrabbit.oak.spi.state.NodeStore |
|
import org.slf4j.Logger |
|
import org.slf4j.LoggerFactory |
|
|
|
import javax.jcr.PropertyType |
|
|
|
/**
 * Walks a subtree of an Oak {@link NodeStore} and checks that every external binary referenced
 * by a BINARY property can still be resolved. References whose binary cannot be resolved are
 * reported as "orphaned".
 *
 * <p>Works from either the Felix (Sling) Groovy script console or the Oak console; which one is
 * detected from whether the running script exposes a non-null {@code osgi} property.
 *
 * <p>Named constructor options:
 * <ul>
 *   <li>{@code script} (required) - the running script; used for output and to obtain the store</li>
 *   <li>{@code fdsDir} (optional) - FileDataStore root directory; when set, binaries are validated
 *       by checking for the backing file instead of opening a stream through the NodeStore</li>
 *   <li>{@code reportPath} (optional) - directory in which {@code orphan-files.txt} is written</li>
 * </ul>
 */
class BlobChecker {
    final Logger log = LoggerFactory.getLogger('script-console')

    // Logger of the DataStore blob store. start()/done() change its level via the `level`
    // property, which presumes a logback (ch.qos.logback.classic.Logger) binding.
    def dsLogger = LoggerFactory.getLogger('org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore')
    // Level of dsLogger captured by start() and restored by done()
    def dsLoggerLevel = null

    final NodeStore nodeStore
    final BlobValidator validator
    int blobCount
    int orphanedBlobRefs
    long nodeCount
    def sc
    def binding
    boolean osgi // true when running in the Felix Script Console
    PrintWriter reporter

    // By default the check aborts once 1000 missing binaries have been reported.
    // Set it to -1 (any value <= 0) for a complete check.
    int maxErrorCount = 1000

    BlobChecker(def opts) {
        // Named args
        String fdsDir = opts.fdsDir
        String reportPath = opts.reportPath
        def script = opts.script

        assert script : "Script parameter is required"

        this.sc = script
        this.binding = sc.binding
        this.osgi = sc.osgi != null
        this.nodeStore = getStore()
        this.validator = fdsDir ? new FDSBlobValidator(fdsDir) : new NodeStoreBlobValidator()
        if (fdsDir) {
            print0("Validating via FileDataStore $fdsDir")
        }

        if (reportPath) {
            File report = new File("$reportPath/orphan-files.txt")
            // Explicit UTF-8 so node paths with non-ASCII characters are written the same way
            // regardless of the JVM's platform default charset
            reporter = new PrintWriter(new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(report), 'UTF-8')))
            print0("Report also saved at ${report.absolutePath}")
        }
    }

    /**
     * Checks all binaries below {@code rootPath}.
     *
     * @param rootPath path of the subtree to check, '/' by default
     * @param fromCheckpoint when true, reads the tree from the checkpoint referenced by the
     *        {@code async} property of {@code /:async} instead of the current head state
     */
    def checkBlobs(String rootPath = '/', boolean fromCheckpoint = false) {
        Stopwatch w = Stopwatch.createStarted()
        start()
        try {
            // Resolved inside the try so done() (logger restore, report close) always runs
            NodeState node = resolveRootState(fromCheckpoint)
            checkBlobs(rootPath, node)
        } finally {
            print0("Timetaken : $w")
            done()
        }
    }

    /** Returns the head root state, or the state of the async-indexing checkpoint when requested. */
    private NodeState resolveRootState(boolean fromCheckpoint) {
        if (!fromCheckpoint) {
            return nodeStore.root
        }
        NodeState async = nodeStore.root.getChildNode(':async')
        String cp = async.getString('async')
        assert cp : "No 'async' checkpoint property found under /:async"
        NodeState node = nodeStore.retrieve(cp)
        assert node : "Could not retrieve state from checkpoint $cp"
        print0("Using NodeState from checkpoint $cp")
        return node
    }

    /** Raises the DataStore logger to ERROR to hide the unnecessary warning for missing binaries. */
    def start() {
        dsLoggerLevel = dsLogger.level
        dsLogger.level = Level.ERROR
    }

    /** Restores the DataStore logger level saved by start() and closes the report, if any. */
    private void done() {
        dsLogger.level = dsLoggerLevel
        if (reporter) {
            reporter.close()
        }
    }

    /**
     * Checks all binaries below {@code path} within {@code rootState} and prints a summary.
     * The summary is printed even if the walk is aborted because maxErrorCount was reached.
     */
    def checkBlobs(String path, NodeState rootState) {
        if (!PathUtils.isValid(path)) {
            // print0 (not println) so the message reaches the console, report and log
            print0("Not a valid path: $path")
            return
        }

        NodeState node = NodeStateUtils.getNode(rootState, path)
        if (!node.exists()) {
            print0("Path does not exist: $path")
            return
        }

        try {
            checkBlobsRecurse(node, path)
        } finally {
            print0("Path : $path")
            print0("Blob Count : $blobCount")
            print0("Orphaned Count : $orphanedBlobRefs")
            print0("Node Count : $nodeCount")
        }
    }

    /** Depth-first walk: checks the binary properties of {@code node}, then of every descendant. */
    def checkBlobsRecurse(NodeState node, String path) {
        checkBinaryProps(node, path)
        node.childNodeEntries.each { ChildNodeEntry cne ->
            checkBlobsRecurse(cne.nodeState, PathUtils.concat(path, cne.name))
        }
    }

    /**
     * Validates every external value of every BINARY property of {@code node} and updates the
     * counters. Throws an AssertionError once maxErrorCount (when > 0) orphaned references were found.
     */
    def checkBinaryProps(NodeState node, String path) {
        node.properties.each { PropertyState ps ->
            if (ps.type.tag() == PropertyType.BINARY) {
                (0..<ps.count()).each { int i ->
                    Blob b = ps.getValue(Type.BINARY, i)
                    if (isExternal(b)) {
                        // Counted before validation so the summary stays accurate on abort
                        blobCount++
                        if (!validator.isValid(b)) {
                            print0("==,${PathUtils.concat(path, ps.name)}, ${b.contentIdentity}")
                            orphanedBlobRefs++
                            if (maxErrorCount > 0) {
                                assert orphanedBlobRefs < maxErrorCount : "Number of missing binaries " +
                                        "reached the limit of $maxErrorCount. Aborting further check. Increase the " +
                                        "limit (maxErrorCount, or -1 for no limit) to check further"
                            }
                        }
                    }
                }
            }
        }

        nodeCount++
        if (nodeCount % 10000L == 0L) {
            print0("Traversed through $nodeCount nodes so far. Orphaned binary $orphanedBlobRefs/$blobCount")
        }
    }

    /**
     * For segment blobs the external flag can be asked directly; any other blob implementation
     * is conservatively treated as external so that it gets validated.
     */
    boolean isExternal(Blob b) {
        if (b.class.name.endsWith("SegmentBlob")) {
            return b.isExternal()
        }
        return true
    }

    /** Prints to the script console and, in addition, to the report file or (OSGi only) the log. */
    def print0(def msg) {
        sc.println(msg)
        if (reporter) {
            reporter.println(msg)
        } else if (osgi) {
            log.info(msg)
        }
    }

    /** Obtains the NodeStore from the Sling repository (OSGi) or from the Oak console session. */
    NodeStore getStore() {
        if (osgi) {
            // Groovy script console
            return sc.osgi.getService(org.apache.sling.jcr.api.SlingRepository.class).manager.store
        } else {
            // Groovy Oak Console
            return sc.session.store
        }
    }

    /** Strategy deciding whether the binary behind a blob reference still exists. */
    interface BlobValidator {
        boolean isValid(Blob b)
    }

    /** Treats a blob as valid when a stream can be opened for it through the NodeStore. */
    static class NodeStoreBlobValidator implements BlobValidator {
        @Override
        boolean isValid(Blob b) {
            InputStream is = null
            try {
                is = b.getNewStream()
                return true
            } catch (Exception ignored) {
                // Any failure to open the stream means the binary cannot be resolved
                return false
            } finally {
                is?.close()
            }
        }
    }

    /**
     * Validates external segment blobs by checking for the backing file in a FileDataStore
     * directory laid out as {@code <dsPath>/<id[0..2]>/<id[2..4]>/<id[4..6]>/<id>}.
     * Non-segment blobs are not checked and are reported valid.
     */
    static class FDSBlobValidator implements BlobValidator {
        final String dsPath

        FDSBlobValidator(String dsPath) {
            this.dsPath = dsPath
            assert new File(dsPath).exists() : "FDS Dir $dsPath not valid"
        }

        @Override
        boolean isValid(Blob b) {
            if (b.class.name.endsWith("SegmentBlob") && b.isExternal()) {
                return isValidId(b.blobId)
            }
            return true
        }

        private boolean isValidId(String blobId) {
            // Assumes blob ids of the form "<hex digest>[#<length>]" (NOTE(review): confirm for the
            // Oak version in use). The file is named after the digest alone; cutting at '#' instead
            // of a fixed 40 chars also works for digests longer than SHA-1 (e.g. SHA-256).
            int sep = blobId.indexOf('#')
            String blobFileName = sep >= 0 ? blobId.substring(0, sep) : blobId
            if (blobFileName.length() < 6) {
                // Too short to map onto the 3-level directory layout
                return false
            }
            String path = "$dsPath/${blobFileName.substring(0, 2)}/${blobFileName.substring(2, 4)}/${blobFileName.substring(4, 6)}/$blobFileName"
            File f = new File(path)
            return f.exists()
        }
    }
}