Update xmpphandler.py

This commit is contained in:
Schimon Jehudah 2023-11-02 05:18:26 +00:00
parent 85d98609c5
commit 169878c3b7

View file

@ -1,6 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from datetime import datetime
import asyncio import asyncio
import os import os
import slixmpp import slixmpp
@ -9,13 +11,28 @@ import confighandler
import datahandler import datahandler
import sqlitehandler import sqlitehandler
jid_tasker = {}
task_manager = {} task_manager = {}
time_now = datetime.now()
# time_now = time_now.strftime("%H:%M:%S")
def print_time():
# return datetime.now().strftime("%H:%M:%S")
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
return current_time
class Slixfeed(slixmpp.ClientXMPP): class Slixfeed(slixmpp.ClientXMPP):
""" """
Slixmpp news bot that will send updates Slixmpp news bot that will send updates
from feeds it receives. from feeds it receives.
""" """
print("slixmpp.ClientXMPP")
print(repr(slixmpp.ClientXMPP))
def __init__(self, jid, password): def __init__(self, jid, password):
slixmpp.ClientXMPP.__init__(self, jid, password) slixmpp.ClientXMPP.__init__(self, jid, password)
@ -25,7 +42,7 @@ class Slixfeed(slixmpp.ClientXMPP):
# listen for this event so that we we can initialize # listen for this event so that we we can initialize
# our roster. # our roster.
self.add_event_handler("session_start", self.start) self.add_event_handler("session_start", self.start)
self.add_event_handler("session_start", self.select_file) # self.add_event_handler("session_start", self.select_file)
# self.add_event_handler("session_start", self.send_status) # self.add_event_handler("session_start", self.send_status)
# self.add_event_handler("session_start", self.check_updates) # self.add_event_handler("session_start", self.check_updates)
@ -37,6 +54,7 @@ class Slixfeed(slixmpp.ClientXMPP):
# Initialize event loop # Initialize event loop
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
async def start(self, event): async def start(self, event):
""" """
Process the session_start event. Process the session_start event.
@ -52,8 +70,16 @@ class Slixfeed(slixmpp.ClientXMPP):
""" """
self.send_presence() self.send_presence()
await self.get_roster() await self.get_roster()
await self.select_file()
async def message(self, event, msg): self.send_presence(
pshow="away",
pstatus="Slixmpp has been restarted.",
pto="sch@pimux.de"
)
async def message(self, msg):
""" """
Process incoming message stanzas. Be aware that this also Process incoming message stanzas. Be aware that this also
includes MUC messages and error messages. It is usually includes MUC messages and error messages. It is usually
@ -65,60 +91,85 @@ class Slixfeed(slixmpp.ClientXMPP):
for stanza objects and the Message stanza to see for stanza objects and the Message stanza to see
how it may be used. how it may be used.
""" """
if msg['type'] in ('chat', 'normal'): if msg["type"] in ("chat", "normal"):
jid = msg['from'].bare action = 0
message = " ".join(msg['body'].split()) jid = msg["from"].bare
if message.lower().startswith('help'): message = " ".join(msg["body"].split())
message = message.lower()
if message.startswith("help"):
action = print_help() action = print_help()
# NOTE: Might not need it # NOTE: Might not need it
elif message.lower().startswith('add '): # elif message.startswith("add "):
action = await initdb(jid, datahandler.add_feed, message[4:]) # url = message[4:]
elif message.startswith("http"):
url = message
action = await initdb(jid, datahandler.add_feed, url)
# action = "> " + message + "\n" + action # action = "> " + message + "\n" + action
elif message.lower().startswith('quantum '): elif message.startswith("quantum "):
key = message[:7] key = message[:7]
val = message[8:] val = message[8:]
# action = "Every update will contain {} news items.".format(action) # action = "Every update will contain {} news items.".format(action)
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
await self.refresh_task(jid, key, val) await self.refresh_task(jid, key, val)
elif message.lower().startswith('disable'): elif message.startswith("interval "):
# action = "Updates are disabled."
action = await initdb(jid, sqlitehandler.set_settings_value, message)
await self.refresh_task(jid, "enabled", 0)
elif message.lower().startswith('enable'):
# action = "Updates are enabled."
action = await initdb(jid, sqlitehandler.set_settings_value, message)
await self.refresh_task(jid, "enabled", 1)
elif message.lower().startswith('interval '):
key = message[:8] key = message[:8]
val = message[9:] val = message[9:]
# action = "Updates will be sent every {} minutes.".format(action) # action = "Updates will be sent every {} minutes.".format(action)
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
await self.refresh_task(event, jid, key, val) await self.refresh_task(jid, key, val)
elif message.lower().startswith('list'): elif message.startswith("list"):
action = await initdb(jid, sqlitehandler.list_subscriptions) action = await initdb(jid, sqlitehandler.list_subscriptions)
elif message.lower().startswith('recent '): elif message.startswith("recent "):
action = await initdb(jid, sqlitehandler.last_entries, message[7:]) num = message[7:]
elif message.lower().startswith('remove '): action = await initdb(jid, sqlitehandler.last_entries, num)
action = await initdb(jid, sqlitehandler.remove_feed, message[7:]) elif message.startswith("remove "):
elif message.lower().startswith('search '): ix = message[7:]
action = await initdb(jid, sqlitehandler.search_entries, message[7:]) action = await initdb(jid, sqlitehandler.remove_feed, ix)
elif message.lower().startswith('status '): elif message.startswith("search "):
action = await initdb(jid, sqlitehandler.toggle_status, message[7:]) query = message[7:]
elif message.lower().startswith('unread'): action = await initdb(jid, sqlitehandler.search_entries, query)
elif message.startswith("start"):
# action = "Updates are enabled."
key = "enabled"
val = 1
actiona = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
asyncio.create_task(self.task_jid(jid))
# print(print_time(), "task_manager[jid]")
# print(task_manager[jid])
elif message.startswith("stats"):
action = await initdb(jid, sqlitehandler.statistics) action = await initdb(jid, sqlitehandler.statistics)
elif message.startswith("status "):
ix = message[7:]
action = await initdb(jid, sqlitehandler.toggle_status, ix)
elif message.startswith("stop"):
# action = "Updates are disabled."
try:
task_manager[jid]["check"].cancel()
# task_manager[jid]["status"].cancel()
task_manager[jid]["interval"].cancel()
key = "enabled"
val = 0
actiona = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
await self.send_status(jid)
print(print_time(), "task_manager[jid]")
print(task_manager[jid])
except:
# action = "Updates are already disabled."
await self.send_status(jid)
else: else:
action = "Unknown command. Press \"help\" for list of commands" action = "Unknown command. Press \"help\" for list of commands"
msg.reply(action).send() if action: msg.reply(action).send()
print(print_time(), "COMMAND ACCOUNT")
print("COMMAND:", message) print("COMMAND:", message)
print("ACCOUNT: " + str(msg['from'])) print("ACCOUNT: " + str(msg["from"]))
async def select_file(self, event):
async def select_file(self):
""" """
Initiate actions by JID (Jabber ID). Initiate actions by JID (Jabber ID).
:param self: Self :param self: Self
:param event: Event
""" """
while True: while True:
db_dir = confighandler.get_default_dbdir() db_dir = confighandler.get_default_dbdir()
@ -128,73 +179,73 @@ class Slixfeed(slixmpp.ClientXMPP):
"Add Slixfeed contact to your roster \n" "Add Slixfeed contact to your roster \n"
"Send a feed to the bot by: \n" "Send a feed to the bot by: \n"
"add https://reclaimthenet.org/feed/") "add https://reclaimthenet.org/feed/")
print(print_time(), msg)
print(msg) print(msg)
else: else:
os.chdir(db_dir) os.chdir(db_dir)
files = os.listdir() files = os.listdir()
# TODO Use loop (with gather) instead of TaskGroup
# for file in files:
# if file.endswith(".db") and not file.endswith(".db-jour.db"):
# jid = file[:-3]
# jid_tasker[jid] = asyncio.create_task(self.task_jid(jid))
# await jid_tasker[jid]
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
print("main task") print("main task")
print(repr(tg)) print(print_time(), "repr(tg)")
print(repr(tg)) # <TaskGroup entered>
for file in files: for file in files:
if file.endswith('.db') and not file.endswith('.db-jour.db'): if file.endswith(".db") and not file.endswith(".db-jour.db"):
jid = file[:-3] jid = file[:-3]
tg.create_task(self.jid(event, jid)) tg.create_task(self.task_jid(jid))
# task_manager.update({jid: tg}) # task_manager.update({jid: tg})
print(task_manager) # print(task_manager) # {}
print(jid, repr(tg)) print(print_time(), "repr(tg) id(tg)")
print(jid, id(tg)) print(jid, repr(tg)) # sch@pimux.de <TaskGroup tasks=1 entered>
print(jid, id(tg)) # sch@pimux.de 139879835500624
# <xmpphandler.Slixfeed object at 0x7f24922124d0> <TaskGroup tasks=2 entered>
# <xmpphandler.Slixfeed object at 0x7f24922124d0> 139879835500624
async def jid(self, event, jid):
async def task_jid(self, jid):
""" """
JID (Jabber ID) task manager. JID (Jabber ID) task manager.
:param self: Self :param self: Self
:param event: Event
:param jid: Jabber ID :param jid: Jabber ID
""" """
enabled = await initdb( enabled = await initdb(
jid, jid,
sqlitehandler.get_settings_value, sqlitehandler.get_settings_value,
'enabled' "enabled"
) )
print("enabled", enabled, jid) print(print_time(), "enabled", enabled, jid)
if enabled: if enabled:
async with asyncio.TaskGroup() as tg:
print("sub task") print("sub task")
print(print_time(), "repr(self) id(self)")
print(repr(self)) print(repr(self))
print(id(self)) print(id(self))
print(repr(tg))
print(id(tg))
tg.create_task(self.check_updates(event, jid))
# tg.create_task(self.send_update(event, jid))
task_manager[jid] = {} task_manager[jid] = {}
task_manager[jid]['interval'] = tg.create_task(self.send_update(event, jid)) task_manager[jid]["check"] = asyncio.create_task(check_updates(jid))
task_manager[jid]["status"] = asyncio.create_task(self.send_status(jid))
task_manager[jid]["interval"] = asyncio.create_task(self.send_update(jid))
await task_manager[jid]["check"]
await task_manager[jid]["status"]
await task_manager[jid]["interval"]
print(print_time(), "task_manager[jid].items()")
print(task_manager[jid].items())
print(print_time(), "task_manager[jid]")
print(task_manager[jid]) print(task_manager[jid])
tg.create_task(self.send_status(event, jid)) print(print_time(), "task_manager")
print(task_manager)
else: else:
await self.send_status(event, jid) await self.send_status(jid)
async def check_updates(self, event, jid): async def send_update(self, jid):
"""
Start calling for update check up.
:param self: Self
:param event: Event
:param jid: Jabber ID
"""
while True:
print("> CHCK UPDATE",jid)
await initdb(jid, datahandler.download_updates)
await asyncio.sleep(60 * 90)
# Schedule to call this function again in 90 minutes
# self.loop.call_at(self.loop.time() + 60 * 90, self.loop.create_task, self.check_updates(event, jid))
async def send_update(self, event, jid):
""" """
Send news items as messages. Send news items as messages.
:param self: Self :param self: Self
:param event: Event
:param jid: Jabber ID :param jid: Jabber ID
""" """
new = await initdb( new = await initdb(
@ -202,62 +253,71 @@ class Slixfeed(slixmpp.ClientXMPP):
sqlitehandler.get_entry_unread sqlitehandler.get_entry_unread
) )
if new: if new:
print("> SEND UPDATE",jid) print(print_time(), "> SEND UPDATE",jid)
self.send_message( self.send_message(
mto=jid, mto=jid,
mbody=new, mbody=new,
mtype='chat' mtype="chat"
) )
interval = await initdb( interval = await initdb(
jid, jid,
sqlitehandler.get_settings_value, sqlitehandler.get_settings_value,
'interval' "interval"
) )
# await asyncio.sleep(60 * interval) # await asyncio.sleep(60 * interval)
self.loop.call_at(self.loop.time() + 60 * interval, self.loop.create_task, self.send_update(event, jid)) self.loop.call_at(
self.loop.time() + 60 * interval,
self.loop.create_task,
self.send_update(jid)
)
async def send_status(self, event, jid): async def send_status(self, jid):
""" """
Send status message. Send status message.
:param self: Self :param self: Self
:param event: Event
:param jid: Jabber ID :param jid: Jabber ID
""" """
print("> SEND STATUS",jid) print(print_time(), "> SEND STATUS",jid)
unread = await initdb( unread = await initdb(
jid, jid,
sqlitehandler.get_number_of_entries_unread sqlitehandler.get_number_of_entries_unread
) )
if unread: if unread:
msg_status = "📰 News items: {}".format(str(unread)) status_text = "📰 News items: {}".format(str(unread))
typ_status = "chat" status_mode = "chat"
else: else:
msg_status = "🗞 No News" status_text = "🗞 No News"
typ_status = "available" status_mode = "available"
enabled = await initdb( enabled = await initdb(
jid, jid,
sqlitehandler.get_settings_value, sqlitehandler.get_settings_value,
'enabled' "enabled"
) )
if not enabled: if not enabled:
typ_status = "xa" status_mode = "xa"
# print(msg_status, 'for', jid) # print(status_text, "for", jid)
self.send_presence( self.send_presence(
pshow=typ_status, pshow=status_mode,
pstatus=msg_status, pstatus=status_text,
pto=jid, pto=jid,
#pfrom=None #pfrom=None
) )
# await asyncio.sleep(60 * 20)
self.loop.call_at(self.loop.time() + 60 * 20, self.loop.create_task, self.send_status(event, jid)) await asyncio.sleep(60 * 20)
# self.loop.call_at(
# self.loop.time() + 60 * 20,
# self.loop.create_task,
# self.send_status(jid)
# )
async def refresh_task(self, event, jid, key, val): async def refresh_task(self, jid, key, val):
""" """
Apply settings on runtime. Apply settings on runtime.
@ -268,11 +328,41 @@ class Slixfeed(slixmpp.ClientXMPP):
""" """
if jid in task_manager: if jid in task_manager:
task_manager[jid][key].cancel() task_manager[jid][key].cancel()
task_manager[jid][key] = self.send_update.loop.call_at( loop = asyncio.get_event_loop()
self.send_update.loop.time() + 60 * val, print(print_time(), "loop")
self.send_update.loop.create_task, print(loop)
self.send_update(event, jid) print(print_time(), "loop")
task_manager[jid][key] = loop.call_at(
loop.time() + 60 * float(val),
loop.create_task,
self.send_update(jid)
) )
# task_manager[jid][key] = self.send_update.loop.call_at(
# self.send_update.loop.time() + 60 * val,
# self.send_update.loop.create_task,
# self.send_update(jid)
# )
# TODO Take this function out of
# <class 'slixmpp.clientxmpp.ClientXMPP'>
async def check_updates(jid):
"""
Start calling for update check up.
:param jid: Jabber ID
"""
while True:
print(print_time(), "> CHCK UPDATE",jid)
await initdb(jid, datahandler.download_updates)
await asyncio.sleep(60 * 90)
# Schedule to call this function again in 90 minutes
# self.loop.call_at(
# self.loop.time() + 60 * 90,
# self.loop.create_task,
# self.check_updates(jid)
# )
def print_help(): def print_help():
""" """
@ -285,14 +375,14 @@ def print_help():
" Supported filetypes: Atom, RDF and RSS. \n" " Supported filetypes: Atom, RDF and RSS. \n"
"\n" "\n"
"BASIC USAGE: \n" "BASIC USAGE: \n"
" enable \n" " start \n"
" Send updates. \n" " Enable bot and send updates. \n"
" disable \n" " Stop \n"
" Stop sending updates. \n" " Disable bot and stop updates. \n"
" batch N \n" " batch N \n"
" Send N updates on ech interval. \n" " Send N updates for each interval. \n"
" interval N \n" " interval N \n"
" Send an update each N minutes. \n" " Send an update every N minutes. \n"
" feed list \n" " feed list \n"
" List subscriptions. \n" " List subscriptions. \n"
"\n" "\n"