Modified MapReduceServlet.java and accompanying configuration files to allow the start of Map Reduce tasks via HTTP GET
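With this change, a MapReduce job can be started with a plain HTTP GET to the start_job command, which is the kind of request App Engine cron issues. For example, using the periodReset configuration defined in mapreduce.xml below:

    GET /mapreduce/command/start_job?name=periodReset&mapper_params.mapreduce.mapper.inputformat.datastoreinputformat.entitykind=Account

Each query parameter prefixed with mapper_params. is passed through (minus the prefix) as a template parameter to the named configuration.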
cron.xml
<?xml version="1.0" encoding="UTF-8"?>
<cronentries>
  <cron>
    <url>/mapreduce/command/start_job?mapper_params.mapreduce.mapper.inputformat.datastoreinputformat.entitykind=Account&amp;name=periodReset</url>
    <description>Reset the period counters</description>
    <schedule>every day 00:00</schedule>
    <timezone>America/Chicago</timezone>
  </cron>
</cronentries>
mapreduce.xml
<configurations>
  <configuration name="periodReset">
    <property>
      <name>mapreduce.map.class</name>
      <!-- Set this to be your Mapper class -->
      <value>com.roycefullerton.mappers.PeriodResetMapper</value>
    </property>
    <!-- This is a default tool that lets us iterate over datastore entities -->
    <property>
      <name>mapreduce.inputformat.class</name>
      <value>com.google.appengine.tools.mapreduce.DatastoreInputFormat</value>
    </property>
    <property>
      <name human="Entity Kind to Map Over">mapreduce.mapper.inputformat.datastoreinputformat.entitykind</name>
      <value template="optional">Account</value>
    </property>
  </configuration>
</configurations>
MapReduceServlet.java
/*
 * Copyright 2010 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Modified by Royce Fullerton on 2010.10.28 to allow the startup of MapReduce tasks via HTTP GET,
 * in order to make it easier to run MapReduce tasks from cron jobs.
 * It is advised to put admin-only security constraints for /mapreduce/* in your web.xml file.
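 * For example, a minimal sketch of such a constraint (App Engine cron requests
 * can still reach admin-protected URLs, so this does not break cron invocation):
 *
 * <security-constraint>
 *   <web-resource-collection>
 *     <web-resource-name>mapreduce</web-resource-name>
 *     <url-pattern>/mapreduce/*</url-pattern>
 *   </web-resource-collection>
 *   <auth-constraint>
 *     <role-name>admin</role-name>
 *   </auth-constraint>
 * </security-constraint>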
 */
package com.google.appengine.tools.mapreduce;

import com.google.appengine.api.datastore.Cursor;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.labs.taskqueue.Queue;
import com.google.appengine.api.labs.taskqueue.TaskAlreadyExistsException;
import com.google.appengine.api.labs.taskqueue.TaskOptions;
import com.google.appengine.api.memcache.MemcacheServiceFactory;
import com.google.common.base.Preconditions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.Mapper.Context;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
 * Servlet for all MapReduce API related functions.
 *
 * This should be specified as the handler for MapReduce URLs in web.xml.
 * For instance:
 * <pre>
 * {@code
 * <servlet>
 *   <servlet-name>mapreduce</servlet-name>
 *   <servlet-class>com.google.appengine.tools.mapreduce.MapReduceServlet</servlet-class>
 * </servlet>
 * <servlet-mapping>
 *   <servlet-name>mapreduce</servlet-name>
 *   <url-pattern>/mapreduce/*</url-pattern>
 * </servlet-mapping>
 * }
 * </pre>
 *
 * Generally you'll want this handler to be protected by an admin security constraint
 * (see <a
 * href="http://code.google.com/appengine/docs/java/config/webxml.html#Security_and_Authentication">
 * Security and Authentication</a> for more details).
 *
 * @author frew@google.com (Fred Wulff)
 */
public class MapReduceServlet extends HttpServlet {
  // Default number of jobs to display per page in the list_jobs command.
  private static final int DEFAULT_JOBS_PER_PAGE_COUNT = 50;

  private static final Logger log = Logger.getLogger(MapReduceServlet.class.getName());

  // Amount of time to spend on actual map() calls per task execution.
  private static final int PROCESSING_TIME_PER_TASK_MS = 10000;

  /**
   * Default amount of quota to divvy up per controller execution.
   */
  public static final int DEFAULT_QUOTA_BATCH_SIZE = 20;

  // VisibleForTesting
  static final String CONTROLLER_PATH = "controllerCallback";
  static final String START_PATH = "start";
  static final String MAPPER_WORKER_PATH = "mapperCallback";
  static final String COMMAND_PATH = "command";

  // Command paths
  static final String LIST_JOBS_PATH = "list_jobs";
  static final String LIST_CONFIGS_PATH = "list_configs";
  static final String CLEANUP_JOB_PATH = "cleanup_job";
  static final String ABORT_JOB_PATH = "abort_job";
  static final String GET_JOB_DETAIL_PATH = "get_job_detail";
  static final String START_JOB_PATH = "start_job";

  private DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
  private Clock clock = new SystemClock();
  /**
   * Returns the portion of the URL from the end of the TLD (exclusive) to the
   * handler portion (exclusive).
   *
   * For example, getBase(https://www.google.com/foo/bar) -> /foo/
   * However, there are handler portions that take more than one segment
   * (currently only the command handlers). So in that case, we have:
   * getBase(https://www.google.com/foo/command/bar) -> /foo/
   */
  static String getBase(HttpServletRequest request) {
    String fullPath = request.getRequestURI();
    int baseEnd = getDividingIndex(fullPath);
    return fullPath.substring(0, baseEnd + 1);
  }

  /**
   * Finds the index of the "/" separating the base from the handler.
   */
  private static int getDividingIndex(String fullPath) {
    int baseEnd = fullPath.lastIndexOf("/");
    if (fullPath.substring(0, baseEnd).endsWith(COMMAND_PATH)) {
      baseEnd = fullPath.substring(0, baseEnd).lastIndexOf("/");
    }
    return baseEnd;
  }
  /**
   * Returns the handler portion of the URL path.
   *
   * For example, getHandler(https://www.google.com/foo/bar) -> bar
   * Note that for command handlers,
   * getHandler(https://www.google.com/foo/command/bar) -> command/bar
   */
  static String getHandler(HttpServletRequest request) {
    String requestURI = request.getRequestURI();
    return requestURI.substring(getDividingIndex(requestURI) + 1);
  }
  /**
   * Checks to ensure that the current request was sent via the task queue.
   *
   * If the request is not in the task queue, returns false, and sets the
   * response status code to 403. This protects against CSRF attacks against
   * task queue-only handlers.
   *
   * @return true if the request is a task queue request
   */
  private boolean checkForTaskQueue(HttpServletRequest request, HttpServletResponse response) {
    if (request.getHeader("X-AppEngine-QueueName") == null) {
      log.log(Level.SEVERE, "Received unexpected non-task queue request. Possible CSRF attack.");
      try {
        response.sendError(
            HttpServletResponse.SC_FORBIDDEN, "Received unexpected non-task queue request.");
      } catch (IOException ioe) {
        throw new RuntimeException("Encountered error writing error", ioe);
      }
      return false;
    }
    return true;
  }
  /**
   * Checks to ensure that the current request was sent via an AJAX request.
   *
   * If the request was not sent by an AJAX request, returns false, and sets
   * the response status code to 403. This protects against CSRF attacks against
   * AJAX-only handlers.
   *
   * @return true if the request is an AJAX request
   */
  private boolean checkForAjax(HttpServletRequest request, HttpServletResponse response) {
    if (!"XMLHttpRequest".equals(request.getHeader("X-Requested-With"))) {
      log.log(
          Level.SEVERE, "Received unexpected non-XMLHttpRequest command. Possible CSRF attack.");
      try {
        response.sendError(HttpServletResponse.SC_FORBIDDEN,
            "Received unexpected non-XMLHttpRequest command.");
      } catch (IOException ioe) {
        throw new RuntimeException("Encountered error writing error", ioe);
      }
      return false;
    }
    return true;
  }
  /**
   * Handles all MapReduce callbacks.
   */
  @Override
  public void doPost(HttpServletRequest request, HttpServletResponse response) {
    String handler = MapReduceServlet.getHandler(request);
    if (handler.startsWith(CONTROLLER_PATH)) {
      if (!checkForTaskQueue(request, response)) {
        return;
      }
      handleController(request, response);
    } else if (handler.startsWith(MAPPER_WORKER_PATH)) {
      if (!checkForTaskQueue(request, response)) {
        return;
      }
      handleMapperWorker(request, response);
    } else if (handler.startsWith(START_PATH)) {
      // We don't add a GET handler for this one, since we're expecting the user
      // to POST the whole XML specification.
      // TODO(frew): Make name customizable.
      // TODO(frew): Add ability to specify a redirect.
      handleStart(
          ConfigurationXmlUtil.getConfigurationFromXml(request.getParameter("configuration")),
          "Automatically run request", request);
    } else if (handler.startsWith(COMMAND_PATH)) {
      if (!checkForAjax(request, response)) {
        return;
      }
      handleCommand(handler.substring(COMMAND_PATH.length() + 1), request, response);
    } else {
      throw new RuntimeException(
          "Received an unknown MapReduce request handler. See logs for more detail.");
    }
  }
  /**
   * Handles all MapReduce status page and static resource requests.
   */
  @Override
  public void doGet(HttpServletRequest request, HttpServletResponse response) {
    String handler = MapReduceServlet.getHandler(request);
    if (handler.startsWith(COMMAND_PATH)) {
      /*** removed for cron invocation - RJF
      if (!checkForAjax(request, response)) {
        return;
      }**/
      handleCommand(handler.substring(COMMAND_PATH.length() + 1), request, response);
    } else {
      handleStaticResources(handler, response);
    }
  }
  /**
   * Handles all status page commands.
   */
  public void handleCommand(
      String command, HttpServletRequest request, HttpServletResponse response) {
    JSONObject retValue = null;
    response.setContentType("application/json");
    boolean isPost = "POST".equals(request.getMethod());
    try {
      if (command.equals(LIST_CONFIGS_PATH) && !isPost) {
        MapReduceXml xml;
        try {
          xml = MapReduceXml.getMapReduceXmlFromFile();
          retValue = handleListConfigs(xml);
        } catch (FileNotFoundException e) {
          retValue = new JSONObject();
          retValue.put("status", "Couldn't find mapreduce.xml file");
        }
      } else if (command.equals(LIST_JOBS_PATH) && !isPost) {
        String cursor = request.getParameter("cursor");
        String countString = request.getParameter("count");
        int count = DEFAULT_JOBS_PER_PAGE_COUNT;
        if (countString != null) {
          count = Integer.parseInt(countString);
        }
        retValue = handleListJobs(cursor, count);
      } else if (command.equals(CLEANUP_JOB_PATH) && isPost) {
        retValue = handleCleanupJob(request.getParameter("mapreduce_id"));
      } else if (command.equals(ABORT_JOB_PATH) && isPost) {
        retValue = handleAbortJob(request.getParameter("mapreduce_id"));
      } else if (command.equals(GET_JOB_DETAIL_PATH) && !isPost) {
        retValue = handleGetJobDetail(request.getParameter("mapreduce_id"));
      //} else if (command.equals(START_JOB_PATH) && isPost) { //removed for cron invocation - RJF
      } else if (command.equals(START_JOB_PATH)) {
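        // With the isPost check removed above, start_job now accepts both the
        // status page's POST and a cron job's GET, e.g.:
        //   /mapreduce/command/start_job?name=periodReset&mapper_params.<key>=<value>
        // Each "mapper_params."-prefixed query parameter is passed through
        // (minus the prefix) as a template parameter of the named configuration.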
        Map<String, String> templateParams = new TreeMap<String, String>();
        Map httpParams = request.getParameterMap();
        for (Object paramObject : httpParams.keySet()) {
          String param = (String) paramObject;
          if (param.startsWith("mapper_params.")) {
            templateParams.put(param.substring("mapper_params.".length()),
                ((String[]) httpParams.get(param))[0]);
          }
        }
        retValue = handleStartJob(templateParams, ((String[]) httpParams.get("name"))[0], request);
      } else {
        response.sendError(404);
        return;
      }
    } catch (Throwable t) {
      log.log(Level.SEVERE, "Got exception while running command", t);
      try {
        retValue = new JSONObject();
        retValue.put("error_class", t.getClass().getName());
        retValue.put("error_message",
            "Full stack trace is available in the server logs. Message: "
            + t.getMessage());
      } catch (JSONException e) {
        throw new RuntimeException("Couldn't create error JSON object", e);
      }
    }
    try {
      response.getWriter().print(retValue.toString());
      response.getWriter().flush();
    } catch (IOException e) {
      throw new RuntimeException("Couldn't write command response", e);
    }
  }
  /**
   * Handle the list_configs AJAX command.
   */
  public JSONObject handleListConfigs(MapReduceXml xml) {
    JSONObject retValue = new JSONObject();
    JSONArray configArray = new JSONArray();
    Set<String> names = xml.getConfigurationNames();
    for (String name : names) {
      String configXml = xml.getTemplateAsXmlString(name);
      ConfigurationTemplatePreprocessor preprocessor =
          new ConfigurationTemplatePreprocessor(configXml);
      configArray.put(preprocessor.toJson(name));
    }
    try {
      retValue.put("configs", configArray);
    } catch (JSONException e) {
      throw new RuntimeException("Hard coded string is null", e);
    }
    return retValue;
  }
  /**
   * Handle the list_jobs AJAX command.
   */
  public JSONObject handleListJobs(String cursor, int count) {
    List<MapReduceState> states = new ArrayList<MapReduceState>();
    Cursor newCursor = MapReduceState.getMapReduceStates(ds, cursor, count, states);
    JSONArray jobs = new JSONArray();
    for (MapReduceState state : states) {
      jobs.put(state.toJson(false));
    }

    JSONObject retValue = new JSONObject();
    try {
      retValue.put("jobs", jobs);
      if (newCursor != null) {
        retValue.put("cursor", newCursor.toWebSafeString());
      }
    } catch (JSONException e) {
      throw new RuntimeException("Hard coded string is null", e);
    }

    return retValue;
  }

  /**
   * Handle the cleanup_job AJAX command.
   */
  public JSONObject handleCleanupJob(String jobId) {
    JSONObject retValue = new JSONObject();
    try {
      try {
        MapReduceState.getMapReduceStateFromJobID(ds, JobID.forName(jobId)).delete();
        retValue.put("status", "Successfully deleted requested job.");
      } catch (IllegalArgumentException e) {
        retValue.put("status", "Couldn't find requested job.");
      } catch (EntityNotFoundException e) {
        retValue.put("status", "Couldn't find requested job.");
      }
    } catch (JSONException e) {
      throw new RuntimeException("Hard coded string is null", e);
    }
    return retValue;
  }

  /**
   * Handle the abort_job AJAX command.
   */
  public JSONObject handleAbortJob(String jobId) {
    // TODO(frew): Implement
    return new JSONObject();
  }
  /**
   * Handle the get_job_detail AJAX command.
   */
  public JSONObject handleGetJobDetail(String jobId) {
    MapReduceState state;
    try {
      state = MapReduceState.getMapReduceStateFromJobID(ds, JobID.forName(jobId));
    } catch (EntityNotFoundException e) {
      throw new IllegalArgumentException("Couldn't find MapReduce for id: " + jobId, e);
    }
    return state.toJson(true);
  }

  /**
   * Handle the start_job AJAX command.
   */
  public JSONObject handleStartJob(Map<String, String> params, String name,
      HttpServletRequest request) {
    try {
      MapReduceXml mrXml = MapReduceXml.getMapReduceXmlFromFile();
      Configuration configuration = mrXml.instantiateConfiguration(name, params);
      // TODO(frew): What should we be doing here for error handling?
      String jobId = handleStart(configuration, name, request);
      JSONObject retValue = new JSONObject();
      try {
        retValue.put("mapreduce_id", jobId);
      } catch (JSONException e) {
        throw new RuntimeException("Hard-coded string is null", e);
      }
      return retValue;
    } catch (FileNotFoundException e) {
      throw new RuntimeException("Couldn't find mapreduce.xml", e);
    }
  }
  /**
   * Update the current MR state by aggregating information from shard states.
   *
   * @param mrState the current MR state
   * @param shardStates all shard states (active and inactive)
   */
  public void aggregateState(MapReduceState mrState, List<ShardState> shardStates) {
    List<Long> mapperCounts = new ArrayList<Long>();
    Counters counters = new Counters();
    for (ShardState shardState : shardStates) {
      Counters shardCounters = shardState.getCounters();
      // findCounter creates the counter if it doesn't exist.
      mapperCounts.add(shardCounters.findCounter(
          HadoopCounterNames.MAP_INPUT_RECORDS_GROUP,
          HadoopCounterNames.MAP_INPUT_RECORDS_NAME).getValue());

      for (CounterGroup shardCounterGroup : shardCounters) {
        for (Counter shardCounter : shardCounterGroup) {
          counters.findCounter(
              shardCounterGroup.getName(), shardCounter.getName()).increment(
                  shardCounter.getValue());
        }
      }
    }

    log.fine("Aggregated counters: " + counters);
    mrState.setCounters(counters);
    mrState.setProcessedCounts(mapperCounts);
  }
  /**
   * Refills quotas for all active shards based on the input processing rate.
   *
   * @param context context to get input processing rate from
   * @param mrState the MR state containing the last poll time
   * @param activeShardStates all active shard states
   */
  public void refillQuotas(AppEngineJobContext context, MapReduceState mrState,
      List<ShardState> activeShardStates) {
    if (activeShardStates.size() == 0) {
      return;
    }

    long lastPollTime = mrState.getLastPollTime();
    long currentPollTime = clock.currentTimeMillis();

    int inputProcessingRate = context.getInputProcessingRate();
    long totalQuotaRefill;
    // Initial quota fill
    if (lastPollTime == -1) {
      totalQuotaRefill = inputProcessingRate;
    } else {
      long delta = currentPollTime - lastPollTime;
      totalQuotaRefill = (long) (delta * inputProcessingRate / 1000.0);
    }
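    // For example, at an input processing rate of 1000 records/sec and a
    // 2000 ms gap since the last poll, 2000 units of quota are refilled in
    // total, split evenly across the active shards.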
    long perShardQuotaRefill = totalQuotaRefill / activeShardStates.size();

    QuotaManager manager = new QuotaManager(MemcacheServiceFactory.getMemcacheService());
    for (ShardState activeShardState : activeShardStates) {
      manager.put(activeShardState.getTaskAttemptID().toString(), perShardQuotaRefill);
    }
    mrState.setLastPollTime(currentPollTime);
  }
  /**
   * Handles the logic for a controller task queue invocation.
   *
   * The primary jobs of the controller are to aggregate state from the
   * MR workers (e.g. presenting an overall view of the counters), and to set
   * the quota for MR workers.
   */
  public void handleController(HttpServletRequest request, HttpServletResponse response) {
    AppEngineJobContext context = new AppEngineJobContext(request, false);
    try {
      List<ShardState> shardStates = ShardState.getShardStatesFromJobID(
          ds, context.getJobID());
      MapReduceState mrState = MapReduceState.getMapReduceStateFromJobID(
          ds, context.getJobID());

      List<ShardState> activeShardStates = selectActiveShards(shardStates);

      aggregateState(mrState, shardStates);
      mrState.setActiveShardCount(activeShardStates.size());
      mrState.setShardCount(shardStates.size());

      if (activeShardStates.size() == 0) {
        mrState.setDone();
      } else {
        refillQuotas(context, mrState, activeShardStates);
      }
      mrState.persist();

      if (MapReduceState.Status.ACTIVE.equals(mrState.getStatus())) {
        scheduleController(request, context, context.getSliceNumber() + 1);
      } else {
        deleteAllShards(shardStates);
        if (context.hasDoneCallback()) {
          scheduleDoneCallback(
              context.getDoneCallbackQueue(), context.getDoneCallbackUrl(),
              context.getJobID().toString());
        }
      }
    } catch (EntityNotFoundException enfe) {
      log.severe("Couldn't find the state for MapReduce: " + context.getJobID()
          + ". Aborting!");
      return;
    }
  }
  private void scheduleDoneCallback(Queue queue, String url, String jobId) {
    String taskName = ("done_callback" + jobId).replace('_', '-');
    try {
      queue.add(
          TaskOptions.Builder.method(TaskOptions.Method.POST)
              .url(url)
              .param("job_id", jobId)
              .taskName(taskName));
    } catch (TaskAlreadyExistsException e) {
      log.warning("Done callback task " + taskName + " already exists.");
    }
  }

  private void deleteAllShards(List<ShardState> shardStates) {
    List<Key> keys = new ArrayList<Key>();
    for (ShardState shardState : shardStates) {
      keys.add(shardState.getKey());
    }
    ds.delete(keys);
  }

  /**
   * Return all shards with status == ACTIVE.
   */
  private List<ShardState> selectActiveShards(List<ShardState> shardStates) {
    List<ShardState> activeShardStates = new ArrayList<ShardState>();
    for (ShardState shardState : shardStates) {
      if (ShardState.Status.ACTIVE.equals(shardState.getStatus())) {
        activeShardStates.add(shardState);
      }
    }
    return activeShardStates;
  }
  /**
   * Process one task invocation worth of
   * {@link AppEngineMapper#map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
   * calls. Also handles calling
   * {@link AppEngineMapper#taskSetup(org.apache.hadoop.mapreduce.Mapper.Context)}
   * and
   * {@link AppEngineMapper#taskCleanup(org.apache.hadoop.mapreduce.Mapper.Context)}
   * before and after any map calls made.
   *
   * @return whether this shard has more input to process ({@code false} once
   *         the record reader is exhausted)
   * @throws IOException if the provided {@code mapper} throws such an exception
   *         during execution
   * @throws InterruptedException if the provided {@code mapper} throws such an
   *         exception during execution
   */
  // VisibleForTesting
  <INKEY, INVALUE, OUTKEY, OUTVALUE>
  boolean processMapper(
      AppEngineMapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper,
      Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context context,
      QuotaConsumer consumer,
      long startTime)
      throws IOException, InterruptedException {
    boolean shouldShardContinue = true;
    if (consumer.check(1)) {
      mapper.taskSetup(context);
      while (clock.currentTimeMillis() < startTime + PROCESSING_TIME_PER_TASK_MS
          && consumer.consume(1)
          && (shouldShardContinue = context.nextKeyValue())) {
        mapper.map(context.getCurrentKey(), context.getCurrentValue(), context);

        Counter inputRecordsCounter = context.getCounter(
            HadoopCounterNames.MAP_INPUT_RECORDS_GROUP,
            HadoopCounterNames.MAP_INPUT_RECORDS_NAME);
        inputRecordsCounter.increment(1);
      }
      mapper.taskCleanup(context);
    } else {
      log.warning("No quota. Aborting!");
    }
    return shouldShardContinue;
  }
  /**
   * Does a single task queue invocation's worth of worker work. Also handles
   * calling
   * {@link AppEngineMapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)}
   * and
   * {@link AppEngineMapper#cleanup(org.apache.hadoop.mapreduce.Mapper.Context)}
   * as appropriate.
   */
  @SuppressWarnings("unchecked")
  public <INKEY, INVALUE, OUTKEY, OUTVALUE>
  void handleMapperWorker(HttpServletRequest request, HttpServletResponse response) {
    AppEngineJobContext jobContext = new AppEngineJobContext(request, false);
    AppEngineTaskAttemptContext taskAttemptContext = new AppEngineTaskAttemptContext(
        request, jobContext, ds);
    long startTime = clock.currentTimeMillis();
    log.fine("Running worker: " + taskAttemptContext.getTaskAttemptID() + " "
        + jobContext.getSliceNumber());
    try {
      AppEngineMapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper =
          (AppEngineMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>) taskAttemptContext.getMapper();
      InputSplit split = taskAttemptContext.getInputSplit();
      RecordReader<INKEY, INVALUE> reader =
          (RecordReader<INKEY, INVALUE>) taskAttemptContext.getRecordReader(split);
      DatastorePersistingStatusReporter reporter =
          new DatastorePersistingStatusReporter(taskAttemptContext.getShardState());
      AppEngineMapper.AppEngineContext context = getMapperContext(
          taskAttemptContext, mapper, split, reader, reporter);

      if (jobContext.getSliceNumber() == 0) {
        // This is the first invocation for this mapper.
        mapper.setup((Context) context);
      }

      QuotaConsumer consumer = getQuotaConsumer(taskAttemptContext);

      boolean shouldContinue = processMapper(mapper, (Context) context, consumer, startTime);

      if (shouldContinue) {
        taskAttemptContext.getShardState().setRecordReader(jobContext.getConfiguration(), reader);
      } else {
        taskAttemptContext.getShardState().setDone();
      }

      // This persists the shard state including the new record reader.
      reporter.persist();

      consumer.dispose();

      if (shouldContinue) {
        scheduleWorker(
            request, jobContext, context.getTaskAttemptID(), jobContext.getSliceNumber() + 1);
      } else {
        // This is the last invocation for this mapper.
        mapper.cleanup((Context) context);
      }
    } catch (IOException ioe) {
      // TODO(frew): Currently all user errors result in retry. We should
      // figure out some way to differentiate which should be fatal (or
      // maybe just have a memcache counter for each shard that causes us
      // to bail on repeated errors).
      throw new RuntimeException(ioe);
    } catch (SecurityException e) {
      throw new RuntimeException(
          "MapReduce framework doesn't have permission to instantiate classes.", e);
    } catch (InvocationTargetException e) {
      throw new RuntimeException("Got exception instantiating Mapper.Context", e);
    } catch (InterruptedException e) {
      throw new RuntimeException(
          "Got InterruptedException running Mapper. This should never happen.", e);
    }
  }
  /**
   * Get the QuotaConsumer for the current shard.
   */
  private QuotaConsumer getQuotaConsumer(AppEngineTaskAttemptContext taskAttemptContext) {
    QuotaManager manager = new QuotaManager(MemcacheServiceFactory.getMemcacheService());
    QuotaConsumer consumer = new QuotaConsumer(
        manager, taskAttemptContext.getTaskAttemptID().toString(), DEFAULT_QUOTA_BATCH_SIZE);
    return consumer;
  }

  /**
   * Get the mapper context for the current shard. Since there is currently
   * no reducer support, the output values are currently set to {@code null}.
   *
   * @return the newly initialized context
   * @throws InvocationTargetException if the constructor throws an exception
   */
  @SuppressWarnings("unchecked")
  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
  AppEngineMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.AppEngineContext getMapperContext(
      AppEngineTaskAttemptContext taskAttemptContext,
      AppEngineMapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper,
      InputSplit split,
      RecordReader<INKEY, INVALUE> reader,
      StatusReporter reporter) throws InvocationTargetException {
    Constructor<AppEngineMapper.AppEngineContext> contextConstructor;
    try {
      contextConstructor = AppEngineMapper.AppEngineContext.class.getConstructor(
          new Class[]{
              AppEngineMapper.class,
              Configuration.class,
              TaskAttemptID.class,
              RecordReader.class,
              RecordWriter.class,
              OutputCommitter.class,
              StatusReporter.class,
              InputSplit.class
          }
      );
      AppEngineMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.AppEngineContext context =
          contextConstructor.newInstance(
              mapper,
              taskAttemptContext.getConfiguration(),
              taskAttemptContext.getTaskAttemptID(),
              reader,
              null, /* not yet implemented */
              null, /* not yet implemented */
              reporter,
              split
          );
      return context;
    } catch (SecurityException e) {
      // Since we know the class we're calling, this is strictly a programming error.
      throw new RuntimeException("Couldn't initialize Mapper.Context", e);
    } catch (NoSuchMethodException e) {
      // Same
      throw new RuntimeException("Couldn't initialize Mapper.Context", e);
    } catch (IllegalArgumentException e) {
      // There's a small chance this could be a bad supplied argument,
      // but we should validate that earlier.
      throw new RuntimeException("Couldn't initialize Mapper.Context", e);
    } catch (InstantiationException e) {
      // Programming error
      throw new RuntimeException("Couldn't initialize Mapper.Context", e);
    } catch (IllegalAccessException e) {
      // Programming error
      throw new RuntimeException("Couldn't initialize Mapper.Context", e);
    }
  }

  // VisibleForTesting
  void setClock(Clock clock) {
    this.clock = clock;
  }
  /**
   * Handle the initial request to start the MapReduce.
   *
   * @return the JobID of the newly created MapReduce or {@code null} if the
   *         MapReduce couldn't be created.
   */
  public String handleStart(Configuration conf, String name, HttpServletRequest request) {
    AppEngineJobContext context = new AppEngineJobContext(conf, request, true);

    // Initialize InputSplits
    Class<? extends InputFormat<?, ?>> inputFormatClass;
    try {
      inputFormatClass = context.getInputFormatClass();
    } catch (ClassNotFoundException e) {
      throw new InvalidConfigurationException("Invalid input format class specified.", e);
    }
    InputFormat<?, ?> inputFormat;
    try {
      inputFormat = inputFormatClass.newInstance();
    } catch (InstantiationException e) {
      throw new InvalidConfigurationException(
          "Input format class must have a default constructor.", e);
    } catch (IllegalAccessException e) {
      throw new InvalidConfigurationException(
          "Input format class must have a visible constructor.", e);
    }

    List<InputSplit> splits;
    try {
      splits = inputFormat.getSplits(context);
    } catch (InterruptedException e) {
      throw new RuntimeException(
          "Thread got interrupted in a single-threaded environment. This shouldn't happen.", e);
    } catch (IOException e) {
      throw new RuntimeException(
          "Got an IOException while trying to make splits", e);
    }

    MapReduceState mrState = MapReduceState.generateInitializedMapReduceState(
        ds, name, context.getJobID(), System.currentTimeMillis());

    mrState.setConfigurationXML(
        ConfigurationXmlUtil.convertConfigurationToXml(
            context.getConfiguration()));

    // Abort if we don't have any splits
    if (splits == null || splits.size() == 0) {
      mrState.setDone();
      mrState.persist();
      return null;
    }

    mrState.persist();
    scheduleController(request, context, 0);

    scheduleShards(request, context, inputFormat, splits);
    return mrState.getJobID();
  }
  /**
   * Schedules a controller task queue invocation.
   *
   * @param req the current request
   * @param context this MR's job context
   * @param sliceNumber a counter that increments for each sequential, successful
   *        task queue invocation
   */
  // VisibleForTesting
  void scheduleController(HttpServletRequest req, AppEngineJobContext context, int sliceNumber) {
    String taskName = ("controller_" + context.getJobID() + "__" + sliceNumber).replace('_', '-');
    try {
      context.getControllerQueue().add(
          TaskOptions.Builder.method(TaskOptions.Method.POST)
              .url(getBase(req) + CONTROLLER_PATH)
              .param(AppEngineJobContext.JOB_ID_PARAMETER_NAME, context.getJobID().toString())
              .param(AppEngineJobContext.SLICE_NUMBER_PARAMETER_NAME, "" + sliceNumber)
              .countdownMillis(2000)
              .taskName(taskName));
    } catch (TaskAlreadyExistsException e) {
      log.warning("Controller task " + taskName + " already exists.");
    }
  }

  /**
   * Schedules a worker task on the appropriate queue.
   *
   * @param req the current servlet request
   * @param context the context for this MR job
   * @param taskAttemptId the task attempt ID for this worker
   * @param sliceNumber a counter that increments for each sequential, successful
   *        task queue invocation
   */
  // VisibleForTesting
  void scheduleWorker(HttpServletRequest req, AppEngineJobContext context,
      TaskAttemptID taskAttemptId, int sliceNumber) {
    Preconditions.checkArgument(
        context.getJobID().equals(taskAttemptId.getJobID()),
        "Worker task must be for this MR job");

    String taskName = ("worker_" + taskAttemptId + "__" + sliceNumber).replace('_', '-');
    try {
      context.getWorkerQueue().add(
          TaskOptions.Builder.method(TaskOptions.Method.POST)
              .url(getBase(req) + MAPPER_WORKER_PATH)
              .param(AppEngineTaskAttemptContext.TASK_ATTEMPT_ID_PARAMETER_NAME,
                  "" + taskAttemptId)
              .param(AppEngineJobContext.JOB_ID_PARAMETER_NAME, "" + taskAttemptId.getJobID())
              .param(AppEngineJobContext.SLICE_NUMBER_PARAMETER_NAME, "" + sliceNumber)
              .taskName(taskName));
    } catch (TaskAlreadyExistsException e) {
      log.warning("Worker task " + taskName + " already exists.");
    }
  }
  /**
   * Schedules the initial worker callback execution for all shards.
   *
   * @param req the current request
   * @param context this MR's context
   * @param format the input format to use for generating {@code RecordReader}s
   *        from the {@code InputSplit}s
   * @param splits all input splits for this MR
   */
  // VisibleForTesting
  void scheduleShards(HttpServletRequest req, AppEngineJobContext context,
      InputFormat<?, ?> format, List<InputSplit> splits) {
    // TODO(frew): To make life easy for people using InputFormats
    // from general Hadoop, we should add support for grouping
    // InputFormats that generate many splits into a reasonable
    // number of shards.

    // TODO(frew): We will pass along the configuration so that worker tasks
    // don't have to read the MapReduceState whenever task queue supports
    // reasonable size payloads.

    int i = 0;
    for (InputSplit split : splits) {
      Configuration conf = context.getConfiguration();
      TaskAttemptID taskAttemptId = new TaskAttemptID(
          new TaskID(context.getJobID(), true, i), 1);
      ShardState shardState = ShardState.generateInitializedShardState(ds, taskAttemptId);
      shardState.setInputSplit(conf, split);
      AppEngineTaskAttemptContext attemptContext = new AppEngineTaskAttemptContext(
          context, shardState, taskAttemptId);
      try {
        RecordReader<?, ?> reader = format.createRecordReader(split, attemptContext);
        shardState.setRecordReader(conf, reader);
      } catch (IOException e) {
        throw new RuntimeException(
            "Got an IOException creating a record reader.", e);
      } catch (InterruptedException e) {
        throw new RuntimeException(
            "Got an interrupted exception in a single threaded environment.", e);
      }
      shardState.persist();
      scheduleWorker(req, context, taskAttemptId, 0);
      i++;
    }
  }
  /**
   * Handle serving of static resources (which we do dynamically so users
   * only have to add one entry to their web.xml).
   */
  public void handleStaticResources(String handler, HttpServletResponse response) {
    String fileName = null;
    if (handler.equals("status")) {
      response.setContentType("text/html");
      fileName = "overview.html";
    } else if (handler.equals("detail")) {
      response.setContentType("text/html");
      fileName = "detail.html";
    } else if (handler.equals("base.css")) {
      response.setContentType("text/css");
      fileName = "base.css";
    } else if (handler.equals("jquery.js")) {
      response.setContentType("text/javascript");
      fileName = "jquery-1.4.2.min.js";
    } else if (handler.equals("status.js")) {
      response.setContentType("text/javascript");
      fileName = "status.js";
    } else {
      try {
        response.sendError(404);
      } catch (IOException e) {
        throw new RuntimeException("Encountered error sending 404", e);
      }
      return;
    }
    response.setHeader("Cache-Control", "public, max-age=300");
    try {
      InputStream resourceStream = MapReduceServlet.class.getResourceAsStream(
          "/com/google/appengine/tools/mapreduce/" + fileName);
      OutputStream responseStream = response.getOutputStream();
      byte[] buffer = new byte[1024];
      int bytesRead;
      while ((bytesRead = resourceStream.read(buffer)) != -1) {
        responseStream.write(buffer, 0, bytesRead);
      }
      responseStream.flush();
    } catch (FileNotFoundException e) {
      throw new RuntimeException("Couldn't find static file for MapReduce library", e);
    } catch (IOException e) {
      throw new RuntimeException("Couldn't read static file for MapReduce library", e);
    }
  }
}