fiftyone.operators.store.notification_service#
Notification service for ExecutionStore using MongoDB Change Streams.
Classes:
Abstract base class for change stream notification services. |
|
|
|
Functions:
Check if the notification service is disabled. |
- class fiftyone.operators.store.notification_service.ChangeStreamNotificationService#
Bases:
ABCAbstract base class for change stream notification services.
Methods:
subscribe(store_name, callback[, dataset_id])Register a local subscriber for a specific store.
unsubscribe(subscription_id)Unsubscribe local subscribers from a specific store.
unsubscribe_all(store_name)Unsubscribe from all changes in a store.
notify(store_name, message_data)Notify local subscribers and remote listeners of a change.
start()Start watching for database changes.
stop()Stop watching for database changes.
- abstract subscribe(store_name: str, callback: Callable[[str], None], dataset_id: str | None = None) str#
Register a local subscriber for a specific store.
- Parameters:
store_name – The name of the store to subscribe to.
callback – The callback to call when a change occurs.
dataset_id – Optional dataset ID to filter changes by.
- Returns:
The subscription id.
- abstract unsubscribe(subscription_id: str)#
Unsubscribe local subscribers from a specific store.
- Parameters:
subscription_id – The subscription id to unsubscribe from.
- abstract unsubscribe_all(store_name: str)#
Unsubscribe from all changes in a store.
- Parameters:
store_name (str) – the name of the store to unsubscribe from
- abstract notify(store_name: str, message_data: MessageData) None#
Notify local subscribers and remote listeners of a change.
- Parameters:
store_name – The name of the store that changed.
message – The message to notify subscribers with.
- abstract async start() None#
Start watching for database changes.
- abstract async stop() None#
Stop watching for database changes.
- class fiftyone.operators.store.notification_service.MongoChangeStreamNotificationService(collection_name: str, remote_notifier: RemoteNotifier = None, registry: LocalSubscriptionRegistry = None)#
Bases:
ChangeStreamNotificationServiceMethods:
subscribe(store_name, callback[, dataset_id])Register a local subscriber for a specific store.
unsubscribe(subscription_id)Unsubscribe from a specific store.
unsubscribe_all(store_name)Unsubscribe from all changes in a store.
start(dedicated_event_loop)Start watching the collection for changes using change streams or polling.
notify(store_name, message_data)Notify local subscribers and remote listeners of a change.
stop()Signal stop watching the collection for changes.
- subscribe(store_name: str, callback: Callable[[str], None], dataset_id: str | None = None) str#
Register a local subscriber for a specific store.
- Parameters:
store_name – The name of the store to subscribe to.
callback – The callback to call when a change occurs.
dataset_id – Optional dataset ID to filter changes by.
- Returns:
The subscription id.
- unsubscribe(subscription_id: str)#
Unsubscribe from a specific store.
- Parameters:
subscription_id – The subscription id to unsubscribe from.
- unsubscribe_all(store_name: str)#
Unsubscribe from all changes in a store.
- Parameters:
store_name (str) – the name of the store to unsubscribe from
- async start(dedicated_event_loop: AbstractEventLoop) None#
Start watching the collection for changes using change streams or polling.
- async notify(store_name: str, message_data: MessageData) None#
Notify local subscribers and remote listeners of a change. Handles exceptions gracefully to prevent failures when clients disconnect.
- Parameters:
store_name – The name of the store that changed
message_data – The message data to notify subscribers with
- async stop() None#
Signal stop watching the collection for changes. Assume this is called from thread safe context.
- class fiftyone.operators.store.notification_service.MongoChangeStreamNotificationServiceLifecycleManager(notification_service: MongoChangeStreamNotificationService)#
Bases:
objectMethods:
Create a dedicated event loop in a new thread and start the notification service.
stop()- start_in_dedicated_thread() None#
Create a dedicated event loop in a new thread and start the notification service.
- async stop() None#
- fiftyone.operators.store.notification_service.is_notification_service_disabled() bool#
Check if the notification service is disabled.