Bases: Generic[PubSubArgs, PubSubResult]
Publish-Subscribe channel for inter cog communication
Source code in PyDrocsid/pubsub.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 | class PubSubChannel(Generic[PubSubArgs, PubSubResult]):
"""Publish-Subscribe channel for inter cog communication"""
def __init__(self) -> None:
self._subscriptions: list[Callable[PubSubArgs, Awaitable[PubSubResult | None]]] = []
async def publish(self, *args: PubSubArgs.args, **kwargs: PubSubArgs.kwargs) -> list[PubSubResult]:
"""
Publish a message to this channel. This will call all subscriptions and return
a list of their return values (if they don't return None).
"""
result = await asyncio.gather(*[sub(*args, **kwargs) for sub in self._subscriptions])
return [r for r in result if r is not None]
# calling the PubSubChannel object directly (like a function) also publishes a message
__call__ = publish
@property
def subscribe(self) -> Type[Subscription[PubSubArgs, PubSubResult]]:
"""
Decorator for async functions to register them as subscriptions of this channel.
Can only be used on methods of Cog classes.
"""
class Sub(Subscription[PubSubArgs, PubSubResult]): # type: ignore
channel = self
return Sub
def register(self, subscription: Subscription[PubSubArgs, PubSubResult]) -> None:
self._subscriptions.append(subscription)
|
publish
async
publish(*args: PubSubArgs.args, **kwargs: PubSubArgs.kwargs) -> list[PubSubResult]
Publish a message to this channel. This will call all subscriptions and return
a list of their return values (if they don't return None).
Source code in PyDrocsid/pubsub.py
| async def publish(self, *args: PubSubArgs.args, **kwargs: PubSubArgs.kwargs) -> list[PubSubResult]:
"""
Publish a message to this channel. This will call all subscriptions and return
a list of their return values (if they don't return None).
"""
result = await asyncio.gather(*[sub(*args, **kwargs) for sub in self._subscriptions])
return [r for r in result if r is not None]
|
subscribe
property
subscribe() -> Type[Subscription[PubSubArgs, PubSubResult]]
Decorator for async functions to register them as subscriptions of this channel.
Can only be used on methods of Cog classes.
Source code in PyDrocsid/pubsub.py
61
62
63
64
65
66
67
68
69
70
71 | @property
def subscribe(self) -> Type[Subscription[PubSubArgs, PubSubResult]]:
"""
Decorator for async functions to register them as subscriptions of this channel.
Can only be used on methods of Cog classes.
"""
class Sub(Subscription[PubSubArgs, PubSubResult]): # type: ignore
channel = self
return Sub
|