Improve batch processing of invalid items.

Author: Schimon Jehudah, Adv.
Date:   2024-06-14 12:38:44 +03:00
parent d0b49b5717
commit fb4ca2c852
4 changed files with 157 additions and 121 deletions

View file

@@ -20,8 +20,9 @@ TODO
 """

 from asyncio import Lock
-# from slixfeed.data import join_url
+import slixfeed.dt as dt
 from slixfeed.log import Logger
+from slixfeed.url import join_url
 from sqlite3 import connect, Error, IntegrityError
 import sys
 import time
@@ -1616,8 +1617,8 @@ def get_last_update_time_of_feed(db_file, feed_id):
             """
             )
         par = (feed_id,)
-        count = cur.execute(sql, par).fetchone()
-        return count
+        result = cur.execute(sql, par).fetchone()
+        return result


 def get_unread_entries_of_feed(db_file, feed_id):
@@ -2681,6 +2682,139 @@ def get_contents_by_entry_id(db_file, entry_id):
     return result


+def get_invalid_entries(db_file, url, feed):
+    """
+    List entries that do not exist in a given feed.
+
+    Parameters
+    ----------
+    db_file : str
+        Path to database file.
+    url : str
+        Feed URL.
+    feed : list
+        Parsed feed document.
+
+    Returns
+    -------
+    ixs : dict
+        Indexes of invalid items, mapped to their read status.
+    """
+    function_name = sys._getframe().f_code.co_name
+    logger.debug('{}: db_file: {} url: {}'.format(function_name, db_file, url))
+    feed_id = get_feed_id(db_file, url)
+    feed_id = feed_id[0]
+    items = get_entries_of_feed(db_file, feed_id)
+    entries = feed.entries
+    ixs = {}
+    for item in items:
+        ix, entry_title, entry_link, entry_id, timestamp = item
+        read_status = is_entry_read(db_file, ix)
+        read_status = read_status[0]
+        for entry in entries:
+            title = None
+            link = None
+            time = None
+            # TODO better check and don't repeat code
+            if entry.has_key("id") and entry_id:
+                if entry.id == entry_id:
+                    print(url)
+                    print("compare entry.id == entry_id:", entry.id)
+                    print("compare entry.id == entry_id:", entry_id)
+                    print("============")
+                    # items_valid.append(ix)
+                    break
+            else:
+                # Prepare a title to compare
+                if entry.has_key("title"):
+                    title = entry.title
+                else:
+                    title = feed["feed"]["title"]
+                # Prepare a link to compare
+                if entry.has_key("link"):
+                    link = join_url(url, entry.link)
+                else:
+                    link = url
+                # Compare date, link and title
+                if entry.has_key("published") and timestamp:
+                    time = dt.rfc2822_to_iso8601(entry.published)
+                    print(url)
+                    print("compare published:", title, link, time)
+                    print("compare published:", entry_title, entry_link, timestamp)
+                    print("============")
+                    if (entry_title == title and
+                            entry_link == link and
+                            timestamp == time):
+                        # items_valid.append(ix)
+                        break
+                else:
+                    # Compare link and title
+                    if (entry_title == title and
+                            entry_link == link):
+                        print(url)
+                        print("compare entry_link == link:", title, link)
+                        print("compare entry_title == title:", entry_title, entry_link)
+                        print("============")
+                        # items_valid.append(ix)
+                        break
+        else:
+            # No entry of the parsed feed matched this record, so
+            # mark it as invalid along with its read status.
+            # print('invalid entry:')
+            # print(entry)
+            # TODO better check and don't repeat code
+            ixs[ix] = read_status
+    print(ixs)
+    return ixs
+
+
+async def process_invalid_entries(db_file, ixs):
+    """
+    Batch process invalid items.
+
+    Parameters
+    ----------
+    db_file : str
+        Path to database file.
+    ixs : dict
+        Indexes of invalid items, mapped to their read status.
+
+    Returns
+    -------
+    None.
+    """
+    function_name = sys._getframe().f_code.co_name
+    logger.debug('{}: db_file: {} ixs: {}'
+                 .format(function_name, db_file, ixs))
+    async with DBLOCK:
+        with create_connection(db_file) as conn:
+            cur = conn.cursor()
+            for ix in ixs:
+                logger.debug('{}: ix: {}'.format(function_name, ix))
+                if ixs[ix] == 1:
+                    print('index {} ({}) will be deleted'.format(ix, ixs[ix]))
+                    sql = (
+                        """
+                        DELETE
+                        FROM entries_properties
+                        WHERE id = :ix
+                        """
+                        )
+                else:
+                    print('index {} ({}) will be archived'.format(ix, ixs[ix]))
+                    sql = (
+                        """
+                        UPDATE entries_state
+                        SET archived = 1
+                        WHERE entry_id = :ix
+                        """
+                        )
+                par = (ix,)
+                # cur.execute(sql, par)
+                try:
+                    print('cur')
+                    cur.execute(sql, par)
+                except Exception as e:
+                    logger.error(e)
+
+
 # TODO Move entries that don't exist into table archive.
 # NOTE Entries that are read from archive are deleted.
 # NOTE Unlike entries from table entries, entries from
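
Taken together, the two new functions separate detection from mutation: get_invalid_entries() only reads the database and reports stale records, while process_invalid_entries() deletes read entries and archives unread ones under DBLOCK. A minimal usage sketch, assuming a feedparser document and a hypothetical database path and feed URL (not part of this commit):

    import asyncio
    from feedparser import parse
    import slixfeed.sqlite as sqlite

    async def prune_feed(db_file, url):
        feed = parse(url)  # feedparser can fetch and parse a URL directly
        # Map of database index -> read status for records missing from the feed
        ixs = sqlite.get_invalid_entries(db_file, url, feed)
        # Delete the read records, archive the unread ones
        await sqlite.process_invalid_entries(db_file, ixs)

    asyncio.run(prune_feed('slixfeed.db', 'https://example.org/feed.xml'))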

View file

@@ -35,7 +35,7 @@ import slixfeed.fetch as fetch
 from slixfeed.log import Logger
 import slixfeed.sqlite as sqlite
 from slixfeed.url import join_url, trim_url
-from slixfeed.utilities import Html, MD, SQLiteMaintain
+from slixfeed.utilities import Html, MD
 from slixmpp.xmlstream import ET
 import sys
 from urllib.parse import urlsplit
@@ -855,24 +855,39 @@ class FeedTask:
             status_code = result['status_code']
             feed_id = sqlite.get_feed_id(db_file, url)
             feed_id = feed_id[0]
+            print('feed_id')
+            print(feed_id)
             if not result['error']:
                 await sqlite.update_feed_status(db_file, feed_id, status_code)
                 document = result['content']
                 feed = parse(document)
                 feed_valid = 0 if feed.bozo else 1
+                print('feed_valid')
+                print(feed_valid)
                 await sqlite.update_feed_validity(db_file, feed_id, feed_valid)
                 feed_properties = Feed.get_properties_of_feed(
                     db_file, feed_id, feed)
+                print('feed_properties')
+                print(feed_properties)
                 await sqlite.update_feed_properties(
                     db_file, feed_id, feed_properties)
                 new_entries = Feed.get_properties_of_entries(
                     jid_bare, db_file, url, feed_id, feed)
+                print('new_entries')
+                print(new_entries)
+                print('if new_entries')
                 if new_entries:
+                    print('if new_entries (YES)')
                     print('{}: {} new_entries: {} ({})'.format(jid_bare, len(new_entries), url, feed_id))
                     await sqlite.add_entries_and_update_feed_state(db_file, feed_id, new_entries)
-                    await SQLiteMaintain.remove_nonexistent_entries(self, jid_bare, db_file, url, feed)
-                    # await SQLiteMaintain.remove_nonexistent_entries(self, jid_bare, db_file, url, feed)
+                    limit = Config.get_setting_value(self.settings, jid_bare, 'archive')
+                    ixs = sqlite.get_invalid_entries(db_file, url, feed)
+                    await sqlite.process_invalid_entries(db_file, ixs)
+                    await sqlite.maintain_archive(db_file, limit)
+                    # await sqlite.process_invalid_entries(db_file, ixs)
             print('end : ' + url)
+            limit2 = Config.get_setting_value(self.settings, jid_bare, 'archive')
+            await sqlite.maintain_archive(db_file, limit2)
             # await asyncio.sleep(50)
             val = Config.get_setting_value(self.settings, jid_bare, 'check')
             await asyncio.sleep(60 * float(val))
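
The net effect in FeedTask is that per-feed cleanup no longer goes through slixfeed.utilities: once new entries are stored, the task detects, processes, and archives with three direct sqlite calls. A condensed view of the new sequence, with names as in the diff and the surrounding loop and settings scaffolding assumed:

    limit = Config.get_setting_value(self.settings, jid_bare, 'archive')
    ixs = sqlite.get_invalid_entries(db_file, url, feed)  # detect stale records
    await sqlite.process_invalid_entries(db_file, ixs)    # delete or archive them
    await sqlite.maintain_archive(db_file, limit)         # enforce the archive size limit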

View file

@@ -194,119 +194,6 @@ class MD:
             file.write(entry)


-class SQLiteMaintain:
-
-    # TODO
-    # (1) Check for duplications
-    # (2) append all duplications to a list
-    # (3) Send the list to a function in module sqlite.
-    async def remove_nonexistent_entries(self, jid_bare, db_file, url, feed):
-        """
-        Remove entries that don't exist in a given parsed feed.
-        Check the entries returned from feed and delete read non
-        existing entries, otherwise move to table archive, if unread.
-
-        Parameters
-        ----------
-        db_file : str
-            Path to database file.
-        url : str
-            Feed URL.
-        feed : list
-            Parsed feed document.
-        """
-        function_name = sys._getframe().f_code.co_name
-        logger.debug('{}: db_file: {} url: {}'
-                     .format(function_name, db_file, url))
-        feed_id = sqlite.get_feed_id(db_file, url)
-        feed_id = feed_id[0]
-        items = sqlite.get_entries_of_feed(db_file, feed_id)
-        entries = feed.entries
-        limit = Config.get_setting_value(self.settings, jid_bare, 'archive')
-        print(limit)
-        for item in items:
-            ix, entry_title, entry_link, entry_id, timestamp = item
-            read_status = sqlite.is_entry_read(db_file, ix)
-            read_status = read_status[0]
-            valid = False
-            for entry in entries:
-                title = None
-                link = None
-                time = None
-                # valid = False
-                # TODO better check and don't repeat code
-                if entry.has_key("id") and entry_id:
-                    if entry.id == entry_id:
-                        print("compare entry.id == entry_id:", entry.id)
-                        print("compare entry.id == entry_id:", entry_id)
-                        print("============")
-                        valid = True
-                        break
-                else:
-                    if entry.has_key("title"):
-                        title = entry.title
-                    else:
-                        title = feed["feed"]["title"]
-                    if entry.has_key("link"):
-                        link = join_url(url, entry.link)
-                    else:
-                        link = url
-                    if entry.has_key("published") and timestamp:
-                        print("compare published:", title, link, time)
-                        print("compare published:", entry_title, entry_link, timestamp)
-                        print("============")
-                        time = dt.rfc2822_to_iso8601(entry.published)
-                        if (entry_title == title and
-                                entry_link == link and
-                                timestamp == time):
-                            valid = True
-                            break
-                    else:
-                        if (entry_title == title and
-                                entry_link == link):
-                            print("compare entry_link == link:", title, link)
-                            print("compare entry_title == title:", entry_title, entry_link)
-                            print("============")
-                            valid = True
-                            break
-            # TODO better check and don't repeat code
-            if not valid:
-                # print("id: ", ix)
-                # if title:
-                #     print("title: ", title)
-                #     print("entry_title: ", entry_title)
-                # if link:
-                #     print("link: ", link)
-                #     print("entry_link: ", entry_link)
-                # if entry.id:
-                #     print("last_entry:", entry.id)
-                #     print("entry_id: ", entry_id)
-                # if time:
-                #     print("time: ", time)
-                #     print("timestamp: ", timestamp)
-                # print("read: ", read_status)
-                # breakpoint()
-                # TODO Send to table archive
-                # TODO Also make a regular/routine check for sources that
-                #      have been changed (though that can only happen when
-                #      manually editing)
-                # ix = item[0]
-                # print(">>> SOURCE: ", source)
-                # print(">>> INVALID:", entry_title)
-                # print("title:", entry_title)
-                # print("link :", entry_link)
-                # print("id   :", entry_id)
-                if read_status == 1:
-                    await sqlite.delete_entry_by_id(db_file, ix)
-                    # print(">>> DELETING:", entry_title)
-                else:
-                    # print(">>> ARCHIVING:", entry_title)
-                    await sqlite.archive_entry(db_file, ix)
-        await sqlite.maintain_archive(db_file, limit)


 """
 Consider utilizing a dict as a handler that would match task keyword to functions.
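
The deleted method tracked matches with an explicit valid flag; the replacement in the sqlite module can express the same no-match branch more directly with Python's for/else, as in this sketch (matches() is a hypothetical stand-in for the id, title, link and date comparisons above):

    for entry in entries:
        if matches(item, entry):  # hypothetical comparison helper
            break                 # the record still exists in the feed
    else:
        # Runs only if the loop was never broken: nothing matched
        ixs[ix] = read_status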

View file

@@ -1,2 +1,2 @@
-__version__ = '0.1.79'
-__version_info__ = (0, 1, 79)
+__version__ = '0.1.80'
+__version_info__ = (0, 1, 80)