Skip to content
Snippets Groups Projects
Commit c055e051 authored by Tim Repke's avatar Tim Repke
Browse files

log streamer

parent 5e6ce7e9
No related branches found
No related tags found
1 merge request!99Main
Pipeline #3638 canceled
......@@ -14,6 +14,7 @@ import aiofiles
from dramatiq_abort import abort
from fastapi import APIRouter, UploadFile, Depends, Query
from fastapi.responses import FileResponse
from starlette.responses import StreamingResponse
from nacsos_data.util.auth import UserPermissions
from pydantic import StringConstraints
from tempfile import TemporaryDirectory
......@@ -25,7 +26,7 @@ from server.util.config import settings
from server.data import db_engine
from server.pipelines.security import UserTaskPermissionChecker, UserTaskProjectPermissions
from server.pipelines.files import get_outputs_flat, get_log, zip_folder, delete_task_directory
from server.pipelines.files import get_outputs_flat, get_log, zip_folder, delete_task_directory, stream_log
logger = get_logger('nacsos.api.route.pipelines')
router = APIRouter()
......@@ -62,6 +63,13 @@ def get_task_log(permissions: UserTaskProjectPermissions = Depends(UserTaskPermi
return get_log(task_id=str(task_id))
@router.get('/artefacts/log-stream', response_class=StreamingResponse)
def stream_task_log(permissions: UserTaskProjectPermissions = Depends(UserTaskPermissionChecker('artefacts_read'))):
    """Stream the progress log of the permitted task as plain text, line by line."""
    # `stream_log` yields log lines as they are appended to the task's logfile.
    log_lines = stream_log(str(permissions.task.task_id))
    # X-Content-Type-Options prevents browsers from sniffing the stream as HTML.
    return StreamingResponse(log_lines,
                             media_type='text/plain',
                             headers={'X-Content-Type-Options': 'nosniff'})
@router.get('/artefacts/file', response_class=FileResponse)
def get_file(filename: str,
permissions: UserTaskProjectPermissions = Depends(UserTaskPermissionChecker('artefacts_read'))) \
......
import logging
import os
import time
from typing import Generator
from zipfile import ZipFile, Path
from .errors import MissingFileError
from server.util.config import settings
from .errors import MissingFileError
logger = logging.getLogger('server.pipelines.files')
def get_outputs_flat(task_id: str, include_fsize: bool = True) -> list[tuple[str, int] | str]:
......@@ -23,6 +29,52 @@ def get_outputs_flat(task_id: str, include_fsize: bool = True) -> list[tuple[str
return ret
def stream_log(task_id: str, max_fails: int = 30, lookback: int = 500) -> Generator[str, None, None]:
    """
    Tail the progress log of a pipeline task, yielding new lines as they appear.

    Starts at most `lookback` bytes before the current end of the file, then
    polls once per second for newly appended complete lines. The stream ends
    after `max_fails` consecutive polls without a new line (task assumed done),
    or immediately (empty stream) if the logfile does not exist.

    :param task_id: id of the task whose `progress.log` should be streamed
    :param max_fails: consecutive empty polls (≈ seconds) before giving up
    :param lookback: number of bytes before EOF to start streaming from
    :return: generator of stripped log lines
    """
    # Construct path to logfile
    filename = settings.PIPES.target_dir / task_id / 'progress.log'

    # No logfile -> empty stream. NOTE: must `return`, not `raise StopIteration`;
    # PEP 479 turns StopIteration raised inside a generator into a RuntimeError.
    if not filename.exists():
        return

    # Read-only access is sufficient for tailing ('r+' would needlessly
    # require write permission on the logfile).
    with open(filename, 'r') as file:
        # jump to almost the end of the file (named field instead of stat()[6])
        st_size = os.stat(filename).st_size
        start = max(0, st_size - lookback)
        file.seek(start)

        logger.debug(f'Going to stream from (unknown) starting at {start}/{st_size}')
        fail_count = 0
        while True:
            # remember where we are now
            where = file.tell()
            # try to read a line
            line = file.readline()

            # no full new line yet, remember we failed, sleep, and jump back where we last started
            if not line:
                fail_count += 1
                logger.debug(f'No full line, increasing fail count to {fail_count}')
                time.sleep(1)
                file.seek(where)
            # found new line, yield it (exactly once) and reset fail counter
            else:
                fail_count = 0
                logger.debug(f'Yielding line: {line.strip()}')
                yield line.strip()

            # we haven't seen anything new for `max_fails` seconds, considering done!
            if fail_count > max_fails:
                logger.debug('Reached max fails, stopping log streaming')
                return
def get_log(task_id: str) -> str | None:
"""
Get the contents of the log file as a string.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment