Caching SearXNG Version 2

This revision is from 2024/08/03 02:52. You can Restore it.

In version 2, the cache filenames are hashed, which means that international characters and symbols are supported and are less likely to break SearXNG.

  1. webapp.py in /usr/local/searxng/searxng-src/searx/webapp.py : def search()
  2. __init__.py in /usr/local/searxng/searxng-src/searx/search/__init__.py : class Search

sudo mkdir -p /usr/local/searxng/searxng-src/searx/cache

for i in {0..255}; do sudo mkdir -p /usr/local/searxng/searxng-src/searx/cache/$(printf "%02x" $i); done

sudo chown -R searxng:searxng /usr/local/searxng/searxng-src/searx/cache

sudo chmod -R 755 /usr/local/searxng/searxng-src/searx/cache

webapp.py

import hashlib

import os

# Build the cache key from the query text, the page number and the first
# selected category.  The query is lower-cased because the reader in
# searx/search/__init__.py (search_standard) hashes ``query.lower()``; without
# this, a query containing an upper-case letter is stored under a key that is
# never looked up.
# NOTE(review): the reader hashes the parsed ``search_query.query`` while this
# uses the raw form field; presumably the two differ for queries that use
# bang / language syntax -- confirm.
fname = request.form['q'].lower() + str(search_query.pageno) + str(search_query.categories[0])

# Hash the key so the filename is filesystem-safe whatever characters the
# query contains (MD5 is used for naming only, not for security).
hex_dig = hashlib.md5(fname.encode()).hexdigest()

# Shard the cache into sub-directories named after the first two hex digits.
subdirectory = hex_dig[:2]
cache_dir = os.path.abspath(os.path.join("cache", subdirectory))

# exist_ok avoids the check-then-create race between concurrent requests.
os.makedirs(cache_dir, exist_ok=True)

file_path = os.path.join(cache_dir, hex_dig)  # the full hash is the filename

if not os.path.exists(file_path):
    responsex = webutils.get_json_response(search_query, result_container)
    # Only responses longer than 1000 characters are cached; presumably this
    # keeps empty or near-empty result sets (e.g. '{}') out of the cache.
    if len(responsex.strip()) > 1000:
        # The reader opens the file as UTF-8, so write it as UTF-8 explicitly
        # instead of relying on the process locale.
        with open(file_path, "w", encoding="utf-8") as text_file:
            text_file.write(responsex)

__init__.py

import hashlib

import os

def search_standard(self):
    """Run the standard search, replaying a cached response when one exists.

    The cache key is the lower-cased query text followed by the page number
    and the first category; its MD5 hex digest names the cache file, which
    lives under ``cache/<first two hex digits>/``.

    Returns:
        Always ``True``.
    """
    requests, self.actual_timeout = self._get_requests()

    query = self.search_query
    key = "".join((query.query.lower(), str(query.pageno), str(query.categories[0])))
    digest = hashlib.md5(key.encode()).hexdigest()
    cached_path = os.path.join('cache', digest[:2], digest)

    # nothing to ask any engine: neither the cache nor the engines are used
    if not requests:
        return True

    if os.path.isfile(cached_path):
        self.search_multiple_requests2(requests, digest)
    else:
        self.search_multiple_requests(requests)
    return True

__init__.py

def search_multiple_requests2(self, requests, hashed_filename):
    """Answer a search from a cached JSON response instead of the engines.

    The cached file is loaded once and one thread per requested engine feeds
    its ``results`` list into a fresh ``ResultContainer`` through
    ``self.mock_search_function``.  When all threads are done the new
    container replaces ``self.result_container``.

    Args:
        requests: iterable of 3-tuples whose first item is the engine name.
        hashed_filename: name of the cache file inside its shard directory.

    Raises:
        OSError: the cache file cannot be opened.
        ValueError: the cache file is not valid JSON.
        KeyError: the cached document has no ``results`` entry.
    """
    search_id = str(uuid4())
    mock_result_container = ResultContainer()

    # The shard directory is derived from the same key search_standard hashes.
    fname = self.search_query.query.lower() + str(self.search_query.pageno) + str(self.search_query.categories[0])
    hex_dig = hashlib.md5(fname.encode()).hexdigest()
    mock_data_filename = os.path.join('cache', hex_dig[:2], hashed_filename)

    with open(mock_data_filename, encoding='utf-8') as mock_data_file:
        mock_data = json.load(mock_data_file)
    mock_results = mock_data['results']

    threads = []
    for engine_name, _, _ in requests:
        th = threading.Thread(
            target=self.mock_search_function,
            args=(engine_name, mock_results, mock_result_container),
            name=search_id,
        )
        th._timeout = False
        th._engine_name = engine_name
        th.start()
        threads.append(th)

    for th in threads:
        # Recompute the budget for every thread: reusing the first value would
        # grant each thread the full remaining time, so the total wait could
        # exceed actual_timeout.  Clamp at zero once the budget is used up.
        remaining_time = max(0.0, self.actual_timeout - (default_timer() - self.start_time))
        th.join(remaining_time)
        if th.is_alive():
            th._timeout = True
            # Record the timeout on the container that is about to become
            # self.result_container; recording it on the old container would
            # be discarded by the assignment at the end of this method.
            mock_result_container.add_unresponsive_engine(th._engine_name, 'timeout')
            PROCESSORS[th._engine_name].logger.error('engine timeout')

    # Wait for any straggler so the container is complete before it is exposed.
    for th in threads:
        th.join()

    self.result_container = mock_result_container

def mock_search_function(self, engine_name, mock_results, result_container):
    """Feed cached results into ``result_container`` on behalf of one engine.

    ``publishedDate`` values stored as ISO-8601 strings are converted back to
    ``datetime`` objects in place before the whole list is handed to
    ``result_container.extend``.

    Args:
        engine_name: engine the results are attributed to.
        mock_results: list of result dicts loaded from the cache file.
        result_container: object exposing ``extend(engine_name, results)``.
    """
    # No artificial delay here: a fixed sleep only added latency to every
    # cache hit.
    for result in mock_results:
        # Read the value once.  The list is shared between the per-engine
        # threads, so re-reading result['publishedDate'] after the isinstance
        # check could see a datetime written by another thread and make
        # fromisoformat() raise TypeError.
        published = result.get('publishedDate')
        if isinstance(published, str):
            result['publishedDate'] = datetime.fromisoformat(published)

    result_container.extend(engine_name, mock_results)

A better version, perhaps version 3

searx/webapp.py

def _cache_search_response(raw_query, search_query, result_container):
    """Store the JSON form of a search response on disk (best effort).

    The file name is the MD5 hex digest of the raw query text followed by the
    page number and the first category; files are sharded into
    ``cache/<first two hex digits>/``.  An existing file is never overwritten.

    File-system errors are logged and swallowed so that a cache problem
    (permissions, full disk, ...) cannot turn a successful search into an
    error response.

    Args:
        raw_query: the query exactly as submitted in the ``q`` form field.
        search_query: the search query object (``pageno`` and ``categories``
            are read).
        result_container: the results to serialise.
    """
    fname = raw_query + str(search_query.pageno) + str(search_query.categories[0])
    # MD5 is used for naming only, not for security.
    hex_dig = hashlib.md5(fname.encode()).hexdigest()
    cache_dir = os.path.abspath(os.path.join("cache", hex_dig[:2]))
    file_path = os.path.join(cache_dir, hex_dig)

    try:
        # exist_ok avoids the check-then-create race between requests
        os.makedirs(cache_dir, exist_ok=True)
        if os.path.exists(file_path):
            return

        responsex = webutils.get_json_response(search_query, result_container)
        # Only responses longer than 1000 characters are cached; presumably
        # this keeps empty or near-empty result sets out of the cache.
        if len(responsex.strip()) <= 1000:
            return

        # The reader opens the file as UTF-8, so do not rely on the locale.
        with open(file_path, "w", encoding="utf-8") as text_file:
            text_file.write(responsex)
    except OSError:
        logger.exception('search cache: unable to write %s', file_path)


@app.route('/search', methods=['GET', 'POST'])
def search():
    """Search query in q and return results.

    Supported outputs: html, json, csv, rss.

    After a successful search the JSON form of the response is additionally
    written to the on-disk cache (see ``_cache_search_response``).
    """
    # pylint: disable=too-many-locals, too-many-return-statements, too-many-branches
    # pylint: disable=too-many-statements

    # output_format
    output_format = request.form.get('format', 'html')
    if output_format not in OUTPUT_FORMATS:
        output_format = 'html'

    if output_format not in settings['search']['formats']:
        flask.abort(403)

    # check if there is query (not None and not an empty string)
    if not request.form.get('q'):
        if output_format == 'html':
            return render(
                # fmt: off
                'index.html',
                selected_categories=get_selected_categories(request.preferences, request.form),
                # fmt: on
            )
        return index_error(output_format, 'No query'), 400

    # search
    search_query = None
    raw_text_query = None
    result_container = None
    try:
        search_query, raw_text_query, _, _, selected_locale = get_search_query_from_webapp(
            request.preferences, request.form
        )
        search = SearchWithPlugins(search_query, request.user_plugins, request)  # pylint: disable=redefined-outer-name
        result_container = search.search()

        # store the response for later replay; I/O errors are handled inside
        _cache_search_response(request.form['q'], search_query, result_container)

    except SearxParameterException as e:
        logger.exception('search error: SearxParameterException')
        return index_error(output_format, e.message), 400
    except Exception as e:  # pylint: disable=broad-except
        logger.exception(e, exc_info=True)
        return index_error(output_format, gettext('search error')), 500

    # 1. check if the result is a redirect for an external bang
    if result_container.redirect_url:
        return redirect(result_container.redirect_url)

    # 2. add Server-Timing header for measuring performance characteristics of
    # web applications
    request.timings = result_container.get_timings()  # pylint: disable=assigning-non-slot

    # 3. formats without a template
    if output_format == 'json':
        response = webutils.get_json_response(search_query, result_container)
        return Response(response, mimetype='application/json')

    if output_format == 'csv':
        csv = webutils.CSVWriter(StringIO())
        webutils.write_csv_response(csv, result_container)
        csv.stream.seek(0)

        response = Response(csv.stream.read(), mimetype='application/csv')
        cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search_query.query)
        response.headers.add('Content-Disposition', cont_disp)
        return response

    # 4. formats rendered by a template / RSS & HTML
    current_template = None
    previous_result = None

    results = result_container.get_ordered_results()

    if search_query.redirect_to_first_result and results:
        return redirect(results[0]['url'], 302)

    for result in results:
        if output_format == 'html':
            if 'content' in result and result['content']:
                result['content'] = highlight_content(escape(result['content'][:1024]), search_query.query)
            if 'title' in result and result['title']:
                result['title'] = highlight_content(escape(result['title'] or ''), search_query.query)

        if 'url' in result:
            result['pretty_url'] = webutils.prettify_url(result['url'])

        if result.get('publishedDate'):  # do not try to get a date from an empty string or a None type
            try:  # test if publishedDate >= 1900 (datetime module bug)
                result['pubdate'] = result['publishedDate'].strftime('%Y-%m-%d %H:%M:%S%z')
            except ValueError:
                result['publishedDate'] = None
            else:
                result['publishedDate'] = webutils.searxng_l10n_timespan(result['publishedDate'])

        # set result['open_group'] = True when the template changes from the previous result
        # set result['close_group'] = True when the template changes on the next result
        if current_template != result.get('template'):
            result['open_group'] = True
            if previous_result:
                previous_result['close_group'] = True  # pylint: disable=unsupported-assignment-operation
        current_template = result.get('template')
        previous_result = result

    if previous_result:
        previous_result['close_group'] = True

    # 4.a RSS
    if output_format == 'rss':
        response_rss = render(
            'opensearch_response_rss.xml',
            results=results,
            answers=result_container.answers,
            corrections=result_container.corrections,
            suggestions=result_container.suggestions,
            q=request.form['q'],
            number_of_results=result_container.number_of_results,
        )
        return Response(response_rss, mimetype='text/xml')

    # 4.b HTML

    # suggestions: use RawTextQuery to get the suggestion URLs with the same bang
    suggestion_urls = list(
        map(
            lambda suggestion: {'url': raw_text_query.changeQuery(suggestion).getFullQuery(), 'title': suggestion},
            result_container.suggestions,
        )
    )

    correction_urls = list(
        map(
            lambda correction: {'url': raw_text_query.changeQuery(correction).getFullQuery(), 'title': correction},
            result_container.corrections,
        )
    )

    # search_query.lang contains the user choice (all, auto, en, ...)
    # when the user choice is "auto", search.search_query.lang contains the detected language
    # otherwise it is equals to search_query.lang
    return render(
        # fmt: off
        'results.html',
        results = results,
        q=request.form['q'],
        selected_categories = search_query.categories,
        pageno = search_query.pageno,
        time_range = search_query.time_range or '',
        number_of_results = format_decimal(result_container.number_of_results),
        suggestions = suggestion_urls,
        answers = result_container.answers,
        corrections = correction_urls,
        infoboxes = result_container.infoboxes,
        engine_data = result_container.engine_data,
        paging = result_container.paging,
        unresponsive_engines = webutils.get_translated_errors(
            result_container.unresponsive_engines
        ),
        current_locale = request.preferences.get_value("locale"),
        current_language = selected_locale,
        search_language = match_locale(
            search.search_query.lang,
            settings['search']['languages'],
            fallback=request.preferences.get_value("language")
        ),
        timeout_limit = request.form.get('timeout_limit', None)
        # fmt: on
    )

searx/search/__init__.py

def search_multiple_requests2(self, requests, hashed_filename):
    """Answer a search from a cached JSON response instead of the engines.

    The cached document is loaded, its ``results`` are replayed once per
    requested engine through ``self.mock_search_function`` into a fresh
    ``ResultContainer``, and the remaining sections (infoboxes, suggestions,
    corrections, answers, number of results) are copied over.  The new
    container then replaces ``self.result_container``.

    Args:
        requests: iterable of 3-tuples whose first item is the engine name.
        hashed_filename: name of the cache file inside its shard directory.

    Raises:
        OSError: the cache file cannot be opened.
        ValueError: the cache file is not valid JSON.
        KeyError: the cached document has no ``results`` entry.
    """
    mock_result_container = ResultContainer()

    # The shard directory is derived from the same key search_standard hashes.
    fname = self.search_query.query + str(self.search_query.pageno) + str(self.search_query.categories[0])
    hex_dig = hashlib.md5(fname.encode()).hexdigest()
    mock_data_filename = os.path.join('cache', hex_dig[:2], hashed_filename)

    with open(mock_data_filename, encoding='utf-8') as mock_data_file:
        mock_data = json.load(mock_data_file)

    mock_results = mock_data['results']

    # Process results for each engine
    for engine_name, _, _ in requests:
        self.mock_search_function(engine_name, mock_results, mock_result_container)

    mock_result_container.infoboxes.extend(mock_data.get('infoboxes', []))

    # The container keeps suggestions and corrections as sets; update them
    # in place rather than replacing the attribute with a JSON list.
    mock_result_container.suggestions.update(mock_data.get('suggestions', []))
    # NOTE(review): assumes the cached JSON carries a 'corrections' list like
    # it does for suggestions; the default keeps this a no-op if it does not.
    mock_result_container.corrections.update(mock_data.get('corrections', []))

    mock_result_container.answers = {answer: {'answer': answer} for answer in mock_data.get('answers', [])}
    mock_result_container.number_of_results = mock_data.get('number_of_results', 0)

    self.result_container = mock_result_container

def mock_search_function(self, engine_name, mock_results, result_container):
    """Replay the cached results that belong to ``engine_name``.

    A cached result is selected when ``engine_name`` appears in its
    ``engines`` list.  Each selected result is shallow-copied, tagged with
    ``engine`` and, when ``publishedDate`` is an ISO-8601 string, given a
    ``datetime`` instead.  The copies are passed to
    ``result_container.extend`` in one call; ``mock_results`` is not modified.
    """
    replayed = []
    for cached in mock_results:
        if engine_name not in cached.get('engines', []):
            continue

        entry = {**cached, 'engine': engine_name}
        published = entry.get('publishedDate')
        if isinstance(published, str):
            entry['publishedDate'] = datetime.fromisoformat(published)
        replayed.append(entry)

    result_container.extend(engine_name, replayed)

def search_standard(self):
    """Run the standard search, replaying a cached response when one exists.

    The cache key is the query text followed by the page number and the first
    category; its MD5 hex digest names the cache file, which lives under
    ``cache/<first two hex digits>/``.

    If the cache file exists but cannot be replayed (unreadable, truncated,
    not valid JSON, or missing the expected keys) the failure is logged and
    the live engines are queried instead, so a damaged cache entry cannot
    permanently break a query.

    Returns:
        Always ``True``.
    """
    import logging  # local import: only needed on the fallback path

    requests, self.actual_timeout = self._get_requests()

    fname = self.search_query.query + str(self.search_query.pageno) + str(self.search_query.categories[0])
    hex_dig = hashlib.md5(fname.encode()).hexdigest()
    hashed_filename = hex_dig  # the full hash is the filename
    mock_data_filename = os.path.join('cache', hex_dig[:2], hashed_filename)

    if requests:
        served_from_cache = False
        if os.path.isfile(mock_data_filename):
            try:
                self.search_multiple_requests2(requests, hashed_filename)
                served_from_cache = True
            except (OSError, ValueError, KeyError):
                # json.JSONDecodeError is a ValueError subclass
                logging.getLogger(__name__).warning(
                    'unusable cache entry %s, querying engines instead', mock_data_filename, exc_info=True
                )
        if not served_from_cache:
            self.search_multiple_requests(requests)

    return True

# do search-request
def search(self) -> ResultContainer:
    """Run the search and return the container holding its results.

    The start time is recorded first.  The external-bang handler is tried,
    then the answerers; the standard search only runs when neither of them
    reports having handled the query.
    """
    self.start_time = default_timer()

    # `or` short-circuits: the answerers are skipped when the bang matched
    handled = self.search_external_bang() or self.search_answerers()
    if not handled:
        self.search_standard()

    return self.result_container

searx/results.py

class ResultContainer:
    """Container for search results; instance attributes are limited to ``__slots__``."""

    # fmt: off
    __slots__ = (
        '_merged_results', 'infoboxes', 'suggestions', 'answers', 'corrections',
        '_number_of_results', '_closed', 'paging', 'unresponsive_engines',
        'timings', 'redirect_url', 'engine_data', 'on_result', '_lock',
    )
    # fmt: on

class ResultContainer:
    """Collects the results of one search, merges duplicates and orders them.

    Results are added with :py:meth:`extend` while the container is open.
    :py:meth:`close` scores, sorts and groups them; after that the container
    no longer accepts results.
    """

    # NOTE: an earlier, simpler ``extend`` that appended to ``self.results``
    # used to be defined here.  It was shadowed by the ``extend`` below (a
    # later ``def`` of the same name replaces the earlier one) and relied on
    # an attribute that ``__init__`` never creates, so it has been removed.

    def __init__(self):
        super().__init__()
        self._merged_results = []
        self.infoboxes = []
        self.suggestions = set()
        self.answers = {}
        self.corrections = set()
        self._number_of_results = []
        self.engine_data = defaultdict(dict)
        self._closed = False
        self.paging = False
        self.unresponsive_engines: Set[UnresponsiveEngine] = set()
        self.timings: List[Timing] = []
        self.redirect_url = None
        # hook called for every result; a falsy return value drops the result
        self.on_result = lambda _: True
        self._lock = RLock()

    def extend(self, engine_name, results):  # pylint: disable=too-many-branches
        """Add the ``results`` delivered by ``engine_name``.

        Every result is tagged with ``engine`` and dispatched on the first
        matching key: suggestion, answer, correction, infobox,
        number_of_results, engine_data, url (standard result) or, failing all
        of these, a result without URL.  Does nothing once the container is
        closed.
        """
        if self._closed:
            return

        standard_result_count = 0
        error_msgs = set()
        for result in list(results):
            result['engine'] = engine_name
            if 'suggestion' in result and self.on_result(result):
                self.suggestions.add(result['suggestion'])
            elif 'answer' in result and self.on_result(result):
                self.answers[result['answer']] = result
            elif 'correction' in result and self.on_result(result):
                self.corrections.add(result['correction'])
            elif 'infobox' in result and self.on_result(result):
                self._merge_infobox(result)
            elif 'number_of_results' in result and self.on_result(result):
                self._number_of_results.append(result['number_of_results'])
            elif 'engine_data' in result and self.on_result(result):
                self.engine_data[engine_name][result['key']] = result['engine_data']
            elif 'url' in result:
                # standard result (url, title, content)
                if not self._is_valid_url_result(result, error_msgs):
                    continue
                # normalize the result
                self._normalize_url_result(result)
                # call on_result call searx.search.SearchWithPlugins._on_result
                # which calls the plugins
                if not self.on_result(result):
                    continue
                self.__merge_url_result(result, standard_result_count + 1)
                standard_result_count += 1
            elif self.on_result(result):
                self.__merge_result_no_url(result, standard_result_count + 1)
                standard_result_count += 1

        for msg in error_msgs:
            count_error(engine_name, 'some results are invalids: ' + msg, secondary=True)

        if engine_name in engines:
            histogram_observe(standard_result_count, 'engine', engine_name, 'result', 'count')

        if not self.paging and engine_name in engines and engines[engine_name].paging:
            self.paging = True

    def _merge_infobox(self, infobox):
        """Merge ``infobox`` into an existing infobox with the same id, else append it."""
        add_infobox = True
        infobox_id = infobox.get('id', None)
        infobox['engines'] = {infobox['engine']}
        if infobox_id is not None:
            parsed_url_infobox_id = urlparse(infobox_id)
            with self._lock:
                for existingIndex in self.infoboxes:
                    if compare_urls(urlparse(existingIndex.get('id', '')), parsed_url_infobox_id):
                        merge_two_infoboxes(existingIndex, infobox)
                        add_infobox = False

        if add_infobox:
            self.infoboxes.append(infobox)

    def _is_valid_url_result(self, result, error_msgs):
        """Return True when ``url``, ``title`` and ``content`` (if present) are strings.

        For the first offending field a short message is added to
        ``error_msgs`` and False is returned.
        """
        if 'url' in result:
            if not isinstance(result['url'], str):
                logger.debug('result: invalid URL: %s', str(result))
                error_msgs.add('invalid URL')
                return False

        if 'title' in result and not isinstance(result['title'], str):
            logger.debug('result: invalid title: %s', str(result))
            error_msgs.add('invalid title')
            return False

        if 'content' in result:
            if not isinstance(result['content'], str):
                logger.debug('result: invalid content: %s', str(result))
                error_msgs.add('invalid content')
                return False

        return True

    def _normalize_url_result(self, result):
        """Normalize a standard result in place (returns nothing).

        Adds ``parsed_url``, defaults a missing scheme to ``http``, drops a
        ``content`` equal to the ``title``, defaults ``template`` to
        ``default.html`` and collapses whitespace in ``content``.
        """
        result['parsed_url'] = urlparse(result['url'])

        # if the result has no scheme, use http as default
        if not result['parsed_url'].scheme:
            result['parsed_url'] = result['parsed_url']._replace(scheme="http")
            result['url'] = result['parsed_url'].geturl()

        # avoid duplicate content between the content and title fields
        if result.get('content') == result.get('title'):
            # pop() instead of del: when neither key exists both .get() calls
            # return None, they compare equal, and del would raise KeyError
            result.pop('content', None)

        # make sure there is a template
        if 'template' not in result:
            result['template'] = 'default.html'

        # strip multiple spaces and carriage returns from content
        if result.get('content'):
            result['content'] = WHITESPACE_REGEX.sub(' ', result['content'])

    def __merge_url_result(self, result, position):
        """Merge ``result`` into its duplicate if there is one, else append it."""
        result['engines'] = {result['engine']}
        with self._lock:
            duplicated = self.__find_duplicated_http_result(result)
            if duplicated:
                self.__merge_duplicated_http_result(duplicated, result, position)
                return

            # if there is no duplicate found, append result
            result['positions'] = [position]
            self._merged_results.append(result)

    def __find_duplicated_http_result(self, result):
        """Return the already merged result that duplicates ``result``, or None."""
        result_template = result.get('template')
        for merged_result in self._merged_results:
            if 'parsed_url' not in merged_result:
                continue
            if compare_urls(result['parsed_url'], merged_result['parsed_url']) and result_template == merged_result.get(
                'template'
            ):
                if result_template != 'images.html':
                    # not an image, same template, same url : it's a duplicate
                    return merged_result

                # it's an image
                # it's a duplicate only if parsed_url, template and img_src are all the same
                if result.get('img_src', '') == merged_result.get('img_src', ''):
                    return merged_result
        return None

    def __merge_duplicated_http_result(self, duplicated, result, position):
        """Fold ``result`` into ``duplicated``, the result it duplicates."""
        # using content with more text
        if result_content_len(result.get('content', '')) > result_content_len(duplicated.get('content', '')):
            duplicated['content'] = result['content']

        # merge all result's parameters not found in duplicate
        for key, value in result.items():
            if not duplicated.get(key):
                duplicated[key] = value

        # add the new position
        duplicated['positions'].append(position)

        # add engine to list of result-engines
        duplicated['engines'].add(result['engine'])

        # using https if possible
        if duplicated['parsed_url'].scheme != 'https' and result['parsed_url'].scheme == 'https':
            duplicated['url'] = result['parsed_url'].geturl()
            duplicated['parsed_url'] = result['parsed_url']

    def __merge_result_no_url(self, result, position):
        """Append a result that has no URL; such results are never de-duplicated."""
        result['engines'] = {result['engine']}
        result['positions'] = [position]
        with self._lock:
            self._merged_results.append(result)

    def close(self):
        """Close the container: score, clean, sort and group the merged results."""
        self._closed = True

        for result in self._merged_results:
            result['score'] = result_score(result, result.get('priority'))
            # removing html content and whitespace duplications
            if result.get('content'):
                result['content'] = utils.html_to_text(result['content']).strip()
            if result.get('title'):
                result['title'] = ' '.join(utils.html_to_text(result['title']).strip().split())

            for result_engine in result['engines']:
                counter_add(result['score'], 'engine', result_engine, 'score')

        results = sorted(self._merged_results, key=itemgetter('score'), reverse=True)

        # pass 2 : group results by category and template
        gresults = []
        categoryPositions = {}

        for res in results:
            # do we need to handle more than one category per engine?
            engine = engines[res['engine']]
            res['category'] = engine.categories[0] if len(engine.categories) > 0 else ''

            # do we need to handle more than one category per engine?
            category = (
                res['category']
                + ':'
                + res.get('template', '')
                + ':'
                + ('img_src' if 'img_src' in res or 'thumbnail' in res else '')
            )

            current = None if category not in categoryPositions else categoryPositions[category]

            # group with previous results using the same category
            # if the group can accept more result and is not too far
            # from the current position
            if current is not None and (current['count'] > 0) and (len(gresults) - current['index'] < 20):
                # group with the previous results using
                # the same category with this one
                index = current['index']
                gresults.insert(index, res)

                # update every index after the current one
                # (including the current one)
                for k in categoryPositions:  # pylint: disable=consider-using-dict-items
                    v = categoryPositions[k]['index']
                    if v >= index:
                        categoryPositions[k]['index'] = v + 1

                # update this category
                current['count'] -= 1

            else:
                # no group to join: append the result and start a new group
                gresults.append(res)

                # update categoryIndex
                categoryPositions[category] = {'index': len(gresults), 'count': 8}

        # update _merged_results
        self._merged_results = gresults

    def get_ordered_results(self):
        """Return the ordered results, closing the container first if needed."""
        if not self._closed:
            self.close()
        return self._merged_results

    def results_length(self):
        """Return the number of merged results."""
        return len(self._merged_results)

    @property
    def number_of_results(self) -> int:
        """Returns the average of results number, returns zero if the average
        result number is smaller than the actual result count."""

        with self._lock:
            if not self._closed:
                logger.error("call to ResultContainer.number_of_results before ResultContainer.close")
                return 0

            resultnum_sum = sum(self._number_of_results)
            if not resultnum_sum or not self._number_of_results:
                return 0

            average = int(resultnum_sum / len(self._number_of_results))
            if average < self.results_length():
                average = 0
            return average

    @number_of_results.setter
    def number_of_results(self, value):
        """Record one more reported result count (the value is appended, not assigned)."""
        with self._lock:
            self._number_of_results.append(value)

    def add_unresponsive_engine(self, engine_name: str, error_type: str, suspended: bool = False):
        """Record ``engine_name`` as unresponsive if the engine displays error messages."""
        with self._lock:
            if self._closed:
                logger.error("call to ResultContainer.add_unresponsive_engine after ResultContainer.close")
                return
            if engines[engine_name].display_error_messages:
                self.unresponsive_engines.add(UnresponsiveEngine(engine_name, error_type, suspended))

    def add_timing(self, engine_name: str, engine_time: float, page_load_time: float):
        """Record the total and page-load time of ``engine_name``."""
        with self._lock:
            if self._closed:
                logger.error("call to ResultContainer.add_timing after ResultContainer.close")
                return
            self.timings.append(Timing(engine_name, total=engine_time, load=page_load_time))

    def get_timings(self):
        """Return the recorded timings, or an empty list before the container is closed."""
        with self._lock:
            if not self._closed:
                logger.error("call to ResultContainer.get_timings before ResultContainer.close")
                return []
            return self.timings

  

📝 📜 ⏱️ ⬆️