From 169878c3b7e52a6eb1a2e77c7a27f8db379a5ddb Mon Sep 17 00:00:00 2001 From: Schimon Jehudah Date: Thu, 2 Nov 2023 05:18:26 +0000 Subject: [PATCH] Update xmpphandler.py --- slixfeed/xmpphandler.py | 306 ++++++++++++++++++++++++++-------------- 1 file changed, 198 insertions(+), 108 deletions(-) diff --git a/slixfeed/xmpphandler.py b/slixfeed/xmpphandler.py index 29e9e65..92d5dd6 100644 --- a/slixfeed/xmpphandler.py +++ b/slixfeed/xmpphandler.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +from datetime import datetime + import asyncio import os import slixmpp @@ -9,13 +11,28 @@ import confighandler import datahandler import sqlitehandler +jid_tasker = {} task_manager = {} +time_now = datetime.now() +# time_now = time_now.strftime("%H:%M:%S") + +def print_time(): + # return datetime.now().strftime("%H:%M:%S") + now = datetime.now() + current_time = now.strftime("%H:%M:%S") + return current_time + + class Slixfeed(slixmpp.ClientXMPP): """ Slixmpp news bot that will send updates from feeds it receives. """ + + print("slixmpp.ClientXMPP") + print(repr(slixmpp.ClientXMPP)) + def __init__(self, jid, password): slixmpp.ClientXMPP.__init__(self, jid, password) @@ -25,7 +42,7 @@ class Slixfeed(slixmpp.ClientXMPP): # listen for this event so that we we can initialize # our roster. self.add_event_handler("session_start", self.start) - self.add_event_handler("session_start", self.select_file) + # self.add_event_handler("session_start", self.select_file) # self.add_event_handler("session_start", self.send_status) # self.add_event_handler("session_start", self.check_updates) @@ -37,6 +54,7 @@ class Slixfeed(slixmpp.ClientXMPP): # Initialize event loop self.loop = asyncio.get_event_loop() + async def start(self, event): """ Process the session_start event. @@ -52,8 +70,16 @@ class Slixfeed(slixmpp.ClientXMPP): """ self.send_presence() await self.get_roster() + await self.select_file() - async def message(self, event, msg): + self.send_presence( + pshow="away", + pstatus="Slixmpp has been restarted.", + pto="sch@pimux.de" + ) + + + async def message(self, msg): """ Process incoming message stanzas. Be aware that this also includes MUC messages and error messages. It is usually @@ -65,60 +91,85 @@ class Slixfeed(slixmpp.ClientXMPP): for stanza objects and the Message stanza to see how it may be used. """ - if msg['type'] in ('chat', 'normal'): - jid = msg['from'].bare - message = " ".join(msg['body'].split()) - if message.lower().startswith('help'): + if msg["type"] in ("chat", "normal"): + action = 0 + jid = msg["from"].bare + message = " ".join(msg["body"].split()) + message = message.lower() + if message.startswith("help"): action = print_help() # NOTE: Might not need it - elif message.lower().startswith('add '): - action = await initdb(jid, datahandler.add_feed, message[4:]) + # elif message.startswith("add "): + # url = message[4:] + elif message.startswith("http"): + url = message + action = await initdb(jid, datahandler.add_feed, url) # action = "> " + message + "\n" + action - elif message.lower().startswith('quantum '): + elif message.startswith("quantum "): key = message[:7] val = message[8:] # action = "Every update will contain {} news items.".format(action) action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) await self.refresh_task(jid, key, val) - elif message.lower().startswith('disable'): - # action = "Updates are disabled." - action = await initdb(jid, sqlitehandler.set_settings_value, message) - await self.refresh_task(jid, "enabled", 0) - elif message.lower().startswith('enable'): - # action = "Updates are enabled." - action = await initdb(jid, sqlitehandler.set_settings_value, message) - await self.refresh_task(jid, "enabled", 1) - elif message.lower().startswith('interval '): + elif message.startswith("interval "): key = message[:8] val = message[9:] # action = "Updates will be sent every {} minutes.".format(action) action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) - await self.refresh_task(event, jid, key, val) - elif message.lower().startswith('list'): + await self.refresh_task(jid, key, val) + elif message.startswith("list"): action = await initdb(jid, sqlitehandler.list_subscriptions) - elif message.lower().startswith('recent '): - action = await initdb(jid, sqlitehandler.last_entries, message[7:]) - elif message.lower().startswith('remove '): - action = await initdb(jid, sqlitehandler.remove_feed, message[7:]) - elif message.lower().startswith('search '): - action = await initdb(jid, sqlitehandler.search_entries, message[7:]) - elif message.lower().startswith('status '): - action = await initdb(jid, sqlitehandler.toggle_status, message[7:]) - elif message.lower().startswith('unread'): + elif message.startswith("recent "): + num = message[7:] + action = await initdb(jid, sqlitehandler.last_entries, num) + elif message.startswith("remove "): + ix = message[7:] + action = await initdb(jid, sqlitehandler.remove_feed, ix) + elif message.startswith("search "): + query = message[7:] + action = await initdb(jid, sqlitehandler.search_entries, query) + elif message.startswith("start"): + # action = "Updates are enabled." + key = "enabled" + val = 1 + actiona = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + asyncio.create_task(self.task_jid(jid)) + # print(print_time(), "task_manager[jid]") + # print(task_manager[jid]) + elif message.startswith("stats"): action = await initdb(jid, sqlitehandler.statistics) + elif message.startswith("status "): + ix = message[7:] + action = await initdb(jid, sqlitehandler.toggle_status, ix) + elif message.startswith("stop"): + # action = "Updates are disabled." + try: + task_manager[jid]["check"].cancel() + # task_manager[jid]["status"].cancel() + task_manager[jid]["interval"].cancel() + key = "enabled" + val = 0 + actiona = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + await self.send_status(jid) + print(print_time(), "task_manager[jid]") + print(task_manager[jid]) + except: + # action = "Updates are already disabled." + await self.send_status(jid) else: action = "Unknown command. Press \"help\" for list of commands" - msg.reply(action).send() + if action: msg.reply(action).send() + print(print_time(), "COMMAND ACCOUNT") print("COMMAND:", message) - print("ACCOUNT: " + str(msg['from'])) + print("ACCOUNT: " + str(msg["from"])) - async def select_file(self, event): + + async def select_file(self): """ Initiate actions by JID (Jabber ID). :param self: Self - :param event: Event """ while True: db_dir = confighandler.get_default_dbdir() @@ -128,73 +179,73 @@ class Slixfeed(slixmpp.ClientXMPP): "Add Slixfeed contact to your roster \n" "Send a feed to the bot by: \n" "add https://reclaimthenet.org/feed/") + print(print_time(), msg) print(msg) else: os.chdir(db_dir) files = os.listdir() - async with asyncio.TaskGroup() as tg: - print("main task") - print(repr(tg)) - for file in files: - if file.endswith('.db') and not file.endswith('.db-jour.db'): - jid = file[:-3] - tg.create_task(self.jid(event, jid)) - # task_manager.update({jid: tg}) - print(task_manager) - print(jid, repr(tg)) - print(jid, id(tg)) + # TODO Use loop (with gather) instead of TaskGroup + # for file in files: + # if file.endswith(".db") and not file.endswith(".db-jour.db"): + # jid = file[:-3] + # jid_tasker[jid] = asyncio.create_task(self.task_jid(jid)) + # await jid_tasker[jid] + async with asyncio.TaskGroup() as tg: + print("main task") + print(print_time(), "repr(tg)") + print(repr(tg)) # + for file in files: + if file.endswith(".db") and not file.endswith(".db-jour.db"): + jid = file[:-3] + tg.create_task(self.task_jid(jid)) + # task_manager.update({jid: tg}) + # print(task_manager) # {} + print(print_time(), "repr(tg) id(tg)") + print(jid, repr(tg)) # sch@pimux.de + print(jid, id(tg)) # sch@pimux.de 139879835500624 + # + # 139879835500624 - async def jid(self, event, jid): + + async def task_jid(self, jid): """ JID (Jabber ID) task manager. :param self: Self - :param event: Event :param jid: Jabber ID """ enabled = await initdb( jid, sqlitehandler.get_settings_value, - 'enabled' + "enabled" ) - print("enabled", enabled, jid) + print(print_time(), "enabled", enabled, jid) if enabled: - async with asyncio.TaskGroup() as tg: - print("sub task") - print(repr(self)) - print(id(self)) - print(repr(tg)) - print(id(tg)) - tg.create_task(self.check_updates(event, jid)) - # tg.create_task(self.send_update(event, jid)) - task_manager[jid] = {} - task_manager[jid]['interval'] = tg.create_task(self.send_update(event, jid)) - print(task_manager[jid]) - tg.create_task(self.send_status(event, jid)) + print("sub task") + print(print_time(), "repr(self) id(self)") + print(repr(self)) + print(id(self)) + task_manager[jid] = {} + task_manager[jid]["check"] = asyncio.create_task(check_updates(jid)) + task_manager[jid]["status"] = asyncio.create_task(self.send_status(jid)) + task_manager[jid]["interval"] = asyncio.create_task(self.send_update(jid)) + await task_manager[jid]["check"] + await task_manager[jid]["status"] + await task_manager[jid]["interval"] + print(print_time(), "task_manager[jid].items()") + print(task_manager[jid].items()) + print(print_time(), "task_manager[jid]") + print(task_manager[jid]) + print(print_time(), "task_manager") + print(task_manager) else: - await self.send_status(event, jid) + await self.send_status(jid) - async def check_updates(self, event, jid): - """ - Start calling for update check up. - - :param self: Self - :param event: Event - :param jid: Jabber ID - """ - while True: - print("> CHCK UPDATE",jid) - await initdb(jid, datahandler.download_updates) - await asyncio.sleep(60 * 90) - # Schedule to call this function again in 90 minutes - # self.loop.call_at(self.loop.time() + 60 * 90, self.loop.create_task, self.check_updates(event, jid)) - - async def send_update(self, event, jid): + async def send_update(self, jid): """ Send news items as messages. :param self: Self - :param event: Event :param jid: Jabber ID """ new = await initdb( @@ -202,62 +253,71 @@ class Slixfeed(slixmpp.ClientXMPP): sqlitehandler.get_entry_unread ) if new: - print("> SEND UPDATE",jid) + print(print_time(), "> SEND UPDATE",jid) self.send_message( mto=jid, mbody=new, - mtype='chat' + mtype="chat" ) interval = await initdb( jid, sqlitehandler.get_settings_value, - 'interval' + "interval" ) # await asyncio.sleep(60 * interval) - self.loop.call_at(self.loop.time() + 60 * interval, self.loop.create_task, self.send_update(event, jid)) + self.loop.call_at( + self.loop.time() + 60 * interval, + self.loop.create_task, + self.send_update(jid) + ) - async def send_status(self, event, jid): + async def send_status(self, jid): """ Send status message. :param self: Self - :param event: Event :param jid: Jabber ID """ - print("> SEND STATUS",jid) + print(print_time(), "> SEND STATUS",jid) unread = await initdb( jid, sqlitehandler.get_number_of_entries_unread ) if unread: - msg_status = "📰 News items: {}".format(str(unread)) - typ_status = "chat" + status_text = "📰 News items: {}".format(str(unread)) + status_mode = "chat" else: - msg_status = "🗞 No News" - typ_status = "available" + status_text = "🗞 No News" + status_mode = "available" enabled = await initdb( jid, sqlitehandler.get_settings_value, - 'enabled' + "enabled" ) if not enabled: - typ_status = "xa" + status_mode = "xa" - # print(msg_status, 'for', jid) + # print(status_text, "for", jid) self.send_presence( - pshow=typ_status, - pstatus=msg_status, + pshow=status_mode, + pstatus=status_text, pto=jid, #pfrom=None ) - # await asyncio.sleep(60 * 20) - self.loop.call_at(self.loop.time() + 60 * 20, self.loop.create_task, self.send_status(event, jid)) + + await asyncio.sleep(60 * 20) + + # self.loop.call_at( + # self.loop.time() + 60 * 20, + # self.loop.create_task, + # self.send_status(jid) + # ) - async def refresh_task(self, event, jid, key, val): + async def refresh_task(self, jid, key, val): """ Apply settings on runtime. @@ -268,11 +328,41 @@ class Slixfeed(slixmpp.ClientXMPP): """ if jid in task_manager: task_manager[jid][key].cancel() - task_manager[jid][key] = self.send_update.loop.call_at( - self.send_update.loop.time() + 60 * val, - self.send_update.loop.create_task, - self.send_update(event, jid) + loop = asyncio.get_event_loop() + print(print_time(), "loop") + print(loop) + print(print_time(), "loop") + task_manager[jid][key] = loop.call_at( + loop.time() + 60 * float(val), + loop.create_task, + self.send_update(jid) ) + # task_manager[jid][key] = self.send_update.loop.call_at( + # self.send_update.loop.time() + 60 * val, + # self.send_update.loop.create_task, + # self.send_update(jid) + # ) + + +# TODO Take this function out of +# +async def check_updates(jid): + """ + Start calling for update check up. + + :param jid: Jabber ID + """ + while True: + print(print_time(), "> CHCK UPDATE",jid) + await initdb(jid, datahandler.download_updates) + await asyncio.sleep(60 * 90) + # Schedule to call this function again in 90 minutes + # self.loop.call_at( + # self.loop.time() + 60 * 90, + # self.loop.create_task, + # self.check_updates(jid) + # ) + def print_help(): """ @@ -285,14 +375,14 @@ def print_help(): " Supported filetypes: Atom, RDF and RSS. \n" "\n" "BASIC USAGE: \n" - " enable \n" - " Send updates. \n" - " disable \n" - " Stop sending updates. \n" + " start \n" + " Enable bot and send updates. \n" + " Stop \n" + " Disable bot and stop updates. \n" " batch N \n" - " Send N updates on ech interval. \n" + " Send N updates for each interval. \n" " interval N \n" - " Send an update each N minutes. \n" + " Send an update every N minutes. \n" " feed list \n" " List subscriptions. \n" "\n" @@ -343,7 +433,7 @@ def print_help(): async def initdb(jid, callback, message=None): """ Callback function to instantiate action on database. - + :param jid: JID (Jabber ID). :param callback: Function name. :param massage: Optional kwarg when a message is a part or required argument.