Skip to content
Snippets Groups Projects
Commit 9607bcaf authored by Tim Repke's avatar Tim Repke
Browse files

fix import

parent c2e001e3
No related branches found
No related tags found
1 merge request!96fix import
Pipeline #3558 failed
......@@ -16,9 +16,11 @@ from sqlalchemy.ext.asyncio import AsyncSession # noqa F401
from nacsos_data.models.pipeline import compute_fingerprint, TaskStatus
from nacsos_data.db.schemas import Task
from server.util.config import settings
from server.util.config import settings, DatabaseConfig
from server.util.logging import get_file_logger, LogRedirector
logger = logging.getLogger('nacsos.pipelines.actor')
R = TypeVar("R")
P = ParamSpec("P")
......@@ -83,7 +85,9 @@ class NacsosActor(Actor[P, R]):
@classmethod
@asynccontextmanager
async def exec_context(cls) \
-> AsyncIterator[tuple[AsyncSession, logging.Logger, Path, str, str | None, str | None]]:
-> AsyncIterator[tuple[DatabaseConfig, logging.Logger, Path, str, str | None, str | None]]:
logger.info('Opening execution context')
from nacsos_data.db import get_engine_async
db_engine = get_engine_async(settings=settings.DB) # type: ignore[arg-type]
......@@ -95,6 +99,7 @@ class NacsosActor(Actor[P, R]):
message_id = message.message_id
actor_name = message.options.get('nacsos_actor_name') # type: ignore[assignment]
task_id = message.options.get('nacsos_task_id')
logger.info(f'message_id: {message_id}, task_id: {task_id}, actor_name: {actor_name}')
target_dir = settings.PIPES.target_dir / str(task_id)
target_dir.mkdir(parents=True, exist_ok=True)
......@@ -114,22 +119,27 @@ class NacsosActor(Actor[P, R]):
else:
task_logger.warning(f'Task {task_id} not found in database.')
status: TaskStatus | None = None
with TemporaryDirectory(dir=settings.PIPES.WORKING_DIR) as work_dir, \
LogRedirector(task_logger, level='INFO', stream='stdout'), \
LogRedirector(task_logger, level='ERROR', stream='stderr'):
try:
yield session, task_logger, target_dir, work_dir, task_id, message_id
except (Exception, Warning) as e:
# Oh no, something failed. Do some post-mortem logging
tb = traceback.format_exc()
task_logger.fatal(tb)
task_logger.fatal(f'{type(e).__name__}: {e}')
status = TaskStatus.FAILED
finally:
status: TaskStatus | None = None
with TemporaryDirectory(dir=settings.PIPES.WORKING_DIR) as work_dir, \
LogRedirector(task_logger, level='INFO', stream='stdout'), \
LogRedirector(task_logger, level='ERROR', stream='stderr'):
try:
# Yielding this info implicitly executes everything in the `with:` context.
yield settings.DB, task_logger, target_dir, work_dir, task_id, message_id
except (Exception, Warning) as e:
# Oh no, something failed. Do some post-mortem logging
logger.error('Big drama from an actor!')
logger.exception(e)
tb = traceback.format_exc()
task_logger.fatal(tb)
task_logger.fatal(f'{type(e).__name__}: {e}')
status = TaskStatus.FAILED
finally:
async with db_engine.session() as session: # type: AsyncSession
task = await session.get(Task, task_id)
status = status or TaskStatus.COMPLETED
logger.debug(f'Pre-set actor status: {status}')
if status is None:
status = TaskStatus.COMPLETED
if task:
task.status = status
task.time_finished = datetime.datetime.now()
......
import asyncio
import datetime
import logging
import traceback
import uuid
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TypeVar, NamedTuple, ParamSpec, Protocol, Callable, Awaitable, Any, TYPE_CHECKING, Generic
import dramatiq
from dramatiq import Actor, Broker
from dramatiq.middleware import CurrentMessage, AsyncIO
from dramatiq.brokers.redis import RedisBroker
from dramatiq_abort import Abortable, backends
from sqlalchemy.ext.asyncio import AsyncSession
from nacsos_data.models.pipeline import TaskModel, compute_fingerprint, TaskStatus
from nacsos_data.db.schemas import Task
from server.util.config import settings
from server.util.logging import get_file_logger, LogRedirector
logger = logging.getLogger('nacsos.pipelines.task')
broker = RedisBroker(url=settings.PIPES.REDIS_URL)
......
......@@ -2,6 +2,8 @@ from pathlib import Path
from typing import cast
import dramatiq
from nacsos_data.db import get_engine_async
from nacsos_data.db.schemas import Import
from nacsos_data.models.imports import ImportConfig, ImportModel
from nacsos_data.util import ensure_values
......@@ -25,20 +27,21 @@ def prefix_sources(sources: list[Path]):
@dramatiq.actor(actor_class=NacsosActor, max_retries=0) # type: ignore[arg-type]
async def import_task(import_id: str | None = None) -> None:
async with NacsosActor.exec_context() as (session, logger, target_dir, work_dir, task_id, message_id):
async with NacsosActor.exec_context() as (db_settings, logger, target_dir, work_dir, task_id, message_id):
logger.info('Preparing import task!')
db_engine = get_engine_async(settings=db_settings)
async with db_engine.session() as session:
if import_id is None:
raise ValueError('import_id is required here.')
if import_id is None:
raise ValueError('import_id is required here.')
stmt = select(Import).where(Import.import_id == import_id)
result = (await session.execute(stmt)).scalars().one_or_none()
if result is None:
raise NotFoundError(f'No import info for id={import_id}')
stmt = select(Import).where(Import.import_id == import_id)
result = (await session.execute(stmt)).scalars().one_or_none()
if result is None:
raise NotFoundError(f'No import info for id={import_id}')
import_details = ImportModel.model_validate(result.__dict__)
result.pipeline_task_id = task_id
await session.commit()
import_details = ImportModel.model_validate(result.__dict__)
result.pipeline_task_id = task_id
await session.commit()
user_id, project_id, config = cast(tuple[str, str, ImportConfig],
ensure_values(import_details, 'user_id', 'project_id', 'config'))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment