1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# -*- coding: utf-8 -*-
from __future__ import absolute_import
try:
import Cookie
except ImportError:
import http.cookies as Cookie
import sys
import jwt
from . import ApiController, RESTController, \
allow_empty_body, set_cookies
from .. import logger, mgr
from ..exceptions import DashboardException
from ..services.auth import AuthManager, JwtManager
from ..services.access_control import UserDoesNotExist
# Python 3.8 introduced `samesite` attribute:
# https://docs.python.org/3/library/http.cookies.html#morsel-objects
if sys.version_info < (3, 8):
Cookie.Morsel._reserved["samesite"] = "SameSite" # type: ignore # pylint: disable=W0212
@ApiController('/auth', secure=False)
class Auth(RESTController):
"""
Provide authenticates and returns JWT token.
"""
def create(self, username, password):
user_perms = AuthManager.authenticate(username, password)
if user_perms is not None:
url_prefix = 'https' if mgr.get_localized_module_option('ssl') else 'http'
logger.debug('Login successful')
token = JwtManager.gen_token(username)
# For backward-compatibility: PyJWT versions < 2.0.0 return bytes.
token = token.decode('utf-8') if isinstance(token, bytes) else token
set_cookies(url_prefix, token)
return {
'token': token,
'username': username,
'permissions': user_perms
}
logger.debug('Login failed')
raise DashboardException(msg='Invalid credentials',
code='invalid_credentials',
component='auth')
@RESTController.Collection('POST')
@allow_empty_body
def logout(self):
logger.debug('Logout successful')
token = JwtManager.get_token_from_header()
JwtManager.blacklist_token(token)
redirect_url = '#/login'
if mgr.SSO_DB.protocol == 'saml2':
redirect_url = 'auth/saml2/slo'
return {
'redirect_url': redirect_url
}
def _get_login_url(self):
if mgr.SSO_DB.protocol == 'saml2':
return 'auth/saml2/login'
return '#/login'
@RESTController.Collection('POST')
def check(self, token):
if token:
try:
token = JwtManager.decode_token(token)
if not JwtManager.is_blacklisted(token['jti']):
user = AuthManager.get_user(token['username'])
if user.lastUpdate <= token['iat']:
return {
'username': user.username,
'permissions': user.permissions_dict(),
}
logger.debug("AMT: user info changed after token was"
" issued, iat=%s lastUpdate=%s",
token['iat'], user.lastUpdate)
else:
logger.debug('AMT: Token is black-listed')
except jwt.exceptions.ExpiredSignatureError:
logger.debug("AMT: Token has expired")
except jwt.exceptions.InvalidTokenError:
logger.debug("AMT: Failed to decode token")
except UserDoesNotExist:
logger.debug("AMT: Invalid token: user %s does not exist",
token['username'])
return {
'login_url': self._get_login_url()
}
|