WIP: improve method of getting URLs for crawling

Daoud Clarke 2022-12-31 13:32:15 +00:00
parent c69108cfcc
commit 7dae39b780
4 changed files with 55 additions and 18 deletions

View file

@@ -6,6 +6,8 @@ from multiprocessing import Queue
from pathlib import Path
from time import sleep
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer import index_batches, historical, update_urls
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
@@ -15,15 +17,23 @@ logger = getLogger(__name__)
def run(data_path: str, url_queue: Queue):
logger.info("Started background process")
with Database() as db:
url_db = URLDatabase(db.connection)
url_db.create_tables()
initialize_url_queue(url_queue)
historical.run()
# historical.run()
index_path = Path(data_path) / INDEX_NAME
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
while True:
try:
update_url_queue(url_queue)
except Exception:
logger.exception("Error updating URL queue")
return
try:
batch_cache.retrieve_batches(num_batches=10000)
except Exception:

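For orientation, a minimal, self-contained sketch of the loop shape introduced above: table creation now happens in the background process before the loop starts, and each step gets its own try/except so failures are logged per step. The step functions and the sleep interval are placeholders, not mwmbl code, and unlike the WIP hunk above (which returns on a queue error) this sketch just logs and keeps going.

from logging import basicConfig, getLogger
from time import sleep

basicConfig(level="INFO")
logger = getLogger(__name__)

def create_tables():
    logger.info("Creating tables (stand-in for URLDatabase.create_tables)")

def update_queue():
    logger.info("Topping up the URL queue (stand-in for update_url_queue)")

def retrieve_batches():
    logger.info("Retrieving batches (stand-in for BatchCache.retrieve_batches)")

def run():
    create_tables()  # done once, before the loop
    while True:
        for step in (update_queue, retrieve_batches):
            try:
                step()
            except Exception:
                logger.exception(f"Error in background step {step.__name__}")
        sleep(10)  # illustrative interval only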
View file

@@ -48,12 +48,6 @@ last_batch = None
def get_router(batch_cache: BatchCache, url_queue: Queue):
    router = APIRouter(prefix="/crawler", tags=["crawler"])
    @router.on_event("startup")
    async def on_startup():
        with Database() as db:
            url_db = URLDatabase(db.connection)
            return url_db.create_tables()
    @router.post('/batches/')
    def create_batch(batch: Batch):
        if len(batch.items) > MAX_BATCH_SIZE:

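The startup hook shown above, which created the URL tables when the API came up, is what this hunk removes; table creation now lives in the background process from the first hunk. A minimal sketch of the resulting pattern, with a stub endpoint rather than mwmbl's real create_batch handler:

from fastapi import APIRouter

def get_router() -> APIRouter:
    # Schema setup no longer happens here; the background process creates
    # the tables before this API starts receiving batches.
    router = APIRouter(prefix="/crawler", tags=["crawler"])

    @router.post("/batches/")
    def create_batch():
        return {"status": "ok"}  # stub; the real handler validates batch size etc.

    return router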
View file

@@ -4,15 +4,20 @@ Database storing info on URLs
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from logging import getLogger
from psycopg2.extras import execute_values
from mwmbl.hn_top_domains_filtered import DOMAINS
# Client has one hour to crawl a URL that has been assigned to them, or it will be reassigned
from mwmbl.utils import batch
REASSIGN_MIN_HOURS = 5
BATCH_SIZE = 100
MAX_TOP_DOMAIN_URLS = 10
logger = getLogger(__name__)
class URLStatus(Enum):
@@ -43,6 +48,8 @@ class URLDatabase:
        self.connection = connection
    def create_tables(self):
        logger.info("Creating URL tables")
        sql = """
        CREATE TABLE IF NOT EXISTS urls (
            url VARCHAR PRIMARY KEY,
@@ -53,8 +60,19 @@
        )
        """
        index_sql = """
        CREATE INDEX IF NOT EXISTS host_index
        ON urls(substring(url FROM '.*://([^/]*)'), score)
        """
        view_sql = """
        CREATE OR REPLACE VIEW url_and_hosts AS SELECT *, substring(url FROM '.*://([^/]*)') AS host FROM urls
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            cursor.execute(index_sql)
            cursor.execute(view_sql)
    def update_found_urls(self, found_urls: list[FoundURL]):
        if len(found_urls) == 0:
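Both the new index and the url_and_hosts view rely on substring(url FROM '.*://([^/]*)') to pull the host out of a URL, so the per-host ranking in the query below can be answered from the (host, score) index instead of recomputing the host for every row. A quick illustration of what that pattern captures, using Python's re module as a stand-in for Postgres's POSIX substring:

import re

# Same capture group as the Postgres expression: everything between "://" and the next "/"
HOST_PATTERN = re.compile(r".*://([^/]*)")

for url in ("https://example.com/some/page", "http://news.ycombinator.com"):
    match = HOST_PATTERN.match(url)
    print(url, "->", match.group(1) if match else None)
# https://example.com/some/page -> example.com
# http://news.ycombinator.com -> news.ycombinator.com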
@@ -109,27 +127,40 @@
            execute_values(cursor, insert_sql, data)
    def get_urls_for_crawling(self, num_urls: int):
        start = datetime.utcnow()
        logger.info("Getting URLs for crawling")
        work_mem = "SET work_mem = '512MB'"
        sql = f"""
        UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
        WHERE url IN (
            SELECT url FROM urls
            WHERE status IN ({URLStatus.NEW.value}) OR (
                status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
            )
            ORDER BY score DESC
            LIMIT %(num_urls)s
            FOR UPDATE SKIP LOCKED
            SELECT url FROM (
                SELECT url, host, score, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos
                FROM url_and_hosts
                WHERE host IN %(domains)s
                AND status IN ({URLStatus.NEW.value}) OR (
                    status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
                )
            ) u
            WHERE pos < {MAX_TOP_DOMAIN_URLS}
        )
        RETURNING url
        """
        now = datetime.utcnow()
        min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
        domains = tuple(DOMAINS.keys())
        with self.connection.cursor() as cursor:
            cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls})
            cursor.execute(work_mem)
            cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls, 'domains': domains})
            results = cursor.fetchall()
        return [result[0] for result in results]
        total_time_seconds = (datetime.now() - start).total_seconds()
        results = [result[0] for result in results]
        logger.info(f"Got {len(results)} in {total_time_seconds} seconds")
        return results
    def get_urls(self, status: URLStatus, num_urls: int):
        sql = f"""

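The new query replaces the flat "top N by score" selection with a window function: rank() OVER (PARTITION BY host ORDER BY score DESC) numbers each host's URLs from 1 downwards, and pos < MAX_TOP_DOMAIN_URLS keeps only the best few per host, so a single high-scoring domain can no longer monopolise the crawl queue. A rough pure-Python equivalent of that per-host cap (illustrative only; ties are broken arbitrarily here, whereas rank() gives tied scores equal positions):

from collections import defaultdict
from urllib.parse import urlparse

MAX_TOP_DOMAIN_URLS = 10  # mirrors the constant introduced in this commit

def cap_urls_per_host(scored_urls: dict[str, float]) -> list[str]:
    by_host: dict[str, list[tuple[float, str]]] = defaultdict(list)
    for url, score in scored_urls.items():
        by_host[urlparse(url).netloc].append((score, url))
    selected = []
    for host, items in by_host.items():
        items.sort(reverse=True)  # best score first
        # pos < MAX_TOP_DOMAIN_URLS keeps ranks 1..MAX_TOP_DOMAIN_URLS - 1 per host
        selected.extend(url for _, url in items[: MAX_TOP_DOMAIN_URLS - 1])
    return selected

print(cap_urls_per_host({"https://a.com/1": 0.9, "https://a.com/2": 0.5, "https://b.com/1": 0.7}))

The SQL version additionally restricts candidates to hosts in DOMAINS (the curated top-domains list) and to NEW or stale ASSIGNED URLs before ranking, then marks the selected rows QUEUED in the same UPDATE.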
View file

@@ -10,11 +10,13 @@ logger = getLogger(__name__)
MAX_QUEUE_SIZE = 5000
MIN_QUEUE_SIZE = 1000
def update_url_queue(url_queue: Queue):
    logger.info("Updating URL queue")
    current_size = url_queue.qsize()
    if current_size >= MAX_QUEUE_SIZE:
    if current_size >= MIN_QUEUE_SIZE:
        logger.info(f"Skipping queue update, current size {current_size}")
        return
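With this change the queue is only topped up once it drops below MIN_QUEUE_SIZE (1000), rather than whenever it is short of MAX_QUEUE_SIZE (5000), which avoids hitting the database on every pass of the background loop. The rest of the function is not shown in this hunk, so the fill step below is an assumed sketch of the pattern (fetch_urls is a hypothetical callable), not mwmbl's actual code:

from multiprocessing import Queue

MAX_QUEUE_SIZE = 5000
MIN_QUEUE_SIZE = 1000

def update_url_queue(url_queue: Queue, fetch_urls) -> None:
    # Skip the (potentially expensive) database query while the queue is
    # comfortably full; only refill once it drains below the low-water mark.
    current_size = url_queue.qsize()
    if current_size >= MIN_QUEUE_SIZE:
        return
    # Assumed fill step: request enough URLs to bring the queue back up to MAX_QUEUE_SIZE.
    for url in fetch_urls(MAX_QUEUE_SIZE - current_size):
        url_queue.put(url)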