Skip to content
Snippets Groups Projects
Commit 254ad282 authored by Tim Repke's avatar Tim Repke
Browse files

add events that can be emitted server-wide

parent 42a1fab6
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ from .routes import auth ...@@ -6,6 +6,7 @@ from .routes import auth
from .routes import projects from .routes import projects
from .routes import project from .routes import project
from .routes import imports from .routes import imports
from .routes import events
# this router proxies all /api endpoints # this router proxies all /api endpoints
router = APIRouter() router = APIRouter()
...@@ -30,3 +31,6 @@ router.include_router(project.router, prefix='/project', tags=['project']) ...@@ -30,3 +31,6 @@ router.include_router(project.router, prefix='/project', tags=['project'])
# route for project related things # route for project related things
router.include_router(imports.router, prefix='/imports', tags=['imports']) router.include_router(imports.router, prefix='/imports', tags=['imports'])
# route for triggering events in the system
router.include_router(imports.router, prefix='/events', tags=['events'])
from typing import Union, Literal
from pymitter import EventEmitter
from .hooks import imports
from . import events
eventbus = EventEmitter(delimiter='_', wildcard=True)
AnyEvent = Union[events.BaseEvent.get_subclasses()]
AnyEventType = Literal[tuple(sc._name for sc in events.BaseEvent.get_subclasses())] # noqa PyProtectedMember
# Permanent/global listeners
eventbus.on(events.PipelineTaskStatusChangedEvent._name, imports.update_import_status) # noqa PyProtectedMember
__all__ = ['eventbus', 'events', 'AnyEvent', 'AnyEventType']
from typing import Literal, ClassVar
from pydantic import BaseModel
class BaseEvent(BaseModel):
_name = ClassVar[str]
@classmethod
def get_subclasses(cls):
def recurse(sub_cls):
if hasattr(sub_cls, '__subclasses__'):
for sc in sub_cls.__subclasses__():
return sub_cls.__subclasses__() + recurse(sc)
return []
return tuple(set(recurse(cls)))
TaskStatus = Literal['PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED'] # FIXME move somewhere else
class PipelineTaskStatusChangedEvent(BaseEvent):
"""
Emitted when the pipeline service calls the nacsos-core API and tells it about a status change of a task
"""
_name = 'PipelineTaskStatus_*'
task_id: str
status: TaskStatus
project_id: str
user_id: str
function_name: str # incl module path
class PipelineTaskStatusCompletedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task finished (successfully/completed)
"""
_name = 'PipelineTask_completed'
class PipelineTaskStatusStartedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task started
"""
_name = 'PipelineTask_started'
class PipelineTaskStatusFailedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task finished (failed)
"""
_name = 'PipelineTask_failed'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment