mirror of
https://github.com/karma-riuk/crab-webapp.git
synced 2025-07-05 14:18:12 +02:00
created the observers and handling them
This commit is contained in:
@ -1 +1,69 @@
|
||||
request2status = {}
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Callable, Optional, Set, Any
|
||||
|
||||
|
||||
class Status(Enum):
|
||||
CREATED = "created"
|
||||
PROCESSING = "processing"
|
||||
COMPLETE = "complete"
|
||||
|
||||
|
||||
class Observer(ABC):
|
||||
@abstractmethod
|
||||
def updatePercentage(self, percentage: float):
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def updateComplete(self, results: dict):
|
||||
...
|
||||
|
||||
|
||||
class SocketObserver(Observer):
|
||||
def __init__(self, socket_emit: Callable[[str, Any], None]) -> None:
|
||||
super().__init__()
|
||||
self.socket_emit = socket_emit
|
||||
|
||||
def updatePercentage(self, percentage: float):
|
||||
self.socket_emit("progress", {'percent': percentage})
|
||||
|
||||
def updateComplete(self, results: dict):
|
||||
self.socket_emit("complete", results)
|
||||
|
||||
|
||||
class Subject:
|
||||
# TODO: maybe have a process or thread pool here to implement the queue
|
||||
def __init__(self, id: str, task: Callable) -> None:
|
||||
self.id = id
|
||||
self.observers: Set[Observer] = set()
|
||||
self.status: Status = Status.CREATED
|
||||
self.results: Optional[dict] = None
|
||||
self.task = task
|
||||
self.percent: float = -1
|
||||
|
||||
def registerObserver(self, observer: Observer) -> None:
|
||||
self.observers.add(observer)
|
||||
|
||||
def unregisterObserver(self, observer: Observer):
|
||||
self.observers.remove(observer)
|
||||
|
||||
def notifyPercentage(self, percentage: float):
|
||||
self.percent = percentage
|
||||
for observer in self.observers:
|
||||
observer.updatePercentage(percentage)
|
||||
|
||||
def notifyComplete(self, results: dict):
|
||||
self.status = Status.COMPLETE
|
||||
for observer in self.observers:
|
||||
observer.updateComplete(results)
|
||||
self.results = results
|
||||
# TODO: maybe save results to disk here?
|
||||
|
||||
def launch_task(self, *args, **kwargs):
|
||||
self.status = Status.PROCESSING
|
||||
self.task(
|
||||
*args, **kwargs, percent_cb=self.notifyPercentage, complete_cb=self.notifyComplete
|
||||
)
|
||||
|
||||
|
||||
request2status: dict[str, Subject] = {}
|
||||
|
Reference in New Issue
Block a user