@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamConfig {
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    @Value("${kafka.groupId}")
    private String groupId;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value( "${kafka.bootstrapAddress}" )
    private String bootstrapAddress;
    @Value( "${kafka.groupId}" )
    private String groupId;
    @Bean
@SpringBootApplication
@EnableScheduling
public class Application {
    @Autowired
    private WeatherLogService weatherLogService;
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
@Service
public class WeatherLogServiceImpl implements WeatherLogService {
    private final WeatherRepository repository;
    @Autowired
    WeatherLogServiceImpl(final WeatherRepository repository) {
        this.repository = repository;
    }
    @Override
    public void saveLogs(WeatherLog log) throws InterruptedException {
CREATE TABLE weather.log
(
    city character varying(255) COLLATE pg_catalog."default" NOT NULL,
    latitude double precision NOT NULL,
    longitude double precision NOT NULL,
    temp double precision NOT NULL,
    logdate date NOT NULL,
    id integer NOT NULL
) PARTITION BY RANGE (logdate);
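A table declared with `PARTITION BY RANGE` holds no rows itself. PostgreSQL rejects an insert unless a partition covering the row's `logdate` already exists. The snippet does not show how partitions are created, so the following is only a sketch, assuming one partition per calendar month. The class name `WeatherLogPartitions`, the method `createMonthlyPartition`, and the `weather.log_YYYY_MM` naming scheme are hypothetical and not part of the original code.

```java
import java.time.YearMonth;

public class WeatherLogPartitions {

    // Builds the DDL for one monthly partition of weather.log.
    // Assumption: partitions are named weather.log_YYYY_MM (hypothetical scheme).
    public static String createMonthlyPartition(YearMonth month) {
        String name = String.format("weather.log_%04d_%02d",
                month.getYear(), month.getMonthValue());
        // Range partition bounds are inclusive at FROM and exclusive at TO,
        // so the upper bound is the first day of the following month.
        return String.format(
                "CREATE TABLE IF NOT EXISTS %s PARTITION OF weather.log "
                        + "FOR VALUES FROM ('%s') TO ('%s')",
                name, month.atDay(1), month.plusMonths(1).atDay(1));
    }

    public static void main(String[] args) {
        System.out.println(createMonthlyPartition(YearMonth.of(2020, 12)));
    }
}
```

The generated statement could be executed once per month, for example from a scheduled job, before any rows for that month arrive.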
@SpringBootApplication
@EnableScheduling
public class Application {
    @Autowired
    private WeatherAgentService weatherAgentService;
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
@Service
public class WeatherAgentServiceImpl implements WeatherAgentService {
    private final CityRepository cityRepository;
    private final ExecutorService executor;
    private final KafkaTemplate<String, WeatherInfo> kafkaTemplate;
    private final Random rand = new Random();
    private final int min = -20;
    private final int max = 50;
    @Autowired
    public WeatherAgentServiceImpl(CityRepository cityRepository, KafkaTemplate<String, WeatherInfo> kafkaTemplate) {
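The service above declares `rand`, `min`, and `max`, but the code that uses them is cut off. One plausible use is drawing a simulated temperature in the closed range [-20, 50]. The sketch below shows that calculation in isolation. The class `RandomTemperature` and the method `nextTemperature` are hypothetical names, and returning an `int` is an assumption based on the field types.

```java
import java.util.Random;

public class RandomTemperature {
    // Same bounds as the min and max fields in the service snippet.
    private static final int MIN = -20;
    private static final int MAX = 50;

    // Returns a value in [MIN, MAX], both ends included.
    public static int nextTemperature(Random rand) {
        // nextInt(bound) yields 0..bound-1, so the bound is the range size plus one
        // to make MAX itself reachable.
        return rand.nextInt(MAX - MIN + 1) + MIN;
    }

    public static void main(String[] args) {
        Random rand = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(nextTemperature(rand));
        }
    }
}
```

Omitting the `+ 1` is a common off-by-one error: the result would then never reach 50.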
@Configuration
public class KafkaProducerConfig {
    @Value( "${kafka.bootstrapAddress}" )
    private String bootstrapAddress;
    @Bean
    public ProducerFactory<String, WeatherInfo> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
@Configuration
public class KafkaTopicConfig {
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);