Skip to content
Snippets Groups Projects
Commit 73503949 authored by Tim Repke's avatar Tim Repke
Browse files

add server-wide event handling

parent 254ad282
No related branches found
No related tags found
No related merge requests found
Pipeline #757 failed
......@@ -33,4 +33,4 @@ router.include_router(project.router, prefix='/project', tags=['project'])
router.include_router(imports.router, prefix='/imports', tags=['imports'])
# route for triggering events in the system
router.include_router(imports.router, prefix='/events', tags=['events'])
router.include_router(events.router, prefix='/events', tags=['events'])
from fastapi import APIRouter
from pydantic import BaseModel
from ...util.events import eventbus, events, AnyEvent, AnyEventType
from ...util.logging import get_logger
logger = get_logger('nacsos.api.route.events')
router = APIRouter()
logger.debug('Setup nacsos.api.route.events router')
class Event(BaseModel):
event: AnyEventType
payload: AnyEvent
@router.post('/emit')
async def emit(event: Event) -> None:
"""
This route can be used to trigger an event on the system.
FIXME: This should require some sort of authentication!
:param event: event (incl optional payload) to emit
:return: void
"""
logger.info(f'Received external event to be emitted: {event.event}')
if hasattr(events, event.event):
EmitEvent = getattr(events, event.event)
emit_event = EmitEvent.from_obj(event.payload)
await eventbus.emit(emit_event._name, emit_event) # noqa PyProtectedMember
# TODO user-configurable triggers (e.g. trigger on event or cron-like)
# - create schema, model, crud in nacsos-data (probably could just be a JSONB field in `Project`
# - create @startup function that sets up all listeners
# - list all triggers
# - add trigger
# - remove trigger
# - update trigger
......@@ -5,11 +5,12 @@ from nacsos_data.db.crud.imports import \
read_all_imports_for_project, \
read_import, upsert_import, \
read_item_count_for_import
from nacsos_data.util.pipelines.imports import submit_jsonl_import_task
from server.data import db_engine
from server.util.pipelines.imports import submit_jsonl_import_task
from server.util.security import UserPermissionChecker, UserPermissions, InsufficientPermissions
from server.util.logging import get_logger
from server.util.config import settings
logger = get_logger('nacsos.api.route.imports')
router = APIRouter()
......@@ -60,7 +61,9 @@ async def trigger_import(import_id: str,
if str(import_details.project_id) == str(permissions.permissions.project_id):
if import_details.type == ImportType.jsonl:
task_id = await submit_jsonl_import_task(import_id=import_id, engine=db_engine)
await submit_jsonl_import_task(import_id=import_id,
base_url=settings.PIPES.API_URL,
engine=db_engine)
else:
raise NotImplementedError(f'No import trigger for "{import_details.type}" implemented yet.')
else:
......
from fastapi import APIRouter, status as http_status
from fastapi import APIRouter
from fastapi.responses import PlainTextResponse
from server.util.logging import get_logger
from server.util.security import InsufficientPermissions
......
......@@ -90,10 +90,15 @@ class UsersConfig(BaseModel):
REGISTRATION_ENABLED: bool = False # Set this to true to enable the registration endpoint
class PipelinesConfig(BaseModel):
API_URL: str = 'http://localhost:8000/api'
class Settings(BaseSettings):
SERVER: ServerConfig = ServerConfig()
DB: DatabaseConfig = DatabaseConfig()
USERS: UsersConfig = UsersConfig()
PIPES: PipelinesConfig = PipelinesConfig()
# EMAIL: EmailConfig
......
from datetime import datetime
from sqlalchemy import select
from nacsos_data.db.schemas import Import
from ..events import PipelineTaskStatusChangedEvent
from ....data import db_engine
async def update_import_status(event: PipelineTaskStatusChangedEvent):
if event.function_name in [
'nacsos_lib.twitter.import.import_twitter_api',
'nacsos_lib.twitter.import.import_twitter_db'
]:
async with db_engine.session() as session:
stmt = select(Import).filter_by(pipeline_task_id=event.task_id)
import_details: Import = (await session.execute(stmt)).scalars().one_or_none()
if import_details is not None:
# Seems like task was started, remember the time
if event.status == 'RUNNING' and import_details.time_started is None:
import_details.time_started = datetime.now()
session.commit()
elif (event.status == 'COMPLETED' or event.status == 'FAILED' or event.status == 'CANCELLED') \
and import_details.time_finished is None:
import_details.time_finished = datetime.now()
session.commit()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment