Source code for nefelibata.announcers.fiftyninety
import json
import logging
import re
import textwrap
import urllib.parse
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
import dateutil.parser
import requests
from bs4 import BeautifulSoup
from bs4 import NavigableString
from bs4.element import Tag
from dateutil.parser._parser import ParserError
from nefelibata.announcers import Announcer
from nefelibata.announcers import Comment
from nefelibata.announcers import Response
from nefelibata.announcers import User
from nefelibata.post import Post
_logger = logging.getLogger(__name__)
def get_session(username: str, password: str) -> requests.Session:
    """Return a ``requests`` session that has submitted the FiftyNinety login form.

    The front page is fetched first so that the hidden ``form_build_id`` input
    of the ``user-login-form`` form can be read; its value is then posted back
    together with the credentials. The response to the login POST is not
    inspected, so a rejected login is not reported here.
    """
    session = requests.Session()

    # the login form embeds a build ID that has to be echoed back on submit
    front_page = session.get("http://fiftyninety.fawmers.org/")
    login_form = BeautifulSoup(front_page.text, "html.parser").find(
        "form", id="user-login-form",
    )
    build_id_input = login_form.find("input", {"name": "form_build_id"})

    # get authentication cookie
    session.post(
        "http://fiftyninety.fawmers.org/node",
        params={"destination": "node"},
        data={
            "name": username,
            "pass": password,
            "form_build_id": build_id_input.attrs["value"],
            "form_id": "user_login_block",
            "feed_me": "",
            "op": "Log+in",
        },
    )
    return session
def get_fid(session, options: str, demo: str) -> str:
    """Submit ``demo`` to the media browser form and return the resulting ``fid``.

    The media browser page is requested with ``options`` to obtain the tokens
    of its ``remote-stream-wrapper-file-add-form`` form, then the form is
    posted with ``demo`` as the ``url`` field. Redirects are not followed: the
    ``fid`` is read from the query string of the ``Location`` response header.
    """
    browser_url = "http://fiftyninety.fawmers.org/media/browser"
    query = {
        "options": options,
        "plugins": "undefined",
        "render": "media-popup",
    }

    # the form tokens have to be sent back with the submission
    form_page = session.get(browser_url, params=query)
    build_id, token = get_form_params_from_input(
        form_page.text, "remote-stream-wrapper-file-add-form",
    )

    submission = session.post(
        browser_url,
        params=query,
        data={
            "form_build_id": build_id,
            "form_id": "remote_stream_wrapper_file_add_form",
            "form_token": token,
            "op": "Submit",
            "url": demo,
        },
        allow_redirects=False,
    )

    # the file ID is carried in the query string of the redirect target
    redirect_query = urllib.parse.urlparse(submission.headers["Location"]).query
    return urllib.parse.parse_qs(redirect_query)["fid"][0]
def get_form_params_from_input(html: str, form_id: str) -> Tuple[str, str]:
    """Return ``(form_build_id, form_token)`` read from a form's inputs.

    ``html`` is parsed and the ``<form>`` whose ``id`` equals ``form_id`` is
    looked up; the ``value`` attributes of its ``form_build_id`` and
    ``form_token`` inputs are returned, in that order.
    """
    form = BeautifulSoup(html, "html.parser").find("form", id=form_id)
    build_id, token = (
        form.find("input", {"name": field}).attrs["value"]
        for field in ("form_build_id", "form_token")
    )
    return build_id, token
def extract_params(
    session: requests.Session, post: Post, root: Path, config: Dict[str, Any],
) -> Dict[str, Any]:
    """Extract params from a standard FiftyNinety post.

    The post HTML provides the liner notes (everything between the
    "Liner Notes" ``<h1>`` and the next ``<h1>``) and the lyrics (the first
    ``<pre>`` element, or ``"N/A"`` when there is none). A single MP3 found
    under the post directory is used as the demo. The song form is then
    fetched through ``session`` to read its tokens and the media options
    embedded in its Javascript.

    Args:
        session: session used for the requests to FiftyNinety.
        post: the post being announced.
        root: the MP3 path is made relative to ``root / "posts"``.
        config: ``config["url"]`` is prepended to the relative MP3 path.

    Returns:
        The form fields to POST to ``/node/add/song``.

    Raises:
        Exception: if the post has no "Liner Notes" header, if more than one
            MP3 is found, or if the media options cannot be located or parsed
            from the Javascript of the song form.
    """
    soup = BeautifulSoup(post.html, "html.parser")

    # liner notes are between <h1>s
    notes_h1 = soup.find("h1", text="Liner Notes")
    if notes_h1 is None:
        # fail with a readable message instead of an AttributeError on None
        raise Exception('Unable to find a "Liner Notes" header in the post')
    lines = []
    next_sibling = notes_h1.next_sibling
    while next_sibling and next_sibling.name != "h1":
        try:
            content = next_sibling.get_text()
        except AttributeError:
            content = next_sibling.string
        lines.append(content)
        # keep paragraphs on separate lines
        if next_sibling.name == "p":
            lines.append("\n")
        next_sibling = next_sibling.next_sibling
    notes = "".join(lines).strip()

    # lyrics are inside a <pre> element; they are optional, so only the
    # "no <pre>" case falls back to the placeholder
    pre = soup.find("pre")
    lyrics = textwrap.dedent(pre.text).strip() if pre is not None else "N/A"

    # search for a single MP3 in the post directory to use as demo
    post_directory = post.file_path.parent
    mp3s = list(post_directory.glob("**/*.mp3"))
    if len(mp3s) > 1:
        _logger.error("Multiple MP3s found, aborting!")
        raise Exception("Only posts with a single MP3 can be announced on FiftyNinety")
    if mp3s:
        mp3_path = mp3s[0].relative_to(root / "posts")
        # URLs always use forward slashes, whatever the local path separator is
        demo = f'{config["url"]}{mp3_path.as_posix()}'
    else:
        demo = ""

    # get tokens used in POST
    response = session.get("http://fiftyninety.fawmers.org/node/add/song")
    form_build_id, form_token = get_form_params_from_input(
        response.text, "song-node-form",
    )

    # get additional params for file upload; these are encoded in the JS
    soup = BeautifulSoup(response.text, "html.parser")
    el = soup.find("script", text=re.compile(r"^jQuery\.extend"))
    if not el:
        raise Exception("Unable to find options from Javascript")
    match = re.search(r"{.*}", el.contents[0])
    if not match:
        raise Exception("Unable to parse options from Javascript")
    payload = match.group(0)
    arg = json.loads(payload)
    options = arg["media"]["elements"][
        ".js-media-element-edit-field-demo-und-0-upload"
    ]["global"]["options"]

    # get an ID referencing the demo
    fid = get_fid(session, options, demo) if demo else ""

    return {
        "body[und][0][value]": notes,
        "field_collab[und][0][_weight]": "0",
        "field_demo[und][0][display]": "1",
        "field_demo[und][0][fid]": fid,
        "field_downloadable[und]": "1" if demo else "0",
        "field_lyrics[und][0][value]": lyrics,
        "field_tags[und]": post.parsed["keywords"],
        "form_build_id": form_build_id,
        "form_id": "song_node_form",
        "form_token": form_token,
        "op": "Save",
        "title": post.title,
        "changed": "",
    }
def get_comments_from_fiftyninety_page(
    session: requests.Session, url: str, username: str, password: str,
) -> List[Response]:
    """Extract comments from a given FiftyNinety page.

    The page at ``url`` is fetched with ``session`` and decoded as UTF-8.
    Comment IDs, authors with their timestamps, and comment bodies are
    collected in three independent passes over the document and then matched
    up by position. ``username`` and ``password`` are accepted but not used
    by this function.
    """
    base_url = "http://fiftyninety.fawmers.org"

    page = session.get(url)
    page.encoding = "UTF-8"
    soup = BeautifulSoup(page.text, "html.parser")

    # the HTML is so broken that we can't rely on relationship between nodes,
    # so every piece of information is gathered on its own

    # pass 1: comment IDs, taken from the anchors marking each comment
    ids = []
    for marker in soup.find_all("a", {"id": re.compile(r"comment-\d+")}):
        ids.append(int(marker.attrs["id"].split("-", 1)[1]))

    # pass 2: usernames and time of the comment
    users: List[User] = []
    timestamps = []
    for byline in soup.find_all("p", {"class": "author-datetime"}):
        age = byline.text.split(" - ")[1]
        timestamps.append(parse_fuzzy_timestamp(age))
        profile_link = byline.find("a")
        users.append(
            {
                "name": profile_link.text,
                "url": f"{base_url}{profile_link.attrs['href']}",
            },
        )

    # pass 3: comment bodies and user images
    comments: List[Comment] = []
    bodies = soup.find_all("div", {"class": "comment-content"})
    for position, body in enumerate(bodies):
        avatar = body.find("img")
        if avatar:
            # the linked image is low resolution, but we can replace it with a
            # higher resolution version
            users[position]["image"] = avatar.attrs["src"].replace(
                "/smallthumb/", "/medium/",
            )
        paragraphs = [p.text for p in body.find_all("p")]
        comments.append(
            {
                "text": "\n\n".join(paragraphs),
                "url": f"{url}#comment-{ids[position]}",
            },
        )

    return [
        {
            "source": "50/90",
            "url": url,
            "color": "#284ead",
            "id": f"fiftyninety:{id_}",
            "timestamp": timestamp.isoformat(),
            "user": user,
            "comment": comment,
        }
        for id_, user, timestamp, comment in zip(ids, users, timestamps, comments)
    ]
def parse_fuzzy_timestamp(timestamp: str) -> datetime:
    """Parse fuzzy timestamps.

    FiftyNinety annotates comments with relative timestamps:

        - 0 sec
        - 3 min 50 sec
        - 20 hours 38 min
        - 1 hour 52 min
        - 1 day 16 hours

    Each "<number> <unit>" pair is subtracted from the current UTC time.
    Months and years have no ``timedelta`` equivalent, so they are
    approximated as 30 and 365 days respectively.

    Args:
        timestamp: whitespace-separated "<number> <unit>" pairs.

    Returns:
        A timezone-aware (UTC) datetime that far in the past.

    Raises:
        ValueError: if a number cannot be parsed as an integer.
        KeyError: if a unit is not recognized.
    """
    # units that map directly to timedelta keyword arguments
    timedelta_units = {
        "sec": "seconds",
        "secs": "seconds",
        "min": "minutes",
        "mins": "minutes",
        "hour": "hours",
        "hours": "hours",
        "day": "days",
        "days": "days",
        "week": "weeks",
        "weeks": "weeks",
    }
    # units that have to be converted into an approximate number of days
    days_per_unit = {"month": 30, "months": 30, "year": 365, "years": 365}

    moment = datetime.now(tz=timezone.utc)

    # split() with no argument tolerates repeated or surrounding whitespace
    parts = timestamp.split()
    for number, units in zip(parts[::2], parts[1::2]):
        value = int(number)
        if units in days_per_unit:
            value *= days_per_unit[units]
            units = "days"
        moment -= timedelta(**{timedelta_units[units]: value})

    return moment
class FiftyNinetyAnnouncer(Announcer):
    """Announcer that publishes songs to FiftyNinety and collects their comments.

    FiftyNinety is a website where every year people participate in a
    challenge to write 50 songs in 90 days during the northern hemisphere
    summer. Every year the website reboots, and comments are lost, making it
    particularly suited for Nefelibata.

    In order to publish a song, the post should be structured like this:

        # Liner Notes

        <notes about the song>

        # Lyrics

        <pre>
        <lyrics go here>
        </pre>

    Lyrics are optional. Everything else is ignored, so you can add an audio
    player outside of those sections.

    You also need to have an MP3 in your post directory, if you want to have it
    published as a demo.
    """

    id = "fiftyninety"
    name = "FiftyNinety"
    url_header = "fiftyninety-url"

    def __init__(
        self, root: Path, config: Dict[str, Any], username: str, password: str,
    ):
        """Store the FiftyNinety credentials next to the base announcer state."""
        super().__init__(root, config)
        self.username = username
        self.password = password

    def announce(self, post: Post) -> str:
        """Publish the song to FiftyNinety.

        Returns the value of the ``Location`` header sent in response to the
        song form submission.
        """
        _logger.info("Creating new song on FiftyNinety")

        # login and store auth cookie
        session = get_session(self.username, self.password)

        # redirects are not followed so that the Location header can be read
        response = session.post(
            "http://fiftyninety.fawmers.org/node/add/song",
            data=extract_params(session, post, self.root, self.config),
            allow_redirects=False,
        )
        song_url = response.headers["Location"]

        _logger.info("Success!")
        return song_url

    def collect(self, post: Post) -> List[Response]:
        """Collect the comments left on the FiftyNinety page of ``post``.

        The page URL is read from the post's ``fiftyninety-url`` header.
        """
        _logger.info("Collecting comments from FiftyNinety")

        # login and store auth cookie
        session = get_session(self.username, self.password)

        song_url = post.parsed[self.url_header]
        collected = get_comments_from_fiftyninety_page(
            session, song_url, self.username, self.password,
        )

        _logger.info("Success!")
        return collected