1402 lines
58 KiB
Python
1402 lines
58 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from datetime import datetime
|
|
from email.utils import parseaddr
|
|
from fastapi import FastAPI, Form, HTTPException, Request, Response
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
import glob
|
|
#import logging
|
|
#from os import mkdir
|
|
#from os.path import getsize, exists
|
|
import os
|
|
import qrcode
|
|
import random
|
|
import re
|
|
from slixmpp import ClientXMPP, stanza
|
|
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
|
|
from starlette.responses import RedirectResponse
|
|
#import time
|
|
import tomli_w
|
|
from urllib.parse import urlsplit
|
|
import uvicorn
|
|
import xml.etree.ElementTree as ET
|
|
|
|
try:
|
|
import cv2
|
|
except:
|
|
print('OpenCV (cv2) is required for dynamic background.')
|
|
|
|
try:
|
|
import numpy
|
|
except:
|
|
print('NumPy (numpy) is required for dynamic background.')
|
|
|
|
try:
|
|
import tomllib
|
|
except:
|
|
import tomli as tomllib
|
|
|
|
class XmppInstance(ClientXMPP):
    """Slixmpp client with the plugins this application queries through."""

    def __init__(self, jid, password, jid_bare):
        super().__init__(jid, password)

        # Stored as given; this class itself does not read it.
        self.jid_bare = jid_bare

        plugins = (
            'xep_0030', # XEP-0030: Service Discovery
            'xep_0045', # XEP-0045: Multi-User Chat
            'xep_0054', # XEP-0054: vcard-temp
            'xep_0060', # XEP-0060: Publish-Subscribe
        )
        for plugin in plugins:
            self.register_plugin(plugin)

        self.add_event_handler("session_start", self.on_session_start)

    async def on_session_start(self, event):
        """Send presence when the "session_start" event fires."""
        self.send_presence()
        #self.disconnect()
|
|
|
|
class HttpInstance:
    def __init__(self, jabber_id, password, alias):
        # jabber_id, password and alias are not stored on the instance; the
        # route handlers defined further down in this method read them as
        # closure variables.

        self.app = FastAPI()
        # Templates are loaded from the 'xhtml' directory; the handlers below
        # share this object through the enclosing scope.
        templates = Jinja2Templates(directory='xhtml')

        # TODO
        # 1) Mount at the same mountpoint /img.
        # 2) Image filename to be constant, i.e. /img/photo.png and /img/qr.png.
        # Static directories served as-is under the given URL prefixes.
        self.app.mount('/photo', StaticFiles(directory='photo'), name='photo')
        self.app.mount('/qr', StaticFiles(directory='qr'), name='qr')
        self.app.mount('/css', StaticFiles(directory='css'), name='css')
        self.app.mount('/img', StaticFiles(directory='img'), name='img')
|
|
|
|
# @self.app.get('/favicon.ico', include_in_schema=False)
|
|
# def favicon_get():
|
|
# return FileResponse('graphic/hermes.ico')
|
|
|
|
# @self.app.get('/hermes.svg')
|
|
# def logo_get():
|
|
# return FileResponse('graphic/hermes.svg')
|
|
|
|
@self.app.get('/v/{jid}')
async def view_jid(request: Request, jid):
    """View recent messages of a conference.

    Reads the details of ``jid`` from ``details/<jid>.toml``; when that file
    is missing or empty the details are fetched and cached through
    ``FileUtilities.cache_jid_data``. Renders ``conference.xhtml`` with one
    page of at most ten messages, selected by the ``page`` query parameter
    (1-based; missing, non-numeric or non-positive values fall back to 1).

    Returns the template response with Content-Type application/xhtml+xml.
    """
    # A path that parseaddr returns unchanged is lowercased and used as the
    # bare JID; anything else is passed on verbatim.
    # NOTE(review): the original set a "malformed" note in the latter case,
    # but overwrote it before use, so it never reached the template.
    jid_path = urlsplit(jid).path
    jid_bare = jid_path.lower() if parseaddr(jid_path)[1] == jid_path else jid

    node_name = 'urn:xmpp:microblog:0'

    details_file = 'details/{}.toml'.format(jid_bare)
    if os.path.exists(details_file) and os.path.getsize(details_file) > 0:
        jid_details = Data.open_file_toml(details_file)
    else:
        jid_details = await FileUtilities.cache_jid_data(
            jabber_id, password, jid_bare, node_name, alias=alias)

    jid_info = {
        'error' : jid_details['error'],
        'text' : jid_details['error_text'],
        'condition' : jid_details['error_condition']}
    jid_kind = jid_details['kind']
    jid_vcard = {
        'name' : jid_details['name'],
        'note' : jid_details['note'],
        'type' : jid_details['image_type']}
    messages = jid_details['messages']
    note = jid_details['note']
    subject = jid_details['subject']
    title = jid_details['name']

    # Group chat messages, ten per page.
    try:
        page_number = int(request.query_params.get('page', ''))
    except ValueError:
        page_number = 1
    # A page below 1 would yield a negative slice start, which wraps around
    # to the end of the list instead of selecting a page.
    page_number = max(page_number, 1)
    ix = (page_number - 1) * 10
    messages_10 = messages[ix:ix + 10]
    # Ceiling division without importing math.
    number_of_pages = -(-len(messages) // 10)

    # Query URI links
    action, instance, link_href, links, node_name, view_href, xmpp_uri = XmppUtilities.set_query_uri_link(
        jid_bare, jid_info, jid_kind, node_name)

    # Graphic files
    filename, filepath, filetype, selection = FileUtilities.handle_photo(jid_bare, jid_vcard, link_href)

    #if title == 'remote-server-timeout':
    #    raise HTTPException(status_code=408, detail='remote-server-timeout')
    #else:
    template_file = 'conference.xhtml'
    template_dict = {
        'action' : action,
        'exception' : None,
        'filename' : filename,
        'jid_bare' : jid,
        'jid_note' : note,
        'jid_title' : title,
        'links' : links,
        'messages' : messages_10,
        'node_title' : None,
        'node_note' : None,
        'node_name' : node_name,
        'number_of_pages' : number_of_pages,
        'page_number' : page_number,
        'previous' : None,
        'request' : request,
        'subject' : subject,
        'title' : title,
        'url' : request.url._url,
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
# NOTE Was /b/
|
|
@self.app.get('/d/{jid}/{node_name}')
@self.app.get('/d/{jid}/{node_name}/{item_id}')
async def browse_jid_node_get(request: Request, jid, node_name, item_id=None):
    """Browse items of a pubsub node.

    With ``item_id`` only that item is requested; otherwise the item IDs of
    the node are listed newest-first and one page of ten (``page`` query
    parameter, 1-based) is requested. Each returned payload is converted with
    ``Syndication.extract_items`` and handed to ``node.xhtml``.

    Returns the template response with Content-Type application/xhtml+xml.
    """
    # NOTE(review): the original set a "malformed" note when the path did not
    # round-trip through parseaddr, but overwrote it before use.
    jid_path = urlsplit(jid).path
    jid_bare = jid_path.lower() if parseaddr(jid_path)[1] == jid_path else jid

    entries = []
    node_note = number_of_pages = page_number = previous = None
    node_title = node_name

    details_file = 'details/{}.toml'.format(jid_bare)
    if os.path.exists(details_file) and os.path.getsize(details_file) > 0:
        jid_details = Data.open_file_toml(details_file)
    else:
        jid_details = await FileUtilities.cache_jid_data(
            jabber_id, password, jid_bare, node_name, item_id)

    jid_info = {
        'error' : jid_details['error'],
        'text' : jid_details['error_text'],
        'condition' : jid_details['error_condition']}
    jid_kind = jid_details['kind']
    jid_vcard = {
        'name' : jid_details['name'],
        'note' : jid_details['note'],
        'type' : jid_details['image_type']}
    nodes = jid_details['nodes']
    note = jid_details['note']
    title = jid_details['name']

    xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
    xmpp_instance.connect()
    # try/finally so the session is closed even when a query raises.
    try:
        # Node item IDs (only fetched when the cache has no entry for the node)
        if node_name not in nodes:
            nodes[node_name] = {}
            node_item_ids = await XmppXep0060.get_node_item_ids(
                xmpp_instance, jid_bare, node_name)
            #node_item_ids = await XmppUtilities.get_item_ids_of_node(
            #    jabber_id, password, jid_bare, node_name, nodes)
            if isinstance(node_item_ids['iq'], stanza.iq.Iq):
                disco_items = node_item_ids['iq']['disco_items']['items']
                nodes[node_name]['count'] = len(disco_items)
                nodes[node_name]['item_ids'] = [
                    [item[0] or '', item[1] or '', item[2] or '']
                    for item in disco_items]

        # Node items
        if item_id:
            previous = True
            node_items = await XmppXep0060.get_node_items(
                xmpp_instance, jid_bare, node_name, item_ids=[item_id])
        else:
            # 'item_ids' is absent when the listing above failed.
            item_ids = [item[2] for item in nodes[node_name].get('item_ids', [])]
            # NOTE Consider to neglect the reversal of order, because, then, items can be found at the same page.
            item_ids.reverse()
            try:
                page_number = int(request.query_params.get('page', ''))
            except ValueError:
                page_number = 1
            # A page below 1 would make the slice start negative and wrap.
            page_number = max(page_number, 1)
            ix = (page_number - 1) * 10
            item_ids_10 = item_ids[ix:ix + 10]
            number_of_pages = -(-len(item_ids) // 10)
            node_items = await XmppXep0060.get_node_items(
                xmpp_instance, jid_bare, node_name, item_ids=item_ids_10)

        # NOTE(review): the original also set action = 'Warning' in the three
        # failure branches, but action is reassigned unconditionally by
        # set_query_uri_link below, so those assignments had no effect.
        if not node_items:
            node_title = jid_info['condition']
            node_note = jid_info['text']
        elif isinstance(node_items, IqTimeout):
            node_title = 'Timeout'
            node_note = 'Timeout error'
        elif isinstance(node_items, IqError):
            # IqError is an exception object: condition and text are
            # attributes, it cannot be subscripted.
            node_title = node_items.condition
            node_note = node_items.text
        else:
            node_note = jid_bare
            for item in node_items['pubsub']['items']:
                entry = Syndication.extract_items(item['payload'])
                if entry: entry['id'] = item['id']
                # NOTE(review): a None entry is appended as well, as in the
                # original; presumably the template copes — confirm.
                entries.append(entry)
            entries.reverse()
    finally:
        xmpp_instance.disconnect()

    # Query URI links
    action, instance, link_href, links, node_name, view_href, xmpp_uri = XmppUtilities.set_query_uri_link(
        jid_bare, jid_info, jid_kind, node_name, item_id)

    # Graphic files
    filename, filepath, filetype, selection = FileUtilities.handle_photo(jid_bare, jid_vcard, link_href)

    #if title == 'remote-server-timeout':
    #    raise HTTPException(status_code=408, detail='remote-server-timeout')
    #else:
    template_file = 'node.xhtml'
    template_dict = {
        'action' : action,
        'entries' : entries,
        'exception' : None,
        'filename' : filename,
        'jid_bare' : jid,
        'jid_note' : note,
        'jid_title' : title,
        'links' : links,
        'node_title' : node_title,
        'node_note' : node_note,
        'node_name' : node_name,
        'number_of_pages' : number_of_pages,
        'page_number' : page_number,
        'previous' : previous,
        'request' : request,
        'title' : node_title,
        'url' : request.url._url,
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/d/{jid}')
async def discover_jid_get(request: Request, jid):
    """View items of a selected service.

    Runs service discovery against ``jid`` and groups the discovered items
    by category for ``disco.xhtml``. Each item is queried individually for
    its first identity unless there are more than 20 items or the service is
    a pubsub service, in which case the per-item queries are skipped.

    Returns the template response with Content-Type application/xhtml+xml.
    """
    # NOTE(review): the original set a "malformed" note when the path did not
    # round-trip through parseaddr, but overwrote it before use.
    jid_path = urlsplit(jid).path
    jid_bare = jid_path.lower() if parseaddr(jid_path)[1] == jid_path else jid

    note = services_sorted = None
    title = 'Services'
    link_href = xmpp_uri = jid_bare
    link_text = 'Reload'

    # Start an XMPP instance and retrieve information
    xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
    xmpp_instance.connect()
    # try/finally so the session is closed even when a query raises.
    try:
        # JID services
        action = 'Discover'
        jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
        iq = jid_info['iq']
        if iq:
            jid_kind = jid_info['kind']
            for identity in iq['disco_info']['identities']:
                if jid_kind == identity[0] and identity[3]:
                    note = identity[3]
            if not note: note = jid_bare

            jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
            disco_items = jid_items['iq']['disco_items']['items']
            services = {}
            category = 'unsorted'
            # Loop-invariant. Compared with ==, because `in ('pubsub')` is a
            # substring test against a plain string, not tuple membership.
            skip_lookup = len(disco_items) > 20 or jid_kind == 'pubsub'
            for item in disco_items:
                # Separate name, so the requested jid_bare is not overwritten.
                item_jid = item[0]
                if skip_lookup:
                    identity = sub_jid_info = ''
                    if jid_kind in ('conference', 'mix', 'muc'):
                        category = 'conference'
                    if jid_kind == 'pubsub':
                        category = 'pubsub'
                else:
                    sub_jid_info = await XmppXep0030.get_jid_info(xmpp_instance, item_jid)
                    sub_jid_info_iq = sub_jid_info['iq']
                    try:
                        identities = sub_jid_info_iq['disco_info']['identities']
                        # First identity, or None when there is none, rather
                        # than reusing a value left over from an earlier pass.
                        identity = next(iter(identities), None)
                        category = identity[0] if identity and identity[0] else 'other'
                    except Exception:
                        # Not a bare except: task cancellation must propagate.
                        identity = None
                        category = 'unavailable'

                sub_jid_kind = sub_jid_info['kind'] if 'kind' in sub_jid_info else None
                services.setdefault(category, []).append(
                    {'identity' : identity,
                     'info' : sub_jid_info,
                     'jid' : item_jid,
                     'kind' : sub_jid_kind,
                     'name' : item[2] or item[1] or item[0],
                     'node' : item[1]})

            # Keep insertion order, but move 'unavailable' to the end.
            services_sorted = {k: v for k, v in services.items() if k != 'unavailable'}
            if 'unavailable' in services: services_sorted['unavailable'] = services['unavailable']
        else:
            action = 'Warning'
            title = jid_info['condition']
            note = jid_info['text']
    finally:
        xmpp_instance.disconnect()

    #if title == 'remote-server-timeout':
    #    raise HTTPException(status_code=408, detail='remote-server-timeout')
    #else:
    template_file = 'disco.xhtml'
    template_dict = {
        'action' : action,
        'exception' : None,
        'filename' : 'default.svg',
        'jid_bare' : jid,
        'note' : note,
        'request' : request,
        'services' : services_sorted,
        'title' : title,
        'url' : request.url._url,
        'link_href' : link_href,
        'link_text' : link_text,
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/j/{jid}/{node_name}')
async def jid_node_get(request: Request, jid, node_name):
    """Serve the JID page for a given node by delegating to main_jid_node_get."""
    return await main_jid_node_get(request, jid, node_name)
|
|
|
|
@self.app.get('/j/{jid}')
async def jid_get(request: Request, jid):
    """Serve the JID page, or redirect to /j/<jid>/<node> when ?node= is given."""
    requested_node = request.query_params.get('node', '')
    if not requested_node:
        return await main_jid_node_get(request, jid)
    return RedirectResponse(url='/j/{}/{}'.format(jid, requested_node))
|
|
|
|
async def main_jid_node_get(request: Request, jid, node_name=None):
    """Render the overview page (``jid.xhtml``) of a Jabber ID.

    Reads the details of ``jid`` from ``details/<jid>.toml``; when that file
    is missing or empty the details are fetched and cached through
    ``FileUtilities.cache_jid_data``. When no ``node_name`` is given, the JID
    contains '@', is not a conference/mix/muc and has a cached
    'urn:xmpp:microblog:0' node, that node is selected.

    Returns the template response with Content-Type application/xhtml+xml.
    """
    # NOTE(review): the original set a "malformed" note when the path did not
    # round-trip through parseaddr, but overwrote it before use.
    jid_path = urlsplit(jid).path
    jid_bare = jid_path.lower() if parseaddr(jid_path)[1] == jid_path else jid

    details_file = 'details/{}.toml'.format(jid_bare)
    if os.path.exists(details_file) and os.path.getsize(details_file) > 0:
        jid_details = Data.open_file_toml(details_file)
    else:
        jid_details = await FileUtilities.cache_jid_data(
            jabber_id, password, jid_bare, node_name, alias=alias)

    jid_kind = jid_details['kind']
    nodes = jid_details['nodes']
    is_chat = jid_kind in ('conference', 'mix', 'muc')
    has_journal = 'urn:xmpp:microblog:0' in nodes

    # Set node name to 'urn:xmpp:microblog:0'
    if not is_chat and '@' in jid_bare and not node_name and has_journal:
        node_name = 'urn:xmpp:microblog:0'

    if '@' in jid_bare and not has_journal and not is_chat:
        count = 0
    else:
        count = nodes[node_name]['count'] if node_name in nodes else jid_details['count']

    jid_info = {
        'error' : jid_details['error'],
        'text' : jid_details['error_text'],
        'condition' : jid_details['error_condition']}
    jid_vcard = {
        'name' : jid_details['name'],
        'note' : jid_details['note'],
        'type' : jid_details['image_type']}
    note = jid_details['note']
    title = jid_details['name']

    if node_name not in nodes:
        nodes[node_name] = await XmppUtilities.get_item_ids_of_node(
            jabber_id, password, jid_bare, node_name, nodes)

    # Query URI links
    # The cached action/instance/link_href/uri/view_href values are not read,
    # because this call assigns all of them.
    action, instance, link_href, links, node_name, view_href, xmpp_uri = XmppUtilities.set_query_uri_link(
        jid_bare, jid_info, jid_kind, node_name)

    # Graphic files
    filename, filepath, filetype, selection = FileUtilities.handle_photo(jid_bare, jid_vcard, link_href)

    template_file = 'jid.xhtml'
    template_dict = {
        'action' : action,
        'count' : count,
        'instance' : instance,
        'exception' : None,
        'filename' : filename,
        'jid_bare' : jid_bare,
        'jid_kind' : jid_kind,
        'links' : links,
        'message' : None,
        'note' : note,
        'request' : request,
        'selection' : selection,
        'title' : title,
        'url' : request.url._url,
        'view_href' : view_href,
        'xmpp_uri' : xmpp_uri}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.exception_handler(404)
def not_found_exception_handler(request: Request, exc: HTTPException):
    """Render the result page with a 'Not Found' warning for 404 errors."""
    response = result_get(request, 'Warning', 'Not Found')
    # The template response defaults to 200; keep the real status so clients
    # and crawlers can tell the page is missing.
    response.status_code = 404
    return response
|
|
|
|
@self.app.exception_handler(500)
def internal_error_exception_handler(request: Request, exc: HTTPException):
    """Render the result page with an 'Internal Server Error' message for 500 errors."""
    response = result_get(request, 'Error', 'Internal Server Error')
    # The template response defaults to 200; report the failure as such.
    response.status_code = 500
    return response
|
|
|
|
def result_get(request: Request, action: str, title: str):
    """Render result.xhtml with the given action and title as XHTML."""
    context = {
        'action' : action,
        'request' : request,
        'title' : title,
        'url' : request.url._url}
    page = templates.TemplateResponse('result.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/')
async def main_get(request: Request):
    """Serve the front page, or redirect to /j/<jid> when ?jid= is given."""
    requested_jid = request.query_params.get('jid', '')
    if requested_jid:
        return RedirectResponse(url='/j/' + requested_jid)

    context = {
        'request' : request,
        'url' : request.url._url}
    page = templates.TemplateResponse('main.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
class Data:
|
|
|
|
def open_file_toml(filename: str) -> dict:
|
|
with open(filename, mode="rb") as fn:
|
|
data = tomllib.load(fn)
|
|
return data
|
|
|
|
def save_to_toml(filename: str, data: dict) -> None:
|
|
with open(filename, 'w') as fn:
|
|
data_as_string = tomli_w.dumps(data)
|
|
fn.write(data_as_string)
|
|
|
|
class FileUtilities:
|
|
|
|
async def cache_jid_data(jabber_id, password, jid_bare, node_name=None, item_id=None, alias=None):
    """Query an XMPP entity, cache the result as TOML and return it.

    Connects as ``jabber_id`` and collects, for ``jid_bare``: its kind, query
    URI links, vCard data, service discovery items, a title, recent group
    chat messages (kind 'muc', joined as ``alias``) and the item IDs of
    ``node_name`` when that node is among the discovered items. The result is
    written to ``details/<jid_bare>.toml`` and the photo / QR code files are
    produced through ``FileUtilities.handle_photo``.

    ``item_id`` is accepted for the callers' convenience and is not used.

    Returns the dict that was written to the TOML file.
    """
    iq_disco_items_list = iq_disco_items_items_list = title = ''
    # Default, so the pubsub title lookup is safe when discovery failed.
    iq_disco_items_items = []
    jid_vcard = {
        'name' : '',
        'note' : '',
        'type' : '',
        'bin' : ''}
    count = ''
    messages = []
    subject = ''
    nodes = {}

    # Start an XMPP instance and retrieve information
    xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
    xmpp_instance.connect()
    # try/finally so the session is closed even when a query raises.
    try:
        # JID kind
        print('JID kind')
        jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
        jid_info_iq = jid_info['iq']
        jid_kind = jid_info['kind']

        # Query URI links
        print('Query URI links')
        action, instance, link_href, links, node_name, view_href, xmpp_uri = XmppUtilities.set_query_uri_link(
            jid_bare, jid_info, jid_kind, node_name)

        # JID info
        print('JID info')
        # NOTE Group chat of Psi+ Project at jabber.ru has a note in its vCard.
        vcard_data = await XmppXep0054.get_vcard_data(xmpp_instance, jid_bare)
        if not vcard_data['error']:
            conference_title = None
            if jid_kind in ('mix', 'muc'):
                for identity in jid_info_iq['disco_info']['identities']:
                    if identity[3]:
                        conference_title = identity[3]
                        break
            vcard_temp = vcard_data['iq']['vcard_temp']
            jid_vcard = {
                'name' : vcard_temp['FN'] or conference_title or '',
                'note' : vcard_temp['notes'] or '',
                'type' : vcard_temp['PHOTO']['TYPE'] or '',
                'bin' : vcard_temp['PHOTO']['BINVAL'] or ''}

        # TODO /d/pubsub.nicoco.fr/blog/urn-uuid-53e43061-1962-3112-bb8a-1473dca61719
        jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
        if isinstance(jid_items['iq'], stanza.iq.Iq):
            iq_disco_items_items = jid_items['iq']['disco_items']['items']
            iq_disco_items_list = []
            iq_disco_items_items_list = []
            for item in iq_disco_items_items:
                # For a group chat the third field is collected, otherwise
                # the second (node) field.
                iq_disco_items_list.append(item[2] if jid_kind == 'muc' else item[1])
                iq_disco_items_items_list.append(
                    [item[0] or '', item[1] or '', item[2] or ''])
            count = len(iq_disco_items_list)

        # Title
        print('Title')
        if jid_kind not in ('conference', 'mix', 'muc') and '@' in jid_bare:
            node_name = 'urn:xmpp:microblog:0'
        elif jid_kind == 'pubsub':
            for item in iq_disco_items_items:
                if item[2] and item[1] == node_name:
                    title = item[2]
                    break

        if jid_kind == 'server' and jid_info_iq:
            for identity in jid_info_iq['disco_info']['identities']:
                if jid_kind == identity[0] and identity[1] == 'im' and identity[3]:
                    title = identity[3]

        # String 'undefined' is sourced from JID discuss@conference.conversejs.org
        if not title:
            if jid_vcard['name'] and 'undefined' not in jid_vcard['name']:
                title = jid_vcard['name']
            else:
                title = jid_bare.split('@')[0]

        # Group chat messages
        print('Group chat messages')
        if jid_kind == 'muc':
            action = 'Join'
            # TODO Create configurations for group chat preview
            room_info_muc = await XmppXep0045.get_room_information(xmpp_instance, jid_bare, alias, maxstanzas=50)
            if not room_info_muc or isinstance(room_info_muc, (IqTimeout, IqError)):
                # Nothing usable came back; only the action is recorded.
                action = 'Warning'
            else:
                for message in room_info_muc['iq'][3]:
                    messages.append({
                        'id' : message['id'],
                        'alias' : message['mucnick'],
                        'body' : message['body'],
                        'timestamp' : str(message['delay']['stamp'])})
                messages.reverse()
                subject = room_info_muc['iq'][1]['subject']

        # Node items
        print('Node items')
        if iq_disco_items_list and node_name and node_name in iq_disco_items_list:
            action = 'Browse'
            node_item_ids = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name)
            if isinstance(node_item_ids['iq'], stanza.iq.Iq):
                node_disco_items = node_item_ids['iq']['disco_items']['items']
                nodes[node_name] = {
                    'count' : len(node_disco_items),
                    'item_ids' : [
                        [entry[0] or '', entry[1] or '', entry[2] or '']
                        for entry in node_disco_items]}
    finally:
        xmpp_instance.disconnect()

    # Notes
    print('Notes')
    jid_vcard_note = jid_vcard['note']
    if isinstance(jid_vcard_note, list) and len(jid_vcard_note):
        note = jid_vcard_note[0]['NOTE']
    else:
        note = jid_vcard_note

    jid_details = {
        'action' : action or '',
        'count' : count or '',
        'error' : jid_info['error'],
        'error_text' : jid_info['text'] or '',
        'error_condition' : jid_info['condition'] or '',
        'image_type' : jid_vcard['type'],
        'instance' : instance or '',
        'items' : iq_disco_items_items_list,
        'kind' : jid_kind or '',
        'link_href' : link_href,
        'messages' : messages or '',
        'name' : title,
        'nodes' : nodes,
        'note' : note or '',
        'subject' : subject or '',
        'uri' : xmpp_uri or '',
        'view_href' : view_href}

    print(jid_details)

    FileUtilities.handle_photo(jid_bare, jid_vcard, link_href)

    filename = 'details/{}.toml'.format(jid_bare)
    Data.save_to_toml(filename, jid_details)

    return jid_details
|
|
|
|
def handle_photo(jid_bare, jid_vcard, link_href):
    """Locate or store the photo of a JID and make sure its QR code exists.

    An existing ``photo/<jid_bare>.*`` file is used when there is one.
    Otherwise, when ``jid_vcard`` carries an image type, the file name is
    derived from it and the image bytes under ``'bin'`` (if any) are written
    to ``photo/``. A QR code of ``link_href`` is generated when
    ``qr/<jid_bare>.png`` is missing or empty.

    Returns ``(filename, filepath, filetype, selection)``; ``filename`` is
    'default.svg' when no usable photo file exists, and ``selection`` is the
    result of the colour extraction (None when no photo was analysed).
    """
    filename = filepath = filetype = selection = None
    # Escape the JID so characters such as '[' or '*' are matched literally.
    filepath_guess = glob.glob('photo/{}.*'.format(glob.escape(jid_bare)))
    if filepath_guess:
        filepath = filepath_guess[0]
        filetype = filepath.split('.').pop()
        filename = '{}.{}'.format(jid_bare, filetype)
    elif jid_vcard and jid_vcard.get('type'):
        # Subtype of the MIME type; rpartition also copes with a value that
        # has no '/' at all.
        filetype = jid_vcard['type'].rpartition('/')[2]
        if filetype == 'svg+xml': filetype = 'svg'
        filename = '{}.{}'.format(jid_bare, filetype)
        filepath = 'photo/{}.{}'.format(jid_bare, filetype)

        # Write the decoded bytes to a file. A vCard rebuilt from the cached
        # details has no 'bin' entry, and an absent photo is stored as '';
        # in both cases nothing is written, so no empty file is left behind.
        photo = jid_vcard.get('bin')
        if photo and isinstance(photo, (bytes, bytearray)):
            with open(filepath, 'wb') as file:
                file.write(photo)

    if not filepath or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
        filename = 'default.svg'
    elif filetype == 'svg':
        selection = Graphics.extract_colours_from_vector(filepath)
    else:
        selection = Graphics.extract_colours_from_raster(filepath)

    # QR code
    filepath_qrcode = 'qr/{}.png'.format(jid_bare)
    if not os.path.exists(filepath_qrcode) or os.path.getsize(filepath_qrcode) == 0:
        Graphics.generate_qr_code_graphics_from_string(link_href, jid_bare)

    return filename, filepath, filetype, selection
|
|
|
|
class Graphics:
|
|
|
|
|
|
def extract_colours_from_raster(filepath):
|
|
try:
|
|
img = cv2.imread(filepath)
|
|
#thresholded = cv2.inRange(img, (50, 100, 200), (50, 100, 200))
|
|
thresholded = cv2.inRange(img, (90, 90, 90), (190, 190, 190))
|
|
#thresholded = cv2.bitwise_not(thresholded)
|
|
#thresholded = cv2.inRange(img, (0, 0, 0), (0, 0, 0))
|
|
#res = img + cv2.cvtColor(thresholded, cv2.COLOR_GRAY2BGR)
|
|
|
|
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
|
|
#result = numpy.clip(img, 90, 190)
|
|
#result = numpy.clip(img, 50, 200)
|
|
#result = numpy.clip(img, 100, 150)
|
|
result = numpy.clip(img, 100, 200)
|
|
res = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
|
|
|
|
"""
|
|
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
|
|
mask = numpy.all(numpy.logical_and(img >= 90, img <= 190), axis=2)
|
|
result = numpy.where(mask[...,None], img, 255)
|
|
res = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
|
|
"""
|
|
|
|
"""
|
|
# Thresholding for black:
|
|
lower_black = numpy.array([0, 0, 0])
|
|
upper_black = numpy.array([50, 50, 50]) # Adjust this value for the black range
|
|
black_mask = cv2.inRange(img, lower_black, upper_black)
|
|
|
|
# Thresholding for white:
|
|
lower_white = numpy.array([250, 250, 250])
|
|
upper_white = numpy.array([255, 255, 255])
|
|
white_mask = cv2.inRange(img, lower_white, upper_white)
|
|
|
|
# Combine the masks
|
|
combined_mask = cv2.bitwise_or(black_mask, white_mask)
|
|
|
|
# Invert the combined mask
|
|
inverted_mask = cv2.bitwise_not(combined_mask)
|
|
|
|
# Apply the mask to the original image
|
|
res = cv2.bitwise_and(img, img, mask=inverted_mask)
|
|
"""
|
|
|
|
selection = []
|
|
|
|
ix_1st = random.randint(1, len(res)-1)
|
|
res_ix_1st = res[ix_1st]
|
|
ix_ix_1st = random.randint(1, len(res_ix_1st)-1)
|
|
res_ix_ix_1st = res_ix_1st[ix_ix_1st]
|
|
selection.append(numpy.array(res_ix_ix_1st).tolist())
|
|
|
|
ix_2nd = random.randint(1, len(res)-1)
|
|
res_ix_2nd = res[ix_2nd]
|
|
ix_ix_2nd = random.randint(1, len(res_ix_2nd)-1)
|
|
res_ix_ix_2nd = res_ix_2nd[ix_ix_2nd]
|
|
selection.append(numpy.array(res_ix_ix_2nd).tolist())
|
|
print(selection)
|
|
|
|
except Exception as e:
|
|
selection = None
|
|
exception = str(e)
|
|
print(exception)
|
|
|
|
return selection
|
|
|
|
def extract_colours_from_vector(filepath):
    """Pick two colours used by the shapes of an SVG file.

    The ``fill`` and ``stroke`` attributes of the circle, ellipse, line,
    path, polygon, rect and text elements are scanned for hexadecimal
    colours (``#rgb``, ``#rgba``, ``#rrggbb`` or ``#rrggbbaa``). Any other
    value, such as ``none`` or a colour name, is ignored. An alpha
    component is discarded.

    Parameters:
        filepath: path of the SVG document; it is parsed with
            ``xml.etree.ElementTree``, which raises on malformed XML.

    Returns:
        A list of two ``[blue, green, red]`` integer lists chosen at random
        from the distinct colours found. The same colour is returned twice
        when only one was found, and the list is empty when none was found.
    """
    namespace = {'svg': 'http://www.w3.org/2000/svg'}
    shape_tags = ('circle', 'ellipse', 'line', 'path', 'polygon', 'rect', 'text')
    hex_colour = re.compile(r'#([0-9a-fA-F]{3,4}|[0-9a-fA-F]{6}|[0-9a-fA-F]{8})')

    root = ET.parse(filepath).getroot()

    # Distinct colours, normalised to six lowercase hexadecimal digits so
    # that "#FFF", "#ffffff" and "#FFFFFF" count as a single colour.
    colours_hex = set()
    for tag in shape_tags:
        for elem in root.findall('.//svg:' + tag, namespace):
            for value in (elem.get('fill'), elem.get('stroke')):
                match = hex_colour.fullmatch(value.strip()) if value else None
                if not match:
                    continue
                digits = match.group(1).lower()
                if len(digits) < 6:
                    # Expand the shorthand notation: "abc" -> "aabbcc".
                    digits = ''.join(digit * 2 for digit in digits)
                colours_hex.add(digits[:6])

    # Channels are read in reverse, which yields blue, green, red.
    colours_bgr = [[int(colour[i:i + 2], 16) for i in (4, 2, 0)]
                   for colour in colours_hex]

    if len(colours_bgr) > 1:
        return random.sample(colours_bgr, 2)
    if colours_bgr:
        return [colours_bgr[0], colours_bgr[0]]
    return []
|
|
|
|
def generate_qr_code_graphics_from_string(text, jid_bare):
    """Encode ``text`` as a QR code image saved under ``qr/<jid_bare>.png``."""
    filename = 'qr/{}.png'.format(jid_bare)
    encoder = qrcode.QRCode(box_size=10, border=2)
    encoder.add_data(text)
    # Dark grey modules on a light grey background.
    image = encoder.make_image(back_color='#f2f2f2', fill_color='#333')
    image.save(filename)
|
|
|
|
class Syndication:
    """Helpers for syndication payloads in the Atom namespace."""

    def extract_items(item_payload, limit=False):
        """Summarise an Atom entry element as a dictionary.

        Parameters:
            item_payload: ``xml.etree.ElementTree.Element`` whose children
                are in the ``http://www.w3.org/2005/Atom`` namespace.
            limit: when true, stop collecting tags once five were found.

        Returns:
            ``None`` when the entry has neither a title nor a link element.
            Otherwise a dictionary with the keys ``content``, ``href``,
            ``published``, ``summary``, ``tags``, ``title`` and ``updated``.
            Missing or empty values are empty strings, and ``tags`` is a
            list that may be empty.
        """
        namespace = '{http://www.w3.org/2005/Atom}'

        def first_value(tag, read):
            # First non-empty value among the child elements named ``tag``.
            for element in item_payload.findall(namespace + tag):
                value = read(element)
                if value:
                    return value
            return ''

        title = item_payload.find(namespace + 'title')
        if title is None and item_payload.find(namespace + 'link') is None:
            return None
        published = item_payload.find(namespace + 'published')

        # An element without text has ``text`` set to None; normalise to ''.
        title_text = '' if title is None else (title.text or '')
        published_text = '' if published is None else (published.text or '')

        link_href = first_value('link', lambda link: link.attrib.get('href'))
        content_text = first_value('content', lambda content: content.text)
        summary_text = first_value('summary', lambda summary: summary.text)

        tags = []
        for category in item_payload.findall(namespace + 'category'):
            category_term = category.attrib.get('term')
            # Empty terms and terms of 50 characters or more are skipped.
            if category_term and len(category_term) < 50:
                tags.append(category_term)
            if limit and len(tags) > 4:
                break

        # TODO Check the Blasta database for instances.

        entry = {'content' : content_text,
                 'href' : link_href,
                 'published' : published_text,
                 'summary' : summary_text,
                 'tags' : tags,
                 'title' : title_text,
                 'updated' : published_text} # TODO "Updated" is missing
        return entry
|
|
|
|
class XmppUtilities:
    """Helpers that combine XMPP queries and build ``xmpp:`` query URIs."""

    async def get_item_ids_of_node(jabber_id, password, jid_bare, node_name, nodes):
        """Connect with a fresh client and list the items of a PubSub node.

        Parameters:
            jabber_id: Jabber ID of the account used for the connection.
            password: password of that account.
            jid_bare: Jabber ID of the service that hosts the node.
            node_name: name of the node to inspect.
            nodes: not used; kept so that existing callers keep working.

        Returns:
            The value returned by ``XmppXep0060.get_node_item_ids``.
        """
        xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
        xmpp_instance.connect()
        try:
            return await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name)
        finally:
            # Release the connection even when the query raises.
            xmpp_instance.disconnect()

    def set_query_uri_link(jid_bare, jid_info, jid_kind, node_name=None, item_id=None):
        """Build the action label and links offered for a Jabber ID.

        Parameters:
            jid_bare: Jabber ID the links point to.
            jid_info: mapping whose ``error`` value tells whether service
                discovery failed for the Jabber ID.
            jid_kind: ``conference``, ``server``, ``mix``, ``muc`` or
                ``pubsub``; any other value is treated as an account.
            node_name: optional PubSub node name.
            item_id: optional PubSub item identifier.

        Returns:
            The tuple ``(action, instance, link_href, links, node_name,
            view_href, xmpp_uri)``, where ``links`` is a list of
            dictionaries with the keys ``name``, ``href`` and ``iden``.
        """
        links = []

        def add_link(name, href, iden):
            links.append({'name' : name,
                          'href' : href,
                          'iden' : iden})

        disco_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
        instance = view_href = ''
        xmpp_uri = jid_bare
        if jid_info['error']:
            action = 'Connect with'
            link_href = 'xmpp:{}'.format(jid_bare)
            add_link('Connect', link_href, 'connect')
        elif jid_kind in ('conference', 'server'):
            action = 'Discover'
            instance = 'conferences' if jid_kind == 'conference' else 'services'
            link_href = disco_href
            add_link('Discover', link_href, 'discover')
            view_href = '/d/' + jid_bare
        elif jid_kind in ('mix', 'muc'):
            # TODO Set group chat subject as description.
            action = 'Join'
            instance = 'participants'
            link_href = 'xmpp:{}?join'.format(jid_bare)
            add_link('Join', link_href, 'join')
            view_href = '/v/' + jid_bare
        elif jid_kind == 'pubsub':
            if node_name:
                # The matching "Subscribe" link is added with the node
                # links further below.
                action = 'Subscribe'
                instance = 'articles'
                link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name)
                view_href = '/d/{}/{}'.format(jid_bare, node_name)
                xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
            else:
                action = 'Browse'
                instance = 'nodes'
                link_href = disco_href
                add_link('Browse', link_href, 'browse')
                view_href = '/d/' + jid_bare
        else:
            action = 'Message'
            instance = 'articles'
            link_href = 'xmpp:{}?message'.format(jid_bare)
            add_link('Add', 'xmpp:{}?roster'.format(jid_bare), 'add')
            add_link('Message', link_href, 'message')
            # NOTE(review): without a node name this path ends with "None";
            # presumably 'urn:xmpp:microblog:0' was once meant as the
            # default node. Confirm against the callers.
            view_href = '/d/{}/{}'.format(jid_bare, node_name)

        if item_id:
            item_href = 'xmpp:{}?pubsub;node={};item={}'.format(jid_bare, node_name, item_id)
            add_link('Subscribe', item_href + ';action=subscribe', 'subscribe')
            add_link('View', item_href, 'view')
        elif node_name:
            node_href = 'xmpp:{}?pubsub;node={}'.format(jid_bare, node_name)
            add_link('Subscribe', node_href + ';action=subscribe', 'subscribe')
            add_link('View', node_href, 'view')
        add_link('vCard', 'xmpp:{}?vcard'.format(jid_bare), 'vcard')
        return action, instance, link_href, links, node_name, view_href, xmpp_uri
|
|
|
|
class XmppXep0030:
    """Service Discovery (XEP-0030) queries.

    ``self`` is expected to be an XMPP client that exposes the plugin as
    ``self['xep_0030']``.
    """

    # Features that mark a Jabber ID as a group chat (MUC) room.
    _MUC_FEATURES = ('muc_moderated',
                     'muc_open',
                     'muc_persistent',
                     'muc_public',
                     'muc_semianonymous',
                     'muc_unmoderated',
                     'muc_unsecured')

    async def get_jid_items(self, jid_bare):
        """Request the disco#items of a Jabber ID.

        Returns:
            A dictionary with the keys ``condition``, ``error``, ``iq`` and
            ``text``. On success ``iq`` holds the response and ``error`` is
            ``False``. On an error response or a timeout ``iq`` is ``None``,
            ``error`` is ``True``, and ``condition`` and ``text`` describe
            the failure (``text`` falls back to ``'Error'``).
        """
        iq = condition = text = None
        error = False
        try:
            iq = await self['xep_0030'].get_items(jid=jid_bare)
        except (IqError, IqTimeout) as e:
            #logger.warning('Chat type could not be determined for {}'.format(jid_bare))
            #logger.error(e)
            error = True
            # The exception itself carries the condition and the text. On a
            # timeout ``e.iq`` is the unanswered request, which has no error
            # payload to read.
            condition = e.condition
            text = e.text or 'Error'
        result = {
            'condition' : condition,
            'error' : error,
            'iq' : iq,
            'text' : text}
        return result

    def _kind_of(iq_disco_info):
        # Derive the kind of an entity from its disco#info features and
        # identities; None when nothing matches.
        # NOTE
        # Feature "urn:xmpp:mucsub:0" is present in both, MUC local and MUC hostname
        # Feature "urn:xmpp:serverinfo:0" is present in both, MUC hostname and main hostname
        features = iq_disco_info['features']
        if 'http://jabber.org/protocol/muc#unique' in features:
            return 'conference'
        if 'urn:xmpp:mix:core:1' in features:
            return 'mix'
        if any(feature in features for feature in XmppXep0030._MUC_FEATURES):
            return 'muc'
        jid_kind = None
        for identity in iq_disco_info['identities']:
            if identity[0] == 'pubsub' and identity[1] == 'service':
                return 'pubsub'
            if identity[0] == 'server' and identity[1] == 'im':
                return 'server'
            if identity[0] == 'account':
                return 'account'
            if identity[0] == 'client' and identity[1] == 'bot':
                # No early exit: a later identity may still override this.
                jid_kind = 'bot'
        return jid_kind

    async def get_jid_info(self, jid_bare):
        """Request the disco#info of a Jabber ID and classify the entity.

        Returns:
            A dictionary with the keys ``condition``, ``error``, ``iq``,
            ``text`` and ``kind``. ``kind`` is one of ``conference``,
            ``mix``, ``muc``, ``pubsub``, ``server``, ``account``, ``bot``
            or ``None``. ``iq`` is ``None`` when the response carries no
            disco#info data or when the request failed. On failure
            ``error`` is ``True``, and ``condition`` and ``text`` describe
            it (``text`` falls back to ``'Error'``).
        """
        jid_kind = iq = condition = text = None
        error = False
        try:
            iq = await self['xep_0030'].get_info(jid=jid_bare)
        except (IqError, IqTimeout) as e:
            #logger.warning('Chat type could not be determined for {}'.format(jid_bare))
            #logger.error(e)
            iq = None
            error = True
            # See get_jid_items: the request of a timed out query has no
            # error payload, so the exception attributes are used instead.
            condition = e.condition
            text = e.text or 'Error'
        else:
            iq_disco_info = iq['disco_info']
            if iq_disco_info:
                jid_kind = XmppXep0030._kind_of(iq_disco_info)
            else:
                iq = None
        result = {
            'condition' : condition,
            'error' : error,
            'iq' : iq,
            'text' : text,
            'kind' : jid_kind}
        return result
|
|
|
|
|
|
class XmppXep0045:
    """Multi-User Chat (XEP-0045) queries.

    ``self`` is expected to be an XMPP client that exposes the plugin as
    ``self['xep_0045']``.
    """

    async def get_room_information(self, jid, alias, maxchars=None, maxstanzas=None, seconds=None):
        """Join a group chat and wait for the join to complete.

        Parameters:
            jid: Jabber ID of the group chat.
            alias: nickname to join with.
            maxchars: accepted for compatibility; currently not forwarded.
            maxstanzas: history limit passed to ``join_muc_wait``; 50 when
                omitted or zero.
            seconds: accepted for compatibility; currently not forwarded.

        Returns:
            A dictionary with the keys ``error``, ``condition``, ``text``
            and ``iq``. ``iq`` holds the value returned by
            ``join_muc_wait``, or ``None`` when joining failed.
        """
        #logger.info('Joining groupchat\nJID     : {}\n'.format(jid))
        if not maxstanzas: maxstanzas = 50
        error = False
        condition = text = iq = None
        try:
            iq = await self['xep_0045'].join_muc_wait(
                jid,
                alias,
                maxstanzas=maxstanzas,
                )
        except (IqError, IqTimeout, PresenceError) as e:
            error = True
            # All three exceptions carry the condition and text themselves.
            # PresenceError has no ``iq`` attribute, and the ``iq`` of a
            # timeout is the request, so no stanza is read here.
            condition = e.condition
            text = e.text
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result

    async def get_room_data(self, jid_bare):
        """Return the configuration of a group chat as given by ``get_room_config``."""
        return await self['xep_0045'].get_room_config(jid_bare)

    async def get_room_participants(self, jid_bare):
        """Return the roster of a group chat as given by ``get_roster``."""
        import inspect
        roster = self['xep_0045'].get_roster(jid_bare)
        # Current slixmpp releases return the roster directly, and awaiting
        # a plain list raises TypeError. Await only when an awaitable is
        # handed back.
        if inspect.isawaitable(roster):
            roster = await roster
        return roster
|
|
|
|
|
|
# NOTE: A request may fail with "Item not found" although the JID is a
# group chat; that only means that the JID has no vCard.
# Example: messaging-off@conference.movim.eu
|
|
|
|
class XmppXep0054:
    """vcard-temp (XEP-0054) queries.

    ``self`` is expected to be an XMPP client that exposes the plugin as
    ``self['xep_0054']``.
    """

    async def get_vcard_data(self, jid_bare):
        """Request the vCard of a Jabber ID.

        Returns:
            A dictionary with the keys ``error``, ``condition``, ``text``
            and ``iq``. On success ``iq`` holds the response. On an error
            response or a timeout ``iq`` is ``None``, ``error`` is ``True``
            and ``text`` is never empty.
        """
        error = False
        condition = text = iq = None
        try:
            iq = await self['xep_0054'].get_vcard(jid_bare)
        except (IqError, IqTimeout) as e:
            error = True
            # The exception carries the condition and the text. On a
            # timeout ``e.iq`` is the unanswered request, which has no
            # error payload to read.
            condition = e.condition
            text = e.text
            if not text:
                text = 'Could not retrieve vCard' if condition else 'Unknown Error'
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result
|
|
|
|
class XmppXep0060:
    """Publish-Subscribe (XEP-0060) queries.

    ``self`` is expected to be an XMPP client that exposes the plugins as
    ``self['xep_0060']`` and ``self['xep_0030']``.
    """

    async def get_node_items(self, jid_bare, node_name, item_ids=None, max_items=None):
        """Retrieve items of a PubSub node.

        Parameters:
            jid_bare: Jabber ID of the service that hosts the node.
            node_name: name of the node.
            item_ids: identifiers of the items to retrieve; used only when
                ``max_items`` is not given.
            max_items: upper bound on the number of items requested.

        Returns:
            The response on success; the string ``'Node not found'`` or
            ``'Item not found'`` for those error responses; ``None`` for
            any other error response; the ``IqTimeout`` exception itself
            when the request timed out.
        """
        # As before, a request limited by ``max_items`` does not name items.
        if max_items:
            query = {'max_items' : max_items}
        else:
            query = {'item_ids' : item_ids}
        try:
            result = await self['xep_0060'].get_items(
                jid_bare, node_name, timeout=5, **query)
        except IqError as e:
            if e.text == 'Node not found':
                result = 'Node not found'
            elif e.condition == 'item-not-found':
                result = 'Item not found'
            else:
                result = None
        except IqTimeout as e:
            result = e
        return result

    async def get_node_item_ids(self, jid_bare, node_name):
        """List the items of a PubSub node by means of service discovery.

        Returns:
            A dictionary with the keys ``error``, ``condition``, ``text``
            and ``iq``. On success ``iq`` holds the disco#items response.
            On an error response or a timeout ``iq`` is ``None``, ``error``
            is ``True`` and ``text`` is never empty.
        """
        error = False
        condition = text = iq = None
        try:
            iq = await self['xep_0030'].get_items(
                jid_bare, node_name)
            # Broken. See https://codeberg.org/poezio/slixmpp/issues/3548
            #iq = await self['xep_0060'].get_item_ids(
            #    jid_bare, node_name, timeout=5)
        except (IqError, IqTimeout) as e:
            error = True
            # The exception carries the condition and the text. On a
            # timeout ``e.iq`` is the unanswered request, which has no
            # error payload to read.
            condition = e.condition
            text = e.text
            if not text:
                text = 'Could not retrieve node items' if condition else 'Unknown Error'
        result = {
            'error' : error,
            'condition' : condition,
            'text' : text,
            'iq' : iq}
        return result
|
|
|
|
class XmppXep0369:
    """Queries answered by the ``xep_0369`` plugin of the client given as ``self``."""

    async def get_room_data(self, jid_bare):
        """Return the channel information of ``jid_bare`` as given by ``get_channel_info``."""
        plugin = self['xep_0369']
        channel_info = await plugin.get_channel_info(jid_bare)
        return channel_info
|
|
|
|
|
|
def main():
    """Create the web application from the account set in ``configuration.toml``.

    The ``account`` table of the configuration provides the keys ``xmpp``,
    ``pass`` and ``alias``, which are handed to ``HttpInstance``.

    Returns:
        The ``app`` attribute of the created ``HttpInstance``.
    """
    account = Data.open_file_toml('configuration.toml')['account']
    http_instance = HttpInstance(account['xmpp'], account['pass'], account['alias'])
    return http_instance.app
|
|
|
|
# Application object, created at import time so that an ASGI server can load
# it from this module.
app = main()

if __name__ == '__main__':
    # uvicorn honours reload=True only when the application is passed as an
    # import string ("module:app"); with an application object it logs a
    # warning and exits. The object is therefore served without auto-reload.
    uvicorn.run(app, host='127.0.0.1', port=8000)
|