Caching SearXNG

This revision is from 2024/06/24 19:39. You can Restore it.

SearXNG installs itself on /usr/local/searxng/searxng-src, with the main source code in searxng-src directory.

Interesting files are...

  1. webapp.py in /usr/local/searxng/searxng-src/searx/webapp.py : def search()
  2. __init__.py in /usr/local/searxng/searxng-src/searx/search/__init__.py : class Search

A cache implentation here...

  • making a directory in the searx folder named cache
  • make a sub-folder for every possible character in the cache directory, for instance a to z and 0 to 9

  • the cache files are named by and are indentical to the search term
  • check if the filename exists when a search is performed
  • if there is a match read in the local file instead and defer the search
  • send the keywords to cache maintainers so they can update the cache. They can then crawl the search engines and build a more comprehensive cache over time.
  • the user updates their cache, by downloading and appending a distributed database.

Benefits: Why do this?

Imagine a man in the middle that knows your search term before you and performs the search prior, and then returns the result instantly when you pressed enter. The result is the same, except it would be very much faster. That is what a cache does, it speeds up the process. It also allows for a more comprehensive search, if I could perform searches across all the search engines, compile, optimize and store that data on disk all the while awaiting the user to search the keyword, when the term was searched again, the result would be not only fast but comprehensive.

Moreover, it could turn searXNG into a full search engine built from caching results, secondly offline searching becomes possible if the cache gets big enough.

Searx is privacy focused search engine, so disclosure to end user that however anonymous, caching requires keywords/search term sharing. That is how the cache is built. Opt out.

Proposed searXNG options:

  • use cache
  • update the cache daily

Make the cache directories

sudo mkdir -p /usr/local/searxng/searxng-src/searx/cache

sudo mkdir -p /usr/local/searxng/searxng-src/searx/cache/\! \@ \# \$ \% \& \? a b c d e f g h i j k l m n o p q r s t u v w x y z 0 1 2 3 4 5 6 7 8 9

sudo chown -R root:searxng /usr/local/searxng/searxng-src/searx/cache

sudo chmod -R 777 /usr/local/searxng/searxng-src/searx/cache

File: webapp.py: def search(): Line 625: Inject Line 662

The cache filename is the search_term, keyword. So its a file exists if else. Get the first letter of the search term to determine which directory, then complete path to determine if file exists, if the file exists return the json file otherwise perform the search.

def search():

"""Search query in q and return results.

Supported outputs: html, json, csv, rss.

"""

# pylint: disable=too-many-locals, too-many-return-statements, too-many-branches

# pylint: disable=too-many-statements

# output_format

output_format = request.form.get('format', 'html')

if output_format not in OUTPUT_FORMATS:

output_format = 'html'

if output_format not in settings['search']['formats']:

flask.abort(403)

# check if there is query (not None and not an empty string)

if not request.form.get('q'):

if output_format == 'html':

return render(

# fmt: off

'index.html',

selected_categories=get_selected_categories(request.preferences, request.form),

# fmt: on

)

return index_error(output_format, 'No query'), 400

# search

search_query = None

raw_text_query = None

result_container = None

try:

search_query, raw_text_query, _, _, selected_locale = get_search_query_from_webapp(

request.preferences, request.form

)

search = SearchWithPlugins(search_query, request.user_plugins, request) # pylint: disable=redefined-outer-name

result_container = search.search()

############ Start new code

fname = request.form['q'] + str(search_query.pageno) + str(search_query.categories[0])

first_char = fname[:1].lower()

if not first_char.isalnum():

first_char = '#'

file_path = os.path.abspath(os.path.join("cache", fname[:1].lower(), fname))

if not os.path.exists(file_path):

responsex = webutils.get_json_response(search_query, result_container)

if len(responsex.strip()) > 1000: # Checking length greater than 2 to ensure it's not just '{}'

with open(file_path, "w") as text_file:

text_file.write(responsex) # json.dump(responsex, text_file)

############# End new code

except SearxParameterException as e:

logger.exception('search error: SearxParameterException')

return index_error(output_format, e.message), 400

except Exception as e: # pylint: disable=broad-except

logger.exception(e, exc_info=True)

return index_error(output_format, gettext('search error')), 500

File: __init__.py in search, entire file for replacement

# SPDX-License-Identifier: AGPL-3.0-or-later # lint: pylint # pylint: disable=missing-module-docstring, too-few-public-methods

import os

import time

import json

import threading

from copy import copy

from timeit import default_timer

from uuid import uuid4

from datetime import datetime

from pathlib import Path

import traceback

from typing import List, Tuple

import flask

from flask import copy_current_request_context

import babel

from searx import settings

from searx.answerers import ask

from searx.external_bang import get_bang_url

from searx.results import ResultContainer

from searx import logger

from searx.plugins import plugins

from searx.search.models import EngineRef, SearchQuery

from searx.engines import load_engines

from searx.network import initialize as initialize_network, check_network_configuration

from searx.metrics import initialize as initialize_metrics, counter_inc, histogram_observe_time

from searx.search.processors import PROCESSORS, initialize as initialize_processors

from searx.search.checker import initialize as initialize_checker

logger = logger.getChild('search')

def initialize(settings_engines=None, enable_checker=False, check_network=False, enable_metrics=True):

settings_engines = settings_engines or settings['engines']

load_engines(settings_engines)

initialize_network(settings_engines, settings['outgoing'])

if check_network:

check_network_configuration()

initialize_metrics([engine['name'] for engine in settings_engines], enable_metrics)

initialize_processors(settings_engines)

if enable_checker:

initialize_checker()

class Search:

"""Search information container"""

__slots__ = "search_query", "result_container", "start_time", "actual_timeout"

def __init__(self, search_query: SearchQuery):

"""Initialize the Search"""

# init vars

super().__init__()

self.search_query = search_query

self.result_container = ResultContainer()

self.start_time = None

self.actual_timeout = None

def search_external_bang(self):

"""

Check if there is a external bang.

If yes, update self.result_container and return True

"""

if self.search_query.external_bang:

self.result_container.redirect_url = get_bang_url(self.search_query)

# This means there was a valid bang and the

# rest of the search does not need to be continued

if isinstance(self.result_container.redirect_url, str):

return True

return False

def search_answerers(self):

"""

Check if an answer return a result.

If yes, update self.result_container and return True

"""

answerers_results = ask(self.search_query)

if answerers_results:

for results in answerers_results:

self.result_container.extend('answer', results)

return True

return False

# do search-request

def _get_requests(self):

# init vars

requests = []

# max of all selected engine timeout

default_timeout = 0

# start search-request for all selected engines

for engineref in self.search_query.engineref_list:

processor = PROCESSORS[engineref.name]

# stop the request now if the engine is suspend

if processor.extend_container_if_suspended(self.result_container):

continue

# set default request parameters

request_params = processor.get_params(self.search_query, engineref.category)

if request_params is None:

continue

counter_inc('engine', engineref.name, 'search', 'count', 'sent')

# append request to list

requests.append((engineref.name, self.search_query.query, request_params))

# update default_timeout

default_timeout = max(default_timeout, processor.engine.timeout)

# adjust timeout

max_request_timeout = settings['outgoing']['max_request_timeout']

actual_timeout = default_timeout

query_timeout = self.search_query.timeout_limit

if max_request_timeout is None and query_timeout is None:

# No max, no user query: default_timeout

pass

elif max_request_timeout is None and query_timeout is not None:

# No max, but user query: From user query except if above default

actual_timeout = min(default_timeout, query_timeout)

elif max_request_timeout is not None and query_timeout is None:

# Max, no user query: Default except if above max

actual_timeout = min(default_timeout, max_request_timeout)

elif max_request_timeout is not None and query_timeout is not None:

# Max & user query: From user query except if above max

actual_timeout = min(query_timeout, max_request_timeout)

logger.debug(

"actual_timeout={0} (default_timeout={1}, ?timeout_limit={2}, max_request_timeout={3})".format(

actual_timeout, default_timeout, query_timeout, max_request_timeout

)

)

return requests, actual_timeout

def search_multiple_requests(self, requests):

# pylint: disable=protected-access

search_id = str(uuid4())

for engine_name, query, request_params in requests:

_search = copy_current_request_context(PROCESSORS[engine_name].search)

th = threading.Thread( # pylint: disable=invalid-name

target=_search,

args=(query, request_params, self.result_container, self.start_time, self.actual_timeout),

name=search_id,

)

th._timeout = False

th._engine_name = engine_name

th.start()

for th in threading.enumerate(): # pylint: disable=invalid-name

if th.name == search_id:

remaining_time = max(0.0, self.actual_timeout - (default_timer() - self.start_time))

th.join(remaining_time)

if th.is_alive():

th._timeout = True

self.result_container.add_unresponsive_engine(th._engine_name, 'timeout')

PROCESSORS[th._engine_name].logger.error('engine timeout')

def search_multiple_requests2(self, requests):

# pylint: disable=protected-access

search_id = str(uuid4())

mock_result_container = ResultContainer()

# Modify the path to load the JSON data

cache_dir = 'cache'

query_dir = os.path.join(cache_dir, self.search_query.query[0].lower())

fname = self.search_query.query.lower() + str(self.search_query.pageno) + str(self.search_query.categories[0])

# fname = self.search_query.query + str(self.search_query.pageno)

mock_data_filename = os.path.join(query_dir, fname)

with open(mock_data_filename, encoding='utf-8') as mock_data_file:

mock_data = json.load(mock_data_file)

mock_results = mock_data['results'] # Extract 'results' from the JSON data

threads = []

for engine_name, _, _ in requests:

th = threading.Thread(

target=self.mock_search_function,

args=(engine_name, mock_results, mock_result_container),

name=search_id,

)

th._timeout = False

th._engine_name = engine_name

th.start()

threads.append(th)

remaining_time = None

for th in threads:

if th.name == search_id:

if remaining_time is None:

remaining_time = self.actual_timeout - (default_timer() - self.start_time)

th.join(remaining_time)

if th.is_alive():

th._timeout = True

self.result_container.add_unresponsive_engine(th._engine_name, 'timeout')

PROCESSORS[th._engine_name].logger.error('engine timeout')

# Wait for all threads to finish, even if some have timed out

for th in threads:

th.join()

# Copy the mock results to the actual result_container

self.result_container = mock_result_container

def mock_search_function(self, engine_name, mock_results, result_container):

# This is a mock search function

time.sleep(0.1) # Simulate some processing time

# Convert 'publishedDate' string to datetime object

for result in mock_results:

if 'publishedDate' in result:

if isinstance(result['publishedDate'], str):

result['publishedDate'] = datetime.fromisoformat(result['publishedDate'])

result_container.extend(engine_name, mock_results)

def search_standard(self):

"""

Update self.result_container, self.actual_timeout

"""

requests, self.actual_timeout = self._get_requests()

# Modify the path to load the JSON data

cache_dir = 'cache'

query_dir = os.path.join(cache_dir, self.search_query.query[0].lower()) # Force entire query to lowercase

fname = self.search_query.query.lower() + str(self.search_query.pageno) + str(self.search_query.categories[0]) # Force entire file name to lowercase

mock_data_filename = os.path.join(query_dir, fname)

# with open('categories.txt', 'w') as f: # f.write(str(self.search_query.categories))

# send all search-request

if requests:

# Check if the file exists in the cache directory

if os.path.isfile(mock_data_filename): # and self.search_query.categories[0] == 'general':

self.search_multiple_requests2(requests)

else:

self.search_multiple_requests(requests)

# if os.path.isfile(mock_data_filename): # self.search_multiple_requests2(requests) # else: # self.search_multiple_requests(requests)

# return results, suggestions, answers and infoboxes

return True

# do search-request

def search(self) -> ResultContainer:

self.start_time = default_timer()

if not self.search_external_bang():

if not self.search_answerers():

self.search_standard()

return self.result_container

class SearchWithPlugins(Search):

"""Inherit from the Search class, add calls to the plugins."""

__slots__ = 'ordered_plugin_list', 'request'

def __init__(self, search_query: SearchQuery, ordered_plugin_list, request: flask.Request):

super().__init__(search_query)

self.ordered_plugin_list = ordered_plugin_list

self.result_container.on_result = self._on_result

# pylint: disable=line-too-long

# get the "real" request to use it outside the Flask context.

# see

# * https://github.com/pallets/flask/blob/d01d26e5210e3ee4cbbdef12f05c886e08e92852/src/flask/globals.py#L55

# * https://github.com/pallets/werkzeug/blob/3c5d3c9bd0d9ce64590f0af8997a38f3823b368d/src/werkzeug/local.py#L548-L559

# * https://werkzeug.palletsprojects.com/en/2.0.x/local/#werkzeug.local.LocalProxy._get_current_object

# pylint: enable=line-too-long

self.request = request._get_current_object()

def _on_result(self, result):

return plugins.call(self.ordered_plugin_list, 'on_result', self.request, self, result)

def search(self) -> ResultContainer:

if plugins.call(self.ordered_plugin_list, 'pre_search', self.request, self):

super().search()

plugins.call(self.ordered_plugin_list, 'post_search', self.request, self)

self.result_container.close()

return self.result_container

Testing the cache

sudo systemctl restart uwsgi

  1. Do the search Immortality Coin
  2. Go to the cache directory letter i
  3. Open the file and edit a title in the json file
  4. Perform another search immortality coin, does it display the altered result?
  

📝 📜 ⏱️ ⬆️