Skip to content

Instantly share code, notes, and snippets.

@NABEEL-AHMED-JAMIL
Created December 23, 2019 08:36
Show Gist options
  • Save NABEEL-AHMED-JAMIL/1cbe8b3c87f707c26ac9cd2352ba5db1 to your computer and use it in GitHub Desktop.
Save NABEEL-AHMED-JAMIL/1cbe8b3c87f707c26ac9cd2352ba5db1 to your computer and use it in GitHub Desktop.
File Create and Marge Under Thread Demo
package com.ballistic;
import java.util.ArrayList;
import java.util.List;
public class ExceptionUtil {
public static Throwable getRootCause(final Throwable throwable) {
final List<Throwable> list = getThrowableList(throwable);
Throwable rootCause = list.size() < 2 ? null : (Throwable) list.get(list.size() - 1);
if (rootCause == null) {
return throwable;
}
return rootCause;
}
public static String getRootCauseMessage(final Throwable throwable) {
Throwable root = getRootCause(throwable);
return root.toString();
}
private static List<Throwable> getThrowableList(Throwable throwable) {
final List<Throwable> list = new ArrayList<Throwable>();
while (throwable != null && list.contains(throwable) == false) {
list.add(throwable);
throwable = throwable.getCause();
}
return list;
}
}
package com.ballistic;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
import javax.net.ssl.*;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.Socket;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.security.cert.X509Certificate;
import java.util.Locale;
public class FtpFileExchange {
private final Integer TENSECONDS = 10*1000; // 10 second
// mean directory path for default server
private String directoryPath;
private String host;
private Integer port;
private String user;
private String password;
private ModifiedFTPSClient ftpsClient;
public FtpFileExchange() { }
public String getHost() {
return host;
}
public FtpFileExchange setHost(String host) {
this.host = host;
return this;
}
public Integer getPort() {
return port;
}
public FtpFileExchange setPort(Integer port) {
this.port = port;
return this;
}
public String getUser() {
return user;
}
public FtpFileExchange setUser(String user) {
this.user = user;
return this;
}
public String getPassword() {
return password;
}
public FtpFileExchange setPassword(String password) {
this.password = password;
return this;
}
public String getDirectoryPath() {
return directoryPath;
}
public FtpFileExchange setDirectoryPath(String directoryPath) {
this.directoryPath = directoryPath;
return this;
}
public Boolean connectionOpen() throws IOException, NoSuchAlgorithmException, KeyManagementException {
Boolean isLogin = false;
if(this.port > 100) {
Security.addProvider(new com.sun.net.ssl.internal.ssl.Provider());
TrustManager[] trustAllCerts = new TrustManager[] {
new X509TrustManager() {
public X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkServerTrusted(X509Certificate[] certs, String authType) { return; }
public void checkClientTrusted(X509Certificate[] certs, String authType) { return; }
}
};
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, null);
this.ftpsClient = new ModifiedFTPSClient(true, sc);
} else {
this.ftpsClient = new ModifiedFTPSClient();
}
this.ftpsClient.setControlKeepAliveTimeout(TENSECONDS);
this.showServerReply(this.ftpsClient);
System.out.println("FTP :- Connection try :- IP :- (" + this.host + ") , Port :- " + this.port + " Start");
this.ftpsClient.connect(this.host, this.port);
System.out.println("FTP :- Connection try :- IP :- (" + this.host + ") , Port :- " + this.port + " Done");
int reply = this.ftpsClient.getReplyCode();
System.out.println("FTP :- Connection Code :- " + reply);
if(!FTPReply.isPositiveCompletion(reply)) {
this.ftpsClient.disconnect();
throw new IOException("Exception in connecting to FTP Server");
}
isLogin = this.ftpsClient.login(user, password);
this.showServerReply(this.ftpsClient);
System.out.println("FTP :- Login Status :- " + isLogin);
return isLogin;
}
public Boolean uploadFile(final InputStream inputStream, final String fileName) throws Exception {
Boolean isUpload = false;
if(this.directoryPath != null) {
if(!this.isDirectoryExist(this.directoryPath)) {
throw new Exception("Directory Not Exist");
}
// if directory exist then change the directory
this.ftpsClient.changeWorkingDirectory(this.directoryPath);
}
this.ftpsClient.enterLocalPassiveMode();
this.ftpsClient.execPBSZ(0);
this.ftpsClient.execPROT("P");
this.ftpsClient.setFileType(FTP.BINARY_FILE_TYPE);
// show the directory where file exist
System.out.println("Current Directory " + this.ftpsClient.printWorkingDirectory());
System.out.println("Final Path :- " + fileName);
isUpload = this.ftpsClient.storeFile(fileName, inputStream);
if(isUpload) {
System.out.println("The file is uploaded successfully.");
}
return isUpload;
}
public InputStream downloadFile(String directoryPath) throws Exception {
if(directoryPath != null) {
throw new NullPointerException("Directory Path Null");
}
return this.ftpsClient.retrieveFileStream(directoryPath);
}
public Boolean isDirectoryExist(String directoryPath) throws IOException {
Boolean isDirectory = false;
if(this.ftpsClient.cwd(directoryPath)==550){
System.out.println("Directory Doesn't Exists");
isDirectory = false;
}else if(this.ftpsClient.cwd(directoryPath)==250){
isDirectory = true;
System.out.println("Directory Exists");
}else{
isDirectory = false;
System.out.println("Unknown Status");
}
this.showServerReply(this.ftpsClient);
return isDirectory;
}
public Boolean createDirectory(String directoryPath) throws IOException {
if(this.isDirectoryExist(directoryPath)) {
System.out.println("Directory Already Exist");
return false;
}
return this.ftpsClient.makeDirectory(directoryPath);
}
private void showServerReply(FTPSClient ftpsClient) {
String[] replies = ftpsClient.getReplyStrings();
if (replies != null && replies.length > 0) {
for (String aReply : replies) {
System.out.println("SERVER: " + aReply);
}
}
}
// connection close for client
public void close() throws IOException {
if (this.ftpsClient.isConnected()) {
this.ftpsClient.logout();
this.ftpsClient.disconnect();
this.showServerReply(this.ftpsClient);
}
}
private class ModifiedFTPSClient extends FTPSClient {
public ModifiedFTPSClient() {
super("TLS", false);
}
public ModifiedFTPSClient(boolean isImplicit) {
super("TLS", isImplicit);
}
// TLS will be default there in ftps-client
public ModifiedFTPSClient(boolean isImplicit, SSLContext sc) {
super(isImplicit, sc);
}
@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
final SSLSession session = ((SSLSocket)_socket_).getSession();
if (session.isValid()) {
final SSLSessionContext context = session.getSessionContext();
try {
final Field sessionHostPortCache = context.getClass().getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class, Object.class);
method.setAccessible(true);
method.invoke(cache, String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT), session);
method.invoke(cache, String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT), session);
} catch (NoSuchFieldException e) {
throw new IOException(e);
} catch (Exception e) {
throw new IOException(e);
}
} else {
throw new IOException("Invalid SSL Session");
}
}
}
}
}
package com.ballistic;
import org.apache.commons.io.IOUtils;
import java.io.*;
public class IOCopier {
public BufferedOutputStream createAppendableStream(String destination) throws FileNotFoundException {
System.out.println("Destination File :- " + destination);
return new BufferedOutputStream(new FileOutputStream(destination, true));
}
// use the file and delete the file
public void appendFile(OutputStream outputStream, String sourceFile) throws Exception {
InputStream inputStream = null;
try {
System.out.println("Source File :- " + sourceFile);
inputStream = new BufferedInputStream(new FileInputStream(sourceFile));
IOUtils.copy(inputStream, outputStream);
} finally {
if(inputStream != null) {
inputStream.close();
}
}
}
}
package com.ballistic;
import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
public class ThreadExample {
public final static String HOST = "ftps.msftinc.com";
public final static Integer PORT = 990;
public final static String USER = "mac";
public final static String PASSWORD = "Goo";
private String INPUT_FILE_PATH = "E:\\input_file\\";
private String OUTPUT_FILE = "E:\\output_file\\marge_file.txt";
private IOCopier ioCopier;
public ThreadExample() {}
public IOCopier getIoCopier() {
return ioCopier;
}
public void setIoCopier(IOCopier ioCopier) {
this.ioCopier = ioCopier;
}
public void ftpFileUnderThread() {
try {
FtpFileExchange ftpExchangeFile = new FtpFileExchange();
ftpExchangeFile.setHost(HOST).setPort(PORT).setUser(USER).setPassword(PASSWORD);
for(int i=0; i<1000; i++) {
final Thread fileProcessThread = new Thread() {
public void run() {
try {
System.out.println("Current Thread :- " + Thread.currentThread().getName());
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
getMockUpdate(byteArrayOutputStream);
synchronized(byteArrayOutputStream) {
if(ftpExchangeFile.connectionOpen()) {
ftpExchangeFile.uploadFile(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),
Thread.currentThread().getName()+".txt");
}
}
} catch (Exception ex) {
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex));
}
}
};
fileProcessThread.setName("UU-"+i);
fileProcessThread.start();
}
} catch (Exception ex) {
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex));
}
}
public void localFileUnderThread() {
try {
for(int i=0; i<1000; i++) {
final Thread fileProcessThread = new Thread() {
public void run() {
Long currentTime = System.currentTimeMillis();
System.out.println("Start Time In Ms :- " + currentTime);
try {
System.out.println("Current Thread :- " + Thread.currentThread().getName());
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
getMockUpdate(byteArrayOutputStream);
if(byteArrayOutputStream != null) {
String fileName = INPUT_FILE_PATH+Thread.currentThread().getName()+".txt";
System.out.println("Saving Devices into file " + fileName);
try(OutputStream outputStream = new FileOutputStream(fileName)) {
byteArrayOutputStream.writeTo(outputStream);
} finally {
byteArrayOutputStream.close();
}
System.out.println("Data Save To " + fileName);
}
} catch (Exception ex) {
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex));
}
System.out.println("End Time In Ms :- " + (System.currentTimeMillis() - currentTime));
}
};
fileProcessThread.setName("UU-"+i);
fileProcessThread.start();
}
} catch (Exception ex) {
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex));
}
}
public void margeLocalFileUnderThread() {
try {
OutputStream outputStream;
ioCopier = new IOCopier();
String destinationFilePath = OUTPUT_FILE;
List<String> sourcesFilePath = Arrays.stream(new File(INPUT_FILE_PATH).listFiles())
.map(file -> { return file.getPath(); }).collect(Collectors.toList());
outputStream = ioCopier.createAppendableStream(destinationFilePath);
for(int i=0; i<sourcesFilePath.size(); i++) {
OutputStream finalOutputStream = outputStream;
int finalI = i;
final Thread margeFileThread = new Thread() {
@Override
public void run() {
try {
ioCopier.appendFile(finalOutputStream, sourcesFilePath.get(finalI));
} catch (Exception ex) {
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex));
}
}
};
margeFileThread.setName("UU-"+i);
margeFileThread.start();
}
} catch (Exception ex) {
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex));
}
}
private void getMockUpdate(ByteArrayOutputStream byteArrayOutputStream) throws Exception {
for (int i=0; i<1000; i++) {
String device = Thread.currentThread().getName()+"::"+UUID.randomUUID().toString()+"\n";
System.out.print("Device With Thread :- " + device);
byteArrayOutputStream.write(device.getBytes());
}
}
// thread can be create with (extend thread|impalement thread)
public static void main(String args[]) {
ThreadExample threadExample = new ThreadExample();
try {
//threadExample.ftpFileUnderThread();
//System.out.println("File Create Process Start");
//threadExample.localFileUnderThread();
//System.out.println("Marge Process Start");
//threadExample.margeLocalFileUnderThread();
} catch (Exception ex) {
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment