设计模式基础(十二)——观察者模式
观察者模式( Observer Design Pattern),也被称为发布订阅模式,是一种行为型模式。所谓行为型模式,主要解决的就是“类或对象之间的交互”问题。
根据应用场景的不同,观察者模式有不同的代码实现方式:同步阻塞方式,异步非阻塞方式、进程内实现方式、跨进程实现方式。
一、基本原理
在 GoF 的《设计模式》一书中,观察者模式的定义是: 在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。
我们来看一个最简单的观察者模式示例。下面的代码算是观察者模式的“模板代码”,只能反映大体的设计思路。在真实的软件开发中,并不需要照搬下面的模板代码, 因为观察者模式的实现方法各式各样,函数、类的命名等会根据业务场景的不同有很大的差别。
首先是Subject接口,表示观察者所订阅的主题:
public interface Subject {
void registerObserver(Observer observer);
void removeObserver(Observer observer);
void notifyObservers(Message message);
}
public class ConcreteSubject implements Subject {
private List<Observer> observers = new ArrayList<Observer>();
@Override
public void registerObserver(Observer observer) {
observers.add(observer);
}
@Override
public void removeObserver(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyObservers(Message message) {
for (Observer observer : observers) {
observer.update(message);
}
}
}
然后是Observer,也就是订阅主题的观察者们:
public interface Observer {
void update(Message message);
}
public class ConcreteObserverOne implements Observer {
@Override
public void update(Message message) {
//TODO: 获取消息通知,执行自己的逻辑...
System.out.println("ConcreteObserverOne is notified.");
}
}
public class ConcreteObserverTwo implements Observer {
@Override
public void update(Message message) {
//TODO: 获取消息通知,执行自己的逻辑...
System.out.println("ConcreteObserverTwo is notified.");
}
}
最后,我们可以像下面这样使用观察者模式:
public class Demo {
public static void main(String[] args) {
ConcreteSubject subject = new ConcreteSubject();
subject.registerObserver(new ConcreteObserverOne());
subject.registerObserver(new ConcreteObserverTwo());
subject.notifyObservers(new Message());
}
}
二、使用场景
那么,到底什么情况下需要用到观察者模式?或者说,这种设计模式能解决什么问题呢? 我们通过一个例子来理解下。
2.1 使用观察者模式前
假设我们在开发一个 P2P 投资理财系统,用户注册成功之后,我们会给用户发放投资体验金。代码实现大致是下面这个样子的:
public class UserController {
private UserService userService; // 依赖注入
private PromotionService promotionService; // 依赖注入
public Long register(String telephone, String password) {
//省略输入参数的校验代码
//省略userService.register()异常的try-catch代码
long userId = userService.register(telephone, password);
promotionService.issueNewUserExperienceCash(userId);
return userId;
}
}
上面代码有个明显的问题, 违反了单一职责原则。 如果需求频繁变动,比如,用户注册成功之后,不再发放体验金,而是改为发放优惠券,并且还要给用户发送一封“欢迎注册成功”的站内信。这种情况下,我们就需要频繁地修改 register() 函数中的代码,违反开闭原则。
2.2 使用观察者模式后
这个时候,观察者模式就能派上用场了。利用观察者模式,我对上面的代码进行了重构。重构之后的代码如下所示:
public interface RegObserver {
void handleRegSuccess(long userId);
}
public class RegPromotionObserver implements RegObserver {
private PromotionService promotionService; // 依赖注入
@Override
public void handleRegSuccess(long userId) {
promotionService.issueNewUserExperienceCash(userId);
}
}
public class RegNotificationObserver implements RegObserver {
private NotificationService notificationService;
@Override
public void handleRegSuccess(long userId) {
notificationService.sendInboxMessage(userId, "Welcome...");
}
}
public class UserController {
private UserService userService; // 依赖注入
private List<RegObserver> regObservers = new ArrayList<>();
// 一次性设置好,之后也不可能动态的修改
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telephone, String password) {
//省略输入参数的校验代码
//省略userService.register()异常的try-catch代码
long userId = userService.register(telephone, password);
for (RegObserver observer : regObservers) {
observer.handleRegSuccess(userId);
}
return userId;
}
}
当我们需要添加新的观察者的时候,比如,用户注册成功之后,推送用户注册信息给大数据征信系统,基于观察者模式的代码实现,UserController 类的 register() 函数完全不需要修改,只需要再添加一个实现了 RegObserver 接口的类,并且通过 setRegObservers() 函数将它注册到 UserController 类中即可。
三、异步观察者模式
上述用户注册的例子,register() 函数依次调用每个观察者的 handleRegSuccess() 函数,等到都执行完成之后,才会返回结果给客户端,是一种典型的同步阻塞实现方式。事实上, 异步非阻塞的观察者模式在现实中更加常用。要实现异步非阻塞,最简单的做法是,在每个 handleRegSuccess() 函数中,创建一个新的线程执行代码。不过,这种方式并不优雅,比较好的方式是基于 EventBus 来实现。
3.1 EventBus
EventBus——事件总线,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。Google Guava EventBus 就是一个比较著名的 EventBus 框架,它不仅仅支持异步非阻塞模式,同时也支持同步阻塞模式。
我们用 Guava EventBus 重新实现上一节中的用户注册示例。
首先,基于 EventBus,我们不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息:
public class RegPromotionObserver {
private PromotionService promotionService; // 依赖注入
@Subscribe
public void handleRegSuccess(long userId) {
promotionService.issueNewUserExperienceCash(userId);
}
}
public class RegNotificationObserver {
private NotificationService notificationService;
@Subscribe
public void handleRegSuccess(long userId) {
notificationService.sendInboxMessage(userId, "...");
}
}
然后,UserController中引入EventBus:
public class UserController {
private UserService userService; // 依赖注入
private EventBus eventBus;
private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
public UserController() {
//eventBus = new EventBus(); // 同步阻塞模式
// 异步非阻塞模式
eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE));
}
public Long register(String telephone, String password) {
long userId = userService.register(telephone, password);
// EventBus 类提供了 post() 函数,用来给观察者发送消息
eventBus.post(userId);
return userId;
}
public void setRegObservers(List<Object> observers) {
for (Object observer : observers) {
eventBus.register(observer);
}
}
}
跟经典的观察者模式的不同之处在于:当我们调用 EventBus.post() 函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,@Subscribe注解的方法入参类型匹配。
3.2 实现EventBus框架
了解了EventBus的基本功能,我们可以自己来实现一个简单版的EventBus。
EventBus的核心函数是register和post。它的最关键的一个数据结构是 Observer 注册表,记录了消息类型和可接收消息函数的对应关系。当调用 register() 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。

当调用 post() 函数发送消息的时候,EventBus 通过注册表找到相应的可接收消息的函数,然后通过 Java 反射来动态地创建对象、执行函数:

对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。
@Subscribe
首先定义一个标记注解@Subscribe,用于标明观察者中的哪个函数可以接收消息:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface Subscribe {}
ObserverAction
接着,我们需要对@Subscribe所注解的方法进行抽象,用一个ObserverAction 类来表示。其中,target 表示观察者类,method 表示方法:
public class ObserverAction {
private Object target;
private Method method;
public ObserverAction(Object target, Method method) {
this.target = Preconditions.checkNotNull(target);
this.method = method;
this.method.setAccessible(true);
}
public void execute(Object event) {
try {
method.invoke(target, event);
} catch (InvocationTargetException | IllegalAccessException e) {
e.printStackTrace();
}
}
}
ObserverRegistry
ObserverRegistry 类就是前面讲到的 Observer 注册表,是最复杂的一个类,框架中几乎所有的核心逻辑都在这个类中。这个类大量使用了 Java 的反射语法,不过代码整体来说都不难理解:
public class ObserverRegistry {
private ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();
public void register(Object observer) {
Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
Class<?> eventType = entry.getKey();
Collection<ObserverAction> eventActions = entry.getValue();
CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
if (registeredEventActions == null) {
registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
registeredEventActions = registry.get(eventType);
}
registeredEventActions.addAll(eventActions);
}
}
public List<ObserverAction> getMatchedObserverActions(Object event) {
List<ObserverAction> matchedObservers = new ArrayList<>();
Class<?> postedEventType = event.getClass();
for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
Class<?> eventType = entry.getKey();
Collection<ObserverAction> eventActions = entry.getValue();
if (postedEventType.isAssignableFrom(eventType)) {
matchedObservers.addAll(eventActions);
}
}
return matchedObservers;
}
private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
Class<?> clazz = observer.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
if (!observerActions.containsKey(eventType)) {
observerActions.put(eventType, new ArrayList<>());
}
observerActions.get(eventType).add(new ObserverAction(observer, method));
}
return observerActions;
}
private List<Method> getAnnotatedMethods(Class<?> clazz) {
List<Method> annotatedMethods = new ArrayList<>();
for (Method method : clazz.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Preconditions.checkArgument(parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.",
method, parameterTypes.length);
annotatedMethods.add(method);
}
}
return annotatedMethods;
}
}
EventBus
最后是EventBus,下面的EventBus实现了阻塞同步模式:
public class EventBus {
private Executor executor;
private ObserverRegistry registry = new ObserverRegistry();
public EventBus() {
this(MoreExecutors.directExecutor());
}
protected EventBus(Executor executor) {
this.executor = executor;
}
public void register(Object object) {
registry.register(object);
}
public void post(Object event) {
List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
for (ObserverAction observerAction : observerActions) {
executor.execute(new Runnable() {
@Override
public void run() {
observerAction.execute(event);
}
});
}
}
}
看代码你可能会有些疑问,这明明就用到了线程池 Executor 啊。实际上,MoreExecutors.directExecutor() 是 Google Guava 提供的工具类,看似是多线程,实际上是单线程。之所以要这么实现,主要还是为了跟 AsyncEventBus 统一代码逻辑,做到代码复用。
AsyncEventBus
有了 EventBus,AsyncEventBus 的实现就非常简单了。为了实现异步非阻塞的观察者模式,它就不能再继续使用 MoreExecutors.directExecutor() 了,而是需要在构造函数中,由调用者注入线程池:
public class AsyncEventBus extends EventBus {
public AsyncEventBus(Executor executor) {
super(executor);
}
}
四、开源示例
Netty 中观察者模式的运用非常多,经常使用的ChannelFuture.addListener
接口就是观察者模式的实现:
ChannelFuture channelFuture = channel.writeAndFlush(object);
channelFuture.addListener(future -> {
if (future.isSuccess()) {
// do something
} else {
// do something
}
});
addListener 方法会将监听器添加到 ChannelFuture 当中,并在 ChannelFuture 执行完毕的时候立刻通知已经注册的监听器。所以 ChannelFuture 是被观察者,addListener 方法用于添加观察者。
五、总结
观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。
感谢赞赏~