Skip to content

Instantly share code, notes, and snippets.

View artembilan's full-sized avatar
🇺🇦

Artem Bilan artembilan

🇺🇦
View GitHub Profile
@Test
public void testTransactionalBRPOP() {
RedisConnectionFactory connectionFactory = new JedisConnectionFactory();
RedisTemplate<String, String> redisTemplate = new StringRedisTemplate(connectionFactory);
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.afterPropertiesSet();
TransactionTemplate transactionTemplate = new TransactionTemplate(new PseudoTransactionManager());
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@artembilan
artembilan / TailSourceConfiguration.java
Created September 13, 2016 13:09
Reloadable Tail Spring Cloud Stream (SCSt) Source App
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
private static class SubscribableChannelPublisherAdapter
implements Publisher<Message<?>>, Subscriber<Message<?>>, Subscription {
private final DirectProcessor<Message<?>> delegate = DirectProcessor.create();
private final MessageHandler subscriberAdapter = this.delegate.connectEmitter()::accept;
private final SubscribableChannel channel;
private Subscriber<? super Message<?>> actualSubscriber;
@Test
public void testParallelPromises() throws InterruptedException {
Environment environment = new Environment();
final AtomicBoolean first = new AtomicBoolean(true);
for (int i = 0; i < 10; i++) {
final Promise<String> promise = Promises.task(environment, () -> {
if (!first.getAndSet(false)) {
try {
Thread.sleep(1000);
}
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
@Test
@RedisAvailable
public void testFtpWithRedisMetadataStore() throws Exception {
RedisTemplate<String, ?> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(this.getConnectionFactoryForTest());
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
template.delete("persistentAcceptOnceFileListFilterRedisTests");
final FTPFile ftpFile = new FTPFile();
/**
 * Declares the integration flow that splits each incoming {@code String} payload
 * and passes the split results to the Kafka outbound handler.
 *
 * <p>The splitter function returns {@code FastList.newWithNValues(100, ...)} whose
 * supplier always yields the original payload (presumably a list of 100 copies of
 * the payload - confirm against the FastList API). The handler is obtained from
 * {@link #kafkaMessageHandler(String)} each time the returned lambda is applied.
 *
 * @param serverAddress forwarded unchanged to {@code kafkaMessageHandler}
 * @return the flow definition, expressed as a lambda
 */
@Bean
public IntegrationFlow sendToKafkaFlow(String serverAddress) {
    return flow -> {
        flow.<String>split(payload -> FastList.newWithNValues(100, () -> payload), null)
                .handle(kafkaMessageHandler(serverAddress));
    };
}
private KafkaProducerMessageHandlerSpec kafkaMessageHandler(String serverAddress) {
return Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000"))
.messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))