Add functionality to export feeds to OPML

This commit is contained in:
Schimon Jehudah 2024-01-03 10:37:33 +00:00
parent 9843337e43
commit a3d68f6a8f
11 changed files with 233 additions and 62 deletions

View file

@ -106,8 +106,11 @@ class Jabber:
xmpp.register_plugin('xep_0048') # Bookmarks
xmpp.register_plugin('xep_0060') # Publish-Subscribe
# xmpp.register_plugin('xep_0065') # SOCKS5 Bytestreams
xmpp.register_plugin('xep_0066') # Out of Band Data
xmpp.register_plugin('xep_0071') # XHTML-IM
xmpp.register_plugin('xep_0199', {'keepalive': True}) # XMPP Ping
xmpp.register_plugin('xep_0249') # Multi-User Chat
xmpp.register_plugin('xep_0363') # HTTP File Upload
xmpp.register_plugin('xep_0402') # PEP Native Bookmarks
# proxy_enabled = get_value("accounts", "XMPP Connect", "proxy_enabled")

View file

@ -5,30 +5,25 @@
TODO
1) Use file settings.csv and pathnames.txt instead:
See get_value_default and get_default_list
1) Website-specific filter (i.e. audiobookbay).
2) Website-specific filter (i.e. audiobookbay).
2) Exclude websites from being subjected to filtering (e.g. metapedia).
3) Exclude websites from filtering (e.g. metapedia).
4) Filter phrases:
3) Filter phrases:
Refer to sqlitehandler.search_entries for implementation.
It is expected to be more complex than function search_entries.
5) Copy file from /etc/slixfeed/ or /usr/share/slixfeed/
4) Copy file from /etc/slixfeed/ or /usr/share/slixfeed/
"""
import configparser
# from file import get_default_confdir
import slixfeed.config as config
import slixfeed.sqlite as sqlite
import os
# from random import randrange
import sys
import yaml
import logging
def get_value(filename, section, keys):
"""
@ -48,8 +43,9 @@ def get_value(filename, section, keys):
result : list or str
A single value as string or multiple values as list.
"""
result = None
config_res = configparser.RawConfigParser()
config_dir = config.get_default_confdir()
config_dir = get_default_confdir()
# if not os.path.isdir(config_dir):
# config_dir = '/usr/share/slixfeed/'
if not os.path.isdir(config_dir):
@ -63,17 +59,27 @@ def get_value(filename, section, keys):
for key in keys:
try:
value = section_res[key]
logging.debug("Found value {} for key {}".format(
value, key))
except:
print("Missing key:", key)
value = ''
logging.error("Missing key:", key)
result.extend([value])
elif isinstance(keys, str):
key = keys
try:
result = section_res[key]
logging.debug("Found value {} for key {}".format(
value, key))
except:
print("Missing key:", key)
result = ''
# logging.error("Missing key:", key)
if result == None:
logging.error(
"Check configuration file {} for "
"missing key {} under section {}.".format(
filename, section, keys))
else:
return result
@ -96,7 +102,7 @@ def get_value_default(filename, section, key):
Value.
"""
config_res = configparser.RawConfigParser()
config_dir = config.get_default_confdir()
config_dir = get_default_confdir()
if not os.path.isdir(config_dir):
config_dir = '/usr/share/slixfeed/'
config_file = os.path.join(config_dir, filename + ".ini")
@ -120,7 +126,7 @@ def get_list(filename):
result : list
List of pathnames or keywords.
"""
config_dir = config.get_default_confdir()
config_dir = get_default_confdir()
if not os.path.isdir(config_dir):
config_dir = '/usr/share/slixfeed/'
config_file = os.path.join(config_dir, filename)

View file

@ -15,7 +15,7 @@ def now():
Returns
-------
date : ?
date : ???
ISO 8601 Timestamp.
"""
date = datetime.now().isoformat()
@ -28,7 +28,7 @@ def current_time():
Returns
-------
date : ?
date : str
HH:MM:SS timestamp.
"""
now = datetime.now()
@ -36,6 +36,20 @@ def current_time():
return time
def timestamp():
"""
Print time stamp to be used in filename.
Returns
-------
formatted_time : str
%Y%m%d-%H%M%S timestamp.
"""
now = datetime.now()
formatted_time = now.strftime("%Y%m%d-%H%M%S")
return formatted_time
def validate(date):
"""
Validate date format.

75
slixfeed/opml.py Normal file
View file

@ -0,0 +1,75 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
{
'bozo': False,
'bozo_exception': None,
'feeds': [
{
'url': 'https://kurtmckee.org/tag/listparser/feed',
'title': 'listparser blog',
'categories': [],
'tags': []
},
{
'url': 'https://github.com/kurtmckee/listparser/commits/develop.atom',
'title': 'listparser changelog',
'categories': [],
'tags': []
}
],
'lists': [],
'opportunities': [],
'meta': {
'title': 'listparser project feeds',
'author': {
'name': 'Kurt McKee',
'email': 'contactme@kurtmckee.org',
'url': 'https://kurtmckee.org/'
}
},
'version': 'opml2'
}
"""
import listparser
import lxml
async def import_from_file(db_file, opml_doc):
feeds = listparser.parse(opml_doc)['feeds']
for feed in feeds:
url = feed['url']
title = feed['title']
# categories = feed['categories']
# tags = feed['tags']
# await datahandler.add_feed_no_check(db_file, [url, title])
from slixfeed.datetime import current_time
import xml.etree.ElementTree as ET
# NOTE Use OPyML or LXML
async def export_to_file(jid, filename, results):
root = ET.Element("opml")
root.set("version", "1.0")
head = ET.SubElement(root, "head")
ET.SubElement(head, "title").text = "Subscriptions for {}".format(jid)
ET.SubElement(head, "description").text = (
"Set of feeds exported with Slixfeed.")
ET.SubElement(head, "generator").text = "Slixfeed"
ET.SubElement(head, "urlPublic").text = (
"https://gitgud.io/sjehuda/slixfeed")
time_stamp = current_time()
ET.SubElement(head, "dateCreated").text = time_stamp
ET.SubElement(head, "dateModified").text = time_stamp
body = ET.SubElement(root, "body")
for result in results:
outline = ET.SubElement(body, "outline")
outline.set("text", result[0])
outline.set("xmlUrl", result[1])
# outline.set("type", result[2])
tree = ET.ElementTree(root)
tree.write(filename)

View file

@ -836,9 +836,9 @@ async def add_entry(cur, entry):
try:
cur.execute(sql, entry)
except:
print("")
# print("Unknown error for sqlite.add_entry")
# print(entry)
# None
print("Unknown error for sqlite.add_entry")
print(entry)
# print(current_time(), "COROUTINE OBJECT NOW")
# for i in entry:
# print(type(i))

View file

@ -86,7 +86,7 @@ await taskhandler.start_tasks(
"""
async def start_tasks_xmpp(self, jid, tasks):
# print("start_tasks_xmpp", jid, tasks)
logging.debug("Starting tasks {} for JID {}".format(tasks, jid))
task_manager[jid] = {}
for task in tasks:
# print("task:", task)
@ -115,13 +115,14 @@ async def start_tasks_xmpp(self, jid, tasks):
async def clean_tasks_xmpp(jid, tasks):
# print("clean_tasks_xmpp", jid, tasks)
logging.debug("Stopping tasks {} for JID {}".format(tasks, jid))
for task in tasks:
# if task_manager[jid][task]:
try:
task_manager[jid][task].cancel()
except:
print("No task", task, "for JID", jid, "(clean_tasks)")
logging.debug(
"No task {} for JID {} (clean_tasks)".format(task, jid))
"""
@ -138,7 +139,6 @@ Consider callback e.g. Slixfeed.send_status.
Or taskhandler for each protocol or specific taskhandler function.
"""
async def task_jid(self, jid):
# print("task_jid", jid)
"""
JID (Jabber ID) task manager.
@ -149,7 +149,6 @@ async def task_jid(self, jid):
"""
db_file = get_pathname_to_database(jid)
enabled = await get_settings_value(db_file, "enabled")
# print(await current_time(), "enabled", enabled, jid)
if enabled:
# NOTE Perhaps we want to utilize super with keyword
# arguments in order to know what tasks to initiate.
@ -187,8 +186,6 @@ async def task_jid(self, jid):
async def send_update(self, jid, num=None):
print("send_update", jid)
# print(await current_time(), jid, "def send_update")
"""
Send news items as messages.
@ -199,8 +196,7 @@ async def send_update(self, jid, num=None):
num : str, optional
Number. The default is None.
"""
# print("Starting send_update()")
# print(jid)
logging.debug("Sending a news update to JID {}".format(jid))
db_file = get_pathname_to_database(jid)
enabled = await get_settings_value(db_file, "enabled")
if enabled:
@ -260,8 +256,6 @@ async def send_update(self, jid, num=None):
async def send_status(self, jid):
# print("send_status", jid)
# print(await current_time(), jid, "def send_status")
"""
Send status message.
@ -270,7 +264,7 @@ async def send_status(self, jid):
jid : str
Jabber ID.
"""
# print(await current_time(), "> SEND STATUS",jid)
logging.debug("Sending a status message to JID {}".format(jid))
status_text="🤖️ Slixfeed RSS News Bot"
db_file = get_pathname_to_database(jid)
enabled = await get_settings_value(db_file, "enabled")
@ -323,7 +317,6 @@ async def send_status(self, jid):
async def refresh_task(self, jid, callback, key, val=None):
# print("refresh_task", jid, key)
"""
Apply new setting at runtime.
@ -336,6 +329,7 @@ async def refresh_task(self, jid, callback, key, val=None):
val : str, optional
Value. The default is None.
"""
logging.debug("Refreshing task {} for JID {}".format(callback, jid))
if not val:
db_file = get_pathname_to_database(jid)
val = await get_settings_value(db_file, key)
@ -344,7 +338,9 @@ async def refresh_task(self, jid, callback, key, val=None):
try:
task_manager[jid][key].cancel()
except:
print("No task of type", key, "to cancel for JID", jid)
logging.debug(
"No task of type {} to cancel for "
"JID {} (clean_tasks)").format(key, jid)
# task_manager[jid][key] = loop.call_at(
# loop.time() + 60 * float(val),
# loop.create_task,
@ -374,8 +370,6 @@ async def wait_and_run(self, callback, jid, val):
# TODO Take this function out of
# <class 'slixmpp.clientxmpp.ClientXMPP'>
async def check_updates(jid):
# print("check_updates", jid)
# print(await current_time(), jid, "def check_updates")
"""
Start calling for update check up.
@ -384,8 +378,8 @@ async def check_updates(jid):
jid : str
Jabber ID.
"""
logging.debug("Scanning for updates for JID {}".format(jid))
while True:
# print(await current_time(), "> CHCK UPDATE",jid)
db_file = get_pathname_to_database(jid)
await download_updates(db_file)
val = get_value_default("settings", "Settings", "check")
@ -399,8 +393,8 @@ async def check_updates(jid):
async def start_tasks(self, presence):
# print("def presence_available", presence["from"].bare)
jid = presence["from"].bare
logging.debug("Beginning tasks for JID {}".format(jid))
if jid not in self.boundjid.bare:
await clean_tasks_xmpp(
jid, ["interval", "status", "check"])
@ -414,7 +408,7 @@ async def start_tasks(self, presence):
async def stop_tasks(self, presence):
if not self.boundjid.bare:
jid = presence["from"].bare
print(">>> unavailable:", jid)
logging.debug("Stopping tasks for JID {}".format(jid))
await clean_tasks_xmpp(
jid, ["interval", "status", "check"])
@ -440,7 +434,7 @@ async def check_readiness(self, presence):
jid = presence["from"].bare
if presence["show"] in ("away", "dnd", "xa"):
print(">>> away, dnd, xa:", jid)
logging.debug("Stopping updates for JID {}".format(jid))
await clean_tasks_xmpp(
jid, ["interval"])
await start_tasks_xmpp(

View file

@ -1,14 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TODO
1) Function to list bookmarks
"""
from slixmpp.plugins.xep_0048.stanza import Bookmarks

View file

@ -17,7 +17,10 @@ async def recover_connection(self, event, message):
# print(current_time(),"Maximum connection attempts exceeded.")
# logging.error("Maximum connection attempts exceeded.")
print(current_time(), "Attempt number", self.connection_attempts)
seconds = int(get_value("accounts", "XMPP Connect", "reconnect_timeout"))
seconds = get_value("accounts", "XMPP Connect", "reconnect_timeout")
if not seconds:
seconds = 30
seconds = int(seconds)
print(current_time(), "Next attempt within", seconds, "seconds")
# NOTE asyncio.sleep doesn't interval as expected
# await asyncio.sleep(seconds)

View file

@ -12,7 +12,7 @@ TODO
3) If groupchat error is received, send that error message to inviter.
"""
import logging
import slixfeed.xmpp.bookmark as bookmark
import slixfeed.xmpp.process as process
from slixfeed.datetime import current_time
@ -38,7 +38,7 @@ async def autojoin(self, event):
for conference in conferences:
if conference["autojoin"]:
muc_jid = conference["jid"]
print(current_time(), "Autojoining groupchat", muc_jid)
logging.debug("Autojoin groupchat", muc_jid)
self.plugin['xep_0045'].join_muc(
muc_jid,
self.nick,

View file

@ -25,8 +25,9 @@ from slixfeed.config import (
get_value,
get_pathname_to_database,
remove_from_list)
from slixfeed.datetime import current_time
from slixfeed.datetime import current_time, timestamp
import slixfeed.fetch as fetcher
import slixfeed.opml as opml
import slixfeed.sqlite as sqlite
import slixfeed.task as task
import slixfeed.utility as utility
@ -36,6 +37,7 @@ import slixfeed.xmpp.compose as compose
import slixfeed.xmpp.muc as groupchat
import slixfeed.xmpp.status as status
import slixfeed.xmpp.text as text
import slixfeed.xmpp.upload as upload
async def event(self, event):
@ -76,8 +78,6 @@ async def message(self, message):
for stanza objects and the Message stanza to see
how it may be used.
"""
# print("message")
# print(message)
if message["type"] in ("chat", "groupchat", "normal"):
jid = message["from"].bare
if message["type"] == "groupchat":
@ -325,12 +325,46 @@ async def message(self, message):
else:
response = "Missing keywords."
send_reply_message(self, message, response)
case _ if message_lowercase.startswith("export "):
valid = 1
key = message_text[7:]
data_dir = get_default_dbdir()
if not os.path.isdir(data_dir):
os.mkdir(data_dir)
filename = os.path.join(
data_dir, "opml", "slixfeed_" + timestamp() + "." + key)
db_file = get_pathname_to_database(jid)
results = await sqlite.get_feeds(db_file)
match key:
case "opml":
status_type = "dnd"
status_message = (
"📂️ Procesing request to export feeds into OPML ...")
send_status_message(self, jid, status_type, status_message)
await opml.export_to_file(
jid, filename, results)
url = await upload.start(self, jid, filename)
response = (
"Feeds exported successfully to an OPML "
"Outline Syndication.\n{}").format(url)
await task.start_tasks_xmpp(self, jid, ["status"])
case "html":
response = "Not yet implemented."
case "markdown":
response = "Not yet implemented"
case _:
response = "Unsupported filetype."
valid = 0
if valid:
# send_oob_reply_message(message, url, response)
send_oob_message(self, jid, url)
send_reply_message(self, message, response)
case _ if (message_lowercase.startswith("gemini") or
message_lowercase.startswith("gopher:")):
message_lowercase.startswith("gopher")):
response = "Gemini and Gopher are not supported yet."
send_reply_message(self, message, response)
case _ if (message_lowercase.startswith("http") or
message_lowercase.startswith("feed:")):
message_lowercase.startswith("feed")):
url = message_text
await task.clean_tasks_xmpp(jid, ["status"])
status_type = "dnd"
@ -713,14 +747,14 @@ async def message(self, message):
# if response: message.reply(response).send()
if not response: response = "EMPTY MESSAGE - ACTION ONLY"
log_dir = get_default_dbdir()
if not os.path.isdir(log_dir):
os.mkdir(log_dir)
data_dir = get_default_dbdir()
if not os.path.isdir(data_dir):
os.mkdir(data_dir)
utility.log_as_markdown(
current_time(), os.path.join(log_dir, jid),
current_time(), os.path.join(data_dir, "logs", jid),
jid, message_text)
utility.log_as_markdown(
current_time(), os.path.join(log_dir, jid),
current_time(), os.path.join(data_dir, "logs", jid),
self.boundjid.bare, response)
@ -735,6 +769,21 @@ def send_reply_message(self, message, response):
message.reply(response).send()
def send_oob_reply_message(message, url, response):
reply = message.reply(response)
reply['oob']['url'] = url
reply.send()
def send_oob_message(self, jid, url):
html = (
f'<body xmlns="http://www.w3.org/1999/xhtml">'
f'<a href="{url}">{url}</a></body>')
message = self.make_message(mto=jid, mbody=url, mhtml=html)
message['oob']['url'] = url
message.send()
# def greet(self, jid, chat_type="chat"):
# messages = [
# "Greetings!",

35
slixfeed/xmpp/upload.py Normal file
View file

@ -0,0 +1,35 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Based on http_upload.py example from project slixmpp
https://codeberg.org/poezio/slixmpp/src/branch/master/examples/http_upload.py
"""
import logging
from slixmpp.exceptions import IqTimeout
# import sys
async def start(self, jid, filename, domain=None):
logging.info('Uploading file %s...', filename)
try:
upload_file = self['xep_0363'].upload_file
# if self.encrypted and not self['xep_0454']:
# print(
# 'The xep_0454 module isn\'t available. '
# 'Ensure you have \'cryptography\' '
# 'from extras_require installed.',
# file=sys.stderr,
# )
# return
# elif self.encrypted:
# upload_file = self['xep_0454'].upload_file
url = await upload_file(
filename, domain, timeout=10,
)
except IqTimeout:
raise TimeoutError('Could not send message in time')
logging.info('Upload success!')
logging.info('Sending file to %s', jid)
return url