-
-
Notifications
You must be signed in to change notification settings - Fork 751
Understanding Broadcaster
A Broadcaster implements the publish/subscribe paradigm. An application can subscribe to one or many Broadcasters to get notified about events. By default, a single Broadcaster is created by the framework and associated with every new AtmosphereResource. The default Broadcaster's id is always "/*", which means that if you invoke
Future<Object> f = broadcaster.broadcast("hello");
all AtmosphereResource objects associated with the broadcaster will get notified and will have a chance to handle the event. By default, broadcaster.broadcast("hello") is an asynchronous operation and won't block. If you want to get notified when the asynchronous execution complete, you can either use the returned Future
Future<Object> f = broadcaster.broadcast("hello");
f.get(); // blocking until broadcast completed.
or you can add a BroadcasterListener that will be invoked when the broadcast operation completes.
broadcaster.addBroadcasterListener(new BroadcasterListener() {
public void onComplete(Broadcaster b) {
// do something
}
}.broadcast("hello");
You can associate a Broadcaster with an AtmosphereResource by doing
atmosphereResource.setBroadcaster(broadcaster);
...
// Always return the broadcaster at position 0, or the one used when calling setBroadcaster(...);
Broadcaster b = atmosphereResource.getBroadcaster();
or
atmosphereResource.addBroadcaster(broadcaster);
...
// Return all Broadcasters
List<Broadcaster> l = atmosphereResource.broadcasters();
You can associate an AtmosphereResource to several Broadcasters by doing
broadcaster.addAtmosphereResource(atmosphereResource);
As an example, implementing a Twitter like application using the Atmosphere Framework would consist of creating one Broadcaster per Twitter account (check this page for information on how to retrieve a BroadcasterFactory instance):
Broadcaster b = broadcasterFactory.get("jfarcand");
Broadcaster b2 = broadcasterFactory.get("atmo_framework");
If user 'jfarcand" wants to be notified when 'atmo_framework' publish a tweet, you just associate the AtmosphereResource representing 'jfarcand' to the 'atmo_framework' Broadcaster:
b2.addAtmosphereResource(atmosphereResource);
You can create channels (or Broadcasters) on the fly, using the BroadcasterFactory class.
The MetaBroadcaster utility class can be really helpful when you want to broadcast messages to all or a subset of existing Broadcasters. For example, executing
metaBroadcaster.broadcastTo("/", "hello world");
will broadcast the "hello world" message to all AtmosphereResource associated to all Broadcaster. To be more specific, assume you create
broadcasterFactory.get("/a");
broadcasterFactory.get("/a/a1");
broadcasterFactory.get("/a/a2");
broadcasterFactory.get("/b");
broadcasterFactory.get("/c");
Doing
metaBroadcaster.broadcastTo("/a/*", "hello world");
is equivalent of doing:
// Retrieve the Broadcaster named "/a/a1"
broadcasterFactory.lookup("/a/a1", true).broadcast("hello world");
// Retrieve the Broadcaster named "/a/a2"
broadcasterFactory).lookup("/a/a2", true).broadcast("hello world");
You can also schedule periodic tasks using the
metaBroadcaster.scheduleTo("/a/*", new Callable<String>() {
public String call() {
return "Hello world";
}
}, 10, TimeUnit.SECONDS);
is equivalent of doing:
// Retrieve the Broadcaster named "/a/a1"
broadcasterFactory.lookup("/a/a1", true).scheduleFixedBroadcast(new Callable<String>() {
public String call() {
return "Hello world";
}
}, 10, TimeUnit.SECONDS)
// Retrieve the Broadcaster named "/a/a2"
broadcasterFactory.lookup("/a/a2", true).scheduleFixedBroadcast(new Callable<String>() {
public String call() {
return "Hello world";
}
}, 10, TimeUnit.SECONDS)
By default there is no cache associated with a MetaBroadcaster. That means broadcasted messages can be lost if you do
metaBroadcaster.broadcastTo("/a/*", "hello world");
when there are no associated AtmosphereResource found. You can configure a MetaBroadcasterCache by doing:
metaBroadcaster.cache(new MetaBroadcaster.ThirtySecondsCache(m, framework.getAtmosphereConfig()));
Finally, MetaBroadcaster also supports BroadcastListener
metaBroadcaster.addBroadcasterListener(new BroadcasterListener() {
public void onComplete(Broadcaster b) {
// do something
}
}.broadcastTo("/*", hello");
A unique instance of BroadcasterConfig is always associated with a Broadcaster. You can use a BroadcasterConfig to set ExecutorServices, add BroadcastFilter and set a BroadcasterCache.
Configuring the default Broadcaster
By default, Atmosphere is using the DefaultBroadcaster and JerseyBroadcaster if atmosphere-jersey is used. You can either extend those Broadcasters or write your own. You can configure it by doing: In web.xml
<init-param>
<param-name>org.atmosphere.cpr.broadcasterClass</param-name>
<param-value>org....</param-value>
</init-param>
or in atmosphere.xml
<applicationConfig>
<param-name>org.atmosphere.cpr.broadcasterClass</param-name>
<param-value>org...</param-value>
</applicationConfig>
You can also annotate your Broadcaster implementation by using the BroadcasterService annotation:
@BroadcasterService
public class MyBroadcaster implements Broadcaster {...}
Atmosphere is able to auto discover the following Broadcaster when available on the classpath
- RedisBroadcaster
- JGroupsBroadcaster
- XMPPBroadcaster
- HazelcastBroadcaster
- JMSBroadcaster
- KafkaBroadcaster
That means you don't have to specify them by default.
By default, a Broadcaster always creates three ExecutorServices: one for supporting asynchronous broadcast, one for supporting asynchronous write and one for scheduling tasks. If you don't need asynchronous I/O, it is recommended you use the SimpleBroadcaster or SimpleJerseyBroadcaster, they aren't using any ExecutorServices.
If your application creates a lot of Broadcasters, you may experience some Out Of Memory error because too many instances of ExecutorServices have been created, e.g number of broadcaster * 3. If that's the case, you can configure Atmosphere to share ExecutorServices amongst Broadcasters. In that case only two ExecutorServices will be created:
Broadcaster b = broadcasterFactory.get();
b.getBroacasterConfig().setExecutorServices(...).setAsyncWriteService(...);
In web.xml
<init-param>
<param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
<param-value>true</param-value>
</init-param>
or in atmosphere.xml
<applicationConfig>
<param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
<param-value>true</param-value>
</applicationConfig>
To configure the maximum threads created by the Broadcaster of the message delivery, just add
<init-param>
<param-name>org.atmosphere.cpr.broadcaster.maxProcessingThreads</param-name>
<param-value>10</param-value>
</init-param>
and for the one used for the write operation,
<init-param>
<param-name>org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads</param-name>
<param-value>10</param-value>
</init-param>
Instead of creating several Broadcasters, you can always reduce the number by instead adding BroadcasterFilter and PerRequestBroadcasterFilter to a Broadcaster. You can then filter which broadcasted messages get delivered to which AtmosphereResource (client).
Broadcaster b = broadcasterFactory.get();
b.getBroacasterConfig().addFilter(...);
Another way to prevent or reduce memory usage is by configuring a BroadcasterFactoryLifecyclePolicy. Supported policies are:
- IDLE: Release all resources associated with the Broadcaster when the idle time expires. Suspended AtmosphereResource will NOT get resumed and instead be closed right away.
- IDLE_DESTROY: Release all resources associated with the Broadcaster when the idle time expires and destroy the Broadcaster. This operation removes the Broadcaster from it's associated BroadcasterFactory. Suspended AtmosphereResource will NOT get resumed and instead be closed right away.
- IDLE_RESUME: Release all resources associated with the Broadcaster when the idle time expires. All associated AtmosphereResource WILL BE resumed and the Broadcaster will be destroyed.
- EMPTY: If there is no AtmosphereResource associated with the Broadcaster, release all resources.
- EMPTY_DESTROY: If there is no AtmosphereResource associated with the Broadcaster, release all resources and destroy the Broadcaster. This operation removes the Broadcaster from it's associated BroadcasterFactory
- NEVER: Never release or destroy the Broadcaster from it's associated BroadcasterFactory
The default is NEVER, which means that a fair amount of Broadcasters may 'polute' the BroadcasterFactory if not handled properly. BroadcasterFactoryLifecyclePolicy can be configured
You can configure the policy on a Broadcaster directly:
Broadcaster b = broadcasterFactory.get();
b.setBroadcasterLifeCyclePolicy(BroadcasterLifeCyclePolicy.IDLE);
You can also associate BroadcasterLifeCyclePolicyListener to a Broadcaster so you get notified when a policy is executed.
b.addBroadcasterLifeCyclePolicyListener(new BroadcasterLifeCyclePolicyListener() {...});
so instead of creating a new one, the "destroyed" one will be marked as "alive" and can be re-used.
In web.xml
<init-param>
<param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
<param-value>IDLE</param-value>
</init-param>
or in atmosphere.xml
<applicationConfig>
<param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
<param-value>IDLE</param-value>
</applicationConfig>
NOTE: If you set the policy to xxx_DESTROY, the associated BroadcasterCache will also be destroyed, and your application may start LOOSING messages between the time the Broacascaster with the same name is getting recreated.
A Broadcaster may have been destroyed but still available from the BroadcasterFactory. To re-use destroyed broadcaster, you can add, in web/application.xml
<init-param>
<param-name>org.atmosphere.cpr.recoverFromDestroyedBroadcaster</param-name>
<param-value>true</param-value>
</init-param>
- Understanding Atmosphere
- Understanding @ManagedService
- Using javax.inject.Inject and javax.inject.PostConstruct annotation
- Understanding Atmosphere's Annotation
- Understanding AtmosphereResource
- Understanding AtmosphereHandler
- Understanding WebSocketHandler
- Understanding Broadcaster
- Understanding BroadcasterCache
- Understanding Meteor
- Understanding BroadcastFilter
- Understanding Atmosphere's Events Listeners
- Understanding AtmosphereInterceptor
- Configuring Atmosphere for Performance
- Understanding JavaScript functions
- Understanding AtmosphereResourceSession
- Improving Performance by using the PoolableBroadcasterFactory
- Using Atmosphere Jersey API
- Using Meteor API
- Using AtmosphereHandler API
- Using Socket.IO
- Using GWT
- Writing HTML5 Server-Sent Events
- Using STOMP protocol
- Streaming WebSocket messages
- Configuring Atmosphere's Classes Creation and Injection
- Using AtmosphereInterceptor to customize Atmosphere Framework
- Writing WebSocket sub protocol
- Configuring Atmosphere for the Cloud
- Injecting Atmosphere's Components in Jersey
- Sharing connection between Browser's windows and tabs
- Understanding AtmosphereResourceSession
- Manage installed services
- Server Side: javadoc API
- Server Side: atmosphere.xml and web.xml configuration
- Client Side: atmosphere.js API