diff --git a/server/api/routes/pipelines.py b/server/api/routes/pipelines.py
index a7d619af55052d30a7cf2ec084c610f5663d3fae..802d99d334231c33b297ce22f8b9d165c0c8da95 100644
--- a/server/api/routes/pipelines.py
+++ b/server/api/routes/pipelines.py
@@ -14,6 +14,7 @@ import aiofiles
 from dramatiq_abort import abort
 from fastapi import APIRouter, UploadFile, Depends, Query
 from fastapi.responses import FileResponse
+from starlette.responses import StreamingResponse
 from nacsos_data.util.auth import UserPermissions
 from pydantic import StringConstraints
 from tempfile import TemporaryDirectory
@@ -25,7 +26,7 @@
 from server.util.config import settings
 from server.data import db_engine
 from server.pipelines.security import UserTaskPermissionChecker, UserTaskProjectPermissions
-from server.pipelines.files import get_outputs_flat, get_log, zip_folder, delete_task_directory
+from server.pipelines.files import get_outputs_flat, get_log, zip_folder, delete_task_directory, stream_log
 
 logger = get_logger('nacsos.api.route.pipelines')
 router = APIRouter()
@@ -62,6 +63,13 @@ def get_task_log(permissions: UserTaskProjectPermissions = Depends(UserTaskPermi
     return get_log(task_id=str(task_id))
 
 
+@router.get('/artefacts/log-stream', response_class=StreamingResponse)
+def stream_task_log(permissions: UserTaskProjectPermissions = Depends(UserTaskPermissionChecker('artefacts_read'))):
+    return StreamingResponse(stream_log(str(permissions.task.task_id)),
+                             media_type='text/plain',
+                             headers={'X-Content-Type-Options': 'nosniff'})
+
+
 @router.get('/artefacts/file', response_class=FileResponse)
 def get_file(filename: str,
              permissions: UserTaskProjectPermissions = Depends(UserTaskPermissionChecker('artefacts_read'))) \
diff --git a/server/pipelines/files.py b/server/pipelines/files.py
index 694a25ac312268a086088fb2b06396dc2387b9d0..06bb61425b04decaec7506ed9236f9c3e516a47b 100644
--- a/server/pipelines/files.py
+++ b/server/pipelines/files.py
@@ -1,7 +1,13 @@
+import logging
 import os
+import time
+from typing import Generator
 from zipfile import ZipFile, Path
-from .errors import MissingFileError
+
 from server.util.config import settings
+from .errors import MissingFileError
+
+logger = logging.getLogger('server.pipelines.files')
 
 
 def get_outputs_flat(task_id: str, include_fsize: bool = True) -> list[tuple[str, int] | str]:
@@ -23,6 +29,48 @@
     return ret
 
 
+def stream_log(task_id: str, max_fails: int = 30, lookback: int = 500) -> Generator[str, None, None]:
+    # Construct path to logfile
+    filename = settings.PIPES.target_dir / task_id / 'progress.log'
+
+    # no logfile, nothing to stream (plain `return`, not `raise StopIteration`, per PEP 479)
+    if not filename.exists():
+        return
+
+    with open(filename, 'r') as file:
+        # find the size of the file
+        st_size = os.stat(filename).st_size
+
+        # jump to almost the end of the file
+        file.seek(max(0, st_size - lookback))
+
+        logger.debug(f'Going to stream from {filename} starting at {max(0, st_size - lookback)}/{st_size}')
+        fail_count = 0
+        while True:
+            # remember where we are now
+            where = file.tell()
+            # try to read a line
+            line = file.readline()
+
+            # nothing new yet: remember we failed, sleep, and jump back to where we started
+            if not line:
+                fail_count += 1
+                logger.debug(f'No new line, increasing fail count to {fail_count}')
+                time.sleep(1)
+                file.seek(where)
+
+            # got new data; yield it unstripped (keeps the newline) and reset the fail counter
+            else:
+                fail_count = 0
+                logger.debug(f'Yielding line: {line.strip()}')
+                yield line
+
+            # we haven't seen anything new for `max_fails` seconds, consider the log finished
+            if fail_count > max_fails:
+                logger.debug('Reached max fails, stopping log streaming')
+                return
+
+
 def get_log(task_id: str) -> str | None:
     """
     Get the contents of the log file as a string.
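
For reference, the sketch below shows one way a client could consume the new endpoint. It is not part of the patch, and the URL prefix, the `task_id` query parameter, and the bearer-token header are assumptions; the patch only shows that `UserTaskPermissionChecker('artefacts_read')` resolves the task server-side.

# Hypothetical client for GET /artefacts/log-stream.
# Assumptions (not confirmed by the patch): the router is mounted under
# /api/pipelines, the permission checker reads `task_id` from the query
# string, and auth uses a bearer token.
import requests

with requests.get(
        'https://nacsos.example.org/api/pipelines/artefacts/log-stream',
        params={'task_id': '00000000-0000-0000-0000-000000000000'},
        headers={'Authorization': 'Bearer <token>'},
        stream=True,
        timeout=(5, 65),  # read timeout must exceed the server's silent window (~max_fails seconds)
) as response:
    response.raise_for_status()
    # The server yields raw lines with newlines preserved, so printing
    # chunks as they arrive reproduces the log file.
    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end='', flush=True)

Note that the server side is a plain 1-second polling tail: after `max_fails` (default 30) consecutive empty polls the generator returns and the response closes, so a closed stream means "no new output for ~30 seconds", not necessarily that the task finished.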