Slixfeed/slixfeed/xmpphandler.py

442 lines
15 KiB
Python
Raw Normal View History

2023-10-24 16:43:14 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2023-11-02 06:18:26 +01:00
from datetime import datetime
2023-10-24 16:43:14 +02:00
import asyncio
import os
import slixmpp
import confighandler
import datahandler
import sqlitehandler
2023-11-02 06:18:26 +01:00
jid_tasker = {}
2023-10-24 16:43:14 +02:00
task_manager = {}
loop = asyncio.get_event_loop()
2023-10-24 16:43:14 +02:00
2023-11-02 06:18:26 +01:00
time_now = datetime.now()
# time_now = time_now.strftime("%H:%M:%S")
def print_time():
# return datetime.now().strftime("%H:%M:%S")
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
return current_time
2023-10-24 16:43:14 +02:00
class Slixfeed(slixmpp.ClientXMPP):
"""
Slixmpp news bot that will send updates
from feeds it receives.
"""
2023-11-02 06:18:26 +01:00
print("slixmpp.ClientXMPP")
print(repr(slixmpp.ClientXMPP))
2023-10-24 16:43:14 +02:00
def __init__(self, jid, password):
slixmpp.ClientXMPP.__init__(self, jid, password)
# The session_start event will be triggered when
# the bot establishes its connection with the server
# and the XML streams are ready for use. We want to
# listen for this event so that we we can initialize
# our roster.
self.add_event_handler("session_start", self.start)
2023-11-02 06:18:26 +01:00
# self.add_event_handler("session_start", self.select_file)
2023-10-24 16:43:14 +02:00
# self.add_event_handler("session_start", self.send_status)
# self.add_event_handler("session_start", self.check_updates)
# The message event is triggered whenever a message
# stanza is received. Be aware that that includes
# MUC messages and error messages.
self.add_event_handler("message", self.message)
self.add_event_handler("disconnected", self.reconnect)
# Initialize event loop
# self.loop = asyncio.get_event_loop()
2023-10-24 16:43:14 +02:00
2023-11-02 06:18:26 +01:00
2023-10-24 16:43:14 +02:00
async def start(self, event):
"""
Process the session_start event.
Typical actions for the session_start event are
requesting the roster and broadcasting an initial
presence stanza.
Arguments:
event -- An empty dictionary. The session_start
event does not provide any additional
data.
"""
self.send_presence()
await self.get_roster()
2023-11-02 06:18:26 +01:00
await self.select_file()
self.send_presence(
pshow="away",
pstatus="Slixmpp has been restarted.",
pto="sch@pimux.de"
)
2023-10-24 16:43:14 +02:00
2023-11-02 06:18:26 +01:00
async def message(self, msg):
2023-10-24 16:43:14 +02:00
"""
Process incoming message stanzas. Be aware that this also
includes MUC messages and error messages. It is usually
a good idea to check the messages's type before processing
or sending replies.
Arguments:
msg -- The received message stanza. See the documentation
for stanza objects and the Message stanza to see
how it may be used.
"""
2023-11-02 06:18:26 +01:00
if msg["type"] in ("chat", "normal"):
action = 0
jid = msg["from"].bare
message = " ".join(msg["body"].split())
message = message.lower()
print(print_time(), "ACCOUNT: " + str(msg["from"]))
print(print_time(), "COMMAND:", message)
2023-11-02 06:18:26 +01:00
if message.startswith("help"):
2023-10-24 16:43:14 +02:00
action = print_help()
# NOTE: Might not need it
2023-11-02 06:18:26 +01:00
# elif message.startswith("add "):
# url = message[4:]
elif message.startswith("http"):
url = message
action = await initdb(jid, datahandler.add_feed, url)
2023-10-24 16:43:14 +02:00
# action = "> " + message + "\n" + action
2023-11-02 06:18:26 +01:00
elif message.startswith("quantum "):
2023-10-24 16:43:14 +02:00
key = message[:7]
val = message[8:]
# action = "Every update will contain {} news items.".format(action)
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
await self.refresh_task(jid, key, val)
2023-11-02 06:18:26 +01:00
elif message.startswith("interval "):
2023-10-24 16:43:14 +02:00
key = message[:8]
val = message[9:]
# action = "Updates will be sent every {} minutes.".format(action)
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
2023-11-02 06:18:26 +01:00
await self.refresh_task(jid, key, val)
elif message.startswith("list"):
2023-10-24 16:43:14 +02:00
action = await initdb(jid, sqlitehandler.list_subscriptions)
2023-11-02 06:18:26 +01:00
elif message.startswith("recent "):
num = message[7:]
action = await initdb(jid, sqlitehandler.last_entries, num)
elif message.startswith("remove "):
ix = message[7:]
action = await initdb(jid, sqlitehandler.remove_feed, ix)
elif message.startswith("search "):
query = message[7:]
action = await initdb(jid, sqlitehandler.search_entries, query)
elif message.startswith("start"):
# action = "Updates are enabled."
key = "enabled"
val = 1
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
2023-11-02 06:18:26 +01:00
asyncio.create_task(self.task_jid(jid))
# print(print_time(), "task_manager[jid]")
# print(task_manager[jid])
elif message.startswith("stats"):
2023-10-24 16:43:14 +02:00
action = await initdb(jid, sqlitehandler.statistics)
2023-11-02 06:18:26 +01:00
elif message.startswith("status "):
ix = message[7:]
action = await initdb(jid, sqlitehandler.toggle_status, ix)
elif message.startswith("stop"):
# action = "Updates are disabled."
try:
# task_manager[jid]["check"].cancel()
2023-11-02 06:18:26 +01:00
# task_manager[jid]["status"].cancel()
task_manager[jid]["interval"].cancel()
key = "enabled"
val = 0
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
2023-11-02 06:18:26 +01:00
except:
action = "Updates are already disabled."
# print("Updates are already disabled. Nothing to do.")
# await self.send_status(jid)
await self.task_jid(jid)
2023-10-24 16:43:14 +02:00
else:
action = "Unknown command. Press \"help\" for list of commands"
# NOTE Message won't be sent if status is send before it
# Or it is because we cancel task send_status
2023-11-02 06:18:26 +01:00
if action: msg.reply(action).send()
2023-10-24 16:43:14 +02:00
2023-11-02 06:18:26 +01:00
async def select_file(self):
2023-10-24 16:43:14 +02:00
"""
Initiate actions by JID (Jabber ID).
:param self: Self
"""
while True:
db_dir = confighandler.get_default_dbdir()
if not os.path.isdir(db_dir):
msg = ("Slixfeed can not work without a database. \n"
"To create a database, follow these steps: \n"
"Add Slixfeed contact to your roster \n"
"Send a feed to the bot by: \n"
"add https://reclaimthenet.org/feed/")
2023-11-02 06:18:26 +01:00
print(print_time(), msg)
2023-10-24 16:43:14 +02:00
print(msg)
else:
os.chdir(db_dir)
files = os.listdir()
2023-11-02 06:18:26 +01:00
# TODO Use loop (with gather) instead of TaskGroup
# for file in files:
# if file.endswith(".db") and not file.endswith(".db-jour.db"):
# jid = file[:-3]
# jid_tasker[jid] = asyncio.create_task(self.task_jid(jid))
# await jid_tasker[jid]
async with asyncio.TaskGroup() as tg:
for file in files:
if file.endswith(".db") and not file.endswith(".db-jour.db"):
jid = file[:-3]
tg.create_task(self.task_jid(jid))
# task_manager.update({jid: tg})
async def task_jid(self, jid):
2023-10-24 16:43:14 +02:00
"""
JID (Jabber ID) task manager.
:param self: Self
:param jid: Jabber ID
"""
enabled = await initdb(
jid,
sqlitehandler.get_settings_value,
2023-11-02 06:18:26 +01:00
"enabled"
2023-10-24 16:43:14 +02:00
)
2023-11-02 06:18:26 +01:00
print(print_time(), "enabled", enabled, jid)
2023-10-24 16:43:14 +02:00
if enabled:
2023-11-02 06:18:26 +01:00
task_manager[jid] = {}
task_manager[jid]["check"] = asyncio.create_task(check_updates(jid))
task_manager[jid]["status"] = asyncio.create_task(self.send_status(jid))
task_manager[jid]["interval"] = asyncio.create_task(self.send_update(jid))
await task_manager[jid]["check"]
await task_manager[jid]["status"]
await task_manager[jid]["interval"]
2023-10-24 16:43:14 +02:00
else:
2023-11-02 06:18:26 +01:00
await self.send_status(jid)
2023-10-24 16:43:14 +02:00
2023-11-02 06:18:26 +01:00
async def send_update(self, jid):
2023-10-24 16:43:14 +02:00
"""
Send news items as messages.
:param self: Self
:param jid: Jabber ID
"""
new = await initdb(
jid,
sqlitehandler.get_entry_unread
)
if new:
2023-11-02 06:18:26 +01:00
print(print_time(), "> SEND UPDATE",jid)
2023-10-24 16:43:14 +02:00
self.send_message(
mto=jid,
mbody=new,
2023-11-02 06:18:26 +01:00
mtype="chat"
2023-10-24 16:43:14 +02:00
)
interval = await initdb(
jid,
sqlitehandler.get_settings_value,
2023-11-02 06:18:26 +01:00
"interval"
2023-10-24 16:43:14 +02:00
)
print(print_time(), "asyncio.get_event_loop().time()")
print(print_time(), asyncio.get_event_loop().time())
2023-10-24 16:43:14 +02:00
# await asyncio.sleep(60 * interval)
task_manager[jid]["interval"] = loop.call_at(
loop.time() + 60 * interval,
loop.create_task,
2023-11-02 06:18:26 +01:00
self.send_update(jid)
)
# loop.call_later(
# 60 * interval,
# loop.create_task,
# self.send_update(jid)
# )
2023-10-24 16:43:14 +02:00
2023-11-02 06:18:26 +01:00
async def send_status(self, jid):
2023-10-24 16:43:14 +02:00
"""
Send status message.
:param self: Self
:param jid: Jabber ID
"""
2023-11-02 06:18:26 +01:00
print(print_time(), "> SEND STATUS",jid)
2023-10-24 16:43:14 +02:00
unread = await initdb(
jid,
sqlitehandler.get_number_of_entries_unread
)
if unread:
2023-11-02 06:18:26 +01:00
status_text = "📰 News items: {}".format(str(unread))
status_mode = "chat"
2023-10-24 16:43:14 +02:00
else:
2023-11-02 06:18:26 +01:00
status_text = "🗞 No News"
status_mode = "available"
2023-10-24 16:43:14 +02:00
enabled = await initdb(
jid,
sqlitehandler.get_settings_value,
2023-11-02 06:18:26 +01:00
"enabled"
2023-10-24 16:43:14 +02:00
)
if not enabled:
2023-11-02 06:18:26 +01:00
status_mode = "xa"
2023-10-24 16:43:14 +02:00
2023-11-02 06:18:26 +01:00
# print(status_text, "for", jid)
2023-10-24 16:43:14 +02:00
self.send_presence(
2023-11-02 06:18:26 +01:00
pshow=status_mode,
pstatus=status_text,
2023-10-24 16:43:14 +02:00
pto=jid,
#pfrom=None
)
2023-11-02 06:18:26 +01:00
await asyncio.sleep(60 * 20)
2023-10-24 16:43:14 +02:00
# loop.call_at(
# loop.time() + 60 * 20,
# loop.create_task,
2023-11-02 06:18:26 +01:00
# self.send_status(jid)
# )
async def refresh_task(self, jid, key, val):
2023-10-24 16:43:14 +02:00
"""
Apply settings on runtime.
:param self: Self
:param jid: Jabber ID
:param key: Key
:param val: Value
"""
if jid in task_manager:
task_manager[jid][key].cancel()
2023-11-02 06:18:26 +01:00
task_manager[jid][key] = loop.call_at(
loop.time() + 60 * float(val),
loop.create_task,
self.send_update(jid)
2023-10-24 16:43:14 +02:00
)
# task_manager[jid][key] = loop.call_later(
# 60 * float(val),
# loop.create_task,
# self.send_update(jid)
# )
2023-11-02 06:18:26 +01:00
# task_manager[jid][key] = self.send_update.loop.call_at(
# self.send_update.loop.time() + 60 * val,
# self.send_update.loop.create_task,
# self.send_update(jid)
# )
# TODO Take this function out of
# <class 'slixmpp.clientxmpp.ClientXMPP'>
async def check_updates(jid):
"""
Start calling for update check up.
:param jid: Jabber ID
"""
while True:
print(print_time(), "> CHCK UPDATE",jid)
await initdb(jid, datahandler.download_updates)
await asyncio.sleep(60 * 90)
# Schedule to call this function again in 90 minutes
# loop.call_at(
# loop.time() + 60 * 90,
# loop.create_task,
2023-11-02 06:18:26 +01:00
# self.check_updates(jid)
# )
2023-10-24 16:43:14 +02:00
def print_help():
"""
Print help manual.
"""
msg = ("Slixfeed - News syndication bot for Jabber/XMPP \n"
"\n"
"DESCRIPTION: \n"
" Slixfeed is a news aggregator bot for online news feeds. \n"
" Supported filetypes: Atom, RDF and RSS. \n"
"\n"
"BASIC USAGE: \n"
" Start \n"
2023-11-02 06:18:26 +01:00
" Enable bot and send updates. \n"
" Stop \n"
" Disable bot and stop updates. \n"
2023-10-24 16:43:14 +02:00
" batch N \n"
2023-11-02 06:18:26 +01:00
" Send N updates for each interval. \n"
2023-10-24 16:43:14 +02:00
" interval N \n"
2023-11-02 06:18:26 +01:00
" Send an update every N minutes. \n"
2023-10-24 16:43:14 +02:00
" feed list \n"
" List subscriptions. \n"
"\n"
"EDIT OPTIONS: \n"
" add URL \n"
" Add URL to subscription list. \n"
" remove ID \n"
" Remove feed from subscription list. \n"
" status ID \n"
" Toggle update status of feed. \n"
"\n"
"SEARCH OPTIONS: \n"
" search TEXT \n"
" Search news items by given keywords. \n"
" recent N \n"
" List recent N news items (up to 50 items). \n"
"\n"
"STATISTICS OPTIONS: \n"
" analyses \n"
" Show report and statistics of feeds. \n"
" obsolete \n"
" List feeds that are not available. \n"
" unread \n"
" Print number of unread news items. \n"
"\n"
"BACKUP OPTIONS: \n"
" export opml \n"
" Send an OPML file with your feeds. \n"
" backup news html\n"
" Send an HTML formatted file of your news items. \n"
" backup news md \n"
" Send a Markdown file of your news items. \n"
" backup news text \n"
" Send a Plain Text file of your news items. \n"
"\n"
"DOCUMENTATION: \n"
" Slixfeed \n"
" https://gitgud.io/sjehuda/slixfeed \n"
" Slixmpp \n"
" https://slixmpp.readthedocs.io/ \n"
" feedparser \n"
" https://pythonhosted.org/feedparser")
return msg
# TODO Perhaps this needs to be executed
# just once per program execution
async def initdb(jid, callback, message=None):
"""
Callback function to instantiate action on database.
2023-11-02 06:18:26 +01:00
2023-10-24 16:43:14 +02:00
:param jid: JID (Jabber ID).
:param callback: Function name.
:param massage: Optional kwarg when a message is a part or required argument.
"""
db_dir = confighandler.get_default_dbdir()
if not os.path.isdir(db_dir):
os.mkdir(db_dir)
db_file = os.path.join(db_dir, r"{}.db".format(jid))
sqlitehandler.create_tables(db_file)
# await sqlitehandler.set_default_values(db_file)
if message:
return await callback(db_file, message)
else:
return await callback(db_file)