Skip to content

Commit

Permalink
fix: Fix issues with recent pychromecast / zeroconf (#398)
Browse files Browse the repository at this point in the history
* Update gitignore

* Fix pychromecast breakage

* Update dependencies

* Remove redundant code from get_cast_device

* Refactor away CastDevice

..now that Chromecast object contains CastInfo namedtuple

* Update discovery docstrings

* Misc fixes

* Miscier than misc
  • Loading branch information
theychx authored Jun 21, 2022
1 parent 0856a57 commit 6c55414
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 104 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,7 @@ dmypy.json

# Pyre type checker
.pyre/

.idea/
poetry.lock
.vscode/
42 changes: 19 additions & 23 deletions catt/api.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
from typing import List
from typing import Optional

from pychromecast import Chromecast

from .controllers import CastController
from .controllers import get_app
from .controllers import get_controller
from .discovery import CastDevice
from .discovery import get_cast_device_with_ip
from .discovery import get_cast_device_with_name
from .discovery import get_cast_devices
from .discovery import get_cast_with_ip
from .discovery import get_cast_with_name
from .discovery import get_casts
from .error import APIError
from .error import CastError
from .stream_info import StreamInfo


def discover() -> list:
"""Perform discovery of devices present on local network, and return result."""

return [CattDevice(ip_addr=d.ip) for d in get_cast_devices()]


class CattDevice:
def __init__(self, name: str = "", ip_addr: str = "", lazy: bool = False) -> None:
"""
Expand All @@ -38,8 +34,7 @@ def __init__(self, name: str = "", ip_addr: str = "", lazy: bool = False) -> Non
self.ip_addr = ip_addr
self.uuid = None

self._cast_device = None # type: Optional[CastDevice]
# Type comment for compatibility with Py3.5-.
self._cast = None # type: Optional[Chromecast]
self._cast_controller = None # type: Optional[CastController]
if not lazy:
self._create_cast()
Expand All @@ -48,24 +43,19 @@ def __repr__(self) -> str:
return "<CattDevice: {}>".format(self.name or self.ip_addr)

def _create_cast(self) -> None:
self._cast_device = (
get_cast_device_with_ip(self.ip_addr) if self.ip_addr else get_cast_device_with_name(self.name)
)
if not self._cast_device:
self._cast = get_cast_with_ip(self.ip_addr) if self.ip_addr else get_cast_with_name(self.name)
if not self._cast:
raise CastError("Device could not be found")
self._cast = self._cast_device.cast
self.name = self._cast.name
self.ip_addr = self._cast_device.ip
self.uuid = self._cast.uuid

self._cast.wait()
self.name = self._cast.cast_info.friendly_name
self.ip_addr = self._cast.cast_info.host
self.uuid = self._cast.cast_info.uuid

def _create_controller(self) -> None:
self._cast_controller = get_controller(self._cast, get_app("default"))

@property
def controller(self):
if not self._cast_device:
if not self._cast:
self._create_cast()
if not self._cast_controller:
self._create_controller()
Expand Down Expand Up @@ -169,3 +159,9 @@ def volumedown(self, delta: float) -> None:
"""

self.controller.volumedown(delta)


def discover() -> List[CattDevice]:
"""Perform discovery of devices present on local network, and return result."""

return [CattDevice(ip_addr=c.ip) for c in get_casts()]
17 changes: 8 additions & 9 deletions catt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from .controllers import setup_cast
from .controllers import StateFileError
from .controllers import StateMode
from .discovery import cast_device_ip_exists
from .discovery import get_cast_devices_info
from .discovery import cast_ip_exists
from .discovery import get_cast_infos
from .error import CastError
from .error import CattUserError
from .error import CliError
Expand Down Expand Up @@ -493,15 +493,15 @@ def info(settings, json_output):
def scan(json_output):
if not json_output:
click.echo("Scanning Chromecasts...")
devices = get_cast_devices_info()
devices = get_cast_infos()

if json_output:
echo_json(devices)
echo_json({d.friendly_name: d._asdict() for d in devices})
else:
if not devices:
raise CastError("No devices found")
for device in devices.keys():
click.echo("{ip} - {device} - {manufacturer} {model_name}".format(device=device, **devices[device]))
for device in devices:
click.echo(f"{device.host} - {device.friendly_name} - {device.manufacturer} {device.model_name}")


@cli.command(short_help="Save the current state of the Chromecast for later use.")
Expand Down Expand Up @@ -613,10 +613,9 @@ def get_device_from_settings(settings):
raise CliError("No device specified (must be explicitly specified with -d option)")
is_ip = is_ipaddress(device_desc)
if is_ip:
found = cast_device_ip_exists(device_desc)
found = cast_ip_exists(device_desc)
else:
devices = get_cast_devices_info()
found = device_desc in devices.keys()
found = device_desc in [d.friendly_name for d in get_cast_infos()]
if not found:
msg = "No device found at {}" if is_ip else 'Specified device "{}" not found'
raise CliError(msg.format(device_desc))
Expand Down
9 changes: 4 additions & 5 deletions catt/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pychromecast.controllers.dashcast import DashCastController as PyChromecastDashCastController
from pychromecast.controllers.youtube import YouTubeController

from .discovery import get_cast_device
from .discovery import get_cast
from .error import AppSelectionError
from .error import CastError
from .error import ControllerError
Expand Down Expand Up @@ -75,11 +75,10 @@ def get_controller(cast, app, action=None, prep=None) -> "CastController":


def setup_cast(device_desc, video_url=None, controller=None, ytdl_options=None, action=None, prep=None):
cast_device = get_cast_device(device_desc)
cast = cast_device.cast
cast = get_cast(device_desc)
cast_type = cast.cast_type
app_id = cast.app_id
stream = StreamInfo(video_url, device_info=cast_device.info, ytdl_options=ytdl_options) if video_url else None
stream = StreamInfo(video_url, cast_info=cast.cast_info, ytdl_options=ytdl_options) if video_url else None

if controller:
app = get_app(controller, cast_type, strict=True)
Expand Down Expand Up @@ -293,7 +292,7 @@ def update():

@property
def cc_name(self):
return self._cast.device.friendly_name
return self._cast.cast_info.friendly_name

@property
def info(self):
Expand Down
94 changes: 37 additions & 57 deletions catt/discovery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
Expand All @@ -11,97 +10,79 @@
DEFAULT_PORT = 8009


class CastDevice:
def __init__(self, cast: pychromecast.Chromecast, ip: str, port: int) -> None:
self.cast = cast
self.ip = ip
self.port = port

@property
def info(self):
return {
"ip": self.ip,
"port": self.port,
"manufacturer": self.cast.device.manufacturer,
"model_name": self.cast.model_name,
"uuid": self.cast.uuid,
"cast_type": self.cast.cast_type,
"name": self.cast.name,
}


def get_cast_devices(names: Optional[List[str]] = None) -> List[CastDevice]:
def get_casts(names: Optional[List[str]] = None) -> List[pychromecast.Chromecast]:
"""
Discover all available devices, optionally filtering them with list of specific device names
(which will speedup discovery, as pychromecast does this in a non-blocking manner).
:param names: Optional list of device names.
:type names: List[str]
:returns: List of CastDevice wrapper objects containing cast object and additional ip/port info.
:rtype: List[CastDevice]
:returns: List of Chromecast objects.
:rtype: List[pychromecast.Chromecast]
"""

if names:
cast_infos, browser = pychromecast.discovery.discover_listed_chromecasts(friendly_names=names)
else:
cast_infos, browser = pychromecast.discovery.discover_chromecasts()
browser.stop_discovery()

devices = [
CastDevice(pychromecast.get_chromecast_from_cast_info(c, browser.zc), c.host, c.port) for c in cast_infos
]
devices.sort(key=lambda d: d.cast.name)
return devices
casts = [pychromecast.get_chromecast_from_cast_info(c, browser.zc) for c in cast_infos]

for cast in casts:
cast.wait()

def get_cast_devices_info() -> Dict[str, Dict[str, Union[str, int]]]:
browser.stop_discovery()
casts.sort(key=lambda c: c.cast_info.friendly_name)
return casts


def get_cast_infos() -> List[pychromecast.CastInfo]:
"""
Discover all available devices, and collect info from them.
:returns: Various device info, packed in dict w. device names as keys.
:rtype: Dict
:returns: Various device info, packed in CastInfo namedtuple.
:rtype: pychromecast.CastInfo
"""

devices = get_cast_devices()
return {d.cast.name: d.info for d in devices}
return [c.cast_info for c in get_casts()]


def get_cast_device_with_name(device_name: Union[str, None]) -> Optional[CastDevice]:
def get_cast_with_name(cast_name: Union[str, None]) -> Optional[pychromecast.Chromecast]:
"""
Get specific device if supplied name is not None,
otherwise the device with the name that has the lowest alphabetical value.
:param device_name: Name of device.
:type device_name: str
:returns: CastDevice wrapper object containing cast object and additional ip/port info.
:rtype: CastDevice
:returns: Chromecast object.
:rtype: pychromecast.Chromecast
"""

devices = get_cast_devices([device_name]) if device_name else get_cast_devices()
return devices[0] if devices else None
casts = get_casts([cast_name]) if cast_name else get_casts()
return casts[0] if casts else None


def get_cast_device_with_ip(device_ip: str, port: int = DEFAULT_PORT) -> Optional[CastDevice]:
def get_cast_with_ip(cast_ip: str, port: int = DEFAULT_PORT) -> Optional[pychromecast.Chromecast]:
"""
Get specific device using its ip-address (and optionally port).
:param device_ip: Ip-address of device.
:type device_name: str
:param port: Optional port number of device.
:returns: CastDevice wrapper object containing cast object and additional ip/port info.
:rtype: CastDevice
:returns: Chromecast object.
:rtype: pychromecast.Chromecast
"""

try:
# tries = 1 is necessary in order to stop pychromecast engaging
# in a retry behaviour when ip is correct, but port is wrong.
cast = pychromecast.Chromecast(device_ip, port=port, tries=1)
return CastDevice(cast, device_ip, port)
cast = pychromecast.Chromecast(cast_ip, port=port, tries=1)
return cast
except pychromecast.error.ChromecastConnectionError:
return None


def cast_device_ip_exists(device_ip: str) -> bool:
def cast_ip_exists(cast_ip: str) -> bool:
"""
Get availability of specific device using its ip-address.
Expand All @@ -111,10 +92,10 @@ def cast_device_ip_exists(device_ip: str) -> bool:
:rtype: bool
"""

return bool(get_cast_device_with_ip(device_ip))
return bool(get_cast_with_ip(cast_ip))


def get_cast_device(device_desc: Optional[str] = None) -> CastDevice:
def get_cast(cast_desc: Optional[str] = None) -> pychromecast.Chromecast:
"""
Attempt to connect with requested device (or any device if none has been specified).
Expand All @@ -124,18 +105,17 @@ def get_cast_device(device_desc: Optional[str] = None) -> CastDevice:
:rtype: pychromecast.Chromecast
"""

cast_device = None
cast = None

if device_desc and is_ipaddress(device_desc):
cast_device = get_cast_device_with_ip(device_desc, DEFAULT_PORT)
if not cast_device:
msg = "No device found at {}".format(device_desc)
if cast_desc and is_ipaddress(cast_desc):
cast = get_cast_with_ip(cast_desc, DEFAULT_PORT)
if not cast:
msg = "No device found at {}".format(cast_desc)
raise CastError(msg)
else:
cast_device = get_cast_device_with_name(device_desc)
if not cast_device:
msg = 'Specified device "{}" not found'.format(device_desc) if device_desc else "No devices found"
cast = get_cast_with_name(cast_desc)
if not cast:
msg = 'Specified device "{}" not found'.format(cast_desc) if cast_desc else "No devices found"
raise CastError(msg)

cast_device.cast.wait()
return cast_device
return cast
10 changes: 5 additions & 5 deletions catt/stream_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@


class StreamInfo:
def __init__(self, video_url, device_info=None, ytdl_options=None, throw_ytdl_dl_errs=False):
def __init__(self, video_url, cast_info=None, ytdl_options=None, throw_ytdl_dl_errs=False):
self._throw_ytdl_dl_errs = throw_ytdl_dl_errs
self.local_ip = get_local_ip(device_info["ip"]) if device_info else None
self.port = random.randrange(45000, 47000) if device_info else None
self.local_ip = get_local_ip(cast_info.host) if cast_info else None
self.port = random.randrange(45000, 47000) if cast_info else None

if "://" in video_url:
self._ydl = yt_dlp.YoutubeDL(dict(ytdl_options) if ytdl_options else DEFAULT_YTDL_OPTS)
Expand All @@ -45,8 +45,8 @@ def __init__(self, video_url, device_info=None, ytdl_options=None, throw_ytdl_dl
self._preinfo = self._get_stream_preinfo(self._preinfo["url"])
self.is_local_file = False

model = (device_info["manufacturer"], device_info["model_name"]) if device_info else None
cast_type = device_info["cast_type"] if device_info else None
model = (cast_info.manufacturer, cast_info.model_name) if cast_info else None
cast_type = cast_info.cast_type if cast_info else None
if "format" in self._ydl.params:
# We pop the "format" item, as it will make get_stream_info fail,
# if it holds an invalid value.
Expand Down
6 changes: 1 addition & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,11 @@ catt = "catt.cli:main"
python = ">=3.7"
click = ">=7.1.2"
ifaddr = ">=0.1.7"
pychromecast = "==9.2.0"
pychromecast = ">=12.1.3, <13"
requests = ">=2.23.0"
yt-dlp = ">=2021.12.1"
protobuf = "<4"

# We don't use zeroconf directly, but PyChromecast does, and they aren't great about
# pinning it, so we've seen breakage. We pin it here just to avoid that.
zeroconf = "==0.31.0"

[tool.poetry.dev-dependencies]
coverage = "*"
flake8 = "*"
Expand Down

0 comments on commit 6c55414

Please sign in to comment.