a couple libraries and even MORE restructuring

This commit is contained in:
muskit
2022-09-25 03:39:15 -07:00
committed by msk
parent 579929559f
commit 25025de06b
7 changed files with 235 additions and 113 deletions
+3 -1
View File
@@ -1,3 +1,5 @@
tweepy
nest-asyncio
pytz
tweet-capture
tweepy
git+https://github.com/muskit/twint_2022_fix.git
+5 -5
View File
@@ -16,8 +16,8 @@
# note: api_key/secret = consumer_key/secret
[Credentials]
api_key=x
api_secret=y
bearer_token=z
oauth1_access_token=a
oauth1_access_secret=b
api_key=xxx
api_secret=yyy
bearer_token=zzz
oauth1_access_token=x
oauth1_access_secret=y
+38 -56
View File
@@ -1,7 +1,8 @@
from lib2to3.pgen2 import token
import asyncio
from math import inf
from urllib import response
import tweepy
from tweetcapture import TweetCapture
import api_secrets
import talenttweet as tt
@@ -13,33 +14,44 @@ class TwAPI:
TWEET_FIELDS = ['created_at', 'in_reply_to_user_id']
TWEET_EXPANSIONS = ['entities.mentions.username', 'referenced_tweets.id.author_id']
# Returns a set of involved parties for a single tweet.
# Returns a tuple (mentions, reply_to, qrt) for a single tweet: the set of
# mentioned user IDs, the replied-to user ID and the quoted tweet's author ID (either may be None).
#
# Tweet must have been queried with these parameters:
# media_fields=['url'],
# tweet_fields=['created_at', 'in_reply_to_user_id'],
# expansions=['entities.mentions.username', 'referenced_tweets.id.author_id']
@staticmethod
def get_involved_parties(tweet, response):
involved_parties = set()
def get_mrq(tweet: tweepy.Tweet, response):
mentions = set()
reply_to = None
qrt = None
# mentions
try:
mention_list = tweet.entities['mentions']
for mention in mention_list:
involved_parties.add(int(mention['id']))
except: pass
mentions.add(int(mention['id']))
except:
pass
# reply-to
if tweet.in_reply_to_user_id != None:
involved_parties.add(tweet.in_reply_to_user_id)
reply_to = tweet.in_reply_to_user_id
# qrt
if tweet.attachments:
for ref_tweet in tweet.attachments:
if tweet.referenced_tweets:
for ref_tweet in tweet.referenced_tweets:
if ref_tweet.type == 'quoted':
for incl_tweet in response.includes['tweets']:
if incl_tweet.id == ref_tweet.id:
involved_parties.add(incl_tweet.author_id)
qrt = incl_tweet.author_id
try:
mentions.remove(reply_to)
mentions.remove(qrt)
except: pass
return (mentions, reply_to, qrt)
return involved_parties
def __init__(self):
TwAPI.instance = self
@@ -49,49 +61,19 @@ class TwAPI:
access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
)
# Returns a list of TalentTweets from a user.
def get_users_all_tweets_mentions(self, id: int, count=inf):
ttweets = list()
retrieve_size = util.clamp(count, 5, 100)
next_page_token = None
tokens_retrieved = 0
tweets_retrieved = 0
while tweets_retrieved < count:
print(f'Retrieved {tokens_retrieved} tokens so far...')
resp = self.client.get_users_tweets(id, max_results=retrieve_size, pagination_token=next_page_token,
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
tweet_fields=TwAPI.TWEET_FIELDS,
expansions=TwAPI.TWEET_EXPANSIONS)
for tweet in resp.data:
mentions = TwAPI.get_involved_parties(tweet, resp)
ttweets.append(tt.TalentTweet(tweet=tweet, other_parties=mentions))
# update counters and pagination token
tweets_retrieved += resp.meta['result_count']
if tweets_retrieved < count:
try:
next_page_token = resp.meta['next_token']
tokens_retrieved += 1
except KeyError:
print("next_token wasn't provided; we've reached the end!")
break # reached end of user's tweets
print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.')
return ttweets
# Returns a list of cross-company TalentTweets from a user.
def get_users_cross_tweets_mentions(self, id):
ret = list()
ttweets = self.get_users_all_tweets_mentions(id)
for ttweet in ttweets:
if ttweet.is_cross_company():
ret.append(ttweet)
return ret
def get_tweet_response(self, id):
    """Fetch a single tweet by ID through the shared TwAPI client.

    The request always asks for the class-level media fields, tweet
    fields and expansions; the client's response is returned as-is.
    """
    client = TwAPI.instance.client
    query_options = {
        'media_fields': TwAPI.TWEET_MEDIA_FIELDS,
        'tweet_fields': TwAPI.TWEET_FIELDS,
        'expansions': TwAPI.TWEET_EXPANSIONS,
    }
    return client.get_tweet(id, **query_options)
# Create a post that showcases given tweet and its mentions set.
def create_post(self, tweet, mentions):
pass
# Try to do this without retrieving Tweet data.
async def create_post(self, ttweet):
img = await util.create_ttweet_image(ttweet)
+23 -5
View File
@@ -5,6 +5,7 @@
# We should post, at the fastest, one tweet per minute.
import os
import asyncio
import twint
@@ -13,6 +14,8 @@ from talent_lists import *
from api import TwAPI
import talenttweet as tt
cross_tweets_queue = dict()
## Returns list of tweets present in queue.txt
def get_local_queue():
# f = open(os.path.join(get_project_dir(), 'queue.txt'))
@@ -26,14 +29,29 @@ def get_user_tweet_ids(id, limit=None):
c.Limit = limit
c.Store_object = True
c.Store_object_tweets_list = tweets
c.Hide_output = True
twint.run.Search(c)
return [x.id for x in tweets]
def run():
async def run():
queue = get_local_queue()
tweets_ids = get_user_tweet_ids(1390620618001838086, limit=20)
for id in tweets_ids:
ttweet = tt.TalentTweet(id)
print(ttweet)
# for user_id in talents.keys():
# tweets_ids = get_user_tweet_ids(user_id, limit=20)
# for id in tweets_ids:
# ttweet = tt.TalentAPITweet(id)
# print(ttweet)
# ids = get_user_tweet_ids(1413339084076978179, limit=20)
# for id in ids:
# ttweet = tt.TalentAPITweet(tweet_id=id)
# print(ttweet)
# serialized_ttweet = '1573778069441200129 1390620618001838086 1664052905.0 m 70876713 1413326894435602434 r 1413326894435602434'
# ttweet = tt.TalentTweet.deserialize(serialized_ttweet)
# print(ttweet)
ttweet = tt.TalentAPITweet(1573563417415233536)
print(ttweet)
# await TwAPI.instance.create_post(ttweet)
+7 -3
View File
@@ -1,7 +1,10 @@
import sys
import asyncio
import argparse
from argparse import RawTextHelpFormatter
import nest_asyncio
import talent_lists
import api_secrets
import catchup
@@ -23,7 +26,7 @@ def init_argparse():
def command_line():
pass
def main():
async def main():
parser = init_argparse()
if len(sys.argv) < 2:
parser.print_help()
@@ -51,7 +54,7 @@ def main():
listen.run()
case 'c' | 'catchup':
print('RUNNING IN CATCH-UP MODE\n')
catchup.run()
await catchup.run()
case _:
command_line()
#TODO: remove message
@@ -60,4 +63,5 @@ def main():
if __name__ == "__main__":
main()
nest_asyncio.apply()
asyncio.run(main())
+112 -35
View File
@@ -1,69 +1,146 @@
from datetime import datetime
import platform
import tweepy
import pytz
from api import *
import talent_lists
class TalentTweet:
def __init__(self, tweet: tweepy.Tweet, other_parties: set):
self.tweet = tweet
self.other_parties = other_parties
@staticmethod
def deserialize(serialized_str: str):
tokens = serialized_str.split()
if len(tokens) < 3:
raise ValueError('not enough tokens to reconstruct a TalentTweet')
def __init__(self, tweet_id):
resp = TwAPI.instance.client.get_tweet(tweet_id,
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
tweet_fields=TwAPI.TWEET_FIELDS,
expansions=TwAPI.TWEET_EXPANSIONS)
tweet_id, author_id = int(tokens[0]), int(tokens[1])
date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc)
mentions = set()
reply_to = None
quote_retweeted = None
mode = ''
for i in range(3, len(tokens)):
if len(tokens[i]) == 1 and not tokens[i].isnumeric(): # mode switch
mode = tokens[i]
continue
if tokens[i].isnumeric():
if mode == 'm': # mentions
mentions.add(int(tokens[i]))
continue
if mode == 'r': # reply_to
reply_to = int(tokens[i])
continue
if mode == 'q': # quote_retweeted
quote_retweeted = int(tokens[i])
return TalentTweet(
tweet_id=tweet_id, author_id=author_id,
date_time=date_time, mrq=(mentions, reply_to, quote_retweeted)
)
def __init__(self, tweet_id: int, author_id: int, date_time: datetime, mrq: tuple):
    """Build a TalentTweet from already-known values.

    tweet_id / author_id: numeric IDs of the tweet and its author.
    date_time: when the tweet was created.
    mrq: tuple of (mentions set, reply_to user ID or None,
         quote-retweeted user ID or None).
    """
    self.tweet_id, self.author_id = tweet_id, author_id
    self.date_time = date_time

    self.mentions = mrq[0]
    self.reply_to = mrq[1]
    self.quote_retweeted = mrq[2]

    # all users involved, except for the author
    self.all_parties = {self.reply_to, self.quote_retweeted}
    self.all_parties.update(self.mentions)
    # discard() instead of remove(): each removal must happen independently.
    # With remove() inside one try block, a missing None raised KeyError and
    # skipped the author removal, leaving the author in all_parties.
    self.all_parties.discard(None)
    self.all_parties.discard(self.author_id)
self.tweet = resp.data
self.other_parties = TwAPI.get_involved_parties(self.tweet, resp)
def __repr__(self) -> str:
return (
f'{self.tweet.id} from {talent_lists.talents.get(self.tweet.author_id, "???")}:\n'
f'{self.tweet.text}\n'
f'------------------------------------------------------\n'
f'{self.tweet_id} from {util.get_username(self.author_id)}):\n'
f'{self.get_datetime_str()}\n'
f'{self.get_mentions_usernames()}\n'
f'{self.get_all_parties_usernames()}\n'
f'mentions: {self.mentions}\n'
f'reply_to: {self.reply_to}\n'
f'quote_retweeted: {self.quote_retweeted}\n'
f'Cross-company: {self.is_cross_company()}\n'
f'{self.serialize()}\n'
f'======================================================'
)
def is_cross_company(self):
author_id = self.tweet.author_id
mentions = self.other_parties
# Serialized one-liner format:
# {tweet} {author} {time in seconds since epoch} m {mention_set} r {reply_to_author} q {quote_retweet_author}
def serialize(self):
    """Return this tweet as a single space-separated line.

    Layout: tweet ID, author ID, creation time as a POSIX timestamp, then
    the optional sections 'm <ids...>' (mentions), 'r <id>' (reply-to) and
    'q <id>' (quote-retweeted). Empty or unset sections are omitted.
    """
    fields = [
        f'{self.tweet_id}',
        f'{self.author_id}',
        f'{self.date_time.timestamp()}',
    ]

    if len(self.mentions) > 0:
        fields.append('m')
        fields.extend(f'{mention_id}' for mention_id in self.mentions)

    if self.reply_to:
        fields.append(f'r {self.reply_to}')

    if self.quote_retweeted:
        fields.append(f'q {self.quote_retweeted}')

    return ' '.join(fields)
def is_cross_company(self):
# TODO: update for EN/ID
for mention_id in mentions:
if author_id in talent_lists.niji_en:
if mention_id in talent_lists.holo_en:
for other_id in self.all_parties:
if self.author_id in talent_lists.niji_en:
if other_id in talent_lists.holo_en:
return True
elif author_id in talent_lists.holo_en:
if mention_id in talent_lists.niji_en:
elif self.author_id in talent_lists.holo_en:
if other_id in talent_lists.niji_en:
return True
return False
def get_mentions_usernames(self):
if len(self.other_parties) > 0:
def get_all_parties_usernames(self):
if len(self.all_parties) > 0:
s = str()
for id in self.other_parties:
s += f'{talent_lists.talents.get(id, "???")}, '
for id in self.all_parties:
s += f'{util.get_username(id)}, '
return s[0:-2]
return 'none'
def get_datetime_str(self):
unpad = '#' if platform.system() == 'Windows' else '-'
return self.tweet.created_at.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)')
return self.date_time.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)')
class TalentTweets:
def __init__(self):
self.ttweets = list()
class TalentAPITweet(TalentTweet):
def __init__(self, tweet_id=None, tweet=None, mrq: tuple=None):
if tweet and mrq:
self.tweet = tweet
elif tweet_id:
resp = TwAPI.instance.get_tweet_response(tweet_id)
self.tweet = resp.data
mrq = TwAPI.get_mrq(self.tweet, resp)
else:
raise ValueError('did not supply sufficient tweet information')
def get_ttweets(self):
pass
TalentTweet.__init__(
self,
tweet_id=self.tweet.id,
author_id=self.tweet.author_id,
date_time=self.tweet.created_at,
mrq=mrq
)
def get_ttweet_ids(self):
pass
def __repr__(self) -> str:
    """Render a multi-line, human-readable dump of this API-backed tweet.

    Shows the tweet ID and author username, the tweet text, the creation
    time, every involved party, the raw mention/reply/quote fields, the
    serialized one-liner and the cross-company verdict.
    """
    sections = [
        f'{self.tweet_id} from {util.get_username(self.author_id)}:\n',
        f'{self.tweet.text}\n',
        f'------------------------------------------------------\n',
        f'{self.get_datetime_str()}\n',
        f'{self.get_all_parties_usernames()}\n',
        f'mentions: {self.mentions}\n',
        f'reply_to: {self.reply_to}\n',
        f'quote_retweeted: {self.quote_retweeted}\n',
        f'{self.serialize()}\n',
        f'Cross-company: {self.is_cross_company()}\n',
        f'======================================================',
    ]
    return ''.join(sections)
+43 -4
View File
@@ -1,7 +1,11 @@
## Shared utility functions.
import os
import talent_lists
import twint
from tweetcapture import TweetCapture
from talent_lists import *
import talenttweet as tt
# returns system path to this project, which is
@@ -9,8 +13,43 @@ import talenttweet as tt
def get_project_dir():
    """Return the parent of this module's directory as an un-normalized '<dir>/..' path."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, os.pardir)
def tweet_id_to_url(id):
    """Build a status URL for a tweet ID, using 'twitter' as the username path segment."""
    url = f'https://twitter.com/twitter/status/{id}'
    return url
def clamp(n, smallest, largest):
    """Limit n to the range [smallest, largest]."""
    capped = min(n, largest)
    return max(smallest, capped)
async def create_ttweet_image(ttweet):
    """Capture a screenshot of the given TalentTweet into 'img.png'.

    Returns the value produced by TweetCapture.screenshot on success
    (presumably the saved image path -- TODO confirm), or None when the
    capture fails.
    """
    tc = TweetCapture()
    filename = 'img.png'
    url = ttweet_to_url(ttweet)

    # Drop any stale screenshot first; a missing or unremovable file is not fatal.
    try:
        os.remove(filename)
    except OSError:
        pass

    try:
        img = await tc.screenshot(
            url=url,
            path=filename,
            mode=4,
            night_mode=1
        )
    except Exception:
        # Catch Exception rather than everything: a bare except would also
        # swallow task cancellation and KeyboardInterrupt and report them as
        # an ordinary failed capture.
        print('unable to create tweet image')
        return None

    print(f'successfully saved {img}')
    return img
def ttweet_to_url(ttweet):
    """Build the status URL for a TalentTweet from its author's username and its tweet ID."""
    author_name = get_username(ttweet.author_id)
    status_url = f'https://twitter.com/{author_name}/status/{ttweet.tweet_id}'
    return status_url
def get_username(user_id):
    """Resolve a user ID to a username.

    Tries a twint lookup first. If that fails for any reason, falls back to
    the local `talents` table, and finally to the placeholder '#<user_id>'.
    """
    c = twint.Config()
    c.User_id = user_id
    c.Store_object = True
    c.Hide_output = True
    try:
        # NOTE(review): twint.output.users_list appears to be a module-level
        # list that twint appends to and never resets. Without clearing it,
        # index 0 would keep returning the first user ever looked up.
        twint.output.users_list.clear()
        twint.run.Lookup(c)
        return twint.output.users_list[0].username
    except Exception:
        # Best-effort: any lookup failure (network, empty result, ...) falls
        # back to the local table. Interpreter-exit signals still propagate.
        return talents.get(user_id, f'#{user_id}')