Skip to content
Snippets Groups Projects
Commit 5d526189 authored by Tim Repke's avatar Tim Repke
Browse files

fix minor issues with async event flow

parent 73503949
No related branches found
No related tags found
No related merge requests found
Pipeline #760 failed
......@@ -44,6 +44,10 @@ loggers:
handlers: [default]
level: DEBUG
propagate: no
asyncio:
handlers: [default]
level: INFO
propagate: no
nacsos_data:
handlers: [default]
level: DEBUG
......
......@@ -5,7 +5,12 @@ from server.util.logging import get_logger
logger = get_logger('nacsos.main')
logger.info('Starting up server')
from server import app # noqa: F401, E402
from server import app # noqa: E402
@app.on_event('startup')
async def hook_event_listeners():
from server.util.events import eventbus # noqa: F401
# config = Config()
# config.bind = f'{settings.SERVER.HOST}:{settings.SERVER.PORT}'
......
......@@ -8,4 +8,4 @@ passlib[bcrypt]==1.7.4
PyYAML==6.0
httpx[http2]==0.23.0
pymitter==0.4.0
-e git+ssh://git@gitlab.pik-potsdam.de/mcc-apsis/nacsos/nacsos-data.git@11f8606148cdf996ac878241080f491fa9b45be5#egg=nacsos_data
\ No newline at end of file
-e git+ssh://git@gitlab.pik-potsdam.de/mcc-apsis/nacsos/nacsos-data.git@784586cb24a09295466c055f582bf2f48d952984#egg=nacsos_data
\ No newline at end of file
from typing import Type
from fastapi import APIRouter
from pydantic import BaseModel
......@@ -15,6 +16,10 @@ class Event(BaseModel):
payload: AnyEvent
class UnknownEventError(Exception):
pass
@router.post('/emit')
async def emit(event: Event) -> None:
"""
......@@ -27,10 +32,12 @@ async def emit(event: Event) -> None:
logger.info(f'Received external event to be emitted: {event.event}')
if hasattr(events, event.event):
EmitEvent = getattr(events, event.event)
emit_event = EmitEvent.from_obj(event.payload)
await eventbus.emit(emit_event._name, emit_event) # noqa PyProtectedMember
EmitEvent: Type[AnyEvent] = getattr(events, event.event)
emit_event = EmitEvent.parse_obj(event.payload)
logger.debug(f'Going to emit {EmitEvent} ({emit_event})')
await eventbus.emit_async(emit_event._name, emit_event) # noqa PyProtectedMember
else:
raise UnknownEventError(f'Event {event.event} not in {AnyEvent}')
# TODO user-configurable triggers (e.g. trigger on event or cron-like)
# - create schema, model, crud in nacsos-data (probably could just be a JSONB field in `Project`
......
......@@ -7,7 +7,8 @@ from . import events
eventbus = EventEmitter(delimiter='_', wildcard=True)
AnyEvent = Union[events.BaseEvent.get_subclasses()]
AnyEventType = Literal[tuple(sc._name for sc in events.BaseEvent.get_subclasses())] # noqa PyProtectedMember
AnyEventType = Literal[tuple(sc.__name__ for sc in events.BaseEvent.get_subclasses())] # noqa PyProtectedMember
AnyEventLiteral = Literal[tuple(sc._name for sc in events.BaseEvent.get_subclasses())] # noqa PyProtectedMember
# Permanent/global listeners
eventbus.on(events.PipelineTaskStatusChangedEvent._name, imports.update_import_status) # noqa PyProtectedMember
......
......@@ -29,6 +29,7 @@ class PipelineTaskStatusChangedEvent(BaseEvent):
status: TaskStatus
project_id: str
user_id: str
import_id: str | None = None
function_name: str # incl module path
......
import logging
from datetime import datetime
from sqlalchemy import select
......@@ -6,23 +7,35 @@ from nacsos_data.db.schemas import Import
from ..events import PipelineTaskStatusChangedEvent
from ....data import db_engine
logger = logging.getLogger('nacsos.event-hooks.imports')
async def update_import_status(event: PipelineTaskStatusChangedEvent):
logger.debug(f'Maybe going to update import status {event}')
if event.function_name in [
'nacsos_lib.twitter.import.import_twitter_api',
'nacsos_lib.twitter.import.import_twitter_db'
]:
async with db_engine.session() as session:
stmt = select(Import).filter_by(pipeline_task_id=event.task_id)
import_details: Import = (await session.execute(stmt)).scalars().one_or_none()
if import_details is None and event.import_id is not None:
logger.debug(f'second try with {event.import_id}')
stmt = select(Import).filter_by(import_id=event.import_id)
import_details: Import = (await session.execute(stmt)).scalars().one_or_none()
logger.debug(repr(import_details))
if import_details is not None:
# Seems like task was started, remember the time
if event.status == 'RUNNING' and import_details.time_started is None:
logger.debug(f'Updating import start time for {import_details.import_id}')
import_details.time_started = datetime.now()
session.commit()
await session.commit()
elif (event.status == 'COMPLETED' or event.status == 'FAILED' or event.status == 'CANCELLED') \
and import_details.time_finished is None:
logger.debug(f'Updating import finish time for {import_details.import_id}')
import_details.time_finished = datetime.now()
session.commit()
await session.commit()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment