fiftyone.operators.store.notification_service#

Notification service for ExecutionStore using MongoDB Change Streams.

Copyright 2017-2025, Voxel51, Inc.

Classes:

ChangeStreamNotificationService()

Abstract base class for change stream notification services.

MongoChangeStreamNotificationService(...[, ...])

MongoChangeStreamNotificationServiceLifecycleManager(...)

Functions:

is_notification_service_disabled()

Check if the notification service is disabled.

class fiftyone.operators.store.notification_service.ChangeStreamNotificationService#

Bases: ABC

Abstract base class for change stream notification services.

Methods:

subscribe(store_name, callback[, dataset_id])

Register a local subscriber for a specific store.

unsubscribe(subscription_id)

Unsubscribe local subscribers from a specific store.

unsubscribe_all(store_name)

Unsubscribe from all changes in a store.

notify(store_name, message_data)

Notify local subscribers and remote listeners of a change.

start()

Start watching for database changes.

stop()

Stop watching for database changes.

abstract subscribe(store_name: str, callback: Callable[[str], None], dataset_id: str | None = None) str#

Register a local subscriber for a specific store.

Parameters:
  • store_name – The name of the store to subscribe to.

  • callback – The callback to call when a change occurs.

  • dataset_id – Optional dataset ID to filter changes by.

Returns:

The subscription id.

abstract unsubscribe(subscription_id: str)#

Unsubscribe local subscribers from a specific store.

Parameters:

subscription_id – The subscription id to unsubscribe from.

abstract unsubscribe_all(store_name: str)#

Unsubscribe from all changes in a store.

Parameters:

store_name (str) – the name of the store to unsubscribe from

abstract notify(store_name: str, message_data: MessageData) None#

Notify local subscribers and remote listeners of a change.

Parameters:
  • store_name – The name of the store that changed.

  • message – The message to notify subscribers with.

abstract async start() None#

Start watching for database changes.

abstract async stop() None#

Stop watching for database changes.

class fiftyone.operators.store.notification_service.MongoChangeStreamNotificationService(collection_name: str, remote_notifier: RemoteNotifier = None, registry: LocalSubscriptionRegistry = None)#

Bases: ChangeStreamNotificationService

Methods:

subscribe(store_name, callback[, dataset_id])

Register a local subscriber for a specific store.

unsubscribe(subscription_id)

Unsubscribe from a specific store.

unsubscribe_all(store_name)

Unsubscribe from all changes in a store.

start(dedicated_event_loop)

Start watching the collection for changes using change streams or polling.

notify(store_name, message_data)

Notify local subscribers and remote listeners of a change.

stop()

Signal stop watching the collection for changes.

subscribe(store_name: str, callback: Callable[[str], None], dataset_id: str | None = None) str#

Register a local subscriber for a specific store.

Parameters:
  • store_name – The name of the store to subscribe to.

  • callback – The callback to call when a change occurs.

  • dataset_id – Optional dataset ID to filter changes by.

Returns:

The subscription id.

unsubscribe(subscription_id: str)#

Unsubscribe from a specific store.

Parameters:

subscription_id – The subscription id to unsubscribe from.

unsubscribe_all(store_name: str)#

Unsubscribe from all changes in a store.

Parameters:

store_name (str) – the name of the store to unsubscribe from

async start(dedicated_event_loop: AbstractEventLoop) None#

Start watching the collection for changes using change streams or polling.

async notify(store_name: str, message_data: MessageData) None#

Notify local subscribers and remote listeners of a change. Handles exceptions gracefully to prevent failures when clients disconnect.

Parameters:
  • store_name – The name of the store that changed

  • message_data – The message data to notify subscribers with

async stop() None#

Signal stop watching the collection for changes. Assume this is called from thread safe context.

class fiftyone.operators.store.notification_service.MongoChangeStreamNotificationServiceLifecycleManager(notification_service: MongoChangeStreamNotificationService)#

Bases: object

Methods:

start_in_dedicated_thread()

Create a dedicated event loop in a new thread and start the notification service.

stop()

start_in_dedicated_thread() None#

Create a dedicated event loop in a new thread and start the notification service.

async stop() None#
fiftyone.operators.store.notification_service.is_notification_service_disabled() bool#

Check if the notification service is disabled.