aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/data/update.go2
-rw-r--r--pkg/index/filters.go4
-rw-r--r--pkg/index/index.go9
-rw-r--r--pkg/query/errors.go2
-rw-r--r--pkg/query/parser.go5
-rw-r--r--pkg/server/server.go8
-rw-r--r--pkg/server/unix.go89
7 files changed, 111 insertions, 8 deletions
diff --git a/pkg/data/update.go b/pkg/data/update.go
index a8a4b3d..1a50563 100644
--- a/pkg/data/update.go
+++ b/pkg/data/update.go
@@ -201,6 +201,7 @@ func (u *UpdateMany) documents() (bool, error) {
SELECT path FROM temp.updateDocs
)`)
if err != nil {
+ slog.Debug("Failed to remove missing files from index")
return false, err
}
@@ -215,6 +216,7 @@ func (u *UpdateMany) documents() (bool, error) {
WHERE excluded.fileTime > Documents.fileTime
`)
if err != nil {
+ slog.Debug("Failed document upsert")
return false, err
}
diff --git a/pkg/index/filters.go b/pkg/index/filters.go
index 920d5df..bd9df54 100644
--- a/pkg/index/filters.go
+++ b/pkg/index/filters.go
@@ -50,10 +50,10 @@ func ParseFilter(s string) (DocFilter, error) {
}
return NewMaxFilesizeFilter(size), nil
case "ExcludeName", "ExcludeFilename":
- // FIXME: support escaped commas
+ // TODO: support escaped commas
return NewExcludeFilenameFilter(strings.Split(param, ",")), nil
case "IncludeName", "IncludeFilename":
- // FIXME: support escaped commas
+ // TODO: support escaped commas
return NewIncludeFilenameFilter(strings.Split(param, ",")), nil
case "ExcludeParent":
return NewExcludeParentFilter(param), nil
diff --git a/pkg/index/index.go b/pkg/index/index.go
index cfa4138..50f642e 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -12,6 +12,7 @@ import (
"slices"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/goccy/go-yaml"
@@ -383,21 +384,23 @@ func ParseDoc(path string, opts ParseOpts) (*Document, error) {
return doc, nil
}
-func ParseDocs(paths []string, numWorkers uint, opts ParseOpts) map[string]*Document {
+func ParseDocs(paths []string, numWorkers uint, opts ParseOpts) (map[string]*Document, uint64) {
jobs := make(chan string, numWorkers)
results := make(chan *Document, numWorkers)
docs := make(map[string]*Document, len(paths))
wg := &sync.WaitGroup{}
+ errCnt := &atomic.Uint64{}
wg.Add(int(numWorkers))
for range numWorkers {
go func(jobs <-chan string, results chan<- *Document, wg *sync.WaitGroup) {
for path := range jobs {
doc, err := ParseDoc(path, opts)
if err != nil {
- slog.Error("Error occured while parsing file",
+ slog.Warn("Error occured while parsing file",
slog.String("path", path), slog.String("err", err.Error()),
)
+ errCnt.Add(1)
continue
}
@@ -423,7 +426,7 @@ func ParseDocs(paths []string, numWorkers uint, opts ParseOpts) map[string]*Docu
docs[doc.Path] = doc
}
- return docs
+ return docs, errCnt.Load()
}
func init() {
diff --git a/pkg/query/errors.go b/pkg/query/errors.go
index 35f8c19..889d40d 100644
--- a/pkg/query/errors.go
+++ b/pkg/query/errors.go
@@ -6,7 +6,7 @@ import (
)
var ErrQueryFormat = errors.New("Incorrect query format")
-var ErrDatetimeTokenParse = errors.New("Unrecognized format for datetime token")
+var ErrDatetimeTokenParse = errors.New("Unrecognized format for datetime")
// output errors
var ErrUnrecognizedOutputToken = errors.New("Unrecognized output token")
diff --git a/pkg/query/parser.go b/pkg/query/parser.go
index e22b4c3..53681f6 100644
--- a/pkg/query/parser.go
+++ b/pkg/query/parser.go
@@ -549,7 +549,10 @@ func Parse(tokens []Token) (*Clause, error) {
var t time.Time
var err error
if t, err = util.ParseDateTime(token.Value); err != nil {
- return nil, ErrDatetimeTokenParse
+ return nil, fmt.Errorf("Cannot parse time `%s`, %v",
+ token.Value,
+ ErrDatetimeTokenParse,
+ )
}
clause.Statements[len(clause.Statements)-1].Value = DatetimeValue{t}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index c08d5a4..a7a5395 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -2,6 +2,7 @@ package server
import (
"bytes"
+ "context"
"io"
"log/slog"
"net/http"
@@ -14,6 +15,11 @@ import (
"github.com/jpappel/atlas/pkg/query"
)
+type Server interface {
+ ListenAndServe() error
+ Shutdown(context.Context) error
+}
+
func info(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`
<h1>Atlas Server</h1>
@@ -22,7 +28,7 @@ func info(w http.ResponseWriter, r *http.Request) {
`))
}
-func New(db *data.Query) *http.ServeMux {
+func NewMux(db *data.Query) *http.ServeMux {
mux := http.NewServeMux()
outputBufPool := &sync.Pool{}
diff --git a/pkg/server/unix.go b/pkg/server/unix.go
new file mode 100644
index 0000000..87b425e
--- /dev/null
+++ b/pkg/server/unix.go
@@ -0,0 +1,89 @@
+package server
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "log/slog"
+ "net"
+ "os"
+ "sync"
+
+ "github.com/jpappel/atlas/pkg/data"
+)
+
+// datagram based unix server
+type UnixServer struct {
+ Addr string
+ Db *data.Query
+ shouldClose chan struct{}
+ bufPool *sync.Pool
+}
+
+func (s *UnixServer) ListenAndServe() error {
+ slog.Info("Listening on", slog.String("addr", s.Addr))
+ conn, err := net.ListenUnixgram(
+ "unixgram",
+ &net.UnixAddr{Name: s.Addr, Net: "Unix"},
+ )
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+ defer os.RemoveAll(s.Addr)
+ slog.Debug("Accepted connection")
+
+ // slog.Info("New Connection",
+ // slog.String("addr", conn.RemoteAddr().String()),
+ // )
+
+ s.bufPool = &sync.Pool{}
+ s.bufPool.New = func() any {
+ return &bytes.Buffer{}
+ }
+ s.handleConn(conn)
+
+ return nil
+}
+
+func (s UnixServer) handleConn(conn *net.UnixConn) {
+ // buf, ok := s.bufPool.Get().(*bytes.Buffer)
+ // if !ok {
+ // panic("Expected *bytes.Buffer in pool")
+ // }
+ buf := make([]byte, 1024)
+
+ n, err := conn.Read(buf)
+ if err != nil {
+ panic(err)
+ }
+ buf = buf[:n]
+
+ fmt.Println(string(buf))
+
+ // if _, err := io.Copy(buf, conn); err != nil {
+ // panic(err)
+ // }
+ // defer buf.Reset()
+ //
+ // io.Copy(os.Stdout, buf)
+
+ // artifact, err := query.Compile(buf.String(), 0, 1)
+ // if err != nil {
+ // panic(err)
+ // }
+ //
+ // docs, err := s.Db.Execute(artifact)
+ // if err != nil {
+ // panic(err)
+ // }
+ //
+ // _, err = query.DefaultOutput{}.OutputTo(conn, slices.Collect(maps.Values(docs)))
+ // if err != nil {
+ // panic(err)
+ // }
+}
+
+func (s *UnixServer) Shutdown(ctx context.Context) error {
+ return nil
+}