feat: impl upload service

This commit is contained in:
2025-11-28 22:31:00 +00:00
parent 797b40a35c
commit 987f36e1d2
10 changed files with 361 additions and 48 deletions

View File

@@ -1,22 +0,0 @@
package virtualfs
import "io"
type CountingReader struct {
reader io.Reader
count int64
}
func NewCountingReader(reader io.Reader) *CountingReader {
return &CountingReader{reader: reader}
}
func (r *CountingReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
r.count += int64(n)
return n, err
}
func (r *CountingReader) Count() int64 {
return r.count
}

View File

@@ -9,11 +9,16 @@ import (
)
type BlobKeyResolver interface {
KeyMode() blob.KeyMode
Resolve(ctx context.Context, node *Node) (blob.Key, error)
}
type FlatKeyResolver struct{}
func (r *FlatKeyResolver) KeyMode() blob.KeyMode {
return blob.KeyModeStable
}
func (r *FlatKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) {
if node.BlobKey == "" {
id, err := uuid.NewV7()
@@ -29,10 +34,15 @@ type HierarchicalKeyResolver struct {
db *bun.DB
}
func (r *HierarchicalKeyResolver) KeyMode() blob.KeyMode {
return blob.KeyModeDerived
}
func (r *HierarchicalKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) {
path, err := buildNodeAbsolutePath(ctx, r.db, node.ID)
if err != nil {
return "", err
}
return blob.Key(path), nil
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/gabriel-vasile/mimetype"
"github.com/get-drexa/drexa/internal/blob"
"github.com/get-drexa/drexa/internal/database"
"github.com/get-drexa/drexa/internal/ioext"
"github.com/google/uuid"
"github.com/sqids/sqids-go"
"github.com/uptrace/bun"
@@ -31,17 +32,35 @@ type CreateNodeOptions struct {
Name string
}
type WriteFileOptions struct {
type CreateFileOptions struct {
ParentID uuid.UUID
Name string
}
func NewVirtualFS(db *bun.DB, blobStore blob.Store, keyResolver BlobKeyResolver) *VirtualFS {
type FileContent struct {
reader io.Reader
blobKey blob.Key
}
func FileContentFromReader(reader io.Reader) FileContent {
return FileContent{reader: reader}
}
func FileContentFromBlobKey(blobKey blob.Key) FileContent {
return FileContent{blobKey: blobKey}
}
func NewVirtualFS(db *bun.DB, blobStore blob.Store, keyResolver BlobKeyResolver) (*VirtualFS, error) {
sqid, err := sqids.New()
if err != nil {
return nil, err
}
return &VirtualFS{
db: db,
blobStore: blobStore,
keyResolver: keyResolver,
}
sqid: sqid,
}, nil
}
func (vfs *VirtualFS) FindNode(ctx context.Context, userID, fileID string) (*Node, error) {
@@ -100,7 +119,7 @@ func (vfs *VirtualFS) ListChildren(ctx context.Context, node *Node) ([]*Node, er
return nodes, nil
}
func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io.Reader, opts WriteFileOptions) (*Node, error) {
func (vfs *VirtualFS) CreateFile(ctx context.Context, userID uuid.UUID, opts CreateFileOptions) (*Node, error) {
pid, err := vfs.generatePublicID()
if err != nil {
return nil, err
@@ -115,7 +134,14 @@ func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io
Name: opts.Name,
}
_, err = vfs.db.NewInsert().Model(&node).Exec(ctx)
if vfs.keyResolver.KeyMode() == blob.KeyModeStable {
node.BlobKey, err = vfs.keyResolver.Resolve(ctx, &node)
if err != nil {
return nil, err
}
}
_, err = vfs.db.NewInsert().Model(&node).Returning("*").Exec(ctx)
if err != nil {
if database.IsUniqueViolation(err) {
return nil, ErrNodeConflict
@@ -123,48 +149,88 @@ func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io
return nil, err
}
cleanup := func() {
_, _ = vfs.db.NewDelete().Model(&node).WherePK().Exec(ctx)
return &node, nil
}
func (vfs *VirtualFS) WriteFile(ctx context.Context, node *Node, content FileContent) error {
if content.reader == nil && content.blobKey.IsNil() {
return blob.ErrInvalidFileContent
}
key, err := vfs.keyResolver.Resolve(ctx, &node)
if err != nil {
cleanup()
return nil, err
if !node.DeletedAt.IsZero() {
return ErrNodeNotFound
}
h := make([]byte, 3072)
n, err := io.ReadFull(reader, h)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
cleanup()
return nil, err
}
h = h[:n]
setCols := make([]string, 0, 4)
mt := mimetype.Detect(h)
cr := NewCountingReader(io.MultiReader(bytes.NewReader(h), reader))
if content.reader != nil {
key, err := vfs.keyResolver.Resolve(ctx, node)
if err != nil {
return err
}
err = vfs.blobStore.Put(ctx, key, cr)
if err != nil {
cleanup()
return nil, err
buf := make([]byte, 3072)
n, err := io.ReadFull(content.reader, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return err
}
buf = buf[:n]
mt := mimetype.Detect(buf)
cr := ioext.NewCountingReader(io.MultiReader(bytes.NewReader(buf), content.reader))
err = vfs.blobStore.Put(ctx, key, cr)
if err != nil {
return err
}
if vfs.keyResolver.KeyMode() == blob.KeyModeStable {
node.BlobKey = key
setCols = append(setCols, "blob_key")
}
node.MimeType = mt.String()
node.Size = cr.Count()
node.Status = NodeStatusReady
setCols = append(setCols, "mime_type", "size", "status")
} else {
node.BlobKey = content.blobKey
b, err := vfs.blobStore.ReadRange(ctx, content.blobKey, 0, 3072)
if err != nil {
return err
}
defer b.Close()
buf := make([]byte, 3072)
n, err := io.ReadFull(b, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return err
}
buf = buf[:n]
mt := mimetype.Detect(buf)
node.MimeType = mt.String()
node.Status = NodeStatusReady
s, err := vfs.blobStore.ReadSize(ctx, content.blobKey)
if err != nil {
return err
}
node.Size = s
setCols = append(setCols, "mime_type", "blob_key", "size", "status")
}
node.BlobKey = key
node.Size = cr.Count()
node.MimeType = mt.String()
node.Status = NodeStatusReady
_, err = vfs.db.NewUpdate().Model(&node).
Column("status", "blob_key", "size", "mime_type").
_, err := vfs.db.NewUpdate().Model(&node).
Column(setCols...).
WherePK().
Exec(ctx)
if err != nil {
cleanup()
return nil, err
return err
}
return &node, nil
return nil
}
func (vfs *VirtualFS) CreateDirectory(ctx context.Context, userID uuid.UUID, parentID uuid.UUID, name string) (*Node, error) {
@@ -216,7 +282,7 @@ func (vfs *VirtualFS) SoftDeleteNode(ctx context.Context, node *Node) error {
}
func (vfs *VirtualFS) RestoreNode(ctx context.Context, node *Node) error {
if !node.IsAccessible() {
if node.Status != NodeStatusReady {
return ErrNodeNotFound
}
@@ -289,15 +355,13 @@ func (vfs *VirtualFS) MoveNode(ctx context.Context, node *Node, parentID uuid.UU
return err
}
if node.Kind == NodeKindFile && !node.BlobKey.IsNil() && oldKey != newKey {
// if node is a file, has a previous key, and the new key is different, we need to update the node with the new key
err = vfs.blobStore.Move(ctx, oldKey, newKey)
if err != nil {
return err
}
err = vfs.blobStore.Move(ctx, oldKey, newKey)
if err != nil {
return err
}
if vfs.keyResolver.KeyMode() == blob.KeyModeStable {
node.BlobKey = newKey
_, err = vfs.db.NewUpdate().Model(node).
WherePK().
Set("blob_key = ?", newKey).