# main.py
import calendar
import csv
import logging
import os.path
import sys
from datetime import datetime

from pypdf import PdfReader
from pypdf.errors import PdfReadError
  9. logger = logging.getLogger(__name__)
  10. ch = logging.StreamHandler()
  11. ch.setLevel(logging.DEBUG)
  12. stream_formatter = logging.Formatter('%(levelname)s - %(message)s')
  13. ch.setFormatter(stream_formatter)
  14. logger.addHandler(ch)
  15. logging.basicConfig(filename='log.txt', encoding='utf-8', format='%(asctime)s:%(levelname)s:%(message)s',
  16. datefmt='%m/%d/%y %H:%M:%S', level=logging.DEBUG)
# class paycheck:
#
#     def __init__(self):
#         # TODO - add variables to be returned or obtained in code
#         pass
#
#     pass
  24. def loadfiles(pointer, folder):
  25. # Take a file or a folder as an input and load all applicable filenames into a list
  26. # return a list
  27. list_files = []
  28. if folder: # input is a folder
  29. logger.debug("Loading folder: %s", os.path.basename(pointer))
  30. for file in os.listdir(pointer):
  31. list_files.append(os.path.join(pointer, file))
  32. elif not folder: # input is a file
  33. list_files.append(pointer)
  34. logger.debug("Loading file: %s", os.path.basename(pointer))
  35. return list_files
  36. def checkargs(args):
  37. list_file = []
  38. list_folder = []
  39. for arg in args[1:]:
  40. if os.path.isfile(arg):
  41. list_file.append(arg)
  42. elif os.path.isdir(arg):
  43. list_folder.append(arg)
  44. if arg == "-debug":
  45. global debug
  46. debug = True
  47. # print(dict_file["folder"])
  48. if not len(list_file) == 1 and not len(list_folder) == 1:
  49. raise Exception("Error parsing filename or folder name.")
  50. if len(list_file) == 1 and len(list_folder) == 1:
  51. raise Exception("Both filename and folder provided.")
  52. if list_file:
  53. files = loadfiles(list_file[0], False)
  54. elif list_folder:
  55. files = loadfiles(list_folder[0], True)
  56. else:
  57. raise Exception("Error sending file or folder to loadfiles.")
  58. logging.debug('checkargs complete with no errors')
  59. return (files)
  60. def paycheckv2_parser(lines, filename):
  61. ### This will parse paychecks using the post Jul 2022 formar
  62. keywords = ['Current', 'BANK OF AMERICA', 'Earnings', 'TFP']
  63. dict_result = {}
  64. dict_tfp = {}
  65. dict_result['Filename'] = filename
  66. dict_result["Check Date"] = datetime.strptime(lines[2].split()[8], "%m/%d/%Y")
  67. dict_result["ParserVersion"] = "V2"
  68. for line in lines:
  69. # print(line)
  70. dict_result["month"] = dict_result["Check Date"].strftime('%m')
  71. if line.find(keywords[0]) == 0:
  72. dict_result["Gross Pay"] = line.split()[1]
  73. if line.find(keywords[1]) >= 0:
  74. dict_result["BoA Deposit"] = line.split()[6]
  75. if line.find(keywords[2]) == 0 and len(line.split()) > 1:
  76. dict_result["Earnings"] = line.split()[1]
  77. if line.find(keywords[3]) >= 0 and not line.find('TFP/Hrs') >= 0:
  78. ## Only displays for second paycheck of the period
  79. ## check if third from last entry is a number of some sort. This shows it is a current pay item
  80. tfp_line = line.split()
  81. if len(tfp_line) >= 5:
  82. if tfp_line[-3].find('.') >= 0 and not tfp_line[-1] == "Taxes":
  83. desc = ' '.join(tfp_line[:-3])
  84. amt = tfp_line[-3][:-6]
  85. dict_tfp[desc] = amt
  86. dict_result["TFP"] = dict_tfp
  87. if tfp_line[-1] == "Taxes" and tfp_line[-4].find('.') >= 0:
  88. desc = tfp_line[0]
  89. amt = tfp_line[-4][:-6]
  90. dict_tfp[desc] = amt
  91. dict_result["TFP"] = dict_tfp
  92. total_tfp = float(0)
  93. for tfp in dict_tfp.items():
  94. total_tfp += float(tfp[1])
  95. dict_result["Total TFP"] = round(total_tfp, 2)
  96. # print(dict_result)
  97. return dict_result
  98. def paycheckv1_parser(lines, filename, linetwo):
  99. # Check Date
  100. # Gross Pay
  101. # BoA Deposit
  102. # earnings
  103. # TFP
  104. dict_result = {}
  105. dict_tfp = {}
  106. dateline_num = 2
  107. grosspayline_num = 4
  108. if linetwo == "45":
  109. dateline_num += 1
  110. grosspayline_num += 1
  111. # print(lines[3])
  112. dateline = datetime.strptime(list(filter(None, lines[dateline_num].split(' ')))[5], '%m/%d/%Y').month
  113. # print(dateline)
  114. earningsline = list(filter(None, lines[grosspayline_num].split(' ')))
  115. # print(earningsline)
  116. # print(filename, earningsline[earningsline.index('=')+1])
  117. dict_result['Filename'] = filename
  118. dict_result["Check Date"] = datetime.strptime(list(filter(None, lines[dateline_num].split(' ')))[5], "%m/%d/%Y")
  119. dict_result['month'] = datetime.strptime(list(filter(None, lines[dateline_num].split(' ')))[5], "%m/%d/%Y").month
  120. dict_result["ParserVersion"] = "V1"
  121. dict_result["Gross Pay"] = list(filter(None, lines[grosspayline_num].split(' ')))[0]
  122. dict_result["Earnings"] = earningsline[earningsline.index('=') + 1]
  123. dict_result["BoA Deposit"] = float(0.0)
  124. start = 0
  125. end = 0
  126. for i, line in enumerate(lines):
  127. # print(line)
  128. if line.find("BANK OF AMERICA") >= 0:
  129. dict_result["BoA Deposit"] = list(filter(None, line.split(' ')))[-1]
  130. if line.find("Gross") >= 0 and line.find("Hrs") >= 0:
  131. # print("START")
  132. start = i
  133. if line.find('Total Gross Pay') >= 0:
  134. # print("END")
  135. end = i
  136. for line in lines[start + 1:end - 1]:
  137. split_line = line.split(' ')
  138. desc = (line[:26].strip())
  139. # print(desc)
  140. dataline = line[26:].split(' ')
  141. tfp = 0
  142. if len(dataline[0]) > 0 and not desc == "Pilot Per Diem DAY":
  143. tfp = float(dataline[0])
  144. # print(desc, tfp)
  145. if tfp > 0:
  146. dict_tfp[desc] = tfp
  147. dict_result["TFP"] = dict_tfp
  148. total_tfp = float(0)
  149. for tfp in dict_tfp.items():
  150. total_tfp += float(tfp[1])
  151. dict_result["Total TFP"] = round(total_tfp, 2)
  152. # print(dict_result)
  153. return dict_result
  154. def calculate_result(list_result):
  155. if not type(list_result) is list:
  156. raise Exception("calculate_result requires a list of dictionaries, even if one item.")
  157. if len(list_result) > 1:
  158. list_result = consolodateMonths(list_result)
  159. result = []
  160. logging.debug('Length of calculate_result results:' + str(len(result)))
  161. for resulty in list_result:
  162. # print(resulty)
  163. if resulty['Total BoA'] > 0 and resulty['Total Earnings'] > 0:
  164. try:
  165. resulty['BoA to Earnings'] = resulty['Total BoA'] / resulty['Total Earnings']
  166. resulty['BoA to TFP'] = resulty['Total BoA'] / resulty['Total TFP']
  167. resulty['Total Deductions'] = resulty['Gross Pay'] - resulty['Total Earnings']
  168. except ZeroDivisionError:
  169. logging.warning("Zero Division Error")
  170. except:
  171. logging.warning('Unable to calculate totals. Something went wrong.')
  172. result.append(resulty)
  173. logging.debug('calculate_result complete')
  174. return(result)
  175. def consolodateMonths(list_results):
  176. # Takes list with all data and builds a new list with all paychecks for each month combined and totaled
  177. list_monthdata = []
  178. # print(list_results)
  179. # print(datetime.strftime(list_results[0]['Check Date'], '%Y'))
  180. for month in range(1, 13):
  181. monthdata = {}
  182. monthdata['Gross Pay'] = 0
  183. monthdata['Total TFP'] = 0
  184. monthdata['Total BoA'] = 0
  185. monthdata['Total Earnings'] = 0
  186. monthdata['Month'] = int(month)
  187. monthdata['Year'] = datetime.strftime(list_results[0]['Check Date'], '%Y')
  188. for i, check in enumerate(list_results):
  189. if int(check['Check Date'].strftime('%m').lstrip('0')) == month:
  190. # print(check)
  191. monthdata['Gross Pay'] += float(check['Gross Pay'].replace(',', ''))
  192. monthdata['Total TFP'] += float(check['Total TFP'])
  193. if check['BoA Deposit'] != float(0.0):
  194. monthdata['Total BoA'] += float(check['BoA Deposit'].replace(',', ''))
  195. monthdata['Total Earnings'] += float(check['Earnings'].replace(',', ''))
  196. list_monthdata.append(monthdata)
  197. # print(list_monthdata)
  198. return list_monthdata
  199. def parse_pdf(filename): # Takes only one fielname, not a list!
  200. if type(filename) == list:
  201. raise Exception("parse_pdf accepts only one input, not a list")
  202. list_result = []
  203. # for file in filename:
  204. templines = []
  205. #print(file)
  206. #Test if file is valid pdf file
  207. try:
  208. reader = PdfReader(filename)
  209. except PdfReadError:
  210. raise Exception(filename, "is not a valid PDF file.")
  211. pages = reader.pages
  212. # print(pages)
  213. for i in range(len(pages)):
  214. templines.append(pages[i].extract_text().split('\n'))
  215. if len(pages) == 1:
  216. lines = templines[0]
  217. elif len(pages) == 2:
  218. lines = templines[0] + templines[1]
  219. else:
  220. raise Exception('Paycheck', i, ' appears to be greater than 2 pages. Unable to Parse')
  221. # If lines[1] == 81, then its post July 2022 (new format)
  222. # If lines[1] == about 121, then is pre July 2022 (old format)
  223. # print(os.path.basename(filename))
  224. # print(len(lines[1]), lines[1])
  225. #
  226. # Check for signs of paycheck versioning
  227. if len(lines[1]) == 81:
  228. result = paycheckv2_parser(lines, filename)
  229. logging.debug('Using Parser v2.')
  230. elif len(lines[1]) == 122:
  231. result = paycheckv1_parser(lines, filename, "122")
  232. logging.debug('Using Parser v1 with line 2 length 122.')
  233. elif len(lines[1]) == 45:
  234. result = paycheckv1_parser(lines, filename, "45")
  235. logging.debug('Using Parser v1 with line 2 lenght 45')
  236. else:
  237. logger.warning("Unable to determine paycheck format for %s. Check logs for details. Line 2 length: %s" % (os.path.basename(filename), str(len(lines[1]))))
  238. raise Exception("Unable to determine paycheck format for %s. Check logs for details. Line 2 length: %s" % (os.path.basename(filename), str(len(lines[1]))))
  239. logging.debug('ParsePDF Complete')
  240. return result
  241. # Begin Application
  242. logging.warning('***Starting Application***')
  243. files = checkargs(sys.argv) # Get Arguments from Command Line, check user's request, and return a list of files to
  244. result = []
  245. for i, file in enumerate(files):
  246. if os.path.isfile(file): # Ignore folders such as W2 or other statements
  247. print(i, file)
  248. result.append(parse_pdf(file))
  249. logging.debug('results: ' +str( len(result)))
  250. full_result = calculate_result(result)
  251. for i in full_result:
  252. print(i)
  253. file_name = full_result[0]['Year']
  254. with open(f'{file_name}.csv', 'w', newline='') as f:
  255. w = csv.writer(f)
  256. w.writerow(full_result[0].keys())
  257. for row in full_result:
  258. w.writerow(row.values())
  259. f.close