Created
January 16, 2020 04:24
-
-
Save rhodey/fdb3570ef8216967762f1d63baf21f4e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (C) 2016 An Honest Effort LLC. | |
* | |
* This program is free software: you can redistribute it and/or modify | |
* it under the terms of the GNU General Public License as published by | |
* the Free Software Foundation, either version 3 of the License, or | |
* (at your option) any later version. | |
* | |
* This program is distributed in the hope that it will be useful, | |
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
* GNU General Public License for more details. | |
* | |
* You should have received a copy of the GNU General Public License | |
* along with this program. If not, see <http://www.gnu.org/licenses/>. | |
*/ | |
package org.anhonesteffort.acap25; | |
import lombok.AllArgsConstructor; | |
import lombok.Getter; | |
import org.anhonesteffort.acap25.capture.DiBitChannel; | |
import org.anhonesteffort.acap25.chnlzr.ChnlzrHostsController; | |
import org.anhonesteffort.acap25.chnlzr.ProtocolErrorException; | |
import org.anhonesteffort.acap25.chnlzr.netty.SamplingHandler; | |
import org.anhonesteffort.acap25.capture.CaptureStats; | |
import org.anhonesteffort.acap25.capture.ControlChannelFollower; | |
import org.anhonesteffort.acap25.capture.DiBitCaptureChannel; | |
import org.anhonesteffort.p25.protocol.DataUnitFramer; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.File; | |
import java.io.IOException; | |
import java.util.Map; | |
import java.util.Optional; | |
import java.util.concurrent.CancellationException; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.function.BiConsumer; | |
import static org.anhonesteffort.acap25.Util.string; | |
import static org.anhonesteffort.proto.p25.ProtoP25.P25ChannelId; | |
import static org.anhonesteffort.proto.p25.ProtoP25.P25ChannelId.Type; | |
public class ChannelMonitor { | |
private static final Logger log = LoggerFactory.getLogger(ChannelMonitor.class); | |
private final Metrics metrics = new Metrics(); | |
private final Map<Integer, CompletableFuture<CaptureStats>> futures = new ConcurrentHashMap<>(); | |
@Getter private final CompletableFuture<Void> shutdown = new CompletableFuture<>(); | |
private final ChnlzrHostsController chnlzrs; | |
private final DecoderConfig config; | |
public ChannelMonitor(ChnlzrHostsController chnlzrs, DecoderConfig config) { | |
this.chnlzrs = chnlzrs; | |
this.config = config; | |
shutdown.whenComplete((ok, err) -> futures.values().forEach(future -> future.cancel(true))); | |
} | |
private static int hashcode(P25ChannelId id) { | |
return Integer.valueOf((int) id.getFrequency()).hashCode(); | |
} | |
private static boolean isCancelErr(Throwable err) { | |
return err instanceof CancellationException; | |
} | |
private static boolean isProtoErr(Throwable err) { | |
return err instanceof ProtocolErrorException; | |
} | |
private static boolean isCtrl(P25ChannelId channel) { | |
return channel.getType() == Type.CONTROL; | |
} | |
public CompletableFuture<CaptureStats> capture(P25ChannelId id) { | |
CompletableFuture<CaptureStats> future = new CompletableFuture<>(); | |
if (futures.putIfAbsent(hashcode(id), future) != null || shutdown.isDone()) { | |
future.completeExceptionally(new CancellationException("duplicate channel or shutdown")); | |
return future; | |
} | |
future.whenComplete((ok, err) -> { | |
futures.remove(hashcode(id)); | |
if (shutdown.isDone() || isCancelErr(err)) { return; } | |
if (isCtrl(id) && err != null) { | |
log.error("{} -> channel capture failed with error, shutting down", string(id), err); | |
shutdown.completeExceptionally(err); | |
} else if (isCtrl(id)) { | |
metrics.captureControlComplete(ok); | |
log.info("{} -> channel capture timed out with '{}', resubmitting", string(id), ok); | |
capture(id); | |
} else if (!isCtrl(id) && err != null) { | |
metrics.captureTrafficError(); | |
log.error("{} -> channel capture failed with error", string(id), err); | |
// todo: traffic channel capture failed with error, maybe shutdown? | |
} else if (!isCtrl(id) && ok != null) { | |
metrics.captureTrafficComplete(ok); | |
log.info("{} -> channel capture completed with '{}'", string(id), ok); | |
} | |
}); | |
chnlzrs.createSamplerFor(id) | |
.whenComplete(new ChnlzrCallback(id, future)); | |
return future; | |
} | |
@AllArgsConstructor | |
private final class ChnlzrCallback implements BiConsumer<SamplingHandler, Throwable> { | |
private final P25ChannelId id; | |
private final CompletableFuture<CaptureStats> future; | |
private String[] commands() { | |
if (isCtrl(id)) { | |
return new String[] { | |
"/home/rhodey/.nvm/versions/node/v8.9.3/bin/node", | |
"/home/rhodey/dev/dsp/js/acap25-radio/radio-control.js", | |
"--frequency", Integer.toString((int) id.getFrequency()) | |
}; | |
} else { | |
return new String[] { | |
"/home/rhodey/.nvm/versions/node/v8.9.3/bin/node", | |
"/home/rhodey/dev/dsp/js/acap25-radio/radio-group.js", | |
"--frequency", Integer.toString((int) id.getFrequency()), | |
"--source", Integer.toString(id.getSourceId()), | |
"--group", Integer.toString(id.getGroupId()) | |
}; | |
} | |
} | |
private Process process() throws IOException { | |
return new ProcessBuilder(commands()) | |
.directory(new File("/home/rhodey/dev/dsp/js/acap25-radio")) | |
.start(); | |
} | |
@Override | |
public void accept(SamplingHandler sampling, Throwable err) { | |
if (err != null && !isProtoErr(err)) { | |
log.error("{} -> channel request failed with error, shutting down", string(id), err); | |
shutdown.completeExceptionally(err); | |
} else if (err != null && isCtrl(id)) { | |
log.error("{} -> channel request abandoned, shutting down", string(id)); | |
shutdown.completeExceptionally(err); | |
} else if (err != null) { | |
log.warn("{} -> channel request abandoned", string(id)); | |
future.complete(null); | |
} else { | |
metrics.captureChannelCount(futures.size()); | |
log.info("{} -> channel capture beginning", string(id)); | |
sampling.getShutdown().whenComplete((ok, errr) -> { | |
if (errr != null) { | |
future.completeExceptionally(errr); | |
} else { | |
future.completeExceptionally(new IllegalStateException(String.format( | |
"%s -> source future completed unexpectedly", id | |
))); | |
} | |
}); | |
future.whenComplete((ok, errr) -> sampling.getShutdown().complete(null)); | |
try { | |
DiBitChannel bits = new DiBitChannel(); | |
DiBitCaptureChannel capture = new DiBitCaptureChannel(future, process()); | |
if (isCtrl(id)) { | |
DataUnitFramer framer = new DataUnitFramer(Optional.empty()); | |
ControlChannelFollower follower = new ControlChannelFollower(future, id, ChannelMonitor.this, config.getControlPduRate()); | |
future.whenComplete((ok, errr) -> follower.close()); | |
framer.addSink(follower); | |
capture.addSink(framer); | |
} | |
bits.addSink(capture); | |
sampling.setSink(bits); | |
} catch (IOException errr) { | |
future.completeExceptionally(errr); | |
} | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (C) 2016 An Honest Effort LLC. | |
* | |
* This program is free software: you can redistribute it and/or modify | |
* it under the terms of the GNU General Public License as published by | |
* the Free Software Foundation, either version 3 of the License, or | |
* (at your option) any later version. | |
* | |
* This program is distributed in the hope that it will be useful, | |
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
* GNU General Public License for more details. | |
* | |
* You should have received a copy of the GNU General Public License | |
* along with this program. If not, see <http://www.gnu.org/licenses/>. | |
*/ | |
package org.anhonesteffort.acap25.capture; | |
import org.anhonesteffort.dsp.AbstractSource; | |
import org.anhonesteffort.dsp.Sink; | |
import org.anhonesteffort.p25.protocol.DiBit; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.io.OutputStream; | |
import java.util.concurrent.CompletableFuture; | |
public class DiBitCaptureChannel extends AbstractSource<DiBit, Sink<DiBit>> implements Sink<DiBit> { | |
private final CompletableFuture<CaptureStats> shutdown; | |
private final OutputStream subin; | |
private final BufferedReader subout; | |
private int count, bits = 0; | |
/* | |
TODO: | |
create node subprocess w/ P25ChannelId passed through arguments. | |
teach node subprocess to keep up with P25ChannelId changes like GroupChannelCapture does. | |
node can ask two things: | |
1. close the channel due to frame timeout | |
2. spawn new traffic channel based off trunking frames | |
*/ | |
public DiBitCaptureChannel(CompletableFuture<CaptureStats> shutdown, Process process) { | |
this.shutdown = shutdown; | |
subin = process.getOutputStream(); | |
subout = new BufferedReader(new InputStreamReader(process.getInputStream())); | |
shutdown.whenComplete((ok, err) -> process.destroy()); | |
} | |
private CaptureStats readStats() throws IOException { | |
if (!subout.ready()) { | |
return null; | |
} else { | |
String stats = subout.readLine(); | |
if (stats == null || stats.split(",").length != 3) { | |
throw new IllegalStateException(String.format("failed to read stats from subprocess output: %s", stats)); | |
} else { try { | |
return new CaptureStats( | |
Long.parseLong(stats.split(",")[0]), | |
Long.parseLong(stats.split(",")[1]), | |
Long.parseLong(stats.split(",")[2]) | |
); | |
} catch (NumberFormatException err) { | |
throw new IllegalStateException(String.format("failed to read stats from subprocess output: %s", stats)); | |
} } | |
} | |
} | |
private void write(DiBit dbit) throws IOException { | |
bits = ((bits << 2) & 0xFF) + dbit.getValue(); | |
if (++count == 4) { | |
subin.write(bits); | |
subin.flush(); // todo: rate limit | |
count = bits = 0; | |
} | |
} | |
@Override | |
public void consume(DiBit dbit) { | |
try { | |
CaptureStats stats = readStats(); | |
if (stats != null) { | |
shutdown.complete(stats); | |
} else { | |
write(dbit); | |
broadcast(dbit); | |
} | |
} catch (Throwable err) { | |
shutdown.completeExceptionally(err); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment