Source code for nefelibata.assistants.mirror_images

import hashlib
import mimetypes
import re
from pathlib import Path

import requests
from bs4 import BeautifulSoup
from nefelibata.assistants import Assistant
from nefelibata.assistants import Scope
from nefelibata.post import Post


CHUNK_SIZE = 2048


def get_resource_extension(url: str) -> str:
    """Guess a file extension for the resource served at ``url``.

    A ``HEAD`` request is sent to ``url`` and the media type found in the
    ``Content-Type`` response header is mapped to an extension with
    :func:`mimetypes.guess_extension`.

    :param url: Address of the remote resource.
    :returns: The extension including its leading dot (e.g. ``".png"``), or
        an empty string when the header is missing or the media type is
        unknown.
    """
    # ``requests.head`` does not follow redirects unless asked to; without
    # this the header of the redirect response would be inspected instead of
    # the header of the resource that a later GET actually downloads.
    response = requests.head(url, allow_redirects=True)

    # The header is optional, and may carry parameters after the media type
    # ("image/png; charset=binary") that ``guess_extension`` cannot parse.
    content_type = response.headers.get("content-type", "")
    mime_type = content_type.split(";", 1)[0].strip().lower()
    if not mime_type:
        return ""

    return mimetypes.guess_extension(mime_type) or ""
class MirrorImagesAssistant(Assistant):
    """Assistant that stores local copies of externally hosted images.

    Every ``<img>`` whose ``src`` is an absolute ``http``/``https`` URL is
    downloaded into an ``img`` directory next to the HTML file, and the
    ``src`` attribute is rewritten to point at the local copy.
    """

    scopes = [Scope.POST, Scope.SITE]

    def process_post(self, post: Post, force: bool = False) -> None:
        """Mirror the images of the HTML file that sits next to ``post``.

        :param post: Post whose ``file_path``, with an ``.html`` suffix,
            names the file to process.
        :param force: Currently ignored by this assistant.
        """
        self._process_file(post.file_path.with_suffix(".html"))

    def process_site(self, force: bool = False) -> None:
        """Mirror the images of every ``*.html`` file directly in ``build``.

        :param force: Currently ignored by this assistant.
        """
        for path in (self.root / "build").glob("*.html"):
            self._process_file(path)

    def _process_file(self, file_path: Path) -> None:
        """Download the external images of one HTML file and relink them.

        The file is rewritten only when at least one ``src`` was changed.

        :param file_path: HTML file to scan and update in place.
        """
        mirror = file_path.parent / "img"
        mirror.mkdir(exist_ok=True)

        with open(file_path) as inp:
            html = inp.read()

        soup = BeautifulSoup(html, "html.parser")
        # Anchor on the scheme: an unanchored "http" would also match local
        # paths that merely contain those letters (e.g. "img/http-logo.png").
        external_images = soup.find_all(
            "img",
            src=re.compile(r"^https?://", re.IGNORECASE),
        )

        modified = False
        for image in external_images:
            url = image.attrs["src"]
            extension = get_resource_extension(url)
            # The hash of the URL gives a stable, filesystem-safe file name.
            digest = hashlib.md5(url.encode("utf-8")).hexdigest()
            local = mirror / f"{digest}{extension}"

            # Download and store locally; when the download fails the image
            # keeps pointing at its original remote URL.
            if not local.exists() and not self._download(url, local):
                continue

            image.attrs["src"] = f"img/{local.name}"
            modified = True

        if modified:
            html = str(soup)
            with open(file_path, "w") as fp:
                fp.write(html)

    @staticmethod
    def _download(url: str, target: Path) -> bool:
        """Stream ``url`` into ``target``.

        :param url: Address of the resource to fetch.
        :param target: Destination path of the finished download.
        :returns: ``True`` when the file was stored, ``False`` when the
            server answered with an error status.
        """
        # The context manager closes the streamed response and releases its
        # connection even when the body is not fully consumed.
        with requests.get(url, stream=True) as response:
            if not response.ok:
                # Do not store an error page as if it were the image.
                return False

            # Write to a temporary name first so that an interrupted download
            # is never mistaken for a complete file on a later run.
            partial = target.with_name(f"{target.name}.part")
            with open(partial, "wb") as outp:
                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                    outp.write(chunk)

        partial.replace(target)
        return True