Last active
March 17, 2024 07:54
-
-
Save biningo/c2478c0166f33b4bd38a96f75c5ce63f to your computer and use it in GitHub Desktop.
生产者-消费者模式 在文件IO中的应用
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class FileSystemDispatcher { | |
private final AtomicBoolean running = new AtomicBoolean(false); | |
public final int threadNums; | |
public String dirPath; | |
public final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); | |
public ThreadPoolExecutor writeThreads; | |
public Map<String, BufferedOutputStream> fileWriters = new ConcurrentHashMap<>(); | |
public FileSystemDispatcher(String dirPath, int threadNums) { | |
this.dirPath = dirPath; | |
this.threadNums = threadNums; | |
this.writeThreads = new ThreadPoolExecutor( | |
this.threadNums, this.threadNums, | |
0, TimeUnit.SECONDS, | |
new SynchronousQueue<>(), | |
new ThreadFactoryBuilder().setNameFormat("dispatcher-write-%d").build(), | |
new ThreadPoolExecutor.AbortPolicy() | |
); | |
} | |
public void start() { | |
if (!running.compareAndSet(false, true)) { | |
return; | |
} | |
for (int i = 0; i < threadNums; i++) { | |
writeThreads.execute(() -> { | |
while (running.get() || !queue.isEmpty()) { | |
String line = null; | |
try { | |
line = queue.poll(100, TimeUnit.MILLISECONDS); | |
} catch (InterruptedException ignored) { | |
} | |
if (StringUtils.isEmpty(line)) { | |
continue; | |
} | |
doWrite(line); | |
} | |
}); | |
} | |
} | |
public void stop() { | |
if (!running.compareAndSet(true, false)) { | |
return; | |
} | |
writeThreads.shutdown(); | |
while (!queue.isEmpty()) { | |
try { | |
Thread.sleep(100); | |
} catch (InterruptedException ignored) { | |
} | |
} | |
System.out.println("finished"); | |
fileWriters.forEach((key, output) -> { | |
try { | |
output.close(); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
}); | |
} | |
public void dispatch(String line) { | |
try { | |
if (StringUtils.isEmpty(line)) { | |
return; | |
} | |
queue.put(line); | |
} catch (InterruptedException ignored) { | |
} | |
} | |
private void doWrite(String line) { | |
String filename = getFileName(); | |
BufferedOutputStream writer = null; | |
try { | |
writer = getWriter(filename); | |
writer.write((StringUtils.strip(line) + "\r\n").getBytes(StandardCharsets.UTF_8)); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private synchronized BufferedOutputStream getWriter(String key) throws IOException { | |
Path path = Paths.get(dirPath, key); | |
if (Files.notExists(path)) { | |
Files.createFile(path); | |
} | |
if (!fileWriters.containsKey(key)) { | |
fileWriters.put(key, new BufferedOutputStream(new FileOutputStream(path.toFile(), true))); | |
} | |
return fileWriters.get(key); | |
} | |
private String getFileName() { | |
return Thread.currentThread().getName(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment