Skip to content
Snippets Groups Projects
Commit f89f1611 authored by Tim Repke's avatar Tim Repke
Browse files

drop pipeline events, functionality moved to nacsos-data

parent fe3555da
No related branches found
No related tags found
1 merge request!21Deploy deletion cascades & pipeline integration
Pipeline #1101 failed
......@@ -7,4 +7,4 @@ __pycache__
/hypercorn.error
.pyc
server.md
dump.sql
\ No newline at end of file
dumps/
\ No newline at end of file
flake8==6.0.0
tox==4.4.5
tox==4.4.6
pytest==7.2.1
pytest-cov==4.0.0
mypy==1.0.1
types-toml==0.10.8.4
\ No newline at end of file
types-toml==0.10.8.5
\ No newline at end of file
......@@ -2,7 +2,7 @@ from typing import Union, Literal, TYPE_CHECKING
from pymitter import EventEmitter
from .hooks import imports
from .hooks import example
from . import events
eventbus = EventEmitter(delimiter='_', wildcard=True)
......@@ -16,6 +16,6 @@ else:
AnyEventLiteral = Literal[tuple(sc._name for sc in events.BaseEvent.get_subclasses())] # noqa PyProtectedMember
# Permanent/global listeners
eventbus.on(events.PipelineTaskStatusChangedEvent._name, imports.update_import_status) # noqa PyProtectedMember
eventbus.on(events.ExampleEvent._name, example.test_listener) # noqa PyProtectedMember
__all__ = ['eventbus', 'events', 'AnyEvent', 'AnyEventType']
from typing import Literal, ClassVar
from typing import ClassVar
from pydantic import BaseModel
......@@ -17,38 +17,10 @@ class BaseEvent(BaseModel):
return tuple(set(recurse(cls)))
TaskStatus = Literal['PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED'] # FIXME move somewhere else
class ExampleEvent(BaseEvent):
_name = 'Example_*'
payload_a: str
class PipelineTaskStatusChangedEvent(BaseEvent):
"""
Emitted when the pipeline service calls the nacsos-core API and tells it about a status change of a task
"""
_name = 'PipelineTaskStatus_*'
task_id: str
status: TaskStatus
project_id: str
user_id: str
import_id: str | None = None
function_name: str # incl module path
class PipelineTaskStatusCompletedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task finished (successfully/completed)
"""
_name = 'PipelineTask_completed'
class PipelineTaskStatusStartedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task started
"""
_name = 'PipelineTask_started'
class PipelineTaskStatusFailedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task finished (failed)
"""
_name = 'PipelineTask_failed'
class ExampleSubEvent(ExampleEvent):
_name = 'Example_sub'
import logging
from ..events import ExampleEvent
logger = logging.getLogger('nacsos.event-hooks.test')
def test_listener(event: ExampleEvent):
logger.debug(f'Received event {event}')
import logging
from datetime import datetime
from sqlalchemy import select
from nacsos_data.db.schemas import Import
from ..events import PipelineTaskStatusChangedEvent
from ....data import db_engine
logger = logging.getLogger('nacsos.event-hooks.imports')
async def update_import_status(event: PipelineTaskStatusChangedEvent):
logger.debug(f'Maybe going to update import status {event}')
if event.function_name in [
'nacsos_lib.twitter.import.import_twitter_api',
'nacsos_lib.twitter.import.import_twitter_db',
'nacsos_lib.academic.import.import_wos_file'
]:
async with db_engine.session() as session:
stmt = select(Import).filter_by(pipeline_task_id=event.task_id)
import_details: Import | None = (await session.execute(stmt)).scalars().one_or_none()
if import_details is None and event.import_id is not None:
logger.debug(f'second try with {event.import_id}')
stmt = select(Import).filter_by(import_id=event.import_id)
import_details = (await session.execute(stmt)).scalars().one_or_none()
if import_details is not None:
# Seems like task was started, remember the time
if event.status == 'RUNNING' and import_details.time_started is None:
logger.debug( # type: ignore[unreachable]
f'Updating import start time for {import_details.import_id}'
)
import_details.time_started = datetime.now()
await session.commit()
elif (event.status == 'COMPLETED' or event.status == 'FAILED' or event.status == 'CANCELLED') \
and import_details.time_finished is None:
logger.debug( # type: ignore[unreachable]
f'Updating import finish time for {import_details.import_id}'
)
import_details.time_finished = datetime.now()
await session.commit()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment