# Source code for FlowCytometryTools.core.gates

"""
@author: Eugene Yurtsev
@date 2013-07-25

Defines gates that operate on DataFrames.

API:
    DataFrame -- pandas object
    gate1 -- a polygon gate.

Gates:
    ThresholdGate
    IntervalGate
    QuadGate
    PolyGate
"""
import numpy
import pylab as pl
from matplotlib.path import Path

from FlowCytometryTools.core.common_doc import doc_replacer
from FlowCytometryTools.core.utils import to_list

# Shared docstring fragments. Each entry registered here is referenced as a
# ``{placeholder}`` inside the docstrings of functions decorated with
# ``@doc_replacer`` further down in this module.

# Description of the optional ``name`` parameter common to all gates.
doc_replacer.update(_gate_pars_name="""\
name : str
    The gate's name.""")

# Description of the ``channel`` parameter for gates defined on one channel.
doc_replacer.update(_gate_pars_1_channel="""\
channel : str
    Defines the channel name.""")

# Description of the ``channels`` parameter for gates defined on two channels.
doc_replacer.update(_gate_pars_2_channels=
                    """channels : ['channel 1 name', 'channel 2 name']
                        Defines the names of the channels""")

# Body of the docstring shared by every ``plot`` method.
# NOTE: the admonition must be spelled ``.. warning::`` (two colons); with a
# single colon reStructuredText treats the line as a comment and drops it.
doc_replacer.update(_gate_plot_doc=
                    """Plots the gate.

                    .. warning::
                        The plot function does not check that your
                        axis correspond to the correct channels.

                    Parameters
                    ----------
                    ax : axes to use for plotting the gate on
                    flip : boolean
                        If True, draws the interval
                        along the y-axis instead of along the x-axis

                    Returns
                    -------
                    Reference to created artists.""")


class _ComposableMixin(object):
    """Mixin that lets gates be combined with the ``&``, ``^``, ``|`` and ``~`` operators.

    Every operator builds a new ``CompositeGate``; the operands themselves
    are left untouched.
    """

    def __and__(self, other):
        """Return the composite gate ``self & other`` (how='and')."""
        return CompositeGate(gate1=self, how='and', gate2=other)

    def __xor__(self, other):
        """Return the composite gate ``self ^ other`` (how='xor')."""
        return CompositeGate(gate1=self, how='xor', gate2=other)

    def __or__(self, other):
        """Return the composite gate ``self | other`` (how='or')."""
        return CompositeGate(gate1=self, how='or', gate2=other)

    def __invert__(self):
        """Return the composite gate ``~self`` (how='invert')."""
        return CompositeGate(gate1=self, how='invert')


class Gate(_ComposableMixin):
    """Common interface shared by the concrete gate implementations.

    Derived classes must set ``self._region_options`` (the tuple of accepted
    region names) *before* calling ``Gate.__init__``, because the constructor
    assigns ``self.region`` and the setter validates against that tuple.
    They must also implement ``_identify`` and may override ``plot`` and
    ``validate_input``.
    """
    # Running counter used to build default names for gates created without one.
    unnamed_gate_num = 1

    def __init__(self, vert, channels, region, name=None):
        """
        Parameters
        ----------
        vert : object
            Geometry of the gate; its meaning is defined by the derived class.
        channels : str or list of str
            Channel name(s) the gate operates on (normalized with ``to_list``).
        region : str
            Which region passes the gate; must be one of ``_region_options``
            (compared case-insensitively).
        name : str, optional
            The gate's name. If omitted, a unique "Unnamed Gate N" is generated.
        """
        self.vert = vert
        self.channels = to_list(channels)

        # Identity check: ``== None`` would invoke an arbitrary ``__eq__``.
        if name is None:
            self.name = "Unnamed Gate {0}".format(Gate.unnamed_gate_num)
            Gate.unnamed_gate_num += 1
        else:
            self.name = name

        self.region = region
        self.validate_input()

    def validate_input(self):
        """ Optional method to be defined by derived class
        to check whether user input was valid. """
        pass

    def __repr__(self):
        return """Gate Type: {0}
               \tVertices: {1}
               \tChannel(s): {2}
               \tName: {3}
                """.format(self.__class__, self.vert, self.channels, self.name)

    def __str__(self):
        return self.__repr__()

    def __call__(self, dataframe, region=None):
        """
        Filters the dataframe, keeping only events that pass the gate.

        Parameters
        ----------
        dataframe : DataFrame
        region : str, optional
            If specified, the gate region is updated to the given region.
            Note that this change persists on the gate after the call.

        Returns
        -------
        The result of indexing ``dataframe`` with the mask from ``_identify``.

        Raises
        ------
        ValueError
            If one of the gate's channels is not present in ``dataframe``,
            or if ``region`` is not a valid region for this gate.
        """
        if region is not None:
            self.region = region

        for c in self.channels:
            if c not in dataframe:
                raise ValueError(
                    'Trying to filter based on channel {channel}, which is not present in the data.'.format(
                        channel=c))

        idx = self._identify(dataframe)

        return dataframe[idx]

    def _find_orientation(self, ax_channels):
        """Work out whether the gate has to be drawn flipped on the given axes.

        Parameters
        ----------
        ax_channels : str, list of str, or None
            Channel names shown on the axes, x-axis first.

        Returns
        -------
        bool
            False when the gate's first channel is the first axis channel,
            True when it sits at any other position. False is also returned
            when no axis channels are available to compare against.

        Raises
        ------
        Exception
            If the gate's first channel (or the second channel of a
            two-channel gate) is not among ``ax_channels``.
        """
        ax_channels = to_list(ax_channels)
        if ax_channels is None:
            # Nothing to compare against. Previously this path failed with an
            # unbound ``flip`` (or ``c not in None``); default to "not flipped".
            return False

        mismatch = """Trying to plot gate that is defined on channel {0},
                                but figure axis correspond to channels {1}"""

        first = self.channels[0]
        if first not in ax_channels:
            raise Exception(mismatch.format(first, ax_channels))
        flip = ax_channels.index(first) != 0

        # For two-channel gates the second channel must be on the axes as well.
        if len(self.channels) == 2:
            second = self.channels[1]
            if second not in ax_channels:
                raise Exception(mismatch.format(second, ax_channels))
        return flip

    def plot(self, **kwargs):
        """ Plots the gate. Must be specified in derived class. """
        raise NotImplementedError('Plotting is not yet supported for this gate type.')

    def _identify(self, dataframe):
        """ Returns a list of indexes corresponding to events that pass the gate. """
        raise NotImplementedError

    @property
    def region(self):
        """ The region of the gate that passes events. """
        return self._region

    @region.setter
    def region(self, value):
        # Region names are matched case-insensitively and stored lower-cased.
        # A non-string value (e.g. None) has no ``lower`` and is reported with
        # the same ValueError as any other invalid region.
        try:
            normalized = value.lower()
        except AttributeError:
            normalized = None

        if normalized is not None and normalized in self._region_options:
            self._region = normalized
        else:
            raise ValueError(
                "region must be one of the following: {0}".format(self._region_options))


class ThresholdGate(Gate):
    """Single-channel gate that passes events above or below a threshold."""

    @doc_replacer
    def __init__(self, threshold, channel, region, name=None):
        """
        Passes all events above or below a given threshold.

        Parameters
        ----------
        threshold : float
            Location of the gate
        {_gate_pars_1_channel}
        region : ['above', 'below']
            If 'above', the gate only passes through data that lies above the threshold.
        {_gate_pars_name}
        """
        #: Possible regions
        # Must be set before Gate.__init__, which validates ``region`` against it.
        self._region_options = ('above', 'below')
        super(ThresholdGate, self).__init__(threshold, channel, region, name)

    def _identify(self, dataframe):
        """
        Identifies which of the data points in the dataframe pass the gate.

        Returns a boolean mask: ``>= threshold`` for region 'above', and the
        negation of that mask for region 'below'.
        """
        idx = dataframe[self.channels[0]] >= self.vert  # Get indexes that are above threshold

        if self.region == 'below':
            idx = ~idx

        return idx

    @doc_replacer
    def plot(self, flip=False, ax_channels=None, ax=None, *args, **kwargs):
        """
        {_gate_plot_doc}
        """
        if ax is None:
            ax = pl.gca()

        # When the axis channels are known, they decide the orientation and
        # override the ``flip`` argument.
        if ax_channels is not None:
            flip = self._find_orientation(ax_channels)

        # Flipped gates are drawn as a horizontal line, others as a vertical one.
        plot_func = ax.axes.axhline if flip else ax.axes.axvline

        kwargs.setdefault('color', 'black')
        return plot_func(self.vert, *args, **kwargs)
class IntervalGate(Gate):
    """Single-channel gate that passes events inside or outside an interval."""

    @doc_replacer
    def __init__(self, vert, channel, region, name=None):
        """
        Passes all events either inside or outside the given interval.

        Parameters
        ----------
        vert : tuple
            Tuple describes the interval (xmin, xmax)
        {_gate_pars_1_channel}
        region : ['in', 'out']
            If 'in', the gate only passes through data that lies inside the interval.
        {_gate_pars_name}
        """
        # Must be set before Gate.__init__, which validates ``region`` against it.
        self._region_options = ('in', 'out')
        super(IntervalGate, self).__init__(vert, channel, region, name)

    def validate_input(self):
        """Raise ValueError unless the upper bound is strictly above the lower bound."""
        if self.vert[1] <= self.vert[0]:
            raise ValueError(u'{} must be larger than {}'.format(self.vert[1], self.vert[0]))

    def _identify(self, dataframe):
        """
        Return bool series which is True for indexes that 'pass' the gate.

        Both interval bounds are inclusive for region 'in'; region 'out'
        is the negation of that mask.
        """
        idx = ((dataframe[self.channels[0]] <= self.vert[1]) &
               (dataframe[self.channels[0]] >= self.vert[0]))

        if self.region == 'out':
            idx = ~idx

        return idx

    @doc_replacer
    def plot(self, flip=False, ax_channels=None, ax=None, *args, **kwargs):
        """
        {_gate_plot_doc}
        """
        if ax is None:
            ax = pl.gca()

        # When the axis channels are known, they decide the orientation and
        # override the ``flip`` argument.
        if ax_channels is not None:
            flip = self._find_orientation(ax_channels)

        plot_func = ax.axes.axhline if flip else ax.axes.axvline

        kwargs.setdefault('color', 'black')

        # One line per interval bound.
        a1 = plot_func(self.vert[0], *args, **kwargs)
        a2 = plot_func(self.vert[1], *args, **kwargs)

        return (a1, a2)
class QuadGate(Gate):
    """Two-channel gate that passes events lying in one quadrant around a center point."""

    @doc_replacer
    def __init__(self, vert, channels, region, name=None):
        """
        Passes events in a given quadrant of a 2d-plot.

        Parameters
        ----------
        vert : tuple
            A tuple of length 2: (x_center, y_center)
            Specifies the center of the quad gate.
        {_gate_pars_2_channels}
        region : ['top left', 'top right', 'bottom left', 'bottom right']
            For example, if 'top left', the gate only passes through data that lies in the top left region.
        {_gate_pars_name}
        """
        # Must be set before Gate.__init__, which validates ``region`` against it.
        self._region_options = ('top left', 'top right', 'bottom left', 'bottom right')
        super(QuadGate, self).__init__(vert, channels, region, name)

    def _identify(self, dataframe):
        """
        Returns a boolean mask selecting only the points that pass the filter.

        Parameters
        ----------
        dataframe : DataFrame
        """
        # TODO Fix this implementation. (i.e., why not support just 'left')
        # At the moment this implementation won't work at all.
        # The logic here can be simplified.
        # Start from the "top right" quadrant (>= center on both channels) and
        # negate each half-plane that the region name asks for.
        id1 = dataframe[self.channels[0]] >= self.vert[0]
        id2 = dataframe[self.channels[1]] >= self.vert[1]

        if 'left' in self.region:
            id1 = ~id1
        if 'bottom' in self.region:
            id2 = ~id2

        idx = id1 & id2

        # None of the region options declared in __init__ contains 'out', so
        # this branch is not reachable unless _region_options is extended.
        if 'out' in self.region:
            idx = ~idx

        return idx

    @doc_replacer
    def plot(self, flip=False, ax_channels=None, ax=None, *args, **kwargs):
        """
        {_gate_plot_doc}
        """
        if ax is None:
            ax = pl.gca()

        kwargs.setdefault('color', 'black')

        # When the axis channels are known, they decide the orientation and
        # override the ``flip`` argument.
        if ax_channels is not None:
            flip = self._find_orientation(ax_channels)

        # Flipping swaps which center coordinate goes on which axis.
        if not flip:
            a1 = ax.axes.axvline(self.vert[0], *args, **kwargs)
            a2 = ax.axes.axhline(self.vert[1], *args, **kwargs)
        else:
            a1 = ax.axes.axvline(self.vert[1], *args, **kwargs)
            a2 = ax.axes.axhline(self.vert[0], *args, **kwargs)

        return (a1, a2)
class PolyGate(Gate):
    """Two-channel gate that passes events inside or outside a polygon."""

    @doc_replacer
    def __init__(self, vert, channels, region='in', name=None):
        """
        Passes all events that are either inside or outside the polygon.

        Parameters
        ----------
        vert : list of 2-tuples [(x1, y1), (x2, y2), (x3, y3)]
            Each 2-tuple describes the location of a vertex.
        {_gate_pars_2_channels}
        region : ['in', 'out']
            If 'in', the gate only passes through data that lies inside the polygon.
        {_gate_pars_name}
        """
        # Must be set before Gate.__init__, which validates ``region`` against it.
        self._region_options = ('in', 'out')
        super(PolyGate, self).__init__(vert, channels, region, name)

    def _identify(self, dataframe):
        """
        Returns a boolean mask selecting only the points that pass the filter.

        Parameters
        ----------
        dataframe : DataFrame
        """
        path = Path(self.vert)
        # Only the gate's channel columns are handed to the point-in-polygon test.
        idx = path.contains_points(dataframe.filter(self.channels))

        if self.region == 'out':
            idx = ~idx

        return idx

    @doc_replacer
    def plot(self, flip=False, ax_channels=None, ax=None, *args, **kwargs):
        """
        {_gate_plot_doc}
        """
        if ax is None:
            ax = pl.gca()

        # When the axis channels are known, they decide the orientation and
        # override the ``flip`` argument.
        if ax_channels is not None:
            flip = self._find_orientation(ax_channels)

        # Flipping swaps the x and y coordinate of every vertex.
        if flip:
            vert = [v[::-1] for v in self.vert]
        else:
            vert = self.vert

        kwargs.setdefault('fill', False)
        kwargs.setdefault('color', 'black')
        poly = pl.Polygon(vert, *args, **kwargs)
        return ax.add_artist(poly)
class CompositeGate(_ComposableMixin):
    """
    Defines a composite gate that is generated by the logical addition of one or more gates.

    Examples
    --------

    You can create the union of two gates by combining them with how='or'.

    >>> union_gate = CompositeGate(gate1, 'or', gate2)

    However, there is a shorthand to do the same operation

    >>> union_gate = gate1 | gate2

    Similarly, if you wanted the intersection:

    >>> intersection_gate = CompositeGate(gate1, 'and', gate2)

    With the shorthand

    >>> intersection_gate = gate1 & gate2

    As another example, let's invert a gate. The long way:

    >>> inverted_gate = CompositeGate(gate1, 'invert')

    The short way:

    >>> inverted_gate = ~gate1

    .. warning::

        When using the shorthand notation you must use the syntax '~', '&', '|'.
        Do **NOT** use 'and', 'or', 'not'.
    """

    def __init__(self, gate1, how, gate2=None):
        """
        Instead of using this class directly to create composite gates,
        it is recommended you use the shorthand syntax presented directly above.

        Parameters
        ----------
        gate1 : subclass of Gate
        how : ['and' | 'or' | 'invert' | 'xor']
            Describes how to combine the gates
        gate2 : subclass of Gate
            Needs to be specified if how is anything but 'invert'.
        """
        self.gates = [gate1]

        if gate2 is not None:
            self.gates.append(gate2)

        self.how = how

    @property
    def name(self):
        """Name derived from the operand names: '~a' for one gate, 'a <how> b' for two."""
        if len(self.gates) == 1:
            gate = self.gates[0]
            return '~{0}'.format(gate.name)
        else:
            return '{0} {1} {2}'.format(self.gates[0].name, self.how, self.gates[1].name)

    def __str__(self):
        return self.name

    def _identify(self, dataframe):
        """
        Combine the masks of the operand gates according to ``how``.

        Raises
        ------
        ValueError
            If ``how`` is not supported, or if the number of gates does not
            match what the operation needs (one for 'invert', two otherwise).
        """
        # how -> (numpy function, number of operand gates it requires)
        operations = {
            'and': (numpy.logical_and, 2),
            'or': (numpy.logical_or, 2),
            'invert': (numpy.logical_not, 1),
            'xor': (numpy.logical_xor, 2),
        }

        if self.how not in operations:
            supported_values = ('and', 'or', 'invert', 'xor')
            raise ValueError(
                "Unsupported value for how. how must be in ({0})".format(supported_values))

        function, n_operands = operations[self.how]

        # Guard against a mismatch: numpy would otherwise treat a second mask
        # passed to logical_not as its output argument, or fail with an
        # unrelated error when a binary operation receives a single mask.
        if len(self.gates) != n_operands:
            raise ValueError(
                "how='{0}' requires {1} gate(s), but {2} were given".format(
                    self.how, n_operands, len(self.gates)))

        idx = [gate._identify(dataframe) for gate in self.gates]
        return function(*idx)

    def __call__(self, dataframe):
        """Return the events of ``dataframe`` that pass the composite gate."""
        idx = self._identify(dataframe)
        return dataframe[idx]

    @doc_replacer
    def plot(self, flip=False, ax_channels=None, ax=None, *args, **kwargs):
        """
        {_gate_plot_doc}
        """
        artists = []
        for gate in self.gates:
            if args:
                # Extra positional arguments must follow the three named ones
                # positionally; mixing ``flip=...`` with ``*args`` would bind
                # args[0] to ``flip`` a second time and raise TypeError.
                artists.append(gate.plot(flip, ax_channels, ax, *args, **kwargs))
            else:
                artists.append(gate.plot(flip=flip, ax_channels=ax_channels, ax=ax, **kwargs))
        # One entry per operand gate, in the same order as ``self.gates``.
        return artists