Skip to content

Instantly share code, notes, and snippets.

@AlmightyOatmeal
Forked from mraspaud/read_meteor2.py
Created October 7, 2017 15:55
Show Gist options
  • Save AlmightyOatmeal/56aa481b2171e88ccfd928ab63610a11 to your computer and use it in GitHub Desktop.
Save AlmightyOatmeal/56aa481b2171e88ccfd928ab63610a11 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2015 Martin Raspaud
# Author(s):
# Martin Raspaud <martin.raspaud@smhi.se>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Read meteor m n2 data.
ch 1: https://lut.im/R092lPL4/eyB8oEG3
ch 2: https://lut.im/8WDeHVVl/YeibSA0C
ch 3: https://lut.im/w8iqnCVO/MRNGn3MW
ch 4: https://lut.im/LiLfVdRH/JzZkmSy0
ch 5: https://lut.im/mThQIxno/ydIW3roK
ch 6: https://lut.im/uUeQWGYe/KDeFwJLP
"""
import numpy as np
def dec10to16(data):
arr10 = data.astype(np.uint16).flat
new_shape = list(data.shape[:-1]) + [(data.shape[-1] * 8) / 10]
arr16 = np.zeros(new_shape, dtype=np.uint16)
arr16.flat[::4] = (np.left_shift(arr10[::5], 2) +
np.right_shift((arr10[1::5]), 6))
arr16.flat[1::4] = (np.left_shift((arr10[1::5] & 63), 4) +
np.right_shift((arr10[2::5]), 4))
arr16.flat[2::4] = (np.left_shift(arr10[2::5] & 15, 6) +
np.right_shift((arr10[3::5]), 2))
arr16.flat[3::4] = (np.left_shift(arr10[3::5] & 3, 8) +
arr10[4::5])
return arr16
def show_arr(data, filename=None):
"""Show the stetched data.
"""
from PIL import Image as pil
norm_arr = np.array((data - data.min()) * 255.0 /
(data.max() - data.min())).astype(np.uint8)
img = pil.fromarray(norm_arr)
if filename:
img.save(filename)
else:
img.show()
dtype_frame = np.dtype([("sync", "u1", (4, )),
("telemetry1", "V2"),
("bis-m1", "V4"),
("sspd1", "V4"),
("mtvza1", "V8"),
("msu-mr1", "V238"),
("telemetry2", "V2"),
("bis-m2", "V4"),
("sspd2", "V4"),
("mtvza2", "V8"),
("msu-mr2", "V238"),
("telemetry3", "V2"),
("bis-m3", "V4"),
("sspd3", "V4"),
("mtvza3", "V8"),
("msu-mr3", "V238"),
("telemetry4", "V2"),
("bis-m4", "V4"),
("sspd4", "V4"),
("mtvza4", "V8"),
("msu-mr4", "V234")])
dtype_msu = np.dtype([("sync", "u1", (8, )),
("hour", "u1"),
("minute", "u1"),
("second", "u1"),
("delay", "u1"),
("serial", "u1"),
("type", "u1"),
("telemetry", "u1", (16, )),
("reserve", "u1", (5, )),
("calibration", "u1", (15, )),
("video", "u1", (11790, )),
("spare", "u1", (10, ))])
def frame_sync(packets, asm):
"""Sync the data to the *asm* marker.
"""
buff = ""
for packet in packets:
offset = packet.find(asm)
if offset != -1:
buff += packet[:offset]
yield buff
buff = packet[offset:]
else:
buff += packet
def decode_data(packets):
"""Decode the data as numpy array for access to the different fields.
"""
for packet in packets:
try:
yield np.fromstring(packet, dtype=dtype_msu)[0]
except ValueError:
print "skip"
def get_msumr_stream(packets):
"""Select the msu-mr bits from the packets and stream.
"""
for packet in packets:
try:
arr = np.fromstring(packet, dtype=dtype_frame)[0]
except ValueError:
continue
yield (str(arr["msu-mr1"]) + str(arr["msu-mr2"]) +
str(arr["msu-mr3"]) + str(arr["msu-mr4"]))
def read_file(filename, plen=1024):
"""Read the *filename* in chunks of *plen* bytes.
"""
with open(filename) as fd:
buff = fd.read(plen)
while len(buff) == plen:
yield buff
buff = fd.read(plen)
if __name__ == '__main__':
asm1 = bytearray((26, 207, 252, 29)) # carrier frames
asm2 = bytearray([2, 24, 167, 163, 146, 221, 154, 191]) # msu-mr frames
# msus = [msu for msu in decode_data(frame_sync(get_msumr_stream(frame_sync(read_file("MEM_01313_141009052955n00.raw"),
# asm1)),
# asm2))]
msus = [msu for msu in decode_data(frame_sync(get_msumr_stream(frame_sync(read_file("MEM_01342_141011062913n00.raw"),
asm1)),
asm2))]
arr = dec10to16(np.hstack(msus)["video"])
# 12 items per line, W then B for channels 1-3, then C then H for channels
# 4-6
#arr2 = dec10to16(np.hstack(msus)["calibration"])
# pause
# the data is encoded as 4 words for ch1, then 4 words for ch2, etc...
# so we have to reorder it into (line, column, channel)
channels = np.swapaxes(arr.reshape((-1, 393, 6, 4)),
2, 3).reshape(-1, 1572, 6)
from trollimage.image import Image
img = Image(
(channels[:, :, 2], channels[:, :, 1], channels[:, :, 0]), "RGB")
img.stretch("linear")
img.show()
# for i in range(6):
# img = Image((channels[:, :, i]), "L")
# img.stretch("linear")
# img.save("mem_channel%d.png" % i)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment