Slixfeed/slixfeed/crawl.py
Schimon Jehudah 4e193a2b02 Add for Add Subscription.
Segregating more code as one task per function, particularly adding of new subscription.
Fix allow/deny keys.
2024-02-16 01:46:04 +00:00

409 lines
No EOL
12 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FIXME
1) https://wiki.pine64.org
File "/slixfeed/crawl.py", line 178, in feed_mode_guess
address = join_url(url, parted_url.path.split('/')[1] + path)
~~~~~~~~~~~~~~~~~~~~~~~~~~^^^
IndexError: list index out of range
TODO
1.1) Attempt to scan more paths: /blog/, /news/ etc., including root /
Attempt to scan sub domains
https://esmailelbob.xyz/en/
https://blog.esmailelbob.xyz/feed/
1.2) Consider utilizing fetch.http_response
2) Consider merging with module fetch.py
FEEDS CRAWLER PROJECT
3) Mark redirects for manual check
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json.xml
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/atom.xml
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/feed.xml
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/feeds/rss/news.xml.php
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/jekyll/feed.xml
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/news.xml
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/news.xml.php
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/rdf.xml
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/rss.xml
Title : JSON Feed
Link : https://www.jsonfeed.org/feed.json/videos.xml
"""
from aiohttp import ClientError, ClientSession, ClientTimeout
from feedparser import parse
import logging
from lxml import html
import slixfeed.config as config
import slixfeed.fetch as fetch
from slixfeed.url import complete_url, join_url, trim_url
from urllib.parse import urlsplit, urlunsplit
# TODO Use boolean as a flag to determine whether a single URL was found
# async def probe_page(
# callback, url, document, num=None, db_file=None):
# result = None
# try:
# # tree = etree.fromstring(res[0]) # etree is for xml
# tree = html.fromstring(document)
# except:
# result = (
# "> {}\nFailed to parse URL as feed."
# ).format(url)
# if not result:
# print("RSS Auto-Discovery Engaged")
# result = await feed_mode_auto_discovery(url, tree)
# if not result:
# print("RSS Scan Mode Engaged")
# result = await feed_mode_scan(url, tree)
# if not result:
# print("RSS Arbitrary Mode Engaged")
# result = await feed_mode_request(url, tree)
# if not result:
# result = (
# "> {}\nNo news feeds were found for URL."
# ).format(url)
# # elif msg:
# else:
# if isinstance(result, str):
# return result
# elif isinstance(result, list):
# url = result[0]
# if db_file:
# # print("if db_file", db_file)
# return await callback(db_file, url)
# elif num:
# return await callback(url, num)
# else:
# return await callback(url)
async def probe_page(url, document):
    """
    Probe an HTML document for web feeds.

    The document is parsed as HTML and then three discovery functions are
    tried in order: feed_mode_auto_discovery, feed_mode_scan and
    feed_mode_guess. The first one that returns a truthy result wins.

    Parameters
    ----------
    url : str
        URL of the page; passed on to the discovery functions and used
        in the returned messages.
    document : str or bytes
        Page content handed to lxml's html.fromstring
        (presumably str or bytes -- TODO confirm against callers).

    Returns
    -------
    result : list or dict or str
        The truthy value returned by the first successful discovery
        function, or a message (str) when the document could not be
        parsed or when no feed was found.
    """
    try:
        # tree = etree.fromstring(res[0]) # etree is for xml
        tree = html.fromstring(document)
    except Exception:
        # A parser failure is reported as a message; BaseException
        # subclasses (e.g. KeyboardInterrupt) are left to propagate.
        return "> {}\nFailed to parse URL as feed.".format(url)
    # Tried in this order; the first truthy result is returned.
    strategies = (
        ("Feed auto-discovery engaged for %s", feed_mode_auto_discovery),
        ("Feed link scan mode engaged for %s", feed_mode_scan),
        ("Feed arbitrary mode engaged for %s", feed_mode_guess),
    )
    for notice, strategy in strategies:
        logging.debug(notice, url)
        result = await strategy(url, tree)
        if result:
            return result
    logging.debug("No feeds were found for %s", url)
    return "> {}\nNo news feeds were found for URL.".format(url)
# TODO Improve scan by gradually shortening the path
async def feed_mode_guess(url, tree):
    """
    Look up feeds by guessing pathnames and verifying them over HTTP.

    Candidate addresses are built by joining the URL with its first path
    segment followed by each pathname from the "pathnames" list of
    lists.toml. When the URL has a non-empty first path segment, the
    suffixes .atom, .feed, .rdf and .rss are tried as well.

    Parameters
    ----------
    url : str
        URL.
    tree : object
        Parsed document. Not used by this function; kept so that all
        feed_mode_* functions share the same signature.

    Returns
    -------
    result : list or dict or None
        Whatever process_feed_selection returns for the candidates.
    """
    parted_url = urlsplit(url)
    # Work on a copy, so that extending the list below cannot alter the
    # object handed out by the configuration module.
    paths = list(config.open_config_file("lists.toml")["pathnames"])
    # First path segment of the URL. A URL without a path (for example
    # https://wiki.pine64.org) has no second element after the split,
    # which used to raise IndexError; it is treated as root instead.
    segments = parted_url.path.split('/')
    first_segment = segments[1] if len(segments) > 1 else ''
    # TODO Make more tests
    if first_segment and '.rss' not in paths:
        paths.extend([".atom", ".feed", ".rdf", ".rss"])
    # dict.fromkeys removes duplicates while keeping the original order.
    urls = list(dict.fromkeys(
        join_url(url, first_segment + path) for path in paths))
    return await process_feed_selection(url, urls)
async def feed_mode_scan(url, tree):
    """
    Scan the anchors of a page for links that look like feeds.

    For every pathname of the "pathnames" list of lists.toml, anchors
    whose href contains that pathname are collected (at most the first
    five and the last five matches), resolved against the URL and then
    handed to process_feed_selection.

    Parameters
    ----------
    url : str
        URL.
    tree : object
        Parsed document which supports xpath queries.

    Returns
    -------
    result : list or dict or None
        Whatever process_feed_selection returns for the collected links.
    """
    # Number of matches taken from each end of the result set.
    # NOTE Should number of addresses be limited or
    #      perhaps be N from the start and N from the end
    limit = 5
    # xpath_query = "//*[@*[contains(.,'{}')]]".format(path)
    # xpath_query = "//a[contains(@href,'{}')]".format(path)
    templates = (
        "(//a[contains(@href,'{}')])[position()<={}]",
        "(//a[contains(@href,'{}')])[position()>last()-{}]",
    )
    pathnames = config.open_config_file("lists.toml")["pathnames"]
    found = []
    for pathname in pathnames:
        anchors = []
        for template in templates:
            anchors.extend(tree.xpath(template.format(pathname, limit)))
        for anchor in anchors:
            link = join_url(url, anchor.xpath('@href')[0])
            if link in found:
                continue
            found.append(link)
    return await process_feed_selection(url, found)
async def feed_mode_auto_discovery(url, tree):
    """
    Look up feeds using the RSS autodiscovery technique.

    See: https://www.rssboard.org/rss-autodiscovery

    Parameters
    ----------
    url : str
        URL.
    tree : object
        Parsed document which supports xpath queries.

    Returns
    -------
    result : list or dict or None
        None when the document declares no usable feed link; otherwise
        whatever process_feed_selection returns for the declared links.
    """
    xpath_query = (
        '//link[(@rel="alternate") and '
        '(@type="application/atom+xml" or '
        '@type="application/rdf+xml" or '
        '@type="application/rss+xml")]'
        )
    links = tree.xpath(xpath_query)
    if not links:
        return None
    urls = []
    for link in links:
        hrefs = link.xpath('@href')
        if not hrefs:
            # A matching <link> without href has no address to offer;
            # indexing it would raise IndexError.
            continue
        address = join_url(url, hrefs[0])
        if address not in urls:
            urls.append(address)
    if not urls:
        # Nothing to verify.
        return None
    # The declared addresses are not trusted as is; they are fetched
    # and parsed by process_feed_selection.
    return await process_feed_selection(url, urls)
# TODO Split this function into one function that returns
# URLs (string) and feeds (dict), and another function that
# composes the text message (string).
# Maybe that's not necessary.
async def process_feed_selection(url, urls):
    """
    Fetch candidate addresses and keep those that are feeds with entries.

    Each address is retrieved with fetch.http; responses with status 200
    are parsed with feedparser, and feeds which have at least one entry
    are reported.

    Parameters
    ----------
    url : str
        URL of the page the candidates originate from. Not used for the
        result; kept for interface compatibility.
    urls : list
        Candidate feed addresses (str).

    Returns
    -------
    result : list or dict or None
        A list of feed dicts when more than one feed was found, a single
        feed dict when exactly one was found, otherwise None. Each dict
        has the keys 'url', 'index', 'name', 'code', 'error' and 'exist'.
    """
    feeds = {}
    for address in urls:
        res = await fetch.http(address)
        status_code = res[1]
        if status_code != 200:
            continue
        try:
            parsed = parse(res[0])
        except Exception:
            logging.debug(
                "Failed to parse %s as feed", address, exc_info=True)
            continue
        # Keep the status together with the document, so that every
        # result reports its own status and not the one of the last
        # address that was fetched.
        feeds[address] = (parsed, status_code)
    results = []
    for feed_url, (parsed, status_code) in feeds.items():
        feed_name = parsed["feed"].get("title") or "Untitled"
        # feed_name = feed_name if feed_name else urlsplit(feed_url).netloc
        if not parsed.get("entries"):
            # Documents without entries are considered false positives.
            continue
        # NOTE Because there could be many false positives
        #      which are revealed in second phase of scan, we
        #      could end with a single feed; a single feed is
        #      returned as a dict (not as a list), so that the
        #      receiving function can fetch it instead of
        #      listing it as a message.
        results.append({'url' : feed_url,
                        'index' : None,
                        'name' : feed_name,
                        'code' : status_code,
                        'error' : False,
                        'exist' : None})
    if len(results) > 1:
        return results
    if results:
        return results[0]
    return None
# def get_discovered_feeds(url, urls):
# message = (
# "Found {} web feeds:\n\n```\n"
# ).format(len(urls))
# if len(urls) > 1:
# for urls in urls:
# message += (
# "Title : {}\n"
# "Link : {}\n"
# "\n"
# ).format(url, url.title)
# message += (
# "```\nThe above feeds were extracted from\n{}"
# ).format(url)
# elif len(urls) > 0:
# result = urls
# else:
# message = (
# "No feeds were found for {}"
# ).format(url)
# return result
# Test module
# TODO ModuleNotFoundError: No module named 'slixfeed'
# import slixfeed.fetch as fetch
# from slixfeed.action import is_feed, process_feed_selection
# async def start(url):
# while True:
# result = await fetch.http(url)
# document = result[0]
# status = result[1]
# if document:
# feed = parse(document)
# if is_feed(feed):
# print(url)
# else:
# urls = await probe_page(
# url, document)
# if len(urls) > 1:
# await process_feed_selection(urls)
# elif urls:
# url = urls[0]
# else:
# response = (
# "> {}\nFailed to load URL. Reason: {}"
# ).format(url, status)
# break
# return response
# url = "https://www.smh.com.au/rssheadlines"
# start(url)