Source code for ethoscope.stimulators.sleep_depriver_stimulators

'''
any new class added here need to be added to web_utils/control_thread.py too
'''

__author__ = 'quentin'


from ethoscope.stimulators.stimulators import BaseStimulator, HasInteractedVariable

from ethoscope.hardware.interfaces.interfaces import  DefaultInterface
from ethoscope.hardware.interfaces.sleep_depriver_interface import SleepDepriverInterface, SleepDepriverInterfaceCR
from ethoscope.hardware.interfaces.optomotor import OptoMotor


import random


[docs]class IsMovingStimulator(BaseStimulator): _HardwareInterfaceClass = DefaultInterface def __init__(self, hardware_connection=None, velocity_threshold=0.0060, date_range = "", **kwargs): """ class implementing an stimulator that decides whether an animal has moved though does nothing accordingly. :param hardware_connection: a default hardware interface object :param velocity_threshold: Up to which velocity an animal is considered to be immobile :type velocity_threshold: float """ self._velocity_threshold = velocity_threshold self._last_active = 0 super(IsMovingStimulator, self).__init__(hardware_connection, date_range) def _has_moved(self): positions = self._tracker.positions if len(positions ) <2 : return False if len(positions[-1]) != 1: raise Exception("This stimulator can only work with a single animal per ROI") tail_m = positions[-1][0] times = self._tracker.times last_time_for_position = times[-1] last_time = self._tracker.last_time_point # we assume no movement if the animal was not spotted if last_time != last_time_for_position: return False dt_s = abs(times[-1] - times[-2]) / 1000.0 dist = 10.0 ** (tail_m["xy_dist_log10x1000"]/1000.0) velocity = dist / dt_s if velocity > self._velocity_threshold: return True return False def _decide(self): has_moved = self._has_moved() t = self._tracker.times if has_moved:# or xor_diff > self._xor_speed_threshold : self._last_active = t[-1] return HasInteractedVariable(False), {} return HasInteractedVariable(True), {}
[docs]class SleepDepStimulator(IsMovingStimulator): _description = {"overview": "A stimulator to sleep deprive an animal using servo motor. See http://todo/fixme.html", "arguments": [ {"type": "number", "min": 0.0, "max": 1.0, "step":0.0001, "name": "velocity_threshold", "description": "The minimal velocity that counts as movement","default":0.0060}, {"type": "number", "min": 1, "max": 3600*12, "step":1, "name": "min_inactive_time", "description": "The minimal time after which an inactive animal is awaken","default":120}, {"type": "date_range", "name": "date_range", "description": "A date and time range in which the device will perform (see http://tinyurl.com/jv7k826)", "default": ""}, ]} _HardwareInterfaceClass = SleepDepriverInterface _roi_to_channel = { 1:1, 3:2, 5:3, 7:4, 9:5, 12:6, 14:7, 16:8, 18:9, 20:10 } def __init__(self, hardware_connection, velocity_threshold=0.0060, min_inactive_time=120, #s date_range="" ): """ A stimulator to control a sleep depriver module. :param hardware_connection: the sleep depriver module hardware interface :type hardware_connection: :class:`~ethoscope.hardware.interfaces.sleep_depriver_interface.SleepDepriverInterface` :param velocity_threshold: :type velocity_threshold: float :param min_inactive_time: the minimal time without motion after which an animal should be disturbed (in seconds) :type min_inactive_time: float :return: """ self._inactivity_time_threshold_ms = min_inactive_time *1000 #so we use ms internally self._t0 = None super(SleepDepStimulator, self).__init__(hardware_connection, velocity_threshold, date_range=date_range) def _decide(self): roi_id= self._tracker._roi.idx now = self._tracker.last_time_point try: channel = self._roi_to_channel[roi_id] except KeyError: return HasInteractedVariable(False), {} has_moved = self._has_moved() if self._t0 is None: self._t0 = now if not has_moved: if float(now - self._t0) > self._inactivity_time_threshold_ms: self._t0 = None return HasInteractedVariable(True), {"channel":channel} else: self._t0 = now return HasInteractedVariable(False), {}
[docs]class SleepDepStimulatorCR(SleepDepStimulator): _description = {"overview": "A stimulator to sleep deprive an animal using servo motor in Continous Rotation mode. See http://todo/fixme.html", "arguments": [ {"type": "number", "min": 0.0, "max": 1.0, "step":0.0001, "name": "velocity_threshold", "description": "The minimal velocity that counts as movement","default":0.0060}, {"type": "number", "min": 1, "max": 3600*12, "step":1, "name": "min_inactive_time", "description": "The minimal time after which an inactive animal is awaken","default":120}, {"type": "date_range", "name": "date_range", "description": "A date and time range in which the device will perform (see http://tinyurl.com/jv7k826)", "default": ""} ]} _HardwareInterfaceClass = SleepDepriverInterfaceCR _roi_to_channel = { 1:1, 3:2, 5:3, 7:4, 9:5, 12:6, 14:7, 16:8, 18:9, 20:10 } def __init__(self, hardware_connection, velocity_threshold=0.0060, min_inactive_time=120, #s date_range="" ): """ A stimulator to control a sleep depriver module. :param hardware_connection: the sleep depriver module hardware interface :type hardware_connection: :class:`~ethoscope.hardware.interfaces.sleep_depriver_interface.SleepDepriverInterface` :param velocity_threshold: :type velocity_threshold: float :param min_inactive_time: the minimal time without motion after which an animal should be disturbed (in seconds) :type min_inactive_time: float :return: """ self._inactivity_time_threshold_ms = min_inactive_time *1000 #so we use ms internally self._t0 = None super(SleepDepStimulator, self).__init__(hardware_connection, velocity_threshold, date_range=date_range)
[docs]class OptomotorSleepDepriver(SleepDepStimulator): _description = {"overview": "A stimulator to sleep deprive an animal using gear motors. See https://github.com/gilestrolab/ethoscope_hardware/tree/master/modules/gear_motor_sleep_depriver", "arguments": [ {"type": "number", "min": 0.0, "max": 1.0, "step":0.0001, "name": "velocity_threshold", "description": "The minimal velocity that counts as movement","default":0.0060}, {"type": "number", "min": 1, "max": 3600*12, "step":1, "name": "min_inactive_time", "description": "The minimal time after which an inactive animal is awaken(s)","default":120}, {"type": "number", "min": 500, "max": 10000 , "step": 50, "name": "pulse_duration", "description": "For how long to deliver the stimulus(ms)", "default": 1000}, {"type": "number", "min": 0, "max": 3, "step": 1, "name": "stimulus_type", "description": "1 = opto, 2= moto", "default": 2}, {"type": "date_range", "name": "date_range", "description": "A date and time range in which the device will perform (see http://tinyurl.com/jv7k826)", "default": ""} ]} _HardwareInterfaceClass = OptoMotor _roi_to_channel_opto = { 1: 1, 3: 3, 5: 5, 7: 7, 9: 9, 12: 23, 14: 21, 16: 19, 18: 17, 20: 15 } _roi_to_channel_moto = { 1: 0, 3: 2, 5: 4, 7: 6, 9: 8, 12: 22, 14: 20, 16: 18, 18: 16, 20: 14 } def __init__(self, hardware_connection, velocity_threshold=0.0060, min_inactive_time=120, # s pulse_duration = 1000, #ms stimulus_type = 2, # 1 = opto, 2= moto, 3 = both date_range="" ): self._t0 = None # the inactive time depends on the chanel here super(OptomotorSleepDepriver, self).__init__(hardware_connection, velocity_threshold, min_inactive_time, date_range) if stimulus_type == 2: self._roi_to_channel = self._roi_to_channel_moto elif stimulus_type == 1: self._roi_to_channel = self._roi_to_channel_opto self._pulse_duration= pulse_duration def _decide(self): out, dic = super(OptomotorSleepDepriver, self)._decide() dic["duration"] = self._pulse_duration return out,dic
[docs]class ExperimentalSleepDepStimulator(SleepDepStimulator): _description = {"overview": "A stimulator to sleep deprive an animal using servo motor. See http://todo/fixme.html", "arguments": [ {"type": "number", "min": 0.0, "max": 1.0, "step":0.0001, "name": "velocity_threshold", "description": "The minimal velocity that counts as movement","default":0.0060}, {"type": "date_range", "name": "date_range", "description": "A date and time range in which the device will perform (see http://tinyurl.com/jv7k826)", "default": ""} ]} _HardwareInterfaceClass = SleepDepriverInterface _roi_to_channel = { 1:1, 3:2, 5:3, 7:4, 9:5, 12:6, 14:7, 16:8, 18:9, 20:10 } def __init__(self, hardware_connection, velocity_threshold=0.0060, date_range="" ): """ A stimulator to control a sleep depriver module. This is an experimental version where each channel has a different inactivity_time_threshold. :param hardware_connection: the sleep depriver module hardware interface :type hardware_connection: :class:`~ethoscope.hardawre.interfaces.sleep_depriver_interface.SleepDepriverInterface` :param velocity_threshold: :type velocity_threshold: float :return: """ self._t0 = None # the inactive time depends on the chanel here super(ExperimentalSleepDepStimulator, self).__init__(hardware_connection, velocity_threshold, 0, date_range) self._inactivity_time_threshold_ms = None # here we override bind tracker so that we also define inactive time for this stimulator
[docs] def bind_tracker(self, tracker): self._tracker = tracker roi_id = self._tracker._roi.idx try: channel = self._roi_to_channel[roi_id] self._inactivity_time_threshold_ms = round(channel ** 1.7) * 20 * 1000 except KeyError: pass
[docs]class MiddleCrossingStimulator(BaseStimulator): _description = {"overview": "A stimulator to disturb animal as they cross the midline", "arguments": [ {"type": "number", "min": 0.0, "max": 1.0, "step":0.01, "name": "p", "description": "the probability to move the tube when a beam cross was detected","default":1.0}, {"type": "date_range", "name": "date_range", "description": "A date and time range in which the device will perform (see http://tinyurl.com/jv7k826)", "default": ""} ]} _HardwareInterfaceClass = SleepDepriverInterface _refractory_period = 60#s _roi_to_channel = { 1:1, 3:2, 5:3, 7:4, 9:5, 12:6, 14:7, 16:8, 18:9, 20:10 } def __init__(self, hardware_connection, p=1.0, date_range="" ): """ :param hardware_connection: the sleep depriver module hardware interface :type hardware_connection: :class:`~ethoscope.hardawre.interfaces.sleep_depriver_interface.SleepDepriverInterface` :param p: the probability of disturbing the animal when a beam cross happens :type p: float :return: """ self._last_stimulus_time = 0 self._p = p super(MiddleCrossingStimulator, self).__init__(hardware_connection, date_range=date_range) def _decide(self): roi_id = self._tracker._roi.idx now = self._tracker.last_time_point if now - self._last_stimulus_time < self._refractory_period * 1000: return HasInteractedVariable(False), {} try: channel = self._roi_to_channel[roi_id] except KeyError: return HasInteractedVariable(False), {} positions = self._tracker.positions if len(positions) < 2: return HasInteractedVariable(False), {} if len(positions[-1]) != 1: raise Exception("This stimulator can only work with a single animal per ROI") roi_w = float(self._tracker._roi.longest_axis) x_t_zero = positions[-1][0]["x"] / roi_w - 0.5 x_t_minus_one = positions[-2][0]["x"] / roi_w - 0.5 # if roi_id == 12: # print (roi_id, channel, roi_w, positions[-1][0]["x"], positions[-2][0]["x"], x_t_zero, x_t_minus_one) if (x_t_zero > 0) ^ (x_t_minus_one >0): # this is a change of sign if random.uniform(0,1) < self._p: self._last_stimulus_time = now return HasInteractedVariable(True), {"channel": channel} return HasInteractedVariable(False), {"channel": channel}