Update slixfeed.py

This commit is contained in:
Schimon Jehudah 2022-08-14 15:23:46 +00:00
parent 604aa73db1
commit 173aeb09cf

View file

@ -31,14 +31,16 @@ import time
# offline = False # offline = False
lock = asyncio.Lock()
class Slixfeed(slixmpp.ClientXMPP): class Slixfeed(slixmpp.ClientXMPP):
""" """
Slixmpp bot that will send updates of feeds it Slixmpp bot that will send updates of feeds it
receives. receives.
""" """
lock = asyncio.Lock()
print("locked?")
print(lock.locked())
def __init__(self, jid, password): def __init__(self, jid, password):
slixmpp.ClientXMPP.__init__(self, jid, password) slixmpp.ClientXMPP.__init__(self, jid, password)
@ -103,36 +105,42 @@ class Slixfeed(slixmpp.ClientXMPP):
print("COMMAND: feed recent") print("COMMAND: feed recent")
print("ACCOUNT: " + str(msg['from'])) print("ACCOUNT: " + str(msg['from']))
action = await initdb(msg['from'].bare, action = await initdb(msg['from'].bare,
self.lock,
message[12:], message[12:],
last_entries) last_entries)
elif message.lower().startswith('feed search '): elif message.lower().startswith('feed search '):
print("COMMAND: feed search") print("COMMAND: feed search")
print("ACCOUNT: " + str(msg['from'])) print("ACCOUNT: " + str(msg['from']))
action = await initdb(msg['from'].bare, action = await initdb(msg['from'].bare,
self.lock,
message[12:], message[12:],
search_entries) search_entries)
elif message.lower().startswith('feed list'): elif message.lower().startswith('feed list'):
print("COMMAND: feed list") print("COMMAND: feed list")
print("ACCOUNT: " + str(msg['from'])) print("ACCOUNT: " + str(msg['from']))
action = await initdb(msg['from'].bare, action = await initdb(msg['from'].bare,
self.lock,
False, False,
list_subscriptions) list_subscriptions)
elif message.lower().startswith('feed add '): elif message.lower().startswith('feed add '):
print("COMMAND: feed add") print("COMMAND: feed add")
print("ACCOUNT: " + str(msg['from'])) print("ACCOUNT: " + str(msg['from']))
action = await initdb(msg['from'].bare, action = await initdb(msg['from'].bare,
self.lock,
message[9:], message[9:],
add_feed) add_feed)
elif message.lower().startswith('feed remove '): elif message.lower().startswith('feed remove '):
print("COMMAND: feed remove") print("COMMAND: feed remove")
print("ACCOUNT: " + str(msg['from'])) print("ACCOUNT: " + str(msg['from']))
action = await initdb(msg['from'].bare, action = await initdb(msg['from'].bare,
self.lock,
message[12:], message[12:],
remove_feed) remove_feed)
elif message.lower().startswith('feed status '): elif message.lower().startswith('feed status '):
print("COMMAND: feed status") print("COMMAND: feed status")
print("ACCOUNT: " + str(msg['from'])) print("ACCOUNT: " + str(msg['from']))
action = await initdb(msg['from'].bare, action = await initdb(msg['from'].bare,
self.lock,
message[12:], message[12:],
toggle_status) toggle_status)
elif message.lower().startswith('enable'): elif message.lower().startswith('enable'):
@ -152,7 +160,7 @@ class Slixfeed(slixmpp.ClientXMPP):
async def send_updates(self, event): async def send_updates(self, event):
#while not offline: #while not offline:
#while True: #while True:
async with lock: async with self.lock:
print(time.strftime("%H:%M:%S")) print(time.strftime("%H:%M:%S"))
# print(offline) # print(offline)
db_dir = get_default_dbdir() db_dir = get_default_dbdir()
@ -174,7 +182,7 @@ class Slixfeed(slixmpp.ClientXMPP):
# d = self.send_ping(self, jid) # d = self.send_ping(self, jid)
# print('d') # print('d')
# print(d) # print(d)
new = await initdb(jid, False, get_unread) new = await initdb(jid, self.lock, False, get_unread)
if new: if new:
msg = self.make_message(mto=jid, mbody=new, msg = self.make_message(mto=jid, mbody=new,
mtype='chat') mtype='chat')
@ -197,9 +205,9 @@ class Slixfeed(slixmpp.ClientXMPP):
# asyncio.ensure_future(send_updates(self)) # asyncio.ensure_future(send_updates(self))
async def check_updates(lock): async def check_updates():
#while True: #async with lock:
async with lock: while True:
db_dir = get_default_dbdir() db_dir = get_default_dbdir()
if not os.path.isdir(db_dir): if not os.path.isdir(db_dir):
msg = ("Slixfeed can not work without a database. \n" msg = ("Slixfeed can not work without a database. \n"
@ -215,12 +223,13 @@ async def check_updates(lock):
jid = file[:-3] jid = file[:-3]
await initdb( await initdb(
jid, jid,
Slixfeed.lock,
False, False,
download_updates download_updates
) )
await asyncio.sleep(60 * 30) await asyncio.sleep(60 * 30)
asyncio.ensure_future(check_updates(lock)) asyncio.ensure_future(check_updates())
# async def tasks(): # async def tasks():
# # Begin scanning feeds # # Begin scanning feeds
@ -230,7 +239,7 @@ asyncio.ensure_future(check_updates(lock))
async def tasks(jid, password): async def tasks(jid, password):
# Begin scanning feeds # Begin scanning feeds
await asyncio.gather( await asyncio.gather(
check_updates(lock), check_updates(),
Slixfeed(jid, password).send_updates() Slixfeed(jid, password).send_updates()
) )
@ -304,7 +313,8 @@ def get_default_dbdir():
# TODO Perhaps this needs to be executed # TODO Perhaps this needs to be executed
# just once per program execution # just once per program execution
async def initdb(jid, message, callback): async def initdb(jid, lock, message, callback):
print("initdb(jid, lock, message, callback)")
db_dir = get_default_dbdir() db_dir = get_default_dbdir()
if not os.path.isdir(db_dir): if not os.path.isdir(db_dir):
os.mkdir(db_dir) os.mkdir(db_dir)
@ -341,10 +351,16 @@ async def initdb(jid, message, callback):
print("Error! cannot create the database connection.") print("Error! cannot create the database connection.")
if lock: if lock:
if message: if message:
return await callback(conn, message, lock) print("if message")
print(message)
print(lock.locked())
return await callback(conn, lock, message)
else: else:
print("if message else")
print(lock.locked())
return await callback(conn, lock) return await callback(conn, lock)
else: else:
print("lock else")
return await callback(conn) return await callback(conn)
def create_connection(db_file): def create_connection(db_file):
@ -379,69 +395,72 @@ def create_table(conn, create_table_sql):
# def setup_info(jid): # def setup_info(jid):
# def start_process(jid): # def start_process(jid):
async def download_updates(conn, lock): async def download_updates(conn, lock):
async with lock: print("download_updates(conn, lock)")
with conn: with conn:
# cur = conn.cursor() # cur = conn.cursor()
# get current date # get current date
#today = date.today() #today = date.today()
urls = await get_subscriptions(conn) urls = await get_subscriptions(conn)
for url in urls: for url in urls:
#"".join(url) #"".join(url)
source = url[0] source = url[0]
res = await download_feed(conn, source) res = await download_feed(conn, source)
cur = conn.cursor() await lock.acquire()
sql = "UPDATE feeds SET status = :status, scanned = :scanned WHERE address = :url" cur = conn.cursor()
cur.execute(sql, {"status": res[1], "scanned": date.today(), "url": source}) sql = "UPDATE feeds SET status = :status, scanned = :scanned WHERE address = :url"
conn.commit() cur.execute(sql, {"status": res[1], "scanned": date.today(), "url": source})
if res[0]: conn.commit()
try: if res[0]:
feed = feedparser.parse(res[0]) try:
if feed.bozo: feed = feedparser.parse(res[0])
bozo = ("WARNING: Bozo detected for feed <{}>. " if feed.bozo:
"For more information, visit " bozo = ("WARNING: Bozo detected for feed <{}>. "
"https://pythonhosted.org/feedparser/bozo.html" "For more information, visit "
.format(source)) "https://pythonhosted.org/feedparser/bozo.html"
print(bozo) .format(source))
valid = 0 print(bozo)
else: valid = 0
valid = 1
sql = "UPDATE feeds SET valid = :validity WHERE address = :url"
cur.execute(sql, {"validity": valid, "url": source})
conn.commit()
except (IncompleteReadError, IncompleteRead, error.URLError) as e:
print(e)
return
# TODO Place these couple of lines back down
# NOTE Need to correct the SQL statement to do so
entries = feed.entries
length = len(entries)
await remove_entry(conn, source, length)
for entry in entries:
if entry.has_key("title"):
title = entry.title
else: else:
title = feed["feed"]["title"] valid = 1
link = source if not entry.link else entry.link sql = "UPDATE feeds SET valid = :validity WHERE address = :url"
exist = await check_entry(conn, title, link) cur.execute(sql, {"validity": valid, "url": source})
if not exist: conn.commit()
if entry.has_key("summary"): except (IncompleteReadError, IncompleteRead, error.URLError) as e:
summary = entry.summary print(e)
# Remove HTML tags lock.release()
summary = BeautifulSoup(summary, "lxml").text return
# TODO Limit text length # TODO Place these couple of lines back down
summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨" # NOTE Need to correct the SQL statement to do so
else: entries = feed.entries
summary = '*** No summary ***' length = len(entries)
#print('~~~~~~summary not in entry') await remove_entry(conn, source, length)
entry = (title, summary, link, source, 0); for entry in entries:
await add_entry(conn, entry) if entry.has_key("title"):
await set_date(conn, source) title = entry.title
#make_message else:
# message = title + '\n\n' + summary + '\n\nLink: ' + link title = feed["feed"]["title"]
# print(message) link = source if not entry.link else entry.link
# news.append(message) exist = await check_entry(conn, title, link)
# print(len(news)) if not exist:
# return news if entry.has_key("summary"):
summary = entry.summary
# Remove HTML tags
summary = BeautifulSoup(summary, "lxml").text
# TODO Limit text length
summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨"
else:
summary = '*** No summary ***'
#print('~~~~~~summary not in entry')
entry = (title, summary, link, source, 0);
await add_entry(conn, entry)
await set_date(conn, source)
lock.release()
#make_message
# message = title + '\n\n' + summary + '\n\nLink: ' + link
# print(message)
# news.append(message)
# print(len(news))
# return news
async def download_feed(conn, url): async def download_feed(conn, url):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
@ -470,13 +489,14 @@ async def check_feed(conn, url):
cur.execute(sql, (url,)) cur.execute(sql, (url,))
return cur.fetchone() return cur.fetchone()
async def add_feed(conn, url, lock): async def add_feed(conn, lock, url):
""" """
Add a new feed into the feeds table Add a new feed into the feeds table
:param conn: :param conn:
:param feed: :param feed:
:return: string :return: string
""" """
print("add_feed(conn, lock, url)")
#TODO consider async with lock #TODO consider async with lock
await lock.acquire() await lock.acquire()
#conn = create_connection(db_file) #conn = create_connection(db_file)
@ -522,13 +542,14 @@ async def add_feed(conn, url, lock):
msg = "News source is already listed in the subscription list" msg = "News source is already listed in the subscription list"
return msg return msg
async def remove_feed(conn, id, lock): async def remove_feed(conn, lock, id):
""" """
Delete a feed by feed id Delete a feed by feed id
:param conn: :param conn:
:param id: id of the feed :param id: id of the feed
:return: string :return: string
""" """
print("remove_feed(conn, lock, id)")
# You have chose to remove feed (title, url) from your feed list. # You have chose to remove feed (title, url) from your feed list.
# Enter "delete" to confirm removal. # Enter "delete" to confirm removal.
await lock.acquire() await lock.acquire()
@ -616,13 +637,14 @@ async def feed_refresh(conn, id):
conn.commit() conn.commit()
# TODO mark_all_read for entries of feed # TODO mark_all_read for entries of feed
async def toggle_status(conn, id, lock): async def toggle_status(conn, lock, id):
""" """
Set status of feed Set status of feed
:param conn: :param conn:
:param id: id of the feed :param id: id of the feed
:return: string :return: string
""" """
print("toggle_status(conn, lock, id)")
await lock.acquire() await lock.acquire()
#conn = create_connection(db_file) #conn = create_connection(db_file)
cur = conn.cursor() cur = conn.cursor()
@ -713,6 +735,7 @@ async def list_subscriptions(conn, lock):
:param conn: :param conn:
:return: rows (string) :return: rows (string)
""" """
print("list_subscriptions(conn, lock)")
await lock.acquire() await lock.acquire()
cur = conn.cursor() cur = conn.cursor()
print(time.strftime("%H:%M:%S"), "conn.cursor() from list_subscriptions(conn)") print(time.strftime("%H:%M:%S"), "conn.cursor() from list_subscriptions(conn)")
@ -738,13 +761,14 @@ async def list_subscriptions(conn, lock):
"feed add https://reclaimthenet.org/feed/") "feed add https://reclaimthenet.org/feed/")
return msg return msg
async def last_entries(conn, num, lock): async def last_entries(conn, lock, num):
""" """
Query feeds Query feeds
:param conn: :param conn:
:param num: integer :param num: integer
:return: rows (string) :return: rows (string)
""" """
print("last_entries(conn, lock, num)")
num = int(num) num = int(num)
if num > 50: if num > 50:
num = 50 num = 50
@ -762,13 +786,14 @@ async def last_entries(conn, num, lock):
""".format(str(result[0]), str(result[1])) """.format(str(result[0]), str(result[1]))
return titles_list return titles_list
async def search_entries(conn, query, lock): async def search_entries(conn, lock, query):
""" """
Query feeds Query feeds
:param conn: :param conn:
:param query: string :param query: string
:return: rows (string) :return: rows (string)
""" """
print("search_entries(conn, lock, query)")
if len(query) < 2: if len(query) < 2:
return "Please enter at least 2 characters to search" return "Please enter at least 2 characters to search"
await lock.acquire() await lock.acquire()