JabberCard/fasi.py

1410 lines
62 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from email.utils import parseaddr
from fastapi import FastAPI, Form, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
#import logging
#from os import mkdir
#from os.path import getsize, exists
import os
import qrcode
import random
import re
from slixmpp import ClientXMPP, stanza
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
from starlette.responses import RedirectResponse
#import time
import tomli_w
from urllib.parse import urlsplit
import uvicorn
import xml.etree.ElementTree as ET
try:
import cv2
except:
print('OpenCV (cv2) is required for dynamic background.')
try:
import numpy
except:
print('NumPy (numpy) is required for dynamic background.')
try:
import tomllib
except:
import tomli as tomllib
class XmppInstance(ClientXMPP):
def __init__(self, jid, password, jid_bare):
super().__init__(jid, password)
self.jid_bare = jid_bare
self.register_plugin('xep_0030') # XEP-0030: Service Discovery
self.register_plugin('xep_0045') # XEP-0045: Multi-User Chat
self.register_plugin('xep_0054') # XEP-0054: vcard-temp
self.register_plugin('xep_0060') # XEP-0060: Publish-Subscribe
self.add_event_handler("session_start", self.on_session_start)
async def on_session_start(self, event):
self.send_presence()
#self.disconnect()
class HttpInstance:
def __init__(self, jabber_id, password, alias):
self.app = FastAPI()
templates = Jinja2Templates(directory='xhtml')
# TODO
# 1) Mount at the same mountpoint /img.
# 2) Image filename to be constant, i.e. /img/photo.png and /img/qr.png.
self.app.mount('/photo', StaticFiles(directory='photo'), name='photo')
self.app.mount('/qr', StaticFiles(directory='qr'), name='qr')
self.app.mount('/css', StaticFiles(directory='css'), name='css')
self.app.mount('/img', StaticFiles(directory='img'), name='img')
# @self.app.get('/favicon.ico', include_in_schema=False)
# def favicon_get():
# return FileResponse('graphic/hermes.ico')
# @self.app.get('/hermes.svg')
# def logo_get():
# return FileResponse('graphic/hermes.svg')
@self.app.get('/v/{jid}')
async def view_jid(request: Request, jid):
"""View messages of jabber id"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
try:
exception = node_title = note = number_of_pages = page_number = previous = selection = services_sorted = None
node_name = 'urn:xmpp:microblog:0'
link_href = 'xmpp:{}?join'.format(jid_bare)
link_text = 'Join'
xmpp_uri = '{}'.format(jid_bare)
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
# JID kind
instance = message = node_id = None
jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
jid_info_iq = jid_info['iq']
jid_kind = jid_info['kind']
links = []
if jid_info['error']:
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
action = 'Connect with'
link_href = 'xmpp:{}'.format(jid_bare)
links.append({'name' : 'Connect',
'href' : link_href,
'iden' : 'connect'})
xmpp_uri = jid_bare
elif jid_kind in ('conference', 'server'):
action = 'Discover'
if jid_kind == 'conference':
instance = 'conferences'
elif jid_kind == 'server':
instance = 'services'
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
links.append({'name' : 'Discover',
'href' : link_href,
'iden' : 'discover'})
view_href = '/d/' + jid_bare
xmpp_uri = jid_bare
elif jid_kind in ('mix', 'muc'):
#title = 'Group Chat ' + title
# TODO Set group chat subject as description.
action = 'Join'
instance = 'participants'
link_href = 'xmpp:{}?join'.format(jid_bare)
links.append({'name' : 'Join',
'href' : link_href,
'iden' : 'join'})
view_href = '/v/' + jid_bare
xmpp_uri = jid_bare
# room_info = await XmppXep0045.get_room_data(xmpp_instance, jid_bare)
# breakpoint()
elif jid_kind == 'pubsub':
#node_name = request.query_params.get('node', '')
if node_name:
action = 'Subscribe'
instance = 'articles'
link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name)
view_href = '/d/{}/{}'.format(jid_bare, node_name)
xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
else:
action = 'Browse'
instance = 'nodes'
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
links.append({'name' : 'Browse',
'href' : link_href,
'iden' : 'browse'})
view_href = '/d/' + jid_bare
xmpp_uri = jid_bare
else:
action = 'Message'
instance = 'articles'
link_href = 'xmpp:{}?message'.format(jid_bare)
links.append({'name' : 'Add',
'href' : 'xmpp:{}?roster'.format(jid_bare),
'id' : 'add'})
links.append({'name' : 'Message',
'href' : link_href,
'iden' : 'message'})
node_name = 'urn:xmpp:microblog:0'
view_href = '/d/{}/{}'.format(jid_bare, node_name)
xmpp_uri = jid_bare
links.append({'name' : 'Subscribe',
'href' : 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name),
'iden' : 'subscribe'})
links.append({'name' : 'View',
'href' : 'xmpp:{}?pubsub;node={}'.format(jid_bare, node_name),
'iden' : 'view'})
links.append({'name' : 'vCard',
'href' : 'xmpp:{}?vcard'.format(jid_bare),
'iden' : 'vcard'})
# JID info
# NOTE Group chat of Psi+ Project at jabber.ru has a note in its vCard.
# TODO Retrieve group chat title (try also with xep_0045
vcard_data = await XmppXep0054.get_vcard_data(xmpp_instance, jid_bare)
if vcard_data['error']:
jid_detail = {}
#jid_detail['note'] = '{}: {}'.format(vcard_data['text'], vcard_data['condition'])
jid_detail['name'] = jid_detail['note'] = jid_detail['note'] = jid_detail['type'] = jid_detail['bin'] = None
else:
conference_title = None
if jid_kind in ('mix', 'muc'):
for identity in jid_info_iq['disco_info']['identities']:
if identity[3]:
conference_title = identity[3]
break
vcard_temp = vcard_data['iq']['vcard_temp']
jid_detail = {
'name' : vcard_temp['FN'] or conference_title,
'note' : vcard_temp['notes'] or node_id,
'type' : vcard_temp['PHOTO']['TYPE'],
'bin' : vcard_temp['PHOTO']['BINVAL']
}
# Group chat messages
action = 'Join'
messages = []
# TODO Create configurations for group chat preview
room_info_muc = await XmppXep0045.get_room_information(xmpp_instance, jid_bare, alias, maxstanzas=50)
messages = room_info_muc['iq'][3]
messages.reverse()
subject = room_info_muc['iq'][1]['subject']
page_number = request.query_params.get('page', '')
if page_number:
try:
page_number = int(page_number)
ix = (page_number -1) * 10
except:
ix = 0
page_number = 1
else:
ix = 0
page_number = 1
messages_10 = messages[ix:][:10]
number_of_pages = int(len(messages) / 10)
if number_of_pages < len(messages) / 10: number_of_pages += 1
if not room_info_muc:
action = 'Warning'
node_title = jid_info['condition']
node_note = jid_info['text']
services = services_sorted = None
elif isinstance(room_info_muc, IqTimeout):
action = 'Warning'
node_title = 'Timeout'
node_note = 'Timeout error'
services = services_sorted = None
elif isinstance(room_info_muc, IqError):
action = 'Warning'
breakpoint()
node_title = room_info_muc['condition']
node_note = room_info_muc['text']
services = services_sorted = None
else:
#title = title or node_name
if not node_title: node_title = node_name
node_note = jid_bare
xmpp_instance.disconnect()
# Notes
jid_detail_note = jid_detail['note']
if isinstance(jid_detail_note, list) and len(jid_detail_note):
note = jid_detail_note[0]['NOTE']
else:
note = jid_detail_note
#if not note and jid_detail['name'] and not 'undefined' in jid_detail['name'] and title != jid_detail['name']:
# note = jid_detail['name']
# File type
mimetype = filename = filepath = None
if jid_detail['type']:
mimetype = jid_detail['type']
if mimetype:
filetype = mimetype.split('/')[1]
if filetype == 'svg+xml': filetype = 'svg'
filename = '{}.{}'.format(jid_bare, filetype)
filepath = 'photo/{}.{}'.format(jid_bare, filetype)
#img.save(filename)
# Write the decoded bytes to a file
with open(filepath, 'wb') as file:
file.write(jid_detail['bin'])
#from PIL import Image
#img = Image.open(filepath)
#rgb_im = im.convert("RGB")
#rgb_im.save('{}_mod.jpg'.format(jid_bare))
# Default photo. Utilized, if there is no image file.
if not filepath or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
filename = 'default.svg'
elif filetype == 'svg':
selection = Graphics.extract_colours_from_vector(filepath)
else:
selection = Graphics.extract_colours_from_raster(filepath)
# QR code
Graphics.generate_qr_code_graphics_from_string(link_href, jid_bare)
except Exception as e:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_tex = node_note = node_title = number_of_pages = page_number = previous = selection = services = services_sorted = url = None
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'conference.xhtml'
template_dict = {
'action' : action,
'exception' : exception,
'filename' : filename,
'jid_bare' : jid,
'jid_note' : jid_detail['note'],
'jid_title' : jid_detail['name'],
'links' : links,
'messages' : messages_10,
'node_title' : node_title,
'node_note' : node_note,
'node_name' : node_name,
'number_of_pages' : number_of_pages,
'page_number' : page_number,
'previous' : previous,
'request' : request,
'subject' : subject,
'title' : jid_detail['name'],
'url' : request.url._url,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
# NOTE Was /b/
@self.app.get('/d/{jid}/{node_name}')
@self.app.get('/d/{jid}/{node_name}/{item_id}')
async def browse_jid_node_get(request: Request, jid, node_name, item_id=None):
"""Browse items of a pubsub node"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
try:
exception = note = number_of_pages = page_number = previous = selection = services_sorted = None
node_title = node_name
link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name)
link_text = 'Subscribe'
xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
# JID kind
instance = message = node_id = None
jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
jid_info_iq = jid_info['iq']
jid_kind = jid_info['kind']
links = []
if jid_info['error']:
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
action = 'Connect with'
link_href = 'xmpp:{}'.format(jid_bare)
links.append({'name' : 'Connect',
'href' : link_href,
'iden' : 'connect'})
xmpp_uri = jid_bare
elif jid_kind in ('conference', 'server'):
action = 'Discover'
if jid_kind == 'conference':
instance = 'conferences'
elif jid_kind == 'server':
instance = 'services'
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
links.append({'name' : 'Discover',
'href' : link_href,
'iden' : 'discover'})
view_href = '/d/' + jid_bare
xmpp_uri = jid_bare
elif jid_kind in ('mix', 'muc'):
#title = 'Group Chat ' + title
# TODO Set group chat subject as description.
action = 'Join'
instance = 'participants'
link_href = 'xmpp:{}?join'.format(jid_bare)
links.append({'name' : 'Join',
'href' : link_href,
'iden' : 'join'})
view_href = '/v/' + jid_bare
xmpp_uri = jid_bare
# room_info = await XmppXep0045.get_room_data(xmpp_instance, jid_bare)
# breakpoint()
elif jid_kind == 'pubsub':
#node_name = request.query_params.get('node', '')
if node_name:
action = 'Subscribe'
instance = 'articles'
link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name)
view_href = '/d/{}/{}'.format(jid_bare, node_name)
xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
else:
action = 'Browse'
instance = 'nodes'
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
links.append({'name' : 'Browse',
'href' : link_href,
'iden' : 'browse'})
view_href = '/d/' + jid_bare
xmpp_uri = jid_bare
else:
action = 'Message'
instance = 'articles'
link_href = 'xmpp:{}?message'.format(jid_bare)
links.append({'name' : 'Add',
'href' : 'xmpp:{}?roster'.format(jid_bare),
'iden' : 'add'})
links.append({'name' : 'Add',
'href' : link_href,
'iden' : 'message'})
node_name = 'urn:xmpp:microblog:0'
view_href = '/d/{}/{}'.format(jid_bare, node_name)
xmpp_uri = jid_bare
if item_id:
links.append({'name' : 'Subscribe',
'href' : 'xmpp:{}?pubsub;node={};item={};action=subscribe'.format(jid_bare, node_name, item_id),
'iden' : 'subscribe'})
links.append({'name' : 'View',
'href' : 'xmpp:{}?pubsub;node={};item={}'.format(jid_bare, node_name, item_id),
'iden' : 'view'})
else:
links.append({'name' : 'Subscribe',
'href' : 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name),
'iden' : 'subscribe'})
links.append({'name' : 'View',
'href' : 'xmpp:{}?pubsub;node={}'.format(jid_bare, node_name),
'iden' : 'view'})
links.append({'name' : 'vCard',
'href' : 'xmpp:{}?vcard'.format(jid_bare),
'iden' : 'vcard'})
# JID info
# NOTE Group chat of Psi+ Project at jabber.ru has a note in its vCard.
vcard_data = await XmppXep0054.get_vcard_data(xmpp_instance, jid_bare)
if vcard_data['error']:
jid_detail = {}
#jid_detail['note'] = '{}: {}'.format(vcard_data['text'], vcard_data['condition'])
jid_detail['name'] = jid_detail['note'] = jid_detail['note'] = jid_detail['type'] = jid_detail['bin'] = None
else:
conference_title = None
if jid_kind in ('mix', 'muc'):
for identity in jid_info_iq['disco_info']['identities']:
if identity[3]:
conference_title = identity[3]
break
vcard_temp = vcard_data['iq']['vcard_temp']
jid_detail = {
'name' : vcard_temp['FN'] or conference_title,
'note' : vcard_temp['notes'] or node_id,
'type' : vcard_temp['PHOTO']['TYPE'],
'bin' : vcard_temp['PHOTO']['BINVAL']
}
# Title
# TODO /d/pubsub.nicoco.fr/blog/urn-uuid-53e43061-1962-3112-bb8a-1473dca61719
if '@' in jid_bare and node_name == 'urn:xmpp:microblog:0':
node_title = 'Journal'
else:
jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
iq = jid_items['iq']
iq_disco_items = iq['disco_items']
iq_disco_items_items = iq_disco_items['items']
category = 'unsorted'
for item in iq_disco_items_items:
if item[2] and item[1] == node_name:
node_title = item[2]
break
# Node items
action = 'Browse'
entries = []
if item_id:
previous = True
node_items = await XmppXep0060.get_node_items(xmpp_instance, jid_bare, node_name, item_ids=[item_id])
else:
item_ids = []
node_item_ids = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name)
for item_id in node_item_ids['disco_items']['items']:
item_ids.append(item_id[2])
# NOTE Consider to skip the reversal of order, because, then, items can be found at the same page.
item_ids.reverse()
page_number = request.query_params.get('page', '')
if page_number:
try:
page_number = int(page_number)
ix = (page_number -1) * 10
except:
ix = 0
page_number = 1
else:
ix = 0
page_number = 1
item_ids_10 = item_ids[ix:][:10]
node_items = await XmppXep0060.get_node_items(xmpp_instance, jid_bare, node_name, item_ids=item_ids_10)
number_of_pages = int(len(item_ids) / 10)
if number_of_pages < len(item_ids) / 10: number_of_pages += 1
if not node_items:
action = 'Warning'
node_title = jid_info['condition']
node_note = jid_info['text']
services = services_sorted = None
elif isinstance(node_items, IqTimeout):
action = 'Warning'
node_title = 'Timeout'
node_note = 'Timeout error'
services = services_sorted = None
elif isinstance(node_items, IqError):
action = 'Warning'
breakpoint()
node_title = node_items['condition']
node_note = node_items['text']
services = services_sorted = None
else:
#title = title or node_name
if not node_title: node_title = node_name
node_note = jid_bare
for item in node_items['pubsub']['items']:
item_payload = item['payload']
entry = Syndication.extract_items(item_payload)
if entry: entry['id'] = item['id']
entries.append(entry)
#if len(entries) > 10: break
if entries: entries.reverse()
xmpp_instance.disconnect()
# Notes
jid_detail_note = jid_detail['note']
if isinstance(jid_detail_note, list) and len(jid_detail_note):
note = jid_detail_note[0]['NOTE']
else:
note = jid_detail_note
#if not note and jid_detail['name'] and not 'undefined' in jid_detail['name'] and title != jid_detail['name']:
# note = jid_detail['name']
# File type
mimetype = filename = filepath = None
if jid_detail['type']:
mimetype = jid_detail['type']
if mimetype:
filetype = mimetype.split('/')[1]
if filetype == 'svg+xml': filetype = 'svg'
filename = '{}.{}'.format(jid_bare, filetype)
filepath = 'photo/{}.{}'.format(jid_bare, filetype)
#img.save(filename)
# Write the decoded bytes to a file
with open(filepath, 'wb') as file:
file.write(jid_detail['bin'])
#from PIL import Image
#img = Image.open(filepath)
#rgb_im = im.convert("RGB")
#rgb_im.save('{}_mod.jpg'.format(jid_bare))
# Default photo. Utilized, if there is no image file.
if not filepath or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
filename = 'default.svg'
elif filetype == 'svg':
selection = Graphics.extract_colours_from_vector(filepath)
else:
selection = Graphics.extract_colours_from_raster(filepath)
# QR code
Graphics.generate_qr_code_graphics_from_string(link_href, jid_bare)
except Exception as e:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_tex = node_note = node_title = number_of_pages = page_number = previous = selection = services = services_sorted = url = None
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'node.xhtml'
template_dict = {
'action' : action,
'entries' : entries,
'exception' : exception,
'filename' : filename,
'jid_bare' : jid,
'jid_note' : jid_detail['note'],
'jid_title' : jid_detail['name'],
'links' : links,
'node_title' : node_title,
'node_note' : node_note,
'node_name' : node_name,
'number_of_pages' : number_of_pages,
'page_number' : page_number,
'previous' : previous,
'request' : request,
'title' : node_title,
'url' : request.url._url,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/d/{jid}')
async def discover_jid_get(request: Request, jid):
"""View items of a selected service"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
try:
exception = note = selection = services_sorted = None
title = 'Services'
link_href = xmpp_uri = jid_bare
link_text = 'Reload'
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
# JID services
action = 'Discover'
jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
iq = jid_info['iq']
if iq:
jid_kind = jid_info['kind']
iq_disco_info = iq['disco_info']
for identity in iq_disco_info['identities']:
if jid_kind == identity[0] and identity[3]:
note = identity[3]
if not note: note = jid_bare
jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
iq = jid_items['iq']
iq_disco_items = iq['disco_items']
iq_disco_items_items = iq_disco_items['items']
services = {}
#services_sorted = {}
category = 'unsorted'
for item in iq_disco_items_items:
jid_bare = item[0]
if len(iq_disco_items_items) > 20 or jid_kind and jid_kind in ('pubsub'):
identity = sub_jid_info = sub_jid_info_iq = ''
if jid_kind and jid_kind in ('conference', 'mix', 'muc'):
category = 'conference'
if jid_kind and jid_kind in ('pubsub'):
category = 'pubsub'
else:
sub_jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
sub_jid_info_iq = sub_jid_info['iq']
try:
for identity_item in sub_jid_info_iq['disco_info']['identities']:
identity = identity_item
break
if sub_jid_info_iq:
category = identity[0] if (identity, list) and identity[0] else 'other'
except:
identity = None
category = 'unavailable'
sub_jid_kind = sub_jid_info['kind'] if 'kind' in sub_jid_info else None
if category not in services: services[category] = []
services[category].append(
{'identity' : identity,
'info' : sub_jid_info,
'jid' : jid_bare,
'kind' : sub_jid_kind,
'name' : item[2] or item[1] or item[0],
'node' : item[1]})
services_sorted = {k: v for k, v in services.items() if k != 'unavailable'}
if 'unavailable' in services: services_sorted['unavailable'] = services['unavailable']
else:
action = 'Warning'
title = jid_info['condition']
note = jid_info['text']
services = services_sorted = None
xmpp_instance.disconnect()
except Exception as e:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_text = selection = services = services_sorted = url = None
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'disco.xhtml'
template_dict = {
'action' : action,
'exception' : exception,
'filename' : 'default.svg',
'jid_bare' : jid,
'note' : note,
'request' : request,
'services' : services_sorted,
'title' : title,
'url' : request.url._url,
'link_href' : link_href,
'link_text' : link_text,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/v/{jid}')
async def view_jid_get(request: Request, jid):
"""View recent messages of a conference"""
pass #TODO
@self.app.get('/{jid}/{node_name}')
async def jid_node_get(request: Request, jid, node_name):
response = await main_jid_node_get(request, jid, node_name)
return response
@self.app.get('/{jid}')
async def jid_get(request: Request, jid):
node_name = request.query_params.get('node', '')
if node_name:
response = RedirectResponse(url='/{}/{}'.format(jid, node_name))
else:
response = await main_jid_node_get(request, jid)
return response
async def main_jid_node_get(request: Request, jid, node_name=None):
jid_bare = jid
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
try:
exception = note = selection = title = view_href = None
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
# JID kind
instance = message = node_id = None
jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
jid_info_iq = jid_info['iq']
jid_kind = jid_info['kind']
links = []
if jid_info['error']:
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
action = 'Connect with'
link_href = 'xmpp:{}'.format(jid_bare)
links.append(['Connect', link_href])
xmpp_uri = jid_bare
elif jid_kind in ('conference', 'server'):
action = 'Discover'
if jid_kind == 'conference':
instance = 'conferences'
elif jid_kind == 'server':
instance = 'services'
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
links.append(['Discover', link_href])
view_href = '/d/' + jid_bare
xmpp_uri = jid_bare
elif jid_kind in ('mix', 'muc'):
#title = 'Group Chat ' + title
# TODO Set group chat subject as description.
action = 'Join'
instance = 'participants'
link_href = 'xmpp:{}?join'.format(jid_bare)
links.append(['Join', link_href])
view_href = '/v/' + jid_bare
xmpp_uri = jid_bare
# room_info = await XmppXep0045.get_room_data(xmpp_instance, jid_bare)
# breakpoint()
elif jid_kind == 'pubsub':
#node_name = request.query_params.get('node', '')
if node_name:
action = 'Subscribe'
instance = 'articles'
link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name)
links.append(['Subscribe', link_href])
links.append(['View', 'xmpp:{}?pubsub;node={}'.format(jid_bare, node_name)])
view_href = '/d/{}/{}'.format(jid_bare, node_name)
xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
else:
action = 'Browse'
instance = 'nodes'
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
links.append(['Browse', link_href])
view_href = '/d/' + jid_bare
xmpp_uri = jid_bare
else:
action = 'Message'
instance = 'articles'
link_href = 'xmpp:{}?message'.format(jid_bare)
links.append(['Add', 'xmpp:{}?roster'.format(jid_bare)])
links.append(['Message', link_href])
node_name = 'urn:xmpp:microblog:0'
view_href = '/d/{}/{}'.format(jid_bare, node_name)
xmpp_uri = jid_bare
links.append(['vCard', 'xmpp:{}?vcard'.format(jid_bare)])
# JID item count
count = None
if jid_kind in ('mix', 'muc', 'conference', 'server'):
jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
count = len(jid_items['iq']['disco_items']['items'])
elif jid_kind in ('account', 'pubsub'):
node_items = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name)
if isinstance(node_items, stanza.iq.Iq):
count = len(node_items['disco_items']['items'])
# JID info
# NOTE Group chat of Psi+ Project at jabber.ru has a note in its vCard.
vcard_data = await XmppXep0054.get_vcard_data(xmpp_instance, jid_bare)
if vcard_data['error']:
jid_detail = {}
#jid_detail['note'] = '{}: {}'.format(vcard_data['text'], vcard_data['condition'])
jid_detail['name'] = jid_detail['note'] = jid_detail['note'] = jid_detail['type'] = jid_detail['bin'] = None
else:
conference_title = None
if jid_kind in ('mix', 'muc'):
for identity in jid_info_iq['disco_info']['identities']:
if identity[3]:
conference_title = identity[3]
break
vcard_temp = vcard_data['iq']['vcard_temp']
jid_detail = {
'name' : vcard_temp['FN'] or conference_title,
'note' : vcard_temp['notes'] or node_id,
'type' : vcard_temp['PHOTO']['TYPE'],
'bin' : vcard_temp['PHOTO']['BINVAL']
}
# Title
if jid_kind == 'pubsub':
jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
iq = jid_items['iq']
iq_disco_items = iq['disco_items']
iq_disco_items_items = iq_disco_items['items']
for item in iq_disco_items_items:
if item[2] and item[1] == node_name:
title = item[2]
break
if jid_kind == 'server':
if jid_info_iq:
for identity in jid_info_iq['disco_info']['identities']:
if jid_kind == identity[0] and identity[1] == 'im' and identity[3]:
title = identity[3]
print(jid_bare)
print(identity)
print(jid_info)
# String 'undefined' is sourced from JID discuss@conference.conversejs.org
if not title:
if jid_detail['name'] and not 'undefined' in jid_detail['name']:
title = jid_detail['name']
else:
title = jid_bare.split('@')[0]
xmpp_instance.disconnect()
# Notes
jid_detail_note = jid_detail['note']
if isinstance(jid_detail_note, list) and len(jid_detail_note):
note = jid_detail_note[0]['NOTE']
else:
note = jid_detail_note
#if not note and jid_detail['name'] and not 'undefined' in jid_detail['name'] and title != jid_detail['name']:
# note = jid_detail['name']
# File type
mimetype = filename = filepath = None
if jid_detail['type']:
mimetype = jid_detail['type']
if mimetype:
filetype = mimetype.split('/')[1]
if filetype == 'svg+xml': filetype = 'svg'
filename = '{}.{}'.format(jid_bare, filetype)
filepath = 'photo/{}.{}'.format(jid_bare, filetype)
#img.save(filename)
# Write the decoded bytes to a file
with open(filepath, 'wb') as file:
file.write(jid_detail['bin'])
#from PIL import Image
#img = Image.open(filepath)
#rgb_im = im.convert("RGB")
#rgb_im.save('{}_mod.jpg'.format(jid_bare))
# Default photo. Utilized, if there is no image file.
if not filepath or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
filename = 'default.svg'
elif filetype == 'svg':
selection = Graphics.extract_colours_from_vector(filepath)
else:
selection = Graphics.extract_colours_from_raster(filepath)
# QR code
Graphics.generate_qr_code_graphics_from_string(link_href, jid_bare)
except Exception as e:
exception = str(e)
print(exception)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = jid
count = filename = jid_bare = jid_kind = links = message = selection = url = None
template_file = 'jid.xhtml'
template_dict = {
'action' : action,
'count' : count,
'instance' : instance,
'exception' : exception,
'filename' : filename,
'jid_bare' : jid_bare,
'jid_kind' : jid_kind,
'links' : links,
'message' : message,
'note' : note,
'request' : request,
'selection' : selection,
'title' : title,
'url' : request.url._url,
'view_href' : view_href,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.exception_handler(404)
def not_found_exception_handler(request: Request, exc: HTTPException):
action = 'Warning'
title = 'Not Found'
return result_get(request, action, title)
@self.app.exception_handler(500)
def internal_error_exception_handler(request: Request, exc: HTTPException):
action = 'Error'
title = 'Internal Server Error'
return result_get(request, action, title)
def result_get(request: Request, action: str, title: str):
template_file = 'result.xhtml'
template_dict = {
'action' : action,
'request' : request,
'title' : title,
'url' : request.url._url}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/')
async def main_get(request: Request):
jabber_id = request.query_params.get('jid', '')
if jabber_id:
response = RedirectResponse(url='/' + jabber_id)
else:
template_file = 'main.xhtml'
template_dict = {
'request' : request,
'url' : request.url._url}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
class Data:
def open_file_toml(filename: str) -> dict:
with open(filename, mode="rb") as fn:
data = tomllib.load(fn)
return data
def save_to_toml(filename: str, data: dict) -> None:
with open(filename, 'w') as fn:
data_as_string = tomli_w.dumps(data)
fn.write(data_as_string)
class Graphics:
def extract_colours_from_raster(filepath):
try:
img = cv2.imread(filepath)
#thresholded = cv2.inRange(img, (50, 100, 200), (50, 100, 200))
thresholded = cv2.inRange(img, (90, 90, 90), (190, 190, 190))
#thresholded = cv2.bitwise_not(thresholded)
#thresholded = cv2.inRange(img, (0, 0, 0), (0, 0, 0))
#res = img + cv2.cvtColor(thresholded, cv2.COLOR_GRAY2BGR)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
#result = numpy.clip(img, 90, 190)
#result = numpy.clip(img, 50, 200)
#result = numpy.clip(img, 100, 150)
result = numpy.clip(img, 100, 200)
res = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
"""
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
mask = numpy.all(numpy.logical_and(img >= 90, img <= 190), axis=2)
result = numpy.where(mask[...,None], img, 255)
res = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
"""
"""
# Thresholding for black:
lower_black = numpy.array([0, 0, 0])
upper_black = numpy.array([50, 50, 50]) # Adjust this value for the black range
black_mask = cv2.inRange(img, lower_black, upper_black)
# Thresholding for white:
lower_white = numpy.array([250, 250, 250])
upper_white = numpy.array([255, 255, 255])
white_mask = cv2.inRange(img, lower_white, upper_white)
# Combine the masks
combined_mask = cv2.bitwise_or(black_mask, white_mask)
# Invert the combined mask
inverted_mask = cv2.bitwise_not(combined_mask)
# Apply the mask to the original image
res = cv2.bitwise_and(img, img, mask=inverted_mask)
"""
selection = []
ix_1st = random.randint(1, len(res)-1)
res_ix_1st = res[ix_1st]
ix_ix_1st = random.randint(1, len(res_ix_1st)-1)
res_ix_ix_1st = res_ix_1st[ix_ix_1st]
selection.append(numpy.array(res_ix_ix_1st).tolist())
ix_2nd = random.randint(1, len(res)-1)
res_ix_2nd = res[ix_2nd]
ix_ix_2nd = random.randint(1, len(res_ix_2nd)-1)
res_ix_ix_2nd = res_ix_2nd[ix_ix_2nd]
selection.append(numpy.array(res_ix_ix_2nd).tolist())
print(selection)
except Exception as e:
selection = None
exception = str(e)
print(exception)
return selection
def extract_colours_from_vector(filepath):
# Parse the SVG file
tree = ET.parse(filepath)
root = tree.getroot()
# Set to store unique colours
colours_hex = set()
colours_rgb = []
# SVG namespace
namespace = {'svg': 'http://www.w3.org/2000/svg'}
# Find all possible elements
for elem in root.findall('.//svg:circle', namespace) + \
root.findall('.//svg:ellipse', namespace) + \
root.findall('.//svg:line', namespace) + \
root.findall('.//svg:path', namespace) + \
root.findall('.//svg:polygon', namespace) + \
root.findall('.//svg:rect', namespace) + \
root.findall('.//svg:text', namespace):
fill = elem.get('fill')
stroke = elem.get('stroke')
# Add colours to the set if they are not None or 'none'
if fill and fill.startswith('#') and len(fill) > 4 and fill.lower() != 'none':
colours_hex.add(fill)
if stroke and stroke.startswith('#') and len(stroke) > 4 and stroke.lower() != 'none':
colours_hex.add(stroke)
for colour in colours_hex:
hex = colour.lstrip('#')
rgb = list(int(hex[i:i+2], 16) for i in (0, 2, 4))
rgb.reverse()
colours_rgb.append(rgb)
selection = []
if len(colours_rgb) > 1:
for i in range(2):
ix = random.randint(0, len(colours_rgb)-1)
selection.append(colours_rgb[ix])
del colours_rgb[ix]
elif len(colours_rgb) == 1:
selection = [colours_rgb[0], colours_rgb[0]]
return selection
def generate_qr_code_graphics_from_string(text, jid_bare):
#qrcode_graphics = qrcode.make(text)
qr = qrcode.QRCode(border=2, box_size=10)
qr.add_data(text)
qrcode_graphics = qr.make_image(fill_color='#333', back_color='#f2f2f2')
qrcode_graphics.save('qr/{}.png'.format(jid_bare))
class Syndication:
def extract_items(item_payload, limit=False):
namespace = '{http://www.w3.org/2005/Atom}'
title = item_payload.find(namespace + 'title')
links = item_payload.find(namespace + 'link')
if (not isinstance(title, ET.Element) and
not isinstance(links, ET.Element)): return None
title_text = '' if title == None else title.text
if isinstance(links, ET.Element):
for link in item_payload.findall(namespace + 'link'):
link_href = link.attrib['href'] if 'href' in link.attrib else ''
if link_href: break
contents = item_payload.find(namespace + 'content')
content_text = ''
if isinstance(contents, ET.Element):
for content in item_payload.findall(namespace + 'content'):
content_text = content.text or ''
if content_text: break
summaries = item_payload.find(namespace + 'summary')
summary_text = ''
if isinstance(summaries, ET.Element):
for summary in item_payload.findall(namespace + 'summary'):
summary_text = summary.text or ''
if summary_text: break
published = item_payload.find(namespace + 'published')
published_text = '' if published == None else published.text
categories = item_payload.find(namespace + 'category')
tags = []
if isinstance(categories, ET.Element):
for category in item_payload.findall(namespace + 'category'):
if 'term' in category.attrib and category.attrib['term']:
category_term = category.attrib['term']
if len(category_term) < 20:
tags.append(category_term)
elif len(category_term) < 50:
tags.append(category_term)
if limit and len(tags) > 4: break
identifier = item_payload.find(namespace + 'id')
if identifier and identifier.attrib: print(identifier.attrib)
identifier_text = '' if identifier == None else identifier.text
instances = '' # TODO Check the Blasta database for instances.
entry = {'content' : content_text,
'href' : link_href,
'published' : published_text,
'summary' : summary_text,
'tags' : tags,
'title' : title_text,
'updated' : published_text} # TODO "Updated" is missing
return entry
class XmppXep0030:
async def get_jid_items(self, jid_bare):
try:
condition = text = None
error = False
iq = await self['xep_0030'].get_items(jid=jid_bare)
except (IqError, IqTimeout) as e:
#logger.warning('Chat type could not be determined for {}'.format(jid_bare))
#logger.error(e)
iq = None
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text'] or 'Error'
#if not text:
# # NOTE We might want to set a specific photo for condition remote-server-not-found
# if condition:
# text = 'Could not determine JID type'
# else:
# text = 'Unknown Error'
result = {
'condition' : condition,
'error' : error,
'iq' : iq,
'text' : text}
return result
# NOTE
# Feature "urn:xmpp:mucsub:0" is present in both, MUC local and MUC hostname
# Feature "urn:xmpp:serverinfo:0" is present in both, MUC hostname and main hostname
async def get_jid_info(self, jid_bare):
jid_kind = None
try:
error = False
condition = text = None
iq = await self['xep_0030'].get_info(jid=jid_bare)
iq_disco_info = iq['disco_info']
if iq_disco_info:
features = iq_disco_info['features']
if 'http://jabber.org/protocol/muc#unique' in features:
jid_kind = 'conference'
elif 'urn:xmpp:mix:core:1' in features:
jid_kind = 'mix'
elif ('muc_moderated' in features or
'muc_open' in features or
'muc_persistent' in features or
'muc_public' in features or
'muc_semianonymous' in features or
'muc_unmoderated' in features or
'muc_unsecured' in features):
jid_kind = 'muc'
else:
for identity in iq_disco_info['identities']:
if identity[0] == 'pubsub' and identity[1] == 'service':
#if 'http://jabber.org/protocol/pubsub' in features:
#if 'http://jabber.org/protocol/pubsub#access-authorize' in features:
#if 'http://jabber.org/protocol/rsm' in features:
jid_kind = 'pubsub'
break
if identity[0] == 'server' and identity[1] == 'im':
jid_kind = 'server'
break
#if identity[0] == 'pubsub' and identity[1] == 'pep':
if identity[0] == 'account':
#if 'urn:xmpp:bookmarks:1#compat-pep' in features:
#if 'urn:xmpp:bookmarks:1#compat' in features:
#if 'urn:xmpp:push:0' in features:
#if 'urn:xmpp:pep-vcard-conversion:0' in features:
#if 'urn:xmpp:sid:0' in features:
# Also in MIX
#if 'urn:xmpp:mam:2' in features:
#if 'urn:xmpp:mam:2#extended' in features:
jid_kind = 'account'
break
if identity[0] == 'client' and identity[1] == 'bot':
jid_kind = 'bot'
#logger.info('Jabber ID: {}\n'
# 'Chat Type: {}'.format(jid_bare, result))
else:
iq = condition = text = None
except (IqError, IqTimeout) as e:
#logger.warning('Chat type could not be determined for {}'.format(jid_bare))
#logger.error(e)
iq = None
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text'] or 'Error'
#if not text:
# # NOTE We might want to set a specific photo for condition remote-server-not-found
# if condition:
# text = 'Could not determine JID type'
# else:
# text = 'Unknown Error'
result = {
'condition' : condition,
'error' : error,
'iq' : iq,
'text' : text,
'kind' : jid_kind}
return result
class XmppXep0045:
async def get_room_information(self, jid, alias, maxchars=None, maxstanzas=None, seconds=None):
#logger.info('Joining groupchat\nJID : {}\n'.format(jid))
#jid_from = str(self.boundjid) if self.is_component else None
if not maxchars: maxchars = 1000
if not maxstanzas: maxstanzas = 50
if not seconds: seconds = 864000
try:
error = False
condition = text = None
#since = datetime.fromtimestamp(time.time()-seconds)
iq = await self['xep_0045'].join_muc_wait(
jid,
alias,
#maxchars=maxchars,
maxstanzas=maxstanzas,
#password=None,
#presence_options = {"pfrom" : jid_from},
#seconds=seconds,
#since=since,
#timeout=30
)
except (IqError, IqTimeout, PresenceError) as e:
error = True
iq = None
condition = e.iq['error']['condition']
text = e.iq['error']['text']
result = {
'error' : error,
'condition' : condition,
'text' : text,
'iq' : iq}
return result
async def get_room_data(self, jid_bare):
return await self['xep_0045'].get_room_config(jid_bare)
async def get_room_participants(self, jid_bare):
return await self['xep_0045'].get_roster(jid_bare)
# NOTE: "Item not found", yet is a group chat
# That is, JID has no vcard
# messaging-off@conference.movim.eu
class XmppXep0054:
async def get_vcard_data(self, jid_bare):
try:
error = False
condition = text = None
iq = await self['xep_0054'].get_vcard(jid_bare)
except (IqError, IqTimeout) as e:
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text']
if not text:
if condition:
text = 'Could not retrieve vCard'
else:
text = 'Unknown Error'
iq = None
result = {
'error' : error,
'condition' : condition,
'text' : text,
'iq' : iq}
return result
class XmppXep0060:
async def get_node_items(self, jid_bare, node_name, item_ids=None, max_items=None):
try:
if max_items:
iq = await self['xep_0060'].get_items(
jid_bare, node_name, timeout=5)
it = self['xep_0060'].get_items(
jid_bare, node_name, timeout=5, max_items=max_items, iterator=True)
q = rsm.Iq()
q['to'] = jid_bare
q['disco_items']['node'] = node_name
async for item in rsm.ResultIterator(q, 'disco_items', '10'):
print(item['disco_items']['items'])
else:
iq = await self['xep_0060'].get_items(
jid_bare, node_name, timeout=5, item_ids=item_ids)
result = iq
except IqError as e:
if e.iq['error']['text'] == 'Node not found':
result = 'Node not found'
elif e.iq['error']['condition'] == 'item-not-found':
result = 'Item not found'
else:
result = None
except IqTimeout as e:
result = e
return result
async def get_node_item_ids(self, jid_bare, node_name):
try:
iq = await self['xep_0030'].get_items(
jid_bare, node_name)
# Broken. See https://codeberg.org/poezio/slixmpp/issues/3548
#iq = await self['xep_0060'].get_item_ids(
# jid_bare, node_name, timeout=5)
result = iq
except IqError as e:
if e.iq['error']['text'] == 'Node not found':
result = 'Node not found'
elif e.iq['error']['condition'] == 'item-not-found':
result = 'Item not found'
else:
result = None
except IqTimeout as e:
result = e
return result
class XmppXep0369:
async def get_room_data(self, jid_bare):
return await self['xep_0369'].get_channel_info(jid_bare)
def main():
filename_configuration = 'configuration.toml'
data = Data.open_file_toml(filename_configuration)
account = data['account']
jabber_id = account['xmpp']
password = account['pass']
alias = account['alias']
http_instance = HttpInstance(jabber_id, password, alias)
return http_instance.app
app = main()
# FIXME
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=8000, reload=True)