Rapportsystem API - of D03N
Hovedprosjekt FiV Programmering 20-24
Loading...
Searching...
No Matches
dataExtractor.py
Go to the documentation of this file.
1from SQLAdminConnections import SQL_AdminConnector as SQLC
2from SQLAdminConnections import SQL_AdminQuerys as SQLQ
3from datetime import datetime, date, time, timedelta
4from decimal import Decimal
5
6from flask_jwt_extended import get_jwt_identity
7
8#Defines the class data_extractor
def __init__(self):
    """Create the extractor; no instance state is initialised."""
12
def extractTableDescription(self, email):
    """Return the description of every table in the user's database.

    The database name is built as ``"db_"`` followed by ``email`` with
    ``@`` and ``.`` replaced by ``_``.

    Args:
        email: E-mail address identifying the user's database.

    Returns:
        ``{"Tables": {table_name: description}}`` on success, where each
        description maps the first element of every row returned by the
        table-description query to its second element (date/time/Decimal
        values are converted to strings).
        ``{"Error": message}`` if any step fails.
    """
    # Pre-bind everything the handlers read so a failure before the
    # connection exists (or before the first table is visited) cannot
    # raise a secondary error inside ``except``/``finally``.
    connection = None
    current_table = None
    stringified_types = (datetime, date, time, timedelta, Decimal)

    try:
        # Changes @ and . in email to _
        db_suffix = email.replace("@", "_").replace(".", "_")

        # Dict for storing data
        result_dict = {}

        # Connects to the database server
        connection = SQLC.SQLConAdmin()
        connection.connect()

        # Uses the database
        connection.execute_query(SQLQ.SQLQueries.use_database("db_" + db_suffix))

        # Gets all table names from the database
        all_tables = connection.execute_query(SQLQ.SQLQueries.show_tables())

        for table in all_tables:
            current_table = table[0]

            description_rows = connection.execute_query(
                SQLQ.SQLQueries.getTableDescription(current_table)
            )

            # Convert date/time/Decimal objects to strings in the description
            result_dict[current_table] = {
                col[0]: str(col[1]) if isinstance(col[1], stringified_types) else col[1]
                for col in description_rows
            }

        return {"Tables": result_dict}

    except Exception as e:
        print(e)
        failed_table = current_table if current_table is not None else "<unknown>"
        return {
            "Error": "Error when extracting table description from the database for table: "
            + str(failed_table)
        }

    finally:
        # Close exactly once, and never ``return`` from ``finally``: a
        # return here would replace the error dict produced above.
        if connection is not None:
            try:
                connection.cnx.close()
                connection.close()
            except Exception as close_error:
                # A cleanup failure must not mask the result being returned.
                print(close_error)
59
60
61 # Gets data from the database
def extractData(self, email):
    """Return the description and all rows of every table in the user's database.

    The database name is built as ``"db_"`` followed by ``email`` with
    ``@`` and ``.`` replaced by ``_``.

    Args:
        email: E-mail address identifying the user's database.

    Returns:
        ``{"Tables": {table_name: {"Table_description": description,
        "Data": rows}}}`` on success. ``description`` maps the first
        element of every table-description row to its second element;
        ``rows`` is a list of dicts pairing the description keys with the
        values of each fetched row. date/time/Decimal values are
        converted to strings.
        ``{"Error": message}`` if any step fails.
    """
    # Pre-bind everything the handlers read so a failure before the
    # connection exists (or before the first table is visited) cannot
    # raise a secondary error inside ``except``/``finally``.
    connection = None
    current_table = None
    stringified_types = (datetime, date, time, timedelta, Decimal)

    try:
        # Changes @ and . in email to _
        db_suffix = email.replace("@", "_").replace(".", "_")

        # Dict for storing data
        result_dict = {}

        # Connects to the database server
        connection = SQLC.SQLConAdmin()
        connection.connect()

        # Uses the database
        connection.execute_query(SQLQ.SQLQueries.use_database("db_" + db_suffix))

        # Gets all table names from the database
        all_tables = connection.execute_query(SQLQ.SQLQueries.show_tables())

        for table in all_tables:
            current_table = table[0]

            description_rows = connection.execute_query(
                SQLQ.SQLQueries.getTableDescription(current_table)
            )

            # Convert date/time/Decimal objects to strings in the description
            table_description = {
                col[0]: str(col[1]) if isinstance(col[1], stringified_types) else col[1]
                for col in description_rows
            }

            # Gets all data from the current table
            fetched_rows = connection.execute_query(
                SQLQ.SQLQueries.getAllFromTable(current_table)
            )

            # Pair each row's values with the description keys, converting
            # date/time/Decimal objects to strings on the way.
            data_in_table = [
                dict(
                    zip(
                        table_description.keys(),
                        (
                            str(value) if isinstance(value, stringified_types) else value
                            for value in row
                        ),
                    )
                )
                for row in fetched_rows
            ]

            result_dict[current_table] = {
                "Table_description": table_description,
                "Data": data_in_table,
            }

        return {"Tables": result_dict}

    except Exception as e:
        print(e)
        failed_table = current_table if current_table is not None else "<unknown>"
        return {
            "Error": "Error when extracting data from the database for table: "
            + str(failed_table)
        }

    finally:
        # Close exactly once, and never ``return`` from ``finally``: a
        # return here would replace the error dict produced above.
        if connection is not None:
            try:
                connection.cnx.close()
                connection.close()
            except Exception as close_error:
                # A cleanup failure must not mask the result being returned.
                print(close_error)
114
def _parse_date(self, date_str):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

    Args:
        date_str: Date text in ``%Y-%m-%d`` format.

    Returns:
        The parsed ``datetime``, or ``None`` when ``date_str`` does not
        match the format or is not a string (for example ``None``).
    """
    try:
        return datetime.strptime(date_str, "%Y-%m-%d")
    except (ValueError, TypeError):
        # ValueError: text does not match the format.
        # TypeError: strptime was handed a non-string such as None.
        return None
120
def extractGivenTable(self, email, table_name, start_date=None, stop_date=None, num_rows=None):
    """Fetch rows from one table of the user's database.

    The connection is opened with the ``email``, ``password`` and
    ``db_name`` entries of the identity returned by ``get_jwt_identity()``;
    the database to use is looked up with ``self.getDatabaseName(email)``.

    Args:
        email: E-mail address used to look up the database name.
        table_name: Table to read from.
        start_date: Optional ``YYYY-MM-DD`` string; only used together
            with ``stop_date``.
        stop_date: Optional ``YYYY-MM-DD`` string; only used together
            with ``start_date``.
        num_rows: Optional row limit, applied only when no complete date
            range is given.

    Returns:
        ``{"requested_data": {table_name: {"Data": rows}}}`` on success,
        where ``rows`` is a list of dicts pairing the table-description
        keys with each row's values (date/time/Decimal values as strings).
        ``{"Error": message}`` if any step raises.
    """
    stringified_types = (datetime, date, time, timedelta, Decimal)

    def as_plain(value):
        # date/time/Decimal objects are returned as their string form.
        return str(value) if isinstance(value, stringified_types) else value

    connection = None  # stays None until a connector object exists
    try:
        # The sanitised value is not used afterwards, but the call makes a
        # non-string email fail here, before any connection is opened.
        email.replace("@", "_").replace(".", "_")
        payload = {}

        identity = get_jwt_identity()

        connection = SQLC.SQLConAdmin(
            None,
            identity['email'],
            identity['password'],
            identity['db_name'],
        )
        connection.connect()
        print("Connected to the database")

        # Switch to the database registered for this email
        database_name = self.getDatabaseName(email)
        connection.execute_query(SQLQ.SQLQueries.use_database(database_name))

        # Default: every row; narrowed below by date range or row limit
        query = SQLQ.SQLQueries.getAllFromTable(table_name)
        if start_date and stop_date:
            # Convert start_date and stop_date to datetime objects
            window_start = datetime.strptime(start_date, '%Y-%m-%d')
            window_stop = datetime.strptime(stop_date, '%Y-%m-%d')
            query = SQLQ.SQLQueries.get_data_between_dates(table_name, window_start, window_stop)
        elif num_rows:
            query = SQLQ.SQLQueries.limit_query(table_name, num_rows)

        raw_rows = connection.execute_query(query)

        # The description supplies the keys used for every row dict
        description_rows = connection.execute_query(
            SQLQ.SQLQueries.getTableDescription(table_name)
        )
        column_info = {col[0]: as_plain(col[1]) for col in description_rows}

        records = [
            {name: as_plain(value) for name, value in zip(column_info, row)}
            for row in raw_rows
        ]

        payload[table_name] = {"Data": records}

    except Exception as e:
        # Print the exception so the failure is visible in the server output
        print(e)
        return {"Error": f"Error when extracting data from the database for table: {table_name}"}

    finally:
        # Only close when a connector object was actually created
        if connection:
            print("Closing the connection")
            connection.cnx.close()
            connection.close()

    return {"requested_data": payload}
175
176 # Extracts the database name from the user
def getDatabaseName(self, email):
    """Look up the database name registered for ``email``.

    Args:
        email: E-mail address of the user.

    Returns:
        The first column of the first row returned by the database-name
        query on success, or ``{"Error": message}`` if any step fails
        (including when the query returns no rows).
    """
    # Pre-bound so the cleanup below is safe even when creating the
    # connector itself fails.
    connection = None
    try:
        connection = SQLC.SQLConAdmin()
        connection.connect()

        # Uses the users database
        connection.execute_query(SQLQ.SQLQueries.use_users_database())

        # Gets the database name from the email
        rows = connection.execute_query(SQLQ.SQLQueries.get_database_name(email))
        return rows[0][0]

    except Exception as e:
        print(e)
        return {"Error": f"Error when extracting database name from the database for user: {email}"}

    finally:
        # Single cleanup point: the connection is closed exactly once on
        # both the success and the failure path.
        if connection is not None:
            try:
                connection.cnx.close()
                connection.close()
            except Exception as close_error:
                # A cleanup failure must not mask the result being returned.
                print(close_error)
204
def getAllSubUsers(self):
    """Return the sub-users of the currently authenticated user.

    The user's e-mail is read from the ``email`` entry of the identity
    returned by ``get_jwt_identity()``.

    Returns:
        ``{"Subusers": rows}`` with the result of the sub-user query on
        success, or ``{"Error": message}`` if any step fails.
    """
    # Pre-bind everything the handlers read so a failure before these are
    # assigned cannot raise a secondary error inside ``except``/``finally``.
    connection = None
    email = None
    try:
        current_user = get_jwt_identity()
        email = current_user['email']

        # Connects to the database server
        connection = SQLC.SQLConAdmin()
        connection.connect()

        # Uses the users database
        connection.execute_query(SQLQ.SQLQueries.use_users_database())

        # Gets all sub-users from the database
        all_subusers = connection.execute_query(SQLQ.SQLQueries.get_all_sub_users(email))

        return {"Subusers": all_subusers}

    except Exception as e:
        print(e)
        known_email = email if email is not None else "<unknown>"
        return {"Error": f"Error when extracting subusers from the database for user: {known_email}"}

    finally:
        # Close exactly once, and never ``return`` from ``finally``: a
        # return here would replace the error dict produced above.
        if connection is not None:
            try:
                connection.cnx.close()
                connection.close()
            except Exception as close_error:
                # A cleanup failure must not mask the result being returned.
                print(close_error)
extractGivenTable(self, email, table_name, start_date=None, stop_date=None, num_rows=None)