#!/usr/bin/python # -*- coding: utf-8 -*- import asyncio from blasta.config import Cache, Settings, Share from blasta.database.sqlite import DatabaseSQLite from blasta.utilities.cryptography import UtilitiesCryptography from blasta.utilities.data import UtilitiesData from blasta.utilities.date import UtilitiesDate from blasta.utilities.http import UtilitiesHttp from blasta.utilities.logger import UtilitiesLogger from blasta.utilities.syndication import UtilitiesSyndication from blasta.xmpp.form import DataForm from blasta.xmpp.instance import XmppInstance from blasta.xmpp.iq import XmppIq from blasta.xmpp.pubsub import XmppPubsub from fastapi import Cookie, FastAPI, File, Form, HTTPException, Request, Response, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates import os import random from slixmpp.stanza.iq import Iq import sys from starlette.responses import RedirectResponse import urllib.parse import xml.etree.ElementTree as ET logger = UtilitiesLogger(__name__) class HttpInstance: def __init__(self, accounts, sessions): directory_cache = Cache.get_directory() directory_cache_data = os.path.join(directory_cache, 'data') directory_cache_export = os.path.join(directory_cache, 'export') directory_cache_items = os.path.join(directory_cache, 'items') self.directory_cache = directory_cache directory_data = Share.get_directory() directory_data_graphic = os.path.join(directory_data, 'graphic') directory_data_graphic_ico = os.path.join(directory_data_graphic, 'blasta.ico') directory_data_graphic_svg = os.path.join(directory_data_graphic, 'blasta.svg') directory_data_script = os.path.join(directory_data, 'script') directory_data_stylesheet = os.path.join(directory_data, 'stylesheet') directory_data_template = os.path.join(directory_data, 'template') #filename_database = os.path.join(directory_data, 'main.sqlite') db_file = os.path.join(directory_data, 'main.sqlite') self.app = FastAPI() templates = Jinja2Templates(directory=directory_data_template) self.app.mount('/data', StaticFiles(directory=directory_cache_data), name='data') self.app.mount('/export', StaticFiles(directory=directory_cache_export), name='export') self.app.mount('/graphic', StaticFiles(directory=directory_data_graphic), name='graphic') self.app.mount('/script', StaticFiles(directory=directory_data_script), name='script') self.app.mount('/stylesheet', StaticFiles(directory=directory_data_stylesheet), name='stylesheet') directory_settings = Settings.get_directory() filename_settings = os.path.join(directory_settings, 'settings.toml') data = UtilitiesData.open_file_toml(filename_settings) contacts = data['contacts'] contact_email = contacts['email'] contact_irc_channel = contacts['irc_channel'] contact_irc_server = contacts['irc_server'] contact_mix = contacts['mix'] contact_muc = contacts['muc'] contact_xmpp = contacts['xmpp'] settings = data['settings'] jabber_id_pubsub = settings['pubsub'] journal = settings['journal'] node_settings_id = settings['node_settings_id'] node_settings_title = settings['node_settings_title'] node_settings_subtitle = settings['node_settings_subtitle'] node_public_id = settings['node_public_id'] node_public_title = settings['node_public_title'] node_public_subtitle = settings['node_public_subtitle'] node_private_id = settings['node_private_id'] node_private_title = settings['node_private_title'] node_private_subtitle = settings['node_private_subtitle'] node_read_id = settings['node_read_id'] node_read_title = settings['node_read_title'] node_read_subtitle = settings['node_read_subtitle'] nodes = { 'public' : { 'name' : node_public_id, 'title' : node_public_title, 'subtitle' : node_public_subtitle, 'access_model' : 'presence'}, 'private' : { 'name' : node_private_id, 'title' : node_private_title, 'subtitle' : node_private_subtitle, 'access_model' : 'whitelist'}, 'read' : { 'name' : node_read_id, 'title' : node_read_title, 'subtitle' : node_read_subtitle, 'access_model' : 'whitelist'} } origins = [ "http://localhost", "http://localhost:8080", "http://127.0.0.1", "http://127.0.0.1:8080", ] self.app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # This is workaround for setting a "cookie" issue # It appears that there is a problem to set or send a "cookie" when a template is returned. @self.app.middleware('http') async def middleware_handler(request: Request, call_next): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') # Handle URL query if request_url_path != '/save': param_url = request.query_params.get('url', '') or None param_hash = request.query_params.get('hash', '') or None if param_hash: return RedirectResponse(url='/url/' + param_hash) if param_url: url_hash = UtilitiesCryptography.hash_url_to_md5(param_url) return RedirectResponse(url='/url/' + url_hash) response = await call_next(request) jabber_id = session_key = None # Handle credentials (i.e. so called "cookies") if request_url_path == '/disconnect': jid = request.cookies.get('jabber_id') if jid in accounts: del accounts[jid] if jid in sessions: del sessions[jid] response.delete_cookie('session_key') response.delete_cookie('jabber_id') else: try: # Access the variable from the request state jabber_id = request.app.state.jabber_id except Exception as e: jabber_id = request.cookies.get('jabber_id') logger.error(f'{function_name},{jabber_id},{str(e)}') pass try: # Access the variable from the request state session_key = request.app.state.session_key except Exception as e: session_key = request.cookies.get('session_key') logger.error(f'{function_name},{session_key},{str(e)}') pass if jabber_id and session_key: logger.info(f'{function_name},{jabber_id},Session key: {session_key}') response.set_cookie(key='jabber_id', value=jabber_id) response.set_cookie(key='session_key', value=session_key) # del request.app.state.jabber_id # del request.app.state.session_key request.app.state.jabber_id = request.app.state.session_key = None logger.debug(f'{function_name},{request_url_path},Finish') return response # response.set_cookie(key='session', value=str(jid) + '/' + str(session_key)) # response.set_cookie(key='session', # value=jid + '/' + session_key, # expires=datetime.now().replace(tzinfo=timezone.utc) + timedelta(days=30), # max_age=3600, # domain='localhost', # path='/', # secure=True, # httponly=False, # True # samesite='lax') @self.app.exception_handler(401) def not_authorized_exception_handler(request: Request, exc: HTTPException): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') #jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Not Authorized.' description = 'Not Authorized (401)' logger.warning(f'{function_name},{request_url_path},{description}') logger.debug(f'{function_name},{request_url_path},Finish') return result_post(request, description, message) @self.app.exception_handler(403) def access_denied_exception_handler(request: Request, exc: HTTPException): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Access denied.' description = 'Access denied (403)' path = 'error' logger.warning(f'{function_name},{request_url_path},{description}') logger.debug(f'{function_name},{request_url_path},Finish') return result_post(request, jabber_id, description, message, path) @self.app.exception_handler(404) def not_found_exception_handler(request: Request, exc: HTTPException): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Not Found.' description = 'Not found (404)' path = 'error' logger.warning(f'{function_name},{request_url_path},{description}') logger.debug(f'{function_name},{request_url_path},Finish') return result_post(request, jabber_id, description, message, path) @self.app.exception_handler(405) def not_allowed_exception_handler(request: Request, exc: HTTPException): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Method Not Allowed.' description = 'Not allowed (405)' path = 'error' logger.warning(f'{function_name},{request_url_path},{description}') logger.debug(f'{function_name},{request_url_path},Finish') return result_post(request, jabber_id, description, message, path) @self.app.exception_handler(500) def internal_error_exception_handler(request: Request, exc: HTTPException): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) message = 'Blasta system message » Internal Server Error.' description = 'Internal error (500)' path = 'error' logger.warning(f'{function_name},{request_url_path},{description}') logger.debug(f'{function_name},{request_url_path},Finish') return result_post(request, jabber_id, description, message, path) # TODO @self.app.get('/admin') def admin_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) authorized = None if authorized: template_file = 'connect.xhtml' template_dict = { 'request' : request, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response else: raise HTTPException(status_code=403, detail='Access denied') @self.app.get('/connect') def connect_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: logger.debug(f'{function_name},{jabber_id},Finish') response = RedirectResponse(url='/jid/' + jabber_id) else: template_file = 'connect.xhtml' template_dict = { 'request' : request, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/contact') def contact_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'contact.xhtml' template_dict = { 'contact_email' : contact_email, 'contact_irc_channel' : contact_irc_channel, 'contact_irc_server' : contact_irc_server, 'contact_mix' : contact_mix, 'contact_muc' : contact_muc, 'contact_xmpp' : contact_xmpp, 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/disconnect') def disconnect_get(request: Request, response: Response, jabber_id: str = Cookie(None), session_key: str = Cookie(None)): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') # response.set_cookie(max_age=0, value='', key='jabber_id') # response.set_cookie(max_age=0, value='', key='session_key') response = RedirectResponse(url='/') response.delete_cookie('session_key') response.delete_cookie('jabber_id') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/favicon.ico', include_in_schema=False) def favicon_get(): return FileResponse(directory_data_graphic_ico) @self.app.get('/help') def help_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'help.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about') def help_about_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'about.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/folksonomy') def help_about_folksonomies_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'folksonomy.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/ideas') def help_about_ideas_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) protocol = request.url.scheme hostname = request.url.hostname + ':' + str(request.url.port) origin = protocol + '://' + hostname template_file = 'ideas.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal, 'origin' : origin} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/philosophy') def help_about_philosophy_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'philosophy.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/projects') def help_about_projects_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'projects.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/software') def help_about_software_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'software.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/thanks') def help_about_thanks_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'thanks.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/xmpp') def help_about_xmpp_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'xmpp.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/xmpp/atomsub') def help_about_xmpp_atomsub_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'atomsub.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/xmpp/libervia') def help_about_xmpp_libervia_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'libervia.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/xmpp/movim') def help_about_xmpp_movim_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'movim.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/about/xmpp/pubsub') def help_about_xmpp_pubsub_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') date_now_iso = UtilitiesDate.get_current_time_as_iso8601() date_now_readable = UtilitiesDate.convert_iso8601_to_readable(date_now_iso) jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'pubsub.xhtml' template_dict = { 'request' : request, 'date_now_iso' : date_now_iso, 'date_now_readable' : date_now_readable, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/feeds') def help_about_feeds_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'feeds.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/questions') def help_questions_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'questions.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/syndication') def help_syndication_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') hostname = request.url.hostname + ':' + str(request.url.port) protocol = request.url.scheme origin = protocol + '://' + hostname jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'syndication.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal, 'origin' : origin, 'pubsub_jid' : jabber_id_pubsub} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/help/utilities') def help_utilities_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) hostname = request.url.hostname protocol = request.url.scheme hostname = request.url.netloc origin = protocol + '://' + hostname bookmarklet = 'location.href=`' + origin + '/save?url=${encodeURIComponent(window.location.href)}&title=${encodeURIComponent(document.title)}`;' template_file = 'utilities.xhtml' template_dict = { 'request' : request, 'bookmarklet' : bookmarklet, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/jid', response_class=HTMLResponse) @self.app.post('/jid') async def jid_get(request: Request, response : Response): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') node_type = 'public' path = 'jid' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id) if result == 'error': message = f'XMPP system message » {reason}.' description = 'IQ Error' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: response = await jid_main_get(request, node_type, path, jid=jabber_id) else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/jid/{jid}') @self.app.post('/jid/{jid}') async def jid_jid_get(request: Request, response : Response, jid): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = await jid_main_get(request, node_type='public', path='jid', jid=jid) logger.debug(f'{function_name},{request_url_path},Finish') return response async def jid_main_get(request: Request, node_type=None, path=None, jid=None): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') ask = invite = name = origin = start = '' # pubsub_jid = syndicate = jid # message = 'Find and share bookmarks with family and friends!' # description = f'Bookmarks of {jid}' max_count = 10 entries = None related_tags = None tags_dict = None param_filetype = request.query_params.get('filetype', '') or None param_mode = request.query_params.get('mode', '') or None param_page = request.query_params.get('page', '') or None param_protocol = request.query_params.get('protocol', '') or None param_query = request.query_params.get('q', '') or None if param_query: param_query = param_query.strip() param_tags = request.query_params.get('tags', '') or None param_tld = request.query_params.get('tld', '') or None if param_page and param_mode != 'feed': try: page = int(param_page) page_next = page + 1 page_prev = page - 1 except: page = 1 page_next = 2 else: page = 1 page_next = 2 page_prev = page - 1 index_first = (page - 1)*10 index_last = index_first+10 jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id == jid or node_type in ('private', 'read'): xmpp_instance = accounts[jabber_id] # NOTE You need something other than an iterator (XEP-0059). # You need a PubSub key that would hold tags. filename_items = os.path.join(directory_cache, 'items', jabber_id + '.toml') # NOTE Does it work? # It does not seem to actually filter tags. # NOTE Yes. It does work. # See function "cache_items_and_tags_filter". if param_query: query = param_query entries_cache = UtilitiesData.open_file_toml(filename_items) if node_type in entries_cache: entries_cache_node = entries_cache[node_type] filename_cache = os.path.join(directory_cache, 'data', jid + '_query.toml') UtilitiesData.cache_items_and_tags_search(directory_cache, entries_cache_node, jid, query) if os.path.exists(filename_cache) and os.path.getsize(filename_cache): data = UtilitiesData.open_file_toml(filename_cache) item_ids_all = data['item_ids'] related_tags = data['tags'] if len(item_ids_all) <= index_last: index_last = len(item_ids_all) page_next = None item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) entries = [] for entry in entries_cache_node: for item_id in item_ids_selection: if entry['url_hash'] == item_id: entries.append(entry) for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entry['tags'] = entry['tags'][:5] description = f'Your {node_type} bookmarks with "{query}"' message = f'Listing {node_type} bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' #item_id_next = entries[len(entries)-1] else: description = f'No {node_type} bookmarks with "{query}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None else: description = f'No {node_type} bookmarks with "{query}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None elif param_tags or param_tld or param_filetype or param_protocol: tags_list = param_tags.split('+') if len(tags_list) == 1: tag = param_tags entries_cache = UtilitiesData.open_file_toml(filename_items) if node_type in entries_cache: entries_cache_node = entries_cache[node_type] filename_cache = os.path.join(directory_cache, 'data', jid, tag + '.toml') UtilitiesData.cache_items_and_tags_filter(directory_cache, entries_cache_node, jid, tag) if os.path.exists(filename_cache) and os.path.getsize(filename_cache): data = UtilitiesData.open_file_toml(filename_cache) item_ids_all = data['item_ids'] related_tags = data['tags'] if len(item_ids_all) <= index_last: index_last = len(item_ids_all) page_next = None item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) entries = [] for entry in entries_cache_node: for item_id in item_ids_selection: if entry['url_hash'] == item_id: entries.append(entry) for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entry['tags'] = entry['tags'][:5] description = f'Your {node_type} bookmarks tagged with "{tag}"' message = f'Listing {node_type} bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' #item_id_next = entries[len(entries)-1] else: description = 'No {node_type} bookmarks tagged with "{tag}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None else: description = 'No {node_type} bookmarks tagged with "{tag}" were found for {jid}' message = 'Blasta system message » No entries.' page_next = None page_prev = None elif len(tag_list) > 1: pass #TODO Support multiple tags # if not param_tags and not param_tld and not param_filetype and not param_protocol and not param_url and not param_hash: else: name = jabber_id.split('@')[0] entries_cache = UtilitiesData.open_file_toml(filename_items) if node_type in entries_cache: entries_cache_node = entries_cache[node_type] filename_cache = os.path.join(directory_cache, 'data', jabber_id + '.toml') #if len(entries_cache_node) and not os.path.exists(filename_cache): UtilitiesData.cache_items_and_tags(directory_cache, entries_cache_node, jabber_id) if os.path.exists(filename_cache) and os.path.getsize(filename_cache): data = UtilitiesData.open_file_toml(filename_cache) item_ids_all = data['item_ids'] related_tags = data['tags'] if len(item_ids_all) <= index_last: index_last = len(item_ids_all) page_next = None item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) entries = [] for entry in entries_cache_node: for item_id in item_ids_selection: if entry['url_hash'] == item_id: entries.append(entry) for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable( entry['published']) entry['tags'] = entry['tags'][:5] description = f'Your {node_type} bookmarks' message = f'Listing {node_type} bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' #item_id_next = entries[len(entries)-1] else: description = 'Your bookmarks directory appears to be empty' message = 'Blasta system message » Zero count.' start = True else: description = 'Your bookmarks directory appears to be empty' message = 'Blasta system message » Zero count.' start = True elif jabber_id in accounts: # NOTE Keep this IQ function call as an exception. # If one wants to see contents of someone else, an # authorization is required. # NOTE It might be wiser to use cached items or item identifiers # provided that the viewer is authorized to view items. xmpp_instance = accounts[jabber_id] tags_dict = {} if param_query: description = f'Bookmarks from {jid} with "{param_query}"' entries_database = DatabaseSQLite.get_entries_by_jid_and_query(db_file, jid, param_query, index_first) entries_count = DatabaseSQLite.get_entries_count_by_jid_and_query(db_file, jid, param_query) for tag, instances in DatabaseSQLite.get_30_tags_by_jid_and_query(db_file, jid, param_query, index_first): tags_dict[tag] = instances elif param_tags: description = f'Bookmarks from {jid} tagged with "{param_tags}"' entries_database = DatabaseSQLite.get_entries_by_jid_and_tag(db_file, jid, param_tags, index_first) entries_count = DatabaseSQLite.get_entries_count_by_jid_and_tag(db_file, jid, param_tags) for tag, instances in DatabaseSQLite.get_30_tags_by_jid_and_tag(db_file, jid, param_tags, index_first): tags_dict[tag] = instances else: description = f'Bookmarks from {jid}' entries_database = DatabaseSQLite.get_entries_by_jid(db_file, jid, index_first) entries_count = DatabaseSQLite.get_entries_count_by_jid(db_file, jid) for tag, instances in DatabaseSQLite.get_30_tags_by_jid(db_file, jid, index_first): tags_dict[tag] = instances if not entries_database: #message = 'Blasta system message » Error: No entries were found.' #description = 'No results' #path = 'error' #return result_post(request, jabber_id, description, message, path) raise HTTPException(status_code=404, detail='No entries were found') if entries_count: entries = [] for entry in entries_database: tags_sorted = [] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) entry_jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) url_hash = UtilitiesCryptography.hash_url_to_md5(entry[2]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, 'jid' : entry_jid, 'name' : entry_jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) for entry in entries: try: date_iso = entry['published'] date_wrt = UtilitiesDate.convert_iso8601_to_readable(date_iso) entry['published_mod'] = date_wrt except: print('ERROR: Probably due to an attempt to convert a non ISO 8601.') print(entry['published']) print(entry['published_mod']) print(entry) index_last = index_first+len(entries_database) if entries_count <= index_last: index_last = entries_count page_next = None message = f'Listing bookmarks {index_first+1} - {index_last} out of {entries_count}.' else: # TODO Check permission, so there is no unintended continuing to cached data which is not authorized for. iq = await XmppPubsub.get_node_item_ids(xmpp_instance, jid, node_public_id) if isinstance(iq, Iq): iq_items_remote = iq['disco_items'] # Cache a list of identifiers of node items to a file. iq_items_remote_name = [] for iq_item_remote in iq_items_remote: iq_item_remote_name = iq_item_remote['name'] iq_items_remote_name.append(iq_item_remote_name) #data_item_ids = {'iq_items' : iq_items_remote_name} #filename_item_ids = 'item_ids/' + jid + '.toml' #Data.save_to_toml(filename_item_ids, data_item_ids) item_ids_all = iq_items_remote_name #item_ids_all = data['item_ids'] #related_tags = data['tags'] if len(item_ids_all) <= index_last: page_next = None index_last = len(item_ids_all) item_ids_selection = [] for item_id in item_ids_all[index_first:index_last]: item_ids_selection.append(item_id) iq = await XmppPubsub.get_node_items(xmpp_instance, jid, node_public_id, item_ids_selection) entries = UtilitiesData.extract_iq_items_extra(db_file, iq, jid) if entries: for entry in entries: entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) message = f'Listing bookmarks {index_first+1} - {index_last} out of {len(item_ids_all)}.' description = f'Bookmarks from {jid}' else: message = 'Blasta system message » Zero count.' description = 'Bookmarks directory appears to be empty' invite = True else: message = f'XMPP system message » {iq}.' name = jid.split('@')[0] path = 'error' logger.error(f'{function_name},{request_url_path},{description}') if not iq: message = 'XMPP system message » Empty.' description = 'An unknown error has occurred' invite = True elif iq == 'Item not found': description = 'Bookmarks directory appears to be empty' invite = True elif iq == 'forbidden': description = 'Access forbidden' elif iq == 'item-not-found': description = 'Jabber ID does not appear to be exist' elif iq == 'not-authorized': description = 'You have no authorization to view ' + name + '\'s bookmarks.' ask = True elif iq == 'Node not found': description = name + '\'s bookmarks directory appears to be empty.' invite = True elif 'DNS lookup failed' in iq: domain = jid.split('@')[1] if '@' in jid else jid description = f'Blasta could not connect to server {domain}' elif iq == 'Connection failed: connection refused': description = 'Connection with ' + name + ' has been refused' elif 'Timeout' in iq or 'timeout' in iq: description = 'Connection with ' + name + ' has been timed out' else: breakpoint() description = 'An unknown error has occurred' if invite: hostname = request.url.hostname + ':' + str(request.url.port) protocol = request.url.scheme origin = protocol + '://' + hostname template_file = 'ask.xhtml' template_dict = { 'request': request, 'ask' : ask, 'alias' : jabber_id.split('@')[0], 'description': description, 'invite' : invite, 'jabber_id': jabber_id, 'jid': jid, 'journal': journal, 'message': message, 'name': name, 'origin': origin, 'path': path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' return response else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response #if not entries: raise HTTPException(status_code=404, detail='No entries were found') template_dict = { 'request': request, 'description': description, 'entries': entries, 'jabber_id': jabber_id, 'jid': jid, 'journal': journal, 'message': message, 'node_type': node_type, 'page_next': page_next, 'page_prev': page_prev, 'pager' : True, 'param_query' : param_query, 'param_tags': param_tags, 'path': path, 'pubsub_jid': jid, 'node_id': nodes[node_type]['name'], 'start': start, 'syndicate': jid, 'tags' : tags_dict or related_tags or ''} if param_mode == 'feed': template_file = 'browse.atom' response = templates.TemplateResponse(template_file, template_dict) response.headers["Content-Type"] = "application/xml" else: template_file = 'browse.xhtml' response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/blasta.svg') def logo_get(): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = FileResponse(directory_data_graphic_svg) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/', response_class=HTMLResponse) @self.app.get('/new', response_class=HTMLResponse) async def root_get_new(request: Request, response : Response): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = await root_main_get(request, response, page_type='new') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/popular', response_class=HTMLResponse) async def root_get_popular(request: Request, response : Response): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = await root_main_get(request, response, page_type='popular') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/query', response_class=HTMLResponse) async def root_get_query(request: Request, response : Response): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = await root_main_get(request, response, page_type='query') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/recent', response_class=HTMLResponse) async def root_get_recent(request: Request, response : Response): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = await root_main_get(request, response, page_type='recent') logger.debug(f'{function_name},{request_url_path},Finish') return response async def root_main_get(request: Request, response : Response, page_type=None): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = path = syndicate = page_type param_filetype = request.query_params.get('filetype', '') or None param_mode = request.query_params.get('mode', '') or None param_page = request.query_params.get('page', '') or None param_protocol = request.query_params.get('protocol', '') or None param_query = request.query_params.get('q', '') or None if param_query: param_query = param_query.strip() param_tags = request.query_params.get('tags', '') or None param_tld = request.query_params.get('tld', '') or None if param_page and param_mode != 'feed': try: page = int(param_page) page_next = page + 1 page_prev = page - 1 except: page = 1 page_next = 2 else: page = 1 page_next = 2 page_prev = page - 1 index_first = (page - 1)*10 if param_tags or param_tld or param_filetype or param_protocol: entries_count = DatabaseSQLite.get_entries_count_by_tag(db_file, param_tags) match page_type: case 'new': description = f'New bookmarks tagged with "{param_tags}"' entries_database = DatabaseSQLite.get_entries_new_by_tag(db_file, param_tags, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_new_by_tag(db_file, param_tags, index_first) case 'popular': description = f'Popular bookmarks tagged with "{param_tags}"' # 'Most popular' entries_database = DatabaseSQLite.get_entries_popular_by_tag(db_file, param_tags, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_popular_by_tag(db_file, param_tags, index_first) case 'recent': description = f'Recent bookmarks tagged with "{param_tags}"' entries_database = DatabaseSQLite.get_entries_recent_by_tag(db_file, param_tags, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_recent_by_tag(db_file, param_tags, index_first) # TODO case 'query': else: match page_type: case 'new': description = 'New bookmarks' entries_database = DatabaseSQLite.get_entries_new(db_file, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_new(db_file, index_first) entries_count = DatabaseSQLite.get_entries_count(db_file) case 'popular': description = 'Popular bookmarks' # 'Most popular' entries_database = DatabaseSQLite.get_entries_popular(db_file, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_popular(db_file, index_first) entries_count = DatabaseSQLite.get_entries_count(db_file) case 'query': node_id = syndicate = 'new' description = f'Posted bookmarks with "{param_query}"' entries_database = DatabaseSQLite.get_entries_by_query(db_file, param_query, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_by_query_recent(db_file, param_query, index_first) entries_count = DatabaseSQLite.get_entries_count_by_query(db_file, param_query) case 'recent': description = 'Recent bookmarks' entries_database = DatabaseSQLite.get_entries_recent(db_file, index_first) tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_recent(db_file, index_first) entries_count = DatabaseSQLite.get_entries_count(db_file) if not entries_database: #message = 'Blasta system message » Error: No entries were found.' #description = 'No results' #path = 'error' #return result_post(request, jabber_id, description, message, path) raise HTTPException(status_code=404, detail='No entries were found') tags_dict = {} #for tag, instances in DatabaseSQLite.get_tags_30(db_file): for tag, instances in tags_of_entries: tags_dict[tag] = instances entries = [] for entry in entries_database: tags_sorted = [] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) url_hash = UtilitiesCryptography.hash_url_to_md5(entry[2]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, #entry[1] 'jid' : jid, 'name' : jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) for entry in entries: try: date_iso = entry['published'] date_wrt = UtilitiesDate.convert_iso8601_to_readable(date_iso) entry['published_mod'] = date_wrt except: print('ERROR: Probably due to an attempt to convert a non ISO 8601.') print(entry['published']) print(entry['published_mod']) print(entry) index_last = index_first+len(entries_database) if entries_count <= index_last: # NOTE Did you forget to modify index_last? # NOTE No. It appears that it probably not needed index_last = entries_count page_next = None #if page_type != 'new' or page_prev or param_tags or param_tld or param_filetype or param_protocol: if request.url.path != '/' or request.url.query: message = f'Listing bookmarks {index_first+1} - {index_last} out of {entries_count}.' message_link = None else: message = ('Welcome to Blasta, an XMPP PubSub oriented social ' 'bookmarks manager for organizing online content.') message_link = {'href' : '/help/about', 'text' : 'Learn more'} template_dict = { 'request' : request, 'description' : description, 'entries' : entries, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'message_link' : message_link, 'node_id' : node_id, 'page_next' : page_next, 'page_prev' : page_prev, 'page_type' : page_type, 'pager' : True, 'param_query' : param_query, 'param_tags' : param_tags, 'path' : path, 'pubsub_jid' : jabber_id_pubsub, 'syndicate' : syndicate, 'tags' : tags_dict} if param_mode == 'feed': # NOTE Consider scheme "feed" in order to prompt news # reader 'feed://' + request.url.netloc + request.url.path template_file = 'browse.atom' response = templates.TemplateResponse(template_file, template_dict) response.headers["Content-Type"] = "application/xml" else: template_file = 'browse.xhtml' response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response """ # TODO Return to code /tag and / (root) once SQLite database is ready. @self.app.get('/tag/{tag}') async def tag_tag_get(request: Request, tag): jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'tag:{tag}' syndicate = f'?tag={tag}' path = 'tag' # NOTE Perhaps it would be beneficial to retrieve "published" and # tags ("category") of viewer to override the tags of Blasta # TODO If URL exist in visitor's bookmarks, display its properties # (summary, tags title etc.) before data of others. # if UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request): page = request.query_params.get('page', '') or None if page: try: page = int(page) page_next = page + 1 page_prev = page - 1 except: page = 1 page_next = 2 else: page = 1 page_next = 2 page_prev = page - 1 index_first = (page - 1)*10 index_last = index_first+10 tags_dict = {} for entry in entries_database: for entry_tag in entry['tags']: if entry_tag in tags_dict: tags_dict[entry_tag] = tags_dict[entry_tag]+1 else: tags_dict[entry_tag] = 1 tags_dict = dict(sorted(tags_dict.items(), key=lambda item: (-item[1], item[0]))) tags_dict = dict(list(tags_dict.items())[:30]) #tags_dict = dict(sorted(tags_dict.items(), key=lambda item: (-item[1], item[0]))[:30]) print(tags_dict) entries = [] for entry in entries_database: if tag in entry['tags']: entries.append(entry) for entry in entries: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # """ @self.app.post('/', response_class=HTMLResponse) async def root_post(request: Request, response: Response, jabber_id: str = Form(...), password: str = Form(...)): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') if not UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request): # Store a variable in the request's state request.app.state.jabber_id = jabber_id session_key = str(random.random()) request.app.state.session_key = session_key accounts[jabber_id] = XmppInstance(jabber_id + '/blasta', password) if accounts[jabber_id].is_connected: sessions[jabber_id] = session_key else: del accounts[jabber_id] jabber_id = None # Check if the user and password are present and valid # If not valid, return "Could not connect to JID" # FIXME Instead of an arbitrary number (i.e. 5 seconds), write a # while loop with a timeout of 10 seconds. # Check whether an account is connected. # Wait for 5 seconds to connect. await asyncio.sleep(5) #if jabber_id in accounts and accounts[jabber_id].connection_accepted: if jabber_id in accounts: xmpp_instance = accounts[jabber_id] #await xmpp_instance.plugin['xep_0060'].delete_node(jabber_id, node_public_id) for node_properties in nodes: properties = nodes[node_properties] if not await XmppPubsub.is_node_exist(xmpp_instance, properties['name']): iq = XmppPubsub.create_node_atom( xmpp_instance, jabber_id, properties['name'], properties['title'], properties['subtitle'], properties['access_model']) await XmppIq.send(iq, 15) #await XmppPubsub.set_node_private(xmpp_instance, node_private_id) #await XmppPubsub.set_node_private(xmpp_instance, node_read_id) #configuration_form = await xmpp_instance['xep_0060'].get_node_config(jabber_id, properties['name']) #print(configuration_form) node_id = nodes['public']['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, 'public', node_id) if result == 'error': message = f'XMPP system message » {reason}.' description = 'IQ Error' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_settings_id, 'routine') routine = None if isinstance(iq, Iq): payload = iq['pubsub']['items']['item']['payload'] if payload: xmlns = '{jabber:x:data}' element_value = payload.find('.//' + xmlns + 'field[@var="routine"]/' + xmlns + 'value') if isinstance(element_value, ET.Element): routine = element_value.text match routine: case 'private': response = RedirectResponse(url='/private') case 'read': response = RedirectResponse(url='/read') case _: response = RedirectResponse(url='/jid/') else: #del accounts[jabber_id] #del sessions[jabber_id] message = 'Blasta system message » Authorization has failed.' description = 'Connection has failed' path = 'error' logger.error(f'{function_name},{jabber_id},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.post('/message') async def message_post(request: Request, jid: str = Form(...), body: str = Form(...)): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: #headline = 'This is a message from Blasta' xmpp_instance = accounts[jabber_id] XmppMessage.send(xmpp_instance, jid, body) alias = jid.split('@')[0] message = f'Your message has been sent to {alias}.' description = 'Message has been sent' path = 'message' logger.info(f'{function_name},{request_url_path},{description}') else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/now') def now_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'now.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/private', response_class=HTMLResponse) @self.app.post('/private') async def private_get(request: Request, response : Response): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') node_type = 'private' path = 'private' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id) if result == 'error': message = f'Blasta system message » {reason}.' description = 'Directory "private" appears to be empty' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: response = await jid_main_get(request, node_type, path) else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/profile') async def profile_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] if not await XmppPubsub.is_node_exist(xmpp_instance, node_settings_id): iq = XmppPubsub.create_node_config(xmpp_instance, jabber_id, node_settings_id) await XmppIq.send(iq, 15) access_models = {} for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_configuration(xmpp_instance, jabber_id, node_id) if isinstance(iq, Iq): access_model = iq['pubsub_owner']['configure']['form']['values']['pubsub#access_model'] access_models[node_type] = access_model settings = {} for setting in ['enrollment', 'routine']: iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_settings_id, setting) if isinstance(iq, Iq): payload = iq['pubsub']['items']['item']['payload'] if payload: xmlns = '{jabber:x:data}' element_value = payload.find('.//' + xmlns + 'field[@var="' + setting + '"]/' + xmlns + 'value') if isinstance(element_value, ET.Element): settings[setting] = element_value.text template_file = 'profile.xhtml' template_dict = { 'access_models' : access_models, 'enroll' : settings['enrollment'] if 'enrollment' in settings else None, 'request' : request, 'routine' : settings['routine'] if 'routine' in settings else None, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.post('/profile') async def profile_post(request: Request, routine: str = Form(None), enroll: str = Form(None)): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] if routine: message = f'The routine directory has been set to {routine}' payload = DataForm.create_setting_entry(xmpp_instance, 'routine', routine) iq = await XmppPubsub.publish_node_item( # NOTE Consider "configurations" as item ID (see Movim) xmpp_instance, jabber_id, node_settings_id, 'routine', payload) if enroll: if enroll == '1': message = 'Your database is shared with the Blasta system' else: message = 'Your database is excluded from the Blasta system' payload = DataForm.create_setting_entry(xmpp_instance, 'enroll', enroll) iq = await XmppPubsub.publish_node_item( xmpp_instance, jabber_id, node_settings_id, 'enrollment', payload) description = 'Setting has been saved' template_file = 'result.xhtml' template_dict = { 'description' : description, 'enroll' : enroll, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'request' : request, 'routine' : routine} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/profile/export/{node_type}/{filetype}') async def profile_export_get(request: Request, node_type, filetype): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_items(xmpp_instance, jabber_id, node_id) if isinstance(iq, Iq): entries = UtilitiesData.extract_iq_items(iq, jabber_id) # TODO Append a bookmark or bookmarks of Blasta if entries: filename = os.path.join(directory_cache, 'export', jabber_id + '_' + node_type + '.' + filetype) #filename = 'export/' + jabber_id + '_' + node_type + '.' + filetype #filename = f'export/{jabber_id}_{node_type}.{filetype}' #filename = 'export_' + node_type + '/' + jabber_id + '_' + '.' + filetype #filename = f'export_{node_type}/{jabber_id}.{filetype}' match filetype: case 'json': UtilitiesData.save_to_json(filename, entries) case 'toml': # NOTE Should the dict be named with 'entries' or 'private'/'public'/'read'? data = {'entries' : entries} UtilitiesData.save_to_toml(filename, data) response = FileResponse(filename) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.post('/profile/import') # def profile_import_post(file: UploadFile = File(...)): async def profile_import_post(request: Request, file: UploadFile | None = None, merge: str = Form(None), node: str = Form(...), override: str = Form(None)): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] if file: # TODO If node does not exist, redirect to result page with # a message that bookmarks are empty. # NOTE No. node_id = nodes[node]['name'] node_title = nodes[node]['title'] node_subtitle = nodes[node]['subtitle'] node_access_model = nodes[node]['access_model'] if not await XmppPubsub.is_node_exist(xmpp_instance, node_id): iq = XmppPubsub.create_node_atom( xmpp_instance, jabber_id, node_id, node_title, node_subtitle, node_access_model) await XmppIq.send(iq, 15) #return {"filename": file.filename} content = file.file.read().decode() # TODO Add match/case for filetype. filename = os.path.splitext(file.filename) file_extension = filename[len(filename)-1] match file_extension: case '.html': entries = UtilitiesData.load_data_netscape(content) case '.toml': entries = UtilitiesData.load_data_toml(content) case _: message = 'Blasta system message » Error: Unknown file type.' description = 'Import error' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response # entries_node = entries[node] #breakpoint() #for entry in entries: print(entry) name = jabber_id.split('@')[0] # timestamp = datetime.now().isoformat() counter = 0 for entry_type in entries: for entry in entries[entry_type]: url_hash = item_id = UtilitiesCryptography.hash_url_to_md5(entry['link']) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry_new = { 'title' : entry['title'], 'link' : entry['link'], 'summary' : entry['summary'], 'published' : entry['published'], 'updated' : entry['published'], #'updated' : entry['updated'], 'tags' : entry['tags'], 'url_hash' : url_hash, 'jid' : jabber_id, 'name' : name, 'instances' : instances} xmpp_instance = accounts[jabber_id] payload = UtilitiesSyndication.create_rfc4287_entry(entry_new) iq = await XmppPubsub.publish_node_item( xmpp_instance, jabber_id, node_id, item_id, payload) #await XmppIq.send(iq, 15) counter += 1 message = f'Blasta system message » Imported {counter} items.' description = 'Import successful' path = 'profile' response = result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: No upload file sent.' description = 'Import error' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/save') async def save_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: routine = None xmpp_instance = accounts[jabber_id] param_url = request.query_params.get('url', '') url_hash = UtilitiesCryptography.hash_url_to_md5(param_url) for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) #if len(iq['pubsub']['items']): if (isinstance(iq, Iq) and url_hash == iq['pubsub']['items']['item']['id']): response = RedirectResponse(url='/url/' + url_hash + '/edit') return response iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_settings_id, 'routine') if isinstance(iq, Iq): payload = iq['pubsub']['items']['item']['payload'] if payload: xmlns = '{jabber:x:data}' element_value = payload.find('.//' + xmlns + 'field[@var="routine"]/' + xmlns + 'value') if isinstance(element_value, ET.Element): routine = element_value.text # else: # routine = None # NOTE Is "message" missing? description = 'Add a new bookmark' # 'Enter properties for a bookmark' param_title = request.query_params.get('title', '') param_tags = request.query_params.get('tags', '') param_summary = request.query_params.get('summary', '') path = 'save' if request.query_params: message = message_link = None else: message = 'For greater ease, you migh want to try our' message_link = {'href' : '/help/utilities#buttons', 'text' : 'bookmarklets'} template_file = 'edit.xhtml' template_dict = { 'request' : request, 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'message_link' : message_link, 'path' : path, 'routine' : routine, 'summary' : param_summary, 'tags' : param_tags, 'title' : param_title, 'url' : param_url} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.post('/save') async def save_post(request: Request, node: str = Form(...), summary: str = Form(''), tags: str = Form(''), title: str = Form(...), url: str = Form(...)): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] url_hash = UtilitiesCryptography.hash_url_to_md5(url) for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_item( xmpp_instance, jabber_id, node_id, url_hash) if (isinstance(iq, Iq) and url_hash == iq['pubsub']['items']['item']['id']): return RedirectResponse(url='/url/' + url_hash + '/edit') description = 'Confirm properties of a bookmark' path = 'save' published = UtilitiesDate.get_current_time_as_iso8601() template_file = 'edit.xhtml' template_dict = { 'request' : request, 'confirm' : True, 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'node' : node, 'path' : path, 'published' : published, 'summary' : summary, 'tags' : tags, 'title' : title, 'url' : url, 'url_hash' : url_hash} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/read', response_class=HTMLResponse) @self.app.post('/read') async def read_get(request: Request, response : Response): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') node_type = 'read' path = 'read' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: xmpp_instance = accounts[jabber_id] node_id = nodes[node_type]['name'] result, reason = await UtilitiesData.update_cache_and_database( db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id) if result == 'error': message = f'Blasta system message » {reason}.' description = 'Directory "read" appears to be empty' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: response = await jid_main_get(request, node_type, path) else: description = 'An XMPP account is required' message = 'Blasta system message » Please connect with your XMPP account to view this directory.' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') esponse = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response def result_post(request: Request, jabber_id: str, description: str, message: str, path: str, http_code=None): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') template_file = 'result.xhtml' template_dict = { 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'path' : path, 'request' : request} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/register') def register_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) template_file = 'register.xhtml' template_dict = { 'request' : request, 'jabber_id' : jabber_id, 'journal' : journal} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/rss') def rss(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = RedirectResponse(url='/help/syndication') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/search') async def search_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = RedirectResponse(url='/search/all') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/search/all') async def search_all_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) description = 'Search for public bookmarks' form_action = '/query' input_id = input_name = label_for = 'q' input_placeholder = 'Enter a search query.' input_type = 'search' label = 'Search' message = 'Search for bookmarks in the Blasta system.' path = 'all' template_file = 'search.xhtml' template_dict = { 'request' : request, 'description' : description, 'form_action' : form_action, 'input_id' : input_id, 'input_name' : input_name, 'input_placeholder' : input_placeholder, 'input_type' : input_type, 'label' : label, 'label_for' : label_for, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'path' : path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/search/jid/{jid}') async def search_jid_get(request: Request, jid): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: if jabber_id == jid: description = 'Search your own bookmarks' message = 'Search for bookmarks from your own directory.' else: description = f'Search bookmarks of {jid}' message = 'Search for bookmarks of a given Jabber ID.' form_action = '/jid/' + jid input_id = input_name = label_for = 'q' input_placeholder = 'Enter a search query.' input_type = 'search' label = 'Search' path = 'jid' template_file = 'search.xhtml' template_dict = { 'request' : request, 'description' : description, 'form_action' : form_action, 'input_id' : input_id, 'input_name' : input_name, 'input_placeholder' : input_placeholder, 'input_type' : input_type, 'jabber_id' : jabber_id, 'jid' : jid, 'label' : label, 'label_for' : label_for, 'journal' : journal, 'message' : message, 'path' : path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: response = RedirectResponse(url='/search/all') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/search/url') async def search_url_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) description = 'Search for a bookmark' form_action = None # This is not relevant due to function middleware. Maybe / or /url. input_id = input_name = label_for = 'url' input_placeholder = 'Enter a URL.' input_type = 'url' label = 'URL' message = 'Search for a bookmark by a URL.' path = 'url' template_file = 'search.xhtml' template_dict = { 'request' : request, 'description' : description, # 'form_action' : form_action, 'input_id' : input_id, 'input_name' : input_name, 'input_placeholder' : input_placeholder, 'input_type' : input_type, 'label' : label, 'label_for' : label_for, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'path' : path} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/tag') def tag_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) tag_list = DatabaseSQLite.get_tags_500(db_file) message = 'Common 500 tags sorted by name and sized by commonality.' description = 'Common tags' template_file = 'tag.xhtml' template_dict = { 'request' : request, 'description' : description, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'tag_list' : tag_list} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/tag/{jid}') def tag_get_jid(request: Request, jid): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) # NOTE Consider retrieval of tags from cache file. # This is relevant to private and read nodes. #if jabber_id == jid or node_type in ('private', 'read'): tag_list = DatabaseSQLite.get_500_tags_by_jid_sorted_by_name(db_file, jid) message = 'Common 500 tags sorted by name and sized by commonality.' description = f'Common tags of {jid}' template_file = 'tag.xhtml' template_dict = { 'request' : request, 'description' : description, 'jabber_id' : jabber_id, 'jid' : jid, 'journal' : journal, 'message' : message, 'tag_list' : tag_list} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/url') async def url_get(request: Request): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') response = RedirectResponse(url='/search/url') logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/url/{url_hash}') async def url_hash_get(request: Request, url_hash): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' entries = [] exist = False if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] for node in nodes: node_id = nodes[node]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others. iq_item = iq['pubsub']['items']['item'] item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » Error: {iq}.' description = 'The requested bookmark could not be retrieved' path = 'error' logger.error(f'{function_name},{request_url_path},{description}: {iq}') response = result_post(request, jabber_id, description, message, path) return response if exist: # TODO Perhaps adding a paragraph with "your tags" and "who else has tagged this link" # and keep the (5 item) limit. #entry = UtilitiesSyndication.extract_items(item_payload) # NOTE Display only 5 items, as all the other tags appear at the list of "Related tags". entry = UtilitiesSyndication.extract_items(item_payload, limit=True) if entry: #url_hash = iq_item['id'] url_hash = UtilitiesCryptography.hash_url_to_md5(entry['link']) # TODO Add a check: if iq_item['id'] == url_hash: instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry['instances'] = instances entry['jid'] = jabber_id name = jabber_id.split('@')[0] entry['name'] = name entry['url_hash'] = url_hash entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) #entry['tags'] = entry['tags'][:5] entries.append(entry) tags_list = {} tags_and_instances = DatabaseSQLite.get_tags_and_instances_by_url_hash(db_file, url_hash) for tag, tag_instances in tags_and_instances: tags_list[tag] = tag_instances else: # NOTE Is it possible to activate this else statement? Consider removal. # https://fastapi.tiangolo.com/tutorial/handling-errors/ #raise HTTPException(status_code=404, detail="Item not found") message = 'Blasta system message » Error: Not found.' description = 'The requested bookmark does not exist' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response else: entry = DatabaseSQLite.get_entry_by_url_hash(db_file, url_hash) tags_sorted = [] if entry: for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) tags_list = {} tags_and_instances = DatabaseSQLite.get_tags_and_instances_by_entry_id(db_file, entry[0]) for tag, tag_instances in tags_and_instances: tags_list[tag] = tag_instances jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'published_mod' : UtilitiesDate.convert_iso8601_to_readable(entry[6]), 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, 'jid' : jid, 'name' : jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) # message = f'XMPP system message » {iq}.' # if iq == 'Node not found': # description = 'An error has occurred' # else: # description = 'An unknown error has occurred' else: # https://fastapi.tiangolo.com/tutorial/handling-errors/ #raise HTTPException(status_code=404, detail="Item not found") message = 'Blasta system message » Error: Not found.' description = 'The requested bookmark does not exist' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response else: entry = DatabaseSQLite.get_entry_by_url_hash(db_file, url_hash) if entry: tags_sorted = [] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]): tags_sorted.append(tag[0]) tags_list = {} tags_and_instances = DatabaseSQLite.get_tags_and_instances_by_entry_id(db_file, entry[0]) for tag, tag_instances in tags_and_instances: tags_list[tag] = tag_instances jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5]) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entries.append( {'title' : entry[3], 'link' : entry[2], 'summary' : entry[4], 'published' : entry[6], 'published_mod' : UtilitiesDate.convert_iso8601_to_readable(entry[6]), 'updated' : entry[7], 'tags' : tags_sorted, 'url_hash' : url_hash, 'jid' : jid, 'name' : jid, # jid.split('@')[0] if '@' in jid else jid, 'instances' : instances}) else: # https://fastapi.tiangolo.com/tutorial/handling-errors/ #raise HTTPException(status_code=404, detail="Item not found") message = 'Blasta system message » Error: Not found.' description = 'The requested bookmark does not exist' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response message = f'Information for URI {entries[0]["link"]}' # entry[2] if not instances: instances = 0 if instances > 1: description = 'Discover new resources and see who shares them' template_file = 'people.xhtml' people_list = {} jids_and_tags = DatabaseSQLite.get_jids_and_tags_by_url_hash(db_file, url_hash) for jid, tag in jids_and_tags: if jid in people_list and isinstance(people_list[jid], list): people_list[jid].append(tag) else: people_list[jid] = [tag] else: people_list = None description = 'Resource properties' template_file = 'browse.xhtml' template_dict = { 'request' : request, 'description' : description, 'entries' : entries, 'exist' : exist, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'node_id' : node_id, 'param_hash' : param_hash, 'path' : path, 'people' : people_list, 'pubsub_jid' : jabber_id_pubsub, 'syndicate' : syndicate, 'tags' : tags_list} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.post('/url/{url_hash}') async def url_hash_post(request: Request, url_hash, node: str = Form(...), published: str = Form(...), summary: str = Form(''), tags: str = Form(''), #tags_old: str = Form(...), tags_old: str = Form(''), title: str = Form(...), url: str = Form(...)): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) if jabber_id: name = jabber_id.split('@')[0] instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) timestamp = UtilitiesDate.get_current_time_as_iso8601() tags_new = UtilitiesData.organize_tags(tags) if tags else '' entry = {'title' : title.strip(), 'link' : url.strip(), 'summary' : summary.strip() if summary else '', 'published' : published, 'updated' : timestamp, 'tags' : tags_new, 'url_hash' : url_hash, 'jid' : jabber_id, 'name' : name, 'instances' : instances or 1} message = f'Information for URL {url}' description = 'Bookmark properties' xmpp_instance = accounts[jabber_id] payload = UtilitiesSyndication.create_rfc4287_entry(entry) # TODO Add try/except for IQ print('Publish item') # TODO Check. # NOTE You might not need to append to an open node before appending to a whitelist node. node_id = nodes[node]['name'] iq = await XmppPubsub.publish_node_item( xmpp_instance, jabber_id, node_id, url_hash, payload) match node: case 'private': print('Set item as private (XEP-0223)') #iq = await XmppPubsub.publish_node_item_private( # xmpp_instance, node_private_id, url_hash, iq) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_public_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'public', url_hash) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_read_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'read', url_hash) case 'public': await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_private_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'private', url_hash) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_read_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'read', url_hash) case 'read': #iq = await XmppPubsub.publish_node_item_private( # xmpp_instance, node_read_id, url_hash, iq) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_public_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'public', url_hash) await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_private_id, url_hash) UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, 'private', url_hash) if isinstance(iq, str): description = 'Could not save bookmark' message = f'XMPP system message » {iq}.' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response #await XmppIq.send(iq, 15) # Save changes to cache file entries_cache_filename = os.path.join(directory_cache, 'items', jabber_id + '.toml') entries_cache = UtilitiesData.open_file_toml(entries_cache_filename) entries_cache_node = entries_cache[node] if node in entries_cache else [] entries_cache_mod = [] #for entry_cache in entries_cache_node: # if entry_cache['url_hash'] == url_hash: # entry_cache = entry # break is_entry_modified = False # You already have this code in the HTML form, which indicates that this is an edit of an existing item # <input type="hidden" id="update" name="update" value="yes" required/> for entry_cache in entries_cache_node: if entry_cache['url_hash'] == url_hash: is_entry_modified = True entries_cache_mod.append(entry) else: entries_cache_mod.append(entry_cache) if not is_entry_modified: entries_cache_mod.append(entry) entries_cache[node] = entries_cache_mod entries_cache_data = entries_cache UtilitiesData.save_to_toml(entries_cache_filename, entries_cache_data) # Save changes to database if node == 'public': tags_valid = [] tags_invalid = [] #tags_list_new = tags.split(',') tags_list_new = tags_new tags_list_old = tags_old.split(', ') for tag in tags_list_old: tag_trim = tag.strip() if tag not in tags_list_new: tags_invalid.append(tag_trim) for tag in tags_list_new: if tag: tag_trim = tag.strip() if tag_trim not in tags_list_old: tags_valid.append(tag_trim) # FIXME Variable tags_valid is not in use. # NOTE Variable tags_valid might not be needed. See function associate_entries_tags_jids. entry['tags'] = tags_valid await DatabaseSQLite.add_tags(db_file, [entry]) # Slow (high I/O) entry_id = DatabaseSQLite.get_entry_id_by_url_hash(db_file, url_hash) if not entry_id: await DatabaseSQLite.add_new_entries(db_file, [entry]) # Is this line needed? await DatabaseSQLite.associate_entries_tags_jids(db_file, entry) #elif not DatabaseSQLite.is_jid_associated_with_url_hash(db_file, jabber_id, url_hash): # await DatabaseSQLite.associate_entries_tags_jids(db_file, entry) else: await DatabaseSQLite.associate_entries_tags_jids(db_file, entry) print('tags_new') print(tags_new) print('tags_old') print(tags_old) print('tags_valid') print(tags_valid) print('tags_invalid') print(tags_invalid) print(url_hash) print(jabber_id) await DatabaseSQLite.delete_combination_row_by_url_hash_and_tag_and_jid(db_file, url_hash, tags_invalid, jabber_id) # Entry for HTML entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(published) entry['updated_mod'] = UtilitiesDate.convert_iso8601_to_readable(timestamp) entry['tags'] = entry['tags'][:5] entries = [entry] template_file = 'browse.xhtml' template_dict = { 'request': request, 'description': description, 'entries': entries, 'exist': True, 'jabber_id': jabber_id, 'journal': journal, 'message': message, 'node_id': node_id, 'param_hash': param_hash, 'path': path, 'pubsub_jid': jabber_id_pubsub, 'syndicate': syndicate, 'tags' : tags_new} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/url/{url_hash}/confirm') async def url_hash_confirm_get(request: Request, url_hash): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] exist = False for node in nodes: node_id = nodes[node]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others. iq_item = iq['pubsub']['items']['item'] item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response if exist: # TODO Add a check: if iq_item['id'] == url_hash: entries = [] entry = UtilitiesSyndication.extract_items(item_payload) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry['instances'] = instances entry['jid'] = jabber_id name = jabber_id.split('@')[0] entry['name'] = name entry['url_hash'] = url_hash entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entries.append(entry) description = 'Confirm deletion of a bookmark' message = f'Details for bookmark {entries[0]["link"]}' template_file = 'browse.xhtml' template_dict = { 'request' : request, 'delete' : True, 'description' : description, 'entries' : entries, 'jabber_id' : jabber_id, 'journal' : journal, 'message' : message, 'node_id' : node_id, 'param_hash' : param_hash, 'path' : path, 'pubsub_jid' : jabber_id_pubsub, 'syndicate' : syndicate} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: logger.info(f'{function_name},{request_url_path},Redirecting to /jid/{jabber_id}') response = RedirectResponse(url='/jid/' + jabber_id) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/url/{url_hash}/delete') async def url_hash_delete_get(request: Request, url_hash): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) node_id = f'hash:{url_hash}' param_hash = url_hash syndicate = path = 'url' if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] exist = False for node_type in nodes: node_id = nodes[node_type]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others. iq_item = iq['pubsub']['items']['item'] item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response if exist: # TODO Add a check: if iq_item['id'] == url_hash: entries = [] entry = UtilitiesSyndication.extract_items(item_payload) instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) entry['instances'] = instances entry['jid'] = jabber_id name = jabber_id.split('@')[0] entry['name'] = name entry['url_hash'] = url_hash entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published']) entries.append(entry) # Set a title description = 'A bookmark has been deleted' # Set a message message = f'Details for bookmark {entry["link"]}' # Create a link to restore bookmark link_save = ('/save?url=' + urllib.parse.quote(entry['link']) + '&title=' + urllib.parse.quote(entry['title']) + '&summary=' + urllib.parse.quote(entry['summary']) + '&tags=' + urllib.parse.quote(','.join(entry['tags']))) # Remove the item from node xmpp_instance = accounts[jabber_id] await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_id, url_hash) # Remove the item association from database await DatabaseSQLite.delete_combination_row_by_jid_and_url_hash(db_file, url_hash, jabber_id) #await DatabaseSQLite.delete_combination_row_by_url_hash_and_tag_and_jid(db_file, url_hash, entry['tags'], jabber_id) # Remove the item from cache UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, node_type, url_hash) template_file = 'browse.xhtml' template_dict = { 'request' : request, 'description' : description, 'entries' : entries, 'jabber_id' : jabber_id, 'journal' : journal, 'link_save' : link_save, 'message' : message, 'node_id' : node_id, 'param_hash' : param_hash, 'path' : path, 'pubsub_jid' : jabber_id_pubsub, 'restore' : True, 'syndicate' : syndicate} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: logger.info(f'{function_name},{request_url_path},Redirecting to /jid/{jabber_id}') response = RedirectResponse(url='/jid/' + jabber_id) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response @self.app.get('/url/{url_hash}/edit') @self.app.post('/url/{url_hash}/edit') async def url_hash_edit_get(request: Request, url_hash): request_url_path = request.url.path function_name = sys._getframe().f_code.co_name logger.debug(f'{function_name},{request_url_path},Start') jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request) # node_id = f'hash:{url_hash}' if len(url_hash) == 32: if jabber_id: xmpp_instance = accounts[jabber_id] exist = False for node in nodes: node_id = nodes[node]['name'] iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash) if isinstance(iq, Iq): name = jabber_id.split('@')[0] iq_item = iq['pubsub']['items']['item'] # TODO Add a check: if iq_item['id'] == url_hash: # Is this valid entry['url_hash'] = iq['id'] or should it be iq_item['id'] entry = None item_payload = iq_item['payload'] if item_payload: exist = True break else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) return response if exist: path = 'edit' description = 'Edit an existing bookmark' entry = UtilitiesSyndication.extract_items(item_payload) entry['instances'] = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash) print(jabber_id) print(entry['tags']) else: # TODO Consider redirect to path /save (function save_get) # NOTE This seems to be the best to do, albeit, perhaps the pathname should be /save instead of /url/hash/edit. path = 'save' # 'add' description = 'Add a new bookmark' result = DatabaseSQLite.get_entry_by_url_hash(db_file, url_hash) tags_sorted = [] if result: for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, result[0]): tags_sorted.append(tag[0]) entry = {'title' : result[3], 'link' : result[2], 'summary' : result[4], 'published' : result[6], 'updated' : result[7], 'tags' : tags_sorted} #'instances' : result[8], #'jid' = jabber_id, #'name' : name, #'url_hash' : url_hash if entry: entry['jid'] = jabber_id entry['name'] = name entry['url_hash'] = url_hash template_file = 'edit.xhtml' template_dict = { 'request' : request, 'description' : description, 'edit' : True, 'jabber_id' : jabber_id, 'journal' : journal, 'node' : node, 'path' : path, 'published' : entry['published'], 'summary' : entry['summary'], 'tags' : ', '.join(entry['tags']), 'title' : entry['title'], 'url' : entry['link'], 'url_hash' : url_hash} response = templates.TemplateResponse(template_file, template_dict) response.headers['Content-Type'] = 'application/xhtml+xml' else: message = f'XMPP system message » {iq}.' if iq == 'Node not found': description = 'An error has occurred' else: description = 'An unknown error has occurred' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: No active session.' description = 'You are not connected' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) else: message = 'Blasta system message » Error: MD5 message-digest algorithm.' description = 'The argument for URL does not appear to be a valid MD5 Checksum' path = 'error' logger.error(f'{function_name},{request_url_path},{description}') response = result_post(request, jabber_id, description, message, path) logger.debug(f'{function_name},{request_url_path},Finish') return response