블로그나 유튜브를 보면, 구독을 해놓고 새글이나 영상이 올라올 때 알람을 받는다. 이런 상호작용을 게시-구독(Publish-Subscribe)관계라고 한다. Observer pattern은 이처럼 observer를 등록해놓고 대상의 변화에 대한 알람을 받는 경우에 적용되는 design pattern이다.
Observer pattern은 callback의 형태로 이미 많은 곳에서 사용하고 있고 경험해봤을 것이다. 안드로이드에서 BroadcastReceiver를 등록해 사용하는 것도 이런 형태의 하나라고 볼 수 있다.
하나의 데이터를 두고 여러방식의 그래프로 표현할 때, 데이터의 변화를 실시간 반영하는 경우에도 적용된다. 이 경우, model-view 관계가 되고 조금 더 확장해서 Controller가 추가되면 MVC 패턴이된다. 즉, MVC 패턴에서 Model과 View 사이에 Observer Pattern이 사용된다는 얘기.
프로그램의 Settings 또는 Preference 항목을 변경하는 경우도 적용할 수 있다. 예를들어 폰트를 변경하면, 현재 보여지고 있는 텍스트의 폰트들도 바로 변경되어야 한다.
다자간 채팅 프로그램도 이를 사용해야 하는걸 알 수 있다. 채팅방이 subject가 되고 새로운 채팅이 올라오면 모든 observer들인 채팅 참가자들에게 전달해야한다.
UML 표현은 다음과 같다. ( Wikipedia 참조 )

눈여겨 봐야할 부분은 update() 부분과, update() 이후, getState()로 상태를 얻어오는 부분이다.
첫번째 문제는 동기화 문제이다. 단일 쓰레드일 때는 상관이 없지만, Observer들이 독립된 별개의 쓰레드들이라면 문제가 생긴다. 그렇다면 어느부분에 동기화 오브젝트나 블럭을 써야 할까? 사실, 이 고민은 Kotlin, C# 구현중에는 미뤄뒀다가 python 구현에 와서 찾아봤다. 정답은 Observer 리스트를 동기화 시켜 관리해야 하는건데, python 구현부분에서 다루겠다. 궁금한 분들을 위해 관련해서 잘 정리된 문서 두개 링크를 남긴다.
둘 째로, observer와 subject의 커플링 문제이다. 그래프상에서 getState()가 concrete class에 존재한다. 그 말은, observer가 concrete class를 알고 있어야 한다는 말이다. 지금까지 design pattern을 살펴본 경험으론 getState()가 인터페이스나 추상클래스에 있어야 할거 같다. 실제로 그렇게 기술된 문서들도 있다. 그렇게 되면 다시 문제가 되는 부분은 getState()로 얻어올 데이터에 대해 추상 클래스인 Subject가 알아야 한다는 점이다.
또한, getstate()가 전체 데이터를 가져오는 경우에는 문제가 없겠지만, 변경사항만 가져오려고 하면 골치가 아파진다. 해결책으로는 변경사항을 가져올 때, timestamp등을 이용할 수 있겠다. 실제 구현들을 보면 getState()가 아니라 update()에서 변경사항을 넘겨주는 경우도 흔하다.
자바를 보면, java.util에서 Observer pattern 구현을 위한 Observable 과 Observer를 제공하고 있다. 코드를 보면, update(Observable obj, Object arg)로 인자를 넘기고 있다. 첫번째 인자로 Observable 자체를 얻어오며, 두번째 인자로 사용자 정의 데이터 객체를 넘겨받고 있다. UML에서 보이듯이 구현을 할지, 자바처럼 구현을 할지는 상황에 맞게 적용하길. 근데, 일상적으론 자바에서의 구현이 보편적일듯 하다.
실사용 예들이 넘쳐나는 흔하게 쓰는 디자인 패턴이라 무슨 예를 코드로 구현해볼까 하다가 군더더기 없는 채팅방을 흉내내 보기로 결정했다. 실제처럼 네트웍을 이용한 구현을 할까 하다가 단지 예를 보여주는 것이니 로컬로 흉내만 내겠다.
Kotlin
여기서는 다루지 않겠지만, Kotlin에선 property에 대해 observer pattern과 유사한 내용을 언어적으로 지원하는 delegate라는게 있다. 이런게 있다는 것만 인지하고 일단 넘어가자.
간단하게 보여주기에는 java의 구현 형태가 좋겠으나, 조금 복잡하더라도 UML에 소개된 형태로 구현해 봤다.
package observer
import java.time.Instant
data class SingleChat(val name: String, val chat: String, val timestamp: Instant)
abstract class ChatRoom{
private val observers = mutableListOf<IObserver>()
public fun attach(observer: IObserver){
observers.add(observer)
observerChanged(observer, true)
}
public fun detach(observer: IObserver){
observers.remove(observer)
observerChanged(observer, false)
}
protected fun chatNotify(){
observers.forEach { observer -> observer.update() }
}
protected open fun observerChanged(observer: IObserver, attached: Boolean){}
abstract fun addChat(data: SingleChat)
abstract fun getChat(observer: IObserver): List<SingleChat>
}
class MyChatRoom: ChatRoom() {
private val chatting = mutableListOf<SingleChat>()
private val timeline = mutableMapOf<IObserver, Instant>()
override fun observerChanged(observer: IObserver, attached: Boolean) {
if(!attached){
timeline.remove(observer)
}
}
public override fun addChat(data: SingleChat){
chatting.add(data)
chatNotify()
}
public override fun getChat(observer: IObserver): List<SingleChat>{
val timestamp = timeline.get(observer)
timeline[observer] = Instant.now()
return if(timestamp != null){
chatting.filter{it.timestamp > timestamp}
}else
chatting
}
}
우선 채팅을 저장하고 Observer에게 전달할 채팅 기본 포맷을 data class로 만들었다. 이름, 내용, 타임스탬프로 이루어져 있다.
구현 부분은 이게 최선인가 싶긴 한데, 부모 클래스에 observer와 관련된 코드만 빼내지 않고, addChat, getChat 인터페이스를 추가했다. 결국, 어떤 데이터를 주고받을지 알고 있어야 하는데 Observer쪽에 Concrete class가 아닌 인터페이스만 드러내고 싶었다. observer관련 코드만 완벽하게 부모클래스로 빼고 싶다면, java의 Observable을 참고하면 된다.
concrete class인 MyChatRoom을 보면, addChat()으로 채팅이 추가되면, chatNotify()로 observer들에게 변경되었음을 브로드캐스팅하게 된다. 이를 받은 observer 들은 getChat()인터페이스를 통해 새로운 채팅을 받아가는데, 새로운 내용만 가져가도록 timestamp를 이용했다. 이전에 getChat()을 통해 대화내용을 가져갔었다면, 그 timestamp를 기록해놓고 그 이후의 대화내용만 리스트 형태로 돌려준다. 만약 처음 부르는 거라면, timestamp기록이 없으므로 전체 대화내용을 돌려준다.
이부분 때문에 예상에 없었던 코드를 추가했는데, observerChanged()라는 메소드를 만들고 attach(), detach()될 때마다 호출되도록 했다. detach 될 때, timestamp 기록에서도 제거하기 위해서이다. 기본 코드를 유지하고 싶어 이렇게 추가를 했는데, 이게 최선인지는 모르겠다.
package observer
import java.time.Instant
interface IObserver{
fun update()
}
class ChatClient: IObserver{
private var chatName: String = ""
private var chatRoom: ChatRoom? = null
fun connect(name: String, room: ChatRoom){
chatName = name
chatRoom = room
room.attach(this)
}
fun disconnect(){
chatRoom?.detach(this)
chatRoom = null
}
fun talk(chat: String){
chatRoom?.addChat(SingleChat(chatName, chat, Instant.now()))
}
override fun update() {
if(chatRoom != null){
val chatting = chatRoom!!.getChat(chatName)
chatting.forEach{println("[client: $chatName] [${it.timestamp}] ${it.name}: ${it.chat}")}
}
}
}
Observer 인터페이스와 가상 채팅 클라이언트 코드이다. connect() 와 disconnect()에서 observer를 등록해주고, update()가 호출되면 getChat을 이용해 대화 내용을 받아와 출력한다. 대화를 올릴 땐 talk() 메소드를 이용한다. 여기에서 timestamp가 찍힌 대화가 올라가게된다.
package observer
fun main(args: Array<String>){
val room: ChatRoom = MyChatRoom()
val client1: ChatClient = ChatClient()
client1.connect("bato", room)
val client2: ChatClient = ChatClient()
client2.connect("wan-tae", room)
val client3: ChatClient = ChatClient()
client3.connect("jung-a", room)
client1.talk("hi! I'm bato")
Thread.sleep(100)
client3.talk("hi! I'm jung-a")
Thread.sleep(100)
client2.talk("Do you see my chat?")
Thread.sleep(100)
client1.disconnect()
client3.disconnect()
client2.talk("Hello?")
}
[client: bato] [2020-05-02T06:59:44.817809200Z] bato: hi! I'm bato
[client: wan-tae] [2020-05-02T06:59:44.817809200Z] bato: hi! I'm bato
[client: jung-a] [2020-05-02T06:59:44.817809200Z] bato: hi! I'm bato
[client: bato] [2020-05-02T06:59:44.930179600Z] jung-a: hi! I'm jung-a
[client: wan-tae] [2020-05-02T06:59:44.930179600Z] jung-a: hi! I'm jung-a
[client: jung-a] [2020-05-02T06:59:44.930179600Z] jung-a: hi! I'm jung-a
[client: bato] [2020-05-02T06:59:45.030512600Z] wan-tae: Do you see my chat?
[client: wan-tae] [2020-05-02T06:59:45.030512600Z] wan-tae: Do you see my chat?
[client: jung-a] [2020-05-02T06:59:45.030512600Z] wan-tae: Do you see my chat?
[client: wan-tae] [2020-05-02T06:59:45.130844600Z] wan-tae: Hello?
Process finished with exit code 0
테스트 코드는 위와 같다. 테스트 대화 중간에 sleep()을 넣었는데, 이게 없으면 실행속도가 빨라 timestamp가 제대로 작동하지 않는다. 실행 결과를 보면, 한명이 말할 때 각 클라이언트에서 대화를 받게되며 마지막 대화는 두 클라이언트가 disconnect된 상태라서 전달되지 않는걸 볼 수 있다.
C#
C#에서는 조금 다른걸 구현해 보려 한다. 앞서, Kotlin의 delegate를 잠깐 언급했지만, C#에서는 event, delegate 라는걸 지원해서 이걸 이용할 수도 있다. 또한, JAVA와 유사하게 IObserver<T>, IObservable<T> 인터페이스를 제공한다.( MS 공식 문서 참조 ) 구현을 살펴보다보니, 생각보다 낯선 내용이라서 이걸 이용해 preference를 구현해 보겠다.
using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
namespace Observer
{
public struct ObserverData
{
public int volume;
public Color color1;
public Color color2;
public ObserverData(int vol, Color color1, Color color2)
{
this.volume = vol;
this.color1 = color1;
this.color2 = color2;
}
}
public class GamePreference: IObservable<ObserverData>
{
private static readonly Lazy<GamePreference> lazy = new Lazy<GamePreference>(() => new GamePreference());
private ObserverData _prefData;
private List<IObserver<ObserverData>> observers;
public ObserverData PrefData
{
get => _prefData;
set
{
_prefData = value;
PrefChangeNotify(_prefData);
}
}
private GamePreference()
{
_prefData = new ObserverData(0, Color.white, Color.white);
observers = new List<IObserver<ObserverData>>();
}
public static GamePreference Instance { get { return lazy.Value; } }
public IDisposable Subscribe(IObserver<ObserverData> observer)
{
if (!observers.Contains(observer)) observers.Add(observer);
return new Unsubscriber(observers, observer);
}
public void PrefChangeNotify(ObserverData data)
{
foreach(IObserver<ObserverData> observer in observers)
{
observer.OnNext(data);
}
}
private class Unsubscriber: IDisposable
{
private List<IObserver<ObserverData>> _observers;
private IObserver<ObserverData> _observer;
public Unsubscriber(List<IObserver<ObserverData>> observers, IObserver<ObserverData> observer)
{
this._observers = observers;
this._observer = observer;
}
public void Dispose()
{
if (_observer != null) _observers.Remove(_observer);
}
}
}
}
Observer가 받게될 struct를 먼저 정의했다. 가상의 preference이므로 volume 과 color값을 가정했다.
Preference는 게임내에서 딱 하나만 존재해야한다. 그래서 singleton 구현코드를 추가했다. ( 이전에 올린 포스팅을 참조 )
핵심은 IObservable<ObserverData> 인터페이스의 구현이다. Subscribe()를 구현했는데 리턴값이 IDisposable이다. MS 문서를 참조해서 Unsubscriber를 구현했다. observer pattern에서는 Observer측에서 detach하기위해, Subject(또는 Provider)를 들고 있어야 하는데, 여기서는 이부분을 IDisposable 인터페이스로 매우 깔끔하게 구현하였다. 물론, 여기서는 provider가 singleton이기 때문에 굳이 이렇게 안해도 되겠지만, 일반적인 Observer Pattern에서는 가장 깔금한 처리방법이 아닌가 생각된다.
using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
namespace Observer
{
public class PrefObserver : MonoBehaviour, IObserver<ObserverData>
{
private IDisposable unsubscriber;
public virtual void Subscribe(IObservable<ObserverData> provider)
{
unsubscriber = provider.Subscribe(this);
}
public virtual void Unsubscribe()
{
unsubscriber.Dispose();
}
public void OnCompleted()
{
// Not used here
Debug.Log("Observable data transfer complete.");
}
public void OnError(Exception error)
{
// Not used here
Debug.Log("Observable data transfer error.");
}
public void OnNext(ObserverData data)
{
Debug.Log(gameObject.name + ":" + data.volume + ", " + data.color1 + ", " + data.color2);
}
private void OnEnable()
{
GamePreference pref = GamePreference.Instance;
unsubscriber = pref.Subscribe(this);
}
private void OnDisable()
{
if (unsubscriber != null) unsubscriber.Dispose();
}
}
}
Observer측에선 IObserver<ObserverData> 의 인터페이스를 구현해야 한다. 변경사항은 OnNext()가 호출되고, 더이상 변화가 없을 경우 OnComplete()이 호출된다. 에러가 발생하면 OnError()를 쓴다. 이렇게 3개의 인터페이스인데, 사실 Observable에서 어떻게 호출하냐에 달려있다. 일반적인 update()만 생각하면 OnNext()만 구현하면 되고 여기서 그렇게 했다.
고민되던 지점은 받아오는 ObserverData 인데, 하나의 데이터 객체로 preference의 변경된 부분만 전달하기가 좀 애매해서 통채로 전달했다. 좀 더 발전시킨다면, Dictionary를 쓰는 방법이 있을 수 있겠다.
Observer의 등록과 해지는 OnEnable(), OnDisable()에서 구현했다. Subscribe()에서 IDisposable 구현을 받아와 저장했다가, 해지할 때 Dispose()만 호출해주면 된다.
Unity에서 구현된 것으로 어느 객체에든 script component로 붙여놓으면 된다. Preference는 singleton으로 단일하지만, observer가 되는 객체는 얼마든지 있을 수 있다.
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
namespace Observer
{
public class PrefTestScript : MonoBehaviour
{
private List<ObserverData> sample = new List<ObserverData>();
private int index = 0;
// Start is called before the first frame update
void Start()
{
sample.Add(new ObserverData(30, Color.green, Color.cyan));
sample.Add(new ObserverData(100, Color.gray, Color.magenta));
sample.Add(new ObserverData(10, Color.yellow, Color.magenta));
}
// Update is called once per frame
void Update()
{
if (Input.GetKeyDown(KeyCode.Space))
{
if(index < sample.Count)
{
GamePreference.Instance.PrefData = sample[index];
index++;
}
}
}
}
}
Preference를 위한 UI를 만드는건 시간이 걸리므로 순전히 테스트를 위해 만든 코드이다. Preference를 변경하면서 Observer들이 데이터를 받는지 확인해야 하는데, 그 변경을 여기에서 한다. 테스트용 객체를 하나 만들고 이 스크립트 컴포넌트를 추가한 후 실행하면 게임화면에서 스페이스바가 눌릴 때마다 샘플 데이터가 입력된다. 결과는 다음과 같다.

PrefObserver1, PrefObserver2 두개의 Observer객체를 만들고 테스트를 돌려봤다. 스페이스바가 눌릴 때마다 각각 값을 잘 받아가는걸 볼 수 있다.
처음엔 event와 delegate를 이용한 간단한걸 만들 생각이었는데, Observer관련 인터페이스와 설명 문서가 딱 나와있으니 만들어 봐야했다. Observer pattern은 날로먹을 생각을 하고 있었는데, 새로운 내용도 많고 복잡한거 같다.
Python
Kotlin으로 구현했던 채팅방 흉내를 다시 내보자. 이번에는 thread safe 까지 고려해 동기화 코드를 추가할 것이다.
python에서는 java처럼 synchronized 같은걸 지원하지 않는다. 대신 Lock과 같은 동기화 오브젝트들은 지원한다. ( 공식 문서 참조 )
from dataclasses import dataclass
@dataclass
class SingleChat:
name: str = ""
chat: str = ""
timestamp: float = 0.
Kotlin에서와 동일하게 채팅 데이터 클래스를 정의했다. python에서 timestamp는 float로 표시되는데, 소수점 위는 초단위, 아래는 밀리초 단위를 표시한다.
from abc import ABC, abstractmethod
from typing import List
import threading
import copy
class IObserver(ABC):
@abstractmethod
def update(self, data):
pass
class Observable:
def __init__(self):
self.observers: List[IObserver] = []
self.lock = threading.Lock()
def attach(self, observer: IObserver):
with self.lock:
self.observers.append(observer)
def detach(self, observer: IObserver):
with self.lock:
self.observers.remove(observer)
def chat_notify(self, data):
with self.lock:
observers_copy = self.observers[:]
for observer in observers_copy:
observer.update(data)
Java 처럼 IObserver와 Observable 클래스를 만들었다. dynamic type 언어의 장점이 보이는데, notify로 전달하는 data의 타입을 지정하지 않아도 되서 concrete class와 완전히 분리된 걸 볼 수 있다.
주목할 점은 thread-safe 코드의 추가인데, lock을 하나 생성하고 여러 쓰레드에서 접근하는 데이터인 observers list 사용시, lock을 걸고 사용해 동기화 시켰다. notify하는 부분에서는 lock이 걸린채로 observer.update()를 호출하면 deadlock을 유발할 수 있으므로, 리스트를 복사할 때만 lock을 걸고 복사된 데이터를 사용했다.
from observer.data import SingleChat
from observer.abc_observer import Observable
class ChatRoom(Observable):
def talk(self, data: SingleChat):
self.chat_notify(data)
Observable을 상속한 채팅방 클래스이다. 앞서 Kotlin의 구현과 다르게, notify에서 데이터가 전달되므로 새로운 대화를 추가하는 talk만 추가 되었다. Observer관련 코드도 깔끔하게 떨어져서 다른 추가코드는 필요치 않았다.
from abc import ABC, abstractmethod
from observer.chatroom import ChatRoom
from observer.data import SingleChat
from datetime import datetime
import time
from observer.abc_observer import IObserver
class ChatClient(IObserver):
def __init__(self):
self.name: str = None
self.room: ChatRoom = None
def update(self, data: SingleChat):
print(f"[{self.name}] {data.name}, {data.chat} - {datetime.fromtimestamp(data.timestamp)}")
def connect(self, name: str, room: ChatRoom):
self.name = name
self.room = room
self.room.attach(self)
def disconnect(self):
self.room.detach(self)
self.room = None
def talk(self, msg: str):
if self.room is not None:
self.room.talk(SingleChat(self.name, msg, time.time()))
Observer가 되는 채팅 클라이언트 구현이다. IObserver를 상속해서 update()를 override했다. time.time()은 현재의 timestamp를 가져오는 함수이다.
import threading
from observer.chatroom import ChatRoom
from observer.chatclient import ChatClient
import time
def client1_thread(client: ChatClient):
time.sleep(0.1)
client.talk("hi! I'm bato")
time.sleep(1)
client.disconnect()
def client2_thread(client: ChatClient):
time.sleep(0.5)
client.talk("Do you see my chat?")
time.sleep(2)
client.talk("Hello?")
def client3_thread(client: ChatClient):
time.sleep(0.7)
client.talk("hi! I'm jung-a")
time.sleep(0.1)
client.disconnect()
def main():
room1 = ChatRoom()
client1 = ChatClient()
client1.connect("bato", room1)
client2 = ChatClient()
client2.connect("wan-tae", room1)
client3 = ChatClient()
client3.connect("jung-a", room1)
thread1 = threading.Thread(target=client1_thread, args=(client1,))
thread2 = threading.Thread(target=client2_thread, args=(client2,))
thread3 = threading.Thread(target=client3_thread, args=(client3,))
thread1.start()
thread2.start()
thread3.start()
if __name__ == "__main__":
main()
멀티쓰레드 동기화를 신경쓴 만큼, 테스트 코드도 멀티 쓰레드로 구성해 봤다.
[bato] bato, hi! I'm bato - 2020-05-03 16:38:11.351498
[wan-tae] bato, hi! I'm bato - 2020-05-03 16:38:11.351498
[jung-a] bato, hi! I'm bato - 2020-05-03 16:38:11.351498
[bato] wan-tae, Do you see my chat? - 2020-05-03 16:38:11.745049
[wan-tae] wan-tae, Do you see my chat? - 2020-05-03 16:38:11.745049
[jung-a] wan-tae, Do you see my chat? - 2020-05-03 16:38:11.745049
[bato] jung-a, hi! I'm jung-a - 2020-05-03 16:38:11.948585
[wan-tae] jung-a, hi! I'm jung-a - 2020-05-03 16:38:11.948585
[jung-a] jung-a, hi! I'm jung-a - 2020-05-03 16:38:11.948585
[wan-tae] wan-tae, Hello? - 2020-05-03 16:38:13.761212
3개의 쓰레드에서 대화가 오가는걸 볼 수 있고, disconnect 이후에는 대화를 받지 않는걸 마지막에 확인가능하다.
Observer pattern은 자주 보던 패턴이라 쉽게만 생각했는데, 문서의 내용과 실제 언어들 구현에 차이가 있어 고민이 좀 많았다. 진짜, 날로 먹으려 했던 패턴인데, 이렇게 방대한 내용이 있을줄 몰랐다. 워낙 흔하게 쓰이다보니 언어레벨에서 지원하는 내용들도 많은데 다 나에겐 낯선 내용들이었다. 구현 방법도 가지각색인데, 그것들 조차 생각처럼 간단한 구현들이 아니기도 했고. C#의 IDisposable은 Observer Pattern의 꺼림직한 부분에 깔끔한 답을 주긴 했지만, 구현이 너무 번잡했다. 또 실무에서는 백퍼 동기화 문제를 무시할 수 없을텐데, 이에 대한 내용도 마지막 Python구현에 가서야 살펴봤다. 벽에 막힐 때마다가 미루다 보니 이 포스팅 하나에 3일이 걸렸는데, 그럴만한 주제가 아니었나 생각되네.