From 7f2dfaa3f096b1f2b6a362696f15f7c16d7aaad6 Mon Sep 17 00:00:00 2001 From: msk <15199219+muskit@users.noreply.github.com> Date: Sun, 2 Oct 2022 04:57:24 -0700 Subject: [PATCH] slight restructuring around tracking our posts --- src/catchup.py | 39 +++++++---------- src/listen.py | 7 +-- src/main.py | 6 ++- src/talenttweet.py | 1 + src/ttweetqueue.py | 106 ++++++++++++++++++++++++++++++++------------- src/twapi.py | 2 +- src/util.py | 2 +- 7 files changed, 105 insertions(+), 58 deletions(-) diff --git a/src/catchup.py b/src/catchup.py index e38a779..c3fd25e 100644 --- a/src/catchup.py +++ b/src/catchup.py @@ -23,6 +23,8 @@ errored = False ## Returns the ID of all tweets (up to limit) from a user ID. def get_user_tweets(id, since_timestamp=None, limit=None): + global safe_to_post_tweets + qrt_count = 0 tweets = list() c = twint.Config() @@ -52,7 +54,6 @@ def get_user_tweets(id, since_timestamp=None, limit=None): # Returns a list of sorted and filtered TalentTweets (should # be equivalent to queue.txt) async def get_cross_talent_tweets(): - posted_ttweets = set() # TODO: don't add TTweet to ttweets_dict if its id exists in posted_ttweets global safe_to_post_tweets queue = ttq.TalentTweetQueue.instance @@ -65,7 +66,7 @@ async def get_cross_talent_tweets(): try: tweets = get_user_tweets(talent_id, since_timestamp=queue.finished_user_timestamps.get(talent_id, None)) for tweet in tweets: - if tweet.id not in queue.ttweets_dict: + if tweet.id not in queue.ttweets_dict and tweet.id not in queue.finished_ttweets: ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet) if ttweet.is_cross_company(): queue.add_ttweet(ttweet) @@ -83,8 +84,6 @@ async def get_cross_talent_tweets(): else: print('Successfully saved all tweets from online!') queue.save_file() - - return queue.get_ttweets_dict() # return False = errored or we posted at least one ttweet # return True = we didn't post a single ttweet @@ -96,28 +95,26 @@ async def process_queue() -> bool: errored = False queue = ttq.TalentTweetQueue.instance - queued_ttweets_count = len(queue.ttweets_dict) + queued_ttweets_count = queue.get_count() - if queued_ttweets_count == 0: return ttweets_posted + if queued_ttweets_count == 0: + print('post-able queue is empty!') + return True if PROGRAM_ARGS.announce_catchup: TwAPI.instance.post_tweet(text=f'Starting to catch up through {queued_ttweets_count} logged tweets.') try: - while len(queue.ttweets_dict) > 0: - key = list(queue.ttweets_dict.keys())[0] - ttweet = queue.ttweets_dict[key] - queue.good = False + while not queue.is_empty(): + ttweet = queue.get_next_ttweet() tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet, is_catchup=True) - queue.ttweets_dict.pop(key) - print('saving new queue...') - queue.good = True - queue.save_file() + print('running queue.good()...') + queue.good() if tweet_was_successful: ttweets_posted += 1 print(f'({ttweets_posted}/{queued_ttweets_count}) done') - if len(queue.ttweets_dict) > 0: + if not queue.is_empty(): print(f'resting for {WAIT_TIME}s...') await asyncio.sleep(WAIT_TIME-5) print('5 second warning!') @@ -129,7 +126,7 @@ async def process_queue() -> bool: else: if PROGRAM_ARGS.announce_catchup: await TwAPI.instance.post_tweet('Finished with catch-up tweets!') - + if errored or ttweets_posted > 0: return False return True @@ -142,15 +139,11 @@ async def run(program_args): global safe_to_post_tweets PROGRAM_ARGS = program_args - # in case we we experience failure and we're left with blank queue.txt - # TODO: create TweetQueue class to organize file IO better; move all backup operations to there - ttq.TalentTweetQueue() - ret = None + queue = ttq.TalentTweetQueue.instance while True: - queue = ttq.TalentTweetQueue.instance - ttweets_dict = queue.ttweets_dict = await get_cross_talent_tweets() - print(f'found {len(ttweets_dict)} cross-company tweets') + await get_cross_talent_tweets() + print(f'{queue.get_count()} cross-company tweets to attempt sharing.') try: if safe_to_post_tweets: if await process_queue(): diff --git a/src/listen.py b/src/listen.py index b3cec26..7eee413 100644 --- a/src/listen.py +++ b/src/listen.py @@ -6,18 +6,19 @@ import tweepy from talenttweet import TalentTweet from twapi import TwAPI +import ttweetqueue as ttq import api_secrets import talent_lists as tl def on_response(resp): - id = resp.data.id ttweet = TalentTweet.create_from_v2api_response(resp) if ttweet.is_cross_company(): - print(f'Tweet {id} is cross-company! Creating post...') + print(f'Tweet {ttweet.tweet_id} is cross-company! Creating post...') asyncio.run(TwAPI.instance.post_ttweet(ttweet)) + ttq.TalentTweetQueue.instance.add_finished_tweet(ttweet.tweet_id) else: - print(f'Tweet {id} is not cross-company.') + print(f'Tweet {ttweet.tweet_id} is not cross-company.') def run(): sc = tweepy.StreamingClient(api_secrets.bearer_token()) diff --git a/src/main.py b/src/main.py index ede699f..a751d2b 100644 --- a/src/main.py +++ b/src/main.py @@ -7,6 +7,7 @@ import code import nest_asyncio import talent_lists +import ttweetqueue as ttq import api_secrets import catchup import listen @@ -97,11 +98,14 @@ def main(): ## We expect to run in some mode now. # Initialize shared API instance - twApi = TwAPI.instance = TwAPI() + TwAPI() # Initialize talent account lists talent_lists.init() + # Initialize queue files system + ttq.TalentTweetQueue() + ## Asynchronous execution nest_asyncio.apply() asyncio.run(async_main()) diff --git a/src/talenttweet.py b/src/talenttweet.py index 9faac63..b7ced5a 100644 --- a/src/talenttweet.py +++ b/src/talenttweet.py @@ -71,6 +71,7 @@ class TalentTweet: # FIXME: resultant tweets don't show timezone properly date_time = datetime.datetime.strptime(tweet.datetime, '%Y-%m-%d %H:%M:%S %Z') + print(date_time) return TalentTweet(tweet_id=tweet.id, author_id=tweet.user_id, date_time=date_time, mrq=(mentions, reply_to, quoted_id)) @staticmethod diff --git a/src/ttweetqueue.py b/src/ttweetqueue.py index 3c804f1..33a1bf6 100644 --- a/src/ttweetqueue.py +++ b/src/ttweetqueue.py @@ -15,10 +15,13 @@ class TalentTweetQueue: TalentTweetQueue.instance = self self.queue_path = util.get_queue_path() self.queue_backup_path = util.get_queue_backup_path() + self.current_ttweet_path = f'{util.get_project_dir()}/_current_ttweet.txt' + self.finished_ttweets_path = f'{util.get_project_dir()}/finished_ttweets.txt' + self.is_good = True + self.__sorted = False self.finished_user_timestamps = dict() self.ttweets_dict = dict() - self.good = False # if true, overwrite queue.txt on destruction - self.__sorted = False + self.finished_ttweets = list() ## file check, backup copy if os.path.exists(self.queue_backup_path): @@ -30,59 +33,104 @@ class TalentTweetQueue: ## initialize structures # user timestamps - with open(self.queue_path, 'r') as f: - for line in f: - tokens = line.split() - if len(tokens) == 0: continue + try: + with open(self.queue_path, 'r') as f: + for line in f: + tokens = line.split() + if len(tokens) == 0: continue - if tokens[0][0] != '#': - print(f'Stopped finding user timestamps at {line}') - # reached end of accounts list - break - if tokens[2] != '-1': - self.finished_user_timestamps[int(tokens[1])] = float(tokens[2]) - - # tweets - with open(self.queue_path, 'r') as f: # reset seek head - # Get existing queued TalentTweets - for line in f: - tokens = line.split() - if len(tokens) == 0 or tokens[0][0] == '#': - continue - ttweet = tt.TalentTweet.deserialize(line) - self.ttweets_dict[ttweet.tweet_id] = ttweet - print(f'Found {len(self.finished_user_timestamps)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.') + if tokens[0][0] != '#': + print(f'Stopped finding user timestamps at {line}') + # reached end of accounts list + break + if tokens[2] != '-1': + self.finished_user_timestamps[int(tokens[1])] = float(tokens[2]) + except: pass + # ttweets + try: + with open(self.queue_path, 'r') as f: # reset seek head + # Get existing queued TalentTweets + for line in f: + tokens = line.split() + if len(tokens) == 0 or tokens[0][0] == '#': + continue + ttweet = tt.TalentTweet.deserialize(line) + self.ttweets_dict[ttweet.tweet_id] = ttweet + print(f'Found {len(self.finished_user_timestamps)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.') + except: pass + # finished ttweets + try: + with open(self.finished_ttweets_path, 'r') as f: + for line in f: + self.finished_ttweets.append(int(line)) + except: pass + + + def is_empty(self): + return self.get_count() <= 0 def add_ttweet(self, ttweet): self.__sorted = False self.ttweets_dict[ttweet.tweet_id] = ttweet def get_ttweet(self, id): - return self.ttweets_dict[id] + return self.ttweets_dict[id] + + def get_next_ttweet(self): + if os.path.exists(self.current_ttweet_path): + with open(self.current_ttweet_path, 'r') as f: + return tt.TalentTweet.deserialize(f.readline()) + + self.__sort_ttweets_dict() + key = list(self.ttweets_dict.keys())[0] + ttweet = self.ttweets_dict.pop(key) + with open(self.current_ttweet_path, 'w') as f: + f.write(ttweet.serialize()) + self.is_good = False + return ttweet - def get_ttweets_dict(self): - self.__sort_ttweets_dict() if not self.__sorted else None - return self.ttweets_dict + def get_count(self): + return len(self.ttweets_dict) + + ## Call when the TalentTweet retrieved from get_next_ttweet() was + # posted successfully. + def good(self): + with open(self.current_ttweet_path, 'r') as f: + ttweet = tt.TalentTweet.deserialize(f.readline()) + + self.add_finished_tweet(ttweet.tweet_id) + os.remove(self.current_ttweet_path) + self.save_file() + self.is_good = True # overwrite queue.txt def save_file(self): + shutil.copyfile(self.queue_path, self.queue_backup_path) self.__sort_ttweets_dict() with open(self.queue_path, 'w') as f: # write timestamps for (id, timestamp) in self.finished_user_timestamps.items(): f.write(f'# {id} {timestamp}\n') + f.write('\n') + # write sorted ttweets for ttweet in self.ttweets_dict.values(): f.write(ttweet.serialize() + '\n') + + def add_finished_tweet(self, id): + self.finished_ttweets.append(id) + with open(self.finished_ttweets_path, 'a') as f: + f.write(f'{id}\n') def __sort_ttweets_dict(self): - self.ttweets_dict = dict(sorted(self.ttweets_dict.items())) + if not self.__sorted: + self.ttweets_dict = dict(sorted(self.ttweets_dict.items())) self.__sorted = True # destructor def __del__(self): - if self.good: + if self.is_good: print('Ended in good state, deleting backup queue...') os.remove(self.queue_backup_path) else: diff --git a/src/twapi.py b/src/twapi.py index 5fdc126..e6038a8 100644 --- a/src/twapi.py +++ b/src/twapi.py @@ -1,5 +1,6 @@ import datetime import traceback +import asyncio import tweepy @@ -223,7 +224,6 @@ class TwAPI: text = create_text() try: - # print('creating reply img') # media_ids = [await self.get_ttweet_image_media_id(ttweet)] print('posting main tweet...', end='') twt_resp = await self.post_tweet(text) diff --git a/src/util.py b/src/util.py index fb82570..d4f9bd0 100644 --- a/src/util.py +++ b/src/util.py @@ -131,4 +131,4 @@ def get_user_id_online(username) -> int: user = twint.output.users_list[0] return user.id except: - return -1 + return -1 \ No newline at end of file