Skip to content

Instantly share code, notes, and snippets.

@thetekst
Last active March 7, 2019 16:04
Show Gist options
  • Save thetekst/ccdd19d9f51bc6c5e11f4d93d91ad0a3 to your computer and use it in GitHub Desktop.
Save thetekst/ccdd19d9f51bc6c5e11f4d93d91ad0a3 to your computer and use it in GitHub Desktop.
Spring Boot 2. Server-Sent Events (SSE) by timeout
// Repositories used to resolve the build script's own classpath.
// No classpath dependencies are declared in this block.
buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
}
// Plugins applied through the plugins DSL; the two third-party plugins are
// pinned to explicit versions.
plugins {
    id 'java'
    id 'groovy'
    id 'org.springframework.boot' version '2.1.2.RELEASE'
    id 'org.liquibase.gradle' version '2.0.1'
}
// Applied without a version, unlike the plugins declared in the plugins block.
apply plugin: 'io.spring.dependency-management'

// Project coordinates.
group 'ru.tkachenko'
version '1.0-SNAPSHOT'

// Java 11. The "1.x" numbering scheme ended with Java 8, so the enum constant
// is used instead of the decimal literal 1.11, which only works through
// Gradle's lenient version parsing.
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
// Dependency versions referenced from the dependencies block below.
// Each property is declared exactly once.
ext {
    postgresqlVersion = '42.2.5'
    modelmapperVersion = '2.3.2'
    swagger2Version = '2.9.2'
    spockVersion = '1.1-groovy-2.4'
}
// Repositories used to resolve project dependencies, consulted in order.
repositories {
    mavenLocal()
    // JCenter has been sunset by its operator, so Maven Central is consulted
    // first; jcenter() stays as a fallback to keep existing resolution working.
    mavenCentral()
    jcenter()
}
dependencies {
    // Web stack: Jetty is used as the servlet container, so the Tomcat starter
    // is excluded from the web starter (and Jetty's websocket modules are left out).
    compile('org.springframework.boot:spring-boot-starter-jetty') {
        exclude group: 'org.eclipse.jetty.websocket'
    }
    compile('org.springframework.boot:spring-boot-starter-web') {
        exclude module: 'spring-boot-starter-tomcat'
    }

    // Spring Boot starters.
    compile 'org.springframework.boot:spring-boot-starter-data-jpa'
    compile 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    compile 'org.springframework.boot:spring-boot-starter-security'
    compile 'org.springframework.boot:spring-boot-starter-actuator'
    compile 'org.springframework.boot:spring-boot-starter-webflux'

    // API documentation.
    compile "io.springfox:springfox-swagger2:${swagger2Version}"
    compile "io.springfox:springfox-swagger-ui:${swagger2Version}"

    // Persistence, mapping and serialization helpers.
    compile 'com.zaxxer:HikariCP'
    compile "org.postgresql:postgresql:${postgresqlVersion}"
    compile "org.modelmapper:modelmapper:${modelmapperVersion}"
    compile 'com.fasterxml.jackson.datatype:jackson-datatype-joda'

    // Lombok is needed at compile time only.
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    // Classpath of the Liquibase Gradle plugin.
    liquibaseRuntime 'org.liquibase:liquibase-core'
    liquibaseRuntime 'org.liquibase:liquibase-groovy-dsl:2.0.1'
    liquibaseRuntime 'ch.qos.logback:logback-classic'
    liquibaseRuntime "org.postgresql:postgresql:${postgresqlVersion}"

    // Tests: Spring Boot test support, Groovy/Spock and an in-memory H2 database.
    testCompile 'org.springframework.boot:spring-boot-starter-test'
    testCompile 'org.codehaus.groovy:groovy-all'
    testCompile "org.spockframework:spock-spring:${spockVersion}"
    testCompile 'com.h2database:h2'
}
// Liquibase plugin configuration. The connection settings are read from an
// optional liquibase.properties file in the project directory; when the file
// is absent the activity is left unconfigured and a hint is printed.
liquibase {
    activities {
        main {
            def propsFile = new File("${projectDir}/liquibase.properties")
            if (propsFile.exists()) {
                def props = new Properties()
                // withInputStream closes the stream even when load() throws,
                // which the manual open/close sequence did not guarantee.
                propsFile.withInputStream { stream -> props.load(stream) }
                changeLogFile props['changeLogFile']
                url props['url']
                username props['username']
                password props['password']
            } else {
                println "Add ${projectDir}/liquibase.properties if you want use liquibase plugin"
            }
        }
    }
}
// Build-info settings of the Spring Boot plugin: project version and name,
// the instant at which the build script is evaluated, and the user running
// the build under the additional 'Built-By' key.
springBoot {
    buildInfo {
        properties {
            version = project.version
            time = java.time.Instant.now()
            name = project.name
            additional['Built-By'] = System.getProperty('user.name')
        }
    }
}
// Adds the Spring Boot launch script to the jar produced by bootJar.
bootJar {
    launchScript()
}
// Configures the built-in wrapper task instead of registering a replacement
// task under the same name.
wrapper {
    gradleVersion = '5.1.1'
}
package ru.tkachenko.utils;
/**
 * Holder for constants shared by the REST layer.
 *
 * @author d.tkachenko
 */
public final class Constant {

    /** Path prefix that the section controllers prepend to their request mappings. */
    public static final String V1 = "v1/api";

    private Constant() {
        // Constants holder: not meant to be instantiated or subclassed.
    }
}
package ru.tkachenko.rest.controller;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import ru.tkachenko.rest.model.SectionListResponse;
import ru.tkachenko.service.SectionService;
import ru.tkachenko.utils.Constant;
/**
 * REST controller that exposes the section list as a server-sent event stream.
 *
 * @author d.tkachenko
 */
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping(Constant.V1 + "/section")
public class SectionController {

    private final SectionService sectionService;

    /**
     * Streams the section lists produced by {@link SectionService#page()}.
     *
     * @return the service's flux, produced as {@code text/event-stream}
     */
    @CrossOrigin
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<SectionListResponse> page() {
        final Flux<SectionListResponse> snapshots = sectionService.page();
        return snapshots;
    }
}
package ru.tkachenko.rest.controller;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import ru.tkachenko.rest.model.SectionDto;
import ru.tkachenko.rest.model.SectionRequest;
import ru.tkachenko.service.SectionService;
import ru.tkachenko.utils.Constant;
import javax.validation.Valid;
import java.util.List;
/**
 * REST controller for sections: read, list, delete and save operations plus a
 * server-sent event endpoint. Every handler delegates to {@link SectionService}.
 *
 * @author d.tkachenko
 */
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping(Constant.V1 + "/section")
public class SectionController2 {

    private final SectionService sectionService;

    /**
     * Returns a single section.
     *
     * @param id identifier taken from the request path
     * @return the section returned by the service
     */
    @GetMapping("{id}")
    public SectionDto get(@PathVariable final Long id) {
        final SectionDto section = sectionService.get(id);
        return section;
    }

    /**
     * Returns the section list.
     *
     * @return the list returned by the service
     */
    @GetMapping
    public List<SectionDto> page() {
        final List<SectionDto> sections = sectionService.page();
        return sections;
    }

    /**
     * Exposes the service's event stream under {@code sse}.
     *
     * @return the service's flux, produced as {@code text/event-stream}
     */
    @CrossOrigin
    @GetMapping(value = "sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<SectionDto> sse() {
        return sectionService.sse();
    }

    /**
     * Deletes a section.
     *
     * @param id identifier taken from the request path
     */
    @DeleteMapping("{id}")
    public void delete(@PathVariable final Long id) {
        sectionService.delete(id);
    }

    /**
     * Saves a section.
     *
     * @param request validated request body
     * @return the saved section as returned by the service
     */
    @PostMapping
    public SectionDto save(@RequestBody @Valid final SectionRequest request) {
        final SectionDto saved = sectionService.save(request);
        return saved;
    }
}
package ru.tkachenko.rest.controller;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import ru.tkachenko.rest.model.SectionDto;
import ru.tkachenko.rest.model.SectionRequest;
import ru.tkachenko.service.SectionService;
import ru.tkachenko.utils.Constant;
import javax.validation.Valid;
import java.util.List;
/**
 * REST controller for sections: read, list, delete and save operations plus a
 * {@code listen} endpoint that streams section events. Every handler delegates
 * to {@link SectionService}.
 *
 * @author d.tkachenko
 */
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping(Constant.V1 + "/section")
public class SectionController {

    private final SectionService sectionService;

    /**
     * Returns a single section.
     *
     * @param id identifier taken from the request path
     * @return the section returned by the service
     */
    @GetMapping("{id}")
    public SectionDto get(@PathVariable final Long id) {
        final SectionDto section = sectionService.get(id);
        return section;
    }

    /**
     * Returns the section list.
     *
     * @return the list returned by the service
     */
    @GetMapping
    public List<SectionDto> page() {
        final List<SectionDto> sections = sectionService.page();
        return sections;
    }

    /**
     * Streams section events as server-sent events.
     *
     * @return the service's event flux, produced as {@code text/event-stream}
     */
    @CrossOrigin
    @GetMapping(value = "listen", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<SectionDto>> listen() {
        final Flux<ServerSentEvent<SectionDto>> events = sectionService.listen();
        return events;
    }

    /**
     * Deletes a section.
     *
     * @param id identifier taken from the request path
     */
    @DeleteMapping("{id}")
    public void delete(@PathVariable final Long id) {
        sectionService.delete(id);
    }

    /**
     * Saves a section.
     *
     * @param request validated request body
     * @return the saved section as returned by the service
     */
    @PostMapping
    public SectionDto save(@RequestBody @Valid final SectionRequest request) {
        final SectionDto saved = sectionService.save(request);
        return saved;
    }
}
package ru.tkachenko.data.converter;
import lombok.RequiredArgsConstructor;
import org.modelmapper.ModelMapper;
import org.springframework.stereotype.Component;
import ru.tkachenko.data.model.db.Section;
import ru.tkachenko.rest.model.SectionDto;
/**
 * Converts {@link Section} entities to their REST representation with ModelMapper.
 *
 * @author d.tkachenko
 */
@Component
@RequiredArgsConstructor
public class SectionConverter {

    private final ModelMapper modelMapper;

    /**
     * Maps an entity to a DTO.
     *
     * @param obj entity to convert
     * @return the {@link SectionDto} produced by the mapper
     */
    public SectionDto convertToDto(final Section obj) {
        final SectionDto dto = modelMapper.map(obj, SectionDto.class);
        return dto;
    }
}
package ru.tkachenko.rest.model;
import lombok.Data;
import lombok.experimental.Accessors;
import java.time.LocalDateTime;
/**
 * REST representation of a section. Setters are chainable.
 *
 * @author d.tkachenko
 */
@Data
@Accessors(chain = true)
public class SectionDto {

    private Long id;
    private String name;
    private LocalDateTime created;
    private LocalDateTime updated;
    private Boolean secret;

    /**
     * Deletion marker. {@code SectionService.delete} publishes an event built with
     * {@code new SectionDto().setId(id).setDeleted(true)}, which needs this property.
     */
    private Boolean deleted;
}
package ru.tkachenko.rest.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.List;
/**
 * Response body that carries a list of sections on top of the fields
 * inherited from {@link Response}.
 *
 * @author d.tkachenko
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = true)
public class SectionListResponse extends Response {

    /** Sections included in this response. */
    private List<SectionDto> sections;
}
package ru.tkachenko.service;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import ru.tkachenko.data.converter.SectionConverter;
import ru.tkachenko.data.repository.jpa.SectionRepository;
import ru.tkachenko.rest.model.SectionDto;
import ru.tkachenko.rest.model.SectionListResponse;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Publishes periodic snapshots of all sections.
 *
 * @author d.tkachenko
 */
@Service
@RequiredArgsConstructor
public class SectionService {

    /** Pause between two consecutive snapshots (and before the first one). */
    private static final Duration REFRESH_PERIOD = Duration.ofSeconds(5);

    private final SectionConverter sectionConverter;
    private final SectionRepository sectionRepository;

    /**
     * Emits the current list of sections every five seconds, for as long as the
     * subscriber stays connected.
     *
     * <p>The repository is queried when a tick fires. Zipping a generated stream
     * with the interval instead lets {@code zip} prefetch a batch of snapshots at
     * subscription time, which runs a burst of queries up front and later emits
     * data that is already stale.
     *
     * @return an endless flux of section-list snapshots
     */
    public Flux<SectionListResponse> page() {
        return Flux.interval(REFRESH_PERIOD)
                // interval() signals an overflow error when a tick fires without
                // downstream demand; a slow subscriber simply skips that tick.
                .onBackpressureDrop()
                // NOTE(review): the query runs on the interval's timer thread;
                // consider moving it to a scheduler meant for blocking work.
                .map(tick -> loadSnapshot());
    }

    /** Loads all sections and wraps their DTOs in a response. */
    private SectionListResponse loadSnapshot() {
        final List<SectionDto> sections = sectionRepository.findAll().stream()
                .map(sectionConverter::convertToDto)
                .collect(Collectors.toList());
        return new SectionListResponse(sections);
    }
}
package ru.tkachenko.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import ru.tkachenko.data.converter.SectionConverter;
import ru.tkachenko.data.repository.jpa.SectionRepository;
import ru.tkachenko.rest.model.SectionDto;
import ru.tkachenko.rest.model.SectionRequest;
import ru.tkachenko.service.exception.AppError;
import ru.tkachenko.service.exception.AppServiceException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* @author d.tkachenko
*
* Get List Dto's
*/
@Slf4j
@Service
public class SectionService2 {
private final SectionConverter sectionConverter;
private final SectionRepository sectionRepository;
private final FluxProcessor fluxProcessor;
private final FluxSink fluxSink;
private final AtomicLong counter;
public SectionService(final SectionConverter sectionConverter, final SectionRepository sectionRepository) {
this.sectionConverter = sectionConverter;
this.sectionRepository = sectionRepository;
this.fluxProcessor = DirectProcessor.create().serialize();
this.fluxSink = fluxProcessor.sink();
this.counter = new AtomicLong();
}
public SectionDto get(final Long id) {
return sectionRepository.findById(id).map(sectionConverter::convertToDto)
.orElseThrow(() -> new AppServiceException(AppError.SECTION_NOT_FOUND));
}
@Transactional
public List<SectionDto> page() {
final var sections = findAll();
counter.getAndIncrement();
fluxSink.next(sections);
return sections;
}
public Flux sse() {
return fluxProcessor.map(e -> ServerSentEvent.builder(e).build());
}
@Transactional
public void delete(final Long id) {
sectionRepository.deleteById(id);
counter.getAndIncrement();
fluxSink.next(findAll());
}
@Transactional
public SectionDto save(final SectionRequest request) {
final var section = sectionConverter.convertToEntity(request);
final var saved = sectionRepository.save(section);
final var response = sectionConverter.convertToDto(saved);
counter.getAndIncrement();
fluxSink.next(findAll());
return response;
}
private List<SectionDto> findAll() {
return sectionRepository.findAll().stream()
.map(sectionConverter::convertToDto).collect(Collectors.toList());
}
}
package ru.tkachenko.service;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
import ru.tkachenko.data.converter.SectionConverter;
import ru.tkachenko.data.repository.jpa.SectionRepository;
import ru.tkachenko.rest.model.SectionDto;
import ru.tkachenko.rest.model.SectionRequest;
import ru.tkachenko.service.exception.AppError;
import ru.tkachenko.service.exception.AppServiceException;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Section service that publishes one event per change: the saved DTO after a
 * save, or a DTO carrying only the id and a deleted flag after a delete.
 *
 * @author d.tkachenko
 */
@Service
public class SectionService {

    private final SectionConverter sectionConverter;
    private final SectionRepository sectionRepository;
    private final FluxProcessor<SectionDto, SectionDto> processor;
    private final FluxSink<SectionDto> sink;

    /**
     * Creates the service and the processor that fans change events out to listeners.
     *
     * @param sectionConverter  converter between entities and DTOs
     * @param sectionRepository repository the sections are stored in
     */
    public SectionService(final SectionConverter sectionConverter, final SectionRepository sectionRepository) {
        this.sectionConverter = sectionConverter;
        this.sectionRepository = sectionRepository;
        // A UnicastProcessor accepts a single subscriber only, so a second
        // listener (or a reconnecting one) would be rejected. DirectProcessor
        // serves any number of subscribers; events published while nobody is
        // listening are not replayed later. serialize() gates concurrent producers.
        this.processor = DirectProcessor.<SectionDto>create().serialize();
        this.sink = processor.sink();
    }

    /**
     * Loads a single section.
     *
     * @param id section identifier
     * @return the section as DTO
     * @throws AppServiceException with {@link AppError#SECTION_NOT_FOUND} when no section has this id
     */
    public SectionDto get(final Long id) {
        return sectionRepository.findById(id).map(sectionConverter::convertToDto)
                .orElseThrow(() -> new AppServiceException(AppError.SECTION_NOT_FOUND));
    }

    /**
     * Loads all sections.
     *
     * @return all sections as DTOs
     */
    public List<SectionDto> page() {
        return sectionRepository.findAll().stream()
                .map(sectionConverter::convertToDto).collect(Collectors.toList());
    }

    /**
     * Returns the stream of change events, each wrapped in a {@link ServerSentEvent}.
     *
     * @return a flux of server-sent events carrying section DTOs
     */
    public Flux<ServerSentEvent<SectionDto>> listen() {
        return processor
                // DirectProcessor fails a subscriber that has no outstanding demand
                // when an element arrives, so each subscriber buffers for itself.
                .onBackpressureBuffer()
                .map(event -> ServerSentEvent.builder(event).build());
    }

    /**
     * Deletes a section and publishes a deletion event for it.
     *
     * @param id identifier of the section to delete
     */
    @Transactional
    public void delete(final Long id) {
        sectionRepository.deleteById(id);
        sink.next(new SectionDto().setId(id).setDeleted(true));
    }

    /**
     * Saves a section and publishes the saved DTO.
     *
     * @param request data of the section to save
     * @return the saved section as DTO
     */
    @Transactional
    public SectionDto save(final SectionRequest request) {
        final var section = sectionConverter.convertToEntity(request);
        final var saved = sectionRepository.save(section);
        final var response = sectionConverter.convertToDto(saved);
        sink.next(response);
        return response;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment