Newer
Older
from sqlalchemy.orm import load_only
from fastapi import APIRouter, Depends, HTTPException, status as http_status, Query
from nacsos_data.db.schemas import \
BotAnnotationMetaData, \
AssignmentScope, \
User, \
from nacsos_data.models.annotations import \
AnnotationSchemeModel, \
AssignmentScopeModel, \
AssignmentModel, \
AssignmentStatus, \
AnnotationSchemeModelFlat, \
FlattenedAnnotationSchemeLabel
from nacsos_data.models.bot_annotations import \
ResolutionMethod, \
AnnotationFilters, \
BotAnnotationModel, \
GroupedBotAnnotation, \
AnnotationCollectionDB, \
BotKind, \
BotAnnotationMetaDataBaseModel
from nacsos_data.models.users import UserModel
from nacsos_data.models.items import AnyItemModel
from nacsos_data.db.crud.items import read_any_item_by_item_id
from nacsos_data.db.crud.projects import read_project_by_id
from nacsos_data.db.crud.annotations import \
read_assignment, \
read_assignments_for_scope, \
read_assignments_for_scope_for_user, \
read_assignment_scopes_for_project, \
read_assignment_scopes_for_project_for_user, \
read_annotations_for_assignment, \
read_next_assignment_for_scope_for_user, \
read_next_open_assignment_for_scope_for_user, \
read_annotation_scheme, \
read_annotation_schemes_for_project, \
upsert_annotation_scheme, \
delete_annotation_scheme, \
upsert_assignment_scope, \
delete_assignment_scope, \
read_item_ids_with_assignment_count_for_project, \
read_assignment_counts_for_scope, \
ItemWithCount, \
AssignmentCounts, \
UserProjectAssignmentScope, \
store_resolved_bot_annotations, \
update_resolved_bot_annotations
from nacsos_data.util.annotations.resolve import \
AnnotationFilterObject, \
get_resolved_item_annotations, \
read_bot_annotations
from nacsos_data.util.annotations.validation import \
merge_scheme_and_annotations, \
annotated_scheme_to_annotations, \
flatten_annotation_scheme
from nacsos_data.util.annotations.assignments.random import random_assignments
from nacsos_data.util.annotations.assignments.random_exclusion import random_assignments_with_exclusion
from server.api.errors import \
SaveFailedError, \
AssignmentScopeNotFoundError, \
NoNextAssignmentWarning, \
ProjectNotFoundError, \
AnnotationSchemeNotFoundError, \
from server.util.security import UserPermissionChecker
from server.data import db_engine
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession # noqa F401
assignment: AssignmentModel
class AnnotationItem(AnnotatedItem):
scope: AssignmentScopeModel
item: AnyItemModel
@router.get('/schemes/definition/{annotation_scheme_id}',
response_model=AnnotationSchemeModelFlat | AnnotationSchemeModel)
async def get_scheme_definition(annotation_scheme_id: str,
flat: bool = Query(default=False),
permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> AnnotationSchemeModelFlat | AnnotationSchemeModel:
This endpoint returns the detailed definition of an annotation scheme.
:param annotation_scheme_id: database id of the annotation scheme.
:param flat: True to get the flattened scheme
scheme = await read_annotation_scheme(annotation_scheme_id=annotation_scheme_id, db_engine=db_engine)
if flat:
return flatten_annotation_scheme(scheme)
return scheme
raise AnnotationSchemeNotFoundError(f'No `AnnotationScheme` found in DB for id {annotation_scheme_id}')
@router.put('/schemes/definition/', response_model=str)
async def put_annotation_scheme(annotation_scheme: AnnotationSchemeModel,
permissions=Depends(UserPermissionChecker('annotations_edit'))) -> str:
key = await upsert_annotation_scheme(annotation_scheme=annotation_scheme, db_engine=db_engine)
@router.delete('/schemes/definition/{scheme_id}')
async def remove_annotation_scheme(annotation_scheme_id: str,
permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None:
await delete_annotation_scheme(annotation_scheme_id=annotation_scheme_id, db_engine=db_engine)
@router.get('/schemes/list/{project_id}', response_model=list[AnnotationSchemeModel])
async def get_scheme_definitions_for_project(project_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> list[AnnotationSchemeModel]:
This endpoint returns the detailed definitions of all annotation schemes associated with a project.
return await read_annotation_schemes_for_project(project_id=project_id, db_engine=db_engine)
async def _construct_annotation_item(assignment: AssignmentModel, project_id: str) -> AnnotationItem:
if assignment.assignment_id is None:
raise MissingInformationError('No `assignment_id` set for `assignment`.')
scope = await read_assignment_scope(assignment_scope_id=assignment.assignment_scope_id, db_engine=db_engine)
if scope is None:
raise AnnotationSchemeNotFoundError(f'No annotation scope found in DB for id '
f'{assignment.assignment_scope_id}')
scheme = await read_annotation_scheme(annotation_scheme_id=assignment.annotation_scheme_id, db_engine=db_engine)
if scheme is None:
raise AnnotationSchemeNotFoundError(f'No annotation scheme found in DB for id '
f'{assignment.annotation_scheme_id}')
annotations = await read_annotations_for_assignment(assignment_id=assignment.assignment_id, db_engine=db_engine)
merged_scheme = merge_scheme_and_annotations(annotation_scheme=scheme, annotations=annotations)
project = await read_project_by_id(project_id=project_id, engine=db_engine)
if project is None:
raise ProjectNotFoundError(f'No project found in DB for id {project_id}')
item = await read_any_item_by_item_id(item_id=assignment.item_id, item_type=project.type, engine=db_engine)
if item is None:
raise MissingInformationError(f'No item found in DB for id {assignment.item_id}')
return AnnotationItem(scheme=merged_scheme, assignment=assignment, scope=scope, item=item)
@router.get('/annotate/next/{assignment_scope_id}/{current_assignment_id}', response_model=AnnotationItem)
async def get_next_assignment_for_scope_for_user(assignment_scope_id: str,
current_assignment_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))):
# FIXME response for "last in list"
assignment = await read_next_assignment_for_scope_for_user(current_assignment_id=current_assignment_id,
assignment_scope_id=assignment_scope_id,
user_id=permissions.user.user_id,
if assignment is None:
raise NoNextAssignmentWarning(f'Could not determine a next assignment for scope {assignment_scope_id}')
return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id)
class NoAssignments(Warning):
pass
@router.get('/annotate/next/{assignment_scope_id}', response_model=AnnotationItem)
async def get_next_open_assignment_for_scope_for_user(assignment_scope_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))):
assignment = await read_next_open_assignment_for_scope_for_user(assignment_scope_id=assignment_scope_id,
user_id=permissions.user.user_id,
# Either there are no assignments, or everything is done.
if assignment is None:
assignments = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id,
user_id=permissions.user.user_id,
db_engine=db_engine, limit=1)
if len(assignments) > 0:
assignment = assignments[0]
else:
raise NoAssignments('This user has no assignments in this scope.')
return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id)
@router.get('/annotate/assignment/{assignment_id}', response_model=AnnotationItem)
async def get_assignment(assignment_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))):
assignment = await read_assignment(assignment_id=assignment_id, db_engine=db_engine)
if (assignment is None) or (assignment.user_id != permissions.user.user_id):
raise HTTPException(status_code=http_status.HTTP_401_UNAUTHORIZED,
detail='You do not have permission to handle this assignment, as it is not yours!')
return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id)
@router.get('/annotate/scopes/{project_id}', response_model=list[UserProjectAssignmentScope])
async def get_assignment_scopes_for_user(project_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> list[UserProjectAssignmentScope]:
scopes = await read_assignment_scopes_for_project_for_user(project_id=project_id,
user_id=permissions.user.user_id,
@router.get('/annotate/scopes/', response_model=list[AssignmentScopeModel])
async def get_assignment_scopes_for_project(permissions=Depends(UserPermissionChecker('annotations_edit'))) \
-> list[AssignmentScopeModel]:
scopes = await read_assignment_scopes_for_project(project_id=permissions.permissions.project_id,
return scopes
@router.get('/annotate/scope/{assignment_scope_id}', response_model=AssignmentScopeModel)
async def get_assignment_scope(assignment_scope_id: str,
permissions=Depends(
UserPermissionChecker(['annotations_read', 'annotations_edit'], fulfill_all=False))
) -> AssignmentScopeModel:
scope = await read_assignment_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine)
if scope is not None:
return scope
raise AssignmentScopeNotFoundError(f'No assignment scope found in the DB for {assignment_scope_id}')
@router.put('/annotate/scope/', response_model=str)
async def put_assignment_scope(assignment_scope: AssignmentScopeModel,
permissions=Depends(UserPermissionChecker('annotations_edit'))) -> str:
key = await upsert_assignment_scope(assignment_scope=assignment_scope, db_engine=db_engine)
return str(key)
@router.delete('/annotate/scope/{assignment_scope_id}')
async def remove_assignment_scope(assignment_scope_id: str,
permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None:
try:
await delete_assignment_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine)
except ValueError as e:
raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST,
detail=str(e))
@router.get('/annotate/scope/counts/{assignment_scope_id}', response_model=AssignmentCounts)
async def get_num_assignments_for_scope(assignment_scope_id: str,
permissions=Depends(
UserPermissionChecker(['annotations_read', 'annotations_edit'],
scope = await read_assignment_counts_for_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine)
@router.get('/annotate/assignments/{assignment_scope_id}', response_model=list[AssignmentModel])
async def get_assignments(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> list[AssignmentModel]:
assignments = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id,
user_id=permissions.user.user_id,
return assignments
class ProgressIndicatorLabel(BaseModel):
repeat: int
value_int: int | None = None
value_bool: bool | None = None
class ProgressIndicator(BaseModel):
assignment_id: str | uuid.UUID
item_id: str | uuid.UUID
order: int
status: AssignmentStatus
labels: dict[str, list[ProgressIndicatorLabel]] | None = None
@router.get('/annotate/assignment/progress/{assignment_scope_id}', response_model=list[ProgressIndicator])
async def get_assignment_indicators_for_scope_for_user(assignment_scope_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> list[ProgressIndicator]:
async with db_engine.session() as session: # type: AsyncSession
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
stmt = text('''
WITH tmp as (SELECT assignment.assignment_id,
assignment.item_id,
assignment."order",
assignment.status,
annotation.key,
jsonb_agg(jsonb_build_object('repeat', annotation.repeat,
'value_bool', annotation.value_bool,
'value_int', annotation.value_int)) as pl
FROM assignment
LEFT OUTER JOIN annotation ON assignment.assignment_id = annotation.assignment_id
WHERE assignment.user_id = :user_id
AND assignment.assignment_scope_id = :scope_id
GROUP BY assignment.assignment_id,
assignment.item_id,
assignment."order",
annotation.key)
SELECT assignment_id,
item_id,
"order",
status,
jsonb_object_agg(key, pl) filter (where key is not null ) as labels
FROM tmp
GROUP BY tmp.assignment_id,
tmp.item_id,
tmp."order",
tmp.status
ORDER BY tmp."order" ASC;
''')
results = (await session.execute(stmt, {
'scope_id': assignment_scope_id,
'user_id': permissions.user.user_id
})).mappings().all()
return [ProgressIndicator.parse_obj(r) for r in results]
@router.get('/annotate/assignments/scope/{assignment_scope_id}', response_model=list[AssignmentModel])
async def get_assignments_for_scope(assignment_scope_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> list[AssignmentModel]:
assignments = await read_assignments_for_scope(assignment_scope_id=assignment_scope_id,
@router.get('/annotate/annotations/{assignment_scope_id}', response_model=list[AssignmentModel])
async def get_annotations(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> list[AssignmentModel]:
assignments = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id,
user_id=permissions.user.user_id,
@router.post('/annotate/save', response_model=AssignmentStatus)
async def save_annotation(annotated_item: AnnotatedItem,
permissions=Depends(UserPermissionChecker('annotations_read'))) -> AssignmentStatus:
# double-check, that the supposed assignment actually exists
if annotated_item.assignment.assignment_id is None:
raise MissingInformationError('Missing `assignment_id` in `annotation_item`!')
assignment_db = await read_assignment(assignment_id=annotated_item.assignment.assignment_id, db_engine=db_engine)
if assignment_db is None:
raise MissingInformationError('No assignment found!')
if permissions.user.user_id == assignment_db.user_id \
and str(assignment_db.assignment_scope_id) == annotated_item.assignment.assignment_scope_id \
and str(assignment_db.item_id) == annotated_item.assignment.item_id \
and str(assignment_db.annotation_scheme_id) == annotated_item.assignment.annotation_scheme_id:
annotations = annotated_scheme_to_annotations(annotated_item.scheme)
status = await upsert_annotations(annotations=annotations,
assignment_id=annotated_item.assignment.assignment_id,
if status is not None:
return status
raise SaveFailedError('Failed to save annotation!')
else:
raise HTTPException(
status_code=http_status.HTTP_403_FORBIDDEN,
detail='The combination of project, assignment, user, task, and item is invalid.',
@router.get('/config/items/', response_model=list[ItemWithCount])
async def get_items_with_count(permissions=Depends(UserPermissionChecker('dataset_read'))) \
-> list[ItemWithCount]:
items = await read_item_ids_with_assignment_count_for_project(project_id=permissions.permissions.project_id,
return items
class MakeAssignmentsRequestModel(BaseModel):
scope_id: str
config: AssignmentScopeConfig
save: bool = False
@router.post('/config/assignments/', response_model=list[AssignmentModel])
async def make_assignments(payload: MakeAssignmentsRequestModel,
permissions=Depends(UserPermissionChecker('annotations_edit'))):
if payload.config.config_type == 'random':
try:
assignments = await random_assignments(assignment_scope_id=payload.scope_id,
annotation_scheme_id=payload.annotation_scheme_id,
project_id=permissions.permissions.project_id,
config=payload.config,
engine=db_engine)
except ValueError as e:
raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST,
detail=str(e))
elif payload.config.config_type == 'random_exclusion':
assignments = await random_assignments_with_exclusion(
assignment_scope_id=payload.scope_id,
annotation_scheme_id=payload.annotation_scheme_id,
project_id=permissions.permissions.project_id,
config=payload.config, # type: ignore[arg-type] # FIXME
engine=db_engine)
except ValueError as e:
raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST,
detail=str(e))
else:
raise HTTPException(status_code=http_status.HTTP_501_NOT_IMPLEMENTED,
detail=f'Method "{payload.config.config_type}" is unknown.')
if payload.save:
await store_assignments(assignments=assignments, db_engine=db_engine)
@router.get('/config/scopes/{scheme_id}', response_model=list[AssignmentScopeModel])
async def get_assignment_scopes_for_scheme(scheme_id: str,
permissions=Depends(UserPermissionChecker('annotations_read'))) \
-> list[AssignmentScopeModel]:
async with db_engine.session() as session: # type: AsyncSession
return [AssignmentScopeModel.parse_obj(scope.__dict__)
for scope in (await session.execute(select(AssignmentScope)
.where(AssignmentScope.annotation_scheme_id == scheme_id))
).scalars().all()]
@router.get('/config/annotators/{scheme_id}', response_model=list[UserModel])
async def get_annotators_for_scheme(scheme_id: str,
permissions=Depends(UserPermissionChecker('annotations_edit'))) \
-> list[UserModel]:
async with db_engine.session() as session: # type: AsyncSession
return [UserModel.parse_obj(user.__dict__)
for user in (
await session.execute(select(User)
.join(Annotation)
.distinct()
.where(Annotation.annotation_scheme_id == scheme_id))).scalars().all()]
class ResolutionProposalResponse(BaseModel):
scheme_flat: list[FlattenedAnnotationSchemeLabel]
class SavedResolutionResponse(BaseModel):
saved: dict[str, list[GroupedBotAnnotation]]
@router.get('/config/resolve/', response_model=ResolutionProposalResponse)
async def get_resolved_annotations(strategy: ResolutionMethod,
scheme_id: str,
scope_id: list[str] | None = Query(default=None),
user_id: list[str] | None = Query(default=None),
key: list[str] | None = Query(default=None),
repeat: list[int] | None = Query(default=None),
ignore_order: bool | None = Query(default=False),
ignore_hierarchy: bool | None = Query(default=False),
permissions=Depends(UserPermissionChecker('annotations_edit'))):
"""
Get all annotations that match the filters (e.g. all annotations made by users in scope with :scope_id).
Annotations are returned in a 3D matrix:
rows (dict entries): items (key: item_id)
columns (list index of dict entry): Label (key in scheme + repeat); index map in matrix.keys
cells: list of annotations by each user for item/Label combination
:param strategy
:param scheme_id:
:param scope_id:
:param user_id:
:param key:
:param repeat:
:param permissions:
filters = AnnotationFilters(
scheme_id=scheme_id,
scope_id=scope_id,
user_id=user_id,
key=key,
repeat=repeat,
)
if ignore_hierarchy is None:
ignore_hierarchy = False
if ignore_order is None:
ignore_order = False
scheme, flat_labels, collection, resolved = \
await get_resolved_item_annotations(strategy=strategy,
filters=AnnotationFilterObject.parse_obj(filters),
ignore_order=ignore_order,
ignore_hierarchy=ignore_hierarchy,
db_engine=db_engine)
return ResolutionProposalResponse(collection=collection, proposal=resolved, scheme_flat=flat_labels)
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
class ResolutionPayload(BaseModel):
name: str
strategy: ResolutionMethod
filters: AnnotationFilters
ignore_order: bool
ignore_hierarchy: bool
collection: AnnotationCollectionDB
bot_annotations: list[BotAnnotationModel]
@router.put('/config/resolve/', response_model=str)
async def save_resolved_annotations(data: ResolutionPayload,
permissions=Depends(UserPermissionChecker('annotations_edit'))):
meta_id = await store_resolved_bot_annotations(
project_id=permissions.permissions.project_id, name=data.name, algorithm=data.strategy,
filters=data.filters, ignore_hierarchy=data.ignore_hierarchy, ignore_repeat=data.ignore_order,
collection=data.collection, bot_annotations=data.bot_annotations, db_engine=db_engine)
return meta_id
@router.put('/config/resolve/update')
async def update_resolved_annotations(bot_annotation_metadata_id: str,
name: str,
bot_annotations: list[BotAnnotationModel],
permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None:
await update_resolved_bot_annotations(bot_annotation_metadata_id=bot_annotation_metadata_id,
name=name, bot_annotations=bot_annotations, db_engine=db_engine)
@router.get('/config/resolved-list/', response_model=list[BotAnnotationMetaDataBaseModel])
async def list_saved_resolved_annotations(permissions=Depends(UserPermissionChecker('annotations_read'))):
async with db_engine.session() as session: # type: AsyncSession
exports = (await session.execute(
select(BotAnnotationMetaData)
.where(BotAnnotationMetaData.project_id == permissions.permissions.project_id,
BotAnnotationMetaData.kind == BotKind.RESOLVE)
.options(load_only(BotAnnotationMetaData.bot_annotation_metadata_id,
BotAnnotationMetaData.annotation_scheme_id,
BotAnnotationMetaData.project_id,
BotAnnotationMetaData.name,
BotAnnotationMetaData.kind,
BotAnnotationMetaData.time_updated,
BotAnnotationMetaData.time_created)))) \
.scalars().all()
return [BotAnnotationMetaDataBaseModel.parse_obj(e.__dict__) for e in exports]
@router.get('/config/resolved/{bot_annotation_meta_id}', response_model=SavedResolutionResponse)
async def get_saved_resolved_annotations(bot_annotation_metadata_id: str,
permissions=Depends(UserPermissionChecker('annotations_edit'))):
bot_annotations = await read_bot_annotations(bot_annotation_metadata_id=bot_annotation_metadata_id,
db_engine=db_engine)
async with db_engine.session() as session: # type: AsyncSession
meta: BotAnnotationMetaData = (await session.execute(
select(BotAnnotationMetaData)
.where(BotAnnotationMetaData.bot_annotation_metadata_id == bot_annotation_metadata_id))) \
.scalars().one()
@router.delete('/config/resolved/{bot_annotation_meta_id}')
async def delete_saved_resolved_annotations(bot_annotation_metadata_id: str,
permissions=Depends(UserPermissionChecker('annotations_edit'))):
async with db_engine.session() as session: # type: AsyncSession
meta: BotAnnotationMetaData = (await session.execute(
select(BotAnnotationMetaData)
.where(BotAnnotationMetaData.bot_annotation_metadata_id == bot_annotation_metadata_id))) \
.scalars().one()
await session.delete(meta)