Skip to content

Instantly share code, notes, and snippets.

@agfor
Created April 23, 2012 23:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save agfor/2474497 to your computer and use it in GitHub Desktop.
Save agfor/2474497 to your computer and use it in GitHub Desktop.
A Python module to help a non-programmer engineer fried do some statistical analysis on raw images
#!/usr/bin/env python3
#from tkFileDialog import askopenfilenames
from tkinter.filedialog import askopenfilename
import os, os.path
from itertools import chain, repeat
from operator import itemgetter, sub, pow, truediv
from array import array
from math import sqrt
from sys import argv
class Images:
def __init__(self, field_width, little_endian, rows, columns, trim, *file_names):
self.field_width, self.little_endian = field_width, little_endian
self.rows, self.columns, self.pixels = rows, columns, rows * columns
self.count = len(file_names)
if trim < self.count / 2: self.trim = trim
else: self.trim = (self.count - 1) // 2
self.images = [Image(file_name, field_width, little_endian, rows, columns,
trim) for file_name in file_names]
def calc_stats(self):
pixels_over_time = zip(*[image.values for image in self.images])
time_trim = itemgetter(slice(self.trim, self.count - self.trim))
trim_len = self.count - self.trim * 2
self.mean_stats, self.std_dev_stats = zip(*mean_std_dev((
mean_std_dev(tuple(map(time_trim,
map(sorted, pixels_over_time))), trim_len)), self.pixels))
print(self.group_name, "Time Stats Calculated")
def write_stats(self):
with open("output/" + self.group_name + ".time.stats", "w") as f:
f.writeline("Trim: " + str(self.trim))
f.writeline("Image Resolution: " + str(self.rows) + "x" + str(self.cols))
f.writeline("Number of Images: " + str(self.count))
f.writeline("Mean and Std Dev of Means: " + str(self.mean_stats))
f.writeline("Mean and Std Dev of Std Devs: " + str(self.std_dev_stats))
print(self.group_name, "Time Stats Written")
class Image:
def __init__(self, file_name, field_width, little_endian, rows, columns,
trim, filter_val = 0, values = False, filter = False):
self.field_width, self.little_endian = field_width, little_endian
self.rows, self.columns, self.values = rows, columns, False
self.file_name, self.filter_val = file_name, filter_val
if trim < min(rows / 2, columns / 2): self.trim = trim
else: self.trim = int(min((rows - 1) / 2, (columns - 1) / 2))
self.file_format = ('L' if self.field_width == 4 else
('H' if self.field_width == 2 else 'B'))
self.values = array(self.file_format)
if values:
if filter:
self.values = filter(values, self)
self.file_name += '.' + str(filter).split(' ')[1]
print(self.file_name, "Filtered and Processed")
else: self.values.extend(values); print(self.file_name, "Processed")
else:
self.decode(); self.file_name = os.path.split(self.file_name)[1]
print(self.file_name, "Read and Processed")
def filter(self, filter):
return Image(self.file_name, self.field_width, self.little_endian,
self.rows, self.columns, self.trim, self.filter_val, self.values, filter)
def write(self):
try: os.mkdir("output")
except: pass
if self.little_endian != array("i", [1]).tostring()[0]:
self.values.byteswap()
with open("output/" + self.file_name, "bw") as f: self.values.tofile(f)
print(self.file_name, "Written")
def decode(self):
try:
with open(self.file_name, "br") as f:
self.values.fromfile(f, self.columns * self.rows)
except IOError as file_error: print(file_error); raise
if self.little_endian != array("i", [1]).tostring()[0]:
self.values.byteswap()
def calc_stats(self):
rows, columns, values = self.rows, self.columns, self.values
row_trim = itemgetter(slice(self.trim, columns - self.trim))
col_trim = itemgetter(slice(self.trim, rows - self.trim))
trim_row, trim_col = rows - self.trim * 2, columns - self.trim * 2
self.row_means, self.row_std_devs = mean_std_dev(tuple(map(row_trim,
map(sorted, (values[j : j + columns]
for j in range(0, rows * columns, columns))))), trim_col)
self.col_means, self.col_std_devs = mean_std_dev(tuple(map(col_trim,
map(sorted, (values[i :: columns]
for i in range(0, columns))))), trim_row)
self.stat_means, self.stat_std_devs = map(chain.from_iterable, zip(
mean_std_dev((self.row_means, self.row_std_devs), rows),
mean_std_dev((self.col_means, self.col_std_devs), columns)))
print(self.file_name, "Stats Calculated")
def write_stats(self):
try: os.mkdir("output")
except: pass
titles = (("",), ("\nRow Means and Standard Deviations\n",),
("\nColumn Means and Standard Deviations\n",))
headers, means, std_devs = map(map, repeat(map), repeat(repeat(str)),
((("Mean and Std Dev of Rows' Means",
"Mean and Std Dev of Rows' Std Devs",
"Mean and Std Dev of Cols' Means",
"Mean and Std Dev of Cols' Std Devs"),
range(self.rows), range(self.columns)),
(self.stat_means, self.row_means, self.col_means),
(self.stat_std_devs, self.row_std_devs, self.col_std_devs)))
lines = map(chain.from_iterable, map(zip, headers, repeat(repeat(', ')),
means, repeat(repeat(', ')), std_devs, repeat(repeat('\n'))))
with open("output/" + self.file_name + ".stats", "w") as f:
f.writelines(chain.from_iterable(chain.from_iterable(
zip(titles, lines))))
print(self.file_name, "Stats Written")
def column_median_filter(values, image):
columns, middle = image.columns, (image.filter_val - 1) // 2
median, width = itemgetter(middle), image.filter_val * columns
size = (image.rows - image.filter_val + 1) * columns
closing_rows = -1 * (image.filter_val // 2) * columns
return array(image.file_format, chain(values[0:middle * columns],
map(median, map(sorted, (values[j : j + width : columns]
for j in range(size)))), values[closing_rows::]))
def mean_std_dev(stats, length):
means = tuple(map(truediv, map(sum, stats), repeat(length)))
# print(stats, length, '\n\n', means, '\n\n')
return means, tuple(map(sqrt, map(truediv, map(sum,
map(map, repeat(pow), map(map, repeat(sub), stats,
map(repeat, means)), repeat(repeat(2)))), repeat(length))))
def process(image):
image.calc_stats(); image.write_stats()
#cmf_image = image.filter(column_median_filter)
#cmf_image.write(); cmf_image.calc_stats(); cmf_image.write_stats()
def run_test():
try:
print("Running Tests")
test_image = Image("test.raw", 2, True, 6, 3, 2, 3,
(0, 255, 511, 255, 256, 257, 256, 0, 256*256-1,
1, 0, 127, 128, 512, 2048, 4096, 64, 256*128))
test_image.write()
test_image = Image("output/test.raw", 2, True, 6, 3, 1, 3)
process(test_image)
except: print("Error Running Tests -- Consult a Programmer"); raise
else: print("Tests Successful\n",
"\nPlease verify all settings match the file to be processed."
"\nEnter only a file name on the command line for default settings:\n" +
"(fields 2, little-endian True, rows 512, cols 640, trim 10, median width 3)",
"\nOR enter a file name followed by the above values for custom settings.",
"\nPad your fields to 1, 2, or 4 bytes; only those widths are supported.",
"\nYour image must have at least 3 rows and be exactly the specified size.\n" +
"Your image must have more than twice as many rows and columns as your trim.\n"
"Run the program with no arguments to run tests and display usage information."
"\nExample:", argv[0], "example_image.raw 2 True 512 640 10 3")
if __name__ == '__main__':
file_names = askopenfilenames()
print(file_names)
if file_names:
if len(file_names) == 1:
process(Image(file_names[0], 2, True, 512, 640, 10, 3))
else:
images = Images(2, True, 512, 640, 10, 3, file_names)
images.calc_stats()
images.write_stats()
else:
run_test()
exit()
try:
if len(argv) == 2: process(Image(argv[1], 2, True, 512, 640, 10, 3))
else: process(Image(argv[1], int(argv[2]), (True if argv[3] in
("True", "true", "Yes", "yes", "1") else False), int(argv[4]),
int(argv[5]), int(argv[6]), int(argv[7])))
except (IndexError, ValueError): run_test()
except: run_test(); raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment