Add file PyProject;

Support display of a single pubsub node item;
Update document README;
Modularize code;
This commit is contained in:
Schimon Jehudah, Adv. 2024-11-17 17:30:38 +02:00
parent 37aa7e8f40
commit 5e495579c2
32 changed files with 2431 additions and 2059 deletions

View file

@ -73,15 +73,57 @@ JabberCard requires an XMPP account to work, it only needs an XMPP server.
- XEP-0060: Publish-Subscribe
- XEP-0292: vCard4 Over XMPP
## Install
## Installation
Use the following commands, to begin JabberCard.
It is possible to install JabberCard using pip and pipx.
#### pip inside venv
```shell
git clone https://git.xmpp-it.net/sch/JabberCard
cd JabberCard/
python -m uvicorn fasi:app
```
$ python3 -m venv .venv
$ source .venv/bin/activate
```
##### Install
```
$ pip install git+https://git.xmpp-it.net/sch/JabberCard
```
#### pipx
##### Install
```
$ pipx install git+https://git.xmpp-it.net/sch/JabberCard
```
##### Update
```
$ pipx reinstall jabbercard
```
```
$ pipx uninstall jabbercard
$ pipx install git+https://git.xmpp-it.net/sch/JabberCard
```
### Configure
Copy file `settings.toml` to `~/.config/jabbercard/`.
Copy directories `css`, `img`, and `template` to `~/.local/share/jabbercard/`.
Copy files `clients.toml`, and `systems.toml` to `~/.local/share/jabbercard/`.
### Start
```
$ jabbercard
```
Open URL http://localhost:8000 and connect with your Jabber ID.
## License
@ -101,7 +143,7 @@ Schimon Jehudah Zachary 2024
## Thanks
A special thank you for Mr. Georg Lukas of [op-co.de](https://op-co.de/) from
A special thank you for Mr. Georg Lukas of [op-co.de](https://op-co.de) from
Germany, who has gracefully published a sophisticated client-side HTML
invitation page, and consequently exposed that idea to the founder of this
project.

View file

@ -1 +0,0 @@
This directory caches textual data of Jabber IDs.

2048
fasi.py

File diff suppressed because it is too large Load diff

3
jabbercard/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from jabbercard.version import __version__, __version_info__
print('JabberCard', __version__)

25
jabbercard/__main__.py Normal file
View file

@ -0,0 +1,25 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from jabbercard.config import Cache
from jabbercard.http.instance import HttpInstance
#import logging
import os
#from os.path import getsize, exists
import re
#import time
import uvicorn
def main():
http_instance = HttpInstance()
return http_instance.app
if __name__ == 'jabbercard.__main__':
directory_cache = Cache.get_directory()
if not os.path.exists(directory_cache): os.mkdir(directory_cache)
for subdirectory in ('details', 'photo', 'qr', 'xep_0060'):
subdirectory_cache = os.path.join(directory_cache, subdirectory)
if not os.path.exists(subdirectory_cache): os.mkdir(subdirectory_cache)
app = main()
uvicorn.run(app, host='127.0.0.1', port=8000, reload=False)

116
jabbercard/config.py Normal file
View file

@ -0,0 +1,116 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Functions get_directory() were taken from project jarun/buku.
By Arun Prakash Jana (jarun) and Dmitry Marakasov (AMDmi3).
"""
import os
import sys
try:
import tomllib
except:
import tomli as tomllib
class Settings:
def get_directory():
"""
Determine the directory path where setting files be stored.
* If $XDG_CONFIG_HOME is defined, use it;
* else if $HOME exists, use it;
* else if the platform is Windows, use %APPDATA%;
* else use the current directory.
Returns
-------
str
Path to configuration directory.
"""
# config_home = xdg.BaseDirectory.xdg_config_home
config_home = os.environ.get('XDG_CONFIG_HOME')
if config_home is None:
if os.environ.get('HOME') is None:
if sys.platform == 'win32':
config_home = os.environ.get('APPDATA')
if config_home is None:
return os.path.abspath('.')
else:
return os.path.abspath('.')
else:
config_home = os.path.join(
os.environ.get('HOME'), '.config'
)
return os.path.join(config_home, 'jabbercard')
def get_setting(filename, section):
with open(filename, mode="rb") as settings:
result = tomllib.load(settings)[section]
return result
class Data:
def get_directory():
"""
Determine the directory path where data files be stored.
* If $XDG_DATA_HOME is defined, use it;
* else if $HOME exists, use it;
* else if the platform is Windows, use %APPDATA%;
* else use the current directory.
Returns
-------
str
Path to database file.
"""
# data_home = xdg.BaseDirectory.xdg_data_home
data_home = os.environ.get('XDG_DATA_HOME')
if data_home is None:
if os.environ.get('HOME') is None:
if sys.platform == 'win32':
data_home = os.environ.get('APPDATA')
if data_home is None:
return os.path.abspath('.jabbercard/data')
else:
return os.path.abspath('.jabbercard/data')
else:
data_home = os.path.join(
os.environ.get('HOME'), '.local', 'share'
)
return os.path.join(data_home, 'jabbercard')
class Cache:
def get_directory():
"""
Determine the directory path where cache files be stored.
* If $XDG_CACHE_HOME is defined, use it;
* else if $HOME exists, use it;
* else if the platform is Windows, use %APPDATA%;
* else use the current directory.
Returns
-------
str
Path to cache directory.
"""
# cache_home = xdg.BaseDirectory.xdg_cache_home
cache_home = os.environ.get('XDG_CACHE_HOME')
if cache_home is None:
if os.environ.get('HOME') is None:
if sys.platform == 'win32':
cache_home = os.environ.get('APPDATA')
if cache_home is None:
return os.path.abspath('.jabbercard/cache')
else:
return os.path.abspath('.jabbercard/cache')
else:
cache_home = os.path.join(
os.environ.get('HOME'), '.cache'
)
return os.path.join(cache_home, 'jabbercard')

941
jabbercard/http/instance.py Normal file
View file

@ -0,0 +1,941 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from email.utils import parseaddr
from fastapi import FastAPI, Form, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jabbercard.config import Cache, Settings, Data
from jabbercard.utilities.graphics import Graphics
from jabbercard.utilities.toml import Toml
from jabbercard.utilities.xml import Syndication
from jabbercard.xmpp.utilities import XmppUtilities
import os
from starlette.responses import RedirectResponse
from urllib.parse import urlsplit
class HttpInstance:
def __init__(self):
directory_settings = Settings.get_directory()
filename_settings = os.path.join(directory_settings, 'settings.toml')
settings = Toml.open_file_toml(filename_settings)
account = settings['account']
jabber_id = account['xmpp']
password = account['pass']
alias = account['alias']
brand = settings['brand']
brand_name = brand['name']
brand_site = brand['site']
chat_client = brand['chat']
news_client = brand['news']
directory_data = Data.get_directory()
directory_data_css = os.path.join(directory_data, 'css')
directory_data_graphic = os.path.join(directory_data, 'graphic')
directory_data_img = os.path.join(directory_data, 'img')
filename_favicon = os.path.join(directory_data, 'img', 'favicon.ico')
directory_data_template = os.path.join(directory_data, 'template')
directory_cache = Cache.get_directory()
directory_cache_qr = os.path.join(directory_cache, 'qr')
directory_cache_photo = os.path.join(directory_cache, 'photo')
self.app = FastAPI()
templates = Jinja2Templates(directory=directory_data_template)
# TODO
# 1) Mount at the same mountpoint /img.
# 2) Image filename to be constant, i.e. /img/photo.png and /img/qr.png.
self.app.mount('/photo', StaticFiles(directory=directory_cache_photo), name='photo')
self.app.mount('/qr', StaticFiles(directory=directory_cache_qr), name='qr')
self.app.mount('/css', StaticFiles(directory=directory_data_css), name='css')
self.app.mount('/img', StaticFiles(directory=directory_data_img), name='img')
# @self.app.get(filename_favicon, include_in_schema=False)
# def favicon_get():
# return FileResponse('graphic/hermes.ico')
# @self.app.get('/hermes.svg')
# def logo_get():
# return FileResponse('graphic/hermes.svg')
@self.app.get('/v/{jid}')
async def view_jid(request: Request, jid):
"""View recent messages of a conference"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
if jid_bare == jabber_id:
raise HTTPException(status_code=403, detail='access-denied')
#try:
if True:
exception = jid_vcard = messages_10 = note = node_title = \
node_note = number_of_pages = page_number = previous = \
selection = services_sorted = subject = None
link_href = 'xmpp:{}?join'.format(jid_bare)
link_text = 'Join'
xmpp_uri = '{}'.format(jid_bare)
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
if os.path.exists(filename) and os.path.getsize(filename) > 0:
jid_details = Toml.open_file_toml(filename)
else:
jid_details = await XmppUtilities.cache_jid_data(
jabber_id, password, jid_bare, alias=alias)
count = jid_details['count']
items = jid_details['items']
jid_info = {
'error' : jid_details['error'],
'text' : jid_details['error_text'],
'condition' : jid_details['error_condition']}
jid_kind = jid_details['kind']
jid_vcard = {
'name' : jid_details['name'],
'note' : jid_details['note'],
'type' : jid_details['image_type']}
messages = jid_details['messages']
nodes = jid_details['nodes']
note = jid_details['note']
subject = jid_details['subject']
title = jid_details['name']
# Group chat messages
# NOTE TODO
page_number = request.query_params.get('page', '')
if page_number:
try:
page_number = int(page_number)
ix = (page_number -1) * 10
except:
ix = 0
page_number = 1
else:
ix = 0
page_number = 1
messages_10 = messages[ix:][:10]
number_of_pages = int(len(messages) / 10)
if number_of_pages < len(messages) / 10: number_of_pages += 1
if jid_kind:
# Action and instance type
action, instance = XmppUtilities.set_action_instance_type(jid_kind)
else: # jid_info['error']
action = 'Contact'
instance = view_href = ''
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
xmpp_uri = jid_bare
# Query URI links
print('Query URI links')
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind)
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind)
view_href = XmppUtilities.get_view_href(jid_bare, jid_kind)
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind)
# Graphic files
filename, filepath, filetype, selection = Graphics.handle_photo(
jid_bare, jid_vcard, link_href)
#except Exception as e:
else:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_tex = node_note = \
node_title = number_of_pages = page_number = previous = \
selection = services = services_sorted = url = None
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'conference.xhtml'
template_dict = {
'action' : action,
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'exception' : exception,
'filename' : filename,
'jid_bare' : jid,
'jid_note' : note,
'jid_title' : title,
'links' : links,
'messages' : messages_10,
'node_title' : node_title,
'node_note' : node_note,
'number_of_pages' : number_of_pages,
'page_number' : page_number,
'previous' : previous,
'request' : request,
'selection' : selection,
'subject' : subject,
'title' : title,
'url' : request.url._url,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/c/{jid}')
async def c_jid_get(request: Request, jid):
"""Display entries of a vCard4"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
if jid_bare == jabber_id:
raise HTTPException(status_code=403, detail='access-denied')
node_name_vcard4 = 'urn:xmpp:vcard4'
item_id_vcard4 = 'current'
#try:
if True:
entries = []
exception = jid_vcard = note = node_items = node_note = \
number_of_pages = page_number = previous = selection = \
title = None
filename = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name_vcard4, item_id_vcard4 + '.xml')
if os.path.exists(filename) and os.path.getsize(filename) > 0:
xml_data = Toml.open_file_xml(filename)
else:
await XmppUtilities.cache_vcard_data(
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
xml_data = Toml.open_file_xml(filename)
root_element = xml_data.getroot()
child_element = root_element[0]
#vcard_info = Syndication.extract_vcard_items(child_element)
vcard_info = Syndication.extract_vcard4_items(child_element)
# Action and instance type
action = 'Profile'
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
if os.path.exists(filename) and os.path.getsize(filename) > 0:
jid_details = Toml.open_file_toml(filename)
else:
jid_details = await XmppUtilities.cache_jid_data(
jabber_id, password, jid_bare, alias=alias)
# Set node name to 'urn:xmpp:microblog:0'
jid_kind = jid_details['kind']
nodes = jid_details['nodes']
if (jid_kind not in ('conference', 'mix', 'muc') and
'@' in jid_bare and
'urn:xmpp:microblog:0' in nodes):
node_name = 'urn:xmpp:microblog:0'
# Query URI links
print('Query URI links')
jid_kind = 'account'
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind)
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name_vcard4)
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name)
# Graphic files
filename, filepath, filetype, selection = Graphics.handle_photo(
jid_bare, jid_vcard, link_href)
#except Exception as e:
else:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_tex = node_note = \
node_title = number_of_pages = page_number = previous = \
selection = url = None
if 'fn' in vcard_info and vcard_info['fn']:
title = vcard_info['fn']
elif 'alias' in vcard_info and vcard_info['alias']:
title = vcard_info['alias']
else:
title = jid_bare.split('@')[0]
if 'alias' in vcard_info and vcard_info['alias']:
alias = vcard_info['alias']
else:
alias = jid_bare.split('@')[0]
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'vcard.xhtml'
template_dict = {
'action' : action,
'alias' : alias,
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'entries' : entries,
'exception' : exception,
'filename' : filename,
'jid_bare' : jid,
'jid_note' : note,
#'jid_title' : title,
#'node_title' : node_title,
'links' : links,
'node_name' : node_name_vcard4,
'number_of_pages' : number_of_pages,
'page_number' : page_number,
'previous' : previous,
'request' : request,
'selection' : selection,
'title' : title,
'url' : request.url._url,
'vcard_info' : vcard_info,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/b/{jid}')
async def b_jid_get(request: Request, jid):
response = await browse_jid_node_get(request, jid, 'urn:xmpp:microblog:0')
return response
# TODO Change to /p/ for pubsub
@self.app.get('/d/{jid}/{node_name}')
@self.app.get('/d/{jid}/{node_name}/{item_id}')
async def d_jid_node_get(request: Request, jid, node_name, item_id=None):
response = await browse_jid_node_get(request, jid, node_name, item_id)
return response
async def browse_jid_node_get(request: Request, jid, node_name, item_id=None):
"""Browse items of a pubsub node"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
if jid_bare == jabber_id:
raise HTTPException(status_code=403, detail='access-denied')
#try:
if True:
exception = jid_vcard = note = node_items = node_note = \
number_of_pages = page_number = previous = selection = None
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
if os.path.exists(filename) and os.path.getsize(filename) > 0:
jid_details = Toml.open_file_toml(filename)
else:
jid_details = await XmppUtilities.cache_jid_data(
jabber_id, password, jid_bare, node_name, item_id)
# Node item IDs
nodes = jid_details['nodes']
#items = jid_details['items']
# for item in items:
# if item[1] == node_name:
# nodes[node_name]['title'] = item[2]
# break
supdirectory = os.path.join(directory_cache, 'xep_0060', jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name)
if not os.path.exists(directory):
os.mkdir(directory)
await XmppUtilities.cache_node_data(
jabber_id, password, jid_bare, node_name)
count = jid_details['count']
jid_info = {
'error' : jid_details['error'],
'text' : jid_details['error_text'],
'condition' : jid_details['error_condition']}
jid_kind = jid_details['kind']
jid_vcard = {
'name' : jid_details['name'],
'note' : jid_details['note'],
'type' : jid_details['image_type']}
messages = jid_details['messages']
#node_title = nodes[node_name]['title'] if 'title' in nodes[node_name] else jid_details['name']
node_title = node_name
note = jid_details['note']
#title = nodes[node_name]['title'] if node_name else jid_details['name']
title = jid_details['name']
#link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(
# jid_bare, node_name)
#link_text = 'Subscribe'
#xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
# TODO Support viewing of a single item
# Node items
entries = []
if item_id:
node_items = item_ids_10 = [item_id + '.xml']
else:
node_items = os.listdir(directory)
if 'urn:xmpp:avatar:metadata.xml' in node_items:
node_items.remove('urn:xmpp:avatar:metadata.xml')
page_number = request.query_params.get('page', '')
if page_number:
try:
page_number = int(page_number)
ix = (page_number -1) * 10
except:
ix = 0
page_number = 1
else:
ix = 0
page_number = 1
item_ids_10 = node_items[ix:][:10]
number_of_pages = int(len(node_items) / 10)
if number_of_pages < len(node_items) / 10: number_of_pages += 1
if node_items:
for item in item_ids_10:
filename = os.path.join(directory, item)
xml_data = Toml.open_file_xml(filename)
root_element = xml_data.getroot()
child_element = root_element[0]
entry = Syndication.extract_atom_items(child_element)
if entry:
filename_without_file_extension = item[:len(item)-4]
entry['id'] = filename_without_file_extension
entries.append(entry)
#if len(entries) > 10: break
if jid_kind:
# Action and instance type
action, instance = XmppUtilities.set_action_instance_type(jid_kind, node_name)
else: # jid_info['error']
action = 'Contact'
instance = view_href = ''
message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
xmpp_uri = jid_bare
# Query URI links
print('Query URI links')
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name, item_id)
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
view_href = XmppUtilities.get_view_href(jid_bare, jid_kind, node_name)
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name)
node_note = xmpp_uri
# Graphic files
filename, filepath, filetype, selection = Graphics.handle_photo(
jid_bare, jid_vcard, link_href)
#except Exception as e:
else:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_tex = node_note = \
node_title = number_of_pages = page_number = previous = \
selection = url = None
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'node.xhtml'
template_dict = {
'action' : action,
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'entries' : entries,
'exception' : exception,
'filename' : filename,
'jid_bare' : jid,
'jid_note' : note,
'jid_title' : title,
'links' : links,
'node_title' : node_title,
'node_note' : node_note,
'node_name' : node_name,
'number_of_pages' : number_of_pages,
'page_number' : page_number,
'previous' : previous,
'request' : request,
'selection' : selection,
'title' : node_title,
'url' : request.url._url,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/d/{jid}')
async def discover_jid_get(request: Request, jid):
"""View items of a selected service"""
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
if jid_bare == jabber_id:
raise HTTPException(status_code=403, detail='access-denied')
#try:
if True:
exception = note = selection = services_sorted = None
title = 'Services'
link_href = xmpp_uri = jid_bare
link_text = 'Reload'
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
# JID services
action = 'Discover'
jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
iq = jid_info['iq']
if iq:
jid_kind = jid_info['kind']
iq_disco_info = iq['disco_info']
for identity in iq_disco_info['identities']:
if jid_kind == identity[0] and identity[3]:
note = identity[3]
if not note: note = jid_bare
jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
iq = jid_items['iq']
iq_disco_items = iq['disco_items']
iq_disco_items_items = iq_disco_items['items']
services = {}
#services_sorted = {}
category = 'unsorted'
for item in iq_disco_items_items:
jid_bare = item[0]
if len(iq_disco_items_items) > 20 or jid_kind and jid_kind in ('pubsub'):
identity = sub_jid_info = sub_jid_info_iq = ''
if jid_kind and jid_kind in ('conference', 'mix', 'muc'):
category = 'conference'
if jid_kind and jid_kind in ('pubsub'):
category = 'pubsub'
else:
sub_jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
sub_jid_info_iq = sub_jid_info['iq']
try:
for identity_item in sub_jid_info_iq['disco_info']['identities']:
identity = identity_item
break
if sub_jid_info_iq:
category = identity[0] if (identity, list) and identity[0] else 'other'
except:
identity = None
category = 'unavailable'
sub_jid_kind = sub_jid_info['kind'] if 'kind' in sub_jid_info else None
if category not in services: services[category] = []
services[category].append(
{'identity' : identity,
'info' : sub_jid_info,
'jid' : jid_bare,
'kind' : sub_jid_kind,
'name' : item[2] or item[1] or item[0],
'node' : item[1]})
services_sorted = {k: v for k, v in services.items() if k != 'unavailable'}
if 'unavailable' in services: services_sorted['unavailable'] = services['unavailable']
else:
message = '{}: {} (XEP-0030)'.format(jid_info['condition'], jid_info['text'])
services = services_sorted = None
xmpp_instance.disconnect()
#except Exception as e:
else:
exception = str(e)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = note = jid
filename = jid_bare = link_href = link_text = selection = services = services_sorted = url = None
#if title == 'remote-server-timeout':
# raise HTTPException(status_code=408, detail='remote-server-timeout')
#else:
template_file = 'disco.xhtml'
template_dict = {
'action' : action,
'filename' : 'default.svg',
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'exception' : exception,
'jid_bare' : jid,
'note' : note,
'request' : request,
'services' : services_sorted,
'title' : title,
'url' : request.url._url,
'link_href' : link_href,
'link_text' : link_text,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/j/{jid}/{node_name}')
async def jid_node_get(request: Request, jid, node_name):
response = await main_jid_node_get(request, jid, node_name)
return response
@self.app.get('/j/{jid}')
async def jid_get(request: Request, jid):
node_name = request.query_params.get('node', '')
if node_name:
response = RedirectResponse(url='/j/{}/{}'.format(jid, node_name))
else:
response = await main_jid_node_get(request, jid)
return response
async def main_jid_node_get(request: Request, jid, node_name=None):
jid_bare = jid
jid_path = urlsplit(jid).path
if parseaddr(jid_path)[1] == jid_path:
jid_bare = jid_path.lower()
else:
jid_bare = jid
note = 'Jabber ID appears to be malformed'
if jid_bare == jabber_id:
raise HTTPException(status_code=403, detail='access-denied')
#try:
if True:
action = alias = count_item = count_message = exception = \
instance = jid_vcard = jid_info = link_href = message = note = \
selection = title = vcard4 = view_href = xmpp_uri = None
#node_name = 'urn:xmpp:microblog:0'
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
if os.path.exists(filename) and os.path.getsize(filename) > 0:
jid_details = Toml.open_file_toml(filename)
else:
jid_details = await XmppUtilities.cache_jid_data(
jabber_id, password, jid_bare, node_name, alias=alias)
# Set node name to 'urn:xmpp:microblog:0'
jid_kind = jid_details['kind']
nodes = jid_details['nodes']
count_message = jid_details['messages']
if (jid_kind not in ('conference', 'mix', 'muc') and
'@' in jid_bare and
not node_name and
'urn:xmpp:microblog:0' in nodes):
node_name = 'urn:xmpp:microblog:0'
items = jid_details['items']
jid_info = {
'error' : jid_details['error'],
'text' : jid_details['error_text'],
'condition' : jid_details['error_condition']}
jid_vcard = {
'name' : jid_details['name'],
'note' : jid_details['note'],
'type' : jid_details['image_type']}
messages = jid_details['messages']
#note = nodes[node_name]['title'] if node_name in nodes else jid_details['note']
#note = jid_details['note']
# vCard4
node_name_vcard4 = 'urn:xmpp:vcard4'
item_id_vcard4 = 'current'
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name_vcard4)
filename = os.path.join(directory, item_id_vcard4 + '.xml')
if os.path.exists(filename) and os.path.getsize(filename) > 0:
xml_data = Toml.open_file_xml(filename)
root_element = xml_data.getroot()
child_element = root_element[0]
#vcard_info = Syndication.extract_vcard_items(child_element)
vcard_info = Syndication.extract_vcard4_items(child_element)
title = vcard_info['fn']
alias = vcard_info['alias']
#note = vcard_info['note']
else:
await XmppUtilities.cache_vcard_data(
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4)
if os.path.exists(filename) and os.path.getsize(filename) > 0:
vcard4 = True
# Node item IDs
supdirectory = os.path.join(directory_cache, 'xep_0060', jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
if node_name:
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name)
if not os.path.exists(directory):
os.mkdir(directory)
await XmppUtilities.cache_node_data(
jabber_id, password, jid_bare, node_name)
# JID or node items
if jid_kind in ('mix', 'muc', 'conference', 'server'):
count_item = jid_details['count']
elif jid_kind in ('account', 'pubsub'):
node_items = os.listdir(directory)
if 'urn:xmpp:avatar:metadata.xml' in node_items:
node_items.remove('urn:xmpp:avatar:metadata.xml')
count_item = len(node_items)
# if ('@' in jid_bare and
# 'urn:xmpp:microblog:0' not in nodes and
# jid_kind not in ('conference', 'mix', 'muc')):
# count_item = 0
# else:
# count_item = len(node_items)
if jid_kind == 'pubsub' and node_name:
items = jid_details['items']
for item in items:
if item[1] == node_name:
#nodes[node_name]['title'] = item[2]
title = item[2]
break
if not title: title = node_name
else:
title = jid_details['name']
# TODO Consider also the existence of a node /j/pubsub.movim.eu/i2p
if jid_kind:
# Action and instance type
action, instance = XmppUtilities.set_action_instance_type(jid_kind, node_name)
view_href = XmppUtilities.get_view_href(jid_bare, jid_kind, node_name)
xmpp_uri = XmppUtilities.get_xmpp_uri(jid_bare, jid_kind, node_name)
else: # jid_info['error']
action = 'Contact'
instance = view_href = ''
if jid_info['condition']: message = '{}: {} (XEP-0030)'.format(jid_info['text'], jid_info['condition'])
xmpp_uri = jid_bare
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
# Query URI links
print('Query URI links')
links = XmppUtilities.get_query_uri_links(jid_bare, jid_kind, node_name)
# Graphic files
filename, filepath, filetype, selection = Graphics.handle_photo(
jid_bare, jid_vcard, link_href)
#except Exception as e:
else:
exception = str(e)
print(exception)
action = 'Error'
title = 'Slixmpp error'
xmpp_uri = jid
alias = count_item = count_message = filename = jid_bare = \
jid_vcard = jid_kind = links = message = selection = url = \
vcard4 = None
#note_500 = note[:500]
#note = note_500 + ' …' if note_500 < note else note_500
# NOTE Handling of variables "title" and "note" in case of '/j/{jid}/{node_name}' is confusing.
# TODO Add new keys that are of 'node' and be utilized for nodes, instead of reusing a variable for several roles.
# FIXME If no title be provided to 'node name', use 'node name' itself as title (to be done at XmppUtilities.cache_jid_data).
template_file = 'jid.xhtml'
template_dict = {
'action' : action,
'alias' : alias,
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'count_item' : count_item,
'count_message' : count_message,
'instance' : instance,
'exception' : exception,
'filename' : filename,
'jid_bare' : jid_bare,
'jid_kind' : jid_kind,
'links' : links,
'message' : message,
'news_client' : news_client,
'note' : note, # TODO node_note or title of PubSub JID
'request' : request,
'selection' : selection,
'title' : title, # TODO node_title
'url' : request.url._url,
'vcard4' : vcard4,
'view_href' : view_href,
'xmpp_uri' : xmpp_uri}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/selection')
async def selection_get(request: Request):
filename = os.path.join(directory_data, 'systems.toml')
software = Toml.open_file_toml(filename)['systems']
template_file = 'software.xhtml'
template_dict = {
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'request' : request,
'software' : software,
'url' : request.url._url}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
#@self.app.get('/download/select')
#async def download_select_get(request, software=None):
@self.app.get('/download/{software}')
async def download_software_get(request: Request, software):
response = await download_get(request, featured=True, software=software)
return response
@self.app.get('/download/{software}/all')
async def download_software_all_get(request: Request, software):
response = await download_get(request, featured=False, software=software)
return response
@self.app.get('/download')
async def download_get(request: Request, featured=True, software=None):
# TODO
# Fearured clients '/download/{software}'
# All clients '/download/{software}/all'
# Select software '/download/select'
skipped = False
if not software:
user_agent = request.headers.get("user-agent")
user_agent_lower = user_agent.lower()
match user_agent_lower:
case _ if 'bsd' in user_agent_lower:
software = 'bsd'
case _ if 'linux' in user_agent_lower:
software = 'linux'
case _ if 'haiku' in user_agent_lower:
software = 'haiku'
case _ if 'android' in user_agent_lower:
software = 'android'
case _ if 'reactos' in user_agent_lower or 'windows' in user_agent_lower:
software = 'windows'
case _ if 'ios' in user_agent_lower or 'macos' in user_agent_lower:
software = 'apple'
name = software.title()
if software == 'bsd': name = 'BSD'
if software == 'posix': name = 'POSIX'
if software == 'ubports': name = 'UBports'
if name.endswith('os'): name = name.replace('os', 'OS')
filename_clients = os.path.join(directory_data, 'clients.toml')
clients = Toml.open_file_toml(filename_clients)
client_selection = []
clients_software = 0
for client in clients:
if software in clients[client]:
clients_software += 1
if featured and 'featured' not in clients[client]['properties']:
skipped = True
continue
client_selected = {
'name' : clients[client]['title'],
'about' : clients[client]['about'],
'href' : clients[client][software],
'iden' : client,
'properties' : clients[client]['properties'],
'resources' : clients[client]['resources'] if 'resources' in clients[client] else ''}
client_selection.append(client_selected)
skipped = False if len(client_selection) == clients_software else True
template_file = 'download.xhtml'
template_dict = {
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'client_selection' : client_selection,
'featured' : featured,
'skipped' : skipped,
'request' : request,
'software' : software,
'title' : name,
'url' : request.url._url}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.exception_handler(403)
def access_denied_exception_handler(request: Request, exc: HTTPException):
action = 'Warning'
title = 'Access Denied'
return result_get(request, action, title)
@self.app.exception_handler(404)
def not_found_exception_handler(request: Request, exc: HTTPException):
action = 'Warning'
title = 'Not Found'
return result_get(request, action, title)
@self.app.exception_handler(500)
def internal_error_exception_handler(request: Request, exc: HTTPException):
action = 'Error'
title = 'Internal Server Error'
return result_get(request, action, title)
@self.app.exception_handler(504)
def time_out_exception_handler(request: Request, exc: HTTPException):
action = 'Warning'
title = 'Time Out'
return result_get(request, action, title)
def result_get(request: Request, action: str, title: str):
template_file = 'result.xhtml'
template_dict = {
'action' : action,
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'request' : request,
'title' : title,
'url' : request.url._url}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response
@self.app.get('/')
async def main_get(request: Request):
jabber_id = request.query_params.get('jid', '')
if jabber_id:
response = RedirectResponse(url='/j/' + jabber_id)
else:
template_file = 'main.xhtml'
template_dict = {
'brand_name' : brand_name,
'brand_site' : brand_site,
'chat_client' : chat_client,
'request' : request,
'url' : request.url._url}
response = templates.TemplateResponse(template_file, template_dict)
response.headers['Content-Type'] = 'application/xhtml+xml'
return response

View file

@ -0,0 +1,181 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from jabbercard.config import Cache
import glob
import os
import qrcode
import random
try:
import cv2
except:
print('OpenCV (cv2) is required for dynamic background.')
try:
import numpy
except:
print('NumPy (numpy) is required for dynamic background.')
class Graphics:
def handle_photo(jid_bare, jid_vcard, link_href):
filename = filepath = filetype = mimetype = selection = None
directory_cache = Cache.get_directory()
filecirca = os.path.join(directory_cache, 'photo', jid_bare, '.*')
filepath_guess = glob.glob(filecirca)
if filepath_guess:
filepath = filepath_guess[0]
filetype = filepath.split('.').pop()
filename = '{}.{}'.format(jid_bare, filetype)
elif jid_vcard:
if jid_vcard['type']:
mimetype = jid_vcard['type']
if mimetype:
filetype = mimetype.split('/')[1]
if filetype == 'svg+xml': filetype = 'svg'
filename = '{}.{}'.format(jid_bare, filetype)
filepath = os.path.join(directory_cache, 'photo', filename)
#img.save(filename)
# Write the decoded bytes to a file
if 'bin' in jid_vcard:
with open(filepath, 'wb') as file:
file.write(jid_vcard['bin'])
if not filepath or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
filename = 'default.svg'
elif filetype == 'svg':
selection = Graphics.extract_colours_from_vector(filepath)
else:
selection = Graphics.extract_colours_from_raster(filepath)
# QR code
filepath_qrcode = os.path.join(directory_cache, 'qr', jid_bare, '.png')
if not os.path.exists(filepath_qrcode) or os.path.getsize(filepath_qrcode) == 0:
Graphics.generate_qr_code_graphics_from_string(link_href, jid_bare)
return filename, filepath, filetype, selection
def extract_colours_from_raster(filepath):
try:
img = cv2.imread(filepath)
#thresholded = cv2.inRange(img, (50, 100, 200), (50, 100, 200))
thresholded = cv2.inRange(img, (90, 90, 90), (190, 190, 190))
#thresholded = cv2.bitwise_not(thresholded)
#thresholded = cv2.inRange(img, (0, 0, 0), (0, 0, 0))
#res = img + cv2.cvtColor(thresholded, cv2.COLOR_GRAY2BGR)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
#result = numpy.clip(img, 90, 190)
#result = numpy.clip(img, 50, 200)
#result = numpy.clip(img, 100, 150)
result = numpy.clip(img, 100, 200)
res = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
"""
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
mask = numpy.all(numpy.logical_and(img >= 90, img <= 190), axis=2)
result = numpy.where(mask[...,None], img, 255)
res = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
"""
"""
# Thresholding for black:
lower_black = numpy.array([0, 0, 0])
upper_black = numpy.array([50, 50, 50]) # Adjust this value for the black range
black_mask = cv2.inRange(img, lower_black, upper_black)
# Thresholding for white:
lower_white = numpy.array([250, 250, 250])
upper_white = numpy.array([255, 255, 255])
white_mask = cv2.inRange(img, lower_white, upper_white)
# Combine the masks
combined_mask = cv2.bitwise_or(black_mask, white_mask)
# Invert the combined mask
inverted_mask = cv2.bitwise_not(combined_mask)
# Apply the mask to the original image
res = cv2.bitwise_and(img, img, mask=inverted_mask)
"""
selection = []
ix_1st = random.randint(1, len(res)-1)
res_ix_1st = res[ix_1st]
ix_ix_1st = random.randint(1, len(res_ix_1st)-1)
res_ix_ix_1st = res_ix_1st[ix_ix_1st]
selection.append(numpy.array(res_ix_ix_1st).tolist())
ix_2nd = random.randint(1, len(res)-1)
res_ix_2nd = res[ix_2nd]
ix_ix_2nd = random.randint(1, len(res_ix_2nd)-1)
res_ix_ix_2nd = res_ix_2nd[ix_ix_2nd]
selection.append(numpy.array(res_ix_ix_2nd).tolist())
print(selection)
except Exception as e:
selection = None
exception = str(e)
print(exception)
return selection
def extract_colours_from_vector(filepath):
# Parse the SVG file
tree = ET.parse(filepath)
root = tree.getroot()
# Set to store unique colours
colours_hex = set()
colours_rgb = []
# SVG namespace
namespace = {'svg': 'http://www.w3.org/2000/svg'}
# Find all possible elements
for elem in root.findall('.//svg:circle', namespace) + \
root.findall('.//svg:ellipse', namespace) + \
root.findall('.//svg:line', namespace) + \
root.findall('.//svg:path', namespace) + \
root.findall('.//svg:polygon', namespace) + \
root.findall('.//svg:rect', namespace) + \
root.findall('.//svg:text', namespace):
fill = elem.get('fill')
stroke = elem.get('stroke')
# Add colours to the set if they are not None or 'none'
if fill and fill.startswith('#') and len(fill) > 4 and fill.lower() != 'none':
colours_hex.add(fill)
if stroke and stroke.startswith('#') and len(stroke) > 4 and stroke.lower() != 'none':
colours_hex.add(stroke)
for colour in colours_hex:
hex = colour.lstrip('#')
rgb = list(int(hex[i:i+2], 16) for i in (0, 2, 4))
rgb.reverse()
colours_rgb.append(rgb)
selection = []
if len(colours_rgb) > 1:
for i in range(2):
ix = random.randint(0, len(colours_rgb)-1)
selection.append(colours_rgb[ix])
del colours_rgb[ix]
elif len(colours_rgb) == 1:
selection = [colours_rgb[0], colours_rgb[0]]
return selection
def generate_qr_code_graphics_from_string(text, jid_bare):
#qrcode_graphics = qrcode.make(text)
qr = qrcode.QRCode(border=2, box_size=10)
qr.add_data(text)
qrcode_graphics = qr.make_image(fill_color='#333', back_color='#f2f2f2')
directory_cache = Cache.get_directory()
filename = os.path.join(directory_cache, 'qr', jid_bare + '.png')
qrcode_graphics.save(filename)

View file

@ -0,0 +1,30 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import tomli_w
import xml.etree.ElementTree as ET
try:
import tomllib
except:
import tomli as tomllib
class Toml:
def open_file_toml(filename: str) -> dict:
with open(filename, mode="rb") as fn:
data = tomllib.load(fn)
return data
def save_to_toml(filename: str, data: dict) -> None:
with open(filename, 'w') as fn:
data_as_string = tomli_w.dumps(data)
fn.write(data_as_string)
def open_file_xml(filename: str) -> ET.ElementTree:
data = ET.parse(filename)
return data
def save_to_file(filename: str, data: str) -> None:
with open(filename, 'w') as fn:
fn.write(data)

220
jabbercard/utilities/xml.py Normal file
View file

@ -0,0 +1,220 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import xml.etree.ElementTree as ET
class Syndication:
# def extract_vcard_items(xml_data):
# namespace = '{urn:ietf:params:xml:ns:vcard-4.0}'
# title = xml_data.find(namespace + 'title')
#
# entry = {'fn' : content_text,
# 'note' : link_href,
# 'email' : published_text,
# 'impp' : summary_text,
# 'url' : tags}
# return entry
def extract_vcard_items(xml_data):
"""Extracts all items from a vCard XML ElementTree.
Args:
xml_data (ElementTree): The vCard XML as an ElementTree object.
Returns:
dict: A dictionary where keys are item names and values are their text content.
"""
items = {}
for item in xml_data.iter():
# Skip the root element (vcard)
if item.tag == '{urn:ietf:params:xml:ns:vcard-4.0}vcard':
continue
# Extract item name and text content
item_name = item.tag.split('}')[1]
# Check for any direct text content or child elements
item_text = []
if item.text:
item_text.append(item.text)
for child in item:
if child.text:
item_text.append(child.text)
# Join text elements if multiple found
if item_text:
items[item_name] = ' '.join(item_text).strip() # Strip extra spaces
else:
items[item_name] = None
return items
def extract_vcard4_items(xml_data):
namespace = '{urn:ietf:params:xml:ns:vcard-4.0}'
vcard = {}
element_em = xml_data.find(namespace + 'email')
element_fn = xml_data.find(namespace + 'fn')
element_nn = xml_data.find(namespace + 'nickname')
element_nt = xml_data.find(namespace + 'note')
element_og = xml_data.find(namespace + 'org')
element_im = xml_data.find(namespace + 'impp')
element_ul = xml_data.find(namespace + 'url')
if isinstance(element_em, ET.Element):
for i in element_em:
text = i.text
if text:
email = text
break
else:
email = ''
else:
email = ''
if isinstance(element_fn, ET.Element):
for i in element_fn:
text = i.text
if text:
title = text
break
else:
title = ''
else:
title = ''
if isinstance(element_nn, ET.Element):
for i in element_nn:
text = i.text
if text:
alias = text
break
else:
alias = ''
else:
alias = ''
if isinstance(element_nt, ET.Element):
for i in element_nt:
text = i.text
if text:
note = text
break
else:
note = ''
else:
note = ''
if isinstance(element_og, ET.Element):
for i in element_og:
text = i.text
if text:
org = text
break
else:
org = ''
else:
org = ''
if isinstance(element_im, ET.Element):
for i in element_im:
text = i.text
if text:
impp = text
break
else:
impp = ''
else:
impp = ''
if isinstance(element_ul, ET.Element):
for i in element_ul:
text = i.text
if text:
url = text
break
else:
url = ''
else:
url = ''
vcard['extras'] = {}
for element in xml_data.findall(namespace + "group"):
category = '?'
for i in element.find(namespace + 'x-ablabel'):
txt = i.text
for i in element.find(namespace + 'url'):
uri = i.text
for i in element.find(namespace + 'url/' + namespace + 'parameters/' + namespace + 'type'):
category = i.text
if not category in vcard['extras']: vcard['extras'][category] = []
vcard['extras'][category].append({'label' : txt, 'uri' : uri})
vcard['alias'] = alias
vcard['email'] = email
vcard['fn'] = title
vcard['note'] = note
vcard['org'] = org
vcard['impp'] = impp
vcard['url'] = url
return vcard
def extract_atom_items(xml_data, limit=False):
# NOTE
# `.//` was not needded when node item payload was passed directly.
# Now that item is saved as xml, it is required to use `.//`.
# Perhaps navigating a level down (i.e. to "child"), or removing the root from the file would solve this.
#namespace = './/{http://www.w3.org/2005/Atom}'
namespace = '{http://www.w3.org/2005/Atom}'
title = xml_data.find(namespace + 'title')
links = xml_data.find(namespace + 'link')
if (not isinstance(title, ET.Element) and
not isinstance(links, ET.Element)): return None
title_text = '' if title == None else title.text
link_href = ''
if isinstance(links, ET.Element):
for link in xml_data.findall(namespace + 'link'):
link_href = link.attrib['href'] if 'href' in link.attrib else ''
if link_href: break
contents = xml_data.find(namespace + 'content')
content_text = ''
if isinstance(contents, ET.Element):
for content in xml_data.findall(namespace + 'content'):
content_text = content.text or ''
if content_text: break
summaries = xml_data.find(namespace + 'summary')
summary_text = ''
if isinstance(summaries, ET.Element):
for summary in xml_data.findall(namespace + 'summary'):
summary_text = summary.text or ''
if summary_text: break
published = xml_data.find(namespace + 'published')
published_text = '' if published == None else published.text
categories = xml_data.find(namespace + 'category')
tags = []
if isinstance(categories, ET.Element):
for category in xml_data.findall(namespace + 'category'):
if 'term' in category.attrib and category.attrib['term']:
category_term = category.attrib['term']
if len(category_term) < 20:
tags.append(category_term)
elif len(category_term) < 50:
tags.append(category_term)
if limit and len(tags) > 4: break
identifier = xml_data.find(namespace + 'id')
if identifier and identifier.attrib: print(identifier.attrib)
identifier_text = '' if identifier == None else identifier.text
instances = '' # TODO Check the Blasta database for instances.
entry = {'content' : content_text,
'href' : link_href,
'published' : published_text,
'summary' : summary_text,
'tags' : tags,
'title' : title_text,
'updated' : published_text} # TODO "Updated" is missing
return entry

2
jabbercard/version.py Normal file
View file

@ -0,0 +1,2 @@
__version__ = '0.1'
__version_info__ = (0, 1)

View file

@ -0,0 +1,22 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from slixmpp import ClientXMPP
class XmppInstance(ClientXMPP):
def __init__(self, jid, password, jid_bare):
super().__init__(jid, password)
self.jid_bare = jid_bare
self.register_plugin('xep_0030') # XEP-0030: Service Discovery
self.register_plugin('xep_0045') # XEP-0045: Multi-User Chat
self.register_plugin('xep_0054') # XEP-0054: vcard-temp
self.register_plugin('xep_0060') # XEP-0060: Publish-Subscribe
self.register_plugin('xep_0369') # XEP-0369: Mediated Information eXchange (MIX)
self.add_event_handler("session_start", self.on_session_start)
async def on_session_start(self, event):
self.send_presence()
#self.disconnect()

View file

@ -0,0 +1,467 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from jabbercard.config import Cache
from jabbercard.utilities.graphics import Graphics
from jabbercard.utilities.toml import Toml
from jabbercard.xmpp.instance import XmppInstance
from jabbercard.xmpp.xep_0030 import XmppXep0030
from jabbercard.xmpp.xep_0045 import XmppXep0045
from jabbercard.xmpp.xep_0054 import XmppXep0054
from jabbercard.xmpp.xep_0060 import XmppXep0060
from jabbercard.xmpp.xep_0369 import XmppXep0369
import os
from dateutil import parser
from slixmpp import stanza
import xml.etree.ElementTree as ET
class XmppUtilities:
async def cache_vcard_data(
jabber_id, password, jid_bare, node_name_vcard4, item_id_vcard4):
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
vcard4_data = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name_vcard4, item_ids=[item_id_vcard4])
xmpp_instance.disconnect()
if vcard4_data:
directory_cache = Cache.get_directory()
supdirectory = os.path.join(directory_cache, 'xep_0060', jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name_vcard4)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(vcard4_data['iq'], stanza.iq.Iq):
iq = vcard4_data['iq']
for item in iq['pubsub']['items']:
filename = directory + item_id_vcard4 + '.xml'
xml_item_as_string = str(item)
Toml.save_to_file(filename, xml_item_as_string)
#item_payload = item['payload']
#vcard4_info = Syndication.extract_vcard4_items(item_payload)
async def cache_node_data(
jabber_id, password, jid_bare, node_name):
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
node_items = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name)
xmpp_instance.disconnect()
if node_items:
supdirectory = 'xep_0060/{}/'.format(jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(node_items['iq'], stanza.iq.Iq):
iq = node_items['iq']
namespace = '{http://www.w3.org/2005/Atom}'
for item in iq['pubsub']['items']:
item_payload = item['payload']
date_element = item_payload.find(namespace + 'updated')
if not date_element: date_element = item_payload.find(namespace + 'published')
if isinstance(date_element, ET.Element):
date = date_element.text
modification_time = parser.parse(date).timestamp()
filename = directory + item['id'] + '.xml'
xml_item_as_string = str(item)
Toml.save_to_file(filename, xml_item_as_string)
if isinstance(date_element, ET.Element):
file_statistics = os.stat(filename)
access_time = file_statistics.st_atime
os.utime(filename, (access_time, modification_time))
#item_payload = item['payload']
#entry = Syndication.extract_atom_items(item_payload)
async def cache_jid_data(
jabber_id, password, jid_bare, node_name=None, item_id=None, alias=None):
iq_disco_items_list = iq_disco_items_items_list = node_note = node_title = title = ''
jid_vcard = {
'name' : '',
'note' : '',
'type' : '',
'bin' : ''}
#filename = 'details/{}.toml'.format(jid_bare)
#if os.path.exists(filename): jid_details = Toml.open_file_toml(filename)
# Start an XMPP instance and retrieve information
xmpp_instance = XmppInstance(jabber_id, password, jid_bare)
xmpp_instance.connect()
# JID kind
print('JID kind')
instance = message = node_id = None
jid_info = await XmppXep0030.get_jid_info(xmpp_instance, jid_bare)
jid_info_iq = jid_info['iq']
jid_kind = jid_info['kind']
# Set node name to 'urn:xmpp:microblog:0' if JID is an account
if jid_kind == 'account' and not node_name: node_name = 'urn:xmpp:microblog:0'
# vCard4 data
node_name_vcard4 = 'urn:xmpp:vcard4'
item_id_vcard4 = 'current'
vcard4_data = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name_vcard4, item_ids=[item_id_vcard4])
if vcard4_data:
directory_cache = Cache.get_directory()
supdirectory = os.path.join(directory_cache, 'xep_0060', jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name_vcard4)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(vcard4_data['iq'], stanza.iq.Iq):
iq = vcard4_data['iq']
for item in iq['pubsub']['items']:
filename = directory + item_id_vcard4 + '.xml'
xml_item_as_string = str(item)
Toml.save_to_file(filename, xml_item_as_string)
#item_payload = item['payload']
#vcard4_info = Syndication.extract_vcard4_items(item_payload)
# JID info
print('JID info')
# NOTE Group chat of Psi+ Project at jabber.ru has a note in its vCard.
vcard_data = await XmppXep0054.get_vcard_data(xmpp_instance, jid_bare)
if not vcard_data['error']:
conference_title = None
if jid_kind in ('mix', 'muc'):
for identity in jid_info_iq['disco_info']['identities']:
if identity[3]:
conference_title = identity[3]
break
vcard_temp = vcard_data['iq']['vcard_temp']
jid_vcard = {
'name' : vcard_temp['FN'] or conference_title or '',
'note' : vcard_temp['notes'] or node_id or '',
'type' : vcard_temp['PHOTO']['TYPE'] or '',
'bin' : vcard_temp['PHOTO']['BINVAL'] or ''}
# TODO /d/pubsub.nicoco.fr/blog/urn-uuid-53e43061-1962-3112-bb8a-1473dca61719
count = ''
jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
if isinstance(jid_items['iq'], stanza.iq.Iq):
iq = jid_items['iq']
iq_disco_items = iq['disco_items']
iq_disco_items_items = iq_disco_items['items']
#iq_disco_items_set = {''}
iq_disco_items_list = []
iq_disco_items_items_list = []
for item in iq_disco_items_items:
if jid_kind == 'muc':
#iq_disco_items_set.update([item[2]])
iq_disco_items_list.append(item[2])
else:
#iq_disco_items_set.update([item[1]])
iq_disco_items_list.append(item[1])
iq_disco_items_items_list.append(
[item[0] or '', item[1] or '', item[2] or ''])
#count = len(iq_disco_items_set)
count = len(iq_disco_items_list)
# Title
print('Title')
if (jid_kind not in ('conference', 'mix', 'muc') and
'@' in jid_bare and
not node_name):
# NOTE Variables node_name and node_title do not appear to be utilized.
node_name = 'urn:xmpp:microblog:0'
node_title = 'Journal'
elif jid_kind == 'pubsub':
category = 'unsorted'
for item in iq_disco_items_items:
if item[2] and item[1] == node_name:
node_title = item[2]
break
else:
jid_items = None
if jid_kind == 'server':
if jid_info_iq:
for identity in jid_info_iq['disco_info']['identities']:
if jid_kind == identity[0] and identity[1] == 'im' and identity[3]:
title = identity[3]
print(jid_bare)
print(identity)
print(jid_info)
# String 'undefined' is sourced from JID discuss@conference.conversejs.org
if not title:
if jid_vcard['name'] and not 'undefined' in jid_vcard['name']:
title = jid_vcard['name']
else:
title = jid_bare.split('@')[0]
# JID item count
#count = await XmppUtilities.count_jid_items(xmpp_instance, jid_bare, node_name, jid_kind)
#if jid_kind in ('mix', 'muc', 'conference', 'server'):
# jid_items = await XmppXep0030.get_jid_items(xmpp_instance, jid_bare)
# if isinstance(jid_items['iq'], stanza.iq.Iq):
# iq = jid_items['iq']
# count = len(iq['disco_items']['items'])
#elif jid_kind in ('account', 'pubsub'):
# node_item_ids = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name)
# if isinstance(node_item_ids, stanza.iq.Iq):
# count = len(node_item_ids['disco_items']['items'])
# Group chat messages
print('Group chat messages')
messages = []
subject = ''
if jid_kind == 'muc':
#action = 'Join'
# TODO Create configurations for group chat preview
room_info_muc = await XmppXep0045.get_room_information(
xmpp_instance, jid_bare, alias, maxstanzas=50)
# NOTE Do not mix error messages with node titles and descriptions etc.
if isinstance(room_info_muc['iq'], tuple):
iq = room_info_muc['iq']
for message in iq[3]:
messages.append({
'id' : message['id'],
'alias' : message['mucnick'],
'body' : message['body'],
'timestamp' : message['delay']['stamp'].__str__()})
messages.reverse()
subject = iq[1]['subject']
#title = title or node_name
if not node_title: node_title = node_name
node_note = jid_bare
else:
message = '{}: {} (XEP-0045)'.format(room_info_muc['condition'], room_info_muc['text'])
elif jid_kind == 'mix':
room_info_muc = await XmppXep0369.get_room_information(
xmpp_instance, jid_bare, alias)
if isinstance(room_info_muc['iq'], tuple):
iq = room_info_muc['iq']
for message in iq[3]:
messages.append({
'id' : message['id'],
'alias' : message['mucnick'],
'body' : message['body'],
'timestamp' : message['delay']['stamp'].__str__()})
messages.reverse()
subject = iq[1]['subject']
#title = title or node_name
if not node_title: node_title = node_name
node_note = jid_bare
else:
message = '{}: {} (XEP-0369)'.format(room_info_muc['condition'], room_info_muc['text'])
# Node items
print('Node items')
nodes = {}
#if node_name and node_name in iq_disco_items_set:
if iq_disco_items_list and node_name and node_name in iq_disco_items_list:
#action = 'Browse'
node_item_ids = await XmppXep0060.get_node_item_ids(xmpp_instance, jid_bare, node_name)
if isinstance(node_item_ids['iq'], stanza.iq.Iq):
iq = node_item_ids['iq']
nodes[node_name] = {}
nodes[node_name]['title'] = node_title
nodes[node_name]['count'] = len(iq['disco_items']['items'])
nodes[node_name]['item_ids'] = []
for item_id in iq['disco_items']['items']:
nodes[node_name]['item_ids'].append(
[item_id[0] or '', item_id[1] or '', item_id[2] or ''])
item_ids = []
for item in nodes[node_name]['item_ids']:
item_ids.append(item[2])
node_items = await XmppXep0060.get_node_items(
xmpp_instance, jid_bare, node_name)
if node_items:
directory_cache = Cache.get_directory()
supdirectory = os.path.join(directory_cache, 'xep_0060', jid_bare)
if not os.path.exists(supdirectory): os.mkdir(supdirectory)
directory = os.path.join(directory_cache, 'xep_0060', jid_bare, node_name)
if not os.path.exists(directory): os.mkdir(directory)
if isinstance(node_items['iq'], stanza.iq.Iq):
iq = node_items['iq']
namespace = '{http://www.w3.org/2005/Atom}'
for item in iq['pubsub']['items']:
item_payload = item['payload']
date_element = item_payload.find(namespace + 'updated')
if not date_element: date_element = item_payload.find(namespace + 'published')
if isinstance(date_element, ET.Element):
date = date_element.text
modification_time = parser.parse(date).timestamp()
filename = directory + item['id'] + '.xml'
xml_item_as_string = str(item)
Toml.save_to_file(filename, xml_item_as_string)
if isinstance(date_element, ET.Element):
file_statistics = os.stat(filename)
access_time = file_statistics.st_atime
os.utime(filename, (access_time, modification_time))
#item_payload = item['payload']
#entry = Syndication.extract_atom_items(item_payload)
xmpp_instance.disconnect()
# Notes
print('Notes')
jid_vcard_note = jid_vcard['note']
if isinstance(jid_vcard_note, list) and len(jid_vcard_note):
note = jid_vcard_note[0]['NOTE']
else:
note = jid_vcard_note
#if not note and jid_vcard['name'] and not 'undefined' in jid_vcard['name'] and title != jid_vcard['name']:
# note = jid_vcard['name']
jid_details = {
'count' : count or '',
'error' : jid_info['error'],
'error_text' : jid_info['text'] or '',
'error_condition' : jid_info['condition'] or '',
'image_type' : jid_vcard['type'],
'items' : iq_disco_items_items_list,
'kind' : jid_kind or '',
'messages' : messages or '',
'name' : title,
'nodes' : nodes,
'note' : note or '',
'subject' : subject or ''}
print(jid_details)
# Query URI href
link_href = XmppUtilities.get_link_href(jid_bare, jid_kind, node_name)
Graphics.handle_photo(jid_bare, jid_vcard, link_href)
directory_cache = Cache.get_directory()
filename = os.path.join(directory_cache, 'details', jid_bare + '.toml')
Toml.save_to_toml(filename, jid_details)
return jid_details
def set_action_instance_type(jid_kind, node_name=None):
if jid_kind in ('conference', 'server'):
action = 'Discover'
if jid_kind == 'conference':
instance = 'conferences'
elif jid_kind == 'server':
instance = 'services'
elif jid_kind in ('mix', 'muc'):
action = 'Join'
instance = 'occupants'
elif jid_kind == 'pubsub':
if node_name:
action = 'Subscribe'
instance = 'articles'
else:
action = 'Browse'
instance = 'nodes'
elif jid_kind == 'account':
action = 'Message'
instance = 'articles'
else: # jid_info['error']
action = 'Contact'
return action, instance
def get_link_href(jid_bare, jid_kind, node_name=None):
if jid_kind in ('conference', 'server'):
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
elif jid_kind in ('mix', 'muc'):
link_href = 'xmpp:{}?join'.format(jid_bare)
elif jid_kind == 'pubsub':
if node_name:
link_href = 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name)
else:
link_href = 'xmpp:{}?disco;type=get;request=items'.format(jid_bare)
elif jid_kind == 'account':
link_href = 'xmpp:{}?message'.format(jid_bare)
else: # jid_info['error']
link_href = 'xmpp:{}'.format(jid_bare)
return link_href
def get_view_href(jid_bare, jid_kind, node_name=None):
links = []
view_href = None
if jid_kind in ('conference', 'server'):
view_href = '/d/' + jid_bare
elif jid_kind in ('mix', 'muc'):
view_href = '/v/' + jid_bare
elif jid_kind == 'pubsub':
if node_name:
view_href = '/d/{}/{}'.format(jid_bare, node_name)
else:
view_href = '/d/' + jid_bare
elif jid_kind == 'account':
view_href = '/d/{}/{}'.format(jid_bare, node_name)
return view_href
def get_xmpp_uri(jid_bare, jid_kind, node_name=None):
links = []
view_href = None
xmpp_uri = jid_bare
if jid_kind in ('conference', 'server'):
xmpp_uri = jid_bare
elif jid_kind in ('mix', 'muc'):
xmpp_uri = jid_bare
elif jid_kind == 'pubsub':
if node_name:
xmpp_uri = '{}?;node={}'.format(jid_bare, node_name)
else:
xmpp_uri = jid_bare
elif jid_kind == 'account':
xmpp_uri = jid_bare
return xmpp_uri
def get_query_uri_links(jid_bare, jid_kind, node_name=None, item_id=None):
links = []
if jid_kind in ('conference', 'server'):
links.append({'name' : 'Discover',
'href' : 'xmpp:{}?disco;type=get;request=items'.format(jid_bare),
'iden' : 'discover'})
xmpp_uri = jid_bare
elif jid_kind in ('mix', 'muc'):
links.append({'name' : 'Join',
'href' : 'xmpp:{}?join'.format(jid_bare),
'iden' : 'join'})
elif jid_kind == 'pubsub':
links.append({'name' : 'Browse',
'href' : 'xmpp:{}?disco;type=get;request=items'.format(jid_bare),
'iden' : 'browse'})
elif jid_kind == 'account':
links.append({'name' : 'Message',
'href' : 'xmpp:{}?message'.format(jid_bare),
'iden' : 'message'})
links.append({'name' : 'Add',
'href' : 'xmpp:{}?roster'.format(jid_bare),
'iden' : 'add'})
else: # jid_info['error']
links.append({'name' : 'Connect',
'href' : 'xmpp:{}'.format(jid_bare),
'iden' : 'connect'})
links.append({'name' : 'Add',
'href' : 'xmpp:{}?roster'.format(jid_bare),
'iden' : 'add'})
if item_id:
links.append({'name' : 'Subscribe',
'href' : 'xmpp:{}?pubsub;node={};item={};action=subscribe'.format(jid_bare, node_name, item_id),
'iden' : 'subscribe'})
links.append({'name' : 'View',
'href' : 'xmpp:{}?pubsub;node={};item={}'.format(jid_bare, node_name, item_id),
'iden' : 'view'})
elif node_name:
links.append({'name' : 'Subscribe',
'href' : 'xmpp:{}?pubsub;node={};action=subscribe'.format(jid_bare, node_name),
'iden' : 'subscribe'})
links.append({'name' : 'View',
'href' : 'xmpp:{}?pubsub;node={}'.format(jid_bare, node_name),
'iden' : 'view'})
links.append({'name' : 'vCard',
'href' : 'xmpp:{}?vcard'.format(jid_bare),
'iden' : 'vcard'})
return links

109
jabbercard/xmpp/xep_0030.py Normal file
View file

@ -0,0 +1,109 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from asyncio import TimeoutError
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
class XmppXep0030:
async def get_jid_items(self, jid_bare):
try:
condition = text = ''
error = False
iq = await self['xep_0030'].get_items(jid=jid_bare)
except (IqError, IqTimeout) as e:
#logger.warning('Chat type could not be determined for {}'.format(jid_bare))
#logger.error(e)
iq = ''
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text'] or 'Error'
#if not text:
# # NOTE We might want to set a specific photo for condition remote-server-not-found
# if condition:
# text = 'Could not determine JID type'
# else:
# text = 'Unknown Error'
result = {
'condition' : condition,
'error' : error,
'iq' : iq,
'text' : text}
return result
# NOTE
# Feature "urn:xmpp:mucsub:0" is present in both, MUC local and MUC hostname
# Feature "urn:xmpp:serverinfo:0" is present in both, MUC hostname and main hostname
async def get_jid_info(self, jid_bare):
jid_kind = None
try:
error = False
condition = text = ''
iq = await self['xep_0030'].get_info(jid=jid_bare)
iq_disco_info = iq['disco_info']
if iq_disco_info:
features = iq_disco_info['features']
if 'http://jabber.org/protocol/muc#unique' in features:
jid_kind = 'conference'
elif 'urn:xmpp:mix:core:1' in features:
jid_kind = 'mix'
elif ('muc_moderated' in features or
'muc_open' in features or
'muc_persistent' in features or
'muc_public' in features or
'muc_semianonymous' in features or
'muc_unmoderated' in features or
'muc_unsecured' in features):
jid_kind = 'muc'
elif '@' in jid_bare:
for identity in iq_disco_info['identities']:
#if identity[0] == 'pubsub' and identity[1] == 'pep':
if identity[0] == 'account':
#if 'urn:xmpp:bookmarks:1#compat-pep' in features:
#if 'urn:xmpp:bookmarks:1#compat' in features:
#if 'urn:xmpp:push:0' in features:
#if 'urn:xmpp:pep-vcard-conversion:0' in features:
#if 'urn:xmpp:sid:0' in features:
# Also in MIX
#if 'urn:xmpp:mam:2' in features:
#if 'urn:xmpp:mam:2#extended' in features:
jid_kind = 'account'
break
if identity[0] == 'client' and identity[1] == 'bot':
jid_kind = 'bot'
else:
for identity in iq_disco_info['identities']:
if identity[0] == 'pubsub' and identity[1] == 'service':
#if 'http://jabber.org/protocol/pubsub' in features:
#if 'http://jabber.org/protocol/pubsub#access-authorize' in features:
#if 'http://jabber.org/protocol/rsm' in features:
jid_kind = 'pubsub'
break
if identity[0] == 'server' and identity[1] == 'im':
jid_kind = 'server'
break
#logger.info('Jabber ID: {}\n'
# 'Chat Type: {}'.format(jid_bare, result))
else:
iq = condition = text = ''
except (IqError, IqTimeout) as e:
#logger.warning('Chat type could not be determined for {}'.format(jid_bare))
#logger.error(e)
iq = ''
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text'] or 'Error'
#if not text:
# # NOTE We might want to set a specific photo for condition remote-server-not-found
# if condition:
# text = 'Could not determine JID type'
# else:
# text = 'Unknown Error'
result = {
'condition' : condition,
'error' : error,
'iq' : iq,
'text' : text,
'kind' : jid_kind}
return result

View file

@ -0,0 +1,58 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from asyncio import TimeoutError
from datetime import datetime
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
class XmppXep0045:
async def get_room_information(self, jid, alias, maxchars=None, maxstanzas=None, seconds=None):
#logger.info('Joining groupchat\nJID : {}\n'.format(jid))
#jid_from = str(self.boundjid) if self.is_component else None
if not maxchars: maxchars = 1000
if not maxstanzas: maxstanzas = 50
if not seconds: seconds = 864000
try:
error = False
condition = text = ''
#since = datetime.fromtimestamp(time.time()-seconds)
iq = await self['xep_0045'].join_muc_wait(
jid,
alias,
#maxchars=maxchars,
maxstanzas=maxstanzas,
#password=None,
#presence_options = {"pfrom" : jid_from},
#seconds=seconds,
#since=since,
timeout=10
)
except TimeoutError as e:
#raise HTTPException(status_code=504, detail='request-timeout-reached')
error = True
iq = e
condition = 'Request timeout reached'
text = str(e)
except (IqError, IqTimeout, PresenceError) as e:
error = True
iq = e
condition = e.iq['error']['condition']
text = e.iq['error']['text']
result = {
'error' : error,
'condition' : condition,
'text' : text,
'iq' : iq}
return result
async def get_room_data(self, jid_bare):
return await self['xep_0045'].get_room_config(jid_bare)
async def get_room_participants(self, jid_bare):
return await self['xep_0045'].get_roster(jid_bare)
# NOTE: "Item not found", yet is a group chat
# That is, JID has no vcard
# messaging-off@conference.movim.eu

View file

@ -0,0 +1,29 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from asyncio import TimeoutError
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
class XmppXep0054:
async def get_vcard_data(self, jid_bare):
try:
error = False
condition = text = ''
iq = await self['xep_0054'].get_vcard(jid_bare)
except (IqError, IqTimeout) as e:
error = True
condition = e.iq['error']['condition']
text = e.iq['error']['text']
if not text:
if condition:
text = 'Could not retrieve vCard'
else:
text = 'Unknown Error'
iq = ''
result = {
'error' : error,
'condition' : condition,
'text' : text,
'iq' : iq}
return result

View file

@ -0,0 +1,69 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from asyncio import TimeoutError
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
class XmppXep0060:
async def get_node_items(self, jid_bare, node_name, item_ids=None, max_items=None):
try:
error = False
condition = text = ''
if max_items:
iq = await self['xep_0060'].get_items(
jid_bare, node_name, timeout=5)
it = self['xep_0060'].get_items(
jid_bare, node_name, timeout=5, max_items=max_items, iterator=True)
q = rsm.Iq()
q['to'] = jid_bare
q['disco_items']['node'] = node_name
async for item in rsm.ResultIterator(q, 'disco_items', '10'):
print(item['disco_items']['items'])
else:
iq = await self['xep_0060'].get_items(
jid_bare, node_name, timeout=5, item_ids=item_ids)
result = iq
except (IqError, IqTimeout) as e:
error = True
iq = ''
condition = e.iq['error']['condition']
text = e.iq['error']['text']
if not text:
if condition:
text = 'Could not retrieve node items'
else:
text = 'Unknown Error'
result = {
'error' : error,
'condition' : condition,
'text' : text,
'iq' : iq}
return result
async def get_node_item_ids(self, jid_bare, node_name):
try:
error = False
condition = text = ''
iq = await self['xep_0030'].get_items(
jid_bare, node_name)
# Broken. See https://codeberg.org/poezio/slixmpp/issues/3548
#iq = await self['xep_0060'].get_item_ids(
# jid_bare, node_name, timeout=5)
except (IqError, IqTimeout) as e:
error = True
iq = ''
condition = e.iq['error']['condition']
text = e.iq['error']['text']
if not text:
if condition:
text = 'Could not retrieve node item IDs'
else:
text = 'Unknown Error'
result = {
'error' : error,
'condition' : condition,
'text' : text,
'iq' : iq}
return result

View file

@ -0,0 +1,34 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from asyncio import TimeoutError
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
class XmppXep0369:
async def get_room_information(self, jid, alias):
iq = await self['xep_0369'].join_channel(jid, alias)
breakpoint()
print('GOOD DAY! Please contact the developer!')
try:
error = False
condition = text = ''
#iq = await self['xep_0369'].get_channel_info(jid)
iq = await self['xep_0369'].join_channel(jid, alias)
except TimeoutError as e:
#raise HTTPException(status_code=504, detail='request-timeout-reached')
error = True
iq = e
condition = 'Request timeout reached'
text = str(e)
except (IqError, IqTimeout, PresenceError) as e:
error = True
iq = e
condition = e.iq['error']['condition']
text = e.iq['error']['text']
result = {
'error' : error,
'condition' : condition,
'text' : text,
'iq' : iq}
return result

View file

@ -1 +0,0 @@
This directory caches conference messages.

76
pyproject.toml Normal file
View file

@ -0,0 +1,76 @@
[build-system]
requires = ["setuptools>=61.2"]
build-backend = "setuptools.build_meta"
[project]
name = "JabberCard"
version = "0.1"
description = "An HTML based invitation and service explorer for XMPP."
authors = [{name = "Schimon Zachary", email = "sch@fedora.email"}]
license = {text = "MIT"}
classifiers = [
"Framework :: slixmpp",
"Intended Audience :: End Users/Desktop",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"License :: OSI Approved :: MIT License",
"Natural Language :: English",
"Programming Language :: Python",
"Programming Language :: Python :: 3.10",
"Topic :: Communications :: Chat",
"Topic :: Internet :: Extensible Messaging and Presence Protocol (XMPP)",
"Topic :: Internet :: WWW/HTTP :: Dynamic Content :: News/Diary",
"Topic :: Internet :: Instant Messaging",
"Topic :: Internet :: XMPP",
"Topic :: Office/Business :: News/Diary",
]
keywords = [
"atom",
"bot",
"chat",
"im",
"jabber",
"json",
"news",
"rdf",
"rss",
"syndication",
"xml",
"xmpp",
]
# urls = {Homepage = "https://gitgud.io/sjehuda/slixfeed"}
dependencies = [
"fastapi",
"jinja2",
"lxml",
"numpy",
"opencv-python",
"python-dateutil",
"qrcode",
"slixmpp",
"tomli", # Python 3.10
"tomli_w",
"uvicorn",
]
[project.urls]
Homepage = "https://schapps.woodpeckersnest.space/JabberCard"
Repository = "https://git.xmpp-it.net/sch/JabberCard"
Issues = "https://gitgud.io/sjehuda/JabberCard/issues"
[project.optional-dependencies]
omemo = ["slixmpp-omemo"]
proxy = ["pysocks"]
# [project.readme]
# text = "Slixfeed is a news aggregator bot for online news feeds. This program is primarily designed for XMPP"
[project.scripts]
jabbercard = "jabbercard.__main__:main"
[tool.setuptools]
platforms = ["any"]
[tool.setuptools.package-data]
"*" = ["*.toml"]

View file

@ -1 +0,0 @@
This directory caches QR code files.

View file

@ -1 +0,0 @@
This directory caches PubSub node items.