mirror of
https://github.com/mightyBroccoli/xmpp-chatbot.git
synced 2024-12-04 22:33:36 +01:00
commit
1987753e3f
5 changed files with 127 additions and 17 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -60,4 +60,6 @@ target/
|
|||
|
||||
# .idea
|
||||
.idea
|
||||
.etag
|
||||
bot\.cfg
|
||||
xeplist.xml
|
||||
|
|
|
@ -93,7 +93,7 @@ class ContactInfo:
|
|||
|
||||
# class handeling XMPPError exeptions
|
||||
class HandleError:
|
||||
def __init__(self, error, msg, key, target):
|
||||
def __init__(self, error, msg, key, target="target missing"):
|
||||
self.error = error
|
||||
self.message = msg
|
||||
self.key = key
|
||||
|
|
|
@ -12,7 +12,8 @@ class StaticAnswers:
|
|||
'help': '!help -- display this text',
|
||||
'version': '!version domain.tld -- receive XMPP server version',
|
||||
'uptime': '!uptime domain.tld -- receive XMPP server uptime',
|
||||
'contact': '!contact domain.tld -- receive XMPP server contact address info'}
|
||||
'contact': '!contact domain.tld -- receive XMPP server contact address info',
|
||||
'xep': '!xep XEP Number -- recieve information about the specified XEP'}
|
||||
self.possible_answers = {
|
||||
'1': 'I heard that, %s.',
|
||||
'2': 'I am sorry for that %s.',
|
||||
|
@ -22,8 +23,10 @@ class StaticAnswers:
|
|||
'2': 'not a valid target'
|
||||
}
|
||||
self.keywords = {
|
||||
"keywords": ["!help", "!uptime", "!version", "!contact"],
|
||||
"no_arg_keywords": ["!help"]
|
||||
"keywords": ["!help", "!uptime", "!version", "!contact", "!xep"],
|
||||
"domain_keywords": ["!uptime", "!version", "!contact"],
|
||||
"no_arg_keywords": ["!help"],
|
||||
"number_keywords": ["!xep"]
|
||||
}
|
||||
|
||||
def keys(self, arg="", keyword='keywords'):
|
||||
|
|
87
classes/xep.py
Normal file
87
classes/xep.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
import requests
|
||||
import defusedxml.ElementTree as ET
|
||||
|
||||
|
||||
class XEPRequest:
|
||||
def __init__(self, msg, xepnumber):
|
||||
"""
|
||||
class which requests the header of the referenced xep
|
||||
:param xepnumber: number int or str to request the xep for
|
||||
"""
|
||||
self.message_type = msg['type']
|
||||
self.muc_nick = msg['mucnick']
|
||||
|
||||
self.reqxep = str(xepnumber)
|
||||
self.xeplist = None
|
||||
self.acceptedxeps = list()
|
||||
|
||||
def req_xeplist(self):
|
||||
"""
|
||||
query and save the current xep list to reduce network bandwidth
|
||||
"""
|
||||
try:
|
||||
with open(".etag") as file:
|
||||
local_etag = file.read()
|
||||
except FileNotFoundError:
|
||||
local_etag = ""
|
||||
|
||||
with requests.Session() as s:
|
||||
s.headers.update({'Accept': 'application/xml'})
|
||||
head = s.head("https://xmpp.org/extensions/xeplist.xml")
|
||||
etag = head.headers['etag']
|
||||
|
||||
if local_etag == etag:
|
||||
with open("xeplist.xml", "r") as file:
|
||||
self.xeplist = ET.fromstring(file.read())
|
||||
else:
|
||||
r = s.get("https://xmpp.org/extensions/xeplist.xml")
|
||||
r.encoding = 'utf-8'
|
||||
local_etag = head.headers['etag']
|
||||
|
||||
with open("xeplist.xml", "w") as file:
|
||||
file.write(r.content.decode())
|
||||
self.xeplist = ET.fromstring(r.content.decode())
|
||||
|
||||
with open('.etag', 'w') as string:
|
||||
string.write(local_etag)
|
||||
|
||||
# populate xep comparison list
|
||||
for xep in self.xeplist.findall(".//*[@accepted='true']/number"):
|
||||
self.acceptedxeps.append(xep.text)
|
||||
|
||||
def get(self):
|
||||
"""
|
||||
function to query the xep entry if xepnumber is present in xeplist
|
||||
:return: nicely formatted xep header information
|
||||
"""
|
||||
# check if xeplist is accurate
|
||||
self.req_xeplist()
|
||||
|
||||
result = list()
|
||||
# if requested number is inside acceptedxeps continou
|
||||
if self.reqxep in self.acceptedxeps:
|
||||
searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep
|
||||
|
||||
for item in self.xeplist.findall(searchstring):
|
||||
for x in range(1,5):
|
||||
result.append(item[x].tag + " : " + item[x].text)
|
||||
|
||||
else:
|
||||
if self.message_type == "groupchat":
|
||||
result.append(self.muc_nick + " : " + "XEP-" + str(self.reqxep) + " : is not available.")
|
||||
else:
|
||||
result.append("XEP-" + str(self.reqxep) + " : is not available.")
|
||||
|
||||
return result
|
||||
|
||||
def format(self):
|
||||
reply = self.get()
|
||||
if self.message_type == "groupchat":
|
||||
text = "%s: " % self.muc_nick
|
||||
reply[0] = text + reply[0]
|
||||
|
||||
text = '\n'.join(reply)
|
||||
|
||||
return text
|
30
main.py
30
main.py
|
@ -19,6 +19,7 @@ from slixmpp.exceptions import XMPPError
|
|||
|
||||
from classes.strings import StaticAnswers
|
||||
from classes.functions import Version, LastActivity, ContactInfo, HandleError
|
||||
from classes.xep import XEPRequest
|
||||
|
||||
|
||||
class QueryBot(slixmpp.ClientXMPP):
|
||||
|
@ -36,7 +37,7 @@ class QueryBot(slixmpp.ClientXMPP):
|
|||
|
||||
def start(self, event):
|
||||
"""
|
||||
:param str event -- An empty dictionary. The session_start event does not provide any additional data.
|
||||
:param event -- An empty dictionary. The session_start event does not provide any additional data.
|
||||
"""
|
||||
self.send_presence()
|
||||
self.get_roster()
|
||||
|
@ -47,16 +48,18 @@ class QueryBot(slixmpp.ClientXMPP):
|
|||
|
||||
def validate_domain(self, wordlist, index):
|
||||
"""
|
||||
validation method to reduce connection attemps to unvalid domains
|
||||
:param wordlist: words seperated by " " from the message
|
||||
validation method to reduce malformed querys and unnecessary connection attempts
|
||||
:param wordlist: words separated by " " from the message
|
||||
:param index: keyword index inside the message
|
||||
:return: true if valid
|
||||
"""
|
||||
# keyword inside the message
|
||||
argument = wordlist[index]
|
||||
|
||||
# if the argument is not inside the no_arg_keywords target is index + 1
|
||||
if argument not in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
|
||||
# check if argument is in the argument list
|
||||
if argument in StaticAnswers().keys(arg='list'):
|
||||
# if argument uses a domain check for occurence in list and check domain
|
||||
if argument in StaticAnswers().keys(arg='list', keyword='domain_keywords'):
|
||||
try:
|
||||
target = wordlist[index + 1]
|
||||
if validators.domain(target):
|
||||
|
@ -64,10 +67,22 @@ class QueryBot(slixmpp.ClientXMPP):
|
|||
except IndexError:
|
||||
# except an IndexError if a keywords is the last word in the message
|
||||
return False
|
||||
|
||||
# check if number keyword is used if true check if target is assignable
|
||||
elif argument in StaticAnswers().keys(arg='list', keyword='number_keywords'):
|
||||
try:
|
||||
target = wordlist[index + 1]
|
||||
return True
|
||||
except IndexError:
|
||||
# except an IndexError if target is not assignable
|
||||
return False
|
||||
# check if argument is inside no_arg list
|
||||
elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
|
||||
return True
|
||||
else:
|
||||
return
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
def deduplicate(self, reply):
|
||||
"""
|
||||
|
@ -136,6 +151,9 @@ class QueryBot(slixmpp.ClientXMPP):
|
|||
contact = yield from self['xep_0030'].get_info(jid=target, cached=False)
|
||||
reply.append(ContactInfo(contact, msg, target).format_contact())
|
||||
|
||||
elif keyword == "!xep":
|
||||
reply.append(XEPRequest(msg, target).format())
|
||||
|
||||
except XMPPError as error:
|
||||
reply.append(HandleError(error, msg, key, target).build_report())
|
||||
|
||||
|
|
Loading…
Reference in a new issue