annotations.py 28.35 KiB
import uuid
from typing import TYPE_CHECKING
from pydantic import BaseModel
from sqlalchemy import select, text
from sqlalchemy.orm import load_only
from fastapi import APIRouter, Depends, HTTPException, status as http_status, Query
from nacsos_data.db.schemas import \
BotAnnotationMetaData, \
AssignmentScope, \
User, \
Annotation
from nacsos_data.models.annotations import \
AnnotationSchemeModel, \
AssignmentScopeModel, \
AssignmentModel, \
AssignmentStatus, \
AssignmentScopeConfig, \
AnnotationSchemeModelFlat, \
FlattenedAnnotationSchemeLabel
from nacsos_data.models.bot_annotations import \
ResolutionMethod, \
AnnotationFilters, \
BotAnnotationModel, \
AnnotationCollection, \
BotMetaResolve, \
GroupedBotAnnotation, \
AnnotationCollectionDB, \
BotKind, \
BotAnnotationMetaDataBaseModel
from nacsos_data.models.users import UserModel
from nacsos_data.models.items import AnyItemModel
from nacsos_data.db.crud.items import read_any_item_by_item_id
from nacsos_data.db.crud.projects import read_project_by_id
from nacsos_data.db.crud.annotations import \
read_assignment, \
read_assignments_for_scope, \
read_assignments_for_scope_for_user, \
read_assignment_scopes_for_project, \
read_assignment_scopes_for_project_for_user, \
read_annotations_for_assignment, \
read_next_assignment_for_scope_for_user, \
read_next_open_assignment_for_scope_for_user, \
read_annotation_scheme, \
read_annotation_schemes_for_project, \
upsert_annotations, \
read_assignment_scope, \
upsert_annotation_scheme, \
delete_annotation_scheme, \
upsert_assignment_scope, \
delete_assignment_scope, \
read_item_ids_with_assignment_count_for_project, \
read_assignment_counts_for_scope, \
ItemWithCount, \
AssignmentCounts, \
UserProjectAssignmentScope, \
store_assignments, \
store_resolved_bot_annotations, \
update_resolved_bot_annotations
from nacsos_data.util.annotations.resolve import \
AnnotationFilterObject, \
get_resolved_item_annotations, \
read_bot_annotations
from nacsos_data.util.annotations.validation import \
merge_scheme_and_annotations, \
annotated_scheme_to_annotations, \
flatten_annotation_scheme
from nacsos_data.util.annotations.assignments.random import random_assignments
from nacsos_data.util.annotations.assignments.random_exclusion import random_assignments_with_exclusion
from server.api.errors import \
SaveFailedError, \
AssignmentScopeNotFoundError, \
NoNextAssignmentWarning, \
ProjectNotFoundError, \
AnnotationSchemeNotFoundError, \
MissingInformationError
from server.util.security import UserPermissionChecker
from server.data import db_engine
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession # noqa F401
# Router on which every endpoint declared in this module is registered.
router = APIRouter()
class AnnotatedItem(BaseModel):
    """An annotation scheme paired with the assignment it belongs to."""
    # Annotation scheme; `save_annotation` turns it into annotations via `annotated_scheme_to_annotations`.
    scheme: AnnotationSchemeModel
    # Assignment the scheme refers to.
    assignment: AssignmentModel
class AnnotationItem(AnnotatedItem):
    """An `AnnotatedItem` extended by the assignment scope and the item itself."""
    # Assignment scope the assignment is part of.
    scope: AssignmentScopeModel
    # The item referenced by the assignment.
    item: AnyItemModel
@router.get('/schemes/definition/{annotation_scheme_id}',
            response_model=AnnotationSchemeModelFlat | AnnotationSchemeModel)
async def get_scheme_definition(annotation_scheme_id: str,
                                flat: bool = Query(default=False),
                                permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> AnnotationSchemeModelFlat | AnnotationSchemeModel:
    """
    Return the definition of a single annotation scheme, optionally flattened.

    :param annotation_scheme_id: database id of the annotation scheme
    :param flat: if True, the scheme is passed through `flatten_annotation_scheme` before it is returned
    :param permissions: result of the `annotations_read` permission check
    :return: the annotation scheme (flattened if `flat` is set)
    :raises AnnotationSchemeNotFoundError: if `read_annotation_scheme` returns None for this id
    """
    scheme = await read_annotation_scheme(annotation_scheme_id=annotation_scheme_id, db_engine=db_engine)
    if scheme is None:
        raise AnnotationSchemeNotFoundError(f'No `AnnotationScheme` found in DB for id {annotation_scheme_id}')
    return flatten_annotation_scheme(scheme) if flat else scheme
@router.put('/schemes/definition/', response_model=str)
async def put_annotation_scheme(annotation_scheme: AnnotationSchemeModel,
                                permissions=Depends(UserPermissionChecker('annotations_edit'))) -> str:
    """
    Upsert an annotation scheme.

    :param annotation_scheme: the scheme to hand to `upsert_annotation_scheme`
    :param permissions: result of the `annotations_edit` permission check
    :return: the key returned by `upsert_annotation_scheme`, converted to a string
    """
    return str(await upsert_annotation_scheme(annotation_scheme=annotation_scheme, db_engine=db_engine))
# NOTE(review): the route placeholder is `{scheme_id}` while the function parameter is named
#               `annotation_scheme_id`; the names do not match, so the id is presumably not taken
#               from the URL path. Aligning them would change the HTTP contract -- confirm with API clients first.
@router.delete('/schemes/definition/{scheme_id}')
async def remove_annotation_scheme(annotation_scheme_id: str,
                                   permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None:
    """
    Delete an annotation scheme.

    :param annotation_scheme_id: database id handed to `delete_annotation_scheme`
    :param permissions: result of the `annotations_edit` permission check
    """
    await delete_annotation_scheme(
        annotation_scheme_id=annotation_scheme_id,
        db_engine=db_engine,
    )
@router.get('/schemes/list/{project_id}', response_model=list[AnnotationSchemeModel])
async def get_scheme_definitions_for_project(project_id: str,
                                             permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> list[AnnotationSchemeModel]:
    """
    List the detailed definitions of the annotation schemes associated with a project.

    :param project_id: database id of the project
    :param permissions: result of the `annotations_read` permission check
    :return: the schemes returned by `read_annotation_schemes_for_project`
    """
    schemes = await read_annotation_schemes_for_project(project_id=project_id, db_engine=db_engine)
    return schemes
async def _construct_annotation_item(assignment: AssignmentModel, project_id: str) -> AnnotationItem:
    """
    Collect scope, scheme (merged with existing annotations) and item for an assignment.

    :param assignment: the assignment to build the response for; must have an `assignment_id`
    :param project_id: id of the project; its `type` is used to load the item
    :return: `AnnotationItem` holding the merged scheme, the assignment, its scope and the item
    :raises MissingInformationError: if the assignment has no `assignment_id` or the item is not found
    :raises AssignmentScopeNotFoundError: if the assignment scope is not found
    :raises AnnotationSchemeNotFoundError: if the annotation scheme is not found
    :raises ProjectNotFoundError: if the project is not found
    """
    if assignment.assignment_id is None:
        raise MissingInformationError('No `assignment_id` set for `assignment`.')

    scope = await read_assignment_scope(assignment_scope_id=assignment.assignment_scope_id, db_engine=db_engine)
    if scope is None:
        # A missing scope is reported with the scope-specific error (same as in `get_assignment_scope`),
        # not with the scheme error.
        raise AssignmentScopeNotFoundError(f'No annotation scope found in DB for id '
                                           f'{assignment.assignment_scope_id}')

    scheme = await read_annotation_scheme(annotation_scheme_id=assignment.annotation_scheme_id, db_engine=db_engine)
    if scheme is None:
        raise AnnotationSchemeNotFoundError(f'No annotation scheme found in DB for id '
                                            f'{assignment.annotation_scheme_id}')

    # Combine the scheme with the annotations already stored for this assignment.
    annotations = await read_annotations_for_assignment(assignment_id=assignment.assignment_id, db_engine=db_engine)
    merged_scheme = merge_scheme_and_annotations(annotation_scheme=scheme, annotations=annotations)

    # The project is only needed for its `type`, which selects how the item is read.
    project = await read_project_by_id(project_id=project_id, engine=db_engine)
    if project is None:
        raise ProjectNotFoundError(f'No project found in DB for id {project_id}')

    item = await read_any_item_by_item_id(item_id=assignment.item_id, item_type=project.type, engine=db_engine)
    if item is None:
        raise MissingInformationError(f'No item found in DB for id {assignment.item_id}')

    return AnnotationItem(scheme=merged_scheme, assignment=assignment, scope=scope, item=item)
@router.get('/annotate/next/{assignment_scope_id}/{current_assignment_id}', response_model=AnnotationItem)
async def get_next_assignment_for_scope_for_user(assignment_scope_id: str,
                                                 current_assignment_id: str,
                                                 permissions=Depends(UserPermissionChecker('annotations_read'))):
    """
    Return the assignment following `current_assignment_id` for the requesting user in a scope.

    :param assignment_scope_id: id of the assignment scope
    :param current_assignment_id: id of the assignment the "next" one is determined relative to
    :param permissions: result of the `annotations_read` permission check (provides user and project)
    :return: the `AnnotationItem` built for the next assignment
    :raises NoNextAssignmentWarning: if `read_next_assignment_for_scope_for_user` returns None
    """
    # FIXME response for "last in list"
    next_assignment = await read_next_assignment_for_scope_for_user(
        current_assignment_id=current_assignment_id,
        assignment_scope_id=assignment_scope_id,
        user_id=permissions.user.user_id,
        db_engine=db_engine)

    if next_assignment is not None:
        return await _construct_annotation_item(assignment=next_assignment,
                                                project_id=permissions.permissions.project_id)

    raise NoNextAssignmentWarning(f'Could not determine a next assignment for scope {assignment_scope_id}')
class NoAssignments(Warning):
    """Raised when a user has no assignments in the requested assignment scope."""
@router.get('/annotate/next/{assignment_scope_id}', response_model=AnnotationItem)
async def get_next_open_assignment_for_scope_for_user(assignment_scope_id: str,
                                                      permissions=Depends(UserPermissionChecker('annotations_read'))):
    """
    Return the next open assignment of the requesting user in a scope.

    If there is no open assignment, the first entry returned by
    `read_assignments_for_scope_for_user` (queried with `limit=1`) is used instead.

    :param assignment_scope_id: id of the assignment scope
    :param permissions: result of the `annotations_read` permission check (provides user and project)
    :return: the `AnnotationItem` built for the selected assignment
    :raises NoAssignments: if the user has no assignment in this scope at all
    """
    user_id = permissions.user.user_id
    assignment = await read_next_open_assignment_for_scope_for_user(assignment_scope_id=assignment_scope_id,
                                                                    user_id=user_id,
                                                                    db_engine=db_engine)

    if assignment is None:
        # Either there are no assignments, or everything is done.
        fallback = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id,
                                                             user_id=user_id,
                                                             db_engine=db_engine, limit=1)
        if len(fallback) == 0:
            raise NoAssignments('This user has no assignments in this scope.')
        assignment = fallback[0]

    return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id)
@router.get('/annotate/assignment/{assignment_id}', response_model=AnnotationItem)
async def get_assignment(assignment_id: str,
                         permissions=Depends(UserPermissionChecker('annotations_read'))):
    """
    Return a specific assignment, provided it belongs to the requesting user.

    :param assignment_id: id of the assignment
    :param permissions: result of the `annotations_read` permission check (provides user and project)
    :return: the `AnnotationItem` built for the assignment
    :raises HTTPException: 401 if the assignment does not exist or is assigned to a different user
    """
    assignment = await read_assignment(assignment_id=assignment_id, db_engine=db_engine)

    # A missing assignment and a foreign assignment are deliberately answered identically.
    if assignment is None or assignment.user_id != permissions.user.user_id:
        raise HTTPException(status_code=http_status.HTTP_401_UNAUTHORIZED,
                            detail='You do not have permission to handle this assignment, as it is not yours!')

    project_id = permissions.permissions.project_id
    return await _construct_annotation_item(assignment=assignment, project_id=project_id)
@router.get('/annotate/scopes/{project_id}', response_model=list[UserProjectAssignmentScope])
async def get_assignment_scopes_for_user(project_id: str,
                                         permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> list[UserProjectAssignmentScope]:
    """
    List the assignment scopes of a project for the requesting user.

    :param project_id: id of the project
    :param permissions: result of the `annotations_read` permission check (provides the user)
    :return: result of `read_assignment_scopes_for_project_for_user`
    """
    return await read_assignment_scopes_for_project_for_user(project_id=project_id,
                                                             user_id=permissions.user.user_id,
                                                             db_engine=db_engine)
@router.get('/annotate/scopes/', response_model=list[AssignmentScopeModel])
async def get_assignment_scopes_for_project(permissions=Depends(UserPermissionChecker('annotations_edit'))) \
        -> list[AssignmentScopeModel]:
    """
    List all assignment scopes of the project the permission check was resolved for.

    :param permissions: result of the `annotations_edit` permission check (provides the project id)
    :return: result of `read_assignment_scopes_for_project`
    """
    project_id = permissions.permissions.project_id
    return await read_assignment_scopes_for_project(project_id=project_id, db_engine=db_engine)
@router.get('/annotate/scope/{assignment_scope_id}', response_model=AssignmentScopeModel)
async def get_assignment_scope(assignment_scope_id: str,
                               permissions=Depends(
                                   UserPermissionChecker(['annotations_read', 'annotations_edit'], fulfill_all=False))
                               ) -> AssignmentScopeModel:
    """
    Return a single assignment scope.

    :param assignment_scope_id: id of the assignment scope
    :param permissions: result of the permission check (`annotations_read` / `annotations_edit`,
                        with `fulfill_all=False`)
    :return: the assignment scope
    :raises AssignmentScopeNotFoundError: if `read_assignment_scope` returns None
    """
    scope = await read_assignment_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine)
    if scope is None:
        raise AssignmentScopeNotFoundError(f'No assignment scope found in the DB for {assignment_scope_id}')
    return scope
@router.put('/annotate/scope/', response_model=str)
async def put_assignment_scope(assignment_scope: AssignmentScopeModel,
                               permissions=Depends(UserPermissionChecker('annotations_edit'))) -> str:
    """
    Upsert an assignment scope.

    :param assignment_scope: the scope to hand to `upsert_assignment_scope`
    :param permissions: result of the `annotations_edit` permission check
    :return: the key returned by `upsert_assignment_scope`, converted to a string
    """
    return str(await upsert_assignment_scope(assignment_scope=assignment_scope, db_engine=db_engine))
@router.delete('/annotate/scope/{assignment_scope_id}')
async def remove_assignment_scope(assignment_scope_id: str,
                                  permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None:
    """
    Delete an assignment scope.

    :param assignment_scope_id: id of the assignment scope to delete
    :param permissions: result of the `annotations_edit` permission check
    :raises HTTPException: 400 carrying the message of a `ValueError` raised by `delete_assignment_scope`
    """
    try:
        await delete_assignment_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine)
    except ValueError as e:
        # Chain explicitly so the original error stays attached as the cause of the HTTP error.
        raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST,
                            detail=str(e)) from e
@router.get('/annotate/scope/counts/{assignment_scope_id}', response_model=AssignmentCounts)
async def get_num_assignments_for_scope(assignment_scope_id: str,
                                        permissions=Depends(
                                            UserPermissionChecker(['annotations_read', 'annotations_edit'],
                                                                  fulfill_all=False))
                                        ) -> AssignmentCounts:
    """
    Return the assignment counts of an assignment scope.

    :param assignment_scope_id: id of the assignment scope
    :param permissions: result of the permission check (`annotations_read` / `annotations_edit`,
                        with `fulfill_all=False`)
    :return: result of `read_assignment_counts_for_scope`
    """
    counts = await read_assignment_counts_for_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine)
    return counts
@router.get('/annotate/assignments/{assignment_scope_id}', response_model=list[AssignmentModel])
async def get_assignments(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> list[AssignmentModel]:
    """
    List the requesting user's assignments in an assignment scope.

    :param assignment_scope_id: id of the assignment scope
    :param permissions: result of the `annotations_read` permission check (provides the user)
    :return: result of `read_assignments_for_scope_for_user`
    """
    return await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id,
                                                     user_id=permissions.user.user_id,
                                                     db_engine=db_engine)
class ProgressIndicatorLabel(BaseModel):
    """One annotation entry of a progress indicator (see the `jsonb_build_object` call in the progress query)."""
    # Filled from `annotation.repeat`.
    repeat: int
    # Filled from `annotation.value_int`; may be absent.
    value_int: int | None = None
    # Filled from `annotation.value_bool`; may be absent.
    value_bool: bool | None = None
class ProgressIndicator(BaseModel):
    """Progress summary of a single assignment, as produced by the progress query in this module."""
    assignment_id: str | uuid.UUID
    item_id: str | uuid.UUID
    # Filled from `assignment."order"`; the progress query sorts by it ascending.
    order: int
    status: AssignmentStatus
    # Annotation key -> annotation entries for that key; None if the assignment has no annotations yet.
    labels: dict[str, list[ProgressIndicatorLabel]] | None = None
@router.get('/annotate/assignment/progress/{assignment_scope_id}', response_model=list[ProgressIndicator])
async def get_assignment_indicators_for_scope_for_user(assignment_scope_id: str,
                                                       permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> list[ProgressIndicator]:
    """
    Return one progress indicator per assignment of the requesting user in a scope.

    The query first collects, per assignment and annotation key, the `repeat`, `value_bool` and
    `value_int` of the matching annotations into a JSON array. It then folds these arrays into one
    JSON object per assignment (keyed by annotation key; rows without a key are skipped) and sorts
    the result by the assignment's `order`.

    :param assignment_scope_id: id of the assignment scope
    :param permissions: result of the `annotations_read` permission check (provides the user)
    :return: list of progress indicators in ascending `order`
    """
    query_params = {
        'scope_id': assignment_scope_id,
        'user_id': permissions.user.user_id
    }

    async with db_engine.session() as session:  # type: AsyncSession
        # NOTE(review): the CTE selects `assignment.status` without grouping by it; presumably this relies on
        #               `assignment_id` being the primary key of `assignment` -- verify against the schema.
        query = text('''
WITH tmp as (SELECT assignment.assignment_id,
assignment.item_id,
assignment."order",
assignment.status,
annotation.key,
jsonb_agg(jsonb_build_object('repeat', annotation.repeat,
'value_bool', annotation.value_bool,
'value_int', annotation.value_int)) as pl
FROM assignment
LEFT OUTER JOIN annotation ON assignment.assignment_id = annotation.assignment_id
WHERE assignment.user_id = :user_id
AND assignment.assignment_scope_id = :scope_id
GROUP BY assignment.assignment_id,
assignment.item_id,
assignment."order",
annotation.key)
SELECT assignment_id,
item_id,
"order",
status,
jsonb_object_agg(key, pl) filter (where key is not null ) as labels
FROM tmp
GROUP BY tmp.assignment_id,
tmp.item_id,
tmp."order",
tmp.status
ORDER BY tmp."order" ASC;
''')
        rows = (await session.execute(query, query_params)).mappings().all()
        return [ProgressIndicator.model_validate(row) for row in rows]
@router.get('/annotate/assignments/scope/{assignment_scope_id}', response_model=list[AssignmentModel])
async def get_assignments_for_scope(assignment_scope_id: str,
                                    permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> list[AssignmentModel]:
    """
    List the assignments of an assignment scope (not restricted to the requesting user).

    :param assignment_scope_id: id of the assignment scope
    :param permissions: result of the `annotations_read` permission check
    :return: result of `read_assignments_for_scope`
    """
    return await read_assignments_for_scope(assignment_scope_id=assignment_scope_id,
                                            db_engine=db_engine)
@router.get('/annotate/annotations/{assignment_scope_id}', response_model=list[AssignmentModel])
async def get_annotations(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> list[AssignmentModel]:
    """
    List the requesting user's assignments in an assignment scope.

    NOTE(review): despite its name and route, this endpoint returns assignments (not annotations)
                  and performs the same lookup as `get_assignments` -- confirm whether that is intended.

    :param assignment_scope_id: id of the assignment scope
    :param permissions: result of the `annotations_read` permission check (provides the user)
    :return: result of `read_assignments_for_scope_for_user`
    """
    return await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id,
                                                     user_id=permissions.user.user_id,
                                                     db_engine=db_engine)
@router.post('/annotate/save', response_model=AssignmentStatus)
async def save_annotation(annotated_item: AnnotatedItem,
                          permissions=Depends(UserPermissionChecker('annotations_read'))) -> AssignmentStatus:
    """
    Store the annotations contained in an annotated scheme for an assignment.

    The submitted assignment is compared with the stored one (user, scope, item and scheme have to
    match) before anything is written.

    :param annotated_item: the scheme holding the annotations plus the assignment they belong to
    :param permissions: result of the `annotations_read` permission check (provides the user)
    :return: the status returned by `upsert_annotations`
    :raises MissingInformationError: if the assignment id is missing or no such assignment is stored
    :raises HTTPException: 403 if the submitted assignment does not match the stored one
    :raises SaveFailedError: if `upsert_annotations` returns None
    """
    submitted = annotated_item.assignment

    # double-check, that the supposed assignment actually exists
    if submitted.assignment_id is None:
        raise MissingInformationError('Missing `assignment_id` in `annotation_item`!')

    assignment_db = await read_assignment(assignment_id=submitted.assignment_id, db_engine=db_engine)
    if assignment_db is None:
        raise MissingInformationError('No assignment found!')

    # Stored ids are stringified before being compared to the submitted values.
    is_consistent = (permissions.user.user_id == assignment_db.user_id
                     and str(assignment_db.assignment_scope_id) == submitted.assignment_scope_id
                     and str(assignment_db.item_id) == submitted.item_id
                     and str(assignment_db.annotation_scheme_id) == submitted.annotation_scheme_id)
    if not is_consistent:
        raise HTTPException(
            status_code=http_status.HTTP_403_FORBIDDEN,
            detail='The combination of project, assignment, user, task, and item is invalid.',
        )

    annotations = annotated_scheme_to_annotations(annotated_item.scheme)
    status = await upsert_annotations(annotations=annotations,
                                      assignment_id=submitted.assignment_id,
                                      db_engine=db_engine)
    if status is None:
        raise SaveFailedError('Failed to save annotation!')
    return status
@router.get('/config/items/', response_model=list[ItemWithCount])
async def get_items_with_count(permissions=Depends(UserPermissionChecker('dataset_read'))) \
        -> list[ItemWithCount]:
    """
    List the item ids of the current project together with their assignment counts.

    :param permissions: result of the `dataset_read` permission check (provides the project id)
    :return: result of `read_item_ids_with_assignment_count_for_project`
    """
    project_id = permissions.permissions.project_id
    return await read_item_ids_with_assignment_count_for_project(project_id=project_id,
                                                                 db_engine=db_engine)
class MakeAssignmentsRequestModel(BaseModel):
    """Request body of the `make_assignments` endpoint."""
    # Annotation scheme the assignments are created for.
    annotation_scheme_id: str
    # Assignment scope the assignments are created in.
    scope_id: str
    # Assignment configuration; its `config_type` selects the assignment method.
    config: AssignmentScopeConfig
    # If True, the generated assignments are stored; otherwise they are only returned.
    save: bool = False
@router.post('/config/assignments/', response_model=list[AssignmentModel])
async def make_assignments(payload: MakeAssignmentsRequestModel,
                           permissions=Depends(UserPermissionChecker('annotations_edit'))):
    """
    Generate assignments for a scope and (optionally) store them.

    The assignment method is selected by `payload.config.config_type`:
    `'random'` uses `random_assignments`, `'random_exclusion'` uses `random_assignments_with_exclusion`.

    :param payload: scheme id, scope id, assignment config and the `save` flag
    :param permissions: result of the `annotations_edit` permission check (provides the project id)
    :return: the generated assignments
    :raises HTTPException: 400 carrying the message of a `ValueError` raised by the assignment method;
                           501 if the `config_type` is none of the known methods
    """
    if payload.config.config_type == 'random':
        try:
            assignments = await random_assignments(assignment_scope_id=payload.scope_id,
                                                   annotation_scheme_id=payload.annotation_scheme_id,
                                                   project_id=permissions.permissions.project_id,
                                                   config=payload.config,
                                                   engine=db_engine)
        except ValueError as e:
            # Chain explicitly so the original error stays attached as the cause of the HTTP error.
            raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST,
                                detail=str(e)) from e
    elif payload.config.config_type == 'random_exclusion':
        try:
            assignments = await random_assignments_with_exclusion(
                assignment_scope_id=payload.scope_id,
                annotation_scheme_id=payload.annotation_scheme_id,
                project_id=permissions.permissions.project_id,
                config=payload.config,  # type: ignore[arg-type] # FIXME
                engine=db_engine)
        except ValueError as e:
            raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST,
                                detail=str(e)) from e
    else:
        raise HTTPException(status_code=http_status.HTTP_501_NOT_IMPLEMENTED,
                            detail=f'Method "{payload.config.config_type}" is unknown.')

    # Without `save`, the assignments are only returned and nothing is stored.
    if payload.save:
        await store_assignments(assignments=assignments, db_engine=db_engine)

    return assignments
@router.get('/config/scopes/{scheme_id}', response_model=list[AssignmentScopeModel])
async def get_assignment_scopes_for_scheme(scheme_id: str,
                                           permissions=Depends(UserPermissionChecker('annotations_read'))) \
        -> list[AssignmentScopeModel]:
    """
    List the assignment scopes that use a given annotation scheme.

    :param scheme_id: id of the annotation scheme
    :param permissions: result of the `annotations_read` permission check
    :return: one `AssignmentScopeModel` per matching `AssignmentScope` row
    """
    query = select(AssignmentScope).where(AssignmentScope.annotation_scheme_id == scheme_id)

    async with db_engine.session() as session:  # type: AsyncSession
        scopes = (await session.execute(query)).scalars().all()
        # Models are built from the ORM objects' attribute dicts while the session is still open.
        return [AssignmentScopeModel.model_validate(scope.__dict__) for scope in scopes]
@router.get('/config/annotators/{scheme_id}', response_model=list[UserModel])
async def get_annotators_for_scheme(scheme_id: str,
                                    permissions=Depends(UserPermissionChecker('annotations_edit'))) \
        -> list[UserModel]:
    """
    List the distinct users that have annotations for a given annotation scheme.

    :param scheme_id: id of the annotation scheme
    :param permissions: result of the `annotations_edit` permission check
    :return: one `UserModel` per distinct `User` joined to a matching `Annotation`
    """
    query = (select(User)
             .join(Annotation)
             .distinct()
             .where(Annotation.annotation_scheme_id == scheme_id))

    async with db_engine.session() as session:  # type: AsyncSession
        annotators = (await session.execute(query)).scalars().all()
        # Models are built from the ORM objects' attribute dicts while the session is still open.
        return [UserModel.model_validate(annotator.__dict__) for annotator in annotators]
class ResolutionProposalResponse(BaseModel):
    """Response of `get_resolved_annotations`, filled from the result of `get_resolved_item_annotations`."""
    # The annotation collection returned by the resolver.
    collection: AnnotationCollection
    # The resolved annotations returned by the resolver.
    # presumably keyed by item id -- TODO confirm against `get_resolved_item_annotations`
    proposal: dict[str, list[GroupedBotAnnotation]]
    # Flat list of scheme labels returned by the resolver.
    scheme_flat: list[FlattenedAnnotationSchemeLabel]
class SavedResolutionResponse(BaseModel):
    """Response of `get_saved_resolved_annotations`."""
    # `name` of the `BotAnnotationMetaData` row.
    name: str
    # `meta` of the `BotAnnotationMetaData` row.
    meta: BotMetaResolve
    # Result of `read_bot_annotations` for the metadata id.
    saved: dict[str, list[GroupedBotAnnotation]]
@router.get('/config/resolve/', response_model=ResolutionProposalResponse)
async def get_resolved_annotations(strategy: ResolutionMethod,
                                   scheme_id: str,
                                   scope_id: list[str] | None = Query(default=None),
                                   user_id: list[str] | None = Query(default=None),
                                   key: list[str] | None = Query(default=None),
                                   repeat: list[int] | None = Query(default=None),
                                   ignore_order: bool | None = Query(default=False),
                                   ignore_hierarchy: bool | None = Query(default=False),
                                   permissions=Depends(UserPermissionChecker('annotations_edit'))):
    """
    Compute a resolution proposal for all annotations that match the filters
    (e.g. all annotations made by users in the scope with :scope_id).

    As described by the original documentation of this endpoint (not verifiable from this module),
    the proposal is laid out as a 3D matrix:
       rows (dict entries): items (key: item_id)
       columns (list index of dict entry): Label (key in scheme + repeat)
       cells: list of annotations by each user for item/Label combination

    :param strategy: resolution method handed to `get_resolved_item_annotations`
    :param scheme_id: annotation scheme filter
    :param scope_id: optional assignment scope filter
    :param user_id: optional user filter
    :param key: optional label key filter
    :param repeat: optional repeat filter
    :param ignore_order: forwarded to the resolver; None is treated as False
    :param ignore_hierarchy: forwarded to the resolver; None is treated as False
    :param permissions: result of the `annotations_edit` permission check
    :return: collection, proposal and flattened scheme labels as returned by the resolver
    """
    filters = AnnotationFilters(scheme_id=scheme_id,
                                scope_id=scope_id,
                                user_id=user_id,
                                key=key,
                                repeat=repeat)

    # The query parameters are nullable; the resolver is always handed plain booleans.
    ignore_hierarchy = False if ignore_hierarchy is None else ignore_hierarchy
    ignore_order = False if ignore_order is None else ignore_order

    # The first element (the scheme) of the resolver result is not used by this endpoint.
    scheme, flat_labels, collection, resolved = await get_resolved_item_annotations(
        strategy=strategy,
        filters=AnnotationFilterObject.model_validate(filters),
        ignore_order=ignore_order,
        ignore_hierarchy=ignore_hierarchy,
        db_engine=db_engine)

    return ResolutionProposalResponse(collection=collection, proposal=resolved, scheme_flat=flat_labels)
class ResolutionPayload(BaseModel):
    """Request body of `save_resolved_annotations`."""
    # Name under which the resolution is stored.
    name: str
    # Resolution method; passed on as `algorithm`.
    strategy: ResolutionMethod
    # Filters that were used to select the annotations.
    filters: AnnotationFilters
    # Passed on as `ignore_repeat` by `save_resolved_annotations`.
    ignore_order: bool
    ignore_hierarchy: bool
    collection: AnnotationCollectionDB
    # The (resolved) annotations to store.
    bot_annotations: list[BotAnnotationModel]
@router.put('/config/resolve/', response_model=str)
async def save_resolved_annotations(data: ResolutionPayload,
                                    permissions=Depends(UserPermissionChecker('annotations_edit'))):
    """
    Store a set of resolved annotations for the current project.

    :param data: name, strategy, filters, flags, collection and the resolved annotations
    :param permissions: result of the `annotations_edit` permission check (provides the project id)
    :return: the id returned by `store_resolved_bot_annotations`
    """
    # NOTE(review): `ignore_order` is forwarded as `ignore_repeat`; presumably both names refer to the
    #               same setting -- verify against `store_resolved_bot_annotations`.
    return await store_resolved_bot_annotations(
        project_id=permissions.permissions.project_id,
        name=data.name,
        algorithm=data.strategy,
        filters=data.filters,
        ignore_hierarchy=data.ignore_hierarchy,
        ignore_repeat=data.ignore_order,
        collection=data.collection,
        bot_annotations=data.bot_annotations,
        db_engine=db_engine)
@router.put('/config/resolve/update')
async def update_resolved_annotations(bot_annotation_metadata_id: str,
                                      name: str,
                                      bot_annotations: list[BotAnnotationModel],
                                      permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None:
    """
    Update a stored set of resolved annotations.

    :param bot_annotation_metadata_id: id of the stored resolution to update
    :param name: name handed to `update_resolved_bot_annotations`
    :param bot_annotations: annotations handed to `update_resolved_bot_annotations`
    :param permissions: result of the `annotations_edit` permission check
    """
    await update_resolved_bot_annotations(
        bot_annotation_metadata_id=bot_annotation_metadata_id,
        name=name,
        bot_annotations=bot_annotations,
        db_engine=db_engine,
    )
@router.get('/config/resolved-list/', response_model=list[BotAnnotationMetaDataBaseModel])
async def list_saved_resolved_annotations(permissions=Depends(UserPermissionChecker('annotations_read'))):
    """
    List the metadata of all stored resolutions (`BotKind.RESOLVE`) of the current project.

    :param permissions: result of the `annotations_read` permission check (provides the project id)
    :return: one `BotAnnotationMetaDataBaseModel` per matching `BotAnnotationMetaData` row
    """
    # Only the columns listed here are loaded.
    loaded_columns = load_only(BotAnnotationMetaData.bot_annotation_metadata_id,
                               BotAnnotationMetaData.annotation_scheme_id,
                               BotAnnotationMetaData.assignment_scope_id,
                               BotAnnotationMetaData.project_id,
                               BotAnnotationMetaData.name,
                               BotAnnotationMetaData.kind,
                               BotAnnotationMetaData.time_updated,
                               BotAnnotationMetaData.time_created)
    query = (select(BotAnnotationMetaData)
             .where(BotAnnotationMetaData.project_id == permissions.permissions.project_id,
                    BotAnnotationMetaData.kind == BotKind.RESOLVE)
             .options(loaded_columns))

    async with db_engine.session() as session:  # type: AsyncSession
        exports = (await session.execute(query)).scalars().all()
        # Models are built from the ORM objects' attribute dicts while the session is still open.
        return [BotAnnotationMetaDataBaseModel.model_validate(export.__dict__) for export in exports]
# NOTE(review): the route placeholder is `{bot_annotation_meta_id}` while the function parameter is named
#               `bot_annotation_metadata_id`; the names do not match, so the id is presumably not taken
#               from the URL path. Aligning them would change the HTTP contract -- confirm with API clients first.
@router.get('/config/resolved/{bot_annotation_meta_id}', response_model=SavedResolutionResponse)
async def get_saved_resolved_annotations(bot_annotation_metadata_id: str,
                                         permissions=Depends(UserPermissionChecker('annotations_edit'))):
    """
    Return a stored resolution: its name, its meta information and the stored annotations.

    :param bot_annotation_metadata_id: id of the `BotAnnotationMetaData` entry
    :param permissions: result of the `annotations_edit` permission check
    :return: `SavedResolutionResponse` built from the metadata row and `read_bot_annotations`
    """
    bot_annotations = await read_bot_annotations(bot_annotation_metadata_id=bot_annotation_metadata_id,
                                                 db_engine=db_engine)

    query = (select(BotAnnotationMetaData)
             .where(BotAnnotationMetaData.bot_annotation_metadata_id == bot_annotation_metadata_id))

    async with db_engine.session() as session:  # type: AsyncSession
        # `.one()` requires exactly one matching row and raises otherwise.
        meta: BotAnnotationMetaData = (await session.execute(query)).scalars().one()
        return SavedResolutionResponse(name=meta.name,
                                       meta=meta.meta,
                                       saved=bot_annotations)
# NOTE(review): the route placeholder is `{bot_annotation_meta_id}` while the function parameter is named
#               `bot_annotation_metadata_id`; the names do not match -- confirm how clients pass the id.
@router.delete('/config/resolved/{bot_annotation_meta_id}')
async def delete_saved_resolved_annotations(bot_annotation_metadata_id: str,
                                            permissions=Depends(UserPermissionChecker('annotations_edit'))):
    """
    Delete the `BotAnnotationMetaData` entry of a stored resolution.

    :param bot_annotation_metadata_id: id of the `BotAnnotationMetaData` entry
    :param permissions: result of the `annotations_edit` permission check
    """
    async with db_engine.session() as session:  # type: AsyncSession
        # `.one()` requires exactly one matching row and raises otherwise.
        meta: BotAnnotationMetaData = (await session.execute(
            select(BotAnnotationMetaData)
            .where(BotAnnotationMetaData.bot_annotation_metadata_id == bot_annotation_metadata_id))) \
            .scalars().one()
        # NOTE(review): no explicit commit is visible here; presumably `db_engine.session()` commits
        #               when the context exits -- verify, otherwise the delete is not persisted.
        await session.delete(meta)