# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# OlaClient.py
# Copyright (C) 2005-2009 Simon Newton

"""The client used to communicate with the Ola Server."""

__author__ = 'nomis52@gmail.com (Simon Newton)'

import array
from ola.rpc.StreamRpcChannel import StreamRpcChannel
from ola.rpc.SimpleRpcController import SimpleRpcController
from ola import Ola_pb2

# The port that the OLA server listens on.
OLA_PORT = 9010


def _Compare(a, b):
    """Three-way comparison that does not rely on the Python 2 cmp() builtin.

    Args:
      a: the left hand value
      b: the right hand value

    Returns:
      -1 if a < b, 0 if neither is less than the other, 1 if a > b.
    """
    return (a > b) - (a < b)


class Plugin(object):
    """Represents a plugin.

    Plugin id constants are attached to this class as attributes (one per
    entry in Ola_pb2._PLUGINIDS) by the loop that follows the class.

    Attributes:
      id: the id of this plugin
      name: the name of this plugin
      description: the description of this plugin
    """

    def __init__(self, plugin_id, name, description):
        self.id = plugin_id
        self.name = name
        self.description = description

    def __cmp__(self, other):
        return _Compare(self.id, other.id)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; sorted() only needs '<'.
        return self.id < other.id


# Populate the Plugin class attributes from the protobuf
for value in Ola_pb2._PLUGINIDS.values:
    setattr(Plugin, value.name, value.number)


class Device(object):
    """Represents a device.

    Attributes:
      id: the unique id of this device
      alias: the integer alias for this device
      name: the name of this device
      plugin_id: the plugin that this device belongs to
      input_ports: a sorted list of input Port objects
      output_ports: a sorted list of output Port objects
    """

    def __init__(self, device_id, alias, name, plugin_id, input_ports,
                 output_ports):
        self.id = device_id
        self.alias = alias
        self.name = name
        self.plugin_id = plugin_id
        # Copies of the port lists, ordered by port id (see Port.__lt__).
        self.input_ports = sorted(input_ports)
        self.output_ports = sorted(output_ports)

    def __cmp__(self, other):
        return _Compare(self.alias, other.alias)

    def __lt__(self, other):
        # Devices order by alias, not by id.
        return self.alias < other.alias


class Port(object):
    """Represents a port.

    Attributes:
      id: the unique id of this port
      universe: the universe that this port belongs to
      active: True if this port is active
      description: the description of the port
    """

    def __init__(self, port_id, universe, active, description):
        self.id = port_id
        self.universe = universe
        self.active = active
        self.description = description

    def __cmp__(self, other):
        return _Compare(self.id, other.id)

    def __lt__(self, other):
        # Required so Device can sort its ports under Python 3.
        return self.id < other.id


class Universe(object):
    """Represents a universe.

    Attributes:
      id: the integer universe id
      name: the name of this universe
      merge_mode: the merge mode this universe is using
    """

    LTP = Ola_pb2.LTP
    HTP = Ola_pb2.HTP

    def __init__(self, universe_id, name, merge_mode):
        self.id = universe_id
        self.name = name
        self.merge_mode = merge_mode

    def __cmp__(self, other):
        return _Compare(self.id, other.id)

    def __lt__(self, other):
        return self.id < other.id


class RequestStatus(object):
    """Represents the status of a request.

    Attributes:
      state: the state of the operation, one of SUCCESS, FAILED or CANCELLED
      message: an error message if it failed
    """

    SUCCESS, FAILED, CANCELLED = range(3)

    def __init__(self, state=SUCCESS, message=None):
        self.state = state
        self.message = message

    def Succeeded(self):
        """Returns true if this request succeeded."""
        return self.state == self.SUCCESS


class OlaClient(Ola_pb2.OlaClientService):
    """The client used to communicate with olad.

    Every request method takes a completion callback. The callback is only
    run when the request succeeds; if the RPC fails or is cancelled the
    completion handlers return without calling it.
    NOTE(review): this means callers never observe a failed RequestStatus;
    confirm whether failures should be reported to the callback instead.

    The patch and register action constants are attached to this class as
    attributes by the loops at the end of the module.
    """

    def __init__(self, socket):
        """Create a new client.

        Args:
          socket: the socket to use for communications.
        """
        self._socket = socket
        self._channel = StreamRpcChannel(socket, self)
        self._stub = Ola_pb2.OlaServerService_Stub(self._channel)
        # Maps universe id -> function to call with new DMX data.
        self._universe_callbacks = {}

    def SocketReady(self):
        """Called when the socket has new data."""
        self._channel.SocketReady()

    def FetchPlugins(self, callback, plugin_filter=Plugin.OLA_PLUGIN_ALL,
                     include_description=False):
        """Fetch the list of plugins.

        Args:
          callback: the function to call once complete, takes two arguments,
            a RequestStatus object and a list of Plugin objects
          plugin_filter: the id of the plugin if you want to filter the
            results
          include_description: whether to include the plugin description or
            not
        """
        controller = SimpleRpcController()
        request = Ola_pb2.PluginInfoRequest()
        request.plugin_id = plugin_filter
        request.include_description = include_description
        done = lambda x, y: self._PluginInfoComplete(callback, x, y)
        self._stub.GetPluginInfo(controller, request, done)

    def FetchDevices(self, callback, plugin_filter=Plugin.OLA_PLUGIN_ALL):
        """Fetch a list of devices from the server.

        Args:
          callback: The function to call once complete, takes two arguments,
            a RequestStatus object and a list of Device objects.
          plugin_filter: a plugin id to filter by
        """
        controller = SimpleRpcController()
        request = Ola_pb2.DeviceInfoRequest()
        request.plugin_id = plugin_filter
        done = lambda x, y: self._DeviceInfoComplete(callback, x, y)
        self._stub.GetDeviceInfo(controller, request, done)

    def FetchUniverses(self, callback):
        """Fetch a list of universes from the server.

        Args:
          callback: The function to call once complete, takes two arguments,
            a RequestStatus object and a list of Universe objects.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.UniverseInfoRequest()
        done = lambda x, y: self._UniverseInfoComplete(callback, x, y)
        self._stub.GetUniverseInfo(controller, request, done)

    def FetchDmx(self, universe, callback):
        """Fetch the DMX data for a universe from the server.

        Args:
          universe: the universe to fetch the data for
          callback: The function to call once complete, takes three
            arguments, a RequestStatus object, a universe number and an
            array of dmx data.
        """
        controller = SimpleRpcController()
        # NOTE(review): this builds a UniverseInfoRequest as the GetDmx
        # request message; confirm that matches the service definition.
        request = Ola_pb2.UniverseInfoRequest()
        request.universe = universe
        done = lambda x, y: self._GetDmxComplete(callback, x, y)
        self._stub.GetDmx(controller, request, done)

    def SendDmx(self, universe, data, callback=None):
        """Send DMX data to the server.

        Args:
          universe: the universe to send the data to
          data: An array object with the DMX data
          callback: The function to call once complete, takes one argument,
            a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.DmxData()
        request.universe = universe
        # array.tostring() was removed in Python 3.9; tobytes() is its
        # replacement but does not exist on Python 2 arrays.
        if hasattr(data, 'tobytes'):
            request.data = data.tobytes()
        else:
            request.data = data.tostring()
        done = lambda x, y: self._AckMessageComplete(callback, x, y)
        self._stub.UpdateDmxData(controller, request, done)

    def SetUniverseName(self, universe, name, callback=None):
        """Set the name of a universe.

        Args:
          universe: the universe to set the name of
          name: the new name for the universe
          callback: The function to call once complete, takes one argument,
            a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.UniverseNameRequest()
        request.universe = universe
        request.name = name
        done = lambda x, y: self._AckMessageComplete(callback, x, y)
        self._stub.SetUniverseName(controller, request, done)

    def SetUniverseMergeMode(self, universe, merge_mode, callback=None):
        """Set the merge_mode of a universe.

        Args:
          universe: the universe to set the merge mode of
          merge_mode: either Universe.HTP or Universe.LTP
          callback: The function to call once complete, takes one argument,
            a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.MergeModeRequest()
        request.universe = universe
        request.merge_mode = merge_mode
        done = lambda x, y: self._AckMessageComplete(callback, x, y)
        self._stub.SetMergeMode(controller, request, done)

    def RegisterUniverse(self, universe, action, data_callback,
                         callback=None):
        """Register to receive dmx updates for a universe.

        The local data callback table is updated as soon as the request has
        been handed to the stub; it does not wait for the server's reply.

        Args:
          universe: the universe to register or unregister for
          action: OlaClient.REGISTER or OlaClient.UNREGISTER
          data_callback: the function to be called when there is new data,
            passed a single argument of type array.
          callback: The function to call once complete, takes one argument,
            a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.RegisterDmxRequest()
        request.universe = universe
        request.action = action
        done = lambda x, y: self._AckMessageComplete(callback, x, y)
        self._stub.RegisterForDmx(controller, request, done)
        # Compare against the register action, not the patch action: the two
        # come from different enums and only match if their numbers agree.
        if action == self.REGISTER:
            self._universe_callbacks[universe] = data_callback
        else:
            # Any other action drops the handler, if one was installed.
            self._universe_callbacks.pop(universe, None)

    def PatchPort(self, device_alias, port, action, universe, callback=None):
        """Patch a port to a universe.

        Args:
          device_alias: the alias of the device to configure
          port: the id of the port
          action: OlaClient.PATCH or OlaClient.UNPATCH
          universe: the universe to patch the port to
          callback: The function to call once complete, takes one argument,
            a RequestStatus object.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.PatchPortRequest()
        request.device_alias = device_alias
        request.port_id = port
        request.action = action
        request.universe = universe
        done = lambda x, y: self._AckMessageComplete(callback, x, y)
        self._stub.PatchPort(controller, request, done)

    def ConfigureDevice(self, device_alias, request_data, callback):
        """Send a device config request.

        Args:
          device_alias: the alias of the device to configure
          request_data: the request to send to the device
          callback: The function to call once complete, takes two arguments,
            a RequestStatus object and a response.
        """
        controller = SimpleRpcController()
        request = Ola_pb2.DeviceConfigRequest()
        request.device_alias = device_alias
        request.data = request_data
        done = lambda x, y: self._ConfigureDeviceComplete(callback, x, y)
        self._stub.ConfigureDevice(controller, request, done)

    def UpdateDmxData(self, controller, request, callback):
        """Called when we receive new DMX data.

        The data is passed to the handler registered for the universe, if
        any, and an Ack is always sent back through callback.

        Args:
          controller: An RpcController object
          request: A DmxData message
          callback: The callback to run once complete
        """
        if request.universe in self._universe_callbacks:
            # Building the array from the raw data works on Python 2 and 3,
            # unlike array.fromstring() which was removed in Python 3.9.
            data = array.array('B', request.data)
            self._universe_callbacks[request.universe](data)
        response = Ola_pb2.Ack()
        callback(response)

    def _CreateStateFromController(self, controller):
        """Return a Status object given a RpcController object.

        Args:
          controller: An RpcController object.

        Returns:
          A RequestStatus object.
        """
        if controller.Failed():
            return RequestStatus(RequestStatus.FAILED, controller.ErrorText())
        if controller.IsCanceled():
            return RequestStatus(RequestStatus.CANCELLED,
                                 controller.ErrorText())
        return RequestStatus()

    def _PluginInfoComplete(self, callback, controller, response):
        """Called when the list of plugins is returned.

        Does nothing if there is no callback or the request did not succeed.

        Args:
          callback: the callback to run
          controller: an RpcController
          response: a PluginInfoReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return

        plugins = [Plugin(p.plugin_id, p.name, p.description)
                   for p in response.plugin]
        callback(status, plugins)

    def _DeviceInfoComplete(self, callback, controller, response):
        """Called when the Device info request returns.

        Does nothing if there is no callback or the request did not succeed.

        Args:
          callback: the callback to run
          controller: an RpcController
          response: a DeviceInfoReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return

        devices = []
        for device in response.device:
            input_ports = [
                Port(port.port_id, port.universe, port.active,
                     port.description)
                for port in device.input_port]
            output_ports = [
                Port(port.port_id, port.universe, port.active,
                     port.description)
                for port in device.output_port]
            devices.append(Device(device.device_id,
                                  device.device_alias,
                                  device.device_name,
                                  device.plugin_id,
                                  input_ports,
                                  output_ports))
        callback(status, devices)

    def _UniverseInfoComplete(self, callback, controller, response):
        """Called when the Universe info request returns.

        Does nothing if there is no callback or the request did not succeed.

        Args:
          callback: the callback to run
          controller: an RpcController
          response: a UniverseInfoReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return

        universes = [Universe(u.universe, u.name, u.merge_mode)
                     for u in response.universe]
        callback(status, universes)

    def _GetDmxComplete(self, callback, controller, response):
        """Called when the GetDmx request returns.

        Does nothing if there is no callback or the request did not succeed.

        Args:
          callback: the callback to run, passed the status, the universe
            number and an array of the DMX data
          controller: an RpcController
          response: the reply message; its universe and data fields are read.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return

        # Portable replacement for array.fromstring(), see UpdateDmxData.
        data = array.array('B', response.data)
        callback(status, response.universe, data)

    def _AckMessageComplete(self, callback, controller, response):
        """Called when an rpc that returns an Ack completes.

        Does nothing if there is no callback or the request did not succeed.

        Args:
          callback: the callback to run
          controller: an RpcController
          response: an Ack message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        callback(status)

    def _ConfigureDeviceComplete(self, callback, controller, response):
        """Called when a ConfigureDevice request completes.

        Does nothing if there is no callback or the request did not succeed.

        Args:
          callback: the callback to run
          controller: an RpcController
          response: a DeviceConfigReply message.
        """
        if not callback:
            return
        status = self._CreateStateFromController(controller)
        if not status.Succeeded():
            return
        callback(status, response.data)


# Populate the patch & register actions
for value in Ola_pb2._PATCHACTION.values:
    setattr(OlaClient, value.name, value.number)
for value in Ola_pb2._REGISTERACTION.values:
    setattr(OlaClient, value.name, value.number)