Files
NijiHolo_EN_ID_Bot/src/twapi.py
T

259 lines
10 KiB
Python
Raw Normal View History

2022-09-27 02:49:03 -07:00
import datetime
2022-09-27 15:09:09 -07:00
import traceback
import asyncio
2022-09-24 17:56:58 -07:00
import tweepy
import api_secrets
import talenttweet as tt
import util
class TwAPI:
2022-09-27 02:49:03 -07:00
tweets_fetched = 0
2022-09-24 17:56:58 -07:00
instance = None
TWEET_MEDIA_FIELDS = ['url']
2022-09-27 22:04:26 -07:00
TWEET_FIELDS = ['created_at', 'in_reply_to_user_id', 'referenced_tweets']
2022-09-24 17:56:58 -07:00
TWEET_EXPANSIONS = ['entities.mentions.username', 'referenced_tweets.id.author_id']
# Returns a tuple of user IDs:(reply_to, qrt, {mentions})
# for a single tweet.
2022-09-24 17:56:58 -07:00
#
# Tweet must have been queried with these parameters:
# media_fields=['url'],
# tweet_fields=['created_at', 'in_reply_to_user_id'],
# expansions=['entities.mentions.username', 'referenced_tweets.id.author_id']
@staticmethod
def get_mrq(tweet: tweepy.Tweet, response):
mentions = set()
reply_to = None
qrt = None
2022-09-24 17:56:58 -07:00
# mentions
try:
mention_list = tweet.entities['mentions']
for mention in mention_list:
mentions.add(int(mention['id']))
except:
pass
2022-09-24 17:56:58 -07:00
# reply-to
if tweet.in_reply_to_user_id != None:
reply_to = tweet.in_reply_to_user_id
2022-09-24 17:56:58 -07:00
# qrt
if tweet.referenced_tweets:
for ref_tweet in tweet.referenced_tweets:
2022-09-24 17:56:58 -07:00
if ref_tweet.type == 'quoted':
for incl_tweet in response.includes['tweets']:
if incl_tweet.id == ref_tweet.id:
qrt = incl_tweet.author_id
try:
mentions.remove(reply_to)
2022-10-01 14:09:14 -07:00
except: pass
try:
mentions.remove(qrt)
except: pass
return (mentions, reply_to, qrt)
2022-09-24 17:56:58 -07:00
def __init__(self):
TwAPI.instance = self
self.client = tweepy.Client(
bearer_token=api_secrets.bearer_token(),
consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(),
access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
)
2022-09-27 02:49:03 -07:00
self.api = tweepy.API(
auth=tweepy.OAuthHandler(
consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(),
access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
)
)
try:
self.me = self.client.get_me().data
except Exception as e:
print('Did you setup secrets.ini?')
raise e
2022-09-27 15:09:09 -07:00
print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})')
2022-09-24 17:56:58 -07:00
2022-09-27 15:09:09 -07:00
## ---[COMMENT OUT WHEN NOT IN USE]---
async def nuke_tweets(self):
async def delete_tweet(id):
try:
self.client.delete_tweet(id)
except tweepy.TooManyRequests as e:
wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1
print(f'\thit rate limit deleting {id}, retrying in {wait_for} seconds...')
await asyncio.sleep(wait_for)
print('continuing...')
await delete_tweet(id)
print(f'Retrieving all of {self.me["username"]}\'s tweets...')
tweets = self.get_all_tweet_ids_from_user(self.me['id'])
print(f'Retrieved {len(tweets)} tweets.')
if not len(tweets) > 0:
print('No tweets obtained. Make sure the profile is public.')
return
print(f'Deleting {len(tweets)} tweets...')
deleted_count = 0
try:
for tweet in tweets:
print(f'deleted {deleted_count}/{len(tweets)}')
await delete_tweet(tweet.id)
await asyncio.sleep(0.5)
deleted_count += 1
except:
print('Unhandled error occurred while trying to delete tweets.')
traceback.print_exc()
print('Try running again.')
else:
print('Saul Gone')
def get_all_tweet_ids_from_user(self, user_id):
next_page_token = None
tokens_retrieved = 0
tweets_retrieved = 0
tweets = list()
while True:
print(f'Retrieved {tokens_retrieved} tokens so far...')
resp = self.client.get_users_tweets(
user_id, max_results=100, pagination_token=next_page_token,
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
tweet_fields=TwAPI.TWEET_FIELDS,
expansions=TwAPI.TWEET_EXPANSIONS
)
for tweet in resp.data:
tweets.append(tweet)
# update counters and pagination token
tweets_retrieved += resp.meta['result_count']
try:
next_page_token = resp.meta['next_token']
tokens_retrieved += 1
except KeyError:
print("next_token wasn't provided; we've reached the end!")
break # reached end of user's tweets
print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.')
return tweets
async def get_tweet_response(self, id, attempt = 0):
try:
2022-09-27 02:49:03 -07:00
twt = TwAPI.instance.client.get_tweet(
id,
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
tweet_fields=TwAPI.TWEET_FIELDS,
expansions=TwAPI.TWEET_EXPANSIONS
)
2022-09-27 02:49:03 -07:00
TwAPI.tweets_fetched += 1
return twt
except tweepy.TooManyRequests as e:
wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1
print(f'[{attempt}]\tget_tweet_response({id}):\n\thit rate limit after {TwAPI.tweets_fetched} fetches -- trying again in {wait_for} seconds...')
await asyncio.sleep(wait_for)
return await self.get_tweet_response(id, attempt=attempt+1)
2022-09-24 17:56:58 -07:00
2022-09-28 13:33:31 -07:00
async def post_tweet(self, text='', media_ids: list=None, reply_to_tweet: int=None):
2022-09-27 02:49:03 -07:00
try:
2022-09-28 13:33:31 -07:00
tweet = self.client.create_tweet(text=text, media_ids=None if media_ids == None else media_ids, in_reply_to_tweet_id=reply_to_tweet)
2022-09-27 02:49:03 -07:00
return tweet
except tweepy.TooManyRequests as e:
wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1
print(f'\thit rate limit -- attempting to create Tweet again in {wait_for} seconds...')
await asyncio.sleep(wait_for)
2022-09-28 13:33:31 -07:00
return await self.post_tweet(text=text, media_ids=media_ids, reply_to_tweet=reply_to_tweet)
2022-09-27 02:49:03 -07:00
async def get_ttweet_image_media_id(self, ttweet):
img = await util.create_ttweet_image(ttweet)
2022-09-27 02:49:03 -07:00
media = self.api.media_upload(img)
return media.media_id
# return True = successfully posted a single ttweet
# return False = did not post ttweet (duplicate)
2022-09-27 15:09:09 -07:00
async def post_ttweet(self, ttweet: tt.TalentTweet, is_catchup=False):
print(f'------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------')
2022-10-01 13:33:20 -07:00
REPLY = '{0} {1}replied to {2}!\n'
QUOTE_TWEET = '{0} {1}quote tweeted {2}!\n'
TWEET = '{0} {1}tweeted!\n'
RETWEET = '{0} {1}retweeted {2}!\n'
2022-09-27 02:49:03 -07:00
def create_text():
2022-09-27 22:04:26 -07:00
author_username = f'@/{util.get_username_local(ttweet.author_id)}'
mention_ids = set()
2022-09-27 15:09:09 -07:00
ret = str()
2022-10-01 13:33:20 -07:00
just = ''
2022-09-27 15:09:09 -07:00
if is_catchup:
2022-09-27 17:40:48 -07:00
ret += f'{ttweet.get_datetime_str()}\n'
2022-09-27 15:09:09 -07:00
pass
2022-10-01 13:33:20 -07:00
else:
just = 'just '
2022-09-27 17:40:48 -07:00
# Tweet types
2022-09-27 22:04:26 -07:00
if ttweet.rt_target is not None: # standalone tweet
2022-10-01 13:33:20 -07:00
ret += RETWEET.format(author_username, just, f'@/{util.get_username(ttweet.rt_author_id)}')
2022-09-27 22:04:26 -07:00
mention_ids.clear()
elif ttweet.reply_to is not None: # reply (w/ qrt; push it into mentions)
2022-10-01 13:33:20 -07:00
reply_username = f'@/{util.get_username(ttweet.reply_to)}'
ret += REPLY.format(author_username, just, reply_username)
2022-09-27 15:09:09 -07:00
2022-09-27 02:49:03 -07:00
mention_ids = set(ttweet.mentions)
mention_ids.add(ttweet.quote_retweeted)
try: mention_ids.remove(None)
except: pass
2022-09-27 15:09:09 -07:00
elif ttweet.quote_retweeted is not None: # standalone qrt
2022-10-01 13:33:20 -07:00
quoted_username = f'@/{util.get_username(ttweet.quote_retweeted)}'
ret += QUOTE_TWEET.format(author_username, just, quoted_username)
2022-09-27 15:09:09 -07:00
elif len(ttweet.mentions) > 0: # standalone tweet w/ mentions
2022-10-01 13:33:20 -07:00
ret += TWEET.format(author_username, just)
2022-09-27 15:09:09 -07:00
else:
raise ValueError(f'TalentTweet {ttweet.tweet_id} has insufficient other parties')
# mention line
if len(mention_ids) > 0:
2022-10-01 13:33:20 -07:00
mention_usernames = [f'@/{util.get_username(x)}' for x in mention_ids]
2022-09-27 02:49:03 -07:00
ret += (
2022-09-27 17:40:48 -07:00
'mentioning '
2022-09-27 15:09:09 -07:00
f'{" ".join(mention_usernames)}\n'
2022-09-27 02:49:03 -07:00
)
2022-11-22 01:44:59 -08:00
ret += '\n'
2022-12-09 01:57:38 -08:00
ret += '(this is a missed tweet)\n' if is_catchup else ''
2022-11-22 01:44:59 -08:00
ret += f'{util.ttweet_to_url(ttweet)}'
2022-09-27 15:09:09 -07:00
return ret
2022-09-27 02:49:03 -07:00
text = create_text()
2022-09-27 15:09:09 -07:00
try:
2022-09-29 07:44:21 +01:00
# media_ids = [await self.get_ttweet_image_media_id(ttweet)]
2022-10-01 13:33:20 -07:00
print('posting main tweet...', end='')
2022-09-27 15:09:09 -07:00
twt_resp = await self.post_tweet(text)
2022-10-01 13:33:20 -07:00
print('done')
2022-09-27 15:09:09 -07:00
twt_id = twt_resp.data['id']
2022-09-28 13:33:31 -07:00
# if ttweet.reply_to is not None:
# re_ttweet = tt.TalentTweet(tweet_id=ttweet.reply_to, author_id=)
# media_ids.insert(0, await self.get_ttweet_image_media_id())
try:
print('creating reply img...', end='')
media_ids = [await self.get_ttweet_image_media_id(ttweet)]
print('posting reply tweet...', end='')
await self.post_tweet(reply_to_tweet=twt_id, media_ids=media_ids,)
print('done')
except:
print('Had trouble posting reply image tweet.')
2022-09-27 15:09:09 -07:00
print('successfully posted ttweet!')
return True
except tweepy.Forbidden as e:
if 'duplicate content' in e.api_messages[0]:
print('Twitter says the TalentTweet is a duplicate; skipping error-free...')
return False
else:
raise e
2022-09-27 02:49:03 -07:00
2022-09-24 17:56:58 -07:00
2022-09-29 07:44:21 +01:00