Improve PubSub support (WIP).

Improve MUC handling.
Add command "send" to add feeds to other JIDs.
Ad-Hoc: Add functionality to specify JId to commit actions on upon.
Add subscription preview upon adding a subscription.
Add project Jabber RSS Transport to bots list.
Add projects Psi and Psi+ to clients list.
This commit is contained in:
Schimon Jehudah 2024-03-26 16:23:22 +00:00
parent 35beab7802
commit 5507b16161
13 changed files with 1767 additions and 1134 deletions

View file

@ -49,8 +49,10 @@ from slixfeed.url import (
)
import slixfeed.task as task
from slixfeed.xmpp.bookmark import XmppBookmark
from slixfeed.xmpp.iq import XmppIQ
from slixfeed.xmpp.message import XmppMessage
from slixfeed.xmpp.presence import XmppPresence
from slixfeed.xmpp.publish import XmppPubsub
from slixfeed.xmpp.upload import XmppUpload
from slixfeed.xmpp.utility import get_chat_type
import sys
@ -134,7 +136,7 @@ if (await get_chat_type(self, jid_bare) == 'chat' and
"""
async def xmpp_send_status(self, jid):
async def xmpp_send_status_message(self, jid):
"""
Send status message.
@ -190,7 +192,50 @@ async def xmpp_send_status(self, jid):
# )
async def xmpp_send_update(self, jid, num=None):
async def xmpp_send_pubsub(self, jid_bare):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid_bare: {}'.format(function_name, jid_bare))
jid_file = jid_bare.replace('/', '_')
db_file = config.get_pathname_to_database(jid_file)
enabled = Config.get_setting_value(self.settings, jid_bare, 'enabled')
if enabled:
subscriptions = sqlite.get_active_feeds_url(db_file)
for url in subscriptions:
url = url[0]
feed_id = sqlite.get_feed_id(db_file, url)
feed_id = feed_id[0]
feed_title = sqlite.get_feed_title(db_file, feed_id)
feed_title = feed_title[0]
feed_summary = None
node = sqlite.get_node_name(db_file, feed_id)
node = node[0]
xep = None
iq_create_node = XmppPubsub.create_node(
self, jid_bare, node, xep, feed_title, feed_summary)
await XmppIQ.send(self, iq_create_node)
entries = sqlite.get_unread_entries_of_feed(db_file, feed_id)
feed_properties = sqlite.get_feed_properties(db_file, feed_id)
feed_version = feed_properties[2]
for entry in entries:
feed_entry = {'author' : None,
'authors' : None,
'category' : None,
'content' : None,
'description' : entry[3],
'href' : entry[2],
'links' : entry[4],
'tags' : None,
'title' : entry[1],
'type' : None,
'updated' : entry[7]}
iq_create_entry = XmppPubsub.create_entry(
self, jid_bare, node, feed_entry, feed_version)
await XmppIQ.send(self, iq_create_entry)
ix = entry[0]
await sqlite.mark_as_read(db_file, ix)
async def xmpp_send_message(self, jid, num=None):
"""
Send news items as messages.
@ -272,7 +317,7 @@ async def xmpp_send_update(self, jid, num=None):
# TODO Do not refresh task before
# verifying that it was completed.
# await start_tasks_xmpp(self, jid, ['status'])
# await start_tasks_xmpp_chat(self, jid, ['status'])
# await refresh_task(self, jid, send_update, 'interval')
# interval = await initdb(
@ -767,145 +812,164 @@ async def import_opml(db_file, result):
return difference
async def add_feed(self, jid_bare, db_file, url):
async def add_feed(self, jid_bare, db_file, url, node):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} url: {}'
.format(function_name, db_file, url))
while True:
exist = sqlite.get_feed_id_and_name(db_file, url)
if not exist:
result = await fetch.http(url)
message = result['message']
status_code = result['status_code']
if not result['error']:
document = result['content']
feed = parse(document)
# if is_feed(url, feed):
if is_feed(feed):
if "title" in feed["feed"].keys():
title = feed["feed"]["title"]
else:
title = urlsplit(url).netloc
if "language" in feed["feed"].keys():
language = feed["feed"]["language"]
else:
language = ''
if "encoding" in feed.keys():
encoding = feed["encoding"]
else:
encoding = ''
if "updated_parsed" in feed["feed"].keys():
updated = feed["feed"]["updated_parsed"]
try:
updated = dt.convert_struct_time_to_iso8601(updated)
except:
exist_feed = sqlite.get_feed_id_and_name(db_file, url)
if not exist_feed:
exist_node = sqlite.check_node_exist(db_file, node)
if not exist_node:
result = await fetch.http(url)
message = result['message']
status_code = result['status_code']
if not result['error']:
document = result['content']
feed = parse(document)
# if is_feed(url, feed):
if is_feed(feed):
if "title" in feed["feed"].keys():
title = feed["feed"]["title"]
else:
title = urlsplit(url).netloc
if "language" in feed["feed"].keys():
language = feed["feed"]["language"]
else:
language = ''
if "encoding" in feed.keys():
encoding = feed["encoding"]
else:
encoding = ''
if "updated_parsed" in feed["feed"].keys():
updated = feed["feed"]["updated_parsed"]
try:
updated = dt.convert_struct_time_to_iso8601(updated)
except:
updated = ''
else:
updated = ''
else:
updated = ''
version = feed["version"]
entries = len(feed["entries"])
await sqlite.insert_feed(db_file, url,
title=title,
entries=entries,
version=version,
encoding=encoding,
language=language,
status_code=status_code,
updated=updated)
await scan(self, jid_bare, db_file, url)
old = Config.get_setting_value(self.settings, jid_bare, 'old')
feed_id = sqlite.get_feed_id(db_file, url)
feed_id = feed_id[0]
if not old:
await sqlite.mark_feed_as_read(db_file, feed_id)
result_final = {'link' : url,
'index' : feed_id,
'name' : title,
'code' : status_code,
'error' : False,
'exist' : False}
break
# NOTE This elif statement be unnecessary
# when feedparser be supporting json feed.
elif is_feed_json(document):
feed = json.loads(document)
if "title" in feed.keys():
title = feed["title"]
else:
title = urlsplit(url).netloc
if "language" in feed.keys():
language = feed["language"]
else:
language = ''
if "encoding" in feed.keys():
encoding = feed["encoding"]
else:
encoding = ''
if "date_published" in feed.keys():
updated = feed["date_published"]
try:
updated = dt.convert_struct_time_to_iso8601(updated)
except:
updated = ''
else:
updated = ''
version = 'json' + feed["version"].split('/').pop()
entries = len(feed["items"])
await sqlite.insert_feed(db_file, url,
title=title,
entries=entries,
version=version,
encoding=encoding,
language=language,
status_code=status_code,
updated=updated)
await scan_json(self, jid_bare, db_file, url)
old = Config.get_setting_value(self.settings, jid_bare, 'old')
if not old:
version = feed["version"]
entries = len(feed["entries"])
await sqlite.insert_feed(db_file, url, title, node,
entries=entries,
version=version,
encoding=encoding,
language=language,
status_code=status_code,
updated=updated)
await scan(self, jid_bare, db_file, url)
old = Config.get_setting_value(self.settings, jid_bare, 'old')
feed_id = sqlite.get_feed_id(db_file, url)
feed_id = feed_id[0]
await sqlite.mark_feed_as_read(db_file, feed_id)
result_final = {'link' : url,
'index' : feed_id,
'name' : title,
'code' : status_code,
'error' : False,
'exist' : False}
break
else:
# NOTE Do not be tempted to return a compact dictionary.
# That is, dictionary within dictionary
# Return multiple dictionaries in a list or tuple.
result = await crawl.probe_page(url, document)
if not result:
# Get out of the loop with dict indicating error.
if not old:
await sqlite.mark_feed_as_read(db_file, feed_id)
result_final = {'link' : url,
'index' : None,
'name' : None,
'index' : feed_id,
'name' : title,
'code' : status_code,
'error' : True,
'error' : False,
'message': message,
'exist' : False}
'exist' : False,
'node' : None}
break
elif isinstance(result, list):
# Get out of the loop and deliver a list of dicts.
result_final = result
# NOTE This elif statement be unnecessary
# when feedparser be supporting json feed.
elif is_feed_json(document):
feed = json.loads(document)
if "title" in feed.keys():
title = feed["title"]
else:
title = urlsplit(url).netloc
if "language" in feed.keys():
language = feed["language"]
else:
language = ''
if "encoding" in feed.keys():
encoding = feed["encoding"]
else:
encoding = ''
if "date_published" in feed.keys():
updated = feed["date_published"]
try:
updated = dt.convert_struct_time_to_iso8601(updated)
except:
updated = ''
else:
updated = ''
version = 'json' + feed["version"].split('/').pop()
entries = len(feed["items"])
await sqlite.insert_feed(db_file, url, title, node,
entries=entries,
version=version,
encoding=encoding,
language=language,
status_code=status_code,
updated=updated)
await scan_json(self, jid_bare, db_file, url)
old = Config.get_setting_value(self.settings, jid_bare, 'old')
if not old:
feed_id = sqlite.get_feed_id(db_file, url)
feed_id = feed_id[0]
await sqlite.mark_feed_as_read(db_file, feed_id)
result_final = {'link' : url,
'index' : feed_id,
'name' : title,
'code' : status_code,
'error' : False,
'message': message,
'exist' : False,
'node' : None}
break
else:
# Go back up to the while loop and try again.
url = result['link']
# NOTE Do not be tempted to return a compact dictionary.
# That is, dictionary within dictionary
# Return multiple dictionaries in a list or tuple.
result = await crawl.probe_page(url, document)
if not result:
# Get out of the loop with dict indicating error.
result_final = {'link' : url,
'index' : None,
'name' : None,
'code' : status_code,
'error' : True,
'message': message,
'exist' : False,
'node' : None}
break
elif isinstance(result, list):
# Get out of the loop and deliver a list of dicts.
result_final = result
break
else:
# Go back up to the while loop and try again.
url = result['link']
else:
result_final = {'link' : url,
'index' : None,
'name' : None,
'code' : status_code,
'error' : True,
'message': message,
'exist' : False,
'node' : None}
break
else:
ix = exist_node[1]
node = exist_node[2]
message = 'Node is already allocated.'
result_final = {'link' : url,
'index' : None,
'index' : ix,
'name' : None,
'code' : status_code,
'error' : True,
'code' : None,
'error' : False,
'message': message,
'exist' : False}
'exist' : False,
'node' : node}
break
else:
ix = exist[0]
name = exist[1]
ix = exist_feed[0]
name = exist_feed[1]
message = 'URL already exist.'
result_final = {'link' : url,
'index' : ix,
@ -913,7 +977,8 @@ async def add_feed(self, jid_bare, db_file, url):
'code' : None,
'error' : False,
'message': message,
'exist' : True}
'exist' : True,
'node' : None}
break
return result_final

View file

@ -174,6 +174,16 @@ newsfeeds.
interface = "Groupchat"
url = "https://salsa.debian.org/mdosch/feed-to-muc"
[[friends]]
name = "Jabber RSS Transport"
info = ["""
Jabber RSS Transport is one of the possibilities to read news in Jabber \
via RSS, which allows you not to use a separate program, but to receive \
them directly in your favorite Jabber client.
"""]
interface = "Chat"
url = "https://jabberworld.info/Jabber_RSS_Transport"
[[friends]]
name = "JabRSS by Christof"
info = ["""
@ -648,15 +658,15 @@ name = "Poezio"
info = "XMPP client for console"
url = "https://poez.io"
# [[clients]]
# name = "Psi"
# info = "XMPP client for desktop"
# url = "https://psi-im.org"
[[clients]]
name = "Psi"
info = "XMPP client for desktop"
url = "https://psi-im.org"
# [[clients]]
# name = "Psi+"
# info = "XMPP client for desktop"
# url = "https://psi-plus.com"
[[clients]]
name = "Psi+"
info = "XMPP client for desktop"
url = "https://psi-plus.com"
# [[clients]]
# name = "Swift"

View file

@ -58,6 +58,18 @@ name = "Berlin XMPP Meetup"
link = "https://mov.im/?feed/pubsub.movim.eu/berlin-xmpp-meetup"
tags = ["event", "germany", "xmpp"]
[[feeds]]
lang = "de-de"
name = "blog | hasecke"
link = "https://www.hasecke.eu/index.xml"
tags = ["linux", "p2p", "software", "technology"]
[[feeds]]
lang = "de-de"
name = "heise online News"
link = "https://www.heise.de/rss/heise-atom.xml"
tags = ["computer", "industry", "electronics", "technology"]
[[feeds]]
lang = "de-de"
name = "CCC Event Blog"
@ -214,6 +226,12 @@ name = "La Quadrature du Net"
link = "https://www.laquadrature.net/en/feed/"
tags = ["news", "politics", "privacy", "surveillance"]
[[feeds]]
lang = "en-gb"
name = "op-co.de blog"
link = "https://op-co.de/blog/index.rss"
tags = ["code", "germany", "jabber", "mastodon", "telecommunication", "xmpp"]
[[feeds]]
lang = "en-gb"
name = "Pimux XMPP News"
@ -408,9 +426,9 @@ tags = ["health"]
[[feeds]]
lang = "en-us"
name = "Monal IM"
link = "https://monal-im.org/index.xml"
tags = ["iphone", "xmpp"]
name = "modernity"
link = "https://modernity.news/feed/"
tags = ["culture", "news", "politics", "usa"]
[[feeds]]
lang = "en-us"
@ -418,6 +436,12 @@ name = "Mom on a Mission"
link = "https://www.mom-on-a-mission.blog/all-posts?format=rss"
tags = ["family", "farming", "food", "gardening", "survival"]
[[feeds]]
lang = "en-us"
name = "Monal IM"
link = "https://monal-im.org/index.xml"
tags = ["iphone", "xmpp"]
[[feeds]]
lang = "en-us"
name = "NLnet News"
@ -778,18 +802,6 @@ name = "שיחה מקומית"
link = "https://www.mekomit.co.il/feed/"
tags = ["news", "politics"]
[[feeds]]
lang = "it-it"
name = "Diggita / Prima Pagina"
link = "https://diggita.com/rss.php"
tags = ["computer", "culture", "food", "technology"]
[[feeds]]
lang = "it-it"
name = "Feddit.it"
link = "https://feddit.it/feeds/local.xml?sort=Active"
tags = ["fediverse", "forum"]
[[feeds]]
lang = "it-it"
name = "A sysadmin's (mis)adventures"
@ -802,12 +814,30 @@ name = "Avvocato a Roma"
link = "https://www.studiosabatino.it/feed/"
tags = ["law"]
[[feeds]]
lang = "it-it"
name = "Diggita / Prima Pagina"
link = "https://diggita.com/rss.php"
tags = ["computer", "culture", "food", "technology"]
[[feeds]]
lang = "it-it"
name = "Disroot Blog"
link = "https://disroot.org/it/blog.atom"
tags = ["decentralization", "privacy"]
[[feeds]]
lang = "it-it"
name = "Feddit.it"
link = "https://feddit.it/feeds/local.xml?sort=Active"
tags = ["fediverse", "forum"]
[[feeds]]
lang = "it-it"
name = "Italian XMPP Happy Hour - Podcast"
link = "https://podcast.xmpp-it.net/api/v1/channels/xmpphappyhour/rss"
tags = ["decentralization", "privacy", "technology", "telecommunication", "xmpp"]
[[feeds]]
lang = "it-it"
name = "LinuxTrent"

View file

@ -167,6 +167,19 @@ def create_tables(db_file):
);
"""
)
feeds_pubsub_table_sql = (
"""
CREATE TABLE IF NOT EXISTS feeds_pubsub (
id INTEGER NOT NULL,
feed_id INTEGER NOT NULL UNIQUE,
node TEXT NOT NULL UNIQUE,
FOREIGN KEY ("feed_id") REFERENCES "feeds" ("id")
ON UPDATE CASCADE
ON DELETE CASCADE,
PRIMARY KEY (id)
);
"""
)
feeds_rules_table_sql = (
"""
CREATE TABLE IF NOT EXISTS feeds_rules (
@ -287,6 +300,7 @@ def create_tables(db_file):
cur.execute(feeds_table_sql)
cur.execute(feeds_state_table_sql)
cur.execute(feeds_properties_table_sql)
cur.execute(feeds_pubsub_table_sql)
cur.execute(feeds_rules_table_sql)
cur.execute(feeds_tags_table_sql)
cur.execute(filters_table_sql)
@ -390,6 +404,7 @@ async def add_metadata(db_file):
feed_id = ix[0]
insert_feed_status(cur, feed_id)
insert_feed_properties(cur, feed_id)
insert_feed_pubsub(cur, feed_id)
def insert_feed_status(cur, feed_id):
@ -452,7 +467,37 @@ def insert_feed_properties(cur, feed_id):
logger.error(e)
async def insert_feed(db_file, url, title=None, entries=None, version=None,
def insert_feed_pubsub(cur, feed_id):
"""
Set feed pubsub.
Parameters
----------
cur : object
Cursor object.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: feed_id: {}'
.format(function_name, feed_id))
sql = (
"""
INSERT
INTO feeds_pubsub(
feed_id)
VALUES(
?)
"""
)
par = (feed_id,)
try:
cur.execute(sql, par)
except IntegrityError as e:
logger.warning(
"Skipping feed_id {} for table feeds_pubsub".format(feed_id))
logger.error(e)
async def insert_feed(db_file, url, title, node, entries=None, version=None,
encoding=None, language=None, status_code=None,
updated=None):
"""
@ -464,8 +509,10 @@ async def insert_feed(db_file, url, title=None, entries=None, version=None,
Path to database file.
url : str
URL.
title : str, optional
Feed title. The default is None.
title : str
Feed title.
node : str
Feed Node.
entries : int, optional
Number of entries. The default is None.
version : str, optional
@ -533,6 +580,19 @@ async def insert_feed(db_file, url, title=None, entries=None, version=None,
feed_id, entries, version, encoding, language
)
cur.execute(sql, par)
sql = (
"""
INSERT
INTO feeds_pubsub(
feed_id, node)
VALUES(
?, ?)
"""
)
par = (
feed_id, node
)
cur.execute(sql, par)
async def insert_feed_(db_file, url, title=None, entries=None, version=None,
@ -699,7 +759,8 @@ def get_feeds_by_tag_id(db_file, tag_id):
FROM feeds
INNER JOIN feeds_tags ON feeds.id = feeds_tags.feed_id
INNER JOIN tags ON tags.id = feeds_tags.tag_id
WHERE tags.id = ?;
WHERE tags.id = ?
ORDER BY feeds.name;
"""
)
par = (tag_id,)
@ -734,7 +795,8 @@ def get_tags_by_feed_id(db_file, feed_id):
FROM tags
INNER JOIN feeds_tags ON tags.id = feeds_tags.tag_id
INNER JOIN feeds ON feeds.id = feeds_tags.feed_id
WHERE feeds.id = ?;
WHERE feeds.id = ?
ORDER BY tags.tag;
"""
)
par = (feed_id,)
@ -777,6 +839,109 @@ async def set_feed_id_and_tag_id(db_file, feed_id, tag_id):
cur.execute(sql, par)
def get_feed_properties(db_file, feed_id):
"""
Get properties of given feed.
Parameters
----------
db_file : str
Path to database file.
feed_id : str
Feed ID.
Returns
-------
node : str
Node name.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} feed_id: {}'
.format(function_name, db_file, feed_id))
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT *
FROM feeds_properties
WHERE feed_id = ?
"""
)
par = (feed_id,)
name = cur.execute(sql, par).fetchone()
return name
def get_node_name(db_file, feed_id):
"""
Get name of given node.
Parameters
----------
db_file : str
Path to database file.
feed_id : str
Feed ID.
Returns
-------
node : str
Node name.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} feed_id: {}'
.format(function_name, db_file, feed_id))
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT node
FROM feeds_pubsub
WHERE feed_id = ?
"""
)
par = (feed_id,)
name = cur.execute(sql, par).fetchone()
return name
def check_node_exist(db_file, node_name):
"""
Check whether node exist.
Parameters
----------
db_file : str
Path to database file.
node_name : str
Node name.
Returns
-------
id : str
ID.
feed_id : str
Feed ID.
node : str
Node name.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} node_name: {}'
.format(function_name, db_file, node_name))
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT id, feed_id, node
FROM feeds_pubsub
WHERE node = ?
"""
)
par = (node_name,)
name = cur.execute(sql, par).fetchone()
return name
def get_tag_id(db_file, tag_name):
"""
Get ID of given tag. Check whether tag exist.
@ -1327,6 +1492,62 @@ async def mark_entry_as_read(cur, ix):
cur.execute(sql, par)
def get_status_information_of_feed(db_file, feed_id):
"""
Get status information of given feed.
Parameters
----------
db_file : str
Path to database file.
feed_id : str
Feed Id.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} feed_id: {}'
.format(function_name, db_file, feed_id))
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT *
FROM feeds_state
WHERE feed_id = ?
"""
)
par = (feed_id,)
count = cur.execute(sql, par).fetchone()
return count
def get_unread_entries_of_feed(db_file, feed_id):
"""
Get entries of given feed.
Parameters
----------
db_file : str
Path to database file.
feed_id : str
Feed Id.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} feed_id: {}'
.format(function_name, db_file, feed_id))
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT *
FROM entries
WHERE read = 0 AND feed_id = ?
"""
)
par = (feed_id,)
count = cur.execute(sql, par).fetchall()
return count
def get_number_of_unread_entries_by_feed(db_file, feed_id):
"""
Count entries of given feed.
@ -2157,6 +2378,37 @@ def get_feeds_by_enabled_state(db_file, enabled_state):
return result
def get_feeds_and_enabled_state(db_file):
"""
Select table feeds and join column enabled.
Parameters
----------
db_file : str
Path to database file.
Returns
-------
result : tuple
List of URLs.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {}'
.format(function_name, db_file))
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT feeds.*, feeds_state.enabled
FROM feeds
INNER JOIN feeds_state ON feeds.id = feeds_state.feed_id
ORDER BY feeds.name ASC
"""
)
result = cur.execute(sql).fetchall()
return result
def get_active_feeds_url(db_file):
"""
Query table feeds for active URLs.
@ -2211,6 +2463,7 @@ def get_tags(db_file):
"""
SELECT tag, id
FROM tags
ORDER BY tag
"""
)
result = cur.execute(sql).fetchall()
@ -2245,6 +2498,7 @@ def get_feeds(db_file):
"""
SELECT id, name, url
FROM feeds
ORDER BY name
"""
)
result = cur.execute(sql).fetchall()

View file

@ -3,6 +3,19 @@
"""
IMPORTANT CONSIDERATION
This file appears to be redundant and may be replaced by a dict handler that
would match task keyword to functions.
Or use it as a class Task
tasks_xmpp_chat = {"check" : check_updates,
"status" : task_status_message,
"interval" : task_message}
tasks_xmpp_pubsub = {"check" : check_updates,
"pubsub" : task_pubsub}
TODO
1) Deprecate "add" (see above) and make it interactive.
@ -86,6 +99,17 @@ loop = asyncio.get_event_loop()
# task_ping = asyncio.create_task(ping(self, jid=None))
class Task:
def start(self, jid_full, tasks=None):
asyncio.create_task()
def cancel(self, jid_full, tasks=None):
pass
def task_ping(self):
# global task_ping_instance
try:
@ -109,7 +133,49 @@ await taskhandler.start_tasks(
)
"""
async def start_tasks_xmpp(self, jid_bare, tasks=None):
async def start_tasks_xmpp_pubsub(self, jid_bare, tasks=None):
try:
self.task_manager[jid_bare]
except KeyError as e:
self.task_manager[jid_bare] = {}
logging.debug('KeyError:', str(e))
logging.info('Creating new task manager for JID {}'.format(jid_bare))
if not tasks:
tasks = ['check', 'publish']
logging.info('Stopping tasks {} for JID {}'.format(tasks, jid_bare))
for task in tasks:
# if self.task_manager[jid][task]:
try:
self.task_manager[jid_bare][task].cancel()
except:
logging.info('No task {} for JID {} (start_tasks_xmpp_chat)'
.format(task, jid_bare))
logging.info('Starting tasks {} for JID {}'.format(tasks, jid_bare))
for task in tasks:
# print("task:", task)
# print("tasks:")
# print(tasks)
# breakpoint()
match task:
case 'publish':
self.task_manager[jid_bare]['publish'] = asyncio.create_task(
task_publish(self, jid_bare))
case 'check':
self.task_manager[jid_bare]['check'] = asyncio.create_task(
check_updates(self, jid_bare))
async def task_publish(self, jid_bare):
jid_file = jid_bare.replace('/', '_')
db_file = config.get_pathname_to_database(jid_file)
if jid_bare not in self.settings:
Config.add_settings_jid(self.settings, jid_bare, db_file)
while True:
await action.xmpp_send_pubsub(self, jid_bare)
await asyncio.sleep(60 * 180)
async def start_tasks_xmpp_chat(self, jid_bare, tasks=None):
"""
NOTE
@ -133,7 +199,7 @@ async def start_tasks_xmpp(self, jid_bare, tasks=None):
try:
self.task_manager[jid_bare][task].cancel()
except:
logging.info('No task {} for JID {} (start_tasks_xmpp)'
logging.info('No task {} for JID {} (start_tasks_xmpp_chat)'
.format(task, jid_bare))
logging.info('Starting tasks {} for JID {}'.format(tasks, jid_bare))
for task in tasks:
@ -147,10 +213,10 @@ async def start_tasks_xmpp(self, jid_bare, tasks=None):
check_updates(self, jid_bare))
case 'status':
self.task_manager[jid_bare]['status'] = asyncio.create_task(
task_status(self, jid_bare))
task_status_message(self, jid_bare))
case 'interval':
self.task_manager[jid_bare]['interval'] = asyncio.create_task(
task_send(self, jid_bare))
task_message(self, jid_bare))
# for task in self.task_manager[jid].values():
# print("task_manager[jid].values()")
# print(self.task_manager[jid].values())
@ -162,12 +228,12 @@ async def start_tasks_xmpp(self, jid_bare, tasks=None):
# await task
async def task_status(self, jid):
await action.xmpp_send_status(self, jid)
refresh_task(self, jid, task_status, 'status', '90')
async def task_status_message(self, jid):
await action.xmpp_send_status_message(self, jid)
refresh_task(self, jid, task_status_message, 'status', '90')
async def task_send(self, jid_bare):
async def task_message(self, jid_bare):
jid_file = jid_bare.replace('/', '_')
db_file = config.get_pathname_to_database(jid_file)
if jid_bare not in self.settings:
@ -195,12 +261,12 @@ async def task_send(self, jid_bare):
await sqlite.update_last_update_time(db_file)
else:
await sqlite.set_last_update_time(db_file)
await action.xmpp_send_update(self, jid_bare)
refresh_task(self, jid_bare, task_send, 'interval')
await start_tasks_xmpp(self, jid_bare, ['status'])
await action.xmpp_send_message(self, jid_bare)
refresh_task(self, jid_bare, task_message, 'interval')
await start_tasks_xmpp_chat(self, jid_bare, ['status'])
def clean_tasks_xmpp(self, jid, tasks=None):
def clean_tasks_xmpp_chat(self, jid, tasks=None):
if not tasks:
tasks = ['interval', 'status', 'check']
logging.info('Stopping tasks {} for JID {}'.format(tasks, jid))

View file

@ -45,7 +45,9 @@ from urllib.parse import (
def get_hostname(url):
parted_url = urlsplit(url)
return parted_url.netloc
hostname = parted_url.netloc
if hostname.startswith('www.'): hostname = hostname.replace('www.', '')
return hostname
def replace_hostname(url, url_type):

View file

@ -1,2 +1,2 @@
__version__ = '0.1.47'
__version_info__ = (0, 1, 47)
__version__ = '0.1.48'
__version_info__ = (0, 1, 48)

File diff suppressed because it is too large Load diff

View file

@ -2189,14 +2189,14 @@ class XmppCommand:
status_type=status_type)
await asyncio.sleep(5)
key_list = ['check', 'status', 'interval']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
if (key == 'enabled' and
val == 0 and
str(is_enabled) == 1):
logger.info('Slixfeed has been disabled for {}'.format(jid_bare))
key_list = ['interval', 'status']
task.clean_tasks_xmpp(self, jid_bare, key_list)
task.clean_tasks_xmpp_chat(self, jid_bare, key_list)
status_type = 'xa'
status_message = '📪️ Send "Start" to receive updates'
XmppPresence.send(self, jid_bare, status_message,
@ -2212,10 +2212,10 @@ class XmppCommand:
# XmppPresence.send(self, jid, status_message,
# status_type=status_type)
# await asyncio.sleep(5)
# await task.start_tasks_xmpp(self, jid, ['check', 'status',
# await task.start_tasks_xmpp_chat(self, jid, ['check', 'status',
# 'interval'])
# else:
# task.clean_tasks_xmpp(self, jid, ['interval', 'status'])
# task.clean_tasks_xmpp_chat(self, jid, ['interval', 'status'])
# status_type = 'xa'
# status_message = '📪️ Send "Start" to receive Jabber updates'
# XmppPresence.send(self, jid, status_message,

File diff suppressed because it is too large Load diff

View file

@ -34,7 +34,8 @@ class XmppGroupchat:
self.join(self, inviter, jid)
def autojoin(self, bookmarks):
async def autojoin(self, bookmarks):
jid_from = str(self.boundjid) if self.is_component else None
conferences = bookmarks["private"]["bookmarks"]["conferences"]
for conference in conferences:
if conference["jid"] and conference["autojoin"]:
@ -42,13 +43,10 @@ class XmppGroupchat:
conference["nick"] = self.alias
logging.error('Alias (i.e. Nicknname) is missing for '
'bookmark {}'.format(conference['name']))
# jid_from = str(self.boundjid) if self.is_component else None
self.plugin['xep_0045'].join_muc(conference["jid"],
conference["nick"],
# pfrom=jid_from,
# If a room password is needed, use:
# password=the_room_password,
)
await self.plugin['xep_0045'].join_muc_wait(conference["jid"],
conference["nick"],
presence_options = {"pfrom" : jid_from},
password=None)
logging.info('Autojoin groupchat\n'
'Name : {}\n'
'JID : {}\n'
@ -61,7 +59,7 @@ class XmppGroupchat:
.format(conference['name']))
def join(self, inviter, jid):
async def join(self, inviter, jid):
# token = await initdb(
# muc_jid,
# sqlite.get_setting_value,
@ -86,15 +84,14 @@ class XmppGroupchat:
'Inviter : {}\n'
.format(jid, inviter))
jid_from = str(self.boundjid) if self.is_component else None
self.plugin['xep_0045'].join_muc(jid,
self.alias,
pfrom=jid_from
# If a room password is needed, use:
# password=the_room_password,
)
await self.plugin['xep_0045'].join_muc_wait(jid,
self.alias,
presence_options = {"pfrom" : jid_from},
password=None)
def leave(self, jid):
jid_from = str(self.boundjid) if self.is_component else None
message = ('This news bot will now leave this groupchat.\n'
'The JID of this news bot is xmpp:{}?message'
.format(self.boundjid.bare))
@ -108,4 +105,4 @@ class XmppGroupchat:
self.plugin['xep_0045'].leave_muc(jid,
self.alias,
status_message,
self.boundjid)
jid_from)

View file

@ -102,7 +102,7 @@ async def message(self, message):
message_text.lower().endswith('.opml')):
url = message_text
key_list = ['status']
task.clean_tasks_xmpp(self, jid_bare, key_list)
task.clean_tasks_xmpp_chat(self, jid_bare, key_list)
status_type = 'dnd'
status_message = '📥️ Procesing request to import feeds...'
# pending_tasks_num = len(self.pending_tasks[jid_bare])
@ -122,7 +122,7 @@ async def message(self, message):
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
XmppMessage.send_reply(self, message, response)
return
@ -321,11 +321,20 @@ async def message(self, message):
title = ' '.join(message_text.split(' ')[1:])
if not title:
title = uri.get_hostname(url)
counter = 0
hostname = uri.get_hostname(url)
node = hostname + ':' + str(counter)
while True:
if sqlite.check_node_exist(db_file, node):
counter += 1
node = hostname + ':' + str(counter)
else:
break
if url.startswith('http'):
db_file = config.get_pathname_to_database(jid_file)
exist = sqlite.get_feed_id_and_name(db_file, url)
if not exist:
await sqlite.insert_feed(db_file, url, title)
await sqlite.insert_feed(db_file, url, title, node)
await action.scan(self, jid_bare, db_file, url)
if jid_bare not in self.settings:
Config.add_settings_jid(self.settings, jid_bare,
@ -333,10 +342,10 @@ async def message(self, message):
old = Config.get_setting_value(self.settings, jid_bare,
'old')
if old:
# task.clean_tasks_xmpp(self, jid_bare, ['status'])
# task.clean_tasks_xmpp_chat(self, jid_bare, ['status'])
# await send_status(jid)
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
else:
feed_id = sqlite.get_feed_id(db_file, url)
feed_id = feed_id[0]
@ -541,7 +550,7 @@ async def message(self, message):
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
else:
response = ('Unsupported filetype.\n'
'Try: md or opml')
@ -645,7 +654,7 @@ async def message(self, message):
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
if response:
logging.warning('Error for URL {}: {}'.format(url, error))
XmppMessage.send_reply(self, message, response)
@ -653,7 +662,7 @@ async def message(self, message):
message_lowercase.endswith('.opml')):
url = message_text
key_list = ['status']
task.clean_tasks_xmpp(self, jid_bare, key_list)
task.clean_tasks_xmpp_chat(self, jid_bare, key_list)
status_type = 'dnd'
status_message = '📥️ Procesing request to import feeds...'
# pending_tasks_num = len(self.pending_tasks[jid_bare])
@ -674,12 +683,109 @@ async def message(self, message):
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
XmppMessage.send_reply(self, message, response)
# TODO Handle node error
# sqlite3.IntegrityError: UNIQUE constraint failed: feeds_pubsub.node
# ERROR:slixmpp.basexmpp:UNIQUE constraint failed: feeds_pubsub.node
case _ if message_lowercase.startswith('send '):
if is_operator(self, jid_bare):
info = message_text[5:].split(' ')
if len(info) > 1:
jid = info[0]
if '/' not in jid:
url = info[1]
db_file = config.get_pathname_to_database(jid)
if len(info) > 2:
node = info[2]
else:
counter = 0
hostname = uri.get_hostname(url)
node = hostname + ':' + str(counter)
while True:
if sqlite.check_node_exist(db_file, node):
counter += 1
node = hostname + ':' + str(counter)
else:
break
# task.clean_tasks_xmpp_chat(self, jid_bare, ['status'])
status_type = 'dnd'
status_message = ('📫️ Processing request to fetch data from {}'
.format(url))
# pending_tasks_num = len(self.pending_tasks[jid_bare])
pending_tasks_num = randrange(10000, 99999)
self.pending_tasks[jid_bare][pending_tasks_num] = status_message
# self.pending_tasks_counter += 1
# self.pending_tasks[jid_bare][self.pending_tasks_counter] = status_message
XmppPresence.send(self, jid_bare, status_message,
status_type=status_type)
if url.startswith('feed:'):
url = uri.feed_to_http(url)
url = (uri.replace_hostname(url, 'feed')) or url
result = await action.add_feed(self, jid_bare, db_file, url, node)
if isinstance(result, list):
results = result
response = ("Web feeds found for {}\n\n```\n"
.format(url))
for result in results:
response += ("Title : {}\n"
"Link : {}\n"
"\n"
.format(result['name'], result['link']))
response += ('```\nTotal of {} feeds.'
.format(len(results)))
elif result['exist']:
response = ('> {}\nNews source "{}" is already '
'listed in the subscription list at '
'index {}'
.format(result['link'],
result['name'],
result['index']))
elif result['node']:
response = ('> {}\nNode "{}" is already '
'allocated to index {}'
.format(result['link'],
result['node'],
result['index']))
elif result['error']:
response = ('> {}\nFailed to find subscriptions. '
'Reason: {} (status code: {})'
.format(url, result['message'],
result['code']))
else:
response = ('> {}\nNews source "{}" has been '
'added to subscription list.'
.format(result['link'], result['name']))
# task.clean_tasks_xmpp_chat(self, jid_bare, ['status'])
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
print(self.pending_tasks)
key_list = ['status']
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
# except:
# response = (
# '> {}\nNews source is in the process '
# 'of being added to the subscription '
# 'list.'.format(url)
# )
else:
response = ('No action has been taken.'
'\n'
'JID Must not include "/".')
else:
response = ('No action has been taken.'
'\n'
'Missing argument. '
'Enter PubSub JID and subscription URL '
'(and optionally: NodeName).')
else:
response = ('This action is restricted. '
'Type: adding node.')
XmppMessage.send_reply(self, message, response)
case _ if (message_lowercase.startswith('http') or
message_lowercase.startswith('feed:')):
url = message_text
# task.clean_tasks_xmpp(self, jid_bare, ['status'])
# task.clean_tasks_xmpp_chat(self, jid_bare, ['status'])
status_type = 'dnd'
status_message = ('📫️ Processing request to fetch data from {}'
.format(url))
@ -694,8 +800,17 @@ async def message(self, message):
url = uri.feed_to_http(url)
url = (uri.replace_hostname(url, 'feed')) or url
db_file = config.get_pathname_to_database(jid_file)
counter = 0
hostname = uri.get_hostname(url)
node = hostname + ':' + str(counter)
while True:
if sqlite.check_node_exist(db_file, node):
counter += 1
node = hostname + ':' + str(counter)
else:
break
# try:
result = await action.add_feed(self, jid_bare, db_file, url)
result = await action.add_feed(self, jid_bare, db_file, url, node)
if isinstance(result, list):
results = result
response = ("Web feeds found for {}\n\n```\n"
@ -722,12 +837,12 @@ async def message(self, message):
response = ('> {}\nNews source "{}" has been '
'added to subscription list.'
.format(result['link'], result['name']))
# task.clean_tasks_xmpp(self, jid_bare, ['status'])
# task.clean_tasks_xmpp_chat(self, jid_bare, ['status'])
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
print(self.pending_tasks)
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
# except:
# response = (
# '> {}\nNews source is in the process '
@ -768,8 +883,8 @@ async def message(self, message):
self.settings, jid_bare, db_file, key, val_new)
# NOTE Perhaps this should be replaced by functions
# clean and start
task.refresh_task(self, jid_bare, task.task_send, key,
val_new)
task.refresh_task(self, jid_bare,
task.task_message, key, val_new)
response = ('Updates will be sent every {} minutes '
'(was: {}).'.format(val_new, val_old))
except:
@ -870,11 +985,11 @@ async def message(self, message):
case _ if message_lowercase.startswith('next'):
num = message_text[5:]
if num:
await action.xmpp_send_update(self, jid_bare, num)
await action.xmpp_send_message(self, jid_bare, num)
else:
await action.xmpp_send_update(self, jid_bare)
await action.xmpp_send_message(self, jid_bare)
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
case 'old':
db_file = config.get_pathname_to_database(jid_file)
key = 'old'
@ -932,7 +1047,7 @@ async def message(self, message):
url = data[0]
if url:
key_list = ['status']
task.clean_tasks_xmpp(self, jid_bare, key_list)
task.clean_tasks_xmpp_chat(self, jid_bare, key_list)
status_type = 'dnd'
status_message = ('📫️ Processing request to fetch data '
'from {}'.format(url))
@ -1028,7 +1143,7 @@ async def message(self, message):
XmppMessage.send_reply(self, message, response)
del self.pending_tasks[jid_bare][pending_tasks_num]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
case _ if message_lowercase.startswith('recent '):
num = message_text[7:]
if num:
@ -1087,20 +1202,20 @@ async def message(self, message):
'News source does not exist. '
.format(url))
# refresh_task(self, jid_bare, send_status, 'status', 20)
# task.clean_tasks_xmpp(self, jid_bare, ['status'])
# task.clean_tasks_xmpp_chat(self, jid_bare, ['status'])
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
XmppMessage.send_reply(self, message, response)
else:
response = ('No action has been taken.'
'\n'
'Missing argument. '
'Enter feed URL or index number.')
'Enter subscription URL or index number.')
case _ if message_lowercase.startswith('reset'):
# TODO Reset also by ID
ix_url = message_text[6:]
key_list = ['status']
task.clean_tasks_xmpp(self, jid_bare, key_list)
task.clean_tasks_xmpp_chat(self, jid_bare, key_list)
status_type = 'dnd'
status_message = '📫️ Marking entries as read...'
# pending_tasks_num = len(self.pending_tasks[jid_bare])
@ -1152,7 +1267,7 @@ async def message(self, message):
del self.pending_tasks[jid_bare][pending_tasks_num]
# del self.pending_tasks[jid_bare][self.pending_tasks_counter]
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
case _ if message_lowercase.startswith('search '):
query = message_text[7:]
if query:
@ -1179,7 +1294,7 @@ async def message(self, message):
status_type=status_type)
await asyncio.sleep(5)
key_list = ['check', 'status', 'interval']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
response = 'Updates are enabled.'
XmppMessage.send_reply(self, message, response)
case 'stats':
@ -1204,7 +1319,7 @@ async def message(self, message):
.format(feed_id))
XmppMessage.send_reply(self, message, response)
key_list = ['status']
await task.start_tasks_xmpp(self, jid_bare, key_list)
await task.start_tasks_xmpp_chat(self, jid_bare, key_list)
case _ if message_lowercase.startswith('rename '):
message_text = message_text[7:]
feed_id = message_text.split(' ')[0]
@ -1264,7 +1379,7 @@ async def message(self, message):
await Config.set_setting_value(
self.settings, jid_bare, db_file, key, val)
key_list = ['interval', 'status']
task.clean_tasks_xmpp(self, jid_bare, key_list)
task.clean_tasks_xmpp_chat(self, jid_bare, key_list)
status_type = 'xa'
status_message = '📪️ Send "Start" to receive Jabber updates'
XmppPresence.send(self, jid_bare, status_message,

View file

@ -1,7 +1,11 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TODO Implement XEP-0472: Pubsub Social Feed
"""
Functions create_node and create_entry are derived from project atomtopubsub.
"""
import hashlib
import slixmpp.plugins.xep_0060.stanza.pubsub as pubsub
@ -10,7 +14,21 @@ from slixmpp.xmlstream import ET
class XmppPubsub:
# TODO Make use of var "xep" with match/case
async def get_pubsub_services(self):
jids = [self.boundjid.bare]
iq = await self['xep_0030'].get_items(jid=self.boundjid.domain)
items = iq['disco_items']['items']
for item in items:
iq = await self['xep_0030'].get_info(jid=item[0])
identities = iq['disco_info']['identities']
for identity in identities:
if identity[0] == 'pubsub' and identity[1] == 'service':
jid = item[0]
jids.extend([jid])
return jids
# TODO Make use of var "xep" with match/case (XEP-0060, XEP-0277, XEP-0472)
def create_node(self, jid, node, xep ,title, summary=None):
jid_from = str(self.boundjid) if self.is_component else None
iq = self.Iq(stype='set',
@ -61,7 +79,7 @@ class XmppPubsub:
# It provides nicer urls.
# Respond to atomtopubsub:
# I think it would be a good idea to use md5 checksum of Url as Id for
# I think it would be beneficial to use md5 checksum of Url as Id for
# cross reference, and namely - in another project to utilize PubSub as
# links sharing system (see del.icio.us) - to share node entries.
@ -76,10 +94,10 @@ class XmppPubsub:
node_entry.set('xmlns', 'http://www.w3.org/2005/Atom')
title = ET.SubElement(node_entry, "title")
title.text = entry.title
title.text = entry['title']
updated = ET.SubElement(node_entry, "updated")
updated.text = entry.updated
updated.text = entry['updated']
# Content
if version == 'atom3':