xmpp-chatbot/main.py
nico 825167b8aa
Squashed commit of the following:
commit 91cf4b74105f69f16a80825581a0d2d34d15d155
Merge: 30ceaea 90b42ed
Author: nico <nico.wellpott@uni-oldenburg.de>
Date:   Wed Oct 3 22:34:42 2018 +0200

    Merge remote-tracking branch 'origin/xep' into xep

commit 30ceaea56a77ed95deba30c4fe65e238ea5960ac
Author: nico <nico.wellpott@uni-oldenburg.de>
Date:   Wed Oct 3 22:34:33 2018 +0200

    Initial Version XEP query

    + added initial version of xep query class

    Init Implementation

    + added xep plugin to bot class

    * reworked validation function
    * updated .gitignore file
    + added xep plugin

commit 90b42edb9b8e92eba3bb67030d5f919b1e71d0bc
Author: nico <nico.wellpott@uni-oldenburg.de>
Date:   Wed Oct 3 22:34:21 2018 +0200

    * reworked validation function
    * updated .gitignore file
    + added xep plugin

commit 25c78807731417867840d6fe4abf598e64aded28
Author: nico <nico@magicbroccoli.de>
Date:   Wed Oct 3 10:54:02 2018 +0200

    Init Implementation

    + added xep plugin to bot class

commit fe711f44d40671d927e9b946fb66679b297272c8
Author: nico <nico@magicbroccoli.de>
Date:   Tue Oct 2 21:20:08 2018 +0200

    Initial Version XEP query

    + added initial version of xep query class
2018-10-03 23:24:36 +02:00

202 lines
6.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
import asyncio
import slixmpp
import ssl
import validators
import configparser
import logging
from argparse import ArgumentParser
from slixmpp.exceptions import XMPPError
from classes.strings import StaticAnswers
from classes.functions import Version, LastActivity, ContactInfo, HandleError
from classes.xep import XEPRequest
class QueryBot(slixmpp.ClientXMPP):
def __init__(self, jid, password, room, nick):
slixmpp.ClientXMPP.__init__(self, jid, password)
self.ssl_version = ssl.PROTOCOL_TLSv1_2
self.room = room
self.nick = nick
# session start event, starting point for the presence and roster requests
self.add_event_handler('session_start', self.start)
# register handler to recieve both groupchat and normal message events
self.add_event_handler('message', self.message)
def start(self, event):
"""
:param event -- An empty dictionary. The session_start event does not provide any additional data.
"""
self.send_presence()
self.get_roster()
# If a room password is needed, use: password=the_room_password
for rooms in self.room.split(sep=","):
self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
def validate_domain(self, wordlist, index):
"""
validation method to reduce malformed querys and unnecessary connection attempts
:param wordlist: words separated by " " from the message
:param index: keyword index inside the message
:return: true if valid
"""
# keyword inside the message
argument = wordlist[index]
# check if argument is in the argument list
if argument in StaticAnswers().keys(arg='list'):
# if argument uses a domain check for occurence in list and check domain
if argument in StaticAnswers().keys(arg='list', keyword='domain_keywords'):
try:
target = wordlist[index + 1]
if validators.domain(target):
return True
except IndexError:
# except an IndexError if a keywords is the last word in the message
return False
# check if number keyword is used if true check if target is assignable
elif argument in StaticAnswers().keys(arg='list', keyword='number_keywords'):
try:
target = wordlist[index + 1]
return True
except IndexError:
# except an IndexError if target is not assignable
return False
# check if argument is inside no_arg list
elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"):
return True
else:
return False
else:
return False
def deduplicate(self, reply):
"""
deduplication method for the result list
:param list reply: list containing strings
:return: list containing unique strings
"""
reply_dedup = list()
for item in reply:
if item not in reply_dedup:
reply_dedup.append(item)
return reply_dedup
@asyncio.coroutine
def message(self, msg):
"""
:param msg: received message stanza
"""
# init empty reply list
reply = list()
# catch self messages to prevent self flooding
if msg['mucnick'] == self.nick:
return
elif self.nick in msg['body']:
# add pre predefined text to reply list
reply.append(StaticAnswers(msg['mucnick']).gen_answer())
# building the queue
# double splitting to exclude whitespaces
words = " ".join(msg['body'].split()).split(sep=" ")
queue = list()
# check all words in side the message for possible hits
for x in enumerate(words):
# check word for match in keywords list
for y in StaticAnswers().keys(arg='list'):
# if so queue the keyword and the postion in the string
if x[1] == y:
# only add job to queue if domain is valid
if self.validate_domain(words, x[0]):
queue.append({str(y): x[0]})
# queue
for job in queue:
for key in job:
keyword = key
index = job[key]
if keyword == '!help':
reply.append(StaticAnswers().gen_help())
continue
try:
target = words[index + 1]
if keyword == '!uptime':
last_activity = yield from self['xep_0012'].get_last_activity(target)
reply.append(LastActivity(last_activity, msg, target).format_values())
elif keyword == "!version":
version = yield from self['xep_0092'].get_version(target)
reply.append(Version(version, msg, target).format_version())
elif keyword == "!contact":
contact = yield from self['xep_0030'].get_info(jid=target, cached=False)
reply.append(ContactInfo(contact, msg, target).format_contact())
elif keyword == "!xep":
reply.append(XEPRequest(msg, target).format())
except XMPPError as error:
reply.append(HandleError(error, msg, key, target).build_report())
# remove None type from list and send all elements
if list(filter(None.__ne__, reply)) and reply:
reply = self.deduplicate(reply)
self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type'])
if __name__ == '__main__':
# command line arguments.
parser = ArgumentParser()
parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel',
const=logging.ERROR, default=logging.INFO)
parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
const=logging.DEBUG, default=logging.INFO)
parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', const="",
default='bot.log')
args = parser.parse_args()
# logging
logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s')
logger = logging.getLogger(__name__)
# configfile
config = configparser.RawConfigParser()
config.read('./bot.cfg')
args.jid = config.get('Account', 'jid')
args.password = config.get('Account', 'password')
args.room = config.get('MUC', 'rooms')
args.nick = config.get('MUC', 'nick')
# init the bot and register used slixmpp plugins
xmpp = QueryBot(args.jid, args.password, args.room, args.nick)
xmpp.register_plugin('xep_0012') # Last Activity
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0045') # Multi-User Chat
xmpp.register_plugin('xep_0060') # PubSub
xmpp.register_plugin('xep_0085') # Chat State Notifications
xmpp.register_plugin('xep_0092') # Software Version
xmpp.register_plugin('xep_0128') # Service Discovery Extensions
xmpp.register_plugin('xep_0199') # XMPP Ping
# connect and start receiving stanzas
xmpp.connect()
xmpp.process()