update regex

This commit is contained in:
Junyi Hou
2024-08-11 20:25:41 +08:00
parent 2d1176b969
commit 94ab2b6535
3 changed files with 42 additions and 82 deletions
+6
View File
@@ -0,0 +1,6 @@
{
"black-formatter.args": [
"--line-length",
"200"
]
}
+34 -78
View File
@@ -5,7 +5,6 @@ import pickle
import re
import time
from concurrent.futures import ThreadPoolExecutor
from sqlite3 import Connection, Cursor
from selenium import webdriver
from selenium.common.exceptions import UnableToSetCookieException
@@ -14,16 +13,7 @@ from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm
from utils import (
check_key,
db_close,
db_delete,
db_get_all_keys,
db_insert,
db_key_exists,
db_open,
db_remove_duplication,
)
from utils import check_key, db_close, db_delete, db_get_all_keys, db_insert, db_key_exists, db_open, db_remove_duplication
FORMAT = "%(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt="[%X]")
@@ -33,13 +23,15 @@ log = logging.getLogger("ChatGPT-API-Leakage")
class APIKeyLeakageScanner:
def __init__(self, db_file: str, keywords: list, languages: list):
self.db_file = db_file
log.info(f"📂 Opening database file {self.db_file}")
self.con, self.cur = db_open(self.db_file)
self.keywords = keywords
self.languages = languages
self.candidate_urls = [
f"https://github.com/search?q={keyword}+AND+%28%2Fsk-%5Ba-zA-Z0-9%5D%7B48%7D%2F%29+language%3A{language}&type=code&ref=advsearch"
# f"https://github.com/search?q={keyword}+AND+%28%2Fsk-%5Ba-zA-Z0-9%5D%7B48%7D%2F%29+language%3A{language}&type=code&ref=advsearch"
f"https://github.com/search?q={keyword}+AND+%28%2Fsk-proj-%5BA-Za-z0-9%5D%7B20%7DT3BlbkFJ%5BA-Za-z0-9%5D%7B20%7D%2F%29+language%3A{language}&type=code&ref=advsearch"
for language in self.languages
for keyword in self.keywords
]
@@ -59,18 +51,9 @@ class APIKeyLeakageScanner:
self.driver.add_cookie(cookie)
except UnableToSetCookieException as e:
log.debug(f"🟡 Warning, unable to set a cookie {cookie}")
except EOFError as e:
if os.path.exists("cookies.pkl"):
os.remove("cookies.pkl")
log.error(
"🔴 Error, unable to load cookies, invalid cookies has been removed, please restart."
)
except pickle.UnpicklingError as e:
if os.path.exists("cookies.pkl"):
os.remove("cookies.pkl")
log.error(
"🔴 Error, load cookies failed, invalid cookies has been removed, please restart."
)
except (EOFError, pickle.UnpicklingError):
os.remove("cookies.pkl") if os.path.exists("cookies.pkl") else None
log.error("🔴 Error, unable to load cookies, invalid cookies has been removed, please restart.")
def _test_cookies(self):
"""
@@ -79,26 +62,18 @@ class APIKeyLeakageScanner:
log.info("🤗 Redirecting ...")
self.driver.get("https://github.com/")
if self.driver.find_elements(
by=By.XPATH, value="//*[contains(text(), 'Sign in')]"
):
if self.driver.find_elements(by=By.XPATH, value="//*[contains(text(), 'Sign in')]"):
return False
return True
def _hit_rate_limit(self):
return self.driver.find_elements(
by=By.XPATH,
value="//*[contains(text(), 'You have exceeded a secondary rate limit')]",
)
def login_to_github(self):
log.info("🌍 Opening Chrome ...")
self.options = webdriver.ChromeOptions()
self.options.add_argument("--ignore-certificate-errors")
self.options.add_argument("--ignore-ssl-errors")
options = webdriver.ChromeOptions()
options.add_argument("--ignore-certificate-errors")
options.add_argument("--ignore-ssl-errors")
self.driver = webdriver.Chrome(options=self.options)
self.driver = webdriver.Chrome(options=options)
self.driver.implicitly_wait(3)
cookie_exists = os.path.exists("cookies.pkl")
@@ -113,8 +88,7 @@ class APIKeyLeakageScanner:
self._load_cookies()
if not self._test_cookies():
if os.path.exists("cookies.pkl"):
os.remove("cookies.pkl")
os.remove("cookies.pkl") if os.path.exists("cookies.pkl") else None
log.error("🔴 Error, you are not logged in, please restart and try again.")
exit(1)
@@ -122,27 +96,20 @@ class APIKeyLeakageScanner:
def _process_url(self, url: str):
self.driver.get(url)
pattern = re.compile(r"sk-[a-zA-Z0-9]{48}")
pattern = re.compile(r"sk-proj-[A-Za-z0-9]{20}T3BlbkFJ[A-Za-z0-9]{20}")
while True:
# If current webpage is reached the rate limit, then wait for 30 seconds
if self._hit_rate_limit():
if self.driver.find_elements(by=By.XPATH, value="//*[contains(text(), 'You have exceeded a secondary rate limit')]"):
for _ in tqdm(range(30), desc="⏳ Rate limit reached, waiting ..."):
time.sleep(1)
self.driver.refresh()
continue
# Expand all the code
[
element.click()
for element in self.driver.find_elements(
by=By.XPATH, value="//*[contains(text(), 'more match')]"
)
]
[element.click() for element in self.driver.find_elements(by=By.XPATH, value="//*[contains(text(), 'more match')]")]
codes = self.driver.find_elements(
by=By.CLASS_NAME, value="code-list"
) # find all elements with class name 'f4'
codes = self.driver.find_elements(by=By.CLASS_NAME, value="code-list") # find all elements with class name 'f4'
for element in codes:
apis = pattern.findall(element.text)
if len(apis) == 0:
@@ -156,20 +123,12 @@ class APIKeyLeakageScanner:
for idx, result in enumerate(results):
db_insert(self.con, self.cur, apis[idx], result)
next_buttons = self.driver.find_elements(
by=By.XPATH, value="//a[@aria-label='Next Page']"
)
next_buttons = self.driver.find_elements(by=By.XPATH, value="//a[@aria-label='Next Page']")
try:
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located(
(By.XPATH, "//a[@aria-label='Next Page']")
)
)
WebDriverWait(self.driver, 5).until(EC.presence_of_element_located((By.XPATH, "//a[@aria-label='Next Page']")))
next_buttons = self.driver.find_elements(
by=By.XPATH, value="//a[@aria-label='Next Page']"
)
next_buttons = self.driver.find_elements(by=By.XPATH, value="//a[@aria-label='Next Page']")
next_buttons[0].click()
except Exception as _:
# log.info(" ⚪️ No more pages")
@@ -177,25 +136,22 @@ class APIKeyLeakageScanner:
def _save_progress(self, from_iter: int):
with open(".progress.txt", "w") as file:
# Save the progress and timestamp
file.write(f"{from_iter}/{len(self.candidate_urls)}/{time.time()}")
def _load_progress(self):
if not os.path.exists(".progress.txt"):
return 0
with open(".progress.txt", "r") as file:
progress = file.read().strip().split("/")
last = int(progress[0])
totl = int(progress[1])
tmst = float(progress[2])
# if the time is less than 1 hour, then continue from the last progress
if time.time() - tmst < 3600 and totl == len(self.candidate_urls):
# ask the user if they want to continue from the last progress
action = input(f"🔍 Progress found, do you want to continue from the last progress ({last}/{totl})? [yes] | no: ")
if action.lower() == "yes" or action.lower() == "y" or action == "":
return int(progress[0])
else:
def load_progress(self):
progress_file = ".progress.txt"
if not os.path.exists(progress_file):
return 0
with open(progress_file, "r") as file:
last, totl, tmst = file.read().strip().split("/")
last, totl = int(last), int(totl)
if time.time() - float(tmst) < 3600 and totl == len(self.candidate_urls):
action = input(f"🔍 Progress found, do you want to continue from the last progress ({last}/{totl})? [yes] | no: ").lower()
if action in {"yes", "y", ""}:
return last
return 0
def search(self, from_iter: int = None):
@@ -206,7 +162,7 @@ class APIKeyLeakageScanner:
)
if from_iter is None:
from_iter = self._load_progress()
from_iter = self.load_progress()
for idx, url in enumerate(self.candidate_urls):
if idx < from_iter:
+1 -3
View File
@@ -14,9 +14,7 @@ def db_get_all_keys(cur: Cursor) -> list:
def db_remove_duplication(con: Connection, cur: Cursor) -> None:
cur.execute(
"CREATE TABLE temp_table as SELECT apiKey, status, MAX(lastChecked) as lastChecked FROM APIKeys GROUP BY apiKey;"
)
cur.execute("CREATE TABLE temp_table as SELECT apiKey, status, MAX(lastChecked) as lastChecked FROM APIKeys GROUP BY apiKey;")
cur.execute("DROP TABLE APIKeys;")
cur.execute("ALTER TABLE temp_table RENAME TO APIKeys;")
con.commit()