Last active
November 14, 2018 11:17
-
-
Save NABEEL-AHMED-JAMIL/ff3db2604f8fa67000959e31ef96244e to your computer and use it in GitHub Desktop.
Kafka => Auto Data Gen Data To => MongoDb => Jobs Scheduler 2 mint => Fetch New Data Mongdb => Update Lucence => Angular 6 View
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.broker; | |
import com.ballistic.kafka.mongdb.DbConnection; | |
import com.ballistic.kafka.mongdb.DeviceDAO; | |
import com.ballistic.kafka.mongdb.DeviceManager; | |
import com.ballistic.kafka.pojo.Device; | |
import com.google.gson.Gson; | |
import com.google.gson.reflect.TypeToken; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import java.io.UnsupportedEncodingException; | |
import java.util.*; | |
/** | |
* Note :- Consumer Class | |
* Name => Nabeel Ahmed | |
* Email => nabeel.amd93@gmail.com | |
* */ | |
public class Consumer { | |
private DbConnection dbConnection = new DbConnection("localhost:27017", "kdb"); | |
private DeviceManager deviceManager = null; | |
private KafkaConsumer<String, String> consumer = null; | |
private List<Device> devices = null; | |
private static volatile Boolean init = false; | |
public Consumer() { | |
if(!init){ | |
System.out.println("Consumer :- Constrictor..Init."); | |
this.setConsumer(new KafkaConsumer<String, String>(this.getProperties())); | |
this.setDeviceManager(new DeviceManager(new DeviceDAO(dbConnection))); | |
System.out.println("Consumer :- Constrictor..End."); | |
init = true; | |
} | |
} | |
public DeviceManager getDeviceManager() { return deviceManager; } | |
private void setDeviceManager(DeviceManager deviceManager) { this.deviceManager = deviceManager; } | |
private Properties getProperties() { | |
System.out.println("Consumer :- Properties-Init."); | |
Properties properties = new Properties(); | |
properties.put("bootstrap.servers", "localhost:9092"); | |
properties.put("kafka.topic" , "demo1"); | |
properties.put("compression.type" , "gzip"); | |
properties.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); | |
properties.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); | |
properties.put("max.partition.fetch.bytes", "2097152"); | |
properties.put("max.poll.records", "500"); | |
properties.put("group.id", "test-group"); | |
System.out.println("Consumer :- Properties-End."); | |
return properties; | |
} | |
public KafkaConsumer<String, String> getConsumer() { return consumer; } | |
private void setConsumer(KafkaConsumer<String, String> consumer) { this.consumer = consumer; } | |
public MsgKafka decodeMsg(String json) throws UnsupportedEncodingException { | |
long startTime = System.currentTimeMillis(); | |
System.out.println("New Message :- Decode"); | |
Gson gson = new Gson(); | |
MsgKafka msg = gson.fromJson(json, MsgKafka.class); | |
byte[] decoderData = Base64.getDecoder().decode(msg.getData()); | |
msg.setData(new String(decoderData, "utf-8")); | |
System.out.println("+----------------------------------------------------------------------------------------+"); | |
System.out.println("Message :-" + msg); | |
try { | |
this.devices = new ArrayList<>(); | |
this.devices = gson.fromJson(msg.getData(), new TypeToken<List<Device>>(){}.getType()); | |
}catch (Exception e) { | |
System.err.println("Error :- Data Parse Exception"); | |
} | |
System.out.println("+----------------------------------------------------------------------------------------+"); | |
System.out.println("Msg Decode :- " + "Successfully " + (System.currentTimeMillis() - startTime)/1000 + " sec."); | |
return msg; | |
} | |
public void startConsumer() { | |
try{ | |
List<String> topics = Arrays.asList(this.getProperties().getProperty("kafka.topic")); | |
this.getConsumer().subscribe(topics); | |
System.out.println("Subscribed to topic " + topics.toString()); | |
while (true){ | |
long startTime = System.currentTimeMillis(); | |
System.out.println("Consumer :- Start " + startTime); | |
ConsumerRecords<String, String> records = this.getConsumer().poll(10); | |
if(!records.isEmpty()) { | |
for (ConsumerRecord<String, String> record: records) { | |
System.out.println("+-----------------------------------------------------------------------------+"); | |
System.out.printf("partition = %s, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), decodeMsg(record.value()).getData()); | |
if(devices.size() > 0) { | |
if(devices.size() > 10 ) { | |
this.getDeviceManager().save(devices); | |
} else { | |
devices.parallelStream().forEach(device -> { | |
if(device.getParchesDate() == null) { device.setParchesDate(new Date()); } | |
this.getDeviceManager().save(device); | |
}); | |
} | |
} | |
} | |
System.out.println("Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec."); | |
}else { | |
System.out.println("Wait...ing"); | |
} | |
} | |
} catch (UnsupportedEncodingException e) { | |
System.err.println("Error " + e.getLocalizedMessage()); | |
} finally { | |
this.getConsumer().close(); | |
System.out.println("Producer :- Close"); | |
} | |
} | |
public static void main(String[] args) { | |
Consumer consumer = new Consumer(); | |
consumer.startConsumer(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.mongdb; | |
import com.mongodb.MongoClient; | |
import com.mongodb.MongoClientOptions; | |
import com.mongodb.ReadPreference; | |
import org.mongodb.morphia.Datastore; | |
import org.mongodb.morphia.Morphia; | |
public class DbConnection { | |
private Morphia morphia = null; | |
private Datastore datastore = null; | |
private MongoClient mongoClient = null; | |
private String dbNames = null; | |
private String hosts = null; | |
public DbConnection(String hosts, String dbNames) { | |
this.hosts = hosts; | |
this.dbNames = dbNames; | |
MongoClientOptions options = MongoClientOptions.builder() | |
.connectionsPerHost(2).socketKeepAlive(true).connectTimeout(60000) | |
.socketTimeout(60000).maxWaitTime(1000).maxConnectionIdleTime(60000) | |
.readPreference(ReadPreference.secondaryPreferred()) | |
.build(); | |
// use :- hard-code for now | |
if(this.hosts.equals("localhost:27017") && this.dbNames != null) { | |
this.mongoClient = new MongoClient(hosts,options); | |
this.morphia = new Morphia(); | |
this.datastore = this.morphia.createDatastore(this.mongoClient, dbNames); | |
this.datastore.ensureIndexes(); | |
System.out.println("DB:- Database connection Successful"); | |
}else { | |
System.err.println("DB:- Database connection Fail"); | |
} | |
} | |
public Datastore getDb() { return this.datastore; } | |
public static void main(String args[]) { | |
DbConnection dbConnection = new DbConnection("localhost:27017", "kdb"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.pojo; | |
import com.google.gson.Gson; | |
import org.mongodb.morphia.annotations.*; | |
import java.util.Date; | |
@Entity("device") | |
@Indexes(@Index(fields = { @Field("devName") }, options = @IndexOptions(unique = true))) | |
public class Device { | |
@Id | |
private String id; | |
@Property("devName") | |
private String devName; | |
@Property("price") | |
private String price; | |
@Property("parchesDate") | |
private Date parchesDate; | |
@Property("saleDate") | |
private Date saleDate; | |
@Property("status") | |
private Status status; | |
@Embedded | |
private Photo photo; | |
public Device() { } | |
public Device(String id, String devName, String price) { | |
this.id = id; | |
this.devName = devName; | |
this.price = price; | |
} | |
public Device(String devName, String price, Date parchesDate, Date saleDate, Status status) { | |
this.devName = devName; | |
this.price = price; | |
this.parchesDate = parchesDate; | |
this.saleDate = saleDate; | |
this.status = status; | |
} | |
public Device(String id, String devName, String price, Date parchesDate, Status status, Photo photo) { | |
this.id = id; | |
this.devName = devName; | |
this.price = price; | |
this.parchesDate = parchesDate; | |
this.status = status; | |
this.photo = photo; | |
} | |
public Device(String id, String devName, String price, Date parchesDate, Date saleDate, Status status) { | |
this.id = id; | |
this.devName = devName; | |
this.price = price; | |
this.parchesDate = parchesDate; | |
this.saleDate = saleDate; | |
this.status = status; | |
} | |
public String getId() { return id; } | |
public void setId(String id) { this.id = id; } | |
public String getDevName() { return devName; } | |
public void setDevName(String devName) { this.devName = devName; } | |
public String getPrice() { return price; } | |
public void setPrice(String price) { this.price = price; } | |
public Date getParchesDate() { return parchesDate; } | |
public void setParchesDate(Date parchesDate) { this.parchesDate = parchesDate; } | |
public Date getSaleDate() { return saleDate; } | |
public void setSaleDate(Date saleDate) { this.saleDate = saleDate; } | |
public Status getStatus() { return status; } | |
public void setStatus(Status status) { this.status = status; } | |
public Photo getPhoto() { return photo; } | |
public void setPhoto(Photo photo) { this.photo = photo; } | |
@Override | |
public String toString() { | |
return new Gson().toJson(this); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.mongdb; | |
import com.ballistic.kafka.pojo.Device; | |
import com.ballistic.kafka.pojo.Photo; | |
import com.ballistic.kafka.pojo.Status; | |
import com.google.common.cache.Cache; | |
import com.google.common.cache.CacheBuilder; | |
import com.google.common.collect.Lists; | |
import com.mongodb.MongoException; | |
import org.mongodb.morphia.mapping.Mapper; | |
import org.mongodb.morphia.query.Query; | |
import org.mongodb.morphia.query.UpdateOperations; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.util.List; | |
import java.util.concurrent.*; | |
public class DeviceDAO implements IDeviceDAO { | |
private static final String DEV_NAME = "devName"; | |
private static final String PRICE = "price"; | |
private static final String STATUS = "status"; | |
private static final String SALEDATE = "saleDate"; | |
private static final String PHOTO = "photo"; | |
private static final String PHOTONAME = "photoName"; | |
private static final String PHOTOURL = "photoUrl"; | |
private static final String SIZE = "size"; | |
private static final String TYPE = "type"; | |
private static final String CREATED = "created"; | |
private static final String UPDATE = "update"; | |
private DbConnection dbConnection = null; | |
/** | |
* Note:- Duration Change from 15 to 1 | |
* Reason:- Due to Consumer update the Db on each second so we lime the cache evict time '1' mint | |
* Issue:- data-retrieve => solve | |
* */ | |
private static Cache<Status, List<Device>> fetchAllByDeviceStatusCache = | |
CacheBuilder.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.MINUTES).build(); | |
private static Cache<String, List<Device>> fetchDevicesByPricesOrSizeCache = | |
CacheBuilder.newBuilder().maximumSize(5000).expireAfterWrite(1, TimeUnit.MINUTES).build(); | |
public DeviceDAO(DbConnection dbConnection) { this.dbConnection = dbConnection; } | |
// Test-Done | |
@Override | |
public void save(List<Device> devices) { | |
long startTime = System.currentTimeMillis(); | |
this.dbConnection.getDb().save(devices); | |
System.out.println("Db Store Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec."); | |
} | |
// Test-Done | |
@Override | |
public void save(Device device) { | |
long startTime = System.currentTimeMillis(); | |
this.dbConnection.getDb().save(device); | |
System.out.println("Db Store Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec."); | |
} | |
// Test-Done | |
@Override | |
public void update(List<Device> devices) { | |
long startTime = System.currentTimeMillis(); | |
if(!devices.equals(null) && devices.size() > 0) { | |
devices.parallelStream().forEach(device -> { update(device); }); | |
}else { | |
System.err.println("Device's have Null Object So Fetch Process Fail"); | |
} | |
System.out.println("Db Update Process Time :- " + (System.currentTimeMillis() - startTime)/1000 + " sec."); | |
} | |
// Test-Done | |
@Override | |
public void update(Device device) { | |
long startTime = System.currentTimeMillis(); | |
if(!device.equals(null)) { | |
if(device.getId() != null) { | |
try { | |
final Query<Device> findQuery = this.dbConnection.getDb().createQuery(Device.class).field(Mapper.ID_KEY).equal(device.getId()).disableValidation(); | |
UpdateOperations<Device> updateOps = this.dbConnection.getDb().createUpdateOperations(Device.class); | |
if(device.getDevName() != null) { updateOps.set(DEV_NAME, device.getDevName()); } | |
try { | |
if(device.getPrice() != null && Double.valueOf(device.getPrice()) > 0) { | |
updateOps.set(PRICE, device.getPrice()); | |
} | |
}catch (NumberFormatException e) { System.err.println("Error " + e.getLocalizedMessage()); } | |
if(device.getStatus() != null) { updateOps.set(STATUS, device.getStatus()); } | |
if(device.getSaleDate() != null && | |
(device.getParchesDate() != null && device.getParchesDate().before(device.getSaleDate()))) { | |
updateOps.set(SALEDATE, device.getStatus().getValue()); | |
} | |
if(device.getPhoto() != null) { | |
Photo photo = device.getPhoto(); | |
if (photo.getPhotoName() != null) { updateOps.set(PHOTO+"."+PHOTONAME, photo.getPhotoName()); } | |
if (photo.getPhotoUrl() != null && photo.getPhotoName() != null) { updateOps.set(PHOTO+"."+PHOTOURL, photo.getPhotoUrl()); } | |
if (photo.getSize() != null) { updateOps.set(PHOTO+"."+SIZE, photo.getSize()); } | |
if (photo.getType() != null) { updateOps.set(PHOTO+"."+TYPE, photo.getType()); } | |
if (photo.getCreated() != null) { updateOps.set(PHOTO+"."+CREATED, photo.getCreated()); } | |
if ((photo.getSize() != null && photo.getCreated() != null) && (photo.getUpdate().after(photo.getCreated()))) { | |
updateOps.set(PHOTO+"."+UPDATE, photo.getUpdate()); | |
} | |
} | |
this.dbConnection.getDb().update(findQuery, updateOps, true); | |
}catch (MongoException e) { | |
System.err.println("Error " + e.getLocalizedMessage()); | |
} catch (Exception e) { | |
System.err.println("Error " + e.getLocalizedMessage()); | |
} | |
} else { | |
System.err.println("Zero Document fetch.."); | |
} | |
}else { | |
System.err.println("Device have Null Object So Fetch Process Fail"); | |
} | |
System.out.println("Db Update Process Time :- " + (System.currentTimeMillis() - startTime)/ 1000 + " sec."); | |
} | |
// Test-Done | |
@Override | |
public Device findById(String id) { | |
Device device = new Device(); | |
if(id != null) { device = this.dbConnection.getDb().createQuery(Device.class).field(Mapper.ID_KEY).equal(id).get(); } | |
return device; | |
} | |
// Test-Done | |
@Deprecated | |
@Override | |
public List<Device> getAllDevices() { | |
List<Device> devices = Lists.newArrayList(); | |
try { | |
devices = this.dbConnection.getDb().find(Device.class).asList(); | |
return devices; | |
}catch (OutOfMemoryError e) { | |
System.out.println("Error " + e.getLocalizedMessage()); | |
return devices; | |
} | |
} | |
// Test-Done | |
@Override | |
public List<Device> fetchDevicesByPricesOrSize(String field,String value,Integer option) { | |
System.out.println("Fetch By " + field + " value " + value + " option " + option); | |
List<Device> devices = Lists.newArrayList(); | |
if((field != null && value != null) && option != null) { | |
if(field.equalsIgnoreCase(SIZE) || field.equalsIgnoreCase(PRICE)) { | |
if(field.equalsIgnoreCase(SIZE)) { field = PHOTO+"."+field; } | |
try { | |
DbConnection dbConnection = this.dbConnection; | |
String finalField = field; | |
devices = fetchDevicesByPricesOrSizeCache.get(value, new Callable<List<Device>>() { | |
@Override | |
public List<Device> call() throws Exception { | |
List<Device> devices = Lists.newArrayList(); | |
System.out.println("Fetch From Db"); | |
if(option == 0) { // equal | |
devices = dbConnection.getDb().createQuery(Device.class).field(finalField).equal(value).asList(); | |
}else if (option == 1) { // greater then equal | |
devices = dbConnection.getDb().createQuery(Device.class).field(finalField).greaterThanOrEq(value).asList(); | |
} else if(option == -1) { // less then equal | |
devices = dbConnection.getDb().createQuery(Device.class).field(finalField).lessThanOrEq(value).asList(); | |
} | |
return devices; | |
} | |
}); | |
}catch (NumberFormatException e) { | |
System.err.println("Error " + e.getLocalizedMessage()); | |
}catch (Exception e) { | |
System.out.println("Error " + e.getLocalizedMessage()); | |
} | |
} else { | |
System.err.println("Search field wrong it's should be 'size' either 'price'"); | |
} | |
} | |
return devices; | |
} | |
@Override | |
public List<Device> fetchAllByDeviceStatus(Status status) { | |
System.out.println("Fetch By " + status); | |
List<Device> devices = Lists.newArrayList(); | |
try { | |
DbConnection dbConnection = this.dbConnection; | |
devices = fetchAllByDeviceStatusCache.get(status, new Callable<List<Device>>() { | |
@Override | |
public List<Device> call() throws Exception { | |
List<Device> devices = Lists.newArrayList(); | |
devices = dbConnection.getDb().createQuery(Device.class).field(STATUS).equal(status).asList(); | |
System.out.println("Fetch From Db"); | |
return devices; | |
} | |
}); | |
}catch (Exception e) { | |
System.err.println("Error " + e.getLocalizedMessage()); | |
} | |
return devices; | |
} | |
// under-process => not yet complete | |
@Override | |
public List<Device> fetchAllByDates(Date from, Date to) { | |
List<Device> devices = Lists.newArrayList(); | |
if(to != null && from != null) { | |
if(from.before(to)) { | |
try { | |
DbConnection dbConnection = this.dbConnection; | |
// work like as id's and second time if new id's | |
String finalField = to + "_" + from; | |
devices = fetchDevicesByPricesOrSizeCache.get(finalField, new Callable<List<Device>>() { | |
@Override | |
public List<Device> call() throws Exception { | |
List<Device> devices = Lists.newArrayList(); | |
devices = dbConnection.getDb().createQuery(Device.class).field("parchesDate"). | |
greaterThanOrEq(new Date()).field("parchesDate").lessThanOrEq(new Date()).asList(); | |
return devices; | |
} | |
}); | |
} catch (Exception e) { | |
System.out.println("Error " + e.getLocalizedMessage()); | |
} | |
} | |
} | |
return devices; | |
} | |
// Test-Case Runner | |
public static void main(String args[]) throws InterruptedException, ExecutionException, ParseException { | |
DbConnection dbConnection = new DbConnection("localhost:27017", "kdb"); | |
IDeviceDAO ideviceDAO = new DeviceDAO(dbConnection); | |
/** | |
* Date from = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS").parse("2018-11-13 20:40:00.000"); | |
* Date to = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS").parse("2018-11-13 20:40:01.000"); | |
* System.out.println("Size " + ideviceDAO.fetchAllByDates(from, to).size()); | |
* while (true) { | |
* long startTime = System.currentTimeMillis(); | |
* System.out.println("+--------------------------------+"); | |
* System.out.println("Size "+ideviceDAO.fetchDevicesByPricesOrSize("price", String.valueOf(Producer.getRandomNumberInRange(1, 100)), 0).size()); | |
* // System.out.println("Size " + ideviceDAO.getAllDevices().size()); | |
* System.out.println("Total Fetch Time " + (System.currentTimeMillis()-startTime)/1000 + " sec."); | |
* } | |
* | |
* while (true) { | |
* long startTime = System.currentTimeMillis(); | |
* ExecutorService executorService = Executors.newSingleThreadExecutor(); | |
* Callable<List<Device>> callable = () -> { | |
* System.out.println("Entered Callable"); | |
* Thread.sleep(200); | |
* return ideviceDAO.fetchAllByDeviceStatus(Status.SECOND); | |
* }; | |
* Future<List<Device>> future = executorService.submit(callable); | |
* System.out.println(future.get().size()); | |
* //future.get().parallelStream().forEach(System.out::println); | |
* System.out.println("Total Fetch Time " + (System.currentTimeMillis()-startTime)/1000 + " sec."); | |
* executorService.shutdown(); | |
* } | |
* ideviceDAO.save(new Device("baffbeaa", "85",new Date(), new Date(), Status.NEW)); | |
* ideviceDAO.update(new Device("c9ea8b92-43df-43a7-8de8-4a6b717b9f71", "nabeel.amd93", "kjlj")); | |
* ideviceDAO.save(null); | |
* */ | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.mongdb; | |
import com.ballistic.kafka.pojo.Device; | |
import com.ballistic.kafka.pojo.Status; | |
import java.util.Date; | |
import java.util.List; | |
public class DeviceManager { | |
private IDeviceDAO deviceDAO = null; | |
public DeviceManager(IDeviceDAO deviceDAO) { | |
System.out.println("DeviceManger :- Constrictor..Init."); | |
this.deviceDAO = deviceDAO; | |
System.out.println("DeviceManger :- Constrictor..End."); | |
} | |
// here we check and send all data | |
public void save(List<Device> devices) { this.deviceDAO.save(devices); } | |
public void save(Device device) { this.deviceDAO.save(device); } | |
public void update(List<Device> devices) { this.deviceDAO.update(devices); } | |
public void update(Device device) { this.deviceDAO.update(device); } | |
public Device findById(String id) { return this.deviceDAO.findById(id); } | |
public List<Device> getAllDevices() { return this.deviceDAO.getAllDevices(); } | |
public List<Device> fetchDevicesByPricesOrSize(String field,String value, Integer option) { | |
return this.deviceDAO.fetchDevicesByPricesOrSize(field,value,option); | |
} | |
public List<Device> fetchAllByDeviceStatus(Status status) { return this.deviceDAO.fetchAllByDeviceStatus(status); } | |
public List<Device> fetchAllByDates(Date to, Date from) { return this.deviceDAO.fetchAllByDates(to, from); } | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.mongdb; | |
import com.ballistic.kafka.pojo.Device; | |
import com.ballistic.kafka.pojo.Status; | |
import java.util.Date; | |
import java.util.List; | |
public interface IDeviceDAO { | |
// Save | |
public void save(List<Device> devices); | |
public void save(Device device); | |
// Update | |
public void update(List<Device> devices); | |
public void update(Device device); | |
// Fetch | |
public Device findById(String id); | |
public List<Device> getAllDevices(); | |
public List<Device> fetchDevicesByPricesOrSize(String field ,String value, Integer option); | |
public List<Device> fetchAllByDeviceStatus(Status status); | |
public List<Device> fetchAllByDates(Date to, Date from); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.broker; | |
import com.google.gson.Gson; | |
public class MsgKafka { | |
private String id; | |
private String timestamp; | |
private String data; | |
public MsgKafka() { } | |
public MsgKafka(String id, String timestamp, String data) { | |
this.id = id; | |
this.timestamp = timestamp; | |
this.data = data; | |
} | |
public String getId() { return id; } | |
public void setId(String id) { this.id = id; } | |
public String getTimestamp() { return timestamp; } | |
public void setTimestamp(String timestamp) { this.timestamp = timestamp; } | |
public String getData() { return data; } | |
public void setData(String data) { this.data = data; } | |
@Override | |
public String toString() { | |
return new Gson().toJson(this); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.pojo; | |
import com.google.gson.Gson; | |
import org.mongodb.morphia.annotations.*; | |
import java.util.Date; | |
@Entity("photo") | |
public class Photo { | |
@Property("photoName") | |
private String photoName; | |
@Property("photoUrl") | |
private String photoUrl; | |
@Property("size") | |
private String size; | |
@Property("type") | |
private String type; | |
@Property("created") | |
private Date created; | |
@Property("update") | |
private Date update; | |
public Photo() { } | |
public Photo(String photoName, String photoUrl, String size, String type, Date created, Date update) { | |
this.photoName = photoName; | |
this.photoUrl = photoUrl; | |
this.size = size; | |
this.type = type; | |
this.created = created; | |
this.update = update; | |
} | |
public String getPhotoName() { return photoName; } | |
public void setPhotoName(String photoName) { this.photoName = photoName; } | |
public String getPhotoUrl() { return photoUrl; } | |
public void setPhotoUrl(String photoUrl) { this.photoUrl = photoUrl; } | |
public String getSize() { return size; } | |
public void setSize(String size) { this.size = size; } | |
public String getType() { return type; } | |
public void setType(String type) { this.type = type; } | |
public Date getCreated() { return created; } | |
public void setCreated(Date created) { this.created = created; } | |
public Date getUpdate() { return update; } | |
public void setUpdate(Date update) { this.update = update; } | |
@Override | |
public String toString() { | |
return new Gson().toJson(this); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.broker; | |
import com.ballistic.kafka.pojo.Device; | |
import com.ballistic.kafka.pojo.Photo; | |
import com.ballistic.kafka.pojo.Status; | |
import com.google.gson.Gson; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import javax.imageio.ImageIO; | |
import java.awt.image.BufferedImage; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.UnsupportedEncodingException; | |
import java.sql.Timestamp; | |
import java.util.*; | |
import java.util.List; | |
/** | |
* Note:- A Simple Single :- kafka Producer with kafka Consumer | |
* */ | |
public class Producer { | |
private KafkaProducer<String, String> producer = null; | |
private static volatile Boolean init = false; | |
private ImageProcess imageProcess; | |
private List<BufferedImage> bufferedImages; | |
public Producer() { | |
if(!init){ | |
System.out.println("Producer :- Constrictor..Init."); | |
this.setProducer(new KafkaProducer<String, String>(getProperties())); | |
this.imageProcess = new ImageProcess(6,6); | |
this.bufferedImages = this.imageProcess.getSpriteImages(); | |
System.out.println("Producer :- Constrictor..End."); | |
init = true; | |
} | |
} | |
private Properties getProperties() { | |
System.out.println("Producer :- Properties-Init."); | |
Properties properties = new Properties(); | |
properties.put("bootstrap.servers", "localhost:9092"); | |
properties.put("acks", "0"); | |
properties.put("retries", "1"); | |
properties.put("batch.size", "20971520"); | |
properties.put("linger.ms", "33"); | |
properties.put("max.request.size", "2097152"); | |
properties.put("compression.type", "gzip"); | |
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
properties.put("kafka.topic", "demo1"); | |
System.out.println("Producer :- Properties-End."); | |
return properties; | |
} | |
public KafkaProducer<String, String> getProducer() { return producer; } | |
private void setProducer(KafkaProducer<String, String> producer) { this.producer = producer; } | |
public static Integer getRandomNumberInRange(Integer min, Integer max) { return new Random().ints(min, (max + 1)).findFirst().getAsInt(); } | |
private String getMessage(String id) throws UnsupportedEncodingException { | |
try { | |
System.out.println("New Message :- Creating"); | |
MsgKafka msgkafka = new MsgKafka(); | |
msgkafka.setId(id); | |
msgkafka.setTimestamp(String.valueOf(new Timestamp(System.currentTimeMillis()))); | |
msgkafka.setData(Base64.getEncoder().encodeToString(String.valueOf(this.chunkDevices()).getBytes("utf-8"))); | |
String message = new Gson().toJson(msgkafka); | |
System.out.println("+----------------------------------------------------------------------------------------+"); | |
System.out.println("Message :-" + message); | |
return message; | |
} catch (UnsupportedEncodingException e) { | |
System.err.println("Error :- " + e.getLocalizedMessage()); | |
throw e; | |
} | |
} | |
private List<Device> chunkDevices() throws UnsupportedEncodingException { | |
long startTime = System.currentTimeMillis(); | |
System.out.println("Chuck :- Start " + startTime); | |
List<Device> devices = new ArrayList<>(); | |
int randeomChunk = this.getRandomNumberInRange(1, 1000); | |
System.out.println("Random Chuck :- " + randeomChunk); | |
for (Integer i = 0; i <= randeomChunk; i++) { | |
System.out.println("+-----------------------------------------------------------------------------------"); | |
Device device = null; | |
if(i%2 == 0) { | |
device = new Device( | |
String.valueOf(UUID.randomUUID()), String.valueOf(UUID.randomUUID()).replaceAll("[^a-z]", ""), | |
String.valueOf(this.getRandomNumberInRange(1, 100)), new Date(), null, | |
this.getRandomNumberInRange(1, 2) == 1 ? Status.NEW : Status.SECOND); | |
; | |
} else { | |
/** | |
* Note:- Photo Object | |
* Bindery code will deserialize and print the part of image | |
* */ | |
String photoName = String.valueOf(UUID.randomUUID()).replaceAll("[^a-z]", ""); | |
String photoUrl = "https://ballistic.com/?photoName/" + photoName + "/photo="+ | |
Base64.getEncoder().encodeToString | |
(String.valueOf(this.bufferedImages.get(this.getRandomNumberInRange(1, this.bufferedImages.size()-1))).getBytes("utf-8")); | |
device = new Device(String.valueOf(UUID.randomUUID()), String.valueOf(UUID.randomUUID()).replaceAll("[^a-z]", ""), | |
String.valueOf(this.getRandomNumberInRange(1, 100)), new Date(), this.getRandomNumberInRange(1, 2) == 1 ? Status.NEW : Status.SECOND, | |
new Photo(photoName, photoUrl, String.valueOf(this.getRandomNumberInRange(100, 1000)), this.type(), new Date(), new Date())); | |
} | |
devices.add(device); | |
System.out.println("Device :- " + device); | |
} | |
System.out.println("Chuck :- End :- " + (System.currentTimeMillis() - startTime)/1000 + "sec."); | |
return devices; | |
} | |
private String type() { | |
Integer type = getRandomNumberInRange(1, 3); | |
if(type == 1) { | |
return "image/png"; | |
} else if (type == 2) { | |
return "image/jpg"; | |
} else { | |
return "image/jpeg"; | |
} | |
} | |
public void startProducer() { | |
try{ | |
String topic = null; | |
while (true) { | |
long startTime = System.currentTimeMillis(); | |
System.out.println("Producer :- Start " + startTime); | |
if(topic == null) { topic = String.valueOf(this.getProperties().get("kafka.topic")); } | |
String id = "device-" + UUID.randomUUID(); | |
String message = getMessage(id); | |
System.out.println("Send :- " + message); | |
this.getProducer().send(new ProducerRecord(topic, id , message)); | |
System.out.println("+----------------------------------------------------------------------------------"); | |
long stopTime = System.currentTimeMillis(); | |
long elapsedTime = (stopTime - startTime)/1000; | |
System.out.println("Send :- " + "Successfully " + elapsedTime + " sec."); | |
} | |
} catch (UnsupportedEncodingException e) { | |
System.err.println("Error :- " + e.getLocalizedMessage()); | |
} finally { | |
this.getProducer().close(); | |
System.out.println("Producer :- Close"); | |
} | |
} | |
public final static class ImageProcess { | |
private static volatile Boolean init = false; | |
private final String imagePath = "split.png"; | |
private BufferedImage spriteSheet; | |
private Integer rows, cols = 0; | |
// mean that in 1 image have 36 sprite | |
private final Integer totalSprite = 36; | |
private Integer width, height; | |
private List<BufferedImage> spriteImages = new ArrayList<>(totalSprite); | |
private File file; | |
public ImageProcess() { } | |
public ImageProcess(Integer rows, Integer cols) { | |
if(!init) { | |
if(!imagePath.contains("..")) { | |
this.file = new File(ClassLoader.getSystemResource(imagePath).getPath()); | |
if(this.file != null && this.file.isFile()) { | |
try { | |
System.out.println("Read File :- "+ this.file); | |
this.setRows(rows); | |
this.setCols(cols); | |
this.setSpriteSheet(ImageIO.read(this.file)); | |
} catch (IOException e) { | |
System.err.println("Error :- " + e.getLocalizedMessage()); | |
} | |
init = true; | |
System.out.println("File Found and Assign"); | |
}else { | |
System.err.println("Read File :- "+ imagePath + " Not Found."); | |
return; | |
} | |
}else { | |
System.err.println("Error Wrong Path"); | |
} | |
} | |
} | |
public int getRows() { return rows; } | |
private void setRows(Integer rows) { this.rows = rows; } | |
public int getCols() { return cols; } | |
private void setCols(Integer cols) { this.cols = cols; } | |
public Integer getWidth() { return width; } | |
public void setWidth(Integer width) { this.width = width; } | |
public Integer getHeight() { return height; } | |
public void setHeight(Integer height) { this.height = height; } | |
public BufferedImage getSpriteSheet() { return spriteSheet; } | |
public void setSpriteSheet(BufferedImage spriteSheet) { this.spriteSheet = spriteSheet; } | |
// Note:- Sprite the image's and send the Boolean => if | |
private List<BufferedImage> getSpriteImages() { | |
if(this.getWidth() == null) { this.setWidth(this.getSpriteSheet().getWidth()/this.getCols()); } | |
if(this.getHeight() == null) { this.setHeight(this.getSpriteSheet().getHeight()/this.getRows()); } | |
int x = 0; | |
int y = 0; | |
for (int index = 0; index < totalSprite; index++) { | |
this.spriteImages.add(this.getSpriteSheet().getSubimage(x, y, width, height)); | |
x += width; | |
if (x >= width * cols) { | |
x = 0; | |
y += height; | |
} | |
} | |
return this.spriteImages; | |
} | |
} | |
public static void main(String[] args){ | |
Producer producer = new Producer(); | |
producer.startProducer(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic.kafka.pojo; | |
import com.google.gson.Gson; | |
public enum Status { | |
NEW("New"), SECOND("Second"); | |
private String value; | |
Status(String value) { this.value = value; } | |
public String getValue( ) { return value; } | |
@Override | |
public String toString() { return new Gson().toJson(this); } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Under----Working