10215d7fca
Thank you to Marvin W.
2031 lines
83 KiB
Python
2031 lines
83 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from asyncio import TimeoutError
|
|
from datetime import datetime
|
|
from dateutil import parser
|
|
from email.utils import parseaddr
|
|
from fastapi import FastAPI, Form, HTTPException, Request, Response
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
import glob
|
|
#import logging
|
|
#from os.path import getsize, exists
|
|
import os
|
|
import qrcode
|
|
import random
|
|
import re
|
|
from slixmpp import ClientXMPP, stanza
|
|
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
|
|
from starlette.responses import RedirectResponse
|
|
#import time
|
|
import tomli_w
|
|
from urllib.parse import urlsplit
|
|
import uvicorn
|
|
import xml.etree.ElementTree as ET
|
|
|
|
# Optional dependencies. OpenCV and NumPy are only needed for the dynamic
# background; a missing package is reported instead of aborting start-up.
# Only ImportError is caught so that unrelated failures are not silenced.
try:
    import cv2
except ImportError:
    print('OpenCV (cv2) is required for dynamic background.')

try:
    import numpy
except ImportError:
    print('NumPy (numpy) is required for dynamic background.')

# Prefer the standard-library TOML parser and fall back to the "tomli"
# package, bound to the same name, when it is not available.
try:
    import tomllib
except ImportError:
    import tomli as tomllib
|
|
|
|
class XmppInstance(ClientXMPP):
    """XMPP client with the discovery, MUC, vcard-temp and PubSub plugins.

    The instance keeps the ``jid_bare`` value handed in by the caller and
    sends presence as soon as the session has started.
    """

    def __init__(self, jid, password, jid_bare):
        super().__init__(jid, password)

        self.jid_bare = jid_bare

        # Registration order is kept as in the plugin list below.
        for plugin_name in (
                'xep_0030',  # XEP-0030: Service Discovery
                'xep_0045',  # XEP-0045: Multi-User Chat
                'xep_0054',  # XEP-0054: vcard-temp
                'xep_0060',  # XEP-0060: Publish-Subscribe
        ):
            self.register_plugin(plugin_name)

        self.add_event_handler("session_start", self.on_session_start)

    async def on_session_start(self, event):
        """Handle the "session_start" event by sending presence."""
        self.send_presence()
        #self.disconnect()
|
|
|
|
class HttpInstance:
|
|
def __init__(self, configuration):
|
|
|
|
account = configuration['account']
|
|
jabber_id = account['xmpp']
|
|
password = account['pass']
|
|
alias = account['alias']
|
|
|
|
brand = configuration['brand']
|
|
brand_name = brand['name']
|
|
brand_site = brand['site']
|
|
chat_client = brand['chat']
|
|
news_client = brand['news']
|
|
|
|
self.app = FastAPI()
|
|
templates = Jinja2Templates(directory='xhtml')
|
|
|
|
# TODO
|
|
# 1) Mount at the same mountpoint /img.
|
|
# 2) Image filename to be constant, i.e. /img/photo.png and /img/qr.png.
|
|
self.app.mount('/photo', StaticFiles(directory='photo'), name='photo')
|
|
self.app.mount('/qr', StaticFiles(directory='qr'), name='qr')
|
|
self.app.mount('/css', StaticFiles(directory='css'), name='css')
|
|
self.app.mount('/img', StaticFiles(directory='img'), name='img')
|
|
|
|
# @self.app.get('/favicon.ico', include_in_schema=False)
|
|
# def favicon_get():
|
|
# return FileResponse('graphic/hermes.ico')
|
|
|
|
# @self.app.get('/hermes.svg')
|
|
# def logo_get():
|
|
# return FileResponse('graphic/hermes.svg')
|
|
|
|
@self.app.get('/v/{jid}')
async def view_jid(request: Request, jid):
    """View recent messages of a conference.

    Renders 'conference.xhtml' with one page (ten entries) of the messages
    stored in the details of ``jid``. The page is chosen by the 1-based
    ``page`` query parameter; a missing, non-numeric or non-positive value
    selects the first page.

    Raises:
        HTTPException: 403 when ``jid`` is the account of this service.
    """
    jid_path = urlsplit(jid).path
    if parseaddr(jid_path)[1] == jid_path:
        jid_bare = jid_path.lower()
    else:
        # Jabber ID appears to be malformed; it is used as given.
        jid_bare = jid

    if jid_bare == jabber_id:
        raise HTTPException(status_code=403, detail='access-denied')

    node_name = 'urn:xmpp:microblog:0'

    # Read the details file when it exists and is not empty, otherwise
    # have the details retrieved.
    filename = 'details/{}.toml'.format(jid_bare)
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        jid_details = Data.open_file_toml(filename)
    else:
        jid_details = await FileUtilities.cache_jid_data(
            jabber_id, password, jid_bare, node_name, alias=alias)

    jid_kind = jid_details['kind']
    jid_vcard = {
        'name' : jid_details['name'],
        'note' : jid_details['note'],
        'type' : jid_details['image_type']}
    messages = jid_details['messages']
    note = jid_details['note']
    subject = jid_details['subject']
    title = jid_details['name']

    # Group chat messages, ten per page. The page number is clamped to 1
    # so that "page=0" or a negative page cannot yield a negative offset,
    # which would silently slice from the end of the list.
    try:
        page_number = max(int(request.query_params.get('page', '')), 1)
    except ValueError:
        page_number = 1
    ix = (page_number - 1) * 10
    messages_10 = messages[ix:ix + 10]
    number_of_pages = -(-len(messages) // 10) # Ceiling division

    if jid_kind:
        # Action and instance type
        action, _instance = XmppUtilities.set_action_instance_type(jid_kind, node_name)
    else: # jid_details['error']
        action = 'Contact'

    # Query URI links
    print('Query URI links')
    links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name)
    link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
    xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name)

    # Graphic files
    filename, _filepath, _filetype, _selection = FileUtilities.handle_photo(
        jid_bare, jid_vcard, link_href)

    template_file = 'conference.xhtml'
    template_dict = {
        'action' : action,
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'exception' : None,
        'filename' : filename,
        'jid_bare' : jid,
        'jid_note' : note,
        'jid_title' : title,
        'links' : links,
        'messages' : messages_10,
        'node_title' : None,
        'node_note' : None,
        'node_name' : node_name,
        'number_of_pages' : number_of_pages,
        'page_number' : page_number,
        'previous' : None,
        'request' : request,
        'subject' : subject,
        'title' : title,
        'url' : str(request.url),
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/c/{jid}')
async def c_jid_get(request: Request, jid):
    """Display entries of a vCard4.

    The vCard4 item is read from
    'xep_0060/<jid>/urn:xmpp:vcard4/current.xml'; when that file is
    missing or empty it is requested first. The page 'vcard.xhtml' is
    rendered with the extracted entries.

    Raises:
        HTTPException: 403 when ``jid`` is the account of this service;
            404 when no vCard4 item is available after the request.
    """
    jid_path = urlsplit(jid).path
    if parseaddr(jid_path)[1] == jid_path:
        jid_bare = jid_path.lower()
    else:
        # Jabber ID appears to be malformed; it is used as given.
        jid_bare = jid

    if jid_bare == jabber_id:
        raise HTTPException(status_code=403, detail='access-denied')

    node_name_vcard4 = 'urn:xmpp:vcard4'
    item_id_vcard4 = 'current'

    directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name_vcard4)
    filename = directory + item_id_vcard4 + '.xml'

    def is_cached():
        return os.path.exists(filename) and os.path.getsize(filename) > 0

    if not is_cached():
        await FileUtilities.cache_vcard_data(
            jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
        # Nothing was stored (e.g. the JID publishes no vCard4); answer
        # with "Not Found" instead of failing while parsing a missing file.
        if not is_cached():
            raise HTTPException(status_code=404, detail='item-not-found')

    xml_data = Data.open_file_xml(filename)
    child_element = xml_data.getroot()[0]
    #vcard_info = Syndication.extract_vcard_items(child_element)
    vcard_info = Syndication.extract_vcard4_items(child_element)

    # Action and instance type
    action = 'Profile'

    # Query URI links
    print('Query URI links')
    jid_kind = 'account'
    xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name_vcard4)

    # Title: full name, then alias, then the local part of the Jabber ID.
    # Alias: alias of the vCard, then the local part of the Jabber ID.
    localpart = jid_bare.split('@')[0]
    vcard_fn = vcard_info['fn'] if 'fn' in vcard_info else None
    vcard_alias = vcard_info['alias'] if 'alias' in vcard_info else None
    alias = vcard_alias or localpart
    title = vcard_fn or vcard_alias or localpart

    template_file = 'vcard.xhtml'
    template_dict = {
        'action' : action,
        'alias' : alias,
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'entries' : [],
        'exception' : None,
        'jid_bare' : jid,
        'jid_note' : None,
        'node_name' : node_name_vcard4,
        'number_of_pages' : None,
        'page_number' : None,
        'previous' : None,
        'request' : request,
        'title' : title,
        'url' : str(request.url),
        'vcard_info' : vcard_info,
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/b/{jid}')
async def b_jid_get(request: Request, jid):
    """Browse node 'urn:xmpp:microblog:0' of the given Jabber ID."""
    return await browse_jid_node_get(request, jid, 'urn:xmpp:microblog:0')
|
|
|
|
# TODO Change to /p/ for pubsub
|
|
@self.app.get('/d/{jid}/{node_name}')
@self.app.get('/d/{jid}/{node_name}/{item_id}')
async def d_jid_node_get(request: Request, jid, node_name, item_id=None):
    """Browse a node of a Jabber ID, optionally narrowed to one item.

    The ``item_id`` path parameter is forwarded to the browse handler; it
    is None for the route without an item segment.
    """
    # Forward the requested item ID (it used to be replaced by None).
    return await browse_jid_node_get(request, jid, node_name, item_id=item_id)
|
|
|
|
async def browse_jid_node_get(request: Request, jid, node_name, item_id=None):
    """Browse items of a pubsub node.

    Item files are read from 'xep_0060/<jid>/<node_name>/'; when that
    directory does not exist it is created and the node is cached first.
    Ten items are rendered per page into 'node.xhtml'. The page is chosen
    by the 1-based ``page`` query parameter; a missing, non-numeric or
    non-positive value selects the first page.

    Raises:
        HTTPException: 403 when ``jid`` is the account of this service.
    """
    jid_path = urlsplit(jid).path
    if parseaddr(jid_path)[1] == jid_path:
        jid_bare = jid_path.lower()
    else:
        # Jabber ID appears to be malformed; it is used as given.
        jid_bare = jid

    if jid_bare == jabber_id:
        raise HTTPException(status_code=403, detail='access-denied')

    filename = 'details/{}.toml'.format(jid_bare)
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        jid_details = Data.open_file_toml(filename)
    else:
        jid_details = await FileUtilities.cache_jid_data(
            jabber_id, password, jid_bare, node_name, item_id)

    # Node item IDs
    directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
    if not os.path.exists(directory):
        # makedirs also creates missing parent directories.
        os.makedirs(directory, exist_ok=True)
        await FileUtilities.cache_node_data(
            jabber_id, password, jid_bare, node_name)

    jid_kind = jid_details['kind']
    jid_vcard = {
        'name' : jid_details['name'],
        'note' : jid_details['note'],
        'type' : jid_details['image_type']}
    node_title = node_name
    note = jid_details['note']
    title = jid_details['name']

    # Node items
    node_items = os.listdir(directory)
    if 'urn:xmpp:avatar:metadata.xml' in node_items:
        node_items.remove('urn:xmpp:avatar:metadata.xml')

    # The page number is clamped to 1 so that "page=0" or a negative page
    # cannot yield a negative offset, which would slice from the end.
    try:
        page_number = max(int(request.query_params.get('page', '')), 1)
    except ValueError:
        page_number = 1
    ix = (page_number - 1) * 10
    item_ids_10 = node_items[ix:ix + 10]
    number_of_pages = -(-len(node_items) // 10) # Ceiling division

    entries = []
    for item in item_ids_10:
        xml_data = Data.open_file_xml(directory + item)
        child_element = xml_data.getroot()[0]
        entry = Syndication.extract_atom_items(child_element)
        if entry:
            # The entry ID is the filename without its '.xml' extension.
            entry['id'] = item.removesuffix('.xml')
            entries.append(entry)

    if jid_kind:
        # Action and instance type
        action, _instance = XmppUtilities.set_action_instance_type(jid_kind, node_name)
    else: # jid_details['error']
        action = 'Contact'

    # Query URI links
    print('Query URI links')
    links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name, item_id)
    link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
    xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name)

    node_note = xmpp_uri

    # Graphic files
    filename, _filepath, _filetype, _selection = FileUtilities.handle_photo(
        jid_bare, jid_vcard, link_href)

    template_file = 'node.xhtml'
    template_dict = {
        'action' : action,
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'entries' : entries,
        'exception' : None,
        'filename' : filename,
        'jid_bare' : jid,
        'jid_note' : note,
        'jid_title' : title,
        'links' : links,
        'node_title' : node_title,
        'node_note' : node_note,
        'node_name' : node_name,
        'number_of_pages' : number_of_pages,
        'page_number' : page_number,
        'previous' : None,
        'request' : request,
        'title' : node_title,
        'url' : str(request.url),
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/d/{jid}')
async def discover_jid_get(request: Request, jid):
    """View items of a selected service.

    Queries service discovery information and items of ``jid`` and groups
    the items by category for 'disco.xhtml'. Each item is queried on its
    own for its identity unless there are more than 20 items or the
    service is of kind 'pubsub'. The 'unavailable' category is listed
    last.

    Raises:
        HTTPException: 403 when ``jid`` is the account of this service.
    """
    jid_path = urlsplit(jid).path
    if parseaddr(jid_path)[1] == jid_path:
        jid_bare = jid_path.lower()
    else:
        # Jabber ID appears to be malformed; it is used as given.
        jid_bare = jid

    if jid_bare == jabber_id:
        raise HTTPException(status_code=403, detail='access-denied')

    note = services_sorted = None
    action = 'Discover'
    title = 'Services'
    link_href = xmpp_uri = jid_bare
    link_text = 'Reload'

    # Start an XMPP instance and retrieve information
    xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
    xmpp_instance.connect()
    try:
        # JID services
        jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
        iq = jid_info['iq']
        if iq:
            jid_kind = jid_info['kind']
            for parent_identity in iq['disco_info']['identities']:
                if jid_kind == parent_identity[0] and parent_identity[3]:
                    note = parent_identity[3]
            if not note: note = jid_bare

            jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
            items_iq = jid_items['iq']
            # NOTE(review): a falsy 'iq' is presumed to mean that the items
            # query failed; an empty listing is shown then -- confirm.
            disco_items = items_iq['disco_items']['items'] if items_iq else []

            # Compare by equality: "in ('pubsub')" is a substring test
            # against the string 'pubsub', not a tuple membership test.
            is_pubsub = jid_kind == 'pubsub'
            skip_lookup = len(disco_items) > 20 or is_pubsub

            services = {}
            category = 'unsorted'
            for item in disco_items:
                item_jid = item[0]
                if skip_lookup:
                    identity = sub_jid_info = ''
                    if jid_kind in ('conference', 'mix', 'muc'):
                        category = 'conference'
                    if is_pubsub:
                        category = 'pubsub'
                else:
                    sub_jid_info = await XmppXep0030.get_jid_info(xmpp_instance, item_jid)
                    sub_jid_info_iq = sub_jid_info['iq']
                    try:
                        # First identity, or None when there is none (a
                        # stale identity of another JID must not be used).
                        identity = next(
                            iter(sub_jid_info_iq['disco_info']['identities']), None)
                        category = identity[0] if identity and identity[0] else 'other'
                    except Exception:
                        # Best effort: no usable discovery result.
                        identity = None
                        category = 'unavailable'

                sub_jid_kind = sub_jid_info['kind'] if 'kind' in sub_jid_info else None
                services.setdefault(category, []).append(
                    {'identity' : identity,
                     'info' : sub_jid_info,
                     'jid' : item_jid,
                     'kind' : sub_jid_kind,
                     'name' : item[2] or item[1] or item[0],
                     'node' : item[1]})

            services_sorted = {k: v for k, v in services.items() if k != 'unavailable'}
            if 'unavailable' in services: services_sorted['unavailable'] = services['unavailable']
    finally:
        # Close the connection even when a query raises.
        xmpp_instance.disconnect()

    template_file = 'disco.xhtml'
    template_dict = {
        'action' : action,
        'filename' : 'default.svg',
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'exception' : None,
        'jid_bare' : jid,
        'note' : note,
        'request' : request,
        'services' : services_sorted,
        'title' : title,
        'url' : str(request.url),
        'link_href' : link_href,
        'link_text' : link_text,
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/j/{jid}/{node_name}')
async def jid_node_get(request: Request, jid, node_name):
    """Serve the main page for a node of a Jabber ID."""
    return await main_jid_node_get(request, jid, node_name)
|
|
|
|
@self.app.get('/j/{jid}')
async def jid_get(request: Request, jid):
    """Serve the main page of a Jabber ID.

    A non-empty ``node`` query parameter redirects to '/j/<jid>/<node>'.
    """
    requested_node = request.query_params.get('node', '')
    if not requested_node:
        return await main_jid_node_get(request, jid)
    return RedirectResponse(url='/j/{}/{}'.format(jid, requested_node))
|
|
|
|
async def main_jid_node_get(request: Request, jid, node_name=None):
    """Render 'jid.xhtml' for a Jabber ID, optionally for one of its nodes.

    Details are read from 'details/<jid>.toml' or retrieved when that file
    is missing or empty. When no node is given, the JID contains '@', is
    not a conference and lists 'urn:xmpp:microblog:0' among its nodes,
    that node is selected. Values of a vCard4 item, when available,
    override title, alias and note. The note is shortened to 500
    characters.

    Raises:
        HTTPException: 403 when ``jid`` is the account of this service.
    """
    jid_path = urlsplit(jid).path
    if parseaddr(jid_path)[1] == jid_path:
        jid_bare = jid_path.lower()
    else:
        # Jabber ID appears to be malformed; it is used as given.
        jid_bare = jid

    if jid_bare == jabber_id:
        raise HTTPException(status_code=403, detail='access-denied')

    # The alias of the vCard is kept under its own name; assigning to
    # "alias" here would shadow the configured account alias and pass None
    # to cache_jid_data below.
    message = title = vcard_alias = vcard4 = None

    filename = 'details/{}.toml'.format(jid_bare)
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        jid_details = Data.open_file_toml(filename)
    else:
        jid_details = await FileUtilities.cache_jid_data(
            jabber_id, password, jid_bare, node_name, alias=alias)

    # Set node name to 'urn:xmpp:microblog:0'
    jid_kind = jid_details['kind']
    nodes = jid_details['nodes']
    count_message = jid_details['messages']
    if (jid_kind not in ('conference', 'mix', 'muc') and
        '@' in jid_bare and
        not node_name and
        'urn:xmpp:microblog:0' in nodes):
        node_name = 'urn:xmpp:microblog:0'

    jid_info = {
        'error' : jid_details['error'],
        'text' : jid_details['error_text'],
        'condition' : jid_details['error_condition']}
    jid_vcard = {
        'name' : jid_details['name'],
        'note' : jid_details['note'],
        'type' : jid_details['image_type']}
    note = jid_details['note']

    # vCard4
    node_name_vcard4 = 'urn:xmpp:vcard4'
    item_id_vcard4 = 'current'
    vcard_filename = 'xep_0060/{}/{}/{}.xml'.format(
        jid_bare, node_name_vcard4, item_id_vcard4)

    def vcard_is_cached():
        return os.path.exists(vcard_filename) and os.path.getsize(vcard_filename) > 0

    # Fetch first and read afterwards, so that a vCard4 cached by this very
    # request is already reflected in the page.
    if not vcard_is_cached():
        await FileUtilities.cache_vcard_data(
            jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
    if vcard_is_cached():
        vcard4 = True
        xml_data = Data.open_file_xml(vcard_filename)
        child_element = xml_data.getroot()[0]
        #vcard_info = Syndication.extract_vcard_items(child_element)
        vcard_info = Syndication.extract_vcard4_items(child_element)
        # Keys are checked because they might be absent.
        if 'fn' in vcard_info: title = vcard_info['fn']
        if 'alias' in vcard_info: vcard_alias = vcard_info['alias']
        if 'note' in vcard_info: note = vcard_info['note']

    # Node item IDs
    # NOTE(review): node_name may still be None here, which yields the
    # directory 'xep_0060/<jid>/None/' -- confirm that this is intended.
    directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
    if not os.path.exists(directory):
        # makedirs also creates missing parent directories.
        os.makedirs(directory, exist_ok=True)
        await FileUtilities.cache_node_data(
            jabber_id, password, jid_bare, node_name)

    # Node items
    node_items = os.listdir(directory)
    if 'urn:xmpp:avatar:metadata.xml' in node_items:
        node_items.remove('urn:xmpp:avatar:metadata.xml')
    count_item = len(node_items)

    if jid_kind == 'pubsub' and node_name:
        for item in jid_details['items']:
            if item[1] == node_name:
                title = item[2]
                break
        if not title: title = node_name
    else:
        title = jid_details['name']

    # TODO Consider also the existence of a node /j/pubsub.movim.eu/i2p
    if jid_kind:
        # Action and instance type
        action, instance = XmppUtilities.set_action_instance_type(jid_kind, node_name)
        view_href = XmppUtilities.get_view_href(jid_bare, jid_kind, node_name)
        xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name)
    else: # jid_info['error']
        action = 'Contact'
        instance = view_href = ''
        if jid_info['condition']: message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
        xmpp_uri = jid_bare
    link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)

    # Query URI links
    print('Query URI links')
    links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name)

    # Graphic files
    filename, _filepath, _filetype, selection = FileUtilities.handle_photo(
        jid_bare, jid_vcard, link_href)

    # Shorten long notes; an absent note is treated as an empty one.
    note = note or ''
    if len(note) > 500: note = note[:500] + ' …'

    # NOTE Handling of variables "title" and "note" in case of '/j/{jid}/{node_name}' is confusing.
    # TODO Add new keys that are of 'node' and be utilized for nodes, instead of reusing a variable for several roles.
    # FIXME If no title be provided to 'node name', use 'node name' itself as title (to be done at FileUtilities.cache_jid_data).

    template_file = 'jid.xhtml'
    template_dict = {
        'action' : action,
        'alias' : vcard_alias,
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'count_item' : count_item,
        'count_message' : count_message,
        'instance' : instance,
        'exception' : None,
        'filename' : filename,
        'jid_bare' : jid_bare,
        'jid_kind' : jid_kind,
        'links' : links,
        'message' : message,
        'news_client' : news_client,
        'note' : note, # TODO node_note or title of PubSub JID
        'request' : request,
        'selection' : selection,
        'title' : title, # TODO node_title
        'url' : str(request.url),
        'vcard4' : vcard4,
        'view_href' : view_href,
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/selection')
async def selection_get(request: Request):
    """Render 'software.xhtml' with the 'systems' table of 'systems.toml'."""
    systems = Data.open_file_toml('systems.toml')['systems']
    context = {
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'request' : request,
        'software' : systems,
        'url' : request.url._url}
    page = templates.TemplateResponse('software.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
#@self.app.get('/download/select')
|
|
#async def download_select_get(request, software=None):
|
|
|
|
@self.app.get('/download/{software}')
async def download_software_get(request: Request, software):
    """Serve the download page of ``software``, featured clients only."""
    return await download_get(request, featured=True, software=software)
|
|
|
|
@self.app.get('/download/{software}/all')
async def download_software_all_get(request: Request, software):
    """Serve the download page of ``software`` with all of its clients."""
    return await download_get(request, featured=False, software=software)
|
|
|
|
@self.app.get('/download')
async def download_get(request: Request, featured=True, software=None):
    """Render 'download.xhtml' with the clients available for a system.

    Args:
        request: The incoming request.
        featured: When true, only clients whose properties include
            'featured' are listed.
        software: Key of the system (e.g. 'linux'). When empty, it is
            guessed from the User-Agent header.

    Returns:
        The rendered page, or a redirect to '/selection' when no system
        was given and none could be recognized from the User-Agent.
    """
    # TODO
    # Featured clients '/download/{software}'
    # All clients '/download/{software}/all'
    # Select software '/download/select'

    if not software:
        # The header may be absent.
        user_agent_lower = (request.headers.get('user-agent') or '').lower()
        # Order matters: Android user agents also contain "linux", hence
        # the more specific marker is tested first.
        platform_markers = (
            ('android', 'android'),
            ('bsd', 'bsd'),
            ('linux', 'linux'),
            ('haiku', 'haiku'),
            ('reactos', 'windows'),
            ('windows', 'windows'),
            ('ios', 'apple'),
            ('macos', 'apple'),
            # Markers of the user agents sent by browsers of Apple systems.
            ('iphone', 'apple'),
            ('ipad', 'apple'),
            ('mac os', 'apple'))
        software = next(
            (system for marker, system in platform_markers
             if marker in user_agent_lower),
            None)
        if not software:
            # Unknown system: let the visitor select one.
            return RedirectResponse(url='/selection')

    special_names = {'bsd' : 'BSD', 'posix' : 'POSIX', 'ubports' : 'UBports'}
    name = special_names.get(software, software.title())
    if name.endswith('os'): name = name.replace('os', 'OS')

    clients = Data.open_file_toml('clients.toml')
    client_selection = []
    clients_software = 0 # Number of clients available for the system
    for client, details in clients.items():
        if software not in details:
            continue
        clients_software += 1
        if featured and 'featured' not in details['properties']:
            continue
        client_selection.append({
            'name' : details['title'],
            'about' : details['about'],
            'href' : details[software],
            'iden' : client,
            'properties' : details['properties'],
            'resources' : details.get('resources', '')})

    # True when some clients of the system were left out of the selection.
    skipped = len(client_selection) != clients_software

    template_file = 'download.xhtml'
    template_dict = {
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'client_selection' : client_selection,
        'featured' : featured,
        'skipped' : skipped,
        'request' : request,
        'software' : software,
        'title' : name,
        'url' : str(request.url)}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.exception_handler(403)
def access_denied_exception_handler(request: Request, exc: HTTPException):
    """Answer status 403 with the result page titled 'Access Denied'."""
    return result_get(request, 'Warning', 'Access Denied')
|
|
|
|
@self.app.exception_handler(404)
def not_found_exception_handler(request: Request, exc: HTTPException):
    """Answer status 404 with the result page titled 'Not Found'."""
    return result_get(request, 'Warning', 'Not Found')
|
|
|
|
@self.app.exception_handler(500)
def internal_error_exception_handler(request: Request, exc: HTTPException):
    """Answer status 500 with the result page titled 'Internal Server Error'."""
    return result_get(request, 'Error', 'Internal Server Error')
|
|
|
|
@self.app.exception_handler(504)
def time_out_exception_handler(request: Request, exc: HTTPException):
    """Answer status 504 with the result page titled 'Time Out'."""
    return result_get(request, 'Warning', 'Time Out')
|
|
|
|
def result_get(request: Request, action: str, title: str):
    """Render 'result.xhtml' with the given action and title."""
    context = {
        'action' : action,
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'request' : request,
        'title' : title,
        'url' : request.url._url}
    page = templates.TemplateResponse('result.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/')
async def main_get(request: Request):
    """Serve the main page.

    A non-empty ``jid`` query parameter redirects to '/j/<jid>';
    otherwise 'main.xhtml' is rendered.
    """
    requested_jid = request.query_params.get('jid', '')
    if requested_jid:
        return RedirectResponse(url='/j/' + requested_jid)

    context = {
        'brand_name' : brand_name,
        'brand_site' : brand_site,
        'chat_client' : chat_client,
        'request' : request,
        'url' : request.url._url}
    page = templates.TemplateResponse('main.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
class Data:
    """Helpers to read and write TOML, XML and text files.

    All helpers are static and are called on the class, e.g.
    ``Data.open_file_toml(filename)``.
    """

    @staticmethod
    def open_file_toml(filename: str) -> dict:
        """Return the content of a TOML file as a dictionary."""
        with open(filename, mode="rb") as fn:
            return tomllib.load(fn)

    @staticmethod
    def save_to_toml(filename: str, data: dict) -> None:
        """Write a dictionary to a file as TOML."""
        # Serialize before opening, so that a serialization error does not
        # leave a truncated file behind.
        data_as_string = tomli_w.dumps(data)
        # TOML documents are UTF-8; do not depend on the locale encoding.
        with open(filename, 'w', encoding='utf-8') as fn:
            fn.write(data_as_string)

    @staticmethod
    def open_file_xml(filename: str) -> ET.ElementTree:
        """Parse an XML file and return its element tree."""
        return ET.parse(filename)

    @staticmethod
    def save_to_file(filename: str, data: str) -> None:
        """Write a string to a file, encoded as UTF-8."""
        # The files are read back by an XML parser, which assumes UTF-8
        # unless the document declares another encoding.
        with open(filename, 'w', encoding='utf-8') as fn:
            fn.write(data)
|
|
|
|
class FileUtilities:
    """Retrieve data over XMPP and cache it in local files.

    The functions take no ``self``; they are called on the class itself,
    e.g. ``FileUtilities.handle_photo(jid_bare, jid_vcard, link_href)``.
    """

    def _make_node_directory(jid_bare, node_name):
        """Create (if needed) and return the cache directory of a node."""
        directory = 'xep_0060/{}/{}/'.format(jid_bare, node_name)
        # makedirs() also creates missing parent directories and does not
        # fail when the directory already exists.
        os.makedirs(directory, exist_ok=True)
        return directory

    def _save_vcard4_item(jid_bare, node_name_vcard4, item_id_vcard4, vcard4_data):
        """Write the retrieved vCard4 item(s) to ``<item id>.xml``."""
        if not vcard4_data:
            return
        directory = FileUtilities._make_node_directory(jid_bare, node_name_vcard4)
        iq = vcard4_data['iq']
        if not isinstance(iq, stanza.iq.Iq):
            return
        filename = directory + item_id_vcard4 + '.xml'
        for item in iq['pubsub']['items']:
            Data.save_to_file(filename, str(item))

    def _save_node_items(jid_bare, node_name, node_items):
        """Write every retrieved node item to ``<item id>.xml``.

        The modification time of each file is set to the date of the Atom
        ``updated`` element, or of ``published`` when there is no
        ``updated``, provided that date can be parsed.
        """
        if not node_items:
            return
        directory = FileUtilities._make_node_directory(jid_bare, node_name)
        iq = node_items['iq']
        if not isinstance(iq, stanza.iq.Iq):
            return
        namespace = '{http://www.w3.org/2005/Atom}'
        for item in iq['pubsub']['items']:
            item_payload = item['payload']
            date_element = None
            if item_payload is not None:
                date_element = item_payload.find(namespace + 'updated')
                # Compare against None: an element without children is
                # falsy, so a truth test would discard a found element.
                if date_element is None:
                    date_element = item_payload.find(namespace + 'published')
            modification_time = None
            if isinstance(date_element, ET.Element) and date_element.text:
                try:
                    modification_time = parser.parse(date_element.text).timestamp()
                except (ValueError, OverflowError):
                    # Unparsable date: keep the file's own timestamp.
                    modification_time = None
            filename = directory + item['id'] + '.xml'
            Data.save_to_file(filename, str(item))
            if modification_time is not None:
                access_time = os.stat(filename).st_atime
                os.utime(filename, (access_time, modification_time))

    async def cache_vcard_data(
            jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4):
        """Retrieve one vCard4 item of ``jid_bare`` and cache it as XML.

        Args:
            jabber_id: JID used to log in.
            password: Password used to log in.
            jid_bare: JID whose node is queried.
            node_name_vcard4: Name of the PubSub node holding the vCard4.
            item_id_vcard4: Identifier of the item to retrieve.
        """
        # Start an XMPP instance and retrieve information
        xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
        xmpp_instance.connect()
        try:
            vcard4_data = await XmppXep0060.get_node_items(
                xmpp_instance, jid_bare, node_name_vcard4,
                item_ids=[item_id_vcard4])
        finally:
            # Disconnect even when the request fails.
            xmpp_instance.disconnect()

        FileUtilities._save_vcard4_item(
            jid_bare, node_name_vcard4, item_id_vcard4, vcard4_data)

    async def cache_node_data(
            jabber_id, password, jid_bare, node_name):
        """Retrieve the items of a PubSub node and cache them as XML files.

        Args:
            jabber_id: JID used to log in.
            password: Password used to log in.
            jid_bare: JID whose node is queried.
            node_name: Name of the PubSub node.
        """
        # Start an XMPP instance and retrieve information
        xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
        xmpp_instance.connect()
        try:
            node_items = await XmppXep0060.get_node_items(
                xmpp_instance, jid_bare, node_name)
        finally:
            # Disconnect even when the request fails.
            xmpp_instance.disconnect()

        FileUtilities._save_node_items(jid_bare, node_name, node_items)

    async def cache_jid_data(
            jabber_id, password, jid_bare, node_name=None, item_id=None, alias=None):
        """Collect the details of ``jid_bare`` and cache them on disk.

        Queries service discovery, vCard4, vcard-temp, group chat history
        (for MUC) and node items, stores vCard4 and node items as XML,
        handles photo and QR code, and writes the summary to
        ``details/<jid_bare>.toml``.

        Args:
            jabber_id: JID used to log in.
            password: Password used to log in.
            jid_bare: JID to inspect.
            node_name: Optional PubSub node name.
            item_id: Accepted for interface compatibility; not used.
            alias: Nickname used to join a group chat.

        Returns:
            dict: The details that were written to the TOML file.
        """
        # These stay '' (not an empty list) when the disco#items query
        # fails, which is what ends up in the 'items' entry of the result.
        iq_disco_items_list = iq_disco_items_items_list = node_title = title = ''
        # Raw disco#items entries; empty when the query failed.
        iq_disco_items_items = []
        jid_vcard = {
            'name': '',
            'note': '',
            'type': '',
            'bin': ''}
        count = ''
        messages = []
        subject = ''
        nodes = {}

        # Start an XMPP instance and retrieve information
        xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
        xmpp_instance.connect()
        try:
            # JID kind
            print('JID kind')
            jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
            jid_info_iq = jid_info['iq']
            jid_kind = jid_info['kind']

            # Set node name to 'urn:xmpp:microblog:0' if JID is an account
            if jid_kind == 'account' and not node_name:
                node_name = 'urn:xmpp:microblog:0'

            # vCard4 data
            node_name_vcard4 = 'urn:xmpp:vcard4'
            item_id_vcard4 = 'current'
            vcard4_data = await XmppXep0060.get_node_items(
                xmpp_instance, jid_bare, node_name_vcard4,
                item_ids=[item_id_vcard4])
            FileUtilities._save_vcard4_item(
                jid_bare, node_name_vcard4, item_id_vcard4, vcard4_data)

            # JID info
            print('JID info')
            # NOTE Group chat of Psi+ Project at jabber.ru has a note in its vCard.
            vcard_data = await XmppXep0054.get_vcard_data(xmpp_instance, jid_bare)
            if not vcard_data['error']:
                conference_title = None
                if jid_kind in ('mix', 'muc'):
                    for identity in jid_info_iq['disco_info']['identities']:
                        if identity[3]:
                            conference_title = identity[3]
                            break
                vcard_temp = vcard_data['iq']['vcard_temp']
                jid_vcard = {
                    'name': vcard_temp['FN'] or conference_title or '',
                    'note': vcard_temp['notes'] or '',
                    'type': vcard_temp['PHOTO']['TYPE'] or '',
                    'bin': vcard_temp['PHOTO']['BINVAL'] or ''}

            # TODO /d/pubsub.nicoco.fr/blog/urn-uuid-53e43061-1962-3112-bb8a-1473dca61719
            jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
            if isinstance(jid_items['iq'], stanza.iq.Iq):
                iq_disco_items_items = jid_items['iq']['disco_items']['items']
                iq_disco_items_list = []
                iq_disco_items_items_list = []
                for item in iq_disco_items_items:
                    # For a group chat the third field is collected, for
                    # everything else the second (compared to node names).
                    if jid_kind == 'muc':
                        iq_disco_items_list.append(item[2])
                    else:
                        iq_disco_items_list.append(item[1])
                    iq_disco_items_items_list.append(
                        [item[0] or '', item[1] or '', item[2] or ''])
                count = len(iq_disco_items_list)

            # Title
            print('Title')
            if (jid_kind not in ('conference', 'mix', 'muc') and
                '@' in jid_bare and
                not node_name):
                node_name = 'urn:xmpp:microblog:0'
                node_title = 'Journal'
            elif jid_kind == 'pubsub':
                for item in iq_disco_items_items:
                    if item[2] and item[1] == node_name:
                        node_title = item[2]
                        break

            if jid_kind == 'server' and jid_info_iq:
                for identity in jid_info_iq['disco_info']['identities']:
                    if jid_kind == identity[0] and identity[1] == 'im' and identity[3]:
                        title = identity[3]
                        print(jid_bare)
                        print(identity)
                        print(jid_info)

            # String 'undefined' is sourced from JID discuss@conference.conversejs.org
            if not title:
                if jid_vcard['name'] and 'undefined' not in jid_vcard['name']:
                    title = jid_vcard['name']
                else:
                    title = jid_bare.split('@')[0]

            # Group chat messages
            print('Group chat messages')
            if jid_kind == 'muc':
                # TODO Create configurations for group chat preview
                room_info_muc = await XmppXep0045.get_room_information(
                    xmpp_instance, jid_bare, alias, maxstanzas=50)
                # NOTE Do not mix error messages with node titles and descriptions etc.
                if isinstance(room_info_muc['iq'], tuple):
                    iq = room_info_muc['iq']
                    for message in iq[3]:
                        messages.append({
                            'id': message['id'],
                            'alias': message['mucnick'],
                            'body': message['body'],
                            'timestamp': str(message['delay']['stamp'])})
                    messages.reverse()
                    subject = iq[1]['subject']
                    if not node_title:
                        node_title = node_name

            # Node items
            print('Node items')
            if iq_disco_items_list and node_name and node_name in iq_disco_items_list:
                node_item_ids = await XmppXep0060.get_node_item_ids(
                    xmpp_instance, jid_bare, node_name)
                if isinstance(node_item_ids['iq'], stanza.iq.Iq):
                    listed_items = node_item_ids['iq']['disco_items']['items']
                    nodes[node_name] = {
                        'title': node_title,
                        'count': len(listed_items),
                        'item_ids': [
                            [entry[0] or '', entry[1] or '', entry[2] or '']
                            for entry in listed_items]}

                    node_items = await XmppXep0060.get_node_items(
                        xmpp_instance, jid_bare, node_name)
                    FileUtilities._save_node_items(jid_bare, node_name, node_items)
        finally:
            # Disconnect even when one of the requests above fails.
            xmpp_instance.disconnect()

        # Notes
        print('Notes')
        jid_vcard_note = jid_vcard['note']
        if isinstance(jid_vcard_note, list) and len(jid_vcard_note):
            note = jid_vcard_note[0]['NOTE']
        else:
            note = jid_vcard_note

        jid_details = {
            'count': count or '',
            'error': jid_info['error'],
            'error_text': jid_info['text'] or '',
            'error_condition': jid_info['condition'] or '',
            'image_type': jid_vcard['type'],
            'items': iq_disco_items_items_list,
            'kind': jid_kind or '',
            'messages': messages or '',
            'name': title,
            'nodes': nodes,
            'note': note or '',
            'subject': subject or ''}

        print(jid_details)

        # Query URI href
        link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)

        FileUtilities.handle_photo(jid_bare, jid_vcard, link_href)

        filename = 'details/{}.toml'.format(jid_bare)
        Data.save_to_toml(filename, jid_details)

        return jid_details

    def handle_photo(jid_bare, jid_vcard, link_href):
        """Locate or store the photo of a JID and make sure a QR code exists.

        An existing ``photo/<jid_bare>.*`` file is preferred; otherwise the
        photo of ``jid_vcard`` (entries ``type`` and ``bin``) is written to
        ``photo/``. Two colours are then extracted from the photo, and
        ``qr/<jid_bare>.png`` is generated from ``link_href`` when it is
        missing or empty.

        Returns:
            tuple: ``(filename, filepath, filetype, selection)``. The
            filename is ``'default.svg'`` when no usable photo exists;
            ``selection`` is the extracted colours, or None.
        """
        filename = filepath = filetype = selection = None
        # Escape the JID so that glob metacharacters in it match literally.
        filecirca = 'photo/{}.*'.format(glob.escape(jid_bare))
        filepath_guess = glob.glob(filecirca)
        if filepath_guess:
            filepath = filepath_guess[0]
            filetype = filepath.split('.').pop()
            filename = '{}.{}'.format(jid_bare, filetype)
        elif jid_vcard and jid_vcard['type']:
            mimetype_parts = jid_vcard['type'].split('/')
            # Tolerate a bare subtype (e.g. "png") instead of "image/png".
            filetype = mimetype_parts[1] if len(mimetype_parts) > 1 else mimetype_parts[0]
            if filetype == 'svg+xml':
                filetype = 'svg'
            filename = '{}.{}'.format(jid_bare, filetype)
            filepath = 'photo/{}.{}'.format(jid_bare, filetype)
            photo = jid_vcard['bin']
            # Only bytes can be written in binary mode; the placeholder for
            # a missing photo is an empty string.
            if isinstance(photo, (bytes, bytearray)):
                with open(filepath, 'wb') as file:
                    file.write(photo)

        if not filepath or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
            filename = 'default.svg'
        elif filetype == 'svg':
            selection = Graphics.extract_colours_from_vector(filepath)
        else:
            selection = Graphics.extract_colours_from_raster(filepath)

        # QR code
        filepath_qrcode = 'qr/{}.png'.format(jid_bare)
        if not os.path.exists(filepath_qrcode) or os.path.getsize(filepath_qrcode) == 0:
            Graphics.generate_qr_code_graphics_from_string(link_href, jid_bare)

        return filename, filepath, filetype, selection
|
|
|
|
class Graphics:
    """Colour extraction from photos and QR code generation.

    The functions take no ``self``; they are called on the class itself.
    """

    def extract_colours_from_raster(filepath):
        """Pick two random pixel colours from a raster image.

        The image is read with OpenCV and every channel value is clamped
        to the range 100-200 before sampling.

        Args:
            filepath: Path of the image file.

        Returns:
            list | None: Two ``[b, g, r]`` lists (channel order as returned
            by ``cv2.imread``), or None when the image could not be
            processed.
        """
        try:
            img = cv2.imread(filepath)
            if img is None:
                raise ValueError('Could not read image {}'.format(filepath))
            # Clamping works per value, so no colour conversion is needed.
            res = numpy.clip(img, 100, 200)

            selection = []
            for _ in range(2):
                # randrange() covers index 0 as well and also works for
                # images that are a single pixel wide or high.
                row = res[random.randrange(len(res))]
                pixel = row[random.randrange(len(row))]
                selection.append(numpy.array(pixel).tolist())
            print(selection)
        except Exception as e:
            # Deliberately broad: this is best-effort, and cv2 or numpy may
            # not be installed at all (their import is optional).
            selection = None
            print(str(e))

        return selection

    def extract_colours_from_vector(filepath):
        """Pick two colours used by the shapes of an SVG file.

        Only ``fill`` and ``stroke`` attributes written as ``#rrggbb`` or
        ``#rrggbbaa`` are considered; any alpha part is ignored.

        Args:
            filepath: Path of the SVG file.

        Returns:
            list | None: Two ``[b, g, r]`` lists (the same colour twice
            when only one was found), an empty list when no colour was
            found, or None when the file is not well-formed XML.
        """
        # Parse the SVG file
        try:
            root = ET.parse(filepath).getroot()
        except ET.ParseError as e:
            # Same best-effort contract as extract_colours_from_raster().
            print(str(e))
            return None

        # SVG namespace
        namespace = {'svg': 'http://www.w3.org/2000/svg'}
        shapes = ('circle', 'ellipse', 'line', 'path', 'polygon', 'rect', 'text')
        colour_pattern = re.compile(r'#([0-9a-fA-F]{6})(?:[0-9a-fA-F]{2})?')

        # Set to store unique colours (normalised to lower case)
        colours_hex = set()
        for shape in shapes:
            for elem in root.findall('.//svg:' + shape, namespace):
                for value in (elem.get('fill'), elem.get('stroke')):
                    # Skips None, 'none', named colours and malformed values.
                    match = colour_pattern.fullmatch(value) if value else None
                    if match:
                        colours_hex.add(match.group(1).lower())

        colours_rgb = []
        for digits in colours_hex:
            rgb = [int(digits[i:i+2], 16) for i in (0, 2, 4)]
            # Reversed, i.e. stored as blue, green, red.
            rgb.reverse()
            colours_rgb.append(rgb)

        if len(colours_rgb) > 1:
            # Two distinct colours, chosen at random.
            return random.sample(colours_rgb, 2)
        if len(colours_rgb) == 1:
            return [colours_rgb[0], colours_rgb[0]]
        return []

    def generate_qr_code_graphics_from_string(text, jid_bare):
        """Encode ``text`` as a QR code and save it to ``qr/<jid_bare>.png``."""
        qr = qrcode.QRCode(border=2, box_size=10)
        qr.add_data(text)
        qrcode_graphics = qr.make_image(fill_color='#333', back_color='#f2f2f2')
        qrcode_graphics.save('qr/{}.png'.format(jid_bare))
|
|
|
|
class Syndication:
    """Extract data from vCard and Atom XML payloads.

    The functions take no ``self``; they are called on the class itself.
    """

    def _first_child_text(xml_data, tag):
        """Return the first non-empty child text of element ``tag``, or ''."""
        element = xml_data.find(tag)
        if element is None:
            return ''
        for child in element:
            if child.text:
                return child.text
        return ''

    def extract_vcard_items(xml_data):
        """Extracts all items from a vCard XML ElementTree.

        Args:
            xml_data (ElementTree): The vCard XML as an ElementTree object.

        Returns:
            dict: A dictionary where keys are item names (tags without
            namespace) and values are their text content, or None when an
            item has no text.
        """
        items = {}
        for item in xml_data.iter():
            # Skip the root element (vcard)
            if item.tag == '{urn:ietf:params:xml:ns:vcard-4.0}vcard':
                continue

            # Strip the namespace, if any; tags without one are kept whole.
            item_name = item.tag.rpartition('}')[2]

            # Collect direct text content and the text of child elements
            item_text = []
            if item.text:
                item_text.append(item.text)
            for child in item:
                if child.text:
                    item_text.append(child.text)

            # Join text elements if multiple found
            if item_text:
                items[item_name] = ' '.join(item_text).strip()
            else:
                items[item_name] = None

        return items

    def extract_vcard4_items(xml_data):
        """Extract selected properties from a vCard4 element.

        Args:
            xml_data: The vCard4 element; properties are looked up among
                its direct children.

        Returns:
            dict: Entries ``alias``, ``email``, ``fn``, ``note``, ``org``,
            ``impp`` and ``url`` (strings, '' when absent), plus ``code``,
            ``gallery``, ``journal``, ``movim`` and ``peertube``, each a
            list of ``{'label': ..., 'uri': ...}`` dicts taken from
            ``group`` elements whose name contains that word.
        """
        namespace = '{urn:ietf:params:xml:ns:vcard-4.0}'
        vcard = {}

        groups = xml_data.findall(namespace + 'group')
        for res in ('code', 'gallery', 'journal', 'movim', 'peertube'):
            resources = []
            for element in groups:
                if res not in element.get('name', ''):
                    continue
                label = uri = ''
                # As before, the text of the last child is the one kept;
                # a missing element no longer raises.
                label_element = element.find(namespace + 'x-ablabel')
                if label_element is not None:
                    for child in label_element:
                        label = child.text or ''
                url_element = element.find(namespace + 'url')
                if url_element is not None:
                    for child in url_element:
                        uri = child.text or ''
                # A resource without an address cannot be linked to.
                if uri:
                    resources.append({'label': label, 'uri': uri})
            vcard[res] = resources

        properties = (
            ('alias', 'nickname'),
            ('email', 'email'),
            ('fn', 'fn'),
            ('note', 'note'),
            ('org', 'org'),
            ('impp', 'impp'),
            ('url', 'url'))
        for key, tag in properties:
            vcard[key] = Syndication._first_child_text(xml_data, namespace + tag)
        return vcard

    def extract_atom_items(xml_data, limit=False):
        """Extract the displayed properties of an Atom entry.

        Args:
            xml_data: The Atom ``entry`` element; properties are looked up
                among its direct children.
            limit: When true, stop collecting tags once there are more
                than four.

        Returns:
            dict | None: Entries ``content``, ``href``, ``published``,
            ``summary``, ``tags``, ``title`` and ``updated``; None when
            the entry has neither a title nor a link.
        """
        # NOTE
        # `.//` was not needed when node item payload was passed directly.
        # Now that item is saved as xml, it is required to use `.//`.
        # Perhaps navigating a level down (i.e. to "child"), or removing
        # the root from the file would solve this.
        namespace = '{http://www.w3.org/2005/Atom}'

        def first_text(tag):
            """Return the first non-empty text among elements ``tag``."""
            for element in xml_data.findall(namespace + tag):
                if element.text:
                    return element.text
            return ''

        title = xml_data.find(namespace + 'title')
        links = xml_data.findall(namespace + 'link')
        if title is None and not links:
            return None

        link_href = ''
        for link in links:
            link_href = link.attrib.get('href', '')
            if link_href:
                break

        tags = []
        for category in xml_data.findall(namespace + 'category'):
            category_term = category.attrib.get('term')
            # Empty and overly long terms are not shown as tags.
            if category_term and len(category_term) < 50:
                tags.append(category_term)
                if limit and len(tags) > 4:
                    break

        published = xml_data.find(namespace + 'published')
        published_text = '' if published is None else published.text or ''
        updated = xml_data.find(namespace + 'updated')
        # Fall back to the publication date when there is no usable
        # "updated" element.
        updated_text = published_text
        if updated is not None and updated.text:
            updated_text = updated.text

        entry = {'content': first_text('content'),
                 'href': link_href,
                 'published': published_text,
                 'summary': first_text('summary'),
                 'tags': tags,
                 'title': '' if title is None else title.text or '',
                 'updated': updated_text}
        return entry
|
|
|
|
class XmppUtilities:
    """Derive labels, links and XMPP query URIs from the kind of a JID.

    The functions take no ``self``; they are called on the class itself.
    """

    def set_action_instance_type(jid_kind, node_name=None):
        """Return the action label and the item noun for a kind of JID.

        Args:
            jid_kind: One of 'conference', 'server', 'mix', 'muc',
                'pubsub', 'account'; anything else is treated as unknown.
            node_name: PubSub node name, if any.

        Returns:
            tuple: ``(action, instance)``. For an unknown kind the action
            is 'Contact' and the instance is an empty string.
        """
        if jid_kind == 'conference':
            return 'Discover', 'conferences'
        if jid_kind == 'server':
            return 'Discover', 'services'
        if jid_kind in ('mix', 'muc'):
            return 'Join', 'occupants'
        if jid_kind == 'pubsub':
            if node_name:
                return 'Subscribe', 'articles'
            return 'Browse', 'nodes'
        if jid_kind == 'account':
            return 'Message', 'articles'
        # jid_info['error'], or a kind without items of its own; the noun
        # was left unassigned here, which made the return statement fail.
        return 'Contact', ''

    def get_link_href(jid_bare, jid_kind, node_name=None):
        """Return the primary ``xmpp:`` query URI for a JID."""
        if jid_kind in ('conference', 'server'):
            link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
        elif jid_kind in ('mix', 'muc'):
            link_href = 'xmpp:{}?join'.format(jid_bare)
        elif jid_kind == 'pubsub':
            if node_name:
                link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name)
            else:
                link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
        elif jid_kind == 'account':
            link_href = 'xmpp:{}?message'.format(jid_bare)
        else: # jid_info['error']
            link_href = 'xmpp:{}'.format(jid_bare)
        return link_href

    def get_view_href(jid_bare, jid_kind, node_name=None):
        """Return the site path under which a JID is viewed.

        Returns:
            str | None: The path, or None for an unknown kind of JID.
        """
        view_href = None
        if jid_kind in ('conference', 'server'):
            view_href = '/d/' + jid_bare
        elif jid_kind in ('mix', 'muc'):
            view_href = '/v/' + jid_bare
        elif jid_kind == 'pubsub':
            if node_name:
                view_href = '/d/{}/{}'.format(jid_bare, node_name)
            else:
                view_href = '/d/' + jid_bare
        elif jid_kind == 'account':
            # Accounts default to the microblog node (as cache_jid_data
            # does), instead of producing a path ending in "None".
            view_href = '/d/{}/{}'.format(
                jid_bare, node_name or 'urn:xmpp:microblog:0')
        return view_href

    def get_xmpp_uri(jid_bare, jid_kind, node_name=None):
        """Return the JID, with a node query appended for PubSub nodes."""
        if jid_kind == 'pubsub' and node_name:
            return '{}?;node={}'.format(jid_bare, node_name)
        return jid_bare

    def get_query_uri_links(jid_bare, jid_kind, node_name=None, item_id=None):
        """Return the list of ``xmpp:`` query links offered for a JID.

        Returns:
            list: Dicts with entries ``name`` (label), ``href`` (URI) and
            ``iden`` (identifier). Kind-specific links come first, then
            subscribe/view links when an item or a node is given, and
            finally the vCard link.
        """
        def link(name, query=''):
            return {'name': name,
                    'href': 'xmpp:{}{}'.format(jid_bare, query),
                    'iden': name.lower()}

        links = []
        if jid_kind in ('conference', 'server'):
            links.append(link('Discover', '?disco;type=get;request=items'))
        elif jid_kind in ('mix', 'muc'):
            links.append(link('Join', '?join'))
        elif jid_kind == 'pubsub':
            links.append(link('Browse', '?disco;type=get;request=items'))
        elif jid_kind == 'account':
            links.append(link('Message', '?message'))
            links.append(link('Add', '?roster'))
        else: # jid_info['error']
            links.append(link('Connect'))
            links.append(link('Add', '?roster'))
        if item_id:
            target = '?pubsub;node={};item={}'.format(node_name, item_id)
            links.append(link('Subscribe', target + ';action=subscribe'))
            links.append(link('View', target))
        elif node_name:
            target = '?pubsub;node={}'.format(node_name)
            links.append(link('Subscribe', target + ';action=subscribe'))
            links.append(link('View', target))
        links.append(link('vCard', '?vcard'))
        return links
|
|
|
|
class XmppXep0030:
    """Service discovery queries (plugin ``xep_0030``).

    The functions are called on the class with the XMPP client passed as
    first argument, e.g. ``XmppXep0030.get_jid_info(xmpp_instance, jid)``.
    """

    # Any one of these features marks a JID as a group chat.
    _MUC_FEATURES = (
        'muc_moderated',
        'muc_open',
        'muc_persistent',
        'muc_public',
        'muc_semianonymous',
        'muc_unmoderated',
        'muc_unsecured')

    async def get_jid_items(self, jid_bare):
        """Query the disco#items of ``jid_bare``.

        Returns:
            dict: Entries ``condition``, ``error``, ``iq`` and ``text``.
            On failure ``error`` is True, ``iq`` is '' and ``text``
            defaults to 'Error'.
        """
        outcome = {
            'condition': '',
            'error': False,
            'iq': '',
            'text': ''}
        try:
            outcome['iq'] = await self['xep_0030'].get_items(jid=jid_bare)
        except (IqError, IqTimeout) as e:
            failure = e.iq['error']
            outcome['error'] = True
            outcome['condition'] = failure['condition']
            outcome['text'] = failure['text'] or 'Error'
        return outcome

    def _kind_from_disco_info(iq_disco_info, jid_bare):
        """Classify a JID by the features and identities it advertises."""
        features = iq_disco_info['features']
        if 'http://jabber.org/protocol/muc#unique' in features:
            return 'conference'
        if 'urn:xmpp:mix:core:1' in features:
            return 'mix'
        if any(name in features for name in XmppXep0030._MUC_FEATURES):
            return 'muc'

        kind = None
        if '@' in jid_bare:
            for identity in iq_disco_info['identities']:
                if identity[0] == 'account':
                    kind = 'account'
                    break
                # No break: a later 'account' identity takes precedence.
                if identity[0] == 'client' and identity[1] == 'bot':
                    kind = 'bot'
        else:
            for identity in iq_disco_info['identities']:
                if identity[0] == 'pubsub' and identity[1] == 'service':
                    kind = 'pubsub'
                    break
                if identity[0] == 'server' and identity[1] == 'im':
                    kind = 'server'
                    break
        return kind

    # NOTE
    # Feature "urn:xmpp:mucsub:0" is present in both, MUC local and MUC hostname
    # Feature "urn:xmpp:serverinfo:0" is present in both, MUC hostname and main hostname
    async def get_jid_info(self, jid_bare):
        """Query the disco#info of ``jid_bare`` and classify the JID.

        Returns:
            dict: Entries ``condition``, ``error``, ``iq``, ``text`` and
            ``kind``. ``kind`` is 'conference', 'mix', 'muc', 'account',
            'bot', 'pubsub', 'server' or None. On failure ``error`` is
            True, ``iq`` is '' and ``text`` defaults to 'Error'.
        """
        kind = None
        failed = False
        condition = text = ''
        try:
            iq = await self['xep_0030'].get_info(jid=jid_bare)
            iq_disco_info = iq['disco_info']
            if iq_disco_info:
                kind = XmppXep0030._kind_from_disco_info(iq_disco_info, jid_bare)
            else:
                iq = ''
        except (IqError, IqTimeout) as e:
            failure = e.iq['error']
            iq = ''
            failed = True
            condition = failure['condition']
            text = failure['text'] or 'Error'
        return {
            'condition': condition,
            'error': failed,
            'iq': iq,
            'text': text,
            'kind': kind}
|
|
|
|
|
|
class XmppXep0045:
    """Multi-User Chat requests (plugin ``xep_0045``).

    The functions are called on the class with the XMPP client passed as
    first argument.
    """

    async def get_room_information(self, jid, alias, maxchars=None, maxstanzas=None, seconds=None):
        """Join a group chat and return what the join yields.

        Args:
            jid: JID of the group chat.
            alias: Nickname to join with.
            maxchars: Accepted for interface compatibility; not used.
            maxstanzas: Maximum number of history stanzas to request
                (50 when not given).
            seconds: Accepted for interface compatibility; not used.

        Returns:
            dict: Entries ``error``, ``condition``, ``text`` and ``iq``.
            On success ``iq`` is the value returned by ``join_muc_wait``;
            on failure ``error`` is True and ``iq`` is the exception.
        """
        if not maxstanzas:
            maxstanzas = 50
        error = False
        condition = text = ''
        try:
            iq = await self['xep_0045'].join_muc_wait(
                jid,
                alias,
                maxstanzas=maxstanzas,
                timeout=10
                )
        except TimeoutError as e:
            error = True
            iq = e
            condition = 'Request timeout reached'
            text = str(e)
        except (IqError, IqTimeout, PresenceError) as e:
            error = True
            iq = e
            # Read the details from the exception itself: PresenceError
            # carries a presence, not an iq, so `e.iq` is not available
            # for every exception handled here.
            condition = e.condition
            text = e.text
        result = {
            'error': error,
            'condition': condition,
            'text': text,
            'iq': iq}
        return result

    async def get_room_data(self, jid_bare):
        """Return the configuration of the group chat ``jid_bare``."""
        return await self['xep_0045'].get_room_config(jid_bare)

    async def get_room_participants(self, jid_bare):
        """Return the roster of the group chat ``jid_bare``."""
        import inspect
        roster = self['xep_0045'].get_roster(jid_bare)
        # The plugin's get_roster() is a plain function in slixmpp, so its
        # result must not be awaited unconditionally.
        if inspect.isawaitable(roster):
            roster = await roster
        return roster
|
|
|
|
|
|
# NOTE: A vCard request can fail with "Item not found" even though the JID
# is a group chat; that is, the JID simply has no vCard.
# Example: messaging-off@conference.movim.eu
|
|
|
|
class XmppXep0054:
    """Helpers for XEP-0054 (vcard-temp), reached via ``self['xep_0054']``."""

    async def get_vcard_data(self, jid_bare):
        """Request the vCard of ``jid_bare`` and report the outcome.

        Returns:
            A dict with the keys ``error`` (bool), ``condition``, ``text``
            and ``iq``. ``iq`` is the plugin's reply on success and an
            empty string when the request failed.
        """
        outcome = {
            'error' : False,
            'condition' : '',
            'text' : '',
            'iq' : ''}
        try:
            outcome['iq'] = await self['xep_0054'].get_vcard(jid_bare)
        except (IqError, IqTimeout) as e:
            reason = e.iq['error']['condition']
            message = e.iq['error']['text']
            # Substitute a generic message when the error stanza has no text.
            if not message:
                message = 'Could not retrieve vCard' if reason else 'Unknown Error'
            outcome['error'] = True
            outcome['condition'] = reason
            outcome['text'] = message
        return outcome
|
|
|
|
class XmppXep0060:
    """Helpers for XEP-0060 (Publish-Subscribe).

    Node items are requested through ``self['xep_0060']`` and node item
    identifiers through ``self['xep_0030']``, so ``self`` must expose both
    plugins by subscript.
    """

    async def get_node_items(self, jid_bare, node_name, item_ids=None, max_items=None):
        """Request the items of ``node_name`` hosted at ``jid_bare``.

        Parameters:
            jid_bare: Address of the service that hosts the node.
            node_name: Name of the node.
            item_ids: Identifiers of the items to request; only used when
                ``max_items`` is falsy.
            max_items: When truthy, forwarded to the plugin as the
                ``max_items`` limit of the request.

        Returns:
            A dict with the keys ``error`` (bool), ``condition``, ``text``
            and ``iq``. ``iq`` is the plugin's reply on success and an
            empty string when the request failed.
        """
        error = False
        condition = text = ''
        # The ``max_items`` branch used to ignore the limit and then run
        # leftover debugging code that referenced an undefined ``rsm`` name;
        # the limit is now simply forwarded with the request.
        options = {'timeout' : 5}
        if max_items:
            options['max_items'] = max_items
        else:
            options['item_ids'] = item_ids
        try:
            iq = await self['xep_0060'].get_items(
                jid_bare, node_name, **options)
        except (IqError, IqTimeout) as e:
            error = True
            iq = ''
            condition = e.iq['error']['condition']
            text = e.iq['error']['text']
            # Substitute a generic message when the error stanza has no text.
            if not text:
                if condition:
                    text = 'Could not retrieve node items'
                else:
                    text = 'Unknown Error'
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result

    async def get_node_item_ids(self, jid_bare, node_name):
        """Request the item listing of ``node_name`` via service discovery.

        Returns:
            A dict with the keys ``error`` (bool), ``condition``, ``text``
            and ``iq``. ``iq`` is the plugin's reply on success and an
            empty string when the request failed.
        """
        error = False
        condition = text = ''
        try:
            iq = await self['xep_0030'].get_items(
                jid_bare, node_name)
            # Broken. See https://codeberg.org/poezio/slixmpp/issues/3548
            #iq = await self['xep_0060'].get_item_ids(
            #    jid_bare, node_name, timeout=5)
        except (IqError, IqTimeout) as e:
            error = True
            iq = ''
            condition = e.iq['error']['condition']
            text = e.iq['error']['text']
            # Substitute a generic message when the error stanza has no text.
            if not text:
                if condition:
                    text = 'Could not retrieve node item IDs'
                else:
                    text = 'Unknown Error'
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result
|
|
|
|
class XmppXep0369:
    """Helper that queries channel data through ``self['xep_0369']``."""

    async def get_room_data(self, jid_bare):
        """Return what the plugin's ``get_channel_info`` yields for ``jid_bare``."""
        mix_plugin = self['xep_0369']
        channel_info = await mix_plugin.get_channel_info(jid_bare)
        return channel_info
|
|
|
|
|
|
def main():
    """Load ``configuration.toml`` and return the application built from it.

    The parsed configuration is handed to ``HttpInstance``; the ``app``
    attribute of that instance is the value returned.
    """
    settings = Data.open_file_toml('configuration.toml')
    return HttpInstance(settings).app
|
|
|
|
# Module-level application object, so that an ASGI server can import it.
app = main()

# uvicorn only supports ``reload=True`` when the application is passed as an
# import string ("module:app"); given an application object it logs a warning
# and exits. Serve the object directly here; for auto-reload during
# development, start the server from the command line with ``--reload``.
if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=8000)
|