Source code for nefelibata.assistants.mirror_images
import hashlib
import mimetypes
import re
from collections import defaultdict
from io import BytesIO
from pathlib import Path
import piexif
import requests
from bs4 import BeautifulSoup
from nefelibata.assistants import Assistant
from nefelibata.assistants import Scope
from nefelibata.post import Post
from nefelibata.utils import modify_html
from PIL import Image
CHUNK_SIZE = 2048
[docs]def get_resource_extension(url: str) -> str:
response = requests.head(url)
content_type = response.headers["content-type"]
extension = mimetypes.guess_extension(content_type)
return extension or ""
[docs]class MirrorImagesAssistant(Assistant):
scopes = [Scope.POST, Scope.SITE]
[docs] def process_post(self, post: Post, force: bool = False) -> None:
self._process_file(post.file_path.with_suffix(".html"))
[docs] def process_site(self, force: bool = False) -> None:
for path in (self.root / "build").glob("*.html"):
self._process_file(path)
def _process_file(self, file_path: Path) -> None:
mirror = file_path.parent / "img"
if not mirror.exists():
mirror.mkdir()
soup: BeautifulSoup
with modify_html(file_path) as soup:
external_images = soup.find_all("img", src=re.compile("http"))
for image in external_images:
url = image.attrs["src"]
extension = get_resource_extension(url).lower()
m = hashlib.md5()
m.update(url.encode("utf-8"))
filename = f"{m.hexdigest()}{extension}"
local = mirror / filename
image.attrs["src"] = "img/%s" % local.name
if local.exists():
continue
# download and store locally
buf = BytesIO()
response = requests.get(url, stream=True)
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
buf.write(chunk)
# store original URL in EXIF
if extension in [".jpeg", ".jpg"]:
buf.seek(0)
im = Image.open(buf)
exif = (
piexif.load(im.info["exif"])
if "exif" in im.info
else defaultdict(dict)
)
exif["0th"][piexif.ImageIFD.ImageDescription] = url
buf = BytesIO()
im.save(buf, "jpeg", exif=piexif.dump(exif))
with open(local, "wb") as outp:
outp.write(buf.getvalue())