Compare commits

...

2 commits

Author SHA1 Message Date
Schimon Jehudah, Adv.
a02cc5a7a4 Check self presence upon an invitation to a group chat. 2024-11-19 11:11:49 +02:00
Schimon Jehudah, Adv.
39b0cffe66 Greet new occupants (Thank you to polymath). 2024-11-19 11:09:50 +02:00
3 changed files with 130 additions and 17 deletions

View file

@ -465,9 +465,10 @@ class Toml:
data_dir = Toml.get_default_data_directory() data_dir = Toml.get_default_data_directory()
if not os.path.isdir(data_dir): if not os.path.isdir(data_dir):
os.mkdir(data_dir) os.mkdir(data_dir)
if not os.path.isdir(data_dir + "/toml"): data_dir_toml = os.path.join(data_dir, 'toml')
os.mkdir(data_dir + "/toml") if not os.path.isdir(data_dir_toml):
filename = os.path.join(data_dir, "toml", r"{}.toml".format(room)) os.mkdir(data_dir_toml)
filename = os.path.join(data_dir_toml, f'{room}.toml')
if not os.path.exists(filename): if not os.path.exists(filename):
Toml.create_settings_file(self, filename) Toml.create_settings_file(self, filename)
Toml.load_jid_settings(self, room, filename) Toml.load_jid_settings(self, room, filename)
@ -482,13 +483,13 @@ class Toml:
data_home = os.environ.get('APPDATA') data_home = os.environ.get('APPDATA')
if data_home is None: if data_home is None:
return os.path.join( return os.path.join(
os.path.dirname(__file__) + '/kaikout_data') os.path.dirname(__file__), 'kaikout_data')
else: else:
return os.path.join(os.path.dirname(__file__) + '/kaikout_data') return os.path.join(os.path.dirname(__file__), 'kaikout_data')
def get_data_file(data_dir, room): def get_data_file(data_dir, room):
toml_file = os.path.join(data_dir, "toml", r"{}.toml".format(room)) toml_file = os.path.join(data_dir, 'toml', f'{room}.toml')
return toml_file return toml_file

View file

@ -44,9 +44,9 @@ class Config:
data_home = os.environ.get('APPDATA') data_home = os.environ.get('APPDATA')
if data_home is None: if data_home is None:
return os.path.join( return os.path.join(
os.path.dirname(__file__) + '/kaikout_data') os.path.dirname(__file__), 'kaikout_data')
else: else:
return os.path.join(os.path.dirname(__file__) + '/kaikout_data') return os.path.join(os.path.dirname(__file__), 'kaikout_data')
def get_default_config_directory(): def get_default_config_directory():
@ -91,8 +91,8 @@ class Config:
def get_values(filename, key=None): def get_values(filename, key=None):
config_dir = Config.get_default_config_directory() config_dir = Config.get_default_config_directory()
if not os.path.isdir(config_dir): config_dir = '/usr/share/kaikout/' if not os.path.isdir(config_dir): config_dir = os.path.join('usr', 'share', 'kaikout')
if not os.path.isdir(config_dir): config_dir = os.path.dirname(__file__) + "/assets" if not os.path.isdir(config_dir): config_dir = os.path.join(os.path.dirname(__file__), 'assets')
config_file = os.path.join(config_dir, filename) config_file = os.path.join(config_dir, filename)
with open(config_file, mode="rb") as f: result = tomllib.load(f) with open(config_file, mode="rb") as f: result = tomllib.load(f)
values = result[key] if key else result values = result[key] if key else result
@ -136,14 +136,73 @@ class Documentation:
class Log: class Log:
def csv(filename, fields): def jid_exist(filename, fields):
""" """
Log message to CSV file. Ceck whether Alias and Jabber ID exist.
Parameters Parameters
---------- ----------
message : slixmpp.stanza.message.Message filename : str
Message object. Filename.
fields : list
jid, alias, timestamp.
Returns
-------
None.
"""
data_dir = Config.get_default_data_directory()
if not os.path.isdir(data_dir): return False
data_dir_logs = os.path.join(data_dir, 'logs')
if not os.path.isdir(data_dir_logs): return False
csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
if not os.path.exists(csv_file): return False
with open(csv_file, 'r') as f:
reader = csv.reader(f)
for line in reader:
if line[0] == fields[0]:
return True
def alias_jid_exist(filename, fields):
"""
Ceck whether Alias and Jabber ID exist.
Parameters
----------
filename : str
Filename.
fields : list
jid, alias, timestamp.
Returns
-------
None.
"""
data_dir = Config.get_default_data_directory()
if not os.path.isdir(data_dir): return False
data_dir_logs = os.path.join(data_dir, 'logs')
if not os.path.isdir(data_dir_logs): return False
csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
if not os.path.exists(csv_file): return False
with open(csv_file, 'r') as f:
reader = csv.reader(f)
for line in reader:
if line[0] == fields[0] and line[1] == fields[1]:
return True
def csv_jid(filename, fields):
"""
Log Jabber ID to CSV file.
Parameters
----------
filename : str
Filename.
fields : list
jid, alias, timestamp.
Returns Returns
------- -------
@ -151,11 +210,42 @@ class Log:
""" """
data_dir = Config.get_default_data_directory() data_dir = Config.get_default_data_directory()
if not os.path.isdir(data_dir): os.mkdir(data_dir) if not os.path.isdir(data_dir): os.mkdir(data_dir)
if not os.path.isdir(data_dir + "/logs"): os.mkdir(data_dir + "/logs") data_dir_logs = os.path.join(data_dir, 'logs')
csv_file = os.path.join(data_dir, "logs", r"{}.csv".format(filename)) if not os.path.isdir(data_dir_logs): os.mkdir(data_dir_logs)
csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
if not os.path.exists(csv_file):
columns = ['jid', 'alias', 'timestamp']
with open(csv_file, 'w') as f:
writer = csv.writer(f)
writer.writerow(columns)
with open(csv_file, 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
def csv(filename, fields):
"""
Log message or presence to CSV file.
Parameters
----------
filename : str
Filename.
fields : list
type, timestamp, alias, body, lang, and identifier.
Returns
-------
None.
"""
data_dir = Config.get_default_data_directory()
if not os.path.isdir(data_dir): os.mkdir(data_dir)
data_dir_logs = os.path.join(data_dir, 'logs')
if not os.path.isdir(data_dir_logs): os.mkdir(data_dir_logs)
csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
if not os.path.exists(csv_file): if not os.path.exists(csv_file):
columns = ['type', 'timestamp', 'alias', 'body', 'lang', 'identifier'] columns = ['type', 'timestamp', 'alias', 'body', 'lang', 'identifier']
with open(csv_file, 'a') as f: with open(csv_file, 'w') as f:
writer = csv.writer(f) writer = csv.writer(f)
writer.writerow(columns) writer.writerow(columns)
with open(csv_file, 'a') as f: with open(csv_file, 'a') as f:

View file

@ -148,6 +148,9 @@ class XmppClient(slixmpp.ClientXMPP):
.format(self.alias, self.boundjid.bare)) .format(self.alias, self.boundjid.bare))
XmppMessage.send(self, room, message_body, 'groupchat') XmppMessage.send(self, room, message_body, 'groupchat')
XmppStatus.send_status_message(self, room) XmppStatus.send_status_message(self, room)
self.add_event_handler("muc::%s::got_online" % room, self.on_muc_got_online)
self.add_event_handler("muc::%s::presence" % room, self.on_muc_presence)
self.add_event_handler("muc::%s::self-presence" % room, self.on_muc_self_presence)
async def on_groupchat_direct_invite(self, message): async def on_groupchat_direct_invite(self, message):
@ -164,6 +167,9 @@ class XmppClient(slixmpp.ClientXMPP):
.format(self.boundjid.bare)) .format(self.boundjid.bare))
XmppMessage.send(self, room, message_body, 'groupchat') XmppMessage.send(self, room, message_body, 'groupchat')
XmppStatus.send_status_message(self, room) XmppStatus.send_status_message(self, room)
self.add_event_handler("muc::%s::got_online" % room, self.on_muc_got_online)
self.add_event_handler("muc::%s::presence" % room, self.on_muc_presence)
self.add_event_handler("muc::%s::self-presence" % room, self.on_muc_self_presence)
async def on_message(self, message): async def on_message(self, message):
@ -211,6 +217,15 @@ class XmppClient(slixmpp.ClientXMPP):
filename = datetime.today().strftime('%Y-%m-%d') + '_' + room filename = datetime.today().strftime('%Y-%m-%d') + '_' + room
Log.csv(filename, fields) Log.csv(filename, fields)
jid_bare = presence['muc']['jid'].bare jid_bare = presence['muc']['jid'].bare
fields = [jid_bare, alias, timestamp_iso]
if jid_bare:
if not Log.jid_exist(room, fields):
message_body = (f'Welcome, {alias}! We hope that you would '
'have a good time at this group chat.')
XmppMessage.send(self, room, message_body, 'groupchat')
Log.csv_jid(room, fields)
elif not Log.alias_jid_exist(room, fields):
Log.csv_jid(room, fields)
if (XmppMuc.is_moderator(self, room, self.alias) and if (XmppMuc.is_moderator(self, room, self.alias) and
self.settings[room]['enabled'] and self.settings[room]['enabled'] and
jid_bare and jid_bare and
@ -263,6 +278,13 @@ class XmppClient(slixmpp.ClientXMPP):
alias = presence['muc']['nick'] alias = presence['muc']['nick']
room = presence['muc']['room'] room = presence['muc']['room']
if actor and alias == self.alias: XmppStatus.send_status_message(self, room) if actor and alias == self.alias: XmppStatus.send_status_message(self, room)
# TODO Check whether group chat is not anonymous
if XmppMuc.is_moderator(self, room, self.alias):
timestamp_iso = datetime.now().isoformat()
for alias in XmppMuc.get_roster(self, room):
jid_bare = XmppMuc.get_full_jid(self, room, alias).split('/')[0]
fields = [jid_bare, alias, timestamp_iso]
if not Log.alias_jid_exist(room, fields): Log.csv_jid(room, fields)
def on_reactions(self, message): def on_reactions(self, message):