From f65be8b5c845ff36cb76ffb5d8b0e659dd6b6261 Mon Sep 17 00:00:00 2001 From: Schimon Jehudah Date: Tue, 2 Jan 2024 11:42:41 +0000 Subject: [PATCH] WIP Add http proxy support. Add more functionality to handle bookmarks. Split into more modules. Remove callback function initdb. Tasked status messages are broken. --- slixfeed/__main__.py | 32 +- slixfeed/config.py | 67 +++- slixfeed/fetch.py | 115 ++---- slixfeed/sqlite.py | 196 +++------ slixfeed/task.py | 145 ++++--- slixfeed/url.py | 8 +- slixfeed/utility.py | 109 +++++ slixfeed/xmpp/bookmark.py | 7 + slixfeed/xmpp/client.py | 497 ++++++----------------- slixfeed/xmpp/compose.py | 815 ++++++-------------------------------- slixfeed/xmpp/connect.py | 8 +- slixfeed/xmpp/muc.py | 50 ++- slixfeed/xmpp/process.py | 757 +++++++++++++++++++++++++++++++++++ slixfeed/xmpp/roster.py | 54 +++ slixfeed/xmpp/state.py | 63 +++ slixfeed/xmpp/utility.py | 43 ++ 16 files changed, 1553 insertions(+), 1413 deletions(-) create mode 100644 slixfeed/utility.py create mode 100644 slixfeed/xmpp/process.py create mode 100644 slixfeed/xmpp/roster.py create mode 100644 slixfeed/xmpp/state.py create mode 100644 slixfeed/xmpp/utility.py diff --git a/slixfeed/__main__.py b/slixfeed/__main__.py index a9de02c..89d2b42 100644 --- a/slixfeed/__main__.py +++ b/slixfeed/__main__.py @@ -55,7 +55,8 @@ TODO Did you know that you can follow you favorite Mastodon feeds by just sending the URL address? Supported fediverse websites are: - Akkoma, HubZilla, Mastodon, Misskey, Pixelfed, Pleroma, Soapbox. + Akkoma, Firefish (Calckey), Friendica, HubZilla, + Mastodon, Misskey, Pixelfed, Pleroma, Socialhome, Soapbox. 15) Brand: News Broker, Newsman, Newsdealer, Laura Harbinger @@ -88,14 +89,13 @@ import os # # with start_action(action_type="message()", msg=msg): #import slixfeed.irchandler +from slixfeed.config import get_value from slixfeed.xmpp.client import Slixfeed #import slixfeed.matrixhandler class Jabber: - def __init__(self, jid, password, nick): - # Setup the Slixfeed and register plugins. Note that while plugins may # have interdependencies, the order in which you register them does # not matter. @@ -104,11 +104,31 @@ class Jabber: xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0045') # Multi-User Chat xmpp.register_plugin('xep_0048') # Bookmarks - xmpp.register_plugin('xep_0060') # PubSub - xmpp.register_plugin('xep_0199', {'keepalive': True, 'frequency': 15}) # XMPP Ping + xmpp.register_plugin('xep_0060') # Publish-Subscribe + # xmpp.register_plugin('xep_0065') # SOCKS5 Bytestreams + xmpp.register_plugin('xep_0199', {'keepalive': True}) # XMPP Ping xmpp.register_plugin('xep_0249') # Multi-User Chat xmpp.register_plugin('xep_0402') # PEP Native Bookmarks - + + # proxy_enabled = get_value("accounts", "XMPP Connect", "proxy_enabled") + # if proxy_enabled == '1': + # values = get_value("accounts", "XMPP Connect", [ + # "proxy_host", + # "proxy_port", + # "proxy_username", + # "proxy_password" + # ]) + # print("Proxy is enabled: {}:{}".format(values[0], values[1])) + # xmpp.use_proxy = True + # xmpp.proxy_config = { + # 'host': values[0], + # 'port': values[1], + # 'username': values[2], + # 'password': values[3] + # } + # proxy = {'socks5': (values[0], values[1])} + # xmpp.proxy = {'socks5': ('localhost', 9050)} + # Connect to the XMPP server and start processing XMPP stanzas. xmpp.connect() xmpp.process() diff --git a/slixfeed/config.py b/slixfeed/config.py index a77cb8e..6b38be4 100644 --- a/slixfeed/config.py +++ b/slixfeed/config.py @@ -16,19 +16,62 @@ TODO Refer to sqlitehandler.search_entries for implementation. It is expected to be more complex than function search_entries. -""" +5) Copy file from /etc/slixfeed/ or /usr/share/slixfeed/ +""" import configparser # from file import get_default_confdir import slixfeed.config as config import slixfeed.sqlite as sqlite import os -from random import randrange +# from random import randrange import sys import yaml -async def get_value_default(key, section): + +def get_value(filename, section, keys): + """ + Get setting value. + + Parameters + ---------- + filename : str + INI filename. + keys : list or str + A single key as string or multiple keys as list. + section : str + INI Section. + + Returns + ------- + result : list or str + A single value as string or multiple values as list. + """ + config_res = configparser.RawConfigParser() + config_dir = config.get_default_confdir() + # if not os.path.isdir(config_dir): + # config_dir = '/usr/share/slixfeed/' + if not os.path.isdir(config_dir): + os.mkdir(config_dir) + config_file = os.path.join(config_dir, filename + ".ini") + config_res.read(config_file) + if config_res.has_section(section): + section_res = config_res[section] + if isinstance(keys, list): + result = [] + for key in keys: + result.extend([section_res[key]]) + elif isinstance(keys, str): + key = keys + result = section_res[key] + return result + + +# TODO Store config file as an object in runtime, otherwise +# the file will be opened time and time again. +# TODO Copy file from /etc/slixfeed/ or /usr/share/slixfeed/ +def get_value_default(filename, section, key): """ Get settings default value. @@ -47,17 +90,14 @@ async def get_value_default(key, section): config_dir = config.get_default_confdir() if not os.path.isdir(config_dir): config_dir = '/usr/share/slixfeed/' - config_file = os.path.join(config_dir, r"settings.ini") + config_file = os.path.join(config_dir, filename + ".ini") config_res.read(config_file) if config_res.has_section(section): result = config_res[section][key] - isinstance(result, int) - isinstance(result, str) - breakpoint return result -async def get_list(filename): +def get_list(filename): """ Get settings default value. @@ -149,7 +189,7 @@ def get_default_confdir(): return os.path.join(config_home, 'slixfeed') -async def initdb(jid, callback, message=None): +def get_pathname_to_database(jid): """ Callback function to instantiate action on database. @@ -173,11 +213,12 @@ async def initdb(jid, callback, message=None): os.mkdir(db_dir) db_file = os.path.join(db_dir, r"{}.db".format(jid)) sqlite.create_tables(db_file) + return db_file # await set_default_values(db_file) - if message: - return await callback(db_file, message) - else: - return await callback(db_file) + # if message: + # return await callback(db_file, message) + # else: + # return await callback(db_file) async def add_to_list(newwords, keywords): diff --git a/slixfeed/fetch.py b/slixfeed/fetch.py index b928d41..89ff2d9 100644 --- a/slixfeed/fetch.py +++ b/slixfeed/fetch.py @@ -14,7 +14,7 @@ TODO 2) Check also for HTML, not only feed.bozo. -3) Add "if is_feed(url, feed)" to view_entry and view_feed +3) Add "if utility.is_feed(url, feed)" to view_entry and view_feed 4) Refactor view_entry and view_feed - Why "if" twice? @@ -24,17 +24,18 @@ from aiohttp import ClientError, ClientSession, ClientTimeout from asyncio import TimeoutError from asyncio.exceptions import IncompleteReadError from bs4 import BeautifulSoup -import slixfeed.config as config -from slixfeed.datetime import now, rfc2822_to_iso8601 from email.utils import parseaddr from feedparser import parse from http.client import IncompleteRead from lxml import html +import slixfeed.config as config +from slixfeed.datetime import now, rfc2822_to_iso8601 +import slixfeed.utility as utility import slixfeed.sqlite as sqlite from slixfeed.url import complete_url, join_url, trim_url from urllib import error # from xml.etree.ElementTree import ElementTree, ParseError -from urllib.parse import urljoin, urlsplit, urlunsplit +from urllib.parse import urlsplit, urlunsplit # NOTE Why (if res[0]) and (if res[1] == 200)? async def download_updates(db_file, url=None): @@ -262,9 +263,9 @@ async def view_feed(url): # breakpoint() if result[1] == 200: feed = parse(result[0]) - title = get_title(url, feed) + title = utility.get_title(url, feed) entries = feed.entries - msg = "Preview of {}:\n```\n".format(title) + msg = "Preview of {}:\n\n```\n".format(title) counter = 0 for entry in entries: counter += 1 @@ -339,7 +340,7 @@ async def view_entry(url, num): # breakpoint() if result[1] == 200: feed = parse(result[0]) - title = get_title(url, result[0]) + title = utility.get_title(url, result[0]) entries = feed.entries num = int(num) - 1 entry = entries[num] @@ -447,8 +448,8 @@ async def add_feed(db_file, url): res = await download_feed(url) if res[0]: feed = parse(res[0]) - title = get_title(url, feed) - if is_feed(url, feed): + title = utility.get_title(url, feed) + if utility.is_feed(url, feed): status = res[1] msg = await sqlite.insert_feed( db_file, @@ -533,17 +534,23 @@ async def download_feed(url): Document or error message. """ try: - user_agent = await config.get_value_default("user-agent", "Network") + user_agent = config.get_value_default("settings", "Network", "user-agent") except: user_agent = "Slixfeed/0.1" if not len(user_agent): user_agent = "Slixfeed/0.1" + proxy = config.get_value("settings", "Network", "http_proxy") timeout = ClientTimeout(total=10) headers = {'User-Agent': user_agent} async with ClientSession(headers=headers) as session: # async with ClientSession(trust_env=True) as session: try: - async with session.get(url, timeout=timeout) as response: + async with session.get( + url, + proxy=proxy, + # proxy_auth=(proxy_username, proxy_password) + timeout=timeout + ) as response: status = response.status if response.status == 200: try: @@ -584,31 +591,6 @@ async def download_feed(url): return msg -def get_title(url, feed): - """ - Get title of feed. - - Parameters - ---------- - url : str - URL. - feed : dict - Parsed feed document. - - Returns - ------- - title : str - Title or URL hostname. - """ - try: - title = feed["feed"]["title"] - except: - title = urlsplit(url).netloc - if not title: - title = urlsplit(url).netloc - return title - - # TODO Improve scan by gradual decreasing of path async def feed_mode_request(url, tree): """ @@ -630,7 +612,7 @@ async def feed_mode_request(url, tree): """ feeds = {} parted_url = urlsplit(url) - paths = await config.get_list("lists.yaml") + paths = config.get_list("lists.yaml") paths = paths["pathnames"] for path in paths: address = urlunsplit([ @@ -650,7 +632,9 @@ async def feed_mode_request(url, tree): title = '*** No Title ***' feeds[address] = title # Check whether URL has path (i.e. not root) - if parted_url.path.split('/')[1]: + # Check parted_url.path to avoid error in case root wasn't given + # TODO Make more tests + if parted_url.path and parted_url.path.split('/')[1]: paths.extend( [".atom", ".feed", ".rdf", ".rss"] ) if '.rss' not in paths else -1 @@ -673,8 +657,9 @@ async def feed_mode_request(url, tree): if len(feeds) > 1: counter = 0 msg = ( - "RSS URL discovery has found {} feeds:\n```\n" + "RSS URL discovery has found {} feeds:\n\n```\n" ).format(len(feeds)) + feed_mark = 0 for feed in feeds: try: feed_name = feeds[feed]["feed"]["title"] @@ -686,7 +671,6 @@ async def feed_mode_request(url, tree): feed_amnt = len(feeds[feed].entries) except: continue - feed_mark = 0 if feed_amnt: # NOTE Because there could be many false positives # which are revealed in second phase of scan, we @@ -741,7 +725,7 @@ async def feed_mode_scan(url, tree): feeds = {} # paths = [] # TODO Test - paths = await config.get_list("lists.yaml") + paths = config.get_list("lists.yaml") paths = paths["pathnames"] for path in paths: # xpath_query = "//*[@*[contains(.,'{}')]]".format(path) @@ -794,8 +778,9 @@ async def feed_mode_scan(url, tree): # breakpoint() counter = 0 msg = ( - "RSS URL scan has found {} feeds:\n```\n" + "RSS URL scan has found {} feeds:\n\n```\n" ).format(len(feeds)) + feed_mark = 0 for feed in feeds: # try: # res = await download_feed(feed) @@ -807,7 +792,6 @@ async def feed_mode_scan(url, tree): feed_name = urlsplit(feed).netloc feed_addr = feed feed_amnt = len(feeds[feed].entries) - feed_mark = 0 if feed_amnt: # NOTE Because there could be many false positives # which are revealed in second phase of scan, we @@ -872,7 +856,7 @@ async def feed_mode_auto_discovery(url, tree): feeds = tree.xpath(xpath_query) if len(feeds) > 1: msg = ( - "RSS Auto-Discovery has found {} feeds:\n```\n" + "RSS Auto-Discovery has found {} feeds:\n\n```\n" ).format(len(feeds)) for feed in feeds: # # The following code works; @@ -896,46 +880,3 @@ async def feed_mode_auto_discovery(url, tree): elif feeds: feed_addr = join_url(url, feeds[0].xpath('@href')[0]) return [feed_addr] - - -def is_feed(url, feed): - """ - Determine whether document is feed or not. - - Parameters - ---------- - url : str - URL. - feed : dict - Parsed feed. - - Returns - ------- - val : boolean - True or False. - """ - msg = None - if not feed.entries: - try: - feed["feed"]["title"] - val = True - msg = ( - "Empty feed for {}" - ).format(url) - except: - val = False - msg = ( - "No entries nor title for {}" - ).format(url) - elif feed.bozo: - val = False - msg = ( - "Bozo detected for {}" - ).format(url) - else: - val = True - msg = ( - "Good feed for {}" - ).format(url) - print(msg) - return val diff --git a/slixfeed/sqlite.py b/slixfeed/sqlite.py index 94860e0..b59b0a9 100644 --- a/slixfeed/sqlite.py +++ b/slixfeed/sqlite.py @@ -311,7 +311,7 @@ async def check_feed_exist(db_file, url): return result -async def get_number_of_items(db_file, table): +def get_number_of_items(db_file, table): """ Return number of entries or feeds. @@ -337,7 +337,7 @@ async def get_number_of_items(db_file, table): return count -async def get_number_of_feeds_active(db_file): +def get_number_of_feeds_active(db_file): """ Return number of active feeds. @@ -362,7 +362,7 @@ async def get_number_of_feeds_active(db_file): return count -async def get_number_of_entries_unread(db_file): +def get_number_of_entries_unread(db_file): """ Return number of unread items. @@ -491,8 +491,8 @@ async def get_entry_unread(db_file, num=None): # summary = ["> " + line for line in summary] # summary = "\n".join(summary) link = result[2] - link = await remove_tracking_parameters(link) - link = (await replace_hostname(link, "link")) or link + link = remove_tracking_parameters(link) + link = (replace_hostname(link, "link")) or link sql = ( "SELECT name " "FROM feeds " @@ -504,19 +504,11 @@ async def get_entry_unread(db_file, num=None): if num > 1: news_list += ( "\n{}\n{}\n{}\n" - ).format( - str(title), - str(link), - str(feed) - ) + ).format(str(title), str(link), str(feed)) else: news_list = ( "{}\n{}\n{}" - ).format( - str(title), - str(link), - str(feed) - ) + ).format(str(title), str(link), str(feed)) # TODO While `async with DBLOCK` does work well from # outside of functions, it would be better practice # to place it within the functions. @@ -625,53 +617,31 @@ async def statistics(db_file): msg : str Statistics as message. """ - feeds = await get_number_of_items(db_file, 'feeds') - active_feeds = await get_number_of_feeds_active(db_file) - entries = await get_number_of_items(db_file, 'entries') - archive = await get_number_of_items(db_file, 'archive') - unread_entries = await get_number_of_entries_unread(db_file) + values = [] + values.extend([get_number_of_entries_unread(db_file)]) + entries = get_number_of_items(db_file, 'entries') + archive = get_number_of_items(db_file, 'archive') + values.extend([entries + archive]) + values.extend([get_number_of_feeds_active(db_file)]) + values.extend([get_number_of_items(db_file, 'feeds')]) # msg = """You have {} unread news items out of {} from {} news sources. # """.format(unread_entries, entries, feeds) with create_connection(db_file) as conn: cur = conn.cursor() - vals = [] - for key in [ - "archive", - "interval", - "quantum", - "enabled" - ]: + for key in ["archive", "interval", + "quantum", "enabled"]: sql = ( "SELECT value " "FROM settings " "WHERE key = ?" ) try: - val = cur.execute(sql, (key,)).fetchone()[0] + value = cur.execute(sql, (key,)).fetchone()[0] except: print("Error for key:", key) - val = "none" - vals.extend([val]) - msg = ( - "```" - "\nSTATISTICS\n" - "News items : {} / {}\n" - "News sources : {} / {}\n" - "\nOPTIONS\n" - "Items to archive : {}\n" - "Update interval : {}\n" - "Items per update : {}\n" - "Operation status : {}\n" - "```" - ).format( - unread_entries, entries + archive, - active_feeds, feeds, - vals[0], - vals[1], - vals[2], - vals[3] - ) - return msg + value = "none" + values.extend([value]) + return values async def update_statistics(cur): @@ -684,9 +654,9 @@ async def update_statistics(cur): Cursor object. """ stat_dict = {} - stat_dict["feeds"] = await get_number_of_items(cur, 'feeds') - stat_dict["entries"] = await get_number_of_items(cur, 'entries') - stat_dict["unread"] = await get_number_of_entries_unread(cur=cur) + stat_dict["feeds"] = get_number_of_items(cur, 'feeds') + stat_dict["entries"] = get_number_of_items(cur, 'entries') + stat_dict["unread"] = get_number_of_entries_unread(cur=cur) for i in stat_dict: sql = ( "SELECT id " @@ -913,13 +883,14 @@ async def add_entry(cur, entry): try: cur.execute(sql, entry) except: - print(current_time(), "COROUTINE OBJECT NOW") + print("Unknown error for sqlite.add_entry") + # print(current_time(), "COROUTINE OBJECT NOW") # for i in entry: # print(type(i)) # print(i) # print(type(entry)) print(entry) - print(current_time(), "COROUTINE OBJECT NOW") + # print(current_time(), "COROUTINE OBJECT NOW") # breakpoint() @@ -1147,7 +1118,7 @@ async def get_feeds_url(db_file): return result -async def list_feeds(db_file): +async def get_feeds(db_file): """ Query table feeds and list items. @@ -1158,8 +1129,8 @@ async def list_feeds(db_file): Returns ------- - msg : str - URLs of feeds as message. + msg : ??? + URLs of feeds. """ cur = get_cursor(db_file) sql = ( @@ -1167,37 +1138,11 @@ async def list_feeds(db_file): "FROM feeds" ) results = cur.execute(sql) - feeds_list = "\nList of subscriptions:\n```\n" - counter = 0 - for result in results: - counter += 1 - feeds_list += ( - "Name : {}\n" - "Address : {}\n" - "Updated : {}\n" - "Status : {}\n" - "ID : {}\n" - "\n" - ).format( - str(result[0]), - str(result[1]), - str(result[2]), - str(result[3]), - str(result[4]) - ) - if counter: - return feeds_list + ( - "```\nTotal of {} subscriptions.\n" - ).format(counter) - else: - msg = ( - "List of subscriptions is empty.\n" - "To add feed, send a URL\n" - "Try these:\n" - # TODO Pick random from featured/recommended - "https://reclaimthenet.org/feed/" - ) - return msg + print("type of resilts in sqlite.py is:") + print(type(results)) + print(results) + breakpoint() + return results async def last_entries(db_file, num): @@ -1235,21 +1180,6 @@ async def last_entries(db_file, num): "LIMIT :num " ) results = cur.execute(sql, (num,)) - titles_list = "Recent {} titles:\n```".format(num) - counter = 0 - for result in results: - counter += 1 - titles_list += ( - "\n{}\n{}\n" - ).format( - str(result[0]), - str(result[1]) - ) - if counter: - titles_list += "```\n" - return titles_list - else: - return "There are no news at the moment." async def search_feeds(db_file, query): @@ -1277,28 +1207,7 @@ async def search_feeds(db_file, query): "LIMIT 50" ) results = cur.execute(sql, [f'%{query}%', f'%{query}%']) - results_list = ( - "Feeds containing '{}':\n```" - ).format(query) - counter = 0 - for result in results: - counter += 1 - results_list += ( - "\nName : {}" - "\nURL : {}" - "\nIndex : {}" - "\nMode : {}" - "\n" - ).format( - str(result[0]), - str(result[1]), - str(result[2]), - str(result[3]) - ) - if counter: - return results_list + "\n```\nTotal of {} feeds".format(counter) - else: - return "No feeds were found for: {}".format(query) + return results async def search_entries(db_file, query): @@ -1332,22 +1241,7 @@ async def search_entries(db_file, query): f'%{query}%', f'%{query}%' )) - results_list = ( - "Search results for '{}':\n```" - ).format(query) - counter = 0 - for result in results: - counter += 1 - results_list += ( - "\n{}\n{}\n" - ).format( - str(result[0]), - str(result[1]) - ) - if counter: - return results_list + "```\nTotal of {} results".format(counter) - else: - return "No results were found for: {}".format(query) + return results """ FIXME @@ -1471,7 +1365,7 @@ async def set_settings_value(db_file, key_value): # key = "enabled" # val = 0 key = key_value[0] - val = key_value[1] + value = key_value[1] async with DBLOCK: with create_connection(db_file) as conn: cur = conn.cursor() @@ -1483,7 +1377,7 @@ async def set_settings_value(db_file, key_value): ) cur.execute(sql, { "key": key, - "value": val + "value": value }) @@ -1509,7 +1403,7 @@ async def set_settings_value_default(cur, key): # sql = "SELECT id FROM settings WHERE key = ?" # cur.execute(sql, (i,)) # if not cur.fetchone(): -# val = await settings.get_value_default(i) +# val = settings.get_value_default(i) # sql = "INSERT INTO settings(key,value) VALUES(?,?)" # cur.execute(sql, (i, val)) sql = ( @@ -1519,14 +1413,14 @@ async def set_settings_value_default(cur, key): ) cur.execute(sql, (key,)) if not cur.fetchone(): - val = await config.get_value_default(key, "Settings") + value = config.get_value_default("settings", "Settings", key) sql = ( "INSERT " "INTO settings(key,value) " "VALUES(?,?)" ) - cur.execute(sql, (key, val)) - return val + cur.execute(sql, (key, value)) + return value async def get_settings_value(db_file, key): @@ -1553,9 +1447,9 @@ async def get_settings_value(db_file, key): # cur.execute(sql, (key,)) # result = cur.fetchone() # except: - # result = await settings.get_value_default(key) + # result = settings.get_value_default(key) # if not result: - # result = await settings.get_value_default(key) + # result = settings.get_value_default(key) # return result with create_connection(db_file) as conn: try: @@ -1636,7 +1530,7 @@ async def set_filters_value_default(cur, key): ) cur.execute(sql, (key,)) if not cur.fetchone(): - val = await config.get_list("lists.yaml") + val = config.get_list("lists.yaml") val = val[key] val = ",".join(val) sql = ( diff --git a/slixfeed/task.py b/slixfeed/task.py index 7d9aa67..94ae06c 100644 --- a/slixfeed/task.py +++ b/slixfeed/task.py @@ -44,7 +44,10 @@ import logging import os import slixmpp -from slixfeed.config import initdb, get_default_dbdir, get_value_default +from slixfeed.config import ( + get_pathname_to_database, + get_default_dbdir, + get_value_default) from slixfeed.datetime import current_time from slixfeed.fetch import download_updates from slixfeed.sqlite import ( @@ -55,6 +58,7 @@ from slixfeed.sqlite import ( ) # from xmpp import Slixfeed import slixfeed.xmpp.client as xmpp +import slixfeed.xmpp.utility as utility main_task = [] jid_tasker = {} @@ -87,16 +91,13 @@ async def start_tasks_xmpp(self, jid, tasks): match task: case "check": task_manager[jid]["check"] = asyncio.create_task( - check_updates(jid) - ) + check_updates(jid)) case "status": task_manager[jid]["status"] = asyncio.create_task( - send_status(self, jid) - ) + send_status(self, jid)) case "interval": task_manager[jid]["interval"] = asyncio.create_task( - send_update(self, jid) - ) + send_update(self, jid)) # for task in task_manager[jid].values(): # print("task_manager[jid].values()") # print(task_manager[jid].values()) @@ -107,6 +108,7 @@ async def start_tasks_xmpp(self, jid, tasks): # breakpoint() # await task + async def clean_tasks_xmpp(jid, tasks): # print("clean_tasks_xmpp", jid, tasks) for task in tasks: @@ -140,25 +142,19 @@ async def task_jid(self, jid): jid : str Jabber ID. """ - enabled = await initdb( - jid, - get_settings_value, - "enabled" - ) + db_file = get_pathname_to_database(jid) + enabled = await get_settings_value(db_file, "enabled") # print(await current_time(), "enabled", enabled, jid) if enabled: # NOTE Perhaps we want to utilize super with keyword # arguments in order to know what tasks to initiate. task_manager[jid] = {} task_manager[jid]["check"] = asyncio.create_task( - check_updates(jid) - ) + check_updates(jid)) task_manager[jid]["status"] = asyncio.create_task( - send_status(self, jid) - ) + send_status(self, jid)) task_manager[jid]["interval"] = asyncio.create_task( - send_update(self, jid) - ) + send_update(self, jid)) await task_manager[jid]["check"] await task_manager[jid]["status"] await task_manager[jid]["interval"] @@ -200,37 +196,22 @@ async def send_update(self, jid, num=None): """ # print("Starting send_update()") # print(jid) - enabled = await initdb( - jid, - get_settings_value, - "enabled" - ) + db_file = get_pathname_to_database(jid) + enabled = await get_settings_value(db_file, "enabled") if enabled: - new = await initdb( - jid, - get_entry_unread, - num - ) + new = await get_entry_unread(db_file, num) if new: # TODO Add while loop to assure delivery. # print(await current_time(), ">>> ACT send_message",jid) - chat_type = await xmpp.Slixfeed.is_muc(self, jid) + chat_type = await utility.jid_type(self, jid) # NOTE Do we need "if statement"? See NOTE at is_muc. if chat_type in ("chat", "groupchat"): xmpp.Slixfeed.send_message( - self, - mto=jid, - mbody=new, - mtype=chat_type - ) + self, mto=jid, mbody=new, mtype=chat_type) # TODO Do not refresh task before # verifying that it was completed. await refresh_task( - self, - jid, - send_update, - "interval" - ) + self, jid, send_update, "interval") # interval = await initdb( # jid, # get_settings_value, @@ -269,20 +250,13 @@ async def send_status(self, jid): """ # print(await current_time(), "> SEND STATUS",jid) status_text="πŸ€–οΈ Slixfeed RSS News Bot" - enabled = await initdb( - jid, - get_settings_value, - "enabled" - ) + db_file = get_pathname_to_database(jid) + enabled = await get_settings_value(db_file, "enabled") if not enabled: status_mode = "xa" status_text = "πŸ“«οΈ Send \"Start\" to receive updates" else: - feeds = await initdb( - jid, - get_number_of_items, - "feeds" - ) + feeds = get_number_of_items(db_file, "feeds") # print(await current_time(), jid, "has", feeds, "feeds") if not feeds: print(">>> not feeds:", feeds, "jid:", jid) @@ -291,10 +265,7 @@ async def send_status(self, jid): "πŸ“ͺ️ Send a URL from a blog or a news website" ) else: - unread = await initdb( - jid, - get_number_of_entries_unread - ) + unread = await get_number_of_entries_unread(db_file) if unread: status_mode = "chat" status_text = ( @@ -321,12 +292,7 @@ async def send_status(self, jid): ) # await asyncio.sleep(60 * 20) await refresh_task( - self, - jid, - send_status, - "status", - "20" - ) + self, jid, send_status, "status", "20") # loop.call_at( # loop.time() + 60 * 20, # loop.create_task, @@ -349,11 +315,8 @@ async def refresh_task(self, jid, callback, key, val=None): Value. The default is None. """ if not val: - val = await initdb( - jid, - get_settings_value, - key - ) + db_file = get_pathname_to_database(jid) + val = await get_settings_value(db_file, key) # if task_manager[jid][key]: if jid in task_manager: try: @@ -401,8 +364,9 @@ async def check_updates(jid): """ while True: # print(await current_time(), "> CHCK UPDATE",jid) - await initdb(jid, download_updates) - val = await get_value_default("check", "Settings") + db_file = get_pathname_to_database(jid) + await download_updates(db_file) + val = get_value_default("settings", "Settings", "check") await asyncio.sleep(60 * float(val)) # Schedule to call this function again in 90 minutes # loop.call_at( @@ -412,6 +376,55 @@ async def check_updates(jid): # ) +async def start_tasks(self, presence): + # print("def presence_available", presence["from"].bare) + jid = presence["from"].bare + if jid not in self.boundjid.bare: + await clean_tasks_xmpp( + jid, ["interval", "status", "check"]) + await start_tasks_xmpp( + self, jid, ["interval", "status", "check"]) + # await task_jid(self, jid) + # main_task.extend([asyncio.create_task(task_jid(jid))]) + # print(main_task) + + +async def stop_tasks(self, presence): + if not self.boundjid.bare: + jid = presence["from"].bare + print(">>> unavailable:", jid) + await clean_tasks_xmpp( + jid, ["interval", "status", "check"]) + + +async def check_readiness(self, presence): + """ + Begin tasks if available, otherwise eliminate tasks. + + Parameters + ---------- + presence : str + XML stanza . + + Returns + ------- + None. + """ + # print("def check_readiness", presence["from"].bare, presence["type"]) + # # available unavailable away (chat) dnd xa + # print(">>> type", presence["type"], presence["from"].bare) + # # away chat dnd xa + # print(">>> show", presence["show"], presence["from"].bare) + + jid = presence["from"].bare + if presence["show"] in ("away", "dnd", "xa"): + print(">>> away, dnd, xa:", jid) + await clean_tasks_xmpp( + jid, ["interval"]) + await start_tasks_xmpp( + self, jid, ["status", "check"]) + + """ NOTE This is an older system, utilizing local storage instead of XMPP presence. diff --git a/slixfeed/url.py b/slixfeed/url.py index dd14191..db52dd2 100644 --- a/slixfeed/url.py +++ b/slixfeed/url.py @@ -31,7 +31,7 @@ from urllib.parse import ( # proxies.yaml. Perhaps a better practice would be to have # them separated. File proxies.yaml will remainas is in order # to be coordinated with the dataset of project LibRedirect. -async def replace_hostname(url, url_type): +def replace_hostname(url, url_type): """ Replace hostname. @@ -54,7 +54,7 @@ async def replace_hostname(url, url_type): pathname = parted_url.path queries = parted_url.query fragment = parted_url.fragment - proxies = await config.get_list("proxies.yaml") + proxies = config.get_list("proxies.yaml") for proxy in proxies: proxy = proxies[proxy] if hostname in proxy["hostname"] and url_type in proxy["type"]: @@ -72,7 +72,7 @@ async def replace_hostname(url, url_type): return url -async def remove_tracking_parameters(url): +def remove_tracking_parameters(url): """ Remove queries with tracking parameters. @@ -92,7 +92,7 @@ async def remove_tracking_parameters(url): pathname = parted_url.path queries = parse_qs(parted_url.query) fragment = parted_url.fragment - trackers = await config.get_list("queries.yaml") + trackers = config.get_list("queries.yaml") trackers = trackers["trackers"] for tracker in trackers: if tracker in queries: del queries[tracker] diff --git a/slixfeed/utility.py b/slixfeed/utility.py new file mode 100644 index 0000000..07d0cfa --- /dev/null +++ b/slixfeed/utility.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" + +TODO + +1) is_feed: Look into the type ("atom", "rss2" etc.) + +""" + +from urllib.parse import urlsplit + + +def log_as_markdown(timestamp, filename, jid, message): + """ + Log message to file. + + Parameters + ---------- + timestamp : str + Time stamp. + filename : str + Jabber ID as name of file. + jid : str + Jabber ID. + message : str + Message content. + + Returns + ------- + None. + + """ + with open(filename + '.md', 'a') as file: + # entry = "{} {}:\n{}\n\n".format(timestamp, jid, message) + entry = ( + "## {}\n" + "### {}\n\n" + "{}\n\n").format(jid, timestamp, message) + file.write(entry) + + +def get_title(url, feed): + """ + Get title of feed. + + Parameters + ---------- + url : str + URL. + feed : dict + Parsed feed document. + + Returns + ------- + title : str + Title or URL hostname. + """ + try: + title = feed["feed"]["title"] + except: + title = urlsplit(url).netloc + if not title: + title = urlsplit(url).netloc + return title + + +def is_feed(url, feed): + """ + Determine whether document is feed or not. + + Parameters + ---------- + url : str + URL. + feed : dict + Parsed feed. + + Returns + ------- + val : boolean + True or False. + """ + msg = None + if not feed.entries: + try: + feed["feed"]["title"] + val = True + msg = ( + "Empty feed for {}" + ).format(url) + except: + val = False + msg = ( + "No entries nor title for {}" + ).format(url) + elif feed.bozo: + val = False + msg = ( + "Bozo detected for {}" + ).format(url) + else: + val = True + msg = ( + "Good feed for {}" + ).format(url) + print(msg) + return val diff --git a/slixfeed/xmpp/bookmark.py b/slixfeed/xmpp/bookmark.py index 2eb3749..12cd5ac 100644 --- a/slixfeed/xmpp/bookmark.py +++ b/slixfeed/xmpp/bookmark.py @@ -40,6 +40,13 @@ async def add(self, muc_jid): # await self['xep_0402'].publish(bm) +async def get(self): + result = await self.plugin['xep_0048'].get_bookmarks() + bookmarks = result["private"]["bookmarks"] + conferences = bookmarks["conferences"] + return conferences + + async def remove(self, muc_jid): result = await self.plugin['xep_0048'].get_bookmarks() bookmarks = result["private"]["bookmarks"] diff --git a/slixfeed/xmpp/client.py b/slixfeed/xmpp/client.py index 0e431da..aed6668 100644 --- a/slixfeed/xmpp/client.py +++ b/slixfeed/xmpp/client.py @@ -48,7 +48,7 @@ NOTE """ import asyncio -from slixfeed.config import add_to_list, initdb, get_list, remove_from_list +from slixfeed.config import add_to_list, get_list, remove_from_list import slixfeed.fetch as fetcher from slixfeed.datetime import current_time import logging @@ -69,10 +69,13 @@ import xmltodict import xml.etree.ElementTree as ET from lxml import etree -import slixfeed.xmpp.compose as compose import slixfeed.xmpp.connect as connect +import slixfeed.xmpp.process as process import slixfeed.xmpp.muc as muc +import slixfeed.xmpp.roster as roster +import slixfeed.xmpp.state as state import slixfeed.xmpp.status as status +import slixfeed.xmpp.utility as utility main_task = [] jid_tasker = {} @@ -108,42 +111,39 @@ class Slixfeed(slixmpp.ClientXMPP): # and the XML streams are ready for use. We want to # listen for this event so that we we can initialize # our roster. - self.add_event_handler("session_start", self.start_session) - self.add_event_handler("session_resumed", self.start_session) - self.add_event_handler("session_start", self.autojoin_muc) - self.add_event_handler("session_resumed", self.autojoin_muc) + self.add_event_handler("session_start", self.on_session_start) + self.add_event_handler("session_resumed", self.on_session_resumed) self.add_event_handler("got_offline", print("got_offline")) # self.add_event_handler("got_online", self.check_readiness) - self.add_event_handler("changed_status", self.check_readiness) - self.add_event_handler("presence_unavailable", self.stop_tasks) + self.add_event_handler("changed_status", self.on_changed_status) + self.add_event_handler("presence_available", self.on_presence_available) + self.add_event_handler("presence_unavailable", self.on_presence_unavailable) - # self.add_event_handler("changed_subscription", self.check_subscription) + self.add_event_handler("changed_subscription", self.on_changed_subscription) - # self.add_event_handler("chatstate_active", self.check_chatstate_active) - # self.add_event_handler("chatstate_gone", self.check_chatstate_gone) + self.add_event_handler("chatstate_active", self.on_chatstate_active) + self.add_event_handler("chatstate_gone", self.on_chatstate_gone) self.add_event_handler("chatstate_composing", self.check_chatstate_composing) self.add_event_handler("chatstate_paused", self.check_chatstate_paused) # The message event is triggered whenever a message # stanza is received. Be aware that that includes # MUC messages and error messages. - self.add_event_handler("message", self.process_message) - self.add_event_handler("message", self.settle) + self.add_event_handler("message", self.on_message) - self.add_event_handler("groupchat_invite", self.process_muc_invite) # XEP_0045 - self.add_event_handler("groupchat_direct_invite", self.process_muc_invite) # XEP_0249 + self.add_event_handler("groupchat_invite", self.on_groupchat_invite) # XEP_0045 + self.add_event_handler("groupchat_direct_invite", self.on_groupchat_direct_invite) # XEP_0249 # self.add_event_handler("groupchat_message", self.message) # self.add_event_handler("disconnected", self.reconnect) # self.add_event_handler("disconnected", self.inspect_connection) - self.add_event_handler("reactions", self.reactions) - self.add_event_handler("presence_available", self.presence_available) - self.add_event_handler("presence_error", self.presence_error) - self.add_event_handler("presence_subscribe", self.presence_subscribe) - self.add_event_handler("presence_subscribed", self.presence_subscribed) - self.add_event_handler("presence_unsubscribe", self.presence_unsubscribe) - self.add_event_handler("presence_unsubscribed", self.unsubscribe) + self.add_event_handler("reactions", self.on_reactions) + self.add_event_handler("presence_error", self.on_presence_error) + self.add_event_handler("presence_subscribe", self.on_presence_subscribe) + self.add_event_handler("presence_subscribed", self.on_presence_subscribed) + self.add_event_handler("presence_unsubscribe", self.on_presence_unsubscribe) + self.add_event_handler("presence_unsubscribed", self.on_presence_unsubscribed) # Initialize event loop # self.loop = asyncio.get_event_loop() @@ -154,110 +154,110 @@ class Slixfeed(slixmpp.ClientXMPP): self.add_event_handler("connection_failed", self.on_connection_failed) self.add_event_handler("session_end", self.on_session_end) - """ - FIXME - - This function is triggered even when status is dnd/away/xa. - This results in sending messages even when status is dnd/away/xa. - See function check_readiness. - - NOTE - - The issue occurs only at bot startup. - Once status is changed to dnd/away/xa, the interval stops - as expected. - - TODO - - Use "sleep()" - - """ - async def presence_available(self, presence): - # print("def presence_available", presence["from"].bare) - jid = presence["from"].bare - print("presence_available", jid) - if jid not in self.boundjid.bare: - await task.clean_tasks_xmpp( - jid, - ["interval", "status", "check"] - ) - await task.start_tasks_xmpp( - self, - jid, - ["interval", "status", "check"] - ) - # await task_jid(self, jid) - # main_task.extend([asyncio.create_task(task_jid(jid))]) - # print(main_task) - - async def stop_tasks(self, presence): - if not self.boundjid.bare: - jid = presence["from"].bare - print(">>> unavailable:", jid) - await task.clean_tasks_xmpp( - jid, - ["interval", "status", "check"] - ) + async def on_groupchat_invite(self, message): + print("on_groupchat_invite") + await muc.accept_invitation(self, message) - async def presence_error(self, presence): - print("presence_error") - print(presence) - - async def presence_subscribe(self, presence): - print("presence_subscribe") - print(presence) - - async def presence_subscribed(self, presence): - print("presence_subscribed") - print(presence) - - async def reactions(self, message): - print("reactions") - print(message) - - # async def accept_muc_invite(self, message, ctr=None): - # # if isinstance(message, str): - # if not ctr: - # ctr = message["from"].bare - # jid = message['groupchat_invite']['jid'] - # else: - # jid = message - async def process_muc_invite(self, message): - # operator muc_chat - inviter = message["from"].bare - muc_jid = message['groupchat_invite']['jid'] - await muc.join_groupchat(self, inviter, muc_jid) - - - async def autojoin_muc(self, event): - result = await self.plugin['xep_0048'].get_bookmarks() - bookmarks = result["private"]["bookmarks"] - conferences = bookmarks["conferences"] - for conference in conferences: - if conference["autojoin"]: - muc_jid = conference["jid"] - print(current_time(), "Autojoining groupchat", muc_jid) - self.plugin['xep_0045'].join_muc( - muc_jid, - self.nick, - # If a room password is needed, use: - # password=the_room_password, - ) + async def on_groupchat_direct_invite(self, message): + print("on_groupchat_direct_invite") + await muc.accept_invitation(self, message) async def on_session_end(self, event): - print(current_time(), "Session ended. Attempting to reconnect.") - print(event) - logging.warning("Session ended. Attempting to reconnect.") - await connect.recover_connection(self, event) + message = "Session has ended." + await connect.recover_connection(self, event, message) async def on_connection_failed(self, event): - print(current_time(), "Connection failed. Attempting to reconnect.") - print(event) - logging.warning("Connection failed. Attempting to reconnect.") - await connect.recover_connection(self, event) + message = "Connection has failed." + await connect.recover_connection(self, event, message) + + + async def on_session_start(self, event): + await process.event(self, event) + await muc.autojoin(self, event) + + + async def on_session_resumed(self, event): + await process.event(self, event) + await muc.autojoin(self, event) + + + # TODO Request for subscription + async def on_message(self, message): + jid = message["from"].bare + if "chat" == await utility.jid_type(self, jid): + await roster.add(self, jid) + await state.request(self, jid) + # chat_type = message["type"] + # message_body = message["body"] + # message_reply = message.reply + await process.message(self, message) + + + async def on_changed_status(self, presence): + await task.check_readiness(self, presence) + + + # TODO Request for subscription + async def on_presence_subscribe(self, presence): + jid = presence["from"].bare + await state.request(self, jid) + print("on_presence_subscribe") + print(presence) + + + async def on_presence_subscribed(self, presence): + jid = presence["from"].bare + process.greet(self, jid) + + + async def on_presence_available(self, presence): + await task.start_tasks(self, presence) + print("on_presence_available") + print(presence) + + + async def on_presence_unsubscribed(self, presence): + await state.unsubscribed(self, presence) + + + async def on_presence_unavailable(self, presence): + await task.stop_tasks(self, presence) + + + async def on_changed_subscription(self, presence): + print("on_changed_subscription") + print(presence) + jid = presence["from"].bare + # breakpoint() + + + async def on_presence_unsubscribe(self, presence): + print("on_presence_unsubscribe") + print(presence) + + + async def on_presence_error(self, presence): + print("on_presence_error") + print(presence) + + + async def on_reactions(self, message): + print("on_reactions") + print(message) + + + async def on_chatstate_active(self, message): + print("on_chatstate_active") + print(message) + + + async def on_chatstate_gone(self, message): + print("on_chatstate_gone") + print(message) async def check_chatstate_composing(self, message): @@ -286,266 +286,3 @@ class Slixfeed(slixmpp.ClientXMPP): 20 ) - - async def check_readiness(self, presence): - """ - If available, begin tasks. - If unavailable, eliminate tasks. - - Parameters - ---------- - presence : str - XML stanza . - - Returns - ------- - None. - """ - # print("def check_readiness", presence["from"].bare, presence["type"]) - # # available unavailable away (chat) dnd xa - # print(">>> type", presence["type"], presence["from"].bare) - # # away chat dnd xa - # print(">>> show", presence["show"], presence["from"].bare) - - jid = presence["from"].bare - if presence["show"] in ("away", "dnd", "xa"): - print(">>> away, dnd, xa:", jid) - await task.clean_tasks_xmpp( - jid, - ["interval"] - ) - await task.start_tasks_xmpp( - self, - jid, - ["status", "check"] - ) - - - async def resume(self, event): - print("def resume") - print(event) - self.send_presence() - await self.get_roster() - - - async def start_session(self, event): - """ - Process the session_start event. - - Typical actions for the session_start event are - requesting the roster and broadcasting an initial - presence stanza. - - Arguments: - event -- An empty dictionary. The session_start - event does not provide any additional - data. - """ - print("def start_session") - print(event) - self.send_presence() - await self.get_roster() - - # for task in main_task: - # task.cancel() - - # Deprecated in favour of event "presence_available" - # if not main_task: - # await select_file() - - - async def is_muc(self, jid): - """ - Check whether a JID is of MUC. - - Parameters - ---------- - jid : str - Jabber ID. - - Returns - ------- - str - "chat" or "groupchat. - """ - try: - iqresult = await self["xep_0030"].get_info(jid=jid) - features = iqresult["disco_info"]["features"] - # identity = iqresult['disco_info']['identities'] - # if 'account' in indentity: - # if 'conference' in indentity: - if 'http://jabber.org/protocol/muc' in features: - return "groupchat" - # TODO elif - # NOTE Is it needed? We do not interact with gateways or services - else: - return "chat" - # TODO Test whether this exception is realized - except IqTimeout as e: - messages = [ - ("Timeout IQ"), - ("IQ Stanza:", e), - ("Jabber ID:", jid) - ] - for message in messages: - print(current_time(), message) - logging.error(current_time(), message) - - - async def settle(self, msg): - """ - Add JID to roster and settle subscription. - - Parameters - ---------- - jid : str - Jabber ID. - - Returns - ------- - None. - """ - jid = msg["from"].bare - if await self.is_muc(jid): - # Check whether JID is in bookmarks; otherwise, add it. - print(jid, "is muc") - else: - await self.get_roster() - # Check whether JID is in roster; otherwise, add it. - if jid not in self.client_roster.keys(): - self.send_presence_subscription( - pto=jid, - ptype="subscribe", - pnick=self.nick - ) - self.update_roster( - jid, - subscription="both" - ) - # Check whether JID is subscribed; otherwise, ask for presence. - if not self.client_roster[jid]["to"]: - self.send_presence_subscription( - pto=jid, - pfrom=self.boundjid.bare, - ptype="subscribe", - pnick=self.nick - ) - self.send_message( - mto=jid, - # mtype="headline", - msubject="RSS News Bot", - mbody=( - "Accept subscription request to receive updates." - ), - mfrom=self.boundjid.bare, - mnick=self.nick - ) - self.send_presence( - pto=jid, - pfrom=self.boundjid.bare, - # Accept symbol πŸ‰‘οΈ πŸ‘οΈ ✍ - pstatus=( - "βœ’οΈ Accept subscription request to receive updates." - ), - # ptype="subscribe", - pnick=self.nick - ) - - - async def presence_unsubscribe(self, presence): - print("presence_unsubscribe") - print(presence) - - - async def unsubscribe(self, presence): - jid = presence["from"].bare - self.send_presence( - pto=jid, - pfrom=self.boundjid.bare, - pstatus="πŸ–‹οΈ Subscribe to receive updates", - pnick=self.nick - ) - self.send_message( - mto=jid, - mbody="You have been unsubscribed." - ) - self.update_roster( - jid, - subscription="remove" - ) - - - async def process_message(self, message): - """ - Process incoming message stanzas. Be aware that this also - includes MUC messages and error messages. It is usually - a good practice to check the messages's type before - processing or sending replies. - - Parameters - ---------- - message : str - The received message stanza. See the documentation - for stanza objects and the Message stanza to see - how it may be used. - """ - # print("message") - # print(message) - if message["type"] in ("chat", "groupchat", "normal"): - jid = message["from"].bare - if message["type"] == "groupchat": - # nick = message["from"][message["from"].index("/")+1:] - nick = str(message["from"]) - nick = nick[nick.index("/")+1:] - if (message['muc']['nick'] == self.nick or - not message["body"].startswith("!")): - return - # token = await initdb( - # jid, - # get_settings_value, - # "token" - # ) - # if token == "accepted": - # operator = await initdb( - # jid, - # get_settings_value, - # "masters" - # ) - # if operator: - # if nick not in operator: - # return - # approved = False - jid_full = str(message["from"]) - role = self.plugin['xep_0045'].get_jid_property( - jid, - jid_full[jid_full.index("/")+1:], - "role") - if role != "moderator": - return - # if role == "moderator": - # approved = True - # TODO Implement a list of temporary operators - # Once an operator is appointed, the control would last - # untile the participant has been disconnected from MUC - # An operator is a function to appoint non moderators. - # Changing nickname is fine and consist of no problem. - # if not approved: - # operator = await initdb( - # jid, - # get_settings_value, - # "masters" - # ) - # if operator: - # if nick in operator: - # approved = True - # if not approved: - # return - - # # Begin processing new JID - # # Deprecated in favour of event "presence_available" - # db_dir = get_default_dbdir() - # os.chdir(db_dir) - # if jid + ".db" not in os.listdir(): - # await task_jid(jid) - - await compose.message(self, jid, message) diff --git a/slixfeed/xmpp/compose.py b/slixfeed/xmpp/compose.py index 7b12f97..879775f 100644 --- a/slixfeed/xmpp/compose.py +++ b/slixfeed/xmpp/compose.py @@ -2,698 +2,137 @@ # -*- coding: utf-8 -*- """ - -TODO - -1) Deprecate "add" (see above) and make it interactive. - Slixfeed: Do you still want to add this URL to subscription list? - See: case _ if message_lowercase.startswith("add"): - +TODO Port functions insert_feed, remove_feed, get_entry_unread """ -from slixfeed.config import add_to_list, initdb, get_list, remove_from_list -from slixfeed.datetime import current_time -import slixfeed.fetch as fetcher -import slixfeed.sqlite as sqlite -import slixfeed.task as task -import slixfeed.url as uri -import slixfeed.xmpp.status as status -import slixfeed.xmpp.text as text +import slixfeed.xmpp.bookmark as bookmark +from slixfeed.url import remove_tracking_parameters, replace_hostname -async def message(self, jid, message): - action = None - message_text = " ".join(message["body"].split()) - if message["type"] == "groupchat": - message_text = message_text[1:] - message_lowercase = message_text.lower() - print(current_time(), "ACCOUNT: " + str(message["from"])) - print(current_time(), "COMMAND:", message_text) +def list_search_results(query, results): + results_list = ( + "Search results for '{}':\n\n```" + ).format(query) + counter = 0 + for result in results: + counter += 1 + results_list += ( + "\n{}\n{}\n" + ).format(str(result[0]), str(result[1])) + if counter: + return results_list + "```\nTotal of {} results".format(counter) + else: + return "No results were found for: {}".format(query) - match message_lowercase: - case "commands": - action = text.print_cmd() - case "help": - action = text.print_help() - case "info": - action = text.print_info() - case _ if message_lowercase in [ - "greetings", "hallo", "hello", "hey", - "hi", "hola", "holla", "hollo"]: - action = ( - "Greeting!\n" - "I'm Slixfeed, an RSS News Bot!\n" - "Send \"help\" for instructions." - ) - # print("task_manager[jid]") - # print(task_manager[jid]) - await self.get_roster() - print("roster 1") - print(self.client_roster) - print("roster 2") - print(self.client_roster.keys()) - print("jid") - print(jid) - await self.autojoin_muc() - # case _ if message_lowercase.startswith("activate"): - # if message["type"] == "groupchat": - # acode = message[9:] - # token = await initdb( - # jid, - # get_settings_value, - # "token" - # ) - # if int(acode) == token: - # await initdb( - # jid, - # set_settings_value, - # ["masters", nick] - # ) - # await initdb( - # jid, - # set_settings_value, - # ["token", "accepted"] - # ) - # action = "{}, your are in command.".format(nick) - # else: - # action = "Activation code is not valid." - # else: - # action = "This command is valid for groupchat only." - case _ if message_lowercase.startswith("add"): - message_text = message_text[4:] - url = message_text.split(" ")[0] - title = " ".join(message_text.split(" ")[1:]) - if url.startswith("http"): - action = await initdb( - jid, - fetcher.add_feed_no_check, - [url, title] - ) - old = await initdb( - jid, - sqlite.get_settings_value, - "old" - ) - if old: - await task.clean_tasks_xmpp( - jid, - ["status"] - ) - # await send_status(jid) - await task.start_tasks_xmpp( - self, - jid, - ["status"] - ) - else: - await initdb( - jid, - sqlite.mark_source_as_read, - url - ) - else: - action = "Missing URL." - case _ if message_lowercase.startswith("allow +"): - key = "filter-" + message_text[:5] - val = message_text[7:] - if val: - keywords = await initdb( - jid, - sqlite.get_filters_value, - key - ) - val = await add_to_list( - val, - keywords - ) - await initdb( - jid, - sqlite.set_filters_value, - [key, val] - ) - action = ( - "Approved keywords\n" - "```\n{}\n```" - ).format(val) - else: - action = "Missing keywords." - case _ if message_lowercase.startswith("allow -"): - key = "filter-" + message_text[:5] - val = message_text[7:] - if val: - keywords = await initdb( - jid, - sqlite.get_filters_value, - key - ) - val = await remove_from_list( - val, - keywords - ) - await initdb( - jid, - sqlite.set_filters_value, - [key, val] - ) - action = ( - "Approved keywords\n" - "```\n{}\n```" - ).format(val) - else: - action = "Missing keywords." - case _ if message_lowercase.startswith("archive"): - key = message_text[:7] - val = message_text[8:] - if val: - try: - if int(val) > 500: - action = "Value may not be greater than 500." - else: - await initdb( - jid, - sqlite.set_settings_value, - [key, val] - ) - action = ( - "Maximum archived items has been set to {}." - ).format(val) - except: - action = "Enter a numeric value only." - else: - action = "Missing value." - case _ if message_lowercase.startswith("deny +"): - key = "filter-" + message_text[:4] - val = message_text[6:] - if val: - keywords = await initdb( - jid, - sqlite.get_filters_value, - key - ) - val = await add_to_list( - val, - keywords - ) - await initdb( - jid, - sqlite.set_filters_value, - [key, val] - ) - action = ( - "Rejected keywords\n" - "```\n{}\n```" - ).format(val) - else: - action = "Missing keywords." - case _ if message_lowercase.startswith("deny -"): - key = "filter-" + message_text[:4] - val = message_text[6:] - if val: - keywords = await initdb( - jid, - sqlite.get_filters_value, - key - ) - val = await remove_from_list( - val, - keywords - ) - await initdb( - jid, - sqlite.set_filters_value, - [key, val] - ) - action = ( - "Rejected keywords\n" - "```\n{}\n```" - ).format(val) - else: - action = "Missing keywords." - case _ if (message_lowercase.startswith("gemini") or - message_lowercase.startswith("gopher:")): - action = "Gemini and Gopher are not supported yet." - case _ if (message_lowercase.startswith("http") or - message_lowercase.startswith("feed:")): - url = message_text - await task.clean_tasks_xmpp( - jid, - ["status"] +def list_feeds_by_query(query, results): + results_list = ( + "Feeds containing '{}':\n\n```" + ).format(query) + counter = 0 + for result in results: + counter += 1 + results_list += ( + "\nName : {}" + "\nURL : {}" + "\nIndex : {}" + "\nMode : {}" + "\n" + ).format(str(result[0]), str(result[1]), + str(result[2]), str(result[3])) + if counter: + return results_list + "\n```\nTotal of {} feeds".format(counter) + else: + return "No feeds were found for: {}".format(query) + + +def list_statistics(values): + """ + Return table statistics. + + Parameters + ---------- + db_file : str + Path to database file. + + Returns + ------- + msg : str + Statistics as message. + """ + msg = ( + "```" + "\nSTATISTICS\n" + "News items : {}/{}\n" + "News sources : {}/{}\n" + "\nOPTIONS\n" + "Items to archive : {}\n" + "Update interval : {}\n" + "Items per update : {}\n" + "Operation status : {}\n" + "```" + ).format(values[0], values[1], values[2], values[3], + values[4], values[5], values[6], values[7]) + return msg + + +async def list_last_entries(results, num): + titles_list = "Recent {} titles:\n\n```".format(num) + counter = 0 + for result in results: + counter += 1 + titles_list += ( + "\n{}\n{}\n" + ).format(str(result[0]), str(result[1])) + if counter: + titles_list += "```\n" + return titles_list + else: + return "There are no news at the moment." + + +async def list_feeds(results): + feeds_list = "\nList of subscriptions:\n\n```\n" + counter = 0 + for result in results: + counter += 1 + feeds_list += ( + "Name : {}\n" + "Address : {}\n" + "Updated : {}\n" + "Status : {}\n" + "ID : {}\n" + "\n" + ).format(str(result[0]), str(result[1]), str(result[2]), + str(result[3]), str(result[4])) + if counter: + return feeds_list + ( + "```\nTotal of {} subscriptions.\n" + ).format(counter) + else: + msg = ( + "List of subscriptions is empty.\n" + "To add feed, send a URL\n" + "Try these:\n" + # TODO Pick random from featured/recommended + "https://reclaimthenet.org/feed/" + ) + return msg + + +async def list_bookmarks(self): + conferences = bookmark.get(self) + groupchat_list = "\nList of groupchats:\n\n```\n" + counter = 0 + for conference in conferences: + counter += 1 + groupchat_list += ( + "{}\n" + "\n" + ).format( + conference["jid"] ) - status_message = ( - "πŸ“«οΈ Processing request to fetch data from {}" - ).format(url) - status.process_task_message(self, jid, status_message) - if url.startswith("feed:"): - url = uri.feed_to_http(url) - # url_alt = await uri.replace_hostname(url, "feed") - # if url_alt: - # url = url_alt - url = (await uri.replace_hostname(url, "feed")) or url - action = await initdb( - jid, - fetcher.add_feed, - url - ) - await task.start_tasks_xmpp( - self, - jid, - ["status"] - ) - # action = "> " + message + "\n" + action - # FIXME Make the taskhandler to update status message - # await refresh_task( - # self, - # jid, - # send_status, - # "status", - # 20 - # ) - # NOTE This would show the number of new unread entries - old = await initdb( - jid, - sqlite.get_settings_value, - "old" - ) - if old: - await task.clean_tasks_xmpp( - jid, - ["status"] - ) - # await send_status(jid) - await task.start_tasks_xmpp( - self, - jid, - ["status"] - ) - else: - await initdb( - jid, - sqlite.mark_source_as_read, - url - ) - case _ if message_lowercase.startswith("feeds"): - query = message_text[6:] - if query: - if len(query) > 3: - action = await initdb( - jid, - sqlite.search_feeds, - query - ) - else: - action = ( - "Enter at least 4 characters to search" - ) - else: - action = await initdb( - jid, - sqlite.list_feeds - ) - case "goodbye": - if message["type"] == "groupchat": - await self.close_muc(jid) - else: - action = "This command is valid for groupchat only." - case _ if message_lowercase.startswith("interval"): - # FIXME - # The following error occurs only upon first attempt to set interval. - # /usr/lib/python3.11/asyncio/events.py:73: RuntimeWarning: coroutine 'Slixfeed.send_update' was never awaited - # self._args = None - # RuntimeWarning: Enable tracemalloc to get the object allocation traceback - key = message_text[:8] - val = message_text[9:] - if val: - # action = ( - # "Updates will be sent every {} minutes." - # ).format(action) - await initdb( - jid, - sqlite.set_settings_value, - [key, val] - ) - # NOTE Perhaps this should be replaced - # by functions clean and start - await task.refresh_task( - self, - jid, - task.send_update, - key, - val - ) - action = ( - "Updates will be sent every {} minutes." - ).format(val) - else: - action = "Missing value." - case _ if message_lowercase.startswith("join"): - muc = uri.check_xmpp_uri(message_text[5:]) - if muc: - "TODO probe JID and confirm it's a groupchat" - await self.join_muc(jid, muc) - action = ( - "Joined groupchat {}" - ).format(message_text) - else: - action = ( - "> {}\nXMPP URI is not valid." - ).format(message_text) - case _ if message_lowercase.startswith("length"): - key = message_text[:6] - val = message_text[7:] - if val: - try: - val = int(val) - await initdb( - jid, - sqlite.set_settings_value, - [key, val] - ) - if val == 0: - action = ( - "Summary length limit is disabled." - ) - else: - action = ( - "Summary maximum length " - "is set to {} characters." - ).format(val) - except: - action = "Enter a numeric value only." - else: - action = "Missing value." - # case _ if message_lowercase.startswith("mastership"): - # key = message_text[:7] - # val = message_text[11:] - # if val: - # names = await initdb( - # jid, - # get_settings_value, - # key - # ) - # val = await add_to_list( - # val, - # names - # ) - # await initdb( - # jid, - # set_settings_value, - # [key, val] - # ) - # action = ( - # "Operators\n" - # "```\n{}\n```" - # ).format(val) - # else: - # action = "Missing value." - case "new": - await initdb( - jid, - sqlite.set_settings_value, - ["old", 0] - ) - action = ( - "Only new items of newly added feeds will be sent." - ) - # TODO Will you add support for number of messages? - case "next": - # num = message_text[5:] - await task.clean_tasks_xmpp( - jid, - ["interval", "status"] - ) - await task.start_tasks_xmpp( - self, - jid, - ["interval", "status"] - ) - # await refresh_task( - # self, - # jid, - # send_update, - # "interval", - # num - # ) - # await refresh_task( - # self, - # jid, - # send_status, - # "status", - # 20 - # ) - # await refresh_task(jid, key, val) - case "old": - await initdb( - jid, - sqlite.set_settings_value, - ["old", 1] - ) - action = ( - "All items of newly added feeds will be sent." - ) - case _ if message_lowercase.startswith("quantum"): - key = message_text[:7] - val = message_text[8:] - if val: - try: - val = int(val) - # action = ( - # "Every update will contain {} news items." - # ).format(action) - await initdb( - jid, - sqlite.set_settings_value, - [key, val] - ) - action = ( - "Next update will contain {} news items." - ).format(val) - except: - action = "Enter a numeric value only." - else: - action = "Missing value." - case "random": - # TODO /questions/2279706/select-random-row-from-a-sqlite-table - # NOTE sqlitehandler.get_entry_unread - action = "Updates will be sent by random order." - case _ if message_lowercase.startswith("read"): - data = message_text[5:] - data = data.split() - url = data[0] - await task.clean_tasks_xmpp( - jid, - ["status"] - ) - status_message = ( - "πŸ“«οΈ Processing request to fetch data from {}" - ).format(url) - status.process_task_message(self, jid, status_message) - if url.startswith("feed:"): - url = uri.feed_to_http(url) - url = (await uri.replace_hostname(url, "feed")) or url - match len(data): - case 1: - if url.startswith("http"): - action = await fetcher.view_feed(url) - else: - action = "Missing URL." - case 2: - num = data[1] - if url.startswith("http"): - action = await fetcher.view_entry(url, num) - else: - action = "Missing URL." - case _: - action = ( - "Enter command as follows:\n" - "`read ` or `read `\n" - "URL must not contain white space." - ) - await task.start_tasks_xmpp( - self, - jid, - ["status"] - ) - case _ if message_lowercase.startswith("recent"): - num = message_text[7:] - if num: - try: - num = int(num) - if num < 1 or num > 50: - action = "Value must be ranged from 1 to 50." - else: - action = await initdb( - jid, - sqlite.last_entries, - num - ) - except: - action = "Enter a numeric value only." - else: - action = "Missing value." - # NOTE Should people be asked for numeric value? - case _ if message_lowercase.startswith("remove"): - ix = message_text[7:] - if ix: - action = await initdb( - jid, - sqlite.remove_feed, - ix - ) - # await refresh_task( - # self, - # jid, - # send_status, - # "status", - # 20 - # ) - await task.clean_tasks_xmpp( - jid, - ["status"] - ) - await task.start_tasks_xmpp( - self, - jid, - ["status"] - ) - else: - action = "Missing feed ID." - case _ if message_lowercase.startswith("reset"): - source = message_text[6:] - await task.clean_tasks_xmpp( - jid, - ["status"] - ) - status_message = ( - "πŸ“«οΈ Marking entries as read..." - ) - status.process_task_message(self, jid, status_message) - if source: - await initdb( - jid, - sqlite.mark_source_as_read, - source - ) - action = ( - "All entries of {} have been " - "marked as read.".format(source) - ) - else: - await initdb( - jid, - sqlite.mark_all_as_read - ) - action = "All entries have been marked as read." - await task.start_tasks_xmpp( - self, - jid, - ["status"] - ) - case _ if message_lowercase.startswith("search"): - query = message_text[7:] - if query: - if len(query) > 1: - action = await initdb( - jid, - sqlite.search_entries, - query - ) - else: - action = ( - "Enter at least 2 characters to search" - ) - else: - action = "Missing search query." - case "start": - # action = "Updates are enabled." - key = "enabled" - val = 1 - await initdb( - jid, - sqlite.set_settings_value, - [key, val] - ) - # asyncio.create_task(task_jid(self, jid)) - await task.start_tasks_xmpp( - self, - jid, - ["interval", "status", "check"] - ) - action = "Updates are enabled." - # print(current_time(), "task_manager[jid]") - # print(task_manager[jid]) - case "stats": - action = await initdb( - jid, - sqlite.statistics - ) - case _ if message_lowercase.startswith("status "): - ix = message_text[7:] - action = await initdb( - jid, - sqlite.toggle_status, - ix - ) - case "stop": - # FIXME - # The following error occurs only upon first attempt to stop. - # /usr/lib/python3.11/asyncio/events.py:73: RuntimeWarning: coroutine 'Slixfeed.send_update' was never awaited - # self._args = None - # RuntimeWarning: Enable tracemalloc to get the object allocation traceback - # action = "Updates are disabled." - # try: - # # task_manager[jid]["check"].cancel() - # # task_manager[jid]["status"].cancel() - # task_manager[jid]["interval"].cancel() - # key = "enabled" - # val = 0 - # action = await initdb( - # jid, - # set_settings_value, - # [key, val] - # ) - # except: - # action = "Updates are already disabled." - # # print("Updates are already disabled. Nothing to do.") - # # await send_status(jid) - key = "enabled" - val = 0 - await initdb( - jid, - sqlite.set_settings_value, - [key, val] - ) - await task.clean_tasks_xmpp( - jid, - ["interval", "status"] - ) - self.send_presence( - pshow="xa", - pstatus="πŸ’‘οΈ Send \"Start\" to receive Jabber news", - pto=jid, - ) - action = "Updates are disabled." - case "support": - # TODO Send an invitation. - action = "Join xmpp:slixfeed@chat.woodpeckersnest.space?join" - case _ if message_lowercase.startswith("xmpp:"): - muc = uri.check_xmpp_uri(message_text) - if muc: - "TODO probe JID and confirm it's a groupchat" - await self.join_muc(jid, muc) - action = ( - "Joined groupchat {}" - ).format(message_text) - else: - action = ( - "> {}\nXMPP URI is not valid." - ).format(message_text) - case _: - action = ( - "Unknown command. " - "Press \"help\" for list of commands" - ) - # TODO Use message correction here - # NOTE This might not be a good idea if - # commands are sent one close to the next - if action: message.reply(action).send() + groupchat_list += ( + "```\nTotal of {} groupchats.\n" + ).format(counter) + return groupchat_list \ No newline at end of file diff --git a/slixfeed/xmpp/connect.py b/slixfeed/xmpp/connect.py index 9e2aabc..831b5b2 100644 --- a/slixfeed/xmpp/connect.py +++ b/slixfeed/xmpp/connect.py @@ -1,11 +1,15 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +from slixfeed.config import get_value from slixfeed.datetime import current_time from time import sleep +import logging -async def recover_connection(self, event): +async def recover_connection(self, event, message): + logging.warning(message) + print(current_time(), message, "Attempting to reconnect.") self.connection_attempts += 1 # if self.connection_attempts <= self.max_connection_attempts: # self.reconnect(wait=5.0) # wait a bit before attempting to reconnect @@ -13,7 +17,7 @@ async def recover_connection(self, event): # print(current_time(),"Maximum connection attempts exceeded.") # logging.error("Maximum connection attempts exceeded.") print(current_time(), "Attempt number", self.connection_attempts) - seconds = 30 + seconds = int(get_value("accounts", "XMPP Connect", "reconnect_timeout")) print(current_time(), "Next attempt within", seconds, "seconds") # NOTE asyncio.sleep doesn't interval as expected # await asyncio.sleep(seconds) diff --git a/slixfeed/xmpp/muc.py b/slixfeed/xmpp/muc.py index 62727d0..8c3d4ba 100644 --- a/slixfeed/xmpp/muc.py +++ b/slixfeed/xmpp/muc.py @@ -14,8 +14,39 @@ TODO """ import slixfeed.xmpp.bookmark as bookmark +import slixfeed.xmpp.process as process +from slixfeed.datetime import current_time -async def join_groupchat(self, inviter, muc_jid): +# async def accept_muc_invite(self, message, ctr=None): +# # if isinstance(message, str): +# if not ctr: +# ctr = message["from"].bare +# jid = message['groupchat_invite']['jid'] +# else: +# jid = message +async def accept_invitation(self, message): + # operator muc_chat + inviter = message["from"].bare + muc_jid = message['groupchat_invite']['jid'] + await join(self, inviter, muc_jid) + + +async def autojoin(self, event): + result = await self.plugin['xep_0048'].get_bookmarks() + bookmarks = result["private"]["bookmarks"] + conferences = bookmarks["conferences"] + for conference in conferences: + if conference["autojoin"]: + muc_jid = conference["jid"] + print(current_time(), "Autojoining groupchat", muc_jid) + self.plugin['xep_0045'].join_muc( + muc_jid, + self.nick, + # If a room password is needed, use: + # password=the_room_password, + ) + +async def join(self, inviter, muc_jid): # token = await initdb( # muc_jid, # get_settings_value, @@ -43,23 +74,10 @@ async def join_groupchat(self, inviter, muc_jid): # password=the_room_password, ) await bookmark.add(self, muc_jid) - messages = [ - "Greetings!", - "I'm {}, the news anchor.".format(self.nick), - "My job is to bring you the latest news " - "from sources you provide me with.", - "You may always reach me via " - "xmpp:{}?message".format(self.boundjid.bare) - ] - for message in messages: - self.send_message( - mto=muc_jid, - mbody=message, - mtype="groupchat" - ) + process.greet(self, muc_jid, chat_type="groupchat") -async def close_groupchat(self, muc_jid): +async def leave(self, muc_jid): messages = [ "Whenever you need an RSS service again, " "please don’t hesitate to contact me.", diff --git a/slixfeed/xmpp/process.py b/slixfeed/xmpp/process.py new file mode 100644 index 0000000..27e5441 --- /dev/null +++ b/slixfeed/xmpp/process.py @@ -0,0 +1,757 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" + +TODO + +1) Deprecate "add" (see above) and make it interactive. + Slixfeed: Do you still want to add this URL to subscription list? + See: case _ if message_lowercase.startswith("add"): + +2) If subscription is inadequate (see state.request), send a message that says so. + + elif not self.client_roster[jid]["to"]: + breakpoint() + message.reply("Share online status to activate bot.").send() + return + +""" + +import os +from slixfeed.config import ( + add_to_list, + get_default_dbdir, + get_value, + get_pathname_to_database, + remove_from_list) +from slixfeed.datetime import current_time +import slixfeed.fetch as fetcher +import slixfeed.sqlite as sqlite +import slixfeed.task as task +import slixfeed.utility as utility +import slixfeed.url as uri +import slixfeed.xmpp.bookmark as bookmark +import slixfeed.xmpp.compose as compose +import slixfeed.xmpp.muc as groupchat +import slixfeed.xmpp.status as status +import slixfeed.xmpp.text as text + + +async def event(self, event): + """ + Process the session_start event. + + Typical actions for the session_start event are + requesting the roster and broadcasting an initial + presence stanza. + + Arguments: + event -- An empty dictionary. The session_start + event does not provide any additional + data. + """ + self.send_presence() + await self.get_roster() + + # for task in main_task: + # task.cancel() + + # Deprecated in favour of event "presence_available" + # if not main_task: + # await select_file() + + +async def message(self, message): + """ + Process incoming message stanzas. Be aware that this also + includes MUC messages and error messages. It is usually + a good practice to check the messages's type before + processing or sending replies. + + Parameters + ---------- + message : str + The received message stanza. See the documentation + for stanza objects and the Message stanza to see + how it may be used. + """ + # print("message") + # print(message) + if message["type"] in ("chat", "groupchat", "normal"): + jid = message["from"].bare + if message["type"] == "groupchat": + # nick = message["from"][message["from"].index("/")+1:] + nick = str(message["from"]) + nick = nick[nick.index("/")+1:] + if (message['muc']['nick'] == self.nick or + not message["body"].startswith("!")): + return + # token = await initdb( + # jid, + # get_settings_value, + # "token" + # ) + # if token == "accepted": + # operator = await initdb( + # jid, + # get_settings_value, + # "masters" + # ) + # if operator: + # if nick not in operator: + # return + # approved = False + jid_full = str(message["from"]) + role = self.plugin['xep_0045'].get_jid_property( + jid, + jid_full[jid_full.index("/")+1:], + "role") + if role != "moderator": + return + # if role == "moderator": + # approved = True + # TODO Implement a list of temporary operators + # Once an operator is appointed, the control would last + # untile the participant has been disconnected from MUC + # An operator is a function to appoint non moderators. + # Changing nickname is fine and consist of no problem. + # if not approved: + # operator = await initdb( + # jid, + # get_settings_value, + # "masters" + # ) + # if operator: + # if nick in operator: + # approved = True + # if not approved: + # return + + # # Begin processing new JID + # # Deprecated in favour of event "presence_available" + # db_dir = get_default_dbdir() + # os.chdir(db_dir) + # if jid + ".db" not in os.listdir(): + # await task_jid(jid) + + # await compose.message(self, jid, message) + + message_text = " ".join(message["body"].split()) + if message["type"] == "groupchat": + message_text = message_text[1:] + message_lowercase = message_text.lower() + + print(current_time(), "ACCOUNT: " + str(message["from"])) + print(current_time(), "COMMAND:", message_text) + + match message_lowercase: + case "breakpoint": + breakpoint() + case "commands": + response = text.print_cmd() + send_reply_message(self, message, response) + case "help": + response = text.print_help() + send_reply_message(self, message, response) + case "info": + response = text.print_info() + send_reply_message(self, message, response) + case _ if message_lowercase in [ + "greetings", "hallo", "hello", "hey", + "hi", "hola", "holla", "hollo"]: + response = ( + "Greeting!\n" + "I'm Slixfeed, an RSS News Bot!\n" + "Send \"help\" for instructions.\n" + ) + send_reply_message(self, message, response) + # print("task_manager[jid]") + # print(task_manager[jid]) + await self.get_roster() + print("roster 1") + print(self.client_roster) + print("roster 2") + print(self.client_roster.keys()) + print("jid") + print(jid) + + # case _ if message_lowercase.startswith("activate"): + # if message["type"] == "groupchat": + # acode = message[9:] + # token = await initdb( + # jid, + # get_settings_value, + # "token" + # ) + # if int(acode) == token: + # await initdb( + # jid, + # set_settings_value, + # ["masters", nick] + # ) + # await initdb( + # jid, + # set_settings_value, + # ["token", "accepted"] + # ) + # response = "{}, your are in command.".format(nick) + # else: + # response = "Activation code is not valid." + # else: + # response = "This command is valid for groupchat only." + case _ if message_lowercase.startswith("add"): + message_text = message_text[4:] + url = message_text.split(" ")[0] + title = " ".join(message_text.split(" ")[1:]) + if url.startswith("http"): + db_file = get_pathname_to_database(jid) + response = await fetcher.add_feed_no_check(db_file, [url, title]) + old = await sqlite.get_settings_value(db_file, "old") + if old: + await task.clean_tasks_xmpp(jid, ["status"]) + # await send_status(jid) + await task.start_tasks_xmpp(self, jid, ["status"]) + else: + db_file = get_pathname_to_database(jid) + await sqlite.mark_source_as_read(db_file, url) + else: + response = "Missing URL." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("allow +"): + key = "filter-" + message_text[:5] + val = message_text[7:] + if val: + db_file = get_pathname_to_database(jid) + keywords = await sqlite.get_filters_value(db_file, key) + val = await add_to_list(val, keywords) + await sqlite.set_filters_value(db_file, [key, val]) + response = ( + "Approved keywords\n" + "```\n{}\n```" + ).format(val) + else: + response = "Missing keywords." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("allow -"): + key = "filter-" + message_text[:5] + val = message_text[7:] + if val: + db_file = get_pathname_to_database(jid) + keywords = await sqlite.get_filters_value(db_file, key) + val = await remove_from_list(val, keywords) + await sqlite.set_filters_value(db_file, [key, val]) + response = ( + "Approved keywords\n" + "```\n{}\n```" + ).format(val) + else: + response = "Missing keywords." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("archive"): + key = message_text[:7] + val = message_text[8:] + if val: + try: + if int(val) > 500: + response = "Value may not be greater than 500." + else: + db_file = get_pathname_to_database(jid) + await sqlite.set_settings_value(db_file, [key, val]) + response = ( + "Maximum archived items has been set to {}." + ).format(val) + except: + response = "Enter a numeric value only." + else: + response = "Missing value." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("bookmark - "): + if jid == get_value("accounts", "XMPP", "operator"): + muc_jid = message_text[11:] + await bookmark.remove(self, muc_jid) + response = ( + "Groupchat {} has been removed from bookmarks." + ).format(muc_jid) + else: + response = ( + "This response is restricted. " + "Type: removing bookmarks." + ) + send_reply_message(self, message, response) + case "bookmarks": + response = await compose.list_bookmarks(self) + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("deny +"): + key = "filter-" + message_text[:4] + val = message_text[6:] + if val: + db_file = get_pathname_to_database(jid) + keywords = await sqlite.get_filters_value(db_file, key) + val = await add_to_list(val, keywords) + await sqlite.set_filters_value(db_file, [key, val]) + response = ( + "Rejected keywords\n" + "```\n{}\n```" + ).format(val) + else: + response = "Missing keywords." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("deny -"): + key = "filter-" + message_text[:4] + val = message_text[6:] + if val: + db_file = get_pathname_to_database(jid) + keywords = await sqlite.get_filters_value(db_file, key) + val = await remove_from_list(val, keywords) + await sqlite.set_filters_value(db_file, [key, val]) + response = ( + "Rejected keywords\n" + "```\n{}\n```" + ).format(val) + else: + response = "Missing keywords." + send_reply_message(self, message, response) + case _ if (message_lowercase.startswith("gemini") or + message_lowercase.startswith("gopher:")): + response = "Gemini and Gopher are not supported yet." + send_reply_message(self, message, response) + case _ if (message_lowercase.startswith("http") or + message_lowercase.startswith("feed:")): + url = message_text + await task.clean_tasks_xmpp(jid, ["status"]) + status_type = "dnd" + status_message = ( + "πŸ“«οΈ Processing request to fetch data from {}" + ).format(url) + send_status_message(self, jid, status_type, status_message) + send_reply_message(self, message, response) + if url.startswith("feed:"): + url = uri.feed_to_http(url) + # url_alt = await uri.replace_hostname(url, "feed") + # if url_alt: + # url = url_alt + url = (uri.replace_hostname(url, "feed")) or url + db_file = get_pathname_to_database(jid) + response = await fetcher.add_feed(db_file, url) + await task.start_tasks_xmpp(self, jid, ["status"]) + # response = "> " + message + "\n" + response + # FIXME Make the taskhandler to update status message + # await refresh_task( + # self, + # jid, + # send_status, + # "status", + # 20 + # ) + # NOTE This would show the number of new unread entries + old = await sqlite.get_settings_value(db_file, "old") + if old: + await task.clean_tasks_xmpp(jid, ["status"]) + # await send_status(jid) + await task.start_tasks_xmpp(self, jid, ["status"]) + else: + db_file = get_pathname_to_database(jid) + await sqlite.mark_source_as_read(db_file, url) + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("feeds"): + query = message_text[6:] + if query: + if len(query) > 3: + db_file = get_pathname_to_database(jid) + result = await sqlite.search_feeds(db_file, query) + response = compose.list_feeds_by_query(query, result) + else: + response = ( + "Enter at least 4 characters to search" + ) + else: + db_file = get_pathname_to_database(jid) + result = await sqlite.get_feeds(db_file) + response = compose.list_feeds(result) + send_reply_message(self, message, response) + case "goodbye": + if message["type"] == "groupchat": + await groupchat.leave(self, jid) + else: + response = "This command is valid for groupchat only." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("interval"): + # FIXME + # The following error occurs only upon first attempt to set interval. + # /usr/lib/python3.11/asyncio/events.py:73: RuntimeWarning: coroutine 'Slixfeed.send_update' was never awaited + # self._args = None + # RuntimeWarning: Enable tracemalloc to get the object allocation traceback + key = message_text[:8] + val = message_text[9:] + if val: + # response = ( + # "Updates will be sent every {} minutes." + # ).format(response) + db_file = get_pathname_to_database(jid) + await sqlite.set_settings_value(db_file, [key, val]) + # NOTE Perhaps this should be replaced + # by functions clean and start + await task.refresh_task( + self, jid, task.send_update, key, val) + response = ( + "Updates will be sent every {} minutes." + ).format(val) + else: + response = "Missing value." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("join"): + muc_jid = uri.check_xmpp_uri(message_text[5:]) + if muc_jid: + # TODO probe JID and confirm it's a groupchat + await groupchat.join(self, jid, muc_jid) + response = ( + "Joined groupchat {}" + ).format(message_text) + else: + response = ( + "> {}\nXMPP URI is not valid." + ).format(message_text) + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("length"): + key = message_text[:6] + val = message_text[7:] + if val: + try: + val = int(val) + db_file = get_pathname_to_database(jid) + await sqlite.set_settings_value(db_file, [key, val]) + if val == 0: + response = ( + "Summary length limit is disabled." + ) + else: + response = ( + "Summary maximum length " + "is set to {} characters." + ).format(val) + except: + response = "Enter a numeric value only." + else: + response = "Missing value." + # case _ if message_lowercase.startswith("mastership"): + # key = message_text[:7] + # val = message_text[11:] + # if val: + # names = await initdb( + # jid, + # get_settings_value, + # key + # ) + # val = await add_to_list( + # val, + # names + # ) + # await initdb( + # jid, + # set_settings_value, + # [key, val] + # ) + # response = ( + # "Operators\n" + # "```\n{}\n```" + # ).format(val) + # else: + # response = "Missing value." + send_reply_message(self, message, response) + case "new": + db_file = get_pathname_to_database(jid) + sqlite.set_settings_value(db_file, ["old", 0]) + response = ( + "Only new items of newly added feeds will be sent." + ) + send_reply_message(self, message, response) + # TODO Will you add support for number of messages? + case "next": + # num = message_text[5:] + await task.clean_tasks_xmpp(jid, ["interval", "status"]) + await task.start_tasks_xmpp(self, jid, ["interval", "status"]) + # await refresh_task( + # self, + # jid, + # send_update, + # "interval", + # num + # ) + # await refresh_task( + # self, + # jid, + # send_status, + # "status", + # 20 + # ) + # await refresh_task(jid, key, val) + case "old": + db_file = get_pathname_to_database(jid) + await sqlite.set_settings_value(db_file, ["old", 1]) + response = ( + "All items of newly added feeds will be sent." + ) + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("quantum"): + key = message_text[:7] + val = message_text[8:] + if val: + try: + val = int(val) + # response = ( + # "Every update will contain {} news items." + # ).format(response) + db_file = get_pathname_to_database(jid) + await sqlite.set_settings_value(db_file, [key, val]) + response = ( + "Next update will contain {} news items." + ).format(val) + except: + response = "Enter a numeric value only." + else: + response = "Missing value." + send_reply_message(self, message, response) + case "random": + # TODO /questions/2279706/select-random-row-from-a-sqlite-table + # NOTE sqlitehandler.get_entry_unread + response = "Updates will be sent by random order." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("read"): + data = message_text[5:] + data = data.split() + url = data[0] + await task.clean_tasks_xmpp(jid, ["status"]) + status_type = "dnd" + status_message = ( + "πŸ“«οΈ Processing request to fetch data from {}" + ).format(url) + send_status_message(self, jid, status_type, status_message) + if url.startswith("feed:"): + url = uri.feed_to_http(url) + url = (uri.replace_hostname(url, "feed")) or url + match len(data): + case 1: + if url.startswith("http"): + response = await fetcher.view_feed(url) + else: + response = "Missing URL." + case 2: + num = data[1] + if url.startswith("http"): + response = await fetcher.view_entry(url, num) + else: + response = "Missing URL." + case _: + response = ( + "Enter command as follows:\n" + "`read ` or `read `\n" + "URL must not contain white space." + ) + send_reply_message(self, message, response) + await task.start_tasks_xmpp(self, jid, ["status"]) + case _ if message_lowercase.startswith("recent"): + num = message_text[7:] + if num: + try: + num = int(num) + if num < 1 or num > 50: + response = "Value must be ranged from 1 to 50." + else: + db_file = get_pathname_to_database(jid) + result = await sqlite.last_entries(db_file, num) + response = compose.list_last_entries(result, num) + except: + response = "Enter a numeric value only." + else: + response = "Missing value." + send_reply_message(self, message, response) + # NOTE Should people be asked for numeric value? + case _ if message_lowercase.startswith("remove"): + ix = message_text[7:] + if ix: + db_file = get_pathname_to_database(jid) + response = await sqlite.remove_feed(db_file, ix) + # await refresh_task( + # self, + # jid, + # send_status, + # "status", + # 20 + # ) + await task.clean_tasks_xmpp(jid, ["status"]) + await task.start_tasks_xmpp(self, jid, ["status"]) + else: + response = "Missing feed ID." + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("reset"): + source = message_text[6:] + await task.clean_tasks_xmpp(jid, ["status"]) + status_type = "dnd" + status_message = "πŸ“«οΈ Marking entries as read..." + send_status_message(self, jid, status_type, status_message) + if source: + db_file = get_pathname_to_database(jid) + await sqlite.mark_source_as_read(db_file, source) + response = ( + "All entries of {} have been " + "marked as read.".format(source) + ) + else: + db_file = get_pathname_to_database(jid) + await sqlite.mark_all_as_read(db_file) + response = "All entries have been marked as read." + send_reply_message(self, message, response) + await task.start_tasks_xmpp(self, jid, ["status"]) + case _ if message_lowercase.startswith("search"): + query = message_text[7:] + if query: + if len(query) > 1: + db_file = get_pathname_to_database(jid) + results = await sqlite.search_entries(db_file, query) + response = compose.list_search_results(query, results) + else: + response = ( + "Enter at least 2 characters to search" + ) + else: + response = "Missing search query." + send_reply_message(self, message, response) + case "start": + # response = "Updates are enabled." + key = "enabled" + val = 1 + db_file = get_pathname_to_database(jid) + await sqlite.set_settings_value(db_file, [key, val]) + # asyncio.create_task(task_jid(self, jid)) + await task.start_tasks_xmpp(self, jid, ["interval", "status", "check"]) + response = "Updates are enabled." + # print(current_time(), "task_manager[jid]") + # print(task_manager[jid]) + send_reply_message(self, message, response) + case "stats": + db_file = get_pathname_to_database(jid) + result = await sqlite.statistics(db_file) + response = compose.list_statistics(result) + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("status "): + ix = message_text[7:] + db_file = get_pathname_to_database(jid) + response = await sqlite.toggle_status(db_file, ix) + send_reply_message(self, message, response) + case "stop": + # FIXME + # The following error occurs only upon first attempt to stop. + # /usr/lib/python3.11/asyncio/events.py:73: RuntimeWarning: coroutine 'Slixfeed.send_update' was never awaited + # self._args = None + # RuntimeWarning: Enable tracemalloc to get the object allocation traceback + # response = "Updates are disabled." + # try: + # # task_manager[jid]["check"].cancel() + # # task_manager[jid]["status"].cancel() + # task_manager[jid]["interval"].cancel() + # key = "enabled" + # val = 0 + # response = await initdb( + # jid, + # set_settings_value, + # [key, val] + # ) + # except: + # response = "Updates are already disabled." + # # print("Updates are already disabled. Nothing to do.") + # # await send_status(jid) + key = "enabled" + val = 0 + db_file = get_pathname_to_database(jid) + await sqlite.set_settings_value(db_file, [key, val]) + await task.clean_tasks_xmpp(jid, ["interval", "status"]) + response = "Updates are disabled." + send_reply_message(self, message, response) + status_type = "xa" + status_message = "πŸ’‘οΈ Send \"Start\" to receive Jabber updates" + send_status_message(self, jid, status_type, status_message) + case "support": + # TODO Send an invitation. + response = ( + "Join xmpp:slixfeed@chat.woodpeckersnest.space?join") + send_reply_message(self, message, response) + case _ if message_lowercase.startswith("xmpp:"): + muc_jid = uri.check_xmpp_uri(message_text) + if muc_jid: + # TODO probe JID and confirm it's a groupchat + await groupchat.join(self, jid, muc_jid) + response = ( + "Joined groupchat {}" + ).format(message_text) + else: + response = ( + "> {}\nXMPP URI is not valid." + ).format(message_text) + send_reply_message(self, message, response) + case _: + response = ( + "Unknown command. " + "Press \"help\" for list of commands" + ) + send_reply_message(self, message, response) + # TODO Use message correction here + # NOTE This might not be a good idea if + # commands are sent one close to the next + # if response: message.reply(response).send() + + log_dir = get_default_dbdir() + if not os.path.isdir(log_dir): + os.mkdir(log_dir) + utility.log_as_markdown( + current_time(), os.path.join(log_dir, jid), + jid, message_text) + utility.log_as_markdown( + current_time(), os.path.join(log_dir, jid), + self.boundjid.bare, response) + + +def send_status_message(self, jid, status_type, status_message): + self.send_presence( + pshow=status_type, + pstatus=status_message, + pto=jid) + + +def send_reply_message(self, message, response): + message.reply(response).send() + + +# def greet(self, jid, chat_type="chat"): +# messages = [ +# "Greetings!", +# "I'm {}, the news anchor.".format(self.nick), +# "My job is to bring you the latest news " +# "from sources you provide me with.", +# "You may always reach me via " +# "xmpp:{}?message".format(self.boundjid.bare) +# ] +# for message in messages: +# self.send_message( +# mto=jid, +# mbody=message, +# mtype=chat_type +# ) + + +def greet(self, jid, chat_type="chat"): + message = ( + "Greetings!\n" + "I'm {}, the news anchor.\n" + "My job is to bring you the latest " + "news from sources you provide me with.\n" + "You may always reach me via xmpp:{}?message").format( + self.nick, + self.boundjid.bare + ) + self.send_message( + mto=jid, + mbody=message, + mtype=chat_type + ) + diff --git a/slixfeed/xmpp/roster.py b/slixfeed/xmpp/roster.py new file mode 100644 index 0000000..09f33f4 --- /dev/null +++ b/slixfeed/xmpp/roster.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" + +TODO + +1) remove_subscription (clean_roster) + Remove presence from contacts that don't share presence. + +""" + +import slixfeed.xmpp.utility as utility + +async def add(self, jid): + """ + Add JID to roster. + + Parameters + ---------- + jid : str + Jabber ID. + + Returns + ------- + None. + """ + if await utility.jid_type(self, jid) == "groupchat": + # Check whether JID is in bookmarks; otherwise, add it. + print(jid, "is muc") + return + else: + await self.get_roster() + # Check whether JID is in roster; otherwise, add it. + if jid not in self.client_roster.keys(): + self.send_presence_subscription( + pto=jid, + ptype="subscribe", + pnick=self.nick + ) + self.update_roster( + jid, + subscription="both" + ) + + +def remove_subscription(self): + """ + Remove subscription from contacts that don't share their presence. + + Returns + ------- + None. + """ \ No newline at end of file diff --git a/slixfeed/xmpp/state.py b/slixfeed/xmpp/state.py new file mode 100644 index 0000000..b0e724f --- /dev/null +++ b/slixfeed/xmpp/state.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + + +async def request(self, jid): + """ + Ask contant to settle subscription. + + Parameters + ---------- + jid : str + Jabber ID. + + Returns + ------- + None. + """ + # Check whether JID is subscribed; otherwise, ask for presence. + if not self.client_roster[jid]["to"]: + self.send_presence_subscription( + pto=jid, + pfrom=self.boundjid.bare, + ptype="subscribe", + pnick=self.nick + ) + self.send_message( + mto=jid, + # mtype="headline", + msubject="RSS News Bot", + mbody=( + "Share online status to receive updates." + ), + mfrom=self.boundjid.bare, + mnick=self.nick + ) + self.send_presence( + pto=jid, + pfrom=self.boundjid.bare, + # Accept symbol πŸ‰‘οΈ πŸ‘οΈ ✍ + pstatus=( + "βœ’οΈ Share online status to receive updates." + ), + # ptype="subscribe", + pnick=self.nick + ) + + +async def unsubscribed(self, presence): + jid = presence["from"].bare + self.send_message( + mto=jid, + mbody="You have been unsubscribed." + ) + self.send_presence( + pto=jid, + pfrom=self.boundjid.bare, + pstatus="πŸ–‹οΈ Subscribe to receive updates", + pnick=self.nick + ) + self.update_roster( + jid, + subscription="remove" + ) diff --git a/slixfeed/xmpp/utility.py b/slixfeed/xmpp/utility.py new file mode 100644 index 0000000..f784e43 --- /dev/null +++ b/slixfeed/xmpp/utility.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from slixfeed.datetime import current_time +from slixmpp.exceptions import IqTimeout +import logging + +async def jid_type(self, jid): + """ + Check whether a JID is of MUC. + + Parameters + ---------- + jid : str + Jabber ID. + + Returns + ------- + str + "chat" or "groupchat. + """ + try: + iqresult = await self["xep_0030"].get_info(jid=jid) + features = iqresult["disco_info"]["features"] + # identity = iqresult['disco_info']['identities'] + # if 'account' in indentity: + # if 'conference' in indentity: + if 'http://jabber.org/protocol/muc' in features: + return "groupchat" + # TODO elif + # NOTE Is it needed? We do not interact with gateways or services + else: + return "chat" + # TODO Test whether this exception is realized + except IqTimeout as e: + messages = [ + ("Timeout IQ"), + ("IQ Stanza:", e), + ("Jabber ID:", jid) + ] + for message in messages: + print(current_time(), message) + logging.error(current_time(), message) \ No newline at end of file