diff options
Diffstat (limited to 'vendor/github.com/streamrail')
3 files changed, 376 insertions, 0 deletions
diff --git a/vendor/github.com/streamrail/concurrent-map/LICENSE b/vendor/github.com/streamrail/concurrent-map/LICENSE new file mode 100644 index 0000000..ea2fec0 --- /dev/null +++ b/vendor/github.com/streamrail/concurrent-map/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2014 streamrail + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/streamrail/concurrent-map/README.md b/vendor/github.com/streamrail/concurrent-map/README.md new file mode 100644 index 0000000..e6b4839 --- /dev/null +++ b/vendor/github.com/streamrail/concurrent-map/README.md @@ -0,0 +1,53 @@ +# concurrent map [](https://circleci.com/gh/streamrail/concurrent-map) + +As explained [here](http://golang.org/doc/faq#atomic_maps) and [here](http://blog.golang.org/go-maps-in-action), the `map` type in Go doesn't support concurrent reads and writes. `concurrent-map` provides a high-performance solution to this by sharding the map with minimal time spent waiting for locks. + +## usage + +Import the package: + +```go +import ( + "github.com/streamrail/concurrent-map" +) + +``` + +```bash +go get "github.com/streamrail/concurrent-map" +``` + +The package is now imported under the "cmap" namespace. + +## example + +```go + + // Create a new map. + map := cmap.New() + + // Sets item within map, sets "bar" under key "foo" + map.Set("foo", "bar") + + // Retrieve item from map. + if tmp, ok := map.Get("foo"); ok { + bar := tmp.(string) + } + + // Removes item under key "foo" + map.Remove("foo") + +``` + +For more examples have a look at concurrent_map_test.go. + + +Running tests: + +```bash +go test "github.com/streamrail/concurrent-map" +``` + + +## license +MIT (see [LICENSE](https://github.com/streamrail/concurrent-map/blob/master/LICENSE) file) diff --git a/vendor/github.com/streamrail/concurrent-map/concurrent_map.go b/vendor/github.com/streamrail/concurrent-map/concurrent_map.go new file mode 100644 index 0000000..f6e5c06 --- /dev/null +++ b/vendor/github.com/streamrail/concurrent-map/concurrent_map.go @@ -0,0 +1,301 @@ +package cmap + +import ( + "encoding/json" + "sync" +) + +var SHARD_COUNT = 32 + +// A "thread" safe map of type string:Anything. +// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. +type ConcurrentMap []*ConcurrentMapShared + +// A "thread" safe string to anything map. +type ConcurrentMapShared struct { + items map[string]interface{} + sync.RWMutex // Read Write mutex, guards access to internal map. +} + +// Creates a new concurrent map. +func New() ConcurrentMap { + m := make(ConcurrentMap, SHARD_COUNT) + for i := 0; i < SHARD_COUNT; i++ { + m[i] = &ConcurrentMapShared{items: make(map[string]interface{})} + } + return m +} + +// Returns shard under given key +func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { + return m[uint(fnv32(key))%uint(SHARD_COUNT)] +} + +func (m ConcurrentMap) MSet(data map[string]interface{}) { + for key, value := range data { + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() + } +} + +// Sets the given value under the specified key. +func (m *ConcurrentMap) Set(key string, value interface{}) { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() +} + +// Callback to return new element to be inserted into the map +// It is called while lock is held, therefore it MUST NOT +// try to access other keys in same map, as it can lead to deadlock since +// Go sync.RWLock is not reentrant +type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{} + +// Insert or Update - updates existing element or inserts a new one using UpsertCb +func (m *ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) { + shard := m.GetShard(key) + shard.Lock() + v, ok := shard.items[key] + res = cb(ok, v, value) + shard.items[key] = res + shard.Unlock() + return res +} + +// Sets the given value under the specified key if no value was associated with it. +func (m *ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + _, ok := shard.items[key] + if !ok { + shard.items[key] = value + } + shard.Unlock() + return !ok +} + +// Retrieves an element from map under given key. +func (m ConcurrentMap) Get(key string) (interface{}, bool) { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // Get item from shard. + val, ok := shard.items[key] + shard.RUnlock() + return val, ok +} + +// Returns the number of elements within the map. +func (m ConcurrentMap) Count() int { + count := 0 + for i := 0; i < SHARD_COUNT; i++ { + shard := m[i] + shard.RLock() + count += len(shard.items) + shard.RUnlock() + } + return count +} + +// Looks up an item under specified key +func (m *ConcurrentMap) Has(key string) bool { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // See if element is within shard. + _, ok := shard.items[key] + shard.RUnlock() + return ok +} + +// Removes an element from the map. +func (m *ConcurrentMap) Remove(key string) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + delete(shard.items, key) + shard.Unlock() +} + +// Removes an element from the map and returns it +func (m *ConcurrentMap) Pop(key string) (v interface{}, exists bool) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + v, exists = shard.items[key] + delete(shard.items, key) + shard.Unlock() + return v, exists +} + +// Checks if map is empty. +func (m *ConcurrentMap) IsEmpty() bool { + return m.Count() == 0 +} + +// Used by the Iter & IterBuffered functions to wrap two variables together over a channel, +type Tuple struct { + Key string + Val interface{} +} + +// Returns an iterator which could be used in a for range loop. +// +// Deprecated: using IterBuffered() will get a better performence +func (m ConcurrentMap) Iter() <-chan Tuple { + ch := make(chan Tuple) + go func() { + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + // Foreach shard. + for _, shard := range m { + go func(shard *ConcurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + for key, val := range shard.items { + ch <- Tuple{key, val} + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + return ch +} + +// Returns a buffered iterator which could be used in a for range loop. +func (m ConcurrentMap) IterBuffered() <-chan Tuple { + ch := make(chan Tuple, m.Count()) + go func() { + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + // Foreach shard. + for _, shard := range m { + go func(shard *ConcurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + for key, val := range shard.items { + ch <- Tuple{key, val} + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + return ch +} + +// Returns all items as map[string]interface{} +func (m ConcurrentMap) Items() map[string]interface{} { + tmp := make(map[string]interface{}) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + + return tmp +} + +// Iterator callback,called for every key,value found in +// maps. RLock is held for all calls for a given shard +// therefore callback sess consistent view of a shard, +// but not across the shards +type IterCb func(key string, v interface{}) + +// Callback based iterator, cheapest way to read +// all elements in a map. +func (m *ConcurrentMap) IterCb(fn IterCb) { + for idx := range *m { + shard := (*m)[idx] + shard.RLock() + for key, value := range shard.items { + fn(key, value) + } + shard.RUnlock() + } +} + +// Return all keys as []string +func (m ConcurrentMap) Keys() []string { + count := m.Count() + ch := make(chan string, count) + go func() { + // Foreach shard. + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + for _, shard := range m { + go func(shard *ConcurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + for key := range shard.items { + ch <- key + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + + // Generate keys + keys := make([]string, count) + for i := 0; i < count; i++ { + keys[i] = <-ch + } + return keys +} + +//Reviles ConcurrentMap "private" variables to json marshal. +func (m ConcurrentMap) MarshalJSON() ([]byte, error) { + // Create a temporary map, which will hold all item spread across shards. + tmp := make(map[string]interface{}) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + return json.Marshal(tmp) +} + +func fnv32(key string) uint32 { + hash := uint32(2166136261) + const prime32 = uint32(16777619) + for i := 0; i < len(key); i++ { + hash *= prime32 + hash ^= uint32(key[i]) + } + return hash +} + +// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal +// will probably won't know which to type to unmarshal into, in such case +// we'll end up with a value of type map[string]interface{}, In most cases this isn't +// out value type, this is why we've decided to remove this functionality. + +// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) { +// // Reverse process of Marshal. + +// tmp := make(map[string]interface{}) + +// // Unmarshal into a single map. +// if err := json.Unmarshal(b, &tmp); err != nil { +// return nil +// } + +// // foreach key,value pair in temporary map insert into our concurrent map. +// for key, val := range tmp { +// m.Set(key, val) +// } +// return nil +// } |
