Slixfeed/slixfeed/fetch.py

185 lines
5.3 KiB
Python
Raw Normal View History

2023-10-24 16:43:14 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FIXME
1) feed_mode_scan doesn't find feed for https://www.blender.org/
even though it should be according to the pathnames dictionary.
2023-11-23 17:55:36 +01:00
TODO
1) Support Gemini and Gopher.
2) Check also for HTML, not only feed.bozo.
3) Add "if utility.is_feed(url, feed)" to view_entry and view_feed
2024-01-14 19:05:12 +01:00
4) Replace sqlite.remove_nonexistent_entries by sqlite.check_entry_exist
Same check, just reverse.
5) Support protocol Gopher
See project /michael-lazar/pygopherd
See project /gopherball/gb
6) Support ActivityPub @person@domain (see Tip Of The Day).
7) See project /offpunk/offblocklist.py
"""
from aiohttp import ClientError, ClientSession, ClientTimeout
from asyncio import TimeoutError
# from asyncio.exceptions import IncompleteReadError
# from bs4 import BeautifulSoup
# from http.client import IncompleteRead
import logging
# from lxml import html
2023-10-24 16:43:14 +02:00
# from xml.etree.ElementTree import ElementTree, ParseError
import requests
import slixfeed.config as config
try:
from magnet2torrent import Magnet2Torrent, FailedToFetchException
except:
logging.info(
"Package magnet2torrent was not found.\n"
"BitTorrent is disabled.")
# class FetchDat:
# async def dat():
# class FetchFtp:
# async def ftp():
# class FetchGemini:
# async def gemini():
2023-10-24 16:43:14 +02:00
# class FetchGopher:
# async def gopher():
2023-10-24 16:43:14 +02:00
# class FetchHttp:
# async def http():
# class FetchIpfs:
# async def ipfs():
def http_response(url):
"""
Download response headers.
Parameters
----------
url : str
URL.
Returns
-------
response: requests.models.Response
HTTP Header Response.
Result would contain these:
response.encoding
response.headers
response.history
response.reason
response.status_code
response.url
"""
user_agent = (
config.get_value(
"settings", "Network", "user_agent")
) or 'Slixfeed/0.1'
headers = {
"User-Agent": user_agent
}
try:
# Don't use HEAD request because quite a few websites may deny it
# response = requests.head(url, headers=headers, allow_redirects=True)
response = requests.get(url, headers=headers, allow_redirects=True)
except Exception as e:
logging.error(str(e))
response = None
return response
async def http(url):
2023-10-24 16:43:14 +02:00
"""
Download content of given URL.
2023-11-02 06:17:04 +01:00
Parameters
----------
url : list
URL.
Returns
-------
msg: list or str
Document or error message.
2023-10-24 16:43:14 +02:00
"""
user_agent = (config.get_values('settings.toml', 'network')['user_agent']
or 'Slixfeed/0.1')
headers = {'User-Agent': user_agent}
proxy = (config.get_values('settings.toml', 'network')['http_proxy']
or '')
timeout = ClientTimeout(total=10)
async with ClientSession(headers=headers) as session:
# async with ClientSession(trust_env=True) as session:
2023-10-24 16:43:14 +02:00
try:
async with session.get(url, proxy=proxy,
# proxy_auth=(proxy_username, proxy_password),
timeout=timeout
) as response:
2023-10-24 16:43:14 +02:00
status = response.status
if status == 200:
2023-10-24 16:43:14 +02:00
try:
document = await response.text()
result = {'charset': response.charset,
'content': document,
'content_length': response.content_length,
'content_type': response.content_type,
'error': False,
'message': None,
'original_url': url,
'status_code': status,
'response_url': response.url}
2023-10-24 16:43:14 +02:00
except:
result = {'error': True,
'message': 'Could not get document.',
'original_url': url,
'status_code': status,
'response_url': response.url}
2023-10-24 16:43:14 +02:00
else:
result = {'error': True,
'message': 'HTTP Error:' + str(status),
'original_url': url,
'status_code': status,
'response_url': response.url}
except ClientError as e:
result = {'error': True,
'message': 'Error:' + str(e),
'original_url': url,
'status_code': None}
except TimeoutError as e:
result = {'error': True,
'message': 'Timeout:' + str(e),
'original_url': url,
'status_code': None}
except Exception as e:
logging.error(e)
result = {'error': True,
'message': 'Error:' + str(e),
'original_url': url,
'status_code': None}
return result
async def magnet(link):
m2t = Magnet2Torrent(link)
try:
filename, torrent_data = await m2t.retrieve_torrent()
except FailedToFetchException:
logging.debug("Failed")