Skip to content

Instantly share code, notes, and snippets.

@dsyer
Created December 23, 2010 13:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save dsyer/753000 to your computer and use it in GitHub Desktop.
Save dsyer/753000 to your computer and use it in GitHub Desktop.
Cloud Sample for Spring Batch Admin
.classpath
.project
.settings
target
bin
<?xml version="1.0" encoding="UTF-8"?>
<beansProjectDescription>
<version>1</version>
<pluginVersion><![CDATA[2.5.1.201011101000-RELEASE]]></pluginVersion>
<configSuffixes>
<configSuffix><![CDATA[xml]]></configSuffix>
</configSuffixes>
<enableImports><![CDATA[true]]></enableImports>
<configs>
<config>src/test/resources/org/springframework/batch/admin/sample/PeerIntegrationTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/web/sample/views/AbstractSampleViewTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/BroadcastJobLaunchRequestsIntegrationTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/ChannelInterceptorIntegrationTests-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/override/cluster-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/jobs/doubler-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/proxy/HttpInvokerProxiesFactoryBeanConfigurationTests-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/servlet/override/resources-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/jobs/staging-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/jobs/unstaging-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/servlet/override/cluster-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/override/broadcast-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/profile/cluster/cluster-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/profile/local/cluster-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/DetourIntegrationTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/DetourReplyChannelIntegrationTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/DetourTempReplyChannelIntegrationTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/job/DoublerJobIntegrationTests-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/override/env-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/FilterIntegrationTests-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/servlet/profile/cluster/integration-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/servlet/profile/local/integration-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/profile/cluster/dynamic/remote-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/profile/cluster/static/remote-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/jobs/sql-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/job/SqlJobIntegrationTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/job/StagingJobIntegrationTests-context.xml</config>
<config>src/main/resources/META-INF/spring/batch/override/synchronizer-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/job/UnstagingJobIntegrationTests-context.xml</config>
<config>src/test/resources/org/springframework/batch/admin/sample/PeersIntegrationTests-context.xml</config>
<config>target/classes/META-INF/spring/batch/jobs/doubler-context.xml</config>
<config>target/classes/META-INF/spring/batch/jobs/sql-context.xml</config>
<config>target/classes/META-INF/spring/batch/jobs/staging-context.xml</config>
<config>target/classes/META-INF/spring/batch/jobs/unstaging-context.xml</config>
<config>target/classes/META-INF/spring/batch/override/broadcast-context.xml</config>
<config>target/classes/META-INF/spring/batch/override/cluster-context.xml</config>
<config>target/classes/META-INF/spring/batch/override/env-context.xml</config>
<config>target/classes/META-INF/spring/batch/override/synchronizer-context.xml</config>
<config>target/classes/META-INF/spring/batch/servlet/override/resources-context.xml</config>
</configs>
<configSets>
</configSets>
</beansProjectDescription>

Cloud Sample

A sample application extending Spring Batch Admin and making it deployable in a cloud platform (multiple servlet containers behind a load balancer).

Work in Progress

  • Manual registration of new nodes through the /broadcasts endpoint. This does not work yet because the form data is not converted to a message body or to headers (although the proof-of-concept integration test PeersIntegrationTests does work).
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Import the broadcast override context from every matching classpath location. -->
<import resource="classpath*:/META-INF/spring/batch/override/broadcast-context.xml"/>
<!-- Import the servlet resource definitions first, then the override resources of the same name pattern. -->
<import resource="classpath*:/META-INF/spring/batch/servlet/resources/resources*.xml"/>
<import resource="classpath*:/META-INF/spring/batch/servlet/override/resources*.xml"/>
<!-- The resource service bean, declared after the imports above. -->
<bean id="resourceService" class="org.springframework.batch.admin.web.resources.DefaultResourceService"/>
</beans>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.web.sample.views;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.WebApplicationContextLoader;
/**
 * Base class for the sample view tests. It runs with the Spring JUnit 4 runner
 * using a {@link WebApplicationContextLoader}, and hands every test a fresh
 * mock request/response pair.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(loader=WebApplicationContextLoader.class)
public abstract class AbstractSampleViewTests {

    /** Mock request, re-created before each test with a fixed request URI. */
    protected MockHttpServletRequest request;

    /** Mock response, re-created before each test. */
    protected MockHttpServletResponse response;

    /**
     * Creates a new request (URI {@code /springsource-batch}) and a new
     * response so that no state is shared between tests.
     */
    @Before
    public void setUp() {
        MockHttpServletRequest freshRequest = new MockHttpServletRequest();
        freshRequest.setRequestURI("/springsource-batch");
        this.request = freshRequest;
        this.response = new MockHttpServletResponse();
    }
}
# Placeholders batch.*
# for HSQLDB:
batch.jdbc.driver=org.hsqldb.jdbcDriver
batch.jdbc.url=jdbc:hsqldb:mem:testdb;sql.enforce_strict_size=true
# use this one for a separate server process so you can inspect the results
# (or add it to system properties with -D to override at run time).
# batch.jdbc.url=jdbc:hsqldb:hsql://localhost:9005/samples
batch.jdbc.user=sa
batch.jdbc.password=
batch.jdbc.testWhileIdle=false
batch.jdbc.validationQuery=
batch.schema.script=classpath:/org/springframework/batch/core/schema-hsqldb.sql
batch.drop.script=classpath:/org/springframework/batch/core/schema-drop-hsqldb.sql
batch.business.schema.script=classpath:/business-schema-hsqldb.sql
batch.data.source.init=true
batch.database.incrementer.class=org.springframework.jdbc.support.incrementer.HsqlMaxValueIncrementer
batch.grid.size=2
batch.job.configuration.file.dir=target/config
# Placeholders batch.*
# for MYSQL:
batch.jdbc.driver=com.mysql.jdbc.Driver
batch.jdbc.url=jdbc:mysql://localhost/test
batch.jdbc.user=test
batch.jdbc.password=test
batch.jdbc.testWhileIdle=true
batch.jdbc.validationQuery=SELECT 1
batch.drop.script=classpath:/org/springframework/batch/core/schema-drop-mysql.sql
batch.schema.script=classpath:/org/springframework/batch/core/schema-mysql.sql
batch.business.schema.script=classpath:business-schema-mysql.sql
batch.data.source.init=true
batch.database.incrementer.class=org.springframework.jdbc.support.incrementer.MySQLMaxValueIncrementer
batch.grid.size=2
batch.job.configuration.file.dir=target/config
# Placeholders batch.*
# for MYSQL, with host, port, database, user and password taken from the MYSQL_* placeholders below:
batch.jdbc.driver=com.mysql.jdbc.Driver
batch.jdbc.url=jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DATABASE}
batch.jdbc.user=${MYSQL_USER}
batch.jdbc.password=${MYSQL_PASSWORD}
batch.jdbc.testWhileIdle=true
batch.jdbc.validationQuery=SELECT 1
batch.drop.script=classpath:/org/springframework/batch/core/schema-drop-mysql.sql
batch.schema.script=classpath:/org/springframework/batch/core/schema-mysql.sql
batch.business.schema.script=classpath:business-schema-mysql.sql
batch.data.source.init=false
batch.database.incrementer.class=org.springframework.jdbc.support.incrementer.MySQLMaxValueIncrementer
batch.grid.size=2
batch.job.configuration.file.dir=target/config
/*
* Copyright 2006-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.springframework.batch.admin.web.JobController;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessagingException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.message.GenericMessage;
/**
 * Bootstrap tests that load the web application and servlet XML
 * configurations from the classpath and check that the expected beans and
 * message routes are present.
 * <p>
 * Every application context opened here is closed in a {@code finally} block,
 * so a failing assertion (or a child context that fails to start) cannot leak
 * a context into the tests that run afterwards.
 *
 * @author Dave Syer
 *
 */
public class BootstrapTests {

    /** Location of the root (webapp) configuration used by every test. */
    private static final String WEBAPP_CONFIG = "classpath:/org/springframework/batch/admin/web/resources/webapp-config.xml";

    /** Location of the servlet configuration, loaded as a child of the root context. */
    private static final String SERVLET_CONFIG = "classpath:/org/springframework/batch/admin/web/resources/servlet-config.xml";

    /** With no system properties set, the root context must expose a {@code jobRepository}. */
    @Test
    public void testDefaultWebappRootConfiguration() throws Exception {
        testConfiguration("jobRepository");
    }

    /** With {@code TOPOLOGY=cluster} the root context must also expose a {@code broadcastService}. */
    @Test
    public void testClusterWebappRootConfiguration() throws Exception {
        System.setProperty("TOPOLOGY", "cluster");
        try {
            testConfiguration("jobRepository", "broadcastService");
        }
        finally {
            System.clearProperty("TOPOLOGY");
        }
    }

    /**
     * Loads the root configuration and asserts that each named bean exists.
     *
     * @param beans the bean names that must be present in the context
     */
    private void testConfiguration(String... beans) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(WEBAPP_CONFIG);
        try {
            for (String bean : beans) {
                assertTrue(context.containsBean(bean));
            }
        }
        finally {
            context.close();
        }
    }

    /**
     * Sends an arbitrary payload to the {@code ping} channel and expects a
     * {@link BroadcastRequest} to arrive on the {@code pong} channel.
     */
    @Test
    public void testPingRoute() {
        try {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(WEBAPP_CONFIG);
            try {
                MessageChannel ping = context.getBean("ping", MessageChannel.class);
                SubscribableChannel pong = context.getBean("pong", SubscribableChannel.class);
                final AtomicReference<BroadcastRequest> wrapper = new AtomicReference<BroadcastRequest>();
                pong.subscribe(new MessageHandler() {
                    public void handleMessage(Message<?> message) throws MessagingException {
                        wrapper.set((BroadcastRequest) message.getPayload());
                    }
                });
                ping.send(new GenericMessage<String>("foo"));
                assertNotNull(wrapper.get());
            }
            finally {
                context.close();
            }
        }
        finally {
            // This test never sets TOPOLOGY; the clear is kept from the original as a defensive reset.
            System.clearProperty("TOPOLOGY");
        }
    }

    /** With no system properties set, the servlet context must contain a bean named {@code cluster}. */
    @Test
    public void testDefaultServletConfiguration() throws Exception {
        ClassPathXmlApplicationContext parent = new ClassPathXmlApplicationContext(WEBAPP_CONFIG);
        try {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                    new String[] { SERVLET_CONFIG }, parent);
            try {
                assertTrue(context.containsBean("cluster"));
            }
            finally {
                context.close();
            }
        }
        finally {
            parent.close();
        }
    }

    /** Checks the servlet context and the broadcast handshake with {@code TOPOLOGY=cluster}. */
    @Test
    public void testClusterServletConfiguration() throws Exception {
        System.setProperty("TOPOLOGY", "cluster");
        try {
            ClassPathXmlApplicationContext parent = new ClassPathXmlApplicationContext(WEBAPP_CONFIG);
            try {
                ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                        new String[] { SERVLET_CONFIG }, parent);
                try {
                    assertClusterServletContext(parent, context);
                }
                finally {
                    context.close();
                }
            }
            finally {
                parent.close();
            }
        }
        finally {
            System.clearProperty("TOPOLOGY");
        }
    }

    /**
     * Same as {@link #testClusterServletConfiguration()} but with
     * {@code DISCOVERY=dynamic}; additionally checks the
     * {@code file.service.urls} entry of the {@code clusterProperties} bean.
     */
    @Test
    public void testDynamicClusterServletConfiguration() throws Exception {
        System.setProperty("TOPOLOGY", "cluster");
        System.setProperty("DISCOVERY", "dynamic");
        try {
            ClassPathXmlApplicationContext parent = new ClassPathXmlApplicationContext(WEBAPP_CONFIG);
            try {
                ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                        new String[] { SERVLET_CONFIG }, parent);
                try {
                    assertClusterServletContext(parent, context);
                    Properties properties = context.getBean("clusterProperties", Properties.class);
                    assertEquals("http://" + InetAddress.getLocalHost().getHostName() + ":8080/service/files",
                            properties.getProperty("file.service.urls"));
                }
                finally {
                    context.close();
                }
            }
            finally {
                parent.close();
            }
        }
        finally {
            System.clearProperty("TOPOLOGY");
            System.clearProperty("DISCOVERY");
        }
    }

    /**
     * Assertions shared by the two cluster tests: the file service endpoint
     * exists, exactly one {@link JobController} is visible from the servlet
     * context, and the broadcast service reports itself as pinged after its
     * own ping has been handled three times.
     *
     * @param parent the root context that owns the {@link BroadcastService}
     * @param context the servlet context under test
     */
    private void assertClusterServletContext(ClassPathXmlApplicationContext parent,
            ClassPathXmlApplicationContext context) {
        assertTrue(context.containsBean("/service/files"));
        String[] beanNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getBeanFactory(),
                JobController.class);
        assertEquals(1, beanNames.length);
        BroadcastService broadcastService = parent.getBean(BroadcastService.class);
        BroadcastRequest ping = broadcastService.handle(broadcastService.ping());
        ping = broadcastService.handle(ping);
        broadcastService.handle(ping);
        assertTrue(broadcastService.isPinged());
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd">
<!-- Channels: "ping" is the entry point of the chain below; "pinged" and "pong" are the two router targets. -->
<integration:channel id="ping" />
<integration:publish-subscribe-channel id="pinged" />
<integration:channel id="pong" />
<!-- Everything arriving on "pinged" is logged at INFO level. -->
<integration:logging-channel-adapter channel="pinged" level="INFO" />
<!-- Processing chain for messages sent to "ping". -->
<chain xmlns="http://www.springframework.org/schema/integration" input-channel="ping">
<!-- Keep a BroadcastRequest payload as it is; replace any other payload with the result of broadcastService.ping(). -->
<transformer
expression="payload instanceof T(org.springframework.batch.admin.sample.BroadcastRequest) ? payload : @broadcastService.ping()" />
<!-- Hand the request to the broadcast service. -->
<service-activator ref="broadcastService" />
<!-- A payload whose message is 'pinged' goes to the "pinged" channel; anything else (including a null payload) goes to "pong". -->
<router expression="payload?.message=='pinged' ? 'pinged' : 'pong'" />
</chain>
<!-- The broadcast service; its "port" static property comes from the local.port placeholder, defaulting to 8080. -->
<bean id="broadcastService" class="org.springframework.batch.admin.sample.BroadcastService">
<property name="staticProperties">
<props>
<prop key="port">${local.port:8080}</prop>
</props>
</property>
</bean>
</beans>
package org.springframework.batch.admin.sample;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("serial")
public class BroadcastId implements Serializable {
private final String name;
private final String host;
private final String id;
private final Map<String, String> properties;
public BroadcastId(String name, String host, String id) {
this(name, host, id, new HashMap<String, String>());
}
public BroadcastId(String name, String host, String id, Map<String, String> properties) {
this.name = name;
this.host = host;
this.id = id;
this.properties = properties;
}
public Map<String, String> getProperties() {
return properties;
}
public String getName() {
return name;
}
public String getHost() {
return host;
}
public String getId() {
return id;
}
public String toString() {
return String.format("[name=%s, host=%s, id=%s]", name, host, id);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
BroadcastId other = (BroadcastId) obj;
if (host == null) {
if (other.host != null)
return false;
}
else if (!host.equals(other.host))
return false;
if (id == null) {
if (other.id != null)
return false;
}
else if (!id.equals(other.id))
return false;
if (name == null) {
if (other.name != null)
return false;
}
else if (!name.equals(other.name))
return false;
return true;
}
}
package org.springframework.batch.admin.sample.web;
import java.util.Set;
import org.springframework.batch.admin.sample.BroadcastId;
/**
 * Immutable holder that pairs one {@link BroadcastId} with a set of peer
 * {@link BroadcastId}s. Both references are stored exactly as given.
 */
public class BroadcastInfo {

    private final Set<BroadcastId> peers;

    private final BroadcastId id;

    /**
     * @param id the identifier this info object describes
     * @param peers the peers associated with that identifier
     */
    public BroadcastInfo(BroadcastId id, Set<BroadcastId> peers) {
        this.peers = peers;
        this.id = id;
    }

    /**
     * @return the peer set supplied at construction time
     */
    public Set<BroadcastId> getPeers() {
        return this.peers;
    }

    /**
     * @return the identifier supplied at construction time
     */
    public BroadcastId getId() {
        return this.id;
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:http="http://www.springframework.org/schema/integration/http"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd">
<!-- Start from the root webapp configuration and add the staging job definition. -->
<import resource="classpath:/org/springframework/batch/admin/web/resources/webapp-config.xml" />
<import resource="classpath:/META-INF/spring/batch/jobs/staging-context.xml" />
<!-- A SimpleJobLauncher wired to the "jobRepository" bean (not defined in this file; presumably provided by the imports above). -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
</beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.batch.test.MetaDataInstanceFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.MessagingException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Integration test that sends {@link JobLaunchRequest}s to the
 * {@code job-requests} channel and observes the {@link JobExecution}s that
 * arrive on the {@code job-operator} channel.
 * <p>
 * The {@code TOPOLOGY} system property is set to {@code cluster} before the
 * class runs and cleared again after the last test.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class BroadcastJobLaunchRequestsIntegrationTests {

    @Autowired
    @Qualifier("job-requests")
    private MessageChannel requests;

    @Autowired
    @Qualifier("job-operator")
    private SubscribableChannel replies;

    @Autowired
    @Qualifier("staging")
    private Job job;

    /** The last job execution received on the reply channel, or null if none arrived. */
    private JobExecution result;

    /** Captures the payload of every reply message into {@link #result}. */
    private MessageHandler handler = new MessageHandler() {
        public void handleMessage(Message<?> message) throws MessagingException {
            result = (JobExecution) message.getPayload();
        }
    };

    /** Sets the system property before the application context is loaded. */
    @BeforeClass
    public static void environmentSetUp() {
        System.setProperty("TOPOLOGY", "cluster");
    }

    /**
     * Clears the system property once all tests have run. This must be
     * {@code @AfterClass}: annotated as a second {@code @BeforeClass} it could
     * run after {@link #environmentSetUp()} and remove the property before the
     * context is created, and nothing would clean up afterwards.
     */
    @AfterClass
    public static void environmentTearDown() {
        System.clearProperty("TOPOLOGY");
    }

    /** Subscribes the capturing handler to the reply channel. */
    @Before
    public void init() {
        replies.subscribe(handler);
    }

    /** Unsubscribes the capturing handler again. */
    @After
    public void close() {
        replies.unsubscribe(handler);
    }

    /** Resets the messages recorded by {@link TestService}. */
    @Before
    public void clear() {
        TestService.messages.clear();
    }

    /**
     * A launch request whose input file parameter points at
     * {@code ./target/input.txt} is expected to fail with a
     * {@link MessageHandlingException}.
     */
    @Test(expected = MessageHandlingException.class)
    public void testBroadcastWithNonExistentFile() throws Exception {
        MessagingTemplate gateway = new MessagingTemplate();
        gateway.setDefaultChannel(replies);
        gateway.setReceiveTimeout(500L);
        gateway.afterPropertiesSet();
        JobParameters jobParameters = new JobParametersBuilder().addString("input.file", "file:./target/input.txt")
                .toJobParameters();
        gateway.convertAndSend(requests, new JobLaunchRequest(job, jobParameters));
        assertEquals(null, result);
        assertEquals(1, TestService.messages.size());
    }

    /**
     * A launch request whose input file parameter points at the test data file
     * must result in a {@link JobExecution} on the reply channel.
     */
    @Test
    public void testBroadcastWithExistingFile() throws Exception {
        MessagingTemplate gateway = new MessagingTemplate();
        gateway.setDefaultChannel(replies);
        gateway.setReceiveTimeout(500L);
        gateway.afterPropertiesSet();
        JobParameters jobParameters = new JobParametersBuilder().addString("input.file",
                "file:./src/test/resources/data/test.txt").toJobParameters();
        gateway.convertAndSend(requests, new JobLaunchRequest(job, jobParameters));
        assertNotNull("No JobExecution started", result);
    }

    /**
     * Stub service that records every message it receives and fails on
     * purpose for messages that mention {@code input.txt}.
     */
    public static class TestService {

        private static Collection<Object> messages = new ArrayList<Object>();

        /**
         * @param message the incoming message text
         * @return a freshly created job execution
         * @throws IllegalStateException if the message contains {@code input.txt}
         */
        public JobExecution accept(String message) {
            messages.add(message);
            if (message.contains("input.txt")) {
                throw new IllegalStateException("Planned");
            }
            return MetaDataInstanceFactory.createJobExecution();
        }
    }
}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.web.context.ServletContextAware;
/**
 * {@link FactoryBean} that builds a {@link Properties} object by rendering a
 * named template against a model describing the current cluster: the servlet
 * path, this node's id and the peers known to the {@link BroadcastService}.
 *
 * @author Dave Syer
 *
 */
public class BroadcastPropertiesFactoryBean implements FactoryBean<Properties>, ServletContextAware, InitializingBean {

    private static final Log logger = LogFactory.getLog(BroadcastPropertiesFactoryBean.class);

    private PropertiesLoader propertiesLoader = new PropertiesLoader();

    private BroadcastService broadcastService;

    private String template = "cluster";

    private String contextPath;

    private String servletPath;

    /**
     * Records the context path of the web application; it is used as the
     * prefix of the servlet path in the model.
     *
     * @param servletContext the servlet context of the running application
     */
    public void setServletContext(ServletContext servletContext) {
        contextPath = servletContext.getContextPath();
    }

    /**
     * @param servletPath the path to the batch servlet (not including context root)
     */
    public void setServletPath(String servletPath) {
        // TODO: replace this with a generic map
        this.servletPath = servletPath;
    }

    /**
     * The name of the template to render to create properties.
     *
     * @param template the template name
     */
    public void setTemplate(String template) {
        this.template = template;
    }

    /**
     * @param broadcastService the service that supplies this node's id and its peers (mandatory)
     */
    public void setBroadcastService(BroadcastService broadcastService) {
        this.broadcastService = broadcastService;
    }

    /**
     * Checks that the mandatory {@link BroadcastService} has been provided.
     *
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    public void afterPropertiesSet() throws Exception {
        Assert.state(broadcastService != null, "A BroadcastService must be provided");
    }

    /**
     * Builds the model ({@code servletPath}, {@code peers}, {@code id}) and
     * asks the properties loader to render the configured template with it.
     * A missing context path or servlet path is treated as the empty string,
     * so the literal text "null" never ends up in the rendered path.
     *
     * @return the properties produced for the current cluster model
     */
    public Properties getObject() throws Exception {
        Map<String, Object> model = new HashMap<String, Object>();
        String prefix = (contextPath == null) ? "" : contextPath;
        String path = (servletPath == null) ? "" : servletPath;
        model.put("servletPath", prefix + path);
        Set<BroadcastId> peers = broadcastService.getPeers();
        model.put("peers", peers);
        model.put("id", broadcastService.getId());
        logger.info("Cluster model: "+model);
        Properties properties = propertiesLoader.getProperties(template, model);
        logger.info("Cluster properties: "+properties);
        return properties;
    }

    /**
     * @return {@link Properties}, the type of object this factory creates
     */
    public Class<? extends Properties> getObjectType() {
        return Properties.class;
    }

    /**
     * @return always {@code true}
     */
    public boolean isSingleton() {
        return true;
    }
}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.StringUtils;
/**
 * Tests for {@link BroadcastPropertiesFactoryBean}, plus a check that the
 * hand-assembled time-based UUID bit layout round-trips through
 * {@link UUID}'s accessors.
 *
 * @author Dave Syer
 *
 */
public class BroadcastPropertiesFactoryBeanTests {

    private BroadcastPropertiesFactoryBean factory = new BroadcastPropertiesFactoryBean();

    private BroadcastService broadcastService = new BroadcastService();

    /**
     * Configures the broadcast service with a static {@code port} property,
     * injects one extra peer ({@code foo.com}) through a hand-built request and
     * keeps handling the returned request until the service reports itself as
     * pinged with two known peers.
     */
    @Before
    public void setUp() throws Exception {
        Map<String, String> properties = Collections.singletonMap("port",
                "8080");
        broadcastService.setName("foo");
        broadcastService.setStaticProperties(properties);
        broadcastService.afterPropertiesSet();
        BroadcastRequest pong = broadcastService.handle(null);
        Set<BroadcastId> peers = new HashSet<BroadcastId>();
        peers.add(new BroadcastId("foo", "foo.com", "FOO", properties));
        pong = new BroadcastRequest("ping", pong.getFrom(), peers);
        pong = broadcastService.handle(pong);
        pong = broadcastService.handle(pong);
        pong = broadcastService.handle(pong);
        pong = broadcastService.handle(pong);
        assertTrue(broadcastService.isPinged());
        assertEquals(2, broadcastService.getPeers().size());
        assertEquals("{port=8080}", broadcastService.getPeers().iterator()
                .next().getProperties().toString());
    }

    /**
     * The rendered {@code file.service.urls} property must list one file
     * service URL per peer (order is irrelevant, hence the set comparison).
     */
    @Test
    public void testGetObject() throws Exception {
        assertTrue(new ClassPathResource("/template/properties/cluster.ftl")
                .exists());
        factory.setBroadcastService(broadcastService);
        factory.setServletPath("/batch");
        Properties properties = factory.getObject();
        assertEquals(
                StringUtils.commaDelimitedListToSet("http://foo.com:8080/batch/service/files,http://"
                        + broadcastService.getId().getHost()
                        + ":8080/batch/service/files"),
                StringUtils.commaDelimitedListToSet(properties
                        .getProperty("file.service.urls")));
    }

    /**
     * Assembles a version 1 (time-based), variant 2 UUID bit by bit and checks
     * that {@link UUID} reports the same version, variant, clock sequence,
     * node and timestamp that went in.
     */
    @Test
    public void testTimestampUUID() throws Exception {
        // Milliseconds converted to 100ns units, plus a fixed offset (presumably the UUID epoch of 1582-10-15).
        long timestamp = System.currentTimeMillis() * 10000 + 122192928000000000L;
        long clockSequence = 123;
        long version = 1;
        long variant = 2;
        long mostSigBits = 0;
        long leastSigBits = 0;
        long node = 0;
        byte[] bytes = InetAddress.getLocalHost().getAddress();
        for (int i = 0; i < bytes.length; i++) {
            // Treat each address byte as unsigned so a negative byte cannot sign-extend into the upper bits.
            node |= (bytes[i] & 0xFFL) << (i * 8);
            // The node field is only 48 bits wide.
            node &= 0x0000FFFFFFFFFFFFL;
        }
        // Bits 12-15 of the most significant half: version.
        mostSigBits &= (~0x000000000000F000L);
        mostSigBits |= version << 12;
        // Upper 32 bits: low 32 bits of the timestamp (the mask needs the L suffix to be a long literal).
        mostSigBits &= (~0xFFFFFFFF00000000L);
        mostSigBits |= (timestamp & 0x00000000FFFFFFFFL) << 32;
        // Bits 16-31: middle 16 bits of the timestamp.
        mostSigBits &= (~0x00000000FFFF0000L);
        mostSigBits |= (timestamp & 0x0000FFFF00000000L) >>> 16;
        // Bits 0-11: high 12 bits of the timestamp.
        mostSigBits &= (~0x0000000000000FFFL);
        mostSigBits |= (timestamp & 0x0FFF000000000000L) >>> 48;
        // Top two bits of the least significant half: variant.
        leastSigBits &= (~0xC000000000000000L);
        leastSigBits |= variant << 62;
        // Next 14 bits: clock sequence.
        leastSigBits &= (~0x3FFF000000000000L);
        leastSigBits |= clockSequence << 48;
        // Low 48 bits: node.
        leastSigBits &= (~0x0000FFFFFFFFFFFFL);
        leastSigBits |= node & 0x0000FFFFFFFFFFFFL;
        UUID uuid = new UUID(mostSigBits, leastSigBits);
        assertEquals(version, uuid.version());
        assertEquals(variant, uuid.variant());
        assertEquals(clockSequence, uuid.clockSequence());
        assertEquals(node, uuid.node());
        assertEquals(timestamp, uuid.timestamp());
    }
}
package org.springframework.batch.admin.sample;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@SuppressWarnings("serial")
public class BroadcastRequest implements Serializable {
private final Set<BroadcastId> peers;
private final String message;
private final BroadcastId from;
private final int bounces;
private final int generation;
private final boolean ready;
private final String conversation;
public BroadcastRequest(String message, BroadcastId from, Set<BroadcastId> peers) {
this(generateConversationId(), message, from, peers, 0, 0, false);
}
private BroadcastRequest(String conversation, String message, BroadcastId from, Set<BroadcastId> peers,
int generation, int bounces, boolean ready) {
this.conversation = conversation;
this.message = message;
this.from = from;
this.peers = peers;
this.generation = generation;
this.bounces = bounces;
this.ready = ready;
}
public Set<BroadcastId> getPeers() {
return peers;
}
public String getMessage() {
return message;
}
public BroadcastId getFrom() {
return from;
}
public int getBounces() {
return bounces;
}
public int getGeneration() {
return generation;
}
public boolean isReady() {
return ready;
}
public String getConversation() {
return conversation;
}
/**
 * Renders every field of this request, including the size and the content
 * of the peer set.
 */
@Override
public String toString() {
    final String layout = "[BroadcastRequest: message=%s, ready=%s, peers.size=%d, conversation=%s, from=%s, peers=%s, generation=%d, bounces=%d]";
    final int peerCount = peers.size();
    return String.format(layout, message, ready, peerCount, conversation, from, peers, generation, bounces);
}
/**
 * Derives a follow-up "ping" request in the same conversation that is
 * flagged as not ready. The peer set is the union of {@code others}, this
 * request's peers and {@code from}; the generation is unchanged and the
 * bounce count goes up by one.
 *
 * @param from the sender of the new request
 * @param others additional peers to merge into the new request
 * @return the new, not-ready request
 */
public BroadcastRequest unready(BroadcastId from, Set<BroadcastId> others) {
    Set<BroadcastId> merged = new HashSet<BroadcastId>(others);
    merged.addAll(peers);
    merged.add(from);
    int nextBounce = bounces + 1;
    return new BroadcastRequest(conversation, "ping", from, merged, generation, nextBounce, false);
}
public BroadcastRequest ready(BroadcastId from, Set<BroadcastId> others) {
Set<BroadcastId> set = new HashSet<BroadcastId>(others);
set.addAll(peers);
set.add(from);
return new BroadcastRequest(conversation, "ping", from, set, generation + 1, bounces + 1, true);
}
public BroadcastRequest complete(BroadcastId from, Set<BroadcastId> others) {
Set<BroadcastId> set = new HashSet<BroadcastId>(others);
set.addAll(peers);
set.add(from);
return new BroadcastRequest(conversation, "pinged", from, set, generation, bounces + 1, true);
}
public boolean isOlderThan(BroadcastRequest request) {
return request != null
&& UUID.fromString(request.conversation).timestamp() > UUID.fromString(conversation).timestamp();
}
public boolean isSameAgeAs(BroadcastRequest request) {
return request != null
&& UUID.fromString(request.conversation).timestamp() == UUID.fromString(conversation).timestamp();
}
private static String generateConversationId() {
long timestamp = System.currentTimeMillis() * 10000 + 122192928000000000L;
long clockSequence = 123;
long version = 1;
long variant = 2;
long mostSigBits = 0;
long leastSigBits = 0;
long node = 1;
byte[] bytes;
try {
bytes = InetAddress.getLocalHost().getAddress();
for (int i = 0; i < bytes.length; i++) {
node |= (0xffffffffffffffffL & (new Long(bytes[i]) << (i * 8)));
}
}
catch (UnknownHostException e) {
// ignored
}
// System.err.println(String.format("%032x", node));
mostSigBits &= (~0x000000000000F000L);
mostSigBits |= version << 12;
mostSigBits &= (~0xFFFFFFFF00000000L);
mostSigBits |= (timestamp & 0x00000000FFFFFFFF) << 32;
mostSigBits &= (~0x00000000FFFF0000L);
mostSigBits |= (timestamp & 0x0000FFFF00000000L) >>> 16;
mostSigBits &= (~0x0000000000000FFFL);
mostSigBits |= (timestamp & 0x0FFF000000000000L) >>> 48;
leastSigBits &= (~0xC000000000000000L);
leastSigBits |= variant << 62;
leastSigBits &= (~0x3FFF000000000000L);
leastSigBits |= clockSequence << 48;
leastSigBits &= (~0x0000FFFFFFFFFFFFL);
leastSigBits |= node;
UUID uuid = new UUID(mostSigBits, leastSigBits);
return uuid.toString();
}
}
package org.springframework.batch.admin.sample;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
/**
 * Message endpoint that takes part in a peer discovery conversation. Each
 * incoming {@link BroadcastRequest} is compared with the request currently
 * held by this service; the peers of both are merged and an updated request
 * carrying this service's own id is returned.
 */
@MessageEndpoint
@ManagedResource
public class BroadcastService implements BeanNameAware, InitializingBean {

    private static final Log logger = LogFactory.getLog(BroadcastService.class);

    private volatile BroadcastId id;

    private String name;

    private Map<String, String> staticProperties = new HashMap<String, String>();

    private final Object lock = new Object();

    private volatile BroadcastRequest current = null;

    /**
     * Create the id of this service if that has not happened yet.
     */
    public void afterPropertiesSet() throws Exception {
        if (id == null) {
            init();
        }
    }

    /**
     * @param staticProperties properties handed to the {@link BroadcastId}
     * when it is created
     */
    public void setStaticProperties(Map<String, String> staticProperties) {
        this.staticProperties = staticProperties;
    }

    /**
     * Create the id from the name, the local host name, a random UUID and the
     * static properties. The id is only ever created once.
     */
    private void init() {
        synchronized (lock) {
            if (id == null) {
                String host = "localhost";
                try {
                    host = InetAddress.getLocalHost().getHostName();
                }
                catch (UnknownHostException e) {
                    logger.warn("Could not determine host address", e);
                }
                id = new BroadcastId(name, host, UUID.randomUUID().toString(), staticProperties);
            }
        }
    }

    /**
     * @param name the name to use in the id (takes precedence over the bean name)
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Use the bean name as the service name unless a name was set explicitly.
     */
    public void setBeanName(String name) {
        if (this.name == null) {
            this.name = name;
        }
    }

    /**
     * Currently a no-op: the current request is deliberately left in place.
     */
    public void reset() {
        // current = null;
    }

    /**
     * @return the status of the service: has it been successfully pinged?
     */
    @ManagedAttribute
    public boolean isPinged() {
        return current != null && current.isReady();
    }

    /**
     * @return the id of this service, creating it on first use
     */
    public BroadcastId getId() {
        if (id == null) {
            init();
        }
        return id;
    }

    /**
     * @return a copy of the peers in the current request, or an empty set if
     * no request has been handled yet
     */
    public Set<BroadcastId> getPeers() {
        return current == null ? new HashSet<BroadcastId>() : new HashSet<BroadcastId>(current.getPeers());
    }

    /**
     * Handle an incoming request and return the request to pass on.
     *
     * @param request the incoming request, or null to start a new conversation
     * @return the new current request of this service
     */
    @ServiceActivator
    public BroadcastRequest handle(BroadcastRequest request) {
        // Go through getId() so the id exists even if afterPropertiesSet()
        // was never called; otherwise a null id would be broadcast.
        BroadcastId self = getId();
        if (request == null) {
            request = ping();
        }
        logger.debug("Ping: to=" + self + " " + request);
        if (current == null) {
            current = request;
        }
        Set<BroadcastId> peers = current.getPeers();
        Set<BroadcastId> others = request.getPeers();
        BroadcastRequest input = request;
        if (input.isOlderThan(current)) {
            return current;
        }
        else if (!input.isSameAgeAs(current)) {
            // It's newer, so we should discard the old one
            current = input;
        }
        else if (others.size() < peers.size() || current.isReady()) {
            // The incoming request is the same age, but it has less peers so it
            // is not up to date
            input = current;
        }
        others = input.getPeers();
        peers = current.getPeers();
        // We are not yet registered
        if (!others.contains(self)) {
            current = input.unready(self, peers);
        }
        else if (!peers.equals(others)) {
            // We are already registered and stable but give the others a chance
            // to find us
            current = input.ready(self, peers);
        }
        else {
            // We are already registered, and fairly confident that the others
            // are aware
            logger.debug("Ping complete, publishing pinged message");
            current = input.complete(self, peers);
        }
        return current;
    }

    /**
     * @return a new BroadcastRequest initiating a new conversation
     */
    public BroadcastRequest ping() {
        return new BroadcastRequest("ping", getId(), new HashSet<BroadcastId>());
    }

}
-- Schema for the lead sample tables.
-- NOTE(review): the "DROP TABLE x IF EXISTS" / IDENTITY / LONGVARCHAR syntax
-- looks like the HSQLDB dialect; confirm which database this script targets.

-- Remove any previous copies of the tables so the script can be re-run.
DROP TABLE LEAD_INPUTS IF EXISTS;
DROP TABLE LEAD_INPUTS_SEQ IF EXISTS;
DROP TABLE LEADS IF EXISTS;
DROP TABLE LEADS_SEQ IF EXISTS;
-- Input rows: a free-text DATA column plus a PROCESSED flag that starts out FALSE.
CREATE TABLE LEAD_INPUTS (
ID BIGINT IDENTITY,
PROCESSED BOOLEAN DEFAULT FALSE,
DATA LONGVARCHAR
);
-- One row per lead: product, amount, client details, query text and sales contact.
CREATE TABLE LEADS (
ID BIGINT IDENTITY,
PRODUCT VARCHAR(100),
AMOUNT FLOAT,
CLIENT_NAME VARCHAR(100),
CLIENT_COUNTRY CHAR(2),
QUERY VARCHAR(1000),
SALES VARCHAR(100)
);
-- Single-column table seeded with 0; presumably used as a key generator for
-- LEAD_INPUTS (verify against the code that reads it).
CREATE TABLE LEAD_INPUTS_SEQ (
ID BIGINT IDENTITY
);
INSERT INTO LEAD_INPUTS_SEQ (ID) VALUES (0);
-- Single-column table seeded with 0; presumably used as a key generator for
-- LEADS (verify against the code that reads it).
CREATE TABLE LEADS_SEQ (
ID BIGINT IDENTITY
);
INSERT INTO LEADS_SEQ (ID) VALUES (0);
-- Schema for the lead sample tables, using MySQL storage engine clauses.

-- NOTE(review): these DROP statements have no IF EXISTS guard, so they fail
-- when a table is absent unless the script runner ignores failed drops; confirm.
DROP TABLE LEAD_INPUTS;
DROP TABLE LEAD_INPUTS_SEQ;
DROP TABLE LEADS;
DROP TABLE LEADS_SEQ;
-- Input rows: a free-text DATA column plus a PROCESSED flag that starts out FALSE.
-- ID is a plain primary key here, so values must be supplied by the writer.
CREATE TABLE LEAD_INPUTS (
ID BIGINT NOT NULL PRIMARY KEY,
PROCESSED BOOLEAN DEFAULT FALSE,
DATA TEXT
) ENGINE=InnoDB;
-- One row per lead: product, amount, client details, query text and sales contact.
CREATE TABLE LEADS (
ID BIGINT NOT NULL PRIMARY KEY,
PRODUCT VARCHAR(100),
AMOUNT FLOAT,
CLIENT_NAME VARCHAR(100),
CLIENT_COUNTRY CHAR(2),
QUERY VARCHAR(1000),
SALES VARCHAR(100)
) ENGINE=InnoDB;
-- Single-row table seeded with 0; presumably the key generator for LEAD_INPUTS
-- (verify against the code that reads it).
CREATE TABLE LEAD_INPUTS_SEQ (
ID BIGINT NOT NULL
) ENGINE=MYISAM;
INSERT INTO LEAD_INPUTS_SEQ (ID) VALUES (0);
-- Single-row table seeded with 0; presumably the key generator for LEADS
-- (verify against the code that reads it).
CREATE TABLE LEADS_SEQ (
ID BIGINT NOT NULL
) ENGINE=MYISAM;
INSERT INTO LEADS_SEQ (ID) VALUES (0);
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/integration" xmlns:http="http://www.springframework.org/schema/integration/http"
xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<!-- Test context comparing three ways of stopping a message: a filter, a channel interceptor and AOP advice. -->
<!-- Entry channel for the filter case. -->
<channel id="filtered" />
<!-- Entry channel whose interceptor is the nested Interceptor class of the test. -->
<channel id="intercepted">
<interceptors>
<beans:bean class="org.springframework.batch.admin.sample.ChannelInterceptorIntegrationTests$Interceptor" />
</interceptors>
</channel>
<!-- Entry channel whose send(..) is wrapped by the around advice declared below. -->
<channel id="advised" />
<!-- Common output channel for all three routes. -->
<channel id="responses" />
<!-- The expression is the constant false, so this filter never passes a message on. -->
<filter input-channel="filtered" output-channel="responses" expression="false" />
<bridge input-channel="intercepted" output-channel="responses" />
<bridge input-channel="advised" output-channel="responses" />
<!-- Apply ChannelAdvice.send around MessageChannel.send(..) on the bean named "advised" only. -->
<aop:config>
<aop:aspect ref="advice">
<aop:around method="send" pointcut="execution(* org.springframework..MessageChannel.send(..)) and bean(advised) and args(message, ..)" />
</aop:aspect>
</aop:config>
<beans:bean id="advice" class="org.springframework.batch.admin.sample.ChannelInterceptorIntegrationTests$ChannelAdvice" />
</beans:beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessageDeliveryException;
import org.springframework.integration.channel.interceptor.ChannelInterceptorAdapter;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Exercises three ways of preventing a message from reaching its destination:
 * a filter, a channel interceptor and an around advice on the channel.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class ChannelInterceptorIntegrationTests {

    @Autowired
    @Qualifier("filtered")
    private MessageChannel filtered;

    @Autowired
    @Qualifier("intercepted")
    private MessageChannel intercepted;

    @Autowired
    @Qualifier("advised")
    private MessageChannel advised;

    /** No reply is received when the message goes through the filtered channel. */
    @Test
    public void testFilter() throws Exception {
        assertEquals(null, exchange(filtered));
    }

    /** Sending to the intercepted channel is expected to fail with a delivery exception. */
    @Test(expected=MessageDeliveryException.class)
    public void testInterceptor() throws Exception {
        assertEquals(null, exchange(intercepted));
    }

    /** No reply is received when the advice does not proceed with the send. */
    @Test
    public void testAdvice() throws Exception {
        assertEquals(null, exchange(advised));
    }

    /**
     * Send "foo" to the channel with a fresh template and wait up to 100ms
     * for a reply.
     */
    private static String exchange(MessageChannel target) {
        MessagingTemplate template = new MessagingTemplate();
        template.setReceiveTimeout(100L);
        Object reply = template.convertSendAndReceive(target, "foo");
        return (String) reply;
    }

    /** Interceptor that replaces every message with null before it is sent. */
    public static class Interceptor extends ChannelInterceptorAdapter {

        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            return null;
        }

    }

    /** Around advice that reports success without invoking the advised send. */
    public static class ChannelAdvice {

        public boolean send(ProceedingJoinPoint joinPoint, Message<?> message) throws Throwable {
            // Deliberately no joinPoint.proceed(): the message is dropped.
            return true;
        }

    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.lead;
/**
 * Simple mutable holder for a client's name and country.
 */
public class Client {

    String name;

    String country;

    /**
     * Create a placeholder client named "None" in country "XX".
     */
    public Client() {
        this("None", "XX");
    }

    /**
     * @param name the client name
     * @param countery the client country
     */
    public Client(String name, String countery) {
        this.country = countery;
        this.name = name;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCountry() {
        return this.country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    /**
     * @return a description of the form {@code [Type: name=..., country=...]}
     * using the simple name of the runtime class
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("[");
        text.append(getClass().getSimpleName());
        text.append(": name=").append(name);
        text.append(", country=").append(country);
        return text.append("]").toString();
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Pull in every XML file from the profile directory named by the TOPOLOGY placeholder ("local" when it is not set). -->
<import resource="../profile/${TOPOLOGY:local}/*.xml"/>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:http="http://www.springframework.org/schema/integration/http"
xmlns:jdbc="http://www.springframework.org/schema/integration/jdbc" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http-2.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Pull in every XML file from the directory named by the DISCOVERY placeholder ("static" when it is not set). -->
<import resource="${DISCOVERY:static}/*.xml"/>
<!-- File service writing under <java.io.tmpdir or /tmp>/batch/<HOST system property or "local">/files. -->
<!-- The "fileSender" bean is not defined in this file; presumably it comes from another context. -->
<bean id="localFileService" class="org.springframework.batch.admin.service.LocalFileService">
<property name="fileSender" ref="fileSender" />
<property name="outputDir" value="#{systemProperties['java.io.tmpdir']?:'/tmp'}/batch/#{systemProperties['HOST']?:'local'}/files" />
</bean>
<!-- Wrap MessageChannel.send(..) on the "job-requests" bean with the Detour advice. -->
<aop:config>
<aop:aspect ref="detour">
<aop:around method="send"
pointcut="execution(* org.springframework..MessageChannel.send(..)) and bean(job-requests) and args(message, ..)" />
</aop:aspect>
</aop:config>
<!-- Detour sends accepted messages to "job-requests-diversion"; acceptance is decided by the selector below. -->
<bean id="detour" class="org.springframework.batch.admin.sample.Detour">
<constructor-arg ref="job-requests-diversion" />
<constructor-arg ref="selector" />
</bean>
<!-- The selector delegates to the "cannotProcess" method of the fileFilter bean. -->
<bean id="selector" class="org.springframework.integration.filter.MethodInvokingSelector">
<constructor-arg ref="fileFilter" />
<constructor-arg value="cannotProcess" />
</bean>
<bean id="fileFilter" class="org.springframework.batch.admin.sample.InputFileFilter" />
<integration:channel id="job-requests-diversion" />
<!-- Diverted requests are transformed by JobRequestToStringAdapter and handed to "jobRequestHandler" (defined elsewhere); output goes to "job-operator". -->
<chain id="externalRequestSender" input-channel="job-requests-diversion" output-channel="job-operator" xmlns="http://www.springframework.org/schema/integration">
<transformer>
<beans:bean class="org.springframework.batch.admin.sample.JobRequestToStringAdapter" />
</transformer>
<service-activator ref="jobRequestHandler" />
</chain>
<!-- Requests entering through the Relay gateway are bridged to "job-launches"; replies are read from "job-operator" with a 1000ms timeout. -->
<integration:channel id="external-transformed-requests" />
<integration:bridge input-channel="external-transformed-requests" output-channel="job-launches" />
<integration:gateway id="localJobRequestHandler" default-request-channel="external-transformed-requests"
default-reply-channel="job-operator" service-interface="org.springframework.batch.admin.sample.relay.Relay" default-reply-timeout="1000"/>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd">
<!-- This context defines no beans. NOTE(review): presumably a placeholder so a wildcard import has a file to match; confirm. -->
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:http="http://www.springframework.org/schema/integration/http"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Pull in every XML file from the profile directory named by the TOPOLOGY placeholder ("local" when it is not set). -->
<import resource="../profile/${TOPOLOGY:local}/*.xml"/>
</beans>
<#-- Renders two properties holding comma-separated service URLs, one entry per peer. -->
<#-- Each peer contributes http://host:port + servletPath + /service/files and /service/sink. -->
<#if peers?? && peers?size!=0>
<#list peers as peer>
<#assign files_list>${files_list!}<#if peer_index!=0>,</#if>http://${peer.host}:${peer.properties.port}${servletPath}/service/files</#assign>
<#assign sink_list>${sink_list!}<#if peer_index!=0>,</#if>http://${peer.host}:${peer.properties.port}${servletPath}/service/sink</#assign>
</#list>
<#else>
<#-- No peers known: fall back to a single service on localhost:8080. -->
<#-- NOTE(review): the fallback uses /service/jobs while the peer branch uses /service/sink; confirm this difference is intended. -->
<#assign files_list>http://localhost:8080${servletPath}/service/files</#assign>
<#assign sink_list>http://localhost:8080${servletPath}/service/jobs</#assign>
</#if>
file.service.urls=${files_list}
job.request.handler.service.urls=${sink_list}
package org.springframework.batch.admin.sample;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.admin.service.FileInfo;
import org.springframework.batch.admin.service.FileService;
import org.springframework.core.io.Resource;
/**
 * {@link FileService} that combines a local service with a set of services.
 * Creating, publishing and resolving files is delegated to the local service;
 * deleting, counting and listing files is done across all configured services.
 */
public class ClusterFileService implements FileService {

    private static final Log logger = LogFactory.getLog(ClusterFileService.class);

    public Set<FileService> services = Collections.emptySet();

    public FileService localService;

    /**
     * @param localService the localService to set
     */
    public void setLocalService(FileService localService) {
        this.localService = localService;
    }

    /**
     * @param services the services to set
     */
    public void setServices(Set<FileService> services) {
        this.services = services;
    }

    /**
     * Create the file using the local service only.
     */
    public FileInfo createFile(String path) throws IOException {
        return localService.createFile(path);
    }

    /**
     * Delete matching files from every configured service.
     *
     * @param pattern the pattern passed on to each service
     * @return the total number of files deleted, summed over all services
     * @throws IOException if any service fails
     */
    public int delete(String pattern) throws IOException {
        int count = 0;
        for (FileService service : services) {
            // Accumulate, as countFiles() does; plain assignment would report
            // only whichever service happened to be iterated last.
            count += service.delete(pattern);
        }
        return count;
    }

    /**
     * @return the number of files, summed over all configured services
     */
    public int countFiles() {
        int count = 0;
        for (FileService service : services) {
            count += service.countFiles();
        }
        return count;
    }

    /**
     * List files from every configured service. Entries from the local service
     * are returned as they are; entries from any other service are copied into
     * a new {@link FileInfo} with the same path and timestamp and the third
     * constructor argument set to false.
     *
     * @param startFile passed unchanged to each service
     * @param pageSize passed unchanged to each service
     * @return the concatenated results (may contain more than pageSize entries)
     * @throws IOException if any service fails
     */
    public List<FileInfo> getFiles(int startFile, int pageSize) throws IOException {
        List<FileInfo> files = new ArrayList<FileInfo>();
        for (FileService service : services) {
            logger.debug("Getting files from service: " + service);
            List<FileInfo> remotes = service.getFiles(startFile, pageSize);
            if (service == localService) {
                files.addAll(remotes);
            }
            else {
                for (FileInfo file : remotes) {
                    files.add(new FileInfo(file.getPath(), file.getTimestamp(), false));
                }
            }
            logger.debug("Finished");
        }
        // TODO: paginate this properly using the individual counts.
        return files;
    }

    /**
     * Resolve the resource using the local service only.
     */
    public Resource getResource(String path) {
        return localService.getResource(path);
    }

    /**
     * Publish the file using the local service only.
     */
    public boolean publish(FileInfo target) throws IOException {
        return localService.publish(target);
    }

}
package org.springframework.batch.admin.sample.relay;
import java.util.Set;
/**
 * {@link Relay} that passes each message to every delegate and returns the
 * first non-null reply it receives.
 *
 * @param <T> the message type
 * @param <S> the reply type
 */
public class CompositeRelay<T, S> implements Relay<T, S> {

    private Set<Relay<T, S>> delegates;

    private boolean expectReply = true;

    /**
     * Flag to indicate that a reply is expected: i.e. one of the delegates must
     * respond with a non-null result.
     *
     * @param expectReply the flag value to set
     */
    public void setExpectReply(boolean expectReply) {
        this.expectReply = expectReply;
    }

    /**
     * @param delegates the relays that each message is passed to
     */
    public void setDelegates(Set<Relay<T, S>> delegates) {
        this.delegates = delegates;
    }

    /**
     * Pass the message to all delegates. Every delegate is invoked, even after
     * a reply has been obtained.
     *
     * @param message the message to relay
     * @return the first non-null reply, or null if there was none and no reply
     * is expected
     * @throws IllegalStateException if a reply is expected and every delegate
     * returned null
     */
    public S accept(T message) {
        S result = null;
        for (Relay<T, S> sink : delegates) {
            S reply = sink.accept(message);
            // Keep the first reply. The result starts out null, so requiring
            // a non-null result here would discard every reply.
            if (reply != null && result == null) {
                result = reply;
            }
        }
        if (expectReply && result == null) {
            throw new IllegalStateException("Expected reply and got none.");
        }
        return result;
    }

}
# Placeholders for the static network topology
file.service.urls=http://localhost:8080/cloud-sample/batch/service/files,http://localhost:8081/cloud-sample/batch/service/files
job.request.handler.service.urls=http://localhost:8080/cloud-sample/batch/service/jobs,http://localhost:8081/cloud-sample/batch/service/jobs
# Placeholders for the static network topology
local.port=8080
# Placeholders for the static network topology
local.port=8081
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.context.Lifecycle;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.util.Assert;
/**
 * An aspect that diverts the message through a gateway, and only forwards it if
 * it gets a reply.
 *
 * @author Dave Syer
 */
public class Detour implements Lifecycle {

    private static final Log logger = LogFactory.getLog(Detour.class);

    private final MessageChannel channel;

    private volatile long timeout = 0;

    private final MessageSelector selector;

    private volatile boolean running = true;

    /**
     * Create a new detour with <em>no</em> {@link MessageSelector}, so every
     * message is diverted.
     *
     * @param channel the MessageChannel to which intercepted messages will be
     * sent
     */
    public Detour(MessageChannel channel) {
        this(channel, null);
    }

    /**
     * Create a new detour with the provided {@link MessageSelector}.
     *
     * @param channel the channel to which intercepted messages will be sent
     * @param selector the selector that must accept a message for it to be sent
     * to the intercepting channel (may be null to accept all messages)
     * @throws IllegalArgumentException if the channel is null
     */
    public Detour(MessageChannel channel, MessageSelector selector) {
        Assert.notNull(channel, "channel must not be null");
        this.channel = channel;
        this.selector = selector;
    }

    /**
     * Specify how long to wait for a reply from the diversion channel.
     * Negative values are treated as 0. The default value is 0.
     *
     * @param timeout the timeout in milliseconds
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Check whether the detour is currently running.
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Restart the detour if it has been stopped. It is running by default.
     */
    public void start() {
        this.running = true;
    }

    /**
     * Stop the detour. To restart, invoke {@link #start()}.
     */
    public void stop() {
        this.running = false;
    }

    /**
     * Intercept the Message and, <em>if accepted</em> by the
     * {@link MessageSelector}, send it to the diversion channel, wait for a
     * reply (up to the configured timeout), and continue the original send
     * with the reply in place of the message. If this component's
     * {@link MessageSelector} is <code>null</code>, it will accept all
     * messages. When the detour is stopped or the message is not accepted the
     * original send proceeds unchanged.
     *
     * @param joinPoint the intercepted call to <code>MessageChannel.send</code>
     * @param message the message being sent (the first argument of the call)
     * @return the result of the advised send, or true if the diversion
     * produced no reply and the message was therefore dropped
     * @throws Throwable anything thrown by the advised send
     */
    public boolean send(ProceedingJoinPoint joinPoint, Message<?> message) throws Throwable {
        boolean accepted = this.running && (this.selector == null || this.selector.accept(message));
        if (!accepted) {
            if (logger.isDebugEnabled()) {
                logger.debug("Detour to channel '" + this.channel + "' was not taken (message not accepted)");
            }
            return (Boolean) joinPoint.proceed();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Detour to channel '" + this.channel + "' was taken (message accepted)");
        }
        Message<?> reply = divert(message);
        if (reply == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Detour filtered message (or timed out)");
            }
            return true;
        }
        return (Boolean) joinPoint.proceed(replaceMessage(joinPoint.getArgs(), reply));
    }

    /**
     * Build the argument array for proceeding: the original arguments with the
     * first one (the message) replaced. The advised call may be the
     * two-argument <code>send(message, timeout)</code>, in which case a
     * single-element array would not match the method signature.
     */
    private Object[] replaceMessage(Object[] original, Message<?> replacement) {
        if (original == null || original.length == 0) {
            return new Object[] { replacement };
        }
        Object[] args = original.clone();
        args[0] = replacement;
        return args;
    }

    /**
     * Send the message to the diversion channel and wait for the reply.
     */
    private Message<?> divert(Message<?> message) {
        MessagingTemplate template = new MessagingTemplate();
        template.setReceiveTimeout(timeout >= 0 ? timeout : 0);
        return template.sendAndReceive(channel, message);
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Test context: messages sent to "requests" normally flow straight to "replies"; the Detour advice may route them through "diversion" first. -->
<integration:channel id="requests" />
<integration:channel id="diversion" />
<integration:channel id="replies" />
<integration:bridge input-channel="requests" output-channel="replies" />
<!-- Everything arriving on "diversion" is answered with the constant payload 'bar' on "replies". -->
<integration:service-activator input-channel="diversion" output-channel="replies"
expression="'bar'" />
<!-- Wrap MessageChannel.send(..) on the "requests" bean with the Detour advice. -->
<aop:config>
<aop:aspect ref="detour">
<aop:around method="send"
pointcut="execution(* org.springframework..MessageChannel.send(..)) and bean(requests) and args(message, ..)" />
</aop:aspect>
</aop:config>
<bean id="detour" class="org.springframework.batch.admin.sample.Detour">
<constructor-arg ref="diversion" />
<constructor-arg ref="selector" />
</bean>
<!-- Only messages whose payload equals 'foo' are accepted for the detour. -->
<bean id="selector" class="org.springframework.integration.filter.ExpressionEvaluatingSelector">
<constructor-arg value="payload=='foo'" />
</bean>
</beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessagingException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Sends messages to the advised "requests" channel and checks which payload
 * ends up on the "replies" channel.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class DetourIntegrationTests {

    @Autowired
    private MessageChannel requests;

    @Autowired
    private SubscribableChannel replies;

    /** Payload of the most recent message seen on the replies channel. */
    private String result;

    /** Records the payload of each message delivered to the replies channel. */
    private MessageHandler handler = new MessageHandler() {
        public void handleMessage(Message<?> message) throws MessagingException {
            Object payload = message.getPayload();
            result = (String) payload;
        }
    };

    @Before
    public void init() {
        replies.subscribe(handler);
    }

    @After
    public void close() {
        replies.unsubscribe(handler);
    }

    /** A 'foo' payload is expected to come back as 'bar'. */
    @Test
    public void testDetourSuccessful() throws Exception {
        dispatch("foo");
        assertEquals("bar", result);
    }

    /** A 'spam' payload is expected to arrive unchanged. */
    @Test
    public void testDetourUnsuccessful() throws Exception {
        dispatch("spam");
        assertEquals("spam", result);
    }

    /**
     * Send the payload to the requests channel through a freshly initialised
     * template with a 500ms receive timeout.
     */
    private void dispatch(String payload) throws Exception {
        MessagingTemplate template = new MessagingTemplate();
        template.setReceiveTimeout(500L);
        template.afterPropertiesSet();
        template.convertAndSend(requests, payload);
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Test context: messages sent to "requests" normally flow straight to "replies"; the Detour advice may route them through "diversion" first. -->
<integration:channel id="requests" />
<integration:channel id="diversion" />
<integration:channel id="replies" />
<integration:bridge input-channel="requests" output-channel="replies"/>
<!-- On the diversion only 'foo' payloads pass; anything else is rejected with an exception rather than dropped silently. -->
<integration:filter input-channel="diversion" output-channel="replies" expression="payload=='foo'" throw-exception-on-rejection="true"/>
<!-- Wrap MessageChannel.send(..) on the "requests" bean with the Detour advice. -->
<aop:config>
<aop:aspect ref="detour">
<aop:around method="send"
pointcut="execution(* org.springframework..MessageChannel.send(..)) and bean(requests) and args(message, ..)" />
</aop:aspect>
</aop:config>
<bean id="detour" class="org.springframework.batch.admin.sample.Detour">
<constructor-arg ref="diversion" />
<constructor-arg ref="selector" />
</bean>
<!-- Messages with payload 'foo' or 'bar' take the detour; 'bar' is then rejected by the filter above. -->
<bean id="selector" class="org.springframework.integration.filter.ExpressionEvaluatingSelector">
<constructor-arg value="payload=='foo' or payload=='bar'" />
</bean>
</beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessageDeliveryException;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Sends messages to the advised "requests" channel and checks what arrives on
 * the "replies" channel when the diversion is a filter. The test instance
 * itself is the subscriber on the replies channel.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class DetourReplyChannelIntegrationTests implements MessageHandler {

    @Autowired
    private MessageChannel requests;

    @Autowired
    private SubscribableChannel replies;

    /** Payload of the most recent message seen on the replies channel. */
    private volatile String result;

    /** Released once a message has been handled. */
    private CountDownLatch latch = new CountDownLatch(1);

    /**
     * Record the payload and signal that a message has arrived.
     */
    public void handleMessage(Message<?> message) throws MessageRejectedException, MessageHandlingException,
            MessageDeliveryException {
        Object payload = message.getPayload();
        result = (String) payload;
        latch.countDown();
    }

    @Before
    public void init() {
        replies.subscribe(this);
    }

    @After
    public void clean() {
        replies.unsubscribe(this);
    }

    /** A 'foo' payload takes the detour and is expected on the replies channel. */
    @Test
    public void testDetourSuccessful() throws Exception {
        MessagingTemplate template = new MessagingTemplate(requests);
        template.setReceiveTimeout(500);
        template.send(messageOf("foo"));
        assertEquals("foo", result);
    }

    /** A 'bar' payload takes the detour and is expected to be rejected. */
    @Test(expected=MessageRejectedException.class)
    public void testDetourSuccessfulAndFiltered() throws Exception {
        MessagingTemplate template = new MessagingTemplate(requests);
        template.setReceiveTimeout(500);
        template.send(messageOf("bar"));
        assertEquals(null, result);
    }

    /** A 'spam' payload is expected on the replies channel unchanged. */
    @Test
    public void testDetourUnsuccessful() throws Exception {
        MessagingTemplate template = new MessagingTemplate(requests);
        template.send(messageOf("spam"));
        latch.await(1, TimeUnit.SECONDS);
        assertEquals("spam", result);
    }

    /** Wrap the payload in a message. */
    private static Message<String> messageOf(String payload) {
        return MessageBuilder.withPayload(payload).build();
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Test wiring: a "requests" channel whose send() is intercepted by the "detour" aspect, plus a "diversion" channel. -->
<integration:channel id="requests" />
<integration:channel id="diversion" />
<!-- Bridge without an output channel: presumably the message goes back to the sender's reply channel header (confirm against Spring Integration docs). -->
<integration:bridge input-channel="requests" />
<!-- Anything arriving on "diversion" is answered with the literal string 'bar'. -->
<integration:service-activator input-channel="diversion" expression="'bar'" />
<!-- Around advice: every MessageChannel.send(..) on the "requests" bean is routed through detour.send, with the first argument bound as "message". -->
<aop:config>
<aop:aspect ref="detour">
<aop:around method="send"
pointcut="execution(* org.springframework..MessageChannel.send(..)) and bean(requests) and args(message, ..)" />
</aop:aspect>
</aop:config>
<!-- The Detour is built from the diversion channel and the selector below. NOTE(review): the Detour source is not shown here; presumably it diverts messages accepted by the selector. -->
<bean id="detour" class="org.springframework.batch.admin.sample.Detour">
<constructor-arg ref="diversion" />
<constructor-arg ref="selector" />
</bean>
<!-- Selector that accepts only messages whose payload equals 'foo'. -->
<bean id="selector" class="org.springframework.integration.filter.ExpressionEvaluatingSelector">
<constructor-arg value="payload=='foo'" />
</bean>
</beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Request/reply tests against the {@code requests} channel: a payload is sent with
 * {@code sendAndReceive} and the payload of the reply is checked.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class DetourTempReplyChannelIntegrationTests {

    @Autowired
    private MessageChannel requests;

    /** Sending "foo" is expected to produce the reply "bar". */
    @Test
    public void testDetourSuccessful() throws Exception {
        assertEquals("bar", exchange("foo"));
    }

    /** Sending "spam" is expected to come back unchanged. */
    @Test
    public void testDetourUnsuccessful() throws Exception {
        assertEquals("spam", exchange("spam"));
    }

    /**
     * Sends the payload to {@code requests} and waits up to 500ms for a reply.
     *
     * @param payload the request payload
     * @return the payload of the reply message
     */
    private Object exchange(String payload) {
        MessagingTemplate template = new MessagingTemplate(requests);
        template.setReceiveTimeout(500);
        return template.sendAndReceive(MessageBuilder.withPayload(payload).build()).getPayload();
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<!-- The "doubler" job: a single step that runs the fileDoublerTasklet bean defined at the bottom of this file. -->
<job id="doubler" xmlns="http://www.springframework.org/schema/batch">
<step id="doubler-step">
<tasklet ref="fileDoublerTasklet" />
</step>
</job>
<!-- Messages from "input-files" are filtered by FileParentDirectoryFilter (parentName "doubler"), converted by FileToJobLaunchRequestAdapter for the doubler job, and sent to "job-requests". -->
<chain input-channel="input-files" output-channel="job-requests" xmlns="http://www.springframework.org/schema/integration">
<filter method="filter">
<bean class="org.springframework.batch.admin.integration.FileParentDirectoryFilter" xmlns="http://www.springframework.org/schema/beans">
<property name="parentName" value="doubler" />
</bean>
</filter>
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter" xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="doubler" />
</bean>
</service-activator>
</chain>
<!-- Step scoped so that source and destination can be bound late from the 'input.file' and 'output.file' job parameters; the first line is skipped when the content is appended. -->
<bean id="fileDoublerTasklet" class="org.springframework.batch.admin.sample.job.FileDoublerTasklet" scope="step">
<property name="source" value="#{jobParameters['input.file']}" />
<property name="destination" value="#{jobParameters['output.file']}" />
<property name="linesToSkip" value="1"/>
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!-- NOTE(review): this file references the 1.0 integration and 2.5 beans schemas while the other contexts use 2.0 and 3.0; confirm whether that is intentional. -->
<!-- Pulls in the bootstrap resources and manager configuration plus the doubler job definition. -->
<import resource="classpath*:/META-INF/spring/batch/bootstrap/resources/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/bootstrap/manager/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/jobs/doubler-context.xml" />
<!-- Target channel required by the chain in doubler-context.xml. -->
<integration:channel id="job-requests" />
<!-- A plain Date registered under the id "jobLoader"; presumably a dummy that replaces a bean of the same id from the imported bootstrap files (verify against those files). -->
<bean id="jobLoader" class="java.util.Date" />
<!-- Synchronous executor: tasks run in the calling thread, so a launched job has finished when the launcher returns. -->
<bean id="jobLauncherTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
</beans>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.job;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Launches the injected job with an input and an output file parameter and verifies that it
 * completes and that exactly one new "doubler" job instance is visible afterwards.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext(classMode=ClassMode.AFTER_CLASS)
public class DoublerJobIntegrationTests {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private Job job;

    @Test
    public void testLaunchJob() throws Exception {
        int before = countDoublerInstances();

        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString("input.file", "classpath:/data/test.txt");
        builder.addString("output.file", "file:target/data/double.txt");
        JobParameters jobParameters = builder.toJobParameters();

        JobExecution jobExecution = jobLauncher.run(job, jobParameters);
        assertNotNull(jobExecution);
        assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());

        int after = countDoublerInstances();
        assertEquals(before + 1, after);
    }

    /**
     * @return the number of "doubler" job instances reported by the explorer (first 1000 at most)
     */
    private int countDoublerInstances() {
        return jobExplorer.getJobInstances("doubler", 0, 1000).size();
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Use this to set additional properties on beans at run time -->
<!-- The locations below are parameterised with ENVIRONMENT (default hsql), TOPOLOGY (default local), DISCOVERY (default static) and HOST (default one). -->
<!-- NOTE(review): ENVIRONMENT, TOPOLOGY and DISCOVERY match the system properties set by EnvironmentContextListener; confirm that the listener runs before this context is loaded. -->
<bean id="placeholderProperties" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:/org/springframework/batch/admin/bootstrap/batch.properties</value>
<value>classpath:batch-default.properties</value>
<value>classpath:batch-${ENVIRONMENT:hsql}.properties</value>
<value>classpath:META-INF/spring/batch/profile/${TOPOLOGY:local}/config.properties</value>
<value>classpath:META-INF/spring/batch/profile/${TOPOLOGY:local}/${DISCOVERY:static}/config.properties</value>
<value>classpath:META-INF/spring/batch/profile/${HOST:one}/config.properties</value>
</list>
</property>
<!-- System properties take precedence over values from the files above. -->
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
<!-- Missing property files are skipped, but a placeholder that cannot be resolved is an error. -->
<property name="ignoreResourceNotFound" value="true" />
<property name="ignoreUnresolvablePlaceholders" value="false" />
<property name="order" value="1" />
</bean>
</beans>
<#-- Renders the "environment" JSON fragment: a resource link plus the "system" and "host" maps taken from the model. -->
<#-- NOTE(review): keys and values are written without JSON escaping; a value containing a quote, backslash or line break would presumably produce invalid JSON. Confirm and escape if needed. -->
<#import "/spring.ftl" as spring />
<#assign url><@spring.url relativeUrl="${servletPath}/env.json"/></#assign>
"environment" : {
"resource" : "${baseUrl}${url}",
"system" : {
<#-- One "key" : "value" pair per entry, with a comma after every entry except the last. -->
<#if system?? && system?size!=0>
<#list system?keys as key>
"${key}" : "${system[key]}"<#if key_index != system?size-1>,</#if>
</#list>
</#if>
},
"host" : {
<#-- Same layout as "system", fed from the "host" model entry. -->
<#if host?? && host?size!=0>
<#list host?keys as key>
"${key}" : "${host[key]}"<#if key_index != host?size-1>,</#if>
</#list>
</#if>
}
}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.batch.admin.sample.web;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.admin.sample.util.JsonWrapper;
/**
* @author Dave Syer
*
*/
public class EnvironmentContextListener implements ServletContextListener {

    private static final Log logger = LogFactory.getLog(EnvironmentContextListener.class);

    /**
     * Translates the hosting platform's environment variables into the system properties used to
     * select configuration files.
     * <ul>
     * <li>If {@code VMC_APP_VERSION} is present, {@code TOPOLOGY} is set to "cluster" and
     * {@code DISCOVERY} to "dynamic".</li>
     * <li>If {@code VMC_MYSQL} is present, the MySQL connection properties are derived from it
     * and from {@code VMC_SERVICES}, and {@code ENVIRONMENT} is set to "vmc".</li>
     * </ul>
     *
     * @param sce the servlet context event (not used)
     */
    public void contextInitialized(ServletContextEvent sce) {
        if (System.getenv("VMC_APP_VERSION") != null) {
            System.setProperty("TOPOLOGY", "cluster");
            System.setProperty("DISCOVERY", "dynamic");
        }
        String mysql = System.getenv("VMC_MYSQL");
        if (mysql != null) {
            configureMysql(mysql);
        }
    }

    /**
     * Reads the MySQL settings from {@code VMC_SERVICES} and from the given {@code host:port}
     * value. All values are collected and validated first and only then published as system
     * properties, so a failure never leaves a partially configured environment behind.
     * Failures are deliberately non-fatal and only logged at debug level.
     *
     * @param mysql the value of {@code VMC_MYSQL}, expected in the form {@code host:port}
     */
    private void configureMysql(String mysql) {
        try {
            JsonWrapper wrapper = new JsonWrapper(System.getenv("VMC_SERVICES"));
            // Do not log the wrapper itself: the service description contains the database password.
            logger.info("VMC_SERVICES discovered, configuring MySQL connection properties");

            String user = wrapper.get("mysql.options.user", String.class);
            String password = wrapper.get("mysql.options.password", String.class);
            String database = wrapper.get("mysql.options.name", String.class);
            if (user == null || password == null || database == null) {
                throw new IllegalStateException("VMC_SERVICES does not contain complete mysql options");
            }

            String[] hostAndPort = mysql.split(":");
            if (hostAndPort.length < 2) {
                throw new IllegalStateException("VMC_MYSQL is not of the form host:port");
            }

            System.setProperty("MYSQL_USER", user);
            System.setProperty("MYSQL_PASSWORD", password);
            System.setProperty("MYSQL_DATABASE", database);
            System.setProperty("MYSQL_HOST", hostAndPort[0]);
            System.setProperty("MYSQL_PORT", hostAndPort[1]);
            System.setProperty("ENVIRONMENT", "vmc");
        } catch (Exception e) {
            // Ignore it...
            logger.debug("VMC_SERVICES not discovered ", e);
        }
    }

    /**
     * Nothing to clean up.
     *
     * @param sce the servlet context event (not used)
     */
    public void contextDestroyed(ServletContextEvent sce) {
    }

}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.web;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
/**
* @author Dave Syer
*
*/
/**
 * Exposes the JVM system properties and the operating system environment under {@code /env}.
 */
@Controller
public class EnvironmentController {

    private static Log logger = LogFactory.getLog(EnvironmentController.class);

    /**
     * Adds two model entries: "system" (JVM system properties) and "host" (OS environment).
     *
     * @param model the model to populate
     */
    @RequestMapping(value = "/env", method = RequestMethod.GET)
    public void env(ModelMap model) {
        Properties system = getSystemProperties();
        model.put("system", system);
        Properties host = getHostProperties();
        model.put("host", host);
    }

    /**
     * @return a copy of the JVM system properties, or whatever could be copied before a failure
     */
    private Properties getSystemProperties() {
        Properties copy = new Properties();
        try {
            copy.putAll(System.getProperties());
        }
        catch (Exception e) {
            logger.warn("Could not obtain System properties", e);
        }
        return copy;
    }

    /**
     * @return the OS environment variables as properties, or whatever could be copied before a failure
     */
    private Properties getHostProperties() {
        Properties copy = new Properties();
        try {
            for (Map.Entry<String, String> variable : System.getenv().entrySet()) {
                copy.setProperty(variable.getKey(), variable.getValue());
            }
        }
        catch (Exception e) {
            logger.warn("Could not obtain OS environment", e);
        }
        return copy;
    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.web.sample.views;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.admin.sample.util.JsonWrapper;
import org.springframework.batch.support.PropertiesConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.WebApplicationContextLoader;
import org.springframework.web.servlet.View;
/**
 * Renders the "env.json" view with a small model and checks the resulting JSON; also checks
 * that {@link JsonWrapper} can navigate a service description document.
 */
@ContextConfiguration(loader = WebApplicationContextLoader.class, inheritLocations = false, locations = "AbstractSampleViewTests-context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class EnvJsonViewTests extends AbstractSampleViewTests {

    private final HashMap<String, Object> model = new HashMap<String, Object>();

    @Autowired
    @Qualifier("env.json")
    private View env;

    /** Two system and two host properties go in; both maps must come out with two entries. */
    @Test
    public void testListViewWithJobs() throws Exception {
        model.put("baseUrl", "http://localhost:8080/springsource");
        model.put("system", PropertiesConverter.stringToProperties("foo=bar,spam=baz"));
        model.put("host", PropertiesConverter.stringToProperties("foo=bar,spam=buz"));

        env.render(model, request, response);

        JsonWrapper wrapper = new JsonWrapper(response.getContentAsString());
        int systemSize = wrapper.get("environment.system", Map.class).size();
        int hostSize = wrapper.get("environment.host", Map.class).size();
        assertEquals(2, systemSize);
        assertEquals(2, hostSize);
    }

    /** The port and database name must be reachable through dotted path expressions. */
    @Test
    public void testMysqlProperties() throws Exception {
        JsonWrapper wrapper = new JsonWrapper("[{\"name\":\"mysql\",\"tier\":\"free\",\"type\":\"database\",\"version\":\"5.1\",\"vendor\":\"mysql\",\"options\":{\"name\":\"d9f265e7ac9964d71a894a1180bc429f7\",\"port\":3306,\"node_id\":\"mysql_node_1\",\"hostname\":\"10.114.110.212\",\"password\":\"pItb28ZtsnHgB\",\"user\":\"uYxOi7OyhsNBt\"}}]");
        assertEquals(3306, wrapper.get("mysql.options.port"));
        assertEquals("d9f265e7ac9964d71a894a1180bc429f7", wrapper.get("mysql.options.name", String.class));
    }

}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.job;
import java.io.BufferedReader;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.io.Resource;
/**
* @author Dave Syer
*
*/
/**
 * Tasklet that writes the source file to the destination twice: first as a plain file copy and
 * then appended again, optionally skipping a number of leading lines in the appended part.
 */
public class FileDoublerTasklet implements Tasklet {

    private Resource destination;

    private Resource source;

    private int linesToSkip = 0;

    private String encoding = "UTF-8";

    /**
     * @param linesToSkip the number of leading lines to leave out of the appended copy (default 0)
     */
    public void setLinesToSkip(int linesToSkip) {
        this.linesToSkip = linesToSkip;
    }

    /**
     * @param encoding the character encoding used to read and re-write the appended copy
     * (default UTF-8)
     */
    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    /**
     * @param destination the file to write to; it is resolved with {@link Resource#getFile()}
     */
    public void setDestination(Resource destination) {
        this.destination = destination;
    }

    /**
     * @param source the file to read from; it is resolved with {@link Resource#getFile()}
     */
    public void setSource(Resource source) {
        this.source = source;
    }

    /**
     * Copies the source to the destination and then appends the source content once more, minus
     * the configured number of leading lines. All streams are closed even if reading or writing
     * fails part way through.
     *
     * @param contribution not used
     * @param chunkContext not used
     * @return {@link RepeatStatus#FINISHED} always
     * @throws Exception if either resource cannot be resolved to a file or an I/O error occurs
     */
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        FileUtils.copyFile(source.getFile(), destination.getFile());
        InputStream input = null;
        BufferedReader reader = null;
        OutputStream output = null;
        try {
            input = source.getInputStream();
            reader = new BufferedReader(new InputStreamReader(input, encoding));
            for (int i = 0; i < linesToSkip; i++) {
                reader.readLine();
            }
            // Open in append mode so the first copy is preserved.
            output = new FileOutputStream(destination.getFile(), true);
            IOUtils.copy(reader, output, encoding);
        }
        finally {
            IOUtils.closeQuietly(reader);
            // Also covers the case where the reader could not be created around the stream.
            IOUtils.closeQuietly(input);
            IOUtils.closeQuietly(output);
        }
        return RepeatStatus.FINISHED;
    }

}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.job;
import static org.junit.Assert.assertEquals;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
/**
* @author Dave Syer
*
*/
/**
 * Runs {@link FileDoublerTasklet} against a fixture file and compares the size of the output
 * with the size of the input.
 */
public class FileDoublerTaskletTests {

    private static final String SOURCE_PATH = "src/test/resources/data/test.txt";

    private static final String DESTINATION_PATH = "target/data/double.txt";

    private FileDoublerTasklet tasklet = new FileDoublerTasklet();

    /** Without skipped lines the output is exactly twice as long as the input. */
    @Test
    public void testSunnyDay() throws Exception {
        Resource source = new FileSystemResource(SOURCE_PATH);
        Resource destination = new FileSystemResource(DESTINATION_PATH);
        runTasklet(source, destination);
        int inputLength = FileUtils.readFileToString(source.getFile()).length();
        int outputLength = FileUtils.readFileToString(destination.getFile()).length();
        assertEquals(inputLength * 2, outputLength);
    }

    /** With one skipped line the output has one line fewer than twice the input. */
    @Test
    public void testSunnyDayWithSkippedLine() throws Exception {
        Resource source = new FileSystemResource(SOURCE_PATH);
        Resource destination = new FileSystemResource(DESTINATION_PATH);
        tasklet.setLinesToSkip(1);
        runTasklet(source, destination);
        @SuppressWarnings("unchecked")
        List<String> input = FileUtils.readLines(source.getFile());
        @SuppressWarnings("unchecked")
        List<String> output = FileUtils.readLines(destination.getFile());
        assertEquals(input.size() * 2 - 1, output.size());
    }

    /**
     * Removes any previous output, then executes the tasklet from source to destination.
     */
    private void runTasklet(Resource source, Resource destination) throws Exception {
        FileUtils.deleteQuietly(destination.getFile());
        tasklet.setSource(source);
        tasklet.setDestination(destination);
        tasklet.execute(null, null);
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Test wiring: messages sent to "requests" pass a filter and, if accepted, are forwarded to "replies". -->
<integration:channel id="requests" />
<integration:channel id="replies" />
<!-- Only payloads equal to 'foo' are accepted; rejected messages raise an exception instead of being dropped silently. -->
<integration:filter input-channel="requests" output-channel="replies" expression="payload=='foo'"
throw-exception-on-rejection="true" />
</beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessageDeliveryException;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Sends messages to the {@code requests} channel and records what reaches the {@code replies}
 * channel. The test registers itself as a {@link MessageHandler} on {@code replies} before each
 * test method and removes itself again afterwards.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class FilterIntegrationTests implements MessageHandler {

    @Autowired
    private MessageChannel requests;

    @Autowired
    private SubscribableChannel replies;

    /** Payload of the last message received from {@code replies}; stays null if none arrived. */
    private volatile String result;

    /** Counted down once a message has been received from {@code replies}. */
    private CountDownLatch latch = new CountDownLatch(1);

    @Before
    public void init() {
        replies.subscribe(this);
    }

    @After
    public void clean() {
        replies.unsubscribe(this);
    }

    /**
     * Stores the payload of the incoming message (expected to be a String) and releases the latch.
     *
     * @param message the message delivered on the {@code replies} channel
     */
    public void handleMessage(Message<?> message) throws MessageRejectedException, MessageHandlingException,
            MessageDeliveryException {
        Object payload = message.getPayload();
        result = (String) payload;
        latch.countDown();
    }

    /** A "foo" payload is expected to arrive on the replies channel. */
    @Test
    public void testUnfiltered() throws Exception {
        Message<String> request = MessageBuilder.withPayload("foo").build();
        MessagingTemplate template = new MessagingTemplate(requests);
        template.setReceiveTimeout(500);
        template.send(request);
        assertEquals("foo", result);
    }

    /** A "bar" payload is expected to be rejected while it is being sent. */
    @Test(expected = MessageRejectedException.class)
    public void testFiltered() throws Exception {
        Message<String> request = MessageBuilder.withPayload("bar").build();
        MessagingTemplate template = new MessagingTemplate(requests);
        template.setReceiveTimeout(500);
        template.send(request);
        // Only reached if the send did not throw, in which case the test fails anyway.
        assertEquals(null, result);
    }

    @Test
    @Ignore
    public void testUnfilteredWithReply() throws Exception {
        Message<String> request = MessageBuilder.withPayload("foo").build();
        MessagingTemplate template = new MessagingTemplate(requests);
        template.setReceiveTimeout(500);
        Message<?> reply = template.sendAndReceive(request);
        /*
         * TODO: not really a filter use case this one... the template cannot
         * receive a reply if the message is sent to another channel (replies in
         * this case). Shouldn't the dispatcher recognise the reply channel
         * header and use it?
         */
        assertEquals("foo", reply.getPayload());
    }

    /** A "bar" payload sent with request/reply semantics is expected to be rejected as well. */
    @Test(expected = MessageRejectedException.class)
    public void testFilteredWithReply() throws Exception {
        Message<String> request = MessageBuilder.withPayload("bar").build();
        MessagingTemplate template = new MessagingTemplate(requests);
        template.setReceiveTimeout(500);
        Message<?> reply = template.sendAndReceive(request);
        // Only reached if the send did not throw, in which case the test fails anyway.
        assertEquals(null, result);
        assertEquals(null, reply);
    }

}
/*
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.lang.reflect.Field;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
* Hack to work around https://jira.springsource.org/browse/SPR-7845.
*
* @author Dave Syer
*
*/
public class FormParametersExtractor extends FormHttpMessageConverter {

    /** Charset used both to rebuild the form body and to decode it again. */
    private static final String FORM_CHARSET = "UTF-8";

    /**
     * Reads the form parameters of the underlying servlet request into a multi-value map.
     * The body is not taken from the request input stream; it is rebuilt from the servlet
     * request's parameter map by {@link #getFormBody(HttpInputMessage)}.
     * <p>
     * The rebuilt body is always encoded with {@link #FORM_CHARSET}, so it is always decoded
     * with that charset too, whatever charset the request's Content-Type declares. Decoding
     * with the request charset would corrupt non-ASCII values and would fail outright for
     * requests without a Content-Type header.
     *
     * @param clazz not used
     * @param inputMessage the request to read the parameters from
     * @return the parameter names mapped to their values; a parameter without a value maps to null
     * @throws IOException if the rebuilt body cannot be written or read
     */
    @Override
    public MultiValueMap<String, String> read(Class<? extends MultiValueMap<String, ?>> clazz,
            HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException {
        String body = FileCopyUtils.copyToString(new InputStreamReader(getFormBody(inputMessage), FORM_CHARSET));
        String[] pairs = StringUtils.tokenizeToStringArray(body, "&");
        MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(pairs.length);
        for (String pair : pairs) {
            int idx = pair.indexOf('=');
            if (idx == -1) {
                result.add(URLDecoder.decode(pair, FORM_CHARSET), null);
            }
            else {
                String name = URLDecoder.decode(pair.substring(0, idx), FORM_CHARSET);
                String value = URLDecoder.decode(pair.substring(idx + 1), FORM_CHARSET);
                result.add(name, value);
            }
        }
        return result;
    }

    /**
     * Rebuilds a URL-encoded form body from the parameter map of the wrapped servlet request.
     * Every name/value pair is separated from the previous one by an ampersand, including pairs
     * whose value is null (written as the bare name).
     *
     * @param inputMessage the message wrapping the servlet request; the request is obtained by
     * reading the private {@code servletRequest} field of {@link ServletServerHttpRequest}, so
     * the message has to be an instance of that class
     * @return a stream over the rebuilt body, encoded with {@link #FORM_CHARSET}
     * @throws IOException if the body cannot be written
     */
    private InputStream getFormBody(HttpInputMessage inputMessage) throws IOException {
        Field field = ReflectionUtils.findField(ServletServerHttpRequest.class, "servletRequest");
        ReflectionUtils.makeAccessible(field);
        HttpServletRequest request = (HttpServletRequest) ReflectionUtils.getField(field, inputMessage);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Writer writer = new OutputStreamWriter(bos, FORM_CHARSET);
        @SuppressWarnings("unchecked")
        Map<String, String[]> form = request.getParameterMap();
        boolean first = true;
        for (Map.Entry<String, String[]> parameter : form.entrySet()) {
            String encodedName = URLEncoder.encode(parameter.getKey(), FORM_CHARSET);
            for (String value : parameter.getValue()) {
                // Separate from the previous pair regardless of whether that pair had a value.
                if (!first) {
                    writer.write('&');
                }
                first = false;
                writer.write(encodedName);
                if (value != null) {
                    writer.write('=');
                    writer.write(URLEncoder.encode(value, FORM_CHARSET));
                }
            }
        }
        writer.flush();
        return new ByteArrayInputStream(bos.toByteArray());
    }

}
/*
* Copyright 2006-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.util.MultiValueMap;
/**
* @author Dave Syer
*
*/
/**
 * Checks that {@link FormParametersExtractor} accepts URL-encoded forms and returns the
 * parameters of the wrapped servlet request.
 */
public class FormParametersExtractorTests {

    @Test
    public void testReadClass() throws Exception {
        // A form request carrying the single parameter foo=bar
        MockHttpServletRequest request = new MockHttpServletRequest();
        request.addHeader("Content-Type", MediaType.APPLICATION_FORM_URLENCODED.toString());
        request.addParameter("foo", "bar");

        FormParametersExtractor extractor = new FormParametersExtractor();
        boolean readable = extractor.canRead(MultiValueMap.class, MediaType.APPLICATION_FORM_URLENCODED);
        assertTrue(readable);

        MultiValueMap<String, String> result = extractor.read(null, new ServletServerHttpRequest(request));
        assertEquals("{foo=[bar]}", result.toString());
    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.proxy;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.remoting.httpinvoker.HttpInvokerProxyFactoryBean;
import org.springframework.remoting.support.RemoteAccessor;
/**
 * Factory bean that exposes a {@link Set} of services of one type: an HTTP
 * invoker proxy for every configured remote service URL plus, optionally, a
 * local service instance. URLs that point at the local host (optionally
 * narrowed down by the local port) are dropped so that the local node is not
 * called through a remote proxy.
 *
 * @author Dave Syer
 *
 */
public class HttpInvokerProxiesFactoryBean<T> extends RemoteAccessor implements FactoryBean<Set<T>>, InitializingBean,
        BeanClassLoaderAware {

    private static final Log logger = LogFactory.getLog(HttpInvokerProxiesFactoryBean.class);

    /** The services handed out by {@link #getObject()}; rebuilt in {@link #afterPropertiesSet()}. */
    private Set<T> proxies = Collections.emptySet();

    /** Candidate remote URLs (local ones are filtered out when added through {@link #addServiceUrl(String)}). */
    private Set<String> serviceUrls = new CopyOnWriteArraySet<String>();

    private T localService;

    private String localHostName = "localhost";

    /** Either empty or ":" followed by the port number; appended to the host when matching local URLs. */
    private String localPort = "";

    /**
     * Creates the factory and tries to resolve the local host name. If it
     * cannot be determined the default ("localhost") is kept.
     */
    public HttpInvokerProxiesFactoryBean() {
        try {
            localHostName = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            logger.debug("Could not determine local hostname", e);
        }
    }

    /**
     * @param localService the localService to set (added to the result set as is, may be null)
     */
    public void setLocalService(T localService) {
        this.localService = localService;
    }

    /**
     * @param localHostName the localHostName to set
     */
    public void setLocalHostName(String localHostName) {
        this.localHostName = localHostName;
    }

    /**
     * @param port the local port; when set, only URLs on this port of the
     * local host are treated as local
     */
    public void setLocalPort(int port) {
        // Integer.toString always produces ASCII digits. A DecimalFormat follows the
        // default locale and could render digits that never match a service URL.
        this.localPort = ":" + Integer.toString(port);
    }

    /**
     * Replaces the service URLs. No filtering happens here; local URLs are
     * removed when {@link #afterPropertiesSet()} runs.
     *
     * @param serviceUrls the serviceUrls to set
     */
    public void setServiceUrls(String[] serviceUrls) {
        this.serviceUrls = new CopyOnWriteArraySet<String>(Arrays.asList(serviceUrls));
    }

    /**
     * Registers a service URL unless it starts with the local host name or
     * "localhost" (each followed by the local port, if one was set).
     *
     * @param serviceUrl the URL of a remote service
     */
    public void addServiceUrl(String serviceUrl) {
        if (!serviceUrl.startsWith("http://" + localHostName + localPort)
                && !serviceUrl.startsWith("http://localhost" + localPort)) {
            this.serviceUrls.add(serviceUrl);
        }
    }

    /**
     * Re-applies the local URL filter to everything registered so far and
     * then builds the proxies.
     *
     * @see InitializingBean#afterPropertiesSet()
     */
    public void afterPropertiesSet() throws Exception {
        // Copy first: the URLs may have been set in bulk without filtering.
        Set<String> serviceUrls = new HashSet<String>(this.serviceUrls);
        this.serviceUrls.clear();
        for (String serviceUrl : serviceUrls) {
            addServiceUrl(serviceUrl);
        }
        createProxies();
    }

    /**
     * Builds one HTTP invoker proxy per remaining service URL, adds the local
     * service (if any) and publishes the result.
     */
    private void createProxies() {
        Set<T> result = new HashSet<T>();
        for (String serviceUrl : serviceUrls) {
            HttpInvokerProxyFactoryBean factory = new HttpInvokerProxyFactoryBean();
            factory.setServiceInterface(getServiceInterface());
            factory.setServiceUrl(serviceUrl);
            // Hand on the class loader this bean was given, otherwise the nested
            // factory silently falls back to its own default.
            factory.setBeanClassLoader(getBeanClassLoader());
            factory.afterPropertiesSet();
            @SuppressWarnings("unchecked")
            T service = (T) factory.getObject();
            logger.debug("Adding remote service of type " + getServiceInterface() + " at " + serviceUrl);
            result.add(service);
        }
        if (localService != null) {
            logger.debug("Adding local service of type " + getServiceInterface());
            result.add(localService);
        }
        logger.debug("Total services of type " + getServiceInterface() + ": " + result.size());
        proxies = new CopyOnWriteArraySet<T>(result);
    }

    /**
     * @return a read-only view of the current services
     * @see FactoryBean#getObject()
     */
    public Set<T> getObject() throws Exception {
        return Collections.unmodifiableSet(proxies);
    }

    /**
     * @see FactoryBean#getObjectType()
     */
    public Class<?> getObjectType() {
        return Set.class;
    }

    /**
     * @see FactoryBean#isSingleton()
     */
    public boolean isSingleton() {
        return true;
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<!-- Test context: wires a HttpInvokerProxiesFactoryBean with one local service and two service URLs. -->
<!-- The local service instance; excluded from autowiring so it is only reachable through the "services" bean. -->
<bean id="localService" class="org.springframework.batch.admin.sample.proxy.HttpInvokerProxiesFactoryBeanConfigurationTests.LocalService" autowire-candidate="false"/>
<!-- The factory under test. The serviceUrls value is an expression reading the "service.urls" entry of the "properties" bean below. -->
<bean id="services" class="org.springframework.batch.admin.sample.proxy.HttpInvokerProxiesFactoryBean">
<property name="localService" ref="localService"/>
<property name="serviceInterface" value="org.springframework.batch.admin.sample.proxy.HttpInvokerProxiesFactoryBeanConfigurationTests.Service"/>
<property name="serviceUrls" value="#{properties['service.urls']}"/>
</bean>
<!-- Comma-separated service URLs: one on localhost and one on a host named "remote". -->
<util:properties id="properties"><prop key="service.urls">http://localhost:8080/service,http://remote:8080/service</prop></util:properties>
</beans>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.proxy;
import static org.junit.Assert.assertEquals;
import java.util.Set;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Loads the default Spring context for this test class and checks the set of
 * services that is injected from it.
 *
 * @author Dave Syer
 *
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class HttpInvokerProxiesFactoryBeanConfigurationTests {

    /** The service set injected from the test context. */
    @Resource
    private Set<Service> services;

    /**
     * The injected set is expected to hold exactly two services.
     */
    @Test
    public void testSunnyDayWithLocalService() throws Exception {
        int serviceCount = services.size();
        assertEquals(2, serviceCount);
    }

    /** Service contract used by the test context. */
    public interface Service {

        String getMessage();

    }

    /** Trivial in-process implementation referenced by the test context. */
    public static class LocalService implements Service {

        public String getMessage() {
            return "foo";
        }

    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.proxy;
import static org.junit.Assert.*;
import java.util.Set;
import org.junit.Test;
/**
 * Unit tests for {@link HttpInvokerProxiesFactoryBean} that configure the
 * factory programmatically.
 *
 * @author Dave Syer
 *
 */
public class HttpInvokerProxiesFactoryBeanTests {

    private HttpInvokerProxiesFactoryBean<Service> factory = new HttpInvokerProxiesFactoryBean<Service>();

    /**
     * A localhost URL without a configured local port is dropped, so only
     * the local service remains.
     */
    @Test
    public void testSunnyDayWithLocalService() throws Exception {
        LocalService local = new LocalService();
        factory.setServiceInterface(Service.class);
        factory.setLocalService(local);
        factory.addServiceUrl("http://localhost:8080/service/");
        factory.afterPropertiesSet();

        Set<Service> created = factory.getObject();
        assertEquals(1, created.size());
        Service only = created.iterator().next();
        assertEquals(local, only);
    }

    /**
     * With a local port set, only the localhost URL on that port is dropped;
     * the one on another port is kept next to the local service.
     */
    @Test
    public void testSunnyDayWithLocalServiceAndPort() throws Exception {
        LocalService local = new LocalService();
        factory.setServiceInterface(Service.class);
        factory.setLocalService(local);
        factory.setLocalPort(8080);
        factory.addServiceUrl("http://localhost:8080/service/");
        factory.addServiceUrl("http://localhost:8081/service/");
        factory.afterPropertiesSet();

        Set<Service> created = factory.getObject();
        assertEquals(2, created.size());
        assertTrue(created.contains(local));
    }

    /**
     * Without a local service a single remote URL yields a single entry.
     */
    @Test
    public void testSunnyDayWithNoLocalService() throws Exception {
        factory.setServiceInterface(Service.class);
        factory.addServiceUrl("http://remote:8080/service/");
        factory.afterPropertiesSet();

        int size = factory.getObject().size();
        assertEquals(1, size);
    }

    /** Service contract used for the proxies in these tests. */
    public interface Service {

        String getMessage();

    }

    /** Simple in-process implementation of {@link Service}. */
    public class LocalService implements Service {

        public String getMessage() {
            return "foo";
        }

    }

}
package org.springframework.batch.admin.sample;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import org.springframework.batch.admin.web.sample.views.StatusViewTests;
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Suite that fixes the order of two test classes. It is marked as ignored and
 * is meant to be re-enabled by hand when test-ordering problems need to be
 * debugged.
 *
 * @author Dave Syer
 *
 */
@Ignore
@RunWith(Suite.class)
@SuiteClasses({ StatusViewTests.class, BroadcastJobLaunchRequestsIntegrationTests.class })
public class IgnoredTestSuite {
    // Intentionally empty: the annotations carry the whole configuration.
}
<jsp:forward page="/home" />
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.beans.BeansException;
import org.springframework.beans.TypeConverter;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.support.AbstractBeanFactory;
import org.springframework.core.io.Resource;
/**
 * Filter for job launch requests that looks at the "input.file" job
 * parameter: requests without that parameter pass, requests with it pass only
 * if the location resolves to an existing resource.
 *
 * @author Dave Syer
 *
 */
public class InputFileFilter implements BeanFactoryAware {

    private static final Log logger = LogFactory.getLog(InputFileFilter.class);

    /** Used to turn the location string into a {@link Resource}. */
    private TypeConverter typeConverter;

    /**
     * Picks up the type converter of the enclosing bean factory.
     *
     * @param beanFactory the owning factory (cast to {@link AbstractBeanFactory})
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        AbstractBeanFactory owner = (AbstractBeanFactory) beanFactory;
        this.typeConverter = owner.getTypeConverter();
    }

    /**
     * @param request the launch request to inspect
     * @return the negation of {@link #canProcess(JobLaunchRequest)}
     */
    public boolean cannotProcess(JobLaunchRequest request) {
        boolean accepted = canProcess(request);
        return !accepted;
    }

    /**
     * @param request the launch request to inspect
     * @return true if there is no "input.file" parameter or the resource it
     * names exists, false if the parameter is present but the resource is not
     */
    public boolean canProcess(JobLaunchRequest request) {

        JobParameters parameters = request.getJobParameters();

        boolean hasInputFile = parameters.getParameters().containsKey("input.file");
        if (!hasInputFile) {
            if (logger.isDebugEnabled()) {
                logger.debug("Message has no matching job parameter.");
            }
            return true;
        }

        String location = parameters.getString("input.file");
        Resource resource = typeConverter.convertIfNecessary(location, Resource.class);
        boolean exists = resource.exists();

        if (logger.isDebugEnabled()) {
            logger.debug(exists ? "File exists (" + location + ")." : "Input file does not exist (" + location + ").");
        }
        return exists;

    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.context.support.StaticApplicationContext;
/**
 * Tests for {@link InputFileFilter} using the bean factory of a
 * {@link StaticApplicationContext} for type conversion.
 *
 * @author Dave Syer
 *
 */
public class InputFileFilterTests {

    private InputFileFilter filter = new InputFileFilter();

    /**
     * A request whose input file is a classpath resource is expected to pass.
     */
    @Test
    public void testFilterConversionFileExists() throws Exception {
        filter.setBeanFactory(new StaticApplicationContext().getBeanFactory());
        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString("input.file", "classpath:/log4j.properties");
        JobLaunchRequest request = new JobLaunchRequest(null, builder.toJobParameters());
        assertTrue(filter.canProcess(request));
    }

    /**
     * A request whose input file location is garbage is expected to be rejected.
     */
    @Test
    public void testFilterConversionFileMissing() throws Exception {
        filter.setBeanFactory(new StaticApplicationContext().getBeanFactory());
        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString("input.file", "files:/garbage");
        JobLaunchRequest request = new JobLaunchRequest(null, builder.toJobParameters());
        assertFalse(filter.canProcess(request));
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans" xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:http="http://www.springframework.org/schema/integration/http"
xsi:schemaLocation="http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http-2.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Single ping endpoints (HTML and JSON flavour): requests go to the "pre-ping" channel and the client is redirected to the status view. -->
<http:inbound-channel-adapter name="/ping" channel="pre-ping" view-name="redirect:status"
supported-methods="GET,POST" header-mapper="inboundHeaderMapper" />
<http:inbound-channel-adapter name="/ping.json" channel="pre-ping" view-name="redirect:status.json"
supported-methods="GET,POST" header-mapper="inboundHeaderMapper" />
<!-- Multi ping endpoints (POST only): requests go to the "pings" channel, which feeds the broadcast chain below. -->
<http:inbound-channel-adapter name="/pings" channel="pings" view-name="redirect:status"
supported-methods="POST" />
<!-- curl -d urls="http://localhost:8080/cloud-sample/ping&urls=http://localhost:8081/cloud-sample/ping" http://localhost:8080/cloud-sample/pings.json -->
<http:inbound-channel-adapter name="/pings.json" channel="pings" view-name="redirect:status.json"
supported-methods="POST" message-converters="formParametersExtractor" request-payload-type="org.springframework.util.MultiValueMap"/>
<!-- Message converter used by /pings.json to read the form parameters into a MultiValueMap payload. -->
<bean id="formParametersExtractor" class="org.springframework.batch.admin.sample.util.FormParametersExtractor"/>
<integration:channel id="pings" />
<!-- This chain takes an input header "urls" and broadcasts a ping to all of them -->
<!-- Steps: store the result of broadcastService.ping() in the "request" header, split the payload by its "urls" entry, -->
<!-- point the request and response URL headers at each target, then send the stored "request" header on as the payload. -->
<chain input-channel="pings" output-channel="ping" xmlns="http://www.springframework.org/schema/integration">
<header-enricher>
<header name="request" expression="@broadcastService.ping()"/>
</header-enricher>
<splitter expression="payload.urls" />
<header-enricher>
<header name="http_responseUrl" expression="headers['http_requestUrl'].replaceAll('pings','ping')" overwrite="true"/>
<header name="http_requestUrl" expression="payload" overwrite="true"/>
</header-enricher>
<transformer expression="headers.request" />
</chain>
<!-- Header mapper for the single ping endpoints: configured with the inbound header name x-http_responseurl. -->
<bean id="inboundHeaderMapper" class="org.springframework.integration.http.support.DefaultHttpHeaderMapper">
<property name="inboundHeaderNames" value="x-http_responseurl" />
</bean>
<!-- Messages from "pre-ping" get their headers enriched by requestResponseHeaderEnricher.enrich before reaching "ping". -->
<integration:header-enricher input-channel="pre-ping" output-channel="ping"
ref="requestResponseHeaderEnricher" method="enrich" default-overwrite="true" />
<bean id="requestResponseHeaderEnricher" class="org.springframework.batch.admin.sample.RequestResponseHeaderEnricher" />
<integration:channel id="pre-ping" />
<!-- Messages on "pong" are sent over HTTP to the URL held in the http_requestUrl header; replies go to nullChannel. -->
<http:outbound-gateway request-channel="pong" reply-channel="nullChannel" url="{url}"
request-timeout="1000" mapped-request-headers="http_responseUrl">
<http:uri-variable name="url" expression="headers['http_requestUrl']" />
</http:outbound-gateway>
<!-- HTTP invoker exporters: publish the local file service and the local job request handler under /service/files and /service/jobs. -->
<bean name="/service/files" class="org.springframework.remoting.httpinvoker.HttpInvokerServiceExporter">
<property name="serviceInterface" value="org.springframework.batch.admin.service.FileService" />
<property name="service" ref="localFileService" />
</bean>
<bean name="/service/jobs" class="org.springframework.remoting.httpinvoker.HttpInvokerServiceExporter">
<property name="serviceInterface" value="org.springframework.batch.admin.sample.relay.Relay" />
<property name="service" ref="localJobRequestHandler" />
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:http="http://www.springframework.org/schema/integration/http"
xsi:schemaLocation="http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http-2.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- Inbound side: requests to /ping are put on the "ping" channel and the client is redirected to the status view. -->
<http:inbound-channel-adapter name="/ping" channel="ping" view-name="redirect:status" />
<!-- Outbound side: messages on "pong" are sent over HTTP to the URL held in the http_requestUrl header; replies go to nullChannel. -->
<http:outbound-gateway request-channel="pong" reply-channel="nullChannel" url="{url}" request-timeout="1000">
<http:uri-variable name="url" expression="headers['http_requestUrl']"/>
</http:outbound-gateway>
</beans>
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.job;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.util.Assert;
/**
 * Tasklet that runs a single configured SQL statement and records the outcome
 * in the exit description of the step execution.
 *
 * @author Dave Syer
 *
 */
public class JdbcTasklet implements Tasklet, InitializingBean {

    private static final Log logger = LogFactory.getLog(JdbcTasklet.class);

    private SimpleJdbcTemplate jdbcTemplate;

    private String sql;

    /**
     * The SQL statement to run. It may be a select, update, delete or insert
     * and may contain named parameters, which are resolved through a
     * {@link BeanPropertySqlParameterSource} rooted at the step context, for
     * instance
     *
     * <pre>
     * DELETE from LEAD_INPUTS where ID=:jobParameters[idToDelete]
     * </pre>
     *
     * The named parameter syntax differs from Spring EL and is less
     * flexible. Where possible it may therefore be preferable to push step
     * context values into the statement with late binding, which works for a
     * step scoped bean definition like this one:
     *
     * <pre>
     * &lt;bean id="tasklet" class="org...JdbcTasklet" scope="step"&gt;
     *   &lt;property name="sql"&gt;
     *     &lt;value&gt;
     * DELETE from LEAD_INPUTS where ID=#{jobParameters['i.to.delete']?:-1}
     *     &lt;/value&gt;
     *   &lt;/property&gt;
     * &lt;/bean&gt;
     * </pre>
     *
     * @see BeanPropertySqlParameterSource
     *
     * @param sql the sql to set
     */
    public void setSql(String sql) {
        this.sql = sql;
    }

    /**
     * @param dataSource the data source the statement is run against
     */
    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new SimpleJdbcTemplate(dataSource);
    }

    /**
     * Verifies that both a data source and a statement were configured.
     */
    public void afterPropertiesSet() throws Exception {
        Assert.state(jdbcTemplate != null, "A DataSource must be provided");
        Assert.state(sql != null, "A SQL query must be provided");
    }

    /**
     * Runs the {@link #setSql(String) configured statement}. A statement
     * starting with "select" (ignoring case and surrounding whitespace) is
     * run as a query whose rows (a list of maps) are logged and appended to
     * the exit description. Anything else is run as an update and the number
     * of affected rows is logged and appended instead.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        StepExecution execution = chunkContext.getStepContext().getStepExecution();
        ExitStatus currentStatus = execution.getExitStatus();

        boolean isQuery = sql.trim().toUpperCase().startsWith("SELECT");
        String msg;

        if (isQuery) {
            logger.debug("Executing: " + sql);
            BeanPropertySqlParameterSource parameters = new BeanPropertySqlParameterSource(chunkContext
                    .getStepContext());
            List<Map<String, Object>> rows = jdbcTemplate.queryForList(sql, parameters);
            msg = "Result: " + rows;
        }
        else {
            logger.debug("Updating : " + sql);
            BeanPropertySqlParameterSource parameters = new BeanPropertySqlParameterSource(chunkContext
                    .getStepContext());
            int count = jdbcTemplate.update(sql, parameters);
            msg = "Updated: " + count + " rows";
        }

        // Both branches report their outcome the same way.
        logger.debug(msg);
        execution.setExitStatus(currentStatus.addExitDescription(msg));

        return RepeatStatus.FINISHED;

    }

}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.batch.support.PropertiesConverter;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
/**
 * Message endpoint that renders a {@link JobLaunchRequest} as a String of the
 * form <code>jobName[parameters]</code>.
 *
 * @author Dave Syer
 *
 */
@MessageEndpoint
public class JobRequestToStringAdapter {

    private static final Log logger = LogFactory.getLog(JobRequestToStringAdapter.class);

    private JobParametersConverter converter = new DefaultJobParametersConverter();

    /**
     * @param request the launch request to render
     * @return the job name followed by the job parameters (converted to
     * properties and then to a String) in square brackets
     */
    @Transformer
    public String transform(JobLaunchRequest request) {

        StringBuilder text = new StringBuilder(request.getJob().getName());
        text.append("[");
        String parameters = PropertiesConverter.propertiesToString(converter.getProperties(request
                .getJobParameters()));
        text.append(parameters);
        text.append("]");

        String converted = text.toString();
        if (logger.isDebugEnabled()) {
            logger.debug("Converted request: " + converted);
        }
        return converted;

    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.MappingJsonFactory;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
/**
 * Wraps a JSON document so that values can be pulled out of it with Spring EL
 * expressions. A JSON object is parsed into a map. A JSON array is accepted
 * too: an empty array becomes an empty map, and an array whose first element
 * is an object with a "name" entry becomes a map keyed by each element's
 * "name".
 */
public class JsonWrapper {

    private final SpelExpressionParser parser;

    private final StandardEvaluationContext context;

    /** The parsed content that expressions are evaluated against. */
    private Object target;

    /** The raw text as passed in; returned by {@link #toString()}. */
    private final String content;

    /**
     * Parses the content. Backslashes are replaced with forward slashes
     * before parsing.
     *
     * @param content the JSON text to wrap
     * @throws Exception if the content cannot be parsed (a
     * {@link JsonMappingException} carrying the content when it is not
     * well-formed JSON)
     */
    public JsonWrapper(String content) throws Exception {
        this.content = content;
        try {
            target = new MappingJsonFactory().createJsonParser(
                    content.replace("\\", "/")).readValueAs(Map.class);
        } catch (JsonParseException e) {
            throw new JsonMappingException("Cannot create wrapper for:\n"
                    + content, e);
        } catch (JsonMappingException e) {
            // Not a JSON object: try again as a JSON array.
            try {
                List<?> list = new MappingJsonFactory().createJsonParser(
                        content.replace("\\", "/")).readValueAs(List.class);
                HashMap<String, Object> map = new HashMap<String, Object>();
                if (list.isEmpty()) {
                    target = map;
                } else {
                    Object object = list.get(0);
                    @SuppressWarnings("unchecked")
                    boolean isNamedMap = object instanceof Map && ((Map<String,?>)object).containsKey("name");
                    if (isNamedMap) {
                        target = map;
                        for (Object item : list ) {
                            @SuppressWarnings("unchecked")
                            String key = (String) ((Map<String,?>)item).get("name");
                            map.put(key, item);
                        }
                    }
                    // NOTE(review): for any other non-empty array the target stays null - confirm this is intended.
                }
                // No diagnostic print of list.get(0) here: it threw IndexOutOfBoundsException
                // for an empty array, which the branch above deliberately supports.
            } catch (JsonParseException ex) {
                throw new JsonMappingException("Cannot create wrapper for:\n"
                        + content, ex);
            }
        }
        context = new StandardEvaluationContext();
        // Lets expressions address map entries like properties.
        context.addPropertyAccessor(new MapAccessor());
        parser = new SpelExpressionParser();
    }

    /**
     * @return the parsed content as a map
     */
    @SuppressWarnings("unchecked")
    public Map<String, Object> getMap() {
        return (Map<String, Object>) target;
    }

    /**
     * Evaluates the expression against the parsed content.
     *
     * @param expression a Spring EL expression
     * @return the value of the expression
     */
    public Object get(String expression) throws Exception {
        return get(expression, Object.class);
    }

    /**
     * Evaluates the expression against the parsed content and requests the
     * result as the given type.
     *
     * @param expression a Spring EL expression
     * @param type the desired result type
     * @return the value of the expression
     */
    public <T> T get(String expression, Class<T> type) throws Exception {
        return parser.parseExpression(expression).getValue(context, target,
                type);
    }

    /**
     * @return the original, unparsed content
     */
    @Override
    public String toString() {
        return content;
    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.lead;
/**
 * A sales lead: a client's interest in a product, with the sales
 * representative it is assigned to, an amount and the original query text.
 */
public class Lead {

    private Long id;

    private Client client;

    private Product product;

    /** Defaults to "Nobody" until a representative is assigned. */
    private String salesRep = "Nobody";

    private double amount;

    private String query;

    /**
     * Creates a lead with the given identity, client and product; the other
     * properties keep their defaults.
     */
    public Lead(Long id, Client client, Product product) {
        this.id = id;
        this.client = client;
        this.product = product;
    }

    /**
     * Copy constructor: takes over every property of the other lead.
     *
     * @param other the lead to copy
     */
    public Lead(Lead other) {
        this(other.id, other.client, other.product);
        this.salesRep = other.salesRep;
        this.amount = other.amount;
        this.query = other.query;
    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Client getClient() {
        return this.client;
    }

    public void setClient(Client client) {
        this.client = client;
    }

    public Product getProduct() {
        return this.product;
    }

    public void setProduct(Product product) {
        this.product = product;
    }

    public String getSalesRep() {
        return this.salesRep;
    }

    public void setSalesRep(String salesRep) {
        this.salesRep = salesRep;
    }

    public double getAmount() {
        return this.amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    public String getQuery() {
        return this.query;
    }

    public void setQuery(String query) {
        this.query = query;
    }

    /**
     * @return a short description with the simple class name, id, client,
     * product and sales representative
     */
    @Override
    public String toString() {
        String typeName = getClass().getSimpleName();
        return String.format("[%s: id=%d, client=%s, product=%s, sales=%s]", typeName, id, client, product, salesRep);
    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.lead.support;
import org.springframework.batch.admin.sample.lead.Client;
import org.springframework.batch.admin.sample.lead.Lead;
import org.springframework.batch.admin.sample.lead.Product;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
/**
 * Maps a field set with the fields "id", "client.name", "client.country",
 * "product.name", "amount" and "query" to a {@link Lead}.
 */
public class LeadFieldSetMapper implements FieldSetMapper<Lead> {

    /**
     * @param fieldSet the tokenized input record
     * @return a new lead populated from the named fields
     */
    public Lead mapFieldSet(FieldSet fieldSet) {

        long id = fieldSet.readLong("id");

        String clientName = fieldSet.readString("client.name");
        String clientCountry = fieldSet.readString("client.country");
        Client client = new Client(clientName, clientCountry);

        Product product = new Product(fieldSet.readString("product.name"));

        Lead result = new Lead(id, client, product);
        result.setAmount(fieldSet.readDouble("amount"));
        result.setQuery(fieldSet.readString("query"));
        return result;

    }

}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.lead.support;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.admin.sample.lead.Lead;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
/**
 * Turns a staged row (a map with "ID" and "DATA" entries) into a
 * {@link Lead} with a freshly generated id and flags the row as processed in
 * the LEAD_INPUTS table.
 *
 * @author Dave Syer
 *
 */
public class LeadUnstagingItemProcessor implements ItemProcessor<Map<String,?>, Lead> {

    /** Field names, in the order they appear in the delimited DATA value. */
    private static final String[] FIELD_NAMES = new String[] {"id","client.name","client.country","product.name","amount","query"};

    private SimpleJdbcTemplate jdbcTemplate;

    private DataFieldMaxValueIncrementer incrementer;

    /**
     * @param dataSource the data source holding the LEAD_INPUTS table
     */
    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new SimpleJdbcTemplate(dataSource);
    }

    /**
     * @param incrementer the source of ids for the leads that are created
     */
    public void setIncrementer(DataFieldMaxValueIncrementer incrementer) {
        this.incrementer = incrementer;
    }

    /**
     * @param item the staged row
     * @return the lead parsed from the row's DATA value, carrying a new id
     * @throws IncorrectResultSizeDataAccessException if the update does not
     * touch exactly one row
     */
    public Lead process(Map<String, ?> item) throws Exception {

        String line = (String) item.get("DATA");

        FieldSetMapper<Lead> leadMapper = new LeadFieldSetMapper();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(FIELD_NAMES);
        FieldSet fields = lineTokenizer.tokenize(line);

        // The id in the staged data is replaced by a newly generated one.
        long nextId = incrementer.nextLongValue();
        Lead result = leadMapper.mapFieldSet(fields);
        result.setId(nextId);

        int rows = jdbcTemplate.update("UPDATE LEAD_INPUTS set PROCESSED=true WHERE ID=?", item.get("ID"));
        if (rows != 1) {
            throw new IncorrectResultSizeDataAccessException(1);
        }

        return result;

    }

}
# Root logger: INFO and above, written to the console appender defined below.
log4j.rootCategory=INFO, stdout
# Console appender with a pattern layout (time, level, thread, shortened category, line number, message).
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
# Per-category levels: the sample itself, the integration channels, HTTP and remoting log at DEBUG.
log4j.category.org.springframework.batch.admin=DEBUG
log4j.category.org.springframework.integration.channel=DEBUG
log4j.category.org.springframework.integration.http=DEBUG
log4j.category.org.springframework.jdbc=INFO
log4j.category.org.springframework.remoting=DEBUG
log4j.category.org.springframework.transaction=INFO
# for debugging datasource initialization
#log4j.category.test.jdbc=DEBUG
#log4j.category.org.apache.activemq=DEBUG
Manifest-Version: 1.0
package org.springframework.batch.admin.sample.web;
import java.io.IOException;
import java.io.Writer;
import java.util.Date;
import java.util.Properties;
import org.springframework.batch.admin.sample.BroadcastService;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.util.DefaultPropertiesPersister;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class MasterController implements BeanFactoryAware {

    @Autowired
    private BroadcastService service;

    /** Cached cluster properties, resolved on the first request that needs them. */
    private Properties properties;

    private BeanFactory beanFactory;

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    /**
     * Exposes the id and peers of the local broadcast service as the "info"
     * model attribute.
     *
     * @return a snapshot of the broadcast service state
     */
    @RequestMapping("/cluster")
    public @ModelAttribute("info") BroadcastInfo status() {
        return new BroadcastInfo(service.getId(), service.getPeers());
    }

    /**
     * Writes the cluster properties to the response in properties-file format.
     *
     * @param writer the response writer
     * @throws IOException if the properties cannot be written
     */
    @RequestMapping("/cluster.properties")
    public void properties(Writer writer) throws IOException {
        Properties resolved = resolveProperties();
        new DefaultPropertiesPersister().store(resolved, writer, "# cluster properties "+new Date());
    }

    /**
     * Returns the cached properties, looking up the "clusterProperties" bean
     * the first time and falling back to an empty set if the lookup yields null.
     */
    private Properties resolveProperties() {
        if (properties != null) {
            return properties;
        }
        // Dependency lookup to prevent possible blocking when context is loaded
        properties = beanFactory.getBean("clusterProperties", Properties.class);
        if (properties == null) {
            properties = new Properties();
        }
        return properties;
    }
}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.web;
import static org.junit.Assert.*;
import java.io.StringWriter;
import java.io.Writer;
import org.junit.Test;
import org.springframework.batch.admin.sample.BroadcastService;
import org.springframework.batch.support.PropertiesConverter;
import org.springframework.test.util.ReflectionTestUtils;
/**
* @author Dave Syer
*
*/
public class MasterControllerTests {

    private MasterController controller = new MasterController();

    /** The status endpoint yields a model object once a broadcast service is injected. */
    @Test
    public void testStatus() {
        BroadcastService broadcastService = new BroadcastService();
        ReflectionTestUtils.setField(controller, "service", broadcastService);
        Object info = controller.status();
        assertNotNull(info);
    }

    /** Pre-populated properties are written out in key=value form. */
    @Test
    public void testProperties() throws Exception {
        ReflectionTestUtils.setField(controller, "properties", PropertiesConverter.stringToProperties("foo=bar"));
        Writer output = new StringWriter();
        controller.properties(output);
        String rendered = output.toString();
        assertTrue(rendered.contains("foo=bar"));
    }
}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.web;
import org.springframework.batch.admin.web.resources.BaseMenu;
import org.springframework.stereotype.Component;
@Component
public class MasterMenu extends BaseMenu {

    /** Target URL of the menu entry. */
    private static final String MENU_URL = "/cluster";

    /** Label shown for the menu entry. */
    private static final String MENU_LABEL = "Master";

    /** Order value handed to the base menu: one above the smallest possible int. */
    private static final int MENU_ORDER = Integer.MIN_VALUE + 1;

    /**
     * Registers the "Master" menu entry pointing at the cluster status page.
     */
    public MasterMenu() {
        super(MENU_URL, MENU_LABEL, MENU_ORDER);
    }
}
title=Cloud Ping Test
company.url=http://www.springsource.com
company.name=SpringSource
product.url=http://www.springsource.com
product.name=Cloud Ping
copyright=Copyright 2009-2010 SpringSource. All Rights Reserved.
company.contact.url=info@springsource.com
company.contact=Contact SpringSource
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:http="http://www.springframework.org/schema/integration/http"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http-2.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd">
<!-- Channels: requests arrive on "ping"; whatever the chains below let through goes to "pong". -->
<integration:channel id="ping" />
<integration:channel id="pong" />
<!-- Gateway proxy for the Pinger interface; calls are sent to the "ping" channel. -->
<integration:gateway id="pinger" service-interface="org.springframework.batch.admin.sample.Pinger"
default-request-channel="ping" />
<!-- One chain per service bean. Each invokes its service and forwards the result to "pong"
     only when the payload's message property equals 'ping'. -->
<chain xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="service-one" />
<filter expression="payload?.message=='ping'"/>
</chain>
<chain xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="service-two" />
<filter expression="payload?.message=='ping'"/>
</chain>
<chain xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="service-three" />
<filter expression="payload?.message=='ping'"/>
</chain>
<chain xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="service-four" />
<filter expression="payload?.message=='ping'"/>
</chain>
<!-- Feeds everything on "pong" back into "ping", so accepted messages reach the chains again. -->
<integration:bridge input-channel="pong" output-channel="ping"/>
<!-- Four BroadcastService beans; two to four inherit their definition from service-one. -->
<bean id="service-one" class="org.springframework.batch.admin.sample.BroadcastService"/>
<bean id="service-two" parent="service-one"/>
<bean id="service-three" parent="service-one"/>
<bean id="service-four" parent="service-one"/>
</beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class PeerIntegrationTests {

    private Log logger = LogFactory.getLog(getClass());

    @Autowired
    @Qualifier("service-one")
    private BroadcastService service;

    @Autowired
    private Pinger pinger;

    @Autowired
    private List<BroadcastService> services;

    /**
     * Sends a single ping on behalf of the first service and expects every
     * service in the context to end up with as many peers as there are services.
     */
    @Test
    public void testPeer() throws Exception {
        BroadcastRequest request = new BroadcastRequest("ping", service.getId(), Collections.<BroadcastId> emptySet());
        pinger.ping(request);

        assertEquals(services.size(), service.getPeers().size());

        for (BroadcastService candidate : services) {
            logState(candidate);
            String label = "Service: id=" + candidate.getId();
            assertEquals(label, services.size(), candidate.getPeers().size());
        }
    }

    /** Logs id, pinged flag and peers of the given service at debug level. */
    private void logState(BroadcastService candidate) {
        logger.debug(String.format("Service: id=%s, ready=%s, peers=%s", candidate.getId(), candidate.isPinged(),
                candidate.getPeers()));
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:http="http://www.springframework.org/schema/integration/http"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http-2.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd">
<!-- Record message history for the five named chains declared below. -->
<integration:message-history tracked-components="one,two,three,four,five"/>
<!-- "pings" receives gateway calls; "ping" carries the per-url requests; "pong" carries accepted replies. -->
<integration:channel id="pings" />
<integration:channel id="ping" />
<integration:channel id="pong" />
<!-- Gateway proxy for the Pingers interface; calls are sent to the "pings" channel. -->
<integration:gateway id="pingers" service-interface="org.springframework.batch.admin.sample.Pingers"
default-request-channel="pings" />
<!-- Fan-out: store the result of serviceOne.ping() in the "request" header, split on the "urls"
     header, copy each resulting payload into the "http_requestUrl" header, then replace the payload
     with the stored request before handing over to "ping". -->
<integration:chain input-channel="pings" output-channel="ping">
<integration:header-enricher>
<integration:header name="request" expression="@serviceOne.ping()" />
</integration:header-enricher>
<integration:splitter expression="headers['urls']"/>
<integration:header-enricher>
<integration:header name="http_requestUrl" expression="payload" />
</integration:header-enricher>
<integration:transformer expression="headers.request" />
</integration:chain>
<!-- One chain per service bean. Each invokes its service and forwards the result to "pong"
     only when the payload's message property equals 'ping'. -->
<chain id="one" xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="serviceOne" />
<filter expression="payload?.message=='ping'" />
</chain>
<chain id="two" xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="serviceTwo" />
<filter expression="payload?.message=='ping'" />
</chain>
<chain id="three" xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="serviceThree" />
<filter expression="payload?.message=='ping'" />
</chain>
<chain id="four" xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="serviceFour" />
<filter expression="payload?.message=='ping'" />
</chain>
<chain id="five" xmlns="http://www.springframework.org/schema/integration" input-channel="ping" output-channel="pong">
<service-activator ref="serviceFive" />
<filter expression="payload?.message=='ping'" />
</chain>
<!-- Feeds everything on "pong" back into "ping", so accepted messages reach the chains again. -->
<integration:bridge input-channel="pong" output-channel="ping" />
<!-- Five BroadcastService beans; two to five inherit their definition from serviceOne. -->
<bean id="serviceOne" class="org.springframework.batch.admin.sample.BroadcastService" />
<bean id="serviceTwo" parent="serviceOne" />
<bean id="serviceThree" parent="serviceOne" />
<bean id="serviceFour" parent="serviceOne" />
<bean id="serviceFive" parent="serviceOne" />
</beans>
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class PeersIntegrationTests {

    private Log logger = LogFactory.getLog(getClass());

    @Autowired
    @Qualifier("serviceOne")
    private BroadcastService service;

    @Autowired
    private Pingers pingers;

    @Autowired
    private List<BroadcastService> services;

    @Test
    public void testTwoPeers() throws Exception {
        doTest("serviceOne", "serviceTwo");
    }

    @Test
    public void testThreePeers() throws Exception {
        doTest("serviceOne", "serviceTwo", "serviceThree");
    }

    @Test
    public void testFourPeers() throws Exception {
        doTest("serviceOne", "serviceTwo", "serviceThree", "serviceFour");
    }

    @Test
    public void testFivePeers() throws Exception {
        doTest("serviceOne", "serviceTwo", "serviceThree", "serviceFour", "serviceFive");
    }

    /**
     * Pings the named services and verifies that each of them was pinged and
     * reports as many peers as there are services in the context. The targeted
     * services are reset afterwards so the next test starts clean.
     *
     * @param input the names of the services to address
     */
    private void doTest(String... input) {
        List<String> urls = Arrays.asList(input);
        pingers.ping("ping", urls);
        assertEquals(services.size(), service.getPeers().size());

        // First pass: log every service, check the peer count of the targeted ones.
        int matched = 0;
        for (BroadcastService candidate : services) {
            logger.debug(String.format("Service: id=%s, ready=%s, peers=%s", candidate.getId(), candidate.isPinged(),
                    candidate.getPeers()));
            if (isTargeted(urls, candidate)) {
                matched++;
                assertEquals("Service: id=" + candidate.getId(), services.size(), candidate.getPeers().size());
            }
        }
        assertEquals(urls.size(), matched);

        // Second pass: every targeted service must have been pinged; reset it for the next test.
        for (BroadcastService candidate : services) {
            if (isTargeted(urls, candidate)) {
                assertTrue("Service: id=" + candidate.getId(), candidate.isPinged());
                candidate.reset();
            }
        }
    }

    /** Whether the name of the given service is among the addressed names. */
    private boolean isTargeted(List<String> urls, BroadcastService candidate) {
        return urls.contains(candidate.getId().getName());
    }
}
package org.springframework.batch.admin.sample;
import org.springframework.integration.annotation.Gateway;
/**
 * Messaging gateway contract for sending a single broadcast request.
 */
public interface Pinger {

    /**
     * Sends the given request through the gateway.
     *
     * @param broadcastRequest the request to send
     */
    @Gateway
    void ping(BroadcastRequest broadcastRequest);

}
package org.springframework.batch.admin.sample;
import java.util.Collection;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.Header;
/**
 * Messaging gateway contract for sending one message to several targets.
 */
public interface Pingers {

    /**
     * Sends the message through the gateway, carrying the targets in the
     * "urls" message header.
     *
     * @param message the message payload
     * @param urls the targets, passed as the "urls" header
     */
    @Gateway
    void ping(String message, @Header("urls") Collection<String> urls);

}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-sample</artifactId>
<description>A sample web application (WAR project) for Spring Batch Admin console.</description>
<parent>
<artifactId>spring-batch-admin-parent</artifactId>
<groupId>org.springframework.batch</groupId>
<version>1.2.1.BUILD-SNAPSHOT</version>
</parent>
<properties>
<spring.integration.version>2.0.3.RELEASE</spring.integration.version>
<spring.framework.version>3.0.5.RELEASE</spring.framework.version>
<!-- Override this with -Denvironment=XXXX to change database type -->
<environment>hsql</environment>
</properties>
<packaging>war</packaging>
<name>cloud-sample</name>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-metadata-job</artifactId>
<version>2.0.0.CI-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>${spring.integration.version}</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-admin-manager</artifactId>
<!-- Same version as this module; project.version replaces the deprecated pom.version expression. -->
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<configuration>
<contextPath>/cloud-sample</contextPath>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
<property>
<name>ENVIRONMENT</name>
<value>${environment}</value>
</property>
</systemProperties>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- http://maven.apache.org/plugins/maven-compiler-plugin/ -->
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Maven Milestone Repository</name>
<url>http://s3.amazonaws.com/maven.springframework.org/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.lead;
/**
 * A named product. The name defaults to "None" when none is supplied.
 */
public class Product {

    /** Name used by the no-argument constructor. */
    private static final String DEFAULT_NAME = "None";

    private String name;

    /**
     * Creates a product with the default name.
     */
    public Product() {
        this(DEFAULT_NAME);
    }

    /**
     * Creates a product with the given name.
     *
     * @param name the product name
     */
    public Product(String name) {
        this.name = name;
    }

    /**
     * @return the product name
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the new product name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return a description of the form {@code [SimpleClassName: name=...]}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append('[').append(getClass().getSimpleName());
        text.append(": name=").append(name).append(']');
        return text.toString();
    }
}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactory;
import org.springframework.ui.freemarker.FreeMarkerTemplateUtils;
import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
/**
* @author Dave Syer
*
*/
public class PropertiesLoader implements InitializingBean {

    /** Set once the FreeMarker configuration has been created successfully. */
    private volatile boolean initialised = false;

    private final Object lock = new Object();

    private Configuration configuration;

    /**
     * Creates the FreeMarker configuration eagerly when used as a Spring bean.
     */
    public void afterPropertiesSet() {
        if (!initialised) {
            init();
        }
    }

    /**
     * Creates the FreeMarker configuration at most once. Templates are
     * resolved from <code>classpath:/template/properties</code>.
     *
     * @throws IllegalStateException if the configuration cannot be created
     */
    private void init() {
        synchronized (lock) {
            if (!initialised) {
                FreeMarkerConfigurationFactory factory = new FreeMarkerConfigurationFactory();
                factory.setPreferFileSystemAccess(false);
                factory.setTemplateLoaderPath("classpath:/template/properties");
                try {
                    configuration = factory.createConfiguration();
                }
                catch (Exception e) {
                    throw new IllegalStateException("Could not create FreeMarker Configuration", e);
                }
                // Latch only after success: without this every call re-created the
                // configuration, and a failed attempt can still be retried later.
                initialised = true;
            }
        }
    }

    /**
     * Renders the named template with the given model and parses the output
     * as a properties file.
     *
     * @param locator the template name; ".ftl" is appended when missing
     * @param model the values made available to the template
     * @return the properties read from the rendered template
     * @throws IllegalStateException if the template cannot be loaded or
     * processed
     */
    public Properties getProperties(String locator, Map<String, Object> model) {
        if (!initialised) {
            init();
        }
        if (!locator.endsWith(".ftl")) {
            locator = locator + ".ftl";
        }
        Properties properties = new Properties();
        try {
            Template template = configuration.getTemplate(locator);
            properties.load(new StringReader(FreeMarkerTemplateUtils.processTemplateIntoString(template, model)));
        }
        catch (IOException e) {
            throw new IllegalStateException("Could not load template: " + locator, e);
        }
        catch (TemplateException e) {
            throw new IllegalStateException("Could not load template: " + locator, e);
        }
        return properties;
    }
}
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Properties;
import org.junit.Test;
/**
* @author Dave Syer
*
*/
public class PropertiesLoaderTests {

    /**
     * Rendering the "test" template with a model value for "foo" yields a
     * property of the same name carrying that value.
     */
    @Test
    public void testGetProperties() throws Exception {
        HashMap<String, Object> templateModel = new HashMap<String, Object>();
        templateModel.put("foo", "bar");

        PropertiesLoader loader = new PropertiesLoader();
        Properties rendered = loader.getProperties("test", templateModel);

        String actual = rendered.getProperty("foo");
        assertEquals("bar", actual);
    }
}
/*
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.scope;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.config.BeanDefinitionVisitor;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.Scope;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.expression.BeanFactoryAccessor;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.ParseException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;
/**
* <p>
* A Scope implementation that allows for beans to be refreshed dynamically at
* runtime (see {@link #refresh(String)} and {@link #refreshAll()}). If a bean
* is refreshed then the next time the bean is accessed (i.e. a method is
* executed) a new instance is created. All lifecycle methods are applied to the
* bean instances, so any destruction callbacks that were registered in the bean
* factory are called when it is refreshed, and then the initialization
* callbacks are invoked as normal when the new instance is created. A new bean
* instance is created from the original bean definition, so any externalized
* content (property placeholders or expressions in string literals) is
* re-evaluated when it is created.
* </p>
*
* <p>
* Note that all beans in this scope are <em>only</em> initialized when first
* accessed, so the scope forces lazy initialization semantics. The
* implementation involves creating a proxy for every bean in the scope, so
* there is a flag {@link #setProxyTargetClass(boolean) proxyTargetClass} which
* controls the proxy creation, defaulting to JDK dynamic proxies and therefore
* only exposing the interfaces implemented by a bean. If callers need access to
* other methods then the flag needs to be set (and CGLib present on the
* classpath). Because this scope automatically proxies all its beans, there is
* no need to add <code>&lt;aop:scoped-proxy/&gt;</code> to any bean definitions.
* </p>
*
* <p>
* The scoped proxy approach adopted here has a side benefit that bean instances
* are automatically {@link Serializable}, and can be sent across the wire as
* long as the receiver has an identical application context on the other side.
* To ensure that the two contexts agree that they are identical they have to
* have the same serialization id. One will be generated automatically by
* default from the bean names, so two contexts with the same bean names are by
* default able to exchange beans by name. If you need to override the default
* id then provide an explicit {@link #setId(String) id} when the Scope is
* declared.
* </p>
*
* @author Dave Syer
*
* @since 3.1
*
*/
@ManagedResource
public class RefreshScope implements Scope, BeanFactoryPostProcessor, DisposableBean {
private static final Log logger = LogFactory.getLog(RefreshScope.class);
private ConcurrentMap<String, BeanCallbackWrapper> cache = new ConcurrentHashMap<String, BeanCallbackWrapper>();
private String name = "refresh";
private boolean proxyTargetClass = false;
private ConfigurableListableBeanFactory beanFactory;
private StandardEvaluationContext evaluationContext;
private String id;
/**
* Manual override for the serialization id that will be used to identify
* the bean factory. When no id is set (the field stays <code>null</code>),
* an id is generated from the bean definition names of the bean factory
* while the factory is post-processed.
*
* @param id the serialization id to use, or <code>null</code> to have one
* generated
*/
public void setId(String id) {
this.id = id;
}
/**
* The name of this scope. Default "refresh". This is the name under which
* the scope is registered with the bean factory and the value compared
* against each bean definition's scope.
*
* @param name the scope name to set
*/
public void setName(String name) {
this.name = name;
}
/**
* Flag to indicate that proxies should be created for the concrete type,
* not just the interfaces, of the scoped beans. Defaults to
* <code>false</code>.
*
* @param proxyTargetClass the flag value to set
*/
public void setProxyTargetClass(boolean proxyTargetClass) {
this.proxyTargetClass = proxyTargetClass;
}
/**
* Disposal callback of this scope object: delegates to
* {@link #refreshAll()}.
*
* @throws Exception declared by the callback signature; whatever
* {@link #refreshAll()} throws is propagated
*/
public void destroy() throws Exception {
refreshAll();
}
/**
 * Returns the bean registered under the given name, caching a wrapper for
 * it on first access. If another wrapper is already cached under that name
 * it wins and the freshly created one is discarded.
 *
 * @param name the bean name
 * @param objectFactory the factory handed to a newly created wrapper
 * @return the bean exposed by the cached wrapper
 */
public Object get(String name, ObjectFactory<?> objectFactory) {
    BeanCallbackWrapper candidate = new BeanCallbackWrapper(name, objectFactory, proxyTargetClass);
    BeanCallbackWrapper existing = cache.putIfAbsent(name, candidate);
    if (existing != null) {
        return existing.getBean();
    }
    return candidate.getBean();
}
/**
* @return the name of this scope ("refresh" unless changed through
* {@link #setName(String)}), which serves as its conversation id
*/
public String getConversationId() {
return name;
}
/**
 * Attaches a destruction callback to the cached wrapper of the named bean.
 * Does nothing when no wrapper is cached under that name.
 *
 * @param name the bean name
 * @param callback the callback to store on the wrapper
 */
public void registerDestructionCallback(String name, Runnable callback) {
    BeanCallbackWrapper wrapper = cache.get(name);
    if (wrapper != null) {
        wrapper.setCallback(callback);
    }
}
/**
 * Removes the named bean from this scope without running its destruction
 * callback.
 *
 * @param name the bean name
 * @return the bean exposed by the removed wrapper, or <code>null</code> if
 * nothing was cached under that name or the entry was replaced concurrently
 */
public Object remove(String name) {
    BeanCallbackWrapper value = cache.get(name);
    if (value == null) {
        return null;
    }
    // ConcurrentMap.remove(key, value) only reports whether the mapping was
    // removed; the Scope contract asks for the removed object itself, so hand
    // back the bean rather than the autoboxed boolean.
    // NOTE(review): getBean() is assumed to return the wrapped instance as it
    // does in get() - confirm it is acceptable for it to create one lazily here.
    return cache.remove(name, value) ? value.getBean() : null;
}
/**
 * Evaluates the key as an expression against the bean factory.
 *
 * @param key the expression to evaluate
 * @return the evaluation result, or <code>null</code> if the key contains
 * no text
 * @throws IllegalArgumentException if the key cannot be parsed
 */
public Object resolveContextualObject(String key) {
    Expression expression = parseExpression(key);
    if (expression == null) {
        // parseExpression yields null for blank input; report "nothing found"
        // instead of failing with a NullPointerException.
        return null;
    }
    return expression.getValue(evaluationContext, beanFactory);
}
/**
 * Parses the input as a SpEL expression.
 *
 * @param input the expression text
 * @return the parsed expression, or <code>null</code> if the input has no
 * text
 * @throws IllegalArgumentException if the input cannot be parsed
 */
private Expression parseExpression(String input) {
    if (!StringUtils.hasText(input)) {
        return null;
    }
    ExpressionParser spelParser = new SpelExpressionParser();
    try {
        return spelParser.parseExpression(input);
    }
    catch (ParseException e) {
        throw new IllegalArgumentException("Cannot parse expression: " + input, e);
    }
}
/**
 * Evicts the cached wrapper of the named bean and destroys it, if one is
 * cached.
 *
 * @param name the bean name
 */
@ManagedOperation(description = "Dispose of the current instance of bean name provided and force a refresh on next method execution.")
public void refresh(String name) {
    logger.debug("Refreshing bean: " + name);
    BeanCallbackWrapper evicted = cache.remove(name);
    if (evicted == null) {
        return;
    }
    evicted.destroy();
}
/**
 * Empties the cache and destroys every wrapper that was in it. All wrappers
 * are attempted even if some fail; the first runtime failure is rethrown
 * once the rest have been processed.
 */
@ManagedOperation(description = "Dispose of the current instance of all beans in this scope and force a refresh on next method execution.")
public void refreshAll() {
    logger.debug("Refreshing all beans");
    List<Throwable> failures = new ArrayList<Throwable>();
    // Snapshot before clearing so the wrappers can still be destroyed afterwards.
    Collection<BeanCallbackWrapper> snapshot = new HashSet<BeanCallbackWrapper>(cache.values());
    cache.clear();
    for (BeanCallbackWrapper evicted : snapshot) {
        try {
            evicted.destroy();
        }
        catch (RuntimeException e) {
            failures.add(e);
        }
    }
    if (failures.isEmpty()) {
        return;
    }
    throw wrapIfNecessary(failures.get(0));
}
/**
 * Registers this scope with the bean factory, sets up the serialization id
 * and evaluation context, and then visits every bean definition, replacing
 * those declared in this scope with a scoped proxy.
 *
 * @param beanFactory the bean factory being post-processed
 * @throws IllegalStateException if the bean factory is not a
 * {@link BeanDefinitionRegistry}
 */
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    beanFactory.registerScope(name, this);
    setSerializationId(beanFactory);
    this.beanFactory = beanFactory;
    evaluationContext = new StandardEvaluationContext();
    evaluationContext.addPropertyAccessor(new BeanFactoryAccessor());
    Assert.state(beanFactory instanceof BeanDefinitionRegistry,
            "BeanFactory was not a BeanDefinitionRegistry, so RefreshScope cannot be used.");
    BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
    String[] beanNames = beanFactory.getBeanDefinitionNames();
    for (int i = 0; i < beanNames.length; i++) {
        String beanName = beanNames[i];
        BeanDefinition definition = beanFactory.getBeanDefinition(beanName);
        // Replace this or any of its inner beans with scoped proxy if it
        // has this scope
        boolean inThisScope = name.equals(definition.getScope());
        new Scopifier(registry, name, proxyTargetClass, inThisScope).visitBeanDefinition(definition);
        if (inThisScope) {
            createScopedProxy(beanName, definition, registry, proxyTargetClass);
        }
    }
}
/**
 * If the bean factory is a DefaultListableBeanFactory then it can serialize
 * scoped beans and deserialize them in another context (even in another
 * JVM), as long as the ids of the bean factories match. This method sets up
 * the serialization id to be either the id provided to the scope instance,
 * or if that is null, a name-based UUID computed from the list of bean
 * definition names. The names are encoded as UTF-8 so that the generated id
 * does not depend on the default charset of the JVM computing it.
 *
 * @param beanFactory the bean factory to configure
 */
private void setSerializationId(ConfigurableListableBeanFactory beanFactory) {
    if (beanFactory instanceof DefaultListableBeanFactory) {
        String id = this.id;
        if (id == null) {
            String names = Arrays.asList(beanFactory.getBeanDefinitionNames()).toString();
            logger.debug("Generating bean factory id from names: " + names);
            // Fixed charset: the platform default may differ between the JVMs
            // that are supposed to agree on this id.
            byte[] encodedNames = names.getBytes(java.nio.charset.Charset.forName("UTF-8"));
            id = UUID.nameUUIDFromBytes(encodedNames).toString();
        }
        logger.info("BeanFactory id=" + id);
        ((DefaultListableBeanFactory) beanFactory).setSerializationId(id);
    }
    else {
        logger.warn("BeanFactory was not a DefaultListableBeanFactory, so RefreshScope beans "
                + "cannot be serialized reliably and passed to a remote JVM.");
    }
}
/**
 * Converts an arbitrary throwable into a runtime exception that the caller
 * can throw. Runtime exceptions are returned unchanged, errors are rethrown
 * immediately from this method, and anything else is wrapped in an
 * IllegalStateException carrying the original as its cause.
 *
 * @param throwable the failure to convert
 * @return a runtime exception representing the failure
 */
private static RuntimeException wrapIfNecessary(Throwable throwable) {
    if (throwable instanceof Error) {
        // Errors are never wrapped: propagate them as they are
        throw (Error) throwable;
    }
    return (throwable instanceof RuntimeException)
            ? (RuntimeException) throwable
            : new IllegalStateException(throwable);
}
/**
 * Creates a scoped proxy definition for the given bean definition and
 * registers it in the registry under the original bean name.
 *
 * @param beanName the name of the bean to proxy
 * @param definition the bean definition to proxy
 * @param registry the registry that receives the proxy definition
 * @param proxyTargetClass flag passed through to the scoped proxy creation
 * @return the holder of the proxy bean definition
 */
private static BeanDefinitionHolder createScopedProxy(String beanName, BeanDefinition definition,
        BeanDefinitionRegistry registry, boolean proxyTargetClass) {
    BeanDefinitionHolder target = new BeanDefinitionHolder(definition, beanName);
    BeanDefinitionHolder proxy = ScopedProxyUtils.createScopedProxy(target, registry, proxyTargetClass);
    registry.registerBeanDefinition(beanName, proxy.getBeanDefinition());
    return proxy;
}
/**
 * Bean definition visitor that looks at the values nested inside a bean
 * definition and swaps nested bean definitions carrying the target scope for
 * a scoped proxy, provided the enclosing definition is not itself in that
 * scope.
 *
 * @author Dave Syer
 *
 */
private static class Scopifier extends BeanDefinitionVisitor {

    private final boolean proxyTargetClass;

    private final BeanDefinitionRegistry registry;

    private final String scope;

    private final boolean scoped;

    /**
     * @param registry the registry used to generate names and register proxies
     * @param scope the name of the scope to look for
     * @param proxyTargetClass flag passed through to the scoped proxy creation
     * @param scoped whether the enclosing bean definition already has the scope
     */
    public Scopifier(BeanDefinitionRegistry registry, String scope, boolean proxyTargetClass, boolean scoped) {
        // String values are passed through untouched
        super(new StringValueResolver() {
            public String resolveStringValue(String value) {
                return value;
            }
        });
        this.scope = scope;
        this.scoped = scoped;
        this.registry = registry;
        this.proxyTargetClass = proxyTargetClass;
    }

    /**
     * Replaces a nested bean definition (bare or wrapped in a holder) with a
     * scoped proxy when it has the target scope and the enclosing definition
     * does not. All other values are delegated to the superclass.
     */
    @Override
    protected Object resolveValue(Object value) {
        BeanDefinition nested = null;
        String nestedName = null;
        if (value instanceof BeanDefinition) {
            nested = (BeanDefinition) value;
            nestedName = BeanDefinitionReaderUtils.generateBeanName(nested, registry);
        }
        else if (value instanceof BeanDefinitionHolder) {
            BeanDefinitionHolder holder = (BeanDefinitionHolder) value;
            nested = holder.getBeanDefinition();
            nestedName = holder.getBeanName();
        }
        boolean needsProxy = nested != null && scope.equals(nested.getScope()) && !scoped;
        if (needsProxy) {
            // Stop here: definitions nested inside the proxied one are not
            // analysed
            return createScopedProxy(nestedName, nested, registry, proxyTargetClass);
        }
        // Everything else (including deeper inner definitions) is handled
        // recursively by the superclass
        return super.resolveValue(value);
    }

}
/**
 * Wrapper for a bean instance and any destruction callback (DisposableBean
 * etc.) that is registered for it. If the bean is disposable, the wrapper
 * also guards access to the bean: a read lock (allowing concurrent access)
 * is taken for all method executions except the destruction callback, which
 * uses a write lock.
 * <p>
 * The lock is only created inside {@link #getBean()}, so {@link #destroy()}
 * tolerates a wrapper whose callback was registered but whose lock does not
 * exist yet: in that case the callback is simply run without locking.
 * <p>
 * Two wrappers are equal when their names are equal.
 *
 * @author Dave Syer
 *
 */
private static class BeanCallbackWrapper {

    private Object bean;

    private Runnable callback;

    private ReadWriteLock lock;

    private final String name;

    private final ObjectFactory<?> objectFactory;

    private final boolean proxyTargetClass;

    /**
     * @param name the bean name, used for equality and hashing
     * @param objectFactory the factory that creates the bean on demand
     * @param proxyTargetClass flag applied to the locking proxy, if one is created
     */
    public BeanCallbackWrapper(String name, ObjectFactory<?> objectFactory, boolean proxyTargetClass) {
        this.name = name;
        this.objectFactory = objectFactory;
        this.proxyTargetClass = proxyTargetClass;
    }

    /**
     * @param callback the destruction callback to run in {@link #destroy()}
     */
    public void setCallback(Runnable callback) {
        this.callback = callback;
    }

    /**
     * Returns the bean, creating it from the object factory on the first
     * call. If a destruction callback is present once the bean has been
     * created, the bean is wrapped in a proxy that holds the read lock during
     * each method execution.
     *
     * @return the bean, or a locking proxy for it
     */
    public Object getBean() {
        if (bean == null) {
            bean = objectFactory.getObject();
            if (callback != null) {
                lock = new ReentrantReadWriteLock();
                bean = getDisposalLockProxy(bean, lock.readLock());
            }
        }
        return bean;
    }

    /**
     * Runs the destruction callback, if there is one. When a lock exists the
     * write lock is held while the callback runs; when no lock has been
     * created yet there is no locking proxy to exclude, so the callback runs
     * unguarded instead of failing with a NullPointerException.
     *
     * @throws RuntimeException if the callback fails (non-runtime failures
     * are converted by wrapIfNecessary)
     */
    public void destroy() {
        if (callback == null) {
            return;
        }
        ReadWriteLock guard = this.lock;
        Lock writeLock = (guard == null) ? null : guard.writeLock();
        if (writeLock != null) {
            writeLock.lock();
        }
        try {
            callback.run();
        }
        catch (Throwable e) {
            throw wrapIfNecessary(e);
        }
        finally {
            if (writeLock != null) {
                writeLock.unlock();
            }
        }
    }

    /**
     * Apply a lock (preferably a read lock allowing multiple concurrent
     * access) to the bean. Callers should replace the bean input with the
     * output.
     *
     * @param bean the bean to lock
     * @param lock the lock to apply
     * @return a proxy that locks while its methods are executed
     */
    private Object getDisposalLockProxy(Object bean, final Lock lock) {
        ProxyFactory factory = new ProxyFactory(bean);
        factory.setProxyTargetClass(proxyTargetClass);
        factory.addAdvice(new MethodInterceptor() {
            public Object invoke(MethodInvocation invocation) throws Throwable {
                lock.lock();
                try {
                    return invocation.proceed();
                }
                finally {
                    lock.unlock();
                }
            }
        });
        return factory.getProxy();
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        BeanCallbackWrapper other = (BeanCallbackWrapper) obj;
        if (name == null) {
            return other.name == null;
        }
        return name.equals(other.name);
    }

}
}
package org.springframework.batch.admin.sample.relay;
/**
 * A handler that accepts a message of one type and answers with a result of
 * another type.
 *
 * @param <T> the type of message accepted
 * @param <S> the type of result returned
 */
public interface Relay<T, S> {
/**
 * Accept a message and produce a result for it.
 *
 * @param message the message to accept
 * @return the result of accepting the message
 */
S accept(T message);
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<integration:service-activator input-channel="pinged" expression="@refreshScope.refreshAll()"/>
<bean id="refreshScope" class="org.springframework.batch.admin.sample.scope.RefreshScope" />
<bean id="clusterProperties" class="org.springframework.batch.admin.sample.BroadcastPropertiesFactoryBean">
<property name="broadcastService" ref="broadcastService" />
<property name="template" value="cluster" />
<property name="servletPath" value="#{resourceService.servletPath}" />
</bean>
<bean id="fileService" class="org.springframework.batch.admin.sample.ClusterFileService" scope="refresh">
<property name="localService" ref="localFileService" />
<property name="services">
<bean class="org.springframework.batch.admin.sample.proxy.HttpInvokerProxiesFactoryBean">
<property name="serviceInterface" value="org.springframework.batch.admin.service.FileService" />
<property name="serviceUrls" value="#{clusterProperties['file.service.urls']}" />
<property name="localPort" value="${local.port:8080}" />
<property name="localHostName" value="#{broadcastService.id.host}" />
<property name="localService" ref="localFileService" />
</bean>
</property>
</bean>
<bean id="jobRequestHandler" class="org.springframework.batch.admin.sample.relay.CompositeRelay" scope="refresh">
<property name="delegates">
<bean class="org.springframework.batch.admin.sample.proxy.HttpInvokerProxiesFactoryBean">
<property name="serviceInterface" value="org.springframework.batch.admin.sample.relay.Relay" />
<property name="serviceUrls" value="#{clusterProperties['job.request.handler.service.urls']}" />
<property name="localPort" value="${local.port:8080}" />
<property name="localHostName" value="#{broadcastService.id.host}" />
</bean>
</property>
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<util:properties id="clusterProperties">
<prop key="file.service.urls">${file.service.urls}</prop>
<prop key="job.request.handler.service.urls">${job.request.handler.service.urls}</prop>
</util:properties>
<bean id="fileService" class="org.springframework.batch.admin.sample.ClusterFileService">
<property name="localService" ref="localFileService" />
<property name="services">
<bean class="org.springframework.batch.admin.sample.proxy.HttpInvokerProxiesFactoryBean">
<property name="serviceInterface" value="org.springframework.batch.admin.service.FileService" />
<property name="serviceUrls" value="${file.service.urls}" />
<property name="localPort" value="${local.port:8080}" />
<property name="localHostName" value="#{broadcastService.id.host}" />
<property name="localService" ref="localFileService" />
</bean>
</property>
</bean>
<bean id="jobRequestHandler" class="org.springframework.batch.admin.sample.relay.CompositeRelay">
<property name="delegates">
<bean class="org.springframework.batch.admin.sample.proxy.HttpInvokerProxiesFactoryBean">
<property name="serviceInterface" value="org.springframework.batch.admin.sample.relay.Relay" />
<property name="serviceUrls" value="${job.request.handler.service.urls}" />
<property name="localPort" value="${local.port:8080}" />
<property name="localHostName" value="#{broadcastService.id.host}" />
</bean>
</property>
</bean>
</beans>
/*
* Copyright 2006-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import java.util.HashMap;
import java.util.Map;
/**
 * Rewrites the request and response URL headers of a message. The input
 * headers are never modified: a copy is enriched and returned.
 *
 * @author Dave Syer
 *
 */
public class RequestResponseHeaderEnricher {

    /**
     * Returns a copy of the headers. If the copy contains
     * <code>http_requestUrl</code> then:
     * <ul>
     * <li>when <code>x-http_responseurl</code> is also present, its value
     * becomes the new <code>http_requestUrl</code> and the old request URL
     * becomes <code>http_responseUrl</code>;</li>
     * <li>otherwise <code>http_requestUrl</code> is kept and
     * <code>http_responseUrl</code> is set to <code>null</code>;</li>
     * <li>in both cases <code>x-http_responseurl</code> is removed.</li>
     * </ul>
     * Headers without <code>http_requestUrl</code> are copied unchanged.
     *
     * @param headers the incoming headers
     * @return a new map with the enriched headers
     */
    public Map<String,?> enrich(Map<String,Object> headers) {
        Map<String,Object> enriched = new HashMap<String, Object>(headers);
        if (!enriched.containsKey("http_requestUrl")) {
            return enriched;
        }
        String originalRequest = (String) enriched.get("http_requestUrl");
        boolean swap = enriched.containsKey("x-http_responseurl");
        String request = swap ? (String) enriched.get("x-http_responseurl") : originalRequest;
        String response = swap ? originalRequest : null;
        enriched.put("http_requestUrl", request);
        enriched.put("http_responseUrl", response);
        enriched.remove("x-http_responseurl");
        return enriched;
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<bean name="redirect:status" class="org.springframework.web.servlet.view.RedirectView">
<property name="url" value="#{resourceService.servletPath}/cluster" />
<property name="contextRelative" value="true" />
</bean>
<bean name="redirect:status.json" class="org.springframework.web.servlet.view.RedirectView">
<property name="url" value="#{resourceService.servletPath}/cluster.json" />
<property name="contextRelative" value="true" />
</bean>
<bean class="org.springframework.batch.admin.sample.web.MasterMenu" parent="baseMenu" />
<bean class="org.springframework.batch.admin.sample.web.MasterController" />
<bean id="cluster" parent="standard">
<property name="attributes">
<props merge="true">
<prop key="body">/sample/html/status.ftl</prop>
</props>
</property>
</bean>
<bean id="cluster.json" parent="standard.json">
<property name="attributes">
<props merge="true">
<prop key="body">/sample/json/status.ftl</prop>
</props>
</property>
</bean>
<bean class="org.springframework.batch.admin.sample.web.EnvironmentController" />
<bean name="env.json" parent="standard.json">
<property name="attributes">
<props merge="true">
<prop key="body">/sample/json/env.ftl</prop>
</props>
</property>
</bean>
</beans>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.lead.support;
import org.springframework.batch.admin.sample.lead.Client;
import org.springframework.batch.admin.sample.lead.Lead;
import org.springframework.batch.admin.sample.lead.Product;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
/**
 * A dummy reader for testing purposes. It hands out a fixed number of
 * generated leads and then signals the end of the input by returning
 * <code>null</code>.
 *
 * @author Dave Syer
 *
 */
public class SimpleReader implements ItemReader<Lead> {

    private int maxCount = 10;

    private int count=0;

    /**
     * @return the next generated lead, or <code>null</code> once
     * <code>maxCount</code> leads have been returned
     */
    public Lead read() throws Exception, UnexpectedInputException, ParseException {
        if (count < maxCount) {
            count++;
            Client client = new Client("Client" + count, "UK");
            return new Lead(123L + count, client, new Product("Foo"));
        }
        return null;
    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.lead.support;
import java.util.List;
import org.springframework.batch.admin.sample.lead.Lead;
import org.springframework.batch.item.ItemWriter;
/**
 * A writer for a list of leads that records what it was given in an
 * in-memory summary.
 *
 * @author Dave Syer
 *
 */
public class SimpleWriter implements ItemWriter<Lead> {

    private StringBuffer result = new StringBuffer();

    /**
     * Appends an entry of the form <code>clientName(salesRep)</code> to the
     * summary for every lead that has a sales rep other than "Nobody". Leads
     * without a sales rep are ignored.
     *
     * @see ItemWriter#write(List)
     */
    public void write(List<? extends Lead> items) throws Exception {
        for (Lead lead : items) {
            String salesRep = lead.getSalesRep();
            if (salesRep == null || salesRep.equals("Nobody")) {
                continue;
            }
            String entry = lead.getClient().getName() + "(" + salesRep + ")";
            result.append(entry);
        }
    }

    /**
     * For testing purposes, expose the aggregate summary of all activity.
     *
     * @return a summary of all the calls to write
     */
    public String getResult() {
        return result.toString();
    }

}
<?xml version="1.0" encoding="ISO-8859-1"?>
<project name="Spring Batch Admin: ${project.name}">
<bannerLeft>
<name>Spring Batch Admin: ${project.name}</name>
</bannerLeft>
<body>
<links>
<item name="${project.name}" href="index.html"/>
</links>
<menu name="Spring Batch Admin: ${project.name}">
<item name="${project.name}" href="index.html"/>
</menu>
<menu ref="reports"/>
</body>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<job id="sql" xmlns="http://www.springframework.org/schema/batch">
<step id="sql-step">
<tasklet ref="tasklet"/>
</step>
</job>
<bean id="tasklet" class="org.springframework.batch.admin.sample.job.JdbcTasklet" scope="step">
<property name="dataSource" ref="dataSource" />
<property name="sql" value="#{jobParameters['sql']}" />
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<import resource="classpath*:/META-INF/spring/batch/bootstrap/resources/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/bootstrap/manager/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/jobs/sql-context.xml" />
<integration:channel id="job-requests" />
<bean id="jobLoader" class="java.util.Date" />
<bean id="jobLauncherTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
</beans>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.job;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import javax.sql.DataSource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Integration tests that launch the "sql" job with different SQL statements
 * passed in through the <code>sql</code> job parameter, and then check the
 * job status, the number of job instances and the exit description of the
 * step.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext(classMode=ClassMode.AFTER_CLASS)
public class SqlJobIntegrationTests {

    private SimpleJdbcTemplate jdbcTemplate;

    @Autowired
    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new SimpleJdbcTemplate(dataSource);
    }

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private Job job;

    @Test
    public void testSelect() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().addString("sql", "SELECT * FROM BATCH_STEP_EXECUTION")
                .toJobParameters();
        StepExecution stepExecution = launchAndVerify(jobParameters);
        assertDescriptionContains(stepExecution, "STEP_EXECUTION_ID=" + stepExecution.getId());
    }

    @Test
    public void testSelectWithBinding() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().addString("sql",
                "SELECT * FROM BATCH_STEP_EXECUTION where STEP_EXECUTION_ID=:stepExecution.id").toJobParameters();
        StepExecution stepExecution = launchAndVerify(jobParameters);
        assertDescriptionContains(stepExecution, "STEP_EXECUTION_ID=" + stepExecution.getId());
    }

    @Test
    public void testUpdate() throws Exception {
        jdbcTemplate.update("INSERT INTO LEAD_INPUTS (ID,PROCESSED) values (0,false)");
        try {
            JobParameters jobParameters = new JobParametersBuilder().addString("sql",
                    "UPDATE LEAD_INPUTS set PROCESSED=true where PROCESSED=:jobParameters[flag]")
                    .addString("flag", "false").toJobParameters();
            StepExecution stepExecution = launchAndVerify(jobParameters);
            assertDescriptionContains(stepExecution, "Updated: 1");
        }
        finally {
            // Always remove the fixture row, even when an assertion above
            // fails, so that it cannot leak into other tests
            jdbcTemplate.update("DELETE FROM LEAD_INPUTS where ID=0");
        }
    }

    /**
     * Launches the job, asserts that it completed and that exactly one new
     * "sql" job instance was created.
     *
     * @param jobParameters the parameters to launch the job with
     * @return the first step execution of the job execution
     */
    private StepExecution launchAndVerify(JobParameters jobParameters) throws Exception {
        int before = jobExplorer.getJobInstances("sql", 0, 1000).size();
        JobExecution jobExecution = jobLauncher.run(job, jobParameters);
        assertNotNull(jobExecution);
        assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
        int after = jobExplorer.getJobInstances("sql", 0, 1000).size();
        assertEquals(before + 1, after);
        return jobExecution.getStepExecutions().iterator().next();
    }

    /**
     * Asserts that the exit description of the step contains the expected
     * text.
     */
    private void assertDescriptionContains(StepExecution stepExecution, String expected) {
        String description = stepExecution.getExitStatus().getExitDescription();
        assertTrue("Wrong description: " + description, description.contains(expected));
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<job id="staging" xmlns="http://www.springframework.org/schema/batch">
<step id="staging-step">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="5" />
</tasklet>
</step>
</job>
<chain input-channel="input-files" output-channel="job-requests" xmlns="http://www.springframework.org/schema/integration">
<filter method="filter">
<bean class="org.springframework.batch.admin.integration.FileParentDirectoryFilter" xmlns="http://www.springframework.org/schema/beans">
<property name="parentName" value="staging" />
</bean>
</filter>
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter" xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="staging" />
</bean>
</service-activator>
</chain>
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="linesToSkip" value="1" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
</property>
<property name="resource" value="#{jobParameters['input.file']}" />
</bean>
<bean id="writer" class="org.springframework.batch.admin.sample.lead.support.StagingWriter">
<property name="dataSource" ref="dataSource"/>
<property name="incrementer" ref="incrementer"/>
</bean>
<bean id="incrementer" class="${batch.database.incrementer.class}">
<property name="dataSource" ref="dataSource" />
<property name="columnName" value="ID" />
<property name="incrementerName" value="LEAD_INPUTS_SEQ" />
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<import resource="classpath*:/META-INF/spring/batch/bootstrap/resources/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/bootstrap/manager/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/jobs/staging-context.xml" />
<integration:channel id="job-requests" />
<bean id="jobLoader" class="java.util.Date" />
<bean id="jobLauncherTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
</beans>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.job;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Integration test that launches the "staging" job against a classpath
 * input file and checks that it completes and creates one new job instance.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext(classMode=ClassMode.AFTER_CLASS)
public class StagingJobIntegrationTests {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private Job job;

    @Test
    public void testLaunchJob() throws Exception {
        int instancesBefore = countStagingInstances();
        JobParametersBuilder builder = new JobParametersBuilder();
        JobParameters parameters = builder.addString("input.file", "classpath:/data/test.txt").toJobParameters();
        JobExecution execution = jobLauncher.run(job, parameters);
        assertNotNull(execution);
        assertEquals(BatchStatus.COMPLETED, execution.getStatus());
        int instancesAfter = countStagingInstances();
        assertEquals(instancesBefore + 1, instancesAfter);
    }

    /**
     * @return the number of "staging" job instances found in the first 1000
     */
    private int countStagingInstances() {
        return jobExplorer.getJobInstances("staging", 0, 1000).size();
    }

}
package org.springframework.batch.admin.sample.lead.support;
import java.util.List;
import javax.sql.DataSource;
import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
/**
 * Item writer that inserts each String item as a row in the
 * <code>LEAD_INPUTS</code> table, using an id obtained from the configured
 * incrementer.
 *
 * @author Dave Syer
 *
 */
public class StagingWriter implements ItemWriter<String> {

    private static final String INSERT_SQL = "INSERT INTO LEAD_INPUTS (ID, DATA) values(?,?)";

    private SimpleJdbcTemplate jdbcTemplate;

    private DataFieldMaxValueIncrementer incrementer;

    /**
     * @param dataSource the data source that the rows are written to
     */
    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new SimpleJdbcTemplate(dataSource);
    }

    /**
     * @param incrementer the source of ids for new rows
     */
    public void setIncrementer(DataFieldMaxValueIncrementer incrementer) {
        this.incrementer = incrementer;
    }

    /**
     * Inserts one row per item, in order, each with a freshly generated id.
     *
     * @param values the items to insert
     */
    public void write(List<? extends String> values) throws Exception {
        for (String line : values) {
            long nextId = incrementer.nextLongValue();
            jdbcTemplate.update(INSERT_SQL, nextId, line);
        }
    }

}
<#import "/spring.ftl" as spring />
<p>Broadcast Information:</p>
<table title="Node Information"
class="bordered-table">
<tr>
<th>Property</th>
<th>Value</th>
</tr>
<tr class="name-sublevel1-even">
<td>Name</td>
<td>${info.id.name}</td>
</tr>
<tr class="name-sublevel1-odd">
<td>Host</td>
<td>${info.id.host}</td>
</tr>
<tr class="name-sublevel1-even">
<td>Id</td>
<td>${info.id.id}</td>
</tr>
</table>
<#if info.peers?? && info.peers?size!=0>
<p>Peers (${info.peers?size}):</p>
<table title="Peers"
class="bordered-table">
<tr>
<th>Peer</th>
<th>Host</th>
<th>Id</th>
</tr>
<#list info.peers as peer>
<#if peer_index % 2 == 0>
<#assign rowClass="name-sublevel1-even" />
<#else>
<#assign rowClass="name-sublevel1-odd" />
</#if>
<tr class="${rowClass}">
<td>${peer.name}</td>
<td>${peer.host}</td>
<td>${peer.id}</td>
</tr>
</#list>
</table>
<#else>
<p>There are no peers registered</p>
</#if>
<#assign ping_url><@spring.url relativeUrl="${servletPath}/ping"/></#assign>
<p>To re-register peers, <a href="${ping_url}">click here</a> </p>
"node" : {
<#assign url><@spring.url relativeUrl="${baseUrl}${servletPath}/cluster.json"/></#assign>
"resource" : "${url}",
"name" : "${info.id.name}",
"host" : "${info.id.host}",
"id" : "${info.id.id}"
},
"peers" : [<#if info.peers?? && info.peers?size!=0><#list info.peers as peer>
{
"name" : "${peer.name}",
"host" : "${peer.host}",
"id" : "${peer.id}"
}<#if peer_index != info.peers?size-1>,</#if></#list></#if>
]
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.web.sample.views;
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.admin.sample.BroadcastId;
import org.springframework.batch.admin.sample.util.JsonWrapper;
import org.springframework.batch.admin.sample.web.BroadcastInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.WebApplicationContextLoader;
import org.springframework.web.servlet.View;
/**
 * Renders the <code>cluster.json</code> view with a node and a single peer
 * and checks the resulting JSON content.
 */
@ContextConfiguration(loader = WebApplicationContextLoader.class, inheritLocations = false, locations = "AbstractSampleViewTests-context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class StatusJsonViewTests extends AbstractSampleViewTests {

    private final HashMap<String, Object> model = new HashMap<String, Object>();

    @Autowired
    @Qualifier("cluster.json")
    private View status;

    @Test
    public void testListViewWithJobs() throws Exception {
        BroadcastId node = new BroadcastId("one", "two", "three");
        BroadcastId peer = new BroadcastId("one", "two", "four");
        model.put("info", new BroadcastInfo(node, Collections.singleton(peer)));
        model.put("baseUrl", "http://localhost:8080/springsource");
        status.render(model, request, response);
        JsonWrapper json = new JsonWrapper(response.getContentAsString());
        // The node entry is expected to have four properties
        assertEquals(4, json.get("node", Map.class).size());
        assertEquals("one", json.get("peers[0].name"));
    }

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.web.sample.views;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.admin.sample.BroadcastId;
import org.springframework.batch.admin.sample.web.BroadcastInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.WebApplicationContextLoader;
import org.springframework.web.servlet.View;
/**
 * Renders the view qualified as "cluster" with a model containing a
 * {@link BroadcastInfo} and checks the generated markup. The
 * <code>request</code> and <code>response</code> objects are not declared here
 * (presumably inherited from {@link AbstractSampleViewTests}).
 */
@ContextConfiguration(loader = WebApplicationContextLoader.class, inheritLocations = false, locations = "AbstractSampleViewTests-context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class StatusViewTests extends AbstractSampleViewTests {

	/** Model handed to the view when it is rendered. */
	private final HashMap<String, Object> model = new HashMap<String, Object>();

	@Autowired
	@Qualifier("cluster")
	private View view;

	/**
	 * Uses the same id both as the node and as its only peer, then expects
	 * the output to contain table cells ending in the name and the host (in
	 * that order) as well as the "Broadcast Information" text.
	 */
	@Test
	public void testStatus() throws Exception {

		BroadcastId node = new BroadcastId("foo", "BAR", "spam");
		BroadcastInfo info = new BroadcastInfo(node, Collections.singleton(node));
		model.put("info", info);

		view.render(model, request, response);
		String html = response.getContentAsString();

		// (?s) lets '.' span line breaks, so the two cells may be on different lines.
		String cellsPattern = "(?s).*<td>.*" + node.getName() + "</td>.*<td>.*" + node.getHost() + "</td>.*";
		assertTrue(html.matches(cellsPattern));
		assertTrue(html.contains("Broadcast Information"));

	}

}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Spring bean definitions that register a JobLauncherSynchronizer and switch on
AspectJ auto-proxying for it.
NOTE(review): the "context" namespace is declared below but no element in this
file uses it.
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- The synchronizer is wired with the "jobExplorer" and "jobRepository" beans, which are not defined in this file. -->
<bean id="jobLauncherSynchronizer" class="org.springframework.batch.admin.launch.JobLauncherSynchronizer">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="jobRepository" ref="jobRepository"/>
</bean>
<!--
Enable @AspectJ auto-proxying, limited by the include pattern to the bean named
"jobLauncherSynchronizer" (presumably an @AspectJ-annotated class; confirm in its source).
-->
<aop:aspectj-autoproxy>
<aop:include name="jobLauncherSynchronizer"/>
</aop:aspectj-autoproxy>
</beans>
foo=${foo}
id,client.name,client.country,product.name,amount,query
12,Jochen,DE,Foo,1000,
13,Willi,DE,Foo,10000,
14,Goetz,DE,Foo,10,
15,Brian,UK,Bar,20000,
16,Roderick,UK,Bar,30,
17,Francois,FR,Foo,25,
18,Muriel,FR,Bar,120,
<?xml version="1.0" encoding="UTF-8"?>
<!--
Defines the "unstaging" batch job together with the reader, processor, writer
and incrementer beans it uses. The "dataSource" bean is referenced here but
defined elsewhere.
NOTE(review): the schemaLocation lists the spring-integration schema although
no integration namespace prefix is declared in this file.
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<!--
Single-step job: items are read, processed and written in chunks committed
every 5 items; up to 2 items failing with NumberFormatException may be skipped.
-->
<job id="unstaging" xmlns="http://www.springframework.org/schema/batch">
<step id="unstaging-step">
<tasklet>
<chunk reader="reader" processor="processor" writer="writer" commit-interval="5" skip-limit="2">
<skippable-exception-classes>
<include class="java.lang.NumberFormatException" />
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
</job>
<!--
Paging reader: selects all columns from LEAD_INPUTS where PROCESSED=false,
sorted by ID, 100 rows per page. Each row is mapped to a column-name/value map.
-->
<bean id="reader" class="org.springframework.batch.item.database.JdbcPagingItemReader">
<property name="dataSource" ref="dataSource" />
<property name="pageSize" value="100" />
<property name="queryProvider">
<bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="selectClause" value="*" />
<property name="fromClause" value="LEAD_INPUTS" />
<property name="whereClause" value="PROCESSED=false" />
<property name="sortKey" value="ID" />
</bean>
</property>
<property name="rowMapper">
<bean class="org.springframework.jdbc.core.ColumnMapRowMapper" />
</property>
</bean>
<!-- Project-specific processor; it is given the data source and the "incrementer" bean defined at the end of this file. -->
<bean id="processor" class="org.springframework.batch.admin.sample.lead.support.LeadUnstagingItemProcessor">
<property name="dataSource" ref="dataSource" />
<property name="incrementer" ref="incrementer" />
</bean>
<!--
Batch writer: inserts into LEADS, binding the named SQL parameters (including
the nested paths product.name, client.name and client.country) from bean
properties of each item.
-->
<bean id="writer" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
<property name="dataSource" ref="dataSource" />
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
</property>
<property name="sql">
<value>
INSERT INTO LEADS (ID, PRODUCT, CLIENT_NAME, CLIENT_COUNTRY, AMOUNT, QUERY) values
(:id, :product.name, :client.name, :client.country, :amount, :query)
</value>
</property>
</bean>
<!--
Incrementer whose implementation class comes from the
batch.database.incrementer.class placeholder; configured with incrementer
name LEADS_SEQ and column ID.
-->
<bean id="incrementer" class="${batch.database.incrementer.class}">
<property name="dataSource" ref="dataSource" />
<property name="columnName" value="ID" />
<property name="incrementerName" value="LEADS_SEQ" />
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Spring context that pulls in the batch bootstrap configuration and the
unstaging job, and registers the staging job(s) through a job loader.
NOTE(review): presumably the default context for UnstagingJobIntegrationTests;
confirm against the file's actual location.
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!-- Bootstrap resources and manager configuration, plus the unstaging job, loaded directly into this context. -->
<import resource="classpath*:/META-INF/spring/batch/bootstrap/resources/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/bootstrap/manager/*.xml" />
<import resource="classpath*:/META-INF/spring/batch/jobs/unstaging-context.xml" />
<!-- Message channel named "job-requests"; nothing in this file sends to or reads from it. -->
<integration:channel id="job-requests" />
<!--
Registers jobs found in context files matching staging*.xml under
META-INF/spring/batch/jobs, using a DefaultJobLoader bound to the
"jobRegistry" bean (defined elsewhere).
-->
<bean id="jobLoader" class="org.springframework.batch.core.configuration.support.AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.springframework.batch.core.configuration.support.ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/META-INF/spring/batch/jobs/staging*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.springframework.batch.core.configuration.support.DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
<!-- Synchronous task executor; presumably picked up by the job launcher so that launches finish before run() returns (confirm in the imported manager config). -->
<bean id="jobLauncherTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
</beans>
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample.job;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Integration test that launches the "unstaging" job after making sure the
 * "staging" job has been run, and verifies that a new job instance is
 * recorded.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext(classMode=ClassMode.AFTER_CLASS)
public class UnstagingJobIntegrationTests {

	private static Log logger = LogFactory.getLog(UnstagingJobIntegrationTests.class);

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private JobExplorer jobExplorer;

	@Autowired
	private JobLocator jobLocator;

	@Autowired
	@Qualifier("unstaging")
	private Job unstaging;

	/**
	 * Runs staging, then launches unstaging with empty parameters and expects
	 * it to complete and to add exactly one "unstaging" job instance.
	 */
	@Test
	public void testLaunchJob() throws Exception {

		runStaging();

		int instancesBefore = countUnstagingInstances();
		JobExecution execution = jobLauncher.run(unstaging, new JobParameters());

		assertNotNull(execution);
		assertEquals(BatchStatus.COMPLETED, execution.getStatus());

		int instancesAfter = countUnstagingInstances();
		assertEquals(instancesBefore + 1, instancesAfter);

	}

	/**
	 * @return the number of "unstaging" job instances the explorer reports
	 * (looking at the first 1000 at most)
	 */
	private int countUnstagingInstances() {
		return jobExplorer.getJobInstances("unstaging", 0, 1000).size();
	}

	/**
	 * Launches the "staging" job for the test input file. A staging instance
	 * that is already complete is logged and otherwise ignored.
	 */
	private void runStaging() throws Exception {

		JobParametersBuilder builder = new JobParametersBuilder().addString("input.file", "classpath:/data/test.txt");
		JobParameters stagingParameters = builder.toJobParameters();
		Job staging = jobLocator.getJob("staging");

		try {
			jobLauncher.run(staging, stagingParameters);
		}
		catch (JobInstanceAlreadyCompleteException e) {
			logger.info("Staging already complete. Not running again.");
		}

	}

}
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.admin.sample;
import static org.junit.Assert.assertEquals;
import java.net.URI;
import org.junit.Test;
import org.springframework.web.util.UriTemplate;
/**
 * Checks how {@link UriTemplate} expands positional variables, both when the
 * scheme is part of the template and when it arrives inside a variable.
 *
 * @author Dave Syer
 *
 */
public class UriTemplateTests {

	/**
	 * The host, both path segments and the query value are all template variables.
	 */
	@Test
	public void testPlaceholders() throws Exception {
		URI expanded = new UriTemplate("http://{hostname}/{context}/{servlet}/foo?bar={spam}").expand("localhost",
				"foo", "bar", "spam");
		String actual = expanded.toASCIIString();
		assertEquals("http://localhost/foo/bar/foo?bar=spam", actual);
	}

	/**
	 * The first variable carries the scheme and host together.
	 */
	@Test
	public void testProtocolPlaceholders() throws Exception {
		URI expanded = new UriTemplate("{base}/{context}/{servlet}/foo?bar={spam}").expand("http://localhost",
				"foo", "bar", "spam");
		String actual = expanded.toASCIIString();
		assertEquals("http://localhost/foo/bar/foo?bar=spam", actual);
	}

}
h1. Remote Chunking
Spring Batch provides some standard features to execute a step in a job by sending chunks of items from a master to a bunch of workers, or competing consumers. (Actually these features are not yet released, but plenty of best practice experience exists, and there is code in SVN.) The messaging infrastructure has to be JMS (or have the same single consumer guaranteed delivery semantics). It's quite tricky to set up and even more so when you enter the cloud space because the nodes in the messaging layer need to be aware of each other point-to-point, and the application nodes also need to be aware of each of the messaging nodes point-to-point.
h2. Example: Messaging with ActiveMQ
Goal: set up a tier of message brokers in the platform somewhere, and have them use standard failover configuration so that they are fault tolerant and load balanced.
h3. Brokers
Each broker needs to be started with a network connector URI that lists the other brokers.
Each broker node needs a network URI for the other brokers in the cluster, e.g. if Spring XML is used to configure the broker:
{code}
<bean id="brokerService" class="org.apache.activemq.broker.BrokerService"
init-method="start" destroy-method="stop">
<property name="brokerName" value="${jms.broker.name}" />
...
<property name="networkConnectorURIs">
<list>
<value>${jms.network.connector.uri}</value>
</list>
</property>
...
</bean>
{code}
with the broker name and URI provided from placeholders:
{code}
jms.broker.name=host1.broker
jms.network.connector.uri=static:(tcp://host1:61616,tcp://host2:61616)
{code}
Where "hostX" are the IP addresses of the broker nodes, and the broker name is unique in the cluster.
There are two possible deployment / cluster architectures:
# The brokers all live in their own VM in a cluster tier between the database and the application.
# The brokers are embedded in the same VM as the application.
h3. Application
Each application node needs a network URI for the brokers in the connection factory, e.g. if using Spring XML for configuration
{code}
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
depends-on="brokerService">
<property name="brokerURL" value="${jms.client.broker.uri}" />
<property name="prefetchPolicy">
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="all" value="0" />
</bean>
</property>
</bean>
{code}
with the broker uri provided from a placeholder (hostX are the IP addresses of the broker nodes, which might be the same as the application nodes if the broker is embedded):
{code}
jms.client.broker.uri=failover:(tcp://host1:61616,tcp://host2:61616)?timeout=3000
{code}
h2. Dynamic Cluster Membership
When the broker population changes (e.g. a node is added to increase performance under load) the applications have to be updated. The only practical way to do this is a rolling restart - especially in a servlet application, the only way to refresh the Spring context really is to redeploy the WAR. Batch applications are mostly stateless, but you would want to wait for each node to finish processing what it was in the middle of before restarting it.
h2. CloudFoundry Experience and Observations
I tried to set up a messaging tier embedded in the tomcat application in CloudFoundry. It works if the URI placeholders are added to the deployment manually, but of course you only know the host names / IP addresses after the deployment is successful. So it's a very manual process (and not one I would like to repeat very often).
To automate I tried a couple of things (described below in more detail). Looking at the source code for CloudFoundry I can see that it has the knowledge that I needed, but I don't have a way to inject a script that can set up the properties that I need.
h3. Shell Hacks Against DB Master
The DB master always has a known alias ("dbmaster") in the application tier. I wrote a shell script that would run on the app tier box (as allowed by CloudFoundry) and send its host name by scp to a known temporary file location. All the app tier nodes do this and wait for a response containing a fixed number of node ids (assuming we know the cluster size before we start).
This would have worked for the case of a cluster with a fixed number of nodes, except for the fact that the nodes cannot ssh into each other (no private key is installed).
h3. Using the Load Balancer to Pass Messages
Another fixed point in the cluster is the load balancer. The application nodes don't know the address of the load balancer when they start, but they can find it out by looking at the request URL when an HTTP request comes in.
Using this and the knowledge that the load balancer is a round robin, you can devise a "good enough" protocol for bouncing an HTTP request up and down from the application to the load balancer enough times to pick up all the node host addresses. Once all the application nodes know the other addresses they can serve them up in a format (e.g. .properties) that is useful to a Spring application context.
This works, but it's awkward because it has to be bootstrapped by a request to one of the applications, and this has to happen before the Spring context that needs the node addresses tries to initialise itself. Fortunately CloudFoundry pings the application home page so it can be used to initiate the process, but you still have to work out how to enforce the ordering of context loading.
h2. Advanced Load Balancer?
I also remembered Jim Jagielski talking about "advanced load balancer" options in HTTPD. Maybe this is a good use case: the load balancer always knows about the application nodes and can assess their health, so all we need is:
# a way to send it a message and have it broadcasted to all the nodes
# a way to ask it to send such a message with a list of node addresses
h1. File Processing
A key use case for Batch systems is processing file-based data (it's a dirty job but someone has to do it). Some of the files are quite large (many GB), but not all. In the case of small files users or operators can upload them through a web UI (Spring Batch Admin provides this feature) to a temporary directory. In the case of large files users would probably want to use FTP. It is inefficient to move the file around once it has been uploaded and for efficient reading it needs to be on a local file system where it is being processed.
h2. Web Upload and Batch Processing
The sunny day case for the web upload looks like this:
* User uploads file through web UI
* System generates a trigger (sends a Spring Integration message) locally on the node that has the file
* Job launches on the same node and picks up file location. Data is processed, e.g. scrubbed and transferred to a database. The Batch metadata are updated in the database to show the number of items processed, etc.
* System leaves file where it is and reports successful exit
h2. Manual File Upload
For the manual FTP upload the first problem is: the user needs to know where to put the file. He might not (should not?) have knowledge of, or direct TCP access to, the application nodes. The second problem is: the system needs to know when the file upload finishes (not when it starts) so it can trigger the job.
h2. Job Failure and Restart
If a job fails Spring Batch allows a restart and positions the reader after the item that was last committed. If the input source is a file, then this just means changing a file pointer.
The problem for a cloud deployment is that the file is located on one of the application nodes, but there is no way to know which one from the outside. You can send in a restart request to the load balancer, but it would probably go to the wrong node.
The solution might be messaging:
# _Point-to-Point_: The system stores information about file-node associations centrally, combined with some point-to-point messaging.
# _Publish-Subscribe_: The restart message is sent to all nodes and ignored by the ones that don't have the file - they know which ones they are. (The advanced load balancer would work here too.)
In neither case does the messaging need to be as robust or safe as JMS. If the message doesn't get through the user will just try again, and requests are idempotent because of the Batch meta data.
h2. File Output
If the output of a job is a file (also not unusual) then there is a similar problem to the restart: how can it be located and relayed, or its location relayed, to the user?
<?xml version="1.0" encoding="ISO-8859-1"?>
<!--
Servlet 2.4 deployment descriptor: a root Spring context, two servlet filters
and a single DispatcherServlet, all mapped to every request path.
-->
<web-app xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://java.sun.com/xml/ns/j2ee
http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd" version="2.4">
<!-- Location of the root application context configuration read by ContextLoaderListener. -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:/org/springframework/batch/admin/web/resources/webapp-config.xml</param-value>
</context-param>
<!-- Project-specific listener, declared ahead of the Spring context loader (presumably so it can prepare environment settings first; confirm in EnvironmentContextListener). -->
<listener>
<listener-class>org.springframework.batch.admin.sample.web.EnvironmentContextListener</listener-class>
</listener>
<!-- Bootstraps the root Spring application context. -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- Filter definitions; both are mapped to all URLs further down. -->
<filter>
<filter-name>shallowEtagHeaderFilter</filter-name>
<filter-class>org.springframework.web.filter.ShallowEtagHeaderFilter</filter-class>
</filter>
<filter>
<filter-name>hiddenHttpMethodFilter</filter-name>
<filter-class>org.springframework.web.filter.HiddenHttpMethodFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>shallowEtagHeaderFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter-mapping>
<filter-name>hiddenHttpMethodFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Spring MVC dispatcher with its own servlet-level context configuration, started when the web application starts. -->
<servlet>
<servlet-name>Batch Servlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:/org/springframework/batch/admin/web/resources/servlet-config.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<!-- The dispatcher handles every request path. -->
<servlet-mapping>
<servlet-name>Batch Servlet</servlet-name>
<url-pattern>/*</url-pattern>
</servlet-mapping>
</web-app>
package org.springframework.test.context.support;
/*
* Copyright 2009-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.support.BeanDefinitionReader;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigUtils;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.mock.web.MockServletContext;
import org.springframework.util.StringUtils;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.GenericWebApplicationContext;
/**
 * Context loader for tests that need a web application context: it builds a
 * {@link GenericWebApplicationContext} backed by a {@link MockServletContext}
 * and populates it from XML bean definition files.
 */
public class WebApplicationContextLoader extends AbstractContextLoader {

	protected static final Log logger = LogFactory.getLog(WebApplicationContextLoader.class);

	/**
	 * Supplies the reader used to load bean definitions into the context; this
	 * implementation always hands back a fresh {@link XmlBeanDefinitionReader}.
	 *
	 * @param context the context the definitions are registered with
	 * @return a new XmlBeanDefinitionReader.
	 * @see XmlBeanDefinitionReader
	 */
	protected BeanDefinitionReader createBeanDefinitionReader(final GenericApplicationContext context) {
		return new XmlBeanDefinitionReader(context);
	}

	/**
	 * @return the suffix &quot;<code>-context.xml</code>&quot;
	 * @see org.springframework.test.context.support.AbstractContextLoader#getResourceSuffix()
	 */
	@Override
	public String getResourceSuffix() {
		return "-context.xml";
	}

	/**
	 * Builds, refreshes and returns a web application context for the given
	 * resource locations.
	 *
	 * @param locations the bean definition resources to load
	 * @return the refreshed context, with a shutdown hook registered
	 * @throws Exception if the context cannot be loaded
	 */
	public ApplicationContext loadContext(String... locations) throws Exception {

		if (logger.isDebugEnabled()) {
			String joinedLocations = StringUtils.arrayToCommaDelimitedString(locations);
			logger.debug("Loading ApplicationContext for locations [" + joinedLocations + "].");
		}

		GenericWebApplicationContext webContext = new GenericWebApplicationContext();
		bindMockServletContext(webContext);

		// Extension point, invoked before any bean definition is read.
		customizeBeanFactory(webContext.getDefaultListableBeanFactory());

		BeanDefinitionReader definitionReader = createBeanDefinitionReader(webContext);
		definitionReader.loadBeanDefinitions(locations);
		AnnotationConfigUtils.registerAnnotationConfigProcessors(webContext);

		// Extension point, invoked after the definitions are loaded and before refresh.
		customizeContext(webContext);

		webContext.refresh();
		webContext.registerShutdownHook();
		return webContext;

	}

	/**
	 * Hook for subclasses to adjust the context before it is refreshed. Does
	 * nothing by default.
	 *
	 * @param context the context about to be refreshed
	 */
	protected void customizeContext(GenericWebApplicationContext context) {
	}

	/**
	 * Hook for subclasses to adjust the bean factory before bean definitions
	 * are loaded. Does nothing by default.
	 *
	 * @param defaultListableBeanFactory the bean factory of the new context
	 */
	protected void customizeBeanFactory(DefaultListableBeanFactory defaultListableBeanFactory) {
	}

	/**
	 * Gives the context a new {@link MockServletContext} and stores the context
	 * in that servlet context under the root web application context attribute.
	 */
	private void bindMockServletContext(GenericWebApplicationContext webContext) {
		MockServletContext mockServletContext = new MockServletContext();
		webContext.setServletContext(mockServletContext);
		mockServletContext.setAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE, webContext);
	}

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment