From b1e34f878c257cef6cea6055f597267bbb6d3541 Mon Sep 17 00:00:00 2001
From: Kenneth
Date: Thu, 27 Nov 2025 20:49:58 +0000
Subject: [PATCH] wip: vfs implementation

---
 apps/backend/go.mod                               |   2 +
 apps/backend/go.sum                               |   4 +
 .../internal/database/migrations/initial.sql      |  67 ++---
 .../internal/virtualfs/counting_reader.go         |  22 ++
 apps/backend/internal/virtualfs/err.go            |   8 +
 .../internal/virtualfs/key_resolver.go            |  38 +++
 apps/backend/internal/virtualfs/node.go           |  49 ++++
 apps/backend/internal/virtualfs/path.go           |  42 +++
 apps/backend/internal/virtualfs/vfs.go            | 270 ++++++++++++++++++
 9 files changed, 466 insertions(+), 36 deletions(-)
 create mode 100644 apps/backend/internal/virtualfs/counting_reader.go
 create mode 100644 apps/backend/internal/virtualfs/err.go
 create mode 100644 apps/backend/internal/virtualfs/key_resolver.go
 create mode 100644 apps/backend/internal/virtualfs/node.go
 create mode 100644 apps/backend/internal/virtualfs/path.go
 create mode 100644 apps/backend/internal/virtualfs/vfs.go

diff --git a/apps/backend/go.mod b/apps/backend/go.mod
index ec66e20..7a68bb7 100644
--- a/apps/backend/go.mod
+++ b/apps/backend/go.mod
@@ -10,7 +10,9 @@ require (
 )
 
 require (
+	github.com/gabriel-vasile/mimetype v1.4.11 // indirect
 	github.com/joho/godotenv v1.5.1 // indirect
+	github.com/sqids/sqids-go v0.4.1 // indirect
 	go.opentelemetry.io/otel v1.37.0 // indirect
 	go.opentelemetry.io/otel/trace v1.37.0 // indirect
 	mellium.im/sasl v0.3.2 // indirect
diff --git a/apps/backend/go.sum b/apps/backend/go.sum
index c79049d..acfb399 100644
--- a/apps/backend/go.sum
+++ b/apps/backend/go.sum
@@ -2,6 +2,8 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1
 github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gabriel-vasile/mimetype v1.4.11 h1:AQvxbp830wPhHTqc1u7nzoLT+ZFxGY7emj5DR5DYFik=
+github.com/gabriel-vasile/mimetype v1.4.11/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s=
 github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw=
 github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
 github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
@@ -26,6 +28,8 @@ github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++
 github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
 github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
+github.com/sqids/sqids-go v0.4.1 h1:eQKYzmAZbLlRwHeHYPF35QhgxwZHLnlmVj9AkIj/rrw=
+github.com/sqids/sqids-go v0.4.1/go.mod h1:EMwHuPQgSNFS0A49jESTfIQS+066XQTVhukrzEPScl8=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo=
diff --git a/apps/backend/internal/database/migrations/initial.sql b/apps/backend/internal/database/migrations/initial.sql
index 27dc103..12f769c 100644
--- a/apps/backend/internal/database/migrations/initial.sql
+++ b/apps/backend/internal/database/migrations/initial.sql
@@ -50,52 +50,50 @@ CREATE INDEX idx_refresh_tokens_user_id ON refresh_tokens(user_id);
 CREATE INDEX idx_refresh_tokens_token_hash ON refresh_tokens(token_hash);
 CREATE INDEX idx_refresh_tokens_expires_at ON refresh_tokens(expires_at);
 
-CREATE TABLE IF NOT EXISTS directories (
+-- Virtual filesystem nodes (unified files + directories)
+CREATE TABLE IF NOT EXISTS vfs_nodes (
     id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
+    public_id TEXT NOT NULL UNIQUE, -- opaque ID for external API (no timestamp leak)
     user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+    parent_id UUID REFERENCES vfs_nodes(id) ON DELETE CASCADE, -- NULL = root directory
+    kind TEXT NOT NULL CHECK (kind IN ('file', 'directory')),
+    status TEXT NOT NULL DEFAULT 'ready' CHECK (status IN ('pending', 'ready')),
     name TEXT NOT NULL,
-    parent_id UUID REFERENCES directories(id) ON DELETE CASCADE,
+
+    -- File-specific fields (NULL for directories)
+    blob_key TEXT, -- reference to blob storage (flat mode), NULL for hierarchical
+    size BIGINT, -- file size in bytes
+    mime_type TEXT, -- content type
+
     created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
     updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-    deleted_at TIMESTAMPTZ,
-    CONSTRAINT unique_directory_path UNIQUE NULLS NOT DISTINCT (user_id, parent_id, name, deleted_at)
+    deleted_at TIMESTAMPTZ, -- soft delete for trash
+
+    -- No duplicate names in same parent (per user, excluding deleted)
+    CONSTRAINT unique_node_name UNIQUE NULLS NOT DISTINCT (user_id, parent_id, name, deleted_at)
 );
 
-CREATE INDEX idx_directories_user_id ON directories(user_id, deleted_at);
-CREATE INDEX idx_directories_path ON directories(user_id, path, deleted_at);
+CREATE INDEX idx_vfs_nodes_user_id ON vfs_nodes(user_id) WHERE deleted_at IS NULL;
+CREATE INDEX idx_vfs_nodes_parent_id ON vfs_nodes(parent_id) WHERE deleted_at IS NULL;
+CREATE INDEX idx_vfs_nodes_user_parent ON vfs_nodes(user_id, parent_id) WHERE deleted_at IS NULL;
+CREATE INDEX idx_vfs_nodes_kind ON vfs_nodes(user_id, kind) WHERE deleted_at IS NULL;
+CREATE INDEX idx_vfs_nodes_deleted ON vfs_nodes(user_id, deleted_at) WHERE deleted_at IS NOT NULL;
+CREATE INDEX idx_vfs_nodes_public_id ON vfs_nodes(public_id);
+CREATE UNIQUE INDEX idx_vfs_nodes_user_root ON vfs_nodes(user_id) WHERE parent_id IS NULL; -- one root per user
+CREATE INDEX idx_vfs_nodes_pending ON vfs_nodes(created_at) WHERE status = 'pending'; -- for cleanup job
 
-CREATE TABLE IF NOT EXISTS files (
+CREATE TABLE IF NOT EXISTS node_shares (
     id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
-    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
-    directory_id UUID REFERENCES directories(id) ON DELETE CASCADE,
-    name TEXT NOT NULL,
-    size BIGINT NOT NULL,
-    mime_type TEXT,
-    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-    deleted_at TIMESTAMPTZ,
-    last_accessed_at TIMESTAMPTZ,
-    CONSTRAINT unique_file_in_directory UNIQUE NULLS NOT DISTINCT (user_id, directory_id, name, deleted_at)
-);
-
-CREATE INDEX idx_files_user_id ON files(user_id, deleted_at);
-CREATE INDEX idx_files_directory_id ON files(directory_id) WHERE directory_id IS NOT NULL;
-CREATE INDEX idx_files_path ON files(user_id, path, deleted_at);
-CREATE INDEX idx_files_deleted_at ON files(deleted_at) WHERE deleted_at IS NOT NULL;
-CREATE INDEX idx_files_last_accessed_at ON files(user_id, deleted_at, last_accessed_at);
-
-CREATE TABLE IF NOT EXISTS file_shares (
-    id UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
-    file_id UUID NOT NULL REFERENCES files(id) ON DELETE CASCADE,
+    node_id UUID NOT NULL REFERENCES vfs_nodes(id) ON DELETE CASCADE,
     share_token TEXT NOT NULL UNIQUE,
     expires_at TIMESTAMPTZ,
     created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
     updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
 );
 
-CREATE INDEX idx_file_shares_share_token ON file_shares(share_token);
-CREATE INDEX idx_file_shares_file_id ON file_shares(file_id);
-CREATE INDEX idx_file_shares_expires_at ON file_shares(expires_at) WHERE expires_at IS NOT NULL;
+CREATE INDEX idx_node_shares_share_token ON node_shares(share_token);
+CREATE INDEX idx_node_shares_node_id ON node_shares(node_id);
+CREATE INDEX idx_node_shares_expires_at ON node_shares(expires_at) WHERE expires_at IS NOT NULL;
 
 -- ============================================================================
 -- Triggers for updated_at timestamps
@@ -112,11 +110,8 @@ $$ LANGUAGE plpgsql;
 CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
     FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
 
-CREATE TRIGGER update_directories_updated_at BEFORE UPDATE ON directories
+CREATE TRIGGER update_vfs_nodes_updated_at BEFORE UPDATE ON vfs_nodes
     FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
 
-CREATE TRIGGER update_files_updated_at BEFORE UPDATE ON files
-    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
-
-CREATE TRIGGER update_file_shares_updated_at BEFORE UPDATE ON file_shares
+CREATE TRIGGER update_node_shares_updated_at BEFORE UPDATE ON node_shares
     FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
\ No newline at end of file
diff --git a/apps/backend/internal/virtualfs/counting_reader.go b/apps/backend/internal/virtualfs/counting_reader.go
new file mode 100644
index 0000000..5e92f8c
--- /dev/null
+++ b/apps/backend/internal/virtualfs/counting_reader.go
@@ -0,0 +1,22 @@
+package virtualfs
+
+import "io"
+
+type CountingReader struct {
+	reader io.Reader
+	count  int64
+}
+
+func NewCountingReader(reader io.Reader) *CountingReader {
+	return &CountingReader{reader: reader}
+}
+
+func (r *CountingReader) Read(p []byte) (n int, err error) {
+	n, err = r.reader.Read(p)
+	r.count += int64(n)
+	return n, err
+}
+
+func (r *CountingReader) Count() int64 {
+	return r.count
+}
diff --git a/apps/backend/internal/virtualfs/err.go b/apps/backend/internal/virtualfs/err.go
new file mode 100644
index 0000000..ea5517f
--- /dev/null
+++ b/apps/backend/internal/virtualfs/err.go
@@ -0,0 +1,8 @@
+package virtualfs
+
+import "errors"
+
+var (
+	ErrNodeNotFound = errors.New("node not found")
+	ErrNodeConflict = errors.New("node conflict")
+)
diff --git a/apps/backend/internal/virtualfs/key_resolver.go b/apps/backend/internal/virtualfs/key_resolver.go
new file mode 100644
index 0000000..c562352
--- /dev/null
+++ b/apps/backend/internal/virtualfs/key_resolver.go
@@ -0,0 +1,38 @@
+package virtualfs
+
+import (
+	"context"
+
+	"github.com/get-drexa/drexa/internal/blob"
+	"github.com/google/uuid"
+	"github.com/uptrace/bun"
+)
+
+type BlobKeyResolver interface {
+	Resolve(ctx context.Context, node *Node) (blob.Key, error)
+}
+
+type FlatKeyResolver struct{}
+
+func (r *FlatKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) {
+	if node.BlobKey == "" {
+		id, err := uuid.NewV7()
+		if err != nil {
+			return "", err
+		}
+		return blob.Key(id.String()), nil
+	}
+	return node.BlobKey, nil
+}
+
+type HierarchicalKeyResolver struct {
+	db *bun.DB
+}
+
+func (r *HierarchicalKeyResolver) Resolve(ctx context.Context, node *Node) (blob.Key, error) {
+	path, err := buildNodeAbsolutePath(ctx, r.db, node.ID)
+	if err != nil {
+		return "", err
+	}
+	return blob.Key(path), nil
+}
diff --git a/apps/backend/internal/virtualfs/node.go b/apps/backend/internal/virtualfs/node.go
new file mode 100644
index 0000000..3aab117
--- /dev/null
+++ b/apps/backend/internal/virtualfs/node.go
@@ -0,0 +1,49 @@
+package virtualfs
+
+import (
+	"time"
+
+	"github.com/get-drexa/drexa/internal/blob"
+	"github.com/google/uuid"
+	"github.com/uptrace/bun"
+)
+
+type NodeKind string
+
+const (
+	NodeKindFile      NodeKind = "file"
+	NodeKindDirectory NodeKind = "directory"
+)
+
+type NodeStatus string
+
+const (
+	NodeStatusPending NodeStatus = "pending"
+	NodeStatusReady   NodeStatus = "ready"
+)
+
+type Node struct {
+	bun.BaseModel `bun:"vfs_nodes"`
+
+	ID       uuid.UUID  `bun:",pk,type:uuid"`
+	PublicID string     `bun:"public_id,notnull"`
+	UserID   uuid.UUID  `bun:"user_id,notnull"`
+	ParentID uuid.UUID  `bun:"parent_id,nullzero"` // zero value is stored as NULL (root)
+	Kind     NodeKind   `bun:"kind,notnull"`
+	Status   NodeStatus `bun:"status,notnull"`
+	Name     string     `bun:"name,notnull"`
+
+	BlobKey  blob.Key `bun:"blob_key"`
+	Size     int64    `bun:"size"`
+	MimeType string   `bun:"mime_type"`
+
+	CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
+	UpdatedAt time.Time `bun:"updated_at,nullzero,notnull,default:current_timestamp"`
+	DeletedAt time.Time `bun:"deleted_at,nullzero"` // zero value is stored as NULL (not deleted)
+}
+
+// IsAccessible returns true if the node can be accessed.
+// If the node is not ready or if it is soft deleted, it cannot be accessed.
+func (n *Node) IsAccessible() bool {
+	return n.DeletedAt.IsZero() && n.Status == NodeStatusReady
+}
diff --git a/apps/backend/internal/virtualfs/path.go b/apps/backend/internal/virtualfs/path.go
new file mode 100644
index 0000000..2f9ebd2
--- /dev/null
+++ b/apps/backend/internal/virtualfs/path.go
@@ -0,0 +1,42 @@
+package virtualfs
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"strings"
+
+	"github.com/google/uuid"
+	"github.com/uptrace/bun"
+)
+
+const absolutePathQuery = `WITH RECURSIVE path AS (
+	SELECT id, parent_id, name, 1 as depth
+	FROM vfs_nodes WHERE id = ? AND deleted_at IS NULL
+
+	UNION ALL
+
+	SELECT n.id, n.parent_id, n.name, p.depth + 1
+	FROM vfs_nodes n
+	JOIN path p ON n.id = p.parent_id
+	WHERE n.deleted_at IS NULL
+)
+SELECT name FROM path
+WHERE EXISTS (SELECT 1 FROM path WHERE parent_id IS NULL)
+ORDER BY depth DESC;`
+
+func JoinPath(parts ...string) string {
+	return strings.Join(parts, "/")
+}
+
+func buildNodeAbsolutePath(ctx context.Context, db *bun.DB, nodeID uuid.UUID) (string, error) {
+	var path []string
+	// bun raw queries use ? placeholders, not $1.
+	err := db.NewRaw(absolutePathQuery, nodeID).Scan(ctx, &path)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return "", ErrNodeNotFound
+		}
+		return "", err
+	}
+	// Scanning into a slice does not error on zero rows; an empty result means
+	// the node does not exist or its chain to the root is deleted.
+	if len(path) == 0 {
+		return "", ErrNodeNotFound
+	}
+	return JoinPath(path...), nil
+}
diff --git a/apps/backend/internal/virtualfs/vfs.go b/apps/backend/internal/virtualfs/vfs.go
new file mode 100644
index 0000000..b8eb9f0
--- /dev/null
+++ b/apps/backend/internal/virtualfs/vfs.go
@@ -0,0 +1,270 @@
+package virtualfs
+
+import (
+	"bytes"
+	"context"
+	"crypto/rand"
+	"database/sql"
+	"encoding/binary"
+	"errors"
+	"io"
+
+	"github.com/gabriel-vasile/mimetype"
+	"github.com/get-drexa/drexa/internal/blob"
+	"github.com/get-drexa/drexa/internal/database"
+	"github.com/google/uuid"
+	"github.com/sqids/sqids-go"
+	"github.com/uptrace/bun"
+)
+
+type VirtualFS struct {
+	db          *bun.DB
+	blobStore   blob.Store
+	keyResolver BlobKeyResolver
+
+	sqid *sqids.Sqids
+}
+
+type CreateNodeOptions struct {
+	ParentID uuid.UUID
+	Kind     NodeKind
+	Name     string
+}
+
+type WriteFileOptions struct {
+	ParentID uuid.UUID
+	Name     string
+}
+
+func NewVirtualFS(db *bun.DB, blobStore blob.Store, keyResolver BlobKeyResolver) *VirtualFS {
+	// Initialise the sqids encoder here; otherwise generatePublicID panics on a nil sqid.
+	// sqids.New with default options never fails validation, so the error can be discarded.
+	sqid, _ := sqids.New()
+	return &VirtualFS{
+		db:          db,
+		blobStore:   blobStore,
+		keyResolver: keyResolver,
+		sqid:        sqid,
+	}
+}
+
+func (vfs *VirtualFS) FindNode(ctx context.Context, userID, nodeID string) (*Node, error) {
+	var node Node
+	err := vfs.db.NewSelect().Model(&node).
+		Where("user_id = ?", userID).
+		Where("id = ?", nodeID).
+		Where("status = ?", NodeStatusReady).
+		Where("deleted_at IS NULL").
+		Scan(ctx)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil, ErrNodeNotFound
+		}
+		return nil, err
+	}
+	return &node, nil
+}
+
+func (vfs *VirtualFS) FindNodeByPublicID(ctx context.Context, userID uuid.UUID, publicID string) (*Node, error) {
+	var node Node
+	err := vfs.db.NewSelect().Model(&node).
+		Where("user_id = ?", userID).
+		Where("public_id = ?", publicID).
+		Where("status = ?", NodeStatusReady).
+		Where("deleted_at IS NULL").
+		Scan(ctx)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil, ErrNodeNotFound
+		}
+		return nil, err
+	}
+	return &node, nil
+}
+
+func (vfs *VirtualFS) WriteFile(ctx context.Context, userID uuid.UUID, reader io.Reader, opts WriteFileOptions) (*Node, error) {
+	// Generate the primary key client-side so WherePK matches in the follow-up update and cleanup.
+	id, err := uuid.NewV7()
+	if err != nil {
+		return nil, err
+	}
+
+	pid, err := vfs.generatePublicID()
+	if err != nil {
+		return nil, err
+	}
+
+	node := Node{
+		ID:       id,
+		PublicID: pid,
+		UserID:   userID,
+		ParentID: opts.ParentID,
+		Kind:     NodeKindFile,
+		Status:   NodeStatusPending,
+		Name:     opts.Name,
+	}
+
+	_, err = vfs.db.NewInsert().Model(&node).Exec(ctx)
+	if err != nil {
+		if database.IsUniqueViolation(err) {
+			return nil, ErrNodeConflict
+		}
+		return nil, err
+	}
+
+	cleanup := func() {
+		_, _ = vfs.db.NewDelete().Model(&node).WherePK().Exec(ctx)
+	}
+
+	key, err := vfs.keyResolver.Resolve(ctx, &node)
+	if err != nil {
+		cleanup()
+		return nil, err
+	}
+
+	// Sniff the content type from the first 3072 bytes (mimetype's default detection limit).
+	h := make([]byte, 3072)
+	n, err := io.ReadFull(reader, h)
+	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+		cleanup()
+		return nil, err
+	}
+	h = h[:n]
+
+	mt := mimetype.Detect(h)
+	cr := NewCountingReader(io.MultiReader(bytes.NewReader(h), reader))
+
+	err = vfs.blobStore.Put(ctx, key, cr)
+	if err != nil {
+		cleanup()
+		return nil, err
+	}
+
+	node.BlobKey = key
+	node.Size = cr.Count()
+	node.MimeType = mt.String()
+	node.Status = NodeStatusReady
+
+	_, err = vfs.db.NewUpdate().Model(&node).
+		Column("status", "blob_key", "size", "mime_type").
+		WherePK().
+		Exec(ctx)
+	if err != nil {
+		cleanup()
+		return nil, err
+	}
+
+	return &node, nil
+}
+
+func (vfs *VirtualFS) CreateDirectory(ctx context.Context, userID uuid.UUID, parentID uuid.UUID, name string) (*Node, error) {
+	id, err := uuid.NewV7()
+	if err != nil {
+		return nil, err
+	}
+
+	pid, err := vfs.generatePublicID()
+	if err != nil {
+		return nil, err
+	}
+
+	node := Node{
+		ID:       id,
+		PublicID: pid,
+		UserID:   userID,
+		ParentID: parentID,
+		Kind:     NodeKindDirectory,
+		Status:   NodeStatusReady,
+		Name:     name,
+	}
+
+	_, err = vfs.db.NewInsert().Model(&node).Exec(ctx)
+	if err != nil {
+		if database.IsUniqueViolation(err) {
+			return nil, ErrNodeConflict
+		}
+		return nil, err
+	}
+
+	return &node, nil
+}
+
+func (vfs *VirtualFS) SoftDeleteNode(ctx context.Context, node *Node) error {
+	if !node.IsAccessible() {
+		return ErrNodeNotFound
+	}
+
+	_, err := vfs.db.NewUpdate().Model(node).
+		WherePK().
+		Where("deleted_at IS NULL").
+		Where("status = ?", NodeStatusReady).
+		Set("deleted_at = NOW()").
+		Returning("deleted_at").
+		Exec(ctx)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return ErrNodeNotFound
+		}
+		return err
+	}
+
+	return nil
+}
+
+func (vfs *VirtualFS) RenameNode(ctx context.Context, node *Node, name string) error {
+	if !node.IsAccessible() {
+		return ErrNodeNotFound
+	}
+
+	_, err := vfs.db.NewUpdate().Model(node).
+		WherePK().
+		Where("status = ?", NodeStatusReady).
+		Where("deleted_at IS NULL").
+		Set("name = ?", name).
+		Returning("name, updated_at").
+		Exec(ctx)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return ErrNodeNotFound
+		}
+		return err
+	}
+	return nil
+}
+
+func (vfs *VirtualFS) MoveNode(ctx context.Context, node *Node, parentID uuid.UUID) error {
+	if !node.IsAccessible() {
+		return ErrNodeNotFound
+	}
+
+	oldKey, err := vfs.keyResolver.Resolve(ctx, node)
+	if err != nil {
+		return err
+	}
+
+	_, err = vfs.db.NewUpdate().Model(node).
+		WherePK().
+		Where("status = ?", NodeStatusReady).
+		Where("deleted_at IS NULL").
+		Set("parent_id = ?", parentID).
+		Returning("parent_id, updated_at").
+		Exec(ctx)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return ErrNodeNotFound
+		}
+		if database.IsUniqueViolation(err) {
+			return ErrNodeConflict
+		}
+		return err
+	}
+
+	newKey, err := vfs.keyResolver.Resolve(ctx, node)
+	if err != nil {
+		return err
+	}
+
+	// Only file nodes have a backing blob to relocate; directories are purely logical.
+	if node.Kind == NodeKindFile && oldKey != newKey {
+		err = vfs.blobStore.Move(ctx, oldKey, newKey)
+		if err != nil {
+			return err
+		}
+		node.BlobKey = newKey
+	}
+
+	return nil
+}
+
+func (vfs *VirtualFS) generatePublicID() (string, error) {
+	var b [8]byte
+	_, err := rand.Read(b[:])
+	if err != nil {
+		return "", err
+	}
+	n := binary.BigEndian.Uint64(b[:])
+	return vfs.sqid.Encode([]uint64{n})
+}