118 lines
3.7 KiB
Python
118 lines
3.7 KiB
Python
import helpers
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from pathlib import Path
|
|
import base64
|
|
|
|
|
|
def getTweets():
|
|
""" Get list of tweets, with tweet ID and content, from configured Twitter account URL.
|
|
|
|
This function relies on BeautifulSoup to extract the tweet IDs and content of all tweets on the specified page.
|
|
|
|
The data is returned as a list of dictionaries that can be used by other functions.
|
|
"""
|
|
|
|
all_tweets = []
|
|
url = helpers._config("tweets.source_account_url")
|
|
|
|
if not url:
|
|
helpers._error(
|
|
f"getTweets() => The source Twitter account URL ({url}) was incorrect. Could not retrieve tweets.")
|
|
return False
|
|
|
|
headers = {}
|
|
headers["accept-language"] = "en-US,en;q=0.9"
|
|
headers["dnt"] = "1"
|
|
headers["user-agent"] = helpers._config("gen.APP_NAME")
|
|
|
|
data = requests.get(url)
|
|
html = BeautifulSoup(data.text, "html.parser")
|
|
timeline = html.select("#timeline li.stream-item")
|
|
|
|
if timeline is None:
|
|
helpers._error(
|
|
f"getTweets() => Could not retrieve tweets from the page. Please make sure the source Twitter account URL ({url}) is correct."
|
|
)
|
|
return False
|
|
|
|
helpers._info(f"getTweets() => Fetched tweets for {url}.")
|
|
|
|
for tweet in timeline:
|
|
tweet_id = tweet["data-item-id"]
|
|
try:
|
|
tweet_text = tweet.select("p.tweet-text")[0].get_text()
|
|
except:
|
|
helpers._info("getTweets() => No tweet text found. Moving on...")
|
|
continue
|
|
all_tweets.append({"id": tweet_id, "text": tweet_text})
|
|
return all_tweets if len(all_tweets) > 0 else None
|
|
|
|
|
|
def tootTheTweet(tweet):
|
|
""" Receieve a dictionary containing Tweet ID and text... and TOOT!
|
|
|
|
This function relies on the requests library to post the content to your Mastodon account (human or bot).
|
|
|
|
A boolean success status is returned.
|
|
|
|
Arguments:
|
|
tweet {dictionary} -- Dictionary containing the "id" and "text" of a single tweet.
|
|
"""
|
|
|
|
host_instance = helpers._config("toots.host_instance")
|
|
token = helpers._config("toots.app_secure_token")
|
|
tweet_id = tweet["id"]
|
|
|
|
if not host_instance:
|
|
helpers._error(
|
|
f"tootTheTweet() => Your host Mastodon instance URL ({host_instance}) was incorrect."
|
|
)
|
|
return False
|
|
|
|
if not token:
|
|
helpers._error("tootTheTweet() => Your Mastodon access token was incorrect.")
|
|
return False
|
|
|
|
headers = {}
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
headers["Idempotency-Key"] = tweet_id
|
|
|
|
data = {}
|
|
data["status"] = tweet["text"]
|
|
data["visibility"] = "public"
|
|
|
|
tweet_check_file_path = helpers._config("toots.cache_path") + tweet["id"]
|
|
tweet_check_file = Path(tweet_check_file_path)
|
|
if tweet_check_file.is_file():
|
|
helpers._info(
|
|
f"tootTheTweet() => Tweet {tweet_id} was already posted. Reposting..."
|
|
)
|
|
return False
|
|
else:
|
|
tweet["text"].encode("utf-8")
|
|
|
|
tweet_check = open(tweet_check_file_path, mode="w")
|
|
tweet_check.write(tweet["text"])
|
|
tweet_check.close()
|
|
|
|
helpers._info(
|
|
f'tootTheTweet() => New tweet {tweet_id} => "{tweet["text"]}".'
|
|
)
|
|
|
|
response = requests.post(
|
|
url=f"{host_instance}/api/v1/statuses", data=data, headers=headers
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
helpers._info(f"tootTheTweet() => OK. Posted tweet {tweet_id} to Mastodon.")
|
|
helpers._info(f"tootTheTweet() => Response: {response.text}")
|
|
return True
|
|
|
|
else:
|
|
helpers._info(
|
|
f"tootTheTweet() => FAIL. Could not post tweet {tweet_id} to Mastodon."
|
|
)
|
|
helpers._info(f"tootTheTweet() => Response: {response.text}")
|
|
return False
|