Part B - Defining Custom AsyncWorkers

In Part A, we saw a default AsyncWorker in action. However, the whole point of PySink is to help you implement your own workers, so let’s see how that works.

To create a custom AsyncWorker, create a new class that inherits from AsyncWorker and override the run() method to implement your long-running task. This task can be anything you like, but in this example we will keep it simple. This worker will perform a task similar to AsyncWorker's default behavior, but this time you will be able to specify the number of cycles and how long each cycle takes.

First, create a class called CustomAsyncWorker that inherits from AsyncWorker. Any values that the CustomAsyncWorker needs should be passed in via its __init__() method and stored as attributes. Within this class, override the run() method to implement the new task. Any result values can be passed to the complete() method as keyword arguments (more on this in Part C):

 1# Define the custom worker, inheriting from AsyncWorker
 2class CustomAsyncWorker(AsyncWorker):
 3    # Any values needed in self.run are passed in to __init__
 4    def __init__(self, delay_seconds: int, cycles: int):
 5        super(CustomAsyncWorker, self).__init__()
 6        self.delay_seconds = delay_seconds
 7        self.cycles = cycles
 8
 9    # Override AsyncWorker's .run() method
10    def run(self):
11        self.emit_start()   # This can be called to signal the start of the task
12        progress = 5
13        self.update_progress(progress, 'Starting Task')
14        for ii in range(self.cycles):
15            time.sleep(self.delay_seconds)
16            progress += 90 / self.cycles
17            self.update_progress(progress, f'Progress message #{ii + 1}')
18
19        # You can keep track of warnings or errors by appending them to the warnings or errors lists
20        self.warnings.append('Custom Warning')
21        self.errors.append('Custom Error')
22
23        # Result values can be passed to self.complete() as kwargs.
24        self.complete(custom_result_1='result 1', custom_result_2='result 2')

Let’s take a closer look at the run() method. On line 11, the emit_start() method is called to signal the start of the worker’s task (this is optional, but it can be useful when many workers are running simultaneously). Lines 12-17 implement the task itself, reporting progress after each cycle. As shown on lines 20 and 21, you can keep track of warnings and errors by appending them to the worker’s warnings and errors lists. These warnings and errors are automatically propagated to the results object emitted by the finished signal.
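The progress arithmetic in run() can be checked on its own, without PySink or Qt. The sketch below is only an illustration: progress_values is a hypothetical helper, not part of PySink, and it simply repeats the "start at 5, add 90 / cycles per cycle" logic from the listing above.

```python
def progress_values(cycles: int, start: float = 5, span: float = 90) -> list:
    """Return the progress values the example's run() loop would report.

    Hypothetical helper for illustration only; it is not part of PySink.
    """
    progress = start
    values = [progress]  # value reported together with 'Starting Task'
    for _ in range(cycles):
        progress += span / cycles  # same increment as in run()
        values.append(progress)
    return values


print(progress_values(3))  # [5, 35.0, 65.0, 95.0]
print(progress_values(4))  # [5, 27.5, 50.0, 72.5, 95.0]
```

With cycles=3 this reproduces the values 5, 35.0, 65.0 and 95.0 that appear in the sample output further down. Whatever the number of cycles, the loop itself stops at 95.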

Once the task is complete, the complete() method is called. If your task produces any result values (data from an API, calculation results, etc.), the simplest way to emit them is to pass them to the complete() method as keyword arguments. The complete() method will pack those results into the results_dict attribute of the AsyncWorkerResults object that gets emitted by the finished signal. To access this data, pull it from the results_dict attribute within the completion callback like this:

# Function to be called when the worker is finished
def completion_callback(results: AsyncWorkerResults):
    print('\nWorker Complete!')
    print(f'\tErrors: {results.errors}')
    print(f'\tWarnings: {results.warnings}')
    print(f'\tResult 1: {results.results_dict.get("custom_result_1")}')
    print(f'\tResult 2: {results.results_dict.get("custom_result_2")}')
    sys.exit()  # Exit the App event loop

The keys of results_dict are the keyword names that were passed to complete(). (In Part C, you will see how these values can be exposed as attributes of the results object instead.)
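The relationship between the keyword arguments and results_dict can be mimicked in plain Python. This is a rough sketch under stated assumptions, not PySink's implementation: SketchWorker and SketchResults are hypothetical stand-ins, and a plain function call takes the place of the finished signal.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class SketchResults:
    """Hypothetical stand-in for AsyncWorkerResults (not PySink's class)."""
    results_dict: Dict[str, Any] = field(default_factory=dict)
    warnings: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)


class SketchWorker:
    """Hypothetical stand-in for AsyncWorker: no threads, no Qt signals."""

    def __init__(self, on_finished: Callable[[SketchResults], None]):
        self.on_finished = on_finished  # plays the role of the finished signal
        self.warnings: List[str] = []
        self.errors: List[str] = []

    def complete(self, **kwargs: Any) -> None:
        # The keyword names become the keys of results_dict.
        results = SketchResults(
            results_dict=dict(kwargs),
            warnings=list(self.warnings),
            errors=list(self.errors),
        )
        self.on_finished(results)


received = []
worker = SketchWorker(on_finished=received.append)
worker.warnings.append('Custom Warning')
worker.complete(custom_result_1='result 1', custom_result_2='result 2')

results = received[0]
print(results.results_dict.get('custom_result_1'))  # result 1
print(results.results_dict.get('missing_key'))      # None
print(results.warnings)                             # ['Custom Warning']
```

Using .get() in the callback, as the tutorial does, returns None for a key that was never passed to complete() instead of raising a KeyError.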

And that’s it. All of the other callbacks stay the same as in Part A, so here’s the full script:

from PySide6.QtWidgets import QApplication
from PySink import AsyncManager, AsyncWorker, AsyncWorkerProgress, AsyncWorkerResults
import sys
import time


# Define the custom worker, inheriting from AsyncWorker
class CustomAsyncWorker(AsyncWorker):
    # Any values needed in self.run are passed in to __init__
    def __init__(self, delay_seconds: int, cycles: int):
        super(CustomAsyncWorker, self).__init__()
        self.delay_seconds = delay_seconds
        self.cycles = cycles

    # Override AsyncWorker's .run() method
    def run(self):
        self.emit_start()   # This can be called to signal the start of the task
        progress = 5
        self.update_progress(progress, 'Starting Task')
        for ii in range(self.cycles):
            time.sleep(self.delay_seconds)
            progress += 90 / self.cycles
            self.update_progress(progress, f'Progress message #{ii + 1}')

        # You can keep track of warnings or errors by appending them to the warnings or errors lists
        self.warnings.append('Custom Warning')
        self.errors.append('Custom Error')

        # Result values can be passed to self.complete() as kwargs.
        self.complete(custom_result_1='result 1', custom_result_2='result 2')


# Function to be called whenever a worker's task has started
def worker_started_callback(worker_id: str):
    print(f'Worker with id {worker_id} has started its task\n')


# Function to be called whenever progress is updated
def progress_callback(progress: AsyncWorkerProgress):
    print(f'Progress Received, value: {progress.value}, message: {progress.message}')


# Function to be called when the worker is finished
def completion_callback(results: AsyncWorkerResults):
    print('\nWorker Complete!')
    print(f'\tErrors: {results.errors}')
    print(f'\tWarnings: {results.warnings}')
    print(f'\tResult 1: {results.results_dict.get("custom_result_1")}')
    print(f'\tResult 2: {results.results_dict.get("custom_result_2")}')
    sys.exit()  # Exit the App event loop


def run_main():
    app = QApplication()
    # Create the Async Manager
    manager = AsyncManager()
    # Create the Worker and pass in the necessary values
    worker = CustomAsyncWorker(delay_seconds=1, cycles=3)
    # Connect the Worker's signals to their callbacks
    worker.signals.started.connect(worker_started_callback)
    worker.signals.progress.connect(progress_callback)
    worker.signals.finished.connect(completion_callback)
    # Start the Worker and App event loop
    manager.start_worker(worker)
    app.exec()


run_main()

Running this script gives the following output in the terminal:

Worker with id 409976aa-5999-46cb-be4a-2e244a175316 has started its task

Progress Received, value: 5, message: Starting Task
Progress Received, value: 35.0, message: Progress message #1
Progress Received, value: 65.0, message: Progress message #2
Progress Received, value: 95.0, message: Progress message #3

Worker Complete!
    Errors: ['Custom Error']
    Warnings: ['Custom Warning']
    Result 1: result 1
    Result 2: result 2

Congratulations! You’ve just implemented your first custom AsyncWorker. The output of this worker is still getting printed to the console, but before seeing how to implement a worker in a PySide6 app, let’s see how to further customize an AsyncWorker in Part C - Defining Custom Results and Part D - Defining Custom Signals.