Redis 의 Publish / Subscribe 구조
: 발신자(게시자)가 특정 수진자(구독자)에게 메시지를 보내도록 프로그래밍 되지않은 pub/sub 메시징 패러다임이다. 게시된 메시지는 구독자에 대한 정보 없이 채널로 특성화된다. 구독자는 하나 이상의 채널에 관심을 표하게 되면, 오직 관심있는 pub들의 메시지들만 받게 된다.
foo와 bar 채널을 구독하기 위해 client는 다음의 명령어를 실행한다.
SUBSCRIBE foo bar
다른 클라이언트가 foo나 bar에 보낸 메시지들은 구독한 client에게 pushed 됩니다.
Pub/Sub는 key에 대해 일체의 간섭이 없게 만들어졌습니다.
1보다 숫자가 많은 10번 db에서 publising 하더라도, 1번 db에서 subscribe 할 수 있는 것입니다.
따라서, 범위지정(scoping)이 필요하다면, channel 앞에 environment의 이름을 붙이는 식으로 해결해야 합니다.
(test, staging, production etc.... 와일드 카드를 사용 가능.)
다음은 first와 second 채널을 구독할때, 실행되는 결과이다.
SUBSCRIBE first second
================
*3
$9
subscribe
$5
first
:1
================
*3
$9
subscribe
$6
second
:2
================
======= 은 임의로 내가 추가해서 찍은 것이고, 잘 보면, first와 second에 해당하는 parsing을 하는 모습으로 보여진다.
second에 Hello 라는 message를 publising 하게되면
PUBLISH second Hello
*3
$7
message
$6
second
$5
Hello
다음과 같은 결과가 나오게 되는데, $7 (type?)은 message, $6(target?)은 second $5(content?)은 Hello임을 볼 수 있다.
Redis in socket.io
socket.io에서는 Redis Adapter를 사용하는데, 구조도를 보면 다음과 같다.
즉, emit 명령이 날라오면, redis에서는 각 서버로 할당된 Redis adapter로 모든 신호를 날려주는 것이다.
주로 broadcast를 emit 하는 경우 전체 socket에 메시지를 전달하며, local을 emit 하는 경우에는 다음 그림과 같다.
자신이 붙어있는 서버 내에서만 메시지를 뿌리며, redis로는 전달되지 않는게 핵심이지만, 붙어 있는 서버의 대한 client 관리를 하는게 어렵다고 생각하기 때문에, 자주 사용하지 않을 것 같다.
오히려 namespace 속 room별로 나누어서 broadcast 하는 방식을 서비스 단계에서는 주로 사용될 것 같다.
socket.io private-messaging 예제
socket.io example 중에서 private-messaging 예제가 redis와 cluster를 사용하는 예제이며, socket.io를 이해하기에 꽤나 괜찮은 예제이기 때문에, 참고하려고 한다. 그러나, 중요한 문제가 있다면 window에서는 redis를 지원하지 않기 때문에, 주로 linux 환경에서 돌리거나, docker를 사용하거나 하는데, 최근 window에서는 wsl을 통해서, linux를 지원해주고 있고, 생각보다, window와의 호환성이 좋아지고 있다. 따라서, wsl ubuntu 상에서 redis-server를 깔고, service를 켜주면, window 상에서 돌아가기 때문에, 돌릴 수 있다.
private-messaging 예제를 돌리기 위한 순서는 다음과 같다.
1. wsl상에서 redis-server를 켜둔다.
2. private-messaging 위치에서 vue로 이루어진 front단을 실행한다.
npm install
npm run serve
3. 다른 터미널을 열어서, /server 폴더 안에서 서버를 실행한다.
cd ./server
npm install
npm start
4. localhost:8080/으로 들어가, 잘 동작하는지 체크한다.
동작해보면, 잘 작동하는 것을 볼 수 있다.
socket.io의 get-started를 보면, documents로 잘 정리되어 있으며, 자세한 구조를 알고 싶으면 다음 링크를 참조하는 것이 좋다.
[참조: https://socket.io/get-started/]
part 1 ~ part 4 까지 천천히 읽어본다면, 전체적인 구조가 이해가 될 것이다.
특히 part.4에서 cluster와 redis를 사용하는 모습을 볼 수 있는데,
// ./server/index.js
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});
const { RedisSessionStore } = require("./sessionStore");
const sessionStore = new RedisSessionStore(redisClient);
const { RedisMessageStore } = require("./messageStore");
const messageStore = new RedisMessageStore(redisClient);
// ./server/sessionStore.js
/* abstract */
class SessionStore {
findSession(id) {}
saveSession(id, session) {}
findAllSessions() {}
}
class InMemorySessionStore extends SessionStore {
constructor() {
super();
this.sessions = new Map();
}
findSession(id) {
return this.sessions.get(id);
}
saveSession(id, session) {
this.sessions.set(id, session);
}
findAllSessions() {
return [...this.sessions.values()];
}
}
const SESSION_TTL = 24 * 60 * 60;
const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;
class RedisSessionStore extends SessionStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(
`session:${id}`,
"userID",
userID,
"username",
username,
"connected",
connected
)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
async findAllSessions() {
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}
module.exports = {
InMemorySessionStore,
RedisSessionStore,
};
// ./server/messageStore.js
/* abstract */
class MessageStore {
saveMessage(message) {}
findMessagesForUser(userID) {}
}
class InMemoryMessageStore extends MessageStore {
constructor() {
super();
this.messages = [];
}
saveMessage(message) {
this.messages.push(message);
}
findMessagesForUser(userID) {
return this.messages.filter(
({ from, to }) => from === userID || to === userID
);
}
}
const CONVERSATION_TTL = 24 * 60 * 60;
class RedisMessageStore extends MessageStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}
module.exports = {
InMemoryMessageStore,
RedisMessageStore,
};
adapter로 세팅하고, sessionStorage와 messageStorage를 Redis를 활용하여, 구현한 모습을 볼 수 있다.
multi() 명령어는 트랜잭션의 시작을 말하며, exec()의 도달하기까지, 명령에 문제가 생기면, 그 앞선 명령을 다 취소시키는 원자성을 실현시켜 준다.
그 외의 rpush 라던지, hset, lrange, expire 등은 redis의 있는 명령어들을 활용하면, 금방 사용할 수 있을 것이다.
[참조 : https://redis.io/commands]
이상으로 socket.io 에서는 Redis를 어떻게 활용하고 있는지 알아보았다. In-memory 방식의 Redis를 통해서 cluster들의 데이터를 관리하고, broadcast 할 수 있기 때문에, 잘 활용되는 것이 아닐까 싶다.
다음 포스팅은 socket.io를 사용하는데 있어, Framework를 구성해볼 것이다.
'서비스개발(Web, App) > Back-End' 카테고리의 다른 글
Nest JS 프레임워크 공부하기 (Node.js) - 2 (0) | 2022.01.03 |
---|---|
Nest JS 프레임워크 공부하기 (Node.js) - 1 (0) | 2022.01.02 |
Nest JS 프레임워크 공부하기 (Node.js) - 0 (0) | 2022.01.02 |
Node.js ioredis(Redis) 외부 접속 (0) | 2021.08.19 |
Redis란?? (Redis 정의 및 설명) (0) | 2021.08.09 |