clean up code, setup in ready-to-run state

This commit is contained in:
muskit
2023-08-18 01:34:25 -07:00
parent 79e5fca9cc
commit fe1749bbe0
13 changed files with 203 additions and 199 deletions
+28 -14
View File
@@ -7,25 +7,39 @@ Twitter bot that tracks cross-company interactions between the non-JP branches o
## `.env`
These need to be defined in a `.env` file at the project root (outside of `src`):
```
# Scweet (scraping)
SCWEET_EMAIL=
SCWEET_USERNAME=
SCWEET_PASSWORD=
# Twitter API bot keys (posting)
api_key=
api_secret=
oauth1_access_token=
oauth1_access_secret=
bearer_token=
### Scraper Credentials
To get around rate limitations imposed on users, we scrape with multiple accounts. Each account is defined in the file using the following format:
```
scraper_usernameX=twitter_username
scraper_passwordX=twitter_password
```
where `X` is a number starting from 0, increasing by 1 for each account added. For instance:
```
scraper_username0=
scraper_password0=
scraper_username1=
scraper_password1=
```
The first account (`scraper_username0` and `scraper_password0`) will be used to attempt scraping private accounts. Make sure this account follows any private accounts that you want to scrape!
### Twitter API Stuff
The following keys/tokens are used for the official API via `tweepy`. We mainly use these to just post tweets.
```
app_key=
app_secret=
user_token=
user_secret=
```
### Screenshot Cookie *(optional)*
This is the authentication token obtained from a browser when signed in on the Twitter website. It's only needed if you want to screenshot tweets from privated accounts. Make sure the token belongs to an account that follows desired private accounts! Maybe have it belong to `scraper_username0`?
```
web_auth_token=
```
## Running modes
The bot may run in these modes:
* Catch-up (`c`): intended to run only once, scan all accounts for cross-company tweets and post them. Terminate when done posting all.
- use `--auto-listen` to switch to listen mode when finished
* Listen (`l`): listens for tweets from list, sharing it if it's cross-company
* Pass no argument to run in listen mode, which scrapes all accounts in the *list* folder at an interval.
* Pass `--straight-to-queue` to process the queue first before attempting to scrape.
* Command-line (`cmd`): an interactive mode for manual control and debugging (drops into Python interpretor)
*Created for the spirit of entertainment and in the name of unity.*
+3 -3
View File
@@ -15,7 +15,7 @@
# --- [Ethyria] ---
1437952405283426310 MillieParfait
1437963160544284675 EnnaAlouette
1437959162651156484 NinaKosaka
1437959162651156484 NinaKosaka p
1437961007029227520 ReimuEndou
# --- [Luxiem]---
@@ -28,7 +28,7 @@
# --- [Noctyx] ---
1490867613915828224 alban_knox
1491195742123397124 uki_violeta
1492604168145539072 Yugo_Asuma
1492604168145539072 Yugo_Asuma p
1493392149664219138 Fulgur_Ovid
1493394108014292993 sonny_brisko
@@ -44,7 +44,7 @@
1589536631324692480 MelocoKyoran
1589524401170833409 HexHaywire
1589531775058968576 D_Dropscythe
1589539582399348738 ZaionLanZa
1589539582399348738 ZaionLanZa p
1591995159901663232 KotokaTorahime
1589791076709171201 Ver_Vermillion
+12 -1
View File
@@ -1,5 +1,8 @@
possible combinations which involve a "target cross-tweeter" B
[scraper rate limitations]
50 searches/pages every 15 minutes
- max 20 tweets per search
[possible combinations which involve a "target cross-tweeter" B]
A retweets B
- B's tweet may have cross-mentions (B1, B2, etc.)
- rt_author_id=B; rt_mentions=B1,B2,...
@@ -12,5 +15,13 @@ A quotes a tweet from B
A quotes a tweet mentioning B
- quote_retweeted=...; rt_mentions=B...
A replies to B
r = B
A replies to a tweet mentioning B
- r=...; rtm=B1,B2,...
-- NO --
A retweets a tweet that quotes a tweet mentioning B?
[potential code change]
rtm --> tgm (target tweet's mentions)
+4
View File
@@ -18,6 +18,10 @@ class AccountPool:
else:
break
def use_index(self, idx):
self.__idx = idx
return self.current()
def current(self):
if 0 <= self.__idx < len(self.__accounts):
return self.__accounts[self.__idx]
+38 -32
View File
@@ -17,13 +17,12 @@ from twapi import TwAPI
import talenttweet as tt
import ttweetqueue as ttq
PROGRAM_ARGS = None
safe_to_post_tweets = False
safe_to_post_tweets = True
errored = False
# Returns a list of sorted and filtered TalentTweets (should
# be equivalent to queue.txt)
async def get_cross_talent_tweets():
async def get_cross_tweets_online():
global safe_to_post_tweets
scraper = Scraper()
@@ -38,9 +37,9 @@ async def get_cross_talent_tweets():
# tweets = get_user_tweets(talent_id, since_date=queue.finished_user_dates.get(talent_id, None))
since_date = queue.finished_user_dates.get(talent_id, None)
ttweets = scraper.get_cross_ttweets_from_user(talent_username, since_date=since_date)
print(f'got {len(ttweets)} TalentTweets')
for ttweet in ttweets:
if ttweet.tweet_id not in queue.ttweets_dict \
and ttweet.tweet_id not in queue.finished_ttweets \
if ttweet.tweet_id not in queue.finished_ttweets \
and ttweet.is_cross_company():
queue.add_ttweet(ttweet)
except KeyboardInterrupt as e:
@@ -68,9 +67,9 @@ async def get_cross_talent_tweets():
# return False = errored or we posted at least one ttweet
# return True = we didn't post a single ttweet
async def process_queue() -> bool:
global PROGRAM_ARGS
global errored
WAIT_TIME = 60*3
WAIT_TIME = 60*15
ttweets_posted = 0
errored = False
@@ -81,13 +80,10 @@ async def process_queue() -> bool:
print('Posting queue is empty!')
return True
if PROGRAM_ARGS.announce_catchup:
TwAPI.instance.post_tweet(text=f'Starting to catch up through {queued_ttweets_count} logged tweets.')
try:
while not queue.is_empty():
ttweet = queue.get_next_ttweet()
tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet, is_catchup=True)
tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet)
print('running queue.good()...')
queue.good()
@@ -103,9 +99,6 @@ async def process_queue() -> bool:
print('Unhandled error occurred while posting tweets from queue.')
errored = True
traceback.print_exc()
else:
if PROGRAM_ARGS.announce_catchup:
await TwAPI.instance.post_tweet('Finished with catch-up tweets!')
if errored or ttweets_posted > 0:
return False
@@ -113,26 +106,39 @@ async def process_queue() -> bool:
# return True = no problems
# return False = issue occurred where we couldn't post all past tweets properly
async def run():
async def run(PROGRAM_ARGS):
global errored
global safe_to_post_tweets
queue = ttq.TalentTweetQueue.instance
while True:
await get_cross_talent_tweets()
print(f'{queue.get_count()} cross-company tweets to attempt sharing.')
try:
if safe_to_post_tweets:
if await process_queue():
print('Posted no new tweets; we\'re caught up!')
return True
else:
print('Tweets were not retrieved cleanly.')
async def queue_loop():
while True:
print(f'{queue.get_count()} cross-company tweets to attempt sharing.')
try:
if safe_to_post_tweets:
if await process_queue():
print('Posted no new tweets; we\'re caught up!')
return True
else:
print('Tweets were not retrieved cleanly.')
return False
except KeyboardInterrupt:
print('Interrupting queue processing...')
return False
except:
print('Unhandled error occurred while running catch up in posting phase.')
traceback.print_exc()
return False
if errored:
return False
except:
print('Unhandled error occurred while running catch up in posting phase.')
traceback.print_exc()
return False
if errored:
return False
await get_cross_tweets_online()
if PROGRAM_ARGS.straight_to_queue:
print('Processing queue first before pulling tweets...')
return await queue_loop()
else:
await get_cross_tweets_online()
return await queue_loop()
+4 -4
View File
@@ -9,13 +9,13 @@ import catchup
errors_encountered = 0
def run():
def run(PROGRAM_ARGS):
global errors_encountered
while True:
try:
asyncio.run(catchup.run())
print('Sleeping for 30 minutes...')
sleep(1800) # run every half-hour
asyncio.run(catchup.run(PROGRAM_ARGS))
print('Sleeping for 10 minutes...')
sleep(60*10) # run every 10 minutes
except KeyboardInterrupt:
print('Interrupt signal received. Exiting listen mode.')
print(f'{errors_encountered} errors encountered throughout session.')
+11 -24
View File
@@ -15,44 +15,34 @@ from twapi import TwAPI
PROGRAM_ARGS = None
MODES_HELP_STR = '''mode to run the bot at:
l,listen: listen for new tweets from all accounts; will not terminate unless error occurs
c,catchup: scan all tweets from all accounts; will terminate when done
d,delete-all: delete all tweets on account provided by secrets.ini; make sure the function is uncommented in twapi.py'''
<blank> scrape accounts in lists and post cross-company tweets if relevant
cmd drop into Python interpretor with access to initialized variables'''
def init_argparse():
p = argparse.ArgumentParser(description='Twitter bot that follows interactions between Nijisanji EN/ID and hololive EN/ID members.', formatter_class=RawTextHelpFormatter)
p.add_argument('mode', nargs='?', \
help=MODES_HELP_STR)
p.add_argument('--no-delay', action='store_true', help='In self-destruct mode, clear tweets without safety waiting.')
p.add_argument('mode', nargs='?', help=MODES_HELP_STR)
p.add_argument('--no-listen', action='store_true', help='Run one scraping-posting cycle without waiting to run again.')
p.add_argument('--straight-to-queue', action='store_true', help='Go through queue first before attempting to pull tweets.')
return p
def command_line():
# TODO (extra): implement command line mode for manually controlling the bot
print('Shell coming soon. For now, here\'s a Python interpretor.')
print('Here\'s a Python interpretor.')
code.interact(local=globals())
pass
async def self_destruct():
if not PROGRAM_ARGS.no_delay:
print('\033[31;6m-----DELETING ALL TWEETS IN 10 SECONDS!! PRESS CTRL+C TO CANCEL.-----\033[0m')
await asyncio.sleep(10)
await TwAPI.instance.nuke_tweets()
async def async_main():
global PROGRAM_ARGS
if PROGRAM_ARGS.mode == None:
await catchup.run()
if PROGRAM_ARGS.no_listen:
await catchup.run(PROGRAM_ARGS)
else:
listen.run(PROGRAM_ARGS)
return
mode = PROGRAM_ARGS.mode.lower()
if mode in ['d', 'delete-all']:
print('WARNING: SELF-DESTRUCT MODE')
await self_destruct()
elif mode == 'cmd':
if mode == 'cmd':
command_line()
elif mode in ['l', 'listen']:
listen.run()
else:
print('\nunknown mode. run with no arguments or -h for help and modes')
@@ -66,8 +56,6 @@ def main():
PROGRAM_ARGS = parser.parse_args()
## We expect to run in some mode now.
# Initialize shared API instance
TwAPI()
@@ -78,7 +66,6 @@ def main():
ttq.TalentTweetQueue()
## Asynchronous execution
print('beginning async main')
nest_asyncio.apply()
asyncio.run(async_main())
+23 -9
View File
@@ -20,8 +20,12 @@ class Scraper:
self.__account = AccountPool()
self.try_login()
def try_login(self) -> bool:
acc = self.__account.next()
def try_login(self, account_idx: int = None) -> bool:
if account_idx is not None:
acc = self.__account.use_index(account_idx)
else:
acc = self.__account.next()
if acc is not None:
name = acc[0]
print(f"using {name}")
@@ -65,9 +69,10 @@ class Scraper:
# recover lost info
if tweet.is_retweet:
if tweet.retweeted_tweet is None:
print(f'{tweet.author.username}/{tweet.id} is missing the RT! Recovering...')
tweet.retweeted_tweet = self.app.tweet_detail(str(tweet.id)).retweeted_tweet
if tweet.retweeted_tweet.author is None:
print(f'{tweet.author.username}/{tweet.id} is missing the RT! It\'s probably nothing...')
# tweet.retweeted_tweet = self.app.tweet_detail(str(tweet.id)).retweeted_tweet
tweet.is_retweet = False
elif tweet.retweeted_tweet.author is None:
print(f'WARNING: {tweet.author.username}/{tweet.id} is missing the RT author! Recovering details...')
tweet.retweeted_tweet = self.app.tweet_detail(tweet.retweeted_tweet.id)
@@ -78,17 +83,20 @@ class Scraper:
tweet.is_quoted = False
elif tweet.quoted_tweet.author is None:
print(f'WARNING: {tweet.author.username}/{tweet.id} is missing the QRT author! Recovering details...')
tweet.quoted_tweet= self.app.tweet_detail(tweet.quoted_tweet.id)
tweet.quoted_tweet = self.app.tweet_detail(tweet.quoted_tweet.id)
# fix reply if it exists
# if tweet.is_reply and tweet.replied_to is None:
# tweet.replied_to = self.app.tweet_detail(tweet._original_tweet['in_reply_to_status_id_str'])
# tweet.replied_to = self.app.tweet_detail(tweet.original_tweet['in_reply_to_status_id_str'])
tweets.append(tweet)
if not reached_backdate and int(tweet.author.id) == uid and tweet.date <= since:
print("reached backdate")
reached_backdate = True
if uid in talent_lists.privated_accounts:
self.try_login(0)
while not reached_backdate:
try:
# uts = self.app.get_tweets(uid, replies=True, cursor=cur)
@@ -110,8 +118,14 @@ class Scraper:
cur = search.cursor
except UnknownError:
print("UnknownError occurred, probably rate-limited")
# traceback.print_exc()
if not self.try_login():
if uid in talent_lists.privated_accounts:
print("sticking pvt-accessible account. sleeping for 2 minutes...")
sleep(120)
print()
l = self.try_login(0)
else:
l = self.try_login()
if not l:
print("sleeping for 2 minutes...")
sleep(120)
print()
+10 -6
View File
@@ -6,6 +6,7 @@ niji_en: dict[int, str] = dict()
niji_exid: dict[int, str] = dict()
talents: dict[int, str] = dict()
talents_company: dict[int, str] = dict()
privated_accounts: dict[int, str] = dict()
test_talents = dict()
@@ -16,12 +17,15 @@ def __create_dict(file, _dict, company):
with open(file, 'r') as f:
for line in f:
words = line.split()
if len(words) == 2 and line[0] != '#':
id, name = line.split()
if len(words) >= 2 and line[0] != '#':
t = line.split()
id, name = int(t[0]), t[1]
# name = f'{util.get_username_online(id, default=name)}' # attempt to get updated name
talents[int(id)] = name
_dict[int(id)] = name
talents_company[int(id)] = company
talents[id] = name
_dict[id] = name
talents_company[id] = company
if len(words) > 2 and words[2] == 'p':
privated_accounts[id] = name
def init():
global holo_en
global holo_id
@@ -36,7 +40,7 @@ def init():
# nijiEN
__create_dict(f'{util.get_project_dir()}/lists/nijien.txt', niji_en, 'nijiEN')
# nijiexID
__create_dict(f'{util.get_project_dir()}/lists/nijiexid.txt', niji_exid, 'nijiex-ID')
__create_dict(f'{util.get_project_dir()}/lists/nijiexid.txt', niji_exid, 'nijiex\'ID')
# TODO: nijiex-KR
test_talents = holo_en
+45 -30
View File
@@ -5,7 +5,8 @@ import platform
import pytz
from tweety.types import *
from talent_lists import is_cross_company
# from talent_lists import is_cross_company, talents
import talent_lists as tl
import util
class TalentTweet:
@@ -94,7 +95,7 @@ class TalentTweet:
date_time=tweety.date, text=tweety.text,
mrq=(
[int(x.id) for x in tweety.user_mentions],
int(tweety._original_tweet['in_reply_to_user_id_str']) if tweety.is_reply else None,
int(tweety.original_tweet['in_reply_to_user_id_str']) if tweety.is_reply else None,
int(tweety.quoted_tweet.author.id) if tweety.quoted_tweet is not None else None
),
rt_author_id=tweety.retweeted_tweet.author.id if tweety.is_retweet else None,
@@ -108,15 +109,22 @@ class TalentTweet:
self.date_time = date_time
self.text = text
# filter twitter users to only be cross-company
self.mentions = {x for x in mrq[0] if is_cross_company(author_id, x)}
self.reply_to = mrq[1] if mrq[1] is not None and is_cross_company(author_id, mrq[1]) else None
self.quote_tweeted = mrq[2]
# filter users to only be talents
self.mentions = {x for x in mrq[0] if x in tl.talents}
self.rt_mentions = {x for x in rt_mentions if x in tl.talents}
# rt'd/quoted tweet contains cross-company names?
self.rt_mentions = {x for x in rt_mentions if is_cross_company(author_id, x)}
self.reply_to = mrq[1]
self.quote_tweeted = mrq[2]
self.rt_author_id = rt_author_id
try: self.mentions.remove(self.reply_to)
except: pass
# -1 if user is not in company
self.reply_to = self.reply_to if self.reply_to is None or self.reply_to in tl.talents else -1
self.quote_tweeted = self.quote_tweeted if self.quote_tweeted is None or self.quote_tweeted in tl.talents else -1
self.rt_author_id = self.rt_author_id if self.rt_author_id is None or self.rt_author_id in tl.talents else -1
# all users involved except for the author
self.all_parties = {self.reply_to, self.quote_tweeted, rt_author_id}
self.all_parties.update(self.mentions, self.rt_mentions)
@@ -124,10 +132,6 @@ class TalentTweet:
except: pass
try: self.all_parties.remove(self.author_id)
except: pass
# clean up mentions
try: self.mentions.remove(self.reply_to)
except: pass
def __repr__(self) -> str:
@@ -146,11 +150,11 @@ class TalentTweet:
)
def url(self):
return f'https://www.twitter.com/{self.username}/status/{self.tweet_id}'
return util.get_tweet_url(self.tweet_id, self.username)
def is_cross_company(self):
for other_id in self.all_parties:
if is_cross_company(self.author_id, other_id):
if tl.is_cross_company(self.author_id, other_id):
return True
return False
@@ -167,14 +171,15 @@ class TalentTweet:
unpad = '#' if platform.system() == 'Windows' else '-'
return self.date_time.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)')
def announce_text(self, is_catchup=False):
def announce_text(self):
# templates
REPLY = '{0} replied to {1}!'
TWEET = '{0} tweeted mentioning {1}!'
REPLY = '{0} replied to {1}!'
REPLY_TO_MENTION_B = '{0} replied to a tweet{1}mentioning {1}!' #########################
RETWEET = '{0} retweeted {1}!'
RETWEET_MENTIONS_B = '{0} shared a tweet mentioning {1}!'
RETWEET_MENTIONS_B = '{0} shared a tweet{1}mentioning {2}!' #########################
QUOTE_TWEET = '{0} quote tweeted {1}!'
QUOTE_TWEET_MENTIONS_B = '{0} quoted a tweet mentioning {1}!'
QUOTED_TWEET_MENTIONS_B = '{0} quoted a tweet{1}mentioning {2}!' #########################
author_username = f'@/{util.get_username_with_company(self.author_id)}'
ret = str()
@@ -184,28 +189,37 @@ class TalentTweet:
except: pass
mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids]
if is_catchup:
ret += f'{self.get_datetime_str()}\n'
pass
rt_mention_names = [util.get_username_with_company(x) for x in self.rt_mentions]
def rtm_msg(TEMPLATE: str, rtm_author_username: str):
if self.rt_author_id != -1: # rtm tweet is not from talent; rtm should be everyone
rtm_names = [f'@/{util.get_username_with_company(x)}' for x in self.rt_mentions]
between = f' from {rtm_author_username} '
ret += TEMPLATE.format(author_username, between, ", ".join(rtm_names))
else: # rtm tweet is from a talent; rtm should just be cross company
rtm_names = [f'@/{util.get_username_with_company(x)}' for x in self.rt_mentions if tl.is_cross_company(self.author_id, x)]
ret += TEMPLATE.format(author_username, ' ', ", ".join(rtm_names))
# Tweet types
if self.rt_author_id is not None: # retweet
rt_username = f'@/{util.get_username_with_company(self.rt_author_id)}' if self.rt_author_id != -1 else None
if len(self.rt_mentions) > 0:
ret += RETWEET_MENTIONS_B.format(author_username, ", ".join(rt_mention_names))
rtm_msg(RETWEET_MENTIONS_B, rt_username)
else:
ret += RETWEET.format(f'{author_username}', f'@/{util.get_username_with_company(self.rt_author_id)}')
ret += RETWEET.format(author_username, rt_username)
elif self.reply_to is not None: # reply
reply_username = f'@/{util.get_username_with_company(self.reply_to)}'
ret += REPLY.format(author_username, reply_username)
elif self.quote_tweeted is not None: # qrt
quoted_username = f'@/{util.get_username_with_company(self.quote_tweeted)}'
reply_username = f'@/{util.get_username_with_company(self.reply_to)}' if self.reply_to != -1 else None
if len(self.rt_mentions) > 0:
ret += QUOTE_TWEET_MENTIONS_B.format(author_username, ", ".join(rt_mention_names))
rtm_msg(REPLY_TO_MENTION_B, reply_username)
else:
ret += REPLY.format(author_username, reply_username)
elif self.quote_tweeted is not None: # qrt
quoted_username = f'@/{util.get_username_with_company(self.quote_tweeted)}' if self.quote_tweeted != -1 else None
if len(self.rt_mentions) > 0:
rtm_msg(QUOTED_TWEET_MENTIONS_B, quoted_username)
else:
ret += QUOTE_TWEET.format(author_username, quoted_username)
elif len(self.mentions) > 0: # standalone tweet
ret += TWEET.format(author_username, ", ".join(mention_usernames))
f'[{self.get_datetime_str()}]\n'
return ret
else:
raise ValueError(f'TalentTweet {self.tweet_id} has insufficient other parties')
@@ -217,4 +231,5 @@ class TalentTweet:
f'{", ".join(mention_usernames)}'
)
ret += f'\n\n{self.get_datetime_str()}'
return ret
+7 -2
View File
@@ -74,8 +74,8 @@ class TalentTweetQueue:
return self.get_count() <= 0
def add_ttweet(self, ttweet):
self.__sorted = False
self.ttweets_dict[ttweet.tweet_id] = ttweet
self.__sorted = False
def get_ttweet(self, id):
return self.ttweets_dict[id]
@@ -84,7 +84,10 @@ class TalentTweetQueue:
self.is_good = False
if os.path.exists(self.current_ttweet_path):
with open(self.current_ttweet_path, 'r') as f:
return tt.TalentTweet.deserialize(f.readline())
ttweet = tt.TalentTweet.deserialize(f.readline())
if ttweet.tweet_id in self.ttweets_dict:
self.ttweets_dict.pop(ttweet.tweet_id)
return ttweet
self.__sort_ttweets_dict()
key = list(self.ttweets_dict.keys())[0]
@@ -109,6 +112,7 @@ class TalentTweetQueue:
# overwrite queue.txt
def save_file(self):
print('saving file...', end='')
shutil.copyfile(self.queue_path, self.queue_backup_path)
self.__sort_ttweets_dict()
with open(self.queue_path, 'w') as f:
@@ -121,6 +125,7 @@ class TalentTweetQueue:
# write sorted ttweets
for ttweet in self.ttweets_dict.values():
f.write(ttweet.serialize() + '\n')
print('done')
def add_finished_tweet(self, id):
self.finished_ttweets.append(id)
+12 -51
View File
@@ -76,54 +76,18 @@ class TwAPI:
consumer_key=creds['app_key'], consumer_secret=creds['app_secret'],
access_token=creds['user_token'], access_token_secret=creds['user_secret']
)
# self.api = tweepy.API(
# auth=tweepy.OAuthHandler(
# consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(),
# access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
# )
# )
self.api = tweepy.API(
auth=tweepy.OAuthHandler(
consumer_key=creds['app_key'], consumer_secret=creds['app_secret'],
access_token=creds['user_token'], access_token_secret=creds['user_secret']
)
)
# try:
# self.me = self.client.get_me(wait_on_rate_limit=True).data
# except Exception as e:
# print('Failed to login!')
# raise e
# print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})')
## ---[COMMENT OUT WHEN NOT IN USE]---
# async def nuke_tweets(self):
# async def delete_tweet(id):
# try:
# self.client.delete_tweet(id)
# except tweepy.TooManyRequests as e:
# wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1
# print(f'\thit rate limit deleting {id}, retrying in {wait_for} seconds...')
# await asyncio.sleep(wait_for)
# print('continuing...')
# await delete_tweet(id)
# print(f'Retrieving all of {self.me["username"]}\'s tweets...')
# tweets = self.get_all_tweet_ids_from_user(self.me['id'])
# print(f'Retrieved {len(tweets)} tweets.')
# if not len(tweets) > 0:
# print('No tweets obtained. Make sure the profile is public.')
# return
# print(f'Deleting {len(tweets)} tweets...')
# deleted_count = 0
# try:
# for tweet in tweets:
# print(f'deleted {deleted_count}/{len(tweets)}')
# await delete_tweet(tweet.id)
# await asyncio.sleep(0.5)
# deleted_count += 1
# except:
# print('Unhandled error occurred while trying to delete tweets.')
# traceback.print_exc()
# print('Try running again.')
# else:
# print('Saul Gone')
try:
self.me = self.client.get_me().data
print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})')
except:
pass
async def post_tweet(self, text='', media_ids: list=None, reply_to_tweet: int=None, quote_tweet_id: int=None):
try:
@@ -142,7 +106,7 @@ class TwAPI:
# return True = successfully posted a single ttweet
# return False = did not post ttweet (duplicate)
async def post_ttweet(self, ttweet: tt.TalentTweet, is_catchup=False, dry_run=False):
async def post_ttweet(self, ttweet: tt.TalentTweet, dry_run=False):
print(f'------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------')
text = ttweet.announce_text()
@@ -167,9 +131,6 @@ class TwAPI:
twt_resp = await self.post_tweet(text, quote_tweet_id=ttweet.tweet_id)
print('done')
twt_id = twt_resp.data['id']
# if ttweet.reply_to is not None:
# re_ttweet = tt.TalentTweet(tweet_id=ttweet.reply_to, author_id=)
# media_ids.insert(0, await self.get_ttweet_image_media_id())
try:
print('creating reply img...', end='')
+6 -23
View File
@@ -4,11 +4,12 @@ import os
import sys
import traceback
from datetime import datetime
from dotenv import dotenv_values
import tweepy
import pytz
import twint
#import twapi
import twapi
from tweetcapture import TweetCapture
from recrop import fix_aspect_ratio
@@ -53,11 +54,12 @@ def get_key_from_value(d: dict, val):
async def create_ttweet_image(ttweet):
tc = TweetCapture()
tc.cookies = [{'name': 'auth_token', 'value': dotenv_values()['web_auth_token']}]
if 'linux' in sys.platform:
# Linux chromedriver path
tc.driver_path = '/usr/bin/chromedriver'
filename = f'{get_project_dir()}/img.png'
url = ttweet_to_url(ttweet)
url = ttweet.url()
img = None
print(url)
try: os.remove(filename)
@@ -66,7 +68,7 @@ async def create_ttweet_image(ttweet):
img = await tc.screenshot(
url=url,
path=filename,
mode=4,
mode=0,
night_mode=1,
show_parent_tweets=True
)
@@ -80,26 +82,7 @@ async def create_ttweet_image(ttweet):
return img
def get_tweet_url(id, username):
return f'https://twitter.com/{username}/status/{id}'
def ttweet_to_url(ttweet):
username = get_username(ttweet.author_id)
return get_tweet_url(ttweet.tweet_id, username)
# twint
# May not work with short user IDs (ie. 1354241437)
# def get_username_online(id, default=None):
# c = twint.Config()
# c.User_id = id
# c.Store_object = True
# c.Hide_output = True
# try:
# twint.output.users_list.clear()
# twint.run.Lookup(c)
# user = twint.output.users_list[0]
# return user.username
# except:
# return str(default) if default is not None else f'{id}'
return f'https://www.twitter.com/{username}/status/{id}'
## Attempt to pull username from local; pull from online if doesn't exist.
def get_username(id):