* new uptime.py file

- removed asyncio from import
* fixed bug with muc jid parser
* outsorced validate and dedup to misc.py
This commit is contained in:
nico 2018-10-11 19:28:18 +02:00
parent 9d45271778
commit 559ab280ca
2 changed files with 67 additions and 68 deletions

45
classes/uptime.py Normal file
View file

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# XEP-0012: Last Activity
class LastActivity:
""" query the server uptime of the specified domain, defined by XEP-0012 """
def __init__(self, session, msg, target):
self.session = session
self.nick = msg['mucnick']
self.message_type = msg['type']
self.target = target
async def query(self):
last_activity = await self.session['xep_0012'].get_last_activity(jid=self.target)
seconds = await last_activity['last_activity']['seconds']
return seconds
async def format_values(self, granularity=4):
seconds = await self.query()
#seconds = last_activity['last_activity']['seconds']
uptime = []
intervals = (
('years', 31536000), # 60 * 60 * 24 * 365
('weeks', 604800), # 60 * 60 * 24 * 7
('days', 86400), # 60 * 60 * 24
('hours', 3600), # 60 * 60
('minutes', 60),
('seconds', 1)
)
for name, count in intervals:
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('s')
uptime.append("{} {}".format(value, name))
result = ' '.join(uptime[:granularity])
if self.message_type == "groupchat":
text = "%s: %s is running since %s" % (self.nick, self.target, result)
else:
text = "%s is running since %s" % (self.target, result)
return text

86
main.py
View file

@ -1,16 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Slixmpp: The Slick XMPP Library James the MagicXMPP Bot
Copyright (C) 2010 Nathanael C. Fritz build with Slick XMPP Library
This file is part of Slixmpp. Copyright (C) 2018 Nico Wellpott
See the file LICENSE for copying permission. See the file LICENSE for copying permission.
""" """
import asyncio
import slixmpp import slixmpp
import ssl import ssl
import validators
import configparser import configparser
import logging import logging
@ -19,7 +17,9 @@ from slixmpp.exceptions import XMPPError
import common.misc as misc import common.misc as misc
from common.strings import StaticAnswers from common.strings import StaticAnswers
from classes.functions import Version, LastActivity, ContactInfo, HandleError from classes.functions import Version, ContactInfo, HandleError
from classes.servercontact import ServerContact
from classes.uptime import LastActivity
from classes.xep import XEPRequest from classes.xep import XEPRequest
@ -39,7 +39,7 @@ class QueryBot(slixmpp.ClientXMPP):
# session start event, starting point for the presence and roster requests # session start event, starting point for the presence and roster requests
self.add_event_handler('session_start', self.start) self.add_event_handler('session_start', self.start)
# register recieve handler for both groupchat and normal message events # register receive handler for both groupchat and normal message events
self.add_event_handler('message', self.message) self.add_event_handler('message', self.message)
def start(self, event): def start(self, event):
@ -50,65 +50,19 @@ class QueryBot(slixmpp.ClientXMPP):
self.get_roster() self.get_roster()
# If a room password is needed, use: password=the_room_password # If a room password is needed, use: password=the_room_password
if self.room:
for rooms in self.room.split(sep=","): for rooms in self.room.split(sep=","):
self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True) self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
def validate(self, wordlist, index):
"""
validation method to reduce malformed querys and unnecessary connection attempts
:param wordlist: words separated by " " from the message
:param index: keyword index inside the message
:return: true if valid
"""
# keyword inside the message
argument = wordlist[index]
# check if argument is in the argument list
if argument in StaticAnswers().keys(arg='list'):
# if argument uses a domain check for occurence in list and check domain
if argument in StaticAnswers().keys(arg='list', keyword='domain_keywords'):
try:
target = wordlist[index + 1]
if validators.domain(target):
return True
except IndexError:
# except an IndexError if a keywords is the last word in the message
return False
# check if number keyword is used if true check if target is assignable
elif argument in StaticAnswers().keys(arg='list', keyword='number_keywords'):
try:
if wordlist[index + 1]:
return True
except IndexError:
# except an IndexError if target is not assignable
return False
# check if argument is inside no_arg list
elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
return True
else:
return False
else:
return False
def deduplicate(self, reply):
"""
deduplication method for the result list
:param list reply: list containing strings
:return: list containing unique strings
"""
reply_dedup = list()
for item in reply:
if item not in reply_dedup:
reply_dedup.append(item)
return reply_dedup
async def message(self, msg): async def message(self, msg):
""" """
:param msg: received message stanza :param msg: received message stanza
""" """
data = self.data data = {
'words': list(),
'reply': list(),
'queue': list()
}
# catch self messages to prevent self flooding # catch self messages to prevent self flooding
if msg['mucnick'] == self.nick: if msg['mucnick'] == self.nick:
@ -144,18 +98,18 @@ class QueryBot(slixmpp.ClientXMPP):
target = data['words'][index + 1] target = data['words'][index + 1]
try: try:
if keyword == '!uptime': if keyword == '!uptime':
last_activity = await self['xep_0012'].get_last_activity(jid=target) data['reply'].append(LastActivity(self, msg, target).format_values())
self.data['reply'].append(LastActivity(last_activity, msg, target).format_values())
#last_activity = await self['xep_0012'].get_last_activity(jid=target)
#data['reply'].append(LastActivity(last_activity, msg, target).format_values())
elif keyword == "!version": elif keyword == "!version":
version = await self['xep_0092'].get_version(jid=target) version = await self['xep_0092'].get_version(jid=target)
self.data['reply'].append(Version(version, msg, target).format_version()) data['reply'].append(Version(version, msg, target).format_version())
elif keyword == "!contact": elif keyword == "!contact":
last_activity = await self['xep_0012'].get_last_activity(jid=target) contact = await self['xep_0030'].get_info(jid=target, cached=False)
self.data['reply'].append(LastActivity(last_activity, msg, target).format_values()) data['reply'].append(ServerContact(contact, msg, target).format_contact())
elif keyword == "!xep": elif keyword == "!xep":
data['reply'].append(XEPRequest(msg, target).format()) data['reply'].append(XEPRequest(msg, target).format())