Add bots her.st and sid;

Do not import kaikout.sqlite;
Improve handling of configuration session;
Add support for RTBL;
Add scores for actions kick (role none) and ban (affiliation outcast);
Add recourse functions to be utilized when bot affiliation is not owner;
Add functions to check for affiliations admin and owner.
This commit is contained in:
Schimon Jehudah, Adv. 2024-07-30 07:56:00 +03:00
parent 632622f98f
commit 61c49c678a
9 changed files with 425 additions and 86 deletions

View file

@ -83,6 +83,63 @@ network, albeit they make an extensive use of XMPP themselves.
"""]
url = "http://schimon.i2p"
[[friends]]
title = "Similar Projects"
subtitle = """
From Austria to Japan. Moderation bots made by our counterparts. \
It is obvious that KaikOut may not be as useful as other moderation bots, as \
each project is formed namely by peculiar design and political principles. \
You are encouraged to review the following projects and pick the one that \
fits best to you.
"""
[[friends]]
name = "sid an xmpp bot"
info = ["""
The ultimate XMPP bot! Some of its features include
RTBL
Real Time Block List which would download a hash list of blacklisted JIDs, and \
block them upon join into groupchat.
Feeds
Publish news from various Debian feeds (security, planet, package tracker).
It can easily be used for other feeds (Atom and RSS).
Debian Archive
Fetch debian packages info from the debian archive
Debian Bugs
Intercept debian bugs numbers in MUC messages and send info about it
"""]
interface = "Groupchat"
url = "https://kaliko.gitlab.io/sid/index.html"
[[friends]]
name = "her.st bot"
info = ["""
The ultimate XMPP bot! Some of its features include
Anti Spam
Social Credit Score
Distributed Moderation
Url Rewriter for Privacy Frontends
Url Title Extractor
Url Description Extractor
Url Image Extractor
Searx
Searx Images
Searx Videos
Crypto Exchange Rates
Translation
Activity Graphs
Join our MUC to see it in action and make it join yours! Its free!
However, its not open-source.
"""]
interface = "Groupchat"
url = "https://her.st/bot/"
[[legal]]
title = "Legal"
subtitle = "Legal Notice"

View file

@ -6,7 +6,7 @@ from email.utils import parseaddr
import hashlib
from kaikout.database import Toml
from kaikout.log import Logger
import kaikout.sqlite as sqlite
#import kaikout.sqlite as sqlite
import os
import sys
import tomli_w

View file

@ -1,2 +1,2 @@
__version__ = '0.0.1'
__version_info__ = (0, 0, 1)
__version__ = '0.0.2'
__version_info__ = (0, 0, 2)

View file

@ -60,69 +60,100 @@ class XmppChat:
jid_full = jid.full
room = self.sessions[jid_bare] if jid_bare in self.sessions else message_body
status_mode,status_text, message_response = None, None, None
# Start a configuration session
if '@' in room:
if room in XmppMuc.get_joined_rooms(self):
alias = await XmppUtilities.is_jid_of_moderators(
self, room, jid_full)
if jid_bare not in self.sessions:
if alias:
# alias = XmppMuc.get_alias(self, room, jid)
# if XmppUtilities.is_moderator(self, room, alias):
self.sessions[jid_bare] = room
message_response = (
'A session to configure groupchat {} has been '
'established.'.format(room))
status_mode = 'chat'
status_text = 'Session is opened: {}'.format(room)
owners = await XmppMuc.get_affiliation(self, room,
'owner')
for owner in owners:
if XmppUtilities.is_moderator(self, room, self.alias):
alias = await XmppUtilities.is_jid_of_moderators(
self, room, jid_full)
if jid_bare not in self.sessions:
if alias:
# alias = XmppMuc.get_alias(self, room, jid)
# if XmppUtilities.is_moderator(self, room, alias):
room_owners = await XmppMuc.get_affiliation_list(
self, room, 'owner')
if not room_owners:
present_participants = XmppMuc.get_roster(self, room)
present_owners_alias = []
for participant in present_participants:
affiliation = XmppMuc.get_affiliation(
self, room, participant)
if affiliation == 'owner':
present_owners_alias.append(participant)
present_owners = []
for owner_alias in present_owners_alias:
owner_jid_full = XmppMuc.get_full_jid(self, room, owner_alias)
owner_jid_bare = owner_jid_full.split('/')[0]
present_owners.append(owner_jid_bare)
if not present_owners:
# present_moderators = await XmppMuc.get_role_list(
# self, room, 'moderator')
# moderators = []
# [moderators.append(moderator) for moderator in present_moderators if moderator not in moderators]
room_admins = await XmppMuc.get_affiliation_list(
self, room, 'admin')
# NOTE: Consideration, when there is no access
# to the list of owners from groupchat configuration
# then send a message to the groupchat - use alias
# instead of a Jabber ID.
jids_to_notify = room_owners or present_owners or room_admins
self.sessions[jid_bare] = room
message_response = (
'A session to configure groupchat {} has been '
'established.'.format(room))
status_mode = 'chat'
status_text = 'Session is opened: {}'.format(room)
for jid in jids_to_notify:
message_notification = (
'A session for groupchat {} has been '
'activated by moderator {}'
.format(room, jid_bare))
XmppMessage.send(
self, jid, message_notification, 'chat')
else:
message_response = (
'You do not appear to be a moderator of '
'groupchat {}'.format(room))
status_mode = 'available'
status_text = (
'Type the desired groupchat - in which you '
'are a moderator at - to configure')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
message_notification = (
'A session for groupchat {} has been '
'activated by moderator {}'
'An unauthorized attempt to establish a '
'session for groupchat {} has been made by {}'
.format(room, jid_bare))
XmppMessage.send(
self, owner, message_notification, 'chat')
else:
for moderator in moderators:
jid_full = XmppMuc.get_full_jid(self, room,
moderator)
XmppMessage.send(
self, jid_full, message_notification, 'chat')
elif not alias:
del self.sessions[jid_bare]
message_response = (
'You do not appear to be a moderator of '
'groupchat {}'.format(room))
status_mode = 'available'
status_text = (
'Type the desired groupchat - in which you '
'are a moderator at - to configure')
moderators = await XmppMuc.get_role(
'The session has been ended, because you are no '
'longer a moderator at groupchat {}'.format(room))
status_mode = 'away'
status_text = 'Session is closed: {}'.format(room)
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
message_notification = (
'An unauthorized attempt to establish a '
'session for groupchat {} has been made by {}'
.format(room, jid_bare))
'The session for groupchat {} with former '
'moderator {} has been terminated.\n'
'A termination message has been sent to {}'
.format(room, jid_bare, jid_bare))
for moderator in moderators:
jid_full = XmppMuc.get_full_jid(self, room,
moderator)
XmppMessage.send(
self, jid_full, message_notification, 'chat')
elif not alias:
del self.sessions[jid_bare]
message_response = (
'The session has been ended, because you are no '
'longer a moderator at groupchat {}'.format(room))
status_mode = 'away'
status_text = 'Session is closed: {}'.format(room)
moderators = await XmppMuc.get_role(
self, room, 'moderator')
message_notification = (
'The session for groupchat {} with former '
'moderator {} has been terminated.\n'
'A termination message has been sent to {}'
.format(room, jid_bare, jid_bare))
for moderator in moderators:
jid_full = XmppMuc.get_full_jid(self, room,
moderator)
XmppMessage.send(
self, jid_full, message_notification, 'chat')
else:
room = self.sessions[jid_bare]
else:
room = self.sessions[jid_bare]
message_response = (
'KaikOut must be a moderator at groupchat "{}".'
.format(room))
else:
message_response = (
'Invite KaikOut to groupchat "{}" and try again.\n'
@ -138,6 +169,8 @@ class XmppChat:
XmppMessage.send_reply(self, message, message_response)
return
else:
print('AN UNKNOWN MESSAGE')
print(message)
return
db_file = Toml.instantiate(self, room)

View file

@ -15,6 +15,7 @@ from kaikout.xmpp.groupchat import XmppGroupchat
from kaikout.xmpp.message import XmppMessage
from kaikout.xmpp.moderation import XmppModeration
from kaikout.xmpp.muc import XmppMuc
from kaikout.xmpp.pubsub import XmppPubsub
from kaikout.xmpp.status import XmppStatus
import time
@ -81,7 +82,7 @@ class XmppClient(slixmpp.ClientXMPP):
self.add_event_handler("groupchat_direct_invite", self.on_groupchat_direct_invite) # XEP_0249
self.add_event_handler("groupchat_invite", self.on_groupchat_invite) # XEP_0045
self.add_event_handler("message", self.on_message)
# self.add_event_handler("reactions", self.on_reactions)
self.add_event_handler("reactions", self.on_reactions)
# self.add_event_handler("room_activity", self.on_room_activity)
# self.add_event_handler("session_resumed", self.on_session_resumed)
self.add_event_handler("session_start", self.on_session_start)
@ -197,9 +198,10 @@ class XmppClient(slixmpp.ClientXMPP):
if score > score_max:
if self.settings[room]['action']:
jid_bare = await XmppCommands.outcast(self, room, alias, reason)
# admins = await XmppMuc.get_affiliation(self, room, 'admin')
# owners = await XmppMuc.get_affiliation(self, room, 'owner')
moderators = await XmppMuc.get_role(self, room, 'moderator')
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
@ -250,13 +252,50 @@ class XmppClient(slixmpp.ClientXMPP):
self, jid_bare, message_to_participant, 'chat')
async def on_muc_got_online(self, presence):
room = presence['muc']['room']
alias = presence['muc']['nick']
presence_body = 'User has joined to the groupchat.'
identifier = presence['id']
lang = presence['lang']
timestamp_iso = datetime.now().isoformat()
fields = ['message', timestamp_iso, alias, presence_body, lang, identifier]
filename = datetime.today().strftime('%Y-%m-%d') + '_' + room
Log.csv(filename, fields)
if (XmppMuc.is_moderator(self, room, self.alias) and
self.settings[room]['enabled']):
jid = presence['muc']['jid']
from hashlib import sha256
jid_to_sha256 = sha256(jid.bare.encode('utf-8')).hexdigest()
rtbl_jid_full = 'xmppbl.org'
rtbl_node_id = 'muc_bans_sha256'
rtbl_list = await XmppPubsub.get_items(self, rtbl_jid_full, rtbl_node_id)
for item in rtbl_list['pubsub']['items']:
if jid_to_sha256 == item['id']:
reason = 'Jabber ID has been marked by RTBL.'
await XmppCommands.devoice(self, room, alias, reason)
break
# message_body = 'Greetings {} and welcome to groupchat {}'.format(alias, room)
# XmppMessage.send(self, jid.bare, message_body, 'chat')
# Send MUC-PM in case there is no indication for reception of 1:1
# jid_from = presence['from']
# XmppMessage.send(self, jid_from, message_body, 'chat')
async def on_muc_presence(self, presence):
alias = presence['muc']['nick']
identifier = presence['id']
jid_full = presence['muc']['jid']
jid_bare = jid_full.bare
lang = presence['lang']
presence_body = presence['status']
status_codes = presence['muc']['status_codes']
actor_alias = presence['muc']['item']['actor']['nick']
if 301 in status_codes:
presence_body = 'User has been banned by {}'.format(actor_alias)
elif 307 in status_codes:
presence_body = 'User has been kicked by {}'.format(actor_alias)
else:
presence_body = presence['status']
room = presence['muc']['room']
timestamp_iso = datetime.now().isoformat()
fields = ['presence', timestamp_iso, alias, presence_body, lang, identifier]
@ -267,10 +306,34 @@ class XmppClient(slixmpp.ClientXMPP):
if (XmppMuc.is_moderator(self, room, self.alias) and
self.settings[room]['enabled'] and
alias != self.alias):
# import time # FIXME Why is this required if it is already stated at the top?
timestamp = time.time()
fields = [alias, presence_body, identifier, timestamp]
Log.toml(self, room, fields, 'presence')
# Count bans and kicks
if self.settings[room]['check_moderation']:
status_codes = presence['muc']['status_codes']
if (301 in status_codes or 307 in status_codes):
actor_jid_bare = presence['muc']['item']['actor']['jid'].bare
actor_alias = presence['muc']['item']['actor']['nick']
if 301 in status_codes:
presence_body = 'User has been banned by {}'.format(actor_alias)
XmppCommands.update_score_ban(self, room, actor_jid_bare, db_file)
elif 307 in status_codes:
presence_body = 'User has been kicked by {}'.format(actor_alias)
XmppCommands.update_score_kick(self, room, actor_jid_bare, db_file)
if 'score_ban' in self.settings[room] and actor_jid_bare in self.settings[room]['score_ban']:
score_ban = self.settings[room]['score_ban'][actor_jid_bare]
else:
score_ban = 0
if 'score_kick' in self.settings[room] and actor_jid_bare in self.settings[room]['score_kick']:
score_kick = self.settings[room]['score_kick'][actor_jid_bare]
else:
score_kick = 0
score_outcast = score_ban + score_kick
if score_outcast > self.settings[room]['score_outcast']:
reason = 'Moderation abuse has been triggered'
await XmppMuc.set_affiliation(self, room, 'member', jid=actor_jid_bare, reason=reason)
await XmppMuc.set_role(self, room, actor_alias, 'participant', reason)
# Check for status message
if self.settings[room]['check_status']:
reason, timer = XmppModeration.moderate_status_message(self, presence_body, room)
@ -305,10 +368,12 @@ class XmppClient(slixmpp.ClientXMPP):
score = XmppCommands.raise_score(self, room, alias, db_file, reason)
if score > score_max:
if self.settings[room]['action']:
jid_bare = await XmppCommands.outcast(self, room, alias, reason)
# admins = await XmppMuc.get_affiliation(self, room, 'admin')
# owners = await XmppMuc.get_affiliation(self, room, 'owner')
moderators = await XmppMuc.get_role(self, room, 'moderator')
jid_bare = await XmppCommands.outcast(
self, room, alias, reason)
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
@ -370,14 +435,27 @@ class XmppClient(slixmpp.ClientXMPP):
self, jid_bare, message_to_participant, 'chat')
async def on_muc_self_presence(self, presence):
def on_muc_self_presence(self, presence):
actor = presence['muc']['item']['actor']['nick']
alias = presence['muc']['nick']
room = presence['muc']['room']
if actor and alias == self.alias: XmppStatus.send_status_message(self, room)
async def on_room_activity(self, presence):
def on_reactions(self, message):
reactions = message['reactions']['values']
alias = message['mucnick']
room = message['mucroom']
affiliation = XmppMuc.get_affiliation(self, room, alias)
if affiliation == 'member' and '👎' in reactions:
message_body = '{} {} has reacted to message {} with {}'.format(
affiliation, alias, message['id'], message['reactions']['plugin']['value'])
message.reply(message_body).send()
print(message_body)
print(room)
def on_room_activity(self, presence):
print('on_room_activity')
print(presence)
print('testing mix core')
@ -406,6 +484,7 @@ class XmppClient(slixmpp.ClientXMPP):
# See also get_joined_rooms of slixmpp.plugins.xep_0045
for room in rooms:
XmppStatus.send_status_message(self, room)
self.add_event_handler("muc::%s::got_online" % room, self.on_muc_got_online)
self.add_event_handler("muc::%s::presence" % room, self.on_muc_presence)
self.add_event_handler("muc::%s::self-presence" % room, self.on_muc_self_presence)
await asyncio.sleep(5)

View file

@ -249,12 +249,12 @@ class XmppCommands:
def print_statistics(db_file):
"""
Print statistics.
Parameters
----------
db_file : str
Path to database file.
Returns
-------
msg : str
@ -294,25 +294,27 @@ class XmppCommands:
def print_version():
message = __version__
return message
def raise_score(self, room, alias, db_file, reason):
"""
Raise score by one.
Parameters
----------
db_file : str
Database filename.
room : str
Jabber ID.
alias : str
Alias.
db_file : str
Database filename.
reason : str
Reason.
Returns
-------
result.
"""
status_message = '✒️ Writing a score against {} for {}'.format(alias, reason)
self.action_count += 1
@ -331,23 +333,77 @@ class XmppCommands:
XmppStatus.send_status_message(self, room)
result = scores[jid_bare] if jid_full and jid_bare else 0
return result
def update_score_ban(self, room, jid_bare, db_file):
"""
Update ban score.
Parameters
----------
room : str
Jabber ID.
jid_bare : str
Jabber ID.
db_file : str
Database filename.
Returns
-------
result.
"""
scores = self.settings[room]['score_ban'] if 'score_ban' in self.settings[room] else {}
scores[jid_bare] = scores[jid_bare] + 1 if jid_bare in scores else 1
Toml.update_jid_settings(self, room, db_file, 'score_ban', scores)
# result = scores[jid_bare]
# return result
def update_score_kick(self, room, jid_bare, db_file):
"""
Update kick score.
Parameters
----------
room : str
Jabber ID.
jid_bare : str
Jabber ID.
db_file : str
Database filename.
Returns
-------
result.
"""
scores = self.settings[room]['score_kick'] if 'score_kick' in self.settings[room] else {}
scores[jid_bare] = scores[jid_bare] + 1 if jid_bare in scores else 1
Toml.update_jid_settings(self, room, db_file, 'score_kick', scores)
# result = scores[jid_bare]
# return result
def update_last_activity(self, room, jid_bare, db_file, timestamp):
"""
Update last message activity.
Parameters
----------
room : str
Jabber ID.
db_file : str
Database filename.
jid_bare : str
Jabber ID.
timestamp :
Time stamp.
Returns
-------
result.
"""
activity = self.settings[room]['last_activity'] if 'last_activity' in self.settings[room] else {}
activity[jid_bare] = timestamp

View file

@ -23,11 +23,24 @@ logger = Logger(__name__)
class XmppMuc:
def get_affiliation(self, room, alias):
"""Get an affiliation of a specified alias"""
affiliation = self.plugin['xep_0045'].get_jid_property(room, alias, 'affiliation')
return affiliation
async def get_affiliation(self, room, affiliation):
jids = await self.plugin['xep_0045'].get_affiliation_list(room, affiliation)
return jids
async def get_affiliation_list(self, room, affiliation):
"""Get an affiliation list from groupchat config"""
try:
jids = await self.plugin['xep_0045'].get_affiliation_list(room, affiliation)
return jids
except Exception as e:
logger.error('KaikOut has failed to query the server for a list '
'of Jabber IDs with the affiliation "{}" for '
'groupchat {}'.format(affiliation, room))
logger.error(e)
def get_alias(self, room, jid):
@ -51,11 +64,24 @@ class XmppMuc:
def get_joined_rooms(self):
rooms = self.plugin['xep_0045'].get_joined_rooms()
return rooms
def get_role(self, room, alias):
"""Get a role of a specified alias"""
role = self.plugin['xep_0045'].get_jid_property(room, alias, 'role')
return role
async def get_role(self, room, role):
jids = await self.plugin['xep_0045'].get_roles_list(room, role)
return jids
async def get_role_list(self, room, role):
"""Get a role list from groupchat config"""
try:
jids = await self.plugin['xep_0045'].get_roles_list(room, role)
return jids
except Exception as e:
logger.error('KaikOut has failed to query the server for a list '
'of Jabber IDs with the role "{}" for groupchat {}'
.format(role, room))
logger.error(e)
def get_roster(self, room):

68
kaikout/xmpp/pubsub.py Normal file
View file

@ -0,0 +1,68 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Functions create_node and create_entry are derived from project atomtopubsub.
"""
import hashlib
from kaikout.log import Logger
logger = Logger(__name__)
class XmppPubsub:
async def get_pubsub_services(self):
results = []
iq = await self['xep_0030'].get_items(jid=self.boundjid.domain)
items = iq['disco_items']['items']
for item in items:
iq = await self['xep_0030'].get_info(jid=item[0])
identities = iq['disco_info']['identities']
for identity in identities:
if identity[0] == 'pubsub' and identity[1] == 'service':
result = {}
result['jid'] = item[0]
if item[1]: result['name'] = item[1]
elif item[2]: result['name'] = item[2]
else: result['name'] = item[0]
results.extend([result])
return results
async def get_node_properties(self, jid, node):
config = await self.plugin['xep_0060'].get_node_config(jid, node)
subscriptions = await self.plugin['xep_0060'].get_node_subscriptions(jid, node)
affiliations = await self.plugin['xep_0060'].get_node_affiliations(jid, node)
properties = {'config': config,
'subscriptions': subscriptions,
'affiliations': affiliations}
breakpoint()
return properties
async def get_node_configuration(self, jid, node_id):
node = await self.plugin['xep_0060'].get_node_config(jid, node_id)
if not node:
print('NODE CONFIG', node_id, str(node))
return node
async def get_nodes(self, jid):
nodes = await self.plugin['xep_0060'].get_nodes(jid)
# 'self' would lead to slixmpp.jid.InvalidJID: idna validation failed:
return nodes
async def get_item(self, jid, node, item_id):
item = await self.plugin['xep_0060'].get_item(jid, node, item_id)
return item
async def get_items(self, jid, node):
items = await self.plugin['xep_0060'].get_items(jid, node)
return items

View file

@ -13,7 +13,7 @@ class XmppUtilities:
async def is_jid_of_moderators(self, room, jid_full):
# try:
moderators = await XmppMuc.get_role(self, room, 'moderator')
moderators = await XmppMuc.get_role_list(self, room, 'moderator')
for alias in moderators:
# Note: You might want to compare jid_bare
if XmppMuc.get_full_jid(self, room, alias) == jid_full:
@ -96,6 +96,26 @@ class XmppUtilities:
return result
def is_admin(self, room, alias):
"""Check if given JID is an administrator"""
role = self.plugin['xep_0045'].get_jid_property(room, alias, 'affiliation')
if role == 'admin':
result = True
else:
result = False
return result
def is_owner(self, room, alias):
"""Check if given JID is an owner"""
role = self.plugin['xep_0045'].get_jid_property(room, alias, 'affiliation')
if role == 'owner':
result = True
else:
result = False
return result
def is_moderator(self, room, alias):
"""Check if given JID is a moderator"""
role = self.plugin['xep_0045'].get_jid_property(room, alias, 'role')