#!/usr/bin/python # -*- coding: utf-8 -*- import datetime from fastapi import FastAPI, Request, Response from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles import json from os import mkdir from os.path import exists from slixmpp import ClientXMPP from slixmpp.exceptions import IqError, IqTimeout from slixmpp.stanza.iq import Iq import xml.etree.ElementTree as ET #import importlib.resources try: import tomllib except: import tomli as tomllib class XmppInstance(ClientXMPP): def __init__(self, jid, password): super().__init__(jid, password) self.register_plugin('xep_0060') self.connect() # self.process(forever=False) class HttpInstance: app = FastAPI() # Mount static graphic, script and stylesheet directories app.mount("/css", StaticFiles(directory="css"), name="css") app.mount("/data", StaticFiles(directory="data"), name="data") app.mount("/graphic", StaticFiles(directory="graphic"), name="graphic") app.mount("/script", StaticFiles(directory="script"), name="script") app.mount("/xsl", StaticFiles(directory="xsl"), name="xsl") @app.get('/favicon.ico', include_in_schema=False) async def favicon(): return FileResponse('favicon.ico') @app.route('/') @app.get('/opml') async def view_pubsub_nodes(request: Request): credentials = Utilities.get_configuration('account') xmpp = XmppInstance(credentials['xmpp'], credentials['pass']) # xmpp.connect() pubsub = request.query_params.get('pubsub', '') settings = Utilities.get_configuration('settings') result = None if settings['service']: if settings['include'] in pubsub or not settings['include']: if pubsub: iq = await XmppXep0060.get_nodes(xmpp, pubsub) if iq: link = 'xmpp:{pubsub}'.format(pubsub=pubsub) xml_opml = Xml.generate_opml(iq) result = Xml.append_stylesheet(xml_opml, 'opml') else: text = 'Please ensure that PubSub "{}" (Jabber ID) is valid and accessible.'.format(pubsub) xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') else: text = 'The given domain {} is not allowed.'.format(pubsub) xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') default = Utilities.get_configuration('default') if not result: if default['pubsub']: if not pubsub: pubsub = default['pubsub'] iq = await XmppXep0060.get_nodes(xmpp, pubsub) link = 'xmpp:{pubsub}'.format(pubsub=pubsub) xml_opml = Xml.generate_opml(iq) result = Xml.append_stylesheet(xml_opml, 'opml') elif not settings['service']: pubsub = default['pubsub'] link = 'xmpp:{pubsub}'.format(pubsub=pubsub) xml_opml = Xml.generate_opml(iq) result = Xml.append_stylesheet(xml_opml, 'opml') else: text = 'Please contact the administrator and ask him to set default PubSub and Node ID.' xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') xmpp.disconnect() response = Response(content=result, media_type="application/xml") return response @app.get('/atom') async def view_node_items(request: Request): credentials = Utilities.get_configuration('account') xmpp = XmppInstance(credentials['xmpp'], credentials['pass']) # xmpp.connect() pubsub = request.query_params.get('pubsub', '') node = request.query_params.get('node', '') item_id = request.query_params.get('item', '') settings = Utilities.get_configuration('settings') result = None if settings['service']: if settings['include'] in pubsub or not settings['include']: if pubsub and node and item_id: iq = await XmppXep0060.get_node_item(xmpp, pubsub, node, item_id) if iq: link = Utilities.form_an_item_link(pubsub, node, item_id) if 'urn:xmpp:microblog:0:comments/' in node: atom = Xml.extract_atom(iq) xml_atom = Xml.generate_atom_comment(atom, pubsub, node, link) else: atom = Xml.extract_atom(iq) xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) if not '/' in node: if iq: Utilities.generate_json(iq) else: operator = Utilities.get_configuration('settings')['operator'] json_data = [{'title' : 'Error retrieving node items.', 'link' : ('javascript:alert("Rivista has experienced an error ' 'while attempting to retrieve the list of items for ' 'Node {} of PubSub {}.")') .format(node, pubsub)}, {'title' : 'Contact the operator.', 'link' : ('xmpp:{}?message;subject=Rivista;body=Greetings! ' 'I am contacting you to inform you that there is an error listing ' 'node items for Node {} on PubSub {}.').format(operator, node, pubsub)}] filename = 'data/{}.json'.format(node) with open(filename, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=4) else: text = 'Please ensure that PubSub node "{}" and item "{}" are valid and accessible.'.format(node, item_id) xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') # try: # iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) # Utilities.generate_json(iq, node) # except: # operator = Utilities.get_configuration('settings')['operator'] # json_data = [{'title' : 'Timeout retrieving node items from {}'.format(node), # 'link' : 'xmpp:{}?message'.format(operator)}] # filename = 'data/{}.json'.format(node) # with open(filename, 'w', encoding='utf-8') as f: # json.dump(json_data, f, ensure_ascii=False, indent=4) elif pubsub and node: iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) if iq: link = Utilities.form_a_node_link(pubsub, node) if 'urn:xmpp:microblog:0:comments/' in node: atom = Xml.extract_atom(iq) xml_atom = Xml.generate_atom_comment(atom, pubsub, node, link) else: atom = Xml.extract_atom(iq) xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) else: text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') elif pubsub: text = 'Node parameter is missing.' xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') elif node: text = 'PubSub parameter is missing.' xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') # else: # text = ('Mandatory parameter PubSub and ' # 'optional parameter Node are missing.') # xml_atom = Xml.error_message(text) # result = Xml.append_stylesheet(xml_atom, 'atom') else: text = 'The given domain {} is not allowed.'.format(pubsub) xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') if not result: default = Utilities.get_configuration('default') if default['pubsub'] and default['nodeid']: if not pubsub and not node: pubsub = default['pubsub'] node = default['nodeid'] iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) if iq: link = Utilities.form_a_node_link(pubsub, node) atom = Xml.extract_atom(iq) xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) else: text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) xml_atom = Xml.error_message(text) elif not settings['service']: pubsub = default['pubsub'] node = default['nodeid'] iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) if iq: link = Utilities.form_a_node_link(pubsub, node) atom = Xml.extract_atom(iq) xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) else: text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') else: text = 'Please contact the administrator and ask him to set default PubSub and Node ID.' xml_atom = Xml.error_message(text) result = Xml.append_stylesheet(xml_atom, 'atom') xmpp.disconnect() response = Response(content=result, media_type="application/xml") return response class XmppXep0060: async def get_node_item(self, pubsub, node, item_id): try: iq = await self.plugin['xep_0060'].get_item(pubsub, node, item_id, timeout=5) return iq except (IqError, IqTimeout) as e: print(e) async def get_node_items(self, pubsub, node): try: iq = await self.plugin['xep_0060'].get_items(pubsub, node, timeout=5) return iq except (IqError, IqTimeout) as e: print(e) async def get_nodes(self, pubsub): try: iq = await self.plugin['xep_0060'].get_nodes(pubsub, timeout=5) return iq except (IqError, IqTimeout) as e: print(e) class Xml: def error_message(text): """Error message in RFC 4287: The Atom Syndication Format.""" title = 'Rivista' subtitle = 'XMPP Journal Publisher' description = ('This is a syndication feed generated with Rivista, an XMPP ' 'Journal Publisher, which conveys XEP-0060: Publish-' 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' 'Format.') language = 'en' feed = ET.Element("feed") feed.set('xmlns', 'http://www.w3.org/2005/Atom') ET.SubElement(feed, 'title', {'type': 'text'}).text = title ET.SubElement(feed, 'subtitle', {'type': 'text'}).text = subtitle ET.SubElement(feed, 'author', {'name':'Rivista','email':'rivista@schimon.i2p'}) ET.SubElement(feed, 'generator', { 'uri': 'https://git.xmpp-it.net/sch/Rivista', 'version': '0.1'}).text = 'Rivista XJP' ET.SubElement(feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() entry = ET.SubElement(feed, 'entry') ET.SubElement(entry, 'title').text = 'Error' ET.SubElement(entry, 'id').text = 'rivista-error' ET.SubElement(entry, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() ET.SubElement(entry, 'published').text = datetime.datetime.now(datetime.UTC).isoformat() # ET.SubElement(entry, 'summary', {'type': summary_type_text}).text = summary_text ET.SubElement(entry, 'content', {'type': 'text'}).text = text return ET.tostring(feed, encoding='unicode') """Patch function to append XSLT reference to XML""" """Why is not this a built-in function of ElementTree or LXML""" def append_stylesheet(xml_data, type): # Register namespace in order to avoide ns0: if type == 'atom': ET.register_namespace('', 'http://www.w3.org/2005/Atom') # Load XML from string tree = ET.fromstring(xml_data) # The following direction removes the XML declaration xml_data_without_a_declaration = ET.tostring(tree, encoding='unicode') # Add XML declaration and stylesheet xml_data_declaration = ( '' ''.format(type) + xml_data_without_a_declaration) return xml_data_declaration def generate_opml(iq): pubsub = iq['from'].bare items = iq['disco_items']['items'] opml = ET.Element("opml") opml.set("version", "1.0") head = ET.SubElement(opml, "head") ET.SubElement(head, "title").text = 'An OPML of ' + pubsub ET.SubElement(head, "description").text = ( "PubSub Nodes of {}").format(pubsub) ET.SubElement(head, "generator").text = 'Rivista' ET.SubElement(head, "urlPublic").text = 'https://git.xmpp-it.net/sch/Rivista' time_stamp = datetime.datetime.now(datetime.UTC).isoformat() ET.SubElement(head, "dateCreated").text = time_stamp ET.SubElement(head, "dateModified").text = time_stamp body = ET.SubElement(opml, "body") for item in items: pubsub, node, title = item uri = Utilities.form_a_node_link(pubsub, node) outline = ET.SubElement(body, "outline") outline.set("text", title or node) outline.set("xmlUrl", uri) return ET.tostring(opml, encoding='unicode') def extract_atom(iq: Iq): """Extract data from an Atom Syndication Format (RFC 4287) of a Publish-Subscribe (XEP-0060) node item.""" jid = iq['from'].bare node = iq['pubsub']['items']['node'] atom = {} atom['title'] = jid atom['subtitle'] = node atom['language'] = iq['pubsub']['items']['lang'] atom['items'] = [] items = iq['pubsub']['items'] for item in list(items)[::-1]: atom_item = {} item_payload = item['payload'] namespace = '{http://www.w3.org/2005/Atom}' title = item_payload.find(namespace + 'title') links = item_payload.find(namespace + 'link') if (not isinstance(title, ET.Element) and not isinstance(links, ET.Element)): continue title_text = 'No title' if title == None else title.text atom_item['title'] = title_text if isinstance(links, ET.Element): atom_item['links'] = [] for link in item_payload.findall(namespace + 'link'): link_href = link.attrib['href'] if 'href' in link.attrib else '' link_type = link.attrib['type'] if 'type' in link.attrib else '' link_rel = link.attrib['rel'] if 'rel' in link.attrib else '' atom_item['links'].append({'href': link_href, 'rel': link_rel, 'type': link_type}) contents = item_payload.find(namespace + 'content') atom_item['contents'] = [] if isinstance(contents, ET.Element): for content in item_payload.findall(namespace + 'content'): if not content.text: continue content_text = content.text content_type = content.attrib['type'] if 'type' in content.attrib else 'html' content_type_text = 'html' if 'html' in content_type else 'text' atom_item['contents'].append({'text' : content_text, 'type' : content_type_text}) else: summary = item_payload.find(namespace + 'summary') summary_text = summary.text if summary else None if summary_text: summary_type = summary.attrib['type'] if 'type' in summary.attrib else 'html' summary_type_text = 'html' if 'html' in summary_type else 'text' atom_item['contents'].append(summary_text) # else: # atom_item['contents'].append('No content.') published = item_payload.find(namespace + 'published') published_text = '' if published == None else published.text atom_item['published'] = published_text updated = item_payload.find(namespace + 'updated') updated_text = '' if updated == None else updated.text atom_item['updated'] = updated_text atom_item['authors'] = [] authors = item_payload.find(namespace + 'author') if isinstance(authors, ET.Element): for author in item_payload.findall(namespace + 'author'): atom_item_author = {} author_email = author.find(namespace + 'email') if author_email is not None: author_email_text = author_email.text if author_email_text: atom_item_author['email'] = author_email_text else: author_email_text = None author_uri = author.find(namespace + 'uri') if author_uri is not None: author_uri_text = author_uri.text if author_uri_text: atom_item_author['uri'] = author_uri_text else: author_uri_text = None author_name = author.find(namespace + 'name') if author_name is not None and author_name.text: author_name_text = author_name.text else: author_name_text = author_uri_text or author_email_text atom_item_author['name'] = author_name_text atom_item['authors'].append(atom_item_author) categories = item_payload.find(namespace + 'category') atom_item['categories'] = [] if isinstance(categories, ET.Element): for category in item_payload.findall(namespace + 'category'): if 'term' in category.attrib and category.attrib['term']: category_term = category.attrib['term'] atom_item['categories'].append(category_term) identifier = item_payload.find(namespace + 'id') if identifier is not None and identifier.attrib: print(identifier.attrib) identifier_text = item['id'] if identifier == None else identifier.text atom_item['id'] = identifier_text #atom_item['id'] = item['id'] atom['items'].append(atom_item) return atom def generate_xhtml(atom: dict): """Generate an XHTML document.""" e_html = ET.Element('html') e_html.set('xmlns', 'http://www.w3.org/1999/xhtml') e_head = ET.SubElement(e_html, 'head') ET.SubElement(e_head, 'title').text = atom['title'] ET.SubElement(e_head, 'link', {'rel': 'stylesheet', 'href': 'pubsub.css'}) e_body = ET.SubElement(e_html, "body") ET.SubElement(e_body, "h1").text = atom['title'] ET.SubElement(e_body, "h2").text = atom['subtitle'] for item in atom['items']: item_id = item['id'] title = item['title'] links = item['links'] e_article = ET.SubElement(e_body, 'article') e_title = ET.SubElement(e_article, 'h3') e_title.text = item['title'] e_title.set('id', item['id']) e_date = ET.SubElement(e_article, 'h4') e_date.text = item['published'] e_date.set('title', 'Updated: ' + item['updated']) authors = item['authors'] if authors: e_authors = ET.SubElement(e_article, "dl") ET.SubElement(e_authors, "dt").text = 'Authors' for author in authors: e_dd = ET.SubElement(e_authors, 'dd') e_author = ET.SubElement(e_dd, 'a') e_author.text = author['name'] or author['uri'] or author['email'] if 'email' in author and author['email']: e_author.set('href', 'mailto:' + author['email']) elif 'uri' in author and author['uri']: e_author.set('href', author['uri']) for content in item['contents']: ET.SubElement(e_article, 'p', {'type': content['type']}).text = content['text'] if links: e_links = ET.SubElement(e_article, "dl") e_links.set('class', 'links') ET.SubElement(e_links, "dt").text = 'Links' for link in links: e_dd = ET.SubElement(e_links, 'dd') e_link = ET.SubElement(e_dd, 'a') e_link.set('href', link['href']) e_link.text = link['rel'] if link['type']: ET.SubElement(e_dd, 'span').text = link['type'] categories = item['categories'] if categories: e_categories = ET.SubElement(e_article, "dl") e_categories.set('class', 'categories') ET.SubElement(e_categories, "dt").text = 'Categories' for category in categories: ET.SubElement(e_categories, 'dd').text = category return ET.tostring(e_html, encoding='unicode') # generate_rfc_4287 def generate_atom_post(atom: dict, pubsub: str, node: str, link: str): """Generate an Atom Syndication Format (RFC 4287) from a Publish-Subscribe (XEP-0060) node items.""" # link = Utilities.form_a_node_link(pubsub, node) # subtitle = 'XMPP PubSub Syndication Feed' description = ('This is a syndication feed generated with Rivista, an XMPP ' 'Journal Publisher, which conveys XEP-0060: Publish-' 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' 'Format.') e_feed = ET.Element("feed") e_feed.set('xmlns', 'http://www.w3.org/2005/Atom') ET.SubElement(e_feed, 'title', {'type': 'text'}).text = atom['title'] ET.SubElement(e_feed, 'subtitle', {'type': 'text'}).text = atom['subtitle'] ET.SubElement(e_feed, 'link', {'rel': 'self', 'href': link}) ET.SubElement(e_feed, 'generator', { 'uri': 'https://git.xmpp-it.net/sch/Rivista', 'version': '0.1'}).text = 'Rivista XJP' ET.SubElement(e_feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() for item in atom['items']: e_entry = ET.SubElement(e_feed, 'entry') ET.SubElement(e_entry, 'title').text = item['title'] links = item['links'] if links: for link in links: ET.SubElement(e_entry, 'link', {'href': link['href'], 'rel': link['rel'], 'type': link['type']}) else: # NOTE What does this instruction line do? ET.SubElement(e_entry, 'content', {'href': ''}) link_xmpp = Utilities.form_an_item_link(pubsub, node, item['id']) ET.SubElement(e_entry, 'link', {'href': link_xmpp, 'rel': 'alternate', 'type': 'x-scheme-handler/xmpp'}) contents = item['contents'] if contents: for content in contents: ET.SubElement(e_entry, 'content', {'type': content['type']}).text = content['text'] else: ET.SubElement(e_entry, 'content').text = 'No content.' ET.SubElement(e_entry, 'published').text = item['published'] ET.SubElement(e_entry, 'updated').text = item['updated'] authors = item['authors'] if authors: for author in authors: e_author = ET.SubElement(e_entry, 'author') if 'email' in author and author['email']: ET.SubElement(e_author, 'email').text = author['email'] if 'uri' in author and author['uri']: ET.SubElement(e_entry, 'uri').text = author['uri'] ET.SubElement(e_author, 'uri').text = author['uri'] ET.SubElement(e_author, 'name').text = author['name'] or author['uri'] or author['email'] categories = item['categories'] if categories: for category in categories: ET.SubElement(e_entry, 'category', {'term': category}) ET.SubElement(e_entry, 'id').text = item['id'] return ET.tostring(e_feed, encoding='unicode') # generate_rfc_4287 def generate_atom_comment(atom: dict, pubsub: str, node: str, link: str): """Generate an Atom Syndication Format (RFC 4287) from a Publish-Subscribe (XEP-0060) node items.""" # link = Utilities.form_a_node_link(pubsub, node) # subtitle = 'XMPP PubSub Syndication Feed' description = ('This is a syndication feed generated with Rivista, an XMPP ' 'Journal Publisher, which conveys XEP-0060: Publish-' 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' 'Format.') e_feed = ET.Element("feed") e_feed.set('xmlns', 'http://www.w3.org/2005/Atom') ET.SubElement(e_feed, 'title', {'type': 'text'}).text = atom['title'] ET.SubElement(e_feed, 'subtitle', {'type': 'text'}).text = atom['subtitle'] ET.SubElement(e_feed, 'link', {'rel': 'self', 'href': link}) ET.SubElement(e_feed, 'generator', { 'uri': 'https://git.xmpp-it.net/sch/Rivista', 'version': '0.1'}).text = 'Rivista XJP' ET.SubElement(e_feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() for item in atom['items']: e_entry = ET.SubElement(e_feed, 'entry') ET.SubElement(e_entry, 'title').text = item['authors'][0]['name'] links = item['links'] if links: for link in links: ET.SubElement(e_entry, 'link', {'href': link['href'], 'rel': link['rel'], 'type': link['type']}) else: # NOTE What does this instruction line do? ET.SubElement(e_entry, 'content', {'href': ''}) link_xmpp = Utilities.form_an_item_link(pubsub, node, item['id']) ET.SubElement(e_entry, 'link', {'href': link_xmpp, 'rel': 'alternate', 'type': 'x-scheme-handler/xmpp'}) contents = item['contents'] if contents: for content in contents: ET.SubElement(e_entry, 'content', {'type': content['type']}).text = content['text'] else: ET.SubElement(e_entry, 'content').text = 'No content.' ET.SubElement(e_entry, 'published').text = item['published'] ET.SubElement(e_entry, 'updated').text = item['updated'] authors = item['authors'] if authors: for author in authors: e_author = ET.SubElement(e_entry, 'author') if 'email' in author and author['email']: ET.SubElement(e_author, 'email').text = author['email'] if 'uri' in author and author['uri']: ET.SubElement(e_entry, 'uri').text = author['uri'] ET.SubElement(e_author, 'uri').text = author['uri'] ET.SubElement(e_author, 'name').text = author['name'] or author['uri'] or author['email'] categories = item['categories'] if categories: for category in categories: ET.SubElement(e_entry, 'category', {'term': category}) ET.SubElement(e_entry, 'id').text = item['id'] return ET.tostring(e_feed, encoding='unicode') class Utilities: def get_configuration(section): with open('configuration.toml', mode="rb") as configuration: result = tomllib.load(configuration)[section] return result def form_a_node_link(pubsub, node): link = 'xmpp:{pubsub}?;node={node}'.format(pubsub=pubsub, node=node) return link def form_an_item_link(pubsub, node, item_id): link = 'xmpp:{pubsub}?;node={node};item={item}'.format( pubsub=pubsub, node=node, item=item_id) return link def generate_json(iq): """Create a JSON file from node items.""" json_data = [] pubsub = iq['from'].bare node = iq['pubsub']['items']['node'] entries = iq['pubsub']['items'] for entry in entries: item_id = entry['id'] item_payload = entry['payload'] namespace = '{http://www.w3.org/2005/Atom}' title = item_payload.find(namespace + 'title') title_text = '*** No Title ***' if title == None else title.text # updated = item.find(namespace + 'updated') # updated = None if updated == None else updated.text # if updated: updated = datetime.datetime.fromisoformat(updated) link_href = Utilities.form_an_item_link(pubsub, node, item_id) # link = item.find(namespace + 'link') # link_href = '' if link == None else link.attrib['href'] json_data_entry = {'title' : title_text, 'link' : link_href} json_data.append(json_data_entry) #if len(json_data) > 6: break directory = 'data/{}/'.format(pubsub) if not exists(directory): mkdir(directory) filename = 'data/{}/{}.json'.format(pubsub, node) with open(filename, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=4) def main(): http_instance = HttpInstance() return http_instance.app app = main()