A typical end-to-end workflow involves heterogeneous compute nodes: FPGAs for accelerated services such as ML inference, video processing, and database acceleration, and CPUs for I/O with the outside world and for any computation not implemented on the FPGA. Vitis AI provides a set of APIs and functions for composing streaming applications in Python. The Xstream APIs build on the features provided by Xbutler. The components of the Xstream API are as follows.
- Xstream ($VAI_PYTHON_DIR/vai/dpuv1/rt/xstream.py) provides a standard mechanism for streaming data between multiple processes and for controlling execution flow and dependencies.
- Xstream Channel: Channels are identified by an alphanumeric string. Xstream Nodes may publish payloads to channels and subscribe to channels to receive payloads. The default pattern is PUB-SUB: every subscriber of a channel receives every payload published to that channel. Payloads are queued on the subscriber side in FIFO order until the subscriber consumes them.
- Xstream Payload: Each payload contains two items, a blob of binary data and metadata. The binary blob is meant for large data; the metadata is meant for small data such as IDs, arguments, and options, and must include the id field. Both the blob and the metadata are stored in Redis, which acts as an object store, while the object IDs are transmitted through ZMQ, which handles stream flow control. An empty payload signals the end of transmission.
- Xstream Node: Each Xstream Node is a stream processor. It is a separate process that can subscribe to zero or more input channels and publish to zero or more output channels. A node may perform computation on payloads received on its input channel(s); the computation can be implemented on a CPU, FPGA, or GPU. To define a new node, add a new Python file in vai/dpuv1/rt/xsnodes; see ping.py for an example. Every node should loop forever upon construction. On each iteration of the loop, it should consume payloads from its input channel(s) and publish payloads to its output channel(s). If an empty payload is received, the node should forward the empty payload to its output channels by calling
- Xstream Graph: Use $VAI_PYTHON_DIR/vai/dpuv1/rt/xsnodes/grapher.py to construct a graph consisting of one or more nodes. When Graph.serve() is called, the graph spawns each node as a separate process and connects their input and output channels. The graph manages the lifecycle of all its nodes. See neptune/services/ping.py for another graph example. The following builds a three-stage graph (preprocess, FPGA, postprocess):
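```python
# Assumed import path, inferred from the grapher.py location given above;
# the pre, fpga and post node modules are assumed to sit alongside it.
from vai.dpuv1.rt.xsnodes import grapher, pre, fpga, post

# args: node configuration, not defined in this excerpt.
graph = grapher.Graph("my_graph")

# Register nodes as (node name, node class, constructor arguments).
graph.node("prep", pre.ImagenetPreProcess, args)
graph.node("fpga", fpga.FpgaProcess, args)
graph.node("post", post.ImagenetPostProcess, args)

# Edges read as (channel name, source node, destination node);
# None marks the graph's external input (START) or output (DONE).
graph.edge("START", None, "prep")
graph.edge("fpga", "prep", "fpga")
graph.edge("post", "fpga", "post")
graph.edge("DONE", "post", None)

graph.serve(background=True)  # spawn each node as a separate process
...
graph.stop()
```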
- Xstream Runner: The runner is a convenience class that pushes a payload to the input channel of a graph. The payload is submitted with a unique ID, and the runner then waits for the graph's output payload matching that ID. The purpose of the runner is to provide the look and feel of a blocking function call.
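The channel and payload semantics described above can be modeled in plain Python. The sketch below is a conceptual, single-process illustration only, not the Xstream API: Payload and Broker are hypothetical names, and the Redis object store and ZMQ transport are replaced by in-memory queues. It shows PUB-SUB fan-out (every subscriber receives every payload), per-subscriber FIFO queuing, the required id field, and the empty payload used as an end-of-transmission marker.

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Payload:
    """Hypothetical stand-in for an Xstream payload: binary blob plus metadata."""
    blob: bytes = b""
    meta: dict = field(default_factory=dict)

    def is_end(self) -> bool:
        # An empty payload (no blob, no metadata) marks the end of transmission.
        return not self.blob and not self.meta


class Broker:
    """In-memory PUB-SUB model with one FIFO queue per subscriber per channel."""

    def __init__(self) -> None:
        self._queues = defaultdict(dict)  # channel -> {subscriber -> deque}

    def subscribe(self, channel: str, subscriber: str) -> None:
        self._queues[channel][subscriber] = deque()

    def publish(self, channel: str, payload: Payload) -> None:
        if not payload.is_end() and "id" not in payload.meta:
            raise ValueError("payload metadata must contain an 'id' field")
        for fifo in self._queues[channel].values():
            fifo.append(payload)  # every subscriber receives every payload

    def get(self, channel: str, subscriber: str) -> Optional[Payload]:
        fifo = self._queues[channel][subscriber]
        return fifo.popleft() if fifo else None  # oldest payload first


broker = Broker()
broker.subscribe("images", "prep")
broker.subscribe("images", "logger")
broker.publish("images", Payload(b"\x00\x01", {"id": 1}))
broker.publish("images", Payload(b"\x02", {"id": 2}))
broker.publish("images", Payload())  # end of transmission

# Each subscriber drains its own FIFO copy of the stream.
for sub in ("prep", "logger"):
    ids = []
    while (p := broker.get("images", sub)) is not None and not p.is_end():
        ids.append(p.meta["id"])
    print(sub, ids)  # prints "prep [1, 2]", then "logger [1, 2]"
```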
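One way to read the graph.edge calls in the grapher example is that each edge names a channel, its second argument is the node that publishes to that channel, and its third is the node that subscribes to it, with None marking the graph's external input or output. The sketch below applies that reading, which is an assumption drawn from the example rather than from the grapher.py source, to derive each node's input and output channels; wire and EDGES are hypothetical names.

```python
from collections import defaultdict

# Edges from the grapher example, read as (channel name, source node, destination node).
EDGES = [
    ("START", None, "prep"),
    ("fpga", "prep", "fpga"),
    ("post", "fpga", "post"),
    ("DONE", "post", None),
]


def wire(edges):
    """Derive each node's input and output channels from (name, src, dst) edges.

    A None source marks a graph input; a None destination marks a graph output.
    """
    inputs, outputs = defaultdict(list), defaultdict(list)
    for name, src, dst in edges:
        if src is not None:
            outputs[src].append(name)  # src publishes to this channel
        if dst is not None:
            inputs[dst].append(name)  # dst subscribes to this channel
    return dict(inputs), dict(outputs)


inputs, outputs = wire(EDGES)
for node in ("prep", "fpga", "post"):
    print(f"{node}: {inputs[node]} -> {outputs[node]}")
```

Under this reading, the START and DONE edges are what give the Runner an entry channel to push into and an exit channel to wait on.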
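The node loop, graph chaining, and runner behavior can likewise be sketched with the standard library. This is a conceptual model under stated assumptions, not Xstream itself: nodes run as threads rather than separate processes, queue.Queue objects stand in for channels, a None sentinel (END) stands in for the empty payload, and node, build_pipeline, and Runner are hypothetical names. Each node loops until it sees END, forwards it downstream, and exits; the runner tags each submission with a unique ID and blocks until the output carrying that ID arrives.

```python
import queue
import threading
import uuid

END = None  # hypothetical sentinel standing in for Xstream's empty payload


def node(fn, inbox, outbox):
    """Stream-processor loop: consume, compute, publish; forward END, then exit."""
    while True:
        item = inbox.get()
        if item is END:
            outbox.put(END)  # propagate end of transmission downstream
            return
        payload_id, data = item
        outbox.put((payload_id, fn(data)))  # keep the ID attached to the result


def build_pipeline(stages):
    """Chain stage functions START -> stage 1 -> ... -> DONE, one thread per node."""
    channels = [queue.Queue() for _ in range(len(stages) + 1)]
    threads = [
        threading.Thread(target=node, args=(fn, channels[i], channels[i + 1]), daemon=True)
        for i, fn in enumerate(stages)
    ]
    for t in threads:
        t.start()
    return channels[0], channels[-1], threads  # START queue, DONE queue, workers


class Runner:
    """Blocking facade: submit with a unique ID, wait for the matching output."""

    def __init__(self, start, done):
        self.start = start
        self.done = done
        self.pending = {}  # outputs received for IDs not yet claimed

    def run(self, data):
        payload_id = uuid.uuid4().hex
        self.start.put((payload_id, data))
        while payload_id not in self.pending:
            out_id, result = self.done.get()
            self.pending[out_id] = result
        return self.pending.pop(payload_id)


start, done, threads = build_pipeline([
    lambda x: x * 2,  # stand-in for a "prep" node
    lambda x: x + 1,  # stand-in for an "fpga" node
    str,              # stand-in for a "post" node
])
runner = Runner(start, done)
print(runner.run(20))  # prints 41
start.put(END)
for t in threads:
    t.join()
print(done.get() is END)  # prints True
```

The pending dictionary buffers any output whose ID does not match the current request, so results that arrive out of order are not lost; this simple version is not safe for concurrent callers sharing one Runner.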
A complete standalone example of Xstream is here: