Skip to content

Instantly share code, notes, and snippets.

View bbalakriz's full-sized avatar

Bala bbalakriz

View GitHub Profile
@bbalakriz
bbalakriz / WeightedHashTag.java
Created May 8, 2020 06:06
WeightedHashTag.java
@Path("hashtags")
public class WeightedHashTag {
@Inject
@Channel("hash-stream")
Publisher<String> weightedTag;
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@bbalakriz
bbalakriz / tweet-agg-kafka.properties
Last active May 8, 2020 05:26
tweet-agg-kafka.properties
# quarkus kafka-streams config
quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-id=tweet-agg
quarkus.kafka-streams.topics=twitter-feeds
# kafka pass-through options
kafka-streams.cache.max.bytes.buffering=0
kafka-streams.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
kafka-streams.default.value.serde=com.redhat.hackfest.serializers.FeedSeder
kafka-streams.default.timestamp.extractor=com.redhat.hackfest.utils.FeedTimestampExtractor
@bbalakriz
bbalakriz / FeedTimestampExtractor.java
Created May 8, 2020 05:19
FeedTimestampExtractor.java
public class FeedTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
final SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy");
String eventTime = ((Feed) record.value()).getTime();
try {
return sdf.parse(eventTime).getTime();
} catch (ParseException e) {
LOG.error("Error in timestamp parsing. Check the incoming feed timestamp format");
@bbalakriz
bbalakriz / FeedReader.java
Created May 8, 2020 04:42
FeedReader.java
@ApplicationScoped
public class FeedReader {
private static final String TWITTER_FEEDS_TOPIC = "twitter-feeds";
private static final String FEEDS_AGGREGATED_TOPIC = "agg-feeds";
@Produces
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
builder.<String, Feed>stream(TWITTER_FEEDS_TOPIC,
public class FeedSerializer implements Serializer<Feed> {
private static final Charset CHARSET = Charset.forName("UTF-8");
static private Gson gson = new Gson();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Feed feed) {
@bbalakriz
bbalakriz / tweet-poller-kafka.properties
Created May 8, 2020 04:26
tweet-poller-kafka.properties
# kafka twitter-feeds config
mp.messaging.outgoing.twitter-feeds.bootstrap.servers=localhost:9092
mp.messaging.outgoing.twitter-feeds.connector=smallrye-kafka
mp.messaging.outgoing.twitter-feeds.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.twitter-feeds.value.serializer=com.redhat.hackfest.serializers.FeedSerializer
@bbalakriz
bbalakriz / CamelResource.java
Created May 8, 2020 04:06
CamelResource.java
@Path("/twitter")
@ApplicationScoped
public class CamelResource {
final Logger LOG = Logger.getLogger(CamelResource.class);
String twitterSearchUri = "twitter-search://#" + "$$TOPIC" + "?count=10&lang=en-us";
@Inject
@Channel("twitter-feeds")
Emitter<Feed> emitter;
@bbalakriz
bbalakriz / tweet-poller-camel.properties
Last active May 6, 2020 10:43
tweet-poller-app-properties
# camel twitter search config
camel.component.twitter-search.consumerKey=xxxxxYyfr2o9mxaay4
camel.component.twitter-search.consumerSecret=xxxxxxxxxxfOdhNqGE8EGVC6pKmDpfEJe1V6
camel.component.twitter-search.accessToken=xxxxxxx-44ZwHxl6zTWz0xO4Jh4qJKdyZMw
camel.component.twitter-search.accessTokenSecret=xxxxxxSW1ojFqOW7nT4W7frzNladZUUWpd2a
@bbalakriz
bbalakriz / Transaction.java
Created February 25, 2020 07:44
Amended Transaction.java
package com.bmi.calc;
/**
* This class was automatically generated by the data modeler tool.
*/
import java.io.Serializable;
import java.time.LocalDateTime;
import org.kie.api.definition.type.Label;
package com.bala.rhpam.test;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentHTTPGetStatusLogger {
private static final int MYTHREADS = 700;