
May 29, 2014

Event Driven Programming: Java example tutorial style - part 1

Event-driven architecture (EDA) loosely couples event producers and event consumers.

An event can be defined as "a change in state". For example, when "securities" or "security prices" have been loaded, an event producer fires an event, and a synchronous or asynchronous dispatcher notifies every registered listener so that it can update its data. Producers and listeners are loosely coupled: they know only about the "Event" and the "EventHub", not about each other. The "EventHub" is where listeners are registered and unregistered.

The "EventHub" can be registered as a JMX bean so that its behavior can be controlled at runtime through a tool like jconsole, for example to fire an event or to read the number of events fired.

This series of tutorials takes you through writing event-driven code in Java and registering it as an MBean so that you can interact with it through a JMX-compliant tool like jconsole.



Let's define the interfaces and implementation classes.


Step 1: Define the "Event" class from which all other events can be derived.

package com.writtentest14;

import java.util.Date;

public class Event {

 private String id;
 private Date timeStamp;
 
 public Event(String id) {
  super();
  this.id = id;
  this.timeStamp = new Date();
 }

 public String getId() {
  return id;
 }

 public void setId(String id) {
  this.id = id;
 }

 public Date getTimeStamp() {
  return timeStamp;
 }

 public void setTimeStamp(Date timeStamp) {
  this.timeStamp = timeStamp;
 }

}
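
As a rough sketch of how a more specific event might derive from this base class, the hypothetical PriceLoadedEvent below adds a payload field. The subclass name and the priceCount field are assumptions made for illustration and are not part of the original code. A trimmed copy of Event is repeated so the block compiles on its own.

```java
import java.util.Date;

// Trimmed copy of the Event class above, repeated so this sketch compiles on its own.
class Event {
    private final String id;
    private final Date timeStamp = new Date();

    Event(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }

    Date getTimeStamp() {
        return timeStamp;
    }
}

// Hypothetical subclass: carries how many prices were loaded.
class PriceLoadedEvent extends Event {
    static final String ID = "PL_EVENT";

    private final int priceCount;

    PriceLoadedEvent(int priceCount) {
        super(ID); // every PriceLoadedEvent shares the same event id
        this.priceCount = priceCount;
    }

    int getPriceCount() {
        return priceCount;
    }
}
```

Because listeners are looked up by the event id, a subclass like this can be fired through the same hub as a plain Event; a listener that wants the extra field would have to cast the event it receives.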


Step 2: Define the interface for the listeners

package com.writtentest14;

public interface EventListener<T extends Event> {
 void onEvent(T event);
}
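
Since the interface declares a single method, a listener can also be written as a lambda, assuming Java 8 or later. The sketch below is only an illustration: LambdaListenerSketch and its deliver method are hypothetical names, and trimmed copies of Event and EventListener are repeated so the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed copies of the types above, repeated so this sketch compiles on its own.
class Event {
    private final String id;

    Event(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }
}

interface EventListener<T extends Event> {
    void onEvent(T event);
}

// Hypothetical demo class, not part of the original code.
class LambdaListenerSketch {

    // Feeds events with the given ids through a lambda-based listener
    // and returns the ids the listener saw, in order.
    static List<String> deliver(String... ids) {
        List<String> received = new ArrayList<>();
        // The lambda body becomes the implementation of onEvent.
        EventListener<Event> listener = event -> received.add(event.getId());
        for (String id : ids) {
            listener.onEvent(new Event(id));
        }
        return received;
    }
}
```

One trade-off: the anonymous inner classes used later in this post print this.getClass(), which a lambda written inside a static method cannot do because it has no "this" of its own.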


Step 3:  Define the dispatcher interface.

package com.writtentest14;

import java.util.List;


public interface EventDispatcher {

    void dispatch(Event event, List<EventListener<Event>> listeners);    
}


Step 4: Implement the dispatcher. A dispatcher can be synchronous or asynchronous; let's keep it simple and define a synchronous one.

package com.writtentest14;

import java.util.List;

public class SimpleSynchronousDispatcher implements EventDispatcher {

    @Override
    public void dispatch(Event event, List<EventListener<Event>> listeners) {
        for (EventListener<Event> listener : listeners) {
            listener.onEvent(event);
        }
    }
}
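
For comparison, an asynchronous dispatcher could hand each listener call to an Executor instead of calling it inline. This is a minimal sketch under stated assumptions, not the implementation used in this series: the class name SimpleAsynchronousDispatcher and the choice of java.util.concurrent.Executor are mine, and trimmed copies of the earlier types are repeated so the block compiles on its own (Java 8 or later for the lambda).

```java
import java.util.List;
import java.util.concurrent.Executor;

// Trimmed copies of the types above, repeated so this sketch compiles on its own.
class Event {
    private final String id;

    Event(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }
}

interface EventListener<T extends Event> {
    void onEvent(T event);
}

interface EventDispatcher {
    void dispatch(Event event, List<EventListener<Event>> listeners);
}

// Hypothetical asynchronous counterpart of SimpleSynchronousDispatcher.
class SimpleAsynchronousDispatcher implements EventDispatcher {

    private final Executor executor;

    SimpleAsynchronousDispatcher(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void dispatch(Event event, List<EventListener<Event>> listeners) {
        for (EventListener<Event> listener : listeners) {
            // Whether this call blocks depends on the executor: with a thread pool,
            // the listener runs on a pool thread and dispatch returns without waiting.
            executor.execute(() -> listener.onEvent(event));
        }
    }
}
```

With a pool such as Executors.newFixedThreadPool(4) passed in, a slow listener no longer holds up the producer or the other listeners. The cost is that listeners may run concurrently and in any order, and the pool has to be shut down when the application exits.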

Step 5: Define the EventHub. It binds and unbinds listeners and invokes the dispatcher to dispatch events.

package com.writtentest14;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Registers and unregisters event listeners, and passes fired events to the dispatcher.
 */
public class EventHub {

 private static final EventHub INSTANCE = createInstance();
 
 private ConcurrentMap<String, List<EventListener<Event>>> registeredListeners = 
                                   new ConcurrentHashMap<String, List<EventListener<Event>>>();

 private EventDispatcher synchronousDispatcher;

 private AtomicLong eventCount = new AtomicLong();

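 public EventHub() {
  // default to a synchronous dispatcher so that fire() never sees a null dispatcher
  this(new SimpleSynchronousDispatcher());
 }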

 public static EventHub instance() {
  return INSTANCE;
 }

 public EventHub(EventDispatcher synchronousDispatcher) {
  this.synchronousDispatcher = synchronousDispatcher;
 }

 public long getEventCount() {
  return this.eventCount.get();
 }

 private long getNextEventNumber() {
  return this.eventCount.incrementAndGet();
 }

 protected EventDispatcher getSynchronousDispatcher() {
  return this.synchronousDispatcher;
 }

 public void setSynchronousDispatcher(EventDispatcher dispatcher) {
  this.synchronousDispatcher = dispatcher;
 }

 public void fire(Event event) {
  dispatch(event, getSynchronousDispatcher());
 }

 public synchronized void addListener(String eventId, EventListener<Event> listener) {
  List<EventListener<Event>> listeners = this.registeredListeners.get(eventId);
  if (listeners != null) {
   listeners.add(listener);
  } else {
   listeners = new CopyOnWriteArrayList<EventListener<Event>>();
   listeners.add(listener);
   this.registeredListeners.put(eventId, listeners);
  }

 }

 public void removeListener(String eventId, EventListener<Event> listener) {
  List<EventListener<Event>> listeners = this.registeredListeners.get(eventId);
  if (listeners != null) {
   listeners.remove(listener);

  }
 }

 protected void dispatch(Event event, EventDispatcher dispatcher) {
  getNextEventNumber();
  List<EventListener<Event>> listeners = getListeners(event);
  if (!listeners.isEmpty()) {

   dispatcher.dispatch(event, listeners);

  }
 }
 
 private static EventHub createInstance() {
  return new EventHub(new SimpleSynchronousDispatcher());
 }

 private List<EventListener<Event>> getListeners(Event event) {
  List<EventListener<Event>> listeners = this.registeredListeners.get(event.getId());
  return (listeners != null) ? listeners : Collections.<EventListener<Event>>emptyList();
 }

}
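
In the class above, addListener is synchronized while removeListener is not. One alternative, assuming Java 8 or later, is to let the ConcurrentMap do the check-then-insert atomically with computeIfAbsent, so no method needs to be synchronized. The sketch below is a hypothetical helper (ListenerRegistry and its method names are my own), made generic in the listener type so that it compiles on its own.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper, generic in the listener type L so it compiles on its own.
class ListenerRegistry<L> {

    private final ConcurrentMap<String, List<L>> listenersById = new ConcurrentHashMap<>();

    void add(String eventId, L listener) {
        // computeIfAbsent creates the list at most once per event id, atomically,
        // so two threads adding the first listener cannot overwrite each other.
        listenersById.computeIfAbsent(eventId, id -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Returns true if the listener was registered for the id and has been removed.
    boolean remove(String eventId, L listener) {
        List<L> listeners = listenersById.get(eventId);
        return listeners != null && listeners.remove(listener);
    }

    // Returns the listeners for the id, or an empty list if none are registered.
    List<L> get(String eventId) {
        List<L> listeners = listenersById.get(eventId);
        return (listeners != null) ? listeners : Collections.<L>emptyList();
    }
}
```

Inside EventHub, the same idea would replace the if/else in addListener with a single computeIfAbsent call on registeredListeners.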


Step 6: Finally, the EventHubMain class has the main method. It creates 3 listeners as anonymous inner classes and also acts as the producer that fires events. The producer and the listeners never interact with each other directly, only through the EventHub and Event classes, which is what keeps them decoupled.

package com.writtentest14;

import java.util.concurrent.TimeUnit;

public class EventHubMain {

 private static final String PRICE_LOAD_EVENT = "PL_EVENT";
 private static final String SECURITY_LOAD_EVENT = "SL_EVENT";

 public static void main(String[] args) {

  // Anonymous listener1
  EventHub.instance().addListener(PRICE_LOAD_EVENT, new EventListener<Event>() {

   @Override
   public void onEvent(Event event) {
    System.out.println(PRICE_LOAD_EVENT + " received by listener " + this.getClass());
    try {
     TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
     // the sleep only simulates slow work, so just report the interruption
     e.printStackTrace();
    }
   }

  });

  // Anonymous listener2
  EventHub.instance().addListener(SECURITY_LOAD_EVENT, new EventListener<Event>() {

   @Override
   public void onEvent(Event event) {
    System.out.println(SECURITY_LOAD_EVENT + " received by listener " + this.getClass());
    try {
     TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
     // the sleep only simulates slow work, so just report the interruption
     e.printStackTrace();
    }
   }

  });

  // Anonymous listener3
  EventHub.instance().addListener(PRICE_LOAD_EVENT, new EventListener<Event>() {

   @Override
   public void onEvent(Event event) {
    System.out.println(PRICE_LOAD_EVENT + " received by listener " + this.getClass());
    try {
     TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
     // the sleep only simulates slow work, so just report the interruption
     e.printStackTrace();
    }
   }

  });

  // Event producer: fires both events in an endless loop
  while (true) {
   System.out.println("Event fired " + PRICE_LOAD_EVENT + ".............");
   EventHub.instance().fire(new Event(PRICE_LOAD_EVENT));

   try {
    TimeUnit.SECONDS.sleep(5);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }

   System.out.println("Event fired " + SECURITY_LOAD_EVENT + ".............");
   EventHub.instance().fire(new Event(SECURITY_LOAD_EVENT));

  }
 }

}


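Finally, here is the output when you run the above class, which runs forever in a while loop. Because the dispatcher is synchronous, each call to fire() returns only after every listener for that event has finished.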

Event fired PL_EVENT.............
PL_EVENT received by listener class com.writtentest14.EventHubMain$1
PL_EVENT received by listener class com.writtentest14.EventHubMain$3
Event fired SL_EVENT.............
SL_EVENT received by listener class com.writtentest14.EventHubMain$2
Event fired PL_EVENT.............
PL_EVENT received by listener class com.writtentest14.EventHubMain$1


In the next post, I will register the EventHub as an MBean with the MBean server so that it can be managed from a JMX client like jconsole.
