"""
Interactive Eye Movement Visualization Module using Dash
This module provides an interactive web-based visualization tool for eye movement data,
including scanpath replay, heatmaps, areas of interest, and fixation sequence plots.
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
import warnings
from PIL import Image
import dash
from dash import html, dcc, dash_table
from dash.dependencies import Input, Output, State
import dash_bootstrap_components as dbc
import matplotlib.pyplot as plt
from ..aoi import get_fixation_aoi, compute_aoi_statistics
from ..plot_utils import draw_heatmap
[docs]
class FixationViewer:
"""An interactive web-based visualization tool for eye movement data.
This class provides a Dash-based interface for visualizing eye movement data with
multiple visualization modes (scanpath, heatmap, AOI), interactive controls, and
data export capabilities.
Parameters
----------
data : pandas.DataFrame, optional
Eye movement data with columns for timestamps, coordinates, etc.
screen_dims : tuple, default=(1920, 1080)
Screen dimensions in pixels (width, height)
col_mapping : dict, optional
Column name mapping for required fields:
- trial_id : str or list
Trial identifier column(s). Can be a single column name or a list of column names
that together uniquely identify a trial (e.g., ['subject', 'block', 'trial'])
- timestamp : str
Timestamp column (optional)
- x : str
X coordinate
- y : str
Y coordinate
- duration : str
Fixation duration (optional)
- stimuli : str
Stimuli path/identifier
stimuli_path : str, optional
Base path for stimuli images
animation_speed : int, default=500
Animation playback speed in milliseconds
dot_size : int, default=10
Fixed size for fixation dots
Attributes
----------
data : pandas.DataFrame
The eye movement data being visualized
screen_dims : tuple
The dimensions of the visualization canvas
col_mapping : dict
Mapping of required columns to data columns
aois : dict
Dictionary of Areas of Interest definitions
app : dash.Dash
The Dash application instance
"""
def __init__(self, data=None, screen_dims=(1920, 1080),
col_mapping=None, stimuli_path=None,
animation_speed=500, dot_size=10):
"""Initialize the visualizer.
Parameters
----------
data : pandas.DataFrame, optional
Eye movement data with columns for timestamps, coordinates, etc.
screen_dims : tuple, default=(1920, 1080)
Screen dimensions in pixels (width, height)
col_mapping : dict, optional
Column name mapping for required fields:
- trial_id : str or list
Trial identifier column(s). Can be a single column name or a list of column names
that together uniquely identify a trial (e.g., ['subject', 'block', 'trial'])
- timestamp : str
Timestamp column (optional)
- x : str
X coordinate
- y : str
Y coordinate
- duration : str
Fixation duration (optional)
- stimuli : str
Stimuli path/identifier
stimuli_path : str, optional
Base path for stimuli images
animation_speed : int, default=500
Animation playback speed in milliseconds
dot_size : int, default=50
Fixed size for fixation dots
"""
self.screen_dims = screen_dims
self.stimuli_path = stimuli_path
self.animation_speed = animation_speed
self.dot_size = dot_size
# Default column mapping
self._default_col_mapping = {
'trial_id': 'trial_id', # Can be overridden with a list of columns
'timestamp': None, # Optional timestamp column
'x': 'x',
'y': 'y',
'duration': None, # Optional duration column
'stimuli': 'stimuli'
}
# Update with user provided mapping
self.col_mapping = self._default_col_mapping.copy()
if col_mapping is not None:
self.col_mapping.update(col_mapping)
# Store data if provided and check for missing values
self.data = None
self._stimuli_cache = {} # Cache for loaded stimuli images
self.aois = None # Will store AOI definitions
if data is not None:
self.set_data(data)
# Initialize Dash app
self.app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
# Create app layout
self.app.layout = self._create_layout()
# Setup callbacks
self._setup_callbacks()
[docs]
def set_data(self, data):
"""Set the eye movement data for visualization."""
self.data = data.copy()
self._validate_data()
if hasattr(self, 'app'):
# Update trial selector options
self.app.layout = self._create_layout()
[docs]
def set_aois(self, aois):
"""Set Areas of Interest (AOIs) for visualization.
Parameters
----------
aois : dict
Can be either:
- A nested dictionary mapping stimulus IDs to AOI definitions.
- A simple dictionary of AOIs that applies to all stimuli.
where each AOI is defined by a list of (x,y) vertex coordinates.
The last point should be the same as the first point to close the polygon.
Returns
-------
None
Examples
--------
>>> # Define AOIs for each stimulus
>>> aois = {
... 'stimulus1': {
... 'aoi1': [(x1,y1), (x2,y2), ..., (x1, y1)],
... 'aoi2': [(x1,y1), (x2,y2), ..., (x1, y1)]
... },
... 'stimulus2': {
... 'aoi1': [(x1,y1), (x2,y2), ..., (x1, y1)],
... 'aoi2': [(x1,y1), (x2,y2), ..., (x1, y1)]
... }
... }
>>> # Define AOIs for all stimuli
>>> aois = {
... 'aoi1': [(x1,y1), (x2,y2), ..., (x1, y1)],
... 'aoi2': [(x1,y1), (x2,y2), ..., (x1, y1)]
... }
"""
if not isinstance(aois, dict):
raise ValueError("AOIs must be provided as a dictionary")
# Check if this is a simple (unnested) AOI dictionary
is_simple = all(isinstance(v, (list, tuple)) for v in aois.values())
if is_simple:
# Validate the simple structure
for aoi_name, vertices in aois.items():
if not all(isinstance(v, (list, tuple)) and len(v) == 2 for v in vertices):
raise ValueError(f"Invalid vertices for AOI {aoi_name}")
# Get unique stimulus IDs from the data if available
stimulus_ids = set()
if hasattr(self, 'data') and self.data is not None and self.col_mapping['stimuli'] in self.data.columns:
stimulus_ids = set(self.data[self.col_mapping['stimuli']].unique())
# If no data is set yet, create a default key
if not stimulus_ids:
stimulus_ids = {'default'}
# Convert to nested structure
self.aois = {stim_id: aois.copy() for stim_id in stimulus_ids}
print('Assuming AOIs are the same for all stimuli.')
else:
# Validate the nested structure
for stim_id, stim_aois in aois.items():
if not isinstance(stim_aois, dict):
raise ValueError(f"AOIs for stimulus {stim_id} must be a dictionary")
for aoi_name, vertices in stim_aois.items():
if not isinstance(vertices, (list, tuple)) or \
not all(isinstance(v, (list, tuple)) and len(v) == 2 for v in vertices):
raise ValueError(f"Invalid vertices for AOI {aoi_name} in stimulus {stim_id}")
self.aois = aois
def _get_stimulus_aois(self, stim_id):
"""Get AOIs for a specific stimulus."""
if not hasattr(self, 'aois') or self.aois is None:
return None
# Try to get stimulus-specific AOIs
aois = self.aois.get(stim_id)
# If not found and we have a 'default' key, use that
if aois is None and 'default' in self.aois:
aois = self.aois['default']
return aois
def _validate_data(self):
"""Validate the input data and check for missing values."""
if self.data is None:
return
# Check for missing values in required columns
required_cols = [self.col_mapping[col] for col in ['x', 'y']]
if isinstance(self.col_mapping['trial_id'], (list, tuple)):
required_cols.extend(self.col_mapping['trial_id'])
else:
required_cols.append(self.col_mapping['trial_id'])
# Only check optional columns if they're specified
if self.col_mapping['duration'] is not None:
required_cols.append(self.col_mapping['duration'])
else:
print('No duration column specified. Fixation duration will not be displayed.')
if self.col_mapping['timestamp'] is not None:
required_cols.append(self.col_mapping['timestamp'])
else:
print('No timestamp column specified.')
missing_mask = self.data[required_cols].isna().any(axis=1)
trials_with_missing = self.data[missing_mask]
if len(trials_with_missing) > 0:
warnings.warn(f"Found {len(trials_with_missing)} rows with missing values")
print("Trials with missing values:")
print(trials_with_missing)
def _get_unique_trials(self):
"""Get all unique trial identifiers from the data."""
if self.data is None:
return []
trial_cols = self.col_mapping['trial_id']
if not isinstance(trial_cols, (list, tuple)):
trial_cols = [trial_cols]
# Get unique combinations of trial identifier columns
unique_trials = self.data[trial_cols].drop_duplicates()
# Convert to list of tuples
if len(trial_cols) == 1:
return [(val,) for val in unique_trials[trial_cols[0]].values]
else:
return [tuple(row) for row in unique_trials.values]
def _format_trial_label(self, trial_values):
"""Format trial identifier for display in plots."""
trial_cols = self.col_mapping['trial_id']
if not isinstance(trial_cols, (list, tuple)):
trial_cols = [trial_cols]
return ' | '.join(f"{col}: {val}" for col, val in zip(trial_cols, trial_values))
def _get_trial_data(self, trial_values):
"""Get data for a specific trial."""
if self.data is None:
raise ValueError("No data has been set")
trial_cols = self.col_mapping['trial_id']
if not isinstance(trial_cols, (list, tuple)):
trial_cols = [trial_cols]
# Create mask for all trial identifier columns
mask = pd.Series(True, index=self.data.index)
for col, val in zip(trial_cols, trial_values):
mask &= (self.data[col] == val)
return self.data[mask].copy()
def _get_stimuli_image(self, stimuli_id):
"""Get the stimuli image for a given identifier."""
if stimuli_id not in self._stimuli_cache:
if self.stimuli_path is None:
warnings.warn("No stimuli path set")
return None
try:
path = f"{self.stimuli_path}/{stimuli_id}"
# Try to verify the image file is valid
with Image.open(path) as img:
try:
img.verify() # Verify it's a valid image
except Exception as e:
warnings.warn(f"Invalid image file {stimuli_id}: {str(e)}")
return None
# If verification passed, load and resize the image
img = Image.open(path)
if img.size != self.screen_dims:
print('Original size:', img.size, 'Resized size:', self.screen_dims)
img = img.resize(self.screen_dims)
self._stimuli_cache[stimuli_id] = img
except (OSError, IOError) as e:
warnings.warn(f"Failed to load stimuli {stimuli_id}: {str(e)}")
return None
except Exception as e:
warnings.warn(f"Unexpected error loading stimuli {stimuli_id}: {str(e)}")
return None
return self._stimuli_cache.get(stimuli_id)
def _format_trial_id(self, trial_id):
"""
Format trial identifier for consistent handling.
Parameters
----------
trial_id : str, int, tuple, list, or dict
Trial identifier. If using composite identifiers, can be:
- tuple/list of values in order of trial_id columns
- dict mapping column names to values
- string representation of a tuple (will be evaluated)
Returns
-------
tuple
Values corresponding to trial_id columns
"""
if isinstance(trial_id, str):
# Convert string representation back to tuple
return eval(trial_id)
trial_cols = self.col_mapping['trial_id']
if not isinstance(trial_cols, (list, tuple)):
trial_cols = [trial_cols]
if isinstance(trial_id, dict):
# Convert dict to tuple in correct order
return tuple(trial_id[col] for col in trial_cols)
elif isinstance(trial_id, (list, tuple)):
if len(trial_id) != len(trial_cols):
raise ValueError(f"Trial ID should have {len(trial_cols)} values")
return tuple(trial_id)
else:
# Single value
if len(trial_cols) > 1:
raise ValueError(f"Trial ID should have {len(trial_cols)} values")
return (trial_id,)
def _create_layout(self):
"""Create the Dash app layout."""
# Calculate aspect ratio and height based on screen dimensions
aspect_ratio = self.screen_dims[1] / self.screen_dims[0]
plot_height = 600 # Base height in pixels
return dbc.Container([
dbc.Row([
dbc.Col([
html.H1("Fixation Viewer", className="text-center mb-4")
])
]),
# Control Panel
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardHeader("Control Panel", className="p-1 small"),
dbc.CardBody([
# Single row for all controls
dbc.Row([
# Trial Selection
dbc.Col([
html.Label("Trial:", className="mb-0 small"),
dcc.Dropdown(
id='trial-selector',
options=self._get_trial_options(),
value=self._get_default_trial(),
clearable=False,
className="small"
)
], width=5),
# Visualization Type Selection
dbc.Col([
html.Label("View:", className="mb-0 small"),
dcc.Dropdown(
id='viz-selector',
options=[
{'label': 'Scanpath', 'value': 'scanpath'},
{'label': 'Heatmap', 'value': 'heatmap'},
{'label': 'AOI', 'value': 'aoi'}
],
value='scanpath',
clearable=False,
className="small"
)
], width=3),
# Display Options
dbc.Col([
html.Label("Show:", className="mb-0 small"),
dcc.Checklist(
id='display-options',
options=[
{'label': ' BG', 'value': 'background'},
{'label': ' AOI', 'value': 'aois'},
{'label': ' Label', 'value': 'labels'}
],
value=['background', 'aois', 'labels'],
className="d-flex gap-2 small",
inputClassName="me-1"
)
], width=2),
# Export Button
dbc.Col([
dbc.Button(
"Export Trial Data",
id="export-button",
color="primary",
size="sm",
className="mt-3 py-0 px-2"
),
dcc.Download(id="download-data")
], width=2, className="text-end")
])
], className="p-2")
], className="mb-2")
], width=12)
]),
# Main Visualization Area
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardBody([
# Scanpath Plot
dcc.Graph(
id='scanpath-plot',
config={'displayModeBar': True},
style={'display': 'block', 'height': f'{plot_height}px'}
),
# Heatmap Plot
dcc.Graph(
id='heatmap-plot',
config={'displayModeBar': True},
style={'display': 'none', 'height': f'{plot_height}px'}
),
# AOI Plot
dcc.Graph(
id='aoi-plot',
config={'displayModeBar': True},
style={'display': 'none', 'height': f'{plot_height}px'}
)
], className="p-0") # Remove padding from card body
])
], width=8),
# Statistics Panel
dbc.Col([
dbc.Card([
dbc.CardHeader("Statistics"),
dbc.CardBody([
dash_table.DataTable(
id='stats-table',
columns=[],
data=[],
style_table={'overflowX': 'auto', 'height': f'{plot_height}px'},
style_cell={
'textAlign': 'left',
'padding': '5px',
'whiteSpace': 'normal',
'height': 'auto',
'fontSize': '12px',
'fontFamily': 'Arial'
},
style_header={
'backgroundColor': 'rgb(230, 230, 230)',
'fontWeight': 'bold',
'fontSize': '13px',
'fontFamily': 'Arial'
},
style_data={
'whiteSpace': 'normal',
'height': 'auto'
}
)
], className="p-0") # Remove padding from card body
])
], width=4)
], className="g-0") # Remove gutters between columns
], fluid=True)
def _setup_callbacks(self):
"""Set up all callbacks."""
self._setup_plot_callbacks()
self._setup_export_callbacks()
self._setup_display_callbacks()
def _setup_plot_callbacks(self):
"""Set up callbacks for plot updates."""
# Update scanpath plot
@self.app.callback(
Output('scanpath-plot', 'figure'),
[Input('trial-selector', 'value'),
Input('display-options', 'value')]
)
def update_scanpath(trial_id, display_options):
return self._create_scanpath_figure(trial_id, display_options)
# Update heatmap plot
@self.app.callback(
Output('heatmap-plot', 'figure'),
[Input('trial-selector', 'value'),
Input('display-options', 'value')]
)
def update_heatmap(trial_id, display_options):
return self._create_heatmap_figure(trial_id, display_options)
# Update AOI plot
@self.app.callback(
Output('aoi-plot', 'figure'),
[Input('trial-selector', 'value'),
Input('display-options', 'value')]
)
def update_aoi_plot(trial_id, display_options):
return self._create_aoi_figure(trial_id, display_options)
# Update statistics table
@self.app.callback(
[Output('stats-table', 'data'),
Output('stats-table', 'columns')],
[Input('trial-selector', 'value')]
)
def update_statistics(trial_id):
stats = self._create_statistics_data(trial_id)
if not stats:
return [], []
columns = [{'name': col, 'id': col} for col in ['Metric', 'Value']]
return stats, columns
def _setup_export_callbacks(self):
"""Set up callbacks for data export."""
@self.app.callback(
Output('download-data', 'data'),
[Input('export-button', 'n_clicks')],
[State('trial-selector', 'value')]
)
def export_data(n_clicks, trial_id):
if n_clicks is None:
return None
try:
# Convert string representation of trial ID back to tuple
trial_values = self._format_trial_id(trial_id)
# Get trial data
trial_data = self._get_trial_data(trial_values)
if len(trial_data) == 0:
print(f"No data found for trial {trial_id}")
return None
# Format filename using trial label
filename = f"eye_movement_data_{self._format_trial_label(trial_values).replace(' | ', '_')}.csv"
return dcc.send_data_frame(trial_data.to_csv, filename, index=False)
except Exception as e:
print(f"Error exporting data: {str(e)}")
return None
def _setup_display_callbacks(self):
"""Set up callbacks for display control."""
@self.app.callback(
[Output('scanpath-plot', 'style'),
Output('heatmap-plot', 'style'),
Output('aoi-plot', 'style')],
[Input('viz-selector', 'value')]
)
def update_visible_plot(selected_viz):
# Create style dicts for each plot
styles = []
for viz_type in ['scanpath', 'heatmap', 'aoi']:
if viz_type == selected_viz:
styles.append({'display': 'block'})
else:
styles.append({'display': 'none'})
return styles
def _get_trial_options(self):
"""Get options for trial selector dropdown."""
if self.data is None:
return []
trial_ids = self._get_unique_trials()
return [{'label': self._format_trial_label(t), 'value': str(t)} for t in trial_ids]
def _get_default_trial(self):
"""Get the default trial ID for initial display."""
if self.data is None:
return None
trial_ids = self._get_unique_trials()
if trial_ids:
return str(trial_ids[0])
return None
def _create_marker_dict(self, durations=None, duration_index=None):
"""Create a marker dictionary for scanpath visualization.
Parameters
----------
durations : array-like, optional
Array of fixation durations
duration_index : int, optional
If provided, only use durations up to this index
"""
marker_dict = dict(size=self.dot_size)
if durations is not None:
if duration_index is not None:
dur_values = durations[:duration_index + 1]
else:
dur_values = [durations[0]] # For initial state
marker_dict.update(dict(
color=dur_values,
colorscale='Viridis',
showscale=True,
cmin=durations.min(),
cmax=durations.max()
))
else:
marker_dict.update(dict(color='blue'))
return marker_dict
def _create_scatter_trace(self, x, y, marker_dict, display_options, index=None):
"""Create a scatter trace for scanpath visualization.
Parameters
----------
x, y : array-like
Coordinates for the scatter points
marker_dict : dict
Marker styling dictionary
display_options : list
Display options for the visualization
index : int, optional
If provided, create trace for frame at this index
"""
is_initial = index is None
x_vals = [x[0]] if is_initial else x[:index + 1]
y_vals = [y[0]] if is_initial else y[:index + 1]
text = ['1'] if is_initial else [str(j+1) for j in range(index + 1)]
text = text if 'labels' in display_options else None
return go.Scatter(
x=x_vals,
y=y_vals,
mode='markers+text' if 'labels' in display_options else 'markers',
marker=marker_dict,
text=text,
textposition="top center",
name='Fixations',
hovertemplate='<b>Fixation %{text}</b><br>' +
'X: %{x}<br>' +
'Y: %{y}<br>' +
('Duration: %{marker.color:.0f} ms<br>' if 'color' in marker_dict else '') +
'<extra></extra>'
)
def _create_scanpath_figure(self, trial_id, display_options):
"""Create an interactive scanpath visualization.
Parameters
----------
trial_id : str or tuple
Identifier for the trial to visualize
display_options : list
List of display options to enable:
- background : bool
Show stimulus image
- aois : bool
Show Areas of Interest
- labels : bool
Show fixation number labels
Returns
-------
plotly.graph_objects.Figure
Interactive figure with scanpath visualization including:
- Animated fixation sequence
- Duration-based color coding
- Background image (if enabled)
- Playback controls
- Hover information
Notes
-----
- Creates animation frames for sequential display
- Includes play/pause controls and frame slider
- Supports duration-based color coding of fixations
- Maintains aspect ratio and proper axis scaling
"""
if trial_id is None:
return go.Figure()
# Get trial data
trial_values = self._format_trial_id(trial_id)
trial_data = self._get_trial_data(trial_values)
if len(trial_data) == 0:
return go.Figure()
# Extract coordinates and durations
x = trial_data[self.col_mapping['x']].values
y = trial_data[self.col_mapping['y']].values
durations = trial_data[self.col_mapping['duration']].values if self.col_mapping['duration'] is not None else None
# Create figure
fig = go.Figure()
# Add background image if requested
if 'background' in display_options and self.col_mapping['stimuli'] in trial_data.columns:
stim_id = trial_data[self.col_mapping['stimuli']].iloc[0]
try:
background_img = self._get_stimuli_image(stim_id)
if background_img is not None:
fig.add_layout_image(
dict(
source=background_img,
xref="x",
yref="y",
x=0,
y=0,
sizex=self.screen_dims[0],
sizey=self.screen_dims[1],
sizing="stretch",
opacity=0.4,
layer="above"
)
)
except:
warnings.warn(f"Could not load stimuli for trial {self._format_trial_label(trial_values)}")
# Create animation frames
frames = []
# Add initial state
initial_marker_dict = self._create_marker_dict(durations)
fig.add_trace(self._create_scatter_trace(x, y, initial_marker_dict, display_options))
# Create frames
for i in range(len(x)):
marker_dict = self._create_marker_dict(durations, i)
frames.append(go.Frame(
data=[self._create_scatter_trace(x, y, marker_dict, display_options, i)],
name=f'frame{i+1}'
))
# Update layout with screen dimensions and animation controls
fig = self._update_figure_layout(fig)
# Add animation controls
animation_controls = {
'updatemenus': [{
'buttons': [
dict(
args=[None, {'frame': {'duration': self.animation_speed, 'redraw': True},
'mode': 'immediate',
'transition': {'duration': 0}}],
label='▶️ Play',
method='animate'
),
dict(
args=[[None], {'frame': {'duration': 0, 'redraw': False},
'mode': 'immediate',
'transition': {'duration': 0}}],
label='⏸️ Pause',
method='animate'
)
],
'type': 'buttons',
'showactive': True,
'x': 0,
'y': -0.1,
'xanchor': 'right',
'yanchor': 'top'
}],
'sliders': [{
'currentvalue': {'visible': True},
'steps': [
dict(
args=[[f'frame{k+1}'], {'frame': {'duration': self.animation_speed, 'redraw': True},
'mode': 'immediate',
'transition': {'duration': 0}}],
label=str(k+1),
method='animate'
) for k in range(len(frames))
],
'x': 0.5,
'y': -0.1,
'xanchor': 'center',
'yanchor': 'top',
'len': 0.9,
'pad': {'t': 0}
}]
}
fig.update_layout(**animation_controls)
# Add frames to figure
fig.frames = frames
return fig
def _create_heatmap_figure(self, trial_id, display_options):
"""Create a heatmap visualization of fixation density.
Parameters
----------
trial_id : str or tuple
Identifier for the trial to visualize
display_options : list
List of display options to enable:
- background : bool
Show stimulus image
Returns
-------
plotly.graph_objects.Figure
Interactive figure with heatmap visualization including:
- Fixation density heatmap
- Background image (if enabled)
- Color scale
- Hover information
Notes
-----
- Uses Gaussian kernel density estimation
- Supports adjustable background image opacity
- Maintains aspect ratio and proper axis scaling
- Includes interactive hover information
"""
if trial_id is None:
return go.Figure()
# Get trial data
trial_values = self._format_trial_id(trial_id)
trial_data = self._get_trial_data(trial_values)
if len(trial_data) == 0:
return go.Figure()
# Extract coordinates and durations
x = trial_data[self.col_mapping['x']].values
y = trial_data[self.col_mapping['y']].values
durations = trial_data[self.col_mapping['duration']].values if self.col_mapping['duration'] is not None else None
# Get background image if requested
background_img = None
if 'background' in display_options and self.col_mapping['stimuli'] in trial_data.columns:
stim_id = trial_data[self.col_mapping['stimuli']].iloc[0]
try:
background_img = self._get_stimuli_image(stim_id)
except:
warnings.warn(f"Could not load stimuli for trial {self._format_trial_label(trial_values)}")
# Generate heatmap data using the plotting utility
heatmap, _ = draw_heatmap(x, y, screen_dims=self.screen_dims, durations=durations, background_img=background_img, return_data=True)
# Create Plotly figure
fig = go.Figure()
# Add heatmap
fig.add_trace(
go.Heatmap(
z=heatmap,
colorscale='Viridis',
showscale=True,
hoverongaps=False,
hovertemplate='X: %{x:.0f}<br>' +
'Y: %{y:.0f}<br>' +
'Density: %{z:.3f}<br>' +
'<extra></extra>'
)
)
# Add background image if available
if background_img is not None:
fig.add_layout_image(
dict(
source=background_img,
xref="x",
yref="y",
x=0,
y=0,
sizex=self.screen_dims[0],
sizey=self.screen_dims[1],
sizing="stretch",
opacity=0.4,
layer="above"
)
)
# Update layout
fig = self._update_figure_layout(fig)
return fig
def _create_aoi_figure(self, trial_id, display_options):
"""Create an AOI visualization with fixation overlay.
Parameters
----------
trial_id : str or tuple
Identifier for the trial to visualize
display_options : list
List of display options to enable:
- background : bool
Show stimulus image
- aois : bool
Show Areas of Interest
- labels : bool
Show fixation number labels
Returns
-------
plotly.graph_objects.Figure
Interactive figure with AOI visualization including:
- AOI polygons with labels
- Fixations colored by AOI
- Background image (if enabled)
- Hover information
Notes
-----
- Groups fixations by AOI membership
- Uses distinct colors for different AOIs
- Supports interactive hover information
- Maintains aspect ratio and proper axis scaling
"""
if trial_id is None:
return go.Figure()
# Get trial data
trial_values = self._format_trial_id(trial_id)
trial_data = self._get_trial_data(trial_values)
if len(trial_data) == 0:
return go.Figure()
# Extract coordinates and durations
x = trial_data[self.col_mapping['x']].values
y = trial_data[self.col_mapping['y']].values
durations = trial_data[self.col_mapping['duration']].values if self.col_mapping['duration'] is not None else None
# Get stimulus ID for AOI lookup
stim_id = None
if self.col_mapping['stimuli'] in trial_data.columns:
stim_id = trial_data[self.col_mapping['stimuli']].iloc[0]
# Create figure
fig = go.Figure()
# Add background image if requested
if 'background' in display_options and stim_id is not None:
try:
background_img = self._get_stimuli_image(stim_id)
if background_img is not None:
fig.add_layout_image(
dict(
source=background_img,
xref="x",
yref="y",
x=0,
y=0,
sizex=self.screen_dims[0],
sizey=self.screen_dims[1],
sizing="stretch",
opacity=0.4,
layer="above"
)
)
except:
warnings.warn(f"Could not load stimuli for trial {self._format_trial_label(trial_values)}")
# Get stimulus-specific AOIs
stimulus_aois = self._get_stimulus_aois(stim_id) if stim_id is not None else None
# Add AOIs if defined and requested
if stimulus_aois and 'aois' in display_options:
# Use a colormap for AOIs
aoi_colors = plt.cm.Set3(np.linspace(0, 1, len(stimulus_aois)))
for (aoi_name, vertices), color in zip(stimulus_aois.items(), aoi_colors):
vertices_array = np.array(vertices)
fig.add_trace(
go.Scatter(
x=vertices_array[:, 0],
y=vertices_array[:, 1],
fill="toself",
fillcolor=f'rgba({int(color[0]*255)},{int(color[1]*255)},{int(color[2]*255)},0.8)',
line=dict(
color=f'rgba({int(color[0]*255)},{int(color[1]*255)},{int(color[2]*255)},1)',
width=2
),
name=aoi_name,
hoverinfo='name'
)
)
# Group fixations by AOI if AOIs are defined
if stimulus_aois:
# Initialize fixation groups
aoi_fixations = {'outside': {'x': [], 'y': [], 'dur': [], 'text': []}}
for aoi_name in stimulus_aois.keys():
aoi_fixations[aoi_name] = {'x': [], 'y': [], 'dur': [], 'text': []}
# Group fixations
for i, (xi, yi) in enumerate(zip(x, y)):
dur = durations[i] if durations is not None else None
aoi = get_fixation_aoi(xi, yi, stimulus_aois)
target_dict = aoi_fixations[aoi if aoi else 'outside']
target_dict['x'].append(xi)
target_dict['y'].append(yi)
if dur is not None:
target_dict['dur'].append(dur)
target_dict['text'].append(str(i+1))
# Plot fixations for each AOI
for aoi_name, fixations in aoi_fixations.items():
if len(fixations['x']) > 0:
fig.add_trace(
go.Scatter(
x=fixations['x'],
y=fixations['y'],
mode='markers+text' if 'labels' in display_options else 'markers',
marker=dict(
size=self.dot_size,
color='grey' if aoi_name == 'outside' else None
),
text=fixations['text'],
textposition="top center",
name=f'Fixations ({aoi_name})',
hovertemplate='<b>Fixation %{text}</b><br>' +
'X: %{x}<br>' +
'Y: %{y}<br>' +
f'AOI: {aoi_name}<br>' +
'<extra></extra>'
)
)
else:
# Plot all fixations without AOI grouping
fig.add_trace(
go.Scatter(
x=x,
y=y,
mode='markers+text' if 'labels' in display_options else 'markers',
marker=dict(
size=self.dot_size,
color='blue'
),
text=[str(i+1) for i in range(len(x))],
textposition="top center",
name='Fixations',
hovertemplate='<b>Fixation %{text}</b><br>' +
'X: %{x}<br>' +
'Y: %{y}<br>' +
'<extra></extra>'
)
)
# Update layout
fig = self._update_figure_layout(fig)
return fig
def _create_statistics_data(self, trial_id):
"""Generate statistics for the current trial and AOIs.
Parameters
----------
trial_id : str or tuple
Identifier for the trial to analyze
Returns
-------
list of dict
List of dictionaries containing statistics:
- Basic metrics (number of fixations)
- Duration statistics (if available)
- AOI-specific metrics (if AOIs defined)
Each dictionary has 'Metric' and 'Value' keys.
Notes
-----
- Calculates basic fixation statistics
- Includes duration-based metrics if available
- Computes AOI-specific statistics if AOIs are defined
- Handles missing data and edge cases
"""
if trial_id is None:
return []
# Get trial data
trial_values = self._format_trial_id(trial_id)
trial_data = self._get_trial_data(trial_values)
if len(trial_data) == 0:
return []
# Extract coordinates
x = trial_data[self.col_mapping['x']].values
y = trial_data[self.col_mapping['y']].values
has_duration = self.col_mapping['duration'] is not None
# Get stimulus ID for AOI lookup
stim_id = None
if self.col_mapping['stimuli'] in trial_data.columns:
stim_id = trial_data[self.col_mapping['stimuli']].iloc[0]
# Initialize statistics
stats = []
# Basic statistics
stats.append({
'Metric': 'Number of Fixations',
'Value': len(x)
})
# Duration-related statistics
if has_duration:
durations = trial_data[self.col_mapping['duration']].values
stats.extend([
{
'Metric': 'Mean Fixation Duration',
'Value': f'{np.mean(durations):.2f}'
},
{
'Metric': 'Sum Fixation Duration',
'Value': f'{np.sum(durations):.2f}'
},
{
'Metric': 'Min Fixation Duration',
'Value': f'{np.min(durations):.2f}'
},
{
'Metric': 'Max Fixation Duration (ms)',
'Value': f'{np.max(durations):.2f}'
}
])
# Get stimulus-specific AOIs
stimulus_aois = self._get_stimulus_aois(stim_id) if stim_id is not None else None
# AOI statistics
if stimulus_aois:
aoi_stats = compute_aoi_statistics(x, y, stimulus_aois, durations if has_duration else None)
for aoi_name, aoi_data in aoi_stats.items():
# Add count statistics
stats.append({
'Metric': f'{aoi_name} - Number of Fixations',
'Value': aoi_data['count']
})
# Add duration-related statistics if available
if has_duration and aoi_data['count'] > 0:
# Total duration
if 'total_duration' in aoi_data:
stats.append({
'Metric': f'{aoi_name} - Sum Fixation Duration',
'Value': f'{aoi_data["total_duration"]:.2f}'
})
elif has_duration:
# Add N/A for duration stats when count is 0
stats.extend([
{
'Metric': f'{aoi_name} - Sum Fixation Duration',
'Value': 'N/A'
}
])
return stats
def _update_figure_layout(self, fig):
"""Update the layout of a figure to match screen dimensions."""
fig.update_layout(
autosize=False,
margin=dict(l=20, r=20, t=40, b=20),
xaxis=dict(
range=[0, self.screen_dims[0]],
showgrid=False,
zeroline=False,
constrain="domain"
),
yaxis=dict(
range=[self.screen_dims[1], 0], # Invert y-axis
showgrid=False,
zeroline=False,
scaleanchor="x",
scaleratio=1, # Maintain aspect ratio
constrain="domain"
)
)
return fig
[docs]
def run(self, debug=False, port=8050, **kwargs):
"""Start the Dash server and run the fixation viewer application.
This method initializes and starts the web server for the fixation viewer application.
The application will be accessible through a web browser at the specified port.
Parameters
----------
debug : bool, default=False
Whether to run the server in debug mode
port : int, default=8050
Port to run the server on
**kwargs : dict
Additional arguments to pass to dash.run_server()
See Dash documentation for available options.
Notes
-----
- The application will run until interrupted (Ctrl+C)
- Access the interface at http://localhost:<port>
- Debug mode provides additional error information
- Default port (8050) can be changed if already in use
"""
self.app.run(debug=debug, port=port, **kwargs)