Merge branch 'dev'

+ added more comments to xep requests
+ added opt_arg to version, xep and contact
+ implemented data dictionary to hold all data in main bot
+ added message_ids

* updated gitignore file
* partly reworked servercontact implementation
* complete rework of uptime, version
* part rework of xep requests
* complete rework of validate function
* updated HandleError function
* part rework of StaticStrings function
* complete rework of queue building and deduplication
* logging parameter fix
This commit is contained in:
nico 2018-11-11 03:12:11 +01:00
commit 69951bba37
9 changed files with 435 additions and 259 deletions

4
.gitignore vendored
View file

@ -60,6 +60,6 @@ target/
# .idea
.idea
.etag
bot\.cfg
xeplist.xml
common/xeplist.xml
common/.etag

View file

@ -1,108 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# XEP-0072: Server Version
class Version:
def __init__(self, version, msg, target):
self.version = version['software_version']['version']
self.os = version['software_version']['os']
self.name = version['software_version']['name']
self.nick = msg['mucnick']
self.message_type = msg['type']
self.target = target
def format_version(self):
if self.message_type == "groupchat":
text = "%s: %s is running %s version %s on %s" % (self.nick, self.target, self.name, self.version, self.os)
else:
text = "%s is running %s version %s on %s" % (self.target, self.name, self.version, self.os)
return text
# XEP-0012: Last Activity
class LastActivity:
""" query the server uptime of the specified domain, defined by XEP-0012 """
def __init__(self, last_activity, msg, target):
self.last_activity = last_activity
self.nick = msg['mucnick']
self.message_type = msg['type']
self.target = target
def format_values(self, granularity=4):
seconds = self.last_activity['last_activity']['seconds']
uptime = []
intervals = (
('years', 31536000), # 60 * 60 * 24 * 365
('weeks', 604800), # 60 * 60 * 24 * 7
('days', 86400), # 60 * 60 * 24
('hours', 3600), # 60 * 60
('minutes', 60),
('seconds', 1)
)
for name, count in intervals:
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('s')
uptime.append("{} {}".format(value, name))
result = ' '.join(uptime[:granularity])
if self.message_type == "groupchat":
text = "%s: %s is running since %s" % (self.nick, self.target, result)
else:
text = "%s is running since %s" % (self.target, result)
return text
# XEP-0157: Contact Addresses for XMPP Services
class ContactInfo:
def __init__(self, contact, msg, target):
self.contact = contact
self.message = msg
self.target = target
def format_contact(self):
server_info = []
sep = ' , '
possible_vars = ['abuse-addresses',
'admin-addresses',
'feedback-addresses',
'sales-addresses',
'security-addresses',
'support-addresses']
for field in self.contact['disco_info']['form']:
var = field['var']
if var in possible_vars:
field_value = field.get_value(convert=False)
value = sep.join(field_value) if isinstance(field_value, list) else field_value
server_info.append(' - %s: %s' % (var, value))
if server_info:
text = "contact addresses for %s are" % self.target
for count in range(server_info.__len__()):
text += "\n" + server_info[count]
else:
text = "%s has no contact addresses configured." % self.target
return text
# class handeling XMPPError exeptions
class HandleError:
def __init__(self, error, msg, key, target="target missing"):
self.error = error
self.message = msg
self.key = key
self.target = target
def build_report(self):
condition = self.error.condition
keyword = self.key[1:]
text = "There was an error requesting " + self.target + '\'s ' + keyword + " : " + condition
return text

106
classes/servercontact.py Normal file
View file

@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
import defusedxml.ElementTree as Et
# XEP-0157: Contact Addresses for XMPP Services
class ServerContact:
"""
plugin to process the server contact addresses from a disco query
"""
def __init__(self):
# init all necessary variables
self.possible_vars = ['abuse-addresses',
'admin-addresses',
'feedback-addresses',
'sales-addresses',
'security-addresses',
'support-addresses']
self.contact = None
self.target, self.opt_arg = None, None
def opt_arg_abbreviation(self):
"""
optional argument abbreviation function
if the provided string > 2 characters the most likely key will be chosen
:return: completes the opt_arg to the most likely one
"""
# if opt_argument is smaller then 2 pass to prohibit multiple answers
if len(self.opt_arg) < 2:
pass
abbr = str(self.opt_arg)
possible_abbr = ["abuse-addresses", "admin-addresses", "feedback-addresses", "sales-addresses",
"security-addresses", "support-addresses"]
# searches the best match in the list of possible_abbr and completes the opt_arg to that
self.opt_arg = [s for s in possible_abbr if s.startswith(abbr)][0]
def process(self):
# optional argument abbreviation
self.opt_arg_abbreviation()
# get etree from base xml
iq = Et.fromstring(str(self.contact))
# check if query is a valid result query
if iq.find('{http://jabber.org/protocol/disco#info}query'):
# only init result dict if result query is present
result = dict()
# extract query from iq
query = iq.find('{http://jabber.org/protocol/disco#info}query')
# extract jabber:x:data from query
xdata = query.findall('{jabber:x:data}x')
# iterate over all nodes with the xdata tag
for node in xdata:
# iterate over all child elements in node
for child in node:
# if one opt_arg is defined return just that one
if self.opt_arg in self.possible_vars:
if child.attrib['var'] == self.opt_arg:
# add section to result dict and append info
result[child.attrib['var']] = list()
for value in child:
result[child.attrib['var']].append(value.text)
# if node has a var attribute that matches our list process
elif child.attrib['var'] in self.possible_vars:
# add section to result dict and append info
result[child.attrib['var']] = list()
for value in child:
result[child.attrib['var']].append(value.text)
return result
def format(self, query, target, opt_arg):
self.contact = query
self.target = target
self.opt_arg = opt_arg
result = self.process()
# if result is present continue
if result:
text = "contact addresses for %s are\n" % self.target
# if opt_arg is present and member of possible_vars change text line
if self.opt_arg in self.possible_vars:
text = "%s for %s are\n" % (self.opt_arg, self.target)
for key in result.keys():
addr = ' , '.join(result[key])
text += "- %s : %s\n" % (key, addr)
else:
text = "%s has no contact addresses configured." % self.target
# if opt_arg is present and member of possible_vars but the key is empty change text line
if self.opt_arg in self.possible_vars:
text = "%s for %s are not defined." % (self.opt_arg, self.target)
return text

50
classes/uptime.py Normal file
View file

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# XEP-0012: Last Activity
class LastActivity:
"""
query the server uptime of the specified domain, defined by XEP-0012
"""
def __init__(self):
# init all necessary variables
self.last_activity = None
self.target, self.opt_arg = None, None
def process(self, granularity=4):
seconds = self.last_activity['last_activity']['seconds']
uptime = []
# touple with displayable time sections
intervals = (
('years', 31536000), # 60 * 60 * 24 * 365
('weeks', 604800), # 60 * 60 * 24 * 7
('days', 86400), # 60 * 60 * 24
('hours', 3600), # 60 * 60
('minutes', 60),
('seconds', 1)
)
# for every element in possible time section process the seconds
for name, count in intervals:
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('s')
uptime.append("{} {}".format(value, name))
result = ' '.join(uptime[:granularity])
# insert values into result string
text = "%s is running since %s" % (self.target, result)
return text
def format(self, query, target, opt_arg):
self.last_activity = query
self.target = target
self.opt_arg = opt_arg
reply = self.process()
return reply

39
classes/version.py Normal file
View file

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# XEP-0072: Server Version
class Version:
"""
process and format a version query
"""
def __init__(self):
# init all necessary variables
self.software_version = None
self.target, self.opt_arg = None, None
def format_result(self):
# list of all possible opt_arg
possible_opt_args = ["version", "os", "name"]
name = self.software_version['name']
version = self.software_version['version']
os = self.software_version['os']
# if opt_arg is given member of possible_opt_args list return that element
if self.opt_arg in possible_opt_args:
text = "%s: %s" % (self.opt_arg, self.software_version[self.opt_arg])
# otherwise return full version string
else:
text = "%s is running %s version %s on %s" % (self.target, name, version, os)
return text
def format(self, query, target, opt_arg):
self.software_version = query['software_version']
self.target = target
self.opt_arg = opt_arg
reply = self.format_result()
return reply

View file

@ -1,19 +1,17 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import requests
import defusedxml.ElementTree as ET
import defusedxml.ElementTree as Et
class XEPRequest:
def __init__(self, msg, xepnumber):
"""
class which requests the header of the referenced xep
:param xepnumber: number int or str to request the xep for
"""
self.message_type = msg['type']
self.muc_nick = msg['mucnick']
"""
class which requests the header of the referenced xep
"""
def __init__(self):
# init all necessary variables
self.reqxep, self.opt_arg = None, None
self.reqxep = str(xepnumber)
self.xeplist = None
self.acceptedxeps = list()
@ -21,30 +19,40 @@ class XEPRequest:
"""
query and save the current xep list to reduce network bandwidth
"""
try:
with open(".etag") as file:
# check if etag header is present if not set local_etag to ""
if os.path.isfile("./common/.etag"):
with open("./common/.etag") as file:
local_etag = file.read()
except FileNotFoundError:
else:
local_etag = ""
with requests.Session() as s:
# head request the xeplist.xml
s.headers.update({'Accept': 'application/xml'})
head = s.head("https://xmpp.org/extensions/xeplist.xml")
etag = head.headers['etag']
# compare etag with local_etag if they match up no request is made
if local_etag == etag:
with open("xeplist.xml", "r") as file:
self.xeplist = ET.fromstring(file.read())
with open("./common/xeplist.xml", "r") as file:
self.xeplist = Et.fromstring(file.read())
# if the connection is not possible use cached xml if present
elif os.path.isfile("./common/xeplist.xml") and head.status_code != 200:
with open("./common/xeplist.xml", "r") as file:
self.xeplist = Et.fromstring(file.read())
# in any other case request the latest xml
else:
r = s.get("https://xmpp.org/extensions/xeplist.xml")
r.encoding = 'utf-8'
local_etag = head.headers['etag']
with open("xeplist.xml", "w") as file:
with open("./common/xeplist.xml", "w") as file:
file.write(r.content.decode())
self.xeplist = ET.fromstring(r.content.decode())
self.xeplist = Et.fromstring(r.content.decode())
with open('.etag', 'w') as string:
with open('./common/.etag', 'w') as string:
string.write(local_etag)
# populate xep comparison list
@ -54,34 +62,57 @@ class XEPRequest:
def get(self):
"""
function to query the xep entry if xepnumber is present in xeplist
:return: nicely formatted xep header information
:return: formatted xep header information
"""
# all possible subtags grouped by location
last_revision_tags = ["date", "version", "initials", "remark"]
xep_tags = ["number", "title", "abstract", "type", "status", "approver", "shortname", "sig", "lastcall", "date", "version", "initials", "remark"]
# check if xeplist is accurate
self.req_xeplist()
result = list()
# if requested number is inside acceptedxeps continou
if self.reqxep in self.acceptedxeps:
# if requested number is member of acceptedxeps continue
if str(self.reqxep) in self.acceptedxeps:
searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep
for item in self.xeplist.findall(searchstring):
for x in range(1,5):
result.append(item[x].tag + " : " + item[x].text)
# if the opt_arg references is member of xeptag return only that tag
if self.opt_arg in xep_tags:
# if the opt_arg references is member of last_revision_tags return only that subtag
if self.opt_arg in last_revision_tags:
query = item.find("last-revision").find(self.opt_arg)
else:
query = item.find(self.opt_arg)
# append opt_arg results to the result list
if query is not None:
result.append("%s : %s" % (query.tag, query.text))
else:
result.append("%s does not have a %s tag." % (self.reqxep, self.opt_arg))
# in any other case return the general answer
else:
result_opts = ["title", "type", "abstract", "status"]
for tag in result_opts:
result.append(item.find(tag).text)
# if the requested number is no member of acceptedxeps and/or not accepted return error.
else:
if self.message_type == "groupchat":
result.append(self.muc_nick + " : " + "XEP-" + str(self.reqxep) + " : is not available.")
else:
result.append("XEP-" + str(self.reqxep) + " : is not available.")
result.append("XEP-%s : is not available." % self.reqxep)
return result
def format(self):
def format(self, query, target, opt_arg):
"""
:param target: number int or str to request the xep for
:return:
"""
self.reqxep = int(target)
self.opt_arg = opt_arg
reply = self.get()
if self.message_type == "groupchat":
text = "%s: " % self.muc_nick
reply[0] = text + reply[0]
text = '\n'.join(reply)
return text

62
common/misc.py Executable file
View file

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
import validators
from common.strings import StaticAnswers
def deduplicate(reply):
"""
list deduplication method
:param list reply: list containing non unique items
:return: list containing unique items
"""
reply_dedup = list()
for item in reply:
if item not in reply_dedup:
reply_dedup.append(item)
return reply_dedup
def validate(keyword, target):
"""
validation method to reduce malformed querys and unnecessary connection attempts
:param keyword: used keyword
:param target: provided target
:return: true if valid
"""
# if keyword in domain_keywords list
if keyword in StaticAnswers().keys('domain_keywords'):
# if target is a domain / email return True
if validators.domain(target) or validators.email(target):
return True
# check if keyword is in number_keyword list
elif keyword in StaticAnswers().keys('number_keywords'):
# if target only consists of digits return True
return target.isdigit()
# if keyword is in no_arg_keywords list return True
elif keyword in StaticAnswers().keys("no_arg_keywords"):
return True
# if the target could not be validated until this return False
return False
#
class HandleError:
"""
simple XMPP error / exception class formating the error condition
"""
def __init__(self, error, key, target):
# init all necessary variables
self.text = error.text
self.condition = error.condition
self.key = key
self.target = target
def report(self):
# return the formatted result string to the user
text = "%s. %s %s resulted in: %s" % (self.text, self.key, self.target, self.condition)
return text

View file

@ -13,11 +13,13 @@ class StaticAnswers:
'version': '!version domain.tld -- receive XMPP server version',
'uptime': '!uptime domain.tld -- receive XMPP server uptime',
'contact': '!contact domain.tld -- receive XMPP server contact address info',
'xep': '!xep XEP Number -- recieve information about the specified XEP'}
'xep': '!xep XEP Number -- recieve information about the specified XEP'
}
self.possible_answers = {
'1': 'I heard that, %s.',
'2': 'I am sorry for that %s.',
'3': '%s did you try turning it off and on again?'}
'3': '%s did you try turning it off and on again?'
}
self.error_messages = {
'1': 'not reachable',
'2': 'not a valid target'
@ -29,14 +31,13 @@ class StaticAnswers:
"number_keywords": ["!xep"]
}
def keys(self, arg="", keyword='keywords'):
if arg == 'list':
try:
return self.keywords[keyword]
except KeyError:
return self.keywords['keywords']
else:
return self.keywords
def keys(self, key=""):
# if specific keyword in referenced return that
if key in self.keywords.keys():
return self.keywords[key]
# in any other case return the whole dict
return self.keywords["keywords"]
def gen_help(self):
helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()])

209
main.py Normal file → Executable file
View file

@ -1,24 +1,25 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of Slixmpp.
James the MagicXMPP Bot
build with Slick XMPP Library
Copyright (C) 2018 Nico Wellpott
See the file LICENSE for copying permission.
"""
import asyncio
import slixmpp
import ssl
import validators
import configparser
import logging
from argparse import ArgumentParser
from slixmpp.exceptions import XMPPError
from classes.strings import StaticAnswers
from classes.functions import Version, LastActivity, ContactInfo, HandleError
import common.misc as misc
from common.strings import StaticAnswers
from classes.servercontact import ServerContact
from classes.version import Version
from classes.uptime import LastActivity
from classes.xep import XEPRequest
@ -28,11 +29,19 @@ class QueryBot(slixmpp.ClientXMPP):
self.ssl_version = ssl.PROTOCOL_TLSv1_2
self.room = room
self.nick = nick
self.use_message_ids = True
self.functions = {
"!uptime": LastActivity(),
"!contact": ServerContact(),
"!version": Version(),
"!xep": XEPRequest()
}
# session start event, starting point for the presence and roster requests
self.add_event_handler('session_start', self.start)
# register handler to recieve both groupchat and normal message events
# register receive handler for both groupchat and normal message events
self.add_event_handler('message', self.message)
def start(self, event):
@ -43,125 +52,111 @@ class QueryBot(slixmpp.ClientXMPP):
self.get_roster()
# If a room password is needed, use: password=the_room_password
for rooms in self.room.split(sep=","):
self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
if self.room:
for rooms in self.room.split(sep=","):
logging.debug("joining: %s" % rooms)
self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
def validate_domain(self, wordlist, index):
"""
validation method to reduce malformed querys and unnecessary connection attempts
:param wordlist: words separated by " " from the message
:param index: keyword index inside the message
:return: true if valid
"""
# keyword inside the message
argument = wordlist[index]
# check if argument is in the argument list
if argument in StaticAnswers().keys(arg='list'):
# if argument uses a domain check for occurence in list and check domain
if argument in StaticAnswers().keys(arg='list', keyword='domain_keywords'):
try:
target = wordlist[index + 1]
if validators.domain(target):
return True
except IndexError:
# except an IndexError if a keywords is the last word in the message
return False
# check if number keyword is used if true check if target is assignable
elif argument in StaticAnswers().keys(arg='list', keyword='number_keywords'):
try:
target = wordlist[index + 1]
return True
except IndexError:
# except an IndexError if target is not assignable
return False
# check if argument is inside no_arg list
elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
return True
else:
return False
else:
return False
def deduplicate(self, reply):
"""
deduplication method for the result list
:param list reply: list containing strings
:return: list containing unique strings
"""
reply_dedup = list()
for item in reply:
if item not in reply_dedup:
reply_dedup.append(item)
return reply_dedup
@asyncio.coroutine
def message(self, msg):
async def message(self, msg):
"""
:param msg: received message stanza
"""
# init empty reply list
reply = list()
data = {
'words': list(),
'reply': list(),
'queue': list()
}
# catch self messages to prevent self flooding
if msg['mucnick'] == self.nick:
return
elif self.nick in msg['body']:
# add pre predefined text to reply list
reply.append(StaticAnswers(msg['mucnick']).gen_answer())
data['reply'].append(StaticAnswers(msg['mucnick']).gen_answer())
# building the queue
# double splitting to exclude whitespaces
words = " ".join(msg['body'].split()).split(sep=" ")
queue = list()
# check all words in side the message for possible hits
for x in enumerate(words):
# check word for match in keywords list
for y in StaticAnswers().keys(arg='list'):
# if so queue the keyword and the postion in the string
if x[1] == y:
# only add job to queue if domain is valid
if self.validate_domain(words, x[0]):
queue.append({str(y): x[0]})
data = self.build_queue(data, msg)
# queue
for job in queue:
for key in job:
keyword = key
index = job[key]
for job in data['queue']:
keys = list(job.keys())
keyword = keys[0]
if keyword == '!help':
reply.append(StaticAnswers().gen_help())
continue
target = job[keyword][0]
opt_arg = job[keyword][1]
query = None
try:
target = words[index + 1]
if keyword == '!uptime':
last_activity = yield from self['xep_0012'].get_last_activity(target)
reply.append(LastActivity(last_activity, msg, target).format_values())
if keyword == '!help':
data['reply'].append(StaticAnswers().gen_help())
continue
elif keyword == "!version":
version = yield from self['xep_0092'].get_version(target)
reply.append(Version(version, msg, target).format_version())
try:
if keyword == "!uptime":
query = await self['xep_0012'].get_last_activity(jid=target)
elif keyword == "!contact":
contact = yield from self['xep_0030'].get_info(jid=target, cached=False)
reply.append(ContactInfo(contact, msg, target).format_contact())
elif keyword == "!version":
query = await self['xep_0092'].get_version(jid=target)
elif keyword == "!xep":
reply.append(XEPRequest(msg, target).format())
elif keyword == "!contact":
query = await self['xep_0030'].get_info(jid=target, cached=False)
except XMPPError as error:
reply.append(HandleError(error, msg, key, target).build_report())
except XMPPError as error:
logging.info(misc.HandleError(error, keyword, target).report())
data['reply'].append(misc.HandleError(error, keyword, target).report())
continue
data["reply"].append(self.functions[keyword].format(query=query, target=target, opt_arg=opt_arg))
# remove None type from list and send all elements
if list(filter(None.__ne__, reply)) and reply:
reply = self.deduplicate(reply)
if list(filter(None.__ne__, data['reply'])) and data['reply']:
# if msg type is groupchat prepend mucnick
if msg["type"] == "groupchat":
data["reply"][0] = "%s: " % msg["mucnick"] + data["reply"][0]
# reply = misc.deduplicate(data['reply'])
reply = data["reply"]
self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type'])
def build_queue(self, data, msg):
# building the queue
# double splitting to exclude whitespaces
data['words'] = " ".join(msg['body'].split()).split(sep=" ")
wordcount = len(data["words"])
# check all words in side the message for possible hits
for x in enumerate(data['words']):
# check for valid keywords
index = x[0]
keyword = x[1]
# match all words starting with ! and member of no_arg_keywords
if keyword.startswith("!") and keyword in StaticAnswers().keys("no_arg_keywords"):
data['queue'].append({keyword: [None, None]})
# matching all words starting with ! and member of keywords
elif keyword.startswith("!") and keyword in StaticAnswers().keys("keywords"):
# init variables to circumvent IndexErrors
target, opt_arg = None, None
# compare to wordcount if assignment is possible
if index + 1 < wordcount:
target = data["words"][index + 1]
if index + 2 < wordcount:
if not data["words"][index + 2].startswith("!"):
opt_arg = data["words"][index + 2]
# only add job to queue if domain is valid
if misc.validate(keyword, target):
logging.debug("Item added to queue %s" % {str(keyword): [target, opt_arg]})
data['queue'].append({str(keyword): [target, opt_arg]})
# deduplicate queue elements
data["queue"] = misc.deduplicate(data["queue"])
return data
if __name__ == '__main__':
# command line arguments.
@ -170,12 +165,12 @@ if __name__ == '__main__':
const=logging.ERROR, default=logging.INFO)
parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
const=logging.DEBUG, default=logging.INFO)
parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', const="",
default='bot.log')
parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile',
const="", default='bot.log')
args = parser.parse_args()
# logging
logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s')
logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)s: %(asctime)s: %(message)s')
logger = logging.getLogger(__name__)
# configfile