麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 數據庫 > Redis > 正文

Redis實現分布式鎖和等待序列的方法示例

2020-10-28 21:30:04
字體:
來源:轉載
供稿:網友

在集群下,經常會因為同時處理發生資源爭搶和并發問題,但是我們都知道同步鎖 synchronized 、 cas 、 ReentrankLock 這些鎖的作用范圍都是 JVM ,說白了在集群下沒啥用。這時我們就需要能在多臺 JVM 之間決定執行順序的鎖了,現在分布式鎖主要有 redis 、 Zookeeper 實現的,還有數據庫的方式,不過性能太差,也就是需要一個第三方的監管。

背景

最近在做一個消費 Kafka 消息的時候發現,由于線上的消費者過多,經常會遇到,多個機器同時處理一個主鍵類型的數據的情況發生,如果最后是執行更新操作的話,也就是一個更新順序的問題,但是如果恰好都需要插入數據的時候,會出現主鍵重復的問題。這是生產上不被允許的(因為公司有異常監管的機制,扣分啥的),這是就需要個分布式鎖了,斟酌后用了 Redis 的實現方式(因為網上例子多)

分析

redis 實現的分布式鎖,實現原理是 set 方法,因為多個線程同時請求的時候,只有一個線程可以成功并返回結果,還可以設置有效期,來避免死鎖的發生,一切都是這么的完美,不過有個問題,在 set 的時候,會直接返回結果,成功或者失敗,不具有阻塞效果,需要我們自己對失敗的線程進程處理,有兩種方式

  • 丟棄
  • 等待重試 由于我們的系統需要這些數據,那么只能重新嘗試獲取。這里使用 redis 的 List 類型實現等待序列的作用

代碼

直接上代碼 其實直接redis的工具類就可以解決了

package com.testimport redis.clients.jedis.Jedis;import java.util.Collections;import java.util.List;/** * @desc redis隊列實現方式 * @anthor  * @date  **/public class RedisUcUitl {  private static final String LOCK_SUCCESS = "OK";  private static final String SET_IF_NOT_EXIST = "NX";  private static final String SET_WITH_EXPIRE_TIME = "PX";  private static final Long RELEASE_SUCCESS = 1L;  private RedisUcUitl() {  }  /**   * logger   **/  /**   * 存儲redis隊列順序存儲 在隊列首部存入   *   * @param key  字節類型   * @param value 字節類型   */  public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {    return jedis.lpush(key, value);    }  /**   * 移除列表中最后一個元素 并將改元素添加入另一個列表中 ,當列表為空時 將阻塞連接 直到等待超時   *   * @param srckey   * @param dstkey   * @param timeout 0 表示永不超時   * @return   */  public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {    return jedis.brpoplpush(srckey, dstkey, timeout);  }  /**   * 返回制定的key,起始位置的redis數據   * @param redisKey   * @param start   * @param end -1 表示到最后   * @return   */  public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {        return jedis.lrange(redisKey, start, end);  }  /**   * 刪除key   * @param redisKey   */  public static void delete(Jedis jedis, final byte[] redisKey) {         return jedis.del(redisKey);  }  /**   * 嘗試加鎖   * @param lockKey key名稱   * @param requestId 身份標識   * @param expireTime 過期時間   * @return   */  public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);    return LOCK_SUCCESS.equals(result);  }  /**   * 釋放鎖   * @param lockKey key名稱   * @param requestId 身份標識   * @return   */  public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {    final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";    jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));    return RELEASE_SUCCESS.equals(result);  }}

業務邏輯主要代碼如下

1.先消耗隊列中的

while(true){  // 消費隊列  try{    // 被放入redis隊列的數據 序列化后的    byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);    if(bytes == null || bytes.isEmpty()){      // 隊列中沒數據時退出      break;    }    // 反序列化對象    Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);    // 塞入唯一的值 防止被其他線程誤解鎖    String requestId = UUID.randomUUID().toString();    boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);    if(lockGetFlag){      // 成功獲取鎖 進行業務處理      //TODO      // 處理完畢釋放鎖       boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);    }else{      // 未能獲得鎖放入等待隊列     RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));      }      }catch(Exception e){    break;  }  }

2.處理最新接到的數據

同樣是走嘗試獲取鎖,獲取不到放入隊列的流程

一般序列化用 fastJson 之列的就可以了,這里用的是 JDK 自帶的,工具類如下

public class ObjectSerialUtil {  private ObjectSerialUtil() {//    工具類  }  /**   * 將Object對象序列化為byte[]   *   * @param obj 對象   * @return byte數組   * @throws Exception   */  public static byte[] objectToBytes(Object obj) throws IOException {    ByteArrayOutputStream bos = new ByteArrayOutputStream();    ObjectOutputStream oos = new ObjectOutputStream(bos);    oos.writeObject(obj);    byte[] bytes = bos.toByteArray();    bos.close();    oos.close();    return bytes;  }  /**   * 將bytes數組還原為對象   *   * @param bytes   * @return   * @throws Exception   */  public static Object bytesToObject(byte[] bytes) {    try {      ByteArrayInputStream bin = new ByteArrayInputStream(bytes);      ObjectInputStream ois = new ObjectInputStream(bin);      return ois.readObject();    } catch (Exception e) {      throw new BaseException("反序列化出錯!", e);    }  }}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 黄网站在线播放视频免费观看 | 视频一区二区三区视频 | 91 在线免费观看 | 日韩视频区 | v11av在线播放 | 国产成人强伦免费视频网站 | 日本一道aⅴ不卡免费播放 久久久久久久高清 | 一区二区三区视频在线播放 | 成人9禁啪啪无遮挡免费 | 国产成人在线免费观看视频 | av电影在线观看免费 | 国产一区免费 | h色视频网站 | 欧美人一级淫片a免费播放 久久久久久久久91 国产99久久久久久免费看 | 亚洲精品成人18久久久久 | 亚洲精品欧美二区三区中文字幕 | 91av亚洲| 成人在线免费观看小视频 | 美国一级黄色毛片 | 污视频在线免费 | 一区二区三区视频播放 | 一区二区三区无码高清视频 | 午夜精品老牛av一区二区三区 | 国产精品久久久久久影视 | 永久免费在线观看av | 亚洲视频在线观看免费 | 欧美一级毛片免费观看视频 | 得得啪在线 | 天堂精品久久 | 热99在线视频 | 国产日韩在线视频 | 无遮挡一级毛片视频 | 日日噜噜噜噜久久久精品毛片 | 欧美精品一区二区三区在线播放 | 日韩黄站 | 亚洲国产精品一 | 涩涩天堂| 欧美成人高清视频 | 免费观看一级淫片 | 免费a级观看 | 热99在线视频 |