#!/usr/bin/python # -*- coding: utf-8 -*- import asyncio from blasta.config import Cache, Settings, Share from blasta.database.sqlite import DatabaseSQLite from blasta.utilities.cryptography import UtilitiesCryptography from blasta.utilities.data import UtilitiesData from blasta.utilities.date import UtilitiesDate from blasta.utilities.http import UtilitiesHttp from blasta.utilities.syndication import UtilitiesSyndication from blasta.xmpp.form import DataForm from blasta.xmpp.instance import XmppInstance from blasta.xmpp.iq import XmppIq from blasta.xmpp.pubsub import XmppPubsub from datetime import datetime from fastapi import Cookie, FastAPI, File, Form, HTTPException, Request, Response, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates import os import random from slixmpp.stanza.iq import Iq from starlette.responses import RedirectResponse import urllib.parse import xml.etree.ElementTree as ET class HttpInstance: def __init__(self, accounts, sessions): directory_cache = Cache.get_directory() directory_cache_data = os.path.join(directory_cache, 'data') directory_cache_export = os.path.join(directory_cache, 'export') directory_cache_items = os.path.join(directory_cache, 'items') self.directory_cache = directory_cache directory_data = Share.get_directory() directory_data_graphic = os.path.join(directory_data, 'graphic') directory_data_graphic_ico = os.path.join(directory_data_graphic, 'blasta.ico') directory_data_graphic_svg = os.path.join(directory_data_graphic, 'blasta.svg') directory_data_script = os.path.join(directory_data, 'script') directory_data_stylesheet = os.path.join(directory_data, 'stylesheet') directory_data_template = os.path.join(directory_data, 'template') #filename_database = os.path.join(directory_data, 'main.sqlite') db_file = os.path.join(directory_data, 'main.sqlite') self.app = FastAPI() templates = Jinja2Templates(directory=directory_data_template) self.app.mount('/data', StaticFiles(directory=directory_cache_data), name='data') self.app.mount('/export', StaticFiles(directory=directory_cache_export), name='export') self.app.mount('/graphic', StaticFiles(directory=directory_data_graphic), name='graphic') self.app.mount('/script', StaticFiles(directory=directory_data_script), name='script') self.app.mount('/stylesheet', StaticFiles(directory=directory_data_stylesheet), name='stylesheet') directory_settings = Settings.get_directory() filename_settings = os.path.join(directory_settings, 'settings.toml') data = UtilitiesData.open_file_toml(filename_settings) contacts = data['contacts'] contact_email = contacts['email'] contact_irc_channel = contacts['irc_channel'] contact_irc_server = contacts['irc_server'] contact_mix = contacts['mix'] contact_muc = contacts['muc'] contact_xmpp = contacts['xmpp'] settings = data['settings'] jabber_id_pubsub = settings['pubsub'] journal = settings['journal'] node_settings_id = settings['node_settings_id'] node_settings_title = settings['node_settings_title'] node_settings_subtitle = settings['node_settings_subtitle'] node_public_id = settings['node_public_id'] node_public_title = settings['node_public_title'] node_public_subtitle = settings['node_public_subtitle'] node_private_id = settings['node_private_id'] node_private_title = settings['node_private_title'] node_private_subtitle = settings['node_private_subtitle'] node_read_id = settings['node_read_id'] node_read_title = settings['node_read_title'] node_read_subtitle = settings['node_read_subtitle'] nodes = { 'public' : { 'name' : node_public_id, 'title' : node_public_title, 'subtitle' : node_public_subtitle, 'access_model' : 'presence'}, 'private' : { 'name' : node_private_id, 'title' : node_private_title, 'subtitle' : node_private_subtitle, 'access_model' : 'whitelist'}, 'read' : { 'name' : node_read_id, 'title' : node_read_title, 'subtitle' : node_read_subtitle, 'access_model' : 'whitelist'} } origins = [ "http://localhost", "http://localhost:8080", "http://127.0.0.1", "http://127.0.0.1:8080", ] self.app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # This is workaround for setting a "cookie" issue # It appears that there is a problem to set or send a "cookie" when a template is returned. @self.app.middleware('http') async def middleware_handler(request: Request, call_next): # Handle URL query if request.url.path != '/save': param_url = request.query_params.get('url', '') or None param_hash = request.query_params.get('hash', '') or None if param_hash: return RedirectResponse(url='/url/' + param_hash) if param_url: url_hash = UtilitiesCryptography.hash_url_to_md5(param_url) return RedirectResponse(url='/url/' + url_hash) response = await call_next(request) jabber_id = session_key = None infoo = { 'accounts' : accounts, 'sessions' : sessions } print(infoo) # Handle credentials (i.e. so called "cookies") if request.url.path == '/disconnect': jid = request.cookies.get('jabber_id') if jid in accounts: del accounts[jid] if jid in sessions: del sessions[jid] response.delete_cookie('session_key') response.delete_cookie('jabber_id') else: try: # Access the variable from the request state jabber_id = request.app.state.jabber_id except Exception as e: print(request.cookies.get('jabber_id')) print(e) pass try: # Access the variable from the request state session_key = request.app.state.session_key except Exception as e: print(request.cookies.get('session_key')) print(e) pass if jabber_id and session_key: print(['Establishing a sessiong for:', jabber_id, session_key]) response.set_cookie(key='jabber_id', value=jabber_id) response.set_cookie(key='session_key', value=session_key) # del request.app.state.jabber_id # del request.app.state.session_key request.app.state.jabber_id = request.app.state.session_key = None return response # response.set_cookie(key='session', value=str(jid) + '/' + str(session_key)) # response.set_cookie(key='session', # value=jid + '/' + session_key, # expires=datetime.now().replace(tzinfo=timezone.utc) + timedelta(days=30), # max_age=3600, # domain='localhost', # path='/', # secure=True, # httponly=False, # True # samesite='lax') @self.app.exception_handler(401) def not_authorized_exception_handler(request: Request, exc: HTTPException): #jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Not Authorized.' description = 'Not Authorized (401)' return result_post(request, description, message) @self.app.exception_handler(403) def access_denied_exception_handler(request: Request, exc: HTTPException): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Access denied.' description = 'Access denied (403)' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.exception_handler(404) def not_found_exception_handler(request: Request, exc: HTTPException): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Not Found.' description = 'Not found (404)' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.exception_handler(405) def not_allowed_exception_handler(request: Request, exc: HTTPException): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Method Not Allowed.' description = 'Not allowed (405)' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.exception_handler(500) def internal_error_exception_handler(request: Request, exc: HTTPException): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Internal Server Error.' description = 'Internal error (500)' path = 'error' return result_post(request, jabber_id, description, message, path) # TODO @self.app.get('/admin') def admin_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) authorized = None if authorized: template_file = 'connect.xhtml' template_dict = { 'request' : request, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response else: raise HTTPException(status_code=403, detail='Access denied') @self.app.get('/connect') def connect_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: response = RedirectResponse(url='/jid/' + jabber_id) else: template_file = 'connect.xhtml' template_dict = { 'request' : request, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/contact') def contact_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'contact.xhtml' template_dict = { 'contact_email' : contact_email, 'contact_irc_channel' : contact_irc_channel, 'contact_irc_server' : contact_irc_server, 'contact_mix' : contact_mix, 'contact_muc' : contact_muc, 'contact_xmpp' : contact_xmpp, 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/disconnect') def disconnect_get(request: Request, response: Response, jabber_id: str = Cookie(None), session_key: str = Cookie(None)): # response.set_cookie(max_age=0, value='', key='jabber_id') # response.set_cookie(max_age=0, value='', key='session_key') response = RedirectResponse(url='/') response.delete_cookie('session_key') response.delete_cookie('jabber_id') return response @self.app.get('/favicon.ico', include_in_schema=False) def favicon_get(): return FileResponse(directory_data_graphic_ico) @self.app.get('/help') def help_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'help.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about') def help_about_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'about.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/folksonomy') def help_about_folksonomies_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'folksonomy.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/ideas') def help_about_ideas_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) protocol = request.url.scheme hostname = request.url.hostname + ':' + str(request.url.port) origin = protocol + '://' + hostname template_file = 'ideas.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal, 'origin' : origin} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/philosophy') def help_about_philosophy_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'philosophy.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/projects') def help_about_projects_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'projects.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/software') def help_about_software_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'software.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/thanks') def help_about_thanks_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'thanks.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/xmpp') def help_about_xmpp_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'xmpp.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/xmpp/atomsub') def help_about_xmpp_atomsub_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'atomsub.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/xmpp/libervia') def help_about_xmpp_libervia_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'libervia.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/xmpp/movim') def help_about_xmpp_movim_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'movim.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/about/xmpp/pubsub') def help_about_xmpp_pubsub_get(request: Request): date_now_iso = datetime.now().isoformat() date_now_readable = UtilitiesDate.convert_iso8601_to_readable(date_now_iso) jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'pubsub.xhtml' template_dict = { 'request' : request, 'date_now_iso' : date_now_iso, 'date_now_readable' : date_now_readable, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/feeds') def help_about_feeds_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'feeds.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/questions') def help_questions_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'questions.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/syndication') def help_syndication_get(request: Request): hostname = request.url.hostname + ':' + str(request.url.port) protocol = request.url.scheme origin = protocol + '://' + hostname jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'syndication.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal, 'origin' : origin, 'pubsub_jid' : jabber_id_pubsub} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/help/utilities') def help_utilities_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) hostname = request.url.hostname protocol = request.url.scheme hostname = request.url.netloc origin = protocol + '://' + hostname bookmarklet = 'location.href=`' + origin + '/save?url=${encodeURIComponent(window.location.href)}&title=${encodeURIComponent(document.title)}`;' template_file = 'utilities.xhtml' template_dict = { 'request' : request, 'bookmarklet' : bookmarklet, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/jid', response_class=HTMLResponse) @self.app.post('/jid') async def jid_get(request: Request, response : Response): node_type = 'public' path = 'jid' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id) if result == 'error': message = f'XMPP system message » {reason}.' description = 'IQ Error' path = 'error' return result_post(request, jabber_id, description, message, path) else: response = await jid_main_get(request, node_type, path, jid=jabber_id) return response else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.get('/jid/{jid}') @self.app.post('/jid/{jid}') async def jid_jid_get(request: Request, response : Response, jid): response = await jid_main_get(request, node_type='public', path='jid', jid=jid) return response async def jid_main_get(request: Request, node_type=None, path=None, jid=None): ask = invite = name = origin = start = '' # pubsub_jid = syndicate = jid # message = 'Find and share bookmarks with family and friends!' # description = f'Bookmarks of {jid}' max_count = 10 entries = None related_tags = None tags_dict = None param_filetype = request.query_params.get('filetype', '') or None param_mode = request.query_params.get('mode', '') or None param_page = request.query_params.get('page', '') or None param_protocol = request.query_params.get('protocol', '') or None param_query = request.query_params.get('q', '') or None if param_query: param_query = param_query.strip() param_tags = request.query_params.get('tags', '') or None param_tld = request.query_params.get('tld', '') or None if param_page and param_mode != 'feed': try: page = int(param_page) page_next = page + 1 page_prev = page - 1 except: page = 1 page_next = 2 else: page = 1 page_next = 2 page_prev = page - 1 index_first = (page - 1)*10 index_last = index_first+10 jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id == jid or node_type in ('private', 'read'): xmpp_instance = accounts[jabber_id] # NOTE You need something other than an iterator (XEP-0059). # You need a PubSub key that would hold tags. filename_items = os.path.join(directory_cache, 'items', jabber_id + '.toml') # NOTE Does it work? # It does not seem to actually filter tags. # NOTE Yes. It does work. # See function "cache_items_and_tags_filter". if param_query: query = param_query entries_cache = UtilitiesData.open_file_toml(filename_items) if node_type in entries_cache: entries_cache_node = entries_cache[node_type] filename_cache = os.path.join(directory_cache, 'data', jid + '_query.toml') UtilitiesData.cache_items_and_tags_search(directory_cache, entries_cache_node, jid, query) if os.path.exists(filename_cache) and os.path.getsize(filename_cache): data = UtilitiesData.open_file_toml(filename_cache) item_ids_all = data['item_ids'] related_tags = data['tags'] if len(item_ids_all) <= index_last: index_last = len(item_ids_all) page_next = None item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) entries = [] for entry in entries_cache_node: for item_id in item_ids_selection: if entry['url_hash'] == item_id: entries.append(entry) for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entry['tags'] = entry['tags'][:5] description = f'Your {node_type} bookmarks with "{query}"' message = f'Listing {node_type} bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' #item_id_next = entries[len(entries)-1] else: description = f'No {node_type} bookmarks with "{query}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None else: description = f'No {node_type} bookmarks with "{query}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None elif param_tags or param_tld or param_filetype or param_protocol: tags_list = param_tags.split('+') if len(tags_list) == 1: tag = param_tags entries_cache = UtilitiesData.open_file_toml(filename_items) if node_type in entries_cache: entries_cache_node = entries_cache[node_type] filename_cache = os.path.join(directory_cache, 'data', jid, tag + '.toml') UtilitiesData.cache_items_and_tags_filter(directory_cache, entries_cache_node, jid, tag) if os.path.exists(filename_cache) and os.path.getsize(filename_cache): data = UtilitiesData.open_file_toml(filename_cache) item_ids_all = data['item_ids'] related_tags = data['tags'] if len(item_ids_all) <= index_last: index_last = len(item_ids_all) page_next = None item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) entries = [] for entry in entries_cache_node: for item_id in item_ids_selection: if entry['url_hash'] == item_id: entries.append(entry) for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entry['tags'] = entry['tags'][:5] description = f'Your {node_type} bookmarks tagged with "{tag}"' message = f'Listing {node_type} bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' #item_id_next = entries[len(entries)-1] else: description = 'No {node_type} bookmarks tagged with "{tag}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None else: description = 'No {node_type} bookmarks tagged with "{tag}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None elif len(tag_list) > 1: pass #TODO Support multiple tags # if not param_tags and not param_tld and not param_filetype and not param_protocol and not param_url and not param_hash: else: name = jabber_id.split('@')[0] entries_cache = UtilitiesData.open_file_toml(filename_items) if node_type in entries_cache: entries_cache_node = entries_cache[node_type] filename_cache = os.path.join(directory_cache, 'data', jabber_id + '.toml') #if len(entries_cache_node) and not os.path.exists(filename_cache): UtilitiesData.cache_items_and_tags(directory_cache, entries_cache_node, jabber_id) if os.path.exists(filename_cache) and os.path.getsize(filename_cache): data = UtilitiesData.open_file_toml(filename_cache) item_ids_all = data['item_ids'] related_tags = data['tags'] if len(item_ids_all) <= index_last: index_last = len(item_ids_all) page_next = None item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) entries = [] for entry in entries_cache_node: for item_id in item_ids_selection: if entry['url_hash'] == item_id: entries.append(entry) for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable( entry['published']) entry['tags'] = entry['tags'][:5] description = f'Your {node_type} bookmarks' message = f'Listing {node_type} bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' #item_id_next = entries[len(entries)-1] else: description = 'Your bookmarks directory appears to be empty' message = 'Blasta system message » Zero count.' start = True else: description = 'Your bookmarks directory appears to be empty' message = 'Blasta system message » Zero count.' start = True elif jabber_id in accounts: # NOTE Keep this IQ function call as an exception. # If one wants to see contents of someone else, an # authorization is required. # NOTE It might be wiser to use cached items or item identifiers # provided that the viewer is authorized to view items. xmpp_instance = accounts[jabber_id] tags_dict = {} if param_query: description = f'Bookmarks from {jid} with "{param_query}"' entries_database = DatabaseSQLite.get_entries_by_jid_and_query(db_file, jid, param_query, index_first) entries_count = DatabaseSQLite.get_entries_count_by_jid_and_query(db_file, jid, param_query) for tag, instances in DatabaseSQLite.get_30_tags_by_jid_and_query(db_file, jid, param_query, index_first): tags_dict[tag] = instances elif param_tags: description = f'Bookmarks from {jid} tagged with "{param_tags}"' entries_database = DatabaseSQLite.get_entries_by_jid_and_tag(db_file, jid, param_tags, index_first) entries_count = DatabaseSQLite.get_entries_count_by_jid_and_tag(db_file, jid, param_tags) for tag, instances in DatabaseSQLite.get_30_tags_by_jid_and_tag(db_file, jid, param_tags, index_first): tags_dict[tag] = instances else: description = f'Bookmarks from {jid}' entries_database = DatabaseSQLite.get_entries_by_jid(db_file, jid, index_first) entries_count = DatabaseSQLite.get_entries_count_by_jid(db_file, jid) for tag, instances in DatabaseSQLite.get_30_tags_by_jid(db_file, jid, index_first): tags_dict[tag] = instances if not entries_database: #message = 'Blasta system message » Error: No entries were found.' #description = 'No results' #path = 'error' #return result_post(request, jabber_id, description, message, path) raise HTTPException(status_code=404, detail='No entries were found') if entries_count: entries = [] for entry in entries_database: tags_sorted = [] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) entry_jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) url_hash = UtilitiesCryptography.hash_url_to_md5(entry[2]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, 'jid' : entry_jid, 'name' : entry_jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) for entry in entries: try: date_iso = entry['published'] date_wrt = UtilitiesDate.convert_iso8601_to_readable(date_iso) entry['published_mod'] = date_wrt except: print('ERROR: Probably due to an attempt to convert a non ISO 8601.') print(entry['published']) print(entry['published_mod']) print(entry) index_last = index_first+len(entries_database) if entries_count <= index_last: index_last = entries_count page_next = None message = f'Listing bookmarks {index_first+1} - {index_last} out of {entries_count}.' else: # TODO Check permission, so there is no unintended continuing to cached data which is not authorized for. iq = await XmppPubsub.get_node_item_ids(xmpp_instance, jid, node_public_id) if isinstance(iq, Iq): iq_items_remote = iq['disco_items'] # Cache a list of identifiers of node items to a file. iq_items_remote_name = [] for iq_item_remote in iq_items_remote: iq_item_remote_name = iq_item_remote['name'] iq_items_remote_name.append(iq_item_remote_name) #data_item_ids = {'iq_items' : iq_items_remote_name} #filename_item_ids = 'item_ids/' + jid + '.toml' #Data.save_to_toml(filename_item_ids, data_item_ids) item_ids_all = iq_items_remote_name #item_ids_all = data['item_ids'] #related_tags = data['tags'] if len(item_ids_all) <= index_last: page_next = None index_last = len(item_ids_all) item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) iq = await XmppPubsub.get_node_items(xmpp_instance, jid, node_public_id, item_ids_selection) entries = UtilitiesData.extract_iq_items_extra(db_file, iq, jid) if entries: for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) message = f'Listing bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' description = f'Bookmarks from {jid}' else: message = 'Blasta system message » Zero count.' description = 'Bookmarks directory appears to be empty' invite = True else: message = f'XMPP system message » {iq}.' name = jid.split('@')[0] path = 'error' if not iq: message = 'XMPP system message » Empty.' description = 'An unknown error has occurred' invite = True elif iq == 'Item not found': description = 'Bookmarks directory appears to be empty' invite = True elif iq == 'forbidden': description = 'Access forbidden' elif iq == 'item-not-found': description = 'Jabber ID does not appear to be exist' elif iq == 'not-authorized': description = 'You have no authorization to view ' + name + '\'s bookmarks.' ask = True elif iq == 'Node not found': description = name + '\'s bookmarks directory appears to be empty.' invite = True elif 'DNS lookup failed' in iq: domain = jid.split('@')[1] if '@' in jid else jid description = f'Blasta could not connect to server {domain}' elif iq == 'Connection failed: connection refused': description = 'Connection with ' + name + ' has been refused' elif 'Timeout' in iq or 'timeout' in iq: description = 'Connection with ' + name + ' has been timed out' else: breakpoint() description = 'An unknown error has occurred' if invite: hostname = request.url.hostname + ':' + str(request.url.port) protocol = request.url.scheme origin = protocol + '://' + hostname template_file = 'ask.xhtml' template_dict = { 'request': request, 'ask' : ask, 'alias' : jabber_id.split('@')[0], 'description': description, 'invite' : invite, 'jabber_id': jabber_id, 'jid': jid, 'journal': journal, 'message': message, 'name': name, 'origin': origin, 'path': path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' return result_post(request, jabber_id, description, message, path) #if not entries: raise HTTPException(status_code=404, detail='No entries were found') template_dict = { 'request': request, 'description': description, 'entries': entries, 'jabber_id': jabber_id, 'jid': jid, 'journal': journal, 'message': message, 'node_type': node_type, 'page_next': page_next, 'page_prev': page_prev, 'pager' : True, 'param_query' : param_query, 'param_tags': param_tags, 'path': path, 'pubsub_jid': jid, 'node_id': nodes[node_type]['name'], 'start': start, 'syndicate': jid, 'tags' : tags_dict or related_tags or ''} if param_mode == 'feed': template_file = 'browse.atom' response = templates.TemplateResponse(template_file, template_dict) response.headers["Content-Type"] = "application/xml" else: template_file = 'browse.xhtml' response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/blasta.svg') def logo_get(): return FileResponse(directory_data_graphic_svg) @self.app.get('/', response_class=HTMLResponse) @self.app.get('/new', response_class=HTMLResponse) async def root_get_new(request: Request, response : Response): response = await root_main_get(request, response, page_type='new') return response @self.app.get('/popular', response_class=HTMLResponse) async def root_get_popular(request: Request, response : Response): response = await root_main_get(request, response, page_type='popular') return response @self.app.get('/query', response_class=HTMLResponse) async def root_get_query(request: Request, response : Response): response = await root_main_get(request, response, page_type='query') return response @self.app.get('/recent', response_class=HTMLResponse) async def root_get_recent(request: Request, response : Response): response = await root_main_get(request, response, page_type='recent') return response async def root_main_get(request: Request, response : Response, page_type=None): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = path = syndicate = page_type param_filetype = request.query_params.get('filetype', '') or None param_mode = request.query_params.get('mode', '') or None param_page = request.query_params.get('page', '') or None param_protocol = request.query_params.get('protocol', '') or None param_query = request.query_params.get('q', '') or None if param_query: param_query = param_query.strip() param_tags = request.query_params.get('tags', '') or None param_tld = request.query_params.get('tld', '') or None if param_page and param_mode != 'feed': try: page = int(param_page) page_next = page + 1 page_prev = page - 1 except: page = 1 page_next = 2 else: page = 1 page_next = 2 page_prev = page - 1 index_first = (page - 1)*10 if param_tags or param_tld or param_filetype or param_protocol: entries_count = DatabaseSQLite.get_entries_count_by_tag(db_file, param_tags) match page_type: case 'new': description = f'New bookmarks tagged with "{param_tags}"' entries_database = DatabaseSQLite.get_entries_new_by_tag(db_file, param_tags, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_new_by_tag(db_file, param_tags, index_first) case 'popular': description = f'Popular bookmarks tagged with "{param_tags}"' # 'Most popular' entries_database = DatabaseSQLite.get_entries_popular_by_tag(db_file, param_tags, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_popular_by_tag(db_file, param_tags, index_first) case 'recent': description = f'Recent bookmarks tagged with "{param_tags}"' entries_database = DatabaseSQLite.get_entries_recent_by_tag(db_file, param_tags, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_recent_by_tag(db_file, param_tags, index_first) # TODO case 'query': else: match page_type: case 'new': description = 'New bookmarks' entries_database = DatabaseSQLite.get_entries_new(db_file, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_new(db_file, index_first) entries_count = DatabaseSQLite.get_entries_count(db_file) case 'popular': description = 'Popular bookmarks' # 'Most popular' entries_database = DatabaseSQLite.get_entries_popular(db_file, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_popular(db_file, index_first) entries_count = DatabaseSQLite.get_entries_count(db_file) case 'query': node_id = syndicate = 'new' description = f'Posted bookmarks with "{param_query}"' entries_database = DatabaseSQLite.get_entries_by_query(db_file, param_query, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_by_query_recent(db_file, param_query, index_first) entries_count = DatabaseSQLite.get_entries_count_by_query(db_file, param_query) case 'recent': description = 'Recent bookmarks' entries_database = DatabaseSQLite.get_entries_recent(db_file, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_recent(db_file, index_first) entries_count = DatabaseSQLite.get_entries_count(db_file) if not entries_database: #message = 'Blasta system message » Error: No entries were found.' #description = 'No results' #path = 'error' #return result_post(request, jabber_id, description, message, path) raise HTTPException(status_code=404, detail='No entries were found') tags_dict = {} #for tag, instances in DatabaseSQLite.get_tags_30(db_file): for tag, instances in tags_of_entries: tags_dict[tag] = instances entries = [] for entry in entries_database: tags_sorted = [] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) url_hash = UtilitiesCryptography.hash_url_to_md5(entry[2]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, #entry[1] 'jid' : jid, 'name' : jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) for entry in entries: try: date_iso = entry['published'] date_wrt = UtilitiesDate.convert_iso8601_to_readable(date_iso) entry['published_mod'] = date_wrt except: print('ERROR: Probably due to an attempt to convert a non ISO 8601.') print(entry['published']) print(entry['published_mod']) print(entry) index_last = index_first+len(entries_database) if entries_count <= index_last: # NOTE Did you forget to modify index_last? # NOTE No. It appears that it probably not needed index_last = entries_count page_next = None #if page_type != 'new' or page_prev or param_tags or param_tld or param_filetype or param_protocol: if request.url.path != '/' or request.url.query: message = f'Listing bookmarks {index_first+1} - {index_last} out of {entries_count}.' message_link = None else: message = ('Welcome to Blasta, an XMPP PubSub oriented social ' 'bookmarks manager for organizing online content.') message_link = {'href' : '/help/about', 'text' : 'Learn more'} template_dict = { 'request' : request, 'description' : description, 'entries' : entries, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'message_link' : message_link, 'node_id' : node_id, 'page_next' : page_next, 'page_prev' : page_prev, 'page_type' : page_type, 'pager' : True, 'param_query' : param_query, 'param_tags' : param_tags, 'path' : path, 'pubsub_jid' : jabber_id_pubsub, 'syndicate' : syndicate, 'tags' : tags_dict} if param_mode == 'feed': # NOTE Consider scheme "feed" in order to prompt news # reader 'feed://' + request.url.netloc + request.url.path template_file = 'browse.atom' response = templates.TemplateResponse(template_file, template_dict) response.headers["Content-Type"] = "application/xml" else: template_file = 'browse.xhtml' response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response """ # TODO Return to code /tag and / (root) once SQLite database is ready. @self.app.get('/tag/{tag}') async def tag_tag_get(request: Request, tag): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'tag:{tag}' syndicate = f'?tag={tag}' path = 'tag' # NOTE Perhaps it would be beneficial to retrieve "published" and # tags ("category") of viewer to override the tags of Blasta # TODO If URL exist in visitor's bookmarks, display its properties # (summary, tags title etc.) before data of others. # if UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request): page = request.query_params.get('page', '') or None if page: try: page = int(page) page_next = page + 1 page_prev = page - 1 except: page = 1 page_next = 2 else: page = 1 page_next = 2 page_prev = page - 1 index_first = (page - 1)*10 index_last = index_first+10 tags_dict = {} for entry in entries_database: for entry_tag in entry['tags']: if entry_tag in tags_dict: tags_dict[entry_tag] = tags_dict[entry_tag]+1 else: tags_dict[entry_tag] = 1 tags_dict = dict(sorted(tags_dict.items(), key=lambda item: (-item[1], item[0]))) tags_dict = dict(list(tags_dict.items())[:30]) #tags_dict = dict(sorted(tags_dict.items(), key=lambda item: (-item[1], item[0]))[:30]) print(tags_dict) entries = [] for entry in entries_database: if tag in entry['tags']: entries.append(entry) for entry in entries: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # """ @self.app.post('/', response_class=HTMLResponse) async def root_post(request: Request, response: Response, jabber_id: str = Form(...), password: str = Form(...)): if not UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request): # Store a variable in the request's state request.app.state.jabber_id = jabber_id session_key = str(random.random()) request.app.state.session_key = session_key accounts[jabber_id] = XmppInstance(jabber_id + '/blasta', password) # accounts[jabber_id].authenticated # dir(accounts[jabber_id]) # accounts[jabber_id].failed_auth # accounts[jabber_id].event_when_connected sessions[jabber_id] = session_key # Check if the user and password are present and valid # If not valid, return "Could not connect to JID" # FIXME Instead of an arbitrary number (i.e. 5 seconds), write a # while loop with a timeout of 10 seconds. # Check whether an account is connected. # Wait for 5 seconds to connect. await asyncio.sleep(5) #if jabber_id in accounts and accounts[jabber_id].connection_accepted: if jabber_id in accounts: xmpp_instance = accounts[jabber_id] #await xmpp_instance.plugin['xep_0060'].delete_node(jabber_id, node_public_id) for node_properties in nodes: properties = nodes[node_properties] if not await XmppPubsub.is_node_exist(xmpp_instance, properties['name']): iq = XmppPubsub.create_node_atom( xmpp_instance, jabber_id, properties['name'], properties['title'], properties['subtitle'], properties['access_model']) await XmppIq.send(iq, 15) #await XmppPubsub.set_node_private(xmpp_instance, node_private_id) #await XmppPubsub.set_node_private(xmpp_instance, node_read_id) #configuration_form = await xmpp_instance['xep_0060'].get_node_config(jabber_id, properties['name']) #print(configuration_form) node_id = nodes['public']['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, 'public', node_id) if result == 'error': message = f'XMPP system message » {reason}.' description = 'IQ Error' path = 'error' return result_post(request, jabber_id, description, message, path) else: iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_settings_id, 'routine') routine = None if isinstance(iq, Iq): payload = iq['pubsub']['items']['item']['payload'] if payload: xmlns = '{jabber:x:data}' element_value = payload.find('.//' + xmlns + 'field[@var="routine"]/' + xmlns + 'value') if isinstance(element_value, ET.Element): routine = element_value.text match routine: case 'private': response = RedirectResponse(url='/private') case 'read': response = RedirectResponse(url='/read') case _: response = RedirectResponse(url='/jid/') else: #del accounts[jabber_id] #del sessions[jabber_id] message = 'Blasta system message » Authorization has failed.' description = 'Connection has failed' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.post('/message') async def message_post(request: Request, jid: str = Form(...), body: str = Form(...)): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: #headline = 'This is a message from Blasta' xmpp_instance = accounts[jabber_id] XmppMessage.send(xmpp_instance, jid, body) alias = jid.split('@')[0] message = f'Your message has been sent to {alias}.' description = 'Message has been sent' path = 'message' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.get('/now') def now_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'now.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/private', response_class=HTMLResponse) @self.app.post('/private') async def private_get(request: Request, response : Response): node_type = 'private' path = 'private' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id) if result == 'error': message = f'Blasta system message » {reason}.' description = 'Directory "private" appears to be empty' path = 'error' return result_post(request, jabber_id, description, message, path) else: response = await jid_main_get(request, node_type, path) return response else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.get('/profile') async def profile_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] if not await XmppPubsub.is_node_exist(xmpp_instance, node_settings_id): iq = XmppPubsub.create_node_config(xmpp_instance, jabber_id, node_settings_id) await XmppIq.send(iq, 15) access_models = {} for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_configuration(xmpp_instance, jabber_id, node_id) if isinstance(iq, Iq): access_model = iq['pubsub_owner']['configure']['form']['values']['pubsub#access_model'] access_models[node_type] = access_model settings = {} for setting in ['enrollment', 'routine']: iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_settings_id, setting) if isinstance(iq, Iq): payload = iq['pubsub']['items']['item']['payload'] if payload: xmlns = '{jabber:x:data}' element_value = payload.find('.//' + xmlns + 'field[@var="' + setting + '"]/' + xmlns + 'value') if isinstance(element_value, ET.Element): settings[setting] = element_value.text template_file = 'profile.xhtml' template_dict = { 'access_models' : access_models, 'enroll' : settings['enrollment'] if 'enrollment' in settings else None, 'request' : request, 'routine' : settings['routine'] if 'routine' in settings else None, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.post('/profile') async def profile_post(request: Request, routine: str = Form(None), enroll: str = Form(None)): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] if routine: message = f'The routine directory has been set to {routine}' payload = DataForm.create_setting_entry(xmpp_instance, 'routine', routine) iq = await XmppPubsub.publish_node_item( # NOTE Consider "configurations" as item ID (see Movim) xmpp_instance, jabber_id, node_settings_id, 'routine', payload) if enroll: if enroll == '1': message = 'Your database is shared with the Blasta system' else: message = 'Your database is excluded from the Blasta system' payload = DataForm.create_setting_entry(xmpp_instance, 'enroll', enroll) iq = await XmppPubsub.publish_node_item( xmpp_instance, jabber_id, node_settings_id, 'enrollment', payload) description = 'Setting has been saved' template_file = 'result.xhtml' template_dict = { 'description' : description, 'enroll' : enroll, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'request' : request, 'routine' : routine} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.get('/profile/export/{node_type}/{filetype}') async def profile_export_get(request: Request, node_type, filetype): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_items(xmpp_instance, jabber_id, node_id) if isinstance(iq, Iq): entries = UtilitiesData.extract_iq_items(iq, jabber_id) # TODO Append a bookmark or bookmarks of Blasta if entries: filename = os.path.join(directory_cache, 'export', jabber_id + '_' + node_type + '.' + filetype) #filename = 'export/' + jabber_id + '_' + node_type + '.' + filetype #filename = f'export/{jabber_id}_{node_type}.{filetype}' #filename = 'export_' + node_type + '/' + jabber_id + '_' + '.' + filetype #filename = f'export_{node_type}/{jabber_id}.{filetype}' match filetype: case 'json': UtilitiesData.save_to_json(filename, entries) case 'toml': # NOTE Should the dict be named with 'entries' or 'private'/'public'/'read'? data = {'entries' : entries} UtilitiesData.save_to_toml(filename, data) response = FileResponse(filename) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.post('/profile/import') # def profile_import_post(file: UploadFile = File(...)): async def profile_import_post(request: Request, file: UploadFile | None = None, merge: str = Form(None), node: str = Form(...), override: str = Form(None)): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] if file: # TODO If node does not exist, redirect to result page with # a message that bookmarks are empty. # NOTE No. node_id = nodes[node]['name'] node_title = nodes[node]['title'] node_subtitle = nodes[node]['subtitle'] node_access_model = nodes[node]['access_model'] if not await XmppPubsub.is_node_exist(xmpp_instance, node_id): iq = XmppPubsub.create_node_atom( xmpp_instance, jabber_id, node_id, node_title, node_subtitle, node_access_model) await XmppIq.send(iq, 15) #return {"filename": file.filename} content = file.file.read().decode() # TODO Add match/case for filetype. filename = os.path.splitext(file.filename) file_extension = filename[len(filename)-1] match file_extension: case '.html': entries = UtilitiesData.load_data_netscape(content) case '.toml': entries = UtilitiesData.load_data_toml(content) case _: message = 'Blasta system message » Error: Unknown file type.' description = 'Import error' path = 'error' return result_post(request, jabber_id, description, message, path) # entries_node = entries[node] #breakpoint() #for entry in entries: print(entry) name = jabber_id.split('@')[0] # timestamp = datetime.now().isoformat() counter = 0 for entry_type in entries: for entry in entries[entry_type]: url_hash = item_id = UtilitiesCryptography.hash_url_to_md5(entry['link']) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry_new = { 'title' : entry['title'], 'link' : entry['link'], 'summary' : entry['summary'], 'published' : entry['published'], 'updated' : entry['published'], #'updated' : entry['updated'], 'tags' : entry['tags'], 'url_hash' : url_hash, 'jid' : jabber_id, 'name' : name, 'instances' : instances} xmpp_instance = accounts[jabber_id] payload = UtilitiesSyndication.create_rfc4287_entry(entry_new) iq = await XmppPubsub.publish_node_item( xmpp_instance, jabber_id, node_id, item_id, payload) #await XmppIq.send(iq, 15) counter += 1 message = f'Blasta system message » Imported {counter} items.' description = 'Import successful' path = 'profile' return result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: No upload file sent.' description = 'Import error' path = 'error' return result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.get('/save') async def save_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: routine = None xmpp_instance = accounts[jabber_id] param_url = request.query_params.get('url', '') url_hash = UtilitiesCryptography.hash_url_to_md5(param_url) for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) #if len(iq['pubsub']['items']): if (isinstance(iq, Iq) and url_hash == iq['pubsub']['items']['item']['id']): return RedirectResponse(url='/url/' + url_hash + '/edit') iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_settings_id, 'routine') if isinstance(iq, Iq): payload = iq['pubsub']['items']['item']['payload'] if payload: xmlns = '{jabber:x:data}' element_value = payload.find('.//' + xmlns + 'field[@var="routine"]/' + xmlns + 'value') if isinstance(element_value, ET.Element): routine = element_value.text # else: # routine = None # NOTE Is "message" missing? description = 'Add a new bookmark' # 'Enter properties for a bookmark' param_title = request.query_params.get('title', '') param_tags = request.query_params.get('tags', '') param_summary = request.query_params.get('summary', '') path = 'save' if request.query_params: message = message_link = None else: message = 'For greater ease, you migh want to try our' message_link = {'href' : '/help/utilities#buttons', 'text' : 'bookmarklets'} template_file = 'edit.xhtml' template_dict = { 'request' : request, 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'message_link' : message_link, 'path' : path, 'routine' : routine, 'summary' : param_summary, 'tags' : param_tags, 'title' : param_title, 'url' : param_url} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.post('/save') async def save_post(request: Request, node: str = Form(...), summary: str = Form(''), tags: str = Form(''), title: str = Form(...), url: str = Form(...)): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] url_hash = UtilitiesCryptography.hash_url_to_md5(url) for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_item( xmpp_instance, jabber_id, node_id, url_hash) if (isinstance(iq, Iq) and url_hash == iq['pubsub']['items']['item']['id']): return RedirectResponse(url='/url/' + url_hash + '/edit') description = 'Confirm properties of a bookmark' path = 'save' published = datetime.now().isoformat() template_file = 'edit.xhtml' template_dict = { 'request' : request, 'confirm' : True, 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'node' : node, 'path' : path, 'published' : published, 'summary' : summary, 'tags' : tags, 'title' : title, 'url' : url, 'url_hash' : url_hash} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.get('/read', response_class=HTMLResponse) @self.app.post('/read') async def read_get(request: Request, response : Response): node_type = 'read' path = 'read' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id) if result == 'error': message = f'Blasta system message » {reason}.' description = 'Directory "read" appears to be empty' path = 'error' return result_post(request, jabber_id, description, message, path) else: response = await jid_main_get(request, node_type, path) return response else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' return result_post(request, jabber_id, description, message, path) def result_post(request: Request, jabber_id: str, description: str, message: str, path: str, http_code=None): template_file = 'result.xhtml' template_dict = { 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'path' : path, 'request' : request} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/register') def register_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'register.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/rss') def rss(request: Request): return RedirectResponse(url='/help/syndication') @self.app.get('/search') async def search_get(request: Request): response = RedirectResponse(url='/search/all') return response @self.app.get('/search/all') async def search_all_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) description = 'Search for public bookmarks' form_action = '/query' input_id = input_name = label_for = 'q' input_placeholder = 'Enter a search query.' input_type = 'search' label = 'Search' message = 'Search for bookmarks in the Blasta system.' path = 'all' template_file = 'search.xhtml' template_dict = { 'request' : request, 'description' : description, 'form_action' : form_action, 'input_id' : input_id, 'input_name' : input_name, 'input_placeholder' : input_placeholder, 'input_type' : input_type, 'label' : label, 'label_for' : label_for, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'path' : path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/search/jid/{jid}') async def search_jid_get(request: Request, jid): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: if jabber_id == jid: description = 'Search your own bookmarks' message = 'Search for bookmarks from your own directory.' else: description = f'Search bookmarks of {jid}' message = 'Search for bookmarks of a given Jabber ID.' form_action = '/jid/' + jid input_id = input_name = label_for = 'q' input_placeholder = 'Enter a search query.' input_type = 'search' label = 'Search' path = 'jid' template_file = 'search.xhtml' template_dict = { 'request' : request, 'description' : description, 'form_action' : form_action, 'input_id' : input_id, 'input_name' : input_name, 'input_placeholder' : input_placeholder, 'input_type' : input_type, 'jabber_id' : jabber_id, 'jid' : jid, 'label' : label, 'label_for' : label_for, 'journal' : journal, 'message' : message, 'path' : path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: response = RedirectResponse(url='/search/all') return response @self.app.get('/search/url') async def search_url_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) description = 'Search for a bookmark' form_action = None # This is not relevant due to function middleware. Maybe / or /url. input_id = input_name = label_for = 'url' input_placeholder = 'Enter a URL.' input_type = 'url' label = 'URL' message = 'Search for a bookmark by a URL.' path = 'url' template_file = 'search.xhtml' template_dict = { 'request' : request, 'description' : description, # 'form_action' : form_action, 'input_id' : input_id, 'input_name' : input_name, 'input_placeholder' : input_placeholder, 'input_type' : input_type, 'label' : label, 'label_for' : label_for, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'path' : path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/tag') def tag_get(request: Request): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) tag_list = DatabaseSQLite.get_tags_500(db_file) message = 'Common 500 tags sorted by name and sized by commonality.' description = 'Common tags' template_file = 'tag.xhtml' template_dict = { 'request' : request, 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'tag_list' : tag_list} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/tag/{jid}') def tag_get_jid(request: Request, jid): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) # NOTE Consider retrieval of tags from cache file. # This is relevant to private and read nodes. #if jabber_id == jid or node_type in ('private', 'read'): tag_list = DatabaseSQLite.get_500_tags_by_jid_sorted_by_name(db_file, jid) message = 'Common 500 tags sorted by name and sized by commonality.' description = f'Common tags of {jid}' template_file = 'tag.xhtml' template_dict = { 'request' : request, 'description' : description, 'jabber_id' : jabber_id, 'jid' : jid, 'journal' : journal, 'message' : message, 'tag_list' : tag_list} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response @self.app.get('/url') async def url_get(request: Request): response = RedirectResponse(url='/search/url') return response @self.app.get('/url/{url_hash}') async def url_hash_get(request: Request, url_hash): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' entries = [] exist = False if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] for node in nodes: node_id = nodes[node]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others. iq_item = iq['pubsub']['items']['item'] item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » Error: {iq}.' description = 'The requested bookmark could not be retrieved' path = 'error' return result_post(request, jabber_id, description, message, path) if exist: # TODO Perhaps adding a paragraph with "your tags" and "who else has tagged this link" # and keep the (5 item) limit. #entry = UtilitiesSyndication.extract_items(item_payload) # NOTE Display only 5 items, as all the other tags appear at the list of "Related tags". entry = UtilitiesSyndication.extract_items(item_payload, limit=True) if entry: #url_hash = iq_item['id'] url_hash = UtilitiesCryptography.hash_url_to_md5(entry['link']) # TODO Add a check: if iq_item['id'] == url_hash: instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry['instances'] = instances entry['jid'] = jabber_id name = jabber_id.split('@')[0] entry['name'] = name entry['url_hash'] = url_hash entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) #entry['tags'] = entry['tags'][:5] entries.append(entry) tags_list = {} tags_and_instances = DatabaseSQLite.get_tags_and_instances_by_url_hash(db_file, url_hash) for tag, tag_instances in tags_and_instances: tags_list[tag] = tag_instances else: # NOTE Is it possible to activate this else statement? Consider removal. # https://fastapi.tiangolo.com/tutorial/handling-errors/ #raise HTTPException(status_code=404, detail="Item not found") message = 'Blasta system message » Error: Not found.' description = 'The requested bookmark does not exist' path = 'error' return result_post(request, jabber_id, description, message, path) return response else: entry = DatabaseSQLite.get_entry_by_url_hash(db_file, url_hash) tags_sorted = [] if entry: for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) tags_list = {} tags_and_instances = DatabaseSQLite.get_tags_and_instances_by_entry_id(db_file, entry[0]) for tag, tag_instances in tags_and_instances: tags_list[tag] = tag_instances jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'published_mod' : UtilitiesDate.convert_iso8601_to_readable(entry[6]), 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, 'jid' : jid, 'name' : jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) # message = f'XMPP system message » {iq}.' # if iq == 'Node not found': # description = 'An error has occurred' # else: # description = 'An unknown error has occurred' else: # https://fastapi.tiangolo.com/tutorial/handling-errors/ #raise HTTPException(status_code=404, detail="Item not found") message = 'Blasta system message » Error: Not found.' description = 'The requested bookmark does not exist' path = 'error' return result_post(request, jabber_id, description, message, path) return response else: entry = DatabaseSQLite.get_entry_by_url_hash(db_file, url_hash) if entry: tags_sorted = [] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) tags_list = {} tags_and_instances = DatabaseSQLite.get_tags_and_instances_by_entry_id(db_file, entry[0]) for tag, tag_instances in tags_and_instances: tags_list[tag] = tag_instances jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'published_mod' : UtilitiesDate.convert_iso8601_to_readable(entry[6]), 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, 'jid' : jid, 'name' : jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) else: # https://fastapi.tiangolo.com/tutorial/handling-errors/ #raise HTTPException(status_code=404, detail="Item not found") message = 'Blasta system message » Error: Not found.' description = 'The requested bookmark does not exist' path = 'error' return result_post(request, jabber_id, description, message, path) message = f'Information for URI {entries[0]["link"]}' # entry[2] if not instances: instances = 0 if instances > 1: description = 'Discover new resources and see who shares them' template_file = 'people.xhtml' people_list = {} jids_and_tags = DatabaseSQLite.get_jids_and_tags_by_url_hash(db_file, url_hash) for jid, tag in jids_and_tags: if jid in people_list and isinstance(people_list[jid], list): people_list[jid].append(tag) else: people_list[jid] = [tag] else: people_list = None description = 'Resource properties' template_file = 'browse.xhtml' template_dict = { 'request' : request, 'description' : description, 'entries' : entries, 'exist' : exist, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'node_id' : node_id, 'param_hash' : param_hash, 'path' : path, 'people' : people_list, 'pubsub_jid' : jabber_id_pubsub, 'syndicate' : syndicate, 'tags' : tags_list} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.post('/url/{url_hash}') async def url_hash_post(request: Request, url_hash, node: str = Form(...), published: str = Form(...), summary: str = Form(''), tags: str = Form(''), #tags_old: str = Form(...), tags_old: str = Form(''), title: str = Form(...), url: str = Form(...)): node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: name = jabber_id.split('@')[0] instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) timestamp = datetime.now().isoformat() tags_new = UtilitiesData.organize_tags(tags) if tags else '' entry = {'title' : title.strip(), 'link' : url.strip(), 'summary' : summary.strip() if summary else '', 'published' : published, 'updated' : timestamp, 'tags' : tags_new, 'url_hash' : url_hash, 'jid' : jabber_id, 'name' : name, 'instances' : instances or 1} message = f'Information for URL {url}' description = 'Bookmark properties' xmpp_instance = accounts[jabber_id] payload = UtilitiesSyndication.create_rfc4287_entry(entry) # TODO Add try/except for IQ print('Publish item') # TODO Check. # NOTE You might not need to append to an open node before appending to a whitelist node. node_id = nodes[node]['name'] iq = await XmppPubsub.publish_node_item( xmpp_instance, jabber_id, node_id, url_hash, payload) match node: case 'private': print('Set item as private (XEP-0223)') #iq = await XmppPubsub.publish_node_item_private( # xmpp_instance, node_private_id, url_hash, iq) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_public_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'public', url_hash) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_read_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'read', url_hash) case 'public': await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_private_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'private', url_hash) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_read_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'read', url_hash) case 'read': #iq = await XmppPubsub.publish_node_item_private( # xmpp_instance, node_read_id, url_hash, iq) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_public_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'public', url_hash) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_private_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'private', url_hash) if isinstance(iq, str): description = 'Could not save bookmark' message = f'XMPP system message » {iq}.' path = 'error' return result_post(request, jabber_id, description, message, path) #await XmppIq.send(iq, 15) # Save changes to cache file entries_cache_filename = os.path.join(directory_cache, 'items', jabber_id + '.toml') entries_cache = UtilitiesData.open_file_toml(entries_cache_filename) entries_cache_node = entries_cache[node] if node in entries_cache else [] entries_cache_mod = [] #for entry_cache in entries_cache_node: # if entry_cache['url_hash'] == url_hash: # entry_cache = entry # break is_entry_modified = False # You already have this code in the HTML form, which indicates that this is an edit of an existing item # for entry_cache in entries_cache_node: if entry_cache['url_hash'] == url_hash: is_entry_modified = True entries_cache_mod.append(entry) else: entries_cache_mod.append(entry_cache) if not is_entry_modified: entries_cache_mod.append(entry) entries_cache[node] = entries_cache_mod entries_cache_data = entries_cache UtilitiesData.save_to_toml(entries_cache_filename, entries_cache_data) # Save changes to database if node == 'public': tags_valid = [] tags_invalid = [] #tags_list_new = tags.split(',') tags_list_new = tags_new tags_list_old = tags_old.split(', ') for tag in tags_list_old: tag_trim = tag.strip() if tag not in tags_list_new: tags_invalid.append(tag_trim) for tag in tags_list_new: if tag: tag_trim = tag.strip() if tag_trim not in tags_list_old: tags_valid.append(tag_trim) # FIXME Variable tags_valid is not in use. # NOTE Variable tags_valid might not be needed. See function associate_entries_tags_jids. entry['tags'] = tags_valid await DatabaseSQLite.add_tags(db_file, [entry]) # Slow (high I/O) entry_id = DatabaseSQLite.get_entry_id_by_url_hash(db_file, url_hash) if not entry_id: await DatabaseSQLite.add_new_entries(db_file, [entry]) # Is this line needed? await DatabaseSQLite.associate_entries_tags_jids(db_file, entry) #elif not DatabaseSQLite.is_jid_associated_with_url_hash(db_file, jabber_id, url_hash): # await DatabaseSQLite.associate_entries_tags_jids(db_file, entry) else: await DatabaseSQLite.associate_entries_tags_jids(db_file, entry) print('tags_new') print(tags_new) print('tags_old') print(tags_old) print('tags_valid') print(tags_valid) print('tags_invalid') print(tags_invalid) print(url_hash) print(jabber_id) await DatabaseSQLite.delete_combination_row_by_url_hash_and_tag_and_jid(db_file, url_hash, tags_invalid, jabber_id) # Entry for HTML entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(published) entry['updated_mod'] = UtilitiesDate.convert_iso8601_to_readable(timestamp) entry['tags'] = entry['tags'][:5] entries = [entry] template_file = 'browse.xhtml' template_dict = { 'request': request, 'description': description, 'entries': entries, 'exist': True, 'jabber_id': jabber_id, 'journal': journal, 'message': message, 'node_id': node_id, 'param_hash': param_hash, 'path': path, 'pubsub_jid': jabber_id_pubsub, 'syndicate': syndicate, 'tags' : tags_new} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) @self.app.get('/url/{url_hash}/confirm') async def url_hash_confirm_get(request: Request, url_hash): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] exist = False for node in nodes: node_id = nodes[node]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others. iq_item = iq['pubsub']['items']['item'] item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' return result_post(request, jabber_id, description, message, path) if exist: # TODO Add a check: if iq_item['id'] == url_hash: entries = [] entry = UtilitiesSyndication.extract_items(item_payload) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry['instances'] = instances entry['jid'] = jabber_id name = jabber_id.split('@')[0] entry['name'] = name entry['url_hash'] = url_hash entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entries.append(entry) description = 'Confirm deletion of a bookmark' message = f'Details for bookmark {entries[0]["link"]}' template_file = 'browse.xhtml' template_dict = { 'request' : request, 'delete' : True, 'description' : description, 'entries' : entries, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'node_id' : node_id, 'param_hash' : param_hash, 'path' : path, 'pubsub_jid' : jabber_id_pubsub, 'syndicate' : syndicate} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: response = RedirectResponse(url='/jid/' + jabber_id) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.get('/url/{url_hash}/delete') async def url_hash_delete_get(request: Request, url_hash): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] exist = False for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others. iq_item = iq['pubsub']['items']['item'] item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' return result_post(request, jabber_id, description, message, path) if exist: # TODO Add a check: if iq_item['id'] == url_hash: entries = [] entry = UtilitiesSyndication.extract_items(item_payload) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry['instances'] = instances entry['jid'] = jabber_id name = jabber_id.split('@')[0] entry['name'] = name entry['url_hash'] = url_hash entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entries.append(entry) # Set a title description = 'A bookmark has been deleted' # Set a message message = f'Details for bookmark {entry["link"]}' # Create a link to restore bookmark link_save = ('/save?url=' + urllib.parse.quote(entry['link']) + '&title=' + urllib.parse.quote(entry['title']) + '&summary=' + urllib.parse.quote(entry['summary']) + '&tags=' + urllib.parse.quote(','.join(entry['tags']))) # Remove the item from node xmpp_instance = accounts[jabber_id] await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_id, url_hash) # Remove the item association from database await DatabaseSQLite.delete_combination_row_by_jid_and_url_hash(db_file, url_hash, jabber_id) #await DatabaseSQLite.delete_combination_row_by_url_hash_and_tag_and_jid(db_file, url_hash, entry['tags'], jabber_id) # Remove the item from cache UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, node_type, url_hash) template_file = 'browse.xhtml' template_dict = { 'request' : request, 'description' : description, 'entries' : entries, 'jabber_id' : jabber_id, 'journal' : journal, 'link_save' : link_save, 'message' : message, 'node_id' : node_id, 'param_hash' : param_hash, 'path' : path, 'pubsub_jid' : jabber_id_pubsub, 'restore' : True, 'syndicate' : syndicate} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: response = RedirectResponse(url='/jid/' + jabber_id) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' return result_post(request, jabber_id, description, message, path) return response @self.app.get('/url/{url_hash}/edit') @self.app.post('/url/{url_hash}/edit') async def url_hash_edit_get(request: Request, url_hash): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) # node_id = f'hash:{url_hash}' if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] exist = False for node in nodes: node_id = nodes[node]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): name = jabber_id.split('@')[0] iq_item = iq['pubsub']['items']['item'] # TODO Add a check: if iq_item['id'] == url_hash: # Is this valid entry['url_hash'] = iq['id'] or should it be iq_item['id'] entry = None item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' return result_post(request, jabber_id, description, message, path) if exist: path = 'edit' description = 'Edit an existing bookmark' entry = UtilitiesSyndication.extract_items(item_payload) entry['instances'] = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) print(jabber_id) print(entry['tags']) else: # TODO Consider redirect to path /save (function save_get) # NOTE This seems to be the best to do, albeit, perhaps the pathname should be /save instead of /url/hash/edit. path = 'save' # 'add' description = 'Add a new bookmark' result = DatabaseSQLite.get_entry_by_url_hash(db_file, url_hash) tags_sorted = [] if result: for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, result[0]): tags_sorted.append(tag[0]) entry = {'title' : result[3], 'link' : result[2], 'summary' : result[4], 'published' : result[6], 'updated' : result[7], 'tags' : tags_sorted} #'instances' : result[8], #'jid' = jabber_id, #'name' : name, #'url_hash' : url_hash if entry: entry['jid'] = jabber_id entry['name'] = name entry['url_hash'] = url_hash else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' return result_post(request, jabber_id, description, message, path) template_file = 'edit.xhtml' template_dict = { 'request' : request, 'description' : description, 'edit' : True, 'jabber_id' : jabber_id, 'journal' : journal, 'node' : node, 'path' : path, 'published' : entry['published'], 'summary' : entry['summary'], 'tags' : ', '.join(entry['tags']), 'title' : entry['title'], 'url' : entry['link'], 'url_hash' : url_hash} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' return result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' return result_post(request, jabber_id, description, message, path) return response