import json
import logging
import re
import urllib.parse
from pathlib import Path
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
import dateutil.parser
import requests
from bs4 import BeautifulSoup
from nefelibata.announcers import Announcer
from nefelibata.announcers import Comment
from nefelibata.announcers import Response
from nefelibata.announcers import User
from nefelibata.post import Post
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Language codes accepted by IndieNews; a post is only submitted to
# https://news.indieweb.org/<language> when its language is in this list.
SUPPORTED_LANGUAGES = ["en", "sv", "de", "fr", "nl", "ru"]
# URL to send users to comment on posts via webmention; this is the value
# returned by ``WebmentionAnnouncer.announce``.
COMMENT_URL = "https://commentpara.de/"
def get_webmention_endpoint(url) -> Optional[str]:
    """
    Discover the Webmention endpoint advertised by ``url``.

    The HTTP ``Link`` header is checked first (using a ``HEAD`` request so the
    body is not downloaded needlessly). If no endpoint is advertised there and
    the resource is HTML, the document is fetched and searched for the first
    ``<link>`` or ``<a>`` element with ``rel="webmention"`` and an ``href``.

    :param url: The target URL that should receive the mention.
    :return: The absolute endpoint URL, or ``None`` when the target does not
        advertise one.
    :raises requests.RequestException: Network errors are not caught here and
        propagate to the caller.
    """
    # ``requests.head`` does not follow redirects unless asked to; without
    # this a redirecting target would never expose its endpoint.
    response = requests.head(url, allow_redirects=True)
    # Relative endpoints must be resolved against the final URL after any
    # redirects, not against the URL originally requested.
    base_url = response.url or url

    header = response.headers.get("Link")
    if header:
        for link in requests.utils.parse_header_links(header):
            # ``rel`` is optional and may hold several space-separated tokens.
            rels = link.get("rel", "").lower().split()
            if "webmention" in rels:
                return cast(str, urllib.parse.urljoin(base_url, link["url"]))

    # Only HTML documents can carry the endpoint in their markup, so skip the
    # download for anything else (even when an unrelated Link header exists).
    if "text/html" not in response.headers.get("Content-Type", ""):
        return None

    response = requests.get(url)
    base_url = response.url or url
    soup = BeautifulSoup(response.content, "html.parser")
    # ``href=True`` skips elements without an ``href`` attribute; an empty
    # ``href`` is still accepted and resolves to the page itself.
    link = soup.find(["link", "a"], rel="webmention", href=True)
    if link is not None:
        return cast(str, urllib.parse.urljoin(base_url, link["href"]))

    return None
def get_response_from_child(child: Dict[str, Any], target: str) -> Response:
    """
    Convert one entry of a webmention feed into a ``Response``.

    :param child: A single item from the feed's ``children`` list. It must
        contain ``wm-id`` and either ``published`` or ``wm-received``; all
        other fields are optional.
    :param target: URL of the post being mentioned, used to pick the most
        relevant paragraph for the summary.
    :return: A ``Response`` dictionary with source, URL, ID, ISO 8601
        timestamp, author and comment.
    :raises KeyError: If ``wm-id`` is missing, or if both ``published`` and
        ``wm-received`` are missing.
    """
    # for the source, let's try to find a name, else fall back to URL
    source = child.get("name") or child.get("url") or "Unknown"
    url = child.get("url") or "#"
    id_ = f'webmention:{child["wm-id"]}'

    # for timestamp we fall back to when the response was received
    timestamp = child.get("published") or child["wm-received"]
    timestamp = dateutil.parser.parse(timestamp).isoformat()

    # The author card may be absent or incomplete; a single malformed entry
    # should not abort the whole collection, so fall back to empty strings.
    author = child.get("author") or {}
    user: User = {
        "name": author.get("name") or "",
        "image": author.get("photo") or "",
        "url": author.get("url") or "",
    }

    # ``content`` may be missing (e.g. for responses without a body) or null.
    content = child.get("content") or {}
    if content:
        text = content.get("text", "")
        html = content.get("html", "")
        # prefer the HTML so the paragraph linking to the target can be found
        summary = summarize(html or text, target)
    else:
        text = summary = ""

    comment: Comment = {"text": text, "summary": summary}

    return {
        "source": source,
        "url": url,
        "id": id_,
        "timestamp": timestamp,
        "user": user,
        "comment": comment,
    }
def summarize(text: str, target: Optional[str] = None) -> str:
    """
    Build a short plain-text summary of a (possibly HTML) comment.

    When ``target`` is given and the markup contains a link whose ``href`` is
    exactly ``target`` inside a ``<p>``, the text of that paragraph is
    returned. Otherwise the first non-blank line of the text is returned.

    :param text: The comment body, as HTML or plain text.
    :param target: URL the comment is expected to link to.
    :return: The summary, or an empty string when there is no text.
    """
    soup = BeautifulSoup(text, "html.parser")

    # search for a paragraph containing the link
    if target:
        anchor = soup.find("a", href=target)
        paragraph = anchor.find_parent("p") if anchor is not None else None
        if paragraph is not None:
            # what should we do with really long paragraphs? is that
            # even a problem?
            return str(paragraph.get_text())

    # Return the first line that has content; markup frequently starts with
    # a newline, which would otherwise produce an empty summary.
    for line in soup.get_text().split("\n"):
        if line.strip():
            return line
    return ""
class WebmentionAnnouncer(Announcer):
    """
    Announcer that sends webmentions to the links found in a post and
    collects the mentions the post has received via webmention.io.

    Results of sent mentions are stored in ``webmentions.json`` next to the
    post, keyed by target URL, so that each target is only contacted once.
    """

    id = "webmention"
    name = "Webmention"
    url_header = "webmention-url"

    def __init__(
        self, root: Path, config: Dict[str, Any], endpoint: str,
    ):
        """
        :param root: Passed through to ``Announcer``.
        :param config: Passed through to ``Announcer``; ``config["url"]`` is
            used as the base for post URLs, and ``config["language"]`` as the
            default language for IndieNews submissions.
        :param endpoint: Webmention endpoint of the site itself.
        """
        super().__init__(root, config)
        self.endpoint = endpoint  # used only in template

    def should_announce(self, post: Post) -> bool:
        """Return ``True`` unconditionally; see the comment below for why."""
        # Since the plugin can announce to multiple places, and new places can be added
        # after a post has been published, we always try to find new links to which we
        # should send mentions.
        return True

    def announce(self, post: Post) -> str:
        """
        Send a webmention to every not-yet-contacted link in ``post``.

        Absolute ``http``/``https`` links in the post's HTML are mentioned,
        and the post is additionally submitted to IndieNews when it carries
        the ``indieweb`` or ``indienews`` keyword and its language is
        supported.

        :param post: The post being announced.
        :return: ``COMMENT_URL``, where readers can reply via webmention.
        """
        _logger.info("Discovering links supporting webmention...")

        # store successful mentions and their responses in a JSON file
        post_directory = post.file_path.parent
        storage = post_directory / "webmentions.json"
        if storage.exists():
            with open(storage, encoding="utf-8") as fp:
                webmentions = json.load(fp)
        else:
            webmentions = {}

        source = urllib.parse.urljoin(self.config["url"], post.url)
        soup = BeautifulSoup(post.html, "html.parser")
        # Anchor the pattern so only absolute HTTP(S) links match; relative
        # links that merely contain "http" cannot be fetched.
        absolute_link = re.compile(r"^https?://", re.IGNORECASE)

        new_mentions = False
        try:
            for el in soup.find_all("a", href=absolute_link):
                target = el.attrs.get("href")
                if self._mention_if_new(webmentions, source, target):
                    new_mentions = True

            keywords = [
                keyword.strip()
                for keyword in post.parsed.get("keywords", "").split(",")
            ]
            if "indieweb" in keywords or "indienews" in keywords:
                language = post.parsed.get("language") or self.config["language"]
                if language not in SUPPORTED_LANGUAGES:
                    _logger.error(
                        f'Currently IndieNews supports only the following languages: {", ".join(SUPPORTED_LANGUAGES)}',
                    )
                else:
                    target = f"https://news.indieweb.org/{language}"
                    if self._mention_if_new(webmentions, source, target):
                        new_mentions = True
        finally:
            # Persist even when a later target raised, so mentions that were
            # already sent are not sent again on the next run.
            if new_mentions:
                with open(storage, "w", encoding="utf-8") as fp:
                    json.dump(webmentions, fp)

        _logger.info("Success!")

        return COMMENT_URL

    def _mention_if_new(
        self, webmentions: Dict[str, Any], source: str, target: str,
    ) -> bool:
        """Send a mention to ``target`` unless already recorded; return whether one was sent."""
        if target in webmentions:
            return False
        _logger.info(f"Checking {target}")
        webmentions[target] = self._send_mention(source, target)
        return True

    def _send_mention(self, source: str, target: str) -> Dict[str, Any]:
        """
        Discover the endpoint of ``target`` and post a mention to it.

        :param source: Absolute URL of the post doing the mentioning.
        :param target: URL being mentioned.
        :return: ``{"success": bool}``, plus a ``"content"`` entry holding
            the endpoint's response (parsed JSON, or raw text if it is not
            JSON) when the request succeeded.
        """
        endpoint = get_webmention_endpoint(target)
        if not endpoint:
            _logger.info("No endpoint found")
            return {"success": False}

        _logger.info(f"Sending mention to {endpoint}")
        payload = {
            "source": source,
            "target": target,
        }
        response = requests.post(endpoint, data=payload)

        info: Dict[str, Any] = {"success": response.ok}
        if response.ok:
            try:
                info["content"] = response.json()
            except ValueError:
                # the endpoint answered with something other than JSON
                info["content"] = response.text

        return info

    def collect(self, post: Post) -> List[Response]:
        """
        Fetch the mentions received by ``post`` from webmention.io.

        :param post: The post whose mentions should be collected.
        :return: One ``Response`` per mention; an empty list when the feed
            could not be loaded.
        """
        _logger.info("Collecting webmentions")

        target = urllib.parse.urljoin(self.config["url"], post.url)
        url = "https://webmention.io/api/mentions.jf2"
        payload = {"target": target}
        # NOTE(review): only a single request is made; if the API paginates
        # its results, later pages are not fetched -- confirm.
        try:
            response = requests.get(url, params=payload)
            response.raise_for_status()
            feed = response.json()
        except Exception:
            # Connection failures, HTTP errors and invalid JSON are all
            # treated the same way: log and report no mentions.
            _logger.exception(f"Failed to load webmentions for {target}")
            return []

        _logger.info("Success!")

        return [
            get_response_from_child(child, target)
            for child in feed.get("children", [])
        ]