Created
November 2, 2020 08:50
-
-
Save bDrwx/cabec8d3fd249750fcf80a100825007a to your computer and use it in GitHub Desktop.
logging error
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from typing import List, Tuple, DefaultDict | |
from collections import deque, defaultdict | |
import click | |
import csv | |
from cdr import Gcdr, Subscriber, Dvo, Interfacez, UserType, CallType, Reg | |
from sqlalchemy import create_engine | |
from configparser import ConfigParser, ExtendedInterpolation | |
from sqlalchemy import ( | |
Table, | |
Column, | |
Integer, | |
String, | |
DateTime, | |
MetaData, | |
PrimaryKeyConstraint, | |
) | |
from pprint import pprint | |
from pathlib import Path | |
from enum import Enum | |
from utility import get_logger, bcd_to_str, bcd_to_time, to_sec | |
UNDEFINED_LOCATION: int = 0 | |
BASE_DIR = Path(__file__).resolve().parent.parent | |
logger = get_logger(__name__) | |
@click.command() | |
@click.argument('files', nargs=-1, type=click.Path(exists=True)) | |
@click.option( | |
'--ptus', type=click.Choice(['SK', 'SV', 'VV', 'PO', 'PI'], case_sensitive=False) | |
) | |
def main(files, ptus): | |
config = ConfigParser(interpolation=ExtendedInterpolation()) | |
config.read(f'{BASE_DIR}/config.properties') | |
data_out = BASE_DIR.joinpath(config.get(ptus, 'result')) | |
data_out.mkdir(parents=True, exist_ok=True) | |
tetra_version: Integer = int(config.get(ptus, 'version')) | |
provider_id = int(config.get(ptus, 'ptus_id')) | |
for cdr_file in files: | |
logger.info(f'Processing {cdr_file}') | |
# Определяем имя файла журнала | |
# log_file = BASE_DIR.joinpath(config.get(ptus, 'log'), cdr_file) | |
# log_file.parent.mkdir(parents=True, exist_ok=True) | |
# append log files if DEBUG is set (from top of file) | |
path = Path(cdr_file) | |
try: | |
out_buffers: Tuple[List[Gcdr], DefaultDict[str, List[Reg]]] = cdr_parser(path, tetra_version, provider_id) | |
except ValueError as err: | |
print(err) | |
finally: | |
write_to_csv(out_buffers, f"{data_out}/{Path(path).name}") | |
def cdr_parser( | |
filename, version: Integer, provider_id: Integer | |
) -> Tuple[List[Gcdr], DefaultDict[str, List[Reg]]]: | |
logger.info(f"Пытаюсь разобрать {filename} при помощи {version} версии парсера") | |
if version == 5: | |
from kaitai.parser.tetra_v5 import Tetra | |
elif version == 7: | |
from kaitai.parser.tetra_v7 import Tetra | |
else: | |
raise Exception("Не удалось загрузить модуль парсера") | |
target = Tetra.from_file(filename) | |
call_stack = deque() | |
reg_buffer: DefaultDict[str, List[Reg]] = defaultdict(list) | |
cdr_buffer: List[Gcdr] = [] | |
class MockUi(Enum): | |
Inner = 0 | |
class MockInt: | |
def __init__(self): | |
self.ui = MockUi.Inner | |
self.pui_type = 0 | |
self.pui_index = 0 | |
void_int = Interfacez(MockInt()) | |
for blk in target.block: | |
logger.info('Starting new block in CDR file') | |
for event in blk.events.event: | |
if event.body.type == Tetra.Types.toc: | |
""" Обработка записи инициализации вызова TOC """ | |
if call_stack: | |
rec = call_stack.pop() | |
logger.error(f'Неожиданное вхождение TOC записи в {filename}. Call stack member is {rec.type} -> cr:{rec.call_reference}') | |
logger.debug(f'TOC: {event.body.seq_num} cr: {event.body.call_reference}') | |
if event.body.members == 65535: | |
# Обработка персонального вызова | |
if event.body.call_reference == 0: | |
# Звонок не состоялся. Строим GCDR и сохраняем в CSV | |
toc: Tetra.Toc = event.body | |
userA = Subscriber( | |
UserType.inner, | |
bcd_to_str(toc.served_number), | |
toc.location, | |
toc.location, | |
) | |
userB = Subscriber( | |
UserType.inner, | |
bcd_to_str(toc.called_number), | |
UNDEFINED_LOCATION, | |
UNDEFINED_LOCATION, | |
) | |
dvo = Dvo(False) | |
gdp = Gcdr( | |
toc.dxt_id.as_int, | |
provider_id, | |
bcd_to_time(toc.setup_time), | |
to_sec(toc.duration), | |
userA, | |
userB, | |
void_int, | |
void_int, | |
toc.termination, | |
dvo, | |
CallType.toc, | |
) | |
cdr_buffer.append(gdp) | |
else: | |
# Звонок состоялся. Инициализируем GCDR и ждем TCC или OutG | |
call_stack.append(event.body) | |
else: | |
# Обработка группового вызова. Строим GCDR и сохраняем его в CSV | |
toc = event.body | |
userA = Subscriber( | |
UserType.inner, | |
bcd_to_str(toc.served_number), | |
toc.location, | |
toc.location, | |
) | |
userB = Subscriber( | |
UserType.inner, | |
bcd_to_str(toc.called_number), | |
UNDEFINED_LOCATION, | |
UNDEFINED_LOCATION, | |
) | |
dvo = Dvo(False) | |
gdp = Gcdr( | |
toc.dxt_id.as_int, | |
provider_id, | |
bcd_to_time(toc.setup_time), | |
to_sec(toc.duration), | |
userA, | |
userB, | |
void_int, | |
void_int, | |
toc.termination, | |
dvo, | |
CallType.toc, | |
) | |
cdr_buffer.append(gdp) | |
if event.body.type == Tetra.Types.tcc: | |
""" Обработка запси терминации вызова TCC """ | |
if not call_stack: | |
logger.error(f'Не обработаны записи TOC или InG для звонка {event.body.type} -> cr: {event.body.call_reference}') | |
continue | |
logger.debug(f"TCC: {event.body.seq_num} cr: {event.body.call_reference}") | |
partial_cdr = call_stack.pop() | |
if partial_cdr.call_reference == event.body.call_reference: | |
"""Все совпало. Будем собирать Gcdr""" | |
tcc = event.body | |
dvo = Dvo(False) | |
if type(partial_cdr) is Tetra.Toc: | |
userA = Subscriber( | |
UserType.inner, | |
bcd_to_str(partial_cdr.served_number), | |
partial_cdr.location, | |
UNDEFINED_LOCATION, | |
) | |
userB = Subscriber( | |
UserType.inner, | |
bcd_to_str(tcc.served_number), | |
tcc.location, | |
UNDEFINED_LOCATION, | |
) | |
gdp = Gcdr( | |
partial_cdr.dxt_id.as_int, | |
provider_id, | |
bcd_to_time(partial_cdr.setup_time), | |
to_sec(partial_cdr.duration), | |
userA, | |
userB, | |
void_int, | |
void_int, | |
partial_cdr.termination, | |
dvo, | |
CallType.toctcc, | |
) | |
cdr_buffer.append(gdp) | |
elif type(partial_cdr) is Tetra.InG: | |
userA = Subscriber( | |
UserType.outer, | |
bcd_to_str(partial_cdr.calling_number), | |
UNDEFINED_LOCATION, | |
UNDEFINED_LOCATION, | |
) | |
userB = Subscriber( | |
UserType.inner, | |
bcd_to_str(tcc.served_nitsi), | |
tcc.location, | |
UNDEFINED_LOCATION, | |
) | |
gdp = Gcdr( | |
tcc.dxt_id.as_int, | |
provider_id, | |
bcd_to_time(tcc.setup_time), | |
to_sec(tcc.duration), | |
userA, | |
userB, | |
Interfacez(partial_cdr.in_int), | |
void_int, | |
tcc.termination, | |
dvo, | |
CallType.ingtcc, | |
) | |
cdr_buffer.append(gdp) | |
else: | |
raise ValueError(f"Неожиданный тип объекта {type(partial_cdr)}") | |
else: | |
raise ValueError( | |
f"Не соответствие call_reference обрабатываемых записей" | |
f"{partial_cdr.call_reference} != {event.body.call_reference}" | |
) | |
if event.body.type == Tetra.Types.out_g: | |
"""Обработка записи звонка исходящего на фиксированную сеть TOC -> OutG""" | |
if len(call_stack) == 0: | |
raise ValueError( | |
f"Не обработана запись TOC для звонка {event.body.call_reference}" | |
) | |
logger.debug(f"OutG: {event.body.seq_num} cr: {event.body.call_reference}") | |
toc: Tetra.Toc = call_stack.pop() | |
out_g: Tetra.OutG = event.body | |
userA = Subscriber( | |
UserType.inner, | |
bcd_to_str(toc.served_number), | |
toc.location, | |
UNDEFINED_LOCATION, | |
) | |
userB = Subscriber( | |
UserType.outer, | |
bcd_to_str(out_g.transmitted_number), | |
UNDEFINED_LOCATION, | |
UNDEFINED_LOCATION, | |
) | |
dvo = Dvo(False) | |
gdp = Gcdr( | |
toc.dxt_id.as_int, | |
provider_id, | |
bcd_to_time(toc.setup_time), | |
to_sec(toc.duration), | |
userA, | |
userB, | |
void_int, | |
Interfacez(out_g.out_int), | |
toc.termination, | |
dvo, | |
CallType.tocoutg, | |
) | |
cdr_buffer.append(gdp) | |
if event.body.type == Tetra.Types.in_g: | |
""" Обработка записи звонка пришедшего из внешней сети """ | |
if len(call_stack) != 0: | |
raise ValueError( | |
f"Неожиданное вхождение записи IN_G." | |
f"Обработка звонка {event.body.call_reference}" | |
f"завершена не корректно." | |
) | |
logger.debug(f"InG: {event.body.seq_num} cr: {event.body.call_reference}") | |
if event.body.call_reference == 0: | |
# Звонок не состоялся. Строим GCDR и сохраняем его в CSV | |
in_g: Tetra.InG = event.body | |
userA = Subscriber( | |
UserType.outer, | |
bcd_to_str(in_g.calling_number), | |
UNDEFINED_LOCATION, | |
UNDEFINED_LOCATION, | |
) | |
userB = Subscriber( | |
UserType.inner, | |
bcd_to_str(in_g.called_number), | |
UNDEFINED_LOCATION, | |
UNDEFINED_LOCATION, | |
) | |
dvo = Dvo(False) | |
gdp = Gcdr( | |
in_g.dxt_id.as_int, | |
provider_id, | |
bcd_to_time(in_g.setup_time), | |
to_sec(in_g.duration), | |
userA, | |
userB, | |
Interfacez(in_g.in_int), | |
void_int, | |
in_g.termination, | |
dvo, | |
CallType.ing, | |
) | |
cdr_buffer.append(gdp) | |
else: | |
# Продолжаем обрабатывать звонок | |
call_stack.append(event.body) | |
if event.body.type == Tetra.Types.reg: | |
""" Обработка записи о регистрации абонента """ | |
logger.debug( | |
f"REG: {event.body.seq_num} " | |
f"SERVED_NITSI: {bcd_to_str(event.body.served_nitsi)} " | |
f"LOCATION: {event.body.location}:{event.body.prev_location}" | |
) | |
reg = Reg(event.body) | |
reg_buffer[reg.get_number()].append(reg) | |
logger.info( | |
f"End reading block. Calls quantity: {len(cdr_buffer)}." | |
f"Regs quantity: {len(reg_buffer)}" | |
) | |
# Write REG records to BD | |
# if len(reg_buffer) > 0: | |
# conn.execute(REGS_TABLE.insert(), reg_buffer) | |
# reg_buffer.clear() | |
return cdr_buffer, reg_buffer | |
def init_db(path): | |
engine = create_engine(f"sqlite:///{path}", echo=True) | |
metadata = MetaData() | |
regs_table = Table( | |
"regs", | |
metadata, | |
Column("id", Integer, primary_key=True), | |
Column("served_nitsi", String(12)), | |
Column("location", Integer), | |
Column("prev_location", Integer), | |
Column("reg_at", DateTime), | |
PrimaryKeyConstraint("id", "served_nitsi", name="reg_pk"), | |
) | |
metadata.create_all(engine) | |
conn = engine.connect() | |
return conn, regs_table | |
def write_to_csv( | |
out_buffers: Tuple[List[Gcdr], DefaultDict[str, List[Reg]]], file: str | |
): | |
cdr_buff, reg_buff = out_buffers | |
# Write gcdrs to file | |
with open(file, "w+", newline="") as csv_file: | |
wr = csv.writer(csv_file, delimiter=";") | |
for cdr in cdr_buff: | |
cdr.abon_a.get_last_location(reg_buff, cdr.date, cdr.call_duration) | |
cdr.abon_b.get_last_location(reg_buff, cdr.date, cdr.call_duration) | |
wr.writerow(list(cdr)) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment