reaching the end of fundamentals!

This commit is contained in:
muskit
2022-09-27 02:49:03 -07:00
committed by msk
parent c06e712e06
commit 07e7e28dcb
6 changed files with 261 additions and 126 deletions
+112 -73
View File
@@ -6,9 +6,6 @@
import traceback
import datetime
import sys
import os
import asyncio
import twint
@@ -17,14 +14,16 @@ from talent_lists import *
from twapi import TwAPI
import talenttweet as tt
def write_user_date(user_id, file, date_str = None, error = False):
if date_str is None:
date_str = util.datetime_to_tdate(datetime.datetime.now())
PROGRAM_ARGS = None
file.write(f'# {user_id} {date_str if not error else "-1"}\n')
def write_user_timestamp(user_id, file, timestamp = None, error = False):
if timestamp is None:
timestamp = datetime.datetime.now().timestamp()
file.write(f'# {user_id} {timestamp if not error else "-1"}\n')
pass
def get_queue_file():
def get_queue_path():
return f'{util.get_project_dir()}/queue.txt'
def get_local_queue():
@@ -32,7 +31,8 @@ def get_local_queue():
pass
## Returns the ID of all tweets (up to limit) from a user ID.
def get_user_tweets(id, since_date='', limit=None):
def get_user_tweets(id, since_timestamp=None, limit=None):
qrt_count = 0
tweets = list()
c = twint.Config()
c.User_id = id
@@ -40,101 +40,140 @@ def get_user_tweets(id, since_date='', limit=None):
c.Store_object = True
c.Store_object_tweets_list = tweets
c.Hide_output = True
c.Since = since_date
c.Since = '' if since_timestamp == None else util.timestamp_to_tdate(since_timestamp)
user_str = f'{util.get_username(id)}'
print(f'Scraping tweets from {user_str}...')
user_str = f'{util.get_username_local(id)}'
print(f'Scraping tweets from {user_str} since {"forever ago" if c.Since == "" else c.Since}...')
try:
twint.run.Search(c)
except:
print(f'Had trouble getting tweets from {user_str}')
for twt in tweets:
if twt.quote_url != '':
qrt_count += 1
print(f'Scraped {len(tweets)} tweets')
print(f'Scraped {len(tweets)} tweets, {qrt_count} of which are quote tweets.')
return tweets
# Returns dict of accounts that successfully caught up.
# LINE FORMAT: "# {user_id} {status_num} {UNIX_timestamp}
def get_finished_user_timestamps(queue_file):
results = dict()
for line in queue_file:
tokens = line.split()
if len(tokens) != 3 or tokens[0][0] != '#':
# reached end of accounts list
break
if tokens[2] != '-1':
results[int(tokens[1])] = float(tokens[2])
return results
def get_user_timestamps_str(queue_file):
results = str()
for line in queue_file:
tokens = line.split()
if len(tokens) != 3 or tokens[0][0] != '#':
# reached end of accounts list
break
results += f'{line}\n'
return results[:-1]
# If queue.txt doesn't exist, creates and populates it.
# Returns a list of sorted and filtered TalentTweets (should
# be equivalent to queue.txt)
async def get_cross_talent_tweets(queue_path):
finished_user_tdates = dict()
finished_user_timestamps = dict()
ttweets_dict = dict()
# Populate structures with existing data from queue.txt
try:
print('Processing existing data in queue.txt...')
with open(queue_path, 'r') as f:
# Check for finished and incomplete accounts
# LINE FORMAT: "# {user_id} {status_num} (TODO: use date of retrival YYYY-MM-DD)
for line in f:
tokens = line.split()
if len(tokens) != 3 or tokens[0][0] != '#':
# reached end of accounts list
break
if tokens[2] != '-1':
finished_user_tdates[int(tokens[1])] = tokens[2]
finished_user_timestamps = get_finished_user_timestamps(f)
# Add existing serialized TalentTweets into ttweets
# Get existing queued TalentTweets
for line in f:
tokens = line.split()
if len(tokens) == 0 or tokens[0][0] == '#':
continue
ttweet = tt.TalentTweet.deserialize(line)
ttweets_dict[ttweet.tweet_id] = ttweet
print(f'Found {len(finished_user_timestamps)} scraped accounts and {len(ttweets_dict)} tweets.')
except FileNotFoundError:
print('Couldn\'t find queue.txt.')
print('queue.txt not found.')
# Pull tweets from twint
with open(queue_path, 'w') as f:
# for talent_id in talent_lists.talents:
for talent_id in talent_lists.test_talents:
print('using test_talents')
if talent_id not in finished_user_tdates or \
finished_user_tdates[talent_id] != util.datetime_to_tdate(datetime.datetime.today()):
try:
tweets = get_user_tweets(talent_id, since_date=finished_user_tdates.get(talent_id, None))
for tweet in tweets:
ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet)
if ttweet.is_cross_company():
ttweets_dict[ttweet.tweet_id] = ttweet
except:
print('Error occurred processing tweet data. Traceback:')
print(traceback.format_exc())
write_user_date(user_id=talent_id, file=f, error=True)
print('Pulling tweets from online!')
try:
print('TODO: using test_talents')
for talent_id in talent_lists.test_talents:
# for talent_id in talent_lists.talents:
if talent_id not in finished_user_timestamps or \
finished_user_timestamps[talent_id] < datetime.datetime.now().timestamp():
try:
# tweets = get_user_tweets(talent_id, since_timestamp=1663698621) # shorten test runs
tweets = get_user_tweets(talent_id, since_timestamp=finished_user_timestamps.get(talent_id, None))
for tweet in tweets:
if tweet.id not in ttweets_dict:
ttweet = await tt.TalentTweet.create_from_twint_tweet(tweet)
if ttweet.is_cross_company():
ttweets_dict[ttweet.tweet_id] = ttweet
except:
print('Error occurred processing tweet data.')
print(traceback.format_exc())
write_user_timestamp(user_id=talent_id, file=f, error=True)
else:
write_user_timestamp(user_id=talent_id, file=f)
else:
write_user_date(user_id=talent_id, file=f)
else:
print(f'Skipping already completed {util.get_username(talent_id)}')
write_user_date(user_id=talent_id, file=f, date_str=finished_user_tdates[talent_id])
f.write('\n')
ttweets_dict = dict(sorted(ttweets_dict.items()))
for ttweet in ttweets_dict.values():
f.write(f'{ttweet.serialize()}\n')
print(f'Skipping already completed {util.get_username_local(talent_id)}')
write_user_timestamp(user_id=talent_id, file=f, timestamp=finished_user_timestamps[talent_id])
f.write('\n')
ttweets_dict = dict(sorted(ttweets_dict.items()))
for ttweet in ttweets_dict.values():
f.write(f'{ttweet.serialize()}\n')
except:
print('Unhandled error occurred while pulling tweets.')
traceback.print_exc()
print('Saving queue.txt and exiting.')
exit(1)
return ttweets_dict
def process_queue(file):
print('TODO: implement process_queue')
# while Queue.txt has lines present
# attempt to deserialize first line of Queue.txt
# exit program if failed, stating error
# while post isn't successful
# attempt to post tweet
# delete serialized line from Queue.txt, save it
#
# we're done! post tweet announcing done with archives
pass
async def process_queue(ttweets_dict: dict):
global PROGRAM_ARGS
async def run():
# if Queue.txt exists
# work through the tweets in Queue.txt
# else
# look through every talent's tweets, saving only cross-company tweets into a list
# sort the list by tweet_id
# create Queue.txt and save all tweets (serialized) there
# post a tweet announcing archival intent
# work through the tweets in Queue.txt
if len(ttweets_dict) == 0: return
if PROGRAM_ARGS.announce_catchup:
TwAPI.instance.post_tweet(text=f'Starting to catch-up through {len(ttweets_dict)} logged tweets.')
try:
while len(ttweets_dict) > 0:
key = list(ttweets_dict.keys())[0]
ttweet = ttweets_dict[key]
await TwAPI.instance.post_ttweet(ttweet)
ttweets_dict.pop(key)
except:
print('Unhandled error occurred while posting tweets from queue.')
traceback.print_exc()
else:
if PROGRAM_ARGS.announce_catchup:
await TwAPI.instance.post_tweet('Finished with catch-up tweets!')
queue_path = get_queue_file()
ttweet_dict = await get_cross_talent_tweets(queue_path)
print(f'got {len(ttweet_dict)} tweets')
print('Updating what\'s left in ttweet_dict to queue.txt.')
with open(get_queue_path(), 'r') as f:
user_timestamps_str = get_user_timestamps_str(f)
with open(get_queue_path(), 'w') as f:
f.write(user_timestamps_str + '\n\n')
for ttweet in ttweets_dict.values():
f.write(f'{ttweet.serialize()}\n')
async def run(program_args):
global PROGRAM_ARGS
PROGRAM_ARGS = program_args
queue_path = get_queue_path()
ttweets_dict = await get_cross_talent_tweets(queue_path)
print(f'got {len(ttweets_dict)} tweets')
await process_queue(ttweets_dict)