diff --git a/src/catchup.py b/src/catchup.py index b096493..601af04 100644 --- a/src/catchup.py +++ b/src/catchup.py @@ -15,23 +15,11 @@ from util import * from talent_lists import * from twapi import TwAPI import talenttweet as tt +import ttweetqueue as ttq PROGRAM_ARGS = None safe_to_post_tweets = True - -def write_user_timestamp(user_id, file, timestamp = None, error = False): - if timestamp is None: - timestamp = datetime.datetime.now().timestamp() - - file.write(f'# {user_id} {timestamp if not error else "-1"}\n') - pass - -def get_queue_path(): - return f'{util.get_project_dir()}/queue.txt' - -def get_local_queue(): - # f = open(os.path.join(get_project_dir(), 'queue.txt')) - pass +errored = False ## Returns the ID of all tweets (up to limit) from a user ID. def get_user_tweets(id, since_timestamp=None, limit=None): @@ -51,6 +39,7 @@ def get_user_tweets(id, since_timestamp=None, limit=None): twint.run.Search(c) except: print(f'Had trouble getting tweets from {user_str}') + safe_to_post_tweets = False traceback.print_exc() for twt in tweets: @@ -60,114 +49,74 @@ def get_user_tweets(id, since_timestamp=None, limit=None): print(f'Scraped {len(tweets)} tweets, {qrt_count} of which are quote tweets.') return tweets -# Returns dict of accounts that successfully caught up. -# LINE FORMAT: "# {user_id} {status_num} {UNIX_timestamp} -def get_finished_user_timestamps(queue_file): - results = dict() - for line in queue_file: - tokens = line.split() - if len(tokens) == 0: continue - - if tokens[0][0] != '#': - print(f'{line} is our stopper!') - # reached end of accounts list - break - if tokens[2] != '-1': - results[int(tokens[1])] = float(tokens[2]) - return results - -def get_user_timestamps_str(queue_file): - results = str() - for line in queue_file: - tokens = line.split() - if len(tokens) != 3 or tokens[0][0] != '#': - # reached end of accounts list - break - results += f'{line}\n' - return results[:-1] - -# If queue.txt doesn't exist, creates and populates it. # Returns a list of sorted and filtered TalentTweets (should # be equivalent to queue.txt) -async def get_cross_talent_tweets(queue_path): - finished_user_timestamps = dict() - ttweets_dict = dict() +async def get_cross_talent_tweets(): posted_ttweets = set() # TODO: don't add TTweet to ttweets_dict if its id exists in posted_ttweets global safe_to_post_tweets - # Populate structures with existing data from queue.txt - try: - with open(queue_path, 'r') as f: - finished_user_timestamps = get_finished_user_timestamps(f) - - with open(queue_path, 'r') as f: # reset seek head - # Get existing queued TalentTweets - for line in f: - tokens = line.split() - if len(tokens) == 0 or tokens[0][0] == '#': - continue - ttweet = tt.TalentTweet.deserialize(line) - ttweets_dict[ttweet.tweet_id] = ttweet - print(f'Found {len(finished_user_timestamps)} scraped accounts and {len(ttweets_dict)} tweets.') - except FileNotFoundError: - print('queue.txt not found.') + queue = ttq.TalentTweetQueue.instance # Begin getting tweets from online - with open(queue_path, 'w') as f: - print('Pulling tweets from online!') - try: - for i, (talent_id, talent_username) in enumerate(talent_lists.talents.items()): - print(f'[{i+1}/{len(talent_lists.talents)}] {talent_username}-----------------------------------') - try: - # tweets = get_user_tweets(talent_id, since_timestamp=1663698621) # shorten test runs - tweets = get_user_tweets(talent_id, since_timestamp=finished_user_timestamps.get(talent_id, None)) - for tweet in tweets: - if tweet.id not in ttweets_dict: - ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet) - if ttweet.is_cross_company(): - ttweets_dict[ttweet.tweet_id] = ttweet - except: - print('Error occurred processing tweet data.') - safe_to_post_tweets = False - print(traceback.format_exc()) - write_user_timestamp(user_id=talent_id, file=f, error=True) - else: - write_user_timestamp(user_id=talent_id, file=f) - f.write('\n') - ttweets_dict = dict(sorted(ttweets_dict.items())) - for ttweet in ttweets_dict.values(): - f.write(f'{ttweet.serialize()}\n') - except: - print('Unhandled error occurred while pulling tweets.') - traceback.print_exc() - print('Saving queue.txt and exiting.') - safe_to_post_tweets = False + print('Pulling tweets from online!') + try: + for i, (talent_id, talent_username) in enumerate(talent_lists.talents.items()): + print(f'[{i+1}/{len(talent_lists.talents)}] {talent_username}-----------------------------------') + try: + tweets = get_user_tweets(talent_id, since_timestamp=queue.finished_user_timestamps.get(talent_id, None)) + for tweet in tweets: + if tweet.id not in queue.ttweets_dict: + ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet) + if ttweet.is_cross_company(): + queue.add_ttweet(ttweet) + except: + print('Error occurred processing tweet data.') + safe_to_post_tweets = False + print(traceback.format_exc()) + queue.finished_user_timestamps[talent_id] = -1 + else: + queue.finished_user_timestamps[talent_id] = util.get_current_timestamp() + except: + print('Unhandled error occurred while pulling tweets.') + traceback.print_exc() + safe_to_post_tweets = False + else: + print('Successfully saved all tweets from online!') + queue.save_file() - return ttweets_dict + return queue.get_ttweets_dict() # return False = errored or we posted at least one ttweet # return True = we didn't post a single ttweet -async def process_queue(ttweets_dict: dict) -> bool: +async def process_queue() -> bool: global PROGRAM_ARGS + global errored WAIT_TIME = 30 ttweets_posted = 0 errored = False - if len(ttweets_dict) == 0: return ttweets_posted + queue = ttq.TalentTweetQueue.instance + + if len(queue.ttweets_dict) == 0: return ttweets_posted if PROGRAM_ARGS.announce_catchup: TwAPI.instance.post_tweet(text=f'Starting to catch up through {len(ttweets_dict)} logged tweets.') try: - while len(ttweets_dict) > 0: - key = list(ttweets_dict.keys())[0] - ttweet = ttweets_dict[key] - if await TwAPI.instance.post_ttweet(ttweet, is_catchup=True): + while len(queue.ttweets_dict) > 0: + key = list(queue.ttweets_dict.keys())[0] + ttweet = queue.ttweets_dict[key] + queue.good = False + tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet, is_catchup=True) + queue.ttweets_dict.pop(key) + if tweet_was_successful: + print('saving new queue...') + queue.good = True + queue.save_file() ttweets_posted += 1 - print(f'resting for {WAIT_TIME}s...') - await asyncio.sleep(WAIT_TIME) - ttweets_dict.pop(key) - # TODO: add ttweet.tweet_id to some success list + if len(queue.ttweets_dict) > 0: + print(f'resting for {WAIT_TIME}s...') + await asyncio.sleep(WAIT_TIME) except: print('Unhandled error occurred while posting tweets from queue.') errored = True @@ -175,14 +124,6 @@ async def process_queue(ttweets_dict: dict) -> bool: else: if PROGRAM_ARGS.announce_catchup: await TwAPI.instance.post_tweet('Finished with catch-up tweets!') - - print('Updating what\'s left in ttweet_dict to queue.txt.') - with open(get_queue_path(), 'r') as f: - user_timestamps_str = get_user_timestamps_str(f) - with open(get_queue_path(), 'w') as f: - f.write(user_timestamps_str + '\n\n') - for ttweet in ttweets_dict.values(): - f.write(f'{ttweet.serialize()}\n') if errored or ttweets_posted > 0: return False @@ -192,30 +133,22 @@ async def process_queue(ttweets_dict: dict) -> bool: # return False = issue occurred where we couldn't post all past tweets properly async def run(program_args): global PROGRAM_ARGS + global errored global safe_to_post_tweets PROGRAM_ARGS = program_args # in case we we experience failure and we're left with blank queue.txt # TODO: create TweetQueue class to organize file IO better; move all backup operations to there - queue_backup = os.path.join(util.get_project_dir(), 'queue_in_progress.txt') - queue_path = get_queue_path() - - if os.path.exists(queue_backup): - print('Found old backup queue! We errored in the previous run.') - shutil.copyfile(queue_backup, queue_path) - else: - print('Creating backup queue...') - shutil.copyfile(queue_path, queue_backup) + ttq.TalentTweetQueue() ret = None - while True: - ttweets_dict = await get_cross_talent_tweets(queue_path) + queue = ttq.TalentTweetQueue.instance + ttweets_dict = queue.ttweets_dict = await get_cross_talent_tweets() print(f'found {len(ttweets_dict)} cross-company tweets') try: if safe_to_post_tweets: - os.remove(queue_backup) # keep updated queue - if await process_queue(ttweets_dict): + if await process_queue(): print('Posted no new tweets; we\'re caught up!') return True else: @@ -225,4 +158,7 @@ async def run(program_args): except: print('Unhandled error occurred while running catch up in posting phase.') traceback.print_exc() + return False + + if errored: return False \ No newline at end of file diff --git a/src/main.py b/src/main.py index 53d7a7b..2ed953c 100644 --- a/src/main.py +++ b/src/main.py @@ -25,6 +25,7 @@ def init_argparse(): help=MODES_HELP_STR) p.add_argument('--show-tokens', action='store_true', help='[DO NOT USE IN PUBLIC SETTING] print stored tokens from secrets.ini') p.add_argument('--announce-catchup', action='store_true', help='In catch-up mode, post a tweet announcing catch-up mode.') + p.add_argument('--auto-listen', action='store_true', help='In catch-up mode, transition to listen mode after successfuly catching up.') p.add_argument('--no-delay', action='store_true', help='In self-destruct mode, clear tweets without safety waiting.') return p @@ -50,7 +51,7 @@ async def async_main(): await listen.run() case 'c' | 'catchup': print('RUNNING IN CATCH-UP MODE\n') - if await catchup.run(PROGRAM_ARGS): + if await catchup.run(PROGRAM_ARGS) and PROGRAM_ARGS.auto_listen: print('CATCH-UP MODE DONE, GOING INTO LISTEN MODE') await listen.run() case 'd' | 'delete-all': diff --git a/src/ttweetqueue.py b/src/ttweetqueue.py new file mode 100644 index 0000000..0e3c33c --- /dev/null +++ b/src/ttweetqueue.py @@ -0,0 +1,87 @@ +# TODO: move queue structures and file handling here +import os +import shutil + +import util +import talenttweet as tt + +# User timestamps line format: +# # {user_id} {status_num} {UNIX_timestamp} + +class TalentTweetQueue: + instance = None + + def __init__(self): + TalentTweetQueue.instance = self + self.queue_path = util.get_queue_path() + self.queue_backup_path = util.get_queue_backup_path() + self.finished_user_timestamps = dict() + self.ttweets_dict = dict() + self.good = False # if true, overwrite queue.txt on destruction + self.__sorted = False + + ## file check, backup copy + if os.path.exists(self.queue_backup_path): + print('Found old backup queue! We errored in the previous run.') + shutil.copyfile(self.queue_backup_path, self.queue_path) + elif os.path.exists(self.queue_path): + print('Creating backup queue...') + shutil.copyfile(self.queue_path, self.queue_backup_path) + + ## initialize structures + # user timestamps + with open(self.queue_path, 'r') as f: + for line in f: + tokens = line.split() + if len(tokens) == 0: continue + + if tokens[0][0] != '#': + print(f'Stopped finding user timestamps at {line}') + # reached end of accounts list + break + if tokens[2] != '-1': + self.finished_user_timestamps[int(tokens[1])] = float(tokens[2]) + + # tweets + with open(self.queue_path, 'r') as f: # reset seek head + # Get existing queued TalentTweets + for line in f: + tokens = line.split() + if len(tokens) == 0 or tokens[0][0] == '#': + continue + ttweet = tt.TalentTweet.deserialize(line) + self.ttweets_dict[ttweet.tweet_id] = ttweet + print(f'Found {len(self.finished_user_timestamps)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.') + + def add_ttweet(self, ttweet): + self.__sorted = False + self.ttweets_dict[ttweet.tweet_id] = ttweet + + def get_ttweet(self, id): + return self.ttweets_dict[id] + + def get_ttweets_dict(self): + self.__sort_ttweets_dict() if not self.__sorted else None + return self.ttweets_dict + + # overwrite queue.txt + def save_file(self): + self.__sort_ttweets_dict() + with open(self.queue_path, 'w') as f: + # write timestamps + for (id, timestamp) in self.finished_user_timestamps.items(): + f.write(f'# {id} {timestamp}\n') + f.write('\n') + # write sorted ttweets + for ttweet in self.ttweets_dict.values(): + f.write(ttweet.serialize() + '\n') + + def __sort_ttweets_dict(self): + self.ttweets_dict = dict(sorted(self.ttweets_dict.items())) + self.__sorted = True + + # destructor + def __del__(self): + if self.good: + print('Ended in good state, deleting backup queue...') + os.remove(self.queue_backup_path) \ No newline at end of file diff --git a/src/twapi.py b/src/twapi.py index 1d3810d..8d07315 100644 --- a/src/twapi.py +++ b/src/twapi.py @@ -67,7 +67,11 @@ class TwAPI: access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret() ) ) - self.me = self.client.get_me().data + try: + self.me = self.client.get_me().data + except Exception as e: + print('Did you setup secrets.ini?') + raise e print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})') ## ---[COMMENT OUT WHEN NOT IN USE]--- @@ -165,6 +169,8 @@ class TwAPI: media = self.api.media_upload(img) return media.media_id + # return True = successfully posted a single ttweet + # return False = did not post ttweet (duplicate) async def post_ttweet(self, ttweet: tt.TalentTweet, is_catchup=False): print(f'------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------') diff --git a/src/tweetqueue.py b/src/tweetqueue.py deleted file mode 100644 index d326b10..0000000 --- a/src/tweetqueue.py +++ /dev/null @@ -1 +0,0 @@ -# TODO: move queue structures and file handling here \ No newline at end of file diff --git a/src/util.py b/src/util.py index 9e4ac63..817efae 100644 --- a/src/util.py +++ b/src/util.py @@ -4,6 +4,7 @@ import os import traceback import datetime +import tweepy import pytz import twint import twapi @@ -16,6 +17,12 @@ import talent_lists def get_project_dir(): return os.path.join(os.path.dirname(__file__), os.pardir) +def get_queue_path(): + return f'{get_project_dir()}/queue.txt' + +def get_queue_backup_path(): + return f'{get_project_dir()}/_queue_backup.txt' + def clamp(n, smallest, largest): return max(smallest, min(n, largest)) @@ -30,6 +37,9 @@ def timestamp_to_tdate(timestamp=None): timestamp = datetime.datetime.now().timestamp() return datetime_to_tdate(datetime.datetime.fromtimestamp(timestamp, tz=pytz.utc)) +def get_current_timestamp(): + return datetime.datetime.now().timestamp() + def get_key_from_value(d, val): keys = [k for k, v in d.items() if v == val] if keys: @@ -90,6 +100,8 @@ def get_username_online(id, default=None): try: resp = twapi.TwAPI.instance.client.get_user(id=id) return resp.data.username + except tweepy.TooManyRequests: + return str(default) if default is not None else f'id:{id}' except: print(f'Unhandled error retrieving username for {id}!') traceback.print_exc()