Add util script to send batch; add logging

This commit is contained in:
Daoud Clarke 2022-07-18 21:37:19 +01:00
parent 3c97fdb3a0
commit 93307ad1ec
8 changed files with 93 additions and 22 deletions

View file

@@ -1,20 +1,50 @@
from mwmbl.tinysearchengine.indexer import TinyIndex, NUM_PAGES, PAGE_SIZE, Document
import logging
import sys

import spacy

from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.paths import INDEX_PATH
from mwmbl.tinysearchengine.indexer import TinyIndex, Document

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

nlp = spacy.load("en_core_web_sm")


def store():
    document = Document(
        title='A nation in search of the new black | Theatre | The Guardian',
        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
        extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…",
        score=1.0
    )
    with TinyIndex(Document, INDEX_PATH, 'w') as tiny_index:
        tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
        print("Tokenized", tokenized)
        # for token in tokenized.tokens:
        #
        #     tiny_index.index(token, document)


def get_items():
    tiny_index = TinyIndex(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE)
    items = tiny_index.retrieve('soup')
    if items:
        for item in items:
            print("Items", item)
    with TinyIndex(Document, INDEX_PATH) as tiny_index:
        items = tiny_index.retrieve('search')
        if items:
            for item in items:
                print("Items", item)


def run():
    tiny_index = TinyIndex(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE)
    for i in range(100):
        tiny_index.get_page(i)
    with TinyIndex(Document, INDEX_PATH) as tiny_index:
        for i in range(100000):
            page = tiny_index.get_page(i)
            for item in page:
                if ' search' in item.title:
                    print("Page", i, item)


if __name__ == '__main__':
    run()
    # store()
    # run()
    get_items()

analyse/send_batch.py Normal file
View file

@@ -0,0 +1,27 @@
"""
Send a batch to a running instance.
"""
import requests

from mwmbl.crawler.batch import Batch, Item, ItemContent

URL = 'http://localhost:5000/crawler/batches/'


def run():
    batch = Batch(user_id='test_user_id111111111111111111111111', items=[Item(
        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
        content=ItemContent(
            title='A nation in search of the new black | Theatre | The Guardian',
            extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…",
            links=[]),
        timestamp=123456,
        status=200,
    )])
    result = requests.post(URL, data=batch.json())
    print("Result", result.content)


if __name__ == '__main__':
    run()
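
Note: the script above prints the response body regardless of the HTTP status. A minimal variant (a sketch only, not part of this commit) that fails loudly when the running instance rejects the batch, reusing the same Batch/Item/ItemContent models:

"""
Sketch: send a test batch and raise on HTTP errors (assumes a local instance on port 5000).
"""
import requests

from mwmbl.crawler.batch import Batch, Item, ItemContent

URL = 'http://localhost:5000/crawler/batches/'


def send(batch: Batch) -> None:
    result = requests.post(URL, data=batch.json())
    result.raise_for_status()  # raises requests.HTTPError on a 4xx/5xx response
    print("Result", result.status_code, result.content)


if __name__ == '__main__':
    send(Batch(user_id='test_user_id111111111111111111111111', items=[Item(
        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
        content=ItemContent(title='Test title', extract='Test extract', links=[]),
        timestamp=123456,
        status=200,
    )]))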

View file

@@ -13,18 +13,18 @@ logger = getLogger(__name__)

def run(index_path: str):
    historical.run()
    # historical.run()
    while True:
        try:
            retrieve_batches()
        except Exception:
            logger.exception("Error retrieving batches")
        # try:
        #     retrieve_batches()
        # except Exception:
        #     logger.exception("Error retrieving batches")
        try:
            run_preprocessing(index_path)
        except Exception:
            logger.exception("Error preprocessing")
        try:
            run_update(index_path)
        except Exception:
            logger.exception("Error running index update")
        # try:
        #     run_update(index_path)
        # except Exception:
        #     logger.exception("Error running index update")
        sleep(10)

View file

@@ -277,7 +277,6 @@ def status():

def queue_batch(batch: HashedBatch):
    # TODO: get the score from the URLs database
    # TODO: also queue documents for batches sent through the API
    documents = [Document(item.content.title, item.url, item.content.extract, 1)
                 for item in batch.items if item.content is not None]
    with Database() as db:
View file

@@ -2,6 +2,7 @@
Preprocess local documents for indexing.
"""
import traceback
from logging import getLogger
from time import sleep

import spacy
@@ -12,6 +13,9 @@ from mwmbl.indexer.index import tokenize_document
from mwmbl.tinysearchengine.indexer import TinyIndex, Document

logger = getLogger(__name__)


def run(index_path):
    while True:
        try:
@@ -34,7 +38,9 @@ def run_preprocessing(index_path):
    with TinyIndex(Document, index_path, 'w') as indexer:
        for document in documents:
            tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
            logger.debug(f"Tokenized: {tokenized}")
            page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
            logger.debug(f"Page indexes: {page_indexes}")
            index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])

View file

@@ -30,6 +30,7 @@ def run_update(index_path):
            except ValueError:
                documents = documents[:len(documents)//2]
                if len(documents) == 0:
                    print("No more space")
                    break
                print(f"Not enough space, adding {len(documents)}")
        index_db.clear_queued_documents_for_page(i)

View file

@@ -1,6 +1,7 @@
import argparse
import logging
import os
import sys
from multiprocessing import Process

import uvicorn
@@ -14,7 +15,7 @@ from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.indexer import TinyIndex, Document, NUM_PAGES, PAGE_SIZE
from mwmbl.tinysearchengine.rank import HeuristicRanker

logging.basicConfig()
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


def setup_args():

View file

@@ -2,6 +2,7 @@ import json
import os
from dataclasses import astuple, dataclass, asdict
from io import UnsupportedOperation
from logging import getLogger
from mmap import mmap, PROT_READ, PROT_WRITE
from typing import TypeVar, Generic, Callable, List
@@ -16,6 +17,9 @@ NUM_PAGES = 5_120_000
PAGE_SIZE = 4096

logger = getLogger(__name__)


@dataclass
class Document:
    title: str
@@ -92,6 +96,7 @@ class TinyIndex(Generic[T]):
        self.page_size = metadata.page_size
        self.compressor = ZstdCompressor()
        self.decompressor = ZstdDecompressor()
        logger.info(f"Loaded index with {self.num_pages} pages and {self.page_size} page size")
        self.index_file = None
        self.mmap = None
@@ -107,13 +112,14 @@
    def retrieve(self, key: str) -> List[T]:
        index = self.get_key_page_index(key)
        logger.debug(f"Retrieving index {index}")
        return self.get_page(index)

    def get_key_page_index(self, key) -> int:
        key_hash = mmh3.hash(key, signed=False)
        return key_hash % self.num_pages

    def get_page(self, i):
    def get_page(self, i) -> list[T]:
        """
        Get the page at index i, decompress and deserialise it using JSON
        """
@@ -123,6 +129,7 @@
    def _get_page_tuples(self, i):
        page_data = self.mmap[i * self.page_size:(i + 1) * self.page_size]
        decompressed_data = self.decompressor.decompress(page_data)
        # logger.debug(f"Decompressed data: {decompressed_data}")
        return json.loads(decompressed_data.decode('utf8'))

    def index(self, key: str, value: T):
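
For reference, the retrieve path logged above is deterministic: a term is hashed with murmur3 and reduced modulo the number of pages, so the same token always maps to the same page. A standalone sketch (not part of the commit) mirroring get_key_page_index with the NUM_PAGES constant from indexer.py:

import mmh3

NUM_PAGES = 5_120_000  # same constant as in mwmbl/tinysearchengine/indexer.py


def key_page_index(key: str) -> int:
    # Mirrors TinyIndex.get_key_page_index: unsigned murmur3 hash modulo the page count.
    return mmh3.hash(key, signed=False) % NUM_PAGES


if __name__ == '__main__':
    print(key_page_index('search'))  # always the same page index for a given term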