@serjflint
Created August 11, 2020 07:01
Trio multi-thread file update idling
Build command (PyInstaller):

pyinstaller --onefile --noconfirm --hiddenimport=win32timezone ^
    --hiddenimport=pkg_resources.py2_warn --hiddenimport=win32serviceutil ^
    --noupx --uac-admin --name=AG_Loader cli.py
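Once PyInstaller produces AG_Loader.exe (per the --name flag above), cli.py hands any command-line arguments to win32serviceutil.HandleCommandLine, so the standard pywin32 service verbs should apply: AG_Loader.exe install, then start, stop, remove or debug as needed. Running the executable with no arguments is what the Windows service control manager does once the service is installed; that branch dispatches to servicemanager directly.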

# cli.py
import sys

import servicemanager
import win32serviceutil
from logbook import Logger, FileHandler, NTEventLogHandler, INFO, set_datetime_format

from loader.loader import main, this_exe_dir, Loader
from loader.service import Service

log = Logger("cli")
set_datetime_format("local")

LOG_FORMAT = (
    "[{record.time}] {record.level_name}: {record.channel}: "
    "{record.func_name}({record.lineno}): {record.message}"
)

if __name__ == "__main__":
    worker = Loader()
    worker.init()
    worker_name = worker.config["Addon"]["name"]

    Service._svc_name_ = worker_name
    Service._svc_display_name_ = worker_name
    Service._svc_description_ = "Служба интеграции данных для ПО АвтоГраф"
    Service._target = (main, worker)

    with NTEventLogHandler(
        worker_name, format_string=LOG_FORMAT, level=INFO
    ).applicationbound():
        with FileHandler(
            this_exe_dir().joinpath("service.log"),
            format_string=LOG_FORMAT,
            bubble=True,
        ).applicationbound():
            try:
                if len(sys.argv) == 1:
                    servicemanager.Initialize()
                    servicemanager.PrepareToHostSingle(Service)
                    servicemanager.StartServiceCtrlDispatcher()
                else:
                    win32serviceutil.HandleCommandLine(Service)
            except Exception as err:
                log.exception(err)
                sys.exit(1)

# config.toml
[Addon]
name = "AG_Loader"
debug = false
[AutoGRAPH]
dir = "temp"
carslist = true
sbin = true
last_id = 675403987
step = 100000
carlist = true
lls = false
[Sync]
wait = 300
attempts = 30
writes = 30
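For reference, a minimal sketch (not part of the gist) of how this file round-trips through the toml package: Loader.init() reads it the same way, and Loader.save_config() periodically dumps the config back so the advancing last_id survives restarts. The new value below is hypothetical.

import toml

with open("config.toml", encoding="utf8") as stream:
    config = toml.load(stream)
config["AutoGRAPH"]["last_id"] = 675500000  # hypothetical new high-water mark
with open("config.toml", mode="w", encoding="utf8") as stream:
    stream.write(toml.dumps(config))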

# loader/loader.py
import argparse
from datetime import datetime, timedelta
import os
import re
import sys
import time
from collections import namedtuple
from typing import List, Dict, Tuple, Callable
import random
import faulthandler

import toml
import trio
import trio_mysql.cursors
from logbook import (
    Logger,
    set_datetime_format,
    FileHandler,
    StderrHandler,
    NullHandler,
    INFO,
    DEBUG,
)

log = Logger("loader")
set_datetime_format("local")

Record = namedtuple(
    "Record",
    [
        "rec_id",
        "obj_id",
        "rec_date",
        "lat",
        "lon",
        "d1",
        "d2",
        "d3",
        "d4",
        "d5",
        "d6",
        "an1",
        "an2",
    ],
)
Objects = Dict[int, int]
Groups = Dict[str, List[int]]
Records = Dict[int, Dict[Tuple, List[Record]]]


def this_exe_dir():
    return trio.Path(os.path.dirname(os.path.realpath(sys.argv[0])))


DEFAULT_LAST_ID = 0
DEFAULT_STEP = 10000
DEFAULT_WORKING_DIR = this_exe_dir()
DEFAULT_CARSLIST = True
DEFAULT_SBIN = True
DEFAULT_LLS = True
DEFAULT_WAIT = 300
DEFAULT_ATTEMPTS = 30
DEFAULT_WRITES = 30
ONLINE_READ_TIMEOUT = 10
START_DATETIME = datetime.min
CHANNEL_BUFFER_SIZE = 1
LOG_FORMAT = (
    "[{record.time}] {record.level_name}: {record.channel}: "
    "{record.func_name}({record.lineno}): {record.message}"
)

class Loader:
    """
    The "stop" callable is used to check whether the main thread has been
    scheduled to stop (e.g. by the Windows service wrapper).

    To start the Loader, use the example below:

        if __name__ == "__main__":
            worker = Loader()
            worker.init()
            main(worker)
    """
    def __init__(self, stop: Callable = lambda: False):
        self.connection: dict = dict()
        self.config: dict = dict()
        self.config_path: trio.Path = None
        self.is_stop: Callable = stop
        self.objects: Objects = dict()
        self._initialized: bool = False
        self._producer_last_id: int = 0
        self.open_files: set = set()
        self.writing_files: set = set()

    def init(self):
        if sys.argv[0].endswith(".py"):
            parser: argparse.ArgumentParser = argparse.ArgumentParser(
                description="AG_Loader"
            )
            parser.add_argument(
                "config",
                nargs="?",
                default="config.toml",
                help="Path to the configuration file",
            )
            args, leftovers = parser.parse_known_args()
            self.config_path = trio.Path(args.config)
        else:
            self.config_path = this_exe_dir().joinpath("config.toml")
        try:
            with open(self.config_path, mode="r", encoding="utf8") as stream:
                self.config = toml.load(stream)
        except FileNotFoundError as err:
            log.critical(f'Невозможно открыть "config.toml": {err}')
            sys.exit(1)
        if (
            self.config.get("Addon", None) is None
            or self.config["Addon"].get("name", None) is None
        ):
            log.critical("Обязательно наличие названия службы в конфигурации")
            sys.exit(1)
        self._initialized = True

    async def run(self):
        if not self._initialized:
            log.critical("Перед запуском основного цикла необходимо вызвать init()!")
            sys.exit(1)
        try:
            await this_exe_dir().joinpath("SBIN").mkdir(exist_ok=True)
        except OSError as err:
            log.critical(f"Невозможно создать папку с данными: {err}")
            sys.exit(1)
        if (
            self.config.get("SERVER", None) is None
            or self.config["SERVER"].get("host", None) is None
        ):
            log.critical("Обязательно наличие удалённого сервера в конфигурации")
            sys.exit(1)
        a = self.config["AutoGRAPH"]
        a["last_id"] = int(a.get("last_id", DEFAULT_LAST_ID))
a["step"] = int(a.get("step", DEFAULT_STEP)) or int(p.step)
a["dir"] = a.get("dir", DEFAULT_WORKING_DIR)
a["carslist"] = bool(a.get("carslist", DEFAULT_CARSLIST))
a["sbin"] = bool(a.get("sbin", DEFAULT_SBIN))
a["lls"] = bool(a.get("lls", DEFAULT_LLS))
try:
objects, groups = await self.select_objects()
except trio_mysql.Error as err:
log.error(f"Ошибка обновления списка объектов: {err}")
sys.exit(1)
self.objects = objects
s = self.config["Sync"]
s["wait"] = int(s.get("wait", DEFAULT_WAIT))
s["attempts"] = int(s.get("attempts", DEFAULT_ATTEMPTS))
s["writes"] = int(s.get("writes", DEFAULT_WRITES))
if a["carslist"]:
try:
await self.save_carslist(a["dir"], groups)
except OSError as err:
log.error(f"Не удалось записать CarsList: {err}")
sys.exit(1)
log.info(f"Загружена информация о {len(self.objects)} объектах")
async with trio.open_nursery() as nursery:
nursery.start_soon(self.schedule, self.save_config, 60, self.config_path)
nursery.start_soon(
self.schedule, self.update_cars_lists, 60, a["carslist"], a["dir"]
)
send_channel, receive_channel = trio.open_memory_channel(
CHANNEL_BUFFER_SIZE
)
nursery.start_soon(self.producer, send_channel)
nursery.start_soon(self.consumer, receive_channel)
async def producer(self, send_channel: trio.MemorySendChannel) -> None:
a = self.config["AutoGRAPH"]
self._producer_last_id, step = a["last_id"], a["step"]
async with send_channel:
while not self.is_stop():
await trio.sleep(1)
last_id = self._producer_last_id
log.debug(f"Enter {self._producer_last_id}")
try:
max_id = await self.select_last_id()
except trio_mysql.Error as err:
log.error(f"Ошибка получения записей от источника: {err}")
continue
try:
records, last_id, received = await self.select_records(
last_id, step, self.objects
)
except trio_mysql.Error as err:
log.error(f"Ошибка получения записей: {err}")
continue
log.debug(f"Exit {self._producer_last_id}")
if records:
if not last_id > self._producer_last_id:
log.warn(
f"Wrong last_id: {last_id}. Original: {self._producer_last_id}"
)
continue
processed = last_id - self._producer_last_id
self._producer_last_id = last_id
log.info(
f"Пройдено: {processed}. Получено {received}. Прогресс: {last_id} из {max_id}"
)
await trio.sleep(0.1)
await send_channel.send((records, last_id, received))
else:
processed = 0
if self._producer_last_id + step < max_id:
processed = step
last_id = self._producer_last_id + step
self._producer_last_id = last_id
log.info(
f"Пройдено: {processed}. Получено {received}. Прогресс: {last_id} из {max_id}. "
)
continue
if processed < step:
if last_id + step > max_id:
await trio.sleep(ONLINE_READ_TIMEOUT)
async def consumer(self, receive_channel: trio.MemoryReceiveChannel) -> None:
lls = self.config["AutoGRAPH"]["lls"]
async with receive_channel:
async for result in receive_channel:
if self.is_stop():
return
records, last_id, received = result
log.debug(f"Enter {last_id}")
while True:
try:
saved = await self.save_records(
records, self.objects, lls=lls, **self.config["Sync"]
)
self.config["AutoGRAPH"]["last_id"] = last_id
log.info(
f"Получено: {received}. Сохранено: {saved}. Последний: {last_id}"
)
break
except (OSError, TimeoutError) as err:
log.exception(f"Не удалось сохранить записи: {err}")
await trio.sleep(1)
log.debug(f"Exit {last_id}")
async def save_config(self, path) -> None:
log.debug(f"Enter")
try:
with open(path, mode="w", encoding="utf8") as f:
data = toml.dumps(self.config)
if not data:
raise ValueError(f"Cannot dump to TOML: {self.config}")
f.write(data)
except FileNotFoundError as err:
log.critical(f'Невозможно открыть "{path}": {err}')
except Exception as err:
log.exception(err)
log.debug(f"Exit")
# debug stats
log.debug(
f"open_files {len(self.open_files)}, writing files {len(self.writing_files)}"
)
if self.config["Addon"]["debug"]:
stats = trio.to_thread.current_default_thread_limiter().statistics()
low_stats = trio.lowlevel.current_statistics()
log.debug(f"Thread stats: {stats}\nLow stats: {low_stats}")
with open(this_exe_dir().joinpath("thread_dump.log"), mode="w") as f:
faulthandler.dump_traceback(f)
trio.lowlevel.current_trio_token().run_sync_soon(lambda: None)
async def schedule(self, func: Callable, interval: float = 1.0, *args):
while not self.is_stop():
await trio.sleep(interval)
try:
log.debug(f"Enter")
await func(*args)
log.debug(f"Exit")
except Exception as err:
log.exception(f"Error in scheduled function {func.__name__}: {err}")
async def update_cars_lists(self, carslist, working_dir) -> bool:
log.debug(f"Enter")
try:
objects, groups = await self.select_objects()
except trio_mysql.Error as err:
log.error(f"Ошибка обновления списка объектов: {err}")
return False
self.objects = objects
if carslist:
try:
await self.save_carslist(working_dir, groups)
except (OSError, TimeoutError) as err:
log.error(f"Не удалось записать CarsList: {err}")
return False
log.debug(f"Exit")
return True
async def select_last_id(self) -> int:
log.debug(f"Enter")
last_id = self._producer_last_id + 2 * self.config["AutoGRAPH"]["step"]
log.debug(f"Exit")
return last_id
async def select_objects(self) -> (Objects, Groups):
log.debug(f"Enter")
objects: dict = dict()
groups: dict = dict()
for obj in range(10000):
objects[obj] = obj
buffer: List[int] = groups.get(str(obj % 100), list())
buffer.append(obj)
groups[str(obj % 100)] = buffer
log.debug(f"Exit")
return objects, groups
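
    # Editor's note (not in the original gist): select_last_id and select_objects
    # above, like select_records below, are synthetic stand-ins that fabricate data
    # (10000 object ids split into 100 groups) instead of querying a database, which
    # keeps the example self-contained for reproducing the idling behaviour.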
    @staticmethod
    async def save_carslist(working_dir: str, groups: Groups) -> None:
        log.debug("Enter")
        working_dir = trio.Path(working_dir)
        await working_dir.mkdir(exist_ok=True)
        carslist = working_dir.joinpath("CarsList")
        await carslist.mkdir(exist_ok=True)
        dbf = working_dir.joinpath("DBF")
        await dbf.mkdir(exist_ok=True)

        async def write_carslist(name: str, group: List[int]):
            name = re.sub(r"[\\/:*?\"\'<>|]", "", name)
            ini_file = carslist.joinpath(f"{name}.ini")
            atg_file = dbf.joinpath(f"{name}.atg")
            ini = f"[SETUP]\nName={name}\n"
            atg = f"AutoGRAPH database 1.3\n{len(group)} records\n"
            for obj in group:
                try:
                    autograph_id = get_autograph_id(obj)
                except ValueError:
                    log.info(f"Missing autograph_id in {obj}")
                    continue
                ini += (
                    f"[{autograph_id}]\n"
                    f"Model={obj}\n"
                    f"Number={obj}\n"
                    "Online=1,1\n"
                )
                atg += f"{autograph_id}:\tPASSWORD={autograph_id}0\n"
            try:
                async with await trio.open_file(ini_file, mode="w") as f:
                    await f.write(ini)
                async with await trio.open_file(atg_file, mode="w") as f:
                    await f.write(atg)
            except UnicodeEncodeError as err:
                log.error(f"{err}: {ini}")
                raise

        with trio.move_on_after(30) as cancel_scope:
            async with trio.open_nursery() as nursery:
                for key, value in groups.items():
                    nursery.start_soon(write_carslist, key, value)
        if cancel_scope.cancelled_caught:
            raise TimeoutError("Cannot save CarsList")
        log.debug("Exit")
    async def select_records(
        self, last_id: int, step: int, objects: Objects
    ) -> Tuple[Records, int, int]:
log.debug(f"Enter")
records = dict()
new_last_id: int = last_id
received = 0
for i in range(step):
record = Record(
last_id + i,
random.randrange(10000),
datetime(2020, 1, 1, 0, 0, 0),
50.0,
50.0,
2,
1,
1,
1,
1,
1,
None,
None,
)
if record.rec_id > new_last_id:
new_last_id = record.rec_id
if not record.lat or not record.lon or record.obj_id not in objects:
continue
obj_dict: dict = records.get(record.obj_id, dict())
week = record.rec_date.isocalendar()[:2]
buffer = obj_dict.get(week, list())
buffer.append(record)
received += 1
obj_dict[week] = buffer
records[record.obj_id] = obj_dict
log.debug(f"Exit")
return records, new_last_id, received
    async def save_records(
        self,
        records: Records,
        objects: Objects,
        lls: bool = False,
        wait: int = 30,
        attempts: int = 5,
        writes: int = 10,
    ) -> int:
        log.debug("Enter")
        queue = list()
        for obj_id, weeks in records.items():
            if obj_id not in objects:
                continue
            for week, batch in weeks.items():
                queue.append((obj_id, batch))
        log.debug(f"Await {len(queue)} writes")
        step = writes
        saved = list()
        for x in range(0, len(queue), step):
            with trio.move_on_after(wait) as cancel_scope:
                async with trio.open_nursery() as nursery:
                    for obj_id, batch in queue[x : x + step]:
                        autograph_id = get_autograph_id(objects[obj_id])
                        nursery.start_soon(
                            self.write_sbin, autograph_id, batch, lls, attempts, saved
                        )
            if cancel_scope.cancelled_caught:
                raise TimeoutError("Can not save records in time")
        log.debug("Exit")
        return sum(saved)

    async def write_sbin(
        self,
        autograph_id: int,
        batch: List[Record],
        lls: bool,
        attempts: int,
        saved: list,
    ):
        log.debug(f"Write {autograph_id}")
        buffer: bytes = bytes()
        try:
            filtered = batch
            saved.append(len(filtered))
            for record in filtered:
                buffer += record_to_bin(record, lls)
        except AttributeError as err:
            raise TypeError(f"Batch should contain entries of type Record: {err}")
        obj_dir = this_exe_dir().joinpath("SBIN").joinpath(autograph_id)
        await obj_dir.mkdir(exist_ok=True)
        today = batch[0].rec_date.date()
        monday = today - timedelta(days=today.weekday())
        file_name = f"{autograph_id}-{monday:%y%m%d}.sbin"
        sbin_file = obj_dir.joinpath(file_name)
        for t in range(attempts):
            try:
                self.open_files.add(file_name)
                async with await trio.open_file(sbin_file, mode="ab", buffering=0) as f:
                    self.writing_files.add(file_name)
                    await f.write(buffer)
                    self.writing_files.remove(file_name)
                # with open(sbin_file, mode="ab", buffering=0) as f:
                #     log.debug(f"Await write {obj_id}")
                #     f.write(buffer)
                break
            except OSError as err:
                log.error(f"Не удалось сохранить записи в файл, попытка {t}: {err}")
                await trio.sleep(1)
        else:
            raise OSError("Не удалось сохранить записи в файл")
        self.open_files.remove(file_name)


def date_to_ag(dt: datetime) -> str:
    sec = (dt.year - 2009) * 32140800
    sec += (dt.month - 1) * 2678400
    sec += (dt.day - 1) * 86400
    sec += dt.hour * 3600
    sec += dt.minute * 60
    sec += dt.second
    return f"{sec:08x}"

def nmea_to_ag(val: float):
    val /= 100.0
    dd = int(val)
    h = (((val - dd) * 100 / 60) + dd) * 1.2 * 1000000
    return f"{int(h):07x}"


def get_autograph_id(obj: int) -> str:
    if obj is None:
        raise ValueError("Autograph_id must not be None")
    return f"{obj:07}"[-7:]

# fmt: off
CRC8_Table = [
    0, 94, 188, 226, 97, 63, 221, 131, 194, 156, 126, 32, 163, 253, 31, 65,
    157, 195, 33, 127, 252, 162, 64, 30, 95, 1, 227, 189, 62, 96, 130, 220,
    35, 125, 159, 193, 66, 28, 254, 160, 225, 191, 93, 3, 128, 222, 60, 98,
    190, 224, 2, 92, 223, 129, 99, 61, 124, 34, 192, 158, 29, 67, 161, 255,
    70, 24, 250, 164, 39, 121, 155, 197, 132, 218, 56, 102, 229, 187, 89, 7,
    219, 133, 103, 57, 186, 228, 6, 88, 25, 71, 165, 251, 120, 38, 196, 154,
    101, 59, 217, 135, 4, 90, 184, 230, 167, 249, 27, 69, 198, 152, 122, 36,
    248, 166, 68, 26, 153, 199, 37, 123, 58, 100, 134, 216, 91, 5, 231, 185,
    140, 210, 48, 110, 237, 179, 81, 15, 78, 16, 242, 172, 47, 113, 147, 205,
    17, 79, 173, 243, 112, 46, 204, 146, 211, 141, 111, 49, 178, 236, 14, 80,
    175, 241, 19, 77, 206, 144, 114, 44, 109, 51, 209, 143, 12, 82, 176, 238,
    50, 108, 142, 208, 83, 13, 239, 177, 240, 174, 76, 18, 145, 207, 45, 115,
    202, 148, 118, 40, 171, 245, 23, 73, 8, 86, 180, 234, 105, 55, 213, 139,
    87, 9, 235, 181, 54, 104, 138, 212, 149, 203, 41, 119, 244, 170, 72, 22,
    233, 183, 85, 11, 136, 214, 52, 106, 43, 117, 151, 201, 74, 20, 246, 168,
    116, 42, 200, 150, 21, 75, 169, 247, 182, 232, 10, 84, 215, 137, 107, 53,
]
# fmt: on


def CRC8(buff):
    crc = 0
    for c in buff:
        crc = CRC8_Table[crc ^ c]
    return crc
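
# Editor's note (not in the original gist): the table above appears to be the
# Dallas/Maxim (1-Wire) CRC-8, i.e. the reflected polynomial 0x8C. A bit-by-bit
# equivalent, useful as a cross-check against the table lookup, might look like:
def crc8_bitwise(buff) -> int:
    crc = 0
    for c in buff:
        crc ^= c
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
    return crc
# e.g. crc8_bitwise(b"\x01") == CRC8(b"\x01") == 94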

def record_to_bin(record: Record, lls: bool = False) -> bytes:
    if record.lat < 0 or record.lon < 0:
        return bytes()
    rec_date = date_to_ag(record.rec_date)
    lat = f"B{nmea_to_ag(record.lat)}"
    lon = f"2{nmea_to_ag(record.lon)}"
    try:
        data = list(bytes.fromhex("00E6F70F" + lon + lat + rec_date))
    except ValueError as err:
        log.error(f"{err}: record {record} lon {lon} lat {lat} rec_date {rec_date}")
        return bytes()
    data.reverse()
    d = 0x0F
    if record.d1 == 2:
        d -= 1 << 0
    if record.d2 == 2:
        d -= 1 << 1
    if record.d3 == 2:
        d -= 1 << 2
    if record.d4 == 2:
        d -= 1 << 3
    data[-4] = d
    data[-1] = CRC8(data[:-1])
    if lls and record.an1 and record.an2 and record.an1 < 4096 and record.an2 < 4096:
        data += data[:4]
        data[-1] |= 0b1 << 7
        lls1 = f"{int(record.an1):03x}"
        lls2 = f"{int(record.an2):03x}"
        data += reversed(list(bytes.fromhex(f"{lls2}{lls1}08")))
        data += [3, 0, 0, 0]
        data += [0x0F, 0xF7, 0xE6, 0]  # 0x0E -> 0x0F
        data[-1] = CRC8(data[-16:-1])
    return bytes(data)

def main(worker: Loader):
    level = DEBUG if worker.config["Addon"].get("debug", False) else INFO
    with NullHandler().applicationbound():
        with FileHandler(
            this_exe_dir().joinpath("worker.log"),
            format_string=LOG_FORMAT,
            level=level,
        ).applicationbound():
            log.info(f"Программа запущена в {this_exe_dir()}")
            with StderrHandler(
                format_string=LOG_FORMAT, bubble=True, level=level
            ).applicationbound():
                while True:
                    try:
                        trio.run(worker.run)
                        break
                    except OSError as err:
                        log.error(f"Сервер Автограф не отвечает: {err}")
                        time.sleep(30)
                    except Exception as err:
                        log.exception(err)
                        time.sleep(3)

# pyproject.toml
[tool.poetry]
name = "ag_loader_python"
version = "0.3.0"
description = ""
authors = ["Sergei Iakhnitskii <serjflint@gmail.com>"]
[tool.poetry.dependencies]
python = "^3.8"
logbook = "^1.5.3"
trio = "^0.16.0"
trio-mysql = "^0.9.3"
toml = "^0.10.1"
[tool.poetry.dev-dependencies]
pyinstaller = "^3.6"
pywin32 = "^228"
pywin32-ctypes = "^0.2.0"
pefile = "^2019.4.18"
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"

# loader/service.py
from threading import Thread
from typing import Callable, Tuple

import servicemanager
import win32event
import win32service
import win32serviceutil
from logbook import Logger

log = Logger("service")


class Service(win32serviceutil.ServiceFramework):
    _svc_name_: str = "actualServiceName"
    _svc_display_name_: str = "actualDisplayName"
    _svc_description_: str = "actualDescription"
    _target: Tuple[Callable, object] = (lambda x: x, None)
    _svc_is_auto_start_: bool = True

    def __init__(self, *args):
        win32serviceutil.ServiceFramework.__init__(self, *args)
        self.stop_event = win32event.CreateEvent(None, 0, 0, None)
        log.info("Init")

    # logs into the system event log
    def log(self, msg):
        servicemanager.LogInfoMsg(str(msg))

    def SvcDoRun(self):
        log.info("start")
        self.ReportServiceStatus(win32service.SERVICE_START_PENDING)
        try:
            self.ReportServiceStatus(win32service.SERVICE_RUNNING)
            stop_thread: bool = False
            target, worker = self._target
            worker.is_stop = lambda: stop_thread
            try:
                t = Thread(target=target, args=(worker,))
                t.start()
            except Exception as err:
                log.exception(err)
                raise
            log.info("wait")
            win32event.WaitForSingleObject(self.stop_event, win32event.INFINITE)
            log.info("stopping")
            stop_thread = True
            t.join()
            log.info("stop")
        except Exception as x:
            log.exception()
            self.log("Exception : %s" % x)
            self.SvcStop()

    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.stop_event)
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)