原创

设计模式基础(十二)——观察者模式

观察者模式( Observer Design Pattern),也被称为发布订阅模式,是一种行为型模式。所谓行为型模式,主要解决的就是“类或对象之间的交互”问题。
根据应用场景的不同,观察者模式有不同的代码实现方式:同步阻塞方式,异步非阻塞方式、进程内实现方式、跨进程实现方式。

一、基本原理

在 GoF 的《设计模式》一书中,观察者模式的定义是: 在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。

我们来看一个最简单的观察者模式示例。下面的代码算是观察者模式的“模板代码”,只能反映大体的设计思路。在真实的软件开发中,并不需要照搬下面的模板代码, 因为观察者模式的实现方法各式各样,函数、类的命名等会根据业务场景的不同有很大的差别。

首先是Subject接口,表示观察者所订阅的主题:

public interface Subject {
  void registerObserver(Observer observer);
  void removeObserver(Observer observer);
  void notifyObservers(Message message);
}

public class ConcreteSubject implements Subject {
  private List<Observer> observers = new ArrayList<Observer>();

  @Override
  public void registerObserver(Observer observer) {
    observers.add(observer);
  }

  @Override
  public void removeObserver(Observer observer) {
    observers.remove(observer);
  }

  @Override
  public void notifyObservers(Message message) {
    for (Observer observer : observers) {
      observer.update(message);
    }
  }

}

然后是Observer,也就是订阅主题的观察者们:

public interface Observer {
  void update(Message message);
}

public class ConcreteObserverOne implements Observer {
  @Override
  public void update(Message message) {
    //TODO: 获取消息通知,执行自己的逻辑...
    System.out.println("ConcreteObserverOne is notified.");
  }
}

public class ConcreteObserverTwo implements Observer {
  @Override
  public void update(Message message) {
    //TODO: 获取消息通知,执行自己的逻辑...
    System.out.println("ConcreteObserverTwo is notified.");
  }
}

最后,我们可以像下面这样使用观察者模式:

public class Demo {
  public static void main(String[] args) {
    ConcreteSubject subject = new ConcreteSubject();
    subject.registerObserver(new ConcreteObserverOne());
    subject.registerObserver(new ConcreteObserverTwo());
    subject.notifyObservers(new Message());
  }
}

二、使用场景

那么,到底什么情况下需要用到观察者模式?或者说,这种设计模式能解决什么问题呢? 我们通过一个例子来理解下。

2.1 使用观察者模式前

假设我们在开发一个 P2P 投资理财系统,用户注册成功之后,我们会给用户发放投资体验金。代码实现大致是下面这个样子的:

public class UserController {
  private UserService userService; // 依赖注入
  private PromotionService promotionService; // 依赖注入

  public Long register(String telephone, String password) {
    //省略输入参数的校验代码
    //省略userService.register()异常的try-catch代码
    long userId = userService.register(telephone, password);
    promotionService.issueNewUserExperienceCash(userId);
    return userId;
  }
}

上面代码有个明显的问题, 违反了单一职责原则。 如果需求频繁变动,比如,用户注册成功之后,不再发放体验金,而是改为发放优惠券,并且还要给用户发送一封“欢迎注册成功”的站内信。这种情况下,我们就需要频繁地修改 register() 函数中的代码,违反开闭原则。

2.2 使用观察者模式后

这个时候,观察者模式就能派上用场了。利用观察者模式,我对上面的代码进行了重构。重构之后的代码如下所示:

public interface RegObserver {
  void handleRegSuccess(long userId);
}

public class RegPromotionObserver implements RegObserver {
  private PromotionService promotionService; // 依赖注入

  @Override
  public void handleRegSuccess(long userId) {
    promotionService.issueNewUserExperienceCash(userId);
  }
}

public class RegNotificationObserver implements RegObserver {
  private NotificationService notificationService;

  @Override
  public void handleRegSuccess(long userId) {
    notificationService.sendInboxMessage(userId, "Welcome...");
  }
}

public class UserController {
  private UserService userService; // 依赖注入
  private List<RegObserver> regObservers = new ArrayList<>();

  // 一次性设置好,之后也不可能动态的修改
  public void setRegObservers(List<RegObserver> observers) {
    regObservers.addAll(observers);
  }

  public Long register(String telephone, String password) {
    //省略输入参数的校验代码
    //省略userService.register()异常的try-catch代码
    long userId = userService.register(telephone, password);

    for (RegObserver observer : regObservers) {
      observer.handleRegSuccess(userId);
    }

    return userId;
  }
}

当我们需要添加新的观察者的时候,比如,用户注册成功之后,推送用户注册信息给大数据征信系统,基于观察者模式的代码实现,UserController 类的 register() 函数完全不需要修改,只需要再添加一个实现了 RegObserver 接口的类,并且通过 setRegObservers() 函数将它注册到 UserController 类中即可。

三、异步观察者模式

上述用户注册的例子,register() 函数依次调用每个观察者的 handleRegSuccess() 函数,等到都执行完成之后,才会返回结果给客户端,是一种典型的同步阻塞实现方式。事实上, 异步非阻塞的观察者模式在现实中更加常用。要实现异步非阻塞,最简单的做法是,在每个 handleRegSuccess() 函数中,创建一个新的线程执行代码。不过,这种方式并不优雅,比较好的方式是基于 EventBus 来实现。

3.1 EventBus

EventBus——事件总线,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。Google Guava EventBus 就是一个比较著名的 EventBus 框架,它不仅仅支持异步非阻塞模式,同时也支持同步阻塞模式。

我们用 Guava EventBus 重新实现上一节中的用户注册示例。

首先,基于 EventBus,我们不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息:

public class RegPromotionObserver {
  private PromotionService promotionService; // 依赖注入

  @Subscribe
  public void handleRegSuccess(long userId) {
    promotionService.issueNewUserExperienceCash(userId);
  }
}

public class RegNotificationObserver {
  private NotificationService notificationService;

  @Subscribe
  public void handleRegSuccess(long userId) {
    notificationService.sendInboxMessage(userId, "...");
  }
}

然后,UserController中引入EventBus:

public class UserController {
  private UserService userService; // 依赖注入

  private EventBus eventBus;
  private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;

  public UserController() {
    //eventBus = new EventBus(); // 同步阻塞模式

    // 异步非阻塞模式
    eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); 
  }

  public Long register(String telephone, String password) {
    long userId = userService.register(telephone, password);

    // EventBus 类提供了 post() 函数,用来给观察者发送消息
    eventBus.post(userId);

    return userId;
  }

  public void setRegObservers(List<Object> observers) {
    for (Object observer : observers) {
      eventBus.register(observer);
    }
  }
}

跟经典的观察者模式的不同之处在于:当我们调用 EventBus.post() 函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,@Subscribe注解的方法入参类型匹配。

3.2 实现EventBus框架

了解了EventBus的基本功能,我们可以自己来实现一个简单版的EventBus。

EventBus的核心函数是register和post。它的最关键的一个数据结构是 Observer 注册表,记录了消息类型和可接收消息函数的对应关系。当调用 register() 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。



当调用 post() 函数发送消息的时候,EventBus 通过注册表找到相应的可接收消息的函数,然后通过 Java 反射来动态地创建对象、执行函数:



对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。

@Subscribe

首先定义一个标记注解@Subscribe,用于标明观察者中的哪个函数可以接收消息:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface Subscribe {}

ObserverAction

接着,我们需要对@Subscribe所注解的方法进行抽象,用一个ObserverAction 类来表示。其中,target 表示观察者类,method 表示方法:

public class ObserverAction {
  private Object target;
  private Method method;

  public ObserverAction(Object target, Method method) {
    this.target = Preconditions.checkNotNull(target);
    this.method = method;
    this.method.setAccessible(true);
  }

  public void execute(Object event) { 
    try {
      method.invoke(target, event);
    } catch (InvocationTargetException | IllegalAccessException e) {
      e.printStackTrace();
    }
  }
}

ObserverRegistry

ObserverRegistry 类就是前面讲到的 Observer 注册表,是最复杂的一个类,框架中几乎所有的核心逻辑都在这个类中。这个类大量使用了 Java 的反射语法,不过代码整体来说都不难理解:

public class ObserverRegistry {
  private ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();

  public void register(Object observer) {
    Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
    for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<ObserverAction> eventActions = entry.getValue();
      CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
      if (registeredEventActions == null) {
        registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
        registeredEventActions = registry.get(eventType);
      }
      registeredEventActions.addAll(eventActions);
    }
  }

  public List<ObserverAction> getMatchedObserverActions(Object event) {
    List<ObserverAction> matchedObservers = new ArrayList<>();
    Class<?> postedEventType = event.getClass();
    for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<ObserverAction> eventActions = entry.getValue();
      if (postedEventType.isAssignableFrom(eventType)) {
        matchedObservers.addAll(eventActions);
      }
    }
    return matchedObservers;
  }

  private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
    Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
    Class<?> clazz = observer.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      if (!observerActions.containsKey(eventType)) {
        observerActions.put(eventType, new ArrayList<>());
      }
      observerActions.get(eventType).add(new ObserverAction(observer, method));
    }
    return observerActions;
  }

  private List<Method> getAnnotatedMethods(Class<?> clazz) {
    List<Method> annotatedMethods = new ArrayList<>();
    for (Method method : clazz.getDeclaredMethods()) {
      if (method.isAnnotationPresent(Subscribe.class)) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        Preconditions.checkArgument(parameterTypes.length == 1,
                "Method %s has @Subscribe annotation but has %s parameters."
                        + "Subscriber methods must have exactly 1 parameter.",
                method, parameterTypes.length);
        annotatedMethods.add(method);
      }
    }
    return annotatedMethods;
  }
}

EventBus

最后是EventBus,下面的EventBus实现了阻塞同步模式:

public class EventBus {
  private Executor executor;
  private ObserverRegistry registry = new ObserverRegistry();

  public EventBus() {
    this(MoreExecutors.directExecutor());
  }

  protected EventBus(Executor executor) {
    this.executor = executor;
  }

  public void register(Object object) {
    registry.register(object);
  }

  public void post(Object event) {
    List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
    for (ObserverAction observerAction : observerActions) {
      executor.execute(new Runnable() {
        @Override
        public void run() {
          observerAction.execute(event);
        }
      });
    }
  }
}

看代码你可能会有些疑问,这明明就用到了线程池 Executor 啊。实际上,MoreExecutors.directExecutor() 是 Google Guava 提供的工具类,看似是多线程,实际上是单线程。之所以要这么实现,主要还是为了跟 AsyncEventBus 统一代码逻辑,做到代码复用。

AsyncEventBus

有了 EventBus,AsyncEventBus 的实现就非常简单了。为了实现异步非阻塞的观察者模式,它就不能再继续使用 MoreExecutors.directExecutor() 了,而是需要在构造函数中,由调用者注入线程池:

public class AsyncEventBus extends EventBus {
  public AsyncEventBus(Executor executor) {
    super(executor);
  }
}

四、开源示例

Netty 中观察者模式的运用非常多,经常使用的ChannelFuture.addListener 接口就是观察者模式的实现:

ChannelFuture channelFuture = channel.writeAndFlush(object);

channelFuture.addListener(future -> {
    if (future.isSuccess()) {
        // do something
    } else {
        // do something
    }
});

addListener 方法会将监听器添加到 ChannelFuture 当中,并在 ChannelFuture 执行完毕的时候立刻通知已经注册的监听器。所以 ChannelFuture 是被观察者,addListener 方法用于添加观察者。

五、总结

观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。

正文到此结束

感谢赞赏~

本文目录