create the scraper class, reorganizing

This commit is contained in:
muskit
2023-08-14 22:39:47 -07:00
parent 8c7ba26300
commit 45ac1af682
8 changed files with 149 additions and 153 deletions
+1
View File
@@ -2,5 +2,6 @@ python-dotenv
nest-asyncio
pytz
tweety-ns
tweepy
tweet-capture
opencv-python
-72
View File
@@ -1,72 +0,0 @@
from time import sleep
from datetime import datetime, timedelta
from dotenv import dotenv_values
import pytz
from tweety import Twitter
from tweety.types import *
# Read key/value settings via python-dotenv (presumably from the project's
# .env file -- confirm where it is expected to live).
creds = dotenv_values()
# Create the tweety client; "session" is the session name handed to Twitter().
app = Twitter("session")
# Sign in at import time using the "username" / "password" entries read above.
app.sign_in(creds["username"], creds["password"])
def url(t: Tweet):
    """Return the twitter.com status URL for tweet ``t``."""
    handle = t.author.username
    status_id = t.id
    return f'https://twitter.com/{handle}/status/{status_id}'
def print_tweets(tweets: list):
    """Print a count header, then one line per tweet.

    ``Tweet`` entries are printed as ``date : url : RT? flag``.
    ``TweetThread`` entries are printed recursively between two marker
    lines. Entries of any other type are ignored.
    """
    print(f'{len(tweets)} tweets:')
    for entry in tweets:
        if isinstance(entry, Tweet):
            print(f'{entry.date} : {url(entry)} : RT? {entry.is_retweet}')
            continue
        if isinstance(entry, TweetThread):
            print('-----------TTd----------')
            print_tweets(entry.tweets)
            print('-----------end----------')
def get_tweets_from_user(uid: int | str, since: datetime = None) -> list:
    """Collect a user's timeline tweets (replies included) back to ``since``.

    Pages through ``app.get_tweets`` until a tweet dated at or before
    ``since`` has been seen or an empty page is returned. The page on which
    the backdate is reached is still processed in full, so a few tweets
    older than ``since`` may be included.

    Args:
        uid: Numeric user id, or a username that is resolved to an id.
        since: Oldest date of interest. Defaults to seven days ago (UTC).
            A naive datetime is interpreted as UTC.

    Returns:
        The collected tweets (retweets, or tweets authored by ``uid``),
        sorted by ascending tweet id.
    """
    if since is None:
        since = datetime.now(tz=pytz.utc) - timedelta(days=7)
        print(f'Grabbing tweets since 7 days ago ({since.date()})')
    elif since.tzinfo is None:
        # Comparing a naive datetime with an aware one raises TypeError, and
        # that is not caught below. Tweet dates appear to be timezone-aware
        # (TODO confirm), so treat a naive `since` as UTC.
        since = since.replace(tzinfo=pytz.utc)

    if isinstance(uid, str):
        name = uid
        uid = app._get_user_id(uid)
        print(f"{name} = {uid}")

    reached_backdate = False
    tweets: list[Tweet] = []

    def add_tweet(tweet: Tweet):
        """Keep the tweet if relevant and note when the backdate is crossed."""
        nonlocal reached_backdate
        try:
            if tweet.is_retweet or tweet.author.id == uid:
                tweets.append(tweet)
            if not reached_backdate and tweet.date <= since:
                print("reached backdate")
                reached_backdate = True
        except AttributeError:
            # Some timeline entries lack author/date fields; skip those.
            print(f"skipping malformed tweet: {tweet}")

    uts = app.get_tweets(uid, replies=True)
    while not reached_backdate:
        cur_page = uts.tweets
        print(f'obtained {len(cur_page)} tweets')
        if not cur_page:
            break
        for e in cur_page:
            if isinstance(e, Tweet):
                add_tweet(e)
            elif isinstance(e, TweetThread):
                for t in e.tweets:
                    add_tweet(t)
        uts.get_next_page()

    tweets.sort(key=lambda t: t.id)
    return tweets
# `since` is compared against tweet dates, so pass a timezone-aware datetime
# rather than a naive one.
tweets = get_tweets_from_user("ninakosaka", since=datetime(2023, 7, 1, tzinfo=pytz.utc))
print_tweets(tweets)
+99
View File
@@ -0,0 +1,99 @@
from os.path import exists
from time import sleep
from datetime import datetime, timedelta
from dotenv import dotenv_values
import pytz
from tweety import Twitter
from tweety.types import *
from tweety_utils import *
from talenttweet import *
from talent_lists import is_niji, is_holo
class Scraper:
    """Wrapper around a tweety ``Twitter`` client for pulling user timelines."""

    def __init__(self):
        """Connect with the existing session file, or sign in from .env values."""
        creds = dotenv_values()
        self.app = Twitter("session")
        if exists("session.json"):
            # A session file is already on disk: connect without credentials.
            self.app.connect()
        else:
            self.app.sign_in(creds["scraper_username"], creds["scraper_password"])

    @staticmethod
    def _is_cross(author_id: int, other_id: int) -> bool:
        """Return True when one id is Nijisanji and the other is Hololive."""
        return (is_niji(author_id) and is_holo(other_id)) or (
            is_holo(author_id) and is_niji(other_id)
        )

    # `since` should be timezone aware; a naive value is interpreted as UTC.
    # usage example: since=datetime(2023, 8, 1).replace(tzinfo=pytz.utc)
    def get_tweets_from_user(self, uid: int | str, since: datetime = None) -> list:
        """Collect a user's timeline tweets (replies included) back to ``since``.

        Pages through ``self.app.get_tweets`` until a tweet dated at or
        before ``since`` has been seen or an empty page is returned. The page
        on which the backdate is reached is still processed in full.

        Args:
            uid: Numeric user id, or a username that is resolved to an id.
            since: Oldest date of interest. Defaults to seven days ago (UTC).

        Returns:
            The collected tweets sorted by ascending tweet id. For a thread,
            only its last tweet is kept, with ``replied_to`` pointed at the
            tweet before it.
        """
        if since is None:
            since = datetime.now(tz=pytz.utc) - timedelta(days=7)
            print(f'Grabbing tweets since 7 days ago ({since.date()})')
        elif since.tzinfo is None:
            # A naive/aware comparison raises TypeError, which is not caught
            # below; treat a naive `since` as UTC instead of crashing.
            since = since.replace(tzinfo=pytz.utc)

        if isinstance(uid, str):
            name = uid
            uid = self.app._get_user_id(uid)
            print(f"{name} = {uid}")

        reached_backdate = False
        tweets: list[Tweet] = []

        def add_tweet(tweet: Tweet):
            """Store the tweet and note when the backdate is crossed."""
            nonlocal reached_backdate
            try:
                # Touch `author` first so a tweet lacking one is rejected
                # before it is stored.
                tweet.author
                tweets.append(tweet)
                if not reached_backdate and tweet.date <= since:
                    print("reached backdate")
                    reached_backdate = True
            except AttributeError:
                print(f"skipping malformed tweet: {tweet}")

        uts = self.app.get_tweets(uid, replies=True)
        while not reached_backdate:
            cur_page = uts.tweets
            print(f'obtained {len(cur_page)} tweets')
            if not cur_page:
                break
            for e in cur_page:
                if isinstance(e, Tweet):
                    add_tweet(e)
                elif isinstance(e, TweetThread):
                    # FIXME: rework when replied_to is fixed (currently only user_mentions works)
                    t = e[-1]  # latest tweet in thread = og author's reply
                    t.replied_to = e[-2]
                    add_tweet(t)
                    print(f"adding thread latest: {t.id}")
            uts = self.app.get_tweets(uid, replies=True, cursor=uts.cursor)

        tweets.sort(key=lambda t: t.id)
        return tweets

    def get_cross_ttweets_from_user(self, uid: int | str, since: datetime = None):
        """Return the user's cross-company tweets as ``TalentTweet`` objects.

        A tweet counts as cross-company when its author and either the
        replied-to author or any mentioned user belong to different
        companies (Nijisanji vs. Hololive).

        TODO: cross-RT, RT-mentions and cross-QRT detection are not
        implemented yet; only replies and mentions are inspected.

        Args:
            uid: Numeric user id or username, as for ``get_tweets_from_user``.
            since: Oldest date of interest, as for ``get_tweets_from_user``.

        Returns:
            A list of ``TalentTweet`` built from the matching tweets.
        """
        tweets = self.get_tweets_from_user(uid, since)
        ret: list[TalentTweet] = []
        for t in tweets:
            author_id = int(t.author.id)

            # cross-reply?
            reply_to_id = None
            if t.replied_to is not None:
                reply_to_id = int(t.replied_to.author.id)

            # cross-mention? in-thread?
            mention_ids = {int(u.id) for u in t.user_mentions}

            other_ids = set(mention_ids)
            if reply_to_id is not None:
                other_ids.add(reply_to_id)

            if any(self._is_cross(author_id, other) for other in other_ids):
                ret.append(
                    TalentTweet(
                        tweet_id=int(t.id),
                        author_id=author_id,
                        date_time=t.date,
                        mrq=(mention_ids, reply_to_id, None),
                    )
                )
        return ret
if __name__ == '__main__':
    # Manual smoke run: fetch one account's recent tweets and print them.
    scraper = Scraper()
    print_tweets(scraper.get_tweets_from_user("pomurainpuff"))
+3
View File
@@ -47,6 +47,9 @@ def is_niji(id: int) -> bool:
def is_holo(id: int) -> bool:
    """Return True if ``id`` appears in ``holo_en`` or ``holo_id``."""
    for roster in (holo_en, holo_id):
        if id in roster:
            return True
    return False
def is_cross_company(id1: int, id2: int):
    """Return True when one id is Nijisanji and the other is Hololive.

    The check is symmetric in its arguments. An id that belongs to neither
    company never produces a match. The previous
    ``is_niji(id1) == is_holo(id2)`` form was also true when both sides
    were False.
    """
    return (is_niji(id1) and is_holo(id2)) or (is_holo(id1) and is_niji(id2))
# For filtered stream
# DEPRECATED: thx elon
def get_twitter_rules():
+25 -36
View File
@@ -1,14 +1,33 @@
import datetime
from datetime import datetime
from zoneinfo import ZoneInfo
import platform
import pytz
import twapi
import talent_lists
import util
class TalentTweet:
# Serialized one-liner format:
# {tweet} {author} {time in seconds since epoch} m {mention set} r {reply to author} q {quote tweet author} rt {retweeted tweet's id}
def serialize(self):
s = f'{self.tweet_id} {self.author_id} {self.date_time.timestamp()} '
if None not in [self.rt_target, self.rt_author_id]:
s += f'rt {self.rt_target} {self.rt_author_id}'
return s[:-1] # stop here since retweets can't have other info
if len(self.mentions) > 0:
s += 'm '
for id in self.mentions:
s += f'{id} '
if self.reply_to:
s += f'r {self.reply_to} '
if self.quote_retweeted:
s += f'q {self.quote_retweeted} '
return s[:-1]
@staticmethod
def deserialize(serialized_str: str):
tokens = serialized_str.split()
@@ -16,7 +35,7 @@ class TalentTweet:
raise ValueError('not enough tokens to reconstruct a TalentTweet')
tweet_id, author_id = int(tokens[0]), int(tokens[1])
date_time = datetime.datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc)
date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc)
mentions = set()
reply_to = None
@@ -43,27 +62,7 @@ class TalentTweet:
date_time=date_time, mrq=(mentions, reply_to, quote_retweeted)
)
# Serialized one-liner format:
# {tweet} {author} {time in seconds since epoch} m {mention_set} r {reply_to_author} q {quote_retweet_author}
def serialize(self):
s = f'{self.tweet_id} {self.author_id} {self.date_time.timestamp()} '
if None not in [self.rt_target, self.rt_author_id]:
s += f'rt {self.rt_target} {self.rt_author_id}'
return s[:-1] # stop here since retweets can't have other info
if len(self.mentions) > 0:
s += 'm '
for id in self.mentions:
s += f'{id} '
if self.reply_to:
s += f'r {self.reply_to} '
if self.quote_retweeted:
s += f'q {self.quote_retweeted} '
return s[:-1]
def __init__(self, tweet_id: int, author_id: int, date_time: datetime.datetime, mrq: tuple, rt_target: int=None, rt_author_id: int=None):
def __init__(self, tweet_id: int, author_id: int, date_time: datetime, mrq: tuple, rt_target: int=None, rt_author_id: int=None):
self.tweet_id, self.author_id = tweet_id, author_id
self.date_time = date_time
self.mentions = tuple(int(x) for x in mrq[0])
@@ -97,18 +96,8 @@ class TalentTweet:
def is_cross_company(self):
    """Return True if any other party belongs to the opposite company.

    The author and each id in ``self.all_parties`` are classified by
    membership in the ``talent_lists`` rosters: ``holo_en`` / ``holo_id``
    for Hololive and ``niji_en`` / ``niji_exid`` for Nijisanji. A party
    found in none of the rosters never counts as a match.
    """
    holo_rosters = (talent_lists.holo_en, talent_lists.holo_id)
    niji_rosters = (talent_lists.niji_en, talent_lists.niji_exid)

    def listed_in(account_id, rosters):
        """Return True if account_id appears in any of the given rosters."""
        return any(account_id in roster for roster in rosters)

    author_is_holo = listed_in(self.author_id, holo_rosters)
    author_is_niji = listed_in(self.author_id, niji_rosters)

    for other_id in self.all_parties:
        if author_is_holo and listed_in(other_id, niji_rosters):
            return True
        if author_is_niji and listed_in(other_id, holo_rosters):
            return True
    return False
def get_all_parties_usernames(self):
+1 -44
View File
@@ -124,35 +124,7 @@ class TwAPI:
# else:
# print('Saul Gone')
def get_all_tweet_ids_from_user(self, user_id):
    """Fetch every tweet of ``user_id`` by following pagination tokens.

    Despite the name, the returned list holds the tweet objects from each
    response's ``data``, not bare ids.

    Args:
        user_id: The user whose timeline is requested from
            ``self.client.get_users_tweets``.

    Returns:
        A list of all tweet objects retrieved, in response order.
    """
    next_page_token = None
    tokens_retrieved = 0
    tweets_retrieved = 0
    tweets = []

    while True:
        print(f'Retrieved {tokens_retrieved} tokens so far...')
        resp = self.client.get_users_tweets(
            user_id, max_results=100, pagination_token=next_page_token,
            media_fields=TwAPI.TWEET_MEDIA_FIELDS,
            tweet_fields=TwAPI.TWEET_FIELDS,
            expansions=TwAPI.TWEET_EXPANSIONS
        )

        # A page without results carries data=None rather than an empty
        # list (per tweepy's Response -- confirm for the pinned version).
        tweets.extend(resp.data or [])

        # update counters and pagination token
        tweets_retrieved += resp.meta['result_count']
        next_page_token = resp.meta.get('next_token')
        if next_page_token is None:
            print("next_token wasn't provided; we've reached the end!")
            break  # reached end of user's tweets
        tokens_retrieved += 1

    print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.')
    return tweets
# DEPRECATED: thx elon
async def get_tweet_response(self, id, attempt = 0):
try:
twt = TwAPI.instance.client.get_tweet(
@@ -275,18 +247,3 @@ class TwAPI:
else:
raise e
return True
def post_ttweet_by_id(self, tweet_id, is_catchup=False, dry_run=False):
    """Build a TalentTweet from ``tweet_id`` and post it if cross-company.

    The tweet's mentions, reply target and quote target are always printed.
    When the tweet is cross-company it is posted through
    ``self.post_ttweet`` and then registered as finished with the
    ``TalentTweetQueue`` instance. Otherwise only a notice is printed.
    """
    ttweet = asyncio.run(tt.TalentTweet.create_from_id(tweet_id))
    print(f'm({ttweet.mentions}), r({ttweet.reply_to}), q({ttweet.quote_retweeted})')

    if not ttweet.is_cross_company():
        print(f'Tweet {tweet_id} is not cross-company.')
        return

    print(f'Tweet {ttweet.tweet_id} is cross-company! Creating post...')
    posting = self.post_ttweet(ttweet, is_catchup=is_catchup, dry_run=dry_run)
    asyncio.run(posting)
    ttq.TalentTweetQueue.instance.add_finished_tweet(ttweet.tweet_id)
+19
View File
@@ -0,0 +1,19 @@
from tweety.types import *
def url(t: Tweet):
    """Return the twitter.com status URL for tweet ``t``."""
    author_handle = t.author.username
    tweet_id = t.id
    return f'https://twitter.com/{author_handle}/status/{tweet_id}'
def print_tweets(tweets: list[Tweet | TweetThread]):
    """Print a count header, then a one-line summary per tweet.

    Each ``Tweet`` line shows date, URL and retweet flag, then the
    replied-to username when ``replied_to`` is set, then the mentioned
    usernames as ``m=a,b``. ``TweetThread`` entries are printed recursively
    between two marker lines. Entries of any other type are ignored.
    """
    print(f'{len(tweets)} tweets:')
    for entry in tweets:
        if isinstance(entry, Tweet):
            print(f'{entry.date} : {url(entry)} : RT? {entry.is_retweet} ', end=' ')
            parent = entry.replied_to
            if parent is not None:
                print(f'reply to {parent.author.username}', end=' ')
            mentioned = [user.username for user in entry.user_mentions]
            print("m=" + ",".join(mentioned))
            continue
        if isinstance(entry, TweetThread):
            print('-----------TTd----------')
            print_tweets(entry.tweets)
            print('-----------end----------')
+1 -1
View File
@@ -8,7 +8,7 @@ from datetime import datetime
import tweepy
import pytz
import twint
import twapi
#import twapi
from tweetcapture import TweetCapture
from recrop import fix_aspect_ratio