Skip to content

Instantly share code, notes, and snippets.

@no1xsyzy
Last active October 15, 2020 14:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save no1xsyzy/ef171f987a2e78e5f831e523a14c7e9e to your computer and use it in GitHub Desktop.
Save no1xsyzy/ef171f987a2e78e5f831e523a14c7e9e to your computer and use it in GitHub Desktop.
import os
import sys
import time
from functools import cached_property
from multiprocessing import Pool
from PyQt5.QtCore import QObject, pyqtSignal, QThread
from PyQt5.QtWidgets import QApplication, QVBoxLayout, QWidget
from PyQt5.QtWidgets import QPushButton
class LoadFiles(QObject):
    """Worker object that squares a fixed list of numbers in a process pool.

    ``run`` performs the work and ``run_end`` is emitted when it is over, so
    that the owner of the hosting thread knows it can be shut down.
    """

    # Emitted at the end of ``run``, even when collecting the results failed.
    run_end = pyqtSignal()

    def run(self):
        """Fan the numbers out to a 6-process pool and print the squares.

        Prints the list of results, or a notice if a result is not available
        within the timeout.  ``run_end`` is always emitted on the way out.
        """
        # ``AsyncResult.get`` raises multiprocessing's own TimeoutError, which
        # is NOT a subclass of the builtin ``TimeoutError``; catching the
        # builtin would let the timeout escape unhandled.
        from multiprocessing import TimeoutError as PoolTimeoutError

        print("hi")
        nums = [1, 2, 3, 4, 5, 6]
        try:
            with Pool(6) as pool:
                results = [pool.apply_async(self.work, args=(t,)) for t in nums]
                # Give the workers a head start before collecting results.
                time.sleep(1)
                try:
                    print([res.get(timeout=1) for res in results])
                except PoolTimeoutError:
                    print("We lacked patience and got a multiprocessing.TimeoutError")
        finally:
            # Emit unconditionally: if this signal is skipped, whoever waits
            # on it to stop the thread would wait forever.
            self.run_end.emit()

    @classmethod
    def work(cls, x):
        """Return ``x * x`` after a one-second pause.

        Runs inside a pool worker; prints the worker's pid, the current time
        and the module name so the process that did the work can be seen.
        """
        print(f'{os.getpid()=}, {time.time()=}, {__name__=}')
        time.sleep(1)
        return x * x
class Main(QWidget):
    """Window with one button that runs the calculation on a worker thread.

    The layout, button, worker object and thread are each created on first
    access through ``cached_property`` and reused afterwards.
    """

    def __init__(self):
        super().__init__()
        root_layout = self.layout
        self.setLayout(root_layout)
        trigger = self.btn
        trigger.clicked.connect(self.load_files)

    @cached_property
    def layout(self):
        """Vertical layout containing the button."""
        # NOTE: on instances this attribute hides the inherited
        # ``QWidget.layout()`` method of the same name.
        box = QVBoxLayout()
        box.addWidget(self.btn)
        return box

    @cached_property
    def btn(self):
        """Push button that triggers ``load_files``."""
        button = QPushButton("Do Calc")
        return button

    @cached_property
    def file_loader(self):
        """Worker object; its ``run_end`` signal is wired to ``stop_thread``."""
        worker = LoadFiles()
        worker.run_end.connect(self.stop_thread)
        return worker

    @cached_property
    def load_thread(self):
        """Thread that hosts the worker and calls its ``run`` when started."""
        thread = QThread()
        worker = self.file_loader
        thread.started.connect(worker.run)
        worker.moveToThread(thread)
        return thread

    def load_files(self):
        """Start the worker thread and print whether it is running."""
        # A QFileDialog.getOpenFileNames picker for *.tdms files was sketched
        # here originally but is not enabled.
        thread = self.load_thread
        thread.start()
        print('现在线程状态 :', thread.isRunning())

    def stop_thread(self):
        """Ask the worker thread to quit, wait for it, and print its state."""
        thread = self.load_thread
        thread.quit()
        thread.wait()
        print('现在线程状态 :', thread.isRunning())
if __name__ == '__main__':
    # Build the Qt application, show the main window, and hand the event
    # loop's return value to the interpreter as the process exit status.
    qt_app = QApplication(sys.argv)
    window = Main()
    window.show()
    exit_status = qt_app.exec_()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment