Source code for records

#!/usr/bin/python3
import argparse
import sys
import os
import time
from timeout import timeout
import logging
from pyvirtualdisplay import Display
import csv
from logging.handlers import TimedRotatingFileHandler
import json
import math
from selenium.common.exceptions import *
import initium
import string
import re

def create_timed_rotating_log(path):
    """Return the "Rotating Log" logger, writing to *path* and to the console.

    The file handler rotates at midnight and keeps five backups. The function
    is idempotent: calling it again with the same *path* returns the same
    logger without attaching a second pair of handlers (which would make
    every message appear twice).

    Args:
        path: Location of the log file. Missing parent directories are
            created, because the file handler opens the file immediately.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger("Rotating Log")
    logger.setLevel(logging.INFO)

    log_formatter = logging.Formatter("%(asctime)s [%(levelname)-5.5s] %(message)s")

    # FileHandler stores the absolute path in ``baseFilename``; compare
    # against the same form to recognise a handler we attached earlier.
    target = os.path.abspath(path)
    os.makedirs(os.path.dirname(target), exist_ok=True)

    # Rotating file handler
    has_file_handler = any(
        isinstance(handler, TimedRotatingFileHandler)
        and handler.baseFilename == target
        for handler in logger.handlers
    )
    if not has_file_handler:
        file_handler = TimedRotatingFileHandler(
            path, when="midnight", interval=1, backupCount=5
        )
        file_handler.setFormatter(log_formatter)
        logger.addHandler(file_handler)

    # Console output handler. File handlers subclass StreamHandler, so they
    # have to be excluded explicitly when looking for an existing console one.
    has_console_handler = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )
    if not has_console_handler:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(log_formatter)
        logger.addHandler(console_handler)

    return logger
class Records(initium.webdriver, initium.initium):
    """Initium client that logs in on construction using the parsed config."""

    def __init__(self):
        """Parse the configuration, start the webdriver and log in.

        Reads the ``uname``, ``email`` and ``pw`` keys of the JSON config.
        Relies on a module-level ``logger`` being defined before the class is
        instantiated.
        """
        self.cfg = self._parse_config()

        logger.info("Initializing webdriver")
        # Hax
        super().__init__()

        logger.info("Connecting to Initium")
        self.get("http://www.playinitium.com")

        logger.info("Logging in as {0}".format(self.cfg["uname"]))
        # Enter email
        login = self.find_element_by_name("email")
        login.send_keys(self.cfg["email"])
        # Enter pw
        login = self.find_element_by_name("password")
        login.send_keys(self.cfg["pw"])

        for button in self.find_elements_by_class_name("main-button"):
            if button.text == "Login":
                button.click()
                # The click submits the form; the remaining button handles
                # belong to the old page and must not be touched afterwards.
                break

    def _parse_config(self):
        """Parse the command line and return the JSON configuration as a dict.

        Side effects: stores the parser on ``self._parser`` and the parsed
        arguments on ``self.args``; starts a virtual display when headless
        mode is requested; exits the process with status 1 when the config
        file does not exist.
        """
        logger.info('Parsing command line arguments')
        self._parser = argparse.ArgumentParser(description="Bower - A Python based bot for Initium - http://playinitium.com")
        self._parser.add_argument("-c", "--config", dest="config_file", default="../cfg.json",
                                  help="Configuration to use (default=../cfg.json)")
        self._parser.add_argument("-d", "--do_chat", dest="do_chat", action="store_true",
                                  help="Handle commands in each rooms local chat")
        self._parser.add_argument('-m', '--map', dest='map_file', default='map.json',
                                  help='Map file to use (default=map.json)')
        self._parser.add_argument('-l', '--headless',
                                  help='Run in headless operation mode if HEADLESS=true/True/T')
        self.args = self._parser.parse_args()

        if self.args.headless in ('true', 'True', 'T', 'TRUE'):
            display = Display(visible=0, size=(800, 600))
            display.start()

        logger.info("Reading config")
        if not os.path.isfile(self.args.config_file):
            logger.error("Error: Config: {0} not found".format(self.args.config_file))
            sys.exit(1)

        with open(self.args.config_file, encoding="utf-8") as data_file:
            return json.load(data_file)
[docs] def score_item(self): ## Score any item by determining its type and calling appropriate functions # Sleep for a time to ensure page has updated with new item info time.sleep(1) # Define Variables score = -1 ## First determine what type of item it is # Check <p> tag in popup paragraph_element = self.find_elements_by_xpath('//div[contains(@class,"cluetip-inner") and contains(@class, "ui-widget-content") and contains(@class, "ui-cluetip-content")]/div[@class="main-page"]/p')[0] #paragraph_element = paragraph_element.find_elements_by_class_name("main-page")[0].find_elements_by_tag_name("p")[0] paragraph_text = paragraph_element.get_attribute("innerHTML").lower() # Weapon or Armor? if "weapon" in paragraph_text: score = self.calculate_score_weapon() else: if "armor" in paragraph_text or "shield" in paragraph_text: if "shirt" in paragraph_text: score = self.calculate_score_armor(shirt=True) else: score = self.calculate_score_armor(shirt=False) else: # Not weapon or armor so we don't score it! #logger.warning("Ignoring non-equipment item.") # Score remains -1 pass return score
[docs] def calculate_score_armor(self, chance=0, reduction=0, penalty=0, shirt=False): ## Calculates an armor's score based on block chance, damage reduction, and dexterity penalty ## Note this calculation varies if a shirt is tested start = time.time() DEF_MAX_RUNTIME = 14.9 loaded = False while not loaded: try: # Check if we've run for too long end = time.time() if end-start > DEF_MAX_RUNTIME: logger.warning("Score calculation timed out!") return -1 # Build stat list elements = self.find_elements_by_class_name("main-item-subnote") chance = int(elements[0].get_attribute("innerHTML").split("%")[0]) reduction = int(elements[1].get_attribute("innerHTML")) penalty = int(elements[2].get_attribute("innerHTML").split("%")[0]) loaded = True except e: logger.error(e) return -1 ## Actual calculation # Is the item a shirt? if shirt: score = 1.7 * ((chance / (penalty + 1.0))) * (reduction / 13.33) else: # Must be normal armor then score = chance * (reduction / 25) - penalty # probably should be more complex return score
# @timeout(60)
[docs] def calculate_score_weapon(self, dice=0, sides=0, chance=0, mult=0): ## Calculates a weapon's score based on damage and critical stats # Ensure the popup loads but we don't wait too long start = time.time() DEF_MAX_RUNTIME = 14.9 loaded = False while not loaded: try: # Check if we've run for too long end = time.time() if end-start > DEF_MAX_RUNTIME: logger.warning("Score calculation timed out!") return -1 # Build stat list elements = self.find_elements_by_class_name("main-item-subnote") damage = elements[0].get_attribute("innerHTML").split("D") # Is this actually a block chance stat and thus armor piece? if "%" in damage: logger.warning("Inspected armor as weapon!") return self.calculate_score_armor() # Call armor instead try: dice = int(damage[0]) except ValueError: #logger.warning("Inspected armor item!") return -1 # Looks like a weapon, go on sides = int(damage[1]) chance = float(elements[1].get_attribute("innerHTML").split("%")[0]) mult = float(elements[2].get_attribute("innerHTML").split("x")[0]) loaded = True except: pass # Actual calculation score = dice + 0.4 * math.pow((dice * sides), 1.5) * (mult / 4.0) * math.pow(2.2, (1.0+(chance*3/100.0))) return score
[docs] def load_records(self, file_path): ## Loads records from a file and returns a 2 dimensional array of all records in the file ## Define Variables item_records = [] ## Open the file # First check if file exists if os.path.isfile(file_path): # File exists, load it in records_file = open(file_path, 'r') reader = csv.reader(records_file) for row in reader: item_records.append(row) records_file.close() ## Now return the array return item_records
[docs] def remove_record(self, file_path, rank): ## Deletes an individual item by rank from the records file ## Note Rank is 1-infinity where 1 is the best. -1 represents ALL records # Load the records records = self.load_records(file_path) if rank == -1: ## Delete ALL records # Simply delete the file try: os.remove(file_path) logger.info("Successfully removed all entries of the item.") return except: logger.error("An error occurred while deleting the file: " + file_path) else: # Delete the item del records[rank-1] ## Save new records # Open file for writing records_file = open(file_path, 'w') # Now write to the file, overwriting it. writer = csv.writer(records_file) writer.writerows(records) # Close the file records_file.close() logger.info("Successfully removed item.") return self.say("Global", "Successfully removed the item(s).")
[docs] def save_records(self, item_name, item_id, item_score): ## Loads the file, then saves the item to it ## Saves as: [item_score, item_id] # Variables for scope item_records = [] records_file = None result = -1 ## Load records from file path path = "../logs/records/items/"+item_name+".txt" item_records = self.load_records(path) ## Determine if the item already exists in records for each in item_records: if str(each[1]) == str(item_id): # ID is found in records already return result ## Now open file for writing records_file = open(path, 'w') ## Now append new item if len(item_records) == 0: item_records.append([item_score, item_id]) result = 0 else: done = False for each in item_records: if item_score > float(each[0]): result = item_records.index(each) # Where we place the item info item_records.insert(result, [item_score, item_id]) done = True break if not done: item_records.append([item_score, item_id]) ## Now write to the file, overwriting it. writer = csv.writer(records_file) writer.writerows(item_records) ## Finish: close file and log success records_file.close() logger.info("Successfully saved item") return result
if __name__ == "__main__":
    # Entry point of the bot: log in, rebuild a little state from the chat
    # history, then loop forever scoring shared items and answering
    # "bower <item>" lookup requests in Global chat.

    # Create a logger for all functions to use. It has to stay a module-level
    # name: the Records methods log through the global ``logger``.
    main_log_file = "../logs/records/main.log"
    logger = create_timed_rotating_log(main_log_file)

    # Create the bot, which begins login process
    Bot = Records()

    ## We will ignore the DEF_MAX_RECENT_REQUESTS # of recent requests
    ## In other words, if a lookup request was one of the DEF_MAX_RECENT_REQUESTS
    ## most recent requests, we will ignore it
    recent_lookup_requests = []
    DEF_MAX_RECENT_REQUESTS = 6

    ## Now let's repopulate the recent requests in case we crashed after
    ## answering a request
    reqauth, reqmsg = [], []
    while reqauth == 0 or len(reqauth) == 0:
        reqauth, reqmsg = Bot.update_messages("Global")

    # Authors and messages are parallel lists; pairing them directly avoids
    # picking the wrong author when the same text was posted twice.
    for author, message in zip(reqauth, reqmsg):
        if "#" in message and "[Bot] Bower" == author:
            ## If we sent a request here, append the item to the recent list
            # Identify the item
            item = message.split(" ", 2)[2]
            item = item.split(" with a score")[0]
            # Append it if we haven't already
            if item not in recent_lookup_requests:
                recent_lookup_requests.append(item)

    # If recent_lookup_requests is too large, resize it to exactly the limit.
    # NOTE(review): this keeps the entries found first in the chat list;
    # presumably update_messages returns the newest message first -- confirm.
    recent_lookup_requests = recent_lookup_requests[:DEF_MAX_RECENT_REQUESTS]

    # Inform administrator of recent lookups
    logger.warning("Assuming recent lookups of: " + str(recent_lookup_requests))

    # Make sure we don't accidentally repeat admin commands
    last_admin_command = None
    # See if we have a recent admin command
    for message in reqmsg[0:30]:
        if "bower rm" in message:
            last_admin_command = message
            break
    logger.warning("Assuming last admin command of: " + str(last_admin_command))

    # Tell logger we're ready to operate
    logger.info("Now listening for item shares")

    while True:
        ## Look in Global chat
        Bot.update_messages("Global")

        ## Check for item shares
        items = Bot.find_elements_by_class_name("chat-embedded-item")

        # Iterate through each, saving them all
        score = 0
        # Get item stats if any
        try:
            stats = Bot.get_item_stats()
        except Exception:
            stats = None

        for each in items:
            last_score = score
            last_stats = stats
            try:
                # Click the item
                link = each.find_elements_by_tag_name("a")[0]
                # for some reason, sometimes this winds up being a share button.
                # so for now, DO NOT CLICK if it says share
                if link.get_attribute("innerHTML") != "Share":
                    link.click()
                    # Calculate score of popup item
                    score = Bot.score_item()
            except ElementNotVisibleException:
                # Item not visible, check for lookup requests instead.
                break
            except StaleElementReferenceException as e:
                logger.error(str(e))
                logger.error("StaleElementReferenceException occurred! ! ! !")
                logger.error("Checking for lookup requests instead.")
                break
            except Exception as e:
                logger.error(str(e))
                break

            ## Attempting to filter issues
            # An unchanged score means nothing new was scored: skip the item.
            if last_score == score:
                continue

            if score != 0:
                # Do we have stats on the item?
                stats = Bot.get_item_stats()
                if not stats['Block Chance'] and not stats['Dice Quantity'] and not stats['Dice Sides']:
                    # We don't have dice or block chance, so skip it
                    continue

                # Wait until the stats differ from the previous item's, i.e.
                # until the popup really shows the new item.
                start = time.time()
                escape = False
                while stats == last_stats:
                    stats = Bot.get_item_stats()
                    # Make sure we dont run too long
                    if time.time() - start >= 15:
                        logger.warning("Took too long to load stats! Skipping item.")
                        score = 0
                        escape = True
                        break
                if escape:
                    continue

            if float(score) <= 2.0 or float(score) >= 400.0:
                # 1.64 default, 400 never been seen
                # Inspected item was not a scorable item, so skip to next item
                continue

            # Get item ID from rel="viewitemmini.jsp?itemId=4961250779856896"
            itemID = link.get_attribute("rel").split("itemId=")[1]

            # Get the item name: spaced for display, compacted and stripped of
            # punctuation for the records file name.
            name_html = link.find_elements_by_tag_name("div")[1].get_attribute("innerHTML")
            original_item_name = ' '.join(name_html.split())
            item_name = ''.join(name_html.split())
            item_name = re.sub(r'(?!\s)[\W_]', '', item_name)

            # Save the item
            result = Bot.save_records(item_name, itemID, score)
            if result != -1:
                logger.info("Found #" + str(result+1) + " " + original_item_name + ". Score: " + str(score))
                author_element = link.find_elements_by_xpath('../../..')[0].find_elements_by_tag_name("span")[1].find_elements_by_tag_name("a")[0]
                if result == 0:
                    ## Best of its kind
                    # Get author name
                    author = author_element.get_attribute("innerHTML")
                    # Announce in global chat and private message
                    Bot.say("Global", author + " discovered the best Item(" + itemID + "), with a score of " + "{:10.2f}".format(score) + "!")
                    time.sleep(1)  # Avoiding chat ban
                    Bot.reply(author_element, "Your Item(" + itemID + ") is the BEST so far!! Score: " + "{:10.2f}".format(score) + "!")
                elif result > 0:
                    # Whisper author
                    Bot.reply(author_element, "Your Item(" + itemID + ") is the #" + str(result+1) + " in the game! Score: " + "{:10.2f}".format(score) + "!")

        ## Listen for requests in Global chat
        ## "Bower Longsword" or "Bower The Black Blade"
        authors, messages = Bot.update_messages("Global")
        for author, message in zip(authors, messages[:14]):
            if "bower " not in message.lower():
                continue

            ## Did Rade ask us?
            # NOTE: SLACK SUPPORT VULNERABILITY HERE!
            # NOTE(security): both tests below are substring matches, so any
            # author whose name contains "rade", or any message containing
            # "rade: bower rm ", is treated as an admin command.
            if "rade" in author.lower() or "rade: bower rm " in message:
                # If this command is new,
                if message != last_admin_command:
                    # Then make sure we don't run it again
                    last_admin_command = message
                    if " rm " in message:
                        logger.info("Received remove request from Rade")
                        # Get the item name and rank: "rm <item words> <rank|all>"
                        parts = message.split("rm ")[1].split(" ")
                        rank = parts[-1]
                        file_goal = ''.join(parts[:-1])  # Chop off rank
                        file_goal = re.sub(r'(?!\s)[\W_]', '', file_goal)
                        ## Now remove from the file
                        if "all" == rank:
                            # Remove ALL values from the file
                            Bot.remove_record("../logs/records/items/" + file_goal + ".txt", -1)
                        else:
                            # Remove specified value from the file
                            Bot.remove_record("../logs/records/items/" + file_goal + ".txt", int(rank))
                        # Restart the bot
                        sys.exit(0)

            logger.info("Processing lookup request...")
            goal = None
            try:
                goal = message.split("ower ")[1]  # Everything after bower
                file_goal = ''.join(goal.split())
                file_goal = re.sub(r'(?!\s)[\W_]', '', file_goal)
                if goal in recent_lookup_requests:
                    # Ignore the request
                    logger.warning("Ignoring recent request!")
                    continue
                logger.info("Received lookup request for \"" + goal + "\" in file \"" + file_goal + ".txt\"")
            except IndexError as e:
                # The match above is case-insensitive but the split is not, so
                # e.g. "BOWER sword" yields nothing after the separator.
                logger.error("An error occurred while procesing lookup request")
                logger.error(e)
                continue

            item_records = Bot.load_records("../logs/records/items/" + file_goal + ".txt")

            # Mention first 3 in global; fewer than three records ends the
            # announcement early via IndexError.
            try:
                Bot.say("Global", "#1 Item(" + item_records[0][1] + ") with a score of " + "{:10.2f}".format(float(item_records[0][0])) + "!!!")
                time.sleep(1)
                Bot.say("Global", "#2 Item(" + item_records[1][1] + ") with a score of " + "{:10.2f}".format(float(item_records[1][0])) + "!!")
                time.sleep(1)
                Bot.say("Global", "#3 Item(" + item_records[2][1] + ") with a score of " + "{:10.2f}".format(float(item_records[2][0])) + "!")
            except IndexError as e:
                logger.warning("An error occurred processing top 3 of " + goal)
                logger.error(e)

            # Add the goal to recent_lookup_requests so we don't spam it.
            recent_lookup_requests.append(goal)
            # Take a break before continuing, chat ban avoiding.
            time.sleep(1)
            # If recent_lookup_requests is too large, drop the oldest entries
            # from the front; the request just appended must survive.
            if len(recent_lookup_requests) > DEF_MAX_RECENT_REQUESTS:
                recent_lookup_requests = recent_lookup_requests[-DEF_MAX_RECENT_REQUESTS:]

    # Only reachable if the loop above is ever given a way to end.
    logger.error("OUTSIDE WHILE TRUE LOOP!")