Restore support for disabling and enabling feeds.

Assign handlers to "self".
Towards better handling of subscription and unsubscription.
Minor fixes.
This commit is contained in:
Schimon Jehudah 2024-02-07 22:24:59 +00:00
parent 51e48c8389
commit 6c16fa4af1
11 changed files with 223 additions and 210 deletions

View file

@ -143,7 +143,7 @@ async def xmpp_start_updates(self, message, jid, jid_file):
else: else:
await sqlite.set_settings_value(db_file, [key, val]) await sqlite.set_settings_value(db_file, [key, val])
status_type = 'available' status_type = 'available'
status_message = '💡 Welcome back!' status_message = '📫 Welcome back!'
XmppPresence.send(self, jid, status_message, status_type=status_type) XmppPresence.send(self, jid, status_message, status_type=status_type)
message_body = 'Updates are enabled.' message_body = 'Updates are enabled.'
XmppMessage.send_reply(self, message, message_body) XmppMessage.send_reply(self, message, message_body)
@ -159,11 +159,11 @@ async def xmpp_stop_updates(self, message, jid, jid_file):
await sqlite.update_settings_value(db_file, [key, val]) await sqlite.update_settings_value(db_file, [key, val])
else: else:
await sqlite.set_settings_value(db_file, [key, val]) await sqlite.set_settings_value(db_file, [key, val])
await task.clean_tasks_xmpp(jid, ['interval', 'status']) task.clean_tasks_xmpp(self, jid, ['interval', 'status'])
message_body = 'Updates are disabled.' message_body = 'Updates are disabled.'
XmppMessage.send_reply(self, message, message_body) XmppMessage.send_reply(self, message, message_body)
status_type = 'xa' status_type = 'xa'
status_message = '💡 Send "Start" to receive Jabber updates' status_message = '📪 Send "Start" to receive Jabber updates'
XmppPresence.send(self, jid, status_message, status_type=status_type) XmppPresence.send(self, jid, status_message, status_type=status_type)
@ -1098,8 +1098,7 @@ async def download_document(self, message, jid, jid_file, message_text, ix_url,
url = None url = None
error = None error = None
response = None response = None
if ext in ('epub', 'html', 'markdown', if ext in ('epub', 'html', 'markdown', 'md', 'pdf', 'text', 'txt'):
'md', 'pdf', 'text', 'txt'):
match ext: match ext:
case 'markdown': case 'markdown':
ext = 'md' ext = 'md'

View file

@ -61,7 +61,7 @@ No operator was specified for this instance.
platforms = """ platforms = """
Supported platforms: XMPP Supported platforms: XMPP
Platforms to be added in future: Briar, Email, IRC, Matrix, MQTT, Nostr, Tox. Platforms to be added in future: ActivityPub, Briar, Email, IRC, LXMF, Matrix, MQTT, Nostr, Tox.
For ideal experience, we recommend using XMPP. For ideal experience, we recommend using XMPP.
""" """

View file

@ -628,7 +628,7 @@ async def get_number_of_feeds_active(db_file):
Returns Returns
------- -------
count : ? count : str
Number of rows. Number of rows.
""" """
with create_connection(db_file) as conn: with create_connection(db_file) as conn:
@ -1018,7 +1018,7 @@ async def update_statistics(cur):
cur.execute(sql, par) cur.execute(sql, par)
async def set_enabled_status(db_file, ix, status): async def set_enabled_status(db_file, feed_id, status):
""" """
Set status of feed to enabled or not enabled (i.e. disabled). Set status of feed to enabled or not enabled (i.e. disabled).
@ -1026,8 +1026,8 @@ async def set_enabled_status(db_file, ix, status):
---------- ----------
db_file : str db_file : str
Path to database file. Path to database file.
ix : str feed_id : str
Index of entry. Index of feed.
status : int status : int
0 or 1. 0 or 1.
""" """
@ -1038,12 +1038,12 @@ async def set_enabled_status(db_file, ix, status):
""" """
UPDATE feeds_state UPDATE feeds_state
SET enabled = :status SET enabled = :status
WHERE feed_id = :id WHERE feed_id = :feed_id
""" """
) )
par = { par = {
"status": status, "status": status,
"id": ix "feed_id": feed_id
} }
cur.execute(sql, par) cur.execute(sql, par)
@ -1395,7 +1395,7 @@ async def get_entries_of_feed(db_file, feed_id):
# "feed" urls that are enabled in table "status" # "feed" urls that are enabled in table "status"
async def get_feeds_url(db_file): async def get_feeds_url(db_file):
""" """
Query active feeds for URLs. Query table feeds for URLs.
Parameters Parameters
---------- ----------
@ -1419,6 +1419,34 @@ async def get_feeds_url(db_file):
return result return result
async def get_active_feeds_url(db_file):
"""
Query table feeds for active URLs.
Parameters
----------
db_file : str
Path to database file.
Returns
-------
result : list
URLs of active feeds.
"""
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT feeds.url
FROM feeds
INNER JOIN feeds_state ON feeds.id = feeds_state.feed_id
WHERE feeds_state.enabled = 1
"""
)
result = cur.execute(sql).fetchall()
return result
async def get_feeds(db_file): async def get_feeds(db_file):
""" """
Query table feeds and list items. Query table feeds and list items.

View file

@ -53,9 +53,9 @@ from slixfeed.xmpp.connect import XmppConnect
from slixfeed.xmpp.utility import get_chat_type from slixfeed.xmpp.utility import get_chat_type
import time import time
main_task = [] # main_task = []
jid_tasker = {} # jid_tasker = {}
task_manager = {} # task_manager = {}
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@ -71,12 +71,12 @@ loop = asyncio.get_event_loop()
def ping_task(self): def ping_task(self):
global ping_task_instance # global ping_task_instance
try: try:
ping_task_instance.cancel() self.ping_task_instance.cancel()
except: except:
logging.info('No ping task to cancel.') logging.info('No ping task to cancel.')
ping_task_instance = asyncio.create_task(XmppConnect.ping(self)) self.ping_task_instance = asyncio.create_task(XmppConnect.ping(self))
""" """
@ -94,21 +94,28 @@ await taskhandler.start_tasks(
""" """
async def start_tasks_xmpp(self, jid, tasks=None): async def start_tasks_xmpp(self, jid, tasks=None):
"""
NOTE
For proper activation of tasks involving task 'interval', it is essential
to place task 'interval' as the last to start due to await asyncio.sleep()
which otherwise would postpone tasks that would be set after task 'interval'
"""
if jid == self.boundjid.bare: if jid == self.boundjid.bare:
return return
try: try:
task_manager[jid] self.task_manager[jid]
except KeyError as e: except KeyError as e:
task_manager[jid] = {} self.task_manager[jid] = {}
logging.debug('KeyError:', str(e)) logging.debug('KeyError:', str(e))
logging.info('Creating new task manager for JID {}'.format(jid)) logging.info('Creating new task manager for JID {}'.format(jid))
if not tasks: if not tasks:
tasks = ['status', 'check', 'interval'] tasks = ['status', 'check', 'interval']
logging.info('Stopping tasks {} for JID {}'.format(tasks, jid)) logging.info('Stopping tasks {} for JID {}'.format(tasks, jid))
for task in tasks: for task in tasks:
# if task_manager[jid][task]: # if self.task_manager[jid][task]:
try: try:
task_manager[jid][task].cancel() self.task_manager[jid][task].cancel()
except: except:
logging.info('No task {} for JID {} (start_tasks_xmpp)' logging.info('No task {} for JID {} (start_tasks_xmpp)'
.format(task, jid)) .format(task, jid))
@ -120,10 +127,10 @@ async def start_tasks_xmpp(self, jid, tasks=None):
# breakpoint() # breakpoint()
match task: match task:
case 'check': case 'check':
task_manager[jid]['check'] = asyncio.create_task( self.task_manager[jid]['check'] = asyncio.create_task(
check_updates(jid)) check_updates(jid))
case 'status': case 'status':
task_manager[jid]['status'] = asyncio.create_task( self.task_manager[jid]['status'] = asyncio.create_task(
send_status(self, jid)) send_status(self, jid))
case 'interval': case 'interval':
jid_file = jid.replace('/', '_') jid_file = jid.replace('/', '_')
@ -152,11 +159,11 @@ async def start_tasks_xmpp(self, jid, tasks=None):
await sqlite.update_last_update_time(db_file) await sqlite.update_last_update_time(db_file)
else: else:
await sqlite.set_last_update_time(db_file) await sqlite.set_last_update_time(db_file)
task_manager[jid]['interval'] = asyncio.create_task( self.task_manager[jid]['interval'] = asyncio.create_task(
send_update(self, jid)) send_update(self, jid))
# for task in task_manager[jid].values(): # for task in self.task_manager[jid].values():
# print("task_manager[jid].values()") # print("task_manager[jid].values()")
# print(task_manager[jid].values()) # print(self.task_manager[jid].values())
# print("task") # print("task")
# print(task) # print(task)
# print("jid") # print("jid")
@ -165,14 +172,14 @@ async def start_tasks_xmpp(self, jid, tasks=None):
# await task # await task
async def clean_tasks_xmpp(jid, tasks=None): def clean_tasks_xmpp(self, jid, tasks=None):
if not tasks: if not tasks:
tasks = ['interval', 'status', 'check'] tasks = ['interval', 'status', 'check']
logging.info('Stopping tasks {} for JID {}'.format(tasks, jid)) logging.info('Stopping tasks {} for JID {}'.format(tasks, jid))
for task in tasks: for task in tasks:
# if task_manager[jid][task]: # if self.task_manager[jid][task]:
try: try:
task_manager[jid][task].cancel() self.task_manager[jid][task].cancel()
except: except:
logging.debug('No task {} for JID {} (clean_tasks_xmpp)' logging.debug('No task {} for JID {} (clean_tasks_xmpp)'
.format(task, jid)) .format(task, jid))
@ -230,6 +237,8 @@ async def send_update(self, jid, num=None):
media = await action.extract_image_from_html(url) media = await action.extract_image_from_html(url)
if media and news_digest: if media and news_digest:
print('SENDING MESSAGE (if media and news_digest)')
print(news_digest)
# Send textual message # Send textual message
XmppMessage.send(self, jid, news_digest, chat_type) XmppMessage.send(self, jid, news_digest, chat_type)
news_digest = '' news_digest = ''
@ -238,6 +247,8 @@ async def send_update(self, jid, num=None):
media = None media = None
if news_digest: if news_digest:
print('SENDING MESSAGE (if news_digest)')
print(news_digest)
# TODO Add while loop to assure delivery. # TODO Add while loop to assure delivery.
# print(await current_time(), ">>> ACT send_message",jid) # print(await current_time(), ">>> ACT send_message",jid)
# NOTE Do we need "if statement"? See NOTE at is_muc. # NOTE Do we need "if statement"? See NOTE at is_muc.
@ -255,14 +266,13 @@ async def send_update(self, jid, num=None):
# TODO Do not refresh task before # TODO Do not refresh task before
# verifying that it was completed. # verifying that it was completed.
await refresh_task( await refresh_task(self, jid, send_update, 'interval')
self, jid, send_update, 'interval')
# interval = await initdb( # interval = await initdb(
# jid, # jid,
# sqlite.get_settings_value, # sqlite.get_settings_value,
# "interval" # "interval"
# ) # )
# task_manager[jid]["interval"] = loop.call_at( # self.task_manager[jid]["interval"] = loop.call_at(
# loop.time() + 60 * interval, # loop.time() + 60 * interval,
# loop.create_task, # loop.create_task,
# send_update(jid) # send_update(jid)
@ -298,7 +308,7 @@ async def send_status(self, jid):
enabled = await config.get_setting_value(db_file, 'enabled') enabled = await config.get_setting_value(db_file, 'enabled')
if not enabled: if not enabled:
status_mode = 'xa' status_mode = 'xa'
status_text = '📫 Send "Start" to receive updates' status_text = '📪 Send "Start" to receive updates'
else: else:
feeds = await sqlite.get_number_of_items(db_file, 'feeds') feeds = await sqlite.get_number_of_items(db_file, 'feeds')
# print(await current_time(), jid, "has", feeds, "feeds") # print(await current_time(), jid, "has", feeds, "feeds")
@ -350,29 +360,29 @@ async def refresh_task(self, jid, callback, key, val=None):
jid_file = jid.replace('/', '_') jid_file = jid.replace('/', '_')
db_file = config.get_pathname_to_database(jid_file) db_file = config.get_pathname_to_database(jid_file)
val = await config.get_setting_value(db_file, key) val = await config.get_setting_value(db_file, key)
# if task_manager[jid][key]: # if self.task_manager[jid][key]:
if jid in task_manager: if jid in self.task_manager:
try: try:
task_manager[jid][key].cancel() self.task_manager[jid][key].cancel()
except: except:
logging.info('No task of type {} to cancel for ' logging.info('No task of type {} to cancel for '
'JID {} (refresh_task)'.format(key, jid) 'JID {} (refresh_task)'.format(key, jid)
) )
# task_manager[jid][key] = loop.call_at( # self.task_manager[jid][key] = loop.call_at(
# loop.time() + 60 * float(val), # loop.time() + 60 * float(val),
# loop.create_task, # loop.create_task,
# (callback(self, jid)) # (callback(self, jid))
# # send_update(jid) # # send_update(jid)
# ) # )
task_manager[jid][key] = loop.create_task( self.task_manager[jid][key] = loop.create_task(
wait_and_run(self, callback, jid, val) wait_and_run(self, callback, jid, val)
) )
# task_manager[jid][key] = loop.call_later( # self.task_manager[jid][key] = loop.call_later(
# 60 * float(val), # 60 * float(val),
# loop.create_task, # loop.create_task,
# send_update(jid) # send_update(jid)
# ) # )
# task_manager[jid][key] = send_update.loop.call_at( # self.task_manager[jid][key] = send_update.loop.call_at(
# send_update.loop.time() + 60 * val, # send_update.loop.time() + 60 * val,
# send_update.loop.create_task, # send_update.loop.create_task,
# send_update(jid) # send_update(jid)
@ -399,7 +409,7 @@ async def check_updates(jid):
while True: while True:
jid_file = jid.replace('/', '_') jid_file = jid.replace('/', '_')
db_file = config.get_pathname_to_database(jid_file) db_file = config.get_pathname_to_database(jid_file)
urls = await sqlite.get_feeds_url(db_file) urls = await sqlite.get_active_feeds_url(db_file)
for url in urls: for url in urls:
await action.scan(db_file, url) await action.scan(db_file, url)
val = config.get_value('settings', 'Settings', 'check') val = config.get_value('settings', 'Settings', 'check')
@ -416,7 +426,7 @@ async def check_updates(jid):
NOTE NOTE
This is an older system, utilizing local storage instead of XMPP presence. This is an older system, utilizing local storage instead of XMPP presence.
This function is good for use with protocols that might not have presence. This function is good for use with protocols that might not have presence.
ActivityPub, IRC, LXMF, Matrix, SMTP, Tox. ActivityPub, IRC, LXMF, Matrix, Nostr, SMTP, Tox.
""" """
async def select_file(self): async def select_file(self):
""" """
@ -446,10 +456,8 @@ async def select_file(self):
if (file.endswith('.db') and if (file.endswith('.db') and
not file.endswith('.db-jour.db')): not file.endswith('.db-jour.db')):
jid = file[:-3] jid = file[:-3]
main_task.extend( main_task.extend([tg.create_task(self.task_jid(jid))])
[tg.create_task(self.task_jid(jid))]
)
# main_task = [tg.create_task(self.task_jid(jid))] # main_task = [tg.create_task(self.task_jid(jid))]
# task_manager.update({jid: tg}) # self.task_manager.update({jid: tg})

View file

@ -1,2 +1,2 @@
__version__ = '0.1.1' __version__ = '0.1.2'
__version_info__ = (0, 1, 1) __version_info__ = (0, 1, 2)

View file

@ -100,12 +100,20 @@ class Slixfeed(slixmpp.ClientXMPP):
# NOTE # NOTE
# The bot works fine when the nickname is hardcoded; or # The bot works fine when the nickname is hardcoded; or
# The bot won't join some MUCs when its nickname has brackets # The bot won't join some MUCs when its nickname has brackets
# Handler for nickname
self.alias = alias self.alias = alias
# The session_start event will be triggered when
# the bot establishes its connection with the server # Handlers for tasks
# and the XML streams are ready for use. We want to self.task_manager = {}
# listen for this event so that we we can initialize
# our roster. # Handlers for ping
self.ping_task_instance = {}
# Handlers for connection events
self.connection_attempts = 0
self.max_connection_attempts = 10
self.add_event_handler("session_start", self.add_event_handler("session_start",
self.on_session_start) self.on_session_start)
self.add_event_handler("session_resumed", self.add_event_handler("session_resumed",
@ -158,9 +166,6 @@ class Slixfeed(slixmpp.ClientXMPP):
# Initialize event loop # Initialize event loop
# self.loop = asyncio.get_event_loop() # self.loop = asyncio.get_event_loop()
# handlers for connection events
self.connection_attempts = 0
self.max_connection_attempts = 10
self.add_event_handler('connection_failed', self.add_event_handler('connection_failed',
self.on_connection_failed) self.on_connection_failed)
self.add_event_handler('session_end', self.add_event_handler('session_end',
@ -174,8 +179,7 @@ class Slixfeed(slixmpp.ClientXMPP):
muc_jid = message['groupchat_invite']['jid'] muc_jid = message['groupchat_invite']['jid']
await XmppBookmark.add(self, muc_jid) await XmppBookmark.add(self, muc_jid)
await XmppGroupchat.join(self, inviter, muc_jid) await XmppGroupchat.join(self, inviter, muc_jid)
message_body = ('Greetings!\n' message_body = ('Greetings! I am {}, the news anchor.\n'
'I am {}, the news anchor.\n'
'My job is to bring you the latest ' 'My job is to bring you the latest '
'news from sources you provide me with.\n' 'news from sources you provide me with.\n'
'You may always reach me via xmpp:{}?message' 'You may always reach me via xmpp:{}?message'
@ -189,8 +193,7 @@ class Slixfeed(slixmpp.ClientXMPP):
muc_jid = message['groupchat_invite']['jid'] muc_jid = message['groupchat_invite']['jid']
await XmppBookmark.add(self, muc_jid) await XmppBookmark.add(self, muc_jid)
await XmppGroupchat.join(self, inviter, muc_jid) await XmppGroupchat.join(self, inviter, muc_jid)
message_body = ('Greetings!\n' message_body = ('Greetings! I am {}, the news anchor.\n'
'I am {}, the news anchor.\n'
'My job is to bring you the latest ' 'My job is to bring you the latest '
'news from sources you provide me with.\n' 'news from sources you provide me with.\n'
'You may always reach me via xmpp:{}?message' 'You may always reach me via xmpp:{}?message'
@ -260,14 +263,14 @@ class Slixfeed(slixmpp.ClientXMPP):
# await task.check_readiness(self, presence) # await task.check_readiness(self, presence)
jid = presence['from'].bare jid = presence['from'].bare
if presence['show'] in ('away', 'dnd', 'xa'): if presence['show'] in ('away', 'dnd', 'xa'):
await task.clean_tasks_xmpp(jid, ['interval']) task.clean_tasks_xmpp(self, jid, ['interval'])
await task.start_tasks_xmpp(self, jid, ['status', 'check']) await task.start_tasks_xmpp(self, jid, ['status', 'check'])
async def on_presence_subscribe(self, presence): async def on_presence_subscribe(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
if not self.client_roster[jid]['to']: if not self.client_roster[jid]['to']:
XmppPresence.subscribe(self, jid) XmppPresence.subscription(self, jid, 'subscribe')
await XmppRoster.add(self, jid) await XmppRoster.add(self, jid)
status_message = '✒️ Share online status to receive updates' status_message = '✒️ Share online status to receive updates'
XmppPresence.send(self, jid, status_message) XmppPresence.send(self, jid, status_message)
@ -280,8 +283,7 @@ class Slixfeed(slixmpp.ClientXMPP):
async def on_presence_subscribed(self, presence): async def on_presence_subscribed(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
message_subject = 'RSS News Bot' message_subject = 'RSS News Bot'
message_body = ('Greetings!\n' message_body = ('Greetings! I am {}, the news anchor.\n'
'I am {}, the news anchor.\n'
'My job is to bring you the latest ' 'My job is to bring you the latest '
'news from sources you provide me with.\n' 'news from sources you provide me with.\n'
'You may always reach me via xmpp:{}?message' 'You may always reach me via xmpp:{}?message'
@ -304,17 +306,19 @@ class Slixfeed(slixmpp.ClientXMPP):
async def on_presence_unsubscribed(self, presence): async def on_presence_unsubscribed(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
message_body = 'You have been unsubscribed.' message_body = 'You have been unsubscribed.'
status_message = '🖋️ Subscribe to receive updates' # status_message = '🖋️ Subscribe to receive updates'
# status_message = None
XmppMessage.send(self, jid, message_body, 'chat') XmppMessage.send(self, jid, message_body, 'chat')
XmppPresence.send(self, jid, status_message, XmppPresence.subscription(self, jid, 'unsubscribe')
presence_type='unsubscribed') # XmppPresence.send(self, jid, status_message,
# presence_type='unsubscribed')
await XmppRoster.remove(self, jid) await XmppRoster.remove(self, jid)
async def on_presence_unavailable(self, presence): async def on_presence_unavailable(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
# await task.stop_tasks(self, jid) # await task.stop_tasks(self, jid)
await task.clean_tasks_xmpp(jid) task.clean_tasks_xmpp(self, jid)
# TODO # TODO
@ -326,7 +330,7 @@ class Slixfeed(slixmpp.ClientXMPP):
print("on_presence_error") print("on_presence_error")
print(presence) print(presence)
jid = presence["from"].bare jid = presence["from"].bare
await task.clean_tasks_xmpp(jid) task.clean_tasks_xmpp(self, jid)
async def on_reactions(self, message): async def on_reactions(self, message):
@ -337,36 +341,36 @@ class Slixfeed(slixmpp.ClientXMPP):
async def on_chatstate_active(self, message): async def on_chatstate_active(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
async def on_chatstate_composing(self, message): async def on_chatstate_composing(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
status_message='Press "help" for manual, or "info" for information.' status_message='💡 Press "help" for manual, or "info" for information.'
XmppPresence.send(self, jid, status_message) XmppPresence.send(self, jid, status_message)
async def on_chatstate_gone(self, message): async def on_chatstate_gone(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
async def on_chatstate_inactive(self, message): async def on_chatstate_inactive(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
async def on_chatstate_paused(self, message): async def on_chatstate_paused(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])

View file

@ -3,44 +3,10 @@
""" """
FIXME
1) Function check_readiness or event "changed_status" is causing for
triple status messages and also false ones that indicate of lack
of feeds.
TODO TODO
1) Use loop (with gather) instead of TaskGroup. 1) Look into self.set_jid in order to be able to join to groupchats
https://slixmpp.readthedocs.io/en/latest/api/basexmpp.html#slixmpp.basexmpp.BaseXMPP.set_jid
2) Assure message delivery before calling a new task.
See https://slixmpp.readthedocs.io/en/latest/event_index.html#term-marker_acknowledged
3) XHTTML-IM
case _ if message_lowercase.startswith("html"):
message['html']="
Parse me!
"
self.send_message(
mto=jid,
mfrom=self.boundjid.bare,
mhtml=message
)
NOTE
1) Self presence
Apparently, it is possible to view self presence.
This means that there is no need to store presences in order to switch or restore presence.
check_readiness
📂 Send a URL from a blog or a news website.
JID: self.boundjid.bare
MUC: self.alias
2) Extracting attribute using xmltodict.
import xmltodict
message = xmltodict.parse(str(message))
jid = message["message"]["x"]["@jid"]
""" """
@ -97,11 +63,23 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
def __init__(self, jid, secret, hostname, port, alias=None): def __init__(self, jid, secret, hostname, port, alias=None):
slixmpp.ComponentXMPP.__init__(self, jid, secret, hostname, port) slixmpp.ComponentXMPP.__init__(self, jid, secret, hostname, port)
# The session_start event will be triggered when # NOTE
# the bot establishes its connection with the server # The bot works fine when the nickname is hardcoded; or
# and the XML streams are ready for use. We want to # The bot won't join some MUCs when its nickname has brackets
# listen for this event so that we we can initialize
# our roster. # Handler for nickname
self.alias = alias
# Handlers for tasks
self.task_manager = {}
# Handlers for ping
self.ping_task_instance = {}
# Handlers for connection events
self.connection_attempts = 0
self.max_connection_attempts = 10
self.add_event_handler("session_start", self.add_event_handler("session_start",
self.on_session_start) self.on_session_start)
self.add_event_handler("session_resumed", self.add_event_handler("session_resumed",
@ -154,9 +132,6 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
# Initialize event loop # Initialize event loop
# self.loop = asyncio.get_event_loop() # self.loop = asyncio.get_event_loop()
# handlers for connection events
self.connection_attempts = 0
self.max_connection_attempts = 10
self.add_event_handler('connection_failed', self.add_event_handler('connection_failed',
self.on_connection_failed) self.on_connection_failed)
self.add_event_handler('session_end', self.add_event_handler('session_end',
@ -241,21 +216,20 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
# await task.check_readiness(self, presence) # await task.check_readiness(self, presence)
jid = presence['from'].bare jid = presence['from'].bare
if presence['show'] in ('away', 'dnd', 'xa'): if presence['show'] in ('away', 'dnd', 'xa'):
await task.clean_tasks_xmpp(jid, ['interval']) task.clean_tasks_xmpp(self, jid, ['interval'])
await task.start_tasks_xmpp(self, jid, ['status', 'check']) await task.start_tasks_xmpp(self, jid, ['status', 'check'])
async def on_presence_subscribe(self, presence): async def on_presence_subscribe(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
# XmppPresence.request(self, jid) # XmppPresence.request(self, jid)
XmppPresence.subscribe(self, jid) XmppPresence.subscription(self, jid, 'subscribe')
async def on_presence_subscribed(self, presence): async def on_presence_subscribed(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
message_subject = 'RSS News Bot' message_subject = 'RSS News Bot'
message_body = ('Greetings!\n' message_body = ('Greetings! I am {}, the news anchor.\n'
'I am {}, the news anchor.\n'
'My job is to bring you the latest ' 'My job is to bring you the latest '
'news from sources you provide me with.\n' 'news from sources you provide me with.\n'
'You may always reach me via xmpp:{}?message' 'You may always reach me via xmpp:{}?message'
@ -278,16 +252,18 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
async def on_presence_unsubscribed(self, presence): async def on_presence_unsubscribed(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
message_body = 'You have been unsubscribed.' message_body = 'You have been unsubscribed.'
status_message = '🖋️ Subscribe to receive updates' # status_message = '🖋️ Subscribe to receive updates'
# status_message = None
XmppMessage.send(self, jid, message_body, 'chat') XmppMessage.send(self, jid, message_body, 'chat')
XmppPresence.send(self, jid, status_message, XmppPresence.subscription(self, jid, 'unsubscribe')
presence_type='unsubscribed') # XmppPresence.send(self, jid, status_message,
# presence_type='unsubscribed')
async def on_presence_unavailable(self, presence): async def on_presence_unavailable(self, presence):
jid = presence['from'].bare jid = presence['from'].bare
# await task.stop_tasks(self, jid) # await task.stop_tasks(self, jid)
await task.clean_tasks_xmpp(jid) task.clean_tasks_xmpp(self, jid)
# TODO # TODO
@ -299,7 +275,7 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
print("on_presence_error") print("on_presence_error")
print(presence) print(presence)
jid = presence["from"].bare jid = presence["from"].bare
await task.clean_tasks_xmpp(jid) task.clean_tasks_xmpp(self, jid)
async def on_reactions(self, message): async def on_reactions(self, message):
@ -310,36 +286,36 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
async def on_chatstate_active(self, message): async def on_chatstate_active(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
async def on_chatstate_composing(self, message): async def on_chatstate_composing(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
status_message='Press "help" for manual, or "info" for information.' status_message='💡 Press "help" for manual, or "info" for information.'
XmppPresence.send(self, jid, status_message) XmppPresence.send(self, jid, status_message)
async def on_chatstate_gone(self, message): async def on_chatstate_gone(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
async def on_chatstate_inactive(self, message): async def on_chatstate_inactive(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
async def on_chatstate_paused(self, message): async def on_chatstate_paused(self, message):
if message['type'] in ('chat', 'normal'): if message['type'] in ('chat', 'normal'):
jid = message['from'].bare jid = message['from'].bare
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])

View file

@ -24,6 +24,28 @@ NOTE
See XEP-0367: Message Attaching See XEP-0367: Message Attaching
FIXME
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-3410' coro=<send_update() done, defined at /home/admin/.venv/lib/python3.11/site-packages/slixfeed/task.py:181> exception=ParseError('not well-formed (invalid token): line 1, column 198')>
Traceback (most recent call last):
File "/home/jojo/.venv/lib/python3.11/site-packages/slixfeed/task.py", line 237, in send_update
XmppMessage.send_oob(self, jid, media, chat_type)
File "/home/jojo/.venv/lib/python3.11/site-packages/slixfeed/xmpp/message.py", line 56, in send_oob
message = self.make_message(mto=jid,
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/jojo/.venv/lib/python3.11/site-packages/slixmpp/basexmpp.py", line 517, in make_message
message['html']['body'] = mhtml
~~~~~~~~~~~~~~~^^^^^^^^
File "/home/jojo/.venv/lib/python3.11/site-packages/slixmpp/xmlstream/stanzabase.py", line 792, in __setitem__
getattr(self, set_method)(value, **kwargs)
File "/home/jojo/.venv/lib/python3.11/site-packages/slixmpp/plugins/xep_0071/stanza.py", line 38, in set_body
xhtml = ET.fromstring(content)
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/xml/etree/ElementTree.py", line 1338, in XML
parser.feed(text)
xml.etree.ElementTree.ParseError: not well-formed (invalid token): line 1, column 198
""" """
class XmppMessage: class XmppMessage:
@ -50,16 +72,20 @@ class XmppMessage:
def send_oob(self, jid, url, chat_type): def send_oob(self, jid, url, chat_type):
html = ( try:
f'<body xmlns="http://www.w3.org/1999/xhtml">' html = (
f'<a href="{url}">{url}</a></body>') f'<body xmlns="http://www.w3.org/1999/xhtml">'
message = self.make_message(mto=jid, f'<a href="{url}">{url}</a></body>')
mfrom=self.boundjid.bare, message = self.make_message(mto=jid,
mbody=url, mfrom=self.boundjid.bare,
mhtml=html, mbody=url,
mtype=chat_type) mhtml=html,
message['oob']['url'] = url mtype=chat_type)
message.send() message['oob']['url'] = url
message.send()
except:
logging.error('ERROR!')
logging.error(jid, url, chat_type, html)
# FIXME Solve this function # FIXME Solve this function

View file

@ -17,8 +17,6 @@ FIXME
""" """
import logging import logging
from slixfeed.dt import current_time
from slixfeed.xmpp.message import XmppMessage
class XmppGroupchat: class XmppGroupchat:

View file

@ -7,6 +7,10 @@ NOTE
Accept symbols 🉑 👍 Accept symbols 🉑 👍
TODO
Remove subscription from JID that do not (stopped) share presence.
""" """
class XmppPresence: class XmppPresence:
@ -20,18 +24,8 @@ class XmppPresence:
ptype=presence_type) ptype=presence_type)
def subscribe(self, jid): def subscription(self, jid, presence_type):
self.send_presence_subscription(pto=jid, self.send_presence_subscription(pto=jid,
pfrom=self.boundjid.bare, pfrom=self.boundjid.bare,
ptype='subscribe', ptype=presence_type,
pnick=self.alias) pnick=self.alias)
def remove(self):
"""
Remove subscription from JID that do not (stopped) share presence.
Returns
-------
None.
"""

View file

@ -86,7 +86,7 @@ async def message(self, message):
if (message_text.lower().startswith('http') and if (message_text.lower().startswith('http') and
message_text.lower().endswith('.opml')): message_text.lower().endswith('.opml')):
url = message_text url = message_text
await task.clean_tasks_xmpp(jid, ['status']) task.clean_tasks_xmpp(self, jid, ['status'])
status_type = 'dnd' status_type = 'dnd'
status_message = '📥️ Procesing request to import feeds...' status_message = '📥️ Procesing request to import feeds...'
XmppPresence.send(self, jid, status_message, XmppPresence.send(self, jid, status_message,
@ -97,7 +97,7 @@ async def message(self, message):
response = 'Successfully imported {} feeds.'.format(count) response = 'Successfully imported {} feeds.'.format(count)
else: else:
response = 'OPML file was not imported.' response = 'OPML file was not imported.'
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
XmppMessage.send_reply(self, message, response) XmppMessage.send_reply(self, message, response)
return return
@ -296,7 +296,7 @@ async def message(self, message):
await action.scan(db_file, url) await action.scan(db_file, url)
old = await config.get_setting_value(db_file, "old") old = await config.get_setting_value(db_file, "old")
if old: if old:
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
# await send_status(jid) # await send_status(jid)
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
else: else:
@ -489,33 +489,25 @@ async def message(self, message):
# case _ if (message_lowercase.startswith('http')) and( # case _ if (message_lowercase.startswith('http')) and(
# message_lowercase.endswith('.opml')): # message_lowercase.endswith('.opml')):
# url = message_text # url = message_text
# await task.clean_tasks_xmpp( # task.clean_tasks_xmpp(self, jid, ['status'])
# jid, ['status'])
# status_type = 'dnd' # status_type = 'dnd'
# status_message = ( # status_message = '📥️ Procesing request to import feeds...'
# '📥️ Procesing request to import feeds...' # XmppPresence.send(self, jid, status_message,
# ) # status_type=status_type)
# XmppPresence.send(
# self, jid, status_message, status_type=status_type)
# db_file = config.get_pathname_to_database(jid_file) # db_file = config.get_pathname_to_database(jid_file)
# count = await action.import_opml(db_file, url) # count = await action.import_opml(db_file, url)
# if count: # if count:
# response = ( # response = ('Successfully imported {} feeds.'
# 'Successfully imported {} feeds.' # .format(count))
# ).format(count)
# else: # else:
# response = ( # response = 'OPML file was not imported.'
# 'OPML file was not imported.' # task.clean_tasks_xmpp(self, jid, ['status'])
# ) # await task.start_tasks_xmpp(self, jid, ['status'])
# await task.clean_tasks_xmpp(
# jid, ['status'])
# await task.start_tasks_xmpp(
# self, jid, ['status'])
# XmppMessage.send_reply(self, message, response) # XmppMessage.send_reply(self, message, response)
case _ if (message_lowercase.startswith('http') or case _ if (message_lowercase.startswith('http') or
message_lowercase.startswith('feed:')): message_lowercase.startswith('feed:')):
url = message_text url = message_text
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
status_type = 'dnd' status_type = 'dnd'
status_message = ('📫️ Processing request ' status_message = ('📫️ Processing request '
'to fetch data from {}' 'to fetch data from {}'
@ -528,7 +520,7 @@ async def message(self, message):
db_file = config.get_pathname_to_database(jid_file) db_file = config.get_pathname_to_database(jid_file)
# try: # try:
response = await action.add_feed(db_file, url) response = await action.add_feed(db_file, url)
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
# except: # except:
# response = ( # response = (
@ -641,25 +633,11 @@ async def message(self, message):
await task.send_update(self, jid) await task.send_update(self, jid)
# await task.clean_tasks_xmpp( # task.clean_tasks_xmpp(self, jid, ['interval', 'status'])
# jid, ['interval', 'status']) # await task.start_tasks_xmpp(self, jid, ['status', 'interval'])
# await task.start_tasks_xmpp(
# self, jid, ['interval', 'status'])
# await refresh_task( # await refresh_task(self, jid, send_update, 'interval', num)
# self, # await refresh_task(self, jid, send_status, 'status', 20)
# jid,
# send_update,
# 'interval',
# num
# )
# await refresh_task(
# self,
# jid,
# send_status,
# 'status',
# 20
# )
# await refresh_task(jid, key, val) # await refresh_task(jid, key, val)
case 'old': case 'old':
db_file = config.get_pathname_to_database(jid_file) db_file = config.get_pathname_to_database(jid_file)
@ -703,7 +681,7 @@ async def message(self, message):
data = message_text[5:] data = message_text[5:]
data = data.split() data = data.split()
url = data[0] url = data[0]
await task.clean_tasks_xmpp(jid, ['status']) task.clean_tasks_xmpp(self, jid, ['status'])
status_type = 'dnd' status_type = 'dnd'
status_message = ('📫️ Processing request to fetch data from {}' status_message = ('📫️ Processing request to fetch data from {}'
.format(url)) .format(url))
@ -787,7 +765,7 @@ async def message(self, message):
# 'status', # 'status',
# 20 # 20
# ) # )
# await task.clean_tasks_xmpp(jid, ['status']) # task.clean_tasks_xmpp(self, jid, ['status'])
await task.start_tasks_xmpp(self, jid, ['status']) await task.start_tasks_xmpp(self, jid, ['status'])
else: else:
response = 'Missing feed URL or index number.' response = 'Missing feed URL or index number.'
@ -795,7 +773,7 @@ async def message(self, message):
case _ if message_lowercase.startswith('reset'): case _ if message_lowercase.startswith('reset'):
# TODO Reset also by ID # TODO Reset also by ID
ix_url = message_text[6:] ix_url = message_text[6:]
await task.clean_tasks_xmpp(jid, ['status']) task.clean_tasks_xmpp(self, jid, ['status'])
status_type = 'dnd' status_type = 'dnd'
status_message = '📫️ Marking entries as read...' status_message = '📫️ Marking entries as read...'
XmppPresence.send(self, jid, status_message, XmppPresence.send(self, jid, status_message,
@ -859,22 +837,24 @@ async def message(self, message):
response = await action.list_statistics(db_file) response = await action.list_statistics(db_file)
XmppMessage.send_reply(self, message, response) XmppMessage.send_reply(self, message, response)
case _ if message_lowercase.startswith('disable '): case _ if message_lowercase.startswith('disable '):
ix = message_text[8:] feed_id = message_text[8:]
db_file = config.get_pathname_to_database(jid_file) db_file = config.get_pathname_to_database(jid_file)
try: try:
await sqlite.set_enabled_status(db_file, ix, 0) await sqlite.set_enabled_status(db_file, feed_id, 0)
await sqlite.mark_feed_as_read(db_file, feed_id)
response = ('Updates are now disabled for news source {}.' response = ('Updates are now disabled for news source {}.'
.format(ix)) .format(feed_id))
except: except:
response = 'No news source with index {}.'.format(ix) response = 'No news source with index {}.'.format(feed_id)
XmppMessage.send_reply(self, message, response) XmppMessage.send_reply(self, message, response)
await task.start_tasks_xmpp(self, jid, ['status'])
case _ if message_lowercase.startswith('enable'): case _ if message_lowercase.startswith('enable'):
ix = message_text[7:] feed_id = message_text[7:]
db_file = config.get_pathname_to_database(jid_file) db_file = config.get_pathname_to_database(jid_file)
try: try:
await sqlite.set_enabled_status(db_file, ix, 1) await sqlite.set_enabled_status(db_file, feed_id, 1)
response = ('Updates are now enabled for news source {}.' response = ('Updates are now enabled for news source {}.'
.format(ix)) .format(feed_id))
except: except:
response = 'No news source with index {}.'.format(ix) response = 'No news source with index {}.'.format(ix)
XmppMessage.send_reply(self, message, response) XmppMessage.send_reply(self, message, response)