diff --git a/retweet notes.txt b/retweet notes.txt new file mode 100644 index 0000000..04809b4 --- /dev/null +++ b/retweet notes.txt @@ -0,0 +1,16 @@ +possible combinations which involve a "target cross-tweeter" B + +A retweets B + - B's tweet may have cross-mentions (B1, B2, etc.) + - rt_author_id=B; rt_mentions=B1,B2,... +A retweets tweet mentioning B + - rt_author_id=...; rt_mentions=B... + +A quotes a tweet from B + - B's tweet may have cross-mentions (B1, B2, etc.) + - quote_retweeted=B; rt_mentions=B1,B2,... +A quotes a tweet mentioning B + - quote_retweeted=...; rt_mentions=B... + +-- NO -- +A retweets a tweet that quotes a tweet mentioning B? diff --git a/src/talenttweet.py b/src/talenttweet.py index a515fcf..969d961 100644 --- a/src/talenttweet.py +++ b/src/talenttweet.py @@ -10,12 +10,19 @@ import util class TalentTweet: # Serialized one-liner format: - # {tweet} {author} {time in seconds since epoch} m {mention set} r {reply to author} q {quote tweet author} rt {retweeted tweet's id} + # {tweet} {author} {time in seconds since epoch UTC} m {mention set} r {reply to author} q {quote tweet author} rt {retweeted user's id} rtm {mentions in retweet} def serialize(self): - s = f'{self.tweet_id} {self.author_id} {self.date_time.timestamp()} ' + s = f'{self.tweet_id} {self.author_id} {int(self.date_time.timestamp())} ' + if self.date_time.tzinfo is None: + print(f'warning: serialized tweet {self.tweet_id} has a NAIVE timestamp!') + + if len(self.rt_mentions) > 0: + s += 'rtm ' + for n in self.rt_mentions: + s += f'{n} ' if self.rt_author_id != None: - s += f'rt {self.rt_id} {self.rt_author_id}' + s += f'rt {self.rt_author_id} ' return s[:-1] # stop here since retweets can't have other info if len(self.mentions) > 0: @@ -24,23 +31,25 @@ class TalentTweet: s += f'{id} ' if self.reply_to: s += f'r {self.reply_to} ' - if self.quote_retweeted: - s += f'q {self.quote_retweeted} ' + if self.quote_tweeted: + s += f'q {self.quote_tweeted} ' return s[:-1] @staticmethod def deserialize(serialized_str: str): - tokens = serialized_str.split() + tokens = serialized_str.split('#')[0] if len(tokens) < 3: raise ValueError('not enough tokens to reconstruct a TalentTweet') tweet_id, author_id = int(tokens[0]), int(tokens[1]) date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc) - mentions = set() + mentions = list() reply_to = None quote_retweeted = None + rt = None + rtm = list() mode = '' for i in range(3, len(tokens)): @@ -57,15 +66,27 @@ class TalentTweet: continue if mode == 'q': # quote_retweeted quote_retweeted = int(tokens[i]) + if mode == 'rt': # retweeted user + rt = int(tokens[i]) + if mode == 'rtm': # retweet/qrt mentions + rtm = int(tokens[i]) return TalentTweet( tweet_id=tweet_id, author_id=author_id, - date_time=date_time, mrq=(mentions, reply_to, quote_retweeted) + date_time=date_time, mrq=(mentions, reply_to, quote_retweeted), + rt_author_id=rt, rt_mentions=rtm ) ## Creates a TalentTweet from a Tweety-library Tweet. @staticmethod def create_from_tweety(tweety: Tweet): + if tweety.is_retweet: + rtm = [int(x.id) for x in tweety.retweeted_tweet.user_mentions] + elif tweety.is_quoted: + rtm = [int(x.id) for x in tweety.quoted_tweet.user_mentions] + else: + rtm = list() + return TalentTweet( tweet_id=int(tweety.id), author_id=int(tweety.author.id), date_time=tweety.date, text=tweety.text, @@ -75,7 +96,7 @@ class TalentTweet: int(tweety.quoted_tweet.author.id) if tweety.quoted_tweet is not None else None ), rt_author_id=tweety.retweeted_tweet.author.id if tweety.is_retweet else None, - rt_mentions=[int(x.id) for x in tweety.retweeted_tweet.user_mentions] if tweety.is_retweet else list() + rt_mentions=rtm ) def __init__(self, tweet_id: int, author_id: int, date_time: datetime, text: str = None, mrq: tuple[list[int], int|None, int|None]=None, rt_author_id: int=None, rt_mentions: list[int]=None): @@ -88,32 +109,37 @@ class TalentTweet: # filter twitter users to only be cross-company self.mentions = {x for x in mrq[0] if is_cross_company(author_id, x)} self.reply_to = mrq[1] if mrq[1] is not None and is_cross_company(author_id, mrq[1]) else None - self.quote_retweeted = mrq[2] if mrq[2] is not None and is_cross_company(author_id, mrq[2]) else None - self.rt_mentions = {x for x in rt_mentions if is_cross_company(author_id, x)} if rt_mentions is not None else None - self.rt_author_id = rt_author_id if (rt_author_id is not None and is_cross_company(author_id, rt_author_id)) or (len(self.rt_mentions) > 0) else None + self.quote_tweeted = mrq[2] - # all users involved, except for the author - self.all_parties = {self.reply_to, self.quote_retweeted} - self.all_parties.update(self.mentions) - try: - self.all_parties.remove(None) + # rt'd/quoted tweet contains cross-company names? + self.rt_mentions = {x for x in rt_mentions if is_cross_company(author_id, x)} + self.rt_author_id = rt_author_id + + # all users involved except for the author + self.all_parties = {self.reply_to, self.quote_tweeted, rt_author_id} + self.all_parties.update(self.mentions, self.rt_mentions) + try: self.all_parties.remove(None) except: pass - try: - self.all_parties.remove(self.author_id) + try: self.all_parties.remove(self.author_id) + except: pass + + # clean up mentions + try: self.mentions.remove(self.reply_to) except: pass def __repr__(self) -> str: return ( - f'======================================================' + f'======================================================\n' f'{self.tweet_id} from {self.username}:\n' f'{self.get_datetime_str()}\n' f'parties: {self.get_all_parties_usernames()}\n' f'mentions: {self.mentions}\n' f'reply_to: {self.reply_to}\n' - f'quote_retweeted: {self.quote_retweeted}\n' + f'quote_retweeted: {self.quote_tweeted}\n' f'cross-company? {self.is_cross_company()}\n' f'{self.serialize()}\n' + f'----\n{self.announce_text()}\n----\n' f'{self.url()}' ) @@ -138,3 +164,53 @@ class TalentTweet: def get_datetime_str(self): unpad = '#' if platform.system() == 'Windows' else '-' return self.date_time.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)') + + def announce_text(self, is_catchup=False): + # templates + REPLY = '{0} replied to {1}!' + TWEET = '{0} tweeted!' + RETWEET = '{0} retweeted {1}!' + RETWEET_MENTIONS_B = '{0} shared a tweet mentioning{1}!' + QUOTE_TWEET = '{0} quote tweeted {1}!' + QUOTE_TWEET_MENTIONS_B = '{0} quoted a tweet mentioning {1}!' + + author_username = f'@/{util.get_username_with_company(self.author_id)}' + ret = str() + print_mention_ids = set(self.mentions) + if is_catchup: + ret += f'{self.get_datetime_str()}\n' + pass + + rt_mention_names = [util.get_username_with_company(x) for x in self.rt_mentions] + # Tweet types + if self.rt_author_id is not None: # retweet + if len(self.rt_mentions) > 0: + ret += RETWEET_MENTIONS_B.format(author_username, ", ".join(rt_mention_names)) + else: + ret += RETWEET.format(f'{author_username}', f'@/{util.get_username_with_company(self.rt_author_id)}') + elif self.reply_to is not None: # reply + reply_username = f'@/{util.get_username_with_company(self.reply_to)}' + ret += REPLY.format(author_username, reply_username) + elif self.quote_tweeted is not None: # qrt + quoted_username = f'@/{util.get_username_with_company(self.quote_tweeted)}' + if len(self.rt_mentions) > 0: + ret += QUOTE_TWEET_MENTIONS_B.format(author_username, ", ".join(rt_mention_names)) + else: + ret += QUOTE_TWEET.format(author_username, quoted_username) + elif len(self.mentions) > 0: # standalone tweet + ret += TWEET.format(author_username) + else: + raise ValueError(f'TalentTweet {self.tweet_id} has insufficient other parties') + + try: print_mention_ids.remove(None) + except: pass + + # mention line + if len(print_mention_ids) > 0: + mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids] + ret += ( + '\nMentioning ' + f'{", ".join(mention_usernames)}' + ) + + return ret diff --git a/src/twapi.py b/src/twapi.py index f377e6f..c7f9e6f 100644 --- a/src/twapi.py +++ b/src/twapi.py @@ -124,23 +124,6 @@ class TwAPI: # else: # print('Saul Gone') - # DEPRECATED: thx elon - async def get_tweet_response(self, id, attempt = 0): - try: - twt = TwAPI.instance.client.get_tweet( - id, - media_fields=TwAPI.TWEET_MEDIA_FIELDS, - tweet_fields=TwAPI.TWEET_FIELDS, - expansions=TwAPI.TWEET_EXPANSIONS - ) - TwAPI.tweets_fetched += 1 - return twt - except tweepy.TooManyRequests as e: - wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1 - print(f'[{attempt}]\tget_tweet_response({id}):\n\thit rate limit after {TwAPI.tweets_fetched} fetches -- trying again in {wait_for} seconds...') - await asyncio.sleep(wait_for) - return await self.get_tweet_response(id, attempt=attempt+1) - async def post_tweet(self, text='', media_ids: list=None, reply_to_tweet: int=None, quote_tweet_id: int=None): try: tweet = self.client.create_tweet(text=text, media_ids=media_ids, in_reply_to_tweet_id=reply_to_tweet, quote_tweet_id=quote_tweet_id) @@ -160,55 +143,12 @@ class TwAPI: # return False = did not post ttweet (duplicate) async def post_ttweet(self, ttweet: tt.TalentTweet, is_catchup=False, dry_run=False): print(f'------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------') - - REPLY = '{0} replied to {1}!\n' - QUOTE_TWEET = '{0} quote tweeted {1}!\n' - TWEET = '{0} tweeted!\n' - RETWEET = '{0} retweeted {1}!\n' - - def create_text(): - author_username = f'@/{util.get_username_with_company(ttweet.author_id)}' - print_mention_ids = set(ttweet.mentions) - ret = str() - if is_catchup: - ret += f'{ttweet.get_datetime_str()}\n' - pass - - # Tweet types - if ttweet.rt_id is not None: # retweet - ret += RETWEET.format(f'{author_username}', f'@/{util.get_username_with_company(ttweet.rt_author_id)}') - elif ttweet.reply_to is not None: # reply - reply_username = f'@/{util.get_username_with_company(ttweet.reply_to)}' - ret += REPLY.format(author_username, reply_username) - # if qrt, push id into mentions - print_mention_ids.add(ttweet.quote_retweeted) - elif ttweet.quote_retweeted is not None: # qrt - quoted_username = f'@/{util.get_username_with_company(ttweet.quote_retweeted)}' - ret += QUOTE_TWEET.format(author_username, quoted_username) - elif len(ttweet.mentions) > 0: # standalone tweet - ret += TWEET.format(author_username) - else: - raise ValueError(f'TalentTweet {ttweet.tweet_id} has insufficient other parties') - - try: print_mention_ids.remove(None) - except: pass - - # mention line - if len(print_mention_ids) > 0: - mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids] - ret += ( - 'mentioning ' - f'{", ".join(mention_usernames)}\n' - ) - ret += '\n' - # ret += '(this is a missed tweet)\n' if is_catchup else '' - return ret - text = create_text() - ttweet_url = util.ttweet_to_url(ttweet) + text = ttweet.announce_text() + ttweet_url = ttweet.url() if dry_run: print('-------------------- DRY RUN --------------------') - print(text) + print(ttweet) if dry_run: return False # NO DRY-RUN: actually post tweet diff --git a/src/util.py b/src/util.py index 5f9148a..289c273 100644 --- a/src/util.py +++ b/src/util.py @@ -45,7 +45,7 @@ def get_current_timestamp(): def get_current_date(): return datetime.today().strftime('%Y-%m-%d') -def get_key_from_value(d, val): +def get_key_from_value(d: dict, val): keys = [k for k, v in d.items() if v == val] if keys: return keys[0] @@ -101,21 +101,6 @@ def ttweet_to_url(ttweet): # except: # return str(default) if default is not None else f'{id}' -def get_username_local(id: int): - return talent_lists.talents.get(id, f'{id}') - -# Retrieve username via API v2 (tweepy) -# def get_username_online(id, default=None): -# try: -# resp = twapi.TwAPI.instance.client.get_user(id=id) -# return resp.data.username -# except tweepy.TooManyRequests: -# return str(default) if default is not None else f'id:{id}' -# except: -# print(f'Unhandled error retrieving username for {id}!') -# traceback.print_exc() -# return str(default) if default is not None else f'id:{id}' - ## Attempt to pull username from local; pull from online if doesn't exist. def get_username(id): ret = talent_lists.talents.get(id, None) @@ -127,21 +112,17 @@ def get_username_with_company(id): company = talent_lists.talents_company.get(id, None) return f'{get_username(id)} {f"({company})" if company is not None else ""}' -def get_user_id_local(username) -> int: - talent_usernames = list(talent_lists.talents.values()) - for i in range(0, len(talent_usernames)): - if username.lower() == talent_usernames[i].lower(): - return list(talent_lists.talents)[i] - -def get_user_id_online(username) -> int: - c = twint.Config() - c.Username = username - c.Store_object = True - c.Hide_output = True +def get_username_local(id: int): + return talent_lists.talents.get(id, f'{id}') + +# Retrieve username via API v2 (tweepy) +def get_username_online(id, default=None): try: - twint.output.users_list.clear() - twint.run.Lookup(c) - user = twint.output.users_list[0] - return user.id + resp = twapi.TwAPI.instance.client.get_user(id=id) + return resp.data.username + except tweepy.TooManyRequests: + return str(default) if default is not None else f'id:{id}' except: - return -1 + print(f'Unhandled error retrieving username for {id}!') + traceback.print_exc() + return str(default) if default is not None else f'id:{id}' \ No newline at end of file