2023-08-14 22:39:47 -07:00
|
|
|
from os.path import exists
|
|
|
|
|
from time import sleep
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
|
|
|
|
from dotenv import dotenv_values
|
|
|
|
|
import pytz
|
|
|
|
|
|
|
|
|
|
from tweety import Twitter
|
|
|
|
|
from tweety.types import *
|
2023-08-15 17:33:29 -07:00
|
|
|
from tweety.exceptions_ import *
|
|
|
|
|
from tweety.filters import SearchFilters
|
2023-08-14 22:39:47 -07:00
|
|
|
|
|
|
|
|
from tweety_utils import *
|
|
|
|
|
from talenttweet import *
|
2023-08-15 17:33:29 -07:00
|
|
|
import talent_lists
|
2023-08-14 22:39:47 -07:00
|
|
|
|
|
|
|
|
class Scraper:
    """Signed-in ``tweety`` client with helpers for collecting a user's tweets."""

    def __init__(self):
        """Create the ``Twitter("session")`` client and sign in.

        Credentials come from ``dotenv_values()``. The ``scraper_username``
        and ``scraper_password`` entries are indexed directly, so both must
        be present.
        """
        creds = dotenv_values()
        self.app = Twitter("session")
        # Session reuse is currently disabled; every construction signs in.
        # if exists("session.json"):
        #     self.app.connect()
        # else:
        self.app.sign_in(creds["scraper_username"], creds["scraper_password"])

    # since MUST BE TIMEZONE AWARE
    # usage example: since=datetime(2023, 8, 1).replace(tzinfo=pytz.utc)
    def get_tweets_from_user(self, username: str, since: datetime = None) -> list[Tweet]:
        """Collect tweets found by the search ``from:<username>`` back to ``since``.

        Result pages are fetched with the ``Latest`` search filter until a
        page contains a tweet authored by the user whose date is at or before
        ``since``, or until an empty page is returned. The page that crosses
        the backdate is kept in full, so the result may include tweets older
        than ``since``.

        Args:
            username: Handle to search for.
            since: Timezone-aware lower bound. Defaults to seven days before
                the current UTC time when omitted.

        Returns:
            The collected tweets sorted in ascending order of ``id``.
        """
        if since is None:
            # Build an aware "now" directly rather than patching tzinfo onto
            # the deprecated naive ``utcnow()`` value.
            since = datetime.now(tz=pytz.utc) - timedelta(days=7)
            print(f'falling back to grabbing tweets since 7 days ago ({since.date()})')
        else:
            print(f'grabbing tweets since {since.date()}')

        uid = self.app._get_user_id(username)
        print(f"{username} = {uid}")

        reached_backdate = False
        tweets: list[Tweet] = []
        cur = None

        def add_tweet(tweet: Tweet):
            """Append ``tweet`` and flag when the backdate has been reached."""
            nonlocal reached_backdate

            # malformed tweet check
            try:
                tweet.author
            except AttributeError:
                print(f"skipping malformed tweet: {tweet}")
                return

            # fix reply if it exists
            # if tweet.is_reply and tweet.replied_to is None:
            #     tweet.replied_to = self.app.tweet_detail(tweet._original_tweet['in_reply_to_status_id_str'])
            tweets.append(tweet)

            # Only the target user's own tweets count towards the backdate;
            # other authors appearing in threads may be arbitrarily old.
            if not reached_backdate and int(tweet.author.id) == uid and tweet.date <= since:
                print("reached backdate")
                reached_backdate = True

        while not reached_backdate:
            try:
                # uts = self.app.get_tweets(uid, replies=True, cursor=cur)
                search = self.app.search(f'from:{username}', filter_=SearchFilters.Latest(), cursor=cur)
                cur_page = search.tweets
                page_size = len(cur_page)
                print(f'obtained {page_size} tweets')

                if page_size == 0:
                    break

                for e in cur_page:
                    if isinstance(e, Tweet):
                        add_tweet(e)
                    elif isinstance(e, TweetThread):
                        # FIXME: rework when replied_to is fixed (currently populates user_mentions)
                        # latest tweet in thread = og author's reply
                        # Each thread member is added exactly once; adding
                        # e[0] separately as well would duplicate it.
                        for t in e:
                            add_tweet(t)

                cur = search.cursor
            except UnknownError:
                # NOTE(review): retries indefinitely with the same cursor;
                # confirm that an unbounded retry is intended.
                print("UnknownError occurred, probably rate-limited")
                print("sleeping for 1 minute...")
                sleep(60)

        tweets.sort(key=lambda t: t.id)
        return tweets

    def get_cross_ttweets_from_user(self, username: str, since: datetime = None) -> list[TalentTweet]:
        """Return the user's tweets for which ``is_cross_company()`` is true.

        Fetches tweets via ``get_tweets_from_user``, prints them with
        ``print_tweets``, converts each one with
        ``TalentTweet.create_from_tweety`` and keeps the cross-company ones.

        Args:
            username: Handle to scrape.
            since: Timezone-aware lower bound, passed through unchanged.

        Returns:
            The matching ``TalentTweet`` objects in the fetched order.
        """
        tweets = self.get_tweets_from_user(username, since)
        print_tweets(tweets)

        converted = (TalentTweet.create_from_tweety(t) for t in tweets)
        return [tt for tt in converted if tt.is_cross_company()]
|
|
|
|
|
|
|
|
|
|
# Driver: initialise the talent lists, sign in, scrape one account since
# 2023-07-30 (UTC) and print each cross-company tweet on its own line.
talent_lists.init()

s = Scraper()
ttweets = s.get_cross_ttweets_from_user(
    "pomurainpuff",
    since=datetime(2023, 7, 30, tzinfo=pytz.utc),
)

print("\n".join(x.__repr__() for x in ttweets))
|