2from flask_jwt_extended
import create_access_token, get_jwt_identity, get_jwt
3from flask
import request
4from SQLAdminConnections
import SQL_AdminConnector
as SQLC
5from SQLAdminConnections
import SQL_AdminQuerys
as SQLQ
13 additional_claims = {
"user_id": user_id}
14 access_token = create_access_token(identity=user_id, additional_claims=additional_claims)
15 jti = get_jwt()[
"jti"]
22 auth_header = request.headers.get(
'Authorization',
None)
24 token = auth_header.split()[1]
27 return {
"AUTH":
False,
"Reason":
"No token provided"}
28 except Exception
as e:
29 print(f
"An error occurred: {e}")
30 return {
"AUTH":
False,
"Reason":
"Unable to verify token"}
35 connection = SQLC.SQLConAdmin()
37 connection.execute_query(SQLQ.SQLQueries.use_users_database())
38 result = connection.execute_query(SQLQ.SQLQueries.check_token_validity(token))
39 if result
and result[0][0]:
42 return {
"AUTH":
False,
"Reason":
"Token invalid or revoked"}
43 except Exception
as e:
44 print(f
"An error occurred while checking token validity: {e}")
45 return {
"AUTH":
False,
"Reason":
"Database error during token check"}
53 user_identity = get_jwt_identity()
54 user_id = user_identity[
"user_id"]
56 except Exception
as e:
57 print(f
"An error occurred while logging out: {e}")
64 connection = SQLC.SQLConAdmin()
66 connection.execute_query(SQLQ.SQLQueries.use_users_database())
67 connection.execute_query(SQLQ.SQLQueries.revoke_tokens_by_user_id(user_id))
68 connection.cnx.commit()
69 except Exception
as e:
70 print(f
"An error occurred while revoking tokens: {e}")
80 connection = SQLC.SQLConAdmin()
82 connection.execute_query(SQLQ.SQLQueries.use_users_database())
83 connection.execute_query(SQLQ.SQLQueries.insert_token_id(jti, user_id))
84 connection.cnx.commit()
85 except Exception
as e:
86 print(f
"An error occurred while storing token: {e}")
store_token(self, user_id, jti)
revoke_tokens_by_user_id(self, user_id)
check_token_validity(self, token)