Golang 使用 Redis 锁阻止接口并发请求(重复提交)

Published: 2023-02-21

Tags: Golang

本文总阅读量

本文介绍使用 Redis 锁来限制接口的并发请求。

问题简述

线上服务可能部署在多台服务器,当用户快速点击时,多个请求会被不同机器处理,这个时候就可能存在并发问题。

比如,服务器A为用户增加了道具数量,但还未设置用户已领取道具标识时,服务器 B 也收到增加用户道具数量请求,读取到用户还未添加道具的标识,就会继续给用户加道具,导致了重复发放。

就算网关将同一用户的请求发往同一服务器处理,也会因为网络问题导致两个请求同时到达服务器,无法避免。

解决方式

避免接口并发请求,客户端改动是为了避免用户无效请求发出,服务端改动是为了从接口层面保证数据一致性。

  • 客户端在 UI 应在请求发出后,按钮变为不可点击状态,直到收到本次请求结果或按钮倒计时结束。
  • 如果可以,客户端发出的请求,应携带一个 OnceCode,同一场景用户点击两次按钮,OnceCode 应相同,用于服务端分辨同一请求。
  • 服务端的优化是当用户发起的同一个请求到达服务,响应最先到达的请求,其余的请求丢弃。

另外客户端在按钮上做限制,避免多次点击也有一个重要原因—— 如果不限制,服务端的 Redis 缓存失效时间就不好确定,也就是两次请求间隔多久算是两次请求,设置太久,会容易导致第二次正常的请求也不处理,设置短些,也不能低于接口响应时间,否则就没有办法避免并发了。

示例代码

server.go

package main

import (
    "crypto/md5"
    "encoding/hex"

    "fmt"
    "github.com/bsm/redislock"
    "github.com/gin-gonic/gin"
    "github.com/redis/go-redis/v9"
    "net/http"
    "time"
)

const (
    ServerName = "server_name"
    lockTTL    = 500 * time.Millisecond

    UserId = "u100123"
)

// --- redis ---

var client *redis.Client

func init() {
    opt := &redis.Options{
        Addr:     "127.0.0.1:6379",
        DB:       0,
        PoolSize: 600,
    }
    client = redis.NewClient(opt)
}

// --- middleware ---

func SequenceLock(c *gin.Context) {

    locker := redislock.New(client)
    pathHash := GetHash(c.Request.URL.Path)
    key := fmt.Sprintf("%s:%s:%s:%s", ServerName, "lock", UserId, pathHash)

    lock, err := locker.Obtain(c.Request.Context(), key, lockTTL, nil)
    if err != nil {
        time.Sleep(time.Second)
        c.JSON(http.StatusTooManyRequests, gin.H{
            "message": "repeated request",
        })
        c.Abort()
        return
    }

    defer lock.Release(c.Request.Context())
    c.Next()
}

// --- utils ---

func GetHash(text string) string {
    hash := md5.Sum([]byte(text))
    return hex.EncodeToString(hash[:])
}

func main() {
    r := gin.Default()
    r.GET("/ping", SequenceLock, func(c *gin.Context) {
        time.Sleep(300 * time.Millisecond)
        c.JSON(http.StatusOK, gin.H{
            "message": "pong",
        })
    })
    r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

运行服务端,它会监听 8080 端口,用浏览器打开 http://127.0.0.1:8080/ping,会看到浏览器回显 "pong"

$ go run main.go

代码解析

  • 以上示例服务端代码使用 gin Web 框架,接到请求后 Sleep 300 毫秒是为了模拟业务处理时间。
  • SequenceLock 是封装好的中间件,在需要的接口添加即可,它负责从 Redis 获取一个锁,如果获取成功,则继续处理这个请求,未获取到,则忽略请求的处理。
  • 获取锁原理是判断 Redis 键是否存在,如果存在键,则无法获取锁,键不存在,则创建键并设置失效时间,返回加锁成功。
  • 借助 redislock 库,操作 Redis 是原子化的。另外 Redis 键用于表示 “一次请求”,它可以是 “用户ID + OnceCode” 精准的表示一次请求,如果客户端未传递 OnceCode,也可以是 “用户ID + URL Path Hash” 表示一次请求,即这个用户对这个即接口的调用同时只能处理一个。
  • 另外需要注意的是,需要 “连击” 的接口,不能使用 URL Path Hash 方式,会导致正常的请求被丢弃。

补充

Redis 锁的 TTL 500 毫秒是一个经验值,它需要大于接口响应时间。

例如接口响应时长为 800 毫秒,TTL 设置为 500 毫秒小于此值,持续并发请求下,第二个获取锁的请求时间为 500 毫秒后,此时第一个请求尚需 300 毫秒才能执行完成,它很可能还未设置完标志位,第二个请求时获取到未更新的标志位,就会导致非预期行为。

模拟客户端的测试脚本

client.py

#!/bin/env python
# -*- coding: utf-8 -*-

from gevent import monkey
monkey.patch_all()

import requests
import gevent

def get_page(url):
    r = requests.get(url)
    print(r.text)


if __name__ == '__main__':

    url = 'http://127.0.0.1:8080/ping'

    print("Results:")

    threads = [gevent.spawn(get_page, url) for i in range(10)]
    gevent.joinall(threads)

运行结果

Results:
{"message":"pong"}
{"message":"repeated request"}
{"message":"repeated request"}
{"message":"repeated request"}
{"message":"repeated request"}
{"message":"repeated request"}
{"message":"repeated request"}
{"message":"repeated request"}
{"message":"repeated request"}
{"message":"repeated request"}

bsm / redislock 库的补充

地址:https://github.com/bsm/redislock

从源代码中可以学习到使用 Lua 脚本的方式操作 Redis 键值,这种方式可以使得读取和操作 Key 两个步骤是原子化操作。

var (
    luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
    luaRelease = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end`)
    luaPTTL    = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pttl", KEYS[1]) else return -3 end`)
)

另外获取锁的时候支持重试

// 每 100ms 获取一次, 重试三次
backoff := redislock.LimitRetry(redislock.LinearBackoff(100*time.Millisecond), 3)

// Obtain lock with retry
lock, err := locker.Obtain(ctx, "my-key", time.Second, &redislock.Options{
    RetryStrategy: backoff,
})
// 在一分钟内,每 500ms 获取一次
backoff := redislock.LinearBackoff(500 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
defer cancel()

更多示例参考 redislock 库的 example_test.go

总结

需要注意的是,本文方案不能避免重放攻击,仅从中间件角度,提供一种简便的,限制接口不能被用户并发请求的机制。

从安全性考虑,接口也应该在允许调用次数、频率、防重放等方面,结合业务来做更完善的验证和限制。