Skip to content

Instantly share code, notes, and snippets.

@biochem-fan
Created June 20, 2020 09:17
Show Gist options
  • Save biochem-fan/f9eaa108c9660fc909a70fb78bba67bc to your computer and use it in GitHub Desktop.
Group by Time
# Group particles by time by @biochem_fan
# GPLv3; contact me if you want other license
import sys
import numpy as np
from collections import OrderedDict
def load_star(filename):
    """Parse a STAR file into nested OrderedDicts.

    Returns an OrderedDict mapping each data block name (the text after
    ``data_``) to an OrderedDict of columns.  In a ``loop_`` table every
    column maps to a list of string cells; in a simple key-value block
    every key maps to a single string value.  All values are kept as
    strings — no numeric conversion is attempted.

    NOTE: this is not a very serious parser; it is line-based, not
    token-based, so quoted values containing whitespace or '#' will be
    mishandled.
    """
    datasets = OrderedDict()
    current_data = None
    current_colnames = None
    in_loop = 0  # 0: outside a loop, 1: reading column names, 2: reading rows
    with open(filename) as f:
        for line in f:
            line = line.strip()
            # Strip comments. Using find() >= 0 (rather than > 0) also drops
            # whole-line comments such as RELION's "# version 30001" header,
            # which would otherwise fall through and confuse the parser.
            comment_pos = line.find('#')
            if comment_pos >= 0:
                line = line[:comment_pos].strip()
            if line == "":
                # A blank line terminates a loop_ table.
                if in_loop == 2:
                    in_loop = 0
                continue
            if line.startswith("data_"):
                in_loop = 0
                data_name = line[5:]
                current_data = OrderedDict()
                datasets[data_name] = current_data
            elif line.startswith("loop_"):
                current_colnames = []
                in_loop = 1
            elif line.startswith("_"):
                # A label after the data rows means a new (non-loop) section.
                if in_loop == 2:
                    in_loop = 0
                elems = line[1:].split()
                if in_loop == 1:
                    # Loop header: register the column name with an empty list.
                    current_colnames.append(elems[0])
                    current_data[elems[0]] = []
                else:
                    # Simple "_key value" pair outside a loop.
                    current_data[elems[0]] = elems[1]
            elif in_loop > 0:
                # Data row: one whitespace-separated cell per column.
                in_loop = 2
                elems = line.split()
                assert len(elems) == len(current_colnames)
                for idx, e in enumerate(elems):
                    current_data[current_colnames[idx]].append(e)
    return datasets
def write_star(filename, datasets):
    """Write nested dicts (as produced by ``load_star``) to a STAR file.

    ``datasets`` maps block names to column dicts.  A block whose first
    value is a list is written as a ``loop_`` table (one tab-separated
    row per entry); otherwise it is written as "_key<TAB>value" pairs.
    All cell values must already be strings.
    """
    with open(filename, "w") as f:
        for data_name, data in datasets.items():
            f.write("\ndata_" + data_name + "\n\n")
            # list() is required on Python 3, where dict.keys() is a
            # non-subscriptable view.
            col_names = list(data.keys())
            if not col_names:
                continue  # skip empty blocks instead of crashing
            need_loop = isinstance(data[col_names[0]], list)
            if need_loop:
                f.write("loop_\n")
                # Column labels are 1-indexed by STAR convention.
                for idx, col_name in enumerate(col_names):
                    f.write("_%s #%d\n" % (col_name, idx + 1))
                nrow = len(data[col_names[0]])
                for row in range(nrow):
                    f.write("\t".join([data[x][row] for x in col_names]))
                    f.write("\n")
            else:
                for col_name, value in data.items():
                    f.write("_%s\t%s\n" % (col_name, value))
                f.write("\n")
##################################################################
##################################################################
# Script body: split the particles of run_data.star into new optics
# groups of at least `min_part_per_group` particles each, keeping all
# particles from one micrograph in the same group.  Assumes the
# particle table is sorted by acquisition order, so consecutive
# micrographs are close in time ("group by time").

star = load_star("run_data.star")
npart = len(star['particles']['rlnCoordinateX'])
print(npart)

min_part_per_group = 75

# Walk the particles; start a new group when we reach a new micrograph
# AND the current group already holds more than min_part_per_group.
groups = {}          # micrograph name -> 0-based group index
num_per_group = [0]  # particle count per group
cur_group = 0
cnt = 0
cur_mic = None
for i in range(npart):  # range, not Python-2 xrange
    this_mic = star['particles']['rlnMicrographName'][i]
    if cur_mic != this_mic:
        if cnt > min_part_per_group:
            cur_group += 1
            num_per_group.append(0)
            cnt = 0
        groups[this_mic] = cur_group
        cur_mic = this_mic
    num_per_group[-1] += 1
    cnt += 1

num_per_group = np.array(num_per_group)
print(num_per_group, len(num_per_group), np.sum(num_per_group))
ngroups = len(num_per_group)

# Merge the last group into the previous one (disabled; enable if the
# trailing group ends up too small).
if False:
    for mic in groups.keys():
        if groups[mic] == cur_group:
            groups[mic] = cur_group - 1
    ngroups -= 1

# Assign optics groups per particle.  dtype=int: np.int was removed in
# NumPy >= 1.24.
new_optics_groups = np.zeros(npart, dtype=int)
for i in range(npart):
    this_mic = star['particles']['rlnMicrographName'][i]
    new_optics_groups[i] = groups[this_mic]
# Store as plain str (1-based indices).  The old astype(np.string0)
# produced bytes, which would serialize as b'...' under Python 3, and
# np.string0 itself no longer exists in current NumPy.
star['particles']['rlnOpticsGroup'] = [str(g + 1) for g in new_optics_groups]

# Duplicate the single optics-table row once per new group, rewriting
# the group name and index columns.
optics_table = star['optics']
for k in optics_table.keys():
    optics_table[k] = [optics_table[k][0]]
    for i in range(ngroups - 1):
        new_item = optics_table[k][0]
        if k == 'rlnOpticsGroupName':
            new_item = 'OpticsGroup_%d' % (i + 2)
        elif k == 'rlnOpticsGroup':
            new_item = '%d' % (i + 2)
        optics_table[k].append(new_item)

write_star("run_data_grouped.star", star)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment