Rivista/pubsub_to_atom.py
2024-07-11 19:01:45 +03:00

291 lines
12 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
#import datetime
from fastapi import FastAPI, Request, Response
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
import feedgenerator
import json
from slixmpp import ClientXMPP
from slixmpp.exceptions import IqError, IqTimeout
import xml.etree.ElementTree as ET
#import importlib.resources
# tomllib is the preferred TOML reader; fall back to the tomli package under
# the same name when it cannot be imported.
try:
    import tomllib
except ImportError:
    # Only a failed import should trigger the fallback; a bare except would
    # also swallow KeyboardInterrupt and unrelated errors.
    import tomli as tomllib
# Application object; the static mounts and route decorators in this module
# are registered on it.
app = FastAPI()
class XmppInstance(ClientXMPP):
    """XMPP client that registers the PubSub plugin and connects on creation."""

    def __init__(self, jid, password):
        """Set up the client for jid/password, register XEP-0060 and connect."""
        super().__init__(jid, password)
        for plugin_name in ('xep_0060',):
            self.register_plugin(plugin_name)
        self.connect()
        # self.process(forever=False)
# Shared XMPP client. It stays None until the /atom handler creates an
# XmppInstance on the first request.
xmpp = None
# Mount static graphic, script and stylesheet directories
app.mount("/css", StaticFiles(directory="css"), name="css")
# The data directory is also where this module writes its data/<node>.json files.
app.mount("/data", StaticFiles(directory="data"), name="data")
app.mount("/graphic", StaticFiles(directory="graphic"), name="graphic")
app.mount("/script", StaticFiles(directory="script"), name="script")
app.mount("/xsl", StaticFiles(directory="xsl"), name="xsl")
@app.get('/favicon.ico', include_in_schema=False)
async def favicon():
    """Answer /favicon.ico with the file at the relative path 'favicon.ico'."""
    icon = FileResponse('favicon.ico')
    return icon
@app.get('/atom')
async def view_pubsub(request: Request):
    """Render a PubSub service, node or item as an XML document.

    Query parameters (all optional, default to an empty string):
        pubsub: Jabber ID of the PubSub service.
        node:   node of that service.
        item:   ID of a single item of that node.

    When settings['service'] is enabled and the service passes the
    settings['include'] filter:
      * pubsub + node + item: the item is rendered as an Atom feed and the
        node's data/<node>.json listing is refreshed;
      * pubsub + node: the node's items are rendered as an Atom feed;
      * pubsub only: the result of pubsub_to_opml() for the service's nodes;
      * node only: an error feed.
    A service rejected by the filter yields an error feed. When nothing was
    produced, the default node from the 'default' configuration section is
    rendered if no pubsub/node was requested or the service is disabled.

    Returns a Response with media type application/xml.
    """
    global xmpp
    if not xmpp:
        # The XMPP client is created lazily, on the first request.
        credentials = get_configuration('account')
        xmpp = XmppInstance(credentials['xmpp'], credentials['pass'])

    pubsub = request.query_params.get('pubsub', '')
    node = request.query_params.get('node', '')
    item_id = request.query_params.get('item', '')
    settings = get_configuration('settings')
    result = None
    if settings['service']:
        # NOTE(review): this is a substring test, so an 'include' value also
        # matches any longer Jabber ID that merely contains it.
        if settings['include'] in pubsub or not settings['include']:
            if pubsub and node and item_id:
                result = await _item_feed(pubsub, node, item_id, settings)
            elif pubsub and node:
                result = await _node_feed(
                    pubsub, node,
                    'Please check that PubSub node is valid and accessible.')
            elif pubsub:
                iq = await get_nodes(pubsub)
                if iq:
                    # NOTE(review): pubsub_to_opml is not defined in the part
                    # of the file reviewed here; confirm it exists.
                    result = pubsub_to_opml(iq)
                else:
                    result = _error_feed(
                        'Please check that PubSub Jabber ID is valid and accessible.')
            elif node:
                result = _error_feed('PubSub parameter is missing.')
        else:
            result = _error_feed(
                'The given domain {} is not allowed.'.format(pubsub))
    if not result:
        default = get_configuration('default')
        if default['pubsub'] and default['nodeid']:
            if (not pubsub and not node) or not settings['service']:
                # _node_feed() turns a failed fetch into an error feed rather
                # than handing None to generate_rfc_4287().
                result = await _node_feed(
                    default['pubsub'], default['nodeid'],
                    'Please check that PubSub node is valid and accessible.')
        else:
            result = _error_feed(
                'Please contact the administrator and ask him to set default PubSub and Node ID.')
    response = Response(content=result, media_type="application/xml")
    return response


def _error_feed(text):
    """Return a styled Atom error feed whose single entry carries text."""
    return append_stylesheet(error_message(text))


async def _node_feed(pubsub, node, error_text):
    """Return the styled Atom feed of a node, or an error feed if the fetch failed."""
    iq = await get_node_items(pubsub, node)
    if iq:
        xml_atom = generate_rfc_4287(iq, form_a_link(pubsub, node))
    else:
        xml_atom = error_message(error_text)
    return append_stylesheet(xml_atom)


async def _item_feed(pubsub, node, item_id, settings):
    """Return the styled Atom feed of one item and refresh the node's JSON listing."""
    iq = await get_node_item(pubsub, node, item_id)
    if not iq:
        return _error_feed(
            'Please check that PubSub node and item are valid and accessible.')
    link = 'xmpp:{pubsub}?;node={node};item={item}'.format(
        pubsub=pubsub, node=node, item=item_id)
    xml_atom = generate_rfc_4287(iq, link)
    await _refresh_node_json(pubsub, node, settings)
    return append_stylesheet(xml_atom)


async def _refresh_node_json(pubsub, node, settings):
    """Write data/<node>.json from the node's items, or a contact entry on failure."""
    import os
    # node comes straight from the query string and becomes part of a file
    # path: skip names that would leave the data directory.
    if '\0' in node or os.path.basename(node) != node:
        return
    iq = await get_node_items(pubsub, node)
    if iq:
        generate_json(iq, node)
        return
    # The node listing could not be fetched; leave a link to the operator.
    json_data = [{'title' : 'Timeout Error: Press here to contact the operator.',
                  'link' : 'xmpp:{}?message'.format(settings['operator'])}]
    filename = 'data/{}.json'.format(node)
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=4)
def get_configuration(section):
with open('configuration.toml', mode="rb") as configuration:
result = tomllib.load(configuration)[section]
return result
#@timeout(5)
async def get_node_item(pubsub, node, item_id):
    """Request one item of a node through the XEP-0060 plugin.

    Returns the plugin's reply. On IqError or IqTimeout the exception is
    printed and None is returned. The request uses a timeout of 5.
    """
    pubsub_plugin = xmpp.plugin['xep_0060']
    try:
        return await pubsub_plugin.get_item(pubsub, node, item_id, timeout=5)
    except (IqError, IqTimeout) as error:
        print(error)
    return None
async def get_node_items(pubsub, node):
    """Request the items of a node through the XEP-0060 plugin.

    Returns the plugin's reply. On IqError or IqTimeout the exception is
    printed and None is returned. The request uses a timeout of 5.
    """
    pubsub_plugin = xmpp.plugin['xep_0060']
    try:
        return await pubsub_plugin.get_items(pubsub, node, timeout=5)
    except (IqError, IqTimeout) as error:
        print(error)
    return None
async def get_nodes(pubsub):
    """Request the list of nodes of a PubSub service through the XEP-0060 plugin.

    Returns the plugin's reply. On IqError or IqTimeout the exception is
    printed and None is returned. The request uses a timeout of 5.
    """
    try:
        # Bind the reply: it was previously awaited but never assigned, so
        # the return statement raised NameError.
        iq = await xmpp.plugin['xep_0060'].get_nodes(pubsub, timeout=5)
        return iq
    except (IqError, IqTimeout) as e:
        print(e)
    return None
def form_a_link(pubsub, node):
    """Return the xmpp: URI that points at node on the PubSub service pubsub."""
    return f'xmpp:{pubsub}?;node={node}'
def error_message(text):
    """Error message in RFC 4287: The Atom Syndication Format.

    Builds a feed titled 'StreamBurner' with a single entry titled 'Error'
    whose description is text, then adds a generator element through
    append_element().

    :param text: message used as the description of the single entry.
    :return: the feed XML as returned by append_element().
    """
    feed = feedgenerator.Atom1Feed(
        description = ('This is a syndication feed generated with PubSub To '
                       'Atom, which conveys XEP-0060: Publish-Subscribe nodes '
                       'to standard RFC 4287: The Atom Syndication Format.'),
        language = 'en',
        link = '',
        subtitle = 'XMPP PubSub To Atom',
        title = 'StreamBurner')
    # The error is conveyed as the only entry of the feed.
    feed.add_item(
        description = text,
        link = '',
        title = 'Error')
    xml_atom = feed.writeString('utf-8')
    return append_element(
        xml_atom,
        'generator',
        'XPTA: XMPP PubSub To Atom')
def generate_rfc_4287(iq, link):
    """Convert XEP-0060: Publish-Subscribe to RFC 4287: The Atom Syndication Format.

    The feed takes its language and title from the 'lang' and 'node' values
    of iq['pubsub']['items']. Each item payload contributes one entry built
    from its Atom title, content and link elements.

    :param iq: reply whose iq['pubsub']['items'] holds the items to convert.
    :param link: link of the feed itself.
    :return: the feed XML as returned by append_element().
    """
    namespace = '{http://www.w3.org/2005/Atom}'
    items = iq['pubsub']['items']
    feed = feedgenerator.Atom1Feed(
        description = ('This is a syndication feed generated with PubSub To '
                       'Atom, which conveys XEP-0060: Publish-Subscribe nodes '
                       'to standard RFC 4287: The Atom Syndication Format.'),
        language = items['lang'],
        link = link,
        subtitle = 'XMPP PubSub Syndication Feed',
        title = items['node'])
    # See also iq['pubsub']['items']['substanzas']
    for entry in items:
        item = entry['payload']
        if item is None:
            # An item without payload has nothing to convert.
            continue
        # Compare with "is None": the truth value of an Element depends on
        # whether it has children, not on whether it was found.
        title = item.find(namespace + 'title')
        title = None if title is None else title.text
        content = item.find(namespace + 'content')
        content = 'No content' if content is None else content.text
        # A separate name keeps the feed-level link parameter intact.
        entry_link = item.find(namespace + 'link')
        entry_link = '' if entry_link is None else entry_link.attrib.get('href', '')
        feed.add_item(
            description = content,
            link = entry_link,
            title = title,
            unique_id = entry_link)
    xml_atom = feed.writeString('utf-8')
    return append_element(
        xml_atom,
        'generator',
        'XPTA: XMPP PubSub To Atom')
def generate_json(iq, node):
    """Create a JSON file from node items.

    Writes a list of {'title', 'link'} objects to data/<node>.json, taken
    from the Atom title and link elements of each item payload. Collection
    stops once seven entries have been gathered.

    :param iq: reply whose iq['pubsub']['items'] holds the items.
    :param node: node name, used for the file name.
    """
    namespace = '{http://www.w3.org/2005/Atom}'
    json_data = []
    for entry in iq['pubsub']['items']:
        item = entry['payload']
        if item is None:
            # An item without payload has no title or link to list.
            continue
        title = item.find(namespace + 'title')
        link = item.find(namespace + 'link')
        json_data.append({
            'title' : None if title is None else title.text,
            # A link element without href yields an empty link, not a KeyError.
            'link' : '' if link is None else link.attrib.get('href', ''),
        })
        if len(json_data) > 6: break
    # NOTE(review): node is placed in the path unchecked; callers passing
    # user input should make sure it contains no path separators.
    filename = 'data/{}.json'.format(node)
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=4)
"""Patch function to append elements which are not provided by feedgenerator"""
def append_element(xml_data, element, text):
    """Patch function to append elements which are not provided by feedgenerator.

    The new element is appended as the last child of the document root and
    is placed in the root's namespace, so that it is a proper member of
    the document's vocabulary.

    :param xml_data: XML document as a string.
    :param element: tag of the element to add; an already qualified
        '{namespace}tag' is used as given.
    :param text: text content of the new element.
    :return: the modified document as a string, without XML declaration.
    """
    root = ET.fromstring(xml_data)
    tag = element
    # ElementTree spells namespaced tags as '{uri}local'; reuse the root's
    # '{uri}' part unless the caller already qualified the tag.
    if root.tag.startswith('{') and not element.startswith('{'):
        tag = root.tag[:root.tag.index('}') + 1] + element
    new_element = ET.SubElement(root, tag)
    new_element.text = text
    # Return the modified XML as a string
    return ET.tostring(root, encoding='unicode')
"""Patch function to append XSLT reference to XML"""
"""Why is not this a built-in function of ElementTree or LXML"""
def append_stylesheet(xml_data, stylesheet='xsl/stylesheet.xsl'):
    """Patch function to append XSLT reference to XML.

    The document is parsed and serialised again, which drops any XML
    declaration it had; a fresh declaration and an xml-stylesheet processing
    instruction are then placed in front of it.

    :param xml_data: XML document as a string.
    :param stylesheet: href of the XSLT stylesheet, inserted verbatim;
        defaults to 'xsl/stylesheet.xsl'.
    :return: the document as a string with declaration and stylesheet
        reference.
    """
    # Register the Atom namespace as the default one in order to avoid an
    # "ns0:" prefix on every element. This registration is global.
    ET.register_namespace("", "http://www.w3.org/2005/Atom")
    tree = ET.fromstring(xml_data)
    # Serialising to a unicode string omits the XML declaration.
    xml_data_no_declaration = ET.tostring(tree, encoding='unicode')
    return ('<?xml version="1.0" encoding="utf-8"?>'
            '<?xml-stylesheet type="text/xsl" href="{}"?>'.format(stylesheet) +
            xml_data_no_declaration)