forked from sch/Blasta
2306 lines
122 KiB
Python
2306 lines
122 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import asyncio
|
|
from blasta.config import Cache, Settings, Share
|
|
from blasta.database.sqlite import DatabaseSQLite
|
|
from blasta.utilities.cryptography import UtilitiesCryptography
|
|
from blasta.utilities.data import UtilitiesData
|
|
from blasta.utilities.date import UtilitiesDate
|
|
from blasta.utilities.http import UtilitiesHttp
|
|
from blasta.utilities.syndication import UtilitiesSyndication
|
|
from blasta.xmpp.form import DataForm
|
|
from blasta.xmpp.instance import XmppInstance
|
|
from blasta.xmpp.pubsub import XmppPubsub
|
|
from datetime import datetime
|
|
from fastapi import Cookie, FastAPI, File, Form, HTTPException, Request, Response, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
import os
|
|
import random
|
|
from slixmpp.stanza.iq import Iq
|
|
from starlette.responses import RedirectResponse
|
|
import urllib.parse
|
|
import xml.etree.ElementTree as ET
|
|
|
|
class HttpInstance:
|
|
|
|
def __init__(self, accounts, sessions):
|
|
|
|
directory_cache = Cache.get_directory()
|
|
directory_cache_data = os.path.join(directory_cache, 'data')
|
|
directory_cache_export = os.path.join(directory_cache, 'export')
|
|
directory_cache_items = os.path.join(directory_cache, 'items')
|
|
|
|
self.directory_cache = directory_cache
|
|
|
|
directory_data = Share.get_directory()
|
|
directory_data_graphic = os.path.join(directory_data, 'graphic')
|
|
directory_data_graphic_ico = os.path.join(directory_data_graphic, 'blasta.ico')
|
|
directory_data_graphic_svg = os.path.join(directory_data_graphic, 'blasta.svg')
|
|
directory_data_script = os.path.join(directory_data, 'script')
|
|
directory_data_stylesheet = os.path.join(directory_data, 'stylesheet')
|
|
directory_data_template = os.path.join(directory_data, 'template')
|
|
#filename_database = os.path.join(directory_data, 'main.sqlite')
|
|
db_file = os.path.join(directory_data, 'main.sqlite')
|
|
|
|
self.app = FastAPI()
|
|
templates = Jinja2Templates(directory=directory_data_template)
|
|
|
|
self.app.mount('/data', StaticFiles(directory=directory_cache_data), name='data')
|
|
self.app.mount('/export', StaticFiles(directory=directory_cache_export), name='export')
|
|
self.app.mount('/graphic', StaticFiles(directory=directory_data_graphic), name='graphic')
|
|
self.app.mount('/script', StaticFiles(directory=directory_data_script), name='script')
|
|
self.app.mount('/stylesheet', StaticFiles(directory=directory_data_stylesheet), name='stylesheet')
|
|
|
|
directory_settings = Settings.get_directory()
|
|
filename_settings = os.path.join(directory_settings, 'settings.toml')
|
|
|
|
data = UtilitiesData.open_file_toml(filename_settings)
|
|
|
|
contacts = data['contacts']
|
|
contact_email = contacts['email']
|
|
contact_irc_channel = contacts['irc_channel']
|
|
contact_irc_server = contacts['irc_server']
|
|
contact_mix = contacts['mix']
|
|
contact_muc = contacts['muc']
|
|
contact_xmpp = contacts['xmpp']
|
|
|
|
settings = data['settings']
|
|
|
|
jabber_id_pubsub = settings['pubsub']
|
|
journal = settings['journal']
|
|
|
|
node_id_public = settings['node_id']
|
|
node_title_public = settings['node_title']
|
|
node_subtitle_public = settings['node_subtitle']
|
|
|
|
node_id_private = settings['node_id_private']
|
|
node_title_private = settings['node_title_private']
|
|
node_subtitle_private = settings['node_subtitle_private']
|
|
|
|
node_id_read = settings['node_id_read']
|
|
node_title_read = settings['node_title_read']
|
|
node_subtitle_read = settings['node_subtitle_read']
|
|
|
|
nodes = {
|
|
'public' : {
|
|
'name' : node_id_public,
|
|
'title' : node_title_public,
|
|
'subtitle' : node_subtitle_public,
|
|
'access_model' : 'presence'},
|
|
'private' : {
|
|
'name' : node_id_private,
|
|
'title' : node_title_private,
|
|
'subtitle' : node_subtitle_private,
|
|
'access_model' : 'whitelist'},
|
|
'read' : {
|
|
'name' : node_id_read,
|
|
'title' : node_title_read,
|
|
'subtitle' : node_subtitle_read,
|
|
'access_model' : 'whitelist'}
|
|
}
|
|
|
|
origins = [
|
|
"http://localhost",
|
|
"http://localhost:8080",
|
|
"http://127.0.0.1",
|
|
"http://127.0.0.1:8080",
|
|
]
|
|
|
|
self.app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=origins,
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# This is a workaround for an issue with setting a "cookie":
# it appears that a "cookie" cannot be set or sent when a template is returned.
|
|
@self.app.middleware('http')
async def middleware_handler(request: Request, call_next):
    """Redirect ``url``/``hash`` queries and manage the session cookies.

    For every path except ``/save``, a ``hash`` query parameter redirects
    to ``/url/<hash>`` and a ``url`` query parameter redirects to
    ``/url/<md5 of url>``; ``hash`` takes precedence over ``url``.

    Otherwise the request is passed on and the response is post-processed:

    * On ``/disconnect`` the account and session entries of the caller
      are dropped and both cookies are deleted.
    * On any other path, when ``request.app.state`` carries both a
      ``jabber_id`` and a ``session_key``, they are written to the
      response as cookies and the state is reset to ``None``.
    """
    # Handle URL query
    if request.url.path != '/save':
        param_url = request.query_params.get('url', '') or None
        param_hash = request.query_params.get('hash', '') or None
        if param_hash:
            return RedirectResponse(url='/url/' + param_hash)
        if param_url:
            url_hash = UtilitiesCryptography.hash_url_to_md5(param_url)
            return RedirectResponse(url='/url/' + url_hash)

    response = await call_next(request)

    # Handle credentials (i.e. so called "cookies")
    # Accounts, sessions and session keys are deliberately not printed.
    if request.url.path == '/disconnect':
        # Only drop server-side state for a caller whose cookies pass the
        # same session check that the route handlers use; a bare
        # "jabber_id" cookie is client-controlled and must not be trusted.
        jid = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
        if jid:
            accounts.pop(jid, None)
            sessions.pop(jid, None)
        response.delete_cookie('session_key')
        response.delete_cookie('jabber_id')
    else:
        # The attributes may not have been set yet, hence the defaults.
        # NOTE(review): app.state is shared by all requests, so credentials
        # stored here could be picked up by a concurrent request of another
        # client - confirm against the handler that sets them.
        state = request.app.state
        jabber_id = getattr(state, 'jabber_id', None)
        session_key = getattr(state, 'session_key', None)
        if jabber_id and session_key:
            response.set_cookie(key='jabber_id', value=jabber_id)
            response.set_cookie(key='session_key', value=session_key)
            state.jabber_id = state.session_key = None
    return response
|
|
|
|
# response.set_cookie(key='session', value=str(jid) + '/' + str(session_key))
|
|
# response.set_cookie(key='session',
|
|
# value=jid + '/' + session_key,
|
|
# expires=datetime.now().replace(tzinfo=timezone.utc) + timedelta(days=30),
|
|
# max_age=3600,
|
|
# domain='localhost',
|
|
# path='/',
|
|
# secure=True,
|
|
# httponly=False, # True
|
|
# samesite='lax')
|
|
|
|
@self.app.exception_handler(401)
def not_authorized_exception_handler(request: Request, exc: HTTPException):
    """Answer HTTP 401 with the shared result page.

    Calls ``result_post`` with the same five arguments (request, session
    lookup result, description, message, path) as the 403, 404, 405 and
    500 handlers.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    message = 'Not Authorized.'
    description = 'Not Authorized (401)'
    path = 'error'
    return result_post(request, jabber_id, description, message, path)
|
|
|
|
@self.app.exception_handler(403)
def access_denied_exception_handler(request: Request, exc: HTTPException):
    """Answer HTTP 403 by delegating to ``result_post`` with path 'error'."""
    return result_post(
        request,
        UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        'Access denied (403)',
        'Blasta system message » Access denied.',
        'error')
|
|
|
|
@self.app.exception_handler(404)
def not_found_exception_handler(request: Request, exc: HTTPException):
    """Answer HTTP 404 by delegating to ``result_post`` with path 'error'."""
    return result_post(
        request,
        UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        'Not found (404)',
        'Blasta system message » Not Found.',
        'error')
|
|
|
|
@self.app.exception_handler(405)
def not_allowed_exception_handler(request: Request, exc: HTTPException):
    """Answer HTTP 405 by delegating to ``result_post`` with path 'error'."""
    return result_post(
        request,
        UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        'Not allowed (405)',
        'Blasta system message » Method Not Allowed.',
        'error')
|
|
|
|
@self.app.exception_handler(500)
def internal_error_exception_handler(request: Request, exc: HTTPException):
    """Answer HTTP 500 by delegating to ``result_post`` with path 'error'."""
    return result_post(
        request,
        UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        'Internal error (500)',
        'Blasta system message » Internal Server Error.',
        'error')
|
|
|
|
# TODO
|
|
@self.app.get('/admin')
def admin_get(request: Request):
    """Administration page (unfinished).

    ``authorized`` is hard-coded to ``None``, so every request currently
    ends in an HTTP 403; the template branch is unreachable until an
    authorization check is implemented.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    authorized = None
    if not authorized:
        raise HTTPException(status_code=403, detail='Access denied')
    context = {
        'request' : request,
        'journal' : journal}
    page = templates.TemplateResponse('connect.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/connect')
def connect_get(request: Request):
    """Show the connect form, or redirect a signed-in visitor.

    When the session lookup yields a Jabber ID the visitor is redirected
    to ``/jid/<jabber_id>``; otherwise ``connect.xhtml`` is rendered as
    XHTML.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if jabber_id:
        return RedirectResponse(url='/jid/' + jabber_id)
    context = {
        'request' : request,
        'journal' : journal}
    page = templates.TemplateResponse('connect.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/contact')
def contact_get(request: Request):
    """Render ``contact.xhtml`` with the contact details read from the settings file."""
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    context = dict(
        contact_email=contact_email,
        contact_irc_channel=contact_irc_channel,
        contact_irc_server=contact_irc_server,
        contact_mix=contact_mix,
        contact_muc=contact_muc,
        contact_xmpp=contact_xmpp,
        request=request,
        jabber_id=jabber_id,
        journal=journal)
    page = templates.TemplateResponse('contact.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/disconnect')
def disconnect_get(request: Request,
                   response: Response,
                   jabber_id: str = Cookie(None),
                   session_key: str = Cookie(None)):
    """Redirect to ``/`` while deleting both session cookies.

    The injected ``response`` and cookie parameters are not used; a new
    redirect response is built and returned instead.
    """
    redirect = RedirectResponse(url='/')
    for cookie_name in ('session_key', 'jabber_id'):
        redirect.delete_cookie(cookie_name)
    return redirect
|
|
|
|
@self.app.get('/favicon.ico', include_in_schema=False)
def favicon_get():
    """Serve the ``blasta.ico`` file from the graphic directory."""
    icon = FileResponse(directory_data_graphic_ico)
    return icon
|
|
|
|
@self.app.get('/help')
def help_get(request: Request):
    """Render ``help.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('help.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about')
def help_about_get(request: Request):
    """Render ``about.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('about.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/folksonomy')
def help_about_folksonomies_get(request: Request):
    """Render ``folksonomy.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('folksonomy.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/ideas')
def help_about_ideas_get(request: Request):
    """Render ``ideas.xhtml`` as XHTML.

    The template additionally receives ``origin``, the scheme and network
    location of the request URL (e.g. ``http://localhost:8080``).
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    # netloc already carries the port when one is present in the URL;
    # joining hostname and port by hand yields "host:None" on default ports.
    origin = request.url.scheme + '://' + request.url.netloc
    template_file = 'ideas.xhtml'
    template_dict = {
        'request' : request,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'origin' : origin}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/help/about/philosophy')
def help_about_philosophy_get(request: Request):
    """Render ``philosophy.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('philosophy.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/projects')
def help_about_projects_get(request: Request):
    """Render ``projects.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('projects.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/software')
def help_about_software_get(request: Request):
    """Render ``software.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('software.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/thanks')
def help_about_thanks_get(request: Request):
    """Render ``thanks.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('thanks.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/xmpp')
def help_about_xmpp_get(request: Request):
    """Render ``xmpp.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('xmpp.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/xmpp/atomsub')
def help_about_xmpp_atomsub_get(request: Request):
    """Render ``atomsub.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('atomsub.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/xmpp/libervia')
def help_about_xmpp_libervia_get(request: Request):
    """Render ``libervia.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('libervia.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/xmpp/movim')
def help_about_xmpp_movim_get(request: Request):
    """Render ``movim.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('movim.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/about/xmpp/pubsub')
def help_about_xmpp_pubsub_get(request: Request):
    """Render ``pubsub.xhtml`` as XHTML.

    Besides the usual values, the template receives the current local
    time as an ISO 8601 string and in the form produced by
    ``UtilitiesDate.convert_iso8601_to_readable``.
    """
    now_iso = datetime.now().isoformat()
    now_readable = UtilitiesDate.convert_iso8601_to_readable(now_iso)
    context = dict(
        request=request,
        date_now_iso=now_iso,
        date_now_readable=now_readable,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('pubsub.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/feeds')
def help_about_feeds_get(request: Request):
    """Render ``feeds.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('feeds.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/questions')
def help_questions_get(request: Request):
    """Render ``questions.xhtml`` as XHTML."""
    context = dict(
        request=request,
        jabber_id=UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        journal=journal)
    page = templates.TemplateResponse('questions.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/help/syndication')
def help_syndication_get(request: Request):
    """Render ``syndication.xhtml`` as XHTML.

    The template additionally receives ``origin`` (scheme and network
    location of the request URL) and ``pubsub_jid`` (the ``pubsub`` value
    of the settings file).
    """
    # netloc already carries the port when one is present in the URL;
    # joining hostname and port by hand yields "host:None" on default ports.
    origin = request.url.scheme + '://' + request.url.netloc
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    template_file = 'syndication.xhtml'
    template_dict = {
        'request' : request,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'origin' : origin,
        'pubsub_jid' : jabber_id_pubsub}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/help/utilities')
def help_utilities_get(request: Request):
    """Render ``utilities.xhtml`` as XHTML, including a bookmarklet.

    The bookmarklet is a JavaScript snippet which navigates to the
    ``/save`` path of this site, passing the URL and title of the page
    it is activated on.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    origin = request.url.scheme + '://' + request.url.netloc
    bookmarklet = ''.join((
        'location.href=`',
        origin,
        '/save?url=${encodeURIComponent(window.location.href)}&title=${encodeURIComponent(document.title)}`;'))
    context = dict(
        request=request,
        bookmarklet=bookmarklet,
        jabber_id=jabber_id,
        journal=journal)
    page = templates.TemplateResponse('utilities.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/jid', response_class=HTMLResponse)
@self.app.post('/jid')
async def jid_get(request: Request, response : Response):
    """List the public bookmarks of the signed-in account.

    Without a matching session an error page is returned through
    ``result_post``. Otherwise the cache and database are refreshed for
    the public node; an 'error' result yields an "IQ Error" page and any
    other result hands over to ``jid_main_get``.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        return result_post(
            request,
            jabber_id,
            'An XMPP account is required',
            'Blasta system message » Please connect with your XMPP account to view this directory.',
            'error')
    node_type = 'public'
    outcome, reason = await UtilitiesData.update_cache_and_database(
        db_file,
        directory_cache,
        accounts[jabber_id],
        jabber_id,
        node_type,
        nodes[node_type]['name'])
    if outcome == 'error':
        return result_post(
            request,
            jabber_id,
            'IQ Error',
            'XMPP system message » {}.'.format(reason),
            'error')
    return await jid_main_get(request, node_type, 'jid', jid=jabber_id)
|
|
|
|
@self.app.get('/jid/{jid}')
@self.app.post('/jid/{jid}')
async def jid_jid_get(request: Request, response : Response, jid):
    """Delegate to ``jid_main_get`` for the public node of ``jid``.

    The injected ``response`` parameter is not used; the response of
    ``jid_main_get`` is returned as is.
    """
    return await jid_main_get(request, node_type='public', path='jid', jid=jid)
|
|
|
|
async def jid_main_get(request: Request, node_type=None, path=None, jid=None):
|
|
ask = invite = name = origin = start = ''
|
|
# pubsub_jid = syndicate = jid
|
|
# message = 'Find and share bookmarks with family and friends!'
|
|
# description = 'Bookmarks of {}'.format(jid)
|
|
max_count = 10
|
|
entries = None
|
|
related_tags = None
|
|
tags_dict = None
|
|
param_filetype = request.query_params.get('filetype', '') or None
|
|
param_mode = request.query_params.get('mode', '') or None
|
|
param_page = request.query_params.get('page', '') or None
|
|
param_protocol = request.query_params.get('protocol', '') or None
|
|
param_query = request.query_params.get('q', '') or None
|
|
if param_query: param_query = param_query.strip()
|
|
param_tags = request.query_params.get('tags', '') or None
|
|
param_tld = request.query_params.get('tld', '') or None
|
|
if param_page and param_mode != 'feed':
|
|
try:
|
|
page = int(param_page)
|
|
page_next = page + 1
|
|
page_prev = page - 1
|
|
except:
|
|
page = 1
|
|
page_next = 2
|
|
else:
|
|
page = 1
|
|
page_next = 2
|
|
page_prev = page - 1
|
|
index_first = (page - 1)*10
|
|
index_last = index_first+10
|
|
jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
|
|
if jabber_id == jid or node_type in ('private', 'read'):
|
|
xmpp_instance = accounts[jabber_id]
|
|
# NOTE You need something other than an iterator (XEP-0059).
|
|
# You need a PubSub key that would hold tags.
|
|
filename_items = os.path.join(directory_cache, 'items', jabber_id + '.toml')
|
|
# NOTE Does it work?
|
|
# It does not seem to actually filter tags.
|
|
# NOTE Yes. It does work.
|
|
# See function "cache_items_and_tags_filter".
|
|
if param_query:
|
|
query = param_query
|
|
entries_cache = UtilitiesData.open_file_toml(filename_items)
|
|
entries_cache_node = entries_cache[node_type]
|
|
filename_cache = os.path.join(directory_cache, 'data', jid + '_query.toml')
|
|
UtilitiesData.cache_items_and_tags_search(directory_cache, entries_cache_node, jid, query)
|
|
if os.path.exists(filename_cache) and os.path.getsize(filename_cache):
|
|
data = UtilitiesData.open_file_toml(filename_cache)
|
|
item_ids_all = data['item_ids']
|
|
related_tags = data['tags']
|
|
if len(item_ids_all) <= index_last:
|
|
index_last = len(item_ids_all)
|
|
page_next = None
|
|
item_ids_selection = []
|
|
for item_id in item_ids_all[index_first:index_last]:
|
|
item_ids_selection.append(item_id)
|
|
entries = []
|
|
for entry in entries_cache_node:
|
|
for item_id in item_ids_selection:
|
|
if entry['url_hash'] == item_id:
|
|
entries.append(entry)
|
|
for entry in entries:
|
|
entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published'])
|
|
entry['tags'] = entry['tags'][:5]
|
|
description = 'Your {} bookmarks with "{}"'.format(node_type, query)
|
|
message = 'Listing {} bookmarks {} - {} out of {}.'.format(node_type, index_first+1, index_last, len(item_ids_all))
|
|
#item_id_next = entries[len(entries)-1]
|
|
else:
|
|
description = 'No {} bookmarks with "{}" were found for {}'.format(node_type, query, jid)
|
|
message = 'Blasta system message » No entries.'
|
|
page_next = None
|
|
page_prev = None
|
|
elif param_tags or param_tld or param_filetype or param_protocol:
|
|
tags_list = param_tags.split('+')
|
|
if len(tags_list) == 1:
|
|
tag = param_tags
|
|
entries_cache = UtilitiesData.open_file_toml(filename_items)
|
|
entries_cache_node = entries_cache[node_type]
|
|
filename_cache = os.path.join(directory_cache, 'data', jid, tag + '.toml')
|
|
UtilitiesData.cache_items_and_tags_filter(directory_cache, entries_cache_node, jid, tag)
|
|
if os.path.exists(filename_cache) and os.path.getsize(filename_cache):
|
|
data = UtilitiesData.open_file_toml(filename_cache)
|
|
item_ids_all = data['item_ids']
|
|
related_tags = data['tags']
|
|
if len(item_ids_all) <= index_last:
|
|
index_last = len(item_ids_all)
|
|
page_next = None
|
|
item_ids_selection = []
|
|
for item_id in item_ids_all[index_first:index_last]:
|
|
item_ids_selection.append(item_id)
|
|
entries = []
|
|
for entry in entries_cache_node:
|
|
for item_id in item_ids_selection:
|
|
if entry['url_hash'] == item_id:
|
|
entries.append(entry)
|
|
for entry in entries:
|
|
entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published'])
|
|
entry['tags'] = entry['tags'][:5]
|
|
description = 'Your {} bookmarks tagged with "{}"'.format(node_type, tag)
|
|
message = 'Listing {} bookmarks {} - {} out of {}.'.format(node_type, index_first+1, index_last, len(item_ids_all))
|
|
#item_id_next = entries[len(entries)-1]
|
|
else:
|
|
description = 'No {} bookmarks tagged with "{}" were found for {}'.format(node_type, tag, jid)
|
|
message = 'Blasta system message » No entries.'
|
|
page_next = None
|
|
page_prev = None
|
|
elif len(tag_list) > 1:
|
|
pass #TODO Support multiple tags
|
|
# if not param_tags and not param_tld and not param_filetype and not param_protocol and not param_url and not param_hash:
|
|
else:
|
|
name = jabber_id.split('@')[0]
|
|
entries_cache = UtilitiesData.open_file_toml(filename_items)
|
|
entries_cache_node = entries_cache[node_type]
|
|
filename_cache = os.path.join(directory_cache, 'data', jabber_id + '.toml')
|
|
#if len(entries_cache_node) and not os.path.exists(filename_cache):
|
|
UtilitiesData.cache_items_and_tags(directory_cache, entries_cache_node, jabber_id)
|
|
if os.path.exists(filename_cache) and os.path.getsize(filename_cache):
|
|
data = UtilitiesData.open_file_toml(filename_cache)
|
|
item_ids_all = data['item_ids']
|
|
related_tags = data['tags']
|
|
if len(item_ids_all) <= index_last:
|
|
index_last = len(item_ids_all)
|
|
page_next = None
|
|
item_ids_selection = []
|
|
for item_id in item_ids_all[index_first:index_last]:
|
|
item_ids_selection.append(item_id)
|
|
entries = []
|
|
for entry in entries_cache_node:
|
|
for item_id in item_ids_selection:
|
|
if entry['url_hash'] == item_id:
|
|
entries.append(entry)
|
|
for entry in entries:
|
|
entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published'])
|
|
entry['tags'] = entry['tags'][:5]
|
|
description = 'Your {} bookmarks'.format(node_type)
|
|
message = 'Listing {} bookmarks {} - {} out of {}.'.format(node_type, index_first+1, index_last, len(item_ids_all))
|
|
#item_id_next = entries[len(entries)-1]
|
|
else:
|
|
description = 'Your bookmarks directory appears to be empty'
|
|
message = 'Blasta system message » Zero count.'
|
|
start = True
|
|
elif jabber_id in accounts:
|
|
# NOTE Keep this IQ function call as an exception.
|
|
# If one wants to see contents of someone else, an
|
|
# authorization is required.
|
|
# NOTE It might be wiser to use cached items or item identifiers
|
|
# provided that the viewer is authorized to view items.
|
|
xmpp_instance = accounts[jabber_id]
|
|
tags_dict = {}
|
|
if param_query:
|
|
description = 'Bookmarks from {} with "{}"'.format(jid, param_query)
|
|
entries_database = DatabaseSQLite.get_entries_by_jid_and_query(db_file, jid, param_query, index_first)
|
|
entries_count = DatabaseSQLite.get_entries_count_by_jid_and_query(db_file, jid, param_query)
|
|
for tag, instances in DatabaseSQLite.get_30_tags_by_jid_and_query(db_file, jid, param_query, index_first):
|
|
tags_dict[tag] = instances
|
|
elif param_tags:
|
|
description = 'Bookmarks from {} tagged with "{}"'.format(jid, param_tags)
|
|
entries_database = DatabaseSQLite.get_entries_by_jid_and_tag(db_file, jid, param_tags, index_first)
|
|
entries_count = DatabaseSQLite.get_entries_count_by_jid_and_tag(db_file, jid, param_tags)
|
|
for tag, instances in DatabaseSQLite.get_30_tags_by_jid_and_tag(db_file, jid, param_tags, index_first):
|
|
tags_dict[tag] = instances
|
|
else:
|
|
description = 'Bookmarks from {}'.format(jid)
|
|
entries_database = DatabaseSQLite.get_entries_by_jid(db_file, jid, index_first)
|
|
entries_count = DatabaseSQLite.get_entries_count_by_jid(db_file, jid)
|
|
for tag, instances in DatabaseSQLite.get_30_tags_by_jid(db_file, jid, index_first):
|
|
tags_dict[tag] = instances
|
|
if not entries_database:
|
|
#message = 'Blasta system message » Error: No entries were found.'
|
|
#description = 'No results'
|
|
#path = 'error'
|
|
#return result_post(request, jabber_id, description, message, path)
|
|
raise HTTPException(status_code=404, detail='No entries were found')
|
|
if entries_count:
|
|
entries = []
|
|
for entry in entries_database:
|
|
tags_sorted = []
|
|
for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]):
|
|
tags_sorted.append(tag[0])
|
|
entry_jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5])
|
|
url_hash = UtilitiesCryptography.hash_url_to_md5(entry[2])
|
|
instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
|
|
entries.append(
|
|
{'title' : entry[3],
|
|
'link' : entry[2],
|
|
'summary' : entry[4],
|
|
'published' : entry[6],
|
|
'updated' : entry[7],
|
|
'tags' : tags_sorted,
|
|
'url_hash' : url_hash,
|
|
'jid' : entry_jid,
|
|
'name' : entry_jid, # jid.split('@')[0] if '@' in jid else jid,
|
|
'instances' : instances})
|
|
for entry in entries:
|
|
try:
|
|
date_iso = entry['published']
|
|
date_wrt = UtilitiesDate.convert_iso8601_to_readable(date_iso)
|
|
entry['published_mod'] = date_wrt
|
|
except:
|
|
print('ERROR: Probably due to an attempt to convert a non ISO 8601.')
|
|
print(entry['published'])
|
|
print(entry['published_mod'])
|
|
print(entry)
|
|
index_last = index_first+len(entries_database)
|
|
if entries_count <= index_last:
|
|
index_last = entries_count
|
|
page_next = None
|
|
message = 'Listing bookmarks {} - {} out of {}.'.format(index_first+1, index_last, entries_count)
|
|
else:
|
|
# TODO Check permission, so there is no unintended continuing to cached data which is not authorized for.
|
|
iq = await XmppPubsub.get_node_item_ids(xmpp_instance, jid, node_id_public)
|
|
if isinstance(iq, Iq):
|
|
iq_items_remote = iq['disco_items']
|
|
|
|
# Cache a list of identifiers of node items to a file.
|
|
iq_items_remote_name = []
|
|
for iq_item_remote in iq_items_remote:
|
|
iq_item_remote_name = iq_item_remote['name']
|
|
iq_items_remote_name.append(iq_item_remote_name)
|
|
|
|
#data_item_ids = {'iq_items' : iq_items_remote_name}
|
|
#filename_item_ids = 'item_ids/' + jid + '.toml'
|
|
#Data.save_to_toml(filename_item_ids, data_item_ids)
|
|
|
|
item_ids_all = iq_items_remote_name
|
|
#item_ids_all = data['item_ids']
|
|
#related_tags = data['tags']
|
|
if len(item_ids_all) <= index_last:
|
|
page_next = None
|
|
index_last = len(item_ids_all)
|
|
item_ids_selection = []
|
|
for item_id in item_ids_all[index_first:index_last]:
|
|
item_ids_selection.append(item_id)
|
|
|
|
iq = await XmppPubsub.get_node_items(xmpp_instance, jid, node_id_public, item_ids_selection)
|
|
entries = UtilitiesData.extract_iq_items_extra(db_file, iq, jid)
|
|
if entries:
|
|
for entry in entries:
|
|
entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published'])
|
|
message = 'Listing bookmarks {} - {} out of {}.'.format(index_first+1, index_last, len(item_ids_all))
|
|
description = 'Bookmarks from {}'.format(jid)
|
|
else:
|
|
message = 'Blasta system message » Zero count.'
|
|
description = 'Bookmarks directory appears to be empty'
|
|
invite = True
|
|
else:
|
|
message = 'XMPP system message » {}.'.format(iq)
|
|
name = jid.split('@')[0]
|
|
path = 'error'
|
|
if not iq:
|
|
message = 'XMPP system message » Empty.'
|
|
description = 'An unknown error has occurred'
|
|
invite = True
|
|
elif iq == 'Item not found':
|
|
description = 'Bookmarks directory appears to be empty'
|
|
invite = True
|
|
elif iq == 'forbidden':
|
|
description = 'Access forbidden'
|
|
elif iq == 'item-not-found':
|
|
description = 'Jabber ID does not appear to be exist'
|
|
elif iq == 'not-authorized':
|
|
description = 'You have no authorization to view ' + name + '\'s bookmarks.'
|
|
|
|
ask = True
|
|
elif iq == 'Node not found':
|
|
description = name + '\'s bookmarks directory appears to be empty.'
|
|
invite = True
|
|
elif 'DNS lookup failed' in iq:
|
|
domain = jid.split('@')[1] if '@' in jid else jid
|
|
description = 'Blasta could not connect to server {}'.format(domain)
|
|
elif iq == 'Connection failed: connection refused':
|
|
description = 'Connection with ' + name + ' has been refused'
|
|
elif 'Timeout' in iq or 'timeout' in iq:
|
|
description = 'Connection with ' + name + ' has been timed out'
|
|
else:
|
|
breakpoint()
|
|
description = 'An unknown error has occurred'
|
|
if invite:
|
|
hostname = request.url.hostname + ':' + str(request.url.port)
|
|
protocol = request.url.scheme
|
|
origin = protocol + '://' + hostname
|
|
template_file = 'ask.xhtml'
|
|
template_dict = {
|
|
'request': request,
|
|
'ask' : ask,
|
|
'alias' : jabber_id.split('@')[0],
|
|
'description': description,
|
|
'invite' : invite,
|
|
'jabber_id': jabber_id,
|
|
'jid': jid,
|
|
'journal': journal,
|
|
'message': message,
|
|
'name': name,
|
|
'origin': origin,
|
|
'path': path}
|
|
response = templates.TemplateResponse(template_file, template_dict)
|
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
|
return response
|
|
else:
|
|
description = 'An XMPP account is required'
|
|
message = 'Blasta system message » Please connect with your XMPP account to view this directory.'
|
|
path = 'error'
|
|
return result_post(request, jabber_id, description, message, path)
|
|
#if not entries: raise HTTPException(status_code=404, detail='No entries were found')
|
|
template_dict = {
|
|
'request': request,
|
|
'description': description,
|
|
'entries': entries,
|
|
'jabber_id': jabber_id,
|
|
'jid': jid,
|
|
'journal': journal,
|
|
'message': message,
|
|
'node_type': node_type,
|
|
'page_next': page_next,
|
|
'page_prev': page_prev,
|
|
'pager' : True,
|
|
'param_query' : param_query,
|
|
'param_tags': param_tags,
|
|
'path': path,
|
|
'pubsub_jid': jid,
|
|
'node_id': nodes[node_type]['name'],
|
|
'start': start,
|
|
'syndicate': jid,
|
|
'tags' : tags_dict or related_tags or ''}
|
|
if param_mode == 'feed':
|
|
template_file = 'browse.atom'
|
|
response = templates.TemplateResponse(template_file, template_dict)
|
|
response.headers["Content-Type"] = "application/xml"
|
|
else:
|
|
template_file = 'browse.xhtml'
|
|
response = templates.TemplateResponse(template_file, template_dict)
|
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
|
return response
|
|
|
|
@self.app.get('/blasta.svg')
def logo_get():
    """Serve the SVG logo of Blasta from the graphic directory."""
    logo_path = directory_data_graphic_svg
    return FileResponse(logo_path)
|
|
|
|
@self.app.get('/', response_class=HTMLResponse)
@self.app.get('/new', response_class=HTMLResponse)
async def root_get_new(request: Request, response : Response):
    """List new bookmarks; this handler serves both '/' and '/new'."""
    return await root_main_get(request, response, page_type='new')
|
|
|
|
@self.app.get('/popular', response_class=HTMLResponse)
async def root_get_popular(request: Request, response : Response):
    """List popular bookmarks by delegating to root_main_get."""
    return await root_main_get(request, response, page_type='popular')
|
|
|
|
@self.app.get('/query', response_class=HTMLResponse)
async def root_get_query(request: Request, response : Response):
    """List bookmarks matching a search query by delegating to root_main_get."""
    return await root_main_get(request, response, page_type='query')
|
|
|
|
@self.app.get('/recent', response_class=HTMLResponse)
async def root_get_recent(request: Request, response : Response):
    """List recent bookmarks by delegating to root_main_get."""
    return await root_main_get(request, response, page_type='recent')
|
|
|
|
async def root_main_get(request: Request, response : Response, page_type=None):
|
|
jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
|
|
node_id = path = syndicate = page_type
|
|
param_filetype = request.query_params.get('filetype', '') or None
|
|
param_mode = request.query_params.get('mode', '') or None
|
|
param_page = request.query_params.get('page', '') or None
|
|
param_protocol = request.query_params.get('protocol', '') or None
|
|
param_query = request.query_params.get('q', '') or None
|
|
if param_query: param_query = param_query.strip()
|
|
param_tags = request.query_params.get('tags', '') or None
|
|
param_tld = request.query_params.get('tld', '') or None
|
|
if param_page and param_mode != 'feed':
|
|
try:
|
|
page = int(param_page)
|
|
page_next = page + 1
|
|
page_prev = page - 1
|
|
except:
|
|
page = 1
|
|
page_next = 2
|
|
else:
|
|
page = 1
|
|
page_next = 2
|
|
page_prev = page - 1
|
|
index_first = (page - 1)*10
|
|
if param_tags or param_tld or param_filetype or param_protocol:
|
|
entries_count = DatabaseSQLite.get_entries_count_by_tag(db_file, param_tags)
|
|
match page_type:
|
|
case 'new':
|
|
description = 'New bookmarks tagged with "{}"'.format(param_tags)
|
|
entries_database = DatabaseSQLite.get_entries_new_by_tag(db_file, param_tags, index_first)
|
|
tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_new_by_tag(db_file, param_tags, index_first)
|
|
case 'popular':
|
|
description = 'Popular bookmarks tagged with "{}"'.format(param_tags) # 'Most popular'
|
|
entries_database = DatabaseSQLite.get_entries_popular_by_tag(db_file, param_tags, index_first)
|
|
tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_popular_by_tag(db_file, param_tags, index_first)
|
|
case 'recent':
|
|
description = 'Recent bookmarks tagged with "{}"'.format(param_tags)
|
|
entries_database = DatabaseSQLite.get_entries_recent_by_tag(db_file, param_tags, index_first)
|
|
tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_recent_by_tag(db_file, param_tags, index_first)
|
|
# TODO case 'query':
|
|
else:
|
|
match page_type:
|
|
case 'new':
|
|
description = 'New bookmarks'
|
|
entries_database = DatabaseSQLite.get_entries_new(db_file, index_first)
|
|
tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_new(db_file, index_first)
|
|
entries_count = DatabaseSQLite.get_entries_count(db_file)
|
|
case 'popular':
|
|
description = 'Popular bookmarks' # 'Most popular'
|
|
entries_database = DatabaseSQLite.get_entries_popular(db_file, index_first)
|
|
tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_popular(db_file, index_first)
|
|
entries_count = DatabaseSQLite.get_entries_count(db_file)
|
|
case 'query':
|
|
node_id = syndicate = 'new'
|
|
description = 'Posted bookmarks with "{}"'.format(param_query)
|
|
entries_database = DatabaseSQLite.get_entries_by_query(db_file, param_query, index_first)
|
|
tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_by_query_recent(db_file, param_query, index_first)
|
|
entries_count = DatabaseSQLite.get_entries_count_by_query(db_file, param_query)
|
|
case 'recent':
|
|
description = 'Recent bookmarks'
|
|
entries_database = DatabaseSQLite.get_entries_recent(db_file, index_first)
|
|
tags_of_entries = DatabaseSQLite.get_30_tags_by_entries_recent(db_file, index_first)
|
|
entries_count = DatabaseSQLite.get_entries_count(db_file)
|
|
if not entries_database:
|
|
#message = 'Blasta system message » Error: No entries were found.'
|
|
#description = 'No results'
|
|
#path = 'error'
|
|
#return result_post(request, jabber_id, description, message, path)
|
|
raise HTTPException(status_code=404, detail='No entries were found')
|
|
tags_dict = {}
|
|
#for tag, instances in DatabaseSQLite.get_tags_30(db_file):
|
|
for tag, instances in tags_of_entries:
|
|
tags_dict[tag] = instances
|
|
entries = []
|
|
for entry in entries_database:
|
|
tags_sorted = []
|
|
for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry[0]):
|
|
tags_sorted.append(tag[0])
|
|
jid = DatabaseSQLite.get_jid_by_jid_id(db_file, entry[5])
|
|
url_hash = UtilitiesCryptography.hash_url_to_md5(entry[2])
|
|
instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
|
|
entries.append(
|
|
{'title' : entry[3],
|
|
'link' : entry[2],
|
|
'summary' : entry[4],
|
|
'published' : entry[6],
|
|
'updated' : entry[7],
|
|
'tags' : tags_sorted,
|
|
'url_hash' : url_hash, #entry[1]
|
|
'jid' : jid,
|
|
'name' : jid, # jid.split('@')[0] if '@' in jid else jid,
|
|
'instances' : instances})
|
|
for entry in entries:
|
|
try:
|
|
date_iso = entry['published']
|
|
date_wrt = UtilitiesDate.convert_iso8601_to_readable(date_iso)
|
|
entry['published_mod'] = date_wrt
|
|
except:
|
|
print('ERROR: Probably due to an attempt to convert a non ISO 8601.')
|
|
print(entry['published'])
|
|
print(entry['published_mod'])
|
|
print(entry)
|
|
index_last = index_first+len(entries_database)
|
|
if entries_count <= index_last:
|
|
# NOTE Did you forget to modify index_last?
|
|
# NOTE No. It appears that it probably not needed index_last = entries_count
|
|
page_next = None
|
|
#if page_type != 'new' or page_prev or param_tags or param_tld or param_filetype or param_protocol:
|
|
if request.url.path != '/' or request.url.query:
|
|
message = 'Listing bookmarks {} - {} out of {}.'.format(index_first+1, index_last, entries_count)
|
|
message_link = None
|
|
else:
|
|
message = ('Welcome to Blasta, an XMPP PubSub oriented social '
|
|
'bookmarks manager for organizing online content.')
|
|
message_link = {'href' : '/help/about', 'text' : 'Learn more'}
|
|
template_dict = {
|
|
'request' : request,
|
|
'description' : description,
|
|
'entries' : entries,
|
|
'jabber_id' : jabber_id,
|
|
'journal' : journal,
|
|
'message' : message,
|
|
'message_link' : message_link,
|
|
'node_id' : node_id,
|
|
'page_next' : page_next,
|
|
'page_prev' : page_prev,
|
|
'page_type' : page_type,
|
|
'pager' : True,
|
|
'param_query' : param_query,
|
|
'param_tags' : param_tags,
|
|
'path' : path,
|
|
'pubsub_jid' : jabber_id_pubsub,
|
|
'syndicate' : syndicate,
|
|
'tags' : tags_dict}
|
|
if param_mode == 'feed':
|
|
# NOTE Consider scheme "feed" in order to prompt news
|
|
# reader 'feed://' + request.url.netloc + request.url.path
|
|
template_file = 'browse.atom'
|
|
response = templates.TemplateResponse(template_file, template_dict)
|
|
response.headers["Content-Type"] = "application/xml"
|
|
else:
|
|
template_file = 'browse.xhtml'
|
|
response = templates.TemplateResponse(template_file, template_dict)
|
|
response.headers['Content-Type'] = 'application/xhtml+xml'
|
|
return response
|
|
|
|
"""
|
|
# TODO Return to code /tag and / (root) once SQLite database is ready.
|
|
@self.app.get('/tag/{tag}')
|
|
async def tag_tag_get(request: Request, tag):
|
|
jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
|
|
node_id = 'tag:{}'.format(tag)
|
|
syndicate = '?tag={}'.format(tag)
|
|
path = 'tag'
|
|
# NOTE Perhaps it would be beneficial to retrieve "published" and
|
|
# tags ("category") of viewer to override the tags of Blasta
|
|
# TODO If URL exist in visitor's bookmarks, display its properties
|
|
# (summary, tags title etc.) before data of others.
|
|
# if UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request):
|
|
page = request.query_params.get('page', '') or None
|
|
if page:
|
|
try:
|
|
page = int(page)
|
|
page_next = page + 1
|
|
page_prev = page - 1
|
|
except:
|
|
page = 1
|
|
page_next = 2
|
|
else:
|
|
page = 1
|
|
page_next = 2
|
|
page_prev = page - 1
|
|
index_first = (page - 1)*10
|
|
index_last = index_first+10
|
|
tags_dict = {}
|
|
for entry in entries_database:
|
|
for entry_tag in entry['tags']:
|
|
if entry_tag in tags_dict:
|
|
tags_dict[entry_tag] = tags_dict[entry_tag]+1
|
|
else:
|
|
tags_dict[entry_tag] = 1
|
|
tags_dict = dict(sorted(tags_dict.items(), key=lambda item: (-item[1], item[0])))
|
|
tags_dict = dict(list(tags_dict.items())[:30])
|
|
#tags_dict = dict(sorted(tags_dict.items(), key=lambda item: (-item[1], item[0]))[:30])
|
|
print(tags_dict)
|
|
entries = []
|
|
for entry in entries_database:
|
|
if tag in entry['tags']:
|
|
entries.append(entry)
|
|
for entry in entries:
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
"""
|
|
|
|
@self.app.post('/', response_class=HTMLResponse)
|
|
async def root_post(request: Request,
|
|
response: Response,
|
|
jabber_id: str = Form(...),
|
|
password: str = Form(...)):
|
|
if not UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request):
|
|
# Store a variable in the request's state
|
|
request.app.state.jabber_id = jabber_id
|
|
session_key = str(random.random())
|
|
request.app.state.session_key = session_key
|
|
accounts[jabber_id] = XmppInstance(jabber_id + '/blasta', password)
|
|
# accounts[jabber_id].authenticated
|
|
# dir(accounts[jabber_id])
|
|
# accounts[jabber_id].failed_auth
|
|
# accounts[jabber_id].event_when_connected
|
|
sessions[jabber_id] = session_key
|
|
# Check if the user and password are present and valid
|
|
# If not valid, return "Could not connect to JID"
|
|
|
|
# FIXME Instead of an arbitrary number (i.e. 5 seconds), write a
|
|
# while loop with a timeout of 10 seconds.
|
|
|
|
# Check whether an account is connected.
|
|
# Wait for 5 seconds to connect.
|
|
await asyncio.sleep(5)
|
|
#if jabber_id in accounts and accounts[jabber_id].connection_accepted:
|
|
|
|
if jabber_id in accounts:
|
|
xmpp_instance = accounts[jabber_id]
|
|
#await xmpp_instance.plugin['xep_0060'].delete_node(jabber_id, node_id_public)
|
|
|
|
for node_properties in nodes:
|
|
properties = nodes[node_properties]
|
|
if not await XmppPubsub.is_node_exist(xmpp_instance, properties['name']):
|
|
iq = XmppPubsub.create_node_atom(
|
|
xmpp_instance, jabber_id, properties['name'],
|
|
properties['title'], properties['subtitle'],
|
|
properties['access_model'])
|
|
await iq.send(timeout=15)
|
|
|
|
#await XmppPubsub.set_node_private(xmpp_instance, node_id_private)
|
|
#await XmppPubsub.set_node_private(xmpp_instance, node_id_read)
|
|
#configuration_form = await xmpp_instance['xep_0060'].get_node_config(jabber_id, properties['name'])
|
|
#print(configuration_form)
|
|
node_id = nodes['public']['name']
|
|
result, reason = await UtilitiesData.update_cache_and_database(
|
|
db_file, directory_cache, xmpp_instance, jabber_id, 'public', node_id)
|
|
if result == 'error':
|
|
message = 'XMPP system message » {}.'.format(reason)
|
|
description = 'IQ Error'
|
|
path = 'error'
|
|
return result_post(request, jabber_id, description, message, path)
|
|
else:
|
|
iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, 'xmpp:blasta:configuration:0', 'routine')
|
|
routine = None
|
|
if isinstance(iq, Iq):
|
|
payload = iq['pubsub']['items']['item']['payload']
|
|
if payload:
|
|
xmlns = '{jabber:x:data}'
|
|
element_value = payload.find('.//' + xmlns + 'field[@var="routine"]/' + xmlns + 'value')
|
|
if isinstance(element_value, ET.Element): routine = element_value.text
|
|
match routine:
|
|
case 'private':
|
|
response = RedirectResponse(url='/private')
|
|
case 'read':
|
|
response = RedirectResponse(url='/read')
|
|
case _:
|
|
response = RedirectResponse(url='/jid/')
|
|
|
|
else:
|
|
#del accounts[jabber_id]
|
|
#del sessions[jabber_id]
|
|
message = 'Blasta system message » Authorization has failed.'
|
|
description = 'Connection has failed'
|
|
path = 'error'
|
|
return result_post(request, jabber_id, description, message, path)
|
|
return response
|
|
|
|
@self.app.post('/message')
async def message_post(request: Request,
                       jid: str = Form(...),
                       body: str = Form(...)):
    """
    Send a message to a Jabber ID on behalf of the connected account.

    :param request: The HTTP request.
    :param jid: Jabber ID of the recipient (form field).
    :param body: Text of the message (form field).
    :return: A result page which confirms the delivery, or which reports
             that there is no active session.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        message = 'Blasta system message » Error: No active session.'
        description = 'You are not connected'
        path = 'error'
    else:
        xmpp_instance = accounts[jabber_id]
        # NOTE(review): XmppMessage does not appear among the imports at
        # the top of this file - confirm that it is in scope.
        XmppMessage.send(xmpp_instance, jid, body)
        recipient = jid.split('@')[0]
        message = 'Your message has been sent to {}.'.format(recipient)
        description = 'Message has been sent'
        path = 'message'
    return result_post(request, jabber_id, description, message, path)
|
|
|
|
@self.app.get('/now')
def now_get(request: Request):
    """Render the template now.xhtml for the current visitor."""
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    context = {
        'request' : request,
        'jabber_id' : jabber_id,
        'journal' : journal}
    page = templates.TemplateResponse('now.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/private', response_class=HTMLResponse)
@self.app.post('/private')
async def private_get(request: Request, response : Response):
    """
    Display the directory "private" of the connected account.

    The cache and the database are updated from the node first; the
    listing itself is rendered by jid_main_get.

    :param request: The HTTP request.
    :param response: Not used; a new response is returned.
    :return: The listing, or a result page upon error or when there is no
             active session.
    """
    node_type = path = 'private'
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        description = 'An XMPP account is required'
        message = 'Blasta system message » Please connect with your XMPP account to view this directory.'
        return result_post(request, jabber_id, description, message, 'error')

    xmpp_instance = accounts[jabber_id]
    node_id = nodes[node_type]['name']
    result, reason = await UtilitiesData.update_cache_and_database(
        db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id)
    if result == 'error':
        message = 'Blasta system message » {}.'.format(reason)
        description = 'Directory "private" appears to be empty'
        return result_post(request, jabber_id, description, message, 'error')

    return await jid_main_get(request, node_type, path)
|
|
|
|
@self.app.get('/profile')
async def profile_get(request: Request):
    """
    Render the profile page of the connected account.

    The configuration node is created when it does not exist, then the
    access model of each node and the settings "enrollment" and "routine"
    are collected for the template profile.xhtml.

    :param request: The HTTP request.
    :return: The profile page, or a result page when there is no active
             session.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        message = 'Blasta system message » Error: No active session.'
        description = 'You are not connected'
        path = 'error'
        return result_post(request, jabber_id, description, message, path)

    xmpp_instance = accounts[jabber_id]
    if not await XmppPubsub.is_node_exist(xmpp_instance, 'xmpp:blasta:configuration:0'):
        iq = XmppPubsub.create_node_config(xmpp_instance, jabber_id)
        await iq.send(timeout=15)

    access_models = {}
    for node_type in nodes:
        node_id = nodes[node_type]['name']
        iq = await XmppPubsub.get_node_configuration(xmpp_instance, jabber_id, node_id)
        try:
            access_model = iq['pubsub_owner']['configure']['form']['values']['pubsub#access_model']
        except (KeyError, TypeError):
            # NOTE(review): Presumably an error is returned as a value
            # which is not a stanza, as checked for other queries in this
            # file - confirm. The access model is then left unknown.
            access_model = None
        access_models[node_type] = access_model

    # Item identifier -> field names to look for inside the item.
    # profile_post stores the item "enrollment" with 'enroll' as the name
    # of the setting, so both names are accepted for it.
    # NOTE(review): This assumes that DataForm.create_setting_entry uses
    # that name as the "var" of the field - confirm.
    setting_fields = {
        'enrollment' : ('enrollment', 'enroll'),
        'routine' : ('routine',)}
    xmlns = '{jabber:x:data}'
    settings = {}
    for setting, field_names in setting_fields.items():
        iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, 'xmpp:blasta:configuration:0', setting)
        if not isinstance(iq, Iq): continue
        payload = iq['pubsub']['items']['item']['payload']
        if not payload: continue
        for field_name in field_names:
            element_value = payload.find('.//' + xmlns + 'field[@var="' + field_name + '"]/' + xmlns + 'value')
            if isinstance(element_value, ET.Element):
                settings[setting] = element_value.text
                break

    template_file = 'profile.xhtml'
    template_dict = {
        'access_models' : access_models,
        'enroll' : settings.get('enrollment'),
        'request' : request,
        'routine' : settings.get('routine'),
        'jabber_id' : jabber_id,
        'journal' : journal}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.post('/profile')
async def profile_post(request: Request,
                       routine: str = Form(None),
                       enroll: str = Form(None)):
    """
    Save settings of the connected account to its configuration node.

    :param request: The HTTP request.
    :param routine: The routine directory to set (form field, optional).
    :param enroll: '1' to share the database with the Blasta system, any
                   other value to exclude it (form field, optional).
    :return: A result page which describes the saved setting, which
             reports that nothing was submitted, or which reports that
             there is no active session.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        message = 'Blasta system message » Error: No active session.'
        description = 'You are not connected'
        path = 'error'
        return result_post(request, jabber_id, description, message, path)

    xmpp_instance = accounts[jabber_id]

    # Both fields are optional, so a message must exist even when neither
    # of them was submitted.
    message = None

    if routine:
        message = 'The routine directory has been set to {}'.format(routine)
        payload = DataForm.create_setting_entry(xmpp_instance, 'routine', routine)
        iq = await XmppPubsub.publish_node_item( # NOTE Consider "configurations" as item ID (see Movim)
            xmpp_instance, jabber_id, 'xmpp:blasta:configuration:0', 'routine', payload)

    if enroll:
        if enroll == '1': message = 'Your database is shared with the Blasta system'
        else: message = 'Your database is excluded from the Blasta system'
        payload = DataForm.create_setting_entry(xmpp_instance, 'enroll', enroll)
        iq = await XmppPubsub.publish_node_item(
            xmpp_instance, jabber_id, 'xmpp:blasta:configuration:0', 'enrollment', payload)

    if message is None:
        message = 'Blasta system message » No setting was submitted.'
        description = 'No setting has been changed'
    else:
        description = 'Setting has been saved'

    template_file = 'result.xhtml'
    template_dict = {
        'description' : description,
        'enroll' : enroll,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : message,
        'request' : request,
        'routine' : routine}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/profile/export/{node_type}/{filetype}')
|
|
async def profile_export_get(request: Request, node_type, filetype):
|
|
jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
|
|
if jabber_id:
|
|
xmpp_instance = accounts[jabber_id]
|
|
node_id = nodes[node_type]['name']
|
|
iq = await XmppPubsub.get_node_items(xmpp_instance, jabber_id, node_id)
|
|
if isinstance(iq, Iq):
|
|
entries = UtilitiesData.extract_iq_items(iq, jabber_id)
|
|
# TODO Append a bookmark or bookmarks of Blasta
|
|
if entries:
|
|
filename = os.path.join(directory_cache, 'export', jabber_id + '_' + node_type + '.' + filetype)
|
|
#filename = 'export/' + jabber_id + '_' + node_type + '.' + filetype
|
|
#filename = 'export/{}_{}.{}'.format(jabber_id, node_type, filetype)
|
|
#filename = 'export_' + node_type + '/' + jabber_id + '_' + '.' + filetype
|
|
#filename = 'export_{}/{}.{}'.format(node_type, jabber_id, filetype)
|
|
match filetype:
|
|
case 'json':
|
|
UtilitiesData.save_to_json(filename, entries)
|
|
case 'toml':
|
|
# NOTE Should the dict be named with 'entries' or 'private'/'public'/'read'?
|
|
data = {'entries' : entries}
|
|
UtilitiesData.save_to_toml(filename, data)
|
|
response = FileResponse(filename)
|
|
else:
|
|
message = 'Blasta system message » Error: No active session.'
|
|
description = 'You are not connected'
|
|
path = 'error'
|
|
return result_post(request, jabber_id, description, message, path)
|
|
return response
|
|
|
|
@self.app.post('/profile/import')
async def profile_import_post(request: Request,
                              file: UploadFile | None = None,
                              merge: str = Form(None),
                              node: str = Form(...),
                              override: str = Form(None)):
    """
    Import bookmarks from an uploaded TOML file into a node of the account.

    Every table of the file is expected to hold a list of entries, each
    with the keys "title", "link", "summary", "published" and "tags".
    Entries which do not match this form are skipped.

    :param request: The HTTP request.
    :param file: The uploaded file.
    :param merge: Not used yet (form field).
    :param node: A key of the nodes, to import into (form field).
    :param override: Not used yet (form field).
    :return: A result page with the number of imported items, or with a
             description of the error.
    """
    # tomllib is not among the imports at the top of this file.
    import tomllib

    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        message = 'Blasta system message » Error: No active session.'
        description = 'You are not connected'
        path = 'error'
        return result_post(request, jabber_id, description, message, path)

    if not file:
        message = 'Blasta system message » Error: No upload file sent.'
        description = 'Import error'
        path = 'error'
        return result_post(request, jabber_id, description, message, path)

    if node not in nodes:
        message = 'Blasta system message » Error: Unknown directory "{}".'.format(node)
        description = 'Import error'
        path = 'error'
        return result_post(request, jabber_id, description, message, path)

    # TODO Add match/case for filetype.
    # The file is parsed before anything is created or published, so that
    # an invalid file has no effect on the account.
    try:
        entries = tomllib.loads(file.file.read().decode())
    except (UnicodeDecodeError, tomllib.TOMLDecodeError) as error:
        message = 'Blasta system message » Error: {}.'.format(error)
        description = 'Import error'
        path = 'error'
        return result_post(request, jabber_id, description, message, path)

    xmpp_instance = accounts[jabber_id]
    node_id = nodes[node]['name']
    if not await XmppPubsub.is_node_exist(xmpp_instance, node_id):
        iq = XmppPubsub.create_node_atom(
            xmpp_instance, jabber_id, node_id, nodes[node]['title'],
            nodes[node]['subtitle'], nodes[node]['access_model'])
        await iq.send(timeout=15)

    name = jabber_id.split('@')[0]
    required_keys = ('title', 'link', 'summary', 'published', 'tags')
    counter = 0

    for entry_type in entries:
        entry_list = entries[entry_type]
        # A top-level value which is not an array holds no entries.
        if not isinstance(entry_list, list): continue
        for entry in entry_list:
            if not isinstance(entry, dict): continue
            if any(key not in entry for key in required_keys): continue
            url_hash = item_id = UtilitiesCryptography.hash_url_to_md5(entry['link'])
            instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
            entry_new = {
                'title' : entry['title'],
                'link' : entry['link'],
                'summary' : entry['summary'],
                'published' : entry['published'],
                # The date of publication is used as the date of update.
                'updated' : entry['published'],
                'tags' : entry['tags'],
                'url_hash' : url_hash,
                'jid' : jabber_id,
                'name' : name,
                'instances' : instances}
            payload = UtilitiesSyndication.create_rfc4287_entry(entry_new)
            iq = await XmppPubsub.publish_node_item(
                xmpp_instance, jabber_id, node_id, item_id, payload)
            counter += 1

    message = 'Blasta system message » Imported {} items.'.format(counter)
    description = 'Import successful'
    path = 'profile'
    return result_post(request, jabber_id, description, message, path)
|
|
|
|
@self.app.get('/save')
async def save_get(request: Request):
    """
    Render the form for adding a new bookmark.

    When the URL of query parameter "url" is already stored in one of the
    nodes of the connected account, the client is redirected to the edit
    page of that bookmark instead.

    :param request: The HTTP request. The query parameters "url", "title",
                    "tags" and "summary" are used to fill in the form.
    :return: The form, a redirect response, or a result page when there is
             no active session.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        message = 'Blasta system message » Error: No active session.'
        description = 'You are not connected'
        path = 'error'
        return result_post(request, jabber_id, description, message, path)

    xmpp_instance = accounts[jabber_id]
    param_url = request.query_params.get('url', '')
    url_hash = UtilitiesCryptography.hash_url_to_md5(param_url)
    for node_type in nodes:
        node_id = nodes[node_type]['name']
        iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash)
        if (isinstance(iq, Iq) and
            url_hash == iq['pubsub']['items']['item']['id']):
            return RedirectResponse(url='/url/' + url_hash + '/edit')

    # The routine directory remains None unless a value is found, so that
    # an item without a usable value does not leave it undefined.
    routine = None
    iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, 'xmpp:blasta:configuration:0', 'routine')
    if isinstance(iq, Iq):
        payload = iq['pubsub']['items']['item']['payload']
        if payload:
            xmlns = '{jabber:x:data}'
            element_value = payload.find('.//' + xmlns + 'field[@var="routine"]/' + xmlns + 'value')
            if isinstance(element_value, ET.Element): routine = element_value.text

    description = 'Add a new bookmark' # 'Enter properties for a bookmark'
    param_title = request.query_params.get('title', '')
    param_tags = request.query_params.get('tags', '')
    param_summary = request.query_params.get('summary', '')
    path = 'save'

    # The hint about bookmarklets is displayed only for a bare request.
    if request.query_params:
        message = message_link = None
    else:
        message = 'For greater ease, you might want to try our'
        message_link = {'href' : '/help/utilities#buttons', 'text' : 'bookmarklets'}

    template_file = 'edit.xhtml'
    template_dict = {
        'request' : request,
        'description' : description,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : message,
        'message_link' : message_link,
        'path' : path,
        'routine' : routine,
        'summary' : param_summary,
        'tags' : param_tags,
        'title' : param_title,
        'url' : param_url}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.post('/save')
async def save_post(request: Request,
                    node: str = Form(...),
                    summary: str = Form(''),
                    tags: str = Form(''),
                    title: str = Form(...),
                    url: str = Form(...)):
    """
    Render the confirmation form for a bookmark which is about to be saved.

    When the URL is already stored in one of the nodes of the connected
    account, the client is redirected to the edit page of that bookmark.

    :param request: The HTTP request.
    :param node: The node selected for the bookmark (form field).
    :param summary: Summary of the bookmark (form field).
    :param tags: Tags of the bookmark (form field).
    :param title: Title of the bookmark (form field).
    :param url: URL of the bookmark (form field).
    :return: The confirmation form, a redirect response, or a result page
             when there is no active session.
    """
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        message = 'Blasta system message » Error: No active session.'
        description = 'You are not connected'
        return result_post(request, jabber_id, description, message, 'error')

    xmpp_instance = accounts[jabber_id]
    url_hash = UtilitiesCryptography.hash_url_to_md5(url)

    # Look for the bookmark in every node of the account.
    for directory in nodes:
        directory_node_id = nodes[directory]['name']
        stored = await XmppPubsub.get_node_item(
            xmpp_instance, jabber_id, directory_node_id, url_hash)
        if not isinstance(stored, Iq):
            continue
        if url_hash == stored['pubsub']['items']['item']['id']:
            return RedirectResponse(url='/url/' + url_hash + '/edit')

    context = {
        'request' : request,
        'confirm' : True,
        'description' : 'Confirm properties of a bookmark',
        'jabber_id' : jabber_id,
        'journal' : journal,
        'node' : node,
        'path' : 'save',
        'published' : datetime.now().isoformat(),
        'summary' : summary,
        'tags' : tags,
        'title' : title,
        'url' : url,
        'url_hash' : url_hash}
    page = templates.TemplateResponse('edit.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/read', response_class=HTMLResponse)
@self.app.post('/read')
async def read_get(request: Request, response : Response):
    """
    Display the directory "read" of the connected account.

    The cache and the database are updated from the node first; the
    listing itself is rendered by jid_main_get.

    :param request: The HTTP request.
    :param response: Not used; a new response is returned.
    :return: The listing, or a result page upon error or when there is no
             active session.
    """
    node_type = path = 'read'
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        description = 'An XMPP account is required'
        message = 'Blasta system message » Please connect with your XMPP account to view this directory.'
        return result_post(request, jabber_id, description, message, 'error')

    xmpp_instance = accounts[jabber_id]
    node_id = nodes[node_type]['name']
    result, reason = await UtilitiesData.update_cache_and_database(
        db_file, directory_cache, xmpp_instance, jabber_id, node_type, node_id)
    if result == 'error':
        message = 'Blasta system message » {}.'.format(reason)
        description = 'Directory "read" appears to be empty'
        return result_post(request, jabber_id, description, message, 'error')

    return await jid_main_get(request, node_type, path)
|
|
|
|
def result_post(request: Request, jabber_id: str, description: str, message: str, path: str, http_code=None):
    """
    Render the result page (template result.xhtml).

    :param request: The HTTP request.
    :param jabber_id: Jabber ID of the connected account, if any.
    :param description: Short description of the result.
    :param message: Message to display.
    :param path: Value of "path" for the template (e.g. 'error').
    :param http_code: Optional HTTP status code for the response. The
                      default status of the template response is kept when
                      this is None.
    :return: The rendered response.
    """
    template_file = 'result.xhtml'
    template_dict = {
        'description' : description,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : message,
        'path' : path,
        'request' : request}
    response = templates.TemplateResponse(template_file, template_dict)
    # Apply the status code, which was previously accepted and ignored.
    if http_code is not None: response.status_code = http_code
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/register')
def register_get(request: Request):
    # Render the registration page (template "register.xhtml").
    # The session's Jabber ID, if any, is handed to the template.
    context = {
        'request' : request,
        'jabber_id' : UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request),
        'journal' : journal}
    page = templates.TemplateResponse('register.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/rss')
def rss(request: Request):
    # Send visitors of /rss to the syndication help page.
    redirection = RedirectResponse(url='/help/syndication')
    return redirection
|
|
|
|
@self.app.get('/search')
async def search_get(request: Request):
    # The bare /search path is an alias of the general search form.
    return RedirectResponse(url='/search/all')
|
|
|
|
@self.app.get('/search/all')
async def search_all_get(request: Request):
    # Render the search form for public bookmarks (template "search.xhtml").
    # The form submits its single field, named "q", to /query.
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    field = 'q'  # Shared by the input id, the input name and the label.
    context = {
        'request' : request,
        'description' : 'Search for public bookmarks',
        'form_action' : '/query',
        'input_id' : field,
        'input_name' : field,
        'input_placeholder' : 'Enter a search query.',
        'input_type' : 'search',
        'label' : 'Search',
        'label_for' : field,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : 'Search for bookmarks in the Blasta system.',
        'path' : 'all'}
    page = templates.TemplateResponse('search.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/search/jid/{jid}')
async def search_jid_get(request: Request, jid):
    # Render the search form for the bookmarks of a single Jabber ID.
    # Visitors without a session are redirected to the general search form.
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    if not jabber_id:
        return RedirectResponse(url='/search/all')

    # The wording differs when the visitor searches their own directory.
    is_own_directory = jabber_id == jid
    description = ('Search your own bookmarks' if is_own_directory
                   else 'Search bookmarks of {}'.format(jid))
    message = ('Search for bookmarks from your own directory.' if is_own_directory
               else 'Search for bookmarks of a given Jabber ID.')

    field = 'q'  # Shared by the input id, the input name and the label.
    context = {
        'request' : request,
        'description' : description,
        'form_action' : '/jid/' + jid,
        'input_id' : field,
        'input_name' : field,
        'input_placeholder' : 'Enter a search query.',
        'input_type' : 'search',
        'jabber_id' : jabber_id,
        'jid' : jid,
        'label' : 'Search',
        'label_for' : field,
        'journal' : journal,
        'message' : message,
        'path' : 'jid'}
    page = templates.TemplateResponse('search.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/search/url')
async def search_url_get(request: Request):
    # Render the form that looks up a bookmark by its URL
    # (template "search.xhtml").
    #
    # No form action is handed to the template; the original note states
    # that it is not relevant due to function middleware (maybe / or /url).
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    field = 'url'  # Shared by the input id, the input name and the label.
    context = {
        'request' : request,
        'description' : 'Search for a bookmark',
        'input_id' : field,
        'input_name' : field,
        'input_placeholder' : 'Enter a URL.',
        'input_type' : 'url',
        'label' : 'URL',
        'label_for' : field,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : 'Search for a bookmark by a URL.',
        'path' : 'url'}
    page = templates.TemplateResponse('search.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/tag')
def tag_get(request: Request):
    # Render the page of common tags (template "tag.xhtml"), using the
    # tag list returned by DatabaseSQLite.get_tags_500.
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    context = {
        'request' : request,
        'description' : 'Common tags',
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : 'Common 500 tags sorted by name and sized by commonality.',
        'tag_list' : DatabaseSQLite.get_tags_500(db_file)}
    page = templates.TemplateResponse('tag.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/tag/{jid}')
def tag_get_jid(request: Request, jid):
    # Render the page of common tags of a given Jabber ID
    # (template "tag.xhtml").
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    # NOTE Consider retrieval of tags from cache file.
    #      This is relevant to private and read nodes.
    #if jabber_id == jid or node_type in ('private', 'read'):
    context = {
        'request' : request,
        'description' : 'Common tags of {}'.format(jid),
        'jabber_id' : jabber_id,
        'jid' : jid,
        'journal' : journal,
        'message' : 'Common 500 tags sorted by name and sized by commonality.',
        'tag_list' : DatabaseSQLite.get_500_tags_by_jid_sorted_by_name(db_file, jid)}
    page = templates.TemplateResponse('tag.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/url')
async def url_get(request: Request):
    # The bare /url path is an alias of the search-by-URL form.
    return RedirectResponse(url='/search/url')
|
|
|
|
@self.app.get('/url/{url_hash}')
async def url_hash_get(request: Request, url_hash):
    # Display the properties of the bookmark identified by url_hash.
    #
    # A connected visitor is first served from their own PubSub nodes; when
    # none of the nodes holds the item, or when there is no session, the
    # entry is read from the database. When more than one instance of the
    # bookmark is recorded, the "people" template is used and lists who
    # shares the resource and with which tags; otherwise the "browse"
    # template is used.
    #
    # url_hash is expected to be 32 characters long; any other length is
    # answered with the generic error page.

    def _not_found():
        """Return the error page for a bookmark that could not be found."""
        # https://fastapi.tiangolo.com/tutorial/handling-errors/
        #raise HTTPException(status_code=404, detail="Item not found")
        return result_post(
            request,
            jabber_id,
            'The requested bookmark does not exist',
            'Blasta system message » Error: Not found.',
            'error')

    def _lookup_in_database(hash_value):
        """Return (entry, tags, instances) from the database, or None.

        The tuple holds the entry as a template dictionary, a mapping of
        tag to its number of instances, and the number of instances of the
        entry itself. None is returned when no entry matches hash_value.
        """
        row = DatabaseSQLite.get_entry_by_url_hash(db_file, hash_value)
        if not row:
            return None
        entry_id = row[0]
        tags_sorted = [
            tag[0] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, entry_id)]
        tags_count = {
            tag: tag_instances
            for tag, tag_instances
            in DatabaseSQLite.get_tags_and_instances_by_entry_id(db_file, entry_id)}
        jid = DatabaseSQLite.get_jid_by_jid_id(db_file, row[5])
        count = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, hash_value)
        found = {
            'title' : row[3],
            'link' : row[2],
            'summary' : row[4],
            'published' : row[6],
            'published_mod' : UtilitiesDate.convert_iso8601_to_readable(row[6]),
            'updated' : row[7],
            'tags' : tags_sorted,
            'url_hash' : hash_value,
            'jid' : jid,
            'name' : jid, # jid.split('@')[0] if '@' in jid else jid,
            'instances' : count}
        return found, tags_count, count

    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    node_id = 'hash:{}'.format(url_hash)
    param_hash = url_hash
    syndicate = path = 'url'
    entries = []
    exist = False

    if len(url_hash) != 32:
        return result_post(
            request,
            jabber_id,
            'The argument for URL does not appear to be a valid MD5 Checksum',
            'Blasta system message » Error: MD5 message-digest algorithm.',
            'error')

    item_payload = None
    if jabber_id:
        xmpp_instance = accounts[jabber_id]
        for node in nodes:
            # NOTE(review): this overwrites the "hash:" node identifier set
            # above, so a connected visitor's template receives the name of
            # one of their own nodes. Kept as is; confirm whether the
            # template expects the "hash:" identifier.
            node_id = nodes[node]['name']
            iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash)
            if not isinstance(iq, Iq):
                return result_post(
                    request,
                    jabber_id,
                    'The requested bookmark could not be retrieved',
                    'XMPP system message » Error: {}.'.format(iq),
                    'error')
            # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others.
            iq_item = iq['pubsub']['items']['item']
            item_payload = iq_item['payload']
            if item_payload:
                exist = True
                break

    if exist:
        # TODO Perhaps adding a paragraph with "your tags" and "who else has tagged this link"
        #      and keep the (5 item) limit.
        # NOTE Display only 5 items, as all the other tags appear at the list of "Related tags".
        entry = UtilitiesSyndication.extract_items(item_payload, limit=True)
        if not entry:
            # NOTE Is it possible to reach this branch? Consider removal.
            return _not_found()
        #url_hash = iq_item['id']
        url_hash = UtilitiesCryptography.hash_url_to_md5(entry['link'])
        # TODO Add a check: if iq_item['id'] == url_hash:
        instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
        entry['instances'] = instances
        entry['jid'] = jabber_id
        entry['name'] = jabber_id.split('@')[0]
        entry['url_hash'] = url_hash
        entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published'])
        #entry['tags'] = entry['tags'][:5]
        entries.append(entry)
        tags_list = {
            tag: tag_instances
            for tag, tag_instances
            in DatabaseSQLite.get_tags_and_instances_by_url_hash(db_file, url_hash)}
    else:
        # Either there is no session, or none of the visitor's nodes holds
        # the item: fall back to the database.
        found = _lookup_in_database(url_hash)
        if found is None:
            return _not_found()
        entry, tags_list, instances = found
        entries.append(entry)

    message = 'Information for URI {}'.format(entries[0]['link']) # entry[2]
    if not instances: instances = 0
    if instances > 1:
        description = 'Discover new resources and see who shares them'
        template_file = 'people.xhtml'
        # Group the tags by the Jabber ID that has used them.
        people_list = {}
        for jid, tag in DatabaseSQLite.get_jids_and_tags_by_url_hash(db_file, url_hash):
            people_list.setdefault(jid, []).append(tag)
    else:
        people_list = None
        description = 'Resource properties'
        template_file = 'browse.xhtml'

    template_dict = {
        'request' : request,
        'description' : description,
        'entries' : entries,
        'exist' : exist,
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : message,
        'node_id' : node_id,
        'param_hash' : param_hash,
        'path' : path,
        'people' : people_list,
        'pubsub_jid' : jabber_id_pubsub,
        'syndicate' : syndicate,
        'tags' : tags_list}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.post('/url/{url_hash}')
async def url_hash_post(request: Request,
                        url_hash,
                        node: str = Form(...),
                        published: str = Form(...),
                        summary: str = Form(''),
                        tags: str = Form(''),
                        #tags_old: str = Form(...),
                        tags_old: str = Form(''),
                        title: str = Form(...),
                        url: str = Form(...)):
    # Save (create or edit) a bookmark of the connected account.
    #
    # The item is published to the node selected by the form field "node",
    # then removed from the account's other nodes, so that a bookmark
    # resides in a single directory. Afterwards the cache file of the
    # account is updated and, for the public node only, the database.
    #
    # Form fields:
    #   node       Directory to save to; must be a key of "nodes".
    #   published  Publication date, stored and displayed as given.
    #   summary    Optional note.
    #   tags       Optional tags, passed to UtilitiesData.organize_tags.
    #   tags_old   Optional, comma separated tags prior to the edit.
    #   title      Title of the bookmark.
    #   url        Address of the bookmark.
    param_hash = url_hash
    syndicate = path = 'url'
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)

    if not jabber_id:
        return result_post(
            request,
            jabber_id,
            'You are not connected',
            'Blasta system message » Error: No active session.',
            'error')

    # The node is supplied by the client; reject unknown values with an
    # error page instead of failing on the lookup below.
    if node not in nodes:
        return result_post(
            request,
            jabber_id,
            'Could not save bookmark',
            'Blasta system message » Error: Unknown directory.',
            'error')

    name = jabber_id.split('@')[0]
    instances = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
    timestamp = datetime.now().isoformat()
    tags_new = UtilitiesData.organize_tags(tags) if tags else ''
    entry = {'title' : title.strip(),
             'link' : url.strip(),
             'summary' : summary.strip() if summary else '',
             'published' : published,
             'updated' : timestamp,
             'tags' : tags_new,
             'url_hash' : url_hash,
             'jid' : jabber_id,
             'name' : name,
             'instances' : instances or 1}
    message = 'Information for URL {}'.format(url)
    description = 'Bookmark properties'
    xmpp_instance = accounts[jabber_id]
    payload = UtilitiesSyndication.create_rfc4287_entry(entry)

    # TODO Add try/except for IQ
    print('Publish item')
    # TODO Check.
    # NOTE You might not need to append to an open node before appending to a whitelist node.
    node_id = nodes[node]['name']
    iq = await XmppPubsub.publish_node_item(
        xmpp_instance, jabber_id, node_id, url_hash, payload)

    # A string result signals a failure. Stop here, before anything is
    # removed from the other nodes, so that a failed publication can not
    # cost the account its existing copy of the bookmark.
    if isinstance(iq, str):
        return result_post(
            request,
            jabber_id,
            'Could not save bookmark',
            'XMPP system message » {}.'.format(iq),
            'error')
    #await iq.send(timeout=15)

    if node == 'private':
        print('Set item as private (XEP-0223)')
    #iq = await XmppPubsub.publish_node_item_private(
    #    xmpp_instance, node_id_private, url_hash, iq)

    # Remove the item from the other directories, both node and cache.
    node_id_by_type = {
        'public' : node_id_public,
        'private' : node_id_private,
        'read' : node_id_read}
    if node in node_id_by_type:
        for other_type, other_node_id in node_id_by_type.items():
            if other_type == node:
                continue
            await XmppPubsub.del_node_item(xmpp_instance, jabber_id, other_node_id, url_hash)
            UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, other_type, url_hash)

    # Save changes to cache file
    entries_cache_filename = os.path.join(directory_cache, 'items', jabber_id + '.toml')
    entries_cache = UtilitiesData.open_file_toml(entries_cache_filename)
    entries_cache_node = entries_cache[node] if node in entries_cache else []
    # You already have this code in the HTML form, which indicates that this is an edit of an existing item
    # <input type="hidden" id="update" name="update" value="yes" required/>
    is_entry_modified = any(
        entry_cache['url_hash'] == url_hash for entry_cache in entries_cache_node)
    # Replace the cached item in place, or append it when it is new.
    entries_cache_mod = [
        entry if entry_cache['url_hash'] == url_hash else entry_cache
        for entry_cache in entries_cache_node]
    if not is_entry_modified:
        entries_cache_mod.append(entry)
    entries_cache[node] = entries_cache_mod
    UtilitiesData.save_to_toml(entries_cache_filename, entries_cache)

    # Save changes to database
    if node == 'public':
        # Tags prior to the edit, trimmed; empty fragments (for example
        # when the field was not sent) are not tags and are skipped.
        tags_list_old = [
            tag.strip() for tag in tags_old.split(',') if tag.strip()]
        # Tags that were removed by this edit.
        tags_invalid = [tag for tag in tags_list_old if tag not in tags_new]
        # Tags that were added by this edit.
        tags_valid = []
        for tag in tags_new:
            if tag:
                tag_trim = tag.strip()
                if tag_trim not in tags_list_old:
                    tags_valid.append(tag_trim)
        # NOTE Variable tags_valid might not be needed. See function associate_entries_tags_jids.
        entry['tags'] = tags_valid
        await DatabaseSQLite.add_tags(db_file, [entry])
        # Slow (high I/O)
        entry_id = DatabaseSQLite.get_entry_id_by_url_hash(db_file, url_hash)
        if not entry_id:
            await DatabaseSQLite.add_new_entries(db_file, [entry]) # Is this line needed?
        #elif not DatabaseSQLite.is_jid_associated_with_url_hash(db_file, jabber_id, url_hash):
        #    await DatabaseSQLite.associate_entries_tags_jids(db_file, entry)
        await DatabaseSQLite.associate_entries_tags_jids(db_file, entry)
        print('tags_new')
        print(tags_new)
        print('tags_old')
        print(tags_old)
        print('tags_valid')
        print(tags_valid)
        print('tags_invalid')
        print(tags_invalid)
        print(url_hash)
        print(jabber_id)
        await DatabaseSQLite.delete_combination_row_by_url_hash_and_tag_and_jid(db_file, url_hash, tags_invalid, jabber_id)

    # Entry for HTML
    entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(published)
    entry['updated_mod'] = UtilitiesDate.convert_iso8601_to_readable(timestamp)
    # Display the first five of the bookmark's tags. The database step
    # above narrows entry['tags'] to the newly added tags only, hence the
    # list is taken from tags_new.
    entry['tags'] = tags_new[:5]
    entries = [entry]
    template_file = 'browse.xhtml'
    template_dict = {
        'request': request,
        'description': description,
        'entries': entries,
        'exist': True,
        'jabber_id': jabber_id,
        'journal': journal,
        'message': message,
        'node_id': node_id,
        'param_hash': param_hash,
        'path': path,
        'pubsub_jid': jabber_id_pubsub,
        'syndicate': syndicate,
        'tags' : tags_new}
    response = templates.TemplateResponse(template_file, template_dict)
    response.headers['Content-Type'] = 'application/xhtml+xml'
    return response
|
|
|
|
@self.app.get('/url/{url_hash}/confirm')
async def url_hash_confirm_get(request: Request, url_hash):
    # Ask the connected account to confirm the deletion of a bookmark.
    #
    # The item is looked up in the nodes of the account, one after the
    # other. When it is found, it is rendered with the "browse" template
    # and the flag "delete"; when no node holds it, the visitor is
    # redirected to their own directory.
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)

    if len(url_hash) != 32:
        return result_post(
            request,
            jabber_id,
            'The argument for URL does not appear to be a valid MD5 Checksum',
            'Blasta system message » Error: MD5 message-digest algorithm.',
            'error')

    if not jabber_id:
        return result_post(
            request,
            jabber_id,
            'You are not connected',
            'Blasta system message » Error: No active session.',
            'error')

    xmpp_instance = accounts[jabber_id]
    for node in nodes:
        node_id = nodes[node]['name']
        iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash)
        if not isinstance(iq, Iq):
            # Anything but a stanza is reported as an error.
            if iq == 'Node not found':
                failure = 'An error has occurred'
            else:
                failure = 'An unknown error has occurred'
            return result_post(
                request,
                jabber_id,
                failure,
                'XMPP system message » {}.'.format(iq),
                'error')
        # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others.
        item_payload = iq['pubsub']['items']['item']['payload']
        if item_payload:
            break
    else:
        # The loop ended without a "break": no node holds the item.
        return RedirectResponse(url='/jid/' + jabber_id)

    # TODO Add a check: if iq_item['id'] == url_hash:
    entry = UtilitiesSyndication.extract_items(item_payload)
    entry['instances'] = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
    entry['jid'] = jabber_id
    entry['name'] = jabber_id.split('@')[0]
    entry['url_hash'] = url_hash
    entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published'])

    context = {
        'request' : request,
        'delete' : True,
        'description' : 'Confirm deletion of a bookmark',
        'entries' : [entry],
        'jabber_id' : jabber_id,
        'journal' : journal,
        'message' : 'Details for bookmark {}'.format(entry['link']),
        'node_id' : node_id,
        'param_hash' : url_hash,
        'path' : 'url',
        'pubsub_jid' : jabber_id_pubsub,
        'syndicate' : 'url'}
    page = templates.TemplateResponse('browse.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/url/{url_hash}/delete')
async def url_hash_delete_get(request: Request, url_hash):
    # Delete a bookmark of the connected account.
    #
    # The item is looked up in the nodes of the account, one after the
    # other. When it is found, it is removed from that node, from the
    # database and from the cache, and a page with a link to restore the
    # bookmark is rendered. When no node holds the item, the visitor is
    # redirected to their own directory.
    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)

    if len(url_hash) != 32:
        return result_post(
            request,
            jabber_id,
            'The argument for URL does not appear to be a valid MD5 Checksum',
            'Blasta system message » Error: MD5 message-digest algorithm.',
            'error')

    if not jabber_id:
        return result_post(
            request,
            jabber_id,
            'You are not connected',
            'Blasta system message » Error: No active session.',
            'error')

    xmpp_instance = accounts[jabber_id]
    for node_type in nodes:
        node_id = nodes[node_type]['name']
        iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash)
        if not isinstance(iq, Iq):
            # Anything but a stanza is reported as an error.
            if iq == 'Node not found':
                failure = 'An error has occurred'
            else:
                failure = 'An unknown error has occurred'
            return result_post(
                request,
                jabber_id,
                failure,
                'XMPP system message » {}.'.format(iq),
                'error')
        # TODO If URL exist in visitor's bookmarks, display its properties (summary, tags title etc.) before data of others.
        item_payload = iq['pubsub']['items']['item']['payload']
        if item_payload:
            break
    else:
        # The loop ended without a "break": no node holds the item.
        return RedirectResponse(url='/jid/' + jabber_id)

    # TODO Add a check: if iq_item['id'] == url_hash:
    entry = UtilitiesSyndication.extract_items(item_payload)
    entry['instances'] = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
    entry['jid'] = jabber_id
    entry['name'] = jabber_id.split('@')[0]
    entry['url_hash'] = url_hash
    entry['published_mod'] = UtilitiesDate.convert_iso8601_to_readable(entry['published'])

    # Create a link to restore bookmark. It is built before anything is
    # removed, from the properties of the item that is about to be deleted.
    link_save = '/save?url={}&title={}&summary={}&tags={}'.format(
        urllib.parse.quote(entry['link']),
        urllib.parse.quote(entry['title']),
        urllib.parse.quote(entry['summary']),
        urllib.parse.quote(','.join(entry['tags'])))

    # Remove the item from the node in which it was found.
    await XmppPubsub.del_node_item(xmpp_instance, jabber_id, node_id, url_hash)

    # Remove the item association from database
    await DatabaseSQLite.delete_combination_row_by_jid_and_url_hash(db_file, url_hash, jabber_id)
    #await DatabaseSQLite.delete_combination_row_by_url_hash_and_tag_and_jid(db_file, url_hash, entry['tags'], jabber_id)

    # Remove the item from cache
    UtilitiesData.remove_item_from_cache(directory_cache, jabber_id, node_type, url_hash)

    context = {
        'request' : request,
        'description' : 'A bookmark has been deleted',
        'entries' : [entry],
        'jabber_id' : jabber_id,
        'journal' : journal,
        'link_save' : link_save,
        'message' : 'Details for bookmark {}'.format(entry['link']),
        'node_id' : node_id,
        'param_hash' : url_hash,
        'path' : 'url',
        'pubsub_jid' : jabber_id_pubsub,
        'restore' : True,
        'syndicate' : 'url'}
    page = templates.TemplateResponse('browse.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|
|
|
|
@self.app.get('/url/{url_hash}/edit')
@self.app.post('/url/{url_hash}/edit')
async def url_hash_edit_get(request: Request, url_hash):
    # Render the bookmark form (template "edit.xhtml") for url_hash.
    #
    # When one of the nodes of the connected account holds the item, the
    # form is filled from it in order to edit it. Otherwise the form is
    # filled from the database entry of that URL, in order to add it as a
    # new bookmark of the account.

    def _xmpp_failure(reply):
        """Return the error page for a reply that is not a stanza."""
        if reply == 'Node not found':
            failure = 'An error has occurred'
        else:
            failure = 'An unknown error has occurred'
        return result_post(
            request,
            jabber_id,
            failure,
            'XMPP system message » {}.'.format(reply),
            'error')

    jabber_id = UtilitiesHttp.is_jid_matches_to_session(accounts, sessions, request)
    # node_id = 'hash:{}'.format(url_hash)

    if len(url_hash) != 32:
        return result_post(
            request,
            jabber_id,
            'The argument for URL does not appear to be a valid MD5 Checksum',
            'Blasta system message » Error: MD5 message-digest algorithm.',
            'error')

    if not jabber_id:
        return result_post(
            request,
            jabber_id,
            'You are not connected',
            'Blasta system message » Error: No active session.',
            'error')

    xmpp_instance = accounts[jabber_id]
    exist = False
    for node in nodes:
        node_id = nodes[node]['name']
        iq = await XmppPubsub.get_node_item(xmpp_instance, jabber_id, node_id, url_hash)
        if not isinstance(iq, Iq):
            return _xmpp_failure(iq)
        name = jabber_id.split('@')[0]
        # TODO Add a check: if iq_item['id'] == url_hash:
        # Is this valid entry['url_hash'] = iq['id'] or should it be iq_item['id']
        entry = None
        item_payload = iq['pubsub']['items']['item']['payload']
        if item_payload:
            exist = True
            break

    if exist:
        path = 'edit'
        description = 'Edit an existing bookmark'
        entry = UtilitiesSyndication.extract_items(item_payload)
        entry['instances'] = DatabaseSQLite.get_entry_instances_by_url_hash(db_file, url_hash)
        print(jabber_id)
        print(entry['tags'])
    else:
        # TODO Consider redirect to path /save (function save_get)
        # NOTE This seems to be the best to do, albeit, perhaps the pathname should be /save instead of /url/hash/edit.
        path = 'save' # 'add'
        description = 'Add a new bookmark'
        result = DatabaseSQLite.get_entry_by_url_hash(db_file, url_hash)
        if result:
            tags_sorted = [
                tag[0] for tag in DatabaseSQLite.get_tags_by_entry_id(db_file, result[0])]
            entry = {'title' : result[3],
                     'link' : result[2],
                     'summary' : result[4],
                     'published' : result[6],
                     'updated' : result[7],
                     'tags' : tags_sorted}
                     #'instances' : result[8],
                     #'jid' = jabber_id,
                     #'name' : name,
                     #'url_hash' : url_hash

    # Neither the nodes nor the database know of this URL.
    if not entry:
        return _xmpp_failure(iq)

    entry['jid'] = jabber_id
    entry['name'] = name
    entry['url_hash'] = url_hash

    context = {
        'request' : request,
        'description' : description,
        'edit' : True,
        'jabber_id' : jabber_id,
        'journal' : journal,
        # The node of the last lookup: the one that holds the item, or the
        # last one of "nodes" when none does.
        'node' : node,
        'path' : path,
        'published' : entry['published'],
        'summary' : entry['summary'],
        'tags' : ', '.join(entry['tags']),
        'title' : entry['title'],
        'url' : entry['link'],
        'url_hash' : url_hash}
    page = templates.TemplateResponse('edit.xhtml', context)
    page.headers['Content-Type'] = 'application/xhtml+xml'
    return page
|