Skip to content

Instantly share code, notes, and snippets.

@rhyolight
Last active July 28, 2016 17:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rhyolight/414210e1ef9b6c80236a9b4f845843b3 to your computer and use it in GitHub Desktop.
Save rhyolight/414210e1ef9b6c80236a9b4f845843b3 to your computer and use it in GitHub Desktop.
'''
Created on Feb 8, 2015
@author: David Ray
'''
from datetime import datetime
import csv
import numpy as np
from nupic.encoders import MultiEncoder, ScalarEncoder, DateEncoder
from nupic.algorithms.CLAClassifier import CLAClassifier as CLAClassifier
from nupic.research.spatial_pooler import SpatialPooler as SpatialPooler
from nupic.research.temporal_memory import TemporalMemory as TemporalMemory
from nupic.algorithms import anomaly
# Location of the input CSV that run() opens. This is a machine-specific
# absolute path, so point it at your own copy of the file before running.
INPUT_PATH = "/Users/mtaylor/nta/nab/data/realTraffic/speed_6005.csv"
class Layer(object):
    """Makeshift layer that contains and operates on algorithmic entities.

    Bundles an encoder, a spatial pooler, a temporal memory and a classifier
    so that records can be pushed through them one at a time via ``step``.
    """

    def __init__(self, encoder, sp, tm, classifier):
        """Store the algorithm instances this layer drives.

        :param encoder: object providing ``encode(dict)``
        :param sp: object providing ``getNumColumns()`` and
            ``compute(input, learn, output)``
        :param tm: object providing ``compute(activeColumns, learn)``,
            ``getPredictiveCells()`` and ``columnForCell(cell)``
        :param classifier: stored on the instance; ``step`` does not call it
        """
        self.encoder = encoder
        self.sp = sp
        self.tm = tm
        self.classifier = classifier
        # Bookkeeping fields; nothing in this class reads or updates them.
        self.theNum = 0
        self.weeksAnomaly = 0.0
        self.settleWeek = 0
        # Columns the temporal memory predicted at the end of the previous
        # step. Empty before the first record has been processed.
        self.prevPredictiveCols = []

    def step(self, timestamp, value):
        """Feed one record through encoder, spatial pooler and temporal memory.

        Prints the intermediate representations and the raw anomaly score of
        this record. The score compares the columns activated by this record
        with the columns that were predicted at the end of the *previous*
        call, which is what ``anomaly.computeRawAnomalyScore`` expects as its
        second argument.

        :param timestamp: value for the encoder's "timestamp" field
        :param value: value for the encoder's "value" field
        :returns: None
        """
        encoder = self.encoder
        sp = self.sp
        tm = self.tm

        # Input through multi-encoder.
        print("Raw Input: {}, {}".format(timestamp, value))
        encoding = encoder.encode({
            "timestamp": timestamp,
            "value": value,
        })
        # The label is kept as-is although the encoding comes from the
        # combined encoder held by this layer.
        print("ScalarEncoder Output = {}".format(encoding))

        # Input through spatial pooler; ``output`` is filled in place.
        output = np.zeros(shape=(sp.getNumColumns(),))
        sp.compute(encoding, True, output)
        print("SpatialPooler Output = {}".format(output))

        # Input through temporal memory.
        activesCols = sorted(set(np.where(output > 0)[0].flat))
        tm.compute(activesCols, True)
        print("TemporalMemory Input = {}".format(activesCols))

        # Input into anomaly computer. The predictions available right after
        # tm.compute() refer to the *next* record, so the current record is
        # scored against the predictions remembered from the previous call.
        predictiveCols = self.prevPredictiveCols
        score = anomaly.computeRawAnomalyScore(activesCols, predictiveCols)
        print("input: {}".format(activesCols))
        print("predi: {}".format(predictiveCols))
        print("Anomaly Score: {}\n".format(score))

        # Remember what is predicted now so the next call can score against it.
        self.prevPredictiveCols = sorted(
            {tm.columnForCell(c) for c in tm.getPredictiveCells()})

        # NOTE: self.classifier is not exercised here. Wiring it in would need
        # a running record counter and the bucket index of ``value``.
def getSDR(cells, cellsPerColumn=None):
    """Return the set of column indices that contain the given cells.

    A cell index is mapped to its column by integer division with the number
    of cells per column.

    :param cells: iterable of integer cell indices
    :param cellsPerColumn: number of cells in each column. When omitted, the
        value is read from ``tm.cellsPerColumn`` on a module-level ``tm``,
        as the original code did. This file defines no such global, so pass
        the value explicitly unless the importing module provides one.
    :returns: set of column indices
    """
    columns = set()
    for cell in cells:
        if cellsPerColumn is None:
            # Resolved lazily so an empty ``cells`` never touches ``tm``.
            cellsPerColumn = tm.cellsPerColumn
        # Floor division keeps the result an integer column index even when
        # true division is in effect.
        columns.add(cell // cellsPerColumn)
    return columns
def runThroughLayer(layer, recordNum, sequenceNum):
    """Push one record into ``layer`` through its ``input`` method.

    ``recordNum`` is passed twice, followed by ``sequenceNum``.

    NOTE(review): the ``Layer`` class in this file exposes ``step`` and no
    ``input`` method, so this helper only works with objects that provide
    ``input(a, b, c)`` -- confirm whether it is still needed.
    """
    feed = layer.input
    feed(recordNum, recordNum, sequenceNum)
def createEncoder():
    """Build and return the MultiEncoder used by this script.

    The returned encoder has two sub-encoders, registered in this order:
    "value" (a ScalarEncoder) and "timestamp" (a DateEncoder configured with
    ``timeOfDay``).
    """
    multi = MultiEncoder()

    # Field "value": inputs are clipped (clipInput=True) instead of rejected.
    multi.addEncoder(
        "value",
        ScalarEncoder(21, 0.0, 100.0, n=50, name="value", clipInput=True),
    )

    # Field "timestamp": only the time-of-day component is configured.
    multi.addEncoder(
        "timestamp",
        DateEncoder(timeOfDay=(21, 9.5), name="timestamp"),
    )

    return multi
def createLayer():
    """Assemble the Layer used by this script.

    Creates the encoder, a SpatialPooler sized to the encoder's output width,
    a TemporalMemory with the same number of columns, and a CLAClassifier.
    Prints the spatial pooler's parameters followed by an empty line.

    :returns: a new ``Layer`` wrapping the four components
    """
    encoder = createEncoder()

    # The spatial pooler's output columns feed the temporal memory directly,
    # so both must be built with the same column count.
    numColumns = 2048

    sp = SpatialPooler(
        # Dimensions are passed as real 1-tuples; ``(x)`` without the
        # trailing comma is just a parenthesised int.
        inputDimensions=(encoder.getWidth(),),
        columnDimensions=(numColumns,),
        potentialRadius=12,
        potentialPct=0.85,
        globalInhibition=True,
        localAreaDensity=-1.0,
        numActiveColumnsPerInhArea=40,
        stimulusThreshold=1.0,
        synPermInactiveDec=0.0005,
        synPermActiveInc=0.0015,
        synPermConnected=0.1,
        minPctOverlapDutyCycle=0.1,
        minPctActiveDutyCycle=0.1,
        dutyCyclePeriod=1000,
        maxBoost=2.0,
        seed=42,
        spVerbosity=0,
    )

    tm = TemporalMemory(
        columnDimensions=(numColumns,),
        cellsPerColumn=32,
        initialPermanence=0.2,
        connectedPermanence=0.8,
        minThreshold=5,
        maxNewSynapseCount=6,
        permanenceDecrement=0.1,
        permanenceIncrement=0.1,
        activationThreshold=4,
    )

    classifier = CLAClassifier(
        steps=[1],
        alpha=0.1,
        actValueAlpha=0.3,
        verbosity=0,
    )

    sp.printParameters()
    # Single-argument form prints the same empty line on Python 2 and 3.
    print("")

    return Layer(encoder, sp, tm, classifier)
def run(inputPath=None):
    """Stream a CSV file through a freshly created Layer, row by row.

    The first row is treated as a header and skipped. For every remaining
    row, column 0 is parsed as a ``%Y-%m-%d %H:%M:%S`` timestamp and column 1
    as an integer, and both are handed to ``Layer.step``.

    :param inputPath: path of the CSV file to read; defaults to the
        module-level ``INPUT_PATH`` when omitted
    :raises ValueError: if a timestamp or value cannot be parsed
    """
    if inputPath is None:
        inputPath = INPUT_PATH

    layer = createLayer()
    with open(inputPath, "r") as inputFile:
        reader = csv.reader(inputFile)
        # Built-in next() works on Python 2.6+ and 3; the default keeps an
        # empty file from raising StopIteration.
        next(reader, None)  # skip header
        for row in reader:
            timestamp = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")
            layer.step(timestamp, int(row[1]))
# Script entry point: process the input file only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment