From 7e5a15d57b5946466dbae2a1673b073f81017149 Mon Sep 17 00:00:00 2001 From: Schimon Jehudah Date: Tue, 10 Oct 2023 13:41:20 +0000 Subject: [PATCH] - Add TaskGroups so all JIDs be served co-routinely - Add Settings --- slixfeed/__main__.py | 227 +++++++++++++++++++++++-------------------- 1 file changed, 121 insertions(+), 106 deletions(-) diff --git a/slixfeed/__main__.py b/slixfeed/__main__.py index 2c02d41..c3b770b 100644 --- a/slixfeed/__main__.py +++ b/slixfeed/__main__.py @@ -55,9 +55,9 @@ class Slixfeed(slixmpp.ClientXMPP): # listen for this event so that we we can initialize # our roster. self.add_event_handler("session_start", self.start) - self.add_event_handler("session_start", self.send_update) - self.add_event_handler("session_start", self.send_status) - self.add_event_handler("session_start", self.check_updates) + self.add_event_handler("session_start", self.select_file) + # self.add_event_handler("session_start", self.send_status) + # self.add_event_handler("session_start", self.check_updates) # The message event is triggered whenever a message # stanza is received. Be aware that that includes @@ -98,51 +98,47 @@ class Slixfeed(slixmpp.ClientXMPP): if message.lower().startswith('help'): action = print_help() # NOTE: Might not need it - elif message.lower().startswith('recent '): - action = await initdb(msg['from'].bare, database.last_entries, message[7:]) - elif message.lower().startswith('search '): - action = await initdb( msg['from'].bare, database.search_entries, message[7:]) - elif message.lower().startswith('list'): - action = await initdb(msg['from'].bare, database.list_subscriptions) elif message.lower().startswith('add '): action = await initdb(msg['from'].bare, add_feed, message[4:]) + # action = "> " + message + "\n" + action + elif message.lower().startswith('quantum '): + action = await initdb(msg['from'].bare, database.set_settings_value, [message[:7], message[8:]]) + # action = "Every update will contain {} news items.".format(action) + elif message.lower().startswith('disable'): + action = await initdb(msg['from'].bare, database.set_settings_value, message) + # action = "Updates are disabled." + elif message.lower().startswith('enable'): + action = await initdb(msg['from'].bare, database.set_settings_value, message) + # action = "Updates are enabled." + elif message.lower().startswith('interval '): + action = await initdb(msg['from'].bare, database.set_settings_value, [message[:8], message[9:]]) + # action = "Updates will be sent every {} minutes.".format(action) + elif message.lower().startswith('list'): + action = await initdb(msg['from'].bare, database.list_subscriptions) + elif message.lower().startswith('recent '): + action = await initdb(msg['from'].bare, database.last_entries, message[7:]) elif message.lower().startswith('remove '): action = await initdb(msg['from'].bare, database.remove_feed, message[7:]) + elif message.lower().startswith('search '): + action = await initdb( msg['from'].bare, database.search_entries, message[7:]) elif message.lower().startswith('status '): action = await initdb(msg['from'].bare, database.toggle_status, message[7:]) elif message.lower().startswith('unread'): action = await initdb(msg['from'].bare, database.statistics) - elif message.lower().startswith('enable'): - action = toggle_state(msg['from'].bare, True) - elif message.lower().startswith('disable'): - action = toggle_state(msg['from'].bare, False) else: - action = 'Unknown command. Press "help" for list of commands' + action = "Unknown command. Press \"help\" for list of commands" msg.reply(action).send() print("COMMAND:", message) print("ACCOUNT: " + str(msg['from'])) - async def check_updates(self, event): - while True: - print("Checking update") - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - msg = ("Slixfeed can not work without a database. \n" - "To create a database, follow these steps: \n" - "Add Slixfeed contact to your roster \n" - "Send a feed to the bot by: \n" - "feed add https://reclaimthenet.org/feed/") - print(msg) - else: - files = os.listdir(db_dir) - for file in files: - jid = file[:-3] - print("download_updates",jid) - await initdb(jid, download_updates) - await asyncio.sleep(90) - - async def send_update(self, event): + async def select_file(self, event): + """ + Initiate actions by JID (Jabber ID). + + :param self: + :param event: + """ while True: db_dir = get_default_dbdir() if not os.path.isdir(db_dir): @@ -155,62 +151,87 @@ class Slixfeed(slixmpp.ClientXMPP): else: os.chdir(db_dir) files = os.listdir() - for file in files: - if not file.endswith('.db-jour.db'): - jid = file[:-3] - print("get_entry_unread",jid) + async with asyncio.TaskGroup() as tg: + for file in files: + if file.endswith('.db') and not file.endswith('.db-jour.db'): + jid = file[:-3] + tg.create_task(self.jid(event, jid)) - new = await initdb( - jid, - database.get_entry_unread - ) + async def jid(self, event, jid): + """ + JID (Jabber ID) task manager. + + :param self: + :param event: + :param jid: Jabber ID + """ + enabled = await initdb(jid, database.get_settings_value, 'enabled') + print("enabled", enabled, jid) + if enabled: + async with asyncio.TaskGroup() as tg: + tg.create_task(self.check_updates(event, jid)) + tg.create_task(self.send_update(event, jid)) + tg.create_task(self.send_status(event, jid)) - if new: - msg = self.send_message( - mto=jid, - mbody=new, - mtype='chat' - ) - - unread = await initdb( - jid, - database.get_number_of_entries_unread - ) - - if unread: - msg_status = ('📰 News items:', str(unread)) - msg_status = ' '.join(msg_status) - else: - msg_status = '🗞 No News' - - print(msg_status, 'for', jid) - - # Send status message - self.send_presence( - pstatus=msg_status, - pto=jid, - #pfrom=None - ) - - # await asyncio.sleep(15) - await asyncio.sleep(60 * 3) - - async def send_status(self, event): + async def check_updates(self, event, jid): + """ + Start calling for update check up. + + :param self: + :param event: + :param jid: Jabber ID + """ while True: - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - msg = ("Slixfeed can not work without a database. \n" - "To create a database, follow these steps: \n" - "Add Slixfeed contact to your roster \n" - "Send a feed to the bot by: \n" - "feed add https://reclaimthenet.org/feed/") - print(msg) - else: - files = os.listdir(db_dir) - for file in files: - jid = file[:-3] + print("> CHCK UPDATE",jid) + await initdb(jid, download_updates) + await asyncio.sleep(60 * 90) - await asyncio.sleep(60) + async def send_update(self, event, jid): + """ + Send news items as messages. + + :param self: + :param event: + :param jid: Jabber ID + """ + print("> SEND UPDATE",jid) + new = await initdb( + jid, + database.get_entry_unread + ) + if new: + self.send_message( + mto=jid, + mbody=new, + mtype='chat' + ) + interval = await initdb(jid, database.get_settings_value, 'interval') + await asyncio.sleep(60 * interval) + + async def send_status(self, event, jid): + """ + Send status message. + + :param self: + :param event: + :param jid: Jabber ID + """ + print("> SEND STATUS",jid) + unread = await initdb( + jid, + database.get_number_of_entries_unread + ) + if unread: + msg_status = ('📰 News items:', str(unread)) + else: + msg_status = '🗞 No News' + # print(msg_status, 'for', jid) + self.send_presence( + pstatus=msg_status, + pto=jid, + #pfrom=None + ) + await asyncio.sleep(60 * 20) def print_help(): @@ -324,7 +345,7 @@ async def initdb(jid, callback, message=None): os.mkdir(db_dir) db_file = os.path.join(db_dir, r"{}.db".format(jid)) database.create_tables(db_file) - + # await database.set_default_values(db_file) if message: return await callback(db_file, message) else: @@ -341,9 +362,7 @@ async def download_updates(db_file): urls = await database.get_subscriptions(db_file) for url in urls: - # print("for url in urls") source = url[0] - # print("source: ", source) res = await download_feed(source) # TypeError: 'NoneType' object is not subscriptable if res is None: @@ -358,15 +377,14 @@ async def download_updates(db_file): try: feed = feedparser.parse(res[0]) if feed.bozo: - bozo = ("WARNING: Bozo detected for feed <{}>. " - "For more information, visit " - "https://pythonhosted.org/feedparser/bozo.html" - .format(source)) - print(bozo) + # bozo = ("WARNING: Bozo detected for feed <{}>. " + # "For more information, visit " + # "https://pythonhosted.org/feedparser/bozo.html" + # .format(source)) + # print(bozo) valid = 0 else: valid = 1 - await database.update_source_validity(db_file, source, valid) except (IncompleteReadError, IncompleteRead, error.URLError) as e: print(e) @@ -396,7 +414,6 @@ async def download_updates(db_file): link = entry.link else: link = source - # print('source:', source) exist = await database.check_entry_exist(db_file, title, link) @@ -413,7 +430,6 @@ async def download_updates(db_file): summary = '*** No summary ***' entry = (title, summary, link, source, 0); await database.add_entry_and_set_date(db_file, source, entry) - # print("### added", new_entry, "entries") async def download_feed(url): @@ -423,8 +439,6 @@ async def download_feed(url): :param url: URL. :return: Document or error message. """ - # print("download_feed") - # time.sleep(1) timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession() as session: # async with aiohttp.ClientSession(trust_env=True) as session: @@ -444,7 +458,7 @@ async def download_feed(url): print('Error', str(e)) return [False, "Error: " + str(e)] except asyncio.TimeoutError as e: - print('Timeout', str(e)) + # print('Timeout:', str(e)) return [False, "Timeout"] @@ -615,19 +629,20 @@ async def add_feed(db_file, url): feed_addr = feed msg += "{}\n{}\n\n".format(feed_name, feed_addr) msg += "The above feeds were extracted from\n{}".format(url) - return msg elif feeds: url = list(feeds)[0] - return await add_feed(db_file, url) + msg = await add_feed(db_file, url) else: - return "No news feeds were found for URL <{}>.".format(url) + msg = "No news feeds were found for URL <{}>.".format(url) else: - return await database.add_feed(db_file, feed, url, res) + msg = await database.add_feed(db_file, feed, url, res) else: - return "Failed to get URL <{}>. Reason: {}".format(url, res[1]) + msg = "Failed to get URL <{}>. Reason: {}".format(url, res[1]) else: ix = exist[0] - return "News source <{}> is already listed in the subscription list at index {}".format(url, ix) + name = exist[1] + msg = "> {}\nNews source \"{}\" is already listed in the subscription list at index {}".format(url, name, ix) + return msg def toggle_state(jid, state):