Move pkg/kmutex to internal/kmutex

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-01-17 09:57:01 -08:00
parent 5e1d9543be
commit 696cf25650
7 changed files with 4 additions and 4 deletions

105
internal/kmutex/kmutex.go Normal file
View File

@@ -0,0 +1,105 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package kmutex provides synchronization primitives to lock/unlock resource by unique key.
package kmutex
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/semaphore"
)
// KeyedLocker is the interface for acquiring locks based on string.
type KeyedLocker interface {
Lock(ctx context.Context, key string) error
Unlock(key string)
}
func New() KeyedLocker {
return newKeyMutex()
}
func newKeyMutex() *keyMutex {
return &keyMutex{
locks: make(map[string]*klock),
}
}
type keyMutex struct {
mu sync.Mutex
locks map[string]*klock
}
type klock struct {
*semaphore.Weighted
ref int
}
func (km *keyMutex) Lock(ctx context.Context, key string) error {
km.mu.Lock()
l, ok := km.locks[key]
if !ok {
km.locks[key] = &klock{
Weighted: semaphore.NewWeighted(1),
}
l = km.locks[key]
}
l.ref++
km.mu.Unlock()
if err := l.Acquire(ctx, 1); err != nil {
km.mu.Lock()
defer km.mu.Unlock()
l.ref--
if l.ref < 0 {
panic(fmt.Errorf("kmutex: release of unlocked key %v", key))
}
if l.ref == 0 {
delete(km.locks, key)
}
return err
}
return nil
}
func (km *keyMutex) Unlock(key string) {
km.mu.Lock()
defer km.mu.Unlock()
l, ok := km.locks[key]
if !ok {
panic(fmt.Errorf("kmutex: unlock of unlocked key %v", key))
}
l.Release(1)
l.ref--
if l.ref < 0 {
panic(fmt.Errorf("kmutex: released of unlocked key %v", key))
}
if l.ref == 0 {
delete(km.locks, key)
}
}

View File

@@ -0,0 +1,170 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kmutex
import (
"context"
"runtime"
"strconv"
"sync"
"testing"
"time"
"github.com/containerd/containerd/v2/pkg/randutil"
"github.com/stretchr/testify/assert"
)
func TestBasic(t *testing.T) {
t.Parallel()
km := newKeyMutex()
ctx := context.Background()
km.Lock(ctx, "c1")
km.Lock(ctx, "c2")
assert.Equal(t, len(km.locks), 2)
assert.Equal(t, km.locks["c1"].ref, 1)
assert.Equal(t, km.locks["c2"].ref, 1)
checkWaitFn := func(key string, num int) {
retries := 100
waitLock := false
for i := 0; i < retries; i++ {
// prevent from data-race
km.mu.Lock()
ref := km.locks[key].ref
km.mu.Unlock()
if ref == num {
waitLock = true
break
}
time.Sleep(time.Duration(randutil.Int63n(100)) * time.Millisecond)
}
assert.Equal(t, waitLock, true)
}
// should acquire successfully after release
{
waitCh := make(chan struct{})
go func() {
defer close(waitCh)
km.Lock(ctx, "c1")
}()
checkWaitFn("c1", 2)
km.Unlock("c1")
<-waitCh
assert.Equal(t, km.locks["c1"].ref, 1)
}
// failed to acquire if context cancel
{
var errCh = make(chan error, 1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
errCh <- km.Lock(ctx, "c1")
}()
checkWaitFn("c1", 2)
cancel()
assert.Equal(t, <-errCh, context.Canceled)
assert.Equal(t, km.locks["c1"].ref, 1)
}
}
func TestReleasePanic(t *testing.T) {
t.Parallel()
km := newKeyMutex()
defer func() {
if recover() == nil {
t.Fatal("release of unlocked key did not panic")
}
}()
km.Unlock(t.Name())
}
func TestMultileAcquireOnKeys(t *testing.T) {
t.Parallel()
km := newKeyMutex()
nloops := 10000
nproc := runtime.GOMAXPROCS(0)
ctx := context.Background()
var wg sync.WaitGroup
for i := 0; i < nproc; i++ {
wg.Add(1)
go func(key string) {
defer wg.Done()
for i := 0; i < nloops; i++ {
km.Lock(ctx, key)
time.Sleep(time.Duration(randutil.Int63n(100)) * time.Nanosecond)
km.Unlock(key)
}
}("key-" + strconv.Itoa(i))
}
wg.Wait()
}
func TestMultiAcquireOnSameKey(t *testing.T) {
t.Parallel()
km := newKeyMutex()
key := "c1"
ctx := context.Background()
assert.Nil(t, km.Lock(ctx, key))
nproc := runtime.GOMAXPROCS(0)
nloops := 10000
var wg sync.WaitGroup
for i := 0; i < nproc; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < nloops; i++ {
km.Lock(ctx, key)
time.Sleep(time.Duration(randutil.Int63n(100)) * time.Nanosecond)
km.Unlock(key)
}
}()
}
km.Unlock(key)
wg.Wait()
// c1 key has been released so the it should not have any klock.
assert.Equal(t, len(km.locks), 0)
}

33
internal/kmutex/noop.go Normal file
View File

@@ -0,0 +1,33 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kmutex
import "context"
func NewNoop() KeyedLocker {
return &noopMutex{}
}
type noopMutex struct {
}
func (*noopMutex) Lock(_ context.Context, _ string) error {
return nil
}
func (*noopMutex) Unlock(_ string) {
}