1
0
Fork 0
forked from sch/KaikOut

Add whitelist mechanism;

Improve affiliation and role recognition;
Atomize functions into module observation.
This commit is contained in:
Schimon Jehudah, Adv. 2024-09-15 15:52:10 +03:00
parent b76e8313bb
commit 86679f18e8
6 changed files with 296 additions and 259 deletions

View file

@ -51,6 +51,20 @@ goodbye
Leave groupchat and delete it from bookmarks. Leave groupchat and delete it from bookmarks.
""" """
[list]
blacklist = """
blacklist [+|-] <keyword>
Jabber IDs to blacklist
comma-separated keywords
'+' appends to, '-' removes from.
"""
whitelist = """
whitelist [+|-] <keyword>
Jabber IDs to whitelist
comma-separated keywords
'+' appends to, '-' removes from.
"""
[manual] [manual]
all = """ all = """
help all help all

View file

@ -1,2 +1,2 @@
__version__ = '0.0.2' __version__ = '0.0.3'
__version_info__ = (0, 0, 2) __version_info__ = (0, 0, 3)

View file

@ -194,6 +194,22 @@ class XmppChat:
response = None response = None
match command_lowercase: match command_lowercase:
case _ if command_lowercase.startswith('blacklist +'):
value = command[11:].strip()
if value:
response = XmppCommands.set_filter(
self, room, db_file, value, 'jid_blacklist', True)
else:
response = ('No action has been taken. '
'Missing Jabber IDs.')
case _ if command_lowercase.startswith('blacklist -'):
value = command[11:].strip()
if value:
response = XmppCommands.set_filter(
self, room, db_file, value, 'jid_blacklist', False)
else:
response = ('No action has been taken. '
'Missing Jabber IDs.')
case 'help': case 'help':
command_list = XmppCommands.print_help() command_list = XmppCommands.print_help()
response = ('Available command keys:\n' response = ('Available command keys:\n'
@ -539,6 +555,22 @@ class XmppChat:
response = str(self.settings[room]['timer']) response = str(self.settings[room]['timer'])
case 'version': case 'version':
response = XmppCommands.print_version() response = XmppCommands.print_version()
case _ if command_lowercase.startswith('whitelist +'):
value = command[11:].strip()
if value:
response = XmppCommands.set_filter(
self, room, db_file, value, 'jid_whitelist', True)
else:
response = ('No action has been taken. '
'Missing Jabber IDs.')
case _ if command_lowercase.startswith('whitelist -'):
value = command[11:].strip()
if value:
response = XmppCommands.set_filter(
self, room, db_file, value, 'jid_whitelist', False)
else:
response = ('No action has been taken. '
'Missing Jabber IDs.')
case _ if command_lowercase.startswith('xmpp:'): case _ if command_lowercase.startswith('xmpp:'):
response = await XmppCommands.muc_join(self, command) response = await XmppCommands.muc_join(self, command)
case _: case _:

View file

@ -12,8 +12,8 @@ from kaikout.xmpp.chat import XmppChat
from kaikout.xmpp.commands import XmppCommands from kaikout.xmpp.commands import XmppCommands
from kaikout.xmpp.groupchat import XmppGroupchat from kaikout.xmpp.groupchat import XmppGroupchat
from kaikout.xmpp.message import XmppMessage from kaikout.xmpp.message import XmppMessage
from kaikout.xmpp.moderation import XmppModeration
from kaikout.xmpp.muc import XmppMuc from kaikout.xmpp.muc import XmppMuc
from kaikout.xmpp.observation import XmppObservation
from kaikout.xmpp.pubsub import XmppPubsub from kaikout.xmpp.pubsub import XmppPubsub
from kaikout.xmpp.status import XmppStatus from kaikout.xmpp.status import XmppStatus
import slixmpp import slixmpp
@ -188,71 +188,16 @@ class XmppClient(slixmpp.ClientXMPP):
# await XmppChat.process_message(self, message) # await XmppChat.process_message(self, message)
if (XmppMuc.is_moderator(self, room, self.alias) and if (XmppMuc.is_moderator(self, room, self.alias) and
self.settings[room]['enabled'] and self.settings[room]['enabled'] and
alias != self.alias): alias != self.alias and
jid_bare and
jid_bare not in self.settings[room]['jid_whitelist']):
identifier = message['id'] identifier = message['id']
fields = [alias, message_body, identifier, timestamp] fields = [alias, message_body, identifier, timestamp]
Log.toml(self, room, fields, 'message') Log.toml(self, room, fields, 'message')
# Check for message # Check for message
if self.settings[room]['check_message']: await XmppObservation.observe_message(self, db_file, alias, message_body, room)
reason = XmppModeration.moderate_message(self, message_body, room)
if reason:
score_max = self.settings[room]['score_messages']
score = XmppCommands.raise_score(self, room, alias, db_file, reason)
if score > score_max:
if self.settings[room]['action']:
jid_bare = await XmppCommands.outcast(self, room, alias, reason)
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
'groupchat {}.'.format(alias, jid_bare, room))
for alias in moderators:
jid_full = XmppMuc.get_full_jid(self, room, alias)
XmppMessage.send(self, jid_full, message_to_moderators, 'chat')
# Inform the subject
message_to_participant = (
'You were banned from groupchat {}. Please '
'contact the moderators if you think this was '
'a mistake.'.format(room))
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
else:
await XmppCommands.devoice(self, room, alias, reason)
# Check for inactivity # Check for inactivity
if self.settings[room]['check_inactivity']: await XmppObservation.observe_inactivity(self, db_file, room)
roster_muc = XmppMuc.get_roster(self, room)
for alias in roster_muc:
if alias != self.alias:
jid_bare = XmppMuc.get_full_jid(self, room, alias).split('/')[0]
result, span = XmppModeration.moderate_last_activity(
self, room, jid_bare, timestamp)
if result:
message_to_participant = None
if 'inactivity_notice' not in self.settings[room]:
self.settings[room]['inactivity_notice'] = []
noticed_jids = self.settings[room]['inactivity_notice']
if result == 'Inactivity':
if jid_bare in noticed_jids: noticed_jids.remove(jid_bare)
await XmppCommands.kick(self, room, alias, reason)
message_to_participant = (
'You were expelled from groupchat {} due to '
'being inactive for {} days.'.format(room, span))
elif result == 'Warning' and jid_bare not in noticed_jids:
noticed_jids.append(jid_bare)
time_left = int(span)
if not time_left: time_left = 'an'
message_to_participant = (
'This is an inactivity-warning.\n'
'You are expected to be expelled from '
'groupchat {} within {} hour time.'
.format(room, int(span) or 'an'))
Toml.update_jid_settings(
self, room, db_file, 'inactivity_notice', noticed_jids)
if message_to_participant:
XmppMessage.send(
self, jid_bare, message_to_participant, 'chat')
async def on_muc_got_online(self, presence): async def on_muc_got_online(self, presence):
@ -265,20 +210,12 @@ class XmppClient(slixmpp.ClientXMPP):
fields = ['message', timestamp_iso, alias, presence_body, lang, identifier] fields = ['message', timestamp_iso, alias, presence_body, lang, identifier]
filename = datetime.today().strftime('%Y-%m-%d') + '_' + room filename = datetime.today().strftime('%Y-%m-%d') + '_' + room
Log.csv(filename, fields) Log.csv(filename, fields)
jid_bare = presence['muc']['jid'].bare
if (XmppMuc.is_moderator(self, room, self.alias) and if (XmppMuc.is_moderator(self, room, self.alias) and
self.settings[room]['enabled']): self.settings[room]['enabled'] and
jid = presence['muc']['jid'] jid_bare and
from hashlib import sha256 jid_bare not in self.settings[room]['jid_whitelist']):
jid_to_sha256 = sha256(jid.bare.encode('utf-8')).hexdigest() await XmppObservation.observe_jid(self, alias, jid_bare, room)
for jid in self.blocklist['entries']:
if jid not in self.settings[room]['rtbl_ignore']:
for node in self.blocklist['entries'][jid]:
for item_id in self.blocklist['entries'][jid][node]:
if jid_to_sha256 == item_id:
reason = 'Jabber ID has been marked by RTBL: Publisher: {}; Node: {}.'.format(
jid, node)
await XmppCommands.devoice(self, room, alias, reason)
break
# message_body = 'Greetings {} and welcome to groupchat {}'.format(alias, room) # message_body = 'Greetings {} and welcome to groupchat {}'.format(alias, room)
# XmppMessage.send(self, jid.bare, message_body, 'chat') # XmppMessage.send(self, jid.bare, message_body, 'chat')
# Send MUC-PM in case there is no indication for reception of 1:1 # Send MUC-PM in case there is no indication for reception of 1:1
@ -289,8 +226,7 @@ class XmppClient(slixmpp.ClientXMPP):
async def on_muc_presence(self, presence): async def on_muc_presence(self, presence):
alias = presence['muc']['nick'] alias = presence['muc']['nick']
identifier = presence['id'] identifier = presence['id']
jid_full = presence['muc']['jid'] jid_bare = presence['muc']['jid'].bare
jid_bare = jid_full.bare
lang = presence['lang'] lang = presence['lang']
status_codes = presence['muc']['status_codes'] status_codes = presence['muc']['status_codes']
actor_alias = presence['muc']['item']['actor']['nick'] actor_alias = presence['muc']['item']['actor']['nick']
@ -309,159 +245,18 @@ class XmppClient(slixmpp.ClientXMPP):
db_file = Toml.instantiate(self, room) db_file = Toml.instantiate(self, room)
if (XmppMuc.is_moderator(self, room, self.alias) and if (XmppMuc.is_moderator(self, room, self.alias) and
self.settings[room]['enabled'] and self.settings[room]['enabled'] and
alias != self.alias): alias != self.alias and
jid_bare and
jid_bare not in self.settings[room]['jid_whitelist']):
timestamp = time.time() timestamp = time.time()
fields = [alias, presence_body, identifier, timestamp] fields = [alias, presence_body, identifier, timestamp]
Log.toml(self, room, fields, 'presence') Log.toml(self, room, fields, 'presence')
# Count bans and kicks # Count bans and kicks
if self.settings[room]['check_moderation']: await XmppObservation.observe_strikes(self, db_file, presence, room)
status_codes = presence['muc']['status_codes']
if (301 in status_codes or 307 in status_codes):
actor_jid_bare = presence['muc']['item']['actor']['jid'].bare
actor_alias = presence['muc']['item']['actor']['nick']
if 301 in status_codes:
presence_body = 'User has been banned by {}'.format(actor_alias)
XmppCommands.update_score_ban(self, room, actor_jid_bare, db_file)
elif 307 in status_codes:
presence_body = 'User has been kicked by {}'.format(actor_alias)
XmppCommands.update_score_kick(self, room, actor_jid_bare, db_file)
if 'score_ban' in self.settings[room] and actor_jid_bare in self.settings[room]['score_ban']:
score_ban = self.settings[room]['score_ban'][actor_jid_bare]
else:
score_ban = 0
if 'score_kick' in self.settings[room] and actor_jid_bare in self.settings[room]['score_kick']:
score_kick = self.settings[room]['score_kick'][actor_jid_bare]
else:
score_kick = 0
score_outcast = score_ban + score_kick
if score_outcast > self.settings[room]['score_outcast']:
reason = 'Moderation abuse has been triggered'
await XmppMuc.set_affiliation(self, room, 'member', jid=actor_jid_bare, reason=reason)
await XmppMuc.set_role(self, room, actor_alias, 'participant', reason)
# Check for status message # Check for status message
if self.settings[room]['check_status']: await XmppObservation.observe_status_message(self, alias, db_file, jid_bare, presence_body, room)
reason, timer = XmppModeration.moderate_status_message(self, presence_body, room)
if reason and timer and not (room in self.tasks and
jid_bare in self.tasks[room] and
'countdown' in self.tasks[room][jid_bare]):
print('reason and timer for jid: ' + jid_bare + ' at room ' + room)
score_max = self.settings[room]['score_presence']
score = XmppCommands.raise_score(self, room, alias, db_file, reason)
if room not in self.tasks:
self.tasks[room] = {}
if jid_bare not in self.tasks[room]:
self.tasks[room][jid_bare] = {}
# if 'countdown' in self.tasks[room][jid_bare]:
# self.tasks[room][jid_bare]['countdown'].cancel()
if 'countdown' not in self.tasks[room][jid_bare]:
seconds = self.settings[room]['timer']
self.tasks[room][jid_bare]['countdown'] = asyncio.create_task(
XmppCommands.countdown(self, seconds, room, alias, reason))
message_to_participant = (
'Your status message "{}" violates policies of groupchat '
'{}.\n'
'You have {} seconds to change your status message, in '
'order to avoid consequent actions.'
.format(presence_body, room, seconds))
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
elif reason and not (room in self.tasks
and jid_bare in self.tasks[room] and
'countdown' in self.tasks[room][jid_bare]):
print('reason for jid: ' + jid_bare + ' at room ' + room)
score_max = self.settings[room]['score_presence']
score = XmppCommands.raise_score(self, room, alias, db_file, reason)
if score > score_max:
if self.settings[room]['action']:
jid_bare = await XmppCommands.outcast(
self, room, alias, reason)
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
'groupchat {}.'.format(alias, jid_bare, room))
for alias in moderators:
# jid_full = presence['muc']['jid']
jid_full = XmppMuc.get_full_jid(self, room, alias)
XmppMessage.send(self, jid_full, message_to_moderators, 'chat')
# Inform the subject.
message_to_participant = (
'You were banned from groupchat {}. Please '
'contact the moderators if you think this was a '
'mistake.'.format(room))
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
else:
await XmppCommands.devoice(self, room, alias, reason)
elif (room in self.tasks and
jid_bare in self.tasks[room] and
'countdown' in self.tasks[room][jid_bare]) and not reason:
print('cancel task for jid: ' + jid_bare + ' at room ' + room)
print(self.tasks[room][jid_bare]['countdown'])
if self.tasks[room][jid_bare]['countdown'].cancel():
print(self.tasks[room][jid_bare]['countdown'])
message_to_participant = 'Thank you for your cooperation.'
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
del self.tasks[room][jid_bare]['countdown']
# Check for inactivity # Check for inactivity
if self.settings[room]['check_inactivity']: await XmppObservation.observe_inactivity(self, db_file, room)
roster_muc = XmppMuc.get_roster(self, room)
for alias in roster_muc:
if alias != self.alias:
jid_bare = XmppMuc.get_full_jid(self, room, alias).split('/')[0]
result, span = XmppModeration.moderate_last_activity(
self, room, jid_bare, timestamp)
if result:
message_to_participant = None
if 'inactivity_notice' not in self.settings[room]:
self.settings[room]['inactivity_notice'] = []
noticed_jids = self.settings[room]['inactivity_notice']
if result == 'Inactivity':
if jid_bare in noticed_jids: noticed_jids.remove(jid_bare)
# FIXME Counting and creating of key entry "score_inactivity" appear not to occur.
score_inactivity = XmppCommands.raise_score_inactivity(self, room, jid_bare, db_file)
if score_inactivity > 10:
jid_bare = await XmppCommands.outcast(self, room, alias, reason)
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
'groupchat {} due to being inactive for over {} times.'.format(
alias, jid_bare, room, score_inactivity))
for alias in moderators:
# jid_full = presence['muc']['jid']
jid_full = XmppMuc.get_full_jid(self, room, alias)
XmppMessage.send(self, jid_full, message_to_moderators, 'chat')
# Inform the subject.
message_to_participant = (
'You were banned from groupchat {} due to being '
'inactive for over {} times. Please contact the '
' moderators if you think this was a mistake'
.format(room, score_inactivity))
else:
await XmppCommands.kick(self, room, alias, reason)
message_to_participant = (
'You were expelled from groupchat {} due to '
'being inactive for over {} days.'.format(room, span))
XmppCommands.remove_last_activity(self, room, jid_bare, db_file)
elif result == 'Warning' and jid_bare not in noticed_jids:
noticed_jids.append(jid_bare)
time_left = int(span)
if not time_left: time_left = 'an'
message_to_participant = (
'This is an inactivity-warning.\n'
'You are expected to be expelled from '
'groupchat {} within {} hour time.'
.format(room, int(span) or 'an'))
Toml.update_jid_settings(
self, room, db_file, 'inactivity_notice', noticed_jids)
if message_to_participant:
XmppMessage.send(
self, jid_bare, message_to_participant, 'chat')
def on_muc_self_presence(self, presence): def on_muc_self_presence(self, presence):

215
kaikout/xmpp/observation.py Normal file
View file

@ -0,0 +1,215 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
from hashlib import sha256
from kaikout.database import Toml
from kaikout.log import Logger
from kaikout.xmpp.commands import XmppCommands
from kaikout.xmpp.message import XmppMessage
from kaikout.xmpp.moderation import XmppModeration
from kaikout.xmpp.muc import XmppMuc
logger = Logger(__name__)
class XmppObservation:
async def observe_inactivity(self, db_file, room):
# Check for inactivity
if self.settings[room]['check_inactivity']:
roster_muc = XmppMuc.get_roster(self, room)
for alias in roster_muc:
if alias != self.alias:
jid_bare = XmppMuc.get_full_jid(self, room, alias).split('/')[0]
result, span = XmppModeration.moderate_last_activity(
self, room, jid_bare, timestamp)
if result:
message_to_participant = None
if 'inactivity_notice' not in self.settings[room]:
self.settings[room]['inactivity_notice'] = []
noticed_jids = self.settings[room]['inactivity_notice']
if result == 'Inactivity':
if jid_bare in noticed_jids: noticed_jids.remove(jid_bare)
# FIXME Counting and creating of key entry "score_inactivity" appear not to occur.
score_inactivity = XmppCommands.raise_score_inactivity(self, room, jid_bare, db_file)
reason = 'Inactivity detected'
if score_inactivity > 10:
jid_bare = await XmppCommands.outcast(self, room, alias, reason)
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
'groupchat {} due to being inactive for over {} times.'.format(
alias, jid_bare, room, score_inactivity))
for alias in moderators:
# jid_full = presence['muc']['jid']
jid_full = XmppMuc.get_full_jid(self, room, alias)
XmppMessage.send(self, jid_full, message_to_moderators, 'chat')
# Inform the subject.
message_to_participant = (
'You were banned from groupchat {} due to being '
'inactive for over {} times. Please contact the '
' moderators if you think this was a mistake'
.format(room, score_inactivity))
else:
await XmppCommands.kick(self, room, alias, reason)
message_to_participant = (
'You were expelled from groupchat {} due to '
'being inactive for over {} days.'.format(room, span))
XmppCommands.remove_last_activity(self, room, jid_bare, db_file)
elif result == 'Warning' and jid_bare not in noticed_jids:
noticed_jids.append(jid_bare)
time_left = int(span)
if not time_left: time_left = 'an'
message_to_participant = (
'This is an inactivity-warning.\n'
'You are expected to be expelled from '
'groupchat {} within {} hour time.'
.format(room, int(span) or 'an'))
Toml.update_jid_settings(
self, room, db_file, 'inactivity_notice', noticed_jids)
if message_to_participant:
XmppMessage.send(
self, jid_bare, message_to_participant, 'chat')
async def observe_jid(self, alias, jid_bare, room):
jid_to_sha256 = sha256(jid_bare.encode('utf-8')).hexdigest()
for jid in self.blocklist['entries']:
if jid not in self.settings[room]['rtbl_ignore']:
for node in self.blocklist['entries'][jid]:
for item_id in self.blocklist['entries'][jid][node]:
if jid_to_sha256 == item_id:
reason = 'Jabber ID has been marked by RTBL: Publisher: {}; Node: {}.'.format(
jid, node)
await XmppCommands.devoice(self, room, alias, reason)
break
async def observe_message(self, db_file, alias, message_body, room):
if self.settings[room]['check_message']:
reason = XmppModeration.moderate_message(self, message_body, room)
if reason:
score_max = self.settings[room]['score_messages']
score = XmppCommands.raise_score(self, room, alias, db_file, reason)
if score > score_max:
if self.settings[room]['action']:
jid_bare = await XmppCommands.outcast(self, room, alias, reason)
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
'groupchat {}.'.format(alias, jid_bare, room))
for alias in moderators:
jid_full = XmppMuc.get_full_jid(self, room, alias)
XmppMessage.send(self, jid_full, message_to_moderators, 'chat')
# Inform the subject
message_to_participant = (
'You were banned from groupchat {}. Please '
'contact the moderators if you think this was '
'a mistake.'.format(room))
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
else:
await XmppCommands.devoice(self, room, alias, reason)
async def observe_status_message(self, alias, db_file, jid_bare, presence_body, room):
if self.settings[room]['check_status']:
reason, timer = XmppModeration.moderate_status_message(self, presence_body, room)
if reason and timer and not (room in self.tasks and
jid_bare in self.tasks[room] and
'countdown' in self.tasks[room][jid_bare]):
print('reason and timer for jid: ' + jid_bare + ' at room ' + room)
score_max = self.settings[room]['score_presence']
score = XmppCommands.raise_score(self, room, alias, db_file, reason)
if room not in self.tasks:
self.tasks[room] = {}
if jid_bare not in self.tasks[room]:
self.tasks[room][jid_bare] = {}
# if 'countdown' in self.tasks[room][jid_bare]:
# self.tasks[room][jid_bare]['countdown'].cancel()
if 'countdown' not in self.tasks[room][jid_bare]:
seconds = self.settings[room]['timer']
self.tasks[room][jid_bare]['countdown'] = asyncio.create_task(
XmppCommands.countdown(self, seconds, room, alias, reason))
message_to_participant = (
'Your status message "{}" violates policies of groupchat '
'{}.\n'
'You have {} seconds to change your status message, in '
'order to avoid consequent actions.'
.format(presence_body, room, seconds))
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
elif reason and not (room in self.tasks
and jid_bare in self.tasks[room] and
'countdown' in self.tasks[room][jid_bare]):
print('reason for jid: ' + jid_bare + ' at room ' + room)
score_max = self.settings[room]['score_presence']
score = XmppCommands.raise_score(self, room, alias, db_file, reason)
if score > score_max:
if self.settings[room]['action']:
jid_bare = await XmppCommands.outcast(
self, room, alias, reason)
# admins = await XmppMuc.get_affiliation_list(self, room, 'admin')
# owners = await XmppMuc.get_affiliation_list(self, room, 'owner')
moderators = await XmppMuc.get_role_list(
self, room, 'moderator')
# Report to the moderators.
message_to_moderators = (
'Participant {} ({}) has been banned from '
'groupchat {}.'.format(alias, jid_bare, room))
for alias in moderators:
# jid_full = presence['muc']['jid']
jid_full = XmppMuc.get_full_jid(self, room, alias)
XmppMessage.send(self, jid_full, message_to_moderators, 'chat')
# Inform the subject.
message_to_participant = (
'You were banned from groupchat {}. Please '
'contact the moderators if you think this was a '
'mistake.'.format(room))
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
else:
await XmppCommands.devoice(self, room, alias, reason)
elif (room in self.tasks and
jid_bare in self.tasks[room] and
'countdown' in self.tasks[room][jid_bare]) and not reason:
print('cancel task for jid: ' + jid_bare + ' at room ' + room)
print(self.tasks[room][jid_bare]['countdown'])
if self.tasks[room][jid_bare]['countdown'].cancel():
print(self.tasks[room][jid_bare]['countdown'])
message_to_participant = 'Thank you for your cooperation.'
XmppMessage.send(self, jid_bare, message_to_participant, 'chat')
del self.tasks[room][jid_bare]['countdown']
async def observe_strikes(self, db_file, presence, room):
if self.settings[room]['check_moderation']:
status_codes = presence['muc']['status_codes']
if (301 in status_codes or 307 in status_codes):
actor_jid_bare = presence['muc']['item']['actor']['jid'].bare
actor_alias = presence['muc']['item']['actor']['nick']
if 301 in status_codes:
presence_body = 'User has been banned by {}'.format(actor_alias)
XmppCommands.update_score_ban(self, room, actor_jid_bare, db_file)
elif 307 in status_codes:
presence_body = 'User has been kicked by {}'.format(actor_alias)
XmppCommands.update_score_kick(self, room, actor_jid_bare, db_file)
if 'score_ban' in self.settings[room] and actor_jid_bare in self.settings[room]['score_ban']:
score_ban = self.settings[room]['score_ban'][actor_jid_bare]
else:
score_ban = 0
if 'score_kick' in self.settings[room] and actor_jid_bare in self.settings[room]['score_kick']:
score_kick = self.settings[room]['score_kick'][actor_jid_bare]
else:
score_kick = 0
score_outcast = score_ban + score_kick
if score_outcast > self.settings[room]['score_outcast']:
reason = 'Moderation abuse has been triggered'
await XmppMuc.set_affiliation(self, room, 'member', jid=actor_jid_bare, reason=reason)
await XmppMuc.set_role(self, room, actor_alias, 'participant', reason)

View file

@ -10,7 +10,6 @@ logger = Logger(__name__)
class XmppUtilities: class XmppUtilities:
async def is_jid_of_moderators(self, room, jid_full): async def is_jid_of_moderators(self, room, jid_full):
# try: # try:
moderators = await XmppMuc.get_role_list(self, room, 'moderator') moderators = await XmppMuc.get_role_list(self, room, 'moderator')
@ -26,7 +25,7 @@ class XmppUtilities:
async def get_chat_type(self, jid): async def get_chat_type(self, jid):
""" """
Check chat (i.e. JID) type. Check chat (i.e. JID) type.
If iqresult["disco_info"]["features"] contains XML namespace If iqresult["disco_info"]["features"] contains XML namespace
of 'http://jabber.org/protocol/muc', then it is a 'groupchat'. of 'http://jabber.org/protocol/muc', then it is a 'groupchat'.
@ -34,12 +33,12 @@ class XmppUtilities:
a chat which is conducted through a groupchat. a chat which is conducted through a groupchat.
Otherwise, determine type 'chat'. Otherwise, determine type 'chat'.
Parameters Parameters
---------- ----------
jid : str jid : str
Jabber ID. Jabber ID.
Returns Returns
------- -------
result : str result : str
@ -69,8 +68,6 @@ class XmppUtilities:
# logger.info('Chat type is:', chat_type) # logger.info('Chat type is:', chat_type)
return result return result
def is_access(self, jid_bare, jid_full, chat_type): def is_access(self, jid_bare, jid_full, chat_type):
"""Determine access privilege""" """Determine access privilege"""
operator = XmppUtilities.is_operator(self, jid_bare) operator = XmppUtilities.is_operator(self, jid_bare)
@ -83,8 +80,7 @@ class XmppUtilities:
else: else:
access = False access = False
return access return access
def is_operator(self, jid_bare): def is_operator(self, jid_bare):
"""Check if given JID is an operator""" """Check if given JID is an operator"""
result = False result = False
@ -94,44 +90,29 @@ class XmppUtilities:
# operator_name = operator['name'] # operator_name = operator['name']
break break
return result return result
def is_admin(self, room, alias): def is_admin(self, room, alias):
"""Check if given JID is an administrator""" """Check if given JID is an administrator"""
role = self.plugin['xep_0045'].get_jid_property(room, alias, 'affiliation') affiliation = self.plugin['xep_0045'].get_jid_property(room, alias, 'affiliation')
if role == 'admin': result = True if affiliation == 'admin' else False
result = True
else:
result = False
return result return result
def is_owner(self, room, alias): def is_owner(self, room, alias):
"""Check if given JID is an owner""" """Check if given JID is an owner"""
role = self.plugin['xep_0045'].get_jid_property(room, alias, 'affiliation') affiliation = self.plugin['xep_0045'].get_jid_property(room, alias, 'affiliation')
if role == 'owner': result = True if affiliation == 'owner' else False
result = True
else:
result = False
return result return result
def is_moderator(self, room, alias): def is_moderator(self, room, alias):
"""Check if given JID is a moderator""" """Check if given JID is a moderator"""
role = self.plugin['xep_0045'].get_jid_property(room, alias, 'role') role = self.plugin['xep_0045'].get_jid_property(room, alias, 'role')
if role == 'moderator': result = True if role == 'moderator' else False
result = True
else:
result = False
return result return result
# NOTE Would this properly work when Alias and Local differ?
def is_member(self, jid_bare, jid_full): def is_member(self, jid_bare, jid_full):
"""Check if given JID is a member""" """Check if given JID is a member"""
alias = jid_full[jid_full.index('/')+1:] alias = jid_full[jid_full.index('/')+1:]
affiliation = self.plugin['xep_0045'].get_jid_property(jid_bare, alias, 'affiliation') affiliation = self.plugin['xep_0045'].get_jid_property(jid_bare, alias, 'affiliation')
if affiliation == 'member': result = True if affiliation == 'member' else False
result = True
else:
result = False
return result return result