-
Notifications
You must be signed in to change notification settings - Fork 327
/
query.py
201 lines (177 loc) · 7.8 KB
/
query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from copy import deepcopy
from typing import Any, Callable, Dict, List, Mapping, TypeVar
import torch
import torch.nn as nn
from tensordict import NestedKey, TensorDictBase
from tensordict.nn.common import TensorDictModuleBase
from torchrl._utils import logger as torchrl_logger
from torchrl.data.map.hash import SipHash
# Generic type variables.
# NOTE(review): neither K nor V is referenced by the definitions visible in
# this file; presumably kept for annotations elsewhere - confirm before removing.
K = TypeVar("K")
V = TypeVar("V")
class HashToInt(nn.Module):
    """Converts a hash value to an integer that can be used for indexing a contiguous storage.

    The module keeps a dictionary that maps every hash value it has registered
    to an integer. Integers are handed out consecutively (0, 1, 2, ...) in the
    order in which new hash values are first registered.
    """

    def __init__(self):
        super().__init__()
        # hash value -> contiguous integer index
        self._index_to_index = {}

    def __call__(self, key: torch.Tensor, extend: bool = False) -> torch.Tensor:
        """Map each hash value in ``key`` to its integer index.

        Args:
            key (torch.Tensor): tensor of hash values. Each element of
                ``key.tolist()`` is used as a dictionary key, so ``key`` is
                expected to be one-dimensional.
            extend (bool, optional): if ``True``, hash values that have not
                been seen before are registered and receive the next free
                index. If ``False``, the mapping is left untouched and every
                unknown hash value is mapped to the current size of the
                mapping (i.e. one past the largest registered index).
                Defaults to ``False``.

        Returns:
            A tensor of indices with the same device and dtype as ``key``.
        """
        mapping = self._index_to_index
        if extend:
            # The default handed to ``setdefault`` is evaluated before the
            # insertion, so a new hash value receives the pre-insertion size.
            result = [mapping.setdefault(item, len(mapping)) for item in key.tolist()]
        else:
            # Nothing is inserted here, so the fallback index is constant.
            missing = len(mapping)
            result = [mapping.get(item, missing) for item in key.tolist()]
        return torch.tensor(result, device=key.device, dtype=key.dtype)

    def state_dict(self) -> Dict[str, torch.Tensor]:
        """Return the mapping as two aligned tensors, ``"keys"`` and ``"values"``."""
        # ``torch.tensor`` cannot consume dict views directly: they must be
        # materialized as lists first.
        values = torch.tensor(list(self._index_to_index.values()))
        keys = torch.tensor(list(self._index_to_index.keys()))
        return {"keys": keys, "values": values}

    def load_state_dict(
        self, state_dict: Mapping[str, Any], strict: bool = True, assign: bool = False
    ):
        """Replace the current mapping with the one stored in ``state_dict``.

        Args:
            state_dict (Mapping[str, Any]): a mapping with ``"keys"`` and
                ``"values"`` entries, as produced by :meth:`state_dict`.
            strict (bool, optional): unused by this implementation.
            assign (bool, optional): unused by this implementation.
        """
        keys = state_dict["keys"]
        values = state_dict["values"]
        self._index_to_index = dict(zip(keys.tolist(), values.tolist()))
class QueryModule(TensorDictModuleBase):
    """A Module to generate compatible indices for storage.

    A module that queries a storage and returns the required index of that storage.
    Currently, it only outputs integer indices (torch.int64).

    Args:
        in_keys (list of NestedKeys): keys of the input tensordict that
            will be used to generate the hash value.
        index_key (NestedKey): the output key where the index value will be written.
            Defaults to ``"_index"``.
        hash_key (NestedKey): the output key where the hash value will be written.
            Defaults to ``"_hash"``.

    Keyword Args:
        hash_module (Callable[[Any], int] or a list of these, optional): a hash
            module similar to :class:`~torchrl.data.map.hash.SipHash` (default).
            If a list of callables is provided, its length must equal the number of in_keys.
            A single callable is deep-copied once per in-key when there is more
            than one in-key.
        hash_to_int (Callable[[int], int], optional): a stateful function that
            maps a hash value to a non-negative integer corresponding to an index in a
            storage. Defaults to :class:`~torchrl.data.map.HashToInt`.
        aggregator (Callable[[int], int], optional): a hash function to group multiple hashes
            together. This argument should only be passed when there is more than one ``in_keys``.
            If a single ``hash_module`` is provided but no aggregator is passed, it will take
            the value of the hash_module. If no ``hash_module`` or a list of ``hash_modules`` is
            provided but no aggregator is passed, it will default to ``SipHash``.
        clone (bool, optional): if ``True``, a shallow clone of the input TensorDict will be
            returned. This can be used to retrieve the integer index within the storage,
            corresponding to a given input tensordict. This can be overridden at runtime by
            providing the ``clone`` argument to the forward method.
            Defaults to ``False``.

    Raises:
        ValueError: if ``in_keys`` is empty, or if a list of hash modules is
            provided whose length differs from the number of in-keys.
        RuntimeError: if a single ``hash_module`` cannot be deep-copied for
            each in-key.

    Examples:
        >>> import torch
        >>> from tensordict import TensorDict
        >>> from torchrl.data.map.hash import SipHash
        >>> query_module = QueryModule(
        ...     in_keys=["key1", "key2"],
        ...     index_key="index",
        ...     hash_module=SipHash(),
        ... )
        >>> query = TensorDict(
        ...     {
        ...         "key1": torch.Tensor([[1], [1], [1], [2]]),
        ...         "key2": torch.Tensor([[3], [3], [2], [3]]),
        ...         "other": torch.randn(4),
        ...     },
        ...     batch_size=(4,),
        ... )
        >>> res = query_module(query)
        >>> # The first two pairs of key1 and key2 match
        >>> assert res["index"][0] == res["index"][1]
        >>> # The last three pairs of key1 and key2 have at least one mismatching value
        >>> assert res["index"][1] != res["index"][2]
        >>> assert res["index"][2] != res["index"][3]

    """

    def __init__(
        self,
        in_keys: List[NestedKey],
        index_key: NestedKey = "_index",
        hash_key: NestedKey = "_hash",
        *,
        hash_module: Callable[[Any], int] | List[Callable[[Any], int]] | None = None,
        hash_to_int: Callable[[int], int] | None = None,
        aggregator: Callable[[Any], int] | None = None,
        clone: bool = False,
    ):
        if len(in_keys) == 0:
            raise ValueError("`in_keys` cannot be empty.")
        # A single key passed on its own is wrapped in a list.
        in_keys = in_keys if isinstance(in_keys, list) else [in_keys]

        super().__init__()
        in_keys = self.in_keys = in_keys
        self.out_keys = [index_key, hash_key]
        # Read the keys back from ``out_keys`` rather than reusing the raw
        # arguments, so the stored keys match whatever the base class kept.
        index_key = self.out_keys[0]
        self.hash_key = self.out_keys[1]

        if aggregator is not None and len(self.in_keys) == 1:
            # ``Logger.warn`` is a deprecated alias; ``warning`` is the
            # supported spelling.
            torchrl_logger.warning(
                "An aggregator was provided but there is only one in-key to be read. "
                "This module will be ignored."
            )
        elif aggregator is None:
            if hash_module is not None and not isinstance(hash_module, list):
                # A single hash module doubles as the aggregator.
                aggregator = hash_module
            else:
                aggregator = SipHash()

        if hash_module is None:
            hash_module = [SipHash() for _ in range(len(self.in_keys))]
        elif not isinstance(hash_module, list):
            try:
                # One independent copy per key when there are several keys;
                # with a single key the module is used as-is.
                hash_module = [
                    deepcopy(hash_module) if len(self.in_keys) > 1 else hash_module
                    for _ in range(len(self.in_keys))
                ]
            except Exception as err:
                raise RuntimeError(
                    "failed to deepcopy the hash module. Please provide a list of hash modules instead."
                ) from err
        elif len(hash_module) != len(self.in_keys):
            raise ValueError(
                "The number of hash_modules must match the number of in_keys. "
                f"Got {len(hash_module)} hash modules but {len(in_keys)} in_keys."
            )
        if hash_to_int is None:
            hash_to_int = HashToInt()

        self.aggregator = aggregator
        # One hash callable per in-key, looked up by key in ``forward``.
        self.hash_module = dict(zip(self.in_keys, hash_module))
        self.hash_to_int = hash_to_int

        self.index_key = index_key
        self.clone = clone

    def forward(
        self,
        tensordict: TensorDictBase,
        *,
        extend: bool = True,
        write_hash: bool = True,
        clone: bool | None = None,
    ) -> TensorDictBase:
        """Hash the in-key entries and write the resulting storage index.

        Args:
            tensordict (TensorDictBase): the input data; every in-key is read
                from it.

        Keyword Args:
            extend (bool, optional): forwarded to ``hash_to_int``.
                Defaults to ``True``.
            write_hash (bool, optional): if ``True``, the hash value is also
                written under ``hash_key``. Defaults to ``True``.
            clone (bool, optional): if provided, overrides the ``clone`` value
                given at construction time. When true, the outputs are written
                to ``tensordict.copy()`` instead of the input itself.

        Returns:
            The tensordict (or its copy) with the index entry, and optionally
            the hash entry, set.
        """
        hash_values = [self.hash_module[k](tensordict.get(k)) for k in self.in_keys]
        if len(self.in_keys) > 1:
            # Stack the per-key hashes along a new last dimension and reduce
            # them to a single hash with the aggregator.
            hash_values = torch.stack(
                hash_values,
                dim=-1,
            )
            hash_values = self.aggregator(hash_values)
        else:
            hash_values = hash_values[0]

        td_hash_value = self.hash_to_int(hash_values, extend=extend)

        clone = clone if clone is not None else self.clone
        if clone:
            output = tensordict.copy()
        else:
            output = tensordict

        output.set(self.index_key, td_hash_value)
        if write_hash:
            output.set(self.hash_key, hash_values)
        return output