mirror of https://github.com/jaypyles/Scraperr.git (synced 2025-12-09 09:15:42 +00:00)
wip: multi page scraping and worker
@@ -1,18 +1,19 @@
# STL
from functools import partial
import uuid
import logging
from io import StringIO

# PDM
import pandas as pd
from fastapi import FastAPI
from fastapi import BackgroundTasks, FastAPI
from fastapi.encoders import jsonable_encoder
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware

# LOCAL
from api.backend.job import query, insert, delete_jobs
from api.backend.job import query, insert, delete_jobs, update_job
from api.backend.models import (
    DownloadJob,
    SubmitScrapeJob,
@@ -21,6 +22,7 @@ from api.backend.models import (
)
from api.backend.scraping import scrape
from api.backend.auth.auth_router import auth_router
from seleniumwire.thirdparty.mitmproxy.master import traceback

logging.basicConfig(
    level=logging.INFO,
@@ -55,23 +57,15 @@ def read_favicon():


@app.post("/api/submit-scrape-job")
async def submit_scrape_job(job: SubmitScrapeJob):
async def submit_scrape_job(job: SubmitScrapeJob, background_tasks: BackgroundTasks):
    LOG.info(f"Received job: {job}")
    try:
        scraped = await scrape(job.url, job.elements)

        LOG.info(
            f"Scraped result for url: {job.url}, with elements: {job.elements}\n{scraped}"
        )

        json_scraped = jsonable_encoder(scraped)
        job.result = json_scraped
        job.id = uuid.uuid4().hex

        if job.user:
            await insert(jsonable_encoder(job))

        return JSONResponse(content=json_scraped)
        return JSONResponse(content=f"Job queued for scraping: {job.id}")
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
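With this hunk the endpoint no longer scrapes inline; it only assigns an id, stores the job with status "Queued", and leaves the scraping to the worker. A minimal client-side sketch of the new flow, not part of the commit; it assumes the API is reachable on localhost:8000 and that Element carries a name and an xpath:

import httpx

payload = {
    "url": "https://example.com",
    "elements": [{"name": "title", "xpath": "//h1"}],  # assumed Element fields
    "user": "demo@example.com",
    "job_options": {"multi_page_scrape": False, "custom_headers": None},
}

# The endpoint now returns a confirmation string instead of the scraped data.
response = httpx.post("http://localhost:8000/api/submit-scrape-job", json=payload)
print(response.json())  # e.g. "Job queued for scraping: <hex id>"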
@@ -95,19 +89,21 @@ async def download(download_job: DownloadJob):

    flattened_results = []
    for result in results:
        for key, values in result["result"].items():
            for value in values:
                flattened_results.append(
                    {
                        "id": result["id"],
                        "url": result["url"],
                        "element_name": key,
                        "xpath": value["xpath"],
                        "text": value["text"],
                        "user": result["user"],
                        "time_created": result["time_created"],
                    }
                )
        for res in result["result"]:
            for url, elements in res.items():
                for element_name, values in elements.items():
                    for value in values:
                        flattened_results.append(
                            {
                                "id": result.get("id", None),
                                "url": url,
                                "element_name": element_name,
                                "xpath": value.get("xpath", ""),
                                "text": value.get("text", ""),
                                "user": result.get("user", ""),
                                "time_created": result.get("time_created", ""),
                            }
                        )

    df = pd.DataFrame(flattened_results)
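The stored result is now a list of per-page dicts keyed by URL (see the scraping changes below), which is why the CSV flattening gained an extra level of nesting. A short sketch of the shape the new loop walks, with illustrative values only:

# result["result"] after a multi-page scrape: one dict per visited page
sample_result = [
    {"https://example.com/": {"title": [{"xpath": "//h1", "text": "Example Domain"}]}},
    {"https://example.com/about": {"title": [{"xpath": "//h1", "text": "About"}]}},
]

rows = []
for res in sample_result:
    for url, elements in res.items():
        for element_name, values in elements.items():
            for value in values:
                rows.append({"url": url, "element_name": element_name, **value})
# rows now holds one row per captured element, ready for pandas.DataFrame(rows)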
@@ -120,6 +116,7 @@ async def download(download_job: DownloadJob):

    except Exception as e:
        LOG.error(f"Exception occurred: {e}")
        traceback.print_exc()
        return {"error": str(e)}
@@ -1,6 +1,7 @@
# STL
import logging
from typing import Any
from pymongo import DESCENDING

# LOCAL
from api.backend.database import get_job_collection
@@ -14,6 +15,13 @@ async def insert(item: dict[str, Any]) -> None:
        LOG.info(f"Inserted item: {i}")


async def get_queued_job():
    collection = get_job_collection()
    return await collection.find_one(
        {"status": "Queued"}, sort=[("created_at", DESCENDING)]
    )


async def query(filter: dict[str, Any]) -> list[dict[str, Any]]:
    collection = get_job_collection()
    cursor = collection.find(filter)
@@ -26,6 +34,15 @@ async def query(filter: dict[str, Any]) -> list[dict[str, Any]]:
    return results


async def update_job(id: str, field: str, value: Any):
    collection = get_job_collection()
    result = await collection.update_one(
        {"id": id},
        {"$set": {field: value}},
    )
    return result.modified_count


async def delete_jobs(jobs: list[str]):
    collection = get_job_collection()
    result = await collection.delete_many({"id": {"$in": jobs}})
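get_queued_job and update_job give the worker a simple poll-and-update queue over the job collection. Because claiming a job takes two separate calls (find_one, then update_one), two workers polling at the same moment could in principle pick up the same job. A hedged sketch of an atomic alternative using the driver's find_one_and_update, not part of this commit:

from pymongo import DESCENDING, ReturnDocument

from api.backend.database import get_job_collection


async def claim_queued_job():
    # Atomically flip the newest "Queued" job to "Scraping" and return the
    # updated document, so concurrent workers cannot claim the same job.
    collection = get_job_collection()
    return await collection.find_one_and_update(
        {"status": "Queued"},
        {"$set": {"status": "Scraping"}},
        sort=[("created_at", DESCENDING)],
        return_document=ReturnDocument.AFTER,
    )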
@@ -17,6 +17,11 @@ class CapturedElement(pydantic.BaseModel):
    name: str


class JobOptions(pydantic.BaseModel):
    multi_page_scrape: bool
    custom_headers: Optional[dict[str, Any]]


class SubmitScrapeJob(pydantic.BaseModel):
    id: Optional[str] = None
    url: str
@@ -24,6 +29,8 @@ class SubmitScrapeJob(pydantic.BaseModel):
    user: Optional[str] = None
    time_created: Optional[str] = None
    result: Optional[dict[str, Any]] = None
    job_options: JobOptions
    status: str = "Queued"


class RetrieveScrapeJobs(pydantic.BaseModel):
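SubmitScrapeJob now carries its crawl settings in a nested JobOptions model and starts life as "Queued". A minimal construction sketch, not from the commit, showing which fields a caller supplies and which are defaulted (the Element fields are assumed):

from api.backend.models import Element, JobOptions, SubmitScrapeJob

job = SubmitScrapeJob(
    url="https://example.com",
    elements=[Element(name="title", xpath="//h1")],  # assumed Element fields
    job_options=JobOptions(multi_page_scrape=True, custom_headers=None),
)
print(job.status)  # "Queued" by default; id, user, time_created and result default to None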
@@ -1,10 +1,11 @@
# STL
import logging
from typing import Any, Optional

# PDM
from bs4 import BeautifulSoup
from lxml import etree
from selenium import webdriver
from seleniumwire import webdriver
from lxml.etree import _Element  # type: ignore [reportPrivateImport]
from fake_useragent import UserAgent
from webdriver_manager.chrome import ChromeDriverManager
@@ -13,6 +14,7 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service
from urllib.parse import urlparse, urljoin

# LOCAL
from api.backend.models import Element, CapturedElement
@@ -23,6 +25,14 @@ LOG = logging.getLogger(__name__)
class HtmlElement(_Element): ...


def is_same_domain(url: str, original_url: str) -> bool:
    parsed_url = urlparse(url)
    parsed_original_url = urlparse(original_url)
    LOG.info(f"PARSED: {parsed_url.netloc}")
    LOG.info(f"PARSED_ORIGINAL: {parsed_original_url.netloc}")
    return parsed_url.netloc == parsed_original_url.netloc or parsed_url.netloc == ""


def clean_xpath(xpath: str) -> str:
    parts = xpath.split("/")
    clean_parts: list[str] = []
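is_same_domain keeps the crawler on the starting site: it compares netlocs and also accepts links with an empty netloc, i.e. relative URLs. A few illustrative calls, not in the commit:

from api.backend.scraping import is_same_domain

print(is_same_domain("https://example.com/about", "https://example.com/"))  # True, same host
print(is_same_domain("/contact", "https://example.com/"))                   # True, relative link
print(is_same_domain("https://other.org/page", "https://example.com/"))     # False, different host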
@@ -41,10 +51,25 @@ def sxpath(context: _Element, xpath: str) -> list[HtmlElement]:
    return context.xpath(xpath)  # type: ignore [reportReturnType]


async def make_site_request(url: str) -> str:
    """Make basic `GET` request to site using Selenium."""
    ua = UserAgent()
def interceptor(headers: dict[str, Any]):
    def _interceptor(request: Any):
        for key, val in headers.items():
            if request.headers.get(key):
                del request.headers[key]

            request.headers[key] = val

        if "sec-ch-ua" in request.headers:
            original_value = request.headers["sec-ch-ua"]
            del request.headers["sec-ch-ua"]
            modified_value = original_value.replace("HeadlessChrome", "Chrome")
            request.headers["sec-ch-ua"] = modified_value

    return _interceptor


def create_driver():
    ua = UserAgent()
    chrome_options = ChromeOptions()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
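The interceptor factory is what applies a job's custom_headers: selenium-wire calls the returned function for every outgoing request, overwriting matching headers and un-masking the headless hint in sec-ch-ua. A short usage sketch, assuming a selenium-wire driver from create_driver() and a local Chrome install:

driver = create_driver()
# These headers stand in for job_options["custom_headers"]; any dict works.
driver.request_interceptor = interceptor({"Accept-Language": "en-US", "X-Demo": "1"})
driver.get("https://example.com")
driver.quit()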
@@ -52,23 +77,65 @@
    chrome_options.add_argument(f"user-agent={ua.random}")

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(options=chrome_options, service=service)
    return webdriver.Chrome(options=chrome_options, service=service)


async def make_site_request(
    url: str,
    headers: Optional[dict[str, Any]],
    multi_page_scrape: bool = False,
    visited_urls: set[str] = set(),
    pages: set[tuple[str, str]] = set(),
    original_url: str = "",
) -> None:
    """Make basic `GET` request to site using Selenium."""
    # Check if URL has already been visited
    if url in visited_urls:
        return

    driver = create_driver()

    if headers:
        driver.request_interceptor = interceptor(headers)

    try:
        driver.get(url)
        visited_urls.add(url)
        _ = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        page_source = driver.page_source
        LOG.debug(f"Page source for url: {url}\n{page_source}")
        pages.add((page_source, url))
    finally:
        driver.quit()

    LOG.debug(f"Page source for url: {url}\n{page_source}")
    return page_source
    if not multi_page_scrape:
        return

    soup = BeautifulSoup(page_source, "html.parser")

    for a_tag in soup.find_all("a"):
        link = a_tag.get("href")

        if link:
            if not urlparse(link).netloc:
                base_url = "{0.scheme}://{0.netloc}".format(urlparse(original_url))
                link = urljoin(base_url, link)

            if link not in visited_urls and is_same_domain(link, original_url):
                await make_site_request(
                    link,
                    headers=headers,
                    multi_page_scrape=multi_page_scrape,
                    visited_urls=visited_urls,
                    pages=pages,
                    original_url=original_url,
                )


async def collect_scraped_elements(page: str, xpaths: list[Element]):
    soup = BeautifulSoup(page, "lxml")
async def collect_scraped_elements(page: tuple[str, str], xpaths: list[Element]):
    soup = BeautifulSoup(page[0], "lxml")
    root = etree.HTML(str(soup))

    elements: dict[str, list[CapturedElement]] = dict()
@@ -86,11 +153,30 @@ async def collect_scraped_elements(page: str, xpaths: list[Element]):

        elements[elem.name] = [captured_element]

    return elements
    return {page[1]: elements}


async def scrape(url: str, xpaths: list[Element]):
    page = await make_site_request(url)
    elements = await collect_scraped_elements(page, xpaths)
async def scrape(
    url: str,
    xpaths: list[Element],
    headers: Optional[dict[str, Any]],
    multi_page_scrape: bool = False,
):
    visited_urls: set[str] = set()
    pages: set[tuple[str, str]] = set()

    _ = await make_site_request(
        url,
        headers,
        multi_page_scrape=multi_page_scrape,
        visited_urls=visited_urls,
        pages=pages,
        original_url=url,
    )

    elements: list[dict[str, dict[str, list[CapturedElement]]]] = list()

    for page in pages:
        elements.append(await collect_scraped_elements(page, xpaths))

    return elements
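scrape() now drives the whole crawl: it seeds visited_urls and pages, lets make_site_request recurse over same-domain links when multi_page_scrape is set, and returns one {url: {element_name: [CapturedElement]}} dict per visited page. A hedged invocation sketch, not from the commit, with the Element fields assumed:

import asyncio

from api.backend.models import Element
from api.backend.scraping import scrape


async def demo():
    results = await scrape(
        "https://example.com",
        [Element(name="title", xpath="//h1")],  # assumed Element fields
        headers={"Accept-Language": "en-US"},
        multi_page_scrape=True,
    )
    # One entry per crawled page, e.g. [{"https://example.com": {"title": [...]}}, ...]
    print(len(results))


asyncio.run(demo())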
api/backend/worker/job_worker.py (new file, 48 lines)
@@ -0,0 +1,48 @@
from api.backend.job import get_queued_job, update_job
from api.backend.scraping import scrape
from api.backend.models import Element
from fastapi.encoders import jsonable_encoder

import asyncio
import logging
import sys
import traceback

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
LOG = logging.getLogger(__name__)


async def process_job():
    job = await get_queued_job()
    if job:
        LOG.info(f"Beginning processing job: {job}.")
        try:
            _ = await update_job(job["id"], field="status", value="Scraping")
            scraped = await scrape(
                job["url"],
                [Element(**j) for j in job["elements"]],
                job["job_options"]["custom_headers"],
                job["job_options"]["multi_page_scrape"],
            )
            LOG.info(
                f"Scraped result for url: {job['url']}, with elements: {job['elements']}\n{scraped}"
            )
            _ = await update_job(
                job["id"], field="result", value=jsonable_encoder(scraped)
            )
            _ = await update_job(job["id"], field="status", value="Completed")
        except Exception as e:
            _ = await update_job(job["id"], field="status", value="Failed")
            _ = await update_job(job["id"], field="result", value=str(e))
            LOG.error(f"Exception occurred: {e}\n{traceback.format_exc()}")


async def main():
    LOG.info("Starting job worker...")
    while True:
        await process_job()
        await asyncio.sleep(5)  # Sleep for 5 seconds before checking for new jobs


if __name__ == "__main__":
    asyncio.run(main())
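The worker is a standalone asyncio process that polls MongoDB every five seconds. A minimal way to exercise a single polling cycle outside the loop, assuming the MongoDB behind get_job_collection() is reachable; not part of the commit:

import asyncio

from api.backend.worker.job_worker import process_job

# Claims at most one "Queued" job, scrapes it, and writes the result back.
asyncio.run(process_job())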