import uuid from typing import TYPE_CHECKING from pydantic import BaseModel from sqlalchemy import select, func as F, distinct, text from sqlalchemy.orm import load_only from fastapi import APIRouter, Depends, HTTPException, status as http_status, Query from nacsos_data.db.schemas import ( BotAnnotationMetaData, AssignmentScope, User, Annotation, BotAnnotation, Assignment ) from nacsos_data.models.annotations import ( AnnotationSchemeModel, AssignmentScopeModel, AssignmentModel, AssignmentStatus, AssignmentScopeConfig, AnnotationSchemeModelFlat ) from nacsos_data.models.bot_annotations import ( BotKind, BotAnnotationMetaDataBaseModel, BotAnnotationResolution, ResolutionMatrix, BotMetaResolveBase, ResolutionProposal ) from nacsos_data.models.users import UserModel from nacsos_data.models.items import AnyItemModel from nacsos_data.db.crud.items import read_any_item_by_item_id from nacsos_data.db.crud.projects import read_project_by_id from nacsos_data.db.crud.annotations import ( read_assignment, read_assignments_for_scope, read_assignments_for_scope_for_user, read_assignment_scopes_for_project, read_assignment_scopes_for_project_for_user, read_annotations_for_assignment, read_next_assignment_for_scope_for_user, read_next_open_assignment_for_scope_for_user, read_annotation_schemes_for_project, upsert_annotations, read_assignment_scope, upsert_annotation_scheme, delete_annotation_scheme, upsert_assignment_scope, delete_assignment_scope, read_item_ids_with_assignment_count_for_project, read_assignment_counts_for_scope, ItemWithCount, AssignmentCounts, UserProjectAssignmentScope, store_assignments, store_resolved_bot_annotations, update_resolved_bot_annotations, read_assignment_overview_for_scope, AssignmentScopeEntry, read_resolved_bot_annotations, read_resolved_bot_annotation_meta, read_resolved_bot_annotations_for_meta ) from nacsos_data.util.annotations.resolve import ( get_resolved_item_annotations, read_annotation_scheme ) from nacsos_data.util.annotations.validation import ( merge_scheme_and_annotations, annotated_scheme_to_annotations, flatten_annotation_scheme ) from nacsos_data.util.annotations.assignments.random import random_assignments from nacsos_data.util.annotations.assignments.random_exclusion import random_assignments_with_exclusion from nacsos_data.util.annotations.assignments.random_nql import random_assignments_with_nql from server.api.errors import ( SaveFailedError, AssignmentScopeNotFoundError, NoNextAssignmentWarning, ProjectNotFoundError, AnnotationSchemeNotFoundError, MissingInformationError, RemainingDependencyWarning ) from server.util.security import UserPermissionChecker from server.data import db_engine if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession # noqa F401 router = APIRouter() class AnnotatedItem(BaseModel): scheme: AnnotationSchemeModel assignment: AssignmentModel class AnnotationItem(AnnotatedItem): scope: AssignmentScopeModel item: AnyItemModel @router.get('/schemes/definition/{annotation_scheme_id}', response_model=AnnotationSchemeModelFlat | AnnotationSchemeModel) async def get_scheme_definition(annotation_scheme_id: str, flat: bool = Query(default=False), permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> AnnotationSchemeModelFlat | AnnotationSchemeModel: """ This endpoint returns the detailed definition of an annotation scheme. :param annotation_scheme_id: database id of the annotation scheme. :param flat: True to get the flattened scheme :param permissions: :return: a single annotation scheme """ scheme: AnnotationSchemeModel | None = await read_annotation_scheme(annotation_scheme_id=annotation_scheme_id, db_engine=db_engine) if scheme is not None: if flat: return flatten_annotation_scheme(scheme) return scheme raise AnnotationSchemeNotFoundError(f'No `AnnotationScheme` found in DB for id {annotation_scheme_id}') @router.put('/schemes/definition/', response_model=str) async def put_annotation_scheme(annotation_scheme: AnnotationSchemeModel, permissions=Depends(UserPermissionChecker('annotations_edit'))) -> str: key = await upsert_annotation_scheme(annotation_scheme=annotation_scheme, db_engine=db_engine) return str(key) @router.delete('/schemes/definition/{scheme_id}') async def remove_annotation_scheme(annotation_scheme_id: str, permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None: await delete_annotation_scheme(annotation_scheme_id=annotation_scheme_id, db_engine=db_engine, use_commit=True) @router.get('/schemes/list/{project_id}', response_model=list[AnnotationSchemeModel]) async def get_scheme_definitions_for_project(project_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> list[AnnotationSchemeModel]: """ This endpoint returns the detailed definitions of all annotation schemes associated with a project. :param project_id: database id of the project :param permissions: :return: list of annotation schemes """ return await read_annotation_schemes_for_project(project_id=project_id, db_engine=db_engine) async def _construct_annotation_item(assignment: AssignmentModel, project_id: str) -> AnnotationItem: if assignment.assignment_id is None: raise MissingInformationError('No `assignment_id` set for `assignment`.') scope = await read_assignment_scope(assignment_scope_id=assignment.assignment_scope_id, db_engine=db_engine) if scope is None: raise AnnotationSchemeNotFoundError(f'No annotation scope found in DB for id ' f'{assignment.assignment_scope_id}') scheme = await read_annotation_scheme(annotation_scheme_id=assignment.annotation_scheme_id, db_engine=db_engine) if scheme is None: raise AnnotationSchemeNotFoundError(f'No annotation scheme found in DB for id ' f'{assignment.annotation_scheme_id}') annotations = await read_annotations_for_assignment(assignment_id=assignment.assignment_id, db_engine=db_engine) merged_scheme = merge_scheme_and_annotations(annotation_scheme=scheme, annotations=annotations) project = await read_project_by_id(project_id=project_id, engine=db_engine) if project is None: raise ProjectNotFoundError(f'No project found in DB for id {project_id}') item = await read_any_item_by_item_id(item_id=assignment.item_id, item_type=project.type, engine=db_engine) if item is None: raise MissingInformationError(f'No item found in DB for id {assignment.item_id}') return AnnotationItem(scheme=merged_scheme, assignment=assignment, scope=scope, item=item) @router.get('/annotate/next/{assignment_scope_id}/{current_assignment_id}', response_model=AnnotationItem) async def get_next_assignment_for_scope_for_user(assignment_scope_id: str, current_assignment_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))): # FIXME response for "last in list" assignment = await read_next_assignment_for_scope_for_user(current_assignment_id=current_assignment_id, assignment_scope_id=assignment_scope_id, user_id=permissions.user.user_id, db_engine=db_engine) if assignment is None: raise NoNextAssignmentWarning(f'Could not determine a next assignment for scope {assignment_scope_id}') return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id) class NoAssignments(Warning): pass @router.get('/annotate/next/{assignment_scope_id}', response_model=AnnotationItem) async def get_next_open_assignment_for_scope_for_user(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))): assignment = await read_next_open_assignment_for_scope_for_user(assignment_scope_id=assignment_scope_id, user_id=permissions.user.user_id, db_engine=db_engine) # Either there are no assignments, or everything is done. if assignment is None: assignments = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id, user_id=permissions.user.user_id, db_engine=db_engine, limit=1) if len(assignments) > 0: assignment = assignments[0] else: raise NoAssignments('This user has no assignments in this scope.') return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id) @router.get('/annotate/assignment/{assignment_id}', response_model=AnnotationItem) async def get_assignment(assignment_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))): assignment = await read_assignment(assignment_id=assignment_id, db_engine=db_engine) if (assignment is None) or (assignment.user_id != permissions.user.user_id): raise HTTPException(status_code=http_status.HTTP_401_UNAUTHORIZED, detail='You do not have permission to handle this assignment, as it is not yours!') return await _construct_annotation_item(assignment=assignment, project_id=permissions.permissions.project_id) @router.get('/annotate/scopes/{project_id}', response_model=list[UserProjectAssignmentScope]) async def get_assignment_scopes_for_user(project_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> list[UserProjectAssignmentScope]: scopes = await read_assignment_scopes_for_project_for_user(project_id=project_id, user_id=permissions.user.user_id, db_engine=db_engine) return scopes @router.get('/annotate/scopes/', response_model=list[AssignmentScopeModel]) async def get_assignment_scopes_for_project(permissions=Depends(UserPermissionChecker('annotations_edit'))) \ -> list[AssignmentScopeModel]: scopes = await read_assignment_scopes_for_project(project_id=permissions.permissions.project_id, db_engine=db_engine) return scopes @router.get('/annotate/scope/{assignment_scope_id}', response_model=AssignmentScopeModel) async def get_assignment_scope(assignment_scope_id: str, permissions=Depends( UserPermissionChecker(['annotations_read', 'annotations_edit'], fulfill_all=False)) ) -> AssignmentScopeModel: scope = await read_assignment_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine) if scope is not None: return scope raise AssignmentScopeNotFoundError(f'No assignment scope found in the DB for {assignment_scope_id}') @router.put('/annotate/scope/', response_model=str) async def put_assignment_scope(assignment_scope: AssignmentScopeModel, permissions=Depends(UserPermissionChecker('annotations_edit'))) -> str: key = await upsert_assignment_scope(assignment_scope=assignment_scope, db_engine=db_engine) return str(key) @router.delete('/annotate/scope/{assignment_scope_id}') async def remove_assignment_scope(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None: try: await delete_assignment_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine, use_commit=True) except ValueError as e: raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(e)) @router.get('/annotate/scope/counts/{assignment_scope_id}', response_model=AssignmentCounts) async def get_num_assignments_for_scope(assignment_scope_id: str, permissions=Depends( UserPermissionChecker(['annotations_read', 'annotations_edit'], fulfill_all=False)) ) -> AssignmentCounts: scope = await read_assignment_counts_for_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine) return scope @router.get('/annotate/assignments/{assignment_scope_id}', response_model=list[AssignmentModel]) async def get_assignments(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> list[AssignmentModel]: assignments = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id, user_id=permissions.user.user_id, db_engine=db_engine) return assignments @router.get('/annotate/assignment/progress/{assignment_scope_id}', response_model=list[AssignmentScopeEntry]) async def get_assignment_indicators_for_scope(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> list[AssignmentScopeEntry]: return await read_assignment_overview_for_scope(assignment_scope_id=assignment_scope_id, connection=db_engine) @router.get('/annotate/assignments/scope/{assignment_scope_id}', response_model=list[AssignmentModel]) async def get_assignments_for_scope(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> list[AssignmentModel]: assignments = await read_assignments_for_scope(assignment_scope_id=assignment_scope_id, db_engine=db_engine) return assignments @router.get('/annotate/annotations/{assignment_scope_id}', response_model=list[AssignmentModel]) async def get_annotations(assignment_scope_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> list[AssignmentModel]: assignments = await read_assignments_for_scope_for_user(assignment_scope_id=assignment_scope_id, user_id=permissions.user.user_id, db_engine=db_engine) return assignments @router.post('/annotate/save', response_model=AssignmentStatus) async def save_annotation(annotated_item: AnnotatedItem, permissions=Depends(UserPermissionChecker('annotations_read'))) -> AssignmentStatus: # double-check, that the supposed assignment actually exists if annotated_item.assignment.assignment_id is None: raise MissingInformationError('Missing `assignment_id` in `annotation_item`!') assignment_db = await read_assignment(assignment_id=annotated_item.assignment.assignment_id, db_engine=db_engine) if assignment_db is None: raise MissingInformationError('No assignment found!') if permissions.user.user_id == assignment_db.user_id \ and str(assignment_db.assignment_scope_id) == annotated_item.assignment.assignment_scope_id \ and str(assignment_db.item_id) == annotated_item.assignment.item_id \ and str(assignment_db.annotation_scheme_id) == annotated_item.assignment.annotation_scheme_id: annotations = annotated_scheme_to_annotations(annotated_item.scheme) status = await upsert_annotations(annotations=annotations, assignment_id=annotated_item.assignment.assignment_id, db_engine=db_engine) if status is not None: return status raise SaveFailedError('Failed to save annotation!') else: raise HTTPException( status_code=http_status.HTTP_403_FORBIDDEN, detail='The combination of project, assignment, user, task, and item is invalid.', ) @router.get('/config/items/', response_model=list[ItemWithCount]) async def get_items_with_count(permissions=Depends(UserPermissionChecker('dataset_read'))) \ -> list[ItemWithCount]: items = await read_item_ids_with_assignment_count_for_project(project_id=permissions.permissions.project_id, db_engine=db_engine) return items class MakeAssignmentsRequestModel(BaseModel): annotation_scheme_id: str scope_id: str config: AssignmentScopeConfig save: bool = False @router.post('/config/assignments/', response_model=list[AssignmentModel]) async def make_assignments(payload: MakeAssignmentsRequestModel, permissions=Depends(UserPermissionChecker('annotations_edit'))): if payload.config.config_type == 'random': try: assignments = await random_assignments(assignment_scope_id=payload.scope_id, annotation_scheme_id=payload.annotation_scheme_id, project_id=permissions.permissions.project_id, config=payload.config, engine=db_engine) except ValueError as e: raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(e)) elif payload.config.config_type == 'random_exclusion': try: assignments = await random_assignments_with_exclusion( assignment_scope_id=payload.scope_id, annotation_scheme_id=payload.annotation_scheme_id, project_id=permissions.permissions.project_id, config=payload.config, # type: ignore[arg-type] # FIXME engine=db_engine) except ValueError as e: raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(e)) elif payload.config.config_type == 'random_nql': try: assignments = await random_assignments_with_nql( assignment_scope_id=payload.scope_id, annotation_scheme_id=payload.annotation_scheme_id, project_id=permissions.permissions.project_id, config=payload.config, # type: ignore[arg-type] # FIXME engine=db_engine ) except ValueError as e: raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(e)) else: raise HTTPException(status_code=http_status.HTTP_501_NOT_IMPLEMENTED, detail=f'Method "{payload.config.config_type}" is unknown.') if payload.save: await store_assignments(assignments=assignments, db_engine=db_engine, use_commit=True) return assignments @router.post('/config/scopes/clear/{scheme_id}') async def clear_empty_assignments(scope_id: str, permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None: """ Drop all assignments in a scope that are still incomplete... :param scope_id: :param permissions: :return: """ async with db_engine.session() as session: # type: AsyncSession stmt = text(''' DELETE FROM assignment WHERE assignment_id IN ( WITH counts AS ( SELECT ass.assignment_id, count(ann.assignment_id) as cnt FROM assignment ass LEFT OUTER JOIN annotation ann ON ass.assignment_id = ann.assignment_id WHERE ass.assignment_scope_id = :scope_id GROUP BY ass.assignment_id ) SELECT assignment_id FROM counts WHERE cnt = 0 );''') await session.execute(stmt, {'scope_id': scope_id}) await session.commit() class AssignmentEditInfo(BaseModel): scope_id: str scheme_id: str item_id: str user_id: str order: int @router.put('/config/assignments/edit/', response_model=AssignmentModel) async def edit_assignment(info: AssignmentEditInfo, permissions=Depends(UserPermissionChecker('annotations_edit'))) -> AssignmentModel: async with db_engine.session() as session: # type: AsyncSession # Check, if we already have an assignment for this... assignment = (await session.execute( select(Assignment) .where( Assignment.item_id == info.item_id, Assignment.user_id == info.user_id, Assignment.assignment_scope_id == info.scope_id ))).scalars().one_or_none() n_annotations: int = (await session.execute( # type: ignore[assignment] select(F.count(Annotation.annotation_id).label('n_annotations')) .join(Assignment) .where(Assignment.item_id == info.item_id, Assignment.user_id == info.user_id, Assignment.assignment_scope_id == info.scope_id))).scalar() # yes we do, drop this assignment! if assignment is not None: model = AssignmentModel.model_validate(assignment.__dict__) if n_annotations == 0: await session.delete(assignment) return model raise RemainingDependencyWarning('Assignment has annotations, won\'t delete!') # seems to be a new one, create it! assignment = Assignment( assignment_id=uuid.uuid4(), item_id=info.item_id, user_id=info.user_id, assignment_scope_id=info.scope_id, annotation_scheme_id=info.scheme_id, order=info.order, status=AssignmentStatus.OPEN ) session.add(assignment) model = AssignmentModel.model_validate(assignment.__dict__) await session.commit() return model @router.get('/config/scopes/{scheme_id}', response_model=list[AssignmentScopeModel]) async def get_assignment_scopes_for_scheme(scheme_id: str, permissions=Depends(UserPermissionChecker('annotations_read'))) \ -> list[AssignmentScopeModel]: async with db_engine.session() as session: # type: AsyncSession scopes = await session.execute(select(AssignmentScope) .where(AssignmentScope.annotation_scheme_id == scheme_id)) return [AssignmentScopeModel.model_validate(scope) for scope in scopes.mappings().all()] @router.get('/config/annotators/{scheme_id}', response_model=list[UserModel]) async def get_annotators_for_scheme(scheme_id: str, permissions=Depends(UserPermissionChecker('annotations_edit'))) \ -> list[UserModel]: async with db_engine.session() as session: # type: AsyncSession return [UserModel.model_validate(user.__dict__) for user in ( await session.execute(select(User) .join(Annotation) .distinct() .where(Annotation.annotation_scheme_id == scheme_id))).scalars().all()] @router.post('/config/resolve', response_model=ResolutionProposal) async def get_resolved_annotations(settings: BotMetaResolveBase, assignment_scope_id: str | None = None, bot_annotation_metadat_id: str | None = None, include_empty: bool = False, include_new: bool = False, update_existing: bool = False, permissions=Depends(UserPermissionChecker('annotations_edit'))) \ -> ResolutionProposal: """ Get all annotations that match the filters (e.g. all annotations made by users in scope with :scope_id). :param include_new: :param update_existing: :param assignment_scope_id: :param bot_annotation_metadat_id: :param include_empty: :param settings :param permissions: :return: """ if include_empty is None: include_empty = True # type: ignore[unreachable] if include_new is None: include_new = False # type: ignore[unreachable] if update_existing is None: update_existing = False # type: ignore[unreachable] if bot_annotation_metadat_id is not None: return await read_resolved_bot_annotations(db_engine=db_engine, existing_resolution=bot_annotation_metadat_id, include_new=include_new, include_empty=include_empty, update_existing=update_existing) if assignment_scope_id is None: raise ValueError('Missing assignment scope') return await get_resolved_item_annotations(strategy=settings.algorithm, assignment_scope_id=assignment_scope_id, ignore_repeat=settings.ignore_repeat, ignore_hierarchy=settings.ignore_hierarchy, include_new=include_new, include_empty=include_empty, update_existing=update_existing, db_engine=db_engine) class SavedResolution(BaseModel): meta: BotAnnotationResolution proposal: ResolutionProposal @router.get('/config/resolved/{bot_annotation_metadata_id}', response_model=SavedResolution) async def get_saved_resolved_annotations(bot_annotation_metadata_id: str, permissions=Depends(UserPermissionChecker('annotations_edit'))) \ -> SavedResolution: async with db_engine.session() as session: # type: AsyncSession bot_meta = await read_resolved_bot_annotation_meta(bot_annotation_metadata_id=bot_annotation_metadata_id, session=session) proposal = await read_resolved_bot_annotations_for_meta(session=session, bot_meta=bot_meta, include_new=False, include_empty=False, update_existing=False) return SavedResolution(meta=bot_meta, proposal=proposal) @router.put('/config/resolve/', response_model=str) async def save_resolved_annotations(settings: BotMetaResolveBase, matrix: ResolutionMatrix, name: str, assignment_scope_id: str, annotation_scheme_id: str, permissions=Depends(UserPermissionChecker('annotations_edit'))): meta_id = await store_resolved_bot_annotations(db_engine=db_engine, use_commit=True, project_id=permissions.permissions.project_id, assignment_scope_id=assignment_scope_id, annotation_scheme_id=annotation_scheme_id, name=name, algorithm=settings.algorithm, ignore_hierarchy=settings.ignore_hierarchy, ignore_repeat=settings.ignore_repeat, matrix=matrix) return meta_id @router.put('/config/resolve/update') async def update_resolved_annotations(bot_annotation_metadata_id: str, name: str, matrix: ResolutionMatrix, permissions=Depends(UserPermissionChecker('annotations_edit'))) -> None: # TODO: allow update of filters and settings? await update_resolved_bot_annotations(bot_annotation_metadata_id=bot_annotation_metadata_id, name=name, matrix=matrix, db_engine=db_engine, use_commit=True) @router.get('/config/resolved-list/', response_model=list[BotAnnotationMetaDataBaseModel]) async def list_saved_resolved_annotations(annotation_scheme_id: str | None = None, permissions=Depends(UserPermissionChecker('annotations_read'))): async with db_engine.session() as session: # type: AsyncSession stmt = ( select(BotAnnotationMetaData) .where(BotAnnotationMetaData.project_id == permissions.permissions.project_id, BotAnnotationMetaData.kind == BotKind.RESOLVE) .order_by(BotAnnotationMetaData.time_created) .options(load_only(BotAnnotationMetaData.bot_annotation_metadata_id, BotAnnotationMetaData.annotation_scheme_id, BotAnnotationMetaData.assignment_scope_id, BotAnnotationMetaData.project_id, BotAnnotationMetaData.name, BotAnnotationMetaData.kind, BotAnnotationMetaData.time_updated, BotAnnotationMetaData.time_created)) ) if annotation_scheme_id is not None: stmt = stmt.where(BotAnnotationMetaData.annotation_scheme_id == annotation_scheme_id) exports = (await session.execute(stmt)).scalars().all() return [BotAnnotationMetaDataBaseModel.model_validate(e.__dict__) for e in exports] @router.delete('/config/resolved/{bot_annotation_meta_id}') async def delete_saved_resolved_annotations(bot_annotation_metadata_id: str, permissions=Depends(UserPermissionChecker('annotations_edit'))): async with db_engine.session() as session: # type: AsyncSession meta: BotAnnotationMetaData | None = (await session.execute( select(BotAnnotationMetaData) .where(BotAnnotationMetaData.bot_annotation_metadata_id == bot_annotation_metadata_id))) \ .scalars().one_or_none() if meta is not None: await session.delete(meta) await session.commit() # TODO: do we need to commit? # TODO: ensure bot_annotations are deleted via cascade class BotMetaInfo(BotAnnotationMetaDataBaseModel): num_annotations: int num_annotated_items: int @router.get('/bot/annotations') async def get_bot_annotations(include_resolve: bool = False, permissions=Depends(UserPermissionChecker('annotations_read'))) -> list[BotMetaInfo]: async with db_engine.session() as session: # type: AsyncSession stmt = (select(BotAnnotationMetaData, F.count(BotAnnotation.bot_annotation_id).label('num_annotations'), F.count(distinct(BotAnnotation.item_id)).label('num_annotated_items')) .join(BotAnnotation, BotAnnotation.bot_annotation_metadata_id == BotAnnotationMetaData.bot_annotation_metadata_id) # TODO: filter for != RESOLVE .where(BotAnnotationMetaData.project_id == permissions.permissions.project_id) .group_by(BotAnnotationMetaData.bot_annotation_metadata_id)) rslt = (await session.execute(stmt)).mappings().all() if rslt: return [BotMetaInfo.model_validate({ **r['BotAnnotationMetaData'].__dict__, 'num_annotations': r['num_annotations'], 'num_annotated_items': r['num_annotated_items'] }) for r in rslt] return []