#!/usr/bin/env python3 # -*- coding: utf-8 -*- # from slixmpp import JID from kaikout.database import DatabaseToml from kaikout.log import Logger from kaikout.utilities import Documentation from kaikout.xmpp.commands import XmppCommands from kaikout.xmpp.message import XmppMessage from kaikout.xmpp.muc import XmppMuc from kaikout.xmpp.status import XmppStatus from kaikout.xmpp.utilities import XmppUtilities import time logger = Logger(__name__) # for task in main_task: # task.cancel() # Deprecated in favour of event "presence_available" # if not main_task: # await select_file() class XmppChat: async def process_message(self, message): """ Process incoming message stanzas. Be aware that this also includes MUC messages and error messages. It is usually a good practice to check the messages's type before processing or sending replies. Parameters ---------- message : str The received message stanza. See the documentation for stanza objects and the Message stanza to see how it may be used. """ # jid_bare = message['from'].bare # jid_full = str(message['from']) # Process commands. message_type = message['type'] message_body = message['body'] jid = message['from'] jid_bare = jid.bare if message_type == 'groupchat': alias = message['mucnick'] room = message['mucroom'] self_alias = XmppUtilities.get_self_alias(self, jid_bare) if (message['mucnick'] == self.alias or not XmppUtilities.is_moderator(self, room, alias) or (not message_body.startswith(self_alias + ' ') and not message_body.startswith(self_alias + ',') and not message_body.startswith(self_alias + ':'))): return # Adding one to the length because of # assumption that a comma or a dot is added self_alias_length = len(self_alias) + 1 command = message_body[self_alias_length:].lstrip() elif message_type in ('chat', 'normal'): command = message_body jid_full = jid.full room = self.sessions[jid_bare] if jid_bare in self.sessions else message_body status_mode,status_text, message_response = None, None, None # Start a configuration session if '@' in room: if room in XmppMuc.get_joined_rooms(self): if XmppUtilities.is_moderator(self, room, self.alias): alias = await XmppUtilities.is_jid_of_moderators( self, room, jid_full) if jid_bare not in self.sessions: if alias: # alias = XmppMuc.get_alias(self, room, jid) # if XmppUtilities.is_moderator(self, room, alias): room_owners = await XmppMuc.get_affiliation_list( self, room, 'owner') if not room_owners: present_participants = XmppMuc.get_roster(self, room) present_owners_alias = [] for participant in present_participants: affiliation = XmppMuc.get_affiliation( self, room, participant) if affiliation == 'owner': present_owners_alias.append(participant) present_owners = [] for owner_alias in present_owners_alias: owner_jid_full = XmppMuc.get_full_jid(self, room, owner_alias) owner_jid_bare = owner_jid_full.split('/')[0] present_owners.append(owner_jid_bare) if not present_owners: # present_moderators = await XmppMuc.get_role_list( # self, room, 'moderator') # moderators = [] # [moderators.append(moderator) for moderator in present_moderators if moderator not in moderators] room_admins = await XmppMuc.get_affiliation_list( self, room, 'admin') # NOTE: Consideration, when there is no access # to the list of owners from groupchat configuration # then send a message to the groupchat - use alias # instead of a Jabber ID. jids_to_notify = room_owners or present_owners or room_admins self.sessions[jid_bare] = room message_response = ( 'A session to configure groupchat {} has been ' 'established.'.format(room)) status_mode = 'chat' status_text = 'Session is opened: {}'.format(room) for jid in jids_to_notify: message_notification = ( 'A session for groupchat {} has been ' 'activated by moderator {}' .format(room, jid_bare)) XmppMessage.send( self, jid, message_notification, 'chat') else: message_response = ( 'You do not appear to be a moderator of ' 'groupchat {}'.format(room)) status_mode = 'available' status_text = ( 'Type the desired groupchat - in which you ' 'are a moderator at - to configure') moderators = await XmppMuc.get_role_list( self, room, 'moderator') message_notification = ( 'An unauthorized attempt to establish a ' 'session for groupchat {} has been made by {}' .format(room, jid_bare)) for moderator in moderators: jid_full = XmppMuc.get_full_jid(self, room, moderator) XmppMessage.send( self, jid_full, message_notification, 'chat') elif not alias: del self.sessions[jid_bare] message_response = ( 'The session has been ended, because you are no ' 'longer a moderator at groupchat {}'.format(room)) status_mode = 'away' status_text = 'Session is closed: {}'.format(room) moderators = await XmppMuc.get_role_list( self, room, 'moderator') message_notification = ( 'The session for groupchat {} with former ' 'moderator {} has been terminated.\n' 'A termination message has been sent to {}' .format(room, jid_bare, jid_bare)) for moderator in moderators: jid_full = XmppMuc.get_full_jid(self, room, moderator) XmppMessage.send( self, jid_full, message_notification, 'chat') else: room = self.sessions[jid_bare] else: message_response = ( 'KaikOut must be a moderator at groupchat "{}".' .format(room)) else: message_response = ( 'Invite KaikOut to groupchat "{}" and try again.\n' 'A session will not begin if KaikOut is not present ' 'in groupchat.'.format(room)) else: message_response = ('The text "{}" does not appear to be a ' 'valid groupchat Jabber ID.'.format(room)) if status_mode and status_text: XmppStatus.send_status_message(self, jid_full, status_mode, status_text) if message_response: XmppMessage.send_reply(self, message, message_response) return else: print('AN UNKNOWN MESSAGE') print(message) return db_file = DatabaseToml.instantiate(self, room) # # Support private message via groupchat # # See https://codeberg.org/poezio/slixmpp/issues/3506 # if message_type == 'chat' and message.get_plugin('muc', check=True): # # jid_bare = message['from'].bare # if (jid_bare == jid_full[:jid_full.index('/')]): # # TODO Count and alert of MUC-PM attempts # return command_time_start = time.time() command_lowercase = command.lower() # if not self.settings[room]['enabled']: # if not command_lowercase.startswith('start'): # return response = None match command_lowercase: case _ if command_lowercase.startswith('blacklist +'): value = command[11:].strip() if value: response = XmppCommands.set_filter( self, room, db_file, value, 'jid_blacklist', True) else: response = ('No action has been taken. ' 'Missing Jabber IDs.') case _ if command_lowercase.startswith('blacklist -'): value = command[11:].strip() if value: response = XmppCommands.set_filter( self, room, db_file, value, 'jid_blacklist', False) else: response = ('No action has been taken. ' 'Missing Jabber IDs.') case 'help': command_list = XmppCommands.print_help() response = ('Available command keys:\n' '```\n{}\n```\n' 'Usage: `help `' .format(command_list)) case 'help all': command_list = Documentation.manual('commands.toml', section='all') response = ('Complete list of commands:\n' '```\n{}\n```' .format(command_list)) case _ if command_lowercase.startswith('help'): command = command[5:].lower() command = command.split(' ') if len(command) == 2: command_root = command[0] command_name = command[1] command_list = Documentation.manual( 'commands.toml', section=command_root, command=command_name) if command_list: command_list = ''.join(command_list) response = (command_list) else: response = ('KeyError for {} {}' .format(command_root, command_name)) elif len(command) == 1: command = command[0] command_list = Documentation.manual('commands.toml', command) if command_list: command_list = ' '.join(command_list) response = ('Available command `{}` keys:\n' '```\n{}\n```\n' 'Usage: `help {} `' .format(command, command_list, command)) else: response = 'KeyError for {}'.format(command) else: response = ('Invalid. Enter command key ' 'or command key & name') case 'info': entries = XmppCommands.print_info_list() response = ('Available command options:\n' '```\n{}\n```\n' 'Usage: `info