Blasta/blasta/helpers/data.py
Schimon Jehudah, Adv. 77ac4c0ed9 Modularize code;
Update document README.
2024-11-13 16:41:33 +02:00

257 lines
11 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
from blasta.helpers.utilities import Utilities
from blasta.sqlite import SQLite
from blasta.xml.syndication import Syndication
from blasta.xmpp.pubsub import XmppPubsub
import os
from slixmpp.stanza.iq import Iq
import tomli_w
try:
import tomllib
except:
import tomli as tomllib
class Data:
def cache_items_and_tags_search(directory_cache, entries, jid, query):
"""Create a cache file of node items and tags."""
item_ids = []
tags = {}
for entry in entries:
entry_tags = entry['tags']
entry_url_hash = entry['url_hash']
tags_to_include = []
if query in ' '.join([entry['title'], entry['link'], entry['summary'], ' '.join(entry_tags)]):
item_ids.append(entry_url_hash)
tags_to_include += entry_tags
for tag_to_include in tags_to_include:
tags[tag_to_include] = tags[tag_to_include]+1 if tag_to_include in tags else 1
if tags:
tags = dict(sorted(tags.items(), key=lambda item: (-item[1], item[0])))
tags = dict(list(tags.items())[:30])
if item_ids:
filename = os.path.join(directory_cache, 'data', jid + '_query.toml')
data = {
'item_ids' : item_ids,
'tags' : tags}
Data.save_to_toml(filename, data)
def cache_items_and_tags_filter(directory_cache, entries, jid, tag):
"""Create a cache file of node items and tags."""
item_ids = []
tags = {}
for entry in entries:
entry_tags = entry['tags']
entry_url_hash = entry['url_hash']
tags_to_include = []
if tag in entry_tags:
item_ids.append(entry_url_hash)
tags_to_include += entry_tags
for tag_to_include in tags_to_include:
tags[tag_to_include] = tags[tag_to_include]+1 if tag_to_include in tags else 1
if tags:
tags = dict(sorted(tags.items(), key=lambda item: (-item[1], item[0])))
tags = dict(list(tags.items())[:30])
del tags[tag]
if item_ids:
directory = os.path.join(directory_cache, 'data', jid)
if not os.path.exists(directory):
os.mkdir(directory)
filename = os.path.join(directory, tag)
# Add support for search query
#filename = 'data/{}/query:{}.toml'.format(jid, query)
#filename = 'data/{}/tag:{}.toml'.format(jid, tag)
data = {
'item_ids' : item_ids,
'tags' : tags}
Data.save_to_toml(filename, data)
def cache_items_and_tags(directory_cache, entries, jid):
"""Create a cache file of node items and tags."""
item_ids = []
tags = {}
for entry in entries:
entry_tags = entry['tags']
entry_url_hash = entry['url_hash']
tags_to_include = []
item_ids.append(entry_url_hash)
tags_to_include += entry_tags
for tag_to_include in tags_to_include:
tags[tag_to_include] = tags[tag_to_include]+1 if tag_to_include in tags else 1
if tags:
tags = dict(sorted(tags.items(), key=lambda item: (-item[1], item[0])))
tags = dict(list(tags.items())[:30])
if item_ids:
filename = os.path.join(directory_cache, 'data', jid + '.toml')
data = {
'item_ids' : item_ids,
'tags' : tags}
Data.save_to_toml(filename, data)
def extract_iq_items(iq, jabber_id):
iq_items = iq['pubsub']['items']
entries = []
name = jabber_id.split('@')[0]
for iq_item in iq_items:
item_payload = iq_item['payload']
entry = Syndication.extract_items(item_payload)
entries.append(entry)
# TODO Handle this with XEP-0059 (reverse: bool), instead of reversing it.
entries.reverse()
return entries
def extract_iq_items_extra(iq, jabber_id, limit=None):
iq_items = iq['pubsub']['items']
entries = []
name = jabber_id.split('@')[0]
for iq_item in iq_items:
item_payload = iq_item['payload']
entry = Syndication.extract_items(item_payload, limit)
url_hash = Utilities.hash_url_to_md5(entry['link'])
iq_item_id = iq_item['id']
if iq_item_id != url_hash:
logging.error('Item ID does not match MD5. id: {} hash: {}'.format(iq_item_id, url_hash))
logging.warn('Item ID does not match MD5. id: {} hash: {}'.format(iq_item_id, url_hash))
db_file = 'main.sqlite'
instances = SQLite.get_entry_instances_by_url_hash(db_file, url_hash)
if entry:
entry['instances'] = instances or 0
entry['jid'] = jabber_id
entry['name'] = name
entry['url_hash'] = url_hash
entries.append(entry)
# TODO Handle this with XEP-0059 (reverse: bool), instead of reversing it.
entries.reverse()
result = entries
return result
def open_file_toml(filename: str) -> dict:
with open(filename, mode="rb") as fn:
data = tomllib.load(fn)
return data
def organize_tags(tags):
tags_organized = []
tags = tags.split(',')
#tags = sorted(set(tags))
for tag in tags:
if tag:
tag = tag.lower().strip()
if tag not in tags_organized:
tags_organized.append(tag)
return sorted(tags_organized)
def remove_item_from_cache(directory_cache, jabber_id, node, url_hash):
filename_items = os.path.join(directory_cache, 'items', jabber_id + '.toml')
entries_cache = Data.open_file_toml(filename_items)
if node in entries_cache:
entries_cache_node = entries_cache[node]
for entry_cache in entries_cache_node:
if entry_cache['url_hash'] == url_hash:
entry_cache_index = entries_cache_node.index(entry_cache)
del entries_cache_node[entry_cache_index]
break
data_items = entries_cache
Data.save_to_toml(filename_items, data_items)
def save_to_json(filename: str, data) -> None:
with open(filename, 'w') as f:
json.dump(data, f)
def save_to_toml(filename: str, data: dict) -> None:
with open(filename, 'w') as fn:
data_as_string = tomli_w.dumps(data)
fn.write(data_as_string)
async def update_cache_and_database(directory_cache, xmpp_instance, jabber_id: str, node_type: str, node_id: str):
# Download identifiers of node items.
iq = await XmppPubsub.get_node_item_ids(xmpp_instance, jabber_id, node_id)
if isinstance(iq, Iq):
iq_items_remote = iq['disco_items']
# Cache a list of identifiers of node items to a file.
iq_items_remote_name = []
for iq_item_remote in iq_items_remote:
iq_item_remote_name = iq_item_remote['name']
iq_items_remote_name.append(iq_item_remote_name)
#data_item_ids = {'iq_items' : iq_items_remote_name}
#filename_item_ids = 'item_ids/' + jabber_id + '.toml'
#Data.save_to_toml(filename_item_ids, data_item_ids)
filename_items = os.path.join(directory_cache, 'items', jabber_id + '.toml')
if not os.path.exists(filename_items) or os.path.getsize(filename_items) in (0, 13):
iq = await XmppPubsub.get_node_items(xmpp_instance, jabber_id, node_id)
if isinstance(iq, Iq):
entries_cache_node = Data.extract_iq_items_extra(iq, jabber_id)
data_items = {node_type : entries_cache_node}
Data.save_to_toml(filename_items, data_items)
return ['fine', iq] # TODO Remove this line
else:
return ['error', iq]
else:
entries_cache = Data.open_file_toml(filename_items)
if not node_type in entries_cache: return ['error', 'Directory "{}" is empty'. format(node_type)]
entries_cache_node = entries_cache[node_type]
db_file = 'main.sqlite'
# Check whether items still exist on node
for entry in entries_cache_node:
iq_item_remote_exist = False
url_hash = None
for url_hash in iq_items_remote_name:
if url_hash == entry['url_hash']:
iq_item_remote_exist = True
break
if url_hash and not iq_item_remote_exist:
await SQLite.delete_combination_row_by_jid_and_url_hash(
db_file, url_hash, jabber_id)
entry_index = entries_cache_node.index(entry)
del entries_cache_node[entry_index]
# Check for new items on node
entries_cache_node_new = []
for url_hash in iq_items_remote_name:
iq_item_local_exist = False
for entry in entries_cache_node:
if url_hash == entry['url_hash']:
iq_item_local_exist = True
break
if not iq_item_local_exist:
iq = await XmppPubsub.get_node_item(
xmpp_instance, jabber_id, node_id, url_hash)
if isinstance(iq, Iq):
entries_iq = Data.extract_iq_items_extra(iq, jabber_id)
entries_cache_node_new += entries_iq
else:
# TODO
# Handle this concern in a different fashion,
# instead of stopping the whole operation.
return ['error', iq]
entries_cache_node += entries_cache_node_new
if node_type == 'public':
# Fast (low I/O)
if not SQLite.get_jid_id_by_jid(db_file, jabber_id):
await SQLite.set_jid(db_file, jabber_id)
#await SQLite.add_new_entries(db_file, entries)
await SQLite.add_tags(db_file, entries_cache_node)
# Slow (high I/O)
for entry in entries_cache_node:
url_hash = entry['url_hash']
if not SQLite.get_entry_id_by_url_hash(db_file, url_hash):
await SQLite.add_new_entries(db_file, entries_cache_node)
await SQLite.associate_entries_tags_jids(db_file, entry)
#elif not SQLite.is_jid_associated_with_url_hash(db_file, jabber_id, url_hash):
# await SQLite.associate_entries_tags_jids(db_file, entry)
else:
await SQLite.associate_entries_tags_jids(db_file, entry)
data_items = entries_cache
Data.save_to_toml(filename_items, data_items)
return ['fine', iq] # TODO Remove this line
else:
return ['error', iq]