diff --git a/slixfeed/__main__.py b/slixfeed/__main__.py index 50718d2..0fb8831 100644 --- a/slixfeed/__main__.py +++ b/slixfeed/__main__.py @@ -13,690 +13,26 @@ # jid = Jabber ID (XMPP) # res = response (HTTP) -import os from argparse import ArgumentParser -from asyncio.exceptions import IncompleteReadError -from datetime import date from getpass import getpass -from http.client import IncompleteRead -from urllib import error -import asyncio import logging -import sys -import time -import aiohttp -from bs4 import BeautifulSoup -from xml.etree.ElementTree import ElementTree, ParseError -from urllib.parse import urlparse -from lxml import html -import feedparser -import slixmpp +from datetime import date +import time # from eliot import start_action, to_file # # to_file(open("slixfeed.log", "w")) # # with start_action(action_type="set_date()", jid=jid): # # with start_action(action_type="message()", msg=msg): -import database - - -class Slixfeed(slixmpp.ClientXMPP): - """ - Slixmpp news bot that will send updates - from feeds it receives. - """ - def __init__(self, jid, password): - slixmpp.ClientXMPP.__init__(self, jid, password) - - # The session_start event will be triggered when - # the bot establishes its connection with the server - # and the XML streams are ready for use. We want to - # listen for this event so that we we can initialize - # our roster. - self.add_event_handler("session_start", self.start) - self.add_event_handler("session_start", self.select_file) - # self.add_event_handler("session_start", self.send_status) - # self.add_event_handler("session_start", self.check_updates) - - # The message event is triggered whenever a message - # stanza is received. Be aware that that includes - # MUC messages and error messages. 
- self.add_event_handler("message", self.message) - self.add_event_handler("disconnected", self.reconnect) - # Initialize event loop - self.loop = asyncio.get_event_loop() - - async def start(self, event): - """ - Process the session_start event. - - Typical actions for the session_start event are - requesting the roster and broadcasting an initial - presence stanza. - - Arguments: - event -- An empty dictionary. The session_start - event does not provide any additional - data. - """ - self.send_presence() - await self.get_roster() - - async def message(self, msg): - """ - Process incoming message stanzas. Be aware that this also - includes MUC messages and error messages. It is usually - a good idea to check the messages's type before processing - or sending replies. - - Arguments: - msg -- The received message stanza. See the documentation - for stanza objects and the Message stanza to see - how it may be used. - """ - if msg['type'] in ('chat', 'normal'): - message = " ".join(msg['body'].split()) - if message.lower().startswith('help'): - action = print_help() - # NOTE: Might not need it - elif message.lower().startswith('add '): - action = await initdb(msg['from'].bare, add_feed, message[4:]) - # action = "> " + message + "\n" + action - elif message.lower().startswith('quantum '): - action = await initdb(msg['from'].bare, database.set_settings_value, [message[:7], message[8:]]) - # action = "Every update will contain {} news items.".format(action) - elif message.lower().startswith('disable'): - action = await initdb(msg['from'].bare, database.set_settings_value, message) - # action = "Updates are disabled." - elif message.lower().startswith('enable'): - action = await initdb(msg['from'].bare, database.set_settings_value, message) - # action = "Updates are enabled." 
- elif message.lower().startswith('interval '): - action = await initdb(msg['from'].bare, database.set_settings_value, [message[:8], message[9:]]) - # action = "Updates will be sent every {} minutes.".format(action) - elif message.lower().startswith('list'): - action = await initdb(msg['from'].bare, database.list_subscriptions) - elif message.lower().startswith('recent '): - action = await initdb(msg['from'].bare, database.last_entries, message[7:]) - elif message.lower().startswith('remove '): - action = await initdb(msg['from'].bare, database.remove_feed, message[7:]) - elif message.lower().startswith('search '): - action = await initdb( msg['from'].bare, database.search_entries, message[7:]) - elif message.lower().startswith('status '): - action = await initdb(msg['from'].bare, database.toggle_status, message[7:]) - elif message.lower().startswith('unread'): - action = await initdb(msg['from'].bare, database.statistics) - else: - action = "Unknown command. Press \"help\" for list of commands" - msg.reply(action).send() - - print("COMMAND:", message) - print("ACCOUNT: " + str(msg['from'])) - - async def select_file(self, event): - """ - Initiate actions by JID (Jabber ID). - - :param self: - :param event: - """ - while True: - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - msg = ("Slixfeed can not work without a database. \n" - "To create a database, follow these steps: \n" - "Add Slixfeed contact to your roster \n" - "Send a feed to the bot by: \n" - "feed add https://reclaimthenet.org/feed/") - print(msg) - else: - os.chdir(db_dir) - files = os.listdir() - async with asyncio.TaskGroup() as tg: - for file in files: - if file.endswith('.db') and not file.endswith('.db-jour.db'): - jid = file[:-3] - tg.create_task(self.jid(event, jid)) - - async def jid(self, event, jid): - """ - JID (Jabber ID) task manager. 
- - :param self: - :param event: - :param jid: Jabber ID - """ - enabled = await initdb(jid, database.get_settings_value, 'enabled') - print("enabled", enabled, jid) - if enabled: - async with asyncio.TaskGroup() as tg: - tg.create_task(self.check_updates(event, jid)) - tg.create_task(self.send_update(event, jid)) - tg.create_task(self.send_status(event, jid)) - - async def check_updates(self, event, jid): - """ - Start calling for update check up. - - :param self: - :param event: - :param jid: Jabber ID - """ - while True: - print("> CHCK UPDATE",jid) - await initdb(jid, download_updates) - await asyncio.sleep(60 * 90) - # Schedule to call this function again in 90 minutes - # self.loop.call_at(self.loop.time() + 60 * 90, self.loop.create_task, self.check_updates(event, jid)) - - async def send_update(self, event, jid): - """ - Send news items as messages. - - :param self: - :param event: - :param jid: Jabber ID - """ - new = await initdb( - jid, - database.get_entry_unread - ) - if new: - print("> SEND UPDATE",jid) - self.send_message( - mto=jid, - mbody=new, - mtype='chat' - ) - interval = await initdb(jid, database.get_settings_value, 'interval') - # await asyncio.sleep(60 * interval) - self.loop.call_at(self.loop.time() + 60 * interval, self.loop.create_task, self.send_update(event, jid)) - - async def send_status(self, event, jid): - """ - Send status message. - - :param self: - :param event: - :param jid: Jabber ID - """ - print("> SEND STATUS",jid) - unread = await initdb( - jid, - database.get_number_of_entries_unread - ) - if unread: - msg_status = ('📰 News items:', str(unread)) - else: - msg_status = '🗞 No News' - # print(msg_status, 'for', jid) - self.send_presence( - pstatus=msg_status, - pto=jid, - #pfrom=None - ) - # await asyncio.sleep(60 * 20) - self.loop.call_at(self.loop.time() + 60 * 20, self.loop.create_task, self.send_status(event, jid)) - - -def print_help(): - """ - Print help manual. 
- """ - msg = ("Slixfeed - News syndication bot for Jabber/XMPP \n" - "\n" - "DESCRIPTION: \n" - " Slixfeed is a news aggregator bot for online news feeds. \n" - " Supported filetypes: Atom, RDF and RSS. \n" - "\n" - "BASIC USAGE: \n" - " enable \n" - " Send updates. \n" - " disable \n" - " Stop sending updates. \n" - " batch N \n" - " Send N updates on ech interval. \n" - " interval N \n" - " Send an update each N minutes. \n" - " feed list \n" - " List subscriptions. \n" - "\n" - "EDIT OPTIONS: \n" - " add URL \n" - " Add URL to subscription list. \n" - " remove ID \n" - " Remove feed from subscription list. \n" - " status ID \n" - " Toggle update status of feed. \n" - "\n" - "SEARCH OPTIONS: \n" - " search TEXT \n" - " Search news items by given keywords. \n" - " recent N \n" - " List recent N news items (up to 50 items). \n" - "\n" - "STATISTICS OPTIONS: \n" - " analyses \n" - " Show report and statistics of feeds. \n" - " obsolete \n" - " List feeds that are not available. \n" - " unread \n" - " Print number of unread news items. \n" - "\n" - "BACKUP OPTIONS: \n" - " export opml \n" - " Send an OPML file with your feeds. \n" - " backup news html\n" - " Send an HTML formatted file of your news items. \n" - " backup news md \n" - " Send a Markdown file of your news items. \n" - " backup news text \n" - " Send a Plain Text file of your news items. \n" - "\n" - "DOCUMENTATION: \n" - " Slixfeed \n" - " https://gitgud.io/sjehuda/slixfeed \n" - " Slixmpp \n" - " https://slixmpp.readthedocs.io/ \n" - " feedparser \n" - " https://pythonhosted.org/feedparser") - return msg - - -# Function from jarun/buku -# Arun Prakash Jana (jarun) -# Dmitry Marakasov (AMDmi3) -def get_default_dbdir(): - """Determine the directory path where dbfile will be stored. - - If $XDG_DATA_HOME is defined, use it - else if $HOME exists, use it - else if the platform is Windows, use %APPDATA% - else use the current directory. - - :return: Path to database file. 
- - Note - ---- - This code was taken from the buku project. - """ -# data_home = xdg.BaseDirectory.xdg_data_home - data_home = os.environ.get('XDG_DATA_HOME') - if data_home is None: - if os.environ.get('HOME') is None: - if sys.platform == 'win32': - data_home = os.environ.get('APPDATA') - if data_home is None: - return os.path.abspath('.') - else: - return os.path.abspath('.') - else: - data_home = os.path.join(os.environ.get('HOME'), '.local', 'share') - return os.path.join(data_home, 'slixfeed') - - -# TODO Perhaps this needs to be executed -# just once per program execution -async def initdb(jid, callback, message=None): - """ - Callback function to instantiate action on database. - - :param jid: JID (Jabber ID). - :param callback: Function name. - :param massage: Optional kwarg when a message is a part or required argument. - """ - db_dir = get_default_dbdir() - if not os.path.isdir(db_dir): - os.mkdir(db_dir) - db_file = os.path.join(db_dir, r"{}.db".format(jid)) - database.create_tables(db_file) - # await database.set_default_values(db_file) - if message: - return await callback(db_file, message) - else: - return await callback(db_file) - -# NOTE I don't think there should be "return" -# because then we might stop scanning next URLs -async def download_updates(db_file): - """ - Chack feeds for new entries. - - :param db_file: Database filename. - """ - urls = await database.get_subscriptions(db_file) - - for url in urls: - # print(os.path.basename(db_file), url[0]) - source = url[0] - res = await download_feed(source) - # TypeError: 'NoneType' object is not subscriptable - if res is None: - # Skip to next feed - # urls.next() - # next(urls) - continue - - await database.update_source_status(db_file, res[1], source) - - if res[0]: - try: - feed = feedparser.parse(res[0]) - if feed.bozo: - # bozo = ("WARNING: Bozo detected for feed <{}>. 
" - # "For more information, visit " - # "https://pythonhosted.org/feedparser/bozo.html" - # .format(source)) - # print(bozo) - valid = 0 - else: - valid = 1 - await database.update_source_validity(db_file, source, valid) - except (IncompleteReadError, IncompleteRead, error.URLError) as e: - print(e) - # return - # TODO Place these couple of lines back down - # NOTE Need to correct the SQL statement to do so - # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW - - if res[1] == 200: - # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW - # TODO Place these couple of lines back down - # NOTE Need to correct the SQL statement to do so - entries = feed.entries - # length = len(entries) - # await database.remove_entry(db_file, source, length) - await database.remove_nonexistent_entries(db_file, feed, source) - - new_entry = 0 - for entry in entries: - - if entry.has_key("title"): - title = entry.title - else: - title = feed["feed"]["title"] - - if entry.has_key("link"): - link = entry.link - else: - link = source - - exist = await database.check_entry_exist(db_file, title, link) - - if not exist: - new_entry = new_entry + 1 - # TODO Enhance summary - if entry.has_key("summary"): - summary = entry.summary - # Remove HTML tags - summary = BeautifulSoup(summary, "lxml").text - # TODO Limit text length - summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨" - else: - summary = '*** No summary ***' - entry = (title, summary, link, source, 0); - await database.add_entry_and_set_date(db_file, source, entry) - - -async def download_feed(url): - """ - Download content of given URL. - - :param url: URL. - :return: Document or error message. 
- """ - timeout = aiohttp.ClientTimeout(total=10) - async with aiohttp.ClientSession() as session: - # async with aiohttp.ClientSession(trust_env=True) as session: - try: - async with session.get(url, timeout=timeout) as response: - status = response.status - if response.status == 200: - try: - doc = await response.text() - # print (response.content_type) - return [doc, status] - except: - return [False, "The content of this document doesn't appear to be textual"] - else: - return [False, "HTTP Error: " + str(status)] - except aiohttp.ClientError as e: - print('Error', str(e)) - return [False, "Error: " + str(e)] - except asyncio.TimeoutError as e: - # print('Timeout:', str(e)) - return [False, "Timeout"] - - -async def add_feed(db_file, url): - """ - Check whether feed exist, otherwise process it. - - :param db_file: Database filename. - :param url: URL. - :return: Status message. - """ - exist = await database.check_feed_exist(db_file, url) - - if not exist: - res = await download_feed(url) - if res[0]: - feed = feedparser.parse(res[0]) - title = await get_title(url, feed) - if feed.bozo: - bozo = ("WARNING: Bozo detected. 
Failed to load <{}>.".format(url)) - print(bozo) - try: - # tree = etree.fromstring(res[0]) # etree is for xml - tree = html.fromstring(res[0]) - except: - return "Failed to parse URL <{}> as feed".format(url) - - print("RSS Auto-Discovery Engaged") - xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]""" - # xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]/@href""" - # xpath_query = "//link[@rel='alternate' and @type='application/atom+xml' or @rel='alternate' and @type='application/rss+xml' or @rel='alternate' and @type='application/rdf+xml']/@href" - feeds = tree.xpath(xpath_query) - if len(feeds) > 1: - msg = "RSS Auto-Discovery has found {} feeds:\n\n".format(len(feeds)) - for feed in feeds: - # # The following code works; - # # The following code will catch - # # only valid resources (i.e. not 404); - # # The following code requires more bandwidth. - # res = await download_feed(feed) - # if res[0]: - # disco = feedparser.parse(res[0]) - # title = disco["feed"]["title"] - # msg += "{} \n {} \n\n".format(title, feed) - feed_name = feed.xpath('@title')[0] - feed_addr = feed.xpath('@href')[0] - msg += "{}\n{}\n\n".format(feed_name, feed_addr) - msg += "The above feeds were extracted from\n{}".format(url) - return msg - elif feeds: - url = feeds[0].xpath('@href')[0] - # Why wouldn't add_feed return a message - # upon success unless return is explicitly - # mentioned, yet upon failure it wouldn't? 
- return await add_feed(db_file, url) - - # Search for feeds by file extension and path - paths = [ - "/app.php/feed", # phpbb - "/atom", - "/atom.php", - "/atom.xml", - "/content-feeds/", - "/external.php?type=RSS2", - "/feed", # good practice - "/feed.atom", - # "/feed.json", - "/feed.php", - "/feed.rdf", - "/feed.rss", - "/feed.xml", - "/feed/atom/", - "/feeds/news_feed", - "/feeds/rss/news.xml.php", - "/forum_rss.php", - "/index.php/feed", - "/index.php?type=atom;action=.xml", #smf - "/index.php?type=rss;action=.xml", #smf - "/index.rss", - "/latest.rss", - "/news", - "/news.xml", - "/news.xml.php", - "/news/feed", - "/posts.rss", # discourse - "/rdf", - "/rdf.php", - "/rdf.xml", - "/rss", - # "/rss.json", - "/rss.php", - "/rss.xml", - "/timeline.rss", - "/xml/feed.rss", - # "?format=atom", - # "?format=rdf", - # "?format=rss", - # "?format=xml" - ] - - print("RSS Scan Mode Engaged") - feeds = {} - for path in paths: - # xpath_query = "//*[@*[contains(.,'{}')]]".format(path) - xpath_query = "//a[contains(@href,'{}')]".format(path) - addresses = tree.xpath(xpath_query) - parted_url = urlparse(url) - for address in addresses: - address = address.xpath('@href')[0] - if address.startswith('/'): - address = parted_url.scheme + '://' + parted_url.netloc + address - res = await download_feed(address) - if res[1] == 200: - try: - feeds[address] = feedparser.parse(res[0])["feed"]["title"] - except: - continue - if len(feeds) > 1: - msg = "RSS URL scan has found {} feeds:\n\n".format(len(feeds)) - for feed in feeds: - # try: - # res = await download_feed(feed) - # except: - # continue - feed_name = feeds[feed] - feed_addr = feed - msg += "{}\n{}\n\n".format(feed_name, feed_addr) - msg += "The above feeds were extracted from\n{}".format(url) - return msg - elif feeds: - url = list(feeds)[0] - return await add_feed(db_file, url) - - # (HTTP) Request(s) Paths - print("RSS Arbitrary Mode Engaged") - feeds = {} - parted_url = urlparse(url) - for path in paths: - address = 
parted_url.scheme + '://' + parted_url.netloc + path - res = await download_feed(address) - if res[1] == 200: - # print(feedparser.parse(res[0])["feed"]["title"]) - # feeds[address] = feedparser.parse(res[0])["feed"]["title"] - try: - title = feedparser.parse(res[0])["feed"]["title"] - except: - title = '*** No Title ***' - feeds[address] = title - - # Check whether URL has path (i.e. not root) - if parted_url.path.split('/')[1]: - paths.extend([".atom", ".feed", ".rdf", ".rss"]) if '.rss' not in paths else -1 - # if paths.index('.rss'): - # paths.extend([".atom", ".feed", ".rdf", ".rss"]) - address = parted_url.scheme + '://' + parted_url.netloc + '/' + parted_url.path.split('/')[1] + path - res = await download_feed(address) - if res[1] == 200: - print('ATTENTION') - print(address) - try: - title = feedparser.parse(res[0])["feed"]["title"] - except: - title = '*** No Title ***' - feeds[address] = title - if len(feeds) > 1: - msg = "RSS URL discovery has found {} feeds:\n\n".format(len(feeds)) - for feed in feeds: - feed_name = feeds[feed] - feed_addr = feed - msg += "{}\n{}\n\n".format(feed_name, feed_addr) - msg += "The above feeds were extracted from\n{}".format(url) - elif feeds: - url = list(feeds)[0] - msg = await add_feed(db_file, url) - else: - msg = "No news feeds were found for URL <{}>.".format(url) - else: - msg = await database.add_feed(db_file, title, url, res) - else: - msg = "Failed to get URL <{}>. Reason: {}".format(url, res[1]) - else: - ix = exist[0] - name = exist[1] - msg = "> {}\nNews source \"{}\" is already listed in the subscription list at index {}".format(url, name, ix) - return msg - - -async def get_title(url, feed): - """ - Get title of feed. - - :param url: URL - :param feed: Parsed feed - :return: Title or URL hostname. - """ - try: - title = feed["feed"]["title"] - except: - title = urlparse(url).netloc - return title - - -def toggle_state(jid, state): - """ - Set status of update. - - :param jid: JID (Jabber ID). 
- :param state: True or False. - :return: Status message. - """ - db_dir = get_default_dbdir() - db_file = os.path.join(db_dir, r"{}.db".format(jid)) - bk_file = os.path.join(db_dir, r"{}.db.bak".format(jid)) - - if state: - if os.path.exists(db_file): - return "Updates are already enabled" - elif os.path.exists(bk_file): - os.renames(bk_file, db_file) - return "Updates are now enabled" - else: - if os.path.exists(bk_file): - return "Updates are already disabled" - elif os.path.exists(db_file): - os.renames(db_file, bk_file) - return "Updates are now disabled" +#import irchandler +import xmpphandler +#import matrixhandler if __name__ == '__main__': # Setup the command line arguments. - parser = ArgumentParser(description=Slixfeed.__doc__) + parser = ArgumentParser(description=xmpphandler.Slixfeed.__doc__) # Output verbosity options. parser.add_argument( @@ -730,7 +66,7 @@ if __name__ == '__main__': # Setup the Slixfeed and register plugins. Note that while plugins may # have interdependencies, the order in which you register them does # not matter. - xmpp = Slixfeed(args.jid, args.password) + xmpp = xmpphandler.Slixfeed(args.jid, args.password) xmpp.register_plugin('xep_0004') # Data Forms xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0045') # Multi-User Chat diff --git a/slixfeed/confighandler.py b/slixfeed/confighandler.py new file mode 100644 index 0000000..82b0f79 --- /dev/null +++ b/slixfeed/confighandler.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import sys + +def get_default_dbdir(): + """ + Determine the directory path where dbfile will be stored. + + If $XDG_DATA_HOME is defined, use it + else if $HOME exists, use it + else if the platform is Windows, use %APPDATA% + else use the current directory. + + :return: Path to database file. + + Note + ---- + This code was taken from the buku project. 
+ + * Arun Prakash Jana (jarun) + * Dmitry Marakasov (AMDmi3) + """ +# data_home = xdg.BaseDirectory.xdg_data_home + data_home = os.environ.get('XDG_DATA_HOME') + if data_home is None: + if os.environ.get('HOME') is None: + if sys.platform == 'win32': + data_home = os.environ.get('APPDATA') + if data_home is None: + return os.path.abspath('.') + else: + return os.path.abspath('.') + else: + data_home = os.path.join(os.environ.get('HOME'), '.local', 'share') + return os.path.join(data_home, 'slixfeed') + + +async def get_value_default(key): + """ + Get settings default value. + + :param key: "enabled", "interval", "quantum". + :return: Integer. + """ + if key == "enabled": + result = 1 + elif key == "quantum": + result = 4 + elif key == "interval": + result = 30 + return result diff --git a/slixfeed/datahandler.py b/slixfeed/datahandler.py new file mode 100644 index 0000000..08aaaf0 --- /dev/null +++ b/slixfeed/datahandler.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import feedparser +import aiohttp +import asyncio + +import feedhandler +import sqlitehandler + +from http.client import IncompleteRead +from asyncio.exceptions import IncompleteReadError +from urllib import error +from bs4 import BeautifulSoup +# from xml.etree.ElementTree import ElementTree, ParseError +from urllib.parse import urlparse +from lxml import html + +async def download_updates(db_file): + """ + Check feeds for new entries. + + :param db_file: Database filename. + """ + urls = await sqlitehandler.get_subscriptions(db_file) + + for url in urls: + # print(os.path.basename(db_file), url[0]) + source = url[0] + res = await download_feed(source) + # TypeError: 'NoneType' object is not subscriptable + if res is None: + # Skip to next feed + # urls.next() + # next(urls) + continue + + await sqlitehandler.update_source_status(db_file, res[1], source) + + if res[0]: + try: + feed = feedparser.parse(res[0]) + if feed.bozo: + # bozo = ("WARNING: Bozo detected for feed <{}>. 
" + # "For more information, visit " + # "https://pythonhosted.org/feedparser/bozo.html" + # .format(source)) + # print(bozo) + valid = 0 + else: + valid = 1 + await sqlitehandler.update_source_validity(db_file, source, valid) + except (IncompleteReadError, IncompleteRead, error.URLError) as e: + print(e) + # NOTE I don't think there should be "return" + # because then we might stop scanning next URLs + # return + # TODO Place these couple of lines back down + # NOTE Need to correct the SQL statement to do so + # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW + + if res[1] == 200: + # NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW + # TODO Place these couple of lines back down + # NOTE Need to correct the SQL statement to do so + entries = feed.entries + # length = len(entries) + # await sqlitehandler.remove_entry(db_file, source, length) + await sqlitehandler.remove_nonexistent_entries(db_file, feed, source) + + new_entry = 0 + for entry in entries: + + if entry.has_key("title"): + title = entry.title + else: + title = feed["feed"]["title"] + + if entry.has_key("link"): + link = entry.link + else: + link = source + + exist = await sqlitehandler.check_entry_exist(db_file, title, link) + + if not exist: + new_entry = new_entry + 1 + # TODO Enhance summary + if entry.has_key("summary"): + summary = entry.summary + # Remove HTML tags + summary = BeautifulSoup(summary, "lxml").text + # TODO Limit text length + summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨" + else: + summary = '*** No summary ***' + entry = (title, summary, link, source, 0); + await sqlitehandler.add_entry_and_set_date(db_file, source, entry) + + +async def add_feed(db_file, url): + """ + Check whether feed exist, otherwise process it. + + :param db_file: Database filename. + :param url: URL. + :return: Status message. 
+ """ + exist = await sqlitehandler.check_feed_exist(db_file, url) + + if not exist: + res = await download_feed(url) + if res[0]: + feed = feedparser.parse(res[0]) + title = await feedhandler.get_title(url, feed) + if feed.bozo: + bozo = ("WARNING: Bozo detected. Failed to load <{}>.".format(url)) + print(bozo) + try: + # tree = etree.fromstring(res[0]) # etree is for xml + tree = html.fromstring(res[0]) + except: + return "Failed to parse URL <{}> as feed".format(url) + + print("RSS Auto-Discovery Engaged") + xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]""" + # xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]/@href""" + # xpath_query = "//link[@rel='alternate' and @type='application/atom+xml' or @rel='alternate' and @type='application/rss+xml' or @rel='alternate' and @type='application/rdf+xml']/@href" + feeds = tree.xpath(xpath_query) + if len(feeds) > 1: + msg = "RSS Auto-Discovery has found {} feeds:\n\n".format(len(feeds)) + for feed in feeds: + # # The following code works; + # # The following code will catch + # # only valid resources (i.e. not 404); + # # The following code requires more bandwidth. + # res = await download_feed(feed) + # if res[0]: + # disco = feedparser.parse(res[0]) + # title = disco["feed"]["title"] + # msg += "{} \n {} \n\n".format(title, feed) + feed_name = feed.xpath('@title')[0] + feed_addr = feed.xpath('@href')[0] + msg += "{}\n{}\n\n".format(feed_name, feed_addr) + msg += "The above feeds were extracted from\n{}".format(url) + return msg + elif feeds: + url = feeds[0].xpath('@href')[0] + # Why wouldn't add_feed return a message + # upon success unless return is explicitly + # mentioned, yet upon failure it wouldn't? 
+ return await add_feed(db_file, url) + + # Search for feeds by file extension and path + paths = [ + ".atom", + ".rss", + ".xml", + "/?feed=atom", + "/?feed=rdf", + "/?feed=rss", + "/?feed=xml", # wordpress + "/?format=atom", + "/?format=rdf", + "/?format=rss", + "/?format=xml", # phpbb + "/app.php/feed", + "/atom", + "/atom.php", + "/atom.xml", + "/blog/feed/", + "/content-feeds/", + "/external.php?type=RSS2", + "/en/feed/", + "/feed", # good practice + "/feed.atom", + # "/feed.json", + "/feed.php", + "/feed.rdf", + "/feed.rss", + "/feed.xml", + "/feed/atom/", + "/feeds/news_feed", + "/feeds/posts/default", + "/feeds/posts/default?alt=atom", + "/feeds/posts/default?alt=rss", + "/feeds/rss/news.xml.php", + "/forum_rss.php", + "/index.atom", + "/index.php/feed", + "/index.php?type=atom;action=.xml", #smf + "/index.php?type=rss;action=.xml", #smf + "/index.rss", + "/jekyll/feed.xml", + "/latest.rss", + "/news", + "/news.xml", + "/news.xml.php", + "/news/feed", + "/posts.rss", # discourse + "/rdf", + "/rdf.php", + "/rdf.xml", + "/rss", + # "/rss.json", + "/rss.php", + "/rss.xml", + "/timeline.rss", + "/videos.atom", + # "/videos.json", + "/videos.xml", + "/xml/feed.rss" + ] + + print("RSS Scan Mode Engaged") + feeds = {} + for path in paths: + # xpath_query = "//*[@*[contains(.,'{}')]]".format(path) + xpath_query = "//a[contains(@href,'{}')]".format(path) + addresses = tree.xpath(xpath_query) + parted_url = urlparse(url) + # NOTE Should number of addresses be limited or + # perhaps be N from the start and N from the end + for address in addresses: + address = address.xpath('@href')[0] + if address.startswith('/'): + address = parted_url.scheme + '://' + parted_url.netloc + address + res = await download_feed(address) + if res[1] == 200: + try: + feeds[address] = feedparser.parse(res[0])["feed"]["title"] + except: + continue + if len(feeds) > 1: + msg = "RSS URL scan has found {} feeds:\n\n".format(len(feeds)) + for feed in feeds: + # try: + # res = await 
download_feed(feed) + # except: + # continue + feed_name = feeds[feed] + feed_addr = feed + msg += "{}\n{}\n\n".format(feed_name, feed_addr) + msg += "The above feeds were extracted from\n{}".format(url) + return msg + elif feeds: + url = list(feeds)[0] + return await add_feed(db_file, url) + + # (HTTP) Request(s) Paths + print("RSS Arbitrary Mode Engaged") + feeds = {} + parted_url = urlparse(url) + for path in paths: + address = parted_url.scheme + '://' + parted_url.netloc + path + res = await download_feed(address) + if res[1] == 200: + # print(feedparser.parse(res[0])["feed"]["title"]) + # feeds[address] = feedparser.parse(res[0])["feed"]["title"] + try: + title = feedparser.parse(res[0])["feed"]["title"] + except: + title = '*** No Title ***' + feeds[address] = title + + # Check whether URL has path (i.e. not root) + if parted_url.path.split('/')[1]: + paths.extend([".atom", ".feed", ".rdf", ".rss"]) if '.rss' not in paths else -1 + # if paths.index('.rss'): + # paths.extend([".atom", ".feed", ".rdf", ".rss"]) + address = parted_url.scheme + '://' + parted_url.netloc + '/' + parted_url.path.split('/')[1] + path + res = await download_feed(address) + if res[1] == 200: + print('ATTENTION') + print(address) + try: + title = feedparser.parse(res[0])["feed"]["title"] + except: + title = '*** No Title ***' + feeds[address] = title + if len(feeds) > 1: + msg = "RSS URL discovery has found {} feeds:\n\n".format(len(feeds)) + for feed in feeds: + feed_name = feeds[feed] + feed_addr = feed + msg += "{}\n{}\n\n".format(feed_name, feed_addr) + msg += "The above feeds were extracted from\n{}".format(url) + elif feeds: + url = list(feeds)[0] + msg = await add_feed(db_file, url) + else: + msg = "No news feeds were found for URL <{}>.".format(url) + else: + msg = await sqlitehandler.add_feed(db_file, title, url, res) + else: + msg = "Failed to get URL <{}>. 
Reason: {}".format(url, res[1]) + else: + ix = exist[0] + name = exist[1] + msg = "> {}\nNews source \"{}\" is already listed in the subscription list at index {}".format(url, name, ix) + return msg + + +async def download_feed(url): + """ + Download content of given URL. + + :param url: URL. + :return: Document or error message. + """ + timeout = aiohttp.ClientTimeout(total=10) + async with aiohttp.ClientSession() as session: + # async with aiohttp.ClientSession(trust_env=True) as session: + try: + async with session.get(url, timeout=timeout) as response: + status = response.status + if response.status == 200: + try: + doc = await response.text() + # print (response.content_type) + return [doc, status] + except: + return [False, "The content of this document doesn't appear to be textual"] + else: + return [False, "HTTP Error: " + str(status)] + except aiohttp.ClientError as e: + print('Error', str(e)) + return [False, "Error: " + str(e)] + except asyncio.TimeoutError as e: + # print('Timeout:', str(e)) + return [False, "Timeout: " + str(e)] diff --git a/slixfeed/feedhandler.py b/slixfeed/feedhandler.py new file mode 100644 index 0000000..10ae3f0 --- /dev/null +++ b/slixfeed/feedhandler.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from urllib.parse import urlparse + +async def get_title(url, feed): + """ + Get title of feed. + + :param url: URL + :param feed: Parsed feed + :return: Title or URL hostname. 
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import asyncio
import os
import slixmpp

import confighandler
import datahandler
import sqlitehandler

# Per-JID registry of scheduled update tasks (managed by refresh_task).
task_manager = {}


class Slixfeed(slixmpp.ClientXMPP):
    """
    Slixmpp news bot that will send updates
    from feeds it receives.
    """
    def __init__(self, jid, password):
        slixmpp.ClientXMPP.__init__(self, jid, password)

        # session_start fires once the connection is established and the
        # XML streams are ready; hook roster setup and database discovery.
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("session_start", self.select_file)
        # self.add_event_handler("session_start", self.send_status)
        # self.add_event_handler("session_start", self.check_updates)

        # The message event fires for every incoming message stanza,
        # which includes MUC messages and error messages as well.
        self.add_event_handler("message", self.message)
        self.add_event_handler("disconnected", self.reconnect)
        # Keep a handle on the running event loop for call_at scheduling.
        self.loop = asyncio.get_event_loop()
+ self.add_event_handler("session_start", self.start) + self.add_event_handler("session_start", self.select_file) + # self.add_event_handler("session_start", self.send_status) + # self.add_event_handler("session_start", self.check_updates) + + # The message event is triggered whenever a message + # stanza is received. Be aware that that includes + # MUC messages and error messages. + self.add_event_handler("message", self.message) + self.add_event_handler("disconnected", self.reconnect) + # Initialize event loop + self.loop = asyncio.get_event_loop() + + async def start(self, event): + """ + Process the session_start event. + + Typical actions for the session_start event are + requesting the roster and broadcasting an initial + presence stanza. + + Arguments: + event -- An empty dictionary. The session_start + event does not provide any additional + data. + """ + self.send_presence() + await self.get_roster() + + async def message(self, event, msg): + """ + Process incoming message stanzas. Be aware that this also + includes MUC messages and error messages. It is usually + a good idea to check the messages's type before processing + or sending replies. + + Arguments: + msg -- The received message stanza. See the documentation + for stanza objects and the Message stanza to see + how it may be used. 
+ """ + if msg['type'] in ('chat', 'normal'): + jid = msg['from'].bare + message = " ".join(msg['body'].split()) + if message.lower().startswith('help'): + action = print_help() + # NOTE: Might not need it + elif message.lower().startswith('add '): + action = await initdb(jid, datahandler.add_feed, message[4:]) + # action = "> " + message + "\n" + action + elif message.lower().startswith('quantum '): + key = message[:7] + val = message[8:] + # action = "Every update will contain {} news items.".format(action) + action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + await self.refresh_task(jid, key, val) + elif message.lower().startswith('disable'): + # action = "Updates are disabled." + action = await initdb(jid, sqlitehandler.set_settings_value, message) + await self.refresh_task(jid, "enabled", 0) + elif message.lower().startswith('enable'): + # action = "Updates are enabled." + action = await initdb(jid, sqlitehandler.set_settings_value, message) + await self.refresh_task(jid, "enabled", 1) + elif message.lower().startswith('interval '): + key = message[:8] + val = message[9:] + # action = "Updates will be sent every {} minutes.".format(action) + action = await initdb(jid, sqlitehandler.set_settings_value, [key, val]) + await self.refresh_task(event, jid, key, val) + elif message.lower().startswith('list'): + action = await initdb(jid, sqlitehandler.list_subscriptions) + elif message.lower().startswith('recent '): + action = await initdb(jid, sqlitehandler.last_entries, message[7:]) + elif message.lower().startswith('remove '): + action = await initdb(jid, sqlitehandler.remove_feed, message[7:]) + elif message.lower().startswith('search '): + action = await initdb(jid, sqlitehandler.search_entries, message[7:]) + elif message.lower().startswith('status '): + action = await initdb(jid, sqlitehandler.toggle_status, message[7:]) + elif message.lower().startswith('unread'): + action = await initdb(jid, sqlitehandler.statistics) + else: + action = 
"Unknown command. Press \"help\" for list of commands" + msg.reply(action).send() + + print("COMMAND:", message) + print("ACCOUNT: " + str(msg['from'])) + + async def select_file(self, event): + """ + Initiate actions by JID (Jabber ID). + + :param self: Self + :param event: Event + """ + while True: + db_dir = confighandler.get_default_dbdir() + if not os.path.isdir(db_dir): + msg = ("Slixfeed can not work without a database. \n" + "To create a database, follow these steps: \n" + "Add Slixfeed contact to your roster \n" + "Send a feed to the bot by: \n" + "add https://reclaimthenet.org/feed/") + print(msg) + else: + os.chdir(db_dir) + files = os.listdir() + async with asyncio.TaskGroup() as tg: + print("main task") + print(repr(tg)) + for file in files: + if file.endswith('.db') and not file.endswith('.db-jour.db'): + jid = file[:-3] + tg.create_task(self.jid(event, jid)) + # task_manager.update({jid: tg}) + print(task_manager) + print(jid, repr(tg)) + print(jid, id(tg)) + + async def jid(self, event, jid): + """ + JID (Jabber ID) task manager. + + :param self: Self + :param event: Event + :param jid: Jabber ID + """ + enabled = await initdb( + jid, + sqlitehandler.get_settings_value, + 'enabled' + ) + print("enabled", enabled, jid) + if enabled: + async with asyncio.TaskGroup() as tg: + print("sub task") + print(repr(self)) + print(id(self)) + print(repr(tg)) + print(id(tg)) + tg.create_task(self.check_updates(event, jid)) + # tg.create_task(self.send_update(event, jid)) + task_manager[jid] = {} + task_manager[jid]['interval'] = tg.create_task(self.send_update(event, jid)) + print(task_manager[jid]) + tg.create_task(self.send_status(event, jid)) + else: + await self.send_status(event, jid) + + async def check_updates(self, event, jid): + """ + Start calling for update check up. 
+ + :param self: Self + :param event: Event + :param jid: Jabber ID + """ + while True: + print("> CHCK UPDATE",jid) + await initdb(jid, datahandler.download_updates) + await asyncio.sleep(60 * 90) + # Schedule to call this function again in 90 minutes + # self.loop.call_at(self.loop.time() + 60 * 90, self.loop.create_task, self.check_updates(event, jid)) + + async def send_update(self, event, jid): + """ + Send news items as messages. + + :param self: Self + :param event: Event + :param jid: Jabber ID + """ + new = await initdb( + jid, + sqlitehandler.get_entry_unread + ) + if new: + print("> SEND UPDATE",jid) + self.send_message( + mto=jid, + mbody=new, + mtype='chat' + ) + interval = await initdb( + jid, + sqlitehandler.get_settings_value, + 'interval' + ) + # await asyncio.sleep(60 * interval) + self.loop.call_at(self.loop.time() + 60 * interval, self.loop.create_task, self.send_update(event, jid)) + + async def send_status(self, event, jid): + """ + Send status message. + + :param self: Self + :param event: Event + :param jid: Jabber ID + """ + print("> SEND STATUS",jid) + unread = await initdb( + jid, + sqlitehandler.get_number_of_entries_unread + ) + + if unread: + msg_status = "📰 News items: {}".format(str(unread)) + typ_status = "chat" + else: + msg_status = "🗞 No News" + typ_status = "available" + + enabled = await initdb( + jid, + sqlitehandler.get_settings_value, + 'enabled' + ) + + if not enabled: + typ_status = "xa" + + # print(msg_status, 'for', jid) + self.send_presence( + pshow=typ_status, + pstatus=msg_status, + pto=jid, + #pfrom=None + ) + # await asyncio.sleep(60 * 20) + self.loop.call_at(self.loop.time() + 60 * 20, self.loop.create_task, self.send_status(event, jid)) + + + async def refresh_task(self, event, jid, key, val): + """ + Apply settings on runtime. 
def print_help():
    """
    Build the help manual text.

    :return: Multi-line help string.
    """
    msg = ("Slixfeed - News syndication bot for Jabber/XMPP \n"
           "\n"
           "DESCRIPTION: \n"
           " Slixfeed is a news aggregator bot for online news feeds. \n"
           " Supported filetypes: Atom, RDF and RSS. \n"
           "\n"
           "BASIC USAGE: \n"
           " enable \n"
           " Send updates. \n"
           " disable \n"
           " Stop sending updates. \n"
           " batch N \n"
           # was "ech interval" — typo in user-facing text.
           " Send N updates on each interval. \n"
           " interval N \n"
           " Send an update each N minutes. \n"
           " feed list \n"
           " List subscriptions. \n"
           "\n"
           "EDIT OPTIONS: \n"
           " add URL \n"
           " Add URL to subscription list. \n"
           " remove ID \n"
           " Remove feed from subscription list. \n"
           " status ID \n"
           " Toggle update status of feed. \n"
           "\n"
           "SEARCH OPTIONS: \n"
           " search TEXT \n"
           " Search news items by given keywords. \n"
           " recent N \n"
           " List recent N news items (up to 50 items). \n"
           "\n"
           "STATISTICS OPTIONS: \n"
           " analyses \n"
           " Show report and statistics of feeds. \n"
           " obsolete \n"
           " List feeds that are not available. \n"
           " unread \n"
           " Print number of unread news items. \n"
           "\n"
           "BACKUP OPTIONS: \n"
           " export opml \n"
           " Send an OPML file with your feeds. \n"
           " backup news html\n"
           " Send an HTML formatted file of your news items. \n"
           " backup news md \n"
           " Send a Markdown file of your news items. \n"
           " backup news text \n"
           " Send a Plain Text file of your news items. \n"
           "\n"
           "DOCUMENTATION: \n"
           " Slixfeed \n"
           " https://gitgud.io/sjehuda/slixfeed \n"
           " Slixmpp \n"
           " https://slixmpp.readthedocs.io/ \n"
           " feedparser \n"
           " https://pythonhosted.org/feedparser")
    return msg


# TODO Perhaps this needs to be executed
# just once per program execution
async def initdb(jid, callback, message=None):
    """
    Callback function to instantiate action on database.

    :param jid: JID (Jabber ID).
    :param callback: Function name.
    :param message: Optional argument when a message is a part or
                    required argument of the callback.
    """
    db_dir = confighandler.get_default_dbdir()
    if not os.path.isdir(db_dir):
        os.mkdir(db_dir)
    db_file = os.path.join(db_dir, r"{}.db".format(jid))
    sqlitehandler.create_tables(db_file)
    # await sqlitehandler.set_default_values(db_file)
    # was "if message:" — a falsy-but-meaningful argument (0, "")
    # would silently be dropped; test identity against None instead.
    if message is not None:
        return await callback(db_file, message)
    else:
        return await callback(db_file)