Sources
Built-in Sources
Generic Stomp Listener
class flow.source.stomp.GenericStompListener(stomp_url)
Source class that listens for events on a Stomp URL.
Options given under [Source]:
[Source]
stomp url = <full stomp url>
To use this with your flow daemon:
from flow import Flow
from flow.source import GenericStompListener

class MyFlow(Flow):
    SOURCE = GenericStompListener

    def start(self, message):
        # message is a vizone.net.message_queue.Message
        pass
Asset Entry Listener
class flow.source.asset.AssetEntryListener
Source class that listens for Asset Entry events. Make sure to have the Asset/Admin permission or it won’t work.
Options given under [Source]:
[Source]
# None
To use this with your flow daemon:
from flow import Flow
from flow.source import AssetEntryListener

class MyFlow(Flow):
    SOURCE = AssetEntryListener

    def start(self, asset):
        # asset is a vizone.payload.asset.Item
        pass
Unmanaged Files Listener
class flow.source.location.UnmanagedFilesListener(location=None, skip_empty_files='no')
Source class that listens for UnmanagedFile events for a given location.
Options given under [Source]:
[Source]
location = <handle of an import storage>
skip empty files = <yes/no>
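The option names above use spaces, while the constructor signature uses underscores (`skip empty files` versus `skip_empty_files`). A minimal sketch of how such a section could be turned into keyword arguments with the standard library alone. The helper name `source_kwargs` and the handle `import-storage` are hypothetical, and Flow's own option handling may differ:

```python
import configparser

# Hypothetical configuration; "import-storage" is a made-up storage handle.
INI_TEXT = """
[Source]
location = import-storage
skip empty files = yes
"""

def source_kwargs(ini_text, section="Source"):
    """Return the options of one INI section as keyword arguments."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    # "skip empty files" becomes "skip_empty_files"; values stay strings.
    return {key.replace(" ", "_"): value
            for key, value in parser[section].items()}

kwargs = source_kwargs(INI_TEXT)
print(kwargs)
```

Note that every value arrives as a string, which is why `skip_empty_files` defaults to `'no'` rather than `False` in the signature above.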
To use this with your flow daemon:
from flow import Flow
from flow.source import UnmanagedFilesListener

class MyFlow(Flow):
    SOURCE = UnmanagedFilesListener

    def start(self, f):
        # f is a vizone.payload.media.UnmanagedFile
        pass
STDIN
class flow.source.local.STDIN(payload_class=None)
Source class that reads standard input. It is designed to be run once and is intended especially for Transfer Plugins.
Options given under [Source]:
[Source]
payload class = None|vizone.payload.transfer.PluginData|...
To use this with your flow daemon:
from flow import Flow
from flow.source.local import STDIN

class MyFlow(Flow):
    SOURCE = STDIN

    def start(self, data):
        # data is a vizone.payload.transfer.PluginData if that is the
        # configured payload class
        pass
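The `payload class` option is written as a dotted path. As an illustration of how a dotted path can be resolved to a class in general, here is a small sketch using `importlib`. The helper name `resolve_class` is hypothetical, a standard-library class stands in for `vizone.payload.transfer.PluginData`, and nothing here is taken from Flow's implementation:

```python
import importlib

def resolve_class(dotted_path):
    """Import and return the object named by 'package.module.ClassName'.

    Returns None when no class is configured (None, '' or the string 'None').
    """
    if dotted_path in (None, "", "None"):
        return None
    module_name, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

# A standard-library class is used here so the sketch runs anywhere.
payload_class = resolve_class("collections.OrderedDict")
print(payload_class)
```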
Interval
class flow.source.interval.Interval(interval=60, window_start=None, window_end=None)
Source class that triggers an event at a regular interval.
Options given under [Source]:
[Source]
interval = 60 (seconds)
window start = 01:00 [HH[:MM[:SS]]]
window end = 03:00 [HH[:MM[:SS]]]
To use this with your flow daemon:
from flow import Flow
from flow.source import Interval

class MyFlow(Flow):
    SOURCE = Interval

    def start(self, data):
        pass
Note that data is always None for this source.
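To make the `window start` and `window end` options more concrete, the sketch below parses the `HH[:MM[:SS]]` format and checks whether a given time of day falls inside a window. The function names are hypothetical, and the details (end time exclusive, windows that wrap past midnight, no window meaning "always") are assumptions rather than documented Flow behaviour:

```python
from datetime import time

def parse_clock(text):
    """Parse 'HH', 'HH:MM' or 'HH:MM:SS' into a datetime.time."""
    parts = [int(p) for p in text.split(":")]
    if not 1 <= len(parts) <= 3:
        raise ValueError("expected HH[:MM[:SS]], got %r" % text)
    parts += [0] * (3 - len(parts))  # missing minutes/seconds default to 0
    return time(*parts)

def in_window(now, window_start=None, window_end=None):
    """Return True if `now` (a datetime.time) lies inside the window."""
    if window_start is None or window_end is None:
        return True  # assumption: no window configured means always allowed
    start, end = parse_clock(window_start), parse_clock(window_end)
    if start <= end:
        return start <= now < end
    # Assumption: a window such as 23:00 to 02:00 wraps past midnight.
    return now >= start or now < end

print(in_window(time(2, 0), "01:00", "03:00"))
print(in_window(time(4, 0), "01:00", "03:00"))
```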
Building a Web Interface
This is an example of a web interface built with Flow. The advantages of using Flow for this are:
- Easy to configure.
- The worker pool can be used to handle asynchronous jobs.
Using bottle is recommended because it is part of the python-one package, but any other framework can be used. This example uses bottle:
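from flow import Flow, EventBased
from flow.needs import NeedsClient
# The next two import paths are assumptions; adjust them to wherever your
# python-one installation provides the logging helper and HTTPClientError.
from vizone import logging
from vizone.client import HTTPClientError
from vizone.resource.asset import get_asset_by_id
from one_depends import bottle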


class Web(EventBased, NeedsClient):
    _has_event_loop = True  # This prevents Flow from running a second
                            # event loop when bottle.run() exits.
    instance = None

    def __init__(self, interface='localhost', port=8080):
        self.server_interface = interface
        self.server_port = int(port)
        self.callback = None
        logging.log("Web Interface Settings", {
            'interface': self.server_interface,
            'port': self.server_port,
        }, 'pp')
        Web.instance = self  # We use this instance as a singleton

    def run(self):
        bottle.run(
            host=self.server_interface,
            port=self.server_port,
        )


class XmlServer(Flow):
    SOURCE = Web
    # This class could also implement the run(message) method and act
    # as a worker pool. The web method would then call
    # Web.instance.callback(message) to spawn a job. If a worker pool
    # is not needed, just leave this class as implemented here.


@bottle.get('/<asset_id>')
def get_asset(asset_id):
    """
    Serve XML for a given asset id.
    """
    try:
        asset = get_asset_by_id(asset_id, client=Web.instance.client)
    except HTTPClientError:
        return bottle.HTTPError(status=404, body="There is no such asset.")
    bottle.response.status = 200
    bottle.response.set_header('Content-Type', asset.content_type)
    bottle.response.set_header('Content-Disposition', 'attachment; filename="%s.xml"' % asset.id)
    # Serialize the asset payload to XML.
    return asset.generate()
And the corresponding INI file:
[Flow]
app name = xmlserver
class = xmlserver.XmlServer
[Source]
interface = localhost
port = 34566
[Viz One]
hostname = vizoneserver
username = user
password = password
use https = no
check certificates = no
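The comment in XmlServer mentions that a web handler could pass a message to the source's callback to spawn a job on the worker pool. How Flow wires that callback to its pool is not shown here, so the following is only a rough, standard-library sketch of the hand-off pattern. `WebSource`, `process` and `handle_request` are hypothetical stand-ins, and a `ThreadPoolExecutor` plays the role of the worker pool:

```python
from concurrent.futures import ThreadPoolExecutor

class WebSource:
    """Stand-in for the Web source: holds a callback set by its owner."""
    instance = None

    def __init__(self):
        self.callback = None
        WebSource.instance = self  # singleton access, as in the example

def process(message):
    """Stand-in for the job a worker would run."""
    return "processed %s" % message

def handle_request(asset_id):
    """What a route handler could do: hand off the job and return at once."""
    return WebSource.instance.callback(asset_id)

pool = ThreadPoolExecutor(max_workers=2)
source = WebSource()
# The owner of the source decides what "spawning a job" means.
source.callback = lambda message: pool.submit(process, message)

future = handle_request("asset-1")
result = future.result(timeout=5)
pool.shutdown()
print(result)
```

The point of the pattern is that the request handler returns immediately while the job runs on a worker, so slow jobs do not block the web server.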