| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- #!/usr/bin/python3
- import mysql.connector
- import requests
- from bs4 import BeautifulSoup
- import urllib.parse
- import re
- from sys import exit as exit
- import json
- import datetime
- import custom_email
- from tabulate import tabulate
- from configparser import ConfigParser
- from os import path
- ### TO DO ###
- #
- # email results
- # allow this script to be called and work by itself (if __name__ == __main__)
- # Print useful reports (land only, house and land, etc)
- # Check if db entries no longer appear online (mark expired)
- # When checking online from various sites, check if address already exists in db
- # - if so, warn user and do not add
- # Add date_added to initial entries
- # Check results against database for changes
- # - update and add/change date_modified
- # Add argument to run update query when results.py is called
- # Add database column to hold parcel number. Make links to GIS servers
- #
- # IDENTIFY NEW PROPERTIES!!
- #
- # Automate db opening and closing when calling dbinsert()
- #
- #############
class Property:
    """A single real-estate listing.

    Every constructor argument is stored unchanged on the instance under
    the attribute of the same name.  ``title``, ``sqft``, ``bedrooms``,
    ``baths``, ``description`` and ``link`` are optional.
    """

    def __init__(self, site_name, type, MLS, address, city, st, zip,
                 county, price, acres, title='', sqft=0, bedrooms=0, baths=0,
                 description='', link=''):
        # Attributes are created in a fixed order; anything that walks
        # vars(self) / self.__dict__ sees them in exactly this sequence.
        self.site_name, self.type, self.MLS = site_name, type, MLS
        self.title, self.sqft = title, sqft
        self.bedrooms, self.baths = bedrooms, baths
        self.address, self.city, self.st, self.zip = address, city, st, zip
        self.county, self.price, self.acres = county, price, acres
        self.description, self.link = description, link
class Search:
    """Universal search criteria, loaded from the ``[Search]`` section of a config file.

    Attributes set on the instance:
        file: path of the config file that was read.
        types: the property types this class accepts.
        county, type: lists of strings (comma-separated in the config file).
        lower_price, upper_price, lower_acres, upper_acres,
        lower_sqft, upper_sqft, lower_bedrooms, upper_bedrooms:
            the value from the config file (always a string there) or the
            built-in default when the option is absent.

    Raises:
        FileNotFoundError: the config file does not exist.
        AssertionError: a configured property type is not one of ``types``.
    """

    def __init__(self, file='../landsearch.conf'):
        self.file = file
        if not path.exists(self.file):
            raise FileNotFoundError("The config file cannot be opened", self.file)

        try:
            config = ConfigParser()
            config.read(self.file)
            search_params = config['Search']
        except Exception as err:
            # Deliberate best effort: an unreadable file or a missing
            # [Search] section falls back to the defaults below.  The empty
            # dict makes every .get() return its default.
            print(err, "Using default search parameters.")
            search_params = {}

        county = self._as_list(
            search_params.get('county', ['Gwinnett', 'Hall', 'Jackson', 'Walton', 'Barrow']))
        wanted_types = self._as_list(
            search_params.get('type', ['farm', 'house', 'land']))

        self.types = ['land', 'farm', 'home', 'house']
        self.county = county
        self.lower_price = search_params.get('lower_price', 0)
        self.upper_price = search_params.get('upper_price', 525000)
        self.lower_acres = search_params.get('lower_acres', 5)
        self.upper_acres = search_params.get('upper_acres', 15)
        self.type = wanted_types
        self.lower_sqft = search_params.get('lower_sqft', '')
        self.upper_sqft = search_params.get('upper_sqft', '')
        self.lower_bedrooms = search_params.get('lower_bedrooms', '')
        self.upper_bedrooms = search_params.get('upper_bedrooms', '')

        for property_type in self.type:
            if property_type not in self.types:
                # Raised explicitly (not via ``assert``) so the check still
                # runs under ``python -O``; the exception type is unchanged.
                raise AssertionError(
                    "Unknown type '" + property_type
                    + "'. Property Type must be of type: " + str(self.types))

    @staticmethod
    def _as_list(value):
        """Turn a comma-separated string into a list of trimmed, non-empty items.

        Non-string values (the built-in default lists) are returned as-is.
        """
        if isinstance(value, str):
            return [item.strip() for item in value.split(',') if item.strip()]
        return value
class ImproperSearchError(Exception):
    """Error for a search argument that is not a Search instance.

    The offending object is kept on ``search``; the text is kept on
    ``message`` and is also the exception's string value.
    """

    def __init__(self, search,
                 message="Improper Search. Must use instance of Search class"):
        super().__init__(message)
        self.search = search
        self.message = message
class MLSDATA:
    """Fetch MLS listings, store them in a MySQL database and report new ones.

    Currently only supports GeorgiaMLS.com (``mlstype == 'gmls'``).
    """

    counties = ['Gwinnett', 'Barrow', 'Hall', 'Jackson', 'Walton']
    # NOTE(review): a live API key is committed to source.  It should be
    # rotated and loaded from configuration or the environment instead.
    GoogleAPIKey = 'AIzaSyAXAnpBtjv760W8YIPqKZ0dFXpwAaZN7Es'
    live_google = True
    # Seconds to wait on any single HTTP request before giving up.
    request_timeout = 30

    # (keyword looked for in a Search type, GMLS 'typ' query code)
    _GMLS_TYPE_CODES = (('land', 'll'), ('farm', 'af'), ('home', 'sd'), ('house', 'sd'))
    # Columns checkdb() is allowed to filter on (column names cannot be bound
    # as query parameters, so they are restricted to this whitelist).
    _CHECKDB_COLUMNS = ('MLS', 'title', 'address')

    def __init__(self, mlstype):
        self.help = "This is a class that will retrieve MLS data from various sources, store the info in a database, and run queries on the data."
        self.mlstype = mlstype.lower()  # which data source to use (gmls, Zillow, etc.)
        self.cursor = ''   # '' means "no database connection yet"
        self.cnx = ''
        self.new_listings = []

    def stringbuilder(self, search: "Search", county):
        """Build the search URL for *county* from a Search object.

        Returns the URL string for ``mlstype == 'gmls'`` and None for any
        other mlstype (no other source is implemented).
        """
        if self.mlstype != 'gmls':
            return None
        base_addr = 'https://www.georgiamls.com/real-estate/search-action.cfm?'
        params = [('cnty', county),
                  ('lpl', search.lower_price), ('lph', search.upper_price),
                  ('acresL', search.lower_acres), ('acresH', search.upper_acres),
                  ('orderBy', 'b'),
                  ('scat', '1'),
                  ('sdsp', 'g')]
        # 'home' and 'house' map to the same code; emit each code only once.
        type_codes = []
        for wanted in search.type:
            wanted = wanted.lower()
            for keyword, code in self._GMLS_TYPE_CODES:
                if keyword in wanted and code not in type_codes:
                    type_codes.append(code)
        params.extend(('typ', code) for code in type_codes)
        search_string = base_addr + urllib.parse.urlencode(params)
        print(search_string)
        return search_string

    def break_address(self, address):
        """Split 'street address|city, state zip' into [street, city, state, zip].

        Whitespace around the parts is ignored.  A part that is missing from
        the input comes back as an empty string.
        """
        street, _, csz = address.partition('|')
        city, _, state_zip = csz.partition(',')
        tokens = state_zip.split()
        st = tokens[0] if tokens else ''
        zip_code = tokens[1] if len(tokens) > 1 else ''
        return [street.strip(), city.strip(), st, zip_code]

    def gmlsparser(self, URL, county, pages=''):
        """Scrape georgiamls.com search results into a list of Property objects.

        UNIQUE TO GEORGIAMLS.COM ONLY.

        Args:
            URL: search URL (see stringbuilder()).
            county: county name stored on every resulting Property.
            pages: number of result pages to walk; when '' it is read from
                the pagination counter of the first page.

        Returns:
            A list of Property objects; empty when the site shows no results.
        """
        properties_list = []
        response = requests.get(URL, timeout=self.request_timeout)
        soup = BeautifulSoup(response.content, 'html5lib')
        if pages == '':
            counter = soup.find("div", {'class': 'small listing-pagination-count'})
            if counter is None:
                print("No Results Found.")
                return properties_list
            # The page count is the last word of the pagination counter text.
            pages = counter.getText().strip().split(" ")[-1]
        else:
            print('pages already set to: ' + str(pages))
        for page in range(int(pages)):
            print('Processing Page: ' + str(page + 1) + ' of ' + str(pages))
            if page:
                # Offset arithmetic assumes 12 listings per result page.
                next_URL = URL + '&start=' + str((12 * page) + 1)
                next_page = requests.get(next_URL, timeout=self.request_timeout)
                soup = BeautifulSoup(next_page.content, 'html5lib')
            raw_listings = soup.findAll(
                "div", {'class': 'col-xs-12 col-sm-6 col-lg-4 text-center listing-gallery'})
            for listing in raw_listings:
                data = self._gmls_listing_to_property(listing, county)
                properties_list.append(data)
                print('Scanned: ' + data.address)
        return properties_list

    def _gmls_listing_to_property(self, listing, county):
        """Build one Property from a search-result card and its detail page."""
        items = listing.findAll("p")
        MLS = " ".join(items[3].getText().strip()[6:15].split())  # MLS number
        price = items[0].string.strip()
        link = 'https://www.georgiamls.com' + listing.a['href']
        detail_request = requests.get(link, timeout=self.request_timeout)
        detail_soup = BeautifulSoup(detail_request.content, 'html5lib')
        details = detail_soup.findAll('tr')

        # Reset every scraped field per listing so a row missing from this
        # detail page can neither raise NameError nor inherit the previous
        # listing's value.
        bedrooms = 0
        baths = 0
        sqft = 0
        acres = '0'
        type_code = 'unknown'
        split_address = ['', '', '', '']

        # presumably the second table row holds "<n> br/<n> ba" - verify
        # against the live markup
        bedbath_cells = details[1].findAll('td') if len(details) > 1 else []
        if len(bedbath_cells) > 1:
            bedbath = bedbath_cells[1].getText().strip().split('/')
            bedrooms = bedbath[0][:-3]
            if len(bedbath) > 1:
                baths = bedbath[1][:-3]

        for element in details:
            cells = element.findAll('td')
            if len(cells) < 2:
                continue
            row_text = element.getText()
            value = cells[1].getText().strip()
            if 'sqft' in row_text:
                sqft = value[:-5].replace(',', '')
            if 'lot size' in row_text.lower():
                acres = value[:-6]
            if 'Property Type' in row_text:
                type_code = self._gmls_type_code(value)
            if 'Address' in row_text:
                # presumably the cell is "street<br/>city, ST zip", so its
                # children 0 and 2 are the two text nodes - verify
                pieces = list(cells[1])
                if len(pieces) > 2:
                    split_address = self.break_address(
                        pieces[0].strip() + '|' + pieces[2].strip())

        remarks = detail_soup.find('div', {'id': 'listing-remarks'})
        description = remarks.getText().strip().replace('\t', '') if remarks is not None else ''

        return Property(site_name=self.mlstype,
                        type=type_code,
                        MLS=MLS,
                        bedrooms=bedrooms,
                        baths=baths,
                        sqft=sqft,
                        address=split_address[0],
                        city=split_address[1].title(),
                        st=split_address[2].upper(),
                        zip=split_address[3],
                        county=county.title(),
                        price=price.replace('$', '').replace(',', ''),
                        acres=acres,
                        description=description,
                        link=link)

    @staticmethod
    def _gmls_type_code(ptype):
        """Map the detail page's 'Property Type' text to a short type code."""
        lowered = ptype.lower()
        if 'acreage' in lowered:
            return 'af'
        if 'land lot' in lowered:
            return 'll'
        if 'single family home' in lowered:
            return 'sf'
        return 'unknown'

    def getmlsdata(self, search: "Search", county):
        """Main entry point: fetch the listings for one county.

        Builds the URL with stringbuilder() and runs the parser selected by
        self.mlstype.  The county is matched against ``counties`` without
        regard to case.

        Returns:
            A list of Property objects; empty when the county is not
            recognized or the mlstype has no parser.

        Raises:
            ImproperSearchError: *search* is not a Search instance.
        """
        if not isinstance(search, Search):
            raise ImproperSearchError(search)
        known = {name.lower(): name for name in self.counties}
        canonical = known.get(str(county).strip().lower())
        if canonical is None:
            print("County " + str(county) + " not regognized. Exiting")
            return []
        print("Scanning for results in " + canonical + " using the "
              + self.mlstype.upper() + " database.")
        if self.mlstype == 'gmls':
            return self.gmlsparser(self.stringbuilder(search, canonical), canonical)
        return []

    def checkdb(self, criteria_dict):
        """Check a dictionary of criteria against the database.

        Accepts the keys MLS, title and address (street number/name, not
        city/state/zip); other keys are reported and skipped.

        Returns:
            The number of matching rows for the first criterion that
            matches anything, otherwise 0.  Also 0 when the database is not
            connected.
        """
        if not self.cursor:
            print("Database is not connected or cursor not filled. Use function 'connectdb()' to establish")
            return 0
        for criteria, value in criteria_dict.items():
            if criteria not in self._CHECKDB_COLUMNS:
                print("Cannot search on parameter: " + criteria)
                continue
            # The column name comes from the whitelist above; the value is bound.
            self.cursor.execute(
                "SELECT COUNT(*) FROM properties WHERE " + criteria + " = %s", (value,))
            row = self.cursor.fetchone()
            matches = row[0] if row else 0
            if matches > 0:
                return matches  # stop at the first criterion that matches
        return 0

    def getGoogle(self, property):
        """Add Google Distance Matrix data to *property*.

        On success sets distance_to_school, time_to_school, distance_to_work
        and time_to_work on the object, either all four or none.
        Costs money, so it should only be called when inserting a new record.
        Distances are in METERS (1 m = 0.000621371 mi), times in SECONDS.

        Returns:
            The same *property* object.
        """
        print("Fetching live Google Data. $$")
        destinations = [
            'Hebron Christian Acadamy',   # school
            'JHRJ+FJ Atlanta, Georgia',   # plus code: hourly parking, Int'l Terminal, KATL
        ]
        params = {
            'units': 'imperial',
            'origins': property.address + ', ' + property.city + ' ' + property.st,
            'destinations': '|'.join(destinations),
            'key': self.GoogleAPIKey,
        }
        baseURL = 'https://maps.googleapis.com/maps/api/distancematrix/json?'
        API_URL = baseURL + urllib.parse.urlencode(params)
        try:
            google_result = requests.get(API_URL, timeout=self.request_timeout).json()
            if google_result['status'] == 'OK':
                to_school, to_work = google_result['rows'][0]['elements'][:2]
                # Read all four values before assigning any, so a partial
                # response never leaves the property half-populated.
                values = (to_school['distance']['value'], to_school['duration']['value'],
                          to_work['distance']['value'], to_work['duration']['value'])
                (property.distance_to_school, property.time_to_school,
                 property.distance_to_work, property.time_to_work) = values
            else:
                print("ERROR: Google API returned status " + str(google_result['status']))
        except (requests.RequestException, ValueError, KeyError, IndexError, TypeError):
            # The exception text is not printed: it can contain the request
            # URL, which carries the API key.
            print("ERROR: Failed to obtain Google API data")
        return property

    def insertrecord(self, property, work_address=None, school_address=None):
        """Insert one Property into the ``properties`` table.

        Every instance attribute becomes a column, plus ``Date_Added`` set
        to today's date.  Values are bound as query parameters.  The passed
        object is not modified.  ``work_address`` and ``school_address`` are
        accepted but unused.
        """
        if not self.cursor:
            print("Database is not connected or cursor not filled. Use function 'connectdb()' to establish")
            return
        record = dict(vars(property))
        record['Date_Added'] = str(datetime.date.today())
        columns = ", ".join(record)
        placeholders = ", ".join(["%s"] * len(record))
        qry = "INSERT INTO properties ({0}) VALUES ({1})".format(columns, placeholders)
        self.cursor.execute(qry, tuple(record.values()))
        self.cnx.commit()
        print("Inserted " + str(record['MLS']) + " | " + str(record['address']) + " into database.")

    # NOTE(review): the default credentials below are hard-coded in source;
    # they should come from configuration.
    def connectdb(self, host='192.168.100.26', user='landsearchuser', password='1234', database='landsearch'):
        """Connect to the database, keep the connection and return a cursor."""
        self.cnx = mysql.connector.connect(host=host, user=user, password=password,
                                           database=database, buffered=True)
        self.cursor = self.cnx.cursor()
        return self.cursor

    def closedb(self):
        """Cleanly close the db; a no-op when it was never connected."""
        try:
            if self.cursor != '':
                self.cursor.close()
        finally:
            try:
                if self.cnx != '':
                    self.cnx.close()
            finally:
                # Back to the "not connected" state checked by the other methods.
                self.cursor = ''
                self.cnx = ''

    def dbinsert(self, properties: list):
        """Insert every Property that is not already in the database.

        For each new listing, Google distance data is fetched first when
        ``live_google`` is set.  Inserted listings are appended to
        ``self.new_listings``.

        Raises:
            TypeError: *properties* is neither None nor a list.
        """
        if properties is None:
            print("Empty dataset. No records to insert.")
            return
        if not isinstance(properties, list):
            raise TypeError('type list required')
        if not properties:
            print("Empty dataset. No records to insert.")
            return
        if not self.cursor:
            # Without a connection nothing can be checked or inserted; bail
            # out before spending money on Google lookups.
            print("Database is not connected or cursor not filled. Use function 'connectdb()' to establish")
            return
        for listing in properties:
            if self.checkdb({'MLS': listing.MLS, 'address': listing.address}):
                print(str(listing.MLS) + ' | ' + str(listing.address) + ' is already in db. Not inserted.')
                continue
            if self.live_google:
                self.getGoogle(listing)  # populates distance and time fields
            self.insertrecord(listing)
            self.new_listings.append(listing)

    def alerts(self):
        """Placeholder; not implemented."""
        pass

    def email(self):
        """Email a table of ``self.new_listings``; does nothing when it is empty."""
        if not self.new_listings:
            print("No new listings. Email not sent")
            return
        subj = "New Real Estate Listings for " + str(datetime.date.today())
        data = []
        for listing in self.new_listings:
            data.append([
                listing.MLS,
                listing.address,
                '{:0,.2f}'.format(float(listing.acres)),
                listing.sqft,
                '${:0,.0f}'.format(float(listing.price)),
                # time_to_school is in seconds; shown in minutes
                listing.time_to_school / 60 if hasattr(listing, 'time_to_school') else 'NA',
                listing.link,
            ])
        body = ("Daily Real Estate Search Report\n\n"
                "The following properties have been found which may be of interest.\n\n")
        body += tabulate(data, headers=['MLS', 'Address', 'Acres', 'sqft', 'Price',
                                        'Time to School', 'link'])
        sendto = ['stagl.mike@gmail.com', 'M_Stagl@hotmail.com']
        mymail = custom_email.simplemail(subj, body, sendto)
        try:
            mymail.sendmail()
        except Exception as e:
            # Best effort: report the failure instead of aborting the run.
            print("Error sending email. " + str(e))
- ########### BEGIN CODE ###############33
def main():
    """Scan every configured county and store the new listings in the database."""
    gmls = MLSDATA('GMLS')   # data source
    mysearch = Search()      # criteria from the default config file
    myresults = []

    # Design note kept from the original: this per-county loop is meant to
    # move into MLSDATA eventually (a single call taking the Search object
    # and returning the combined list), with db open/close automated too.
    for county in mysearch.county:
        print("local search: ", county)
        mydata = gmls.getmlsdata(mysearch, county)
        # getmlsdata() may hand back None when a county yields nothing.
        myresults.extend(mydata or [])

    gmls.connectdb()
    try:
        gmls.dbinsert(myresults)
    finally:
        # Close the connection even if an insert fails part-way.
        gmls.closedb()

    # Optional follow-up steps, currently disabled:
    #   gmls.email()
    #   print(str(len(gmls.new_listings)) + " new properties found!")
    #   for listing in gmls.new_listings:
    #       print(listing.MLS, listing.address)


if __name__ == '__main__':
    main()
|