Rapportsystem API - of D03N
Hovedprosjekt FiV Programmering 20-24
Loading...
Searching...
No Matches
login.py
Go to the documentation of this file.
1#imports necessary modules
2from flask_restx import Resource
3from flask import request
4from mysql.connector import Error
5import httpagentparser as parser
6from flask_jwt_extended import create_access_token
7import os
8import sys
9current_directory = os.getcwd()
10sys.path.append(os.path.join(current_directory))
11
12#Imports custom modules
13from authorization import login_validation as login_auth
14from Models import user_model as UM
15from SQLAdminConnections import SQL_AdminConnector as SQLC
16from SQLAdminConnections import SQL_AdminQuerys as SQLQ
17from USER_session import tokenHandler as TH
18
19# Login route
21 tokenHandler = TH.UserTokenHandler()
22
23 @ns.route('/login')
24 class login(Resource):
25 #Sets model for swagger
26 new_login_model = UM.login_model(ns)
27 #Documentation for swagger UI
28 @ns.doc('login', description='Logs user in when given Username and Password...',
29 responses={200: 'OK',
30 400: 'Invalid Argument or faulty data',
31 500: 'Internal server error'})
32
33 #Validates input
34 @ns.expect(new_login_model, validate=True)
35 def post(self):
36 data = request.get_json()
37 username = data["username"].lower()
38 password = data["password"]
39
40 #Validates credentials
41 login_validation = login_auth.loginValidation(username, password).validate_credentials()
42 user_exists, user_id, user_accountType = login_validation
43
44 #Checks if user exists
45 if user_exists:
46
47 connection = SQLC.SQLConAdmin()
48 connection.connect()
49 #Deletes old tokens for user
50 connection.execute_query(SQLQ.SQLQueries.use_users_database())
51 connection.execute_query(SQLQ.SQLQueries.delete_tokens_by_user_id(user_id))
52 connection.cnx.commit()
53
54 #Get database name
55 connection.execute_query(SQLQ.SQLQueries.use_users_database())
56 query = connection.execute_query(SQLQ.SQLQueries.get_database_name(username.lower()))
57
58 database_name = query[0][0]
59
60 #Get key from db
61 connection.execute_query(SQLQ.SQLQueries.use_users_database())
62 query = connection.execute_query(SQLQ.SQLQueries.get_pw(data["username"].lower()))
63
64 #Generates token for user
65 access_token = create_access_token(identity={'user_id': user_id, 'email': username, 'accountType': user_accountType, 'password': password,"db_name": database_name})
66
67 #Stores token in database
68 tokenHandler.store_token(user_id, access_token)
69
70 #saves user activity to database
71 save_activity(user_id)
72
73 #Temp-stores user info in current_user
74 current_user = {
75 "user_id": user_id,
76 "email": username,
77 "accountType": user_accountType
78 }
79
80 #Returns token and user info
81 return {"message": "Log-in successful", "user": current_user, "access_token": access_token}, 200
82 #Returns error if user does not exist
83 return {"Error": "Invalid username or password"}, 400
84
85#Function for saving activity to database
86def save_activity(user_id):
87 tokenHandler = TH.UserTokenHandler()
88
89 #Checks if user has old activity - deletes 30 days old activity
90 check_old_activity(user_id)
91
92 #Gets ip address, browser name and operating system
93 ip_address = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
94 user_agent = request.headers.get('User-Agent')
95 parsed_agent = parser.detect(user_agent)
96 browser_name = parsed_agent.get('browser', {}).get('name', 'Unknown')
97 operating_system = parsed_agent.get('os', {}).get('name', 'Unknown')
98
99 try:
100 #connect to database
101 connection = SQLC.SQLConAdmin()
102 connection.connect()
103
104 #Checking old activitys
105 check_old_activity(user_id)
106
107 connection.execute_query(SQLQ.SQLQueries.use_users_database())
108
109 #save activity to database
110 connection.execute_query(SQLQ.SQLQueries.insert_user_activity(user_id, ip_address, browser_name, operating_system))
111 connection.cnx.commit()
112
113 except Error as e:
114 print("Error while saving activity to database.", e)
115 finally:
116 connection.close()
117
118#Function for checking and deleting old activity
120 try:
121 # Connect to the database
122 connection = SQLC.SQLConAdmin()
123 connection.connect()
124 # Use the 'users' database
125 connection.execute_query(SQLQ.SQLQueries.use_users_database())
126
127 # Count the number of activities for the user
128 result = connection.execute_query(SQLQ.SQLQueries.count_user_activities(user_id))
129 activity_count = result[0][0] if result else 0
130
131 # If there are more than 5 activities, fetch and delete the oldest one
132 if activity_count > 4:
133 result = connection.execute_query(SQLQ.SQLQueries.get_oldest_activity_id(user_id))
134 oldest_activity_id = result[0][0] if result else None
135
136 if oldest_activity_id:
137 connection.execute_query(SQLQ.SQLQueries.delete_activity_by_id(oldest_activity_id))
138
139 # Commit the changes
140 connection.cnx.commit()
141
142 except Error as e:
143 print("Error while checking and deleting old activity.", e)
144 finally:
145 connection.close()