wrap up catch-up mode, add tweets nuker

This commit is contained in:
muskit
2022-09-27 15:09:09 -07:00
committed by msk
parent 07e7e28dcb
commit 4f53bda1f8
6 changed files with 154 additions and 33 deletions
+110 -15
View File
@@ -1,8 +1,9 @@
import asyncio
import datetime
import traceback
import tweepy
from tweetcapture import TweetCapture
import twint
import api_secrets
import talenttweet as tt
@@ -67,7 +68,73 @@ class TwAPI:
access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
)
)
self.me = self.client.get_me().data
print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})')
## ---[COMMENT OUT WHEN NOT IN USE]---
async def nuke_tweets(self):
async def delete_tweet(id):
try:
self.client.delete_tweet(id)
except tweepy.TooManyRequests as e:
wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1
print(f'\thit rate limit deleting {id}, retrying in {wait_for} seconds...')
await asyncio.sleep(wait_for)
print('continuing...')
await delete_tweet(id)
print(f'Retrieving all of {self.me["username"]}\'s tweets...')
tweets = self.get_all_tweet_ids_from_user(self.me['id'])
print(f'Retrieved {len(tweets)} tweets.')
if not len(tweets) > 0:
print('No tweets obtained. Make sure the profile is public.')
return
print(f'Deleting {len(tweets)} tweets...')
deleted_count = 0
try:
for tweet in tweets:
print(f'deleted {deleted_count}/{len(tweets)}')
await delete_tweet(tweet.id)
await asyncio.sleep(0.5)
deleted_count += 1
except:
print('Unhandled error occurred while trying to delete tweets.')
traceback.print_exc()
print('Try running again.')
else:
print('Saul Gone')
def get_all_tweet_ids_from_user(self, user_id):
next_page_token = None
tokens_retrieved = 0
tweets_retrieved = 0
tweets = list()
while True:
print(f'Retrieved {tokens_retrieved} tokens so far...')
resp = self.client.get_users_tweets(
user_id, max_results=100, pagination_token=next_page_token,
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
tweet_fields=TwAPI.TWEET_FIELDS,
expansions=TwAPI.TWEET_EXPANSIONS
)
for tweet in resp.data:
tweets.append(tweet)
# update counters and pagination token
tweets_retrieved += resp.meta['result_count']
try:
next_page_token = resp.meta['next_token']
tokens_retrieved += 1
except KeyError:
print("next_token wasn't provided; we've reached the end!")
break # reached end of user's tweets
print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.')
return tweets
async def get_tweet_response(self, id, attempt = 0):
try:
twt = TwAPI.instance.client.get_tweet(
@@ -84,7 +151,7 @@ class TwAPI:
await asyncio.sleep(wait_for)
return await self.get_tweet_response(id, attempt=attempt+1)
async def post_tweet(self, text, media_id=None, reply_to_tweet: int=None):
async def post_tweet(self, text='', media_id=None, reply_to_tweet: int=None):
try:
tweet = self.client.create_tweet(text=text, media_ids=None if media_id == None else [media_id], in_reply_to_tweet_id=reply_to_tweet)
return tweet
@@ -92,42 +159,70 @@ class TwAPI:
wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1
print(f'\thit rate limit -- attempting to create Tweet again in {wait_for} seconds...')
await asyncio.sleep(wait_for)
return await self.post_tweet(text=text, media_ids=[media_id])
return await self.post_tweet(text=text, media_id=media_id, reply_to_tweet=reply_to_tweet)
async def get_ttweet_image_media_id(self, ttweet):
img = await util.create_ttweet_image(ttweet)
media = self.api.media_upload(img)
return media.media_id
async def post_ttweet(self, ttweet: tt.TalentTweet):
async def post_ttweet(self, ttweet: tt.TalentTweet, is_catchup=False):
print(f'------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------')
REPLY = '{0} replied to {1}!\n'
MENTION = '{0} mentioned {1}!\n'
QUOTE_TWEET = '{0} quote tweeted {1}!\n'
MENTION = '{0} tweeted with '
def create_text():
if ttweet.reply_to is not None:
author_username = f'@/{util.get_username_online(ttweet.author_id)}'
author_username = f'@/{util.get_username_online(ttweet.author_id)}'
ret = str()
if is_catchup:
# ret += '[catch-up tweet]\n'
pass
ret += f'{ttweet.get_datetime_str()}\n'
if ttweet.reply_to is not None: # reply (w/ qrt; push it into mentions)
reply_username = f'@/{util.get_username_online(ttweet.reply_to)}'
ret += REPLY.format(author_username, reply_username)
mention_ids = set(ttweet.mentions)
mention_ids.add(ttweet.quote_retweeted)
try: mention_ids.remove(None)
except: pass
elif ttweet.quote_retweeted is not None: # standalone qrt
quoted_username = f'@/{util.get_username_online(ttweet.quote_retweeted)}'
ret += QUOTE_TWEET.format(author_username, quoted_username)
elif len(ttweet.mentions) > 0: # standalone tweet w/ mentions
ret += MENTION.format(author_username)
else:
raise ValueError(f'TalentTweet {ttweet.tweet_id} has insufficient other parties')
# mention line
if len(mention_ids) > 0:
mention_usernames = [f'@/{util.get_username_online(x)}' for x in mention_ids]
ret = REPLY.format(author_username, reply_username)
ret += (
'mentions '
f'{" ".join(mention_usernames)}'
f'\n{util.ttweet_to_url(ttweet)}'
f'{" ".join(mention_usernames)}\n'
)
return ret
ret += f'\n{util.ttweet_to_url(ttweet)}'
return ret
img_media_id_task = asyncio.create_task(self.get_ttweet_image_media_id(ttweet))
text = create_text()
media_id = await img_media_id_task
twt_resp = await self.post_tweet(text)
twt_id = twt_resp.data['id']
await self.post_tweet(text='Image backup', reply_to_tweet=twt_id, media_id=media_id,)
try:
print('posting main tweet')
twt_resp = await self.post_tweet(text)
twt_id = twt_resp.data['id']
print('posting reply tweet')
await self.post_tweet(reply_to_tweet=twt_id, media_id=media_id,)
print('successfully posted ttweet!')
return True
except tweepy.Forbidden as e:
if 'duplicate content' in e.api_messages[0]:
print('Twitter says the TalentTweet is a duplicate; skipping error-free...')
return False
else:
raise e