Update 6 files

- /slixfeed/database.py
- /slixfeed/sqlitehandler.py
- /slixfeed/datahandler.py
- /slixfeed/confighandler.py
- /slixfeed/xmpphandler.py
- /slixfeed/__main__.py
Schimon Jehudah 2023-11-02 05:14:01 +00:00
parent aa8c35d728
commit 8511239e2c
5 changed files with 1128 additions and 731 deletions

slixfeed/__main__.py

@@ -13,652 +13,26 @@
 # jid = Jabber ID (XMPP)
 # res = response (HTTP)
-import os
 from argparse import ArgumentParser
-from asyncio.exceptions import IncompleteReadError
-from datetime import date
 from getpass import getpass
-from http.client import IncompleteRead
-from urllib import error
-import asyncio
 import logging
-import sys
-import time
-import aiohttp
-from bs4 import BeautifulSoup
-from xml.etree.ElementTree import ElementTree, ParseError
-from urllib.parse import urlparse
-from lxml import html
-import feedparser
-import slixmpp
+from datetime import date
+import time
 # from eliot import start_action, to_file
 # # to_file(open("slixfeed.log", "w"))
 # # with start_action(action_type="set_date()", jid=jid):
 # # with start_action(action_type="message()", msg=msg):
-import database
+#import irchandler
+import xmpphandler
+#import matrixhandler
class Slixfeed(slixmpp.ClientXMPP):
"""
Slixmpp news bot that will send updates
from feeds it receives.
"""
def __init__(self, jid, password):
slixmpp.ClientXMPP.__init__(self, jid, password)
# The session_start event will be triggered when
# the bot establishes its connection with the server
# and the XML streams are ready for use. We want to
# listen for this event so that we can initialize
# our roster.
self.add_event_handler("session_start", self.start)
self.add_event_handler("session_start", self.send_update)
self.add_event_handler("session_start", self.send_status)
self.add_event_handler("session_start", self.check_updates)
# The message event is triggered whenever a message
# stanza is received. Be aware that this includes
# MUC messages and error messages.
self.add_event_handler("message", self.message)
self.add_event_handler("disconnected", self.reconnect)
async def start(self, event):
"""
Process the session_start event.
Typical actions for the session_start event are
requesting the roster and broadcasting an initial
presence stanza.
Arguments:
event -- An empty dictionary. The session_start
event does not provide any additional
data.
"""
self.send_presence()
await self.get_roster()
async def message(self, msg):
"""
Process incoming message stanzas. Be aware that this also
includes MUC messages and error messages. It is usually
a good idea to check the message's type before processing
or sending replies.
Arguments:
msg -- The received message stanza. See the documentation
for stanza objects and the Message stanza to see
how it may be used.
"""
if msg['type'] in ('chat', 'normal'):
message = " ".join(msg['body'].split())
if message.lower().startswith('help'):
action = print_help()
# NOTE: Might not need it
elif message.lower().startswith('recent '):
action = await initdb(msg['from'].bare, database.last_entries, message[7:])
elif message.lower().startswith('search '):
action = await initdb( msg['from'].bare, database.search_entries, message[7:])
elif message.lower().startswith('list'):
action = await initdb(msg['from'].bare, database.list_subscriptions)
elif message.lower().startswith('add '):
action = await initdb(msg['from'].bare, add_feed, message[4:])
elif message.lower().startswith('remove '):
action = await initdb(msg['from'].bare, database.remove_feed, message[7:])
elif message.lower().startswith('status '):
action = await initdb(msg['from'].bare, database.toggle_status, message[7:])
elif message.lower().startswith('unread'):
action = await initdb(msg['from'].bare, database.statistics)
elif message.lower().startswith('enable'):
action = toggle_state(msg['from'].bare, True)
elif message.lower().startswith('disable'):
action = toggle_state(msg['from'].bare, False)
else:
action = 'Unknown command. Send "help" for a list of commands'
msg.reply(action).send()
print("COMMAND:", message)
print("ACCOUNT: " + str(msg['from']))
async def check_updates(self, event):
while True:
print("Checking update")
db_dir = get_default_dbdir()
if not os.path.isdir(db_dir):
msg = ("Slixfeed can not work without a database. \n"
"To create a database, follow these steps: \n"
"Add Slixfeed contact to your roster \n"
"Send a feed to the bot by: \n"
"feed add https://reclaimthenet.org/feed/")
print(msg)
else:
files = os.listdir(db_dir)
for file in files:
jid = file[:-3]
print("download_updates",jid)
await initdb(jid, download_updates)
await asyncio.sleep(90)
async def send_update(self, event):
while True:
db_dir = get_default_dbdir()
if not os.path.isdir(db_dir):
msg = ("Slixfeed can not work without a database. \n"
"To create a database, follow these steps: \n"
"Add Slixfeed contact to your roster \n"
"Send a feed to the bot by: \n"
"feed add https://reclaimthenet.org/feed/")
print(msg)
else:
os.chdir(db_dir)
files = os.listdir()
for file in files:
if not file.endswith('.db-jour.db'):
jid = file[:-3]
print("get_entry_unread",jid)
new = await initdb(
jid,
database.get_entry_unread
)
if new:
msg = self.send_message(
mto=jid,
mbody=new,
mtype='chat'
)
unread = await initdb(
jid,
database.get_number_of_entries_unread
)
if unread:
msg_status = ('📰 News items:', str(unread))
msg_status = ' '.join(msg_status)
else:
msg_status = '🗞 No News'
print(msg_status, 'for', jid)
# Send status message
self.send_presence(
pstatus=msg_status,
pto=jid,
#pfrom=None
)
# await asyncio.sleep(15)
await asyncio.sleep(60 * 3)
async def send_status(self, event):
while True:
db_dir = get_default_dbdir()
if not os.path.isdir(db_dir):
msg = ("Slixfeed can not work without a database. \n"
"To create a database, follow these steps: \n"
"Add Slixfeed contact to your roster \n"
"Send a feed to the bot by: \n"
"feed add https://reclaimthenet.org/feed/")
print(msg)
else:
files = os.listdir(db_dir)
for file in files:
jid = file[:-3]
await asyncio.sleep(60)
def print_help():
"""
Print help manual.
"""
msg = ("Slixfeed - News syndication bot for Jabber/XMPP \n"
"\n"
"DESCRIPTION: \n"
" Slixfeed is a news aggregator bot for online news feeds. \n"
" Supported filetypes: Atom, RDF and RSS. \n"
"\n"
"BASIC USAGE: \n"
" enable \n"
" Send updates. \n"
" disable \n"
" Stop sending updates. \n"
" batch N \n"
" Send N updates on ech interval. \n"
" interval N \n"
" Send an update each N minutes. \n"
" feed list \n"
" List subscriptions. \n"
"\n"
"EDIT OPTIONS: \n"
" add URL \n"
" Add URL to subscription list. \n"
" remove ID \n"
" Remove feed from subscription list. \n"
" status ID \n"
" Toggle update status of feed. \n"
"\n"
"SEARCH OPTIONS: \n"
" search TEXT \n"
" Search news items by given keywords. \n"
" recent N \n"
" List recent N news items (up to 50 items). \n"
"\n"
"STATISTICS OPTIONS: \n"
" analyses \n"
" Show report and statistics of feeds. \n"
" obsolete \n"
" List feeds that are not available. \n"
" unread \n"
" Print number of unread news items. \n"
"\n"
"BACKUP OPTIONS: \n"
" export opml \n"
" Send an OPML file with your feeds. \n"
" backup news html\n"
" Send an HTML formatted file of your news items. \n"
" backup news md \n"
" Send a Markdown file of your news items. \n"
" backup news text \n"
" Send a Plain Text file of your news items. \n"
"\n"
"DOCUMENTATION: \n"
" Slixfeed \n"
" https://gitgud.io/sjehuda/slixfeed \n"
" Slixmpp \n"
" https://slixmpp.readthedocs.io/ \n"
" feedparser \n"
" https://pythonhosted.org/feedparser")
return msg
# Function from jarun/buku
# Arun Prakash Jana (jarun)
# Dmitry Marakasov (AMDmi3)
def get_default_dbdir():
"""Determine the directory path where dbfile will be stored.
If $XDG_DATA_HOME is defined, use it
else if $HOME exists, use it
else if the platform is Windows, use %APPDATA%
else use the current directory.
:return: Path to database file.
Note
----
This code was taken from the buku project.
"""
# data_home = xdg.BaseDirectory.xdg_data_home
data_home = os.environ.get('XDG_DATA_HOME')
if data_home is None:
if os.environ.get('HOME') is None:
if sys.platform == 'win32':
data_home = os.environ.get('APPDATA')
if data_home is None:
return os.path.abspath('.')
else:
return os.path.abspath('.')
else:
data_home = os.path.join(os.environ.get('HOME'), '.local', 'share')
return os.path.join(data_home, 'slixfeed')
# TODO Perhaps this needs to be executed
# just once per program execution
async def initdb(jid, callback, message=None):
"""
Callback function to instantiate action on database.
:param jid: JID (Jabber ID).
:param callback: Function name.
:param message: Optional argument, passed when the callback requires a message.
"""
db_dir = get_default_dbdir()
if not os.path.isdir(db_dir):
os.mkdir(db_dir)
db_file = os.path.join(db_dir, r"{}.db".format(jid))
database.create_tables(db_file)
if message:
return await callback(db_file, message)
else:
return await callback(db_file)
# NOTE I don't think there should be "return"
# because then we might stop scanning next URLs
async def download_updates(db_file):
"""
Check feeds for new entries.
:param db_file: Database filename.
"""
urls = await database.get_subscriptions(db_file)
for url in urls:
# print("for url in urls")
source = url[0]
# print("source: ", source)
res = await download_feed(source)
# TypeError: 'NoneType' object is not subscriptable
if res is None:
# Skip to next feed
# urls.next()
# next(urls)
continue
await database.update_source_status(db_file, res[1], source)
if res[0]:
try:
feed = feedparser.parse(res[0])
if feed.bozo:
bozo = ("WARNING: Bozo detected for feed <{}>. "
"For more information, visit "
"https://pythonhosted.org/feedparser/bozo.html"
.format(source))
print(bozo)
valid = 0
else:
valid = 1
await database.update_source_validity(db_file, source, valid)
except (IncompleteReadError, IncompleteRead, error.URLError) as e:
print(e)
# return
# TODO Place these couple of lines back down
# NOTE Need to correct the SQL statement to do so
# NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW
if res[1] == 200:
# NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW
# TODO Place these couple of lines back down
# NOTE Need to correct the SQL statement to do so
entries = feed.entries
# length = len(entries)
# await database.remove_entry(db_file, source, length)
await database.remove_nonexistent_entries(db_file, feed, source)
new_entry = 0
for entry in entries:
if entry.has_key("title"):
title = entry.title
else:
title = feed["feed"]["title"]
if entry.has_key("link"):
link = entry.link
else:
link = source
# print('source:', source)
exist = await database.check_entry_exist(db_file, title, link)
if not exist:
new_entry = new_entry + 1
# TODO Enhance summary
if entry.has_key("summary"):
summary = entry.summary
# Remove HTML tags
summary = BeautifulSoup(summary, "lxml").text
# TODO Limit text length
summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨"
else:
summary = '*** No summary ***'
entry = (title, summary, link, source, 0);
await database.add_entry_and_set_date(db_file, source, entry)
# print("### added", new_entry, "entries")
async def download_feed(url):
"""
Download content of given URL.
:param url: URL.
:return: Document or error message.
"""
# print("download_feed")
# time.sleep(1)
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession() as session:
# async with aiohttp.ClientSession(trust_env=True) as session:
try:
async with session.get(url, timeout=timeout) as response:
status = response.status
if response.status == 200:
try:
doc = await response.text()
# print (response.content_type)
return [doc, status]
except:
return [False, "The content of this document doesn't appear to be textual"]
else:
return [False, "HTTP Error: " + str(status)]
except aiohttp.ClientError as e:
print('Error', str(e))
return [False, "Error: " + str(e)]
except asyncio.TimeoutError as e:
print('Timeout', str(e))
return [False, "Timeout"]
async def add_feed(db_file, url):
"""
Check whether feed exist, otherwise process it.
:param db_file: Database filename.
:param url: URL.
:return: Status message.
"""
exist = await database.check_feed_exist(db_file, url)
if not exist:
res = await download_feed(url)
if res[0]:
feed = feedparser.parse(res[0])
if feed.bozo:
bozo = ("WARNING: Bozo detected. Failed to load <{}>.".format(url))
print(bozo)
try:
# tree = etree.fromstring(res[0]) # etree is for xml
tree = html.fromstring(res[0])
except:
return "Failed to parse URL <{}> as feed".format(url)
print("RSS Auto-Discovery Engaged")
xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]"""
# xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]/@href"""
# xpath_query = "//link[@rel='alternate' and @type='application/atom+xml' or @rel='alternate' and @type='application/rss+xml' or @rel='alternate' and @type='application/rdf+xml']/@href"
feeds = tree.xpath(xpath_query)
if len(feeds) > 1:
msg = "RSS Auto-Discovery has found {} feeds:\n\n".format(len(feeds))
for feed in feeds:
# # The following code works;
# # The following code will catch
# # only valid resources (i.e. not 404);
# # The following code requires more bandwidth.
# res = await download_feed(feed)
# if res[0]:
# disco = feedparser.parse(res[0])
# title = disco["feed"]["title"]
# msg += "{} \n {} \n\n".format(title, feed)
feed_name = feed.xpath('@title')[0]
feed_addr = feed.xpath('@href')[0]
msg += "{}\n{}\n\n".format(feed_name, feed_addr)
msg += "The above feeds were extracted from\n{}".format(url)
return msg
elif feeds:
url = feeds[0].xpath('@href')[0]
# Why wouldn't add_feed return a message
# upon success unless return is explicitly
# mentioned, yet upon failure it wouldn't?
return await add_feed(db_file, url)
# Search for feeds by file extension and path
paths = [
"/app.php/feed", # phpbb
"/atom",
"/atom.php",
"/atom.xml",
"/content-feeds/",
"/external.php?type=RSS2",
"/feed", # good practice
"/feed.atom",
# "/feed.json",
"/feed.php",
"/feed.rdf",
"/feed.rss",
"/feed.xml",
"/feed/atom/",
"/feeds/news_feed",
"/feeds/rss/news.xml.php",
"/forum_rss.php",
"/index.php/feed",
"/index.php?type=atom;action=.xml", #smf
"/index.php?type=rss;action=.xml", #smf
"/index.rss",
"/latest.rss",
"/news",
"/news.xml",
"/news.xml.php",
"/news/feed",
"/posts.rss", # discourse
"/rdf",
"/rdf.php",
"/rdf.xml",
"/rss",
# "/rss.json",
"/rss.php",
"/rss.xml",
"/timeline.rss",
"/xml/feed.rss",
# "?format=atom",
# "?format=rdf",
# "?format=rss",
# "?format=xml"
]
print("RSS Scan Mode Engaged")
feeds = {}
for path in paths:
# xpath_query = "//*[@*[contains(.,'{}')]]".format(path)
xpath_query = "//a[contains(@href,'{}')]".format(path)
addresses = tree.xpath(xpath_query)
parted_url = urlparse(url)
for address in addresses:
address = address.xpath('@href')[0]
if address.startswith('/'):
address = parted_url.scheme + '://' + parted_url.netloc + address
res = await download_feed(address)
if res[1] == 200:
try:
feeds[address] = feedparser.parse(res[0])["feed"]["title"]
except:
continue
if len(feeds) > 1:
msg = "RSS URL scan has found {} feeds:\n\n".format(len(feeds))
for feed in feeds:
# try:
# res = await download_feed(feed)
# except:
# continue
feed_name = feeds[feed]
feed_addr = feed
msg += "{}\n{}\n\n".format(feed_name, feed_addr)
msg += "The above feeds were extracted from\n{}".format(url)
return msg
elif feeds:
url = list(feeds)[0]
return await add_feed(db_file, url)
# (HTTP) Request(s) Paths
print("RSS Arbitrary Mode Engaged")
feeds = {}
parted_url = urlparse(url)
for path in paths:
address = parted_url.scheme + '://' + parted_url.netloc + path
res = await download_feed(address)
if res[1] == 200:
# print(feedparser.parse(res[0])["feed"]["title"])
# feeds[address] = feedparser.parse(res[0])["feed"]["title"]
try:
title = feedparser.parse(res[0])["feed"]["title"]
except:
title = '*** No Title ***'
feeds[address] = title
# Check whether URL has path (i.e. not root)
if parted_url.path.split('/')[1]:
paths.extend([".atom", ".feed", ".rdf", ".rss"]) if '.rss' not in paths else -1
# if paths.index('.rss'):
# paths.extend([".atom", ".feed", ".rdf", ".rss"])
address = parted_url.scheme + '://' + parted_url.netloc + '/' + parted_url.path.split('/')[1] + path
res = await download_feed(address)
if res[1] == 200:
print('ATTENTION')
print(address)
try:
title = feedparser.parse(res[0])["feed"]["title"]
except:
title = '*** No Title ***'
feeds[address] = title
if len(feeds) > 1:
msg = "RSS URL discovery has found {} feeds:\n\n".format(len(feeds))
for feed in feeds:
feed_name = feeds[feed]
feed_addr = feed
msg += "{}\n{}\n\n".format(feed_name, feed_addr)
msg += "The above feeds were extracted from\n{}".format(url)
return msg
elif feeds:
url = list(feeds)[0]
return await add_feed(db_file, url)
else:
return "No news feeds were found for URL <{}>.".format(url)
else:
return await database.add_feed(db_file, feed, url, res)
else:
return "Failed to get URL <{}>. Reason: {}".format(url, res[1])
else:
ix = exist[0]
return "News source <{}> is already listed in the subscription list at index {}".format(url, ix)
def toggle_state(jid, state):
"""
Set status of update.
:param jid: JID (Jabber ID).
:param state: True or False.
:return: Status message.
"""
db_dir = get_default_dbdir()
db_file = os.path.join(db_dir, r"{}.db".format(jid))
bk_file = os.path.join(db_dir, r"{}.db.bak".format(jid))
if state:
if os.path.exists(db_file):
return "Updates are already enabled"
elif os.path.exists(bk_file):
os.renames(bk_file, db_file)
return "Updates are now enabled"
else:
if os.path.exists(bk_file):
return "Updates are already disabled"
elif os.path.exists(db_file):
os.renames(db_file, bk_file)
return "Updates are now disabled"
 if __name__ == '__main__':
 # Setup the command line arguments.
-parser = ArgumentParser(description=Slixfeed.__doc__)
+parser = ArgumentParser(description=xmpphandler.Slixfeed.__doc__)
 # Output verbosity options.
 parser.add_argument(
@@ -692,7 +66,7 @@ if __name__ == '__main__':
 # Setup the Slixfeed and register plugins. Note that while plugins may
 # have interdependencies, the order in which you register them does
 # not matter.
-xmpp = Slixfeed(args.jid, args.password)
+xmpp = xmpphandler.Slixfeed(args.jid, args.password)
 xmpp.register_plugin('xep_0004') # Data Forms
 xmpp.register_plugin('xep_0030') # Service Discovery
 xmpp.register_plugin('xep_0045') # Multi-User Chat

slixfeed/confighandler.py (new file, 156 lines added)

@@ -0,0 +1,156 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
def get_default_dbdir():
"""
Determine the directory path where dbfile will be stored.
If $XDG_DATA_HOME is defined, use it
else if $HOME exists, use it
else if the platform is Windows, use %APPDATA%
else use the current directory.
:return: Path to database file.
Note
----
This code was taken from the buku project.
* Arun Prakash Jana (jarun)
* Dmitry Marakasov (AMDmi3)
"""
# data_home = xdg.BaseDirectory.xdg_data_home
data_home = os.environ.get('XDG_DATA_HOME')
if data_home is None:
if os.environ.get('HOME') is None:
if sys.platform == 'win32':
data_home = os.environ.get('APPDATA')
if data_home is None:
return os.path.abspath('.')
else:
return os.path.abspath('.')
else:
data_home = os.path.join(os.environ.get('HOME'), '.local', 'share')
return os.path.join(data_home, 'slixfeed')
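
The fallback chain above can be exercised in isolation. A minimal sketch; the XDG path below is a made-up example, not part of this commit:

import os
import confighandler

os.environ["XDG_DATA_HOME"] = "/tmp/xdg-demo"   # example value
print(confighandler.get_default_dbdir())        # -> /tmp/xdg-demo/slixfeed
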
def get_default_confdir():
"""
Determine the directory path where configuration will be stored.
If $XDG_CONFIG_HOME is defined, use it
else if $HOME exists, use it
else if the platform is Windows, use %APPDATA%
else use the current directory.
:return: Path to configuration directory.
"""
# config_home = xdg.BaseDirectory.xdg_config_home
config_home = os.environ.get('XDG_CONFIG_HOME')
if config_home is None:
if os.environ.get('HOME') is None:
if sys.platform == 'win32':
config_home = os.environ.get('APPDATA')
if config_home is None:
return os.path.abspath('.')
else:
return os.path.abspath('.')
else:
config_home = os.path.join(os.environ.get('HOME'), '.config')
return os.path.join(config_home, 'slixfeed')
async def get_value_default(key):
"""
Get settings default value.
:param key: "enabled", "interval", "quantum".
:return: Integer.
"""
if key == "enabled":
result = 1
elif key == "quantum":
result = 4
elif key == "interval":
result = 30
return result
# async def generate_dictionary():
def get_default_list():
"""
Generate a dictionary file.
:return: List.
"""
paths = [
".atom",
".rss",
".xml",
"/?feed=atom",
"/?feed=rdf",
"/?feed=rss",
"/?feed=xml", # wordpress
"/?format=atom",
"/?format=rdf",
"/?format=rss",
"/?format=xml", # phpbb
"/app.php/feed",
"/atom",
"/atom.php",
"/atom.xml",
"/blog/feed/",
"/content-feeds/",
"/external.php?type=RSS2",
"/en/feed/",
"/feed", # good practice
"/feed.atom",
# "/feed.json",
"/feed.php",
"/feed.rdf",
"/feed.rss",
"/feed.xml",
"/feed/atom/",
"/feeds/news_feed",
"/feeds/posts/default",
"/feeds/posts/default?alt=atom",
"/feeds/posts/default?alt=rss",
"/feeds/rss/news.xml.php",
"/forum_rss.php",
"/index.atom",
"/index.php/feed",
"/index.php?type=atom;action=.xml", #smf
"/index.php?type=rss;action=.xml", #smf
"/index.rss",
"/jekyll/feed.xml",
"/latest.rss",
"/news",
"/news.xml",
"/news.xml.php",
"/news/feed",
"/posts.rss", # discourse
"/rdf",
"/rdf.php",
"/rdf.xml",
"/rss",
# "/rss.json",
"/rss.php",
"/rss.xml",
"/timeline.rss",
"/videos.atom",
# "/videos.json",
"/videos.xml",
"/xml/feed.rss"
]
return paths
# cfg_dir = get_default_confdir()
# if not os.path.isdir(cfg_dir):
# os.mkdir(cfg_dir)
# cfg_file = os.path.join(cfg_dir, r"url_paths.txt")
# if not os.path.isfile(cfg_file):
# file = open(cfg_file, "w")
# file.writelines("\n".join(paths))
# file.close()

slixfeed/datahandler.py (new file, 300 lines added)

@@ -0,0 +1,300 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import feedparser
import aiohttp
import asyncio
import os
import sqlitehandler
import confighandler
from http.client import IncompleteRead
from asyncio.exceptions import IncompleteReadError
from urllib import error
from bs4 import BeautifulSoup
# from xml.etree.ElementTree import ElementTree, ParseError
from urllib.parse import urlparse
from lxml import html
async def download_updates(db_file):
"""
Check feeds for new entries.
:param db_file: Database filename.
"""
urls = await sqlitehandler.get_subscriptions(db_file)
for url in urls:
# print(os.path.basename(db_file), url[0])
source = url[0]
res = await download_feed(source)
# TypeError: 'NoneType' object is not subscriptable
if res is None:
# Skip to next feed
# urls.next()
# next(urls)
continue
await sqlitehandler.update_source_status(db_file, res[1], source)
if res[0]:
try:
feed = feedparser.parse(res[0])
if feed.bozo:
# bozo = ("WARNING: Bozo detected for feed <{}>. "
# "For more information, visit "
# "https://pythonhosted.org/feedparser/bozo.html"
# .format(source))
# print(bozo)
valid = 0
else:
valid = 1
await sqlitehandler.update_source_validity(db_file, source, valid)
except (IncompleteReadError, IncompleteRead, error.URLError) as e:
print(e)
# NOTE I don't think there should be "return"
# because then we might stop scanning next URLs
# return
# TODO Place these couple of lines back down
# NOTE Need to correct the SQL statement to do so
# NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW
if res[1] == 200:
# NOT SURE WHETHER I MEANT THE LINES ABOVE OR BELOW
# TODO Place these couple of lines back down
# NOTE Need to correct the SQL statement to do so
entries = feed.entries
# length = len(entries)
# await sqlitehandler.remove_entry(db_file, source, length)
await sqlitehandler.remove_nonexistent_entries(db_file, feed, source)
new_entry = 0
for entry in entries:
if entry.has_key("title"):
title = entry.title
else:
title = feed["feed"]["title"]
if entry.has_key("link"):
link = entry.link
else:
link = source
exist = await sqlitehandler.check_entry_exist(db_file, title, link)
if not exist:
new_entry = new_entry + 1
# TODO Enhance summary
if entry.has_key("summary"):
summary = entry.summary
# Remove HTML tags
summary = BeautifulSoup(summary, "lxml").text
# TODO Limit text length
summary = summary.replace("\n\n", "\n")[:300] + " ‍⃨"
else:
summary = '*** No summary ***'
entry = (title, summary, link, source, 0);
await sqlitehandler.add_entry_and_set_date(db_file, source, entry)
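
For reference, the bozo flag checked above is feedparser's marker for input that is not well formed; a standalone illustration with an invented, truncated document:

import feedparser

result = feedparser.parse("<rss><channel><title>truncated")
print(result.bozo)  # 1, i.e. the parser hit ill-formed XML
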
async def add_feed(db_file, url):
"""
Check whether feed exist, otherwise process it.
:param db_file: Database filename.
:param url: URL.
:return: Status message.
"""
exist = await sqlitehandler.check_feed_exist(db_file, url)
if not exist:
res = await download_feed(url)
if res[0]:
feed = feedparser.parse(res[0])
title = await get_title(url, feed)
if feed.bozo:
bozo = ("WARNING: Bozo detected. Failed to load <{}>.".format(url))
print(bozo)
try:
# tree = etree.fromstring(res[0]) # etree is for xml
tree = html.fromstring(res[0])
except:
return "Failed to parse URL <{}> as feed".format(url)
print("RSS Auto-Discovery Engaged")
xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]"""
# xpath_query = """//link[(@rel="alternate") and (@type="application/atom+xml" or @type="application/rdf+xml" or @type="application/rss+xml")]/@href"""
# xpath_query = "//link[@rel='alternate' and @type='application/atom+xml' or @rel='alternate' and @type='application/rss+xml' or @rel='alternate' and @type='application/rdf+xml']/@href"
feeds = tree.xpath(xpath_query)
if len(feeds) > 1:
msg = "RSS Auto-Discovery has found {} feeds:\n\n".format(len(feeds))
for feed in feeds:
# # The following code works;
# # The following code will catch
# # only valid resources (i.e. not 404);
# # The following code requires more bandwidth.
# res = await download_feed(feed)
# if res[0]:
# disco = feedparser.parse(res[0])
# title = disco["feed"]["title"]
# msg += "{} \n {} \n\n".format(title, feed)
feed_name = feed.xpath('@title')[0]
feed_addr = feed.xpath('@href')[0]
msg += "{}\n{}\n\n".format(feed_name, feed_addr)
msg += "The above feeds were extracted from\n{}".format(url)
return msg
elif feeds:
url = feeds[0].xpath('@href')[0]
# Why wouldn't add_feed return a message
# upon success unless return is explicitly
# mentioned, yet upon failure it wouldn't?
return await add_feed(db_file, url)
print("RSS Scan Mode Engaged")
feeds = {}
paths = []
# TODO Test
cfg_dir = confighandler.get_default_confdir()
if not os.path.isdir(cfg_dir):
os.mkdir(cfg_dir)
cfg_file = os.path.join(cfg_dir, r"url_paths.txt")
if not os.path.isfile(cfg_file):
# confighandler.generate_dictionary()
# NOTE Do not name this "list"; that would shadow
# the builtin used as list(feeds) further down.
default_paths = confighandler.get_default_list()
file = open(cfg_file, "w")
file.writelines("\n".join(default_paths))
file.close()
file = open(cfg_file, "r")
lines = file.readlines()
for line in lines:
paths.extend([line.strip()])
for path in paths:
# xpath_query = "//*[@*[contains(.,'{}')]]".format(path)
xpath_query = "//a[contains(@href,'{}')]".format(path)
addresses = tree.xpath(xpath_query)
parted_url = urlparse(url)
# NOTE Should number of addresses be limited or
# perhaps be N from the start and N from the end
for address in addresses:
address = address.xpath('@href')[0]
if address.startswith('/'):
address = parted_url.scheme + '://' + parted_url.netloc + address
res = await download_feed(address)
if res[1] == 200:
try:
feeds[address] = feedparser.parse(res[0])["feed"]["title"]
except:
continue
if len(feeds) > 1:
msg = "RSS URL scan has found {} feeds:\n\n".format(len(feeds))
for feed in feeds:
# try:
# res = await download_feed(feed)
# except:
# continue
feed_name = feeds[feed]
feed_addr = feed
msg += "{}\n{}\n\n".format(feed_name, feed_addr)
msg += "The above feeds were extracted from\n{}".format(url)
return msg
elif feeds:
url = list(feeds)[0]
return await add_feed(db_file, url)
# (HTTP) Request(s) Paths
print("RSS Arbitrary Mode Engaged")
feeds = {}
parted_url = urlparse(url)
for path in paths:
address = parted_url.scheme + '://' + parted_url.netloc + path
res = await download_feed(address)
if res[1] == 200:
# print(feedparser.parse(res[0])["feed"]["title"])
# feeds[address] = feedparser.parse(res[0])["feed"]["title"]
try:
title = feedparser.parse(res[0])["feed"]["title"]
except:
title = '*** No Title ***'
feeds[address] = title
# Check whether URL has path (i.e. not root)
if parted_url.path.split('/')[1]:
paths.extend([".atom", ".feed", ".rdf", ".rss"]) if '.rss' not in paths else -1
# if paths.index('.rss'):
# paths.extend([".atom", ".feed", ".rdf", ".rss"])
address = parted_url.scheme + '://' + parted_url.netloc + '/' + parted_url.path.split('/')[1] + path
res = await download_feed(address)
if res[1] == 200:
try:
title = feedparser.parse(res[0])["feed"]["title"]
except:
title = '*** No Title ***'
feeds[address] = title
if len(feeds) > 1:
msg = "RSS URL discovery has found {} feeds:\n\n".format(len(feeds))
for feed in feeds:
feed_name = feeds[feed]
feed_addr = feed
msg += "{}\n{}\n\n".format(feed_name, feed_addr)
msg += "The above feeds were extracted from\n{}".format(url)
elif feeds:
url = list(feeds)[0]
msg = await add_feed(db_file, url)
else:
msg = "No news feeds were found for URL <{}>.".format(url)
else:
msg = await sqlitehandler.add_feed(db_file, title, url, res)
else:
msg = "Failed to get URL <{}>. Reason: {}".format(url, res[1])
else:
ix = exist[0]
name = exist[1]
msg = "> {}\nNews source \"{}\" is already listed in the subscription list at index {}".format(url, name, ix)
return msg
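
The auto-discovery step above hinges on a single XPath query over <link rel="alternate"> elements. The same query in a standalone sketch; the HTML snippet is invented for illustration:

from lxml import html

page = html.fromstring(
    '<html><head>'
    '<link rel="alternate" type="application/rss+xml"'
    ' title="Demo Feed" href="/feed.xml"/>'
    '</head><body></body></html>')
links = page.xpath(
    '//link[(@rel="alternate") and ('
    '@type="application/atom+xml" or '
    '@type="application/rdf+xml" or '
    '@type="application/rss+xml")]')
for link in links:
    # Prints: Demo Feed /feed.xml
    print(link.xpath('@title')[0], link.xpath('@href')[0])
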
async def download_feed(url):
"""
Download content of given URL.
:param url: URL.
:return: Document or error message.
"""
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession() as session:
# async with aiohttp.ClientSession(trust_env=True) as session:
try:
async with session.get(url, timeout=timeout) as response:
status = response.status
if response.status == 200:
try:
doc = await response.text()
# print (response.content_type)
return [doc, status]
except:
# return [False, "The content of this document doesn't appear to be textual."]
return [False, "Document is too large or is not textual."]
else:
return [False, "HTTP Error: " + str(status)]
except aiohttp.ClientError as e:
print('Error', str(e))
return [False, "Error: " + str(e)]
except asyncio.TimeoutError as e:
# print('Timeout:', str(e))
return [False, "Timeout: " + str(e)]
async def get_title(url, feed):
"""
Get title of feed.
:param url: URL
:param feed: Parsed feed
:return: Title or URL hostname.
"""
try:
title = feed["feed"]["title"]
except:
title = urlparse(url).netloc
return title
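
The fallback simply reuses the feed URL's host name, e.g.:

from urllib.parse import urlparse
print(urlparse("https://example.org/feed.xml").netloc)  # example.org
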

slixfeed/sqlitehandler.py

@@ -2,12 +2,13 @@
 # -*- coding: utf-8 -*-
 import sqlite3
-from sqlite3 import Error
 import asyncio
+from sqlite3 import Error
 from datetime import date
+import confighandler
 # from eliot import start_action, to_file
 # # with start_action(action_type="list_subscriptions()", db=db_file):
 # # with start_action(action_type="last_entries()", num=num):
@@ -71,12 +72,18 @@ def create_tables(db_file):
 # title text NOT NULL,
 # number integer
 # ); """
-c = conn.cursor()
-# c = get_cursor(db_file)
-c.execute(feeds_table_sql)
-c.execute(entries_table_sql)
-# c.execute(statistics_table_sql)
+settings_table_sql = """
+    CREATE TABLE IF NOT EXISTS settings (
+        id integer PRIMARY KEY,
+        key text NOT NULL,
+        value integer
+    ); """
+cur = conn.cursor()
+# cur = get_cursor(db_file)
+cur.execute(feeds_table_sql)
+cur.execute(entries_table_sql)
+# cur.execute(statistics_table_sql)
+cur.execute(settings_table_sql)

 def get_cursor(db_file):
@@ -95,12 +102,12 @@ def get_cursor(db_file):
 return CURSORS[db_file]

-async def add_feed(db_file, feed, url, res):
+async def add_feed(db_file, title, url, res):
 """
 Add a new feed into the feeds table.
 :param db_file: Database filename.
-:param feed: Parsed XML document.
+:param title: Feed title.
 :param url: URL.
 :param res: XML document.
 :return: Message.
@@ -120,14 +127,15 @@ async def add_feed(db_file, feed, url, res):
 async with DBLOCK:
 with create_connection(db_file) as conn:
 cur = conn.cursor()
-title = feed["feed"]["title"]
+# title = feed["feed"]["title"]
 feed = (title, url, 1, res[1], 1)
-sql = """INSERT INTO feeds(name,address,enabled,status,valid)
-VALUES(?,?,?,?,?) """
+sql = """INSERT INTO feeds(name, address, enabled, status, valid)
+VALUES(?, ?, ?, ?, ?) """
 cur.execute(sql, feed)
 source = title if title else '<' + url + '>'
-msg = 'News source "{}" has been added to subscription list'.format(source)
+msg = """> {}\nNews source \"{}\" has been added to subscription list.
+""".format(url, source)
 return msg
@@ -144,18 +152,21 @@ async def remove_feed(db_file, ix):
 cur = conn.cursor()
 try:
 sql = "SELECT address FROM feeds WHERE id = ?"
-url = cur.execute(sql, (ix,))
-for i in url:
-url = i[0]
+# cur
+# for i in url:
+# url = i[0]
+url = cur.execute(sql, (ix,)).fetchone()[0]
+sql = "SELECT name FROM feeds WHERE id = ?"
+name = cur.execute(sql, (ix,)).fetchone()[0]
 # NOTE Should we move DBLOCK to this line? 2022-12-23
 sql = "DELETE FROM entries WHERE source = ?"
 cur.execute(sql, (url,))
 sql = "DELETE FROM feeds WHERE id = ?"
 cur.execute(sql, (ix,))
-return """News source <{}> has been removed from subscription list
-""".format(url)
+msg = "> {}\nNews source \"{}\" has been removed from subscription list.".format(url, name)
 except:
-return """No news source with ID {}""".format(ix)
+msg = "No news source with ID {}.".format(ix)
+return msg

 async def check_feed_exist(db_file, url):
@@ -165,12 +176,12 @@ async def check_feed_exist(db_file, url):
 :param db_file: Database filename.
 :param url: URL.
-:return: SQL row or None.
+:return: Index ID and Name or None.
 """
 cur = get_cursor(db_file)
-sql = "SELECT id FROM feeds WHERE address = ?"
-cur.execute(sql, (url,))
-return cur.fetchone()
+sql = "SELECT id, name FROM feeds WHERE address = ?"
+result = cur.execute(sql, (url,)).fetchone()
+return result

 async def get_number_of_items(db_file, str):
@@ -184,8 +195,22 @@ async def get_number_of_items(db_file, str):
 with create_connection(db_file) as conn:
 cur = conn.cursor()
 sql = "SELECT count(id) FROM {}".format(str)
-count = cur.execute(sql)
-count = cur.fetchone()[0]
+count = cur.execute(sql).fetchone()[0]
+return count
+
+async def get_number_of_feeds_active(db_file):
+"""
+Return number of active feeds.
+:param db_file: Database filename.
+:param cur: Cursor object.
+:return: Number of rows.
+"""
+with create_connection(db_file) as conn:
+cur = conn.cursor()
+sql = "SELECT count(id) FROM feeds WHERE enabled = 1"
+count = cur.execute(sql).fetchone()[0]
 return count
@@ -200,8 +225,7 @@ async def get_number_of_entries_unread(db_file):
 with create_connection(db_file) as conn:
 cur = conn.cursor()
 sql = "SELECT count(id) FROM entries WHERE read = 0"
-count = cur.execute(sql)
-count = cur.fetchone()[0]
+count = cur.execute(sql).fetchone()[0]
 return count
@@ -214,26 +238,18 @@ async def get_entry_unread(db_file):
 """
 with create_connection(db_file) as conn:
 cur = conn.cursor()
-entry = []
 sql = "SELECT id FROM entries WHERE read = 0"
 ix = cur.execute(sql).fetchone()
 if ix is None:
 return False
 ix = ix[0]
 sql = "SELECT title FROM entries WHERE id = :id"
-cur.execute(sql, (ix,))
-title = cur.fetchone()[0]
-entry.append(title)
+title = cur.execute(sql, (ix,)).fetchone()[0]
 sql = "SELECT summary FROM entries WHERE id = :id"
-cur.execute(sql, (ix,))
-summary = cur.fetchone()[0]
-entry.append(summary)
+summary = cur.execute(sql, (ix,)).fetchone()[0]
 sql = "SELECT link FROM entries WHERE id = :id"
-cur.execute(sql, (ix,))
-link = cur.fetchone()[0]
-entry.append(link)
-entry = "{}\n\n{}\n\n{}".format(entry[0], entry[1], entry[2])
-# print(entry)
+link = cur.execute(sql, (ix,)).fetchone()[0]
+entry = "{}\n\n{}\n\n{}".format(title, summary, link)
 async with DBLOCK:
 await mark_as_read(cur, ix)
 # async with DBLOCK:
@@ -260,11 +276,23 @@ async def statistics(db_file):
 :return: News item as message.
 """
 feeds = await get_number_of_items(db_file, 'feeds')
+active_feeds = await get_number_of_feeds_active(db_file)
 entries = await get_number_of_items(db_file, 'entries')
 unread_entries = await get_number_of_entries_unread(db_file)
-return "You have {} unread news items out of {} from {} news sources.".format(unread_entries, entries, feeds)
+# msg = """You have {} unread news items out of {} from {} news sources.
+# """.format(unread_entries, entries, feeds)
+with create_connection(db_file) as conn:
+cur = conn.cursor()
+sql = "SELECT value FROM settings WHERE key = \"enabled\""
+status = cur.execute(sql).fetchone()[0]
+sql = "SELECT value FROM settings WHERE key = \"interval\""
+interval = cur.execute(sql).fetchone()[0]
+msg = """News items: {} ({})\nNews sources: {} ({})\nUpdate interval: {}\nOperation status: {}
+""".format(unread_entries, entries, active_feeds, feeds, interval, status)
+return msg

+#TODO statistics
 async def update_statistics(cur):
 """
 Update table statistics.
@@ -283,8 +311,7 @@ async def update_statistics(cur):
 cur.execute(sql, {"title": i, "num": stat_dict[i]})
 else:
 sql = "SELECT count(id) FROM statistics"
-count = cur.execute(sql)
-count = cur.fetchone()[0]
+count = cur.execute(sql).fetchone()[0]
 ix = count + 1
 sql = "INSERT INTO statistics VALUES(?,?,?)"
 cur.execute(sql, (ix, i, stat_dict[i]))
@@ -302,14 +329,13 @@ async def toggle_status(db_file, ix):
 async with DBLOCK:
 with create_connection(db_file) as conn:
 cur = conn.cursor()
+try:
 #cur = get_cursor(db_file)
 sql = "SELECT name FROM feeds WHERE id = :id"
-cur.execute(sql, (ix,))
-title = cur.fetchone()[0]
+title = cur.execute(sql, (ix,)).fetchone()[0]
 sql = "SELECT enabled FROM feeds WHERE id = ?"
 # NOTE [0][1][2]
-cur.execute(sql, (ix,))
-status = cur.fetchone()[0]
+status = cur.execute(sql, (ix,)).fetchone()[0]
 # FIXME always set to 1
 # NOTE Maybe because is not integer
 # TODO Reset feed table before further testing
@@ -321,7 +347,10 @@ async def toggle_status(db_file, ix):
 state = "enabled"
 sql = "UPDATE feeds SET enabled = :status WHERE id = :id"
 cur.execute(sql, {"status": status, "id": ix})
-return "Updates for '{}' are now {}".format(title, state)
+msg = "Updates for '{}' are now {}.".format(title, state)
+except:
+msg = "No news source with ID {}.".format(ix)
+return msg

 async def set_date(cur, url):
@@ -377,8 +406,8 @@ async def add_entry(cur, entry):
 :param cur: Cursor object.
 :param entry:
 """
-sql = """ INSERT INTO entries(title,summary,link,source,read)
-VALUES(?,?,?,?,?) """
+sql = """ INSERT INTO entries(title, summary, link, source, read)
+VALUES(?, ?, ?, ?, ?) """
 cur.execute(sql, entry)
@@ -403,8 +432,7 @@ async def remove_entry(db_file, source, length):
 with create_connection(db_file) as conn:
 cur = conn.cursor()
 sql = "SELECT count(id) FROM entries WHERE source = ?"
-count = cur.execute(sql, (source,))
-count = cur.fetchone()[0]
+count = cur.execute(sql, (source,)).fetchone()[0]
 limit = count - length
 if limit:
 limit = limit;
@@ -430,18 +458,10 @@ async def remove_nonexistent_entries(db_file, feed, source):
 with create_connection(db_file) as conn:
 cur = conn.cursor()
 sql = "SELECT id, title, link FROM entries WHERE source = ?"
-cur.execute(sql, (source,))
-entries_db = cur.fetchall()
-# print('entries_db')
-# print(entries_db)
+entries_db = cur.execute(sql, (source,)).fetchall()
 for entry_db in entries_db:
-# entry_db[1] = id
-# entry_db[2] = title
-# entry_db[3] = link
 exist = False
-# print("check-db")
 for entry_feed in feed.entries:
-# print("check-feed")
 # TODO better check and don't repeat code
 if entry_feed.has_key("title"):
 title = entry_feed.title
@@ -454,18 +474,13 @@ async def remove_nonexistent_entries(db_file, feed, source):
 link = source
 # TODO better check and don't repeat code
 if entry_db[1] == title and entry_db[2] == link:
-# print('exist')
-# print(title)
 exist = True
 break
 if not exist:
-# print('>>> not exist')
-# print(entry_db[1])
 # TODO Send to table archive
 # TODO Also make a regular/routine check for sources that have been changed (though that can only happen when manually editing)
 sql = "DELETE FROM entries WHERE id = ?"
 cur.execute(sql, (entry_db[0],))
-# breakpoint()
@@ -478,8 +493,8 @@
 async def get_subscriptions(db_file):
 with create_connection(db_file) as conn:
 cur = conn.cursor()
 sql = "SELECT address FROM feeds WHERE enabled = 1"
-cur.execute(sql)
-return cur.fetchall()
+result = cur.execute(sql).fetchall()
+return result

 async def list_subscriptions(db_file):
@@ -570,9 +585,111 @@ async def check_entry_exist(db_file, title, link):
 :param db_file: Database filename.
 :param link: Entry URL.
 :param title: Entry title.
-:return: SQL row or None.
+:return: Index ID or None.
 """
 cur = get_cursor(db_file)
 sql = "SELECT id FROM entries WHERE title = :title and link = :link"
-cur.execute(sql, {"title": title, "link": link})
-return cur.fetchone()
+result = cur.execute(sql, {"title": title, "link": link}).fetchone()
+return result
# TODO dictionary
# settings = {
# "enabled" : {
# "message": "Updates are {}".format(status),
# "value": val
# },
# "interval" : {
# "message": "Updates will be sent every {} minutes".format(val),
# "value": val
# },
# "quantom" : {
# "message": "Every updates will contain {} news items".format(val),
# "value": val
# }
# }
async def set_settings_value(db_file, key_value):
"""
Set settings value.
:param db_file: Database filename.
:param key_value: List of key ("enabled", "interval", "quantum") and value (Integer).
:return: Message.
"""
# if isinstance(key_value, list):
# key = key_value[0]
# val = key_value[1]
# elif key_value == "enable":
# key = "enabled"
# val = 1
# else:
# key = "enabled"
# val = 0
key = key_value[0]
val = key_value[1]
async with DBLOCK:
with create_connection(db_file) as conn:
cur = conn.cursor()
await set_settings_value_default(cur, key)
sql = "UPDATE settings SET value = :value WHERE key = :key"
cur.execute(sql, {"key": key, "value": val})
if key == 'quantum':
msg = "Each update will contain {} news items.".format(val)
elif key == 'interval':
msg = "Updates will be sent every {} minutes.".format(val)
else:
if val:
status = "disabled"
else:
status = "enabled"
msg = "Updates are {}.".format(status)
return msg
async def set_settings_value_default(cur, key):
# async def set_settings_value_default(cur):
# keys = ["enabled", "interval", "quantum"]
# for i in keys:
# sql = "SELECT id FROM settings WHERE key = ?"
# cur.execute(sql, (i,))
# if not cur.fetchone():
# val = await settings.get_value_default(i)
# sql = "INSERT INTO settings(key,value) VALUES(?,?)"
# cur.execute(sql, (i, val))
sql = "SELECT id FROM settings WHERE key = ?"
cur.execute(sql, (key,))
if not cur.fetchone():
val = await confighandler.get_value_default(key)
sql = "INSERT INTO settings(key,value) VALUES(?,?)"
cur.execute(sql, (key, val))
return val
async def get_settings_value(db_file, key):
"""
Get settings value.
:param db_file: Database filename.
:param key: "enabled", "interval", "quantum".
"""
# try:
# with create_connection(db_file) as conn:
# cur = conn.cursor()
# sql = "SELECT value FROM settings WHERE key = ?"
# cur.execute(sql, (key,))
# result = cur.fetchone()
# except:
# result = await settings.get_value_default(key)
# if not result:
# result = await settings.get_value_default(key)
# return result
with create_connection(db_file) as conn:
try:
cur = conn.cursor()
sql = "SELECT value FROM settings WHERE key = ?"
result = cur.execute(sql, (key,)).fetchone()[0]
except:
result = await set_settings_value_default(cur, key)
if not result:
result = await set_settings_value_default(cur, key)
return result
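
A round-trip through these settings helpers might look as follows; the database path is a made-up example, and get_settings_value falls back to the confighandler defaults when the key is missing:

import asyncio
import sqlitehandler

async def demo():
    db_file = "user@example.org.db"  # example path
    sqlitehandler.create_tables(db_file)
    print(await sqlitehandler.set_settings_value(db_file, ["interval", 15]))
    print(await sqlitehandler.get_settings_value(db_file, "interval"))  # 15

asyncio.run(demo())
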

slixfeed/xmpphandler.py (new file, 450 lines added)

@@ -0,0 +1,450 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
import asyncio
import os
import slixmpp
import confighandler
import datahandler
import sqlitehandler
jid_tasker = {}
task_manager = {}
time_now = datetime.now()
# time_now = time_now.strftime("%H:%M:%S")
def print_time():
# return datetime.now().strftime("%H:%M:%S")
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
return current_time
class Slixfeed(slixmpp.ClientXMPP):
"""
Slixmpp news bot that will send updates
from feeds it receives.
"""
print("slixmpp.ClientXMPP")
print(repr(slixmpp.ClientXMPP))
def __init__(self, jid, password):
slixmpp.ClientXMPP.__init__(self, jid, password)
# The session_start event will be triggered when
# the bot establishes its connection with the server
# and the XML streams are ready for use. We want to
# listen for this event so that we can initialize
# our roster.
self.add_event_handler("session_start", self.start)
# self.add_event_handler("session_start", self.select_file)
# self.add_event_handler("session_start", self.send_status)
# self.add_event_handler("session_start", self.check_updates)
# The message event is triggered whenever a message
# stanza is received. Be aware that this includes
# MUC messages and error messages.
self.add_event_handler("message", self.message)
self.add_event_handler("disconnected", self.reconnect)
# Initialize event loop
self.loop = asyncio.get_event_loop()
async def start(self, event):
"""
Process the session_start event.
Typical actions for the session_start event are
requesting the roster and broadcasting an initial
presence stanza.
Arguments:
event -- An empty dictionary. The session_start
event does not provide any additional
data.
"""
self.send_presence()
await self.get_roster()
await self.select_file()
self.send_presence(
pshow="away",
pstatus="Slixmpp has been restarted.",
pto="sch@pimux.de"
)
async def message(self, msg):
"""
Process incoming message stanzas. Be aware that this also
includes MUC messages and error messages. It is usually
a good idea to check the message's type before processing
or sending replies.
Arguments:
msg -- The received message stanza. See the documentation
for stanza objects and the Message stanza to see
how it may be used.
"""
if msg["type"] in ("chat", "normal"):
action = 0
jid = msg["from"].bare
message = " ".join(msg["body"].split())
message = message.lower()
if message.startswith("help"):
action = print_help()
# NOTE: Might not need it
# elif message.startswith("add "):
# url = message[4:]
elif message.startswith("http"):
url = message
action = await initdb(jid, datahandler.add_feed, url)
# action = "> " + message + "\n" + action
elif message.startswith("quantum "):
key = message[:7]
val = message[8:]
# action = "Every update will contain {} news items.".format(action)
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
await self.refresh_task(jid, key, val)
elif message.startswith("interval "):
key = message[:8]
val = message[9:]
# action = "Updates will be sent every {} minutes.".format(action)
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
await self.refresh_task(jid, key, val)
elif message.startswith("list"):
action = await initdb(jid, sqlitehandler.list_subscriptions)
elif message.startswith("recent "):
num = message[7:]
action = await initdb(jid, sqlitehandler.last_entries, num)
elif message.startswith("remove "):
ix = message[7:]
action = await initdb(jid, sqlitehandler.remove_feed, ix)
elif message.startswith("search "):
query = message[7:]
action = await initdb(jid, sqlitehandler.search_entries, query)
elif message.startswith("start"):
# action = "Updates are enabled."
key = "enabled"
val = 1
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
asyncio.create_task(self.task_jid(jid))
# print(print_time(), "task_manager[jid]")
# print(task_manager[jid])
elif message.startswith("stats"):
action = await initdb(jid, sqlitehandler.statistics)
elif message.startswith("status "):
ix = message[7:]
action = await initdb(jid, sqlitehandler.toggle_status, ix)
elif message.startswith("stop"):
# action = "Updates are disabled."
try:
task_manager[jid]["check"].cancel()
# task_manager[jid]["status"].cancel()
task_manager[jid]["interval"].cancel()
key = "enabled"
val = 0
action = await initdb(jid, sqlitehandler.set_settings_value, [key, val])
await self.send_status(jid)
print(print_time(), "task_manager[jid]")
print(task_manager[jid])
except:
# action = "Updates are already disabled."
await self.send_status(jid)
else:
action = "Unknown command. Press \"help\" for list of commands"
if action: msg.reply(action).send()
print(print_time(), "COMMAND ACCOUNT")
print("COMMAND:", message)
print("ACCOUNT: " + str(msg["from"]))
async def select_file(self):
"""
Initiate actions by JID (Jabber ID).
:param self: Self
"""
while True:
db_dir = confighandler.get_default_dbdir()
if not os.path.isdir(db_dir):
msg = ("Slixfeed can not work without a database. \n"
"To create a database, follow these steps: \n"
"Add Slixfeed contact to your roster \n"
"Send a feed to the bot by: \n"
"add https://reclaimthenet.org/feed/")
print(print_time(), msg)
print(msg)
else:
os.chdir(db_dir)
files = os.listdir()
# TODO Use loop (with gather) instead of TaskGroup
# for file in files:
# if file.endswith(".db") and not file.endswith(".db-jour.db"):
# jid = file[:-3]
# jid_tasker[jid] = asyncio.create_task(self.task_jid(jid))
# await jid_tasker[jid]
async with asyncio.TaskGroup() as tg:
print("main task")
print(print_time(), "repr(tg)")
print(repr(tg)) # <TaskGroup entered>
for file in files:
if file.endswith(".db") and not file.endswith(".db-jour.db"):
jid = file[:-3]
tg.create_task(self.task_jid(jid))
# task_manager.update({jid: tg})
# print(task_manager) # {}
print(print_time(), "repr(tg) id(tg)")
print(jid, repr(tg)) # sch@pimux.de <TaskGroup tasks=1 entered>
print(jid, id(tg)) # sch@pimux.de 139879835500624
# <xmpphandler.Slixfeed object at 0x7f24922124d0> <TaskGroup tasks=2 entered>
# <xmpphandler.Slixfeed object at 0x7f24922124d0> 139879835500624
async def task_jid(self, jid):
"""
JID (Jabber ID) task manager.
:param self: Self
:param jid: Jabber ID
"""
enabled = await initdb(
jid,
sqlitehandler.get_settings_value,
"enabled"
)
print(print_time(), "enabled", enabled, jid)
if enabled:
print("sub task")
print(print_time(), "repr(self) id(self)")
print(repr(self))
print(id(self))
task_manager[jid] = {}
task_manager[jid]["check"] = asyncio.create_task(check_updates(jid))
task_manager[jid]["status"] = asyncio.create_task(self.send_status(jid))
task_manager[jid]["interval"] = asyncio.create_task(self.send_update(jid))
await task_manager[jid]["check"]
await task_manager[jid]["status"]
await task_manager[jid]["interval"]
print(print_time(), "task_manager[jid].items()")
print(task_manager[jid].items())
print(print_time(), "task_manager[jid]")
print(task_manager[jid])
print(print_time(), "task_manager")
print(task_manager)
else:
await self.send_status(jid)
async def send_update(self, jid):
"""
Send news items as messages.
:param self: Self
:param jid: Jabber ID
"""
new = await initdb(
jid,
sqlitehandler.get_entry_unread
)
if new:
print(print_time(), "> SEND UPDATE",jid)
self.send_message(
mto=jid,
mbody=new,
mtype="chat"
)
interval = await initdb(
jid,
sqlitehandler.get_settings_value,
"interval"
)
# await asyncio.sleep(60 * interval)
self.loop.call_at(
self.loop.time() + 60 * interval,
self.loop.create_task,
self.send_update(jid)
)
async def send_status(self, jid):
"""
Send status message.
:param self: Self
:param jid: Jabber ID
"""
print(print_time(), "> SEND STATUS",jid)
unread = await initdb(
jid,
sqlitehandler.get_number_of_entries_unread
)
if unread:
status_text = "📰 News items: {}".format(str(unread))
status_mode = "chat"
else:
status_text = "🗞 No News"
status_mode = "available"
enabled = await initdb(
jid,
sqlitehandler.get_settings_value,
"enabled"
)
if not enabled:
status_mode = "xa"
# print(status_text, "for", jid)
self.send_presence(
pshow=status_mode,
pstatus=status_text,
pto=jid,
#pfrom=None
)
await asyncio.sleep(60 * 20)
# self.loop.call_at(
# self.loop.time() + 60 * 20,
# self.loop.create_task,
# self.send_status(jid)
# )
async def refresh_task(self, jid, key, val):
"""
Apply settings on runtime.
:param self: Self
:param jid: Jabber ID
:param key: Key
:param val: Value
"""
if jid in task_manager:
task_manager[jid][key].cancel()
loop = asyncio.get_event_loop()
print(print_time(), "loop")
print(loop)
print(print_time(), "loop")
task_manager[jid][key] = loop.call_at(
loop.time() + 60 * float(val),
loop.create_task,
self.send_update(jid)
)
# task_manager[jid][key] = self.send_update.loop.call_at(
# self.send_update.loop.time() + 60 * val,
# self.send_update.loop.create_task,
# self.send_update(jid)
# )
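
Both send_update and refresh_task reschedule themselves through loop.call_at, which defers task creation rather than sleeping inside the coroutine. A self-contained sketch of that pattern (the one-second delay is arbitrary):

import asyncio

async def tick():
    print("update fired")

async def main():
    loop = asyncio.get_running_loop()
    # Create the task, and thereby run tick(), one second from now.
    loop.call_at(loop.time() + 1, loop.create_task, tick())
    await asyncio.sleep(2)

asyncio.run(main())
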
# TODO Take this function out of
# <class 'slixmpp.clientxmpp.ClientXMPP'>
async def check_updates(jid):
"""
Start calling for update check up.
:param jid: Jabber ID
"""
while True:
print(print_time(), "> CHCK UPDATE",jid)
await initdb(jid, datahandler.download_updates)
await asyncio.sleep(60 * 90)
# Schedule to call this function again in 90 minutes
# self.loop.call_at(
# self.loop.time() + 60 * 90,
# self.loop.create_task,
# self.check_updates(jid)
# )
def print_help():
"""
Print help manual.
"""
msg = ("Slixfeed - News syndication bot for Jabber/XMPP \n"
"\n"
"DESCRIPTION: \n"
" Slixfeed is a news aggregator bot for online news feeds. \n"
" Supported filetypes: Atom, RDF and RSS. \n"
"\n"
"BASIC USAGE: \n"
" start \n"
" Enable bot and send updates. \n"
" Stop \n"
" Disable bot and stop updates. \n"
" batch N \n"
" Send N updates for each interval. \n"
" interval N \n"
" Send an update every N minutes. \n"
" feed list \n"
" List subscriptions. \n"
"\n"
"EDIT OPTIONS: \n"
" add URL \n"
" Add URL to subscription list. \n"
" remove ID \n"
" Remove feed from subscription list. \n"
" status ID \n"
" Toggle update status of feed. \n"
"\n"
"SEARCH OPTIONS: \n"
" search TEXT \n"
" Search news items by given keywords. \n"
" recent N \n"
" List recent N news items (up to 50 items). \n"
"\n"
"STATISTICS OPTIONS: \n"
" analyses \n"
" Show report and statistics of feeds. \n"
" obsolete \n"
" List feeds that are not available. \n"
" unread \n"
" Print number of unread news items. \n"
"\n"
"BACKUP OPTIONS: \n"
" export opml \n"
" Send an OPML file with your feeds. \n"
" backup news html\n"
" Send an HTML formatted file of your news items. \n"
" backup news md \n"
" Send a Markdown file of your news items. \n"
" backup news text \n"
" Send a Plain Text file of your news items. \n"
"\n"
"DOCUMENTATION: \n"
" Slixfeed \n"
" https://gitgud.io/sjehuda/slixfeed \n"
" Slixmpp \n"
" https://slixmpp.readthedocs.io/ \n"
" feedparser \n"
" https://pythonhosted.org/feedparser")
return msg
# TODO Perhaps this needs to be executed
# just once per program execution
async def initdb(jid, callback, message=None):
"""
Callback function to instantiate action on database.
:param jid: JID (Jabber ID).
:param callback: Function name.
:param message: Optional argument, passed when the callback requires a message.
"""
db_dir = confighandler.get_default_dbdir()
if not os.path.isdir(db_dir):
os.mkdir(db_dir)
db_file = os.path.join(db_dir, r"{}.db".format(jid))
sqlitehandler.create_tables(db_file)
# await sqlitehandler.set_default_values(db_file)
if message:
return await callback(db_file, message)
else:
return await callback(db_file)
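
initdb is the single dispatch point used by every command handler above: it resolves the per-JID database file, makes sure the tables exist, and forwards to the given coroutine. For instance (the JID is invented for the example):

# action = await initdb("user@example.org", sqlitehandler.list_subscriptions)
# action = await initdb("user@example.org", datahandler.add_feed, "https://example.org/feed")
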