#!/usr/bin/python3 import mysql.connector import requests from bs4 import BeautifulSoup import urllib.parse import re from sys import exit as exit import json import datetime import custom_email from tabulate import tabulate from configparser import ConfigParser from os import path import logging ### TO DO ### # # email results # allow this script to be called and work by itself (if __name__ == __main__) # Print useful reports (land only, house and land, etc) # Check if db entries no longer appear online (mark expired) # When checking online from various sites, check if address already exists in db # - if so, warn user and do not add # Add date_added to initial entries # Check results against database for changes # - update and add/change date_modified # Add argument to run update query when results.py is calles # Add database column to hold parcel number. Make links to GIS servers # # IDENTIFY NEW PROPERTIES!! # # Automate db opening and closing when calling dbinsert() # ############# class Property: """Description of a proerty""" def __init__ (self, site_name, type, MLS, address, city, st, zip, \ county, price, acres, title='', sqft=0, bedrooms=0, baths=0, description='', link=''): self.site_name = site_name self.type = type self.MLS = MLS self.title = title self.sqft = sqft self.bedrooms = bedrooms self.baths = baths self.address = address self.city = city self.st = st self.zip = zip self.county = county self.price = price self.acres = acres self.description = description self.link = link class Search: """Universal Search Criteria""" def checktype(self, attribute): if not attribute == 'None': return attribute else: return '' # def __init__(self, county: list, lower_price=0, upper_price=500000, \ # lower_acres=5, upper_acres=15, type=['farm','land','home'], lower_sqft='', upper_sqft='', \ # lower_bedrooms='', upper_bedrooms=''): def __init__(self, file = 'landsearch.conf'): self.file = file if not path.exists(self.file): raise FileNotFoundError("The config file cannot be opened", self.file) try: config = ConfigParser() config.read(self.file) search_params = config['Search'] log_params = config['Logging'] except FileNotFoundError as err: print(err, "Using default search parameters.") except Exception as err: print(err, "Using default search parameters.") logging.basicConfig(filename=log_params.get('log_file'), \ level=log_params.get('logging_level', 30), \ format='%(asctime)s %(levelname)-8s %(message)s', \ datefmt='%Y-%m-%d %H:%M:%S') ## Default log level WARNING (30) logging.getLogger("urllib3").setLevel(logging.WARNING) ## Supress Requests method logging logging.debug("Log level set to %s", logging.root.level) county = search_params.get('county', ['Gwinnett', 'Hall', 'Jackson', 'Walton', 'Barrow']) if isinstance(county, str): county = county.split(", ") type = search_params.get('type', ['farm', 'house', 'land']) if isinstance(type, str): type = type.split(", ") self.types=['land', 'farm', 'home', 'house'] self.county = county self.lower_price = self.checktype(search_params.get('lower_price', 0)) self.upper_price = self.checktype(search_params.get('upper_price', 525000)) self.lower_acres = self.checktype(search_params.get('lower_acres', 5)) self.upper_acres = self.checktype(search_params.get('upper_acres', 15)) self.type = type ##accept list! self.lower_sqft = self.checktype(search_params.get('lower_sqft', '')) self.upper_sqft = self.checktype(search_params.get('upper_sqft', '')) self.lower_bedrooms = self.checktype(search_params.get('lower_bedrooms', '')) self.upper_bedrooms = self.checktype(search_params.get('upper_bedrooms', '')) # self.lower_price = search_params.get('lower_price', 0) # self.upper_price = search_params.get('upper_price', 525000) # self.lower_acres = search_params.get('lower_acres', 5) # self.upper_acres = search_params.get('upper_acres', 15) # self.lower_sqft = search_params.get('lower_sqft', '') # self.upper_sqft = search_params.get('upper_sqft', '') # self.lower_bedrooms = search_params.get('lower_bedrooms', '') # self.upper_bedrooms = search_params.get('upper_bedrooms', '') for property_type in self.type: assert property_type in self.types, ("Unknown type '" + property_type + "'. Property Type must be of type: " + str(self.types)) ## FOR TESTING, PRINT ALL ATTRIBUTES OF SEARCH ## logging.debug(vars(self)) class ImproperSearchError(Exception): def __init__ (self, search, message="Improper Search. Must use instance of Search class"): self.search = search self.message = message super().__init__(self.message) class MLSDATA: """Fetches and stores MLS Data Currently only supports GeorgiaMLS.com (GMLS)""" counties=['Gwinnett', 'Barrow', 'Hall', 'Jackson', 'Walton'] GoogleAPIKey = 'AIzaSyAXAnpBtjv760W8YIPqKZ0dFXpwAaZN7Es' live_google = True def __init__ (self, mlstype): self.help = "This is a class that will retrieve MLS data from various sources, store the info in a database, and run queries on the data." self.mlstype = mlstype.lower() ## Determines what kind of data is to be retreieve (gmls, Zillow, etc) self.cursor = '' self.cnx = '' self.new_listings = [] def stringbuilder(self, search: Search, county): """ Takes Search class and build appropriate URL query based on mlstype. Currently only supports gmls.""" if self.mlstype == 'gmls': base_addr = 'https://www.georgiamls.com/real-estate/search-action.cfm?' params = [('cnty', county), \ ('lpl', search.lower_price), ('lph', search.upper_price), \ ('acresL', search.lower_acres), ('acresH', search.upper_acres), \ ('sqftl', search.lower_sqft), ('sqfth', search.upper_sqft), \ ('orderBy', 'b'), \ ('scat', '1'), \ ('sdsp', 'g')] for type in search.type: if 'land' in type.lower(): params.append(('typ', 'll')) if 'farm' in type.lower(): params.append(('typ', 'af')) if 'home' in type.lower(): params.append(('typ', 'sd')) if 'house' in type.lower(): params.append(('typ', 'sd')) search_string = base_addr + urllib.parse.urlencode(params) print(search_string) return search_string def break_address(self, address): """Takes an address string in the form 'street address|city, state zip' and returns a list""" street = address[:address.find('|')] csz = address[address.find('|')+1:] city = csz[:csz.find(',')] st = csz[csz.find(',')+1:].split(' ')[1] zip = csz[csz.find(',')+1:].split(' ')[2] split_address = [street, city, st, zip] return split_address def gmlsparser(self, URL, county, pages=''): """ Retrieve the website for georgiamls.com and returns a list of Property objects. UNIQUE TO GEORGIAMLS.COM ONLY!!""" properties_list = [] r = requests.get(URL) soup = BeautifulSoup(r.content, 'html5lib') if pages == '': try: pages = soup.find("div", {'class':'small listing-pagination-count'}).getText().strip().split(" ")[-1] current_page = soup.find("div", {'class':'small listing-pagination-count'}).getText().strip().split(" ")[-3] except AttributeError as err: print("No Results Found.") return else: print('pages already set to: ' + str(pages)) for page in range(0, int(pages)): print('Processing Page: ' + str(page + 1) + ' of ' + str(pages)) if not page == 0: next_URL = URL + '&start=' + str(((12*page)+1)) soup = BeautifulSoup(requests.get(next_URL).content, 'html5lib') raw_listings = soup.findAll("div", {'class':'col-xs-12 col-sm-6 col-lg-4 text-center listing-gallery'}) for listing in raw_listings: items = listing.findAll("p") ## site_name = self.mlstype MLS = " ".join(items[3].getText().strip()[6:15].split()) ## MLS NUMBER title = '' ## Listing Title (address if no title) price = items[0].string.strip() ## Price if self.mlstype == 'gmls': link = 'https://www.georgiamls.com' + listing.a['href'] detail_request = requests.get(link) detail_soup = BeautifulSoup(detail_request.content, 'html5lib') details = detail_soup.findAll('tr') bedbath = details[1].findAll('td')[1].getText().strip().split('/') br = bedbath[0][:-3] ba = bedbath[1][:-3] baths = ba ## IF House is present bedrooms = br ## IF House is present address = '' for element in details: if 'sqft' in element.getText(): sqft = element.findAll('td')[1].getText().strip()[:-5].replace(',','') if 'lot size' in element.getText().lower(): acres = element.findAll('td')[1].getText().strip()[:-6] if 'Property Type' in element.getText(): ptype = element.findAll('td')[1].getText().strip() if 'acreage' in ptype.lower(): type = 'af' elif 'land lot' in ptype.lower(): type = 'll' elif 'single family home' in ptype.lower(): type = 'sf' else: type = 'unknown' if 'Address' in element.getText(): if not address: #Prevents finding the word 'address' elsewhere in the listings address = element.findAll('td')[1] #7 print("TEST ADDRESS: ", element) street_address = list(address)[0].strip() csz = list(address)[2].strip() split_address = self.break_address(street_address + '|' + csz) description = detail_soup.find('div', {'id':'listing-remarks'}).getText().strip().replace('\t','') data = Property(site_name = self.mlstype, \ type = type, \ MLS = MLS, \ bedrooms = bedrooms, \ baths = baths, \ sqft = sqft, \ address = split_address[0], \ city = split_address[1].title(), \ st = split_address[2].upper(), \ zip = split_address[3], \ county = county.title(), \ price = price.replace('$','').replace(',',''), \ acres = acres, \ description = description, \ link = link) properties_list.append(data) print('Scanned: ' + data.address) return properties_list def getmlsdata(self, search: Search, county): """This is the main entrypoint. Takes arguments to pass to stringbuilder to create the URL. Selects appropriate parser based on self.mlstype from class intance. Needs any modifications from the standard search ($0 to $500,000, 5 to 15 acres, etc) See class search for more information. --> 9/1/20 - takes Search class as argument. All properties are handled by the class <--""" if isinstance(search, Search): ## # PROGRAM BREAKS HERE - Used to loop for each county, not Search class contains list of counties. Need to automate looping. ## if not county in self.counties: ### FIX for lower() print("County " + county + " not regognized. Exiting") else: print("Scanning for results in " + county + " using the " + self.mlstype.upper() + " database.") if self.mlstype == 'gmls': list = self.gmlsparser(self.stringbuilder(search, county), county) return list else: raise ImproperSearchError(search) def checkdb(self, criteria_dict): """Check dictionary of critera against database. Currently accepts keys: MLS, title, address (street number/name, not city/state/zip). Returns True if records exists.""" if self.cursor: ## Check if DB is connected for criteria in criteria_dict: ## Determine criteria passed, and execute queries for each if criteria == 'MLS': self.cursor.execute("SELECT COUNT(*) FROM properties WHERE MLS = %(MLS)s GROUP BY id", {criteria:criteria_dict[criteria]}) if self.cursor.rowcount > 0: return self.cursor.rowcount # stop for loop if match already found. elif criteria == 'title': self.cursor.execute("SELECT COUNT(*) FROM properties WHERE title = %(title)s GROUP BY id", {criteria:criteria_dict[criteria]}) if self.cursor.rowcount > 0: return self.cursor.rowcount # stop for loop if match already found. elif criteria == 'address': self.cursor.execute("SELECT COUNT(*) FROM properties WHERE address = %(address)s GROUP BY id", {criteria:criteria_dict[criteria]}) if self.cursor.rowcount > 0: return self.cursor.rowcount # stop for loop if match already found. else: print("Cannot search on parameter: " + criteria) return self.cursor.rowcount else: print("Database is not connected or cursor not filled. Use function 'connectdb()' to establish") def getGoogle(self, property): """Supplies date from Google Distance Matrix API to populate distance_to_work time_to_work distance_to_school time_to_school Costs money, so it should only be called when inserting a new db record. Returns distance in METERS (1m = 0.000621371 mi) and time in SECONDS returns fully populated Propery object.""" print("Fetching live Google Data. $$") # Build Request destination1 = 'Hebron Christian Acadamy' ## Working query for Hebron Christian Acadamy destination2 = 'JHRJ+FJ Atlanta, Georgia' ## Plus code for Hourly parking at Int'l Terminal, KATL params = {} params['units'] = 'imperial' params['origins'] = property.address + ', ' + property.city + ' ' + property.st params['destinations'] = 'Hebron Christian Acadamy|JHRJ+FJ Atlanta, Georgia' params['key'] = self.GoogleAPIKey baseURL = 'https://maps.googleapis.com/maps/api/distancematrix/json?' API_URL = baseURL + urllib.parse.urlencode(params) # print(API_URL) # Send Request and capture result as json try: google_result = requests.get(API_URL).json() if google_result['status'] == 'OK': property.distance_to_school = google_result['rows'][0]['elements'][0]['distance']['value'] property.time_to_school = google_result['rows'][0]['elements'][0]['duration']['value'] property.distance_to_work = google_result['rows'][0]['elements'][1]['distance']['value'] property.time_to_work = google_result['rows'][0]['elements'][1]['duration']['value'] except: print("ERROR: Failed to obtain Google API data") #Load sample data for testing: # with open('complex.json') as f: # data = json.load(f) # google_result = data ### end testing json ### def insertrecord(self, property, work_address=None, school_address=None): """Inserts record into database. Takes argument Property class object. FUTURE - add date_added field to insert operation.""" if self.cursor: criteria_dict = property.__dict__ criteria_dict['Date_Added'] = str(datetime.date.today()) placeholder_columns = ", ".join(criteria_dict.keys()) placeholder_values = ", ".join([":{0}".format(col) for col in criteria_dict.keys()]) qry = "INSERT INTO properties ({placeholder_columns}) VALUES {placeholder_values}".format(placeholder_columns=placeholder_columns, placeholder_values=tuple(criteria_dict.values())) self.cursor.execute(qry) self.cnx.commit() print("Inserted " + criteria_dict['MLS'] + " | " + criteria_dict['address'] + " into database.") else: print("Database is not connected or cursor not filled. Use function 'connectdb()' to establish") def connectdb(self, host='192.168.100.26', user='landsearchuser', password='1234', database='landsearch'): """Connects to database and returns a cursor object""" self.cnx = mysql.connector.connect(host=host, user=user, password=password, database=database, buffered=True) self.cursor = self.cnx.cursor() return self.cursor def closedb(self): """Cleanly close the db.""" self.cursor.close() self.cnx.close() def dbinsert(self, properties: list): """Inserts records into database. Takes list of Property class objects""" if not properties == None: if not isinstance(properties, list): raise TypeError('type list required') for property in properties: if not self.checkdb({'MLS': property.MLS, 'address': property.address}): if self.live_google: self.getGoogle(property) ## <- This will populate distance and time fields if set TRUE self.insertrecord(property) self.new_listings.append(property) else: print(property.MLS + ' | ' + property.address + ' is already in db. Not inserted.') ##REMOVE FOR TESTING### # self.new_listings.append(property) ####################### else: print("Empty dataset. No records to insert.") def alerts(self): pass def email(self): body = '' data = [] subj = "New Real Estate Listings for " + str(datetime.date.today()) for listing in self.new_listings: row = [] body += listing.MLS + " | " + listing.address + " | " + listing.acres + " | " + listing.price + " | " + listing.link + "\n" row.append(listing.MLS) row.append(listing.address) row.append('{:0,.2f}'.format(float(listing.acres))) row.append(listing.sqft) row.append('${:0,.0f}'.format(int(listing.price))) row.append(listing.time_to_school/60 if hasattr(listing, 'time_to_school') else 'NA') row.append(listing.link) data.append(row) body = """\ Daily Real Estate Search Report\n The following properties have been found which may be of interest.\n """ results = tabulate(data, headers=['MLS', 'Address', 'Acres', 'sqft', 'Price', 'Time to School', 'link']) body += results sendto = ['stagl.mike@gmail.com', 'M_Stagl@hotmail.com'] mymail = custom_email.simplemail(subj, body, sendto) if len(self.new_listings) > 0: try: mymail.sendmail() except Exception as e: print("Error sending email. " + e) else: print("No new listings. Email not sent") # REMOVE AFTER TESTING # mymail.sendmail() ######################## ########### BEGIN CODE ###############33 if __name__ == '__main__': gmls = MLSDATA('GMLS') # Create MLSDATA object mysearch = Search() # Create a custom search object # print(len(mysearch.county)) # print(mysearch.county[0]) myresults = [] ## Create function in MLSDATA module: # - takes counties from configparser and calls getmlsdata for each county. # - Compiles results into single list and returns that list # - User code would look something like this: # _ mysearch = Search() # _ mydata = gmls.findalllistings(mysearch) # This would control the looping of counties and return a list like normal # _ gmls.dbinsert(myresults) # This would automate db opening and closing for county in mysearch.county: print("local search: ", county) mysearch = Search() ## Search used to take county as parameter, so this loop would work. Now Search class contains list. loop must occur in getmlsdata module mydata = gmls.getmlsdata(mysearch, county) if mydata: #Avoids a crash is there is no data for listing in mydata: myresults.append(listing) # print(len(myresults)) # print(myresults[0].address) gmls.connectdb() gmls.dbinsert(myresults) gmls.closedb() # # gmls.email() # #print() #print(str(len(gmls.new_listings)) + " new properties found!") #print() #for listing in gmls.new_listings: # print(listing.MLS, listing.address) # gmls = MLSDATA('GMLS') # # #new_properties = [] # ## for county in ['Jackson']: ### FIX # for county in gmls.counties: ### FIX # mysearch = Search(county, type=['farm', 'house', 'land'], upper_price=525000) ### FIX # mydata = gmls.getmlsdata(mysearch) # # gmls.connectdb() # gmls.dbinsert(mydata) # gmls.closedb() # # gmls.email() # #print() #print(str(len(gmls.new_listings)) + " new properties found!") #print() #for listing in gmls.new_listings: # print(listing.MLS, listing.address) # #