942 lines
41 KiB
Python
942 lines
41 KiB
Python
|
#!/usr/bin/python
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
from email.utils import parseaddr
|
||
|
from fastapi import FastAPI, Form, HTTPException, Request, Response
|
||
|
from fastapi.middleware.cors import CORSMiddleware
|
||
|
from fastapi.responses import FileResponse, HTMLResponse
|
||
|
from fastapi.staticfiles import StaticFiles
|
||
|
from fastapi.templating import Jinja2Templates
|
||
|
from jabbercard.config import Cache, Settings, Data
|
||
|
from jabbercard.utilities.graphics import Graphics
|
||
|
from jabbercard.utilities.toml import Toml
|
||
|
from jabbercard.utilities.xml import Syndication
|
||
|
from jabbercard.xmpp.utilities import XmppUtilities
|
||
|
import os
|
||
|
from starlette.responses import RedirectResponse
|
||
|
from urllib.parse import urlsplit
|
||
|
|
||
|
class HttpInstance:
|
||
|
def __init__(self):
|
||
|
|
||
|
directory_settings = Settings.get_directory()
|
||
|
filename_settings = os.path.join(directory_settings, 'settings.toml')
|
||
|
|
||
|
settings = Toml.open_file_toml(filename_settings)
|
||
|
|
||
|
account = settings['account']
|
||
|
jabber_id = account['xmpp']
|
||
|
password = account['pass']
|
||
|
alias = account['alias']
|
||
|
|
||
|
brand = settings['brand']
|
||
|
brand_name = brand['name']
|
||
|
brand_site = brand['site']
|
||
|
chat_client = brand['chat']
|
||
|
news_client = brand['news']
|
||
|
|
||
|
directory_data = Data.get_directory()
|
||
|
directory_data_css = os.path.join(directory_data, 'css')
|
||
|
directory_data_graphic = os.path.join(directory_data, 'graphic')
|
||
|
directory_data_img = os.path.join(directory_data, 'img')
|
||
|
filename_favicon = os.path.join(directory_data, 'img', 'favicon.ico')
|
||
|
directory_data_template = os.path.join(directory_data, 'template')
|
||
|
|
||
|
directory_cache = Cache.get_directory()
|
||
|
directory_cache_qr = os.path.join(directory_cache, 'qr')
|
||
|
directory_cache_photo = os.path.join(directory_cache, 'photo')
|
||
|
|
||
|
self.app = FastAPI()
|
||
|
templates = Jinja2Templates(directory=directory_data_template)
|
||
|
|
||
|
# TODO
|
||
|
# 1) Mount at the same mountpoint /img.
|
||
|
# 2) Image filename to be constant, i.e. /img/photo.png and /img/qr.png.
|
||
|
self.app.mount('/photo', StaticFiles(directory=directory_cache_photo), name='photo')
|
||
|
self.app.mount('/qr', StaticFiles(directory=directory_cache_qr), name='qr')
|
||
|
|
||
|
self.app.mount('/css', StaticFiles(directory=directory_data_css), name='css')
|
||
|
self.app.mount('/img', StaticFiles(directory=directory_data_img), name='img')
|
||
|
|
||
|
# @self.app.get(filename_favicon, include_in_schema=False)
|
||
|
# def favicon_get():
|
||
|
# return FileResponse('graphic/hermes.ico')
|
||
|
|
||
|
# @self.app.get('/hermes.svg')
|
||
|
# def logo_get():
|
||
|
# return FileResponse('graphic/hermes.svg')
|
||
|
|
||
|
@self.app.get('/v/{jid}')
|
||
|
async def view_jid(request: Request, jid):
|
||
|
"""View recent messages of a conference"""
|
||
|
jid_path = urlsplit(jid).path
|
||
|
if parseaddr(jid_path)[1] == jid_path:
|
||
|
jid_bare = jid_path.lower()
|
||
|
else:
|
||
|
jid_bare = jid
|
||
|
note = 'Jabber ID appears to be malformed'
|
||
|
|
||
|
if jid_bare == jabber_id:
|
||
|
raise HTTPException(status_code=403, detail='access-denied')
|
||
|
|
||
|
#try:
|
||
|
if True:
|
||
|
exception = jid_vcard = messages_10 = note = node_title = \
|
||
|
node_note = number_of_pages = page_number = previous = \
|
||
|
selection = services_sorted = subject = None
|
||
|
link_href = 'xmpp:{}?join'.format(jid_bare)
|
||
|
link_text = 'Join'
|
||
|
xmpp_uri = '{}'.format(jid_bare)
|
||
|
|
||
|
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
|
||
|
if os.path.exists(filename) and os.path.getsize(filename) > 0:
|
||
|
jid_details = Toml.open_file_toml(filename)
|
||
|
else:
|
||
|
jid_details = await XmppUtilities.cache_jid_data(
|
||
|
jabber_id, password, jid_bare, alias=alias)
|
||
|
|
||
|
count = jid_details['count']
|
||
|
items = jid_details['items']
|
||
|
jid_info = {
|
||
|
'error' : jid_details['error'],
|
||
|
'text' : jid_details['error_text'],
|
||
|
'condition' : jid_details['error_condition']}
|
||
|
jid_kind = jid_details['kind']
|
||
|
jid_vcard = {
|
||
|
'name' : jid_details['name'],
|
||
|
'note' : jid_details['note'],
|
||
|
'type' : jid_details['image_type']}
|
||
|
messages = jid_details['messages']
|
||
|
nodes = jid_details['nodes']
|
||
|
note = jid_details['note']
|
||
|
subject = jid_details['subject']
|
||
|
title = jid_details['name']
|
||
|
|
||
|
# Group chat messages
|
||
|
# NOTE TODO
|
||
|
page_number = request.query_params.get('page', '')
|
||
|
if page_number:
|
||
|
try:
|
||
|
page_number = int(page_number)
|
||
|
ix = (page_number -1) * 10
|
||
|
except:
|
||
|
ix = 0
|
||
|
page_number = 1
|
||
|
else:
|
||
|
ix = 0
|
||
|
page_number = 1
|
||
|
messages_10 = messages[ix:][:10]
|
||
|
number_of_pages = int(len(messages) / 10)
|
||
|
if number_of_pages < len(messages) / 10: number_of_pages += 1
|
||
|
|
||
|
if jid_kind:
|
||
|
# Action and instance type
|
||
|
action, instance = XmppUtilities.set_action_instance_type(jid_kind)
|
||
|
else: # jid_info['error']
|
||
|
action = 'Contact'
|
||
|
instance = view_href = ''
|
||
|
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
|
||
|
xmpp_uri = jid_bare
|
||
|
|
||
|
# Query URI links
|
||
|
print('Query URI links')
|
||
|
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind)
|
||
|
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind)
|
||
|
view_href = XmppUtilities.get_view_href(jid_bare, jid_kind)
|
||
|
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind)
|
||
|
|
||
|
# Graphic files
|
||
|
filename, filepath, filetype, selection = Graphics.handle_photo(
|
||
|
jid_bare, jid_vcard, link_href)
|
||
|
|
||
|
#except Exception as e:
|
||
|
else:
|
||
|
exception = str(e)
|
||
|
action = 'Error'
|
||
|
title = 'Slixmpp error'
|
||
|
xmpp_uri = note = jid
|
||
|
filename = jid_bare = link_href = link_tex = node_note = \
|
||
|
node_title = number_of_pages = page_number = previous = \
|
||
|
selection = services = services_sorted = url = None
|
||
|
|
||
|
#if title == 'remote-server-timeout':
|
||
|
# raise HTTPException(status_code=408, detail='remote-server-timeout')
|
||
|
#else:
|
||
|
template_file = 'conference.xhtml'
|
||
|
template_dict = {
|
||
|
'action' : action,
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'exception' : exception,
|
||
|
'filename' : filename,
|
||
|
'jid_bare' : jid,
|
||
|
'jid_note' : note,
|
||
|
'jid_title' : title,
|
||
|
'links' : links,
|
||
|
'messages' : messages_10,
|
||
|
'node_title' : node_title,
|
||
|
'node_note' : node_note,
|
||
|
'number_of_pages' : number_of_pages,
|
||
|
'page_number' : page_number,
|
||
|
'previous' : previous,
|
||
|
'request' : request,
|
||
|
'selection' : selection,
|
||
|
'subject' : subject,
|
||
|
'title' : title,
|
||
|
'url' : request.url._url,
|
||
|
'xmpp_uri' : xmpp_uri}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/c/{jid}')
|
||
|
async def c_jid_get(request: Request, jid):
|
||
|
"""Display entries of a vCard4"""
|
||
|
jid_path = urlsplit(jid).path
|
||
|
if parseaddr(jid_path)[1] == jid_path:
|
||
|
jid_bare = jid_path.lower()
|
||
|
else:
|
||
|
jid_bare = jid
|
||
|
note = 'Jabber ID appears to be malformed'
|
||
|
|
||
|
if jid_bare == jabber_id:
|
||
|
raise HTTPException(status_code=403, detail='access-denied')
|
||
|
|
||
|
node_name_vcard4 = 'urn:xmpp:vcard4'
|
||
|
item_id_vcard4 = 'current'
|
||
|
|
||
|
#try:
|
||
|
if True:
|
||
|
entries = []
|
||
|
exception = jid_vcard = note = node_items = node_note = \
|
||
|
number_of_pages = page_number = previous = selection = \
|
||
|
title = None
|
||
|
|
||
|
filename = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name_vcard4, item_id_vcard4 + '.xml')
|
||
|
if os.path.exists(filename) and os.path.getsize(filename) > 0:
|
||
|
xml_data = Toml.open_file_xml(filename)
|
||
|
else:
|
||
|
await XmppUtilities.cache_vcard_data(
|
||
|
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
|
||
|
|
||
|
xml_data = Toml.open_file_xml(filename)
|
||
|
root_element = xml_data.getroot()
|
||
|
child_element = root_element[0]
|
||
|
#vcard_info = Syndication.extract_vcard_items(child_element)
|
||
|
vcard_info = Syndication.extract_vcard4_items(child_element)
|
||
|
|
||
|
# Action and instance type
|
||
|
action = 'Profile'
|
||
|
|
||
|
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
|
||
|
if os.path.exists(filename) and os.path.getsize(filename) > 0:
|
||
|
jid_details = Toml.open_file_toml(filename)
|
||
|
else:
|
||
|
jid_details = await XmppUtilities.cache_jid_data(
|
||
|
jabber_id, password, jid_bare, alias=alias)
|
||
|
|
||
|
# Set node name to 'urn:xmpp:microblog:0'
|
||
|
jid_kind = jid_details['kind']
|
||
|
nodes = jid_details['nodes']
|
||
|
if (jid_kind not in ('conference', 'mix', 'muc') and
|
||
|
'@' in jid_bare and
|
||
|
'urn:xmpp:microblog:0' in nodes):
|
||
|
node_name = 'urn:xmpp:microblog:0'
|
||
|
|
||
|
# Query URI links
|
||
|
print('Query URI links')
|
||
|
jid_kind = 'account'
|
||
|
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind)
|
||
|
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name_vcard4)
|
||
|
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name)
|
||
|
|
||
|
# Graphic files
|
||
|
filename, filepath, filetype, selection = Graphics.handle_photo(
|
||
|
jid_bare, jid_vcard, link_href)
|
||
|
|
||
|
#except Exception as e:
|
||
|
else:
|
||
|
exception = str(e)
|
||
|
action = 'Error'
|
||
|
title = 'Slixmpp error'
|
||
|
xmpp_uri = note = jid
|
||
|
filename = jid_bare = link_href = link_tex = node_note = \
|
||
|
node_title = number_of_pages = page_number = previous = \
|
||
|
selection = url = None
|
||
|
|
||
|
if 'fn' in vcard_info and vcard_info['fn']:
|
||
|
title = vcard_info['fn']
|
||
|
elif 'alias' in vcard_info and vcard_info['alias']:
|
||
|
title = vcard_info['alias']
|
||
|
else:
|
||
|
title = jid_bare.split('@')[0]
|
||
|
|
||
|
if 'alias' in vcard_info and vcard_info['alias']:
|
||
|
alias = vcard_info['alias']
|
||
|
else:
|
||
|
alias = jid_bare.split('@')[0]
|
||
|
|
||
|
#if title == 'remote-server-timeout':
|
||
|
# raise HTTPException(status_code=408, detail='remote-server-timeout')
|
||
|
#else:
|
||
|
template_file = 'vcard.xhtml'
|
||
|
template_dict = {
|
||
|
'action' : action,
|
||
|
'alias' : alias,
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'entries' : entries,
|
||
|
'exception' : exception,
|
||
|
'filename' : filename,
|
||
|
'jid_bare' : jid,
|
||
|
'jid_note' : note,
|
||
|
#'jid_title' : title,
|
||
|
#'node_title' : node_title,
|
||
|
'links' : links,
|
||
|
'node_name' : node_name_vcard4,
|
||
|
'number_of_pages' : number_of_pages,
|
||
|
'page_number' : page_number,
|
||
|
'previous' : previous,
|
||
|
'request' : request,
|
||
|
'selection' : selection,
|
||
|
'title' : title,
|
||
|
'url' : request.url._url,
|
||
|
'vcard_info' : vcard_info,
|
||
|
'xmpp_uri' : xmpp_uri}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/b/{jid}')
|
||
|
async def b_jid_get(request: Request, jid):
|
||
|
response = await browse_jid_node_get(request, jid, 'urn:xmpp:microblog:0')
|
||
|
return response
|
||
|
|
||
|
# TODO Change to /p/ for pubsub
|
||
|
@self.app.get('/d/{jid}/{node_name}')
|
||
|
@self.app.get('/d/{jid}/{node_name}/{item_id}')
|
||
|
async def d_jid_node_get(request: Request, jid, node_name, item_id=None):
|
||
|
response = await browse_jid_node_get(request, jid, node_name, item_id)
|
||
|
return response
|
||
|
|
||
|
async def browse_jid_node_get(request: Request, jid, node_name, item_id=None):
|
||
|
"""Browse items of a pubsub node"""
|
||
|
jid_path = urlsplit(jid).path
|
||
|
if parseaddr(jid_path)[1] == jid_path:
|
||
|
jid_bare = jid_path.lower()
|
||
|
else:
|
||
|
jid_bare = jid
|
||
|
note = 'Jabber ID appears to be malformed'
|
||
|
|
||
|
if jid_bare == jabber_id:
|
||
|
raise HTTPException(status_code=403, detail='access-denied')
|
||
|
|
||
|
#try:
|
||
|
if True:
|
||
|
exception = jid_vcard = note = node_items = node_note = \
|
||
|
number_of_pages = page_number = previous = selection = None
|
||
|
|
||
|
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
|
||
|
if os.path.exists(filename) and os.path.getsize(filename) > 0:
|
||
|
jid_details = Toml.open_file_toml(filename)
|
||
|
else:
|
||
|
jid_details = await XmppUtilities.cache_jid_data(
|
||
|
jabber_id, password, jid_bare, node_name, item_id)
|
||
|
|
||
|
# Node item IDs
|
||
|
nodes = jid_details['nodes']
|
||
|
#items = jid_details['items']
|
||
|
# for item in items:
|
||
|
# if item[1] == node_name:
|
||
|
# nodes[node_name]['title'] = item[2]
|
||
|
# break
|
||
|
supdirectory = os.path.join(directory_cache, 'xep_0060', jid_bare)
|
||
|
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
|
||
|
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name)
|
||
|
if not os.path.exists(directory):
|
||
|
os.mkdir(directory)
|
||
|
await XmppUtilities.cache_node_data(
|
||
|
jabber_id, password, jid_bare, node_name)
|
||
|
|
||
|
count = jid_details['count']
|
||
|
jid_info = {
|
||
|
'error' : jid_details['error'],
|
||
|
'text' : jid_details['error_text'],
|
||
|
'condition' : jid_details['error_condition']}
|
||
|
jid_kind = jid_details['kind']
|
||
|
jid_vcard = {
|
||
|
'name' : jid_details['name'],
|
||
|
'note' : jid_details['note'],
|
||
|
'type' : jid_details['image_type']}
|
||
|
messages = jid_details['messages']
|
||
|
#node_title = nodes[node_name]['title'] if 'title' in nodes[node_name] else jid_details['name']
|
||
|
node_title = node_name
|
||
|
note = jid_details['note']
|
||
|
#title = nodes[node_name]['title'] if node_name else jid_details['name']
|
||
|
title = jid_details['name']
|
||
|
|
||
|
#link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(
|
||
|
# jid_bare, node_name)
|
||
|
#link_text = 'Subscribe'
|
||
|
#xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
|
||
|
|
||
|
# TODO Support viewing of a single item
|
||
|
# Node items
|
||
|
entries = []
|
||
|
if item_id:
|
||
|
node_items = item_ids_10 = [item_id + '.xml']
|
||
|
else:
|
||
|
node_items = os.listdir(directory)
|
||
|
if 'urn:xmpp:avatar:metadata.xml' in node_items:
|
||
|
node_items.remove('urn:xmpp:avatar:metadata.xml')
|
||
|
page_number = request.query_params.get('page', '')
|
||
|
if page_number:
|
||
|
try:
|
||
|
page_number = int(page_number)
|
||
|
ix = (page_number -1) * 10
|
||
|
except:
|
||
|
ix = 0
|
||
|
page_number = 1
|
||
|
else:
|
||
|
ix = 0
|
||
|
page_number = 1
|
||
|
item_ids_10 = node_items[ix:][:10]
|
||
|
number_of_pages = int(len(node_items) / 10)
|
||
|
if number_of_pages < len(node_items) / 10: number_of_pages += 1
|
||
|
if node_items:
|
||
|
for item in item_ids_10:
|
||
|
filename = os.path.join(directory, item)
|
||
|
xml_data = Toml.open_file_xml(filename)
|
||
|
root_element = xml_data.getroot()
|
||
|
child_element = root_element[0]
|
||
|
entry = Syndication.extract_atom_items(child_element)
|
||
|
if entry:
|
||
|
filename_without_file_extension = item[:len(item)-4]
|
||
|
entry['id'] = filename_without_file_extension
|
||
|
entries.append(entry)
|
||
|
#if len(entries) > 10: break
|
||
|
|
||
|
if jid_kind:
|
||
|
# Action and instance type
|
||
|
action, instance = XmppUtilities.set_action_instance_type(jid_kind, node_name)
|
||
|
else: # jid_info['error']
|
||
|
action = 'Contact'
|
||
|
instance = view_href = ''
|
||
|
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
|
||
|
xmpp_uri = jid_bare
|
||
|
|
||
|
# Query URI links
|
||
|
print('Query URI links')
|
||
|
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name, item_id)
|
||
|
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
|
||
|
view_href = XmppUtilities.get_view_href(jid_bare, jid_kind, node_name)
|
||
|
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name)
|
||
|
|
||
|
node_note = xmpp_uri
|
||
|
|
||
|
# Graphic files
|
||
|
filename, filepath, filetype, selection = Graphics.handle_photo(
|
||
|
jid_bare, jid_vcard, link_href)
|
||
|
|
||
|
#except Exception as e:
|
||
|
else:
|
||
|
exception = str(e)
|
||
|
action = 'Error'
|
||
|
title = 'Slixmpp error'
|
||
|
xmpp_uri = note = jid
|
||
|
filename = jid_bare = link_href = link_tex = node_note = \
|
||
|
node_title = number_of_pages = page_number = previous = \
|
||
|
selection = url = None
|
||
|
|
||
|
#if title == 'remote-server-timeout':
|
||
|
# raise HTTPException(status_code=408, detail='remote-server-timeout')
|
||
|
#else:
|
||
|
template_file = 'node.xhtml'
|
||
|
template_dict = {
|
||
|
'action' : action,
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'entries' : entries,
|
||
|
'exception' : exception,
|
||
|
'filename' : filename,
|
||
|
'jid_bare' : jid,
|
||
|
'jid_note' : note,
|
||
|
'jid_title' : title,
|
||
|
'links' : links,
|
||
|
'node_title' : node_title,
|
||
|
'node_note' : node_note,
|
||
|
'node_name' : node_name,
|
||
|
'number_of_pages' : number_of_pages,
|
||
|
'page_number' : page_number,
|
||
|
'previous' : previous,
|
||
|
'request' : request,
|
||
|
'selection' : selection,
|
||
|
'title' : node_title,
|
||
|
'url' : request.url._url,
|
||
|
'xmpp_uri' : xmpp_uri}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/d/{jid}')
|
||
|
async def discover_jid_get(request: Request, jid):
|
||
|
"""View items of a selected service"""
|
||
|
|
||
|
jid_path = urlsplit(jid).path
|
||
|
if parseaddr(jid_path)[1] == jid_path:
|
||
|
jid_bare = jid_path.lower()
|
||
|
else:
|
||
|
jid_bare = jid
|
||
|
note = 'Jabber ID appears to be malformed'
|
||
|
|
||
|
if jid_bare == jabber_id:
|
||
|
raise HTTPException(status_code=403, detail='access-denied')
|
||
|
|
||
|
#try:
|
||
|
if True:
|
||
|
exception = note = selection = services_sorted = None
|
||
|
title = 'Services'
|
||
|
link_href = xmpp_uri = jid_bare
|
||
|
link_text = 'Reload'
|
||
|
|
||
|
# Start an XMPP instance and retrieve information
|
||
|
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
|
||
|
xmpp_instance.connect()
|
||
|
|
||
|
# JID services
|
||
|
action = 'Discover'
|
||
|
jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
|
||
|
iq = jid_info['iq']
|
||
|
if iq:
|
||
|
jid_kind = jid_info['kind']
|
||
|
iq_disco_info = iq['disco_info']
|
||
|
for identity in iq_disco_info['identities']:
|
||
|
if jid_kind == identity[0] and identity[3]:
|
||
|
note = identity[3]
|
||
|
if not note: note = jid_bare
|
||
|
jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
|
||
|
iq = jid_items['iq']
|
||
|
iq_disco_items = iq['disco_items']
|
||
|
iq_disco_items_items = iq_disco_items['items']
|
||
|
services = {}
|
||
|
#services_sorted = {}
|
||
|
category = 'unsorted'
|
||
|
for item in iq_disco_items_items:
|
||
|
jid_bare = item[0]
|
||
|
if len(iq_disco_items_items) > 20 or jid_kind and jid_kind in ('pubsub'):
|
||
|
identity = sub_jid_info = sub_jid_info_iq = ''
|
||
|
if jid_kind and jid_kind in ('conference', 'mix', 'muc'):
|
||
|
category = 'conference'
|
||
|
if jid_kind and jid_kind in ('pubsub'):
|
||
|
category = 'pubsub'
|
||
|
else:
|
||
|
sub_jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
|
||
|
sub_jid_info_iq = sub_jid_info['iq']
|
||
|
try:
|
||
|
for identity_item in sub_jid_info_iq['disco_info']['identities']:
|
||
|
identity = identity_item
|
||
|
break
|
||
|
if sub_jid_info_iq:
|
||
|
category = identity[0] if (identity, list) and identity[0] else 'other'
|
||
|
except:
|
||
|
identity = None
|
||
|
category = 'unavailable'
|
||
|
|
||
|
sub_jid_kind = sub_jid_info['kind'] if 'kind' in sub_jid_info else None
|
||
|
if category not in services: services[category] = []
|
||
|
|
||
|
services[category].append(
|
||
|
{'identity' : identity,
|
||
|
'info' : sub_jid_info,
|
||
|
'jid' : jid_bare,
|
||
|
'kind' : sub_jid_kind,
|
||
|
'name' : item[2] or item[1] or item[0],
|
||
|
'node' : item[1]})
|
||
|
|
||
|
services_sorted = {k: v for k, v in services.items() if k != 'unavailable'}
|
||
|
if 'unavailable' in services: services_sorted['unavailable'] = services['unavailable']
|
||
|
else:
|
||
|
message = '{}: {} (XEP-0030)'.format(jid_info['condition'], jid_info['text'])
|
||
|
services = services_sorted = None
|
||
|
|
||
|
xmpp_instance.disconnect()
|
||
|
|
||
|
#except Exception as e:
|
||
|
else:
|
||
|
exception = str(e)
|
||
|
action = 'Error'
|
||
|
title = 'Slixmpp error'
|
||
|
xmpp_uri = note = jid
|
||
|
filename = jid_bare = link_href = link_text = selection = services = services_sorted = url = None
|
||
|
|
||
|
#if title == 'remote-server-timeout':
|
||
|
# raise HTTPException(status_code=408, detail='remote-server-timeout')
|
||
|
#else:
|
||
|
template_file = 'disco.xhtml'
|
||
|
template_dict = {
|
||
|
'action' : action,
|
||
|
'filename' : 'default.svg',
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'exception' : exception,
|
||
|
'jid_bare' : jid,
|
||
|
'note' : note,
|
||
|
'request' : request,
|
||
|
'services' : services_sorted,
|
||
|
'title' : title,
|
||
|
'url' : request.url._url,
|
||
|
'link_href' : link_href,
|
||
|
'link_text' : link_text,
|
||
|
'xmpp_uri' : xmpp_uri}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/j/{jid}/{node_name}')
|
||
|
async def jid_node_get(request: Request, jid, node_name):
|
||
|
response = await main_jid_node_get(request, jid, node_name)
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/j/{jid}')
|
||
|
async def jid_get(request: Request, jid):
|
||
|
node_name = request.query_params.get('node', '')
|
||
|
if node_name:
|
||
|
response = RedirectResponse(url='/j/{}/{}'.format(jid, node_name))
|
||
|
else:
|
||
|
response = await main_jid_node_get(request, jid)
|
||
|
return response
|
||
|
|
||
|
async def main_jid_node_get(request: Request, jid, node_name=None):
|
||
|
|
||
|
jid_bare = jid
|
||
|
jid_path = urlsplit(jid).path
|
||
|
if parseaddr(jid_path)[1] == jid_path:
|
||
|
jid_bare = jid_path.lower()
|
||
|
else:
|
||
|
jid_bare = jid
|
||
|
note = 'Jabber ID appears to be malformed'
|
||
|
|
||
|
if jid_bare == jabber_id:
|
||
|
raise HTTPException(status_code=403, detail='access-denied')
|
||
|
|
||
|
#try:
|
||
|
if True:
|
||
|
action = alias = count_item = count_message = exception = \
|
||
|
instance = jid_vcard = jid_info = link_href = message = note = \
|
||
|
selection = title = vcard4 = view_href = xmpp_uri = None
|
||
|
#node_name = 'urn:xmpp:microblog:0'
|
||
|
|
||
|
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
|
||
|
if os.path.exists(filename) and os.path.getsize(filename) > 0:
|
||
|
jid_details = Toml.open_file_toml(filename)
|
||
|
else:
|
||
|
jid_details = await XmppUtilities.cache_jid_data(
|
||
|
jabber_id, password, jid_bare, node_name, alias=alias)
|
||
|
|
||
|
# Set node name to 'urn:xmpp:microblog:0'
|
||
|
jid_kind = jid_details['kind']
|
||
|
nodes = jid_details['nodes']
|
||
|
count_message = jid_details['messages']
|
||
|
if (jid_kind not in ('conference', 'mix', 'muc') and
|
||
|
'@' in jid_bare and
|
||
|
not node_name and
|
||
|
'urn:xmpp:microblog:0' in nodes):
|
||
|
node_name = 'urn:xmpp:microblog:0'
|
||
|
|
||
|
items = jid_details['items']
|
||
|
jid_info = {
|
||
|
'error' : jid_details['error'],
|
||
|
'text' : jid_details['error_text'],
|
||
|
'condition' : jid_details['error_condition']}
|
||
|
jid_vcard = {
|
||
|
'name' : jid_details['name'],
|
||
|
'note' : jid_details['note'],
|
||
|
'type' : jid_details['image_type']}
|
||
|
messages = jid_details['messages']
|
||
|
#note = nodes[node_name]['title'] if node_name in nodes else jid_details['note']
|
||
|
#note = jid_details['note']
|
||
|
|
||
|
# vCard4
|
||
|
node_name_vcard4 = 'urn:xmpp:vcard4'
|
||
|
item_id_vcard4 = 'current'
|
||
|
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name_vcard4)
|
||
|
filename = os.path.join(directory, item_id_vcard4 + '.xml')
|
||
|
if os.path.exists(filename) and os.path.getsize(filename) > 0:
|
||
|
xml_data = Toml.open_file_xml(filename)
|
||
|
root_element = xml_data.getroot()
|
||
|
child_element = root_element[0]
|
||
|
#vcard_info = Syndication.extract_vcard_items(child_element)
|
||
|
vcard_info = Syndication.extract_vcard4_items(child_element)
|
||
|
title = vcard_info['fn']
|
||
|
alias = vcard_info['alias']
|
||
|
#note = vcard_info['note']
|
||
|
else:
|
||
|
await XmppUtilities.cache_vcard_data(
|
||
|
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
|
||
|
|
||
|
if os.path.exists(filename) and os.path.getsize(filename) > 0:
|
||
|
vcard4 = True
|
||
|
|
||
|
# Node item IDs
|
||
|
supdirectory = os.path.join(directory_cache, 'xep_0060', jid_bare)
|
||
|
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
|
||
|
if node_name:
|
||
|
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name)
|
||
|
if not os.path.exists(directory):
|
||
|
os.mkdir(directory)
|
||
|
await XmppUtilities.cache_node_data(
|
||
|
jabber_id, password, jid_bare, node_name)
|
||
|
|
||
|
# JID or node items
|
||
|
if jid_kind in ('mix', 'muc', 'conference', 'server'):
|
||
|
count_item = jid_details['count']
|
||
|
elif jid_kind in ('account', 'pubsub'):
|
||
|
node_items = os.listdir(directory)
|
||
|
if 'urn:xmpp:avatar:metadata.xml' in node_items:
|
||
|
node_items.remove('urn:xmpp:avatar:metadata.xml')
|
||
|
count_item = len(node_items)
|
||
|
|
||
|
# if ('@' in jid_bare and
|
||
|
# 'urn:xmpp:microblog:0' not in nodes and
|
||
|
# jid_kind not in ('conference', 'mix', 'muc')):
|
||
|
# count_item = 0
|
||
|
# else:
|
||
|
# count_item = len(node_items)
|
||
|
|
||
|
if jid_kind == 'pubsub' and node_name:
|
||
|
items = jid_details['items']
|
||
|
for item in items:
|
||
|
if item[1] == node_name:
|
||
|
#nodes[node_name]['title'] = item[2]
|
||
|
title = item[2]
|
||
|
break
|
||
|
if not title: title = node_name
|
||
|
else:
|
||
|
title = jid_details['name']
|
||
|
|
||
|
# TODO Consider also the existence of a node /j/pubsub.movim.eu/i2p
|
||
|
if jid_kind:
|
||
|
# Action and instance type
|
||
|
action, instance = XmppUtilities.set_action_instance_type(jid_kind, node_name)
|
||
|
view_href = XmppUtilities.get_view_href(jid_bare, jid_kind, node_name)
|
||
|
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name)
|
||
|
else: # jid_info['error']
|
||
|
action = 'Contact'
|
||
|
instance = view_href = ''
|
||
|
if jid_info['condition']: message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
|
||
|
xmpp_uri = jid_bare
|
||
|
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
|
||
|
|
||
|
# Query URI links
|
||
|
print('Query URI links')
|
||
|
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name)
|
||
|
|
||
|
# Graphic files
|
||
|
filename, filepath, filetype, selection = Graphics.handle_photo(
|
||
|
jid_bare, jid_vcard, link_href)
|
||
|
|
||
|
#except Exception as e:
|
||
|
else:
|
||
|
exception = str(e)
|
||
|
print(exception)
|
||
|
action = 'Error'
|
||
|
title = 'Slixmpp error'
|
||
|
xmpp_uri = jid
|
||
|
alias = count_item = count_message = filename = jid_bare = \
|
||
|
jid_vcard = jid_kind = links = message = selection = url = \
|
||
|
vcard4 = None
|
||
|
|
||
|
#note_500 = note[:500]
|
||
|
#note = note_500 + ' …' if note_500 < note else note_500
|
||
|
|
||
|
# NOTE Handling of variables "title" and "note" in case of '/j/{jid}/{node_name}' is confusing.
|
||
|
# TODO Add new keys that are of 'node' and be utilized for nodes, instead of reusing a variable for several roles.
|
||
|
# FIXME If no title be provided to 'node name', use 'node name' itself as title (to be done at XmppUtilities.cache_jid_data).
|
||
|
|
||
|
template_file = 'jid.xhtml'
|
||
|
template_dict = {
|
||
|
'action' : action,
|
||
|
'alias' : alias,
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'count_item' : count_item,
|
||
|
'count_message' : count_message,
|
||
|
'instance' : instance,
|
||
|
'exception' : exception,
|
||
|
'filename' : filename,
|
||
|
'jid_bare' : jid_bare,
|
||
|
'jid_kind' : jid_kind,
|
||
|
'links' : links,
|
||
|
'message' : message,
|
||
|
'news_client' : news_client,
|
||
|
'note' : note, # TODO node_note or title of PubSub JID
|
||
|
'request' : request,
|
||
|
'selection' : selection,
|
||
|
'title' : title, # TODO node_title
|
||
|
'url' : request.url._url,
|
||
|
'vcard4' : vcard4,
|
||
|
'view_href' : view_href,
|
||
|
'xmpp_uri' : xmpp_uri}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/selection')
|
||
|
async def selection_get(request: Request):
|
||
|
filename = os.path.join(directory_data, 'systems.toml')
|
||
|
software = Toml.open_file_toml(filename)['systems']
|
||
|
template_file = 'software.xhtml'
|
||
|
template_dict = {
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'request' : request,
|
||
|
'software' : software,
|
||
|
'url' : request.url._url}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
#@self.app.get('/download/select')
|
||
|
#async def download_select_get(request, software=None):
|
||
|
|
||
|
@self.app.get('/download/{software}')
|
||
|
async def download_software_get(request: Request, software):
|
||
|
response = await download_get(request, featured=True, software=software)
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/download/{software}/all')
|
||
|
async def download_software_all_get(request: Request, software):
|
||
|
response = await download_get(request, featured=False, software=software)
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/download')
|
||
|
async def download_get(request: Request, featured=True, software=None):
|
||
|
|
||
|
# TODO
|
||
|
# Fearured clients '/download/{software}'
|
||
|
# All clients '/download/{software}/all'
|
||
|
# Select software '/download/select'
|
||
|
|
||
|
skipped = False
|
||
|
|
||
|
if not software:
|
||
|
user_agent = request.headers.get("user-agent")
|
||
|
user_agent_lower = user_agent.lower()
|
||
|
match user_agent_lower:
|
||
|
case _ if 'bsd' in user_agent_lower:
|
||
|
software = 'bsd'
|
||
|
case _ if 'linux' in user_agent_lower:
|
||
|
software = 'linux'
|
||
|
case _ if 'haiku' in user_agent_lower:
|
||
|
software = 'haiku'
|
||
|
case _ if 'android' in user_agent_lower:
|
||
|
software = 'android'
|
||
|
case _ if 'reactos' in user_agent_lower or 'windows' in user_agent_lower:
|
||
|
software = 'windows'
|
||
|
case _ if 'ios' in user_agent_lower or 'macos' in user_agent_lower:
|
||
|
software = 'apple'
|
||
|
|
||
|
name = software.title()
|
||
|
if software == 'bsd': name = 'BSD'
|
||
|
if software == 'posix': name = 'POSIX'
|
||
|
if software == 'ubports': name = 'UBports'
|
||
|
if name.endswith('os'): name = name.replace('os', 'OS')
|
||
|
|
||
|
filename_clients = os.path.join(directory_data, 'clients.toml')
|
||
|
clients = Toml.open_file_toml(filename_clients)
|
||
|
client_selection = []
|
||
|
clients_software = 0
|
||
|
for client in clients:
|
||
|
if software in clients[client]:
|
||
|
clients_software += 1
|
||
|
if featured and 'featured' not in clients[client]['properties']:
|
||
|
skipped = True
|
||
|
continue
|
||
|
client_selected = {
|
||
|
'name' : clients[client]['title'],
|
||
|
'about' : clients[client]['about'],
|
||
|
'href' : clients[client][software],
|
||
|
'iden' : client,
|
||
|
'properties' : clients[client]['properties'],
|
||
|
'resources' : clients[client]['resources'] if 'resources' in clients[client] else ''}
|
||
|
client_selection.append(client_selected)
|
||
|
|
||
|
skipped = False if len(client_selection) == clients_software else True
|
||
|
|
||
|
template_file = 'download.xhtml'
|
||
|
template_dict = {
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'client_selection' : client_selection,
|
||
|
'featured' : featured,
|
||
|
'skipped' : skipped,
|
||
|
'request' : request,
|
||
|
'software' : software,
|
||
|
'title' : name,
|
||
|
'url' : request.url._url}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
@self.app.exception_handler(403)
|
||
|
def access_denied_exception_handler(request: Request, exc: HTTPException):
|
||
|
action = 'Warning'
|
||
|
title = 'Access Denied'
|
||
|
return result_get(request, action, title)
|
||
|
|
||
|
@self.app.exception_handler(404)
|
||
|
def not_found_exception_handler(request: Request, exc: HTTPException):
|
||
|
action = 'Warning'
|
||
|
title = 'Not Found'
|
||
|
return result_get(request, action, title)
|
||
|
|
||
|
@self.app.exception_handler(500)
|
||
|
def internal_error_exception_handler(request: Request, exc: HTTPException):
|
||
|
action = 'Error'
|
||
|
title = 'Internal Server Error'
|
||
|
return result_get(request, action, title)
|
||
|
|
||
|
@self.app.exception_handler(504)
|
||
|
def time_out_exception_handler(request: Request, exc: HTTPException):
|
||
|
action = 'Warning'
|
||
|
title = 'Time Out'
|
||
|
return result_get(request, action, title)
|
||
|
|
||
|
def result_get(request: Request, action: str, title: str):
|
||
|
template_file = 'result.xhtml'
|
||
|
template_dict = {
|
||
|
'action' : action,
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'request' : request,
|
||
|
'title' : title,
|
||
|
'url' : request.url._url}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|
||
|
|
||
|
@self.app.get('/')
|
||
|
async def main_get(request: Request):
|
||
|
jabber_id = request.query_params.get('jid', '')
|
||
|
if jabber_id:
|
||
|
response = RedirectResponse(url='/j/' + jabber_id)
|
||
|
else:
|
||
|
template_file = 'main.xhtml'
|
||
|
template_dict = {
|
||
|
'brand_name' : brand_name,
|
||
|
'brand_site' : brand_site,
|
||
|
'chat_client' : chat_client,
|
||
|
'request' : request,
|
||
|
'url' : request.url._url}
|
||
|
response = templates.TemplateResponse(template_file, template_dict)
|
||
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
||
|
return response
|