From 1b805ebb6350cbe239b69c2a05db229d4af3b4f9 Mon Sep 17 00:00:00 2001 From: Karma Riuk Date: Mon, 19 May 2025 11:55:56 +0200 Subject: [PATCH] users now see their position in the queue update in real time --- public/js/index.js | 31 +++++++++++++++++++++ src/routes/answers.py | 56 +++++++++++++++++++++++--------------- src/server.py | 20 ++++++++++++-- src/utils/observer.py | 22 +++++++++++++-- src/utils/queue_manager.py | 2 +- 5 files changed, 103 insertions(+), 28 deletions(-) diff --git a/public/js/index.js b/public/js/index.js index 88851a0..222df9e 100644 --- a/public/js/index.js +++ b/public/js/index.js @@ -134,6 +134,17 @@ socket.on("progress", (data) => { socket.on("started-processing", () => { setProgress(0); + if (queue_position_interval != null) { + clearTimeout(queue_position_interval); + queue_position_interval = null; + } +}); + +socket.on("changed-subject", () => { + console.log("changed-subject"); + commentResultsContainer.classList.add("hidden"); + refinementResultsContainer.classList.add("hidden"); + progressContainer.classList.add("hidden"); }); socket.on("complete", (data) => { @@ -157,6 +168,22 @@ socket.on("successful-upload", () => { uploadStatusEl.textContent = "Upload succeeded!"; }); +socket.on("queue_position", (data) => { + console.log(`got answer for queue position with ${data}`); + if (data.status == "waiting") + statusStatusEl.textContent = `Currently waiting, position in queue ${data.position}`; + else { + if (queue_position_interval != null) { + console.log("clearing interval"); + clearTimeout(queue_position_interval); + queue_position_interval = null; + } + statusStatusEl.textContent = data.status; + } +}); + +let queue_position_interval = null; + document.getElementById("request-status").onclick = async () => { if (!uuid.reportValidity()) return; const res = await fetch(`/answers/status/${uuid.value}`, { @@ -184,6 +211,10 @@ document.getElementById("request-status").onclick = async () => { else console.error(`Unknown type ${data.type}`); } else if (json.status == "waiting") { statusStatusEl.textContent = `Currently waiting, position in queue ${json.queue_position}`; + queue_position_interval = setInterval(() => { + socket.emit("get_queue_position", { uuid: uuid.value }); + console.log("asking for queue posittin"); + }, 3000); } }; diff --git a/src/routes/answers.py b/src/routes/answers.py index ac303ae..08cc364 100644 --- a/src/routes/answers.py +++ b/src/routes/answers.py @@ -3,7 +3,7 @@ from typing import Callable from flask import Blueprint, request, jsonify, current_app, url_for from utils.errors import InvalidJsonFormatError from utils.process_data import evaluate_comments, evaluate_refinement -from utils.observer import SocketObserver, Status, Subject, request2status +from utils.observer import SocketObserver, Status, Subject, uuid2subject import functools import json, uuid @@ -64,7 +64,7 @@ def handler(type_: str, validate_json: Callable, evaluate_submission: Callable): process_id = str(uuid.uuid4()) subject = Subject(process_id, type_, evaluate_submission) - request2status[process_id] = subject + uuid2subject[process_id] = subject QUEUE_MANAGER.submit(subject, validated) url = url_for(f".status", id=process_id, _external=True) @@ -91,37 +91,49 @@ def submit_comments(task): @router.route('/status/') def status(id): - if id not in request2status: + if id not in uuid2subject: return jsonify({"error": "Id doens't exist", "message": f"Id {id} doesn't exist"}), 404 - subject = request2status[id] + subject = uuid2subject[id] if subject.status == Status.COMPLETE: return jsonify({"status": "complete", "type": subject.type, "results": subject.results}) - elif subject.status == Status.PROCESSING: - socketio = current_app.extensions['socketio'] - sid = request.headers.get('X-Socket-Id') - socket_emit = functools.partial(socketio.emit, room=sid) - request2status[id] = subject + socketio = current_app.extensions['socketio'] + sid = request.headers.get('X-Socket-Id') + socket_emit = functools.partial(socketio.emit, to=sid) + + if sid and sid in SocketObserver.socket2obs: + obs = SocketObserver.socket2obs[sid] + subject_watched_by_socket = Subject.obs2subject[obs] + if subject == subject_watched_by_socket: + return ( + jsonify( + { + "error": "Already listening", + "message": f"You are already seeing the real-time progress of that request, please don't spam", + } + ), + 400, + ) + + subject_watched_by_socket.unregisterObserver(obs) + SocketObserver.socket2obs.pop(sid) + socket_emit("changing-subject") + + if subject.status == Status.PROCESSING: if sid: - if sid in SocketObserver.socket2obs: - return ( - jsonify( - { - "error": "Already listening", - "message": f"You are already seeing the real-time progress of that request, please don't spam", - } - ), - 400, - ) - obs = SocketObserver(sid, socket_emit) obs.updatePercentage(subject.percent) subject.registerObserver(obs) return jsonify({"status": "processing", "percent": subject.percent}) - elif subject.status == Status.WAITING: + + if subject.status == Status.WAITING: + if sid: + obs = SocketObserver(sid, socket_emit) + subject.registerObserver(obs) return jsonify({"status": "waiting", "queue_position": QUEUE_MANAGER.get_position(id)}) - elif subject.status == Status.CREATED: + + if subject.status == Status.CREATED: return jsonify({"status": "created"}) raise Exception("This code should be unreachable") diff --git a/src/server.py b/src/server.py index a040a25..1bfcd46 100644 --- a/src/server.py +++ b/src/server.py @@ -1,9 +1,10 @@ # server.py -from flask import Flask +from flask import Flask, request from flask_cors import CORS from flask_socketio import SocketIO +from utils.observer import Status, uuid2subject from routes.index import router as index_router -from routes.answers import router as answers_router +from routes.answers import QUEUE_MANAGER, router as answers_router from routes.datasets import router as datasets_router from werkzeug.exceptions import HTTPException import os @@ -41,9 +42,22 @@ def init_socketio(app): socketio = SocketIO(app, cors_allowed_origins='*') @socketio.on('connect') - def _(): + def on_connect(): print('Websocket client connected') + @socketio.on('get_queue_position') + def on_get_queue_position(data): + sid = request.sid + subject_id = data["uuid"] + subject = uuid2subject[subject_id] + if subject.status == Status.WAITING: + return socketio.emit( + 'queue_position', + {"status": "waiting", "position": QUEUE_MANAGER.get_position(subject_id)}, + to=sid, + ) + return socketio.emit('queue_position', {"status": subject.status.value}, to=sid) + return socketio diff --git a/src/utils/observer.py b/src/utils/observer.py index 4d7e7e2..ace4390 100644 --- a/src/utils/observer.py +++ b/src/utils/observer.py @@ -12,6 +12,10 @@ class Status(Enum): class Observer(ABC): + @abstractmethod + def updateStarted(self): + ... + @abstractmethod def updatePercentage(self, percentage: float): ... @@ -24,12 +28,15 @@ class Observer(ABC): class SocketObserver(Observer): socket2obs: dict[str, "SocketObserver"] = {} - def __init__(self, sid: str, socket_emit: Callable[[str, Any], None]) -> None: + def __init__(self, sid: str, socket_emit: Callable) -> None: super().__init__() self.sid = sid self.socket_emit = socket_emit SocketObserver.socket2obs[self.sid] = self + def updateStarted(self): + self.socket_emit("started-processing") + def updatePercentage(self, percentage: float): self.socket_emit("progress", {'percent': percentage}) @@ -39,6 +46,8 @@ class SocketObserver(Observer): class Subject: + obs2subject: dict[Observer, "Subject"] = {} + def __init__(self, id: str, type_: str, task: Callable) -> None: self.id = id self.type = type_ @@ -50,9 +59,16 @@ class Subject: def registerObserver(self, observer: Observer) -> None: self.observers.add(observer) + Subject.obs2subject[observer] = self def unregisterObserver(self, observer: Observer): self.observers.remove(observer) + Subject.obs2subject.pop(observer) + + def notifyStarted(self): + self.status = Status.PROCESSING + for observer in self.observers: + observer.updateStarted def notifyPercentage(self, percentage: float): self.percent = percentage @@ -63,8 +79,10 @@ class Subject: self.status = Status.COMPLETE for observer in self.observers: observer.updateComplete({"type": self.type, "results": results}) + Subject.obs2subject.pop(observer) + self.observers.clear() self.results = results # TODO: maybe save results to disk here? -request2status: dict[str, Subject] = {} +uuid2subject: dict[str, Subject] = {} diff --git a/src/utils/queue_manager.py b/src/utils/queue_manager.py index 63852ee..cc92894 100644 --- a/src/utils/queue_manager.py +++ b/src/utils/queue_manager.py @@ -36,7 +36,7 @@ class QueueManager: self.wait_queue.remove(subject.id) except ValueError: pass - subject.status = Status.PROCESSING + subject.notifyStarted() # Execute the user-defined task synchronously in this worker thread subject.task( *args,