Skip to content

Instantly share code, notes, and snippets.

@NABEEL-AHMED-JAMIL
Last active November 14, 2018 11:17
Show Gist options
  • Save NABEEL-AHMED-JAMIL/ff3db2604f8fa67000959e31ef96244e to your computer and use it in GitHub Desktop.
Save NABEEL-AHMED-JAMIL/ff3db2604f8fa67000959e31ef96244e to your computer and use it in GitHub Desktop.
Kafka => Auto Data Gen Data To => MongoDb => Jobs Scheduler 2 mint => Fetch New Data Mongdb => Update Lucence => Angular 6 View
package com.ballistic.kafka.broker;
import com.ballistic.kafka.mongdb.DbConnection;
import com.ballistic.kafka.mongdb.DeviceDAO;
import com.ballistic.kafka.mongdb.DeviceManager;
import com.ballistic.kafka.pojo.Device;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.UnsupportedEncodingException;
import java.util.*;
/**
* Note :- Consumer Class
* Name => Nabeel Ahmed
* Email => nabeel.amd93@gmail.com
* */
public class Consumer {
private DbConnection dbConnection = new DbConnection("localhost:27017", "kdb");
private DeviceManager deviceManager = null;
private KafkaConsumer<String, String> consumer = null;
private List<Device> devices = null;
private static volatile Boolean init = false;
public Consumer() {
if(!init){
System.out.println("Consumer :- Constrictor..Init.");
this.setConsumer(new KafkaConsumer<String, String>(this.getProperties()));
this.setDeviceManager(new DeviceManager(new DeviceDAO(dbConnection)));
System.out.println("Consumer :- Constrictor..End.");
init = true;
}
}
public DeviceManager getDeviceManager() { return deviceManager; }
private void setDeviceManager(DeviceManager deviceManager) { this.deviceManager = deviceManager; }
private Properties getProperties() {
System.out.println("Consumer :- Properties-Init.");
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("kafka.topic" , "demo1");
properties.put("compression.type" , "gzip");
properties.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("max.partition.fetch.bytes", "2097152");
properties.put("max.poll.records", "500");
properties.put("group.id", "test-group");
System.out.println("Consumer :- Properties-End.");
return properties;
}
public KafkaConsumer<String, String> getConsumer() { return consumer; }
private void setConsumer(KafkaConsumer<String, String> consumer) { this.consumer = consumer; }
public MsgKafka decodeMsg(String json) throws UnsupportedEncodingException {
long startTime = System.currentTimeMillis();
System.out.println("New Message :- Decode");
Gson gson = new Gson();
MsgKafka msg = gson.fromJson(json, MsgKafka.class);
byte[] decoderData = Base64.getDecoder().decode(msg.getData());
msg.setData(new String(decoderData, "utf-8"));
System.out.println("+----------------------------------------------------------------------------------------+");
System.out.println("Message :-" + msg);
try {
this.devices = new ArrayList<>();
this.devices = gson.fromJson(msg.getData(), new TypeToken<List<Device>>(){}.getType());
}catch (Exception e) {
System.err.println("Error :- Data Parse Exception");
}
System.out.println("+----------------------------------------------------------------------------------------+");
System.out.println("Msg Decode :- " + "Successfully " + (System.currentTimeMillis() - startTime)/1000 + " sec.");
return msg;
}
public void startConsumer() {
try{
List<String> topics = Arrays.asList(this.getProperties().getProperty("kafka.topic"));
this.getConsumer().subscribe(topics);
System.out.println("Subscribed to topic " + topics.toString());
while (true){
long startTime = System.currentTimeMillis();
System.out.println("Consumer :- Start " + startTime);
ConsumerRecords<String, String> records = this.getConsumer().poll(10);
if(!records.isEmpty()) {
for (ConsumerRecord<String, String> record: records) {
System.out.println("+-----------------------------------------------------------------------------+");
System.out.printf("partition = %s, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), decodeMsg(record.value()).getData());
if(devices.size() > 0) {
if(devices.size() > 10 ) {
this.getDeviceManager().save(devices);
} else {
devices.parallelStream().forEach(device -> {
if(device.getParchesDate() == null) { device.setParchesDate(new Date()); }
this.getDeviceManager().save(device);
});
}
}
}
System.out.println("Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec.");
}else {
System.out.println("Wait...ing");
}
}
} catch (UnsupportedEncodingException e) {
System.err.println("Error " + e.getLocalizedMessage());
} finally {
this.getConsumer().close();
System.out.println("Producer :- Close");
}
}
public static void main(String[] args) {
Consumer consumer = new Consumer();
consumer.startConsumer();
}
}
package com.ballistic.kafka.mongdb;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.ReadPreference;
import org.mongodb.morphia.Datastore;
import org.mongodb.morphia.Morphia;
public class DbConnection {
private Morphia morphia = null;
private Datastore datastore = null;
private MongoClient mongoClient = null;
private String dbNames = null;
private String hosts = null;
public DbConnection(String hosts, String dbNames) {
this.hosts = hosts;
this.dbNames = dbNames;
MongoClientOptions options = MongoClientOptions.builder()
.connectionsPerHost(2).socketKeepAlive(true).connectTimeout(60000)
.socketTimeout(60000).maxWaitTime(1000).maxConnectionIdleTime(60000)
.readPreference(ReadPreference.secondaryPreferred())
.build();
// use :- hard-code for now
if(this.hosts.equals("localhost:27017") && this.dbNames != null) {
this.mongoClient = new MongoClient(hosts,options);
this.morphia = new Morphia();
this.datastore = this.morphia.createDatastore(this.mongoClient, dbNames);
this.datastore.ensureIndexes();
System.out.println("DB:- Database connection Successful");
}else {
System.err.println("DB:- Database connection Fail");
}
}
public Datastore getDb() { return this.datastore; }
public static void main(String args[]) {
DbConnection dbConnection = new DbConnection("localhost:27017", "kdb");
}
}
package com.ballistic.kafka.pojo;
import com.google.gson.Gson;
import org.mongodb.morphia.annotations.*;
import java.util.Date;
@Entity("device")
@Indexes(@Index(fields = { @Field("devName") }, options = @IndexOptions(unique = true)))
public class Device {
@Id
private String id;
@Property("devName")
private String devName;
@Property("price")
private String price;
@Property("parchesDate")
private Date parchesDate;
@Property("saleDate")
private Date saleDate;
@Property("status")
private Status status;
@Embedded
private Photo photo;
public Device() { }
public Device(String id, String devName, String price) {
this.id = id;
this.devName = devName;
this.price = price;
}
public Device(String devName, String price, Date parchesDate, Date saleDate, Status status) {
this.devName = devName;
this.price = price;
this.parchesDate = parchesDate;
this.saleDate = saleDate;
this.status = status;
}
public Device(String id, String devName, String price, Date parchesDate, Status status, Photo photo) {
this.id = id;
this.devName = devName;
this.price = price;
this.parchesDate = parchesDate;
this.status = status;
this.photo = photo;
}
public Device(String id, String devName, String price, Date parchesDate, Date saleDate, Status status) {
this.id = id;
this.devName = devName;
this.price = price;
this.parchesDate = parchesDate;
this.saleDate = saleDate;
this.status = status;
}
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getDevName() { return devName; }
public void setDevName(String devName) { this.devName = devName; }
public String getPrice() { return price; }
public void setPrice(String price) { this.price = price; }
public Date getParchesDate() { return parchesDate; }
public void setParchesDate(Date parchesDate) { this.parchesDate = parchesDate; }
public Date getSaleDate() { return saleDate; }
public void setSaleDate(Date saleDate) { this.saleDate = saleDate; }
public Status getStatus() { return status; }
public void setStatus(Status status) { this.status = status; }
public Photo getPhoto() { return photo; }
public void setPhoto(Photo photo) { this.photo = photo; }
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.ballistic.kafka.mongdb;
import com.ballistic.kafka.pojo.Device;
import com.ballistic.kafka.pojo.Photo;
import com.ballistic.kafka.pojo.Status;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.mongodb.MongoException;
import org.mongodb.morphia.mapping.Mapper;
import org.mongodb.morphia.query.Query;
import org.mongodb.morphia.query.UpdateOperations;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
public class DeviceDAO implements IDeviceDAO {
private static final String DEV_NAME = "devName";
private static final String PRICE = "price";
private static final String STATUS = "status";
private static final String SALEDATE = "saleDate";
private static final String PHOTO = "photo";
private static final String PHOTONAME = "photoName";
private static final String PHOTOURL = "photoUrl";
private static final String SIZE = "size";
private static final String TYPE = "type";
private static final String CREATED = "created";
private static final String UPDATE = "update";
private DbConnection dbConnection = null;
/**
* Note:- Duration Change from 15 to 1
* Reason:- Due to Consumer update the Db on each second so we lime the cache evict time '1' mint
* Issue:- data-retrieve => solve
* */
private static Cache<Status, List<Device>> fetchAllByDeviceStatusCache =
CacheBuilder.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.MINUTES).build();
private static Cache<String, List<Device>> fetchDevicesByPricesOrSizeCache =
CacheBuilder.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.MINUTES).build();
public DeviceDAO(DbConnection dbConnection) { this.dbConnection = dbConnection; }
// Test-Done
@Override
public void save(List<Device> devices) {
long startTime = System.currentTimeMillis();
this.dbConnection.getDb().save(devices);
System.out.println("Db Store Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec.");
}
// Test-Done
@Override
public void save(Device device) {
long startTime = System.currentTimeMillis();
this.dbConnection.getDb().save(device);
System.out.println("Db Store Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec.");
}
// Test-Done
@Override
public void update(List<Device> devices) {
long startTime = System.currentTimeMillis();
if(!devices.equals(null) && devices.size() > 0) {
devices.parallelStream().forEach(device -> { update(device); });
}else {
System.err.println("Device's have Null Object So Fetch Process Fail");
}
System.out.println("Db Update Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec.");
}
// Test-Done
@Override
public void update(Device device) {
long startTime = System.currentTimeMillis();
if(!device.equals(null)) {
if(device.getId() != null) {
try {
final Query<Device> findQuery = this.dbConnection.getDb().createQuery(Device.class).field(Mapper.ID_KEY).equal(device.getId()).disableValidation();
UpdateOperations<Device> updateOps = this.dbConnection.getDb().createUpdateOperations(Device.class);
if(device.getDevName() != null) { updateOps.set(DEV_NAME, device.getDevName()); }
try {
if(device.getPrice() != null && Double.valueOf(device.getPrice()) > 0) {
updateOps.set(PRICE, device.getPrice());
}
}catch (NumberFormatException e) { System.err.println("Error " + e.getLocalizedMessage()); }
if(device.getStatus() != null) { updateOps.set(STATUS, device.getStatus()); }
if(device.getSaleDate() != null &&
(device.getParchesDate() != null && device.getParchesDate().before(device.getSaleDate()))) {
updateOps.set(SALEDATE, device.getStatus().getValue());
}
if(device.getPhoto() != null) {
Photo photo = device.getPhoto();
if (photo.getPhotoName() != null) { updateOps.set(PHOTO+"."+PHOTONAME, photo.getPhotoName()); }
if (photo.getPhotoUrl() != null && photo.getPhotoName() != null) { updateOps.set(PHOTO+"."+PHOTOURL, photo.getPhotoUrl()); }
if (photo.getSize() != null) { updateOps.set(PHOTO+"."+SIZE, photo.getSize()); }
if (photo.getType() != null) { updateOps.set(PHOTO+"."+TYPE, photo.getType()); }
if (photo.getCreated() != null) { updateOps.set(PHOTO+"."+CREATED, photo.getCreated()); }
if ((photo.getSize() != null && photo.getCreated() != null) && (photo.getUpdate().after(photo.getCreated()))) {
updateOps.set(PHOTO+"."+UPDATE, photo.getUpdate());
}
}
this.dbConnection.getDb().update(findQuery, updateOps, true);
}catch (MongoException e) {
System.err.println("Error " + e.getLocalizedMessage());
} catch (Exception e) {
System.err.println("Error " + e.getLocalizedMessage());
}
} else {
System.err.println("Zero Document fetch..");
}
}else {
System.err.println("Device have Null Object So Fetch Process Fail");
}
System.out.println("Db Update Process Time :- " + (System.currentTimeMillis() - startTime)/ 1000 + " sec.");
}
// Test-Done
@Override
public Device findById(String id) {
Device device = new Device();
if(id != null) { device = this.dbConnection.getDb().createQuery(Device.class).field(Mapper.ID_KEY).equal(id).get(); }
return device;
}
// Test-Done
@Deprecated
@Override
public List<Device> getAllDevices() {
List<Device> devices = Lists.newArrayList();
try {
devices = this.dbConnection.getDb().find(Device.class).asList();
return devices;
}catch (OutOfMemoryError e) {
System.out.println("Error " + e.getLocalizedMessage());
return devices;
}
}
// Test-Done
@Override
public List<Device> fetchDevicesByPricesOrSize(String field,String value,Integer option) {
System.out.println("Fetch By " + field + " value " + value + " option " + option);
List<Device> devices = Lists.newArrayList();
if((field != null && value != null) && option != null) {
if(field.equalsIgnoreCase(SIZE) || field.equalsIgnoreCase(PRICE)) {
if(field.equalsIgnoreCase(SIZE)) { field = PHOTO+"."+field; }
try {
DbConnection dbConnection = this.dbConnection;
String finalField = field;
devices = fetchDevicesByPricesOrSizeCache.get(value, new Callable<List<Device>>() {
@Override
public List<Device> call() throws Exception {
List<Device> devices = Lists.newArrayList();
System.out.println("Fetch From Db");
if(option == 0) { // equal
devices = dbConnection.getDb().createQuery(Device.class).field(finalField).equal(value).asList();
}else if (option == 1) { // greater then equal
devices = dbConnection.getDb().createQuery(Device.class).field(finalField).greaterThanOrEq(value).asList();
} else if(option == -1) { // less then equal
devices = dbConnection.getDb().createQuery(Device.class).field(finalField).lessThanOrEq(value).asList();
}
return devices;
}
});
}catch (NumberFormatException e) {
System.err.println("Error " + e.getLocalizedMessage());
}catch (Exception e) {
System.out.println("Error " + e.getLocalizedMessage());
}
} else {
System.err.println("Search field wrong it's should be 'size' either 'price'");
}
}
return devices;
}
@Override
public List<Device> fetchAllByDeviceStatus(Status status) {
System.out.println("Fetch By " + status);
List<Device> devices = Lists.newArrayList();
try {
DbConnection dbConnection = this.dbConnection;
devices = fetchAllByDeviceStatusCache.get(status, new Callable<List<Device>>() {
@Override
public List<Device> call() throws Exception {
List<Device> devices = Lists.newArrayList();
devices = dbConnection.getDb().createQuery(Device.class).field(STATUS).equal(status).asList();
System.out.println("Fetch From Db");
return devices;
}
});
}catch (Exception e) {
System.err.println("Error " + e.getLocalizedMessage());
}
return devices;
}
// under-process => not yet complete
@Override
public List<Device> fetchAllByDates(Date from, Date to) {
List<Device> devices = Lists.newArrayList();
if(to != null && from != null) {
if(from.before(to)) {
try {
DbConnection dbConnection = this.dbConnection;
// work like as id's and second time if new id's
String finalField = to + "_" + from;
devices = fetchDevicesByPricesOrSizeCache.get(finalField, new Callable<List<Device>>() {
@Override
public List<Device> call() throws Exception {
List<Device> devices = Lists.newArrayList();
devices = dbConnection.getDb().createQuery(Device.class).field("parchesDate").
greaterThanOrEq(new Date()).field("parchesDate").lessThanOrEq(new Date()).asList();
return devices;
}
});
} catch (Exception e) {
System.out.println("Error " + e.getLocalizedMessage());
}
}
}
return devices;
}
// Test-Case Runner
public static void main(String args[]) throws InterruptedException, ExecutionException, ParseException {
DbConnection dbConnection = new DbConnection("localhost:27017", "kdb");
IDeviceDAO ideviceDAO = new DeviceDAO(dbConnection);
/**
* Date from = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS").parse("2018-11-13 20:40:00.000");
* Date to = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS").parse("2018-11-13 20:40:01.000");
* System.out.println("Size " + ideviceDAO.fetchAllByDates(from, to).size());
* while (true) {
* long startTime = System.currentTimeMillis();
* System.out.println("+--------------------------------+");
* System.out.println("Size "+ideviceDAO.fetchDevicesByPricesOrSize("price", String.valueOf(Producer.getRandomNumberInRange(1, 100)), 0).size());
* // System.out.println("Size " + ideviceDAO.getAllDevices().size());
* System.out.println("Total Fetch Time " + (System.currentTimeMillis()-startTime)/1000 + " sec.");
* }
*
* while (true) {
* long startTime = System.currentTimeMillis();
* ExecutorService executorService = Executors.newSingleThreadExecutor();
* Callable<List<Device>> callable = () -> {
* System.out.println("Entered Callable");
* Thread.sleep(200);
* return ideviceDAO.fetchAllByDeviceStatus(Status.SECOND);
* };
* Future<List<Device>> future = executorService.submit(callable);
* System.out.println(future.get().size());
* //future.get().parallelStream().forEach(System.out::println);
* System.out.println("Total Fetch Time " + (System.currentTimeMillis()-startTime)/1000 + " sec.");
* executorService.shutdown();
* }
* ideviceDAO.save(new Device("baffbeaa", "85",new Date(), new Date(), Status.NEW));
* ideviceDAO.update(new Device("c9ea8b92-43df-43a7-8de8-4a6b717b9f71", "nabeel.amd93", "kjlj"));
* ideviceDAO.save(null);
* */
}
}
package com.ballistic.kafka.mongdb;
import com.ballistic.kafka.pojo.Device;
import com.ballistic.kafka.pojo.Status;
import java.util.Date;
import java.util.List;
public class DeviceManager {
private IDeviceDAO deviceDAO = null;
public DeviceManager(IDeviceDAO deviceDAO) {
System.out.println("DeviceManger :- Constrictor..Init.");
this.deviceDAO = deviceDAO;
System.out.println("DeviceManger :- Constrictor..End.");
}
// here we check and send all data
public void save(List<Device> devices) { this.deviceDAO.save(devices); }
public void save(Device device) { this.deviceDAO.save(device); }
public void update(List<Device> devices) { this.deviceDAO.update(devices); }
public void update(Device device) { this.deviceDAO.update(device); }
public Device findById(String id) { return this.deviceDAO.findById(id); }
public List<Device> getAllDevices() { return this.deviceDAO.getAllDevices(); }
public List<Device> fetchDevicesByPricesOrSize(String field,String value, Integer option) {
return this.deviceDAO.fetchDevicesByPricesOrSize(field,value,option);
}
public List<Device> fetchAllByDeviceStatus(Status status) { return this.deviceDAO.fetchAllByDeviceStatus(status); }
public List<Device> fetchAllByDates(Date to, Date from) { return this.deviceDAO.fetchAllByDates(to, from); }
}
package com.ballistic.kafka.mongdb;
import com.ballistic.kafka.pojo.Device;
import com.ballistic.kafka.pojo.Status;
import java.util.Date;
import java.util.List;
public interface IDeviceDAO {
// Save
public void save(List<Device> devices);
public void save(Device device);
// Update
public void update(List<Device> devices);
public void update(Device device);
// Fetch
public Device findById(String id);
public List<Device> getAllDevices();
public List<Device> fetchDevicesByPricesOrSize(String field ,String value, Integer option);
public List<Device> fetchAllByDeviceStatus(Status status);
public List<Device> fetchAllByDates(Date to, Date from);
}
package com.ballistic.kafka.broker;
import com.google.gson.Gson;
public class MsgKafka {
private String id;
private String timestamp;
private String data;
public MsgKafka() { }
public MsgKafka(String id, String timestamp, String data) {
this.id = id;
this.timestamp = timestamp;
this.data = data;
}
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getTimestamp() { return timestamp; }
public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
public String getData() { return data; }
public void setData(String data) { this.data = data; }
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.ballistic.kafka.pojo;
import com.google.gson.Gson;
import org.mongodb.morphia.annotations.*;
import java.util.Date;
@Entity("photo")
public class Photo {
@Property("photoName")
private String photoName;
@Property("photoUrl")
private String photoUrl;
@Property("size")
private String size;
@Property("type")
private String type;
@Property("created")
private Date created;
@Property("update")
private Date update;
public Photo() { }
public Photo(String photoName, String photoUrl, String size, String type, Date created, Date update) {
this.photoName = photoName;
this.photoUrl = photoUrl;
this.size = size;
this.type = type;
this.created = created;
this.update = update;
}
public String getPhotoName() { return photoName; }
public void setPhotoName(String photoName) { this.photoName = photoName; }
public String getPhotoUrl() { return photoUrl; }
public void setPhotoUrl(String photoUrl) { this.photoUrl = photoUrl; }
public String getSize() { return size; }
public void setSize(String size) { this.size = size; }
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public Date getCreated() { return created; }
public void setCreated(Date created) { this.created = created; }
public Date getUpdate() { return update; }
public void setUpdate(Date update) { this.update = update; }
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.ballistic.kafka.broker;
import com.ballistic.kafka.pojo.Device;
import com.ballistic.kafka.pojo.Photo;
import com.ballistic.kafka.pojo.Status;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.sql.Timestamp;
import java.util.*;
import java.util.List;
/**
* Note:- A Simple Single :- kafka Producer with kafka Consumer
* */
public class Producer {
private KafkaProducer<String, String> producer = null;
private static volatile Boolean init = false;
private ImageProcess imageProcess;
private List<BufferedImage> bufferedImages;
public Producer() {
if(!init){
System.out.println("Producer :- Constrictor..Init.");
this.setProducer(new KafkaProducer<String, String>(getProperties()));
this.imageProcess = new ImageProcess(6,6);
this.bufferedImages = this.imageProcess.getSpriteImages();
System.out.println("Producer :- Constrictor..End.");
init = true;
}
}
private Properties getProperties() {
System.out.println("Producer :- Properties-Init.");
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "0");
properties.put("retries", "1");
properties.put("batch.size", "20971520");
properties.put("linger.ms", "33");
properties.put("max.request.size", "2097152");
properties.put("compression.type", "gzip");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("kafka.topic", "demo1");
System.out.println("Producer :- Properties-End.");
return properties;
}
public KafkaProducer<String, String> getProducer() { return producer; }
private void setProducer(KafkaProducer<String, String> producer) { this.producer = producer; }
public static Integer getRandomNumberInRange(Integer min, Integer max) { return new Random().ints(min, (max + 1)).findFirst().getAsInt(); }
private String getMessage(String id) throws UnsupportedEncodingException {
try {
System.out.println("New Message :- Creating");
MsgKafka msgkafka = new MsgKafka();
msgkafka.setId(id);
msgkafka.setTimestamp(String.valueOf(new Timestamp(System.currentTimeMillis())));
msgkafka.setData(Base64.getEncoder().encodeToString(String.valueOf(this.chunkDevices()).getBytes("utf-8")));
String message = new Gson().toJson(msgkafka);
System.out.println("+----------------------------------------------------------------------------------------+");
System.out.println("Message :-" + message);
return message;
} catch (UnsupportedEncodingException e) {
System.err.println("Error :- " + e.getLocalizedMessage());
throw e;
}
}
private List<Device> chunkDevices() throws UnsupportedEncodingException {
long startTime = System.currentTimeMillis();
System.out.println("Chuck :- Start " + startTime);
List<Device> devices = new ArrayList<>();
int randeomChunk = this.getRandomNumberInRange(1, 1000);
System.out.println("Random Chuck :- " + randeomChunk);
for (Integer i = 0; i <= randeomChunk; i++) {
System.out.println("+-----------------------------------------------------------------------------------");
Device device = null;
if(i%2 == 0) {
device = new Device(
String.valueOf(UUID.randomUUID()), String.valueOf(UUID.randomUUID()).replaceAll("[^a-z]", ""),
String.valueOf(this.getRandomNumberInRange(1, 100)), new Date(), null,
this.getRandomNumberInRange(1, 2) == 1 ? Status.NEW : Status.SECOND);
;
} else {
/**
* Note:- Photo Object
* Bindery code will deserialize and print the part of image
* */
String photoName = String.valueOf(UUID.randomUUID()).replaceAll("[^a-z]", "");
String photoUrl = "https://ballistic.com/?photoName/" + photoName + "/photo="+
Base64.getEncoder().encodeToString
(String.valueOf(this.bufferedImages.get(this.getRandomNumberInRange(1, this.bufferedImages.size()-1))).getBytes("utf-8"));
device = new Device(String.valueOf(UUID.randomUUID()), String.valueOf(UUID.randomUUID()).replaceAll("[^a-z]", ""),
String.valueOf(this.getRandomNumberInRange(1, 100)), new Date(), this.getRandomNumberInRange(1, 2) == 1 ? Status.NEW : Status.SECOND,
new Photo(photoName, photoUrl, String.valueOf(this.getRandomNumberInRange(100, 1000)), this.type(), new Date(), new Date()));
}
devices.add(device);
System.out.println("Device :- " + device);
}
System.out.println("Chuck :- End :- " + (System.currentTimeMillis() - startTime)/1000 + "sec.");
return devices;
}
private String type() {
Integer type = getRandomNumberInRange(1, 3);
if(type == 1) {
return "image/png";
} else if (type == 2) {
return "image/jpg";
} else {
return "image/jpeg";
}
}
public void startProducer() {
try{
String topic = null;
while (true) {
long startTime = System.currentTimeMillis();
System.out.println("Producer :- Start " + startTime);
if(topic == null) { topic = String.valueOf(this.getProperties().get("kafka.topic")); }
String id = "device-" + UUID.randomUUID();
String message = getMessage(id);
System.out.println("Send :- " + message);
this.getProducer().send(new ProducerRecord(topic, id , message));
System.out.println("+----------------------------------------------------------------------------------");
long stopTime = System.currentTimeMillis();
long elapsedTime = (stopTime - startTime)/1000;
System.out.println("Send :- " + "Successfully " + elapsedTime + " sec.");
}
} catch (UnsupportedEncodingException e) {
System.err.println("Error :- " + e.getLocalizedMessage());
} finally {
this.getProducer().close();
System.out.println("Producer :- Close");
}
}
public final static class ImageProcess {
private static volatile Boolean init = false;
private final String imagePath = "split.png";
private BufferedImage spriteSheet;
private Integer rows, cols = 0;
// mean that in 1 image have 36 sprite
private final Integer totalSprite = 36;
private Integer width, height;
private List<BufferedImage> spriteImages = new ArrayList<>(totalSprite);
private File file;
public ImageProcess() { }
public ImageProcess(Integer rows, Integer cols) {
if(!init) {
if(!imagePath.contains("..")) {
this.file = new File(ClassLoader.getSystemResource(imagePath).getPath());
if(this.file != null && this.file.isFile()) {
try {
System.out.println("Read File :- "+ this.file);
this.setRows(rows);
this.setCols(cols);
this.setSpriteSheet(ImageIO.read(this.file));
} catch (IOException e) {
System.err.println("Error :- " + e.getLocalizedMessage());
}
init = true;
System.out.println("File Found and Assign");
}else {
System.err.println("Read File :- "+ imagePath + " Not Found.");
return;
}
}else {
System.err.println("Error Wrong Path");
}
}
}
public int getRows() { return rows; }
private void setRows(Integer rows) { this.rows = rows; }
public int getCols() { return cols; }
private void setCols(Integer cols) { this.cols = cols; }
public Integer getWidth() { return width; }
public void setWidth(Integer width) { this.width = width; }
public Integer getHeight() { return height; }
public void setHeight(Integer height) { this.height = height; }
public BufferedImage getSpriteSheet() { return spriteSheet; }
public void setSpriteSheet(BufferedImage spriteSheet) { this.spriteSheet = spriteSheet; }
// Note:- Sprite the image's and send the Boolean => if
private List<BufferedImage> getSpriteImages() {
if(this.getWidth() == null) { this.setWidth(this.getSpriteSheet().getWidth()/this.getCols()); }
if(this.getHeight() == null) { this.setHeight(this.getSpriteSheet().getHeight()/this.getRows()); }
int x = 0;
int y = 0;
for (int index = 0; index < totalSprite; index++) {
this.spriteImages.add(this.getSpriteSheet().getSubimage(x, y, width, height));
x += width;
if (x >= width * cols) {
x = 0;
y += height;
}
}
return this.spriteImages;
}
}
public static void main(String[] args){
Producer producer = new Producer();
producer.startProducer();
}
}
package com.ballistic.kafka.pojo;
import com.google.gson.Gson;
public enum Status {
NEW("New"), SECOND("Second");
private String value;
Status(String value) { this.value = value; }
public String getValue( ) { return value; }
@Override
public String toString() { return new Gson().toJson(this); }
}
@NABEEL-AHMED-JAMIL
Copy link
Author

Under----Working

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment