Add PubSub functionality.

This commit is contained in:
Schimon Jehudah 2024-03-24 08:14:20 +00:00
parent aed43f34dd
commit 35beab7802
7 changed files with 900 additions and 214 deletions

View file

@ -745,11 +745,10 @@ def export_to_opml(jid, filename, results):
tree.write(filename)
async def import_opml(db_file, url):
async def import_opml(db_file, result):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} url: {}'
.format(function_name, db_file, url))
result = await fetch.http(url)
logger.debug('{}: db_file: {}'
.format(function_name, db_file))
if not result['error']:
document = result['content']
root = ET.fromstring(document)
@ -1070,18 +1069,10 @@ async def scan_json(self, jid_bare, db_file, url):
new_entries)
async def view_feed(url):
def view_feed(url, feed):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: url: {}'
.format(function_name, url))
while True:
result = await fetch.http(url)
if not result['error']:
document = result['content']
status = result['status_code']
feed = parse(document)
# if is_feed(url, feed):
if is_feed(feed):
if "title" in feed["feed"].keys():
title = feed["feed"]["title"]
else:
@ -1120,33 +1111,13 @@ async def view_feed(url):
response += (
"```\nSource: {}"
).format(url)
break
else:
result = await crawl.probe_page(url, document)
if isinstance(result, str):
response = result
break
else:
url = result[0]
else:
response = ('> {}\nFailed to load URL. Reason: {}'
.format(url, status))
break
return response
async def view_entry(url, num):
def view_entry(url, feed, num):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: url: {} num: {}'
.format(function_name, url, num))
while True:
result = await fetch.http(url)
if not result['error']:
document = result['content']
status = result['status_code']
feed = parse(document)
# if is_feed(url, feed):
if is_feed(feed):
if "title" in feed["feed"].keys():
title = feed["feed"]["title"]
else:
@ -1189,18 +1160,6 @@ async def view_entry(url, num):
"{}\n"
"\n"
.format(title, summary, link))
break
else:
result = await crawl.probe_page(url, document)
if isinstance(result, str):
response = result
break
else:
url = result[0]
else:
response = ('> {}\nFailed to load URL. Reason: {}'
.format(url, status))
break
return response

View file

@ -1,2 +1,2 @@
__version__ = '0.1.46'
__version_info__ = (0, 1, 46)
__version__ = '0.1.47'
__version_info__ = (0, 1, 47)

View file

@ -30,10 +30,12 @@ NOTE
"""
import asyncio
from feedparser import parse
import slixmpp
import slixfeed.task as task
from slixfeed.url import join_url, trim_url
# from slixmpp.plugins.xep_0363.http_upload import FileTooBig, HTTPError, UploadServiceNotFound
# from slixmpp.plugins.xep_0402 import BookmarkStorage, Conference
# from slixmpp.plugins.xep_0048.stanza import Bookmarks
@ -49,9 +51,11 @@ from slixfeed.version import __version__
from slixfeed.xmpp.bookmark import XmppBookmark
from slixfeed.xmpp.connect import XmppConnect
from slixfeed.xmpp.muc import XmppGroupchat
from slixfeed.xmpp.iq import XmppIQ
from slixfeed.xmpp.message import XmppMessage
import slixfeed.xmpp.process as process
import slixfeed.xmpp.profile as profile
from slixfeed.xmpp.publish import XmppPubsub
from slixfeed.xmpp.roster import XmppRoster
# import slixfeed.xmpp.service as service
from slixfeed.xmpp.presence import XmppPresence
@ -123,6 +127,9 @@ class Slixfeed(slixmpp.ClientXMPP):
# Handler for task messages
self.pending_tasks = {}
# Handler for task messages counter
# self.pending_tasks_counter = 0
# Handler for ping
self.task_ping_instance = {}
@ -684,15 +691,18 @@ class Slixfeed(slixmpp.ClientXMPP):
# NOTE https://codeberg.org/poezio/slixmpp/issues/3515
# if is_operator(self, jid_bare):
self['xep_0050'].add_command(node='recent',
name='📰️ Browse',
handler=self._handle_recent)
self['xep_0050'].add_command(node='subscription',
name='🪶️ Subscribe',
handler=self._handle_subscription_add)
self['xep_0050'].add_command(node='publish',
name='📣️ Publish',
handler=self._handle_publish)
name='📻 Publish',
handler=self._handle_pubsub_add)
self['xep_0050'].add_command(node='post',
name='📣️ Post',
handler=self._handle_post)
self['xep_0050'].add_command(node='recent',
name='📰️ Browse',
handler=self._handle_recent)
self['xep_0050'].add_command(node='subscriptions',
name='🎫️ Subscriptions',
handler=self._handle_subscriptions)
@ -727,33 +737,56 @@ class Slixfeed(slixmpp.ClientXMPP):
# Special interface
# http://jabber.org/protocol/commands#actions
async def _handle_publish(self, iq, session):
form = self['xep_0004'].make_form('form', 'Publish')
form['instructions'] = ('In order to publish via PubSub, you will '
'have to choose a PubSub hostname and '
'have a privilege to publish into it.')
# TODO Select from list-multi
async def _handle_post(self, iq, session):
jid_bare = session['from'].bare
if not is_operator(self, jid_bare):
text_warn = 'PubSub is not available.'
session['notes'] = [['warn', text_warn]]
return session
form = self['xep_0004'].make_form('form', 'Post')
form['instructions'] = ('In order to post to PubSub, you will have to '
'choose a PubSub Jabber ID (e.g. {}) and '
'Slixfeed has to be allowed to publish into '
'it.'
.format(self.boundjid.bare))
form.add_field(var='url',
ftype='text-single',
label='URL',
desc='Enter subscription URL.',
desc='Enter a subscription URL.',
value='http://',
required=True)
form.add_field(var='pubsub',
# options = form.add_field(var='jid',
# ftype='list-single',
# label='PubSub',
# desc='Select a PubSub Jabber ID.',
# value=self.boundjid.bare,
# required=True)
# iq = await self['xep_0030'].get_items(jid=self.boundjid.domain)
# for item in iq['disco_items']['items']:
# jid = item[0]
# if item[1]: name = item[1]
# elif item[2]: name = item[2]
# else: name = jid
# options.addOption(jid, name)
form.add_field(var='jid',
ftype='text-single',
label='PubSub',
desc='Enter a PubSub URL.',
value='pubsub.' + self.boundjid.host,
desc='Enter a PubSub Jabber ID.',
value=self.boundjid.bare,
# value='pubsub.' + self.boundjid.host,
required=True)
form.add_field(var='node',
ftype='text-single',
label='Node',
desc='Node to publish at.',
desc=('Enter a node to publish to (The default value '
'is "urn:xmpp:microblog:0" which is allocated to '
'each Jabber ID in the Movim system).'),
value='urn:xmpp:microblog:0',
required=False)
options = form.add_field(var='xep',
ftype='list-single',
label='Type',
desc='Select XEP.',
label='Protocol',
desc='Select XMPP Extension Protocol.',
value='0060',
required=True)
options.addOption('XEP-0060: Publish-Subscribe', '0060')
@ -766,15 +799,177 @@ class Slixfeed(slixmpp.ClientXMPP):
session['payload'] = form
return session
def _handle_preview(self, payload, session):
async def _handle_preview(self, payload, session):
jid_full = str(session['from'])
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid_full: {}'
.format(function_name, jid_full))
text_note = ('PubSub support will be available soon.')
session['notes'] = [['info', text_note]]
values = payload['values']
jid = values['jid']
jid_bare = session['from'].bare
if jid != jid_bare and not is_operator(self, jid_bare):
text_warn = ('Posting to {} is restricted to operators only.'
.format(jid_bare))
session['allow_prev'] = False
session['has_next'] = False
session['next'] = None
session['notes'] = [['warn', text_warn]]
session['prev'] = None
session['payload'] = None
return session
node = values['node']
url = values['url']
xep = values['xep']
form = self['xep_0004'].make_form('form', 'Publish')
while True:
result = await fetch.http(url)
status = result['status_code']
if not result['error']:
document = result['content']
feed = parse(document)
# if is_feed(url, feed):
if action.is_feed(feed):
form['instructions'] = 'Select entries to publish.'
options = form.add_field(var='entries',
ftype='list-multi',
label='Titles',
desc='Select entries to post.',
required=True)
if "title" in feed["feed"].keys():
title = feed["feed"]["title"]
else:
title = url
entries = feed.entries
entry_ix = 0
for entry in entries:
if entry.has_key("title"):
title = entry.title
else:
if entry.has_key("published"):
title = entry.published
title = dt.rfc2822_to_iso8601(title)
elif entry.has_key("updated"):
title = entry.updated
title = dt.rfc2822_to_iso8601(title)
else:
title = "*** No title ***"
options.addOption(title, str(entry_ix))
entry_ix += 1
if entry_ix > 9:
break
session['allow_prev'] = True
session['has_next'] = True
session['next'] = self._handle_post_complete
session['prev'] = self._handle_post
session['payload'] = form
break
else:
result = await crawl.probe_page(url, document)
if isinstance(result, list):
form['instructions'] = ('Discovered {} subscriptions '
'from the given URL. Please '
'choose a subscription.'
.format(len(result)))
options = form.add_field(var='url',
ftype='list-single',
label='Feeds',
desc='Select a feed.',
required=True)
results = result
for result in results:
title = result['name']
url = result['link']
title = title if title else url
options.addOption(title, url)
session['allow_prev'] = True
session['has_next'] = True
session['next'] = self._handle_preview
session['prev'] = self._handle_post
session['payload'] = form
break
else:
url = result['link']
else:
text_error = ('Failed to load URL {}'
'\n\n'
'Reason: {}'
.format(url, status))
session['notes'] = [['error', text_error]]
session['payload'] = None
break
form.add_field(var='node',
ftype='hidden',
value=node)
form.add_field(var='jid',
ftype='hidden',
value=jid)
# It is essential to place URL at the end, because it might mutate
# For example http://blacklistednews.com would change
# to https://www.blacklistednews.com/rss.php
form.add_field(var='url',
ftype='hidden',
value=url)
form.add_field(var='xep',
ftype='hidden',
value=xep)
return session
async def _handle_post_complete(self, payload, session):
values = payload['values']
entries = values['entries']
# It might not be good to pass feed object as its size might be too big
# Consider a handler self.feeds[url][feed] or self.feeds[jid][url][feed]
# It is not possible to assign non-str to transfer.
# feed = values['feed']
node = values['node'][0]
jid = values['jid'][0]
url = values['url'][0]
xep = values['xep'][0]
result = await fetch.http(url)
if 'content' in result:
document = result['content']
feed = parse(document)
if feed.feed.has_key('title'):
feed_title = feed.feed.title
if feed.feed.has_key('description'):
feed_summary = feed.feed.description
elif feed.feed.has_key('subtitle'):
feed_summary = feed.feed.subtitle
else:
feed_summary = None
iq_create_node = XmppPubsub.create_node(
self, jid, node, xep, feed_title, feed_summary)
await XmppIQ.send(self, iq_create_node)
feed_version = feed.version
for entry in entries:
entry = int(entry)
feed_entry = feed.entries[entry]
# if feed.entries[entry].has_key("title"):
# title = feed.entries[entry].title
# else:
# if feed.entries[entry].has_key("published"):
# title = feed.entries[entry].published
# title = dt.rfc2822_to_iso8601(title)
# elif feed.entries[entry].has_key("updated"):
# title = feed.entries[entry].updated
# title = dt.rfc2822_to_iso8601(title)
# else:
# title = "*** No title ***"
# if feed.entries[entry].has_key("summary"):
# summary = feed.entries[entry].summary
iq_create_entry = XmppPubsub.create_entry(
self, jid, node, feed_entry, feed_version)
await XmppIQ.send(self, iq_create_entry)
text_info = 'Posted {} entries.'.format(len(entries))
session['allow_prev'] = False
session['has_next'] = False
session['next'] = None
session['notes'] = [['info', text_info]]
session['prev'] = None
session['payload'] = None
else:
session['payload'] = payload
return session
async def _handle_profile(self, iq, session):
jid_full = str(session['from'])
@ -950,9 +1145,8 @@ class Slixfeed(slixmpp.ClientXMPP):
db_file = config.get_pathname_to_database(jid_file)
# In this case (as is typical), the payload is a form
values = payload['values']
for value in values:
key = value
val = values[value]
for key in values:
val = values[key]
# NOTE We might want to add new keywords from
# an empty form instead of editing a form.
# keywords = sqlite.get_filter_value(db_file, key)
@ -973,6 +1167,80 @@ class Slixfeed(slixmpp.ClientXMPP):
return session
async def _handle_pubsub_add(self, iq, session):
jid_full = str(session['from'])
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid_full: {}'
.format(function_name, jid_full))
jid_bare = session['from'].bare
if not is_operator(self, jid_bare):
text_warn = 'PubSub is not available.'
session['notes'] = [['warn', text_warn]]
return session
chat_type = await get_chat_type(self, jid_bare)
moderator = None
if chat_type == 'groupchat':
moderator = is_moderator(self, jid_bare, jid_full)
if chat_type == 'chat' or moderator:
form = self['xep_0004'].make_form('form', 'Publish')
form['instructions'] = ('In order to publish via PubSub, you will '
'have to choose a PubSub Jabber ID (e.g. '
'{}) and Slixfeed has to be allowed to '
'publish into it.'
.format('pubsub.' + self.boundjid.host))
form.add_field(var='subscription',
# TODO Make it possible to add several subscriptions at once;
# Similarly to BitTorrent trackers list
# ftype='text-multi',
# label='Subscription URLs',
# desc=('Add subscriptions one time per '
# 'subscription.'),
ftype='text-single',
label='URL',
desc='Enter a subscription URL.',
value='http://',
required=True)
form.add_field(var='jid',
ftype='text-single',
label='PubSub',
desc='Enter a PubSub Jabber ID.',
value=self.boundjid.bare,
required=True)
form.add_field(var='node',
ftype='text-single',
label='Node',
desc=('Enter a node to publish to (The default '
'value is "urn:xmpp:microblog:0" which '
'is allocated to each Jabber ID in the '
'Movim system).'),
value='urn:xmpp:microblog:0',
required=False)
options = form.add_field(var='xep',
ftype='list-single',
label='Protocol',
desc='Select XMPP Extension Protocol.',
value='0060',
required=True)
options.addOption('XEP-0060: Publish-Subscribe', '0060')
options.addOption('XEP-0277: Microblogging over XMPP', '0277')
options.addOption('XEP-0472: Pubsub Social Feed', '0472')
# form.add_field(var='scan',
# ftype='boolean',
# label='Scan',
# desc='Scan URL for validity (recommended).',
# value=True)
session['allow_prev'] = False
session['has_next'] = True
session['next'] = self._handle_subscription_new
session['prev'] = None
session['payload'] = form
else:
text_warn = ('This resource is restricted to moderators of {}.'
.format(jid_bare))
session['notes'] = [['warn', text_warn]]
return session
async def _handle_subscription_add(self, iq, session):
jid_full = str(session['from'])
function_name = sys._getframe().f_code.co_name
@ -995,9 +1263,18 @@ class Slixfeed(slixmpp.ClientXMPP):
# 'subscription.'),
ftype='text-single',
label='URL',
desc='Enter subscription URL.',
desc='Enter a subscription URL.',
value='http://',
required=True)
if is_operator(self, jid_bare):
form.add_field(ftype='fixed',
label='Subscriber')
form.add_field(var='jid',
ftype='text-single',
label='Jabber ID',
desc=('Enter a Jabber ID to add the '
'subscription to (The default Jabber ID '
'is your own).'))
# form.add_field(var='scan',
# ftype='boolean',
# label='Scan',
@ -2016,7 +2293,7 @@ class Slixfeed(slixmpp.ClientXMPP):
ftype='list-single',
label='Choose',
required=True,
value='admin')
value='export')
jid = session['from'].bare
if is_operator(self, jid):
options.addOption('Administration', 'admin')
@ -2291,7 +2568,8 @@ class Slixfeed(slixmpp.ClientXMPP):
jid_bare = session['from'].bare
jid_file = jid_bare.replace('/', '_')
db_file = config.get_pathname_to_database(jid_file)
count = await action.import_opml(db_file, url)
result = await fetch.http(url)
count = await action.import_opml(db_file, result)
try:
int(count)
# form = self['xep_0004'].make_form('result', 'Done')
@ -2868,9 +3146,8 @@ class Slixfeed(slixmpp.ClientXMPP):
Config.add_settings_jid(self.settings, jid_bare, db_file)
# In this case (as is typical), the payload is a form
values = payload['values']
for value in values:
key = value
val = values[value]
for key in values:
val = values[key]
if key in ('enabled', 'media', 'old'):
if val == True:

View file

@ -37,9 +37,11 @@ from slixfeed.version import __version__
from slixfeed.xmpp.connect import XmppConnect
# NOTE MUC is possible for component
from slixfeed.xmpp.muc import XmppGroupchat
from slixfeed.xmpp.iq import XmppIQ
from slixfeed.xmpp.message import XmppMessage
import slixfeed.xmpp.process as process
import slixfeed.xmpp.profile as profile
from slixfeed.xmpp.publish import XmppPubsub
# from slixfeed.xmpp.roster import XmppRoster
# import slixfeed.xmpp.service as service
from slixfeed.xmpp.presence import XmppPresence
@ -114,6 +116,9 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
# Handler for task messages
self.pending_tasks = {}
# Handler for task messages counter
# self.pending_tasks_counter = 0
# Handler for ping
self.task_ping_instance = {}
@ -643,15 +648,18 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
# NOTE https://codeberg.org/poezio/slixmpp/issues/3515
# if is_operator(self, jid_bare):
self['xep_0050'].add_command(node='recent',
name='📰️ Browse',
handler=self._handle_recent)
self['xep_0050'].add_command(node='subscription',
name='🪶️ Subscribe',
handler=self._handle_subscription_add)
self['xep_0050'].add_command(node='publish',
name='📣️ Publish',
handler=self._handle_publish)
name='📻 Publish',
handler=self._handle_pubsub_add)
self['xep_0050'].add_command(node='post',
name='📣️ Post',
handler=self._handle_post)
self['xep_0050'].add_command(node='recent',
name='📰️ Browse',
handler=self._handle_recent)
self['xep_0050'].add_command(node='subscriptions',
name='🎫️ Subscriptions',
handler=self._handle_subscriptions)
@ -686,33 +694,56 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
# Special interface
# http://jabber.org/protocol/commands#actions
async def _handle_publish(self, iq, session):
form = self['xep_0004'].make_form('form', 'Publish')
form['instructions'] = ('In order to publish via PubSub, you will '
'have to choose a PubSub hostname and '
'have a privilege to publish into it.')
# TODO Select from list-multi
async def _handle_post(self, iq, session):
jid_bare = session['from'].bare
if not is_operator(self, jid_bare):
text_warn = 'PubSub is not available.'
session['notes'] = [['warn', text_warn]]
return session
form = self['xep_0004'].make_form('form', 'Post')
form['instructions'] = ('In order to post to PubSub, you will have to '
'choose a PubSub Jabber ID (e.g. {}) and '
'Slixfeed has to be allowed to publish into '
'it.'
.format(self.boundjid.bare))
form.add_field(var='url',
ftype='text-single',
label='URL',
desc='Enter subscription URL.',
desc='Enter a subscription URL.',
value='http://',
required=True)
form.add_field(var='pubsub',
# options = form.add_field(var='jid',
# ftype='list-single',
# label='PubSub',
# desc='Select a PubSub Jabber ID.',
# value=self.boundjid.bare,
# required=True)
# iq = await self['xep_0030'].get_items(jid=self.boundjid.domain)
# for item in iq['disco_items']['items']:
# jid = item[0]
# if item[1]: name = item[1]
# elif item[2]: name = item[2]
# else: name = jid
# options.addOption(jid, name)
form.add_field(var='jid',
ftype='text-single',
label='PubSub',
desc='Enter a PubSub URL.',
value='pubsub.' + self.boundjid.host,
desc='Enter a PubSub Jabber ID.',
value=self.boundjid.bare,
# value='pubsub.' + self.boundjid.host,
required=True)
form.add_field(var='node',
ftype='text-single',
label='Node',
desc='Node to publish at.',
desc=('Enter a node to publish to (The default value '
'is "urn:xmpp:microblog:0" which is allocated to '
'each Jabber ID in the Movim system).'),
value='urn:xmpp:microblog:0',
required=False)
options = form.add_field(var='xep',
ftype='list-single',
label='Type',
desc='Select XEP.',
label='Protocol',
desc='Select XMPP Extension Protocol.',
value='0060',
required=True)
options.addOption('XEP-0060: Publish-Subscribe', '0060')
@ -725,15 +756,177 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
session['payload'] = form
return session
def _handle_preview(self, payload, session):
async def _handle_preview(self, payload, session):
jid_full = str(session['from'])
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid_full: {}'
.format(function_name, jid_full))
text_note = ('PubSub support will be available soon.')
session['notes'] = [['info', text_note]]
values = payload['values']
jid = values['jid']
jid_bare = session['from'].bare
if jid != jid_bare and not is_operator(self, jid_bare):
text_warn = ('Posting to {} is restricted to operators only.'
.format(jid_bare))
session['allow_prev'] = False
session['has_next'] = False
session['next'] = None
session['notes'] = [['warn', text_warn]]
session['prev'] = None
session['payload'] = None
return session
node = values['node']
url = values['url']
xep = values['xep']
form = self['xep_0004'].make_form('form', 'Publish')
while True:
result = await fetch.http(url)
status = result['status_code']
if not result['error']:
document = result['content']
feed = parse(document)
# if is_feed(url, feed):
if action.is_feed(feed):
form['instructions'] = 'Select entries to publish.'
options = form.add_field(var='entries',
ftype='list-multi',
label='Titles',
desc='Select entries to post.',
required=True)
if "title" in feed["feed"].keys():
title = feed["feed"]["title"]
else:
title = url
entries = feed.entries
entry_ix = 0
for entry in entries:
if entry.has_key("title"):
title = entry.title
else:
if entry.has_key("published"):
title = entry.published
title = dt.rfc2822_to_iso8601(title)
elif entry.has_key("updated"):
title = entry.updated
title = dt.rfc2822_to_iso8601(title)
else:
title = "*** No title ***"
options.addOption(title, str(entry_ix))
entry_ix += 1
if entry_ix > 9:
break
session['allow_prev'] = True
session['has_next'] = True
session['next'] = self._handle_post_complete
session['prev'] = self._handle_post
session['payload'] = form
break
else:
result = await crawl.probe_page(url, document)
if isinstance(result, list):
form['instructions'] = ('Discovered {} subscriptions '
'from the given URL. Please '
'choose a subscription.'
.format(len(result)))
options = form.add_field(var='url',
ftype='list-single',
label='Feeds',
desc='Select a feed.',
required=True)
results = result
for result in results:
title = result['name']
url = result['link']
title = title if title else url
options.addOption(title, url)
session['allow_prev'] = True
session['has_next'] = True
session['next'] = self._handle_preview
session['prev'] = self._handle_post
session['payload'] = form
break
else:
url = result['link']
else:
text_error = ('Failed to load URL {}'
'\n\n'
'Reason: {}'
.format(url, status))
session['notes'] = [['error', text_error]]
session['payload'] = None
break
form.add_field(var='node',
ftype='hidden',
value=node)
form.add_field(var='jid',
ftype='hidden',
value=jid)
# It is essential to place URL at the end, because it might mutate
# For example http://blacklistednews.com would change
# to https://www.blacklistednews.com/rss.php
form.add_field(var='url',
ftype='hidden',
value=url)
form.add_field(var='xep',
ftype='hidden',
value=xep)
return session
async def _handle_post_complete(self, payload, session):
values = payload['values']
entries = values['entries']
# It might not be good to pass feed object as its size might be too big
# Consider a handler self.feeds[url][feed] or self.feeds[jid][url][feed]
# It is not possible to assign non-str to transfer.
# feed = values['feed']
node = values['node'][0]
jid = values['jid'][0]
url = values['url'][0]
xep = values['xep'][0]
result = await fetch.http(url)
if 'content' in result:
document = result['content']
feed = parse(document)
if feed.feed.has_key('title'):
feed_title = feed.feed.title
if feed.feed.has_key('description'):
feed_summary = feed.feed.description
elif feed.feed.has_key('subtitle'):
feed_summary = feed.feed.subtitle
else:
feed_summary = None
iq_create_node = XmppPubsub.create_node(
self, jid, node, xep, feed_title, feed_summary)
await XmppIQ.send(self, iq_create_node)
feed_version = feed.version
for entry in entries:
entry = int(entry)
feed_entry = feed.entries[entry]
# if feed.entries[entry].has_key("title"):
# title = feed.entries[entry].title
# else:
# if feed.entries[entry].has_key("published"):
# title = feed.entries[entry].published
# title = dt.rfc2822_to_iso8601(title)
# elif feed.entries[entry].has_key("updated"):
# title = feed.entries[entry].updated
# title = dt.rfc2822_to_iso8601(title)
# else:
# title = "*** No title ***"
# if feed.entries[entry].has_key("summary"):
# summary = feed.entries[entry].summary
iq_create_entry = XmppPubsub.create_entry(
self, jid, node, feed_entry, feed_version)
await XmppIQ.send(self, iq_create_entry)
text_info = 'Posted {} entries.'.format(len(entries))
session['allow_prev'] = False
session['has_next'] = False
session['next'] = None
session['notes'] = [['info', text_info]]
session['prev'] = None
session['payload'] = None
else:
session['payload'] = payload
return session
async def _handle_profile(self, iq, session):
jid_full = str(session['from'])
@ -909,9 +1102,8 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
db_file = config.get_pathname_to_database(jid_file)
# In this case (as is typical), the payload is a form
values = payload['values']
for value in values:
key = value
val = values[value]
for key in values:
val = values[key]
# NOTE We might want to add new keywords from
# an empty form instead of editing a form.
# keywords = sqlite.get_filter_value(db_file, key)
@ -932,6 +1124,80 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
return session
async def _handle_pubsub_add(self, iq, session):
jid_full = str(session['from'])
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid_full: {}'
.format(function_name, jid_full))
jid_bare = session['from'].bare
if not is_operator(self, jid_bare):
text_warn = 'PubSub is not available.'
session['notes'] = [['warn', text_warn]]
return session
chat_type = await get_chat_type(self, jid_bare)
moderator = None
if chat_type == 'groupchat':
moderator = is_moderator(self, jid_bare, jid_full)
if chat_type == 'chat' or moderator:
form = self['xep_0004'].make_form('form', 'Publish')
form['instructions'] = ('In order to publish via PubSub, you will '
'have to choose a PubSub Jabber ID (e.g. '
'{}) and Slixfeed has to be allowed to '
'publish into it.'
.format('pubsub.' + self.boundjid.host))
form.add_field(var='subscription',
# TODO Make it possible to add several subscriptions at once;
# Similarly to BitTorrent trackers list
# ftype='text-multi',
# label='Subscription URLs',
# desc=('Add subscriptions one time per '
# 'subscription.'),
ftype='text-single',
label='URL',
desc='Enter a subscription URL.',
value='http://',
required=True)
form.add_field(var='jid',
ftype='text-single',
label='PubSub',
desc='Enter a PubSub Jabber ID.',
value=self.boundjid.bare,
required=True)
form.add_field(var='node',
ftype='text-single',
label='Node',
desc=('Enter a node to publish to (The default '
'value is "urn:xmpp:microblog:0" which '
'is allocated to each Jabber ID in the '
'Movim system).'),
value='urn:xmpp:microblog:0',
required=False)
options = form.add_field(var='xep',
ftype='list-single',
label='Protocol',
desc='Select XMPP Extension Protocol.',
value='0060',
required=True)
options.addOption('XEP-0060: Publish-Subscribe', '0060')
options.addOption('XEP-0277: Microblogging over XMPP', '0277')
options.addOption('XEP-0472: Pubsub Social Feed', '0472')
# form.add_field(var='scan',
# ftype='boolean',
# label='Scan',
# desc='Scan URL for validity (recommended).',
# value=True)
session['allow_prev'] = False
session['has_next'] = True
session['next'] = self._handle_subscription_new
session['prev'] = None
session['payload'] = form
else:
text_warn = ('This resource is restricted to moderators of {}.'
.format(jid_bare))
session['notes'] = [['warn', text_warn]]
return session
async def _handle_subscription_add(self, iq, session):
jid_full = str(session['from'])
function_name = sys._getframe().f_code.co_name
@ -954,9 +1220,18 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
# 'subscription.'),
ftype='text-single',
label='URL',
desc='Enter subscription URL.',
desc='Enter a subscription URL.',
value='http://',
required=True)
if is_operator(self, jid_bare):
form.add_field(ftype='fixed',
label='Subscriber')
form.add_field(var='jid',
ftype='text-single',
label='Jabber ID',
desc=('Enter a Jabber ID to add the '
'subscription to (The default Jabber ID '
'is your own).'))
# form.add_field(var='scan',
# ftype='boolean',
# label='Scan',
@ -1975,7 +2250,7 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
ftype='list-single',
label='Choose',
required=True,
value='admin')
value='export')
jid = session['from'].bare
if is_operator(self, jid):
options.addOption('Administration', 'admin')
@ -2250,7 +2525,8 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
jid_bare = session['from'].bare
jid_file = jid_bare.replace('/', '_')
db_file = config.get_pathname_to_database(jid_file)
count = await action.import_opml(db_file, url)
result = await fetch.http(url)
count = await action.import_opml(db_file, result)
try:
int(count)
# form = self['xep_0004'].make_form('result', 'Done')
@ -2827,9 +3103,8 @@ class SlixfeedComponent(slixmpp.ComponentXMPP):
Config.add_settings_jid(self.settings, jid_bare, db_file)
# In this case (as is typical), the payload is a form
values = payload['values']
for value in values:
key = value
val = values[value]
for key in values:
val = values[key]
if key in ('enabled', 'media', 'old'):
if val == True:

View file

@ -1,14 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from slixmpp.exceptions import IqError
import logging
from slixmpp.exceptions import IqError, IqTimeout
class XmppIQ:
async def send(self, iq):
try:
await iq.send(timeout=5)
except IqError as e:
if e.etype == 'cancel' and e.condition == 'conflict':
return
raise
except (IqError, IqTimeout) as e:
logging.error('Error XmppIQ')
logging.error(str(e))

View file

@ -23,14 +23,13 @@ TODO
"""
from slixfeed.xmpp.publish import XmppPubsub
from slixfeed.xmpp.iq import XmppIQ
import asyncio
from feedparser import parse
import logging
import os
import slixfeed.action as action
import slixfeed.config as config
import slixfeed.crawl as crawl
from slixfeed.config import Config
import slixfeed.dt as dt
import slixfeed.fetch as fetch
@ -109,15 +108,19 @@ async def message(self, message):
# pending_tasks_num = len(self.pending_tasks[jid_bare])
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
# self.pending_tasks_counter += 1
# self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
db_file = config.get_pathname_to_database(jid_file)
count = await action.import_opml(db_file, url)
result = await fetch.http(url)
count = await action.import_opml(db_file, result)
if count:
response = 'Successfully imported {} feeds.'.format(count)
else:
response = 'OPML file was not imported.'
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
XmppMessage.send_reply(self, message, response)
@ -287,13 +290,6 @@ async def message(self, message):
.format(self.alias))
XmppMessage.send_reply(self, message, response)
title = message_lowercase
summary = message_text
# from random import randrange
node = str(randrange(10000, 99999))
iq = XmppPubsub.create(self, title, summary, node)
await XmppIQ.send(self, iq)
# case _ if message_lowercase.startswith('activate'):
# if message['type'] == 'groupchat':
# acode = message[9:]
@ -529,6 +525,8 @@ async def message(self, message):
# pending_tasks_num = len(self.pending_tasks[jid_bare])
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
# self.pending_tasks_counter += 1
# self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
filename = await action.export_feeds(self, jid_bare,
@ -541,6 +539,7 @@ async def message(self, message):
chat_type = await get_chat_type(self, jid_bare)
XmppMessage.send_oob(self, jid_bare, url, chat_type)
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
else:
@ -579,6 +578,8 @@ async def message(self, message):
# pending_tasks_num = len(self.pending_tasks[jid_bare])
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
# self.pending_tasks_counter += 1
# self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
db_file = config.get_pathname_to_database(jid_file)
@ -642,6 +643,7 @@ async def message(self, message):
'Try: epub, html, md (markdown), '
'pdf, or txt (text)')
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
if response:
@ -657,16 +659,20 @@ async def message(self, message):
# pending_tasks_num = len(self.pending_tasks[jid_bare])
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
# self.pending_tasks_counter += 1
# self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
db_file = config.get_pathname_to_database(jid_file)
count = await action.import_opml(db_file, url)
result = await fetch.http(url)
count = await action.import_opml(db_file, result)
if count:
response = ('Successfully imported {} feeds.'
.format(count))
else:
response = 'OPML file was not imported.'
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
XmppMessage.send_reply(self, message, response)
@ -680,7 +686,8 @@ async def message(self, message):
# pending_tasks_num = len(self.pending_tasks[jid_bare])
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
print(self.pending_tasks)
# self.pending_tasks_counter += 1
# self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
if url.startswith('feed:'):
@ -716,8 +723,8 @@ async def message(self, message):
'added to subscription list.'
.format(result['link'], result['name']))
# task.clean_tasks_xmpp(self, jid_bare, ['status'])
print(self.pending_tasks)
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
print(self.pending_tasks)
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
@ -929,7 +936,7 @@ async def message(self, message):
status_type = 'dnd'
status_message = ('📫️ Processing request to fetch data '
'from {}'.format(url))
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
@ -939,7 +946,36 @@ async def message(self, message):
match len(data):
case 1:
if url.startswith('http'):
response = await action.view_feed(url)
while True:
result = await fetch.http(url)
if not result['error']:
document = result['content']
status = result['status_code']
feed = parse(document)
# if is_feed(url, feed):
if action.is_feed(feed):
response = action.view_feed(url, feed)
break
else:
result = await crawl.probe_page(url, document)
if isinstance(result, list):
results = result
response = ("Web feeds found for {}\n\n```\n"
.format(url))
for result in results:
response += ("Title : {}\n"
"Link : {}\n"
"\n"
.format(result['name'], result['link']))
response += ('```\nTotal of {} feeds.'
.format(len(results)))
break
else:
url = result[0]
else:
response = ('> {}\nFailed to load URL. Reason: {}'
.format(url, status))
break
else:
response = ('No action has been taken.'
'\n'
@ -947,7 +983,36 @@ async def message(self, message):
case 2:
num = data[1]
if url.startswith('http'):
response = await action.view_entry(url, num)
while True:
result = await fetch.http(url)
if not result['error']:
document = result['content']
status = result['status_code']
feed = parse(document)
# if is_feed(url, feed):
if action.is_feed(feed):
response = action.view_entry(url, feed, num)
break
else:
result = await crawl.probe_page(url, document)
if isinstance(result, list):
results = result
response = ("Web feeds found for {}\n\n```\n"
.format(url))
for result in results:
response += ("Title : {}\n"
"Link : {}\n"
"\n"
.format(result['name'], result['link']))
response += ('```\nTotal of {} feeds.'
.format(len(results)))
break
else:
url = result[0]
else:
response = ('> {}\nFailed to load URL. Reason: {}'
.format(url, status))
break
else:
response = ('No action has been taken.'
'\n'
@ -1041,6 +1106,8 @@ async def message(self, message):
# pending_tasks_num = len(self.pending_tasks[jid_bare])
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
# self.pending_tasks_counter += 1
# self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
db_file = config.get_pathname_to_database(jid_file)
@ -1083,6 +1150,7 @@ async def message(self, message):
response = 'All entries have been marked as read.'
XmppMessage.send_reply(self, message, response)
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
case _ if message_lowercase.startswith('search '):

View file

@ -3,12 +3,18 @@
# TODO Implement XEP-0472: Pubsub Social Feed
import hashlib
import slixmpp.plugins.xep_0060.stanza.pubsub as pubsub
from slixmpp.xmlstream import ET
class XmppPubsub:
def create(self, title, summary, node, server):
# TODO Make use of var "xep" with match/case
def create_node(self, jid, node, xep ,title, summary=None):
jid_from = str(self.boundjid) if self.is_component else None
iq = self.Iq(stype="set",
sto=server,
iq = self.Iq(stype='set',
sto=jid,
sfrom=jid_from)
iq['pubsub']['create']['node'] = node
form = iq['pubsub']['configure']['form']
@ -42,3 +48,104 @@ class XmppPubsub:
value='http://www.w3.org/2005/Atom')
return iq
def create_entry(self, jid, node, entry, version):
iq = self.Iq(stype="set", sto=jid)
iq['pubsub']['publish']['node'] = node
item = pubsub.Item()
# From atomtopubsub:
# character / is causing a bug in movim. replacing : and , with - in id.
# It provides nicer urls.
# Respond to atomtopubsub:
# I think it would be a good idea to use md5 checksum of Url as Id for
# cross reference, and namely - in another project to utilize PubSub as
# links sharing system (see del.icio.us) - to share node entries.
# NOTE Warning: Entry might not have a link
# TODO Handle situation error
url_encoded = entry.link.encode()
url_hashed = hashlib.md5(url_encoded)
url_digest = url_hashed.hexdigest()
item['id'] = url_digest
node_entry = ET.Element("entry")
node_entry.set('xmlns', 'http://www.w3.org/2005/Atom')
title = ET.SubElement(node_entry, "title")
title.text = entry.title
updated = ET.SubElement(node_entry, "updated")
updated.text = entry.updated
# Content
if version == 'atom3':
if hasattr(entry.content[0], 'type'):
content = ET.SubElement(node_entry, "content")
content.set('type', entry.content[0].type)
content.text = entry.content[0].value
elif version =='rss20' or 'rss10' or 'atom10':
if hasattr(entry, "content"):
content = ET.SubElement(node_entry, "content")
content.set('type', 'text/html')
content.text = entry.content[0].value
elif hasattr(entry, "description"):
content = ET.SubElement(node_entry,"content")
content.set('type', 'text/html')
content.text = entry.description
print('In Description - PublishX')
# Links
if hasattr(entry, 'links'):
for l in entry.links:
link = ET.SubElement(node_entry, "link")
if hasattr(l, 'href'):
link.set('href', l['href'])
link.set('type', l['type'])
link.set('rel', l['rel'])
elif hasattr(entry, 'link'):
link.set('href', entry['link'])
# Tags
if hasattr(entry, 'tags'):
for t in entry.tags:
tag = ET.SubElement(node_entry, "category")
tag.set('term', t.term)
# Categories
if hasattr(entry,'category'):
for c in entry["category"]:
cat = ET.SubElement(node_entry, "category")
cat.set('category', entry.category[0])
# Authors
if version == 'atom03':
if hasattr(entry, 'authors'):
author = ET.SubElement(node_entry, "author")
name = ET.SubElement(author, "name")
name.text = entry.authors[0].name
if hasattr(entry.authors[0], 'href'):
uri = ET.SubElement(author, "uri")
uri.text = entry.authors[0].href
elif version == 'rss20' or 'rss10' or 'atom10':
if hasattr(entry, 'author'):
author = ET.SubElement(node_entry, "author")
name = ET.SubElement(node_entry, "author")
name.text = entry.author
if hasattr(entry.author, 'href'):
uri = ET.SubElement(author, "uri")
uri.text = entry.authors[0].href
item['payload'] = node_entry
iq['pubsub']['publish'].append(item)
return iq