diff --git a/.gitignore b/.gitignore index b1c8472..19d9c7c 100644 --- a/.gitignore +++ b/.gitignore @@ -144,4 +144,6 @@ cython_debug/ # project-specific /secrets.ini -/queue.txt \ No newline at end of file +/queue.txt +/img.png +/src/img.png \ No newline at end of file diff --git a/src/catchup.py b/src/catchup.py index 2d26bad..e6100ec 100644 --- a/src/catchup.py +++ b/src/catchup.py @@ -6,9 +6,6 @@ import traceback import datetime -import sys -import os -import asyncio import twint @@ -17,14 +14,16 @@ from talent_lists import * from twapi import TwAPI import talenttweet as tt -def write_user_date(user_id, file, date_str = None, error = False): - if date_str is None: - date_str = util.datetime_to_tdate(datetime.datetime.now()) +PROGRAM_ARGS = None - file.write(f'# {user_id} {date_str if not error else "-1"}\n') +def write_user_timestamp(user_id, file, timestamp = None, error = False): + if timestamp is None: + timestamp = datetime.datetime.now().timestamp() + + file.write(f'# {user_id} {timestamp if not error else "-1"}\n') pass -def get_queue_file(): +def get_queue_path(): return f'{util.get_project_dir()}/queue.txt' def get_local_queue(): @@ -32,7 +31,8 @@ def get_local_queue(): pass ## Returns the ID of all tweets (up to limit) from a user ID. -def get_user_tweets(id, since_date='', limit=None): +def get_user_tweets(id, since_timestamp=None, limit=None): + qrt_count = 0 tweets = list() c = twint.Config() c.User_id = id @@ -40,101 +40,140 @@ def get_user_tweets(id, since_date='', limit=None): c.Store_object = True c.Store_object_tweets_list = tweets c.Hide_output = True - c.Since = since_date + c.Since = '' if since_timestamp == None else util.timestamp_to_tdate(since_timestamp) - user_str = f'{util.get_username(id)}' - print(f'Scraping tweets from {user_str}...') + user_str = f'{util.get_username_local(id)}' + print(f'Scraping tweets from {user_str} since {"forever ago" if c.Since == "" else c.Since}...') try: twint.run.Search(c) except: print(f'Had trouble getting tweets from {user_str}') + + for twt in tweets: + if twt.quote_url != '': + qrt_count += 1 - print(f'Scraped {len(tweets)} tweets') + print(f'Scraped {len(tweets)} tweets, {qrt_count} of which are quote tweets.') return tweets +# Returns dict of accounts that successfully caught up. +# LINE FORMAT: "# {user_id} {status_num} {UNIX_timestamp} +def get_finished_user_timestamps(queue_file): + results = dict() + for line in queue_file: + tokens = line.split() + if len(tokens) != 3 or tokens[0][0] != '#': + # reached end of accounts list + break + + if tokens[2] != '-1': + results[int(tokens[1])] = float(tokens[2]) + return results + +def get_user_timestamps_str(queue_file): + results = str() + for line in queue_file: + tokens = line.split() + if len(tokens) != 3 or tokens[0][0] != '#': + # reached end of accounts list + break + results += f'{line}\n' + return results[:-1] + # If queue.txt doesn't exist, creates and populates it. # Returns a list of sorted and filtered TalentTweets (should # be equivalent to queue.txt) async def get_cross_talent_tweets(queue_path): - finished_user_tdates = dict() + finished_user_timestamps = dict() ttweets_dict = dict() # Populate structures with existing data from queue.txt try: - print('Processing existing data in queue.txt...') with open(queue_path, 'r') as f: - # Check for finished and incomplete accounts - # LINE FORMAT: "# {user_id} {status_num} (TODO: use date of retrival YYYY-MM-DD) - for line in f: - tokens = line.split() - if len(tokens) != 3 or tokens[0][0] != '#': - # reached end of accounts list - break - - if tokens[2] != '-1': - finished_user_tdates[int(tokens[1])] = tokens[2] + finished_user_timestamps = get_finished_user_timestamps(f) - # Add existing serialized TalentTweets into ttweets + # Get existing queued TalentTweets for line in f: tokens = line.split() if len(tokens) == 0 or tokens[0][0] == '#': continue ttweet = tt.TalentTweet.deserialize(line) ttweets_dict[ttweet.tweet_id] = ttweet + print(f'Found {len(finished_user_timestamps)} scraped accounts and {len(ttweets_dict)} tweets.') except FileNotFoundError: - print('Couldn\'t find queue.txt.') + print('queue.txt not found.') # Pull tweets from twint with open(queue_path, 'w') as f: - # for talent_id in talent_lists.talents: - for talent_id in talent_lists.test_talents: - print('using test_talents') - if talent_id not in finished_user_tdates or \ - finished_user_tdates[talent_id] != util.datetime_to_tdate(datetime.datetime.today()): - try: - tweets = get_user_tweets(talent_id, since_date=finished_user_tdates.get(talent_id, None)) - for tweet in tweets: - ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet) - if ttweet.is_cross_company(): - ttweets_dict[ttweet.tweet_id] = ttweet - except: - print('Error occurred processing tweet data. Traceback:') - print(traceback.format_exc()) - write_user_date(user_id=talent_id, file=f, error=True) + print('Pulling tweets from online!') + try: + print('TODO: using test_talents') + for talent_id in talent_lists.test_talents: + # for talent_id in talent_lists.talents: + if talent_id not in finished_user_timestamps or \ + finished_user_timestamps[talent_id] < datetime.datetime.now().timestamp(): + try: + # tweets = get_user_tweets(talent_id, since_timestamp=1663698621) # shorten test runs + tweets = get_user_tweets(talent_id, since_timestamp=finished_user_timestamps.get(talent_id, None)) + for tweet in tweets: + if tweet.id not in ttweets_dict: + ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet) + if ttweet.is_cross_company(): + ttweets_dict[ttweet.tweet_id] = ttweet + except: + print('Error occurred processing tweet data.') + print(traceback.format_exc()) + write_user_timestamp(user_id=talent_id, file=f, error=True) + else: + write_user_timestamp(user_id=talent_id, file=f) else: - write_user_date(user_id=talent_id, file=f) - else: - print(f'Skipping already completed {util.get_username(talent_id)}') - write_user_date(user_id=talent_id, file=f, date_str=finished_user_tdates[talent_id]) - f.write('\n') - ttweets_dict = dict(sorted(ttweets_dict.items())) - for ttweet in ttweets_dict.values(): - f.write(f'{ttweet.serialize()}\n') + print(f'Skipping already completed {util.get_username_local(talent_id)}') + write_user_timestamp(user_id=talent_id, file=f, timestamp=finished_user_timestamps[talent_id]) + f.write('\n') + ttweets_dict = dict(sorted(ttweets_dict.items())) + for ttweet in ttweets_dict.values(): + f.write(f'{ttweet.serialize()}\n') + except: + print('Unhandled error occurred while pulling tweets.') + traceback.print_exc() + print('Saving queue.txt and exiting.') + exit(1) return ttweets_dict -def process_queue(file): - print('TODO: implement process_queue') - # while Queue.txt has lines present - # attempt to deserialize first line of Queue.txt - # exit program if failed, stating error - # while post isn't successful - # attempt to post tweet - # delete serialized line from Queue.txt, save it - # - # we're done! post tweet announcing done with archives - pass +async def process_queue(ttweets_dict: dict): + global PROGRAM_ARGS -async def run(): - # if Queue.txt exists - # work through the tweets in Queue.txt - # else - # look through every talent's tweets, saving only cross-company tweets into a list - # sort the list by tweet_id - # create Queue.txt and save all tweets (serialized) there - # post a tweet announcing archival intent - # work through the tweets in Queue.txt + if len(ttweets_dict) == 0: return + + if PROGRAM_ARGS.announce_catchup: + TwAPI.instance.post_tweet(text=f'Starting to catch-up through {len(ttweets_dict)} logged tweets.') + + try: + while len(ttweets_dict) > 0: + key = list(ttweets_dict.keys())[0] + ttweet = ttweets_dict[key] + await TwAPI.instance.post_ttweet(ttweet) + ttweets_dict.pop(key) + except: + print('Unhandled error occurred while posting tweets from queue.') + traceback.print_exc() + else: + if PROGRAM_ARGS.announce_catchup: + await TwAPI.instance.post_tweet('Finished with catch-up tweets!') - queue_path = get_queue_file() - ttweet_dict = await get_cross_talent_tweets(queue_path) - print(f'got {len(ttweet_dict)} tweets') \ No newline at end of file + print('Updating what\'s left in ttweet_dict to queue.txt.') + with open(get_queue_path(), 'r') as f: + user_timestamps_str = get_user_timestamps_str(f) + with open(get_queue_path(), 'w') as f: + f.write(user_timestamps_str + '\n\n') + for ttweet in ttweets_dict.values(): + f.write(f'{ttweet.serialize()}\n') + +async def run(program_args): + global PROGRAM_ARGS + PROGRAM_ARGS = program_args + queue_path = get_queue_path() + ttweets_dict = await get_cross_talent_tweets(queue_path) + print(f'got {len(ttweets_dict)} tweets') + await process_queue(ttweets_dict) \ No newline at end of file diff --git a/src/main.py b/src/main.py index d8c3c1d..0537ffe 100644 --- a/src/main.py +++ b/src/main.py @@ -11,6 +11,8 @@ import catchup import listen from twapi import TwAPI +PROGRAM_ARGS = None + MODES_HELP_STR = '''mode to run the bot at: l,listen: listen for new tweets from all accounts; will not terminate unless error occurs c,catchup: scan all tweets from all accounts; will terminate when done''' @@ -20,24 +22,44 @@ def init_argparse(): p.add_argument('mode', nargs='?', \ help=MODES_HELP_STR) p.add_argument('--show-tokens', action='store_true', help='[DO NOT USE IN PUBLIC SETTING] print stored tokens from secrets.ini') + p.add_argument('--announce-catchup', action='store_true', help='In catch-up mode, post a tweet announcing catch-up mode.') return p -# TODO: implement command line mode for manually controlling the bot def command_line(): + # TODO: implement command line mode for manually controlling the bot pass -async def main(): +async def async_main(): + global PROGRAM_ARGS + + ## Determine running mode + match PROGRAM_ARGS.mode.lower(): + case 'l' | 'listen': + print('RUNNING IN LISTEN MODE\n') + listen.run() + case 'c' | 'catchup': + print('RUNNING IN CATCH-UP MODE\n') + await catchup.run(PROGRAM_ARGS) + case _: + command_line() + #TODO: remove message + print('\ninvalid mode. run with no arguments or "-h" for help page, including mode list.') + return + +def main(): + global PROGRAM_ARGS + parser = init_argparse() if len(sys.argv) < 2: parser.print_help() return - args = parser.parse_args() + PROGRAM_ARGS = parser.parse_args() - if args.show_tokens: + if PROGRAM_ARGS.show_tokens: print(api_secrets.get_all_secrets()) - if args.mode is None: return + if PROGRAM_ARGS.mode is None: return ## We expect to run in some mode now. @@ -47,21 +69,10 @@ async def main(): # Initialize talent account lists talent_lists.init() - ## Determine running mode - match args.mode.lower(): - case 'l' | 'listen': - print('RUNNING IN LISTEN MODE\n') - listen.run() - case 'c' | 'catchup': - print('RUNNING IN CATCH-UP MODE\n') - await catchup.run() - case _: - command_line() - #TODO: remove message - print('\ninvalid mode. run with no arguments or "-h" for help page, including mode list.') - return + ## Asynchronous execution + nest_asyncio.apply() + asyncio.run(async_main()) if __name__ == "__main__": - nest_asyncio.apply() - asyncio.run(main()) \ No newline at end of file + main() \ No newline at end of file diff --git a/src/talenttweet.py b/src/talenttweet.py index 22451e6..b8a3ddd 100644 --- a/src/talenttweet.py +++ b/src/talenttweet.py @@ -1,8 +1,7 @@ -from datetime import datetime +import datetime import platform import pytz -import twint from twapi import * import talent_lists @@ -15,7 +14,7 @@ class TalentTweet: raise ValueError('not enough tokens to reconstruct a TalentTweet') tweet_id, author_id = int(tokens[0]), int(tokens[1]) - date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc) + date_time = datetime.datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc) mentions = set() reply_to = None @@ -44,29 +43,33 @@ class TalentTweet: @staticmethod async def create_from_twint_tweet(tweet): - # qrt - # -- COMMENTED OUT FOR TESTING PURPOSES -- - # TODO: uncomment - # if tweet.quote_url != '': - # api_ttweet = await TalentTweet.create_from_id(tweet.id) - # return api_ttweet - - # MRQ (Q is guaranteed to be None) + # MRQ mentions = set() reply_to = None - + quoted_id = None + # reply_to/mentions is_reply = tweet.id != int(tweet.conversation_id) mentions = set([x['id'] for x in tweet.mentions]) - if is_reply and len(tweet.reply_to) > 0: # FIXME: QRT = is_reply and len(tweet.reply_to) == 0? - reply_to = tweet.reply_to[0]['id'] + if is_reply and len(tweet.reply_to) > 0: + reply_to = tweet.reply_to[0]['id'] # FIXME: QRT = is_reply and len(tweet.reply_to) == 0? reply_others = [x['id'] for x in tweet.reply_to[1:]] mentions.update(reply_others) try: mentions.remove(reply_to) except: pass - date_time = datetime.strptime(tweet.datetime, '%Y-%m-%d %H:%M:%S %Z') - return TalentTweet(tweet_id=tweet.id, author_id=tweet.user_id, date_time=date_time, mrq=(mentions, reply_to, None)) + # qrt + if type(tweet.quote_url) == str: + # print(f'url: {tweet.quote_url} ({type(tweet.quote_url)})') + quote_tokens = tweet.quote_url.split('/') + if len(quote_tokens) >= 2: + quoted_username = quote_tokens[-2] + quoted_id = util.get_user_id_local(quoted_username) + if quoted_id == -1: + quoted_id = util.get_user_id_online(quoted_username) + + date_time = datetime.datetime.strptime(tweet.datetime, '%Y-%m-%d %H:%M:%S %Z') + return TalentTweet(tweet_id=tweet.id, author_id=tweet.user_id, date_time=date_time, mrq=(mentions, reply_to, quoted_id)) @staticmethod @@ -82,7 +85,7 @@ class TalentTweet: mrq=mrq ) - def __init__(self, tweet_id: int, author_id: int,date_time: datetime, mrq: tuple): + def __init__(self, tweet_id: int, author_id: int,date_time: datetime.datetime, mrq: tuple): self.tweet_id, self.author_id = tweet_id, author_id self.date_time = date_time self.mentions = tuple(int(x) for x in mrq[0]) @@ -101,7 +104,7 @@ class TalentTweet: def __repr__(self) -> str: return ( - f'{self.tweet_id} from {util.get_username(self.author_id)}):\n' + f'{self.tweet_id} from {util.get_username_local(self.author_id)}):\n' f'{self.get_datetime_str()}\n' f'{self.get_all_parties_usernames()}\n' f'mentions: {self.mentions}\n' @@ -146,7 +149,7 @@ class TalentTweet: if len(self.all_parties) > 0: s = str() for id in self.all_parties: - s += f'{util.get_username(id)}, ' + s += f'{util.get_username_local(id)}, ' return s[0:-2] return 'none' diff --git a/src/twapi.py b/src/twapi.py index 7acedb7..20f5ffb 100644 --- a/src/twapi.py +++ b/src/twapi.py @@ -1,6 +1,5 @@ import asyncio -from math import inf -from time import time +import datetime import tweepy from tweetcapture import TweetCapture @@ -10,6 +9,7 @@ import talenttweet as tt import util class TwAPI: + tweets_fetched = 0 instance = None TWEET_MEDIA_FIELDS = ['url'] TWEET_FIELDS = ['created_at', 'in_reply_to_user_id'] @@ -61,24 +61,74 @@ class TwAPI: consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(), access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret() ) + self.api = tweepy.API( + auth=tweepy.OAuthHandler( + consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(), + access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret() + ) + ) async def get_tweet_response(self, id, attempt = 0): try: - return TwAPI.instance.client.get_tweet( + twt = TwAPI.instance.client.get_tweet( id, media_fields=TwAPI.TWEET_MEDIA_FIELDS, tweet_fields=TwAPI.TWEET_FIELDS, expansions=TwAPI.TWEET_EXPANSIONS ) - except tweepy.TooManyRequests: - print(f'[{attempt}]get_tweet_response({id}):\n\ttoo many API requests -- trying again in 1 minute...') - await asyncio.sleep(60) + TwAPI.tweets_fetched += 1 + return twt + except tweepy.TooManyRequests as e: + wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1 + print(f'[{attempt}]\tget_tweet_response({id}):\n\thit rate limit after {TwAPI.tweets_fetched} fetches -- trying again in {wait_for} seconds...') + await asyncio.sleep(wait_for) return await self.get_tweet_response(id, attempt=attempt+1) - # Create a post that showcases given tweet and its mentions set. - # Try do do this without retireving Tweet data. - async def create_post(self, ttweet): + async def post_tweet(self, text, media_id=None, reply_to_tweet: int=None): + try: + tweet = self.client.create_tweet(text=text, media_ids=None if media_id == None else [media_id], in_reply_to_tweet_id=reply_to_tweet) + return tweet + except tweepy.TooManyRequests as e: + wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1 + print(f'\thit rate limit -- attempting to create Tweet again in {wait_for} seconds...') + await asyncio.sleep(wait_for) + return await self.post_tweet(text=text, media_ids=[media_id]) + + async def get_ttweet_image_media_id(self, ttweet): img = await util.create_ttweet_image(ttweet) + media = self.api.media_upload(img) + return media.media_id + + async def post_ttweet(self, ttweet: tt.TalentTweet): + REPLY = '{0} replied to {1}!\n' + MENTION = '{0} mentioned {1}!\n' + QUOTE_TWEET = '{0} quote tweeted {1}!\n' + + def create_text(): + if ttweet.reply_to is not None: + author_username = f'@/{util.get_username_online(ttweet.author_id)}' + reply_username = f'@/{util.get_username_online(ttweet.reply_to)}' + mention_ids = set(ttweet.mentions) + mention_ids.add(ttweet.quote_retweeted) + try: mention_ids.remove(None) + except: pass + mention_usernames = [f'@/{util.get_username_online(x)}' for x in mention_ids] + + ret = REPLY.format(author_username, reply_username) + ret += ( + 'mentions ' + f'{" ".join(mention_usernames)}' + f'\n{util.ttweet_to_url(ttweet)}' + ) + return ret + + img_media_id_task = asyncio.create_task(self.get_ttweet_image_media_id(ttweet)) + text = create_text() + media_id = await img_media_id_task + twt_resp = await self.post_tweet(text) + twt_id = twt_resp.data['id'] + await self.post_tweet(text='Image backup', reply_to_tweet=twt_id, media_id=media_id,) + diff --git a/src/util.py b/src/util.py index 455a5f4..d6bee62 100644 --- a/src/util.py +++ b/src/util.py @@ -3,11 +3,11 @@ import datetime import os +import pytz import twint from tweetcapture import TweetCapture import talent_lists -import talenttweet as tt # returns system path to this project, which is # up one level from this file's directory (effective path: ..../src/../). @@ -23,6 +23,17 @@ def datetime_to_tdate(date_time: datetime.datetime): def tdate_to_datetime(tdate: str): return datetime.datetime.strptime("%Y-%m-%d") +def timestamp_to_tdate(timestamp=None): + if timestamp==None: + timestamp = datetime.datetime.now().timestamp() + return datetime_to_tdate(datetime.datetime.fromtimestamp(timestamp, tz=pytz.utc)) + +def get_key_from_value(d, val): + keys = [k for k, v in d.items() if v == val] + if keys: + return keys[0] + return None + async def create_ttweet_image(ttweet): tc = TweetCapture() filename = 'img.png' @@ -46,10 +57,10 @@ async def create_ttweet_image(ttweet): return img def ttweet_to_url(ttweet): - username = get_username(ttweet.author_id) + username = get_username_online(ttweet.author_id) return f'https://twitter.com/{username}/status/{ttweet.tweet_id}' -def get_username(user_id): +def get_username_local(user_id): return talent_lists.talents.get(user_id, f'#{id}') def get_username_online(user_id): @@ -63,4 +74,23 @@ def get_username_online(user_id): user = twint.output.users_list[0] return user.username except: - return f'#{user_id}' \ No newline at end of file + return f'#{user_id}' + +def get_user_id_local(username) -> int: + talent_usernames = list(talent_lists.talents.values()) + for i in range(0, len(talent_usernames)): + if username.lower() == talent_usernames[i].lower(): + return list(talent_lists.talents)[i] + +def get_user_id_online(username) -> int: + c = twint.Config() + c.Username = username + c.Store_object = True + c.Hide_output = True + try: + twint.output.users_list.clear() + twint.run.Lookup(c) + user = twint.output.users_list[0] + return user.id + except: + return -1