| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- import calendar
- import os.path
- from pypdf import PdfReader
- from pypdf.errors import PdfReadError
- import sys
- import logging
- from datetime import datetime
- import csv
# Logging setup.
# The root logger writes every record at DEBUG and above to log.txt with a
# timestamp. Records sent through this module's own ``logger`` are additionally
# echoed by a stream handler in the shorter "LEVEL - message" form.
logging.basicConfig(
    filename='log.txt',
    encoding='utf-8',
    format='%(asctime)s:%(levelname)s:%(message)s',
    datefmt='%m/%d/%y %H:%M:%S',
    level=logging.DEBUG,
)

stream_formatter = logging.Formatter('%(levelname)s - %(message)s')

ch = logging.StreamHandler()
ch.setFormatter(stream_formatter)
ch.setLevel(logging.DEBUG)

logger = logging.getLogger(__name__)
logger.addHandler(ch)
- # class paycheck:
- #
- # def __init__(self):
- # # TODO - add variables to be returned or obtained in code
- # pass
- #
- # pass
def loadfiles(pointer, folder):
    """Build the list of paths to process from one file or one folder.

    Args:
        pointer: Path of the file or folder to load.
        folder: Truthy when ``pointer`` is a folder, falsy when it is a file.

    Returns:
        A list of paths. For a folder this is ``pointer`` joined with every
        entry name reported by ``os.listdir`` (sub-folders are not filtered
        out here); for a file it is a one-element list holding ``pointer``.
    """
    name = os.path.basename(pointer)

    if not folder:  # input is a file
        logger.debug("Loading file: %s", name)
        return [pointer]

    # input is a folder
    logger.debug("Loading folder: %s", name)
    return [os.path.join(pointer, entry) for entry in os.listdir(pointer)]
def checkargs(args):
    """Validate the command-line arguments and return the files to process.

    Every argument after the program name is classified as an existing file
    or an existing folder. Anything else is ignored, except the literal
    ``-debug`` flag, which sets the module-level ``debug`` variable to True.

    Args:
        args: Argument vector laid out like ``sys.argv`` (index 0 is skipped).

    Returns:
        The list produced by ``loadfiles`` for the single file or the single
        folder that was supplied.

    Raises:
        ValueError: If no existing file or folder was supplied, if several
            files or several folders were supplied, or if files and folders
            were mixed.
    """
    global debug

    list_file = []
    list_folder = []
    for arg in args[1:]:
        if os.path.isfile(arg):
            list_file.append(arg)
        elif os.path.isdir(arg):
            list_folder.append(arg)
        if arg == "-debug":
            debug = True

    # Neither category holds exactly one entry: nothing usable was given, or
    # too many inputs of the same kind were given.
    if len(list_file) != 1 and len(list_folder) != 1:
        raise ValueError("Error parsing filename or folder name.")
    # One category holds exactly one entry; the other must be empty. Testing
    # for "non-empty" rather than "exactly one" also rejects combinations such
    # as two files plus one folder, which used to slip through and silently
    # process only the first file.
    if list_file and list_folder:
        raise ValueError("Both filename and folder provided.")

    if list_file:
        files = loadfiles(list_file[0], False)
    else:
        files = loadfiles(list_folder[0], True)
    logging.debug('checkargs complete with no errors')
    return files
def paycheckv2_parser(lines, filename):
    """Parse a paycheck that uses the post-July-2022 layout.

    Args:
        lines: Text lines extracted from the paycheck PDF. ``lines[2]`` must
            carry the check date as its ninth whitespace-separated token in
            MM/DD/YYYY form.
        filename: Stored unchanged under the ``'Filename'`` key.

    Returns:
        A dict with these keys:
        - ``'Filename'``, ``'Check Date'`` (datetime), ``'ParserVersion'``
          (``"V2"``) and ``'month'`` (two-digit string).
        - ``'BoA Deposit'``: the seventh token of a line containing
          "BANK OF AMERICA", or ``0.0`` when there is no such line.
        - ``'Gross Pay'`` and ``'Earnings'``: only present when a matching
          line was found.
        - ``'TFP'``: dict of description -> amount string, only present when
          at least one TFP row matched.
        - ``'Total TFP'``: sum of the TFP amounts rounded to two decimals.

    Raises:
        IndexError: If an expected token is missing from a matched line.
        ValueError: If the date or a TFP amount cannot be converted.
    """
    keywords = ['Current', 'BANK OF AMERICA', 'Earnings', 'TFP']
    check_date = datetime.strptime(lines[2].split()[8], "%m/%d/%Y")

    dict_tfp = {}
    dict_result = {
        'Filename': filename,
        "Check Date": check_date,
        "ParserVersion": "V2",
        # The month depends only on the check date, so it is set once here
        # instead of on every loop iteration.
        "month": check_date.strftime('%m'),
        # Default matches the old-format parser, so a paycheck without a bank
        # line still has this key when monthly totals are built.
        "BoA Deposit": float(0.0),
    }

    for line in lines:
        if line.startswith(keywords[0]):
            dict_result["Gross Pay"] = line.split()[1]
        if keywords[1] in line:
            dict_result["BoA Deposit"] = line.split()[6]
        if line.startswith(keywords[2]) and len(line.split()) > 1:
            dict_result["Earnings"] = line.split()[1]
        if keywords[3] in line and 'TFP/Hrs' not in line:
            ## Only displays for second paycheck of the period
            ## A '.' in the third-from-last token marks a current pay item.
            tfp_line = line.split()
            if len(tfp_line) >= 5:
                # NOTE(review): the last six characters of the amount token
                # are dropped; presumably a neighbouring column is fused onto
                # the TFP value by text extraction. Confirm against a sample.
                if '.' in tfp_line[-3] and tfp_line[-1] != "Taxes":
                    dict_tfp[' '.join(tfp_line[:-3])] = tfp_line[-3][:-6]
                    dict_result["TFP"] = dict_tfp
                if tfp_line[-1] == "Taxes" and '.' in tfp_line[-4]:
                    # Rows ending in "Taxes" carry one extra trailing token,
                    # so the amount sits one position further from the end.
                    dict_tfp[tfp_line[0]] = tfp_line[-4][:-6]
                    dict_result["TFP"] = dict_tfp

    total_tfp = sum((float(amount) for amount in dict_tfp.values()), 0.0)
    dict_result["Total TFP"] = round(total_tfp, 2)
    return dict_result
def _space_tokens(text):
    """Split ``text`` on single spaces and drop the empty pieces."""
    return [token for token in text.split(' ') if token]


def paycheckv1_parser(lines, filename, linetwo):
    """Parse a paycheck that uses the pre-July-2022 layout.

    Args:
        lines: Text lines extracted from the paycheck PDF.
        filename: Stored unchanged under the ``'Filename'`` key.
        linetwo: Layout variant chosen by the caller. ``"45"`` shifts the
            date line and the gross-pay line down by one; any other value
            uses lines 2 and 4.

    Returns:
        A dict with these keys:
        - ``'Filename'``, ``'Check Date'`` (datetime), ``'month'`` (int) and
          ``'ParserVersion'`` (``"V1"``).
        - ``'Gross Pay'``: first token of the gross-pay line.
        - ``'Earnings'``: the token following ``'='`` on that line.
        - ``'BoA Deposit'``: last token of the last line containing
          "BANK OF AMERICA", or ``0.0`` when there is no such line.
        - ``'TFP'``: dict of description -> float.
        - ``'Total TFP'``: sum of those values rounded to two decimals.

    Raises:
        IndexError: If an expected line or token is missing.
        ValueError: If the date or a TFP value cannot be converted, or the
            gross-pay line has no ``'='`` token.
    """
    dateline_num = 2
    grosspayline_num = 4
    if linetwo == "45":
        dateline_num += 1
        grosspayline_num += 1

    # Parse the date and the gross-pay line once and reuse the results.
    check_date = datetime.strptime(
        _space_tokens(lines[dateline_num])[5], "%m/%d/%Y"
    )
    earningsline = _space_tokens(lines[grosspayline_num])

    dict_result = {
        'Filename': filename,
        "Check Date": check_date,
        'month': check_date.month,
        "ParserVersion": "V1",
    }
    dict_result["Gross Pay"] = earningsline[0]
    dict_result["Earnings"] = earningsline[earningsline.index('=') + 1]
    dict_result["BoA Deposit"] = float(0.0)

    # Find the pay-item table: it starts after the header line containing both
    # "Gross" and "Hrs" and ends at the "Total Gross Pay" line.
    # NOTE(review): both markers default to 0 when not found, which makes the
    # slice below run from near the top of the page or up to the last line.
    start = 0
    end = 0
    for i, line in enumerate(lines):
        if "BANK OF AMERICA" in line:
            dict_result["BoA Deposit"] = _space_tokens(line)[-1]
        if "Gross" in line and "Hrs" in line:
            start = i
        if 'Total Gross Pay' in line:
            end = i

    dict_tfp = {}
    # The slice stops one line short of the line preceding "Total Gross Pay".
    for line in lines[start + 1:end - 1]:
        desc = line[:26].strip()  # description occupies the first 26 columns
        first_value = line[26:].split(' ')[0]
        if not first_value or desc == "Pilot Per Diem DAY":
            continue
        tfp = float(first_value)
        if tfp > 0:
            dict_tfp[desc] = tfp
    dict_result["TFP"] = dict_tfp

    total_tfp = sum((float(amount) for amount in dict_tfp.values()), 0.0)
    dict_result["Total TFP"] = round(total_tfp, 2)
    return dict_result
def calculate_result(list_result):
    """Add ratio and deduction figures to the monthly totals.

    Raw paycheck dicts are first combined into monthly totals by
    ``consolodateMonths``. A one-element list is passed to it as well unless
    its item already carries a ``'Total BoA'`` key, in which case it is
    treated as already consolidated.

    Each row whose ``'Total BoA'`` and ``'Total Earnings'`` are both positive
    is updated in place with ``'BoA to Earnings'``, ``'BoA to TFP'`` and
    ``'Total Deductions'``. Rows failing that check are left out of the
    returned list.

    Args:
        list_result: List of paycheck dicts or monthly-total dicts.

    Returns:
        The list of rows that had positive BoA and earnings totals.

    Raises:
        TypeError: If ``list_result`` is not a list.
    """
    if not isinstance(list_result, list):
        raise TypeError("calculate_result requires a list of dictionaries, even if one item.")

    # A single raw paycheck has no 'Total BoA' key yet and must be
    # consolidated too, otherwise the lookups below fail with KeyError.
    already_consolidated = len(list_result) == 1 and 'Total BoA' in list_result[0]
    if list_result and not already_consolidated:
        list_result = consolodateMonths(list_result)

    result = []
    for resulty in list_result:
        if not (resulty['Total BoA'] > 0 and resulty['Total Earnings'] > 0):
            continue
        try:
            resulty['BoA to Earnings'] = resulty['Total BoA'] / resulty['Total Earnings']
            if resulty['Total TFP']:
                resulty['BoA to TFP'] = resulty['Total BoA'] / resulty['Total TFP']
            else:
                # No TFP this month: keep the key (as None) so every row has
                # the same columns, and still compute the deductions below.
                logging.warning("Zero Division Error")
                resulty['BoA to TFP'] = None
            resulty['Total Deductions'] = resulty['Gross Pay'] - resulty['Total Earnings']
        except Exception:
            # Best effort: keep the row with whatever was computed so far.
            logging.warning('Unable to calculate totals. Something went wrong.', exc_info=True)
        result.append(resulty)

    # Logged after the loop so the count reflects the rows actually returned.
    logging.debug('Length of calculate_result results:%s', len(result))
    logging.debug('calculate_result complete')
    return result
def _as_amount(value):
    """Convert a number or a string such as ``'1,234.56'`` to float."""
    if isinstance(value, str):
        return float(value.replace(',', ''))
    return float(value)


def consolodateMonths(list_results):
    """Combine individual paychecks into twelve monthly totals.

    Args:
        list_results: List of paycheck dicts. Each needs ``'Check Date'``
            (datetime), ``'Gross Pay'``, ``'Total TFP'`` and ``'Earnings'``;
            ``'BoA Deposit'`` is optional. Amounts may be numbers or strings
            with thousands separators.

    Returns:
        A list of twelve dicts, January first, each with the keys
        ``'Gross Pay'``, ``'Total TFP'``, ``'Total BoA'``,
        ``'Total Earnings'``, ``'Month'`` (1-12) and ``'Year'``. Months
        without paychecks keep zero totals. An empty input gives an empty
        list.

    Note:
        ``'Year'`` is taken from the first paycheck only. Paychecks from
        different years that share a month number are summed together.
    """
    if not list_results:
        return []

    year = datetime.strftime(list_results[0]['Check Date'], '%Y')
    list_monthdata = [
        {
            'Gross Pay': 0,
            'Total TFP': 0,
            'Total BoA': 0,
            'Total Earnings': 0,
            'Month': month,
            'Year': year,
        }
        for month in range(1, 13)
    ]

    # One pass over the paychecks; each is added to its month's bucket in
    # list order, matching the per-month accumulation order of a month-by-month scan.
    for check in list_results:
        monthdata = list_monthdata[check['Check Date'].month - 1]
        monthdata['Gross Pay'] += _as_amount(check['Gross Pay'])
        monthdata['Total TFP'] += float(check['Total TFP'])
        # The parsers use 0.0 to mean "no deposit line found"; skip it.
        deposit = check.get('BoA Deposit', float(0.0))
        if deposit != float(0.0):
            monthdata['Total BoA'] += _as_amount(deposit)
        monthdata['Total Earnings'] += _as_amount(check['Earnings'])
    return list_monthdata
def parse_pdf(filename):  # Takes only one filename, not a list!
    """Extract the text of one paycheck PDF and run the matching parser.

    The layout is detected from the length of the second extracted text line:
    81 characters selects ``paycheckv2_parser``; 122 or 45 characters select
    ``paycheckv1_parser`` with the corresponding variant string.

    Args:
        filename: Path of a single PDF file.

    Returns:
        The dict produced by the selected parser.

    Raises:
        TypeError: If ``filename`` is a list.
        ValueError: If the file is not a readable PDF, does not have one or
            two pages, or its layout cannot be recognised.
    """
    if isinstance(filename, list):
        raise TypeError("parse_pdf accepts only one input, not a list")

    # Test if file is a valid PDF file
    try:
        reader = PdfReader(filename)
    except PdfReadError as err:
        raise ValueError(filename, "is not a valid PDF file.") from err

    pages = reader.pages
    page_count = len(pages)
    # Report the offending file rather than a loop index, and cover the
    # zero-page case, which previously failed on an unbound variable.
    if page_count > 2:
        raise ValueError('Paycheck', filename, ' appears to be greater than 2 pages. Unable to Parse')
    if page_count == 0:
        raise ValueError('Paycheck', filename, ' contains no pages. Unable to Parse')

    lines = []
    for page in pages:
        lines.extend(page.extract_text().split('\n'))

    # Check for signs of paycheck versioning:
    #   len(lines[1]) == 81         -> post July 2022 (new format)
    #   len(lines[1]) == 122 or 45  -> pre July 2022 (old format)
    # Fewer than two lines counts as length 0 and falls through to the
    # "unable to determine" error instead of raising IndexError.
    marker_length = len(lines[1]) if len(lines) > 1 else 0
    if marker_length == 81:
        result = paycheckv2_parser(lines, filename)
        logging.debug('Using Parser v2.')
    elif marker_length == 122:
        result = paycheckv1_parser(lines, filename, "122")
        logging.debug('Using Parser v1 with line 2 length 122.')
    elif marker_length == 45:
        result = paycheckv1_parser(lines, filename, "45")
        logging.debug('Using Parser v1 with line 2 lenght 45')
    else:
        message = "Unable to determine paycheck format for %s. Check logs for details. Line 2 length: %s" % (
            os.path.basename(filename), str(marker_length))
        logger.warning(message)
        raise ValueError(message)
    logging.debug('ParsePDF Complete')
    return result
# Begin Application
logging.warning('***Starting Application***')
# Get arguments from the command line, check the user's request, and return a
# list of files to process.
files = checkargs(sys.argv)
result = []
for i, file in enumerate(files):
    if os.path.isfile(file):  # Ignore folders such as W2 or other statements
        print(i, file)
        result.append(parse_pdf(file))
logging.debug('results: ' + str(len(result)))
full_result = calculate_result(result)
for i in full_result:
    print(i)
if full_result:
    # The CSV is named after the year recorded on the first row.
    file_name = full_result[0]['Year']
    # Collect every column name in first-seen order so rows that lack a key
    # still line up under the right header (missing cells are left empty).
    fieldnames = list(dict.fromkeys(key for row in full_result for key in row))
    # The with-block closes the file; no explicit close call is needed.
    with open(f'{file_name}.csv', 'w', newline='') as f:
        w = csv.DictWriter(f, fieldnames=fieldnames, restval='')
        w.writeheader()
        w.writerows(full_result)
else:
    logging.warning('No paycheck data to export; CSV file not written.')
|