(de)serialization and queuing works now

This commit is contained in:
muskit
2023-08-17 02:28:29 -07:00
parent dd4578708a
commit 3c93ca18c2
10 changed files with 163 additions and 188 deletions
+31
View File
@@ -0,0 +1,31 @@
from dotenv import dotenv_values
## Track multiple accounts in a pool, cycling to the next one when requested.
class AccountPool:
def __init__(self):
self.__accounts: list[tuple[str, str]] = list()
self.__idx = -1
creds = dotenv_values()
i = 0
while True:
if f'scraper_username{i}' in creds \
and f'scraper_password{i}' in creds:
self.__accounts.append((
creds[f'scraper_username{i}'],
creds[f'scraper_password{i}']
))
i += 1
else:
break
def current(self):
if 0 <= self.__idx < len(self.__accounts):
return self.__accounts[self.__idx]
return None
def next(self) -> tuple[str, str] | None:
self.__idx += 1
if self.__idx >= len(self.__accounts):
self.__idx = -1
return None
return self.current()
-42
View File
@@ -1,42 +0,0 @@
## Twitter developer credentials management.
from os.path import join, isfile
from dotenv import dotenv_values
import util
# returns dictionary of the Credentials section.
# [NOT TO BE USED OUTSIDE OF THIS FILE.]
def __get_env():
f = join(util.get_project_dir(), '.env')
if isfile(f):
return dotenv_values(f)
return None
# returns the consumer api_key stored in secrets.ini
def api_key():
c = __get_env()
return c.get(option='api_key', fallback='xxx') if c is not None else 'xxx'
# returns the consumer api_secret stored in secrets.ini
def api_secret():
c = __get_env()
return c.get(option='api_secret', fallback='yyy') if c is not None else 'yyy'
# returns the bearer_token stored in secrets.ini
def bearer_token():
c = __get_env()
return c.get(option='bearer_token', fallback='zzz') if c is not None else 'zzz'
# returns the access_token stroed in secrets.ini
def access_token():
c = __get_env()
return c.get(option='oauth1_access_token', fallback='zzz') if c is not None else 'aaa'
# returns the access_secret stroed in secrets.ini
def access_secret():
c = __get_env()
return c.get(option='oauth1_access_secret', fallback='zzz') if c is not None else 'bbb'
def get_all_secrets():
return f'api_key:{api_key()}\napi_secret:{api_secret()}\nbearer_token:{bearer_token()}\naccess_token:{access_token()}\naccess_secret:{access_secret()}'
+20 -40
View File
@@ -8,9 +8,9 @@ import traceback
import datetime
import asyncio
import shutil
from datetime import datetime
import twint
from scraper import Scraper
from util import *
from talent_lists import *
from twapi import TwAPI
@@ -21,41 +21,12 @@ PROGRAM_ARGS = None
safe_to_post_tweets = False
errored = False
## Returns the ID of all tweets (up to limit) from a user ID.
def get_user_tweets(id, since_date=None, limit=None):
global safe_to_post_tweets
qrt_count = 0
tweets = list()
c = twint.Config()
c.User_id = id
c.Limit = limit
c.Store_object = True
c.Store_object_tweets_list = tweets
c.Hide_output = True
c.Since = '' if since_date == None else f'{since_date} 00:00:00'
user_str = f'@{util.get_username_local(id)}'
print(f'Scraping tweets from {user_str} since {"forever ago" if c.Since == "" else c.Since}...')
try:
twint.run.Search(c)
except:
print(f'Had trouble getting tweets from {user_str}')
safe_to_post_tweets = False
traceback.print_exc()
for twt in tweets:
if type(twt.quote_url) is str and twt.quote_url != '':
qrt_count += 1
print(f'Scraped {len(tweets)} tweets, {qrt_count} of which are quote tweets.')
return tweets
# Returns a list of sorted and filtered TalentTweets (should
# be equivalent to queue.txt)
async def get_cross_talent_tweets():
global safe_to_post_tweets
scraper = Scraper()
queue = ttq.TalentTweetQueue.instance
# Begin getting tweets from online
@@ -64,19 +35,28 @@ async def get_cross_talent_tweets():
for i, (talent_id, talent_username) in enumerate(talent_lists.talents.items()):
print(f'[{i+1}/{len(talent_lists.talents)}] {talent_username}-----------------------------------')
try:
tweets = get_user_tweets(talent_id, since_date=queue.finished_user_dates.get(talent_id, None))
for tweet in tweets:
if tweet.id not in queue.ttweets_dict and tweet.id not in queue.finished_ttweets:
ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet)
if ttweet.is_cross_company():
queue.add_ttweet(ttweet)
# tweets = get_user_tweets(talent_id, since_date=queue.finished_user_dates.get(talent_id, None))
since_date = queue.finished_user_dates.get(talent_id, None)
ttweets = scraper.get_cross_ttweets_from_user(talent_username, since_date=since_date)
for ttweet in ttweets:
if ttweet.tweet_id not in queue.ttweets_dict \
and ttweet.tweet_id not in queue.finished_ttweets \
and ttweet.is_cross_company():
queue.add_ttweet(ttweet)
except KeyboardInterrupt as e:
raise e
except:
print('Error occurred processing tweet data.')
safe_to_post_tweets = False
print(traceback.format_exc())
queue.finished_user_dates[talent_id] = '2000-01-01'
traceback.print_exc()
if talent_id not in queue.finished_user_dates:
queue.finished_user_dates[talent_id] = '2023-04-26' # date is when bot token first got revoked
else:
queue.finished_user_dates[talent_id] = util.get_current_date()
queue.save_file()
except KeyboardInterrupt:
print('Interrupting tweet pulling... NOTE: remaining dates in queue file will not be updated!')
queue.save_file()
except:
print('Unhandled error occurred while pulling tweets.')
traceback.print_exc()
+5 -50
View File
@@ -1,66 +1,21 @@
## The bot's listen mode
# Continuously listen for cross-company interactions.
from time import sleep
import asyncio
import traceback
import tweepy
from talenttweet import TalentTweet
from twapi import TwAPI
import ttweetqueue as ttq
import api_secrets
import talent_lists as tl
import util
import catchup
errors_encountered = 0
def on_response(resp):
ttweet = TalentTweet.create_from_v2api_response(resp)
if ttweet is None:
print('Couldn\'t create ttweet from the response:')
print(resp)
return
tweet_username = util.get_username(ttweet.author_id)
if ttweet.is_cross_company():
print(f'Tweet {ttweet.tweet_id} is cross-company! Creating post...')
is_successful = asyncio.run(TwAPI.instance.post_ttweet(ttweet))
if is_successful:
ttq.TalentTweetQueue.instance.add_finished_tweet(ttweet.tweet_id)
else:
print(f'[WARNING] Failed to post ttweet for {tweet_username}/{ttweet.tweet_id}!')
else:
print(f'Tweet {tweet_username}/{ttweet.tweet_id} is not cross-company.')
def run():
global errors_encountered
while True:
try:
sc = tweepy.StreamingClient(api_secrets.bearer_token())
# clear rules
print('Clearing streaming rules...')
rules_resp = sc.get_rules()
if rules_resp.data:
print('Deleted a rule!')
sc.delete_rules(rules_resp.data)
# create new rules
print('Creating new streaming rules...')
for rule in tl.get_twitter_rules():
sc.add_rules(tweepy.StreamRule(rule))
print('--------------------------------------------')
print(sc.get_rules().data)
print('--------------------------------------------')
sc.on_response=on_response
print('Starting listening stream...')
sc.filter(
expansions=TwAPI.TWEET_EXPANSIONS,
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
tweet_fields=TwAPI.TWEET_FIELDS
)
asyncio.run(catchup.run())
print('Sleeping for 30 minutes...')
sleep(1800) # run every half-hour
except KeyboardInterrupt:
print('Interrupt signal received. Exiting listen mode.')
print(f'{errors_encountered} errors encountered throughout session.')
+6 -4
View File
@@ -8,7 +8,6 @@ import nest_asyncio
import talent_lists
import ttweetqueue as ttq
import api_secrets
import catchup
import listen
from twapi import TwAPI
@@ -52,6 +51,8 @@ async def async_main():
await self_destruct()
elif mode == 'cmd':
command_line()
elif mode in ['l', 'listen']:
listen.run()
else:
print('\nunknown mode. run with no arguments or -h for help and modes')
@@ -59,9 +60,9 @@ def main():
global PROGRAM_ARGS
parser = init_argparse()
# if len(sys.argv) < 2:
# parser.print_help()
# return
if len(sys.argv) < 2:
parser.print_help()
return
PROGRAM_ARGS = parser.parse_args()
@@ -77,6 +78,7 @@ def main():
ttq.TalentTweetQueue()
## Asynchronous execution
print('beginning async main')
nest_asyncio.apply()
asyncio.run(async_main())
+60 -19
View File
@@ -2,7 +2,6 @@ from os.path import exists
from time import sleep
from datetime import datetime, timedelta
from dotenv import dotenv_values
import pytz
from tweety import Twitter
@@ -10,18 +9,33 @@ from tweety.types import *
from tweety.exceptions_ import *
from tweety.filters import SearchFilters
from account_pool import AccountPool
from tweety_utils import *
from talenttweet import *
import talent_lists
class Scraper:
def __init__(self):
creds = dotenv_values()
self.app = Twitter("session")
# if exists("session.json"):
# self.app.connect()
# else:
self.app.sign_in(creds["scraper_username"], creds["scraper_password"])
Scraper.instance = self
self.__account = AccountPool()
self.try_login()
def try_login(self) -> bool:
acc = self.__account.next()
if acc is not None:
name = acc[0]
print(f"using {name}")
self.app = Twitter(name)
if exists(f"{name}.json"):
try:
self.app.connect()
except:
self.app.sign_in(*acc)
else:
self.app.sign_in(*acc)
return True
print('exhausted all accounts!')
return False
# since MUST BE TIMEZONE AWARE
# usage example: since=datetime(2023, 8, 1).replace(tzinfo=pytz.utc)
@@ -43,11 +57,29 @@ class Scraper:
# malformed tweet check
nonlocal reached_backdate
try:
tweet.author
except AttributeError:
print("skipping malformed tweet: {tweet}")
tweet.author.id
except:
print(f"skipping malformed tweet: {tweet}")
return
# recover lost info
if tweet.is_retweet:
if tweet.retweeted_tweet is None:
print(f'{tweet.author.username}/{tweet.id} is missing the RT! Recovering...')
tweet.retweeted_tweet = self.app.tweet_detail(str(tweet.id)).retweeted_tweet
if tweet.retweeted_tweet.author is None:
print(f'WARNING: {tweet.author.username}/{tweet.id} is missing the RT author! Recovering details...')
tweet.retweeted_tweet = self.app.tweet_detail(tweet.retweeted_tweet.id)
if tweet.is_quoted:
if tweet.quoted_tweet is None: # quoted tweet is deleted
# print(f'{tweet.author.username}/{tweet.id} is missing the QRT! Recovering...')
# tweet.quoted_tweet = self.app.tweet_detail(str(tweet.id)).quoted_tweet
tweet.is_quoted = False
elif tweet.quoted_tweet.author is None:
print(f'WARNING: {tweet.author.username}/{tweet.id} is missing the QRT author! Recovering details...')
tweet.quoted_tweet= self.app.tweet_detail(tweet.quoted_tweet.id)
# fix reply if it exists
# if tweet.is_reply and tweet.replied_to is None:
# tweet.replied_to = self.app.tweet_detail(tweet._original_tweet['in_reply_to_status_id_str'])
@@ -72,22 +104,30 @@ class Scraper:
elif isinstance(e, TweetThread):
# FIXME: rework when replied_to is fixed (currently populates user_mentions)
# latest tweet in thread = og author's reply
add_tweet(e[0])
for t in e:
add_tweet(t)
cur = search.cursor
except UnknownError:
print("UnknownError occurred, probably rate-limited")
print("sleeping for 1 minute...")
sleep(60)
# traceback.print_exc()
if not self.try_login():
print("sleeping for 1 minute...")
sleep(60)
print()
self.try_login()
tweets.sort(key=lambda t: t.id)
return tweets
def get_cross_ttweets_from_user(self, username: str, since: datetime = None) -> list[TalentTweet]:
def get_cross_ttweets_from_user(self, username: str, since_date: str = None) -> list[TalentTweet]:
if since_date is not None:
d = since_date.split('-')
since = datetime(*[int(x) for x in d]).replace(tzinfo=pytz.utc)
else:
since = None
tweets = self.get_tweets_from_user(username, since)
print_tweets(tweets)
# print_tweets(tweets)
ret: list[TalentTweet] = []
for t in tweets:
tt = TalentTweet.create_from_tweety(t)
@@ -95,7 +135,8 @@ class Scraper:
ret.append(tt)
return ret
talent_lists.init()
s = Scraper()
ttweets = s.get_cross_ttweets_from_user("pomurainpuff", since=datetime(2023, 7, 30).replace(tzinfo=pytz.utc))
print("\n".join([x.__repr__() for x in ttweets]))
if __name__== '__main__':
talent_lists.init()
s = Scraper()
ttweets = s.get_cross_ttweets_from_user("pomurainpuff", since=datetime(2023, 7, 30).replace(tzinfo=pytz.utc))
print("\n".join([x.__repr__() for x in ttweets]))
+15 -11
View File
@@ -38,10 +38,12 @@ class TalentTweet:
@staticmethod
def deserialize(serialized_str: str):
tokens = serialized_str.split('#')[0]
if len(tokens) < 3:
token_check = serialized_str.split('#')[0]
if len(token_check) < 3:
raise ValueError('not enough tokens to reconstruct a TalentTweet')
tokens = serialized_str.split()
tweet_id, author_id = int(tokens[0]), int(tokens[1])
date_time = datetime.fromtimestamp(float(tokens[2]), tz=pytz.utc)
@@ -59,7 +61,7 @@ class TalentTweet:
if tokens[i].isnumeric():
if mode == 'm': # mentions
mentions.add(int(tokens[i]))
mentions.append(int(tokens[i]))
continue
if mode == 'r': # reply_to
reply_to = int(tokens[i])
@@ -69,7 +71,7 @@ class TalentTweet:
if mode == 'rt': # retweeted user
rt = int(tokens[i])
if mode == 'rtm': # retweet/qrt mentions
rtm = int(tokens[i])
rtm.append(int(tokens[i]))
return TalentTweet(
tweet_id=tweet_id, author_id=author_id,
@@ -168,15 +170,20 @@ class TalentTweet:
def announce_text(self, is_catchup=False):
# templates
REPLY = '{0} replied to {1}!'
TWEET = '{0} tweeted!'
TWEET = '{0} tweeted mentioning {1}!'
RETWEET = '{0} retweeted {1}!'
RETWEET_MENTIONS_B = '{0} shared a tweet mentioning{1}!'
RETWEET_MENTIONS_B = '{0} shared a tweet mentioning {1}!'
QUOTE_TWEET = '{0} quote tweeted {1}!'
QUOTE_TWEET_MENTIONS_B = '{0} quoted a tweet mentioning {1}!'
author_username = f'@/{util.get_username_with_company(self.author_id)}'
ret = str()
print_mention_ids = set(self.mentions)
try: print_mention_ids.remove(None)
except: pass
mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids]
if is_catchup:
ret += f'{self.get_datetime_str()}\n'
pass
@@ -198,16 +205,13 @@ class TalentTweet:
else:
ret += QUOTE_TWEET.format(author_username, quoted_username)
elif len(self.mentions) > 0: # standalone tweet
ret += TWEET.format(author_username)
ret += TWEET.format(author_username, ", ".join(mention_usernames))
return ret
else:
raise ValueError(f'TalentTweet {self.tweet_id} has insufficient other parties')
try: print_mention_ids.remove(None)
except: pass
# mention line
if len(print_mention_ids) > 0:
mention_usernames = [f'@/{util.get_username_with_company(x)}' for x in print_mention_ids]
ret += (
'\nMentioning '
f'{", ".join(mention_usernames)}'
+5 -1
View File
@@ -1,6 +1,7 @@
# TODO: move queue structures and file handling here
import os
import shutil
import traceback
import util
import talenttweet as tt
@@ -55,9 +56,12 @@ class TalentTweetQueue:
if len(tokens) == 0 or tokens[0][0] == '#':
continue
ttweet = tt.TalentTweet.deserialize(line)
# print(f'{ttweet.tweet_id}:\n{ttweet}')
self.ttweets_dict[ttweet.tweet_id] = ttweet
print(f'Found {len(self.finished_user_dates)} scraped accounts and {len(self.ttweets_dict)} tweets in queue.')
except: pass
except:
traceback.print_exc()
pass
# finished ttweets
try:
with open(self.finished_ttweets_path, 'r') as f:
+20 -20
View File
@@ -2,9 +2,9 @@ import datetime
import traceback
import asyncio
from dotenv import dotenv_values
import tweepy
import api_secrets
import talenttweet as tt
import talent_lists as tl
import ttweetqueue as ttq
@@ -70,24 +70,25 @@ class TwAPI:
def __init__(self):
creds = dotenv_values()
TwAPI.instance = self
self.client = tweepy.Client(
bearer_token=api_secrets.bearer_token(),
consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(),
access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
consumer_key=creds['app_key'], consumer_secret=creds['app_secret'],
access_token=creds['user_token'], access_token_secret=creds['user_secret']
)
self.api = tweepy.API(
auth=tweepy.OAuthHandler(
consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(),
access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
)
)
try:
self.me = self.client.get_me().data
except Exception as e:
print('Did you setup secrets.ini?')
raise e
print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})')
# self.api = tweepy.API(
# auth=tweepy.OAuthHandler(
# consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(),
# access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
# )
# )
# try:
# self.me = self.client.get_me(wait_on_rate_limit=True).data
# except Exception as e:
# print('Failed to login!')
# raise e
# print(f'Assuming the account of @{self.me.data["username"]} ({self.me["id"]})')
## ---[COMMENT OUT WHEN NOT IN USE]---
# async def nuke_tweets(self):
@@ -154,17 +155,16 @@ class TwAPI:
# NO DRY-RUN: actually post tweet
# main tweet: text + screenshot
try:
print('creating main QRT w/ screenshot...', end='')
print('creating main QRT w/ screenshot...')
media_ids = [await self.get_ttweet_image_media_id(ttweet)]
twt_resp = await self.post_tweet(text, media_ids=media_ids, quote_tweet_id=ttweet.tweet_id)
print('done')
except:
print('error occurred trying to create main tweet, falling back to URL-main + reply screencap format')
traceback.print_exc()
text += f"\n{ttweet_url}"
try:
print('posting main tweet...', end='')
twt_resp = await self.post_tweet(text)
print('posting main tweet...')
twt_resp = await self.post_tweet(text, quote_tweet_id=ttweet.tweet_id)
print('done')
twt_id = twt_resp.data['id']
# if ttweet.reply_to is not None: