class Search:
"""Search information container"""
__slots__ = "search_query", "result_container", "start_time", "actual_timeout"
def search_multiple_requests(self, requests):
# pylint: disable=protected-access
search_id = str(uuid4())
for engine_name, query, request_params in requests:
_search = copy_current_request_context(PROCESSORS[engine_name].search)
th = threading.Thread( # pylint: disable=invalid-name
target=_search,
args=(query, request_params, self.result_container, self.start_time, self.actual_timeout),
name=search_id,
)
th._timeout = False
th._engine_name = engine_name
th.start()
for th in threading.enumerate(): # pylint: disable=invalid-name
if th.name == search_id:
remaining_time = max(0.0, self.actual_timeout - (default_timer() - self.start_time))
th.join(remaining_time)
if th.is_alive():
th._timeout = True
self.result_container.add_unresponsive_engine(th._engine_name, 'timeout')
PROCESSORS[th._engine_name].logger.error('engine timeout')
def search_multiple_requests2(self, requests):
# pylint: disable=protected-access
search_id = str(uuid4())
mock_result_container = ResultContainer()
mock_results = [{'url': f'Mock Result {i}', 'content': ''} for i in range(1, 6)]
threads = []
for engine_name, _, _ in requests:
th = threading.Thread(
target=self.mock_search_function,
args=(engine_name, mock_results, mock_result_container),
name=search_id,
)
th._timeout = False
th._engine_name = engine_name
th.start()
threads.append(th)
remaining_time = None
for th in threads:
if th.name == search_id:
if remaining_time is None:
remaining_time = self.actual_timeout - (default_timer() - self.start_time)
th.join(remaining_time)
if th.is_alive():
th._timeout = True
self.result_container.add_unresponsive_engine(th._engine_name, 'timeout')
PROCESSORS[th._engine_name].logger.error('engine timeout')
# Wait for all threads to finish, even if some have timed out
for th in threads:
th.join()
# Copy the mock results to the actual result_container
self.result_container = mock_result_container
def mock_search_function(self, engine_name, mock_results, result_container):
# This is a mock search function
time.sleep(0.1) # Simulate some processing time
result_container.extend(engine_name, mock_results)
def search_standard(self):
"""
Update self.result_container, self.actual_timeout
"""
requests, self.actual_timeout = self._get_requests()
cache_dir = 'cache'
query_file_path = os.path.join(cache_dir, self.search_query.query)
# send all search-request
if requests:
# Check if the file exists in the cache directory
if os.path.isfile(query_file_path):
self.search_multiple_requests2(requests)
else:
self.search_multiple_requests(requests)
# return results, suggestions, answers and infoboxes
return True