syndilights/open-lighting-architecture/ola-0.8.4/python/OlaClient.py

472 lines
15 KiB
Python
Raw Normal View History

2010-10-31 17:32:25 +01:00
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# OlaClient.py
# Copyright (C) 2005-2009 Simon Newton
"""The client used to communicate with the Ola Server."""
__author__ = 'nomis52@gmail.com (Simon Newton)'
import array
from ola.rpc.StreamRpcChannel import StreamRpcChannel
from ola.rpc.SimpleRpcController import SimpleRpcController
from ola import Ola_pb2
"""The port that the OLA server listens on."""
OLA_PORT = 9010  # port number that the OLA server listens on
class Plugin(object):
    """Represents a plugin.

    Plugins order by their id, so a list of them can be sorted directly.

    Attributes:
        id: the id of this plugin
        name: the name of this plugin
        description: the description of this plugin
    """

    def __init__(self, plugin_id, name, description):
        self.id = plugin_id
        self.name = name
        self.description = description

    def __cmp__(self, other):
        # Same -1 / 0 / 1 result as the Python 2 builtin cmp(), written
        # without it because cmp() does not exist on Python 3.
        return (self.id > other.id) - (self.id < other.id)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; sorted() and list.sort() only need '<'.
        return self.id < other.id
# Copy every entry of the Ola_pb2._PLUGINIDS descriptor onto the Plugin class:
# the entry's name becomes the attribute name and its number the value
# (this is where Plugin.OLA_PLUGIN_ALL comes from).
for value in Ola_pb2._PLUGINIDS.values:
    setattr(Plugin, value.name, value.number)
class Device(object):
    """Represents a device.

    Devices order by their alias, so a list of them can be sorted directly.

    Attributes:
        id: the unique id of this device
        alias: the integer alias for this device
        name: the name of this device
        plugin_id: the plugin that this device belongs to
        input_ports: a sorted list of Input Port objects
        output_ports: a sorted list of Output Port objects
    """

    def __init__(self, device_id, alias, name, plugin_id, input_ports,
                 output_ports):
        self.id = device_id
        self.alias = alias
        self.name = name
        self.plugin_id = plugin_id
        # sorted() copies, so the caller's sequences are left untouched.
        self.input_ports = sorted(input_ports)
        self.output_ports = sorted(output_ports)

    def __cmp__(self, other):
        # Same -1 / 0 / 1 result as the Python 2 builtin cmp(), written
        # without it because cmp() does not exist on Python 3.
        return (self.alias > other.alias) - (self.alias < other.alias)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; sorted() and list.sort() only need '<'.
        return self.alias < other.alias
class Port(object):
    """Represents a port.

    Ports order by their id, so a list of them can be sorted directly.

    Attributes:
        id: the unique id of this port
        universe: the universe that this port belongs to
        active: True if this port is active
        description: the description of the port
    """

    def __init__(self, port_id, universe, active, description):
        self.id = port_id
        self.universe = universe
        self.active = active
        self.description = description

    def __cmp__(self, other):
        # Same -1 / 0 / 1 result as the Python 2 builtin cmp(), written
        # without it because cmp() does not exist on Python 3.
        return (self.id > other.id) - (self.id < other.id)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; sorted() and list.sort() only need '<'.
        return self.id < other.id
class Universe(object):
    """Represents a universe.

    Universes order by their id, so a list of them can be sorted directly.

    Attributes:
        id: the integer universe id
        name: the name of this universe
        merge_mode: the merge mode this universe is using, either
            Universe.LTP or Universe.HTP
    """

    # Merge mode constants, taken directly from the protobuf module.
    LTP = Ola_pb2.LTP
    HTP = Ola_pb2.HTP

    def __init__(self, universe_id, name, merge_mode):
        self.id = universe_id
        self.name = name
        self.merge_mode = merge_mode

    def __cmp__(self, other):
        # Same -1 / 0 / 1 result as the Python 2 builtin cmp(), written
        # without it because cmp() does not exist on Python 3.
        return (self.id > other.id) - (self.id < other.id)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; sorted() and list.sort() only need '<'.
        return self.id < other.id
class RequestStatus(object):
    """Represents the status of a request.

    Attributes:
        state: the state of the operation; one of SUCCESS, FAILED or
            CANCELLED
        message: an error message if it failed, None by default
    """

    SUCCESS = 0
    FAILED = 1
    CANCELLED = 2

    def __init__(self, state=SUCCESS, message=None):
        self.message = message
        self.state = state

    def Succeeded(self):
        """Returns True if this request succeeded."""
        succeeded = self.state == self.SUCCESS
        return succeeded
class OlaClient(Ola_pb2.OlaClientService):
    """The client used to communicate with olad.

    Every request method builds a protobuf request and hands it to the RPC
    stub together with a completion handler. The completion handler converts
    the reply into the objects defined in this module and passes them to the
    callback supplied by the caller.

    NOTE: the completion handlers only run the caller's callback when the
    RPC succeeded. If the RPC failed or was cancelled the callback is not
    invoked at all.
    """

    def __init__(self, socket):
        """Create a new client.

        Args:
            socket: the socket to use for communications.
        """
        self._socket = socket
        self._channel = StreamRpcChannel(socket, self)
        self._stub = Ola_pb2.OlaServerService_Stub(self._channel)
        # Maps a universe id to the function to call with new DMX data.
        self._universe_callbacks = {}

    def SocketReady(self):
        """Called when the socket has new data."""
        self._channel.SocketReady()

    def FetchPlugins(self, callback,
                     plugin_filter=Plugin.OLA_PLUGIN_ALL,
                     include_description=False):
        """Fetch the list of plugins.

        Args:
            callback: the function to call once complete, takes two
                arguments, a RequestStatus object and a list of Plugin
                objects
            plugin_filter: the id of the plugin if you want to filter the
                results
            include_description: whether to include the plugin description
                or not
        """
        controller = SimpleRpcController()
        request = Ola_pb2.PluginInfoRequest()
        request.plugin_id = plugin_filter
        request.include_description = include_description
        done = lambda ctrl, reply: self._PluginInfoComplete(
            callback, ctrl, reply)
        self._stub.GetPluginInfo(controller, request, done)

    def FetchDevices(self, callback, plugin_filter=Plugin.OLA_PLUGIN_ALL):
        """Fetch a list of devices from the server.

        Args:
            callback: The function to call once complete, takes two
                arguments, a RequestStatus object and a list of Device
                objects.
            plugin_filter: a plugin id to filter by
        """
        controller = SimpleRpcController()
        request = Ola_pb2.DeviceInfoRequest()
        request.plugin_id = plugin_filter
        done = lambda ctrl, reply: self._DeviceInfoComplete(
            callback, ctrl, reply)
        self._stub.GetDeviceInfo(controller, request, done)

    def FetchUniverses(self, callback):
        """Fetch a list of universes from the server.

        Args:
            callback: The function to call once complete, takes two
                arguments, a RequestStatus object and a list of Universe
                objects.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.UniverseInfoRequest()
        done = lambda ctrl, reply: self._UniverseInfoComplete(
            callback, ctrl, reply)
        self._stub.GetUniverseInfo(controller, request, done)

    def FetchDmx(self, universe, callback):
        """Fetch the DMX data for a universe from the server.

        Args:
            universe: the universe to fetch the data for
            callback: The function to call once complete, takes three
                arguments, a RequestStatus object, a universe number and
                an array of dmx data.
        """
        controller = SimpleRpcController()
        # NOTE(review): this reuses UniverseInfoRequest (the FetchUniverses
        # message) with its universe field set -- confirm that this is the
        # request type GetDmx expects.
        request = Ola_pb2.UniverseInfoRequest()
        request.universe = universe
        done = lambda ctrl, reply: self._GetDmxComplete(callback, ctrl, reply)
        self._stub.GetDmx(controller, request, done)

    def SendDmx(self, universe, data, callback=None):
        """Send DMX data to the server.

        Args:
            universe: the universe to send the data to
            data: An array object with the DMX data
            callback: The function to call once complete, takes one
                argument, a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.DmxData()
        request.universe = universe
        # array.tostring() was removed in Python 3.9. Prefer tobytes() when
        # the object has it and fall back to tostring() otherwise, so older
        # interpreters and tostring()-only objects keep working.
        serialise = getattr(data, 'tobytes', None) or data.tostring
        request.data = serialise()
        done = lambda ctrl, reply: self._AckMessageComplete(
            callback, ctrl, reply)
        self._stub.UpdateDmxData(controller, request, done)

    def SetUniverseName(self, universe, name, callback=None):
        """Set the name of a universe.

        Args:
            universe: the universe to set the name of
            name: the new name for the universe
            callback: The function to call once complete, takes one
                argument, a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.UniverseNameRequest()
        request.universe = universe
        request.name = name
        done = lambda ctrl, reply: self._AckMessageComplete(
            callback, ctrl, reply)
        self._stub.SetUniverseName(controller, request, done)

    def SetUniverseMergeMode(self, universe, merge_mode, callback=None):
        """Set the merge_mode of a universe.

        Args:
            universe: the universe to set the merge mode of
            merge_mode: either Universe.HTP or Universe.LTP
            callback: The function to call once complete, takes one
                argument, a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.MergeModeRequest()
        request.universe = universe
        request.merge_mode = merge_mode
        done = lambda ctrl, reply: self._AckMessageComplete(
            callback, ctrl, reply)
        self._stub.SetMergeMode(controller, request, done)

    def RegisterUniverse(self, universe, action, data_callback,
                         callback=None):
        """Register to receive dmx updates for a universe.

        Args:
            universe: the universe to (un)register for
            action: OlaClient.REGISTER or OlaClient.UNREGISTER
            data_callback: the function to be called when there is new
                data, passed a single argument of type array.
            callback: The function to call once complete, takes one
                argument, a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.RegisterDmxRequest()
        request.universe = universe
        request.action = action
        done = lambda ctrl, reply: self._AckMessageComplete(
            callback, ctrl, reply)
        self._stub.RegisterForDmx(controller, request, done)
        # Compare against the register action, not the patch action: the
        # two come from different protobuf enums and only match when their
        # numbers happen to coincide.
        if action == self.REGISTER:
            self._universe_callbacks[universe] = data_callback
        elif universe in self._universe_callbacks:
            del self._universe_callbacks[universe]

    def PatchPort(self, device_alias, port, action, universe, callback=None):
        """Patch a port to a universe.

        Args:
            device_alias: the alias of the device to configure
            port: the id of the port
            action: OlaClient.PATCH or OlaClient.UNPATCH
            universe: the universe to patch the port to
            callback: The function to call once complete, takes one
                argument, a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.PatchPortRequest()
        request.device_alias = device_alias
        request.port_id = port
        request.action = action
        request.universe = universe
        done = lambda ctrl, reply: self._AckMessageComplete(
            callback, ctrl, reply)
        self._stub.PatchPort(controller, request, done)

    def ConfigureDevice(self, device_alias, request_data, callback):
        """Send a device config request.

        Args:
            device_alias: the alias of the device to configure
            request_data: the request to send to the device
            callback: The function to call once complete, takes two
                arguments, a RequestStatus object and a response.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.DeviceConfigRequest()
        request.device_alias = device_alias
        request.data = request_data
        done = lambda ctrl, reply: self._ConfigureDeviceComplete(
            callback, ctrl, reply)
        self._stub.ConfigureDevice(controller, request, done)

    def UpdateDmxData(self, controller, request, callback):
        """Called when we receive new DMX data.

        The data is forwarded to the data callback registered for the
        universe, if there is one. An Ack is always sent back.

        Args:
            controller: An RpcController object
            request: A DmxData message
            callback: The callback to run once complete, passed an Ack
                message
        """
        if request.universe in self._universe_callbacks:
            # Passing the bytes as the initializer is equivalent to
            # creating an empty array and calling fromstring(), which was
            # removed in Python 3.9.
            data = array.array('B', request.data)
            self._universe_callbacks[request.universe](data)
        response = Ola_pb2.Ack()
        callback(response)

    def _CreateStateFromController(self, controller):
        """Return a Status object given a RpcController object.

        Args:
            controller: An RpcController object.

        Returns:
            A RequestStatus object.
        """
        if controller.Failed():
            return RequestStatus(RequestStatus.FAILED,
                                 controller.ErrorText())
        elif controller.IsCanceled():
            return RequestStatus(RequestStatus.CANCELLED,
                                 controller.ErrorText())
        else:
            return RequestStatus()

    def _PluginInfoComplete(self, callback, controller, response):
        """Called when the list of plugins is returned.

        Does nothing if there is no callback or the RPC did not succeed.

        Args:
            callback: the callback to run
            controller: an RpcController
            response: a PluginInfoReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        plugins = [Plugin(p.plugin_id, p.name, p.description)
                   for p in response.plugin]
        callback(status, plugins)

    def _DeviceInfoComplete(self, callback, controller, response):
        """Called when the Device info request returns.

        Does nothing if there is no callback or the RPC did not succeed.

        Args:
            callback: the callback to run
            controller: an RpcController
            response: a DeviceInfoReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        devices = []
        for device in response.device:
            input_ports = [
                Port(port.port_id, port.universe, port.active,
                     port.description)
                for port in device.input_port]
            output_ports = [
                Port(port.port_id, port.universe, port.active,
                     port.description)
                for port in device.output_port]
            devices.append(Device(device.device_id,
                                  device.device_alias,
                                  device.device_name,
                                  device.plugin_id,
                                  input_ports,
                                  output_ports))
        callback(status, devices)

    def _UniverseInfoComplete(self, callback, controller, response):
        """Called when the Universe info request returns.

        Does nothing if there is no callback or the RPC did not succeed.

        Args:
            callback: the callback to run
            controller: an RpcController
            response: a UniverseInfoReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        universes = [Universe(u.universe, u.name, u.merge_mode)
                     for u in response.universe]
        callback(status, universes)

    def _GetDmxComplete(self, callback, controller, response):
        """Called when the GetDmx request returns.

        Does nothing if there is no callback or the RPC did not succeed.

        Args:
            callback: the callback to run
            controller: an RpcController
            response: the reply message; its universe and data fields are
                read.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        # Equivalent to an empty array followed by fromstring(), which was
        # removed in Python 3.9.
        data = array.array('B', response.data)
        callback(status, response.universe, data)

    def _AckMessageComplete(self, callback, controller, response):
        """Called when an rpc that returns an Ack completes.

        Does nothing if there is no callback or the RPC did not succeed.

        Args:
            callback: the callback to run
            controller: an RpcController
            response: an Ack message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        callback(status)

    def _ConfigureDeviceComplete(self, callback, controller, response):
        """Called when a ConfigureDevice request completes.

        Does nothing if there is no callback or the RPC did not succeed.

        Args:
            callback: the callback to run
            controller: an RpcController
            response: a DeviceConfigReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        callback(status, response.data)
# Copy the entries of the patch-action and register-action descriptors onto
# OlaClient as class attributes (entry name -> entry number). These are the
# action constants the OlaClient docstrings refer to, e.g. OlaClient.PATCH.
for value in Ola_pb2._PATCHACTION.values:
    setattr(OlaClient, value.name, value.number)
for value in Ola_pb2._REGISTERACTION.values:
    setattr(OlaClient, value.name, value.number)