2022-09-24 17:56:58 -07:00
|
|
|
## Shared utility functions.
|
|
|
|
|
|
|
|
|
|
import os
|
2022-09-27 22:04:26 -07:00
|
|
|
import traceback
|
|
|
|
|
import datetime
|
2022-09-25 03:39:15 -07:00
|
|
|
|
2022-09-28 20:00:02 -07:00
|
|
|
import tweepy
|
2022-09-27 02:49:03 -07:00
|
|
|
import pytz
|
2022-09-25 03:39:15 -07:00
|
|
|
import twint
|
2022-09-27 22:04:26 -07:00
|
|
|
import twapi
|
2022-09-25 03:39:15 -07:00
|
|
|
from tweetcapture import TweetCapture
|
|
|
|
|
|
2022-11-22 21:46:42 -08:00
|
|
|
from recrop import fix_aspect_ratio
|
2022-09-25 18:31:50 -07:00
|
|
|
import talent_lists
|
2022-09-24 17:56:58 -07:00
|
|
|
|
|
|
|
|
# returns system path to this project, which is
|
2022-09-25 18:31:50 -07:00
|
|
|
# up one level from this file's directory (effective path: ..../src/../).
|
2022-09-24 17:56:58 -07:00
|
|
|
def get_project_dir():
    """Return the path one level above the directory containing this file.

    The path is produced by appending ``os.pardir`` to this file's directory
    and is returned without normalization, so it still ends with the
    parent-directory component.
    """
    this_dir = os.path.dirname(__file__)
    return os.path.join(this_dir, os.pardir)
|
|
|
|
|
|
2022-09-28 20:00:02 -07:00
|
|
|
def get_queue_path():
    """Return the path of ``queue.txt`` inside the project directory."""
    project_dir = get_project_dir()
    return f'{project_dir}/queue.txt'
|
|
|
|
|
|
|
|
|
|
def get_queue_backup_path():
    """Return the path of ``_queue_backup.txt`` inside the project directory."""
    project_dir = get_project_dir()
    return f'{project_dir}/_queue_backup.txt'
|
|
|
|
|
|
2022-09-24 17:56:58 -07:00
|
|
|
def clamp(n, smallest, largest):
    """Restrict ``n`` to the range bounded by ``smallest`` and ``largest``.

    The upper bound is applied first and the lower bound second, so when the
    bounds are inverted (``smallest > largest``) the result is ``smallest``.
    """
    # Apply the upper bound first...
    capped = largest if largest < n else n
    # ...then the lower bound.
    return capped if capped > smallest else smallest
|
|
|
|
|
|
2022-09-26 14:44:46 -07:00
|
|
|
def datetime_to_tdate(date_time: datetime.datetime):
    """Format ``date_time`` as a ``YYYY-MM-DD`` date string."""
    tdate_format = "%Y-%m-%d"
    return date_time.strftime(tdate_format)
|
|
|
|
|
|
|
|
|
|
def tdate_to_datetime(tdate: str):
    """Parse a ``YYYY-MM-DD`` date string into a ``datetime.datetime``.

    Args:
        tdate: Date string in ``%Y-%m-%d`` form.

    Returns:
        A naive ``datetime.datetime`` at midnight of the given date.

    Raises:
        ValueError: If ``tdate`` does not match the ``%Y-%m-%d`` format.
    """
    # strptime takes the string to parse first and the format second; the
    # string argument was previously missing, so every call raised TypeError.
    return datetime.datetime.strptime(tdate, "%Y-%m-%d")
|
|
|
|
|
|
2022-09-27 02:49:03 -07:00
|
|
|
def timestamp_to_tdate(timestamp=None):
    """Convert a POSIX timestamp to a ``YYYY-MM-DD`` date string in UTC.

    Args:
        timestamp: Seconds since the epoch. When ``None`` (the default), the
            current time is used.

    Returns:
        The UTC calendar date of ``timestamp`` as formatted by
        ``datetime_to_tdate``.
    """
    # Identity check: ``None`` is a singleton, and ``==`` could be overridden
    # by the argument's type.
    if timestamp is None:
        timestamp = datetime.datetime.now().timestamp()
    moment = datetime.datetime.fromtimestamp(timestamp, tz=pytz.utc)
    return datetime_to_tdate(moment)
|
|
|
|
|
|
2022-09-28 20:00:02 -07:00
|
|
|
def get_current_timestamp():
    """Return the current time as a POSIX timestamp (float seconds)."""
    now = datetime.datetime.now()
    return now.timestamp()
|
|
|
|
|
|
2022-09-27 02:49:03 -07:00
|
|
|
def get_key_from_value(d, val):
    """Return the first key in ``d`` whose value equals ``val``.

    Args:
        d: Mapping to search, in its iteration order.
        val: Value to compare each entry against with ``==``.

    Returns:
        The first matching key, or ``None`` when no value matches.
    """
    # Stop at the first match instead of collecting every matching key.
    return next((key for key, value in d.items() if value == val), None)
|
|
|
|
|
|
2022-09-25 03:39:15 -07:00
|
|
|
async def create_ttweet_image(ttweet):
    """Capture a screenshot of a tweet and return the processed result.

    The tweet URL is derived with ``ttweet_to_url`` and captured with
    ``TweetCapture`` into ``img.png`` inside the project directory. The
    capture result is then passed through ``fix_aspect_ratio``.

    Args:
        ttweet: Tweet object accepted by ``ttweet_to_url``.

    Returns:
        The value returned by ``fix_aspect_ratio`` for the capture
        (presumably the saved image path -- TODO confirm), or ``None`` if
        capturing or post-processing raised an exception.
    """
    tc = TweetCapture()
    tc.driver_path = '/usr/bin/chromedriver'  # Linux chromedriver path
    filename = f'{get_project_dir()}/img.png'
    url = ttweet_to_url(ttweet)
    print(url)

    # Best-effort removal of any previous capture; a missing or unremovable
    # file must not abort the new capture.
    try:
        os.remove(filename)
    except OSError:
        pass

    try:
        img = await tc.screenshot(
            url=url,
            path=filename,
            mode=4,
            night_mode=1,
            show_parent_tweets=True,
        )
        img = fix_aspect_ratio(img)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt still propagate.
        print('unable to create tweet image')
        traceback.print_exc()
        return None
    else:
        print(f'successfully saved {img}')
        return img
|
|
|
|
|
|
2022-09-28 13:33:31 -07:00
|
|
|
def get_tweet_url(id, username):
    """Build the twitter.com status URL for tweet ``id`` posted by ``username``."""
    profile_url = f'https://twitter.com/{username}'
    return f'{profile_url}/status/{id}'
|
|
|
|
|
|
2022-09-25 03:39:15 -07:00
|
|
|
def ttweet_to_url(ttweet):
    """Return the status URL for ``ttweet``.

    The author's username is resolved from ``ttweet.author_id`` via
    ``get_username`` and combined with ``ttweet.tweet_id``.
    """
    author_name = get_username(ttweet.author_id)
    tweet_id = ttweet.tweet_id
    return get_tweet_url(tweet_id, author_name)
|
2022-09-25 03:39:15 -07:00
|
|
|
|
2022-09-27 22:04:26 -07:00
|
|
|
def get_username_local(id):
    """Look up ``id`` in ``talent_lists.talents`` without going online.

    Returns the stored value for ``id``, or the ID formatted as a string when
    it is not present.
    """
    fallback = f'{id}'
    return talent_lists.talents.get(id, fallback)
|
2022-09-25 18:31:50 -07:00
|
|
|
|
2022-09-27 22:04:26 -07:00
|
|
|
# twint
|
|
|
|
|
# May not work with short user IDs (e.g. 1354241437)
|
|
|
|
|
# def get_username_online(id, default=None):
|
|
|
|
|
# c = twint.Config()
|
|
|
|
|
# c.User_id = id
|
|
|
|
|
# c.Store_object = True
|
|
|
|
|
# c.Hide_output = True
|
|
|
|
|
# try:
|
|
|
|
|
# twint.output.users_list.clear()
|
|
|
|
|
# twint.run.Lookup(c)
|
|
|
|
|
# user = twint.output.users_list[0]
|
|
|
|
|
# return user.username
|
|
|
|
|
# except:
|
|
|
|
|
# return str(default) if default is not None else f'{id}'
|
|
|
|
|
|
|
|
|
|
# API v2 (tweepy)
|
|
|
|
|
# Short user IDs (e.g. 1354241437) apparently don't work with twint
|
|
|
|
|
def get_username_online(id, default=None):
    """Fetch the username for a user ID through the shared tweepy client.

    Args:
        id: User ID passed to ``client.get_user``.
        default: Value to fall back on when the lookup fails. It is converted
            with ``str``; when ``None``, ``'id:<id>'`` is used instead.

    Returns:
        ``resp.data.username`` from the API response, or the fallback string
        when the request is rate limited or raises any other exception.
    """
    fallback = str(default) if default is not None else f'id:{id}'
    try:
        resp = twapi.TwAPI.instance.client.get_user(id=id)
        return resp.data.username
    except tweepy.TooManyRequests:
        # Rate limiting is an expected failure: fall back without logging.
        return fallback
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        print(f'Unhandled error retrieving username for {id}!')
        traceback.print_exc()
        return fallback
|
2022-09-27 22:04:26 -07:00
|
|
|
|
|
|
|
|
## Attempt to pull the username from the local list; look it up online if it isn't there.
|
|
|
|
|
def get_username(id):
    """Resolve a username for ``id``, preferring the local talent list.

    Args:
        id: User ID used as the key into ``talent_lists.talents``.

    Returns:
        The locally stored value when ``id`` is present in
        ``talent_lists.talents``; otherwise the result of
        ``get_username_online(id)``.
    """
    local_name = talent_lists.talents.get(id)
    # Identity check: ``None`` is the "not found" sentinel from ``dict.get``.
    if local_name is None:
        return get_username_online(id)
    return local_name
|
2022-09-27 02:49:03 -07:00
|
|
|
|
|
|
|
|
def get_user_id_local(username) -> int:
    """Find the key in ``talent_lists.talents`` whose value matches ``username``.

    The comparison is case-insensitive (both sides are lower-cased).

    Args:
        username: Username to search for.

    Returns:
        The key of the first matching entry. NOTE: despite the ``int``
        annotation, ``None`` is returned when no entry matches.
    """
    wanted = username.lower()
    # Walk keys and values together rather than indexing two parallel lists.
    for user_id, talent_username in talent_lists.talents.items():
        if talent_username.lower() == wanted:
            return user_id
    return None
|
|
|
|
|
|
|
|
|
|
def get_user_id_online(username) -> int:
    """Look up a user's ID by username through a twint lookup.

    Args:
        username: Username assigned to the twint configuration.

    Returns:
        The ``id`` attribute of the first entry in
        ``twint.output.users_list`` after the lookup, or ``-1`` if the lookup
        raises or produces no entries.
    """
    c = twint.Config()
    c.Username = username
    c.Store_object = True
    c.Hide_output = True
    try:
        # Clear the module-level result list so index 0 is from this lookup.
        twint.output.users_list.clear()
        twint.run.Lookup(c)
        user = twint.output.users_list[0]
        return user.id
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return -1
|