Skip to content
Snippets Groups Projects
Commit fcd9af4c authored by Tim Repke's avatar Tim Repke
Browse files

Merge branch 'master' into 'production'

Deploy deletion cascades & pipeline integration

See merge request !21
parents 74010082 d6a4b45c
No related branches found
No related tags found
1 merge request!21Deploy deletion cascades & pipeline integration
Pipeline #1117 passed with warnings
...@@ -6,4 +6,5 @@ __pycache__ ...@@ -6,4 +6,5 @@ __pycache__
/hypercorn.access /hypercorn.access
/hypercorn.error /hypercorn.error
.pyc .pyc
server.md server.md
\ No newline at end of file dumps/
\ No newline at end of file
...@@ -13,6 +13,12 @@ cache: ...@@ -13,6 +13,12 @@ cache:
- .cache/pip - .cache/pip
- venv/ - venv/
before_script:
# Provides credentials to pip to access private GitLab PyPi index.
- echo "machine gitlab.pik-potsdam.de" > ~/.netrc
- echo "login gitlab-ci-token" >> ~/.netrc
- echo "password ${CI_JOB_TOKEN}" >> ~/.netrc
stages: stages:
- build - build
- test - test
...@@ -20,7 +26,7 @@ stages: ...@@ -20,7 +26,7 @@ stages:
build-job: build-job:
stage: build stage: build
image: python:latest image: python:3.10.9
script: script:
- python -V - python -V
- pip install virtualenv - pip install virtualenv
...@@ -28,23 +34,25 @@ build-job: ...@@ -28,23 +34,25 @@ build-job:
- source venv/bin/activate - source venv/bin/activate
- pwd - pwd
- ls -lisah - ls -lisah
- git config --global url."https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.pik-potsdam.de/".insteadOf "ssh://git@gitlab.pik-potsdam.de/" - git config --global url."https://gitlab.pik-potsdam.de/".insteadOf "ssh://git@gitlab.pik-potsdam.de/"
- pip install -r requirements.txt - pip install -r requirements.txt
- pip install -r requirements_dev.txt - pip install -r requirements_dev.txt
test-job1: test-job1:
stage: test stage: test
image: python:latest image: python:3.10.9
script: script:
- source venv/bin/activate - source venv/bin/activate
- flake8 --config .flake8 - flake8 --config .flake8
test-job2: test-job2:
stage: test stage: test
image: python:latest image: python:3.10.9
script: script:
- source venv/bin/activate - source venv/bin/activate
- mypy --config-file=pyproject.toml server - which python
- pip freeze
- python -m mypy --config-file=pyproject.toml server
deploy-to-production: deploy-to-production:
stage: deploy stage: deploy
...@@ -58,7 +66,7 @@ deploy-to-production: ...@@ -58,7 +66,7 @@ deploy-to-production:
- echo $HOME - echo $HOME
- echo "Reset git config" - echo "Reset git config"
- rm /home/gitlab-runner/.gitconfig - rm /home/gitlab-runner/.gitconfig
- git config --global url."https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.pik-potsdam.de/".insteadOf "ssh://git@gitlab.pik-potsdam.de/" - git config --global url."https://gitlab.pik-potsdam.de/".insteadOf "ssh://git@gitlab.pik-potsdam.de/"
- git config --global user.name gitlab-runner - git config --global user.name gitlab-runner
- git config --global user.email gitlab-runner@gitlab.pik-potsdam.de - git config --global user.email gitlab-runner@gitlab.pik-potsdam.de
- git config --global -l --show-origin - git config --global -l --show-origin
...@@ -78,6 +86,8 @@ deploy-to-production: ...@@ -78,6 +86,8 @@ deploy-to-production:
- echo "Creating new virtual environment" - echo "Creating new virtual environment"
- python3.10 -m venv venv - python3.10 -m venv venv
- source venv/bin/activate - source venv/bin/activate
- which python
- python -V
- echo "Installing requirements" - echo "Installing requirements"
- pip install -r requirements.txt - pip install -r requirements.txt
- echo "Handling migrations" - echo "Handling migrations"
......
...@@ -2,7 +2,7 @@ NACSOS_LOG_CONF_FILE="config/logging.toml" ...@@ -2,7 +2,7 @@ NACSOS_LOG_CONF_FILE="config/logging.toml"
NACSOS_SERVER__HOST="localhost" NACSOS_SERVER__HOST="localhost"
NACSOS_SERVER__PORT=8081 NACSOS_SERVER__PORT=8081
NACSOS_SERVER__CORS_ORIGINS='["http://localhost:8080","http://localhost:8081", "http://localhost:8082","http://localhost", "http://0.0.0.0:8081", "http://0.0.0.0", "http://127.0.0.1:8081", "http://127.0.0.1"]' NACSOS_SERVER__CORS_ORIGINS='["http://localhost:8080", "http://127.0.0.1:8080", "http://localhost:8081", "http://localhost:8082","http://localhost", "http://0.0.0.0:8081", "http://0.0.0.0", "http://127.0.0.1:8081", "http://127.0.0.1"]'
NACSOS_SERVER__HEADER_CORS=true NACSOS_SERVER__HEADER_CORS=true
NACSOS_DB__HOST="localhost" NACSOS_DB__HOST="localhost"
...@@ -11,4 +11,5 @@ NACSOS_DB__USER="root" ...@@ -11,4 +11,5 @@ NACSOS_DB__USER="root"
NACSOS_DB__PASSWORD="root" NACSOS_DB__PASSWORD="root"
NACSOS_DB__DATABASE="nacsos_core" NACSOS_DB__DATABASE="nacsos_core"
NACSOS_USERS__DEFAULT_USER="b0949d0e-e3e1-47c3-9a5d-a2cbbdc2ea23" NACSOS_USERS__DEFAULT_USER="user1"
\ No newline at end of file #NACSOS_USERS__DEFAULT_USER
\ No newline at end of file
...@@ -3,7 +3,7 @@ version: '3.8' ...@@ -3,7 +3,7 @@ version: '3.8'
services: services:
db: db:
container_name: nacsos_postgres container_name: nacsos_postgres
image: postgres:14.2-bullseye image: postgres:15.1-alpine
restart: always restart: always
environment: environment:
POSTGRES_USER: root POSTGRES_USER: root
......
fastapi==0.88.0 fastapi==0.92.0
hypercorn==0.14.3 hypercorn==0.14.3
toml==0.10.2 toml==0.10.2
email-validator==1.3.0 email-validator==1.3.1
python-dotenv==0.21.0 python-dotenv==1.0.0
python-jose[cryptography]==3.3.0 python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4 passlib[bcrypt]==1.7.4
httpx[http2]==0.23.1 httpx[http2]==0.23.3
pymitter==0.4.0 pymitter==0.4.0
uvicorn==0.20.0 uvicorn==0.20.0
python-multipart==0.0.5 python-multipart==0.0.5
-e git+ssh://git@gitlab.pik-potsdam.de/mcc-apsis/nacsos/nacsos-data.git@5afad1104fd7f012c5241ba1b9a3e5a90de5b174#egg=nacsos_data git+ssh://git@gitlab.pik-potsdam.de/mcc-apsis/nacsos/nacsos-data.git@v0.3.1#egg=nacsos_data
flake8==6.0.0 flake8==6.0.0
tox==4.0.11 tox==4.4.6
pytest==7.2.0 pytest==7.2.1
pytest-cov==4.0.0 pytest-cov==4.0.0
mypy==0.991 mypy==1.0.1
alembic==1.8.1 types-toml==0.10.8.5
types-toml==0.10.8.1 types-PyYAML==6.0.12.8
\ No newline at end of file \ No newline at end of file
import mimetypes
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware from fastapi.middleware.gzip import GZipMiddleware
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from .util.middlewares import TimingMiddleware, ErrorHandlingMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware
from .util.middlewares import TimingMiddleware, ErrorHandlingMiddleware
from .util.config import settings from .util.config import settings
from .util.logging import get_logger from .util.logging import get_logger
from .api import router as api_router from .api import router as api_router
from .data import db_engine from .data import db_engine
import mimetypes
mimetypes.init() mimetypes.init()
......
...@@ -154,7 +154,6 @@ async def _construct_annotation_item(assignment: AssignmentModel, project_id: st ...@@ -154,7 +154,6 @@ async def _construct_annotation_item(assignment: AssignmentModel, project_id: st
if project is None: if project is None:
raise ProjectNotFoundError(f'No project found in DB for id {project_id}') raise ProjectNotFoundError(f'No project found in DB for id {project_id}')
item = await read_any_item_by_item_id(item_id=assignment.item_id, item_type=project.type, engine=db_engine) item = await read_any_item_by_item_id(item_id=assignment.item_id, item_type=project.type, engine=db_engine)
return AnnotationItem(scheme=merged_scheme, assignment=assignment, scope=scope, item=item) return AnnotationItem(scheme=merged_scheme, assignment=assignment, scope=scope, item=item)
...@@ -172,13 +171,26 @@ async def get_next_assignment_for_scope_for_user(assignment_scope_id: str, ...@@ -172,13 +171,26 @@ async def get_next_assignment_for_scope_for_user(assignment_scope_id: str,
return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id) return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id)
class NoAssignments(Warning):
pass
@router.get('/annotate/next/{assignment_scope_id}', response_model=AnnotationItem) @router.get('/annotate/next/{assignment_scope_id}', response_model=AnnotationItem)
async def get_next_open_assignment_for_scope_for_user(assignment_scope_id: str, async def get_next_open_assignment_for_scope_for_user(assignment_scope_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))): permissions=Depends(UserPermissionChecker('annotations_read'))):
# FIXME response for "all done"
assignment = await read_next_open_assignment_for_scope_for_user(assignment_scope_id=assignment_scope_id, assignment = await read_next_open_assignment_for_scope_for_user(assignment_scope_id=assignment_scope_id,
user_id=permissions.user.user_id, user_id=permissions.user.user_id,
db_engine=db_engine) db_engine=db_engine)
# Either there are no assignments, or everything is done.
if assignment is None:
assignments = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id,
user_id=permissions.user.user_id,
db_engine=db_engine, limit=1)
if len(assignments) > 0:
assignment = assignments[0]
else:
raise NoAssignments('This user has no assignments in this scope.')
return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id) return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id)
......
...@@ -2,9 +2,10 @@ from fastapi import APIRouter, Depends ...@@ -2,9 +2,10 @@ from fastapi import APIRouter, Depends
from nacsos_data.models.imports import ImportModel, ImportType from nacsos_data.models.imports import ImportModel, ImportType
from nacsos_data.db.crud.imports import \ from nacsos_data.db.crud.imports import \
read_all_imports_for_project, \
read_import, \ read_import, \
upsert_import, \ upsert_import,\
delete_import, \
read_all_imports_for_project, \
read_item_count_for_import read_item_count_for_import
from nacsos_data.util.pipelines.imports import submit_jsonl_import_task, submit_wos_import_task from nacsos_data.util.pipelines.imports import submit_jsonl_import_task, submit_wos_import_task
...@@ -70,5 +71,18 @@ async def trigger_import(import_id: str, ...@@ -70,5 +71,18 @@ async def trigger_import(import_id: str,
engine=db_engine) engine=db_engine)
else: else:
raise NotImplementedError(f'No import trigger for "{import_details.type}" implemented yet.') raise NotImplementedError(f'No import trigger for "{import_details.type}" implemented yet.')
else:
raise InsufficientPermissions('You do not have permission to edit this data import.') raise InsufficientPermissions('You do not have permission to edit this data import.')
@router.delete('/import/delete/{import_id}', response_model=str)
async def delete_import_details(import_id: str,
permissions: UserPermissions = Depends(UserPermissionChecker('imports_edit'))):
import_details = await read_import(import_id=import_id, engine=db_engine)
# First, make sure the user trying to delete this import is actually authorised to delete this specific import
if import_details is not None and str(import_details.project_id) == str(permissions.permissions.project_id):
await delete_import(import_id=import_id, engine=db_engine)
return str(import_id)
raise InsufficientPermissions('You do not have permission to delete this data import.')
from fastapi import APIRouter, Depends, Query, HTTPException, status as http_status from fastapi import APIRouter, Depends, Query, HTTPException, status as http_status
from server.api.errors import DataNotFoundWarning, UserNotFoundError
from server.util.logging import get_logger
from nacsos_data.models.users import UserModel, UserInDBModel, UserBaseModel from nacsos_data.models.users import UserModel, UserInDBModel, UserBaseModel
from nacsos_data.db.crud.users import \ from nacsos_data.db.crud.users import \
read_users, \ read_users, \
read_user_by_id, \ read_user_by_id, \
read_users_by_ids, \ read_users_by_ids, \
create_or_update_user create_or_update_user
from server.util.security import UserPermissionChecker, UserPermissions, get_current_active_user
from server.data import db_engine from server.data import db_engine
from server.api.errors import DataNotFoundWarning, UserNotFoundError
from server.util.logging import get_logger
from server.util.security import UserPermissionChecker, UserPermissions, get_current_active_user
logger = get_logger('nacsos.api.route.admin.users') logger = get_logger('nacsos.api.route.admin.users')
router = APIRouter() router = APIRouter()
......
...@@ -2,7 +2,7 @@ from typing import Union, Literal, TYPE_CHECKING ...@@ -2,7 +2,7 @@ from typing import Union, Literal, TYPE_CHECKING
from pymitter import EventEmitter from pymitter import EventEmitter
from .hooks import imports from .hooks import example
from . import events from . import events
eventbus = EventEmitter(delimiter='_', wildcard=True) eventbus = EventEmitter(delimiter='_', wildcard=True)
...@@ -16,6 +16,6 @@ else: ...@@ -16,6 +16,6 @@ else:
AnyEventLiteral = Literal[tuple(sc._name for sc in events.BaseEvent.get_subclasses())] # noqa PyProtectedMember AnyEventLiteral = Literal[tuple(sc._name for sc in events.BaseEvent.get_subclasses())] # noqa PyProtectedMember
# Permanent/global listeners # Permanent/global listeners
eventbus.on(events.PipelineTaskStatusChangedEvent._name, imports.update_import_status) # noqa PyProtectedMember eventbus.on(events.ExampleEvent._name, example.test_listener) # noqa PyProtectedMember
__all__ = ['eventbus', 'events', 'AnyEvent', 'AnyEventType'] __all__ = ['eventbus', 'events', 'AnyEvent', 'AnyEventType']
from typing import Literal, ClassVar from typing import ClassVar
from pydantic import BaseModel from pydantic import BaseModel
...@@ -17,38 +17,10 @@ class BaseEvent(BaseModel): ...@@ -17,38 +17,10 @@ class BaseEvent(BaseModel):
return tuple(set(recurse(cls))) return tuple(set(recurse(cls)))
TaskStatus = Literal['PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED'] # FIXME move somewhere else class ExampleEvent(BaseEvent):
_name = 'Example_*'
payload_a: str
class PipelineTaskStatusChangedEvent(BaseEvent): class ExampleSubEvent(ExampleEvent):
""" _name = 'Example_sub'
Emitted when the pipeline service calls the nacsos-core API and tells it about a status change of a task
"""
_name = 'PipelineTaskStatus_*'
task_id: str
status: TaskStatus
project_id: str
user_id: str
import_id: str | None = None
function_name: str # incl module path
class PipelineTaskStatusCompletedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task finished (successfully/completed)
"""
_name = 'PipelineTask_completed'
class PipelineTaskStatusStartedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task started
"""
_name = 'PipelineTask_started'
class PipelineTaskStatusFailedEvent(PipelineTaskStatusChangedEvent):
"""
More specific version of `PipelineTaskStatusChangedEvent` emitted when a task finished (failed)
"""
_name = 'PipelineTask_failed'
import logging
from ..events import ExampleEvent
logger = logging.getLogger('nacsos.event-hooks.test')
def test_listener(event: ExampleEvent):
logger.debug(f'Received event {event}')
import logging
from datetime import datetime
from sqlalchemy import select
from nacsos_data.db.schemas import Import
from ..events import PipelineTaskStatusChangedEvent
from ....data import db_engine
logger = logging.getLogger('nacsos.event-hooks.imports')
async def update_import_status(event: PipelineTaskStatusChangedEvent):
logger.debug(f'Maybe going to update import status {event}')
if event.function_name in [
'nacsos_lib.twitter.import.import_twitter_api',
'nacsos_lib.twitter.import.import_twitter_db',
'nacsos_lib.academic.import.import_wos_file'
]:
async with db_engine.session() as session:
stmt = select(Import).filter_by(pipeline_task_id=event.task_id)
import_details: Import | None = (await session.execute(stmt)).scalars().one_or_none()
if import_details is None and event.import_id is not None:
logger.debug(f'second try with {event.import_id}')
stmt = select(Import).filter_by(import_id=event.import_id)
import_details = (await session.execute(stmt)).scalars().one_or_none()
if import_details is not None:
# Seems like task was started, remember the time
if event.status == 'RUNNING' and import_details.time_started is None:
logger.debug( # type: ignore[unreachable]
f'Updating import start time for {import_details.import_id}'
)
import_details.time_started = datetime.now()
await session.commit()
elif (event.status == 'COMPLETED' or event.status == 'FAILED' or event.status == 'CANCELLED') \
and import_details.time_finished is None:
logger.debug( # type: ignore[unreachable]
f'Updating import finish time for {import_details.import_id}'
)
import_details.time_finished = datetime.now()
await session.commit()
...@@ -7,7 +7,7 @@ from fastapi.security import OAuth2PasswordBearer ...@@ -7,7 +7,7 @@ from fastapi.security import OAuth2PasswordBearer
from nacsos_data.models.users import UserModel from nacsos_data.models.users import UserModel
from nacsos_data.models.projects import ProjectPermissionsModel, ProjectPermission from nacsos_data.models.projects import ProjectPermissionsModel, ProjectPermission
from nacsos_data.db.crud.users import read_user_by_name as crud_get_user_by_name, read_user_by_id from nacsos_data.db.crud.users import read_user_by_name as crud_get_user_by_name, read_user_by_name
from nacsos_data.db.crud.projects import read_project_permissions_for_user as crud_get_project_permissions_for_user from nacsos_data.db.crud.projects import read_project_permissions_for_user as crud_get_project_permissions_for_user
from server.api.errors import MissingInformationError from server.api.errors import MissingInformationError
...@@ -73,7 +73,7 @@ async def get_current_user(token: str = Depends(oauth2_scheme)): ...@@ -73,7 +73,7 @@ async def get_current_user(token: str = Depends(oauth2_scheme)):
if token_user is not None: if token_user is not None:
user = await crud_get_user_by_name(username=token_user, engine=db_engine) user = await crud_get_user_by_name(username=token_user, engine=db_engine)
else: else:
user = await read_user_by_id(user_id=settings.USERS.DEFAULT_USER, engine=db_engine) user = await read_user_by_name(username=settings.USERS.DEFAULT_USER, engine=db_engine)
logger.warning('Authentication using fake user!') logger.warning('Authentication using fake user!')
if user is None: if user is None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment