Skip to content

Instantly share code, notes, and snippets.

@jardineworks
Last active June 15, 2020 23:28
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save jardineworks/c4e73bfcc770dd17fbc0 to your computer and use it in GitHub Desktop.
dev.life | Parallel Processing
SECTION #0: Introduction and Presentation
=========================================
(use slideshow)
SECTION #1: Creating a "Report Request" portlet plugin
=======================================================
1. Create a new MVC portlet plugin project (in eclipse) called "dev-life-report"
2. Create a new class com.jardineworks.dev.life.liferay.portlet.report.ReportRequestPortlet in the src/main/java folder with the following definition.
package com.jardineworks.dev.life.liferay.portlet.report;
import java.io.IOException;
import javax.portlet.ActionRequest;
import javax.portlet.ActionResponse;
import javax.portlet.PortletException;
import javax.portlet.ProcessAction;
import javax.portlet.RenderRequest;
import javax.portlet.RenderResponse;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.util.ParamUtil;
import com.liferay.util.bridges.mvc.MVCPortlet;
/**
* <p>
* Implementation class for the ReportRequest portlet. This class is used to process requests sent to the Report Request Portlet.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class ReportRequestPortlet extends MVCPortlet
{
    // class logger
    private static final Log _log = LogFactoryUtil.getLog(ReportRequestPortlet.class);

    // names of the request parameters read by the requestReport action
    public static final String PARAM_REPORT_NAME = "reportName";
    public static final String PARAM_REPORT_SECTION_SUMMARY = "reportSectionSummary";
    public static final String PARAM_REPORT_SECTION_COSTS = "reportSectionCosts";
    public static final String PARAM_REPORT_SECTION_ROI = "reportSectionROI";
    public static final String PARAM_REPORT_SECTION_METRICS = "reportSectionMetrics";
    public static final String PARAM_REPORT_SECTION_RECOMMENDATIONS = "reportSectionRecommendations";

    /**
     * Handler for the RENDER phase of the portlet lifecycle. Simply delegates to the parent implementation.
     *
     * @param renderRequest {@link javax.portlet.RenderRequest}
     * @param renderResponse {@link javax.portlet.RenderResponse}
     * @throws PortletException propagated from the parent implementation
     * @throws IOException propagated from the parent implementation
     */
    @Override
    public void render(RenderRequest renderRequest, RenderResponse renderResponse) throws PortletException, IOException
    {
        super.render(renderRequest, renderResponse);
    }

    /**
     * Handler for the ACTION phase of the portlet lifecycle. Reads the report name and the requested
     * report sections from the request. Any failure is logged and never propagated to the caller.
     *
     * @param actionRequest {@link javax.portlet.ActionRequest}
     * @param actionResponse {@link javax.portlet.ActionResponse}
     */
    @ProcessAction(name = "requestReport")
    public void requestReport(ActionRequest actionRequest, ActionResponse actionResponse)
    {
        if ( _log.isInfoEnabled() )
        {
            _log.info("Report request recevied.");
        }

        String reportName = ParamUtil.getString(actionRequest, PARAM_REPORT_NAME, null);

        // section flags are read up front; nothing in this version of the class consumes them yet
        boolean summary = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_SUMMARY);
        boolean costs = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_COSTS);
        boolean roi = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_ROI);
        boolean metrics = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_METRICS);
        boolean recommendations = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_RECOMMENDATIONS);

        try
        {
            // create the report request here
        }
        catch (Exception e)
        {
            // no report id exists at this point, so the request is identified by its name
            _log.error("Error occurred while trying to save request for report with name " + reportName, e);
        }

        if ( _log.isInfoEnabled() )
        {
            _log.info("Report request " + reportName + " processed.");
        }
    }
}
3. Update the portlet.xml file with the following definition.
<?xml version="1.0"?>
<portlet-app xmlns="http://java.sun.com/xml/ns/portlet/portlet-app_2_0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/portlet/portlet-app_2_0.xsd http://java.sun.com/xml/ns/portlet/portlet-app_2_0.xsd" version="2.0">
<portlet>
<portlet-name>dev-liferay-report-portlet</portlet-name>
<display-name>Report Request Portlet</display-name>
<portlet-class>com.jardineworks.dev.life.liferay.portlet.report.ReportRequestPortlet</portlet-class>
<init-param>
<name>view-template</name>
<value>/html/report-request/view.jsp</value>
</init-param>
<expiration-cache>0</expiration-cache>
<supports>
<mime-type>text/html</mime-type>
</supports>
<portlet-info>
<title>Report Request</title>
<short-title>Report Request</short-title>
<keywords>reporting, request</keywords>
</portlet-info>
<security-role-ref>
<role-name>administrator</role-name>
</security-role-ref>
<security-role-ref>
<role-name>guest</role-name>
</security-role-ref>
<security-role-ref>
<role-name>power-user</role-name>
</security-role-ref>
<security-role-ref>
<role-name>user</role-name>
</security-role-ref>
</portlet>
</portlet-app>
4. Update the liferay-display.xml found in the WEB-INF folder with the following
<?xml version="1.0"?>
<!DOCTYPE display PUBLIC "-//Liferay//DTD Display 6.2.0//EN" "http://www.liferay.com/dtd/liferay-display_6_2_0.dtd">
<display>
<category name="category.dev.life">
<portlet id="dev-liferay-report-portlet" />
</category>
</display>
5. Create the following folder structure under webapp /html/common and /html/report-request
6. Add the JSTL libraries and taglibs to the project using the liferay-plugin-package.properties file
7. Inside the /html/common create a file called init.jsp with the following content
<%@ taglib uri="http://java.sun.com/portlet_2_0" prefix="portlet" %>
<%@ taglib uri="http://alloy.liferay.com/tld/aui" prefix="aui" %>
<%@ taglib uri="http://liferay.com/tld/ui" prefix="liferay-ui" %>
<%@ taglib uri="http://liferay.com/tld/util" prefix="liferay-util" %>
<%@ taglib uri="http://liferay.com/tld/theme" prefix="theme" %>
<%@ taglib uri="http://liferay.com/tld/portlet" prefix="liferay-portlet" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/fmt" prefix="fmt" %>
<%@ page import="javax.portlet.PortletPreferences"%>
<%@ page import="com.liferay.portlet.PortletPreferencesFactoryUtil"%>
<%@ page import="com.liferay.portal.kernel.util.Validator"%>
<%@ page import="com.liferay.portal.kernel.util.ParamUtil"%>
<%@ page import="com.liferay.portal.kernel.util.GetterUtil"%>
<%@ page import="com.liferay.portal.kernel.util.StringPool"%>
<%@ page import="com.liferay.portal.kernel.util.Constants"%>
<%@ page import="java.util.List" %>
<%@ page import="java.util.Map" %>
<%@ page import="java.util.ArrayList" %>
<%@ page import="java.util.Arrays" %>
<%@ page import="java.util.Iterator" %>
<%@ page import="com.liferay.portal.kernel.log.LogFactoryUtil" %>
<%@ page import="com.liferay.portal.kernel.log.Log" %>
<%@page import="com.liferay.portlet.PortletURLFactoryUtil"%>
<%@page import="com.liferay.portal.kernel.portlet.LiferayPortletURL"%>
<%@page import="com.liferay.portal.kernel.portlet.LiferayPortlet"%>
<%@page import="com.liferay.portal.service.LayoutLocalServiceUtil"%>
<%@page import="com.liferay.portal.model.Layout"%>
<portlet:defineObjects />
<theme:defineObjects/>
<%
PortletPreferences preferences = renderRequest.getPreferences();
String portletResource = ParamUtil.getString( request, "portletResource" );
if ( Validator.isNotNull( portletResource ) )
{
preferences = PortletPreferencesFactoryUtil.getPortletSetup( request, portletResource );
}
String redirect = ParamUtil.getString( request, "redirect" );
%>
8. Inside /html/report-request create a file called init.jsp that pulls in the common declarations, e.g. <%@ include file="/html/common/init.jsp" %>
9. Move the view.jsp from the webapp root to /html/report-request (the location referenced by the view-template init-param in portlet.xml) and update the content with the following
<%@ include file="init.jsp" %>
<%--
  Report request form. Each checkbox is posted under its own parameter name so that it lines up with
  the PARAM_REPORT_SECTION_* names that ReportRequestPortlet.requestReport reads as booleans.
--%>
<portlet:actionURL name="requestReport" var="reportRequestActionURL" />
<aui:form name="fm" method="post" action="${ reportRequestActionURL }">
    <aui:fieldset>
        <aui:input name="reportName" />
    </aui:fieldset>
    <aui:fieldset>
        <aui:input type="checkbox" name="reportSectionSummary" label="report-section-summary" inlineLabel="true" />
        <aui:input type="checkbox" name="reportSectionCosts" label="report-section-costs" inlineLabel="true" />
        <aui:input type="checkbox" name="reportSectionROI" label="report-section-roi" inlineLabel="true" />
        <aui:input type="checkbox" name="reportSectionMetrics" label="report-section-metrics" inlineLabel="true" />
        <aui:input type="checkbox" name="reportSectionRecommendations" label="report-section-recommendations" inlineLabel="true" />
    </aui:fieldset>
    <aui:button-row>
        <aui:button type="submit" value="report-request-submit" />
    </aui:button-row>
</aui:form>
10. Create a new package/class com.jardineworks.dev.life.liferay.portlet.report.util.ReportWebKeys with the following definition
package com.jardineworks.dev.life.liferay.portlet.report.util;
/**
* <p>
* Constants class used to store attribute names for values that are stored in request or response objects, sessions, etc.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public final class ReportWebKeys
{
    /**
     * Attribute name for the "next report id" value. NOTE(review): where this attribute is stored
     * and who reads it is not visible in this class -- confirm against its callers.
     */
    public static final String NEXT_REPORT_ID = "nextReportId";

    /**
     * Constants holder -- not meant to be instantiated.
     */
    private ReportWebKeys()
    {
    }
}
11. Create a new package/class com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus with the following definition
package com.jardineworks.dev.life.liferay.portlet.report.util;
/**
* <p>
* Constants used to represent the various status for the jobs.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public final class ReportStatus
{
    /** Status value "0": pending. */
    public static final String STATUS_PENDING = "0";

    /** Status value "1": processing. */
    public static final String STATUS_PROCESSING = "1";

    /** Status value "2": complete. */
    public static final String STATUS_COMPLETE = "2";

    /** Status value "-1": error. */
    public static final String STATUS_ERROR = "-1";

    /**
     * Constants holder -- not meant to be instantiated. Compare status values with
     * {@link String#equals(Object)}, never with {@code ==}.
     */
    private ReportStatus()
    {
    }
}
12. Now we need to add a service builder definition so that we can save the requests somewhere. In the WEB-INF folder create a new file called service.xml with the following definition.
<!DOCTYPE service-builder PUBLIC "-//Liferay//DTD Service Builder 6.2.0//EN" "http://www.liferay.com/dtd/liferay-service-builder_6_2_0.dtd">
<service-builder package-path="com.jardineworks.dev.life.liferay.portlet.report">
<namespace>DEVLIFE</namespace>
<entity name="Report" uuid="true" local-service="true" remote-service="false">
<!-- PK fields -->
<column name="reportId" type="long" primary="true" />
<!-- Group instance -->
<column name="groupId" type="long" />
<!-- Audit fields -->
<column name="companyId" type="long" />
<column name="userId" type="long" />
<column name="userName" type="String" />
<column name="createDate" type="Date" />
<column name="modifiedDate" type="Date" />
<!-- report fields -->
<column name="reportName" type="String" />
<column name="reportStatus" type="String" />
<column name="startTime" type="Date" />
<column name="endTime" type="Date" />
<!-- report section details -->
<column name="sectionSummary" type="boolean" />
<column name="sectionSummaryStatus" type="String" />
<column name="sectionCosts" type="boolean" />
<column name="sectionCostsStatus" type="String" />
<column name="sectionROI" type="boolean" />
<column name="sectionROIStatus" type="String" />
<column name="sectionMetrics" type="boolean" />
<column name="sectionMetricsStatus" type="String" />
<column name="sectionRecommendations" type="boolean" />
<column name="sectionRecommendationsStatus" type="String" />
<!-- Order -->
<order by="asc">
<order-column name="reportId" />
</order>
<!-- Finder methods -->
<finder name="ReportStatus" return-type="Collection">
<finder-column name="reportStatus" />
</finder>
</entity>
</service-builder>
13. Run the maven build-service goal to generate the source.
14. Update the ReportRequestPortlet class with the following definition --
package com.jardineworks.dev.life.liferay.portlet.report;
import java.io.IOException;
import java.util.Date;
import javax.portlet.ActionRequest;
import javax.portlet.ActionResponse;
import javax.portlet.PortletException;
import javax.portlet.ProcessAction;
import javax.portlet.RenderRequest;
import javax.portlet.RenderResponse;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.counter.service.CounterLocalServiceUtil;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.util.ParamUtil;
import com.liferay.portal.util.PortalUtil;
import com.liferay.util.bridges.mvc.MVCPortlet;
/**
* <p>
* Implementation class for the ReportRequest portlet. This class is used to process requests sent to the Report Request Portlet.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class ReportRequestPortlet extends MVCPortlet
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(ReportRequestPortlet.class);
// form parameters
public static final String PARAM_REPORT_NAME = "reportName";
public static final String PARAM_REPORT_SECTION_SUMMARY = "reportSectionSummary";
public static final String PARAM_REPORT_SECTION_COSTS = "reportSectionCosts";
public static final String PARAM_REPORT_SECTION_ROI = "reportSectionROI";
public static final String PARAM_REPORT_SECTION_METRICS = "reportSectionMetrics";
public static final String PARAM_REPORT_SECTION_RECOMMENDATIONS = "reportSectionRecommendations";
/**
* Handler for the RENDER phase of the portlet lifecycle.
*
* @param renderRequest {@link javax.portlet.RenderRequest}
* @param renderResponse {@link javax.portlet.RenderResponse}
*/
@Override
public void render(RenderRequest renderRequest, RenderResponse renderResponse) throws PortletException, IOException
{
super.render(renderRequest, renderResponse);
}
/**
* Handler for the ACTION phase of the portlet lifecycle.
*
* @param renderRequest {@link javax.portlet.ActionRequest}
* @param renderResponse {@link javax.portlet.ActionResponse}
*/
@ProcessAction(name = "requestReport")
public void requestReport(ActionRequest actionRequest, ActionResponse actionResponse)
{
if ( _log.isInfoEnabled() )
_log.info("Report request recevied.");
String reportName = ParamUtil.getString(actionRequest, PARAM_REPORT_NAME, null);
boolean summary = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_SUMMARY);
boolean costs = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_COSTS);
boolean roi = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_ROI);
boolean metrics = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_METRICS);
boolean recommendations = ParamUtil.getBoolean(actionRequest, PARAM_REPORT_SECTION_RECOMMENDATIONS);
try
{
// create the report request here
Report r = ReportLocalServiceUtil.createReport(CounterLocalServiceUtil.increment(Report.class.getCanonicalName()));
// report details
r.setReportName(reportName);
r.setSectionSummary(summary);
r.setSectionCosts(costs);
r.setSectionROI(roi);
r.setSectionMetrics(metrics);
r.setSectionRecommendations(recommendations);
// auditing fields
r.setUserId( PortalUtil.getUserId(actionRequest));
r.setUserName(PortalUtil.getUser(actionRequest).getEmailAddress());
r.setCompanyId(PortalUtil.getCompanyId(actionRequest));
r.setGroupId(PortalUtil.getScopeGroupId(actionRequest));
r.setCreateDate(new Date());
r.setModifiedDate(new Date());
// set the initial status
r.setReportStatus(ReportStatus.STATUS_PENDING);
ReportLocalServiceUtil.addReport(r);
}
catch (Exception e)
{
_log.error("Error occurred while trying to save request for report with name " + reportName, e);
}
if ( _log.isInfoEnabled() )
_log.info("Report request " + reportName + " processed.");
}
}
15. Deploy the portlet.
16. Create a few records and validate in the database.
SECTION #2: Adding a Scheduled Task
====================================
1. Create a new package/class com.jardineworks.dev.life.liferay.portlet.report.messaging.ReportMessageListener with the following definition
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
/**
* <p>
* Implementation class of message listener handler for "report" scheduled jobs that are fired.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class ReportMessageListener implements MessageListener
{
    // class logger
    private static final Log _log = LogFactoryUtil.getLog(ReportMessageListener.class);

    /**
     * Main entry point for the message listener. Failures raised while handling the message are
     * logged together with their stack trace and are not propagated to the caller.
     *
     * @param message {@link Message} object containing the details being sent from invoker to handler.
     * @throws MessageListenerException declared by the interface; this implementation does not throw it
     */
    @Override
    public void receive(Message message) throws MessageListenerException
    {
        try
        {
            _doReceive(message);
        }
        catch (Exception e)
        {
            // pass the exception along so the cause is not lost from the log
            _log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName(), e);
        }
    }

    /**
     * Delegate method used to orchestrate and or perform the listener actions.
     *
     * @param message {@link Message} object containing the details being sent from invoker to handler.
     */
    private void _doReceive(Message message)
    {
        // in here we can do whatever we need to but for the purposes of this example we'll just dump something
        // to the log so that we know that we got this far into the process.
        if (_log.isInfoEnabled())
        {
            _log.info("Starting to process message for " + this.getClass().getCanonicalName());
        }
    }
}
2. Open the liferay-portlet.xml file found in WEB-INF and update it with the following
<?xml version="1.0"?>
<!DOCTYPE liferay-portlet-app PUBLIC "-//Liferay//DTD Portlet Application 6.2.0//EN" "http://www.liferay.com/dtd/liferay-portlet-app_6_2_0.dtd">
<liferay-portlet-app>
<portlet>
<portlet-name>dev-liferay-report-portlet</portlet-name>
<icon>/icon.png</icon>
<scheduler-entry>
<scheduler-description>Message Listener used to process report requests as a job on a timed interval.</scheduler-description>
<scheduler-event-listener-class>com.jardineworks.dev.life.liferay.portlet.report.messaging.ReportMessageListener</scheduler-event-listener-class>
<trigger>
<simple>
<simple-trigger-value>1</simple-trigger-value>
<time-unit>minute</time-unit>
</simple>
</trigger>
</scheduler-entry>
<header-portlet-css>/css/main.css</header-portlet-css>
<footer-portlet-javascript>/js/main.js</footer-portlet-javascript>
</portlet>
<role-mapper>
<role-name>administrator</role-name>
<role-link>Administrator</role-link>
</role-mapper>
<role-mapper>
<role-name>guest</role-name>
<role-link>Guest</role-link>
</role-mapper>
<role-mapper>
<role-name>power-user</role-name>
<role-link>Power User</role-link>
</role-mapper>
<role-mapper>
<role-name>user</role-name>
<role-link>User</role-link>
</role-mapper>
</liferay-portlet-app>
3. We're going to update a couple of portal-ext properties to make sure that we don't run out of characters for our names. Open the file, add the following properties and restart the server.
## ---------------------------------------------------------------------
## Scheduler configuration
##
#
# Set this to false to disable all scheduler classes defined in
# liferay-portlet.xml.
#
scheduler.enabled=true
#
# Set the maximum length of description, group name, and job name fields.
#
scheduler.description.max.length=255
scheduler.group.name.max.length=255
scheduler.job.name.max.length=255
4. Deploy the portlet and validate that the INFO message from the _doReceive is invoked on a one minute interval.
5. For this demo we know we're going to be calling a service on a separate tomcat process. To do this we're using the Restlet API so let's add a maven repository and dependency.
<repositories>
<repository>
<id>maven-restlet</id>
<name>Public online Restlet repository</name>
<url>http://maven.restlet.com</url>
</repository>
</repositories>
<dependency>
<groupId>org.restlet.jse</groupId>
<artifactId>org.restlet</artifactId>
<version>2.1.2</version>
</dependency>
6. Now we'll update the message listener to actually process reports. We don't want to have to manually create the reports to test it out so this code will include a couple of methods you probably won't see in the wild. We'll use these methods to generate the reports for us prior to starting the processing. Update the message listener with the following code.
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import org.restlet.Client;
import org.restlet.Request;
import org.restlet.Response;
import org.restlet.data.Method;
import org.restlet.data.Protocol;
import org.restlet.data.Reference;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.counter.service.CounterLocalServiceUtil;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
/**
* <p>
* Implementation class of message listener handler for "report" scheduled jobs that are fired.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class ReportMessageListener implements MessageListener
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(ReportMessageListener.class);
/**
* Main entry point for the message listener.
*
* @param {@link Message} object containing the details being send from invoker to handler.
*/
@Override
public void receive(Message message) throws MessageListenerException
{
try
{
_doReceive(message);
}
catch (Exception e)
{
_log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName());
}
}
/**
* Delegate method used to orchestrate and or perform the listener actions.
*
* @param message
* {@link Message} object containing the details being send from invoker to handler.
*/
private void _doReceive(Message message)
{
// in here we can do whatever we need to but for the purposes of this example we'll just dump something
// to the log so that we know that we got this far into the process.
if (_log.isInfoEnabled())
_log.info("Starting to process message for " + this.getClass().getCanonicalName());
long processStart = 0L;
long processEnd = 0L;
long start = 0L;
long end = 0L;
try
{
// make some sample reports
List<Report> reports = _makeReports(5);
if ( _log.isInfoEnabled() )
_log.info("------------------------[ START ]-------------------------------");
processStart = System.currentTimeMillis();
for (Report r : reports)
{
if (r.getReportStatus() == ReportStatus.STATUS_PENDING)
{
if ( _log.isInfoEnabled() )
_log.info("Processing report " + r.getReportName() + "..." );
start = System.currentTimeMillis();
r.setReportStatus(ReportStatus.STATUS_PROCESSING);
ReportLocalServiceUtil.updateReport(r);
boolean result = _process(r);
if (result)
r.setReportStatus(ReportStatus.STATUS_COMPLETE);
else
r.setReportStatus(ReportStatus.STATUS_ERROR);
ReportLocalServiceUtil.updateReport(r);
end = System.currentTimeMillis();
if ( _log.isInfoEnabled() )
_log.info(r.getReportName() + " done in " + ((end - start) / 1000) + " seconds.");
}
}
processEnd = System.currentTimeMillis();
if ( _log.isInfoEnabled() )
_log.info("------------------------[ END ]-------------------------------");
}
catch (Exception e)
{
_log.error("Error occurred while processing reports.", e);
}
if ( _log.isInfoEnabled() )
_log.info("Total run time: " + ((processEnd - processStart) / 1000) );
}
/**
* Method used to fabricate reports for testing purposes only.
*
* @param makeCount
* integer representing the number of sample reports to be created.
*/
private List<Report> _makeReports(int makeCount)
{
List<Report> reports = new ArrayList<Report>(makeCount);
Random random = new Random();
for (int i = 0; i < makeCount; ++i)
{
try
{
// create the report request here
Report r = ReportLocalServiceUtil.createReport(CounterLocalServiceUtil.increment(Report.class.getCanonicalName()));
// report details
r.setReportName("TestReport" + r.getReportId());
if ( _log.isInfoEnabled() )
_log.info("Making sample report " + r.getReportName() );
// set the report sections to be created. For this case we want all reports
// to be set the same so that we can validate timing. To do this we use
// static settings -- but to simulate a real variant load we can use the
// same method but instead of static use random.nextBoolean() in the setter
r.setSectionSummary(true);
r.setSectionCosts(true);
r.setSectionROI(true);
r.setSectionMetrics(true);
r.setSectionRecommendations(true);
// auditing fields -- don't worry about some of the others for demo purposes
r.setCreateDate(new Date());
r.setModifiedDate(new Date());
// set the initial status
r.setReportStatus(ReportStatus.STATUS_PENDING);
ReportLocalServiceUtil.addReport(r);
reports.add(r);
}
catch (Exception e)
{
_log.error("Failed to add test report. => " + e.getMessage() + " :: " + e.getCause());
}
}
return reports;
}
/**
* Processes the report passed to the method.
*
* @param report
* {@link Report} object representing the report to be processed.
* @return boolean value of true if the report is processed without error, othewise false is returned.
*/
private boolean _process(Report report)
{
boolean result = Boolean.FALSE;
try
{
Client client = new Client(Protocol.HTTP);
Reference reference = new Reference();
reference.setProtocol(Protocol.HTTP);
reference.setHostDomain("localhost");
reference.setHostPort(9090);
reference.setPath("/dev-life-rest-service/rest/report/process/124/section/summary");
Request request = new Request(Method.GET, reference);
if ( _log.isDebugEnabled() )
_log.info("request: " + request.toString());
Response response = client.handle(request);
if ( _log.isDebugEnabled() )
_log.info("response: " + response.getEntity().getText());
result = Boolean.TRUE;
}
catch (Exception e)
{
_log.error("Error occurred while processing report. Report will be considered a failure.", e);
}
return result;
}
}
7. Deploy the portlet and observe the log.
Notice that our job runs every 1 minute, creates 5 new jobs, and processes for a period of 25 seconds. We know that the service will pause for a 5 second interval and we can see that the jobs are processed sequentially so 5 jobs @ 5 seconds per job means 25 seconds processing time. Now in some applications this might be perfectly acceptable. In the business case that I reference to demonstrate parallel processing it was not. There could be dozens if not hundreds of jobs in the queue with business users waiting for results. Sequential processing meant that some people could end up waiting hours, or even days for their results. So the first step towards a better solution is being able to process these individual reports, as part of the scheduled job, in parallel on independent threads.
SECTION #3: Parallel Report Generation
=======================================
Parallel report generation means breaking this down a little further. We still have (and need) our master job that is triggered every minute to determine which reports need to be processed, and probably to impose some sort of a batch count based on our threading model/calculations.
1. First, let's add these properties to our portal-ext.properties file
##
## Parallel Report Jobs Settings
##
# Parallel destination settings for Report CREATE messages
devlife.scheduler.report.create.destination.name=devlife/report_create
devlife.scheduler.report.create.jobs.min=5
devlife.scheduler.report.create.jobs.max=5
devlife.scheduler.report.create.queue.max=2147483647
2. Now restart your server to have these settings available.
3. Create a new package/class com.jardineworks.dev.life.liferay.portlet.report.util.ReportPropsKeys with the following definition
package com.jardineworks.dev.life.liferay.portlet.report.util;
/**
* <p>
* Constants class containing the key names used for properties file configuration settings.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public final class ReportPropsKeys
{
    /** Key of the property holding the destination name for report CREATE messages. */
    public static final String REPORT_CREATE_DESTINATION_NAME = "devlife.scheduler.report.create.destination.name";

    /** Key of the "jobs.min" property for report CREATE messages. */
    public static final String REPORT_CREATE_JOBS_MIN = "devlife.scheduler.report.create.jobs.min";

    /** Key of the "jobs.max" property for report CREATE messages. */
    public static final String REPORT_CREATE_JOBS_MAX = "devlife.scheduler.report.create.jobs.max";

    /** Key of the "queue.max" property for report CREATE messages. */
    public static final String REPORT_CREATE_QUEUE_MAX = "devlife.scheduler.report.create.queue.max";

    /**
     * Constants holder -- not meant to be instantiated.
     */
    private ReportPropsKeys()
    {
    }
}
4. Create a new package/class com.jardineworks.dev.life.liferay.portlet.report.util.ReportPropsValues with the following definition
package com.jardineworks.dev.life.liferay.portlet.report.util;
import com.liferay.portal.kernel.util.GetterUtil;
import com.liferay.portal.kernel.util.PropsUtil;
/**
* <p>
* Constants class containing the values for the properties file configuration settings.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public final class ReportPropsValues
{
    /**
     * Destination name for report CREATE messages. Falls back to "devlife/report_create" when the
     * property is not defined.
     */
    public static final String REPORT_CREATE_DESTINATION_NAME = GetterUtil.getString(PropsUtil.get(ReportPropsKeys.REPORT_CREATE_DESTINATION_NAME), "devlife/report_create");

    /** Value of the "jobs.min" property; falls back to 5 when the property is missing or not a number. */
    public static final int REPORT_CREATE_JOBS_MIN = GetterUtil.getInteger(PropsUtil.get(ReportPropsKeys.REPORT_CREATE_JOBS_MIN), 5);

    /** Value of the "jobs.max" property; falls back to 5 when the property is missing or not a number. */
    public static final int REPORT_CREATE_JOBS_MAX = GetterUtil.getInteger(PropsUtil.get(ReportPropsKeys.REPORT_CREATE_JOBS_MAX), 5);

    /**
     * Value of the "queue.max" property; falls back to {@link Integer#MAX_VALUE} when the property is
     * missing or not a number.
     */
    public static final int REPORT_CREATE_QUEUE_MAX = GetterUtil.getInteger(PropsUtil.get(ReportPropsKeys.REPORT_CREATE_QUEUE_MAX), Integer.MAX_VALUE);

    /**
     * Constants holder -- not meant to be instantiated.
     */
    private ReportPropsValues()
    {
    }
}
5. Now, to process our reports in parallel we're going to create a message listener whose job is to process reports. Create another new package/class com.jardineworks.dev.life.liferay.portlet.report.messaging.CreateReportMessageListener with the following definition.
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import org.restlet.Client;
import org.restlet.Request;
import org.restlet.Response;
import org.restlet.data.Method;
import org.restlet.data.Protocol;
import org.restlet.data.Reference;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
import com.liferay.portal.kernel.util.GetterUtil;
import com.liferay.portal.kernel.util.Validator;
/**
* <p>
* Implementation class of message listener handler for jobs that are used to create reports.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class CreateReportMessageListener implements MessageListener
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(CreateReportMessageListener.class);
/**
 * Main entry point for the message listener. Failures raised while handling the message are
 * logged together with their stack trace and are not propagated to the caller.
 *
 * @param message {@link Message} object containing the details being sent from invoker to handler.
 * @throws MessageListenerException declared by the interface; this implementation does not throw it
 */
@Override
public void receive(Message message) throws MessageListenerException
{
    try
    {
        _doReceive(message);
    }
    catch (Exception e)
    {
        // pass the exception along so the cause is not lost from the log
        _log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName(), e);
    }
}
/**
 * Delegate method used to perform the listener action: looks up the report whose id is carried in
 * the message under the "reportId" key and, when that report is still pending, moves it through
 * the PROCESSING and COMPLETE/ERROR statuses while logging the time taken. Messages that refer to
 * a missing or non-pending report are ignored.
 *
 * @param message
 *            {@link Message} object containing the details being sent from invoker to handler.
 */
private void _doReceive(Message message)
{
    // in here we can do whatever we need to but for the purposes of this example we'll just dump something
    // to the log so that we know that we got this far into the process.
    if (_log.isInfoEnabled())
    {
        _log.info("Starting to process message for " + this.getClass().getCanonicalName());
    }

    try
    {
        Report r = _getReport( GetterUtil.getLong(message.get("reportId")) );

        // the status is a String loaded from persistence: compare by value, never by reference
        if ( r == null || !ReportStatus.STATUS_PENDING.equals(r.getReportStatus()) )
        {
            return;
        }

        if ( _log.isInfoEnabled() )
        {
            _log.info("Processing report " + r.getReportName() + "..." );
        }

        long start = System.currentTimeMillis();

        r.setReportStatus(ReportStatus.STATUS_PROCESSING);
        ReportLocalServiceUtil.updateReport(r);

        boolean result = _process(r);

        r.setReportStatus(result ? ReportStatus.STATUS_COMPLETE : ReportStatus.STATUS_ERROR);
        ReportLocalServiceUtil.updateReport(r);

        long end = System.currentTimeMillis();

        if ( _log.isInfoEnabled() )
        {
            _log.info(r.getReportName() + " done in " + ((end - start) / 1000) + " seconds.");
        }
    }
    catch (Exception e)
    {
        _log.error("Error occurred while processing reports.", e);
    }
}
/**
* Get the report from the database based on the reportId provided.
* @param reportId long containing the unique identifier for a report.
* @return {@link Report} object representing the report to be processed.
*/
private Report _getReport( long reportId )
{
try
{
if ( _log.isInfoEnabled())
_log.info("Retrieving report with ID " + reportId );
return ReportLocalServiceUtil.getReport(reportId);
}
catch( Exception e )
{
_log.error("Failed to retrieve report with id: " + reportId, e);
}
return null;
}
/**
* Processes the report passed to the method.
*
* @param report
* {@link Report} object representing the report to be processed.
* @return boolean value of true if the report is processed without error, othewise false is returned.
*/
private boolean _process(Report report)
{
boolean result = Boolean.FALSE;
try
{
Client client = new Client(Protocol.HTTP);
Reference reference = new Reference();
reference.setProtocol(Protocol.HTTP);
reference.setHostDomain("localhost");
reference.setHostPort(9090);
reference.setPath("/dev-life-rest-service/rest/report/process/124/section/summary");
Request request = new Request(Method.GET, reference);
if ( _log.isDebugEnabled() )
_log.info("request: " + request.toString());
Response response = client.handle(request);
if ( _log.isDebugEnabled() )
_log.info("response: " + response.getEntity().getText());
result = Boolean.TRUE;
}
catch (Exception e)
{
_log.error("Error occurred while processing report. Report will be considered a failure.", e);
}
return result;
}
}
6. Since we're delegating the work of "creating reports" to another message listener, we need to update the original ReportMessageListener class to no longer perform the creation task, but rather orchestrate the jobs. Update the ReportMessageListener class with the following code.
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportPropsValues;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.counter.service.CounterLocalServiceUtil;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
import com.liferay.portal.kernel.messaging.ParallelDestination;
/**
* <p>
* Implementation class of message listener handler for "report" scheduled jobs that are fired.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class ReportMessageListener implements MessageListener
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(ReportMessageListener.class);
/**
* Main entry point for the message listener.
*
* @param {@link Message} object containing the details being send from invoker to handler.
*/
@Override
public void receive(Message message) throws MessageListenerException
{
try
{
_doReceive(message);
}
catch (Exception e)
{
_log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName());
}
}
/**
* Delegate method used to orchestrate and or perform the listener actions.
*
* @param message
* {@link Message} object containing the details being send from invoker to handler.
*/
private void _doReceive(Message message)
{
// in here we can do whatever we need to but for the purposes of this example we'll just dump something
// to the log so that we know that we got this far into the process.
if (_log.isInfoEnabled())
_log.info("Starting to process message for " + this.getClass().getCanonicalName());
long start = 0L;
long end = 0L;
try
{
// make some sample reports
List<Report> reports = _makeReports(5);
for (Report r : reports)
{
if (r.getReportStatus() == ReportStatus.STATUS_PENDING)
{
_sendJob(r);
if ( _log.isInfoEnabled() )
_log.info("Job for reportId " + r.getReportId() + " sent.");
}
}
}
catch (Exception e)
{
_log.error("Error occurred while processing reports.", e);
}
}
/**
* Method used to fabricate reports for testing purposes only.
*
* @param makeCount
* integer representing the number of sample reports to be created.
*/
private List<Report> _makeReports(int makeCount)
{
List<Report> reports = new ArrayList<Report>(makeCount);
Random random = new Random();
for (int i = 0; i < makeCount; ++i)
{
try
{
// create the report request here
Report r = ReportLocalServiceUtil.createReport(CounterLocalServiceUtil.increment(Report.class.getCanonicalName()));
// report details
r.setReportName("TestReport" + r.getReportId());
if (_log.isInfoEnabled())
_log.info("Making sample report " + r.getReportName());
// set the report sections to be created. For this case we want all reports
// to be set the same so that we can validate timing. To do this we use
// static settings -- but to simulate a real variant load we can use the
// same method but instead of static use random.nextBoolean() in the setter
r.setSectionSummary(true);
r.setSectionCosts(true);
r.setSectionROI(true);
r.setSectionMetrics(true);
r.setSectionRecommendations(true);
// auditing fields -- don't worry about some of the others for demo purposes
r.setCreateDate(new Date());
r.setModifiedDate(new Date());
// set the initial status
r.setReportStatus(ReportStatus.STATUS_PENDING);
ReportLocalServiceUtil.addReport(r);
reports.add(r);
}
catch (Exception e)
{
_log.error("Failed to add test report. => " + e.getMessage() + " :: " + e.getCause());
}
}
return reports;
}
/**
* Creates the message and puts the message on the bus to be received by the listener.
*
* @param report
* {@link Report} object that is being referenced in the message.
*/
private void _sendJob(Report report)
{
// check to see if our destination is registered. If not, register it on the fly using
// the values we configured in our portal-ext.
if (!MessageBusUtil.hasMessageListener(ReportPropsValues.REPORT_CREATE_DESTINATION_NAME))
{
if ( _log.isInfoEnabled() )
_log.info("Creating parallel destination for report create listener...");
ParallelDestination destination = new ParallelDestination();
destination.setName(ReportPropsValues.REPORT_CREATE_DESTINATION_NAME);
destination.setWorkersCoreSize(ReportPropsValues.REPORT_CREATE_JOBS_MIN);
destination.setWorkersMaxSize(ReportPropsValues.REPORT_CREATE_JOBS_MAX);
destination.setMaximumQueueSize(ReportPropsValues.REPORT_CREATE_QUEUE_MAX);
destination.afterPropertiesSet();
MessageBusUtil.addDestination(destination);
MessageBusUtil.registerMessageListener(ReportPropsValues.REPORT_CREATE_DESTINATION_NAME, new CreateReportMessageListener());
if ( _log.isInfoEnabled() )
_log.info("Destination added.");
}
// build the message
Message message = new Message();
message.put("reportId", report.getReportId());
message.setDestinationName(ReportPropsValues.REPORT_CREATE_DESTINATION_NAME);
// place it onto the bus
MessageBusUtil.sendMessage(ReportPropsValues.REPORT_CREATE_DESTINATION_NAME, message);
}
}
7. The big change here is registering a new destination on the fly if it is detected as missing. Apart from that, all "processing" is offloaded to a separate listener that runs asynchronously (Parallel Destination) on its own thread.
8. Deploy and check the results. You should now process all the reports in about 5 seconds because they are running in parallel.
SECTION #4: Parallel Report Section Generation
===============================================
We've shown how to produce reports themselves in parallel, but it's possible to go further. Imagine a scenario where the data that goes into the report is actually aggregated from several different services. We're producing the reports as a whole in parallel, but within each report we run into the same problem again: the individual sections are generated synchronously, one after another. First let's create this scenario.
1. We're going to re-use our dummy service for this case so let's abstract it out into a utility. Create a package/class com.jardineworks.dev.life.liferay.portlet.report.util.ReportRESTServiceUtil with the following definition.
package com.jardineworks.dev.life.liferay.portlet.report.util;
import org.restlet.Client;
import org.restlet.Request;
import org.restlet.Response;
import org.restlet.data.Method;
import org.restlet.data.Protocol;
import org.restlet.data.Reference;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
/**
 * <p>
 * Utility class that wraps the call to the REST service used to produce report content.
 * </p>
 *
 * @author andrew jardine | jardine works inc.
 */
public class ReportRESTServiceUtil
{
    // class logger
    private static final Log _log = LogFactoryUtil.getLog(ReportRESTServiceUtil.class);

    /**
     * Calls the REST service and returns the text of its response.
     * NOTE(review): the host, port and path are hard-coded and <code>report</code> is not used to build the
     * request -- acceptable for the demo service, confirm before reusing.
     *
     * @param report
     *            {@link Report} the section is being produced for (currently unused).
     * @return the response body, or <code>null</code> if the call failed (exception, non-success status or
     *         no response entity).
     */
    public static String getResponse( Report report )
    {
        if (_log.isInfoEnabled())
            _log.info("Processing section...");

        Client client = null;

        try
        {
            client = new Client(Protocol.HTTP);

            Reference reference = new Reference();
            reference.setProtocol(Protocol.HTTP);
            reference.setHostDomain("localhost");
            reference.setHostPort(9090);
            reference.setPath("/dev-life-rest-service/rest/report/process/124/section/summary");

            Request request = new Request(Method.GET, reference);

            if (_log.isDebugEnabled())
                _log.debug("request: " + request.toString());

            Response response = client.handle(request);

            // a failed call is reported through the response status, not through an exception
            if (!response.getStatus().isSuccess())
            {
                _log.error("REST service call failed with status: " + response.getStatus());
                return null;
            }

            // read the entity exactly once: its content may be a stream that cannot be re-read,
            // so logging it and then reading it again could return nothing the second time.
            String text = (response.getEntity() == null) ? null : response.getEntity().getText();

            if (_log.isDebugEnabled())
                _log.debug("response: " + text);

            return text;
        }
        catch (Exception e)
        {
            _log.error("Error occurred while processing report. Report will be considered a failure.", e);
        }
        finally
        {
            // release the connector resources held by the client
            if (client != null)
            {
                try
                {
                    client.stop();
                }
                catch (Exception e)
                {
                    if (_log.isWarnEnabled())
                        _log.warn("Unable to stop REST client.", e);
                }
            }
        }

        return null;
    }
}
2. Now create the "slow report generation" by updating the CreateReportMessageListener class to use the following definition.
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import org.restlet.Client;
import org.restlet.Request;
import org.restlet.Response;
import org.restlet.data.Method;
import org.restlet.data.Protocol;
import org.restlet.data.Reference;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportRESTServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
import com.liferay.portal.kernel.util.GetterUtil;
import com.liferay.portal.kernel.util.Validator;
/**
* <p>
* Implementation class of message listener handler for jobs that are used to create reports.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class CreateReportMessageListener implements MessageListener
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(CreateReportMessageListener.class);
/**
* Main entry point for the message listener.
*
* @param {@link Message} object containing the details being send from invoker to handler.
*/
@Override
public void receive(Message message) throws MessageListenerException
{
try
{
_doReceive(message);
}
catch (Exception e)
{
_log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName());
}
}
/**
* Delegate method used to orchestrate and or perform the listener actions.
*
* @param message
* {@link Message} object containing the details being send from invoker to handler.
*/
private void _doReceive(Message message)
{
// in here we can do whatever we need to but for the purposes of this example we'll just dump something
// to the log so that we know that we got this far into the process.
if (_log.isInfoEnabled())
_log.info("Starting to process message for " + this.getClass().getCanonicalName());
long start = 0L;
long end = 0L;
try
{
Report r = _getReport( GetterUtil.getLong(message.get("reportId")) );
if ( Validator.isNotNull(r) && r.getReportStatus() == ReportStatus.STATUS_PENDING)
{
if ( _log.isInfoEnabled() )
_log.info("Processing report " + r.getReportName() + "..." );
start = System.currentTimeMillis();
r.setReportStatus(ReportStatus.STATUS_PROCESSING);
ReportLocalServiceUtil.updateReport(r);
boolean result = _process(r);
if (result)
r.setReportStatus(ReportStatus.STATUS_COMPLETE);
else
r.setReportStatus(ReportStatus.STATUS_ERROR);
ReportLocalServiceUtil.updateReport(r);
end = System.currentTimeMillis();
if ( _log.isInfoEnabled() )
_log.info(r.getReportName() + " done in " + ((end - start) / 1000) + " seconds.");
}
}
catch (Exception e)
{
_log.error("Error occurred while processing reports.", e);
}
}
/**
* Get the report from the database based on the reportId provided.
* @param reportId long containing the unique identifier for a report.
* @return {@link Report} object representing the report to be processed.
*/
private Report _getReport( long reportId )
{
try
{
if ( _log.isInfoEnabled())
_log.info("Retrieving report with ID " + reportId );
return ReportLocalServiceUtil.getReport(reportId);
}
catch( Exception e )
{
_log.error("Failed to retrieve report with id: " + reportId, e);
}
return null;
}
/**
* Processes the report passed to the method.
*
* @param report
* {@link Report} object representing the report to be processed.
* @return boolean value of true if the report is processed without error, othewise false is returned.
*/
private boolean _process(Report report)
{
boolean result = Boolean.FALSE;
try
{
if ( report.getSectionSummary() )
ReportRESTServiceUtil.getResponse(report);
if (report.getSectionROI() )
ReportRESTServiceUtil.getResponse(report);
if ( report.getSectionRecommendations())
ReportRESTServiceUtil.getResponse(report);
if (report.getSectionMetrics())
ReportRESTServiceUtil.getResponse(report);
if (report.getSectionCosts())
ReportRESTServiceUtil.getResponse(report);
}
catch (Exception e)
{
_log.error("Error occurred while processing report. Report will be considered a failure.", e);
}
return result;
}
}
3. Deploy the code and validate that we have parallel report generation, but that we're back to a 25s window based on synchronous section generation.
4. Now we're simply going to repeat the same process we used for creating reports in parallel. We're going to create a listener for each of the sections and then have the create job orchestrate their creation. So let's start with our properties file. Add the following properties to the portal-ext and restart your server.
# Parallel destination settings for Report SECTION messages
devlife.scheduler.report.section.jobs.min=5
devlife.scheduler.report.section.jobs.max=5
devlife.scheduler.report.section.queue.max=2147483647
5. Make a copy of the CreateReportMessageListener class, rename it to ReportSectionSummaryMessageListener and make the definition the following.
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportRESTServiceUtil;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
import com.liferay.portal.kernel.util.GetterUtil;
/**
 * <p>
 * Message listener that produces a single report section. Each message is expected to carry the
 * <code>reportId</code> and the <code>section</code> name; the listener loads the report and calls the
 * REST service for it.
 * </p>
 *
 * @author andrew jardine | jardine works inc.
 */
public class ReportSectionSummaryMessageListener implements MessageListener
{
    // class logger
    private static final Log _log = LogFactoryUtil.getLog(ReportSectionSummaryMessageListener.class);

    /**
     * Main entry point for the message listener.
     *
     * @param message
     *            {@link Message} object containing the details being sent from invoker to handler.
     */
    @Override
    public void receive(Message message) throws MessageListenerException
    {
        try
        {
            _doReceive(message);
        }
        catch (Exception e)
        {
            // pass the exception along so the stack trace is not lost
            _log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName(), e);
        }
    }

    /**
     * Delegate method used to perform the listener actions: read the section name and reportId from the
     * message, load the report and request the section content.
     *
     * @param message
     *            {@link Message} object containing the details being sent from invoker to handler.
     */
    private void _doReceive(Message message)
    {
        if (_log.isInfoEnabled())
            _log.info("Starting to process message for " + this.getClass().getCanonicalName());

        try
        {
            String section = GetterUtil.getString(message.get("section"));
            long reportId = GetterUtil.getLong(message.get("reportId"));

            Report r = _getReport(reportId);

            // the lookup failure has already been logged; there is nothing to generate a section for
            if (r == null)
                return;

            if (_log.isInfoEnabled())
                _log.info("Fecthing " + section + " for reportId " + reportId);

            ReportRESTServiceUtil.getResponse(r);
        }
        catch (Exception e)
        {
            _log.error("Error occurred while processing section.", e);
        }
    }

    /**
     * Get the report from the database based on the reportId provided.
     *
     * @param reportId
     *            long containing the unique identifier for a report.
     * @return {@link Report} object representing the report to be processed, or <code>null</code> if it could
     *         not be retrieved (the failure is logged).
     */
    private Report _getReport(long reportId)
    {
        try
        {
            if (_log.isInfoEnabled())
                _log.info("Retrieving report with ID " + reportId);

            return ReportLocalServiceUtil.getReport(reportId);
        }
        catch (Exception e)
        {
            _log.error("Failed to retrieve report with id: " + reportId, e);
        }

        return null;
    }
}
6. Update the ReportPropsKeys class to include our new keys we added to the portal-ext
package com.jardineworks.dev.life.liferay.portlet.report.util;
/**
 * <p>
 * Names of the properties-file settings read by the report plugin. Every key lives under the
 * <code>devlife.scheduler.report.</code> namespace.
 * </p>
 *
 * @author andrew jardine | jardine works inc.
 */
public class ReportPropsKeys
{
    // shared namespaces -- the public keys below are compile-time constants built from them
    private static final String _CREATE = "devlife.scheduler.report.create.";
    private static final String _SECTION = "devlife.scheduler.report.section.";

    // devlife.scheduler.report.create.destination.name
    public static final String REPORT_CREATE_DESTINATION_NAME = _CREATE + "destination.name";

    // devlife.scheduler.report.create.jobs.min
    public static final String REPORT_CREATE_JOBS_MIN = _CREATE + "jobs.min";

    // devlife.scheduler.report.create.jobs.max
    public static final String REPORT_CREATE_JOBS_MAX = _CREATE + "jobs.max";

    // devlife.scheduler.report.create.queue.max
    public static final String REPORT_CREATE_QUEUE_MAX = _CREATE + "queue.max";

    // there is no destination key for sections: section destinations are built dynamically
    // from the section name.

    // devlife.scheduler.report.section.jobs.min
    public static final String REPORT_SECTION_JOBS_MIN = _SECTION + "jobs.min";

    // devlife.scheduler.report.section.jobs.max
    public static final String REPORT_SECTION_JOBS_MAX = _SECTION + "jobs.max";

    // devlife.scheduler.report.section.queue.max
    public static final String REPORT_SECTION_QUEUE_MAX = _SECTION + "queue.max";
}
7. And then update the ReportPropsValues class to reference the new ReportPropsKeys values
package com.jardineworks.dev.life.liferay.portlet.report.util;
import com.liferay.portal.kernel.util.GetterUtil;
import com.liferay.portal.kernel.util.PropsUtil;
/**
 * <p>
 * Values of the report plugin settings, resolved once (when this class is initialised) from the keys
 * declared in {@link ReportPropsKeys}.
 * </p>
 *
 * @author andrew jardine | jardine works inc.
 */
public class ReportPropsValues
{
    // settings for the parallel destination used for report CREATE messages
    public static final String REPORT_CREATE_DESTINATION_NAME = PropsUtil.get(ReportPropsKeys.REPORT_CREATE_DESTINATION_NAME);

    public static final int REPORT_CREATE_JOBS_MIN = _intValue(ReportPropsKeys.REPORT_CREATE_JOBS_MIN);

    public static final int REPORT_CREATE_JOBS_MAX = _intValue(ReportPropsKeys.REPORT_CREATE_JOBS_MAX);

    public static final int REPORT_CREATE_QUEUE_MAX = _intValue(ReportPropsKeys.REPORT_CREATE_QUEUE_MAX);

    // settings for report SECTION messages -- there is no destination name here because section
    // destinations are built dynamically based on the section name.
    public static final int REPORT_SECTION_JOBS_MIN = _intValue(ReportPropsKeys.REPORT_SECTION_JOBS_MIN);

    public static final int REPORT_SECTION_JOBS_MAX = _intValue(ReportPropsKeys.REPORT_SECTION_JOBS_MAX);

    public static final int REPORT_SECTION_QUEUE_MAX = _intValue(ReportPropsKeys.REPORT_SECTION_QUEUE_MAX);

    /**
     * Looks up a property and converts it to an int.
     *
     * @param key
     *            name of the property to read.
     * @return the value produced by <code>GetterUtil.getInteger</code> for the property's text.
     */
    private static int _intValue(String key)
    {
        String raw = PropsUtil.get(key);

        return GetterUtil.getInteger(raw);
    }
}
8. Now, before we go and create message listeners for ALL the sections, let's just try it out with our first one. Update the CreateReportMessageListener class with the following --
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import org.restlet.Client;
import org.restlet.Request;
import org.restlet.Response;
import org.restlet.data.Method;
import org.restlet.data.Protocol;
import org.restlet.data.Reference;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportPropsValues;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportRESTServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
import com.liferay.portal.kernel.messaging.ParallelDestination;
import com.liferay.portal.kernel.util.GetterUtil;
import com.liferay.portal.kernel.util.Validator;
/**
* <p>
* Implementation class of message listener handler for jobs that are used to create reports.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class CreateReportMessageListener implements MessageListener
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(CreateReportMessageListener.class);
/**
* Main entry point for the message listener.
*
* @param {@link Message} object containing the details being send from invoker to handler.
*/
@Override
public void receive(Message message) throws MessageListenerException
{
try
{
_doReceive(message);
}
catch (Exception e)
{
_log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName());
}
}
/**
* Delegate method used to orchestrate and or perform the listener actions.
*
* @param message
* {@link Message} object containing the details being send from invoker to handler.
*/
private void _doReceive(Message message)
{
// in here we can do whatever we need to but for the purposes of this example we'll just dump something
// to the log so that we know that we got this far into the process.
if (_log.isInfoEnabled())
_log.info("Starting to process message for " + this.getClass().getCanonicalName());
long start = 0L;
long end = 0L;
try
{
Report r = _getReport(GetterUtil.getLong(message.get("reportId")));
if (Validator.isNotNull(r) && r.getReportStatus() == ReportStatus.STATUS_PENDING)
{
if (_log.isInfoEnabled())
_log.info("Processing report " + r.getReportName() + "...");
start = System.currentTimeMillis();
r.setReportStatus(ReportStatus.STATUS_PROCESSING);
ReportLocalServiceUtil.updateReport(r);
boolean result = _process(r);
if (result)
r.setReportStatus(ReportStatus.STATUS_COMPLETE);
else
r.setReportStatus(ReportStatus.STATUS_ERROR);
ReportLocalServiceUtil.updateReport(r);
end = System.currentTimeMillis();
if (_log.isInfoEnabled())
_log.info(r.getReportName() + " done in " + ((end - start) / 1000) + " seconds.");
}
}
catch (Exception e)
{
_log.error("Error occurred while processing reports.", e);
}
}
/**
* Get the report from the database based on the reportId provided.
*
* @param reportId
* long containing the unique identifier for a report.
* @return {@link Report} object representing the report to be processed.
*/
private Report _getReport(long reportId)
{
try
{
if (_log.isInfoEnabled())
_log.info("Retrieving report with ID " + reportId);
return ReportLocalServiceUtil.getReport(reportId);
}
catch (Exception e)
{
_log.error("Failed to retrieve report with id: " + reportId, e);
}
return null;
}
/**
* Processes the report passed to the method.
*
* @param report
* {@link Report} object representing the report to be processed.
* @return boolean value of true if the report is processed without error, othewise false is returned.
*/
private boolean _process(Report report)
{
boolean result = Boolean.FALSE;
try
{
if (report.getSectionSummary())
_processSection(report, "summary", ReportSectionSummaryMessageListener.class););
if (report.getSectionROI())
ReportRESTServiceUtil.getResponse(report);
if (report.getSectionRecommendations())
ReportRESTServiceUtil.getResponse(report);
if (report.getSectionMetrics())
ReportRESTServiceUtil.getResponse(report);
if (report.getSectionCosts())
ReportRESTServiceUtil.getResponse(report);
}
catch (Exception e)
{
_log.error("Error occurred while processing report. Report will be considered a failure.", e);
}
return result;
}
/**
* Creates the message and puts the message on the bus to be received by the listener used to process a report section.
*
* @param report
* {@link Report} object that is being referenced in the message.
*/
private void _processSection(Report report, String section, Class listenerClazz) throws IllegalAccessException, InstantiationException
{
String destinationName = "devlife/report-section-" + section;
// check to see if our destination is registered. If not, register it on the fly using
// the values we configured in our portal-ext.
if (!MessageBusUtil.hasMessageListener(destinationName))
{
if (_log.isInfoEnabled())
_log.info("Creating parallel destination for report section " + section + " listener...");
ParallelDestination destination = new ParallelDestination();
destination.setName(destinationName);
destination.setWorkersCoreSize(ReportPropsValues.REPORT_SECTION_JOBS_MIN);
destination.setWorkersMaxSize(ReportPropsValues.REPORT_SECTION_JOBS_MAX);
destination.setMaximumQueueSize(ReportPropsValues.REPORT_SECTION_QUEUE_MAX);
destination.afterPropertiesSet();
MessageBusUtil.addDestination(destination);
MessageBusUtil.registerMessageListener(destinationName, (MessageListener)listenerClazz.newInstance());
if (_log.isInfoEnabled())
_log.info("Destination added.");
}
// build the message
Message message = new Message();
message.put("reportId", report.getReportId());
message.put("section", section);
message.setDestinationName(destinationName);
// place it onto the bus
MessageBusUtil.sendMessage(destinationName, message);
}
}
9. Build and deploy to see if everything is working
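10. Now we're going to add listeners for the other sections. Basically, copy and paste the ReportSectionSummaryMessageListener and rename it for each of the sections (ReportSectionROIMessageListener, ReportSectionRecommendationsMessageListener, ReportSectionMetricsMessageListener and ReportSectionCostsMessageListener).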
11. Now update the CreateReportMessageListener so that the _process method looks like this.
/**
 * Processes the report passed to the method by dispatching one job per enabled section to that
 * section's listener. The jobs are only placed on the message bus here; this method does not wait for
 * the sections to be produced.
 *
 * @param report
 *            {@link Report} object representing the report to be processed.
 * @return boolean value of true if every enabled section job was dispatched without error, otherwise false
 *         is returned.
 */
private boolean _process(Report report)
{
    boolean result = Boolean.FALSE;

    try
    {
        if (report.getSectionSummary())
            _processSection(report, "summary", ReportSectionSummaryMessageListener.class);

        if (report.getSectionROI())
            _processSection(report, "roi", ReportSectionROIMessageListener.class);

        if (report.getSectionRecommendations())
            _processSection(report, "recommendations", ReportSectionRecommendationsMessageListener.class);

        if (report.getSectionMetrics())
            _processSection(report, "metrics", ReportSectionMetricsMessageListener.class);

        if (report.getSectionCosts())
            _processSection(report, "costs", ReportSectionCostsMessageListener.class);

        // reaching this point means nothing threw; previously the flag was never set to true,
        // so every report ended up flagged as an error.
        result = Boolean.TRUE;
    }
    catch (Exception e)
    {
        _log.error("Error occurred while processing report. Report will be considered a failure.", e);
    }

    return result;
}
12. Save and deploy again -- and validate that the output looks good and that things look to be running faster once again.
SECTION #5: Report Assembler
=============================
Moving from generating one report at a time to individual reports was easy because each report was still only accessed by a single running process. By breaking the report generation into multiple sub processes running concurrently we have a new problem to solve -- one that is common in concurrent programming. We need to now take the individual sections and put them back together. There are a few ways to do this. The one we're going to use is to create another message listener that we're going to use to "Assemble" reports.
1. First we need a way to track individual section doneness. So update the service.xml file to the following and rebuild the service code.
<!DOCTYPE service-builder PUBLIC "-//Liferay//DTD Service Builder 6.2.0//EN" "http://www.liferay.com/dtd/liferay-service-builder_6_2_0.dtd">
<service-builder package-path="com.jardineworks.dev.life.liferay.portlet.report">
<!-- Service definition for the Report entity: local service only, no remote service. -->
<namespace>DEVLIFE</namespace>
<entity name="Report" uuid="true" local-service="true" remote-service="false">
<!-- PK fields -->
<column name="reportId" type="long" primary="true" />
<!-- Group instance -->
<column name="groupId" type="long" />
<!-- Audit fields -->
<column name="companyId" type="long" />
<column name="userId" type="long" />
<column name="userName" type="String" />
<column name="createDate" type="Date" />
<column name="modifiedDate" type="Date" />
<!-- report fields -->
<column name="reportName" type="String" />
<!-- overall status of the report, stored as a String -->
<column name="reportStatus" type="String" />
<column name="startTime" type="Date" />
<column name="endTime" type="Date" />
<!-- report section details: each section has a boolean flag column paired with a String status column -->
<column name="sectionSummary" type="boolean" />
<column name="sectionSummaryStatus" type="String" />
<column name="sectionCosts" type="boolean" />
<column name="sectionCostsStatus" type="String" />
<column name="sectionROI" type="boolean" />
<column name="sectionROIStatus" type="String" />
<column name="sectionMetrics" type="boolean" />
<column name="sectionMetricsStatus" type="String" />
<column name="sectionRecommendations" type="boolean" />
<column name="sectionRecommendationsStatus" type="String" />
<!-- Order: ascending by reportId -->
<order by="asc">
<order-column name="reportId" />
</order>
<!-- Finder methods: look up a collection of reports by their reportStatus value -->
<finder name="ReportStatus" return-type="Collection">
<finder-column name="reportStatus" />
</finder>
</entity>
</service-builder>
2. Copy one of the listeners and rename it ReportAssemblerMessageListener. Use the following definition.
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import java.util.HashMap;
import java.util.Map;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.portal.kernel.exception.SystemException;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
import com.liferay.portal.kernel.util.GetterUtil;
/**
* <p>
* Implementation class of message listener handler for jobs that are used to create reports.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class ReportAssemblerMessageListener implements MessageListener
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(ReportAssemblerMessageListener.class);
private Map<String, Report> reports = new HashMap<String, Report>();
/**
* Main entry point for the message listener.
*
* @param {@link Message} object containing the details being send from invoker to handler.
*/
@Override
public void receive(Message message) throws MessageListenerException
{
try
{
_doReceive(message);
}
catch (Exception e)
{
_log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName());
}
}
/**
* Delegate method used to orchestrate and or perform the listener actions.
*
* @param message
* {@link Message} object containing the details being send from invoker to handler.
* @throws SystemException
*/
private void _doReceive(Message message) throws SystemException
{
long reportId = GetterUtil.getLong(message.get("reportId"));
String section = GetterUtil.getString(message.get("section"));
Report r = _getReport(reportId);
switch (section)
{
case "summary":
r.setSectionSummaryStatus(ReportStatus.STATUS_COMPLETE);
break;
case "roi":
r.setSectionROIStatus(ReportStatus.STATUS_COMPLETE);
break;
case "recommendations":
r.setSectionRecommendationsStatus(ReportStatus.STATUS_COMPLETE);
break;
case "metrics":
r.setSectionMetricsStatus(ReportStatus.STATUS_COMPLETE);
break;
case "costs":
r.setSectionCostsStatus(ReportStatus.STATUS_COMPLETE);
break;
default:
if ( _log.isWarnEnabled() )
_log.warn("Unexpected report section found: " + section);
}
if ( _isAssembled(r) )
{
ReportLocalServiceUtil.updateReport(r);
reports.remove(reportId);
}
else
{
reports.put(GetterUtil.getString(reportId), r);
}
}
/**
* Get the report from the database based on the reportId provided.
*
* @param reportId
* long containing the unique identifier for a report.
* @return {@link Report} object representing the report to be processed.
*/
private Report _getReport(long reportId)
{
try
{
if (_log.isInfoEnabled())
_log.info("Retrieving report with ID " + reportId);
if ( reports.containsKey(GetterUtil.getString(reportId)))
return reports.get(GetterUtil.getString(reportId));
else
return ReportLocalServiceUtil.getReport(reportId);
}
catch (Exception e)
{
_log.error("Failed to retrieve report with id: " + reportId, e);
}
return null;
}
/**
* Checks to see whether or not the report has been fully assembled.
* @param report @{link Report} object that is being checked for doneness.
* @return boolean value of true if the report is fully assembled, otherwise false is returned.
*/
private boolean _isAssembled( Report report )
{
if ( report.getSectionSummaryStatus().equals(ReportStatus.STATUS_COMPLETE) &&
report.getSectionROIStatus().equals(ReportStatus.STATUS_COMPLETE) &&
report.getSectionRecommendationsStatus().equals(ReportStatus.STATUS_COMPLETE) &&
report.getSectionMetricsStatus().equals(ReportStatus.STATUS_COMPLETE) &&
report.getSectionCostsStatus().equals(ReportStatus.STATUS_COMPLETE)
)
{
report.setReportStatus(ReportStatus.STATUS_COMPLETE);
return Boolean.TRUE;
}
return Boolean.FALSE;
}
}
3. We should have done this earlier, but now update the CreateReportMessageListener to set the status on the sections. NOTE: Ideally the individual section managers would perform this task, but due to time constraints we're cheating on this one.
package com.jardineworks.dev.life.liferay.portlet.report.messaging;
import com.jardineworks.dev.life.liferay.portlet.report.model.Report;
import com.jardineworks.dev.life.liferay.portlet.report.service.ReportLocalServiceUtil;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportPropsKeys;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportPropsValues;
import com.jardineworks.dev.life.liferay.portlet.report.util.ReportStatus;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.messaging.MessageListener;
import com.liferay.portal.kernel.messaging.MessageListenerException;
import com.liferay.portal.kernel.messaging.ParallelDestination;
import com.liferay.portal.kernel.util.GetterUtil;
import com.liferay.portal.kernel.util.StringPool;
import com.liferay.portal.kernel.util.Validator;
/**
* <p>
* Implementation class of message listener handler for jobs that are used to create reports.
* </p>
*
* @author andrew jardine | jardine works inc.
*/
public class CreateReportMessageListener implements MessageListener
{
// class logger
private static final Log _log = LogFactoryUtil.getLog(CreateReportMessageListener.class);
/**
* Main entry point for the message listener.
*
* @param {@link Message} object containing the details being send from invoker to handler.
*/
@Override
public void receive(Message message) throws MessageListenerException
{
try
{
_doReceive(message);
}
catch (Exception e)
{
_log.error("Problem occurred while trying to process message for detination: " + message.getDestinationName());
}
}
/**
* Delegate method used to orchestrate and or perform the listener actions.
*
* @param message
* {@link Message} object containing the details being send from invoker to handler.
*/
private void _doReceive(Message message)
{
// in here we can do whatever we need to but for the purposes of this example we'll just dump something
// to the log so that we know that we got this far into the process.
if (_log.isInfoEnabled())
_log.info("Starting to process message for " + this.getClass().getCanonicalName());
long start = 0L;
long end = 0L;
try
{
Report r = _getReport(GetterUtil.getLong(message.get("reportId")));
if (Validator.isNotNull(r) && r.getReportStatus() == ReportStatus.STATUS_PENDING)
{
if (_log.isInfoEnabled())
_log.info("Processing report " + r.getReportName() + "...");
r.setReportStatus(ReportStatus.STATUS_PROCESSING);
ReportLocalServiceUtil.updateReport(r);
boolean result = _process(r);
if ( result )
{
r.setSectionSummaryStatus(ReportStatus.STATUS_PROCESSING);
r.setSectionROIStatus(ReportStatus.STATUS_PROCESSING);
r.setSectionRecommendationsStatus(ReportStatus.STATUS_PROCESSING);
r.setSectionMetricsStatus(ReportStatus.STATUS_PROCESSING);
r.setSectionCostsStatus(ReportStatus.STATUS_PROCESSING);
ReportLocalServiceUtil.updateReport(r);
}
}
}
catch (Exception e)
{
_log.error("Error occurred while processing reports.", e);
}
}
/**
* Get the report from the database based on the reportId provided.
*
* @param reportId
* long containing the unique identifier for a report.
* @return {@link Report} object representing the report to be processed.
*/
private Report _getReport(long reportId)
{
try
{
if (_log.isInfoEnabled())
_log.info("Retrieving report with ID " + reportId);
return ReportLocalServiceUtil.getReport(reportId);
}
catch (Exception e)
{
_log.error("Failed to retrieve report with id: " + reportId, e);
}
return null;
}
/**
* Processes the report passed to the method.
*
* @param report
* {@link Report} object representing the report to be processed.
* @return boolean value of true if the report is processed without error, othewise false is returned.
*/
private boolean _process(Report report)
{
boolean result = Boolean.FALSE;
try
{
if (report.getSectionSummary())
_processSection(report, "summary", ReportSectionSummaryMessageListener.class);
if (report.getSectionROI())
_processSection(report, "roi", ReportSectionROIMessageListener.class);
if (report.getSectionRecommendations())
_processSection(report, "recommendations", ReportSectionRecommendationsMessageListener.class);
if (report.getSectionMetrics())
_processSection(report, "metrics", ReportSectionMetricsMessageListener.class);
if (report.getSectionCosts())
_processSection(report, "costs", ReportSectionCostsMessageListener.class);
}
catch (Exception e)
{
_log.error("Error occurred while processing report. Report will be considered a failure.", e);
}
return result;
}
/**
* Creates the message and puts the message on the bus to be received by the listener used to process a report section.
*
* @param report
* {@link Report} object that is being referenced in the message.
*/
private void _processSection(Report report, String section, Class listenerClazz) throws IllegalAccessException, InstantiationException
{
String destinationName = "devlife/report-section-" + section;
// check to see if our destination is registered. If not, register it on the fly using
// the values we configured in our portal-ext.
if (!MessageBusUtil.hasMessageListener(destinationName))
{
if (_log.isInfoEnabled())
_log.info("Creating parallel destination for report section " + section + " listener...");
ParallelDestination destination = new ParallelDestination();
destination.setName(destinationName);
destination.setWorkersCoreSize(ReportPropsValues.REPORT_SECTION_JOBS_MIN);
destination.setWorkersMaxSize(ReportPropsValues.REPORT_SECTION_JOBS_MAX);
destination.setMaximumQueueSize(ReportPropsValues.REPORT_SECTION_QUEUE_MAX);
destination.afterPropertiesSet();
MessageBusUtil.addDestination(destination);
MessageBusUtil.registerMessageListener(destinationName, (MessageListener)listenerClazz.newInstance());
if (_log.isInfoEnabled())
_log.info("Destination added.");
}
// build the message
Message message = new Message();
message.put("reportId", report.getReportId());
message.put("section", section);
message.setDestinationName(destinationName);
// place it onto the bus
MessageBusUtil.sendMessage(destinationName, message);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment