napari.qt.threading.WorkerBase
-
class napari.qt.threading.WorkerBase(*args, SignalsClass=<class 'napari._qt.qthreading.WorkerBaseSignals'>, **kwargs)
Bases: PyQt5.QtCore.QRunnable
Base class for creating a Worker that can run in another thread.
- Parameters
SignalsClass (type, optional) – A QObject subclass that contains signals, by default WorkerBaseSignals
-
signals
Signal emitter object, used to identify which worker thread emitted a signal.
- Type
Methods
Attributes
-
create(Callable[[], None]) → QRunnable
-
quit()
Send a request to abort the worker.
Note
It is entirely up to subclasses to honor this method by checking self.abort_requested periodically in their worker.work method, and exiting if it is True.
- Return type
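The cooperative nature of quit() can be illustrated with a minimal sketch that uses only the standard library. It assumes nothing about napari's internals: AbortableTask, its max_iters argument, and the was_aborted flag are hypothetical stand-ins, and a threading.Event plays the role of the abort flag. The point is only that quit() records a request, and the loop in work() has to check for it.

```python
import threading
import time


class AbortableTask:
    """Stdlib stand-in for a worker (hypothetical; not napari's WorkerBase)."""

    def __init__(self, max_iters=1000):
        self._abort = threading.Event()  # set by quit(), read by work()
        self.max_iters = max_iters
        self.iterations = 0
        self.was_aborted = False

    @property
    def abort_requested(self):
        return self._abort.is_set()

    def quit(self):
        # Only records the request; nothing is interrupted forcibly.
        self._abort.set()

    def work(self):
        while self.iterations < self.max_iters:
            # The loop must check the flag itself, or quit() has no effect.
            if self.abort_requested:
                self.was_aborted = True
                return
            self.iterations += 1
            time.sleep(0.01)


task = AbortableTask()
thread = threading.Thread(target=task.work)
thread.start()
time.sleep(0.05)        # let the loop run a few iterations
task.quit()             # request the abort
thread.join(timeout=5)  # the loop exits at its next check
```

If work() never checked abort_requested, the call to quit() would return immediately and the loop would keep running until max_iters was reached.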
-
run()
Start the worker.
The end-user should never need to call this function. But it cannot be made private or renamed, since it is called by Qt.
The order of method calls when starting a worker is:

    calls QThreadPool.globalInstance().start(worker)
    |               triggered by the QThreadPool.start() method
    |               |             called by worker.run
    |               |             |
    V               V             V
    worker.start -> worker.run -> worker.work

This is the function that actually gets called when calling QThreadPool.start(worker). It simply wraps the work() method, and emits a few signals. Subclasses should NOT override this method (except with good reason), and instead should implement work().
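The idea that run() wraps work() and emits signals around it can be sketched without Qt. The following is an illustration under stated assumptions, not napari's implementation: Signal is a hypothetical callback list standing in for a Qt signal, and the signal names (started, returned, errored, finished) are chosen for the example rather than taken from this page.

```python
class Signal:
    """Tiny callback list standing in for a Qt signal (hypothetical)."""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        self._slots.append(slot)

    def emit(self, *args):
        for slot in self._slots:
            slot(*args)


class SketchWorker:
    """Hypothetical base class: run() is fixed, work() is the extension point."""

    def __init__(self):
        self.started = Signal()
        self.returned = Signal()
        self.errored = Signal()
        self.finished = Signal()

    def work(self):
        raise NotImplementedError("subclasses implement work()")

    def run(self):
        # Wrap work() so listeners are notified however it ends.
        self.started.emit()
        try:
            result = self.work()
            self.returned.emit(result)
        except Exception as exc:
            self.errored.emit(exc)
        finally:
            self.finished.emit()


class Doubler(SketchWorker):
    def __init__(self, value):
        super().__init__()
        self.value = value

    def work(self):
        return self.value * 2


events = []
worker = Doubler(21)
worker.started.connect(lambda: events.append("started"))
worker.returned.connect(lambda result: events.append(("returned", result)))
worker.finished.connect(lambda: events.append("finished"))
worker.run()
```

Because the notification logic lives in run(), a subclass that overrides only work() gets the same signals for free, which is one reason to leave run() alone.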
-
setAutoDelete(self, bool)
-
start()
Start this worker in a thread and add it to the global threadpool.
The order of method calls when starting a worker is:

    calls QThreadPool.globalInstance().start(worker)
    |               triggered by the QThreadPool.start() method
    |               |             called by worker.run
    |               |             |
    V               V             V
    worker.start -> worker.run -> worker.work
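The start -> run -> work call order can be traced with a small standard-library analogue. This sketch assumes a concurrent.futures.ThreadPoolExecutor as a stand-in for Qt's global thread pool; PooledWorker, _POOL, and the calls list are hypothetical names introduced only to record the order.

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a process-wide pool (assumption; Qt uses QThreadPool).
_POOL = ThreadPoolExecutor(max_workers=2)


class PooledWorker:
    """Hypothetical worker that records which method was called, in order."""

    def __init__(self):
        self.calls = []

    def start(self):
        # Called by the user: hands the worker to the pool.
        self.calls.append("start")
        return _POOL.submit(self.run)

    def run(self):
        # Called by the pool on a background thread.
        self.calls.append("run")
        self.work()

    def work(self):
        # Called by run(): the actual task.
        self.calls.append("work")


worker = PooledWorker()
future = worker.start()
future.result(timeout=5)  # block until the background call has finished
_POOL.shutdown()
```

Only start() is called directly here. The pool invokes run(), and run() invokes work(), matching the chain shown in the diagram.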
-
work()
Main method to execute the worker.
The end-user should never need to call this function. But subclasses must implement this method (see GeneratorFunction.work() for an example implementation). Minimally, it should check self.abort_requested periodically and exit if True.
Examples

    import time

    from napari.qt.threading import WorkerBase

    max_iters = 10  # illustrative cap; the original example leaves this undefined

    class MyWorker(WorkerBase):
        def work(self):
            i = 0
            while True:
                # Honor abort requests made via quit().
                if self.abort_requested:
                    self.aborted.emit()
                    break
                i += 1
                if i > max_iters:
                    break
                time.sleep(0.5)