Skip to content

Commit

Permalink
redisUtil 분리 및 분산락/샤딩 적용
Browse files Browse the repository at this point in the history
  • Loading branch information
kohbyeongchan committed Nov 18, 2024
1 parent a0bf74f commit 908d762
Show file tree
Hide file tree
Showing 7 changed files with 433 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,24 @@ public QueueController(QueueService queueService) {
this.queueService = queueService;
}


// 사용자 대기열 진입 API
// 사용자 대기열 진입 API
@GetMapping("/queue")
public ResponseEntity<String> enterQueue(@RequestParam(required = false) String userToken) {
try {
// 사용자 대기열 진입 처리
String response = queueService.handleUserEntry(userToken);
return ResponseEntity.ok(response);
} catch (Exception e) {
// 에러 로그 추가
System.err.println("Error entering queue: " + e.getMessage());
return ResponseEntity.status(500).body("Error entering queue: " + e.getMessage());
}
}


//Polling API: 사용자 리다이렉트 여부 확인
// Polling API: 사용자 리다이렉트 여부 확인
@GetMapping("/queue/status")
public ResponseEntity<String> checkQueueStatus(@RequestParam String userToken) {
try {
boolean readyToRedirect = queueService.isUserReadyToRedirect(userToken);

if (readyToRedirect) {
return ResponseEntity.ok("/seats/sections");
} else {
return ResponseEntity.ok("inQueue");
}
return readyToRedirect ? ResponseEntity.ok("/seats/sections") : ResponseEntity.ok("inQueue");
} catch (Exception e) {
// 에러 로그 추가
System.err.println("Error checking queue status: " + e.getMessage());
return ResponseEntity.status(500).body("Error checking queue status: " + e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,100 +1,145 @@
package com.bticketing.appqueue.service;

import com.bticketing.appqueue.util.RedisUtil;
import com.bticketing.appqueue.util.TokenUtil;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class QueueService {

private static final Logger logger = LoggerFactory.getLogger(QueueService.class);

private static final String QUEUE_KEY = "userQueue";
private static final String GROUP_KEY_PREFIX = "group-";
private static final String REDIRECT_COUNT_KEY_PREFIX = "redirectCount-";
private static final String USER_READY_KEY_PREFIX = "userReady-";
private static final int MAX_QUEUE_SIZE = 500;
private static final int GROUP_SIZE = 200;
private static final String CURRENT_GROUP_KEY = "currentGroup";

private static final int MAX_QUEUE_SIZE = 1000;
private static final int GROUP_SIZE = 120;
private final AtomicInteger cachedCurrentGroup = new AtomicInteger(1);

private final RedisTemplate<String, Object> redisTemplate;
private int currentGroup = 1;
private final RedisUtil redisUtil;

public QueueService(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
public QueueService(RedisUtil redisUtil) {
this.redisUtil = redisUtil;
}

// 현재 그룹 조회
protected int getCurrentGroup() {
int currentGroup = cachedCurrentGroup.get();
Integer redisGroup = (Integer) redisUtil.getValue(CURRENT_GROUP_KEY);
if (redisGroup != null && redisGroup > currentGroup) {
cachedCurrentGroup.set(redisGroup);
return redisGroup;
}
return currentGroup;
}

// 사용자 대기열 진입 처리
// 현재 그룹 업데이트
protected void updateCurrentGroup(int newGroup) {
try {
redisUtil.setValue(CURRENT_GROUP_KEY, newGroup);
cachedCurrentGroup.set(newGroup);
logger.info("현재 그룹 번호가 {}로 업데이트 되었습니다.", newGroup);
} catch (Exception e) {
logger.error("현재 그룹 번호 업데이트 실패", e);
}
}

// 사용자 진입 처리
public String handleUserEntry(String userToken) {
if (userToken == null || userToken.isEmpty()) {
if (userToken == null || userToken.isBlank()) {
userToken = TokenUtil.generateUserToken();
}

Long queueSize = redisTemplate.opsForList().size(QUEUE_KEY);

// 500명 미만이면 바로 리다이렉트
Long queueSize = redisUtil.incrementValue(QUEUE_KEY + "-size");
if (queueSize != null && queueSize < MAX_QUEUE_SIZE) {
markUserAsReadyToRedirect(userToken);
// 사용자 리다이렉트 준비: 직접 Redis에 값을 설정
redisUtil.setValueWithTTL(USER_READY_KEY_PREFIX + userToken, true, Duration.ofMinutes(10));
return "/seats/sections";
}

// 현재 그룹 키 생성 및 그룹 관리
String groupKey = GROUP_KEY_PREFIX + currentGroup;
Long groupSize = redisTemplate.opsForList().size(groupKey);

// 그룹 사이즈가 200명에 도달하면 다음 그룹으로 전환
if (groupSize != null && groupSize >= GROUP_SIZE) {
currentGroup++;
groupKey = GROUP_KEY_PREFIX + currentGroup;
int currentGroup = getCurrentGroup();
if (!acquireLockWithSharding("lock:group", currentGroup, Duration.ofSeconds(5))) {
logger.warn("그룹 {}에 사용자 추가를 위한 락 획득에 실패했습니다.", currentGroup);
return "error";
}

// 사용자 대기열 및 현재 그룹에 추가
redisTemplate.opsForList().rightPush(QUEUE_KEY, userToken);
redisTemplate.opsForList().rightPush(groupKey, userToken);
try {
String groupKey = GROUP_KEY_PREFIX + currentGroup;
if (redisUtil.getListLength(groupKey) >= GROUP_SIZE) {
updateCurrentGroup(currentGroup + 1);
groupKey = GROUP_KEY_PREFIX + (currentGroup + 1);
}

redisUtil.rightPushToList(groupKey, userToken);

// 대기열 그룹 처리 메서드 호출
processQueueGroup();
// 사용자 리다이렉트 준비: 직접 Redis에 값을 설정
redisUtil.setValueWithTTL(USER_READY_KEY_PREFIX + userToken, true, Duration.ofMinutes(10));
logger.info("사용자 {}가 {} 그룹에 추가되었습니다.", userToken, groupKey);
} finally {
redisUtil.releaseLock("lock:group-" + currentGroup);
}

return "addedToQueue";
return "addedToQueue?userToken=" + userToken;
}

// 그룹별 대기열 처리 메서드
// 대기열 그룹 처리
public void processQueueGroup() {
String groupKey = GROUP_KEY_PREFIX + currentGroup;
String redirectCountKey = REDIRECT_COUNT_KEY_PREFIX + currentGroup;
Long groupSize = redisTemplate.opsForList().size(groupKey);

// 현재 그룹이 존재하고, 그룹 사이즈가 200명 이상일 경우 처리
if (groupSize != null && groupSize >= GROUP_SIZE) {
for (int i = 0; i < GROUP_SIZE; i++) {
String userToken = (String) redisTemplate.opsForList().leftPop(groupKey);
if (userToken != null) {
markUserAsReadyToRedirect(userToken);

// Atomic Counter 증가
Long redirectCount = redisTemplate.opsForValue().increment(redirectCountKey);

// 모든 사용자가 리다이렉트되면 그룹 키 삭제
if (redirectCount != null && redirectCount >= GROUP_SIZE) {
redisTemplate.delete(groupKey);
redisTemplate.delete(redirectCountKey);
System.out.println("Deleted group key and redirect counter: " + groupKey);
}
}
int currentGroup = getCurrentGroup();
String lockKey = "lock:processQueueGroup-" + currentGroup;

if (!acquireLockWithSharding(lockKey, currentGroup, Duration.ofSeconds(5))) {
logger.warn("그룹 {}의 대기열 처리를 위한 락 획득에 실패했습니다.", currentGroup);
return;
}

try {
String groupKey = GROUP_KEY_PREFIX + currentGroup;
List<String> userTokens = redisUtil.leftPopMultipleFromList(groupKey, GROUP_SIZE);
if (userTokens.isEmpty()) {
logger.warn("그룹 {}에서 사용자 토큰을 찾을 수 없습니다.", currentGroup);
return;
}

// 다음 그룹 처리
currentGroup++;
processQueueGroup();
redisUtil.setMultipleValues(userTokens, true, Duration.ofMinutes(10));
updateCurrentGroup(currentGroup + 1);
} finally {
redisUtil.releaseLock(lockKey);
}
}

// 사용자 리다이렉트 상태 업데이트
public void markUserAsReadyToRedirect(String userToken) {
// Atomic Set: userToken:ready 키를 true로 설정
redisTemplate.opsForValue().set(USER_READY_KEY_PREFIX + userToken, true);
// Sharded Lock 획득 메서드
public boolean acquireLockWithSharding(String lockKeyPrefix, int groupId, Duration lockDuration) {
String lockKey = lockKeyPrefix + "-" + groupId;
int maxRetries = 5;
int baseDelay = 50;

for (int i = 0; i < maxRetries; i++) {
if (redisUtil.acquireLock(lockKey, lockDuration)) {
return true;
}
try {
int backoffDelay = baseDelay * i;
Thread.sleep(backoffDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
logger.warn("그룹 {}에 대해 {}회 재시도 후 락 획득에 실패했습니다.", groupId, maxRetries);
return false;
}

// 사용자 리다이렉트 상태 확인
public boolean isUserReadyToRedirect(String userToken) {
Boolean isReady = (Boolean) redisTemplate.opsForValue().get(USER_READY_KEY_PREFIX + userToken);
Boolean isReady = (Boolean) redisUtil.getValue(USER_READY_KEY_PREFIX + userToken);
return Boolean.TRUE.equals(isReady);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.bticketing.appqueue.util;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.stereotype.Component;
import org.springframework.data.redis.core.SessionCallback;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;

@Component
public class RedisUtil {

private final RedisTemplate<String, Object> redisTemplate;

public RedisUtil(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

// 값 가져오기
public Object getValue(String key) {
return redisTemplate.opsForValue().get(key);
}

// 값 설정
public void setValue(String key, Object value) {
redisTemplate.opsForValue().set(key, value);
}

// 값 설정 (TTL 포함)
public void setValueWithTTL(String key, Object value, Duration ttl) {
redisTemplate.opsForValue().set(key, value, ttl);
}

// 값 증가
public Long incrementValue(String key) {
return redisTemplate.opsForValue().increment(key);
}

// 리스트 길이 가져오기
public Long getListLength(String key) {
return redisTemplate.opsForList().size(key);
}

// 리스트에 값 추가
public void rightPushToList(String key, String value) {
redisTemplate.opsForList().rightPush(key, value);
}

// Redis에서 리스트에서 여러 값 꺼내기 메서드
public List<String> leftPopMultipleFromList(String key, int count) {
List<String> results = new ArrayList<>();
try {
for (int i = 0; i < count; i++) {
String value = (String) redisTemplate.opsForList().leftPop(key);
if (value == null) {
break;
}
results.add(value);
}
} catch (Exception e) {
System.err.println("Error during leftPopMultipleFromList: " + e.getMessage());
}
return results;
}

// 배치로 여러 값 설정 (MSET 사용)
public void setMultipleValues(List<String> keys, Object value, Duration ttl) {
Map<String, Object> valueMap = new HashMap<>();
for (String key : keys) {
valueMap.put(key, value);
}
redisTemplate.opsForValue().multiSet(valueMap);

// TTL 설정
for (String key : keys) {
redisTemplate.expire(key, ttl);
}
}

// 분산 락 획득
public boolean acquireLock(String key, Duration ttl) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "locked", ttl);
return Boolean.TRUE.equals(result);
}

// 분산 락 해제
public void releaseLock(String key) {
redisTemplate.delete(key);
}


}
3 changes: 3 additions & 0 deletions app-queue/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ spring:
redis:
host: localhost
port: 6379
timeout: 5000ms


Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ void testPollingAPI_UserInQueue() {
void testPollingAPI_UserRedirected() {
// Given: 리다이렉트 준비 완료
String userToken = "redirectUser";
queueService.markUserAsReadyToRedirect(userToken);

// When: Polling API 호출
String statusUrl = UriComponentsBuilder.fromPath("/queue/status")
Expand Down
Loading

0 comments on commit 908d762

Please sign in to comment.