Top

forestbus.forest module

The forest module provides an easy to use Python client API for interacting with a Forest Bus cluster.

"""
The forest module provides an easy to use Python client API for interacting with a Forest Bus cluster.
"""

import cbor_rpc
import socket
import threading

# Module-level exception singletons.  The Client raises these exact instances,
# and its internal handlers compare a caught exception against
# ErrClusterIDMismatch to decide whether it must be re-raised to the caller.
ErrNoNodesAvailable = Exception ("No nodes available")
""" ErrNoNodesAvailable is thrown when the Client is not able to fulfill the request against any of the nodes in the cluster. """

# Raised while establishing a connection when the node reports a different
# ClusterID from the one the Client was created with.
ErrClusterIDMismatch = Exception ("ClusterID given doesn't match that used by the node")
""" ErrClusterIDMismatch is thrown if the nodes in the cluster have been configured with a different id to the one given when the Client is created. """

class Client:
	"""
	The Client class provides a multi-thread safe connection to a Forest Bus cluster.  This class will establish connections to the nodes in the cluster as required.

	Connections are cached per node name; the cache, the per-topic leader hints and the last used node are all guarded by a single lock.
	"""
	def __init__ (self, clusterID, nodes):
		"""
		Creates a new Client instance.  The clusterID will be checked against the -id that the Forest Bus nodes were started with.

		The nodes parameter is a list of strings in the format hostname:port and should match the -cbor parameter given when the nodes were started.
		"""
		self._clusterID = clusterID
		self._nodes = nodes

		# nodeName -> open RPC client
		self._connections = {}
		# topic -> nodeName last used to send to that topic
		self._topic_leaders = {}
		self._lock = threading.Lock()
		# nodeName last used for read style requests ("" when none yet)
		self._lastUsedGetNodeName = ""

	def GetMessages (self, topic, index, quantity, wait):
		"""
		GetMessages takes the topic (a string), the index (a 64 bit integer), target quantity (integer) and a wait flag (boolean).  If wait is True then the method will block until new messages become available.

		GetMessages returns a tuple of the received messages (a list of sequence of bytes) and the index of the next message in this topic sequence.

		GetMessages will usually return more or fewer messages than the quantity requested.  This ensures efficient message retrieval from the node as messages are aligned to offset and cache boundaries.  If any messages are available at the requested index then at least one message will be returned.

		If the index requested is no longer available on this node (i.e. clean-up has removed old data) then zero messages will be returned and the nextID will be the index of the first available message.

		If the messages returned bring the client up to the end of the available messages, the nextID will contain the index of what will become the next message when it has been sent.  By setting wait to True and passing in the index returned by nextID, GetMessages will block until at least one new message is available, before returning that message/messages.

		GetMessages throws Exceptions if no nodes are available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
		"""
		nextClient = self._rpcConnect()
		rpcClient = nextClient()
		if rpcClient is None:
			raise ErrNoNodesAvailable
		result, error = rpcClient.Call("RPCHandler.ReceiveMessages", {"ClusterID": self._clusterID, "Topic": topic, "ID": index, "Quantity": quantity, "WaitForMessages": wait})
		if error != "":
			# Drop the failed connection from the cache, as the other RPC
			# methods do, so the next request reconnects instead of reusing it.
			self._removeBrokenConnection(rpcClient)
			raise Exception(error)

		return (result["ReceivedMessages"], result["NextID"])

	def GetTopicMaxAvailableIndex (self, topic):
		"""
		GetTopicMaxAvailableIndex returns the maximum available index from the currently connected node for the given topic.

		If the cluster has been completely shutdown and restarted (rather than a rolling restart of individual nodes) then the commit index may be zero, in which case the maxAvailableIndex will be zero.  Once a message has been sent to the cluster in this topic the commit index will be recalculated and the maximum commit index will return as normal.

		GetTopicMaxAvailableIndex throws Exceptions if no nodes are available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
		"""
		nextClient = self._rpcConnect()
		rpcClient = nextClient()
		if rpcClient is None:
			raise ErrNoNodesAvailable
		result, error = rpcClient.Call("RPCHandler.GetTopicDetails", {"Topic": topic})
		if error != "":
			self._removeBrokenConnection(rpcClient)
			raise Exception(error)

		return result["CommitIndex"]

	def SendMessages (self, topic, messages, waitForCommit):
		"""
		SendMessages sends a batch of messages to the Forest Bus cluster.

		Messages are a list of sequences of bytes.  Sending many messages (hundreds) at once gives better through-put than sending individual messages.

		If waitForCommit is false then SendMessages will return as soon as the message has been saved on the leader node for this topic.  If waitForCommit is true then SendMessages will only return once the messages have been replicated to a majority of the nodes in the cluster and are therefore committed.

		Returns the "IDs" value from the node's reply.

		SendMessages throws Exceptions if no leader node is available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
		"""
		nextClient = self._rpcConnect(topic)
		while True:
			rpcClient = nextClient()
			if rpcClient is None:
				raise ErrNoNodesAvailable
			result, error = rpcClient.Call("RPCHandler.SendMessages", {"Topic": topic, "SentMessages": messages, "WaitForCommit": waitForCommit})
			if error != "":
				self._removeBrokenConnection(rpcClient)
				raise Exception(error)

			if result["Result"]["Code"] == 0:
				# Success - send it back
				return result["IDs"]
			# A non-zero result code means this node did not accept the
			# send, so loop round and try the next node from the factory.

	def Close (self):
		"""
		Close shuts down the Forest Bus Client and closes all connections to the underlying nodes.

		A closed Client can be reused - new node connections will be established as required.

		Every cached connection is closed even if one of them fails to close; the first such failure is re-raised once the lock has been released.
		"""
		firstError = None
		with self._lock:
			# Detach the cache first so the Client is left reusable whatever happens below.
			connections, self._connections = self._connections, {}
			for nodeName in connections:
				try:
					connections[nodeName].Close()
				except Exception as e:
					if firstError is None:
						firstError = e
		if firstError is not None:
			raise firstError


	def _rpcConnect (self, topic = None):
		"""
		Returns a factory function that yields one RPC client per call, working through the nodes in turn, and None once every node has been tried.

		The node tried first is the one last recorded for the topic (when a topic is given) or the last node used for a topic-less request.  Each node the factory successfully returns is recorded in the same way.  The factory re-raises ErrClusterIDMismatch and skips nodes that fail for any other reason.
		"""
		with self._lock:
			if topic is not None:
				preferred = self._topic_leaders.get(topic)
			elif self._lastUsedGetNodeName != "":
				preferred = self._lastUsedGetNodeName
			else:
				preferred = None
			# Work on a private copy so the swap below never reorders self._nodes
			ournodes = list(self._nodes)

		# Prioritise the preferred node; a name that is no longer in the node
		# list is ignored rather than raising while building the factory.
		if preferred in ournodes:
			position = ournodes.index(preferred)
			ournodes[0], ournodes[position] = ournodes[position], ournodes[0]

		# One element list so the closure can advance the position.
		lastTried = [0]

		def factory ():
			while lastTried[0] < len(ournodes):
				nodeName = ournodes[lastTried[0]]
				try:
					connection = self._getConnection(nodeName)
				except Exception as e:
					if e is ErrClusterIDMismatch:
						raise
					connection = None
				lastTried[0] += 1
				if connection is None:
					continue
				with self._lock:
					if topic is not None:
						self._topic_leaders[topic] = nodeName
					else:
						self._lastUsedGetNodeName = nodeName
				return connection

			return None
		return factory

	def _getConnection (self, nodeName):
		"""
		Returns the cached RPC client for nodeName, connecting first if there is none.

		nodeName is expected in hostname:port form.  Returns None if the name cannot be parsed or the connection attempt fails.  Raises ErrClusterIDMismatch if the node reports a different ClusterID from the one this Client was created with.
		"""
		with self._lock:
			if nodeName in self._connections:
				return self._connections[nodeName]

			sock = None
			client = None
			try:
				# Split on the last colon so the port is always the final component.
				host, _, port = nodeName.rpartition(":")
				sock = socket.create_connection((host, int(port)))
				client = cbor_rpc.Client(sock)

				# Check cluster IDs match
				response, err = client.Call("RPCHandler.GetClusterDetails", {})
				if err == "" and response["Result"]["Code"] == 0:
					if self._clusterID != response["ClusterID"]:
						raise ErrClusterIDMismatch
				self._connections[nodeName] = client
				return client
			except Exception as e:
				# Release whatever was opened before the failure.
				# NOTE(review): assumes client.Close() also closes the socket it wraps - confirm against cbor_rpc.
				if client is not None:
					client.Close()
				elif sock is not None:
					sock.close()
				if e is ErrClusterIDMismatch:
					raise e
				return None

	def _removeBrokenConnection (self, client):
		"""
		Removes the given RPC client from the connection cache, if present, and closes it.
		"""
		with self._lock:
			for nodeName, nodeClient in list(self._connections.items()):
				if nodeClient == client:
					del self._connections[nodeName]
					nodeClient.Close()
					return




Module variables

var ErrClusterIDMismatch

ErrClusterIDMismatch is thrown if the nodes in the cluster have been configured with a different id to the one given when the Client is created.

var ErrNoNodesAvailable

ErrNoNodesAvailable is thrown when the Client is not able to fulfill the request against any of the nodes in the cluster.

Classes

class Client

The Client class provides a multi-thread safe connection to a Forest Bus cluster. This class will establish connections to the nodes in the cluster as required.

class Client:
	"""
	The Client class provides a multi-thread safe connection to a Forest Bus cluster.  This class will establish connections to the nodes in the cluster as required.

	Connections are cached per node name; the cache, the per-topic leader hints and the last used node are all guarded by a single lock.
	"""
	def __init__ (self, clusterID, nodes):
		"""
		Creates a new Client instance.  The clusterID will be checked against the -id that the Forest Bus nodes were started with.

		The nodes parameter is a list of strings in the format hostname:port and should match the -cbor parameter given when the nodes were started.
		"""
		self._clusterID = clusterID
		self._nodes = nodes

		# nodeName -> open RPC client
		self._connections = {}
		# topic -> nodeName last used to send to that topic
		self._topic_leaders = {}
		self._lock = threading.Lock()
		# nodeName last used for read style requests ("" when none yet)
		self._lastUsedGetNodeName = ""

	def GetMessages (self, topic, index, quantity, wait):
		"""
		GetMessages takes the topic (a string), the index (a 64 bit integer), target quantity (integer) and a wait flag (boolean).  If wait is True then the method will block until new messages become available.

		GetMessages returns a tuple of the received messages (a list of sequence of bytes) and the index of the next message in this topic sequence.

		GetMessages will usually return more or fewer messages than the quantity requested.  This ensures efficient message retrieval from the node as messages are aligned to offset and cache boundaries.  If any messages are available at the requested index then at least one message will be returned.

		If the index requested is no longer available on this node (i.e. clean-up has removed old data) then zero messages will be returned and the nextID will be the index of the first available message.

		If the messages returned bring the client up to the end of the available messages, the nextID will contain the index of what will become the next message when it has been sent.  By setting wait to True and passing in the index returned by nextID, GetMessages will block until at least one new message is available, before returning that message/messages.

		GetMessages throws Exceptions if no nodes are available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
		"""
		nextClient = self._rpcConnect()
		rpcClient = nextClient()
		if rpcClient is None:
			raise ErrNoNodesAvailable
		result, error = rpcClient.Call("RPCHandler.ReceiveMessages", {"ClusterID": self._clusterID, "Topic": topic, "ID": index, "Quantity": quantity, "WaitForMessages": wait})
		if error != "":
			# Drop the failed connection from the cache, as the other RPC
			# methods do, so the next request reconnects instead of reusing it.
			self._removeBrokenConnection(rpcClient)
			raise Exception(error)

		return (result["ReceivedMessages"], result["NextID"])

	def GetTopicMaxAvailableIndex (self, topic):
		"""
		GetTopicMaxAvailableIndex returns the maximum available index from the currently connected node for the given topic.

		If the cluster has been completely shutdown and restarted (rather than a rolling restart of individual nodes) then the commit index may be zero, in which case the maxAvailableIndex will be zero.  Once a message has been sent to the cluster in this topic the commit index will be recalculated and the maximum commit index will return as normal.

		GetTopicMaxAvailableIndex throws Exceptions if no nodes are available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
		"""
		nextClient = self._rpcConnect()
		rpcClient = nextClient()
		if rpcClient is None:
			raise ErrNoNodesAvailable
		result, error = rpcClient.Call("RPCHandler.GetTopicDetails", {"Topic": topic})
		if error != "":
			self._removeBrokenConnection(rpcClient)
			raise Exception(error)

		return result["CommitIndex"]

	def SendMessages (self, topic, messages, waitForCommit):
		"""
		SendMessages sends a batch of messages to the Forest Bus cluster.

		Messages are a list of sequences of bytes.  Sending many messages (hundreds) at once gives better through-put than sending individual messages.

		If waitForCommit is false then SendMessages will return as soon as the message has been saved on the leader node for this topic.  If waitForCommit is true then SendMessages will only return once the messages have been replicated to a majority of the nodes in the cluster and are therefore committed.

		Returns the "IDs" value from the node's reply.

		SendMessages throws Exceptions if no leader node is available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
		"""
		nextClient = self._rpcConnect(topic)
		while True:
			rpcClient = nextClient()
			if rpcClient is None:
				raise ErrNoNodesAvailable
			result, error = rpcClient.Call("RPCHandler.SendMessages", {"Topic": topic, "SentMessages": messages, "WaitForCommit": waitForCommit})
			if error != "":
				self._removeBrokenConnection(rpcClient)
				raise Exception(error)

			if result["Result"]["Code"] == 0:
				# Success - send it back
				return result["IDs"]
			# A non-zero result code means this node did not accept the
			# send, so loop round and try the next node from the factory.

	def Close (self):
		"""
		Close shuts down the Forest Bus Client and closes all connections to the underlying nodes.

		A closed Client can be reused - new node connections will be established as required.

		Every cached connection is closed even if one of them fails to close; the first such failure is re-raised once the lock has been released.
		"""
		firstError = None
		with self._lock:
			# Detach the cache first so the Client is left reusable whatever happens below.
			connections, self._connections = self._connections, {}
			for nodeName in connections:
				try:
					connections[nodeName].Close()
				except Exception as e:
					if firstError is None:
						firstError = e
		if firstError is not None:
			raise firstError


	def _rpcConnect (self, topic = None):
		"""
		Returns a factory function that yields one RPC client per call, working through the nodes in turn, and None once every node has been tried.

		The node tried first is the one last recorded for the topic (when a topic is given) or the last node used for a topic-less request.  Each node the factory successfully returns is recorded in the same way.  The factory re-raises ErrClusterIDMismatch and skips nodes that fail for any other reason.
		"""
		with self._lock:
			if topic is not None:
				preferred = self._topic_leaders.get(topic)
			elif self._lastUsedGetNodeName != "":
				preferred = self._lastUsedGetNodeName
			else:
				preferred = None
			# Work on a private copy so the swap below never reorders self._nodes
			ournodes = list(self._nodes)

		# Prioritise the preferred node; a name that is no longer in the node
		# list is ignored rather than raising while building the factory.
		if preferred in ournodes:
			position = ournodes.index(preferred)
			ournodes[0], ournodes[position] = ournodes[position], ournodes[0]

		# One element list so the closure can advance the position.
		lastTried = [0]

		def factory ():
			while lastTried[0] < len(ournodes):
				nodeName = ournodes[lastTried[0]]
				try:
					connection = self._getConnection(nodeName)
				except Exception as e:
					if e is ErrClusterIDMismatch:
						raise
					connection = None
				lastTried[0] += 1
				if connection is None:
					continue
				with self._lock:
					if topic is not None:
						self._topic_leaders[topic] = nodeName
					else:
						self._lastUsedGetNodeName = nodeName
				return connection

			return None
		return factory

	def _getConnection (self, nodeName):
		"""
		Returns the cached RPC client for nodeName, connecting first if there is none.

		nodeName is expected in hostname:port form.  Returns None if the name cannot be parsed or the connection attempt fails.  Raises ErrClusterIDMismatch if the node reports a different ClusterID from the one this Client was created with.
		"""
		with self._lock:
			if nodeName in self._connections:
				return self._connections[nodeName]

			sock = None
			client = None
			try:
				# Split on the last colon so the port is always the final component.
				host, _, port = nodeName.rpartition(":")
				sock = socket.create_connection((host, int(port)))
				client = cbor_rpc.Client(sock)

				# Check cluster IDs match
				response, err = client.Call("RPCHandler.GetClusterDetails", {})
				if err == "" and response["Result"]["Code"] == 0:
					if self._clusterID != response["ClusterID"]:
						raise ErrClusterIDMismatch
				self._connections[nodeName] = client
				return client
			except Exception as e:
				# Release whatever was opened before the failure.
				# NOTE(review): assumes client.Close() also closes the socket it wraps - confirm against cbor_rpc.
				if client is not None:
					client.Close()
				elif sock is not None:
					sock.close()
				if e is ErrClusterIDMismatch:
					raise e
				return None

	def _removeBrokenConnection (self, client):
		"""
		Removes the given RPC client from the connection cache, if present, and closes it.
		"""
		with self._lock:
			for nodeName, nodeClient in list(self._connections.items()):
				if nodeClient == client:
					del self._connections[nodeName]
					nodeClient.Close()
					return

Ancestors (in MRO)

Methods

def __init__(

self, clusterID, nodes)

Creates a new Client instance. The clusterID will be checked against the -id that the Forest Bus nodes were started with.

The nodes parameter is a list of strings in the format hostname:port and should match the -cbor parameter given when the nodes were started.

def __init__ (self, clusterID, nodes):
	"""
	Build a Client for the cluster identified by clusterID.  The clusterID is compared with the -id that the Forest Bus nodes were started with.
	nodes is a list of hostname:port strings and should match the -cbor parameter the nodes were started with.
	"""
	# What to connect to
	self._clusterID, self._nodes = clusterID, nodes
	# Bookkeeping shared between threads, guarded by the lock
	self._lock = threading.Lock()
	self._lastUsedGetNodeName = ""
	self._topic_leaders = dict()
	self._connections = dict()

def Close(

self)

Close shuts down the Forest Bus Client and closes all connections to the underlying nodes.

A closed Client can be reused - new node connections will be established as required.

def Close (self):
	"""
	Close shuts down the Forest Bus Client and closes all connections to the underlying nodes.
	A closed Client can be reused - new node connections will be established as required.
	Every cached connection is closed even if one of them fails to close; the first such failure is re-raised once the lock has been released.
	"""
	firstError = None
	with self._lock:
		# Detach the cache first so the Client is left reusable whatever happens below.
		connections, self._connections = self._connections, {}
		for nodeName in connections:
			try:
				connections[nodeName].Close()
			except Exception as e:
				if firstError is None:
					firstError = e
	if firstError is not None:
		raise firstError

def GetMessages(

self, topic, index, quantity, wait)

GetMessages takes the topic (a string), the index (a 64 bit integer), target quantity (integer) and a wait flag (boolean). If wait is True then the method will block until new messages become available.

GetMessages returns a tuple of the received messages (a list of sequence of bytes) and the index of the next message in this topic sequence.

GetMessages will usually return more or fewer messages than the quantity requested. This ensures efficient message retrieval from the node as messages are aligned to offset and cache boundaries. If any messages are available at the requested index then at least one message will be returned.

If the index requested is no longer available on this node (i.e. clean-up has removed old data) then zero messages will be returned and the nextID will be the index of the first available message.

If the messages returned bring the client up to the end of the available messages, the nextID will contain the index of what will become the next message when it has been sent. By setting wait to True and passing in the index returned by nextID, GetMessages will block until at least one new message is available, before returning that message/messages.

GetMessages throws Exceptions if no nodes are available, a network error occurs or the topic could not be found. Even if an exception has been thrown - Client.Close() must be called to close down all threads.

def GetMessages (self, topic, index, quantity, wait):
	"""
	GetMessages takes the topic (a string), the index (a 64 bit integer), target quantity (integer) and a wait flag (boolean).  If wait is True then the method will block until new messages become available.
	GetMessages returns a tuple of the received messages (a list of sequence of bytes) and the index of the next message in this topic sequence.
	GetMessages will usually return more or fewer messages than the quantity requested.  This ensures efficient message retrieval from the node as messages are aligned to offset and cache boundaries.  If any messages are available at the requested index then at least one message will be returned.
	If the index requested is no longer available on this node (i.e. clean-up has removed old data) then zero messages will be returned and the nextID will be the index of the first available message.
	If the messages returned bring the client up to the end of the available messages, the nextID will contain the index of what will become the next message when it has been sent.  By setting wait to True and passing in the index returned by nextID, GetMessages will block until at least one new message is available, before returning that message/messages.
	GetMessages throws Exceptions if no nodes are available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
	"""
	nextClient = self._rpcConnect()
	rpcClient = nextClient()
	if rpcClient is None:
		raise ErrNoNodesAvailable
	result, error = rpcClient.Call("RPCHandler.ReceiveMessages", {"ClusterID": self._clusterID, "Topic": topic, "ID": index, "Quantity": quantity, "WaitForMessages": wait})
	if error != "":
		# Drop the failed connection from the cache, as the other RPC
		# methods do, so the next request reconnects instead of reusing it.
		self._removeBrokenConnection(rpcClient)
		raise Exception(error)
	return (result["ReceivedMessages"], result["NextID"])

def GetTopicMaxAvailableIndex(

self, topic)

GetTopicMaxAvailableIndex returns the maximum available index from the currently connected node for the given topic.

If the cluster has been completely shutdown and restarted (rather than a rolling restart of individual nodes) then the commit index may be zero, in which case the maxAvailableIndex will be zero. Once a message has been sent to the cluster in this topic the commit index will be recalculated and the maximum commit index will return as normal.

GetTopicMaxAvailableIndex throws Exceptions if no nodes are available, a network error occurs or the topic could not be found. Even if an exception has been thrown - Client.Close() must be called to close down all threads.

def GetTopicMaxAvailableIndex (self, topic):
	"""
	Return the maximum available index for the given topic, as reported by the currently connected node.
	After a complete cluster shutdown and restart (as opposed to a rolling restart of individual nodes) the commit index may be zero, and so will the value returned here.  Once a message has been sent to the topic the commit index is recalculated and the maximum commit index is returned as normal.
	Throws Exceptions if no nodes are available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
	"""
	nextClient = self._rpcConnect()
	rpcClient = nextClient()
	if rpcClient is None:
		raise ErrNoNodesAvailable
	details, failure = rpcClient.Call("RPCHandler.GetTopicDetails", {"Topic": topic})
	if failure == "":
		return details["CommitIndex"]
	# The request failed: forget this connection before reporting the error.
	self._removeBrokenConnection (rpcClient)
	raise Exception (failure)

def SendMessages(

self, topic, messages, waitForCommit)

SendMessages sends a batch of messages to the Forest Bus cluster.

Messages are a list of sequences of bytes. Sending many messages (hundreds) at once gives better through-put than sending individual messages.

If waitForCommit is false then SendMessages will return as soon as the message has been saved on the leader node for this topic. If waitForCommit is true then SendMessages will only return once the messages have been replicated to a majority of the nodes in the cluster and are therefore committed.

SendMessages throws Exceptions if no leader node is available, a network error occurs or the topic could not be found. Even if an exception has been thrown - Client.Close() must be called to close down all threads.

def SendMessages (self, topic, messages, waitForCommit):
	"""
	Send a batch of messages to the Forest Bus cluster.
	messages is a list of sequences of bytes.  Sending many messages (hundreds) at once gives better through-put than sending individual messages.
	With waitForCommit false the call returns as soon as the message has been saved on the leader node for this topic.  With waitForCommit true it only returns once the messages have been replicated to a majority of the nodes in the cluster and are therefore committed.
	Throws Exceptions if no leader node is available, a network error occurs or the topic could not be found.  Even if an exception has been thrown - Client.Close() must be called to close down all threads.
	"""
	nextClient = self._rpcConnect(topic)
	rpcClient = nextClient()
	# Work through the nodes until one reports success or none are left.
	while rpcClient is not None:
		reply, failure = rpcClient.Call("RPCHandler.SendMessages", {"Topic": topic, "SentMessages": messages, "WaitForCommit": waitForCommit})
		if failure != "":
			self._removeBrokenConnection (rpcClient)
			raise Exception (failure)
		if result_ok(reply):
			# Success - send it back
			return reply["IDs"]
		rpcClient = nextClient()
	raise ErrNoNodesAvailable

def result_ok (reply):
	# True when the node's reply carries the success result code.
	return reply["Result"]["Code"] == 0