Repositories#

Important

The synchronous version is generated from the async version.

Async#

exception purgatory.service._async.repository.ConfigurationError#
class purgatory.service._async.repository.AsyncAbstractRepository#
messages: List[purgatory.domain.messages.base.Message]#
async initialize() None#

Override to initialize the repository asynchronously

abstract async get(name: str) Optional[purgatory.domain.model.Context]#

Load breakers from the repository.

abstract async register(context: purgatory.domain.model.Context) None#

Add a circuit breaker into the repository.

abstract async update_state(name: str, state: str, opened_at: Optional[float]) None#

Save the new state of the circuit breaker into the repository.

abstract async inc_failures(name: str, failure_count: int) None#

Increment the number of failures in the repository.

abstract async reset_failure(name: str) None#

Reset the number of failures in the repository.

class purgatory.service._async.repository.AsyncInMemoryRepository#
messages: List[purgatory.domain.messages.base.Message]#
async get(name: str) Optional[purgatory.domain.model.Context]#

Load a circuit breaker from the repository.

async register(context: purgatory.domain.model.Context) None#

Add a circuit breaker into the repository.

async update_state(name: str, state: str, opened_at: Optional[float]) None#

Because the get method returns the object directly, there is nothing to do here.

async inc_failures(name: str, failure_count: int) None#

Because the get method returns the object directly, there is nothing to do here.

async reset_failure(name: str) None#

Reset the number of failures in the repository.

class purgatory.service._async.repository.AsyncRedisRepository(url: str)#
messages: List[purgatory.domain.messages.base.Message]#
async initialize() None#

Override to initialize the repository asynchronously

async get(name: str) Optional[purgatory.domain.model.Context]#

Load a circuit breaker from the repository.

async register(context: purgatory.domain.model.Context) None#

Add a circuit breaker into the repository.

async update_state(name: str, state: str, opened_at: Optional[float]) None#

Store the new state in the repository.

async inc_failures(name: str, failure_count: int) None#

Increment the number of failures in the repository.

async reset_failure(name: str) None#

Reset the number of failures in the repository.

Sync#

exception purgatory.service._sync.repository.ConfigurationError#
class purgatory.service._sync.repository.SyncAbstractRepository#
messages: List[purgatory.domain.messages.base.Message]#
initialize() None#

Override to initialize the repository.

abstract get(name: str) Optional[purgatory.domain.model.Context]#

Load breakers from the repository.

abstract register(context: purgatory.domain.model.Context) None#

Add a circuit breaker into the repository.

abstract update_state(name: str, state: str, opened_at: Optional[float]) None#

Save the new state of the circuit breaker into the repository.

abstract inc_failures(name: str, failure_count: int) None#

Increment the number of failures in the repository.

abstract reset_failure(name: str) None#

Reset the number of failures in the repository.

class purgatory.service._sync.repository.SyncInMemoryRepository#
messages: List[purgatory.domain.messages.base.Message]#
get(name: str) Optional[purgatory.domain.model.Context]#

Load a circuit breaker from the repository.

register(context: purgatory.domain.model.Context) None#

Add a circuit breaker into the repository.

update_state(name: str, state: str, opened_at: Optional[float]) None#

Because the get method returns the object directly, there is nothing to do here.

inc_failures(name: str, failure_count: int) None#

Because the get method returns the object directly, there is nothing to do here.

reset_failure(name: str) None#

Reset the number of failures in the repository.

class purgatory.service._sync.repository.SyncRedisRepository(url: str)#
messages: List[purgatory.domain.messages.base.Message]#
initialize() None#

Override to initialize the repository.

get(name: str) Optional[purgatory.domain.model.Context]#

Load a circuit breaker from the repository.

register(context: purgatory.domain.model.Context) None#

Add a circuit breaker into the repository.

update_state(name: str, state: str, opened_at: Optional[float]) None#

Store the new state in the repository.

inc_failures(name: str, failure_count: int) None#

Increment the number of failures in the repository.

reset_failure(name: str) None#

Reset the number of failures in the repository.