Circuit Breaker#
Important
The synchronous version is generated from the async version
Async#
- class purgatory.service._async.circuitbreaker.AsyncCircuitBreaker(context: purgatory.domain.model.Context, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork, messagebus: purgatory.service._async.messagebus.AsyncMessageRegistry)#
- class purgatory.service._async.circuitbreaker.PublicEvent(messagebus: purgatory.service._async.messagebus.AsyncMessageRegistry, hook: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None])#
- remove_listeners(messagebus: purgatory.service._async.messagebus.AsyncMessageRegistry) None #
- async cb_created(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None #
- async cb_state_changed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None #
- async cb_failed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None #
- async cb_recovered(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork) None #
- class purgatory.service._async.circuitbreaker.AsyncCircuitBreakerFactory(default_threshold: int = 5, default_ttl: float = 30, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None, uow: Optional[purgatory.service._async.unit_of_work.AsyncAbstractUnitOfWork] = None)#
- async initialize() None #
- add_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None #
- remove_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None #
- async get_breaker(circuit: str, threshold: Optional[int] = None, ttl: Optional[float] = None, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None) purgatory.service._async.circuitbreaker.AsyncCircuitBreaker #
Sync#
- class purgatory.service._sync.circuitbreaker.SyncCircuitBreaker(context: purgatory.domain.model.Context, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork, messagebus: purgatory.service._sync.messagebus.SyncMessageRegistry)#
- class purgatory.service._sync.circuitbreaker.PublicEvent(messagebus: purgatory.service._sync.messagebus.SyncMessageRegistry, hook: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None])#
- remove_listeners(messagebus: purgatory.service._sync.messagebus.SyncMessageRegistry) None #
- cb_created(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None #
- cb_state_changed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None #
- cb_failed(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None #
- cb_recovered(event: purgatory.domain.messages.events.CircuitBreakerCreated, uow: purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork) None #
- class purgatory.service._sync.circuitbreaker.SyncCircuitBreakerFactory(default_threshold: int = 5, default_ttl: float = 30, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None, uow: Optional[purgatory.service._sync.unit_of_work.SyncAbstractUnitOfWork] = None)#
- initialize() None #
- add_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None #
- remove_listener(listener: Callable[[str, typing_extensions.Literal[circuit_breaker_created, state_changed, failed, recovered], purgatory.domain.messages.base.Event], None]) None #
- get_breaker(circuit: str, threshold: Optional[int] = None, ttl: Optional[float] = None, exclude: Optional[List[Union[Type[BaseException], Tuple[Type[BaseException], Callable[[...], bool]]]]] = None) purgatory.service._sync.circuitbreaker.SyncCircuitBreaker #