Modularize code. Move OPML related functions to a new module opml.py.

This commit is contained in:
Schimon Jehudah, Adv. 2024-06-11 05:21:07 +03:00
parent 0a26ac163b
commit def8b5f120
7 changed files with 112 additions and 154 deletions

View file

@ -39,7 +39,9 @@ from slixfeed.config import Config
import slixfeed.crawl as crawl
import slixfeed.dt as dt
import slixfeed.fetch as fetch
from slixfeed.opml import Opml
import slixfeed.sqlite as sqlite
import slixfeed.task as task
import slixfeed.url as uri
from slixfeed.url import (
complete_url,
@ -48,7 +50,6 @@ from slixfeed.url import (
replace_hostname,
trim_url
)
import slixfeed.task as task
from slixfeed.xmpp.bookmark import XmppBookmark
from slixfeed.xmpp.muc import XmppGroupchat
from slixfeed.xmpp.iq import XmppIQ
@ -89,7 +90,7 @@ def export_feeds(self, jid_bare, ext):
case 'md':
export_to_markdown(jid_bare, filename, results)
case 'opml':
export_to_opml(jid_bare, filename, results)
Opml.export_to_file(jid_bare, filename, results)
# case 'xbel':
# response = 'Not yet implemented.'
return filename
@ -210,10 +211,12 @@ async def xmpp_pubsub_send_selected_entry(self, jid_bare, jid_file, node_id, ent
else:
feed_id = sqlite.get_feed_id_by_entry_index(db_file, entry_id)
feed_id = feed_id[0]
feed_properties = sqlite.get_feed_properties(db_file, feed_id)
node_id = feed_properties[2]
node_title = feed_properties[3]
node_subtitle = feed_properties[5]
node_id, node_title, node_subtitle = sqlite.get_feed_properties(db_file, feed_id)
print('THIS IS A TEST')
print(node_id)
print(node_title)
print(node_subtitle)
print('THIS IS A TEST')
xep = None
iq_create_node = XmppPubsub.create_node(
self, jid_bare, node_id, xep, node_title, node_subtitle)
@ -810,68 +813,6 @@ async def list_unread_entries(self, result, feed_title, jid):
return news_item
def list_search_results(query, results):
    """Render search *results* for *query* as a Markdown-fenced message.

    Each result row contributes its first two columns (title and URL);
    an empty result set yields a "no results" message instead.
    """
    function_name = sys._getframe().f_code.co_name
    logger.debug('{}: query: {}'.format(function_name, query))
    if not results:
        return "No results were found for: {}".format(query)
    message = "Search results for '{}':\n\n```".format(query)
    for entry in results:
        message += "\n{}\n{}\n".format(str(entry[0]), str(entry[1]))
    message += "```\nTotal of {} results".format(len(results))
    return message
async def list_options(self, jid_bare):
    """
    Print options.

    Parameters
    ----------
    jid_bare : str
        Jabber ID.

    Returns
    -------
    msg : str
        Options as message.
    """
    function_name = sys._getframe().f_code.co_name
    logger.debug('{}: jid: {}'.format(function_name, jid_bare))
    # Fetch the four per-JID settings in display order.
    setting_keys = ('archive', 'interval', 'quantum', 'enabled')
    setting_values = [Config.get_setting_value(self.settings, jid_bare, key)
                      for key in setting_keys]
    message = ("Options:"
               "\n"
               "```"
               "\n"
               "Items to archive : {}\n"
               "Update interval : {}\n"
               "Items per update : {}\n"
               "Operation status : {}\n"
               "```").format(*setting_values)
    return message
def pick_a_feed(lang=None):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: lang: {}'
@ -884,20 +825,6 @@ def pick_a_feed(lang=None):
return url
def list_bookmarks(self, conferences):
    """Render the bookmarked groupchats as a Markdown-fenced listing."""
    function_name = sys._getframe().f_code.co_name
    logger.debug('{}'.format(function_name))
    # Assemble parts and join once instead of repeated concatenation.
    parts = ['\nList of groupchats:\n\n```\n']
    for conference in conferences:
        parts.append('Name: {}\n'
                     'Room: {}\n'
                     '\n'.format(conference['name'], conference['jid']))
    parts.append('```\nTotal of {} groupchats.\n'.format(len(conferences)))
    return ''.join(parts)
def export_to_markdown(jid, filename, results):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: jid: {} filename: {}'
@ -912,60 +839,6 @@ def export_to_markdown(jid, filename, results):
.format(dt.current_date(), jid))
# TODO Consider adding element jid as a pointer of import
def export_to_opml(jid, filename, results):
    """Write the subscriptions in *results* to *filename* as OPML 1.0.

    Each result row supplies the outline text (index 1) and feed URL
    (index 2). The OPML head records *jid* as the document title.
    """
    function_name = sys._getframe().f_code.co_name
    logger.debug('{} jid: {} filename: {}'.format(function_name, jid, filename))
    root = ETR.Element("opml")
    root.set("version", "1.0")
    head = ETR.SubElement(root, "head")
    time_stamp = dt.current_time()
    # Insertion order of this dict fixes the element order in the head.
    head_elements = {
        "title": "{}".format(jid),
        "description": "Set of subscriptions exported by Slixfeed",
        "generator": "Slixfeed",
        "urlPublic": "https://gitgud.io/sjehuda/slixfeed",
        "dateCreated": time_stamp,
        "dateModified": time_stamp,
    }
    for tag, text in head_elements.items():
        ETR.SubElement(head, tag).text = text
    body = ETR.SubElement(root, "body")
    for result in results:
        outline = ETR.SubElement(body, "outline")
        outline.set("text", result[1])
        outline.set("xmlUrl", result[2])
    ETR.ElementTree(root).write(filename)
async def import_opml(db_file, result):
    """Import feeds from a fetched OPML document into the database.

    *result* is a fetch result dict with 'error' and 'content' keys.
    Returns the number of feeds added, or None when *result* carries
    an error.
    """
    function_name = sys._getframe().f_code.co_name
    logger.debug('{}: db_file: {}'.format(function_name, db_file))
    if result['error']:
        return None
    root = ETR.fromstring(result['content'])
    before = sqlite.get_number_of_items(db_file, 'feeds_properties')
    # Every <outline> element maps to one feed entry.
    feeds = [{'title': child.get("text"),
              'url': child.get("xmlUrl")}
             for child in root.findall(".//outline")]
    await sqlite.import_feeds(db_file, feeds)
    await sqlite.add_metadata(db_file)
    after = sqlite.get_number_of_items(db_file, 'feeds_properties')
    return int(after) - int(before)
async def add_feed(self, jid_bare, db_file, url, identifier):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} url: {}'

View file

@ -153,7 +153,7 @@ async def probe_page(url, document=None):
result = None
except Exception as e:
logger.error(str(e))
logger.warning("Failed to parse URL as feed for {}.".format(url))
logger.warning("Failed to parse URL as feed for {}.".format(url))
result = {'link' : None,
'index' : None,
'name' : None,

65
slixfeed/opml.py Normal file
View file

@ -0,0 +1,65 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from slixfeed.log import Logger
import slixfeed.dt as dt
import slixfeed.sqlite as sqlite
import sys
import xml.etree.ElementTree as ETR
logger = Logger(__name__)
class Opml:
    """Namespace for OPML export/import helpers.

    Both helpers are invoked on the class itself (e.g.
    ``Opml.export_to_file(...)``), so they are declared as static
    methods; without the decorator an instance-level call would
    mis-bind the instance into the first positional parameter.
    """

    # TODO Consider adding element jid as a pointer of import
    @staticmethod
    def export_to_file(jid, filename, results):
        """Write the subscriptions in *results* to *filename* as OPML 1.0.

        Each result row supplies the outline text (index 1) and feed
        URL (index 2). The OPML head records *jid* as document title.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{} jid: {} filename: {}'
                     .format(function_name, jid, filename))
        root = ETR.Element("opml")
        root.set("version", "1.0")
        head = ETR.SubElement(root, "head")
        ETR.SubElement(head, "title").text = "{}".format(jid)
        ETR.SubElement(head, "description").text = (
            "Set of subscriptions exported by Slixfeed")
        ETR.SubElement(head, "generator").text = "Slixfeed"
        ETR.SubElement(head, "urlPublic").text = (
            "https://gitgud.io/sjehuda/slixfeed")
        # One timestamp shared by both date fields.
        time_stamp = dt.current_time()
        ETR.SubElement(head, "dateCreated").text = time_stamp
        ETR.SubElement(head, "dateModified").text = time_stamp
        body = ETR.SubElement(root, "body")
        for result in results:
            outline = ETR.SubElement(body, "outline")
            outline.set("text", result[1])
            outline.set("xmlUrl", result[2])
        tree = ETR.ElementTree(root)
        tree.write(filename)

    @staticmethod
    async def import_from_file(db_file, result):
        """Import feeds from a fetched OPML document into the database.

        *result* is a fetch result dict with 'error' and 'content'
        keys. Returns the number of feeds added, or None when
        *result* carries an error.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: db_file: {}'
                     .format(function_name, db_file))
        if not result['error']:
            document = result['content']
            root = ETR.fromstring(document)
            before = sqlite.get_number_of_items(db_file, 'feeds_properties')
            feeds = []
            for child in root.findall(".//outline"):
                url = child.get("xmlUrl")
                title = child.get("text")
                feed = {
                    'title' : title,
                    'url' : url,
                    }
                feeds.extend([feed])
            await sqlite.import_feeds(db_file, feeds)
            await sqlite.add_metadata(db_file)
            after = sqlite.get_number_of_items(db_file, 'feeds_properties')
            difference = int(after) - int(before)
            return difference

View file

@ -920,7 +920,7 @@ def get_feed_properties(db_file, feed_id):
cur = conn.cursor()
sql = (
"""
SELECT *
SELECT identifier, title, subtitle
FROM feeds_properties
WHERE id = :feed_id
"""
@ -1583,7 +1583,7 @@ async def mark_entry_as_read(cur, ix):
cur.execute(sql, par)
def get_status_information_of_feed(db_file, feed_id):
def get_last_update_time_of_feed(db_file, feed_id):
"""
Get status information of given feed.
@ -1601,7 +1601,7 @@ def get_status_information_of_feed(db_file, feed_id):
cur = conn.cursor()
sql = (
"""
SELECT *
SELECT renewed, scanned
FROM feeds_state
WHERE feed_id = ?
"""

View file

@ -1,2 +1,2 @@
__version__ = '0.1.74'
__version_info__ = (0, 1, 74)
__version__ = '0.1.75'
__version_info__ = (0, 1, 75)

View file

@ -53,6 +53,7 @@ import slixfeed.crawl as crawl
import slixfeed.dt as dt
import slixfeed.fetch as fetch
from slixfeed.log import Logger
from slixfeed.opml import Opml
import slixfeed.sqlite as sqlite
import slixfeed.url as uri
from slixfeed.version import __version__
@ -344,9 +345,11 @@ class XmppClient(slixmpp.ClientXMPP):
# XmppCommand.adhoc_commands(self)
# self.service_reactions()
task.task_ping(self)
# NOTE This might take more memory due to
# function sqlite.get_unread_entries_of_feed
results = await XmppPubsub.get_pubsub_services(self)
for result in results + [{'jid' : self.boundjid.bare,
'name' : self.alias}]:
'name' : self.alias}]:
jid_bare = result['jid']
if jid_bare not in self.settings:
db_file = config.get_pathname_to_database(jid_bare)
@ -1894,9 +1897,10 @@ class XmppClient(slixmpp.ClientXMPP):
url = result['link']
feed_id = str(result['index'])
entries = sqlite.get_entries_of_feed(db_file, feed_id)
last_renewed = sqlite.get_status_information_of_feed(db_file,
renewed, scanned = sqlite.get_last_update_time_of_feed(db_file,
feed_id)
last_renewed = str(last_renewed[5])
last_updated = renewed or scanned
last_updated = str(last_updated)
options = form.add_field(desc='Recent titles from subscription',
ftype='list-multi',
label='Preview')
@ -1906,7 +1910,7 @@ class XmppClient(slixmpp.ClientXMPP):
label='Information')
form.add_field(ftype='text-single',
label='Renewed',
value=last_renewed)
value=last_updated)
form.add_field(ftype='text-single',
label='ID #',
value=feed_id)
@ -1930,9 +1934,10 @@ class XmppClient(slixmpp.ClientXMPP):
url = result['link']
feed_id = str(result['index'])
entries = sqlite.get_entries_of_feed(db_file, feed_id)
last_updated = sqlite.get_status_information_of_feed(db_file,
renewed, scanned = sqlite.get_last_update_time_of_feed(db_file,
feed_id)
last_updated = str(last_updated[3])
last_updated = renewed or scanned
last_updated = str(last_updated)
options = form.add_field(desc='Recent titles from subscription',
ftype='list-multi',
label='Preview')
@ -2857,7 +2862,7 @@ class XmppClient(slixmpp.ClientXMPP):
jid_bare = jid[0] if isinstance(jid, list) else jid
db_file = config.get_pathname_to_database(jid_bare)
result = await fetch.http(url)
count = await action.import_opml(db_file, result)
count = await Opml.import_from_file(db_file, result)
try:
int(count)
# form = self['xep_0004'].make_form('result', 'Done')

View file

@ -10,6 +10,7 @@ import slixfeed.crawl as crawl
from slixfeed.config import Config
import slixfeed.dt as dt
import slixfeed.fetch as fetch
from slixfeed.opml import Opml
import slixfeed.sqlite as sqlite
import slixfeed.task as task
import slixfeed.url as uri
@ -292,7 +293,13 @@ class XmppCommands:
async def print_bookmarks(self):
conferences = await XmppBookmark.get_bookmarks(self)
message = action.list_bookmarks(self, conferences)
message = '\nList of groupchats:\n\n```\n'
for conference in conferences:
message += ('Name: {}\n'
'Room: {}\n'
'\n'
.format(conference['name'], conference['jid']))
message += ('```\nTotal of {} groupchats.\n'.format(len(conferences)))
return message
@ -339,7 +346,7 @@ class XmppCommands:
async def import_opml(self, db_file, jid_bare, command):
url = command
result = await fetch.http(url)
count = await action.import_opml(db_file, result)
count = await Opml.import_from_file(db_file, result)
if count:
message = ('Successfully imported {} feeds.'
.format(count))
@ -943,11 +950,19 @@ class XmppCommands:
async def search_items(self, db_file, query):
if query:
if len(query) > 1:
if len(query) > 3:
results = sqlite.search_entries(db_file, query)
message = action.list_search_results(query, results)
message = ("Search results for '{}':\n\n```"
.format(query))
for result in results:
message += ("\n{}\n{}\n"
.format(str(result[0]), str(result[1])))
if len(results):
message += "```\nTotal of {} results".format(len(results))
else:
message = "No results were found for: {}".format(query)
else:
message = 'Enter at least 2 characters to search'
message = 'Enter at least 4 characters to search'
else:
message = ('No action has been taken.'
'\n'