Rivista/pubsub_to_atom.py

301 lines
13 KiB
Python
Raw Normal View History

2024-07-08 23:26:18 +02:00
#!/usr/bin/python
# -*- coding: utf-8 -*-
import datetime
2024-07-08 23:26:18 +02:00
from fastapi import FastAPI, Request, Response
2024-07-09 17:17:18 +02:00
from fastapi.responses import FileResponse
2024-07-08 23:26:18 +02:00
from fastapi.staticfiles import StaticFiles
import feedgenerator
import json
2024-07-08 23:26:18 +02:00
from slixmpp import ClientXMPP
from slixmpp.exceptions import IqError, IqTimeout
2024-07-08 23:26:18 +02:00
import xml.etree.ElementTree as ET
#import importlib.resources
try:
import tomllib
except:
import tomli as tomllib
app = FastAPI()
class XmppInstance(ClientXMPP):
def __init__(self, jid, password):
super().__init__(jid, password)
self.register_plugin('xep_0060')
self.connect()
# self.process(forever=False)
xmpp = None
2024-07-09 17:17:18 +02:00
# Mount static graphic, script and stylesheet directories
2024-07-08 23:26:18 +02:00
app.mount("/css", StaticFiles(directory="css"), name="css")
app.mount("/data", StaticFiles(directory="data"), name="data")
2024-07-09 17:17:18 +02:00
app.mount("/graphic", StaticFiles(directory="graphic"), name="graphic")
2024-07-08 23:26:18 +02:00
app.mount("/script", StaticFiles(directory="script"), name="script")
app.mount("/xsl", StaticFiles(directory="xsl"), name="xsl")
2024-07-09 17:17:18 +02:00
@app.get('/favicon.ico', include_in_schema=False)
async def favicon():
return FileResponse('favicon.ico')
2024-07-08 23:26:18 +02:00
@app.get('/atom')
async def view_pubsub(request: Request):
global xmpp
if not xmpp:
credentials = get_configuration('account')
2024-07-08 23:26:18 +02:00
xmpp = XmppInstance(credentials['xmpp'], credentials['pass'])
# xmpp.connect()
pubsub = request.query_params.get('pubsub', '')
node = request.query_params.get('node', '')
item_id = request.query_params.get('item', '')
settings = get_configuration('settings')
result = None
if settings['service']:
if settings['include'] in pubsub or not settings['include']:
if pubsub and node and item_id:
2024-07-11 18:01:45 +02:00
iq = await get_node_item(pubsub, node, item_id)
if iq:
2024-07-11 18:01:45 +02:00
link = 'xmpp:{pubsub}?;node={node};item={item}'.format(
pubsub=pubsub, node=node, item=item_id)
xml_atom = generate_rfc_4287(iq, link)
iq = await get_node_items(pubsub, node)
if iq:
generate_json(iq, node)
else:
operator = get_configuration('settings')['operator']
json_data = [{'title' : 'Error retrieving items list.',
'link' : ('javascript:alert("XJP: XMPP Journal Publisher has experienced an error '
'while attempting to retrieve the list of items for Node {} of PubSub {}.")')
.format(node, pubsub)},
{'title' : 'Contact the operator.',
'link' : ('xmpp:{}?message;subject=XJP: XMPP Journal Publisher;body=Greetings! '
'I am contacting you to inform you that there is an error listing '
'node items for Node {} on PubSub {}.').format(operator, node, pubsub)}]
2024-07-11 18:01:45 +02:00
filename = 'data/{}.json'.format(node)
with open(filename, 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=4)
else:
2024-07-11 18:01:45 +02:00
text = 'Please check that PubSub node and item are valid and accessible.'
xml_atom = error_message(text)
result = append_stylesheet(xml_atom)
# try:
# iq = await get_node_items(pubsub, node)
# generate_json(iq, node)
# except:
# operator = get_configuration('settings')['operator']
# json_data = [{'title' : 'Timeout retrieving node items from {}'.format(node),
# 'link' : 'xmpp:{}?message'.format(operator)}]
# filename = 'data/{}.json'.format(node)
# with open(filename, 'w', encoding='utf-8') as f:
# json.dump(json_data, f, ensure_ascii=False, indent=4)
elif pubsub and node:
iq = await get_node_items(pubsub, node)
2024-07-11 18:01:45 +02:00
if iq:
link = form_a_link(pubsub, node)
xml_atom = generate_rfc_4287(iq, link)
else:
text = 'Please check that PubSub node is valid and accessible.'
xml_atom = error_message(text)
result = append_stylesheet(xml_atom)
elif pubsub:
2024-07-11 18:01:45 +02:00
iq = await get_nodes(pubsub)
if iq:
link = 'xmpp:{pubsub}'.format(pubsub=pubsub)
result = pubsub_to_opml(iq)
else:
text = 'Please check that PubSub Jabber ID is valid and accessible.'
xml_atom = error_message(text)
result = append_stylesheet(xml_atom)
elif node:
text = 'PubSub parameter is missing.'
xml_atom = error_message(text)
result = append_stylesheet(xml_atom)
# else:
# result = ('Mandatory parameter PubSub and '
# 'optional parameter Node are missing.')
else:
text = 'The given domain {} is not allowed.'.format(pubsub)
xml_atom = error_message(text)
result = append_stylesheet(xml_atom)
default = get_configuration('default')
if not result:
if default['pubsub'] and default['nodeid']:
if not pubsub and not node:
pubsub = default['pubsub']
node = default['nodeid']
iq = await get_node_items(pubsub, node)
link = form_a_link(pubsub, node)
xml_atom = generate_rfc_4287(iq, link)
result = append_stylesheet(xml_atom)
elif not settings['service']:
pubsub = default['pubsub']
node = default['nodeid']
iq = await get_node_items(pubsub, node)
link = form_a_link(pubsub, node)
xml_atom = generate_rfc_4287(iq, link)
result = append_stylesheet(xml_atom)
else:
text = 'Please contact the administrator and ask him to set default PubSub and Node ID.'
xml_atom = error_message(text)
result = append_stylesheet(xml_atom)
response = Response(content=result, media_type="application/xml")
return response
def get_configuration(section):
with open('configuration.toml', mode="rb") as configuration:
result = tomllib.load(configuration)[section]
return result
#@timeout(5)
2024-07-11 18:01:45 +02:00
async def get_node_item(pubsub, node, item_id):
try:
iq = await xmpp.plugin['xep_0060'].get_item(pubsub, node, item_id, timeout=5)
return iq
except (IqError, IqTimeout) as e:
print(e)
async def get_node_items(pubsub, node):
try:
iq = await xmpp.plugin['xep_0060'].get_items(pubsub, node, timeout=5)
return iq
2024-07-11 18:01:45 +02:00
except (IqError, IqTimeout) as e:
print(e)
async def get_nodes(pubsub):
try:
await xmpp.plugin['xep_0060'].get_nodes(pubsub, timeout=5)
return iq
except (IqError, IqTimeout) as e:
print(e)
def form_a_link(pubsub, node):
link = 'xmpp:{pubsub}?;node={node}'.format(pubsub=pubsub, node=node)
return link
def error_message(text):
"""Error message in RFC 4287: The Atom Syndication Format."""
feed = feedgenerator.Atom1Feed(
description = ('This is a syndication feed generated with XMPP Journal '
'Publisher (XJP), which conveys XEP-0060: Publish-'
'Subscribe nodes to standard RFC 4287: The Atom '
'Syndication Format.'),
language = 'en',
link = '',
subtitle = 'XMPP Journal Publisher',
title = 'StreamBurner')
namespace = '{http://www.w3.org/2005/Atom}'
feed_url = 'gemini://schimon.i2p/'
# create entry
feed.add_item(
description = text,
# enclosure = feedgenerator.Enclosure(enclosure, enclosure_size, enclosure_type) if args.entry_enclosure else None,
link = '',
# pubdate = updated,
title = 'Error',
# unique_id = ''
)
xml_atom = feed.writeString('utf-8')
xml_atom_extended = append_element(
xml_atom,
'generator',
'XMPP Journal Publisher (XJP)')
return xml_atom_extended
def generate_rfc_4287(iq, link):
2024-07-08 23:26:18 +02:00
"""Convert XEP-0060: Publish-Subscribe to RFC 4287: The Atom Syndication Format."""
feed = feedgenerator.Atom1Feed(
description = ('This is a syndication feed generated with PubSub To '
2024-07-08 23:26:18 +02:00
'Atom, which conveys XEP-0060: Publish-Subscribe nodes '
'to standard RFC 4287: The Atom Syndication Format.'),
language = iq['pubsub']['items']['lang'],
link = link,
subtitle = 'XMPP PubSub Syndication Feed',
title = iq['pubsub']['items']['node'])
# See also iq['pubsub']['items']['substanzas']
entries = iq['pubsub']['items']
for entry in entries:
item = entry['payload']
namespace = '{http://www.w3.org/2005/Atom}'
title = item.find(namespace + 'title')
title = None if title == None else title.text
updated = item.find(namespace + 'updated')
updated = None if updated == None else updated.text
published = item.find(namespace + 'published')
published = None if published == None else published.text
if not updated and not published: updated = datetime.datetime.utcnow().isoformat()
2024-07-08 23:26:18 +02:00
content = item.find(namespace + 'content')
content = 'No content' if content == None else content.text
link = item.find(namespace + 'link')
link = '' if link == None else link.attrib['href']
author = item.find(namespace + 'author')
if author and author.attrib: print(author.attrib)
author = 'None' if author == None else author.text
# create entry
feed.add_item(
description = content,
# enclosure = feedgenerator.Enclosure(enclosure, enclosure_size, enclosure_type) if args.entry_enclosure else None,
link = link,
pubdate = published or updated,
2024-07-08 23:26:18 +02:00
title = title,
unique_id = link)
xml_atom = feed.writeString('utf-8')
xml_atom_extended = append_element(
xml_atom,
'generator',
'XMPP Journal Publisher (XJP)')
2024-07-08 23:26:18 +02:00
return xml_atom_extended
def generate_json(iq, node):
"""Create a JSON file from node items."""
json_data = []
entries = iq['pubsub']['items']
for entry in entries:
item = entry['payload']
namespace = '{http://www.w3.org/2005/Atom}'
title = item.find(namespace + 'title')
title = None if title == None else title.text
# updated = item.find(namespace + 'updated')
# updated = None if updated == None else updated.text
# if updated: updated = datetime.datetime.fromisoformat(updated)
link = item.find(namespace + 'link')
link = '' if link == None else link.attrib['href']
json_data_entry = {'title' : title,
'link' : link}
json_data.append(json_data_entry)
if len(json_data) > 6: break
filename = 'data/{}.json'.format(node)
with open(filename, 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=4)
2024-07-08 23:26:18 +02:00
"""Patch function to append elements which are not provided by feedgenerator"""
def append_element(xml_data, element, text):
root = ET.fromstring(xml_data)
# Create the generator element
generator_element = ET.Element(element)
generator_element.text = text
# Append the generator element to the root
root.append(generator_element)
# Return the modified XML as a string
return ET.tostring(root, encoding='unicode')
"""Patch function to append XSLT reference to XML"""
"""Why is not this a built-in function of ElementTree or LXML"""
def append_stylesheet(xml_data):
# Register namespace in order to avoide ns0:
ET.register_namespace("", "http://www.w3.org/2005/Atom")
# Load XML from string
tree = ET.fromstring(xml_data)
# The following direction removes the XML declaration
xml_data_no_declaration = ET.tostring(tree, encoding='unicode')
# Add XML declaration and stylesheet
xml_data_declaration = ('<?xml version="1.0" encoding="utf-8"?>'
'<?xml-stylesheet type="text/xsl" href="xsl/stylesheet.xsl"?>' +
xml_data_no_declaration)
return xml_data_declaration