Eloquent

Documentation

SSE Progress Updates

This guide covers implementing real-time progress updates using Server-Sent Events (SSE). Use SSE for long-running operations like crawling, clustering, AI generation, and bulk processing.

When to Use SSE

SSE is ideal for:

  • Long-running operations (> 5 seconds)
  • Progress tracking (0-100% completion)
  • Multi-step processes (crawling, analysis, generation)
  • Server-to-client streaming (AI responses)

Architecture

Browser                    API Gateway              Backend Service
   │                           │                         │
   │ ──── SSE Connect ──────► │ ──── SSE Connect ────► │
   │                           │                         │
   │ ◄──── Progress 10% ───── │ ◄──── Progress ─────── │
   │ ◄──── Progress 50% ───── │ ◄──── Progress ─────── │
   │ ◄──── Progress 100% ──── │ ◄──── Complete ─────── │
   │                           │                         │

Backend Implementation

SSE Handler

// ProcessWithProgress runs the service's asynchronous processing and streams
// each ProgressUpdate to the client as a Server-Sent Event ("data: <json>\n\n").
//
// The handler returns once the service closes the progress channel. If the
// client disconnects first, the remaining updates are drained and discarded so
// the producing goroutine is never left blocked on a full channel.
func (h *Handler) ProcessWithProgress(w http.ResponseWriter, r *http.Request) {
    // Verify streaming support before committing to SSE headers, so the
    // error response is not sent with event-stream caching/connection headers.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    // Set SSE headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // Create progress channel
    progressChan := make(chan ProgressUpdate, 10)

    // Start async processing; the service closes progressChan when it is done.
    go h.service.ProcessAsync(r.Context(), progressChan)

    // Stream progress updates
    clientGone := false
    for update := range progressChan {
        if clientGone || r.Context().Err() != nil {
            // Nobody is listening any more: keep receiving so the producer
            // can finish and close the channel, but skip the writes.
            clientGone = true
            continue
        }

        data, err := json.Marshal(update)
        if err != nil {
            // Data is an arbitrary value and may not be serializable. Send a
            // well-formed error event instead of an empty "data:" frame that
            // the client could not parse.
            data = []byte(`{"status":"error","progress":0,"message":"failed to encode progress update","step":""}`)
        }

        if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
            clientGone = true
            continue
        }
        flusher.Flush()
    }
}

Progress Update Model

// ProgressUpdate is one progress event for a long-running operation.
// It is serialized to JSON using the field tags below.
type ProgressUpdate struct {
    Status    string  `json:"status"`    // "processing", "completed", "error"
    Progress  float64 `json:"progress"`  // Percentage complete, 0-100
    Message   string  `json:"message"`   // Human-readable detail, e.g. an error description
    Step      string  `json:"step"`      // Current step name
    Data      any     `json:"data,omitempty"` // Final result; omitted from the JSON when unset
}

Service Layer

// ProcessAsync fetches and processes data, reporting each stage on progress.
//
// The channel is always closed before returning, which is how the receiver
// learns that no more updates will arrive. Every send is abandoned if ctx is
// cancelled, so this goroutine cannot block forever on a receiver that has
// stopped reading.
func (s *Service) ProcessAsync(ctx context.Context, progress chan<- ProgressUpdate) {
    defer close(progress)

    // send delivers one update, or gives up when ctx is cancelled.
    // It reports whether the update was delivered.
    send := func(update ProgressUpdate) bool {
        select {
        case progress <- update:
            return true
        case <-ctx.Done():
            return false
        }
    }

    // Step 1: Fetch data
    if !send(ProgressUpdate{Status: "processing", Progress: 10, Step: "Fetching data"}) {
        return
    }
    data, err := s.fetchData(ctx)
    if err != nil {
        send(ProgressUpdate{Status: "error", Message: err.Error()})
        return
    }

    // Step 2: Process
    if !send(ProgressUpdate{Status: "processing", Progress: 50, Step: "Processing"}) {
        return
    }
    result, err := s.processData(ctx, data)
    if err != nil {
        send(ProgressUpdate{Status: "error", Message: err.Error()})
        return
    }

    // Step 3: Complete
    send(ProgressUpdate{
        Status:   "completed",
        Progress: 100,
        Message:  "Processing complete",
        Data:     result,
    })
}

Frontend Implementation

SSE Hook

// hooks/use-sse-progress.ts
"use client";

import { useCallback, useEffect, useRef, useState } from "react";

/** One progress event received from the SSE stream. */
type ProgressUpdate = {
  /** Lifecycle state of the operation. */
  status: "processing" | "completed" | "error";
  /** Completion value used as a percentage by the UI. */
  progress: number;
  /** Human-readable detail for the current state. */
  message: string;
  /** Name of the step currently running, when provided. */
  step?: string;
  /** Optional payload attached to the update. */
  data?: any;
};

/**
 * Subscribes to a Server-Sent Events progress stream and exposes the most
 * recent update.
 *
 * The stream is closed when a "completed" or "error" update arrives, when the
 * connection fails, when `start` is called again, and when the component
 * using the hook unmounts.
 *
 * @param endpoint - URL of the SSE endpoint. The gateway token is appended as
 *   a `token` query parameter.
 * @returns `progress` (latest update, or `null` before the first one),
 *   `isConnected`, and `start`, which opens the stream and resolves to a
 *   function that closes it. Failures are reported through `progress` as an
 *   "error" update rather than by rejecting.
 */
export function useSSEProgress(endpoint: string) {
  const [progress, setProgress] = useState<ProgressUpdate | null>(null);
  const [isConnected, setIsConnected] = useState(false);

  // The live connection, kept outside render state so it can be closed on
  // unmount or when a new stream replaces it.
  const sourceRef = useRef<EventSource | null>(null);
  const mountedRef = useRef(true);

  useEffect(() => {
    mountedRef.current = true;
    return () => {
      mountedRef.current = false;
      sourceRef.current?.close();
      sourceRef.current = null;
    };
  }, []);

  const start = useCallback(async () => {
    const noop = () => {};

    // Only one stream at a time: drop any connection left from a previous run.
    sourceRef.current?.close();
    sourceRef.current = null;

    let token: unknown;
    try {
      token = await getGatewayToken();
    } catch {
      setIsConnected(false);
      setProgress({
        status: "error",
        progress: 0,
        message: "Failed to obtain gateway token",
      });
      return noop;
    }

    // The component may have unmounted while the token was being fetched;
    // opening a connection now would leave nothing to close it.
    if (!mountedRef.current) {
      return noop;
    }

    // EventSource cannot send custom request headers, so the token travels in
    // the query string. Encode it, and respect a query already on `endpoint`.
    // NOTE(review): query-string tokens can end up in access logs — prefer
    // short-lived tokens here.
    const separator = endpoint.includes("?") ? "&" : "?";
    const eventSource = new EventSource(
      `${endpoint}${separator}token=${encodeURIComponent(String(token))}`,
      { withCredentials: true }
    );
    sourceRef.current = eventSource;
    setIsConnected(true);

    const stop = () => {
      eventSource.close();
      // Do not clobber the state of a newer stream that replaced this one.
      if (sourceRef.current === eventSource) {
        sourceRef.current = null;
        setIsConnected(false);
      }
    };

    const fail = (message: string) => {
      stop();
      setProgress({ status: "error", progress: 0, message });
    };

    eventSource.onmessage = (event) => {
      let update: ProgressUpdate;
      try {
        update = JSON.parse(event.data) as ProgressUpdate;
      } catch {
        // A frame that is not valid JSON would otherwise throw inside the
        // event handler and leave the UI stuck in the "processing" state.
        fail("Malformed progress update");
        return;
      }

      setProgress(update);

      if (update.status === "completed" || update.status === "error") {
        stop();
      }
    };

    // Closing here disables EventSource's automatic reconnection, which would
    // otherwise re-issue the request and restart the operation.
    eventSource.onerror = () => {
      fail("Connection lost");
    };

    return stop;
  }, [endpoint]);

  return { progress, isConnected, start };
}

Progress Component

// components/progress-indicator.tsx
"use client";

import { useSSEProgress } from "@/hooks/use-sse-progress";

/**
 * Start button plus progress bar for an SSE-backed operation.
 *
 * The button is disabled while a stream is open. Once the first update
 * arrives, a bar and a status line ("step: message") are shown.
 *
 * @param endpoint - SSE endpoint passed through to `useSSEProgress`.
 */
export function ProcessingProgress({ endpoint }: { endpoint: string }) {
  const { progress, isConnected, start } = useSSEProgress(endpoint);

  // Keep the bar inside its track even if an out-of-range value is reported.
  const percent = progress ? Math.min(100, Math.max(0, progress.progress)) : 0;

  // Error updates carry no step and in-progress updates may carry no message;
  // join only the parts that exist so no dangling ": " is rendered.
  const statusLine = progress
    ? [progress.step, progress.message].filter(Boolean).join(": ")
    : "";

  return (
    <div className="space-y-4">
      {/* type="button" prevents an accidental submit when placed in a form;
          `void` marks the promise returned by start as intentionally unawaited. */}
      <button type="button" onClick={() => void start()} disabled={isConnected}>
        {isConnected ? "Processing..." : "Start Processing"}
      </button>

      {progress && (
        <div>
          <div
            className="h-2 bg-gray-200 rounded"
            role="progressbar"
            aria-valuemin={0}
            aria-valuemax={100}
            aria-valuenow={percent}
          >
            <div
              className="h-2 bg-blue-500 rounded transition-all"
              style={{ width: `${percent}%` }}
            />
          </div>
          <p className="text-sm text-gray-600">{statusLine}</p>
        </div>
      )}
    </div>
  );
}

API Gateway Configuration

# routes.yaml
# Gateway route that proxies the SSE progress stream to the processor service.
- path: /api/v1/process/stream
  method: GET
  proxy_upstream: $PROCESSOR_SERVICE_URL
  auth_required: true
  # NOTE(review): presumably the gateway ends streams that run longer than
  # this value — confirm it covers the longest expected operation.
  timeout: 300s  # Long timeout for streaming
  description: SSE progress stream

Best Practices

  1. Use buffered channels for progress communication, and close the channel when processing ends so the handler's streaming loop terminates
  2. Set appropriate timeouts on both client and server
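  3. Handle reconnection on the frontend: EventSource retries automatically after a dropped connection unless it is closed, so decide explicitly whether to close on error or allow the retry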
  4. Include step names for better UX
  5. Close connections properly when done

Next Steps