diff --git a/pubsub_to_atom.py b/pubsub_to_atom.py index 23af1ef..b14a998 100644 --- a/pubsub_to_atom.py +++ b/pubsub_to_atom.py @@ -10,6 +10,7 @@ from os import mkdir from os.path import exists from slixmpp import ClientXMPP from slixmpp.exceptions import IqError, IqTimeout +from slixmpp.stanza.iq import Iq import xml.etree.ElementTree as ET #import importlib.resources @@ -25,555 +26,602 @@ class XmppInstance(ClientXMPP): self.connect() # self.process(forever=False) -xmpp = None +class HttpInstance: -app = FastAPI() - -# Mount static graphic, script and stylesheet directories -app.mount("/css", StaticFiles(directory="css"), name="css") -app.mount("/data", StaticFiles(directory="data"), name="data") -app.mount("/graphic", StaticFiles(directory="graphic"), name="graphic") -app.mount("/script", StaticFiles(directory="script"), name="script") -app.mount("/xsl", StaticFiles(directory="xsl"), name="xsl") - -@app.get('/favicon.ico', include_in_schema=False) -async def favicon(): - return FileResponse('favicon.ico') - -@app.route('/') -@app.get('/opml') -async def view_pubsub_nodes(request: Request): - global xmpp - if not xmpp: - credentials = get_configuration('account') + app = FastAPI() + + # Mount static graphic, script and stylesheet directories + app.mount("/css", StaticFiles(directory="css"), name="css") + app.mount("/data", StaticFiles(directory="data"), name="data") + app.mount("/graphic", StaticFiles(directory="graphic"), name="graphic") + app.mount("/script", StaticFiles(directory="script"), name="script") + app.mount("/xsl", StaticFiles(directory="xsl"), name="xsl") + + @app.get('/favicon.ico', include_in_schema=False) + async def favicon(): + return FileResponse('favicon.ico') + + @app.route('/') + @app.get('/opml') + async def view_pubsub_nodes(request: Request): + credentials = Utilities.get_configuration('account') xmpp = XmppInstance(credentials['xmpp'], credentials['pass']) # xmpp.connect() - - pubsub = request.query_params.get('pubsub', '') - settings = get_configuration('settings') - result = None - if settings['service']: - if settings['include'] in pubsub or not settings['include']: - if pubsub: - iq = await get_nodes(pubsub) - if iq: + + pubsub = request.query_params.get('pubsub', '') + settings = Utilities.get_configuration('settings') + result = None + if settings['service']: + if settings['include'] in pubsub or not settings['include']: + if pubsub: + iq = await XmppXep0060.get_nodes(xmpp, pubsub) + if iq: + link = 'xmpp:{pubsub}'.format(pubsub=pubsub) + xml_opml = Xml.generate_opml(iq) + result = Xml.append_stylesheet(xml_opml, 'opml') + else: + text = 'Please ensure that PubSub "{}" (Jabber ID) is valid and accessible.'.format(pubsub) + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + else: + text = 'The given domain {} is not allowed.'.format(pubsub) + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + default = Utilities.get_configuration('default') + if not result: + if default['pubsub']: + if not pubsub: + pubsub = default['pubsub'] + iq = await XmppXep0060.get_nodes(xmpp, pubsub) link = 'xmpp:{pubsub}'.format(pubsub=pubsub) - xml_opml = generate_opml(iq) - result = append_stylesheet(xml_opml, 'opml') - else: - text = 'Please ensure that PubSub "{}" (Jabber ID) is valid and accessible.'.format(pubsub) - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - else: - text = 'The given domain {} is not allowed.'.format(pubsub) - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - default = get_configuration('default') - if not result: - if default['pubsub']: - if not pubsub: - pubsub = default['pubsub'] - iq = await get_nodes(pubsub) - link = 'xmpp:{pubsub}'.format(pubsub=pubsub) - xml_opml = generate_opml(iq) - result = append_stylesheet(xml_opml, 'opml') - elif not settings['service']: - pubsub = default['pubsub'] - link = 'xmpp:{pubsub}'.format(pubsub=pubsub) - xml_opml = generate_opml(iq) - result = append_stylesheet(xml_opml, 'opml') - else: - text = 'Please contact the administrator and ask him to set default PubSub and Node ID.' - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - response = Response(content=result, media_type="application/xml") - return response - -@app.get('/atom') -async def view_node_items(request: Request): - global xmpp - if not xmpp: - credentials = get_configuration('account') + xml_opml = Xml.generate_opml(iq) + result = Xml.append_stylesheet(xml_opml, 'opml') + elif not settings['service']: + pubsub = default['pubsub'] + link = 'xmpp:{pubsub}'.format(pubsub=pubsub) + xml_opml = Xml.generate_opml(iq) + result = Xml.append_stylesheet(xml_opml, 'opml') + else: + text = 'Please contact the administrator and ask him to set default PubSub and Node ID.' + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + xmpp.disconnect() + response = Response(content=result, media_type="application/xml") + return response + + @app.get('/atom') + async def view_node_items(request: Request): + credentials = Utilities.get_configuration('account') xmpp = XmppInstance(credentials['xmpp'], credentials['pass']) # xmpp.connect() - - pubsub = request.query_params.get('pubsub', '') - node = request.query_params.get('node', '') - item_id = request.query_params.get('item', '') - settings = get_configuration('settings') - result = None - if settings['service']: - if settings['include'] in pubsub or not settings['include']: - if pubsub and node and item_id: - iq = await get_node_item(pubsub, node, item_id) - if iq: - link = form_an_item_link(pubsub, node, item_id) - xml_atom = generate_atom_comment(iq, link) if 'urn:xmpp:microblog:0:comments/' in node else generate_atom_post(iq, link) - iq = await get_node_items(pubsub, node) - if not '/' in node: - if iq: - generate_json(iq) + + pubsub = request.query_params.get('pubsub', '') + node = request.query_params.get('node', '') + item_id = request.query_params.get('item', '') + settings = Utilities.get_configuration('settings') + result = None + if settings['service']: + if settings['include'] in pubsub or not settings['include']: + if pubsub and node and item_id: + iq = await XmppXep0060.get_node_item(xmpp, pubsub, node, item_id) + if iq: + link = Utilities.form_an_item_link(pubsub, node, item_id) + if 'urn:xmpp:microblog:0:comments/' in node: + atom = Xml.extract_atom(iq) + xml_atom = Xml.generate_atom_comment(atom, pubsub, node, link) else: - operator = get_configuration('settings')['operator'] - json_data = [{'title' : 'Error retrieving node items.', - 'link' : ('javascript:alert("Rivista has experienced an error ' - 'while attempting to retrieve the list of items for ' - 'Node {} of PubSub {}.")') - .format(node, pubsub)}, - {'title' : 'Contact the operator.', - 'link' : ('xmpp:{}?message;subject=Rivista;body=Greetings! ' - 'I am contacting you to inform you that there is an error listing ' - 'node items for Node {} on PubSub {}.').format(operator, node, pubsub)}] - filename = 'data/{}.json'.format(node) - with open(filename, 'w', encoding='utf-8') as f: - json.dump(json_data, f, ensure_ascii=False, indent=4) - else: - text = 'Please ensure that PubSub node "{}" and item "{}" are valid and accessible.'.format(node, item_id) - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - - # try: - # iq = await get_node_items(pubsub, node) - # generate_json(iq, node) - # except: - # operator = get_configuration('settings')['operator'] - # json_data = [{'title' : 'Timeout retrieving node items from {}'.format(node), - # 'link' : 'xmpp:{}?message'.format(operator)}] - # filename = 'data/{}.json'.format(node) - # with open(filename, 'w', encoding='utf-8') as f: - # json.dump(json_data, f, ensure_ascii=False, indent=4) - elif pubsub and node: - iq = await get_node_items(pubsub, node) - if iq: - link = form_a_node_link(pubsub, node) - xml_atom = generate_atom_comment(iq, link) if 'urn:xmpp:microblog:0:comments/' in node else generate_atom_post(iq, link) - else: - text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - elif pubsub: - text = 'Node parameter is missing.' - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - elif node: - text = 'PubSub parameter is missing.' - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') -# else: -# text = ('Mandatory parameter PubSub and ' -# 'optional parameter Node are missing.') -# xml_atom = error_message(text) -# result = append_stylesheet(xml_atom, 'atom') - else: - text = 'The given domain {} is not allowed.'.format(pubsub) - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - if not result: - default = get_configuration('default') - if default['pubsub'] and default['nodeid']: - if not pubsub and not node: - pubsub = default['pubsub'] - node = default['nodeid'] - iq = await get_node_items(pubsub, node) - if iq: - link = form_a_node_link(pubsub, node) - xml_atom = generate_atom_post(iq, link) - else: - text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) - xml_atom = error_message(text) - elif not settings['service']: - pubsub = default['pubsub'] - node = default['nodeid'] - iq = await get_node_items(pubsub, node) - if iq: - link = form_a_node_link(pubsub, node) - xml_atom = generate_atom_post(iq, link) - else: - text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - else: - text = 'Please contact the administrator and ask him to set default PubSub and Node ID.' - xml_atom = error_message(text) - result = append_stylesheet(xml_atom, 'atom') - response = Response(content=result, media_type="application/xml") - return response - -def get_configuration(section): - with open('configuration.toml', mode="rb") as configuration: - result = tomllib.load(configuration)[section] - return result - -#@timeout(5) -async def get_node_item(pubsub, node, item_id): - try: - iq = await xmpp.plugin['xep_0060'].get_item(pubsub, node, item_id, timeout=5) - return iq - except (IqError, IqTimeout) as e: - print(e) - -async def get_node_items(pubsub, node): - try: - iq = await xmpp.plugin['xep_0060'].get_items(pubsub, node, timeout=5) - return iq - except (IqError, IqTimeout) as e: - print(e) - -async def get_nodes(pubsub): - try: - iq = await xmpp.plugin['xep_0060'].get_nodes(pubsub, timeout=5) - return iq - except (IqError, IqTimeout) as e: - print(e) - -def form_a_node_link(pubsub, node): - link = 'xmpp:{pubsub}?;node={node}'.format(pubsub=pubsub, node=node) - return link - -def form_an_item_link(pubsub, node, item_id): - link = 'xmpp:{pubsub}?;node={node};item={item}'.format( - pubsub=pubsub, node=node, item=item_id) - return link - -def error_message(text): - """Error message in RFC 4287: The Atom Syndication Format.""" - title = 'Rivista' - subtitle = 'XMPP Journal Publisher' - description = ('This is a syndication feed generated with Rivista, an XMPP ' - 'Journal Publisher, which conveys XEP-0060: Publish-' - 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' - 'Format.') - language = 'en' - feed = ET.Element("feed") - feed.set('xmlns', 'http://www.w3.org/2005/Atom') - ET.SubElement(feed, 'title', {'type': 'text'}).text = title - ET.SubElement(feed, 'subtitle', {'type': 'text'}).text = subtitle - ET.SubElement(feed, 'author', {'name':'Rivista','email':'rivista@schimon.i2p'}) - ET.SubElement(feed, 'generator', { - 'uri': 'https://git.xmpp-it.net/sch/Rivista', - 'version': '0.1'}).text = 'Rivista XJP' - ET.SubElement(feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() - entry = ET.SubElement(feed, 'entry') - ET.SubElement(entry, 'title').text = 'Error' - ET.SubElement(entry, 'id').text = 'rivista-error' - ET.SubElement(entry, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() - ET.SubElement(entry, 'published').text = datetime.datetime.now(datetime.UTC).isoformat() - # ET.SubElement(entry, 'summary', {'type': summary_type_text}).text = summary_text - ET.SubElement(entry, 'content', {'type': 'text'}).text = text - return ET.tostring(feed, encoding='unicode') - -# generate_rfc_4287 -def generate_atom_post(iq, link): - """Generate an Atom Syndication Format (RFC 4287) from a Publish-Subscribe (XEP-0060) node items.""" - pubsub = iq['from'].bare - node = iq['pubsub']['items']['node'] - title = node - link = link - # link = form_a_node_link(pubsub, node) - # subtitle = 'XMPP PubSub Syndication Feed' - subtitle = pubsub - description = ('This is a syndication feed generated with Rivista, an XMPP ' - 'Journal Publisher, which conveys XEP-0060: Publish-' - 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' - 'Format.') - language = iq['pubsub']['items']['lang'] - items = iq['pubsub']['items'] - e_feed = ET.Element("feed") - e_feed.set('xmlns', 'http://www.w3.org/2005/Atom') - ET.SubElement(e_feed, 'title', {'type': 'text'}).text = title - ET.SubElement(e_feed, 'subtitle', {'type': 'text'}).text = subtitle - ET.SubElement(e_feed, 'link', {'rel': 'self', 'href': link}) - ET.SubElement(e_feed, 'generator', { - 'uri': 'https://git.xmpp-it.net/sch/Rivista', - 'version': '0.1'}).text = 'Rivista XJP' - ET.SubElement(e_feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() - for item in list(items)[::-1]: - item_id = item['id'] - item_payload = item['payload'] - namespace = '{http://www.w3.org/2005/Atom}' - title = item_payload.find(namespace + 'title') - links = item_payload.find(namespace + 'link') - if (not isinstance(title, ET.Element) and - not isinstance(links, ET.Element)): continue - e_entry = ET.SubElement(e_feed, 'entry') - title_text = None if title == None else title.text - ET.SubElement(e_entry, 'title').text = title_text - if isinstance(links, ET.Element): - for link in item_payload.findall(namespace + 'link'): - link_href = link.attrib['href'] if 'href' in link.attrib else '' - link_type = link.attrib['type'] if 'type' in link.attrib else '' - link_rel = link.attrib['rel'] if 'rel' in link.attrib else '' - ET.SubElement(e_entry, 'link', {'href': link_href, 'rel': link_rel, 'type': link_type}) - else: - ET.SubElement(e_entry, 'content', {'href': ''}) - link_xmpp = form_an_item_link(pubsub, node, item_id) - ET.SubElement(e_entry, 'link', {'href': link_xmpp, 'rel': 'alternate', 'type': 'x-scheme-handler/xmpp'}) - contents = item_payload.find(namespace + 'content') - if isinstance(contents, ET.Element): - for content in item_payload.findall(namespace + 'content'): - if not content.text: continue - content_text = content.text - content_type = content.attrib['type'] if 'type' in content.attrib else 'html' - content_type_text = 'html' if 'html' in content_type else 'text' - ET.SubElement(e_entry, 'content', {'type': content_type_text}).text = content_text - else: - summary = item_payload.find(namespace + 'summary') - summary_text = summary.text if summary else None - if summary_text: - summary_type = summary.attrib['type'] if 'type' in summary.attrib else 'html' - summary_type_text = 'html' if 'html' in summary_type else 'text' - ET.SubElement(e_entry, 'content', {'type': summary_type_text}).text = summary_text + atom = Xml.extract_atom(iq) + xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) + iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) + if not '/' in node: + if iq: + Utilities.generate_json(iq) + else: + operator = Utilities.get_configuration('settings')['operator'] + json_data = [{'title' : 'Error retrieving node items.', + 'link' : ('javascript:alert("Rivista has experienced an error ' + 'while attempting to retrieve the list of items for ' + 'Node {} of PubSub {}.")') + .format(node, pubsub)}, + {'title' : 'Contact the operator.', + 'link' : ('xmpp:{}?message;subject=Rivista;body=Greetings! ' + 'I am contacting you to inform you that there is an error listing ' + 'node items for Node {} on PubSub {}.').format(operator, node, pubsub)}] + filename = 'data/{}.json'.format(node) + with open(filename, 'w', encoding='utf-8') as f: + json.dump(json_data, f, ensure_ascii=False, indent=4) + else: + text = 'Please ensure that PubSub node "{}" and item "{}" are valid and accessible.'.format(node, item_id) + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + + # try: + # iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) + # Utilities.generate_json(iq, node) + # except: + # operator = Utilities.get_configuration('settings')['operator'] + # json_data = [{'title' : 'Timeout retrieving node items from {}'.format(node), + # 'link' : 'xmpp:{}?message'.format(operator)}] + # filename = 'data/{}.json'.format(node) + # with open(filename, 'w', encoding='utf-8') as f: + # json.dump(json_data, f, ensure_ascii=False, indent=4) + elif pubsub and node: + iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) + if iq: + link = Utilities.form_a_node_link(pubsub, node) + if 'urn:xmpp:microblog:0:comments/' in node: + atom = Xml.extract_atom(iq) + xml_atom = Xml.generate_atom_comment(atom, pubsub, node, link) + else: + atom = Xml.extract_atom(iq) + xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) + else: + text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + elif pubsub: + text = 'Node parameter is missing.' + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + elif node: + text = 'PubSub parameter is missing.' + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + # else: + # text = ('Mandatory parameter PubSub and ' + # 'optional parameter Node are missing.') + # xml_atom = Xml.error_message(text) + # result = Xml.append_stylesheet(xml_atom, 'atom') else: - ET.SubElement(e_entry, 'content').text = 'No content.' - published = item_payload.find(namespace + 'published') - published_text = None if published == None else published.text - ET.SubElement(e_entry, 'published').text = published_text - updated = item_payload.find(namespace + 'updated') - updated_text = None if updated == None else updated.text - ET.SubElement(e_entry, 'updated').text = updated_text - authors = item_payload.find(namespace + 'author') - if isinstance(authors, ET.Element): - for author in item_payload.findall(namespace + 'author'): - e_author = ET.SubElement(e_entry, 'author') - author_email = author.find(namespace + 'email') - if author_email is not None: - author_email_text = author_email.text - if author_email_text: ET.SubElement(e_author, 'email').text = author_email.text - else: - author_email_text = None - author_uri = author.find(namespace + 'uri') - if author_uri is not None: - author_uri_text = author_uri.text - if author_uri_text: ET.SubElement(e_author, 'uri').text = author_uri.text - else: - author_uri_text = None - author_name = author.find(namespace + 'name') - if author_name is not None and author_name.text: - author_name_text = author_name.text - else: - author_name_text = author_uri_text or author_email_text - ET.SubElement(e_author, 'name').text = author_name_text - for uri in item_payload.iter(namespace + 'author'): - uri_text = uri.text - if uri_text: - ET.SubElement(e_entry, 'uri').text = uri_text -# if not e_author: -# ET.SubElement(e_author, 'name').text = uri_text -# ET.SubElement(e_author, 'uri').text = uri_text - categories = item_payload.find(namespace + 'category') - if isinstance(categories, ET.Element): - for category in item_payload.findall(namespace + 'category'): - if 'term' in category.attrib and category.attrib['term']: - category_term = category.attrib['term'] - ET.SubElement(e_entry, 'category', {'term': category_term}) - - - identifier = item_payload.find(namespace + 'id') - if identifier and identifier.attrib: print(identifier.attrib) - - identifier_text = None if identifier == None else identifier.text - ET.SubElement(e_entry, 'id').text = identifier_text - # ET.SubElement(e_entry, 'summary', {'type': summary_type_text}).text = summary_text - return ET.tostring(e_feed, encoding='unicode') - -# generate_rfc_4287 -def generate_atom_comment(iq, link): - """Generate an Atom Syndication Format (RFC 4287) from a Publish-Subscribe (XEP-0060) node items.""" - pubsub = iq['from'].bare - node = iq['pubsub']['items']['node'] - title = node - link = link - # link = form_a_node_link(pubsub, node) - # subtitle = 'XMPP PubSub Syndication Feed' - subtitle = pubsub - description = ('This is a syndication feed generated with Rivista, an XMPP ' - 'Journal Publisher, which conveys XEP-0060: Publish-' - 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' - 'Format.') - language = iq['pubsub']['items']['lang'] - items = iq['pubsub']['items'] - e_feed = ET.Element("feed") - e_feed.set('xmlns', 'http://www.w3.org/2005/Atom') - ET.SubElement(e_feed, 'title', {'type': 'text'}).text = title - ET.SubElement(e_feed, 'subtitle', {'type': 'text'}).text = subtitle - ET.SubElement(e_feed, 'link', {'rel': 'self', 'href': link}) - ET.SubElement(e_feed, 'generator', { - 'uri': 'https://git.xmpp-it.net/sch/Rivista', - 'version': '0.1'}).text = 'Rivista XJP' - ET.SubElement(e_feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() - for item in list(items)[::-1]: - item_id = item['id'] - item_payload = item['payload'] - namespace = '{http://www.w3.org/2005/Atom}' - authors = item_payload.find(namespace + 'author') - links = item_payload.find(namespace + 'link') - if (not isinstance(authors, ET.Element) and - not isinstance(links, ET.Element)): continue - e_entry = ET.SubElement(e_feed, 'entry') - author_text = None - for author in item_payload.findall(namespace + 'author'): - author_email = author.find(namespace + 'email') - if author_email is not None: - author_text = author_email.text - if not author_text: - author_uri = author.find(namespace + 'uri') - if author_uri is not None: - author_text = author_uri.text - if not author_text: - author_name = author.find(namespace + 'name') - if author_name is not None and author_name.text: - author_text = author_name.text - if not author_text: - for uri in item_payload.iter(namespace + 'author'): - author_text = uri.text - if author_text: - ET.SubElement(e_entry, 'title').text = author_text - break - if isinstance(links, ET.Element): - for link in item_payload.findall(namespace + 'link'): - link_href = link.attrib['href'] if 'href' in link.attrib else '' - link_type = link.attrib['type'] if 'type' in link.attrib else '' - link_rel = link.attrib['rel'] if 'rel' in link.attrib else '' - ET.SubElement(e_entry, 'link', {'href': link_href, 'rel': link_rel, 'type': link_type}) - else: - ET.SubElement(e_entry, 'content', {'href': ''}) - link_xmpp = form_an_item_link(pubsub, node, item_id) - ET.SubElement(e_entry, 'link', {'href': link_xmpp, 'rel': 'alternate', 'type': 'x-scheme-handler/xmpp'}) - contents = item_payload.find(namespace + 'content') - if isinstance(contents, ET.Element): - for content in item_payload.findall(namespace + 'content'): - if not content.text: continue - content_text = content.text - content_type = content.attrib['type'] if 'type' in content.attrib else 'html' - content_type_text = 'html' if 'html' in content_type else 'text' - ET.SubElement(e_entry, 'content', {'type': content_type_text}).text = content_text - else: - summary = item_payload.find(namespace + 'summary') - summary_text = summary.text if summary else None - if summary_text: - summary_type = summary.attrib['type'] if 'type' in summary.attrib else 'html' - summary_type_text = 'html' if 'html' in summary_type else 'text' - ET.SubElement(e_entry, 'content', {'type': summary_type_text}).text = summary_text + text = 'The given domain {} is not allowed.'.format(pubsub) + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + if not result: + default = Utilities.get_configuration('default') + if default['pubsub'] and default['nodeid']: + if not pubsub and not node: + pubsub = default['pubsub'] + node = default['nodeid'] + iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) + if iq: + link = Utilities.form_a_node_link(pubsub, node) + atom = Xml.extract_atom(iq) + xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) + else: + text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) + xml_atom = Xml.error_message(text) + elif not settings['service']: + pubsub = default['pubsub'] + node = default['nodeid'] + iq = await XmppXep0060.get_node_items(xmpp, pubsub, node) + if iq: + link = Utilities.form_a_node_link(pubsub, node) + atom = Xml.extract_atom(iq) + xml_atom = Xml.generate_atom_post(atom, pubsub, node, link) + else: + text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node) + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') else: - title = item_payload.find(namespace + 'title') - title_text = None if title == None else title.text - if title_text: - ET.SubElement(e_entry, 'content').text = title_text - else: - ET.SubElement(e_entry, 'content').text = 'No content.' - published = item_payload.find(namespace + 'published') - published_text = None if published == None else published.text - ET.SubElement(e_entry, 'published').text = published_text - updated = item_payload.find(namespace + 'updated') - updated_text = None if updated == None else updated.text - ET.SubElement(e_entry, 'updated').text = updated_text - authors = item_payload.find(namespace + 'author') - if isinstance(authors, ET.Element): - contact_text = None - for author in item_payload.findall(namespace + 'author'): - author_email = author.find(namespace + 'email') - if author_email is not None: - contact_text = author_email.text - if not contact_text: + text = 'Please contact the administrator and ask him to set default PubSub and Node ID.' + xml_atom = Xml.error_message(text) + result = Xml.append_stylesheet(xml_atom, 'atom') + xmpp.disconnect() + response = Response(content=result, media_type="application/xml") + return response + + +class XmppXep0060: + + async def get_node_item(self, pubsub, node, item_id): + try: + iq = await self.plugin['xep_0060'].get_item(pubsub, node, item_id, timeout=5) + return iq + except (IqError, IqTimeout) as e: + print(e) + + async def get_node_items(self, pubsub, node): + try: + iq = await self.plugin['xep_0060'].get_items(pubsub, node, timeout=5) + return iq + except (IqError, IqTimeout) as e: + print(e) + + async def get_nodes(self, pubsub): + try: + iq = await self.plugin['xep_0060'].get_nodes(pubsub, timeout=5) + return iq + except (IqError, IqTimeout) as e: + print(e) + + +class Xml: + + def error_message(text): + """Error message in RFC 4287: The Atom Syndication Format.""" + title = 'Rivista' + subtitle = 'XMPP Journal Publisher' + description = ('This is a syndication feed generated with Rivista, an XMPP ' + 'Journal Publisher, which conveys XEP-0060: Publish-' + 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' + 'Format.') + language = 'en' + feed = ET.Element("feed") + feed.set('xmlns', 'http://www.w3.org/2005/Atom') + ET.SubElement(feed, 'title', {'type': 'text'}).text = title + ET.SubElement(feed, 'subtitle', {'type': 'text'}).text = subtitle + ET.SubElement(feed, 'author', {'name':'Rivista','email':'rivista@schimon.i2p'}) + ET.SubElement(feed, 'generator', { + 'uri': 'https://git.xmpp-it.net/sch/Rivista', + 'version': '0.1'}).text = 'Rivista XJP' + ET.SubElement(feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() + entry = ET.SubElement(feed, 'entry') + ET.SubElement(entry, 'title').text = 'Error' + ET.SubElement(entry, 'id').text = 'rivista-error' + ET.SubElement(entry, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() + ET.SubElement(entry, 'published').text = datetime.datetime.now(datetime.UTC).isoformat() + # ET.SubElement(entry, 'summary', {'type': summary_type_text}).text = summary_text + ET.SubElement(entry, 'content', {'type': 'text'}).text = text + return ET.tostring(feed, encoding='unicode') + + """Patch function to append XSLT reference to XML""" + """Why is not this a built-in function of ElementTree or LXML""" + def append_stylesheet(xml_data, type): + # Register namespace in order to avoide ns0: + if type == 'atom': ET.register_namespace('', 'http://www.w3.org/2005/Atom') + # Load XML from string + tree = ET.fromstring(xml_data) + # The following direction removes the XML declaration + xml_data_without_a_declaration = ET.tostring(tree, encoding='unicode') + # Add XML declaration and stylesheet + xml_data_declaration = ( + '' + ''.format(type) + + xml_data_without_a_declaration) + return xml_data_declaration + + def generate_opml(iq): + pubsub = iq['from'].bare + items = iq['disco_items']['items'] + opml = ET.Element("opml") + opml.set("version", "1.0") + head = ET.SubElement(opml, "head") + ET.SubElement(head, "title").text = 'An OPML of ' + pubsub + ET.SubElement(head, "description").text = ( + "PubSub Nodes of {}").format(pubsub) + ET.SubElement(head, "generator").text = 'Rivista' + ET.SubElement(head, "urlPublic").text = 'https://git.xmpp-it.net/sch/Rivista' + time_stamp = datetime.datetime.now(datetime.UTC).isoformat() + ET.SubElement(head, "dateCreated").text = time_stamp + ET.SubElement(head, "dateModified").text = time_stamp + body = ET.SubElement(opml, "body") + for item in items: + pubsub, node, title = item + uri = Utilities.form_a_node_link(pubsub, node) + outline = ET.SubElement(body, "outline") + outline.set("text", title or node) + outline.set("xmlUrl", uri) + return ET.tostring(opml, encoding='unicode') + + def extract_atom(iq: Iq): + """Extract data from an Atom Syndication Format (RFC 4287) of a Publish-Subscribe (XEP-0060) node item.""" + jid = iq['from'].bare + node = iq['pubsub']['items']['node'] + atom = {} + atom['title'] = jid + atom['subtitle'] = node + atom['language'] = iq['pubsub']['items']['lang'] + atom['items'] = [] + items = iq['pubsub']['items'] + for item in list(items)[::-1]: + atom_item = {} + item_payload = item['payload'] + namespace = '{http://www.w3.org/2005/Atom}' + title = item_payload.find(namespace + 'title') + links = item_payload.find(namespace + 'link') + if (not isinstance(title, ET.Element) and + not isinstance(links, ET.Element)): continue + title_text = 'No title' if title == None else title.text + atom_item['title'] = title_text + if isinstance(links, ET.Element): + atom_item['links'] = [] + for link in item_payload.findall(namespace + 'link'): + link_href = link.attrib['href'] if 'href' in link.attrib else '' + link_type = link.attrib['type'] if 'type' in link.attrib else '' + link_rel = link.attrib['rel'] if 'rel' in link.attrib else '' + atom_item['links'].append({'href': link_href, + 'rel': link_rel, + 'type': link_type}) + contents = item_payload.find(namespace + 'content') + atom_item['contents'] = [] + if isinstance(contents, ET.Element): + for content in item_payload.findall(namespace + 'content'): + if not content.text: continue + content_text = content.text + content_type = content.attrib['type'] if 'type' in content.attrib else 'html' + content_type_text = 'html' if 'html' in content_type else 'text' + atom_item['contents'].append({'text' : content_text, + 'type' : content_type_text}) + else: + summary = item_payload.find(namespace + 'summary') + summary_text = summary.text if summary else None + if summary_text: + summary_type = summary.attrib['type'] if 'type' in summary.attrib else 'html' + summary_type_text = 'html' if 'html' in summary_type else 'text' + atom_item['contents'].append(summary_text) + # else: + # atom_item['contents'].append('No content.') + published = item_payload.find(namespace + 'published') + published_text = '' if published == None else published.text + atom_item['published'] = published_text + updated = item_payload.find(namespace + 'updated') + updated_text = '' if updated == None else updated.text + atom_item['updated'] = updated_text + atom_item['authors'] = [] + authors = item_payload.find(namespace + 'author') + if isinstance(authors, ET.Element): + for author in item_payload.findall(namespace + 'author'): + atom_item_author = {} + author_email = author.find(namespace + 'email') + if author_email is not None: + author_email_text = author_email.text + if author_email_text: + atom_item_author['email'] = author_email_text + else: + author_email_text = None author_uri = author.find(namespace + 'uri') if author_uri is not None: - contact_text = author_uri.text - if not contact_text: - for uri in item_payload.iter(namespace + 'author'): - contact_text = uri.text - if contact_text: - if contact_text.startswith('xmpp:'): - contact_type = 'x-scheme-handler/xmpp' - elif contact_text.startswith('mailto:'): - contact_type = 'x-scheme-handler/mailto' + author_uri_text = author_uri.text + if author_uri_text: + atom_item_author['uri'] = author_uri_text else: - contact_type = None - if contact_type: - ET.SubElement(e_entry, 'link', {'href': contact_text, 'rel': 'contact', 'type': contact_type}) + author_uri_text = None + author_name = author.find(namespace + 'name') + if author_name is not None and author_name.text: + author_name_text = author_name.text else: - ET.SubElement(e_entry, 'link', {'href': contact_text, 'rel': 'contact'}) - break - categories = item_payload.find(namespace + 'category') - if isinstance(categories, ET.Element): - for category in item_payload.findall(namespace + 'category'): - if 'term' in category.attrib and category.attrib['term']: - category_term = category.attrib['term'] - ET.SubElement(e_entry, 'category', {'term': category_term}) + author_name_text = author_uri_text or author_email_text + atom_item_author['name'] = author_name_text + atom_item['authors'].append(atom_item_author) + categories = item_payload.find(namespace + 'category') + atom_item['categories'] = [] + if isinstance(categories, ET.Element): + for category in item_payload.findall(namespace + 'category'): + if 'term' in category.attrib and category.attrib['term']: + category_term = category.attrib['term'] + atom_item['categories'].append(category_term) + identifier = item_payload.find(namespace + 'id') + if identifier is not None and identifier.attrib: print(identifier.attrib) + identifier_text = item['id'] if identifier == None else identifier.text + atom_item['id'] = identifier_text + #atom_item['id'] = item['id'] + atom['items'].append(atom_item) + return atom + + def generate_xhtml(atom: dict): + """Generate an XHTML document.""" + e_html = ET.Element('html') + e_html.set('xmlns', 'http://www.w3.org/1999/xhtml') + e_head = ET.SubElement(e_html, 'head') + ET.SubElement(e_head, 'title').text = atom['title'] + ET.SubElement(e_head, 'link', {'rel': 'stylesheet', + 'href': 'pubsub.css'}) + e_body = ET.SubElement(e_html, "body") + ET.SubElement(e_body, "h1").text = atom['title'] + ET.SubElement(e_body, "h2").text = atom['subtitle'] + for item in atom['items']: + item_id = item['id'] + title = item['title'] + links = item['links'] + e_article = ET.SubElement(e_body, 'article') + e_title = ET.SubElement(e_article, 'h3') + e_title.text = item['title'] + e_title.set('id', item['id']) + e_date = ET.SubElement(e_article, 'h4') + e_date.text = item['published'] + e_date.set('title', 'Updated: ' + item['updated']) + authors = item['authors'] + if authors: + e_authors = ET.SubElement(e_article, "dl") + ET.SubElement(e_authors, "dt").text = 'Authors' + for author in authors: + e_dd = ET.SubElement(e_authors, 'dd') + e_author = ET.SubElement(e_dd, 'a') + e_author.text = author['name'] or author['uri'] or author['email'] + if 'email' in author and author['email']: + e_author.set('href', 'mailto:' + author['email']) + elif 'uri' in author and author['uri']: + e_author.set('href', author['uri']) + for content in item['contents']: + ET.SubElement(e_article, 'p', {'type': content['type']}).text = content['text'] + if links: + e_links = ET.SubElement(e_article, "dl") + e_links.set('class', 'links') + ET.SubElement(e_links, "dt").text = 'Links' + for link in links: + e_dd = ET.SubElement(e_links, 'dd') + e_link = ET.SubElement(e_dd, 'a') + e_link.set('href', link['href']) + e_link.text = link['rel'] + if link['type']: ET.SubElement(e_dd, 'span').text = link['type'] + categories = item['categories'] + if categories: + e_categories = ET.SubElement(e_article, "dl") + e_categories.set('class', 'categories') + ET.SubElement(e_categories, "dt").text = 'Categories' + for category in categories: + ET.SubElement(e_categories, 'dd').text = category + return ET.tostring(e_html, encoding='unicode') + + # generate_rfc_4287 + def generate_atom_post(atom: dict, pubsub: str, node: str, link: str): + """Generate an Atom Syndication Format (RFC 4287) from a Publish-Subscribe (XEP-0060) node items.""" + # link = Utilities.form_a_node_link(pubsub, node) + # subtitle = 'XMPP PubSub Syndication Feed' + description = ('This is a syndication feed generated with Rivista, an XMPP ' + 'Journal Publisher, which conveys XEP-0060: Publish-' + 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' + 'Format.') + e_feed = ET.Element("feed") + e_feed.set('xmlns', 'http://www.w3.org/2005/Atom') + ET.SubElement(e_feed, 'title', {'type': 'text'}).text = atom['title'] + ET.SubElement(e_feed, 'subtitle', {'type': 'text'}).text = atom['subtitle'] + ET.SubElement(e_feed, 'link', {'rel': 'self', 'href': link}) + ET.SubElement(e_feed, 'generator', { + 'uri': 'https://git.xmpp-it.net/sch/Rivista', + 'version': '0.1'}).text = 'Rivista XJP' + ET.SubElement(e_feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() + for item in atom['items']: + e_entry = ET.SubElement(e_feed, 'entry') + ET.SubElement(e_entry, 'title').text = item['title'] + links = item['links'] + if links: + for link in links: + ET.SubElement(e_entry, 'link', {'href': link['href'], + 'rel': link['rel'], + 'type': link['type']}) + else: + # NOTE What does this instruction line do? + ET.SubElement(e_entry, 'content', {'href': ''}) + link_xmpp = Utilities.form_an_item_link(pubsub, node, item['id']) + ET.SubElement(e_entry, 'link', {'href': link_xmpp, + 'rel': 'alternate', + 'type': 'x-scheme-handler/xmpp'}) + contents = item['contents'] + if contents: + for content in contents: + ET.SubElement(e_entry, 'content', {'type': content['type']}).text = content['text'] + else: + ET.SubElement(e_entry, 'content').text = 'No content.' + ET.SubElement(e_entry, 'published').text = item['published'] + ET.SubElement(e_entry, 'updated').text = item['updated'] + authors = item['authors'] + if authors: + for author in authors: + e_author = ET.SubElement(e_entry, 'author') + if 'email' in author and author['email']: + ET.SubElement(e_author, 'email').text = author['email'] + if 'uri' in author and author['uri']: + ET.SubElement(e_entry, 'uri').text = author['uri'] + ET.SubElement(e_author, 'uri').text = author['uri'] + ET.SubElement(e_author, 'name').text = author['name'] or author['uri'] or author['email'] + categories = item['categories'] + if categories: + for category in categories: + ET.SubElement(e_entry, 'category', {'term': category}) + + ET.SubElement(e_entry, 'id').text = item['id'] + return ET.tostring(e_feed, encoding='unicode') + + # generate_rfc_4287 + def generate_atom_comment(atom: dict, pubsub: str, node: str, link: str): + """Generate an Atom Syndication Format (RFC 4287) from a Publish-Subscribe (XEP-0060) node items.""" + # link = Utilities.form_a_node_link(pubsub, node) + # subtitle = 'XMPP PubSub Syndication Feed' + description = ('This is a syndication feed generated with Rivista, an XMPP ' + 'Journal Publisher, which conveys XEP-0060: Publish-' + 'Subscribe nodes to standard RFC 4287: The Atom Syndication ' + 'Format.') + e_feed = ET.Element("feed") + e_feed.set('xmlns', 'http://www.w3.org/2005/Atom') + ET.SubElement(e_feed, 'title', {'type': 'text'}).text = atom['title'] + ET.SubElement(e_feed, 'subtitle', {'type': 'text'}).text = atom['subtitle'] + ET.SubElement(e_feed, 'link', {'rel': 'self', 'href': link}) + ET.SubElement(e_feed, 'generator', { + 'uri': 'https://git.xmpp-it.net/sch/Rivista', + 'version': '0.1'}).text = 'Rivista XJP' + ET.SubElement(e_feed, 'updated').text = datetime.datetime.now(datetime.UTC).isoformat() + for item in atom['items']: + e_entry = ET.SubElement(e_feed, 'entry') + ET.SubElement(e_entry, 'title').text = item['authors'][0]['name'] + links = item['links'] + if links: + for link in links: + ET.SubElement(e_entry, 'link', {'href': link['href'], + 'rel': link['rel'], + 'type': link['type']}) + else: + # NOTE What does this instruction line do? + ET.SubElement(e_entry, 'content', {'href': ''}) + link_xmpp = Utilities.form_an_item_link(pubsub, node, item['id']) + ET.SubElement(e_entry, 'link', {'href': link_xmpp, + 'rel': 'alternate', + 'type': 'x-scheme-handler/xmpp'}) + contents = item['contents'] + if contents: + for content in contents: + ET.SubElement(e_entry, 'content', {'type': content['type']}).text = content['text'] + else: + ET.SubElement(e_entry, 'content').text = 'No content.' + ET.SubElement(e_entry, 'published').text = item['published'] + ET.SubElement(e_entry, 'updated').text = item['updated'] + authors = item['authors'] + if authors: + for author in authors: + e_author = ET.SubElement(e_entry, 'author') + if 'email' in author and author['email']: + ET.SubElement(e_author, 'email').text = author['email'] + if 'uri' in author and author['uri']: + ET.SubElement(e_entry, 'uri').text = author['uri'] + ET.SubElement(e_author, 'uri').text = author['uri'] + ET.SubElement(e_author, 'name').text = author['name'] or author['uri'] or author['email'] + categories = item['categories'] + if categories: + for category in categories: + ET.SubElement(e_entry, 'category', {'term': category}) + + ET.SubElement(e_entry, 'id').text = item['id'] + return ET.tostring(e_feed, encoding='unicode') - identifier = item_payload.find(namespace + 'id') - if identifier and identifier.attrib: print(identifier.attrib) +class Utilities: - identifier_text = None if identifier == None else identifier.text - ET.SubElement(e_entry, 'id').text = identifier_text - # ET.SubElement(e_entry, 'summary', {'type': summary_type_text}).text = summary_text - return ET.tostring(e_feed, encoding='unicode') + def get_configuration(section): + with open('configuration.toml', mode="rb") as configuration: + result = tomllib.load(configuration)[section] + return result -def generate_json(iq): - """Create a JSON file from node items.""" - json_data = [] - pubsub = iq['from'].bare - node = iq['pubsub']['items']['node'] - entries = iq['pubsub']['items'] - for entry in entries: - item_id = entry['id'] - item_payload = entry['payload'] - namespace = '{http://www.w3.org/2005/Atom}' - title = item_payload.find(namespace + 'title') - title_text = '*** No Title ***' if title == None else title.text - # updated = item.find(namespace + 'updated') - # updated = None if updated == None else updated.text - # if updated: updated = datetime.datetime.fromisoformat(updated) - link_href = form_an_item_link(pubsub, node, item_id) - # link = item.find(namespace + 'link') - # link_href = '' if link == None else link.attrib['href'] - json_data_entry = {'title' : title_text, - 'link' : link_href} - json_data.append(json_data_entry) - #if len(json_data) > 6: break - directory = 'data/{}/'.format(pubsub) - if not exists(directory): - mkdir(directory) - filename = 'data/{}/{}.json'.format(pubsub, node) + def form_a_node_link(pubsub, node): + link = 'xmpp:{pubsub}?;node={node}'.format(pubsub=pubsub, node=node) + return link - with open(filename, 'w', encoding='utf-8') as f: - json.dump(json_data, f, ensure_ascii=False, indent=4) + def form_an_item_link(pubsub, node, item_id): + link = 'xmpp:{pubsub}?;node={node};item={item}'.format( + pubsub=pubsub, node=node, item=item_id) + return link -"""Patch function to append XSLT reference to XML""" -"""Why is not this a built-in function of ElementTree or LXML""" -def append_stylesheet(xml_data, type): - # Register namespace in order to avoide ns0: - if type == 'atom': ET.register_namespace('', 'http://www.w3.org/2005/Atom') - # Load XML from string - tree = ET.fromstring(xml_data) - # The following direction removes the XML declaration - xml_data_without_a_declaration = ET.tostring(tree, encoding='unicode') - # Add XML declaration and stylesheet - xml_data_declaration = ( - '' - ''.format(type) + - xml_data_without_a_declaration) - return xml_data_declaration + def generate_json(iq): + """Create a JSON file from node items.""" + json_data = [] + pubsub = iq['from'].bare + node = iq['pubsub']['items']['node'] + entries = iq['pubsub']['items'] + for entry in entries: + item_id = entry['id'] + item_payload = entry['payload'] + namespace = '{http://www.w3.org/2005/Atom}' + title = item_payload.find(namespace + 'title') + title_text = '*** No Title ***' if title == None else title.text + # updated = item.find(namespace + 'updated') + # updated = None if updated == None else updated.text + # if updated: updated = datetime.datetime.fromisoformat(updated) + link_href = Utilities.form_an_item_link(pubsub, node, item_id) + # link = item.find(namespace + 'link') + # link_href = '' if link == None else link.attrib['href'] + json_data_entry = {'title' : title_text, + 'link' : link_href} + json_data.append(json_data_entry) + #if len(json_data) > 6: break + directory = 'data/{}/'.format(pubsub) + if not exists(directory): + mkdir(directory) + filename = 'data/{}/{}.json'.format(pubsub, node) + + with open(filename, 'w', encoding='utf-8') as f: + json.dump(json_data, f, ensure_ascii=False, indent=4) -def generate_opml(iq): - pubsub = iq['from'].bare - items = iq['disco_items']['items'] - opml = ET.Element("opml") - opml.set("version", "1.0") - head = ET.SubElement(opml, "head") - ET.SubElement(head, "title").text = 'An OPML of ' + pubsub - ET.SubElement(head, "description").text = ( - "PubSub Nodes of {}").format(pubsub) - ET.SubElement(head, "generator").text = 'Rivista' - ET.SubElement(head, "urlPublic").text = 'https://git.xmpp-it.net/sch/Rivista' - time_stamp = datetime.datetime.now(datetime.UTC).isoformat() - ET.SubElement(head, "dateCreated").text = time_stamp - ET.SubElement(head, "dateModified").text = time_stamp - body = ET.SubElement(opml, "body") - for item in items: - pubsub, node, title = item - uri = form_a_node_link(pubsub, node) - outline = ET.SubElement(body, "outline") - outline.set("text", title or node) - outline.set("xmlUrl", uri) - return ET.tostring(opml, encoding='unicode') +def main(): + http_instance = HttpInstance() + return http_instance.app + +app = main()