[net]: Proxy Request Redundancy #3491

Open
wants to merge 9 commits into
base: master
[network]: Add redundant parallel proxy requests.
Anecdotally, using SearX over unreliable proxies,
like tor, seems to be quite error prone.
SearX puts considerable effort into measuring the
performance and reliability of engines, most
likely owing to those aspects being of
significant concern.

This patch proposes to mitigate these
problems by issuing concurrent redundant requests
through the specified proxies at once, returning
the first response that is not an error.
The functionality is enabled using the
`proxy_request_redundancy` parameter within the
outgoing network settings or the engine settings.

Example:

```yaml

outgoing:
    request_timeout: 8.0
    proxies:
        "all://":
            - socks5h://tor:9050
            - socks5h://tor1:9050
            - socks5h://tor2:9050
            - socks5h://tor3:9050
    proxy_request_redundancy: 4
```

In this example, each network request will be
sent 4 times, once through every proxy. The
first (non-error) response wins.
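
To make "first (non-error) response wins" concrete, here is a
minimal sketch of the precedence rule, using plain status codes
instead of real responses. The helper names `is_winning_status`
and `pick_response` are hypothetical and not part of the patch.
The 400-599 error range and the special case for 404 are modelled
on the check in `handle_async_request` further down in this diff.

```python
from typing import Iterable, Optional


def is_winning_status(status_code: int) -> bool:
    """Assumption: 4xx/5xx count as errors, except 404, which is accepted."""
    is_error = 400 <= status_code <= 599
    return not is_error or status_code == 404


def pick_response(status_codes: Iterable[int]) -> Optional[int]:
    """Return the first winning status in completion order.

    Falls back to the first error status if nothing wins,
    and to None if there were no responses at all.
    """
    first_error = None
    for status in status_codes:
        if is_winning_status(status):
            return status
        if first_error is None:
            first_error = status
    return first_error


# Status codes listed in the order the redundant requests completed.
print(pick_response([503, 429, 200]))
print(pick_response([503, 429]))
```

This is a simplification: the patch works on `httpx.Response`
objects and also tracks request exceptions separately.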

In my testing environment using several tor proxy
end-points, this approach almost entirely
removes engine errors related to timeouts
and denied requests. The latency of the
network system is also improved.

The implementation uses an
`AsyncParallelTransport(httpx.AsyncBaseTransport)`
wrapper to wrap multiple sub-transports,
and `asyncio.wait` to wait on the first completed
request.
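
The waiting pattern described above can be sketched in isolation
with the standard library only. This is not the code from the
patch: `fake_request` and `first_success` are hypothetical names,
the "requests" are simulated with `asyncio.sleep`, and failures
are plain `ConnectionError` exceptions rather than httpx errors.

```python
import asyncio


async def fake_request(name: str, delay: float, fail: bool = False) -> str:
    """Simulated proxy request (assumption: stands in for a transport call)."""
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{name} failed")
    return name


async def first_success(coros):
    """Run all coroutines concurrently and return the first successful result."""
    pending = {asyncio.create_task(c) for c in coros}
    winner = None
    last_error = None
    try:
        while pending and winner is None:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                if task.exception() is not None:
                    last_error = task.exception()
                elif winner is None:
                    winner = task.result()
    finally:
        # Cancel the redundant requests that are still in flight.
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
    if winner is None:
        raise RuntimeError("No valid response.") from last_error
    return winner


result = asyncio.run(
    first_success(
        [
            fake_request("proxy-a", 0.05, fail=True),
            fake_request("proxy-b", 0.10),
            fake_request("proxy-c", 0.50),
        ]
    )
)
print(result)
```

The failing request finishes first but is skipped, the next one
to complete successfully is returned, and the slowest one is
cancelled instead of being awaited to completion.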

The existing implementation of the network
proxy cycling has also been moved into the
`AsyncParallelTransport` class, which should
improve network client memoization and
performance.
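
The cycling can be illustrated with a small standalone function.
`select_proxies` is a hypothetical helper, not part of the patch.
It mirrors the index arithmetic used in `handle_async_request`:
take `redundancy` consecutive transports starting at the current
index, wrap around, then advance the index.

```python
from typing import List, Tuple


def select_proxies(proxies: List[str], index: int, redundancy: int) -> Tuple[List[str], int]:
    """Pick `redundancy` proxies starting at `index`, wrapping around the list."""
    count = len(proxies)
    if count == 0:
        raise ValueError("Got an empty list of proxies.")
    redundancy = max(1, redundancy)  # values below 1 fall back to plain cycling
    chosen = [proxies[i % count] for i in range(index, index + redundancy)]
    next_index = (index + redundancy) % count
    return chosen, next_index


proxies = ["socks5h://tor:9050", "socks5h://tor1:9050", "socks5h://tor2:9050"]
first, index = select_proxies(proxies, 0, 2)
second, index = select_proxies(proxies, index, 2)
print(first)
print(second)
```

With a redundancy of `1` this degenerates to the round-robin
behaviour; with a redundancy larger than the number of proxies,
some proxies are used more than once per request.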

TESTED:
- unit tests for the new functions and classes.
- tested on desktop PC with 10+ upstream proxies
    and comparable request redundancy.
czaky committed May 17, 2024
commit 122a9568de6a07ab77fb97025734033499fc5c9f
1 change: 1 addition & 0 deletions AUTHORS.rst
Expand Up @@ -173,3 +173,4 @@ features or generally made searx better:
- Austin Olacsi `<https://github.com/Austin-Olacsi>`
- @micsthepick
- Daniel Kukula `<https://github.com/dkuku>`
- @czaky `<https://github.com/czaky>`
5 changes: 4 additions & 1 deletion docs/admin/settings/settings_engine.rst
Expand Up @@ -47,6 +47,7 @@ engine is shown. Most of the options have a default value or even are optional.
max_keepalive_connections: 10
keepalive_expiry: 5.0
using_tor_proxy: false
proxy_request_redundancy: 1
proxies:
http:
- http://proxy1:8080
Expand Down Expand Up @@ -154,6 +155,9 @@ engine is shown. Most of the options have a default value or even are optional.
``proxies`` :
Overwrites proxy settings from :ref:`settings outgoing`.

``proxy_request_redundancy`` :
Overwrites the ``proxy_request_redundancy`` setting from :ref:`settings outgoing`.

``using_tor_proxy`` :
Using tor proxy (``true``) or not (``false``) for this engine. The default is
taken from ``using_tor_proxy`` of the :ref:`settings outgoing`.
Expand Down Expand Up @@ -241,4 +245,3 @@ Example configuration in settings.yml for a German and English speaker:

When searching, the default google engine will return German results and
"google english" will return English results.

15 changes: 11 additions & 4 deletions docs/admin/settings/settings_outgoing.rst
Expand Up @@ -22,9 +22,9 @@ Communication with search engines.
# and https://www.python-httpx.org/compatibility/#ssl-configuration
# verify: ~/.mitmproxy/mitmproxy-ca-cert.cer
#
# uncomment below section if you want to use a proxyq see: SOCKS proxies
# Uncomment below section if you want to use a proxy. See:
# https://2.python-requests.org/en/latest/user/advanced/#proxies
# are also supported: see
# SOCKS proxies are also supported. See:
# https://2.python-requests.org/en/latest/user/advanced/#socks
#
# proxies:
Expand All @@ -34,6 +34,11 @@ Communication with search engines.
#
# using_tor_proxy: true
#
# Uncomment below if you want to make multiple requests in parallel
# through all the proxies at once:
#
# proxy_request_redundancy: 10
#
# Extra seconds to add in order to account for the time taken by the proxy
#
# extra_proxy_timeout: 10.0
Expand Down Expand Up @@ -70,6 +75,10 @@ Communication with search engines.
If there are more than one proxy for one protocol (http, https),
requests to the engines are distributed in a round-robin fashion.

``proxy_request_redundancy`` :
Cycle the proxies one by one (``1``) or use them in parallel (``> 1``) for all engines.
The default is ``1`` and can be overwritten in the :ref:`settings engine`.

``source_ips`` :
If you use multiple network interfaces, define from which IP the requests must
be made. Example:
Expand Down Expand Up @@ -106,5 +115,3 @@ Communication with search engines.
``using_tor_proxy`` :
Using tor proxy (``true``) or not (``false``) for all engines. The default is
``false`` and can be overwritten in the :ref:`settings engine`


4 changes: 4 additions & 0 deletions searx/enginelib/__init__.py
Expand Up @@ -110,6 +110,10 @@ class Engine: # pylint: disable=too-few-public-methods
https: socks5://proxy:port
"""

proxy_request_redundancy: int
"""Cycle proxies one by one (``1``) or
use them in parallel at once (``> 1``) for this engine."""

disabled: bool
"""To disable by default the engine, but not deleting it. It will allow the
user to manually activate it in the settings."""
194 changes: 183 additions & 11 deletions searx/network/client.py
@@ -1,14 +1,18 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, global-statement

from __future__ import annotations

import asyncio
import contextlib
import logging
import random
from ssl import SSLContext
import threading
from typing import Any, Dict
from typing import Any, Dict, Iterable

import httpx
import httpcore
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError

Expand Down Expand Up @@ -112,7 +116,8 @@ async def handle_async_request(self, request):
raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e


def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
def get_socks_transport(verify, http2, local_address, proxy_url, limit, retries):
"""Return an AsyncProxyTransport."""
# support socks5h (requests compatibility):
# https://requests.readthedocs.io/en/master/user/advanced/#socks
# socks5:// hostname is resolved on client side
Expand Down Expand Up @@ -141,7 +146,8 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit
)


def get_transport(verify, http2, local_address, proxy_url, limit, retries):
def get_http_transport(verify, http2, local_address, proxy_url, limit, retries):
"""Return an AsyncHTTPTransport."""
verify = get_sslcontexts(None, None, verify, True, http2) if verify is True else verify
return httpx.AsyncHTTPTransport(
# pylint: disable=protected-access
Expand All @@ -154,6 +160,166 @@ def get_transport(verify, http2, local_address, proxy_url, limit, retries):
)


def get_single_transport(
    limit: httpx.Limits | None = None,
    proxy_url: str | None = None,
    local_address: str | None = None,
    retries: int = 0,
    *,
    verify: bool = True,
    http2: bool = True,
) -> httpx.AsyncBaseTransport:
    """Generate a single, non-parallel transport.

    Parameters
    ----------
    limit : httpx.Limits
        Limits applied to the transport.
    proxy_url : str | None, optional
        Proxy to use for the transport.
    local_address : str | None, optional
        Local address to specify in the connection.
    retries : int, optional
        How many times to retry the request, by default 0
    verify : bool, optional
        Verify the certificates, by default True
    http2 : bool, optional
        Enable HTTP2 protocol, by default True

    Returns
    -------
    httpx.AsyncBaseTransport
        An async transport object.
    """
    limit = limit or httpx.Limits()
    if proxy_url and proxy_url.startswith(('socks4://', 'socks5://', 'socks5h://')):
        return get_socks_transport(verify, http2, local_address, proxy_url, limit, retries)
    return get_http_transport(verify, http2, local_address, proxy_url, limit, retries)


class AsyncParallelTransport(httpx.AsyncBaseTransport):
    """Fan out request to multiple base transports."""

    def __init__(
        self,
        transports: Iterable[httpx.AsyncBaseTransport],
        proxy_request_redundancy: int,
        network_logger: logging.Logger,
    ) -> None:
        """Init the parallel transport using a list of base `transports`."""
        self._transports = list(transports)
        if len(self._transports) == 0:
            msg = "Got an empty list of (proxy) transports."
            raise ValueError(msg)
        if proxy_request_redundancy < 1:
            logger.warning("Invalid proxy_request_redundancy specified: %d", proxy_request_redundancy)
            proxy_request_redundancy = 1
        self._proxy_request_redundancy = proxy_request_redundancy
        self._index = random.randrange(len(self._transports))  # noqa: S311
        self._logger = network_logger or logger

    async def handle_async_request(
        self,
        request: httpx.Request,
    ) -> httpx.Response:
        """Issue parallel requests to all sub-transports.

        Return the response of the first completed.

        Parameters
        ----------
        request : httpx.Request
            Request to pass to the transports.

        Returns
        -------
        httpx.Response
            Response from the first completed request.

        """
        response = None  # non-error response, taking precedence
        error_response = None  # any error response
        request_error = None  # any request related exception
        tcount = len(self._transports)
        redundancy = self._proxy_request_redundancy
        pending = [
            asyncio.create_task(self._transports[i % tcount].handle_async_request(request))
            for i in range(self._index, self._index + redundancy)
        ]
        self._index = (self._index + redundancy) % tcount
        while pending:
            if len(pending) == 1:
                return await pending.pop()
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                try:
                    result = task.result()
                    if not result.is_error or result.status_code == 404:
                        response = result
                    elif not error_response:
                        self._logger.warning("Error response: %s for %s", result.status_code, request.url)
                        error_response = result
                except (
                    httpx.HTTPError,
                    httpcore.ProtocolError,
                    httpcore.NetworkError,
                    httpcore.TimeoutException,
                    # Low level semaphore errors.
                    ValueError,
                ) as e:
                    if not request_error:
                        self._logger.warning("Request error: %s for %s", e, request.url)
                        request_error = e
            if response:
                break
        if pending:
            with contextlib.suppress(asyncio.exceptions.CancelledError):
                gather = asyncio.gather(*pending)
                gather.cancel()
                self._logger.debug("Cancelling %d/%d redundant proxy requests.", len(pending), redundancy)
                await gather
        if response:
            return response
        if error_response:
            return error_response
        msg = "No valid response."
        if request_error:
            raise httpx.RequestError(msg) from request_error
        raise httpx.RequestError(msg)

    async def aclose(self) -> None:
        """Close all the transports."""
        for transport in self._transports:
            await transport.aclose()


def get_transport(
    proxy_urls: list,
    limit: httpx.Limits | None = None,
    local_address: str | None = None,
    proxy_request_redundancy: int = 1,
    retries: int = 0,
    network_logger: logging.Logger = logger,
    *,
    verify: bool = True,
    http2: bool = True,
) -> httpx.AsyncBaseTransport:
    """Return a single http/proxy transport or the parallel version of those."""
    limit = limit or httpx.Limits()
    # pylint: disable=unnecessary-lambda-assignment
    transport = lambda proxy_url: get_single_transport(
        verify=verify,
        http2=http2,
        local_address=local_address,
        proxy_url=proxy_url,
        limit=limit,
        retries=retries,
    )
    if len(proxy_urls or []) <= 1:
        return transport(proxy_urls[0] if proxy_urls else None)
    return AsyncParallelTransport(map(transport, proxy_urls), proxy_request_redundancy, network_logger)


def new_client(
# pylint: disable=too-many-arguments
enable_http,
Expand All @@ -163,10 +329,12 @@ def new_client(
max_keepalive_connections,
keepalive_expiry,
proxies,
proxy_request_redundancy,
local_address,
retries,
max_redirects,
hook_log_response,
network_logger,
):
limit = httpx.Limits(
max_connections=max_connections,
Expand All @@ -175,20 +343,24 @@ def new_client(
)
# See https://www.python-httpx.org/advanced/#routing
mounts = {}
for pattern, proxy_url in proxies.items():
for pattern, proxy_urls in proxies.items():
if not enable_http and pattern.startswith('http://'):
continue
if proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') or proxy_url.startswith('socks5h://'):
mounts[pattern] = get_transport_for_socks_proxy(
verify, enable_http2, local_address, proxy_url, limit, retries
)
else:
mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
mounts[pattern] = get_transport(
verify=verify,
http2=enable_http2,
local_address=local_address,
proxy_urls=proxy_urls,
proxy_request_redundancy=proxy_request_redundancy,
limit=limit,
retries=retries,
network_logger=network_logger,
)

if not enable_http:
mounts['http://'] = AsyncHTTPTransportNoHttp()

transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
transport = get_http_transport(verify, enable_http2, local_address, None, limit, retries)

event_hooks = None
if hook_log_response: