/binkeras.py Secret
Created
September 7, 2020 18:29
Binfile folder watchdog
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import numpy as np | |
import struct | |
from sklearn.model_selection import train_test_split | |
import tensorflow as tf | |
from tensorflow.keras import models | |
from tensorflow.keras import layers | |
from keras.utils.np_utils import to_categorical | |
from tensorflow.keras.preprocessing.text import Tokenizer | |
from matplotlib import pyplot as plt | |
def getrgb888(r, g, b):
    """Convert raw R/G/B readings into an 8-bit RGB triple.

    The three channels are processed identically:

    1. divide by ``max(r, g, b) / 255 + 1`` (a divisor derived from the
       brightest channel),
    2. subtract a fixed offset of 30 from any scaled channel above 30,
    3. stretch by ``255 / 225`` and cap the result at 255.

    Args:
        r, g, b: Raw channel values (plain numbers or NumPy scalars).

    Returns:
        A list ``[R, G, B]`` of ``np.uint8`` values.
    """
    divisor = max(r, g, b) / 255 + 1
    if divisor != 0:
        channels = [r / divisor, g / divisor, b / divisor]
    else:
        # Only reachable for negative inputs; keep the raw values then.
        channels = [r, g, b]

    rgblist = []
    for value in channels:
        # The offset test must look at the *scaled* value.  The original
        # checked raw ``b`` for the blue channel, which could push blue
        # below zero before the uint8 cast.
        if value > 30:
            value = value - 30
        value = value * 255 / 225
        rgblist.append(np.uint8(min(value, 255)))
    return rgblist
def createdata(direction):
    """Build the training set from every ``*.bin`` file under *direction*.

    Each file is a sequence of fixed-size little-endian records:

    * 1 start byte (read but not validated),
    * 7 signed 16-bit values (named AcX, AcY, AcZ, tmp, GyX, GyY, GyZ in
      the original parser; not used here),
    * 4 unsigned 16-bit values C, R, G, B,
    * 1 unsigned 32-bit value (PPM; not used here),
    * 1 end byte (read but not validated).

    Only R, G and B are used: they are converted with ``getrgb888`` into
    one feature row per record.  The label of every row is the file name
    without its extension.

    Args:
        direction: Root directory that is walked recursively.

    Returns:
        ``(dataX, dataY)`` where ``dataX`` is an array of RGB rows and
        ``dataY`` is the one-hot encoding of the per-row category produced
        by ``Tokenizer`` + ``to_categorical``.
    """
    # Everything up to (but excluding) the end byte; the end byte is
    # skipped via the stride so a final record lacking it is still used.
    record_body = struct.Struct('<B7h4HI')
    record_stride = record_body.size + 1

    # Collect the files in memory.  Matching on the real extension avoids
    # picking up names that merely contain ".bin" (e.g. "x.binary"), and
    # no scratch list file is written to the working directory any more.
    bin_paths = []
    for root, _dirs, files in os.walk(direction):
        for name in files:
            if os.path.splitext(name)[1] == '.bin':
                bin_paths.append(os.path.join(root, name))
    bin_paths.sort()

    dataX, dataY = [], []
    for path in bin_paths:
        label = os.path.splitext(os.path.basename(path))[0]
        with open(path, 'rb') as bin_file:
            payload = bin_file.read()

        offset = 0
        # A trailing partial record (e.g. a file that is still being
        # written) is ignored instead of raising struct.error.
        while offset + record_body.size <= len(payload):
            fields = record_body.unpack_from(payload, offset)
            c_R, c_G, c_B = fields[9], fields[10], fields[11]
            dataX.append(getrgb888(c_R, c_G, c_B))
            dataY.append(label)
            offset += record_stride

    dataX = np.array(dataX)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(dataY)
    # One-hot encoding of the category.
    dataY = to_categorical(tokenizer.texts_to_sequences(dataY))
    return dataX, dataY
def func_keras(dataX, dataY, epochs=500, batch_size=32,
               model_path="C:/Users/jiyoo/Documents/code/python/model/model.h5"):
    """Train a small dense classifier on RGB rows and save it to disk.

    The data is split 80/20 (stratified, fixed seed); only the training
    part is used for fitting.  The network is three ReLU dense layers
    (64, 64, 32) followed by a softmax layer with one unit per column of
    *dataY*.

    Args:
        dataX: Feature rows with three values each.
        dataY: One-hot encoded labels.
        epochs: Number of training epochs.
        batch_size: Mini-batch size passed to ``fit``.
        model_path: Where the trained model is saved.

    Returns:
        ``(loss, acc)`` — training loss and accuracy of the last epoch.
    """
    x_train, _x_test, y_train, _y_test = train_test_split(
        dataX, dataY, test_size=0.2, shuffle=True, stratify=dataY,
        random_state=34)

    model = models.Sequential()
    model.add(layers.Dense(64, input_dim=3, activation='relu'))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(32, activation='relu'))
    model.add(layers.Dense(np.shape(y_train)[1], activation='softmax'))
    model.compile(optimizer='rmsprop',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    hist = model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size)

    # Take the last recorded epoch rather than a fixed index, so the
    # lookup stays valid whatever the epoch count is.
    loss = hist.history['loss'][-1]
    acc = hist.history['accuracy'][-1]

    model.save(model_path)
    tf.keras.backend.clear_session()
    return loss, acc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
from PyQt5.QtGui import QIcon | |
from PyQt5.QtCore import pyqtSignal, QObject, QThread | |
from PyQt5.QtWidgets import QErrorMessage, QApplication, QWidget, QDesktopWidget | |
from PyQt5.QtWidgets import QLabel, QHBoxLayout, QVBoxLayout, QMainWindow, QLineEdit | |
from PyQt5.QtWidgets import QPushButton, QAction, QMenu, QSystemTrayIcon, qApp, QWidget | |
from watchdog.events import FileSystemEventHandler | |
from watchdog.observers import Observer | |
import time | |
import binkeras | |
# Folder that the watchdog observer monitors and that training data is read from.
direction = 'C:/Users/jiyoo/Documents/code/binfiles/binfiles'
# Description text; not referenced anywhere else in the visible code.
expl = '내 binary 파일을 감시합니다.'
newfile = 0 #number of new files updated after last training
accuracyN = 0 #accuracy of last training
costN = 0 #cost of last training
trainflag = False #whether it is training or not
# Set by Handler.on_created when something is created; cleared by the worker threads.
newflag = False
class Singleton(object):
    """Base class that hands out one shared instance per concrete class.

    Constructor arguments are accepted and left for ``__init__`` to
    consume.  Note that Python still calls ``__init__`` on every
    construction, so it runs again on the shared instance.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Look only at this class's own slot so a subclass never reuses an
        # instance created for one of its ancestors.
        if cls.__dict__.get('_instance') is None:
            # object.__new__ rejects extra arguments, so none are forwarded.
            cls._instance = super().__new__(cls)
        return cls._instance
class binfileWatch(Singleton):
    """Owns the watchdog observer that monitors the ``direction`` folder.

    Attributes:
        observer: The current ``Observer`` or ``None`` when not set up.
        is_watching: ``True`` between ``run()`` and ``stop()``.

    Only ``__exit__`` is defined (no ``__enter__``), so the class cannot
    be used in a ``with`` statement as written.
    """

    def __init__(self):
        # The singleton base re-runs __init__ on every construction; do
        # not wipe the state of an observer that may already be running.
        if not hasattr(self, 'observer'):
            self.observer = None
            self.is_watching = False

    def setting(self):
        """Create the observer and schedule the event handler on it."""
        self.observer = Observer()
        eventhandler = Handler(self.observer)
        self.observer.schedule(eventhandler, direction, recursive=True)

    def run(self):
        """Start observing; does nothing if already watching."""
        if self.is_watching:
            return  # a started observer thread cannot be started again
        if not self.observer:
            self.setting()
        self.observer.start()
        self.is_watching = True

    def stop(self):
        """Stop observing; safe to call when nothing is running."""
        observer = self.observer
        if observer is not None and observer.is_alive():
            observer.stop()
            # Wait for the observer thread so no event arrives after stop().
            observer.join()
        self.observer = None
        self.is_watching = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
class Handler(FileSystemEventHandler):
    """Watchdog event handler that counts newly created files.

    It communicates through the module globals ``newfile`` (counter) and
    ``newflag`` (something new has arrived).
    """

    def __init__(self, observer):
        super().__init__()
        self.observer = observer
        self.dirpath = direction  # directory watched for file creation
        self.tag = 'MyEventHandler'
        self.wait = 1
        self.retry = 10

    def on_created(self, event):
        """Record a newly created file; called by the observer."""
        global newflag
        global newfile
        # A new sub-directory is not a new data file, so it is not counted.
        if event.is_directory:
            return
        newfile += 1
        newflag = True
class watchThread(QThread):
    """Polls the ``newflag`` global and forwards new-file events to the GUI.

    Signals:
        newfilesignal(int): a new file arrived while no training was
            running; ``trainflag`` is raised right after emitting.
        newfiletrainsignal(int): a new file arrived while training was
            already in progress.

    Both signals carry the current ``newfile`` count.
    """

    newfilesignal = pyqtSignal(int)
    newfiletrainsignal = pyqtSignal(int)

    def __init__(self):
        QThread.__init__(self)

    def run(self):
        global newflag, trainflag
        global newfile
        # Leaves the loop once requestInterruption() has been called on
        # this thread, so it can be shut down cooperatively.
        while not self.isInterruptionRequested():
            if newflag:
                # Clear the flag before emitting: a file created while the
                # signal is being emitted raises it again and is then
                # reported on the next pass instead of being lost.
                newflag = False
                if trainflag:
                    self.newfiletrainsignal.emit(newfile)
                else:
                    self.newfilesignal.emit(newfile)
                    trainflag = True
            # Sleep on every pass so an idle loop never spins.
            self.usleep(100)
class trainThread(QThread):
    """Runs a training pass whenever the ``trainflag`` global is raised.

    Signals:
        trainsignal(int): emitted just before training starts, carrying
            the ``newfile`` count at that moment.
        traindonesignal(int): emitted after a successful training pass,
            carrying the number of files counted while it was running.

    Results are published through the ``costN`` / ``accuracyN`` globals.
    """

    trainsignal = pyqtSignal(int)
    traindonesignal = pyqtSignal(int)

    def __init__(self):
        QThread.__init__(self)

    def run(self):
        import traceback

        global trainflag, newflag
        global accuracyN, costN, newfile
        # Leaves the loop once requestInterruption() has been called on
        # this thread, so it can be shut down cooperatively.
        while not self.isInterruptionRequested():
            if trainflag:
                self.sleep(1)
                self.trainsignal.emit(newfile)
                newfile = 0
                try:
                    datax, datay = binkeras.createdata(direction)
                    costN, accuracyN = binkeras.func_keras(datax, datay)
                except Exception:
                    # Thread boundary: report the failure and keep the
                    # worker alive.  No "done" signal is sent because the
                    # stored cost/accuracy were not updated.
                    traceback.print_exc()
                else:
                    self.traindonesignal.emit(newfile)
                finally:
                    # Always lower the flags; otherwise a failed pass
                    # would leave trainflag raised for good.
                    newflag = False
                    trainflag = False
            self.sleep(1)
class App(QMainWindow):
    """Main window and system-tray front end of the folder watchdog.

    The window shows which folder is watched, the number of new files and
    the accuracy/cost of the last training.  Closing the window only
    hides it to the tray; the quit button or the tray "Exit" action end
    the program.
    """

    tray_icon = None

    def __init__(self):
        super().__init__()
        self.widget = QWidget()
        self.ledt = QLineEdit()
        self.watchthread = watchThread()
        self.trainthread = trainThread()
        self.watch = binfileWatch()
        self.initUI()
        self.initActions()
        self.setFixedSize(400, 200)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def initUI(self):
        """Build the widgets, the tray icon and the thread-signal wiring."""
        self.layout1 = QHBoxLayout()  # program description
        self.layout2 = QHBoxLayout()  # number of new files / training state
        self.layout3 = QHBoxLayout()  # model accuracy & cost
        self.layout4 = QHBoxLayout()  # button - watchdog start/stop
        self.layout5 = QHBoxLayout()  # button - quit
        self.main_layout = QVBoxLayout()  # stacks the rows above

        folder_name = os.path.splitext(os.path.basename(direction))[0]
        self.explabel = QLabel(folder_name + ' 폴더 내 binary 파일을 감시합니다.')
        self.newlabel = QLabel(str(newfile) + ' 개의 신규 파일 존재')
        self.acclabel = QLabel(
            'Model Accuracy : ' + str(accuracyN) + ' Model Cost : ' + str(costN))
        self.startbtn = QPushButton('감시 시작')
        self.quitbtn = QPushButton('프로그램 종료')

        rows = (
            (self.layout1, self.explabel),
            (self.layout2, self.newlabel),
            (self.layout3, self.acclabel),
            (self.layout4, self.startbtn),
            (self.layout5, self.quitbtn),
        )
        for row_layout, row_widget in rows:
            row_layout.addWidget(row_widget)
            self.main_layout.addLayout(row_layout)
        self.widget.setLayout(self.main_layout)
        self.setCentralWidget(self.widget)

        self.tray_icon = QSystemTrayIcon(self)
        self.tray_icon.setIcon(QIcon('icon.ico'))
        show_action = QAction("Show", self)
        quit_action = QAction("Exit", self)
        hide_action = QAction("Hide", self)
        show_action.triggered.connect(self.show)
        hide_action.triggered.connect(self.hide)
        quit_action.triggered.connect(qApp.quit)
        # Keep the menu on the instance so it stays referenced for as long
        # as the tray icon uses it.
        self.tray_menu = QMenu()
        self.tray_menu.addAction(show_action)
        self.tray_menu.addAction(hide_action)
        self.tray_menu.addAction(quit_action)
        self.tray_icon.setContextMenu(self.tray_menu)
        self.tray_icon.show()

        self.watchthread.newfilesignal.connect(self.showMsgnewfile)
        self.watchthread.newfiletrainsignal.connect(self.showMsgnewfiletrain)
        self.trainthread.trainsignal.connect(self.showMsgtrain)
        self.trainthread.traindonesignal.connect(self.showMsgtrainDone)

        self.setWindowTitle('BinWatchdog')
        self.setWindowIcon(QIcon('icon.png'))
        self.setGeometry(0, 0, 300, 200)
        self.show()

    def initActions(self):
        """Connect the two push buttons to their handlers."""
        self.startbtn.clicked.connect(self.toggleStart)
        self.quitbtn.clicked.connect(self.toggleQuit)

    def _notify(self, message):
        """Show a tray balloon with the common title, icon and timeout."""
        self.tray_icon.showMessage(
            'BinWatchDog',
            message,
            QIcon('icon.ico'),
            2000)

    def closeEvent(self, event):
        """Hide to the tray instead of closing."""
        event.ignore()
        self.hide()
        self._notify('Watchdog in Tray')

    def showMsgnewfile(self, val):
        """Slot: a new file arrived while idle."""
        self._notify('New item updated in folder')
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재')

    def showMsgnewfiletrain(self, val):
        """Slot: a new file arrived while training is running."""
        self._notify('New item updated in folder')
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재, Training...')

    def showMsgtrain(self, val):
        """Slot: training is about to start."""
        self._notify('Training Start....')
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재, Training...')

    def showMsgtrainDone(self, val):
        """Slot: training finished; refresh the accuracy/cost label."""
        self._notify('Training Done')
        self.acclabel.setText(
            'Model Accuracy : ' + str(round(float(accuracyN), 2))
            + ' Model Cost : ' + str(round(float(costN), 2)))
        self.newlabel.setText(str(val) + ' 개의 신규 파일 존재')

    def toggleStart(self):
        """Start or stop watching, depending on the current state."""
        if self.watch.is_watching:
            self.watch.stop()
            self.startbtn.setText('감시 시작')
        else:
            self.watch.run()
            self.watchthread.start()
            self.trainthread.start()
            self.startbtn.setText('감시 종료')

    def toggleQuit(self):
        """Shut the watcher down and leave the program."""
        if self.watch.is_watching:
            self.watch.stop()
        # Cooperative request; it is a no-op for a run() loop that never
        # checks isInterruptionRequested().
        for worker in (self.watchthread, self.trainthread):
            worker.requestInterruption()
        # Remove the tray icon explicitly so it does not linger after exit.
        self.tray_icon.hide()
        sys.exit()
# Script entry point: create the Qt application, show the main window and
# hand the event loop's exit status back to the interpreter.
if __name__ == '__main__':
    qt_app = QApplication(sys.argv)
    main_window = App()
    main_window.show()
    exit_status = qt_app.exec()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment