Sources

Built-in Sources

Generic Stomp Listener

class flow.source.stomp.GenericStompListener(stomp_url)

Source class that listens for events on a Stomp URL.

Options given under [Source]:

[Source]
stomp url = <full stomp url>

To use this with your flow daemon:

from flow import Flow
from flow.source import GenericStompListener

class MyFlow(Flow):
    SOURCE = GenericStompListener

    def start(self, message):
        # message is a vizone.net.message_queue.Message
        pass
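The option name `stomp url` in the INI file lines up with the `stomp_url` constructor argument. The following is a minimal sketch of that relationship, assuming Flow replaces spaces in option names with underscores and passes every value through as a string. The helper `source_kwargs` and the URL are illustrative placeholders, not part of Flow:

```python
import configparser

# Placeholder configuration; the URL is made up for illustration.
INI_TEXT = """
[Source]
stomp url = stomp://example.invalid:61613/topic/events
"""

def source_kwargs(ini_text, section="Source"):
    """Turn the options of one INI section into keyword arguments.

    Hypothetical helper: assumes spaces in option names become underscores.
    """
    parser = configparser.ConfigParser(delimiters=("=",), interpolation=None)
    parser.read_string(ini_text)
    return {name.replace(" ", "_"): value for name, value in parser.items(section)}

kwargs = source_kwargs(INI_TEXT)
print(kwargs)
```

The same naming pattern appears in the other sources, for example `skip empty files` and `skip_empty_files`.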

Asset Entry Listener

class flow.source.asset.AssetEntryListener

Source class that listens for Asset Entry events. The user must have the Asset/Admin permission; without it, this source will not work.

Options given under [Source]:

[Source]
# None

To use this with your flow daemon:

from flow import Flow
from flow.source import AssetEntryListener

class MyFlow(Flow):
    SOURCE = AssetEntryListener

    def start(self, asset):
        # asset is a vizone.payload.asset.Item
        pass

Unmanaged Files Listener

class flow.source.location.UnmanagedFilesListener(location=None, skip_empty_files='no')

Source class that listens for UnmanagedFile events for a given location.

Options given under [Source]:

[Source]
location = <handle of an import storage>
skip empty files = <yes/no>

To use this with your flow daemon:

from flow import Flow
from flow.source import UnmanagedFilesListener

class MyFlow(Flow):
    SOURCE = UnmanagedFilesListener

    def start(self, f):
        # f is a vizone.payload.media.UnmanagedFile
        pass
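The `skip empty files` option takes `yes` or `no`, and the constructor default is the string `'no'`, so the value is presumably handled as text. One way such a flag could be evaluated is sketched below, assuming "empty" means a size of zero bytes. Both helpers are hypothetical and not part of Flow:

```python
def is_yes(value):
    """Interpret a yes/no option value (hypothetical helper, not Flow's API)."""
    return str(value).strip().lower() in ("yes", "true", "1", "on")

def should_skip(size_in_bytes, skip_empty_files="no"):
    """Return True when a file should be ignored because it is empty."""
    return is_yes(skip_empty_files) and size_in_bytes == 0

print(should_skip(0, "yes"))
```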

STDIN

class flow.source.local.STDIN(payload_class=None)

Source class that reads from standard input. It is designed to be run once and is intended especially for Transfer Plugins.

Options given under [Source]:

[Source]
payload class = None|vizone.payload.transfer.PluginData|...

To use this with your flow daemon:

from flow import Flow
from flow.source.local import STDIN

class MyFlow(Flow):
    SOURCE = STDIN

    def start(self, data):
        # data is a vizone.payload.transfer.PluginData if that payload class is configured
        pass
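The `payload class` option is written as a dotted path. A common way to turn such a path into a class is sketched here with `importlib`. This illustrates the idea only, with `resolve_class` as a hypothetical helper and a standard-library class standing in for `vizone.payload.transfer.PluginData`; it is not Flow's actual implementation:

```python
import importlib

def resolve_class(dotted_path):
    """Import and return the class named by a dotted path, or None.

    Hypothetical helper; the string "None" is treated like a missing value.
    """
    if dotted_path is None or dotted_path == "None":
        return None
    module_name, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

# Standard-library stand-in for vizone.payload.transfer.PluginData.
cls = resolve_class("collections.OrderedDict")
print(cls)
```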

Interval

class flow.source.interval.Interval(interval=60, window_start=None, window_end=None)

Source class that triggers an event at a fixed interval.

Options given under [Source]:

[Source]
interval = 60 (seconds)
window start = 01:00 [HH[:MM[:SS]]]
window end = 03:00 [HH[:MM[:SS]]]

To use this with your flow daemon:

from flow import Flow
from flow.source import Interval

class MyFlow(Flow):
    SOURCE = Interval

    def start(self, data):
        pass

Note that data is always None for this source.
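The `window start` and `window end` options use the format `HH[:MM[:SS]]`. The sketch below parses that format and checks whether a time of day falls inside the window. It assumes that the window restricts when events fire, that a missing bound means no restriction, that the start is inclusive and the end exclusive, and that a window whose end precedes its start wraps past midnight. None of this is confirmed by the text above, and both function names are hypothetical:

```python
from datetime import time

def parse_clock(text):
    """Parse 'HH[:MM[:SS]]' into a datetime.time."""
    parts = [int(p) for p in text.split(":")]
    if not 1 <= len(parts) <= 3:
        raise ValueError("expected HH[:MM[:SS]], got %r" % text)
    parts += [0] * (3 - len(parts))  # missing minutes/seconds default to 0
    return time(*parts)

def in_window(now, window_start=None, window_end=None):
    """Return True if `now` (a datetime.time) lies inside the window."""
    if window_start is None or window_end is None:
        return True  # assumption: no window configured means always active
    start, end = parse_clock(window_start), parse_clock(window_end)
    if start <= end:
        return start <= now < end
    # Window wraps past midnight, e.g. 22:00 to 02:00.
    return now >= start or now < end

print(in_window(time(2, 0), "01:00", "03:00"))
```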

Building a Web Interface

This is an example of a web interface built with Flow. The advantages of using Flow for this are:

  • Easy to configure.
  • The worker pool can be used to handle asynchronous jobs.

bottle is recommended because it ships with the python-one package, but any other web framework can be used. This example uses bottle:

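from flow import Flow, EventBased
from flow.needs import NeedsClient
from vizone import logging  # assumed: vizone's own logging module, used below
from vizone.client import HTTPClientError  # assumed location of this exception
from vizone.resource.asset import get_asset_by_id
from one_depends import bottle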

class Web(EventBased, NeedsClient):
    _has_event_loop = True  # This prevents Flow from running a second
                            # event loop when bottle.run() exits.
    instance = None

    def __init__(self, interface='localhost', port=8080):
        self.server_interface = interface
        self.server_port = int(port)
        self.callback = None

        logging.log("Web Interface Settings", {
            'interface': self.server_interface,
            'port': self.server_port,
        }, 'pp')

        Web.instance = self  # We use this instance as a singleton

    def run(self):
        bottle.run(
            host=self.server_interface,
            port=self.server_port,
        )


class XmlServer(Flow):
    SOURCE = Web

    # This class could also implement the start(message) method and act
    # as a worker pool. The web handler would then call
    # Web.instance.callback(message) to spawn a job. If a worker pool
    # is not needed, leave this class as implemented here.


@bottle.get('/<asset_id>')
def get_asset(asset_id):
    """
    Serve XML for a given asset id.
    """
    try:
        asset = get_asset_by_id(asset_id, client=Web.instance.client)
    except HTTPClientError:
        return bottle.HTTPError(status=404, body="There is no such asset.")

    bottle.response.status = 200
    bottle.response.set_header('Content-Type', asset.content_type)
    bottle.response.set_header('Content-Disposition', 'attachment; filename="%s.xml"' % asset.id)
    return asset.generate()

And the corresponding INI file:

[Flow]
app name = xmlserver
class = xmlserver.XmlServer

[Source]
interface = localhost
port = 34566

[Viz One]
hostname = vizoneserver
username = user
password = password
use https = no
check certificates = no
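The comment in XmlServer describes handing work from a web handler to the worker pool through the source's callback. The pattern can be sketched with the standard library alone. Everything here is a stand-in: `FakeSource`, `handle_request`, and `job` are hypothetical names, and a `ThreadPoolExecutor` takes the place of Flow's worker pool, whose real wiring is not shown in this document:

```python
from concurrent.futures import ThreadPoolExecutor

class FakeSource:
    """Stand-in for the Web source: a singleton holding a callback."""
    instance = None

    def __init__(self):
        self.callback = None
        FakeSource.instance = self

def handle_request(asset_id):
    """Stand-in for a web handler: queue a job and return immediately."""
    FakeSource.instance.callback(asset_id)
    return "queued %s" % asset_id

results = []

def job(message):
    """Stand-in for the flow's job method, run on a worker thread."""
    results.append("processed %s" % message)

source = FakeSource()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Assumption: Flow sets the callback so that calling it spawns a job.
    source.callback = lambda message: pool.submit(job, message)
    reply = handle_request("asset-1")
# Leaving the with-block waits for all queued jobs to finish.
print(reply, results)
```

The handler returns as soon as the job is queued, so slow work does not block the web request.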