Slixfeed/slixfeed/xmpp/publish.py

420 lines
16 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Functions create_node and create_entry are derived from project atomtopubsub.
"""
import asyncio
import hashlib
2024-03-24 09:14:20 +01:00
import slixmpp.plugins.xep_0060.stanza.pubsub as pubsub
from slixmpp.xmlstream import ET
import slixfeed.config as config
from slixfeed.config import Config
from slixfeed.log import Logger
import slixfeed.sqlite as sqlite
from slixfeed.syndication import Feed
from slixfeed.utilities import String, Url, Utilities
from slixfeed.xmpp.iq import XmppIQ
import sys
logger = Logger(__name__)
2024-03-24 09:14:20 +01:00
class XmppPubsub:
2024-03-24 09:14:20 +01:00
async def get_pubsub_services(self):
results = []
iq = await self['xep_0030'].get_items(jid=self.boundjid.domain)
items = iq['disco_items']['items']
for item in items:
iq = await self['xep_0030'].get_info(jid=item[0])
identities = iq['disco_info']['identities']
for identity in identities:
if identity[0] == 'pubsub' and identity[1] == 'service':
result = {}
result['jid'] = item[0]
if item[1]: result['name'] = item[1]
elif item[2]: result['name'] = item[2]
else: result['name'] = item[0]
results.extend([result])
return results
async def get_node_properties(self, jid_bare, node):
config = await self.plugin['xep_0060'].get_node_config(jid_bare, node)
subscriptions = await self.plugin['xep_0060'].get_node_subscriptions(jid_bare, node)
affiliations = await self.plugin['xep_0060'].get_node_affiliations(jid_bare, node)
properties = {'config': config,
'subscriptions': subscriptions,
'affiliations': affiliations}
breakpoint()
return properties
async def get_node_configuration(self, jid_bare, node_id):
node = await self.plugin['xep_0060'].get_node_config(jid_bare, node_id)
return node
async def get_nodes(self, jid_bare):
nodes = await self.plugin['xep_0060'].get_nodes(jid_bare)
# 'self' would lead to slixmpp.jid.InvalidJID: idna validation failed:
return nodes
async def get_item(self, jid_bare, node, item_id):
item = await self.plugin['xep_0060'].get_item(jid_bare, node, item_id)
return item
async def get_items(self, jid_bare, node):
items = await self.plugin['xep_0060'].get_items(jid_bare, node)
return items
def delete_node(self, jid_bare, node):
jid_from = self.boundjid.bare if self.is_component else None
self.plugin['xep_0060'].delete_node(jid_bare, node, ifrom=jid_from)
def purge_node(self, jid_bare, node):
jid_from = self.boundjid.bare if self.is_component else None
self.plugin['xep_0060'].purge(jid_bare, node, ifrom=jid_from)
# iq = self.Iq(stype='set',
# sto=jid_bare,
# sfrom=jid_from)
# iq['pubsub']['purge']['node'] = node
# return iq
# TODO Make use of var "xep" with match/case (XEP-0060, XEP-0277, XEP-0472)
def create_node(self, jid_bare, node, xep=None ,title=None, subtitle=None):
jid_from = self.boundjid.bare if self.is_component else None
2024-03-24 09:14:20 +01:00
iq = self.Iq(stype='set',
sto=jid_bare,
sfrom=jid_from)
iq['pubsub']['create']['node'] = node
form = iq['pubsub']['configure']['form']
form['type'] = 'submit'
form.addField('pubsub#title',
ftype='text-single',
value=title)
form.addField('pubsub#description',
ftype='text-single',
value=subtitle)
form.addField('pubsub#notify_retract',
ftype='boolean',
value=1)
form.addField('pubsub#max_items',
ftype='text-single',
value='20')
form.addField('pubsub#persist_items',
ftype='boolean',
value=1)
form.addField('pubsub#send_last_published_item',
ftype='text-single',
value='never')
form.addField('pubsub#deliver_payloads',
ftype='boolean',
value=0)
form.addField('pubsub#type',
ftype='text-single',
value='http://www.w3.org/2005/Atom')
return iq
2024-03-24 09:14:20 +01:00
# TODO Consider to create a separate function called "create_atom_entry"
# or "create_rfc4287_entry" for anything related to variable "node_entry".
def create_entry(self, jid_bare, node_id, item_id, node_item):
iq = self.Iq(stype="set", sto=jid_bare)
iq['pubsub']['publish']['node'] = node_id
2024-03-24 09:14:20 +01:00
item = pubsub.Item()
# From atomtopubsub:
# character / is causing a bug in movim. replacing : and , with - in id.
# It provides nicer urls.
# Respond to atomtopubsub:
# I think it would be beneficial to use md5 checksum of Url as Id for
2024-03-24 09:14:20 +01:00
# cross reference, and namely - in another project to utilize PubSub as
# links sharing system (see del.icio.us) - to share node entries.
item['id'] = item_id
item['payload'] = node_item
iq['pubsub']['publish'].append(item)
return iq
def _create_entry(self, jid_bare, node, entry, version):
iq = self.Iq(stype="set", sto=jid_bare)
iq['pubsub']['publish']['node'] = node
item = pubsub.Item()
# From atomtopubsub:
# character / is causing a bug in movim. replacing : and , with - in id.
# It provides nicer urls.
# Respond to atomtopubsub:
# I think it would be beneficial to use md5 checksum of Url as Id for
# cross reference, and namely - in another project to utilize PubSub as
# links sharing system (see del.icio.us) - to share node entries.
# NOTE Warning: Entry might not have a link
# TODO Handle situation error
url_encoded = entry.link.encode()
url_hashed = hashlib.md5(url_encoded)
url_digest = url_hashed.hexdigest()
item['id'] = url_digest + '_html'
node_entry = ET.Element("entry")
node_entry.set('xmlns', 'http://www.w3.org/2005/Atom')
title = ET.SubElement(node_entry, "title")
title.text = entry.title
updated = ET.SubElement(node_entry, "updated")
updated.text = entry.updated
2024-03-24 09:14:20 +01:00
# Content
if version == 'atom3':
if hasattr(entry.content[0], 'type'):
content = ET.SubElement(node_entry, "content")
content.set('type', entry.content[0].type)
content.text = entry.content[0].value
elif version =='rss20' or 'rss10' or 'atom10':
if hasattr(entry, "content"):
content = ET.SubElement(node_entry, "content")
content.set('type', 'text/html')
content.text = entry.content[0].value
elif hasattr(entry, "description"):
content = ET.SubElement(node_entry,"content")
content.set('type', 'text/html')
content.text = entry.description
print('In Description - PublishX')
# Links
if hasattr(entry, 'links'):
for l in entry.links:
link = ET.SubElement(node_entry, "link")
if hasattr(l, 'href'):
link.set('href', l['href'])
link.set('type', l['type'])
link.set('rel', l['rel'])
elif hasattr(entry, 'link'):
link.set('href', entry['link'])
# Tags
if hasattr(entry, 'tags'):
for t in entry.tags:
tag = ET.SubElement(node_entry, "category")
tag.set('term', t.term)
# Categories
if hasattr(entry,'category'):
for c in entry["category"]:
cat = ET.SubElement(node_entry, "category")
cat.set('category', entry.category[0])
# Authors
if version == 'atom03':
if hasattr(entry, 'authors'):
author = ET.SubElement(node_entry, "author")
name = ET.SubElement(author, "name")
name.text = entry.authors[0].name
if hasattr(entry.authors[0], 'href'):
uri = ET.SubElement(author, "uri")
uri.text = entry.authors[0].href
elif version == 'rss20' or 'rss10' or 'atom10':
if hasattr(entry, 'author'):
author = ET.SubElement(node_entry, "author")
name = ET.SubElement(node_entry, "author")
name.text = entry.author
if hasattr(entry.author, 'href'):
uri = ET.SubElement(author, "uri")
uri.text = entry.authors[0].href
item['payload'] = node_entry
iq['pubsub']['publish'].append(item)
return iq
class XmppPubsubAction:
async def send_selected_entry(self, jid_bare, node_id, entry_id):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid_bare: {}'.format(function_name, jid_bare))
db_file = config.get_pathname_to_database(jid_bare)
feed_id = sqlite.get_feed_id_by_entry_index(db_file, entry_id)
feed_id = feed_id[0]
node_id, node_title, node_subtitle = sqlite.get_feed_properties(db_file, feed_id)
xep = None
iq_create_node = XmppPubsub.create_node(
self, jid_bare, node_id, xep, node_title, node_subtitle)
await XmppIQ.send(self, iq_create_node)
entry = sqlite.get_entry_properties(db_file, entry_id)
print('xmpp_pubsub_send_selected_entry',jid_bare)
print(node_id)
entry_dict = Feed.pack_entry_into_dict(db_file, entry)
node_item = Feed.create_rfc4287_entry(entry_dict)
item_id = Utilities.hash_url_to_md5(entry_dict['link'])
iq_create_entry = XmppPubsub.create_entry(
self, jid_bare, node_id, item_id, node_item)
await XmppIQ.send(self, iq_create_entry)
await sqlite.mark_as_read(db_file, entry_id)
# NOTE This value is returned for the sake of testing
return entry_dict['link']
async def send_unread_items(self, jid_bare):
"""
Parameters
----------
jid_bare : str
Bare Jabber ID.
Returns
-------
report : dict
URL and Number of processed entries.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid_bare: {}'.format(function_name, jid_bare))
db_file = config.get_pathname_to_database(jid_bare)
report = {}
subscriptions = sqlite.get_active_feeds_url(db_file)
for url in subscriptions:
url = url[0]
# feed_id = sqlite.get_feed_id(db_file, url)
# feed_id = feed_id[0]
# feed_properties = sqlite.get_feed_properties(db_file, feed_id)
feed_id = sqlite.get_feed_id(db_file, url)
feed_id = feed_id[0]
# Publish to node based on feed identifier for PubSub service.
# node_id = feed_properties[2]
# node_title = feed_properties[3]
# node_subtitle = feed_properties[5]
node_id = sqlite.get_feed_identifier(db_file, feed_id)
node_id = node_id[0]
if not node_id:
counter = 0
while True:
identifier = String.generate_identifier(url, counter)
if sqlite.check_identifier_exist(db_file, identifier):
counter += 1
else:
break
await sqlite.update_feed_identifier(db_file, feed_id, identifier)
node_id = sqlite.get_feed_identifier(db_file, feed_id)
node_id = node_id[0]
node_title = sqlite.get_feed_title(db_file, feed_id)
node_title = node_title[0]
node_subtitle = sqlite.get_feed_subtitle(db_file, feed_id)
node_subtitle = node_subtitle[0]
xep = None
#node_exist = await XmppPubsub.get_node_configuration(self, jid_bare, node_id)
nodes = await XmppPubsub.get_nodes(self, jid_bare)
node_items = nodes['disco_items']['items']
node_exist = False
for node_item in node_items:
if node_item[1] == node_id:
node_exist = True
break
print(['node_exist', node_exist])
if not node_exist:
iq_create_node = XmppPubsub.create_node(
self, jid_bare, node_id, xep, node_title, node_subtitle)
result = await XmppIQ.send(self, iq_create_node)
result_condition = result.iq['error']['condition']
if result_condition in ('forbidden', 'service-unavailable'):
reason = result.iq['error']['text']
print('Creation of node {} for JID {} has failed'.format(node_id, jid_bare, reason))
return
entries = sqlite.get_unread_entries_of_feed(db_file, feed_id)
report[url] = len(entries)
for entry in entries:
feed_entry = Feed.pack_entry_into_dict(db_file, entry)
node_entry = Feed.create_rfc4287_entry(feed_entry)
entry_url = feed_entry['link']
item_id = Utilities.hash_url_to_md5(entry_url)
print(['PubSub node item was sent to', jid_bare, node_id])
print([entry_url, item_id])
iq_create_entry = XmppPubsub.create_entry(
self, jid_bare, node_id, item_id, node_entry)
result = await XmppIQ.send(self, iq_create_entry)
ix = entry[0]
await sqlite.mark_as_read(db_file, ix)
print(report)
return report
class XmppPubsubTask:
async def loop_task(self, jid_bare):
db_file = config.get_pathname_to_database(jid_bare)
if jid_bare not in self.settings:
Config.add_settings_jid(self, jid_bare, db_file)
while True:
print('Looping task "publish" for JID {}'.format(jid_bare))
if jid_bare not in self.task_manager:
self.task_manager[jid_bare] = {}
logger.info('Creating new task manager for JID {}'.format(jid_bare))
logger.info('Stopping task "publish" for JID {}'.format(jid_bare))
try:
self.task_manager[jid_bare]['publish'].cancel()
except:
logger.info('No task "publish" for JID {} (XmppPubsubAction.send_unread_items)'
.format(jid_bare))
logger.info('Starting tasks "publish" for JID {}'.format(jid_bare))
self.task_manager[jid_bare]['publish'] = asyncio.create_task(
XmppPubsubAction.send_unread_items(self, jid_bare))
await asyncio.sleep(60 * 180)
def restart_task(self, jid_bare):
db_file = config.get_pathname_to_database(jid_bare)
if jid_bare not in self.settings:
Config.add_settings_jid(self, jid_bare, db_file)
if jid_bare not in self.task_manager:
self.task_manager[jid_bare] = {}
logger.info('Creating new task manager for JID {}'.format(jid_bare))
logger.info('Stopping task "publish" for JID {}'.format(jid_bare))
try:
self.task_manager[jid_bare]['publish'].cancel()
except:
logger.info('No task "publish" for JID {} (XmppPubsubAction.send_unread_items)'
.format(jid_bare))
logger.info('Starting tasks "publish" for JID {}'.format(jid_bare))
self.task_manager[jid_bare]['publish'] = asyncio.create_task(
XmppPubsubAction.send_unread_items(self, jid_bare))
async def task_publish(self, jid_bare):
db_file = config.get_pathname_to_database(jid_bare)
if jid_bare not in self.settings:
Config.add_settings_jid(self, jid_bare, db_file)
while True:
await XmppPubsubAction.send_unread_items(self, jid_bare)
await asyncio.sleep(60 * 180)