236 lines
5.2 KiB
Go
236 lines
5.2 KiB
Go
package engine
|
||
|
||
import (
|
||
"context"
|
||
"sync"
|
||
"time"
|
||
|
||
"git.kingecg.top/kingecg/gomog/internal/database"
|
||
"git.kingecg.top/kingecg/gomog/pkg/errors"
|
||
"git.kingecg.top/kingecg/gomog/pkg/types"
|
||
)
|
||
|
||
// MemoryStore 内存数据存储
|
||
type MemoryStore struct {
|
||
mu sync.RWMutex
|
||
collections map[string]*Collection
|
||
adapter database.DatabaseAdapter
|
||
}
|
||
|
||
// Collection 内存集合
|
||
type Collection struct {
|
||
name string
|
||
documents map[string]types.Document // id -> Document
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// NewMemoryStore 创建内存存储
|
||
func NewMemoryStore(adapter database.DatabaseAdapter) *MemoryStore {
|
||
return &MemoryStore{
|
||
collections: make(map[string]*Collection),
|
||
adapter: adapter,
|
||
}
|
||
}
|
||
|
||
// CreateTestCollectionForTesting 为测试创建集合(仅用于测试)
|
||
func CreateTestCollectionForTesting(store *MemoryStore, name string, documents map[string]types.Document) {
|
||
store.collections[name] = &Collection{
|
||
name: name,
|
||
documents: documents,
|
||
}
|
||
}
|
||
|
||
// LoadCollection 从数据库加载集合到内存
|
||
func (ms *MemoryStore) LoadCollection(ctx context.Context, name string) error {
|
||
// 检查集合是否存在
|
||
exists, err := ms.adapter.CollectionExists(ctx, name)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !exists {
|
||
// 创建集合
|
||
if err := ms.adapter.CreateCollection(ctx, name); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
// 从数据库加载所有文档
|
||
docs, err := ms.adapter.FindAll(ctx, name)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
ms.mu.Lock()
|
||
defer ms.mu.Unlock()
|
||
|
||
coll := &Collection{
|
||
name: name,
|
||
documents: make(map[string]types.Document),
|
||
}
|
||
for _, doc := range docs {
|
||
coll.documents[doc.ID] = doc
|
||
}
|
||
ms.collections[name] = coll
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetCollection 获取集合
|
||
func (ms *MemoryStore) GetCollection(name string) (*Collection, error) {
|
||
ms.mu.RLock()
|
||
defer ms.mu.RUnlock()
|
||
|
||
coll, exists := ms.collections[name]
|
||
if !exists {
|
||
return nil, errors.ErrCollectionNotFnd
|
||
}
|
||
return coll, nil
|
||
}
|
||
|
||
// Insert 插入文档到内存
|
||
func (ms *MemoryStore) Insert(collection string, doc types.Document) error {
|
||
coll, err := ms.GetCollection(collection)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
coll.mu.Lock()
|
||
defer coll.mu.Unlock()
|
||
|
||
coll.documents[doc.ID] = doc
|
||
return nil
|
||
}
|
||
|
||
// Find 查询文档
|
||
func (ms *MemoryStore) Find(collection string, filter types.Filter) ([]types.Document, error) {
|
||
coll, err := ms.GetCollection(collection)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
coll.mu.RLock()
|
||
defer coll.mu.RUnlock()
|
||
|
||
var results []types.Document
|
||
for _, doc := range coll.documents {
|
||
if MatchFilter(doc.Data, filter) {
|
||
results = append(results, doc)
|
||
}
|
||
}
|
||
|
||
return results, nil
|
||
}
|
||
|
||
// Update 更新文档(支持 upsert 和 arrayFilters)
|
||
func (ms *MemoryStore) Update(collection string, filter types.Filter, update types.Update, upsert bool, arrayFilters []types.Filter) (int, int, []string, error) {
|
||
coll, err := ms.GetCollection(collection)
|
||
if err != nil {
|
||
return 0, 0, nil, err
|
||
}
|
||
|
||
coll.mu.Lock()
|
||
defer coll.mu.Unlock()
|
||
|
||
matched := 0
|
||
modified := 0
|
||
var upsertedIDs []string
|
||
|
||
for id, doc := range coll.documents {
|
||
if MatchFilter(doc.Data, filter) {
|
||
matched++
|
||
|
||
// 应用更新
|
||
newData := applyUpdateWithFilters(doc.Data, update, false, arrayFilters)
|
||
coll.documents[id] = types.Document{
|
||
ID: doc.ID,
|
||
Data: newData,
|
||
CreatedAt: doc.CreatedAt,
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
modified++
|
||
}
|
||
}
|
||
|
||
// 处理 upsert:如果没有匹配的文档且设置了 upsert
|
||
if matched == 0 && upsert {
|
||
// 创建新文档
|
||
newID := generateID()
|
||
newDoc := make(map[string]interface{})
|
||
|
||
// 应用更新($setOnInsert 会生效)
|
||
newData := applyUpdateWithFilters(newDoc, update, true, arrayFilters)
|
||
|
||
coll.documents[newID] = types.Document{
|
||
ID: newID,
|
||
Data: newData,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
matched = 1
|
||
modified = 1
|
||
upsertedIDs = append(upsertedIDs, newID)
|
||
}
|
||
|
||
return matched, modified, upsertedIDs, nil
|
||
}
|
||
|
||
// Delete 删除文档
|
||
func (ms *MemoryStore) Delete(collection string, filter types.Filter) (int, error) {
|
||
coll, err := ms.GetCollection(collection)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
coll.mu.Lock()
|
||
defer coll.mu.Unlock()
|
||
|
||
deleted := 0
|
||
for id, doc := range coll.documents {
|
||
if MatchFilter(doc.Data, filter) {
|
||
delete(coll.documents, id)
|
||
deleted++
|
||
}
|
||
}
|
||
|
||
return deleted, nil
|
||
}
|
||
|
||
// SyncToDB 同步集合到数据库
|
||
func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error {
|
||
coll, err := ms.GetCollection(collection)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
coll.mu.RLock()
|
||
defer coll.mu.RUnlock()
|
||
|
||
// 转换为文档数组
|
||
docs := make([]types.Document, 0, len(coll.documents))
|
||
for _, doc := range coll.documents {
|
||
docs = append(docs, doc)
|
||
}
|
||
|
||
// 批量插入/更新到数据库
|
||
// 注意:这里简化处理,实际应该区分新增和更新
|
||
return ms.adapter.InsertMany(ctx, collection, docs)
|
||
}
|
||
|
||
// GetAllDocuments 获取集合的所有文档(用于聚合)
|
||
func (ms *MemoryStore) GetAllDocuments(collection string) ([]types.Document, error) {
|
||
coll, err := ms.GetCollection(collection)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
coll.mu.RLock()
|
||
defer coll.mu.RUnlock()
|
||
|
||
docs := make([]types.Document, 0, len(coll.documents))
|
||
for _, doc := range coll.documents {
|
||
docs = append(docs, doc)
|
||
}
|
||
|
||
return docs, nil
|
||
}
|