Add parameters to most Deferred instances. #6414

Merged (1 commit, Jun 26, 2024)

6 changes: 3 additions & 3 deletions scrapy/commands/parse.py
@@ -153,7 +153,7 @@
     @overload
     def iterate_spider_output(self, result: _T) -> Iterable[Any]: ...

-    def iterate_spider_output(self, result: Any) -> Union[Iterable[Any], Deferred]:
+    def iterate_spider_output(self, result: Any) -> Union[Iterable[Any], Deferred[Any]]:
         if inspect.isasyncgen(result):
             d = deferred_from_coro(
                 collect_asyncgen(aiter_errback(result, self.handle_exception))
@@ -233,7 +233,7 @@
         response: Response,
         callback: Callable,
         cb_kwargs: Optional[Dict[str, Any]] = None,
-    ) -> Deferred:
+    ) -> Deferred[Any]:
         cb_kwargs = cb_kwargs or {}
         d = maybeDeferred(self.iterate_spider_output, callback(response, **cb_kwargs))
         return d
@@ -345,7 +345,7 @@
     def prepare_request(
         self, spider: Spider, request: Request, opts: argparse.Namespace
     ) -> Request:
-        def callback(response: Response, **cb_kwargs: Any) -> Deferred:
+        def callback(response: Response, **cb_kwargs: Any) -> Deferred[List[Any]]:
            # memorize first request
            if not self.first_response:
                self.first_response = response
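
A note on the `Deferred[Any]` annotations in this file: once `Deferred` is parameterized (Twisted 21.7.0+), the parameter states what the Deferred fires with, but `maybeDeferred` wraps an arbitrary callable, so `Any` is the most precise parameter available. A minimal sketch of the distinction, with hypothetical names that are not part of this PR:

```python
from __future__ import annotations

from typing import Any, Callable

from twisted.internet.defer import Deferred, maybeDeferred


def typed_deferred() -> Deferred[str]:
    # The type parameter declares what callback() will eventually deliver.
    d: Deferred[str] = Deferred()
    d.addCallback(str.upper)  # str -> str, verified by the type checker
    return d


def wrap_callback(callback: Callable[[], Any]) -> Deferred[Any]:
    # maybeDeferred accepts an arbitrary callable, so Deferred[Any] is as
    # precise as the annotation can get, which is the situation in parse.py.
    return maybeDeferred(callback)
```
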
3 changes: 2 additions & 1 deletion scrapy/core/downloader/handlers/http10.py
@@ -10,6 +10,7 @@

 if TYPE_CHECKING:
     from twisted.internet.defer import Deferred
+    from twisted.internet.interfaces import IConnector

     # typing.Self requires Python 3.11
     from typing_extensions import Self
@@ -45,7 +46,7 @@ def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
         self._connect(factory)
         return factory.deferred

-    def _connect(self, factory: ScrapyHTTPClientFactory) -> Deferred:
+    def _connect(self, factory: ScrapyHTTPClientFactory) -> IConnector:
         from twisted.internet import reactor

         host, port = to_unicode(factory.host), factory.port
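
Besides the added parameters, this hunk fixes an annotation that was simply wrong: `reactor.connectTCP` returns a connector handle, not a `Deferred`, and the new `TYPE_CHECKING` import keeps `IConnector` out of the runtime import graph. A sketch of the same pattern, with hypothetical names:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

from twisted.internet.protocol import ClientFactory

if TYPE_CHECKING:
    # Annotation-only import, mirroring the hunk above.
    from twisted.internet.interfaces import IConnector


def open_connection(host: str, port: int, factory: ClientFactory) -> IConnector:
    from twisted.internet import reactor

    # connectTCP hands back an IConnector (useful for disconnect()),
    # not a Deferred, so the old return annotation could never be right.
    return reactor.connectTCP(host, port, factory)
```
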
26 changes: 15 additions & 11 deletions scrapy/core/http2/agent.py
@@ -21,7 +21,7 @@
 from twisted.internet.base import ReactorBase
 from twisted.internet.endpoints import HostnameEndpoint

-from scrapy.http.request import Request
+from scrapy.http import Request, Response
 from scrapy.settings import Settings
 from scrapy.spiders import Spider

@@ -39,16 +39,18 @@
         self._connections: Dict[ConnectionKeyT, H2ClientProtocol] = {}

         # Save all requests that arrive before the connection is established
-        self._pending_requests: Dict[ConnectionKeyT, Deque[Deferred]] = {}
+        self._pending_requests: Dict[
+            ConnectionKeyT, Deque[Deferred[H2ClientProtocol]]
+        ] = {}

     def get_connection(
         self, key: ConnectionKeyT, uri: URI, endpoint: HostnameEndpoint
-    ) -> Deferred:
+    ) -> Deferred[H2ClientProtocol]:
         if key in self._pending_requests:
             # Received a request while connecting to remote
             # Create a deferred which will fire with the H2ClientProtocol
             # instance
-            d: Deferred = Deferred()
+            d: Deferred[H2ClientProtocol] = Deferred()
             self._pending_requests[key].append(d)
             return d

@@ -63,17 +65,17 @@

     def _new_connection(
         self, key: ConnectionKeyT, uri: URI, endpoint: HostnameEndpoint
-    ) -> Deferred:
+    ) -> Deferred[H2ClientProtocol]:
         self._pending_requests[key] = deque()

-        conn_lost_deferred: Deferred = Deferred()
+        conn_lost_deferred: Deferred[List[BaseException]] = Deferred()
         conn_lost_deferred.addCallback(self._remove_connection, key)

         factory = H2ClientFactory(uri, self.settings, conn_lost_deferred)
         conn_d = endpoint.connect(factory)
         conn_d.addCallback(self.put_connection, key)

-        d: Deferred = Deferred()
+        d: Deferred[H2ClientProtocol] = Deferred()
         self._pending_requests[key].append(d)
         return d

@@ -141,17 +143,19 @@
         """
         return uri.scheme, uri.host, uri.port

-    def request(self, request: Request, spider: Spider) -> Deferred:
+    def request(self, request: Request, spider: Spider) -> Deferred[Response]:
         uri = URI.fromBytes(bytes(request.url, encoding="utf-8"))
         try:
             endpoint = self.get_endpoint(uri)
         except SchemeNotSupported:
             return defer.fail(Failure())

         key = self.get_key(uri)
-        d = self._pool.get_connection(key, uri, endpoint)
-        d.addCallback(lambda conn: conn.request(request, spider))
-        return d
+        d: Deferred[H2ClientProtocol] = self._pool.get_connection(key, uri, endpoint)
+        d2: Deferred[Response] = d.addCallback(
+            lambda conn: conn.request(request, spider)
+        )
+        return d2


 class ScrapyProxyH2Agent(H2Agent):
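
The rewritten `request` method shows why the chain is split into `d` and `d2`: `addCallback` produces a Deferred parameterized by the callback's return type, so reusing one name would make one of the two annotations wrong. A self-contained sketch with stand-in types (not the Scrapy classes):

```python
from __future__ import annotations

from twisted.internet.defer import Deferred


class FakeConn:
    """Stand-in for H2ClientProtocol, just for this sketch."""

    def request(self, url: str) -> str:
        return f"response for {url}"


def chained(url: str) -> Deferred[str]:
    # The pool fires this Deferred with a connection...
    d: Deferred[FakeConn] = Deferred()
    # ...and addCallback maps it to a Deferred of the callback's return
    # type, so binding the result to a second name keeps both accurate.
    d2: Deferred[str] = d.addCallback(lambda conn: conn.request(url))
    return d2
```
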
18 changes: 12 additions & 6 deletions scrapy/core/http2/protocol.py
@@ -33,7 +33,7 @@
 from zope.interface import implementer

 from scrapy.core.http2.stream import Stream, StreamCloseReason
-from scrapy.http import Request
+from scrapy.http import Request, Response

 if TYPE_CHECKING:
     from ipaddress import IPv4Address, IPv6Address
@@ -88,7 +88,10 @@
     IDLE_TIMEOUT = 240

     def __init__(
-        self, uri: URI, settings: Settings, conn_lost_deferred: Deferred
+        self,
+        uri: URI,
+        settings: Settings,
+        conn_lost_deferred: Deferred[List[BaseException]],
     ) -> None:
         """
         Arguments:
@@ -99,7 +102,7 @@
             conn_lost_deferred -- Deferred fires with the reason: Failure to notify
             that connection was lost
         """
-        self._conn_lost_deferred = conn_lost_deferred
+        self._conn_lost_deferred: Deferred[List[BaseException]] = conn_lost_deferred

         config = H2Configuration(client_side=True, header_encoding="utf-8")
         self.conn = H2Connection(config=config)
@@ -215,14 +218,14 @@
         data = self.conn.data_to_send()
         self.transport.write(data)

-    def request(self, request: Request, spider: Spider) -> Deferred:
+    def request(self, request: Request, spider: Spider) -> Deferred[Response]:
         if not isinstance(request, Request):
             raise TypeError(
                 f"Expected scrapy.http.Request, received {request.__class__.__qualname__}"
             )

         stream = self._new_stream(request, spider)
-        d = stream.get_response()
+        d: Deferred[Response] = stream.get_response()

         # Add the stream to the request pool
         self._pending_request_stream_pool.append(stream)
@@ -436,7 +439,10 @@
 @implementer(IProtocolNegotiationFactory)
 class H2ClientFactory(Factory):
     def __init__(
-        self, uri: URI, settings: Settings, conn_lost_deferred: Deferred
+        self,
+        uri: URI,
+        settings: Settings,
+        conn_lost_deferred: Deferred[List[BaseException]],
     ) -> None:
         self.uri = uri
         self.settings = settings
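
The same `Deferred[List[BaseException]]` annotation now appears at the creation site in agent.py, in the factory, and on the protocol attribute, so the firing side and the listening side agree on the payload type. A compressed sketch of the pattern, with a hypothetical class name:

```python
from __future__ import annotations

from typing import List

from twisted.internet.defer import Deferred


class ConnectionWatcher:
    """Sketch: a factory and a protocol share one typed Deferred, the way
    conn_lost_deferred is threaded through the constructors above."""

    def __init__(self, conn_lost_deferred: Deferred[List[BaseException]]) -> None:
        self._conn_lost_deferred: Deferred[List[BaseException]] = conn_lost_deferred

    def connection_lost(self, errors: List[BaseException]) -> None:
        # Fires exactly once, with every error observed on the connection.
        self._conn_lost_deferred.callback(errors)
```
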
6 changes: 3 additions & 3 deletions scrapy/core/http2/stream.py
@@ -20,7 +20,7 @@
     from hpack import HeaderTuple

     from scrapy.core.http2.protocol import H2ClientProtocol
-    from scrapy.http import Request
+    from scrapy.http import Request, Response


 logger = logging.getLogger(__name__)
@@ -154,7 +154,7 @@
             else:
                 self.close(StreamCloseReason.CANCELLED)

-        self._deferred_response: Deferred = Deferred(_cancel)
+        self._deferred_response: Deferred[Response] = Deferred(_cancel)

     def __repr__(self) -> str:
         return f"Stream(id={self.stream_id!r})"
@@ -180,7 +180,7 @@
             and not self.metadata["reached_warnsize"]
         )

-    def get_response(self) -> Deferred:
+    def get_response(self) -> Deferred[Response]:
         """Simply return a Deferred which fires when response
         from the asynchronous request is available
         """
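
A `Deferred` built with a canceller keeps the same type parameter: the canceller receives the Deferred being cancelled, while the parameter still describes what a successful `callback()` would deliver. A sketch with hypothetical names:

```python
from __future__ import annotations

from twisted.internet.defer import Deferred


def pending_response() -> Deferred[bytes]:
    # The canceller is called with the Deferred being cancelled; the type
    # parameter still describes what callback() would have delivered.
    def _cancel(d: Deferred[bytes]) -> None:
        print("stream cancelled before a response arrived")

    return Deferred(_cancel)
```
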
6 changes: 3 additions & 3 deletions scrapy/core/scraper.py
@@ -13,6 +13,7 @@
     Generator,
     Iterable,
     Iterator,
+    List,
     Optional,
     Set,
     Tuple,
@@ -34,7 +35,6 @@
 from scrapy.pipelines import ItemPipelineManager
 from scrapy.signalmanager import SignalManager
 from scrapy.utils.defer import (
-    DeferredListResultListT,
     aiter_errback,
     defer_fail,
     defer_succeed,
@@ -54,7 +54,7 @@


 _T = TypeVar("_T")
-_ParallelResult = DeferredListResultListT[Iterator[Any]]
+_ParallelResult = List[Tuple[bool, Iterator[Any]]]

 if TYPE_CHECKING:
     # parameterized Deferreds require Twisted 21.7.0
@@ -374,7 +374,7 @@ def _log_download_errors(

     def _itemproc_finished(
         self, output: Any, item: Any, response: Response, spider: Spider
-    ) -> Deferred:
+    ) -> Deferred[Any]:
         """ItemProcessor finished for the given ``item`` and returned ``output``"""
         assert self.slot is not None  # typing
         self.slot.itemproc_size -= 1
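
The new `_ParallelResult` alias spells out the shape `DeferredList` fires with: a list of `(success, result)` pairs. A sketch of that shape (hypothetical names; under strict checker settings the return may still need a cast):

```python
from __future__ import annotations

from typing import List, Tuple

from twisted.internet.defer import Deferred, DeferredList, succeed

# Each entry pairs a success flag with the result (or a Failure),
# the shape the rewritten _ParallelResult alias spells out.
GatherResult = List[Tuple[bool, int]]


def gather() -> Deferred[GatherResult]:
    ds: List[Deferred[int]] = [succeed(1), succeed(2)]
    # Fires with [(True, 1), (True, 2)] once every input has fired.
    return DeferredList(ds)
```
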
7 changes: 5 additions & 2 deletions scrapy/core/spidermw.py
@@ -302,7 +302,10 @@ async def _process_callback_output(
         recovered = MutableChain()
         result = self._evaluate_iterable(response, spider, result, 0, recovered)
         result = await maybe_deferred_to_future(
-            self._process_spider_output(response, spider, result)
+            cast(
+                "Deferred[Union[Iterable[_T], AsyncIterable[_T]]]",
+                self._process_spider_output(response, spider, result),
+            )
         )
         if isinstance(result, AsyncIterable):
             return MutableAsyncChain(result, recovered)

def process_start_requests(
self, start_requests: Iterable[Request], spider: Spider
) -> Deferred:
) -> Deferred[Iterable[Request]]:
return self._process_chain("process_start_requests", start_requests, spider)

# This method is only needed until _async compatibility methods are removed.
Expand Down
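
`cast` is the escape hatch used here: it costs nothing at runtime and narrows a loosely annotated Deferred at the point where the precise type is known. A sketch with hypothetical helpers:

```python
from __future__ import annotations

from typing import Any, List, cast

from twisted.internet.defer import Deferred, succeed


def legacy_helper() -> Deferred[Any]:
    # Stand-in for a helper still annotated as Deferred[Any].
    return succeed([1, 2, 3])


def typed_call() -> Deferred[List[int]]:
    # cast() is a no-op at runtime; it only tells the checker what the
    # Deferred will actually fire with at this call site.
    return cast("Deferred[List[int]]", legacy_helper())
```
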
33 changes: 23 additions & 10 deletions scrapy/crawler.py
@@ -4,7 +4,18 @@
 import pprint
 import signal
 import warnings
-from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Set, Type, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Generator,
+    Optional,
+    Set,
+    Type,
+    TypeVar,
+    Union,
+    cast,
+)

 from twisted.internet.defer import (
     Deferred,
@@ -54,6 +65,8 @@

 logger = logging.getLogger(__name__)

+_T = TypeVar("_T")
+

 class Crawler:
     def __init__(
@@ -140,7 +153,7 @@ def _apply_settings(self) -> None:
         )

     @inlineCallbacks
-    def crawl(self, *args: Any, **kwargs: Any) -> Generator[Deferred, Any, None]:
+    def crawl(self, *args: Any, **kwargs: Any) -> Generator[Deferred[Any], Any, None]:
         if self.crawling:
             raise RuntimeError("Crawling already taking place")
         if self._started:
@@ -172,7 +185,7 @@ def _create_engine(self) -> ExecutionEngine:
         return ExecutionEngine(self, lambda _: self.stop())

     @inlineCallbacks
-    def stop(self) -> Generator[Deferred, Any, None]:
+    def stop(self) -> Generator[Deferred[Any], Any, None]:
         """Starts a graceful stop of the crawler and returns a deferred that is
         fired when the crawler is stopped."""
         if self.crawling:
@@ -256,15 +269,15 @@ def __init__(self, settings: Union[Dict[str, Any], Settings, None] = None):
         self.settings = settings
         self.spider_loader = self._get_spider_loader(settings)
         self._crawlers: Set[Crawler] = set()
-        self._active: Set[Deferred] = set()
+        self._active: Set[Deferred[None]] = set()
         self.bootstrap_failed = False

     def crawl(
         self,
         crawler_or_spidercls: Union[Type[Spider], str, Crawler],
         *args: Any,
         **kwargs: Any,
-    ) -> Deferred:
+    ) -> Deferred[None]:
         """
         Run a crawler with the provided arguments.

@@ -294,12 +307,12 @@ def crawl(
         crawler = self.create_crawler(crawler_or_spidercls)
         return self._crawl(crawler, *args, **kwargs)

-    def _crawl(self, crawler: Crawler, *args: Any, **kwargs: Any) -> Deferred:
+    def _crawl(self, crawler: Crawler, *args: Any, **kwargs: Any) -> Deferred[None]:
         self.crawlers.add(crawler)
         d = crawler.crawl(*args, **kwargs)
         self._active.add(d)

-        def _done(result: Any) -> Any:
+        def _done(result: _T) -> _T:
             self.crawlers.discard(crawler)
             self._active.discard(d)
             self.bootstrap_failed |= not getattr(crawler, "spider", None)
@@ -335,7 +348,7 @@ def _create_crawler(self, spidercls: Union[str, Type[Spider]]) -> Crawler:
         # temporary cast until self.spider_loader is typed
         return Crawler(cast(Type[Spider], spidercls), self.settings)

-    def stop(self) -> Deferred:
+    def stop(self) -> Deferred[Any]:
         """
         Stops simultaneously all the crawling jobs taking place.

@@ -344,7 +357,7 @@ def stop(self) -> Deferred:
         return DeferredList([c.stop() for c in list(self.crawlers)])

     @inlineCallbacks
-    def join(self) -> Generator[Deferred, Any, None]:
+    def join(self) -> Generator[Deferred[Any], Any, None]:
         """
         join()

@@ -460,7 +473,7 @@ def start(
         )
         reactor.run(installSignalHandlers=install_signal_handlers)  # blocking call

-    def _graceful_stop_reactor(self) -> Deferred:
+    def _graceful_stop_reactor(self) -> Deferred[Any]:
         d = self.stop()
         d.addBoth(self._stop_reactor)
         return d
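
For `@inlineCallbacks` methods like `crawl`, `stop`, and `join`, the annotation `Generator[Deferred[Any], Any, None]` reads as: the generator yields Deferreds, is sent back whatever they fire with, and returns nothing. A sketch with a hypothetical function:

```python
from __future__ import annotations

from typing import Any, Generator

from twisted.internet.defer import Deferred, inlineCallbacks, succeed


@inlineCallbacks
def shutdown() -> Generator[Deferred[Any], Any, None]:
    # Yield type: the Deferreds handed to the inlineCallbacks trampoline.
    # Send type: whatever each yielded Deferred fires with (Any here).
    # Return type: None; the decorated function itself returns a Deferred.
    status = yield succeed("stopped")
    print(status)
```
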