From 3c93ca18c2af4aaff1ce8eb6e5930b94c1e52772 Mon Sep 17 00:00:00 2001 From: muskit <15199219+muskit@users.noreply.github.com> Date: Thu, 17 Aug 2023 02:28:29 -0700 Subject: [PATCH] (de)serialization and queuing works now --- .gitignore | 2 +- src/account_pool.py | 31 ++++++++++++++++++ src/api_secrets.py | 42 ------------------------ src/catchup.py | 60 ++++++++++++---------------------- src/listen.py | 55 +++---------------------------- src/main.py | 10 +++--- src/scraper.py | 79 ++++++++++++++++++++++++++++++++++----------- src/talenttweet.py | 26 ++++++++------- src/ttweetqueue.py | 6 +++- src/twapi.py | 40 +++++++++++------------ 10 files changed, 163 insertions(+), 188 deletions(-) create mode 100644 src/account_pool.py delete mode 100644 src/api_secrets.py diff --git a/.gitignore b/.gitignore index ac961e8..a0e4b05 100644 --- a/.gitignore +++ b/.gitignore @@ -144,4 +144,4 @@ cython_debug/ # project-specific (secret.ini: can't ignore existing file?) *.png -session.json \ No newline at end of file +*.json \ No newline at end of file diff --git a/src/account_pool.py b/src/account_pool.py new file mode 100644 index 0000000..1a655d7 --- /dev/null +++ b/src/account_pool.py @@ -0,0 +1,31 @@ +from dotenv import dotenv_values + +## Track multiple accounts in a pool, cycling to the next one when requested. +class AccountPool: + def __init__(self): + self.__accounts: list[tuple[str, str]] = list() + self.__idx = -1 + creds = dotenv_values() + i = 0 + while True: + if f'scraper_username{i}' in creds \ + and f'scraper_password{i}' in creds: + self.__accounts.append(( + creds[f'scraper_username{i}'], + creds[f'scraper_password{i}'] + )) + i += 1 + else: + break + + def current(self): + if 0 <= self.__idx < len(self.__accounts): + return self.__accounts[self.__idx] + return None + + def next(self) -> tuple[str, str] | None: + self.__idx += 1 + if self.__idx >= len(self.__accounts): + self.__idx = -1 + return None + return self.current() diff --git a/src/api_secrets.py b/src/api_secrets.py deleted file mode 100644 index ca13bda..0000000 --- a/src/api_secrets.py +++ /dev/null @@ -1,42 +0,0 @@ -## Twitter developer credentials management. - -from os.path import join, isfile -from dotenv import dotenv_values - -import util - -# returns dictionary of the Credentials section. -# [NOT TO BE USED OUTSIDE OF THIS FILE.] -def __get_env(): - f = join(util.get_project_dir(), '.env') - if isfile(f): - return dotenv_values(f) - return None - -# returns the consumer api_key stored in secrets.ini -def api_key(): - c = __get_env() - return c.get(option='api_key', fallback='xxx') if c is not None else 'xxx' - -# returns the consumer api_secret stored in secrets.ini -def api_secret(): - c = __get_env() - return c.get(option='api_secret', fallback='yyy') if c is not None else 'yyy' - -# returns the bearer_token stored in secrets.ini -def bearer_token(): - c = __get_env() - return c.get(option='bearer_token', fallback='zzz') if c is not None else 'zzz' - -# returns the access_token stroed in secrets.ini -def access_token(): - c = __get_env() - return c.get(option='oauth1_access_token', fallback='zzz') if c is not None else 'aaa' - -# returns the access_secret stroed in secrets.ini -def access_secret(): - c = __get_env() - return c.get(option='oauth1_access_secret', fallback='zzz') if c is not None else 'bbb' - -def get_all_secrets(): - return f'api_key:{api_key()}\napi_secret:{api_secret()}\nbearer_token:{bearer_token()}\naccess_token:{access_token()}\naccess_secret:{access_secret()}' \ No newline at end of file diff --git a/src/catchup.py b/src/catchup.py index 759cf22..ffa638b 100644 --- a/src/catchup.py +++ b/src/catchup.py @@ -8,9 +8,9 @@ import traceback import datetime import asyncio import shutil +from datetime import datetime -import twint - +from scraper import Scraper from util import * from talent_lists import * from twapi import TwAPI @@ -21,41 +21,12 @@ PROGRAM_ARGS = None safe_to_post_tweets = False errored = False -## Returns the ID of all tweets (up to limit) from a user ID. -def get_user_tweets(id, since_date=None, limit=None): - global safe_to_post_tweets - - qrt_count = 0 - tweets = list() - c = twint.Config() - c.User_id = id - c.Limit = limit - c.Store_object = True - c.Store_object_tweets_list = tweets - c.Hide_output = True - c.Since = '' if since_date == None else f'{since_date} 00:00:00' - - user_str = f'@{util.get_username_local(id)}' - print(f'Scraping tweets from {user_str} since {"forever ago" if c.Since == "" else c.Since}...') - try: - twint.run.Search(c) - except: - print(f'Had trouble getting tweets from {user_str}') - safe_to_post_tweets = False - traceback.print_exc() - - for twt in tweets: - if type(twt.quote_url) is str and twt.quote_url != '': - qrt_count += 1 - - print(f'Scraped {len(tweets)} tweets, {qrt_count} of which are quote tweets.') - return tweets - # Returns a list of sorted and filtered TalentTweets (should # be equivalent to queue.txt) async def get_cross_talent_tweets(): global safe_to_post_tweets + scraper = Scraper() queue = ttq.TalentTweetQueue.instance # Begin getting tweets from online @@ -64,19 +35,28 @@ async def get_cross_talent_tweets(): for i, (talent_id, talent_username) in enumerate(talent_lists.talents.items()): print(f'[{i+1}/{len(talent_lists.talents)}] {talent_username}-----------------------------------') try: - tweets = get_user_tweets(talent_id, since_date=queue.finished_user_dates.get(talent_id, None)) - for tweet in tweets: - if tweet.id not in queue.ttweets_dict and tweet.id not in queue.finished_ttweets: - ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet) - if ttweet.is_cross_company(): - queue.add_ttweet(ttweet) + # tweets = get_user_tweets(talent_id, since_date=queue.finished_user_dates.get(talent_id, None)) + since_date = queue.finished_user_dates.get(talent_id, None) + ttweets = scraper.get_cross_ttweets_from_user(talent_username, since_date=since_date) + for ttweet in ttweets: + if ttweet.tweet_id not in queue.ttweets_dict \ + and ttweet.tweet_id not in queue.finished_ttweets \ + and ttweet.is_cross_company(): + queue.add_ttweet(ttweet) + except KeyboardInterrupt as e: + raise e except: print('Error occurred processing tweet data.') safe_to_post_tweets = False - print(traceback.format_exc()) - queue.finished_user_dates[talent_id] = '2000-01-01' + traceback.print_exc() + if talent_id not in queue.finished_user_dates: + queue.finished_user_dates[talent_id] = '2023-04-26' # date is when bot token first got revoked else: queue.finished_user_dates[talent_id] = util.get_current_date() + queue.save_file() + except KeyboardInterrupt: + print('Interrupting tweet pulling... NOTE: remaining dates in queue file will not be updated!') + queue.save_file() except: print('Unhandled error occurred while pulling tweets.') traceback.print_exc() diff --git a/src/listen.py b/src/listen.py index 58273d0..e1ccfd1 100644 --- a/src/listen.py +++ b/src/listen.py @@ -1,66 +1,21 @@ ## The bot's listen mode # Continuously listen for cross-company interactions. +from time import sleep import asyncio import traceback -import tweepy -from talenttweet import TalentTweet -from twapi import TwAPI -import ttweetqueue as ttq -import api_secrets -import talent_lists as tl -import util +import catchup errors_encountered = 0 -def on_response(resp): - ttweet = TalentTweet.create_from_v2api_response(resp) - if ttweet is None: - print('Couldn\'t create ttweet from the response:') - print(resp) - return - - tweet_username = util.get_username(ttweet.author_id) - - if ttweet.is_cross_company(): - print(f'Tweet {ttweet.tweet_id} is cross-company! Creating post...') - is_successful = asyncio.run(TwAPI.instance.post_ttweet(ttweet)) - if is_successful: - ttq.TalentTweetQueue.instance.add_finished_tweet(ttweet.tweet_id) - else: - print(f'[WARNING] Failed to post ttweet for {tweet_username}/{ttweet.tweet_id}!') - else: - print(f'Tweet {tweet_username}/{ttweet.tweet_id} is not cross-company.') - def run(): global errors_encountered while True: try: - sc = tweepy.StreamingClient(api_secrets.bearer_token()) - - # clear rules - print('Clearing streaming rules...') - rules_resp = sc.get_rules() - if rules_resp.data: - print('Deleted a rule!') - sc.delete_rules(rules_resp.data) - - # create new rules - print('Creating new streaming rules...') - for rule in tl.get_twitter_rules(): - sc.add_rules(tweepy.StreamRule(rule)) - print('--------------------------------------------') - print(sc.get_rules().data) - print('--------------------------------------------') - - sc.on_response=on_response - print('Starting listening stream...') - sc.filter( - expansions=TwAPI.TWEET_EXPANSIONS, - media_fields=TwAPI.TWEET_MEDIA_FIELDS, - tweet_fields=TwAPI.TWEET_FIELDS - ) + asyncio.run(catchup.run()) + print('Sleeping for 30 minutes...') + sleep(1800) # run every half-hour except KeyboardInterrupt: print('Interrupt signal received. Exiting listen mode.') print(f'{errors_encountered} errors encountered throughout session.') diff --git a/src/main.py b/src/main.py index 01eba94..ed60f60 100644 --- a/src/main.py +++ b/src/main.py @@ -8,7 +8,6 @@ import nest_asyncio import talent_lists import ttweetqueue as ttq -import api_secrets import catchup import listen from twapi import TwAPI @@ -52,6 +51,8 @@ async def async_main(): await self_destruct() elif mode == 'cmd': command_line() + elif mode in ['l', 'listen']: + listen.run() else: print('\nunknown mode. run with no arguments or -h for help and modes') @@ -59,9 +60,9 @@ def main(): global PROGRAM_ARGS parser = init_argparse() - # if len(sys.argv) < 2: - # parser.print_help() - # return + if len(sys.argv) < 2: + parser.print_help() + return PROGRAM_ARGS = parser.parse_args() @@ -77,6 +78,7 @@ def main(): ttq.TalentTweetQueue() ## Asynchronous execution + print('beginning async main') nest_asyncio.apply() asyncio.run(async_main()) diff --git a/src/scraper.py b/src/scraper.py index 9e77dc9..b1dc654 100644 --- a/src/scraper.py +++ b/src/scraper.py @@ -2,7 +2,6 @@ from os.path import exists from time import sleep from datetime import datetime, timedelta -from dotenv import dotenv_values import pytz from tweety import Twitter @@ -10,18 +9,33 @@ from tweety.types import * from tweety.exceptions_ import * from tweety.filters import SearchFilters +from account_pool import AccountPool from tweety_utils import * from talenttweet import * import talent_lists class Scraper: def __init__(self): - creds = dotenv_values() - self.app = Twitter("session") - # if exists("session.json"): - # self.app.connect() - # else: - self.app.sign_in(creds["scraper_username"], creds["scraper_password"]) + Scraper.instance = self + self.__account = AccountPool() + self.try_login() + + def try_login(self) -> bool: + acc = self.__account.next() + if acc is not None: + name = acc[0] + print(f"using {name}") + self.app = Twitter(name) + if exists(f"{name}.json"): + try: + self.app.connect() + except: + self.app.sign_in(*acc) + else: + self.app.sign_in(*acc) + return True + print('exhausted all accounts!') + return False # since MUST BE TIMEZONE AWARE # usage example: since=datetime(2023, 8, 1).replace(tzinfo=pytz.utc) @@ -43,11 +57,29 @@ class Scraper: # malformed tweet check nonlocal reached_backdate try: - tweet.author - except AttributeError: - print("skipping malformed tweet: {tweet}") + tweet.author.id + except: + print(f"skipping malformed tweet: {tweet}") return + # recover lost info + if tweet.is_retweet: + if tweet.retweeted_tweet is None: + print(f'{tweet.author.username}/{tweet.id} is missing the RT! Recovering...') + tweet.retweeted_tweet = self.app.tweet_detail(str(tweet.id)).retweeted_tweet + if tweet.retweeted_tweet.author is None: + print(f'WARNING: {tweet.author.username}/{tweet.id} is missing the RT author! Recovering details...') + tweet.retweeted_tweet = self.app.tweet_detail(tweet.retweeted_tweet.id) + + if tweet.is_quoted: + if tweet.quoted_tweet is None: # quoted tweet is deleted + # print(f'{tweet.author.username}/{tweet.id} is missing the QRT! Recovering...') + # tweet.quoted_tweet = self.app.tweet_detail(str(tweet.id)).quoted_tweet + tweet.is_quoted = False + elif tweet.quoted_tweet.author is None: + print(f'WARNING: {tweet.author.username}/{tweet.id} is missing the QRT author! Recovering details...') + tweet.quoted_tweet= self.app.tweet_detail(tweet.quoted_tweet.id) + # fix reply if it exists # if tweet.is_reply and tweet.replied_to is None: # tweet.replied_to = self.app.tweet_detail(tweet._original_tweet['in_reply_to_status_id_str']) @@ -72,22 +104,30 @@ class Scraper: elif isinstance(e, TweetThread): # FIXME: rework when replied_to is fixed (currently populates user_mentions) # latest tweet in thread = og author's reply - add_tweet(e[0]) for t in e: add_tweet(t) cur = search.cursor except UnknownError: print("UnknownError occurred, probably rate-limited") - print("sleeping for 1 minute...") - sleep(60) + # traceback.print_exc() + if not self.try_login(): + print("sleeping for 1 minute...") + sleep(60) + print() + self.try_login() tweets.sort(key=lambda t: t.id) return tweets - def get_cross_ttweets_from_user(self, username: str, since: datetime = None) -> list[TalentTweet]: + def get_cross_ttweets_from_user(self, username: str, since_date: str = None) -> list[TalentTweet]: + if since_date is not None: + d = since_date.split('-') + since = datetime(*[int(x) for x in d]).replace(tzinfo=pytz.utc) + else: + since = None tweets = self.get_tweets_from_user(username, since) - print_tweets(tweets) + # print_tweets(tweets) ret: list[TalentTweet] = [] for t in tweets: tt = TalentTweet.create_from_tweety(t) @@ -95,7 +135,8 @@ class Scraper: ret.append(tt) return ret -talent_lists.init() -s = Scraper() -ttweets = s.get_cross_ttweets_from_user("pomurainpuff", since=datetime(2023, 7, 30).replace(tzinfo=pytz.utc)) -print("\n".join([x.__repr__() for x in ttweets])) \ No newline at end of file +if __name__== '__main__': + talent_lists.init() + s = Scraper() + ttweets = s.get_cross_ttweets_from_user("pomurainpuff", since=datetime(2023, 7, 30).replace(tzinfo=pytz.utc)) + print("\n".join([x.__repr__() for x in ttweets])) \ No newline at end of file diff --git a/src/talenttweet.py b/src/talenttweet.py index 969d961..d7a8777 100644 --- a/src/talenttweet.py +++ b/src/talenttweet.py @@ -38,10 +38,12 @@ class TalentTweet: @staticmethod def deserialize(serialized_str: str): - tokens = serialized_str.split('#')[0] - if len(tokens) < 3: + token_check = serialized_str.split('#')[0] + if len(token_check) < 3: raise ValueError('not enough tokens to reconstruct a TalentTweet') + tokens = serialized_str.split() + tweet_id, author_id = int(tokens[0]), int(tokens[1]) date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc) @@ -59,7 +61,7 @@ class TalentTweet: if tokens[i].isnumeric(): if mode == 'm': # mentions - mentions.add(int(tokens[i])) + mentions.append(int(tokens[i])) continue if mode == 'r': # reply_to reply_to = int(tokens[i]) @@ -69,7 +71,7 @@ class TalentTweet: if mode == 'rt': # retweeted user rt = int(tokens[i]) if mode == 'rtm': # retweet/qrt mentions - rtm = int(tokens[i]) + rtm.append(int(tokens[i])) return TalentTweet( tweet_id=tweet_id, author_id=author_id, @@ -168,15 +170,20 @@ class TalentTweet: def announce_text(self, is_catchup=False): # templates REPLY = '{0} replied to {1}!' - TWEET = '{0} tweeted!' + TWEET = '{0} tweeted mentioning {1}!' RETWEET = '{0} retweeted {1}!' - RETWEET_MENTIONS_B = '{0} shared a tweet mentioning{1}!' + RETWEET_MENTIONS_B = '{0} shared a tweet mentioning {1}!' QUOTE_TWEET = '{0} quote tweeted {1}!' QUOTE_TWEET_MENTIONS_B = '{0} quoted a tweet mentioning {1}!' author_username = f'@/{util.get_username_with_company(self.author_id)}' ret = str() + print_mention_ids = set(self.mentions) + try: print_mention_ids.remove(None) + except: pass + mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids] + if is_catchup: ret += f'{self.get_datetime_str()}\n' pass @@ -198,16 +205,13 @@ class TalentTweet: else: ret += QUOTE_TWEET.format(author_username, quoted_username) elif len(self.mentions) > 0: # standalone tweet - ret += TWEET.format(author_username) + ret += TWEET.format(author_username, ", ".join(mention_usernames)) + return ret else: raise ValueError(f'TalentTweet {self.tweet_id} has insufficient other parties') - try: print_mention_ids.remove(None) - except: pass - # mention line if len(print_mention_ids) > 0: - mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids] ret += ( '\nMentioning ' f'{", ".join(mention_usernames)}' diff --git a/src/ttweetqueue.py b/src/ttweetqueue.py index f297491..a602e1f 100644 --- a/src/ttweetqueue.py +++ b/src/ttweetqueue.py @@ -1,6 +1,7 @@ # TODO: move queue structures and file handling here import os import shutil +import traceback import util import talenttweet as tt @@ -55,9 +56,12 @@ class TalentTweetQueue: if len(tokens) == 0 or tokens[0][0] == '#': continue ttweet = tt.TalentTweet.deserialize(line) + # print(f'{ttweet.tweet_id}:\n{ttweet}') self.ttweets_dict[ttweet.tweet_id] = ttweet print(f'Found {len(self.finished_user_dates)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.') - except: pass + except: + traceback.print_exc() + pass # finished ttweets try: with open(self.finished_ttweets_path, 'r') as f: diff --git a/src/twapi.py b/src/twapi.py index c7f9e6f..9ea4126 100644 --- a/src/twapi.py +++ b/src/twapi.py @@ -2,9 +2,9 @@ import datetime import traceback import asyncio +from dotenv import dotenv_values import tweepy -import api_secrets import talenttweet as tt import talent_lists as tl import ttweetqueue as ttq @@ -70,24 +70,25 @@ class TwAPI: def __init__(self): + creds = dotenv_values() TwAPI.instance = self self.client = tweepy.Client( - bearer_token=api_secrets.bearer_token(), - consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(), - access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret() + consumer_key=creds['app_key'], consumer_secret=creds['app_secret'], + access_token=creds['user_token'], access_token_secret=creds['user_secret'] ) - self.api = tweepy.API( - auth=tweepy.OAuthHandler( - consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(), - access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret() - ) - ) - try: - self.me = self.client.get_me().data - except Exception as e: - print('Did you setup secrets.ini?') - raise e - print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})') + # self.api = tweepy.API( + # auth=tweepy.OAuthHandler( + # consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(), + # access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret() + # ) + # ) + + # try: + # self.me = self.client.get_me(wait_on_rate_limit=True).data + # except Exception as e: + # print('Failed to login!') + # raise e + # print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})') ## ---[COMMENT OUT WHEN NOT IN USE]--- # async def nuke_tweets(self): @@ -154,17 +155,16 @@ class TwAPI: # NO DRY-RUN: actually post tweet # main tweet: text + screenshot try: - print('creating main QRT w/ screenshot...', end='') + print('creating main QRT w/ screenshot...') media_ids = [await self.get_ttweet_image_media_id(ttweet)] twt_resp = await self.post_tweet(text, media_ids=media_ids, quote_tweet_id=ttweet.tweet_id) print('done') except: print('error occurred trying to create main tweet, falling back to URL-main + reply screencap format') traceback.print_exc() - text += f"\n{ttweet_url}" try: - print('posting main tweet...', end='') - twt_resp = await self.post_tweet(text) + print('posting main tweet...') + twt_resp = await self.post_tweet(text, quote_tweet_id=ttweet.tweet_id) print('done') twt_id = twt_resp.data['id'] # if ttweet.reply_to is not None: