providers
Attributes
Classes
Functions
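| cleanup() | Cleanup function for graceful shutdown |
| get_providers() | Get a copy of current providers (thread-safe and immutable) |
| get_provider(provider_name) | Get a specific provider by name (thread-safe) |
| authenticate_provider(provider) | Create authentication object for a provider |
| check_process_availability(provider, process_id) | Check if a process is available and not excluded |
| check_result_storage(provider, process_id) | Get the result storage type for a process |
| get_process_config(provider, process_id) | Get complete process configuration |
| list_providers() | Get list of all provider names |
| list_processes(provider) | Get list of all process IDs for a provider |
| is_healthy() | Check if the provider loader is healthy |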
Module Contents
- providers.logger
- providers.PROVIDERS: ump.api.models.providers_config.ModelServers
- providers.PROVIDERS_LOCK
- providers.RELOAD_TIMER: threading.Timer | None = None
- providers.DEBOUNCE_DELAY = 0.5
- class providers.ProviderLoader
Bases: watchdog.events.FileSystemEventHandler
- last_reload = 0
- reload_lock
- on_any_event(event)
- _debounced_reload()
Debounce rapid file changes to avoid reload storms
- load_providers()
- _atomic_update(new_providers: ump.api.models.providers_config.ModelServers)
Atomically update providers with rollback capability
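The debounce and atomic-update behaviour described for ProviderLoader can be illustrated with a minimal, standalone sketch. It is not the module's actual implementation: `DebouncedReloader`, `ProviderStore`, and the `validate` callback are hypothetical names introduced here, and plain dictionaries stand in for `ModelServers`. The only assumption taken from this page is that a `threading.Timer` with a delay of `DEBOUNCE_DELAY` drives the reload.

```python
import threading

DEBOUNCE_DELAY = 0.5  # seconds; same value as the documented module constant


class DebouncedReloader:
    """Hypothetical stand-in for the debounce part of ProviderLoader."""

    def __init__(self, reload_fn, delay=DEBOUNCE_DELAY):
        self._reload_fn = reload_fn
        self._delay = delay
        self._timer = None
        self._lock = threading.Lock()

    def on_any_event(self, event=None):
        # Cancel any pending timer and start a fresh one, so a burst of
        # events results in a single reload once the burst goes quiet.
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(self._delay, self._reload_fn)
            self._timer.daemon = True
            self._timer.start()


class ProviderStore:
    """Hypothetical holder for the active configuration."""

    def __init__(self, initial):
        self._lock = threading.RLock()
        self._current = initial

    def atomic_update(self, new_config, validate):
        # Swap in the new configuration under the lock; if validation
        # fails, restore the previous one and re-raise the error.
        with self._lock:
            previous = self._current
            self._current = new_config
            try:
                validate(new_config)
            except Exception:
                self._current = previous
                raise

    def current(self):
        with self._lock:
            return self._current
```

In this sketch a failed validation leaves readers with the last known-good configuration, which is one plausible reading of "rollback capability".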
- providers.provider_loader
- providers.observer
- providers.cleanup()
Cleanup function for graceful shutdown
- providers.get_providers() -> ump.api.models.providers_config.ModelServers
Get a copy of current providers (thread-safe and immutable)
- providers.get_provider(provider_name: str) -> ump.api.models.providers_config.ProviderConfig | None
Get a specific provider by name (thread-safe)
- providers.authenticate_provider(provider: ump.api.models.providers_config.ProviderConfig)
Create authentication object for a provider
- providers.check_process_availability(provider: str, process_id: str) -> bool
Check if a process is available and not excluded
- providers.check_result_storage(provider: str, process_id: str) -> str | None
Get the result storage type for a process
- providers.get_process_config(provider: str, process_id: str) -> ump.api.models.providers_config.ProcessConfig
Get complete process configuration
- providers.list_providers() -> list[str]
Get list of all provider names
- providers.list_processes(provider: str) -> list[str]
Get list of all process IDs for a provider
- providers.is_healthy() -> bool
Check if the provider loader is healthy
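The query functions listed above can be combined to build a per-provider summary. The helper below is a hypothetical sketch, not part of the module: `summarize_provider` is an invented name, and it receives the module as an argument so the example runs without the package installed. It relies only on the signatures documented on this page.

```python
def summarize_provider(providers_module, provider_name):
    """Map each available process ID to its result storage type.

    `providers_module` is expected to expose the documented functions
    is_healthy, list_providers, list_processes,
    check_process_availability, and check_result_storage.
    """
    if not providers_module.is_healthy():
        raise RuntimeError("provider loader is not healthy")
    if provider_name not in providers_module.list_providers():
        return {}
    summary = {}
    for process_id in providers_module.list_processes(provider_name):
        # Skip processes that are unavailable or excluded.
        if not providers_module.check_process_availability(provider_name, process_id):
            continue
        summary[process_id] = providers_module.check_result_storage(provider_name, process_id)
    return summary
```

With the real package available, the imported `providers` module itself could be passed as the first argument.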