#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from asyncio import TimeoutError
from datetime import datetime
from dateutil import parser
from email.utils import parseaddr
from fastapi import FastAPI, Form, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import glob
#import logging
#from os.path import getsize, exists
import os
import qrcode
import random
import re
from slixmpp import ClientXMPP, stanza
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
from starlette.responses import RedirectResponse
#import time
import tomli_w
from urllib.parse import urlsplit
import uvicorn
import xml.etree.ElementTree as ET

# Optional dependencies: only a missing package is tolerated here, any other
# failure while importing is allowed to propagate.
try:
    import cv2
except ImportError:
    print('OpenCV (cv2) is required for dynamic background.')

try:
    import numpy
except ImportError:
    print('NumPy (numpy) is required for dynamic background.')

try:
    import tomllib
except ImportError:
    import tomli as tomllib


class XmppInstance(ClientXMPP):
    """XMPP client used to query information about a given JID.

    Registers the service discovery, multi-user chat, vcard-temp and
    publish-subscribe plugins and sends presence once the session starts.
    """

    def __init__(self, jid, password, jid_bare):
        """Create the client.

        :param jid: Jabber ID of the account to log in with.
        :param password: Password of that account.
        :param jid_bare: Jabber ID that is the subject of the queries; it is
            only stored on the instance.
        """
        super().__init__(jid, password)
        self.jid_bare = jid_bare
        self.register_plugin('xep_0030') # XEP-0030: Service Discovery
        self.register_plugin('xep_0045') # XEP-0045: Multi-User Chat
        self.register_plugin('xep_0054') # XEP-0054: vcard-temp
        self.register_plugin('xep_0060') # XEP-0060: Publish-Subscribe
        self.add_event_handler("session_start", self.on_session_start)

    async def on_session_start(self, event):
        """Send presence as soon as the session has started."""
        self.send_presence()
        #self.disconnect()


class HttpInstance:
    """HTTP front-end; all routes are registered as closures in __init__."""

    def __init__(self, configuration):
        """Build the FastAPI application from the given configuration.

        :param configuration: Mapping with an ``account`` table (keys
            ``xmpp``, ``pass``, ``alias``) and a ``brand`` table (keys
            ``name``, ``site``, ``chat``, ``news``).
        """
        account = configuration['account']
        jabber_id = account['xmpp']
        password = account['pass']
        alias = account['alias']
        brand = configuration['brand']
        brand_name = brand['name']
        brand_site = brand['site']
        chat_client = brand['chat']
        news_client = brand['news']
        self.app = FastAPI()
        templates = Jinja2Templates(directory='xhtml')

        # TODO
        # 1) Mount at the same mountpoint /img.
        # 2) Image filename to be constant, i.e. /img/photo.png and /img/qr.png.
        self.app.mount('/photo', StaticFiles(directory='photo'), name='photo')
        self.app.mount('/qr', StaticFiles(directory='qr'), name='qr')
        self.app.mount('/css', StaticFiles(directory='css'), name='css')
        self.app.mount('/img', StaticFiles(directory='img'), name='img')

        # @self.app.get('/favicon.ico', include_in_schema=False)
        # def favicon_get():
        #     return FileResponse('graphic/hermes.ico')

        # @self.app.get('/hermes.svg')
        # def logo_get():
        #     return FileResponse('graphic/hermes.svg')
# 2) Image filename to be constant, i.e. /img/photo.png and /img/qr.png. self.app.mount('/photo', StaticFiles(directory='photo'), name='photo') self.app.mount('/qr', StaticFiles(directory='qr'), name='qr') self.app.mount('/css', StaticFiles(directory='css'), name='css') self.app.mount('/img', StaticFiles(directory='img'), name='img') # @self.app.get('/favicon.ico', include_in_schema=False) # def favicon_get(): # return FileResponse('graphic/hermes.ico') # @self.app.get('/hermes.svg') # def logo_get(): # return FileResponse('graphic/hermes.svg') @self.app.get('/v/{jid}') async def view_jid(request: Request, jid): """View recent messages of a conference""" jid_path = urlsplit(jid).path if parseaddr(jid_path)[1] == jid_path: jid_bare = jid_path.lower() else: jid_bare = jid note = 'Jabber ID appears to be malformed' if jid_bare == jabber_id: raise HTTPException(status_code=403, detail='access-denied') #try: if True: exception = jid_vcard = messages_10 = note = node_title = \ node_note = number_of_pages = page_number = previous = \ selection = services_sorted = subject = None link_href = 'xmpp:{}?join'.format(jid_bare) link_text = 'Join' xmpp_uri = '{}'.format(jid_bare) filename = 'details/{}.toml'.format(jid_bare) if os.path.exists(filename) and os.path.getsize(filename) > 0: jid_details = Data.open_file_toml(filename) else: jid_details = await FileUtilities.cache_jid_data( jabber_id, password, jid_bare, alias=alias) count = jid_details['count'] items = jid_details['items'] jid_info = { 'error' : jid_details['error'], 'text' : jid_details['error_text'], 'condition' : jid_details['error_condition']} jid_kind = jid_details['kind'] jid_vcard = { 'name' : jid_details['name'], 'note' : jid_details['note'], 'type' : jid_details['image_type']} messages = jid_details['messages'] nodes = jid_details['nodes'] note = jid_details['note'] subject = jid_details['subject'] title = jid_details['name'] # Group chat messages # NOTE TODO page_number = request.query_params.get('page', '') 
if page_number: try: page_number = int(page_number) ix = (page_number -1) * 10 except: ix = 0 page_number = 1 else: ix = 0 page_number = 1 messages_10 = messages[ix:][:10] number_of_pages = int(len(messages) / 10) if number_of_pages < len(messages) / 10: number_of_pages += 1 if jid_kind: # Action and instance type action, instance = XmppUtilities.set_action_instance_type(jid_kind) else: # jid_info['error'] action = 'Contact' instance = view_href = '' message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition']) xmpp_uri = jid_bare # Query URI links print('Query URI links') links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind) link_href = XmppUtilities.get_link_href(jid_bare, jid_kind) view_href = XmppUtilities.get_view_href(jid_bare, jid_kind) xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind) # Graphic files filename, filepath, filetype, selection = FileUtilities.handle_photo(jid_bare, jid_vcard, link_href) #except Exception as e: else: exception = str(e) action = 'Error' title = 'Slixmpp error' xmpp_uri = note = jid filename = jid_bare = link_href = link_tex = node_note = \ node_title = number_of_pages = page_number = previous = \ selection = services = services_sorted = url = None #if title == 'remote-server-timeout': # raise HTTPException(status_code=408, detail='remote-server-timeout') #else: template_file = 'conference.xhtml' template_dict = { 'action' : action, 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'exception' : exception, 'filename' : filename, 'jid_bare' : jid, 'jid_note' : note, 'jid_title' : title, 'links' : links, 'messages' : messages_10, 'node_title' : node_title, 'node_note' : node_note, 'number_of_pages' : number_of_pages, 'page_number' : page_number, 'previous' : previous, 'request' : request, 'subject' : subject, 'title' : title, 'url' : request.url._url, 'xmpp_uri' : xmpp_uri} response = templates.TemplateResponse(template_file, template_dict) 
response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/c/{jid}') async def c_jid_get(request: Request, jid): """Display entries of a vCard4""" jid_path = urlsplit(jid).path if parseaddr(jid_path)[1] == jid_path: jid_bare = jid_path.lower() else: jid_bare = jid note = 'Jabber ID appears to be malformed' if jid_bare == jabber_id: raise HTTPException(status_code=403, detail='access-denied') node_name_vcard4 = 'urn:xmpp:vcard4' item_id_vcard4 = 'current' #try: if True: entries = [] exception = jid_vcard = note = node_items = node_note = \ number_of_pages = page_number = previous = selection = \ title = None directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4) filename = directory + item_id_vcard4 + '.xml' if os.path.exists(filename) and os.path.getsize(filename) > 0: xml_data = Data.open_file_xml(filename) else: await FileUtilities.cache_vcard_data( jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4) xml_data = Data.open_file_xml(filename) root_element = xml_data.getroot() child_element = root_element[0] #vcard_info = Syndication.extract_vcard_items(child_element) vcard_info = Syndication.extract_vcard4_items(child_element) # Action and instance type action = 'Profile' filename = 'details/{}.toml'.format(jid_bare) if os.path.exists(filename) and os.path.getsize(filename) > 0: jid_details = Data.open_file_toml(filename) else: jid_details = await FileUtilities.cache_jid_data( jabber_id, password, jid_bare, alias=alias) # Set node name to 'urn:xmpp:microblog:0' jid_kind = jid_details['kind'] nodes = jid_details['nodes'] if (jid_kind not in ('conference', 'mix', 'muc') and '@' in jid_bare and 'urn:xmpp:microblog:0' in nodes): node_name = 'urn:xmpp:microblog:0' # Query URI links print('Query URI links') jid_kind = 'account' link_href = XmppUtilities.get_link_href(jid_bare, jid_kind) xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name_vcard4) links = XmppUtilities.get_query_uri_links(jid_bare, 
jid_kind, node_name) # Graphic files filename, filepath, filetype, selection = FileUtilities.handle_photo(jid_bare, jid_vcard, link_href) #except Exception as e: else: exception = str(e) action = 'Error' title = 'Slixmpp error' xmpp_uri = note = jid filename = jid_bare = link_href = link_tex = node_note = \ node_title = number_of_pages = page_number = previous = \ selection = url = None if 'fn' in vcard_info and vcard_info['fn']: title = vcard_info['fn'] elif 'alias' in vcard_info and vcard_info['alias']: title = vcard_info['alias'] else: title = jid_bare.split('@')[0] if 'alias' in vcard_info and vcard_info['alias']: alias = vcard_info['alias'] else: alias = jid_bare.split('@')[0] #if title == 'remote-server-timeout': # raise HTTPException(status_code=408, detail='remote-server-timeout') #else: template_file = 'vcard.xhtml' template_dict = { 'action' : action, 'alias' : alias, 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'entries' : entries, 'exception' : exception, 'filename' : filename, 'jid_bare' : jid, 'jid_note' : note, #'jid_title' : title, #'node_title' : node_title, 'links' : links, 'node_name' : node_name_vcard4, 'number_of_pages' : number_of_pages, 'page_number' : page_number, 'previous' : previous, 'request' : request, 'title' : title, 'url' : request.url._url, 'vcard_info' : vcard_info, 'xmpp_uri' : xmpp_uri} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/b/{jid}') async def b_jid_get(request: Request, jid): response = await browse_jid_node_get(request, jid, 'urn:xmpp:microblog:0') return response # TODO Change to /p/ for pubsub @self.app.get('/d/{jid}/{node_name}') @self.app.get('/d/{jid}/{node_name}/{item_id}') async def d_jid_node_get(request: Request, jid, node_name, item_id=None): response = await browse_jid_node_get(request, jid, node_name, item_id=None) return response async def 
browse_jid_node_get(request: Request, jid, node_name, item_id=None): """Browse items of a pubsub node""" jid_path = urlsplit(jid).path if parseaddr(jid_path)[1] == jid_path: jid_bare = jid_path.lower() else: jid_bare = jid note = 'Jabber ID appears to be malformed' if jid_bare == jabber_id: raise HTTPException(status_code=403, detail='access-denied') #try: if True: exception = jid_vcard = note = node_items = node_note = \ number_of_pages = page_number = previous = selection = None filename = 'details/{}.toml'.format(jid_bare) if os.path.exists(filename) and os.path.getsize(filename) > 0: jid_details = Data.open_file_toml(filename) else: jid_details = await FileUtilities.cache_jid_data( jabber_id, password, jid_bare, node_name, item_id) # Node item IDs nodes = jid_details['nodes'] #items = jid_details['items'] # for item in items: # if item[1] == node_name: # nodes[node_name]['title'] = item[2] # break supdirectory = 'xep_0060/{}/'.format(jid_bare) if not os.path.exists(supdirectory): os.mkdir(supdirectory) directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name) if not os.path.exists(directory): os.mkdir(directory) await FileUtilities.cache_node_data( jabber_id, password, jid_bare, node_name) count = jid_details['count'] jid_info = { 'error' : jid_details['error'], 'text' : jid_details['error_text'], 'condition' : jid_details['error_condition']} jid_kind = jid_details['kind'] jid_vcard = { 'name' : jid_details['name'], 'note' : jid_details['note'], 'type' : jid_details['image_type']} messages = jid_details['messages'] #node_title = nodes[node_name]['title'] if 'title' in nodes[node_name] else jid_details['name'] node_title = node_name note = jid_details['note'] #title = nodes[node_name]['title'] if node_name else jid_details['name'] title = jid_details['name'] #link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format( # jid_bare, node_name) #link_text = 'Subscribe' #xmpp_uri = '{}?;node={}'.format(jid_bare, node_name) # Node items entries = [] node_items = 
os.listdir(directory) if 'urn:xmpp:avatar:metadata.xml' in node_items: node_items.remove('urn:xmpp:avatar:metadata.xml') page_number = request.query_params.get('page', '') if page_number: try: page_number = int(page_number) ix = (page_number -1) * 10 except: ix = 0 page_number = 1 else: ix = 0 page_number = 1 item_ids_10 = node_items[ix:][:10] number_of_pages = int(len(node_items) / 10) if number_of_pages < len(node_items) / 10: number_of_pages += 1 if node_items: for item in item_ids_10: filename = directory + item xml_data = Data.open_file_xml(filename) root_element = xml_data.getroot() child_element = root_element[0] entry = Syndication.extract_atom_items(child_element) if entry: filename_without_file_extension = item[:len(item)-4] entry['id'] = filename_without_file_extension entries.append(entry) #if len(entries) > 10: break if jid_kind: # Action and instance type action, instance = XmppUtilities.set_action_instance_type(jid_kind, node_name) else: # jid_info['error'] action = 'Contact' instance = view_href = '' message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition']) xmpp_uri = jid_bare # Query URI links print('Query URI links') links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name, item_id) link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name) view_href = XmppUtilities.get_view_href(jid_bare, jid_kind, node_name) xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name) node_note = xmpp_uri # Graphic files filename, filepath, filetype, selection = FileUtilities.handle_photo(jid_bare, jid_vcard, link_href) #except Exception as e: else: exception = str(e) action = 'Error' title = 'Slixmpp error' xmpp_uri = note = jid filename = jid_bare = link_href = link_tex = node_note = \ node_title = number_of_pages = page_number = previous = \ selection = url = None #if title == 'remote-server-timeout': # raise HTTPException(status_code=408, detail='remote-server-timeout') #else: template_file = 
'node.xhtml' template_dict = { 'action' : action, 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'entries' : entries, 'exception' : exception, 'filename' : filename, 'jid_bare' : jid, 'jid_note' : note, 'jid_title' : title, 'links' : links, 'node_title' : node_title, 'node_note' : node_note, 'node_name' : node_name, 'number_of_pages' : number_of_pages, 'page_number' : page_number, 'previous' : previous, 'request' : request, 'title' : node_title, 'url' : request.url._url, 'xmpp_uri' : xmpp_uri} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/d/{jid}') async def discover_jid_get(request: Request, jid): """View items of a selected service""" jid_path = urlsplit(jid).path if parseaddr(jid_path)[1] == jid_path: jid_bare = jid_path.lower() else: jid_bare = jid note = 'Jabber ID appears to be malformed' if jid_bare == jabber_id: raise HTTPException(status_code=403, detail='access-denied') #try: if True: exception = note = selection = services_sorted = None title = 'Services' link_href = xmpp_uri = jid_bare link_text = 'Reload' # Start an XMPP instance and retrieve information xmpp_instance = XmppInstance(jabber_id, password, jid_bare) xmpp_instance.connect() # JID services action = 'Discover' jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare) iq = jid_info['iq'] if iq: jid_kind = jid_info['kind'] iq_disco_info = iq['disco_info'] for identity in iq_disco_info['identities']: if jid_kind == identity[0] and identity[3]: note = identity[3] if not note: note = jid_bare jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare) iq = jid_items['iq'] iq_disco_items = iq['disco_items'] iq_disco_items_items = iq_disco_items['items'] services = {} #services_sorted = {} category = 'unsorted' for item in iq_disco_items_items: jid_bare = item[0] if len(iq_disco_items_items) > 20 or jid_kind and jid_kind in 
('pubsub'): identity = sub_jid_info = sub_jid_info_iq = '' if jid_kind and jid_kind in ('conference', 'mix', 'muc'): category = 'conference' if jid_kind and jid_kind in ('pubsub'): category = 'pubsub' else: sub_jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare) sub_jid_info_iq = sub_jid_info['iq'] try: for identity_item in sub_jid_info_iq['disco_info']['identities']: identity = identity_item break if sub_jid_info_iq: category = identity[0] if (identity, list) and identity[0] else 'other' except: identity = None category = 'unavailable' sub_jid_kind = sub_jid_info['kind'] if 'kind' in sub_jid_info else None if category not in services: services[category] = [] services[category].append( {'identity' : identity, 'info' : sub_jid_info, 'jid' : jid_bare, 'kind' : sub_jid_kind, 'name' : item[2] or item[1] or item[0], 'node' : item[1]}) services_sorted = {k: v for k, v in services.items() if k != 'unavailable'} if 'unavailable' in services: services_sorted['unavailable'] = services['unavailable'] else: message = '{}: {} (XEP-0030)'.format(jid_info['condition'], jid_info['text']) services = services_sorted = None xmpp_instance.disconnect() #except Exception as e: else: exception = str(e) action = 'Error' title = 'Slixmpp error' xmpp_uri = note = jid filename = jid_bare = link_href = link_text = selection = services = services_sorted = url = None #if title == 'remote-server-timeout': # raise HTTPException(status_code=408, detail='remote-server-timeout') #else: template_file = 'disco.xhtml' template_dict = { 'action' : action, 'filename' : 'default.svg', 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'exception' : exception, 'jid_bare' : jid, 'note' : note, 'request' : request, 'services' : services_sorted, 'title' : title, 'url' : request.url._url, 'link_href' : link_href, 'link_text' : link_text, 'xmpp_uri' : xmpp_uri} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 
'application/xhtml+xml' return response @self.app.get('/j/{jid}/{node_name}') async def jid_node_get(request: Request, jid, node_name): response = await main_jid_node_get(request, jid, node_name) return response @self.app.get('/j/{jid}') async def jid_get(request: Request, jid): node_name = request.query_params.get('node', '') if node_name: response = RedirectResponse(url='/j/{}/{}'.format(jid, node_name)) else: response = await main_jid_node_get(request, jid) return response async def main_jid_node_get(request: Request, jid, node_name=None): jid_bare = jid jid_path = urlsplit(jid).path if parseaddr(jid_path)[1] == jid_path: jid_bare = jid_path.lower() else: jid_bare = jid note = 'Jabber ID appears to be malformed' if jid_bare == jabber_id: raise HTTPException(status_code=403, detail='access-denied') #try: if True: action = alias = count_item = count_message = exception = \ instance = jid_vcard = jid_info = link_href = message = note = \ selection = title = vcard4 = view_href = xmpp_uri = None #node_name = 'urn:xmpp:microblog:0' filename = 'details/{}.toml'.format(jid_bare) if os.path.exists(filename) and os.path.getsize(filename) > 0: jid_details = Data.open_file_toml(filename) else: jid_details = await FileUtilities.cache_jid_data( jabber_id, password, jid_bare, node_name, alias=alias) # Set node name to 'urn:xmpp:microblog:0' jid_kind = jid_details['kind'] nodes = jid_details['nodes'] count_message = jid_details['messages'] if (jid_kind not in ('conference', 'mix', 'muc') and '@' in jid_bare and not node_name and 'urn:xmpp:microblog:0' in nodes): node_name = 'urn:xmpp:microblog:0' items = jid_details['items'] jid_info = { 'error' : jid_details['error'], 'text' : jid_details['error_text'], 'condition' : jid_details['error_condition']} jid_vcard = { 'name' : jid_details['name'], 'note' : jid_details['note'], 'type' : jid_details['image_type']} messages = jid_details['messages'] #note = nodes[node_name]['title'] if node_name in nodes else jid_details['note'] #note 
= jid_details['note'] # vCard4 node_name_vcard4 = 'urn:xmpp:vcard4' item_id_vcard4 = 'current' directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4) filename = directory + item_id_vcard4 + '.xml' if os.path.exists(filename) and os.path.getsize(filename) > 0: xml_data = Data.open_file_xml(filename) root_element = xml_data.getroot() child_element = root_element[0] #vcard_info = Syndication.extract_vcard_items(child_element) vcard_info = Syndication.extract_vcard4_items(child_element) title = vcard_info['fn'] alias = vcard_info['alias'] #note = vcard_info['note'] else: await FileUtilities.cache_vcard_data( jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4) if os.path.exists(filename) and os.path.getsize(filename) > 0: vcard4 = True # Node item IDs supdirectory = 'xep_0060/{}/'.format(jid_bare) if not os.path.exists(supdirectory): os.mkdir(supdirectory) directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name) if not os.path.exists(directory): os.mkdir(directory) await FileUtilities.cache_node_data( jabber_id, password, jid_bare, node_name) # JID or node items if jid_kind in ('mix', 'muc', 'conference', 'server'): count_item = jid_details['count'] elif jid_kind in ('account', 'pubsub'): node_items = os.listdir(directory) if 'urn:xmpp:avatar:metadata.xml' in node_items: node_items.remove('urn:xmpp:avatar:metadata.xml') count_item = len(node_items) # if ('@' in jid_bare and # 'urn:xmpp:microblog:0' not in nodes and # jid_kind not in ('conference', 'mix', 'muc')): # count_item = 0 # else: # count_item = len(node_items) if jid_kind == 'pubsub' and node_name: items = jid_details['items'] for item in items: if item[1] == node_name: #nodes[node_name]['title'] = item[2] title = item[2] break if not title: title = node_name else: title = jid_details['name'] # TODO Consider also the existence of a node /j/pubsub.movim.eu/i2p if jid_kind: # Action and instance type action, instance = XmppUtilities.set_action_instance_type(jid_kind, node_name) view_href = 
XmppUtilities.get_view_href(jid_bare, jid_kind, node_name) xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name) else: # jid_info['error'] action = 'Contact' instance = view_href = '' if jid_info['condition']: message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition']) xmpp_uri = jid_bare link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name) # Query URI links print('Query URI links') links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name) # Graphic files filename, filepath, filetype, selection = FileUtilities.handle_photo( jid_bare, jid_vcard, link_href) #except Exception as e: else: exception = str(e) print(exception) action = 'Error' title = 'Slixmpp error' xmpp_uri = jid alias = count_item = count_message = filename = jid_bare = \ jid_vcard = jid_kind = links = message = selection = url = \ vcard4 = None #note_500 = note[:500] #note = note_500 + ' …' if note_500 < note else note_500 # NOTE Handling of variables "title" and "note" in case of '/j/{jid}/{node_name}' is confusing. # TODO Add new keys that are of 'node' and be utilized for nodes, instead of reusing a variable for several roles. # FIXME If no title be provided to 'node name', use 'node name' itself as title (to be done at FileUtilities.cache_jid_data). 
template_file = 'jid.xhtml' template_dict = { 'action' : action, 'alias' : alias, 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'count_item' : count_item, 'count_message' : count_message, 'instance' : instance, 'exception' : exception, 'filename' : filename, 'jid_bare' : jid_bare, 'jid_kind' : jid_kind, 'links' : links, 'message' : message, 'news_client' : news_client, 'note' : note, # TODO node_note or title of PubSub JID 'request' : request, 'selection' : selection, 'title' : title, # TODO node_title 'url' : request.url._url, 'vcard4' : vcard4, 'view_href' : view_href, 'xmpp_uri' : xmpp_uri} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/selection') async def selection_get(request: Request): filename = 'systems.toml' software = Data.open_file_toml(filename)['systems'] template_file = 'software.xhtml' template_dict = { 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'request' : request, 'software' : software, 'url' : request.url._url} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response #@self.app.get('/download/select') #async def download_select_get(request, software=None): @self.app.get('/download/{software}') async def download_software_get(request: Request, software): response = await download_get(request, featured=True, software=software) return response @self.app.get('/download/{software}/all') async def download_software_all_get(request: Request, software): response = await download_get(request, featured=False, software=software) return response @self.app.get('/download') async def download_get(request: Request, featured=True, software=None): # TODO # Fearured clients '/download/{software}' # All clients '/download/{software}/all' # Select software '/download/select' skipped = False if 
not software: user_agent = request.headers.get("user-agent") user_agent_lower = user_agent.lower() match user_agent_lower: case _ if 'bsd' in user_agent_lower: software = 'bsd' case _ if 'linux' in user_agent_lower: software = 'linux' case _ if 'haiku' in user_agent_lower: software = 'haiku' case _ if 'android' in user_agent_lower: software = 'android' case _ if 'reactos' in user_agent_lower or 'windows' in user_agent_lower: software = 'windows' case _ if 'ios' in user_agent_lower or 'macos' in user_agent_lower: software = 'apple' name = software.title() if software == 'bsd': name = 'BSD' if software == 'posix': name = 'POSIX' if software == 'ubports': name = 'UBports' if name.endswith('os'): name = name.replace('os', 'OS') filename_clients = 'clients.toml' clients = Data.open_file_toml(filename_clients) client_selection = [] clients_software = 0 for client in clients: if software in clients[client]: clients_software += 1 if featured and 'featured' not in clients[client]['properties']: skipped = True continue client_selected = { 'name' : clients[client]['title'], 'about' : clients[client]['about'], 'href' : clients[client][software], 'iden' : client, 'properties' : clients[client]['properties'], 'resources' : clients[client]['resources'] if 'resources' in clients[client] else ''} client_selection.append(client_selected) skipped = False if len(client_selection) == clients_software else True template_file = 'download.xhtml' template_dict = { 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'client_selection' : client_selection, 'featured' : featured, 'skipped' : skipped, 'request' : request, 'software' : software, 'title' : name, 'url' : request.url._url} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.exception_handler(403) def access_denied_exception_handler(request: Request, exc: HTTPException): action = 'Warning' title = 
'Access Denied' return result_get(request, action, title) @self.app.exception_handler(404) def not_found_exception_handler(request: Request, exc: HTTPException): action = 'Warning' title = 'Not Found' return result_get(request, action, title) @self.app.exception_handler(500) def internal_error_exception_handler(request: Request, exc: HTTPException): action = 'Error' title = 'Internal Server Error' return result_get(request, action, title) @self.app.exception_handler(504) def time_out_exception_handler(request: Request, exc: HTTPException): action = 'Warning' title = 'Time Out' return result_get(request, action, title) def result_get(request: Request, action: str, title: str): template_file = 'result.xhtml' template_dict = { 'action' : action, 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'request' : request, 'title' : title, 'url' : request.url._url} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/') async def main_get(request: Request): jabber_id = request.query_params.get('jid', '') if jabber_id: response = RedirectResponse(url='/j/' + jabber_id) else: template_file = 'main.xhtml' template_dict = { 'brand_name' : brand_name, 'brand_site' : brand_site, 'chat_client' : chat_client, 'request' : request, 'url' : request.url._url} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response class Data: def open_file_toml(filename: str) -> dict: with open(filename, mode="rb") as fn: data = tomllib.load(fn) return data def save_to_toml(filename: str, data: dict) -> None: with open(filename, 'w') as fn: data_as_string = tomli_w.dumps(data) fn.write(data_as_string) def open_file_xml(filename: str) -> ET.ElementTree: data = ET.parse(filename) return data def save_to_file(filename: str, data: str) -> None: with open(filename, 'w') as fn: 
fn.write(data) class FileUtilities: async def cache_vcard_data( jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4): # Start an XMPP instance and retrieve information xmpp_instance = XmppInstance(jabber_id, password, jid_bare) xmpp_instance.connect() vcard4_data = await XmppXep0060.get_node_items( xmpp_instance, jid_bare, node_name_vcard4, item_ids=[item_id_vcard4]) xmpp_instance.disconnect() if vcard4_data: supdirectory = 'xep_0060/{}/'.format(jid_bare) if not os.path.exists(supdirectory): os.mkdir(supdirectory) directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4) if not os.path.exists(directory): os.mkdir(directory) if isinstance(vcard4_data['iq'], stanza.iq.Iq): iq = vcard4_data['iq'] for item in iq['pubsub']['items']: filename = directory + item_id_vcard4 + '.xml' xml_item_as_string = str(item) Data.save_to_file(filename, xml_item_as_string) #item_payload = item['payload'] #vcard4_info = Syndication.extract_vcard4_items(item_payload) async def cache_node_data( jabber_id, password, jid_bare, node_name): # Start an XMPP instance and retrieve information xmpp_instance = XmppInstance(jabber_id, password, jid_bare) xmpp_instance.connect() node_items = await XmppXep0060.get_node_items( xmpp_instance, jid_bare, node_name) xmpp_instance.disconnect() if node_items: supdirectory = 'xep_0060/{}/'.format(jid_bare) if not os.path.exists(supdirectory): os.mkdir(supdirectory) directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name) if not os.path.exists(directory): os.mkdir(directory) if isinstance(node_items['iq'], stanza.iq.Iq): iq = node_items['iq'] namespace = '{http://www.w3.org/2005/Atom}' for item in iq['pubsub']['items']: item_payload = item['payload'] date_element = item_payload.find(namespace + 'updated') if not date_element: date_element = item_payload.find(namespace + 'published') if isinstance(date_element, ET.Element): date = date_element.text modification_time = parser.parse(date).timestamp() filename = directory + item['id'] + '.xml' 
class FileUtilities:

    @staticmethod
    async def cache_vcard_data(
            jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4):
        """Retrieve a vCard4 item over XMPP and store it as an XML file.

        The item is written to
        ``xep_0060/{jid_bare}/{node_name_vcard4}/{item_id_vcard4}.xml``.
        Nothing is written when the query yields no result.
        """
        # Start an XMPP instance and retrieve information
        xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
        xmpp_instance.connect()
        try:
            vcard4_data = await XmppXep0060.get_node_items(
                xmpp_instance, jid_bare, node_name_vcard4,
                item_ids=[item_id_vcard4])
        finally:
            # Close the connection even when the query fails.
            xmpp_instance.disconnect()
        if not vcard4_data:
            return
        directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4)
        os.makedirs(directory, exist_ok=True)
        if isinstance(vcard4_data['iq'], stanza.iq.Iq):
            iq = vcard4_data['iq']
            filename = directory + item_id_vcard4 + '.xml'
            for item in iq['pubsub']['items']:
                Data.save_to_file(filename, str(item))
                #item_payload = item['payload']
                #vcard4_info = Syndication.extract_vcard4_items(item_payload)

    @staticmethod
    async def cache_node_data(
            jabber_id, password, jid_bare, node_name):
        """Retrieve the items of a pubsub node and store them as XML files.

        Each item is written to
        ``xep_0060/{jid_bare}/{node_name}/{item id}.xml``.  When the payload
        has an Atom ``updated`` (or else ``published``) element with a date
        that can be parsed, the modification time of the file is set to it.
        """
        # Start an XMPP instance and retrieve information
        xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
        xmpp_instance.connect()
        try:
            node_items = await XmppXep0060.get_node_items(
                xmpp_instance, jid_bare, node_name)
        finally:
            # Close the connection even when the query fails.
            xmpp_instance.disconnect()
        if not node_items:
            return
        directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
        os.makedirs(directory, exist_ok=True)
        if not isinstance(node_items['iq'], stanza.iq.Iq):
            return
        iq = node_items['iq']
        namespace = '{http://www.w3.org/2005/Atom}'
        for item in iq['pubsub']['items']:
            item_id = item['id']
            # Item IDs are chosen by the remote party; skip IDs which would
            # not name a file inside the directory of the node.
            if not item_id or item_id in ('.', '..') or os.path.basename(item_id) != item_id:
                continue
            item_payload = item['payload']
            date_element = None
            if item_payload is not None:
                # Compare against None: an element without child elements is
                # falsy, so "not date_element" would discard a found element.
                date_element = item_payload.find(namespace + 'updated')
                if date_element is None:
                    date_element = item_payload.find(namespace + 'published')
            filename = directory + item_id + '.xml'
            Data.save_to_file(filename, str(item))
            if date_element is not None and date_element.text:
                try:
                    modification_time = parser.parse(date_element.text).timestamp()
                except (ValueError, OverflowError):
                    # Unparsable date; keep the modification time as it is.
                    continue
                access_time = os.stat(filename).st_atime
                os.utime(filename, (access_time, modification_time))
            #item_payload = item['payload']
            #entry = Syndication.extract_atom_items(item_payload)
at jabber.ru has a note in its vCard. vcard_data = await XmppXep0054.get_vcard_data(xmpp_instance, jid_bare) if not vcard_data['error']: conference_title = None if jid_kind in ('mix', 'muc'): for identity in jid_info_iq['disco_info']['identities']: if identity[3]: conference_title = identity[3] break vcard_temp = vcard_data['iq']['vcard_temp'] jid_vcard = { 'name' : vcard_temp['FN'] or conference_title or '', 'note' : vcard_temp['notes'] or node_id or '', 'type' : vcard_temp['PHOTO']['TYPE'] or '', 'bin' : vcard_temp['PHOTO']['BINVAL'] or ''} # TODO /d/pubsub.nicoco.fr/blog/urn-uuid-53e43061-1962-3112-bb8a-1473dca61719 count = '' jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare) if isinstance(jid_items['iq'], stanza.iq.Iq): iq = jid_items['iq'] iq_disco_items = iq['disco_items'] iq_disco_items_items = iq_disco_items['items'] #iq_disco_items_set = {''} iq_disco_items_list = [] iq_disco_items_items_list = [] for item in iq_disco_items_items: if jid_kind == 'muc': #iq_disco_items_set.update([item[2]]) iq_disco_items_list.append(item[2]) else: #iq_disco_items_set.update([item[1]]) iq_disco_items_list.append(item[1]) iq_disco_items_items_list.append( [item[0] or '', item[1] or '', item[2] or '']) #count = len(iq_disco_items_set) count = len(iq_disco_items_list) # Title print('Title') if (jid_kind not in ('conference', 'mix', 'muc') and '@' in jid_bare and not node_name): # NOTE Variables node_name and node_title do not appear to be utilized. 
node_name = 'urn:xmpp:microblog:0' node_title = 'Journal' elif jid_kind == 'pubsub': category = 'unsorted' for item in iq_disco_items_items: if item[2] and item[1] == node_name: node_title = item[2] break else: jid_items = None if jid_kind == 'server': if jid_info_iq: for identity in jid_info_iq['disco_info']['identities']: if jid_kind == identity[0] and identity[1] == 'im' and identity[3]: title = identity[3] print(jid_bare) print(identity) print(jid_info) # String 'undefined' is sourced from JID discuss@conference.conversejs.org if not title: if jid_vcard['name'] and not 'undefined' in jid_vcard['name']: title = jid_vcard['name'] else: title = jid_bare.split('@')[0] # JID item count #count = await XmppUtilities.count_jid_items(xmpp_instance, jid_bare, node_name, jid_kind) #if jid_kind in ('mix', 'muc', 'conference', 'server'): # jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare) # if isinstance(jid_items['iq'], stanza.iq.Iq): # iq = jid_items['iq'] # count = len(iq['disco_items']['items']) #elif jid_kind in ('account', 'pubsub'): # node_item_ids = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name) # if isinstance(node_item_ids, stanza.iq.Iq): # count = len(node_item_ids['disco_items']['items']) # Group chat messages print('Group chat messages') messages = [] subject = '' if jid_kind == 'muc': #action = 'Join' # TODO Create configurations for group chat preview room_info_muc = await XmppXep0045.get_room_information( xmpp_instance, jid_bare, alias, maxstanzas=50) # NOTE Do not mix error messages with node titles and descriptions etc. 
if isinstance(room_info_muc['iq'], tuple): iq = room_info_muc['iq'] for message in iq[3]: messages.append({ 'id' : message['id'], 'alias' : message['mucnick'], 'body' : message['body'], 'timestamp' : message['delay']['stamp'].__str__()}) messages.reverse() subject = iq[1]['subject'] #title = title or node_name if not node_title: node_title = node_name node_note = jid_bare else: message = '{}: {} (XEP-0045)'.format(room_info_muc['condition'], room_info_muc['text']) # Node items print('Node items') nodes = {} #if node_name and node_name in iq_disco_items_set: if iq_disco_items_list and node_name and node_name in iq_disco_items_list: #action = 'Browse' node_item_ids = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name) if isinstance(node_item_ids['iq'], stanza.iq.Iq): iq = node_item_ids['iq'] nodes[node_name] = {} nodes[node_name]['title'] = node_title nodes[node_name]['count'] = len(iq['disco_items']['items']) nodes[node_name]['item_ids'] = [] for item_id in iq['disco_items']['items']: nodes[node_name]['item_ids'].append( [item_id[0] or '', item_id[1] or '', item_id[2] or '']) item_ids = [] for item in nodes[node_name]['item_ids']: item_ids.append(item[2]) node_items = await XmppXep0060.get_node_items( xmpp_instance, jid_bare, node_name) if node_items: supdirectory = 'xep_0060/{}/'.format(jid_bare) if not os.path.exists(supdirectory): os.mkdir(supdirectory) directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name) if not os.path.exists(directory): os.mkdir(directory) if isinstance(node_items['iq'], stanza.iq.Iq): iq = node_items['iq'] namespace = '{http://www.w3.org/2005/Atom}' for item in iq['pubsub']['items']: item_payload = item['payload'] date_element = item_payload.find(namespace + 'updated') if not date_element: date_element = item_payload.find(namespace + 'published') if isinstance(date_element, ET.Element): date = date_element.text modification_time = parser.parse(date).timestamp() filename = directory + item['id'] + '.xml' 
xml_item_as_string = str(item) Data.save_to_file(filename, xml_item_as_string) if isinstance(date_element, ET.Element): file_statistics = os.stat(filename) access_time = file_statistics.st_atime os.utime(filename, (access_time, modification_time)) #item_payload = item['payload'] #entry = Syndication.extract_atom_items(item_payload) xmpp_instance.disconnect() # Notes print('Notes') jid_vcard_note = jid_vcard['note'] if isinstance(jid_vcard_note, list) and len(jid_vcard_note): note = jid_vcard_note[0]['NOTE'] else: note = jid_vcard_note #if not note and jid_vcard['name'] and not 'undefined' in jid_vcard['name'] and title != jid_vcard['name']: # note = jid_vcard['name'] jid_details = { 'count' : count or '', 'error' : jid_info['error'], 'error_text' : jid_info['text'] or '', 'error_condition' : jid_info['condition'] or '', 'image_type' : jid_vcard['type'], 'items' : iq_disco_items_items_list, 'kind' : jid_kind or '', 'messages' : messages or '', 'name' : title, 'nodes' : nodes, 'note' : note or '', 'subject' : subject or ''} print(jid_details) # Query URI href link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name) FileUtilities.handle_photo(jid_bare, jid_vcard, link_href) filename = 'details/{}.toml'.format(jid_bare) Data.save_to_toml(filename, jid_details) return jid_details def handle_photo(jid_bare, jid_vcard, link_href): filename = filepath = filetype = mimetype = selection = None filecirca = 'photo/{}.*'.format(jid_bare) filepath_guess = glob.glob(filecirca) if filepath_guess: filepath = filepath_guess[0] filetype = filepath.split('.').pop() filename = '{}.{}'.format(jid_bare, filetype) elif jid_vcard: if jid_vcard['type']: mimetype = jid_vcard['type'] if mimetype: filetype = mimetype.split('/')[1] if filetype == 'svg+xml': filetype = 'svg' filename = '{}.{}'.format(jid_bare, filetype) filepath = 'photo/{}.{}'.format(jid_bare, filetype) #img.save(filename) # Write the decoded bytes to a file with open(filepath, 'wb') as file: 
class Graphics:
    """Colour extraction from photos and QR code generation.

    The functions are called through the class, never on an instance.
    """

    def extract_colours_from_raster(filepath):
        """Pick two random pixel colours from a raster image.

        Channel values are clipped to the range 100-200 before sampling.

        Returns:
            list | None: two ``[b, g, r]`` lists, or None when the image
            cannot be processed.
        """
        try:
            img = cv2.imread(filepath)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            result = numpy.clip(img, 100, 200)
            res = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
            selection = []
            for _ in range(2):
                # randrange covers index 0 and works for images that are
                # one pixel high or wide.
                row = res[random.randrange(len(res))]
                pixel = row[random.randrange(len(row))]
                selection.append(numpy.array(pixel).tolist())
            print(selection)
        except Exception as e:
            # Deliberately broad: cv2 and numpy are optional imports of this
            # file, so a NameError is also an expected failure here.
            selection = None
            print(str(e))
        return selection

    def _normalise_hex_colour(value):
        """Return ``value`` as six lowercase hex digits, or None if invalid.

        Accepts ``#rgb``, ``#rgba``, ``#rrggbb`` and ``#rrggbbaa``; an alpha
        component is dropped.
        """
        if not value:
            return None
        value = value.strip()
        if not value.startswith('#'):
            return None
        digits = value[1:].lower()
        if not digits or any(ch not in '0123456789abcdef' for ch in digits):
            return None
        if len(digits) in (3, 4):
            return ''.join(ch * 2 for ch in digits[:3])
        if len(digits) in (6, 8):
            return digits[:6]
        return None

    def extract_colours_from_vector(filepath):
        """Pick two colours from the fills and strokes of an SVG file.

        Only hexadecimal colours of circle, ellipse, line, path, polygon,
        rect and text elements in the SVG namespace are considered.

        Returns:
            list: two ``[b, g, r]`` lists (the same colour twice when only
            one is found), or an empty list when none is found.
        """
        # NOTE(review): the SVG may originate from a remote vCard; the
        # standard XML parser is not hardened against malicious documents.
        root = ET.parse(filepath).getroot()
        namespace = {'svg': 'http://www.w3.org/2000/svg'}
        shapes = ('circle', 'ellipse', 'line', 'path', 'polygon', 'rect',
                  'text')
        colours_hex = set()
        for shape in shapes:
            for elem in root.findall('.//svg:' + shape, namespace):
                for attribute in ('fill', 'stroke'):
                    colour = Graphics._normalise_hex_colour(
                        elem.get(attribute))
                    if colour:
                        colours_hex.add(colour)
        # Channels are stored blue first, matching the raster extraction.
        colours_rgb = [
            [int(colour[i:i+2], 16) for i in (4, 2, 0)]
            for colour in sorted(colours_hex)]
        if len(colours_rgb) > 1:
            return random.sample(colours_rgb, 2)
        if len(colours_rgb) == 1:
            return [colours_rgb[0], colours_rgb[0]]
        return []

    def generate_qr_code_graphics_from_string(text, jid_bare):
        """Save a QR code of ``text`` to ``qr/<jid_bare>.png``."""
        qr = qrcode.QRCode(border=2, box_size=10)
        qr.add_data(text)
        qrcode_graphics = qr.make_image(fill_color='#333', back_color='#f2f2f2')
        qrcode_graphics.save('qr/{}.png'.format(jid_bare))
class Syndication:
    """Extraction of vCard4 and Atom data from ElementTree elements.

    The functions are called through the class, never on an instance.
    """

    def extract_vcard_items(xml_data):
        """Extracts all items from a vCard XML ElementTree.

        Args:
            xml_data (ElementTree): The vCard XML as an ElementTree object.

        Returns:
            dict: A dictionary where keys are item names and values are their text content.
        """
        items = {}
        for item in xml_data.iter():
            # Skip the root element (vcard)
            if item.tag == '{urn:ietf:params:xml:ns:vcard-4.0}vcard':
                continue
            # Drop the namespace; tags without one are used as they are.
            item_name = item.tag.rpartition('}')[2]
            # Own text first, then the text of the direct children.
            item_text = [item.text] if item.text else []
            item_text.extend(child.text for child in item if child.text)
            items[item_name] = ' '.join(item_text).strip() if item_text else None
        return items

    def _first_child_text(parent):
        """Return the text of the first child of ``parent`` that has text.

        Returns an empty string when ``parent`` is None or no child has text.
        """
        if parent is None:
            return ''
        for child in parent:
            if child.text:
                return child.text
        return ''

    def _last_child_text(parent, default=''):
        """Return the text of the last child of ``parent`` that has text.

        Returns ``default`` when ``parent`` is None or no child has text.
        """
        text = default
        if parent is not None:
            for child in parent:
                if child.text:
                    text = child.text
        return text

    def extract_vcard4_items(xml_data):
        """Extract selected properties of a vCard4 element.

        Returns:
            dict: keys ``extras``, ``alias``, ``email``, ``fn``, ``note``,
            ``org``, ``impp`` and ``url``. Missing properties are empty
            strings. ``extras`` maps the type of each ``group`` element
            (``'?'`` when it has none) to a list of ``{'label', 'uri'}``.
        """
        namespace = '{urn:ietf:params:xml:ns:vcard-4.0}'

        def first(name):
            return Syndication._first_child_text(
                xml_data.find(namespace + name))

        vcard = {'extras' : {}}
        for element in xml_data.findall(namespace + 'group'):
            # Values are reset for every group so that a group lacking an
            # element neither fails nor inherits the value of a previous one.
            label = Syndication._last_child_text(
                element.find(namespace + 'x-ablabel'))
            uri = Syndication._last_child_text(
                element.find(namespace + 'url'))
            category = Syndication._last_child_text(
                element.find(namespace + 'url/' +
                             namespace + 'parameters/' +
                             namespace + 'type'),
                default='?')
            vcard['extras'].setdefault(category, []).append(
                {'label' : label, 'uri' : uri})
        vcard['alias'] = first('nickname')
        vcard['email'] = first('email')
        vcard['fn'] = first('fn')
        vcard['note'] = first('note')
        vcard['org'] = first('org')
        vcard['impp'] = first('impp')
        vcard['url'] = first('url')
        return vcard

    def _first_text(xml_data, tag):
        """Return the first non-empty text among the ``tag`` children."""
        for element in xml_data.findall(tag):
            if element.text:
                return element.text
        return ''

    def extract_atom_items(xml_data, limit=False):
        """Extract the fields of an Atom entry.

        Args:
            xml_data: element whose direct children are the Atom elements.
            limit: when true, stop collecting tags once five are gathered.

        Returns:
            dict | None: keys ``content``, ``href``, ``published``,
            ``summary``, ``tags``, ``title`` and ``updated``; None when the
            element has neither a title nor a link. ``updated`` falls back
            to ``published`` when the entry has no ``updated`` element.
        """
        # NOTE
        # `.//` was not needed when the node item payload was passed
        # directly. Now that the item is saved as XML, it is required to use
        # `.//`. Perhaps navigating a level down (i.e. to "child"), or
        # removing the root from the file would solve this.
        namespace = '{http://www.w3.org/2005/Atom}'
        title = xml_data.find(namespace + 'title')
        # Elements are compared against None; an element without children
        # is falsy.
        if title is None and xml_data.find(namespace + 'link') is None:
            return None
        title_text = '' if title is None else (title.text or '')
        link_href = ''
        for link in xml_data.findall(namespace + 'link'):
            link_href = link.get('href') or ''
            if link_href:
                break
        published_text = Syndication._first_text(
            xml_data, namespace + 'published')
        updated_text = Syndication._first_text(
            xml_data, namespace + 'updated') or published_text
        tags = []
        for category in xml_data.findall(namespace + 'category'):
            category_term = category.get('term')
            if not category_term:
                continue
            # Overly long terms are ignored.
            if len(category_term) < 50:
                tags.append(category_term)
            if limit and len(tags) > 4:
                break
        # TODO Check the Blasta database for instances.
        entry = {'content' : Syndication._first_text(
                     xml_data, namespace + 'content'),
                 'href' : link_href,
                 'published' : published_text,
                 'summary' : Syndication._first_text(
                     xml_data, namespace + 'summary'),
                 'tags' : tags,
                 'title' : title_text,
                 'updated' : updated_text}
        return entry
class XmppUtilities:
    """Mapping of JID kinds to labels, XMPP query URIs and site paths.

    ``jid_kind`` is one of 'account', 'conference', 'mix', 'muc', 'pubsub'
    or 'server'; any other value is handled as an unknown kind. The
    functions are called through the class, never on an instance.
    """

    def set_action_instance_type(jid_kind, node_name=None):
        """Return the ``(action, instance)`` labels for a JID kind.

        For an unknown kind the action is 'Contact' and the instance label
        is an empty string.
        """
        if jid_kind == 'conference':
            return 'Discover', 'conferences'
        if jid_kind == 'server':
            return 'Discover', 'services'
        if jid_kind in ('mix', 'muc'):
            return 'Join', 'occupants'
        if jid_kind == 'pubsub':
            if node_name:
                return 'Subscribe', 'articles'
            return 'Browse', 'nodes'
        if jid_kind == 'account':
            return 'Message', 'articles'
        # jid_info['error']: there is nothing to count for an unknown kind.
        return 'Contact', ''

    def get_link_href(jid_bare, jid_kind, node_name=None):
        """Return the XMPP query URI of the primary action for a JID."""
        if jid_kind in ('conference', 'server'):
            return 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
        if jid_kind in ('mix', 'muc'):
            return 'xmpp:{}?join'.format(jid_bare)
        if jid_kind == 'pubsub':
            if node_name:
                return 'xmpp:{}?pubsub;node={};action=subscribe'.format(
                    jid_bare, node_name)
            return 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
        if jid_kind == 'account':
            return 'xmpp:{}?message'.format(jid_bare)
        # jid_info['error']
        return 'xmpp:{}'.format(jid_bare)

    def get_view_href(jid_bare, jid_kind, node_name=None):
        """Return the site path which displays a JID, or None if unknown."""
        if jid_kind in ('mix', 'muc'):
            return '/v/' + jid_bare
        if jid_kind in ('conference', 'server'):
            return '/d/' + jid_bare
        if jid_kind in ('account', 'pubsub'):
            # Without a node name, link to the JID itself instead of
            # producing a path which ends with "/None".
            if node_name:
                return '/d/{}/{}'.format(jid_bare, node_name)
            return '/d/' + jid_bare
        return None

    def get_xmpp_uri(jid_bare, jid_kind, node_name=None):
        """Return the address part of an XMPP URI for a JID.

        The node is appended for a PubSub service when one is given; for
        every other kind the bare JID is returned.
        """
        if jid_kind == 'pubsub' and node_name:
            return '{}?;node={}'.format(jid_bare, node_name)
        return jid_bare

    def get_query_uri_links(jid_bare, jid_kind, node_name=None, item_id=None):
        """Return the action links of a JID.

        Returns:
            list: dicts with the keys ``name``, ``href`` and ``iden``. The
            kind specific links come first, followed by subscribe and view
            links when an item or a node is given, and a vCard link.
        """
        def link(name, query=''):
            return {'name' : name,
                    'href' : 'xmpp:{}{}'.format(jid_bare, query),
                    'iden' : name.lower()}

        links = []
        if jid_kind in ('conference', 'server'):
            links.append(link('Discover', '?disco;type=get;request=items'))
        elif jid_kind in ('mix', 'muc'):
            links.append(link('Join', '?join'))
        elif jid_kind == 'pubsub':
            links.append(link('Browse', '?disco;type=get;request=items'))
        elif jid_kind == 'account':
            links.append(link('Message', '?message'))
            links.append(link('Add', '?roster'))
        else: # jid_info['error']
            links.append(link('Connect'))
            links.append(link('Add', '?roster'))
        if item_id:
            target = '?pubsub;node={};item={}'.format(node_name, item_id)
            links.append(link('Subscribe', target + ';action=subscribe'))
            links.append(link('View', target))
        elif node_name:
            target = '?pubsub;node={}'.format(node_name)
            links.append(link('Subscribe', target + ';action=subscribe'))
            links.append(link('View', target))
        links.append(link('vCard', '?vcard'))
        return links
class XmppXep0030:
    """Service discovery (XEP-0030) queries.

    The functions take the XMPP client as ``self`` and are called through
    the class, e.g. ``XmppXep0030.get_jid_info(xmpp_instance, jid_bare)``.
    """

    async def get_jid_items(self, jid_bare):
        """Query the disco#items of ``jid_bare``.

        Returns:
            dict: ``condition``, ``error``, ``iq`` and ``text``. On failure
            ``error`` is True and ``iq`` is an empty string.
        """
        error = False
        condition = text = ''
        try:
            iq = await self['xep_0030'].get_items(jid=jid_bare)
        except (IqError, IqTimeout) as e:
            iq = ''
            error = True
            # The exception carries the condition and text itself. For a
            # timeout, ``e.iq`` is the request and holds no error payload,
            # which left the condition empty.
            condition = e.condition
            text = e.text or 'Error'
        result = {
            'condition' : condition,
            'error' : error,
            'iq' : iq,
            'text' : text}
        return result

    # NOTE
    # Feature "urn:xmpp:mucsub:0" is present in both, MUC local and MUC hostname
    # Feature "urn:xmpp:serverinfo:0" is present in both, MUC hostname and main hostname
    async def get_jid_info(self, jid_bare):
        """Query the disco#info of ``jid_bare`` and determine its kind.

        Returns:
            dict: ``condition``, ``error``, ``iq``, ``text`` and ``kind``,
            where ``kind`` is 'account', 'bot', 'conference', 'mix', 'muc',
            'pubsub', 'server' or None when it could not be determined.
        """
        muc_features = (
            'muc_moderated', 'muc_open', 'muc_persistent', 'muc_public',
            'muc_semianonymous', 'muc_unmoderated', 'muc_unsecured')
        jid_kind = None
        error = False
        condition = text = ''
        try:
            iq = await self['xep_0030'].get_info(jid=jid_bare)
            iq_disco_info = iq['disco_info']
            if iq_disco_info:
                features = iq_disco_info['features']
                if 'http://jabber.org/protocol/muc#unique' in features:
                    jid_kind = 'conference'
                elif 'urn:xmpp:mix:core:1' in features:
                    jid_kind = 'mix'
                elif any(feature in features for feature in muc_features):
                    jid_kind = 'muc'
                elif '@' in jid_bare:
                    for identity in iq_disco_info['identities']:
                        if identity[0] == 'account':
                            jid_kind = 'account'
                            break
                        # No break: an 'account' identity that follows
                        # takes precedence over 'bot'.
                        if identity[0] == 'client' and identity[1] == 'bot':
                            jid_kind = 'bot'
                else:
                    for identity in iq_disco_info['identities']:
                        if identity[0] == 'pubsub' and identity[1] == 'service':
                            jid_kind = 'pubsub'
                            break
                        if identity[0] == 'server' and identity[1] == 'im':
                            jid_kind = 'server'
                            break
            else:
                iq = condition = text = ''
        except (IqError, IqTimeout) as e:
            iq = ''
            error = True
            # See get_jid_items: read the details from the exception.
            condition = e.condition
            text = e.text or 'Error'
        result = {
            'condition' : condition,
            'error' : error,
            'iq' : iq,
            'text' : text,
            'kind' : jid_kind}
        return result
class XmppXep0045:
    """Multi-User Chat (XEP-0045) queries.

    The functions take the XMPP client as ``self`` and are called through
    the class.
    """

    async def get_room_information(self, jid, alias, maxchars=None, maxstanzas=None, seconds=None):
        """Join the group chat ``jid`` as ``alias`` and return the join result.

        Args:
            jid: address of the group chat.
            alias: nickname to join with.
            maxchars: accepted for compatibility; not passed on.
            maxstanzas: number of history stanzas to request (default 50).
            seconds: accepted for compatibility; not passed on.

        Returns:
            dict: ``error``, ``condition``, ``text`` and ``iq``. ``iq`` is
            the value returned by ``join_muc_wait`` on success and the
            exception on failure.
        """
        if not maxstanzas: maxstanzas = 50
        error = False
        condition = text = ''
        try:
            iq = await self['xep_0045'].join_muc_wait(
                jid,
                alias,
                maxstanzas=maxstanzas,
                timeout=10
                )
        except TimeoutError as e:
            error = True
            iq = e
            condition = 'Request timeout reached'
            text = str(e)
        except (IqError, IqTimeout, PresenceError) as e:
            error = True
            iq = e
            # The details are read from the exception itself; a
            # PresenceError has no ``iq`` attribute.
            condition = e.condition
            text = e.text
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result

    async def get_room_data(self, jid_bare):
        """Return the configuration of the group chat ``jid_bare``."""
        return await self['xep_0045'].get_room_config(jid_bare)

    async def get_room_participants(self, jid_bare):
        """Return the roster of the group chat ``jid_bare``."""
        import inspect
        roster = self['xep_0045'].get_roster(jid_bare)
        # The roster may be returned directly rather than as an awaitable;
        # only await when it is one.
        if inspect.isawaitable(roster):
            roster = await roster
        return roster
# NOTE: "Item not found", yet is a group chat
# That is, JID has no vcard
# messaging-off@conference.movim.eu
class XmppXep0054:
    """vcard-temp (XEP-0054) queries; ``self`` is the XMPP client."""

    async def get_vcard_data(self, jid_bare):
        """Retrieve the vCard of ``jid_bare``.

        Returns:
            dict: ``error``, ``condition``, ``text`` and ``iq``. On failure
            ``error`` is True and ``iq`` is an empty string.
        """
        error = False
        condition = text = ''
        try:
            iq = await self['xep_0054'].get_vcard(jid_bare)
        except (IqError, IqTimeout) as e:
            error = True
            iq = ''
            # Read the details from the exception; for a timeout ``e.iq``
            # is the request and holds no error payload.
            condition = e.condition
            text = e.text
            if not text:
                if condition:
                    text = 'Could not retrieve vCard'
                else:
                    text = 'Unknown Error'
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result

class XmppXep0060:
    """Publish-Subscribe (XEP-0060) queries; ``self`` is the XMPP client."""

    async def get_node_items(self, jid_bare, node_name, item_ids=None, max_items=None):
        """Retrieve items of the node ``node_name`` of ``jid_bare``.

        Args:
            item_ids: optional list of item identifiers to retrieve.
            max_items: optional maximum number of items to request.

        Returns:
            dict: ``error``, ``condition``, ``text`` and ``iq``. On failure
            ``error`` is True and ``iq`` is an empty string.
        """
        error = False
        condition = text = ''
        options = {'timeout' : 5, 'item_ids' : item_ids}
        # The limit is passed on only when one was requested, so that the
        # default request is unchanged.
        if max_items:
            options['max_items'] = max_items
        try:
            iq = await self['xep_0060'].get_items(
                jid_bare, node_name, **options)
        except (IqError, IqTimeout) as e:
            error = True
            iq = ''
            condition = e.condition
            text = e.text
            if not text:
                if condition:
                    text = 'Could not retrieve node items'
                else:
                    text = 'Unknown Error'
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result

    async def get_node_item_ids(self, jid_bare, node_name):
        """Retrieve the item identifiers of a node via service discovery.

        Returns:
            dict: ``error``, ``condition``, ``text`` and ``iq``. On failure
            ``error`` is True and ``iq`` is an empty string.
        """
        error = False
        condition = text = ''
        try:
            # The equivalent xep_0060 call is broken.
            # See https://codeberg.org/poezio/slixmpp/issues/3548
            iq = await self['xep_0030'].get_items(
                jid_bare, node_name)
        except (IqError, IqTimeout) as e:
            error = True
            iq = ''
            condition = e.condition
            text = e.text
            if not text:
                if condition:
                    text = 'Could not retrieve node item IDs'
                else:
                    text = 'Unknown Error'
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result
class XmppXep0369:
    """Mediated Information eXchange (XEP-0369) queries.

    ``self`` is the XMPP client.
    """

    async def get_room_data(self, jid_bare):
        """Return the channel information of ``jid_bare``."""
        # NOTE(review): this presumably requires plugin 'xep_0369' to be
        # registered on the client - confirm against the client setup.
        return await self['xep_0369'].get_channel_info(jid_bare)

def main():
    """Build the ASGI application from ``configuration.toml``."""
    filename_configuration = 'configuration.toml'
    configuration = Data.open_file_toml(filename_configuration)
    http_instance = HttpInstance(configuration)
    return http_instance.app

# Module-level application object for ASGI servers.
app = main()

if __name__ == '__main__':
    # Auto-reload is not requested: uvicorn supports it only when the
    # application is passed as an import string, not as an object.
    uvicorn.run(app, host='127.0.0.1', port=8000)