#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import csv
from email.utils import parseaddr
import hashlib
from kaikout.database import DatabaseToml
from kaikout.log import Logger
#import kaikout.sqlite as sqlite
import os
import sys
import tomli_w
from urllib.parse import urlsplit

try:
    import tomllib
except ImportError:
    import tomli as tomllib

logger = Logger(__name__)


class Config:

    def get_default_data_directory():
        """
        Determine the directory path where data will be stored.

        * If $XDG_DATA_HOME is defined, use it;
        * else if $HOME exists, use it;
        * else if the platform is Windows, use %APPDATA%;
        * else use the current directory.

        Returns
        -------
        str
            Path to data directory.
        """
        directory_data_home = os.environ.get('XDG_DATA_HOME')
        if directory_data_home:
            return os.path.join(directory_data_home, 'kaikout')
        directory_home = os.environ.get('HOME')
        if directory_home:
            data_home = os.path.join(directory_home, '.local', 'share')
            return os.path.join(data_home, 'kaikout')
        if sys.platform == 'win32':
            data_home = os.environ.get('APPDATA')
            if data_home:
                return os.path.join(data_home, 'kaikout')
        # Fall back to a directory relative to the current working directory.
        return 'kaikout_data'

    def get_default_config_directory():
        """
        Determine the directory path where configuration will be stored.

        * If $XDG_CONFIG_HOME is defined, use it;
        * else if $HOME exists, use it;
        * else if the platform is Windows, use %APPDATA%;
        * else use the current directory.

        Returns
        -------
        str
            Path to configuration directory.
        """
        # directory_config_home = xdg.BaseDirectory.xdg_config_home
        directory_config_home = os.environ.get('XDG_CONFIG_HOME')
        if directory_config_home is None:
            directory_home = os.environ.get('HOME')
            if directory_home is None:
                if sys.platform == 'win32':
                    directory_config_home = os.environ.get('APPDATA')
                    if directory_config_home is None:
                        return 'kaikout_config'
                else:
                    return 'kaikout_config'
            else:
                directory_config_home = os.path.join(directory_home, '.config')
        return os.path.join(directory_config_home, 'kaikout')


class Documentation:

    def manual(filename, section=None, command=None):
        """
        Read the command manual from a TOML file and return the requested
        section, command, or list of available entries.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: filename: {}'.format(function_name, filename))
        config_dir = Config.get_default_config_directory()
        with open(os.path.join(config_dir, filename), mode="rb") as f:
            cmds = tomllib.load(f)
        if section == 'all':
            cmd_list = ''
            for cmd in cmds:
                for i in cmds[cmd]:
                    cmd_list += cmds[cmd][i] + '\n'
        elif command and section:
            try:
                cmd_list = cmds[section][command]
            except KeyError as e:
                logger.error(e)
                cmd_list = None
        elif section:
            try:
                cmd_list = []
                for cmd in cmds[section]:
                    cmd_list.extend([cmd])
            except KeyError as e:
                logger.error('KeyError: ' + str(e))
                cmd_list = None
        else:
            cmd_list = []
            for cmd in cmds:
                cmd_list.extend([cmd])
        return cmd_list


class Log:

    def jid_exist(filename, fields):
        """
        Check whether a Jabber ID exists in a log file.

        Parameters
        ----------
        filename : str
            Filename.
        fields : list
            jid, alias, timestamp.

        Returns
        -------
        bool
            True if the Jabber ID was found, otherwise False.
        """
        data_dir = Config.get_default_data_directory()
        if not os.path.isdir(data_dir):
            return False
        data_dir_logs = os.path.join(data_dir, 'logs')
        if not os.path.isdir(data_dir_logs):
            return False
        csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
        if not os.path.exists(csv_file):
            return False
        with open(csv_file, 'r') as f:
            reader = csv.reader(f)
            for line in reader:
                if line[0] == fields[0]:
                    return True
        return False

    def alias_jid_exist(filename, fields):
        """
        Check whether an alias and Jabber ID pair exists in a log file.

        Parameters
        ----------
        filename : str
            Filename.
        fields : list
            jid, alias, timestamp.

        Returns
        -------
        bool
            True if the pair was found, otherwise False.
        """
        data_dir = Config.get_default_data_directory()
        if not os.path.isdir(data_dir):
            return False
        data_dir_logs = os.path.join(data_dir, 'logs')
        if not os.path.isdir(data_dir_logs):
            return False
        csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
        if not os.path.exists(csv_file):
            return False
        with open(csv_file, 'r') as f:
            reader = csv.reader(f)
            for line in reader:
                if line[0] == fields[0] and line[1] == fields[1]:
                    return True
        return False

    def csv_jid(filename, fields):
        """
        Log Jabber ID to CSV file.

        Parameters
        ----------
        filename : str
            Filename.
        fields : list
            jid, alias, timestamp.

        Returns
        -------
        None.
        """
        data_dir = Config.get_default_data_directory()
        if not os.path.isdir(data_dir):
            os.mkdir(data_dir)
        data_dir_logs = os.path.join(data_dir, 'logs')
        if not os.path.isdir(data_dir_logs):
            os.mkdir(data_dir_logs)
        csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
        if not os.path.exists(csv_file):
            columns = ['jid', 'alias', 'timestamp']
            with open(csv_file, 'w') as f:
                writer = csv.writer(f)
                writer.writerow(columns)
        with open(csv_file, 'a') as f:
            writer = csv.writer(f)
            writer.writerow(fields)

    def csv(filename, fields):
        """
        Log message or presence to CSV file.

        Parameters
        ----------
        filename : str
            Filename.
        fields : list
            type, timestamp, alias, body, lang, and identifier.

        Returns
        -------
        None.
        """
        # The method name shadows the csv module only in the class namespace;
        # inside this function, "csv" still refers to the imported module.
        data_dir = Config.get_default_data_directory()
        if not os.path.isdir(data_dir):
            os.mkdir(data_dir)
        data_dir_logs = os.path.join(data_dir, 'logs')
        if not os.path.isdir(data_dir_logs):
            os.mkdir(data_dir_logs)
        csv_file = os.path.join(data_dir_logs, f'{filename}.csv')
        if not os.path.exists(csv_file):
            columns = ['type', 'timestamp', 'alias', 'body', 'lang',
                       'identifier']
            with open(csv_file, 'w') as f:
                writer = csv.writer(f)
                writer.writerow(columns)
        with open(csv_file, 'a') as f:
            writer = csv.writer(f)
            writer.writerow(fields)

    def toml(self, room, fields, stanza_type):
        """
        Log message to TOML file.

        Parameters
        ----------
        room : str
            Group chat Jabber ID.
        fields : list
            alias, content, identifier, timestamp.
        stanza_type : str
            message or presence.

        Returns
        -------
        None.
        """
        alias, content, identifier, timestamp = fields
        data_dir = DatabaseToml.get_default_data_directory()
        filename = DatabaseToml.get_data_file(data_dir, room)
        # filename = room + '.toml'
        entry = {}
        entry['alias'] = alias
        entry['body'] = content
        entry['id'] = identifier
        entry['timestamp'] = timestamp
        activity_type = 'activity_' + stanza_type
        message_activity_list = self.settings[room][activity_type] if activity_type in self.settings[room] else []
        # Keep only the most recent entries.
        while len(message_activity_list) > 20:
            message_activity_list.pop(0)
        message_activity_list.append(entry)
        self.settings[room][activity_type] = message_activity_list # NOTE This directive might not be needed
        data = self.settings[room]
        content = tomli_w.dumps(data)
        with open(filename, 'w') as f:
            f.write(content)


class BlockList:

    def get_filename():
        """
        Get pathname of filename. If filename does not exist, create it.

        Parameters
        ----------
        None.

        Returns
        -------
        filename : str
            Pathname.
        """
        data_dir = Config.get_default_data_directory()
        if not os.path.isdir(data_dir):
            os.mkdir(data_dir)
        filename = os.path.join(data_dir, 'blocklist.toml')
        if not os.path.exists(filename):
            data = {'entries': {}}
            content = tomli_w.dumps(data)
            with open(filename, 'w') as f:
                f.write(content)
        return filename

    def add_entry_to_blocklist(self, jabber_id, node_id, item_id):
        """
        Update blocklist file.

        Parameters
        ----------
        jabber_id : str
            Jabber ID.
        node_id : str
            Node name.
        item_id : str
            Item ID.

        Returns
        -------
        None.
        """
        if jabber_id not in self.blocklist['entries']:
            self.blocklist['entries'][jabber_id] = {}
        if node_id not in self.blocklist['entries'][jabber_id]:
            self.blocklist['entries'][jabber_id][node_id] = []
        self.blocklist['entries'][jabber_id][node_id].append(item_id)
        data = self.blocklist
        content = tomli_w.dumps(data)
        filename = BlockList.get_filename()
        with open(filename, 'w') as f:
            f.write(content)


class String:

    def md5_hash(url):
        """
        Hash URL string to MD5 checksum.

        Parameters
        ----------
        url : str
            URL.

        Returns
        -------
        url_digest : str
            Hashed URL as an MD5 checksum.
        """
        url_encoded = url.encode()
        url_hashed = hashlib.md5(url_encoded)
        url_digest = url_hashed.hexdigest()
        return url_digest


class Toml:

    def open_file(filename: str) -> dict:
        with open(filename, mode="rb") as fn:
            data = tomllib.load(fn)
        return data

    def save_file(filename: str, data: dict) -> None:
        with open(filename, 'w') as fn:
            data_as_string = tomli_w.dumps(data)
            fn.write(data_as_string)


class Url:

    def check_xmpp_uri(uri):
        """
        Check validity of XMPP URI.

        Parameters
        ----------
        uri : str
            URI.

        Returns
        -------
        jid : str or bool
            JID, or False if the URI does not carry a valid address.
        """
        jid = urlsplit(uri).path
        if parseaddr(jid)[1] != jid:
            jid = False
        return jid