Created
February 25, 2014 06:33
-
-
Save amolpujari/9203860 to your computer and use it in GitHub Desktop.
egauge data simulator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --- Values overwritten from the command line by collect_args() ---
push_url = None
auth_string = None
register_names = []

# --- Reporting cadence: main() sleeps (reporting_interval - 1) seconds
# --- between posts; validate_args() enforces the min/max bounds.
reporting_interval = 60
min_reporting_interval = 5
max_reporting_interval = 600

# --- Sampling cadence: emitted as the time_delta attribute of <data>;
# --- validate_args() enforces the min/max bounds.
sampling_interval = 1
min_sampling_interval = 1
max_sampling_interval = 10

# Number of posts main() sends before it returns.
max_posting = 100

# Value ranges used to generate readings, one [low, high] pair per pattern.
# The register at index i of register_names uses
# register_patterns[i % len(register_patterns)], so:
#   1st register -> register_patterns[0]  (readings from 1000 up to 1000000)
#   2nd register -> register_patterns[1]
#   3rd register -> register_patterns[2]
#   4th register -> register_patterns[0] again, and so on, round-robin.
# When low == high the reading is always that constant; otherwise it is drawn
# with random.randrange(low, high), so the upper bound itself is excluded.
# You may edit these patterns, keeping low <= high in every pair.
register_patterns = [
    [1000, 1000000],
    [0, 0],
    [-1000000, -10000],
]
def time_delta():
    """Return the module-level sampling_interval rendered as a string."""
    interval_text = str(sampling_interval)
    return interval_text
def number_of_readings_per_post():
    """Return how many <r> rows go into one post: int(reporting_interval) - 1."""
    whole_interval = int(reporting_interval)
    return whole_interval - 1
import sys | |
import time | |
from time import sleep | |
from cStringIO import StringIO | |
import httplib2 | |
import random | |
import re | |
def main():
    """Read the command-line settings, then send max_posting posts.

    Each round builds the XML payload, posts it, empties the shared buffer
    and sleeps for (reporting_interval - 1) seconds.
    """
    collect_args()
    for _ in range(max_posting):
        prepare_data()
        post_it()
        clear_data()
        pause_seconds = reporting_interval - 1
        sleep(pause_seconds)
# Shared in-memory buffer: the add_* functions append XML to it, post_it()
# sends its contents, and clear_data() empties it between posts.
data = StringIO()
def prepare_data():
    """Build one complete payload in the shared buffer.

    The steps run strictly in this order so the output is
    <group><data> cname elements, reading rows </data></group>.
    """
    build_steps = (
        add_opening_group_element,
        add_opening_data_element,
        add_cname_elements,
        add_readings_elements,
        add_closing_data_element,
        add_closing_group_element,
    )
    for step in build_steps:
        step()
def clear_data():
    """Empty the shared payload buffer so the next post starts from scratch.

    The stream position is rewound explicitly before truncating. Some
    file-like buffers (io.StringIO, for example) leave the position where it
    was after truncate(), which would make the next write pad the gap with
    NUL characters. Rewinding first keeps this function correct whichever
    StringIO implementation backs ``data``.
    """
    data.seek(0)
    data.truncate(0)
def add_cname_elements():
    """Append one <cname t="P">NAME</cname> element per entry of register_names."""
    for name in register_names:
        element = ''.join(['<cname t="P">', str(name), '</cname>'])
        data.write(element)
def add_readings_elements():
    """Append the <r>...</r> rows for one post to the shared buffer.

    The number of rows comes from number_of_readings_per_post(); the cells
    of each row are produced by add_readings_element().
    """
    rows_left = number_of_readings_per_post()
    while rows_left > 0:
        data.write('<r>')
        add_readings_element()
        data.write('</r>')
        rows_left -= 1
def add_readings_element():
    """Append one <c>VALUE</c> cell per register to the shared buffer.

    The register at position i uses register_patterns[i % len(patterns)].
    A pattern whose two bounds are equal yields that constant; otherwise the
    value is drawn with random.randrange(low, high).
    """
    pattern_total = len(register_patterns)
    for position, _unused_name in enumerate(register_names):
        data.write('<c>')
        bounds = register_patterns[position % pattern_total]
        low, high = bounds[0], bounds[1]
        value = low if low == high else random.randrange(low, high)
        data.write(str(value))
        data.write('</c>')
def current_time_in_hex():
    """Return the current Unix time, truncated to whole seconds, as a hex string."""
    seconds_now = int(time.time())
    return hex(seconds_now)
def add_opening_data_element():
    """Append the opening <data ...> tag to the shared buffer.

    Attributes written: columns (number of registers), time_stamp and epoch
    (each a separate current_time_in_hex() reading), time_delta (from
    time_delta()) and delta, which is always 'true'.
    """
    column_count = len(register_names)
    stamp = current_time_in_hex()
    epoch_value = current_time_in_hex()
    opening_tag = '<data columns="%s" time_stamp="%s" time_delta="%s" delta="%s" epoch="%s">' % (
        column_count,
        stamp,
        time_delta(),
        'true',
        epoch_value,
    )
    data.write(opening_tag)
def add_opening_group_element():
    """Append the opening <group serial="..."> tag; serial is the current time in hex."""
    serial_value = str(current_time_in_hex())
    data.write('<group serial="' + serial_value + '">')
def add_closing_data_element():
    """Append the closing </data> tag to the shared buffer."""
    closing_tag = '</data>'
    data.write(closing_tag)
def add_closing_group_element():
    """Append the closing </group> tag to the shared buffer."""
    closing_tag = '</group>'
    data.write(closing_tag)
def post_it():
    """POST the buffered payload to http://<push_url>/<auth_string>.

    On success the second item of the httplib2 response pair is printed with
    a timestamp. Any exception is reported as "ClassName:message" and
    swallowed, so one failed post does not stop the posting loop.
    """
    try:
        target = "http://" + str(push_url) + "/" + str(auth_string)
        payload = data.getvalue()
        response = httplib2.Http().request(target, "POST", body=payload, headers={})
        report = str(response[1]) + ' @ ' + time.ctime()
        print(report)
    except Exception as e:
        failure = ''.join([str(e.__class__.__name__), ':', str(e)])
        print(failure)
def help():
    """Print an example command line showing the five expected arguments."""
    usage_example = 'example => simulator.py egauge.plotwatt.com YmFmMmQ1Njc4MjRhOThiZTgxOWNkODEyYTg1NTViYTE= CT1,CT2,CT3 60 1'
    print(usage_example)
def validate_register_names():
    """Check that register_names is non-empty and every name is a word.

    Raises:
        Exception: if register_names is empty, or if any name contains a
            character outside ``\\w`` (an empty name and a name ending in a
            newline are both rejected).
    """
    if len(register_names) < 1:
        raise Exception('0 or invalid registered_names: ' + str(register_names))
    for register_name in register_names:
        # Raw string avoids the invalid "\w" escape; \Z (not $) anchors at the
        # true end of the string, so a trailing newline cannot slip through.
        if not re.match(r'^\w+\Z', register_name):
            raise Exception(r'invalid register_name, should be ^\w+$')
def validate_args():
    """Validate the collected settings, then de-duplicate register_names.

    Register names are checked first, then reporting_interval and
    sampling_interval are each compared against their min/max limits.

    Raises:
        Exception: from validate_register_names(), or when an interval lies
            outside its configured bounds.
    """
    global register_names
    validate_register_names()
    bounds_to_check = (
        ('reporting_interval', reporting_interval,
         min_reporting_interval, max_reporting_interval),
        ('sampling_interval', sampling_interval,
         min_sampling_interval, max_sampling_interval),
    )
    for label, value, lowest, highest in bounds_to_check:
        if value < lowest:
            raise Exception(label + ' must be >= ' + str(lowest))
        if value > highest:
            raise Exception(label + ' must be <= ' + str(highest))
    register_names = uniq(register_names)
def uniq(arr):
    """Return the items of *arr* without duplicates, keeping first-seen order.

    Order matters to callers: register value patterns are assigned by each
    register's position, so building the result from a bare set (whose
    iteration order is arbitrary) would shuffle which register gets which
    pattern.

    Args:
        arr: an iterable of hashable items.

    Returns:
        A new list with the first occurrence of every distinct item.
    """
    seen = set()
    unique_items = []
    for item in arr:
        if item not in seen:
            seen.add(item)
            unique_items.append(item)
    return unique_items
def collect_args():
    """Load the simulator settings from sys.argv into the module globals.

    Expected command line, five arguments after the script name:
        push_url auth_string register_names reporting_interval sampling_interval
    where register_names is a comma-separated list and both intervals are
    integers. The values are then checked with validate_args().

    Raises:
        Exception: if the argument count is wrong (the usage example is
            printed first), or if validate_args() rejects a value.
        ValueError: if an interval argument is not an integer literal.
    """
    global push_url, auth_string, register_names, reporting_interval, sampling_interval
    args = sys.argv
    # sys.argv[0] is the script name, so five user arguments give six entries.
    if len(args) != 6:
        help()
        # The user supplies 5 arguments; the old text said 6, which counted argv[0].
        raise Exception('invalid arguments, 5 required.')
    push_url = str(args[1])
    auth_string = str(args[2])
    register_names = str(args[3]).split(',')
    reporting_interval = int(args[4])
    sampling_interval = int(args[5])
    validate_args()
# Start the simulator only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment