Created
October 20, 2011 23:44
-
-
Save bigjust/1302723 to your computer and use it in GitHub Desktop.
Y U NO REPLICATE SOLR???
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.solr.handler; | |
import org.apache.commons.httpclient.*; | |
import org.apache.commons.httpclient.auth.AuthScope; | |
import org.apache.commons.httpclient.methods.PostMethod; | |
import org.apache.commons.io.IOUtils; | |
import org.apache.lucene.index.IndexCommit; | |
import org.apache.solr.common.SolrException; | |
import org.apache.solr.common.params.ModifiableSolrParams; | |
import org.apache.solr.common.util.FastInputStream; | |
import org.apache.solr.common.util.JavaBinCodec; | |
import org.apache.solr.common.util.NamedList; | |
import org.apache.solr.common.util.FileUtils; | |
import org.apache.solr.core.SolrCore; | |
import static org.apache.solr.handler.ReplicationHandler.*; | |
import org.apache.solr.request.LocalSolrQueryRequest; | |
import org.apache.solr.request.SolrQueryRequest; | |
import org.apache.solr.search.SolrIndexSearcher; | |
import org.apache.solr.update.CommitUpdateCommand; | |
import org.apache.solr.update.DirectUpdateHandler2; | |
import org.apache.solr.util.RefCounted; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.*; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.FileChannel; | |
import java.text.SimpleDateFormat; | |
import java.util.*; | |
import java.util.concurrent.*; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
import java.util.zip.Adler32; | |
import java.util.zip.Checksum; | |
import java.util.zip.GZIPInputStream; | |
import java.util.zip.InflaterInputStream; | |
/** | |
* <p/> Provides functionality for downloading changed index files and configuration files from the master, as well
* as a timer for scheduling periodic fetches. </p>
* | |
* | |
* @since solr 1.4 | |
*/ | |
public class SnapPuller { | |
private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName()); | |
private static final List<Map<String,Object>> EMPTY_LIST_OF_MAPS | |
= Collections.emptyList(); | |
private final String masterUrl; | |
private final ReplicationHandler replicationHandler; | |
private final Integer pollInterval; | |
private String pollIntervalStr; | |
private ScheduledExecutorService executorService; | |
private volatile long executorStartTime; | |
private volatile long replicationStartTime; | |
private final SolrCore solrCore; | |
private volatile List<Map<String, Object>> filesToDownload; | |
private volatile List<Map<String, Object>> confFilesToDownload; | |
private volatile List<Map<String, Object>> filesDownloaded; | |
private volatile List<Map<String, Object>> confFilesDownloaded; | |
private volatile Map<String, Object> currentFile; | |
private volatile FileFetcher fileFetcher; | |
private volatile ExecutorService fsyncService; | |
private volatile boolean stop = false; | |
private boolean useInternal = false; | |
private boolean useExternal = false; | |
/** | |
* Disable the timer task for polling | |
*/ | |
private AtomicBoolean pollDisabled = new AtomicBoolean(false); | |
// HttpClient shared by all cores (used if timeout is not specified for a core) | |
private static HttpClient client; | |
// HttpClient for this instance if connectionTimeout or readTimeout has been specified | |
private final HttpClient myHttpClient; | |
private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout) { | |
if (connTimeout == null && readTimeout == null && client != null) return client; | |
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager(); | |
// Keeping a very high number so that if you have a large number of cores | |
// no requests are kept waiting for an idle connection. | |
mgr.getParams().setDefaultMaxConnectionsPerHost(10000); | |
mgr.getParams().setMaxTotalConnections(10000); | |
mgr.getParams().setSoTimeout(readTimeout == null ? 20000 : Integer.parseInt(readTimeout)); //20 secs | |
mgr.getParams().setConnectionTimeout(connTimeout == null ? 5000 : Integer.parseInt(connTimeout)); //5 secs | |
HttpClient httpClient = new HttpClient(mgr); | |
if (client == null && connTimeout == null && readTimeout == null) client = httpClient; | |
return httpClient; | |
} | |
public SnapPuller(NamedList initArgs, ReplicationHandler handler, SolrCore sc) { | |
solrCore = sc; | |
masterUrl = (String) initArgs.get(MASTER_URL); | |
if (masterUrl == null) | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
"'masterUrl' is required for a slave"); | |
this.replicationHandler = handler; | |
pollIntervalStr = (String) initArgs.get(POLL_INTERVAL); | |
pollInterval = readInterval(pollIntervalStr); | |
String compress = (String) initArgs.get(COMPRESSION); | |
useInternal = INTERNAL.equals(compress); | |
useExternal = EXTERNAL.equals(compress); | |
String connTimeout = (String) initArgs.get(HTTP_CONN_TIMEOUT); | |
String readTimeout = (String) initArgs.get(HTTP_READ_TIMEOUT); | |
String httpBasicAuthUser = (String) initArgs.get(HTTP_BASIC_AUTH_USER); | |
String httpBasicAuthPassword = (String) initArgs.get(HTTP_BASIC_AUTH_PASSWORD); | |
myHttpClient = createHttpClient(connTimeout, readTimeout); | |
if (httpBasicAuthUser != null && httpBasicAuthPassword != null) { | |
myHttpClient.getState().setCredentials(AuthScope.ANY, | |
new UsernamePasswordCredentials(httpBasicAuthUser, httpBasicAuthPassword)); | |
} | |
if (pollInterval != null && pollInterval > 0) { | |
startExecutorService(); | |
} else { | |
LOG.info(" No value set for 'pollInterval'. Timer Task not started."); | |
} | |
} | |
private void startExecutorService() { | |
Runnable task = new Runnable() { | |
public void run() { | |
if (pollDisabled.get()) { | |
LOG.info("Poll disabled"); | |
return; | |
} | |
try { | |
executorStartTime = System.currentTimeMillis(); | |
replicationHandler.doFetch(null); | |
} catch (Exception e) { | |
LOG.error("Exception in fetching index", e); | |
} | |
} | |
}; | |
executorService = Executors.newSingleThreadScheduledExecutor(); | |
long initialDelay = pollInterval - (System.currentTimeMillis() % pollInterval); | |
executorService.scheduleAtFixedRate(task, initialDelay, pollInterval, TimeUnit.MILLISECONDS); | |
LOG.info("Poll Scheduled at an interval of " + pollInterval + "ms"); | |
} | |
/** | |
* Gets the latest commit version and generation from the master | |
*/ | |
@SuppressWarnings("unchecked") | |
NamedList getLatestVersion() throws IOException { | |
PostMethod post = new PostMethod(masterUrl); | |
post.addParameter(COMMAND, CMD_INDEX_VERSION); | |
post.addParameter("wt", "javabin"); | |
return getNamedListResponse(post); | |
} | |
NamedList getCommandResponse(NamedList<String> commands) throws IOException { | |
PostMethod post = new PostMethod(masterUrl); | |
for (Map.Entry<String, String> c : commands) { | |
post.addParameter(c.getKey(),c.getValue()); | |
} | |
post.addParameter("wt", "javabin"); | |
return getNamedListResponse(post); | |
} | |
private NamedList<?> getNamedListResponse(PostMethod method) throws IOException { | |
try { | |
int status = myHttpClient.executeMethod(method); | |
if (status != HttpStatus.SC_OK) { | |
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, | |
"Request failed for the url " + method); | |
} | |
return (NamedList<?>) new JavaBinCodec().unmarshal(method.getResponseBodyAsStream()); | |
} finally { | |
try { | |
method.releaseConnection(); | |
} catch (Exception e) { | |
} | |
} | |
} | |
/** | |
* Fetches the list of files in a given index commit point | |
*/ | |
void fetchFileList(long version) throws IOException { | |
PostMethod post = new PostMethod(masterUrl); | |
post.addParameter(COMMAND, CMD_GET_FILE_LIST); | |
post.addParameter(CMD_INDEX_VERSION, String.valueOf(version)); | |
post.addParameter("wt", "javabin"); | |
@SuppressWarnings("unchecked") | |
NamedList<List<Map<String, Object>>> nl | |
= (NamedList<List<Map<String, Object>>>) getNamedListResponse(post); | |
List<Map<String, Object>> f = nl.get(CMD_GET_FILE_LIST); | |
if (f != null) | |
filesToDownload = Collections.synchronizedList(f); | |
else { | |
filesToDownload = Collections.emptyList(); | |
LOG.error("No files to download for indexversion: "+ version); | |
} | |
f = nl.get(CONF_FILES); | |
if (f != null) | |
confFilesToDownload = Collections.synchronizedList(f); | |
} | |
/** | |
* This command downloads all the necessary files from master to install a index commit point. Only changed files are | |
* downloaded. It also downloads the conf files (if they are modified). | |
* | |
* @param core the SolrCore | |
* @return true on success, false if slave is already in sync | |
* @throws IOException if an exception occurs | |
*/ | |
@SuppressWarnings("unchecked") | |
boolean successfulInstall = false; | |
boolean fetchLatestIndex(SolrCore core) throws IOException { | |
replicationStartTime = System.currentTimeMillis(); | |
try { | |
//get the current 'replicateable' index version in the master | |
NamedList response = null; | |
try { | |
response = getLatestVersion(); | |
} catch (Exception e) { | |
LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage()); | |
return false; | |
} | |
long latestVersion = (Long) response.get(CMD_INDEX_VERSION); | |
long latestGeneration = (Long) response.get(GENERATION); | |
if (latestVersion == 0L) { | |
//there is nothing to be replicated | |
return false; | |
} | |
IndexCommit commit; | |
RefCounted<SolrIndexSearcher> searcherRefCounted = null; | |
try { | |
searcherRefCounted = core.getNewestSearcher(false); | |
commit = searcherRefCounted.get().getIndexReader().getIndexCommit(); | |
} finally { | |
if (searcherRefCounted != null) | |
searcherRefCounted.decref(); | |
} | |
if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) { | |
//master and slave are alsready in sync just return | |
LOG.info("Slave in sync with master."); | |
return false; | |
} | |
LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration); | |
LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration()); | |
LOG.info("Starting replication process"); | |
// get the list of files first | |
fetchFileList(latestVersion); | |
// this can happen if the commit point is deleted before we fetch the file list. | |
if(filesToDownload.isEmpty()) return false; | |
LOG.info("Number of files in latest index in master: " + filesToDownload.size()); | |
// Create the sync service | |
fsyncService = Executors.newSingleThreadExecutor(); | |
// use a synchronized list because the list is read by other threads (to show details) | |
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>()); | |
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied | |
// then a new index direcory to be created and all the files need to be copied | |
boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration; | |
File tmpIndexDir = createTempindexDir(core); | |
if (isIndexStale()) | |
isFullCopyNeeded = true; | |
successfulInstall = false; | |
boolean deleteTmpIdxDir = true; | |
File indexDir = null ; | |
try { | |
indexDir = new File(core.getIndexDir()); | |
downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestVersion); | |
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs"); | |
<<<<<<<<<<<<<<<<<<< LAST LOG MESSAGE ^^^^ <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< | |
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload); | |
if (!modifiedConfFiles.isEmpty()) { | |
downloadConfFiles(confFilesToDownload, latestVersion); | |
if (isFullCopyNeeded) { | |
successfulInstall = modifyIndexProps(tmpIndexDir.getName()); | |
deleteTmpIdxDir = false; | |
} else { | |
successfulInstall = copyIndexFiles(tmpIndexDir, indexDir); | |
} | |
if (successfulInstall) { | |
LOG.info("Configuration files are modified, core will be reloaded"); | |
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files. | |
reloadCore(); | |
} | |
} else { | |
terminateAndWaitFsyncService(); | |
if (isFullCopyNeeded) { | |
successfulInstall = modifyIndexProps(tmpIndexDir.getName()); | |
deleteTmpIdxDir = false; | |
} else { | |
successfulInstall = copyIndexFiles(tmpIndexDir, indexDir); | |
} | |
if (successfulInstall) { | |
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall); | |
doCommit(); | |
} | |
} | |
replicationStartTime = 0; | |
return successfulInstall; | |
} catch (ReplicationHandlerException e) { | |
LOG.error("User aborted Replication"); | |
} catch (SolrException e) { | |
throw e; | |
} catch (Exception e) { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e); | |
} finally { | |
if (deleteTmpIdxDir) delTree(tmpIndexDir); | |
else delTree(indexDir); | |
} | |
return successfulInstall; | |
} finally { | |
if (!successfulInstall) { | |
logReplicationTimeAndConfFiles(null, successfulInstall); | |
} | |
filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null; | |
replicationStartTime = 0; | |
fileFetcher = null; | |
if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow(); | |
fsyncService = null; | |
stop = false; | |
fsyncException = null; | |
} | |
} | |
private volatile Exception fsyncException; | |
/** | |
* terminate the fsync service and wait for all the tasks to complete. If it is already terminated | |
* | |
* @throws Exception | |
*/ | |
private void terminateAndWaitFsyncService() throws Exception { | |
if (fsyncService.isTerminated()) return; | |
fsyncService.shutdown(); | |
// give a long wait say 1 hr | |
fsyncService.awaitTermination(3600, TimeUnit.SECONDS); | |
// if any fsync failed, throw that exception back | |
Exception fsyncExceptionCopy = fsyncException; | |
if (fsyncExceptionCopy != null) throw fsyncExceptionCopy; | |
} | |
/** | |
* Helper method to record the last replication's details so that we can show them on the statistics page across | |
* restarts. | |
*/ | |
private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles, boolean successfulInstall) { | |
FileOutputStream outFile = null; | |
List<String> confFiles = new ArrayList<String>(); | |
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) | |
for (Map<String, Object> map1 : modifiedConfFiles) | |
confFiles.add((String) map1.get(NAME)); | |
Properties props = replicationHandler.loadReplicationProperties(); | |
long replicationTime = System.currentTimeMillis(); | |
long replicationTimeTaken = (replicationTime - getReplicationStartTime()) / 1000; | |
try { | |
int indexCount = 1, confFilesCount = 1; | |
if (props.containsKey(TIMES_INDEX_REPLICATED)) { | |
indexCount = Integer.valueOf(props.getProperty(TIMES_INDEX_REPLICATED)) + 1; | |
} | |
StringBuffer sb = readToStringBuffer(replicationTime, props.getProperty(INDEX_REPLICATED_AT_LIST)); | |
props.setProperty(INDEX_REPLICATED_AT_LIST, sb.toString()); | |
props.setProperty(INDEX_REPLICATED_AT, String.valueOf(replicationTime)); | |
props.setProperty(PREVIOUS_CYCLE_TIME_TAKEN, String.valueOf(replicationTimeTaken)); | |
props.setProperty(TIMES_INDEX_REPLICATED, String.valueOf(indexCount)); | |
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) { | |
props.setProperty(CONF_FILES_REPLICATED, confFiles.toString()); | |
props.setProperty(CONF_FILES_REPLICATED_AT, String.valueOf(replicationTime)); | |
if (props.containsKey(TIMES_CONFIG_REPLICATED)) { | |
confFilesCount = Integer.valueOf(props.getProperty(TIMES_CONFIG_REPLICATED)) + 1; | |
} | |
props.setProperty(TIMES_CONFIG_REPLICATED, String.valueOf(confFilesCount)); | |
} | |
props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED, String.valueOf(getTotalBytesDownloaded(this))); | |
if (!successfulInstall) { | |
int numFailures = 1; | |
if (props.containsKey(TIMES_FAILED)) { | |
numFailures = Integer.valueOf(props.getProperty(TIMES_FAILED)) + 1; | |
} | |
props.setProperty(TIMES_FAILED, String.valueOf(numFailures)); | |
props.setProperty(REPLICATION_FAILED_AT, String.valueOf(replicationTime)); | |
sb = readToStringBuffer(replicationTime, props.getProperty(REPLICATION_FAILED_AT_LIST)); | |
props.setProperty(REPLICATION_FAILED_AT_LIST, sb.toString()); | |
} | |
File f = new File(solrCore.getDataDir(), REPLICATION_PROPERTIES); | |
outFile = new FileOutputStream(f); | |
props.store(outFile, "Replication details"); | |
outFile.close(); | |
} catch (Exception e) { | |
LOG.warn("Exception while updating statistics", e); | |
} | |
finally { | |
IOUtils.closeQuietly(outFile); | |
} | |
} | |
static long getTotalBytesDownloaded(SnapPuller snappuller) { | |
long bytesDownloaded = 0; | |
//get size from list of files to download | |
for (Map<String, Object> file : snappuller.getFilesDownloaded()) { | |
bytesDownloaded += (Long) file.get(SIZE); | |
} | |
//get size from list of conf files to download | |
for (Map<String, Object> file : snappuller.getConfFilesDownloaded()) { | |
bytesDownloaded += (Long) file.get(SIZE); | |
} | |
//get size from current file being downloaded | |
Map<String, Object> currentFile = snappuller.getCurrentFile(); | |
if (currentFile != null) { | |
if (currentFile.containsKey("bytesDownloaded")) { | |
bytesDownloaded += (Long) currentFile.get("bytesDownloaded"); | |
} | |
} | |
return bytesDownloaded; | |
} | |
private StringBuffer readToStringBuffer(long replicationTime, String str) { | |
StringBuffer sb = new StringBuffer(); | |
List<String> l = new ArrayList<String>(); | |
if (str != null && str.length() != 0) { | |
String[] ss = str.split(","); | |
for (int i = 0; i < ss.length; i++) { | |
l.add(ss[i]); | |
} | |
} | |
sb.append(replicationTime); | |
if (!l.isEmpty()) { | |
for (int i = 0; i < l.size() || i < 9; i++) { | |
if (i == l.size() || i == 9) break; | |
String s = l.get(i); | |
sb.append(",").append(s); | |
} | |
} | |
return sb; | |
} | |
private void doCommit() throws IOException { | |
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams()); | |
try { | |
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false); | |
cmd.waitFlush = true; | |
cmd.waitSearcher = true; | |
solrCore.getUpdateHandler().commit(cmd); | |
if (solrCore.getUpdateHandler() instanceof DirectUpdateHandler2) { | |
LOG.info("Force open index writer to make sure older index files get deleted"); | |
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Log Message that gets printed 3-4 minutes Later <<<<<<<<<<<<<<<<<<<<< | |
DirectUpdateHandler2 handler = (DirectUpdateHandler2) solrCore.getUpdateHandler(); | |
handler.forceOpenWriter(); | |
replicationHandler.refreshCommitpoint(); | |
} else { | |
LOG.warn("The update handler is not an instance or sub-class of DirectUpdateHandler2. " + | |
"ReplicationHandler may not be able to cleanup un-used index files."); | |
} | |
} finally { | |
req.close(); | |
} | |
} | |
/** | |
* All the files are copied to a temp dir first | |
*/ | |
private File createTempindexDir(SolrCore core) { | |
String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.US).format(new Date()); | |
File tmpIdxDir = new File(core.getDataDir(), tmpIdxDirName); | |
tmpIdxDir.mkdirs(); | |
return tmpIdxDir; | |
} | |
private void reloadCore() { | |
new Thread() { | |
@Override | |
public void run() { | |
try { | |
solrCore.getCoreDescriptor().getCoreContainer().reload(solrCore.getName()); | |
} catch (Exception e) { | |
LOG.error("Could not restart core ", e); | |
} | |
} | |
}.start(); | |
} | |
private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestVersion) throws Exception { | |
LOG.info("Starting download of configuration files from master: " + confFilesToDownload); | |
confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>()); | |
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date())); | |
try { | |
boolean status = tmpconfDir.mkdirs(); | |
if (!status) { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
"Failed to create temporary config folder: " + tmpconfDir.getName()); | |
} | |
for (Map<String, Object> file : confFilesToDownload) { | |
String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS)); | |
fileFetcher = new FileFetcher(tmpconfDir, file, saveAs, true, latestVersion); | |
currentFile = file; | |
fileFetcher.fetchFile(); | |
confFilesDownloaded.add(new HashMap<String, Object>(file)); | |
} | |
// this is called before copying the files to the original conf dir | |
// so that if there is an exception avoid corrupting the original files. | |
terminateAndWaitFsyncService(); | |
copyTmpConfFiles2Conf(tmpconfDir); | |
} finally { | |
delTree(tmpconfDir); | |
} | |
} | |
/** | |
* Download the index files. If a new index is needed, download all the files. | |
* | |
* @param downloadCompleteIndex is it a fresh index copy | |
* @param tmpIdxDir the directory to which files need to be downloadeed to | |
* @param latestVersion the version number | |
*/ | |
private void downloadIndexFiles(boolean downloadCompleteIndex, File tmpIdxDir, long latestVersion) throws Exception { | |
for (Map<String, Object> file : filesToDownload) { | |
File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME)); | |
if (!localIndexFile.exists() || downloadCompleteIndex) { | |
fileFetcher = new FileFetcher(tmpIdxDir, file, (String) file.get(NAME), false, latestVersion); | |
currentFile = file; | |
fileFetcher.fetchFile(); | |
filesDownloaded.add(new HashMap<String, Object>(file)); | |
} else { | |
LOG.info("Skipping download for " + localIndexFile); | |
} | |
} | |
} | |
/** | |
* All the files which are common between master and slave must have same timestamp and size else we assume they are | |
* not compatible (stale). | |
* | |
* @return true if the index stale and we need to download a fresh copy, false otherwise. | |
*/ | |
private boolean isIndexStale() { | |
for (Map<String, Object> file : filesToDownload) { | |
File localIndexFile = new File(solrCore.getIndexDir(), (String) file | |
.get(NAME)); | |
if (localIndexFile.exists() | |
&& localIndexFile.length() != (Long) file.get(SIZE)) { | |
// file exists and size is different, therefore we must assume | |
// corrupted index | |
return true; | |
} | |
} | |
return false; | |
} | |
/** | |
* Copy a file by the File#renameTo() method. If it fails, it is considered a failure | |
* <p/> | |
*/ | |
private boolean copyAFile(File tmpIdxDir, File indexDir, String fname, List<String> copiedfiles) { | |
File indexFileInTmpDir = new File(tmpIdxDir, fname); | |
File indexFileInIndex = new File(indexDir, fname); | |
boolean success = indexFileInTmpDir.renameTo(indexFileInIndex); | |
if(!success){ | |
try { | |
LOG.error("Unable to move index file from: " + indexFileInTmpDir | |
+ " to: " + indexFileInIndex + "Trying to do a copy"); | |
FileUtils.copyFile(indexFileInTmpDir,indexFileInIndex); | |
success = true; | |
} catch (IOException e) { | |
LOG.error("Unable to copy index file from: " + indexFileInTmpDir | |
+ " to: " + indexFileInIndex , e); | |
} | |
} | |
if (!success) { | |
for (String f : copiedfiles) { | |
File indexFile = new File(indexDir, f); | |
if (indexFile.exists()) | |
indexFile.delete(); | |
} | |
delTree(tmpIdxDir); | |
return false; | |
} | |
return true; | |
} | |
/** | |
* Copy all index files from the temp index dir to the actual index. The segments_N file is copied last. | |
*/ | |
private boolean copyIndexFiles(File tmpIdxDir, File indexDir) throws IOException { | |
String segmentsFile = null; | |
List<String> copiedfiles = new ArrayList<String>(); | |
for (Map<String, Object> f : filesDownloaded) { | |
String fname = (String) f.get(NAME); | |
// the segments file must be copied last | |
// or else if there is a failure in between the | |
// index will be corrupted | |
if (fname.startsWith("segments_")) { | |
//The segments file must be copied in the end | |
//Otherwise , if the copy fails index ends up corrupted | |
segmentsFile = fname; | |
continue; | |
} | |
if (!copyAFile(tmpIdxDir, indexDir, fname, copiedfiles)) return false; | |
copiedfiles.add(fname); | |
} | |
//copy the segments file last | |
if (segmentsFile != null) { | |
if (!copyAFile(tmpIdxDir, indexDir, segmentsFile, copiedfiles)) return false; | |
} | |
return true; | |
} | |
/** | |
* The conf files are copied to the tmp dir to the conf dir. A backup of the old file is maintained | |
*/ | |
private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException { | |
File confDir = new File(solrCore.getResourceLoader().getConfigDir()); | |
for (File file : tmpconfDir.listFiles()) { | |
File oldFile = new File(confDir, file.getName()); | |
if (oldFile.exists()) { | |
File backupFile = new File(confDir, oldFile.getName() + "." + getDateAsStr(new Date(oldFile.lastModified()))); | |
boolean status = oldFile.renameTo(backupFile); | |
if (!status) { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
"Unable to rename: " + oldFile + " to: " + backupFile); | |
} | |
} | |
boolean status = file.renameTo(oldFile); | |
if (status) { | |
} else { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
"Unable to rename: " + file + " to: " + oldFile); | |
} | |
} | |
} | |
private String getDateAsStr(Date d) { | |
return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.US).format(d); | |
} | |
/** | |
* If the index is stale by any chance, load index from a different dir in the data dir. | |
*/ | |
private boolean modifyIndexProps(String tmpIdxDirName) { | |
LOG.info("New index installed. Updating index properties..."); | |
File idxprops = new File(solrCore.getDataDir() + "index.properties"); | |
Properties p = new Properties(); | |
if (idxprops.exists()) { | |
InputStream is = null; | |
try { | |
is = new FileInputStream(idxprops); | |
p.load(is); | |
} catch (Exception e) { | |
LOG.error("Unable to load index.properties"); | |
} finally { | |
IOUtils.closeQuietly(is); | |
} | |
} | |
p.put("index", tmpIdxDirName); | |
FileOutputStream os = null; | |
try { | |
os = new FileOutputStream(idxprops); | |
p.store(os, "index properties"); | |
} catch (Exception e) { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
"Unable to write index.properties", e); | |
} finally { | |
IOUtils.closeQuietly(os); | |
} | |
return true; | |
} | |
private final Map<String, FileInfo> confFileInfoCache = new HashMap<String, FileInfo>(); | |
/** | |
* The local conf files are compared with the conf files in the master. If they are same (by checksum) do not copy. | |
* | |
* @param confFilesToDownload The list of files obtained from master | |
* | |
* @return a list of configuration files which have changed on the master and need to be downloaded. | |
*/ | |
private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) { | |
if (confFilesToDownload == null || confFilesToDownload.isEmpty()) | |
return EMPTY_LIST_OF_MAPS; | |
//build a map with alias/name as the key | |
Map<String, Map<String, Object>> nameVsFile = new HashMap<String, Map<String, Object>>(); | |
NamedList<String> names = new NamedList<String>(); | |
for (Map<String, Object> map : confFilesToDownload) { | |
//if alias is present that is the name the file may have in the slave | |
String name = (String) (map.get(ALIAS) == null ? map.get(NAME) : map.get(ALIAS)); | |
nameVsFile.put(name, map); | |
names.add(name, null); | |
} | |
//get the details of the local conf files with the same alias/name | |
List<Map<String, Object>> localFilesInfo = replicationHandler.getConfFileInfoFromCache(names, confFileInfoCache); | |
//compare their size/checksum to see if | |
for (Map<String, Object> fileInfo : localFilesInfo) { | |
String name = (String) fileInfo.get(NAME); | |
Map<String, Object> m = nameVsFile.get(name); | |
if (m == null) continue; // the file is not even present locally (so must be downloaded) | |
if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) { | |
nameVsFile.remove(name); //checksums are same so the file need not be downloaded | |
} | |
} | |
return nameVsFile.isEmpty() ? EMPTY_LIST_OF_MAPS : nameVsFile.values(); | |
} | |
/** | |
* Delete the directory tree recursively | |
*/ | |
static boolean delTree(File dir) { | |
if (dir == null || !dir.exists()) | |
return false; | |
boolean isSuccess = true; | |
File contents[] = dir.listFiles(); | |
if (contents != null) { | |
for (File file : contents) { | |
if (file.isDirectory()) { | |
boolean success = delTree(file); | |
if (!success) { | |
LOG.warn("Unable to delete directory : " + file); | |
isSuccess = false; | |
} | |
} else { | |
boolean success = file.delete(); | |
if (!success) { | |
LOG.warn("Unable to delete file : " + file); | |
isSuccess = false; | |
return false; | |
} | |
} | |
} | |
} | |
return isSuccess && dir.delete(); | |
} | |
/** | |
* Disable periodic polling | |
*/ | |
void disablePoll() { | |
pollDisabled.set(true); | |
LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled); | |
} | |
/** | |
* Enable periodic polling | |
*/ | |
void enablePoll() { | |
pollDisabled.set(false); | |
LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled); | |
} | |
/** | |
* Stops the ongoing pull | |
*/ | |
void abortPull() { | |
stop = true; | |
} | |
long getReplicationStartTime() { | |
return replicationStartTime; | |
} | |
List<Map<String, Object>> getConfFilesToDownload() { | |
//make a copy first because it can be null later | |
List<Map<String, Object>> tmp = confFilesToDownload; | |
//create a new instance. or else iterator may fail | |
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp); | |
} | |
List<Map<String, Object>> getConfFilesDownloaded() { | |
//make a copy first because it can be null later | |
List<Map<String, Object>> tmp = confFilesDownloaded; | |
// NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList) | |
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp); | |
} | |
List<Map<String, Object>> getFilesToDownload() { | |
//make a copy first because it can be null later | |
List<Map<String, Object>> tmp = filesToDownload; | |
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp); | |
} | |
List<Map<String, Object>> getFilesDownloaded() { | |
List<Map<String, Object>> tmp = filesDownloaded; | |
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp); | |
} | |
Map<String, Object> getCurrentFile() { | |
Map<String, Object> tmp = currentFile; | |
FileFetcher tmpFileFetcher = fileFetcher; | |
if (tmp == null) | |
return null; | |
tmp = new HashMap<String, Object>(tmp); | |
if (tmpFileFetcher != null) | |
tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded); | |
return tmp; | |
} | |
boolean isPollingDisabled() { | |
return pollDisabled.get(); | |
} | |
Long getNextScheduledExecTime() { | |
Long nextTime = null; | |
if (executorStartTime > 0) | |
nextTime = executorStartTime + pollInterval; | |
return nextTime; | |
} | |
private static class ReplicationHandlerException extends InterruptedException { | |
public ReplicationHandlerException(String message) { | |
super(message); | |
} | |
} | |
/** | |
* The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream | |
* | |
* @see org.apache.solr.handler.ReplicationHandler.FileStream | |
*/ | |
private class FileFetcher { | |
boolean includeChecksum = true; | |
private File copy2Dir; | |
String fileName; | |
String saveAs; | |
long size, lastmodified; | |
long bytesDownloaded = 0; | |
FileChannel fileChannel; | |
private FileOutputStream fileOutputStream; | |
byte[] buf = new byte[1024 * 1024]; | |
Checksum checksum; | |
File file; | |
int errorCount = 0; | |
private boolean isConf; | |
private PostMethod post; | |
private boolean aborted = false; | |
private Long indexVersion; | |
FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs, | |
boolean isConf, long latestVersion) throws IOException { | |
this.copy2Dir = dir; | |
this.fileName = (String) fileDetails.get(NAME); | |
this.size = (Long) fileDetails.get(SIZE); | |
this.isConf = isConf; | |
this.saveAs = saveAs; | |
if(fileDetails.get(LAST_MODIFIED) != null){ | |
lastmodified = (Long)fileDetails.get(LAST_MODIFIED); | |
} | |
indexVersion = latestVersion; | |
this.file = new File(copy2Dir, saveAs); | |
this.fileOutputStream = new FileOutputStream(file); | |
this.fileChannel = this.fileOutputStream.getChannel(); | |
if (includeChecksum) | |
checksum = new Adler32(); | |
} | |
/** | |
* The main method which downloads file | |
*/ | |
void fetchFile() throws Exception { | |
try { | |
while (true) { | |
final FastInputStream is = getStream(); | |
int result; | |
try { | |
//fetch packets one by one in a single request | |
result = fetchPackets(is); | |
if (result == 0 || result == NO_CONTENT) { | |
// if the file is downloaded properly set the | |
// timestamp same as that in the server | |
if (file.exists() && lastmodified > 0) | |
file.setLastModified(lastmodified); | |
return; | |
} | |
//if there is an error continue. But continue from the point where it got broken | |
} finally { | |
IOUtils.closeQuietly(is); | |
} | |
} | |
} finally { | |
cleanup(); | |
//if cleanup suceeds . The file is downloaded fully. do an fsync | |
fsyncService.submit(new Runnable(){ | |
public void run() { | |
try { | |
FileUtils.sync(file); | |
} catch (IOException e) { | |
fsyncException = e; | |
} | |
} | |
}); | |
} | |
} | |
private int fetchPackets(FastInputStream fis) throws Exception { | |
byte[] intbytes = new byte[4]; | |
byte[] longbytes = new byte[8]; | |
try { | |
while (true) { | |
if (stop) { | |
stop = false; | |
aborted = true; | |
throw new ReplicationHandlerException("User aborted replication"); | |
} | |
long checkSumServer = -1; | |
fis.readFully(intbytes); | |
//read the size of the packet | |
int packetSize = readInt(intbytes); | |
if (packetSize <= 0) { | |
LOG.warn("No content recieved for file: " + currentFile); | |
return NO_CONTENT; | |
} | |
if (buf.length < packetSize) | |
buf = new byte[packetSize]; | |
if (checksum != null) { | |
//read the checksum | |
fis.readFully(longbytes); | |
checkSumServer = readLong(longbytes); | |
} | |
//then read the packet of bytes | |
fis.readFully(buf, 0, packetSize); | |
//compare the checksum as sent from the master | |
if (includeChecksum) { | |
checksum.reset(); | |
checksum.update(buf, 0, packetSize); | |
long checkSumClient = checksum.getValue(); | |
if (checkSumClient != checkSumServer) { | |
LOG.error("Checksum not matched between client and server for: " + currentFile); | |
//if checksum is wrong it is a problem return for retry | |
return 1; | |
} | |
} | |
//if everything is fine, write down the packet to the file | |
fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize)); | |
bytesDownloaded += packetSize; | |
if (bytesDownloaded >= size) | |
return 0; | |
//errorcount is always set to zero after a successful packet | |
errorCount = 0; | |
} | |
} catch (ReplicationHandlerException e) { | |
throw e; | |
} catch (Exception e) { | |
LOG.warn("Error in fetching packets ", e); | |
//for any failure , increment the error count | |
errorCount++; | |
//if it fails for the same pacaket for MAX_RETRIES fail and come out | |
if (errorCount > MAX_RETRIES) { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
"Fetch failed for file:" + fileName, e); | |
} | |
return ERR; | |
} | |
} | |
/** | |
* The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully() | |
* other wise it fails. So read everything as bytes and then extract an integer out of it | |
*/ | |
private int readInt(byte[] b) { | |
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16) | |
| ((b[2] & 0xff) << 8) | (b[3] & 0xff)); | |
} | |
/** | |
* Same as above but to read longs from a byte array | |
*/ | |
private long readLong(byte[] b) { | |
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48) | |
| (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32) | |
| (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16) | |
| ((b[6] & 0xff) << 8) | ((b[7] & 0xff)); | |
} | |
/** | |
* cleanup everything | |
*/ | |
private void cleanup() { | |
try { | |
//close the FileOutputStream (which also closes the Channel) | |
fileOutputStream.close(); | |
} catch (Exception e) {/* noop */ | |
LOG.error("Error closing the file stream: "+ this.saveAs ,e); | |
} | |
try { | |
post.releaseConnection(); | |
} catch (Exception e) { | |
} | |
if (bytesDownloaded != size) { | |
//if the download is not complete then | |
//delete the file being downloaded | |
try { | |
file.delete(); | |
} catch (Exception e) { | |
LOG.error("Error deleting file in cleanup" + e.getMessage()); | |
} | |
//if the failure is due to a user abort it is returned nomally else an exception is thrown | |
if (!aborted) | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
"Unable to download " + fileName + " completely. Downloaded " | |
+ bytesDownloaded + "!=" + size); | |
} | |
} | |
/** | |
* Open a new stream using HttpClient | |
*/ | |
FastInputStream getStream() throws IOException { | |
post = new PostMethod(masterUrl); | |
//the method is command=filecontent | |
post.addParameter(COMMAND, CMD_GET_FILE); | |
//add the version to download. This is used to reserve the download | |
post.addParameter(CMD_INDEX_VERSION, indexVersion.toString()); | |
if (isConf) { | |
//set cf instead of file for config file | |
post.addParameter(CONF_FILE_SHORT, fileName); | |
} else { | |
post.addParameter(FILE, fileName); | |
} | |
if (useInternal) { | |
post.addParameter(COMPRESSION, "true"); | |
} | |
if (useExternal) { | |
post.setRequestHeader(new Header("Accept-Encoding", "gzip,deflate")); | |
} | |
//use checksum | |
if (this.includeChecksum) | |
post.addParameter(CHECKSUM, "true"); | |
//wt=filestream this is a custom protocol | |
post.addParameter("wt", FILE_STREAM); | |
// This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that | |
// the server starts from the offset | |
if (bytesDownloaded > 0) { | |
post.addParameter(OFFSET, "" + bytesDownloaded); | |
} | |
myHttpClient.executeMethod(post); | |
InputStream is = post.getResponseBodyAsStream(); | |
//wrap it using FastInputStream | |
if (useInternal) { | |
is = new InflaterInputStream(is); | |
} else if (useExternal) { | |
is = checkCompressed(post, is); | |
} | |
return new FastInputStream(is); | |
} | |
} | |
/* | |
* This is copied from CommonsHttpSolrServer | |
*/ | |
private InputStream checkCompressed(HttpMethod method, InputStream respBody) throws IOException { | |
Header contentEncodingHeader = method.getResponseHeader("Content-Encoding"); | |
if (contentEncodingHeader != null) { | |
String contentEncoding = contentEncodingHeader.getValue(); | |
if (contentEncoding.contains("gzip")) { | |
respBody = new GZIPInputStream(respBody); | |
} else if (contentEncoding.contains("deflate")) { | |
respBody = new InflaterInputStream(respBody); | |
} | |
} else { | |
Header contentTypeHeader = method.getResponseHeader("Content-Type"); | |
if (contentTypeHeader != null) { | |
String contentType = contentTypeHeader.getValue(); | |
if (contentType != null) { | |
if (contentType.startsWith("application/x-gzip-compressed")) { | |
respBody = new GZIPInputStream(respBody); | |
} else if (contentType.startsWith("application/x-deflate")) { | |
respBody = new InflaterInputStream(respBody); | |
} | |
} | |
} | |
} | |
return respBody; | |
} | |
static Integer readInterval(String interval) { | |
if (interval == null) | |
return null; | |
int result = 0; | |
if (interval != null) { | |
Matcher m = INTERVAL_PATTERN.matcher(interval.trim()); | |
if (m.find()) { | |
String hr = m.group(1); | |
String min = m.group(2); | |
String sec = m.group(3); | |
result = 0; | |
try { | |
if (sec != null && sec.length() > 0) | |
result += Integer.parseInt(sec); | |
if (min != null && min.length() > 0) | |
result += (60 * Integer.parseInt(min)); | |
if (hr != null && hr.length() > 0) | |
result += (60 * 60 * Integer.parseInt(hr)); | |
result *= 1000; | |
} catch (NumberFormatException e) { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
INTERVAL_ERR_MSG); | |
} | |
} else { | |
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
INTERVAL_ERR_MSG); | |
} | |
} | |
return result; | |
} | |
public void destroy() { | |
if (executorService != null) executorService.shutdown(); | |
} | |
String getMasterUrl() { | |
return masterUrl; | |
} | |
String getPollInterval() { | |
return pollIntervalStr; | |
} | |
private static final int MAX_RETRIES = 5; | |
private static final int NO_CONTENT = 1; | |
private static final int ERR = 2; | |
public static final String REPLICATION_PROPERTIES = "replication.properties"; | |
public static final String POLL_INTERVAL = "pollInterval"; | |
public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'"; | |
private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)"); | |
private static final String HTTP_CONN_TIMEOUT = "httpConnTimeout"; | |
private static final String HTTP_READ_TIMEOUT = "httpReadTimeout"; | |
private static final String HTTP_BASIC_AUTH_USER = "httpBasicAuthUser"; | |
private static final String HTTP_BASIC_AUTH_PASSWORD = "httpBasicAuthPassword"; | |
static final String INDEX_REPLICATED_AT = "indexReplicatedAt"; | |
static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated"; | |
static final String CONF_FILES_REPLICATED = "confFilesReplicated"; | |
static final String CONF_FILES_REPLICATED_AT = "confFilesReplicatedAt"; | |
static final String TIMES_CONFIG_REPLICATED = "timesConfigReplicated"; | |
static final String LAST_CYCLE_BYTES_DOWNLOADED = "lastCycleBytesDownloaded"; | |
static final String TIMES_FAILED = "timesFailed"; | |
static final String REPLICATION_FAILED_AT = "replicationFailedAt"; | |
static final String PREVIOUS_CYCLE_TIME_TAKEN = "previousCycleTimeInSeconds"; | |
static final String INDEX_REPLICATED_AT_LIST = "indexReplicatedAtList"; | |
static final String REPLICATION_FAILED_AT_LIST = "replicationFailedAtList"; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment