python3+PyQt5实现支持多线程的页面索引器应用程序
程序员文章站
2023-12-26 16:50:03
本文通过 Python3+PyQt5 实现了《Python Qt GUI 快速编程》第19章的页面索引器应用程序例子。
/home/yrd/eric_workspace/chap...
本文通过 Python3+PyQt5 实现了《Python Qt GUI 快速编程》第19章的页面索引器应用程序例子。
/home/yrd/eric_workspace/chap19/walker_ans.py
#!/usr/bin/env python3
"""Worker thread that indexes the words found in a batch of HTML files."""

import codecs
import html.entities
import re
import sys
from PyQt5.QtCore import (QMutex, QThread, pyqtSignal, Qt)


class walker(QThread):
    """Index the words of a list of HTML files in a background thread.

    Several walkers share one ``filenamesforwords`` mapping and one
    ``commonwords`` set; every access to them is guarded by the shared
    read/write ``lock`` passed to the constructor.
    """

    # Emitted from run() as (completed, index).  Note that this re-declares
    # the ``finished`` name that QThread already provides.
    finished = pyqtSignal(bool, int)
    # Emitted after each file has been processed as (filename, index).
    indexed = pyqtSignal(str, int)

    # A word found in more than this many files is moved to ``commonwords``.
    common_words_threshold = 250
    min_word_len = 3
    max_word_len = 25
    # Words starting or ending with one of these characters are skipped.
    invalid_first_or_last = frozenset("0123456789_")
    striphtml_re = re.compile(r"<[^>]*?>", re.IGNORECASE | re.MULTILINE)
    # Group 1: named entity (&name;); group 2: numeric reference (&#NNN;).
    entity_re = re.compile(r"&(\w+?);|&#(\d+?);")
    # Split on runs of NON-word characters so that the pieces are the words.
    split_re = re.compile(r"\W+", re.IGNORECASE | re.MULTILINE)

    def __init__(self, index, lock, files, filenamesforwords,
                 commonwords, parent=None):
        """Store the shared state; no work is done until the thread starts.

        index             -- number identifying this walker; echoed back in
                             both signals
        lock              -- lock guarding the two shared containers (must
                             offer lockForRead/lockForWrite/unlock)
        files             -- filenames this walker must index
        filenamesforwords -- shared mapping word -> set of filenames; a
                             missing word must yield an empty set on lookup
        commonwords       -- shared set of words considered too common
        parent            -- optional parent QObject
        """
        super(walker, self).__init__(parent)
        self.index = index
        self.lock = lock
        self.files = files
        self.filenamesforwords = filenamesforwords
        self.commonwords = commonwords
        self.stopped = False
        self.mutex = QMutex()  # guards self.stopped
        self.completed = False

    def stop(self):
        """Ask the thread to stop at its next check point."""
        try:
            self.mutex.lock()
            self.stopped = True
        finally:
            self.mutex.unlock()

    def isstopped(self):
        """Return True once stop() has been called."""
        try:
            self.mutex.lock()
            return self.stopped
        finally:
            self.mutex.unlock()

    def run(self):
        """Thread entry point: index the files, then emit ``finished``."""
        self.processfiles()
        self.stop()
        self.finished.emit(self.completed, self.index)

    def processfiles(self):
        """Index every file in ``self.files``.

        Returns early, leaving ``self.completed`` False, if the walker is
        stopped.  Files that cannot be read are reported on stderr and
        skipped.
        """
        def unichrfromentity(match):
            # Replace one HTML entity with its character; unknown or
            # invalid entities are replaced with the empty string.
            text = match.group(match.lastindex)
            if text.isdigit():
                try:
                    return chr(int(text))
                except (ValueError, OverflowError):
                    # Code point outside the valid Unicode range.
                    return ""
            u = html.entities.name2codepoint.get(text)
            return chr(u) if u is not None else ""

        for fname in self.files:
            if self.isstopped():
                return
            words = set()
            try:
                with codecs.open(fname, "r", "utf8", "ignore") as fh:
                    text = fh.read()
            except EnvironmentError as e:
                sys.stderr.write("error: {0}\n".format(e))
                continue
            if self.isstopped():
                return
            text = self.striphtml_re.sub("", text)
            text = self.entity_re.sub(unichrfromentity, text)
            text = text.lower()
            for word in self.split_re.split(text):
                if (self.min_word_len <= len(word) <= self.max_word_len and
                        word[0] not in self.invalid_first_or_last and
                        word[-1] not in self.invalid_first_or_last):
                    try:
                        self.lock.lockForRead()
                        new = word not in self.commonwords
                    finally:
                        self.lock.unlock()
                    if new:
                        words.add(word)
            if self.isstopped():
                return
            for word in words:
                try:
                    self.lock.lockForWrite()
                    files = self.filenamesforwords[word]
                    if len(files) > self.common_words_threshold:
                        # Too widespread to be useful: stop indexing it.
                        del self.filenamesforwords[word]
                        self.commonwords.add(word)
                    else:
                        files.add(str(fname))
                finally:
                    self.lock.unlock()
            self.indexed.emit(fname, self.index)
        self.completed = True


# /home/yrd/eric_workspace/chap19/pageindexer_ans.pyw
#!/usr/bin/env python3
"""Dialog that indexes the HTML files under a directory with worker threads."""

import collections
import os
import sys
from PyQt5.QtCore import (QDir, QReadWriteLock, QMutex, Qt)
from PyQt5.QtWidgets import (QApplication, QDialog, QFileDialog, QFrame,
                             QHBoxLayout, QLCDNumber, QLabel, QLineEdit,
                             QListWidget, QPushButton, QVBoxLayout)
import walker_ans as walker


def isalive(qobj):
    """Return True if the C++ object wrapped by *qobj* still exists."""
    # Some PyQt5 builds ship sip inside the PyQt5 package rather than as a
    # top-level module, so try that location first.
    try:
        from PyQt5 import sip
    except ImportError:
        import sip
    try:
        sip.unwrapinstance(qobj)
    except RuntimeError:
        return False
    return True


class form(QDialog):
    """Main dialog: choose a path, index it in threads, look words up."""

    def __init__(self, parent=None):
        """Build the widgets and layouts and connect the signals."""
        super(form, self).__init__(parent)

        self.mutex = QMutex()
        self.filecount = 0
        self.filenamesforwords = collections.defaultdict(set)
        self.commonwords = set()
        # Shared with every walker; guards the two containers above.
        self.lock = QReadWriteLock()
        self.path = QDir.homePath()
        pathlabel = QLabel("indexing path:")
        self.pathlabel = QLabel()
        self.pathlabel.setFrameStyle(QFrame.StyledPanel | QFrame.Sunken)
        self.pathbutton = QPushButton("set &path...")
        self.pathbutton.setAutoDefault(False)
        findlabel = QLabel("&find word:")
        self.findedit = QLineEdit()
        findlabel.setBuddy(self.findedit)
        commonwordslabel = QLabel("&common words:")
        self.commonwordslistwidget = QListWidget()
        commonwordslabel.setBuddy(self.commonwordslistwidget)
        fileslabel = QLabel("files containing the &word:")
        self.fileslistwidget = QListWidget()
        fileslabel.setBuddy(self.fileslistwidget)
        filesindexedlabel = QLabel("files indexed")
        self.filesindexedlcd = QLCDNumber()
        self.filesindexedlcd.setSegmentStyle(QLCDNumber.Flat)
        wordsindexedlabel = QLabel("words indexed")
        self.wordsindexedlcd = QLCDNumber()
        self.wordsindexedlcd.setSegmentStyle(QLCDNumber.Flat)
        commonwordslcdlabel = QLabel("common words")
        self.commonwordslcd = QLCDNumber()
        self.commonwordslcd.setSegmentStyle(QLCDNumber.Flat)
        self.statuslabel = QLabel("click the 'set path' "
                                  "button to start indexing")
        self.statuslabel.setFrameStyle(QFrame.StyledPanel | QFrame.Sunken)

        toplayout = QHBoxLayout()
        toplayout.addWidget(pathlabel)
        toplayout.addWidget(self.pathlabel, 1)
        toplayout.addWidget(self.pathbutton)
        toplayout.addWidget(findlabel)
        toplayout.addWidget(self.findedit, 1)
        leftlayout = QVBoxLayout()
        leftlayout.addWidget(fileslabel)
        leftlayout.addWidget(self.fileslistwidget)
        rightlayout = QVBoxLayout()
        rightlayout.addWidget(commonwordslabel)
        rightlayout.addWidget(self.commonwordslistwidget)
        middlelayout = QHBoxLayout()
        middlelayout.addLayout(leftlayout, 1)
        middlelayout.addLayout(rightlayout)
        bottomlayout = QHBoxLayout()
        bottomlayout.addWidget(filesindexedlabel)
        bottomlayout.addWidget(self.filesindexedlcd)
        bottomlayout.addWidget(wordsindexedlabel)
        bottomlayout.addWidget(self.wordsindexedlcd)
        bottomlayout.addWidget(commonwordslcdlabel)
        bottomlayout.addWidget(self.commonwordslcd)
        bottomlayout.addStretch()
        layout = QVBoxLayout()
        layout.addLayout(toplayout)
        layout.addLayout(middlelayout)
        layout.addLayout(bottomlayout)
        layout.addWidget(self.statuslabel)
        self.setLayout(layout)

        self.walkers = []    # worker threads started by processfiles()
        self.completed = []  # one flag per walker, indexed by walker index
        self.pathbutton.clicked.connect(self.setpath)
        self.findedit.returnPressed.connect(self.find)
        self.setWindowTitle("page indexer")

    def stopwalkers(self):
        """Stop every running walker, wait for them, and forget them."""
        # Ask all threads to stop first, then wait, so that they wind down
        # in parallel rather than one after another.
        for thread in self.walkers:
            if isalive(thread) and thread.isRunning():
                thread.stop()
        for thread in self.walkers:
            if isalive(thread) and thread.isRunning():
                thread.wait()
        self.walkers = []
        self.completed = []

    def setpath(self):
        """Let the user choose a directory and start indexing it.

        The HTML files found are handed to walkers in batches of 1000.
        """
        self.stopwalkers()
        self.pathbutton.setEnabled(False)
        path = QFileDialog.getExistingDirectory(
            self, "choose a path to index", self.path)
        if not path:
            self.statuslabel.setText("click the 'set path' "
                                     "button to start indexing")
            self.pathbutton.setEnabled(True)
            return
        self.statuslabel.setText("scanning directories...")
        QApplication.processEvents()  # needed for Windows
        self.path = QDir.toNativeSeparators(path)
        self.findedit.setFocus()
        self.pathlabel.setText(self.path)
        self.statuslabel.clear()
        self.fileslistwidget.clear()
        self.filecount = 0
        self.filenamesforwords = collections.defaultdict(set)
        self.commonwords = set()
        nofilesfound = True
        files = []
        index = 0
        for root, dirs, fnames in os.walk(str(self.path)):
            for name in [name for name in fnames
                         if name.endswith((".htm", ".html"))]:
                files.append(os.path.join(root, name))
                if len(files) == 1000:
                    self.processfiles(index, files[:])
                    files = []
                    index += 1
                    nofilesfound = False
        if files:
            self.processfiles(index, files[:])
            nofilesfound = False
        if nofilesfound:
            self.finishedindexing()
            self.statuslabel.setText(
                "no html files found in the given path")

    def processfiles(self, index, files):
        """Create, register and start one walker for *files*."""
        thread = walker.walker(index, self.lock, files,
                               self.filenamesforwords, self.commonwords,
                               self)
        thread.indexed[str, int].connect(self.indexed)
        thread.finished[bool, int].connect(self.finished)
        thread.finished.connect(thread.deleteLater)
        self.walkers.append(thread)
        self.completed.append(False)
        thread.start()
        thread.wait(300)  # needed for Windows

    def find(self):
        """List the indexed files containing the word typed by the user.

        Only the first whitespace-separated word of the input is used.
        """
        # Strip first so that whitespace-only input is treated as empty
        # instead of failing on word.split()[0] below.
        word = str(self.findedit.text()).strip()
        if not word:
            try:
                self.mutex.lock()
                self.statuslabel.setText("enter a word to find in files")
            finally:
                self.mutex.unlock()
            return
        try:
            self.mutex.lock()
            self.statuslabel.clear()
            self.fileslistwidget.clear()
        finally:
            self.mutex.unlock()
        word = word.lower()
        if " " in word:
            word = word.split()[0]
        try:
            self.lock.lockForRead()
            found = word in self.commonwords
        finally:
            self.lock.unlock()
        if found:
            try:
                self.mutex.lock()
                self.statuslabel.setText("common words like '{0}' "
                                         "are not indexed".format(word))
            finally:
                self.mutex.unlock()
            return
        try:
            self.lock.lockForRead()
            # .get() rather than [] so the lookup does not add the word.
            files = self.filenamesforwords.get(word, set()).copy()
        finally:
            self.lock.unlock()
        if not files:
            try:
                self.mutex.lock()
                self.statuslabel.setText("no indexed file contains "
                                         "the word '{0}'".format(word))
            finally:
                self.mutex.unlock()
            return
        files = [QDir.toNativeSeparators(name) for name in
                 sorted(files, key=str.lower)]
        try:
            self.mutex.lock()
            self.fileslistwidget.addItems(files)
            self.statuslabel.setText(
                "{0} indexed files contain the word '{1}'".format(
                    len(files), word))
        finally:
            self.mutex.unlock()

    def indexed(self, fname, index):
        """Slot for walker.indexed: show progress for one indexed file.

        The counters are refreshed every 25th file; otherwise the common
        words list is refreshed every 101st file.
        """
        try:
            self.mutex.lock()
            self.statuslabel.setText(fname)
            self.filecount += 1
            count = self.filecount
        finally:
            self.mutex.unlock()
        if count % 25 == 0:
            try:
                self.lock.lockForRead()
                indexedwordcount = len(self.filenamesforwords)
                commonwordcount = len(self.commonwords)
            finally:
                self.lock.unlock()
            try:
                self.mutex.lock()
                self.filesindexedlcd.display(count)
                self.wordsindexedlcd.display(indexedwordcount)
                self.commonwordslcd.display(commonwordcount)
            finally:
                self.mutex.unlock()
        elif count % 101 == 0:
            try:
                self.lock.lockForRead()
                words = self.commonwords.copy()
            finally:
                self.lock.unlock()
            try:
                self.mutex.lock()
                self.commonwordslistwidget.clear()
                self.commonwordslistwidget.addItems(sorted(words))
            finally:
                self.mutex.unlock()

    def finished(self, completed, index):
        """Slot for walker.finished: record that walker *index* is done.

        When every walker has reported (or none are registered any more)
        the status is set to "finished" and the final counts are shown.
        """
        done = False
        if self.walkers:
            self.completed[index] = True
            if all(self.completed):
                try:
                    self.mutex.lock()
                    self.statuslabel.setText("finished")
                    done = True
                finally:
                    self.mutex.unlock()
        else:
            try:
                self.mutex.lock()
                self.statuslabel.setText("finished")
                done = True
            finally:
                self.mutex.unlock()
        if done:
            self.finishedindexing()

    def reject(self):
        """Cancel a running indexing pass; close the dialog if none is."""
        if not all(self.completed):
            self.stopwalkers()
            self.finishedindexing()
        else:
            self.accept()

    def closeEvent(self, event=None):
        """Stop all walkers before the dialog closes."""
        self.stopwalkers()

    def finishedindexing(self):
        """Show the final counts and re-enable the path button."""
        self.filesindexedlcd.display(self.filecount)
        self.wordsindexedlcd.display(len(self.filenamesforwords))
        self.commonwordslcd.display(len(self.commonwords))
        self.pathbutton.setEnabled(True)
        QApplication.processEvents()  # needed for Windows


app = QApplication(sys.argv)
form = form()
form.show()
app.exec_()
运行结果:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。