diff --git a/.gitignore b/.gitignore index a6213d7..bf619a7 100644 --- a/.gitignore +++ b/.gitignore @@ -143,9 +143,5 @@ cython_debug/ .vscode # project-specific -*.png -*.json -queue.txt -_queue_backup.txt -finished_ttweets.txt -_current_ttweet.txt \ No newline at end of file +run/ +*.json \ No newline at end of file diff --git a/README.md b/README.md index ba3dbc3..0b961c2 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ This is the authentication token obtained from a browser when signed in on the T ``` web_auth_token= ``` -### Example contents of `.env` without values +### Example `.env` without values ``` scraper_username0= scraper_password0= diff --git a/src/account_pool.py b/src/account_pool.py index f51decf..bf53004 100644 --- a/src/account_pool.py +++ b/src/account_pool.py @@ -1,23 +1,24 @@ +import os from dotenv import dotenv_values +from util import working_path + ## Track multiple accounts in a pool, cycling to the next one when requested. class AccountPool: def __init__(self): self.__accounts: list[tuple[str, str]] = list() self.__idx = 0 - creds = dotenv_values() + creds = dotenv_values(working_path(file=".env")) i = 0 while True: - if f'scraper_username{i}' in creds \ - and f'scraper_password{i}' in creds: - self.__accounts.append(( - creds[f'scraper_username{i}'], - creds[f'scraper_password{i}'] - )) + if f"scraper_username{i}" in creds and f"scraper_password{i}" in creds: + self.__accounts.append( + (creds[f"scraper_username{i}"], creds[f"scraper_password{i}"]) + ) i += 1 else: break - + def use_index(self, idx): self.__idx = idx return self.current() @@ -26,7 +27,7 @@ class AccountPool: if 0 <= self.__idx < len(self.__accounts): return self.__accounts[self.__idx] return None - + def next(self) -> tuple[str, str] | None: self.__idx += 1 if self.__idx >= len(self.__accounts): diff --git a/src/catchup.py b/src/catchup.py index ed4185c..5d77ce5 100644 --- a/src/catchup.py +++ b/src/catchup.py @@ -19,6 +19,7 @@ PROGRAM_ARGS = None safe_to_post_tweets = True scraper: Scraper + # Updates TTweetQueue async def get_cross_tweets_online(): global safe_to_post_tweets @@ -26,91 +27,101 @@ async def get_cross_tweets_online(): global scraper safe_to_post_tweets = True - dbg_curr_user = '' + dbg_curr_user = "" # Begin getting tweets from online - print('Pulling tweets from online!') + print("Pulling tweets from online!") try: for i, (talent_id, talent_username) in enumerate(talents.items()): - print(f'[{i+1}/{len(talents)}] {talent_username}-----------------------------------') - dbg_curr_user = f'{talent_id}: {talent_username}' + print( + f"[{i+1}/{len(talents)}] {talent_username}-----------------------------------" + ) + dbg_curr_user = f"{talent_id}: {talent_username}" try: since_date = queue.finished_user_dates.get(talent_id, None) - ttweets = scraper.get_cross_ttweets_from_user(talent_username, since_date=since_date) - print(f'got {len(ttweets)} TalentTweets') + ttweets = scraper.get_cross_ttweets_from_user( + talent_username, since_date=since_date + ) + print(f"got {len(ttweets)} TalentTweets") for ttweet in ttweets: - if ttweet.tweet_id not in queue.finished_ttweets \ - and ttweet.is_cross_company(): + if ( + ttweet.tweet_id not in queue.finished_ttweets + and ttweet.is_cross_company() + ): queue.add_ttweet(ttweet) except KeyboardInterrupt as e: raise e except Exception as e: - print('Unhandled error occurred processing tweet data.') + print("Unhandled error occurred processing tweet data.") safe_to_post_tweets = False raise e else: queue.finished_user_dates[talent_id] = get_current_date() queue.save_file() except KeyboardInterrupt as e: - print('Interrupting tweet pulling... NOTE: remaining dates in queue file will not be updated!') + print( + "Interrupting tweet pulling... NOTE: remaining dates in queue file will not be updated!" + ) queue.save_file() raise e except: - print('Unhandled error occurred while pulling tweets.') + print("Unhandled error occurred while pulling tweets.") traceback.print_exc() - with open("error_catchup.txt", "a") as f: - f.write(f'Error getting tweets from user {dbg_curr_user}\n') + with open(working_path(file="error_catchup.txt"), "a") as f: + f.write(f"Error getting tweets from user {dbg_curr_user}\n") traceback.print_exc(file=f) safe_to_post_tweets = False else: - print('Successfully saved all tweets from online!') + print("Successfully saved all tweets from online!") queue.save_file() + # return False = we posted at least one ttweet # return True = we didn't post a single ttweet async def process_queue() -> bool: - ''' + """ Go through the queue and post stored TalentTweets. - ''' + """ global scraper global queue queued_ttweets_count = queue.get_count() - - WAIT_TIME = 60*15 + + WAIT_TIME = 60 * 15 ttweets_posted = 0 if queued_ttweets_count == 0: - print('Posting queue is empty!') + print("Posting queue is empty!") return True - + try: while not queue.is_empty(): ttweet = queue.get_next_ttweet() if ttweet.tweet_id in queue.finished_ttweets: - print('skipping finished tweet...') + print("skipping finished tweet...") queue.good(ttweet.tweet_id) continue tweet_was_successful = await TwAPI.instance.post_ttweet(ttweet) - - print('running queue.good()...') + + print("running queue.good()...") queue.good(ttweet.tweet_id) if tweet_was_successful: ttweets_posted += 1 - print(f'({ttweets_posted}/{queued_ttweets_count}) done') + print(f"({ttweets_posted}/{queued_ttweets_count}) done") if not queue.is_empty(): - print(f'resting for {WAIT_TIME}s...') - await asyncio.sleep(WAIT_TIME-5) - print('5 second warning!') + print(f"resting for {WAIT_TIME}s...") + await asyncio.sleep(WAIT_TIME - 5) + print("5 second warning!") await asyncio.sleep(5) except Exception as e: - print('Unhandled error occurred while posting tweets from queue.') + print("Unhandled error occurred while posting tweets from queue.") traceback.print_exc() if ttweets_posted > 0: return False return True + # return True = no problems # return False = issue occurred where we couldn't post all past tweets properly async def run(PROGRAM_ARGS): @@ -124,63 +135,67 @@ async def run(PROGRAM_ARGS): # post tweets given in command line first if PROGRAM_ARGS.post_id is not None and len(PROGRAM_ARGS.post_id) > 0: PROGRAM_ARGS.post_id.sort() - print('Posting specified tweets first.') + print("Posting specified tweets first.") for id in PROGRAM_ARGS.post_id: try: i = int(id) except ValueError: - print(f'Invalid tweet {id}!') + print(f"Invalid tweet {id}!") continue - + posted = await TwAPI.instance.post_ttweet_by_id(i) if posted: queue.add_finished_tweet(i) - print('Successfully posted tweet. Sleeping for 5 minutes') - await asyncio.sleep(60*5) + print("Successfully posted tweet. Sleeping for 5 minutes") + await asyncio.sleep(60 * 5) else: - print('Did not post tweet') - print('Done processing specified tweets') + print("Did not post tweet") + print("Done processing specified tweets") PROGRAM_ARGS.post_id = None # refresh stored queue first if PROGRAM_ARGS.refresh_queue: PROGRAM_ARGS.refresh_queue = False - print('Refreshing queue tweets...') + print("Refreshing queue tweets...") for id in queue.ttweets_dict: - t = scraper.get_tweet(id, queue.ttweets_dict[id].author_id in privated_accounts) + t = scraper.get_tweet( + id, queue.ttweets_dict[id].author_id in privated_accounts + ) queue.ttweets_dict[id] = tt.TalentTweet.create_from_tweety(t) queue.save_file() async def queue_loop(): while True: - print(f'{queue.get_count()} cross-company tweets to announce.') + print(f"{queue.get_count()} cross-company tweets to announce.") try: if safe_to_post_tweets: if await process_queue(): print("Finished processing queue") return else: - print('Posted no new tweets; we\'re caught up!') + print("Posted no new tweets; we're caught up!") return else: - print('Tweets were not retrieved cleanly. Not processing queue.') + print("Tweets were not retrieved cleanly. Not processing queue.") return except KeyboardInterrupt as e: - print('Interrupting queue processing...') + print("Interrupting queue processing...") raise e except: - print('Unhandled error occurred while running catch up in posting phase.') + print( + "Unhandled error occurred while running catch up in posting phase." + ) traceback.print_exc() await get_cross_tweets_online() try: if PROGRAM_ARGS.straight_to_queue: PROGRAM_ARGS.straight_to_queue = False - print('Processing queue first before fetching tweets...') + print("Processing queue first before fetching tweets...") await queue_loop() else: await get_cross_tweets_online() await queue_loop() except KeyboardInterrupt: - print('Interrupt received. Ending catchup mode...') + print("Interrupt received. Ending catchup mode...") return False diff --git a/src/talent_lists.py b/src/talent_lists.py index 97a73cb..a2a0269 100644 --- a/src/talent_lists.py +++ b/src/talent_lists.py @@ -1,4 +1,4 @@ -from util import get_project_dir +from util import project_root holo_en: dict[int, str] = dict() holo_id: dict[int, str] = dict() @@ -10,22 +10,25 @@ privated_accounts: dict[int, str] = dict() test_talents = dict() + # TODO: talents(id) -> (name, company) def __create_dict(file, _dict, company): - print(f'Initializing talents\' account list from {file}...') + print(f"Initializing talents' account list from {file}...") global talents - with open(file, 'r') as f: + with open(file, "r") as f: for line in f: words = line.split() - if len(words) >= 2 and line[0] != '#': + if len(words) >= 2 and line[0] != "#": t = line.split() id, name = int(t[0]), t[1] # name = f'{util.get_username_online(id, default=name)}' # attempt to get updated name talents[id] = name _dict[id] = name talents_company[id] = company - if len(words) > 2 and words[2] == 'p': + if len(words) > 2 and words[2] == "p": privated_accounts[id] = name + + def init(): global holo_en global holo_id @@ -34,26 +37,30 @@ def init(): global test_talents # holoEN - __create_dict(f'{get_project_dir()}/lists/holoen.txt', holo_en, 'holoEN') + __create_dict(project_root(("lists",), "holoen.txt"), holo_en, "holoEN") # holoID - __create_dict(f'{get_project_dir()}/lists/holoid.txt', holo_id, 'holoID') + __create_dict(project_root(("lists",), "holoid.txt"), holo_id, "holoID") # nijiEN - __create_dict(f'{get_project_dir()}/lists/nijien.txt', niji_en, 'nijiEN') + __create_dict(project_root(("lists",), "nijien.txt"), niji_en, "nijiEN") # nijiexID - __create_dict(f'{get_project_dir()}/lists/nijiexid.txt', niji_exid, 'nijiex\'ID') + __create_dict(project_root(("lists",), "nijiexid.txt"), niji_exid, "nijiex'ID") # TODO: nijiex-KR test_talents = holo_en + def is_niji(id: int) -> bool: return id in niji_en or id in niji_exid + def is_holo(id: int) -> bool: return id in holo_en or id in holo_id + def is_cross_company(id1: int, id2: int): return (is_niji(id1) and is_holo(id2)) or (is_holo(id1) and is_niji(id2)) + # For filtered stream # DEPRECATED: thx elon def get_twitter_rules(): @@ -61,12 +68,12 @@ def get_twitter_rules(): rules = list() names = list(talents.values()) - curr_rule = f'from:{names}' + curr_rule = f"from:{names}" for name in list(talents.values())[1:]: - test_rule = curr_rule + f' OR from:{name}' + test_rule = curr_rule + f" OR from:{name}" if len(test_rule) > 512: rules.append(curr_rule) - curr_rule = f'from:{name}' + curr_rule = f"from:{name}" else: curr_rule = test_rule rules.append(curr_rule) diff --git a/src/ttweetqueue.py b/src/ttweetqueue.py index 7987bdb..2e97433 100644 --- a/src/ttweetqueue.py +++ b/src/ttweetqueue.py @@ -9,15 +9,16 @@ import talenttweet as tt # User timestamps line format: # {user_id} {status_num} {UNIX_timestamp} + class TalentTweetQueue: instance = None - + def __init__(self): TalentTweetQueue.instance = self - self.queue_path = util.get_queue_path() - self.queue_backup_path = util.get_queue_backup_path() - self.current_ttweet_path = f'{util.get_project_dir()}/_current_ttweet.txt' - self.finished_ttweets_path = f'{util.get_project_dir()}/finished_ttweets.txt' + self.queue_path = util.working_path(file="queue.txt") + self.queue_backup_path = util.working_path(file="_queue_backup.txt") + self.current_ttweet_path = util.working_path(file="_current_ttweet.txt") + self.finished_ttweets_path = util.working_path(file="finished_ttweets.txt") self.is_good = True self.__sorted = False self.finished_user_dates: dict[int, str] = dict() @@ -26,58 +27,62 @@ class TalentTweetQueue: ## file check, backup copy if os.path.exists(self.queue_backup_path): - print('Found backup queue! We errored in the previous run.') + print("Found backup queue! We errored in the previous run.") shutil.copyfile(self.queue_backup_path, self.queue_path) elif os.path.exists(self.queue_path): - print('Creating backup queue...') + print("Creating backup queue...") shutil.copyfile(self.queue_path, self.queue_backup_path) ## initialize structures # user timestamps try: - with open(self.queue_path, 'r') as f: + with open(self.queue_path, "r") as f: for line in f: tokens = line.split() - if len(tokens) == 0: continue + if len(tokens) == 0: + continue - if tokens[0][0] != '#': - print(f'Stopped finding user dates at {line}') + if tokens[0][0] != "#": + print(f"Stopped finding user dates at {line}") # reached end of accounts list break - if tokens[2] != '-1': + if tokens[2] != "-1": self.finished_user_dates[int(tokens[1])] = tokens[2] - except: pass + except: + pass # ttweets try: - with open(self.queue_path, 'r') as f: # reset seek head + with open(self.queue_path, "r") as f: # reset seek head # Get existing queued TalentTweets for line in f: tokens = line.split() - if len(tokens) == 0 or tokens[0][0] == '#': + if len(tokens) == 0 or tokens[0][0] == "#": continue ttweet = tt.TalentTweet.deserialize(line) # print(f'{ttweet.tweet_id}:\n{ttweet}') self.ttweets_dict[ttweet.tweet_id] = ttweet - print(f'Found {len(self.finished_user_dates)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.') + print( + f"Found {len(self.finished_user_dates)} scraped accounts and {len(self.ttweets_dict)} tweets in queue." + ) except: traceback.print_exc() pass # unfinished ttweet if os.path.exists(self.current_ttweet_path): - with open(self.current_ttweet_path, 'r') as f: + with open(self.current_ttweet_path, "r") as f: for line in f: if len(line) > 0: ttweet = tt.TalentTweet.deserialize(line) if ttweet.tweet_id in self.ttweets_dict: self.ttweets_dict[ttweet.tweet_id] = ttweet - print(f'adding unfinished tweet {ttweet.tweet_id}') + print(f"adding unfinished tweet {ttweet.tweet_id}") # finished ttweets try: - with open(self.finished_ttweets_path, 'r') as f: + with open(self.finished_ttweets_path, "r") as f: for line in f: self.finished_ttweets.add(int(line)) - except: pass - + except: + pass def is_empty(self): return self.get_count() <= 0 @@ -94,57 +99,59 @@ class TalentTweetQueue: self.__sort_ttweets_dict() key = list(self.ttweets_dict.keys())[0] ttweet = self.ttweets_dict.pop(key) - with open(self.current_ttweet_path, 'w') as f: + with open(self.current_ttweet_path, "w") as f: f.write(ttweet.serialize()) return ttweet - + def get_count(self): return len(self.ttweets_dict) - + ## Call when the TalentTweet retrieved from get_next_ttweet() was # posted successfully. def good(self, tweet_id: int): - try: os.remove(self.current_ttweet_path) - except: pass + try: + os.remove(self.current_ttweet_path) + except: + pass self.add_finished_tweet(tweet_id) self.save_file() self.is_good = True - + # overwrite queue.txt def save_file(self, replace_backup=True): - print('saving queue...', end='') + print("saving queue...", end="") if replace_backup: - print('overwriting backup...', end='') + print("overwriting backup...", end="") shutil.copyfile(self.queue_path, self.queue_backup_path) self.__sort_ttweets_dict() - with open(self.queue_path, 'w') as f: + with open(self.queue_path, "w") as f: # write dates - for (id, date) in self.finished_user_dates.items(): - f.write(f'# {id} {date}\n') + for id, date in self.finished_user_dates.items(): + f.write(f"# {id} {date}\n") - f.write('\n') + f.write("\n") # write sorted ttweets for ttweet in self.ttweets_dict.values(): - f.write(ttweet.serialize() + '\n') - print('done') + f.write(ttweet.serialize() + "\n") + print("done") def add_finished_tweet(self, id): self.finished_ttweets.add(id) - with open(self.finished_ttweets_path, 'a') as f: - f.write(f'{id}\n') - + with open(self.finished_ttweets_path, "a") as f: + f.write(f"{id}\n") + def __sort_ttweets_dict(self): if not self.__sorted: self.ttweets_dict = dict(sorted(self.ttweets_dict.items())) self.__sorted = True - + # destructor def __del__(self): if self.is_good: - print('Ended in good state, deleting backup queue...') + print("Ended in good state, deleting backup queue...") os.remove(self.queue_backup_path) else: - print('Ended in bad state, keeping backup queue.') \ No newline at end of file + print("Ended in bad state, keeping backup queue.") diff --git a/src/twapi.py b/src/twapi.py index 42e52b6..368c328 100644 --- a/src/twapi.py +++ b/src/twapi.py @@ -9,13 +9,14 @@ import talenttweet as tt import talent_lists as tl import util + class TwAPI: tweets_fetched = 0 instance = None - TWEET_MEDIA_FIELDS = ['url'] - TWEET_FIELDS = ['created_at', 'in_reply_to_user_id', 'referenced_tweets'] - TWEET_EXPANSIONS = ['entities.mentions.username', 'referenced_tweets.id.author_id'] - + TWEET_MEDIA_FIELDS = ["url"] + TWEET_FIELDS = ["created_at", "in_reply_to_user_id", "referenced_tweets"] + TWEET_EXPANSIONS = ["entities.mentions.username", "referenced_tweets.id.author_id"] + # Returns a tuple of user IDs:(reply_to, qrt, {mentions}) # for a single tweet. # @@ -35,9 +36,9 @@ class TwAPI: # mentions try: - mention_list = tweet.entities['mentions'] + mention_list = tweet.entities["mentions"] for mention in mention_list: - mentions.add(int(mention['id'])) + mentions.add(int(mention["id"])) except: pass # reply-to @@ -46,17 +47,19 @@ class TwAPI: # qrt if tweet.referenced_tweets: for ref_tweet in tweet.referenced_tweets: - if ref_tweet.type == 'quoted': - for incl_tweet in response.includes['tweets']: + if ref_tweet.type == "quoted": + for incl_tweet in response.includes["tweets"]: if incl_tweet.id == ref_tweet.id: qrt = incl_tweet.author_id try: mentions.remove(reply_to) - except: pass + except: + pass try: mentions.remove(qrt) - except: pass + except: + pass mention_list = list(mentions) for uid in mention_list: @@ -64,39 +67,66 @@ class TwAPI: mentions.remove(uid) if reply_to not in tl.talents.keys(): reply_to = None - + return (mentions, reply_to, qrt) - def __init__(self): - creds = dotenv_values() + creds = dotenv_values(util.working_path(file=".env")) TwAPI.instance = self self.client = tweepy.Client( - consumer_key=creds['app_key'], consumer_secret=creds['app_secret'], - access_token=creds['user_token'], access_token_secret=creds['user_secret'] + consumer_key=creds["app_key"], + consumer_secret=creds["app_secret"], + access_token=creds["user_token"], + access_token_secret=creds["user_secret"], ) self.api = tweepy.API( auth=tweepy.OAuthHandler( - consumer_key=creds['app_key'], consumer_secret=creds['app_secret'], - access_token=creds['user_token'], access_token_secret=creds['user_secret'] + consumer_key=creds["app_key"], + consumer_secret=creds["app_secret"], + access_token=creds["user_token"], + access_token_secret=creds["user_secret"], ) ) - + try: self.me = self.client.get_me().data - print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})') + print( + f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})' + ) except: pass - async def post_tweet(self, text='', media_ids: list=None, reply_to_tweet: int=None, quote_tweet_id: int=None): + async def post_tweet( + self, + text="", + media_ids: list = None, + reply_to_tweet: int = None, + quote_tweet_id: int = None, + ): try: - tweet = self.client.create_tweet(text=text, media_ids=media_ids, in_reply_to_tweet_id=reply_to_tweet, quote_tweet_id=quote_tweet_id) + tweet = self.client.create_tweet( + text=text, + media_ids=media_ids, + in_reply_to_tweet_id=reply_to_tweet, + quote_tweet_id=quote_tweet_id, + ) return tweet except tweepy.TooManyRequests as e: - wait_for = abs(float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp()) + 1 - print(f'\thit rate limit: trying again in {wait_for}s...') + wait_for = ( + abs( + float(e.response.headers["x-rate-limit-reset"]) + - datetime.datetime.now().timestamp() + ) + + 1 + ) + print(f"\thit rate limit: trying again in {wait_for}s...") await asyncio.sleep(wait_for) - return await self.post_tweet(text=text, media_ids=media_ids, reply_to_tweet=reply_to_tweet, quote_tweet_id=quote_tweet_id) + return await self.post_tweet( + text=text, + media_ids=media_ids, + reply_to_tweet=reply_to_tweet, + quote_tweet_id=quote_tweet_id, + ) async def get_ttweet_image_media_id(self, ttweet): img = await util.create_ttweet_image(ttweet) @@ -106,43 +136,53 @@ class TwAPI: # return True = successfully posted a single ttweet # return False = did not post ttweet (duplicate) async def post_ttweet(self, ttweet: tt.TalentTweet, dry_run=False): - print(f'------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------') - + print( + f"------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------" + ) + text = ttweet.announce_text() ttweet_url = ttweet.url() - - if dry_run: print('-------------------- DRY RUN --------------------') + + if dry_run: + print("-------------------- DRY RUN --------------------") print(ttweet) - if dry_run: return False + if dry_run: + return False # NO DRY-RUN: actually post tweet # main tweet: text + screenshot try: - print('creating main QRT w/ screenshot...') + print("creating main QRT w/ screenshot...") media_ids = [await self.get_ttweet_image_media_id(ttweet)] - twt_resp = await self.post_tweet(text, media_ids=media_ids, quote_tweet_id=ttweet.tweet_id) - print('done') + twt_resp = await self.post_tweet( + text, media_ids=media_ids, quote_tweet_id=ttweet.tweet_id + ) + print("done") except: - print('error occurred trying to create main tweet, falling back to URL-main + reply screencap format') + print( + "error occurred trying to create main tweet, falling back to URL-main + reply screencap format" + ) traceback.print_exc() try: - print('posting main tweet...') + print("posting main tweet...") twt_resp = await self.post_tweet(text, quote_tweet_id=ttweet.tweet_id) - print('done') - twt_id = twt_resp.data['id'] + print("done") + twt_id = twt_resp.data["id"] try: - print('creating reply img...', end='') + print("creating reply img...", end="") media_ids = [await self.get_ttweet_image_media_id(ttweet)] - print('posting reply tweet...', end='') + print("posting reply tweet...", end="") await self.post_tweet(reply_to_tweet=twt_id, media_ids=media_ids) - print('done') + print("done") except: - print('Had trouble posting reply image tweet.') - print('successfully posted ttweet!') + print("Had trouble posting reply image tweet.") + print("successfully posted ttweet!") except tweepy.Forbidden as e: - if 'duplicate content' in e.api_messages[0]: - print('Twitter says the TalentTweet is a duplicate; skipping error-free...') + if "duplicate content" in e.api_messages[0]: + print( + "Twitter says the TalentTweet is a duplicate; skipping error-free..." + ) return False else: raise e @@ -151,17 +191,17 @@ class TwAPI: async def post_ttweet_by_id(self, id: int): from scraper import Scraper - print(f'Manually posting tweet {id}') + print(f"Manually posting tweet {id}") s = Scraper() t = s.get_tweet(id, True) if not t: - print('Tweet could not be retrieved') + print("Tweet could not be retrieved") return False - + ttweet = tt.TalentTweet.create_from_tweety(t) if not ttweet.is_cross_company(): - print(f'{ttweet.username}/{ttweet.tweet_id} is not cross-company!') + print(f"{ttweet.username}/{ttweet.tweet_id} is not cross-company!") return False - - print(f'Posting {ttweet.username}/{ttweet.tweet_id}...') + + print(f"Posting {ttweet.username}/{ttweet.tweet_id}...") return await self.post_ttweet(ttweet) diff --git a/src/util.py b/src/util.py index 5c8a17e..7a0fdf4 100644 --- a/src/util.py +++ b/src/util.py @@ -3,6 +3,7 @@ import os import sys import traceback +from pathlib import Path from datetime import datetime from dotenv import dotenv_values @@ -13,36 +14,52 @@ import tweepy from recrop import fix_aspect_ratio import talent_lists -# returns system path to this project, which is -# up one level from this file's directory (effective path: ..../src/../). -def get_project_dir(): - return os.path.join(os.path.dirname(__file__), os.pardir) -def get_queue_path(): - return f'{get_project_dir()}/queue.txt' +def project_root(dir_path: tuple[str] = tuple(), file: str = None): + """Returns path relative to the project root.""" + dir_path = os.path.join(os.path.dirname(__file__), os.pardir, *dir_path) + Path(dir_path).mkdir(parents=True, exist_ok=True) + + if file is not None: + return os.path.join(dir_path, file) + return os.path.join(dir_path) + + +def working_path(dir_path: tuple[str] = tuple(), file: str = None): + """Returns file path relative to the working ephemeral directory.""" + dir_path = project_root("run", *dir_path) + Path(dir_path).mkdir(parents=True, exist_ok=True) + + if file is not None: + return os.path.join(dir_path, file) + return os.path.join(dir_path) -def get_queue_backup_path(): - return f'{get_project_dir()}/_queue_backup.txt' def clamp(n, smallest, largest): return max(smallest, min(n, largest)) + def datetime_to_tdate(date_time: datetime): return date_time.strftime("%Y-%m-%d") + def tdate_to_datetime(tdate: str): return datetime.strptime("%Y-%m-%d") + def timestamp_to_tdate(timestamp=None): - if timestamp==None: + if timestamp == None: timestamp = datetime.now().timestamp() return datetime_to_tdate(datetime.fromtimestamp(timestamp, tz=pytz.utc)) + def get_current_timestamp(): return datetime.now().timestamp() + def get_current_date(): - return datetime.today().strftime('%Y-%m-%d') + return datetime.today().strftime("%Y-%m-%d") + def get_key_from_value(d: dict, val): keys = [k for k, v in d.items() if v == val] @@ -50,38 +67,43 @@ def get_key_from_value(d: dict, val): return keys[0] return None + # FIXME: web_auth_token under rate-limitation will fail to screenshot async def create_ttweet_image(ttweet): tc = TweetCapture() - auth_token = dotenv_values().get('web_auth_token') + auth_token = dotenv_values(working_path(file=".env")).get("web_auth_token") if auth_token: - tc.cookies = [{'name': 'auth_token', 'value': auth_token}] - if 'linux' in sys.platform: + tc.cookies = [{"name": "auth_token", "value": auth_token}] + if "linux" in sys.platform: # Linux chromedriver path - tc.driver_path = '/usr/bin/chromedriver' - filename = f'{get_project_dir()}/img.png' + tc.driver_path = "/usr/bin/chromedriver" + filename = working_path(file="img.png") img = None - try: os.remove(filename) - except: pass + try: + os.remove(filename) + except: + pass try: img = await tc.screenshot( url=ttweet.url(), path=filename, mode=4, night_mode=1, - show_parent_tweets=True + show_parent_tweets=True, ) img = fix_aspect_ratio(img) except: - print('unable to create tweet image') + print("unable to create tweet image") traceback.print_exc() return None - - print(f'successfully saved {img}') + + print(f"successfully saved {img}") return img + def get_tweet_url(id, username): - return f'https://www.twitter.com/{username}/status/{id}' + return f"https://www.twitter.com/{username}/status/{id}" + ## Attempt to pull username from local; pull from online if doesn't exist. def get_username(id): @@ -90,22 +112,26 @@ def get_username(id): return get_username_online(id) return ret + def get_username_with_company(id): company = talent_lists.talents_company.get(id, None) return f'{get_username(id)} {f"({company})" if company is not None else ""}' + def get_username_local(id: int): - return talent_lists.talents.get(id, f'{id}') + return talent_lists.talents.get(id, f"{id}") + # Retrieve username via API v2 (tweepy) def get_username_online(id, default=None): import twapi + try: resp = twapi.TwAPI.instance.client.get_user(id=id) return resp.data.username except tweepy.TooManyRequests: - return str(default) if default is not None else f'id:{id}' + return str(default) if default is not None else f"id:{id}" except: - print(f'Unhandled error retrieving username for {id}!') + print(f"Unhandled error retrieving username for {id}!") traceback.print_exc() - return str(default) if default is not None else f'id:{id}' \ No newline at end of file + return str(default) if default is not None else f"id:{id}"