personal finance control engine
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

499 lines
16 KiB

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# File name of the sample card-statement export used while prototyping.\n",
"test_file = \"OUROCARD_VISA_INFINITE-Ago_24.txt\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Dump the raw statement text to inspect its layout.\n",
"# Other cells in this notebook open this same export with encoding='latin';\n",
"# do the same here, since the platform default codec (often UTF-8) can fail\n",
"# on the accented characters of a Latin-1 file.\n",
"with open('OUROCARD_VISA_INFINITE-Ago_24.txt', 'r', encoding='latin') as reader:\n",
"    data = reader.read()\n",
"    print(data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"\n",
"# Read the whole statement. Other cells in this notebook open this export\n",
"# with encoding='latin', so do the same instead of relying on the default.\n",
"with open('OUROCARD_VISA_INFINITE-Ago_24.txt', 'r', encoding='latin') as file:\n",
"    contents = file.read()\n",
"\n",
"# A transaction row: dd.mm.yyyy date, three fixed-width fields (23, 14 and 2\n",
"# characters) and two amounts written with a decimal comma.\n",
"pattern = r'\\d{2}\\.\\d{2}\\.\\d{4}.{23}.{14}.{2}\\s*\\d+,\\d{2}\\s*\\d+,\\d{2}'\n",
"\n",
"# Print every transaction row found in the text\n",
"for matches in re.finditer(pattern, contents):\n",
"    print(matches.group())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"\n",
"# Read the whole statement. Other cells in this notebook open this export\n",
"# with encoding='latin', so do the same instead of relying on the default.\n",
"with open('OUROCARD_VISA_INFINITE-Ago_24.txt', 'r', encoding='latin') as file:\n",
"    contents = file.read()\n",
"\n",
"# Any full line that mentions one of the two card holders ('.' does not cross\n",
"# line breaks, so each match is a single line)\n",
"pattern = r'.*DANIEL.*|.*IZABELY.*'\n",
"\n",
"# Print every line naming a card holder\n",
"for matches in re.finditer(pattern, contents):\n",
"    print(matches.group())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"\n",
"# Read the whole statement. Other cells in this notebook open this export\n",
"# with encoding='latin', so do the same instead of relying on the default.\n",
"with open('OUROCARD_VISA_INFINITE-Ago_24.txt', 'r', encoding='latin') as file:\n",
"    contents = file.read()\n",
"\n",
"# Card-holder lines and the fixed-width transaction row.\n",
"# dan_pattern used to start with a bare '*', which the re module rejects with\n",
"# \"nothing to repeat\" as soon as the pattern is compiled.\n",
"dan_pattern = r'.*DANIEL.*'\n",
"iza_pattern = r'.*IZABELY.*'\n",
"line_pattern = r'\\d{2}\\.\\d{2}\\.\\d{4}.{23}.{14}.{2}\\s*\\d+,\\d{2}\\s*\\d+,\\d{2}'\n",
"\n",
"# Print every transaction row (the holder patterns are not applied here)\n",
"for matches in re.finditer(line_pattern, contents):\n",
"    print(matches.group())\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Split the sample file into the rows that follow each table header.\n",
"with open('table-test.txt', 'r') as file:\n",
"    contents = file.readlines()\n",
"\n",
"table_a_lines = []\n",
"table_b_lines = []\n",
"\n",
"# Header text -> list that collects the rows found under that header\n",
"buckets = {'TABLEA': table_a_lines, 'TABLEB': table_b_lines}\n",
"\n",
"# Header of the section being read; stays None until the first header shows up\n",
"current_table = None\n",
"\n",
"for line in contents:\n",
"    line = line.strip()  # headers are compared without surrounding whitespace\n",
"    if line in buckets:\n",
"        current_table = line\n",
"        continue\n",
"    if current_table is not None:\n",
"        buckets[current_table].append(line)\n",
"\n",
"# Report both sections, TABLEA first\n",
"for heading, rows in (\n",
"    ('Lines under TABLEA:', table_a_lines),\n",
"    ('\\nLines under TABLEB:', table_b_lines),\n",
"):\n",
"    print(heading)\n",
"    for data in rows:\n",
"        print(data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"from datetime import date, datetime\n",
"import locale\n",
"\n",
"locale.setlocale(locale.LC_ALL, 'pt_BR.UTF-8')\n",
"\n",
"# Read the statement export line by line\n",
"with open('OUROCARD_VISA_INFINITE-Ago_24.txt', 'r', encoding='latin') as file:\n",
"    contents = file.readlines()\n",
"\n",
"# Section headers of the two card holders and the fixed-width transaction row\n",
"# (plain form for filtering, grouped form for field extraction)\n",
"dan_pattern = r'1 - DANIEL.*'\n",
"iza_pattern = r'4 - IZABELY.*'\n",
"line_pattern = r'\\d{2}\\.\\d{2}\\.\\d{4}.{23}.{14}.{2}\\s*\\d+,\\d{2}\\s*\\d+,\\d{2}'\n",
"line_group_pattern = r'(\\d{2})\\.(\\d{2})\\.(\\d{4})(.{23})(.{14})(.{2})(\\s*\\d+,\\d{2})(\\s*\\d+,\\d{2})'\n",
"\n",
"# Raw rows collected per card holder\n",
"list_dan = []\n",
"list_iza = []\n",
"current_list = None\n",
"\n",
"insert_bulk = []\n",
"\n",
"\n",
"def _row_to_tuple(row, owner_id):\n",
"    \"\"\"Convert one matched transaction row into a 12-field insert tuple.\n",
"\n",
"    Field order follows the column list of the notebook's INSERT statement:\n",
"    date, account, memo, city, country, outflow, inflow, owner, installment\n",
"    number, installment total, created, updated.\n",
"    \"\"\"\n",
"    matches = re.search(line_group_pattern, row)\n",
"    day, month, year = (int(matches.group(index)) for index in (1, 2, 3))\n",
"    return (\n",
"        str(date(year, month, day)),\n",
"        1,  # account id\n",
"        matches.group(4),  # memo\n",
"        matches.group(5),  # city\n",
"        matches.group(6),  # country\n",
"        matches.group(7).strip().replace(',', '.'),  # outflow\n",
"        matches.group(8).strip().replace(',', '.'),  # inflow\n",
"        owner_id,\n",
"        1,  # installment number, same value build_insert uses for plain rows\n",
"        None,  # installment total, same value build_insert uses for plain rows\n",
"        str(datetime.now(tz=None)),  # created\n",
"        None,  # updated\n",
"    )\n",
"\n",
"\n",
"# Route every transaction row to the holder whose header was seen last\n",
"for line in contents:\n",
"    line = line.strip()\n",
"    if re.match(dan_pattern, line):\n",
"        current_list = 'list_dan'\n",
"        print('found Dan')\n",
"    elif re.match(iza_pattern, line):\n",
"        current_list = 'list_iza'\n",
"        print('found Iza')\n",
"    elif re.match(line_pattern, line):\n",
"        if current_list == 'list_dan':\n",
"            print(\"dan\", line)\n",
"            list_dan.append(line)\n",
"        if current_list == 'list_iza':\n",
"            print(\"iza\", line)\n",
"            list_iza.append(line)\n",
"\n",
"print('list_dan - tuples for insert')\n",
"for item in list_dan:\n",
"    insert_bulk.append(_row_to_tuple(item, 1))\n",
"\n",
"# This label previously repeated 'list_dan' although the loop handles list_iza\n",
"print('list_iza - tuples for insert')\n",
"for item in list_iza:\n",
"    insert_bulk.append(_row_to_tuple(item, 2))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parameterised bulk INSERT for the TRANSACTION table: one %s per listed column.\n",
"insert_query = (\n",
"    \"INSERT INTO default.TRANSACTION \"\n",
"    \"(TDATE, ACCOUNTID, MEMO, CITY, COUNTRY, OUTFLOW, INFLOW, OWNERID, INSTALLMENT_NR, INSTALLMENT_TT, CREATED, UPDATED) \"\n",
"    \"VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s )\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_lists(path=\"OUROCARD_VISA_INFINITE-Ago_24.txt\"):\n",
"    \"\"\"Group the transaction rows of a card statement by card holder.\n",
"\n",
"    The file is read as Latin-1 and scanned once, top to bottom. A line that\n",
"    starts with ``<digit> - <NAME>`` opens (or re-opens) that holder's section;\n",
"    every following payment or purchase row is appended to that holder's list.\n",
"    Rows that appear before the first holder header are ignored.\n",
"\n",
"    Args:\n",
"        path: Statement export to parse. Defaults to the export file name\n",
"            this function previously had hard-coded.\n",
"\n",
"    Returns:\n",
"        dict: One entry per holder, keyed ``list_<name in lower case>``, each\n",
"        holding ``owner_name`` (as written in the header), ``owner_id``\n",
"        (1, 2, ... in order of first appearance) and ``tlist`` (the raw,\n",
"        stripped statement rows of that holder, in file order).\n",
"    \"\"\"\n",
"    import re\n",
"\n",
"    # Holder header; group 1 is the first word after the dash\n",
"    owner_pattern = r\"\\d\\s*-\\s*(\\w+)\"\n",
"    # Purchase row: date, fixed-width text fields, two comma-decimal amounts\n",
"    line_pattern = r\"\\d{2}\\.\\d{2}\\.\\d{4}.{23}.{14}.{2}\\s*-?\\d*\\.?\\d+,\\d{2}\\s*\\d+,\\d{2}\"\n",
"    # Payment row: date immediately followed by PGTO and the 200211 marker\n",
"    payment_pattern = r\"\\d{2}\\.\\d{2}\\.\\d{4}PGTO.*200211(\\s*-?\\d*\\.?\\d+,\\d{2})(\\s*\\d+,\\d{2})\"\n",
"\n",
"    with open(path, \"r\", encoding=\"latin\") as file:\n",
"        contents = file.readlines()\n",
"\n",
"    result = {}\n",
"    current_list = None\n",
"\n",
"    for line in contents:\n",
"        line = line.strip()\n",
"\n",
"        # Headers are only recognised at the start of a line. Searching the\n",
"        # whole line would register owners that never receive a transaction\n",
"        # list and would shift the ids of the real ones.\n",
"        found_owner = re.match(owner_pattern, line)\n",
"        if found_owner:\n",
"            owner_name = found_owner.group(1)\n",
"            current_list = f\"list_{owner_name.lower()}\"\n",
"            # Register a holder once: a repeated header must neither change\n",
"            # the id nor discard the rows collected so far.\n",
"            if current_list not in result:\n",
"                result[current_list] = {\n",
"                    \"owner_name\": owner_name,\n",
"                    \"owner_id\": len(result) + 1,\n",
"                    \"tlist\": [],\n",
"                }\n",
"            continue\n",
"\n",
"        # Nothing to attach a row to until the first header has been seen\n",
"        if current_list is None:\n",
"            continue\n",
"\n",
"        if re.match(payment_pattern, line) or re.match(line_pattern, line):\n",
"            result[current_list][\"tlist\"].append(line)\n",
"\n",
"    return result"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Print the dictionary returned by create_lists().\n",
"print(create_lists())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def build_insert(input_dict: dict, account: int):\n",
"    \"\"\"Turn the per-owner statement rows into tuples for a bulk INSERT.\n",
"\n",
"    Args:\n",
"        input_dict: Mapping shaped like the result of ``create_lists``: each\n",
"            value holds an ``owner_id`` and, optionally, a ``tlist`` of raw\n",
"            statement lines. Owners without a ``tlist`` contribute no rows.\n",
"        account: Account id stored in every tuple.\n",
"\n",
"    Returns:\n",
"        list[tuple]: 12-field tuples ordered as (date, account, memo, city,\n",
"        country, outflow, inflow, owner, installment number, installment\n",
"        total, created, updated). Dates are ISO ``YYYY-MM-DD`` strings and\n",
"        amounts are strings with '.' as the decimal separator.\n",
"\n",
"    Raises:\n",
"        ValueError: If a line matches neither the payment nor the purchase\n",
"            layout.\n",
"    \"\"\"\n",
"    from datetime import date, datetime\n",
"    import re\n",
"\n",
"    # Purchase row. Groups 5-8 are the installment layout (memo ending in\n",
"    # \"PARC n/t\" followed by a 13-character city field); groups 9-10 are the\n",
"    # plain 23 + 14 character memo/city layout.\n",
"    # NOTE(review): group 6 is `\\d+.` (digits plus one more character), so a\n",
"    # one-character installment number directly followed by '/' does not take\n",
"    # the installment branch - confirm against real statement rows.\n",
"    line_group_pattern = r\"(\\d{2})\\.(\\d{2})\\.(\\d{4})((.+PARC (\\d+.)\\/(\\d+))(\\s.{12})|(.{23})(.{14}))(.{2})(\\s*-?\\d*\\.?\\d+,\\d{2})(\\s*\\d*\\.?\\d+,\\d{2})\"\n",
"    # Payment row: only the first amount after the 200211 marker is stored\n",
"    payment_pattern = r\"(\\d{2})\\.(\\d{2})\\.(\\d{4})(PGTO DEBITO CONTA).*200211(\\s*-?\\d*\\.?\\d+,\\d{2})(\\s*\\d+,\\d{2})\"\n",
"\n",
"    def _iso_date(found):\n",
"        \"\"\"Build the ISO date string from groups 1-3 (day, month, year).\"\"\"\n",
"        return str(date(int(found.group(3)), int(found.group(2)), int(found.group(1))))\n",
"\n",
"    def _amount(text):\n",
"        \"\"\"Drop thousands dots and switch the decimal comma to a dot.\"\"\"\n",
"        return text.strip().replace(\".\", \"\").replace(\",\", \".\")\n",
"\n",
"    insert_bulk = []\n",
"\n",
"    for key in input_dict:\n",
"        # An owner entry may lack \"tlist\" (no header section was opened for\n",
"        # it); treat that as no rows instead of failing with KeyError.\n",
"        for item in input_dict[key].get(\"tlist\") or []:\n",
"            tOwner = input_dict[key][\"owner_id\"]\n",
"            tCreated = str(datetime.now(tz=None))\n",
"\n",
"            # * check for payment\n",
"            payment = re.match(payment_pattern, item)\n",
"            if payment:\n",
"                # Payments carry no place or installment data; the amount is\n",
"                # stored as inflow.\n",
"                insert_bulk.append(\n",
"                    (\n",
"                        _iso_date(payment),\n",
"                        account,\n",
"                        payment.group(4),\n",
"                        None,\n",
"                        None,\n",
"                        None,\n",
"                        _amount(payment.group(5)),\n",
"                        tOwner,\n",
"                        None,\n",
"                        None,\n",
"                        tCreated,\n",
"                        None,\n",
"                    )\n",
"                )\n",
"                continue\n",
"\n",
"            purchase = re.match(line_group_pattern, item)\n",
"            if purchase is None:\n",
"                # Fail with the offending text rather than an AttributeError\n",
"                # on None a few lines further down.\n",
"                raise ValueError(f\"Unrecognised statement line for {key!r}: {item!r}\")\n",
"\n",
"            # * check for Installments\n",
"            if purchase.group(5):\n",
"                tMemo = purchase.group(5)\n",
"                tCity = purchase.group(8)\n",
"                tInstallmentNr = int(purchase.group(6))\n",
"                tInstallmentTt = int(purchase.group(7))\n",
"            else:\n",
"                tMemo = purchase.group(9)\n",
"                tCity = purchase.group(10)\n",
"                tInstallmentNr = 1\n",
"                tInstallmentTt = None\n",
"\n",
"            insert_bulk.append(\n",
"                (\n",
"                    _iso_date(purchase),\n",
"                    account,\n",
"                    tMemo,\n",
"                    tCity,\n",
"                    purchase.group(11),\n",
"                    _amount(purchase.group(12)),\n",
"                    _amount(purchase.group(13)),\n",
"                    tOwner,\n",
"                    tInstallmentNr,\n",
"                    tInstallmentTt,\n",
"                    tCreated,\n",
"                    None,\n",
"                )\n",
"            )\n",
"\n",
"    return insert_bulk"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def db_insert(insert_bulk: list[tuple]):\n",
"    \"\"\"Bulk-insert transaction tuples into MySQL using ``insert_query``.\n",
"\n",
"    Connection settings come from the environment so that no secret lives in\n",
"    the notebook: FINANCE_DB_HOST (default \"localhost\"), FINANCE_DB_USER\n",
"    (default \"root\"), FINANCE_DB_NAME (default \"default\") and\n",
"    FINANCE_DB_PASSWORD. When the password variable is unset, the password is\n",
"    asked for interactively.\n",
"\n",
"    Args:\n",
"        insert_bulk: Tuples whose fields line up with the placeholders of the\n",
"            notebook-level ``insert_query``. An empty list is a no-op.\n",
"\n",
"    Returns:\n",
"        None. Connector errors are printed, not raised.\n",
"    \"\"\"\n",
"    # Nothing to send: skip the connection (and the driver import) entirely\n",
"    if not insert_bulk:\n",
"        print(\"Nothing to insert.\")\n",
"        return\n",
"\n",
"    import os\n",
"    from getpass import getpass\n",
"\n",
"    from mysql.connector import connect, Error\n",
"\n",
"    password = os.environ.get(\"FINANCE_DB_PASSWORD\")\n",
"    if password is None:\n",
"        password = getpass(\"MySQL password: \")\n",
"\n",
"    try:\n",
"        # The with-block closes the connection on exit. The former\n",
"        # `finally: connection.close()` failed with an unbound name whenever\n",
"        # connect() itself raised, hiding the real error.\n",
"        with connect(\n",
"            host=os.environ.get(\"FINANCE_DB_HOST\", \"localhost\"),\n",
"            user=os.environ.get(\"FINANCE_DB_USER\", \"root\"),\n",
"            password=password,\n",
"            database=os.environ.get(\"FINANCE_DB_NAME\", \"default\"),\n",
"        ) as connection:\n",
"            print(\"CONNECTED!\", connection)\n",
"            with connection.cursor() as cursor:\n",
"                cursor.executemany(insert_query, insert_bulk)\n",
"                connection.commit()\n",
"            print(\"DONE!\")\n",
"    except Error as e:\n",
"        print(e)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Full pipeline: parse the statement, build the row tuples with account id 1\n",
"# and hand them to db_insert.\n",
"db_insert(build_insert(create_lists(), 1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Display the tuples build_insert produces with account id 1 (db_insert is not called).\n",
"build_insert(create_lists(), 1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Display the per-owner dictionary returned by create_lists.\n",
"create_lists()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def _owner_record(owner_id, owner_label):\n",
"    \"\"\"Build one owner entry holding its label, id and a one-item list.\"\"\"\n",
"    return {\n",
"        \"owner_label\": owner_label,\n",
"        \"owner_id\": owner_id,\n",
"        \"list1\": [\"thingies, thingies, 42\"],\n",
"    }\n",
"\n",
"\n",
"def _show_owners(owners):\n",
"    \"\"\"Print each owner's id and label, then every entry of its list.\"\"\"\n",
"    for record in owners.values():\n",
"        print(record[\"owner_id\"], record[\"owner_label\"])\n",
"        for entry in record[\"list1\"]:\n",
"            print(entry)\n",
"\n",
"\n",
"# Scratchpad: nested dict with one record per owner\n",
"dictTest = {\n",
"    \"owner1\": _owner_record(1, \"foo\"),\n",
"    \"owner2\": _owner_record(2, \"bar\"),\n",
"}\n",
"\n",
"_show_owners(dictTest)\n",
"\n",
"# Relabel the first owner in place, then print again to see the change\n",
"dictTest[\"owner1\"][\"owner_label\"] = \"yadda\"\n",
"\n",
"_show_owners(dictTest)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Scratchpad: build a two-level dict whose keys come from variables\n",
"param1, param2 = \"foo\", \"bar\"\n",
"testy = {\n",
"    param1: {\n",
"        param2: [\"what\", \"when\", \"why\"],\n",
"        \"number\": 1,\n",
"    }\n",
"}\n",
"\n",
"print(testy)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}