Join the MaximusDevs community and get the full member experience.
Join For Free
Last time we talked about implementation of functional queues with Groovy++ Today we will use these queues to implement several algorithms for processing of asynchronious messages. You can find source code and more examples in the Groovy++ distro
What we want to do is to implement simplified actor, the object which sequentially process asynchroniously coming messages. There are two types of actors
- thread bound actor, which is the one having dedicated message processing thread. The thread is blocked if no messages are available
- pooled actor, which is executed on some thread pool. The beauty of pooled actor is that it does not consume any resources at all if there is no messages to process
We will try to use the same approach based on our functional queues to implement both.
Let us start with interface definition for message channel
@Trait abstract class MessageChannel{ abstract MessageChannel post (T message) MessageChannel leftShift (T msg) { post msg } }
@Trait annotation is Groovy++ way to define interface with default implementationof some methods. Each class implementing such interface will inherit default implementation if the method is not implemented by the class or superclass
We use Groovy++ trait here not because it is necessary for our sample but because the sample is real code from Groovy++ runtime.
Note that message channel has nothing to do with concurrency – we can implement method post in whatever way we like. Just for fun we can do following Multiplexor class, which immidiately redistribute all incoming messages to all subscribed channels.
class Multiplexorimplements MessageChannel { private volatile FList > listeners = FList.emptyList Multiplexor subscribe(MessageChannel channel) { for (;;) { def l = listeners if (listeners.compareAndSet(l, l + channel)) return this } } Multiplexor subscribe(MessageChannel ... channels) { for (c in channels) { subscribe(c) } this } Multiplexor unsubscribe(MessageChannel channel) { for (;;) { def l = listeners if (listeners.compareAndSet(l, l - channel)) return this } } final Multiplexor post(M message) { listeners.each { channel -> channel << message } this } static Multiplexor of (MessageChannel ... channels) { new Multiplexor().subscribe(channels) } }
You may notice that it was almost trivial to allow subscribers to subscribe and unsubscribe asynchroniously using our functional lists
OK, back to our main story. Let us implement channel with asynchronious queue, which process no more than one message at any given moment.
Our idea is following:
- we use functional queue to add messages
- when we add message to the queue we signal subclass (whatever it means for subclassing algorithm)
- we introduce special state of the queue to be used by subclasses, which means that the queue is already empty but last message is not processed yet. This is probably most non-trivial part of our algorithms
Here is the implementation
abstract class QueuedChannelimplements MessageChannel { protected volatile FQueue queue = FQueue.emptyQueue protected static final FQueue busyEmptyQueue = FQueue.emptyQueue + null MessageChannel post(M message) { for (;;) { def oldQueue = queue def newQueue = (oldQueue === busyEmptyQueue ? FQueue.emptyQueue : oldQueue) + message if (queue.compareAndSet(oldQueue, newQueue)) { signalPost(oldQueue, newQueue) return this } } } protected abstract void signalPost (FQueue oldQueue, FQueue newQueue) abstract void onMessage(M message) }
Now we are ready to create our first real actor backed by Executor and scheduled for execution for each message. We call it “fair” because it does not try to take as much resources as possible but give chance to work for all it’s collegues.
Here is explaination of the algorithm
- our channel implements Runnable. That might be not perfect from OOP prospective bus save us additional object creation
- when message added to empty queue we schedule actor for execution
- if after processing of a message our queue still non-empty we schedule again
- special care taken for the case when we process last message in the queue – we have to make sure that while we are not done new messages will not schedule new execution of the actor
abstract static class FairExecutingChannelextends QueuedChannel implements Runnable { Executor executor void run () { for (;;) { def q = queue def removed = q.removeFirst() if (q.size() == 1) { if (queue.compareAndSet(q, busyEmptyQueue)) { onMessage removed.first if (!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) { executor.execute this } break } } else { if (queue.compareAndSet(q, removed.second)) { onMessage removed.first executor.execute this break } } } } protected void signalPost(FQueue oldQueue, FQueue newQueue) { if (oldQueue !== busyEmptyQueue && newQueue.size() == 1) executor.execute this } }
Fair algorithm above has one downside – if processing of messages is really fast we waste a lot of cycles by being executed for each and every message. That leads us to the idea of “non-fair” algorithm, which process all available messages when Runnable executed. For amounts of small messages it runs 2-3 times faster.
Here is the implementation, which is even simplier
@Typed abstract class NonfairExecutingChannelextends FairExecutingChannel { void run () { for (;;) { def q = queue if (queue.compareAndSet(q, busyEmptyQueue)) { for(m in q) { if (m) onMessage m } if(!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) { executor.execute this } break } } } }
Intersting to notice that we can develop some variations of algorithms above. For example we can process as many messages as we can in given timeframe (let say 250ms) or given number of messages in a run. Functional data structures gives us a lot of flexibility.
To have the picture complete we should also implement thread backed variation of our approach. We leave it as exercise for reader
Thank you for reading and hope it was interesting. Till next time.
Algorithm Message passing
Opinions expressed by MaximusDevs contributors are their own.
Trending
-
Real-Time Made Easy: An Introduction to SignalR
-
TDD vs. BDD: Choosing The Suitable Framework
-
How to LINQ Between Java and SQL With JPAStreamer
-
What Is Istio Service Mesh?