JabberCard/jabbercard/xmpp/xep_0060.py
Schimon Jehudah, Adv. 5e495579c2 Add file PyProject;
Support display of a single pubsub node item;
Update document README;
Modularize code;
2024-11-17 17:30:38 +02:00

69 lines
2.4 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
from asyncio import TimeoutError
from slixmpp.exceptions import IqError, IqTimeout, PresenceError
class XmppXep0060:
    """PubSub (XEP-0060) queries that report failures as data, not exceptions.

    Every method indexes ``self`` by plugin name (``self['xep_0060']``,
    ``self['xep_0030']``), so ``self`` must be an object that exposes its
    plugins through ``__getitem__``; this class defines no ``__getitem__``
    of its own.

    Both query methods return a dict with the keys:

    * ``'error'``     -- ``True`` if IqError/IqTimeout was caught, else ``False``.
    * ``'condition'`` -- error condition string, ``''`` on success.
    * ``'text'``      -- error description, ``''`` on success.
    * ``'iq'``        -- the awaited reply on success, ``''`` on failure.
    """

    @staticmethod
    def _outcome(iq='', exc=None, fallback_text=''):
        """Build the result dict shared by the query methods.

        Args:
            iq: Reply to hand back on success; ignored when *exc* is given.
            exc: The caught IqError/IqTimeout, or None for a successful query.
            fallback_text: Description used when *exc* names a condition but
                carries no text of its own.
        """
        if exc is None:
            return {'error': False, 'condition': '', 'text': '', 'iq': iq}
        # Read the details from the exception itself rather than from
        # exc.iq['error']: for IqTimeout, exc.iq is the request that went
        # unanswered, so it holds no error payload sent by the server.
        condition = exc.condition
        text = exc.text
        if not text:
            text = fallback_text if condition else 'Unknown Error'
        return {'error': True, 'condition': condition, 'text': text, 'iq': ''}

    async def get_node_items(self, jid_bare, node_name, item_ids=None, max_items=None):
        """Retrieve items of a PubSub node.

        Args:
            jid_bare: First positional argument given to the plugin's
                ``get_items`` (the address of the service to query).
            node_name: Second positional argument given to ``get_items``
                (the node to read).
            item_ids: Forwarded as ``item_ids`` when *max_items* is falsy.
            max_items: When truthy, forwarded as ``max_items``; *item_ids*
                is then not sent, matching the original branch structure.

        Returns:
            The result dict described in the class docstring.
        """
        query = {'timeout': 5}
        if max_items:
            query['max_items'] = max_items
        else:
            query['item_ids'] = item_ids
        try:
            iq = await self['xep_0060'].get_items(jid_bare, node_name, **query)
        except (IqError, IqTimeout) as e:
            # Called through the class name: ``self`` may be a client object
            # that does not inherit from this class.
            return XmppXep0060._outcome(
                exc=e, fallback_text='Could not retrieve node items')
        return XmppXep0060._outcome(iq)

    async def get_node_item_ids(self, jid_bare, node_name):
        """Retrieve the item listing of a PubSub node.

        Args:
            jid_bare: First positional argument given to the plugin's
                ``get_items`` (the address of the service to query).
            node_name: Second positional argument given to ``get_items``
                (the node to list).

        Returns:
            The result dict described in the class docstring.
        """
        try:
            # xep_0060.get_item_ids(jid_bare, node_name, timeout=5) is
            # reported broken upstream, so the listing is requested through
            # xep_0030 instead.
            # See https://codeberg.org/poezio/slixmpp/issues/3548
            iq = await self['xep_0030'].get_items(jid_bare, node_name)
        except (IqError, IqTimeout) as e:
            return XmppXep0060._outcome(
                exc=e, fallback_text='Could not retrieve node item IDs')
        return XmppXep0060._outcome(iq)