magic_utils package

Submodules

magic_utils.eventmanager module

This module defines the EventManager class, which manages subscribers and notifies them.

EventManager lets you add, remove, and notify subscribers, either sequentially or in parallel. A subscriber is any callable; the positional and keyword arguments supplied when it is subscribed are passed to it on notification.

Usage example:

    from magic_utils import EventManager

    # Create a new event manager
    manager = EventManager()

    # Define some subscriber functions
    def subscriber1(arg1, kwarg1=None):
        print(f"Subscriber 1: {arg1}, {kwarg1}")

    def subscriber2(arg1, kwarg1=None):
        print(f"Subscriber 2: {arg1}, {kwarg1}")

    # Register the subscribers together with their arguments
    manager.subscribe(subscriber1, 'data1', kwarg1='value1')
    manager.subscribe(subscriber2, 'data2')

    # Notify all subscribers sequentially
    manager.notify_all()

    # Notify all subscribers in parallel (2 threads)
    manager.notify_parallel(num_threads=2)

class magic_utils.eventmanager.EventManager

Bases: object

A class to manage subscribers and their notifications.

The EventManager class manages a list of subscribers and supports adding, removing, and notifying these subscribers. Subscribers can be notified sequentially with notify_all or in parallel with notify_parallel.

Variables:

subscribers – A list where each element is a tuple containing a callable subscriber and its associated arguments and keyword arguments.
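The shape of subscribers can be pictured with plain Python data. The sketch below is an assumption about the layout, not the library's own code: it stores each entry as a (callable, args, kwargs) triple, and the names `subscribers` and `greet` are purely illustrative.

```python
# Hypothetical stand-in for the `subscribers` attribute: a list of
# (callable, positional args, keyword args) triples. The real internal
# layout of EventManager may differ.
subscribers = []


def greet(name, punctuation="!"):
    return f"Hello, {name}{punctuation}"


# Roughly what subscribe(greet, "Ada", punctuation="?") would record.
subscribers.append((greet, ("Ada",), {"punctuation": "?"}))

# Notifying an entry means calling it with its stored arguments.
func, args, kwargs = subscribers[0]
result = func(*args, **kwargs)
print(result)
```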

clear()

Remove all subscribers from the EventManager.

notify_all() → dict[tuple[str, tuple[any], tuple[tuple[str, any]] | tuple], any]

Notify all subscribers sequentially.

Returns:

A dict mapping tuple(sub, args, tuple(kwargs.items())) to the value that subscriber returned. Items appear in the order in which the subscribers were called.
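The key holds tuple(kwargs.items()) rather than the kwargs dict itself because dict keys must be hashable, and a dict is not. The sketch below rebuilds a result of the documented shape by hand. It assumes the first key element is the subscriber's `__name__` (suggested by the `str` in the return annotation, but not confirmed here), and `notify_all_sketch` is a hypothetical helper rather than part of magic_utils.

```python
def notify_all_sketch(subscribers):
    """Call each (func, args, kwargs) entry in order and collect results.

    Hypothetical helper, not part of magic_utils. Using func.__name__ as
    the first key element is an assumption based on the `str` annotation.
    """
    results = {}
    for func, args, kwargs in subscribers:
        # kwargs is a dict and therefore unhashable, so it is frozen
        # into a tuple of (name, value) pairs before use as a key.
        key = (func.__name__, tuple(args), tuple(kwargs.items()))
        results[key] = func(*args, **kwargs)
    return results


def add(a, b=0):
    return a + b


results = notify_all_sketch([(add, (1,), {"b": 2}), (add, (5,), {})])
for key, value in results.items():
    print(key, "->", value)
```

Since Python 3.7 a dict preserves insertion order, which is what allows a plain dict to record the call order. In this sketch, two identical subscriptions would collide on the same key and the later result would overwrite the earlier one.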

notify_parallel(num_threads: int = 2) → dict[tuple[str, tuple[any], tuple[tuple[str, any]] | tuple], any]

Notify all subscribers in parallel using the specified number of threads.

If the specified number of threads exceeds the number of subscribers, a warning will be issued and the number of threads will be automatically adjusted to match the number of available subscribers.

Parameters:

num_threads – The number of threads to use for parallel notification. Must be 2 or higher.

Returns:

A dict mapping tuple(sub, args, tuple(kwargs.items())) to the value that subscriber returned. Items appear in the order in which the subscribers were called.

Raises:

ValueError – If num_threads is less than 2.
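The validation and clamping rules described for notify_parallel can be sketched with the standard library's thread pool. This is an illustration under assumptions, not the library's implementation: `notify_parallel_sketch` is a hypothetical function, the (callable, args, kwargs) entry layout and the `__name__` key element are assumed, and results are gathered here in submission order.

```python
import warnings
from concurrent.futures import ThreadPoolExecutor


def notify_parallel_sketch(subscribers, num_threads=2):
    """Hypothetical thread-pool version of the documented behaviour."""
    if num_threads < 2:
        raise ValueError("num_threads must be 2 or higher")
    if num_threads > len(subscribers):
        warnings.warn("num_threads exceeds the number of subscribers; reducing it")
        num_threads = len(subscribers)

    results = {}
    # max_workers must be at least 1, hence the guard for an empty list.
    with ThreadPoolExecutor(max_workers=max(num_threads, 1)) as pool:
        futures = [
            (func, args, kwargs, pool.submit(func, *args, **kwargs))
            for func, args, kwargs in subscribers
        ]
        # Results are gathered in submission order; the calls themselves
        # may overlap in time.
        for func, args, kwargs, future in futures:
            key = (func.__name__, tuple(args), tuple(kwargs.items()))
            results[key] = future.result()
    return results


def square(x):
    return x * x


jobs = [(square, (n,), {}) for n in range(3)]
print(notify_parallel_sketch(jobs, num_threads=2))
```

Because the subscribers run on threads, any state they share needs its own synchronization; the sketch sidesteps this by using a pure function.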

subscribe(subscriber: callable, *subscriber_args, **subscriber_kwargs)

Add a new subscriber to the EventManager.

Parameters:
  • subscriber – A callable object that will be notified.

  • subscriber_args – Arguments to be passed to the subscriber callable.

  • subscriber_kwargs – Keyword arguments to be passed to the subscriber callable.

unsubscribe(subscriber: callable)

Remove a subscriber from the EventManager.

Parameters:

subscriber – The callable object to be removed from the list of subscribers.
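Removal can be pictured as filtering the subscriber list by callable. The sketch below assumes entries are (callable, args, kwargs) triples and that unsubscribing drops every entry registered for that callable; whether the library removes all matches or only the first is not stated here. `unsubscribe_sketch` and the two handlers are hypothetical names.

```python
def unsubscribe_sketch(subscribers, subscriber):
    """Return a new list without any entry whose callable is `subscriber`."""
    # Equality rather than identity, so that bound methods (recreated on
    # each attribute access) still match their earlier registration.
    return [entry for entry in subscribers if entry[0] != subscriber]


def on_save(path):
    return f"saved {path}"


def on_close():
    return "closed"


subscribers = [
    (on_save, ("a.txt",), {}),
    (on_close, (), {}),
    (on_save, ("b.txt",), {}),
]
subscribers = unsubscribe_sketch(subscribers, on_save)
print([func.__name__ for func, _, _ in subscribers])
```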

Module contents

class magic_utils.EventManager

Bases: object

The documentation of this class is identical to that of magic_utils.eventmanager.EventManager above.