Part B - Defining Custom AsyncWorkers¶
In Part A, we saw a default AsyncWorker in action. However, the whole point of
PySink is to help you implement your own workers, so let’s see how that works.
To create a custom AsyncWorker, create a new class that inherits from AsyncWorker
and override the run() method to implement your long-running task. This task can be anything
you like, but in this example we will keep it simple. This worker will take perform a similar task to
AsyncWorker's default behavior, but this time you will be able to specify the number
of cycles and how long each cycle takes.
First, create a class called CustomAsyncWorker that inherits from AsyncWorker. Any values that the
CustomAsyncWorker needs should be passed in via it’s __init__() method and stored as attributes. Within this class,
override the run() method to implement the new task, and any result values can be passed in
to the complete() method as keyword arguments (more on this in Part C):
1# Define the custom worker, inheriting from AsyncWorker
2class CustomAsyncWorker(AsyncWorker):
3 # Any values needed in self.run are passed in to __init__
4 def __init__(self, delay_seconds: int, cycles: int):
5 super(CustomAsyncWorker, self).__init__()
6 self.delay_seconds = delay_seconds
7 self.cycles = cycles
8
9 # Override AsyncWorker's .run() method
10 def run(self):
11 self.emit_start() # This can be called to signal the start of the task
12 progress = 5
13 self.update_progress(progress, 'Starting Task')
14 for ii in range(self.cycles):
15 time.sleep(self.delay_seconds)
16 progress += 90 / self.cycles
17 self.update_progress(progress, f'Progress message #{ii + 1}')
18
19 # You can keep track of warnings or errors by appending them to the warnings or errors lists
20 self.warnings.append('Custom Warning')
21 self.errors.append('Custom Error')
22
23 # Result values can be passed to self.complete() as kwargs.
24 self.complete(custom_result_1='result 1', custom_result_2='result 2')
Let’s take a closer look at the run() method. On line 11, the
emit_start() method is called to signal the start of the worker’s task (this is not necessary,
however it can be useful if there are a lot of workers running simultaneously). In lines 12-17 the
task is actually implemented. As shown in lines 20 and 21, you can keep track of warnings and errors by appending them
to the worker’s warnings and errors. These warnings and errors
will automatically be propagated to the results object emitted by the finished signal.
Once the task is complete, the complete() method is called. If your
task ends up having any result values (data from an API, calculation results, etc.) the simple way to emit those values is to
pass them into the complete() method as keyword arguments. The
complete() method will pack those results into the results_dict
attribute of the AsyncWorkerResults object that gets emitted by the finished
signal. To access this data, pull it from results_dict attribute
within the completion callback like this:
1# Function to be called when the worker is finished
2def completion_callback(results: AsyncWorkerResults):
3 print(f'\nWorker Complete!')
4 print(f'\tErrors: {results.errors}')
5 print(f'\tWarnings: {results.warnings}')
6 print(f'\tResult 1: {results.results_dict.get("custom_result_1")}')
7 print(f'\tResult 2: {results.results_dict.get("custom_result_2")}')
8 sys.exit() # Exit the App event loop
The keys of the results_dict are the keywords that were passed to
complete(). (In Part C, you will see how these can be passed as attributes
of the results object instead).
And that’s it. All of the other callback methods stay the same as Part A, so here’s the full script:
1from PySide6.QtWidgets import QApplication
2from PySink import AsyncManager, AsyncWorker, AsyncWorkerProgress, AsyncWorkerResults
3import sys
4import time
5
6
7# Define the custom worker, inheriting from AsyncWorker
8class CustomAsyncWorker(AsyncWorker):
9 # Any values needed in self.run are passed in to __init__
10 def __init__(self, delay_seconds: int, cycles: int):
11 super(CustomAsyncWorker, self).__init__()
12 self.delay_seconds = delay_seconds
13 self.cycles = cycles
14
15 # Override AsyncWorker's .run() method
16 def run(self):
17 self.emit_start() # This can be called to signal the start of the task
18 progress = 5
19 self.update_progress(progress, 'Starting Task')
20 for ii in range(self.cycles):
21 time.sleep(self.delay_seconds)
22 progress += 90 / self.cycles
23 self.update_progress(progress, f'Progress message #{ii + 1}')
24
25 # You can keep track of warnings or errors by appending them to the warnings or errors lists
26 self.warnings.append('Custom Warning')
27 self.errors.append('Custom Error')
28
29 # Result values can be passed to self.complete() as kwargs.
30 self.complete(custom_result_1='result 1', custom_result_2='result 2')
31
32
33# Function to be called whenever a worker's task has started
34def worker_started_callback(worker_id: str):
35 print(f'Worker with id {worker_id} has started its task\n')
36
37
38# Function to be called whenever progress is updated
39def progress_callback(progress: AsyncWorkerProgress):
40 print(f'Progress Received, value: {progress.value}, message: {progress.message}')
41
42
43# Function to be called when the worker is finished
44def completion_callback(results: AsyncWorkerResults):
45 print(f'\nWorker Complete!')
46 print(f'\tErrors: {results.errors}')
47 print(f'\tWarnings: {results.warnings}')
48 print(f'\tResult 1: {results.results_dict.get("custom_result_1")}')
49 print(f'\tResult 2: {results.results_dict.get("custom_result_2")}')
50 sys.exit() # Exit the App event loop
51
52
53def run_main():
54 app = QApplication()
55 # Create the Async Manager
56 manager = AsyncManager()
57 # Create the Worker and pass in the necessary values
58 worker = CustomAsyncWorker(delay_seconds=1, cycles=3)
59 # Connect the Worker's signals to their callbacks
60 worker.signals.started.connect(worker_started_callback)
61 worker.signals.progress.connect(progress_callback)
62 worker.signals.finished.connect(completion_callback)
63 # Start the Worker and App event loop
64 manager.start_worker(worker)
65 app.exec()
66
67
68run_main()
Running this script gives the following output in the terminal:
1Worker with id 409976aa-5999-46cb-be4a-2e244a175316 has started its task
2
3Progress Received, value: 5, message: Starting Task
4Progress Received, value: 35.0, message: Progress message #1
5Progress Received, value: 65.0, message: Progress message #2
6Progress Received, value: 95.0, message: Progress message #3
7
8Worker Complete!
9 Errors: ['Custom Error']
10 Warnings: ['Custom Warning']
11 Result 1: result 1
12 Result 2: result 2
Congratulations! You’ve just implemented your first custom AsyncWorker. The output of this worker is
still getting printed to the console, but before seeing how to implement a worker in a PySide6 app,
let’s see how to further customize an AsyncWorker in Part C - Defining Custom Results and Part D - Defining Custom Signals.