refactorization (#1)

* changed all lineendings to lf
* cleaned up main class
* refactor bot
* refactor bot functions
* moved functions.py to classes dir
* code comment changes
* changed code comment style
- removed unnecessary pass statement
+ added missing newline
* simplified function and return statements
This commit is contained in:
nico 2018-10-01 23:17:09 +02:00 committed by GitHub
parent b9a2e5adcd
commit 294a728b0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 460 additions and 351 deletions

126
.gitignore vendored
View file

@ -1,63 +1,63 @@
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
# C extensions # C extensions
*.so *.so
# Distribution / packaging # Distribution / packaging
.Python .Python
env/ env/
venv/ venv/
build/ build/
develop-eggs/ develop-eggs/
dist/ dist/
downloads/ downloads/
eggs/ eggs/
.eggs/ .eggs/
lib/ lib/
lib64/ lib64/
parts/ parts/
sdist/ sdist/
var/ var/
*.egg-info/ *.egg-info/
.installed.cfg .installed.cfg
*.egg *.egg
# PyInstaller # PyInstaller
# Usually these files are written by a python script from a template # Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it. # before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest *.manifest
*.spec *.spec
# Installer logs # Installer logs
pip-log.txt pip-log.txt
pip-delete-this-directory.txt pip-delete-this-directory.txt
# Unit test / coverage reports # Unit test / coverage reports
htmlcov/ htmlcov/
.tox/ .tox/
.coverage .coverage
.coverage.* .coverage.*
.cache .cache
nosetests.xml nosetests.xml
coverage.xml coverage.xml
*,cover *,cover
# Translations # Translations
*.mo *.mo
*.pot *.pot
# Django stuff: # Django stuff:
*.log *.log
# Sphinx documentation # Sphinx documentation
docs/_build/ docs/_build/
# PyBuilder # PyBuilder
target/ target/
# .idea # .idea
.idea .idea
bot\.cfg bot\.cfg

View file

@ -1,43 +1,43 @@
# Python XMPP Chatbot # Python XMPP Chatbot
## beforehand ## beforehand
Do not opperate this bot on foreign servers. Do not opperate this bot on foreign servers.
### functions ### functions
- query xmpp server software and version [XEP-0092](https://xmpp.org/extensions/xep-0092.html) - query xmpp server software and version [XEP-0092](https://xmpp.org/extensions/xep-0092.html)
- query xmpp server uptime [XEP-0012](https://xmpp.org/extensions/xep-0012.html) - query xmpp server uptime [XEP-0012](https://xmpp.org/extensions/xep-0012.html)
- query xmpp server contact addresses [XEP-0157](https://xmpp.org/extensions/xep-0157.html) - query xmpp server contact addresses [XEP-0157](https://xmpp.org/extensions/xep-0157.html)
- display help output - display help output
- respond to username being mentioned - respond to username being mentioned
### todo ### todo
- [ ] Github Webhook - [ ] Github Webhook
### install ### install
#### requirements #### requirements
- slixmpp - slixmpp
- configparser - configparser
- datetime - datetime
- random - random
- validators - validators
#### configuration #### configuration
`bot.cfg` replace dummy file with correct credentials/ parameters `bot.cfg` replace dummy file with correct credentials/ parameters
````cfg ````cfg
[Account] [Account]
jid=nick@domain.tld/querybot-0.1 jid=nick@domain.tld/querybot-0.1
password=super_secret_password password=super_secret_password
[MUC] [MUC]
rooms=room_to_connect_to@conference.domain.tld rooms=room_to_connect_to@conference.domain.tld,another_room@conference.domain.tld
nick=mucnickname nick=mucnickname
[ADMIN] [ADMIN]
admins=admins ( ! muc nick and not the jid nickname) admins=admins ( ! muc nick and not the jid nickname)
```` ````
##### systemd ##### systemd
Copy the systemd dummy file into systemd service folder. Copy the systemd dummy file into systemd service folder.
`systemdctl daemon-reload` and `systemctl start magicbot.service` to start the bot. `systemdctl daemon-reload` and `systemctl start magicbot.service` to start the bot.
If it is neccecary to start the bot automatically when the system boots do `systemctl enable magicbot.service`. If it is neccecary to start the bot automatically when the system boots do `systemctl enable magicbot.service`.
#### starting the bot without systemd #### starting the bot without systemd
Got to the bots directory and run `./main.py &`. Got to the bots directory and run `./main.py &`.

108
classes/functions.py Normal file
View file

@ -0,0 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# XEP-0072: Server Version
class Version:
def __init__(self, version, msg, target):
self.version = version['software_version']['version']
self.os = version['software_version']['os']
self.name = version['software_version']['name']
self.nick = msg['mucnick']
self.message_type = msg['type']
self.target = target
def format_version(self):
if self.message_type == "groupchat":
text = "%s: %s is running %s version %s on %s" % (self.nick, self.target, self.name, self.version, self.os)
else:
text = "%s is running %s version %s on %s" % (self.target, self.name, self.version, self.os)
return text
# XEP-0012: Last Activity
class LastActivity:
""" query the server uptime of the specified domain, defined by XEP-0012 """
def __init__(self, last_activity, msg, target):
self.last_activity = last_activity
self.nick = msg['mucnick']
self.message_type = msg['type']
self.target = target
def format_values(self, granularity=4):
seconds = self.last_activity['last_activity']['seconds']
uptime = []
intervals = (
('years', 31536000), # 60 * 60 * 24 * 365
('weeks', 604800), # 60 * 60 * 24 * 7
('days', 86400), # 60 * 60 * 24
('hours', 3600), # 60 * 60
('minutes', 60),
('seconds', 1)
)
for name, count in intervals:
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('s')
uptime.append("{} {}".format(value, name))
result = ' '.join(uptime[:granularity])
if self.message_type == "groupchat":
text = "%s: %s is running since %s" % (self.nick, self.target, result)
else:
text = "%s is running since %s" % (self.target, result)
return text
# XEP-0157: Contact Addresses for XMPP Services
class ContactInfo:
def __init__(self, contact, msg, target):
self.contact = contact
self.message = msg
self.target = target
def format_contact(self):
server_info = []
sep = ' , '
possible_vars = ['abuse-addresses',
'admin-addresses',
'feedback-addresses',
'sales-addresses',
'security-addresses',
'support-addresses']
for field in self.contact['disco_info']['form']:
var = field['var']
if var in possible_vars:
field_value = field.get_value(convert=False)
value = sep.join(field_value) if isinstance(field_value, list) else field_value
server_info.append(' - %s: %s' % (var, value))
if server_info:
text = "contact addresses for %s are" % self.target
for count in range(server_info.__len__()):
text += "\n" + server_info[count]
else:
text = "%s has no contact addresses configured." % self.target
return text
# class handeling XMPPError exeptions
class HandleError:
def __init__(self, error, msg, key, target):
self.error = error
self.message = msg
self.key = key
self.target = target
def build_report(self):
condition = self.error.condition
keyword = self.key[1:]
text = "There was an error requesting " + self.target + '\'s ' + keyword + " : " + condition
return text

51
classes/strings.py Normal file
View file

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
from random import randint
class StaticAnswers:
"""
collection of callable static/ semi-static strings
"""
def __init__(self, nick=""):
self.nickname = nick
self.helpfile = {
'help': '!help -- display this text',
'version': '!version domain.tld -- receive XMPP server version',
'uptime': '!uptime domain.tld -- receive XMPP server uptime',
'contact': '!contact domain.tld -- receive XMPP server contact address info'}
self.possible_answers = {
'1': 'I heard that, %s.',
'2': 'I am sorry for that %s.',
'3': '%s did you try turning it off and on again?'}
self.error_messages = {
'1': 'not reachable',
'2': 'not a valid target'
}
self.keywords = {
"keywords": ["!help", "!uptime", "!version", "!contact"],
"no_arg_keywords": ["!help"]
}
def keys(self, arg="", keyword='keywords'):
if arg == 'list':
try:
return self.keywords[keyword]
except KeyError:
return self.keywords['keywords']
else:
return self.keywords
def gen_help(self):
helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()])
return helpdoc
def gen_answer(self):
possible_answers = self.possible_answers
return possible_answers[str(randint(1, possible_answers.__len__()))] % self.nickname
def error(self,code):
try:
text = self.error_messages[str(code)]
except KeyError:
return 'unknown error'
return text

View file

@ -1,13 +1,13 @@
[Unit] [Unit]
2 Description=SlixXMPP service bot 2 Description=SlixXMPP service bot
3 After=network.target ejabberd.service 3 After=network.target ejabberd.service
4 4
5 [Service] 5 [Service]
6 Type=simple 6 Type=simple
7 ExecStart=/usr/bin/python3 /path/to/main.py 7 ExecStart=/usr/bin/python3 /path/to/main.py
8 Restart=on-failure 8 Restart=on-failure
9 RestartSec=60s 9 RestartSec=60s
10 User=nico 10 User=nico
11 11
12 [Install] 12 [Install]
13 WantedBy=multi-user.target 13 WantedBy=multi-user.target

416
main.py
View file

@ -1,233 +1,183 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Slixmpp: The Slick XMPP Library Slixmpp: The Slick XMPP Library
Copyright (C) 2010 Nathanael C. Fritz Copyright (C) 2010 Nathanael C. Fritz
This file is part of Slixmpp. This file is part of Slixmpp.
See the file LICENSE for copying permission. See the file LICENSE for copying permission.
""" """
import asyncio import asyncio
import configparser import slixmpp
import logging import ssl
import slixmpp import validators
import ssl import configparser
import validators import logging
from argparse import ArgumentParser
from datetime import datetime, timedelta from argparse import ArgumentParser
from random import randint from slixmpp.exceptions import XMPPError
from slixmpp.exceptions import XMPPError
from classes.strings import StaticAnswers
from classes.functions import Version, LastActivity, ContactInfo, HandleError
class QueryBot(slixmpp.ClientXMPP):
""" A simple Slixmpp bot with some features """
def __init__(self, jid, password, room, nick): class QueryBot(slixmpp.ClientXMPP):
slixmpp.ClientXMPP.__init__(self, jid, password) def __init__(self, jid, password, room, nick):
slixmpp.ClientXMPP.__init__(self, jid, password)
self.room = room self.ssl_version = ssl.PROTOCOL_TLSv1_2
self.nick = nick self.room = room
self.nick = nick
# session start event, starting point for the presence and roster requests
self.add_event_handler('session_start', self.start) # session start event, starting point for the presence and roster requests
self.add_event_handler('session_start', self.start)
# register handler to recieve both groupchat and normal message events
self.add_event_handler('message', self.message) # register handler to recieve both groupchat and normal message events
self.add_event_handler('message', self.message)
def start(self, event):
""" def start(self, event):
Arguments: """
event -- An empty dictionary. The session_start event does not provide any additional data. :param str event -- An empty dictionary. The session_start event does not provide any additional data.
""" """
self.send_presence() self.send_presence()
self.get_roster() self.get_roster()
# If a room password is needed, use: password=the_room_password # If a room password is needed, use: password=the_room_password
for rooms in self.room.split(sep=","): for rooms in self.room.split(sep=","):
self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True) self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
def validate_domain(self, wordlist, index):
@staticmethod """
def precheck(line): validation method to reduce connection attemps to unvalid domains
""" :param wordlist: words seperated by " " from the message
pre check function :param index: keyword index inside the message
- check that keywords are used properly :return: true if valid
- check that following a keyword a proper jid is following """
:param line: line from message body # keyword inside the message
:return: true if correct argument = wordlist[index]
"""
keywords = ["!help", "!uptime", "!version", "!contact"] # if the argument is not inside the no_arg_keywords target is index + 1
proper_domain, proper_key = False, False if argument not in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
try:
try: target = wordlist[index + 1]
# check for valid keyword in position 0 if validators.domain(target):
if line[0] in keywords: return True
proper_key = True except IndexError:
else: # except an IndexError if a keywords is the last word in the message
return return False
elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
# help command is used return True
if line[0] == "!help": else:
proper_domain = True return
# check if domain is valid
elif validators.domain(line[1]): def deduplicate(self, reply):
proper_domain = True """
else: deduplication method for the result list
return :param list reply: list containing strings
except IndexError: :return: list containing unique strings
pass """
reply_dedup = list()
return proper_key and proper_domain for item in reply:
if item not in reply_dedup:
@asyncio.coroutine reply_dedup.append(item)
def message(self, msg):
""" return reply_dedup
Arguments:
msg -- The received message stanza. See the documentation for stanza objects and the Message stanza to see @asyncio.coroutine
how it may be used. def message(self, msg):
""" """
:param msg: received message stanza
# catch self messages to prevent self flooding """
if msg['mucnick'] == self.nick: # init empty reply list
return reply = list()
if self.nick in msg['body']: # catch self messages to prevent self flooding
# answer with predefined text when mucnick is used if msg['mucnick'] == self.nick:
self.send_message(mto=msg['from'].bare, mbody=notice_answer(msg['mucnick']), mtype=msg['type']) return
elif self.nick in msg['body']:
for line in msg['body'].splitlines(): # add pre predefined text to reply list
""" split multiline messages into lines to check every line for keywords """ reply.append(StaticAnswers(msg['mucnick']).gen_answer())
line = line.split(sep= " ")
# building the queue
if self.precheck(line): # double splitting to exclude whitespaces
""" true if keyword and domain are valid """ words = " ".join(msg['body'].split()).split(sep=" ")
# Display help queue = list()
if line[0] == '!help':
""" display help when keyword !help is recieved """ # check all words in side the message for possible hits
self.send_message(mto=msg['from'].bare, mbody=help_doc(), mtype=msg['type']) for x in enumerate(words):
# check word for match in keywords list
# XEP-0072: Server Version for y in StaticAnswers().keys(arg='list'):
if line[0] == '!version': # if so queue the keyword and the postion in the string
""" query the server software version of the specified domain, defined by XEP-0092 """ if x[1] == y:
try: # only add job to queue if domain is valid
version = yield from self['xep_0092'].get_version(line[1]) if self.validate_domain(words, x[0]):
queue.append({str(y): x[0]})
if msg['type'] == "groupchat":
text = "%s: %s is running %s version %s on %s" % (msg['mucnick'], line[1], version[ # queue
'software_version']['name'], version['software_version']['version'], version[ for job in queue:
'software_version']['os']) for key in job:
else: keyword = key
text = "%s is running %s version %s on %s" % (line[1], version['software_version'][ index = job[key]
'name'], version['software_version']['version'], version['software_version']['os'])
if keyword == '!help':
self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type']) reply.append(StaticAnswers().gen_help())
except NameError:
pass try:
except XMPPError: target = words[index + 1]
pass if keyword == '!uptime':
last_activity = yield from self['xep_0012'].get_last_activity(target)
# XEP-0012: Last Activity reply.append(LastActivity(last_activity, msg, target).format_values())
if line[0] == '!uptime':
""" query the server uptime of the specified domain, defined by XEP-0012 """ elif keyword == "!version":
try: version = yield from self['xep_0092'].get_version(target)
# try if domain[0] is set if not just pass reply.append(Version(version, msg, target).format_version())
last_activity = yield from self['xep_0012'].get_last_activity(line[1])
uptime = datetime(1, 1, 1) + timedelta(seconds=last_activity['last_activity']['seconds']) elif keyword == "!contact":
contact = yield from self['xep_0030'].get_info(jid=target, cached=False)
if msg['type'] == "groupchat": reply.append(ContactInfo(contact, msg, target).format_contact())
text = "%s: %s is running since %d days %d hours %d minutes" % (msg['mucnick'], line[1],
uptime.day - 1, uptime.hour, except XMPPError as error:
uptime.minute) reply.append(HandleError(error, msg, key, target).build_report())
else:
text = "%s is running since %d days %d hours %d minutes" % (line[1], uptime.day - 1, # remove None type from list and send all elements
uptime.hour, uptime.minute) if list(filter(None.__ne__, reply)) and reply:
self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type']) reply = self.deduplicate(reply)
except NameError: self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type'])
pass
except XMPPError:
pass if __name__ == '__main__':
# command line arguments.
# XEP-0157: Contact Addresses for XMPP Services parser = ArgumentParser()
if line[0] == "!contact": parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel',
""" query the XEP-0030: Service Discovery and extract contact information """ const=logging.ERROR, default=logging.INFO)
try: parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
result = yield from self['xep_0030'].get_info(jid=line[1], cached=False) const=logging.DEBUG, default=logging.INFO)
server_info = [] parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', const="",
for field in result['disco_info']['form']: default='bot.log')
var = field['var'] args = parser.parse_args()
if field['type'] == 'hidden' and var == 'FORM_TYPE':
title = field['value'][0] # logging
continue logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s')
sep = ', ' logger = logging.getLogger(__name__)
field_value = field.get_value(convert=False)
value = sep.join(field_value) if isinstance(field_value, list) else field_value # configfile
server_info.append('%s: %s' % (var, value)) config = configparser.RawConfigParser()
config.read('./bot.cfg')
text = "contact addresses for %s are" % (line[1]) args.jid = config.get('Account', 'jid')
for count in range(len(server_info)): args.password = config.get('Account', 'password')
text += "\n" + server_info[count] args.room = config.get('MUC', 'rooms')
args.nick = config.get('MUC', 'nick')
self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type'])
except NameError: # init the bot and register used slixmpp plugins
pass xmpp = QueryBot(args.jid, args.password, args.room, args.nick)
except XMPPError: xmpp.register_plugin('xep_0012') # Last Activity
pass xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0045') # Multi-User Chat
# TODO xmpp.register_plugin('xep_0060') # PubSub
# append all results to single message send just once xmpp.register_plugin('xep_0085') # Chat State Notifications
else: xmpp.register_plugin('xep_0092') # Software Version
pass xmpp.register_plugin('xep_0128') # Service Discovery Extensions
xmpp.register_plugin('xep_0199') # XMPP Ping
def help_doc(): # connect and start receiving stanzas
helpfile = {'help': '!help -- display this text', xmpp.connect()
'version': '!version domain.tld -- receive XMPP server version', xmpp.process()
'uptime':'!uptime domain.tld -- receive XMPP server uptime',
'contact': '!contact domain.tld -- receive XMPP server contact address info'}
return "".join(['%s\n' % (value) for (_, value) in helpfile.items()])
def notice_answer(nickname):
possible_answers = {'1': 'I heard that, %s.',
'2': 'I am sorry for that %s.',
'3': '%s did you try turning it off and on again?'}
return possible_answers[str(randint(1, len(possible_answers)))] % nickname
if __name__ == '__main__':
# command line arguments.
parser = ArgumentParser()
parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel',
const=logging.ERROR, default=logging.INFO)
parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
const=logging.DEBUG, default=logging.INFO)
parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile',
const="", default='bot.log')
args = parser.parse_args()
# logging
logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s')
logger = logging.getLogger(__name__)
# configfile
config = configparser.RawConfigParser()
config.read('./bot.cfg')
args.jid = config.get('Account', 'jid')
args.password = config.get('Account', 'password')
args.room = config.get('MUC', 'rooms')
args.nick = config.get('MUC', 'nick')
args.admins = config.get('ADMIN', 'admins')
# init the bot and register used slixmpp plugins
xmpp = QueryBot(args.jid, args.password, args.room, args.nick)
xmpp.ssl_version = ssl.PROTOCOL_TLSv1_2
xmpp.register_plugin('xep_0012') # Last Activity
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0045') # Multi-User Chat
xmpp.register_plugin('xep_0060') # PubSub
xmpp.register_plugin('xep_0085') # Chat State Notifications
xmpp.register_plugin('xep_0092') # Software Version
xmpp.register_plugin('xep_0128') # Service Discovery Extensions
xmpp.register_plugin('xep_0199') # XMPP Ping
# connect and start receiving stanzas
xmpp.connect()
xmpp.process()