xmpp-chatbot/classes/xep.py

119 lines
3.6 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import os
import requests
import defusedxml.ElementTree as Et
class XEPRequest:
    """
    class which requests the header of the referenced xep

    The xep index is fetched from xmpp.org and cached on disk together with the
    ETag it was served with, so that an unchanged index is not downloaded again.
    """

    # location of the upstream xep index and of its on-disk cache
    XEPLIST_URL = "https://xmpp.org/extensions/xeplist.xml"
    XEPLIST_PATH = "./common/xeplist.xml"
    ETAG_PATH = "./common/.etag"
    # seconds to wait on the server before giving up instead of blocking forever
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # init all necessary variables
        self.reqxep, self.opt_arg = None, None
        self.xeplist = None
        self.acceptedxeps = list()

    def req_xeplist(self):
        """
        query and save the current xep list to reduce network bandwidth

        Sets self.xeplist to the parsed index and rebuilds self.acceptedxeps from it.
        The cached copy is used when the server reports the same ETag as the stored
        one, or when the server cannot be reached / does not answer with 200.

        :raises requests.RequestException: if the server is unreachable or answers the
            download with an error status while no usable cached copy exists
        """
        # check if etag header is present if not set local_etag to ""
        local_etag = ""
        if os.path.isfile(self.ETAG_PATH):
            with open(self.ETAG_PATH, encoding="utf-8") as file:
                local_etag = file.read()
        cached = os.path.isfile(self.XEPLIST_PATH)

        with requests.Session() as s:
            # head request the xeplist.xml
            s.headers.update({'Accept': 'application/xml'})
            try:
                head = s.head(self.XEPLIST_URL, timeout=self.REQUEST_TIMEOUT)
            except requests.RequestException:
                # without a cached copy there is nothing to fall back to
                if not cached:
                    raise
                status, etag = None, ""
            else:
                status = head.status_code
                # a response may lack the header, so never index it directly
                etag = head.headers.get('etag', "")

            # an empty etag never counts as a match, otherwise a missing header
            # would pin the cache forever
            unchanged = bool(etag) and etag == local_etag

            # use the cached xml if it is still current or the server is not usable
            if cached and (unchanged or status != 200):
                # bytes in, so the parser honours the encoding declared in the xml
                with open(self.XEPLIST_PATH, "rb") as file:
                    self.xeplist = Et.fromstring(file.read())
            # in any other case request the latest xml
            else:
                r = s.get(self.XEPLIST_URL, timeout=self.REQUEST_TIMEOUT)
                r.raise_for_status()
                # parse before writing so a broken download never replaces a good cache
                self.xeplist = Et.fromstring(r.content)
                with open(self.XEPLIST_PATH, "wb") as file:
                    file.write(r.content)
                with open(self.ETAG_PATH, "w", encoding="utf-8") as file:
                    file.write(etag)

        # populate xep comparison list; rebuilt from scratch so repeated calls
        # do not pile up duplicate entries
        self.acceptedxeps = [
            xep.text for xep in self.xeplist.findall(".//*[@accepted='true']/number")
        ]

    def get(self):
        """
        function to query the xep entry if xepnumber is present in xeplist

        Reads self.reqxep and self.opt_arg, refreshes the xep list first.

        :return: list of strings with the formatted xep header information
        """
        # all possible subtags grouped by location
        last_revision_tags = ["date", "version", "initials", "remark"]
        xep_tags = ["number", "title", "abstract", "type", "status", "approver", "shortname", "sig", "lastcall", "date", "version", "initials", "remark"]

        # check if xeplist is accurate
        self.req_xeplist()

        result = list()

        # if the requested number is no member of acceptedxeps and/or not accepted return error.
        if str(self.reqxep) not in self.acceptedxeps:
            result.append("XEP-%s : is not available." % self.reqxep)
            return result

        searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep
        for item in self.xeplist.findall(searchstring):
            # if the opt_arg references is member of xeptag return only that tag
            if self.opt_arg in xep_tags:
                # if the opt_arg references is member of last_revision_tags return only that subtag
                if self.opt_arg in last_revision_tags:
                    # an entry without <last-revision> simply has none of its subtags
                    revision = item.find("last-revision")
                    query = revision.find(self.opt_arg) if revision is not None else None
                else:
                    query = item.find(self.opt_arg)

                # append opt_arg results to the result list
                if query is not None:
                    result.append("%s : %s" % (query.tag, query.text))
                else:
                    result.append("%s does not have a %s tag." % (self.reqxep, self.opt_arg))

            # in any other case return the general answer
            else:
                result_opts = ["title", "type", "abstract", "status"]
                for tag in result_opts:
                    node = item.find(tag)
                    # skip absent or empty tags, a None entry would break the later join
                    if node is not None and node.text is not None:
                        result.append(node.text)

        return result

    def format(self, query, target, opt_arg):
        """
        entry point: look up a xep and return the answer as one text block

        :param query: not used by this method
        :param target: number int or str to request the xep for
        :param opt_arg: name of a single tag to return, any other value yields the general answer
        :return: the result lines of get() joined by newlines
        :raises ValueError: if target can not be converted to int
        """
        self.reqxep = int(target)
        self.opt_arg = opt_arg

        reply = self.get()

        text = '\n'.join(reply)
        return text