259 lines
12 KiB
Python
259 lines
12 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from blasta.database.sqlite import DatabaseSQLite
|
|
from blasta.utilities.cryptography import UtilitiesCryptography
|
|
from blasta.utilities.syndication import UtilitiesSyndication
|
|
from blasta.xmpp.pubsub import XmppPubsub
|
|
import os
|
|
from slixmpp.stanza.iq import Iq
|
|
import tomli_w
|
|
|
|
try:
|
|
import tomllib
|
|
except:
|
|
import tomli as tomllib
|
|
|
|
class UtilitiesData:
|
|
|
|
def cache_items_and_tags_search(directory_cache, entries, jid, query):
|
|
"""Create a cache file of node items and tags."""
|
|
item_ids = []
|
|
tags = {}
|
|
for entry in entries:
|
|
entry_tags = entry['tags']
|
|
entry_url_hash = entry['url_hash']
|
|
tags_to_include = []
|
|
if query in ' '.join([entry['title'], entry['link'], entry['summary'], ' '.join(entry_tags)]):
|
|
item_ids.append(entry_url_hash)
|
|
tags_to_include += entry_tags
|
|
for tag_to_include in tags_to_include:
|
|
tags[tag_to_include] = tags[tag_to_include]+1 if tag_to_include in tags else 1
|
|
if tags:
|
|
tags = dict(sorted(tags.items(), key=lambda item: (-item[1], item[0])))
|
|
tags = dict(list(tags.items())[:30])
|
|
if item_ids:
|
|
filename = os.path.join(directory_cache, 'data', jid + '_query.toml')
|
|
data = {
|
|
'item_ids' : item_ids,
|
|
'tags' : tags}
|
|
UtilitiesData.save_to_toml(filename, data)
|
|
|
|
def cache_items_and_tags_filter(directory_cache, entries, jid, tag):
|
|
"""Create a cache file of node items and tags."""
|
|
item_ids = []
|
|
tags = {}
|
|
for entry in entries:
|
|
entry_tags = entry['tags']
|
|
entry_url_hash = entry['url_hash']
|
|
tags_to_include = []
|
|
if tag in entry_tags:
|
|
item_ids.append(entry_url_hash)
|
|
tags_to_include += entry_tags
|
|
for tag_to_include in tags_to_include:
|
|
tags[tag_to_include] = tags[tag_to_include]+1 if tag_to_include in tags else 1
|
|
if tags:
|
|
tags = dict(sorted(tags.items(), key=lambda item: (-item[1], item[0])))
|
|
tags = dict(list(tags.items())[:30])
|
|
del tags[tag]
|
|
if item_ids:
|
|
directory = os.path.join(directory_cache, 'data', jid)
|
|
if not os.path.exists(directory):
|
|
os.mkdir(directory)
|
|
filename = os.path.join(directory, tag + '.toml')
|
|
# Add support for search query
|
|
#filename = 'data/{}/query:{}.toml'.format(jid, query)
|
|
#filename = 'data/{}/tag:{}.toml'.format(jid, tag)
|
|
data = {
|
|
'item_ids' : item_ids,
|
|
'tags' : tags}
|
|
UtilitiesData.save_to_toml(filename, data)
|
|
|
|
def cache_items_and_tags(directory_cache, entries, jid):
|
|
"""Create a cache file of node items and tags."""
|
|
item_ids = []
|
|
tags = {}
|
|
for entry in entries:
|
|
entry_tags = entry['tags']
|
|
entry_url_hash = entry['url_hash']
|
|
tags_to_include = []
|
|
item_ids.append(entry_url_hash)
|
|
tags_to_include += entry_tags
|
|
for tag_to_include in tags_to_include:
|
|
tags[tag_to_include] = tags[tag_to_include]+1 if tag_to_include in tags else 1
|
|
if tags:
|
|
tags = dict(sorted(tags.items(), key=lambda item: (-item[1], item[0])))
|
|
tags = dict(list(tags.items())[:30])
|
|
if item_ids:
|
|
filename = os.path.join(directory_cache, 'data', jid + '.toml')
|
|
data = {
|
|
'item_ids' : item_ids,
|
|
'tags' : tags}
|
|
UtilitiesData.save_to_toml(filename, data)
|
|
|
|
def extract_iq_items(iq, jabber_id):
|
|
iq_items = iq['pubsub']['items']
|
|
entries = []
|
|
name = jabber_id.split('@')[0]
|
|
for iq_item in iq_items:
|
|
item_payload = iq_item['payload']
|
|
entry = UtilitiesSyndication.extract_items(item_payload)
|
|
entries.append(entry)
|
|
# TODO Handle this with XEP-0059 (reverse: bool), instead of reversing it.
|
|
entries.reverse()
|
|
return entries
|
|
|
|
def extract_iq_items_extra(db_file, iq, jabber_id, limit=None):
|
|
iq_items = iq['pubsub']['items']
|
|
entries = []
|
|
name = jabber_id.split('@')[0]
|
|
for iq_item in iq_items:
|
|
item_payload = iq_item['payload']
|
|
entry = UtilitiesSyndication.extract_items(item_payload, limit)
|
|
url_hash = UtilitiesCryptography.hash_url_to_md5(entry['link'])
|
|
iq_item_id = iq_item['id']
|
|
if iq_item_id != url_hash:
|
|
logging.error('Item ID does not match MD5. id: {} hash: {}'.format(iq_item_id, url_hash))
|
|
logging.warn('Item ID does not match MD5. id: {} hash: {}'.format(iq_item_id, url_hash))
|
|
instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
|
|
if entry:
|
|
entry['instances'] = instances or 0
|
|
entry['jid'] = jabber_id
|
|
entry['name'] = name
|
|
entry['url_hash'] = url_hash
|
|
entries.append(entry)
|
|
# TODO Handle this with XEP-0059 (reverse: bool), instead of reversing it.
|
|
entries.reverse()
|
|
result = entries
|
|
return result
|
|
|
|
def load_data_toml(data: dict) -> dict:
|
|
return tomllib.loads(data)
|
|
|
|
def open_file_toml(filename: str) -> dict:
|
|
with open(filename, mode="rb") as fn:
|
|
data = tomllib.load(fn)
|
|
return data
|
|
|
|
def organize_tags(tags):
|
|
tags_organized = []
|
|
tags = tags.split(',')
|
|
#tags = sorted(set(tags))
|
|
for tag in tags:
|
|
if tag:
|
|
tag = tag.lower().strip()
|
|
if tag not in tags_organized:
|
|
tags_organized.append(tag)
|
|
return sorted(tags_organized)
|
|
|
|
def remove_item_from_cache(directory_cache, jabber_id, node, url_hash):
|
|
filename_items = os.path.join(directory_cache, 'items', jabber_id + '.toml')
|
|
entries_cache = UtilitiesData.open_file_toml(filename_items)
|
|
if node in entries_cache:
|
|
entries_cache_node = entries_cache[node]
|
|
for entry_cache in entries_cache_node:
|
|
if entry_cache['url_hash'] == url_hash:
|
|
entry_cache_index = entries_cache_node.index(entry_cache)
|
|
del entries_cache_node[entry_cache_index]
|
|
break
|
|
data_items = entries_cache
|
|
UtilitiesData.save_to_toml(filename_items, data_items)
|
|
|
|
def save_to_json(filename: str, data) -> None:
|
|
with open(filename, 'w') as f:
|
|
json.dump(data, f)
|
|
|
|
def save_to_toml(filename: str, data: dict) -> None:
|
|
with open(filename, 'w') as fn:
|
|
data_as_string = tomli_w.dumps(data)
|
|
fn.write(data_as_string)
|
|
|
|
async def update_cache_and_database(
|
|
db_file, directory_cache, xmpp_instance, jabber_id: str, node_type: str, node_id: str):
|
|
# Download identifiers of node items.
|
|
iq = await XmppPubsub.get_node_item_ids(xmpp_instance, jabber_id, node_id)
|
|
if isinstance(iq, Iq):
|
|
iq_items_remote = iq['disco_items']
|
|
|
|
# Cache a list of identifiers of node items to a file.
|
|
iq_items_remote_name = []
|
|
for iq_item_remote in iq_items_remote:
|
|
iq_item_remote_name = iq_item_remote['name']
|
|
iq_items_remote_name.append(iq_item_remote_name)
|
|
|
|
#data_item_ids = {'iq_items' : iq_items_remote_name}
|
|
#filename_item_ids = 'item_ids/' + jabber_id + '.toml'
|
|
#Data.save_to_toml(filename_item_ids, data_item_ids)
|
|
|
|
filename_items = os.path.join(directory_cache, 'items', jabber_id + '.toml')
|
|
if not os.path.exists(filename_items) or os.path.getsize(filename_items) in (0, 13):
|
|
iq = await XmppPubsub.get_node_items(xmpp_instance, jabber_id, node_id)
|
|
if isinstance(iq, Iq):
|
|
entries_cache_node = UtilitiesData.extract_iq_items_extra(db_file, iq, jabber_id)
|
|
data_items = {node_type : entries_cache_node}
|
|
UtilitiesData.save_to_toml(filename_items, data_items)
|
|
return ['fine', iq] # TODO Remove this line
|
|
else:
|
|
return ['error', iq]
|
|
else:
|
|
entries_cache = UtilitiesData.open_file_toml(filename_items)
|
|
if not node_type in entries_cache: return ['error', 'Directory "{}" is empty'. format(node_type)]
|
|
entries_cache_node = entries_cache[node_type]
|
|
|
|
# Check whether items still exist on node
|
|
for entry in entries_cache_node:
|
|
iq_item_remote_exist = False
|
|
url_hash = None
|
|
for url_hash in iq_items_remote_name:
|
|
if url_hash == entry['url_hash']:
|
|
iq_item_remote_exist = True
|
|
break
|
|
if url_hash and not iq_item_remote_exist:
|
|
await DatabaseSQLite.delete_combination_row_by_jid_and_url_hash(
|
|
db_file, url_hash, jabber_id)
|
|
entry_index = entries_cache_node.index(entry)
|
|
del entries_cache_node[entry_index]
|
|
|
|
# Check for new items on node
|
|
entries_cache_node_new = []
|
|
for url_hash in iq_items_remote_name:
|
|
iq_item_local_exist = False
|
|
for entry in entries_cache_node:
|
|
if url_hash == entry['url_hash']:
|
|
iq_item_local_exist = True
|
|
break
|
|
if not iq_item_local_exist:
|
|
iq = await XmppPubsub.get_node_item(
|
|
xmpp_instance, jabber_id, node_id, url_hash)
|
|
if isinstance(iq, Iq):
|
|
entries_iq = UtilitiesData.extract_iq_items_extra(db_file, iq, jabber_id)
|
|
entries_cache_node_new += entries_iq
|
|
else:
|
|
# TODO
|
|
# Handle this concern in a different fashion,
|
|
# instead of stopping the whole operation.
|
|
return ['error', iq]
|
|
entries_cache_node += entries_cache_node_new
|
|
|
|
if node_type == 'public':
|
|
# Fast (low I/O)
|
|
if not DatabaseSQLite.get_jid_id_by_jid(db_file, jabber_id):
|
|
await DatabaseSQLite.set_jid(db_file, jabber_id)
|
|
#await DatabaseSQLite.add_new_entries(db_file, entries)
|
|
await DatabaseSQLite.add_tags(db_file, entries_cache_node)
|
|
# Slow (high I/O)
|
|
for entry in entries_cache_node:
|
|
url_hash = entry['url_hash']
|
|
if not DatabaseSQLite.get_entry_id_by_url_hash(db_file, url_hash):
|
|
await DatabaseSQLite.add_new_entries(db_file, entries_cache_node)
|
|
await DatabaseSQLite.associate_entries_tags_jids(db_file, entry)
|
|
#elif not DatabaseSQLite.is_jid_associated_with_url_hash(db_file, jabber_id, url_hash):
|
|
# await DatabaseSQLite.associate_entries_tags_jids(db_file, entry)
|
|
else:
|
|
await DatabaseSQLite.associate_entries_tags_jids(db_file, entry)
|
|
|
|
data_items = entries_cache
|
|
UtilitiesData.save_to_toml(filename_items, data_items)
|
|
return ['fine', iq] # TODO Remove this line
|
|
else:
|
|
return ['error', iq]
|