Last active: April 8, 2022
# Java Reference Notes
A class is a template for creating a particular kind of object.
class Entry {
    /* fields */
    String name;
    String address;
    String phone;
    /* constructor -- called implicitly if none is defined */
    Entry() {
        super(); // implicit call to the parent class constructor
    }
    /* accessors -- getters/setters */
}
static keyword -- the variable has the same value for all instances of the class;
changing it changes the value of the field for every object.
It can be accessed directly using the class name or an object reference.
A class can also contain another (nested) class.
final keyword -
    variable -> constant
    method -> can't be overridden
    class -> can't be extended
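A minimal sketch of both keywords; `Counter` is a made-up class for illustration, not from the notes above:

```java
// Hypothetical Counter class illustrating static (shared) vs final (fixed) fields
class Counter {
    static int instances = 0; // one copy shared by every Counter object
    final int id;             // assigned once in the constructor, can't change afterwards

    Counter() {
        instances++;          // updating it updates the value seen by all objects
        id = instances;
    }
}

public class StaticFinalDemo {
    public static void main(String[] args) {
        Counter a = new Counter();
        Counter b = new Counter();
        // static field accessed via the class name -- no object needed
        System.out.println(Counter.instances); // 2
        System.out.println(a.id + " " + b.id); // 1 2
    }
}
```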
# Inheritance -->
childclass extends parentclass {
}
When a childclass object is created, the childclass constructor runs --
every constructor has an implicit super() call,
so the child's constructor calls its parent's constructor first.
super represents the parent class.
Override
A childclass can declare the same method as its parent class, and the child's version takes precedence.
We should use @Override so the compiler can catch mistakes at compile time: e.g. the parent has show()
but the child accidentally writes show1(). With @Override on child.show1 the compiler knows we intended
to override, and since the parent has no show1 it reports an error.
Encapsulation ---> keep all fields private and expose them through getters and setters.
# Abstract Class --> An abstract class can have method implementations. Marking a class abstract means no one can create a direct object of it.
To create an object or use it, we need to inherit the abstract class using extends.
A method can be abstract if we don't want its implementation inside the abstract class.
The advantage is a guaranteed pattern: anyone using a child of the abstract class must implement those methods.
We can create a reference of an abstract class type.
ex - Number is an abstract class; Integer is a class that extends Number.
So we can do this --> Number n = new Integer(3), but new Number() is not allowed.
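A short sketch of the same idea with made-up names (`Vehicle`/`Car` are hypothetical, like the Number/Integer pair above):

```java
// Hypothetical abstract class: one implemented method plus one abstract one
abstract class Vehicle {
    abstract String sound();            // no body here -- every child MUST implement it

    String describe() {                 // ordinary method with an implementation
        return "goes " + sound();
    }
}

class Car extends Vehicle {
    String sound() { return "vroom"; }  // forced by the abstract method
}

public class AbstractDemo {
    public static void main(String[] args) {
        // Vehicle v = new Vehicle();   // compile error: Vehicle is abstract
        Vehicle v = new Car();          // reference of abstract type, concrete object
        System.out.println(v.describe()); // goes vroom
    }
}
```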
### To inherit a class use the extends keyword; to inherit an interface use the implements keyword
# Interface -->
An interface is similar to an abstract class, but methods have no definition, only a declaration (default method bodies are allowed since Java 8 / 1.8).
All methods are public and abstract by default.
class Test implements Interface1, Interface2 --> multiple inheritance is supported via interfaces (a class can extend only one class).
Prefer interfaces.
Abstract classes and interfaces help achieve generalisation patterns, or add restrictions -- e.g. you must implement the methods of an interface.
Interface types -
    Single - one method --> also called a functional interface (Java 8 / 1.8), e.g. the target of a lambda
    Normal - many methods
    Marker - no methods, e.g. the Serializable interface
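The three kinds sketched in code (the interface names here are made up; `java.io.Serializable` is the real marker example):

```java
// Single abstract method -> functional interface, usable as a lambda target
@FunctionalInterface
interface Greeter {
    String greet(String name);
}

// Marker -> no methods at all, just tags the type (like java.io.Serializable)
interface Audited {}

public class InterfaceKindsDemo {
    public static void main(String[] args) {
        Greeter g = name -> "Hi " + name; // the lambda supplies the one abstract method
        System.out.println(g.greet("Sam")); // Hi Sam
    }
}
```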
Anonymous class --
Sometimes, just to change the behaviour of one method without changing the class, we inherit the class and override the behaviour in a subclass.
But creating a whole subclass just for that one change is wasteful. We can achieve the same thing with an anonymous class.
ex
A obj = new A() {
    // override the method needed
    public void show() {
        // do some other things
    }
};
obj.show();
// helpful for a one-time behaviour change
An interface implemented anonymously can be handy and saves writing a separate class.
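A concrete one-time-use case with the real `java.util.Comparator` interface:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class AnonymousDemo {
    public static void main(String[] args) {
        List<String> names = new ArrayList<>(Arrays.asList("b", "a", "c"));

        // Anonymous implementation of Comparator -- no separate class file,
        // handy for a one-time behaviour like this reverse sort
        names.sort(new Comparator<String>() {
            public int compare(String s1, String s2) {
                return s2.compareTo(s1); // reverse alphabetical order
            }
        });

        System.out.println(names); // [c, b, a]
    }
}
```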
# Diff
1) An abstract class can have abstract and non-abstract methods. An interface can have only abstract methods (since Java 8, default and static methods too).
2) An abstract class doesn't support multiple inheritance. Interfaces support multiple inheritance.
3) An abstract class can have final, non-final, static and non-static variables. An interface has only static final variables.
4) An abstract class can provide an implementation of an interface. An interface can't provide an implementation of an abstract class.
5) The abstract keyword declares an abstract class. The interface keyword declares an interface.
6) An abstract class can extend another Java class and implement multiple Java interfaces. An interface can only extend other Java interfaces.
7) An abstract class is extended using the keyword "extends". An interface is implemented using the keyword "implements".
8) A Java abstract class can have members with any access level (private, protected, etc.). Members of a Java interface are public by default.
9) Example:
public abstract class Shape {
    public abstract void draw();
}
Example:
public interface Drawable {
    void draw();
}
Varargs are supported, but the varargs parameter must come last: method(String b, int... a) // method("b", 1, 2, 3, 4)
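A minimal runnable sketch of that corrected signature (`sum` is a hypothetical method name):

```java
public class VarargsDemo {
    // The varargs parameter must be LAST, so the ordinary parameter moves in front
    static int sum(String label, int... nums) {
        int total = 0;
        for (int n : nums) {   // nums behaves like an int[] inside the method
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum("b", 1, 2, 3, 4)); // 10
        System.out.println(sum("b"));             // 0 -- passing zero varargs is allowed
    }
}
```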
Important data types for me --
List mylist = ...; // values to be stored in a list
Map mymap = new HashMap();
Map<String, String> mymap = new HashMap<>(); // <Key, Value> -- makes it type-safe
Collection values = new ArrayList(); // raw type, can hold anything by default
Collection<Integer> values = new ArrayList<>();
collection.stream() gives us JS-like features: sort, filter, group, etc. -- supports lambdas
Set set = new TreeSet(); // will hold unique elements; with TreeSet they are always sorted
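The points above in one runnable snippet (the data values are made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class CollectionsDemo {
    public static void main(String[] args) {
        // Type-safe map: the compiler rejects non-String keys or values
        Map<String, String> phoneBook = new HashMap<>();
        phoneBook.put("alice", "555-0100");

        // TreeSet: unique elements, always kept in sorted order
        Set<Integer> unique = new TreeSet<>(Arrays.asList(3, 1, 3, 2));
        System.out.println(unique); // [1, 2, 3]

        // stream() gives the JS-like filter/sort pipeline with lambdas
        List<Integer> evens = Arrays.asList(5, 2, 8, 3).stream()
                .filter(n -> n % 2 == 0)
                .sorted()
                .collect(Collectors.toList());
        System.out.println(evens); // [2, 8]
    }
}
```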
# list-loops
List<String> crunchifyList = new ArrayList<String>();
// Another way to define a list (fixed-size view -- we will not use this one):
List<String> crunchifyListNew = Arrays.asList("Facebook", "Paypal", "Google", "Yahoo");
// New Enhanced For loop
System.out.println("\n==============> 2. New Enhanced For loop Example..");
for (String temp : crunchifyList) {
    System.out.println(temp);
}
// Iterator - returns an iterator over the elements in this list in proper sequence
System.out.println("\n==============> 3. Iterator Example...");
Iterator<String> crunchifyIterator = crunchifyList.iterator();
while (crunchifyIterator.hasNext()) {
    System.out.println(crunchifyIterator.next());
}
// ListIterator - traverse a list of elements in either forward or backward order.
// An iterator for lists that allows the programmer to traverse the list in either direction,
// modify the list during iteration, and obtain the iterator's current position in the list.
System.out.println("\n==============> 4. ListIterator Example...");
ListIterator<String> crunchifyListIterator = crunchifyList.listIterator();
while (crunchifyListIterator.hasNext()) {
    System.out.println(crunchifyListIterator.next());
}
// while loop
System.out.println("\n==============> 5. While Loop Example....");
int i = 0;
while (i < crunchifyList.size()) {
    System.out.println(crunchifyList.get(i));
    i++;
}
// Iterable.forEach(): performs the given action for each element of the Iterable
System.out.println("\n==============> 6. Iterable.forEach() Example....");
crunchifyList.forEach((temp) -> {
    System.out.println(temp);
});
// Collection.stream().forEach(): stream() returns a sequential Stream with this collection as its source, then forEach iterates it
System.out.println("\n==============> 7. Stream.forEach() Example....");
crunchifyList.stream().forEach((crunchifyTemp) -> System.out.println(crunchifyTemp));
# ASYNC PROGRAMMING
## Async with CompletableFuture
runAsync takes a Runnable as input parameter and returns CompletableFuture<Void>, which means it does not return any result.
CompletableFuture<Void> run = CompletableFuture.runAsync(() -> System.out.println("hello"));
But supplyAsync takes a Supplier as argument and returns a CompletableFuture<U> with a result value: it takes no input parameters but returns a result as output.
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello");
    return "result";
});
System.out.println(supply.get()); // result
Conclusion: if you want a result to be returned, choose supplyAsync; if you just want to run an async action, choose runAsync.
------------------------
CompletableFuture<Integer> completableFutureResponse = CompletableFuture.supplyAsync(() -> factorial(5));
completableFutureResponse.thenApplyAsync(result -> {
    String threadname = Thread.currentThread().getName();
    System.out.println("async -> in thread " + threadname + " intermediate result is " + result);
    return result * 10;
}).thenAcceptAsync(res -> {
    String threadname = Thread.currentThread().getName();
    System.out.println("async -> in thread " + threadname + " final result is " + res);
    // thenAcceptAsync consumes the value -- nothing to return
});
------------------------
You can also pass the Runnable object in the form of a lambda expression -
// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});
------------------------
// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
future.get(); // blocks until the Future completes and returns its result
------------------------
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);
------------------------
A sequence of transformations on the CompletableFuture -
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the Blog";
});
System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the Blog
------------------------
If you don't want to return anything from your callback function and just want to run some piece of code after the completion of the Future, you can use the thenAccept() and thenRun() methods. These methods are consumers and are often used as the last callback in the chain.
CompletableFuture.thenAccept() takes a Consumer<T> and returns CompletableFuture<Void>. It has access to the result of the CompletableFuture on which it is attached.
// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName());
});
While thenAccept() has access to the result of the CompletableFuture on which it is attached, thenRun() doesn't even have access to the Future's result. It takes a Runnable and returns CompletableFuture<Void> -
// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation
}).thenRun(() -> {
    // Computation Finished.
});
1. CompletableFuture.allOf()
CompletableFuture.allOf is used when you have a list of independent futures that you want to run in parallel, and then do something after all of them are complete.
Let's say you want to download the contents of 100 different web pages of a website. Doing this sequentially would take a lot of time. So you write a function which takes a web page link and returns a CompletableFuture, i.e. it downloads the web page's content asynchronously -
CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
    });
}
Now, when all the web pages are downloaded, you want to count the number of web pages that contain the keyword 'CompletableFuture'. Let's use CompletableFuture.allOf() to achieve this -
List<String> webPageLinks = Arrays.asList(...) // A list of 100 web page links
// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());
// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);
The problem with CompletableFuture.allOf() is that it returns CompletableFuture<Void>. But we can get the results of all the wrapped CompletableFutures by writing a few additional lines of code -
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
    return pageContentFutures.stream()
            .map(pageContentFuture -> pageContentFuture.join())
            .collect(Collectors.toList());
});
Take a moment to understand the above code snippet. Since we're calling future.join() only after all the futures are complete, we're not blocking anywhere :-)
The join() method is similar to get(). The only difference is that it throws an unchecked exception if the underlying CompletableFuture completes exceptionally.
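That checked-vs-unchecked difference can be seen in a small sketch (the "boom" failure is made up for the demo):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class JoinVsGetDemo {
    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));

        // get() throws the CHECKED ExecutionException -> try/catch is mandatory
        try {
            failed.get();
        } catch (ExecutionException | InterruptedException e) {
            System.out.println("get: " + e.getCause().getMessage()); // get: boom
        }

        // join() wraps the cause in the UNCHECKED CompletionException instead
        try {
            failed.join();
        } catch (CompletionException e) {
            System.out.println("join: " + e.getCause().getMessage()); // join: boom
        }
    }
}
```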
Let's now count the number of web pages that contain our keyword -
// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});
System.out.println("Number of Web Pages having CompletableFuture keyword - " +
        countFuture.get());
// Other example
List<CompletableFuture<String>> futuresList =
    someList.stream()
        .map(elem -> CompletableFuture.supplyAsync(() -> "stringval"))
        .collect(Collectors.toList());
return futuresList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// Snippet for a list - working
CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]))
    .thenApply(
        v -> futuresList.stream().map(CompletableFuture::join).collect(Collectors.toList()))
    .join();
// Works neatly
CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0])); // this line can be omitted and it still works, because join() below already blocks on each future
return futuresList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// Needs testing, but neater
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
    // avoid throwing an exception in the join() call
    .exceptionally(ex -> null)
    .join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
    completableFutures.stream()
        .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally));
2. CompletableFuture.anyOf()
CompletableFuture.anyOf(), as the name suggests, returns a new CompletableFuture which is completed when any of the given CompletableFutures complete, with that same result.
Consider the following example -
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(anyOfFuture.get()); // Result of Future 2
In the above example, anyOfFuture is completed when any of the three CompletableFutures complete. Since future2 has the least sleep time, it will complete first, and the final result will be - Result of Future 2.
CompletableFuture.anyOf() takes varargs of Futures and returns CompletableFuture<Object>. The problem with CompletableFuture.anyOf() is that if your CompletableFutures return results of different types, you won't know the type of the final CompletableFuture.
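When the competing futures share one result type, the instance method `applyToEither` keeps that type and avoids anyOf's `CompletableFuture<Object>`. A sketch (the "slow"/"fast" values are made up):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class EitherDemo {
    public static void main(String[] args) {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "slow";
        });
        // Already complete, so it wins the race deterministically
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");

        // Typed result -- no cast from Object needed, unlike anyOf()
        CompletableFuture<String> first = slow.applyToEither(fast, Function.identity());
        System.out.println(first.join()); // fast
    }
}
```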
CompletableFuture Exception Handling
We explored how to create CompletableFutures, transform them, and combine multiple CompletableFutures. Now let's understand what to do when anything goes wrong.
Let's first understand how errors are propagated in a callback chain. Consider the following CompletableFuture callback chain -
CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});
If an error occurs in the original supplyAsync() task, then none of the thenApply() callbacks will be called and the future will be resolved with the exception that occurred. If an error occurs in the first thenApply() callback, then the 2nd and 3rd callbacks won't be called and the future will be resolved with that exception, and so on.
1. Handle exceptions using the exceptionally() callback
The exceptionally() callback gives you a chance to recover from errors generated by the original Future. You can log the exception here and return a default value.
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if (age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if (age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});
System.out.println("Maturity : " + maturityFuture.get());
Note that the error will not be propagated further in the callback chain if you handle it once.
2. Handle exceptions using the generic handle() method
The API also provides a more generic method - handle() - to recover from exceptions. It is called whether or not an exception occurs.
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if (age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if (age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if (ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});
System.out.println("Maturity : " + maturityFuture.get());
If an exception occurs, the res argument will be null; otherwise, the ex argument will be null.
Consider the following example -
CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Some Result";
}).thenApply(result -> {
    /*
      Executed in the same thread where the supplyAsync() task is executed,
      or in the main thread if the supplyAsync() task completes immediately (remove the sleep() call to verify)
    */
    return "Processed Result";
});
In the above case, the task inside thenApply() is executed in the same thread where the supplyAsync() task is executed, or in the main thread if the supplyAsync() task completes immediately (try removing the sleep() call to verify).
To have more control over the thread that executes the callback task, you can use async callbacks. If you use the thenApplyAsync() callback, it will be executed in a different thread obtained from ForkJoinPool.commonPool() -
CompletableFuture.supplyAsync(() -> {
    return "Some Result";
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result";
});
Moreover, if you pass an Executor to the thenApplyAsync() callback, the task will be executed in a thread obtained from the Executor's thread pool.
Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result";
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result";
}, executor);
------------------------
## With @Async (to use this, add @EnableAsync on a configuration class)
@Async
public CompletableFuture<Integer> asyncFactorial(int number) {
    Integer fact = 1;
    for (int i = 1; i <= number; i++) { fact = fact * i; }
    String threadname = Thread.currentThread().getName();
    System.out.println("in factorial the thread is " + threadname + "\n");
    return CompletableFuture.completedFuture(fact);
}
---- Spring with events (bonus for later, TBC) --
# DAO - Spring ORM
@Component
public class MovieDao {
    @Autowired
    private SessionFactory s;
    @Transactional
    public List<Movie> getMovies() {
        Session session = s.getCurrentSession();
        List<Movie> movies = session.createQuery("from Movie", Movie.class).list();
        return movies;
    }
    @Transactional
    public void addMovie(Movie m) {
        Session session = s.getCurrentSession();
        session.save(m);
    }
    @Transactional
    public Movie getMovie(int id) {
        Session session = s.getCurrentSession();
        return session.get(Movie.class, id);
    }
}
### Usage
// Some class
@Autowired
MovieDao dao;
// some method
dao.getMovies();
dao.addMovie(m);
dao.getMovie(123);
# JDBC
@Repository
public class ProductJdbcDAO {
    String CHECK_PRODUCTS_EXIST = "select PRODUCT_ID from PRODUCTS where STATUS = 'ACTIVE'";
    List<Product> productList = jdbcTemplate.query(CHECK_PRODUCTS_EXIST, (resultSet, rownum) -> {
        Product product = new Product();
        product.setProduct_id(resultSet.getInt("PRODUCT_ID"));
        return product;
    });
    // or
    List<Product> productList = jdbcTemplate.query(CHECK_PRODUCTS_EXIST, BeanPropertyRowMapper.newInstance(Product.class));
    System.out.println("\n Products " + productList.toString());
    String sql = "Insert into product_table(PRODUCT_ID, STATUS) values (?, ?)";
    int rows_affected = jdbcTemplate.update(sql, value1, value2);
    Optional<Product> get(int id) {
        sql = "select * from PRODUCTS where id = ?";
        Product p = null;
        try {
            p = jdbcTemplate.queryForObject(sql, new Object[]{id}, (resultSet, rownum) -> {
                Product product = new Product();
                product.setProduct_id(resultSet.getInt("PRODUCT_ID"));
                return product;
            });
        } catch (DataAccessException e) {
            // log e
        }
        return Optional.ofNullable(p);
    }
    // int rowsaffected = jdbcTemplate.update(...) // update query
}
String INSERT_QUERY = "INSERT INTO ABC (COLNAME) VALUES(?)";
jdbcTemplate.batchUpdate(
    INSERT_QUERY,
    new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            String str = somelist.get(i);
            ps.setString(1, str);
        }
        @Override
        public int getBatchSize() {
            return somelist.size();
        }
    });
// insert example
public void insert(Customer customer){
    String sql = "INSERT INTO CUSTOMER " +
        "(CUST_ID, NAME, AGE) VALUES (?, ?, ?)";
    getSimpleJdbcTemplate().update(sql, customer.getCustId(),
        customer.getName(), customer.getAge()
    );
}
// example 2
jdbcTemplate.update(
    INSERT_QUERY,
    new Object[]{id});
// insert with named parameters
public void insertNamedParameter(Customer customer){
    String sql = "INSERT INTO CUSTOMER " +
        "(CUST_ID, NAME, AGE) VALUES (:custId, :name, :age)";
    Map<String, Object> parameters = new HashMap<String, Object>();
    parameters.put("custId", customer.getCustId());
    parameters.put("name", customer.getName());
    parameters.put("age", customer.getAge());
    getSimpleJdbcTemplate().update(sql, parameters);
}
// insert with named parameters 2
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("id", id);
namedParameterJdbcTemplate.update("INSERT INTO CUSTOMER (CUST_ID) VALUES (:id)", params);
// insert batch example
public void insertBatch(final List<Customer> customers){
    String sql = "INSERT INTO CUSTOMER " +
        "(CUST_ID, NAME, AGE) VALUES (?, ?, ?)";
    List<Object[]> parameters = new ArrayList<Object[]>();
    for (Customer cust : customers) {
        parameters.add(new Object[] {cust.getCustId(), cust.getName(), cust.getAge()});
    }
    getSimpleJdbcTemplate().batchUpdate(sql, parameters);
}
// insert batch
jdbcTemplate.batchUpdate(
    "INSERT INTO CUSTOMER " +
    "(CUST_ID, NAME, AGE) VALUES (?, ?, ?)",
    new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            Customer c = customers.get(i);
            ps.setString(1, c.getCustId());
            ps.setString(2, c.getName());
            ps.setInt(3, c.getAge());
        }
        @Override
        public int getBatchSize() {
            return customers.size();
        }
    });
// insert batch with named parameters
public void insertBatchNamedParameter(final List<Customer> customers){
    String sql = "INSERT INTO CUSTOMER " +
        "(CUST_ID, NAME, AGE) VALUES (:custId, :name, :age)";
    List<SqlParameterSource> parameters = new ArrayList<SqlParameterSource>();
    for (Customer cust : customers) {
        parameters.add(new BeanPropertySqlParameterSource(cust));
    }
    getSimpleJdbcTemplate().batchUpdate(sql,
        parameters.toArray(new SqlParameterSource[0]));
}
// insert batch with named parameters 2
public void insertBatchNamedParameter2(final List<Customer> customers){
    SqlParameterSource[] params = SqlParameterSourceUtils.createBatch(customers.toArray());
    getSimpleJdbcTemplate().batchUpdate(
        "INSERT INTO CUSTOMER (CUST_ID, NAME, AGE) VALUES (:custId, :name, :age)",
        params);
}
// insert batch example with SQL
public void insertBatchSQL(final String sql){
    getJdbcTemplate().batchUpdate(new String[]{sql});
}
// query single row with ParameterizedRowMapper
public Customer findByCustomerId(int custId){
    String sql = "SELECT * FROM CUSTOMER WHERE CUST_ID = ?";
    Customer customer = getSimpleJdbcTemplate().queryForObject(
        sql, new CustomerParameterizedRowMapper(), custId);
    return customer;
}
// query single row with ParameterizedBeanPropertyRowMapper (Customer.class)
public Customer findByCustomerId2(int custId){
    String sql = "SELECT * FROM CUSTOMER WHERE CUST_ID = ?";
    Customer customer = getSimpleJdbcTemplate().queryForObject(
        sql, ParameterizedBeanPropertyRowMapper.newInstance(Customer.class), custId);
    return customer;
}
// query multiple rows with ParameterizedBeanPropertyRowMapper (Customer.class)
public List<Customer> findAll(){
    String sql = "SELECT * FROM CUSTOMER";
    List<Customer> customers =
        getSimpleJdbcTemplate().query(sql, ParameterizedBeanPropertyRowMapper.newInstance(Customer.class));
    return customers;
}
// query multiple rows with ParameterizedBeanPropertyRowMapper (Customer.class)
public List<Customer> findAll2(){
    String sql = "SELECT * FROM CUSTOMER";
    List<Customer> customers =
        getSimpleJdbcTemplate().query(sql, ParameterizedBeanPropertyRowMapper.newInstance(Customer.class));
    return customers;
}
public String findCustomerNameById(int custId){
    String sql = "SELECT NAME FROM CUSTOMER WHERE CUST_ID = ?";
    String name = getSimpleJdbcTemplate().queryForObject(
        sql, String.class, custId);
    return name;
}
public int findTotalCustomer(){
    String sql = "SELECT COUNT(*) FROM CUSTOMER";
    int total = getSimpleJdbcTemplate().queryForInt(sql); // queryForInt is removed in newer Spring versions; use queryForObject(sql, Integer.class)
    return total;
}
// @Repository // An Entity is what gets stored in a database; a Repository is what interacts with the database (there's a difference).
# Files --> Movie.java / MovieDAO.java
@Entity
@Table(name = "MOVIE")
public class Movie { // can implement Serializable, Cloneable
    @Id
    //@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "SOME_SEQ")
    //@SequenceGenerator(name = "SOME_SEQ", sequenceName = "SOME_SEQ", allocationSize = 1)
    @Column(name = "ID", nullable = false)
    private Long id;
    @Lob
    @Column(name = "DETAILS")
    private byte[] details;
    private String movieName;
    private String language;
    // standard constructor, getters, setters
}
public interface MovieDAO<ENTITY> {
    List<ENTITY> queryForMovies();
}
# Hibernate - EntityManager
### Usage
@Repository("MovieDAO")
public class MovieDAOImpl implements MovieDAO<Movie> {
    @Inject
    Configuration config; // org.apache.commons.configuration
    // @PersistenceContext(unitName = "dbalias")
    @PersistenceContext
    protected EntityManager em;
    // 1. manual transaction handling
    em.getTransaction().begin();
    em.persist(product);
    em.getTransaction().commit();
    // 2. let Spring manage the transaction
    @Transactional(value = "TxnManager_pricing", propagation = Propagation.REQUIRED) // handles begin/commit etc.
    public Movie getMovie(Long movieId) {
        EntityManager em = getEntityManager();
        Movie movie = em.find(Movie.class, movieId);
        // em.persist(movie); em.detach(movie); em.merge(movie);
        return movie;
    }
    public List<Movie> queryForMovies() {
        EntityManager em = getEntityManager();
        List<Movie> movies = em.createQuery("SELECT movie from Movie movie where movie.language = ?1") // JPQL; use createNativeQuery for raw SQL
            .setParameter(1, "English")
            .getResultList();
        return movies;
    }
}
# JPA
### Usage
// some class
@Autowired
ProductRepo repo;
// some method
repo.save(product);
repo.getOne(id);
interface ProductRepo extends JpaRepository<Movie, IdType> {
    // List<Movie> findByLanguage(String language); // query derived from the method name (DSL query format)
    @Query("SQL|JPAQUERY|HQL...")
    // JPA query example --> "from Movie where column = :value" // like a NamedParameterJdbcTemplate query
    // List<Movie> customMethod(@Param("value") String language);
}
# Validation | |
public class User { | |
@NotNull(message = "Name cannot be null") | |
private String name; | |
@AssertTrue | |
private boolean working; | |
@Size(min = 10, max = 200, message | |
= "About Me must be between 10 and 200 characters") | |
private String aboutMe; | |
@Min(value = 18, message = "Age should not be less than 18") | |
@Max(value = 150, message = "Age should not be greater than 150") | |
private int age; | |
@Email(message = "Email should be valid") | |
private String email; | |
@JsonProperty("vertical_industries") | |
@Valid // validates the type and the constraints inside it | |
@NotNull(message = "MISSING_REQUIRED_PARAMETER") | |
@Size(min=1, max=50, message = "INVALID_ARRAY_LENGTH") | |
private List<Mytype> numbers; | |
// standard setters and getters | |
// can also be used on elements, e.g. List<@NotBlank String> preferences;
} | |
##Usage | |
1. @Valid User user (as a controller method parameter)
2. | |
ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); | |
Validator validator = factory.getValidator(); | |
Set<ConstraintViolation<User>> violations = validator.validate(user); | |
for (ConstraintViolation<User> violation : violations) { | |
log.error(violation.getMessage()); | |
} | |
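Without the Bean Validation library on the classpath, the semantics of a few of these constraints can be sketched by hand. This is a minimal, simplified illustration (the field names mirror the User class above; the checks, including the rough email regex, are stand-ins for the real annotations, not their exact behavior):

```java
import java.util.ArrayList;
import java.util.List;

public class ManualUserValidation {
    // Hand-rolled stand-ins for @NotNull, @Size, @Min, @Max, @Email.
    static List<String> validate(String name, String aboutMe, int age, String email) {
        List<String> violations = new ArrayList<>();
        if (name == null)                                                        // @NotNull
            violations.add("Name cannot be null");
        if (aboutMe == null || aboutMe.length() < 10 || aboutMe.length() > 200)  // @Size(min=10, max=200)
            violations.add("About Me must be between 10 and 200 characters");
        if (age < 18)                                                            // @Min(18)
            violations.add("Age should not be less than 18");
        if (age > 150)                                                           // @Max(150)
            violations.add("Age should not be greater than 150");
        if (email == null || !email.matches("^[^@\\s]+@[^@\\s]+$"))              // rough @Email check
            violations.add("Email should be valid");
        return violations;
    }

    public static void main(String[] args) {
        // Invalid on every field: prints all four violation messages.
        System.out.println(validate(null, "short", 16, "not-an-email"));
    }
}
```

The real Validator additionally walks nested objects (@Valid) and collection elements, which a hand-rolled check would have to do explicitly.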
----------------------------------------------------------------------------------------------------------------------------- | |
PAGINATION | |
----------------------------------------------------------------------------------------------------------------------------- | |
Basic Approach - USING OFFSET - not efficient | |
A | |
Navigate Only with prev - next | |
SQL - PAGINATION WITHOUT LIMIT - SELECT ID, NAME, TYPE FROM ( SELECT ROW_NUMBER() OVER (ORDER BY TIME_UPDATED DESC) as row_id, TABLENAME.* | |
FROM TABLENAME) where row_id between OFFSET+1 and OFFSET+LIMIT; | |
Pagination - | |
if (pageNumber == null || pageNumber < 1) pageNumber = 1; | |
if (pageSize == null || pageSize < 1) pageSize = 100; | |
int offset = (pageNumber - 1) * pageSize; | |
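The defaulting logic and the ROW_NUMBER() window from the SQL above can be combined in one place. A minimal sketch (buildPageQuery is a hypothetical helper; the table and TIME_UPDATED column are placeholders from the SQL above):

```java
public class OffsetPaging {
    // Defaulting logic: page numbers start at 1, page size defaults to 100.
    static int offsetFor(Integer pageNumber, Integer pageSize) {
        int page = (pageNumber == null || pageNumber < 1) ? 1 : pageNumber;
        int size = (pageSize == null || pageSize < 1) ? 100 : pageSize;
        return (page - 1) * size;
    }

    // Builds the OFFSET+1 .. OFFSET+LIMIT window over ROW_NUMBER(), as in the SQL above.
    static String buildPageQuery(String table, int offset, int limit) {
        return "SELECT * FROM (SELECT ROW_NUMBER() OVER (ORDER BY TIME_UPDATED DESC) AS row_id, "
                + table + ".* FROM " + table + ") t WHERE row_id BETWEEN "
                + (offset + 1) + " AND " + (offset + limit);
    }

    public static void main(String[] args) {
        int offset = offsetFor(3, 25);           // page 3, size 25 -> offset 50
        System.out.println(offset);
        System.out.println(buildPageQuery("ORDERS", offset, 25));
    }
}
```

In real code the values would be bound as query parameters rather than concatenated into the SQL string.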
B | |
Real pagination also needs the total count.
There are essentially two options for implementing the page count without risking running out of heap memory:
a - Use a separate query to fetch the count of the result, | |
b - Use a combined query to fetch both with native SQL (where available). | |
Both options consume no additional memory on the application server, but they have different benefits and drawbacks.
Best solution for b | |
Using CTE to count all records | |
WITH Data_CTE | |
AS | |
( | |
SELECT [name], object_id | |
FROM sys.all_objects | |
--WHERE [name] LIKE 'fn_%' | |
), | |
Count_CTE | |
AS | |
( | |
SELECT COUNT(*) AS TotalRows FROM Data_CTE | |
) | |
SELECT * | |
FROM Data_CTE | |
CROSS JOIN Count_CTE | |
ORDER BY [name] | |
OFFSET (@PageNum - 1) * @PageSize ROWS | |
FETCH NEXT @PageSize ROWS ONLY; | |
Note: | |
Offset has a big issue by default | |
Explained : | |
Consider the scenario of data in the DB with the B-tree index laid out as below
21 | |
15 27 | |
13 17 24 28 | |
10 22 25 | |
SELECT * FROM my_table ORDER BY id LIMIT 5 | |
DB scans | |
21 | |
15 | |
13 17 | |
10 | |
and returns: 10, 13, 15, 17, 21
For page 2 | |
SELECT * FROM my_table ORDER BY id LIMIT 5 OFFSET 5 | |
DB scans | |
21 | |
15 27 | |
13 17 24 28 | |
10 22 25 | |
and returns - 22, 24, 25, 27, 28 | |
We still had to go through all the leaves in order to find the last 5.
This is because there is no way for the RDBMS to know where it needs to start. | |
So it has to find the first 5 elements in order to find the next 5 elements. | |
the way offset is designed: | |
the rows are first sorted according to the <order by clause> and | |
then limited by dropping the number of rows specified in the <result offset clause> from the beginning. | |
Let's also note that some storage solutions such as
Elasticsearch will even prevent you from querying with a large offset for this same reason.
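The cost described above can be simulated in memory: an OFFSET query still walks past every skipped row before the page starts. A rough sketch, with a plain sorted list standing in for the index leaves:

```java
import java.util.List;

public class OffsetScanDemo {
    // LIMIT/OFFSET over a sorted list standing in for the index leaves.
    static List<Integer> page(List<Integer> sortedIds, int offset, int limit) {
        int from = Math.min(offset, sortedIds.size());
        int to = Math.min(offset + limit, sortedIds.size());
        return sortedIds.subList(from, to);
    }

    // An OFFSET scan still touches every skipped entry before the page starts.
    static int rowsTouched(int offset, int limit, int total) {
        return Math.min(offset + limit, total);
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(10, 13, 15, 17, 21, 22, 24, 25, 27, 28);
        System.out.println(page(ids, 5, 5));               // [22, 24, 25, 27, 28]
        System.out.println(rowsTouched(5, 5, ids.size())); // 10: all earlier rows walked too
    }
}
```

Page 2 returns 5 rows but 10 entries were scanned; page N with a fixed limit scans O(N * limit) entries.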
TO SOLVE THE INEFFICIENCY:
Now imagine a world without these problems. As it turns out, living without offset is quite simple: just use a WHERE clause that selects only data you haven't seen yet.
For that, we exploit the fact that we work on an ordered set (you do have an ORDER BY clause, don't you?). Once there is a definite sort order, we can use a simple filter to select only what follows the entry we saw last:
SELECT ... | |
FROM ... | |
WHERE ... | |
AND id < ?last_seen_id | |
ORDER BY id DESC | |
FETCH FIRST 10 ROWS ONLY | |
This is the basic recipe. It gets more interesting when sorting on multiple columns, but the idea is the same. This recipe is also applicable to many NoSQL systems. | |
This approach, called the seek method or keyset pagination, solves the problem of drifting results
eg: | |
SELECT * FROM my_table WHERE id > 21 ORDER BY id LIMIT 5 | |
DB scans | |
27 | |
24 28 | |
22 25 | |
and returns - 22, 24, 25, 27, 28 | |
notice we didn't have to check for or even scan values 10, 13, 15, 17, 21 | |
since the column was indexed...db just read the values greater than last | |
scanned id (21) in our case as indexed columns were (binary) sorted | |
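The seek can be mimicked with a sorted in-memory structure: like a B-tree index, a TreeSet can jump straight to the first id greater than the last seen one. A sketch of the WHERE id > ?last_seen_id recipe (an illustration of the idea, not real DB behavior):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class KeysetDemo {
    // WHERE id > lastSeenId ORDER BY id LIMIT n, emulated with TreeSet.tailSet:
    // the tree (like a DB index) seeks directly past the already-seen rows.
    static List<Integer> nextPage(TreeSet<Integer> index, int lastSeenId, int n) {
        List<Integer> page = new ArrayList<>();
        for (int id : index.tailSet(lastSeenId, false)) { // exclusive bound: id > lastSeenId
            page.add(id);
            if (page.size() == n) break;
        }
        return page;
    }

    public static void main(String[] args) {
        TreeSet<Integer> index = new TreeSet<>(List.of(10, 13, 15, 17, 21, 22, 24, 25, 27, 28));
        System.out.println(nextPage(index, 21, 5)); // [22, 24, 25, 27, 28]
    }
}
```

tailSet(lastSeenId, false) is the in-memory analogue of the index seek: no entry below the bound is visited.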
Note: | |
Limitation: most notably, you cannot directly navigate to arbitrary pages.
However, this is not a problem when using infinite scrolling.
Showing page numbers to click on is a poor navigation interface anyway, IMHO.
Examples of this approach | |
Sorting by date
If you are sorting by date there is always a risk that multiple rows have the same date.
If those rows straddle two pages, some of them may be loaded on neither page.
To fix this you need to order on a second, unique field (typically the primary key).
SELECT * | |
FROM my_table | |
WHERE (update_date = '2017-12-21' AND id > 21) | |
OR update_date > '2017-12-21' | |
ORDER BY update_date,id LIMIT 5 | |
Basically we are fetching all the rows that have the same update date as the last row on the previous page
but whose id is greater, plus all rows that were updated later. This of course makes the query more complicated.
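That composite WHERE clause is just a lexicographic comparison on the pair (update_date, id). A sketch of the predicate (the Row record and field names are illustrative, mirroring the query above):

```java
import java.time.LocalDate;
import java.util.List;
import java.util.stream.Collectors;

public class CompositeKeysetDemo {
    record Row(LocalDate updateDate, long id) {}

    // (update_date, id) > (lastDate, lastId): same date with a greater id,
    // OR a strictly later date -- exactly the WHERE clause above.
    static boolean afterKey(Row r, LocalDate lastDate, long lastId) {
        return (r.updateDate().equals(lastDate) && r.id() > lastId)
                || r.updateDate().isAfter(lastDate);
    }

    public static void main(String[] args) {
        LocalDate d = LocalDate.parse("2017-12-21");
        List<Row> rows = List.of(
                new Row(d, 20), new Row(d, 21), new Row(d, 22),
                new Row(d.plusDays(1), 5));
        // Last row of the previous page was (2017-12-21, id 21).
        List<Long> next = rows.stream()
                .filter(r -> afterKey(r, d, 21))
                .map(Row::id)
                .collect(Collectors.toList());
        System.out.println(next); // [22, 5]
    }
}
```

The same pattern extends to any number of sort columns, comparing tuples left to right.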
Keyset pagination's only limitation is jumping to a specific page, but that is, in my opinion, rarely needed; with proper UX/design it can be avoided.
Keyset pagination has no size limit: you can query a 100-million-row table, and with the right indexes all the pages will be fast.
So in my opinion there is no reason not to use it instead of what we are more used to doing.
We should avoid OFFSET as much as possible, use it only when absolutely necessary, and put proper protection in place.
For example, if arbitrary page numbers are a must,
use B-a or B-b.
For B-b, the best solution (the CTE query) is already shown above.
----------------------------------------------------------------------------------------------------------------------------- | |
KAFKA RELATED | |
----------------------------------------------------------------------------------------------------------------------------- | |
Exercise: Single Node Single Broker | |
Configuration | |
In this exercise you will run basic Kafka operations. | |
Duration: 30 mins | |
Procedure | |
First, let us implement the single node-single broker configuration.
You should have Java installed on your machine by now. Before moving to the
Kafka cluster setup, you need to start ZooKeeper, because the Kafka
cluster uses ZooKeeper.
Start ZooKeeper | |
Open a new terminal and type the following command – | |
$ cd ~/confluent-6.1.2 | |
bin/zookeeper-server-start etc/kafka/zookeeper.properties | |
Open a new terminal and type the following command – | |
$ cd ~/confluent-6.1.2 | |
To start Kafka Broker, type the following command − | |
bin/kafka-server-start etc/kafka/server.properties | |
NOTE: | |
Before starting broker, comment below lines in server.properties | |
#confluent.balancer.enable=true | |
Single Node-Single Broker Configuration | |
In this configuration you have a single ZooKeeper instance and a single broker instance. Following are
the steps to configure it −
Creating a Kafka Topic − Kafka provides a command line utility named kafka-topics
to create topics on the server. Open a new terminal and run the example below.
Syntax | |
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
Example
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
We just created a topic named Hello-Kafka with a single partition and a replication factor of one.
The output will be similar to the following −
Output − Created topic Hello-Kafka
Once the topic has been created, you can see a notification in the Kafka broker terminal
window, and the log for the created topic appears under the log directory ("/tmp/kafka-logs/" by
default) configured in etc/kafka/server.properties.
List of Topics | |
To get a list of topics in Kafka server, you can use the following command − | |
Syntax | |
bin/kafka-topics --list --zookeeper localhost:2181 | |
Output | |
Hello-Kafka | |
Since we have created only one topic, it lists Hello-Kafka alone. If you create
more than one topic, all the topic names appear in the output.
Start Producer to Send Messages | |
Syntax | |
bin/kafka-console-producer --broker-list localhost:9092 --topic | |
topic-name | |
From the above syntax, two main parameters are required for the producer command | |
line client − | |
broker-list − The list of brokers we want to send the messages to. In this case we
only have one broker; etc/kafka/server.properties shows it listening on port 9092,
so we can specify it directly.
topic − The name of the topic to publish to.
Example | |
bin/kafka-console-producer --broker-list localhost:9092 --topic | |
Hello-Kafka | |
The producer waits for input on stdin and publishes it to the Kafka cluster. By
default, every new line is published as a new message; the default producer
properties are specified in etc/kafka/producer.properties. Now type a few lines
of messages in the terminal as shown below. Output
$ bin/kafka-console-producer --broker-list localhost:9092 | |
--topic Hello-Kafka | |
My first message | |
My second message | |
Start Consumer to Receive Messages | |
Similar to the producer, the default consumer properties are
specified in etc/kafka/consumer.properties. Open a new terminal and type the
syntax below for consuming messages.
Syntax | |
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-name --from-beginning
Example | |
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic Hello-Kafka --from-beginning
Output | |
Hello | |
My first message | |
My second message | |
Finally, you are able to enter messages in the producer's terminal and see them
appear in the consumer's terminal. You now have a good understanding
of a single-node cluster with a single broker. Let us move on to the
multiple-broker configuration.
Exercise: Exploring CLI commands | |
List all Kafka Topics | |
kafka-topics --list --zookeeper <IP>:<PORT> | |
Example: | |
bin/kafka-topics --list --zookeeper localhost:2181 | |
Create a new Topic | |
kafka-topics --create --zookeeper <IP>:<PORT> --replication-factor | |
<<number>> --partitions <<number>> --topic <<name>> | |
Example: | |
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tx1
Produce a Message to Kafka Topic | |
kafka-console-producer --broker-list <IP>:<PORT> --topic <<name>> | |
Example: | |
bin/kafka-console-producer --broker-list localhost:9092 --topic tx1 | |
Consume Message(S) from Kafka Topic | |
kafka-console-consumer --bootstrap-server <IP>:<PORT> --topic <<name>> --from-beginning
Example: | |
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic tx1 -- | |
from-beginning | |
Describe all topics | |
bin/kafka-topics --zookeeper localhost:2181 --describe
Describe a Kafka Topic | |
kafka-topics --describe --zookeeper <IP>:<PORT> --topic <<name>> | |
Example: | |
bin/kafka-topics --describe --zookeeper localhost:2181 --topic tx1 | |
bin/kafka-topics --describe --bootstrap-server localhost:9092 --topic tx1
Alter a Kafka Topic | |
kafka-topics --zookeeper <IP>:<PORT> --alter --topic <<name>> --config retention.ms=172800000
Example: | |
bin/kafka-topics --zookeeper localhost:2181 --alter --topic tx1 --config | |
retention.ms=172800000 | |
Remove a Kafka Topic | |
Update server.properties on each broker
delete.topic.enable=true # this allows us to delete the topic
Restart the brokers
kafka-topics --zookeeper <IP>:<PORT> --delete --topic <<name>>
Example: | |
bin/kafka-topics --zookeeper localhost:2181 --delete --topic tx1 | |
bin/kafka-topics --bootstrap-server localhost:9092 --delete --topic tx2 | |
Verify : - marked for deletion | |
bin/kafka-topics --list --zookeeper localhost:2181 | |
Disable auto-creation of Kafka topics
Update server.properties on each broker
auto.create.topics.enable=false # stops auto-creation of topics
Example: | |
bin/kafka-console-producer --broker-list localhost:9092 --topic tx1 | |
Notice Error. | |
Increase Partitions of the topic | |
Example: | |
Create a Topic | |
bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 6 --topic tx2
Increase partitions
bin/kafka-topics --zookeeper localhost:2181 --alter --topic tx2 --partitions 8
The number of partitions for a topic can only be increased.
If partitions are increased for a topic that has keyed messages, the
key-to-partition mapping changes, so the per-key ordering of messages will be affected.
Adding partitions succeeded! | |
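The warning above can be illustrated in a few lines. Kafka's default partitioner maps a key to murmur2(keyBytes) mod partitionCount; the sketch below substitutes plain hashCode() (an assumption for illustration, not Kafka's real hash) just to show that the mapping depends on the partition count:

```java
public class PartitionRemapDemo {
    // Simplified stand-in for Kafka's default partitioner. The real one uses
    // murmur2(keyBytes) % numPartitions; Math.floorMod over hashCode() is used
    // here only to illustrate that the mapping depends on the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = "key1";
        int before = partitionFor(key, 6); // partition count at topic creation
        int after = partitionFor(key, 8);  // after --alter --partitions 8
        System.out.println(before + " -> " + after);
        // Whenever before != after, messages for this key land in a new
        // partition after the alter, so per-key ordering across the change is lost.
    }
}
```

This is why increasing partitions on a keyed topic breaks the ordering guarantee: consumers may see old messages for a key in one partition and new ones in another.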
Display List consumer groups | |
bin/kafka-consumer-groups --bootstrap-server localhost:9092 --list | |
Describe the consumer group | |
bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group g3 --members
bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group g2 --members --verbose
Produce a Message with a Key and Value
bin/kafka-console-producer --broker-list localhost:9092 --topic tx3 --property "parse.key=true" --property "key.separator=:"
key1:value1 | |
key2:value2 | |
key3:value3 | |
Consume Messages with a Key and Value
bin/kafka-console-consumer --topic tx3 --from-beginning --bootstrap-server localhost:9092 --property print.key=true
bin/kafka-console-consumer \
--property 'print.timestamp=true' --property 'print.key=true' \
--property 'print.offset=true' --property 'enable.auto.commit=false' \
--bootstrap-server localhost:9092 --topic monitoring --group GroupA --from-beginning
Exercise: Single Node-Multiple Brokers | |
Configuration | |
Before moving on to the multiple brokers cluster setup, first start your ZooKeeper | |
server. | |
Create Multiple Kafka Brokers − We already have one Kafka broker instance in
etc/kafka/server.properties. Now we need multiple broker instances, so copy the existing
server.properties file into two new config files named server-one.properties
and server-two.properties.
$ cd etc/kafka | |
$ cp server.properties server-one.properties | |
$ cp server.properties server-two.properties | |
Then edit both new files and assign the following changes | |
etc/kafka/server-one.properties | |
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
listeners=PLAINTEXT://localhost:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
#To disable the broker’s HTTP listener, set to a blank value | |
confluent.http.server.listeners= | |
#and comment confluent balancer | |
#confluent.balancer.enable=true | |
etc/kafka/server-two.properties | |
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
listeners=PLAINTEXT://localhost:9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
#To disable the broker’s HTTP listener, set to a blank value | |
confluent.http.server.listeners= | |
#and comment confluent balancer | |
#confluent.balancer.enable=true | |
Start Multiple Brokers − After all the changes have been made to the three property files,
open three new terminals and start each broker one by one.
Broker1 | |
bin/kafka-server-start etc/kafka/server.properties | |
Broker2 | |
bin/kafka-server-start etc/kafka/server-one.properties | |
Broker3 | |
bin/kafka-server-start etc/kafka/server-two.properties | |
Now we have three different brokers running on the machine. Verify all the
daemons yourself by typing jps in a terminal; you should see the ZooKeeper
and Kafka processes listed.
Creating a Topic | |
Let us assign the replication factor value as three for this topic because we have three | |
different brokers running. If you have two brokers, then the assigned replica value will | |
be two. | |
Syntax | |
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-name
Example | |
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic Multibrokerapplication
Output
Created topic "Multibrokerapplication"
The describe command is used to check which broker is serving the newly
created topic, as shown below −
bin/kafka-topics --describe --zookeeper localhost:2181 --topic Multibrokerapplication
Output
Topic:Multibrokerapplication  PartitionCount:1  ReplicationFactor:3  Configs:
Topic:Multibrokerapplication  Partition:0  Leader:0  Replicas:0,2,1  Isr:0,2,1
From the above output, the first line gives a summary of all the
partitions, showing the topic name, partition count and the replication factor we
chose. In the second line, each node will be the leader for a randomly
selected portion of the partitions.
In our case, our first broker (broker.id 0) is the leader. Replicas:0,2,1 means
that all the brokers replicate the topic, and Isr is the set of
in-sync replicas: the subset of replicas that are currently alive and caught up
with the leader.
Start Producer to Send Messages | |
This procedure remains the same as in the single broker setup. | |
Example | |
bin/kafka-console-producer --broker-list localhost:9092 --topic Multibrokerapplication
Output
bin/kafka-console-producer --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
This is single node-multi broker demo
This is the second message
Start Consumer to Receive Messages | |
This procedure remains the same as shown in the single broker setup. | |
Example | |
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic Multibrokerapplication --from-beginning
Output | |
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic Multibrokerapplication --from-beginning
This is single node-multi broker demo | |
This is the second message | |
Basic Topic Operations | |
In this chapter we will discuss the various basic topic operations. | |
Modifying a Topic | |
You have already seen how to create a topic in a Kafka cluster. Now let us
modify a created topic using the following command.
Syntax | |
bin/kafka-topics --zookeeper localhost:2181 --alter --topic topic_name --partitions count
Example | |
We have already created the topic "Hello-Kafka" with a single partition
and a replication factor of one.
Now, using the "alter" command, we change the partition count.
bin/kafka-topics --zookeeper localhost:2181 --alter --topic Hello-Kafka --partitions 2
Output | |
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
Deleting a Topic | |
To delete a topic, you can use the following syntax. | |
Syntax | |
bin/kafka-topics --zookeeper localhost:2181 --delete --topic topic_name
Example | |
bin/kafka-topics --zookeeper localhost:2181 --delete --topic Hello-Kafka
Output
> Topic Hello-Kafka marked for deletion
Note − This will have no effect if delete.topic.enable is not set to true