mirror of
https://github.com/Priyanshu-hawk/ChatGPT-unofficial-api-selenium.git
synced 2026-06-02 06:03:34 +02:00
Merge pull request #24 from actuallyrizzn/enhanced-stability
Enhanced Chrome Driver Management and Stability Improvements
This commit is contained in:
+368
-255
@@ -1,84 +1,155 @@
|
||||
import os
|
||||
import time
|
||||
import pyperclip
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.common.by import By
|
||||
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
|
||||
import time
|
||||
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.common.exceptions import TimeoutException, WebDriverException
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
import undetected_chromedriver as uc
|
||||
import helper_fn
|
||||
import logging
|
||||
|
||||
import os
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%m/%d/%Y %I:%M:%S %p'
|
||||
)
|
||||
|
||||
import chrome_handler
|
||||
import helper_funcs
|
||||
import sys
|
||||
class ChromeDriverManager:
|
||||
def __init__(self):
|
||||
self.driver = None
|
||||
self.max_retries = 3
|
||||
self.retry_delay = 5
|
||||
self.setup_chrome_options()
|
||||
|
||||
# current file path
|
||||
current_file_path = os.path.dirname(os.path.realpath(__file__))
|
||||
user_data_folder = current_file_path + "/chromedata"
|
||||
chrome_profile = "Default"
|
||||
|
||||
def load_chrome():
|
||||
options = Options()
|
||||
options.add_argument(f"--user-data-dir={user_data_folder}")
|
||||
options.add_argument(f'--profile-directory={chrome_profile}')
|
||||
|
||||
global helper_fn, driver
|
||||
|
||||
driver = uc.Chrome(options=options)
|
||||
helper_fn = helper_funcs.HelperFn(driver)
|
||||
|
||||
|
||||
def start_chat_gpt():
|
||||
load_chrome()
|
||||
driver.maximize_window()
|
||||
driver.get("https://chatgpt.com/")
|
||||
time.sleep(3)
|
||||
|
||||
'''
|
||||
This function only returns raw text, with no formatting... For formatted markdown text, I've made make_gpt_request_and_copy()
|
||||
'''
|
||||
def make_gpt_request(text):
|
||||
time.sleep(1)
|
||||
text_area_xpath = "//*[@id='prompt-textarea']"
|
||||
helper_fn.wait_for_element(text_area_xpath)
|
||||
if helper_fn.is_element_present(text_area_xpath):
|
||||
text_area = helper_fn.find_element(text_area_xpath)
|
||||
text_area.send_keys(text)
|
||||
|
||||
# send button
|
||||
send_btn_xpath = "//*[@data-testid='send-button']"
|
||||
def setup_chrome_options(self):
|
||||
"""Set up Chrome options for optimal stability."""
|
||||
self.options = uc.ChromeOptions()
|
||||
|
||||
# Wait until the button with data-testid="send-button" is present
|
||||
WebDriverWait(driver, 120).until(
|
||||
EC.presence_of_element_located((By.XPATH, send_btn_xpath))
|
||||
)
|
||||
# Create user data directory if it doesn't exist
|
||||
user_data_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'chrome_user_data')
|
||||
os.makedirs(user_data_dir, exist_ok=True)
|
||||
|
||||
send_btn = helper_fn.find_element(send_btn_xpath)
|
||||
time.sleep(1)
|
||||
send_btn.click()
|
||||
# Add user data directory argument
|
||||
self.options.add_argument(f'--user-data-dir={user_data_dir}')
|
||||
|
||||
# Existing options
|
||||
self.options.add_argument('--no-sandbox')
|
||||
self.options.add_argument('--disable-dev-shm-usage')
|
||||
self.options.add_argument('--disable-gpu')
|
||||
self.options.add_argument('--disable-software-rasterizer')
|
||||
self.options.add_argument('--disable-extensions')
|
||||
self.options.add_argument('--disable-notifications')
|
||||
self.options.add_argument('--disable-popup-blocking')
|
||||
self.options.add_argument('--ignore-certificate-errors')
|
||||
self.options.add_argument('--start-maximized')
|
||||
self.options.add_argument('--disable-blink-features=AutomationControlled')
|
||||
self.options.add_argument('--disable-logging')
|
||||
self.options.add_argument('--log-level=3')
|
||||
self.options.add_argument('--silent')
|
||||
self.options.add_argument('--disable-automation')
|
||||
self.options.add_argument('--disable-infobars')
|
||||
self.options.add_argument('--disable-blink-features')
|
||||
self.options.add_argument('--disable-blink-features=AutomationControlled')
|
||||
self.options.add_argument('--disable-web-security')
|
||||
self.options.add_argument('--allow-running-insecure-content')
|
||||
|
||||
helper_fn.wait_for_x_seconds(5)
|
||||
# waiting for response
|
||||
response_xpath_light = "//*[@class='markdown prose w-full break-words dark:prose-invert light']" # for light mode
|
||||
response_xpath_dark = "//*[@class='markdown prose w-full break-words dark:prose-invert dark']" # for dark mode
|
||||
regenrate_xpath = '//*[@id="__next"]/div[1]/div[2]/main/div[1]/div[2]/div[1]/div/form/div/div[2]/div/div/button'
|
||||
|
||||
# Change this line to wait for send button instead of regenrate button
|
||||
WebDriverWait(driver, 120).until(
|
||||
EC.presence_of_element_located((By.XPATH, send_btn_xpath))
|
||||
)
|
||||
def initialize_driver(self):
|
||||
"""Initialize Chrome driver with retries."""
|
||||
for attempt in range(self.max_retries):
|
||||
try:
|
||||
if self.driver:
|
||||
try:
|
||||
self.driver.quit()
|
||||
except:
|
||||
pass
|
||||
|
||||
self.driver = uc.Chrome(options=self.options)
|
||||
self.driver.set_page_load_timeout(30)
|
||||
helper_fn.set_driver(self.driver) # Set the driver in helper_fn
|
||||
logging.info("Chrome driver initialized successfully")
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Attempt {attempt + 1} failed to initialize Chrome driver: {str(e)}")
|
||||
if attempt < self.max_retries - 1:
|
||||
time.sleep(self.retry_delay)
|
||||
else:
|
||||
logging.error("Failed to initialize Chrome driver after all retries")
|
||||
raise
|
||||
|
||||
response_xpath = response_xpath_dark if helper_fn.is_element_present(response_xpath_dark) else response_xpath_light # check for dark mode or light mode
|
||||
if helper_fn.is_element_present(response_xpath):
|
||||
helper_fn.wait_for_x_seconds(1)
|
||||
response = helper_fn.find_elements(response_xpath)[-1]
|
||||
return response.text # will return all the textual information under that particular xpath
|
||||
def ensure_driver_alive(self):
|
||||
"""Check if driver is alive and reconnect if necessary."""
|
||||
try:
|
||||
# Try to get window handles to check if driver is responsive
|
||||
self.driver.window_handles
|
||||
return True
|
||||
except:
|
||||
logging.warning("Chrome driver appears to be dead, attempting to reconnect...")
|
||||
return self.initialize_driver()
|
||||
|
||||
def quit(self):
|
||||
"""Safely quit the Chrome driver."""
|
||||
if self.driver:
|
||||
try:
|
||||
self.driver.quit()
|
||||
except:
|
||||
pass
|
||||
self.driver = None
|
||||
|
||||
# Initialize the global ChromeDriverManager
|
||||
driver_manager = ChromeDriverManager()
|
||||
driver = None
|
||||
|
||||
def initialize_chrome():
|
||||
"""Initialize Chrome and navigate to ChatGPT."""
|
||||
global driver
|
||||
try:
|
||||
if driver_manager.initialize_driver():
|
||||
driver = driver_manager.driver
|
||||
driver.get("https://chat.openai.com/")
|
||||
logging.info("Successfully navigated to ChatGPT")
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize Chrome: {str(e)}")
|
||||
return False
|
||||
|
||||
def ensure_chrome_alive():
|
||||
"""Ensure Chrome is alive and reconnect if necessary."""
|
||||
global driver
|
||||
if driver_manager.ensure_driver_alive():
|
||||
driver = driver_manager.driver
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_response_complete():
|
||||
"""Check if the response is complete using multiple methods."""
|
||||
try:
|
||||
# Method 1: Check for the presence of typing indicators
|
||||
typing_indicators = driver.find_elements(By.XPATH, "//div[contains(@class, 'result-streaming')]")
|
||||
if not typing_indicators:
|
||||
logging.info("No typing indicators found")
|
||||
return True
|
||||
|
||||
# Method 2: Check if the response text has stopped changing
|
||||
response_container_xpath = "//div[contains(@class, 'markdown prose w-full break-words')]"
|
||||
current_response = driver.find_elements(By.XPATH, response_container_xpath)[-1].text
|
||||
time.sleep(2) # Brief wait
|
||||
new_response = driver.find_elements(By.XPATH, response_container_xpath)[-1].text
|
||||
if current_response == new_response and current_response != "":
|
||||
logging.info("Response text has stabilized")
|
||||
return True
|
||||
|
||||
logging.info("Response still in progress...")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logging.warning(f"Error checking response completion: {str(e)}")
|
||||
return False
|
||||
|
||||
def make_gpt_request_and_copy(text):
|
||||
"""
|
||||
@@ -88,237 +159,279 @@ def make_gpt_request_and_copy(text):
|
||||
Returns:
|
||||
str: Formatted response text from clipboard
|
||||
"""
|
||||
global driver
|
||||
|
||||
try:
|
||||
print("Starting GPT request process...")
|
||||
if not ensure_chrome_alive():
|
||||
raise Exception("Failed to ensure Chrome is alive")
|
||||
|
||||
logging.info("Starting GPT request process...")
|
||||
# Initial pause to ensure page is loaded
|
||||
time.sleep(1.5)
|
||||
|
||||
try:
|
||||
# 1. Check for "Stay logged out" button first
|
||||
print("Step 1a: Checking for 'Stay logged out' button...")
|
||||
logging.info("Step 1a: Checking for 'Stay logged out' button...")
|
||||
try:
|
||||
stay_logged_out_xpath = "//a[contains(text(), 'Stay logged out')]"
|
||||
stay_logged_out = WebDriverWait(driver, 1).until(
|
||||
EC.presence_of_element_located((By.XPATH, stay_logged_out_xpath))
|
||||
)
|
||||
print("🔒 'Stay logged out' button found, clicking it...")
|
||||
stay_logged_out.click()
|
||||
time.sleep(1)
|
||||
print("✓ Clicked 'Stay logged out'")
|
||||
except TimeoutException:
|
||||
print("💡 No 'Stay logged out' button found, continuing...")
|
||||
if helper_fn.is_element_present(stay_logged_out_xpath, timeout=1):
|
||||
logging.info("'Stay logged out' button found, clicking it...")
|
||||
helper_fn.click_element(stay_logged_out_xpath)
|
||||
time.sleep(1)
|
||||
logging.info("Clicked 'Stay logged out'")
|
||||
except Exception as logout_error:
|
||||
print(f"⚠️ Stay logged out check encountered an error: {str(logout_error)}")
|
||||
logging.warning(f"Stay logged out check encountered an error: {str(logout_error)}")
|
||||
|
||||
# 1b. Original text input logic
|
||||
print("Step 1b: Finding text input area...")
|
||||
# 1b. Text input logic
|
||||
logging.info("Step 1b: Finding text input area...")
|
||||
text_area_xpath = "//*[@id='prompt-textarea']"
|
||||
helper_fn.wait_for_element(text_area_xpath)
|
||||
if helper_fn.is_element_present(text_area_xpath):
|
||||
text_area = helper_fn.find_element(text_area_xpath)
|
||||
text_area.send_keys(text)
|
||||
print("✓ Text input successful")
|
||||
if helper_fn.send_keys(text_area_xpath, text):
|
||||
logging.info("Text input successful")
|
||||
else:
|
||||
raise Exception("Text area not found")
|
||||
raise Exception("Failed to input text")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error in Step 1 (Text Input): {str(e)}")
|
||||
logging.error(f"Error in Step 1 (Text Input): {str(e)}")
|
||||
raise
|
||||
|
||||
try:
|
||||
# 2. Find and click send button
|
||||
print("Step 2: Finding and clicking send button...")
|
||||
logging.info("Step 2: Finding and clicking send button...")
|
||||
send_btn_xpath = "//*[@data-testid='send-button']"
|
||||
WebDriverWait(driver, 10).until(
|
||||
EC.presence_of_element_located((By.XPATH, send_btn_xpath))
|
||||
)
|
||||
send_btn = helper_fn.find_element(send_btn_xpath)
|
||||
time.sleep(1)
|
||||
send_btn.click()
|
||||
print("✓ Send button clicked")
|
||||
if helper_fn.click_element(send_btn_xpath):
|
||||
logging.info("Send button clicked")
|
||||
else:
|
||||
raise Exception("Failed to click send button")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error in Step 2 (Send Button): {str(e)}")
|
||||
logging.error(f"Error in Step 2 (Send Button): {str(e)}")
|
||||
raise
|
||||
|
||||
try:
|
||||
# 3. Wait for response container
|
||||
print("Step 3: Waiting for response container...")
|
||||
"""
|
||||
There is a new mode, in ChatGPT --> Canvas, it is answered in canvas (detected by the text ),
|
||||
then change it to chat mode here.
|
||||
"""
|
||||
logging.info("Step 3: Waiting for response container...")
|
||||
try:
|
||||
canvas_button = WebDriverWait(driver, 5).until(
|
||||
EC.presence_of_element_located((By.XPATH, "//button[contains(., 'Answer in chat instead')]"))
|
||||
)
|
||||
print("📝 Canvas mode detected, switching to chat mode...")
|
||||
canvas_button.click()
|
||||
# Wait for transition
|
||||
time.sleep(2)
|
||||
print("✓ Switched to chat mode")
|
||||
except TimeoutException:
|
||||
print("💬 Already in chat mode")
|
||||
canvas_button_xpath = "//button[contains(., 'Answer in chat instead')]"
|
||||
if helper_fn.is_element_present(canvas_button_xpath, timeout=5):
|
||||
logging.info("Canvas mode detected, switching to chat mode...")
|
||||
helper_fn.click_element(canvas_button_xpath)
|
||||
time.sleep(2)
|
||||
logging.info("Switched to chat mode")
|
||||
else:
|
||||
logging.info("Already in chat mode")
|
||||
except Exception as canvas_error:
|
||||
print(f"⚠️ Canvas check encountered an error: {str(canvas_error)}")
|
||||
# Continue anyway as this might be a false negative
|
||||
|
||||
# Short wait for UI to stabilize
|
||||
time.sleep(2)
|
||||
|
||||
response_container_xpath = "//div[contains(@class, 'markdown prose w-full break-words')]"
|
||||
WebDriverWait(driver, 30).until(
|
||||
EC.presence_of_element_located((By.XPATH, response_container_xpath))
|
||||
)
|
||||
print("✓ Response container found")
|
||||
except Exception as e:
|
||||
print(f"❌ Error in Step 3 (Response Container): {str(e)}")
|
||||
raise
|
||||
|
||||
# 4. Wait for response to appear and complete
|
||||
print("Step 4: Waiting for response...")
|
||||
try:
|
||||
# First, wait for response container to appear
|
||||
response_container_xpath = "//div[contains(@class, 'markdown prose w-full break-words')]"
|
||||
WebDriverWait(driver, 30).until(
|
||||
EC.presence_of_element_located((By.XPATH, response_container_xpath))
|
||||
)
|
||||
print("✓ Initial response detected")
|
||||
logging.warning(f"Canvas check encountered an error: {str(canvas_error)}")
|
||||
|
||||
# 4. Wait for response to appear and complete
|
||||
logging.info("Step 4: Waiting for response...")
|
||||
try:
|
||||
# Then wait for the typing indicator to disappear
|
||||
typing_indicator_xpath = "//div[contains(@class, 'result-streaming')]"
|
||||
WebDriverWait(driver, 180).until_not(
|
||||
EC.presence_of_element_located((By.XPATH, typing_indicator_xpath))
|
||||
)
|
||||
print("✓ Response completion confirmed")
|
||||
except TimeoutException:
|
||||
print("⚠️ Response timeout - ChatGPT may have entered power saving mode")
|
||||
print("ℹ️ Please check your browser tab - ChatGPT requires tab focus to continue processing")
|
||||
raise Exception("ChatGPT entered power saving mode - Please ensure the browser tab is active")
|
||||
# First, wait for response container to appear
|
||||
response_container_xpath = "//div[contains(@class, 'markdown prose w-full break-words')]"
|
||||
if helper_fn.wait_for_element(response_container_xpath, timeout=30):
|
||||
logging.info("Initial response detected")
|
||||
|
||||
# Wait for response completion with timeout
|
||||
start_time = time.time()
|
||||
timeout = 180 # 3 minutes timeout
|
||||
last_length = 0
|
||||
stable_count = 0
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
if not ensure_chrome_alive():
|
||||
raise Exception("Chrome driver died during response wait")
|
||||
|
||||
try:
|
||||
# Get current response
|
||||
responses = helper_fn.find_elements(response_container_xpath)
|
||||
if responses:
|
||||
current_text = responses[-1].text
|
||||
current_length = len(current_text)
|
||||
|
||||
logging.debug(f"Current response length: {current_length}, Last length: {last_length}")
|
||||
|
||||
# If text length hasn't changed and response seems complete
|
||||
if current_length > 0 and current_length == last_length:
|
||||
if is_response_complete():
|
||||
stable_count += 1
|
||||
logging.info(f"Response stable for {stable_count} checks")
|
||||
if stable_count >= 3: # Require 3 consecutive stable checks
|
||||
time.sleep(2) # Final verification wait
|
||||
if current_length == len(responses[-1].text):
|
||||
logging.info("Response completion confirmed")
|
||||
break
|
||||
else:
|
||||
stable_count = 0 # Reset stable count if response is not complete
|
||||
else:
|
||||
stable_count = 0 # Reset stable count if length changed
|
||||
|
||||
last_length = current_length
|
||||
|
||||
except Exception as e:
|
||||
logging.warning(f"Error while checking response: {str(e)}")
|
||||
stable_count = 0 # Reset stable count on error
|
||||
|
||||
time.sleep(2) # Increased wait time to reduce CPU usage
|
||||
|
||||
if time.time() - start_time >= timeout:
|
||||
logging.warning("Response timeout reached")
|
||||
raise TimeoutException("Response generation timed out")
|
||||
|
||||
else:
|
||||
raise Exception("Response container not found")
|
||||
|
||||
except TimeoutException as te:
|
||||
logging.error(f"Timeout while waiting for response: {str(te)}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.error(f"Error in Step 4 (Response Detection): {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
# Short wait for UI to stabilize
|
||||
time.sleep(2)
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error in Step 4 (Response Detection): {str(e)}")
|
||||
logging.error(f"Error in Step 3 (Response Container): {str(e)}")
|
||||
raise
|
||||
|
||||
# 5. Find and handle copy button
|
||||
print("Step 5: Locating copy button...")
|
||||
# 5. Get response text directly
|
||||
logging.info("Step 5: Getting response text...")
|
||||
try:
|
||||
copy_button_xpath = "//*[@data-testid='copy-turn-action-button']"
|
||||
|
||||
# Wait for copy buttons with multiple conditions
|
||||
WebDriverWait(driver, 1000).until(
|
||||
EC.presence_of_all_elements_located((By.XPATH, copy_button_xpath))
|
||||
)
|
||||
|
||||
# Get all copy buttons
|
||||
copy_buttons = driver.find_elements(By.XPATH, copy_button_xpath)
|
||||
|
||||
if not copy_buttons:
|
||||
raise Exception("No copy buttons found after waiting")
|
||||
|
||||
# Get the last copy button
|
||||
last_copy_button = copy_buttons[-1]
|
||||
|
||||
# Scroll to button and ensure it's in view
|
||||
driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", last_copy_button)
|
||||
time.sleep(1)
|
||||
|
||||
# Try multiple click methods
|
||||
try:
|
||||
# Try regular click first
|
||||
last_copy_button.click()
|
||||
except:
|
||||
try:
|
||||
# Try JavaScript click if regular click fails
|
||||
driver.execute_script("arguments[0].click();", last_copy_button)
|
||||
except Exception as js_error:
|
||||
print(f"❌ Both click methods failed: {str(js_error)}")
|
||||
raise
|
||||
|
||||
time.sleep(1)
|
||||
import pyperclip
|
||||
clipboard_content = pyperclip.paste()
|
||||
print("✓ Content successfully copied")
|
||||
return clipboard_content
|
||||
|
||||
except Exception as copy_error:
|
||||
print(f"❌ Error in copy button handling: {str(copy_error)}")
|
||||
|
||||
# Fallback: Get text directly from the response
|
||||
print("Attempting fallback method...")
|
||||
try:
|
||||
response_elements = driver.find_elements(By.XPATH, response_container_xpath)
|
||||
if response_elements:
|
||||
last_response = response_elements[-1]
|
||||
# Use JavaScript to get formatted text
|
||||
formatted_text = driver.execute_script("""
|
||||
var element = arguments[0];
|
||||
var text = '';
|
||||
function extractText(node) {
|
||||
if (node.nodeType === Node.TEXT_NODE) {
|
||||
text += node.textContent + '\\n';
|
||||
} else if (node.nodeName === 'PRE') {
|
||||
text += '```\\n' + node.textContent + '\\n```\\n';
|
||||
} else if (node.nodeName === 'CODE') {
|
||||
text += '`' + node.textContent + '`';
|
||||
} else {
|
||||
for (var child of node.childNodes) {
|
||||
extractText(child);
|
||||
response_elements = helper_fn.find_elements(response_container_xpath)
|
||||
if response_elements:
|
||||
last_response = response_elements[-1]
|
||||
# Use JavaScript to get formatted text
|
||||
formatted_text = driver.execute_script("""
|
||||
var element = arguments[0];
|
||||
var text = '';
|
||||
var lastWasNewline = false;
|
||||
var lastWasSpace = false;
|
||||
|
||||
function extractText(node) {
|
||||
if (node.nodeType === Node.TEXT_NODE) {
|
||||
var content = node.textContent.trim();
|
||||
if (content) {
|
||||
if (lastWasNewline) {
|
||||
text += '\\n';
|
||||
}
|
||||
text += content;
|
||||
lastWasNewline = false;
|
||||
lastWasSpace = false;
|
||||
}
|
||||
} else if (node.nodeName === 'PRE') {
|
||||
// Handle code blocks
|
||||
if (lastWasNewline) {
|
||||
text += '\\n';
|
||||
}
|
||||
text += '```\\n' + node.textContent + '\\n```\\n';
|
||||
lastWasNewline = true;
|
||||
lastWasSpace = false;
|
||||
} else if (node.nodeName === 'CODE') {
|
||||
// Handle inline code
|
||||
text += '`' + node.textContent + '`';
|
||||
lastWasNewline = false;
|
||||
lastWasSpace = false;
|
||||
} else if (node.nodeName === 'STRONG' || node.nodeName === 'B') {
|
||||
// Handle bold text
|
||||
if (!lastWasSpace) {
|
||||
text += ' ';
|
||||
}
|
||||
text += '**' + node.textContent + '**';
|
||||
lastWasNewline = false;
|
||||
lastWasSpace = true;
|
||||
} else if (node.nodeName === 'EM' || node.nodeName === 'I') {
|
||||
// Handle italic text
|
||||
if (!lastWasSpace) {
|
||||
text += ' ';
|
||||
}
|
||||
text += '*' + node.textContent + '*';
|
||||
lastWasNewline = false;
|
||||
lastWasSpace = true;
|
||||
} else if (node.nodeName === 'BLOCKQUOTE') {
|
||||
// Handle blockquotes
|
||||
if (lastWasNewline) {
|
||||
text += '\\n';
|
||||
}
|
||||
text += '> ' + node.textContent.trim().replace(/\\n/g, '\\n> ') + '\\n';
|
||||
lastWasNewline = true;
|
||||
lastWasSpace = false;
|
||||
} else if (node.nodeName === 'HR') {
|
||||
// Handle horizontal rules
|
||||
if (lastWasNewline) {
|
||||
text += '\\n';
|
||||
}
|
||||
text += '\\n---\\n\\n';
|
||||
lastWasNewline = true;
|
||||
lastWasSpace = false;
|
||||
} else if (node.nodeName === 'UL' || node.nodeName === 'OL') {
|
||||
// Handle lists
|
||||
if (lastWasNewline) {
|
||||
text += '\\n';
|
||||
}
|
||||
var items = node.getElementsByTagName('li');
|
||||
for (var i = 0; i < items.length; i++) {
|
||||
text += (node.nodeName === 'OL' ? (i + 1) + '. ' : '- ') + items[i].textContent.trim() + '\\n';
|
||||
}
|
||||
lastWasNewline = true;
|
||||
lastWasSpace = false;
|
||||
} else if (node.nodeName === 'P') {
|
||||
// Handle paragraphs
|
||||
if (lastWasNewline) {
|
||||
text += '\\n';
|
||||
}
|
||||
for (var child of node.childNodes) {
|
||||
extractText(child);
|
||||
}
|
||||
text += '\\n';
|
||||
lastWasNewline = true;
|
||||
lastWasSpace = false;
|
||||
} else if (node.nodeName === 'BR') {
|
||||
// Handle line breaks
|
||||
text += '\\n';
|
||||
lastWasNewline = true;
|
||||
lastWasSpace = false;
|
||||
} else {
|
||||
// Process child nodes
|
||||
for (var child of node.childNodes) {
|
||||
extractText(child);
|
||||
}
|
||||
}
|
||||
extractText(element);
|
||||
return text.trim();
|
||||
""", last_response)
|
||||
print("✓ Fallback method successful")
|
||||
return formatted_text
|
||||
else:
|
||||
raise Exception("No response elements found")
|
||||
except Exception as fallback_error:
|
||||
print(f"❌ Fallback method failed: {str(fallback_error)}")
|
||||
|
||||
# Fallback : Second Method
|
||||
print("Attempting fallback method...")
|
||||
try:
|
||||
response_elements = driver.find_elements(
|
||||
By.XPATH,
|
||||
"//div[contains(@class, 'markdown prose w-full break-words')]"
|
||||
)
|
||||
if response_elements:
|
||||
fallback_text = response_elements[-1].text
|
||||
print("✓ Fallback method successful")
|
||||
return fallback_text
|
||||
else:
|
||||
print("❌ Fallback method failed: No response elements found")
|
||||
except Exception as fallback_error:
|
||||
print(f"❌ Fallback method failed: {str(fallback_error)}")
|
||||
}
|
||||
|
||||
extractText(element);
|
||||
|
||||
// Clean up excessive newlines
|
||||
text = text.replace(/\\n{3,}/g, '\\n\\n');
|
||||
|
||||
// Ensure proper spacing around horizontal rules
|
||||
text = text.replace(/\\n---\\n/g, '\\n\\n---\\n\\n');
|
||||
|
||||
// Clean up any double spaces
|
||||
text = text.replace(/ +/g, ' ');
|
||||
|
||||
return text.trim();
|
||||
""", last_response)
|
||||
logging.info("Successfully extracted response text")
|
||||
return formatted_text
|
||||
else:
|
||||
raise Exception("No response elements found")
|
||||
except Exception as e:
|
||||
logging.error(f"Error extracting response text: {str(e)}")
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Process failed: {str(e)}")
|
||||
logging.error(f"Process failed: {str(e)}")
|
||||
if not ensure_chrome_alive():
|
||||
logging.error("Chrome driver died during process")
|
||||
return f"Error occurred: {str(e)}"
|
||||
|
||||
def stop_chat_gpt():
|
||||
driver.close()
|
||||
driver.quit()
|
||||
def cleanup():
|
||||
"""Clean up resources."""
|
||||
driver_manager.quit()
|
||||
|
||||
# chrome_handler.kill_chrome()
|
||||
|
||||
if __name__ == "__main__":
|
||||
start_chat_gpt()
|
||||
|
||||
initialize_chrome()
|
||||
try:
|
||||
while True:
|
||||
req = input("Enter text: ")
|
||||
if req == "!quit":
|
||||
break
|
||||
resp = make_gpt_request_and_copy(req)
|
||||
print(resp)
|
||||
except KeyboardInterrupt:
|
||||
print("KeyboardInterrupt detected, exiting...")
|
||||
response = make_gpt_request_and_copy("Hello, how are you?")
|
||||
print(response)
|
||||
finally:
|
||||
stop_chat_gpt()
|
||||
cleanup()
|
||||
|
||||
+176
@@ -0,0 +1,176 @@
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
||||
import logging
|
||||
import pyperclip
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
|
||||
# Global variables
|
||||
driver = None
|
||||
DEFAULT_TIMEOUT = 10
|
||||
|
||||
def set_driver(webdriver):
|
||||
"""Set the global driver instance."""
|
||||
global driver
|
||||
driver = webdriver
|
||||
|
||||
def wait_for_element(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Wait for an element to be present and visible."""
|
||||
try:
|
||||
element = WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
return element
|
||||
except TimeoutException:
|
||||
logging.warning(f"Timeout waiting for element: {xpath}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"Error waiting for element {xpath}: {str(e)}")
|
||||
return None
|
||||
|
||||
def is_element_present(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Check if an element is present on the page."""
|
||||
try:
|
||||
WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
return True
|
||||
except (TimeoutException, NoSuchElementException):
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Error checking element presence {xpath}: {str(e)}")
|
||||
return False
|
||||
|
||||
def find_element(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Find an element by xpath with wait."""
|
||||
try:
|
||||
element = WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
return element
|
||||
except TimeoutException:
|
||||
logging.warning(f"Element not found: {xpath}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"Error finding element {xpath}: {str(e)}")
|
||||
return None
|
||||
|
||||
def find_elements(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Find all elements matching xpath with wait."""
|
||||
try:
|
||||
elements = WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_all_elements_located((By.XPATH, xpath))
|
||||
)
|
||||
return elements
|
||||
except TimeoutException:
|
||||
logging.warning(f"No elements found: {xpath}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logging.error(f"Error finding elements {xpath}: {str(e)}")
|
||||
return []
|
||||
|
||||
def click_element(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Click an element with wait and retry."""
|
||||
try:
|
||||
element = WebDriverWait(driver, timeout).until(
|
||||
EC.element_to_be_clickable((By.XPATH, xpath))
|
||||
)
|
||||
element.click()
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Error clicking element {xpath}: {str(e)}")
|
||||
try:
|
||||
# Fallback to JavaScript click
|
||||
element = find_element(xpath)
|
||||
if element:
|
||||
driver.execute_script("arguments[0].click();", element)
|
||||
return True
|
||||
except Exception as js_error:
|
||||
logging.error(f"JavaScript click failed for {xpath}: {str(js_error)}")
|
||||
return False
|
||||
|
||||
def send_keys(xpath, text, timeout=DEFAULT_TIMEOUT):
|
||||
"""Send keys to an element with wait using clipboard to handle non-BMP characters."""
|
||||
try:
|
||||
element = WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
element.clear()
|
||||
|
||||
# Store current clipboard content
|
||||
original_clipboard = pyperclip.paste()
|
||||
|
||||
# Copy text to clipboard
|
||||
pyperclip.copy(text)
|
||||
|
||||
# Use Ctrl+V to paste
|
||||
element.send_keys(Keys.CONTROL + 'v')
|
||||
|
||||
# Restore original clipboard content
|
||||
pyperclip.copy(original_clipboard)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending keys to element {xpath}: {str(e)}")
|
||||
return False
|
||||
|
||||
def wait_for_text_present(xpath, text, timeout=DEFAULT_TIMEOUT):
|
||||
"""Wait for text to be present in element."""
|
||||
try:
|
||||
return WebDriverWait(driver, timeout).until(
|
||||
EC.text_to_be_present_in_element((By.XPATH, xpath), text)
|
||||
)
|
||||
except TimeoutException:
|
||||
logging.warning(f"Text '{text}' not found in element: {xpath}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Error waiting for text in element {xpath}: {str(e)}")
|
||||
return False
|
||||
|
||||
def wait_for_element_invisible(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Wait for an element to become invisible."""
|
||||
try:
|
||||
return WebDriverWait(driver, timeout).until(
|
||||
EC.invisibility_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
except TimeoutException:
|
||||
logging.warning(f"Element still visible: {xpath}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Error waiting for element invisibility {xpath}: {str(e)}")
|
||||
return False
|
||||
|
||||
def get_element_text(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Get text from an element with wait."""
|
||||
try:
|
||||
element = WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
return element.text
|
||||
except Exception as e:
|
||||
logging.error(f"Error getting text from element {xpath}: {str(e)}")
|
||||
return None
|
||||
|
||||
def is_element_enabled(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Check if an element is enabled."""
|
||||
try:
|
||||
element = WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
return element.is_enabled()
|
||||
except Exception as e:
|
||||
logging.error(f"Error checking if element is enabled {xpath}: {str(e)}")
|
||||
return False
|
||||
|
||||
def scroll_into_view(xpath, timeout=DEFAULT_TIMEOUT):
|
||||
"""Scroll element into view."""
|
||||
try:
|
||||
element = WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.XPATH, xpath))
|
||||
)
|
||||
driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", element)
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Error scrolling element into view {xpath}: {str(e)}")
|
||||
return False
|
||||
Reference in New Issue
Block a user