From 579929559f37c1f254d710dadd2ccb26ab6df8d1 Mon Sep 17 00:00:00 2001 From: msk <15199219+muskit@users.noreply.github.com> Date: Sat, 24 Sep 2022 17:56:58 -0700 Subject: [PATCH] added twint (scraper), restructuring --- requirements.txt | 5 +- src/api.py | 204 ++++++++++++++--------------- src/{secrets.py => api_secrets.py} | 82 ++++++------ src/catchup.py | 60 ++++++--- src/main.py | 127 +++++++++--------- src/talent_lists.py | 44 ++++--- src/talenttweet.py | 69 ++++++++++ src/util.py | 54 +++----- 8 files changed, 349 insertions(+), 296 deletions(-) rename src/{secrets.py => api_secrets.py} (89%) create mode 100644 src/talenttweet.py diff --git a/requirements.txt b/requirements.txt index 02cd967..7c5f3a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ -tweepy -tweet-capture \ No newline at end of file +tweepy +tweet-capture +git+https://github.com/muskit/twint_2022_fix.git \ No newline at end of file diff --git a/src/api.py b/src/api.py index 964846b..4320650 100644 --- a/src/api.py +++ b/src/api.py @@ -1,109 +1,97 @@ -from lib2to3.pgen2 import token -from math import inf -from urllib import response -import tweepy - -import secrets -import util - -class TwAPI: - instance = None - TWEET_MEDIA_FIELDS = ['url'] - TWEET_FIELDS = ['created_at', 'in_reply_to_user_id'] - TWEET_EXPANSIONS = ['entities.mentions.username', 'referenced_tweets.id.author_id'] - - def __init__(self): - TwAPI.instance = self - self.client = tweepy.Client( - bearer_token=secrets.bearer_token(), - consumer_key=secrets.api_key(), consumer_secret=secrets.api_secret(), - access_token=secrets.access_token(), access_token_secret=secrets.access_secret() - ) - - # Returns a set of involved parties for a single tweet. - # - # Tweet must have been queried with these parameters: - # media_fields=['url'], - # tweet_fields=['created_at', 'in_reply_to_user_id'], - # expansions=['entities.mentions.username', 'referenced_tweets.id.author_id'] - @staticmethod - def get_involved_parties(tweet, response): - involved_parties = set() - # mentions - try: - mention_list = tweet.entities['mentions'] - for mention in mention_list: - involved_parties.add(int(mention['id'])) - except: pass - # reply-to - if tweet.in_reply_to_user_id != None: - involved_parties.add(tweet.in_reply_to_user_id) - # qrt - if tweet.attachments: - for ref_tweet in tweet.attachments: - if ref_tweet.type == 'quoted': - for incl_tweet in response.includes['tweets']: - if incl_tweet.id == ref_tweet.id: - involved_parties.add(incl_tweet.author_id) - - return involved_parties - - # Returns a tweet and mention-set pair, given a tweet ID. - def get_tweet_mentions(self, id): - resp = self.client.get_tweet(id, - media_fields=TwAPI.TWEET_MEDIA_FIELDS, - tweet_fields=TwAPI.TWEET_FIELDS, - expansions=TwAPI.TWEET_EXPANSIONS) - - tweet = resp.data - mentions = TwAPI.get_involved_parties(tweet, resp) - return (tweet, mentions) - - # Returns a list (tweet, {mentions}) from a user. - # mentions- a set comprised of any other parties involved - # in this tweet (reply, mention, qrt) - def get_users_all_tweets_mentions(self, id: int, count=inf): - pairs = list() - - retrieve_size = util.clamp(count, 5, 100) - next_page_token = None - tokens_retrieved = 0 - tweets_retrieved = 0 - - while tweets_retrieved < count: - print(f'Retrieved {tokens_retrieved} tokens so far...') - resp = self.client.get_users_tweets(id, max_results=retrieve_size, pagination_token=next_page_token, - media_fields=TwAPI.TWEET_MEDIA_FIELDS, - tweet_fields=TwAPI.TWEET_FIELDS, - expansions=TwAPI.TWEET_EXPANSIONS) - - for tweet in resp.data: - mentions = TwAPI.get_involved_parties(tweet, resp) - pairs.append((tweet, mentions)) - - # update counters and pagination token - tweets_retrieved += resp.meta['result_count'] - if tweets_retrieved < count: - try: - next_page_token = resp.meta['next_token'] - tokens_retrieved += 1 - except KeyError: - print("next_token wasn't provided; we've reached the end!") - break # reached end of user's tweets - - print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.') - return pairs - - # returns a filtered list (tweet, [mentions]) from a user - def get_users_cross_tweets_mentions(self, id): - ret = list() - pairs = self.get_users_all_tweets_mentions(id) - for pair in pairs: - if util.is_cross_company(pair): - ret.append(pair) - - return ret - - # Create a post that showcases given tweet and its mentions set. - def create_post(self, tweet, mentions): +from lib2to3.pgen2 import token +from math import inf +from urllib import response +import tweepy + +import api_secrets +import talenttweet as tt +import util + +class TwAPI: + instance = None + TWEET_MEDIA_FIELDS = ['url'] + TWEET_FIELDS = ['created_at', 'in_reply_to_user_id'] + TWEET_EXPANSIONS = ['entities.mentions.username', 'referenced_tweets.id.author_id'] + + # Returns a set of involved parties for a single tweet. + # + # Tweet must have been queried with these parameters: + # media_fields=['url'], + # tweet_fields=['created_at', 'in_reply_to_user_id'], + # expansions=['entities.mentions.username', 'referenced_tweets.id.author_id'] + @staticmethod + def get_involved_parties(tweet, response): + involved_parties = set() + # mentions + try: + mention_list = tweet.entities['mentions'] + for mention in mention_list: + involved_parties.add(int(mention['id'])) + except: pass + # reply-to + if tweet.in_reply_to_user_id != None: + involved_parties.add(tweet.in_reply_to_user_id) + # qrt + if tweet.attachments: + for ref_tweet in tweet.attachments: + if ref_tweet.type == 'quoted': + for incl_tweet in response.includes['tweets']: + if incl_tweet.id == ref_tweet.id: + involved_parties.add(incl_tweet.author_id) + + return involved_parties + + def __init__(self): + TwAPI.instance = self + self.client = tweepy.Client( + bearer_token=api_secrets.bearer_token(), + consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(), + access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret() + ) + + # Returns a list of TalentTweets from a user. + def get_users_all_tweets_mentions(self, id: int, count=inf): + ttweets = list() + + retrieve_size = util.clamp(count, 5, 100) + next_page_token = None + tokens_retrieved = 0 + tweets_retrieved = 0 + + while tweets_retrieved < count: + print(f'Retrieved {tokens_retrieved} tokens so far...') + resp = self.client.get_users_tweets(id, max_results=retrieve_size, pagination_token=next_page_token, + media_fields=TwAPI.TWEET_MEDIA_FIELDS, + tweet_fields=TwAPI.TWEET_FIELDS, + expansions=TwAPI.TWEET_EXPANSIONS) + + for tweet in resp.data: + mentions = TwAPI.get_involved_parties(tweet, resp) + ttweets.append(tt.TalentTweet(tweet=tweet, other_parties=mentions)) + + # update counters and pagination token + tweets_retrieved += resp.meta['result_count'] + if tweets_retrieved < count: + try: + next_page_token = resp.meta['next_token'] + tokens_retrieved += 1 + except KeyError: + print("next_token wasn't provided; we've reached the end!") + break # reached end of user's tweets + + print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.') + return ttweets + + # Returns a list of cross-company TalentTweets from a user. + def get_users_cross_tweets_mentions(self, id): + ret = list() + ttweets = self.get_users_all_tweets_mentions(id) + for ttweet in ttweets: + if ttweet.is_cross_company(): + ret.append(ttweet) + + return ret + + # Create a post that showcases given tweet and its mentions set. + def create_post(self, tweet, mentions): pass \ No newline at end of file diff --git a/src/secrets.py b/src/api_secrets.py similarity index 89% rename from src/secrets.py rename to src/api_secrets.py index 4bcb56a..2e50e12 100644 --- a/src/secrets.py +++ b/src/api_secrets.py @@ -1,42 +1,42 @@ -## Twitter developer credentials management. - -import os -import configparser - -from util import * - -# returns dictionary of the Credentials section. -# [NOT TO BE USED OUTSIDE OF THIS FILE.] -def __get_ini_credentials(): - c = configparser.RawConfigParser() - if len(c.read(os.path.join(get_project_dir(), 'secrets.ini'))) > 0 and c.has_section('Credentials'): - return c['Credentials'] - return None - -# returns the consumer api_key stored in secrets.ini -def api_key(): - c = __get_ini_credentials() - return c.get(option='api_key', fallback='xxx') if c is not None else 'xxx' - -# returns the consumer api_secret stored in secrets.ini -def api_secret(): - c = __get_ini_credentials() - return c.get(option='api_secret', fallback='yyy') if c is not None else 'yyy' - -# returns the bearer_token stored in secrets.ini -def bearer_token(): - c = __get_ini_credentials() - return c.get(option='bearer_token', fallback='zzz') if c is not None else 'zzz' - -# returns the access_token stroed in secrets.ini -def access_token(): - c = __get_ini_credentials() - return c.get(option='oauth1_access_token', fallback='zzz') if c is not None else 'aaa' - -# returns the access_secret stroed in secrets.ini -def access_secret(): - c = __get_ini_credentials() - return c.get(option='oauth1_access_secret', fallback='zzz') if c is not None else 'bbb' - -def get_all_secrets(): +## Twitter developer credentials management. + +import os +import configparser + +import util + +# returns dictionary of the Credentials section. +# [NOT TO BE USED OUTSIDE OF THIS FILE.] +def __get_ini_credentials(): + c = configparser.RawConfigParser() + if len(c.read(os.path.join(util.get_project_dir(), 'secrets.ini'))) > 0 and c.has_section('Credentials'): + return c['Credentials'] + return None + +# returns the consumer api_key stored in secrets.ini +def api_key(): + c = __get_ini_credentials() + return c.get(option='api_key', fallback='xxx') if c is not None else 'xxx' + +# returns the consumer api_secret stored in secrets.ini +def api_secret(): + c = __get_ini_credentials() + return c.get(option='api_secret', fallback='yyy') if c is not None else 'yyy' + +# returns the bearer_token stored in secrets.ini +def bearer_token(): + c = __get_ini_credentials() + return c.get(option='bearer_token', fallback='zzz') if c is not None else 'zzz' + +# returns the access_token stroed in secrets.ini +def access_token(): + c = __get_ini_credentials() + return c.get(option='oauth1_access_token', fallback='zzz') if c is not None else 'aaa' + +# returns the access_secret stroed in secrets.ini +def access_secret(): + c = __get_ini_credentials() + return c.get(option='oauth1_access_secret', fallback='zzz') if c is not None else 'bbb' + +def get_all_secrets(): return f'api_key:{api_key()}\napi_secret:{api_secret()}\nbearer_token:{bearer_token()}\naccess_token:{access_token()}\naccess_secret:{access_secret()}' \ No newline at end of file diff --git a/src/catchup.py b/src/catchup.py index 0b5a131..04558cf 100644 --- a/src/catchup.py +++ b/src/catchup.py @@ -1,21 +1,39 @@ -## The bot's catch-up mode -# Scan all accounts for cross-company interactions. -# Terminates when finished scanning and posting. -# -# We should post, at the fastest, one tweet per minute. - -import os - -from util import * -from api import TwAPI - -## Returns list of tweets present in queue.txt -def get_local_queue(): - # f = open(os.path.join(get_project_dir(), 'queue.txt')) - pass - -def run(): - queue = get_local_queue() - pairs = TwAPI.instance.get_users_all_tweets_mentions(1390620618001838086, count=5) - for (tweet, mentions) in pairs: - print_tweet(tweet, mentions) \ No newline at end of file +## The bot's catch-up mode +# Scan all accounts for cross-company interactions. +# Terminates when finished scanning and posting. +# +# We should post, at the fastest, one tweet per minute. + +import os + +import twint + +from util import * +from talent_lists import * +from api import TwAPI +import talenttweet as tt + +## Returns list of tweets present in queue.txt +def get_local_queue(): + # f = open(os.path.join(get_project_dir(), 'queue.txt')) + pass + +## Returns the ID of all tweets (up to limit) from a user ID. +def get_user_tweet_ids(id, limit=None): + tweets = list() + c = twint.Config() + c.User_id = id + c.Limit = limit + c.Store_object = True + c.Store_object_tweets_list = tweets + + twint.run.Search(c) + return [x.id for x in tweets] + +def run(): + queue = get_local_queue() + + tweets_ids = get_user_tweet_ids(1390620618001838086, limit=20) + for id in tweets_ids: + ttweet = tt.TalentTweet(id) + print(ttweet) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 3faf73c..4e3a2d1 100644 --- a/src/main.py +++ b/src/main.py @@ -1,64 +1,63 @@ -import sys -import argparse -from argparse import RawTextHelpFormatter - -import talent_lists -import secrets -import catchup -import listen - -from api import TwAPI -from util import is_cross_company, print_tweet - -MODES_HELP_STR = '''mode to run the bot at: -l,listen: listen for new tweets from all accounts; will not terminate unless error occurs -c,catchup: scan all tweets from all accounts; will terminate when done''' - -def init_argparse(): - p = argparse.ArgumentParser(description='Twitter bot that follows interactions between Nijisanji EN/ID and hololive EN/ID members.', formatter_class=RawTextHelpFormatter) - p.add_argument('mode', nargs='?', \ - help=MODES_HELP_STR) - p.add_argument('--show-tokens', action='store_true', help='[DO NOT USE IN PUBLIC SETTING] print stored tokens from secrets.ini') - return p - -def main(): - parser = init_argparse() - if len(sys.argv) < 2: - parser.print_help() - return - - args = parser.parse_args() - - if args.show_tokens: - print(secrets.get_all_secrets()) - - if args.mode is None: return - - ## We expect to run in some mode now. - - # Initialize shared API instance - twApi = TwAPI.instance = TwAPI() - - # Initialize talent account lists - talent_lists.init() - - ## TEST CODE ## - cross_pairs = twApi.get_users_cross_tweets_mentions(1390620618001838086) - for pair in cross_pairs: - print_tweet(pair) - - ## Determine running mode - match args.mode.lower(): - case 'l' | 'listen': - print('RUNNING IN LISTEN MODE\n') - listen.run() - case 'c' | 'catchup': - print('RUNNING IN CATCH-UP MODE\n') - catchup.run() - case _: - print('\ninvalid mode. run with no arguments or "-h" for help page, including mode list.') - return - - -if __name__ == "__main__": - main() +import sys +import argparse +from argparse import RawTextHelpFormatter + +import talent_lists +import api_secrets +import catchup +import listen +from api import TwAPI + +MODES_HELP_STR = '''mode to run the bot at: +l,listen: listen for new tweets from all accounts; will not terminate unless error occurs +c,catchup: scan all tweets from all accounts; will terminate when done''' + +def init_argparse(): + p = argparse.ArgumentParser(description='Twitter bot that follows interactions between Nijisanji EN/ID and hololive EN/ID members.', formatter_class=RawTextHelpFormatter) + p.add_argument('mode', nargs='?', \ + help=MODES_HELP_STR) + p.add_argument('--show-tokens', action='store_true', help='[DO NOT USE IN PUBLIC SETTING] print stored tokens from secrets.ini') + return p + +# TODO: implement command line mode for manually controlling the bot +def command_line(): + pass + +def main(): + parser = init_argparse() + if len(sys.argv) < 2: + parser.print_help() + return + + args = parser.parse_args() + + if args.show_tokens: + print(api_secrets.get_all_secrets()) + + if args.mode is None: return + + ## We expect to run in some mode now. + + # Initialize shared API instance + twApi = TwAPI.instance = TwAPI() + + # Initialize talent account lists + talent_lists.init() + + ## Determine running mode + match args.mode.lower(): + case 'l' | 'listen': + print('RUNNING IN LISTEN MODE\n') + listen.run() + case 'c' | 'catchup': + print('RUNNING IN CATCH-UP MODE\n') + catchup.run() + case _: + command_line() + #TODO: remove message + print('\ninvalid mode. run with no arguments or "-h" for help page, including mode list.') + return + + +if __name__ == "__main__": + main() diff --git a/src/talent_lists.py b/src/talent_lists.py index ae2d15a..9b49a43 100644 --- a/src/talent_lists.py +++ b/src/talent_lists.py @@ -1,21 +1,23 @@ -import util - -niji_en = dict() -holo_en = dict() - -def __create_dict(file, _dict): - with open(file, 'r') as f: - for line in f: - words = line.split() - if len(words) == 2 and line[0] != '#': - name, id = line.split() - _dict[int(id)] = name - -def init(): - global niji_en - global holo_en - - # holoEN - __create_dict(f'{util.get_project_dir()}/lists/holoen.txt', holo_en) - # nijiEN - __create_dict(f'{util.get_project_dir()}/lists/nijien.txt', niji_en) +import util + +niji_en = dict() +holo_en = dict() +talents = dict() + +def __create_dict(file, _dict): + global talents + with open(file, 'r') as f: + for line in f: + words = line.split() + if len(words) == 2 and line[0] != '#': + name, id = line.split() + _dict[int(id)] = name + talents[int(id)] = name +def init(): + global niji_en + global holo_en + + # holoEN + __create_dict(f'{util.get_project_dir()}/lists/holoen.txt', holo_en) + # nijiEN + __create_dict(f'{util.get_project_dir()}/lists/nijien.txt', niji_en) diff --git a/src/talenttweet.py b/src/talenttweet.py new file mode 100644 index 0000000..85e22fd --- /dev/null +++ b/src/talenttweet.py @@ -0,0 +1,69 @@ +import platform + +import tweepy + +from api import * +import talent_lists + +class TalentTweet: + def __init__(self, tweet: tweepy.Tweet, other_parties: set): + self.tweet = tweet + self.other_parties = other_parties + + def __init__(self, tweet_id): + resp = TwAPI.instance.client.get_tweet(tweet_id, + media_fields=TwAPI.TWEET_MEDIA_FIELDS, + tweet_fields=TwAPI.TWEET_FIELDS, + expansions=TwAPI.TWEET_EXPANSIONS) + + self.tweet = resp.data + self.other_parties = TwAPI.get_involved_parties(self.tweet, resp) + + def __repr__(self) -> str: + return ( + f'{self.tweet.id} from {talent_lists.talents.get(self.tweet.author_id, "???")}:\n' + f'{self.tweet.text}\n' + f'------------------------------------------------------\n' + f'{self.get_datetime_str()}\n' + f'{self.get_mentions_usernames()}\n' + f'Cross-company: {self.is_cross_company()}\n' + f'======================================================' + ) + + def is_cross_company(self): + author_id = self.tweet.author_id + mentions = self.other_parties + + # TODO: update for EN/ID + for mention_id in mentions: + if author_id in talent_lists.niji_en: + if mention_id in talent_lists.holo_en: + return True + elif author_id in talent_lists.holo_en: + if mention_id in talent_lists.niji_en: + return True + return False + + def get_mentions_usernames(self): + if len(self.other_parties) > 0: + s = str() + for id in self.other_parties: + s += f'{talent_lists.talents.get(id, "???")}, ' + return s[0:-2] + + return 'none' + + def get_datetime_str(self): + unpad = '#' if platform.system() == 'Windows' else '-' + return self.tweet.created_at.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)') + + +class TalentTweets: + def __init__(self): + self.ttweets = list() + + def get_ttweets(self): + pass + + def get_ttweet_ids(self): + pass \ No newline at end of file diff --git a/src/util.py b/src/util.py index 8ac4dc6..d4d2656 100644 --- a/src/util.py +++ b/src/util.py @@ -1,40 +1,16 @@ -## Shared utility functions. - -import os -import talent_lists - -# returns system path to this project, which is -# up one level from this file's directory (src). -def get_project_dir(): - return os.path.join(os.path.dirname(__file__), os.pardir) - -# determine if tweet involves cross-company interaction -def is_cross_company(pair: tuple): - author_id, mentions = pair[0].author_id, pair[1] - - for mention_id in mentions: - if author_id in talent_lists.niji_en: - if mention_id in talent_lists.holo_en: - return True - elif author_id in talent_lists.holo_en: - if mention_id in talent_lists.niji_en: - return True - return False - -def tweet_id_to_url(id): - return f'https://twitter.com/twitter/status/{id}' - -def print_tweet(pair: tuple): - tweet, mentions = pair - s = ( - f'{tweet.id}: {tweet.created_at}: involves {mentions}\n' - f'{tweet.text}\n' - f'-----\n' - f'{tweet.entities}\n' - f'{tweet.referenced_tweets}\n' - f'=================================================' - ) - print(s) - -def clamp(n, smallest, largest): +## Shared utility functions. + +import os +import talent_lists +import talenttweet as tt + +# returns system path to this project, which is +# up one level from this file's directory (src). +def get_project_dir(): + return os.path.join(os.path.dirname(__file__), os.pardir) + +def tweet_id_to_url(id): + return f'https://twitter.com/twitter/status/{id}' + +def clamp(n, smallest, largest): return max(smallest, min(n, largest)) \ No newline at end of file