Skip to content

Instantly share code, notes, and snippets.

@uklance
Last active October 22, 2021 21:38
Show Gist options
  • Save uklance/e8b88da8028a7e391da616769d5745b4 to your computer and use it in GitHub Desktop.
Save uklance/e8b88da8028a7e391da616769d5745b4 to your computer and use it in GitHub Desktop.
package com.example.jet;
import com.hazelcast.core.ManagedContext;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.spring.context.SpringManagedContext;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import java.util.Map;
import static com.hazelcast.jet.spring.JetSpringServiceFactories.bean;
public class LocalJetTest {
@Test
public void test1() throws Exception {
AnnotationConfigApplicationContext appContext = new AnnotationConfigApplicationContext(Config1.class, Config2.class);
appContext.start();
Thread.sleep(10_000);
}
public static class MyService {
public String toLowerCase(String s) {
return s.toLowerCase();
}
}
@Configuration
@PropertySource("classpath:spring.properties")
public static class Config1 {
@Bean
public MyService myService() {
return new MyService();
}
}
@Configuration
public static class Config2 {
@Bean
public ManagedContext managedContext() {
return new SpringManagedContext();
}
@Bean
public Pipeline samplePipeline(@Value("${token}") String token, @Value("${replacement}") String replacement) {
FunctionEx<String, String> inlineFunction = s -> s.equals(token) ? replacement : s;
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("TEN", "TWENTY", "THIRTY"))
.map(inlineFunction)
.mapUsingService(bean(MyService.class), (myService, s) -> myService.toLowerCase(s))
.writeTo(Sinks.logger());
return p;
}
@Bean(destroyMethod = "shutdown")
public JetInstance jet(ManagedContext managedContext, Map<String, Pipeline> pipelines) {
JetConfig jetConfig = new JetConfig()
.configureHazelcast(hz -> hz.setManagedContext(managedContext));
JetInstance jet = Jet.newJetInstance(jetConfig);
pipelines.forEach((name, pipeline) -> {
jet.newJob(pipeline, new JobConfig().setName(name));
});
return jet;
}
}
}
22:33:10.564 [ INFO] [c.h.j.i.MasterJobContext] Start executing job 'samplePipeline', execution 06fe-b9dd-9581-0001, execution graph in DOT format:
digraph DAG {
"items" [localParallelism=1];
"map" [localParallelism=4];
"mapUsingService" [localParallelism=4];
"loggerSink" [localParallelism=1];
"items" -> "map" [queueSize=1024];
"map" -> "mapUsingService" [queueSize=1024];
"mapUsingService" -> "loggerSink" [queueSize=1024];
}
HINT: You can use graphviz or http://viz-js.com to visualize the printed graph.
22:33:10.700 [ INFO] [c.h.j.i.JobExecutionService] Execution plan for jobId=06fe-b9dd-9580-0001, jobName='samplePipeline', executionId=06fe-b9dd-9581-0001 initialized
22:33:10.700 [ INFO] [c.h.j.i.JobExecutionService] Start execution of job 'samplePipeline', execution 06fe-b9dd-9581-0001 from coordinator [192.168.0.33]:5701
22:33:10.767 [ INFO] [c.h.j.i.c.WriteLoggerP] [samplePipeline/loggerSink#0] fifty
22:33:10.800 [ INFO] [c.h.j.i.c.WriteLoggerP] [samplePipeline/loggerSink#0] ten
22:33:10.800 [ INFO] [c.h.j.i.c.WriteLoggerP] [samplePipeline/loggerSink#0] thirty
22:33:10.826 [ INFO] [c.h.j.i.MasterJobContext] Execution of job 'samplePipeline', execution 06fe-b9dd-9581-0001 completed successfully
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<name>${artifactId}</name>
<description>${artifactId}</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.11</version>
</dependency>
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet-spring</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
token=TWENTY
replacement=FIFTY
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment