Files
NijiHolo_EN_ID_Bot/src/talenttweet.py
T

180 lines
6.8 KiB
Python
Raw Normal View History

2022-09-27 02:49:03 -07:00
import datetime
2022-09-24 17:56:58 -07:00
import platform
import pytz
2022-09-24 17:56:58 -07:00
2022-10-01 13:33:20 -07:00
import twapi
2022-09-24 17:56:58 -07:00
import talent_lists
2022-09-27 15:09:09 -07:00
import util
2022-09-24 17:56:58 -07:00
class TalentTweet:
@staticmethod
def deserialize(serialized_str: str):
tokens = serialized_str.split()
if len(tokens) < 3:
raise ValueError('not enough tokens to reconstruct a TalentTweet')
tweet_id, author_id = int(tokens[0]), int(tokens[1])
2022-09-27 02:49:03 -07:00
date_time = datetime.datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc)
mentions = set()
reply_to = None
quote_retweeted = None
mode = ''
for i in range(3, len(tokens)):
if len(tokens[i]) == 1 and not tokens[i].isnumeric(): # mode switch
mode = tokens[i]
continue
if tokens[i].isnumeric():
if mode == 'm': # mentions
mentions.add(int(tokens[i]))
continue
if mode == 'r': # reply_to
reply_to = int(tokens[i])
continue
if mode == 'q': # quote_retweeted
quote_retweeted = int(tokens[i])
return TalentTweet(
tweet_id=tweet_id, author_id=author_id,
date_time=date_time, mrq=(mentions, reply_to, quote_retweeted)
)
@staticmethod
2022-09-26 14:44:46 -07:00
async def create_from_twint_tweet(tweet):
2022-09-27 02:49:03 -07:00
# MRQ
mentions = set()
reply_to = None
2022-09-27 02:49:03 -07:00
quoted_id = None
# reply_to/mentions
2022-09-26 14:44:46 -07:00
is_reply = tweet.id != int(tweet.conversation_id)
mentions = set([x['id'] for x in tweet.mentions])
2022-09-27 02:49:03 -07:00
if is_reply and len(tweet.reply_to) > 0:
reply_to = tweet.reply_to[0]['id'] # FIXME: QRT = is_reply and len(tweet.reply_to) == 0?
2022-09-26 14:44:46 -07:00
reply_others = [x['id'] for x in tweet.reply_to[1:]]
mentions.update(reply_others)
try: mentions.remove(reply_to)
except: pass
2022-09-27 02:49:03 -07:00
# qrt
if type(tweet.quote_url) == str:
# print(f'url: {tweet.quote_url} ({type(tweet.quote_url)})')
quote_tokens = tweet.quote_url.split('/')
if len(quote_tokens) >= 2:
quoted_username = quote_tokens[-2]
quoted_id = util.get_user_id_local(quoted_username)
if quoted_id == -1:
quoted_id = util.get_user_id_online(quoted_username)
2022-10-01 13:33:20 -07:00
# FIXME: resultant tweets don't show timezone properly
2022-09-27 02:49:03 -07:00
date_time = datetime.datetime.strptime(tweet.datetime, '%Y-%m-%d %H:%M:%S %Z')
print(date_time)
2022-09-27 02:49:03 -07:00
return TalentTweet(tweet_id=tweet.id, author_id=tweet.user_id, date_time=date_time, mrq=(mentions, reply_to, quoted_id))
@staticmethod
2022-09-27 22:04:26 -07:00
def create_from_v2api_response(resp):
tweet = resp.data
2022-10-01 13:33:20 -07:00
mrq = twapi.TwAPI.get_mrq(tweet, resp)
2022-09-27 22:04:26 -07:00
rt_target = None
rt_author_id = None
# check if is RT
if tweet.referenced_tweets is not None and len(tweet.referenced_tweets) > 0:
for ref in tweet.referenced_tweets:
if ref.type == 'retweeted':
rt_target = ref.id
for incl_tweet in resp.includes['tweets']:
if incl_tweet.id == ref.id:
rt_author_id = incl_tweet.author_id
return TalentTweet(
tweet_id=tweet.id,
author_id=tweet.author_id,
date_time=tweet.created_at,
2022-09-27 22:04:26 -07:00
mrq=mrq,
rt_target=rt_target,
rt_author_id=rt_author_id
)
2022-09-27 22:04:26 -07:00
@staticmethod
async def create_from_id(id):
2022-10-01 13:33:20 -07:00
resp = await twapi.TwAPI.instance.get_tweet_response(id)
2022-09-27 22:04:26 -07:00
return TalentTweet.create_from_v2api_response(resp)
def __init__(self, tweet_id: int, author_id: int, date_time: datetime.datetime, mrq: tuple, rt_target: int=None, rt_author_id: int=None):
self.tweet_id, self.author_id = tweet_id, author_id
self.date_time = date_time
2022-09-26 14:44:46 -07:00
self.mentions = tuple(int(x) for x in mrq[0])
self.reply_to = int(mrq[1]) if mrq[1] is not None else None
self.quote_retweeted = int(mrq[2]) if mrq[2] is not None else None
2022-09-27 22:04:26 -07:00
self.rt_target, self.rt_author_id = rt_target, rt_author_id
# all users involved, except for the author
self.all_parties = {self.reply_to, self.quote_retweeted}
self.all_parties.update(self.mentions)
try:
self.all_parties.remove(None)
2022-10-01 13:33:20 -07:00
except: pass
try:
self.all_parties.remove(self.author_id)
2022-10-01 13:33:20 -07:00
except: pass
2022-09-24 17:56:58 -07:00
def __repr__(self) -> str:
return (
2022-09-27 02:49:03 -07:00
f'{self.tweet_id} from {util.get_username_local(self.author_id)}):\n'
2022-09-24 17:56:58 -07:00
f'{self.get_datetime_str()}\n'
f'{self.get_all_parties_usernames()}\n'
f'mentions: {self.mentions}\n'
f'reply_to: {self.reply_to}\n'
f'quote_retweeted: {self.quote_retweeted}\n'
2022-09-24 17:56:58 -07:00
f'Cross-company: {self.is_cross_company()}\n'
f'{self.serialize()}\n'
2022-09-24 17:56:58 -07:00
f'======================================================'
)
# Serialized one-liner format:
# {tweet} {author} {time in seconds since epoch} m {mention_set} r {reply_to_author} q {quote_retweet_author}
def serialize(self):
s = f'{self.tweet_id} {self.author_id} {self.date_time.timestamp()} '
if len(self.mentions) > 0:
s += 'm '
for id in self.mentions:
s += f'{id} '
if self.reply_to:
s += f'r {self.reply_to} '
if self.quote_retweeted:
s += f'q {self.quote_retweeted} '
return s[:-1]
2022-09-24 17:56:58 -07:00
def is_cross_company(self):
for other_id in self.all_parties:
2022-09-25 04:24:04 -07:00
if self.author_id in talent_lists.holo_en:
if other_id in talent_lists.niji_en or other_id in talent_lists.niji_exid:
return True
if self.author_id in talent_lists.niji_en:
2022-09-25 04:24:04 -07:00
if other_id in talent_lists.holo_en or other_id in talent_lists.holo_id:
2022-09-24 17:56:58 -07:00
return True
2022-09-25 04:24:04 -07:00
if self.author_id in talent_lists.holo_id:
if other_id in talent_lists.niji_en:
2022-09-24 17:56:58 -07:00
return True
2022-09-25 04:24:04 -07:00
if self.author_id in talent_lists.niji_exid:
if other_id in talent_lists.holo_en:
return True
2022-09-24 17:56:58 -07:00
return False
def get_all_parties_usernames(self):
if len(self.all_parties) > 0:
2022-09-24 17:56:58 -07:00
s = str()
for id in self.all_parties:
2022-09-27 02:49:03 -07:00
s += f'{util.get_username_local(id)}, '
2022-09-24 17:56:58 -07:00
return s[0:-2]
return 'none'
def get_datetime_str(self):
unpad = '#' if platform.system() == 'Windows' else '-'
return self.date_time.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)')