Skip to content

Instantly share code, notes, and snippets.

@rhodey
Created January 16, 2020 04:24
Show Gist options
  • Save rhodey/fdb3570ef8216967762f1d63baf21f4e to your computer and use it in GitHub Desktop.
Save rhodey/fdb3570ef8216967762f1d63baf21f4e to your computer and use it in GitHub Desktop.
/*
* Copyright (C) 2016 An Honest Effort LLC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.anhonesteffort.acap25;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.anhonesteffort.acap25.capture.DiBitChannel;
import org.anhonesteffort.acap25.chnlzr.ChnlzrHostsController;
import org.anhonesteffort.acap25.chnlzr.ProtocolErrorException;
import org.anhonesteffort.acap25.chnlzr.netty.SamplingHandler;
import org.anhonesteffort.acap25.capture.CaptureStats;
import org.anhonesteffort.acap25.capture.ControlChannelFollower;
import org.anhonesteffort.acap25.capture.DiBitCaptureChannel;
import org.anhonesteffort.p25.protocol.DataUnitFramer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import static org.anhonesteffort.acap25.Util.string;
import static org.anhonesteffort.proto.p25.ProtoP25.P25ChannelId;
import static org.anhonesteffort.proto.p25.ProtoP25.P25ChannelId.Type;
/**
 * Starts and tracks channel captures, keeping at most one in-flight capture per
 * channel frequency.
 *
 * <p>Each capture is represented by a {@link CompletableFuture} stored in
 * {@link #futures} until it completes. Completing {@link #getShutdown() shutdown}
 * (normally or exceptionally) cancels every capture still tracked.
 *
 * <p>Control channel captures are resubmitted when they complete without error
 * and complete {@code shutdown} exceptionally when they fail; traffic channel
 * failures are only logged and counted.
 */
public class ChannelMonitor {

  private static final Logger log = LoggerFactory.getLogger(ChannelMonitor.class);

  // Locations of the node runtime and the capture scripts it executes.
  private static final String NODE_BIN       = "/home/rhodey/.nvm/versions/node/v8.9.3/bin/node";
  private static final String RADIO_DIR      = "/home/rhodey/dev/dsp/js/acap25-radio";
  private static final String CONTROL_SCRIPT = "/home/rhodey/dev/dsp/js/acap25-radio/radio-control.js";
  private static final String GROUP_SCRIPT   = "/home/rhodey/dev/dsp/js/acap25-radio/radio-group.js";

  private final Metrics metrics = new Metrics();
  /** In-flight captures keyed by {@link #hashcode(P25ChannelId)}. */
  private final Map<Integer, CompletableFuture<CaptureStats>> futures = new ConcurrentHashMap<>();
  @Getter private final CompletableFuture<Void> shutdown = new CompletableFuture<>();
  private final ChnlzrHostsController chnlzrs;
  private final DecoderConfig config;

  /**
   * @param chnlzrs used to request a sampler for each captured channel
   * @param config  supplies the control PDU rate handed to the control channel follower
   */
  public ChannelMonitor(ChnlzrHostsController chnlzrs, DecoderConfig config) {
    this.chnlzrs = chnlzrs;
    this.config = config;
    // Once shutdown completes for any reason, cancel every capture still tracked.
    shutdown.whenComplete((ok, err) -> futures.values().forEach(future -> future.cancel(true)));
  }

  /**
   * Map key for a channel: its frequency truncated to an int. Only the
   * frequency takes part, so two ids on the same frequency share a key.
   */
  private static int hashcode(P25ChannelId id) {
    // Integer#hashCode() is the int value itself, so no boxing is needed.
    return (int) id.getFrequency();
  }

  private static boolean isCancelErr(Throwable err) {
    return err instanceof CancellationException;
  }

  private static boolean isProtoErr(Throwable err) {
    return err instanceof ProtocolErrorException;
  }

  private static boolean isCtrl(P25ChannelId channel) {
    return channel.getType() == Type.CONTROL;
  }

  /**
   * Begins capturing the given channel.
   *
   * @param id channel to capture
   * @return a future for the capture; it is already completed exceptionally with a
   *         {@link CancellationException} when a capture with the same key is in
   *         flight or when shutdown has completed
   */
  public CompletableFuture<CaptureStats> capture(P25ChannelId id) {
    final Integer key = hashcode(id);
    final CompletableFuture<CaptureStats> future = new CompletableFuture<>();

    // Register first, then look at shutdown: a shutdown completing after this
    // check will find the future in the map and cancel it.
    final boolean duplicate = futures.putIfAbsent(key, future) != null;
    if (duplicate || shutdown.isDone()) {
      if (!duplicate) {
        // We own the mapping but will never register the cleanup callback
        // below, so drop it here instead of leaving a dead entry behind.
        futures.remove(key, future);
      }
      future.completeExceptionally(new CancellationException("duplicate channel or shutdown"));
      return future;
    }

    future.whenComplete((stats, err) -> onCaptureComplete(id, key, future, stats, err));
    chnlzrs.createSamplerFor(id)
           .whenComplete(new ChnlzrCallback(id, future));
    return future;
  }

  /** Untracks a finished capture, then logs, records metrics, resubmits or shuts down. */
  private void onCaptureComplete(
      P25ChannelId id, Integer key, CompletableFuture<CaptureStats> future, CaptureStats stats, Throwable err
  ) {
    // Two-argument remove: only ever evict this capture's own mapping.
    futures.remove(key, future);
    if (shutdown.isDone() || isCancelErr(err)) {
      return;
    }

    if (isCtrl(id)) {
      if (err != null) {
        log.error("{} -> channel capture failed with error, shutting down", string(id), err);
        shutdown.completeExceptionally(err);
      } else {
        metrics.captureControlComplete(stats);
        log.info("{} -> channel capture timed out with '{}', resubmitting", string(id), stats);
        capture(id);
      }
    } else if (err != null) {
      metrics.captureTrafficError();
      log.error("{} -> channel capture failed with error", string(id), err);
      // todo: traffic channel capture failed with error, maybe shutdown?
    } else if (stats != null) {
      metrics.captureTrafficComplete(stats);
      log.info("{} -> channel capture completed with '{}'", string(id), stats);
    }
    // A traffic capture completed with null stats (abandoned request) is ignored.
  }

  /**
   * Receives the result of a sampler request and, on success, wires the sampler
   * into a capture subprocess.
   */
  @AllArgsConstructor
  private final class ChnlzrCallback implements BiConsumer<SamplingHandler, Throwable> {

    private final P25ChannelId id;
    private final CompletableFuture<CaptureStats> future;

    /** Command line of the capture subprocess for this channel. */
    private String[] commands() {
      final String frequency = Integer.toString((int) id.getFrequency());
      if (isCtrl(id)) {
        return new String[] {
            NODE_BIN,
            CONTROL_SCRIPT,
            "--frequency", frequency
        };
      }
      return new String[] {
          NODE_BIN,
          GROUP_SCRIPT,
          "--frequency", frequency,
          "--source", Integer.toString(id.getSourceId()),
          "--group", Integer.toString(id.getGroupId())
      };
    }

    /**
     * Starts the capture subprocess.
     *
     * @throws IOException if the process cannot be started
     */
    private Process process() throws IOException {
      return new ProcessBuilder(commands())
          .directory(new File(RADIO_DIR))
          // Nothing reads the child's stderr pipe; inherit it so the child
          // cannot block once the pipe buffer fills up.
          .redirectError(ProcessBuilder.Redirect.INHERIT)
          .start();
    }

    /**
     * @param sampling the sampler, when the request succeeded
     * @param err      the request failure, or {@code null} on success
     */
    @Override
    public void accept(SamplingHandler sampling, Throwable err) {
      // NOTE(review): err is tested with instanceof as-is; if createSamplerFor can
      // deliver a wrapping CompletionException the protocol-error branches are missed.
      if (err != null && !isProtoErr(err)) {
        log.error("{} -> channel request failed with error, shutting down", string(id), err);
        shutdown.completeExceptionally(err);
      } else if (err != null && isCtrl(id)) {
        log.error("{} -> channel request abandoned, shutting down", string(id));
        shutdown.completeExceptionally(err);
      } else if (err != null) {
        log.warn("{} -> channel request abandoned", string(id));
        future.complete(null);
      } else {
        beginCapture(sampling);
      }
    }

    /** Links the sampler and capture lifecycles and builds the sink chain. */
    private void beginCapture(SamplingHandler sampling) {
      metrics.captureChannelCount(futures.size());
      log.info("{} -> channel capture beginning", string(id));

      // The sampler ending, for any reason, fails the capture.
      sampling.getShutdown().whenComplete((ok, samplerErr) -> {
        if (samplerErr != null) {
          future.completeExceptionally(samplerErr);
        } else {
          future.completeExceptionally(new IllegalStateException(String.format(
              "%s -> source future completed unexpectedly", string(id)
          )));
        }
      });
      // The capture ending, for any reason, completes the sampler's shutdown future.
      future.whenComplete((ok, captureErr) -> sampling.getShutdown().complete(null));

      try {
        DiBitChannel bits = new DiBitChannel();
        DiBitCaptureChannel capture = new DiBitCaptureChannel(future, process());

        if (isCtrl(id)) {
          DataUnitFramer framer = new DataUnitFramer(Optional.empty());
          ControlChannelFollower follower = new ControlChannelFollower(
              future, id, ChannelMonitor.this, config.getControlPduRate()
          );
          future.whenComplete((ok, captureErr) -> follower.close());
          framer.addSink(follower);
          capture.addSink(framer);
        }

        bits.addSink(capture);
        sampling.setSink(bits);
      } catch (IOException startErr) {
        future.completeExceptionally(startErr);
      }
    }
  }
}
/*
* Copyright (C) 2016 An Honest Effort LLC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.anhonesteffort.acap25.capture;
import org.anhonesteffort.dsp.AbstractSource;
import org.anhonesteffort.dsp.Sink;
import org.anhonesteffort.p25.protocol.DiBit;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
/**
 * Sink that packs incoming dibits into bytes, writes them to a subprocess'
 * standard input and rebroadcasts each dibit to its own sinks.
 *
 * <p>Before each dibit is written the subprocess' standard output is polled;
 * a line of three comma separated longs is parsed into a {@link CaptureStats}
 * and completes the shutdown future. Any failure completes the shutdown future
 * exceptionally. The subprocess is destroyed once the shutdown future completes.
 */
public class DiBitCaptureChannel extends AbstractSource<DiBit, Sink<DiBit>> implements Sink<DiBit> {

  private static final int DIBITS_PER_BYTE = 4;
  private static final int STATS_FIELDS    = 3;

  private final CompletableFuture<CaptureStats> shutdown;
  private final OutputStream subin;
  private final BufferedReader subout;
  private int count, bits = 0;

  /*
  TODO:
  create node subprocess w/ P25ChannelId passed through arguments.
  teach node subprocess to keep up with P25ChannelId changes like GroupChannelCapture does.
  node can ask two things:
  1. close the channel due to frame timeout
  2. spawn new traffic channel based off trunking frames
  */

  /**
   * @param shutdown completed with the stats reported by the subprocess, or
   *                 exceptionally on failure; its completion destroys the process
   * @param process  the already started subprocess to feed dibits to
   */
  public DiBitCaptureChannel(CompletableFuture<CaptureStats> shutdown, Process process) {
    this.shutdown = shutdown;
    subin = process.getOutputStream();
    // Explicit charset rather than the platform default.
    subout = new BufferedReader(new InputStreamReader(
        process.getInputStream(), java.nio.charset.StandardCharsets.UTF_8
    ));
    shutdown.whenComplete((ok, err) -> process.destroy());
  }

  /**
   * Polls the subprocess output for a stats line.
   *
   * @return the parsed stats, or {@code null} when no output is ready
   * @throws IOException           if reading from the subprocess fails
   * @throws IllegalStateException if the line is missing or is not three comma
   *                               separated longs
   */
  private CaptureStats readStats() throws IOException {
    if (!subout.ready()) {
      return null;
    }

    final String stats = subout.readLine();
    // Split once; a null line is handled as having no fields at all.
    final String[] fields = (stats == null) ? new String[0] : stats.split(",");
    if (fields.length != STATS_FIELDS) {
      throw new IllegalStateException(String.format("failed to read stats from subprocess output: %s", stats));
    }

    try {
      return new CaptureStats(
          Long.parseLong(fields[0]),
          Long.parseLong(fields[1]),
          Long.parseLong(fields[2])
      );
    } catch (NumberFormatException err) {
      // Keep the parse failure as the cause so the offending field stays visible.
      throw new IllegalStateException(
          String.format("failed to read stats from subprocess output: %s", stats), err
      );
    }
  }

  /**
   * Shifts the dibit into the pending byte and, once four dibits have been
   * collected, writes and flushes that byte to the subprocess.
   *
   * @throws IOException if writing to the subprocess fails
   */
  private void write(DiBit dbit) throws IOException {
    bits = ((bits << 2) & 0xFF) + dbit.getValue();
    if (++count == DIBITS_PER_BYTE) {
      subin.write(bits);
      subin.flush(); // todo: rate limit
      count = bits = 0;
    }
  }

  /**
   * Completes the shutdown future when the subprocess has reported stats;
   * otherwise forwards the dibit to the subprocess and to this source's sinks.
   * Nothing is thrown to the caller: any failure completes the shutdown future
   * exceptionally instead.
   */
  @Override
  public void consume(DiBit dbit) {
    try {
      final CaptureStats stats = readStats();
      if (stats != null) {
        shutdown.complete(stats);
      } else {
        write(dbit);
        broadcast(dbit);
      }
    } catch (Throwable err) {
      // Deliberately broad: every failure is routed into the shutdown future.
      shutdown.completeExceptionally(err);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment