mirror of
https://github.com/mightyBroccoli/xmpp-chatbot.git
synced 2024-12-04 22:33:36 +01:00
Merge pull request #7 from mightyBroccoli/dev
# added + added more comments to xep requests + added opt_arg to version, xep and contact + implemented data dictionary to hold all data in main bot + added message_ids # changed * updated gitignore file * partly reworked servercontact implementation * complete rework of uptime, version * part rework of xep requests * complete rework of validate function * updated HandleError function * part rework of StaticStrings function * complete rework of queue building and deduplication
This commit is contained in:
commit
ce1af725d7
9 changed files with 435 additions and 259 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -60,6 +60,6 @@ target/
|
||||||
|
|
||||||
# .idea
|
# .idea
|
||||||
.idea
|
.idea
|
||||||
.etag
|
|
||||||
bot\.cfg
|
bot\.cfg
|
||||||
xeplist.xml
|
common/xeplist.xml
|
||||||
|
common/.etag
|
|
@ -1,108 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
# XEP-0072: Server Version
|
|
||||||
class Version:
|
|
||||||
def __init__(self, version, msg, target):
|
|
||||||
self.version = version['software_version']['version']
|
|
||||||
self.os = version['software_version']['os']
|
|
||||||
self.name = version['software_version']['name']
|
|
||||||
self.nick = msg['mucnick']
|
|
||||||
self.message_type = msg['type']
|
|
||||||
self.target = target
|
|
||||||
|
|
||||||
def format_version(self):
|
|
||||||
if self.message_type == "groupchat":
|
|
||||||
text = "%s: %s is running %s version %s on %s" % (self.nick, self.target, self.name, self.version, self.os)
|
|
||||||
else:
|
|
||||||
text = "%s is running %s version %s on %s" % (self.target, self.name, self.version, self.os)
|
|
||||||
|
|
||||||
return text
|
|
||||||
|
|
||||||
|
|
||||||
# XEP-0012: Last Activity
|
|
||||||
class LastActivity:
|
|
||||||
""" query the server uptime of the specified domain, defined by XEP-0012 """
|
|
||||||
def __init__(self, last_activity, msg, target):
|
|
||||||
self.last_activity = last_activity
|
|
||||||
self.nick = msg['mucnick']
|
|
||||||
self.message_type = msg['type']
|
|
||||||
self.target = target
|
|
||||||
|
|
||||||
def format_values(self, granularity=4):
|
|
||||||
seconds = self.last_activity['last_activity']['seconds']
|
|
||||||
uptime = []
|
|
||||||
intervals = (
|
|
||||||
('years', 31536000), # 60 * 60 * 24 * 365
|
|
||||||
('weeks', 604800), # 60 * 60 * 24 * 7
|
|
||||||
('days', 86400), # 60 * 60 * 24
|
|
||||||
('hours', 3600), # 60 * 60
|
|
||||||
('minutes', 60),
|
|
||||||
('seconds', 1)
|
|
||||||
)
|
|
||||||
for name, count in intervals:
|
|
||||||
value = seconds // count
|
|
||||||
if value:
|
|
||||||
seconds -= value * count
|
|
||||||
if value == 1:
|
|
||||||
name = name.rstrip('s')
|
|
||||||
uptime.append("{} {}".format(value, name))
|
|
||||||
result = ' '.join(uptime[:granularity])
|
|
||||||
|
|
||||||
if self.message_type == "groupchat":
|
|
||||||
text = "%s: %s is running since %s" % (self.nick, self.target, result)
|
|
||||||
else:
|
|
||||||
text = "%s is running since %s" % (self.target, result)
|
|
||||||
|
|
||||||
return text
|
|
||||||
|
|
||||||
|
|
||||||
# XEP-0157: Contact Addresses for XMPP Services
|
|
||||||
class ContactInfo:
|
|
||||||
def __init__(self, contact, msg, target):
|
|
||||||
self.contact = contact
|
|
||||||
self.message = msg
|
|
||||||
self.target = target
|
|
||||||
|
|
||||||
def format_contact(self):
|
|
||||||
server_info = []
|
|
||||||
sep = ' , '
|
|
||||||
possible_vars = ['abuse-addresses',
|
|
||||||
'admin-addresses',
|
|
||||||
'feedback-addresses',
|
|
||||||
'sales-addresses',
|
|
||||||
'security-addresses',
|
|
||||||
'support-addresses']
|
|
||||||
|
|
||||||
for field in self.contact['disco_info']['form']:
|
|
||||||
var = field['var']
|
|
||||||
if var in possible_vars:
|
|
||||||
field_value = field.get_value(convert=False)
|
|
||||||
value = sep.join(field_value) if isinstance(field_value, list) else field_value
|
|
||||||
server_info.append(' - %s: %s' % (var, value))
|
|
||||||
|
|
||||||
if server_info:
|
|
||||||
text = "contact addresses for %s are" % self.target
|
|
||||||
for count in range(server_info.__len__()):
|
|
||||||
text += "\n" + server_info[count]
|
|
||||||
else:
|
|
||||||
text = "%s has no contact addresses configured." % self.target
|
|
||||||
|
|
||||||
return text
|
|
||||||
|
|
||||||
|
|
||||||
# class handeling XMPPError exeptions
|
|
||||||
class HandleError:
|
|
||||||
def __init__(self, error, msg, key, target="target missing"):
|
|
||||||
self.error = error
|
|
||||||
self.message = msg
|
|
||||||
self.key = key
|
|
||||||
self.target = target
|
|
||||||
|
|
||||||
def build_report(self):
|
|
||||||
condition = self.error.condition
|
|
||||||
keyword = self.key[1:]
|
|
||||||
|
|
||||||
text = "There was an error requesting " + self.target + '\'s ' + keyword + " : " + condition
|
|
||||||
|
|
||||||
return text
|
|
106
classes/servercontact.py
Normal file
106
classes/servercontact.py
Normal file
|
@ -0,0 +1,106 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import defusedxml.ElementTree as Et
|
||||||
|
|
||||||
|
|
||||||
|
# XEP-0157: Contact Addresses for XMPP Services
|
||||||
|
class ServerContact:
|
||||||
|
"""
|
||||||
|
plugin to process the server contact addresses from a disco query
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
# init all necessary variables
|
||||||
|
self.possible_vars = ['abuse-addresses',
|
||||||
|
'admin-addresses',
|
||||||
|
'feedback-addresses',
|
||||||
|
'sales-addresses',
|
||||||
|
'security-addresses',
|
||||||
|
'support-addresses']
|
||||||
|
|
||||||
|
self.contact = None
|
||||||
|
self.target, self.opt_arg = None, None
|
||||||
|
|
||||||
|
def opt_arg_abbreviation(self):
|
||||||
|
"""
|
||||||
|
optional argument abbreviation function
|
||||||
|
if the provided string > 2 characters the most likely key will be chosen
|
||||||
|
:return: completes the opt_arg to the most likely one
|
||||||
|
"""
|
||||||
|
# if opt_argument is smaller then 2 pass to prohibit multiple answers
|
||||||
|
if len(self.opt_arg) < 2:
|
||||||
|
pass
|
||||||
|
|
||||||
|
abbr = str(self.opt_arg)
|
||||||
|
possible_abbr = ["abuse-addresses", "admin-addresses", "feedback-addresses", "sales-addresses",
|
||||||
|
"security-addresses", "support-addresses"]
|
||||||
|
|
||||||
|
# searches the best match in the list of possible_abbr and completes the opt_arg to that
|
||||||
|
self.opt_arg = [s for s in possible_abbr if s.startswith(abbr)][0]
|
||||||
|
|
||||||
|
def process(self):
|
||||||
|
# optional argument abbreviation
|
||||||
|
self.opt_arg_abbreviation()
|
||||||
|
|
||||||
|
# get etree from base xml
|
||||||
|
iq = Et.fromstring(str(self.contact))
|
||||||
|
|
||||||
|
# check if query is a valid result query
|
||||||
|
if iq.find('{http://jabber.org/protocol/disco#info}query'):
|
||||||
|
# only init result dict if result query is present
|
||||||
|
result = dict()
|
||||||
|
|
||||||
|
# extract query from iq
|
||||||
|
query = iq.find('{http://jabber.org/protocol/disco#info}query')
|
||||||
|
|
||||||
|
# extract jabber:x:data from query
|
||||||
|
xdata = query.findall('{jabber:x:data}x')
|
||||||
|
|
||||||
|
# iterate over all nodes with the xdata tag
|
||||||
|
for node in xdata:
|
||||||
|
|
||||||
|
# iterate over all child elements in node
|
||||||
|
for child in node:
|
||||||
|
|
||||||
|
# if one opt_arg is defined return just that one
|
||||||
|
if self.opt_arg in self.possible_vars:
|
||||||
|
if child.attrib['var'] == self.opt_arg:
|
||||||
|
# add section to result dict and append info
|
||||||
|
result[child.attrib['var']] = list()
|
||||||
|
for value in child:
|
||||||
|
result[child.attrib['var']].append(value.text)
|
||||||
|
|
||||||
|
# if node has a var attribute that matches our list process
|
||||||
|
elif child.attrib['var'] in self.possible_vars:
|
||||||
|
# add section to result dict and append info
|
||||||
|
result[child.attrib['var']] = list()
|
||||||
|
for value in child:
|
||||||
|
result[child.attrib['var']].append(value.text)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def format(self, query, target, opt_arg):
|
||||||
|
self.contact = query
|
||||||
|
|
||||||
|
self.target = target
|
||||||
|
self.opt_arg = opt_arg
|
||||||
|
|
||||||
|
result = self.process()
|
||||||
|
|
||||||
|
# if result is present continue
|
||||||
|
if result:
|
||||||
|
text = "contact addresses for %s are\n" % self.target
|
||||||
|
|
||||||
|
# if opt_arg is present and member of possible_vars change text line
|
||||||
|
if self.opt_arg in self.possible_vars:
|
||||||
|
text = "%s for %s are\n" % (self.opt_arg, self.target)
|
||||||
|
|
||||||
|
for key in result.keys():
|
||||||
|
addr = ' , '.join(result[key])
|
||||||
|
text += "- %s : %s\n" % (key, addr)
|
||||||
|
else:
|
||||||
|
text = "%s has no contact addresses configured." % self.target
|
||||||
|
|
||||||
|
# if opt_arg is present and member of possible_vars but the key is empty change text line
|
||||||
|
if self.opt_arg in self.possible_vars:
|
||||||
|
text = "%s for %s are not defined." % (self.opt_arg, self.target)
|
||||||
|
|
||||||
|
return text
|
50
classes/uptime.py
Normal file
50
classes/uptime.py
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
|
||||||
|
# XEP-0012: Last Activity
|
||||||
|
class LastActivity:
|
||||||
|
"""
|
||||||
|
query the server uptime of the specified domain, defined by XEP-0012
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
# init all necessary variables
|
||||||
|
self.last_activity = None
|
||||||
|
self.target, self.opt_arg = None, None
|
||||||
|
|
||||||
|
def process(self, granularity=4):
|
||||||
|
seconds = self.last_activity['last_activity']['seconds']
|
||||||
|
uptime = []
|
||||||
|
|
||||||
|
# touple with displayable time sections
|
||||||
|
intervals = (
|
||||||
|
('years', 31536000), # 60 * 60 * 24 * 365
|
||||||
|
('weeks', 604800), # 60 * 60 * 24 * 7
|
||||||
|
('days', 86400), # 60 * 60 * 24
|
||||||
|
('hours', 3600), # 60 * 60
|
||||||
|
('minutes', 60),
|
||||||
|
('seconds', 1)
|
||||||
|
)
|
||||||
|
|
||||||
|
# for every element in possible time section process the seconds
|
||||||
|
for name, count in intervals:
|
||||||
|
value = seconds // count
|
||||||
|
if value:
|
||||||
|
seconds -= value * count
|
||||||
|
if value == 1:
|
||||||
|
name = name.rstrip('s')
|
||||||
|
uptime.append("{} {}".format(value, name))
|
||||||
|
result = ' '.join(uptime[:granularity])
|
||||||
|
|
||||||
|
# insert values into result string
|
||||||
|
text = "%s is running since %s" % (self.target, result)
|
||||||
|
|
||||||
|
return text
|
||||||
|
|
||||||
|
def format(self, query, target, opt_arg):
|
||||||
|
self.last_activity = query
|
||||||
|
|
||||||
|
self.target = target
|
||||||
|
self.opt_arg = opt_arg
|
||||||
|
|
||||||
|
reply = self.process()
|
||||||
|
return reply
|
39
classes/version.py
Normal file
39
classes/version.py
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
|
||||||
|
# XEP-0072: Server Version
|
||||||
|
class Version:
|
||||||
|
"""
|
||||||
|
process and format a version query
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
# init all necessary variables
|
||||||
|
self.software_version = None
|
||||||
|
self.target, self.opt_arg = None, None
|
||||||
|
|
||||||
|
def format_result(self):
|
||||||
|
# list of all possible opt_arg
|
||||||
|
possible_opt_args = ["version", "os", "name"]
|
||||||
|
|
||||||
|
name = self.software_version['name']
|
||||||
|
version = self.software_version['version']
|
||||||
|
os = self.software_version['os']
|
||||||
|
|
||||||
|
# if opt_arg is given member of possible_opt_args list return that element
|
||||||
|
if self.opt_arg in possible_opt_args:
|
||||||
|
text = "%s: %s" % (self.opt_arg, self.software_version[self.opt_arg])
|
||||||
|
|
||||||
|
# otherwise return full version string
|
||||||
|
else:
|
||||||
|
text = "%s is running %s version %s on %s" % (self.target, name, version, os)
|
||||||
|
|
||||||
|
return text
|
||||||
|
|
||||||
|
def format(self, query, target, opt_arg):
|
||||||
|
self.software_version = query['software_version']
|
||||||
|
|
||||||
|
self.target = target
|
||||||
|
self.opt_arg = opt_arg
|
||||||
|
|
||||||
|
reply = self.format_result()
|
||||||
|
return reply
|
|
@ -1,19 +1,17 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
import os
|
||||||
import requests
|
import requests
|
||||||
import defusedxml.ElementTree as ET
|
import defusedxml.ElementTree as Et
|
||||||
|
|
||||||
|
|
||||||
class XEPRequest:
|
class XEPRequest:
|
||||||
def __init__(self, msg, xepnumber):
|
"""
|
||||||
"""
|
class which requests the header of the referenced xep
|
||||||
class which requests the header of the referenced xep
|
"""
|
||||||
:param xepnumber: number int or str to request the xep for
|
def __init__(self):
|
||||||
"""
|
# init all necessary variables
|
||||||
self.message_type = msg['type']
|
self.reqxep, self.opt_arg = None, None
|
||||||
self.muc_nick = msg['mucnick']
|
|
||||||
|
|
||||||
self.reqxep = str(xepnumber)
|
|
||||||
self.xeplist = None
|
self.xeplist = None
|
||||||
self.acceptedxeps = list()
|
self.acceptedxeps = list()
|
||||||
|
|
||||||
|
@ -21,30 +19,40 @@ class XEPRequest:
|
||||||
"""
|
"""
|
||||||
query and save the current xep list to reduce network bandwidth
|
query and save the current xep list to reduce network bandwidth
|
||||||
"""
|
"""
|
||||||
try:
|
# check if etag header is present if not set local_etag to ""
|
||||||
with open(".etag") as file:
|
if os.path.isfile("./common/.etag"):
|
||||||
|
with open("./common/.etag") as file:
|
||||||
local_etag = file.read()
|
local_etag = file.read()
|
||||||
except FileNotFoundError:
|
else:
|
||||||
local_etag = ""
|
local_etag = ""
|
||||||
|
|
||||||
with requests.Session() as s:
|
with requests.Session() as s:
|
||||||
|
# head request the xeplist.xml
|
||||||
s.headers.update({'Accept': 'application/xml'})
|
s.headers.update({'Accept': 'application/xml'})
|
||||||
head = s.head("https://xmpp.org/extensions/xeplist.xml")
|
head = s.head("https://xmpp.org/extensions/xeplist.xml")
|
||||||
etag = head.headers['etag']
|
etag = head.headers['etag']
|
||||||
|
|
||||||
|
# compare etag with local_etag if they match up no request is made
|
||||||
if local_etag == etag:
|
if local_etag == etag:
|
||||||
with open("xeplist.xml", "r") as file:
|
with open("./common/xeplist.xml", "r") as file:
|
||||||
self.xeplist = ET.fromstring(file.read())
|
self.xeplist = Et.fromstring(file.read())
|
||||||
|
|
||||||
|
# if the connection is not possible use cached xml if present
|
||||||
|
elif os.path.isfile("./common/xeplist.xml") and head.status_code != 200:
|
||||||
|
with open("./common/xeplist.xml", "r") as file:
|
||||||
|
self.xeplist = Et.fromstring(file.read())
|
||||||
|
|
||||||
|
# in any other case request the latest xml
|
||||||
else:
|
else:
|
||||||
r = s.get("https://xmpp.org/extensions/xeplist.xml")
|
r = s.get("https://xmpp.org/extensions/xeplist.xml")
|
||||||
r.encoding = 'utf-8'
|
r.encoding = 'utf-8'
|
||||||
local_etag = head.headers['etag']
|
local_etag = head.headers['etag']
|
||||||
|
|
||||||
with open("xeplist.xml", "w") as file:
|
with open("./common/xeplist.xml", "w") as file:
|
||||||
file.write(r.content.decode())
|
file.write(r.content.decode())
|
||||||
self.xeplist = ET.fromstring(r.content.decode())
|
self.xeplist = Et.fromstring(r.content.decode())
|
||||||
|
|
||||||
with open('.etag', 'w') as string:
|
with open('./common/.etag', 'w') as string:
|
||||||
string.write(local_etag)
|
string.write(local_etag)
|
||||||
|
|
||||||
# populate xep comparison list
|
# populate xep comparison list
|
||||||
|
@ -54,34 +62,57 @@ class XEPRequest:
|
||||||
def get(self):
|
def get(self):
|
||||||
"""
|
"""
|
||||||
function to query the xep entry if xepnumber is present in xeplist
|
function to query the xep entry if xepnumber is present in xeplist
|
||||||
:return: nicely formatted xep header information
|
:return: formatted xep header information
|
||||||
"""
|
"""
|
||||||
|
# all possible subtags grouped by location
|
||||||
|
last_revision_tags = ["date", "version", "initials", "remark"]
|
||||||
|
xep_tags = ["number", "title", "abstract", "type", "status", "approver", "shortname", "sig", "lastcall", "date", "version", "initials", "remark"]
|
||||||
|
|
||||||
# check if xeplist is accurate
|
# check if xeplist is accurate
|
||||||
self.req_xeplist()
|
self.req_xeplist()
|
||||||
|
|
||||||
result = list()
|
result = list()
|
||||||
# if requested number is inside acceptedxeps continou
|
# if requested number is member of acceptedxeps continue
|
||||||
if self.reqxep in self.acceptedxeps:
|
if str(self.reqxep) in self.acceptedxeps:
|
||||||
searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep
|
searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep
|
||||||
|
|
||||||
for item in self.xeplist.findall(searchstring):
|
for item in self.xeplist.findall(searchstring):
|
||||||
for x in range(1,5):
|
# if the opt_arg references is member of xeptag return only that tag
|
||||||
result.append(item[x].tag + " : " + item[x].text)
|
if self.opt_arg in xep_tags:
|
||||||
|
|
||||||
|
# if the opt_arg references is member of last_revision_tags return only that subtag
|
||||||
|
if self.opt_arg in last_revision_tags:
|
||||||
|
query = item.find("last-revision").find(self.opt_arg)
|
||||||
|
else:
|
||||||
|
query = item.find(self.opt_arg)
|
||||||
|
|
||||||
|
# append opt_arg results to the result list
|
||||||
|
if query is not None:
|
||||||
|
result.append("%s : %s" % (query.tag, query.text))
|
||||||
|
else:
|
||||||
|
result.append("%s does not have a %s tag." % (self.reqxep, self.opt_arg))
|
||||||
|
|
||||||
|
# in any other case return the general answer
|
||||||
|
else:
|
||||||
|
result_opts = ["title", "type", "abstract", "status"]
|
||||||
|
for tag in result_opts:
|
||||||
|
result.append(item.find(tag).text)
|
||||||
|
|
||||||
|
# if the requested number is no member of acceptedxeps and/or not accepted return error.
|
||||||
else:
|
else:
|
||||||
if self.message_type == "groupchat":
|
result.append("XEP-%s : is not available." % self.reqxep)
|
||||||
result.append(self.muc_nick + " : " + "XEP-" + str(self.reqxep) + " : is not available.")
|
|
||||||
else:
|
|
||||||
result.append("XEP-" + str(self.reqxep) + " : is not available.")
|
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def format(self):
|
def format(self, query, target, opt_arg):
|
||||||
|
"""
|
||||||
|
:param target: number int or str to request the xep for
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
self.reqxep = int(target)
|
||||||
|
self.opt_arg = opt_arg
|
||||||
|
|
||||||
reply = self.get()
|
reply = self.get()
|
||||||
if self.message_type == "groupchat":
|
|
||||||
text = "%s: " % self.muc_nick
|
|
||||||
reply[0] = text + reply[0]
|
|
||||||
|
|
||||||
text = '\n'.join(reply)
|
text = '\n'.join(reply)
|
||||||
|
|
||||||
return text
|
return text
|
||||||
|
|
62
common/misc.py
Executable file
62
common/misc.py
Executable file
|
@ -0,0 +1,62 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import validators
|
||||||
|
from common.strings import StaticAnswers
|
||||||
|
|
||||||
|
|
||||||
|
def deduplicate(reply):
|
||||||
|
"""
|
||||||
|
list deduplication method
|
||||||
|
:param list reply: list containing non unique items
|
||||||
|
:return: list containing unique items
|
||||||
|
"""
|
||||||
|
reply_dedup = list()
|
||||||
|
for item in reply:
|
||||||
|
if item not in reply_dedup:
|
||||||
|
reply_dedup.append(item)
|
||||||
|
|
||||||
|
return reply_dedup
|
||||||
|
|
||||||
|
|
||||||
|
def validate(keyword, target):
|
||||||
|
"""
|
||||||
|
validation method to reduce malformed querys and unnecessary connection attempts
|
||||||
|
:param keyword: used keyword
|
||||||
|
:param target: provided target
|
||||||
|
:return: true if valid
|
||||||
|
"""
|
||||||
|
# if keyword in domain_keywords list
|
||||||
|
if keyword in StaticAnswers().keys('domain_keywords'):
|
||||||
|
# if target is a domain / email return True
|
||||||
|
if validators.domain(target) or validators.email(target):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# check if keyword is in number_keyword list
|
||||||
|
elif keyword in StaticAnswers().keys('number_keywords'):
|
||||||
|
# if target only consists of digits return True
|
||||||
|
return target.isdigit()
|
||||||
|
|
||||||
|
# if keyword is in no_arg_keywords list return True
|
||||||
|
elif keyword in StaticAnswers().keys("no_arg_keywords"):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# if the target could not be validated until this return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
class HandleError:
|
||||||
|
"""
|
||||||
|
simple XMPP error / exception class formating the error condition
|
||||||
|
"""
|
||||||
|
def __init__(self, error, key, target):
|
||||||
|
# init all necessary variables
|
||||||
|
self.text = error.text
|
||||||
|
self.condition = error.condition
|
||||||
|
self.key = key
|
||||||
|
self.target = target
|
||||||
|
|
||||||
|
def report(self):
|
||||||
|
# return the formatted result string to the user
|
||||||
|
text = "%s. %s %s resulted in: %s" % (self.text, self.key, self.target, self.condition)
|
||||||
|
|
||||||
|
return text
|
|
@ -13,11 +13,13 @@ class StaticAnswers:
|
||||||
'version': '!version domain.tld -- receive XMPP server version',
|
'version': '!version domain.tld -- receive XMPP server version',
|
||||||
'uptime': '!uptime domain.tld -- receive XMPP server uptime',
|
'uptime': '!uptime domain.tld -- receive XMPP server uptime',
|
||||||
'contact': '!contact domain.tld -- receive XMPP server contact address info',
|
'contact': '!contact domain.tld -- receive XMPP server contact address info',
|
||||||
'xep': '!xep XEP Number -- recieve information about the specified XEP'}
|
'xep': '!xep XEP Number -- recieve information about the specified XEP'
|
||||||
|
}
|
||||||
self.possible_answers = {
|
self.possible_answers = {
|
||||||
'1': 'I heard that, %s.',
|
'1': 'I heard that, %s.',
|
||||||
'2': 'I am sorry for that %s.',
|
'2': 'I am sorry for that %s.',
|
||||||
'3': '%s did you try turning it off and on again?'}
|
'3': '%s did you try turning it off and on again?'
|
||||||
|
}
|
||||||
self.error_messages = {
|
self.error_messages = {
|
||||||
'1': 'not reachable',
|
'1': 'not reachable',
|
||||||
'2': 'not a valid target'
|
'2': 'not a valid target'
|
||||||
|
@ -29,14 +31,13 @@ class StaticAnswers:
|
||||||
"number_keywords": ["!xep"]
|
"number_keywords": ["!xep"]
|
||||||
}
|
}
|
||||||
|
|
||||||
def keys(self, arg="", keyword='keywords'):
|
def keys(self, key=""):
|
||||||
if arg == 'list':
|
# if specific keyword in referenced return that
|
||||||
try:
|
if key in self.keywords.keys():
|
||||||
return self.keywords[keyword]
|
return self.keywords[key]
|
||||||
except KeyError:
|
|
||||||
return self.keywords['keywords']
|
# in any other case return the whole dict
|
||||||
else:
|
return self.keywords["keywords"]
|
||||||
return self.keywords
|
|
||||||
|
|
||||||
def gen_help(self):
|
def gen_help(self):
|
||||||
helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()])
|
helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()])
|
209
main.py
Normal file → Executable file
209
main.py
Normal file → Executable file
|
@ -1,24 +1,25 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
"""
|
"""
|
||||||
Slixmpp: The Slick XMPP Library
|
James the MagicXMPP Bot
|
||||||
Copyright (C) 2010 Nathanael C. Fritz
|
build with Slick XMPP Library
|
||||||
This file is part of Slixmpp.
|
Copyright (C) 2018 Nico Wellpott
|
||||||
|
|
||||||
See the file LICENSE for copying permission.
|
See the file LICENSE for copying permission.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
|
||||||
import slixmpp
|
import slixmpp
|
||||||
import ssl
|
import ssl
|
||||||
import validators
|
|
||||||
import configparser
|
import configparser
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from slixmpp.exceptions import XMPPError
|
from slixmpp.exceptions import XMPPError
|
||||||
|
|
||||||
from classes.strings import StaticAnswers
|
import common.misc as misc
|
||||||
from classes.functions import Version, LastActivity, ContactInfo, HandleError
|
from common.strings import StaticAnswers
|
||||||
|
from classes.servercontact import ServerContact
|
||||||
|
from classes.version import Version
|
||||||
|
from classes.uptime import LastActivity
|
||||||
from classes.xep import XEPRequest
|
from classes.xep import XEPRequest
|
||||||
|
|
||||||
|
|
||||||
|
@ -28,11 +29,19 @@ class QueryBot(slixmpp.ClientXMPP):
|
||||||
self.ssl_version = ssl.PROTOCOL_TLSv1_2
|
self.ssl_version = ssl.PROTOCOL_TLSv1_2
|
||||||
self.room = room
|
self.room = room
|
||||||
self.nick = nick
|
self.nick = nick
|
||||||
|
self.use_message_ids = True
|
||||||
|
|
||||||
|
self.functions = {
|
||||||
|
"!uptime": LastActivity(),
|
||||||
|
"!contact": ServerContact(),
|
||||||
|
"!version": Version(),
|
||||||
|
"!xep": XEPRequest()
|
||||||
|
}
|
||||||
|
|
||||||
# session start event, starting point for the presence and roster requests
|
# session start event, starting point for the presence and roster requests
|
||||||
self.add_event_handler('session_start', self.start)
|
self.add_event_handler('session_start', self.start)
|
||||||
|
|
||||||
# register handler to recieve both groupchat and normal message events
|
# register receive handler for both groupchat and normal message events
|
||||||
self.add_event_handler('message', self.message)
|
self.add_event_handler('message', self.message)
|
||||||
|
|
||||||
def start(self, event):
|
def start(self, event):
|
||||||
|
@ -43,125 +52,111 @@ class QueryBot(slixmpp.ClientXMPP):
|
||||||
self.get_roster()
|
self.get_roster()
|
||||||
|
|
||||||
# If a room password is needed, use: password=the_room_password
|
# If a room password is needed, use: password=the_room_password
|
||||||
for rooms in self.room.split(sep=","):
|
if self.room:
|
||||||
self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
|
for rooms in self.room.split(sep=","):
|
||||||
|
logging.debug("joining: %s" % rooms)
|
||||||
|
self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
|
||||||
|
|
||||||
def validate_domain(self, wordlist, index):
|
async def message(self, msg):
|
||||||
"""
|
|
||||||
validation method to reduce malformed querys and unnecessary connection attempts
|
|
||||||
:param wordlist: words separated by " " from the message
|
|
||||||
:param index: keyword index inside the message
|
|
||||||
:return: true if valid
|
|
||||||
"""
|
|
||||||
# keyword inside the message
|
|
||||||
argument = wordlist[index]
|
|
||||||
|
|
||||||
# check if argument is in the argument list
|
|
||||||
if argument in StaticAnswers().keys(arg='list'):
|
|
||||||
# if argument uses a domain check for occurence in list and check domain
|
|
||||||
if argument in StaticAnswers().keys(arg='list', keyword='domain_keywords'):
|
|
||||||
try:
|
|
||||||
target = wordlist[index + 1]
|
|
||||||
if validators.domain(target):
|
|
||||||
return True
|
|
||||||
except IndexError:
|
|
||||||
# except an IndexError if a keywords is the last word in the message
|
|
||||||
return False
|
|
||||||
|
|
||||||
# check if number keyword is used if true check if target is assignable
|
|
||||||
elif argument in StaticAnswers().keys(arg='list', keyword='number_keywords'):
|
|
||||||
try:
|
|
||||||
target = wordlist[index + 1]
|
|
||||||
return True
|
|
||||||
except IndexError:
|
|
||||||
# except an IndexError if target is not assignable
|
|
||||||
return False
|
|
||||||
# check if argument is inside no_arg list
|
|
||||||
elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def deduplicate(self, reply):
|
|
||||||
"""
|
|
||||||
deduplication method for the result list
|
|
||||||
:param list reply: list containing strings
|
|
||||||
:return: list containing unique strings
|
|
||||||
"""
|
|
||||||
reply_dedup = list()
|
|
||||||
for item in reply:
|
|
||||||
if item not in reply_dedup:
|
|
||||||
reply_dedup.append(item)
|
|
||||||
|
|
||||||
return reply_dedup
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def message(self, msg):
|
|
||||||
"""
|
"""
|
||||||
:param msg: received message stanza
|
:param msg: received message stanza
|
||||||
"""
|
"""
|
||||||
# init empty reply list
|
data = {
|
||||||
reply = list()
|
'words': list(),
|
||||||
|
'reply': list(),
|
||||||
|
'queue': list()
|
||||||
|
}
|
||||||
|
|
||||||
# catch self messages to prevent self flooding
|
# catch self messages to prevent self flooding
|
||||||
if msg['mucnick'] == self.nick:
|
if msg['mucnick'] == self.nick:
|
||||||
return
|
return
|
||||||
|
|
||||||
elif self.nick in msg['body']:
|
elif self.nick in msg['body']:
|
||||||
# add pre predefined text to reply list
|
# add pre predefined text to reply list
|
||||||
reply.append(StaticAnswers(msg['mucnick']).gen_answer())
|
data['reply'].append(StaticAnswers(msg['mucnick']).gen_answer())
|
||||||
|
|
||||||
# building the queue
|
data = self.build_queue(data, msg)
|
||||||
# double splitting to exclude whitespaces
|
|
||||||
words = " ".join(msg['body'].split()).split(sep=" ")
|
|
||||||
queue = list()
|
|
||||||
|
|
||||||
# check all words in side the message for possible hits
|
|
||||||
for x in enumerate(words):
|
|
||||||
# check word for match in keywords list
|
|
||||||
for y in StaticAnswers().keys(arg='list'):
|
|
||||||
# if so queue the keyword and the postion in the string
|
|
||||||
if x[1] == y:
|
|
||||||
# only add job to queue if domain is valid
|
|
||||||
if self.validate_domain(words, x[0]):
|
|
||||||
queue.append({str(y): x[0]})
|
|
||||||
|
|
||||||
# queue
|
# queue
|
||||||
for job in queue:
|
for job in data['queue']:
|
||||||
for key in job:
|
keys = list(job.keys())
|
||||||
keyword = key
|
keyword = keys[0]
|
||||||
index = job[key]
|
|
||||||
|
|
||||||
if keyword == '!help':
|
target = job[keyword][0]
|
||||||
reply.append(StaticAnswers().gen_help())
|
opt_arg = job[keyword][1]
|
||||||
continue
|
query = None
|
||||||
|
|
||||||
try:
|
if keyword == '!help':
|
||||||
target = words[index + 1]
|
data['reply'].append(StaticAnswers().gen_help())
|
||||||
if keyword == '!uptime':
|
continue
|
||||||
last_activity = yield from self['xep_0012'].get_last_activity(target)
|
|
||||||
reply.append(LastActivity(last_activity, msg, target).format_values())
|
|
||||||
|
|
||||||
elif keyword == "!version":
|
try:
|
||||||
version = yield from self['xep_0092'].get_version(target)
|
if keyword == "!uptime":
|
||||||
reply.append(Version(version, msg, target).format_version())
|
query = await self['xep_0012'].get_last_activity(jid=target)
|
||||||
|
|
||||||
elif keyword == "!contact":
|
elif keyword == "!version":
|
||||||
contact = yield from self['xep_0030'].get_info(jid=target, cached=False)
|
query = await self['xep_0092'].get_version(jid=target)
|
||||||
reply.append(ContactInfo(contact, msg, target).format_contact())
|
|
||||||
|
|
||||||
elif keyword == "!xep":
|
elif keyword == "!contact":
|
||||||
reply.append(XEPRequest(msg, target).format())
|
query = await self['xep_0030'].get_info(jid=target, cached=False)
|
||||||
|
|
||||||
except XMPPError as error:
|
except XMPPError as error:
|
||||||
reply.append(HandleError(error, msg, key, target).build_report())
|
logging.info(misc.HandleError(error, keyword, target).report())
|
||||||
|
data['reply'].append(misc.HandleError(error, keyword, target).report())
|
||||||
|
continue
|
||||||
|
|
||||||
|
data["reply"].append(self.functions[keyword].format(query=query, target=target, opt_arg=opt_arg))
|
||||||
|
|
||||||
# remove None type from list and send all elements
|
# remove None type from list and send all elements
|
||||||
if list(filter(None.__ne__, reply)) and reply:
|
if list(filter(None.__ne__, data['reply'])) and data['reply']:
|
||||||
reply = self.deduplicate(reply)
|
|
||||||
|
# if msg type is groupchat prepend mucnick
|
||||||
|
if msg["type"] == "groupchat":
|
||||||
|
data["reply"][0] = "%s: " % msg["mucnick"] + data["reply"][0]
|
||||||
|
|
||||||
|
# reply = misc.deduplicate(data['reply'])
|
||||||
|
reply = data["reply"]
|
||||||
self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type'])
|
self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type'])
|
||||||
|
|
||||||
|
def build_queue(self, data, msg):
|
||||||
|
# building the queue
|
||||||
|
# double splitting to exclude whitespaces
|
||||||
|
data['words'] = " ".join(msg['body'].split()).split(sep=" ")
|
||||||
|
wordcount = len(data["words"])
|
||||||
|
|
||||||
|
# check all words in side the message for possible hits
|
||||||
|
for x in enumerate(data['words']):
|
||||||
|
# check for valid keywords
|
||||||
|
index = x[0]
|
||||||
|
keyword = x[1]
|
||||||
|
|
||||||
|
# match all words starting with ! and member of no_arg_keywords
|
||||||
|
if keyword.startswith("!") and keyword in StaticAnswers().keys("no_arg_keywords"):
|
||||||
|
data['queue'].append({keyword: [None, None]})
|
||||||
|
|
||||||
|
# matching all words starting with ! and member of keywords
|
||||||
|
elif keyword.startswith("!") and keyword in StaticAnswers().keys("keywords"):
|
||||||
|
# init variables to circumvent IndexErrors
|
||||||
|
target, opt_arg = None, None
|
||||||
|
|
||||||
|
# compare to wordcount if assignment is possible
|
||||||
|
if index + 1 < wordcount:
|
||||||
|
target = data["words"][index + 1]
|
||||||
|
|
||||||
|
if index + 2 < wordcount:
|
||||||
|
if not data["words"][index + 2].startswith("!"):
|
||||||
|
opt_arg = data["words"][index + 2]
|
||||||
|
|
||||||
|
# only add job to queue if domain is valid
|
||||||
|
if misc.validate(keyword, target):
|
||||||
|
logging.debug("Item added to queue %s" % {str(keyword): [target, opt_arg]})
|
||||||
|
data['queue'].append({str(keyword): [target, opt_arg]})
|
||||||
|
|
||||||
|
# deduplicate queue elements
|
||||||
|
data["queue"] = misc.deduplicate(data["queue"])
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# command line arguments.
|
# command line arguments.
|
||||||
|
@ -170,12 +165,12 @@ if __name__ == '__main__':
|
||||||
const=logging.ERROR, default=logging.INFO)
|
const=logging.ERROR, default=logging.INFO)
|
||||||
parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
|
parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
|
||||||
const=logging.DEBUG, default=logging.INFO)
|
const=logging.DEBUG, default=logging.INFO)
|
||||||
parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', const="",
|
parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile',
|
||||||
default='bot.log')
|
const="", default='bot.log')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# logging
|
# logging
|
||||||
logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s')
|
logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)s: %(asctime)s: %(message)s')
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# configfile
|
# configfile
|
||||||
|
|
Loading…
Reference in a new issue