Rivista/pubsub_to_atom.py
Schimon Jehudah, Adv. ad34af72ff Add support for contents of Libervia (XEP-0277);
Various visual improvements.
2024-07-31 19:21:33 +03:00

413 lines
19 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
import datetime
from fastapi import FastAPI, Request, Response
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
import json
from slixmpp import ClientXMPP
from slixmpp.exceptions import IqError, IqTimeout
import xml.etree.ElementTree as ET
#import importlib.resources
try:
import tomllib
except:
import tomli as tomllib
class XmppInstance(ClientXMPP):
    """XMPP client that registers Publish-Subscribe and connects on creation."""

    def __init__(self, jid, password):
        """Create the client for ``jid`` and start connecting right away.

        The XEP-0060 plugin is registered here because the request helpers
        of this module reach it through ``plugin['xep_0060']``.
        """
        ClientXMPP.__init__(self, jid, password)
        self.register_plugin('xep_0060')
        self.connect()
        # self.process(forever=False)
# Shared XMPP client; stays None until the first request creates it.
xmpp = None

app = FastAPI()

# Mount static graphic, script and stylesheet directories.
# Every directory is served under the URL path of the same name.
for _static_name in ("css", "data", "graphic", "script", "xsl"):
    app.mount("/" + _static_name,
              StaticFiles(directory=_static_name),
              name=_static_name)
del _static_name
@app.get('/favicon.ico', include_in_schema=False)
async def favicon():
    """Answer ``/favicon.ico`` with the file ``favicon.ico`` (relative path)."""
    icon = FileResponse(path='favicon.ico')
    return icon
async def _render_nodes_document(pubsub):
    """Return a styled OPML document listing the nodes of ``pubsub``.

    When ``get_nodes`` yields nothing (it returns None after an IQ error or
    timeout), a styled Atom error document is returned instead, so callers
    never hand an empty reply to ``generate_opml``.
    """
    iq = await get_nodes(pubsub)
    if iq:
        return append_stylesheet(generate_opml(iq), 'opml')
    text = 'Please ensure that the PubSub Jabber ID is valid and accessible.'
    return append_stylesheet(error_message(text), 'atom')


@app.route('/')
@app.get('/opml')
async def view_pubsub_nodes(request: Request):
    """Serve the list of nodes of a PubSub service as an XML document.

    Query parameter ``pubsub`` selects the service.  The request is honoured
    only when ``settings.service`` is enabled and the ``settings.include``
    filter (if any) is a substring of the requested address; otherwise the
    service configured under ``default.pubsub`` is shown.

    :param request: Incoming request; only its query parameters are read.
    :return: ``application/xml`` response holding either an OPML document or
        an Atom document that carries an error message.
    """
    global xmpp
    if not xmpp:
        # The XMPP session is created lazily by the first request.
        credentials = get_configuration('account')
        xmpp = XmppInstance(credentials['xmpp'], credentials['pass'])

    pubsub = request.query_params.get('pubsub', '')
    settings = get_configuration('settings')
    result = None
    if settings['service']:
        # NOTE(review): with a non-empty ``include`` filter, a request that
        # has no ``pubsub`` parameter ends in the "not allowed" branch rather
        # than the default journal -- confirm that this is intended.
        if settings['include'] in pubsub or not settings['include']:
            if pubsub:
                result = await _render_nodes_document(pubsub)
        else:
            text = 'The given domain {} is not allowed.'.format(pubsub)
            result = append_stylesheet(error_message(text), 'atom')
    default = get_configuration('default')
    if not result:
        if default['pubsub']:
            # Either nothing was requested, or arbitrary requests are
            # disabled: both cases show the default service.  The node list
            # is always fetched here; previously the "service disabled"
            # branch used a reply that had never been requested.
            if not pubsub or not settings['service']:
                result = await _render_nodes_document(default['pubsub'])
        else:
            text = 'Please contact the administrator and ask him to set default PubSub and Node ID.'
            result = append_stylesheet(error_message(text), 'atom')
    return Response(content=result, media_type="application/xml")
def _write_item_list_error(pubsub, node, operator):
    """Write ``data/<node>.json`` with two entries reporting a listing failure."""
    json_data = [{'title' : 'Error retrieving node items.',
                  'link' : ('javascript:alert("Rivista has experienced an error '
                            'while attempting to retrieve the list of items for '
                            'Node {} of PubSub {}.")')
                           .format(node, pubsub)},
                 {'title' : 'Contact the operator.',
                  'link' : ('xmpp:{}?message;subject=Rivista;body=Greetings! '
                            'I am contacting you to inform you that there is an error listing '
                            'node items for Node {} on PubSub {}.').format(operator, node, pubsub)}]
    filename = 'data/{}.json'.format(node)
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=4)


async def _render_node_feed(pubsub, node):
    """Return the styled Atom document for all items of ``node``.

    When ``get_node_items`` yields nothing (it returns None after an IQ
    error or timeout), the document carries an error message instead, so an
    empty reply is never handed to ``generate_atom``.
    """
    iq = await get_node_items(pubsub, node)
    if iq:
        xml_atom = generate_atom(iq, form_a_node_link(pubsub, node))
    else:
        text = 'Please ensure that the PubSub node is valid and accessible.'
        xml_atom = error_message(text)
    return append_stylesheet(xml_atom, 'atom')


@app.get('/atom')
async def view_node_items(request: Request):
    """Serve a node, or one item of a node, as an Atom document.

    Query parameters: ``pubsub`` (service address), ``node`` and, optionally,
    ``item``.  The request is honoured only when ``settings.service`` is
    enabled and the ``settings.include`` filter (if any) is a substring of
    the requested address; otherwise the journal configured under
    ``default.pubsub`` / ``default.nodeid`` is shown.

    When a single item is shown, ``data/<node>.json`` is (re)written with a
    short list of the node's items, or with an error notice if that list
    cannot be retrieved.

    :param request: Incoming request; only its query parameters are read.
    :return: ``application/xml`` response holding an Atom document.
    """
    global xmpp
    if not xmpp:
        # The XMPP session is created lazily by the first request.
        credentials = get_configuration('account')
        xmpp = XmppInstance(credentials['xmpp'], credentials['pass'])

    pubsub = request.query_params.get('pubsub', '')
    node = request.query_params.get('node', '')
    item_id = request.query_params.get('item', '')
    settings = get_configuration('settings')
    result = None
    if settings['service']:
        if settings['include'] in pubsub or not settings['include']:
            if pubsub and node and item_id:
                iq = await get_node_item(pubsub, node, item_id)
                if iq:
                    link = form_an_item_link(pubsub, node, item_id)
                    xml_atom = generate_atom(iq, link)
                    # The node name becomes part of a file name, hence nodes
                    # containing a path separator get no item list.  The list
                    # is requested only when it is going to be written.
                    if '/' not in node:
                        iq_items = await get_node_items(pubsub, node)
                        if iq_items:
                            generate_json(iq_items)
                        else:
                            _write_item_list_error(
                                pubsub, node, settings['operator'])
                else:
                    text = 'Please ensure that the PubSub node and item are valid and accessible.'
                    xml_atom = error_message(text)
                result = append_stylesheet(xml_atom, 'atom')
            elif pubsub and node:
                result = await _render_node_feed(pubsub, node)
            elif pubsub:
                text = 'Node parameter is missing.'
                result = append_stylesheet(error_message(text), 'atom')
            elif node:
                text = 'PubSub parameter is missing.'
                result = append_stylesheet(error_message(text), 'atom')
        else:
            text = 'The given domain {} is not allowed.'.format(pubsub)
            result = append_stylesheet(error_message(text), 'atom')
    default = get_configuration('default')
    if not result:
        if default['pubsub'] and default['nodeid']:
            # Either nothing was requested, or arbitrary requests are
            # disabled: both cases show the default journal.  A failed
            # retrieval now yields an error document instead of a crash.
            if (not pubsub and not node) or not settings['service']:
                result = await _render_node_feed(
                    default['pubsub'], default['nodeid'])
        else:
            text = 'Please contact the administrator and ask him to set default PubSub and Node ID.'
            result = append_stylesheet(error_message(text), 'atom')
    return Response(content=result, media_type="application/xml")
def get_configuration(section):
with open('configuration.toml', mode="rb") as configuration:
result = tomllib.load(configuration)[section]
return result
#@timeout(5)
async def get_node_item(pubsub, node, item_id):
    """Request a single item of a PubSub node (5 second timeout).

    Returns the reply, or None after printing the error when the request
    ends in ``IqError`` or ``IqTimeout``.
    """
    pubsub_plugin = xmpp.plugin['xep_0060']
    try:
        reply = await pubsub_plugin.get_item(pubsub, node, item_id, timeout=5)
    except (IqError, IqTimeout) as error:
        print(error)
        return None
    return reply
async def get_node_items(pubsub, node):
    """Request the items of a PubSub node (5 second timeout).

    Returns the reply, or None after printing the error when the request
    ends in ``IqError`` or ``IqTimeout``.
    """
    pubsub_plugin = xmpp.plugin['xep_0060']
    try:
        reply = await pubsub_plugin.get_items(pubsub, node, timeout=5)
    except (IqError, IqTimeout) as error:
        print(error)
        return None
    return reply
async def get_nodes(pubsub):
    """Request the list of nodes of a PubSub service (5 second timeout).

    Returns the reply, or None after printing the error when the request
    ends in ``IqError`` or ``IqTimeout``.
    """
    pubsub_plugin = xmpp.plugin['xep_0060']
    try:
        reply = await pubsub_plugin.get_nodes(pubsub, timeout=5)
    except (IqError, IqTimeout) as error:
        print(error)
        return None
    return reply
def form_a_node_link(pubsub, node):
    """Build the ``xmpp:`` URI of a node on a PubSub service."""
    return f'xmpp:{pubsub}?;node={node}'
def form_an_item_link(pubsub, node, item_id):
    """Build the ``xmpp:`` URI of one item of a node on a PubSub service."""
    return f'xmpp:{pubsub}?;node={node};item={item_id}'
def error_message(text):
    """Error message in RFC 4287: The Atom Syndication Format.

    Builds a feed titled "Rivista" that contains a single entry titled
    "Error" whose content is ``text``.

    :param text: Human-readable explanation shown as the entry content.
    :return: The serialized feed as ``str``, without an XML declaration.
    """
    # One timestamp for the whole document, so that the feed and its entry
    # agree.  ``timezone.utc`` is used because ``datetime.UTC`` exists only
    # since Python 3.11, while this file still supports older versions.
    time_stamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    feed = ET.Element("feed")
    feed.set('xmlns', 'http://www.w3.org/2005/Atom')
    ET.SubElement(feed, 'title', {'type': 'text'}).text = 'Rivista'
    ET.SubElement(feed, 'subtitle', {'type': 'text'}).text = 'XMPP Journal Publisher'
    # RFC 4287 describes a person with child elements, not with attributes.
    author = ET.SubElement(feed, 'author')
    ET.SubElement(author, 'name').text = 'Rivista'
    ET.SubElement(author, 'email').text = 'rivista@schimon.i2p'
    ET.SubElement(feed, 'generator', {
        'uri': 'https://git.xmpp-it.net/sch/Rivista',
        'version': '0.1'}).text = 'Rivista'
    ET.SubElement(feed, 'updated').text = time_stamp
    entry = ET.SubElement(feed, 'entry')
    ET.SubElement(entry, 'title').text = 'Error'
    ET.SubElement(entry, 'id').text = 'rivista-error'
    ET.SubElement(entry, 'updated').text = time_stamp
    ET.SubElement(entry, 'published').text = time_stamp
    ET.SubElement(entry, 'content', {'type': 'text'}).text = text
    return ET.tostring(feed, encoding='unicode')
# generate_rfc_4287
def generate_atom(iq, link):
    """Generate an Atom Syndication Format (RFC 4287) from a Publish-Subscribe (XEP-0060) node items.

    Every item whose payload holds an Atom ``title`` or at least one Atom
    ``link`` becomes an ``entry``; other items are skipped.  Each entry also
    receives an ``xmpp:`` link that points back at the item.

    :param iq: Reply to an items request.  It is read through
        ``iq['from'].bare`` and ``iq['pubsub']['items']``.
    :param link: URI written as the ``self`` link of the feed.
    :return: The serialized feed as ``str``, without an XML declaration.
    """
    namespace = '{http://www.w3.org/2005/Atom}'
    pubsub = iq['from'].bare
    items = iq['pubsub']['items']
    node = items['node']
    e_feed = ET.Element("feed")
    e_feed.set('xmlns', 'http://www.w3.org/2005/Atom')
    ET.SubElement(e_feed, 'title', {'type': 'text'}).text = node
    ET.SubElement(e_feed, 'subtitle', {'type': 'text'}).text = pubsub
    ET.SubElement(e_feed, 'link', {'rel': 'self', 'href': link})
    ET.SubElement(e_feed, 'generator', {
        'uri': 'https://git.xmpp-it.net/sch/Rivista',
        'version': '0.1'}).text = 'Rivista'
    # ``timezone.utc`` rather than ``datetime.UTC``: the latter exists only
    # since Python 3.11, while this file still supports older versions.
    ET.SubElement(e_feed, 'updated').text = datetime.datetime.now(
        datetime.timezone.utc).isoformat()
    for item in items:
        item_id = item['id']
        item_payload = item['payload']
        # Presumably an item can arrive without a payload; skip it then
        # instead of failing on ``None.find``.
        if item_payload is None: continue
        entry_title = item_payload.find(namespace + 'title')
        entry_links = item_payload.findall(namespace + 'link')
        if entry_title is None and not entry_links: continue
        e_entry = ET.SubElement(e_feed, 'entry')
        ET.SubElement(e_entry, 'title').text = (
            None if entry_title is None else entry_title.text)
        if entry_links:
            for entry_link in entry_links:
                ET.SubElement(e_entry, 'link', {
                    'href': entry_link.attrib.get('href', ''),
                    'rel': entry_link.attrib.get('rel', ''),
                    'type': entry_link.attrib.get('type', '')})
        else:
            # NOTE(review): kept from the original; an empty ``content``
            # with an ``href`` attribute looks unintended -- confirm.
            ET.SubElement(e_entry, 'content', {'href': ''})
        link_xmpp = form_an_item_link(pubsub, node, item_id)
        ET.SubElement(e_entry, 'link', {'href': link_xmpp, 'rel': 'alternate', 'type': 'x-scheme-handler/xmpp'})
        contents = item_payload.findall(namespace + 'content')
        if contents:
            for content in contents:
                if not content.text: continue
                content_type = content.attrib.get('type', 'html')
                content_type_text = 'html' if 'html' in content_type else 'text'
                ET.SubElement(e_entry, 'content', {'type': content_type_text}).text = content.text
        else:
            # Fall back to the summary.  The summary may be absent as well,
            # which used to raise AttributeError on ``None.text``.
            summary = item_payload.find(namespace + 'summary')
            summary_text = None if summary is None else summary.text
            if summary_text:
                summary_type = summary.attrib.get('type', 'html')
                summary_type_text = 'html' if 'html' in summary_type else 'text'
                ET.SubElement(e_entry, 'content', {'type': summary_type_text}).text = summary_text
            else:
                ET.SubElement(e_entry, 'content').text = 'No content.'
        published = item_payload.find(namespace + 'published')
        ET.SubElement(e_entry, 'published').text = (
            None if published is None else published.text)
        updated = item_payload.find(namespace + 'updated')
        ET.SubElement(e_entry, 'updated').text = (
            None if updated is None else updated.text)
        for author in item_payload.findall(namespace + 'author'):
            # RFC 4287 keeps the name in a ``name`` child; text placed
            # directly inside ``author`` is accepted as a fallback.  The
            # original read unused values from the last ``link`` here, which
            # failed whenever the item had no link at all.
            author_name = author.find(namespace + 'name')
            if author_name is not None and author_name.text:
                author_text = author_name.text
            else:
                author_text = author.text
            if not author_text or not author_text.strip(): continue
            e_author = ET.SubElement(e_entry, 'author')
            ET.SubElement(e_author, 'name').text = author_text.strip()
        for category in item_payload.findall(namespace + 'category'):
            category_term = category.attrib.get('term')
            if category_term:
                ET.SubElement(e_entry, 'category', {'term': category_term})
        identifier = item_payload.find(namespace + 'id')
        ET.SubElement(e_entry, 'id').text = (
            None if identifier is None else identifier.text)
    return ET.tostring(e_feed, encoding='unicode')
def generate_json(iq):
    """Create a JSON file from node items.

    Writes ``data/<node>.json`` holding the title and the ``xmpp:`` link of
    at most the first seven items of the reply.
    """
    title_tag = '{http://www.w3.org/2005/Atom}' + 'title'
    pubsub = iq['from'].bare
    entries = iq['pubsub']['items']
    node = entries['node']
    json_data = []
    for entry in entries:
        title = entry['payload'].find(title_tag)
        if title is None:
            title_text = '*** No Title ***'
        else:
            title_text = title.text
        json_data.append({
            'title' : title_text,
            'link' : form_an_item_link(pubsub, node, entry['id'])})
        # Stop as soon as seven entries have been collected.
        if len(json_data) >= 7: break
    with open('data/{}.json'.format(node), 'w', encoding='utf-8') as json_file:
        json.dump(json_data, json_file, ensure_ascii=False, indent=4)
"""Patch function to append XSLT reference to XML"""
"""Why is not this a built-in function of ElementTree or LXML"""
def append_stylesheet(xml_data, type):
    """Prefix an XML document with a declaration and an XSLT reference.

    ``xml_data`` is parsed and serialized again, then returned behind an XML
    declaration and an ``xml-stylesheet`` instruction that points at
    ``xsl/<type>.xsl``.
    """
    if type == 'atom':
        # Register the namespace in order to avoid the ns0: prefix
        ET.register_namespace('', 'http://www.w3.org/2005/Atom')
    # Serializing the parsed tree yields the document without a declaration
    root = ET.fromstring(xml_data)
    document = ET.tostring(root, encoding='unicode')
    declaration = '<?xml version="1.0" encoding="utf-8"?>'
    stylesheet = '<?xml-stylesheet type="text/xsl" href="xsl/{}.xsl"?>'.format(type)
    return declaration + stylesheet + document
def generate_opml(iq):
    """Generate an OPML document that lists the nodes of a PubSub service.

    :param iq: Reply to a nodes request.  It is read through
        ``iq['from'].bare`` and ``iq['disco_items']['items']``; every item is
        unpacked as an (address, node, title) triple.
    :return: The serialized OPML as ``str``, without an XML declaration.
    """
    pubsub = iq['from'].bare
    items = iq['disco_items']['items']
    opml = ET.Element("opml")
    opml.set("version", "1.0")
    head = ET.SubElement(opml, "head")
    ET.SubElement(head, "title").text = 'An OPML of ' + pubsub
    ET.SubElement(head, "description").text = (
        "PubSub Nodes of {}").format(pubsub)
    ET.SubElement(head, "generator").text = 'Rivista'
    ET.SubElement(head, "urlPublic").text = 'https://git.xmpp-it.net/sch/Rivista'
    # ``timezone.utc`` rather than ``datetime.UTC``: the latter exists only
    # since Python 3.11, while this file still supports older versions.
    time_stamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    ET.SubElement(head, "dateCreated").text = time_stamp
    ET.SubElement(head, "dateModified").text = time_stamp
    body = ET.SubElement(opml, "body")
    # A separate name for the address of each item keeps ``pubsub`` (the
    # queried service) from being rebound inside the loop.
    for item_jid, node, title in items:
        uri = form_a_node_link(item_jid, node)
        outline = ET.SubElement(body, "outline")
        # Nodes without a title are listed by their identifier.
        outline.set("text", title or node)
        outline.set("xmlUrl", uri)
    return ET.tostring(opml, encoding='unicode')