forked from sch/Slixfeed
5fe4e3b211
Thank you. roughnecks.
835 lines
25 KiB
Python
835 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
|
|
TODO
|
|
|
|
1) Function scan at "for entry in entries"
|
|
Suppress directly calling function "add_entry" (accept db_file)
|
|
Pass a list of valid entries to a new function "add_entries"
|
|
(accept db_file) which would call function "add_entry" (accept cur).
|
|
* accelerate adding of large set of entries at once.
|
|
* prevent (or mitigate halt of consequent actions).
|
|
* reduce I/O.
|
|
|
|
2) Call sqlite function from function statistics.
|
|
Returning a list of values doesn't seem to be a good practice.
|
|
|
|
3) Special statistics for operator:
|
|
* Size of database(s);
|
|
* Amount of JIDs subscribed;
|
|
* Amount of feeds of all JIDs;
|
|
* Amount of entries of all JIDs.
|
|
|
|
4) Consider to append text to remind to share presence
|
|
'✒️ Share online status to receive updates'
|
|
|
|
5) Request for subscription
|
|
if (await XmppUtilities.get_chat_type(self, jid_bare) == 'chat' and
|
|
not self.client_roster[jid_bare]['to']):
|
|
XmppPresence.subscription(self, jid_bare, 'subscribe')
|
|
await XmppRoster.add(self, jid_bare)
|
|
status_message = '✒️ Share online status to receive updates'
|
|
XmppPresence.send(self, jid_bare, status_message)
|
|
message_subject = 'RSS News Bot'
|
|
message_body = 'Share online status to receive updates.'
|
|
XmppMessage.send_headline(self, jid_bare, message_subject,
|
|
message_body, 'chat')
|
|
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from dateutil.parser import parse
|
|
from email.utils import parseaddr, parsedate, parsedate_to_datetime
|
|
import hashlib
|
|
from lxml import etree, html
|
|
import os
|
|
import random
|
|
import slixfeed.fetch as fetch
|
|
from slixfeed.log import Logger
|
|
import slixfeed.sqlite as sqlite
|
|
import sys
|
|
from urllib.parse import (
|
|
parse_qs,
|
|
urlencode,
|
|
urljoin,
|
|
# urlparse,
|
|
urlsplit,
|
|
urlunsplit
|
|
)
|
|
|
|
try:
|
|
import tomllib
|
|
except:
|
|
import tomli as tomllib
|
|
|
|
logger = Logger(__name__)
|
|
|
|
|
|
class Config:
    """
    Namespace of helpers that locate the data and configuration
    directories and read configuration values.

    The functions take no ``self``; they are called on the class itself,
    e.g. ``Config.get_default_config_directory()``.
    """

    def get_default_data_directory():
        """
        Determine the directory path where data will be stored.

        * If $HOME is defined, use $HOME/.local/share/kaikout;
        * else if the platform is Windows and %APPDATA% is defined,
          use %APPDATA%/kaikout;
        * else use directory "kaikout_data" next to this module.

        Returns
        -------
        str
            Path to data directory.
        """
        home = os.environ.get('HOME')
        if home:
            return os.path.join(home, '.local', 'share', 'kaikout')
        if sys.platform == 'win32':
            data_home = os.environ.get('APPDATA')
            if data_home:
                # Previously this case fell through and returned None.
                return os.path.join(data_home, 'kaikout')
        return os.path.dirname(__file__) + '/kaikout_data'

    def get_default_config_directory():
        """
        Determine the directory path where configuration will be stored.

        * If $XDG_CONFIG_HOME is defined, use it;
        * else if $HOME exists, use $HOME/.config;
        * else if the platform is Windows, use %APPDATA%;
        * else use the current directory.

        Returns
        -------
        str
            Path to configuration directory.
        """
        # config_home = xdg.BaseDirectory.xdg_config_home
        config_home = os.environ.get('XDG_CONFIG_HOME')
        if config_home is None:
            home = os.environ.get('HOME')
            if home is not None:
                config_home = os.path.join(home, '.config')
            else:
                if sys.platform == 'win32':
                    config_home = os.environ.get('APPDATA')
                if config_home is None:
                    # Nothing usable in the environment; the current
                    # directory is returned without the "kaikout" suffix.
                    return os.path.abspath('.')
        return os.path.join(config_home, 'kaikout')

    def get_setting_value(db_file, key):
        """
        Read a setting from the database, falling back to the
        configuration defaults when the database yields nothing.

        Parameters
        ----------
        db_file : str
            Path to database file.
        key : str
            Name of the setting.

        Returns
        -------
        value
            First element of the database result when it is truthy,
            otherwise the value returned by the configuration fallback.
        """
        value = sqlite.get_setting_value(db_file, key)
        if value:
            return value[0]
        # NOTE(review): ``Config.get_value`` is not defined in the visible
        # part of this class (only ``get_values`` is) -- confirm that it
        # exists elsewhere, otherwise this fallback raises AttributeError.
        return Config.get_value('settings', 'Settings', key)

    def get_values(filename, key=None):
        """
        Load a TOML file from the configuration directory.

        The directory is looked up in this order: the default
        configuration directory, '/usr/share/slixfeed/', and finally
        directory "assets" next to this module.

        Parameters
        ----------
        filename : str
            Name of the TOML file.
        key : str, optional
            Top-level key to select. The default is None.

        Returns
        -------
        values
            The value stored under ``key`` when ``key`` is truthy,
            otherwise the whole parsed document.
        """
        config_dir = Config.get_default_config_directory()
        if not os.path.isdir(config_dir):
            config_dir = '/usr/share/slixfeed/'
        if not os.path.isdir(config_dir):
            config_dir = os.path.dirname(__file__) + "/assets"
        config_file = os.path.join(config_dir, filename)
        with open(config_file, mode="rb") as defaults:
            result = tomllib.load(defaults)
        return result[key] if key else result
|
|
|
|
|
|
class Database:
    """Namespace of helpers that prepare per-JID database files."""

    def instantiate(dir_data, jid_bare):
        """
        Create the tables of the database of a given Jabber ID and
        return the location of its file.

        Parameters
        ----------
        dir_data : str
            Data directory; the file is placed in its "sqlite"
            subdirectory.
        jid_bare : str
            Jabber ID, used as the base name of the file.

        Returns
        -------
        db_file : str
            Pathname of the database file.
        """
        basename = '{}.db'.format(jid_bare)
        db_file = os.path.join(dir_data, 'sqlite', basename)
        sqlite.create_tables(db_file)
        return db_file
|
|
|
|
|
|
class DateAndTime:
    """
    Namespace of date and time helpers.

    All "current time" helpers use ``datetime.now()`` without a timezone
    argument, so their results are naive local-time values.
    """

    #https://feedparser.readthedocs.io/en/latest/date-parsing.html

    def now():
        """
        ISO 8601 Timestamp.

        Returns
        -------
        date : str
            ISO 8601 Timestamp.
        """
        return datetime.now().isoformat()

    def convert_struct_time_to_iso8601(struct_time):
        """
        Convert a time tuple into an ISO 8601 timestamp.

        Parameters
        ----------
        struct_time : sequence
            Sequence whose first six items are year, month, day, hour,
            minute and second.

        Returns
        -------
        date : str
            ISO 8601 Timestamp.
        """
        return datetime(*struct_time[:6]).isoformat()

    def convert_seconds_to_yyyy_mm_dd(seconds_time):
        """
        Convert seconds since the epoch into a YYYY-MM-DD date.

        Parameters
        ----------
        seconds_time : int or float
            Seconds since the epoch.

        Returns
        -------
        formatted_date : str
            YYYY-MM-DD date.
        """
        return datetime.fromtimestamp(seconds_time).strftime('%Y-%m-%d')

    def current_date():
        """
        Return a "Month DD, YYYY (Weekday HH:MM:SS)" timestamp.

        Returns
        -------
        date : str
            Month DD, YYYY (Weekday HH:MM:SS) timestamp.
        """
        # %H:%M:%S is spelled out instead of %T, which is not available
        # on every platform.
        return datetime.now().strftime("%B %d, %Y (%A %H:%M:%S)")

    def current_time():
        """
        Return an HH:MM:SS timestamp.

        Returns
        -------
        date : str
            HH:MM:SS timestamp.
        """
        return datetime.now().strftime("%H:%M:%S")

    def timestamp():
        """
        Return a time stamp to be used in a filename.

        Returns
        -------
        formatted_time : str
            %Y%m%d-%H%M%S timestamp.
        """
        return datetime.now().strftime("%Y%m%d-%H%M%S")

    def validate(date):
        """
        Validate date format.

        Parameters
        ----------
        date : str
            Timestamp.

        Returns
        -------
        date : str
            The given timestamp when it can be parsed, otherwise the
            current ISO 8601 timestamp.
        """
        try:
            parse(date)
        except Exception:
            # Any parsing failure means the date is unusable; a bare
            # "except" would also swallow KeyboardInterrupt/SystemExit.
            date = DateAndTime.now()
        return date

    def rfc2822_to_iso8601(date):
        """
        Convert RFC 2822 into ISO 8601.

        Parameters
        ----------
        date : str
            RFC 2822 Timestamp.

        Returns
        -------
        date : str
            ISO 8601 Timestamp. The input is returned unchanged when it
            is not recognized as an RFC 2822 date.
        """
        if parsedate(date):
            try:
                date = parsedate_to_datetime(date).isoformat()
            except Exception:
                # Recognized by parsedate, yet not convertible.
                date = DateAndTime.now()
        return date
|
|
|
|
|
|
class Documentation:
    """Namespace of helpers that read the command manual."""

    def manual(config_dir, section=None, command=None):
        """
        Look up entries of file commands.toml.

        Parameters
        ----------
        config_dir : str
            Directory which contains file commands.toml.
        section : str, optional
            Name of a section, or 'all'. The default is None.
        command : str, optional
            Name of a command of the given section. The default is None.

        Returns
        -------
        cmd_list : str, list or None
            * section 'all': every entry of every section, each followed
              by a newline, as a single string;
            * section and command: the entry of that command;
            * section only: list of the keys of that section;
            * neither: list of the section names.
            None is returned when the section or command is missing.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: filename: {}'.format(function_name, config_dir))
        pathname = os.path.join(config_dir, 'commands.toml')
        with open(pathname, mode="rb") as handle:
            cmds = tomllib.load(handle)
        if section == 'all':
            return ''.join(cmds[name][item] + '\n'
                           for name in cmds
                           for item in cmds[name])
        if command and section:
            try:
                return cmds[section][command]
            except KeyError as e:
                logger.error(e)
                return None
        if section:
            try:
                return list(cmds[section])
            except KeyError as e:
                logger.error('KeyError:' + str(e))
                return None
        return list(cmds)
|
|
|
|
|
|
class Html:
|
|
|
|
|
|
async def extract_image_from_html(url):
|
|
function_name = sys._getframe().f_code.co_name
|
|
logger.debug('{}: url: {}'.format(function_name, url))
|
|
result = await fetch.http(settings_network, url)
|
|
if not result['error']:
|
|
data = result['content']
|
|
tree = html.fromstring(data)
|
|
# TODO Exclude banners, class="share" links etc.
|
|
images = tree.xpath(
|
|
'//img[not('
|
|
'contains(@src, "avatar") or '
|
|
'contains(@src, "cc-by-sa") or '
|
|
'contains(@src, "
|
|
https://www.lilithsaintcrow.com/2024/02/love-anonymous//image/png;base64,iVBORw0KGgoAAAANSUhEUgAABaAAAAeAAQAAAAAQ6M16AAAAAnRSTlMAAHaTzTgAAAFmSURBVBgZ7cEBAQAAAIKg/q92SMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADgWE3LAAGyZmPPAAAAAElFTkSuQmCC
|
|
|
|
TODO
|
|
|
|
1) ActivityPub URL revealer activitypub_to_http.
|
|
|
|
2) SQLite preference "instance" for preferred instances.
|
|
|
|
"""
|
|
|
|
|
|
class Url:
    """
    Namespace of URL helpers.

    The functions take no ``self``; they are called on the class itself,
    e.g. ``Url.get_hostname(url)``.
    """

    # NOTE
    # hostname and protocol are listed as one in file proxies.toml.
    # Perhaps a better practice would be to have them separated.

    # NOTE
    # File proxies.toml will remain as it is, in order to be
    # coordinated with the dataset of project LibRedirect, even
    # though rule-sets might be adopted (see Privacy Redirect).

    def get_hostname(url):
        """
        Extract the network location of a URL, without a leading "www.".

        Parameters
        ----------
        url : str
            A URL.

        Returns
        -------
        hostname : str
            The netloc component (a port, if any, is kept).
        """
        hostname = urlsplit(url).netloc
        if hostname.startswith('www.'):
            # Strip the prefix only; str.replace would also delete any
            # later occurrence of "www." inside the hostname.
            hostname = hostname[len('www.'):]
        return hostname

    async def replace_hostname(configuration_directory, proxies, settings_network, url, url_type):
        """
        Replace hostname.

        A random proxy of the matching entry is tried; a proxy which does
        not answer with status code 200 is moved to file
        proxies_obsolete.toml and another one is tried.

        Parameters
        ----------
        configuration_directory : str
            Directory of files proxies.toml and proxies_obsolete.toml.
        proxies : dict
            Proxy entries by name; each entry is read for keys
            'hostname', 'type' and 'clearnet'.
        settings_network : object
            Network settings, passed as-is to fetch.http.
        url : str
            A URL.
        url_type : str
            A "feed" or a "link".

        Returns
        -------
        url : str
            A processed URL; the given URL when the matching entry has no
            proxy left; None when no entry matches.
        """
        url_new = None
        parted_url = urlsplit(url)
        # protocol = parted_url.scheme
        hostname = Url.get_hostname(url)
        pathname = parted_url.path
        queries = parted_url.query
        fragment = parted_url.fragment
        proxy_type = 'clearnet'
        for proxy_name in proxies:
            proxy = proxies[proxy_name]
            if not (hostname in proxy['hostname'] and
                    url_type in proxy['type']):
                continue
            while not url_new:
                proxy_list = proxy[proxy_type]
                if not proxy_list:
                    logger.warning('No proxy URLs for {}. '
                                   'Please update proxies.toml'
                                   .format(proxy_name))
                    url_new = url
                    break
                proxy_url = random.choice(proxy_list)
                parted_proxy_url = urlsplit(proxy_url)
                url_new = urlunsplit([parted_proxy_url.scheme,
                                      parted_proxy_url.netloc,
                                      pathname, queries, fragment])
                logger.debug('replace_hostname: proxy_url: {} url_new: {}'
                             .format(proxy_url, url_new))
                response = await fetch.http(settings_network, url_new)
                if (response and
                    response['status_code'] == 200 and
                    # response.reason == 'OK' and
                    url_new.startswith(proxy_url)):
                    break
                # The proxy failed: record it as obsolete and retry.
                proxies_obsolete_file = os.path.join(
                    configuration_directory, 'proxies_obsolete.toml')
                proxies_file = os.path.join(
                    configuration_directory, 'proxies.toml')
                proxies_obsolete = Toml.open_file(proxies_obsolete_file)
                proxies_obsolete['proxies'][proxy_name][proxy_type].append(proxy_url)
                Toml.save_file(proxies_obsolete_file, proxies_obsolete)
                # TODO self.proxies might need to be changed, so self probably should be passed.
                # proxy_list is the live list of this entry; indexing
                # proxies['proxies'] raised KeyError, because the loop
                # above treats ``proxies`` as the mapping of entries.
                proxy_list.remove(proxy_url)
                # NOTE(review): presumably proxies.toml keeps its entries
                # under a top-level "proxies" table, like
                # proxies_obsolete.toml above -- confirm the layout.
                Toml.save_file(proxies_file, {'proxies': proxies})
                url_new = None
        return url_new

    def remove_tracking_parameters(trackers, url):
        """
        Remove queries with tracking parameters.

        Parameters
        ----------
        trackers : list
            A list of queries.
        url : str
            A URL.

        Returns
        -------
        url : str
            A processed URL. Base64 "data:" URLs are returned unchanged.
        """
        if url.startswith('data:') and ';base64,' in url:
            return url
        parted_url = urlsplit(url)
        # Keep parameters with blank values; only trackers are removed.
        queries = parse_qs(parted_url.query, keep_blank_values=True)
        for tracker in trackers:
            queries.pop(tracker, None)
        queries_new = urlencode(queries, doseq=True)
        return urlunsplit([parted_url.scheme, parted_url.netloc,
                           parted_url.path, queries_new,
                           parted_url.fragment])

    def feed_to_http(url):
        """
        Replace scheme FEED by HTTP.

        Parameters
        ----------
        url : str
            URL.

        Returns
        -------
        new_url : str
            URL.
        """
        par_url = urlsplit(url)
        return urlunsplit(['http', par_url.netloc, par_url.path,
                           par_url.query, par_url.fragment])

    def check_xmpp_uri(uri):
        """
        Check validity of XMPP URI.

        Parameters
        ----------
        uri : str
            URI.

        Returns
        -------
        jid : str or bool
            The path component of the URI when it reads as a bare
            address, otherwise False.
        """
        jid = urlsplit(uri).path
        if parseaddr(jid)[1] != jid:
            jid = False
        return jid

    # NOTE Read the documentation
    # https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urljoin
    def complete_url(source, link):
        """
        Check if URL is pathname and complete it into URL.

        Parameters
        ----------
        source : str
            Feed URL.
        link : str
            Link URL or pathname.

        Returns
        -------
        str
            URL.
        """
        if link.startswith('data:') and ';base64,' in link:
            return link
        if link.startswith('www.'):
            return 'http://' + link
        parted_link = urlsplit(link)
        parted_feed = urlsplit(source)
        if parted_link.scheme == 'magnet' and parted_link.query:
            return link
        if parted_link.scheme and parted_link.netloc:
            return link
        if link.startswith('//'):
            # Scheme-relative link: borrow the scheme of the feed.
            if not parted_link.netloc:
                # Nothing to complete (e.g. '///path'); previously this
                # case raised UnboundLocalError.
                return link
            return urlunsplit([parted_feed.scheme, parted_link.netloc,
                               parted_link.path, parted_link.query,
                               parted_link.fragment])
        if link.startswith('/'):
            return urlunsplit([parted_feed.scheme, parted_feed.netloc,
                               parted_link.path, parted_link.query,
                               parted_link.fragment])
        pathlink = parted_link.path.split('/')
        pathfeed = parted_feed.path.split('/')
        if link.startswith('../'):
            # Drop the last segment of the feed path (the document name),
            # then climb one directory per leading '..'.
            pathfeed.pop()
            while pathlink and pathlink[0] == '..':
                pathlink.pop(0)
                if len(pathfeed) > 1:
                    # Never climb above the root.
                    pathfeed.pop()
        else:
            if link.startswith('./'):
                pathlink.remove('.')
            if not source.endswith('/'):
                pathfeed.pop()
        pathfeed.append('/'.join(pathlink))
        return urlunsplit([parted_feed.scheme, parted_feed.netloc,
                           '/'.join(pathfeed), parted_link.query,
                           parted_link.fragment])

    # TODO

    # Feed https://www.ocaml.org/feed.xml
    # Link %20https://frama-c.com/fc-versions/cobalt.html%20

    # FIXME

    # Feed https://cyber.dabamos.de/blog/feed.rss
    # Link https://cyber.dabamos.de/blog/#article-2022-07-15

    def join_url(source, link):
        """
        Join base URL with given pathname.

        Parameters
        ----------
        source : str
            Feed URL.
        link : str
            Link URL or pathname.

        Returns
        -------
        str
            URL.
        """
        if link.startswith('data:') and ';base64,' in link:
            return link
        if link.startswith('www.'):
            return 'http://' + link
        if link.startswith('%20') and link.endswith('%20'):
            # Strip the encoded padding at both ends only; an encoded
            # space inside the link is part of the link.
            new_link = link
            while new_link.startswith('%20'):
                new_link = new_link[3:]
            while new_link.endswith('%20'):
                new_link = new_link[:-3]
            return new_link
        return urljoin(source, link)

    def trim_url(url):
        """
        Check URL pathname for double slash.

        Parameters
        ----------
        url : str
            URL.

        Returns
        -------
        url : str
            URL whose pathname has runs of slashes collapsed into one.
            Base64 "data:" URLs are returned unchanged.
        """
        if url.startswith('data:') and ';base64,' in url:
            return url
        parted_url = urlsplit(url)
        pathname = parted_url.path
        while '//' in pathname:
            pathname = pathname.replace('//', '/')
        return urlunsplit([parted_url.scheme, parted_url.netloc, pathname,
                           parted_url.query, parted_url.fragment])

    def activitypub_to_http(namespace):
        """
        Replace ActivityPub namespace by HTTP.

        Not implemented yet; the function has no body besides this
        docstring and therefore returns None.

        Parameters
        ----------
        namespace : str
            Namespace.

        Returns
        -------
        new_url : str
            URL.
        """
|
|
|
|
|
|
|
|
class String:
    """Namespace of string helpers."""

    def generate_identifier(url, counter):
        """
        Build an identifier out of the hostname of a URL and a counter.

        Parameters
        ----------
        url : str
            A URL.
        counter : int
            A number appended after the hostname.

        Returns
        -------
        identifier : str
            Hostname with dots replaced by hyphens, a colon, and the
            counter.
        """
        dashed_hostname = Url.get_hostname(url).replace('.','-')
        return ':'.join((dashed_hostname, str(counter)))

    # string_to_md5_hash
    # NOTE Warning: Entry might not have a link
    # TODO Handle situation error
    def md5_hash(url):
        """
        Compute the MD5 digest of a string.

        Parameters
        ----------
        url : str
            A URL (or any other string).

        Returns
        -------
        url_digest : str
            Hexadecimal MD5 digest of the encoded string.
        """
        return hashlib.md5(url.encode()).hexdigest()
|
|
|
|
|
|
|
|
class Utilities:
    """Namespace of miscellaneous helpers."""

    # string_to_md5_hash
    # NOTE Warning: Entry might not have a link
    # TODO Handle situation error
    def hash_url_to_md5(url):
        """
        Compute the MD5 digest of a URL.

        Parameters
        ----------
        url : str
            A URL.

        Returns
        -------
        url_digest : str
            Hexadecimal MD5 digest of the encoded URL.
        """
        return hashlib.md5(url.encode()).hexdigest()

    def pick_a_feed(dir_config, lang=None):
        """
        Pick a random entry of key "feeds" of file feeds.toml.

        Parameters
        ----------
        dir_config : str
            Directory which contains file feeds.toml.
        lang : str, optional
            Only logged; it does not affect the choice.
            The default is None.

        Returns
        -------
        url
            A randomly chosen item of the "feeds" list.
        """
        function_name = sys._getframe().f_code.co_name
        logger.debug('{}: lang: {}'
                     .format(function_name, lang))
        pathname = os.path.join(dir_config, 'feeds.toml')
        with open(pathname, mode="rb") as handle:
            catalogue = tomllib.load(handle)
        return random.choice(catalogue['feeds'])
|