Skip to content
Snippets Groups Projects
Commit 42a1fab6 authored by Tim Repke's avatar Tim Repke
Browse files

improve error handling and add ping route for testing

parent d21e9773
No related branches found
No related tags found
No related merge requests found
from fastapi import APIRouter
from fastapi import APIRouter, status as http_status
from fastapi.responses import PlainTextResponse
from server.util.logging import get_logger
from server.util.security import InsufficientPermissions
logger = get_logger('nacsos.api.route.ping')
router = APIRouter()
......@@ -37,6 +38,11 @@ async def _warn() -> str:
raise ExampleWarning('Warning in your face!')
@router.get('/permission')
async def perm():
raise InsufficientPermissions('You do not have permission to edit this data import.')
@router.post('/{name}', response_class=PlainTextResponse)
async def _ping(name: str) -> str:
return f'Hello {name}'
import time
from typing import Literal
import json
from typing import Literal, Any
from pydantic import BaseModel
from fastapi import HTTPException, status as http_status
......@@ -30,34 +31,45 @@ class ErrorDetail(BaseModel):
level: Literal['WARNING', 'ERROR']
# The message/cause of the Warning/Exception
message: str
# attached args
args: list[Any]
class ErrorHandlingMiddleware(BaseHTTPMiddleware):
@classmethod
def _resolve_args(cls, ew: Exception | Warning):
def _resolve_args(cls, ew: Exception | Warning) -> list[Any]:
if hasattr(ew, 'args') and ew.args is not None and len(ew.args) > 0:
return ' | '.join([str(arg) for arg in ew.args])
return repr(ew)
ret = []
for arg in ew.args:
try:
json.dumps(arg) # test if this is json-serializable
ret.append(arg)
except TypeError:
ret.append(repr(arg))
return ret
return [repr(ew)]
@classmethod
def _resolve_status(cls, ew: Exception | Warning) -> http_status:
if hasattr(ew, 'status'):
return ew.status
return http_status.HTTP_400_BAD_REQUEST
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
try:
response = await call_next(request)
return response
except Warning as w:
logger.exception(w)
return await http_exception_handler(request,
exc=HTTPException(
status_code=http_status.HTTP_400_BAD_REQUEST,
detail=ErrorDetail(level='WARNING', type=w.__class__.__name__,
message=self._resolve_args(w)).dict()))
except Exception as ex:
logger.exception(ex)
return await http_exception_handler(request,
exc=HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST,
detail=ErrorDetail(level='ERROR',
type=ex.__class__.__name__,
message=self._resolve_args(
ex)).dict()))
except (Exception, Warning) as ew:
logger.exception(ew)
return await http_exception_handler(
request,
exc=HTTPException(
status_code=self._resolve_status(ew),
detail=ErrorDetail(level='WARNING' if isinstance(ew, Warning) else 'ERROR',
type=ew.__class__.__name__,
message=str(ew),
args=self._resolve_args(ew)).dict()
))
class TimingMiddleware(BaseHTTPMiddleware):
......
......@@ -3,7 +3,7 @@ from datetime import timedelta, datetime
from pydantic import BaseModel
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status, Header
from fastapi import Depends, HTTPException, status as http_status, Header
from fastapi.security import OAuth2PasswordBearer
from nacsos_data.models.users import UserModel
......@@ -19,6 +19,10 @@ from server.util.logging import get_logger
logger = get_logger('nacsos.util.security')
class InsufficientPermissions(Exception):
status = http_status.HTTP_403_FORBIDDEN
class UserPermissions(BaseModel):
user: UserModel
permissions: ProjectPermissionsModel
......@@ -67,7 +71,7 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
status_code=http_status.HTTP_401_UNAUTHORIZED,
detail='Could not validate credentials',
headers={'WWW-Authenticate': 'Bearer'},
)
......@@ -96,14 +100,14 @@ async def get_current_user(token: str = Depends(oauth2_scheme)):
async def get_current_active_user(current_user: UserModel = Depends(get_current_user)) -> UserModel:
if not current_user.is_active:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Inactive user')
raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail='Inactive user')
return current_user
def get_current_active_superuser(current_user: UserModel = Depends(get_current_active_user)) -> UserModel:
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="The user doesn't have enough privileges"
status_code=http_status.HTTP_400_BAD_REQUEST, detail="The user doesn't have enough privileges"
)
return current_user
......@@ -156,22 +160,20 @@ class UserPermissionChecker:
# check that each required permission is fulfilled
for permission in self.permissions:
if self.fulfill_all and not project_permissions[permission]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f'User does not have permission "{permission}" for project "{x_project_id}".',
raise InsufficientPermissions(
f'User does not have permission "{permission}" for project "{x_project_id}".'
)
any_permission_fulfilled = any_permission_fulfilled or project_permissions[permission]
if not any_permission_fulfilled and not self.fulfill_all:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f'User does not have any of the required permissions ({self.permissions}) '
f'for project "{x_project_id}".',
raise InsufficientPermissions(
f'User does not have any of the required permissions ({self.permissions}) '
f'for project "{x_project_id}".'
)
return user_permissions
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
status_code=http_status.HTTP_403_FORBIDDEN,
detail=f'User does not have permission to access project "{x_project_id}".',
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment