Source code for nefelibata.assistants.archive_links
import json
import logging
import re
import urllib.parse
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from pathlib import Path
import dateutil.parser
import requests
from bs4 import BeautifulSoup
from nefelibata.assistants import Assistant
from nefelibata.assistants import Scope
from nefelibata.post import Post
_logger = logging.getLogger(__name__)
RETRY_TIMEOUT = timedelta(days=1)
[docs]class ArchiveLinksAssistant(Assistant):
scopes = [Scope.POST]
def _safe(self, resource: str):
if resource.startswith(self.config["url"]):
return True
# if the blog uses an external endpoint for webmention that's ok
if (
"webmention" in self.config
and resource == self.config["webmention"]["endpoint"]
):
return True
# ignore links to archive itself
parsed = urllib.parse.urlparse(resource)
return parsed.netloc.endswith("archive.org")
[docs] def process_post(self, post: Post, force: bool = False) -> None:
post_directory = post.file_path.parent
storage = post_directory / "archives.json"
if storage.exists():
with open(storage) as fp:
archives = json.load(fp)
else:
archives = {}
# find links from the post HTML
soup = BeautifulSoup(post.html, "html.parser")
archived = False
for el in soup.find_all("a", href=re.compile("http")):
url = el.attrs["href"]
if not url or self._safe(url):
continue
if url in archives:
# if the URL has been archived already, skip
if archives[url]["url"]:
continue
# if the URL has not been archived but we tried
# recently, skip it to not overload archive.org
if (
datetime.now().astimezone(timezone.utc)
- dateutil.parser.parse(archives[url]["date"])
< RETRY_TIMEOUT
):
continue
# save to archive.org
_logger.info(f"Requesting to save {url}")
response = requests.get(f"https://web.archive.org/save/{url}")
content_location = response.headers.get("Content-Location")
archived_url = (
f"https://web.archive.org{content_location}"
if content_location
else None
)
# add link to archived version
archives[url] = {
"url": archived_url,
"date": datetime.now().astimezone(timezone.utc).isoformat(),
}
archived = True
if archived:
with open(storage, "w") as fp:
json.dump(archives, fp)
# now enrich links in the generated HTML
index_html = post.file_path.with_suffix(".html")
with open(index_html) as fp:
html = fp.read()
soup = BeautifulSoup(html, "html.parser")
# remove existing archive notes
for el in soup.find_all("span", attrs={"class": "archive"}):
el.decompose()
modified = False
for el in soup.find_all("a", href=re.compile("http")):
url = el.attrs["href"]
if url not in archives:
continue
if el.attrs.get("data-archive-url") != archives[url]["url"]:
el.attrs["data-archive-url"] = archives[url]["url"]
el.attrs["data-archive-date"] = archives[url]["date"]
span = soup.new_tag("span", attrs={"class": "archive"})
anchor = soup.new_tag("a", href=archives[url]["url"])
anchor.string = "archived"
span.extend(["[", anchor, "]"])
el.insert_after(span)
modified = True
if modified:
html = str(soup)
with open(index_html, "w") as fp:
fp.write(html)