# Scraperr/api/backend/scraping.py
import logging
from typing import Any, Optional
import time
from bs4 import BeautifulSoup
from lxml import etree
from seleniumwire import webdriver
from lxml.etree import _Element # type: ignore [reportPrivateImport]
from fake_useragent import UserAgent
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service
from urllib.parse import urlparse, urljoin
from api.backend.models import Element, CapturedElement

LOG = logging.getLogger(__name__)


class HtmlElement(_Element): ...


def is_same_domain(url: str, original_url: str) -> bool:
    parsed_url = urlparse(url)
    parsed_original_url = urlparse(original_url)
    return parsed_url.netloc == parsed_original_url.netloc or parsed_url.netloc == ""
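
# Example behaviour (illustrative sketch of how the check above treats absolute
# and relative links):
#   is_same_domain("https://example.com/about", "https://example.com")  -> True
#   is_same_domain("/relative/path", "https://example.com")             -> True  (empty netloc counts as same-domain)
#   is_same_domain("https://other.org/page", "https://example.com")     -> False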


def clean_xpath(xpath: str) -> str:
    parts = xpath.split("/")
    clean_parts: list[str] = []

    for part in parts:
        if part == "":
            clean_parts.append("/")
        else:
            clean_parts.append(part)

    cleaned = "//".join(clean_parts).replace("////", "//")
    cleaned = cleaned.replace("'", "\\'")

    return cleaned


def sxpath(context: _Element, xpath: str) -> list[HtmlElement]:
    return context.xpath(xpath)  # type: ignore [reportReturnType]


def interceptor(headers: dict[str, Any]):
    def _interceptor(request: Any):
        # Overwrite any matching request headers with the user-supplied values.
        for key, val in headers.items():
            if request.headers.get(key):
                del request.headers[key]
            request.headers[key] = val

        # Mask the headless client hint so requests look like regular Chrome.
        if "sec-ch-ua" in request.headers:
            original_value = request.headers["sec-ch-ua"]
            del request.headers["sec-ch-ua"]
            modified_value = original_value.replace("HeadlessChrome", "Chrome")
            request.headers["sec-ch-ua"] = modified_value

    return _interceptor
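
# Illustrative usage: `headers` is a plain mapping of header names to the values
# that should replace whatever the browser would otherwise send, e.g.
#   driver.request_interceptor = interceptor({"Accept-Language": "en-US", "Referer": "https://example.com"})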


def create_driver():
    ua = UserAgent()
    chrome_options = ChromeOptions()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument(f"user-agent={ua.random}")

    return webdriver.Chrome(options=chrome_options)


async def make_site_request(
    url: str,
    headers: Optional[dict[str, Any]],
    multi_page_scrape: bool = False,
    visited_urls: set[str] = set(),
    pages: set[tuple[str, str]] = set(),
    original_url: str = "",
) -> None:
    """Make a basic `GET` request to the site using Selenium.

    Stores the rendered page in `pages` and, when `multi_page_scrape` is set,
    recursively follows same-domain links. Pass explicit `visited_urls` and
    `pages` sets: the mutable defaults are shared between calls.
    """
    # Check if URL has already been visited
    if url in visited_urls:
        return

    driver = create_driver()
    driver.implicitly_wait(10)

    if headers:
        driver.request_interceptor = interceptor(headers)

    try:
        LOG.info(f"Visiting URL: {url}")

        driver.get(url)
        final_url = driver.current_url
        visited_urls.add(url)
        visited_urls.add(final_url)

        # Wait for the document body, then give dynamic content a moment to render.
        _ = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        time.sleep(5)

        page_source = driver.page_source
        LOG.debug(f"Page source for url: {url}\n{page_source}")

        pages.add((page_source, final_url))
    finally:
        driver.quit()

    if not multi_page_scrape:
        return

    # Crawl same-domain links found on the rendered page.
    soup = BeautifulSoup(page_source, "html.parser")

    for a_tag in soup.find_all("a"):
        link = a_tag.get("href")

        if link:
            # Resolve relative links against the final (post-redirect) URL.
            if not urlparse(link).netloc:
                base_url = "{0.scheme}://{0.netloc}".format(urlparse(final_url))
                link = urljoin(base_url, link)

            if link not in visited_urls and is_same_domain(link, original_url):
                await make_site_request(
                    link,
                    headers=headers,
                    multi_page_scrape=multi_page_scrape,
                    visited_urls=visited_urls,
                    pages=pages,
                    original_url=original_url,
                )


async def collect_scraped_elements(page: tuple[str, str], xpaths: list[Element]):
    """Run each configured xpath against a (page_source, url) pair and group captures by element name."""
    soup = BeautifulSoup(page[0], "lxml")
    root = etree.HTML(str(soup))

    elements: dict[str, list[CapturedElement]] = dict()

    for elem in xpaths:
        el = sxpath(root, clean_xpath(elem.xpath))

        # Join the text nodes of each match with tabs, then join the matches with commas.
        text = ["\t".join(str(t) for t in e.itertext()) for e in el]

        captured_element = CapturedElement(
            xpath=elem.xpath, text=",".join(text), name=elem.name
        )

        if elem.name in elements:
            elements[elem.name].append(captured_element)
            continue

        elements[elem.name] = [captured_element]

    return {page[1]: elements}
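
# Return shape of the function above (the URL and text are placeholder values):
#   {"https://example.com/": {"title": [CapturedElement(xpath="/html/head/title", text="Example Domain", name="title")]}}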


async def scrape(
    url: str,
    xpaths: list[Element],
    headers: Optional[dict[str, Any]],
    multi_page_scrape: bool = False,
):
    visited_urls: set[str] = set()
    pages: set[tuple[str, str]] = set()

    _ = await make_site_request(
        url,
        headers,
        multi_page_scrape=multi_page_scrape,
        visited_urls=visited_urls,
        pages=pages,
        original_url=url,
    )

    elements: list[dict[str, dict[str, list[CapturedElement]]]] = list()

    for page in pages:
        elements.append(await collect_scraped_elements(page, xpaths))

    return elements
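

# Illustrative driver sketch for local testing only. It assumes `Element` can be
# constructed from just `name` and `xpath` (the two fields read above); the URL,
# xpath, and `_demo` helper are hypothetical placeholders, not part of the
# application's own entrypoints.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        results = await scrape(
            url="https://example.com",
            xpaths=[Element(name="title", xpath="/html/head/title")],
            headers=None,
            multi_page_scrape=False,
        )
        LOG.info(results)

    asyncio.run(_demo())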