Skip to content

Instantly share code, notes, and snippets.

@chichoon
Created September 7, 2020 18:29
Binfile folder watchdog
import os
import numpy as np
import struct
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras import layers
from keras.utils.np_utils import to_categorical
from tensorflow.keras.preprocessing.text import Tokenizer
from matplotlib import pyplot as plt
def getrgb888(r, g, b):
    """Convert three raw colour-channel readings into an 8-bit RGB triple.

    Every channel goes through the same pipeline:

    1. divide by ``max(r, g, b) / 255 + 1``;
    2. subtract 30 if the scaled value is above 30;
    3. multiply by 255 / 225;
    4. cap at 255 and truncate to ``numpy.uint8``.

    Args:
        r: Raw red reading (plain number or NumPy scalar).
        g: Raw green reading.
        b: Raw blue reading.

    Returns:
        list: ``[red, green, blue]`` as ``numpy.uint8`` values.
    """
    # The divisor is driven by the brightest channel.
    scale = max(r, g, b) / 255 + 1

    def _to_8bit(raw):
        # A zero divisor (only reachable with negative readings) leaves the
        # value unscaled instead of dividing by zero.
        value = raw / scale if scale != 0 else raw
        # The offset test must look at the *scaled* value for every channel;
        # testing the raw blue reading could push blue below zero, which then
        # wrapped around when cast to uint8.
        if value > 30:
            value = value - 30
        value = value * 255 / 225
        if value > 255:
            value = 255
        return np.uint8(value)

    return [_to_8bit(r), _to_8bit(g), _to_8bit(b)]
# Convert raw values to RGB values
def createdata(direction):
    """Build the training set from every ``.bin`` file found under *direction*.

    Each file is read as a sequence of fixed-size little-endian records:

    * 1 byte   - start marker (the original comment says 0x02; not validated)
    * 7 x int16  - AcX, AcY, AcZ, tmp, GyX, GyY, GyZ (decoded but unused)
    * 4 x uint16 - C, R, G, B colour channels
    * 1 x uint32 - PPM (decoded but unused)
    * 1 byte   - end marker (the original comment says 0x03; not validated)

    Only R, G and B are used: they are converted with ``getrgb888`` and become
    one feature row. The label of every row is the file name without its
    extension.

    Args:
        direction: Root directory that is walked recursively.

    Returns:
        tuple: ``(dataX, dataY)`` where ``dataX`` is a NumPy array of RGB rows
        and ``dataY`` is the one-hot encoding produced by ``Tokenizer`` +
        ``to_categorical``.

    Raises:
        struct.error: If a file ends in the middle of a record.
    """
    # Start marker + payload; the end marker is consumed separately so that a
    # file whose very last end byte is missing is still accepted.
    record = struct.Struct('<B7h4HI')

    # Collect the paths in memory. Writing them to a scratch list file in the
    # current working directory is unnecessary and could itself trigger the
    # folder watcher. Matching on the real extension skips names such as
    # "x.bin.bak", which could never be opened afterwards.
    bin_paths = []
    for root, _dirs, files in os.walk(direction):
        for name in files:
            if os.path.splitext(name)[1] == '.bin':
                bin_paths.append(os.path.join(root, name))
    bin_paths.sort()

    dataX, dataY = [], []
    for path in bin_paths:
        label = os.path.splitext(os.path.basename(path))[0]
        with open(path, 'rb') as bin_file:
            while True:
                chunk = bin_file.read(record.size)
                if not chunk:
                    break
                fields = record.unpack(chunk)
                # fields: 0 start, 1-7 motion values, 8 C, 9 R, 10 G, 11 B, 12 PPM
                c_r, c_g, c_b = fields[9:12]
                # Keep handing uint16 scalars to getrgb888 so its arithmetic
                # runs on the same types as before.
                dataX.append(
                    getrgb888(np.uint16(c_r), np.uint16(c_g), np.uint16(c_b))
                )
                dataY.append(label)
                bin_file.read(1)  # end marker
    dataX = np.array(dataX)

    # One-hot encode the category labels.
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(dataY)
    dataY = to_categorical(tokenizer.texts_to_sequences(dataY))
    return dataX, dataY
def func_keras(dataX, dataY, epochs=500, batch_size=32,
               model_path="C:/Users/jiyoo/Documents/code/python/model/model.h5"):
    """Train a small dense classifier on RGB rows and save it to disk.

    The data is split 80/20 (stratified, fixed seed); only the training part
    is used. The held-out part is not evaluated, so the returned numbers are
    training metrics.

    Args:
        dataX: Feature rows with three columns (R, G, B).
        dataY: One-hot encoded labels, one row per feature row.
        epochs: Number of training epochs.
        batch_size: Mini-batch size passed to ``model.fit``.
        model_path: Where the trained model is saved.

    Returns:
        tuple: ``(loss, accuracy)`` of the final training epoch.
    """
    x_train, _x_test, y_train, _y_test = train_test_split(
        dataX, dataY, test_size=0.2, shuffle=True, stratify=dataY, random_state=34)

    model = models.Sequential()
    model.add(layers.Dense(64, input_dim=3, activation='relu'))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(32, activation='relu'))
    # One output unit per one-hot column.
    model.add(layers.Dense(np.shape(y_train)[1], activation='softmax'))
    model.compile(optimizer='rmsprop',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    hist = model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size)

    # Take the last epoch rather than a fixed index, so the lookup stays
    # valid whatever the epoch count is.
    loss = hist.history['loss'][-1]
    acc = hist.history['accuracy'][-1]

    model.save(model_path)
    # Drop the Keras graph/session state so repeated trainings do not pile up.
    tf.keras.backend.clear_session()
    return loss, acc
import sys
import os
from PyQt5.QtGui import QIcon
from PyQt5.QtCore import pyqtSignal, QObject, QThread
from PyQt5.QtWidgets import QErrorMessage, QApplication, QWidget, QDesktopWidget
from PyQt5.QtWidgets import QLabel, QHBoxLayout, QVBoxLayout, QMainWindow, QLineEdit
from PyQt5.QtWidgets import QPushButton, QAction, QMenu, QSystemTrayIcon, qApp, QWidget
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
import time
import binkeras
# Root folder that the watchdog observer monitors and that training reads from.
direction = 'C:/Users/jiyoo/Documents/code/binfiles/binfiles'
# Description text; not referenced anywhere else in the visible code.
expl = '내 binary 파일을 감시합니다.'
# The names below are shared state, read and written through `global` by the
# watchdog handler and by the two Qt worker threads.
newfile = 0 #number of creation events counted since the last training started
accuracyN = 0 #accuracy returned by the last training
costN = 0 #loss returned by the last training
trainflag = False #True from the moment a training is requested until it finishes
newflag = False #set by Handler.on_created, cleared by the worker threads
class Singleton(object):
    """Base class that hands out one shared instance per concrete class.

    The first construction of a class creates and caches its instance; later
    constructions return that same object. ``__init__`` still runs on every
    construction, as usual for Python.
    """
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Compare against ``cls`` (not ``Singleton``) so that an instance
        # cached on a parent class is never returned for a subclass.
        if not isinstance(cls._instance, cls):
            # ``object.__new__`` rejects extra arguments; they are meant for
            # ``__init__`` and must not be forwarded here.
            cls._instance = super().__new__(cls)
        return cls._instance
class binfileWatch(Singleton):
    """Owns the watchdog observer that monitors ``direction`` recursively."""

    def __init__(self):
        # The singleton base returns the same object on every construction
        # but Python still calls __init__ each time; without this guard a
        # second ``binfileWatch()`` would drop the reference to a running
        # observer.
        if getattr(self, '_initialized', False):
            return
        self._initialized = True
        self.observer = None
        self.is_watching = False

    def setting(self):
        """Create a fresh observer and schedule the event handler on it."""
        self.observer = Observer() #define observer for watchdog
        eventhandler = Handler(self.observer)
        self.observer.schedule(eventhandler, direction, recursive=True)

    def run(self): #start observing
        """Start watching, creating the observer first if there is none."""
        if not self.observer:
            self.setting()
        self.observer.start()
        self.is_watching = True

    def stop(self): #stop observing
        """Stop watching; safe to call when nothing is running."""
        observer, self.observer = self.observer, None
        self.is_watching = False
        if observer is None:
            return
        observer.stop()
        # Wait for the observer thread to finish so its resources are
        # released before a new observer may be scheduled on the same path.
        if observer.is_alive():
            observer.join()

    def __enter__(self):
        # Lets the existing __exit__ actually be used in a ``with`` block;
        # the caller still decides when to call run().
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
class Handler(FileSystemEventHandler):
    """Watchdog event handler that records file creations in module globals."""

    def __init__(self, observer):
        super().__init__()
        self.observer = observer
        self.dirpath = direction #direction to watch file creation
        self.tag = 'MyEventHandler'
        self.wait = 1
        self.retry = 10

    def on_created(self, event): #executed when file is created in folder
        """Count a newly created file and raise the new-file flag."""
        global newflag
        global newfile
        # The watch is recursive, so creating a sub-folder also fires this
        # callback; a folder is not a new data file and must not be counted.
        if event.is_directory:
            return
        newfile += 1
        newflag = True
class watchThread(QThread): #looping in GUI program
    """Polls the new-file flag and turns it into Qt signals.

    Signals:
        newfilesignal(int): a new file arrived while no training was pending;
            the thread also sets ``trainflag`` to request a training.
        newfiletrainsignal(int): a new file arrived while a training was
            already pending or running.
    Both carry the current ``newfile`` count.
    """
    newfilesignal = pyqtSignal(int)
    newfiletrainsignal = pyqtSignal(int)

    def __init__(self):
        super().__init__()

    def run(self):
        global newflag, trainflag
        # Honour QThread.requestInterruption() so the thread can be shut
        # down cleanly instead of looping forever.
        while not self.isInterruptionRequested():
            if newflag: #check flag (global variable) to see if new file is created
                # Clear the flag before emitting so a creation that happens
                # during the emit is not wiped out afterwards.
                newflag = False
                if trainflag:
                    self.newfiletrainsignal.emit(newfile)
                else:
                    self.newfilesignal.emit(newfile)
                    trainflag = True
            # Sleep on every pass, also when idle, so the loop never spins.
            self.usleep(100)
class trainThread(QThread):
    """Runs a training whenever ``trainflag`` is raised.

    Signals:
        trainsignal(int): emitted right before a training starts, carrying
            the number of new files counted so far.
        traindonesignal(int): emitted after a successful training, carrying
            the number of files that arrived while it was running.
    """
    trainsignal = pyqtSignal(int)
    traindonesignal = pyqtSignal(int)

    def __init__(self):
        super().__init__()

    def run(self):
        global trainflag, newflag
        global accuracyN, costN, newfile
        import logging

        # Honour QThread.requestInterruption() so the thread can be shut
        # down cleanly instead of looping forever.
        while not self.isInterruptionRequested():
            if trainflag:
                self.sleep(1)
                self.trainsignal.emit(newfile) #emit signal if so
                newfile = 0
                try:
                    datax, datay = binkeras.createdata(direction)
                    costN, accuracyN = binkeras.func_keras(datax, datay)
                except Exception:
                    # Top-level boundary of the worker thread: a failed
                    # training (for example a file that is still being
                    # written) must not kill the thread. No "done" signal is
                    # emitted because the previous metrics are still the
                    # valid ones.
                    logging.getLogger(__name__).exception(
                        'Training on %s failed', direction)
                else:
                    self.traindonesignal.emit(newfile)
                finally:
                    # Always release the flags, otherwise no further
                    # training could ever be requested.
                    newflag = False
                    trainflag = False
            self.sleep(1)
class App(QMainWindow):
    """Main window and tray icon of the folder watchdog.

    The window shows the watched folder, the number of new files and the
    metrics of the last training. Closing the window only hides it to the
    system tray; the quit button and the tray "Exit" entry end the program.
    """
    tray_icon = None

    def __init__(self):
        super().__init__()
        self.widget = QWidget()
        self.ledt = QLineEdit()
        self.watchthread = watchThread()
        self.trainthread = trainThread()
        self.watch = binfileWatch()
        self.initUI()
        self.initActions()
        self.setFixedSize(400, 200)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def initUI(self):
        """Build the widgets, the tray icon and the thread-signal wiring."""
        self.layout1 = QHBoxLayout() #Program Description
        self.layout2 = QHBoxLayout() #number of new files updated, now training...
        self.layout3 = QHBoxLayout() #model Accuracy & Cost
        self.layout4 = QHBoxLayout() #button - watchdog start, stop
        self.layout5 = QHBoxLayout() #button - watchdog quit
        self.main_layout = QVBoxLayout() #layout for all-in-one

        folder_name = os.path.splitext(os.path.basename(direction))[0]
        self.explabel = QLabel(folder_name + ' 폴더 내 binary 파일을 감시합니다.')
        self.newlabel = QLabel(str(newfile) + ' 개의 신규 파일 존재')
        self.acclabel = QLabel(
            'Model Accuracy : ' + str(accuracyN) + ' Model Cost : ' + str(costN))
        self.startbtn = QPushButton('감시 시작')
        self.quitbtn = QPushButton('프로그램 종료')

        rows = (
            (self.layout1, self.explabel),
            (self.layout2, self.newlabel),
            (self.layout3, self.acclabel),
            (self.layout4, self.startbtn),
            (self.layout5, self.quitbtn),
        )
        for row_layout, row_widget in rows:
            row_layout.addWidget(row_widget)
            self.main_layout.addLayout(row_layout)
        self.widget.setLayout(self.main_layout)
        self.setCentralWidget(self.widget)

        self.tray_icon = QSystemTrayIcon(self)
        self.tray_icon.setIcon(QIcon('icon.ico'))
        show_action = QAction("Show", self)
        quit_action = QAction("Exit", self)
        hide_action = QAction("Hide", self)
        show_action.triggered.connect(self.show)
        hide_action.triggered.connect(self.hide)
        # Route the tray "Exit" through the same shutdown path as the quit
        # button so the watcher is stopped in both cases.
        quit_action.triggered.connect(self.toggleQuit)
        tray_menu = QMenu()
        tray_menu.addAction(show_action)
        tray_menu.addAction(hide_action)
        tray_menu.addAction(quit_action)
        self.tray_icon.setContextMenu(tray_menu)
        self.tray_icon.show()

        self.watchthread.newfilesignal.connect(self.showMsgnewfile)
        self.watchthread.newfiletrainsignal.connect(self.showMsgnewfiletrain)
        self.trainthread.trainsignal.connect(self.showMsgtrain)
        self.trainthread.traindonesignal.connect(self.showMsgtrainDone)

        self.setWindowTitle('BinWatchdog')
        self.setWindowIcon(QIcon('icon.png'))
        self.setGeometry(0, 0, 300, 200)
        self.show()

    def initActions(self):
        """Connect the two push buttons to their handlers."""
        self.startbtn.clicked.connect(self.toggleStart)
        self.quitbtn.clicked.connect(self.toggleQuit)

    def _notify(self, message):
        """Show a two-second tray balloon with the application title."""
        self.tray_icon.showMessage(
            'BinWatchDog',
            message,
            QIcon('icon.ico'),
            2000)

    def closeEvent(self, event):
        # Closing the window only hides it; the program keeps running in
        # the tray.
        event.ignore()
        self.hide()
        self._notify('Watchdog in Tray')

    def showMsgnewfile(self, val): #executes if gain signal from thread
        self._notify('New item updated in folder')
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재') #update text

    def showMsgnewfiletrain(self, val): #executes if gain signal from thread
        self._notify('New item updated in folder')
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재, Training...') #update text

    def showMsgtrain(self, val): #executes if gain signal from thread
        self._notify('Training Start....')
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재, Training...')

    def showMsgtrainDone(self, val): #executes if gain signal from thread
        self._notify('Training Done')
        self.acclabel.setText(
            'Model Accuracy : ' + str(round(float(accuracyN), 2))
            + ' Model Cost : ' + str(round(float(costN), 2)))
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재')

    def toggleStart(self):
        """Start watching, or stop if the watcher is currently running."""
        if self.watch.is_watching: #if it is now watching
            self.watch.stop() #Watchdog stop
            self.startbtn.setText('감시 시작')
        else:
            self.watch.run()
            # Starting an already running QThread is a no-op, so repeated
            # start/stop cycles are fine.
            self.watchthread.start()
            self.trainthread.start()
            self.startbtn.setText('감시 종료')

    def toggleQuit(self):
        """Shut the watcher and worker threads down, then leave the event loop."""
        if self.watch.is_watching:
            self.watch.stop()
        # Best effort: ask the workers to finish and give them a bounded
        # amount of time, so quitting never hangs on a long training.
        for worker in (self.watchthread, self.trainthread):
            if worker.isRunning():
                worker.requestInterruption()
                worker.wait(1500)
        # Hide the icon explicitly; otherwise it can linger in the tray
        # after the process is gone.
        self.tray_icon.hide()
        # Leave through the Qt event loop instead of raising SystemExit from
        # inside a slot.
        qApp.quit()
if __name__ == '__main__':
    # Script entry point: create the Qt application and the main window,
    # run the event loop, and pass its result on as the process exit code.
    app = QApplication(sys.argv)
    window = App()
    window.show()
    exit_code = app.exec()
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment