diff --git a/slixfeed.py b/slixfeed.py index f602d21..d4bc0d1 100644 --- a/slixfeed.py +++ b/slixfeed.py @@ -31,6 +31,8 @@ import time # offline = False +lock = asyncio.Lock() + class Slixfeed(slixmpp.ClientXMPP): """ Slixmpp bot that will send updates of feeds it @@ -149,7 +151,8 @@ class Slixfeed(slixmpp.ClientXMPP): async def send_updates(self, event): #while not offline: - while True: + #while True: + async with lock: print(time.strftime("%H:%M:%S")) # print(offline) db_dir = get_default_dbdir() @@ -194,8 +197,9 @@ class Slixfeed(slixmpp.ClientXMPP): # asyncio.ensure_future(send_updates(self)) -async def check_updates(): - while True: +async def check_updates(lock): + #while True: + async with lock: db_dir = get_default_dbdir() if not os.path.isdir(db_dir): msg = ("Slixfeed can not work without a database. \n" @@ -216,7 +220,7 @@ async def check_updates(): ) await asyncio.sleep(60 * 30) -asyncio.ensure_future(check_updates()) +asyncio.ensure_future(check_updates(lock)) # async def tasks(): # # Begin scanning feeds @@ -226,7 +230,7 @@ asyncio.ensure_future(check_updates()) async def tasks(jid, password): # Begin scanning feeds await asyncio.gather( - check_updates(), + check_updates(lock), Slixfeed(jid, password).send_updates() ) @@ -335,8 +339,11 @@ async def initdb(jid, message, callback): create_table(conn, entries_table_sql) else: print("Error! cannot create the database connection.") - if message: - return await callback(conn, message) + if lock: + if message: + return await callback(conn, message, lock) + else: + return await callback(conn, lock) else: return await callback(conn) @@ -371,72 +378,70 @@ def create_table(conn, create_table_sql): # def setup_info(jid): # def start_process(jid): -async def download_updates(conn): - with conn: - # cur = conn.cursor() - # get current date - #today = date.today() - urls = await get_subscriptions(conn) - for url in urls: - #"".join(url) - source = url[0] - res = await download_feed(conn, source) - cur = conn.cursor() - sql = "UPDATE feeds SET status = :status WHERE address = :url" - cur.execute(sql, {"status": res[1], "url": source}) - conn.commit() - sql = "UPDATE feeds SET scanned = :scanned WHERE address = :url" - cur.execute(sql, {"scanned": date.today(), "url": source}) - conn.commit() - if res[0]: - try: - feed = feedparser.parse(res[0]) - if feed.bozo: - bozo = ("WARNING: Bozo detected for feed <{}>. " - "For more information, visit " - "https://pythonhosted.org/feedparser/bozo.html" - .format(source)) - print(bozo) - cur = conn.cursor() - sql = "UPDATE feeds SET valid = 0 WHERE address = ?" +async def download_updates(conn, lock): + async with lock: + with conn: + # cur = conn.cursor() + # get current date + #today = date.today() + urls = await get_subscriptions(conn) + for url in urls: + #"".join(url) + source = url[0] + res = await download_feed(conn, source) + cur = conn.cursor() + sql = "UPDATE feeds SET status = :status, scanned = :scanned WHERE address = :url" + cur.execute(sql, {"status": res[1], "scanned": date.today(), "url": source}) + conn.commit() + if res[0]: + try: + feed = feedparser.parse(res[0]) + if feed.bozo: + bozo = ("WARNING: Bozo detected for feed <{}>. " + "For more information, visit " + "https://pythonhosted.org/feedparser/bozo.html" + .format(source)) + print(bozo) + valid = 0 + else: + valid = 1 + sql = "UPDATE feeds SET valid = :validity WHERE address = :url" + cur.execute(sql, {"validity": valid, "url": source}) + conn.commit() + except (IncompleteReadError, IncompleteRead, error.URLError) as e: + print(e) + return + # TODO Place these couple of lines back down + # NOTE Need to correct the SQL statement to do so + entries = feed.entries + length = len(entries) + await remove_entry(conn, source, length) + for entry in entries: + if entry.has_key("title"): + title = entry.title else: - sql = "UPDATE feeds SET valid = 1 WHERE address = ?" - cur.execute(sql, (source,)) - conn.commit() - except (IncompleteReadError, IncompleteRead, error.URLError) as e: - print(e) - return - # TODO Place these couple of lines back down - # NOTE Need to correct the SQL statement to do so - entries = feed.entries - length = len(entries) - await remove_entry(conn, source, length) - for entry in entries: - if entry.has_key("title"): - title = entry.title - else: - title = feed["feed"]["title"] - link = source if not entry.link else entry.link - exist = await check_entry(conn, title, link) - if not exist: - if entry.has_key("summary"): - summary = entry.summary - # Remove HTML tags - summary = BeautifulSoup(summary, "lxml").text - # TODO Limit text length - summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨" - else: - summary = '*** No summary ***' - #print('~~~~~~summary not in entry') - entry = (title, summary, link, source, 0); - await add_entry(conn, entry) - await set_date(conn, source) - #make_message - # message = title + '\n\n' + summary + '\n\nLink: ' + link - # print(message) - # news.append(message) - # print(len(news)) - # return news + title = feed["feed"]["title"] + link = source if not entry.link else entry.link + exist = await check_entry(conn, title, link) + if not exist: + if entry.has_key("summary"): + summary = entry.summary + # Remove HTML tags + summary = BeautifulSoup(summary, "lxml").text + # TODO Limit text length + summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨" + else: + summary = '*** No summary ***' + #print('~~~~~~summary not in entry') + entry = (title, summary, link, source, 0); + await add_entry(conn, entry) + await set_date(conn, source) + #make_message + # message = title + '\n\n' + summary + '\n\nLink: ' + link + # print(message) + # news.append(message) + # print(len(news)) + # return news async def download_feed(conn, url): async with aiohttp.ClientSession() as session: @@ -465,13 +470,15 @@ async def check_feed(conn, url): cur.execute(sql, (url,)) return cur.fetchone() -async def add_feed(conn, url): +async def add_feed(conn, url, lock): """ Add a new feed into the feeds table :param conn: :param feed: :return: string """ + #TODO consider async with lock + await lock.acquire() #conn = create_connection(db_file) cur = conn.cursor() print(time.strftime("%H:%M:%S"), "conn.cursor() from add_feed(conn, url)") @@ -486,6 +493,7 @@ async def add_feed(conn, url): VALUES(?,?,?,?) """ cur.execute(sql, feed) conn.commit() + lock.release() bozo = ("WARNING: Bozo detected. Failed to load URL.") print(bozo) return "Failed to parse URL as feed" @@ -496,12 +504,14 @@ async def add_feed(conn, url): VALUES(?,?,?,?,?) """ cur.execute(sql, feed) conn.commit() + lock.release() else: feed = (url, 1, res[1], 0) sql = """INSERT INTO feeds(address,enabled,status,valid) VALUES(?,?,?,?) """ cur.execute(sql, feed) conn.commit() + lock.release() return "Failed to get URL. HTTP Error {}".format(res[1]) print(time.strftime("%H:%M:%S"), "conn.commit() from add_feed(conn, url)") # source = title if not '' else url @@ -512,7 +522,7 @@ async def add_feed(conn, url): msg = "News source is already listed in the subscription list" return msg -async def remove_feed(conn, id): +async def remove_feed(conn, id, lock): """ Delete a feed by feed id :param conn: @@ -521,6 +531,7 @@ async def remove_feed(conn, id): """ # You have chose to remove feed (title, url) from your feed list. # Enter "delete" to confirm removal. + await lock.acquire() #conn = create_connection(db_file) cur = conn.cursor() print(time.strftime("%H:%M:%S"), "conn.cursor() from remove_feed(conn, id)") @@ -534,6 +545,7 @@ async def remove_feed(conn, id): sql = "DELETE FROM feeds WHERE id = ?" cur.execute(sql, (id,)) conn.commit() + lock.release() print(time.strftime("%H:%M:%S"), "conn.commit() from remove_feed(conn, id)") return """News source <{}> has been removed from subscriptions list """.format(url) @@ -604,13 +616,14 @@ async def feed_refresh(conn, id): conn.commit() # TODO mark_all_read for entries of feed -async def toggle_status(conn, id): +async def toggle_status(conn, id, lock): """ Set status of feed :param conn: :param id: id of the feed :return: string """ + await lock.acquire() #conn = create_connection(db_file) cur = conn.cursor() print(time.strftime("%H:%M:%S"), "conn.cursor() from toggle_status(conn, id)") @@ -633,6 +646,7 @@ async def toggle_status(conn, id): sql = "UPDATE feeds SET enabled = :status WHERE id = :id" cur.execute(sql, {"status": status, "id": id}) conn.commit() + lock.release() print(time.strftime("%H:%M:%S"), "conn.commit() from toggle_status(conn, id)") return notice @@ -693,17 +707,19 @@ async def get_subscriptions(conn): result = cur.execute(sql) return result -async def list_subscriptions(conn): +async def list_subscriptions(conn, lock): """ Query feeds :param conn: :return: rows (string) """ + await lock.acquire() cur = conn.cursor() print(time.strftime("%H:%M:%S"), "conn.cursor() from list_subscriptions(conn)") #sql = "SELECT id, address FROM feeds" sql = "SELECT name, address, updated, id, enabled FROM feeds" results = cur.execute(sql) + lock.release() feeds_list = "List of subscriptions: \n" counter = 0 for result in results: @@ -722,7 +738,7 @@ async def list_subscriptions(conn): "feed add https://reclaimthenet.org/feed/") return msg -async def last_entries(conn, num): +async def last_entries(conn, num, lock): """ Query feeds :param conn: @@ -734,9 +750,11 @@ async def last_entries(conn, num): num = 50 elif num < 1: num = 1 + await lock.acquire() cur = conn.cursor() sql = "SELECT title, link FROM entries ORDER BY ROWID DESC LIMIT {}".format(num) results = cur.execute(sql) + lock.release() titles_list = "Recent {} titles: \n".format(num) for result in results: # titles_list += """\nTitle: {} \nLink: {} @@ -744,7 +762,7 @@ async def last_entries(conn, num): """.format(str(result[0]), str(result[1])) return titles_list -async def search_entries(conn, query): +async def search_entries(conn, query, lock): """ Query feeds :param conn: @@ -753,10 +771,12 @@ async def search_entries(conn, query): """ if len(query) < 2: return "Please enter at least 2 characters to search" + await lock.acquire() cur = conn.cursor() sql = "SELECT title, link FROM entries WHERE title LIKE '%{}%' LIMIT 50".format(query) # sql = "SELECT title, link FROM entries WHERE title OR link LIKE '%{}%'".format(query) results = cur.execute(sql) + lock.release() results_list = "Search results for '{}': \n".format(query) counter = 0 for result in results: