Skip to content

Instantly share code, notes, and snippets.

@cogmission
Last active September 29, 2016 06:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cogmission/2f6c9c82280ae81a12a4037ee313a781 to your computer and use it in GitHub Desktop.
Save cogmission/2f6c9c82280ae81a12a4037ee313a781 to your computer and use it in GitHub Desktop.
htmjava_detector and HTMModel.java
# ----------------------------------------------------------------------
# Copyright (C) 2016, Numenta, Inc. Unless you have an agreement
# with Numenta, Inc., for a separate license for this software code, the
# following terms and conditions apply:
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Affero Public License for more details.
#
# You should have received a copy of the GNU Affero Public License
# along with this program. If not, see http://www.gnu.org/licenses.
#
# http://numenta.org/licenses/
# ----------------------------------------------------------------------
import math
import simplejson as json
from subprocess import Popen, PIPE
from nupic.algorithms import anomaly_likelihood
from nupic.frameworks.opf.common_models.cluster_params import (
getScalarMetricWithTimeOfDayAnomalyParams)
from nab.detectors.base import AnomalyDetector
class HtmjavaDetector(AnomalyDetector):
    """
    Inspired by the 'NumentaDetector' replacing the 'OPF CLAModel' with a
    java subprocess running 'htm.java' model.

    The detector talks to the Java process over its stdin/stdout pipes in
    lock-step: one "timestamp,value" line is written per record and one line
    holding the raw anomaly score is read back.
    """

    def __init__(self, *args, **kwargs):
        super(HtmjavaDetector, self).__init__(*args, **kwargs)

        # Popen handle of the running htm.java process (set by run()).
        self.model = None
        # Encoder parameters of the "value" field (set by
        # _setupEncoderParams()).
        self.sensorParams = None
        # OPF "modelConfig" dict passed to the Java process as JSON (set by
        # initialize()).
        self.modelParams = None
        # AnomalyLikelihood instance (set by initialize()).
        self.anomalyLikelihood = None

    def getAdditionalHeaders(self):
        """Returns a list of strings."""
        return ["raw_score"]

    def handleRecord(self, inputData):
        """Returns a tuple (anomalyScore, rawScore).

        Internally to NuPIC "anomalyScore" corresponds to "likelihood_score"
        and "rawScore" corresponds to "anomaly_score". Sorry about that.

        Raises ValueError if the Java process returns no score (for instance
        because it exited) or returns something that is not a number.
        """
        # Send input to HTM Java detector. The record is a single string, so
        # use write() rather than writelines(), and flush so the record
        # reaches the subprocess before we block waiting for its answer.
        line = "{0},{1}\n".format(inputData["timestamp"], inputData["value"])
        self.model.stdin.write(line)
        self.model.stdin.flush()

        # Retrieve the anomaly score
        result = self.model.stdout.readline()
        if not result.strip():
            # readline() returns an empty string once the pipe is closed.
            raise ValueError(
                "htm.java process returned no anomaly score; "
                "it may have exited unexpectedly")
        rawScore = float(result)

        # Compute log(anomaly likelihood)
        anomalyScore = self.anomalyLikelihood.anomalyProbability(
            inputData["value"], rawScore, inputData["timestamp"])
        logScore = self.anomalyLikelihood.computeLogLikelihood(anomalyScore)

        return (logScore, rawScore)

    def initialize(self):
        """Build the model parameters and the anomaly likelihood helper.

        Reads self.inputMin, self.inputMax and self.probationaryPeriod
        (presumably provided by the AnomalyDetector base class -- confirm).
        """
        # Get config params, setting the RDSE resolution. The value range is
        # widened by 20% of its span on each side.
        rangePadding = abs(self.inputMax - self.inputMin) * 0.2
        self.modelParams = getScalarMetricWithTimeOfDayAnomalyParams(
            metricData=[0],
            minVal=self.inputMin - rangePadding,
            maxVal=self.inputMax + rangePadding,
            minResolution=0.001,
            tmImplementation="tm_cpp"
        )["modelConfig"]

        self._setupEncoderParams(
            self.modelParams["modelParams"]["sensorParams"]["encoders"])

        # Initialize the anomaly likelihood object. The first half of the
        # probationary period is used as the learning period, the remainder
        # as estimation samples.
        numentaLearningPeriod = math.floor(self.probationaryPeriod / 2.0)
        self.anomalyLikelihood = anomaly_likelihood.AnomalyLikelihood(
            claLearningPeriod=numentaLearningPeriod,
            estimationSamples=self.probationaryPeriod - numentaLearningPeriod,
            reestimationPeriod=100
        )

    def _stopModel(self):
        """
        Stop HTM Java model process.

        Does nothing when no process is running.
        """
        if self.model:
            self.model.terminate()
            # Reap the child so it does not linger as a zombie process.
            self.model.wait()
            self.model = None

    def run(self):
        """Run the detector with a dedicated htm.java subprocess.

        Returns whatever the base class run() returns. The subprocess is
        stopped even when the base class run() raises.
        """
        # Launch HTM Java detector per process passing OPF model parameters
        self.model = Popen(["java", "-jar",
                            "./nab/detectors/htmjava/build/libs/htm.java-nab.jar",
                            json.dumps(self.modelParams)],
                           stdin=PIPE, stdout=PIPE)
        try:
            response = super(HtmjavaDetector, self).run()

            # An empty line tells the Java side that the input is finished.
            self.model.stdin.write("\n")
            self.model.stdin.flush()
        finally:
            # Terminate HTM Java
            self._stopModel()

        return response

    def _setupEncoderParams(self, encoderParams):
        """Rename the encoder entries in place to the NAB field names.

        Also stores the "value" encoder parameters in self.sensorParams.
        """
        # The encoder must expect the NAB-specific datafile headers
        encoderParams["timestamp_dayOfWeek"] = encoderParams.pop("c0_dayOfWeek")
        encoderParams["timestamp_timeOfDay"] = encoderParams.pop("c0_timeOfDay")
        encoderParams["timestamp_timeOfDay"]["fieldname"] = "timestamp"
        encoderParams["timestamp_timeOfDay"]["name"] = "timestamp"
        encoderParams["timestamp_weekend"] = encoderParams.pop("c0_weekend")
        encoderParams["value"] = encoderParams.pop("c1")
        encoderParams["value"]["fieldname"] = "value"
        encoderParams["value"]["name"] = "value"

        self.sensorParams = encoderParams["value"]
package nab.detectors.htmjava;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.joda.time.DateTimeZone;
import org.numenta.nupic.Connections;
import org.numenta.nupic.Parameters;
import org.numenta.nupic.Parameters.KEY;
import org.numenta.nupic.algorithms.Anomaly;
import org.numenta.nupic.algorithms.SpatialPooler;
import org.numenta.nupic.algorithms.TemporalMemory;
import org.numenta.nupic.network.Layer;
import org.numenta.nupic.network.Network;
import org.numenta.nupic.network.PublisherSupplier;
import org.numenta.nupic.network.Region;
import org.numenta.nupic.network.sensor.ObservableSensor;
import org.numenta.nupic.network.sensor.Publisher;
import org.numenta.nupic.network.sensor.Sensor;
import org.numenta.nupic.network.sensor.SensorParams;
import org.numenta.nupic.util.Tuple;
import org.numenta.nupic.util.UniversalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
/**
 * htm.java network wrapper used as a NAB anomaly detector.
 *
 * The model is configured from OPF model parameters (JSON) and fed with
 * "timestamp,value" lines through a {@link Publisher}. See {@link #main(String[])}
 * for the command line entry point.
 */
public class HTMModel {

    protected static final Logger LOGGER = LoggerFactory.getLogger(HTMModel.class);

    private Network network;
    private PublisherSupplier supplier;

    /**
     * Create HTM Model to be used by NAB
     * @param modelParams OPF Model parameters to get the network parameters from
     */
    public HTMModel(JsonNode modelParams) {
        LOGGER.trace("HTMModel({})", modelParams);

        // Create Sensor publisher to push NAB input data to network
        supplier = PublisherSupplier.builder()
                .addHeader("timestamp,value")
                .addHeader("datetime,float")
                .addHeader("T,B")
                .build();

        // Get updated model parameters
        Parameters parameters = getModelParameters(modelParams);

        // Fixed random generator and seed; this overrides any "seed" value
        // read from the OPF spParams/tpParams.
        parameters.set(KEY.RANDOM, new UniversalRandom(42));//712
        parameters.set(KEY.SEED, 42);

        // Create NAB Network
        network = Network.create("NAB Network", parameters)
                .add(Network.createRegion("NAB Region")
                        .add(Network.createLayer("NAB Layer", parameters)
                                .add(Anomaly.create())
                                .add(new TemporalMemory())
                                .add(new SpatialPooler())
                                .add(Sensor.create(ObservableSensor::create,
                                        SensorParams.create(SensorParams.Keys::obs, "Manual Input", supplier)))));
    }

    /**
     * Update encoders parameters
     * @param modelParams OPF Model parameters to get encoder parameters from
     * @return Updated Encoder parameters suitable for {@link Parameters.KEY#FIELD_ENCODING_MAP}
     */
    public Map<String, Map<String, Object>> getFieldEncodingMap(JsonNode modelParams) {
        Map<String, Map<String, Object>> fieldEncodings = new HashMap<>();
        String fieldName;
        Map<String, Object> fieldMap;
        JsonNode encoders = modelParams.path("encoders");
        LOGGER.trace("getFieldEncodingMap({})", encoders);
        for (JsonNode node : encoders) {
            // Encoders set to null in the OPF parameters are skipped
            if (node.isNull())
                continue;

            // Several encoders may share one field name; they are merged
            // into a single map per field.
            fieldName = node.path("fieldname").textValue();
            fieldMap = fieldEncodings.get(fieldName);
            if (fieldMap == null) {
                fieldMap = new HashMap<>();
                fieldMap.put("fieldName", fieldName);
                fieldEncodings.put(fieldName, fieldMap);
            }
            fieldMap.put("encoderType", node.path("type").textValue());
            if (node.has("timeOfDay")) {
                // "timeOfDay" is read as a two element array: (int, double)
                JsonNode timeOfDay = node.get("timeOfDay");
                fieldMap.put("fieldType", "datetime");
                fieldMap.put(KEY.DATEFIELD_PATTERN.getFieldName(), "YYYY-MM-dd HH:mm:ss");
                fieldMap.put(KEY.DATEFIELD_TOFD.getFieldName(),
                        new Tuple(timeOfDay.get(0).asInt(), timeOfDay.get(1).asDouble()));
            } else {
                fieldMap.put("fieldType", "float");
            }
            if (node.has("resolution")) {
                fieldMap.put("resolution", node.get("resolution").asDouble());
            }
        }
        LOGGER.trace("getFieldEncodingMap => {}", fieldEncodings);
        return fieldEncodings;
    }

    /**
     * Update Spatial Pooler parameters.
     * Starts from the spatial defaults and overrides every value present
     * under "spParams".
     * @param modelParams OPF Model parameters to get spatial pooler parameters from
     * @return Updated Spatial Pooler parameters
     */
    public Parameters getSpatialPoolerParams(JsonNode modelParams) {
        Parameters p = Parameters.getSpatialDefaultParameters();
        JsonNode spParams = modelParams.path("spParams");
        LOGGER.trace("getSpatialPoolerParams({})", spParams);
        if (spParams.has("columnCount")) {
            p.set(KEY.COLUMN_DIMENSIONS, new int[]{spParams.get("columnCount").asInt()});
        }
        if (spParams.has("maxBoost")) {
            p.set(KEY.MAX_BOOST, spParams.get("maxBoost").asDouble());
        }
        if (spParams.has("synPermInactiveDec")) {
            p.set(KEY.SYN_PERM_INACTIVE_DEC, spParams.get("synPermInactiveDec").asDouble());
        }
        if (spParams.has("synPermConnected")) {
            p.set(KEY.SYN_PERM_CONNECTED, spParams.get("synPermConnected").asDouble());
        }
        if (spParams.has("synPermActiveInc")) {
            p.set(KEY.SYN_PERM_ACTIVE_INC, spParams.get("synPermActiveInc").asDouble());
        }
        if (spParams.has("seed")) {
            p.set(KEY.SEED, spParams.get("seed").asInt());
        }
        if (spParams.has("numActiveColumnsPerInhArea")) {
            p.set(KEY.NUM_ACTIVE_COLUMNS_PER_INH_AREA, spParams.get("numActiveColumnsPerInhArea").asDouble());
        }
        if (spParams.has("globalInhibition")) {
            p.set(KEY.GLOBAL_INHIBITION, spParams.get("globalInhibition").asBoolean());
        }
        if (spParams.has("potentialPct")) {
            p.set(KEY.POTENTIAL_PCT, spParams.get("potentialPct").asDouble());
        }
        LOGGER.trace("getSpatialPoolerParams => {}", p);
        return p;
    }

    /**
     * Update Temporal Memory parameters.
     * Starts from the temporal defaults and overrides every value present
     * under "tpParams". The OPF "globalDecay" and "maxAge" values are not
     * mapped.
     * @param modelParams OPF Model parameters to get Temporal Memory parameters from
     * @return Updated Temporal Memory parameters
     */
    public Parameters getTemporalMemoryParams(JsonNode modelParams) {
        Parameters p = Parameters.getTemporalDefaultParameters();
        JsonNode tpParams = modelParams.path("tpParams");
        LOGGER.trace("getTemporalMemoryParams({})", tpParams);
        if (tpParams.has("columnCount")) {
            p.set(KEY.COLUMN_DIMENSIONS, new int[]{tpParams.get("columnCount").asInt()});
        }
        if (tpParams.has("inputWidth")) {
            p.set(KEY.INPUT_DIMENSIONS, new int[]{tpParams.get("inputWidth").asInt()});
        }
        if (tpParams.has("activationThreshold")) {
            p.set(KEY.ACTIVATION_THRESHOLD, tpParams.get("activationThreshold").asInt());
        }
        if (tpParams.has("cellsPerColumn")) {
            p.set(KEY.CELLS_PER_COLUMN, tpParams.get("cellsPerColumn").asInt());
        }
        if (tpParams.has("permanenceInc")) {
            p.set(KEY.PERMANENCE_INCREMENT, tpParams.get("permanenceInc").asDouble());
        }
        if (tpParams.has("minThreshold")) {
            p.set(KEY.MIN_THRESHOLD, tpParams.get("minThreshold").asInt());
        }
        if (tpParams.has("initialPerm")) {
            p.set(KEY.INITIAL_PERMANENCE, tpParams.get("initialPerm").asDouble());
        }
        if (tpParams.has("maxSegmentsPerCell")) {
            p.set(KEY.MAX_SEGMENTS_PER_CELL, tpParams.get("maxSegmentsPerCell").asInt());
        }
        if (tpParams.has("maxSynapsesPerSegment")) {
            p.set(KEY.MAX_SYNAPSES_PER_SEGMENT, tpParams.get("maxSynapsesPerSegment").asInt());
        }
        if (tpParams.has("permanenceDec")) {
            p.set(KEY.PERMANENCE_DECREMENT, tpParams.get("permanenceDec").asDouble());
        }
        if (tpParams.has("predictedSegmentDecrement")) {
            p.set(KEY.PREDICTED_SEGMENT_DECREMENT, tpParams.get("predictedSegmentDecrement").asDouble());
        }
        if (tpParams.has("seed")) {
            p.set(KEY.SEED, tpParams.get("seed").asInt());
        }
        if (tpParams.has("newSynapseCount")) {
            // asInt() for consistency with every other integer parameter
            p.set(KEY.MAX_NEW_SYNAPSE_COUNT, tpParams.get("newSynapseCount").asInt());
        }

        // The potential radius is always set, whatever the OPF parameters say
        p.set(KEY.POTENTIAL_RADIUS, 2048);

        LOGGER.trace("getTemporalMemoryParams => {}", p);
        return p;
    }

    /**
     * Update Sensor parameters
     * @param modelParams OPF Model parameters to get Sensor parameters from
     * @return Updated Sensor parameters
     */
    public Parameters getSensorParams(JsonNode modelParams) {
        JsonNode sensorParams = modelParams.path("sensorParams");
        LOGGER.trace("getSensorParams({})", sensorParams);
        Map<String, Map<String, Object>> fieldEncodings = getFieldEncodingMap(sensorParams);
        Parameters p = Parameters.empty();
        p.set(KEY.CLIP_INPUT, true);
        p.set(KEY.FIELD_ENCODING_MAP, fieldEncodings);

        LOGGER.trace("getSensorParams => {}", p);
        return p;
    }

    /**
     * Update NAB parameters.
     * Combines all default parameters with the spatial pooler, temporal
     * memory and sensor parameters found under "modelParams".
     * @param params OPF parameters to get NAB model parameters from
     * @return Updated Model parameters
     */
    public Parameters getModelParameters(JsonNode params) {
        JsonNode modelParams = params.path("modelParams");
        LOGGER.trace("getModelParameters({})", modelParams);
        Parameters p = Parameters.getAllDefaultParameters()
                .union(getSpatialPoolerParams(modelParams))
                .union(getTemporalMemoryParams(modelParams))
                .union(getSensorParams(modelParams));

        LOGGER.trace("getModelParameters => {}", p);
        return p;
    }

    /**
     * @return The publisher used to push input lines into the network
     */
    public Publisher getPublisher() {
        return supplier.get();
    }

    /**
     * @return The NAB network created by the constructor
     */
    public Network getNetwork() {
        return network;
    }

    /**
     * Log (DEBUG level) how many entries of the active duty cycles array of
     * the "NAB Layer" connections are greater than zero.
     */
    public void showDebugInfo() {
        Region region = network.getHead();
        Layer<?> layer = region.lookup("NAB Layer");
        Connections connections = layer.getConnections();
        double[] cycles = connections.getActiveDutyCycles();
        int spActive = 0;
        for (int i = 0; i < cycles.length; i++) {
            if (cycles[i] > 0) {
                spActive++;
            }
        }
        LOGGER.debug("SP ActiveDutyCycles: {}", spActive);
    }

    /**
     * Log (TRACE level) the print string of the "NAB Layer" connections.
     */
    public void showConfigInfo() {
        Region region = network.getHead();
        Layer<?> layer = region.lookup("NAB Layer");
        Connections connections = layer.getConnections();
        LOGGER.trace("==================== connections.getPrintString() ====================");
        LOGGER.trace(connections.getPrintString());
        LOGGER.trace("======================================================================");
    }

    /**
     * Build the command line parser describing every supported option.
     */
    private static OptionParser createOptionParser() {
        OptionParser parser = new OptionParser();
        parser.nonOptions("OPF parameters object (JSON)");
        parser.acceptsAll(Arrays.asList("p", "params"), "OPF parameters file (JSON).\n(default: first non-option argument)")
                .withOptionalArg()
                .ofType(File.class);
        parser.acceptsAll(Arrays.asList("i", "input"), "Input data file (csv).\n(default: stdin)")
                .withOptionalArg()
                .ofType(File.class);
        parser.acceptsAll(Arrays.asList("o", "output"), "Output results file (csv).\n(default: stdout)")
                .withOptionalArg()
                .ofType(File.class);
        parser.acceptsAll(Arrays.asList("s", "skip"), "Header lines to skip")
                .withOptionalArg()
                .ofType(Integer.class)
                .defaultsTo(0);
        parser.acceptsAll(Arrays.asList("h", "?", "help"), "Help");
        return parser;
    }

    /**
     * Read the OPF parameters from the "-p" file or, when no file was given,
     * from the first non-option argument.
     * @throws IllegalArgumentException if no OPF parameters were given
     */
    private static JsonNode readModelParams(OptionSet options) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        // hasArgument(): the option takes an optional argument, so the flag
        // may be present without a file to read.
        if (options.hasArgument("p")) {
            return mapper.readTree((File) options.valueOf("p"));
        }
        if (options.nonOptionArguments().isEmpty()) {
            throw new IllegalArgumentException("Expecting OPF parameters. See 'help' for more information");
        }
        return mapper.readTree((String) options.nonOptionArguments().get(0));
    }

    /**
     * Open the "-i" input file, defaulting to stdin when no file was given.
     */
    private static InputStream openInput(OptionSet options) throws IOException {
        if (options.hasArgument("i")) {
            return new FileInputStream((File) options.valueOf("i"));
        }
        return System.in;
    }

    /**
     * Open the "-o" output file, defaulting to stdout when no file was given.
     */
    private static PrintStream openOutput(OptionSet options) throws IOException {
        if (options.hasArgument("o")) {
            return new PrintStream((File) options.valueOf("o"));
        }
        return System.out;
    }

    /**
     * Flush the results stream and close it unless it is stdout.
     */
    private static void closeOutput(PrintStream output) {
        output.flush();
        if (output != System.out) {
            output.close();
        }
    }

    /**
     * Launch htm.java NAB detector
     *
     * Usage:
     * As a standalone application (for debug purpose only):
     *
     * java -jar htm.java-nab.jar "{\"modelParams\":{....}}" < nab_data.csv > anomalies.out
     *
     * For complete list of command line options use:
     *
     * java -jar htm.java-nab.jar --help
     *
     * As a NAB detector (see 'htmjava_detector.py'):
     *
     * python run.py --detect --score --normalize -d htmjava
     *
     * Logging options, see "log4j.properties":
     *
     * - "LOGLEVEL": Controls log output (default: "OFF")
     * - "LOGGER": Either "CONSOLE" or "FILE" (default: "CONSOLE")
     * - "LOGFILE": Log file destination (default: "htmjava.log")
     *
     * For example:
     *
     * java -DLOGLEVEL=TRACE -DLOGGER=FILE -jar htm.java-nab.jar "{\"modelParams\":{....}}" < nab_data.csv > anomalies.out
     *
     * Input is read line by line until end of stream or the first blank
     * line; one anomaly score is printed per published line.
     */
    public static void main(String[] args) {
        try {
            LOGGER.trace("main({})", Arrays.asList(args));

            // Parse command line args
            OptionParser parser = createOptionParser();
            OptionSet options = parser.parse(args);
            if (args.length == 0 || options.has("h")) {
                parser.printHelpOn(System.out);
                return;
            }

            // Parse OPF Model Parameters first, so that a missing parameter
            // fails before any input/output file is opened or created.
            JsonNode params = readModelParams(options);

            // Number of header lines to skip
            int skip = (int) options.valueOf("s");

            // Force timezone to UTC
            DateTimeZone.setDefault(DateTimeZone.UTC);

            // Get in/out files
            final PrintStream output = openOutput(options);
            boolean started = false;
            try (BufferedReader in = new BufferedReader(new InputStreamReader(openInput(options)))) {
                // Create NAB Network Model
                HTMModel model = new HTMModel(params);
                Network network = model.getNetwork();

                // Scores are written from the subscriber callbacks, so the
                // output stream is released there and not at the end of
                // main().
                network.observe().subscribe((inference) -> {
                    double score = inference.getAnomalyScore();
                    int record = inference.getRecordNum();
                    LOGGER.trace("record = {}, score = {}", record, score);
                    output.println(score);
                }, (error) -> {
                    LOGGER.error("Error processing data", error);
                    closeOutput(output);
                }, () -> {
                    LOGGER.trace("Done processing data");
                    if (LOGGER.isDebugEnabled()) {
                        model.showDebugInfo();
                    }
                    closeOutput(output);
                });

                // If debugging, print out Connections before starting
                // the Network
                if (LOGGER.isTraceEnabled()) {
                    model.showConfigInfo();
                }
                network.start();
                started = true;

                // Pipe data to network
                Publisher publisher = model.getPublisher();
                try {
                    String line;
                    while ((line = in.readLine()) != null && line.trim().length() > 0) {
                        // Skip header lines
                        if (skip > 0) {
                            skip--;
                            continue;
                        }
                        publisher.onNext(line);
                    }
                } finally {
                    // Always complete the stream, even when reading failed,
                    // so the network is not left waiting for more input.
                    publisher.onComplete();
                }
            } finally {
                // The subscriber callbacks own the output once the network
                // runs; before that it has to be released here.
                if (!started) {
                    closeOutput(output);
                }
            }
            LOGGER.trace("Done publishing data");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
{
"bayesChangePt": {
"reward_low_FN_rate": {
"score": -119.7471580442638,
"threshold": 0.9813537597656257
},
"reward_low_FP_rate": {
"score": -108.56073591812678,
"threshold": 0.9945251464843755
},
"standard": {
"score": -74.9034705754507,
"threshold": 0.9845962524414067
}
},
"expose": {
"reward_low_FN_rate": {
"score": -138.31510519385375,
"threshold": 0.9935243606567389
},
"reward_low_FP_rate": {
"score": -108.5970398319597,
"threshold": 0.9999267578125005
},
"standard": {
"score": -77.86692739970123,
"threshold": 0.9947875976562506
}
},
"htmjava": {
"reward_low_FN_rate": {
"score": -12.493787098873803,
"threshold": 0.5031753540039061
},
"reward_low_FP_rate": {
"score": -0.06692378134318133,
"threshold": 0.5258056640625001
},
"standard": {
"score": 19.50621290112619,
"threshold": 0.5031753540039061
}
},
"null": {
"reward_low_FN_rate": {
"score": -232.0,
"threshold": 0.6250000000000001
},
"reward_low_FP_rate": {
"score": -116.0,
"threshold": 0.6250000000000001
},
"standard": {
"score": -116.0,
"threshold": 0.6250000000000001
}
},
"numenta": {
"reward_low_FN_rate": {
"score": 9.638038476183745,
"threshold": 0.5218505859374999
},
"reward_low_FP_rate": {
"score": 20.322843547289906,
"threshold": 0.5312498092651368
},
"standard": {
"score": 35.638038476183745,
"threshold": 0.52186279296875
}
},
"numentaTM": {
"reward_low_FN_rate": {
"score": -4.682023383551243,
"threshold": 0.48872070312500016
},
"reward_low_FP_rate": {
"score": 4.501624059009296,
"threshold": 0.54091796875
},
"standard": {
"score": 23.317976616448757,
"threshold": 0.48872070312500016
}
},
"random": {
"reward_low_FN_rate": {
"score": -45.889252239141754,
"threshold": 0.9984497070312507
},
"reward_low_FP_rate": {
"score": -40.59422055318891,
"threshold": 0.9995117187500009
},
"standard": {
"score": -25.889252239141747,
"threshold": 0.9984497070312507
}
},
"relativeEntropy": {
"reward_low_FN_rate": {
"score": -27.2288224715,
"threshold": 0.5
},
"reward_low_FP_rate": {
"score": -5.57182391716,
"threshold": 0.5
},
"standard": {
"score": 10.7711775285,
"threshold": 0.5
}
},
"skyline": {
"reward_low_FN_rate": {
"score": -77.20605964639174,
"threshold": 0.8031250000000002
},
"reward_low_FP_rate": {
"score": -53.17159266153103,
"threshold": 0.8996093750000005
},
"standard": {
"score": -33.206059646391715,
"threshold": 0.8031250000000002
}
},
"twitterADVec": {
"reward_low_FN_rate": {
"score": -30.63914483,
"threshold": 0.5
},
"reward_low_FP_rate": {
"score": -12.15798656,
"threshold": 0.5
},
"standard": {
"score": -8.639144834,
"threshold": 0.5
}
},
"windowedGaussian": {
"reward_low_FN_rate": {
"score": -98.7538610655,
"threshold": 1.0
},
"reward_low_FP_rate": {
"score": -87.9970284629,
"threshold": 1.0
},
"standard": {
"score": -44.7538610655,
"threshold": 1.0
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment