added twint (scraper), restructuring

This commit is contained in:
msk
2022-09-24 17:56:58 -07:00
parent 5b458f2e1f
commit 579929559f
8 changed files with 349 additions and 296 deletions
+96 -108
View File
@@ -1,109 +1,97 @@
from lib2to3.pgen2 import token
from math import inf
from urllib import response
import tweepy
import secrets
import util
class TwAPI:
instance = None
TWEET_MEDIA_FIELDS = ['url']
TWEET_FIELDS = ['created_at', 'in_reply_to_user_id']
TWEET_EXPANSIONS = ['entities.mentions.username', 'referenced_tweets.id.author_id']
def __init__(self):
    """Register this object as the shared instance and build the tweepy client.

    All five credentials are read through the ``secrets`` module imported by
    this file.
    """
    TwAPI.instance = self
    credentials = {
        'bearer_token': secrets.bearer_token(),
        'consumer_key': secrets.api_key(),
        'consumer_secret': secrets.api_secret(),
        'access_token': secrets.access_token(),
        'access_token_secret': secrets.access_secret(),
    }
    self.client = tweepy.Client(**credentials)
# Returns a set of involved parties for a single tweet.
#
# Tweet must have been queried with these parameters:
# media_fields=['url'],
# tweet_fields=['created_at', 'in_reply_to_user_id'],
# expansions=['entities.mentions.username', 'referenced_tweets.id.author_id']
@staticmethod
def get_involved_parties(tweet, response):
involved_parties = set()
# mentions
try:
mention_list = tweet.entities['mentions']
for mention in mention_list:
involved_parties.add(int(mention['id']))
except: pass
# reply-to
if tweet.in_reply_to_user_id != None:
involved_parties.add(tweet.in_reply_to_user_id)
# qrt
if tweet.attachments:
for ref_tweet in tweet.attachments:
if ref_tweet.type == 'quoted':
for incl_tweet in response.includes['tweets']:
if incl_tweet.id == ref_tweet.id:
involved_parties.add(incl_tweet.author_id)
return involved_parties
# Returns a tweet and mention-set pair, given a tweet ID.
def get_tweet_mentions(self, id):
resp = self.client.get_tweet(id,
media_fields=TwAPI.TWEET_MEDIA_FIELDS,
tweet_fields=TwAPI.TWEET_FIELDS,
expansions=TwAPI.TWEET_EXPANSIONS)
tweet = resp.data
mentions = TwAPI.get_involved_parties(tweet, resp)
return (tweet, mentions)
def get_users_all_tweets_mentions(self, id: int, count=inf):
    """Collect a user's tweets, each paired with its involved-party set.

    Pages through ``client.get_users_tweets`` until at least ``count``
    tweets were retrieved or the API stops handing out a ``next_token``.

    Args:
        id: ID of the user whose tweets are fetched.
        count: Paging stops once this many tweets were retrieved. The page
            size comes from ``util.clamp(count, 5, 100)``, so the result
            may hold more than ``count`` entries.

    Returns:
        A list of ``(tweet, mentions)`` tuples; ``mentions`` is the set of
        other parties involved in the tweet (reply, mention, qrt).
    """
    pairs = []
    retrieve_size = util.clamp(count, 5, 100)
    next_page_token = None
    tokens_retrieved = 0
    tweets_retrieved = 0

    while tweets_retrieved < count:
        print(f'Retrieved {tokens_retrieved} tokens so far...')
        resp = self.client.get_users_tweets(id, max_results=retrieve_size, pagination_token=next_page_token,
                                            media_fields=TwAPI.TWEET_MEDIA_FIELDS,
                                            tweet_fields=TwAPI.TWEET_FIELDS,
                                            expansions=TwAPI.TWEET_EXPANSIONS)

        # data is None (not an empty list) when the page holds no tweets
        for tweet in resp.data or []:
            mentions = TwAPI.get_involved_parties(tweet, resp)
            pairs.append((tweet, mentions))

        # update counters and pagination token
        tweets_retrieved += resp.meta['result_count']
        if tweets_retrieved < count:
            next_page_token = resp.meta.get('next_token')
            if next_page_token is None:
                print("next_token wasn't provided; we've reached the end!")
                break  # reached end of user's tweets
            tokens_retrieved += 1

    print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.')
    return pairs
# returns a filtered list (tweet, [mentions]) from a user
def get_users_cross_tweets_mentions(self, id):
ret = list()
pairs = self.get_users_all_tweets_mentions(id)
for pair in pairs:
if util.is_cross_company(pair):
ret.append(pair)
return ret
# Create a post that showcases given tweet and its mentions set.
def create_post(self, tweet, mentions):
from lib2to3.pgen2 import token
from math import inf
from urllib import response
import tweepy
import api_secrets
import talenttweet as tt
import util
class TwAPI:
    """Thin wrapper around a tweepy client, shared through ``TwAPI.instance``."""

    # most recently constructed TwAPI object
    instance = None

    # query parameters that get_involved_parties relies on
    TWEET_MEDIA_FIELDS = ['url']
    TWEET_FIELDS = ['created_at', 'in_reply_to_user_id']
    TWEET_EXPANSIONS = ['entities.mentions.username', 'referenced_tweets.id.author_id']

    def __init__(self):
        """Register this object as the shared instance and build the client."""
        TwAPI.instance = self
        self.client = tweepy.Client(
            bearer_token=api_secrets.bearer_token(),
            consumer_key=api_secrets.api_key(), consumer_secret=api_secrets.api_secret(),
            access_token=api_secrets.access_token(), access_token_secret=api_secrets.access_secret()
        )

    @staticmethod
    def get_involved_parties(tweet, response):
        """Return the set of user IDs a tweet mentions, replies to, or quotes.

        Tweet must have been queried with these parameters:
            media_fields=['url'],
            tweet_fields=['created_at', 'in_reply_to_user_id'],
            expansions=['entities.mentions.username', 'referenced_tweets.id.author_id']

        Args:
            tweet: The tweet to inspect.
            response: The API response ``tweet`` came from; its ``includes``
                are searched for the quoted tweet's author.

        Returns:
            A set of user IDs (empty when nobody else is involved).
        """
        involved_parties = set()

        # mentions (best effort: entities may be None, lack 'mentions', or
        # a mention may carry no numeric id)
        try:
            for mention in tweet.entities['mentions']:
                involved_parties.add(int(mention['id']))
        except (AttributeError, KeyError, TypeError, ValueError):
            pass

        # reply-to
        if tweet.in_reply_to_user_id is not None:
            involved_parties.add(tweet.in_reply_to_user_id)

        # qrt: quotes are listed in referenced_tweets, not in attachments
        referenced = getattr(tweet, 'referenced_tweets', None) or []
        quoted_ids = {ref.id for ref in referenced if ref.type == 'quoted'}
        if quoted_ids:
            for incl_tweet in (response.includes or {}).get('tweets', []):
                if incl_tweet.id in quoted_ids:
                    involved_parties.add(incl_tweet.author_id)

        return involved_parties

    def get_users_all_tweets_mentions(self, id: int, count=inf):
        """Return a list of TalentTweets built from a user's tweets.

        Pages through ``client.get_users_tweets`` until at least ``count``
        tweets were retrieved or the API stops handing out a ``next_token``.

        Args:
            id: ID of the user whose tweets are fetched.
            count: Paging stops once this many tweets were retrieved. The
                page size comes from ``util.clamp(count, 5, 100)``, so the
                result may hold more than ``count`` entries.
        """
        ttweets = []
        retrieve_size = util.clamp(count, 5, 100)
        next_page_token = None
        tokens_retrieved = 0
        tweets_retrieved = 0

        while tweets_retrieved < count:
            print(f'Retrieved {tokens_retrieved} tokens so far...')
            resp = self.client.get_users_tweets(id, max_results=retrieve_size, pagination_token=next_page_token,
                                                media_fields=TwAPI.TWEET_MEDIA_FIELDS,
                                                tweet_fields=TwAPI.TWEET_FIELDS,
                                                expansions=TwAPI.TWEET_EXPANSIONS)

            # data is None (not an empty list) when the page holds no tweets
            for tweet in resp.data or []:
                mentions = TwAPI.get_involved_parties(tweet, resp)
                ttweets.append(tt.TalentTweet(tweet=tweet, other_parties=mentions))

            # update counters and pagination token
            tweets_retrieved += resp.meta['result_count']
            if tweets_retrieved < count:
                next_page_token = resp.meta.get('next_token')
                if next_page_token is None:
                    print("next_token wasn't provided; we've reached the end!")
                    break  # reached end of user's tweets
                tokens_retrieved += 1

        print(f'Retrieved {tweets_retrieved} tweets using {tokens_retrieved} tokens.')
        return ttweets

    def get_users_cross_tweets_mentions(self, id):
        """Return a list of cross-company TalentTweets from a user."""
        return [
            ttweet
            for ttweet in self.get_users_all_tweets_mentions(id)
            if ttweet.is_cross_company()
        ]

    def create_post(self, tweet, mentions):
        """Create a post that showcases given tweet and its mentions set.

        Not implemented yet.
        """
        pass
+41 -41
View File
@@ -1,42 +1,42 @@
## Twitter developer credentials management.
import os
import configparser
from util import *
def __get_ini_credentials():
    """Load the ``Credentials`` section of secrets.ini in the project directory.

    [NOT TO BE USED OUTSIDE OF THIS FILE.]

    Returns:
        The ``Credentials`` section, or None when secrets.ini could not be
        read or has no such section.
    """
    parser = configparser.RawConfigParser()
    ini_path = os.path.join(get_project_dir(), 'secrets.ini')
    files_read = parser.read(ini_path)
    if not files_read or not parser.has_section('Credentials'):
        return None
    return parser['Credentials']
def api_key():
    """Return the consumer api_key stored in secrets.ini, or 'xxx' if unavailable."""
    credentials = __get_ini_credentials()
    if credentials is None:
        return 'xxx'
    return credentials.get(option='api_key', fallback='xxx')
def api_secret():
    """Return the consumer api_secret stored in secrets.ini, or 'yyy' if unavailable."""
    credentials = __get_ini_credentials()
    if credentials is None:
        return 'yyy'
    return credentials.get(option='api_secret', fallback='yyy')
def bearer_token():
    """Return the bearer_token stored in secrets.ini, or 'zzz' if unavailable."""
    credentials = __get_ini_credentials()
    if credentials is None:
        return 'zzz'
    return credentials.get(option='bearer_token', fallback='zzz')
def access_token():
    """Return the OAuth 1 access token stored in secrets.ini.

    The placeholder 'aaa' is returned both when the credentials section is
    unavailable and when it has no ``oauth1_access_token`` option, so the
    two "missing" cases no longer disagree.
    """
    c = __get_ini_credentials()
    if c is None:
        return 'aaa'
    return c.get(option='oauth1_access_token', fallback='aaa')
def access_secret():
    """Return the OAuth 1 access secret stored in secrets.ini.

    The placeholder 'bbb' is returned both when the credentials section is
    unavailable and when it has no ``oauth1_access_secret`` option, so the
    two "missing" cases no longer disagree.
    """
    c = __get_ini_credentials()
    if c is None:
        return 'bbb'
    return c.get(option='oauth1_access_secret', fallback='bbb')
def get_all_secrets():
## Twitter developer credentials management.
import os
import configparser
import util
def __get_ini_credentials():
    """Load the ``Credentials`` section of secrets.ini in the project directory.

    [NOT TO BE USED OUTSIDE OF THIS FILE.]

    Returns:
        The ``Credentials`` section, or None when secrets.ini could not be
        read or has no such section.
    """
    parser = configparser.RawConfigParser()
    ini_path = os.path.join(util.get_project_dir(), 'secrets.ini')
    files_read = parser.read(ini_path)
    if not files_read or not parser.has_section('Credentials'):
        return None
    return parser['Credentials']
def api_key():
    """Return the consumer api_key stored in secrets.ini, or 'xxx' if unavailable."""
    credentials = __get_ini_credentials()
    if credentials is None:
        return 'xxx'
    return credentials.get(option='api_key', fallback='xxx')
def api_secret():
    """Return the consumer api_secret stored in secrets.ini, or 'yyy' if unavailable."""
    credentials = __get_ini_credentials()
    if credentials is None:
        return 'yyy'
    return credentials.get(option='api_secret', fallback='yyy')
def bearer_token():
    """Return the bearer_token stored in secrets.ini, or 'zzz' if unavailable."""
    credentials = __get_ini_credentials()
    if credentials is None:
        return 'zzz'
    return credentials.get(option='bearer_token', fallback='zzz')
def access_token():
    """Return the OAuth 1 access token stored in secrets.ini.

    The placeholder 'aaa' is returned both when the credentials section is
    unavailable and when it has no ``oauth1_access_token`` option, so the
    two "missing" cases no longer disagree.
    """
    c = __get_ini_credentials()
    if c is None:
        return 'aaa'
    return c.get(option='oauth1_access_token', fallback='aaa')
def access_secret():
    """Return the OAuth 1 access secret stored in secrets.ini.

    The placeholder 'bbb' is returned both when the credentials section is
    unavailable and when it has no ``oauth1_access_secret`` option, so the
    two "missing" cases no longer disagree.
    """
    c = __get_ini_credentials()
    if c is None:
        return 'bbb'
    return c.get(option='oauth1_access_secret', fallback='bbb')
def get_all_secrets():
    """Return all five stored credentials as ``label:value`` lines.

    The result contains the secrets in clear text.
    """
    entries = (
        ('api_key', api_key()),
        ('api_secret', api_secret()),
        ('bearer_token', bearer_token()),
        ('access_token', access_token()),
        ('access_secret', access_secret()),
    )
    return '\n'.join(f'{label}:{value}' for label, value in entries)
+39 -21
View File
@@ -1,21 +1,39 @@
## The bot's catch-up mode
# Scan all accounts for cross-company interactions.
# Terminates when finished scanning and posting.
#
# We should post, at the fastest, one tweet per minute.
import os
from util import *
from api import TwAPI
def get_local_queue():
    """Return the tweets present in queue.txt (not implemented yet: returns None)."""
    # TODO: read os.path.join(get_project_dir(), 'queue.txt')
    return None
def run():
    """Run catch-up mode: fetch tweets of one account and print each of them."""
    queue = get_local_queue()  # placeholder: the local queue is not used yet
    pairs = TwAPI.instance.get_users_all_tweets_mentions(1390620618001838086, count=5)
    for pair in pairs:
        # print_tweet expects the (tweet, mentions) tuple as ONE argument
        print_tweet(pair)
## The bot's catch-up mode
# Scan all accounts for cross-company interactions.
# Terminates when finished scanning and posting.
#
# We should post, at the fastest, one tweet per minute.
import os
import twint
from util import *
from talent_lists import *
from api import TwAPI
import talenttweet as tt
def get_local_queue():
    """Return the tweets present in queue.txt (not implemented yet: returns None)."""
    # TODO: read os.path.join(get_project_dir(), 'queue.txt')
    return None
def get_user_tweet_ids(id, limit=None):
    """Return the IDs of a user's tweets, scraped with twint.

    Args:
        id: User ID assigned to the twint config's ``User_id``.
        limit: Value assigned to the twint config's ``Limit``.
    """
    collected = []

    config = twint.Config()
    config.User_id = id
    config.Limit = limit
    # have twint store the scraped tweet objects in ``collected``
    config.Store_object = True
    config.Store_object_tweets_list = collected
    twint.run.Search(config)

    ids = []
    for scraped in collected:
        ids.append(scraped.id)
    return ids
def run():
    """Run catch-up mode: scrape tweet IDs of one account and print each as a TalentTweet."""
    queue = get_local_queue()
    scraped_ids = get_user_tweet_ids(1390620618001838086, limit=20)
    for tweet_id in scraped_ids:
        print(tt.TalentTweet(tweet_id))
+63 -64
View File
@@ -1,64 +1,63 @@
import sys
import argparse
from argparse import RawTextHelpFormatter
import talent_lists
import secrets
import catchup
import listen
from api import TwAPI
from util import is_cross_company, print_tweet
MODES_HELP_STR = '''mode to run the bot at:
l,listen: listen for new tweets from all accounts; will not terminate unless error occurs
c,catchup: scan all tweets from all accounts; will terminate when done'''


def init_argparse():
    """Build the command-line parser: optional ``mode`` plus ``--show-tokens``."""
    parser = argparse.ArgumentParser(
        description='Twitter bot that follows interactions between Nijisanji EN/ID and hololive EN/ID members.',
        formatter_class=RawTextHelpFormatter,
    )
    parser.add_argument('mode', nargs='?', help=MODES_HELP_STR)
    parser.add_argument(
        '--show-tokens',
        action='store_true',
        help='[DO NOT USE IN PUBLIC SETTING] print stored tokens from secrets.ini',
    )
    return parser
def main():
    """Parse the command line and run the bot in the requested mode."""
    parser = init_argparse()

    # no arguments at all: show the help page and stop
    if len(sys.argv) < 2:
        parser.print_help()
        return

    args = parser.parse_args()
    if args.show_tokens:
        print(secrets.get_all_secrets())
    if args.mode is None:
        return

    ## We expect to run in some mode now.
    # Initialize shared API instance
    twApi = TwAPI.instance = TwAPI()
    # Initialize talent account lists
    talent_lists.init()

    ## TEST CODE ##
    for pair in twApi.get_users_cross_tweets_mentions(1390620618001838086):
        print_tweet(pair)

    ## Determine running mode
    mode = args.mode.lower()
    if mode in ('l', 'listen'):
        print('RUNNING IN LISTEN MODE\n')
        listen.run()
    elif mode in ('c', 'catchup'):
        print('RUNNING IN CATCH-UP MODE\n')
        catchup.run()
    else:
        print('\ninvalid mode. run with no arguments or "-h" for help page, including mode list.')


if __name__ == "__main__":
    main()
import sys
import argparse
from argparse import RawTextHelpFormatter
import talent_lists
import api_secrets
import catchup
import listen
from api import TwAPI
MODES_HELP_STR = '''mode to run the bot at:
l,listen: listen for new tweets from all accounts; will not terminate unless error occurs
c,catchup: scan all tweets from all accounts; will terminate when done'''


def init_argparse():
    """Build the command-line parser: optional ``mode`` plus ``--show-tokens``."""
    parser = argparse.ArgumentParser(
        description='Twitter bot that follows interactions between Nijisanji EN/ID and hololive EN/ID members.',
        formatter_class=RawTextHelpFormatter,
    )
    parser.add_argument('mode', nargs='?', help=MODES_HELP_STR)
    parser.add_argument(
        '--show-tokens',
        action='store_true',
        help='[DO NOT USE IN PUBLIC SETTING] print stored tokens from secrets.ini',
    )
    return parser
def command_line():
    """Manual control of the bot from the command line (not implemented yet)."""
    # TODO: implement command line mode for manually controlling the bot
    return None
def main():
    """Parse the command line and run the bot in the requested mode."""
    parser = init_argparse()

    # no arguments at all: show the help page and stop
    if len(sys.argv) < 2:
        parser.print_help()
        return

    args = parser.parse_args()
    if args.show_tokens:
        print(api_secrets.get_all_secrets())
    if args.mode is None:
        return

    ## We expect to run in some mode now.
    # Initialize shared API instance
    twApi = TwAPI.instance = TwAPI()
    # Initialize talent account lists
    talent_lists.init()

    ## Determine running mode
    mode = args.mode.lower()
    if mode in ('l', 'listen'):
        print('RUNNING IN LISTEN MODE\n')
        listen.run()
    elif mode in ('c', 'catchup'):
        print('RUNNING IN CATCH-UP MODE\n')
        catchup.run()
    else:
        command_line()
        #TODO: remove message
        print('\ninvalid mode. run with no arguments or "-h" for help page, including mode list.')


if __name__ == "__main__":
    main()
+23 -21
View File
@@ -1,21 +1,23 @@
import util
# user ID -> name, one dictionary per company
niji_en = {}
holo_en = {}


def __create_dict(file, _dict):
    """Fill ``_dict`` with ``int(id) -> name`` entries parsed from ``file``.

    Only lines made of exactly two whitespace-separated words that do not
    start with '#' are used: the first word is the name, the second the ID.
    """
    with open(file, 'r') as listing:
        for line in listing:
            if line[0] == '#':
                continue
            fields = line.split()
            if len(fields) != 2:
                continue
            name, talent_id = fields
            _dict[int(talent_id)] = name
def init():
    """Populate ``holo_en`` and ``niji_en`` from the files under ``lists/``."""
    sources = (
        (f'{util.get_project_dir()}/lists/holoen.txt', holo_en),  # holoEN
        (f'{util.get_project_dir()}/lists/nijien.txt', niji_en),  # nijiEN
    )
    for list_path, target in sources:
        __create_dict(list_path, target)
import util
# user ID -> name, one dictionary per company
niji_en = {}
holo_en = {}
# user ID -> name for every talent, regardless of company
talents = {}


def __create_dict(file, _dict):
    """Fill ``_dict`` and the shared ``talents`` with entries parsed from ``file``.

    Only lines made of exactly two whitespace-separated words that do not
    start with '#' are used: the first word is the name, the second the ID.
    Each entry is stored as ``int(id) -> name`` in both dictionaries.
    """
    with open(file, 'r') as listing:
        for line in listing:
            if line[0] == '#':
                continue
            fields = line.split()
            if len(fields) != 2:
                continue
            name, raw_id = fields
            talent_id = int(raw_id)
            _dict[talent_id] = name
            talents[talent_id] = name
def init():
    """Populate ``holo_en`` and ``niji_en`` from the files under ``lists/``."""
    sources = (
        (f'{util.get_project_dir()}/lists/holoen.txt', holo_en),  # holoEN
        (f'{util.get_project_dir()}/lists/nijien.txt', niji_en),  # nijiEN
    )
    for list_path, target in sources:
        __create_dict(list_path, target)
+69
View File
@@ -0,0 +1,69 @@
import platform
import tweepy
from api import *
import talent_lists
class TalentTweet:
    """A tweet together with the set of other user IDs involved in it."""

    def __init__(self, tweet_id=None, other_parties=None, *, tweet=None):
        """Wrap an already-fetched tweet, or fetch one by ID.

        Python keeps only the last ``__init__`` of a class, so both
        construction forms are handled by this one method:

        * ``TalentTweet(tweet_id)`` fetches the tweet through
          ``TwAPI.instance.client`` and computes its involved parties.
        * ``TalentTweet(tweet=tweet, other_parties=parties)`` (or the
          positional ``TalentTweet(tweet, parties)``) wraps the objects.

        Raises:
            TypeError: If neither a tweet nor a tweet ID is given.
        """
        if tweet is None and other_parties is not None:
            # positional (tweet, other_parties) form: first argument is the tweet
            tweet, tweet_id = tweet_id, None

        if tweet is not None:
            self.tweet = tweet
            self.other_parties = set() if other_parties is None else other_parties
            return

        if tweet_id is None:
            raise TypeError('TalentTweet needs either a tweet or a tweet_id')

        resp = TwAPI.instance.client.get_tweet(tweet_id,
                                               media_fields=TwAPI.TWEET_MEDIA_FIELDS,
                                               tweet_fields=TwAPI.TWEET_FIELDS,
                                               expansions=TwAPI.TWEET_EXPANSIONS)
        self.tweet = resp.data
        self.other_parties = TwAPI.get_involved_parties(self.tweet, resp)

    def __repr__(self) -> str:
        """Return a multi-line, human-readable summary of the tweet."""
        return (
            f'{self.tweet.id} from {talent_lists.talents.get(self.tweet.author_id, "???")}:\n'
            f'{self.tweet.text}\n'
            f'------------------------------------------------------\n'
            f'{self.get_datetime_str()}\n'
            f'{self.get_mentions_usernames()}\n'
            f'Cross-company: {self.is_cross_company()}\n'
            f'======================================================'
        )

    def is_cross_company(self):
        """Return True when the author and any other party belong to different companies."""
        if not self.other_parties:
            return False
        author_id = self.tweet.author_id
        # TODO: update for EN/ID
        if author_id in talent_lists.niji_en:
            other_company = talent_lists.holo_en
        elif author_id in talent_lists.holo_en:
            other_company = talent_lists.niji_en
        else:
            return False
        return any(mention_id in other_company for mention_id in self.other_parties)

    def get_mentions_usernames(self):
        """Return the other parties' names joined by ', ' ('???' if unknown), or 'none'."""
        if not self.other_parties:
            return 'none'
        return ', '.join(
            f'{talent_lists.talents.get(party_id, "???")}' for party_id in self.other_parties
        )

    def get_datetime_str(self):
        """Return the tweet's creation time formatted like 'Sep 4 2022, 5:06AM (UTC)'."""
        # the strftime flag that drops zero padding is '#' on Windows and '-' elsewhere
        unpad = '#' if platform.system() == 'Windows' else '-'
        return self.tweet.created_at.strftime(f'%b %{unpad}d %Y, %{unpad}I:%M%p (%Z)')
class TalentTweets:
    """Holds a list of TalentTweet objects; the accessors are not implemented yet."""

    def __init__(self):
        """Start with an empty ``ttweets`` list."""
        self.ttweets = []

    def get_ttweets(self):
        """Not implemented yet; returns None."""
        return None

    def get_ttweet_ids(self):
        """Not implemented yet; returns None."""
        return None
+15 -39
View File
@@ -1,40 +1,16 @@
## Shared utility functions.
import os
import talent_lists
def get_project_dir():
    """Return the path to this project: one level up from this file's directory (src).

    The parent-directory component is appended as-is; the path is not normalized.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, os.pardir)
def is_cross_company(pair: tuple):
    """Determine if a ``(tweet, mentions)`` pair involves cross-company interaction.

    True when the tweet's author is in one company's list and at least one
    mentioned ID is in the other company's list.
    """
    tweet, mentions = pair[0], pair[1]
    author_id = tweet.author_id

    def crosses(mention_id):
        # the author's own company decides which list the mention must be in
        if author_id in talent_lists.niji_en:
            return mention_id in talent_lists.holo_en
        if author_id in talent_lists.holo_en:
            return mention_id in talent_lists.niji_en
        return False

    return any(crosses(mention_id) for mention_id in mentions)
def tweet_id_to_url(id):
    """Return the twitter.com status URL for the given tweet ID."""
    base = 'https://twitter.com/twitter/status'
    return f'{base}/{id}'
def print_tweet(pair: tuple):
    """Print a debug summary of a ``(tweet, mentions)`` pair.

    Shows the tweet's id, creation time, mentions, text, entities and
    referenced tweets.
    """
    tweet, mentions = pair
    lines = [
        f'{tweet.id}: {tweet.created_at}: involves {mentions}',
        f'{tweet.text}',
        f'-----',
        f'{tweet.entities}',
        f'{tweet.referenced_tweets}',
        f'=================================================',
    ]
    print('\n'.join(lines))
def clamp(n, smallest, largest):
## Shared utility functions.
import os
import talent_lists
import talenttweet as tt
def get_project_dir():
    """Return the path to this project: one level up from this file's directory (src).

    The parent-directory component is appended as-is; the path is not normalized.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, os.pardir)
def tweet_id_to_url(id):
    """Return the twitter.com status URL for the given tweet ID."""
    base = 'https://twitter.com/twitter/status'
    return f'{base}/{id}'
def clamp(n, smallest, largest):
    """Return ``n`` limited to the range ``smallest``..``largest``.

    The upper bound is applied first, then the lower bound.
    """
    capped = largest if largest < n else n
    return capped if capped > smallest else smallest