Created
December 23, 2019 08:36
-
-
Save NABEEL-AHMED-JAMIL/1cbe8b3c87f707c26ac9cd2352ba5db1 to your computer and use it in GitHub Desktop.
File Create and Merge Under Thread Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic; | |
import java.util.ArrayList; | |
import java.util.List; | |
public class ExceptionUtil { | |
public static Throwable getRootCause(final Throwable throwable) { | |
final List<Throwable> list = getThrowableList(throwable); | |
Throwable rootCause = list.size() < 2 ? null : (Throwable) list.get(list.size() - 1); | |
if (rootCause == null) { | |
return throwable; | |
} | |
return rootCause; | |
} | |
public static String getRootCauseMessage(final Throwable throwable) { | |
Throwable root = getRootCause(throwable); | |
return root.toString(); | |
} | |
private static List<Throwable> getThrowableList(Throwable throwable) { | |
final List<Throwable> list = new ArrayList<Throwable>(); | |
while (throwable != null && list.contains(throwable) == false) { | |
list.add(throwable); | |
throwable = throwable.getCause(); | |
} | |
return list; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic; | |
import org.apache.commons.net.ftp.FTP; | |
import org.apache.commons.net.ftp.FTPReply; | |
import org.apache.commons.net.ftp.FTPSClient; | |
import javax.net.ssl.*; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.lang.reflect.Field; | |
import java.lang.reflect.Method; | |
import java.net.Socket; | |
import java.security.KeyManagementException; | |
import java.security.NoSuchAlgorithmException; | |
import java.security.Security; | |
import java.security.cert.X509Certificate; | |
import java.util.Locale; | |
public class FtpFileExchange { | |
private final Integer TENSECONDS = 10*1000; // 10 second | |
// mean directory path for default server | |
private String directoryPath; | |
private String host; | |
private Integer port; | |
private String user; | |
private String password; | |
private ModifiedFTPSClient ftpsClient; | |
public FtpFileExchange() { } | |
public String getHost() { | |
return host; | |
} | |
public FtpFileExchange setHost(String host) { | |
this.host = host; | |
return this; | |
} | |
public Integer getPort() { | |
return port; | |
} | |
public FtpFileExchange setPort(Integer port) { | |
this.port = port; | |
return this; | |
} | |
public String getUser() { | |
return user; | |
} | |
public FtpFileExchange setUser(String user) { | |
this.user = user; | |
return this; | |
} | |
public String getPassword() { | |
return password; | |
} | |
public FtpFileExchange setPassword(String password) { | |
this.password = password; | |
return this; | |
} | |
public String getDirectoryPath() { | |
return directoryPath; | |
} | |
public FtpFileExchange setDirectoryPath(String directoryPath) { | |
this.directoryPath = directoryPath; | |
return this; | |
} | |
public Boolean connectionOpen() throws IOException, NoSuchAlgorithmException, KeyManagementException { | |
Boolean isLogin = false; | |
if(this.port > 100) { | |
Security.addProvider(new com.sun.net.ssl.internal.ssl.Provider()); | |
TrustManager[] trustAllCerts = new TrustManager[] { | |
new X509TrustManager() { | |
public X509Certificate[] getAcceptedIssuers() { | |
return null; | |
} | |
public void checkServerTrusted(X509Certificate[] certs, String authType) { return; } | |
public void checkClientTrusted(X509Certificate[] certs, String authType) { return; } | |
} | |
}; | |
SSLContext sc = SSLContext.getInstance("SSL"); | |
sc.init(null, trustAllCerts, null); | |
this.ftpsClient = new ModifiedFTPSClient(true, sc); | |
} else { | |
this.ftpsClient = new ModifiedFTPSClient(); | |
} | |
this.ftpsClient.setControlKeepAliveTimeout(TENSECONDS); | |
this.showServerReply(this.ftpsClient); | |
System.out.println("FTP :- Connection try :- IP :- (" + this.host + ") , Port :- " + this.port + " Start"); | |
this.ftpsClient.connect(this.host, this.port); | |
System.out.println("FTP :- Connection try :- IP :- (" + this.host + ") , Port :- " + this.port + " Done"); | |
int reply = this.ftpsClient.getReplyCode(); | |
System.out.println("FTP :- Connection Code :- " + reply); | |
if(!FTPReply.isPositiveCompletion(reply)) { | |
this.ftpsClient.disconnect(); | |
throw new IOException("Exception in connecting to FTP Server"); | |
} | |
isLogin = this.ftpsClient.login(user, password); | |
this.showServerReply(this.ftpsClient); | |
System.out.println("FTP :- Login Status :- " + isLogin); | |
return isLogin; | |
} | |
public Boolean uploadFile(final InputStream inputStream, final String fileName) throws Exception { | |
Boolean isUpload = false; | |
if(this.directoryPath != null) { | |
if(!this.isDirectoryExist(this.directoryPath)) { | |
throw new Exception("Directory Not Exist"); | |
} | |
// if directory exist then change the directory | |
this.ftpsClient.changeWorkingDirectory(this.directoryPath); | |
} | |
this.ftpsClient.enterLocalPassiveMode(); | |
this.ftpsClient.execPBSZ(0); | |
this.ftpsClient.execPROT("P"); | |
this.ftpsClient.setFileType(FTP.BINARY_FILE_TYPE); | |
// show the directory where file exist | |
System.out.println("Current Directory " + this.ftpsClient.printWorkingDirectory()); | |
System.out.println("Final Path :- " + fileName); | |
isUpload = this.ftpsClient.storeFile(fileName, inputStream); | |
if(isUpload) { | |
System.out.println("The file is uploaded successfully."); | |
} | |
return isUpload; | |
} | |
public InputStream downloadFile(String directoryPath) throws Exception { | |
if(directoryPath != null) { | |
throw new NullPointerException("Directory Path Null"); | |
} | |
return this.ftpsClient.retrieveFileStream(directoryPath); | |
} | |
public Boolean isDirectoryExist(String directoryPath) throws IOException { | |
Boolean isDirectory = false; | |
if(this.ftpsClient.cwd(directoryPath)==550){ | |
System.out.println("Directory Doesn't Exists"); | |
isDirectory = false; | |
}else if(this.ftpsClient.cwd(directoryPath)==250){ | |
isDirectory = true; | |
System.out.println("Directory Exists"); | |
}else{ | |
isDirectory = false; | |
System.out.println("Unknown Status"); | |
} | |
this.showServerReply(this.ftpsClient); | |
return isDirectory; | |
} | |
public Boolean createDirectory(String directoryPath) throws IOException { | |
if(this.isDirectoryExist(directoryPath)) { | |
System.out.println("Directory Already Exist"); | |
return false; | |
} | |
return this.ftpsClient.makeDirectory(directoryPath); | |
} | |
private void showServerReply(FTPSClient ftpsClient) { | |
String[] replies = ftpsClient.getReplyStrings(); | |
if (replies != null && replies.length > 0) { | |
for (String aReply : replies) { | |
System.out.println("SERVER: " + aReply); | |
} | |
} | |
} | |
// connection close for client | |
public void close() throws IOException { | |
if (this.ftpsClient.isConnected()) { | |
this.ftpsClient.logout(); | |
this.ftpsClient.disconnect(); | |
this.showServerReply(this.ftpsClient); | |
} | |
} | |
private class ModifiedFTPSClient extends FTPSClient { | |
public ModifiedFTPSClient() { | |
super("TLS", false); | |
} | |
public ModifiedFTPSClient(boolean isImplicit) { | |
super("TLS", isImplicit); | |
} | |
// TLS will be default there in ftps-client | |
public ModifiedFTPSClient(boolean isImplicit, SSLContext sc) { | |
super(isImplicit, sc); | |
} | |
@Override | |
protected void _prepareDataSocket_(final Socket socket) throws IOException { | |
if (socket instanceof SSLSocket) { | |
final SSLSession session = ((SSLSocket)_socket_).getSession(); | |
if (session.isValid()) { | |
final SSLSessionContext context = session.getSessionContext(); | |
try { | |
final Field sessionHostPortCache = context.getClass().getDeclaredField("sessionHostPortCache"); | |
sessionHostPortCache.setAccessible(true); | |
final Object cache = sessionHostPortCache.get(context); | |
final Method method = cache.getClass().getDeclaredMethod("put", Object.class, Object.class); | |
method.setAccessible(true); | |
method.invoke(cache, String.format("%s:%s", socket.getInetAddress().getHostName(), | |
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT), session); | |
method.invoke(cache, String.format("%s:%s", socket.getInetAddress().getHostAddress(), | |
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT), session); | |
} catch (NoSuchFieldException e) { | |
throw new IOException(e); | |
} catch (Exception e) { | |
throw new IOException(e); | |
} | |
} else { | |
throw new IOException("Invalid SSL Session"); | |
} | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic; | |
import org.apache.commons.io.IOUtils; | |
import java.io.*; | |
public class IOCopier { | |
public BufferedOutputStream createAppendableStream(String destination) throws FileNotFoundException { | |
System.out.println("Destination File :- " + destination); | |
return new BufferedOutputStream(new FileOutputStream(destination, true)); | |
} | |
// use the file and delete the file | |
public void appendFile(OutputStream outputStream, String sourceFile) throws Exception { | |
InputStream inputStream = null; | |
try { | |
System.out.println("Source File :- " + sourceFile); | |
inputStream = new BufferedInputStream(new FileInputStream(sourceFile)); | |
IOUtils.copy(inputStream, outputStream); | |
} finally { | |
if(inputStream != null) { | |
inputStream.close(); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ballistic; | |
import java.io.*; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.UUID; | |
import java.util.stream.Collectors; | |
public class ThreadExample { | |
public final static String HOST = "ftps.msftinc.com"; | |
public final static Integer PORT = 990; | |
public final static String USER = "mac"; | |
public final static String PASSWORD = "Goo"; | |
private String INPUT_FILE_PATH = "E:\\input_file\\"; | |
private String OUTPUT_FILE = "E:\\output_file\\marge_file.txt"; | |
private IOCopier ioCopier; | |
public ThreadExample() {} | |
public IOCopier getIoCopier() { | |
return ioCopier; | |
} | |
public void setIoCopier(IOCopier ioCopier) { | |
this.ioCopier = ioCopier; | |
} | |
public void ftpFileUnderThread() { | |
try { | |
FtpFileExchange ftpExchangeFile = new FtpFileExchange(); | |
ftpExchangeFile.setHost(HOST).setPort(PORT).setUser(USER).setPassword(PASSWORD); | |
for(int i=0; i<1000; i++) { | |
final Thread fileProcessThread = new Thread() { | |
public void run() { | |
try { | |
System.out.println("Current Thread :- " + Thread.currentThread().getName()); | |
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); | |
getMockUpdate(byteArrayOutputStream); | |
synchronized(byteArrayOutputStream) { | |
if(ftpExchangeFile.connectionOpen()) { | |
ftpExchangeFile.uploadFile(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()), | |
Thread.currentThread().getName()+".txt"); | |
} | |
} | |
} catch (Exception ex) { | |
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex)); | |
} | |
} | |
}; | |
fileProcessThread.setName("UU-"+i); | |
fileProcessThread.start(); | |
} | |
} catch (Exception ex) { | |
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex)); | |
} | |
} | |
public void localFileUnderThread() { | |
try { | |
for(int i=0; i<1000; i++) { | |
final Thread fileProcessThread = new Thread() { | |
public void run() { | |
Long currentTime = System.currentTimeMillis(); | |
System.out.println("Start Time In Ms :- " + currentTime); | |
try { | |
System.out.println("Current Thread :- " + Thread.currentThread().getName()); | |
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); | |
getMockUpdate(byteArrayOutputStream); | |
if(byteArrayOutputStream != null) { | |
String fileName = INPUT_FILE_PATH+Thread.currentThread().getName()+".txt"; | |
System.out.println("Saving Devices into file " + fileName); | |
try(OutputStream outputStream = new FileOutputStream(fileName)) { | |
byteArrayOutputStream.writeTo(outputStream); | |
} finally { | |
byteArrayOutputStream.close(); | |
} | |
System.out.println("Data Save To " + fileName); | |
} | |
} catch (Exception ex) { | |
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex)); | |
} | |
System.out.println("End Time In Ms :- " + (System.currentTimeMillis() - currentTime)); | |
} | |
}; | |
fileProcessThread.setName("UU-"+i); | |
fileProcessThread.start(); | |
} | |
} catch (Exception ex) { | |
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex)); | |
} | |
} | |
public void margeLocalFileUnderThread() { | |
try { | |
OutputStream outputStream; | |
ioCopier = new IOCopier(); | |
String destinationFilePath = OUTPUT_FILE; | |
List<String> sourcesFilePath = Arrays.stream(new File(INPUT_FILE_PATH).listFiles()) | |
.map(file -> { return file.getPath(); }).collect(Collectors.toList()); | |
outputStream = ioCopier.createAppendableStream(destinationFilePath); | |
for(int i=0; i<sourcesFilePath.size(); i++) { | |
OutputStream finalOutputStream = outputStream; | |
int finalI = i; | |
final Thread margeFileThread = new Thread() { | |
@Override | |
public void run() { | |
try { | |
ioCopier.appendFile(finalOutputStream, sourcesFilePath.get(finalI)); | |
} catch (Exception ex) { | |
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex)); | |
} | |
} | |
}; | |
margeFileThread.setName("UU-"+i); | |
margeFileThread.start(); | |
} | |
} catch (Exception ex) { | |
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex)); | |
} | |
} | |
private void getMockUpdate(ByteArrayOutputStream byteArrayOutputStream) throws Exception { | |
for (int i=0; i<1000; i++) { | |
String device = Thread.currentThread().getName()+"::"+UUID.randomUUID().toString()+"\n"; | |
System.out.print("Device With Thread :- " + device); | |
byteArrayOutputStream.write(device.getBytes()); | |
} | |
} | |
// thread can be create with (extend thread|impalement thread) | |
public static void main(String args[]) { | |
ThreadExample threadExample = new ThreadExample(); | |
try { | |
//threadExample.ftpFileUnderThread(); | |
//System.out.println("File Create Process Start"); | |
//threadExample.localFileUnderThread(); | |
//System.out.println("Marge Process Start"); | |
//threadExample.margeLocalFileUnderThread(); | |
} catch (Exception ex) { | |
System.err.println("Error :- "+ExceptionUtil.getRootCauseMessage(ex)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment