Skip to content

Instantly share code, notes, and snippets.

@y3nr1ng
Created December 17, 2018 06:16
Show Gist options
  • Save y3nr1ng/77cdba9fed2e559bfc7fc927fc680aee to your computer and use it in GitHub Desktop.
Save y3nr1ng/77cdba9fed2e559bfc7fc927fc680aee to your computer and use it in GitHub Desktop.
Concat spectrum from different sensors.
"""
Concatenate spectra from different sensors.
"""
from io import StringIO
import logging
import click
import coloredlogs
import numpy as np
import pandas as pd
# Configure coloredlogs once at import time: DEBUG level, each record shows
# the time (HH:MM:SS), module name, process id, level name and message.
coloredlogs.install(
level='DEBUG',
fmt='%(asctime)s %(module)s[%(process)d] %(levelname)s %(message)s',
datefmt='%H:%M:%S'
)
# Module-level logger, named after this module, used by the functions below.
logger = logging.getLogger(__name__)
def load_file(path, offset=22):
    """
    Read a tab-separated, two-column text file into a DataFrame.

    The first `offset` lines of the file are dropped before parsing; a falsy
    `offset` (0 or None) keeps every line. The remaining lines are parsed as
    tab-separated values with the columns named 'wavelength' and 'intensity'.

    Returns a DataFrame indexed by 'wavelength' with a single 'intensity'
    column.
    """
    with open(path, 'r') as fd:
        rows = fd.readlines()

    # Drop the leading lines only when an offset was actually requested.
    body = rows[offset:] if offset else rows

    table = pd.read_csv(
        StringIO(''.join(body)),
        sep='\t',  # parse by tabs
        names=['wavelength', 'intensity'],
        index_col=False,
    )
    return table.set_index('wavelength')
def concat_data(x, y, mode='lstsq'):
    """
    Concat two spectrum X and Y using their overlapping region as reference.

    Y is rescaled so that it matches X over the index values both frames
    share, then the two are merged with X taking precedence wherever both
    provide a value.

    Args:
        x: DataFrame whose values are kept as-is. Must have an 'intensity'
            column when mode is 'lstsq'.
        y: DataFrame to be rescaled and appended. It is not modified; a
            rescaled copy is used for the merge.
        mode: 'lstsq' fits ``y = m * x + c`` over the overlap by least
            squares and applies the inverse, ``(y - c) / m``. 'last' divides
            Y by the Y/X ratio taken at the last overlapping index value.

    Returns:
        The result of ``x.combine_first(rescaled_y)``.

    Raises:
        ValueError: if `mode` is not recognised, or if X and Y share no
            index values and therefore have no reference region.
    """
    if mode not in ('lstsq', 'last'):
        raise ValueError("unknown concatenation method")

    # Use the explicit set operation: `index & index` is no longer an
    # intersection in current pandas releases.
    overlap = x.index.intersection(y.index)
    if overlap.empty:
        raise ValueError("spectra do not overlap, no reference region")
    xo, yo = x.loc[overlap], y.loc[overlap]

    if mode == 'lstsq':
        # Solve yo = m * xo + c, then map all of Y back onto X's scale.
        A = np.vstack([xo['intensity'].values, np.ones(len(xo))]).T
        m, c = np.linalg.lstsq(A, yo['intensity'].values, rcond=None)[0]
        y = (y - c) / m
    else:
        # Build a new frame rather than `y /= ...` so the caller's Y is
        # left untouched, matching the 'lstsq' branch.
        y = y / (yo.iloc[-1] / xo.iloc[-1])

    return x.combine_first(y)
def normalize_peak(df, lpf=500.):
    """
    Scale `df` by the column-wise maximum found above a cutoff.

    Only rows whose index value is strictly greater than `lpf` are
    considered when looking for the maximum. The division is applied to
    `df` in place, and the same object is returned.
    """
    logger.info("using data after {:.2f} nm to normalize".format(lpf))

    # Restrict the peak search to rows beyond the cutoff.
    beyond_cutoff = df.loc[df.index > lpf]
    peak = beyond_cutoff.max()

    df /= peak
    return df
@click.command()
@click.argument('short', type=click.Path(exists=True))
@click.argument('long', type=click.Path(exists=True))
@click.argument('output')
@click.option('--mode', type=click.Choice(['lstsq', 'last']),
              help='Method to determine concatenation ratio.')
@click.option('--no-norm', 'norm', is_flag=True, default=False,
              help='Do not normalize the result to [0, 1].')
@click.option('--lpf', type=float, default=500.,
              help='Long-pass frequency used in normalization, default 500.0 nm.')
def main(short, long, output, mode, norm, lpf):
    """
    This script concat spectrum from SHORT and LONG file and save the resolved
    result in OUTPUT. Spectrum from SHORT is favored over LONG.
    """
    # NOTE: the docstring above doubles as the command's --help text.
    # --lpf is parsed as a plain Python float so the cutoff is compared
    # against the wavelength index without single-precision rounding.
    sh_data = load_file(short)
    ln_data = load_file(long)

    # --mode has no declared default, so fall back to least squares here.
    if mode is None:
        mode = 'lstsq'
    df = concat_data(sh_data, ln_data, mode)

    # `norm` is bound to the --no-norm flag: it is True when the user asked
    # to skip normalization, hence the negation.
    if not norm:
        df = normalize_peak(df, lpf)

    df.to_csv(output, index_label='wavelength', header=True, float_format='%.6g')
    logger.info("result saved to \"{}\"".format(output))
if __name__ == '__main__':
    # Script entry point: run the CLI and log any unexpected failure with
    # its traceback.
    try:
        main()
    except Exception as e:
        logger.exception(str(e))
        # Report the failure to the calling shell; without this the process
        # would exit with status 0 after logging the error.
        raise SystemExit(1)
coloredlogs
click
numpy
pandas
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment