users now see their position in the queue update

in real time
This commit is contained in:
Karma Riuk
2025-05-19 11:55:56 +02:00
parent 694c134a54
commit 1b805ebb63
5 changed files with 103 additions and 28 deletions

View File

@ -134,6 +134,17 @@ socket.on("progress", (data) => {
socket.on("started-processing", () => { socket.on("started-processing", () => {
setProgress(0); setProgress(0);
if (queue_position_interval != null) {
clearTimeout(queue_position_interval);
queue_position_interval = null;
}
});
socket.on("changed-subject", () => {
console.log("changed-subject");
commentResultsContainer.classList.add("hidden");
refinementResultsContainer.classList.add("hidden");
progressContainer.classList.add("hidden");
}); });
socket.on("complete", (data) => { socket.on("complete", (data) => {
@ -157,6 +168,22 @@ socket.on("successful-upload", () => {
uploadStatusEl.textContent = "Upload succeeded!"; uploadStatusEl.textContent = "Upload succeeded!";
}); });
socket.on("queue_position", (data) => {
console.log(`got answer for queue position with ${data}`);
if (data.status == "waiting")
statusStatusEl.textContent = `Currently waiting, position in queue ${data.position}`;
else {
if (queue_position_interval != null) {
console.log("clearing interval");
clearTimeout(queue_position_interval);
queue_position_interval = null;
}
statusStatusEl.textContent = data.status;
}
});
let queue_position_interval = null;
document.getElementById("request-status").onclick = async () => { document.getElementById("request-status").onclick = async () => {
if (!uuid.reportValidity()) return; if (!uuid.reportValidity()) return;
const res = await fetch(`/answers/status/${uuid.value}`, { const res = await fetch(`/answers/status/${uuid.value}`, {
@ -184,6 +211,10 @@ document.getElementById("request-status").onclick = async () => {
else console.error(`Unknown type ${data.type}`); else console.error(`Unknown type ${data.type}`);
} else if (json.status == "waiting") { } else if (json.status == "waiting") {
statusStatusEl.textContent = `Currently waiting, position in queue ${json.queue_position}`; statusStatusEl.textContent = `Currently waiting, position in queue ${json.queue_position}`;
queue_position_interval = setInterval(() => {
socket.emit("get_queue_position", { uuid: uuid.value });
console.log("asking for queue posittin");
}, 3000);
} }
}; };

View File

@ -3,7 +3,7 @@ from typing import Callable
from flask import Blueprint, request, jsonify, current_app, url_for from flask import Blueprint, request, jsonify, current_app, url_for
from utils.errors import InvalidJsonFormatError from utils.errors import InvalidJsonFormatError
from utils.process_data import evaluate_comments, evaluate_refinement from utils.process_data import evaluate_comments, evaluate_refinement
from utils.observer import SocketObserver, Status, Subject, request2status from utils.observer import SocketObserver, Status, Subject, uuid2subject
import functools import functools
import json, uuid import json, uuid
@ -64,7 +64,7 @@ def handler(type_: str, validate_json: Callable, evaluate_submission: Callable):
process_id = str(uuid.uuid4()) process_id = str(uuid.uuid4())
subject = Subject(process_id, type_, evaluate_submission) subject = Subject(process_id, type_, evaluate_submission)
request2status[process_id] = subject uuid2subject[process_id] = subject
QUEUE_MANAGER.submit(subject, validated) QUEUE_MANAGER.submit(subject, validated)
url = url_for(f".status", id=process_id, _external=True) url = url_for(f".status", id=process_id, _external=True)
@ -91,37 +91,49 @@ def submit_comments(task):
@router.route('/status/<id>') @router.route('/status/<id>')
def status(id): def status(id):
if id not in request2status: if id not in uuid2subject:
return jsonify({"error": "Id doens't exist", "message": f"Id {id} doesn't exist"}), 404 return jsonify({"error": "Id doens't exist", "message": f"Id {id} doesn't exist"}), 404
subject = request2status[id] subject = uuid2subject[id]
if subject.status == Status.COMPLETE: if subject.status == Status.COMPLETE:
return jsonify({"status": "complete", "type": subject.type, "results": subject.results}) return jsonify({"status": "complete", "type": subject.type, "results": subject.results})
elif subject.status == Status.PROCESSING:
socketio = current_app.extensions['socketio']
sid = request.headers.get('X-Socket-Id')
socket_emit = functools.partial(socketio.emit, room=sid)
request2status[id] = subject socketio = current_app.extensions['socketio']
sid = request.headers.get('X-Socket-Id')
socket_emit = functools.partial(socketio.emit, to=sid)
if sid and sid in SocketObserver.socket2obs:
obs = SocketObserver.socket2obs[sid]
subject_watched_by_socket = Subject.obs2subject[obs]
if subject == subject_watched_by_socket:
return (
jsonify(
{
"error": "Already listening",
"message": f"You are already seeing the real-time progress of that request, please don't spam",
}
),
400,
)
subject_watched_by_socket.unregisterObserver(obs)
SocketObserver.socket2obs.pop(sid)
socket_emit("changing-subject")
if subject.status == Status.PROCESSING:
if sid: if sid:
if sid in SocketObserver.socket2obs:
return (
jsonify(
{
"error": "Already listening",
"message": f"You are already seeing the real-time progress of that request, please don't spam",
}
),
400,
)
obs = SocketObserver(sid, socket_emit) obs = SocketObserver(sid, socket_emit)
obs.updatePercentage(subject.percent) obs.updatePercentage(subject.percent)
subject.registerObserver(obs) subject.registerObserver(obs)
return jsonify({"status": "processing", "percent": subject.percent}) return jsonify({"status": "processing", "percent": subject.percent})
elif subject.status == Status.WAITING:
if subject.status == Status.WAITING:
if sid:
obs = SocketObserver(sid, socket_emit)
subject.registerObserver(obs)
return jsonify({"status": "waiting", "queue_position": QUEUE_MANAGER.get_position(id)}) return jsonify({"status": "waiting", "queue_position": QUEUE_MANAGER.get_position(id)})
elif subject.status == Status.CREATED:
if subject.status == Status.CREATED:
return jsonify({"status": "created"}) return jsonify({"status": "created"})
raise Exception("This code should be unreachable") raise Exception("This code should be unreachable")

View File

@ -1,9 +1,10 @@
# server.py # server.py
from flask import Flask from flask import Flask, request
from flask_cors import CORS from flask_cors import CORS
from flask_socketio import SocketIO from flask_socketio import SocketIO
from utils.observer import Status, uuid2subject
from routes.index import router as index_router from routes.index import router as index_router
from routes.answers import router as answers_router from routes.answers import QUEUE_MANAGER, router as answers_router
from routes.datasets import router as datasets_router from routes.datasets import router as datasets_router
from werkzeug.exceptions import HTTPException from werkzeug.exceptions import HTTPException
import os import os
@ -41,9 +42,22 @@ def init_socketio(app):
socketio = SocketIO(app, cors_allowed_origins='*') socketio = SocketIO(app, cors_allowed_origins='*')
@socketio.on('connect') @socketio.on('connect')
def _(): def on_connect():
print('Websocket client connected') print('Websocket client connected')
@socketio.on('get_queue_position')
def on_get_queue_position(data):
sid = request.sid
subject_id = data["uuid"]
subject = uuid2subject[subject_id]
if subject.status == Status.WAITING:
return socketio.emit(
'queue_position',
{"status": "waiting", "position": QUEUE_MANAGER.get_position(subject_id)},
to=sid,
)
return socketio.emit('queue_position', {"status": subject.status.value}, to=sid)
return socketio return socketio

View File

@ -12,6 +12,10 @@ class Status(Enum):
class Observer(ABC): class Observer(ABC):
@abstractmethod
def updateStarted(self):
...
@abstractmethod @abstractmethod
def updatePercentage(self, percentage: float): def updatePercentage(self, percentage: float):
... ...
@ -24,12 +28,15 @@ class Observer(ABC):
class SocketObserver(Observer): class SocketObserver(Observer):
socket2obs: dict[str, "SocketObserver"] = {} socket2obs: dict[str, "SocketObserver"] = {}
def __init__(self, sid: str, socket_emit: Callable[[str, Any], None]) -> None: def __init__(self, sid: str, socket_emit: Callable) -> None:
super().__init__() super().__init__()
self.sid = sid self.sid = sid
self.socket_emit = socket_emit self.socket_emit = socket_emit
SocketObserver.socket2obs[self.sid] = self SocketObserver.socket2obs[self.sid] = self
def updateStarted(self):
self.socket_emit("started-processing")
def updatePercentage(self, percentage: float): def updatePercentage(self, percentage: float):
self.socket_emit("progress", {'percent': percentage}) self.socket_emit("progress", {'percent': percentage})
@ -39,6 +46,8 @@ class SocketObserver(Observer):
class Subject: class Subject:
obs2subject: dict[Observer, "Subject"] = {}
def __init__(self, id: str, type_: str, task: Callable) -> None: def __init__(self, id: str, type_: str, task: Callable) -> None:
self.id = id self.id = id
self.type = type_ self.type = type_
@ -50,9 +59,16 @@ class Subject:
def registerObserver(self, observer: Observer) -> None: def registerObserver(self, observer: Observer) -> None:
self.observers.add(observer) self.observers.add(observer)
Subject.obs2subject[observer] = self
def unregisterObserver(self, observer: Observer): def unregisterObserver(self, observer: Observer):
self.observers.remove(observer) self.observers.remove(observer)
Subject.obs2subject.pop(observer)
def notifyStarted(self):
self.status = Status.PROCESSING
for observer in self.observers:
observer.updateStarted
def notifyPercentage(self, percentage: float): def notifyPercentage(self, percentage: float):
self.percent = percentage self.percent = percentage
@ -63,8 +79,10 @@ class Subject:
self.status = Status.COMPLETE self.status = Status.COMPLETE
for observer in self.observers: for observer in self.observers:
observer.updateComplete({"type": self.type, "results": results}) observer.updateComplete({"type": self.type, "results": results})
Subject.obs2subject.pop(observer)
self.observers.clear()
self.results = results self.results = results
# TODO: maybe save results to disk here? # TODO: maybe save results to disk here?
request2status: dict[str, Subject] = {} uuid2subject: dict[str, Subject] = {}

View File

@ -36,7 +36,7 @@ class QueueManager:
self.wait_queue.remove(subject.id) self.wait_queue.remove(subject.id)
except ValueError: except ValueError:
pass pass
subject.status = Status.PROCESSING subject.notifyStarted()
# Execute the user-defined task synchronously in this worker thread # Execute the user-defined task synchronously in this worker thread
subject.task( subject.task(
*args, *args,