#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Shared helpers: configuration lookup, database instantiation, date and
time formatting, command documentation, HTML scraping and URL handling.

TODO

1) Function scan at "for entry in entries"
   Suppress directly calling function "add_entry" (accept db_file)
   Pass a list of valid entries to a new function "add_entries"
   (accept db_file) which would call function "add_entry" (accept cur).
   * accelerate adding of large set of entries at once.
   * prevent (or mitigate) halt of consequent actions.
   * reduce I/O.

2) Call sqlite function from function statistics.
   Returning a list of values doesn't seem to be a good practice.

3) Special statistics for operator:
   * Size of database(s);
   * Amount of JIDs subscribed;
   * Amount of feeds of all JIDs;
   * Amount of entries of all JIDs.

4) Consider to append text to remind to share presence
   '✒️ Share online status to receive updates'

5) Request for subscription
   if (await XmppUtilities.get_chat_type(self, jid_bare) == 'chat' and
       not self.client_roster[jid_bare]['to']):
       XmppPresence.subscription(self, jid_bare, 'subscribe')
       await XmppRoster.add(self, jid_bare)
       status_message = '✒️ Share online status to receive updates'
       XmppPresence.send(self, jid_bare, status_message)
       message_subject = 'RSS News Bot'
       message_body = 'Share online status to receive updates.'
       XmppMessage.send_headline(self, jid_bare, message_subject,
                                 message_body, 'chat')

"""

from datetime import datetime
from dateutil.parser import parse
from email.utils import parseaddr, parsedate, parsedate_to_datetime
import hashlib
from lxml import etree, html
import os
import random
import slixfeed.fetch as fetch
from slixfeed.log import Logger
import slixfeed.sqlite as sqlite
import sys
from urllib.parse import (
    parse_qs,
    urlencode,
    urljoin,
    # urlparse,
    urlsplit,
    urlunsplit
    )

try:
    import tomllib
except ImportError:
    # Fallback for interpreters that do not ship tomllib.
    import tomli as tomllib

logger = Logger(__name__)


class Config:
    """Locate data/configuration directories and read setting values."""

    @staticmethod
    def get_default_data_directory():
        """
        Determine the directory path where data will be stored.

        * If $HOME is set, use $HOME/.local/share;
        * else if the platform is Windows and %APPDATA% is set, use it;
        * else use directory "kaikout_data" next to this module.

        Returns
        -------
        str
            Path to data directory.
        """
        home = os.environ.get('HOME')
        if home:
            data_home = os.path.join(home, '.local', 'share')
            return os.path.join(data_home, 'kaikout')
        if sys.platform == 'win32':
            data_home = os.environ.get('APPDATA')
            if data_home:
                # This case used to fall through and return None.
                return os.path.join(data_home, 'kaikout')
        return os.path.join(os.path.dirname(__file__) + '/kaikout_data')

    @staticmethod
    def get_default_config_directory():
        """
        Determine the directory path where configuration will be stored.

        * If $XDG_CONFIG_HOME is defined, use it;
        * else if $HOME exists, use it;
        * else if the platform is Windows, use %APPDATA%;
        * else use the current directory.

        Returns
        -------
        str
            Path to configuration directory.
        """
        # config_home = xdg.BaseDirectory.xdg_config_home
        config_home = os.environ.get('XDG_CONFIG_HOME')
        if config_home is None:
            home = os.environ.get('HOME')
            if home is not None:
                config_home = os.path.join(home, '.config')
            elif sys.platform == 'win32':
                config_home = os.environ.get('APPDATA')
            if config_home is None:
                return os.path.abspath('.')
        return os.path.join(config_home, 'kaikout')

    @staticmethod
    def get_setting_value(db_file, key):
        """
        Read a setting from the database, falling back to configuration.

        Parameters
        ----------
        db_file : str
            Path to database file.
        key : str
            Setting name.

        Returns
        -------
        value
            First element of the database result when it is truthy,
            otherwise the value of the configuration fallback.
        """
        value = sqlite.get_setting_value(db_file, key)
        if value:
            value = value[0]
        else:
            # FIXME(review): class Config defines get_values only; unless
            # get_value is attached elsewhere this raises AttributeError.
            value = Config.get_value('settings', 'Settings', key)
        return value

    @staticmethod
    def get_values(filename, key=None):
        """
        Load a TOML configuration file.

        The file is searched in the default configuration directory,
        then in /usr/share/slixfeed/, then in directory "assets" next to
        this module.

        Parameters
        ----------
        filename : str
            Name of the TOML file.
        key : str, optional
            Top-level key to return. The default is None (whole file).

        Returns
        -------
        values
            The whole parsed document, or the value of the given key.
        """
        config_dir = Config.get_default_config_directory()
        if not os.path.isdir(config_dir):
            config_dir = '/usr/share/slixfeed/'
        if not os.path.isdir(config_dir):
            config_dir = os.path.dirname(__file__) + "/assets"
        config_file = os.path.join(config_dir, filename)
        with open(config_file, mode="rb") as defaults:
            result = tomllib.load(defaults)
        return result[key] if key else result


class Database:
    """Database file handling."""

    @staticmethod
    def instantiate(dir_data, jid_bare):
        """
        Instantiate action on database and return its filename location.

        Parameters
        ----------
        dir_data : str
            Directory.
        jid_bare : str
            Jabber ID.

        Returns
        -------
        db_file
            Filename.
        """
        db_file = os.path.join(dir_data, 'sqlite', f'{jid_bare}.db')
        sqlite.create_tables(db_file)
        return db_file


class DateAndTime:
    """Date and time formatting and conversion."""

    # https://feedparser.readthedocs.io/en/latest/date-parsing.html

    @staticmethod
    def now():
        """
        ISO 8601 Timestamp.

        Returns
        -------
        date : str
            ISO 8601 Timestamp.
        """
        return datetime.now().isoformat()

    @staticmethod
    def convert_struct_time_to_iso8601(struct_time):
        """
        Convert a time tuple into ISO 8601.

        Parameters
        ----------
        struct_time : sequence
            Time tuple; its first six items are passed to datetime().

        Returns
        -------
        date : str
            ISO 8601 Timestamp.
        """
        return datetime(*struct_time[:6]).isoformat()

    @staticmethod
    def convert_seconds_to_yyyy_mm_dd(seconds_time):
        """
        Convert seconds since the epoch into a YYYY-MM-DD date.

        Parameters
        ----------
        seconds_time : float
            Seconds, as accepted by datetime.fromtimestamp.

        Returns
        -------
        formatted_date : str
            %Y-%m-%d date.
        """
        return datetime.fromtimestamp(seconds_time).strftime('%Y-%m-%d')

    @staticmethod
    def current_date():
        """
        Print MM DD, YYYY (Weekday Time) timestamp.

        Returns
        -------
        date : str
            MM DD, YYYY (Weekday Time) timestamp.
        """
        # %H:%M:%S is spelled out because %T is not portable.
        return datetime.now().strftime("%B %d, %Y (%A %H:%M:%S)")

    @staticmethod
    def current_time():
        """
        Print HH:MM:SS timestamp.

        Returns
        -------
        date : str
            HH:MM:SS timestamp.
        """
        return datetime.now().strftime("%H:%M:%S")

    @staticmethod
    def timestamp():
        """
        Print time stamp to be used in filename.

        Returns
        -------
        formatted_time : str
            %Y%m%d-%H%M%S timestamp.
        """
        return datetime.now().strftime("%Y%m%d-%H%M%S")

    @staticmethod
    def validate(date):
        """
        Validate date format.

        Parameters
        ----------
        date : str
            Timestamp.

        Returns
        -------
        date : str
            The given timestamp if it can be parsed, otherwise the
            current ISO 8601 timestamp.
        """
        try:
            parse(date)
        except (TypeError, ValueError, OverflowError):
            date = DateAndTime.now()
        return date

    @staticmethod
    def rfc2822_to_iso8601(date):
        """
        Convert RFC 2822 into ISO 8601.

        Parameters
        ----------
        date : str
            RFC 2822 Timestamp.

        Returns
        -------
        date : str
            ISO 8601 Timestamp. The input is returned unchanged when it
            is not recognized as an RFC 2822 date.
        """
        if parsedate(date):
            try:
                date = parsedate_to_datetime(date).isoformat()
            except (TypeError, ValueError):
                date = DateAndTime.now()
        return date


class Documentation:
    """Access to the command manual (file commands.toml)."""

    @staticmethod
    def manual(config_dir, section=None, command=None):
        """
        Look up sections and commands of the manual.

        Parameters
        ----------
        config_dir : str
            Directory that contains file commands.toml.
        section : str, optional
            Section name, or 'all'. The default is None.
        command : str, optional
            Command name within the section. The default is None.

        Returns
        -------
        cmd_list : str or list or None
            * section 'all': text of every command, one per line;
            * section and command: the entry of that command;
            * section only: list of command names of that section;
            * neither: list of section names;
            * None if the section or command is not found.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: filename: {}'.format(function_name, config_dir))
        filename = os.path.join(config_dir, 'commands.toml')
        with open(filename, mode="rb") as commands:
            cmds = tomllib.load(commands)
        if section == 'all':
            cmd_list = ''.join(
                cmds[cmd][i] + '\n' for cmd in cmds for i in cmds[cmd])
        elif command and section:
            try:
                cmd_list = cmds[section][command]
            except KeyError as e:
                logger.error(e)
                cmd_list = None
        elif section:
            try:
                cmd_list = list(cmds[section])
            except KeyError as e:
                logger.error('KeyError:' + str(e))
                cmd_list = None
        else:
            cmd_list = list(cmds)
        return cmd_list


class Html:
    """HTML scraping."""

    @staticmethod
    async def extract_image_from_html(url):
        """
        Fetch a document and pick an image URL from it.

        Parameters
        ----------
        url : str
            URL of the HTML document.

        Returns
        -------
        str or None
            See the NOTE(review) inside: the end of this function was
            not available when this file was reformatted.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: url: {}'.format(function_name, url))
        # FIXME(review): settings_network is neither a parameter nor a
        # module-level name in the visible code; this raises NameError
        # unless it is defined elsewhere.
        result = await fetch.http(settings_network, url)
        if not result['error']:
            data = result['content']
            tree = html.fromstring(data)
            # TODO Exclude banners, class="share" links etc.
            # NOTE(review): the reviewed copy of this file is truncated
            # inside this XPath expression, right after the "cc-by-sa"
            # filter. Restore any further filters and the original end
            # of the function from version control. Returning the first
            # match is an assumption - confirm.
            images = tree.xpath(
                '//img[not('
                'contains(@src, "avatar") or '
                'contains(@src, "cc-by-sa")'
                ')]/@src')
            if len(images):
                return images[0]


# NOTE(review): text between the XPath expression above and the notes
# below was lost in the reviewed copy of this file.
#
# Page referenced in connection with an inline "image/png;base64" image
# (payload elided):
# https://www.lilithsaintcrow.com/2024/02/love-anonymous/
#
# TODO
#
# 1) ActivityPub URL revealer activitypub_to_http.
#
# 2) SQLite preference "instance" for preferred instances.


class Url:
    """URL parsing, completion and rewriting."""

    # NOTE
    # hostname and protocol are listed as one in file proxies.toml.
    # Perhaps a better practice would be to have them separated.

    # NOTE
    # File proxies.toml will remain as it is, in order to be
    # coordinated with the dataset of project LibRedirect, even
    # though rule-sets might be adopted (see Privacy Redirect).

    @staticmethod
    def get_hostname(url):
        """
        Extract the hostname of a URL, without a leading "www.".

        Parameters
        ----------
        url : str
            A URL.

        Returns
        -------
        hostname : str
            Network location of the URL.
        """
        hostname = urlsplit(url).netloc
        if hostname.startswith('www.'):
            # Strip the prefix only; str.replace would also remove any
            # later occurrence of "www." inside the hostname.
            hostname = hostname[len('www.'):]
        return hostname

    @staticmethod
    async def replace_hostname(configuration_directory, proxies,
                               settings_network, url, url_type):
        """
        Replace hostname.

        Parameters
        ----------
        configuration_directory : str
            Directory of files proxies.toml and proxies_obsolete.toml.
        proxies : dict
            Proxy definitions by name; each has keys 'hostname', 'type'
            and 'clearnet' (a list of proxy URLs).
        settings_network : object
            Passed through to fetch.http.
        url : str
            A URL.
        url_type : str
            A "feed" or a "link".

        Returns
        -------
        url_new : str or None
            A processed URL; the given URL if a matching proxy has no
            URLs left; None if no proxy matches.
        """
        url_new = None
        parted_url = urlsplit(url)
        # protocol = parted_url.scheme
        hostname = Url.get_hostname(url)
        pathname = parted_url.path
        queries = parted_url.query
        fragment = parted_url.fragment
        proxy_type = 'clearnet'
        for proxy_name in proxies:
            proxy = proxies[proxy_name]
            if hostname in proxy['hostname'] and url_type in proxy['type']:
                while not url_new:
                    proxy_list = proxy[proxy_type]
                    if len(proxy_list):
                        # proxy_list = proxies[proxy_name][proxy_type]
                        proxy_url = random.choice(proxy_list)
                        parted_proxy_url = urlsplit(proxy_url)
                        protocol_new = parted_proxy_url.scheme
                        hostname_new = parted_proxy_url.netloc
                        url_new = urlunsplit([protocol_new, hostname_new,
                                              pathname, queries, fragment])
                        logger.debug('proxy: {} url: {}'
                                     .format(proxy_url, url_new))
                        response = await fetch.http(settings_network, url_new)
                        if (response and
                            response['status_code'] == 200 and
                            # response.reason == 'OK' and
                            url_new.startswith(proxy_url)):
                            break
                        else:
                            proxies_obsolete_file = os.path.join(
                                configuration_directory,
                                'proxies_obsolete.toml')
                            proxies_file = os.path.join(
                                configuration_directory, 'proxies.toml')
                            # NOTE(review): Toml is not defined or imported
                            # in the visible part of this module - confirm
                            # it is in scope.
                            proxies_obsolete = Toml.open_file(
                                proxies_obsolete_file)
                            proxies_obsolete['proxies'][proxy_name][proxy_type].append(proxy_url)
                            Toml.save_file(proxies_obsolete_file,
                                           proxies_obsolete)
                            # TODO self.proxies might need to be changed, so
                            # self probably should be passed.
                            # Remove from the list this loop reads;
                            # "proxies" is indexed by proxy name here, so it
                            # has no top-level key 'proxies'.
                            proxy_list.remove(proxy_url)
                            # NOTE(review): confirm that proxies.toml is
                            # expected to hold this mapping as is.
                            Toml.save_file(proxies_file, proxies)
                            url_new = None
                    else:
                        logger.warning('No proxy URLs for {}. '
                                       'Please update proxies.toml'
                                       .format(proxy_name))
                        url_new = url
                        break
        return url_new

    @staticmethod
    def remove_tracking_parameters(trackers, url):
        """
        Remove queries with tracking parameters.

        Parameters
        ----------
        trackers : list
            A list of queries.
        url : str
            A URL.

        Returns
        -------
        url : str
            A processed URL. Base64 data URIs are returned unchanged.
        """
        if url.startswith('data:') and ';base64,' in url:
            return url
        parted_url = urlsplit(url)
        queries = parse_qs(parted_url.query)
        for tracker in trackers:
            if tracker in queries:
                del queries[tracker]
        queries_new = urlencode(queries, doseq=True)
        return urlunsplit([parted_url.scheme, parted_url.netloc,
                           parted_url.path, queries_new,
                           parted_url.fragment])

    @staticmethod
    def feed_to_http(url):
        """
        Replace scheme FEED by HTTP.

        Parameters
        ----------
        url : str
            URL.

        Returns
        -------
        new_url : str
            URL.
        """
        par_url = urlsplit(url)
        new_url = urlunsplit(['http', par_url.netloc, par_url.path,
                              par_url.query, par_url.fragment])
        return new_url

    @staticmethod
    def check_xmpp_uri(uri):
        """
        Check validity of XMPP URI.

        Parameters
        ----------
        uri : str
            URI.

        Returns
        -------
        jid : str or bool
            JID, or False if the path of the URI is not a plain address.
        """
        jid = urlsplit(uri).path
        if parseaddr(jid)[1] != jid:
            jid = False
        return jid

    # NOTE Read the documentation
    # https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urljoin
    @staticmethod
    def complete_url(source, link):
        """
        Check if URL is pathname and complete it into URL.

        Parameters
        ----------
        source : str
            Feed URL.
        link : str
            Link URL or pathname.

        Returns
        -------
        str
            URL.
        """
        if link.startswith('data:') and ';base64,' in link:
            return link
        if link.startswith('www.'):
            return 'http://' + link
        parted_link = urlsplit(link)
        parted_feed = urlsplit(source)
        if parted_link.scheme == 'magnet' and parted_link.query:
            return link
        if parted_link.scheme and parted_link.netloc:
            return link
        if link.startswith('//'):
            if parted_link.netloc:
                # Scheme-relative link: borrow the scheme of the feed.
                # A path is not required; "//host" used to leave new_link
                # unbound and raise UnboundLocalError.
                new_link = urlunsplit([parted_feed.scheme,
                                       parted_link.netloc,
                                       parted_link.path,
                                       parted_link.query,
                                       parted_link.fragment])
            else:
                # Nothing to complete; return the link as given.
                new_link = link
        elif link.startswith('/'):
            new_link = urlunsplit([parted_feed.scheme, parted_feed.netloc,
                                   parted_link.path, parted_link.query,
                                   parted_link.fragment])
        elif link.startswith('../'):
            pathlink = parted_link.path.split('/')
            pathfeed = parted_feed.path.split('/')
            # Drop one segment of the feed path per ".." of the link.
            for i in pathlink:
                if i == '..':
                    if pathlink.index('..') == 0:
                        pathfeed.pop()
                    else:
                        break
            # Drop the leading ".." segments of the link path.
            while pathlink.count('..'):
                if pathlink.index('..') == 0:
                    pathlink.remove('..')
                else:
                    break
            pathlink = '/'.join(pathlink)
            pathfeed.extend([pathlink])
            new_link = urlunsplit([parted_feed.scheme, parted_feed.netloc,
                                   '/'.join(pathfeed), parted_link.query,
                                   parted_link.fragment])
        else:
            pathlink = parted_link.path.split('/')
            pathfeed = parted_feed.path.split('/')
            if link.startswith('./'):
                pathlink.remove('.')
            if not source.endswith('/'):
                # Replace the last segment (file name) of the feed path.
                pathfeed.pop()
            pathlink = '/'.join(pathlink)
            pathfeed.extend([pathlink])
            new_link = urlunsplit([parted_feed.scheme, parted_feed.netloc,
                                   '/'.join(pathfeed), parted_link.query,
                                   parted_link.fragment])
        return new_link

    # TODO
    # Feed https://www.ocaml.org/feed.xml
    # Link %20https://frama-c.com/fc-versions/cobalt.html%20

    # FIXME
    # Feed https://cyber.dabamos.de/blog/feed.rss
    # Link https://cyber.dabamos.de/blog/#article-2022-07-15
    @staticmethod
    def join_url(source, link):
        """
        Join base URL with given pathname.

        Parameters
        ----------
        source : str
            Feed URL.
        link : str
            Link URL or pathname.

        Returns
        -------
        str
            URL.
        """
        if link.startswith('data:') and ';base64,' in link:
            return link
        if link.startswith('www.'):
            new_link = 'http://' + link
        elif link.startswith('%20') and link.endswith('%20'):
            # Strip the wrapping "%20" only; splitting on "%20" and
            # joining also deleted every "%20" inside the link.
            new_link = link[len('%20'):-len('%20')]
        else:
            new_link = urljoin(source, link)
        return new_link

    @staticmethod
    def trim_url(url):
        """
        Check URL pathname for double slash.

        Parameters
        ----------
        url : str
            URL.

        Returns
        -------
        url : str
            URL. Base64 data URIs are returned unchanged.
        """
        if url.startswith('data:') and ';base64,' in url:
            return url
        parted_url = urlsplit(url)
        pathname = parted_url.path
        while '//' in pathname:
            pathname = pathname.replace('//', '/')
        return urlunsplit([parted_url.scheme, parted_url.netloc, pathname,
                           parted_url.query, parted_url.fragment])

    @staticmethod
    def activitypub_to_http(namespace):
        """
        Replace ActivityPub namespace by HTTP.

        Not implemented yet; currently returns None.

        Parameters
        ----------
        namespace : str
            Namespace.

        Returns
        -------
        new_url : str
            URL.
        """


class String:
    """String helpers."""

    @staticmethod
    def generate_identifier(url, counter):
        """
        Build an identifier of the form "<hostname-with-dashes>:<counter>".

        Parameters
        ----------
        url : str
            A URL.
        counter : int
            A number to append.

        Returns
        -------
        identifier : str
            Identifier.
        """
        hostname = Url.get_hostname(url).replace('.', '-')
        return hostname + ':' + str(counter)

    # string_to_md5_hash
    # NOTE Warning: Entry might not have a link
    # TODO Handle situation error
    @staticmethod
    def md5_hash(url):
        """
        Compute the MD5 hex digest of a string.

        Parameters
        ----------
        url : str
            Text to hash.

        Returns
        -------
        url_digest : str
            Hexadecimal digest.
        """
        return hashlib.md5(url.encode()).hexdigest()


class Utilities:
    """Miscellaneous helpers."""

    # string_to_md5_hash
    # NOTE Warning: Entry might not have a link
    # TODO Handle situation error
    @staticmethod
    def hash_url_to_md5(url):
        """
        Compute the MD5 hex digest of a string.

        Parameters
        ----------
        url : str
            Text to hash.

        Returns
        -------
        url_digest : str
            Hexadecimal digest.
        """
        return String.md5_hash(url)

    @staticmethod
    def pick_a_feed(dir_config, lang=None):
        """
        Pick a random entry of list "feeds" of file feeds.toml.

        Parameters
        ----------
        dir_config : str
            Directory that contains file feeds.toml.
        lang : str, optional
            Only logged at present. The default is None.

        Returns
        -------
        url
            A randomly chosen item of the list.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: lang: {}'
                     .format(function_name, lang))
        filename_feeds = os.path.join(dir_config, 'feeds.toml')
        with open(filename_feeds, mode="rb") as feeds:
            urls = tomllib.load(feeds)
        url = random.choice(urls['feeds'])
        return url