diff --git a/apps/backend/internal/blob/err.go b/apps/backend/internal/blob/err.go index d1e9614..c26f647 100644 --- a/apps/backend/internal/blob/err.go +++ b/apps/backend/internal/blob/err.go @@ -3,6 +3,7 @@ package blob import "errors" var ( - ErrConflict = errors.New("key already used for a different blob") - ErrNotFound = errors.New("key not found") + ErrConflict = errors.New("key already used for a different blob") + ErrNotFound = errors.New("key not found") + ErrInvalidFileContent = errors.New("invalid file content. must provide either a reader or a blob key") ) diff --git a/apps/backend/internal/blob/fs_store.go b/apps/backend/internal/blob/fs_store.go index 9053b21..a39f91a 100644 --- a/apps/backend/internal/blob/fs_store.go +++ b/apps/backend/internal/blob/fs_store.go @@ -5,6 +5,8 @@ import ( "io" "os" "path/filepath" + + "github.com/get-drexa/drexa/internal/ioext" ) var _ Store = &FSStore{} @@ -14,13 +16,18 @@ type FSStore struct { } type FSStoreConfig struct { - Root string + Root string + UploadURL string } func NewFSStore(config FSStoreConfig) *FSStore { return &FSStore{config: config} } +func (s *FSStore) GenerateUploadURL(ctx context.Context, key Key, opts UploadURLOptions) (string, error) { + return s.config.UploadURL, nil +} + func (s *FSStore) Put(ctx context.Context, key Key, reader io.Reader) error { path := filepath.Join(s.config.Root, string(key)) @@ -47,7 +54,7 @@ func (s *FSStore) Put(ctx context.Context, key Key, reader io.Reader) error { return nil } -func (s *FSStore) Retrieve(ctx context.Context, key Key) (io.ReadCloser, error) { +func (s *FSStore) Read(ctx context.Context, key Key) (io.ReadCloser, error) { path := filepath.Join(s.config.Root, string(key)) f, err := os.Open(path) if err != nil { @@ -59,6 +66,37 @@ func (s *FSStore) Retrieve(ctx context.Context, key Key) (io.ReadCloser, error) return f, nil } +func (s *FSStore) ReadRange(ctx context.Context, key Key, offset, length int64) (io.ReadCloser, error) { + path := filepath.Join(s.config.Root, string(key)) + + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrNotFound + } + return nil, err + } + + _, err = f.Seek(offset, io.SeekStart) + if err != nil { + return nil, err + } + + return ioext.NewLimitReadCloser(f, length), nil +} + +func (s *FSStore) ReadSize(ctx context.Context, key Key) (int64, error) { + path := filepath.Join(s.config.Root, string(key)) + fi, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return 0, ErrNotFound + } + return 0, err + } + return fi.Size(), nil +} + func (s *FSStore) Delete(ctx context.Context, key Key) error { err := os.Remove(filepath.Join(s.config.Root, string(key))) // no op if file does not exist @@ -69,6 +107,11 @@ func (s *FSStore) Delete(ctx context.Context, key Key) error { return nil } +func (s *FSStore) Update(ctx context.Context, key Key, opts UpdateOptions) error { + // Update is a no-op for FSStore + return nil +} + func (s *FSStore) Move(ctx context.Context, srcKey, dstKey Key) error { oldPath := filepath.Join(s.config.Root, string(srcKey)) newPath := filepath.Join(s.config.Root, string(dstKey)) diff --git a/apps/backend/internal/blob/key.go b/apps/backend/internal/blob/key.go index 0fdc1c4..406e61a 100644 --- a/apps/backend/internal/blob/key.go +++ b/apps/backend/internal/blob/key.go @@ -2,6 +2,13 @@ package blob type Key string +type KeyMode int + +const ( + KeyModeStable KeyMode = iota + KeyModeDerived +) + func (k Key) IsNil() bool { return k == "" } diff --git a/apps/backend/internal/blob/store.go b/apps/backend/internal/blob/store.go index c0996a0..0ddb252 100644 --- a/apps/backend/internal/blob/store.go +++ b/apps/backend/internal/blob/store.go @@ -3,11 +3,24 @@ package blob import ( "context" "io" + "time" ) +type UploadURLOptions struct { + Duration time.Duration +} + +type UpdateOptions struct { + ContentType string +} + type Store interface { + GenerateUploadURL(ctx context.Context, key Key, opts UploadURLOptions) (string, error) Put(ctx context.Context, key Key, reader io.Reader) error - Retrieve(ctx context.Context, key Key) (io.ReadCloser, error) + Update(ctx context.Context, key Key, opts UpdateOptions) error Delete(ctx context.Context, key Key) error Move(ctx context.Context, srcKey, dstKey Key) error + Read(ctx context.Context, key Key) (io.ReadCloser, error) + ReadRange(ctx context.Context, key Key, offset, length int64) (io.ReadCloser, error) + ReadSize(ctx context.Context, key Key) (int64, error) } diff --git a/apps/backend/internal/virtualfs/counting_reader.go b/apps/backend/internal/ioext/counting_reader.go similarity index 95% rename from apps/backend/internal/virtualfs/counting_reader.go rename to apps/backend/internal/ioext/counting_reader.go index 5e92f8c..a2a6824 100644 --- a/apps/backend/internal/virtualfs/counting_reader.go +++ b/apps/backend/internal/ioext/counting_reader.go @@ -1,4 +1,4 @@ -package virtualfs +package ioext import "io" @@ -20,3 +20,4 @@ func (r *CountingReader) Read(p []byte) (n int, err error) { func (r *CountingReader) Count() int64 { return r.count } + diff --git a/apps/backend/internal/ioext/limit_read_closer.go b/apps/backend/internal/ioext/limit_read_closer.go new file mode 100644 index 0000000..bc54250 --- /dev/null +++ b/apps/backend/internal/ioext/limit_read_closer.go @@ -0,0 +1,24 @@ +package ioext + +import "io" + +type LimitReadCloser struct { + reader io.ReadCloser + limitReader io.Reader +} + +func NewLimitReadCloser(reader io.ReadCloser, length int64) *LimitReadCloser { + return &LimitReadCloser{ + reader: reader, + limitReader: io.LimitReader(reader, length), + } +} + +func (r *LimitReadCloser) Read(p []byte) (n int, err error) { + return r.limitReader.Read(p) +} + +func (r *LimitReadCloser) Close() error { + return r.reader.Close() +} + diff --git a/apps/backend/internal/upload/err.go b/apps/backend/internal/upload/err.go new file mode 100644 index 0000000..27fdc66 --- /dev/null +++ b/apps/backend/internal/upload/err.go @@ -0,0 +1,9 @@ +package upload + +import "errors" + +var ( + ErrNotFound = errors.New("not found") + ErrParentNotDirectory = errors.New("parent is not a directory") + ErrConflict = errors.New("node conflict") +) diff --git a/apps/backend/internal/upload/service.go b/apps/backend/internal/upload/service.go new file mode 100644 index 0000000..806f740 --- /dev/null +++ b/apps/backend/internal/upload/service.go @@ -0,0 +1,141 @@ +package upload + +import ( + "context" + "errors" + "io" + "sync" + "time" + + "github.com/get-drexa/drexa/internal/blob" + "github.com/get-drexa/drexa/internal/virtualfs" + "github.com/google/uuid" +) + +type Service struct { + vfs *virtualfs.VirtualFS + blobStore blob.Store + + pendingUploads sync.Map +} + +func NewService(vfs *virtualfs.VirtualFS, blobStore blob.Store) *Service { + return &Service{ + vfs: vfs, + blobStore: blobStore, + + pendingUploads: sync.Map{}, + } +} + +type CreateUploadOptions struct { + ParentID string + Name string +} + +type Upload struct { + ID string + TargetNode *virtualfs.Node + UploadURL string +} + +type CreateUploadResult struct { + UploadID string + UploadURL string +} + +func (s *Service) CreateUpload(ctx context.Context, userID uuid.UUID, opts CreateUploadOptions) (*Upload, error) { + parentNode, err := s.vfs.FindNodeByPublicID(ctx, userID, opts.ParentID) + if err != nil { + if errors.Is(err, virtualfs.ErrNodeNotFound) { + return nil, ErrNotFound + } + return nil, err + } + + if parentNode.Kind != virtualfs.NodeKindDirectory { + return nil, ErrParentNotDirectory + } + + node, err := s.vfs.CreateFile(ctx, userID, virtualfs.CreateFileOptions{ + ParentID: parentNode.ID, + Name: opts.Name, + }) + if err != nil { + if errors.Is(err, virtualfs.ErrNodeConflict) { + return nil, ErrConflict + } + return nil, err + } + + uploadURL, err := s.blobStore.GenerateUploadURL(ctx, node.BlobKey, blob.UploadURLOptions{ + Duration: 1 * time.Hour, + }) + if err != nil { + return nil, err + } + + upload := &Upload{ + ID: node.PublicID, + TargetNode: node, + UploadURL: uploadURL, + } + + s.pendingUploads.Store(upload.ID, upload) + + return upload, nil +} + +func (s *Service) ReceiveUpload(ctx context.Context, userID uuid.UUID, uploadID string, reader io.Reader) error { + n, ok := s.pendingUploads.Load(uploadID) + if !ok { + return ErrNotFound + } + + upload, ok := n.(*Upload) + if !ok { + return ErrNotFound + } + + if upload.TargetNode.UserID != userID { + return ErrNotFound + } + + err := s.vfs.WriteFile(ctx, upload.TargetNode, virtualfs.FileContentFromReader(reader)) + if err != nil { + return err + } + + s.pendingUploads.Delete(uploadID) + + return nil +} + +func (s *Service) CompleteUpload(ctx context.Context, userID uuid.UUID, uploadID string) error { + n, ok := s.pendingUploads.Load(uploadID) + if !ok { + return ErrNotFound + } + + upload, ok := n.(*Upload) + if !ok { + return ErrNotFound + } + + if upload.TargetNode.UserID != userID { + return ErrNotFound + } + + if upload.TargetNode.Status == virtualfs.NodeStatusReady { + return nil + } + + err := s.vfs.WriteFile(ctx, upload.TargetNode, virtualfs.FileContentFromBlobKey(upload.TargetNode.BlobKey)) + if err != nil { + return err + } + + s.pendingUploads.Delete(uploadID) + + return nil +} diff --git a/apps/backend/internal/virtualfs/key_resolver.go b/apps/backend/internal/virtualfs/key_resolver.go index c562352..e89b9eb 100644 --- a/apps/backend/internal/virtualfs/key_resolver.go +++ b/apps/backend/internal/virtualfs/key_resolver.go @@ -9,11 +9,16 @@ import ( ) type BlobKeyResolver interface { + KeyMode() blob.KeyMode Resolve(ctx context.Context, node *Node) (blob.Key, error) } type FlatKeyResolver struct{} +func (r *FlatKeyResolver) KeyMode() blob.KeyMode { + return blob.KeyModeStable +} + func (r *FlatKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) { if node.BlobKey == "" { id, err := uuid.NewV7() @@ -29,10 +34,15 @@ type HierarchicalKeyResolver struct { db *bun.DB } +func (r *HierarchicalKeyResolver) KeyMode() blob.KeyMode { + return blob.KeyModeDerived +} + func (r *HierarchicalKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) { path, err := buildNodeAbsolutePath(ctx, r.db, node.ID) if err != nil { return "", err } + return blob.Key(path), nil } diff --git a/apps/backend/internal/virtualfs/vfs.go b/apps/backend/internal/virtualfs/vfs.go index e41c4c4..b006247 100644 --- a/apps/backend/internal/virtualfs/vfs.go +++ b/apps/backend/internal/virtualfs/vfs.go @@ -12,6 +12,7 @@ import ( "github.com/gabriel-vasile/mimetype" "github.com/get-drexa/drexa/internal/blob" "github.com/get-drexa/drexa/internal/database" + "github.com/get-drexa/drexa/internal/ioext" "github.com/google/uuid" "github.com/sqids/sqids-go" "github.com/uptrace/bun" @@ -31,17 +32,35 @@ type CreateNodeOptions struct { Name string } -type WriteFileOptions struct { +type CreateFileOptions struct { ParentID uuid.UUID Name string } -func NewVirtualFS(db *bun.DB, blobStore blob.Store, keyResolver BlobKeyResolver) *VirtualFS { +type FileContent struct { + reader io.Reader + blobKey blob.Key +} + +func FileContentFromReader(reader io.Reader) FileContent { + return FileContent{reader: reader} +} + +func FileContentFromBlobKey(blobKey blob.Key) FileContent { + return FileContent{blobKey: blobKey} +} + +func NewVirtualFS(db *bun.DB, blobStore blob.Store, keyResolver BlobKeyResolver) (*VirtualFS, error) { + sqid, err := sqids.New() + if err != nil { + return nil, err + } return &VirtualFS{ db: db, blobStore: blobStore, keyResolver: keyResolver, - } + sqid: sqid, + }, nil } func (vfs *VirtualFS) FindNode(ctx context.Context, userID, fileID string) (*Node, error) { @@ -100,7 +119,7 @@ func (vfs *VirtualFS) ListChildren(ctx context.Context, node *Node) ([]*Node, er return nodes, nil } -func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io.Reader, opts WriteFileOptions) (*Node, error) { +func (vfs *VirtualFS) CreateFile(ctx context.Context, userID uuid.UUID, opts CreateFileOptions) (*Node, error) { pid, err := vfs.generatePublicID() if err != nil { return nil, err @@ -115,7 +134,14 @@ func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io Name: opts.Name, } - _, err = vfs.db.NewInsert().Model(&node).Exec(ctx) + if vfs.keyResolver.KeyMode() == blob.KeyModeStable { + node.BlobKey, err = vfs.keyResolver.Resolve(ctx, &node) + if err != nil { + return nil, err + } + } + + _, err = vfs.db.NewInsert().Model(&node).Returning("*").Exec(ctx) if err != nil { if database.IsUniqueViolation(err) { return nil, ErrNodeConflict @@ -123,48 +149,88 @@ func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io return nil, err } - cleanup := func() { - _, _ = vfs.db.NewDelete().Model(&node).WherePK().Exec(ctx) + return &node, nil +} + +func (vfs *VirtualFS) WriteFile(ctx context.Context, node *Node, content FileContent) error { + if content.reader == nil && content.blobKey.IsNil() { + return blob.ErrInvalidFileContent } - key, err := vfs.keyResolver.Resolve(ctx, &node) - if err != nil { - cleanup() - return nil, err + if !node.DeletedAt.IsZero() { + return ErrNodeNotFound } - h := make([]byte, 3072) - n, err := io.ReadFull(reader, h) - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { - cleanup() - return nil, err - } - h = h[:n] + setCols := make([]string, 0, 4) - mt := mimetype.Detect(h) - cr := NewCountingReader(io.MultiReader(bytes.NewReader(h), reader)) + if content.reader != nil { + key, err := vfs.keyResolver.Resolve(ctx, node) + if err != nil { + return err + } - err = vfs.blobStore.Put(ctx, key, cr) - if err != nil { - cleanup() - return nil, err + buf := make([]byte, 3072) + n, err := io.ReadFull(content.reader, buf) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + return err + } + buf = buf[:n] + + mt := mimetype.Detect(buf) + cr := ioext.NewCountingReader(io.MultiReader(bytes.NewReader(buf), content.reader)) + + err = vfs.blobStore.Put(ctx, key, cr) + if err != nil { + return err + } + + if vfs.keyResolver.KeyMode() == blob.KeyModeStable { + node.BlobKey = key + setCols = append(setCols, "blob_key") + } + + node.MimeType = mt.String() + node.Size = cr.Count() + node.Status = NodeStatusReady + + setCols = append(setCols, "mime_type", "size", "status") + } else { + node.BlobKey = content.blobKey + + b, err := vfs.blobStore.ReadRange(ctx, content.blobKey, 0, 3072) + if err != nil { + return err + } + defer b.Close() + + buf := make([]byte, 3072) + n, err := io.ReadFull(b, buf) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + return err + } + buf = buf[:n] + mt := mimetype.Detect(buf) + node.MimeType = mt.String() + node.Status = NodeStatusReady + + s, err := vfs.blobStore.ReadSize(ctx, content.blobKey) + if err != nil { + return err + } + node.Size = s + + setCols = append(setCols, "mime_type", "blob_key", "size", "status") } - node.BlobKey = key - node.Size = cr.Count() - node.MimeType = mt.String() - node.Status = NodeStatusReady - - _, err = vfs.db.NewUpdate().Model(&node). - Column("status", "blob_key", "size", "mime_type"). + _, err := vfs.db.NewUpdate().Model(&node). + Column(setCols...). WherePK(). Exec(ctx) if err != nil { - cleanup() - return nil, err + return err } - return &node, nil + return nil } func (vfs *VirtualFS) CreateDirectory(ctx context.Context, userID uuid.UUID, parentID uuid.UUID, name string) (*Node, error) { @@ -216,7 +282,7 @@ func (vfs *VirtualFS) SoftDeleteNode(ctx context.Context, node *Node) error { } func (vfs *VirtualFS) RestoreNode(ctx context.Context, node *Node) error { - if !node.IsAccessible() { + if node.Status != NodeStatusReady { return ErrNodeNotFound } @@ -289,15 +355,13 @@ func (vfs *VirtualFS) MoveNode(ctx context.Context, node *Node, parentID uuid.UU return err } - if node.Kind == NodeKindFile && !node.BlobKey.IsNil() && oldKey != newKey { - // if node is a file, has a previous key, and the new key is different, we need to update the node with the new key - err = vfs.blobStore.Move(ctx, oldKey, newKey) - if err != nil { - return err - } + err = vfs.blobStore.Move(ctx, oldKey, newKey) + if err != nil { + return err + } + if vfs.keyResolver.KeyMode() == blob.KeyModeStable { node.BlobKey = newKey - _, err = vfs.db.NewUpdate().Model(node). WherePK(). Set("blob_key = ?", newKey).