Skip to content

Commit

Permalink
Merge pull request #73 from agencyenterprise/push_chunk-timestamp-vector
Browse files Browse the repository at this point in the history
Allow `timestamp` list/vector in `StreamOutlet.push_chunk`
  • Loading branch information
cboulay authored Oct 25, 2023
2 parents 13853cf + d3b89c1 commit 6b11606
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions pylsl/pylsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ def __init__(self, info, chunk_size=0, max_buffered=360):
self.channel_count = info.channel_count()
self.do_push_sample = fmt2push_sample[self.channel_format]
self.do_push_chunk = fmt2push_chunk[self.channel_format]
self.do_push_chunk_n = fmt2push_chunk_n[self.channel_format]
self.value_type = fmt2type[self.channel_format]
self.sample_type = self.value_type * self.channel_count

Expand Down Expand Up @@ -458,43 +459,75 @@ def push_sample(self, x, timestamp=0.0, pushthrough=True):
def push_chunk(self, x, timestamp=0.0, pushthrough=True):
"""Push a list of samples into the outlet.
samples -- A list of samples, either as a list of lists or a list of
samples -- A list of samples, preferably as a 2-D numpy array.
`samples` can also be a list of lists, or a list of
multiplexed values.
timestamp -- Optionally the capture time of the most recent sample, in
agreement with local_clock(); if omitted, the current
timestamp -- Optional, float or 1-D list of floats.
If float: the capture time of the most recent sample, in
agreement with local_clock(); if omitted/default (0.0), the current
time is used. The time stamps of other samples are
automatically derived according to the sampling rate of
the stream. (default 0.0)
the stream.
If list of floats: the time stamps for each sample.
Must be the same length as `samples`.
pushthrough Whether to push the chunk through to the receivers instead
of buffering it with subsequent samples. Note that the
chunk_size, if specified at outlet construction, takes
precedence over the pushthrough flag. (default True)
Note: performance is optimized for the following argument types:
- `samples`: 2-D numpy array
- `timestamp`: float
"""
# Convert timestamp to corresponding ctype
try:
timestamp_c = c_double(timestamp)
# Select the corresponding push_chunk method
liblsl_push_chunk_func = self.do_push_chunk
except TypeError:
try:
timestamp_c = (c_double * len(timestamp))(*timestamp)
liblsl_push_chunk_func = self.do_push_chunk_n
except TypeError:
raise TypeError(
"timestamp must be a float or an iterable of floats"
)

try:
n_values = self.channel_count * len(x)
data_buff = (self.value_type * n_values).from_buffer(x)
handle_error(self.do_push_chunk(self.obj, data_buff,
c_long(n_values),
c_double(timestamp),
c_int(pushthrough)))
handle_error(
liblsl_push_chunk_func(
self.obj, data_buff,
c_long(n_values),
timestamp_c,
c_int(pushthrough)
)
)
except TypeError:
# don't send empty chunks
if len(x):
if type(x[0]) is list:
x = [v for sample in x for v in sample]
if self.channel_format == cf_string:
x = [v.encode('utf-8') for v in x]
if len(x) % self.channel_count == 0:
# x is a flattened list of multiplexed values
constructor = self.value_type * len(x)
# noinspection PyCallingNonCallable
handle_error(self.do_push_chunk(self.obj, constructor(*x),
c_long(len(x)),
c_double(timestamp),
c_int(pushthrough)))
handle_error(
liblsl_push_chunk_func(
self.obj, constructor(*x),
c_long(len(x)),
timestamp_c,
c_int(pushthrough)
)
)
else:
raise ValueError("Each sample must have the same number of channels ("
+ str(self.channel_count) + ").")


def have_consumers(self):
"""Check whether consumers are currently registered.
Expand Down Expand Up @@ -1401,11 +1434,15 @@ def find_liblsl_libraries(verbose=False):
push_sample_int64 = lib.lsl_push_sample_ltp
pull_sample_int64 = lib.lsl_pull_sample_l
push_chunk_int64 = lib.lsl_push_chunk_ltp
push_chunk_int64_n = lib.lsl_push_chunk_ltnp
pull_chunk_int64 = lib.lsl_pull_chunk_l
else:
def push_sample_int64(*_):
raise NotImplementedError('int64 support isn\'t enabled on your platform')
pull_sample_int64 = push_chunk_int64 = pull_chunk_int64 = push_sample_int64
pull_sample_int64 = push_sample_int64
push_chunk_int64 = push_sample_int64
push_chunk_int64_n = push_sample_int64
pull_chunk_int64 = push_sample_int64

# set up some type maps
string2fmt = {'float32': cf_float32, 'double64': cf_double64,
Expand All @@ -1425,10 +1462,14 @@ def push_sample_int64(*_):
fmt2push_chunk = [[], lib.lsl_push_chunk_ftp, lib.lsl_push_chunk_dtp,
lib.lsl_push_chunk_strtp, lib.lsl_push_chunk_itp,
lib.lsl_push_chunk_stp, lib.lsl_push_chunk_ctp, push_chunk_int64]
fmt2push_chunk_n = [[], lib.lsl_push_chunk_ftnp, lib.lsl_push_chunk_dtnp,
lib.lsl_push_chunk_strtnp, lib.lsl_push_chunk_itnp,
lib.lsl_push_chunk_stnp, lib.lsl_push_chunk_ctnp, push_chunk_int64_n]
fmt2pull_chunk = [[], lib.lsl_pull_chunk_f, lib.lsl_pull_chunk_d,
lib.lsl_pull_chunk_str, lib.lsl_pull_chunk_i,
lib.lsl_pull_chunk_s, lib.lsl_pull_chunk_c, pull_chunk_int64]
except:
# if not available
fmt2push_chunk = [None, None, None, None, None, None, None, None]
fmt2pull_chunk = [None, None, None, None, None, None, None, None]
fmt2push_chunk = [None] * len(fmt2string)
fmt2push_chunk_n = [None] * len(fmt2string)
fmt2pull_chunk = [None] * len(fmt2string)

0 comments on commit 6b11606

Please sign in to comment.