gomog/internal/engine/memory_store.go

236 lines
5.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"context"
"sync"
"time"
"git.kingecg.top/kingecg/gomog/internal/database"
"git.kingecg.top/kingecg/gomog/pkg/errors"
"git.kingecg.top/kingecg/gomog/pkg/types"
)
// MemoryStore 内存数据存储
type MemoryStore struct {
mu sync.RWMutex
collections map[string]*Collection
adapter database.DatabaseAdapter
}
// Collection 内存集合
type Collection struct {
name string
documents map[string]types.Document // id -> Document
mu sync.RWMutex
}
// NewMemoryStore 创建内存存储
func NewMemoryStore(adapter database.DatabaseAdapter) *MemoryStore {
return &MemoryStore{
collections: make(map[string]*Collection),
adapter: adapter,
}
}
// CreateTestCollectionForTesting 为测试创建集合(仅用于测试)
func CreateTestCollectionForTesting(store *MemoryStore, name string, documents map[string]types.Document) {
store.collections[name] = &Collection{
name: name,
documents: documents,
}
}
// LoadCollection 从数据库加载集合到内存
func (ms *MemoryStore) LoadCollection(ctx context.Context, name string) error {
// 检查集合是否存在
exists, err := ms.adapter.CollectionExists(ctx, name)
if err != nil {
return err
}
if !exists {
// 创建集合
if err := ms.adapter.CreateCollection(ctx, name); err != nil {
return err
}
}
// 从数据库加载所有文档
docs, err := ms.adapter.FindAll(ctx, name)
if err != nil {
return err
}
ms.mu.Lock()
defer ms.mu.Unlock()
coll := &Collection{
name: name,
documents: make(map[string]types.Document),
}
for _, doc := range docs {
coll.documents[doc.ID] = doc
}
ms.collections[name] = coll
return nil
}
// GetCollection 获取集合
func (ms *MemoryStore) GetCollection(name string) (*Collection, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
coll, exists := ms.collections[name]
if !exists {
return nil, errors.ErrCollectionNotFnd
}
return coll, nil
}
// Insert 插入文档到内存
func (ms *MemoryStore) Insert(collection string, doc types.Document) error {
coll, err := ms.GetCollection(collection)
if err != nil {
return err
}
coll.mu.Lock()
defer coll.mu.Unlock()
coll.documents[doc.ID] = doc
return nil
}
// Find 查询文档
func (ms *MemoryStore) Find(collection string, filter types.Filter) ([]types.Document, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return nil, err
}
coll.mu.RLock()
defer coll.mu.RUnlock()
var results []types.Document
for _, doc := range coll.documents {
if MatchFilter(doc.Data, filter) {
results = append(results, doc)
}
}
return results, nil
}
// Update 更新文档(支持 upsert 和 arrayFilters
func (ms *MemoryStore) Update(collection string, filter types.Filter, update types.Update, upsert bool, arrayFilters []types.Filter) (int, int, []string, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return 0, 0, nil, err
}
coll.mu.Lock()
defer coll.mu.Unlock()
matched := 0
modified := 0
var upsertedIDs []string
for id, doc := range coll.documents {
if MatchFilter(doc.Data, filter) {
matched++
// 应用更新
newData := applyUpdateWithFilters(doc.Data, update, false, arrayFilters)
coll.documents[id] = types.Document{
ID: doc.ID,
Data: newData,
CreatedAt: doc.CreatedAt,
UpdatedAt: time.Now(),
}
modified++
}
}
// 处理 upsert如果没有匹配的文档且设置了 upsert
if matched == 0 && upsert {
// 创建新文档
newID := generateID()
newDoc := make(map[string]interface{})
// 应用更新($setOnInsert 会生效)
newData := applyUpdateWithFilters(newDoc, update, true, arrayFilters)
coll.documents[newID] = types.Document{
ID: newID,
Data: newData,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
matched = 1
modified = 1
upsertedIDs = append(upsertedIDs, newID)
}
return matched, modified, upsertedIDs, nil
}
// Delete 删除文档
func (ms *MemoryStore) Delete(collection string, filter types.Filter) (int, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return 0, err
}
coll.mu.Lock()
defer coll.mu.Unlock()
deleted := 0
for id, doc := range coll.documents {
if MatchFilter(doc.Data, filter) {
delete(coll.documents, id)
deleted++
}
}
return deleted, nil
}
// SyncToDB 同步集合到数据库
func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error {
coll, err := ms.GetCollection(collection)
if err != nil {
return err
}
coll.mu.RLock()
defer coll.mu.RUnlock()
// 转换为文档数组
docs := make([]types.Document, 0, len(coll.documents))
for _, doc := range coll.documents {
docs = append(docs, doc)
}
// 批量插入/更新到数据库
// 注意:这里简化处理,实际应该区分新增和更新
return ms.adapter.InsertMany(ctx, collection, docs)
}
// GetAllDocuments 获取集合的所有文档(用于聚合)
func (ms *MemoryStore) GetAllDocuments(collection string) ([]types.Document, error) {
coll, err := ms.GetCollection(collection)
if err != nil {
return nil, err
}
coll.mu.RLock()
defer coll.mu.RUnlock()
docs := make([]types.Document, 0, len(coll.documents))
for _, doc := range coll.documents {
docs = append(docs, doc)
}
return docs, nil
}