Source code for gstswitch.testsource

"""
The testsource contains all gstreamer pipelines
It provides the abse for all the other gstreamer
components are build upon.
"""

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

GObject.threads_init()
Gst.init(None)

from .exception import RangeError
import random

# from pipeline import *
# IMPORTS

__all__ = ["Preview", "VideoSrc", "AudioSrc"]


class BasePipeline(Gst.Pipeline):

    """A Basic pipeline
    """

    def __init__(self):
        Gst.Pipeline.__init__(self)
        self._playing = False

    def play(self):
        """Set the pipeline as playing"""
        self._playing = True
        self.set_state(Gst.State.PLAYING)

    def pause(self):
        """Pause the pipeline"""
        self._playing = False
        self.set_state(Gst.State.PAUSED)

    def disable(self):
        """Disable the pipeline"""
        self._playing = False
        self.set_state(Gst.State.NULL)

    @classmethod
    def make(cls, elem, description=''):
        """Make a new gstreamer pipeline element
        :param elem: The name of the element to be made
        :param description: Description of the element to be made
        returns: The element which is made
        """
        element = Gst.ElementFactory.make(elem, description)
        return element


class VideoPipeline(BasePipeline):

    """A Video Pipeline which can be used by a Video Test Source
    :param port: The port of where the TCP stream will be sent
    Should be same as video port of gst-switch-src
    :param width: The width of the output video
    :param height: The height of the output video
    :param pattern: The videotestsrc pattern of the output video
    :param timeoverlay: True to enable a running time over video
    :param clockoverlay: True to enable current clock time over video
    """

    def __init__(
            self,
            port,
            host='localhost',
            width=300,
            height=200,
            pattern=None,
            timeoverlay=False,
            clockoverlay=False):
        super(VideoPipeline, self).__init__()

        self.host = host

        src = self.make_videotestsrc(pattern)
        self.add(src)
        vfilter = self.make_capsfilter(width, height)
        self.add(vfilter)
        src.link(vfilter)
        gdppay = self.make_gdppay()
        self.add(gdppay)
        if timeoverlay:
            _timeoverlay = self.make_timeoverlay()
        if clockoverlay:
            _clockoverlay = self.make_clockoverlay()
        if timeoverlay and clockoverlay:
            self.add(_timeoverlay)
            self.add(_clockoverlay)
            vfilter.link(_timeoverlay)
            _timeoverlay.link(_clockoverlay)
            _clockoverlay.link(gdppay)
        if timeoverlay:
            self.add(_timeoverlay)
            vfilter.link(_timeoverlay)
            _timeoverlay.link(gdppay)
        if clockoverlay:
            self.add(_clockoverlay)
            vfilter.link(_clockoverlay)
            _clockoverlay.link(gdppay)
        else:
            vfilter.link(gdppay)

        sink = self.make_tcpclientsink(port)
        self.add(sink)
        gdppay.link(sink)

    def make_videotestsrc(self, pattern):
        """Return a videotestsrc element
        :param pattern: The video test source pattern (1-20)
        :return: A video test source element
        """
        element = self.make('videotestsrc', 'src')
        element.set_property('pattern', int(pattern))
        return element

    def make_capsfilter(self, width, height):
        """Return a caps filter
        :param width: The width of the caps
        :param height: The height of the caps
        :returns: A caps filter element
        """
        element = self.make("capsfilter", "vfilter")
        width = str(width)
        height = str(height)
        capsstring = "video/x-raw, format=(string)I420, width={0},\
         height={1}".format(width, height)
        print capsstring
        caps = Gst.Caps.from_string(capsstring)
        element.set_property('caps', caps)
        return element

    def make_gdppay(self):
        """Return a gdppay element
        :returns: A gdppay element
        """
        element = self.make('gdppay', 'gdppay')
        return element

    def make_timeoverlay(self):
        """Return a time overlay element (Verdana bold 50)
        :returns: a time overlay element
        """
        element = self.make('timeoverlay', 'timeoverlay')
        element.set_property('font-desc', "Verdana bold 50")
        return element

    def make_clockoverlay(self):
        """Return a clock overlay element (Verdana bold 50)
        :returns: a clock overlay element
        """
        element = self.make('clockoverlay', 'clockoverlay')
        element.set_property('font-desc', "Verdana bold 50")
        return element

    def make_tcpclientsink(self, port):
        """Return a TCP client sink element
        :port: Port to sink
        :returns: A TCP client sink element
        """
        element = self.make('tcpclientsink', 'tcpclientsink')
        element.set_property('host', self.host)
        element.set_property('port', int(port))
        return element


class AudioPipeline(BasePipeline):

    """docstring for AudioPipeline"""

    def __init__(
            self,
            port,
            host='localhost',
            freq=110,
            wave=None):
        super(AudioPipeline, self).__init__()

        self.host = host

        src = self.make_audiotestsrc(freq, wave)
        self.add(src)
        gdppay = self.make_gdppay()
        self.add(gdppay)
        src.link(gdppay)
        sink = self.make_tcpclientsink(port)
        self.add(sink)
        gdppay.link(sink)

    def make_audiotestsrc(self, freq, wave=None):
        """Return a Audio Source Element
        :freq: The Frequency
        :wave: The wave pattern (valid between 1 and 12)
        """
        element = self.make('audiotestsrc', 'src')
        element.set_property('freq', int(freq))
        element.set_property('wave', int(wave))
        return element

    def make_gdppay(self):
        """Return a gdppay element
        :returns: A gdppay element
        """
        element = self.make('gdppay', 'gdppay')
        return element

    def make_tcpclientsink(self, port):
        """Return a TCP client sink element
        :port: Port to sink
        :returns: A TCP client sink element
        """
        element = self.make('tcpclientsink', 'tcpclientsink')
        element.set_property('host', self.host)
        element.set_property('port', int(port))
        return element


class PreviewPipeline(BasePipeline):

    """Pipeline for usage by a Preview
    :param port: The preview port
    """

    def __init__(self, port):
        super(PreviewPipeline, self).__init__()
        self.host = 'localhost'
        self.preview_port = port
        src = self.make_tcpclientsrc()
        self.add(src)

        gdpdepay = self.make_gdpdepay()
        self.add(gdpdepay)
        src.link(gdpdepay)

        conv1 = self.make_videoconvert("conv1")
        self.add(conv1)
        gdpdepay.link(conv1)

        cairo = self.make_cairooverlay()
        self.add(cairo)
        conv1.link(cairo)

        conv2 = self.make_videoconvert("conv2")
        self.add(conv2)
        cairo.link(conv2)

        sink = self.make_xvimagesink()
        self.add(sink)
        conv2.link(sink)

    def make_tcpclientsrc(self):
        """Return a TCP Client Source element
        :param port: The port of the server
        :returns: A TCP Client Source element
        """
        element = self.make('tcpclientsrc', 'tcpclientsrc')
        element.set_property('host', self.host)
        element.set_property('port', self.preview_port)
        return element

    def make_gdpdepay(self):
        """Return a gdpdepay element
        :returns: A gdpdepay element
        """
        element = self.make('gdpdepay', 'gdpdepay')
        return element

    def make_videoconvert(self, desc):
        """Return a videoconvert element
        :returns: A videoconvert element
        """
        element = self.make('videoconvert', desc)
        return element

    def make_cairooverlay(self):
        """Return a cairooverlay element
        :returns: A cairooverlay element
        """
        element = self.make('cairooverlay', 'cairooverlay')
        return element

    def make_xvimagesink(self):
        """Return a xvimagesink element
        :returns: A xvimagesink element
        """
        element = self.make('xvimagesink', 'xvimagesink')
        element.set_property('sync', 'false')
        return element


[docs]class VideoSrc(object): """A Test Video Source :param width: The width of the output video :param height: The height of the output video :param pattern: The videotestsrc pattern of the output video None for random :param timeoverlay: True to enable a running time over video :param clockoverlay: True to enable current clock time over video """ HOST = 'localhost' def __init__( self, port, width=300, height=200, pattern=None, timeoverlay=False, clockoverlay=False): super(VideoSrc, self).__init__() self._port = None self._width = None self._height = None self._pattern = None self._timeoverlay = None self._clockoverlay = None self.port = port self.width = width self.height = height self.pattern = self.generate_pattern(pattern) self.timeoverlay = timeoverlay self.clockoverlay = clockoverlay self.pipeline = VideoPipeline( self.port, self.HOST, self.width, self.height, self.pattern, self.timeoverlay, self.clockoverlay) @property def port(self): """Get the Video Port""" return self._port @port.setter
[docs] def port(self, port): """Set the video Port :raises RangeError: Port not in range 1 to 65535 :raises TypeError: Port cannot be converted to integer :raises ValueError: Port cannot be left blank """ if not port: raise ValueError("Port: '{0}' cannot be blank" .format(port)) else: try: i = int(port) if i < 1 or i > 65535: raise RangeError('Port must be in range 1 to 65535') else: self._port = port except TypeError: raise TypeError("Port must be a string or number," "not, '{0}'".format(type(port))) except ValueError: raise TypeError("Port must be a valid number")
@property def width(self): """Get the width""" return self._width @width.setter
[docs] def width(self, width): """Set the Width raises ValueError: Width must be a positive value raises ValueError: Width must not be blank raises TypeError: Width must be convertable into a float """ if not width: raise ValueError("Width: '{0}' cannot be blank" .format(width)) try: i = float(width) if i <= 0: raise ValueError("Width: '{0}' must be a valid positive value") else: self._width = width except TypeError: raise TypeError("Width must be a valid value not '{0}'" .format(type(width)))
@property def height(self): """Get the height""" return self._height @height.setter
[docs] def height(self, height): """Set the height raises ValueError: Height must be a positive value raises ValueError: Height must not be blank raises TypeError: Height must be convertable into a float """ if not height: raise ValueError("Height: '{0}' cannot be blank" .format(height)) try: i = float(height) if i <= 0: raise ValueError( "Height: '{0}' must be a valid positive value") else: self._height = height except TypeError: raise TypeError("Height must be a valid value not '{0}'" .format(type(height)))
@property def pattern(self): """Get the Pattern""" return self._pattern @pattern.setter
[docs] def pattern(self, pattern): """Set the Pattern :raises RangeError: Pattern must be in range 0 to 19 :raises TypeError: Pattern must be convertable to an integer """ try: i = int(float(pattern)) if i < 0 or i > 19: raise RangeError('Pattern must be in range 0 and 19') else: self._pattern = pattern except ValueError: raise TypeError("Pattern must be a valid number")
@property def timeoverlay(self): """Get the timeoverlay""" return self._timeoverlay @timeoverlay.setter
[docs] def timeoverlay(self, timeoverlay): """Set the timeoverlay :raises ValueError: Timeoverlay must be True or False """ timeover = str(timeoverlay) if timeover == 'True' or timeover == 'False': self._timeoverlay = timeoverlay else: raise ValueError("Timeoverlay: '{0}' must be True of False" .format(timeoverlay))
@property def clockoverlay(self): """Get the Clockoverlay""" return self._clockoverlay @clockoverlay.setter
[docs] def clockoverlay(self, clockoverlay): """Set the Clockoverlay :raises ValueError: Clockoverlay must be True or False """ clockover = str(clockoverlay) if clockover == 'True' or clockover == 'False': self._clockoverlay = clockoverlay else: raise ValueError("Clockoverlay: '{0}' must be True of False" .format(clockoverlay))
[docs] def run(self): """Run the pipeline""" self.pipeline.play()
[docs] def pause(self): """End the pipeline""" self.pipeline.pause()
[docs] def end(self): """End/disable the pipeline""" self.pipeline.disable()
@classmethod
[docs] def generate_pattern(cls, pattern): """Generate a random pattern if not specified """ if pattern is None: pattern = random.randint(0, 19) pattern = str(pattern) return pattern
[docs]class AudioSrc(object): """docstring for AudioSrc""" HOST = 'localhost' def __init__( self, port, freq=110, wave=None): super(AudioSrc, self).__init__() self._port = None self._freq = None self._wave = None self.port = port self.freq = freq self.wave = self.generate_wave(wave) self.pipeline = AudioPipeline( self.port, self.HOST, self.freq, self.wave) @property def port(self): """Get the Audio Port""" return self._port @port.setter
[docs] def port(self, port): """Set the Audio Port :raises RangeError: Port not in range 1 to 65535 :raises TypeError: Port cannot be converted to integer :raises ValueError: Port cannot be left blank """ if not port: raise ValueError("Port: '{0}' cannot be blank" .format(port)) else: try: i = int(port) if i < 1 or i > 65535: raise RangeError('Port must be in range 1 to 65535') else: self._port = port except TypeError: raise TypeError("Port must be a string or number," "not, '{0}'".format(type(port))) except ValueError: raise TypeError("Port must be a valid number")
@property def freq(self): """Get the Frequency""" return self._freq @freq.setter
[docs] def freq(self, freq): """Set the Frequency raises ValueError: Width must be a positive value raises ValueError: Width must not be blank raises TypeError: Width must be convertable into a float """ if not freq: raise ValueError("Frequency: '{0}' cannot be blank" .format(freq)) else: try: i = int(freq) if i < 1: raise RangeError("Frequency must be a positive value") else: self._freq = freq except TypeError: raise TypeError("Frequency must be a string or number," "not, '{0}'".format(type(freq))) except ValueError: raise TypeError("Frequency must be a valid number")
@property def wave(self): """Get the wave number""" return self._wave @wave.setter
[docs] def wave(self, wave): """Set the Wave number :raises RangeError: Wave must be in range 0 to 12 :raises TypeError: Wave must be convertable to integer """ try: i = int(float(wave)) if i < 0 or i > 12: raise RangeError('Wave must be in range 0 and 12') else: self._wave = wave except ValueError: raise TypeError("Wave must be a valid number")
[docs] def run(self): """Run the pipeline""" self.pipeline.play()
[docs] def pause(self): """End the pipeline""" self.pipeline.pause()
[docs] def end(self): """End/disable the pipeline""" self.pipeline.disable()
@classmethod
[docs] def generate_wave(cls, wave): """Generate a random wave if not specified """ if wave is None: wave = random.randint(0, 12) wave = str(wave) return wave
[docs]class Preview(object): """A Preview Element :param port: The preview port """ def __init__(self, port): super(Preview, self).__init__() self._preview_port = None self.preview_port = port self.pipeline = PreviewPipeline(self.preview_port) @property def preview_port(self): """Get the Preview Port""" return self._preview_port @preview_port.setter
[docs] def preview_port(self, preview_port): """Set the Preview Port :raises RangeError: Port not in range 1 to 65535 :raises TypeError: Port cannot be converted to integer :raises ValueError: Port cannot be left blank """ if not preview_port: raise ValueError("preview_port: '{0}' cannot be blank" .format(preview_port)) else: try: i = int(preview_port) if i < 1 or i > 65535: raise RangeError( 'preview_port must be in range 1 to 65535') else: self._preview_port = preview_port except TypeError: raise TypeError("preview_port must be a string or number," "not, '{0}'".format(type(preview_port))) except ValueError: raise TypeError("Port must be a valid number")
[docs] def run(self): """Run the pipeline""" self.pipeline.play()
[docs] def pause(self): """Pause the pipeline""" self.pipeline.pause()
[docs] def end(self): """End/disable the pipeline""" self.pipeline.disable()