Rapportsystem API - of D03N
Hovedprosjekt FiV Programmering 20-24
Loading...
Searching...
No Matches
tokenHandler.py
Go to the documentation of this file.
1import uuid
2from flask_jwt_extended import create_access_token, get_jwt_identity, get_jwt
3from flask import request
4from SQLAdminConnections import SQL_AdminConnector as SQLC
5from SQLAdminConnections import SQL_AdminQuerys as SQLQ
6
def __init__(self):
    """Create a token handler; no instance state is initialised."""
10
11 # Generates a JWT for a given user
def login(self, user_id):
    """Issue a JWT access token for ``user_id`` and record its JTI.

    Args:
        user_id: Identifier of the user. It is used as the token identity
            and is also added to the token as the ``user_id`` claim.

    Returns:
        The encoded access token produced by ``create_access_token``.
    """
    # Function-scope import: the package is already a dependency of this
    # module, only this one extra name is needed here.
    from flask_jwt_extended import decode_token

    additional_claims = {"user_id": user_id}
    access_token = create_access_token(identity=user_id, additional_claims=additional_claims)

    # get_jwt() exposes the JWT of the *current request*, not the token that
    # was just created (a login request normally carries no JWT at all), so
    # the JTI has to be read from the freshly encoded token instead.
    jti = decode_token(access_token)["jti"]

    # NOTE(review): store_token() signals failure through its return value,
    # which is ignored here; a token whose JTI was not stored is still
    # returned to the caller - confirm this is intended.
    self.store_token(user_id, jti)
    return access_token
18
19 # Validates a given JWT
21 try:
22 auth_header = request.headers.get('Authorization', None)
23 if auth_header:
24 token = auth_header.split()[1] # Splitter 'Bearer <token>' og tar den andre delen
25 return self.check_token_validity(token)
26 else:
27 return {"AUTH": False, "Reason": "No token provided"}
28 except Exception as e:
29 print(f"An error occurred: {e}")
30 return {"AUTH": False, "Reason": "Unable to verify token"}
31
32 # Checks if the token ID (jti) is valid and not revoked in the database
def check_token_validity(self, token):
    """Ask the users database whether ``token`` is valid and not revoked.

    Args:
        token: Value handed to the ``check_token_validity`` query.
            NOTE(review): ``store_token`` persists a JTI, while the
            Authorization-header path forwards the raw bearer token -
            confirm which of the two the query expects.

    Returns:
        ``{"AUTH": True}`` when the first column of the first result row is
        truthy; otherwise a dict with ``"AUTH": False`` and a ``"Reason"``
        string (also used when any exception occurs during the lookup).
    """
    # Bound before the try so the finally clause cannot hit an unbound name
    # when creating the connection object itself raises.
    connection = None
    try:
        connection = SQLC.SQLConAdmin()
        connection.connect()
        connection.execute_query(SQLQ.SQLQueries.use_users_database())
        # NOTE(review): ``token`` can originate from a request header;
        # confirm the query builder escapes/parameterizes it.
        result = connection.execute_query(SQLQ.SQLQueries.check_token_validity(token))
        if result and result[0][0]:
            return {"AUTH": True}
        return {"AUTH": False, "Reason": "Token invalid or revoked"}
    except Exception as e:
        print(f"An error occurred while checking token validity: {e}")
        return {"AUTH": False, "Reason": "Database error during token check"}
    finally:
        if connection:
            connection.close()
49
50 # Logs out the user by marking the token ID as revoked
def logout(self):
    """Revoke all stored tokens of the user identified by the current JWT.

    Returns:
        True when the revocation succeeded; False when the identity could
        not be determined, an exception occurred, or the revocation itself
        reported failure.
    """
    try:
        identity = get_jwt_identity()
        # login() creates tokens with identity=user_id, so the identity is
        # normally the id itself. A mapping that carries "user_id" is still
        # accepted for tokens issued in that shape.
        user_id = identity["user_id"] if isinstance(identity, dict) else identity
        if user_id is None:
            print("An error occurred while logging out: no user identity in token")
            return False
        revoked = self.revoke_tokens_by_user_id(user_id)
    except Exception as e:
        print(f"An error occurred while logging out: {e}")
        return False
    # revoke_tokens_by_user_id() returns False on failure; pass that on
    # instead of unconditionally reporting success.
    return bool(revoked)
60
61 # Revokes all tokens for a given user
def revoke_tokens_by_user_id(self, user_id):
    """Run the revoke-tokens query for ``user_id`` and commit it.

    Args:
        user_id: Identifier passed to the ``revoke_tokens_by_user_id`` query.

    Returns:
        True when the query was executed and committed, False when any
        exception occurred along the way.
    """
    # Bound before the try so the finally clause cannot hit an unbound name
    # when creating the connection object itself raises.
    connection = None
    try:
        connection = SQLC.SQLConAdmin()
        connection.connect()
        connection.execute_query(SQLQ.SQLQueries.use_users_database())
        connection.execute_query(SQLQ.SQLQueries.revoke_tokens_by_user_id(user_id))
        connection.cnx.commit()
    except Exception as e:
        print(f"An error occurred while revoking tokens: {e}")
        return False
    finally:
        if connection:
            connection.close()
    return True
76
77 # Stores the token ID in the database for tracking purposes
def store_token(self, user_id, jti):
    """Insert a token ID for ``user_id`` into the users database.

    Args:
        user_id: Identifier of the user the token belongs to.
        jti: Token ID to record.

    Returns:
        True when the insert was executed and committed, False when any
        exception occurred along the way.
    """
    # Bound before the try so the finally clause cannot hit an unbound name
    # when creating the connection object itself raises.
    connection = None
    try:
        connection = SQLC.SQLConAdmin()
        connection.connect()
        connection.execute_query(SQLQ.SQLQueries.use_users_database())
        connection.execute_query(SQLQ.SQLQueries.insert_token_id(jti, user_id))
        connection.cnx.commit()
    except Exception as e:
        print(f"An error occurred while storing token: {e}")
        return False
    finally:
        if connection:
            connection.close()
    return True
store_token(self, user_id, jti)
revoke_tokens_by_user_id(self, user_id)