Compare commits

...

2 Commits

Author SHA1 Message Date
9ea76d2021 build: per lang code format settings 2025-11-28 22:31:25 +00:00
987f36e1d2 feat: impl upload service 2025-11-28 22:31:00 +00:00
11 changed files with 418 additions and 54 deletions

View File

@@ -25,12 +25,7 @@
         "golang.go"
       ],
       "settings": {
-        "editor.defaultFormatter": "biomejs.biome",
         "editor.formatOnSave": true,
-        "editor.codeActionsOnSave": {
-          "source.organizeImports.biome": "explicit",
-          "source.fixAll.biome": "explicit"
-        },
         "typescript.preferences.importModuleSpecifier": "relative",
         "typescript.suggest.autoImports": true,
         "emmet.includeLanguages": {
@@ -40,7 +35,63 @@
         "tailwindCSS.experimental.classRegex": [
           ["cva\\(([^)]*)\\)", "[\"'`]([^\"'`]*).*?[\"'`]"],
           ["cx\\(([^)]*)\\)", "(?:'|\"|`)([^']*)(?:'|\"|`)"]
-        ]
+        ],
+        "[javascript]": {
+          "editor.formatOnSave": true,
+          "editor.defaultFormatter": "biomejs.biome",
+          "editor.codeActionsOnSave": {
+            "source.organizeImports.biome": "explicit",
+            "source.fixAll.biome": "explicit"
+          }
+        },
+        "[javascriptreact]": {
+          "editor.formatOnSave": true,
+          "editor.defaultFormatter": "biomejs.biome",
+          "editor.codeActionsOnSave": {
+            "source.organizeImports.biome": "explicit",
+            "source.fixAll.biome": "explicit"
+          }
+        },
+        "[typescript]": {
+          "editor.formatOnSave": true,
+          "editor.defaultFormatter": "biomejs.biome",
+          "editor.codeActionsOnSave": {
+            "source.organizeImports.biome": "explicit",
+            "source.fixAll.biome": "explicit"
+          }
+        },
+        "[typescriptreact]": {
+          "editor.formatOnSave": true,
+          "editor.defaultFormatter": "biomejs.biome",
+          "editor.codeActionsOnSave": {
+            "source.organizeImports.biome": "explicit",
+            "source.fixAll.biome": "explicit"
+          }
+        },
+        "[json]": {
+          "editor.formatOnSave": true,
+          "editor.defaultFormatter": "biomejs.biome",
+          "editor.codeActionsOnSave": {
+            "source.fixAll.biome": "explicit"
+          }
+        },
+        "[jsonc]": {
+          "editor.formatOnSave": true,
+          "editor.defaultFormatter": "biomejs.biome",
+          "editor.codeActionsOnSave": {
+            "source.fixAll.biome": "explicit"
+          }
+        },
+        "[go]": {
+          "editor.formatOnSave": true,
+          "editor.defaultFormatter": "golang.go",
+          "editor.codeActionsOnSave": {
+            "source.organizeImports": "explicit"
+          }
+        },
+        "go.formatTool": "goimports",
+        "go.lintTool": "golangci-lint",
+        "go.useLanguageServer": true
       }
     }
   },

View File

@@ -5,4 +5,5 @@ import "errors"
 var (
 	ErrConflict = errors.New("key already used for a different blob")
 	ErrNotFound = errors.New("key not found")
+	ErrInvalidFileContent = errors.New("invalid file content. must provide either a reader or a blob key")
 )

View File

@@ -5,6 +5,8 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+
+	"github.com/get-drexa/drexa/internal/ioext"
 )

 var _ Store = &FSStore{}
@@ -15,12 +17,17 @@ type FSStore struct {
 type FSStoreConfig struct {
 	Root      string
+	UploadURL string
 }

 func NewFSStore(config FSStoreConfig) *FSStore {
 	return &FSStore{config: config}
 }

+func (s *FSStore) GenerateUploadURL(ctx context.Context, key Key, opts UploadURLOptions) (string, error) {
+	return s.config.UploadURL, nil
+}
+
 func (s *FSStore) Put(ctx context.Context, key Key, reader io.Reader) error {
 	path := filepath.Join(s.config.Root, string(key))
@@ -47,7 +54,7 @@ func (s *FSStore) Put(ctx context.Context, key Key, reader io.Reader) error {
 	return nil
 }

-func (s *FSStore) Retrieve(ctx context.Context, key Key) (io.ReadCloser, error) {
+func (s *FSStore) Read(ctx context.Context, key Key) (io.ReadCloser, error) {
 	path := filepath.Join(s.config.Root, string(key))
 	f, err := os.Open(path)
 	if err != nil {
@@ -59,6 +66,37 @@ func (s *FSStore) Retrieve(ctx context.Context, key Key) (io.ReadCloser, error)
 	return f, nil
 }

+func (s *FSStore) ReadRange(ctx context.Context, key Key, offset, length int64) (io.ReadCloser, error) {
+	path := filepath.Join(s.config.Root, string(key))
+	f, err := os.Open(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, ErrNotFound
+		}
+		return nil, err
+	}
+
+	_, err = f.Seek(offset, io.SeekStart)
+	if err != nil {
+		return nil, err
+	}
+
+	return ioext.NewLimitReadCloser(f, length), nil
+}
+
+func (s *FSStore) ReadSize(ctx context.Context, key Key) (int64, error) {
+	path := filepath.Join(s.config.Root, string(key))
+	fi, err := os.Stat(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, ErrNotFound
+		}
+		return 0, err
+	}
+
+	return fi.Size(), nil
+}
+
 func (s *FSStore) Delete(ctx context.Context, key Key) error {
 	err := os.Remove(filepath.Join(s.config.Root, string(key)))
 	// no op if file does not exist
@@ -69,6 +107,11 @@ func (s *FSStore) Delete(ctx context.Context, key Key) error {
 	return nil
 }

+func (s *FSStore) Update(ctx context.Context, key Key, opts UpdateOptions) error {
+	// Update is a no-op for FSStore
+	return nil
+}
+
 func (s *FSStore) Move(ctx context.Context, srcKey, dstKey Key) error {
 	oldPath := filepath.Join(s.config.Root, string(srcKey))
 	newPath := filepath.Join(s.config.Root, string(dstKey))
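
Two details of the FSStore changes are easy to miss: GenerateUploadURL simply echoes the statically configured UploadURL (the requested duration is unused, since a local filesystem has nothing to presign), and Update is an explicit no-op for the same reason. Below is a minimal sketch of the new read methods, assuming only the constructor shown above; the root, key, and contents are illustrative, not from this diff:

package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"strings"

	"github.com/get-drexa/drexa/internal/blob"
)

func main() {
	ctx := context.Background()
	store := blob.NewFSStore(blob.FSStoreConfig{Root: os.TempDir()}) // illustrative root

	key := blob.Key("example")
	if err := store.Put(ctx, key, strings.NewReader("hello, blob")); err != nil {
		panic(err)
	}

	size, err := store.ReadSize(ctx, key) // 11 bytes
	if err != nil {
		panic(err)
	}

	rc, err := store.ReadRange(ctx, key, 0, 5) // at most the first 5 bytes
	if err != nil {
		panic(err)
	}
	defer rc.Close()

	head, _ := io.ReadAll(rc)
	fmt.Println(size, string(head)) // 11 hello
}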

View File

@@ -2,6 +2,13 @@ package blob

 type Key string

+type KeyMode int
+
+const (
+	KeyModeStable KeyMode = iota
+	KeyModeDerived
+)
+
 func (k Key) IsNil() bool {
 	return k == ""
 }

View File

@@ -3,11 +3,24 @@ package blob
 import (
 	"context"
 	"io"
+	"time"
 )

+type UploadURLOptions struct {
+	Duration time.Duration
+}
+
+type UpdateOptions struct {
+	ContentType string
+}
+
 type Store interface {
+	GenerateUploadURL(ctx context.Context, key Key, opts UploadURLOptions) (string, error)
 	Put(ctx context.Context, key Key, reader io.Reader) error
-	Retrieve(ctx context.Context, key Key) (io.ReadCloser, error)
+	Update(ctx context.Context, key Key, opts UpdateOptions) error
 	Delete(ctx context.Context, key Key) error
 	Move(ctx context.Context, srcKey, dstKey Key) error
+	Read(ctx context.Context, key Key) (io.ReadCloser, error)
+	ReadRange(ctx context.Context, key Key, offset, length int64) (io.ReadCloser, error)
+	ReadSize(ctx context.Context, key Key) (int64, error)
 }
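
The interface now spans presigning (GenerateUploadURL), writes (Put, Update), lifecycle (Delete, Move), and three read shapes (Read, ReadRange, ReadSize). To make the full contract concrete, here is a hypothetical in-memory implementation of the kind a test suite might use; it is not part of this change, and it treats presigning and Update the way FSStore does:

package blobtest // hypothetical test helper inside the same module

import (
	"bytes"
	"context"
	"io"

	"github.com/get-drexa/drexa/internal/blob"
)

// MemStore is an illustrative in-memory blob.Store for tests.
type MemStore struct {
	data map[blob.Key][]byte
}

// Compile-time conformance check, mirroring var _ Store = &FSStore{} above.
var _ blob.Store = (*MemStore)(nil)

func NewMemStore() *MemStore {
	return &MemStore{data: map[blob.Key][]byte{}}
}

func (m *MemStore) GenerateUploadURL(ctx context.Context, key blob.Key, opts blob.UploadURLOptions) (string, error) {
	return "mem://upload/" + string(key), nil // nothing to presign in memory
}

func (m *MemStore) Put(ctx context.Context, key blob.Key, reader io.Reader) error {
	b, err := io.ReadAll(reader)
	if err != nil {
		return err
	}
	m.data[key] = b
	return nil
}

// Update has no metadata to touch here, like FSStore.
func (m *MemStore) Update(ctx context.Context, key blob.Key, opts blob.UpdateOptions) error {
	return nil
}

func (m *MemStore) Delete(ctx context.Context, key blob.Key) error {
	delete(m.data, key)
	return nil
}

func (m *MemStore) Move(ctx context.Context, srcKey, dstKey blob.Key) error {
	b, ok := m.data[srcKey]
	if !ok {
		return blob.ErrNotFound
	}
	m.data[dstKey] = b
	delete(m.data, srcKey)
	return nil
}

func (m *MemStore) Read(ctx context.Context, key blob.Key) (io.ReadCloser, error) {
	b, ok := m.data[key]
	if !ok {
		return nil, blob.ErrNotFound
	}
	return io.NopCloser(bytes.NewReader(b)), nil
}

func (m *MemStore) ReadRange(ctx context.Context, key blob.Key, offset, length int64) (io.ReadCloser, error) {
	b, ok := m.data[key]
	if !ok {
		return nil, blob.ErrNotFound
	}
	if offset > int64(len(b)) {
		offset = int64(len(b))
	}
	end := offset + length
	if end > int64(len(b)) {
		end = int64(len(b))
	}
	return io.NopCloser(bytes.NewReader(b[offset:end])), nil
}

func (m *MemStore) ReadSize(ctx context.Context, key blob.Key) (int64, error) {
	b, ok := m.data[key]
	if !ok {
		return 0, blob.ErrNotFound
	}
	return int64(len(b)), nil
}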

View File

@@ -1,4 +1,4 @@
-package virtualfs
+package ioext

 import "io"

@@ -20,3 +20,4 @@ func (r *CountingReader) Read(p []byte) (n int, err error) {
 func (r *CountingReader) Count() int64 {
 	return r.count
 }

View File

@@ -0,0 +1,24 @@
+package ioext
+
+import "io"
+
+type LimitReadCloser struct {
+	reader      io.ReadCloser
+	limitReader io.Reader
+}
+
+func NewLimitReadCloser(reader io.ReadCloser, length int64) *LimitReadCloser {
+	return &LimitReadCloser{
+		reader:      reader,
+		limitReader: io.LimitReader(reader, length),
+	}
+}
+
+func (r *LimitReadCloser) Read(p []byte) (n int, err error) {
+	return r.limitReader.Read(p)
+}
+
+func (r *LimitReadCloser) Close() error {
+	return r.reader.Close()
+}
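
This wrapper presumably exists because io.LimitReader alone is not enough: it returns a plain io.Reader, which would hide the Close method of the underlying file, while FSStore.ReadRange needs to hand back something the caller can both read (capped at length bytes) and close. A short usage sketch; the input path is illustrative:

package main

import (
	"fmt"
	"io"
	"os"

	"github.com/get-drexa/drexa/internal/ioext"
)

func main() {
	f, err := os.Open("/etc/hosts") // illustrative input file
	if err != nil {
		panic(err)
	}

	rc := ioext.NewLimitReadCloser(f, 16)
	defer rc.Close() // closes the underlying *os.File, not just the limited view

	head, err := io.ReadAll(rc) // reads at most 16 bytes
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", head)
}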

View File

@@ -0,0 +1,9 @@
+package upload
+
+import "errors"
+
+var (
+	ErrNotFound           = errors.New("not found")
+	ErrParentNotDirectory = errors.New("parent is not a directory")
+	ErrConflict           = errors.New("node conflict")
+)

View File

@@ -0,0 +1,141 @@
+package upload
+
+import (
+	"context"
+	"errors"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/get-drexa/drexa/internal/blob"
+	"github.com/get-drexa/drexa/internal/virtualfs"
+	"github.com/google/uuid"
+)
+
+type Service struct {
+	vfs            *virtualfs.VirtualFS
+	blobStore      blob.Store
+	pendingUploads sync.Map
+}
+
+func NewService(vfs *virtualfs.VirtualFS, blobStore blob.Store) *Service {
+	return &Service{
+		vfs:            vfs,
+		blobStore:      blobStore,
+		pendingUploads: sync.Map{},
+	}
+}
+
+type CreateUploadOptions struct {
+	ParentID string
+	Name     string
+}
+
+type Upload struct {
+	ID         string
+	TargetNode *virtualfs.Node
+	UploadURL  string
+}
+
+type CreateUploadResult struct {
+	UploadID  string
+	UploadURL string
+}
+
+func (s *Service) CreateUpload(ctx context.Context, userID uuid.UUID, opts CreateUploadOptions) (*Upload, error) {
+	parentNode, err := s.vfs.FindNodeByPublicID(ctx, userID, opts.ParentID)
+	if err != nil {
+		if errors.Is(err, virtualfs.ErrNodeNotFound) {
+			return nil, ErrNotFound
+		}
+		return nil, err
+	}
+
+	if parentNode.Kind != virtualfs.NodeKindDirectory {
+		return nil, ErrParentNotDirectory
+	}
+
+	node, err := s.vfs.CreateFile(ctx, userID, virtualfs.CreateFileOptions{
+		ParentID: parentNode.ID,
+		Name:     opts.Name,
+	})
+	if err != nil {
+		if errors.Is(err, virtualfs.ErrNodeConflict) {
+			return nil, ErrConflict
+		}
+		return nil, err
+	}
+
+	uploadURL, err := s.blobStore.GenerateUploadURL(ctx, node.BlobKey, blob.UploadURLOptions{
+		Duration: 1 * time.Hour,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	upload := &Upload{
+		ID:         node.PublicID,
+		TargetNode: node,
+		UploadURL:  uploadURL,
+	}
+	s.pendingUploads.Store(upload.ID, upload)
+
+	return upload, nil
+}
+
+func (s *Service) ReceiveUpload(ctx context.Context, userID uuid.UUID, uploadID string, reader io.Reader) error {
+	n, ok := s.pendingUploads.Load(uploadID)
+	if !ok {
+		return ErrNotFound
+	}
+
+	upload, ok := n.(*Upload)
+	if !ok {
+		return ErrNotFound
+	}
+
+	if upload.TargetNode.UserID != userID {
+		return ErrNotFound
+	}
+
+	err := s.vfs.WriteFile(ctx, upload.TargetNode, virtualfs.FileContentFromReader(reader))
+	if err != nil {
+		return err
+	}
+
+	s.pendingUploads.Delete(uploadID)
+	return nil
+}
+
+func (s *Service) CompleteUpload(ctx context.Context, userID uuid.UUID, uploadID string) error {
+	n, ok := s.pendingUploads.Load(uploadID)
+	if !ok {
+		return ErrNotFound
+	}
+
+	upload, ok := n.(*Upload)
+	if !ok {
+		return ErrNotFound
+	}
+
+	if upload.TargetNode.UserID != userID {
+		return ErrNotFound
+	}
+
+	if upload.TargetNode.Status == virtualfs.NodeStatusReady {
+		return nil
+	}
+
+	err := s.vfs.WriteFile(ctx, upload.TargetNode, virtualfs.FileContentFromBlobKey(upload.TargetNode.BlobKey))
+	if err != nil {
+		return err
+	}
+
+	s.pendingUploads.Delete(uploadID)
+	return nil
+}
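
The service splits an upload into phases: CreateUpload registers a pending node and returns an upload URL, then either ReceiveUpload streams the bytes through the server (the FSStore path) or the client PUTs directly to the URL and confirms with CompleteUpload, which finalizes the node from the already-stored blob. A hypothetical caller, with the HTTP wiring assumed rather than taken from this diff:

package api // hypothetical consumer of the upload service

import (
	"context"
	"io"

	"github.com/get-drexa/drexa/internal/upload"
	"github.com/google/uuid"
)

func handleUpload(ctx context.Context, svc *upload.Service, userID uuid.UUID, body io.Reader) error {
	u, err := svc.CreateUpload(ctx, userID, upload.CreateUploadOptions{
		ParentID: "abc123", // public ID of an existing directory (illustrative)
		Name:     "report.pdf",
	})
	if err != nil {
		// ErrNotFound, ErrParentNotDirectory and ErrConflict map naturally
		// to 404, 400 and 409 responses.
		return err
	}

	// Path A: the server receives the bytes itself (FSStore-style).
	if err := svc.ReceiveUpload(ctx, userID, u.ID, body); err != nil {
		return err
	}
	return nil

	// Path B (direct-to-store): hand u.UploadURL to the client for a PUT
	// against a presigned URL, then finish with:
	//   return svc.CompleteUpload(ctx, userID, u.ID)
}

One caveat visible in the code: pendingUploads is an in-process sync.Map, so a pending upload does not survive a restart between CreateUpload and CompleteUpload.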

View File

@@ -9,11 +9,16 @@ import (
 )

 type BlobKeyResolver interface {
+	KeyMode() blob.KeyMode
 	Resolve(ctx context.Context, node *Node) (blob.Key, error)
 }

 type FlatKeyResolver struct{}

+func (r *FlatKeyResolver) KeyMode() blob.KeyMode {
+	return blob.KeyModeStable
+}
+
 func (r *FlatKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) {
 	if node.BlobKey == "" {
 		id, err := uuid.NewV7()
@@ -29,10 +34,15 @@ type HierarchicalKeyResolver struct {
 	db *bun.DB
 }

+func (r *HierarchicalKeyResolver) KeyMode() blob.KeyMode {
+	return blob.KeyModeDerived
+}
+
 func (r *HierarchicalKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) {
 	path, err := buildNodeAbsolutePath(ctx, r.db, node.ID)
 	if err != nil {
 		return "", err
 	}
 	return blob.Key(path), nil
 }
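
KeyMode tells VirtualFS whether a node's stored blob_key can be trusted: FlatKeyResolver mints a UUIDv7 the first time it is asked and the key never changes afterwards, so it is persisted on the node row, while HierarchicalKeyResolver derives the key from the node's absolute path, so the key changes whenever the path does and must be recomputed instead. A hypothetical helper (not part of the diff) showing the distinction a caller would draw:

package vfsutil // hypothetical helper package

import (
	"context"

	"github.com/get-drexa/drexa/internal/blob"
	"github.com/get-drexa/drexa/internal/virtualfs"
)

// authoritativeKey returns the key a caller should use for a node: stable
// keys can be read straight off the node row, derived keys are recomputed
// from the node's current path.
func authoritativeKey(ctx context.Context, r virtualfs.BlobKeyResolver, node *virtualfs.Node) (blob.Key, error) {
	if r.KeyMode() == blob.KeyModeStable && !node.BlobKey.IsNil() {
		return node.BlobKey, nil
	}
	return r.Resolve(ctx, node)
}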

View File

@@ -12,6 +12,7 @@ import (
 	"github.com/gabriel-vasile/mimetype"
 	"github.com/get-drexa/drexa/internal/blob"
 	"github.com/get-drexa/drexa/internal/database"
+	"github.com/get-drexa/drexa/internal/ioext"
 	"github.com/google/uuid"
 	"github.com/sqids/sqids-go"
 	"github.com/uptrace/bun"
@@ -31,17 +32,35 @@ type CreateNodeOptions struct {
 	Name     string
 }

-type WriteFileOptions struct {
+type CreateFileOptions struct {
 	ParentID uuid.UUID
 	Name     string
 }

-func NewVirtualFS(db *bun.DB, blobStore blob.Store, keyResolver BlobKeyResolver) *VirtualFS {
+type FileContent struct {
+	reader  io.Reader
+	blobKey blob.Key
+}
+
+func FileContentFromReader(reader io.Reader) FileContent {
+	return FileContent{reader: reader}
+}
+
+func FileContentFromBlobKey(blobKey blob.Key) FileContent {
+	return FileContent{blobKey: blobKey}
+}
+
+func NewVirtualFS(db *bun.DB, blobStore blob.Store, keyResolver BlobKeyResolver) (*VirtualFS, error) {
+	sqid, err := sqids.New()
+	if err != nil {
+		return nil, err
+	}
+
 	return &VirtualFS{
 		db:          db,
 		blobStore:   blobStore,
 		keyResolver: keyResolver,
-	}
+		sqid:        sqid,
+	}, nil
 }

 func (vfs *VirtualFS) FindNode(ctx context.Context, userID, fileID string) (*Node, error) {
@@ -100,7 +119,7 @@ func (vfs *VirtualFS) ListChildren(ctx context.Context, node *Node) ([]*Node, er
 	return nodes, nil
 }

-func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io.Reader, opts WriteFileOptions) (*Node, error) {
+func (vfs *VirtualFS) CreateFile(ctx context.Context, userID uuid.UUID, opts CreateFileOptions) (*Node, error) {
 	pid, err := vfs.generatePublicID()
 	if err != nil {
 		return nil, err
@@ -115,7 +134,14 @@ func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io
 		Name:     opts.Name,
 	}

-	_, err = vfs.db.NewInsert().Model(&node).Exec(ctx)
+	if vfs.keyResolver.KeyMode() == blob.KeyModeStable {
+		node.BlobKey, err = vfs.keyResolver.Resolve(ctx, &node)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	_, err = vfs.db.NewInsert().Model(&node).Returning("*").Exec(ctx)
 	if err != nil {
 		if database.IsUniqueViolation(err) {
 			return nil, ErrNodeConflict
@@ -123,48 +149,88 @@ func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io
 		return nil, err
 	}

-	cleanup := func() {
-		_, _ = vfs.db.NewDelete().Model(&node).WherePK().Exec(ctx)
-	}
-
-	key, err := vfs.keyResolver.Resolve(ctx, &node)
-	if err != nil {
-		cleanup()
-		return nil, err
-	}
-
-	h := make([]byte, 3072)
-	n, err := io.ReadFull(reader, h)
-	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
-		cleanup()
-		return nil, err
-	}
-	h = h[:n]
-
-	mt := mimetype.Detect(h)
-	cr := NewCountingReader(io.MultiReader(bytes.NewReader(h), reader))
-
-	err = vfs.blobStore.Put(ctx, key, cr)
-	if err != nil {
-		cleanup()
-		return nil, err
-	}
-
-	node.BlobKey = key
-	node.Size = cr.Count()
-	node.MimeType = mt.String()
-	node.Status = NodeStatusReady
-
-	_, err = vfs.db.NewUpdate().Model(&node).
-		Column("status", "blob_key", "size", "mime_type").
-		WherePK().
-		Exec(ctx)
-	if err != nil {
-		cleanup()
-		return nil, err
-	}
-
 	return &node, nil
 }

+func (vfs *VirtualFS) WriteFile(ctx context.Context, node *Node, content FileContent) error {
+	if content.reader == nil && content.blobKey.IsNil() {
+		return blob.ErrInvalidFileContent
+	}
+
+	if !node.DeletedAt.IsZero() {
+		return ErrNodeNotFound
+	}
+
+	setCols := make([]string, 0, 4)
+
+	if content.reader != nil {
+		key, err := vfs.keyResolver.Resolve(ctx, node)
+		if err != nil {
+			return err
+		}
+
+		buf := make([]byte, 3072)
+		n, err := io.ReadFull(content.reader, buf)
+		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+			return err
+		}
+		buf = buf[:n]
+
+		mt := mimetype.Detect(buf)
+		cr := ioext.NewCountingReader(io.MultiReader(bytes.NewReader(buf), content.reader))
+
+		err = vfs.blobStore.Put(ctx, key, cr)
+		if err != nil {
+			return err
+		}
+
+		if vfs.keyResolver.KeyMode() == blob.KeyModeStable {
+			node.BlobKey = key
+			setCols = append(setCols, "blob_key")
+		}
+
+		node.MimeType = mt.String()
+		node.Size = cr.Count()
+		node.Status = NodeStatusReady
+		setCols = append(setCols, "mime_type", "size", "status")
+	} else {
+		node.BlobKey = content.blobKey
+
+		b, err := vfs.blobStore.ReadRange(ctx, content.blobKey, 0, 3072)
+		if err != nil {
+			return err
+		}
+		defer b.Close()
+
+		buf := make([]byte, 3072)
+		n, err := io.ReadFull(b, buf)
+		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+			return err
+		}
+		buf = buf[:n]
+
+		mt := mimetype.Detect(buf)
+		node.MimeType = mt.String()
+		node.Status = NodeStatusReady
+
+		s, err := vfs.blobStore.ReadSize(ctx, content.blobKey)
+		if err != nil {
+			return err
+		}
+		node.Size = s
+		setCols = append(setCols, "mime_type", "blob_key", "size", "status")
+	}
+
+	_, err := vfs.db.NewUpdate().Model(node).
+		Column(setCols...).
+		WherePK().
+		Exec(ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

 func (vfs *VirtualFS) CreateDirectory(ctx context.Context, userID uuid.UUID, parentID uuid.UUID, name string) (*Node, error) {
@@ -216,7 +282,7 @@ func (vfs *VirtualFS) SoftDeleteNode(ctx context.Context, node *Node) error {
 }

 func (vfs *VirtualFS) RestoreNode(ctx context.Context, node *Node) error {
-	if !node.IsAccessible() {
+	if node.Status != NodeStatusReady {
 		return ErrNodeNotFound
 	}

@@ -289,15 +355,13 @@ func (vfs *VirtualFS) MoveNode(ctx context.Context, node *Node, parentID uuid.UU
 		return err
 	}

-	if node.Kind == NodeKindFile && !node.BlobKey.IsNil() && oldKey != newKey {
-		// if node is a file, has a previous key, and the new key is different, we need to update the node with the new key
-		err = vfs.blobStore.Move(ctx, oldKey, newKey)
-		if err != nil {
-			return err
-		}
-
-		node.BlobKey = newKey
-		_, err = vfs.db.NewUpdate().Model(node).
-			WherePK().
-			Set("blob_key = ?", newKey).
+	err = vfs.blobStore.Move(ctx, oldKey, newKey)
+	if err != nil {
+		return err
+	}
+
+	if vfs.keyResolver.KeyMode() == blob.KeyModeStable {
+		node.BlobKey = newKey
+		_, err = vfs.db.NewUpdate().Model(node).
+			WherePK().
+			Set("blob_key = ?", newKey).