Add parameters to most Deferred instances. (#6414)
wRAR authored Jun 26, 2024
1 parent d08f559 commit e47110f
Showing 21 changed files with 190 additions and 128 deletions.
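
The change is mostly mechanical, but the motivation is worth spelling out: since Twisted 21.7.0 (as a comment in the diff itself notes), `Deferred` is a generic class, so `Deferred[Response]` tells a type checker what value the Deferred will eventually fire with. A minimal sketch of the idea, illustrative only and not taken from the diff, assuming Twisted >= 21.7.0:

```python
from twisted.internet.defer import Deferred


def make_greeting() -> Deferred[str]:
    d: Deferred[str] = Deferred()
    d.callback("hello")  # fires with a str, matching the type parameter
    return d


def shout(value: str) -> str:
    return value.upper()


d = make_greeting()
d.addCallback(shout)  # mypy can now verify the callback accepts a str
```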
6 changes: 3 additions & 3 deletions scrapy/commands/parse.py
@@ -153,7 +153,7 @@ def iterate_spider_output(
@overload
def iterate_spider_output(self, result: _T) -> Iterable[Any]: ...

-def iterate_spider_output(self, result: Any) -> Union[Iterable[Any], Deferred]:
+def iterate_spider_output(self, result: Any) -> Union[Iterable[Any], Deferred[Any]]:
if inspect.isasyncgen(result):
d = deferred_from_coro(
collect_asyncgen(aiter_errback(result, self.handle_exception))
@@ -233,7 +233,7 @@ def run_callback(
response: Response,
callback: Callable,
cb_kwargs: Optional[Dict[str, Any]] = None,
-) -> Deferred:
+) -> Deferred[Any]:
cb_kwargs = cb_kwargs or {}
d = maybeDeferred(self.iterate_spider_output, callback(response, **cb_kwargs))
return d
@@ -345,7 +345,7 @@ def _get_callback(
def prepare_request(
self, spider: Spider, request: Request, opts: argparse.Namespace
) -> Request:
-def callback(response: Response, **cb_kwargs: Any) -> Deferred:
+def callback(response: Response, **cb_kwargs: Any) -> Deferred[List[Any]]:
# memorize first request
if not self.first_response:
self.first_response = response
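
For context on the `Deferred[Any]` in `run_callback` above: `maybeDeferred` wraps whatever the spider callback returns, synchronous or asynchronous, so the result cannot be narrowed further than `Any`. A hedged sketch of that normalization:

```python
from twisted.internet.defer import Deferred, maybeDeferred


def sync_callback(x: int) -> int:
    return x * 2


# maybeDeferred wraps a plain return value in an already-fired Deferred;
# had the callable returned a Deferred, it would be passed through as-is.
d: Deferred[int] = maybeDeferred(sync_callback, 21)
d.addCallback(print)  # prints 42
```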
3 changes: 2 additions & 1 deletion scrapy/core/downloader/handlers/http10.py
@@ -10,6 +10,7 @@

if TYPE_CHECKING:
from twisted.internet.defer import Deferred
+from twisted.internet.interfaces import IConnector

# typing.Self requires Python 3.11
from typing_extensions import Self
@@ -45,7 +46,7 @@ def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
self._connect(factory)
return factory.deferred

-def _connect(self, factory: ScrapyHTTPClientFactory) -> Deferred:
+def _connect(self, factory: ScrapyHTTPClientFactory) -> IConnector:
from twisted.internet import reactor

host, port = to_unicode(factory.host), factory.port
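
This hunk is a correction rather than a parameterization: `reactor.connectTCP`/`connectSSL` return an `IConnector` handle, not a Deferred, so the old annotation was simply wrong. A rough sketch of the distinction, with a hypothetical `EchoFactory` that is not from Scrapy:

```python
from twisted.internet import reactor
from twisted.internet.protocol import ClientFactory, Protocol


class Echo(Protocol):
    def connectionMade(self) -> None:
        self.transport.loseConnection()


class EchoFactory(ClientFactory):
    protocol = Echo


# connectTCP hands back an IConnector (a handle for stopping or retrying
# the connection attempt); any Deferred lives on the factory instead.
connector = reactor.connectTCP("example.com", 80, EchoFactory())
connector.stopConnecting()  # e.g. abort the attempt via the IConnector API
```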
26 changes: 15 additions & 11 deletions scrapy/core/http2/agent.py
@@ -21,7 +21,7 @@
from twisted.internet.base import ReactorBase
from twisted.internet.endpoints import HostnameEndpoint

-from scrapy.http.request import Request
+from scrapy.http import Request, Response
from scrapy.settings import Settings
from scrapy.spiders import Spider

@@ -39,16 +39,18 @@ def __init__(self, reactor: ReactorBase, settings: Settings) -> None:
self._connections: Dict[ConnectionKeyT, H2ClientProtocol] = {}

# Save all requests that arrive before the connection is established
-self._pending_requests: Dict[ConnectionKeyT, Deque[Deferred]] = {}
+self._pending_requests: Dict[
+    ConnectionKeyT, Deque[Deferred[H2ClientProtocol]]
+] = {}

def get_connection(
self, key: ConnectionKeyT, uri: URI, endpoint: HostnameEndpoint
-) -> Deferred:
+) -> Deferred[H2ClientProtocol]:
if key in self._pending_requests:
# Received a request while connecting to remote
# Create a deferred which will fire with the H2ClientProtocol
# instance
-d: Deferred = Deferred()
+d: Deferred[H2ClientProtocol] = Deferred()
self._pending_requests[key].append(d)
return d

@@ -63,17 +65,17 @@ def get_connection(

def _new_connection(
self, key: ConnectionKeyT, uri: URI, endpoint: HostnameEndpoint
-) -> Deferred:
+) -> Deferred[H2ClientProtocol]:
self._pending_requests[key] = deque()

-conn_lost_deferred: Deferred = Deferred()
+conn_lost_deferred: Deferred[List[BaseException]] = Deferred()
conn_lost_deferred.addCallback(self._remove_connection, key)

factory = H2ClientFactory(uri, self.settings, conn_lost_deferred)
conn_d = endpoint.connect(factory)
conn_d.addCallback(self.put_connection, key)

-d: Deferred = Deferred()
+d: Deferred[H2ClientProtocol] = Deferred()
self._pending_requests[key].append(d)
return d

@@ -141,17 +143,19 @@ def get_key(self, uri: URI) -> ConnectionKeyT:
"""
return uri.scheme, uri.host, uri.port

-def request(self, request: Request, spider: Spider) -> Deferred:
+def request(self, request: Request, spider: Spider) -> Deferred[Response]:
uri = URI.fromBytes(bytes(request.url, encoding="utf-8"))
try:
endpoint = self.get_endpoint(uri)
except SchemeNotSupported:
return defer.fail(Failure())

key = self.get_key(uri)
-d = self._pool.get_connection(key, uri, endpoint)
-d.addCallback(lambda conn: conn.request(request, spider))
-return d
+d: Deferred[H2ClientProtocol] = self._pool.get_connection(key, uri, endpoint)
+d2: Deferred[Response] = d.addCallback(
+    lambda conn: conn.request(request, spider)
+)
+return d2


class ScrapyProxyH2Agent(H2Agent):
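
The `d`/`d2` split above exists because `addCallback` changes what a Deferred fires with: the pool yields an `H2ClientProtocol`, and the callback turns that into a `Response`. A small illustrative sketch of the same type transformation:

```python
from twisted.internet.defer import Deferred

d: Deferred[int] = Deferred()
# addCallback returns the same Deferred object, but typed by the
# callback's return type -- it now fires with a str instead of an int.
d2: Deferred[str] = d.addCallback(lambda n: f"got {n}")
d2.addCallback(print)
d.callback(7)  # prints "got 7"
```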
18 changes: 12 additions & 6 deletions scrapy/core/http2/protocol.py
@@ -33,7 +33,7 @@
from zope.interface import implementer

from scrapy.core.http2.stream import Stream, StreamCloseReason
-from scrapy.http import Request
+from scrapy.http import Request, Response

if TYPE_CHECKING:
from ipaddress import IPv4Address, IPv6Address
@@ -88,7 +88,10 @@ class H2ClientProtocol(Protocol, TimeoutMixin):
IDLE_TIMEOUT = 240

def __init__(
-self, uri: URI, settings: Settings, conn_lost_deferred: Deferred
+self,
+uri: URI,
+settings: Settings,
+conn_lost_deferred: Deferred[List[BaseException]],
) -> None:
"""
Arguments:
@@ -99,7 +102,7 @@ def __init__(self):
conn_lost_deferred -- Deferred fires with the reason: Failure to notify
that connection was lost
"""
-self._conn_lost_deferred = conn_lost_deferred
+self._conn_lost_deferred: Deferred[List[BaseException]] = conn_lost_deferred

config = H2Configuration(client_side=True, header_encoding="utf-8")
self.conn = H2Connection(config=config)
@@ -215,14 +218,14 @@ def _write_to_transport(self) -> None:
data = self.conn.data_to_send()
self.transport.write(data)

-def request(self, request: Request, spider: Spider) -> Deferred:
+def request(self, request: Request, spider: Spider) -> Deferred[Response]:
if not isinstance(request, Request):
raise TypeError(
f"Expected scrapy.http.Request, received {request.__class__.__qualname__}"
)

stream = self._new_stream(request, spider)
-d = stream.get_response()
+d: Deferred[Response] = stream.get_response()

# Add the stream to the request pool
self._pending_request_stream_pool.append(stream)
@@ -436,7 +439,10 @@ def window_updated(self, event: WindowUpdated) -> None:
@implementer(IProtocolNegotiationFactory)
class H2ClientFactory(Factory):
def __init__(
-self, uri: URI, settings: Settings, conn_lost_deferred: Deferred
+self,
+uri: URI,
+settings: Settings,
+conn_lost_deferred: Deferred[List[BaseException]],
) -> None:
self.uri = uri
self.settings = settings
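
The `Deferred[List[BaseException]]` annotation documents the contract of `conn_lost_deferred` as this diff spells it out: when the connection dies, the protocol fires it with the accumulated reasons. A hedged sketch of that shape, not Scrapy code:

```python
from typing import List

from twisted.internet.defer import Deferred

conn_lost: Deferred[List[BaseException]] = Deferred()
conn_lost.addCallback(lambda errors: print(f"connection lost: {errors!r}"))
conn_lost.callback([ConnectionError("peer went away")])
```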
6 changes: 3 additions & 3 deletions scrapy/core/http2/stream.py
@@ -20,7 +20,7 @@
from hpack import HeaderTuple

from scrapy.core.http2.protocol import H2ClientProtocol
-from scrapy.http import Request
+from scrapy.http import Request, Response


logger = logging.getLogger(__name__)
@@ -154,7 +154,7 @@ def _cancel(_: Any) -> None:
else:
self.close(StreamCloseReason.CANCELLED)

-self._deferred_response: Deferred = Deferred(_cancel)
+self._deferred_response: Deferred[Response] = Deferred(_cancel)

def __repr__(self) -> str:
return f"Stream(id={self.stream_id!r})"
@@ -180,7 +180,7 @@ def _log_warnsize(self) -> bool:
and not self.metadata["reached_warnsize"]
)

-def get_response(self) -> Deferred:
+def get_response(self) -> Deferred[Response]:
"""Simply return a Deferred which fires when response
from the asynchronous request is available
"""
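
`Deferred(_cancel)` above uses Twisted's canceller hook: the callable passed to the constructor runs when someone calls `.cancel()` on the Deferred, letting the stream close itself instead of just erroring out. A minimal sketch of the hook:

```python
from typing import Any

from twisted.internet.defer import Deferred


def _cancel(d: Deferred[Any]) -> None:
    # Called when d.cancel() is invoked; clean up the underlying work here.
    print("consumer gave up; closing the stream")


d: Deferred[bytes] = Deferred(_cancel)
d.addErrback(lambda f: print(f"errbacked with {f.type.__name__}"))
d.cancel()  # runs _cancel, then fires the errback with CancelledError
```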
6 changes: 3 additions & 3 deletions scrapy/core/scraper.py
@@ -13,6 +13,7 @@
Generator,
Iterable,
Iterator,
+List,
Optional,
Set,
Tuple,
@@ -34,7 +35,6 @@
from scrapy.pipelines import ItemPipelineManager
from scrapy.signalmanager import SignalManager
from scrapy.utils.defer import (
-DeferredListResultListT,
aiter_errback,
defer_fail,
defer_succeed,
@@ -54,7 +54,7 @@


_T = TypeVar("_T")
-_ParallelResult = DeferredListResultListT[Iterator[Any]]
+_ParallelResult = List[Tuple[bool, Iterator[Any]]]

if TYPE_CHECKING:
# parameterized Deferreds require Twisted 21.7.0
@@ -374,7 +374,7 @@ def _log_download_errors(

def _itemproc_finished(
self, output: Any, item: Any, response: Response, spider: Spider
-) -> Deferred:
+) -> Deferred[Any]:
"""ItemProcessor finished for the given ``item`` and returned ``output``"""
assert self.slot is not None # typing
self.slot.itemproc_size -= 1
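
`_ParallelResult` is now written out as `List[Tuple[bool, Iterator[Any]]]`, which is exactly the shape a `DeferredList` fires with: one `(success, result)` pair per input Deferred. A quick hedged illustration:

```python
from twisted.internet.defer import DeferredList, fail, succeed

# consumeErrors=True keeps the inner failure from being logged as unhandled.
dl = DeferredList([succeed("ok"), fail(RuntimeError("boom"))], consumeErrors=True)
dl.addCallback(print)  # [(True, 'ok'), (False, <Failure RuntimeError: boom>)]
```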
7 changes: 5 additions & 2 deletions scrapy/core/spidermw.py
@@ -302,7 +302,10 @@ async def _process_callback_output(
recovered = MutableChain()
result = self._evaluate_iterable(response, spider, result, 0, recovered)
result = await maybe_deferred_to_future(
-self._process_spider_output(response, spider, result)
+cast(
+    "Deferred[Union[Iterable[_T], AsyncIterable[_T]]]",
+    self._process_spider_output(response, spider, result),
+)
)
if isinstance(result, AsyncIterable):
return MutableAsyncChain(result, recovered)
@@ -339,7 +342,7 @@ def process_spider_exception(

def process_start_requests(
self, start_requests: Iterable[Request], spider: Spider
-) -> Deferred:
+) -> Deferred[Iterable[Request]]:
return self._process_chain("process_start_requests", start_requests, spider)

# This method is only needed until _async compatibility methods are removed.
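
The `cast` in `_process_callback_output` narrows the broad return annotation of `_process_spider_output` to what this call site actually receives; `cast` has no runtime effect. A hedged sketch of the technique, with a hypothetical `helper` that is not part of Scrapy:

```python
from typing import cast

from twisted.internet.defer import Deferred, succeed


def helper() -> object:
    # Annotated more broadly than what it returns in this code path.
    return succeed("payload")


# cast() only narrows the static type for mypy; nothing happens at runtime.
d = cast("Deferred[str]", helper())
d.addCallback(print)  # prints "payload"
```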
33 changes: 23 additions & 10 deletions scrapy/crawler.py
@@ -4,7 +4,18 @@
import pprint
import signal
import warnings
-from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Set, Type, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Generator,
+    Optional,
+    Set,
+    Type,
+    TypeVar,
+    Union,
+    cast,
+)

from twisted.internet.defer import (
Deferred,
@@ -54,6 +65,8 @@

logger = logging.getLogger(__name__)

+_T = TypeVar("_T")


class Crawler:
def __init__(
@@ -140,7 +153,7 @@ def _apply_settings(self) -> None:
)

@inlineCallbacks
-def crawl(self, *args: Any, **kwargs: Any) -> Generator[Deferred, Any, None]:
+def crawl(self, *args: Any, **kwargs: Any) -> Generator[Deferred[Any], Any, None]:
if self.crawling:
raise RuntimeError("Crawling already taking place")
if self._started:
@@ -172,7 +185,7 @@ def _create_engine(self) -> ExecutionEngine:
return ExecutionEngine(self, lambda _: self.stop())

@inlineCallbacks
-def stop(self) -> Generator[Deferred, Any, None]:
+def stop(self) -> Generator[Deferred[Any], Any, None]:
"""Starts a graceful stop of the crawler and returns a deferred that is
fired when the crawler is stopped."""
if self.crawling:
@@ -256,15 +269,15 @@ def __init__(self, settings: Union[Dict[str, Any], Settings, None] = None):
self.settings = settings
self.spider_loader = self._get_spider_loader(settings)
self._crawlers: Set[Crawler] = set()
-self._active: Set[Deferred] = set()
+self._active: Set[Deferred[None]] = set()
self.bootstrap_failed = False

def crawl(
self,
crawler_or_spidercls: Union[Type[Spider], str, Crawler],
*args: Any,
**kwargs: Any,
-) -> Deferred:
+) -> Deferred[None]:
"""
Run a crawler with the provided arguments.
@@ -294,12 +307,12 @@ def crawl(
crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, *args, **kwargs)

-def _crawl(self, crawler: Crawler, *args: Any, **kwargs: Any) -> Deferred:
+def _crawl(self, crawler: Crawler, *args: Any, **kwargs: Any) -> Deferred[None]:
self.crawlers.add(crawler)
d = crawler.crawl(*args, **kwargs)
self._active.add(d)

-def _done(result: Any) -> Any:
+def _done(result: _T) -> _T:
self.crawlers.discard(crawler)
self._active.discard(d)
self.bootstrap_failed |= not getattr(crawler, "spider", None)
@@ -335,7 +348,7 @@ def _create_crawler(self, spidercls: Union[str, Type[Spider]]) -> Crawler:
# temporary cast until self.spider_loader is typed
return Crawler(cast(Type[Spider], spidercls), self.settings)

-def stop(self) -> Deferred:
+def stop(self) -> Deferred[Any]:
"""
Stops simultaneously all the crawling jobs taking place.
@@ -344,7 +357,7 @@ def stop(self) -> Deferred:
return DeferredList([c.stop() for c in list(self.crawlers)])

@inlineCallbacks
-def join(self) -> Generator[Deferred, Any, None]:
+def join(self) -> Generator[Deferred[Any], Any, None]:
"""
join()
@@ -460,7 +473,7 @@ def start(
)
reactor.run(installSignalHandlers=install_signal_handlers) # blocking call

-def _graceful_stop_reactor(self) -> Deferred:
+def _graceful_stop_reactor(self) -> Deferred[Any]:
d = self.stop()
d.addBoth(self._stop_reactor)
return d
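
Note the `_done` change from `Any -> Any` to `_T -> _T`: a pass-through callback typed with a `TypeVar` preserves the parameter of whatever Deferred it is attached to instead of collapsing it to `Any`. A small sketch of why that matters:

```python
from typing import TypeVar

from twisted.internet.defer import Deferred

_T = TypeVar("_T")


def passthrough(result: _T) -> _T:
    # Observe the result without changing it; the type flows through.
    print("observed:", result)
    return result


d: Deferred[int] = Deferred()
d.addBoth(passthrough)  # still a Deferred[int] for the type checker
d.callback(5)
```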