Created
December 17, 2018 06:16
-
-
Save y3nr1ng/77cdba9fed2e559bfc7fc927fc680aee to your computer and use it in GitHub Desktop.
Concat spectrum from different sensors.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Concatenate spectra from different sensors.
""" | |
from io import StringIO | |
import logging | |
import click | |
import coloredlogs | |
import numpy as np | |
import pandas as pd | |
# Configure coloredlogs once at import time: DEBUG level, with each record
# rendered as "<time> <module>[<pid>] <level> <message>".
coloredlogs.install(
    level='DEBUG',
    datefmt='%H:%M:%S',
    fmt='%(asctime)s %(module)s[%(process)d] %(levelname)s %(message)s',
)

# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
def load_file(path, offset=22):
    """
    Load a two-column, tab-separated spectrum file into a DataFrame.

    Parameters
    ----------
    path : str or path-like
        File to read.
    offset : int or None, optional
        Number of leading lines to discard before parsing. A falsy value
        (``0`` or ``None``) parses the file from its very first line.

    Returns
    -------
    pandas.DataFrame
        Frame indexed by ``wavelength`` with a single ``intensity`` column.

    Raises
    ------
    ValueError
        If ``offset`` is negative.
    """
    skip = offset or 0
    if skip < 0:
        # A negative slice would silently keep only the tail of the file.
        raise ValueError("offset must be a non-negative number of lines")

    with open(path, 'r') as fd:
        # Consume the leading lines one by one instead of materialising,
        # slicing and re-joining the whole file.
        for _ in range(skip):
            if not fd.readline():
                break  # file is shorter than the requested offset
        buffer = StringIO(fd.read())

    df = pd.read_csv(
        buffer,
        sep='\t',  # parse by tabs
        names=['wavelength', 'intensity'], index_col=False
    )
    return df.set_index('wavelength')
def concat_data(x, y, mode='lstsq'):
    """
    Concatenate two spectra X and Y, using their overlapping region as reference.

    Y is rescaled so that it agrees with X on the index labels both frames
    share, then the two are merged with X taking precedence wherever both
    have a value. Neither input frame is modified.

    Parameters
    ----------
    x, y : pandas.DataFrame
        Spectra with an ``intensity`` column.
    mode : {'lstsq', 'last'}, optional
        ``'lstsq'`` fits ``y = m * x + c`` over the overlap by least squares
        and maps Y through the inverse, ``(y - c) / m``.
        ``'last'`` divides Y by the Y/X ratio at the last overlapping label.

    Returns
    -------
    pandas.DataFrame
        ``x.combine_first(rescaled_y)``.

    Raises
    ------
    ValueError
        If ``mode`` is not recognised, or if X and Y share no index labels.
    """
    if mode not in ('lstsq', 'last'):
        raise ValueError("unknown concatenation method")

    # Explicit set intersection; the `&` operator on an Index is not a set
    # operation in current pandas releases.
    overlap = x.index.intersection(y.index)
    if len(overlap) == 0:
        # Without a shared region there is nothing to derive a scale from.
        raise ValueError("spectra share no common wavelengths")
    xo, yo = x.loc[overlap], y.loc[overlap]

    if mode == 'lstsq':
        # Design matrix [x, 1] for the linear model y = m * x + c.
        A = np.vstack([xo['intensity'].values, np.ones(len(xo))]).T
        m, c = np.linalg.lstsq(A, yo['intensity'].values, rcond=None)[0]
        y = (y - c) / m
    else:  # mode == 'last'
        # Build a new frame rather than dividing in place, so the caller's
        # DataFrame is left untouched.
        y = y / (yo.iloc[-1] / xo.iloc[-1])

    return x.combine_first(y)
def normalize_peak(df, lpf=500.):
    """
    Scale a spectrum by the column-wise maximum found beyond a cut-off.

    Only rows whose index is strictly greater than ``lpf`` are considered
    when looking for the maximum; every row is then divided by it.

    Parameters
    ----------
    df : pandas.DataFrame
        Spectrum to normalize. It is divided in place and also returned.
    lpf : float, optional
        Cut-off compared against the frame's index (logged as nm).

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, after division.

    Raises
    ------
    ValueError
        If no row has an index greater than ``lpf``.
    """
    logger.info("using data after {:.2f} nm to normalize".format(lpf))
    beyond_cutoff = df[df.index > lpf]
    if beyond_cutoff.empty:
        # The maximum of an empty selection is NaN, which would silently
        # turn the entire spectrum into NaN.
        raise ValueError(
            "no data beyond {:.2f} nm to normalize against".format(lpf)
        )
    df /= beyond_cutoff.max()
    return df
@click.command()
@click.argument('short', type=click.Path(exists=True))
@click.argument('long', type=click.Path(exists=True))
@click.argument('output')
@click.option('--mode', type=click.Choice(['lstsq', 'last']),
              help='Method to determine concatenation ratio.')
@click.option('--no-norm', 'norm', is_flag=True, default=False,
              help='Do not normalize the result to [0, 1].')
@click.option('--lpf', type=np.float32, default=500.,
              help='Long-pass frequency used in normalization, default 500.0 nm.')
def main(short, long, output, mode, norm, lpf):
    """
    This script concat spectrum from SHORT and LONG file and save the resolved
    result in OUTPUT. Spectrum from SHORT is favored over LONG.
    """
    # Fall back to least-squares matching when --mode was not supplied.
    method = 'lstsq' if mode is None else mode

    # SHORT is passed first, so its values win wherever both files overlap.
    merged = concat_data(load_file(short), load_file(long), method)

    # `norm` is bound to the --no-norm flag: a true value means the user
    # asked to skip normalization.
    skip_normalization = norm
    if not skip_normalization:
        merged = normalize_peak(merged, lpf)

    merged.to_csv(
        output,
        index_label='wavelength',
        header=True,
        float_format='%.6g',
    )
    logger.info("result saved to \"{}\"".format(output))
if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        # Log the failure together with its traceback, then exit with a
        # non-zero status so shells and calling scripts can detect it.
        logger.exception(str(e))
        raise SystemExit(1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
coloredlogs
click
numpy
pandas
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment