
In this post, we’ll explore the development of a web scraper that uses AI to help users extract information from websites. The project combines Selenium for web scraping, Streamlit for the interactive interface, and language models served through LM Studio and Groq to answer user queries.

Background

Web scraping is the process of automatically extracting data from websites using specialized software or algorithms. While it can be a useful tool for gathering information, it can also raise ethical concerns if not done responsibly. In this project, we aimed to create a web scraper that not only extracts data but also provides users with helpful insights and answers to their questions.

Project Overview

The Selfhood Web Scraper is an open-source project written in Python. Selenium navigates websites and extracts their content, Streamlit provides the interactive interface, and AI models answer user questions about the scraped data.

Key Features

  1. Web Scraping: The scraper uses Selenium to load pages in headless Chrome and capture each page’s title and visible text. It can follow pagination when given a CSS selector for the site’s “Next” button.
  2. AI Model Integration: The project can query models through two back ends, a local LM Studio server and the hosted Groq API, to answer user questions about the scraped data.
  3. Interactive Interface: A Streamlit interface lets users enter website URLs, select a model type, and ask questions.

Implementation Details

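The script is organized into three groups of functions:

  1. Web Scraping: setup_webdriver configures headless Chrome, and scrape_website loads each page, records its title and body text, and clicks the “Next” button until it runs out of pages or reaches the page limit.
  2. AI Model Integration: query_lmstudio and query_groq send the scraped text and the user’s question to an OpenAI-style chat completions endpoint, and query_model dispatches to whichever back end the user selected.
  3. Interactive Interface: main builds the Streamlit interface with two tabs, one for scraping and one for asking questions, and the remaining helpers save answers and manage the conversation history.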
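
The way scraped pages become a prompt can be sketched in isolation. The snippet below is a minimal illustration rather than the project's exact code: the page format and system prompt mirror the script, while the function name build_messages and the max_context_chars character budget are assumptions added here to keep long scrapes within a model's context window.

```python
from typing import Dict, List

SYSTEM_PROMPT = (
    "You are a helpful assistant that answers questions "
    "based on the provided context."
)


def build_messages(question: str, pages: List[Dict], max_context_chars: int = 12000) -> List[Dict[str, str]]:
    """Join scraped pages into one context string and wrap it in chat messages."""
    context = "\n\n".join(
        f"Page {page['page_number']}:\n{page['title']}\n{page['content']}"
        for page in pages
    )
    # Assumption (not in the original script): cut the context at a character
    # budget so the request is less likely to exceed the model's context window.
    if len(context) > max_context_chars:
        context = context[:max_context_chars]
    return [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Context: {context}\n\nQuestion: {question}"},
    ]


pages = [{"page_number": 1, "title": "Example Domain", "content": "Sample body text."}]
messages = build_messages("What is this page about?", pages)
print(messages[1]["content"])
```

A character budget is only a rough stand-in for a token limit. A tokenizer-based limit would be more precise, at the cost of an extra dependency.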

Benefits

The Selfhood Web Scraper with AI Assistant provides several benefits, including:

  1. Efficient Data Extraction: The scraper collects page content automatically, which makes it a useful tool for data analysts and researchers.
  2. Insights and Answers: The AI models answer questions about the scraped data, which helps users who need support with their research or projects.
  3. Interactive Interface: The Streamlit interface makes it easy to enter website URLs, select a model type, and ask questions.

Conclusion

The Selfhood Web Scraper with AI Assistant combines web scraping with AI to give users efficient data extraction and answers grounded in the scraped content. While it has several benefits, it also raises ethical concerns about data usage and privacy. As web scrapers continue to evolve, we need to address these concerns and develop responsible practices for using this technology.

Future Work

There are several areas for future work in this project:

  1. Improving Data Quality: Add more sophisticated algorithms for cleaning and validating the scraped data.
  2. Integrating More AI Models: Support additional models and providers to improve the accuracy of the answers to user queries.
  3. Developing Responsible Practices: Define guidelines for data usage and privacy when using web scrapers.
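
As one possible starting point for the data-quality item, here is a small cleaning pass over scraped page text. It is a sketch under stated assumptions, not part of the project: the function name clean_scraped_text is hypothetical, and the rules (collapse whitespace, drop blank lines, drop repeated lines) are just one simple choice.

```python
import re
from typing import List


def clean_scraped_text(text: str) -> str:
    """Normalize whitespace and drop blank and repeated lines from page text."""
    seen = set()
    cleaned: List[str] = []
    for line in text.splitlines():
        # Collapse runs of spaces and tabs, then trim both ends
        line = re.sub(r"[ \t]+", " ", line).strip()
        if not line:
            continue  # skip blank lines
        if line in seen:
            continue  # skip lines already seen, e.g. repeated navigation links
        seen.add(line)
        cleaned.append(line)
    return "\n".join(cleaned)


raw = "Home   About\n\nHome   About\nWelcome to   the site.\n"
print(clean_scraped_text(raw))
```

Dropping every repeated line is aggressive: it removes duplicated menus and footers, but it would also remove legitimately repeated content such as identical table rows, so a real pipeline may want a narrower rule.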

By addressing these areas, we can further improve the Selfhood Web Scraper with AI Assistant and make it a valuable tool for users who need help with their research or projects.

Source Code

The complete script is listed below.

import streamlit as st
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import requests
import json
import os
from datetime import datetime

def setup_webdriver():
    # Configure Chrome options
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Run in headless mode (no GUI)
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    
    # Setup Chrome WebDriver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    return driver

def scrape_website(url, max_pages=None, next_button_selector=None):
    driver = setup_webdriver()
    all_content = []
    current_page = 1
    
    try:
        driver.get(url)
        
        while True:
            # Wait for the page to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            
            # Scrape current page
            page_content = {
                "page_number": current_page,
                "title": driver.title,
                "content": driver.find_element(By.TAG_NAME, "body").text
            }
            all_content.append(page_content)
            
            # Check if we should stop pagination
            if max_pages and current_page >= max_pages:
                break
                
            # Try to find and click next button if selector is provided
            if not next_button_selector:
                break
                
            try:
                next_button = WebDriverWait(driver, 5).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, next_button_selector))
                )
                previous_url = driver.current_url
                driver.execute_script("arguments[0].scrollIntoView(true);", next_button)
                next_button.click()
                
                # Wait for page change: either the URL or the title must differ
                # (you might need to adjust this based on the website)
                WebDriverWait(driver, 10).until(
                    lambda d: d.current_url != previous_url
                    or d.title != page_content["title"]
                )
                
                current_page += 1
            except (NoSuchElementException, TimeoutException):
                break  # No more pages to scrape
                
        return {
            "success": True,
            "pages": all_content
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        driver.quit()

def query_lmstudio(prompt, context, model="llama2:3b"):
    """Query LM Studio with context and prompt"""
    try:
        # LM Studio typically runs on port 1234
        api_url = "http://localhost:1234/v1/chat/completions"
        
        # Construct messages in ChatGPT format
        messages = [
            {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
            {"role": "user", "content": f"Context: {context}\n\nQuestion: {prompt}"}
        ]
        
        # Construct payload for LM Studio
        payload = {
            "messages": messages,
            "temperature": 0.7,
            "max_tokens": 2000,
            "stream": False
        }
        
        # Add headers
        headers = {
            "Content-Type": "application/json"
        }
        
        # A timeout keeps the app from hanging if the server never responds
        response = requests.post(
            api_url,
            json=payload,
            headers=headers,
            timeout=120
        )
        
        # Debug information
        print("Request URL:", api_url)
        print("Request payload:", payload)
        print("Response status:", response.status_code)
        print("Response content:", response.text)
        
        if response.status_code != 200:
            return f"Error: Server returned status code {response.status_code}"
            
        response_json = response.json()
        if "choices" not in response_json:
            return f"Error: Unexpected response format: {response_json}"
            
        return response_json["choices"][0]["message"]["content"]
        
    except requests.exceptions.RequestException as e:
        return f"Connection error: {str(e)}"
    except json.JSONDecodeError as e:
        return f"Error parsing response: {str(e)}"
    except Exception as e:
        return f"Unexpected error: {str(e)}"

def query_groq(prompt, context, model="mixtral-8x7b-32768"):
    """Query Groq with context and prompt"""
    try:
        # Groq API endpoint
        api_url = "https://api.groq.com/openai/v1/chat/completions"
        
        # Construct messages in ChatGPT format
        messages = [
            {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
            {"role": "user", "content": f"Context: {context}\n\nQuestion: {prompt}"}
        ]
        
        # Construct payload for Groq
        payload = {
            "model": model,
            "messages": messages,
            "temperature": 0.7,
            "max_tokens": 2000  # 8192 would exceed the window implied by llama2-70b-4096
        }
        
        # Add headers with Groq API key
        headers = {
            "Authorization": f"Bearer {st.secrets['GROQ_API_KEY']}",
            "Content-Type": "application/json"
        }
        
        # A timeout keeps the app from hanging if the server never responds
        response = requests.post(
            api_url,
            json=payload,
            headers=headers,
            timeout=120
        )
        
        if response.status_code != 200:
            return f"Error: Server returned status code {response.status_code}"
            
        response_json = response.json()
        return response_json["choices"][0]["message"]["content"]
        
    except Exception as e:
        return f"Error querying Groq: {str(e)}"

def query_model(prompt, context, model_type="lmstudio", model_name="Default Model"):
    """Query selected model with context and prompt"""
    if model_type == "lmstudio":
        return query_lmstudio(prompt, context)
    elif model_type == "groq":
        return query_groq(prompt, context, model_name)
    else:
        return "Error: Unknown model type"

def save_answer(question, answer, save_dir="saved_answers"):
    """Save the Q&A to a file"""
    try:
        # Create directory if it doesn't exist
        os.makedirs(save_dir, exist_ok=True)
        
        # Create filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"qa_{timestamp}.txt"
        filepath = os.path.join(save_dir, filename)
        
        # Write Q&A to file
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"Question: {question}\n\n")
            f.write(f"Answer: {answer}\n")
        
        return True, filepath
    except Exception as e:
        return False, str(e)

def load_conversation_history(save_dir="saved_answers"):
    """Load conversation history from file"""
    try:
        history_file = os.path.join(save_dir, "conversation_history.json")
        if os.path.exists(history_file):
            with open(history_file, "r", encoding="utf-8") as f:
                return json.load(f)
        return []
    except Exception as e:
        print(f"Error loading conversation history: {e}")
        return []

def save_conversation_history(history, save_dir="saved_answers"):
    """Save conversation history to file"""
    try:
        os.makedirs(save_dir, exist_ok=True)
        history_file = os.path.join(save_dir, "conversation_history.json")
        with open(history_file, "w", encoding="utf-8") as f:
            json.dump(history, f, indent=2)
        return True
    except Exception as e:
        print(f"Error saving conversation history: {e}")
        return False

def delete_conversation(index, save_dir="saved_answers"):
    """Delete a specific conversation from history"""
    try:
        # Remove from session state
        if 0 <= index < len(st.session_state.conversation_history):
            st.session_state.conversation_history.pop(index)
            
            # Save updated history to file
            save_conversation_history(st.session_state.conversation_history, save_dir)
            return True
        return False
    except Exception as e:
        print(f"Error deleting conversation: {e}")
        return False

def main():
    st.set_page_config(
        page_title=" X88 Web Scraper",
        page_icon="🤖",
        layout="wide"
    )
    
    st.title(" X88 Web Scraper with AI Assistant")
    
    # Create tabs for scraping and querying
    tab1, tab2 = st.tabs(["Scrape Website", "Ask Questions"])
    
    # Store scraped data in session state
    if "scraped_data" not in st.session_state:
        st.session_state.scraped_data = None
    
    # Initialize conversation history from file
    if "conversation_history" not in st.session_state:
        st.session_state.conversation_history = load_conversation_history()

    with tab1:
        # Existing scraping UI
        url = st.text_input("Enter website URL to scrape:", "https://example.com")
        
        col1, col2 = st.columns(2)
        with col1:
            max_pages = st.number_input("Maximum pages to scrape (0 for unlimited)", 
                                      min_value=0, value=1)
        with col2:
            next_button_selector = st.text_input(
                "CSS Selector for 'Next' button",
                placeholder="e.g., button.next-page, div.pagination a.next"
            )
        
        if st.button("Scrape"):
            if url:
                with st.spinner("Scraping website..."):
                    result = scrape_website(
                        url,
                        max_pages if max_pages > 0 else None,
                        next_button_selector if next_button_selector else None
                    )
                    
                    if not result["success"]:
                        st.error(f"Error: {result['error']}")
                    else:
                        st.session_state.scraped_data = result['pages']
                        st.success(f"Scraping completed! Found {len(result['pages'])} pages")
                        
                        # Display results with pagination
                        for page_num, page_data in enumerate(result['pages'], 1):
                            with st.expander(f"Page {page_num}"):
                                st.subheader("Page Title")
                                st.write(page_data["title"])
                                st.subheader("Content")
                                st.write(page_data["content"])
            else:
                st.warning("Please enter a URL")

    with tab2:
        # Initialize session state for conversation history
        if "conversation_history" not in st.session_state:
            st.session_state.conversation_history = load_conversation_history()
        
        # Only show new question input if website is scraped
        if st.session_state.scraped_data is not None:
            # Combine all scraped content into one context
            full_context = "\n\n".join([
                f"Page {page['page_number']}:\n{page['title']}\n{page['content']}" 
                for page in st.session_state.scraped_data
            ])
            
            # Create columns for model selection and save directory
            col1, col2, col3 = st.columns(3)
            with col1:
                model_type = st.selectbox(
                    "Select Model Type",
                    ["lmstudio", "groq"],
                    index=0
                )
            with col2:
                if model_type == "groq":
                    model = st.selectbox(
                        "Select Model",
                        ["mixtral-8x7b-32768", "llama2-70b-4096"],
                        index=0
                    )
                else:
                    model = "Default Model"
            with col3:
                save_dir = st.text_input(
                    "Save Directory",
                    value="saved_answers",
                    help="Directory where answers will be saved"
                )
            
            # Question input
            user_question = st.text_input("Ask a question about the scraped content:")
            
            # Create columns for Ask and Save buttons
            col1, col2 = st.columns(2)
            
            with col1:
                ask_button = st.button("Ask")
            with col2:
                save_button = st.button("Save All Conversations")
            
            if ask_button:
                if user_question:
                    with st.spinner("Getting answer..."):
                        answer = query_model(user_question, full_context, model_type, model)
                        
                        # Add to conversation history
                        st.session_state.conversation_history.append({
                            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                            "model": f"{model_type}-{model}",
                            "question": user_question,
                            "answer": answer
                        })
                        
                        # Save conversation history to file
                        save_conversation_history(st.session_state.conversation_history)
                        
                        st.write("### Answer")
                        st.write(answer)
                        
                        # Save individual answer
                        success, result = save_answer(user_question, answer, save_dir)
                        if success:
                            st.success(f"Answer saved to {result}")
                        else:
                            st.error(f"Failed to save answer: {result}")
                else:
                    st.warning("Please enter a question!")
        else:
            st.warning("Please scrape a website first to ask new questions!")
            
        # Display conversation history at the bottom
        st.markdown("---")  # Add a horizontal line to separate
        if st.session_state.conversation_history:
            st.write("### Previous Conversations")
            # Reverse the order to show newest first
            for i, conv in enumerate(reversed(st.session_state.conversation_history), 1):
                with st.expander(f"Q&A {len(st.session_state.conversation_history)-i+1} - {conv['timestamp']}"):
                    # Create two columns: one for content, one for delete button
                    col1, col2 = st.columns([5,1])
                    
                    with col1:
                        st.write("**Model:**", conv.get('model', 'Unknown'))
                        st.write("**Question:**")
                        st.write(conv['question'])
                        st.write("**Answer:**")
                        st.write(conv['answer'])
                    
                    with col2:
                        # Calculate the actual index in the original list
                        original_index = len(st.session_state.conversation_history) - i
                        if st.button("🗑️ Delete", key=f"delete_{i}", help="Delete this conversation"):
                            if delete_conversation(original_index):
                                st.rerun()

if __name__ == "__main__":
    main()

To run the provided code, you’ll need to meet the following requirements:

Operating System:

  • The code is compatible with Windows, macOS, and Linux operating systems.

Python Environment:

  • Python 3.8 or later is required for the project.
  • You can install Python from the official Python website (https://www.python.org/downloads/).
  • Alternatively, you can use a Python distribution like Anaconda, which includes various libraries and tools for data science and AI.

Required Libraries:

  • streamlit: Install using pip: pip install streamlit
  • selenium: Install using pip: pip install selenium
  • webdriver-manager: Install using pip: pip install webdriver-manager
  • requests: Install using pip: pip install requests
  • json, os, and datetime: Come bundled with Python, so no installation is needed
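
Before launching the app, it can help to confirm that the third-party packages are importable. The check below is a small sketch using only the standard library; the helper name missing_packages is hypothetical. Note that the import name webdriver_manager uses an underscore, while the pip package name uses a hyphen.

```python
import importlib.util
from typing import Iterable, List


def missing_packages(module_names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# Import names of the third-party packages the script relies on
REQUIRED = ["streamlit", "selenium", "webdriver_manager", "requests"]

missing = missing_packages(REQUIRED)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are installed.")
```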

Additional Dependencies:

  • Google Chrome and ChromeDriver: Chrome must be installed on your system. The script uses webdriver-manager to download a matching ChromeDriver automatically, so a manual download is normally not needed. If you prefer to manage the driver yourself, it is available from the official ChromeDriver website (https://chromedriver.chromium.org/downloads); the version you need depends on your Chrome version, operating system, and architecture.

Other Requirements:

  • A working internet connection is required to scrape websites and to call the Groq API.
  • To use the lmstudio model type, LM Studio must be running its local server with a model loaded; the script expects it at http://localhost:1234.
  • To use the groq model type, you’ll need a Groq account and an API key from the Groq console (https://console.groq.com/).
  • Save Directory: The script writes saved answers as text files to a directory you specify in the interface (saved_answers by default). You can choose any directory you want for this purpose.

Setup Instructions:

  1. Install the required libraries using pip.
  2. Make sure Google Chrome is installed. The script downloads a matching ChromeDriver through webdriver-manager on first run, so you do not need to add ChromeDriver to your PATH.
  3. In the same directory as the project script, create a folder named .streamlit and, inside it, a file named secrets.toml containing your Groq API key under the name GROQ_API_KEY.
  4. Start the app with the streamlit run command followed by the script’s filename.
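
Streamlit reads secrets from a TOML file at .streamlit/secrets.toml, and the script looks up the key GROQ_API_KEY there. The sketch below shows one way to make a missing key fail with a clearer message. The function name resolve_groq_api_key and the fallback to an environment variable are assumptions for illustration and are not part of the original script.

```python
import os
from typing import Mapping, Optional


def resolve_groq_api_key(secrets: Mapping[str, str],
                         environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the Groq API key from secrets, falling back to the environment."""
    environ = os.environ if environ is None else environ
    # Prefer the secrets mapping; fall back to an environment variable (assumption)
    key = secrets.get("GROQ_API_KEY") or environ.get("GROQ_API_KEY")
    if not key:
        raise RuntimeError(
            "GROQ_API_KEY not found; add it to .streamlit/secrets.toml "
            "or set it as an environment variable."
        )
    return key


# Plain dictionaries stand in for st.secrets and os.environ in this example
print(resolve_groq_api_key({"GROQ_API_KEY": "example-key"}, {}))
```

In the script, st.secrets could be passed as the first argument, since it behaves like a mapping. Streamlit may raise its own error first if no secrets file exists at all.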

After completing these steps, you should be able to run the code successfully.
