feat: log style improvements, streaming imporvements

This commit is contained in:
Derock 2024-05-18 22:43:29 -04:00
parent f83f834226
commit cbb87dcaf1
No known key found for this signature in database
13 changed files with 329 additions and 64 deletions

View file

@ -40,6 +40,7 @@
"@trpc/server": "11.0.0-next-alpha.150",
"@uiw/codemirror-extensions-langs": "^4.21.21",
"@uiw/react-codemirror": "^4.21.21",
"ansi-to-react": "^6.1.6",
"argon2": "^0.31.2",
"better-sqlite3": "^9.2.2",
"bufferutil": "^4.0.8",
@ -93,6 +94,7 @@
"@types/better-sqlite3": "^7.6.8",
"@types/common-tags": "^1.8.4",
"@types/cookie": "^0.6.0",
"@types/docker-modem": "^3.0.6",
"@types/dockerode": "^3.3.23",
"@types/eslint": "^8.56.1",
"@types/node": "^20.10.6",

View file

@ -71,6 +71,9 @@ dependencies:
'@uiw/react-codemirror':
specifier: ^4.21.21
version: 4.21.21(@babel/runtime@7.23.9)(@codemirror/autocomplete@6.12.0)(@codemirror/language@6.10.1)(@codemirror/lint@6.5.0)(@codemirror/search@6.5.6)(@codemirror/state@6.4.0)(@codemirror/theme-one-dark@6.1.2)(@codemirror/view@6.24.0)(codemirror@6.0.1)(react-dom@18.2.0)(react@18.2.0)
ansi-to-react:
specifier: ^6.1.6
version: 6.1.6(react-dom@18.2.0)(react@18.2.0)
argon2:
specifier: ^0.31.2
version: 0.31.2
@ -226,6 +229,9 @@ devDependencies:
'@types/cookie':
specifier: ^0.6.0
version: 0.6.0
'@types/docker-modem':
specifier: ^3.0.6
version: 3.0.6
'@types/dockerode':
specifier: ^3.3.23
version: 3.3.23
@ -3331,6 +3337,10 @@ packages:
uri-js: 4.4.1
dev: true
/anser@1.4.10:
resolution: {integrity: sha512-hCv9AqTQ8ycjpSd3upOJd7vFwW1JaoYQ7tpham03GJ1ca8/65rqn0RpaWpItOAd6ylW9wAw6luXYPJIyPFVOww==}
dev: false
/ansi-regex@5.0.1:
resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==}
engines: {node: '>=8'}
@ -3356,6 +3366,18 @@ packages:
resolution: {integrity: sha512-bN798gFfQX+viw3R7yrGWRqnrN2oRkEkUjjl4JNn4E8GxxbjtG3FbrEIIY3l8/hrwUwIeCZvi4QuOTP4MErVug==}
engines: {node: '>=12'}
/ansi-to-react@6.1.6(react-dom@18.2.0)(react@18.2.0):
resolution: {integrity: sha512-+HWn72GKydtupxX9TORBedqOMsJRiKTqaLUKW8txSBZw9iBpzPKLI8KOu4WzwD4R7hSv1zEspobY6LwlWvwZ6Q==}
peerDependencies:
react: ^16.3.2 || ^17.0.0
react-dom: ^16.3.2 || ^17.0.0
dependencies:
anser: 1.4.10
escape-carriage: 1.3.1
react: 18.2.0
react-dom: 18.2.0(react@18.2.0)
dev: false
/any-promise@1.3.0:
resolution: {integrity: sha512-7UvmKalWRt1wgjL1RrGxoSJW/0QZFIegpeGvZG9kjp8vrRu55XTHbwnqq2GpXm9uLbcuhxm3IqX9OB4MZR1b2A==}
@ -4751,6 +4773,10 @@ packages:
engines: {node: '>=6'}
dev: true
/escape-carriage@1.3.1:
resolution: {integrity: sha512-GwBr6yViW3ttx1kb7/Oh+gKQ1/TrhYwxKqVmg5gS+BK+Qe2KrOa/Vh7w3HPBvgGf0LfcDGoY9I6NHKoA5Hozhw==}
dev: false
/escape-string-regexp@1.0.5:
resolution: {integrity: sha512-vbRorB5FUQWvla16U8R/qgaFIya2qGzwDrNmCZuYKrbdSUMG6I1ZCGQRefkRVhuOkIGVne7BQ35DSfo1qvJqFg==}
engines: {node: '>=0.8.0'}

View file

@ -40,17 +40,10 @@ export function DeploymentCard({
}: {
deployment: RouterOutputs["projects"]["services"]["deployments"][number];
}) {
const [deploymentId, setDeploymentId] = useQueryParam(
"deploymentId",
StringParam,
);
const [_, setDeploymentId] = useQueryParam("deploymentId", StringParam);
return (
<Card>
{deploymentId === deployment.id && (
<DeploymentLogs deployment={deployment} />
)}
<div className="flex flex-row items-center p-4">
<div className="flex-grow">
<h2 className="flex flex-row items-center gap-2 align-middle text-lg capitalize">

View file

@ -7,62 +7,45 @@ import { useProject } from "../../../../_context/ProjectContext";
import { useState } from "react";
import { toast } from "sonner";
import { Drawer, DrawerContent } from "~/components/ui/drawer";
import { LogWindow, type LogLine } from "~/components/LogWindow";
import { StringParam, useQueryParam } from "use-query-params";
export function DeploymentLogs({
deployment,
}: {
deployment: RouterOutputs["projects"]["services"]["deployments"][number];
}) {
export function DeploymentLogs() {
const project = useProject();
const [logs, setLogs] = useState<string | null>(null);
const [logs, setLogs] = useState<LogLine[] | null>(null);
const [deploymentId, setDeploymentId] = useQueryParam(
"deploymentId",
StringParam,
);
api.projects.services.deploymentLogs.useSubscription(
{
serviceId: project.selectedService!.id,
deploymentId: deployment.id,
deploymentId: deploymentId ?? "",
projectId: project.id,
},
{
onData(data) {
setLogs(
(existing) =>
(existing += data
.split("\n")
.map((it) => {
try {
return JSON.parse(it);
} catch (error) {
return { m: it };
}
})
.map((it) => it.m)
.join("\n")),
);
setLogs((existing) => existing?.concat(data) ?? [data]);
},
onError(error) {
console.error(error);
toast.error("Failed to fetch logs: " + error.message);
},
enabled: !!deploymentId,
},
);
return (
<Drawer open={true}>
<Drawer open={!!deploymentId} onClose={() => setDeploymentId(null)}>
<DrawerContent className="max-h-full min-h-[80vh]">
<div className="mx-auto h-full min-w-[40vw] max-w-4xl">
<h2>Logs</h2>
{/* <pre>{logs}</pre> */}
<textarea
readOnly
value={logs}
style={{
width: "100%",
height: "100%",
fontFamily: "monospace",
whiteSpace: "pre-wrap",
}}
/>
<div className="mb-8 max-h-[80vh] overflow-scroll whitespace-nowrap">
{logs && <LogWindow logs={logs} />}
</div>
</div>
</DrawerContent>
</Drawer>

View file

@ -1,5 +1,6 @@
import { api } from "~/trpc/server";
import { DeploymentCard } from "./_components/DeploymentCard";
import { DeploymentLogs } from "./_components/DeploymentLogs";
export default async function DeploymentsPage({
params: { serviceId, projectId },
@ -29,6 +30,8 @@ export default async function DeploymentsPage({
</li>
))}
</ul>
<DeploymentLogs />
</div>
);
}

View file

@ -0,0 +1,32 @@
"use client";
import { useState } from "react";
import { api } from "~/trpc/react";
import { useProject } from "../../../_context/ProjectContext";
import { type LogLine, LogWindow } from "~/components/LogWindow";
export default function ServiceLogsPage() {
const project = useProject();
const [logs, setLogs] = useState<LogLine[] | null>(null);
api.projects.services.serviceLogs.useSubscription(
{
projectId: project.id,
serviceId: project.selectedService!.id,
},
{
onData(data) {
setLogs((log) => [...(log ?? []), JSON.parse(data) as LogLine]);
},
},
);
return (
<div>
<h1 className="text-xl">Logs</h1>
<pre className="max-h-[80vh] rounded-md bg-card p-4 text-white">
{logs && <LogWindow logs={logs} />}
</pre>
</div>
);
}

View file

@ -0,0 +1,52 @@
"use client";
import Ansi from "ansi-to-react";
export enum LogLevel {
/**
* Command Stdout
*/
Stdout,
/**
* Command Stderr
*/
Stderr,
/**
* Messages that did not originate from the command
*/
Notice,
}
export type LogLine = {
t?: number;
m: string;
l: LogLevel;
};
const LOG_LEVEL_TO_CLASS = {
[LogLevel.Stdout]: "",
[LogLevel.Stderr]: "bg-red-950/40",
[LogLevel.Notice]: "bg-blue-950/40",
};
export function LogWindow({ logs }: { logs: LogLine[] }) {
return (
<div className="h-full w-full select-text">
{logs.map((log, i) => (
<div
key={i}
className={`flex gap-2 py-1 text-sm ${LOG_LEVEL_TO_CLASS[log.l]}`}
>
<div className="mt-1 text-xs text-gray-400">
{log.t ? new Date(log.t).toLocaleTimeString() : ""}
</div>
<div className="flex-1 whitespace-pre">
<Ansi>{log.m}</Ansi>
</div>
</div>
))}
</div>
);
}

View file

@ -24,7 +24,7 @@ export const serviceMiddleware = experimental_standaloneMiddleware<{
const serviceDetails = await ServiceManager.findByNameOrId(
input.serviceId,
ctx.project.getData().id,
ctx.project,
);
if (!serviceDetails)

View file

@ -19,7 +19,10 @@ import { uuidv7 } from "uuidv7";
import logger from "~/server/utils/logger";
import { type Database } from "better-sqlite3";
import { getDeploymentsProcedure } from "./deployments";
import { getDeploymentLogsSubscription } from "./logs";
import {
getDeploymentLogsSubscription,
getServiceLogsSubscription,
} from "./logs";
export const serviceRouter = createTRPCRouter({
containers: getServiceContainers,
@ -28,6 +31,7 @@ export const serviceRouter = createTRPCRouter({
deleteDomain: deleteServiceDomainsProcedure,
deployments: getDeploymentsProcedure,
deploymentLogs: getDeploymentLogsSubscription,
serviceLogs: getServiceLogsSubscription,
get: authenticatedProcedure
.meta({

View file

@ -3,6 +3,14 @@ import { projectMiddleware } from "~/server/api/middleware/project";
import { serviceMiddleware } from "~/server/api/middleware/service";
import { authenticatedProcedure } from "~/server/api/trpc";
import { observable } from "@trpc/server/observable";
import assert from "node:assert";
import { docker404ToNull, streamSort } from "~/server/utils/serverUtils";
import { PassThrough, Transform } from "node:stream";
import type DockerModem from "docker-modem";
import { LogLevel } from "~/server/build/utils/BuilderLogger";
import type { LogLine } from "~/components/LogWindow";
import { Queue } from "datastructures-js";
import { Docker } from "~/server/docker/docker";
// trpc doesn't have stream support yet, so websockets it is
@ -18,13 +26,22 @@ export const getDeploymentLogsSubscription = authenticatedProcedure
.use(serviceMiddleware)
.subscription(async ({ ctx, input }) => {
const deployment = await ctx.service.getDeployment(input.deploymentId);
assert(deployment, "Deployment not found");
return observable<string>((emit) => {
return observable<LogLine>((emit) => {
const abort = new AbortController();
const logs = deployment.readBuildLogs(abort.signal);
logs.on("data", (data: string) => {
emit.next(data.toString());
logs.on("data", (data: Buffer) => {
const lines = data.toString().split("\n");
for (const line of lines) {
try {
emit.next(JSON.parse(line) as LogLine);
} catch (e) {
emit.next({ l: LogLevel.Stderr, m: line });
}
}
});
logs.on("end", () => {
@ -43,24 +60,62 @@ export const getServiceLogsSubscription = authenticatedProcedure
z.object({
serviceId: z.string(),
projectId: z.string(),
since: z.number().int().positive().optional(),
tail: z.number().int().positive().optional(),
}),
)
.use(projectMiddleware)
.use(serviceMiddleware)
.subscription(async ({ ctx, input }) => {
// const service = await ctx.service.getService(input.serviceId);
// return observable<string>((emit) => {
// const abort = new AbortController();
// const logs = service.readLogs(abort.signal);
// logs.on("data", (data: string) => {
// emit.next(data.toString());
// });
// logs.on("end", () => {
// emit.complete();
// });
// return () => {
// abort.abort();
// logs.destroy();
// };
// });
const abort = new AbortController();
const service = ctx.docker.getService(ctx.service.toDockerServiceName());
const logs = await service
.logs({
tail: input.tail,
since: input.since,
abortSignal: abort.signal,
follow: true,
stdout: true,
stderr: true,
timestamps: true,
})
.catch(docker404ToNull);
assert(
logs,
"Unable to retrieve service logs. Maybe the service is not running",
);
const unsorted = logs.pipe(Docker.demuxStream());
const final = new PassThrough();
streamSort(unsorted, final, (a, b) => {
const aTime = JSON.parse(a.toString()).t;
const bTime = JSON.parse(b.toString()).t;
return aTime - bTime;
});
function cleanup() {
unsorted.end();
}
// tail logs now
return observable<string>((emit) => {
final.on("data", (data: Buffer) => {
emit.next(data.toString());
});
logs.on("end", () => {
emit.complete();
cleanup();
});
return () => {
abort.abort();
cleanup();
};
});
});

View file

@ -2,6 +2,8 @@
import { spawn } from "child_process";
import Dockerode from "dockerode";
import logger from "../utils/logger";
import { PassThrough, Transform } from "stream";
import { LogLevel } from "../build/utils/BuilderLogger";
export class Docker extends Dockerode {
static buildContainerPrefixFromName(
@ -11,6 +13,51 @@ export class Docker extends Dockerode {
return `${projectName}_${serviceName}`;
}
/**
* An improved version of docker-modem's demuxStream
* as that one has race conditions since it doesn't wait for writes to finish
* and writes to separate streams
*/
static demuxStream() {
return new Transform({
transform(chunk: Buffer, encoding?: unknown, callback: () => void) {
console.log("chunk", chunk.toString());
if (chunk.length < 8) {
this.push(chunk);
callback();
return;
}
const header = chunk.subarray(0, 8);
const dataType = header.readUint8(0);
const dataLength = header.readUint32BE(4);
if (chunk.length < dataLength + 8) {
this.push(chunk);
callback();
return;
}
const content = chunk
.subarray(8, dataLength + 8)
.toString()
.split(" ");
const timestamp = new Date(content.shift() ?? "");
const message = content.join(" ");
this.push(
JSON.stringify({
t: timestamp.getTime(),
m: message,
l: dataType === 1 ? LogLevel.Stdout : LogLevel.Stderr,
}),
);
callback();
},
});
}
private log = logger.child({ module: "docker" });
/**

View file

@ -7,6 +7,10 @@ import { service, serviceGeneration } from "../db/schema";
import { ServiceSource } from "../db/types";
import logger from "../utils/logger";
import Deployment from "./Deployment";
import type { Docker } from "../docker/docker";
import { docker404ToNull } from "../utils/serverUtils";
import type ProjectManager from "./Project";
import { type paths as DockerAPITypes } from "~/server/docker/types";
export default class ServiceManager {
private static LOGGER = logger.child({
@ -33,20 +37,21 @@ export default class ServiceManager {
latestGeneration?: typeof serviceGeneration.$inferSelect;
deployedGeneration?: typeof serviceGeneration.$inferSelect;
},
private parentProject: ProjectManager,
) {}
/**
* Finds a service by name or ID.
*/
static async findByNameOrId(nameOrId: string, projectId: string) {
static async findByNameOrId(nameOrId: string, project: ProjectManager) {
const data = await db.query.service.findFirst({
where: and(
eq(service.projectId, projectId),
eq(service.projectId, project.getData().id),
or(eq(service.name, nameOrId), eq(service.id, nameOrId)),
),
});
return data ? new ServiceManager(data) : null;
return data ? new ServiceManager(data, project) : null;
}
public getData() {
@ -188,9 +193,18 @@ export default class ServiceManager {
},
});
if (!deployment) return null;
return new Deployment(deployment, this);
}
public toDockerServiceName() {
const parent = this.parentProject.getData().internalName;
const service = this.serviceData.name;
return `${parent}_${service}`;
}
/**
* Fetches the latest generation of the service.
* @param forceRefetch if true, will ignore what's cached internally

View file

@ -1,3 +1,6 @@
import { PriorityQueue } from "datastructures-js";
import { Readable, Writable } from "node:stream";
export function docker404ToNull(err: unknown) {
if (
typeof err === "object" &&
@ -9,3 +12,54 @@ export function docker404ToNull(err: unknown) {
throw err;
}
const MAX_QUEUE_SIZE = 100;
const MAX_HOLD_TIME = 100;
/**
* Sometimes, docker logs are out of order (maybe investigate this)
* This function sorts the logs by holding the last `n` logs in memory
* and writing them in order.
*/
export function streamSort(
source: Readable,
dest: Writable,
compare: (a: Buffer, b: Buffer) => number,
) {
// keep an internal priority queue of the next line to write
const pq = new PriorityQueue<Buffer>(compare);
source.on("data", (buff: Buffer) => {
pq.enqueue(buff);
if (pq.size() > MAX_QUEUE_SIZE) {
dest.write(pq.dequeue());
} else {
setTimeout(() => {
if (pq.isEmpty()) return;
dest.write(pq.dequeue());
}, MAX_HOLD_TIME);
}
});
source.on("end", () => {
while (!pq.isEmpty()) {
dest.write(pq.dequeue());
}
dest.end();
});
}
/**
* Parses ISO to nanosecond precision
*/
export function parseISODate(date: string) {
const nanoseconds = date.split(".")[1]?.split("Z")[0];
if (!nanoseconds) return new Date(date);
const ns = BigInt(parseInt(nanoseconds));
const d = BigInt(new Date(date.split(".")[0] + "Z").getTime());
return d * BigInt(1e6) + ns;
}