KaikOut/kaikout/utilities.py
Schimon Jehudah, Adv. 8df43fd327 Add RTBL configuration file;
Add functionality to exclude specified RTBL lists.
2024-07-30 15:07:34 +03:00

306 lines
8.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
from email.utils import parseaddr
import hashlib
from kaikout.database import Toml
from kaikout.log import Logger
#import kaikout.sqlite as sqlite
import os
import sys
import tomli_w
from urllib.parse import urlsplit
try:
    import tomllib
except ImportError:
    # The standard-library tomllib is unavailable on this interpreter;
    # fall back to the tomli package under the same name.
    import tomli as tomllib
logger = Logger(__name__)
class Config:
def get_default_data_directory():
"""
Determine the directory path where data will be stored.
* If $XDG_DATA_HOME is defined, use it;
* else if $HOME exists, use it;
* else if the platform is Windows, use %APPDATA%;
* else use the current directory.
Returns
-------
str
Path to data directory.
"""
if os.environ.get('HOME'):
data_home = os.path.join(os.environ.get('HOME'), '.local', 'share')
return os.path.join(data_home, 'kaikout')
elif sys.platform == 'win32':
data_home = os.environ.get('APPDATA')
if data_home is None:
return os.path.join(
os.path.dirname(__file__) + '/kaikout_data')
else:
return os.path.join(os.path.dirname(__file__) + '/kaikout_data')
def get_default_config_directory():
"""
Determine the directory path where configuration will be stored.
* If $XDG_CONFIG_HOME is defined, use it;
* else if $HOME exists, use it;
* else if the platform is Windows, use %APPDATA%;
* else use the current directory.
Returns
-------
str
Path to configuration directory.
"""
# config_home = xdg.BaseDirectory.xdg_config_home
config_home = os.environ.get('XDG_CONFIG_HOME')
if config_home is None:
if os.environ.get('HOME') is None:
if sys.platform == 'win32':
config_home = os.environ.get('APPDATA')
if config_home is None:
return os.path.abspath('.')
else:
return os.path.abspath('.')
else:
config_home = os.path.join(
os.environ.get('HOME'), '.config'
)
return os.path.join(config_home, 'kaikout')
def get_setting_value(db_file, key):
value = sqlite.get_setting_value(db_file, key)
if value:
value = value[0]
else:
value = Config.get_value('settings', 'Settings', key)
return value
def get_values(filename, key=None):
config_dir = Config.get_default_config_directory()
if not os.path.isdir(config_dir): config_dir = '/usr/share/kaikout/'
if not os.path.isdir(config_dir): config_dir = os.path.dirname(__file__) + "/assets"
config_file = os.path.join(config_dir, filename)
with open(config_file, mode="rb") as f: result = tomllib.load(f)
values = result[key] if key else result
return values
class Documentation:
    """Read command documentation from a TOML manual file."""

    def manual(filename, section=None, command=None):
        """
        Look up entries of a TOML manual file.

        The file is read from the default configuration directory.

        Parameters
        ----------
        filename : str
            Name of the TOML file inside the configuration directory.
        section : str, optional
            Top-level table to look in; the value 'all' selects every
            table.
        command : str, optional
            Key to look up inside ``section``.

        Returns
        -------
        str or list or None
            * ``section == 'all'``: every entry of every table
              concatenated, each followed by a newline;
            * ``section`` and ``command``: the value at
              ``[section][command]``;
            * ``section`` only: list of the keys of that table;
            * neither: list of the top-level keys;
            * None when the requested section or command is missing
              (the KeyError is logged).
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: filename: {}'.format(function_name, filename))
        manual_path = Config.get_default_config_directory() + '/' + filename
        with open(manual_path, mode="rb") as f:
            cmds = tomllib.load(f)

        if section == 'all':
            return ''.join(
                cmds[table][entry] + '\n'
                for table in cmds
                for entry in cmds[table]
            )

        if command and section:
            try:
                return cmds[section][command]
            except KeyError as e:
                logger.error(e)
                return None

        if section:
            try:
                return list(cmds[section])
            except KeyError as e:
                logger.error('KeyError:' + str(e))
                return None

        return list(cmds)
class Log:
    """Write activity logs to CSV and TOML files."""

    def csv(filename, fields):
        """
        Append one row to a CSV log file.

        The file is "<data directory>/logs/<filename>.csv". Missing
        directories are created, and a header row is written first when
        the file does not exist yet.

        Parameters
        ----------
        filename : str
            Base name of the log file, without the ".csv" extension.
        fields : list
            Values of the row. The header written for new files is:
            type, timestamp, alias, body, lang, identifier.

        Returns
        -------
        None.
        """
        logs_dir = os.path.join(Config.get_default_data_directory(), "logs")
        # makedirs also creates missing parents, which a chain of os.mkdir
        # calls does not.
        os.makedirs(logs_dir, exist_ok=True)
        csv_file = os.path.join(logs_dir, "{}.csv".format(filename))
        is_new_file = not os.path.exists(csv_file)
        # newline='' lets the csv writer control line endings itself;
        # without it, extra blank rows appear on platforms that translate
        # '\n' on write.
        with open(csv_file, 'a', newline='') as f:
            writer = csv.writer(f)
            if is_new_file:
                writer.writerow(
                    ['type', 'timestamp', 'alias', 'body', 'lang',
                     'identifier'])
            writer.writerow(fields)

    def toml(self, room, fields, stanza_type):
        """
        Record an activity entry for a room and save the room's settings.

        The entry is appended to ``self.settings[room]`` under the key
        'activity_<stanza_type>', and the whole ``self.settings[room]``
        table is then written to the room's TOML data file.

        Parameters
        ----------
        room : str
            Group chat Jabber ID.
        fields : list
            alias, content (body), identifier, timestamp -- in that order.
        stanza_type : str
            message or presence.

        Returns
        -------
        None.
        """
        alias, body, identifier, timestamp = fields
        data_dir = Toml.get_default_data_directory()
        filename = Toml.get_data_file(data_dir, room)
        entry = {
            'alias': alias,
            'body': body,
            'id': identifier,
            'timestamp': timestamp,
        }
        activity_type = 'activity_' + stanza_type
        room_settings = self.settings[room]
        if activity_type in room_settings:
            activity_list = room_settings[activity_type]
        else:
            activity_list = []
        # Trim in place to the 20 most recent entries before appending, so
        # the stored list holds at most 21 entries.
        del activity_list[:-20]
        activity_list.append(entry)
        room_settings[activity_type] = activity_list
        document = tomli_w.dumps(self.settings[room])
        # TOML documents must be UTF-8; do not depend on the platform's
        # default text encoding.
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(document)
class BlockList:
    """Maintain the blocklist stored in "blocklist.toml"."""

    def get_filename():
        """
        Get pathname of the blocklist file.

        If the data directory or the file does not exist, create it. A new
        file is initialised with an empty 'entries' table.

        Parameters
        ----------
        None.

        Returns
        -------
        filename : str
            Pathname.
        """
        data_dir = Config.get_default_data_directory()
        # makedirs also creates missing parent directories.
        os.makedirs(data_dir, exist_ok=True)
        filename = os.path.join(data_dir, "blocklist.toml")
        if not os.path.exists(filename):
            document = tomli_w.dumps({'entries': {}})
            # The file is read back with tomllib, which expects UTF-8.
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(document)
        return filename

    def load_blocklist(self):
        """
        Load the blocklist file into ``self.blocklist``.

        Returns
        -------
        None.
        """
        filename = BlockList.get_filename()
        with open(filename, 'rb') as f:
            self.blocklist = tomllib.load(f)

    def add_entry_to_blocklist(self, jabber_id, node_id, item_id):
        """
        Add an item to ``self.blocklist`` and update the blocklist file.

        Entries are stored as entries -> jabber_id -> node_id -> list of
        item IDs; missing levels are created. The item ID is appended
        without checking whether it is already listed.

        Parameters
        ----------
        jabber_id : str
            Jabber ID.
        node_id : str
            Node name.
        item_id : str
            Item ID.

        Returns
        -------
        None.
        """
        nodes = self.blocklist['entries'].setdefault(jabber_id, {})
        nodes.setdefault(node_id, []).append(item_id)
        document = tomli_w.dumps(self.blocklist)
        filename = BlockList.get_filename()
        # The file is read back with tomllib, which expects UTF-8.
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(document)
class Url:
    """Helpers for handling URIs."""

    def check_xmpp_uri(uri):
        """
        Check validity of XMPP URI.

        The path component of the URI is accepted when parsing it as an
        email-style address yields exactly the same string.

        Parameters
        ----------
        uri : str
            URI.

        Returns
        -------
        str or bool
            The path component of the URI (the JID) when it passes the
            check, otherwise False.
        """
        candidate = urlsplit(uri).path
        _, address = parseaddr(candidate)
        return candidate if address == candidate else False
class String:
    """String helpers."""

    def md5_hash(url):
        """
        Compute the MD5 checksum of a URL string.

        Parameters
        ----------
        url : str
            URL.

        Returns
        -------
        str
            Hexadecimal MD5 digest of the encoded URL.
        """
        return hashlib.md5(url.encode()).hexdigest()