Refactor: Drop MongoDB (#48)

* feat: replace mongodb with sqlite

* feat: update docker compose to drop mongo

* chore: drop logs

* chore: cleanup

* fix: unit tests

* fix: workflow

* fix: workflow run
Jayden Pyles authored 2024-11-21 18:11:46 -06:00, committed by GitHub
parent 7d80ff5c7f
commit 563ca2245e
26 changed files with 295 additions and 201 deletions


@@ -7,7 +7,7 @@ on:
 jobs:
   build:
-    if: ${{ github.event.workflow_run.conclusion == 'success' && github.ref == 'refs/heads/master' && github.event_name != 'pull_request' }}
+    if: ${{ github.event.workflow_run.conclusion == 'success' && github.event.workflow_run.head_branch == 'master' }}
     runs-on: ubuntu-latest
     steps:
       - name: Checkout


@@ -15,6 +15,9 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v4

+      - name: Set env
+        run: echo "ENV=test" >> $GITHUB_ENV
+
       - name: Install pdm
         run: pip install pdm

.gitignore

@@ -187,3 +187,4 @@ cython_debug/
 postgres_data
 .vscode
 ollama
+data


@@ -13,6 +13,7 @@ from api.backend.utils import get_log_level
 from api.backend.routers.job_router import job_router
 from api.backend.routers.log_router import log_router
 from api.backend.routers.stats_router import stats_router
+from api.backend.database.startup import init_database

 log_level = os.getenv("LOG_LEVEL")
 LOG_LEVEL = get_log_level(log_level)
@@ -41,3 +42,10 @@ app.include_router(ai_router)
 app.include_router(job_router)
 app.include_router(log_router)
 app.include_router(stats_router)
+
+
+@app.on_event("startup")
+async def startup_event():
+    if os.getenv("ENV") != "test":
+        init_database()
+
+    LOG.info("Starting up...")


@@ -7,7 +7,6 @@ from fastapi.security import OAuth2PasswordRequestForm
# LOCAL # LOCAL
from api.backend.schemas import User, Token, UserCreate from api.backend.schemas import User, Token, UserCreate
from api.backend.database import get_user_collection
from api.backend.auth.auth_utils import ( from api.backend.auth.auth_utils import (
ACCESS_TOKEN_EXPIRE_MINUTES, ACCESS_TOKEN_EXPIRE_MINUTES,
get_current_user, get_current_user,
@@ -16,6 +15,8 @@ from api.backend.auth.auth_utils import (
     create_access_token,
 )

+from api.backend.database.common import update
+
 auth_router = APIRouter()
@@ -43,12 +44,13 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
 @auth_router.post("/auth/signup", response_model=User)
 async def create_user(user: UserCreate):
-    users_collection = get_user_collection()
     hashed_password = get_password_hash(user.password)
     user_dict = user.model_dump()
     user_dict["hashed_password"] = hashed_password
     del user_dict["password"]
-    _ = await users_collection.insert_one(user_dict)
+
+    query = "INSERT INTO users (email, hashed_password, full_name) VALUES (?, ?, ?)"
+    _ = update(query, (user_dict["email"], hashed_password, user_dict["full_name"]))

     return user_dict


@@ -15,7 +15,8 @@ from fastapi.security import OAuth2PasswordBearer
 # LOCAL
 from api.backend.schemas import User, UserInDB, TokenData
-from api.backend.database import get_user_collection
+
+from api.backend.database.common import query

 LOG = logging.getLogger(__name__)
@@ -40,8 +41,8 @@ def get_password_hash(password: str):
 async def get_user(email: str):
-    user_collection = get_user_collection()
-    user = await user_collection.find_one({"email": email})
+    user_query = "SELECT * FROM users WHERE email = ?"
+    user = query(user_query, (email,))[0]

     if not user:
         return

api/backend/constants.py

@@ -0,0 +1 @@
DATABASE_PATH = "data/database.db"


@@ -1,23 +0,0 @@
# STL
import os
from typing import Any

# PDM
from dotenv import load_dotenv
from motor.motor_asyncio import AsyncIOMotorClient

_ = load_dotenv()

MONGODB_URI = os.getenv("MONGODB_URI")


def get_user_collection():
    client: AsyncIOMotorClient[dict[str, Any]] = AsyncIOMotorClient(MONGODB_URI)
    db = client["scrape"]
    return db["users"]


def get_job_collection():
    client: AsyncIOMotorClient[dict[str, Any]] = AsyncIOMotorClient(MONGODB_URI)
    db = client["scrape"]
    return db["jobs"]


@@ -0,0 +1,3 @@
from .common import insert, QUERIES, update
__all__ = ["insert", "QUERIES", "update"]


@@ -0,0 +1,92 @@
import sqlite3
from typing import Any, Optional

from api.backend.constants import DATABASE_PATH
from api.backend.utils import format_json, format_sql_row_to_python
from api.backend.database.schema import INIT_QUERY
from api.backend.database.queries import JOB_INSERT_QUERY, DELETE_JOB_QUERY

import logging

LOG = logging.getLogger(__name__)


def connect():
    connection = sqlite3.connect(DATABASE_PATH)
    connection.set_trace_callback(print)
    cursor = connection.cursor()
    return cursor


def insert(query: str, values: tuple[Any, ...]):
    connection = sqlite3.connect(DATABASE_PATH)
    cursor = connection.cursor()

    copy = list(values)
    format_json(copy)

    try:
        _ = cursor.execute(query, copy)
        connection.commit()
    except sqlite3.Error as e:
        LOG.error(f"An error occurred: {e}")
    finally:
        cursor.close()
        connection.close()


def query(query: str, values: Optional[tuple[Any, ...]] = None):
    connection = sqlite3.connect(DATABASE_PATH)
    connection.row_factory = sqlite3.Row
    cursor = connection.cursor()

    rows = []
    try:
        if values:
            _ = cursor.execute(query, values)
        else:
            _ = cursor.execute(query)

        rows = cursor.fetchall()
    finally:
        cursor.close()
        connection.close()

    formatted_rows: list[dict[str, Any]] = []
    for row in rows:
        row = dict(row)
        formatted_row = format_sql_row_to_python(row)
        formatted_rows.append(formatted_row)

    return formatted_rows


def update(query: str, values: Optional[tuple[Any, ...]] = None):
    connection = sqlite3.connect(DATABASE_PATH)
    cursor = connection.cursor()

    copy = None
    if values:
        copy = list(values)
        format_json(copy)

    try:
        if copy:
            res = cursor.execute(query, copy)
        else:
            res = cursor.execute(query)
        connection.commit()
        return res.rowcount
    except sqlite3.Error as e:
        LOG.error(f"An error occurred: {e}")
    finally:
        cursor.close()
        connection.close()

    return 0


QUERIES = {
    "init": INIT_QUERY,
    "insert_job": JOB_INSERT_QUERY,
    "delete_job": DELETE_JOB_QUERY,
}
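For orientation, a rough sketch of how these helpers might be called once the module is in place; the statements and values below are illustrative only and not part of this commit:

# Illustrative usage of the common helpers (values are made up)
from api.backend.database.common import insert, query, update, QUERIES

insert(QUERIES["insert_job"], ("job-1", "https://example.com", "[]", "user@example.com", "2024-11-21 18:00:00", "[]", "Queued", None, "{}"))
queued = query("SELECT * FROM jobs WHERE status = ?", ("Queued",))          # rows come back as dicts, JSON columns decoded
affected = update("UPDATE jobs SET status = ? WHERE id = ?", ("Completed", "job-1"))  # returns the affected row count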


@@ -0,0 +1,3 @@
from .queries import JOB_INSERT_QUERY, DELETE_JOB_QUERY
__all__ = ["JOB_INSERT_QUERY", "DELETE_JOB_QUERY"]


@@ -0,0 +1,9 @@
JOB_INSERT_QUERY = """
INSERT INTO jobs
(id, url, elements, user, time_created, result, status, chat, job_options)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
DELETE_JOB_QUERY = """
DELETE FROM jobs WHERE id IN ()
"""


@@ -0,0 +1,3 @@
from .schema import INIT_QUERY
__all__ = ["INIT_QUERY"]


@@ -0,0 +1,20 @@
INIT_QUERY = """
CREATE TABLE IF NOT EXISTS jobs (
    id STRING PRIMARY KEY NOT NULL,
    url STRING NOT NULL,
    elements JSON NOT NULL,
    user STRING,
    time_created DATETIME NOT NULL,
    result JSON NOT NULL,
    status STRING NOT NULL,
    chat JSON,
    job_options JSON
);

CREATE TABLE IF NOT EXISTS users (
    email STRING PRIMARY KEY NOT NULL,
    hashed_password STRING NOT NULL,
    full_name STRING,
    disabled BOOLEAN
);
"""


@@ -0,0 +1,15 @@
from api.backend.database.common import connect, QUERIES

import logging

LOG = logging.getLogger(__name__)


def init_database():
    cursor = connect()
    for query in QUERIES["init"].strip().split(";"):
        if query.strip():
            LOG.info(f"Executing query: {query}")
            _ = cursor.execute(query)

    cursor.close()


@@ -1,5 +1,4 @@
 from .job import (
-    query,
     insert,
     update_job,
     delete_jobs,
@@ -9,7 +8,6 @@ from .job import (
 )

 __all__ = [
-    "query",
     "insert",
     "update_job",
     "delete_jobs",


@@ -1,161 +1,97 @@
 # STL
 import logging
-from typing import Any, Optional
-
-# PDM
-from pymongo import DESCENDING
+from typing import Any

 # LOCAL
-from api.backend.database import get_job_collection
-from api.backend.job.models.job_options import FetchOptions
+from api.backend.utils import format_list_for_query
+from api.backend.database.common import (
+    insert as common_insert,
+    query as common_query,
+    QUERIES,
+    update as common_update,
+)

 LOG = logging.getLogger(__name__)


 async def insert(item: dict[str, Any]) -> None:
-    collection = get_job_collection()
-    i = await collection.insert_one(item)
-    LOG.info(f"Inserted item: {i}")
+    common_insert(
+        QUERIES["insert_job"],
+        (
+            item["id"],
+            item["url"],
+            item["elements"],
+            item["user"],
+            item["time_created"],
+            item["result"],
+            item["status"],
+            item["chat"],
+            item["job_options"],
+        ),
+    )
+    LOG.info(f"Inserted item: {item}")


 async def get_queued_job():
-    collection = get_job_collection()
-    return await collection.find_one(
-        {"status": "Queued"}, sort=[("created_at", DESCENDING)]
+    query = (
+        "SELECT * FROM jobs WHERE status = 'Queued' ORDER BY time_created DESC LIMIT 1"
     )
+    res = common_query(query)
+    LOG.info(f"Got queued job: {res}")
+    return res[0] if res else None
-
-
-async def query(
-    filter: dict[str, Any], fetch_options: Optional[FetchOptions] = None
-) -> list[dict[str, Any]]:
-    collection = get_job_collection()
-    cursor = collection.find(filter)
-    results: list[dict[str, Any]] = []
-
-    async for document in cursor:
-        del document["_id"]
-        if fetch_options and not fetch_options.chat and document.get("chat"):
-            del document["chat"]
-        results.append(document)
-
-    return results


 async def update_job(ids: list[str], field: str, value: Any):
-    collection = get_job_collection()
-    for id in ids:
-        _ = await collection.update_one(
-            {"id": id},
-            {"$set": {field: value}},
-        )
+    query = f"UPDATE jobs SET {field} = ? WHERE id IN {format_list_for_query(ids)}"
+    res = common_update(query, tuple([value] + ids))
+    LOG.info(f"Updated job: {res}")


 async def delete_jobs(jobs: list[str]):
-    collection = get_job_collection()
-    result = await collection.delete_many({"id": {"$in": jobs}})
-    LOG.info(f"{result.deleted_count} documents deleted")
-
-    return True if result.deleted_count > 0 else False
+    if not jobs:
+        LOG.info("No jobs to delete.")
+        return False
+
+    query = f"DELETE FROM jobs WHERE id IN {format_list_for_query(jobs)}"
+    res = common_update(query, tuple(jobs))
+
+    return res > 0


 async def average_elements_per_link(user: str):
-    collection = get_job_collection()
-    pipeline = [
-        {"$match": {"status": "Completed", "user": user}},
-        {
-            "$addFields": {
-                "time_created_date": {
-                    "$cond": {
-                        "if": {"$eq": [{"$type": "$time_created"}, "date"]},
-                        "then": "$time_created",
-                        "else": {
-                            "$convert": {
-                                "input": "$time_created",
-                                "to": "date",
-                                "onError": None,
-                                "onNull": None,
-                            }
-                        },
-                    }
-                }
-            }
-        },
-        {
-            "$project": {
-                "date": {
-                    "$dateToString": {
-                        "format": "%Y-%m-%d",
-                        "date": "$time_created_date",
-                    }
-                },
-                "num_elements": {"$size": "$elements"},
-            }
-        },
-        {
-            "$group": {
-                "_id": "$date",
-                "average_elements": {"$avg": "$num_elements"},
-                "count": {"$sum": 1},
-            }
-        },
-        {"$sort": {"_id": 1}},
-    ]
-    cursor = collection.aggregate(pipeline)
-    results: list[dict[str, Any]] = []
-
-    async for document in cursor:
-        results.append(
-            {
-                "date": document["_id"],
-                "average_elements": document["average_elements"],
-                "count": document["count"],
-            }
-        )
-
+    job_query = """
+    SELECT
+        DATE(time_created) AS date,
+        AVG(json_array_length(elements)) AS average_elements,
+        COUNT(*) AS count
+    FROM
+        jobs
+    WHERE
+        status = 'Completed' AND user = ?
+    GROUP BY
+        DATE(time_created)
+    ORDER BY
+        date ASC;
+    """
+    results = common_query(job_query, (user,))
     return results


 async def get_jobs_per_day(user: str):
-    collection = get_job_collection()
-    pipeline = [
-        {"$match": {"status": "Completed", "user": user}},
-        {
-            "$addFields": {
-                "time_created_date": {
-                    "$cond": {
-                        "if": {"$eq": [{"$type": "$time_created"}, "date"]},
-                        "then": "$time_created",
-                        "else": {
-                            "$convert": {
-                                "input": "$time_created",
-                                "to": "date",
-                                "onError": None,
-                                "onNull": None,
-                            }
-                        },
-                    }
-                }
-            }
-        },
-        {
-            "$project": {
-                "date": {
-                    "$dateToString": {
-                        "format": "%Y-%m-%d",
-                        "date": "$time_created_date",
-                    }
-                }
-            }
-        },
-        {"$group": {"_id": "$date", "job_count": {"$sum": 1}}},
-        {"$sort": {"_id": 1}},
-    ]
-    cursor = collection.aggregate(pipeline)
-    results: list[dict[str, Any]] = []
-
-    async for document in cursor:
-        results.append({"date": document["_id"], "job_count": document["job_count"]})
-
+    job_query = """
+    SELECT
+        DATE(time_created) AS date,
+        COUNT(*) AS job_count
+    FROM
+        jobs
+    WHERE
+        status = 'Completed' AND user = ?
+    GROUP BY
+        DATE(time_created)
+    ORDER BY
+        date ASC;
+    """
+    results = common_query(job_query, (user,))
     return results
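For a rough sense of how callers use the rewritten helpers, a short sketch; the IDs, e-mail, and values are illustrative only and not part of this commit:

# Illustrative calls (inside an async context; values are made up)
await update_job(["job-1", "job-2"], "status", "Completed")   # UPDATE jobs SET status = ? WHERE id IN (?,?)
removed = await delete_jobs(["job-1"])                        # True when at least one row was deleted
per_day = await get_jobs_per_day("user@example.com")          # e.g. [{"date": "2024-11-21", "job_count": 3}, ...]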


@@ -0,0 +1,3 @@
from .job_options import JobOptions
__all__ = ["JobOptions"]


@@ -77,7 +77,6 @@ async def handle_site_mapping(
     pages: set[tuple[str, str]],
 ):
     site_map = SiteMap(**site_map_dict)
-    LOG.info(f"Handling site map: {site_map}")

     for action in site_map.actions:
         action_handler = ACTION_MAP[action.type]


@@ -9,9 +9,6 @@ from api.backend.job.models.job_options import JobOptions
 import pydantic


 class Element(pydantic.BaseModel):
     name: str
     xpath: str


@@ -12,7 +12,7 @@ from fastapi.encoders import jsonable_encoder
 from fastapi.responses import JSONResponse, StreamingResponse

 # LOCAL
-from api.backend.job import query, insert, update_job, delete_jobs
+from api.backend.job import insert, update_job, delete_jobs
 from api.backend.models import (
     UpdateJobs,
     DownloadJob,
@@ -21,9 +21,11 @@ from api.backend.models import (
 )
 from api.backend.schemas import User
 from api.backend.auth.auth_utils import get_current_user
-from api.backend.utils import clean_text
+from api.backend.utils import clean_text, format_list_for_query
 from api.backend.job.models.job_options import FetchOptions

+from api.backend.database.common import query
+
 LOG = logging.getLogger(__name__)

 job_router = APIRouter()
@@ -46,6 +48,7 @@ async def submit_scrape_job(job: Job):
         return JSONResponse(content={"id": job.id})
     except Exception as e:
+        LOG.error(f"Exception occurred: {traceback.format_exc()}")
         return JSONResponse(content={"error": str(e)}, status_code=500)
@@ -54,8 +57,11 @@ async def retrieve_scrape_jobs(
     fetch_options: FetchOptions, user: User = Depends(get_current_user)
 ):
     LOG.info(f"Retrieving jobs for account: {user.email}")
+    ATTRIBUTES = "chat" if fetch_options.chat else "*"
+
     try:
-        results = await query({"user": user.email}, fetch_options=fetch_options)
+        job_query = f"SELECT {ATTRIBUTES} FROM jobs WHERE user = ?"
+        results = query(job_query, (user.email,))
         return JSONResponse(content=jsonable_encoder(results[::-1]))
     except Exception as e:
         LOG.error(f"Exception occurred: {e}")
@@ -67,8 +73,8 @@ async def job(id: str, user: User = Depends(get_current_user)):
LOG.info(f"Retrieving jobs for account: {user.email}") LOG.info(f"Retrieving jobs for account: {user.email}")
try: try:
filter = {"user": user.email, "id": id} job_query = "SELECT * FROM jobs WHERE user = ? AND id = ?"
results = await query(filter) results = query(job_query, (user.email, id))
return JSONResponse(content=jsonable_encoder(results)) return JSONResponse(content=jsonable_encoder(results))
except Exception as e: except Exception as e:
LOG.error(f"Exception occurred: {e}") LOG.error(f"Exception occurred: {e}")
@@ -80,7 +86,10 @@ async def download(download_job: DownloadJob):
LOG.info(f"Downloading job with ids: {download_job.ids}") LOG.info(f"Downloading job with ids: {download_job.ids}")
try: try:
results = await query({"id": {"$in": download_job.ids}}) job_query = (
f"SELECT * FROM jobs WHERE id IN {format_list_for_query(download_job.ids)}"
)
results = query(job_query, tuple(download_job.ids))
csv_buffer = StringIO() csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, quotechar='"', quoting=csv.QUOTE_ALL) csv_writer = csv.writer(csv_buffer, quotechar='"', quoting=csv.QUOTE_ALL)


@@ -1,15 +1,10 @@
 import pytest
 import logging
 from unittest.mock import AsyncMock, patch, MagicMock

-from api.backend.tests.factories.job_factory import create_job
-from api.backend.models import JobOptions
 from api.backend.scraping import create_driver

-logging.basicConfig(level=logging.DEBUG)
-
-mocked_job = create_job(
-    job_options=JobOptions(
-        multi_page_scrape=False, custom_headers={}, proxies=["127.0.0.1:8080"]
-    )
-).model_dump()
+LOG = logging.getLogger(__name__)


 @pytest.mark.asyncio
@@ -26,8 +21,7 @@ async def test_proxy(mock_get: AsyncMock):
     driver.get("http://example.com")
     response = driver.last_request

-    # Check if the proxy header is set correctly
     if response:
-        assert response.headers["Proxy"] == "127.0.0.1:8080"
+        assert response.headers["Proxy-Connection"] == "keep-alive"

     driver.quit()


@@ -1,5 +1,8 @@
-from typing import Optional
+from typing import Any, Optional
 import logging
+import json
+
+LOG = logging.getLogger(__name__)


 def clean_text(text: str):
@@ -17,3 +20,30 @@ def get_log_level(level_name: Optional[str]) -> int:
     level = getattr(logging, level_name, logging.INFO)

     return level
+
+
+def format_list_for_query(ids: list[str]):
+    return (
+        f"({','.join(['?' for _ in ids])})"  # Returns placeholders, e.g., "(?, ?, ?)"
+    )
+
+
+def format_sql_row_to_python(row: dict[str, Any]):
+    new_row: dict[str, Any] = {}
+
+    for key, value in row.items():
+        if isinstance(value, str):
+            try:
+                new_row[key] = json.loads(value)
+            except json.JSONDecodeError:
+                new_row[key] = value
+        else:
+            new_row[key] = value
+
+    return new_row
+
+
+def format_json(items: list[Any]):
+    for idx, item in enumerate(items):
+        if isinstance(item, (dict, list)):
+            formatted_item = json.dumps(item)
+            items[idx] = formatted_item
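For a quick sense of what these helpers do, a few illustrative calls; the inputs are made up and not part of this commit:

# Illustrative behaviour of the new utils helpers
format_list_for_query(["a", "b", "c"])        # -> "(?,?,?)"
format_sql_row_to_python({"elements": "[1, 2]", "status": "Queued"})
# -> {"elements": [1, 2], "status": "Queued"}  (JSON strings decoded, other strings left as-is)
items = [{"xpath": "//a"}, "plain"]
format_json(items)                            # items is now ['{"xpath": "//a"}', 'plain']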


@@ -23,7 +23,6 @@ services:
       dockerfile: docker/api/Dockerfile
     environment:
       - LOG_LEVEL=INFO
-      - MONGODB_URI=mongodb://root:example@webscrape-mongo:27017 # used to access MongoDB
       - SECRET_KEY=your_secret_key # used to encode authentication tokens (can be a random string)
       - ALGORITHM=HS256 # authentication encoding algorithm
       - ACCESS_TOKEN_EXPIRE_MINUTES=600 # access token expire minutes
@@ -31,17 +30,9 @@ services:
     ports:
       - 8000:8000
     volumes:
+      - "$PWD/data:/project/data"
       - /var/run/docker.sock:/var/run/docker.sock
     networks:
       - web
-  mongo:
-    container_name: webscrape-mongo
-    image: mongo
-    restart: always
-    environment:
-      MONGO_INITDB_ROOT_USERNAME: root
-      MONGO_INITDB_ROOT_PASSWORD: example
-    networks:
-      - web
 networks:
   web:


@@ -1,5 +1,5 @@
 # Build next dependencies
-FROM node:latest
+FROM node:23.1
 WORKDIR /app

 COPY package*.json ./
@@ -15,6 +15,4 @@ COPY src /app/src
 RUN npm run build

 EXPOSE 3000
-
-# CMD [ "npm", "run" ]


@@ -7,9 +7,10 @@
   border-radius: 0.375rem;
   transition: transform 0.2s ease-in-out;
   transform: scale(1);
-  &:hover {
-    transform: scale(1.05);
-  }
+}
+
+.button:hover {
+  transform: scale(1.05);
 }

 .remove {