[WIP] Add an IPC interface of type Unix domain socket (Berkeley sockets). Thank you Laura and TheCoffeMaker.

This commit is contained in:
Schimon Jehudah, Adv. 2024-06-10 18:54:27 +03:00
parent 5a2a2f9e3f
commit 245cd9832a
9 changed files with 1820 additions and 1231 deletions

View file

@ -84,8 +84,6 @@ from slixfeed.version import __version__
# import socks
# import socket
account_xmpp = config.get_values('accounts.toml', 'xmpp')
# account = ConfigAccount() # TODO ~Delete~ Clear as soon as posible after is no longer needed
def main():
@ -176,6 +174,8 @@ def main():
# if not alias:
# alias = (input('Alias: ')) or 'Slixfeed'
account_xmpp = config.get_values('accounts.toml', 'xmpp')
# Try configuration file
if 'client' in account_xmpp:
from slixfeed.xmpp.client import XmppClient

View file

@ -827,23 +827,6 @@ def list_search_results(query, results):
return message
def list_feeds_by_query(query, results):
function_name = sys._getframe().f_code.co_name
logger.debug('{}'.format(function_name))
message = ('Feeds containing "{}":\n\n```'
.format(query))
for result in results:
message += ('\nName : {} [{}]'
'\nURL : {}'
'\n'
.format(str(result[0]), str(result[1]), str(result[2])))
if len(results):
message += "\n```\nTotal of {} feeds".format(len(results))
else:
message = "No feeds were found for: {}".format(query)
return message
async def list_options(self, jid_bare):
"""
Print options.
@ -890,67 +873,6 @@ async def list_options(self, jid_bare):
return message
async def list_statistics(db_file):
"""
Print statistics.
Parameters
----------
db_file : str
Path to database file.
Returns
-------
msg : str
Statistics as message.
"""
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {}'
.format(function_name, db_file))
entries_unread = sqlite.get_number_of_entries_unread(db_file)
entries = sqlite.get_number_of_items(db_file, 'entries_properties')
feeds_active = sqlite.get_number_of_feeds_active(db_file)
feeds_all = sqlite.get_number_of_items(db_file, 'feeds_properties')
# msg = """You have {} unread news items out of {} from {} news sources.
# """.format(unread_entries, entries, feeds)
# try:
# value = cur.execute(sql, par).fetchone()[0]
# except:
# print("Error for key:", key)
# value = "Default"
# values.extend([value])
message = ("Statistics:"
"\n"
"```"
"\n"
"News items : {}/{}\n"
"News sources : {}/{}\n"
"```").format(entries_unread,
entries,
feeds_active,
feeds_all)
return message
# FIXME Replace counter by len
def list_last_entries(results, num):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: num: {}'
.format(function_name, num))
message = "Recent {} titles:\n\n```".format(num)
for result in results:
message += ("\n{}\n{}\n"
.format(str(result[0]), str(result[1])))
if len(results):
message += "```\n"
else:
message = "There are no news at the moment."
return message
def pick_a_feed(lang=None):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: lang: {}'
@ -963,27 +885,6 @@ def pick_a_feed(lang=None):
return url
def list_feeds(results):
function_name = sys._getframe().f_code.co_name
logger.debug('{}'.format(function_name))
message = "\nList of subscriptions:\n\n```\n"
for result in results:
message += ("{} [{}]\n"
"{}\n"
"\n\n"
.format(str(result[1]), str(result[0]), str(result[2])))
if len(results):
message += ('```\nTotal of {} subscriptions.\n'
.format(len(results)))
else:
url = pick_a_feed()
message = ('List of subscriptions is empty. To add a feed, send a URL.'
'\n'
'Featured news: *{}*\n{}'
.format(url['name'], url['link']))
return message
def list_bookmarks(self, conferences):
function_name = sys._getframe().f_code.co_name
logger.debug('{}'.format(function_name))
@ -1014,7 +915,7 @@ def export_to_markdown(jid, filename, results):
# TODO Consider adding element jid as a pointer of import
def export_to_opml(jid, filename, results):
print(jid, filename, results)
# print(jid, filename, results)
function_name = sys._getframe().f_code.co_name
logger.debug('{} jid: {} filename: {}'
.format(function_name, jid, filename))

69
slixfeed/cli.py Normal file
View file

@ -0,0 +1,69 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
# from slixfeed.log import Logger
import socket
import sys
# logger = Logger(__name__)
# IPC parameters
ipc_socket_filename = '/tmp/slixfeed_xmpp.socket'
# Init socket object
if not os.path.exists(ipc_socket_filename):
print(f"File {ipc_socket_filename} doesn't exists")
sys.exit(-1)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(ipc_socket_filename)
# def get_identifier():
# data = 'identifier'
# # Send request
# s.sendall(data.encode('utf-8'))
# # Wait for response
# datastream = s.recv(1024)
# return datastream.decode('utf-8')
def send_command(cmd, jid=None):
data = jid + '~' + cmd if jid else cmd
# Send request
s.sendall(data.encode('utf-8'))
# Wait for response
datastream = s.recv(1024)
return datastream.decode('utf-8')
# identifier = get_identifier()
# print('You are logged in as client #{}.format(identifier)')
print('Type a Jabber ID to commit an action upon.')
jid = input('slixfeed > ')
if not jid: jid = 'admin'
# TODO if not argument, enter loop.
try:
while True:
# print('Enter an action to act upon Jabber ID {}'.format(jid))
# print('Enter command:')
# cmd = input('slixfeed #{} ({}) > '.format(identifier, jid))
cmd = input('slixfeed ({}) > '.format(jid))
if cmd != '':
match cmd:
case 'switch':
print('Type a Jabber ID to commit an action upon.')
jid = input('slixfeed > ')
if not jid: jid = 'admin'
cmd = ''
case 'exit':
send_command(cmd, jid)
break
case _:
result = send_command(cmd, jid)
print(result)
except KeyboardInterrupt as e:
print(str(e))
# logger.error(str(e))
print('Disconnecting from IPC interface.')
s.close()

View file

@ -1887,6 +1887,24 @@ def get_entry_url(db_file, ix):
return url
def get_entry_summary(db_file, ix):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} ix: {}'
.format(function_name, db_file, ix))
with create_connection(db_file) as conn:
cur = conn.cursor()
sql = (
"""
SELECT summary_text
FROM entries_properties
WHERE id = :ix
"""
)
par = (ix,)
summary = cur.execute(sql, par).fetchone()
return summary
def get_feed_url(db_file, feed_id):
function_name = sys._getframe().f_code.co_name
logger.debug('{}: db_file: {} feed_id: {}'
@ -2948,7 +2966,7 @@ def search_feeds(db_file, query):
cur = conn.cursor()
sql = (
"""
SELECT title, id, url
SELECT id, title, url
FROM feeds_properties
WHERE title LIKE ?
OR url LIKE ?
@ -2960,7 +2978,7 @@ def search_feeds(db_file, query):
return result
async def search_entries(db_file, query):
def search_entries(db_file, query):
"""
Query entries.

View file

@ -1,2 +1,2 @@
__version__ = '0.1.72'
__version_info__ = (0, 1, 72)
__version__ = '0.1.73'
__version_info__ = (0, 1, 73)

File diff suppressed because it is too large Load diff

View file

@ -32,6 +32,8 @@ NOTE
import asyncio
from datetime import datetime
import os
from feedparser import parse
import slixmpp
import slixfeed.task as task
@ -44,22 +46,30 @@ from slixfeed.url import join_url, trim_url
# import xml.etree.ElementTree as ET
# from lxml import etree
import slixfeed.action as action
import slixfeed.config as config
from slixfeed.config import Config
import slixfeed.crawl as crawl
import slixfeed.dt as dt
import slixfeed.fetch as fetch
import slixfeed.sqlite as sqlite
import slixfeed.url as uri
from slixfeed.log import Logger
from slixfeed.version import __version__
from slixfeed.xmpp.bookmark import XmppBookmark
from slixfeed.xmpp.chat import Chat
from slixfeed.xmpp.connect import XmppConnect
from slixfeed.xmpp.muc import XmppGroupchat
from slixfeed.xmpp.ipc import XmppIpcServer
from slixfeed.xmpp.iq import XmppIQ
from slixfeed.xmpp.message import XmppMessage
from slixfeed.xmpp.chat import Chat
from slixfeed.xmpp.muc import XmppGroupchat
from slixfeed.xmpp.presence import XmppPresence
from slixfeed.xmpp.privilege import is_moderator, is_operator, is_access
import slixfeed.xmpp.profile as profile
from slixfeed.xmpp.publish import XmppPubsub
from slixfeed.xmpp.roster import XmppRoster
# import slixfeed.xmpp.service as service
from slixfeed.xmpp.presence import XmppPresence
from slixfeed.xmpp.privilege import is_operator, is_access
from slixfeed.xmpp.upload import XmppUpload
from slixfeed.xmpp.utility import get_chat_type
import sys
import time
@ -69,27 +79,6 @@ try:
except:
import tomli as tomllib
import asyncio
from datetime import datetime
import logging
import os
import slixfeed.action as action
import slixfeed.config as config
from slixfeed.config import Config
import slixfeed.crawl as crawl
import slixfeed.dt as dt
import slixfeed.fetch as fetch
import slixfeed.url as uri
import slixfeed.sqlite as sqlite
import slixfeed.task as task
from slixfeed.version import __version__
from slixfeed.xmpp.bookmark import XmppBookmark
from slixfeed.xmpp.message import XmppMessage
from slixfeed.xmpp.presence import XmppPresence
from slixfeed.xmpp.roster import XmppRoster
from slixfeed.xmpp.upload import XmppUpload
from slixfeed.xmpp.privilege import is_moderator, is_operator, is_access
from slixfeed.xmpp.utility import get_chat_type
main_task = []
jid_tasker = {}
@ -364,9 +353,11 @@ class XmppClient(slixmpp.ClientXMPP):
Config.add_settings_jid(self.settings, jid_bare, db_file)
await task.start_tasks_xmpp_pubsub(self, jid_bare)
bookmarks = await XmppBookmark.get_bookmarks(self)
print('iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii')
await action.xmpp_muc_autojoin(self, bookmarks)
print('ooooooooooooooooooooooooooooooooo')
if 'ipc' in self.settings and self.settings['ipc']['bsd']:
# Start Inter-Process Communication
print('POSIX sockets: Initiating IPC server...')
self.ipc = asyncio.create_task(XmppIpcServer.ipc(self))
time_end = time.time()
difference = time_end - time_begin
if difference > 1: logger.warning('{} (time: {})'.format(function_name,
@ -1715,12 +1706,13 @@ class XmppClient(slixmpp.ClientXMPP):
logger.debug('Processed URL (tracker removal): {}'.format(url))
url = (await uri.replace_hostname(url, 'link')) or url
logger.debug('Processed URL (replace hostname): {}'.format(url))
result = await fetch.http(url)
if 'content' in result:
data = result['content']
summary = action.get_document_content_as_text(data)
else:
summary = 'No content to show.'
# result = await fetch.http(url)
# if 'content' in result:
# data = result['content']
# summary = action.get_document_content_as_text(data)
summary = sqlite.get_entry_summary(db_file, ix)
summary = summary[0]
summary = action.remove_html_tags(summary) if summary else 'No content to show.'
form.add_field(ftype="text-multi",
label='Article',
value=summary)
@ -1741,21 +1733,21 @@ class XmppClient(slixmpp.ClientXMPP):
value=feed_url,
var='url_feed')
field_feed['validate']['datatype'] = 'xs:anyURI'
options = form.add_field(desc='Select file type.',
ftype='list-single',
label='Save as',
required=True,
value='pdf',
var='filetype')
options.addOption('ePUB', 'epub')
options.addOption('HTML', 'html')
options.addOption('Markdown', 'md')
options.addOption('PDF', 'pdf')
options.addOption('Plain Text', 'txt')
# options = form.add_field(desc='Select file type.',
# ftype='list-single',
# label='Save as',
# required=True,
# value='pdf',
# var='filetype')
# options.addOption('ePUB', 'epub')
# options.addOption('HTML', 'html')
# options.addOption('Markdown', 'md')
# options.addOption('PDF', 'pdf')
# options.addOption('Plain Text', 'txt')
session['allow_complete'] = False
session['allow_prev'] = True
session['has_next'] = True
session['next'] = self._handle_recent_action
# session['next'] = self._handle_recent_action
session['payload'] = form
session['prev'] = self._handle_recent
return session
@ -1773,11 +1765,12 @@ class XmppClient(slixmpp.ClientXMPP):
url = values['subscription']
jid_bare = session['from'].bare
if is_operator(self, jid_bare) and 'jid' in values:
jid = values['jid']
jid_bare = jid[0] if isinstance(jid, list) else jid
custom_jid = values['jid']
jid_bare = custom_jid[0] if isinstance(custom_jid, list) else jid_bare
# jid_bare = custom_jid[0] if custom_jid else jid_bare
form.add_field(var='jid',
ftype='hidden',
value=jid)
value=jid_bare)
db_file = config.get_pathname_to_database(jid_bare)
if identifier and sqlite.check_identifier_exist(db_file, identifier):
form['title'] = 'Conflict'

1103
slixfeed/xmpp/commands.py Normal file

File diff suppressed because it is too large Load diff

347
slixfeed/xmpp/ipc.py Normal file
View file

@ -0,0 +1,347 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# print("Initiating IPC server...")
# print("Shutting down IPC server...")
"""
TODO Exchange socket fd and send a command to delete
socket (i.e. clients[fd]) from the respective client.
"""
import asyncio
import os
import slixfeed.config as config
from slixfeed.xmpp.commands import XmppCommands
import socket
class XmppIpcServer:
"""
Inter-Process Communication interface of type Berkeley sockets.
"""
async def ipc(self):
ipc_socket_filename = '/tmp/slixfeed_xmpp.socket'
# Setup socket
if os.path.exists(ipc_socket_filename):
os.remove(ipc_socket_filename)
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(ipc_socket_filename)
sock.listen(0)
# conn = None
# clients = []
# clients = {}
# Start listening loop
while True:
# Accept 'request'
conn, addr = await loop.sock_accept(sock)
# # Terminate an old connection in favour of a new connection
# if len(clients):
# for c in clients:
# print(c)
# c.close()
# del c
# else:
# conn, addr = await loop.sock_accept(sock)
# clients.append(conn)
# print(clients)
# Manage connections inside a dict
# fd = conn.fileno()
# clients[fd] = conn
# datastream = await loop.sock_recv(conn, 1024)
# if datastream.decode('utf-8') == 'identifier':
# await loop.sock_sendall(conn, fd.encode('utf-8'))
print('A connection from client has been detected. '
'Slixfeed is waiting for commands.')
# print('There are {} clients connected to the IPC '
# 'interface'.format(len(clients)))
# Process 'request'
while True:
response = None
# print('Awaiting for a command')
# print(clients[fd])
datastream = await loop.sock_recv(conn, 1024)
if not datastream:
break
data = datastream.decode('utf-8')
if '~' in data:
data_list = data.split('~')
jid_bare = data_list[0]
db_file = config.get_pathname_to_database(jid_bare)
command = data_list[1]
else:
command = data
match command:
case _ if command.startswith('add '):
command = command[4:]
url = command.split(' ')[0]
title = ' '.join(command.split(' ')[1:])
response = XmppCommands.feed_add(
url, db_file, jid_bare, title)
case _ if command.startswith('allow +'):
val = command[7:]
if val:
await XmppCommands.set_filter_allow(
db_file, val, True)
response = ('Approved keywords\n'
'```\n{}\n```'
.format(val))
else:
response = ('No action has been taken.'
'\n'
'Missing keywords.')
case _ if command.startswith('allow -'):
val = command[7:]
if val:
await XmppCommands.set_filter_allow(
db_file, val, False)
response = ('Approved keywords\n'
'```\n{}\n```'
.format(val))
else:
response = ('No action has been taken.'
'\n'
'Missing keywords.')
case _ if command.startswith('archive'):
val = command[8:]
response = await XmppCommands.set_archive(
self, jid_bare, val)
case _ if command.startswith('bookmark +'):
muc_jid = command[11:]
response = await XmppCommands.bookmark_add(
self, muc_jid)
case _ if command.startswith('bookmark -'):
muc_jid = command[11:]
response = await XmppCommands.bookmark_del(
self, muc_jid)
case 'bookmarks':
response = await XmppCommands.print_bookmarks(self)
case _ if command.startswith('clear '):
key = command[6:]
response = await XmppCommands.clear_filter(db_file, key)
case _ if command.startswith('default '):
key = command[8:]
response = await XmppCommands.restore_default(
self, jid_bare, key=None)
case 'defaults':
response = await XmppCommands.restore_default(self, jid_bare)
case _ if command.startswith('deny +'):
val = command[6:]
if val:
await XmppCommands.set_filter_allow(
db_file, val, True)
response = ('Rejected keywords\n'
'```\n{}\n```'
.format(val))
else:
response = ('No action has been taken.'
'\n'
'Missing keywords.')
case _ if command.startswith('deny -'):
val = command[6:]
if val:
await XmppCommands.set_filter_allow(
db_file, val, False)
response = ('Rejected keywords\n'
'```\n{}\n```'
.format(val))
else:
response = ('No action has been taken.'
'\n'
'Missing keywords.')
case _ if command.startswith('disable '):
response = await XmppCommands.feed_disable(
self, db_file, jid_bare, command)
case _ if command.startswith('enable '):
response = await XmppCommands.feed_enable(
self, db_file, command)
case _ if command.startswith('export'):
ext = command[7:]
if ext in ('md', 'opml'):
filename, result = XmppCommands.export_feeds(
self, jid_bare, ext)
response = result + ' : ' + filename
else:
response = 'Unsupported filetype. Try: md or opml'
case _ if command.startswith('feeds'):
query = command[6:]
result, number = XmppCommands.list_feeds(db_file, query)
if number:
if query:
first_line = ('Subscriptions containing "{}":\n'
.format(query))
else:
first_line = 'Subscriptions:\n'
response = (first_line + result +
'\nTotal of {} feeds'.format(number))
case _ if (command.startswith('gemini:') or
command.startswith('gopher:')):
response = XmppCommands.fetch_gemini()
case 'help':
response = XmppCommands.print_help()
case 'help all':
response = XmppCommands.print_help_list()
case _ if (command.startswith('http') and
command.endswith('.opml')):
response = await XmppCommands.import_opml(
self, db_file, jid_bare, command)
case 'info':
response = XmppCommands.print_info_list()
case _ if command.startswith('info'):
entry = command[5:].lower()
response = XmppCommands.print_info_specific(entry)
case 'pubsub list':
response = await XmppCommands.pubsub_list(
self, jid_bare)
case _ if command.startswith('pubsub list '):
jid = command[12:]
response = 'List of nodes for {}:\n```\n'.format(jid)
response = await XmppCommands.pubsub_list(self, jid)
response += '```'
case _ if command.startswith('pubsub send '):
info = command[12:]
info = info.split(' ')
jid = info[0]
# num = int(info[1])
if jid:
response = XmppCommands.pubsub_send(self, info, jid)
# TODO Handle node error
# sqlite3.IntegrityError: UNIQUE constraint failed: feeds_pubsub.node
# ERROR:slixmpp.basexmpp:UNIQUE constraint failed: feeds_pubsub.node
case _ if (command.startswith('http') or
command.startswith('feed:/') or
command.startswith('itpc:/') or
command.startswith('rss:/')):
response = await XmppCommands.fetch_http(
self, command, db_file, jid_bare)
case _ if command.startswith('interval'):
val = command[9:]
if val:
response = await XmppCommands.set_interval(
self, db_file, jid_bare, val)
else:
response = 'Current value for interval: '
response += XmppCommands.get_interval(self, jid_bare)
case _ if command.startswith('join'):
muc_jid = command[5:]
response = await XmppCommands.muc_join(self, muc_jid)
case _ if command.startswith('length'):
val = command[7:]
if val:
response = await XmppCommands.set_length(
self, db_file, jid_bare, val)
else:
response = 'Current value for length: '
response += XmppCommands.get_length(self, jid_bare)
case 'media off':
response = await XmppCommands.set_media_off(
self, jid_bare, db_file)
case 'media on':
response = await XmppCommands.set_media_on(
self, jid_bare, db_file)
case 'new':
response = await XmppCommands.set_old_off(
self, jid_bare, db_file)
case _ if command.startswith('next'):
await XmppCommands.send_next_update(self, jid_bare, command)
case _ if command.startswith('node delete '):
info = command[12:]
info = info.split(' ')
response = XmppCommands.node_delete(self, info)
case _ if command.startswith('node purge '):
info = command[11:]
info = info.split(' ')
response = XmppCommands.node_purge(self, info)
case 'old':
response = await XmppCommands.set_old_on(
self, jid_bare, db_file)
case 'options':
response = 'Options:\n```'
response += XmppCommands.print_options(self, jid_bare)
response += '\n```'
case _ if command.startswith('quantum'):
val = command[8:]
if val:
response = await XmppCommands.set_quantum(
self, db_file, jid_bare, val)
else:
response = 'Quantum: '
response += XmppCommands.get_quantum(
self, jid_bare)
case 'random':
response = XmppCommands.set_random(self, jid_bare, db_file)
case _ if command.startswith('read '):
data = command[5:]
data = data.split()
url = data[0]
if url:
response = await XmppCommands.feed_read(
self, jid_bare, data, url)
else:
response = ('No action has been taken.'
'\n'
'Missing URL.')
case _ if command.startswith('recent'):
num = command[7:]
if not num: num = 5
count, result = XmppCommands.print_recent(
self, db_file, num)
if count:
response = 'Recent {} fetched titles:\n'.format(num)
response += result
else:
response = result
case _ if command.startswith('remove '):
ix_url = command[7:]
ix_url = ix_url.split(' ')
response = await XmppCommands.feed_remove(
self, jid_bare, db_file, ix_url)
case _ if command.startswith('rename '):
response = await XmppCommands.feed_rename(
self, db_file, jid_bare, command)
case _ if command.startswith('reset'):
ix_url = command[6:]
ix_url = ix_url.split(' ')
response = await XmppCommands.mark_as_read(
self, jid_bare, db_file, ix_url)
case _ if command.startswith('search'):
query = command[7:]
response = XmppCommands.search_items(
self, db_file, query)
case 'start':
response = await XmppCommands.scheduler_start(
self, db_file, jid_bare)
case 'stats':
response = XmppCommands.print_statistics(db_file)
case 'stop':
response = await XmppCommands.scheduler_stop(
self, db_file, jid_bare)
case 'support':
response = XmppCommands.print_support_jid()
case 'version':
response = XmppCommands.print_version(self, jid_bare)
case _ if command.startswith('xmpp:'):
response = await XmppCommands.muc_join(self, command)
case _ if command.startswith('xmpp:'):
response = await XmppCommands.muc_join(self, command)
case 'exit':
conn.close()
break
case _:
response = XmppCommands.print_unknown()
# Send 'response'
await loop.sock_sendall(conn, response.encode('utf-8'))