@meesern
Created December 5, 2012 10:34
Timestreams Feeders
#!/usr/bin/env ruby
require 'net/http'
require 'uri'
require 'json'
require 'pathname'
require 'debugger'
require_relative 'timestream-feeder'
# Set up the timestream feeder
@tsf = TimestreamFeeder.new(:host => 'energyforchange.ac.uk',
                            :container_name => "test")
puts @tsf
# @tsf.feed(:ts => sample_time, :value => sample) unless sample < 0
#!/usr/bin/env ruby
require 'net/http'
require 'uri'
require 'json'
require 'pathname'
require_relative 'timestream-feeder'
# Some structures...
Examples = <<here
Browse around http://www.cl.cam.ac.uk/meters/elec
risers...
wget -r -np -nH --cut-dirs=2 http://www.cl.cam.ac.uk/meters/elec/primary-cs1-riser/
wget -r -np -nH --cut-dirs=2 http://www.cl.cam.ac.uk/meters/elec/primary-cs2-riser/
wget -r -np -nH --cut-dirs=2 http://www.cl.cam.ac.uk/meters/elec/primary-cs3-riser/
wget -r -np -nH --cut-dirs=2 http://www.cl.cam.ac.uk/meters/elec/primary-cs4-riser/
cs1 North
cs2 Central
cs3 South
cs4 West
each riser.. Three lighting/sockets pairs and air conditioning
F-lighting First floor
F-sockets
G-lighting Ground floor
G-sockets
S-lighting Second floor
S-sockets
SW00-AC Air Conditioning?
Each Circuit...
S-m23-2011-02-01.json #S-id-date.json one day 2 minute aggregate
Each JSON...
{"label": "22 2011-06-30","path":"meters.cl.cam.ac.uk/elec/primary-cs1-riser/G- lighting","description":"Lighting","room":"GN17","coverage":"Ground North", "type":"Auto Meter IC970","ts": "2011-06-30","data":[[1309392000000,-0.014], [1309392120000,0.032],[1309392240000,0.030],[1309392360000,0.030],.....
i.e. data is a list of two-element [time, power] lists. Time is in milliseconds since the epoch
Time.at(1309392000.000) = 2011-06-30 1am (local time, BST)
Time.at(1309392120.000) = 2011-06-30 1.02am (local time, BST)
Power in kW
here
#
#The regexp that covers the days that we are interested in..
#All of June...
Days = /2011-06-/
Feeds = [
  {:table => 'wp_1_ts_current_53', :dir => 'primary-cs1-riser/G-lighting'},
  {:table => 'wp_1_ts_current_54', :dir => 'primary-cs1-riser/G-sockets'},
  {:table => 'wp_1_ts_current_55', :dir => 'primary-cs1-riser/F-lighting'},
  {:table => 'wp_1_ts_current_56', :dir => 'primary-cs1-riser/F-sockets'},
  {:table => 'wp_1_ts_current_57', :dir => 'primary-cs1-riser/S-lighting'},
  {:table => 'wp_1_ts_current_58', :dir => 'primary-cs1-riser/S-sockets'},
  {:table => 'wp_1_ts_current_59', :dir => 'primary-cs2-riser/G-sockets'},
  {:table => 'wp_1_ts_current_60', :dir => 'primary-cs2-riser/G-lighting'},
  {:table => 'wp_1_ts_current_61', :dir => 'primary-cs2-riser/F-sockets'},
  {:table => 'wp_1_ts_current_62', :dir => 'primary-cs2-riser/F-lighting'},
  {:table => 'wp_1_ts_current_63', :dir => 'primary-cs2-riser/S-sockets'},
  {:table => 'wp_1_ts_current_64', :dir => 'primary-cs2-riser/S-lighting'},
  {:table => 'wp_1_ts_current_65', :dir => 'primary-cs3-riser/G-sockets'},
  {:table => 'wp_1_ts_current_66', :dir => 'primary-cs3-riser/G-lighting'},
  {:table => 'wp_1_ts_current_67', :dir => 'primary-cs3-riser/F-sockets'},
  {:table => 'wp_1_ts_current_68', :dir => 'primary-cs3-riser/F-lighting'},
  {:table => 'wp_1_ts_current_69', :dir => 'primary-cs3-riser/S-sockets'},
  {:table => 'wp_1_ts_current_70', :dir => 'primary-cs3-riser/S-lighting'},
  {:table => 'wp_1_ts_current_71', :dir => 'primary-cs4-riser/G-sockets'},
  {:table => 'wp_1_ts_current_72', :dir => 'primary-cs4-riser/G-lighting'},
  {:table => 'wp_1_ts_current_73', :dir => 'primary-cs4-riser/F-sockets'},
  {:table => 'wp_1_ts_current_74', :dir => 'primary-cs4-riser/F-lighting'},
  {:table => 'wp_1_ts_current_75', :dir => 'primary-cs4-riser/S-sockets'},
  {:table => 'wp_1_ts_current_76', :dir => 'primary-cs4-riser/S-lighting'}
]
#
# Send a number of days from each feed
#
Feeds.each do |feed|
  puts "Sending feed: #{feed[:table]}"
  # Set up the timestream feeder
  @tsf = TimestreamFeeder.new(:host => 'dev.c-tech.wp.horizon.ac.uk',
                              :container => feed[:table])
  #
  # Get all days in June 2011
  #
  daypaths = Pathname.new(feed[:dir]).children.select { |p| p.to_path =~ Days }.sort
  daypaths.each do |daypath|
    @day = JSON.parse(daypath.read, :symbolize_names => true)
    #
    # {"label": "22 2011-06-30","path":"meters.cl.cam.ac.uk/elec/primary-cs1-riser/G- lighting","description":"Lighting","room":"GN17","coverage":"Ground North", "type":"Auto Meter IC970","ts": "2011-06-30","data":[[1309392000000,-0.014], [1309392120000,0.032],[1309392240000,0.030],[1309392360000,0.030],.....
    #
    # Push each reading to the timestreams API
    max = 0
    @day[:data].each do |reading|
      # Timestamps are in milliseconds since the epoch
      sample_time = Time.at(reading[0] / 1000).to_s
      # Convert kW to whole watts (can't get timestreams to work with float)
      sample = (reading[1] * 1000).round
      #puts "At #{sample_time} measured #{sample}"
      # Report each new maximum seen so far in this day's file
      if sample > max
        max = sample
        puts "From #{feed[:table]} At #{sample_time} max #{max}"
      end
      # Feed each value up to the timestreams server
      @tsf.feed(:ts => sample_time, :value => sample) unless sample < 0
    end
  end
end
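
The reading conversion used in the loop above can be tried on its own, without a server or the downloaded day files. This is a minimal sketch: `convert_reading` is a hypothetical helper name (not part of the original scripts), the sample values are the ones quoted in the JSON example, and the time is shown in UTC rather than local time.

```ruby
# Hypothetical helper: turn one [milliseconds, kilowatts] reading
# into [Time (UTC), whole watts].
def convert_reading(reading)
  ms, kw = reading
  [Time.at(ms / 1000).utc, (kw * 1000).round]
end

# First readings of the 2011-06-30 sample quoted in the script.
sample = [[1309392000000, -0.014], [1309392120000, 0.032], [1309392240000, 0.030]]

sample.each do |reading|
  time, watts = convert_reading(reading)
  next if watts < 0 # the feeder script also skips negative readings
  puts "#{time} #{watts} W"
end
```

Rounding to whole watts keeps the value an Integer, which matches the script's note that the timestreams API would not accept floats.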
#
# timestream-feeder.rb: a simple timestreams data uploader
#
require 'net/http'
require 'uri'
require 'json'
class TimestreamFeeder
  attr_reader :container

  def initialize(params)
    @host = params[:host] || exit
    @port = params[:port] || 80
    @base = '/wp-content/plugins/timestreams/2/'
    @container = params[:container] || find_or_create(params[:container_name]) || exit
  end

  # Look up a container by its friendly name, creating it if it is missing
  def find_or_create(name)
    find(name) || create(name)
  end

  def find(name)
    result = api_call(:get, 'measurement_containers', nil)
    container = result["measurementContainers"].find { |m| m["friendlyname"] == name }
    container && container["name"]
  end

  def create(name)
    resp = sendcreate(name)
    resp["measurementcontainer"]
  end

  def sendcreate(name)
    args = "name=#{name}&measuretype=current&min=0&max=12000&unit=text%2fx-data-Watt&symbol=W&device=#{name}&datatype=DECIMAL(10,3)&siteid=1&blogid=1&userid=1"
    api_call(:post, 'measurement_container', args)
  end

  # Accept measurements with timestamps and upload them to the timestream.
  # measure is a hash with :ts and :value
  def feed(measure)
    args = "value=#{measure[:value]}&ts=#{measure[:ts]}"
    api_call(:post, 'measurement/' + @container, args)
  end

  def api_call(verb, endpoint, data)
    # URI.encode_www_form should work but freaks out SLIM
    # resp = h.post(@base+'measurement/'+@container,
    #               URI.encode_www_form(data))
    resp = html_request(verb, @base + endpoint, data)
    # Anything other than a 2xx response is treated as an error
    unexpected(resp) unless resp.is_a?(Net::HTTPSuccess)
    JSON.parse(resp.body)
  end

  def html_request(verb, *args)
    puts "#{verb} #{args.join('?')}"
    html do |h|
      (verb == :post) ? h.post(*args) : h.get(args[0])
    end
  end

  private

  def unexpected(resp)
    puts resp.body
    raise "Got unexpected response #{resp.class}."
  end

  # Returns [host, port, user, password] taken from the http_proxy
  # environment variable, or four nils when it is not set
  def html_proxy
    return [nil, nil, nil, nil] unless ENV['http_proxy']
    puts "Parsing proxy"
    uri = URI.parse(ENV['http_proxy'])
    proxy_user = proxy_pass = nil
    proxy_user, proxy_pass = uri.userinfo.split(/:/) if uri.userinfo
    proxy_host = uri.host
    # uri.host can be nil, so guard before calling empty?
    proxy_host = nil if proxy_host.nil? || proxy_host.empty?
    # no proxy if the server name is a local name (not domain.tld)
    proxy_host = nil if @host.split('.').length < 2
    proxy_port = uri.port || 8080
    puts "Delivering to server #{proxy_host}:#{proxy_port}->#{@host}:#{@port}"
    [proxy_host, proxy_port, proxy_user, proxy_pass]
  end

  # Open a connection to @host (through the proxy, if any) and yield it
  def html
    phost, pt, pu, pp = html_proxy
    prox = Net::HTTP::Proxy(phost, pt, pu, pp)
    puts "Opened Proxy: #{prox.inspect}" unless phost.nil?
    prox.start(@host, @port) do |h|
      yield(h)
    end
  end
end
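
The feeder builds its POST body by plain string interpolation, and its comments note that `URI.encode_www_form` "should work but freaks out SLIM". For reference, the sketch below shows what the standard-library encoder would produce for one measurement. `measurement_body` is a hypothetical helper, not part of the class, and whether the timestreams server accepts this encoding is untested here.

```ruby
require 'uri'

# Hypothetical helper (not in the original class): build the POST body
# for one measurement with the standard-library form encoder.
def measurement_body(measure)
  URI.encode_www_form(:value => measure[:value], :ts => measure[:ts])
end

body = measurement_body(:ts => '2011-06-30 01:02:00 +0100', :value => 32)
puts body

# Decoding recovers the original strings, including the "+" in the zone offset.
p URI.decode_www_form(body)
```

In the unencoded form used by `feed`, the spaces and the `+` of a zone offset go out as-is, so how the timestamp is read depends on the server's parsing.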
#!/usr/bin/env ruby
require 'mysql2'
require_relative 'timestream-feeder'
db = Mysql2::Client.new(:username => 'bzb',
                        :database => 'EnergyDataStoreV2',
                        :host     => 'localhost',
                        :password => '********')
# Some tables...
Examples = <<here
tblHubDir
+-------+-------------------+---------------------+---------------------+-------+----------------+
| hubId | description       | commissioned        | decommissioned      | pilot | pilot_category |
+-------+-------------------+---------------------+---------------------+-------+----------------+
|     4 | Ben B CC Home Hub | 2011-03-18 08:00:00 | 2011-04-06 08:00:00 |     0 |              0 |
+-------+-------------------+---------------------+---------------------+-------+----------------+
tblSensorCategory
+----------+------------------------------------------+
| catagory | description                              |
+----------+------------------------------------------+
|        1 | Whole House CT Clamp for Electrical Load |
|        2 | Circuit Level Monitoring (MCC)           |
|        3 | Appliance Level Monitoring               |
+----------+------------------------------------------+
tblSensor
+-------+----------+--------------+---------------------+---------------------+----------+----------+
| hubId | sensorId | description  | commissioned        | decommissioned      | category | parentId |
+-------+----------+--------------+---------------------+---------------------+----------+----------+
|     4 |        0 | House Sensor | 2011-03-18 08:00:00 | 2011-04-06 08:00:00 |        1 |     NULL |
|     4 |        1 | Temp Sensor  | 2011-03-18 08:00:00 | 2011-04-06 08:00:00 |        1 |     NULL |
|     5 |        0 | House Sensor | 2011-03-18 08:00:00 | 2011-07-11 00:00:00 |        1 |     NULL |
+-------+----------+--------------+---------------------+---------------------+----------+----------+
tblLoadAgg - 5 minute or longer averages
+-------+----------+---------------------+---------+
| hubId | sensorId | timeStamp           | loadAvg |
+-------+----------+---------------------+---------+
|     4 |        0 | 2011-03-21 15:10:00 |  366.54 |
|     4 |        0 | 2011-03-21 15:20:00 |  396.59 |
|     4 |        0 | 2011-03-21 15:30:00 |  239.83 |
+-------+----------+---------------------+---------+
here
# Set up the timestream feeder
@tsf = TimestreamFeeder.new(:host => 'dev.c-tech.wp.horizon.ac.uk',
                            :container => 'wp_2_2_ts_Current_2')
@geospatial = nil
@main_incomer = nil
# Get the row with the Jubilee Campus Geospatial Building id
db.query("select * from tblHubDir where description = 'JC Geospatial Building' limit 1").each do |row|
  @geospatial = row['hubId']
end
# Get the row with the main incomer sensor id
db.query("select * from tblSensorDir where description = 'Main Incomer'
          and hubId = #{@geospatial} limit 1").each do |row|
  @main_incomer = row['sensorId']
end
# Get each of the rows and feed the time and data up to the
# timestreams API
db.query("select * from tblLoadAgg where hubId = #{@geospatial}
          and sensorId = #{@main_incomer}").each do |row|
  sample_time = row['timeStamp']
  sample = row['loadAvg'].to_i
  #puts "At #{sample_time} measured #{sample}"
  # Feed each value up to the timestreams server
  @tsf.feed(:ts => sample_time, :value => sample) unless sample.zero?
end
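
The queries above interpolate `@geospatial` and `@main_incomer` directly into the SQL, so if either lookup returns no rows the next query is built from a `nil`. One way to guard against that, sketched here in plain Ruby with no database connection, is to validate the ids before building the string. `load_agg_query` is a hypothetical helper and not part of the original script.

```ruby
# Hypothetical helper: build the tblLoadAgg query only from valid integer ids.
def load_agg_query(hub_id, sensor_id)
  raise ArgumentError, 'hub id not found' if hub_id.nil?
  raise ArgumentError, 'sensor id not found' if sensor_id.nil?
  # Integer() raises ArgumentError for anything that is not a plain integer
  "select * from tblLoadAgg where hubId = #{Integer(hub_id)} " \
    "and sensorId = #{Integer(sensor_id)}"
end

puts load_agg_query(4, 0)
```

The resulting string could then be passed to `db.query` in place of the interpolated literal.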