Files
NijiHolo_EN_ID_Bot/src/scraper.py
T

190 lines
5.7 KiB
Python
Raw Normal View History

2023-08-14 22:39:47 -07:00
from os.path import exists
from time import sleep
from datetime import datetime, timedelta
import pytz
from tweety import Twitter
from tweety.types import *
2023-08-15 17:33:29 -07:00
from tweety.exceptions_ import *
from tweety.filters import SearchFilters
2023-08-14 22:39:47 -07:00
2023-08-17 02:28:29 -07:00
from account_pool import AccountPool
2023-08-14 22:39:47 -07:00
from tweety_utils import *
from talenttweet import *
2023-08-15 17:33:29 -07:00
import talent_lists
2023-08-14 22:39:47 -07:00
class Scraper:
    """Fetch tweets through a tweety ``Twitter`` client, rotating accounts.

    Accounts come from ``AccountPool``. When a request fails in a way that
    looks like rate limiting, the scraper switches to another account (or
    sleeps and retries) and tries again.
    """

    # Most recently constructed Scraper; None until one has been created.
    instance = None

    # Seconds to pause before retrying after apparent rate limiting.
    # NOTE: the log messages below say "4 minutes"; keep them in sync.
    _RATE_LIMIT_SLEEP_SECONDS = 240

    def __init__(self):
        """Register this object as ``Scraper.instance`` and log in."""
        Scraper.instance = self
        self.__account = AccountPool()
        self.try_login()

    def try_login(self, account_idx: int = None) -> bool:
        """Log in with an account taken from the pool.

        Args:
            account_idx: Pool index to use. When ``None``, the pool's
                ``next()`` account is used instead.

        Returns:
            ``True`` when an account was obtained and a login was attempted,
            ``False`` when the pool returned ``None``.
        """
        # decide on which account to use
        if account_idx is not None:
            acc = self.__account.use_index(account_idx)
        else:
            acc = self.__account.next()

        if acc is None:
            print('exhausted all accounts!')
            return False

        # attempt to login with the account
        name = acc[0]
        print(f"using {name}")
        self.app = Twitter(name)
        if exists(f"{name}.json"):
            # A "<name>.json" file is present (presumably a saved session):
            # try to reuse it, and fall back to a fresh sign-in on failure.
            try:
                self.app.connect()
            except Exception:
                self.app.sign_in(*acc)
        else:
            self.app.sign_in(*acc)
        return True

    def login_wait(self, private=False):
        """Switch accounts after a failed request, sleeping when required.

        Args:
            private: When true, stay on the account at pool index 0: sleep,
                then log in with it again. When false, move on to the pool's
                next account.

        If that login attempt yields no account, sleep once more and make a
        final ``try_login()`` call.
        """
        if private:
            acc = self.__account.use_index(0)
            # try_login() treats None as a possible pool result, so do here.
            label = acc[0] if acc is not None else None
            print(f"keeping pvt-accessible account ({label}). sleeping for 4 minutes...")
            sleep(self._RATE_LIMIT_SLEEP_SECONDS)
            print()
            logged_in = self.try_login(0)
        else:
            logged_in = self.try_login()
        if not logged_in:
            print("sleeping for 4 minutes...")
            sleep(self._RATE_LIMIT_SLEEP_SECONDS)
            print()
            self.try_login()

    # recover lost info
    def fix_tweet(self, tweet: "Tweet") -> "Tweet":
        """Repair missing retweet / quote / reply data on ``tweet`` in place.

        - Retweet flag set but no retweeted tweet: clear the flag.
        - Retweeted or quoted tweet present without an author: re-fetch it
          by id with ``get_tweet``.
        - Quote flag set but no quoted tweet (deleted): clear the flag.
        - Reply flag set but no ``replied_to``: fetch the parent using
          ``original_tweet['in_reply_to_status_id_str']``.

        Returns:
            The same ``tweet`` object, after mutation.
        """
        if tweet.is_retweet:
            if tweet.retweeted_tweet is None:
                tweet.is_retweet = False
            elif tweet.retweeted_tweet.author is None:
                tweet.retweeted_tweet = self.get_tweet(tweet.retweeted_tweet.id)
        if tweet.is_quoted:
            if tweet.quoted_tweet is None:  # quoted tweet is deleted
                tweet.is_quoted = False
            elif tweet.quoted_tweet.author is None:
                tweet.quoted_tweet = self.get_tweet(tweet.quoted_tweet.id)
        if tweet.is_reply and tweet.replied_to is None:
            tweet.replied_to = self.get_tweet(tweet.original_tweet['in_reply_to_status_id_str'])
        return tweet

    def get_tweet(self, id: int, private_user=False):
        """Fetch one tweet by id, retrying on rate limits.

        Args:
            id: Tweet id; it is converted with ``str()`` before the request.
            private_user: When true, log in with pool account 0 first.

        Returns:
            The fetched tweet after ``fix_tweet``, or ``None`` when the
            client returned ``None`` or the tweet could not be fetched even
            with ``private_user=True``.
        """
        if private_user:
            self.try_login(0)
        while True:
            try:
                t = self.app.tweet_detail(str(id))
                return self.fix_tweet(t) if t is not None else None
            except RateLimitReached:
                print("RateLimitReached occurred")
                self.login_wait(private_user)
            except UnknownError:
                print("UnknownError occurred, probably rate-limited")
                self.login_wait(private_user)
            except Exception as e:
                if not private_user:
                    # One more attempt, this time through pool account 0.
                    print("Unhandled exception occurred, trying again as private...")
                    return self.get_tweet(id, True)
                print(f"Unhandled exception occurred, tweet {id} is probably unavailable")
                print(e)
                return None

    # since SHOULD BE TIMEZONE AWARE (a naive value is treated as UTC)
    # usage example: since=datetime(2023, 8, 1).replace(tzinfo=pytz.utc)
    def get_tweets_from_user(self, username: str, since: datetime = None) -> "list[Tweet]":
        """Collect a user's tweets, newest first, back to ``since``.

        Pages through the ``from:<username>`` "Latest" search until a tweet
        authored by the user and dated at or before ``since`` has been seen,
        or a page comes back empty.

        Args:
            username: Account to search for.
            since: Cut-off datetime. Defaults to 7 days before now (UTC). A
                naive datetime is interpreted as UTC so that it can be
                compared with tweet dates.

        Returns:
            The collected tweets sorted by ascending id. Every tweet of each
            fetched page is kept, so tweets at or before ``since`` from the
            last page are included.
        """
        reached_backdate = False
        tweets: "list[Tweet]" = []
        cur = None
        if since is None:
            since = datetime.now(tz=pytz.utc) - timedelta(days=7)
            print(f'falling back to grabbing tweets since 7 days ago ({since.date()})')
        else:
            if since.tzinfo is None:
                # Comparing naive and aware datetimes raises TypeError.
                since = since.replace(tzinfo=pytz.utc)
            print(f'grabbing tweets since {since.date()}')

        uid = self.app._get_user_id(username)
        print(f"{username} = {uid}")
        is_private = uid in talent_lists.privated_accounts

        def add_tweet(tweet: "Tweet"):
            nonlocal reached_backdate
            # malformed tweet check
            try:
                tweet.author.id
            except Exception:
                print(f"skipping malformed tweet: {tweet}")
                return
            tweet = self.fix_tweet(tweet)

            tweets.append(tweet)

            if not reached_backdate and int(tweet.author.id) == uid and tweet.date <= since:
                print("reached backdate")
                reached_backdate = True

        if is_private:
            self.try_login(0)
        while not reached_backdate:
            try:
                search = self.app.search(f'from:{username}', filter_=SearchFilters.Latest(), cursor=cur)
                cur_page = search.tweets
                print(f'obtained {len(cur_page)} tweets')
                if not cur_page:
                    break
                for e in cur_page:
                    if isinstance(e, Tweet):
                        add_tweet(e)
                    elif isinstance(e, TweetThread):
                        # FIXME: rework when replied_to is fixed (currently populates user_mentions)
                        # latest tweet in thread = og author's reply
                        for t in e:
                            add_tweet(t)
                # Advance only after the whole page was processed, so a
                # failed request is retried with the same cursor.
                cur = search.cursor
            except (UnknownError, RateLimitReached):
                print("UnknownError occurred, probably rate-limited")
                self.login_wait(is_private)
        # Coerce ids to int, as the author-id comparison above does.
        tweets.sort(key=lambda t: int(t.id))
        return tweets

    def get_cross_ttweets_from_user(self, username: str, since_date: str = None) -> "list[TalentTweet]":
        """Return the user's tweets for which ``is_cross_company()`` is true.

        Args:
            username: Account to scrape.
            since_date: Cut-off as dash-separated integers, e.g.
                ``"2023-07-30"``; interpreted as UTC. ``None`` lets
                ``get_tweets_from_user`` apply its own default.

        Returns:
            ``TalentTweet`` objects built from the scraped tweets, filtered
            by ``is_cross_company()``.
        """
        if since_date is not None:
            parts = [int(x) for x in since_date.split('-')]
            since = datetime(*parts).replace(tzinfo=pytz.utc)
        else:
            since = None
        tweets = self.get_tweets_from_user(username, since)
        ret: "list[TalentTweet]" = [
            tt
            for tt in (TalentTweet.create_from_tweety(t) for t in tweets)
            if tt.is_cross_company()
        ]
        print(f'Found {len(ret)}/{len(tweets)} TalentTweets')
        return ret
2023-08-17 02:28:29 -07:00
if __name__ == '__main__':
    # Manual smoke test: scrape one account and print its cross-company tweets.
    talent_lists.init()
    s = Scraper()
    # get_cross_ttweets_from_user takes `since_date` as a "YYYY-MM-DD" string;
    # the previous `since=<datetime>` keyword does not exist on that method
    # and raised TypeError.
    ttweets = s.get_cross_ttweets_from_user("pomurainpuff", since_date="2023-07-30")
    print("\n".join(repr(x) for x in ttweets))