TalentTweet work, cleanups
- added other RT/QRT considerations - announce tweet text moved to TalentTweet - some general clean up
This commit is contained in:
@@ -0,0 +1,16 @@
|
||||
possible combinations which involve a "target cross-tweeter" B
|
||||
|
||||
A retweets B
|
||||
- B's tweet may have cross-mentions (B1, B2, etc.)
|
||||
- rt_author_id=B; rt_mentions=B1,B2,...
|
||||
A retweets tweet mentioning B
|
||||
- rt_author_id=...; rt_mentions=B...
|
||||
|
||||
A quotes a tweet from B
|
||||
- B's tweet may have cross-mentions (B1, B2, etc.)
|
||||
- quote_retweeted=B; rt_mentions=B1,B2,...
|
||||
A quotes a tweet mentioning B
|
||||
- quote_retweeted=...; rt_mentions=B...
|
||||
|
||||
-- NO --
|
||||
A retweets a tweet that quotes a tweet mentioning B?
|
||||
+97
-21
@@ -10,12 +10,19 @@ import util
|
||||
|
||||
class TalentTweet:
|
||||
# Serialized one-liner format:
|
||||
# {tweet} {author} {time in seconds since epoch} m {mention set} r {reply to author} q {quote tweet author} rt {retweeted tweet's id}
|
||||
# {tweet} {author} {time in seconds since epoch UTC} m {mention set} r {reply to author} q {quote tweet author} rt {retweeted user's id} rtm {mentions in retweet}
|
||||
def serialize(self):
|
||||
s = f'{self.tweet_id} {self.author_id} {self.date_time.timestamp()} '
|
||||
s = f'{self.tweet_id} {self.author_id} {int(self.date_time.timestamp())} '
|
||||
if self.date_time.tzinfo is None:
|
||||
print(f'warning: serialized tweet {self.tweet_id} has a NAIVE timestamp!')
|
||||
|
||||
if len(self.rt_mentions) > 0:
|
||||
s += 'rtm '
|
||||
for n in self.rt_mentions:
|
||||
s += f'{n} '
|
||||
|
||||
if self.rt_author_id != None:
|
||||
s += f'rt {self.rt_id} {self.rt_author_id}'
|
||||
s += f'rt {self.rt_author_id} '
|
||||
return s[:-1] # stop here since retweets can't have other info
|
||||
|
||||
if len(self.mentions) > 0:
|
||||
@@ -24,23 +31,25 @@ class TalentTweet:
|
||||
s += f'{id} '
|
||||
if self.reply_to:
|
||||
s += f'r {self.reply_to} '
|
||||
if self.quote_retweeted:
|
||||
s += f'q {self.quote_retweeted} '
|
||||
if self.quote_tweeted:
|
||||
s += f'q {self.quote_tweeted} '
|
||||
|
||||
return s[:-1]
|
||||
|
||||
@staticmethod
|
||||
def deserialize(serialized_str: str):
|
||||
tokens = serialized_str.split()
|
||||
tokens = serialized_str.split('#')[0]
|
||||
if len(tokens) < 3:
|
||||
raise ValueError('not enough tokens to reconstruct a TalentTweet')
|
||||
|
||||
tweet_id, author_id = int(tokens[0]), int(tokens[1])
|
||||
date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc)
|
||||
|
||||
mentions = set()
|
||||
mentions = list()
|
||||
reply_to = None
|
||||
quote_retweeted = None
|
||||
rt = None
|
||||
rtm = list()
|
||||
|
||||
mode = ''
|
||||
for i in range(3, len(tokens)):
|
||||
@@ -57,15 +66,27 @@ class TalentTweet:
|
||||
continue
|
||||
if mode == 'q': # quote_retweeted
|
||||
quote_retweeted = int(tokens[i])
|
||||
if mode == 'rt': # retweeted user
|
||||
rt = int(tokens[i])
|
||||
if mode == 'rtm': # retweet/qrt mentions
|
||||
rtm = int(tokens[i])
|
||||
|
||||
return TalentTweet(
|
||||
tweet_id=tweet_id, author_id=author_id,
|
||||
date_time=date_time, mrq=(mentions, reply_to, quote_retweeted)
|
||||
date_time=date_time, mrq=(mentions, reply_to, quote_retweeted),
|
||||
rt_author_id=rt, rt_mentions=rtm
|
||||
)
|
||||
|
||||
## Creates a TalentTweet from a Tweety-library Tweet.
|
||||
@staticmethod
|
||||
def create_from_tweety(tweety: Tweet):
|
||||
if tweety.is_retweet:
|
||||
rtm = [int(x.id) for x in tweety.retweeted_tweet.user_mentions]
|
||||
elif tweety.is_quoted:
|
||||
rtm = [int(x.id) for x in tweety.quoted_tweet.user_mentions]
|
||||
else:
|
||||
rtm = list()
|
||||
|
||||
return TalentTweet(
|
||||
tweet_id=int(tweety.id), author_id=int(tweety.author.id),
|
||||
date_time=tweety.date, text=tweety.text,
|
||||
@@ -75,7 +96,7 @@ class TalentTweet:
|
||||
int(tweety.quoted_tweet.author.id) if tweety.quoted_tweet is not None else None
|
||||
),
|
||||
rt_author_id=tweety.retweeted_tweet.author.id if tweety.is_retweet else None,
|
||||
rt_mentions=[int(x.id) for x in tweety.retweeted_tweet.user_mentions] if tweety.is_retweet else list()
|
||||
rt_mentions=rtm
|
||||
)
|
||||
|
||||
def __init__(self, tweet_id: int, author_id: int, date_time: datetime, text: str = None, mrq: tuple[list[int], int|None, int|None]=None, rt_author_id: int=None, rt_mentions: list[int]=None):
|
||||
@@ -88,32 +109,37 @@ class TalentTweet:
|
||||
# filter twitter users to only be cross-company
|
||||
self.mentions = {x for x in mrq[0] if is_cross_company(author_id, x)}
|
||||
self.reply_to = mrq[1] if mrq[1] is not None and is_cross_company(author_id, mrq[1]) else None
|
||||
self.quote_retweeted = mrq[2] if mrq[2] is not None and is_cross_company(author_id, mrq[2]) else None
|
||||
self.rt_mentions = {x for x in rt_mentions if is_cross_company(author_id, x)} if rt_mentions is not None else None
|
||||
self.rt_author_id = rt_author_id if (rt_author_id is not None and is_cross_company(author_id, rt_author_id)) or (len(self.rt_mentions) > 0) else None
|
||||
self.quote_tweeted = mrq[2]
|
||||
|
||||
# all users involved, except for the author
|
||||
self.all_parties = {self.reply_to, self.quote_retweeted}
|
||||
self.all_parties.update(self.mentions)
|
||||
try:
|
||||
self.all_parties.remove(None)
|
||||
# rt'd/quoted tweet contains cross-company names?
|
||||
self.rt_mentions = {x for x in rt_mentions if is_cross_company(author_id, x)}
|
||||
self.rt_author_id = rt_author_id
|
||||
|
||||
# all users involved except for the author
|
||||
self.all_parties = {self.reply_to, self.quote_tweeted, rt_author_id}
|
||||
self.all_parties.update(self.mentions, self.rt_mentions)
|
||||
try: self.all_parties.remove(None)
|
||||
except: pass
|
||||
try:
|
||||
self.all_parties.remove(self.author_id)
|
||||
try: self.all_parties.remove(self.author_id)
|
||||
except: pass
|
||||
|
||||
# clean up mentions
|
||||
try: self.mentions.remove(self.reply_to)
|
||||
except: pass
|
||||
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'======================================================'
|
||||
f'======================================================\n'
|
||||
f'{self.tweet_id} from {self.username}:\n'
|
||||
f'{self.get_datetime_str()}\n'
|
||||
f'parties: {self.get_all_parties_usernames()}\n'
|
||||
f'mentions: {self.mentions}\n'
|
||||
f'reply_to: {self.reply_to}\n'
|
||||
f'quote_retweeted: {self.quote_retweeted}\n'
|
||||
f'quote_retweeted: {self.quote_tweeted}\n'
|
||||
f'cross-company? {self.is_cross_company()}\n'
|
||||
f'{self.serialize()}\n'
|
||||
f'----\n{self.announce_text()}\n----\n'
|
||||
f'{self.url()}'
|
||||
)
|
||||
|
||||
@@ -138,3 +164,53 @@ class TalentTweet:
|
||||
def get_datetime_str(self):
|
||||
unpad = '#' if platform.system() == 'Windows' else '-'
|
||||
return self.date_time.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)')
|
||||
|
||||
def announce_text(self, is_catchup=False):
|
||||
# templates
|
||||
REPLY = '{0} replied to {1}!'
|
||||
TWEET = '{0} tweeted!'
|
||||
RETWEET = '{0} retweeted {1}!'
|
||||
RETWEET_MENTIONS_B = '{0} shared a tweet mentioning{1}!'
|
||||
QUOTE_TWEET = '{0} quote tweeted {1}!'
|
||||
QUOTE_TWEET_MENTIONS_B = '{0} quoted a tweet mentioning {1}!'
|
||||
|
||||
author_username = f'@/{util.get_username_with_company(self.author_id)}'
|
||||
ret = str()
|
||||
print_mention_ids = set(self.mentions)
|
||||
if is_catchup:
|
||||
ret += f'{self.get_datetime_str()}\n'
|
||||
pass
|
||||
|
||||
rt_mention_names = [util.get_username_with_company(x) for x in self.rt_mentions]
|
||||
# Tweet types
|
||||
if self.rt_author_id is not None: # retweet
|
||||
if len(self.rt_mentions) > 0:
|
||||
ret += RETWEET_MENTIONS_B.format(author_username, ", ".join(rt_mention_names))
|
||||
else:
|
||||
ret += RETWEET.format(f'{author_username}', f'@/{util.get_username_with_company(self.rt_author_id)}')
|
||||
elif self.reply_to is not None: # reply
|
||||
reply_username = f'@/{util.get_username_with_company(self.reply_to)}'
|
||||
ret += REPLY.format(author_username, reply_username)
|
||||
elif self.quote_tweeted is not None: # qrt
|
||||
quoted_username = f'@/{util.get_username_with_company(self.quote_tweeted)}'
|
||||
if len(self.rt_mentions) > 0:
|
||||
ret += QUOTE_TWEET_MENTIONS_B.format(author_username, ", ".join(rt_mention_names))
|
||||
else:
|
||||
ret += QUOTE_TWEET.format(author_username, quoted_username)
|
||||
elif len(self.mentions) > 0: # standalone tweet
|
||||
ret += TWEET.format(author_username)
|
||||
else:
|
||||
raise ValueError(f'TalentTweet {self.tweet_id} has insufficient other parties')
|
||||
|
||||
try: print_mention_ids.remove(None)
|
||||
except: pass
|
||||
|
||||
# mention line
|
||||
if len(print_mention_ids) > 0:
|
||||
mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids]
|
||||
ret += (
|
||||
'\nMentioning '
|
||||
f'{", ".join(mention_usernames)}'
|
||||
)
|
||||
|
||||
return ret
|
||||
|
||||
+3
-63
@@ -124,23 +124,6 @@ class TwAPI:
|
||||
# else:
|
||||
# print('Saul Gone')
|
||||
|
||||
# DEPRECATED: thx elon
|
||||
async def get_tweet_response(self, id, attempt = 0):
|
||||
try:
|
||||
twt = TwAPI.instance.client.get_tweet(
|
||||
id,
|
||||
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
|
||||
tweet_fields=TwAPI.TWEET_FIELDS,
|
||||
expansions=TwAPI.TWEET_EXPANSIONS
|
||||
)
|
||||
TwAPI.tweets_fetched += 1
|
||||
return twt
|
||||
except tweepy.TooManyRequests as e:
|
||||
wait_for = float(e.response.headers["x-rate-limit-reset"]) - datetime.datetime.now().timestamp() + 1
|
||||
print(f'[{attempt}]\tget_tweet_response({id}):\n\thit rate limit after {TwAPI.tweets_fetched} fetches -- trying again in {wait_for} seconds...')
|
||||
await asyncio.sleep(wait_for)
|
||||
return await self.get_tweet_response(id, attempt=attempt+1)
|
||||
|
||||
async def post_tweet(self, text='', media_ids: list=None, reply_to_tweet: int=None, quote_tweet_id: int=None):
|
||||
try:
|
||||
tweet = self.client.create_tweet(text=text, media_ids=media_ids, in_reply_to_tweet_id=reply_to_tweet, quote_tweet_id=quote_tweet_id)
|
||||
@@ -160,55 +143,12 @@ class TwAPI:
|
||||
# return False = did not post ttweet (duplicate)
|
||||
async def post_ttweet(self, ttweet: tt.TalentTweet, is_catchup=False, dry_run=False):
|
||||
print(f'------{ttweet.tweet_id} ({util.get_username_local(ttweet.author_id)})------')
|
||||
|
||||
REPLY = '{0} replied to {1}!\n'
|
||||
QUOTE_TWEET = '{0} quote tweeted {1}!\n'
|
||||
TWEET = '{0} tweeted!\n'
|
||||
RETWEET = '{0} retweeted {1}!\n'
|
||||
|
||||
def create_text():
|
||||
author_username = f'@/{util.get_username_with_company(ttweet.author_id)}'
|
||||
print_mention_ids = set(ttweet.mentions)
|
||||
ret = str()
|
||||
if is_catchup:
|
||||
ret += f'{ttweet.get_datetime_str()}\n'
|
||||
pass
|
||||
|
||||
# Tweet types
|
||||
if ttweet.rt_id is not None: # retweet
|
||||
ret += RETWEET.format(f'{author_username}', f'@/{util.get_username_with_company(ttweet.rt_author_id)}')
|
||||
elif ttweet.reply_to is not None: # reply
|
||||
reply_username = f'@/{util.get_username_with_company(ttweet.reply_to)}'
|
||||
ret += REPLY.format(author_username, reply_username)
|
||||
# if qrt, push id into mentions
|
||||
print_mention_ids.add(ttweet.quote_retweeted)
|
||||
elif ttweet.quote_retweeted is not None: # qrt
|
||||
quoted_username = f'@/{util.get_username_with_company(ttweet.quote_retweeted)}'
|
||||
ret += QUOTE_TWEET.format(author_username, quoted_username)
|
||||
elif len(ttweet.mentions) > 0: # standalone tweet
|
||||
ret += TWEET.format(author_username)
|
||||
else:
|
||||
raise ValueError(f'TalentTweet {ttweet.tweet_id} has insufficient other parties')
|
||||
|
||||
try: print_mention_ids.remove(None)
|
||||
except: pass
|
||||
|
||||
# mention line
|
||||
if len(print_mention_ids) > 0:
|
||||
mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids]
|
||||
ret += (
|
||||
'mentioning '
|
||||
f'{", ".join(mention_usernames)}\n'
|
||||
)
|
||||
ret += '\n'
|
||||
# ret += '(this is a missed tweet)\n' if is_catchup else ''
|
||||
return ret
|
||||
|
||||
text = create_text()
|
||||
ttweet_url = util.ttweet_to_url(ttweet)
|
||||
text = ttweet.announce_text()
|
||||
ttweet_url = ttweet.url()
|
||||
|
||||
if dry_run: print('-------------------- DRY RUN --------------------')
|
||||
print(text)
|
||||
print(ttweet)
|
||||
if dry_run: return False
|
||||
|
||||
# NO DRY-RUN: actually post tweet
|
||||
|
||||
+13
-32
@@ -45,7 +45,7 @@ def get_current_timestamp():
|
||||
def get_current_date():
|
||||
return datetime.today().strftime('%Y-%m-%d')
|
||||
|
||||
def get_key_from_value(d, val):
|
||||
def get_key_from_value(d: dict, val):
|
||||
keys = [k for k, v in d.items() if v == val]
|
||||
if keys:
|
||||
return keys[0]
|
||||
@@ -101,21 +101,6 @@ def ttweet_to_url(ttweet):
|
||||
# except:
|
||||
# return str(default) if default is not None else f'{id}'
|
||||
|
||||
def get_username_local(id: int):
|
||||
return talent_lists.talents.get(id, f'{id}')
|
||||
|
||||
# Retrieve username via API v2 (tweepy)
|
||||
# def get_username_online(id, default=None):
|
||||
# try:
|
||||
# resp = twapi.TwAPI.instance.client.get_user(id=id)
|
||||
# return resp.data.username
|
||||
# except tweepy.TooManyRequests:
|
||||
# return str(default) if default is not None else f'id:{id}'
|
||||
# except:
|
||||
# print(f'Unhandled error retrieving username for {id}!')
|
||||
# traceback.print_exc()
|
||||
# return str(default) if default is not None else f'id:{id}'
|
||||
|
||||
## Attempt to pull username from local; pull from online if doesn't exist.
|
||||
def get_username(id):
|
||||
ret = talent_lists.talents.get(id, None)
|
||||
@@ -127,21 +112,17 @@ def get_username_with_company(id):
|
||||
company = talent_lists.talents_company.get(id, None)
|
||||
return f'{get_username(id)} {f"({company})" if company is not None else ""}'
|
||||
|
||||
def get_user_id_local(username) -> int:
|
||||
talent_usernames = list(talent_lists.talents.values())
|
||||
for i in range(0, len(talent_usernames)):
|
||||
if username.lower() == talent_usernames[i].lower():
|
||||
return list(talent_lists.talents)[i]
|
||||
|
||||
def get_user_id_online(username) -> int:
|
||||
c = twint.Config()
|
||||
c.Username = username
|
||||
c.Store_object = True
|
||||
c.Hide_output = True
|
||||
def get_username_local(id: int):
|
||||
return talent_lists.talents.get(id, f'{id}')
|
||||
|
||||
# Retrieve username via API v2 (tweepy)
|
||||
def get_username_online(id, default=None):
|
||||
try:
|
||||
twint.output.users_list.clear()
|
||||
twint.run.Lookup(c)
|
||||
user = twint.output.users_list[0]
|
||||
return user.id
|
||||
resp = twapi.TwAPI.instance.client.get_user(id=id)
|
||||
return resp.data.username
|
||||
except tweepy.TooManyRequests:
|
||||
return str(default) if default is not None else f'id:{id}'
|
||||
except:
|
||||
return -1
|
||||
print(f'Unhandled error retrieving username for {id}!')
|
||||
traceback.print_exc()
|
||||
return str(default) if default is not None else f'id:{id}'
|
||||
Reference in New Issue
Block a user