forked from sch/Rivista
143 lines
5.5 KiB
Python
143 lines
5.5 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
#import datetime
|
|
from fastapi import FastAPI, Request, Response
|
|
from fastapi.responses import FileResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
import feedgenerator
|
|
from slixmpp import ClientXMPP
|
|
import xml.etree.ElementTree as ET
|
|
#import importlib.resources
|
|
|
|
try:
|
|
import tomllib
|
|
except:
|
|
import tomli as tomllib
|
|
|
|
app = FastAPI()
|
|
|
|
class XmppInstance(ClientXMPP):
|
|
def __init__(self, jid, password):
|
|
super().__init__(jid, password)
|
|
self.register_plugin('xep_0060')
|
|
self.connect()
|
|
# self.process(forever=False)
|
|
|
|
xmpp = None
|
|
|
|
# Mount static graphic, script and stylesheet directories
|
|
app.mount("/css", StaticFiles(directory="css"), name="css")
|
|
app.mount("/graphic", StaticFiles(directory="graphic"), name="graphic")
|
|
app.mount("/script", StaticFiles(directory="script"), name="script")
|
|
app.mount("/xsl", StaticFiles(directory="xsl"), name="xsl")
|
|
|
|
@app.get('/favicon.ico', include_in_schema=False)
|
|
async def favicon():
|
|
return FileResponse('favicon.ico')
|
|
|
|
@app.get('/atom')
|
|
async def view_pubsub(request: Request):
|
|
global xmpp
|
|
if not xmpp:
|
|
with open('configuration.toml', mode="rb") as configuration:
|
|
credentials = tomllib.load(configuration)['account']
|
|
xmpp = XmppInstance(credentials['xmpp'], credentials['pass'])
|
|
# xmpp.connect()
|
|
|
|
pubsub = request.query_params.get('pubsub', '')
|
|
node = request.query_params.get('node', '')
|
|
item_id = request.query_params.get('item', '')
|
|
if pubsub and node and item_id:
|
|
iq = await xmpp.plugin['xep_0060'].get_item(pubsub, node, item_id)
|
|
link = 'xmpp:{pubsub}?;node={node};item={item}'.format(
|
|
pubsub=pubsub, node=node, item=item_id)
|
|
xml_atom = pubsub_to_atom(iq, link)
|
|
result = append_stylesheet(xml_atom)
|
|
elif pubsub and node:
|
|
iq = await xmpp.plugin['xep_0060'].get_items(pubsub, node)
|
|
link = 'xmpp:{pubsub}?;node={node}'.format(pubsub=pubsub, node=node)
|
|
xml_atom = pubsub_to_atom(iq, link)
|
|
result = append_stylesheet(xml_atom)
|
|
elif pubsub:
|
|
iq = await xmpp.plugin['xep_0060'].get_nodes(pubsub)
|
|
link = 'xmpp:{pubsub}'.format(pubsub=pubsub)
|
|
result = pubsub_to_opml(iq)
|
|
elif node:
|
|
result = 'PubSub parameter is missing.'
|
|
else:
|
|
result = ('Mandatory parameter PubSub and '
|
|
'optional parameter Node are missing.')
|
|
return Response(content=result, media_type="application/xml")
|
|
|
|
def pubsub_to_atom(iq, link):
|
|
"""Convert XEP-0060: Publish-Subscribe to RFC 4287: The Atom Syndication Format."""
|
|
feed = feedgenerator.Atom1Feed(
|
|
description = ('This is a syndication feed generated with PubSub to '
|
|
'Atom, which conveys XEP-0060: Publish-Subscribe nodes '
|
|
'to standard RFC 4287: The Atom Syndication Format.'),
|
|
language = iq['pubsub']['items']['lang'],
|
|
link = link,
|
|
subtitle = 'XMPP PubSub Syndication Feed',
|
|
title = iq['pubsub']['items']['node'])
|
|
# See also iq['pubsub']['items']['substanzas']
|
|
entries = iq['pubsub']['items']
|
|
for entry in entries:
|
|
item = entry['payload']
|
|
namespace = '{http://www.w3.org/2005/Atom}'
|
|
title = item.find(namespace + 'title')
|
|
title = None if title == None else title.text
|
|
feed_url = 'gemini://schimon.i2p/'
|
|
updated = item.find(namespace + 'updated')
|
|
updated = None if updated == None else updated.text
|
|
# if updated: updated = datetime.datetime.fromisoformat(updated)
|
|
content = item.find(namespace + 'content')
|
|
content = 'No content' if content == None else content.text
|
|
link = item.find(namespace + 'link')
|
|
link = '' if link == None else link.attrib['href']
|
|
author = item.find(namespace + 'author')
|
|
if author and author.attrib: print(author.attrib)
|
|
author = 'None' if author == None else author.text
|
|
# create entry
|
|
feed.add_item(
|
|
description = content,
|
|
# enclosure = feedgenerator.Enclosure(enclosure, enclosure_size, enclosure_type) if args.entry_enclosure else None,
|
|
link = link,
|
|
# pubdate = updated,
|
|
title = title,
|
|
unique_id = link)
|
|
xml_atom = feed.writeString('utf-8')
|
|
xml_atom_extended = append_element(
|
|
xml_atom,
|
|
'generator',
|
|
'XPTA: XMPP PubSub To Atom')
|
|
return xml_atom_extended
|
|
|
|
"""Patch function to append elements which are not provided by feedgenerator"""
|
|
def append_element(xml_data, element, text):
|
|
root = ET.fromstring(xml_data)
|
|
|
|
# Create the generator element
|
|
generator_element = ET.Element(element)
|
|
generator_element.text = text
|
|
|
|
# Append the generator element to the root
|
|
root.append(generator_element)
|
|
|
|
# Return the modified XML as a string
|
|
return ET.tostring(root, encoding='unicode')
|
|
|
|
"""Patch function to append XSLT reference to XML"""
|
|
"""Why is not this a built-in function of ElementTree or LXML"""
|
|
def append_stylesheet(xml_data):
|
|
# Register namespace in order to avoide ns0:
|
|
ET.register_namespace("", "http://www.w3.org/2005/Atom")
|
|
# Load XML from string
|
|
tree = ET.fromstring(xml_data)
|
|
# The following direction removes the XML declaration
|
|
xml_data_no_declaration = ET.tostring(tree, encoding='unicode')
|
|
# Add XML declaration and stylesheet
|
|
xml_data_declaration = ('<?xml version="1.0" encoding="utf-8"?>'
|
|
'<?xml-stylesheet type="text/xsl" href="xsl/stylesheet.xsl"?>' +
|
|
xml_data_no_declaration)
|
|
return xml_data_declaration
|