Skip to content

Instantly share code, notes, and snippets.

@biningo
Last active March 17, 2024 07:54
Show Gist options
  • Save biningo/c2478c0166f33b4bd38a96f75c5ce63f to your computer and use it in GitHub Desktop.
Save biningo/c2478c0166f33b4bd38a96f75c5ce63f to your computer and use it in GitHub Desktop.
生产者-消费者模式 在文件IO中的应用
public class FileSystemDispatcher {
private final AtomicBoolean running = new AtomicBoolean(false);
public final int threadNums;
public String dirPath;
public final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public ThreadPoolExecutor writeThreads;
public Map<String, BufferedOutputStream> fileWriters = new ConcurrentHashMap<>();
public FileSystemDispatcher(String dirPath, int threadNums) {
this.dirPath = dirPath;
this.threadNums = threadNums;
this.writeThreads = new ThreadPoolExecutor(
this.threadNums, this.threadNums,
0, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("dispatcher-write-%d").build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
public void start() {
if (!running.compareAndSet(false, true)) {
return;
}
for (int i = 0; i < threadNums; i++) {
writeThreads.execute(() -> {
while (running.get() || !queue.isEmpty()) {
String line = null;
try {
line = queue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
if (StringUtils.isEmpty(line)) {
continue;
}
doWrite(line);
}
});
}
}
public void stop() {
if (!running.compareAndSet(true, false)) {
return;
}
writeThreads.shutdown();
while (!queue.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
System.out.println("finished");
fileWriters.forEach((key, output) -> {
try {
output.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
public void dispatch(String line) {
try {
if (StringUtils.isEmpty(line)) {
return;
}
queue.put(line);
} catch (InterruptedException ignored) {
}
}
private void doWrite(String line) {
String filename = getFileName();
BufferedOutputStream writer = null;
try {
writer = getWriter(filename);
writer.write((StringUtils.strip(line) + "\r\n").getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private synchronized BufferedOutputStream getWriter(String key) throws IOException {
Path path = Paths.get(dirPath, key);
if (Files.notExists(path)) {
Files.createFile(path);
}
if (!fileWriters.containsKey(key)) {
fileWriters.put(key, new BufferedOutputStream(new FileOutputStream(path.toFile(), true)));
}
return fileWriters.get(key);
}
private String getFileName() {
return Thread.currentThread().getName();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment