Source code for pupeyes.data.tobii_titta

# -*- coding:utf-8 -*-

"""
Tobii Data Parsing Module (from Titta)

This module is designed for parsing Tobii data saved from a Titta experiment (hdf5 format). It provides functionalities to parse messages and raw gaze samples. 
However, it does not support parsing fixations, saccades, and blinks, as these are not saved by Titta.

For more info on the Titta package, see https://github.com/marcus-nystrom/Titta
"""

import pandas as pd
import numpy as np
import h5py

[docs] class TobiiTittaReader: """ A class to read and parse Tobii data saved from Titta (hdf5 format). This class handles loading and parsing of Tobii data files, providing methods to extract messages and gaze samples. However, it does not support parsing fixations, saccades, and blinks, as these are not saved by Titta. Most functions here are wrappers for existing functionalities in the Titta package. Parameters ---------- path : str Path to the Tobii hdf5 data file start_msg : str Common part of message marking the start of a trial. For example, if your trial start messages are 'TRIAL_START 1 1', 'TRIAL_START 1 2', etc., then start_msg would be 'TRIAL_START' stop_msg : str Common part of message marking the end of a trial. For example, if your trial end messages are 'TRIAL_END 1 1', 'TRIAL_END 1 2', etc., then stop_msg would be 'TRIAL_END' msg_format : dict Dictionary specifying the format of messages. The messages will be parsed based on this format. Example: {'marker': str, 'event': str, 'block': int, 'trial': int} delimiter : str Character used to separate message components. For example, if messages are formatted as 'TRIAL_END 1 1', the delimiter would be ' '. add_cols : dict, optional Additional columns to add to output DataFrames. The dictionary should be in the format {'column_name': column_data}. For example, to add a column 'subject' with value 'S01' to all rows, use {'subject': 'S01'}. progress_bar : bool, optional If True, shows a progress bar while reading the data file. Default is True. Attributes ---------- calibration_history : pd.DataFrame Raw calibration history as saved by Titta external_signal : pd.DataFrame Raw external signal as saved by Titta gaze : pd.DataFrame Raw gaze data as saved by Titta log : pd.DataFrame Raw log data as saved by Titta msg : pd.DataFrame Raw message data as saved by Titta notification : pd.DataFrame Raw notification data as saved by Titta time_sync : pd.DataFrame Raw time sync data as saved by Titta Examples -------- >>> reader = TobiiTittaReader( ... path='subject01.h5', ... start_msg='TRIAL_START', ... stop_msg='TRIAL_END', ... msg_format={'marker': str, 'event': str, 'block': int, 'trial': int}, ... delimiter=' ' ... ) """ def __init__(self, path, start_msg, stop_msg, msg_format, delimiter, add_cols=None): """ Initialize TobiiTittaReader for processing eye tracking data. Parameters ---------- path : str Path to the Tobii hdf5 data file start_msg : str Common part of message marking the start of a trial. For example: 'TRIAL_START' stop_msg : str Common part of message marking the end of a trial. For example: 'TRIAL_END' msg_format : dict Dictionary specifying the format of messages. The messages will be parsed based on this format. Example: {'marker': str, 'event': str, 'block': int, 'trial': int} delimiter : str Character used to separate message components. For example: ' ' add_cols : dict, optional Additional columns to add to output DataFrames. The dictionary should be in the format {'column_name': column_data}. For example, to add a column 'subject' with value 'S01' to all rows, use {'subject': 'S01'}. """ self.path = path self.start_msg = start_msg self.stop_msg = stop_msg self.msg_format = msg_format self.delimiter = delimiter self.add_cols = add_cols # read data from hdf5 file self.calibration_history = None self.external_signal = None self.gaze = None self.log = None self.msg = None self.notification = None self.time_sync = None # Read data from hdf5 file with h5py.File(path, 'r') as h5_file: dataset_names = h5_file.keys() if 'calibration_history' in dataset_names: self.calibration_history = pd.read_hdf(path, 'calibration_history') if 'external_signal' in dataset_names: self.external_signal = pd.read_hdf(path,'external_signal') if 'gaze' in dataset_names: self.gaze = pd.read_hdf(path,'gaze') if 'log' in dataset_names: self.log = pd.read_hdf(path,'log') if 'msg' in dataset_names: self.msg = pd.read_hdf(path,'msg') if 'notification' in dataset_names: self.notification = pd.read_hdf(path,'notification') if 'time_sync' in dataset_names: self.time_sync = pd.read_hdf(path,'time_sync')
[docs] def get_messages(self): """ Extract and process marker events from the Titta dataset. This method extracts all message events from the data and parses them according to the specified message format and delimiter. Returns ------- pd.DataFrame DataFrame containing processed message data with columns: - id : int Trial identifier - system_time_stamp : float System timestamps - msg : str Raw message string - Additional columns to store parsed message parts based on msg_format specification. - Additional columns from self.add_cols are added if specified. Notes ----- Messages are split using the specified delimiter and parsed according to the data types specified in msg_format. """ # filter messages for start and stop messages messages = self.msg[(self.msg.msg.str.contains(self.start_msg)) | (self.msg.msg.str.contains(self.stop_msg))].reset_index(drop=True) # whenever a new start_msg is found, adds 1 to trial id messages['id'] = np.cumsum(messages.msg.str.contains(self.start_msg))-1 # reorder columns messages = messages[['id', 'system_time_stamp','msg']] # parse message parts for m, col in enumerate(self.msg_format.keys()): messages[col] = messages.msg.str.split(pat=self.delimiter, expand=True)[m].astype(self.msg_format[col]) # Add any additional columns if self.add_cols: messages = messages.assign(**(self.add_cols)) # sort by system_time_stamp messages = messages.sort_values('system_time_stamp') return messages
[docs] def get_samples(self, parse_messages=True): """ Extract gaze samples for each trial based on start and stop messages. Parameters ---------- parse_messages : bool, optional If True, parse message columns and add them to samples. If False, only add raw message. Default is True. Returns ------- pd.DataFrame DataFrame containing processed sample data. Columns include all columns in self.gaze, as well as: - trialtime : float Trial timestamps in milliseconds (since the start of each trial) - msgtime : float Message timestamps in system timestamps (start time of each trial) - msg : str Raw message strings (start message of each trial) - Additional columns from message parsing if parse_messages=True. - Additional columns from self.add_cols if specified. Notes ----- Columns are converted to the appropriate data type that supports pd.NA. Only trials with both start and stop messages are included in the output. """ # get trial boundaries (start/stop times for each trial) messages = self.get_messages() if messages.empty: return pd.DataFrame() # extract start and stop messages separately start_msgs = messages[messages.msg.str.contains(self.start_msg)] stop_msgs = messages[messages.msg.str.contains(self.stop_msg)] # create trial boundaries DataFrame # essentially creating a wide format with each row being a trial and the start and stop times trial_boundaries = pd.merge( start_msgs[['id', 'system_time_stamp']].rename(columns={'system_time_stamp': 'start_time'}), stop_msgs[['id', 'system_time_stamp']].rename(columns={'system_time_stamp': 'stop_time'}), on='id', how='inner' ) if trial_boundaries.empty: return pd.DataFrame() # create a mapping from timestamp to trial_id for all gaze samples trial_assignments = [] for _, trial in trial_boundaries.iterrows(): # find all gaze samples in this trial's time window trial_mask = (self.gaze.system_time_stamp >= trial.start_time) & (self.gaze.system_time_stamp < trial.stop_time) if trial_mask.any(): trial_assignments.append({ 'trial_id': trial.id, 'start_time': trial.start_time, 'indices': self.gaze[trial_mask].index }) if not trial_assignments: return pd.DataFrame() # extract gaze samples and message times all_indices = [] trial_info = [] msg_times = [] for assignment in trial_assignments: indices = assignment['indices'] all_indices.extend(indices) trial_info.extend([assignment['trial_id']] * len(indices)) msg_times.extend([assignment['start_time']] * len(indices)) samples = self.gaze.loc[all_indices].copy() samples['trial_id'] = trial_info samples['msgtime'] = msg_times # normalize timestamps samples['trialtime'] = ((samples.system_time_stamp - samples.msgtime)/1000).astype(int) # convert to milliseconds # add message information if parse_messages: # get start messages for each trial start_msg_data = start_msgs.set_index('id') message_cols = [col for col in start_msg_data.columns if col not in ['system_time_stamp']] for col in message_cols: samples[col] = samples.trial_id.map(start_msg_data[col]) else: # add raw message start_msg_data = start_msgs.set_index('id') samples['msg'] = samples.trial_id.map(start_msg_data['msg']) # add any additional columns if self.add_cols: samples = samples.assign(**self.add_cols) # remove the trial_id column from final output samples = samples.drop(columns=['trial_id']) # reset index samples = samples.reset_index(drop=True) return samples.convert_dtypes(convert_string=True, convert_integer=True, convert_boolean=True, convert_floating=True)