From 0ca37dfdee1ca54af37f0ff8462998190c99a7be Mon Sep 17 00:00:00 2001
From: Schimon Jehudah
Date: Wed, 10 Jan 2024 20:06:56 +0000
Subject: [PATCH] Improve SQLite performance. Handle missing package errors.

---
 slixfeed/action.py       |  95 ++++++++++----
 slixfeed/sqlite.py       | 265 +++++++++++++++++++++++++++++----------
 slixfeed/task.py         |   7 +-
 slixfeed/xmpp/process.py |  82 +++++++-----
 4 files changed, 327 insertions(+), 122 deletions(-)

diff --git a/slixfeed/action.py b/slixfeed/action.py
index f15ff68..5615681 100644
--- a/slixfeed/action.py
+++ b/slixfeed/action.py
@@ -5,19 +5,25 @@
 
 TODO
 
-1) Call sqlite function from function statistics.
+1) Function scan at "for entry in entries"
+   Suppress directly calling function "add_entry" (accept db_file)
+   Pass a list of valid entries to a new function "add_entries"
+   (accept db_file) which would call function "add_entry" (accept cur).
+   * accelerate adding of large sets of entries at once.
+   * prevent (or mitigate) halting of subsequent actions.
+   * reduce I/O.
+
+2) Call sqlite function from function statistics.
    Returning a list of values doesn't seem to be a good practice.
 
 """
 
 from asyncio.exceptions import IncompleteReadError
 from bs4 import BeautifulSoup
-import html2text
 from http.client import IncompleteRead
 from feedparser import parse
 import logging
 from lxml import html
-import pdfkit
 from readability import Document
 import slixfeed.config as config
 import slixfeed.crawl as crawl
@@ -40,6 +46,20 @@ from urllib import error
 from urllib.parse import urlsplit
 import xml.etree.ElementTree as ET
 
+try:
+    import html2text
+except ImportError:
+    logging.info(
+        "Package html2text was not found.\n"
+        "Markdown support is disabled.")
+
+try:
+    import pdfkit
+except ImportError:
+    logging.info(
+        "Package pdfkit was not found.\n"
+        "PDF support is disabled.")
+
 
 def log_to_markdown(timestamp, filename, jid, message):
     """
@@ -302,13 +322,14 @@ def export_to_markdown(jid, filename, results):
             current_date(), jid))
 
 
+# TODO Consider adding element jid as a pointer for import
 def export_to_opml(jid, filename, results):
     root = ET.Element("opml")
     root.set("version", "1.0")
     head = ET.SubElement(root, "head")
-    ET.SubElement(head, "title").text = "Subscriptions for {}".format(jid)
+    ET.SubElement(head, "title").text = "{}".format(jid)
     ET.SubElement(head, "description").text = (
-        "Set of feeds exported with Slixfeed")
+        "Set of subscriptions exported by Slixfeed")
     ET.SubElement(head, "generator").text = "Slixfeed"
     ET.SubElement(head, "urlPublic").text = (
         "https://gitgud.io/sjehuda/slixfeed")
@@ -339,8 +360,8 @@ async def import_opml(db_file, url):
                 # feed = (url, title)
                 # feeds.extend([feed])
                 feeds.extend([(url, title)])
-            await sqlite.import_feeds(
-                db_file, feeds)
+            await sqlite.import_feeds(db_file, feeds)
+            await sqlite.add_metadata(db_file)
             after = await sqlite.get_number_of_items(
                 db_file, 'feeds')
             difference = int(after) - int(before)
@@ -581,6 +602,7 @@ async def scan(db_file, url):
             status = result[1]
     except:
         return
+    new_entries = []
    if document and status == 200:
        feed = parse(document)
        entries = feed.entries
@@ -642,10 +664,10 @@ async def scan(db_file, url):
             summary = entry.summary if entry.has_key("summary") else ''
             read_status = 0
             pathname = urlsplit(link).path
-            string = ("{} {} {}"
-                      ).format(
-                          title, summary, pathname
-                          )
+            string = (
+                "{} {} {}"
+                ).format(
+                title, summary, pathname)
             allow_list = await config.is_include_keyword(
                 db_file, "filter-allow", string)
             if not allow_list:
@@ -654,24 +676,42 @@ async def scan(db_file, url):
                 if reject_list:
                     read_status = 1
                    logging.debug(
-                        "Rejected due to keyword {}".format(reject_list))
+                        "Rejected : {}\n"
+                        "Keyword  : {}".format(
+                            link, reject_list))
             if isinstance(date, int):
                 logging.error(
                     "Variable 'date' is int: {}".format(date))
-            await sqlite.add_entry(
-                db_file, title, link, entry_id,
-                url, date, read_status)
-            await sqlite.set_date(db_file, url)
+            entry = {
+                "title": title,
+                "link": link,
+                "entry_id": entry_id,
+                "url": url,
+                "date": date,
+                "read_status": read_status
+                }
+            new_entries.append(entry)
+            # await sqlite.add_entry(
+            #     db_file, title, link, entry_id,
+            #     url, date, read_status)
+            # await sqlite.set_date(db_file, url)
+    if new_entries:
+        await sqlite.add_entries_and_update_timestamp(
+            db_file, new_entries)
+

 async def get_content(url):
     result = await fetch.download_feed(url)
-    if result[0]:
+    data = result[0]
+    code = result[1]
+    if data:
         document = Document(result[0])
         content = document.summary()
+        info = [code, content]
     else:
-        content = None
-    return content
+        info = [code, None]
+    return info
     # TODO Either adapt it to filename
     # or change it to something else
     #filename = document.title()
@@ -691,21 +731,22 @@ def extract_first_image(url, content):
         image_url = None
     return image_url
 
+
 def generate_html(text, filename):
-   with open(filename, 'w') as file:
-       file.write(text)
+    with open(filename, 'w') as file:
+        file.write(text)
 
 
 def generate_pdf(text, filename):
-   pdfkit.from_string(text, filename)
+    pdfkit.from_string(text, filename)
 
 
 def generate_markdown(text, filename):
-   h2m = html2text.HTML2Text()
-   # Convert HTML to Markdown
-   markdown = h2m.handle(text)
-   with open(filename, 'w') as file:
-       file.write(markdown)
+    h2m = html2text.HTML2Text()
+    # Convert HTML to Markdown
+    markdown = h2m.handle(text)
+    with open(filename, 'w') as file:
+        file.write(markdown)
 
 
 # NOTE Why (if res[0]) and (if res[1] == 200)?
diff --git a/slixfeed/sqlite.py b/slixfeed/sqlite.py
index d571baa..628205e 100644
--- a/slixfeed/sqlite.py
+++ b/slixfeed/sqlite.py
@@ -5,14 +5,11 @@
 
 TODO
 
-1) Table feeds:
-   category
-   type (atom, rdf, rss0.9. rss2 etc.)
-
-2) Function mark_all_read for entries of given feed
-
-3) Statistics
+1) Function to open connection (receive db_file).
+   Function to close connection.
+   All other functions to receive cursor.
 
+2) Merge function add_metadata into function import_feeds.
 """
 
 from asyncio import Lock
@@ -89,7 +86,7 @@ def create_tables(db_file):
         """
         CREATE TABLE IF NOT EXISTS properties (
             id INTEGER NOT NULL,
-            feed_id INTEGER NOT NULL,
+            feed_id INTEGER NOT NULL UNIQUE,
             type TEXT,
             encoding TEXT,
             language TEXT,
@@ -105,7 +102,7 @@ def create_tables(db_file):
         """
         CREATE TABLE IF NOT EXISTS status (
             id INTEGER NOT NULL,
-            feed_id INTEGER NOT NULL,
+            feed_id INTEGER NOT NULL UNIQUE,
             enabled INTEGER NOT NULL DEFAULT 1,
             updated TEXT,
             scanned TEXT,
@@ -113,6 +110,7 @@ def create_tables(db_file):
             status_code INTEGER,
             valid INTEGER,
             filter INTEGER NOT NULL DEFAULT 1,
+            priority INTEGER,
             FOREIGN KEY ("feed_id") REFERENCES "feeds" ("id")
               ON UPDATE CASCADE
               ON DELETE CASCADE,
@@ -260,6 +258,88 @@ async def import_feeds(db_file, feeds):
             logging.error(e)
 
 
+async def add_metadata(db_file):
+    """
+    Add status and properties rows for all feeds.
+
+    Parameters
+    ----------
+    db_file : str
+        Path to database file.
+    """
+    async with DBLOCK:
+        with create_connection(db_file) as conn:
+            cur = conn.cursor()
+            sql = (
+                """
+                SELECT id
+                FROM feeds
+                ORDER BY id ASC
+                """
+                )
+            ixs = cur.execute(sql).fetchall()
+            for ix in ixs:
+                feed_id = ix[0]
+                insert_feed_status(cur, feed_id)
+                insert_feed_properties(cur, feed_id)
+
+
+def insert_feed_status(cur, feed_id):
+    """
+    Insert a new row into table status.
+
+    Parameters
+    ----------
+    cur : object
+        Cursor object.
+    feed_id : int
+        Feed index.
+    """
+    sql = (
+        """
+        INSERT
+        INTO status(
+            feed_id)
+        VALUES(
+            ?)
+        """
+        )
+    try:
+        cur.execute(sql, (feed_id,))
+    except IntegrityError as e:
+        logging.warning(
+            "Skipping feed_id {} for table status".format(feed_id))
+        logging.error(e)
+
+
+def insert_feed_properties(cur, feed_id):
+    """
+    Insert a new row into table properties.
+
+    Parameters
+    ----------
+    cur : object
+        Cursor object.
+    feed_id : int
+        Feed index.
+    """
+    sql = (
+        """
+        INSERT
+        INTO properties(
+            feed_id)
+        VALUES(
+            ?)
+        """
+        )
+    try:
+        cur.execute(sql, (feed_id,))
+    except IntegrityError as e:
+        logging.warning(
+            "Skipping feed_id {} for table properties".format(feed_id))
+        logging.error(e)
+
+
 async def insert_feed(
         db_file, url, title=None, entries=None, version=None,
         encoding=None, language=None, status_code=None, updated=None):
@@ -339,6 +415,54 @@ async def insert_feed(
             cur.execute(sql, properties)
 
 
+async def insert_feed_(
+        db_file, url, title=None, entries=None, version=None,
+        encoding=None, language=None, status_code=None, updated=None):
+    """
+    Insert a new feed into the feeds table.
+
+    Parameters
+    ----------
+    db_file : str
+        Path to database file.
+    url : str
+        URL.
+    title : str, optional
+        Feed title. The default is None.
+    entries : int, optional
+        Number of entries. The default is None.
+    version : str, optional
+        Type of feed. The default is None.
+    encoding : str, optional
+        Encoding of feed. The default is None.
+    language : str, optional
+        Language code of feed. The default is None.
+    status : str, optional
+        HTTP status code. The default is None.
+    updated : ???, optional
+        Date feed was last updated. The default is None.
+    """
+    async with DBLOCK:
+        with create_connection(db_file) as conn:
+            cur = conn.cursor()
+            feed = (
+                title, url
+                )
+            sql = (
+                """
+                INSERT
+                INTO feeds(
+                    name, url)
+                VALUES(
+                    ?, ?)
+                """
+                )
+            cur.execute(sql, feed)
+            feed_id = get_feed_id(cur, url)
+            insert_feed_properties(cur, feed_id)
+            insert_feed_status(cur, feed_id)
+
+
 async def remove_feed_by_url(db_file, url):
     """
     Delete a feed by feed URL.
@@ -560,7 +691,7 @@ async def get_unread_entries(db_file, num):
     return results
 
 
-async def get_feed_id(cur, url):
+def get_feed_id(cur, url):
     """
     Get index of given feed.
@@ -896,9 +1027,9 @@ async def set_enabled_status(db_file, ix, status):
             cur = conn.cursor()
             sql = (
                 """
-                UPDATE feeds
+                UPDATE status
                 SET enabled = :status
-                WHERE id = :id
+                WHERE feed_id = :id
                 """
                 )
             cur.execute(sql, {
@@ -943,14 +1074,7 @@ async def add_entry(
     async with DBLOCK:
         with create_connection(db_file) as conn:
             cur = conn.cursor()
-            sql = (
-                """
-                SELECT id
-                FROM feeds
-                WHERE url = :url
-                """
-                )
-            feed_id = cur.execute(sql, (url,)).fetchone()[0]
+            feed_id = get_feed_id(cur, url)
             sql = (
                 """
                 INSERT
@@ -985,6 +1109,58 @@ async def add_entry(
 #                 # breakpoint()
 
 
+async def add_entries_and_update_timestamp(db_file, new_entries):
+    """
+    Add new entries.
+
+    Parameters
+    ----------
+    db_file : str
+        Path to database file.
+    new_entries : list
+        List of entries, each one a dict.
+    """
+    async with DBLOCK:
+        with create_connection(db_file) as conn:
+            cur = conn.cursor()
+            feeds = []
+            for entry in new_entries:
+                url = entry["url"]
+                feed_id = get_feed_id(cur, url)
+                sql = (
+                    """
+                    INSERT
+                    INTO entries(
+                        title, link, entry_id, feed_id, timestamp, read)
+                    VALUES(
+                        :title, :link, :entry_id, :feed_id, :timestamp, :read)
+                    """
+                    )
+                cur.execute(sql, {
+                    "title": entry["title"],
+                    "link": entry["link"],
+                    "entry_id": entry["entry_id"],
+                    "feed_id": feed_id,
+                    "timestamp": entry["date"],
+                    "read": entry["read_status"]
+                    })
+                if url not in feeds:
+                    feeds.append(url)
+            for url in feeds:
+                feed_id = get_feed_id(cur, url)
+                sql = (
+                    """
+                    UPDATE status
+                    SET renewed = :today
+                    WHERE feed_id = :feed_id
+                    """
+                    )
+                cur.execute(sql, {
+                    "today": date.today(),
+                    "feed_id": feed_id
+                    })
+
+
 async def set_date(db_file, url):
     """
     Set renewed date of given feed.
@@ -999,14 +1176,7 @@ async def set_date(db_file, url):
     async with DBLOCK:
         with create_connection(db_file) as conn:
             cur = conn.cursor()
-            sql = (
-                """
-                SELECT id
-                FROM feeds
-                WHERE url = :url
-                """
-                )
-            feed_id = cur.execute(sql, (url,)).fetchone()[0]
+            feed_id = get_feed_id(cur, url)
             sql = (
                 """
                 UPDATE status
@@ -1037,17 +1207,7 @@ async def update_feed_status(db_file, url, status_code):
     async with DBLOCK:
         with create_connection(db_file) as conn:
             cur = conn.cursor()
-            sql = (
-                """
-                SELECT id
-                FROM feeds
-                WHERE url = :url
-                """
-                )
-            # try:
-            feed_id = cur.execute(sql, (url,)).fetchone()[0]
-            # except:
-            #     breakpoint()
+            feed_id = get_feed_id(cur, url)
             sql = (
                 """
                 UPDATE status
@@ -1078,14 +1238,7 @@ async def update_feed_validity(db_file, url, valid):
     async with DBLOCK:
         with create_connection(db_file) as conn:
             cur = conn.cursor()
-            sql = (
-                """
-                SELECT id
-                FROM feeds
-                WHERE url = :url
-                """
-                )
-            feed_id = cur.execute(sql, (url,)).fetchone()[0]
+            feed_id = get_feed_id(cur, url)
             sql = (
                 """
                 UPDATE status
@@ -1117,14 +1270,7 @@ async def update_feed_properties(db_file, url, entries, updated):
     async with DBLOCK:
         with create_connection(db_file) as conn:
             cur = conn.cursor()
-            sql = (
-                """
-                SELECT id
-                FROM feeds
-                WHERE url = :url
-                """
-                )
-            feed_id = cur.execute(sql, (url,)).fetchone()[0]
+            feed_id = get_feed_id(cur, url)
             sql = (
                 """
                 UPDATE properties
@@ -1455,14 +1601,7 @@ async def check_entry_exist(
     cur = get_cursor(db_file)
     exist = False
     if entry_id:
-        sql = (
-            """
-            SELECT id
-            FROM feeds
-            WHERE url = :url
-            """
-            )
-        feed_id = cur.execute(sql, (url,)).fetchone()[0]
+        feed_id = get_feed_id(cur, url)
         sql = (
             """
             SELECT id
diff --git a/slixfeed/task.py b/slixfeed/task.py
index dabfd42..94d073a 100644
--- a/slixfeed/task.py
+++ b/slixfeed/task.py
@@ -242,8 +242,11 @@ async def send_update(self, jid, num=None):
                 # breakpoint()
                 await mark_as_read(db_file, result[0])
                 if not image_url:
-                    content = await action.get_content(url)
-                    image_url = action.extract_first_image(url, content)
+                    info = await action.get_content(url)
+                    content = info[1]
+                    status = info[0]
+                    if status == 200:
+                        image_url = action.extract_first_image(url, content)
         new = " ".join(news_digest)
         # breakpoint()
         if new:
diff --git a/slixfeed/xmpp/process.py b/slixfeed/xmpp/process.py
index e42af54..dcf07ac 100644
--- a/slixfeed/xmpp/process.py
+++ b/slixfeed/xmpp/process.py
@@ -98,7 +98,7 @@ async def message(self, message):
                 count = await action.import_opml(db_file, url)
                 if count:
                     response = (
-                        "Successfully imported {} feeds"
+                        "Successfully imported {} feeds."
                     ).format(count)
                 else:
                     response = (
@@ -109,6 +109,7 @@ async def message(self, message):
             await task.start_tasks_xmpp(
                 self, jid, ["status"])
             send_reply_message(self, message, response)
+            return
 
     if message["type"] == "groupchat":
 
@@ -399,19 +400,19 @@ async def message(self, message):
                     status_type = "dnd"
                     status_message = (
                         "📤️ Processing request to export feeds into {} ..."
-                    ).format(key)
+                    ).format(ex)
                     send_status_message(
                         self, jid, status_type, status_message)
                     data_dir = get_default_data_directory()
                     if not os.path.isdir(data_dir):
                         os.mkdir(data_dir)
-                    if not os.path.isdir(data_dir + '/' + key):
-                        os.mkdir(data_dir + '/' + key)
+                    if not os.path.isdir(data_dir + '/' + ex):
+                        os.mkdir(data_dir + '/' + ex)
                     filename = os.path.join(
-                        data_dir, key, "slixfeed_" + timestamp() + "." + key)
+                        data_dir, ex, "slixfeed_" + timestamp() + "." + ex)
                     db_file = get_pathname_to_database(jid)
                     results = await sqlite.get_feeds(db_file)
-                    match key:
+                    match ex:
                         case "html":
                             response = "Not yet implemented."
                         case "md":
@@ -425,7 +426,7 @@ async def message(self, message):
                     url = await upload.start(self, jid, filename)
                     # response = (
                     #     "Feeds exported successfully to {}.\n{}"
-                    # ).format(key, url)
+                    # ).format(ex, url)
                     # send_oob_reply_message(message, url, response)
                     await send_oob_message(
                         self, jid, url)
@@ -468,23 +469,37 @@ async def message(self, message):
                             response = "No entry Id with {}".format(ix)
                     except:
                         url = ix_url
-                    content = await action.get_content(url)
+                    url = uri.remove_tracking_parameters(url)
+                    url = (uri.replace_hostname(url, "link")) or url
+                    info = await action.get_content(url)
+                    content = info[1]
+                    status = info[0]
                     if content:
-                        match ext:
-                            case "html":
-                                action.generate_html(content, filename)
-                            case "md":
-                                action.generate_markdown(content, filename)
-                            case "pdf":
-                                action.generate_pdf(content, filename)
-                        url = await upload.start(
-                            self, jid, filename)
-                        await send_oob_message(
-                            self, jid, url)
-                        await task.start_tasks_xmpp(
-                            self, jid, ["status"])
+                        try:
+                            match ext:
+                                case "html":
+                                    action.generate_html(content, filename)
+                                case "md":
+                                    action.generate_markdown(content, filename)
+                                case "pdf":
+                                    action.generate_pdf(content, filename)
+                            url = await upload.start(
+                                self, jid, filename)
+                            await send_oob_message(
+                                self, jid, url)
+                            await task.start_tasks_xmpp(
+                                self, jid, ["status"])
+                        except Exception:
+                            logging.warning(
+                                "Check that packages html2text, pdfkit "
+                                "and wkhtmltopdf are installed")
+                            response = (
+                                "Failed to export to {}."
+                            ).format(ext)
                     else:
-                        response = "Failed to fetch resource."
+                        response = (
+                            "Failed to fetch resource. Reason: {}"
+                        ).format(status)
                 else:
                     response = "Missing entry Id."
             else:
@@ -506,7 +521,7 @@ async def message(self, message):
            #     count = await action.import_opml(db_file, url)
            #     if count:
            #         response = (
-           #             "Successfully imported {} feeds"
+           #             "Successfully imported {} feeds."
           #         ).format(count)
           #     else:
           #         response = (
@@ -532,12 +547,19 @@ async def message(self, message):
                 url = uri.feed_to_http(url)
                 url = (uri.replace_hostname(url, "feed")) or url
                 db_file = get_pathname_to_database(jid)
-                response = await action.add_feed(
-                    db_file, url)
-                await task.clean_tasks_xmpp(
-                    jid, ["status"])
-                await task.start_tasks_xmpp(
-                    self, jid, ["status"])
+                try:
+                    response = await action.add_feed(
+                        db_file, url)
+                    await task.clean_tasks_xmpp(
+                        jid, ["status"])
+                    await task.start_tasks_xmpp(
+                        self, jid, ["status"])
+                except Exception:
+                    response = (
+                        "> {}\nNews source is in the process "
+                        "of being added to the subscription "
+                        "list.".format(url)
+                        )
                 send_reply_message(self, message, response)
             case _ if message_lowercase.startswith("feeds"):
                 query = message_text[6:]
@@ -872,7 +894,7 @@ async def message(self, message):
                 try:
                     await sqlite.set_enabled_status(db_file, ix, 1)
                     response = (
-                        "Updates are now disabled for news source {}."
+                        "Updates are now enabled for news source {}."
                     ).format(ix)
                 except:
                     response = "No news source with ID {}.".format(ix)
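
Note on the batching change in slixfeed/sqlite.py: the performance gain
comes from writing a whole scan's worth of entries inside one connection
and one transaction, instead of acquiring the lock and committing once per
entry. A minimal standalone sketch of that pattern follows (plain sqlite3;
the add_entries name and the two-column entries table are illustrative
assumptions, not code from this patch):

    import sqlite3

    def add_entries(db_file, new_entries):
        # One connection; the context manager commits once on exit,
        # so every row lands in a single transaction.
        # Assumes a table: entries(title TEXT, link TEXT).
        with sqlite3.connect(db_file) as conn:
            conn.executemany(
                "INSERT INTO entries(title, link) VALUES(:title, :link)",
                new_entries)

    # Usage:
    # add_entries("slixfeed.db",
    #             [{"title": "t1", "link": "l1"},
    #              {"title": "t2", "link": "l2"}])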