KudosBot/kudos.py
2024-03-20 13:29:05 +01:00

161 lines
6.6 KiB
Python
Executable file

#!.venv/bin/python3
import slixmpp
from slixmpp.exceptions import IqError, IqTimeout
import sqlite3
class KudosBot(slixmpp.ClientXMPP):
def __init__(self, jid, password, rooms, nick):
super().__init__(jid, password)
self.rooms = rooms
self.nick = nick
self.db_name = 'kudos.db' # Nome del file del database
self.connect_db()
self.create_table()
self.add_event_handler("session_start", self.start)
self.add_event_handler("groupchat_message", self.muc_message)
def connect_db(self):
self.conn = sqlite3.connect(self.db_name)
self.cursor = self.conn.cursor()
def create_table(self):
self.cursor.execute('''CREATE TABLE IF NOT EXISTS kudos
(nome TEXT PRIMARY KEY, punti INTEGER NOT NULL)''')
self.conn.commit()
def aggiorna_o_crea_kudos(self, nome):
self.cursor.execute('SELECT punti FROM kudos WHERE nome = ?', (nome,))
row = self.cursor.fetchone()
if row:
self.cursor.execute('UPDATE kudos SET punti = punti + 1 WHERE nome = ?', (nome,))
else:
self.cursor.execute('INSERT INTO kudos (nome, punti) VALUES (?, 1)', (nome,))
self.conn.commit()
def diminuisci_kudos(self, nome):
self.cursor.execute('SELECT punti FROM kudos WHERE nome = ?', (nome,))
row = self.cursor.fetchone()
if row:
self.cursor.execute('UPDATE kudos SET punti = punti - 1 WHERE nome = ?', (nome,))
else:
self.cursor.execute('INSERT INTO kudos (nome, punti) VALUES (?, -1)', (nome,))
self.conn.commit()
def leggi_classifica(self):
self.cursor.execute('SELECT nome, punti FROM kudos ORDER BY punti DESC LIMIT 10')
return self.cursor.fetchall()
def leggi_contro_classifica(self):
self.cursor.execute('SELECT nome, punti FROM kudos ORDER BY punti ASC LIMIT 10')
return self.cursor.fetchall()
def ottieni_karma(self, nome):
self.cursor.execute('SELECT * FROM kudos WHERE nome = ?', (nome,))
return self.cursor.fetchone()
def seleziona_utente(self, nome):
self.cursor.execute('SELECT * FROM kudos WHERE nome = ?', (nome,))
return self.cursor.fetchone()
def rimuovi_utente(self, nome):
self.cursor.execute('DELETE FROM kudos WHERE nome = ?', (nome,))
self.conn.commit()
return
async def start(self, event):
try:
await self.get_roster()
except IqError as err:
print(f"Errore: {err.iq['error']['text']}")
self.disconnect()
except IqTimeout:
print("Timeout del server")
self.disconnect()
else:
self.send_presence()
for room in self.rooms:
await self.plugin['xep_0045'].join_muc(room, self.nick)
async def muc_message(self, msg):
if msg['mucnick'] != self.nick and msg['body'].endswith('!kudos'):
return
if msg['mucnick'] != self.nick and msg['body'].endswith('!nokudos'):
return
if msg['mucnick'] != self.nick and msg['body'].endswith('!karma'):
return
if msg['mucnick'] != self.nick and msg['body'].endswith('!rm'):
return
if msg['mucnick'] != self.nick and msg['body'].startswith('!help'):
helpmsg = "!kudos <destinatario>\n!nokudos <destinatario>\n!classifica\n!controclassifica\n!karma <destinatario>\n!rm <destinatario>"
msg.reply(helpmsg).send()
if msg['mucnick'] != self.nick and msg['body'].startswith('!kudos'):
destinatario = msg['body'].split()[1]
# Prevenzione auto-assegnazione kudos
if msg['mucnick'] == destinatario:
msg.reply("Non puoi assegnare kudos a te stesso!").send()
self.diminuisci_kudos(destinatario)
msg.reply(f"KarmaKiller: Kudos tolti a {destinatario}!").send()
return
self.aggiorna_o_crea_kudos(destinatario)
msg.reply(f"Kudos assegnati a {destinatario}!").send()
elif msg['body'].startswith('!nokudos'):
destinatario = msg['body'].split()[1]
# Prevenzione auto-assegnazione kudos
if msg['mucnick'] == destinatario:
msg.reply("Non puoi togliere kudos a te stesso!").send()
return
self.diminuisci_kudos(destinatario)
msg.reply(f"Kudos tolti a {destinatario}!").send()
elif msg['body'].startswith('!karma'):
destinatario = msg['body'].split()[1]
karma_utente = self.ottieni_karma(destinatario)
if karma_utente:
mapping = ' '.join(map(str, karma_utente))
utente = mapping.split()[0]
punti = mapping.split()[1]
karma_utente = f"Kudos per {utente}: {punti}!"
msg.reply(karma_utente).send()
else:
msg.reply("Nessun destinatario trovato!").send()
elif msg['body'].startswith('!rm'):
destinatario = msg['body'].split()[1]
# Prevenzione auto-assegnazione kudos
if msg['mucnick'] == destinatario:
msg.reply("Non puoi rimuovere il tuo karma!").send()
return
utente_con_kudos = self.seleziona_utente(destinatario)
if utente_con_kudos:
mapping = ' '.join(map(str, utente_con_kudos))
utente = mapping.split()[0]
self.rimuovi_utente(utente)
msg.reply(f"Kudos {utente} rimosso!").send()
else:
msg.reply(f"Kudos non trovati per {destinatario}!").send()
elif msg['body'].startswith('!classifica'):
classifica = "Classifica Kudos:\n"
for utente, punti in self.leggi_classifica():
classifica += f"{utente}: {punti} kudos\n"
msg.reply(classifica).send()
elif msg['body'].startswith('!controclassifica'):
controclassifica = "ControClassifica Kudos:\n"
for utente, punti in self.leggi_contro_classifica():
controclassifica += f"{utente}: {punti} kudos\n"
msg.reply(controclassifica).send()
if __name__ == '__main__':
jid = ""
password = "sekret"
rooms = ["lozibaldone@conference.xmpp-it.net", "bots@chat.woodpeckersnest.space"]
nick = "KudosBot"
xmpp = KudosBot(jid, password, rooms, nick)
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0045') # Multi-User Chat
xmpp.register_plugin('xep_0199') # XMPP Ping
xmpp.connect()
xmpp.process(forever=False)