From 8511239e2cdbe304cac3bdf4a0b3b2cc51c27425 Mon Sep 17 00:00:00 2001 From: Schimon Jehudah Date: Thu, 2 Nov 2023 05:14:01 +0000 Subject: [PATCH] Update 6 files - /slixfeed/database.py - /slixfeed/sqlitehandler.py - /slixfeed/datahandler.py - /slixfeed/confighandler.py - /slixfeed/xmpphandler.py - /slixfeed/__main__.py --- slixfeed/__main__.py | 640 +-------------------- slixfeed/confighandler.py | 156 +++++ slixfeed/datahandler.py | 300 ++++++++++ slixfeed/{database.py => sqlitehandler.py} | 313 ++++++---- slixfeed/xmpphandler.py | 450 +++++++++++++++ 5 files changed, 1128 insertions(+), 731 deletions(-) create mode 100644 slixfeed/confighandler.py create mode 100644 slixfeed/datahandler.py rename slixfeed/{database.py => sqlitehandler.py} (65%) create mode 100644 slixfeed/xmpphandler.py diff --git a/slixfeed/__main__.py b/slixfeed/__main__.py index 2c02d41..0fb8831 100644 --- a/slixfeed/__main__.py +++ b/slixfeed/__main__.py @@ -13,652 +13,26 @@ # jid = Jabber ID (XMPP) # res = response (HTTP) -import os from argparse import ArgumentParser -from asyncio.exceptions import IncompleteReadError -from datetime import date from getpass import getpass -from http.client import IncompleteRead -from urllib import error -import asyncio import logging -import sys -import time -import aiohttp -from bs4 import BeautifulSoup -from xml.etree.ElementTree import ElementTree, ParseError -from urllib.parse import urlparse -from lxml import html -import feedparser -import slixmpp +from datetime import date +import time # from eliot import start_action, to_file # # to_file(open("slixfeed.log", "w")) # # with start_action(action_type="set_date()", jid=jid): # # with start_action(action_type="message()", msg=msg): -import database - - -class Slixfeed(slixmpp.ClientXMPP): - """ - Slixmpp news bot that will send updates - from feeds it receives. - """ - def __init__(self, jid, password): - slixmpp.ClientXMPP.__init__(self, jid, password) - - # The session_start event will be triggered when - # the bot establishes its connection with the server - # and the XML streams are ready for use. We want to - # listen for this event so that we we can initialize - # our roster. - self.add_event_handler("session_start", self.start) - self.add_event_handler("session_start", self.send_update) - self.add_event_handler("session_start", self.send_status) - self.add_event_handler("session_start", self.check_updates) - - # The message event is triggered whenever a message - # stanza is received. Be aware that that includes - # MUC messages and error messages. - self.add_event_handler("message", self.message) - self.add_event_handler("disconnected", self.reconnect) - - async def start(self, event): - """ - Process the session_start event. - - Typical actions for the session_start event are - requesting the roster and broadcasting an initial - presence stanza. - - Arguments: - event -- An empty dictionary. The session_start - event does not provide any additional - data. - """ - self.send_presence() - await self.get_roster() - - async def message(self, msg): - """ - Process incoming message stanzas. Be aware that this also - includes MUC messages and error messages. It is usually - a good idea to check the messages's type before processing - or sending replies. - - Arguments: - msg -- The received message stanza. See the documentation - for stanza objects and the Message stanza to see - how it may be used. 
- """ - if msg['type'] in ('chat', 'normal'): - message = " ".join(msg['body'].split()) - if message.lower().startswith('help'): - action = print_help() - # NOTE: Might not need it - elif message.lower().startswith('recent '): - action = await initdb(msg['from'].bare, database.last_entries, message[7:]) - elif message.lower().startswith('search '): - action = await initdb( msg['from'].bare, database.search_entries, message[7:]) - elif message.lower().startswith('list'): - action = await initdb(msg['from'].bare, database.list_subscriptions) - elif message.lower().startswith('add '): - action = await initdb(msg['from'].bare, add_feed, message[4:]) - elif message.lower().startswith('remove '): - action = await initdb(msg['from'].bare, database.remove_feed, message[7:]) - elif message.lower().startswith('status '): - action = await initdb(msg['from'].bare, database.toggle_status, message[7:]) - elif message.lower().startswith('unread'): - action = await initdb(msg['from'].bare, database.statistics) - elif message.lower().startswith('enable'): - action = toggle_state(msg['from'].bare, True) - elif message.lower().startswith('disable'): - action = toggle_state(msg['from'].bare, False) - else: - action = 'Unknown command. Press "help" for list of commands' - msg.reply(action).send() - - print("COMMAND:", message) - print("ACCOUNT: " + str(msg['from'])) - - async def check_updates(self, event): - while True: - print("Checking update") - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - msg = ("Slixfeed can not work without a database. \n" - "To create a database, follow these steps: \n" - "Add Slixfeed contact to your roster \n" - "Send a feed to the bot by: \n" - "feed add https://reclaimthenet.org/feed/") - print(msg) - else: - files = os.listdir(db_dir) - for file in files: - jid = file[:-3] - print("download_updates",jid) - await initdb(jid, download_updates) - await asyncio.sleep(90) - - async def send_update(self, event): - while True: - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - msg = ("Slixfeed can not work without a database. \n" - "To create a database, follow these steps: \n" - "Add Slixfeed contact to your roster \n" - "Send a feed to the bot by: \n" - "feed add https://reclaimthenet.org/feed/") - print(msg) - else: - os.chdir(db_dir) - files = os.listdir() - for file in files: - if not file.endswith('.db-jour.db'): - jid = file[:-3] - print("get_entry_unread",jid) - - new = await initdb( - jid, - database.get_entry_unread - ) - - if new: - msg = self.send_message( - mto=jid, - mbody=new, - mtype='chat' - ) - - unread = await initdb( - jid, - database.get_number_of_entries_unread - ) - - if unread: - msg_status = ('📰 News items:', str(unread)) - msg_status = ' '.join(msg_status) - else: - msg_status = '🗞 No News' - - print(msg_status, 'for', jid) - - # Send status message - self.send_presence( - pstatus=msg_status, - pto=jid, - #pfrom=None - ) - - # await asyncio.sleep(15) - await asyncio.sleep(60 * 3) - - async def send_status(self, event): - while True: - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - msg = ("Slixfeed can not work without a database. \n" - "To create a database, follow these steps: \n" - "Add Slixfeed contact to your roster \n" - "Send a feed to the bot by: \n" - "feed add https://reclaimthenet.org/feed/") - print(msg) - else: - files = os.listdir(db_dir) - for file in files: - jid = file[:-3] - - await asyncio.sleep(60) - - -def print_help(): - """ - Print help manual. 
- """ - msg = ("Slixfeed - News syndication bot for Jabber/XMPP \n" - "\n" - "DESCRIPTION: \n" - " Slixfeed is a news aggregator bot for online news feeds. \n" - " Supported filetypes: Atom, RDF and RSS. \n" - "\n" - "BASIC USAGE: \n" - " enable \n" - " Send updates. \n" - " disable \n" - " Stop sending updates. \n" - " batch N \n" - " Send N updates on ech interval. \n" - " interval N \n" - " Send an update each N minutes. \n" - " feed list \n" - " List subscriptions. \n" - "\n" - "EDIT OPTIONS: \n" - " add URL \n" - " Add URL to subscription list. \n" - " remove ID \n" - " Remove feed from subscription list. \n" - " status ID \n" - " Toggle update status of feed. \n" - "\n" - "SEARCH OPTIONS: \n" - " search TEXT \n" - " Search news items by given keywords. \n" - " recent N \n" - " List recent N news items (up to 50 items). \n" - "\n" - "STATISTICS OPTIONS: \n" - " analyses \n" - " Show report and statistics of feeds. \n" - " obsolete \n" - " List feeds that are not available. \n" - " unread \n" - " Print number of unread news items. \n" - "\n" - "BACKUP OPTIONS: \n" - " export opml \n" - " Send an OPML file with your feeds. \n" - " backup news html\n" - " Send an HTML formatted file of your news items. \n" - " backup news md \n" - " Send a Markdown file of your news items. \n" - " backup news text \n" - " Send a Plain Text file of your news items. \n" - "\n" - "DOCUMENTATION: \n" - " Slixfeed \n" - " https://gitgud.io/sjehuda/slixfeed \n" - " Slixmpp \n" - " https://slixmpp.readthedocs.io/ \n" - " feedparser \n" - " https://pythonhosted.org/feedparser") - return msg - - -# Function from jarun/buku -# Arun Prakash Jana (jarun) -# Dmitry Marakasov (AMDmi3) -def get_default_dbdir(): - """Determine the directory path where dbfile will be stored. - - If $XDG_DATA_HOME is defined, use it - else if $HOME exists, use it - else if the platform is Windows, use %APPDATA% - else use the current directory. - - :return: Path to database file. - - Note - ---- - This code was taken from the buku project. - """ -# data_home = xdg.BaseDirectory.xdg_data_home - data_home = os.environ.get('XDG_DATA_HOME') - if data_home is None: - if os.environ.get('HOME') is None: - if sys.platform == 'win32': - data_home = os.environ.get('APPDATA') - if data_home is None: - return os.path.abspath('.') - else: - return os.path.abspath('.') - else: - data_home = os.path.join(os.environ.get('HOME'), '.local', 'share') - return os.path.join(data_home, 'slixfeed') - - -# TODO Perhaps this needs to be executed -# just once per program execution -async def initdb(jid, callback, message=None): - """ - Callback function to instantiate action on database. - - :param jid: JID (Jabber ID). - :param callback: Function name. - :param massage: Optional kwarg when a message is a part or required argument. - """ - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - os.mkdir(db_dir) - db_file = os.path.join(db_dir, r"{}.db".format(jid)) - database.create_tables(db_file) - - if message: - return await callback(db_file, message) - else: - return await callback(db_file) - -# NOTE I don't think there should be "return" -# because then we might stop scanning next URLs -async def download_updates(db_file): - """ - Chack feeds for new entries. - - :param db_file: Database filename. 
- """ - urls = await database.get_subscriptions(db_file) - - for url in urls: - # print("for url in urls") - source = url[0] - # print("source: ", source) - res = await download_feed(source) - # TypeError: 'NoneType' object is not subscriptable - if res is None: - # Skip to next feed - # urls.next() - # next(urls) - continue - - await database.update_source_status(db_file, res[1], source) - - if res[0]: - try: - feed = feedparser.parse(res[0]) - if feed.bozo: - bozo = ("WARNING: Bozo detected for feed <{}>. " - "For more information, visit " - "https://pythonhosted.org/feedparser/bozo.html" - .format(source)) - print(bozo) - valid = 0 - else: - valid = 1 - - await database.update_source_validity(db_file, source, valid) - except (IncompleteReadError, IncompleteRead, error.URLError) as e: - print(e) - # return - # TODO Place these couple of lines back down - # NOTE Need to correct the SQL statement to do so - # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW - - if res[1] == 200: - # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW - # TODO Place these couple of lines back down - # NOTE Need to correct the SQL statement to do so - entries = feed.entries - # length = len(entries) - # await database.remove_entry(db_file, source, length) - await database.remove_nonexistent_entries(db_file, feed, source) - - new_entry = 0 - for entry in entries: - - if entry.has_key("title"): - title = entry.title - else: - title = feed["feed"]["title"] - - if entry.has_key("link"): - link = entry.link - else: - link = source - # print('source:', source) - - exist = await database.check_entry_exist(db_file, title, link) - - if not exist: - new_entry = new_entry + 1 - # TODO Enhance summary - if entry.has_key("summary"): - summary = entry.summary - # Remove HTML tags - summary = BeautifulSoup(summary, "lxml").text - # TODO Limit text length - summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨" - else: - summary = '*** No summary ***' - entry = (title, summary, link, source, 0); - await database.add_entry_and_set_date(db_file, source, entry) - # print("### added", new_entry, "entries") - - -async def download_feed(url): - """ - Download content of given URL. - - :param url: URL. - :return: Document or error message. - """ - # print("download_feed") - # time.sleep(1) - timeout = aiohttp.ClientTimeout(total=10) - async with aiohttp.ClientSession() as session: - # async with aiohttp.ClientSession(trust_env=True) as session: - try: - async with session.get(url, timeout=timeout) as response: - status = response.status - if response.status == 200: - try: - doc = await response.text() - # print (response.content_type) - return [doc, status] - except: - return [False, "The content of this document doesn't appear to be textual"] - else: - return [False, "HTTP Error: " + str(status)] - except aiohttp.ClientError as e: - print('Error', str(e)) - return [False, "Error: " + str(e)] - except asyncio.TimeoutError as e: - print('Timeout', str(e)) - return [False, "Timeout"] - - -async def add_feed(db_file, url): - """ - Check whether feed exist, otherwise process it. - - :param db_file: Database filename. - :param url: URL. - :return: Status message. - """ - exist = await database.check_feed_exist(db_file, url) - - if not exist: - res = await download_feed(url) - if res[0]: - feed = feedparser.parse(res[0]) - if feed.bozo: - bozo = ("WARNING: Bozo detected. 
Failed to load <{}>.".format(url)) - print(bozo) - try: - # tree = etree.fromstring(res[0]) # etree is for xml - tree = html.fromstring(res[0]) - except: - return "Failed to parse URL <{}> as feed".format(url) - - print("RSS Auto-Discovery Engaged") - xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]""" - # xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]/@href""" - # xpath_query = "//link[@rel='alternate' and @type='application/atom+xml' or @rel='alternate' and @type='application/rss+xml' or @rel='alternate' and @type='application/rdf+xml']/@href" - feeds = tree.xpath(xpath_query) - if len(feeds) > 1: - msg = "RSS Auto-Discovery has found {} feeds:\n\n".format(len(feeds)) - for feed in feeds: - # # The following code works; - # # The following code will catch - # # only valid resources (i.e. not 404); - # # The following code requires more bandwidth. - # res = await download_feed(feed) - # if res[0]: - # disco = feedparser.parse(res[0]) - # title = disco["feed"]["title"] - # msg += "{} \n {} \n\n".format(title, feed) - feed_name = feed.xpath('@title')[0] - feed_addr = feed.xpath('@href')[0] - msg += "{}\n{}\n\n".format(feed_name, feed_addr) - msg += "The above feeds were extracted from\n{}".format(url) - return msg - elif feeds: - url = feeds[0].xpath('@href')[0] - # Why wouldn't add_feed return a message - # upon success unless return is explicitly - # mentioned, yet upon failure it wouldn't? - return await add_feed(db_file, url) - - # Search for feeds by file extension and path - paths = [ - "/app.php/feed", # phpbb - "/atom", - "/atom.php", - "/atom.xml", - "/content-feeds/", - "/external.php?type=RSS2", - "/feed", # good practice - "/feed.atom", - # "/feed.json", - "/feed.php", - "/feed.rdf", - "/feed.rss", - "/feed.xml", - "/feed/atom/", - "/feeds/news_feed", - "/feeds/rss/news.xml.php", - "/forum_rss.php", - "/index.php/feed", - "/index.php?type=atom;action=.xml", #smf - "/index.php?type=rss;action=.xml", #smf - "/index.rss", - "/latest.rss", - "/news", - "/news.xml", - "/news.xml.php", - "/news/feed", - "/posts.rss", # discourse - "/rdf", - "/rdf.php", - "/rdf.xml", - "/rss", - # "/rss.json", - "/rss.php", - "/rss.xml", - "/timeline.rss", - "/xml/feed.rss", - # "?format=atom", - # "?format=rdf", - # "?format=rss", - # "?format=xml" - ] - - print("RSS Scan Mode Engaged") - feeds = {} - for path in paths: - # xpath_query = "//*[@*[contains(.,'{}')]]".format(path) - xpath_query = "//a[contains(@href,'{}')]".format(path) - addresses = tree.xpath(xpath_query) - parted_url = urlparse(url) - for address in addresses: - address = address.xpath('@href')[0] - if address.startswith('/'): - address = parted_url.scheme + '://' + parted_url.netloc + address - res = await download_feed(address) - if res[1] == 200: - try: - feeds[address] = feedparser.parse(res[0])["feed"]["title"] - except: - continue - if len(feeds) > 1: - msg = "RSS URL scan has found {} feeds:\n\n".format(len(feeds)) - for feed in feeds: - # try: - # res = await download_feed(feed) - # except: - # continue - feed_name = feeds[feed] - feed_addr = feed - msg += "{}\n{}\n\n".format(feed_name, feed_addr) - msg += "The above feeds were extracted from\n{}".format(url) - return msg - elif feeds: - url = list(feeds)[0] - return await add_feed(db_file, url) - - # (HTTP) Request(s) Paths - print("RSS Arbitrary Mode Engaged") - feeds = {} - parted_url = 
urlparse(url) - for path in paths: - address = parted_url.scheme + '://' + parted_url.netloc + path - res = await download_feed(address) - if res[1] == 200: - # print(feedparser.parse(res[0])["feed"]["title"]) - # feeds[address] = feedparser.parse(res[0])["feed"]["title"] - try: - title = feedparser.parse(res[0])["feed"]["title"] - except: - title = '*** No Title ***' - feeds[address] = title - - # Check whether URL has path (i.e. not root) - if parted_url.path.split('/')[1]: - paths.extend([".atom", ".feed", ".rdf", ".rss"]) if '.rss' not in paths else -1 - # if paths.index('.rss'): - # paths.extend([".atom", ".feed", ".rdf", ".rss"]) - address = parted_url.scheme + '://' + parted_url.netloc + '/' + parted_url.path.split('/')[1] + path - res = await download_feed(address) - if res[1] == 200: - print('ATTENTION') - print(address) - try: - title = feedparser.parse(res[0])["feed"]["title"] - except: - title = '*** No Title ***' - feeds[address] = title - if len(feeds) > 1: - msg = "RSS URL discovery has found {} feeds:\n\n".format(len(feeds)) - for feed in feeds: - feed_name = feeds[feed] - feed_addr = feed - msg += "{}\n{}\n\n".format(feed_name, feed_addr) - msg += "The above feeds were extracted from\n{}".format(url) - return msg - elif feeds: - url = list(feeds)[0] - return await add_feed(db_file, url) - else: - return "No news feeds were found for URL <{}>.".format(url) - else: - return await database.add_feed(db_file, feed, url, res) - else: - return "Failed to get URL <{}>. Reason: {}".format(url, res[1]) - else: - ix = exist[0] - return "News source <{}> is already listed in the subscription list at index {}".format(url, ix) - - -def toggle_state(jid, state): - """ - Set status of update. - - :param jid: JID (Jabber ID). - :param state: True or False. - :return: Status message. - """ - db_dir = get_default_dbdir() - db_file = os.path.join(db_dir, r"{}.db".format(jid)) - bk_file = os.path.join(db_dir, r"{}.db.bak".format(jid)) - - if state: - if os.path.exists(db_file): - return "Updates are already enabled" - elif os.path.exists(bk_file): - os.renames(bk_file, db_file) - return "Updates are now enabled" - else: - if os.path.exists(bk_file): - return "Updates are already disabled" - elif os.path.exists(db_file): - os.renames(db_file, bk_file) - return "Updates are now disabled" +#import irchandler +import xmpphandler +#import matrixhandler if __name__ == '__main__': # Setup the command line arguments. - parser = ArgumentParser(description=Slixfeed.__doc__) + parser = ArgumentParser(description=xmpphandler.Slixfeed.__doc__) # Output verbosity options. parser.add_argument( @@ -692,7 +66,7 @@ if __name__ == '__main__': # Setup the Slixfeed and register plugins. Note that while plugins may # have interdependencies, the order in which you register them does # not matter. - xmpp = Slixfeed(args.jid, args.password) + xmpp = xmpphandler.Slixfeed(args.jid, args.password) xmpp.register_plugin('xep_0004') # Data Forms xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0045') # Multi-User Chat diff --git a/slixfeed/confighandler.py b/slixfeed/confighandler.py new file mode 100644 index 0000000..ba2fbaf --- /dev/null +++ b/slixfeed/confighandler.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import sys + +def get_default_dbdir(): + """ + Determine the directory path where dbfile will be stored. 
+
+    If $XDG_DATA_HOME is defined, use it
+    else if $HOME exists, use it
+    else if the platform is Windows, use %APPDATA%
+    else use the current directory.
+
+    :return: Path to database directory.
+
+    Note
+    ----
+    This code was taken from the buku project.
+
+    * Arun Prakash Jana (jarun)
+    * Dmitry Marakasov (AMDmi3)
+    """
+# data_home = xdg.BaseDirectory.xdg_data_home
+    data_home = os.environ.get('XDG_DATA_HOME')
+    if data_home is None:
+        if os.environ.get('HOME') is None:
+            if sys.platform == 'win32':
+                data_home = os.environ.get('APPDATA')
+                if data_home is None:
+                    return os.path.abspath('.')
+            else:
+                return os.path.abspath('.')
+        else:
+            data_home = os.path.join(os.environ.get('HOME'), '.local', 'share')
+    return os.path.join(data_home, 'slixfeed')
+
+
+def get_default_confdir():
+    """
+    Determine the directory path where configuration will be stored.
+
+    If $XDG_CONFIG_HOME is defined, use it
+    else if $HOME exists, use it
+    else if the platform is Windows, use %APPDATA%
+    else use the current directory.
+
+    :return: Path to configuration directory.
+    """
+# config_home = xdg.BaseDirectory.xdg_config_home
+    config_home = os.environ.get('XDG_CONFIG_HOME')
+    if config_home is None:
+        if os.environ.get('HOME') is None:
+            if sys.platform == 'win32':
+                config_home = os.environ.get('APPDATA')
+                if config_home is None:
+                    return os.path.abspath('.')
+            else:
+                return os.path.abspath('.')
+        else:
+            config_home = os.path.join(os.environ.get('HOME'), '.config')
+    return os.path.join(config_home, 'slixfeed')
+
+
+async def get_value_default(key):
+    """
+    Get settings default value.
+
+    :param key: "enabled", "interval", "quantum".
+    :return: Integer.
+    """
+    if key == "enabled":
+        result = 1
+    elif key == "quantum":
+        result = 4
+    elif key == "interval":
+        result = 30
+    return result
+
+
+# async def generate_dictionary():
+def get_default_list():
+    """
+    Return a default list of URL paths that commonly serve feeds.
+
+    :return: List.
+ """ + paths = [ + ".atom", + ".rss", + ".xml", + "/?feed=atom", + "/?feed=rdf", + "/?feed=rss", + "/?feed=xml", # wordpress + "/?format=atom", + "/?format=rdf", + "/?format=rss", + "/?format=xml", # phpbb + "/app.php/feed", + "/atom", + "/atom.php", + "/atom.xml", + "/blog/feed/", + "/content-feeds/", + "/external.php?type=RSS2", + "/en/feed/", + "/feed", # good practice + "/feed.atom", + # "/feed.json", + "/feed.php", + "/feed.rdf", + "/feed.rss", + "/feed.xml", + "/feed/atom/", + "/feeds/news_feed", + "/feeds/posts/default", + "/feeds/posts/default?alt=atom", + "/feeds/posts/default?alt=rss", + "/feeds/rss/news.xml.php", + "/forum_rss.php", + "/index.atom", + "/index.php/feed", + "/index.php?type=atom;action=.xml", #smf + "/index.php?type=rss;action=.xml", #smf + "/index.rss", + "/jekyll/feed.xml", + "/latest.rss", + "/news", + "/news.xml", + "/news.xml.php", + "/news/feed", + "/posts.rss", # discourse + "/rdf", + "/rdf.php", + "/rdf.xml", + "/rss", + # "/rss.json", + "/rss.php", + "/rss.xml", + "/timeline.rss", + "/videos.atom", + # "/videos.json", + "/videos.xml", + "/xml/feed.rss" + ] + return paths + # cfg_dir = get_default_confdir() + # if not os.path.isdir(cfg_dir): + # os.mkdir(cfg_dir) + # cfg_file = os.path.join(cfg_dir, r"url_paths.txt") + # if not os.path.isfile(cfg_file): + # file = open(cfg_file, "w") + # file.writelines("\n".join(paths)) + # file.close() diff --git a/slixfeed/datahandler.py b/slixfeed/datahandler.py new file mode 100644 index 0000000..f4cf039 --- /dev/null +++ b/slixfeed/datahandler.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import feedparser +import aiohttp +import asyncio +import os +import sqlitehandler +import confighandler + +from http.client import IncompleteRead +from asyncio.exceptions import IncompleteReadError +from urllib import error +from bs4 import BeautifulSoup +# from xml.etree.ElementTree import ElementTree, ParseError +from urllib.parse import urlparse +from lxml import html + +async def download_updates(db_file): + """ + Check feeds for new entries. + + :param db_file: Database filename. + """ + urls = await sqlitehandler.get_subscriptions(db_file) + + for url in urls: + # print(os.path.basename(db_file), url[0]) + source = url[0] + res = await download_feed(source) + # TypeError: 'NoneType' object is not subscriptable + if res is None: + # Skip to next feed + # urls.next() + # next(urls) + continue + + await sqlitehandler.update_source_status(db_file, res[1], source) + + if res[0]: + try: + feed = feedparser.parse(res[0]) + if feed.bozo: + # bozo = ("WARNING: Bozo detected for feed <{}>. 
" + # "For more information, visit " + # "https://pythonhosted.org/feedparser/bozo.html" + # .format(source)) + # print(bozo) + valid = 0 + else: + valid = 1 + await sqlitehandler.update_source_validity(db_file, source, valid) + except (IncompleteReadError, IncompleteRead, error.URLError) as e: + print(e) + # NOTE I don't think there should be "return" + # because then we might stop scanning next URLs + # return + # TODO Place these couple of lines back down + # NOTE Need to correct the SQL statement to do so + # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW + + if res[1] == 200: + # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW + # TODO Place these couple of lines back down + # NOTE Need to correct the SQL statement to do so + entries = feed.entries + # length = len(entries) + # await sqlitehandler.remove_entry(db_file, source, length) + await sqlitehandler.remove_nonexistent_entries(db_file, feed, source) + + new_entry = 0 + for entry in entries: + + if entry.has_key("title"): + title = entry.title + else: + title = feed["feed"]["title"] + + if entry.has_key("link"): + link = entry.link + else: + link = source + + exist = await sqlitehandler.check_entry_exist(db_file, title, link) + + if not exist: + new_entry = new_entry + 1 + # TODO Enhance summary + if entry.has_key("summary"): + summary = entry.summary + # Remove HTML tags + summary = BeautifulSoup(summary, "lxml").text + # TODO Limit text length + summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨" + else: + summary = '*** No summary ***' + entry = (title, summary, link, source, 0); + await sqlitehandler.add_entry_and_set_date(db_file, source, entry) + + +async def add_feed(db_file, url): + """ + Check whether feed exist, otherwise process it. + + :param db_file: Database filename. + :param url: URL. + :return: Status message. + """ + exist = await sqlitehandler.check_feed_exist(db_file, url) + + if not exist: + res = await download_feed(url) + if res[0]: + feed = feedparser.parse(res[0]) + title = await get_title(url, feed) + if feed.bozo: + bozo = ("WARNING: Bozo detected. Failed to load <{}>.".format(url)) + print(bozo) + try: + # tree = etree.fromstring(res[0]) # etree is for xml + tree = html.fromstring(res[0]) + except: + return "Failed to parse URL <{}> as feed".format(url) + + print("RSS Auto-Discovery Engaged") + xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]""" + # xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]/@href""" + # xpath_query = "//link[@rel='alternate' and @type='application/atom+xml' or @rel='alternate' and @type='application/rss+xml' or @rel='alternate' and @type='application/rdf+xml']/@href" + feeds = tree.xpath(xpath_query) + if len(feeds) > 1: + msg = "RSS Auto-Discovery has found {} feeds:\n\n".format(len(feeds)) + for feed in feeds: + # # The following code works; + # # The following code will catch + # # only valid resources (i.e. not 404); + # # The following code requires more bandwidth. 
+ # res = await download_feed(feed) + # if res[0]: + # disco = feedparser.parse(res[0]) + # title = disco["feed"]["title"] + # msg += "{} \n {} \n\n".format(title, feed) + feed_name = feed.xpath('@title')[0] + feed_addr = feed.xpath('@href')[0] + msg += "{}\n{}\n\n".format(feed_name, feed_addr) + msg += "The above feeds were extracted from\n{}".format(url) + return msg + elif feeds: + url = feeds[0].xpath('@href')[0] + # Why wouldn't add_feed return a message + # upon success unless return is explicitly + # mentioned, yet upon failure it wouldn't? + return await add_feed(db_file, url) + + print("RSS Scan Mode Engaged") + feeds = {} + paths = [] + # TODO Test + cfg_dir = confighandler.get_default_confdir() + if not os.path.isdir(cfg_dir): + os.mkdir(cfg_dir) + cfg_file = os.path.join(cfg_dir, r"url_paths.txt") + if not os.path.isfile(cfg_file): + # confighandler.generate_dictionary() + list = confighandler.get_default_list() + file = open(cfg_file, "w") + file.writelines("\n".join(list)) + file.close() + file = open(cfg_file, "r") + lines = file.readlines() + for line in lines: + paths.extend([line.strip()]) + for path in paths: + # xpath_query = "//*[@*[contains(.,'{}')]]".format(path) + xpath_query = "//a[contains(@href,'{}')]".format(path) + addresses = tree.xpath(xpath_query) + parted_url = urlparse(url) + # NOTE Should number of addresses be limited or + # perhaps be N from the start and N from the end + for address in addresses: + address = address.xpath('@href')[0] + if address.startswith('/'): + address = parted_url.scheme + '://' + parted_url.netloc + address + res = await download_feed(address) + if res[1] == 200: + try: + feeds[address] = feedparser.parse(res[0])["feed"]["title"] + except: + continue + if len(feeds) > 1: + msg = "RSS URL scan has found {} feeds:\n\n".format(len(feeds)) + for feed in feeds: + # try: + # res = await download_feed(feed) + # except: + # continue + feed_name = feeds[feed] + feed_addr = feed + msg += "{}\n{}\n\n".format(feed_name, feed_addr) + msg += "The above feeds were extracted from\n{}".format(url) + return msg + elif feeds: + url = list(feeds)[0] + return await add_feed(db_file, url) + + # (HTTP) Request(s) Paths + print("RSS Arbitrary Mode Engaged") + feeds = {} + parted_url = urlparse(url) + for path in paths: + address = parted_url.scheme + '://' + parted_url.netloc + path + res = await download_feed(address) + if res[1] == 200: + # print(feedparser.parse(res[0])["feed"]["title"]) + # feeds[address] = feedparser.parse(res[0])["feed"]["title"] + try: + title = feedparser.parse(res[0])["feed"]["title"] + except: + title = '*** No Title ***' + feeds[address] = title + + # Check whether URL has path (i.e. 
not root) + if parted_url.path.split('/')[1]: + paths.extend([".atom", ".feed", ".rdf", ".rss"]) if '.rss' not in paths else -1 + # if paths.index('.rss'): + # paths.extend([".atom", ".feed", ".rdf", ".rss"]) + address = parted_url.scheme + '://' + parted_url.netloc + '/' + parted_url.path.split('/')[1] + path + res = await download_feed(address) + if res[1] == 200: + try: + title = feedparser.parse(res[0])["feed"]["title"] + except: + title = '*** No Title ***' + feeds[address] = title + if len(feeds) > 1: + msg = "RSS URL discovery has found {} feeds:\n\n".format(len(feeds)) + for feed in feeds: + feed_name = feeds[feed] + feed_addr = feed + msg += "{}\n{}\n\n".format(feed_name, feed_addr) + msg += "The above feeds were extracted from\n{}".format(url) + elif feeds: + url = list(feeds)[0] + msg = await add_feed(db_file, url) + else: + msg = "No news feeds were found for URL <{}>.".format(url) + else: + msg = await sqlitehandler.add_feed(db_file, title, url, res) + else: + msg = "Failed to get URL <{}>. Reason: {}".format(url, res[1]) + else: + ix = exist[0] + name = exist[1] + msg = "> {}\nNews source \"{}\" is already listed in the subscription list at index {}".format(url, name, ix) + return msg + + +async def download_feed(url): + """ + Download content of given URL. + + :param url: URL. + :return: Document or error message. + """ + timeout = aiohttp.ClientTimeout(total=10) + async with aiohttp.ClientSession() as session: + # async with aiohttp.ClientSession(trust_env=True) as session: + try: + async with session.get(url, timeout=timeout) as response: + status = response.status + if response.status == 200: + try: + doc = await response.text() + # print (response.content_type) + return [doc, status] + except: + # return [False, "The content of this document doesn't appear to be textual."] + return [False, "Document is too large or is not textual."] + else: + return [False, "HTTP Error: " + str(status)] + except aiohttp.ClientError as e: + print('Error', str(e)) + return [False, "Error: " + str(e)] + except asyncio.TimeoutError as e: + # print('Timeout:', str(e)) + return [False, "Timeout: " + str(e)] + + +async def get_title(url, feed): + """ + Get title of feed. + + :param url: URL + :param feed: Parsed feed + :return: Title or URL hostname. + """ + try: + title = feed["feed"]["title"] + except: + title = urlparse(url).netloc + return title \ No newline at end of file diff --git a/slixfeed/database.py b/slixfeed/sqlitehandler.py similarity index 65% rename from slixfeed/database.py rename to slixfeed/sqlitehandler.py index 388afdd..311f72e 100644 --- a/slixfeed/database.py +++ b/slixfeed/sqlitehandler.py @@ -2,12 +2,13 @@ # -*- coding: utf-8 -*- import sqlite3 -from sqlite3 import Error - import asyncio +from sqlite3 import Error from datetime import date +import confighandler + # from eliot import start_action, to_file # # with start_action(action_type="list_subscriptions()", db=db_file): # # with start_action(action_type="last_entries()", num=num): @@ -41,7 +42,7 @@ def create_connection(db_file): def create_tables(db_file): """ Create SQLite tables. - + :param db_file: Database filename. 
""" with create_connection(db_file) as conn: @@ -71,18 +72,24 @@ def create_tables(db_file): # title text NOT NULL, # number integer # ); """ - - c = conn.cursor() - # c = get_cursor(db_file) - c.execute(feeds_table_sql) - c.execute(entries_table_sql) - # c.execute(statistics_table_sql) + settings_table_sql = """ + CREATE TABLE IF NOT EXISTS settings ( + id integer PRIMARY KEY, + key text NOT NULL, + value integer + ); """ + cur = conn.cursor() + # cur = get_cursor(db_file) + cur.execute(feeds_table_sql) + cur.execute(entries_table_sql) + # cur.execute(statistics_table_sql) + cur.execute(settings_table_sql) def get_cursor(db_file): """ Allocate a cursor to connection per database. - + :param db_file: Database filename. :return: Cursor. """ @@ -95,12 +102,12 @@ def get_cursor(db_file): return CURSORS[db_file] -async def add_feed(db_file, feed, url, res): +async def add_feed(db_file, title, url, res): """ Add a new feed into the feeds table. - + :param db_file: Database filename. - :param feed: Parsed XML document. + :param title: Feed title. :param url: URL. :param res: XML document. :return: Message. @@ -120,21 +127,22 @@ async def add_feed(db_file, feed, url, res): async with DBLOCK: with create_connection(db_file) as conn: cur = conn.cursor() - title = feed["feed"]["title"] + # title = feed["feed"]["title"] feed = (title, url, 1, res[1], 1) - sql = """INSERT INTO feeds(name,address,enabled,status,valid) - VALUES(?,?,?,?,?) """ + sql = """INSERT INTO feeds(name, address, enabled, status, valid) + VALUES(?, ?, ?, ?, ?) """ cur.execute(sql, feed) source = title if title else '<' + url + '>' - msg = 'News source "{}" has been added to subscription list'.format(source) + msg = """> {}\nNews source \"{}\" has been added to subscription list. + """.format(url, source) return msg async def remove_feed(db_file, ix): """ Delete a feed by feed id. - + :param db_file: Database filename. :param ix: Index of feed. :return: Message. @@ -144,39 +152,42 @@ async def remove_feed(db_file, ix): cur = conn.cursor() try: sql = "SELECT address FROM feeds WHERE id = ?" - url = cur.execute(sql, (ix,)) - for i in url: - url = i[0] + # cur + # for i in url: + # url = i[0] + url = cur.execute(sql, (ix,)).fetchone()[0] + sql = "SELECT name FROM feeds WHERE id = ?" + name = cur.execute(sql, (ix,)).fetchone()[0] # NOTE Should we move DBLOCK to this line? 2022-12-23 sql = "DELETE FROM entries WHERE source = ?" cur.execute(sql, (url,)) sql = "DELETE FROM feeds WHERE id = ?" cur.execute(sql, (ix,)) - return """News source <{}> has been removed from subscription list - """.format(url) + msg = "> {}\nNews source \"{}\" has been removed from subscription list.".format(url, name) except: - return """No news source with ID {}""".format(ix) + msg = "No news source with ID {}.".format(ix) + return msg async def check_feed_exist(db_file, url): """ Check whether a feed exists. Query for feeds by given url. - + :param db_file: Database filename. :param url: URL. - :return: SQL row or None. + :return: Index ID and Name or None. """ cur = get_cursor(db_file) - sql = "SELECT id FROM feeds WHERE address = ?" - cur.execute(sql, (url,)) - return cur.fetchone() + sql = "SELECT id, name FROM feeds WHERE address = ?" + result = cur.execute(sql, (url,)).fetchone() + return result async def get_number_of_items(db_file, str): """ Return number of entries or feeds. - + :param cur: Cursor object. :param str: "entries" or "feeds". :return: Number of rows. 
@@ -184,8 +195,22 @@ async def get_number_of_items(db_file, str): with create_connection(db_file) as conn: cur = conn.cursor() sql = "SELECT count(id) FROM {}".format(str) - count = cur.execute(sql) - count = cur.fetchone()[0] + count = cur.execute(sql).fetchone()[0] + return count + + +async def get_number_of_feeds_active(db_file): + """ + Return number of active feeds. + + :param db_file: Database filename. + :param cur: Cursor object. + :return: Number of rows. + """ + with create_connection(db_file) as conn: + cur = conn.cursor() + sql = "SELECT count(id) FROM feeds WHERE enabled = 1" + count = cur.execute(sql).fetchone()[0] return count @@ -200,8 +225,7 @@ async def get_number_of_entries_unread(db_file): with create_connection(db_file) as conn: cur = conn.cursor() sql = "SELECT count(id) FROM entries WHERE read = 0" - count = cur.execute(sql) - count = cur.fetchone()[0] + count = cur.execute(sql).fetchone()[0] return count @@ -214,26 +238,18 @@ async def get_entry_unread(db_file): """ with create_connection(db_file) as conn: cur = conn.cursor() - entry = [] sql = "SELECT id FROM entries WHERE read = 0" ix = cur.execute(sql).fetchone() if ix is None: return False ix = ix[0] sql = "SELECT title FROM entries WHERE id = :id" - cur.execute(sql, (ix,)) - title = cur.fetchone()[0] - entry.append(title) + title = cur.execute(sql, (ix,)).fetchone()[0] sql = "SELECT summary FROM entries WHERE id = :id" - cur.execute(sql, (ix,)) - summary = cur.fetchone()[0] - entry.append(summary) + summary = cur.execute(sql, (ix,)).fetchone()[0] sql = "SELECT link FROM entries WHERE id = :id" - cur.execute(sql, (ix,)) - link = cur.fetchone()[0] - entry.append(link) - entry = "{}\n\n{}\n\n{}".format(entry[0], entry[1], entry[2]) - # print(entry) + link = cur.execute(sql, (ix,)).fetchone()[0] + entry = "{}\n\n{}\n\n{}".format(title, summary, link) async with DBLOCK: await mark_as_read(cur, ix) # async with DBLOCK: @@ -255,16 +271,28 @@ async def mark_as_read(cur, ix): async def statistics(db_file): """ Return table statistics. - + :param db_file: Database filename. :return: News item as message. """ feeds = await get_number_of_items(db_file, 'feeds') + active_feeds = await get_number_of_feeds_active(db_file) entries = await get_number_of_items(db_file, 'entries') unread_entries = await get_number_of_entries_unread(db_file) - return "You have {} unread news items out of {} from {} news sources.".format(unread_entries, entries, feeds) + # msg = """You have {} unread news items out of {} from {} news sources. + # """.format(unread_entries, entries, feeds) + with create_connection(db_file) as conn: + cur = conn.cursor() + sql = "SELECT value FROM settings WHERE key = \"enabled\"" + status = cur.execute(sql).fetchone()[0] + sql = "SELECT value FROM settings WHERE key = \"interval\"" + interval = cur.execute(sql).fetchone()[0] + msg = """News items: {} ({})\nNews sources: {} ({})\nUpdate interval: {}\nOperation status: {} + """.format(unread_entries, entries, active_feeds, feeds, interval, status) + return msg +#TODO statistics async def update_statistics(cur): """ Update table statistics. 
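A recurring refactor throughout this file collapses the two-step cur.execute(sql); cur.fetchone()[0] into one chained expression. The chaining is valid because sqlite3's Cursor.execute() returns the cursor itself; the caveat is that fetchone() returns None when nothing matches, so the [0] subscript is only safe for queries guaranteed to yield a row. A short sketch, not project code:

    import sqlite3

    cur = sqlite3.connect(":memory:").cursor()
    cur.execute("CREATE TABLE feeds (id INTEGER PRIMARY KEY, enabled INTEGER)")

    # Aggregates always return exactly one row, so chaining is safe:
    count = cur.execute("SELECT count(id) FROM feeds").fetchone()[0]

    # Point lookups may match nothing; guard before subscripting:
    row = cur.execute("SELECT enabled FROM feeds WHERE id = ?", (1,)).fetchone()
    enabled = row[0] if row else None
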
@@ -283,8 +311,7 @@ async def update_statistics(cur): cur.execute(sql, {"title": i, "num": stat_dict[i]}) else: sql = "SELECT count(id) FROM statistics" - count = cur.execute(sql) - count = cur.fetchone()[0] + count = cur.execute(sql).fetchone()[0] ix = count + 1 sql = "INSERT INTO statistics VALUES(?,?,?)" cur.execute(sql, (ix, i, stat_dict[i])) @@ -302,32 +329,34 @@ async def toggle_status(db_file, ix): async with DBLOCK: with create_connection(db_file) as conn: cur = conn.cursor() - #cur = get_cursor(db_file) - sql = "SELECT name FROM feeds WHERE id = :id" - cur.execute(sql, (ix,)) - title = cur.fetchone()[0] - sql = "SELECT enabled FROM feeds WHERE id = ?" - # NOTE [0][1][2] - cur.execute(sql, (ix,)) - status = cur.fetchone()[0] - # FIXME always set to 1 - # NOTE Maybe because is not integer - # TODO Reset feed table before further testing - if status == 1: - status = 0 - state = "disabled" - else: - status = 1 - state = "enabled" - sql = "UPDATE feeds SET enabled = :status WHERE id = :id" - cur.execute(sql, {"status": status, "id": ix}) - return "Updates for '{}' are now {}".format(title, state) + try: + #cur = get_cursor(db_file) + sql = "SELECT name FROM feeds WHERE id = :id" + title = cur.execute(sql, (ix,)).fetchone()[0] + sql = "SELECT enabled FROM feeds WHERE id = ?" + # NOTE [0][1][2] + status = cur.execute(sql, (ix,)).fetchone()[0] + # FIXME always set to 1 + # NOTE Maybe because is not integer + # TODO Reset feed table before further testing + if status == 1: + status = 0 + state = "disabled" + else: + status = 1 + state = "enabled" + sql = "UPDATE feeds SET enabled = :status WHERE id = :id" + cur.execute(sql, {"status": status, "id": ix}) + msg = "Updates for '{}' are now {}.".format(title, state) + except: + msg = "No news source with ID {}.".format(ix) + return msg async def set_date(cur, url): """ Set last update date of feed. - + :param cur: Cursor object. :param url: URL. """ @@ -373,12 +402,12 @@ async def update_source_validity(db_file, source, valid): async def add_entry(cur, entry): """ Add a new entry into the entries table. - + :param cur: Cursor object. :param entry: """ - sql = """ INSERT INTO entries(title,summary,link,source,read) - VALUES(?,?,?,?,?) """ + sql = """ INSERT INTO entries(title, summary, link, source, read) + VALUES(?, ?, ?, ?, ?) """ cur.execute(sql, entry) @@ -403,8 +432,7 @@ async def remove_entry(db_file, source, length): with create_connection(db_file) as conn: cur = conn.cursor() sql = "SELECT count(id) FROM entries WHERE source = ?" - count = cur.execute(sql, (source,)) - count = cur.fetchone()[0] + count = cur.execute(sql, (source,)).fetchone()[0] limit = count - length if limit: limit = limit; @@ -421,7 +449,7 @@ async def remove_nonexistent_entries(db_file, feed, source): Remove entries that don't exist in a given parsed feed. Check the entries returned from feed and delete non existing entries - + :param db_file: Database filename. :param feed: URL of parsed feed. :param source: URL of associated feed. @@ -430,18 +458,10 @@ async def remove_nonexistent_entries(db_file, feed, source): with create_connection(db_file) as conn: cur = conn.cursor() sql = "SELECT id, title, link FROM entries WHERE source = ?" 
- cur.execute(sql, (source,)) - entries_db = cur.fetchall() - # print('entries_db') - # print(entries_db) + entries_db = cur.execute(sql, (source,)).fetchall() for entry_db in entries_db: - # entry_db[1] = id - # entry_db[2] = title - # entry_db[3] = link exist = False - # print("check-db") for entry_feed in feed.entries: - # print("check-feed") # TODO better check and don't repeat code if entry_feed.has_key("title"): title = entry_feed.title @@ -454,18 +474,13 @@ async def remove_nonexistent_entries(db_file, feed, source): link = source # TODO better check and don't repeat code if entry_db[1] == title and entry_db[2] == link: - # print('exist') - # print(title) exist = True break if not exist: - # print('>>> not exist') - # print(entry_db[1]) # TODO Send to table archive # TODO Also make a regular/routine check for sources that have been changed (though that can only happen when manually editing) sql = "DELETE FROM entries WHERE id = ?" cur.execute(sql, (entry_db[0],)) - # breakpoint() async def get_subscriptions(db_file): @@ -478,14 +493,14 @@ async def get_subscriptions(db_file): with create_connection(db_file) as conn: cur = conn.cursor() sql = "SELECT address FROM feeds WHERE enabled = 1" - cur.execute(sql) - return cur.fetchall() + result = cur.execute(sql).fetchall() + return result async def list_subscriptions(db_file): """ Query table feeds and list items. - + :param db_file: Database filename. :return: List of feeds. """ @@ -566,13 +581,115 @@ async def check_entry_exist(db_file, title, link): """ Check whether an entry exists. Query entries by title and link. - + :param db_file: Database filename. :param link: Entry URL. :param title: Entry title. - :return: SQL row or None. + :return: Index ID or None. """ cur = get_cursor(db_file) sql = "SELECT id FROM entries WHERE title = :title and link = :link" - cur.execute(sql, {"title": title, "link": link}) - return cur.fetchone() + result = cur.execute(sql, {"title": title, "link": link}).fetchone() + return result + +# TODO dictionary +# settings = { +# "enabled" : { +# "message": "Updates are {}".format(status), +# "value": val +# }, +# "interval" : { +# "message": "Updates will be sent every {} minutes".format(val), +# "value": val +# }, +# "quantom" : { +# "message": "Every updates will contain {} news items".format(val), +# "value": val +# } +# } + +async def set_settings_value(db_file, key_value): + """ + Set settings value. + + :param db_file: Database filename. + :param key_value: List of key ("enabled", "interval", "quantum") and value (Integer). + :return: Message. 
+ """ + # if isinstance(key_value, list): + # key = key_value[0] + # val = key_value[1] + # elif key_value == "enable": + # key = "enabled" + # val = 1 + # else: + # key = "enabled" + # val = 0 + key = key_value[0] + val = key_value[1] + async with DBLOCK: + with create_connection(db_file) as conn: + cur = conn.cursor() + await set_settings_value_default(cur, key) + sql = "UPDATE settings SET value = :value WHERE key = :key" + cur.execute(sql, {"key": key, "value": val}) + if key == 'quantum': + msg = "Each update will contain {} news items.".format(val) + elif key == 'interval': + msg = "Updates will be sent every {} minutes.".format(val) + else: + if val: + status = "disabled" + else: + status = "enabled" + msg = "Updates are {}.".format(status) + return msg + + +async def set_settings_value_default(cur, key): +# async def set_settings_value_default(cur): +# keys = ["enabled", "interval", "quantum"] +# for i in keys: +# sql = "SELECT id FROM settings WHERE key = ?" +# cur.execute(sql, (i,)) +# if not cur.fetchone(): +# val = await settings.get_value_default(i) +# sql = "INSERT INTO settings(key,value) VALUES(?,?)" +# cur.execute(sql, (i, val)) + sql = "SELECT id FROM settings WHERE key = ?" + cur.execute(sql, (key,)) + if not cur.fetchone(): + val = await confighandler.get_value_default(key) + sql = "INSERT INTO settings(key,value) VALUES(?,?)" + cur.execute(sql, (key, val)) + return val + + +async def get_settings_value(db_file, key): + """ + Get settings value. + + :param db_file: Database filename. + :param key: "enabled", "interval", "quantum". + """ + # try: + # with create_connection(db_file) as conn: + # cur = conn.cursor() + # sql = "SELECT value FROM settings WHERE key = ?" + # cur.execute(sql, (key,)) + # result = cur.fetchone() + # except: + # result = await settings.get_value_default(key) + # if not result: + # result = await settings.get_value_default(key) + # return result + with create_connection(db_file) as conn: + try: + cur = conn.cursor() + sql = "SELECT value FROM settings WHERE key = ?" + result = cur.execute(sql, (key,)).fetchone()[0] + except: + result = await set_settings_value_default(cur, key) + if not result: + result = await set_settings_value_default(cur, key) + return result diff --git a/slixfeed/xmpphandler.py b/slixfeed/xmpphandler.py new file mode 100644 index 0000000..92d5dd6 --- /dev/null +++ b/slixfeed/xmpphandler.py @@ -0,0 +1,450 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from datetime import datetime + +import asyncio +import os +import slixmpp + +import confighandler +import datahandler +import sqlitehandler + +jid_tasker = {} +task_manager = {} + +time_now = datetime.now() +# time_now = time_now.strftime("%H:%M:%S") + +def print_time(): + # return datetime.now().strftime("%H:%M:%S") + now = datetime.now() + current_time = now.strftime("%H:%M:%S") + return current_time + + +class Slixfeed(slixmpp.ClientXMPP): + """ + Slixmpp news bot that will send updates + from feeds it receives. + """ + + print("slixmpp.ClientXMPP") + print(repr(slixmpp.ClientXMPP)) + + def __init__(self, jid, password): + slixmpp.ClientXMPP.__init__(self, jid, password) + + # The session_start event will be triggered when + # the bot establishes its connection with the server + # and the XML streams are ready for use. We want to + # listen for this event so that we we can initialize + # our roster. 
+ self.add_event_handler("session_start", self.start) + # self.add_event_handler("session_start", self.select_file) + # self.add_event_handler("session_start", self.send_status) + # self.add_event_handler("session_start", self.check_updates) + + # The message event is triggered whenever a message + # stanza is received. Be aware that that includes + # MUC messages and error messages. + self.add_event_handler("message", self.message) + self.add_event_handler("disconnected", self.reconnect) + # Initialize event loop + self.loop = asyncio.get_event_loop() + + + async def start(self, event): + """ + Process the session_start event. + + Typical actions for the session_start event are + requesting the roster and broadcasting an initial + presence stanza. + + Arguments: + event -- An empty dictionary. The session_start + event does not provide any additional + data. + """ + self.send_presence() + await self.get_roster() + await self.select_file() + + self.send_presence( + pshow="away", + pstatus="Slixmpp has been restarted.", + pto="sch@pimux.de" + ) + + + async def message(self, msg): + """ + Process incoming message stanzas. Be aware that this also + includes MUC messages and error messages. It is usually + a good idea to check the messages's type before processing + or sending replies. + + Arguments: + msg -- The received message stanza. See the documentation + for stanza objects and the Message stanza to see + how it may be used. + """ + if msg["type"] in ("chat", "normal"): + action = 0 + jid = msg["from"].bare + message = " ".join(msg["body"].split()) + message = message.lower() + if message.startswith("help"): + action = print_help() + # NOTE: Might not need it + # elif message.startswith("add "): + # url = message[4:] + elif message.startswith("http"): + url = message + action = await initdb(jid, datahandler.add_feed, url) + # action = "> " + message + "\n" + action + elif message.startswith("quantum "): + key = message[:7] + val = message[8:] + # action = "Every update will contain {} news items.".format(action) + action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + await self.refresh_task(jid, key, val) + elif message.startswith("interval "): + key = message[:8] + val = message[9:] + # action = "Updates will be sent every {} minutes.".format(action) + action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + await self.refresh_task(jid, key, val) + elif message.startswith("list"): + action = await initdb(jid, sqlitehandler.list_subscriptions) + elif message.startswith("recent "): + num = message[7:] + action = await initdb(jid, sqlitehandler.last_entries, num) + elif message.startswith("remove "): + ix = message[7:] + action = await initdb(jid, sqlitehandler.remove_feed, ix) + elif message.startswith("search "): + query = message[7:] + action = await initdb(jid, sqlitehandler.search_entries, query) + elif message.startswith("start"): + # action = "Updates are enabled." + key = "enabled" + val = 1 + actiona = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + asyncio.create_task(self.task_jid(jid)) + # print(print_time(), "task_manager[jid]") + # print(task_manager[jid]) + elif message.startswith("stats"): + action = await initdb(jid, sqlitehandler.statistics) + elif message.startswith("status "): + ix = message[7:] + action = await initdb(jid, sqlitehandler.toggle_status, ix) + elif message.startswith("stop"): + # action = "Updates are disabled." 
+ try: + task_manager[jid]["check"].cancel() + # task_manager[jid]["status"].cancel() + task_manager[jid]["interval"].cancel() + key = "enabled" + val = 0 + actiona = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + await self.send_status(jid) + print(print_time(), "task_manager[jid]") + print(task_manager[jid]) + except: + # action = "Updates are already disabled." + await self.send_status(jid) + else: + action = "Unknown command. Press \"help\" for list of commands" + if action: msg.reply(action).send() + + print(print_time(), "COMMAND ACCOUNT") + print("COMMAND:", message) + print("ACCOUNT: " + str(msg["from"])) + + + async def select_file(self): + """ + Initiate actions by JID (Jabber ID). + + :param self: Self + """ + while True: + db_dir = confighandler.get_default_dbdir() + if not os.path.isdir(db_dir): + msg = ("Slixfeed can not work without a database. \n" + "To create a database, follow these steps: \n" + "Add Slixfeed contact to your roster \n" + "Send a feed to the bot by: \n" + "add https://reclaimthenet.org/feed/") + print(print_time(), msg) + print(msg) + else: + os.chdir(db_dir) + files = os.listdir() + # TODO Use loop (with gather) instead of TaskGroup + # for file in files: + # if file.endswith(".db") and not file.endswith(".db-jour.db"): + # jid = file[:-3] + # jid_tasker[jid] = asyncio.create_task(self.task_jid(jid)) + # await jid_tasker[jid] + async with asyncio.TaskGroup() as tg: + print("main task") + print(print_time(), "repr(tg)") + print(repr(tg)) # + for file in files: + if file.endswith(".db") and not file.endswith(".db-jour.db"): + jid = file[:-3] + tg.create_task(self.task_jid(jid)) + # task_manager.update({jid: tg}) + # print(task_manager) # {} + print(print_time(), "repr(tg) id(tg)") + print(jid, repr(tg)) # sch@pimux.de + print(jid, id(tg)) # sch@pimux.de 139879835500624 + # + # 139879835500624 + + + async def task_jid(self, jid): + """ + JID (Jabber ID) task manager. + + :param self: Self + :param jid: Jabber ID + """ + enabled = await initdb( + jid, + sqlitehandler.get_settings_value, + "enabled" + ) + print(print_time(), "enabled", enabled, jid) + if enabled: + print("sub task") + print(print_time(), "repr(self) id(self)") + print(repr(self)) + print(id(self)) + task_manager[jid] = {} + task_manager[jid]["check"] = asyncio.create_task(check_updates(jid)) + task_manager[jid]["status"] = asyncio.create_task(self.send_status(jid)) + task_manager[jid]["interval"] = asyncio.create_task(self.send_update(jid)) + await task_manager[jid]["check"] + await task_manager[jid]["status"] + await task_manager[jid]["interval"] + print(print_time(), "task_manager[jid].items()") + print(task_manager[jid].items()) + print(print_time(), "task_manager[jid]") + print(task_manager[jid]) + print(print_time(), "task_manager") + print(task_manager) + else: + await self.send_status(jid) + + async def send_update(self, jid): + """ + Send news items as messages. + + :param self: Self + :param jid: Jabber ID + """ + new = await initdb( + jid, + sqlitehandler.get_entry_unread + ) + if new: + print(print_time(), "> SEND UPDATE",jid) + self.send_message( + mto=jid, + mbody=new, + mtype="chat" + ) + interval = await initdb( + jid, + sqlitehandler.get_settings_value, + "interval" + ) + # await asyncio.sleep(60 * interval) + self.loop.call_at( + self.loop.time() + 60 * interval, + self.loop.create_task, + self.send_update(jid) + ) + + async def send_status(self, jid): + """ + Send status message. 
+ + :param self: Self + :param jid: Jabber ID + """ + print(print_time(), "> SEND STATUS",jid) + unread = await initdb( + jid, + sqlitehandler.get_number_of_entries_unread + ) + + if unread: + status_text = "📰 News items: {}".format(str(unread)) + status_mode = "chat" + else: + status_text = "🗞 No News" + status_mode = "available" + + enabled = await initdb( + jid, + sqlitehandler.get_settings_value, + "enabled" + ) + + if not enabled: + status_mode = "xa" + + # print(status_text, "for", jid) + self.send_presence( + pshow=status_mode, + pstatus=status_text, + pto=jid, + #pfrom=None + ) + + await asyncio.sleep(60 * 20) + + # self.loop.call_at( + # self.loop.time() + 60 * 20, + # self.loop.create_task, + # self.send_status(jid) + # ) + + + async def refresh_task(self, jid, key, val): + """ + Apply settings on runtime. + + :param self: Self + :param jid: Jabber ID + :param key: Key + :param val: Value + """ + if jid in task_manager: + task_manager[jid][key].cancel() + loop = asyncio.get_event_loop() + print(print_time(), "loop") + print(loop) + print(print_time(), "loop") + task_manager[jid][key] = loop.call_at( + loop.time() + 60 * float(val), + loop.create_task, + self.send_update(jid) + ) + # task_manager[jid][key] = self.send_update.loop.call_at( + # self.send_update.loop.time() + 60 * val, + # self.send_update.loop.create_task, + # self.send_update(jid) + # ) + + +# TODO Take this function out of +# +async def check_updates(jid): + """ + Start calling for update check up. + + :param jid: Jabber ID + """ + while True: + print(print_time(), "> CHCK UPDATE",jid) + await initdb(jid, datahandler.download_updates) + await asyncio.sleep(60 * 90) + # Schedule to call this function again in 90 minutes + # self.loop.call_at( + # self.loop.time() + 60 * 90, + # self.loop.create_task, + # self.check_updates(jid) + # ) + + +def print_help(): + """ + Print help manual. + """ + msg = ("Slixfeed - News syndication bot for Jabber/XMPP \n" + "\n" + "DESCRIPTION: \n" + " Slixfeed is a news aggregator bot for online news feeds. \n" + " Supported filetypes: Atom, RDF and RSS. \n" + "\n" + "BASIC USAGE: \n" + " start \n" + " Enable bot and send updates. \n" + " Stop \n" + " Disable bot and stop updates. \n" + " batch N \n" + " Send N updates for each interval. \n" + " interval N \n" + " Send an update every N minutes. \n" + " feed list \n" + " List subscriptions. \n" + "\n" + "EDIT OPTIONS: \n" + " add URL \n" + " Add URL to subscription list. \n" + " remove ID \n" + " Remove feed from subscription list. \n" + " status ID \n" + " Toggle update status of feed. \n" + "\n" + "SEARCH OPTIONS: \n" + " search TEXT \n" + " Search news items by given keywords. \n" + " recent N \n" + " List recent N news items (up to 50 items). \n" + "\n" + "STATISTICS OPTIONS: \n" + " analyses \n" + " Show report and statistics of feeds. \n" + " obsolete \n" + " List feeds that are not available. \n" + " unread \n" + " Print number of unread news items. \n" + "\n" + "BACKUP OPTIONS: \n" + " export opml \n" + " Send an OPML file with your feeds. \n" + " backup news html\n" + " Send an HTML formatted file of your news items. \n" + " backup news md \n" + " Send a Markdown file of your news items. \n" + " backup news text \n" + " Send a Plain Text file of your news items. 
\n" + "\n" + "DOCUMENTATION: \n" + " Slixfeed \n" + " https://gitgud.io/sjehuda/slixfeed \n" + " Slixmpp \n" + " https://slixmpp.readthedocs.io/ \n" + " feedparser \n" + " https://pythonhosted.org/feedparser") + return msg + + +# TODO Perhaps this needs to be executed +# just once per program execution +async def initdb(jid, callback, message=None): + """ + Callback function to instantiate action on database. + + :param jid: JID (Jabber ID). + :param callback: Function name. + :param massage: Optional kwarg when a message is a part or required argument. + """ + db_dir = confighandler.get_default_dbdir() + if not os.path.isdir(db_dir): + os.mkdir(db_dir) + db_file = os.path.join(db_dir, r"{}.db".format(jid)) + sqlitehandler.create_tables(db_file) + # await sqlitehandler.set_default_values(db_file) + if message: + return await callback(db_file, message) + else: + return await callback(db_file)