diff --git a/src/scraper.py b/src/scraper.py index c20e6e5..9e77dc9 100644 --- a/src/scraper.py +++ b/src/scraper.py @@ -7,93 +7,95 @@ import pytz from tweety import Twitter from tweety.types import * +from tweety.exceptions_ import * +from tweety.filters import SearchFilters from tweety_utils import * from talenttweet import * -from talent_lists import is_niji, is_holo +import talent_lists class Scraper: def __init__(self): creds = dotenv_values() self.app = Twitter("session") - if exists("session.json"): - self.app.connect() - else: - self.app.sign_in(creds["scraper_username"], creds["scraper_password"]) + # if exists("session.json"): + # self.app.connect() + # else: + self.app.sign_in(creds["scraper_username"], creds["scraper_password"]) # since MUST BE TIMEZONE AWARE # usage example: since=datetime(2023, 8, 1).replace(tzinfo=pytz.utc) - def get_tweets_from_user(self, uid: int | str, since: datetime = None) -> list: + def get_tweets_from_user(self, username: str, since: datetime = None) -> list[Tweet]: reached_backdate = False tweets: list[Tweet] = [] + cur = None if since == None: since = datetime.utcnow().replace(tzinfo=pytz.utc) - timedelta(days=7) - print(f'Grabbing tweets since 7 days ago ({since.date()})') + print(f'falling back to grabbing tweets since 7 days ago ({since.date()})') + else: + print(f'grabbing tweets since {since.date()}') - if isinstance(uid, str): - name = uid - uid = self.app._get_user_id(uid) - print(f"{name} = {uid}") + uid = self.app._get_user_id(username) + print(f"{username} = {uid}") def add_tweet(tweet: Tweet): + # malformed tweet check nonlocal reached_backdate try: tweet.author - tweets.append(tweet) - if not reached_backdate and tweet.date <= since: - print("reached backdate") - reached_backdate = True except AttributeError: print("skipping malformed tweet: {tweet}") return - uts = self.app.get_tweets(uid, replies=True) + # fix reply if it exists + # if tweet.is_reply and tweet.replied_to is None: + # tweet.replied_to = self.app.tweet_detail(tweet._original_tweet['in_reply_to_status_id_str']) + tweets.append(tweet) + + if not reached_backdate and int(tweet.author.id) == uid and tweet.date <= since: + print("reached backdate") + reached_backdate = True + while not reached_backdate: - cur_page = uts.tweets - print(f'obtained {len(cur_page)} tweets') + try: + # uts = self.app.get_tweets(uid, replies=True, cursor=cur) + search = self.app.search(f'from:{username}', filter_=SearchFilters.Latest(), cursor=cur) + cur_page = search.tweets + print(f'obtained {len(cur_page)} tweets') - if len(cur_page) == 0: break + if len(cur_page) == 0: break - for e in cur_page: - if isinstance(e, Tweet): - add_tweet(e) - elif isinstance(e, TweetThread): - # FIXME: rework when replied_to is fixed (currently only user_mentions works) - t = e[-1] # latest tweet in thread = og author's reply - t.replied_to = e[-2] - add_tweet(t) - print(f"adding thread latest: {t.id}") - - uts = self.app.get_tweets(uid, replies=True, cursor=uts.cursor) + for e in cur_page: + if isinstance(e, Tweet): + add_tweet(e) + elif isinstance(e, TweetThread): + # FIXME: rework when replied_to is fixed (currently populates user_mentions) + # latest tweet in thread = og author's reply + add_tweet(e[0]) + for t in e: + add_tweet(t) + + cur = search.cursor + except UnknownError: + print("UnknownError occurred, probably rate-limited") + print("sleeping for 1 minute...") + sleep(60) tweets.sort(key=lambda t: t.id) return tweets - def get_cross_ttweets_from_user(self, uid: int | str, since: datetime = None): - tweets = self.get_tweets_from_user(uid, since) - ret: [TalentTweet] = [] + def get_cross_ttweets_from_user(self, username: str, since: datetime = None) -> list[TalentTweet]: + tweets = self.get_tweets_from_user(username, since) + print_tweets(tweets) + ret: list[TalentTweet] = [] for t in tweets: - is_niji = is_niji(int(t.author.id)) - is_cross = False + tt = TalentTweet.create_from_tweety(t) + if tt.is_cross_company(): + ret.append(tt) + return ret - # cross-rt? - - # rt mentions cross-company? - - # cross-qrt? - - # cross-reply? - if t.replied_to is not None: - if is_niji == is_holo(int(t.replied_to.author.id)): - is_cross = True - - # cross-mention? in-thread? - for u in t.user_mentions: - if is_niji == is_holo(int(u.id)): - is_cross = True - -if __name__ == '__main__': - app = Scraper() - tweets = app.get_tweets_from_user("pomurainpuff") - print_tweets(tweets) \ No newline at end of file +talent_lists.init() +s = Scraper() +ttweets = s.get_cross_ttweets_from_user("pomurainpuff", since=datetime(2023, 7, 30).replace(tzinfo=pytz.utc)) +print("\n".join([x.__repr__() for x in ttweets])) \ No newline at end of file diff --git a/src/talenttweet.py b/src/talenttweet.py index baa2c54..a515fcf 100644 --- a/src/talenttweet.py +++ b/src/talenttweet.py @@ -3,8 +3,9 @@ from zoneinfo import ZoneInfo import platform import pytz +from tweety.types import * -import talent_lists +from talent_lists import is_cross_company import util class TalentTweet: @@ -13,8 +14,8 @@ class TalentTweet: def serialize(self): s = f'{self.tweet_id} {self.author_id} {self.date_time.timestamp()} ' - if None not in [self.rt_target, self.rt_author_id]: - s += f'rt {self.rt_target} {self.rt_author_id}' + if self.rt_author_id != None: + s += f'rt {self.rt_id} {self.rt_author_id}' return s[:-1] # stop here since retweets can't have other info if len(self.mentions) > 0: @@ -61,14 +62,35 @@ class TalentTweet: tweet_id=tweet_id, author_id=author_id, date_time=date_time, mrq=(mentions, reply_to, quote_retweeted) ) + + ## Creates a TalentTweet from a Tweety-library Tweet. + @staticmethod + def create_from_tweety(tweety: Tweet): + return TalentTweet( + tweet_id=int(tweety.id), author_id=int(tweety.author.id), + date_time=tweety.date, text=tweety.text, + mrq=( + [int(x.id) for x in tweety.user_mentions], + int(tweety._original_tweet['in_reply_to_user_id_str']) if tweety.is_reply else None, + int(tweety.quoted_tweet.author.id) if tweety.quoted_tweet is not None else None + ), + rt_author_id=tweety.retweeted_tweet.author.id if tweety.is_retweet else None, + rt_mentions=[int(x.id) for x in tweety.retweeted_tweet.user_mentions] if tweety.is_retweet else list() + ) - def __init__(self, tweet_id: int, author_id: int, date_time: datetime, mrq: tuple, rt_target: int=None, rt_author_id: int=None): + def __init__(self, tweet_id: int, author_id: int, date_time: datetime, text: str = None, mrq: tuple[list[int], int|None, int|None]=None, rt_author_id: int=None, rt_mentions: list[int]=None): + # basic information self.tweet_id, self.author_id = tweet_id, author_id + self.username = util.get_username_local(self.author_id) self.date_time = date_time - self.mentions = tuple(int(x) for x in mrq[0]) - self.reply_to = int(mrq[1]) if mrq[1] is not None else None - self.quote_retweeted = int(mrq[2]) if mrq[2] is not None else None - self.rt_target, self.rt_author_id = rt_target, rt_author_id + self.text = text + + # filter twitter users to only be cross-company + self.mentions = {x for x in mrq[0] if is_cross_company(author_id, x)} + self.reply_to = mrq[1] if mrq[1] is not None and is_cross_company(author_id, mrq[1]) else None + self.quote_retweeted = mrq[2] if mrq[2] is not None and is_cross_company(author_id, mrq[2]) else None + self.rt_mentions = {x for x in rt_mentions if is_cross_company(author_id, x)} if rt_mentions is not None else None + self.rt_author_id = rt_author_id if (rt_author_id is not None and is_cross_company(author_id, rt_author_id)) or (len(self.rt_mentions) > 0) else None # all users involved, except for the author self.all_parties = {self.reply_to, self.quote_retweeted} @@ -83,20 +105,24 @@ class TalentTweet: def __repr__(self) -> str: return ( - f'{self.tweet_id} from {util.get_username_local(self.author_id)}):\n' + f'======================================================' + f'{self.tweet_id} from {self.username}:\n' f'{self.get_datetime_str()}\n' - f'{self.get_all_parties_usernames()}\n' + f'parties: {self.get_all_parties_usernames()}\n' f'mentions: {self.mentions}\n' f'reply_to: {self.reply_to}\n' f'quote_retweeted: {self.quote_retweeted}\n' - f'Cross-company: {self.is_cross_company()}\n' + f'cross-company? {self.is_cross_company()}\n' f'{self.serialize()}\n' - f'======================================================' + f'{self.url()}' ) + def url(self): + return f'https://www.twitter.com/{self.username}/status/{self.tweet_id}' + def is_cross_company(self): for other_id in self.all_parties: - if talent_lists.is_cross_company(self.author_id, other_id): + if is_cross_company(self.author_id, other_id): return True return False diff --git a/src/twapi.py b/src/twapi.py index 687393d..f377e6f 100644 --- a/src/twapi.py +++ b/src/twapi.py @@ -175,7 +175,7 @@ class TwAPI: pass # Tweet types - if ttweet.rt_target is not None: # retweet + if ttweet.rt_id is not None: # retweet ret += RETWEET.format(f'{author_username}', f'@/{util.get_username_with_company(ttweet.rt_author_id)}') elif ttweet.reply_to is not None: # reply reply_username = f'@/{util.get_username_with_company(ttweet.reply_to)}' diff --git a/src/tweety_utils.py b/src/tweety_utils.py index 8d96b04..ad21e05 100644 --- a/src/tweety_utils.py +++ b/src/tweety_utils.py @@ -7,8 +7,13 @@ def print_tweets(tweets: list[Tweet | TweetThread]): print(f'{len(tweets)} tweets:') for t in tweets: if isinstance(t, Tweet): - print(f'{t.date} : {url(t)} : RT? {t.is_retweet} ', end=' ') + print(f'{t.date} : {url(t)} :', end=' ') + if t.is_retweet: + print(f'RT ({t.retweeted_tweet.author.username})', end=' ') + + if t.is_reply: + print(f'is reply!', end=' ') if t.replied_to is not None: print(f'reply to {t.replied_to.author.username}', end=' ') diff --git a/src/util.py b/src/util.py index b23cbb5..5f9148a 100644 --- a/src/util.py +++ b/src/util.py @@ -101,20 +101,20 @@ def ttweet_to_url(ttweet): # except: # return str(default) if default is not None else f'{id}' -def get_username_local(id): +def get_username_local(id: int): return talent_lists.talents.get(id, f'{id}') # Retrieve username via API v2 (tweepy) -def get_username_online(id, default=None): - try: - resp = twapi.TwAPI.instance.client.get_user(id=id) - return resp.data.username - except tweepy.TooManyRequests: - return str(default) if default is not None else f'id:{id}' - except: - print(f'Unhandled error retrieving username for {id}!') - traceback.print_exc() - return str(default) if default is not None else f'id:{id}' +# def get_username_online(id, default=None): +# try: +# resp = twapi.TwAPI.instance.client.get_user(id=id) +# return resp.data.username +# except tweepy.TooManyRequests: +# return str(default) if default is not None else f'id:{id}' +# except: +# print(f'Unhandled error retrieving username for {id}!') +# traceback.print_exc() +# return str(default) if default is not None else f'id:{id}' ## Attempt to pull username from local; pull from online if doesn't exist. def get_username(id):