Circuit Breaker#

Important

The synchronous version is generated from the async version

Async#

class purgatory.service._async.circuitbreaker.AsyncCircuitBreaker(context: purgatory.domain.model.Context, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork, messagebus: purgatory.service._async.messagebus.AsyncMessageRegistry)#
class purgatory.service._async.circuitbreaker.PublicEvent(messagebus: purgatory.service._async.messagebus.AsyncMessageRegistry, hook: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None])#
remove_listeners(messagebus: purgatory.service._async.messagebus.AsyncMessageRegistry) None#
async cb_created(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None#
async cb_state_changed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None#
async cb_failed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None#
async cb_recovered(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None#
class purgatory.service._async.circuitbreaker.AsyncCircuitBreakerFactory(default_threshold: int = 5, default_ttl: float = 30, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None, uow: Optional[purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork] = None)#
async initialize() None#
add_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None#
remove_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None#
async get_breaker(circuit: str, threshold: Optional[int] = None, ttl: Optional[float] = None, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None) purgatory.service._async.circuitbreaker.AsyncCircuitBreaker#

Sync#

class purgatory.service._sync.circuitbreaker.SyncCircuitBreaker(context: purgatory.domain.model.Context, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork, messagebus: purgatory.service._sync.messagebus.SyncMessageRegistry)#
class purgatory.service._sync.circuitbreaker.PublicEvent(messagebus: purgatory.service._sync.messagebus.SyncMessageRegistry, hook: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None])#
remove_listeners(messagebus: purgatory.service._sync.messagebus.SyncMessageRegistry) None#
cb_created(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None#
cb_state_changed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None#
cb_failed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None#
cb_recovered(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None#
class purgatory.service._sync.circuitbreaker.SyncCircuitBreakerFactory(default_threshold: int = 5, default_ttl: float = 30, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None, uow: Optional[purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork] = None)#
initialize() None#
add_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None#
remove_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None#
get_breaker(circuit: str, threshold: Optional[int] = None, ttl: Optional[float] = None, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None) purgatory.service._sync.circuitbreaker.SyncCircuitBreaker#