handle tweets self-quoting/replying

This commit is contained in:
muskit
2024-01-26 23:58:30 -08:00
parent 39cd02bce2
commit 5c70996067
2 changed files with 178 additions and 103 deletions
+177 -102
View File
@@ -8,79 +8,83 @@ from tweety.types import *
import talent_lists as tl import talent_lists as tl
import util import util
class TalentTweet: class TalentTweet:
# Serialized one-liner format: # Serialized one-liner format:
# {tweet} {author} {time in seconds since epoch UTC} m {mention set} r {reply to author} q {quote tweet author} rt {retweeted user's id} rtm {mentions in retweet} # {tweet} {author} {time in seconds since epoch UTC} m {mention set} r {reply to author} q {quote tweet author} rt {retweeted user's id} rtm {mentions in retweet}
def serialize(self): def serialize(self):
s = f'{self.tweet_id} {self.author_id} {int(self.date_time.timestamp())} ' s = f"{self.tweet_id} {self.author_id} {int(self.date_time.timestamp())} "
if self.date_time.tzinfo is None: if self.date_time.tzinfo is None:
print(f'warning: serialized tweet {self.tweet_id} has a NAIVE timestamp!') print(f"warning: serialized tweet {self.tweet_id} has a NAIVE timestamp!")
if len(self.rt_mentions) > 0: if len(self.rt_mentions) > 0:
s += 'rtm ' s += "rtm "
for n in self.rt_mentions: for n in self.rt_mentions:
s += f'{n} ' s += f"{n} "
if self.rt_author_id != None: if self.rt_author_id != None:
s += f'rt {self.rt_author_id} ' s += f"rt {self.rt_author_id} "
return s[:-1] # stop here since retweets can't have other info return s[:-1] # stop here since retweets can't have other info
if len(self.mentions) > 0: if len(self.mentions) > 0:
s += 'm ' s += "m "
for id in self.mentions: for id in self.mentions:
s += f'{id} ' s += f"{id} "
if self.reply_to: if self.reply_to:
s += f'r {self.reply_to} ' s += f"r {self.reply_to} "
if self.quote_tweeted: if self.quote_tweeted:
s += f'q {self.quote_tweeted} ' s += f"q {self.quote_tweeted} "
return s[:-1] return s[:-1]
@staticmethod @staticmethod
def deserialize(serialized_str: str): def deserialize(serialized_str: str):
token_check = serialized_str.split('#')[0] token_check = serialized_str.split("#")[0]
if len(token_check) < 3: if len(token_check) < 3:
raise ValueError('not enough tokens to reconstruct a TalentTweet') raise ValueError("not enough tokens to reconstruct a TalentTweet")
tokens = serialized_str.split() tokens = serialized_str.split()
tweet_id, author_id = int(tokens[0]), int(tokens[1]) tweet_id, author_id = int(tokens[0]), int(tokens[1])
date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc) date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc)
mentions = list() mentions = list()
reply_to = None reply_to = None
quote_retweeted = None quote_retweeted = None
rt = None rt = None
rtm = list() rtm = list()
mode = '' mode = ""
for i in range(3, len(tokens)): for i in range(3, len(tokens)):
if not tokens[i].isnumeric(): # mode switch if not tokens[i].isnumeric(): # mode switch
mode = tokens[i] mode = tokens[i]
continue continue
if tokens[i].isnumeric(): if tokens[i].isnumeric():
if mode == 'm': # mentions if mode == "m": # mentions
mentions.append(int(tokens[i])) mentions.append(int(tokens[i]))
continue continue
elif mode == 'r': # reply_to elif mode == "r": # reply_to
reply_to = int(tokens[i]) reply_to = int(tokens[i])
continue continue
elif mode == 'q': # quote_retweeted elif mode == "q": # quote_retweeted
quote_retweeted = int(tokens[i]) quote_retweeted = int(tokens[i])
elif mode == 'rt': # retweeted user elif mode == "rt": # retweeted user
rt = int(tokens[i]) rt = int(tokens[i])
elif mode == 'rtm': # retweet/qrt mentions elif mode == "rtm": # retweet/qrt mentions
rtm.append(int(tokens[i])) rtm.append(int(tokens[i]))
else: else:
raise ValueError(f'encountered invalid mode token {mode}') raise ValueError(f"encountered invalid mode token {mode}")
return TalentTweet( return TalentTweet(
tweet_id=tweet_id, author_id=author_id, tweet_id=tweet_id,
date_time=date_time, mrq=(mentions, reply_to, quote_retweeted), author_id=author_id,
rt_author_id=rt, rt_mentions=rtm date_time=date_time,
mrq=(mentions, reply_to, quote_retweeted),
rt_author_id=rt,
rt_mentions=rtm,
) )
## Creates a TalentTweet from a Tweety-library Tweet. ## Creates a TalentTweet from a Tweety-library Tweet.
@staticmethod @staticmethod
def create_from_tweety(tweety: Tweet): def create_from_tweety(tweety: Tweet):
@@ -94,18 +98,35 @@ class TalentTweet:
rtm = set() rtm = set()
return TalentTweet( return TalentTweet(
tweet_id=int(tweety.id), author_id=int(tweety.author.id), tweet_id=int(tweety.id),
date_time=tweety.date, text=tweety.text, author_id=int(tweety.author.id),
date_time=tweety.date,
text=tweety.text,
mrq=( mrq=(
{int(x.id) for x in tweety.user_mentions}, {int(x.id) for x in tweety.user_mentions},
int(tweety.original_tweet['in_reply_to_user_id_str']) if tweety.is_reply else None, int(tweety.original_tweet["in_reply_to_user_id_str"])
int(tweety.quoted_tweet.author.id) if tweety.quoted_tweet is not None else None if tweety.is_reply
else None,
int(tweety.quoted_tweet.author.id)
if tweety.quoted_tweet is not None
else None,
), ),
rt_author_id=tweety.retweeted_tweet.author.id if tweety.is_retweet else None, rt_author_id=tweety.retweeted_tweet.author.id
rt_mentions=rtm if tweety.is_retweet
else None,
rt_mentions=rtm,
) )
def __init__(self, tweet_id: int, author_id: int, date_time: datetime, text: str = None, mrq: tuple[list[int], int|None, int|None]=None, rt_author_id: int=None, rt_mentions: list[int]=None): def __init__(
self,
tweet_id: int,
author_id: int,
date_time: datetime,
text: str = None,
mrq: tuple[list[int], int | None, int | None] = None,
rt_author_id: int = None,
rt_mentions: list[int] = None,
):
# basic information # basic information
self.tweet_id, self.author_id = tweet_id, author_id self.tweet_id, self.author_id = tweet_id, author_id
self.username = util.get_username_local(self.author_id) self.username = util.get_username_local(self.author_id)
@@ -116,47 +137,66 @@ class TalentTweet:
self.mentions = {x for x in mrq[0] if x in tl.talents} self.mentions = {x for x in mrq[0] if x in tl.talents}
self.rt_mentions = {x for x in rt_mentions if x in tl.talents} self.rt_mentions = {x for x in rt_mentions if x in tl.talents}
self.mentions.difference_update(self.rt_mentions) self.mentions.difference_update(self.rt_mentions)
try: self.rt_mentions.remove(self.author_id) try:
except: pass self.rt_mentions.remove(self.author_id)
except:
pass
self.reply_to = mrq[1] self.reply_to = mrq[1]
self.quote_tweeted = mrq[2] self.quote_tweeted = mrq[2]
self.rt_author_id = rt_author_id self.rt_author_id = rt_author_id
try: self.mentions.remove(self.reply_to) try:
except: pass self.mentions.remove(self.reply_to)
except:
pass
# -1 if user is not in company # -1 if user is not in company
self.reply_to = self.reply_to if self.reply_to is None or self.reply_to in tl.talents else -1 self.reply_to = (
self.quote_tweeted = self.quote_tweeted if self.quote_tweeted is None or self.quote_tweeted in tl.talents else -1 self.reply_to
self.rt_author_id = self.rt_author_id if self.rt_author_id is None or self.rt_author_id in tl.talents else -1 if self.reply_to is None or self.reply_to in tl.talents
else -1
)
self.quote_tweeted = (
self.quote_tweeted
if self.quote_tweeted is None or self.quote_tweeted in tl.talents
else -1
)
self.rt_author_id = (
self.rt_author_id
if self.rt_author_id is None or self.rt_author_id in tl.talents
else -1
)
# all users involved except for the author # all users involved except for the author
self.all_parties = {self.reply_to, self.quote_tweeted, rt_author_id} self.all_parties = {self.reply_to, self.quote_tweeted, rt_author_id}
self.all_parties.update(self.mentions, self.rt_mentions) self.all_parties.update(self.mentions, self.rt_mentions)
try: self.all_parties.remove(None) try:
except: pass self.all_parties.remove(None)
try: self.all_parties.remove(self.author_id) except:
except: pass pass
try:
self.all_parties.remove(self.author_id)
except:
pass
# if not self.is_cross_company(): # if not self.is_cross_company():
# print(f'WARNING: {self.tweet_id} is not cross-company!') # print(f'WARNING: {self.tweet_id} is not cross-company!')
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return (
f'======================================================\n' f"======================================================\n"
f'{self.tweet_id} from {self.username}:\n' f"{self.tweet_id} from {self.username}:\n"
f'{self.get_datetime_str()}\n' f"{self.get_datetime_str()}\n"
f'parties: {self.get_all_parties_usernames()}\n' f"parties: {self.get_all_parties_usernames()}\n"
f'mentions: {self.mentions}\n' f"mentions: {self.mentions}\n"
f'reply_to: {self.reply_to}\n' f"reply_to: {self.reply_to}\n"
f'rtm: {self.rt_mentions}\n' f"rtm: {self.rt_mentions}\n"
f'quote_retweeted: {self.quote_tweeted}\n' f"quote_retweeted: {self.quote_tweeted}\n"
f'cross-company? {self.is_cross_company()}\n' f"cross-company? {self.is_cross_company()}\n"
f'{self.serialize()}\n' f"{self.serialize()}\n"
f'----\n{self.announce_text()}\n----\n' f"----\n{self.announce_text()}\n----\n"
f'{self.url()}' f"{self.url()}"
) )
def url(self): def url(self):
@@ -165,89 +205,124 @@ class TalentTweet:
def is_cross_company(self): def is_cross_company(self):
if self.author_id == self.rt_author_id: if self.author_id == self.rt_author_id:
return False return False
for other_id in self.all_parties: for other_id in self.all_parties:
if tl.is_cross_company(self.author_id, other_id): if tl.is_cross_company(self.author_id, other_id):
return True return True
return False return False
def get_all_parties_usernames(self): def get_all_parties_usernames(self):
if len(self.all_parties) > 0: if len(self.all_parties) > 0:
s = str() s = str()
for id in self.all_parties: for id in self.all_parties:
s += f'{util.get_username_local(id)}, ' s += f"{util.get_username_local(id)}, "
return s[0:-2] return s[0:-2]
return 'none' return "none"
def get_datetime_str(self): def get_datetime_str(self):
unpad = '#' if platform.system() == 'Windows' else '-' unpad = "#" if platform.system() == "Windows" else "-"
return self.date_time.strftime(f'%b %{unpad}d, %Y · %{unpad}I:%M%p (%Z)') return self.date_time.strftime(f"%b %{unpad}d, %Y · %{unpad}I:%M%p (%Z)")
def announce_text(self): def announce_text(self):
# templates # templates
TWEET = '{0} tweeted mentioning {1}!' TWEET = "{0} tweeted mentioning {1}!"
REPLY = '{0} replied to {1}!' REPLY = "{0} replied to {1}!"
REPLY_TO_MENTION_B = '{0} replied to a tweet{1}mentioning {2}!' ######################### REPLY_TO_MENTION_B = (
RETWEET = '{0} retweeted {1}!' "{0} replied to a tweet{1}mentioning {2}!" #########################
RETWEET_MENTIONS_B = '{0} shared a tweet{1}mentioning {2}!' ######################### )
QUOTE_TWEET = '{0} quote tweeted {1}!' RETWEET = "{0} retweeted {1}!"
QUOTED_TWEET_MENTIONS_B = '{0} quoted a tweet{1}mentioning {2}!' ######################### RETWEET_MENTIONS_B = (
"{0} shared a tweet{1}mentioning {2}!" #########################
)
QUOTE_TWEET = "{0} quote tweeted {1}!"
QUOTED_TWEET_MENTIONS_B = (
"{0} quoted a tweet{1}mentioning {2}!" #########################
)
author_username = f'@/{util.get_username_with_company(self.author_id)}' author_username = util.get_username_with_company(self.author_id)
ret = str() ret = str()
print_mention_ids = set(self.mentions) print_mention_ids = set(self.mentions)
try: print_mention_ids.remove(None) try:
except: pass print_mention_ids.remove(None)
mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids] except:
pass
mention_usernames = [
util.get_username_with_company(x) for x in print_mention_ids
]
def rtm_msg(TEMPLATE: str, rtm_author_username: str): def rtm_msg(TEMPLATE: str, rtm_author_username: str):
nonlocal ret nonlocal ret
if (self.rt_author_id is not None and self.rt_author_id != -1) \ if (
or (self.quote_tweeted is not None and self.quote_tweeted != -1) \ (self.rt_author_id is not None and self.rt_author_id != -1)
or (self.reply_to is not None and self.reply_to != -1): # rtm tweet is from talent; rtm should be everyone or (self.quote_tweeted is not None and self.quote_tweeted != -1)
rtm_names = [f'@/{util.get_username_with_company(x)}' for x in self.rt_mentions] or (self.reply_to is not None and self.reply_to != -1)
between = f' from {rtm_author_username} ' ): # rtm tweet is from talent; rtm should be everyone
rtm_names = [
util.get_username_with_company(x) for x in self.rt_mentions
]
between = f" from {rtm_author_username} "
ret += TEMPLATE.format(author_username, between, ", ".join(rtm_names)) ret += TEMPLATE.format(author_username, between, ", ".join(rtm_names))
else: # rtm tweet is not from a talent; rtm should just be cross company else: # rtm tweet is not from a talent; rtm should just be cross company
rtm_names = [f'@/{util.get_username_with_company(x)}' for x in self.rt_mentions if tl.is_cross_company(self.author_id, x)] rtm_names = [
ret += TEMPLATE.format(author_username, ' ', ", ".join(rtm_names)) util.get_username_with_company(x)
for x in self.rt_mentions
if tl.is_cross_company(self.author_id, x)
]
ret += TEMPLATE.format(author_username, " ", ", ".join(rtm_names))
# Tweet types # Tweet types
if self.rt_author_id is not None: # retweet if self.rt_author_id is not None: # retweet
rt_username = f'@/{util.get_username_with_company(self.rt_author_id)}' if self.rt_author_id != -1 else None rt_username = (
util.get_username_with_company(self.rt_author_id)
if self.rt_author_id != -1
else None
)
if rt_username == author_username:
rt_username = "themselves"
if len(self.rt_mentions) > 0: if len(self.rt_mentions) > 0:
rtm_msg(RETWEET_MENTIONS_B, rt_username) rtm_msg(RETWEET_MENTIONS_B, rt_username)
else: else:
ret += RETWEET.format(author_username, rt_username) ret += RETWEET.format(author_username, rt_username)
mention_usernames.clear() mention_usernames.clear()
elif self.reply_to is not None: # reply elif self.reply_to is not None: # reply
reply_username = f'@/{util.get_username_with_company(self.reply_to)}' if self.reply_to != -1 else None reply_username = (
util.get_username_with_company(self.reply_to)
if self.reply_to != -1
else None
)
if reply_username == author_username:
reply_username = "themselves"
if len(self.rt_mentions) > 0: if len(self.rt_mentions) > 0:
rtm_msg(REPLY_TO_MENTION_B, reply_username) rtm_msg(REPLY_TO_MENTION_B, reply_username)
else: else:
ret += REPLY.format(author_username, reply_username) ret += REPLY.format(author_username, reply_username)
elif self.quote_tweeted is not None: # qrt elif self.quote_tweeted is not None: # qrt
quoted_username = f'@/{util.get_username_with_company(self.quote_tweeted)}' if self.quote_tweeted != -1 else None quoted_username = (
util.get_username_with_company(self.quote_tweeted)
if self.quote_tweeted != -1
else None
)
if quoted_username == author_username:
quoted_username = "themselves"
if len(self.rt_mentions) > 0: if len(self.rt_mentions) > 0:
rtm_msg(QUOTED_TWEET_MENTIONS_B, quoted_username) rtm_msg(QUOTED_TWEET_MENTIONS_B, quoted_username)
else: else:
ret += QUOTE_TWEET.format(author_username, quoted_username) ret += QUOTE_TWEET.format(author_username, quoted_username)
elif len(self.mentions) > 0: # standalone tweet elif len(self.mentions) > 0: # standalone tweet
ret += TWEET.format(author_username, ", ".join(mention_usernames)) ret += TWEET.format(author_username, ", ".join(mention_usernames))
mention_usernames.clear() mention_usernames.clear()
else: else:
raise ValueError(f'TalentTweet {self.tweet_id} has insufficient other parties') raise ValueError(
f"TalentTweet {self.tweet_id} has insufficient other parties"
)
# mention line # mention line
if len(mention_usernames) > 0: if len(mention_usernames) > 0:
ret += ( ret += "\nMentions: " f'{", ".join(mention_usernames)}'
'\nMentions: '
f'{", ".join(mention_usernames)}'
)
# date # date
ret += f'\n\n{self.get_datetime_str()}' ret += f"\n\n{self.get_datetime_str()}"
return ret return ret
+1 -1
View File
@@ -119,7 +119,7 @@ def get_username(id):
def get_username_with_company(id): def get_username_with_company(id):
company = talent_lists.talents_company.get(id, None) company = talent_lists.talents_company.get(id, None)
return f'{get_username(id)} {f"({company})" if company is not None else ""}' return f'@/{get_username(id)} {f"({company})" if company is not None else ""}'
def get_username_local(id: int): def get_username_local(id: int):