Moving functions from main run.py to their own social.py file.

pull/1/head
Ayush Sharma 2018-09-06 18:45:08 +05:30
vecāks 7640831acc
revīzija 082cdb4b93
8 mainītis faili ar 125 papildinājumiem un 125 dzēšanām

0
.gitignore ārējs Normal file → Executable file
Parādīt failu

0
LICENSE.md Normal file → Executable file
Parādīt failu

0
README.md Normal file → Executable file
Parādīt failu

4
config.json Normal file → Executable file
Parādīt failu

@ -1,8 +1,8 @@
{
"gen.APP_NAME": "Tweet-Toot",
"gen.log_timestamp": "%Y-%m-%d %H:%M:%S",
"tweets.source_account_url": "",
"toots.host_instance": "",
"tweets.source_account_url": "https://twitter.com/SarcasmMother",
"toots.host_instance": "https://botsin.space",
"toots.app_secure_token": "",
"toots.cache_path": "/tmp/"
}

0
helpers.py Normal file → Executable file
Parādīt failu

0
requirements.txt Normal file → Executable file
Parādīt failu

127
run.py
Parādīt failu

@ -1,125 +1,6 @@
import helpers
import social
import sys
import requests
from bs4 import BeautifulSoup
from pathlib import Path
def isAlreadyTooted(tweet_id):
""" Check if a tweet has already been POSTed to Mastodon. If so, let's not do that again.
This is important!
Since this script will likely run as a cron, tweet-bombing our favorite Mastodon neighbourhood
will ruin things for everyone.
Arguments:
tweet_id {string} -- Numerical tweet ID returned by getTweets().
"""
cache_path = helpers._config('toots.cache_path')
my_file = Path(cache_path + tweet_id)
if my_file.is_file():
return True
else:
return False
def getTweets():
""" Get list of tweets, with tweet ID and content, from configured Twitter account URL.
This function relies on BeautifulSoup to extract the tweet IDs and content of all tweets on the specified page.
The data is returned as a list of dictionaries that can be used by other functions.
"""
all_tweets = []
url = helpers._config('tweets.source_account_url')
if not url:
return False
headers = {}
headers['accept-language'] = 'en-US,en;q=0.9'
headers['dnt'] = '1'
headers['user-agent'] = helpers._config('gen.APP_NAME')
data = requests.get(url)
html = BeautifulSoup(data.text, 'html.parser')
timeline = html.select('#timeline li.stream-item')
if timeline is None:
return False
helpers._info('getTweets => Fetching tweets for ' + url + '.')
for tweet in timeline:
tweet_id = tweet['data-item-id']
tweet_text = tweet.select('p.tweet-text')[0].get_text()
all_tweets.append({"id": tweet_id, "text": tweet_text})
return all_tweets if len(all_tweets) > 0 else None
def tootTheTweet(tweet):
""" Receieve a dictionary containing Tweet ID and text... and TOOT!
This function relies on the requests library to post the content to your Mastodon account (human or bot).
A boolean success status is returned.
Arguments:
tweet {dictionary} -- Dictionary containing the "id" and "text" of a single tweet.
"""
if isAlreadyTooted(tweet['id']):
helpers._info('tootTheTweet => ' +
tweet['id'] + ' already tooted. Skipping.')
return False
host_instance = helpers._config('toots.host_instance')
token = helpers._config('toots.app_secure_token')
headers = {}
headers['Authorization'] = 'Bearer ' + token
headers['Idempotency-Key'] = tweet['id']
data = {}
data['status'] = tweet['text']
data['visibility'] = 'public'
cache_path = '/tmp/' + tweet['id']
new_days = open(cache_path, 'w')
new_days.write(tweet['text'])
new_days.close()
response = requests.post(
url=host_instance + '/api/v1/statuses', data=data, headers=headers)
if response.status_code == 200:
helpers._info('tootTheTweet => OK (Response: ' + response.text + ')')
return True
else:
helpers._error(
'tootTheTweet => FAIL (Response: ' + response.text + ')')
return False
if __name__ == '__main__':
@ -130,12 +11,12 @@ if __name__ == '__main__':
It will only toot once per invokation to avoid flooding the instance.
"""
tweets = getTweets()
tweets = social.getTweets()
if not tweets:
helpers._error(
'__main__ => No tweets fetched. Please check the Twitter account URL "tweets.source_account_url" in the config.json file.')
'__main__ => No tweets fetched.')
sys.exit()
@ -143,7 +24,7 @@ if __name__ == '__main__':
for tweet in tweets:
if tootTheTweet(tweet):
if social.tootTheTweet(tweet):
helpers._info('__main__ => Tooted "' + tweet['text'] + '"')
helpers._info(

119
social.py Normal file
Parādīt failu

@ -0,0 +1,119 @@
import helpers
import requests
from bs4 import BeautifulSoup
from pathlib import Path
def getTweets():
""" Get list of tweets, with tweet ID and content, from configured Twitter account URL.
This function relies on BeautifulSoup to extract the tweet IDs and content of all tweets on the specified page.
The data is returned as a list of dictionaries that can be used by other functions.
"""
all_tweets = []
url = helpers._config('tweets.source_account_url')
if not url:
helpers._error('getTweets() => The source Twitter account URL (' + url + ') was incorrect. Could not retrieve tweets.')
return False
headers = {}
headers['accept-language'] = 'en-US,en;q=0.9'
headers['dnt'] = '1'
headers['user-agent'] = helpers._config('gen.APP_NAME')
data = requests.get(url)
html = BeautifulSoup(data.text, 'html.parser')
timeline = html.select('#timeline li.stream-item')
if timeline is None:
helpers._error('getTweets() => Could not retrieve tweets from the page. Please make sure the source Twitter account URL (' + url + ') is correct.')
return False
helpers._info('getTweets => Fetched tweets for ' + url + '.')
for tweet in timeline:
tweet_id = tweet['data-item-id']
tweet_text = tweet.select('p.tweet-text')[0].get_text()
all_tweets.append({"id": tweet_id, "text": tweet_text})
return all_tweets if len(all_tweets) > 0 else None
def tootTheTweet(tweet):
""" Receieve a dictionary containing Tweet ID and text... and TOOT!
This function relies on the requests library to post the content to your Mastodon account (human or bot).
A boolean success status is returned.
Arguments:
tweet {dictionary} -- Dictionary containing the "id" and "text" of a single tweet.
"""
host_instance = helpers._config('toots.host_instance')
token = helpers._config('toots.app_secure_token')
if not host_instance:
helpers._error('tootTheTweet() => Your host Mastodon instance URL (' + host_instance + ') was incorrect.')
return False
if not token:
helpers._error('tootTheTweet() => Your Mastodon access token was incorrect.')
return False
headers = {}
headers['Authorization'] = 'Bearer ' + token
headers['Idempotency-Key'] = tweet['id']
data = {}
data['status'] = tweet['text']
data['visibility'] = 'public'
tweet_check_file_path = helpers._config('toots.cache_path') + tweet['id']
tweet_check_file = Path(tweet_check_file_path)
if tweet_check_file.is_file():
helpers._info('tootTheTweet() => This tweet has already been posted. Skipping...')
return False
else:
tweet_check = open(tweet_check_file_path, 'w')
tweet_check.write(tweet['text'])
tweet_check.close()
helpers._info('tootTheTweet() => Caching new tweet.')
response = requests.post(
url=host_instance + '/api/v1/statuses', data=data, headers=headers)
if response.status_code == 200:
helpers._info('tootTheTweet() => OK. Posted tweet to Mastodon.')
helpers._info('tootTheTweet() => Response: ' + response.text)
return True
else:
helpers._info('tootTheTweet() => FAIL. Could not post tweet to Mastodon.')
helpers._info('tootTheTweet() => Response: ' + response.text)
return False