diff --git a/go.mod b/go.mod index 7c1de41..c21a96c 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/oschwald/geoip2-golang v1.8.0 github.com/pquerna/otp v1.4.0 github.com/roberthodgen/spa-server v0.0.0-20171007154335-bb87b4ff3253 + github.com/shirou/gopsutil v3.21.11+incompatible github.com/shirou/gopsutil/v3 v3.23.5 go.deanishe.net/favicon v0.1.0 go.mongodb.org/mongo-driver v1.11.3 diff --git a/go.sum b/go.sum index 39b642d..2d7fdf4 100644 --- a/go.sum +++ b/go.sum @@ -1259,6 +1259,8 @@ github.com/segmentio/objconv v1.0.1/go.mod h1:auayaH5k3137Cl4SoXTgrzQcuQDmvuVtZg github.com/serenize/snaker v0.0.0-20171204205717-a683aaf2d516/go.mod h1:Yow6lPLSAXx2ifx470yD/nUe22Dv5vBvxK/UK9UUTVs= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil/v3 v3.23.5 h1:5SgDCeQ0KW0S4N0znjeM/eFHXXOKyv2dVNgRq/c9P6Y= github.com/shirou/gopsutil/v3 v3.23.5/go.mod h1:Ng3Maa27Q2KARVJ0SPZF5NdrQSC3XHKP8IIWrHgMeLY= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= diff --git a/package.json b/package.json index 2a4855e..fe3479d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "cosmos-server", - "version": "0.11.3", + "version": "0.12.0-unstable", "description": "", "main": "test-server.js", "bugs": { diff --git a/src/docker/docker.go b/src/docker/docker.go index 652e1f6..d7e3360 100644 --- a/src/docker/docker.go +++ b/src/docker/docker.go @@ -662,4 +662,63 @@ func DockerPullImage(image string) (io.ReadCloser, error) { out, errPull := DockerClient.ImagePull(DockerContext, image, options) return out, errPull +} + +type ContainerStats struct { + Name string + CPUUsage float64 + MemUsage uint64 + MemLimit uint64 + NetworkRx uint64 + NetworkTx uint64 +} + +func StatsAll() ([]ContainerStats, error) { + containers, err := ListContainers() + if err != nil { + utils.Error("StatsAll", err) + return nil, err + } + + var containerStatsList []ContainerStats + + for _, container := range containers { + statsBody, err := DockerClient.ContainerStatsOneShot(DockerContext, container.ID) + if err != nil { + return nil, fmt.Errorf("error fetching stats for container %s: %s", container.ID, err) + } + + defer statsBody.Body.Close() + + stats := types.StatsJSON{} + if err := json.NewDecoder(statsBody.Body).Decode(&stats); err != nil { + return nil, fmt.Errorf("error decoding stats for container %s: %s", container.ID, err) + } + + cpuDelta := float64(stats.CPUStats.CPUUsage.TotalUsage - stats.PreCPUStats.CPUUsage.TotalUsage) + systemDelta := float64(stats.CPUStats.SystemUsage - stats.PreCPUStats.SystemUsage) + + cpuUsage := 0.0 + + if systemDelta > 0 && cpuDelta > 0 { + cpuUsage = (cpuDelta / systemDelta) * float64(len(stats.CPUStats.CPUUsage.PercpuUsage)) * 100 + } else { + utils.Error("StatsAll - Error calculating CPU usage", nil) + } + + // memUsage := float64(stats.MemoryStats.Usage) / float64(stats.MemoryStats.Limit) * 100 + netRx := stats.Networks["eth0"].RxBytes + netTx := stats.Networks["eth0"].TxBytes + + containerStats := ContainerStats{ + Name: strings.TrimPrefix(container.Names[0], "/"), + CPUUsage: cpuUsage, + MemUsage: stats.MemoryStats.Usage, + MemLimit: stats.MemoryStats.Limit, + NetworkRx: netRx, + 
NetworkTx: netTx, + } + containerStatsList = append(containerStatsList, containerStats) + } + return containerStatsList, nil } \ No newline at end of file diff --git a/src/icons.go b/src/icons.go index dd0a16d..953e290 100644 --- a/src/icons.go +++ b/src/icons.go @@ -172,7 +172,7 @@ func GetFavicon(w http.ResponseWriter, req *http.Request) { // Fetch the favicon resp, err := httpGetWithTimeout(iconURL) if err != nil { - utils.Debug("FaviconFetch" + err.Error()) + utils.Debug("FaviconFetch - " + err.Error()) continue } diff --git a/src/index.go b/src/index.go index 8dd7b69..0b35cdd 100644 --- a/src/index.go +++ b/src/index.go @@ -10,6 +10,7 @@ import ( "github.com/azukaar/cosmos-server/src/authorizationserver" "github.com/azukaar/cosmos-server/src/market" "github.com/azukaar/cosmos-server/src/constellation" + "github.com/azukaar/cosmos-server/src/metrics" ) func main() { @@ -41,6 +42,8 @@ func main() { utils.Log("Docker API version: " + version.APIVersion) } + metrics.Init() + market.Init() authorizationserver.Init() diff --git a/src/metrics/aggl.go b/src/metrics/aggl.go new file mode 100644 index 0000000..926d922 --- /dev/null +++ b/src/metrics/aggl.go @@ -0,0 +1,181 @@ +package metrics + +import ( + "time" + + "github.com/jasonlvhit/gocron" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/azukaar/cosmos-server/src/utils" +) + +type DataDefDBEntry struct { + Date time.Time + Value int + Processed bool + + // For agglomeration + AvgIndex int + AggloTo time.Time + AggloExpire time.Time +} + +type DataDefDB struct { + Values []DataDefDBEntry + ValuesAggl map[string]DataDefDBEntry + LastUpdate time.Time + Max uint64 + Label string + Key string + AggloType string +} + +func AggloMetrics() { + lock <- true + defer func() { <-lock }() + + utils.Log("Metrics: Agglomeration started") + + utils.Debug("Time: " + time.Now().String()) + + c, errCo := utils.GetCollection(utils.GetRootAppId(), "metrics") + if errCo != nil { + utils.Error("Metrics - Database Connect", errCo) + return + } + + // get all metrics from database + var metrics []DataDefDB + cursor, err := c.Find(nil, map[string]interface{}{}) + if err != nil { + utils.Error("Metrics: Error fetching metrics", err) + return + } + defer cursor.Close(nil) + + if err = cursor.All(nil, &metrics); err != nil { + utils.Error("Metrics: Error decoding metrics", err) + return + } + + // populate aggregation pools + hourlyPool := ModuloTime(time.Now(), time.Hour) + hourlyPoolTo := ModuloTime(time.Now().Add(1 * time.Hour), time.Hour) + dailyPool := ModuloTime(time.Now(), 24 * time.Hour) + dailyPoolTo := ModuloTime(time.Now().Add(24 * time.Hour), 24 * time.Hour) + + for metInd, metric := range metrics { + values := metric.Values + + // init map + if metric.ValuesAggl == nil { + metric.ValuesAggl = map[string]DataDefDBEntry{} + } + + // if hourly pool does not exist, create it + if _, ok := metric.ValuesAggl["hour_" + hourlyPool.String()]; !ok { + metric.ValuesAggl["hour_" + hourlyPool.String()] = DataDefDBEntry{ + Date: hourlyPool, + Value: 0, + Processed: false, + AvgIndex: 0, + AggloTo: hourlyPoolTo, + AggloExpire: hourlyPoolTo.Add(48 * time.Hour), + } + } + + // if daily pool does not exist, create it + if _, ok := metric.ValuesAggl["day_" + dailyPool.String()]; !ok { + metric.ValuesAggl["day_" + dailyPool.String()] = DataDefDBEntry{ + Date: dailyPool, + Value: 0, + Processed: false, + AvgIndex: 0, + AggloTo: dailyPoolTo, + AggloExpire: dailyPoolTo.Add(30 * 24 * time.Hour), + } + } + + for valInd, value := 
range values {
+			// if not processed
+			if !value.Processed {
+				valueHourlyPool := ModuloTime(value.Date, time.Hour)
+				valueDailyPool := ModuloTime(value.Date, 24 * time.Hour)
+
+				if _, ok := metric.ValuesAggl["hour_" + valueHourlyPool.String()]; ok {
+					currentPool := metric.ValuesAggl["hour_" + valueHourlyPool.String()]
+
+					currentPool.Value = MergeMetric(metric.AggloType, currentPool.Value, value.Value, currentPool.AvgIndex)
+					if metric.AggloType == "avg" {
+						currentPool.AvgIndex++
+					}
+
+					metric.ValuesAggl["hour_" + valueHourlyPool.String()] = currentPool
+				} else {
+					utils.Warn("Metrics: Agglomeration - Pool not found: " + "hour_" + valueHourlyPool.String())
+				}
+
+				if _, ok := metric.ValuesAggl["day_" + valueDailyPool.String()]; ok {
+					currentPool := metric.ValuesAggl["day_" + valueDailyPool.String()]
+
+					currentPool.Value = MergeMetric(metric.AggloType, currentPool.Value, value.Value, currentPool.AvgIndex)
+					if metric.AggloType == "avg" {
+						currentPool.AvgIndex++
+					}
+
+					metric.ValuesAggl["day_" + valueDailyPool.String()] = currentPool
+				} else {
+					utils.Warn("Metrics: Agglomeration - Pool not found: " + "day_" + valueDailyPool.String())
+				}
+
+				values[valInd].Processed = true
+			}
+		}
+
+		// delete values over 1h
+		finalValues := []DataDefDBEntry{}
+		for _, value := range values {
+			if value.Date.After(time.Now().Add(-1 * time.Hour)) {
+				finalValues = append(finalValues, value)
+			}
+		}
+
+		metric.Values = finalValues
+
+		// clean up old agglo values
+		for aggloKey, aggloValue := range metric.ValuesAggl {
+			if aggloValue.AggloExpire.Before(time.Now()) {
+				delete(metric.ValuesAggl, aggloKey)
+			}
+		}
+
+		metrics[metInd] = metric
+	}
+
+	utils.Log("Metrics: Agglomeration done. Saving to DB")
+
+	// save metrics
+	for _, metric := range metrics {
+		options := options.Update().SetUpsert(true)
+
+		_, err := c.UpdateOne(nil, bson.M{"Key": metric.Key}, bson.M{"$set": bson.M{"Values": metric.Values, "ValuesAggl": metric.ValuesAggl}}, options)
+
+		if err != nil {
+			utils.Error("Metrics: Error saving metrics", err)
+			return
+		}
+	}
+}
+
+func InitAggl() {
+	go func() {
+		s := gocron.NewScheduler()
+		s.Every(1).Hour().At("00:00").From(gocron.NextTick()).Do(AggloMetrics)
+		// s.Every(3).Minute().From(gocron.NextTick()).Do(AggloMetrics)
+
+		s.Start()
+
+		utils.Log("Metrics: Agglomeration Initialized")
+	}()
+}
\ No newline at end of file
diff --git a/src/metrics/index.go b/src/metrics/index.go
new file mode 100644
index 0000000..3f12cce
--- /dev/null
+++ b/src/metrics/index.go
@@ -0,0 +1,197 @@
+package metrics
+
+import (
+	"time"
+	"strconv"
+	"os"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo/options"
+
+	"github.com/azukaar/cosmos-server/src/utils"
+)
+
+type DataDef struct {
+	Max uint64
+	Period time.Duration
+	Label string
+	AggloType string
+	SetOperation string
+}
+
+type DataPush struct {
+	Date time.Time
+	Key string
+	Value int
+	Max uint64
+	Expire time.Time
+	Label string
+	AvgIndex int
+	AggloType string
+}
+
+var dataBuffer = map[string]DataPush{}
+
+var lock = make(chan bool, 1)
+
+func MergeMetric(SetOperation string, currentValue int, newValue int, avgIndex int) int {
+	if SetOperation == "" {
+		return newValue
+	} else if SetOperation == "max" {
+		if newValue > currentValue {
+			return newValue
+		} else {
+			return currentValue
+		}
+	} else if SetOperation == "min" {
+		if newValue < currentValue {
+			return newValue
+		} else {
+			return currentValue
+		}
+	} else if SetOperation == "avg" {
+		if avgIndex == 0 {
+			avgIndex = 1
+			return newValue
+		} else {
return (currentValue * (avgIndex) + newValue) / (avgIndex + 1) + } + } else { + return newValue + } +} + +func SaveMetrics() { + utils.Debug("Metrics - Saving data") + utils.Debug("Time: " + time.Now().String()) + nbData := 0 + + c, errCo := utils.GetCollection(utils.GetRootAppId(), "metrics") + if errCo != nil { + utils.Error("Metrics - Database Connect", errCo) + return + } + + lock <- true + defer func() { <-lock }() + + for dpkey, dp := range dataBuffer { + // utils.Debug("Metrics - Saving " + dp.Key + " " + strconv.Itoa(dp.Value) + " " + dp.Date.String() + " " + dp.Expire.String()) + + if dp.Expire.Before(time.Now()) { + delete(dataBuffer, dpkey) + nbData++ + + filter := bson.M{"Key": dp.Key} + update := bson.M{ + "$push": bson.M{"Values": + bson.M{ + "Date": dp.Date, + "Value": dp.Value, + }, + }, + "$set": bson.M{ + "LastUpdate": dp.Date, + "Max": dp.Max, + "Label": dp.Label, + "AggloType": dp.AggloType, + }, + } + + // This ensures that if the document doesn't exist, it'll be created + options := options.Update().SetUpsert(true) + + _, err := c.UpdateOne(nil, filter, update, options) + + if err != nil { + utils.Error("Metrics - Database Insert", err) + return + } + } + } + + utils.Debug("Data - Saved " + strconv.Itoa(nbData) + " entries") +} + +func ModuloTime(start time.Time, modulo time.Duration) time.Time { + elapsed := start.UnixNano() // This gives us the total nanoseconds since 1970 + durationNano := modulo.Nanoseconds() + + // Here we take modulo of elapsed time with the duration to get the remainder. + // Then, we subtract the remainder from the elapsed time to get the start of the period. + roundedElapsed := elapsed - elapsed%durationNano + + // Convert back to time.Time + return time.Unix(0, roundedElapsed) +} + +func PushSetMetric(key string, value int, def DataDef) { + go func() { + key = "cosmos." + key + date := ModuloTime(time.Now(), def.Period) + cacheKey := key + date.String() + + lock <- true + defer func() { <-lock }() + + if dp, ok := dataBuffer[cacheKey]; ok { + dp.Max = def.Max + dp.Value = MergeMetric(def.SetOperation, dp.Value, value, dp.AvgIndex) + if def.SetOperation == "avg" { + dp.AvgIndex++ + } + + dataBuffer[cacheKey] = dp + } else { + dataBuffer[cacheKey] = DataPush{ + Date: date, + Expire: ModuloTime(time.Now().Add(def.Period), def.Period), + Key: key, + Value: value, + Max: def.Max, + Label: def.Label, + AggloType: def.AggloType, + } + } + }() +} + +func Run() { + utils.Debug("Metrics - Run") + + // redirect docker monitoring + if os.Getenv("HOSTNAME") != "" { + // check if path /mnt/host exist + if _, err := os.Stat("/mnt/host"); os.IsNotExist(err) { + utils.Error("Metrics - Cannot start monitoring the server if you don't mount /mnt/host to /. 
Check the documentation for more information.", nil)
+			return
+		} else {
+			os.Setenv("HOST_PROC", "/mnt/host/proc")
+			os.Setenv("HOST_SYS", "/mnt/host/sys")
+			os.Setenv("HOST_ETC", "/mnt/host/etc")
+			os.Setenv("HOST_VAR", "/mnt/host/var")
+			os.Setenv("HOST_RUN", "/mnt/host/run")
+			os.Setenv("HOST_DEV", "/mnt/host/dev")
+			os.Setenv("HOST_ROOT", "/mnt/host/")
+		}
+	}
+
+	nextTime := ModuloTime(time.Now().Add(time.Second*30), time.Second*30)
+	nextTime = nextTime.Add(time.Second * 2)
+
+	utils.Debug("Metrics - Next run at " + nextTime.String())
+	time.AfterFunc(nextTime.Sub(time.Now()), func() {
+		go func() {
+			GetSystemMetrics()
+			SaveMetrics()
+		}()
+
+		Run()
+	})
+}
+
+func Init() {
+	InitAggl()
+	GetSystemMetrics()
+	Run()
+}
\ No newline at end of file
diff --git a/src/metrics/system.go b/src/metrics/system.go
new file mode 100644
index 0000000..b450569
--- /dev/null
+++ b/src/metrics/system.go
@@ -0,0 +1,141 @@
+package metrics
+
+import (
+	"strconv"
+	"time"
+
+	"github.com/shirou/gopsutil/cpu"
+	"github.com/shirou/gopsutil/mem"
+	"github.com/shirou/gopsutil/net"
+	"github.com/shirou/gopsutil/disk"
+
+	"github.com/azukaar/cosmos-server/src/utils"
+	"github.com/azukaar/cosmos-server/src/docker"
+)
+
+func GetSystemMetrics() {
+	utils.Debug("Metrics - System")
+
+	// Get CPU Usage
+	cpuPercent, err := cpu.Percent(0, false)
+	if err != nil {
+		utils.Error("Metrics - Error fetching CPU usage:", err)
+		return
+	}
+	if len(cpuPercent) > 0 {
+		for i, v := range cpuPercent {
+			PushSetMetric("system.cpu." + strconv.Itoa(i), int(v), DataDef{
+				Max: 100,
+				Period: time.Second * 30,
+				Label: "CPU " + strconv.Itoa(i),
+				AggloType: "avg",
+			})
+		}
+	}
+
+	// You can get detailed per-CPU stats with:
+	// cpuPercents, _ := cpu.Percent(0, true)
+
+	// Get RAM Usage
+	memInfo, err := mem.VirtualMemory()
+	if err != nil {
+		utils.Error("Metrics - Error fetching RAM usage:", err)
+		return
+	}
+	PushSetMetric("system.ram", int(memInfo.Used), DataDef{
+		Max: memInfo.Total,
+		Period: time.Second * 30,
+		Label: "RAM",
+		AggloType: "avg",
+	})
+
+	// Get Network Usage
+	netIO, err := net.IOCounters(false)
+	if err != nil || len(netIO) == 0 {
+		utils.Error("Metrics - Error fetching Network usage:", err)
+		return
+	}
+
+	PushSetMetric("system.netRx", int(netIO[0].BytesRecv), DataDef{
+		Max: 0,
+		Period: time.Second * 30,
+		Label: "Network Received",
+		AggloType: "avg",
+	})
+
+	PushSetMetric("system.netTx", int(netIO[0].BytesSent), DataDef{
+		Max: 0,
+		Period: time.Second * 30,
+		Label: "Network Sent",
+		AggloType: "avg",
+	})
+
+	PushSetMetric("system.netErr", int(netIO[0].Errin + netIO[0].Errout), DataDef{
+		Max: 0,
+		Period: time.Second * 30,
+		Label: "Network Errors",
+		AggloType: "avg",
+	})
+
+	PushSetMetric("system.netDrop", int(netIO[0].Dropin + netIO[0].Dropout), DataDef{
+		Max: 0,
+		Period: time.Second * 30,
+		Label: "Network Drops",
+		AggloType: "avg",
+	})
+
+	// docker stats
+	dockerStats, err := docker.StatsAll()
+	if err != nil {
+		utils.Error("Metrics - Error fetching Docker stats:", err)
+		return
+	}
+
+	for _, ds := range dockerStats {
+		PushSetMetric("system.docker.cpu." + ds.Name, int(ds.CPUUsage), DataDef{
+			Max: 100,
+			Period: time.Second * 30,
+			Label: "Docker CPU " + ds.Name,
+			AggloType: "avg",
+		})
+		PushSetMetric("system.docker.ram." + ds.Name, int(ds.MemUsage), DataDef{
+			Max: ds.MemLimit,
+			Period: time.Second * 30,
+			Label: "Docker RAM " + ds.Name,
+			AggloType: "avg",
+		})
+		PushSetMetric("system.docker.netRx." + ds.Name, int(ds.NetworkRx), DataDef{
+			Max: 0,
+			Period: time.Second * 30,
+			Label: "Docker Network Received " + ds.Name,
+			AggloType: "avg",
+		})
+		PushSetMetric("system.docker.netTx." 
+ ds.Name, int(ds.NetworkTx), DataDef{ + Max: 0, + Period: time.Second * 30, + Label: "Docker Network Sent " + ds.Name, + AggloType: "avg", + }) + } + + // Get Disk Usage + parts, err := disk.Partitions(true) + if err != nil { + utils.Error("Metrics - Error fetching Disk usage:", err) + return + } + + for _, part := range parts { + u, err := disk.Usage(part.Mountpoint) + if err != nil { + utils.Error("Metrics - Error fetching Disk usage:", err) + return + } + + PushSetMetric("system.disk." + part.Mountpoint, int(u.Used), DataDef{ + Max: u.Total, + Period: time.Second * 120, + Label: "Disk " + part.Mountpoint, + }) + } +} \ No newline at end of file
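A note on the container CPU figure computed in StatsAll (src/docker/docker.go): it is the usual Docker delta formula, the growth of the container's CPU time divided by the growth of the host's total CPU time between the two samples carried in the stats payload, scaled by the CPU count. The sketch below reproduces that calculation and adds a guard for cgroup v2 hosts, where PercpuUsage comes back empty and OnlineCPUs is the usable count; the calcCPUPercent helper is illustrative only and not part of the diff.

package main

import (
	"fmt"

	"github.com/docker/docker/api/types"
)

// calcCPUPercent follows the delta formula used in StatsAll, with a fallback
// to OnlineCPUs for cgroup v2 hosts where PercpuUsage is not populated.
func calcCPUPercent(s types.StatsJSON) float64 {
	cpuDelta := float64(s.CPUStats.CPUUsage.TotalUsage - s.PreCPUStats.CPUUsage.TotalUsage)
	systemDelta := float64(s.CPUStats.SystemUsage - s.PreCPUStats.SystemUsage)
	if cpuDelta <= 0 || systemDelta <= 0 {
		return 0
	}

	cpus := float64(len(s.CPUStats.CPUUsage.PercpuUsage))
	if cpus == 0 {
		cpus = float64(s.CPUStats.OnlineCPUs)
	}
	return (cpuDelta / systemDelta) * cpus * 100
}

func main() {
	var s types.StatsJSON
	s.PreCPUStats.CPUUsage.TotalUsage, s.CPUStats.CPUUsage.TotalUsage = 200, 400
	s.PreCPUStats.SystemUsage, s.CPUStats.SystemUsage = 1000, 2000
	s.CPUStats.OnlineCPUs = 4

	fmt.Printf("%.0f%%\n", calcCPUPercent(s)) // 80%
}

Without such a guard, len(PercpuUsage) is zero on cgroup v2 systems and the reported usage collapses to zero.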
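ModuloTime in src/metrics/index.go floors a timestamp to the start of its period, so every PushSetMetric call made during the same 30-second window maps to the same dataBuffer key, and MergeMetric folds the samples into one value (for "avg", a running average weighted by AvgIndex). A minimal, self-contained illustration of those two rules, using hypothetical helper names:

package main

import (
	"fmt"
	"time"
)

// moduloTime mirrors ModuloTime from the diff: it floors a timestamp to the
// start of its period (normalised to UTC here for printing).
func moduloTime(t time.Time, period time.Duration) time.Time {
	nanos := t.UnixNano()
	return time.Unix(0, nanos-nanos%period.Nanoseconds()).UTC()
}

// mergeAvg is the "avg" rule from MergeMetric: with n samples already folded
// in, the next sample is weighted 1/(n+1).
func mergeAvg(current, next, n int) int {
	if n == 0 {
		return next
	}
	return (current*n + next) / (n + 1)
}

func main() {
	t := time.Date(2023, 6, 14, 10, 17, 42, 0, time.UTC)
	fmt.Println(moduloTime(t, 30*time.Second)) // 2023-06-14 10:17:30 +0000 UTC
	fmt.Println(moduloTime(t, time.Hour))      // 2023-06-14 10:00:00 +0000 UTC

	avg := 0
	for i, v := range []int{10, 20, 60} {
		avg = mergeAvg(avg, v, i)
	}
	fmt.Println(avg) // 30, the average of the three samples
}

Because a buffer entry's Expire is the start of the next window, SaveMetrics only flushes an entry to MongoDB once its window has fully closed.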
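AggloMetrics keeps two aggregation pools per metric, keyed by the truncated timestamp ("hour_…" and "day_…"), and drops a pool once its AggloExpire passes: 48 hours after an hourly pool closes and 30 days after a daily one. A small sketch of how those keys and expiry times are derived; the poolKeys helper and the use of time.Truncate in place of ModuloTime are illustrative only.

package main

import (
	"fmt"
	"time"
)

// poolKeys shows how the hourly and daily aggregation pools are named and
// when they are allowed to expire.
func poolKeys(now time.Time) (hourKey, dayKey string, hourExpire, dayExpire time.Time) {
	hourly := now.Truncate(time.Hour)
	daily := now.Truncate(24 * time.Hour)

	hourKey = "hour_" + hourly.String()
	dayKey = "day_" + daily.String()

	// An hourly pool is kept for 48h after it closes, a daily pool for 30 days.
	hourExpire = hourly.Add(time.Hour).Add(48 * time.Hour)
	dayExpire = daily.Add(24 * time.Hour).Add(30 * 24 * time.Hour)
	return
}

func main() {
	h, d, he, de := poolKeys(time.Date(2023, 6, 14, 10, 17, 42, 0, time.UTC))
	fmt.Println(h, "expires", he) // hour_2023-06-14 10:00:00 +0000 UTC expires 2023-06-16 11:00:00 +0000 UTC
	fmt.Println(d, "expires", de) // day_2023-06-14 00:00:00 +0000 UTC expires 2023-07-15 00:00:00 +0000 UTC
}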