slight restructuring around tracking our posts

This commit is contained in:
msk
2022-10-02 04:57:24 -07:00
parent 65b554f014
commit 7f2dfaa3f0
7 changed files with 105 additions and 58 deletions
+15 -22
View File
@@ -23,6 +23,8 @@ errored = False
## Returns the ID of all tweets (up to limit) from a user ID. ## Returns the ID of all tweets (up to limit) from a user ID.
def get_user_tweets(id, since_timestamp=None, limit=None): def get_user_tweets(id, since_timestamp=None, limit=None):
global safe_to_post_tweets
qrt_count = 0 qrt_count = 0
tweets = list() tweets = list()
c = twint.Config() c = twint.Config()
@@ -52,7 +54,6 @@ def get_user_tweets(id, since_timestamp=None, limit=None):
# Returns a list of sorted and filtered TalentTweets (should # Returns a list of sorted and filtered TalentTweets (should
# be equivalent to queue.txt) # be equivalent to queue.txt)
async def get_cross_talent_tweets(): async def get_cross_talent_tweets():
posted_ttweets = set() # TODO: don't add TTweet to ttweets_dict if its id exists in posted_ttweets
global safe_to_post_tweets global safe_to_post_tweets
queue = ttq.TalentTweetQueue.instance queue = ttq.TalentTweetQueue.instance
@@ -65,7 +66,7 @@ async def get_cross_talent_tweets():
try: try:
tweets = get_user_tweets(talent_id, since_timestamp=queue.finished_user_timestamps.get(talent_id, None)) tweets = get_user_tweets(talent_id, since_timestamp=queue.finished_user_timestamps.get(talent_id, None))
for tweet in tweets: for tweet in tweets:
if tweet.id not in queue.ttweets_dict: if tweet.id not in queue.ttweets_dict and tweet.id not in queue.finished_ttweets:
ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet) ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet)
if ttweet.is_cross_company(): if ttweet.is_cross_company():
queue.add_ttweet(ttweet) queue.add_ttweet(ttweet)
@@ -84,8 +85,6 @@ async def get_cross_talent_tweets():
print('Successfully saved all tweets from online!') print('Successfully saved all tweets from online!')
queue.save_file() queue.save_file()
return queue.get_ttweets_dict()
# return False = errored or we posted at least one ttweet # return False = errored or we posted at least one ttweet
# return True = we didn't post a single ttweet # return True = we didn't post a single ttweet
async def process_queue() -> bool: async def process_queue() -> bool:
@@ -96,28 +95,26 @@ async def process_queue() -> bool:
errored = False errored = False
queue = ttq.TalentTweetQueue.instance queue = ttq.TalentTweetQueue.instance
queued_ttweets_count = len(queue.ttweets_dict) queued_ttweets_count = queue.get_count()
if queued_ttweets_count == 0: return ttweets_posted if queued_ttweets_count == 0:
print('post-able queue is empty!')
return True
if PROGRAM_ARGS.announce_catchup: if PROGRAM_ARGS.announce_catchup:
TwAPI.instance.post_tweet(text=f'Starting to catch up through {queued_ttweets_count} logged tweets.') TwAPI.instance.post_tweet(text=f'Starting to catch up through {queued_ttweets_count} logged tweets.')
try: try:
while len(queue.ttweets_dict) > 0: while not queue.is_empty():
key = list(queue.ttweets_dict.keys())[0] ttweet = queue.get_next_ttweet()
ttweet = queue.ttweets_dict[key]
queue.good = False
tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet, is_catchup=True) tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet, is_catchup=True)
queue.ttweets_dict.pop(key)
print('saving new queue...') print('running queue.good()...')
queue.good = True queue.good()
queue.save_file()
if tweet_was_successful: if tweet_was_successful:
ttweets_posted += 1 ttweets_posted += 1
print(f'({ttweets_posted}/{queued_ttweets_count}) done') print(f'({ttweets_posted}/{queued_ttweets_count}) done')
if len(queue.ttweets_dict) > 0: if not queue.is_empty():
print(f'resting for {WAIT_TIME}s...') print(f'resting for {WAIT_TIME}s...')
await asyncio.sleep(WAIT_TIME-5) await asyncio.sleep(WAIT_TIME-5)
print('5 second warning!') print('5 second warning!')
@@ -142,15 +139,11 @@ async def run(program_args):
global safe_to_post_tweets global safe_to_post_tweets
PROGRAM_ARGS = program_args PROGRAM_ARGS = program_args
# in case we we experience failure and we're left with blank queue.txt
# TODO: create TweetQueue class to organize file IO better; move all backup operations to there
ttq.TalentTweetQueue()
ret = None ret = None
queue = ttq.TalentTweetQueue.instance
while True: while True:
queue = ttq.TalentTweetQueue.instance await get_cross_talent_tweets()
ttweets_dict = queue.ttweets_dict = await get_cross_talent_tweets() print(f'{queue.get_count()} cross-company tweets to attempt sharing.')
print(f'found {len(ttweets_dict)} cross-company tweets')
try: try:
if safe_to_post_tweets: if safe_to_post_tweets:
if await process_queue(): if await process_queue():
+4 -3
View File
@@ -6,18 +6,19 @@ import tweepy
from talenttweet import TalentTweet from talenttweet import TalentTweet
from twapi import TwAPI from twapi import TwAPI
import ttweetqueue as ttq
import api_secrets import api_secrets
import talent_lists as tl import talent_lists as tl
def on_response(resp): def on_response(resp):
id = resp.data.id
ttweet = TalentTweet.create_from_v2api_response(resp) ttweet = TalentTweet.create_from_v2api_response(resp)
if ttweet.is_cross_company(): if ttweet.is_cross_company():
print(f'Tweet {id} is cross-company! Creating post...') print(f'Tweet {ttweet.tweet_id} is cross-company! Creating post...')
asyncio.run(TwAPI.instance.post_ttweet(ttweet)) asyncio.run(TwAPI.instance.post_ttweet(ttweet))
ttq.TalentTweetQueue.instance.add_finished_tweet(ttweet.tweet_id)
else: else:
print(f'Tweet {id} is not cross-company.') print(f'Tweet {ttweet.tweet_id} is not cross-company.')
def run(): def run():
sc = tweepy.StreamingClient(api_secrets.bearer_token()) sc = tweepy.StreamingClient(api_secrets.bearer_token())
+5 -1
View File
@@ -7,6 +7,7 @@ import code
import nest_asyncio import nest_asyncio
import talent_lists import talent_lists
import ttweetqueue as ttq
import api_secrets import api_secrets
import catchup import catchup
import listen import listen
@@ -97,11 +98,14 @@ def main():
## We expect to run in some mode now. ## We expect to run in some mode now.
# Initialize shared API instance # Initialize shared API instance
twApi = TwAPI.instance = TwAPI() TwAPI()
# Initialize talent account lists # Initialize talent account lists
talent_lists.init() talent_lists.init()
# Initialize queue files system
ttq.TalentTweetQueue()
## Asynchronous execution ## Asynchronous execution
nest_asyncio.apply() nest_asyncio.apply()
asyncio.run(async_main()) asyncio.run(async_main())
+1
View File
@@ -71,6 +71,7 @@ class TalentTweet:
# FIXME: resultant tweets don't show timezone properly # FIXME: resultant tweets don't show timezone properly
date_time = datetime.datetime.strptime(tweet.datetime, '%Y-%m-%d %H:%M:%S %Z') date_time = datetime.datetime.strptime(tweet.datetime, '%Y-%m-%d %H:%M:%S %Z')
print(date_time)
return TalentTweet(tweet_id=tweet.id, author_id=tweet.user_id, date_time=date_time, mrq=(mentions, reply_to, quoted_id)) return TalentTweet(tweet_id=tweet.id, author_id=tweet.user_id, date_time=date_time, mrq=(mentions, reply_to, quoted_id))
@staticmethod @staticmethod
+76 -28
View File
@@ -15,10 +15,13 @@ class TalentTweetQueue:
TalentTweetQueue.instance = self TalentTweetQueue.instance = self
self.queue_path = util.get_queue_path() self.queue_path = util.get_queue_path()
self.queue_backup_path = util.get_queue_backup_path() self.queue_backup_path = util.get_queue_backup_path()
self.current_ttweet_path = f'{util.get_project_dir()}/_current_ttweet.txt'
self.finished_ttweets_path = f'{util.get_project_dir()}/finished_ttweets.txt'
self.is_good = True
self.__sorted = False
self.finished_user_timestamps = dict() self.finished_user_timestamps = dict()
self.ttweets_dict = dict() self.ttweets_dict = dict()
self.good = False # if true, overwrite queue.txt on destruction self.finished_ttweets = list()
self.__sorted = False
## file check, backup copy ## file check, backup copy
if os.path.exists(self.queue_backup_path): if os.path.exists(self.queue_backup_path):
@@ -30,59 +33,104 @@ class TalentTweetQueue:
## initialize structures ## initialize structures
# user timestamps # user timestamps
with open(self.queue_path, 'r') as f: try:
for line in f: with open(self.queue_path, 'r') as f:
tokens = line.split() for line in f:
if len(tokens) == 0: continue tokens = line.split()
if len(tokens) == 0: continue
if tokens[0][0] != '#': if tokens[0][0] != '#':
print(f'Stopped finding user timestamps at {line}') print(f'Stopped finding user timestamps at {line}')
# reached end of accounts list # reached end of accounts list
break break
if tokens[2] != '-1': if tokens[2] != '-1':
self.finished_user_timestamps[int(tokens[1])] = float(tokens[2]) self.finished_user_timestamps[int(tokens[1])] = float(tokens[2])
except: pass
# ttweets
try:
with open(self.queue_path, 'r') as f: # reset seek head
# Get existing queued TalentTweets
for line in f:
tokens = line.split()
if len(tokens) == 0 or tokens[0][0] == '#':
continue
ttweet = tt.TalentTweet.deserialize(line)
self.ttweets_dict[ttweet.tweet_id] = ttweet
print(f'Found {len(self.finished_user_timestamps)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.')
except: pass
# finished ttweets
try:
with open(self.finished_ttweets_path, 'r') as f:
for line in f:
self.finished_ttweets.append(int(line))
except: pass
# tweets
with open(self.queue_path, 'r') as f: # reset seek head def is_empty(self):
# Get existing queued TalentTweets return self.get_count() <= 0
for line in f:
tokens = line.split()
if len(tokens) == 0 or tokens[0][0] == '#':
continue
ttweet = tt.TalentTweet.deserialize(line)
self.ttweets_dict[ttweet.tweet_id] = ttweet
print(f'Found {len(self.finished_user_timestamps)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.')
def add_ttweet(self, ttweet): def add_ttweet(self, ttweet):
self.__sorted = False self.__sorted = False
self.ttweets_dict[ttweet.tweet_id] = ttweet self.ttweets_dict[ttweet.tweet_id] = ttweet
def get_ttweet(self, id): def get_ttweet(self, id):
return self.ttweets_dict[id] return self.ttweets_dict[id]
def get_ttweets_dict(self): def get_next_ttweet(self):
self.__sort_ttweets_dict() if not self.__sorted else None if os.path.exists(self.current_ttweet_path):
return self.ttweets_dict with open(self.current_ttweet_path, 'r') as f:
return tt.TalentTweet.deserialize(f.readline())
self.__sort_ttweets_dict()
key = list(self.ttweets_dict.keys())[0]
ttweet = self.ttweets_dict.pop(key)
with open(self.current_ttweet_path, 'w') as f:
f.write(ttweet.serialize())
self.is_good = False
return ttweet
def get_count(self):
return len(self.ttweets_dict)
## Call when the TalentTweet retrieved from get_next_ttweet() was
# posted successfully.
def good(self):
with open(self.current_ttweet_path, 'r') as f:
ttweet = tt.TalentTweet.deserialize(f.readline())
self.add_finished_tweet(ttweet.tweet_id)
os.remove(self.current_ttweet_path)
self.save_file()
self.is_good = True
# overwrite queue.txt # overwrite queue.txt
def save_file(self): def save_file(self):
shutil.copyfile(self.queue_path, self.queue_backup_path)
self.__sort_ttweets_dict() self.__sort_ttweets_dict()
with open(self.queue_path, 'w') as f: with open(self.queue_path, 'w') as f:
# write timestamps # write timestamps
for (id, timestamp) in self.finished_user_timestamps.items(): for (id, timestamp) in self.finished_user_timestamps.items():
f.write(f'# {id} {timestamp}\n') f.write(f'# {id} {timestamp}\n')
f.write('\n') f.write('\n')
# write sorted ttweets # write sorted ttweets
for ttweet in self.ttweets_dict.values(): for ttweet in self.ttweets_dict.values():
f.write(ttweet.serialize() + '\n') f.write(ttweet.serialize() + '\n')
def add_finished_tweet(self, id):
self.finished_ttweets.append(id)
with open(self.finished_ttweets_path, 'a') as f:
f.write(f'{id}\n')
def __sort_ttweets_dict(self): def __sort_ttweets_dict(self):
self.ttweets_dict = dict(sorted(self.ttweets_dict.items())) if not self.__sorted:
self.ttweets_dict = dict(sorted(self.ttweets_dict.items()))
self.__sorted = True self.__sorted = True
# destructor # destructor
def __del__(self): def __del__(self):
if self.good: if self.is_good:
print('Ended in good state, deleting backup queue...') print('Ended in good state, deleting backup queue...')
os.remove(self.queue_backup_path) os.remove(self.queue_backup_path)
else: else:
+1 -1
View File
@@ -1,5 +1,6 @@
import datetime import datetime
import traceback import traceback
import asyncio
import tweepy import tweepy
@@ -223,7 +224,6 @@ class TwAPI:
text = create_text() text = create_text()
try: try:
# print('creating reply img')
# media_ids = [await self.get_ttweet_image_media_id(ttweet)] # media_ids = [await self.get_ttweet_image_media_id(ttweet)]
print('posting main tweet...', end='') print('posting main tweet...', end='')
twt_resp = await self.post_tweet(text) twt_resp = await self.post_tweet(text)