스레드 통신
파이프
특징
- 자바에서 제공하는 java.io 패키지의 API. 블로킹 IO를 지원한다.
- 같은 프로세스 내의 두 스레드간의 단방향 데이터 전송을 지원
- 리눅스의 파이프 통신 :
프로세스간
통신 - 자바의 파이프 통신 :
동일 프로세스내의 스레드간
통신
- 리눅스의 파이프 통신 :
- 일반적인 사용케이스 : 하나의 태스크에서 다른 태스크로 계속해서 데이터를 옮길 때
- 데이터 전송 유형
- Binary : PipedInputStream / PipedOutputStream
- String : PipedReader / PipedWriter
- 파이프의 수명
- 연결설정(connect()) : 시작
- 연결종료(close()) : 종료
예제
데이터 생산자(UI 스레드)
- EditText로부터 글자를 입력받아서, 처리할 스레드로 전송
public class TestActivity extends Activity implements TextWatcher {
private EditText editText;
private PipedWriter writer;
private TextHandlerThread workerThread;
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentVie(R.layout.activity_test);
editText = (EditText) findViewById(R.id.edit);
editText.addTextChangedListener(this);
initializeConnection();
}
private void initializeConnection() {
writer = new PipedWriter();
workerThread = new TextHandlerThread();
try {
workerThread.connect(writer);
workerThread.start();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onDestroy() {
super.onDestroy();
workerThread.interrupt();
try {
writer.close();
workerThread.disconnect();
} catch (IOExcepton e) {
e.printStackTrace();
}
}
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
//추가되는 문자만 처리
try {
if (count > before) {
writer.write(s.subSequence(before, count).toString());
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void afterTextChanged(Editable s) {
}
}
데이터 소비자(Worker 스레드)
- UI 스레드로부터 전달받은 글자를 처리
public class TextHandlerThread extends Thread {
private PipedReader reader;
public TextHandlerThread() {
reader = new PipedReader();
}
public void connect(PipedWriter writer) throws IOException {
writer.connect(reader);
}
public void disconnect() throws IOException {
reader.close();
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
int i;
while ((i = reader.read()) != -1) {
char c = (char) i;
//TODO : 문자처리 로직 추가
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
공유메모리
- 공유 메모리 : 같은 프로세스내의 모든 스레드가 접근할 수 있는 메모리 영역(Heap 영역)
- 인스턴스 멤버변수
- 스태틱 멤버변수
- 객체
- 공유메모리를 이용해 스레드간 데이터 Read/Write - 각 스레드간의 동기화 문제가 발생할 여지가 있다!
- 시그널링을 통해 해결
시그널링
- 시그널링 : 특정 스레드의 상태를 다른 스레드로 전파
- notify() / signal() - 무작위로 하나의 메서드만 꺠움
- notifyAll() / signalAll() - 기다리는 모든 스레드를 깨움
동기화타입 | synchronized | ReentrantLock | ReentrantReadWriteLock |
---|---|---|---|
Block & Wait | Object.wait() Object.wait(timeout) | Condition.await() Condition.await(timeout) | Condition.await() Condition.await(timeout) |
Notify | Object.notify() Object.notifyAll() | Condition.signal() Condition.signalAll() | Condition.signal() Condition.signalAll() |
- 안드로이드의 UI스레드에서는 사용할 수 없다.
- UI스레드가 block 상태로 지속되면, ANR 발생
블로킹 큐
java.util.concurrent.BlockingQueue
클래스 API- 저수준 메커니즘인 시그널링 기법을 추상화하여 만든 고수준 메커니즘
- 스레드 시그널링 + 리스트구현 래핑
- 생산자 - 소비자 패턴을 이용해 문제 해결
- put() : 큐에 넣는 메서드. 큐가 가득차면 block
- take() : 큐에서 빼는 메서드. 큐가 비면 block
메시지 전달
- Non-Blocking 생산자 - 소비자 패턴 메커니즘
android.os
패키지의 API를 이용- Message : 데이터 / 태스크를 포함하는 단위. 스레드간 주고받는 주체
- MessageQueue : 소비자 스레드에서 처리할 메시지들을 담고있는 무제한의 연결리스트. 루퍼 당 하나의 스레드만 연결될 수 있다.
- Looper : 메시지 발송담당. 스레드 당 하나의 루퍼만 연결될 수 있다.
- Handler : 생산자 스레드에서의 메시지 큐잉, 소비자 스레드에서의 메시지 처리를 담당.
Message
- 스레드간 통신을 위해 주고받는 주체
- 데이터 or 태스크 중 하나만을 옮기는 컨테이너 객체
- 데이터 : 소비자 스레드에서 처리됨
- 태스크 : 소비자 스레드에서 동작하는 태스크. 현재 진행중인 태스크가 없을 때 실행됨
- 소비자 스레드에서 메시지의 처리
- 데이터가 포함된 메시지 : Handler의
handleMessage(Message)
콜백메서드에서 수신된 데이터 메시지를 처리 - 태스크가 포함된 메시지 : 스레드에서 자동적으로 태스크를 실행. handleMessage() 콜백메서드에서는 태스크 메시지를 수신하지 않는다.
- 데이터가 포함된 메시지 : Handler의
Message 내부 필드
필드명 | 자료형 | 설명 |
---|---|---|
what | int | 메시지의 식별자 |
arg1, arg2 | int | 정수값을 전달하는 경우에 사용되는 간단한 데이터 필드 |
obj | Object | 임의의 객체. 이 객체가 다른 프로세스의 스레드로 전달될 때는 반드시 Parcelable 로 구현해야 한다. |
data | Bundle | 임의의 데이터값을 넣는 컨테이너 |
replyTo | Messanger | 다른 프로세스의 핸들러 참조. 프로세스간 통신을 위해 존재한다. |
callback | Runnable | 스레드에서 실행할 태스크. Handler의 postXXX() 메서드에서 보낸 Runnable 객체 |
Message 생명주기
- 4가지의 상태가 존재.
- 앱에서 메시지를 다룰 때, 메시지의 상태에 어떠한 가정도 하지 않아야 한다.
- 일단 메시지가 메시지큐에 추가되면, 메시지 안의 내용은 변경되면 안된다.
MessageQueue
- 처리될 메시지들로 구성된 무제한의 단방향 연결리스트
- 스레드에서는 핸들러를 이용하여 메시지큐 처리
- 생산자 스레드 : 메시지큐에 메시지 삽입
- 소비자 스레드 : 메시지큐에서 메시지 추출하여 처리
- 메시지는 타임스탬프를 기준으로 정렬된다.
- 가장 낮은 타임스탬프값을 가진 메시지가 맨 앞에 놓인다.(가장 빨리 처리된다.)
현재시간보다 타임스탬프값이 작을 때만
해당 메시지를 소비자 스레드로 전달한다. 현재시간보다 타임스탬프값이 크다면(미래의 메시지), 해당 시각이 될 때까지 기다린 후 메시지를 전달한다.
Looper
- 메시지큐에 있는 메시지를 정확한 핸들러로 보내는 역할을 담당
- 소비자 스레드에서는 메시지큐에 직접접근을 하지 않고, 루퍼를 통해 메시지를 얻는다.
- 루퍼는 스레드당 하나만 연결된다.
- 이미 스레드와 연결된 루퍼를 다른스레드에 다시 연결하려고 하면 RuntimeException을 던진다.
- 결과적으로, 스레드는 하나의 메시지큐만 가지는 것이 된다.
- UI스레드를 제외한 모든 스레드는 메시지처리를 하기 위해서 명시적으로 루퍼를 얻어야 하고, 메시지 처리를 끝내려는 스레드는 명시적으로 루퍼를 종료해주어야 한다.
Looper 연결
public class ConsumerThread extends Thread {
@Override
public void run() {
Looper.prepare();
//TODO : 루퍼와 핸들러 연결
Looper.loop();
}
}
- Looper.prepare() : 루퍼를 생성하고, 루퍼와 현재 스레드를 연결한다. 아직 이 단계에서 메시지를 전송하지는 않는다.
- Looper.loop() : run()메서드가 완료되지 않았음을 보장하는 Blocking 메서드. run()메서드가 차단된 동안, 루퍼는 이 스레드로 메시지를 전달한다.
Looper 종료
- quit() / quitSafely() 메서드
- quit() : 현재시각보다 낮은 타임스탬프의 메시지를 포함하여, 모든 대기중인 메시지를 폐기
- quitSafely() : 현재시각보다 낮은 타임스탬프의 메시지는 제외하고, 높은 타임스탬프의 대기메시지들을 폐기. 남은 메시지들은 루퍼가 종료되기 전에 처리된다. API레벨 18에서 추가된 메서드.
- 루퍼를 종료한다고 스레드가 종료되지는 않는다. 단지, blocking 중이었던 loop() 메서드를 빠져나오는 것이다.
UI스레드의 루퍼
- UI스레드는 앱이 초기화될 때 기본적으로 루퍼를 연결하기 때문에, 따로 연결할 필요가 없다.
- UI스레드 루퍼는 종료시킬 수 없다. quit() / quitSafely() 메서드가 호출되면 RuntimeException을 던진다.
- UI스레드 루퍼를 다른 스레드에 부착할 수 없다.
Looper.getMainLooper()
메서드를 이용하면 어디서든 UI스레드 루퍼를 접근할 수 있다.
Handler
- 스레드에서 메시지 삽입/처리를 직접적으로 담당.
- 핸들러는 내부적으로 메시지큐와의 상호작용을 하기위해 루퍼를 이용한다. 그러므로, 핸들러를 이용하는 스레드는 반드시 루퍼를 연결해야 한다.
Handler 생성
- 생성시점에 루퍼와 바인딩된다. 루퍼와의 바인딩에 실패하면 RuntimeException을 던진다.
- 한번 바인딩된 루퍼는 변경될 수 없다.
// 현재 스레드의 루퍼와 바인딩하는 생성자
new Handler();
new Handler(Handler.Callback);
//명시적으로 선언된 루퍼와 바인딩하는 생성자
new Handler(Looper);
new Handler(Looper, Handler.Callback);
메시지 삽입
//데이터 메시지 (sendXXX())
//메시지 객체 직접 전달
boolean sendMessage(Message msg)
boolean sendMessageAtFirstOfQueue(Message msg)
boolean sendMessageAtTime(Message msg, long uptimeMillis)
boolean sendMessageDelayed(Message msg, long delayMillis)
//메시지 의도만 전달
boolean sendEmptyMessage(int what)
boolean sendEmptyMessageAtTime(int what, long uptimeMillis)
boolean sendEmptyMessageDelayed(int what, long delayMillis)
//태스크 메시지 (postXXX())
boolean post(Runnable r)
boolean postAtFrontOfQueue(Runnable r)
boolean postAtTime(Runnable r, Object token, long uptimeMillis)
boolean postAtTime(Runnable r, long uptimeMillis)
boolean postDelayed(Runnable r, long delayMillis)
- 메시지 전달순서 조절 - 앱이 유일하게 핸들링할 수 있는 방법이다.
- 메시지의 처리시간은 불확정이다 - 기존에 처리중인 메시지 / 운영체제의 스케쥴링에 영향을 받기 때문
- default : 전달이 즉시 가능
- at_front : 타임스탬프값 0으로 전달되어, 다른메시지가 끼어들지 않는 한. 다음에 전달될 메시지가 된다.
- delay : 이 지연시간 후에 메시지가 전달될 수 있다.
- uptime : 이 절대시간에 메시지가 전달될 수 있다.
메시지 처리
- 태스크 메시지
- 따로 핸들링할 필요 없음. 전송된 태스크 Runnable의 run 메서드가 자동적으로 실행된다.
- 데이터 메시지
- 소비자 스레드에서
Handler.handleMessage(Message)
메서드를 오버라이드 하여 받은 메시지를 처리한다. - 소비자 스레드에서 Handler에 콜백 인터페이스를 등록한다.
- 소비자 스레드에서
//Handler.handlerMessage() 오버라이드
public class ConsumerThread extends Thread {
public Handler handler;
@Override
public void run() {
Looper.prapare();
// 핸들러는 반드시 Looper가 설정된 다음에 생성해야 한다!!
handler = new Handler() {
@Override
public void handleMessage(Message msg) {
//메시지 처리
}
};
Looper.loop();
}
}
//Handler에 콜백 인터페이스 등록
public class ConsumerThread extends Thread implements Handler.Callback {
public Handler handler;
@Override
public void run() {
Looper.prapare();
// 핸들러는 반드시 Looper가 설정된 다음에 생성해야 한다!!
handler = new Handler(this);
Looper.loop();
}
@Override
public boolean handleMessage(Message msg) {
//메시지 처리
//더이상 처리할 메시지가 없을때 true 리턴. false를 리턴하면 Handler.handleMessage() 메서드가 이어서 호출된다.
return true;
}
}
메시지 제거
- Handler의 메서드를 이용하면 큐에 넣어진 메시지를 제거할 수 있다.
- 단, 메시지가 루퍼에 의해 큐에서 꺼내지기 전에만 가능하다.
- 메시지를 정확하게 식별할 수 있어야 한다.
식별자 타입 | 설명 | 적용되는 메시지 타입 |
---|---|---|
Handler | 메시지 수신자 | 태스크 & 데이터 메시지 |
Object | 메시지 태그 | 태스크 & 데이터 메시지 |
int | 메시지의 what 매개변수 | 데이터 메시지 |
Runnable | 실행될 태스크 | 태스크 메시지 |
// 메시지 큐에서 태스크 제거
void removeCallbacks(Runnable r)
void removeCallbacks(Runnable, Object token)
//메시지 큐에서 데이터 제거
void removeMessages(int what)
void removeMessages(int what, Object object)
//메시지 큐에서 태스크 / 데이터 동시에 제거(Object로 식별할 경우만 가능)
void removeCallbakcsAndMessages(Object token)
댓글 없음:
댓글 쓰기