Improve SQLite performance.

Handle missing packages errors.
This commit is contained in:
Schimon Jehudah 2024-01-10 20:06:56 +00:00
parent 46a0819229
commit 0ca37dfdee
4 changed files with 327 additions and 122 deletions

View file

@ -5,19 +5,25 @@
TODO
1) Call sqlite function from function statistics.
1) Function scan at "for entry in entries"
Suppress directly calling function "add_entry" (accept db_file)
Pass a list of valid entries to a new function "add_entries"
(accept db_file) which would call function "add_entry" (accept cur).
* accelerate adding of large set of entries at once.
* prevent (or mitigate halt of consequent actions).
* reduce I/O.
2) Call sqlite function from function statistics.
Returning a list of values doesn't' seem to be a good practice.
"""
from asyncio.exceptions import IncompleteReadError
from bs4 import BeautifulSoup
import html2text
from http.client import IncompleteRead
from feedparser import parse
import logging
from lxml import html
import pdfkit
from readability import Document
import slixfeed.config as config
import slixfeed.crawl as crawl
@ -40,6 +46,20 @@ from urllib import error
from urllib.parse import urlsplit
import xml.etree.ElementTree as ET
try:
import html2text
except:
logging.info(
"Package html2text was not found.\n"
"Markdown support is disabled.")
try:
import pdfkit
except:
logging.info(
"Package pdfkit was not found.\n"
"PDF support is disabled.")
def log_to_markdown(timestamp, filename, jid, message):
"""
@ -302,13 +322,14 @@ def export_to_markdown(jid, filename, results):
current_date(), jid))
# TODO Consider adding element jid as a pointer of import
def export_to_opml(jid, filename, results):
root = ET.Element("opml")
root.set("version", "1.0")
head = ET.SubElement(root, "head")
ET.SubElement(head, "title").text = "Subscriptions for {}".format(jid)
ET.SubElement(head, "title").text = "{}".format(jid)
ET.SubElement(head, "description").text = (
"Set of feeds exported with Slixfeed")
"Set of subscriptions exported by Slixfeed")
ET.SubElement(head, "generator").text = "Slixfeed"
ET.SubElement(head, "urlPublic").text = (
"https://gitgud.io/sjehuda/slixfeed")
@ -339,8 +360,8 @@ async def import_opml(db_file, url):
# feed = (url, title)
# feeds.extend([feed])
feeds.extend([(url, title)])
await sqlite.import_feeds(
db_file, feeds)
await sqlite.import_feeds(db_file, feeds)
await sqlite.add_metadata(db_file)
after = await sqlite.get_number_of_items(
db_file, 'feeds')
difference = int(after) - int(before)
@ -581,6 +602,7 @@ async def scan(db_file, url):
status = result[1]
except:
return
new_entries = []
if document and status == 200:
feed = parse(document)
entries = feed.entries
@ -642,10 +664,10 @@ async def scan(db_file, url):
summary = entry.summary if entry.has_key("summary") else ''
read_status = 0
pathname = urlsplit(link).path
string = ("{} {} {}"
string = (
"{} {} {}"
).format(
title, summary, pathname
)
title, summary, pathname)
allow_list = await config.is_include_keyword(
db_file, "filter-allow", string)
if not allow_list:
@ -654,24 +676,42 @@ async def scan(db_file, url):
if reject_list:
read_status = 1
logging.debug(
"Rejected due to keyword {}".format(reject_list))
"Rejected : {}\n"
"Keyword : {}".format(
link, reject_list))
if isinstance(date, int):
logging.error(
"Variable 'date' is int: {}".format(date))
await sqlite.add_entry(
db_file, title, link, entry_id,
url, date, read_status)
await sqlite.set_date(db_file, url)
entry = {
"title": title,
"link": link,
"entry_id": entry_id,
"url": url,
"date": date,
"read_status": read_status
}
new_entries.extend([entry])
# await sqlite.add_entry(
# db_file, title, link, entry_id,
# url, date, read_status)
# await sqlite.set_date(db_file, url)
if len(new_entries):
await sqlite.add_entries_and_update_timestamp(
db_file, new_entries)
async def get_content(url):
result = await fetch.download_feed(url)
if result[0]:
data = result[0]
code = result[1]
if data:
document = Document(result[0])
content = document.summary()
info = [code, content]
else:
content = None
return content
info = [code, None]
return info
# TODO Either adapt it to filename
# or change it to something else
#filename = document.title()
@ -691,6 +731,7 @@ def extract_first_image(url, content):
image_url = None
return image_url
def generate_html(text, filename):
with open(filename, 'w') as file:
file.write(text)

View file

@ -5,14 +5,11 @@
TODO
1) Table feeds:
category
type (atom, rdf, rss0.9. rss2 etc.)
2) Function mark_all_read for entries of given feed
3) Statistics
1) Function to open connection (receive db_file).
Function to close connection.
All other functions to receive cursor.
2) Merge function add_metadata into function import_feeds.
"""
from asyncio import Lock
@ -89,7 +86,7 @@ def create_tables(db_file):
"""
CREATE TABLE IF NOT EXISTS properties (
id INTEGER NOT NULL,
feed_id INTEGER NOT NULL,
feed_id INTEGER NOT NULL UNIQUE,
type TEXT,
encoding TEXT,
language TEXT,
@ -105,7 +102,7 @@ def create_tables(db_file):
"""
CREATE TABLE IF NOT EXISTS status (
id INTEGER NOT NULL,
feed_id INTEGER NOT NULL,
feed_id INTEGER NOT NULL UNIQUE,
enabled INTEGER NOT NULL DEFAULT 1,
updated TEXT,
scanned TEXT,
@ -113,6 +110,7 @@ def create_tables(db_file):
status_code INTEGER,
valid INTEGER,
filter INTEGER NOT NULL DEFAULT 1,
priority INTEGER,
FOREIGN KEY ("feed_id") REFERENCES "feeds" ("id")
ON UPDATE CASCADE
ON DELETE CASCADE,
@ -260,6 +258,84 @@ async def import_feeds(db_file, feeds):
logging.error(e)
async def add_metadata(db_file):
"""
Insert a new feed into the feeds table.
Parameters
----------
db_file : str
Path to database file.
"""
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT id
FROM feeds
ORDER BY id ASC
"""
)
ixs = cur.execute(sql).fetchall()
for ix in ixs:
feed_id = ix[0]
insert_feed_status(cur, feed_id)
insert_feed_properties(cur, feed_id)
def insert_feed_status(cur, feed_id):
"""
Set feed status.
Parameters
----------
cur : object
Cursor object.
"""
sql = (
"""
INSERT
INTO status(
feed_id)
VALUES(
?)
"""
)
try:
cur.execute(sql, (feed_id,))
except IntegrityError as e:
logging.warning(
"Skipping feed_id {} for table status".format(feed_id))
logging.error(e)
def insert_feed_properties(cur, feed_id):
"""
Set feed properties.
Parameters
----------
cur : object
Cursor object.
"""
sql = (
"""
INSERT
INTO properties(
feed_id)
VALUES(
?)
"""
)
try:
cur.execute(sql, (feed_id,))
except IntegrityError as e:
logging.warning(
"Skipping feed_id {} for table properties".format(feed_id))
logging.error(e)
async def insert_feed(
db_file, url, title=None, entries=None, version=None,
encoding=None, language=None, status_code=None, updated=None):
@ -339,6 +415,61 @@ async def insert_feed(
cur.execute(sql, properties)
async def insert_feed_(
db_file, url, title=None, entries=None, version=None,
encoding=None, language=None, status_code=None, updated=None):
"""
Insert a new feed into the feeds table.
Parameters
----------
db_file : str
Path to database file.
url : str
URL.
title : str, optional
Feed title. The default is None.
entries : int, optional
Number of entries. The default is None.
version : str, optional
Type of feed. The default is None.
encoding : str, optional
Encoding of feed. The default is None.
language : str, optional
Language code of feed. The default is None.
status : str, optional
HTTP status code. The default is None.
updated : ???, optional
Date feed was last updated. The default is None.
status : str, optional
HTTP status code. The default is None.
updated : ???, optional
Date feed was last updated. The default is None.
"""
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
feed = (
title, url
)
sql = (
"""
INSERT
INTO feeds(
name, url)
VALUES(
?, ?)
"""
)
cur.execute(sql, feed)
feed_id = get_feed_id(cur, url)
insert_feed_properties(
cur, feed_id, entries=None,
version=None, encoding=None, language=None)
insert_feed_status(
cur, feed_id, status_code=None, updated=None)
async def remove_feed_by_url(db_file, url):
"""
Delete a feed by feed URL.
@ -560,7 +691,7 @@ async def get_unread_entries(db_file, num):
return results
async def get_feed_id(cur, url):
def get_feed_id(cur, url):
"""
Get index of given feed.
@ -896,9 +1027,9 @@ async def set_enabled_status(db_file, ix, status):
cur = conn.cursor()
sql = (
"""
UPDATE feeds
UPDATE status
SET enabled = :status
WHERE id = :id
WHERE feed_id = :id
"""
)
cur.execute(sql, {
@ -943,14 +1074,7 @@ async def add_entry(
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT id
FROM feeds
WHERE url = :url
"""
)
feed_id = cur.execute(sql, (url,)).fetchone()[0]
feed_id = get_feed_id(cur, url)
sql = (
"""
INSERT
@ -985,6 +1109,59 @@ async def add_entry(
# # breakpoint()
async def add_entries_and_update_timestamp(db_file, new_entries):
"""
Add new entries.
Parameters
----------
db_file : str
Path to database file.
new_entries : list
Set of entries as dict.
"""
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
feeds = []
for entry in new_entries:
url = entry["url"]
feed_id = get_feed_id(cur, url)
sql = (
"""
INSERT
INTO entries(
title, link, entry_id, feed_id, timestamp, read)
VALUES(
:title, :link, :entry_id, :feed_id, :timestamp, :read)
"""
)
cur.execute(sql, {
"title": entry["title"],
"link": entry["link"],
"entry_id": entry["entry_id"],
"feed_id": feed_id,
"timestamp": entry["date"],
"read": entry["read_status"]
})
if url not in feeds:
feeds.extend([url])
for feed in feeds:
url = feed
feed_id = get_feed_id(cur, url)
sql = (
"""
UPDATE status
SET renewed = :today
WHERE feed_id = :feed_id
"""
)
cur.execute(sql, {
"today": date.today(),
"feed_id": feed_id
})
async def set_date(db_file, url):
"""
Set renewed date of given feed.
@ -999,14 +1176,7 @@ async def set_date(db_file, url):
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT id
FROM feeds
WHERE url = :url
"""
)
feed_id = cur.execute(sql, (url,)).fetchone()[0]
feed_id = get_feed_id(cur, url)
sql = (
"""
UPDATE status
@ -1037,17 +1207,7 @@ async def update_feed_status(db_file, url, status_code):
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT id
FROM feeds
WHERE url = :url
"""
)
# try:
feed_id = cur.execute(sql, (url,)).fetchone()[0]
# except:
# breakpoint()
feed_id = get_feed_id(cur, url)
sql = (
"""
UPDATE status
@ -1078,14 +1238,7 @@ async def update_feed_validity(db_file, url, valid):
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT id
FROM feeds
WHERE url = :url
"""
)
feed_id = cur.execute(sql, (url,)).fetchone()[0]
feed_id = get_feed_id(cur, url)
sql = (
"""
UPDATE status
@ -1117,14 +1270,7 @@ async def update_feed_properties(db_file, url, entries, updated):
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT id
FROM feeds
WHERE url = :url
"""
)
feed_id = cur.execute(sql, (url,)).fetchone()[0]
feed_id = get_feed_id(cur, url)
sql = (
"""
UPDATE properties
@ -1455,14 +1601,7 @@ async def check_entry_exist(
cur = get_cursor(db_file)
exist = False
if entry_id:
sql = (
"""
SELECT id
FROM feeds
WHERE url = :url
"""
)
feed_id = cur.execute(sql, (url,)).fetchone()[0]
feed_id = get_feed_id(cur, url)
sql = (
"""
SELECT id

View file

@ -242,7 +242,10 @@ async def send_update(self, jid, num=None):
# breakpoint()
await mark_as_read(db_file, result[0])
if not image_url:
content = await action.get_content(url)
info = await action.get_content(url)
content = info[1]
status = info[0]
if status == 200:
image_url = action.extract_first_image(url, content)
new = " ".join(news_digest)
# breakpoint()

View file

@ -98,7 +98,7 @@ async def message(self, message):
count = await action.import_opml(db_file, url)
if count:
response = (
"Successfully imported {} feeds"
"Successfully imported {} feeds."
).format(count)
else:
response = (
@ -109,6 +109,7 @@ async def message(self, message):
await task.start_tasks_xmpp(
self, jid, ["status"])
send_reply_message(self, message, response)
return
if message["type"] == "groupchat":
@ -399,19 +400,19 @@ async def message(self, message):
status_type = "dnd"
status_message = (
"📤️ Procesing request to export feeds into {} ..."
).format(key)
).format(ex)
send_status_message(
self, jid, status_type, status_message)
data_dir = get_default_data_directory()
if not os.path.isdir(data_dir):
os.mkdir(data_dir)
if not os.path.isdir(data_dir + '/' + key):
os.mkdir(data_dir + '/' + key)
if not os.path.isdir(data_dir + '/' + ex):
os.mkdir(data_dir + '/' + ex)
filename = os.path.join(
data_dir, key, "slixfeed_" + timestamp() + "." + key)
data_dir, ex, "slixfeed_" + timestamp() + "." + ex)
db_file = get_pathname_to_database(jid)
results = await sqlite.get_feeds(db_file)
match key:
match ex:
case "html":
response = "Not yet implemented."
case "md":
@ -425,7 +426,7 @@ async def message(self, message):
url = await upload.start(self, jid, filename)
# response = (
# "Feeds exported successfully to {}.\n{}"
# ).format(key, url)
# ).format(ex, url)
# send_oob_reply_message(message, url, response)
await send_oob_message(
self, jid, url)
@ -468,8 +469,13 @@ async def message(self, message):
response = "No entry Id with {}".format(ix)
except:
url = ix_url
content = await action.get_content(url)
url = uri.remove_tracking_parameters(url)
url = (uri.replace_hostname(url, "link")) or url
info = await action.get_content(url)
content = info[1]
status = info[0]
if content:
try:
match ext:
case "html":
action.generate_html(content, filename)
@ -483,8 +489,17 @@ async def message(self, message):
self, jid, url)
await task.start_tasks_xmpp(
self, jid, ["status"])
except:
logging.warning(
"Check that packages html2text, pdfkit "
"and wkhtmltopdf are installed")
response = (
"Failed to export to {}"
).format(ext)
else:
response = "Failed to fetch resource."
response = (
"Failed to fetch resource. Reason: {}"
).format(status)
else:
response = "Missing entry Id."
else:
@ -506,7 +521,7 @@ async def message(self, message):
# count = await action.import_opml(db_file, url)
# if count:
# response = (
# "Successfully imported {} feeds"
# "Successfully imported {} feeds."
# ).format(count)
# else:
# response = (
@ -532,12 +547,19 @@ async def message(self, message):
url = uri.feed_to_http(url)
url = (uri.replace_hostname(url, "feed")) or url
db_file = get_pathname_to_database(jid)
try:
response = await action.add_feed(
db_file, url)
await task.clean_tasks_xmpp(
jid, ["status"])
await task.start_tasks_xmpp(
self, jid, ["status"])
except:
response = (
"> {}\nNews source is in the process "
"of being added to the subscription "
"list.".format(url)
)
send_reply_message(self, message, response)
case _ if message_lowercase.startswith("feeds"):
query = message_text[6:]
@ -872,7 +894,7 @@ async def message(self, message):
try:
await sqlite.set_enabled_status(db_file, ix, 1)
response = (
"Updates are now disabled for news source {}."
"Updates are now enabled for news source {}."
).format(ix)
except:
response = "No news source with ID {}.".format(ix)