JAM - examples
Despite its internal complexity and the many possibilities it offers, the JAM framework is in most cases as quick and easy to use as calling a regular method. It is designed for convenience and ease of use, with performance, power and complexity kept under the hood. JAM can mediate defined announcements as well as any arbitrary object from any place in your system to any other place, allowing one part of the system to react directly to events that happen in another part, with full flexibility and high performance. In many cases JAM can also speed up your system.
Note: the mediation from a publisher to a subscriber will, in most cases, bypass a traditional n-tier architecture, and it does so on purpose. This does not mean that the information flow through JAM is unstructured or less orchestrated. It is the opposite: with JAM you can use complex topics with endless possibilities to orchestrate your information flows, and topics provide far more flexibility and ease than tiers.
Let's first look at a few very simple examples of how to use JAM in practice, starting with how to publish simple things. JAM comes with several APIs in different flavors (and we can extend JAM with our own API if we like). On the publisher's side we can use, for example:
- Publish (static methods, single threaded), best for simple cases
- PublishParallel (static methods, multithreaded), best for simple concurrent cases
- Composition (fluent API, single threaded), best for complex cases
- ParallelComposition (fluent API, multithreaded), best for complex concurrent cases
- MessageQuery, API to retrieve complex messages for individual processing
- Specification, complementary API to specify a required compliance and demanded objects
- Ref, API for direct value change propagation, including value change vetos
- Distribute, specialized API to pull or push data as chunks and on demand
- DistributeParallel, specialized API to pull or push data concurrently as chunks and on demand
- etc.
We can publish a simple text like this:
Publish.text("Hello world!");
We can add a severity (aka log level) like INFO:
Publish.info("Hello world!");
Which is the same as:
Publish.text("Hello world!", Severity.INFO);
In all three examples above the topic is chosen automatically. For the first one it is RootTopic.TEXT; for the second and third it is RootTopic.INFO_TEXT. To choose a topic manually we have to use a fluent API (Composition, ParallelComposition), which we discuss later on.
Note: JAM provides more topic classes with automatic topic detection, including dynamic and generic topics.
If we want JAM to mediate concurrently, we simply replace Publish with PublishParallel:
PublishParallel.text("Hello world!");
It is important to know that the topic chosen for a concurrent process is never the same as the topic chosen for a single-threaded process, even if the topic name is the same!
Note: depending on our use case and architecture we might have to configure the concurrency in JAM to make it work as we expect. That is too complex to discuss at this stage.
Before proceeding to complex messages and more advanced features, let's see how we can receive our simple text announcements on the other side, the side of the subscribers.
By default, JAM is already initialized with a few subscriptions, so some basic functionality works out of the box. For our examples above the only relevant subscriber is the AnnouncementPrinter. The AnnouncementPrinter simply writes all announcements it receives to the "standard" output stream (usually the console). It accepts the following announcement types:
- TextOffer (asynchronous announcement of simple text)
- MessageOffer (asynchronous announcement of complex messages)
- ThrowableOffer (asynchronous announcement of Throwable objects)
This means we should already see the output of our simple text announcements in the console once they have been sent.
To also pass the output through a logger (e.g. to write it in the logger's format to a log file or to the console again), there is a ready-to-use subscriber, AnnouncementLogger, which works the same way as the AnnouncementPrinter but is not subscribed by default. The AnnouncementLogger uses java.util.logging.Logger.
Note: we can easily employ other logging frameworks. JAM already comes with extra packages for popular logging APIs such as Log4j and SLF4J.
To subscribe the AnnouncementLogger we must first decide for which topic(s) we want the logging. To keep it simple for now, we subscribe like this:
JAM.getMediator(RootTopic.ANY).subscribe(new AnnouncementLogger());
Or for concurrently processed announcements:
JAM.getConcurrentMediator(RootTopic.ANY).subscribe(new AnnouncementLogger());
Now, all announcements of a suitable announcement type (here TextOffer, MessageOffer, ThrowableOffer) will be sent to the logger.
Note: we might need to configure the logging framework first for this to actually work.
For a real-world application we need our own subscriber implementations. Even that is very simple to do:
JAM.getMediator(RootTopic.ANY).subscribe(new TextConsumer() {
    @Override
    public boolean acceptsSubtopics() {
        // also receive announcements sent to subtopics of ANY
        return true;
    }
    @Override
    public void accept(Topic topic, TextOffer offer) {
        System.out.println("Consumed: " + offer.getContent());
    }
});
Just for this example we created a new TextConsumer as an anonymous class and subscribed it in one step. Consumers consume Offers. Offers are asynchronous announcements, meaning nothing is returned synchronously. Accordingly, Consumers accept Offers and return nothing.
Note: synchronous announcements are represented by Demands, which are consumed by Suppliers. Suppliers return the objects (or null) demanded by a Demand synchronously.
We return true from acceptsSubtopics() to make sure that the reception of announcements is not restricted to the topic the subscriber is subscribed to (here ANY). Remember that our text announcements are sent automatically to the topics TEXT and INFO_TEXT. Returning true allows the subscriber to receive announcements from these subordinate topics of ANY.
Fortunately, JAM always passes announcements to subscribers with their correct concrete type, so we can access all their properties in a type-safe way without any instanceof or cast! For this example we just print the content (the String) to the console.
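To make the effect of acceptsSubtopics() more tangible, here is a minimal, self-contained sketch of hierarchical topic matching. It is not JAM's implementation: the classes TopicSketch, Topic and Subscription are hypothetical stand-ins, and the rule that a subscriber on a parent topic only sees subtopic announcements when it accepts subtopics is simply taken from the description above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of hierarchical topics; not part of JAM. */
final class TopicSketch {

    /** A topic with an optional parent, e.g. TEXT below ANY. */
    static final class Topic {
        final String name;
        final Topic parent;

        Topic(String name, Topic parent) {
            this.name = name;
            this.parent = parent;
        }
    }

    /** A subscriber bound to one topic; records every text it receives. */
    static final class Subscription {
        final Topic topic;
        final boolean acceptsSubtopics;
        final List<String> received = new ArrayList<>();

        Subscription(Topic topic, boolean acceptsSubtopics) {
            this.topic = topic;
            this.acceptsSubtopics = acceptsSubtopics;
        }
    }

    /** Delivers to subscriptions on the target topic, or on an ancestor if they accept subtopics. */
    static void publish(Topic target, String text, List<Subscription> subscriptions) {
        for (Subscription subscription : subscriptions) {
            boolean sameTopic = subscription.topic == target;
            boolean viaSubtopic = subscription.acceptsSubtopics
                    && isAncestor(subscription.topic, target);
            if (sameTopic || viaSubtopic) {
                subscription.received.add(text);
            }
        }
    }

    /** Walks up the parent chain of the given topic looking for the candidate. */
    static boolean isAncestor(Topic candidate, Topic topic) {
        for (Topic current = topic.parent; current != null; current = current.parent) {
            if (current == candidate) {
                return true;
            }
        }
        return false;
    }
}
```

With a topic TEXT whose parent is ANY, a subscription on ANY created with acceptsSubtopics = true receives a text published to TEXT, while one created with false only receives texts published to ANY itself.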
Well, simple texts are probably not the reason to employ JAM. Let's continue with complex messages now, and later with more complex and generic features.
We can publish a complex message with severity INFO like this:
Publish.infoMessage("message_hello", Publish.createParameter("param_world", "world"));
Complex messages are backed by some "backend" source, e.g. properties in resource bundles, databases, etc. Here "message_hello" is the identifier of the backing message:
message_hello=Hello {0}!
and "param_world" is the identifier of a backing parameter, while "world" is a fallback String for the case that the parameter cannot be resolved or is null.
The automatically chosen topic here is RootTopic.INFO_MESSAGE.
Note: message parameters can be nested (parameters with parameters)! Nested parameters enable us to build really complex message structures without too many redundancies. Think of an engine that generates documents.
To receive messages on the subscriber side we have to subscribe a MessageConsumer in the same manner as we did with the TextConsumer before.
Moreover, JAM supports boolean expressions as identifiers for messages or parameters in the backing resource, which are matched against a given value! Imagine we submit the value "100" as the message identifier ...
Publish.infoMessage(new LongMessageId(100));
...which is automatically matched to the message from the backing resource...
>99&<101=hundred
or maybe something like
99|100|101=hundred
...and the message actually published is the text "hundred".
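The matching of a value against such key expressions can be sketched as follows. This is an illustration only: the grammar (alternatives separated by "|", conditions joined by "&", each condition an optional ">" or "<" followed by a number) is an assumption inferred from the two example keys above, and KeyExpressionSketch is a hypothetical class, not JAM's matcher.

```java
/** Hypothetical sketch: evaluates keys such as ">99&<101" or "99|100|101" against a value. */
final class KeyExpressionSketch {

    /** Returns true if at least one "|" alternative matches, i.e. all of its "&" terms hold. */
    static boolean matches(String expression, long value) {
        for (String alternative : expression.split("\\|")) {
            boolean allTermsHold = true;
            for (String term : alternative.split("&")) {
                if (!termMatches(term.trim(), value)) {
                    allTermsHold = false;
                    break;
                }
            }
            if (allTermsHold) {
                return true;
            }
        }
        return false;
    }

    /** A term is ">n", "<n" or a plain number n (equality). */
    private static boolean termMatches(String term, long value) {
        if (term.startsWith(">")) {
            return value > Long.parseLong(term.substring(1).trim());
        }
        if (term.startsWith("<")) {
            return value < Long.parseLong(term.substring(1).trim());
        }
        return value == Long.parseLong(term);
    }
}
```

Under these assumptions, KeyExpressionSketch.matches(">99&<101", 100) and KeyExpressionSketch.matches("99|100|101", 100) both hold, so either key would select the message "hundred" for the identifier 100.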
Conversely, we can also use an object that represents a value range as the identifier, e.g. we submit...
Publish.infoMessage(new LongRangeMessageId(2L, 8L));
...which is automatically matched to the messages from the backing resource...
5=Hello
6= world!
...and the message actually published is the concatenated text "Hello world!".
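A range lookup of this kind can be pictured with a sorted map. The sketch below is hypothetical (RangeLookupSketch is not a JAM class) and assumes that both range bounds are inclusive and that matching messages are concatenated in ascending key order; the text above does not state either detail.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch: concatenates all messages whose numeric key lies within a range. */
final class RangeLookupSketch {

    /** Joins the values for keys in [from, to] in ascending key order (bounds assumed inclusive). */
    static String concatenate(NavigableMap<Long, String> messages, long from, long to) {
        StringBuilder text = new StringBuilder();
        for (String part : messages.subMap(from, true, to, true).values()) {
            text.append(part);
        }
        return text.toString();
    }

    /** Builds the two example entries from the text above. */
    static NavigableMap<Long, String> exampleMessages() {
        NavigableMap<Long, String> messages = new TreeMap<>();
        messages.put(5L, "Hello");
        messages.put(6L, " world!");
        return messages;
    }
}
```

One thing to check with a real properties file: java.util.Properties skips whitespace directly after the separator when loading, so a value with a leading space would have to be written as "6=\ world!" if the backing source is loaded that way.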
Maybe we have a use case that requires even more power and flexibility: to process a message individually, we want to retrieve it first and then process it before we publish it or do anything else with it. JAM lets us retrieve messages and values of any type easily, for instance like this:
If we have a property in a resource bundle with a numeric value...
hundred=100
...we retrieve it with a message query as numeric type (here a Long)...
Queue<Long> queue = new MessageQuery<Long>(new Parser<Long>() {
        @Override
        public Long parse(String string) {
            return Long.parseLong(string);
        }
    })
    .get("hundred");
...and we get back a Queue of Long, because JAM retrieves the messages from subscribers which query the backing sources, and hence JAM supports retrieving messages from several subscribers and several sources at once. If we have just one source, we can simply poll once:
Long value = queue.poll().longValue();
Since we can retrieve any data type, this approach can also be used beyond the classical purpose of messages, for instance to implement the parameterization of a system. In that case we might want to retrieve a list of values and not just a single value...
hundred=99|100|101
...which is just as easy with JAM...
Queue<List<Long>> queue = new MessageQuery<Long>(new Parser<Long>() {
        @Override
        public Long parse(String string) {
            return Long.parseLong(string);
        }
    })
    .list("hundred");
...we just get back List(s) inside the Queue:
List<Long> list = queue.poll();
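What happens conceptually between the raw property value and the typed list can be sketched in a few lines. The helper below is hypothetical and not taken from JAM; it assumes that list elements are separated by "|" as in the example property, and it uses java.util.function.Function in place of JAM's Parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: turns a raw value such as "99|100|101" into a typed list. */
final class ListValueSketch {

    /** Splits the raw value at "|" and applies the parser to every trimmed element. */
    static <T> List<T> parseList(String raw, Function<String, T> parser) {
        List<T> values = new ArrayList<>();
        for (String token : raw.split("\\|")) {
            values.add(parser.apply(token.trim()));
        }
        return values;
    }
}
```

For example, ListValueSketch.<Long>parseList("99|100|101", Long::parseLong) yields a list with the three Long values 99, 100 and 101.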
Complex messages are probably not the (only) reason to employ JAM either. Let's first look at creating our own announcement types, and then at generic ways to announce arbitrary objects instantly without a specialized announcement type.
An announcement must be implemented along with its counterpart: the respective subscriber interface. Both are tightly coupled. We implement a consumer and an announcement type to announce objects of the type java.lang.Long:
public interface LongConsumer extends Consumer {
    void accept(Topic topic, LongOffer offer) throws Throwable;
}
public class LongOffer extends OfferBase<Long, LongConsumer> {
    private static final long serialVersionUID = -8728103319320242516L;
    public LongOffer(Long content) {
        super(content);
    }
    @Override
    public void offerTo(Topic topic, LongConsumer consumer) throws Throwable {
        consumer.accept(topic, this);
    }
    @Override
    public Class<LongConsumer> getSubscriberType() {
        return LongConsumer.class;
    }
}
The base Consumer interface does not define any operation for receiving announcements, so we are free to define our own operation(s) (usually named "accept").
In the constructor of the LongOffer we can just pass the main content (here a Long) to the superclass, which manages the content for some purposes. Besides that main content, we can add as many additional (but unmanaged) properties as we like for our own purposes.
The offerTo method uses our self-defined operation ("accept") on the LongConsumer interface, passing itself to the LongConsumer.
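The pattern behind offerTo is a form of double dispatch: the mediator only selects subscribers by type, and the announcement itself makes the typed call. The following self-contained sketch illustrates the idea with simplified, hypothetical types (Subscriber, Offer, LongSubscriber, LongValueOffer); it leaves out topics and exceptions and is not JAM's actual code.

```java
import java.util.List;

/** Hypothetical sketch of the offer/consumer double dispatch; not part of JAM. */
final class DoubleDispatchSketch {

    /** Marker for all subscribers (stand-in for a base consumer type). */
    interface Subscriber { }

    /** An announcement knows its subscriber type and how to hand itself over. */
    interface Offer<S extends Subscriber> {
        Class<S> subscriberType();
        void offerTo(S subscriber);
    }

    /** Subscriber interface with a self-defined, fully typed operation. */
    interface LongSubscriber extends Subscriber {
        void accept(LongValueOffer offer);
    }

    /** Announcement carrying a Long; passes itself to the typed operation. */
    static final class LongValueOffer implements Offer<LongSubscriber> {
        final Long content;

        LongValueOffer(Long content) {
            this.content = content;
        }

        @Override
        public Class<LongSubscriber> subscriberType() {
            return LongSubscriber.class;
        }

        @Override
        public void offerTo(LongSubscriber subscriber) {
            subscriber.accept(this);
        }
    }

    /** Filters by subscriber type once; returns the number of deliveries. */
    static <S extends Subscriber> int mediate(Offer<S> offer, List<Subscriber> subscribers) {
        int delivered = 0;
        for (Subscriber candidate : subscribers) {
            if (offer.subscriberType().isInstance(candidate)) {
                offer.offerTo(offer.subscriberType().cast(candidate));
                delivered++;
            }
        }
        return delivered;
    }
}
```

The only type check sits in one central place (mediate); the subscriber's accept method receives a LongValueOffer directly and never needs instanceof or a cast of its own.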
To receive these new LongOffers we implement the new LongConsumer interface in any class we like and subscribe that LongConsumer in the same manner as the TextConsumer before. Then we can fire our first LongOffer:
Publish.offer(new LongOffer(Long.valueOf(100L)));
Of course, in a real-world scenario we would not just send a Long or the like, because it does not provide any context information to the receiver. We would rather implement some kind of specialized bean or business object. But since we can implement our own announcement types, we can also use them as specialized beans by adding properties that provide the context information.
If we already have a specialized object type that makes sense for receivers, we might want to announce it directly without any added boilerplate code. Moreover, we might have individual announcements that are published only rarely, so the effort of implementing a dedicated announcement type feels disproportionate. Fortunately, JAM lets us announce any arbitrary object in a generic manner:
Publish.offerThat(Long.valueOf(100L));
We just used Long again to keep this example simple. To receive that Long object we again need, as you may have guessed, a Consumer. This time there is a small but important change: we do not implement our own new Consumer interface, but the GenericConsumer interface, which additionally requires us to implement the operation getContentType().
public class OurLongProcessor implements GenericConsumer<Long, OurLongProcessor> {
    @Override
    public Class<Long> getContentType() {
        return Long.class;
    }
    @Override
    public boolean acceptsSubtopics() {
        return true;
    }
    @Override
    public void accept(Topic topic, GenericOffer<Long, OurLongProcessor> offer) {
        // "long" is a reserved word in Java, so the variable is named "value"
        Long value = offer.getContent();
        // we process "value" here
    }
}
We implemented the GenericConsumer interface, providing as generic parameters the type of object we receive and the type of our own implementation. The method getContentType() returns the type of object we receive, because we do not receive a specialized announcement type and with the content type we tell JAM which type of object we want to receive.
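Why the content type is needed becomes clearer when we picture how a mediator could route an arbitrary object at runtime. The sketch below is a deliberately naive, hypothetical illustration (GenericDispatchSketch and TypedConsumer are not JAM types): every consumer declares its content type, and each published object is checked against that type before it is handed over.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of runtime routing by content type; not part of JAM. */
final class GenericDispatchSketch {

    /** A consumer that announces which type of object it wants to receive. */
    interface TypedConsumer<T> {
        Class<T> getContentType();
        void accept(T content);
    }

    private final List<TypedConsumer<?>> consumers = new ArrayList<>();

    void subscribe(TypedConsumer<?> consumer) {
        consumers.add(consumer);
    }

    /** Offers the object to every consumer whose content type matches; returns the delivery count. */
    int offerThat(Object content) {
        int delivered = 0;
        for (TypedConsumer<?> consumer : consumers) {
            // the runtime type check is the extra cost of the generic route
            if (consumer.getContentType().isInstance(content)) {
                deliver(consumer, content);
                delivered++;
            }
        }
        return delivered;
    }

    /** Captures the wildcard so that the checked cast and the call share one type T. */
    private static <T> void deliver(TypedConsumer<T> consumer, Object content) {
        consumer.accept(consumer.getContentType().cast(content));
    }
}
```

A consumer registered for Long.class receives Long.valueOf(100L) but is skipped for a String. The type check happens for every published object, which is one plausible source of the runtime overhead of the generic route in a design like this.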
As you can see, we can publish any arbitrary object as conveniently as a simple text, and just as type-safely. So why should we implement our own announcements at all? The generic way of publishing objects comes with a loss of performance. It is only a small loss, but if we publish a huge number of objects that way, it can matter depending on our use case and overall architecture. In most cases that loss can be mitigated by publishing concurrently:
PublishParallel.offerThat(Long.valueOf(100L));
Therefore it is recommended to implement our own announcement types for content types that we publish often at runtime, unless we are sure it makes no difference, and to use the generic way of publishing for content types that are fired only rarely.
As an aside: JAM tries to provide type safety all the way and the best possible performance for its complex functionality. It therefore avoids using reflection internally where it might impact performance and instead makes extensive use of Java generics (kept under the hood, so this won't bother us), shifting performance costs from runtime to compile time. For the generic way of publishing shown above that is no longer fully possible, because it requires a sophisticated algorithm for the automatic in-depth detection of (nested) generic parameters at runtime. Hence it loses performance in comparison to defined announcements.
We have seen many basic examples of how to publish with JAM by announcing texts, messages and arbitrary objects asynchronously. JAM supports synchronous information flows as well, which means we get an immediate response, just as from invoking any other operation with a (non-void) return type. With such an invocation we demand data rather than offer it (even if we also offer some data needed to process the demand). Therefore a synchronous announcement is called a Demand, and a subscriber returning the demanded data (or null) is called a Supplier.
In the publisher APIs of JAM, such as Publish, PublishParallel, Composition and ParallelComposition, we find many operations to demand data besides the operations to offer data. A typical example of a Demand is to demand an instance of a given type:
Long value = Publish.demandInstance(Long.class).poll();
More than one Supplier might return an instance, so we get back a queue and simply poll it. To receive the InstanceDemand we subscribe a Supplier implementing the InstanceSupplier interface, just as we did with the Consumers, except that this implementation can return an object (here a Long). Again, in a real-world scenario we would demand a more specific data type and not just a Long. But even with a more specific data type the demand might still be too unspecific in terms of how a Supplier shall satisfy it, e.g. which specific data the returned object must contain. To let us specify exactly what data we demand from Suppliers, JAM provides a specification API that can be used together with many publishing operations, for demands and offers.
Long value = Publish.demandInstance(Long.class, new NotNullSpecification<Long>(), true).poll();
Here we require a non-null object in return. Even if some Suppliers return null, at least one Supplier must return a valid object as specified; otherwise JAM will throw a SpecificationUnsatisfiedException. In addition to "not null" we can create Specifications that validate domain-specific and use-case-specific data. To do so we usually extend SpecificationDecoratorBase or SpecificationChainBase, in order to decorate and/or chain specifications with each other.
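The interplay of a demand, several suppliers and a specification can be sketched with plain JDK types. Everything in this block is a hypothetical illustration: DemandSketch and UnsatisfiedException are stand-ins, java.util.function.Supplier and Predicate replace JAM's Supplier and Specification types, and the rule "fail if no supplier delivers a valid object" follows the description above.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch of a specified demand; not part of JAM. */
final class DemandSketch {

    /** Stand-in for an exception signalling that no supplier satisfied the specification. */
    static final class UnsatisfiedException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        UnsatisfiedException(String message) {
            super(message);
        }
    }

    /**
     * Asks every supplier and collects the results that satisfy the specification.
     * Null results are always dropped in this sketch.
     */
    static <T> Queue<T> demand(List<Supplier<T>> suppliers, Predicate<T> specification) {
        Queue<T> results = new ArrayDeque<>();
        for (Supplier<T> supplier : suppliers) {
            T candidate = supplier.get();
            if (candidate != null && specification.test(candidate)) {
                results.add(candidate);
            }
        }
        if (results.isEmpty()) {
            throw new UnsatisfiedException("no supplier satisfied the specification");
        }
        return results;
    }
}
```

With one supplier returning null and another returning 100L, polling the returned queue gives 100. If all suppliers return null, or none of the results passes the predicate, the sketch throws instead of returning an empty queue.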
Announcements and Subscribers that support Specifications are segregated from those without Specifications. In our example we receive a SpecifiedInstanceDemand with an implementation of the InstanceSpecificationSupplier interface. SpecificationSuppliers can use a specified demand to test objects against the specification, e.g. to filter a collection, before returning them.
When it comes to using Specifications for Offers, we of course do not specify objects to return. Instead we specify a compliance that implements the interface Compliance, e.g. ComplianceState. Imagine for instance that the Consumer which receives a specified offer is a batch process implementation. Then we probably want to assert that the batch process has completed without failures, with some tolerated failures, only partially, and so on.
Using the static methods of Publish or PublishParallel is easy for simple cases, but for more complex tasks that kind of API may feel clumsy. Therefore JAM provides fluent APIs, which are more or less self-explanatory and offer far more flexibility, including possibilities beyond Publish or PublishParallel, but feel inflated for very simple cases. We invoke the fluent API like this (using Long again)...
Publish.asFollows(). ...
or
Publish.as(Long.class). ...
or
PublishParallel.asFollows(). ...
or
PublishParallel.as(Long.class). ...
To give some fancy examples for the fluent API:
Publish.asFollows()
    .on(RootTopic.ANY)
    .atInfoLevel()
    .theMessage(new LongMessageId(1000), "One thousand!")
    .messagingNow();
This immediately publishes the message with ID 1000 as INFO under ANY. We can pass different topic types to the on(Topic) operation, including our own implementations of the Topic interface. JAM provides several types out of the box, e.g.
- RootTopic (root hierarchy of (all) topics)
- NamedTopic (creates topics from names)
- TypeTopic (creates topics from types)
- GenericTopic (creates topics from any objects)
- WeakGenericTopic (weakly referenced topics)
- NoTopic (fallback if no topic is provided)
Long value = PublishParallel.as(Long.class)
    .theGivenType()
    .demandingInstanceNow()
    .poll();
This immediately returns an instance of type Long.
PublishParallel.asFollows()
    .within(2, TimeUnit.SECONDS)
    .that(Long.valueOf(100L))
    .offeringThatTimed();
This offers a Long, but requires all processes to finish within 2 seconds (timeout).
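What such a timeout means for concurrently running receivers can be illustrated with the JDK's own ExecutorService. The sketch below is not how JAM implements within(...); it is a hypothetical stand-in (TimedOfferSketch) that runs all tasks in parallel, waits at most for the given time and cancels whatever has not finished by then.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a timed, concurrent hand-over; not part of JAM. */
final class TimedOfferSketch {

    /** Runs all tasks in parallel and returns the results of those finished within the timeout. */
    static <T> List<T> runWithin(List<Callable<T>> tasks, long timeout, TimeUnit unit) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        List<T> completed = new ArrayList<>();
        try {
            // invokeAll returns once all tasks are done or the timeout expires;
            // tasks still running at that point are cancelled
            for (Future<T> future : pool.invokeAll(tasks, timeout, unit)) {
                if (future.isCancelled()) {
                    continue; // timed out
                }
                try {
                    completed.add(future.get());
                } catch (ExecutionException e) {
                    // the task itself failed; it is simply skipped in this sketch
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            pool.shutdownNow();
        }
        return completed;
    }
}
```

Given one task that returns immediately and one that sleeps for several seconds, a call with a timeout of 500 milliseconds returns only the result of the fast task; the slow one is interrupted and dropped.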
Object object = PublishParallel.asFollows()
    .on(TypeTopic.createTopic(Long.class))
    .waitingTilCompletion()
    .after(2).elapsed(TimeUnit.SECONDS)
    .theObject(Long.valueOf(100L))
    .demandingObjectsTimed()
    .get().poll();
This waits 2 seconds, creates a TypeTopic with the type Long for this demand, and then returns an object as specified (Long value=100) as soon as all processes are completed. The operation demandingObjectsTimed() returns a ConcurrencyContext, which provides many methods to deal with concurrency and scheduling. Here we just get the queue again.
PublishParallel.as(Long.class)
    .that(Long.valueOf(100L))
    .sendingResultsTo(new ResultReceiver<Long>() {
        @Override
        public void receive(Queue<Long> results, Topic topic, Announcement<?> announcement) {
            // here we process the "results"
        }
    })
    .demandingThatTimed();
Here we demand a Long object, processed concurrently, but the results are forwarded to another given receiver (here an anonymous class).
We have seen many basic examples of the different APIs provided by JAM. There is much more to discover in JAM - for instance Ref with ValueChangeVetoSupplier; Distribute and DistributeParallel with distributors, distribution channels and distributions; the subscription of announcement filters; weak subscriptions; and, not to forget, the numerous possibilities to configure the mediators themselves regarding multi-level concurrency, thread pools, input queues, announcements on demand, and so on - tons of features!
More coming soon.