make catch-up mode to work better (+graceful exit)

This commit is contained in:
muskit
2022-09-28 20:00:02 -07:00
parent d2fc4a4d44
commit 82dbaabd6d
6 changed files with 165 additions and 124 deletions
+57 -121
View File
@@ -15,23 +15,11 @@ from util import *
from talent_lists import *
from twapi import TwAPI
import talenttweet as tt
import ttweetqueue as ttq
PROGRAM_ARGS = None
safe_to_post_tweets = True
def write_user_timestamp(user_id, file, timestamp = None, error = False):
if timestamp is None:
timestamp = datetime.datetime.now().timestamp()
file.write(f'# {user_id} {timestamp if not error else "-1"}\n')
pass
def get_queue_path():
return f'{util.get_project_dir()}/queue.txt'
def get_local_queue():
# f = open(os.path.join(get_project_dir(), 'queue.txt'))
pass
errored = False
## Returns the ID of all tweets (up to limit) from a user ID.
def get_user_tweets(id, since_timestamp=None, limit=None):
@@ -51,6 +39,7 @@ def get_user_tweets(id, since_timestamp=None, limit=None):
twint.run.Search(c)
except:
print(f'Had trouble getting tweets from {user_str}')
safe_to_post_tweets = False
traceback.print_exc()
for twt in tweets:
@@ -60,114 +49,74 @@ def get_user_tweets(id, since_timestamp=None, limit=None):
print(f'Scraped {len(tweets)} tweets, {qrt_count} of which are quote tweets.')
return tweets
# Returns dict of accounts that successfully caught up.
# LINE FORMAT: "# {user_id} {status_num} {UNIX_timestamp}
def get_finished_user_timestamps(queue_file):
results = dict()
for line in queue_file:
tokens = line.split()
if len(tokens) == 0: continue
if tokens[0][0] != '#':
print(f'{line} is our stopper!')
# reached end of accounts list
break
if tokens[2] != '-1':
results[int(tokens[1])] = float(tokens[2])
return results
def get_user_timestamps_str(queue_file):
results = str()
for line in queue_file:
tokens = line.split()
if len(tokens) != 3 or tokens[0][0] != '#':
# reached end of accounts list
break
results += f'{line}\n'
return results[:-1]
# If queue.txt doesn't exist, creates and populates it.
# Returns a list of sorted and filtered TalentTweets (should
# be equivalent to queue.txt)
async def get_cross_talent_tweets(queue_path):
finished_user_timestamps = dict()
ttweets_dict = dict()
async def get_cross_talent_tweets():
posted_ttweets = set() # TODO: don't add TTweet to ttweets_dict if its id exists in posted_ttweets
global safe_to_post_tweets
# Populate structures with existing data from queue.txt
try:
with open(queue_path, 'r') as f:
finished_user_timestamps = get_finished_user_timestamps(f)
with open(queue_path, 'r') as f: # reset seek head
# Get existing queued TalentTweets
for line in f:
tokens = line.split()
if len(tokens) == 0 or tokens[0][0] == '#':
continue
ttweet = tt.TalentTweet.deserialize(line)
ttweets_dict[ttweet.tweet_id] = ttweet
print(f'Found {len(finished_user_timestamps)} scraped accounts and {len(ttweets_dict)} tweets.')
except FileNotFoundError:
print('queue.txt not found.')
queue = ttq.TalentTweetQueue.instance
# Begin getting tweets from online
with open(queue_path, 'w') as f:
print('Pulling tweets from online!')
try:
for i, (talent_id, talent_username) in enumerate(talent_lists.talents.items()):
print(f'[{i+1}/{len(talent_lists.talents)}] {talent_username}-----------------------------------')
try:
# tweets = get_user_tweets(talent_id, since_timestamp=1663698621) # shorten test runs
tweets = get_user_tweets(talent_id, since_timestamp=finished_user_timestamps.get(talent_id, None))
for tweet in tweets:
if tweet.id not in ttweets_dict:
ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet)
if ttweet.is_cross_company():
ttweets_dict[ttweet.tweet_id] = ttweet
except:
print('Error occurred processing tweet data.')
safe_to_post_tweets = False
print(traceback.format_exc())
write_user_timestamp(user_id=talent_id, file=f, error=True)
else:
write_user_timestamp(user_id=talent_id, file=f)
f.write('\n')
ttweets_dict = dict(sorted(ttweets_dict.items()))
for ttweet in ttweets_dict.values():
f.write(f'{ttweet.serialize()}\n')
except:
print('Unhandled error occurred while pulling tweets.')
traceback.print_exc()
print('Saving queue.txt and exiting.')
safe_to_post_tweets = False
print('Pulling tweets from online!')
try:
for i, (talent_id, talent_username) in enumerate(talent_lists.talents.items()):
print(f'[{i+1}/{len(talent_lists.talents)}] {talent_username}-----------------------------------')
try:
tweets = get_user_tweets(talent_id, since_timestamp=queue.finished_user_timestamps.get(talent_id, None))
for tweet in tweets:
if tweet.id not in queue.ttweets_dict:
ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet)
if ttweet.is_cross_company():
queue.add_ttweet(ttweet)
except:
print('Error occurred processing tweet data.')
safe_to_post_tweets = False
print(traceback.format_exc())
queue.finished_user_timestamps[talent_id] = -1
else:
queue.finished_user_timestamps[talent_id] = util.get_current_timestamp()
except:
print('Unhandled error occurred while pulling tweets.')
traceback.print_exc()
safe_to_post_tweets = False
else:
print('Successfully saved all tweets from online!')
queue.save_file()
return ttweets_dict
return queue.get_ttweets_dict()
# return False = errored or we posted at least one ttweet
# return True = we didn't post a single ttweet
async def process_queue(ttweets_dict: dict) -> bool:
async def process_queue() -> bool:
global PROGRAM_ARGS
global errored
WAIT_TIME = 30
ttweets_posted = 0
errored = False
if len(ttweets_dict) == 0: return ttweets_posted
queue = ttq.TalentTweetQueue.instance
if len(queue.ttweets_dict) == 0: return ttweets_posted
if PROGRAM_ARGS.announce_catchup:
TwAPI.instance.post_tweet(text=f'Starting to catch up through {len(ttweets_dict)} logged tweets.')
try:
while len(ttweets_dict) > 0:
key = list(ttweets_dict.keys())[0]
ttweet = ttweets_dict[key]
if await TwAPI.instance.post_ttweet(ttweet, is_catchup=True):
while len(queue.ttweets_dict) > 0:
key = list(queue.ttweets_dict.keys())[0]
ttweet = queue.ttweets_dict[key]
queue.good = False
tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet, is_catchup=True)
queue.ttweets_dict.pop(key)
if tweet_was_successful:
print('saving new queue...')
queue.good = True
queue.save_file()
ttweets_posted += 1
print(f'resting for {WAIT_TIME}s...')
await asyncio.sleep(WAIT_TIME)
ttweets_dict.pop(key)
# TODO: add ttweet.tweet_id to some success list
if len(queue.ttweets_dict) > 0:
print(f'resting for {WAIT_TIME}s...')
await asyncio.sleep(WAIT_TIME)
except:
print('Unhandled error occurred while posting tweets from queue.')
errored = True
@@ -175,14 +124,6 @@ async def process_queue(ttweets_dict: dict) -> bool:
else:
if PROGRAM_ARGS.announce_catchup:
await TwAPI.instance.post_tweet('Finished with catch-up tweets!')
print('Updating what\'s left in ttweet_dict to queue.txt.')
with open(get_queue_path(), 'r') as f:
user_timestamps_str = get_user_timestamps_str(f)
with open(get_queue_path(), 'w') as f:
f.write(user_timestamps_str + '\n\n')
for ttweet in ttweets_dict.values():
f.write(f'{ttweet.serialize()}\n')
if errored or ttweets_posted > 0:
return False
@@ -192,30 +133,22 @@ async def process_queue(ttweets_dict: dict) -> bool:
# return False = issue occurred where we couldn't post all past tweets properly
async def run(program_args):
global PROGRAM_ARGS
global errored
global safe_to_post_tweets
PROGRAM_ARGS = program_args
# in case we we experience failure and we're left with blank queue.txt
# TODO: create TweetQueue class to organize file IO better; move all backup operations to there
queue_backup = os.path.join(util.get_project_dir(), 'queue_in_progress.txt')
queue_path = get_queue_path()
if os.path.exists(queue_backup):
print('Found old backup queue! We errored in the previous run.')
shutil.copyfile(queue_backup, queue_path)
else:
print('Creating backup queue...')
shutil.copyfile(queue_path, queue_backup)
ttq.TalentTweetQueue()
ret = None
while True:
ttweets_dict = await get_cross_talent_tweets(queue_path)
queue = ttq.TalentTweetQueue.instance
ttweets_dict = queue.ttweets_dict = await get_cross_talent_tweets()
print(f'found {len(ttweets_dict)} cross-company tweets')
try:
if safe_to_post_tweets:
os.remove(queue_backup) # keep updated queue
if await process_queue(ttweets_dict):
if await process_queue():
print('Posted no new tweets; we\'re caught up!')
return True
else:
@@ -225,4 +158,7 @@ async def run(program_args):
except:
print('Unhandled error occurred while running catch up in posting phase.')
traceback.print_exc()
return False
if errored:
return False