| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- #!/usr/bin/python3
- import mysql.connector
- import requests
- from bs4 import BeautifulSoup
- import urllib.parse
- import datetime
- import custom_email
- from tabulate import tabulate
- from configparser import ConfigParser
- from os import path
- import logging
- from email.mime.text import MIMEText
- ### TO DO ###
- #
- # Check if db entries no longer appear online (mark expired)
- # When checking online from various sites, check if address already exists in db
- # - if so, warn user and do not add
- # Check results against database for changes
- # - update and add/change date_modified
- # Add database column to hold parcel number. Make links to GIS servers
- #
- #############
class Property:
    """Description of a property (one real-estate listing).

    A plain attribute container: every constructor argument is stored
    unchanged on the instance under the same name.  ``title``, ``sqft``,
    ``bedrooms``, ``baths``, ``description`` and ``link`` are optional.
    """

    def __init__(self, site_name, type, MLS, address, city, st, zip, county, price, acres, title='', sqft=0,
                 bedrooms=0, baths=0, description='', link=''):
        # Fill the instance dictionary in a single step.  The keyword order
        # below reproduces the historical attribute order, which matters to
        # code that iterates over the instance's __dict__.
        vars(self).update(
            site_name=site_name,
            type=type,
            MLS=MLS,
            title=title,
            sqft=sqft,
            bedrooms=bedrooms,
            baths=baths,
            address=address,
            city=city,
            st=st,
            zip=zip,
            county=county,
            price=price,
            acres=acres,
            description=description,
            link=link,
        )
class Parameters:
    """Parameters taken from the INI-style config file.

    Attributes:
        file: Path of the config file (``file`` joined onto this module's directory).
        config: The ``ConfigParser`` the file was read into.
        search_params: Section proxy for ``[Search]``.
        log_params: Section proxy for ``[Logging]``.
        email_params: Section proxy for ``[Email]``.

    Raises:
        FileNotFoundError: If the config file does not exist.
    """

    # Sections read by the rest of the program.  Each one is guaranteed to
    # exist (possibly empty) once __init__ has finished.
    _SECTIONS = ('Search', 'Logging', 'Email')

    def __init__(self, file='landsearch.conf'):
        # Local import: the module only imports ConfigParser at file level.
        from configparser import Error as ConfigError

        self.file = path.join(path.dirname(__file__), file)
        if not path.exists(self.file):
            raise FileNotFoundError("The config file cannot be opened", self.file)

        self.config = ConfigParser()
        try:
            self.config.read(self.file)
        except (ConfigError, UnicodeDecodeError) as err:
            # Unparseable file: carry on with whatever was read so callers
            # fall back to their own defaults.
            print(err, "Using default search Parameters")

        # A missing section used to leave the *_params attributes unset, so
        # the promised "defaults" ended in AttributeError later on.  Creating
        # the section empty lets every `.get(key, fallback)` call work.
        for section in self._SECTIONS:
            if not self.config.has_section(section):
                print(repr(section), "Using default search Parameters")
                self.config.add_section(section)

        self.search_params = self.config['Search']
        self.log_params = self.config['Logging']
        self.email_params = self.config['Email']
class Mylogger:
    """Logging settings for this session, read from the [Logging] config section.

    The values used to be computed into locals and thrown away, which made
    the class a no-op; they are now kept on the instance.

    Attributes:
        filename: Value of ``log_file`` in the config, or None when absent.
        level: Numeric log level from ``logging_level`` (default 30, WARNING).
        format: Log record format string.
        datefmt: Timestamp format string.
    """

    def __init__(self):
        log_params = Parameters().log_params
        self.filename = log_params.get('log_file')
        self.level = int(log_params.get('logging_level', '30'))
        self.format = '%(asctime)s %(levelname)-8s %(message)s'
        self.datefmt = '%Y-%m-%d %H:%M:%S'
class Search:
    """Universal search criteria, loaded from the [Search] config section.

    Attributes:
        types: Property types this class accepts.
        county: List of county names to search.
        type: List of requested property types (each must be in ``types``).
        lower_price / upper_price, lower_acres / upper_acres,
        lower_sqft / upper_sqft, lower_bedrooms / upper_bedrooms:
            Range limits as strings; '' means "no limit".
    """

    @staticmethod
    def _split_list(value):
        """Return *value* as a list.

        Strings are split on commas with surrounding whitespace removed and
        empty items dropped; anything else is returned unchanged.
        """
        if isinstance(value, str):
            return [item.strip() for item in value.split(',') if item.strip()]
        return value

    def checktype(self, attribute):
        """Fixes string None in config file and converts to ''."""
        return '' if attribute == 'None' else attribute

    def __init__(self, file='landsearch.conf'):
        """Read the search criteria and configure logging.

        Args:
            file: Config file name, passed on to ``Parameters``.

        Raises:
            AssertionError: If a configured property type is not in ``types``.
        """
        # `file` used to be ignored; honour it so callers can select a config.
        params = Parameters(file)
        search_params = params.search_params
        log_params = params.log_params
        logging.basicConfig(filename=log_params.get('log_file'),
                            level=int(log_params.get('logging_level', '30')),  # default WARNING (30)
                            format='%(asctime)s %(levelname)-8s %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S')
        logging.getLogger("urllib3").setLevel(logging.WARNING)  # suppress Requests method logging
        logging.debug("Log level set to %s", logging.root.level)
        Mylogger()

        counties = self._split_list(
            search_params.get('county', ['Gwinnett', 'Hall', 'Jackson', 'Walton', 'Barrow']))
        requested_types = self._split_list(search_params.get('type', ['farm', 'house', 'land']))

        self.types = ['land', 'farm', 'home', 'house']
        self.county = counties
        self.lower_price = self.checktype(search_params.get('lower_price', '0'))
        self.upper_price = self.checktype(search_params.get('upper_price', '525000'))
        self.lower_acres = self.checktype(search_params.get('lower_acres', '5'))
        self.upper_acres = self.checktype(search_params.get('upper_acres', '15'))
        self.type = requested_types  # accepts a list
        self.lower_sqft = self.checktype(search_params.get('lower_sqft', ''))
        self.upper_sqft = self.checktype(search_params.get('upper_sqft', ''))
        self.lower_bedrooms = self.checktype(search_params.get('lower_bedrooms', ''))
        self.upper_bedrooms = self.checktype(search_params.get('upper_bedrooms', ''))

        for property_type in self.type:
            if property_type not in self.types:
                # Raised explicitly (not via `assert`) so the check still runs
                # under `python -O`; the exception type is kept for callers.
                raise AssertionError(
                    "Unknown type '" + property_type + "'. Property Type must be of type: " + str(self.types))

        # For testing: log all attributes of this search.
        logging.info(vars(self))
class ImproperSearchError(Exception):
    """Raised when something other than a Search instance is used as a search.

    Attributes:
        search: The offending object that was passed in.
        message: Human-readable explanation; also the exception's text.
    """

    def __init__(self, search, message="Improper Search. Must use instance of Search class"):
        super().__init__(message)
        self.message = message
        self.search = search
class MLSDATA:
    """Fetches and stores MLS Data.

    Scrapes listings, stores new ones in the ``properties`` MySQL table and
    e-mails a report of what was added.
    Currently only supports GeorgiaMLS.com (GMLS).
    """

    counties = ['Gwinnett', 'Barrow', 'Hall', 'Jackson', 'Walton']
    # NOTE(review): this API key is committed to source control.  It is kept
    # because the attribute is part of the class interface; rotate the key and
    # load it from landsearch.conf or the environment.
    GoogleAPIKey = 'AIzaSyAXAnpBtjv760W8YIPqKZ0dFXpwAaZN7Es'
    # live_google = False

    # Seconds to wait on any single HTTP request before giving up.
    _REQUEST_TIMEOUT = 30

    # (keyword looked for in a requested type, GMLS 'typ' query code).
    _GMLS_TYPE_CODES = (('land', 'll'), ('farm', 'af'), ('home', 'sd'), ('house', 'sd'))

    # Existence queries per supported criterion.  GROUP BY id makes the result
    # hold one row per matching record, so cursor.rowcount is the match count.
    _CHECK_QUERIES = {
        'MLS': "SELECT COUNT(*) FROM properties WHERE MLS = %(MLS)s GROUP BY id",
        'title': "SELECT COUNT(*) FROM properties WHERE title = %(title)s GROUP BY id",
        'address': "SELECT COUNT(*) FROM properties WHERE address = %(address)s GROUP BY id",
    }

    def __init__(self, mlstype):
        """Read the config and prepare an unconnected instance.

        Args:
            mlstype: Which data source to use ('gmls'); stored lower-cased.
        """
        self.parameters = Parameters()
        self.help = "This is a class that will retrieve MLS data from various sources, store the info in a database, and run queries on the data."
        self.mlstype = mlstype.lower()  # kind of data to retrieve (gmls, Zillow, etc)
        self.cursor = ''  # '' means "not connected"
        self.cnx = ''
        self.new_listings = []
        self.email = self.parameters.search_params.getboolean('email')
        self.live_google = self.parameters.search_params.getboolean('live_google')
        # A missing 'recipients' option yields no recipients instead of KeyError.
        self.recipients = self.parameters.email_params.get('recipients', '')
        print('Email ' + str(self.email))

    def stringbuilder(self, search: "Search", county):
        """Build the search URL for *county* from a Search object.

        Args:
            search: Object carrying the price/acre/sqft limits and ``type`` list.
            county: County name placed in the ``cnty`` query parameter.

        Returns:
            The complete GeorgiaMLS search URL.

        Raises:
            ValueError: If ``self.mlstype`` is not 'gmls' (the only supported source).
        """
        if self.mlstype != 'gmls':
            raise ValueError("Unsupported MLS type: " + self.mlstype)

        base_addr = 'https://www.georgiamls.com/real-estate/search-action.cfm?'
        params = [('cnty', county),
                  ('lpl', search.lower_price), ('lph', search.upper_price),
                  ('acresL', search.lower_acres), ('acresH', search.upper_acres),
                  ('sqftl', search.lower_sqft), ('sqfth', search.upper_sqft),
                  ('orderBy', 'b'),
                  ('scat', '1'),
                  ('sdsp', 'g')]

        # Collect each type code once: 'home' and 'house' share a code and
        # would otherwise put the same parameter in the URL twice.
        type_codes = []
        for requested in search.type:
            lowered = requested.lower()
            for keyword, code in self._GMLS_TYPE_CODES:
                if keyword in lowered and code not in type_codes:
                    type_codes.append(code)
        params.extend(('typ', code) for code in type_codes)

        search_string = base_addr + urllib.parse.urlencode(params)
        print(search_string)
        logging.debug(search_string)
        return search_string

    def break_address(self, address):
        """Split 'street address|city, state zip' into [street, city, state, zip].

        Whitespace around the parts is ignored.  State and zip come back as
        '' when the text after the comma does not contain them.
        """
        street, _, city_state_zip = address.partition('|')
        city, _, state_zip = city_state_zip.partition(',')
        tokens = state_zip.split()
        state = tokens[0] if tokens else ''
        zip_code = tokens[1] if len(tokens) > 1 else ''
        return [street.strip(), city.strip(), state, zip_code]

    def _parse_listing(self, listing, county):
        """Build a Property from one search-result card plus its detail page."""
        items = listing.findAll("p")
        MLS = " ".join(items[3].getText().strip()[6:15].split())  # MLS number
        price = items[0].string.strip()

        # Per-listing defaults.  Without them a detail page lacking a row
        # (e.g. no sqft on a land lot) reused the previous listing's value or
        # raised NameError on the first listing.
        bedrooms = 0
        baths = 0
        sqft = 0
        acres = 0
        type_code = 'unknown'
        split_address = ['', '', '', '']
        description = ''
        link = ''

        if self.mlstype == 'gmls':
            link = 'https://www.georgiamls.com' + listing.a['href']
            detail_soup = BeautifulSoup(requests.get(link, timeout=self._REQUEST_TIMEOUT).content, 'html5lib')
            details = detail_soup.findAll('tr')

            # Second table row holds "<beds> br/<baths> ba"; the slices drop
            # the 3-character unit suffix.
            bedbath = details[1].findAll('td')[1].getText().strip().split('/')
            bedrooms = bedbath[0][:-3]
            if len(bedbath) > 1:
                baths = bedbath[1][:-3]

            address_cell = None
            for row in details:
                row_text = row.getText()
                if 'sqft' in row_text:
                    sqft = row.findAll('td')[1].getText().strip()[:-5].replace(',', '')
                if 'lot size' in row_text.lower():
                    acres = row.findAll('td')[1].getText().strip()[:-6]
                if 'Property Type' in row_text:
                    ptype = row.findAll('td')[1].getText().strip().lower()
                    if 'acreage' in ptype:
                        type_code = 'af'
                    elif 'land lot' in ptype:
                        type_code = 'll'
                    elif 'single family home' in ptype:
                        type_code = 'sf'
                    else:
                        type_code = 'unknown'
                # Only the first 'Address' row counts; the word can appear
                # again elsewhere in the listing.
                if 'Address' in row_text and address_cell is None:
                    address_cell = row.findAll('td')[1]
                    pieces = list(address_cell)  # street text, <br>, "city, st zip" text
                    split_address = self.break_address(pieces[0].strip() + '|' + pieces[2].strip())

            remarks = detail_soup.find('div', {'id': 'listing-remarks'})
            if remarks is not None:
                description = remarks.getText().strip().replace('\t', '')

        return Property(site_name=self.mlstype,
                        type=type_code,
                        MLS=MLS,
                        bedrooms=bedrooms,
                        baths=baths,
                        sqft=sqft,
                        address=split_address[0],
                        city=split_address[1].title(),
                        st=split_address[2].upper(),
                        zip=split_address[3],
                        county=county.title(),
                        price=price.replace('$', '').replace(',', ''),
                        acres=acres,
                        description=description,
                        link=link)

    def gmlsparser(self, URL, county, pages=''):
        """Retrieve the georgiamls.com results at *URL* as a list of Property objects.

        UNIQUE TO GEORGIAMLS.COM ONLY!!

        Args:
            URL: Search URL, normally produced by ``stringbuilder``.
            county: County name recorded on every Property.
            pages: Number of result pages; '' reads it from the first page.

        Returns:
            List of Property objects; empty when the search has no results.
        """
        properties_list = []
        soup = BeautifulSoup(requests.get(URL, timeout=self._REQUEST_TIMEOUT).content, 'html5lib')
        if pages == '':
            counter = soup.find("div", {'class': 'small listing-pagination-count'})
            if counter is None:
                print("No Results Found.")
                return properties_list
            pages = counter.getText().strip().split(" ")[-1]
        else:
            print('pages already set to: ' + str(pages))

        for page in range(int(pages)):
            print('Processing Page: ' + str(page + 1) + ' of ' + str(pages))
            if page:
                # Results are paged 12 per page and addressed by 1-based offset.
                next_URL = URL + '&start=' + str((12 * page) + 1)
                soup = BeautifulSoup(requests.get(next_URL, timeout=self._REQUEST_TIMEOUT).content, 'html5lib')
            raw_listings = soup.findAll("div", {'class': 'col-xs-12 col-sm-6 col-lg-4 text-center listing-gallery'})
            for listing in raw_listings:
                data = self._parse_listing(listing, county)
                properties_list.append(data)
                logging.debug('Scanned: ' + data.address)
                print('Scanned: ' + data.address)
        return properties_list

    def getmlsdata(self, search: "Search", county):
        """Main entry point: scan one county and return its listings.

        Args:
            search: A Search instance holding the criteria.
            county: County to scan; matched against ``counties`` ignoring case.

        Returns:
            List of Property objects (possibly empty), or None when the county
            is not recognized or ``mlstype`` has no parser.

        Raises:
            ImproperSearchError: If *search* is not a Search instance.
        """
        logging.info('getgmlsdata starting.')
        logging.debug('Scanning: ' + county + " county")
        if not isinstance(search, Search):
            raise ImproperSearchError(search)

        known = next((name for name in self.counties if name.lower() == county.lower()), None)
        if known is None:
            print("County " + county + " not regognized. Exiting")
            return None

        print("Scanning for results in " + known + " using the " + self.mlstype.upper() + " database.")
        if self.mlstype != 'gmls':
            return None
        # `or []` keeps len() below safe even if a parser returns None.
        listings = self.gmlsparser(self.stringbuilder(search, known), known) or []
        logging.info(
            "Completed search in " + known + " county. " + str(len(listings)) + " total properties scanned.")
        return listings

    def check_db(self, criteria_dict):
        """Check a dictionary of criteria against the database.

        Accepts keys: MLS, title, address (street number/name, not
        city/state/zip).  Other keys are reported and skipped.

        Returns:
            Number of matching rows for the first criterion that matches, or 0
            when nothing matches or the database cannot be reached.
        """
        if not self.cursor:  # not connected yet
            try:
                self.connect_db()
                logging.debug("No Database Connection. Connecting to DB in check_db function.")
            except mysql.connector.Error as err:
                print("Could not connect to Database. " + str(err))
                logging.warning("Could not connect to Database. " + str(err))
                return 0

        for criteria, value in criteria_dict.items():
            query = self._CHECK_QUERIES.get(criteria)
            if query is None:
                print("Cannot search on parameter: " + criteria)
                continue
            self.cursor.execute(query, {criteria: value})
            if self.cursor.rowcount > 0:
                return self.cursor.rowcount  # stop at the first match
        # Explicit 0: the cursor's rowcount may be left over from an earlier,
        # unrelated statement when no query ran here.
        return 0

    def get_google(self, property):
        """Populate travel distance/time on *property* from the Google Distance Matrix API.

        Sets distance_to_school, time_to_school, distance_to_work and
        time_to_work.  Distances are in METERS (1m = 0.000621371 mi), times in
        SECONDS.  Costs money, so call it only when inserting a new record.
        On any failure the property is left untouched.

        Returns:
            The same Property object.
        """
        print("Fetching live Google Data. $$")
        logging.warning("Calling Google API. $$")
        destinations = ('Hebron Christian Acadamy',  # working query for the school
                        'JHRJ+FJ Atlanta, Georgia')  # plus code: hourly parking, Int'l Terminal, KATL
        params = {
            'units': 'imperial',
            'origins': property.address + ', ' + property.city + ' ' + property.st,
            'destinations': '|'.join(destinations),
            'key': self.GoogleAPIKey,
        }
        baseURL = 'https://maps.googleapis.com/maps/api/distancematrix/json?'
        API_URL = baseURL + urllib.parse.urlencode(params)
        try:
            google_result = requests.get(API_URL, timeout=self._REQUEST_TIMEOUT).json()
            if google_result['status'] == 'OK':
                school, work = google_result['rows'][0]['elements'][:2]
                # Read all four values before assigning any, so a partial
                # response cannot leave the property half-populated.
                values = (school['distance']['value'], school['duration']['value'],
                          work['distance']['value'], work['duration']['value'])
                (property.distance_to_school, property.time_to_school,
                 property.distance_to_work, property.time_to_work) = values
        except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as err:
            print("ERROR: Failed to obtain Google API data")
            logging.warning("Failed to obtain Google API data. " + str(err))
        return property

    @staticmethod
    def _insert_statement(columns):
        """Return a parameterized INSERT for the given column names."""
        columns = list(columns)
        return "INSERT INTO properties ({0}) VALUES ({1})".format(
            ", ".join(columns), ", ".join("%({0})s".format(col) for col in columns))

    def insertrecord(self, property, work_address=None, school_address=None):
        """Insert one Property object into the database.

        Every instance attribute becomes a column, plus ``Date_Added`` set to
        today.  ``work_address`` and ``school_address`` are accepted for
        compatibility and not used.
        """
        if not self.cursor:
            print("not self.cursor")
            logging.debug("MYSQL connection not established. Trying to connect...")
            try:
                self.connect_db()
                logging.debug("Connecting to DB in insertrecord fucntion.")
            except mysql.connector.Error as err:
                print("Could not connect to Database. " + str(err))
                logging.warning("Could not connect to Database. " + str(err))
                return
        if not self.cursor:
            print("Database is not connected or cursor not filled. Use function 'connectdb()' to establish")
            print(str(self.cursor))
            return

        # Work on a copy so the caller's object is not given a Date_Added attribute.
        record = dict(vars(property))
        record['Date_Added'] = str(datetime.date.today())
        # Values are bound by the driver rather than pasted into the SQL text:
        # scraped descriptions routinely contain quotes.  Column names come
        # from attribute names set in code, not from scraped data.
        qry = self._insert_statement(record)
        try:
            self.cursor.execute(qry, record)
            self.cnx.commit()
            print("Inserted " + record['MLS'] + " | " + record['address'] + " into database.")
            logging.info("Inserted " + record['MLS'] + " | " + record['address'] + " into database.")
        except mysql.connector.Error as e:
            print("Could not insert " + record['address'] + " into database. Database connection error.")
            logging.warning("Could not insert " + record['address'] + "into database. Database connection "
                                                                        "error.")
            logging.warning(str(e))

    def connect_db(self, host='192.168.100.26', user='landsearchuser', password='1234', database='landsearch'):
        """Connect to the database, store connection and cursor, return the cursor.

        NOTE(review): the default credentials are hard-coded; they are kept
        for interface compatibility but should come from the config file.
        """
        self.cnx = mysql.connector.connect(host=host, user=user, password=password, database=database, buffered=True)
        self.cursor = self.cnx.cursor()
        return self.cursor

    def close_db(self):
        """Cleanly close the db.  Safe to call when no connection was opened."""
        if self.cursor:
            self.cursor.close()
        if self.cnx:
            self.cnx.close()
        # Back to the "not connected" markers so later calls reconnect.
        self.cursor = ''
        self.cnx = ''

    def db_insert(self, properties: list):
        """Insert records into the database.  Takes a list of Property objects.

        Listings whose MLS number or address is already stored are skipped;
        the rest are inserted and appended to ``self.new_listings``.

        Raises:
            TypeError: If *properties* is neither None nor a list.
        """
        if properties is None:
            print("Empty dataset. No records to insert.")
        else:
            if not isinstance(properties, list):
                raise TypeError('type list required')
            for property in properties:
                if self.check_db({'MLS': property.MLS, 'address': property.address}):
                    print(property.MLS + ' | ' + property.address + ' is already in db. Not inserted.')
                    continue
                if self.live_google:
                    self.get_google(property)  # populates distance and time fields
                else:
                    print("NOT fetching google data. Suppressed by settings in landsearch.conf")
                    logging.warning("NOT fetching google data for " + property.address + ". Suppressed by "
                                                                                         "settings in "
                                                                                         "landsearch.conf")
                self.insertrecord(property)
                self.new_listings.append(property)
        logging.info("Database Update Complete.")
        logging.info(str(len(self.new_listings)) + " new listings found.")

    def email_results(self):
        """E-mail a report of ``self.new_listings`` to the configured recipients.

        Does nothing but log when e-mail is disabled in the config.  Sends a
        "no new listings" message when the list is empty.
        """
        # NOTE(review): these module globals are kept only because the
        # original published them; nothing in this class reads them back.
        global mymail, html
        html = ''
        sendto = [addr.strip() for addr in self.recipients.split(',') if addr.strip()]

        if not self.email:
            print("Suppressing email based on landsearch.conf preferences.")
            logging.warning("Suppressing email based on landsearch.conf preferences.")
            return

        if self.new_listings:
            from html import escape  # local import; binds only `escape`

            subj = str(len(self.new_listings)) + " New Real Estate Listings for " + str(datetime.date.today())
            data = []
            html_parts = ['<html><head></head><body>',
                          '<p>Daily Real Estate Search Report.</p>',
                          '<p>The following properties have been found which may be of interest:</p>',
                          '<table>']
            for listing in self.new_listings:
                acres = '{:0,.2f}'.format(float(listing.acres))
                price = '${:0,.0f}'.format(int(listing.price))
                # Whole minutes, shown identically in the text and HTML parts.
                if hasattr(listing, 'time_to_school'):
                    school_minutes = str(int(round(listing.time_to_school / 60)))
                else:
                    school_minutes = 'NA'
                data.append([listing.MLS, listing.address, acres, listing.sqft, price, school_minutes,
                             listing.link])
                html_parts.extend([
                    '<tr><td colspan=100%><hr></td></tr>',
                    '<tr><th>MLS</th><td>' + escape(str(listing.MLS)) + '</td></tr>',
                    '<tr><th>Address</th><td>' + escape(str(listing.address)) + '</td></tr>',
                    '<tr><th>Acres</th><td>' + acres + '</td></tr>',
                    '<tr><th>Time To School</th><td>' + school_minutes + '</td></tr>',
                    '<tr><th>Price</th><td>' + price + '</td></tr>',
                    # Quoted and escaped so unusual URLs cannot break the markup.
                    '<tr><th>Link</th><td><a href="' + escape(str(listing.link)) + '">Link</a></td></tr>',
                ])
            html_parts.append('<tr><td colspan=100%><hr></td></tr>')
            html_parts.append('</table></body></html>')
            html = ''.join(html_parts)
            body = ("Daily Real Estate Search Report.\n\n"
                    "The following properties have been found which may be of interest.\n\n"
                    + tabulate(data, headers=['MLS', 'Address', 'Acres', 'sqft', 'Price', 'Time to School',
                                              'link']))
        else:
            body = 'No new listings found'
            html = 'No new listings found.'
            subj = '0 New Real Estate Listings for ' + str(datetime.date.today())

        try:
            mymail = custom_email.simplemail(subj, body, sendto, html=html)
            mymail.sendhtml()
            print("Email sent.")
            logging.info('Emails sent to: ' + str(sendto))
        except Exception as e:  # best effort: a mail failure must not abort the run
            print("Error sending email. " + str(e))
            logging.warning("Error sending email. " + str(e))
if __name__ == '__main__':
    # Script entry point: scan every configured county, store the new
    # listings and e-mail the report.
    gmls = MLSDATA('GMLS')  # Create MLSDATA object
    mysearch = Search()  # Create a custom search object (criteria come from the config file)
    myresults = []
    ## Create function in MLSDATA module:
    # - takes counties from configparser and calls getmlsdata for each county.
    # - Compiles results into single list and returns that list
    # - User code would look something like this:
    # _ mysearch = Search()
    # _ mydata = gmls.findalllistings(mysearch) # This would control the looping of counties and return a list like normal
    # _ gmls.dbinsert(myresults) # This would automate db opening and closing
    for county in mysearch.county:
        print("local search: ", county)
        # The Search object already holds the full county list, so it is
        # reused here; rebuilding it per county only re-read the config file
        # and reconfigured logging.
        mydata = gmls.getmlsdata(mysearch, county)
        if mydata:  # avoids a crash if there is no data
            myresults.extend(mydata)
    try:
        gmls.db_insert(myresults)
    finally:
        # Release the connection opened lazily during the insert, if any.
        if gmls.cursor and gmls.cnx:
            gmls.close_db()
    gmls.email_results()
|