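# NOTE: the lines below are repository-page text captured together with the
# file; they are not part of etl.py.
# You cannot select more than 25 topics
# Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# 254 lines
# 8.3 KiB
# 254 lines
# 8.3 KiB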
# etl.py

# * ETL of Banco do Brasil credit card invoices, partial or full.

import os

from dotenv import load_dotenv

# Read a local .env file (if any) into the process environment before the
# settings below are looked up.
load_dotenv()

# Parameterized bulk-insert statement used by db_insert(); the twelve %s
# placeholders are filled from each row tuple.
# NOTE(review): build_insert() emits 12-tuples that begin with a hash id and
# carry no city value, while this column list begins at TDATE and includes
# CITY. The placeholder count matches, but the values look shifted relative to
# the columns -- confirm against the TRANSACTION table definition.
insert_query = "INSERT IGNORE INTO default.TRANSACTION (TDATE, ACCOUNTID, MEMO, CITY, COUNTRY, OUTFLOW, INFLOW, OWNERID, INSTALLMENT_NR, INSTALLMENT_TT, CREATED, UPDATED) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s )"

# Value of the INPUT_FILE environment variable (None when unset).
# NOTE(review): presumably the invoice file to load -- confirm how it is meant
# to be consumed.
input_file = os.getenv("INPUT_FILE")


def find_owner(queried_name: str):
    """Look up an owner in the OWNER table by name.

    Every OWNER row is fetched and the second column of each row is
    lower-cased and compared with ``queried_name`` as given, so callers must
    pass an already lower-cased name.

    Args:
        queried_name: Lower-case owner name to search for.

    Returns:
        A flat list holding the first two columns of each matching row
        (``[col0, col1]`` for a single match), or ``None`` when nothing
        matched or the database reported an error (the error is printed).
    """
    from mysql.connector import connect, Error

    query = "SELECT * FROM OWNER"
    result = []

    try:
        # NOTE(review): credentials are hard-coded here; consider reading them
        # from the environment like INPUT_FILE.
        with connect(
            host="localhost",
            user="root",
            password="pleasehashapasswordomg",
            database="default",
        ) as connection:
            with connection.cursor() as cursor:
                cursor.execute(query)
                for item in cursor.fetchall():
                    if item[1].lower() == queried_name:
                        result.append(item[0])
                        result.append(item[1])
    except Error as e:
        print(e)
    # No ``finally: connection.close()``: the ``with`` block already closes
    # the connection, and when connect() itself fails ``connection`` is never
    # bound, so closing it there raised UnboundLocalError and hid the real
    # database error.

    return result if result else None


# NOTE(review): this looks like a leftover manual check. The looked-up value is
# discarded, the call hits the database on every run, and the subscript raises
# TypeError whenever find_owner() returns None. Confirm whether it can go.
find_owner("daniel")[0]


# Invoice export file names:
# OUROCARD_VISA_INFINITE-Próxima_Fatura.txt
# OUROCARD_VISA_INFINITE-Ago_24.txt
def create_lists(path: str = "OUROCARD_VISA_INFINITE-Ago_24.txt"):
    """Split an invoice text export into per-owner transaction line lists.

    The file is scanned twice. The first pass collects every owner marker
    (a digit, a dash, then an upper-case name) and resolves it to an id with
    find_owner(). The second pass walks the lines in order: a line that
    *starts* with an owner marker switches the current owner and restarts
    that owner's list; payment lines and transaction lines (full ``dd.mm.yyyy``
    or partial ``dd/mm`` layout) are appended, still as raw text, to the
    current owner's list.

    Args:
        path: Invoice export to read (decoded as latin). Defaults to the file
            name this script was originally hard-wired to.

    Returns:
        dict mapping ``"list_<owner name lower-cased>"`` to a dict with keys
        ``owner_name``, ``owner_id``, ``tlist`` (list of raw lines) and
        ``isPartial`` (True unless at least one collected line has the full
        ``dd.mm.yyyy`` transaction layout). Empty when no known owner is
        found.
    """
    import re

    owner_re = re.compile(r"\d\s?-\s?([A-Z]+)")
    full_line_re = re.compile(
        r"\d{2}\.\d{2}\.\d{4}.{23}.{14}.{2}\s*-?\d*\.?\d+,\d{2}\s*\d+,\d{2}"
    )
    payment_re = re.compile(
        r"\d{2}\.\d{2}\.\d{4}PGTO.*200211(\s*-?\d*\.?\d+,\d{2})(\s*\d+,\d{2})"
    )
    partial_line_re = re.compile(
        r"\d{2}\/\d{2}.{27}.{16}.{2}\s+\s*-?\d*\.?\d+,\d{2}\s*\d+,\d{2}"
    )

    with open(path, "r", encoding="latin") as file:
        lines = [raw.strip() for raw in file]

    result = {}

    # Pass 1: register owners. Each distinct name is looked up once; a name
    # the database does not know is reported and skipped, so a stray
    # "<digit>-<WORD>" inside some other line cannot abort the whole search.
    looked_up = set()
    for line in lines:
        for owner_name in owner_re.findall(line):
            if owner_name in looked_up:
                continue
            looked_up.add(owner_name)

            owner_row = find_owner(owner_name.lower())
            if owner_row is None:
                print(f"Owner not found, skipping: {owner_name}")
                continue

            result[f"list_{owner_name.lower()}"] = {
                "owner_name": owner_name,
                "owner_id": owner_row[0],
                # Always present, so consumers can rely on the key even when
                # the owner has no transaction lines.
                "tlist": [],
            }

    # Pass 2: route transaction lines to the owner whose header precedes them.
    current_list = None
    for line in lines:
        header = owner_re.match(line)
        if header:
            candidate = f"list_{header.group(1).lower()}"
            if candidate in result:
                current_list = candidate
                # A header starts a fresh list for that owner.
                result[current_list]["tlist"] = []
            else:
                # Lines under an unknown owner have no list to go to.
                current_list = None
            continue

        if current_list is None:
            continue

        if (
            payment_re.match(line)
            or full_line_re.match(line)
            or partial_line_re.match(line)
        ):
            result[current_list]["tlist"].append(line)

    # The export is "full" when any collected line has the dd.mm.yyyy
    # transaction layout. Looking at every line (not one sample) keeps the
    # answer right when the first line is a payment or a list is empty.
    is_partial = not any(
        full_line_re.match(line)
        for entry in result.values()
        for line in entry["tlist"]
    )
    for entry in result.values():
        entry["isPartial"] = is_partial

    return result


def build_insert(input_dict: dict, account: int):
    """Turn the per-owner raw lines from create_lists() into row tuples.

    Each raw line is first tried as a payment line; otherwise it is parsed
    with the full (``dd.mm.yyyy``) or partial (``dd/mm``) layout depending on
    the owner's ``isPartial`` flag.

    Args:
        input_dict: Mapping of list name to a dict with keys ``owner_id``,
            ``isPartial`` and ``tlist`` (raw invoice lines).
        account: Account id stored on every row.

    Returns:
        list of 12-tuples ``(id, date, account, memo, country, outflow,
        inflow, owner_id, installment_nr, installment_tt, created, updated)``.
        ``id`` is the SHA-256 hex digest of date + memo + outflow + inflow;
        amounts are strings using ``.`` as the decimal separator; ``updated``
        is always None.

    Raises:
        ValueError: If a line is not a payment and does not match the layout
            selected for its owner.
    """
    from datetime import date, datetime
    import re
    import hashlib

    line_group_re = re.compile(
        r"(?P<day>\d{2})\.(?P<month>\d{2})\.(?P<year>\d{4})(?:(?P<p_memo>.+PARC (?P<p_nr>\d+.)\/(?P<p_tt>\d+)\s.{12})|(?P<memo>.{37}))(?P<country>.{2})(?P<outflow>\s*-?\d*\.?\d+,\d{2})(?P<inflow>\s*\d*\.?\d+,\d{2})"
    )
    partial_group_re = re.compile(
        r"(?P<day>\d{2})\/(?P<month>\d{2})(?:(?P<p_memo>.+PARC (?P<p_nr>\d{2})\/(?P<p_tt>\d{2}).{15})|(?P<memo>.{43}))(?P<country>.{2})(?P<outflow>\s+\s*-?\d*\.?\d+,\d{2})(?P<inflow>\s*\d+,\d{2})"
    )
    # In this pattern the first amount is captured as "inflow" and the second
    # as "outflow".
    payment_re = re.compile(
        r"(?P<day>\d{2})\.(?P<month>\d{2})\.(?P<year>\d{4})(?P<memo>PGTO DEBITO CONTA).*200211(?P<inflow>\s*-?\d*\.?\d+,\d{2})(?P<outflow>\s*\d+,\d{2})"
    )

    def to_decimal_text(raw):
        # "1.234,56" -> "1234.56": drop thousands dots, then use a decimal point.
        return raw.strip().replace(".", "").replace(",", ".")

    insert_bulk = []

    for key in input_dict:
        entry = input_dict[key]
        is_partial = entry["isPartial"]
        row_re = partial_group_re if is_partial else line_group_re
        owner_id = entry["owner_id"]

        for item in entry["tlist"]:
            matches = payment_re.match(item)
            if matches:
                year = int(matches.group("year"))
                memo = matches.group("memo")
                country = None
                outflow = "0.00"
                # Payments are stored as a positive inflow; drop the sign.
                inflow = to_decimal_text(matches.group("inflow")).replace("-", "")
                installment_nr = None
                installment_tt = None
            else:
                matches = row_re.match(item)
                if matches is None:
                    # Previously this surfaced as an AttributeError on None.
                    raise ValueError(f"Unrecognised transaction line: {item!r}")

                # Partial files carry no year on their lines, so the current
                # year is assumed.
                # NOTE(review): a December purchase loaded in January would
                # get the wrong year -- confirm whether that can happen.
                year = datetime.now().year if is_partial else int(matches.group("year"))
                memo = matches.group("p_memo") or matches.group("memo")
                installment_nr = (
                    int(matches.group("p_nr")) if matches.group("p_nr") else None
                )
                installment_tt = (
                    int(matches.group("p_tt")) if matches.group("p_tt") else None
                )
                country = matches.group("country")
                outflow = to_decimal_text(matches.group("outflow"))
                inflow = to_decimal_text(matches.group("inflow"))

            tdate = str(
                date(year, int(matches.group("month")), int(matches.group("day")))
            )
            created = str(datetime.now(tz=None))

            # Deterministic id: the same date/memo/amounts always hash to the
            # same value.
            row_id = hashlib.sha256(
                (tdate + memo + outflow + inflow).encode()
            ).hexdigest()

            insert_bulk.append(
                (
                    row_id,
                    tdate,
                    account,
                    memo,
                    country,
                    outflow,
                    inflow,
                    owner_id,
                    installment_nr,
                    installment_tt,
                    created,
                    None,
                )
            )

    return insert_bulk


def db_insert(insert_bulk: list[tuple]):
    """Bulk-insert row tuples using the module-level ``insert_query``.

    Opens a connection to the local MySQL server, runs ``insert_query`` once
    per tuple via ``executemany`` and commits. Database errors are printed,
    not raised.

    Args:
        insert_bulk: Row tuples whose items fill the placeholders of
            ``insert_query`` in order.
    """
    from mysql.connector import connect, Error

    try:
        # NOTE(review): credentials are hard-coded here; consider reading them
        # from the environment like INPUT_FILE.
        with connect(
            host="localhost",
            user="root",
            password="pleasehashapasswordomg",
            database="default",
        ) as connection:
            print("CONNECTED!", connection)
            with connection.cursor() as cursor:
                cursor.executemany(insert_query, insert_bulk)
                connection.commit()
                print("DONE!")
    except Error as e:
        print(e)
    # No ``finally: connection.close()``: the ``with`` block already closes
    # the connection, and when connect() itself fails ``connection`` is never
    # bound, so closing it there raised UnboundLocalError and hid the real
    # database error.


# Run the whole pipeline: parse the invoice export, build the row tuples for
# account 1, then load them into the database.
_parsed_invoice = create_lists()
_transaction_rows = build_insert(_parsed_invoice, 1)
db_insert(_transaction_rows)