79b80b1698
a handler to each directory, instead of processing a function to detect them.
744 lines
21 KiB
Python
744 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
|
|
FIXME
|
|
|
|
1) Use dict for ConfigDefault
|
|
|
|
2) Store ConfigJabberID in dicts
|
|
|
|
TODO
|
|
|
|
1) Site-specific filter (i.e. audiobookbay).
|
|
|
|
2) Exclude sites from being subjected to filtering (e.g. metapedia).
|
|
|
|
3) Filter phrases:
|
|
Refer to sqlitehandler.search_entries for implementation.
|
|
It is expected to be more complex than function search_entries.
|
|
|
|
4) Copy file from /etc/slixfeed/ or /usr/share/slixfeed/
|
|
|
|
5) Merge get_value_default into get_value.
|
|
|
|
6) Use TOML https://ruudvanasseldonk.com/2023/01/11/the-yaml-document-from-hell
|
|
|
|
7) Make the program portable (directly use the directory assets) -- Thorsten
|
|
|
|
7.1) Read missing files from base directories or either set error message.
|
|
|
|
"""
|
|
|
|
import configparser
|
|
from slixfeed.log import Logger
|
|
import os
|
|
# from random import randrange
|
|
import slixfeed.sqlite as sqlite
|
|
import sys
|
|
import tomli_w
|
|
try:
|
|
import tomllib
|
|
except:
|
|
import tomli as tomllib
|
|
|
|
logger = Logger(__name__)
|
|
|
|
class Cache:
|
|
|
|
def get_default_cache_directory():
|
|
"""
|
|
Determine the directory path where dbfile will be stored.
|
|
|
|
* If $XDG_DATA_HOME is defined, use it;
|
|
* else if $HOME exists, use it;
|
|
* else if the platform is Windows, use %APPDATA%;
|
|
* else use the current directory.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
Path to cache directory.
|
|
"""
|
|
# data_home = xdg.BaseDirectory.xdg_data_home
|
|
data_home = os.environ.get('XDG_CACHE_HOME')
|
|
if data_home is None:
|
|
if os.environ.get('HOME') is None:
|
|
if sys.platform == 'win32':
|
|
data_home = os.environ.get('APPDATA')
|
|
if data_home is None:
|
|
return os.path.abspath('.slixfeed/cache')
|
|
else:
|
|
return os.path.abspath('.slixfeed/cache')
|
|
else:
|
|
data_home = os.path.join(
|
|
os.environ.get('HOME'), '.cache'
|
|
)
|
|
return os.path.join(data_home, 'slixfeed')
|
|
|
|
# TODO Consider a class ConfigDefault for default values to be initiate at most
|
|
# basic level possible and a class ConfigJID for each JID (i.e. db_file) to be
|
|
# also initiated at same level or at least at event call, then check whether
|
|
# setting_jid.setting_key has value, otherwise resort to setting_default.setting_key.
|
|
class Config:
|
|
|
|
# TODO Write a similar function for file.
|
|
# NOTE the is a function of directory, noot file.
|
|
def get_default_config_directory():
|
|
"""
|
|
Determine the directory path where configuration will be stored.
|
|
|
|
* If $XDG_CONFIG_HOME is defined, use it;
|
|
* else if $HOME exists, use it;
|
|
* else if the platform is Windows, use %APPDATA%;
|
|
* else use the current directory.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
Path to configuration directory.
|
|
"""
|
|
# config_home = xdg.BaseDirectory.xdg_config_home
|
|
config_home = os.environ.get('XDG_CONFIG_HOME')
|
|
if config_home is None:
|
|
if os.environ.get('HOME') is None:
|
|
if sys.platform == 'win32':
|
|
config_home = os.environ.get('APPDATA')
|
|
if config_home is None:
|
|
return os.path.abspath('.')
|
|
else:
|
|
return os.path.abspath('.')
|
|
else:
|
|
config_home = os.path.join(
|
|
os.environ.get('HOME'), '.config'
|
|
)
|
|
return os.path.join(config_home, 'slixfeed')
|
|
|
|
def update_toml_file(filename, data):
|
|
with open(filename, 'w') as new_file:
|
|
content = tomli_w.dumps(data)
|
|
new_file.write(content)
|
|
|
|
def add_settings_default(self):
|
|
settings_default = get_values('settings.toml', 'settings')
|
|
self.defaults = settings_default
|
|
|
|
# TODO Open SQLite file once
|
|
def add_settings_jid(self, jid_bare, db_file):
|
|
self.settings[jid_bare] = {}
|
|
for key in self.defaults['default']:
|
|
value = sqlite.get_setting_value(db_file, key)
|
|
if value:
|
|
self.settings[jid_bare][key] = value[0]
|
|
elif key not in ('check', 'formatting'):
|
|
# NOTE This might neglects the need for
|
|
# self.defaults of get_setting_value
|
|
self.settings[jid_bare][key] = self.defaults['default'][key]
|
|
|
|
def get_settings_xmpp(key=None):
|
|
result = get_values('accounts.toml', 'xmpp')
|
|
result = result[key] if key else result
|
|
return result
|
|
|
|
async def set_setting_value(self, jid_bare, db_file, key, val):
|
|
key = key.lower()
|
|
key_val = [key, val]
|
|
self.settings[jid_bare][key] = val
|
|
if sqlite.is_setting_key(db_file, key):
|
|
await sqlite.update_setting_value(db_file, key_val)
|
|
else:
|
|
await sqlite.set_setting_value(db_file, key_val)
|
|
|
|
# TODO Segregate Jabber ID settings from Slixfeed wide settings.
|
|
# self.settings, self.settings_xmpp, self.settings_irc etc.
|
|
def get_setting_value(self, jid_bare, key):
|
|
if jid_bare in self.settings and key in self.settings[jid_bare]:
|
|
value = self.settings[jid_bare][key]
|
|
else:
|
|
value = self.defaults['default'][key]
|
|
return value
|
|
|
|
class ConfigNetwork:
|
|
def __init__(self, settings):
|
|
settings['network'] = {}
|
|
for key in ('http_proxy', 'user_agent'):
|
|
value = get_value('settings', 'Network', key)
|
|
settings['network'][key] = value
|
|
|
|
|
|
class ConfigJabberID:
|
|
def __init__(self, settings, jid_bare, db_file):
|
|
settings[jid_bare] = {}
|
|
for key in ('archive', 'enabled', 'filter', 'formatting', 'interval',
|
|
'length', 'media', 'old', 'quantum'):
|
|
value = sqlite.get_setting_value(db_file, key)
|
|
if value: value = value[0]
|
|
print(value)
|
|
settings[jid_bare][key] = value
|
|
|
|
|
|
class Data:
|
|
|
|
|
|
def get_default_data_directory():
|
|
"""
|
|
Determine the directory path where dbfile will be stored.
|
|
|
|
* If $XDG_DATA_HOME is defined, use it;
|
|
* else if $HOME exists, use it;
|
|
* else if the platform is Windows, use %APPDATA%;
|
|
* else use the current directory.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
Path to database file.
|
|
|
|
Note
|
|
----
|
|
This function was taken from project buku.
|
|
|
|
See https://github.com/jarun/buku
|
|
|
|
* Arun Prakash Jana (jarun)
|
|
* Dmitry Marakasov (AMDmi3)
|
|
"""
|
|
# data_home = xdg.BaseDirectory.xdg_data_home
|
|
data_home = os.environ.get('XDG_DATA_HOME')
|
|
if data_home is None:
|
|
if os.environ.get('HOME') is None:
|
|
if sys.platform == 'win32':
|
|
data_home = os.environ.get('APPDATA')
|
|
if data_home is None:
|
|
return os.path.abspath('.slixfeed/data')
|
|
else:
|
|
return os.path.abspath('.slixfeed/data')
|
|
else:
|
|
data_home = os.path.join(
|
|
os.environ.get('HOME'), '.local', 'share'
|
|
)
|
|
return os.path.join(data_home, 'slixfeed')
|
|
|
|
|
|
def get_pathname_to_omemo_directory():
|
|
"""
|
|
Get OMEMO directory.
|
|
|
|
Parameters
|
|
----------
|
|
None
|
|
|
|
Returns
|
|
-------
|
|
object
|
|
Coroutine object.
|
|
"""
|
|
db_dir = get_default_data_directory()
|
|
if not os.path.isdir(db_dir):
|
|
os.mkdir(db_dir)
|
|
if not os.path.isdir(db_dir + "/omemo"):
|
|
os.mkdir(db_dir + "/omemo")
|
|
omemo_dir = os.path.join(db_dir, "omemo")
|
|
return omemo_dir
|
|
|
|
|
|
def get_values(filename, key=None):
|
|
config_dir = get_default_config_directory()
|
|
if not os.path.isdir(config_dir):
|
|
config_dir = '/usr/share/slixfeed/'
|
|
if not os.path.isdir(config_dir):
|
|
config_dir = os.path.dirname(__file__) + "/assets"
|
|
config_file = os.path.join(config_dir, filename)
|
|
with open(config_file, mode="rb") as defaults:
|
|
result = tomllib.load(defaults)
|
|
values = result[key] if key else result
|
|
return values
|
|
|
|
|
|
def get_setting_value(db_file, key):
|
|
value = sqlite.get_setting_value(db_file, key)
|
|
if value:
|
|
value = value[0]
|
|
else:
|
|
value = get_value('settings', 'Settings', key)
|
|
# try:
|
|
# value = int(value)
|
|
# except ValueError as e:
|
|
# print('ValueError for value {} (key {}):\n{}'.format(value, key, e))
|
|
# if isinstance(value, bool):
|
|
# if value:
|
|
# value = 1
|
|
# else:
|
|
# value = 0
|
|
return value
|
|
|
|
|
|
# TODO Merge with backup_obsolete
|
|
def update_proxies(file, proxy_name, proxy_type, proxy_url, action='remove'):
|
|
"""
|
|
Add given URL to given list.
|
|
|
|
Parameters
|
|
----------
|
|
file : str
|
|
Filename.
|
|
proxy_name : str
|
|
Proxy name.
|
|
proxy_type : str
|
|
Proxy title.
|
|
proxy_url : str
|
|
Proxy URL.
|
|
action : str
|
|
add or remove
|
|
|
|
Returns
|
|
-------
|
|
None.
|
|
"""
|
|
data = open_config_file('proxies.toml')
|
|
proxy_list = data['proxies'][proxy_name][proxy_type]
|
|
# breakpoint()
|
|
print('####################### PROXY ######################')
|
|
proxy_index = proxy_list.index(proxy_url)
|
|
proxy_list.pop(proxy_index)
|
|
with open(file, 'w') as new_file:
|
|
content = tomli_w.dumps(data)
|
|
new_file.write(content)
|
|
|
|
|
|
# TODO Merge with update_proxies
|
|
def backup_obsolete(file, proxy_name, proxy_type, proxy_url, action='add'):
|
|
"""
|
|
Add given URL to given list.
|
|
|
|
Parameters
|
|
----------
|
|
file : str
|
|
Filename.
|
|
proxy_name : str
|
|
Proxy name.
|
|
proxy_type : str
|
|
Proxy title.
|
|
proxy_url : str
|
|
Proxy URL.
|
|
action : str
|
|
add or remove
|
|
|
|
Returns
|
|
-------
|
|
None.
|
|
"""
|
|
data = open_config_file('proxies_obsolete.toml')
|
|
proxy_list = data['proxies'][proxy_name][proxy_type]
|
|
proxy_list.extend([proxy_url])
|
|
with open(file, 'w') as new_file:
|
|
content = tomli_w.dumps(data)
|
|
new_file.write(content)
|
|
|
|
|
|
def create_skeleton(file):
|
|
with open(file, 'rb') as original_file:
|
|
data = tomllib.load(original_file)
|
|
data = clear_values(data)
|
|
with open('proxies_obsolete.toml', 'w') as new_file:
|
|
content = tomli_w.dumps(data)
|
|
new_file.write(content)
|
|
|
|
|
|
def clear_values(input):
|
|
if isinstance(input, dict):
|
|
return {k: clear_values(v) for k, v in input.items()}
|
|
elif isinstance(input, list):
|
|
return ['']
|
|
else:
|
|
return ''
|
|
|
|
|
|
# TODO Return dict instead of list
|
|
def get_value(filename, section, keys):
|
|
"""
|
|
Get setting value.
|
|
|
|
Parameters
|
|
----------
|
|
filename : str
|
|
INI filename.
|
|
keys : list or str
|
|
A single key as string or multiple keys as list.
|
|
section : str
|
|
INI Section.
|
|
|
|
Returns
|
|
-------
|
|
result : list or str
|
|
A single value as string or multiple values as list.
|
|
"""
|
|
result = None
|
|
config_res = configparser.RawConfigParser()
|
|
config_dir = get_default_config_directory()
|
|
if not os.path.isdir(config_dir):
|
|
config_dir = '/usr/share/slixfeed/'
|
|
if not os.path.isdir(config_dir):
|
|
config_dir = os.path.dirname(__file__) + "/assets"
|
|
config_file = os.path.join(config_dir, filename + ".ini")
|
|
config_res.read(config_file)
|
|
if config_res.has_section(section):
|
|
section_res = config_res[section]
|
|
if isinstance(keys, list):
|
|
result = []
|
|
for key in keys:
|
|
if key in section_res:
|
|
value = section_res[key]
|
|
logger.debug("Found value {} for key {}".format(value, key))
|
|
else:
|
|
value = ''
|
|
logger.debug("Missing key:", key)
|
|
result.extend([value])
|
|
elif isinstance(keys, str):
|
|
key = keys
|
|
if key in section_res:
|
|
result = section_res[key]
|
|
logger.debug("Found value {} for key {}".format(result, key))
|
|
else:
|
|
result = ''
|
|
# logger.error("Missing key:", key)
|
|
if result == None:
|
|
logger.error(
|
|
"Check configuration file {}.ini for "
|
|
"missing key(s) \"{}\" under section [{}].".format(
|
|
filename, keys, section)
|
|
)
|
|
else:
|
|
return result
|
|
|
|
|
|
# TODO Store config file as an object in runtime, otherwise
|
|
# the file will be opened time and time again.
|
|
# TODO Copy file from /etc/slixfeed/ or /usr/share/slixfeed/
|
|
def get_value_default(filename, section, key):
|
|
"""
|
|
Get settings default value.
|
|
|
|
Parameters
|
|
----------
|
|
key : str
|
|
Key: archive, enabled, interval,
|
|
length, old, quantum, random.
|
|
|
|
Returns
|
|
-------
|
|
result : str
|
|
Value.
|
|
"""
|
|
config_res = configparser.RawConfigParser()
|
|
config_dir = get_default_config_directory()
|
|
if not os.path.isdir(config_dir):
|
|
config_dir = '/usr/share/slixfeed/'
|
|
config_file = os.path.join(config_dir, filename + ".ini")
|
|
config_res.read(config_file)
|
|
if config_res.has_section(section):
|
|
result = config_res[section][key]
|
|
return result
|
|
|
|
|
|
# TODO DELETE THIS FUNCTION OR KEEP ONLY THE CODE BELOW NOTE
|
|
# IF CODE BELOW NOTE IS KEPT, RENAME FUNCTION TO open_toml
|
|
def open_config_file(filename):
|
|
"""
|
|
Get settings default value.
|
|
|
|
Parameters
|
|
----------
|
|
filename : str
|
|
Filename of toml file.
|
|
|
|
Returns
|
|
-------
|
|
result : list
|
|
List of pathnames or keywords.
|
|
"""
|
|
config_dir = get_default_config_directory()
|
|
if not os.path.isdir(config_dir):
|
|
config_dir = '/usr/share/slixfeed/'
|
|
if not os.path.isdir(config_dir):
|
|
config_dir = os.path.dirname(__file__) + "/assets"
|
|
config_file = os.path.join(config_dir, filename)
|
|
# NOTE THIS IS THE IMPORTANT CODE
|
|
with open(config_file, mode="rb") as defaults:
|
|
# default = yaml.safe_load(defaults)
|
|
# result = default[key]
|
|
result = tomllib.load(defaults)
|
|
return result
|
|
|
|
|
|
def get_default_data_directory():
|
|
"""
|
|
Determine the directory path where dbfile will be stored.
|
|
|
|
* If $XDG_DATA_HOME is defined, use it;
|
|
* else if $HOME exists, use it;
|
|
* else if the platform is Windows, use %APPDATA%;
|
|
* else use the current directory.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
Path to database file.
|
|
|
|
Note
|
|
----
|
|
This function was taken from project buku.
|
|
|
|
See https://github.com/jarun/buku
|
|
|
|
* Arun Prakash Jana (jarun)
|
|
* Dmitry Marakasov (AMDmi3)
|
|
"""
|
|
# data_home = xdg.BaseDirectory.xdg_data_home
|
|
data_home = os.environ.get('XDG_DATA_HOME')
|
|
if data_home is None:
|
|
if os.environ.get('HOME') is None:
|
|
if sys.platform == 'win32':
|
|
data_home = os.environ.get('APPDATA')
|
|
if data_home is None:
|
|
return os.path.abspath('.slixfeed/data')
|
|
else:
|
|
return os.path.abspath('.slixfeed/data')
|
|
else:
|
|
data_home = os.path.join(
|
|
os.environ.get('HOME'), '.local', 'share'
|
|
)
|
|
return os.path.join(data_home, 'slixfeed')
|
|
|
|
|
|
def get_default_cache_directory():
|
|
"""
|
|
Determine the directory path where dbfile will be stored.
|
|
|
|
* If $XDG_DATA_HOME is defined, use it;
|
|
* else if $HOME exists, use it;
|
|
* else if the platform is Windows, use %APPDATA%;
|
|
* else use the current directory.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
Path to cache directory.
|
|
"""
|
|
# data_home = xdg.BaseDirectory.xdg_data_home
|
|
data_home = os.environ.get('XDG_CACHE_HOME')
|
|
if data_home is None:
|
|
if os.environ.get('HOME') is None:
|
|
if sys.platform == 'win32':
|
|
data_home = os.environ.get('APPDATA')
|
|
if data_home is None:
|
|
return os.path.abspath('.slixfeed/cache')
|
|
else:
|
|
return os.path.abspath('.slixfeed/cache')
|
|
else:
|
|
data_home = os.path.join(
|
|
os.environ.get('HOME'), '.cache'
|
|
)
|
|
return os.path.join(data_home, 'slixfeed')
|
|
|
|
|
|
# TODO Write a similar function for file.
|
|
# NOTE the is a function of directory, noot file.
|
|
def get_default_config_directory():
|
|
"""
|
|
Determine the directory path where configuration will be stored.
|
|
|
|
* If $XDG_CONFIG_HOME is defined, use it;
|
|
* else if $HOME exists, use it;
|
|
* else if the platform is Windows, use %APPDATA%;
|
|
* else use the current directory.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
Path to configuration directory.
|
|
"""
|
|
# config_home = xdg.BaseDirectory.xdg_config_home
|
|
config_home = os.environ.get('XDG_CONFIG_HOME')
|
|
if config_home is None:
|
|
if os.environ.get('HOME') is None:
|
|
if sys.platform == 'win32':
|
|
config_home = os.environ.get('APPDATA')
|
|
if config_home is None:
|
|
return os.path.abspath('.')
|
|
else:
|
|
return os.path.abspath('.')
|
|
else:
|
|
config_home = os.path.join(
|
|
os.environ.get('HOME'), '.config'
|
|
)
|
|
return os.path.join(config_home, 'slixfeed')
|
|
|
|
|
|
def get_pathname_to_database(jid_file):
|
|
"""
|
|
Callback function to instantiate action on database.
|
|
|
|
Parameters
|
|
----------
|
|
jid_file : str
|
|
Filename.
|
|
callback : ?
|
|
Function name.
|
|
message : str, optional
|
|
Optional kwarg when a message is a part or
|
|
required argument. The default is None.
|
|
|
|
Returns
|
|
-------
|
|
object
|
|
Coroutine object.
|
|
"""
|
|
db_dir = get_default_data_directory()
|
|
if not os.path.isdir(db_dir):
|
|
os.mkdir(db_dir)
|
|
if not os.path.isdir(db_dir + "/sqlite"):
|
|
os.mkdir(db_dir + "/sqlite")
|
|
db_file = os.path.join(db_dir, "sqlite", r"{}.db".format(jid_file))
|
|
sqlite.create_tables(db_file)
|
|
return db_file
|
|
# await set_default_values(db_file)
|
|
# if message:
|
|
# return await callback(db_file, message)
|
|
# else:
|
|
# return await callback(db_file)
|
|
|
|
|
|
async def add_to_list(newwords, keywords):
|
|
"""
|
|
Append new keywords to list.
|
|
|
|
Parameters
|
|
----------
|
|
newwords : str
|
|
List of new keywords.
|
|
keywords : str
|
|
List of current keywords.
|
|
|
|
Returns
|
|
-------
|
|
val : str
|
|
List of current keywords and new keywords.
|
|
"""
|
|
if isinstance(keywords, str) or keywords is None:
|
|
try:
|
|
keywords = keywords.split(",")
|
|
except:
|
|
keywords = []
|
|
newwords = newwords.lower().split(",")
|
|
for word in newwords:
|
|
word = word.strip()
|
|
if len(word) and word not in keywords:
|
|
keywords.extend([word])
|
|
keywords.sort()
|
|
val = ",".join(keywords)
|
|
return val
|
|
|
|
|
|
async def remove_from_list(newwords, keywords):
|
|
"""
|
|
Remove given keywords from list.
|
|
|
|
Parameters
|
|
----------
|
|
newwords : str
|
|
List of new keywords.
|
|
keywords : str
|
|
List of current keywords.
|
|
|
|
Returns
|
|
-------
|
|
val : str
|
|
List of new keywords.
|
|
"""
|
|
if isinstance(keywords, str) or keywords is None:
|
|
try:
|
|
keywords = keywords.split(",")
|
|
except:
|
|
keywords = []
|
|
newwords = newwords.lower().split(",")
|
|
for word in newwords:
|
|
word = word.strip()
|
|
if len(word) and word in keywords:
|
|
keywords.remove(word)
|
|
keywords.sort()
|
|
val = ",".join(keywords)
|
|
return val
|
|
|
|
|
|
def is_include_keyword(db_file, key, string):
|
|
"""
|
|
Check keyword match.
|
|
|
|
Parameters
|
|
----------
|
|
db_file : str
|
|
Path to database file.
|
|
type : str
|
|
"allow" or "deny".
|
|
string : str
|
|
String.
|
|
|
|
Returns
|
|
-------
|
|
Matched keyword or None.
|
|
|
|
"""
|
|
# async def reject(db_file, string):
|
|
# async def is_blacklisted(db_file, string):
|
|
keywords = sqlite.get_filter_value(db_file, key)
|
|
keywords = keywords[0] if keywords else ''
|
|
keywords = keywords.split(",")
|
|
keywords = keywords + (open_config_file("lists.toml")[key])
|
|
for keyword in keywords:
|
|
if not keyword or len(keyword) < 2:
|
|
continue
|
|
if keyword in string.lower():
|
|
# print(">>> ACTIVATE", i)
|
|
# return 1
|
|
return keyword
|
|
|
|
"""
|
|
|
|
This code was tested at module datahandler
|
|
|
|
reject = 0
|
|
blacklist = sqlite.get_setting_value(
|
|
db_file,
|
|
"deny"
|
|
)
|
|
# print(">>> blacklist:")
|
|
# print(blacklist)
|
|
# breakpoint()
|
|
if blacklist:
|
|
blacklist = blacklist.split(",")
|
|
# print(">>> blacklist.split")
|
|
# print(blacklist)
|
|
# breakpoint()
|
|
for i in blacklist:
|
|
# print(">>> length", len(i))
|
|
# breakpoint()
|
|
# if len(i):
|
|
if not i or len(i) < 2:
|
|
print(">>> continue due to length", len(i))
|
|
# breakpoint()
|
|
continue
|
|
# print(title)
|
|
# print(">>> blacklisted word:", i)
|
|
# breakpoint()
|
|
test = (title + " " + summary + " " + link)
|
|
if i in test.lower():
|
|
reject = 1
|
|
break
|
|
|
|
if reject:
|
|
print("rejected:",title)
|
|
entry = (title, '', link, source, date, 1);
|
|
|
|
"""
|