Fix automated publishing mechanism of PubSub, so it will not halt other procedures.

This commit is contained in:
Schimon Jehudah, Adv. 2024-07-31 20:53:42 +03:00
parent 4642aa694b
commit ed7491b5a9
7 changed files with 80 additions and 13 deletions

View file

@ -88,7 +88,7 @@ $ slixfeed
## Recommended Clients ## Recommended Clients
Slixfeed works with any XMPP chat client; if you want to make use of the visual interface Slixfeed has to offer (i.e. Ad-Hoc Commands), then you are advised to use [Cheogram](https://cheogram.com), [Converse](https://conversejs.org), [Gajim](https://gajim.org), [monocles chat](https://monocles.chat), [Movim](https://mov.im), [Poezio](https://poez.io), [Psi](https://psi-im.org) or [Psi+](https://psi-plus.com). Slixfeed works with any XMPP chat client; if you want to make use of the visual interface Slixfeed has to offer (i.e. Ad-Hoc Commands), then you are advised to use [Cheogram](https://cheogram.com), [Converse](https://conversejs.org), [Gajim](https://gajim.org), [LeechCraft](https://leechcraft.org/plugins-azoth-xoox), [monocles chat](https://monocles.chat), [Movim](https://mov.im), [Poezio](https://poez.io), [Psi](https://psi-im.org) or [Psi+](https://psi-plus.com).
### Support ### Support

View file

@ -58,4 +58,5 @@ class Message:
def printer(text): def printer(text):
now = datetime.now() now = datetime.now()
current_time = now.strftime("%H:%M:%S") current_time = now.strftime("%H:%M:%S")
print('{} {}'.format(current_time, text), end='\r') # print('{} {}'.format(current_time, text), end='\r')
print('{} {}'.format(current_time, text))

View file

@ -1371,6 +1371,10 @@ class FeedTask:
# ) # )
# Consider an endless loop. See XmppPubsubTask.loop_task
# def restart_task(self, jid_bare):
def restart_task(self, jid_bare): def restart_task(self, jid_bare):
if jid_bare == self.boundjid.bare: if jid_bare == self.boundjid.bare:
return return

View file

@ -1,2 +1,2 @@
__version__ = '0.1.92' __version__ = '0.1.93'
__version_info__ = (0, 1, 92) __version_info__ = (0, 1, 93)

View file

@ -349,21 +349,29 @@ class XmppClient(slixmpp.ClientXMPP):
await self.get_roster() await self.get_roster()
# self.service_reactions() # self.service_reactions()
XmppConnectTask.ping(self) XmppConnectTask.ping(self)
# results = await XmppPubsub.get_pubsub_services(self)
# for result in results + [{'jid' : self.boundjid.bare,
# 'name' : self.alias}]:
# jid_bare = result['jid']
# if jid_bare not in self.settings:
# db_file = config.get_pathname_to_database(jid_bare)
# Config.add_settings_jid(self, jid_bare, db_file)
# await FeedTask.check_updates(self, jid_bare)
# XmppPubsubTask.task_publish(self, jid_bare)
bookmarks = await XmppBookmark.get_bookmarks(self) bookmarks = await XmppBookmark.get_bookmarks(self)
await XmppGroupchat.autojoin(self, bookmarks) await XmppGroupchat.autojoin(self, bookmarks)
if 'ipc' in self.defaults and self.defaults['ipc']['bsd']: if 'ipc' in self.defaults and self.defaults['ipc']['bsd']:
# Start Inter-Process Communication # Start Inter-Process Communication
print('POSIX sockets: Initiating IPC server...') print('POSIX sockets: Initiating IPC server...')
self.ipc = asyncio.create_task(XmppIpcServer.ipc(self)) self.ipc = asyncio.create_task(XmppIpcServer.ipc(self))
while True:
results = await XmppPubsub.get_pubsub_services(self)
for result in results + [{'jid' : self.boundjid.bare,
'name' : self.alias}]:
jid_bare = result['jid']
if jid_bare not in self.settings:
db_file = config.get_pathname_to_database(jid_bare)
Config.add_settings_jid(self, jid_bare, db_file)
#await FeedTask.check_updates(self, jid_bare)
#await XmppPubsubTask.task_publish(self, jid_bare)
FeedTask.restart_task(self, jid_bare)
#XmppPubsubTask.loop_task(self, jid_bare)
XmppPubsubTask.restart_task(self, jid_bare)
await asyncio.sleep(60 * 180)
time_end = time.time() time_end = time.time()
difference = time_end - time_begin difference = time_end - time_begin
if difference > 1: logger.warning('{} (time: {})'.format(function_name, if difference > 1: logger.warning('{} (time: {})'.format(function_name,

View file

@ -49,6 +49,7 @@ class XmppOmemo:
async def decrypt(self, message: Message, allow_untrusted: bool = False): async def decrypt(self, message: Message, allow_untrusted: bool = False):
jid = message['from'] jid = message['from']
try: try:
print('XmppOmemo.decrypt')
message_omemo_encrypted = message['omemo_encrypted'] message_omemo_encrypted = message['omemo_encrypted']
message_body = await self['xep_0384'].decrypt_message( message_body = await self['xep_0384'].decrypt_message(
message_omemo_encrypted, jid, allow_untrusted) message_omemo_encrypted, jid, allow_untrusted)
@ -62,6 +63,7 @@ class XmppOmemo:
omemo_decrypted = response = None omemo_decrypted = response = None
retry = None retry = None
except (MissingOwnKey,) as exn: except (MissingOwnKey,) as exn:
print('XmppOmemo.decrypt. except: MissingOwnKey')
# The message is missing our own key, it was not encrypted for # The message is missing our own key, it was not encrypted for
# us, and we can't decrypt it. # us, and we can't decrypt it.
response = ('Error: Your message has not been encrypted for ' response = ('Error: Your message has not been encrypted for '
@ -70,6 +72,7 @@ class XmppOmemo:
retry = False retry = False
logger.error(exn) logger.error(exn)
except (NoAvailableSession,) as exn: except (NoAvailableSession,) as exn:
print('XmppOmemo.decrypt. except: NoAvailableSession')
# We received a message from that contained a session that we # We received a message from that contained a session that we
# don't know about (deleted session storage, etc.). We can't # don't know about (deleted session storage, etc.). We can't
# decrypt the message, and it's going to be lost. # decrypt the message, and it's going to be lost.
@ -82,6 +85,8 @@ class XmppOmemo:
retry = False retry = False
logger.error(exn) logger.error(exn)
except (UndecidedException, UntrustedException) as exn: except (UndecidedException, UntrustedException) as exn:
print('XmppOmemo.decrypt. except: UndecidedException')
print('XmppOmemo.decrypt. except: UntrustedException')
# We received a message from an untrusted device. We can # We received a message from an untrusted device. We can
# choose to decrypt the message nonetheless, with the # choose to decrypt the message nonetheless, with the
# `allow_untrusted` flag on the `decrypt_message` call, which # `allow_untrusted` flag on the `decrypt_message` call, which
@ -98,6 +103,7 @@ class XmppOmemo:
# We resend, setting the `allow_untrusted` parameter to True. # We resend, setting the `allow_untrusted` parameter to True.
# await XmppChat.process_message(self, message, allow_untrusted=True) # await XmppChat.process_message(self, message, allow_untrusted=True)
except (EncryptionPrepareException,) as exn: except (EncryptionPrepareException,) as exn:
print('XmppOmemo.decrypt. except: EncryptionPrepareException')
# Slixmpp tried its best, but there were errors it couldn't # Slixmpp tried its best, but there were errors it couldn't
# resolve. At this point you should have seen other exceptions # resolve. At this point you should have seen other exceptions
# and given a chance to resolve them already. # and given a chance to resolve them already.
@ -107,6 +113,7 @@ class XmppOmemo:
retry = False retry = False
logger.error(exn) logger.error(exn)
except (Exception,) as exn: except (Exception,) as exn:
print('XmppOmemo.decrypt. except: Exception')
response = ('Error: Your message has not been encrypted for ' response = ('Error: Your message has not been encrypted for '
'Slixfeed (Unknown).') 'Slixfeed (Unknown).')
omemo_decrypted = False omemo_decrypted = False
@ -118,9 +125,12 @@ class XmppOmemo:
async def encrypt(self, jid: JID, message_body): async def encrypt(self, jid: JID, message_body):
print(jid)
print(message_body)
expect_problems = {} # type: Optional[Dict[JID, List[int]]] expect_problems = {} # type: Optional[Dict[JID, List[int]]]
while True: while True:
try: try:
print('XmppOmemo.encrypt')
# `encrypt_message` excepts the plaintext to be sent, a list of # `encrypt_message` excepts the plaintext to be sent, a list of
# bare JIDs to encrypt to, and optionally a dict of problems to # bare JIDs to encrypt to, and optionally a dict of problems to
# expect per bare JID. # expect per bare JID.
@ -137,6 +147,7 @@ class XmppOmemo:
omemo_encrypted = True omemo_encrypted = True
break break
except UndecidedException as exn: except UndecidedException as exn:
print('XmppOmemo.encrypt. except: UndecidedException')
# The library prevents us from sending a message to an # The library prevents us from sending a message to an
# untrusted/undecided barejid, so we need to make a decision here. # untrusted/undecided barejid, so we need to make a decision here.
# This is where you prompt your user to ask what to do. In # This is where you prompt your user to ask what to do. In
@ -145,6 +156,7 @@ class XmppOmemo:
omemo_encrypted = False omemo_encrypted = False
# TODO: catch NoEligibleDevicesException # TODO: catch NoEligibleDevicesException
except EncryptionPrepareException as exn: except EncryptionPrepareException as exn:
print('XmppOmemo.encrypt. except: EncryptionPrepareException')
# This exception is being raised when the library has tried # This exception is being raised when the library has tried
# all it could and doesn't know what to do anymore. It # all it could and doesn't know what to do anymore. It
# contains a list of exceptions that the user must resolve, or # contains a list of exceptions that the user must resolve, or
@ -168,10 +180,12 @@ class XmppOmemo:
device_list = expect_problems.setdefault(jid, []) device_list = expect_problems.setdefault(jid, [])
device_list.append(error.device) device_list.append(error.device)
except (IqError, IqTimeout) as exn: except (IqError, IqTimeout) as exn:
print('XmppOmemo.encrypt. except: IqError, IqTimeout')
message_body = ('An error occured while fetching information ' message_body = ('An error occured while fetching information '
'on a recipient.\n%r' % exn) 'on a recipient.\n%r' % exn)
omemo_encrypted = False omemo_encrypted = False
except Exception as exn: except Exception as exn:
print('XmppOmemo.encrypt. except: Exception')
message_body = ('An error occured while attempting to encrypt' message_body = ('An error occured while attempting to encrypt'
'.\n%r' % exn) '.\n%r' % exn)
omemo_encrypted = False omemo_encrypted = False

View file

@ -19,6 +19,7 @@ from slixfeed.syndication import Feed
from slixfeed.utilities import String, Url, Utilities from slixfeed.utilities import String, Url, Utilities
from slixfeed.xmpp.iq import XmppIQ from slixfeed.xmpp.iq import XmppIQ
import sys import sys
import time
logger = Logger(__name__) logger = Logger(__name__)
@ -370,12 +371,51 @@ class XmppPubsubAction:
await XmppIQ.send(self, iq_create_entry) await XmppIQ.send(self, iq_create_entry)
ix = entry[0] ix = entry[0]
await sqlite.mark_as_read(db_file, ix) await sqlite.mark_as_read(db_file, ix)
print(report)
return report return report
class XmppPubsubTask: class XmppPubsubTask:
def loop_task(self, jid_bare):
db_file = config.get_pathname_to_database(jid_bare)
if jid_bare not in self.settings:
Config.add_settings_jid(self, jid_bare, db_file)
while True:
if jid_bare not in self.task_manager:
self.task_manager[jid_bare] = {}
logger.info('Creating new task manager for JID {}'.format(jid_bare))
logger.info('Stopping task "publish" for JID {}'.format(jid_bare))
try:
self.task_manager[jid_bare]['publish'].cancel()
except:
logger.info('No task "publish" for JID {} (XmppPubsubAction.send_unread_items)'
.format(jid_bare))
logger.info('Starting tasks "publish" for JID {}'.format(jid_bare))
self.task_manager[jid_bare]['publish'] = asyncio.create_task(
XmppPubsubAction.send_unread_items(self, jid_bare))
time.sleep(60 * 180)
def restart_task(self, jid_bare):
db_file = config.get_pathname_to_database(jid_bare)
if jid_bare not in self.settings:
Config.add_settings_jid(self, jid_bare, db_file)
if jid_bare not in self.task_manager:
self.task_manager[jid_bare] = {}
logger.info('Creating new task manager for JID {}'.format(jid_bare))
logger.info('Stopping task "publish" for JID {}'.format(jid_bare))
try:
self.task_manager[jid_bare]['publish'].cancel()
except:
logger.info('No task "publish" for JID {} (XmppPubsubAction.send_unread_items)'
.format(jid_bare))
logger.info('Starting tasks "publish" for JID {}'.format(jid_bare))
self.task_manager[jid_bare]['publish'] = asyncio.create_task(
XmppPubsubAction.send_unread_items(self, jid_bare))
async def task_publish(self, jid_bare): async def task_publish(self, jid_bare):
db_file = config.get_pathname_to_database(jid_bare) db_file = config.get_pathname_to_database(jid_bare)
if jid_bare not in self.settings: if jid_bare not in self.settings: