From 35beab7802b07eef7ee8bb7650d498d771c6f49c Mon Sep 17 00:00:00 2001 From: Schimon Jehudah Date: Sun, 24 Mar 2024 08:14:20 +0000 Subject: [PATCH] Add PubSub functionality. --- slixfeed/action.py | 207 +++++++++------------- slixfeed/version.py | 4 +- slixfeed/xmpp/client.py | 341 +++++++++++++++++++++++++++++++++---- slixfeed/xmpp/component.py | 337 ++++++++++++++++++++++++++++++++---- slixfeed/xmpp/iq.py | 10 +- slixfeed/xmpp/process.py | 102 +++++++++-- slixfeed/xmpp/publish.py | 113 +++++++++++- 7 files changed, 900 insertions(+), 214 deletions(-) diff --git a/slixfeed/action.py b/slixfeed/action.py index 1e21092..36359c7 100644 --- a/slixfeed/action.py +++ b/slixfeed/action.py @@ -745,11 +745,10 @@ def export_to_opml(jid, filename, results): tree.write(filename) -async def import_opml(db_file, url): +async def import_opml(db_file, result): function_name = sys._getframe().f_code.co_name - logger.debug('{}: db_file: {} url: {}' - .format(function_name, db_file, url)) - result = await fetch.http(url) + logger.debug('{}: db_file: {}' + .format(function_name, db_file)) if not result['error']: document = result['content'] root = ET.fromstring(document) @@ -1070,137 +1069,97 @@ async def scan_json(self, jid_bare, db_file, url): new_entries) -async def view_feed(url): +def view_feed(url, feed): function_name = sys._getframe().f_code.co_name logger.debug('{}: url: {}' .format(function_name, url)) - while True: - result = await fetch.http(url) - if not result['error']: - document = result['content'] - status = result['status_code'] - feed = parse(document) - # if is_feed(url, feed): - if is_feed(feed): - if "title" in feed["feed"].keys(): - title = feed["feed"]["title"] - else: - title = urlsplit(url).netloc - entries = feed.entries - response = "Preview of {}:\n\n```\n".format(title) - counter = 0 - for entry in entries: - counter += 1 - if entry.has_key("title"): - title = entry.title - else: - title = "*** No title ***" - if entry.has_key("link"): - # link = complete_url(source, entry.link) - link = join_url(url, entry.link) - link = trim_url(link) - else: - link = "*** No link ***" - if entry.has_key("published"): - date = entry.published - date = dt.rfc2822_to_iso8601(date) - elif entry.has_key("updated"): - date = entry.updated - date = dt.rfc2822_to_iso8601(date) - else: - date = "*** No date ***" - response += ("Title : {}\n" - "Date : {}\n" - "Link : {}\n" - "Count : {}\n" - "\n" - .format(title, date, link, counter)) - if counter > 4: - break - response += ( - "```\nSource: {}" - ).format(url) - break - else: - result = await crawl.probe_page(url, document) - if isinstance(result, str): - response = result - break - else: - url = result[0] + if "title" in feed["feed"].keys(): + title = feed["feed"]["title"] + else: + title = urlsplit(url).netloc + entries = feed.entries + response = "Preview of {}:\n\n```\n".format(title) + counter = 0 + for entry in entries: + counter += 1 + if entry.has_key("title"): + title = entry.title else: - response = ('> {}\nFailed to load URL. Reason: {}' - .format(url, status)) + title = "*** No title ***" + if entry.has_key("link"): + # link = complete_url(source, entry.link) + link = join_url(url, entry.link) + link = trim_url(link) + else: + link = "*** No link ***" + if entry.has_key("published"): + date = entry.published + date = dt.rfc2822_to_iso8601(date) + elif entry.has_key("updated"): + date = entry.updated + date = dt.rfc2822_to_iso8601(date) + else: + date = "*** No date ***" + response += ("Title : {}\n" + "Date : {}\n" + "Link : {}\n" + "Count : {}\n" + "\n" + .format(title, date, link, counter)) + if counter > 4: break + response += ( + "```\nSource: {}" + ).format(url) return response -async def view_entry(url, num): +def view_entry(url, feed, num): function_name = sys._getframe().f_code.co_name logger.debug('{}: url: {} num: {}' .format(function_name, url, num)) - while True: - result = await fetch.http(url) - if not result['error']: - document = result['content'] - status = result['status_code'] - feed = parse(document) - # if is_feed(url, feed): - if is_feed(feed): - if "title" in feed["feed"].keys(): - title = feed["feed"]["title"] - else: - title = urlsplit(url).netloc - entries = feed.entries - num = int(num) - 1 - entry = entries[num] - response = "Preview of {}:\n\n```\n".format(title) - if entry.has_key("title"): - title = entry.title - else: - title = "*** No title ***" - if entry.has_key("published"): - date = entry.published - date = dt.rfc2822_to_iso8601(date) - elif entry.has_key("updated"): - date = entry.updated - date = dt.rfc2822_to_iso8601(date) - else: - date = "*** No date ***" - if entry.has_key("summary"): - summary = entry.summary - # Remove HTML tags - summary = BeautifulSoup(summary, "lxml").text - # TODO Limit text length - summary = summary.replace("\n\n\n", "\n\n") - else: - summary = "*** No summary ***" - if entry.has_key("link"): - # link = complete_url(source, entry.link) - link = join_url(url, entry.link) - link = trim_url(link) - else: - link = "*** No link ***" - response = ("{}\n" - "\n" - # "> {}\n" - "{}\n" - "\n" - "{}\n" - "\n" - .format(title, summary, link)) - break - else: - result = await crawl.probe_page(url, document) - if isinstance(result, str): - response = result - break - else: - url = result[0] - else: - response = ('> {}\nFailed to load URL. Reason: {}' - .format(url, status)) - break + if "title" in feed["feed"].keys(): + title = feed["feed"]["title"] + else: + title = urlsplit(url).netloc + entries = feed.entries + num = int(num) - 1 + entry = entries[num] + response = "Preview of {}:\n\n```\n".format(title) + if entry.has_key("title"): + title = entry.title + else: + title = "*** No title ***" + if entry.has_key("published"): + date = entry.published + date = dt.rfc2822_to_iso8601(date) + elif entry.has_key("updated"): + date = entry.updated + date = dt.rfc2822_to_iso8601(date) + else: + date = "*** No date ***" + if entry.has_key("summary"): + summary = entry.summary + # Remove HTML tags + summary = BeautifulSoup(summary, "lxml").text + # TODO Limit text length + summary = summary.replace("\n\n\n", "\n\n") + else: + summary = "*** No summary ***" + if entry.has_key("link"): + # link = complete_url(source, entry.link) + link = join_url(url, entry.link) + link = trim_url(link) + else: + link = "*** No link ***" + response = ("{}\n" + "\n" + # "> {}\n" + "{}\n" + "\n" + "{}\n" + "\n" + .format(title, summary, link)) return response diff --git a/slixfeed/version.py b/slixfeed/version.py index 1e8ec43..df3a761 100644 --- a/slixfeed/version.py +++ b/slixfeed/version.py @@ -1,2 +1,2 @@ -__version__ = '0.1.46' -__version_info__ = (0, 1, 46) +__version__ = '0.1.47' +__version_info__ = (0, 1, 47) diff --git a/slixfeed/xmpp/client.py b/slixfeed/xmpp/client.py index c908959..103f6ab 100644 --- a/slixfeed/xmpp/client.py +++ b/slixfeed/xmpp/client.py @@ -30,10 +30,12 @@ NOTE """ + import asyncio +from feedparser import parse import slixmpp import slixfeed.task as task - +from slixfeed.url import join_url, trim_url # from slixmpp.plugins.xep_0363.http_upload import FileTooBig, HTTPError, UploadServiceNotFound # from slixmpp.plugins.xep_0402 import BookmarkStorage, Conference # from slixmpp.plugins.xep_0048.stanza import Bookmarks @@ -49,9 +51,11 @@ from slixfeed.version import __version__ from slixfeed.xmpp.bookmark import XmppBookmark from slixfeed.xmpp.connect import XmppConnect from slixfeed.xmpp.muc import XmppGroupchat +from slixfeed.xmpp.iq import XmppIQ from slixfeed.xmpp.message import XmppMessage import slixfeed.xmpp.process as process import slixfeed.xmpp.profile as profile +from slixfeed.xmpp.publish import XmppPubsub from slixfeed.xmpp.roster import XmppRoster # import slixfeed.xmpp.service as service from slixfeed.xmpp.presence import XmppPresence @@ -123,6 +127,9 @@ class Slixfeed(slixmpp.ClientXMPP): # Handler for task messages self.pending_tasks = {} + # Handler for task messages counter + # self.pending_tasks_counter = 0 + # Handler for ping self.task_ping_instance = {} @@ -684,15 +691,18 @@ class Slixfeed(slixmpp.ClientXMPP): # NOTE https://codeberg.org/poezio/slixmpp/issues/3515 # if is_operator(self, jid_bare): - self['xep_0050'].add_command(node='recent', - name='📰️ Browse', - handler=self._handle_recent) self['xep_0050'].add_command(node='subscription', name='🪶️ Subscribe', handler=self._handle_subscription_add) self['xep_0050'].add_command(node='publish', - name='📣️ Publish', - handler=self._handle_publish) + name='📻 Publish', + handler=self._handle_pubsub_add) + self['xep_0050'].add_command(node='post', + name='📣️ Post', + handler=self._handle_post) + self['xep_0050'].add_command(node='recent', + name='📰️ Browse', + handler=self._handle_recent) self['xep_0050'].add_command(node='subscriptions', name='🎫️ Subscriptions', handler=self._handle_subscriptions) @@ -727,33 +737,56 @@ class Slixfeed(slixmpp.ClientXMPP): # Special interface # http://jabber.org/protocol/commands#actions - async def _handle_publish(self, iq, session): - form = self['xep_0004'].make_form('form', 'Publish') - form['instructions'] = ('In order to publish via PubSub, you will ' - 'have to choose a PubSub hostname and ' - 'have a privilege to publish into it.') - # TODO Select from list-multi + async def _handle_post(self, iq, session): + jid_bare = session['from'].bare + if not is_operator(self, jid_bare): + text_warn = 'PubSub is not available.' + session['notes'] = [['warn', text_warn]] + return session + form = self['xep_0004'].make_form('form', 'Post') + form['instructions'] = ('In order to post to PubSub, you will have to ' + 'choose a PubSub Jabber ID (e.g. {}) and ' + 'Slixfeed has to be allowed to publish into ' + 'it.' + .format(self.boundjid.bare)) form.add_field(var='url', ftype='text-single', label='URL', - desc='Enter subscription URL.', + desc='Enter a subscription URL.', value='http://', required=True) - form.add_field(var='pubsub', + # options = form.add_field(var='jid', + # ftype='list-single', + # label='PubSub', + # desc='Select a PubSub Jabber ID.', + # value=self.boundjid.bare, + # required=True) + # iq = await self['xep_0030'].get_items(jid=self.boundjid.domain) + # for item in iq['disco_items']['items']: + # jid = item[0] + # if item[1]: name = item[1] + # elif item[2]: name = item[2] + # else: name = jid + # options.addOption(jid, name) + form.add_field(var='jid', ftype='text-single', label='PubSub', - desc='Enter a PubSub URL.', - value='pubsub.' + self.boundjid.host, + desc='Enter a PubSub Jabber ID.', + value=self.boundjid.bare, + # value='pubsub.' + self.boundjid.host, required=True) form.add_field(var='node', ftype='text-single', label='Node', - desc='Node to publish at.', + desc=('Enter a node to publish to (The default value ' + 'is "urn:xmpp:microblog:0" which is allocated to ' + 'each Jabber ID in the Movim system).'), + value='urn:xmpp:microblog:0', required=False) options = form.add_field(var='xep', ftype='list-single', - label='Type', - desc='Select XEP.', + label='Protocol', + desc='Select XMPP Extension Protocol.', value='0060', required=True) options.addOption('XEP-0060: Publish-Subscribe', '0060') @@ -766,14 +799,176 @@ class Slixfeed(slixmpp.ClientXMPP): session['payload'] = form return session - def _handle_preview(self, payload, session): + async def _handle_preview(self, payload, session): jid_full = str(session['from']) function_name = sys._getframe().f_code.co_name logger.debug('{}: jid_full: {}' .format(function_name, jid_full)) - text_note = ('PubSub support will be available soon.') - session['notes'] = [['info', text_note]] - session['payload'] = None + values = payload['values'] + jid = values['jid'] + jid_bare = session['from'].bare + if jid != jid_bare and not is_operator(self, jid_bare): + text_warn = ('Posting to {} is restricted to operators only.' + .format(jid_bare)) + session['allow_prev'] = False + session['has_next'] = False + session['next'] = None + session['notes'] = [['warn', text_warn]] + session['prev'] = None + session['payload'] = None + return session + node = values['node'] + url = values['url'] + xep = values['xep'] + form = self['xep_0004'].make_form('form', 'Publish') + while True: + result = await fetch.http(url) + status = result['status_code'] + if not result['error']: + document = result['content'] + feed = parse(document) + # if is_feed(url, feed): + if action.is_feed(feed): + form['instructions'] = 'Select entries to publish.' + options = form.add_field(var='entries', + ftype='list-multi', + label='Titles', + desc='Select entries to post.', + required=True) + if "title" in feed["feed"].keys(): + title = feed["feed"]["title"] + else: + title = url + entries = feed.entries + entry_ix = 0 + for entry in entries: + if entry.has_key("title"): + title = entry.title + else: + if entry.has_key("published"): + title = entry.published + title = dt.rfc2822_to_iso8601(title) + elif entry.has_key("updated"): + title = entry.updated + title = dt.rfc2822_to_iso8601(title) + else: + title = "*** No title ***" + options.addOption(title, str(entry_ix)) + entry_ix += 1 + if entry_ix > 9: + break + session['allow_prev'] = True + session['has_next'] = True + session['next'] = self._handle_post_complete + session['prev'] = self._handle_post + session['payload'] = form + break + else: + result = await crawl.probe_page(url, document) + if isinstance(result, list): + form['instructions'] = ('Discovered {} subscriptions ' + 'from the given URL. Please ' + 'choose a subscription.' + .format(len(result))) + options = form.add_field(var='url', + ftype='list-single', + label='Feeds', + desc='Select a feed.', + required=True) + results = result + for result in results: + title = result['name'] + url = result['link'] + title = title if title else url + options.addOption(title, url) + session['allow_prev'] = True + session['has_next'] = True + session['next'] = self._handle_preview + session['prev'] = self._handle_post + session['payload'] = form + break + else: + url = result['link'] + else: + text_error = ('Failed to load URL {}' + '\n\n' + 'Reason: {}' + .format(url, status)) + session['notes'] = [['error', text_error]] + session['payload'] = None + break + form.add_field(var='node', + ftype='hidden', + value=node) + form.add_field(var='jid', + ftype='hidden', + value=jid) + # It is essential to place URL at the end, because it might mutate + # For example http://blacklistednews.com would change + # to https://www.blacklistednews.com/rss.php + form.add_field(var='url', + ftype='hidden', + value=url) + form.add_field(var='xep', + ftype='hidden', + value=xep) + return session + + async def _handle_post_complete(self, payload, session): + values = payload['values'] + entries = values['entries'] + # It might not be good to pass feed object as its size might be too big + # Consider a handler self.feeds[url][feed] or self.feeds[jid][url][feed] + # It is not possible to assign non-str to transfer. + # feed = values['feed'] + node = values['node'][0] + jid = values['jid'][0] + url = values['url'][0] + xep = values['xep'][0] + result = await fetch.http(url) + if 'content' in result: + document = result['content'] + feed = parse(document) + if feed.feed.has_key('title'): + feed_title = feed.feed.title + if feed.feed.has_key('description'): + feed_summary = feed.feed.description + elif feed.feed.has_key('subtitle'): + feed_summary = feed.feed.subtitle + else: + feed_summary = None + iq_create_node = XmppPubsub.create_node( + self, jid, node, xep, feed_title, feed_summary) + await XmppIQ.send(self, iq_create_node) + feed_version = feed.version + for entry in entries: + entry = int(entry) + feed_entry = feed.entries[entry] + # if feed.entries[entry].has_key("title"): + # title = feed.entries[entry].title + # else: + # if feed.entries[entry].has_key("published"): + # title = feed.entries[entry].published + # title = dt.rfc2822_to_iso8601(title) + # elif feed.entries[entry].has_key("updated"): + # title = feed.entries[entry].updated + # title = dt.rfc2822_to_iso8601(title) + # else: + # title = "*** No title ***" + # if feed.entries[entry].has_key("summary"): + # summary = feed.entries[entry].summary + iq_create_entry = XmppPubsub.create_entry( + self, jid, node, feed_entry, feed_version) + await XmppIQ.send(self, iq_create_entry) + text_info = 'Posted {} entries.'.format(len(entries)) + session['allow_prev'] = False + session['has_next'] = False + session['next'] = None + session['notes'] = [['info', text_info]] + session['prev'] = None + session['payload'] = None + else: + session['payload'] = payload return session async def _handle_profile(self, iq, session): @@ -950,9 +1145,8 @@ class Slixfeed(slixmpp.ClientXMPP): db_file = config.get_pathname_to_database(jid_file) # In this case (as is typical), the payload is a form values = payload['values'] - for value in values: - key = value - val = values[value] + for key in values: + val = values[key] # NOTE We might want to add new keywords from # an empty form instead of editing a form. # keywords = sqlite.get_filter_value(db_file, key) @@ -973,6 +1167,80 @@ class Slixfeed(slixmpp.ClientXMPP): return session + async def _handle_pubsub_add(self, iq, session): + jid_full = str(session['from']) + function_name = sys._getframe().f_code.co_name + logger.debug('{}: jid_full: {}' + .format(function_name, jid_full)) + jid_bare = session['from'].bare + if not is_operator(self, jid_bare): + text_warn = 'PubSub is not available.' + session['notes'] = [['warn', text_warn]] + return session + chat_type = await get_chat_type(self, jid_bare) + moderator = None + if chat_type == 'groupchat': + moderator = is_moderator(self, jid_bare, jid_full) + if chat_type == 'chat' or moderator: + form = self['xep_0004'].make_form('form', 'Publish') + form['instructions'] = ('In order to publish via PubSub, you will ' + 'have to choose a PubSub Jabber ID (e.g. ' + '{}) and Slixfeed has to be allowed to ' + 'publish into it.' + .format('pubsub.' + self.boundjid.host)) + form.add_field(var='subscription', + # TODO Make it possible to add several subscriptions at once; + # Similarly to BitTorrent trackers list + # ftype='text-multi', + # label='Subscription URLs', + # desc=('Add subscriptions one time per ' + # 'subscription.'), + ftype='text-single', + label='URL', + desc='Enter a subscription URL.', + value='http://', + required=True) + form.add_field(var='jid', + ftype='text-single', + label='PubSub', + desc='Enter a PubSub Jabber ID.', + value=self.boundjid.bare, + required=True) + form.add_field(var='node', + ftype='text-single', + label='Node', + desc=('Enter a node to publish to (The default ' + 'value is "urn:xmpp:microblog:0" which ' + 'is allocated to each Jabber ID in the ' + 'Movim system).'), + value='urn:xmpp:microblog:0', + required=False) + options = form.add_field(var='xep', + ftype='list-single', + label='Protocol', + desc='Select XMPP Extension Protocol.', + value='0060', + required=True) + options.addOption('XEP-0060: Publish-Subscribe', '0060') + options.addOption('XEP-0277: Microblogging over XMPP', '0277') + options.addOption('XEP-0472: Pubsub Social Feed', '0472') + # form.add_field(var='scan', + # ftype='boolean', + # label='Scan', + # desc='Scan URL for validity (recommended).', + # value=True) + session['allow_prev'] = False + session['has_next'] = True + session['next'] = self._handle_subscription_new + session['prev'] = None + session['payload'] = form + else: + text_warn = ('This resource is restricted to moderators of {}.' + .format(jid_bare)) + session['notes'] = [['warn', text_warn]] + return session + + async def _handle_subscription_add(self, iq, session): jid_full = str(session['from']) function_name = sys._getframe().f_code.co_name @@ -995,9 +1263,18 @@ class Slixfeed(slixmpp.ClientXMPP): # 'subscription.'), ftype='text-single', label='URL', - desc='Enter subscription URL.', + desc='Enter a subscription URL.', value='http://', required=True) + if is_operator(self, jid_bare): + form.add_field(ftype='fixed', + label='Subscriber') + form.add_field(var='jid', + ftype='text-single', + label='Jabber ID', + desc=('Enter a Jabber ID to add the ' + 'subscription to (The default Jabber ID ' + 'is your own).')) # form.add_field(var='scan', # ftype='boolean', # label='Scan', @@ -2016,7 +2293,7 @@ class Slixfeed(slixmpp.ClientXMPP): ftype='list-single', label='Choose', required=True, - value='admin') + value='export') jid = session['from'].bare if is_operator(self, jid): options.addOption('Administration', 'admin') @@ -2291,7 +2568,8 @@ class Slixfeed(slixmpp.ClientXMPP): jid_bare = session['from'].bare jid_file = jid_bare.replace('/', '_') db_file = config.get_pathname_to_database(jid_file) - count = await action.import_opml(db_file, url) + result = await fetch.http(url) + count = await action.import_opml(db_file, result) try: int(count) # form = self['xep_0004'].make_form('result', 'Done') @@ -2868,9 +3146,8 @@ class Slixfeed(slixmpp.ClientXMPP): Config.add_settings_jid(self.settings, jid_bare, db_file) # In this case (as is typical), the payload is a form values = payload['values'] - for value in values: - key = value - val = values[value] + for key in values: + val = values[key] if key in ('enabled', 'media', 'old'): if val == True: diff --git a/slixfeed/xmpp/component.py b/slixfeed/xmpp/component.py index 40ffa4e..f755a44 100644 --- a/slixfeed/xmpp/component.py +++ b/slixfeed/xmpp/component.py @@ -37,9 +37,11 @@ from slixfeed.version import __version__ from slixfeed.xmpp.connect import XmppConnect # NOTE MUC is possible for component from slixfeed.xmpp.muc import XmppGroupchat +from slixfeed.xmpp.iq import XmppIQ from slixfeed.xmpp.message import XmppMessage import slixfeed.xmpp.process as process import slixfeed.xmpp.profile as profile +from slixfeed.xmpp.publish import XmppPubsub # from slixfeed.xmpp.roster import XmppRoster # import slixfeed.xmpp.service as service from slixfeed.xmpp.presence import XmppPresence @@ -114,6 +116,9 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): # Handler for task messages self.pending_tasks = {} + # Handler for task messages counter + # self.pending_tasks_counter = 0 + # Handler for ping self.task_ping_instance = {} @@ -643,15 +648,18 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): # NOTE https://codeberg.org/poezio/slixmpp/issues/3515 # if is_operator(self, jid_bare): - self['xep_0050'].add_command(node='recent', - name='📰️ Browse', - handler=self._handle_recent) self['xep_0050'].add_command(node='subscription', name='🪶️ Subscribe', handler=self._handle_subscription_add) self['xep_0050'].add_command(node='publish', - name='📣️ Publish', - handler=self._handle_publish) + name='📻 Publish', + handler=self._handle_pubsub_add) + self['xep_0050'].add_command(node='post', + name='📣️ Post', + handler=self._handle_post) + self['xep_0050'].add_command(node='recent', + name='📰️ Browse', + handler=self._handle_recent) self['xep_0050'].add_command(node='subscriptions', name='🎫️ Subscriptions', handler=self._handle_subscriptions) @@ -686,33 +694,56 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): # Special interface # http://jabber.org/protocol/commands#actions - async def _handle_publish(self, iq, session): - form = self['xep_0004'].make_form('form', 'Publish') - form['instructions'] = ('In order to publish via PubSub, you will ' - 'have to choose a PubSub hostname and ' - 'have a privilege to publish into it.') - # TODO Select from list-multi + async def _handle_post(self, iq, session): + jid_bare = session['from'].bare + if not is_operator(self, jid_bare): + text_warn = 'PubSub is not available.' + session['notes'] = [['warn', text_warn]] + return session + form = self['xep_0004'].make_form('form', 'Post') + form['instructions'] = ('In order to post to PubSub, you will have to ' + 'choose a PubSub Jabber ID (e.g. {}) and ' + 'Slixfeed has to be allowed to publish into ' + 'it.' + .format(self.boundjid.bare)) form.add_field(var='url', ftype='text-single', label='URL', - desc='Enter subscription URL.', + desc='Enter a subscription URL.', value='http://', required=True) - form.add_field(var='pubsub', + # options = form.add_field(var='jid', + # ftype='list-single', + # label='PubSub', + # desc='Select a PubSub Jabber ID.', + # value=self.boundjid.bare, + # required=True) + # iq = await self['xep_0030'].get_items(jid=self.boundjid.domain) + # for item in iq['disco_items']['items']: + # jid = item[0] + # if item[1]: name = item[1] + # elif item[2]: name = item[2] + # else: name = jid + # options.addOption(jid, name) + form.add_field(var='jid', ftype='text-single', label='PubSub', - desc='Enter a PubSub URL.', - value='pubsub.' + self.boundjid.host, + desc='Enter a PubSub Jabber ID.', + value=self.boundjid.bare, + # value='pubsub.' + self.boundjid.host, required=True) form.add_field(var='node', ftype='text-single', label='Node', - desc='Node to publish at.', + desc=('Enter a node to publish to (The default value ' + 'is "urn:xmpp:microblog:0" which is allocated to ' + 'each Jabber ID in the Movim system).'), + value='urn:xmpp:microblog:0', required=False) options = form.add_field(var='xep', ftype='list-single', - label='Type', - desc='Select XEP.', + label='Protocol', + desc='Select XMPP Extension Protocol.', value='0060', required=True) options.addOption('XEP-0060: Publish-Subscribe', '0060') @@ -725,14 +756,176 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): session['payload'] = form return session - def _handle_preview(self, payload, session): + async def _handle_preview(self, payload, session): jid_full = str(session['from']) function_name = sys._getframe().f_code.co_name logger.debug('{}: jid_full: {}' .format(function_name, jid_full)) - text_note = ('PubSub support will be available soon.') - session['notes'] = [['info', text_note]] - session['payload'] = None + values = payload['values'] + jid = values['jid'] + jid_bare = session['from'].bare + if jid != jid_bare and not is_operator(self, jid_bare): + text_warn = ('Posting to {} is restricted to operators only.' + .format(jid_bare)) + session['allow_prev'] = False + session['has_next'] = False + session['next'] = None + session['notes'] = [['warn', text_warn]] + session['prev'] = None + session['payload'] = None + return session + node = values['node'] + url = values['url'] + xep = values['xep'] + form = self['xep_0004'].make_form('form', 'Publish') + while True: + result = await fetch.http(url) + status = result['status_code'] + if not result['error']: + document = result['content'] + feed = parse(document) + # if is_feed(url, feed): + if action.is_feed(feed): + form['instructions'] = 'Select entries to publish.' + options = form.add_field(var='entries', + ftype='list-multi', + label='Titles', + desc='Select entries to post.', + required=True) + if "title" in feed["feed"].keys(): + title = feed["feed"]["title"] + else: + title = url + entries = feed.entries + entry_ix = 0 + for entry in entries: + if entry.has_key("title"): + title = entry.title + else: + if entry.has_key("published"): + title = entry.published + title = dt.rfc2822_to_iso8601(title) + elif entry.has_key("updated"): + title = entry.updated + title = dt.rfc2822_to_iso8601(title) + else: + title = "*** No title ***" + options.addOption(title, str(entry_ix)) + entry_ix += 1 + if entry_ix > 9: + break + session['allow_prev'] = True + session['has_next'] = True + session['next'] = self._handle_post_complete + session['prev'] = self._handle_post + session['payload'] = form + break + else: + result = await crawl.probe_page(url, document) + if isinstance(result, list): + form['instructions'] = ('Discovered {} subscriptions ' + 'from the given URL. Please ' + 'choose a subscription.' + .format(len(result))) + options = form.add_field(var='url', + ftype='list-single', + label='Feeds', + desc='Select a feed.', + required=True) + results = result + for result in results: + title = result['name'] + url = result['link'] + title = title if title else url + options.addOption(title, url) + session['allow_prev'] = True + session['has_next'] = True + session['next'] = self._handle_preview + session['prev'] = self._handle_post + session['payload'] = form + break + else: + url = result['link'] + else: + text_error = ('Failed to load URL {}' + '\n\n' + 'Reason: {}' + .format(url, status)) + session['notes'] = [['error', text_error]] + session['payload'] = None + break + form.add_field(var='node', + ftype='hidden', + value=node) + form.add_field(var='jid', + ftype='hidden', + value=jid) + # It is essential to place URL at the end, because it might mutate + # For example http://blacklistednews.com would change + # to https://www.blacklistednews.com/rss.php + form.add_field(var='url', + ftype='hidden', + value=url) + form.add_field(var='xep', + ftype='hidden', + value=xep) + return session + + async def _handle_post_complete(self, payload, session): + values = payload['values'] + entries = values['entries'] + # It might not be good to pass feed object as its size might be too big + # Consider a handler self.feeds[url][feed] or self.feeds[jid][url][feed] + # It is not possible to assign non-str to transfer. + # feed = values['feed'] + node = values['node'][0] + jid = values['jid'][0] + url = values['url'][0] + xep = values['xep'][0] + result = await fetch.http(url) + if 'content' in result: + document = result['content'] + feed = parse(document) + if feed.feed.has_key('title'): + feed_title = feed.feed.title + if feed.feed.has_key('description'): + feed_summary = feed.feed.description + elif feed.feed.has_key('subtitle'): + feed_summary = feed.feed.subtitle + else: + feed_summary = None + iq_create_node = XmppPubsub.create_node( + self, jid, node, xep, feed_title, feed_summary) + await XmppIQ.send(self, iq_create_node) + feed_version = feed.version + for entry in entries: + entry = int(entry) + feed_entry = feed.entries[entry] + # if feed.entries[entry].has_key("title"): + # title = feed.entries[entry].title + # else: + # if feed.entries[entry].has_key("published"): + # title = feed.entries[entry].published + # title = dt.rfc2822_to_iso8601(title) + # elif feed.entries[entry].has_key("updated"): + # title = feed.entries[entry].updated + # title = dt.rfc2822_to_iso8601(title) + # else: + # title = "*** No title ***" + # if feed.entries[entry].has_key("summary"): + # summary = feed.entries[entry].summary + iq_create_entry = XmppPubsub.create_entry( + self, jid, node, feed_entry, feed_version) + await XmppIQ.send(self, iq_create_entry) + text_info = 'Posted {} entries.'.format(len(entries)) + session['allow_prev'] = False + session['has_next'] = False + session['next'] = None + session['notes'] = [['info', text_info]] + session['prev'] = None + session['payload'] = None + else: + session['payload'] = payload return session async def _handle_profile(self, iq, session): @@ -909,9 +1102,8 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): db_file = config.get_pathname_to_database(jid_file) # In this case (as is typical), the payload is a form values = payload['values'] - for value in values: - key = value - val = values[value] + for key in values: + val = values[key] # NOTE We might want to add new keywords from # an empty form instead of editing a form. # keywords = sqlite.get_filter_value(db_file, key) @@ -932,6 +1124,80 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): return session + async def _handle_pubsub_add(self, iq, session): + jid_full = str(session['from']) + function_name = sys._getframe().f_code.co_name + logger.debug('{}: jid_full: {}' + .format(function_name, jid_full)) + jid_bare = session['from'].bare + if not is_operator(self, jid_bare): + text_warn = 'PubSub is not available.' + session['notes'] = [['warn', text_warn]] + return session + chat_type = await get_chat_type(self, jid_bare) + moderator = None + if chat_type == 'groupchat': + moderator = is_moderator(self, jid_bare, jid_full) + if chat_type == 'chat' or moderator: + form = self['xep_0004'].make_form('form', 'Publish') + form['instructions'] = ('In order to publish via PubSub, you will ' + 'have to choose a PubSub Jabber ID (e.g. ' + '{}) and Slixfeed has to be allowed to ' + 'publish into it.' + .format('pubsub.' + self.boundjid.host)) + form.add_field(var='subscription', + # TODO Make it possible to add several subscriptions at once; + # Similarly to BitTorrent trackers list + # ftype='text-multi', + # label='Subscription URLs', + # desc=('Add subscriptions one time per ' + # 'subscription.'), + ftype='text-single', + label='URL', + desc='Enter a subscription URL.', + value='http://', + required=True) + form.add_field(var='jid', + ftype='text-single', + label='PubSub', + desc='Enter a PubSub Jabber ID.', + value=self.boundjid.bare, + required=True) + form.add_field(var='node', + ftype='text-single', + label='Node', + desc=('Enter a node to publish to (The default ' + 'value is "urn:xmpp:microblog:0" which ' + 'is allocated to each Jabber ID in the ' + 'Movim system).'), + value='urn:xmpp:microblog:0', + required=False) + options = form.add_field(var='xep', + ftype='list-single', + label='Protocol', + desc='Select XMPP Extension Protocol.', + value='0060', + required=True) + options.addOption('XEP-0060: Publish-Subscribe', '0060') + options.addOption('XEP-0277: Microblogging over XMPP', '0277') + options.addOption('XEP-0472: Pubsub Social Feed', '0472') + # form.add_field(var='scan', + # ftype='boolean', + # label='Scan', + # desc='Scan URL for validity (recommended).', + # value=True) + session['allow_prev'] = False + session['has_next'] = True + session['next'] = self._handle_subscription_new + session['prev'] = None + session['payload'] = form + else: + text_warn = ('This resource is restricted to moderators of {}.' + .format(jid_bare)) + session['notes'] = [['warn', text_warn]] + return session + + async def _handle_subscription_add(self, iq, session): jid_full = str(session['from']) function_name = sys._getframe().f_code.co_name @@ -954,9 +1220,18 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): # 'subscription.'), ftype='text-single', label='URL', - desc='Enter subscription URL.', + desc='Enter a subscription URL.', value='http://', required=True) + if is_operator(self, jid_bare): + form.add_field(ftype='fixed', + label='Subscriber') + form.add_field(var='jid', + ftype='text-single', + label='Jabber ID', + desc=('Enter a Jabber ID to add the ' + 'subscription to (The default Jabber ID ' + 'is your own).')) # form.add_field(var='scan', # ftype='boolean', # label='Scan', @@ -1975,7 +2250,7 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): ftype='list-single', label='Choose', required=True, - value='admin') + value='export') jid = session['from'].bare if is_operator(self, jid): options.addOption('Administration', 'admin') @@ -2250,7 +2525,8 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): jid_bare = session['from'].bare jid_file = jid_bare.replace('/', '_') db_file = config.get_pathname_to_database(jid_file) - count = await action.import_opml(db_file, url) + result = await fetch.http(url) + count = await action.import_opml(db_file, result) try: int(count) # form = self['xep_0004'].make_form('result', 'Done') @@ -2827,9 +3103,8 @@ class SlixfeedComponent(slixmpp.ComponentXMPP): Config.add_settings_jid(self.settings, jid_bare, db_file) # In this case (as is typical), the payload is a form values = payload['values'] - for value in values: - key = value - val = values[value] + for key in values: + val = values[key] if key in ('enabled', 'media', 'old'): if val == True: diff --git a/slixfeed/xmpp/iq.py b/slixfeed/xmpp/iq.py index b139e36..df87dfd 100644 --- a/slixfeed/xmpp/iq.py +++ b/slixfeed/xmpp/iq.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from slixmpp.exceptions import IqError +import logging +from slixmpp.exceptions import IqError, IqTimeout class XmppIQ: async def send(self, iq): try: await iq.send(timeout=5) - except IqError as e: - if e.etype == 'cancel' and e.condition == 'conflict': - return - raise + except (IqError, IqTimeout) as e: + logging.error('Error XmppIQ') + logging.error(str(e)) diff --git a/slixfeed/xmpp/process.py b/slixfeed/xmpp/process.py index 48a5a7a..53554e6 100644 --- a/slixfeed/xmpp/process.py +++ b/slixfeed/xmpp/process.py @@ -23,14 +23,13 @@ TODO """ -from slixfeed.xmpp.publish import XmppPubsub -from slixfeed.xmpp.iq import XmppIQ - import asyncio +from feedparser import parse import logging import os import slixfeed.action as action import slixfeed.config as config +import slixfeed.crawl as crawl from slixfeed.config import Config import slixfeed.dt as dt import slixfeed.fetch as fetch @@ -109,15 +108,19 @@ async def message(self, message): # pending_tasks_num = len(self.pending_tasks[jid_bare]) pending_tasks_num = randrange(10000, 99999) self.pending_tasks[jid_bare][pending_tasks_num] = status_message + # self.pending_tasks_counter += 1 + # self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message XmppPresence.send(self, jid_bare, status_message, status_type=status_type) db_file = config.get_pathname_to_database(jid_file) - count = await action.import_opml(db_file, url) + result = await fetch.http(url) + count = await action.import_opml(db_file, result) if count: response = 'Successfully imported {} feeds.'.format(count) else: response = 'OPML file was not imported.' del self.pending_tasks[jid_bare][pending_tasks_num] + # del self.pending_tasks[jid_bare][self.pending_tasks_counter] key_list = ['status'] await task.start_tasks_xmpp(self, jid_bare, key_list) XmppMessage.send_reply(self, message, response) @@ -286,13 +289,6 @@ async def message(self, message): 'Send "help" for further instructions.\n' .format(self.alias)) XmppMessage.send_reply(self, message, response) - - title = message_lowercase - summary = message_text - # from random import randrange - node = str(randrange(10000, 99999)) - iq = XmppPubsub.create(self, title, summary, node) - await XmppIQ.send(self, iq) # case _ if message_lowercase.startswith('activate'): # if message['type'] == 'groupchat': @@ -529,6 +525,8 @@ async def message(self, message): # pending_tasks_num = len(self.pending_tasks[jid_bare]) pending_tasks_num = randrange(10000, 99999) self.pending_tasks[jid_bare][pending_tasks_num] = status_message + # self.pending_tasks_counter += 1 + # self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message XmppPresence.send(self, jid_bare, status_message, status_type=status_type) filename = await action.export_feeds(self, jid_bare, @@ -541,6 +539,7 @@ async def message(self, message): chat_type = await get_chat_type(self, jid_bare) XmppMessage.send_oob(self, jid_bare, url, chat_type) del self.pending_tasks[jid_bare][pending_tasks_num] + # del self.pending_tasks[jid_bare][self.pending_tasks_counter] key_list = ['status'] await task.start_tasks_xmpp(self, jid_bare, key_list) else: @@ -579,6 +578,8 @@ async def message(self, message): # pending_tasks_num = len(self.pending_tasks[jid_bare]) pending_tasks_num = randrange(10000, 99999) self.pending_tasks[jid_bare][pending_tasks_num] = status_message + # self.pending_tasks_counter += 1 + # self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message XmppPresence.send(self, jid_bare, status_message, status_type=status_type) db_file = config.get_pathname_to_database(jid_file) @@ -642,6 +643,7 @@ async def message(self, message): 'Try: epub, html, md (markdown), ' 'pdf, or txt (text)') del self.pending_tasks[jid_bare][pending_tasks_num] + # del self.pending_tasks[jid_bare][self.pending_tasks_counter] key_list = ['status'] await task.start_tasks_xmpp(self, jid_bare, key_list) if response: @@ -657,16 +659,20 @@ async def message(self, message): # pending_tasks_num = len(self.pending_tasks[jid_bare]) pending_tasks_num = randrange(10000, 99999) self.pending_tasks[jid_bare][pending_tasks_num] = status_message + # self.pending_tasks_counter += 1 + # self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message XmppPresence.send(self, jid_bare, status_message, status_type=status_type) db_file = config.get_pathname_to_database(jid_file) - count = await action.import_opml(db_file, url) + result = await fetch.http(url) + count = await action.import_opml(db_file, result) if count: response = ('Successfully imported {} feeds.' .format(count)) else: response = 'OPML file was not imported.' del self.pending_tasks[jid_bare][pending_tasks_num] + # del self.pending_tasks[jid_bare][self.pending_tasks_counter] key_list = ['status'] await task.start_tasks_xmpp(self, jid_bare, key_list) XmppMessage.send_reply(self, message, response) @@ -680,7 +686,8 @@ async def message(self, message): # pending_tasks_num = len(self.pending_tasks[jid_bare]) pending_tasks_num = randrange(10000, 99999) self.pending_tasks[jid_bare][pending_tasks_num] = status_message - print(self.pending_tasks) + # self.pending_tasks_counter += 1 + # self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message XmppPresence.send(self, jid_bare, status_message, status_type=status_type) if url.startswith('feed:'): @@ -716,8 +723,8 @@ async def message(self, message): 'added to subscription list.' .format(result['link'], result['name'])) # task.clean_tasks_xmpp(self, jid_bare, ['status']) - print(self.pending_tasks) del self.pending_tasks[jid_bare][pending_tasks_num] + # del self.pending_tasks[jid_bare][self.pending_tasks_counter] print(self.pending_tasks) key_list = ['status'] await task.start_tasks_xmpp(self, jid_bare, key_list) @@ -929,7 +936,7 @@ async def message(self, message): status_type = 'dnd' status_message = ('📫️ Processing request to fetch data ' 'from {}'.format(url)) - + pending_tasks_num = randrange(10000, 99999) self.pending_tasks[jid_bare][pending_tasks_num] = status_message XmppPresence.send(self, jid_bare, status_message, status_type=status_type) @@ -939,7 +946,36 @@ async def message(self, message): match len(data): case 1: if url.startswith('http'): - response = await action.view_feed(url) + while True: + result = await fetch.http(url) + if not result['error']: + document = result['content'] + status = result['status_code'] + feed = parse(document) + # if is_feed(url, feed): + if action.is_feed(feed): + response = action.view_feed(url, feed) + break + else: + result = await crawl.probe_page(url, document) + if isinstance(result, list): + results = result + response = ("Web feeds found for {}\n\n```\n" + .format(url)) + for result in results: + response += ("Title : {}\n" + "Link : {}\n" + "\n" + .format(result['name'], result['link'])) + response += ('```\nTotal of {} feeds.' + .format(len(results))) + break + else: + url = result[0] + else: + response = ('> {}\nFailed to load URL. Reason: {}' + .format(url, status)) + break else: response = ('No action has been taken.' '\n' @@ -947,7 +983,36 @@ async def message(self, message): case 2: num = data[1] if url.startswith('http'): - response = await action.view_entry(url, num) + while True: + result = await fetch.http(url) + if not result['error']: + document = result['content'] + status = result['status_code'] + feed = parse(document) + # if is_feed(url, feed): + if action.is_feed(feed): + response = action.view_entry(url, feed, num) + break + else: + result = await crawl.probe_page(url, document) + if isinstance(result, list): + results = result + response = ("Web feeds found for {}\n\n```\n" + .format(url)) + for result in results: + response += ("Title : {}\n" + "Link : {}\n" + "\n" + .format(result['name'], result['link'])) + response += ('```\nTotal of {} feeds.' + .format(len(results))) + break + else: + url = result[0] + else: + response = ('> {}\nFailed to load URL. Reason: {}' + .format(url, status)) + break else: response = ('No action has been taken.' '\n' @@ -1041,6 +1106,8 @@ async def message(self, message): # pending_tasks_num = len(self.pending_tasks[jid_bare]) pending_tasks_num = randrange(10000, 99999) self.pending_tasks[jid_bare][pending_tasks_num] = status_message + # self.pending_tasks_counter += 1 + # self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message XmppPresence.send(self, jid_bare, status_message, status_type=status_type) db_file = config.get_pathname_to_database(jid_file) @@ -1083,6 +1150,7 @@ async def message(self, message): response = 'All entries have been marked as read.' XmppMessage.send_reply(self, message, response) del self.pending_tasks[jid_bare][pending_tasks_num] + # del self.pending_tasks[jid_bare][self.pending_tasks_counter] key_list = ['status'] await task.start_tasks_xmpp(self, jid_bare, key_list) case _ if message_lowercase.startswith('search '): diff --git a/slixfeed/xmpp/publish.py b/slixfeed/xmpp/publish.py index 14cb4f9..ba3a5b9 100644 --- a/slixfeed/xmpp/publish.py +++ b/slixfeed/xmpp/publish.py @@ -3,12 +3,18 @@ # TODO Implement XEP-0472: Pubsub Social Feed +import hashlib +import slixmpp.plugins.xep_0060.stanza.pubsub as pubsub +from slixmpp.xmlstream import ET + class XmppPubsub: - def create(self, title, summary, node, server): + + # TODO Make use of var "xep" with match/case + def create_node(self, jid, node, xep ,title, summary=None): jid_from = str(self.boundjid) if self.is_component else None - iq = self.Iq(stype="set", - sto=server, + iq = self.Iq(stype='set', + sto=jid, sfrom=jid_from) iq['pubsub']['create']['node'] = node form = iq['pubsub']['configure']['form'] @@ -42,3 +48,104 @@ class XmppPubsub: value='http://www.w3.org/2005/Atom') return iq + + + def create_entry(self, jid, node, entry, version): + iq = self.Iq(stype="set", sto=jid) + iq['pubsub']['publish']['node'] = node + + item = pubsub.Item() + + # From atomtopubsub: + # character / is causing a bug in movim. replacing : and , with - in id. + # It provides nicer urls. + + # Respond to atomtopubsub: + # I think it would be a good idea to use md5 checksum of Url as Id for + # cross reference, and namely - in another project to utilize PubSub as + # links sharing system (see del.icio.us) - to share node entries. + + # NOTE Warning: Entry might not have a link + # TODO Handle situation error + url_encoded = entry.link.encode() + url_hashed = hashlib.md5(url_encoded) + url_digest = url_hashed.hexdigest() + item['id'] = url_digest + + node_entry = ET.Element("entry") + node_entry.set('xmlns', 'http://www.w3.org/2005/Atom') + + title = ET.SubElement(node_entry, "title") + title.text = entry.title + + updated = ET.SubElement(node_entry, "updated") + updated.text = entry.updated + + # Content + if version == 'atom3': + + if hasattr(entry.content[0], 'type'): + content = ET.SubElement(node_entry, "content") + content.set('type', entry.content[0].type) + content.text = entry.content[0].value + + elif version =='rss20' or 'rss10' or 'atom10': + if hasattr(entry, "content"): + content = ET.SubElement(node_entry, "content") + content.set('type', 'text/html') + content.text = entry.content[0].value + + elif hasattr(entry, "description"): + content = ET.SubElement(node_entry,"content") + content.set('type', 'text/html') + content.text = entry.description + print('In Description - PublishX') + + # Links + if hasattr(entry, 'links'): + for l in entry.links: + link = ET.SubElement(node_entry, "link") + if hasattr(l, 'href'): + link.set('href', l['href']) + link.set('type', l['type']) + link.set('rel', l['rel']) + elif hasattr(entry, 'link'): + link.set('href', entry['link']) + + # Tags + if hasattr(entry, 'tags'): + for t in entry.tags: + tag = ET.SubElement(node_entry, "category") + tag.set('term', t.term) + + # Categories + if hasattr(entry,'category'): + for c in entry["category"]: + cat = ET.SubElement(node_entry, "category") + cat.set('category', entry.category[0]) + + # Authors + if version == 'atom03': + if hasattr(entry, 'authors'): + author = ET.SubElement(node_entry, "author") + name = ET.SubElement(author, "name") + name.text = entry.authors[0].name + if hasattr(entry.authors[0], 'href'): + uri = ET.SubElement(author, "uri") + uri.text = entry.authors[0].href + + elif version == 'rss20' or 'rss10' or 'atom10': + if hasattr(entry, 'author'): + author = ET.SubElement(node_entry, "author") + name = ET.SubElement(node_entry, "author") + name.text = entry.author + + if hasattr(entry.author, 'href'): + uri = ET.SubElement(author, "uri") + uri.text = entry.authors[0].href + + item['payload'] = node_entry + + iq['pubsub']['publish'].append(item) + + return iq