#!/usr/bin/python
# -*- coding: utf-8 -*-

"""XMPP Journal Publisher (XJP).

A small FastAPI application which reads XEP-0060 (Publish-Subscribe) nodes
over XMPP and serves them as Atom (RFC 4287) or OPML documents, each with an
XSLT stylesheet reference attached.
"""

import datetime
import json
import xml.etree.ElementTree as ET

from fastapi import FastAPI, Request, Response
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from slixmpp import ClientXMPP
from slixmpp.exceptions import IqError, IqTimeout

# import importlib.resources

try:
    import tomllib
except ImportError:
    # Python < 3.11 has no tomllib; fall back to the tomli backport.
    import tomli as tomllib

ATOM_NAMESPACE = 'http://www.w3.org/2005/Atom'
ATOM_TAG_PREFIX = '{' + ATOM_NAMESPACE + '}'

app = FastAPI()


class XmppInstance(ClientXMPP):
    """XMPP client with the PubSub plugin (XEP-0060) registered.

    The connection is started from the constructor.
    """

    def __init__(self, jid, password):
        super().__init__(jid, password)
        self.register_plugin('xep_0060')
        self.connect()
        # self.process(forever=False)


# Shared XMPP client; created lazily on the first request.
xmpp = None

# Mount static graphic, script and stylesheet directories
app.mount("/css", StaticFiles(directory="css"), name="css")
app.mount("/data", StaticFiles(directory="data"), name="data")
app.mount("/graphic", StaticFiles(directory="graphic"), name="graphic")
app.mount("/script", StaticFiles(directory="script"), name="script")
app.mount("/xsl", StaticFiles(directory="xsl"), name="xsl")


def _ensure_xmpp():
    """Create the shared XMPP client on first use and return it."""
    global xmpp
    if not xmpp:
        credentials = get_configuration('account')
        xmpp = XmppInstance(credentials['xmpp'], credentials['pass'])
    return xmpp


def _is_pubsub_allowed(pubsub, include):
    """Tell whether ``pubsub`` passes the configured ``include`` filter.

    An empty ``pubsub`` is let through, so that the "parameter is missing"
    messages and the configured defaults stay reachable when a filter is set.
    """
    return not pubsub or not include or include in pubsub


def _atom_document(xml_atom):
    """Attach the Atom stylesheet to a serialized Atom feed."""
    return append_stylesheet(xml_atom, 'atom.xsl', namespace=ATOM_NAMESPACE)


def _error_document(text):
    """Build a styled Atom document whose single entry carries ``text``."""
    return _atom_document(error_message(text))


def _write_json(node, json_data):
    """Write ``json_data`` to ``data/<node>.json``.

    ``node`` may come from the request, so names containing a path separator
    or a NUL byte are refused rather than written outside ``data/``.
    """
    if any(character in node for character in ('/', '\\', '\x00')):
        print('Refusing to write JSON for node {!r}.'.format(node))
        return
    filename = 'data/{}.json'.format(node)
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=4)


def _utc_timestamp():
    """Return the current UTC time in ISO 8601 format."""
    # datetime.timezone.utc is used instead of datetime.UTC (3.11+), because
    # this module also supports interpreters which need the tomli backport.
    return datetime.datetime.now(datetime.timezone.utc).isoformat()


@app.get('/favicon.ico', include_in_schema=False)
async def favicon():
    """Serve the site icon."""
    return FileResponse('favicon.ico')


@app.get('/opml')
async def view_pubsub_nodes(request: Request):
    """Serve the node list of a PubSub service as an OPML document.

    Query parameter ``pubsub`` selects the service. The configured default
    service is used when the parameter is absent, or when serving arbitrary
    services is disabled. Failures are returned as an Atom error document.
    """
    _ensure_xmpp()
    pubsub = request.query_params.get('pubsub', '')
    settings = get_configuration('settings')
    result = None
    if settings['service']:
        if _is_pubsub_allowed(pubsub, settings['include']):
            if pubsub:
                iq = await get_nodes(pubsub)
                if iq:
                    result = append_stylesheet(generate_opml(iq), 'opml.xsl')
                else:
                    result = _error_document(
                        'Please check that PubSub Jabber ID is valid and '
                        'accessible.')
        else:
            result = _error_document(
                'The given domain {} is not allowed.'.format(pubsub))
    default = get_configuration('default')
    if not result:
        if default['pubsub']:
            if not pubsub or not settings['service']:
                pubsub = default['pubsub']
                iq = await get_nodes(pubsub)
                if iq:
                    result = append_stylesheet(generate_opml(iq), 'opml.xsl')
                else:
                    # The query failed or timed out (get_nodes returned None).
                    result = _error_document(
                        'Please check that PubSub Jabber ID is valid and '
                        'accessible.')
        else:
            result = _error_document(
                'Please contact the administrator and ask him to set '
                'default PubSub and Node ID.')
    return Response(content=result, media_type="application/xml")


@app.get('/atom')
async def view_node_items(request: Request):
    """Serve a PubSub node, or a single item of it, as an Atom document.

    Query parameters are ``pubsub``, ``node`` and optionally ``item``. When
    an item is requested, a JSON list of the node items is also written to
    ``data/<node>.json``. The configured default node is used when no
    parameters are given, or when serving arbitrary services is disabled.
    """
    _ensure_xmpp()
    pubsub = request.query_params.get('pubsub', '')
    node = request.query_params.get('node', '')
    item_id = request.query_params.get('item', '')
    settings = get_configuration('settings')
    result = None
    if settings['service']:
        if _is_pubsub_allowed(pubsub, settings['include']):
            if pubsub and node and item_id:
                iq = await get_node_item(pubsub, node, item_id)
                if iq:
                    link = form_an_item_link(pubsub, node, item_id)
                    xml_atom = generate_atom(iq, link)
                    iq = await get_node_items(pubsub, node)
                    if iq:
                        generate_json(iq)
                    else:
                        operator = settings['operator']
                        json_data = [
                            {'title': 'Error retrieving items list.',
                             'link': (
                                 'javascript:alert("XJP: XMPP Journal '
                                 'Publisher has experienced an error '
                                 'while attempting to retrieve the list of '
                                 'items for Node {} of PubSub {}.")'
                                 ).format(node, pubsub)},
                            {'title': 'Contact the operator.',
                             'link': (
                                 'xmpp:{}?message;subject=XJP: XMPP Journal '
                                 'Publisher;body=Greetings! '
                                 'I am contacting you to inform you that '
                                 'there is an error listing '
                                 'node items for Node {} on PubSub {}.'
                                 ).format(operator, node, pubsub)}]
                        _write_json(node, json_data)
                else:
                    xml_atom = error_message(
                        'Please check that PubSub node and item are valid '
                        'and accessible.')
                result = _atom_document(xml_atom)
            elif pubsub and node:
                iq = await get_node_items(pubsub, node)
                if iq:
                    link = form_a_node_link(pubsub, node)
                    xml_atom = generate_atom(iq, link)
                else:
                    xml_atom = error_message(
                        'Please check that PubSub node is valid and '
                        'accessible.')
                result = _atom_document(xml_atom)
            elif pubsub:
                result = _error_document('Node parameter is missing.')
            elif node:
                result = _error_document('PubSub parameter is missing.')
        else:
            result = _error_document(
                'The given domain {} is not allowed.'.format(pubsub))
    default = get_configuration('default')
    if not result:
        if default['pubsub'] and default['nodeid']:
            if (not pubsub and not node) or not settings['service']:
                pubsub = default['pubsub']
                node = default['nodeid']
                iq = await get_node_items(pubsub, node)
                if iq:
                    link = form_a_node_link(pubsub, node)
                    xml_atom = generate_atom(iq, link)
                else:
                    # The query failed or timed out.
                    xml_atom = error_message(
                        'Please check that PubSub node is valid and '
                        'accessible.')
                result = _atom_document(xml_atom)
        else:
            result = _error_document(
                'Please contact the administrator and ask him to set '
                'default PubSub and Node ID.')
    return Response(content=result, media_type="application/xml")


def get_configuration(section):
    """Return table ``section`` of ``configuration.toml``.

    The file is read anew on every call.
    """
    with open('configuration.toml', mode="rb") as configuration:
        return tomllib.load(configuration)[section]


async def get_node_item(pubsub, node, item_id):
    """Request a single node item; return the IQ or None upon failure."""
    try:
        return await xmpp.plugin['xep_0060'].get_item(
            pubsub, node, item_id, timeout=5)
    except (IqError, IqTimeout) as e:
        print(e)
        return None


async def get_node_items(pubsub, node):
    """Request the items of a node; return the IQ or None upon failure."""
    try:
        return await xmpp.plugin['xep_0060'].get_items(
            pubsub, node, timeout=5)
    except (IqError, IqTimeout) as e:
        print(e)
        return None


async def get_nodes(pubsub):
    """Request the nodes of a service; return the IQ or None upon failure."""
    try:
        return await xmpp.plugin['xep_0060'].get_nodes(pubsub, timeout=5)
    except (IqError, IqTimeout) as e:
        print(e)
        return None


def form_a_node_link(pubsub, node):
    """Return an ``xmpp:`` URI which refers to a PubSub node."""
    return 'xmpp:{pubsub}?;node={node}'.format(pubsub=pubsub, node=node)


def form_an_item_link(pubsub, node, item_id):
    """Return an ``xmpp:`` URI which refers to an item of a PubSub node."""
    return 'xmpp:{pubsub}?;node={node};item={item}'.format(
        pubsub=pubsub, node=node, item=item_id)


def error_message(text):
    """Error message in RFC 4287: The Atom Syndication Format.

    Return a serialized Atom feed with a single entry whose content is
    ``text``.
    """
    time_stamp = _utc_timestamp()
    feed = ET.Element("feed")
    feed.set('xmlns', ATOM_NAMESPACE)
    ET.SubElement(feed, 'title', {'type': 'text'}).text = 'StreamBurner'
    ET.SubElement(feed, 'subtitle', {'type': 'text'}).text = (
        'XMPP Journal Publisher')
    # NOTE(review): RFC 4287 expects name and email as child elements of
    # author; attributes are kept here in case atom.xsl relies on them.
    ET.SubElement(feed, 'author', {'name': 'XMPP Journal Publisher',
                                   'email': 'xjp@schimon.i2p'})
    ET.SubElement(feed, 'generator', {
        'uri': 'https://git.xmpp-it.net/sch/XJP',
        'version': '0.1'}).text = 'XMPP Journal Publisher (XJP)'
    ET.SubElement(feed, 'updated').text = time_stamp
    entry = ET.SubElement(feed, 'entry')
    ET.SubElement(entry, 'title').text = 'Error'
    ET.SubElement(entry, 'id').text = 'xjp-error'
    ET.SubElement(entry, 'updated').text = time_stamp
    ET.SubElement(entry, 'published').text = time_stamp
    ET.SubElement(entry, 'content', {'type': 'text'}).text = text
    return ET.tostring(feed, encoding='unicode')


# generate_rfc_4287
def generate_atom(iq, link):
    """Generate an Atom Syndication Format (RFC 4287) from a
    Publish-Subscribe (XEP-0060) node items.

    ``iq`` is the PubSub items result and ``link`` is the "self" link of the
    feed. Items without a payload or without a title are skipped. Return the
    serialized feed.
    """
    pubsub = iq['from'].bare
    items = iq['pubsub']['items']
    node = items['node']
    feed = ET.Element("feed")
    feed.set('xmlns', ATOM_NAMESPACE)
    ET.SubElement(feed, 'title', {'type': 'text'}).text = node
    ET.SubElement(feed, 'subtitle', {'type': 'text'}).text = pubsub
    ET.SubElement(feed, 'link', {'rel': 'self', 'href': link})
    ET.SubElement(feed, 'generator', {
        'uri': 'https://git.xmpp-it.net/sch/XJP',
        'version': '0.1'}).text = 'XMPP Journal Publisher (XJP)'
    ET.SubElement(feed, 'updated').text = _utc_timestamp()
    for item in items:
        item_id = item['id']
        item_payload = item['payload']
        if item_payload is None:
            continue
        title = item_payload.find(ATOM_TAG_PREFIX + 'title')
        title_text = None if title is None else title.text
        link_href = form_an_item_link(pubsub, node, item_id)
        if not title_text or not link_href:
            continue
        content = item_payload.find(ATOM_TAG_PREFIX + 'content')
        # Compare against None: the truth value of an Element reflects
        # whether it has children, not whether it was found.
        if content is None:
            content_text = 'No content'
            content_type_text = 'html'
        else:
            content_text = content.text
            content_type = content.attrib.get('type', 'text')
            content_type_text = 'html' if 'html' in content_type else 'text'
        published = item_payload.find(ATOM_TAG_PREFIX + 'published')
        published_text = None if published is None else published.text
        updated = item_payload.find(ATOM_TAG_PREFIX + 'updated')
        updated_text = None if updated is None else updated.text
        author = item_payload.find(ATOM_TAG_PREFIX + 'author')
        if author is not None and author.attrib:
            print(author.attrib)
        identifier = item_payload.find(ATOM_TAG_PREFIX + 'id')
        if identifier is not None and identifier.attrib:
            print(identifier.attrib)
        identifier_text = 'None' if identifier is None else identifier.text
        entry = ET.SubElement(feed, 'entry')
        ET.SubElement(entry, 'title').text = title_text
        ET.SubElement(entry, 'link', {'href': link_href})
        ET.SubElement(entry, 'id').text = identifier_text
        ET.SubElement(entry, 'updated').text = updated_text
        ET.SubElement(entry, 'published').text = published_text
        ET.SubElement(entry, 'content', {'type': content_type_text}).text = (
            content_text)
    return ET.tostring(feed, encoding='unicode')


def generate_json(iq):
    """Create a JSON file from node items.

    Titles and links of, at most, the first seven items are written to
    ``data/<node>.json``.
    """
    json_data = []
    pubsub = iq['from'].bare
    entries = iq['pubsub']['items']
    node = entries['node']
    for entry in entries:
        item_payload = entry['payload']
        title = (None if item_payload is None
                 else item_payload.find(ATOM_TAG_PREFIX + 'title'))
        title_text = '*** No Title ***' if title is None else title.text
        link_href = form_an_item_link(pubsub, node, entry['id'])
        json_data.append({'title': title_text, 'link': link_href})
        if len(json_data) > 6:
            break
    _write_json(node, json_data)


def append_element(xml_data, element, text):
    """Patch function to append elements which are not provided by
    feedgenerator.

    Parse ``xml_data``, append a new ``element`` with ``text`` to the root
    and return the modified XML as a string.
    """
    root = ET.fromstring(xml_data)
    new_element = ET.Element(element)
    new_element.text = text
    root.append(new_element)
    return ET.tostring(root, encoding='unicode')


def append_stylesheet(xml_data, filename, namespace=None):
    """Patch function to append XSLT reference to XML.

    Why is not this a built-in function of ElementTree or LXML

    ``filename`` is the stylesheet file name under the ``xsl`` directory.
    ``namespace``, if given, is registered as the default namespace so that
    the output carries no ``ns0:`` prefixes. Return the XML document with an
    XML declaration and an ``xml-stylesheet`` processing instruction.
    """
    # Register namespace in order to avoid ns0:
    if namespace:
        ET.register_namespace("", namespace)
    # Parsing and serializing again removes any XML declaration
    tree = ET.fromstring(xml_data)
    xml_data_without_a_declaration = ET.tostring(tree, encoding='unicode')
    # Add XML declaration and stylesheet
    # NOTE(review): the href is relative to the served document; it is
    # inferred from the "/xsl" static mount and should be confirmed.
    xml_data_declaration = (
        '<?xml version="1.0" encoding="utf-8"?>'
        '<?xml-stylesheet type="text/xsl" href="xsl/{}"?>'.format(filename) +
        xml_data_without_a_declaration)
    return xml_data_declaration


def generate_opml(iq):
    """Generate an OPML document which lists the nodes of a PubSub service.

    ``iq`` is the service discovery items result; each item is unpacked as
    (Jabber ID, node, title). Return the serialized document.
    """
    pubsub = iq['from'].bare
    items = iq['disco_items']['items']
    opml = ET.Element("opml")
    opml.set("version", "1.0")
    head = ET.SubElement(opml, "head")
    ET.SubElement(head, "title").text = pubsub
    ET.SubElement(head, "description").text = (
        "PubSub Nodes of {}").format(pubsub)
    ET.SubElement(head, "generator").text = 'XMPP Journal Publisher (XJP)'
    ET.SubElement(head, "urlPublic").text = 'https://git.xmpp-it.net/sch/XJP'
    time_stamp = _utc_timestamp()
    ET.SubElement(head, "dateCreated").text = time_stamp
    ET.SubElement(head, "dateModified").text = time_stamp
    body = ET.SubElement(opml, "body")
    for item_jid, node, title in items:
        uri = form_a_node_link(item_jid, node)
        outline = ET.SubElement(body, "outline")
        outline.set("text", title or node)
        outline.set("xmlUrl", uri)
    return ET.tostring(opml, encoding='unicode')