Asynchronous AI Processing Architecture: Building Non-Blocking User Experiences for Long-Running AI

Originally published at gist.github.com · 6 min read

How do you handle AI image generation requests that take 5-15+ seconds to complete without blocking your users or tying up server resources?

The Challenge

AI-powered applications face a fundamental user experience challenge: modern AI tasks like image generation, video processing, or complex language model operations often require 5-30 seconds to complete. Traditional synchronous request-response patterns create several problems:

Connection Timeouts: HTTP requests time out during long AI processing, leaving users with error messages instead of results.

Resource Blocking: Server connections remain open during processing, consuming valuable server resources and limiting concurrent user capacity.

Poor User Experience: Users stare at loading screens without progress indicators or the ability to navigate elsewhere while waiting.

Scalability Issues: Each processing request ties up a server connection, severely limiting the number of concurrent users your application can handle.

The Async Architecture Solution

The solution is an asynchronous, prediction-based architecture that decouples AI task submission from result delivery. This approach immediately returns control to the user while processing happens in the background.

The core workflow consists of three distinct phases:

  1. Immediate Submission: User submits request → Server validates and creates AI task → Returns task ID instantly
  2. Background Processing: AI service processes task → Sends webhook notification when complete
  3. Result Delivery: Frontend polls for status updates → Displays result when ready
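
In code terms, the contract between these phases is small: a submission response that hands back a prediction ID, and a status payload the client polls until it reaches a terminal state. A minimal TypeScript sketch (the names are illustrative, not the exact production types):

// Lifecycle states a prediction moves through (illustrative)
type PredictionStatus = 'starting' | 'processing' | 'succeeded' | 'failed' | 'canceled';

// Phase 1: what the submission endpoint returns immediately
interface SubmitResponse {
  predictionId: string;      // handle the client polls with
  requestId: string;         // correlation ID for logs and support
  status: PredictionStatus;  // always 'starting' at this point
  estimatedTime?: number;    // optional hint for progress UI
}

// Phase 3: what the status endpoint returns while the client polls
interface StatusResponse {
  status: PredictionStatus;
  result?: { imageUrl: string; filePath: string; generationTime?: number };
  error?: string;
}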

Architecture Components

1. Non-Blocking API Endpoint

The primary endpoint handles task submission without waiting for AI completion:

export async function POST(request: NextRequest): Promise<NextResponse<APIResponse>> {
  try {
    // 1. Validate input parameters
    const validationResult = schema.safeParse(formData);
    if (!validationResult.success) {
      return await createErrorResponse({ errorCode: EValidationErrorCode.INVALID_PARAMETER });
    }
    const validatedData = validationResult.data;

    // 2. Authentication and rate limiting checks
    const session = await auth();
    const throttleResult = await checkAnonymousUserRequestLimit({ userIdentifier });
    if (!throttleResult.success) {
      return await createErrorResponse({ errorCode: throttleResult.errorCode });
    }

    // 3. Create async AI prediction task
    const webhookUrl = `${process.env.API_BASE_URL}/api/webhooks/replicate`;
    const result = await generateJerseyEffectAsync({
      payload: validatedData,
      requestId: generateRequestId(),
      webhookUrl,
    });

    // 4. Store prediction status in KV store with TTL
    await PredictionStatusManager.createPrediction(result.data.predictionId, {
      status: 'starting',
      requestId: result.data.requestId,
      userId: session?.user?.uuid,
      payload: validatedData,
      createdAt: Date.now(),
    });

    // 5. Launch concurrent background tasks (non-blocking)
    const backgroundTasks = [
      reserveUserCredits({ userUuid, predictionId, creditsAmount }),
      createUserWorkRecord({ userUuid, payload, predictionId }),
    ];
    
    // Execute background tasks without waiting
    Promise.allSettled(backgroundTasks);

    // 6. Return immediately with prediction ID
    return NextResponse.json({
      success: true,
      message: 'Prediction created successfully',
      data: {
        predictionId: result.data.predictionId,
        requestId: result.data.requestId,
        status: 'starting',
        estimatedTime: result.data.estimatedTime,
      },
    });

  } catch (error) {
    return await createErrorResponse({ errorCode: ECommonErrorCode.INTERNAL_SERVER_ERROR });
  }
}
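
The generateJerseyEffectAsync call above is the application's wrapper around the AI provider. With Replicate's Node client, creating an async prediction that reports back via webhook looks roughly like the sketch below; the model version, input shape, and return value are assumptions for illustration, not the actual implementation:

import Replicate from 'replicate';

const replicate = new Replicate({ auth: process.env.REPLICATE_API_TOKEN });

// Hypothetical wrapper: submits the prediction and returns its ID without waiting for output
async function generateJerseyEffectAsync({ payload, requestId, webhookUrl }: {
  payload: Record<string, unknown>;
  requestId: string;
  webhookUrl: string;
}) {
  const prediction = await replicate.predictions.create({
    version: process.env.REPLICATE_MODEL_VERSION!, // assumed: pinned model version from env
    input: payload,
    webhook: webhookUrl,                           // Replicate calls this URL on state changes
    webhook_events_filter: ['start', 'completed'],
  });

  return {
    success: true,
    data: { predictionId: prediction.id, requestId, estimatedTime: 10 }, // estimate is a placeholder
  };
}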

2. Intelligent Status Management

A centralized status manager handles prediction state across the entire lifecycle:

export class PredictionStatusManager {
  // Store prediction with initial status
  static async createPrediction(predictionId: string, data: PredictionData) {
    await putKV(`prediction:${predictionId}`, {
      ...data,
      status: 'starting',
      createdAt: Date.now(),
    }, { expirationTtl: 3600 }); // 1 hour TTL
  }

  // Update status during processing
  static async markAsProcessing(predictionId: string) {
    const current = await this.getPredictionStatus(predictionId);
    if (current.success && current.data) {
      await putKV(`prediction:${predictionId}`, {
        ...current.data,
        status: 'processing',
        updatedAt: Date.now(),
      });
    }
  }

  // Mark completion with results
  static async markAsSucceeded(predictionId: string, imageUrl: string, filePath: string, duration?: number) {
    const current = await this.getPredictionStatus(predictionId);
    if (current.success && current.data) {
      await putKV(`prediction:${predictionId}`, {
        ...current.data,
        status: 'succeeded',
        result: {
          imageUrl,
          filePath,
          generationTime: duration,
        },
        completedAt: Date.now(),
      });
    }
  }
}
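
The methods above call getPredictionStatus on the same class, and the webhook's failure path needs a matching writer. A minimal sketch of both, assuming a getKV helper that mirrors putKV:

export class PredictionStatusManager {
  // ...createPrediction, markAsProcessing, markAsSucceeded as shown above...

  // Read current state (used internally, by the status endpoint, and for debugging)
  static async getPredictionStatus(predictionId: string) {
    const data = await getKV<PredictionData>(`prediction:${predictionId}`);
    return data
      ? { success: true as const, data }
      : { success: false as const, error: 'Prediction not found or expired' };
  }

  // Terminal failure state so polling clients can stop waiting and show the error
  static async markAsFailed(predictionId: string, error: string) {
    const current = await this.getPredictionStatus(predictionId);
    if (current.success && current.data) {
      await putKV(`prediction:${predictionId}`, {
        ...current.data,
        status: 'failed',
        error,
        completedAt: Date.now(),
      });
    }
  }
}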

3. Webhook Processing Pipeline

The webhook endpoint handles AI service completion notifications with comprehensive error handling:

export async function POST(request: NextRequest): Promise<NextResponse<APIResponse>> {
  PerformanceMonitor.start('webhook-total');
  
  try {
    // 1. Security verification (prevent replay attacks)
    const body = await request.text();
    const verification = await verifyWebhookRequest(request, body, {
      secret: process.env.REPLICATE_WEBHOOK_SECRET,
      checkTimestamp: true,
    });

    if (!verification.valid) {
      return await createErrorResponse({ errorCode: ECommonErrorCode.SERVICE_UNAVAILABLE });
    }

    // 2. Parse webhook event
    const event: ReplicateWebhookEvent = JSON.parse(body);
    console.log('Webhook received:', { id: event.id, status: event.status });

    // 3. Route to appropriate handler
    switch (event.status) {
      case 'starting':
      case 'processing':
        await PredictionStatusManager.markAsProcessing(event.id);
        break;

      case 'succeeded':
        await handleSuccessfulPrediction(event.id, event);
        break;

      case 'failed':
      case 'canceled':
        await handleFailedPrediction(event.id, event.error || 'Task canceled');
        break;
    }

    return NextResponse.json({ success: true, processed: true });

  } catch (error) {
    console.error('❌ Webhook processing error:', error);
    return await createErrorResponse({ errorCode: ECommonErrorCode.INTERNAL_SERVER_ERROR });
  }
}

async function handleSuccessfulPrediction(predictionId: string, event: ReplicateWebhookEvent) {
  // 1. Extract result URL from AI service response
  const imageUrl = Array.isArray(event.output) ? event.output[0] : event.output;
  
  // 2. Download and store result to permanent storage
  const imageResponse = await fetch(imageUrl);
  const imageBuffer = await imageResponse.arrayBuffer();
  
  const uploadResult = await uploadFile({
    path: 'jerseys',
    fileName: `${Date.now()}-${predictionId}.png`,
    file: imageBuffer,
    contentType: 'image/png',
  });

  const finalImageUrl = generatePublicUrl(uploadResult.filePath);

  // 3. Update status with final result
  await PredictionStatusManager.markAsSucceeded(
    predictionId,
    finalImageUrl,
    uploadResult.filePath,
    event.metrics?.predict_time ? event.metrics.predict_time * 1000 : undefined
  );

  // 4. Update user records and confirm billing
  await updateUserWorkStatus(predictionId, {
    workResult: finalImageUrl,
    generationStatus: 'completed',
    generationDuration: event.metrics?.predict_time ? event.metrics.predict_time * 1000 : undefined,
  });

  await confirmCreditDeduction({ userUuid, predictionId, actualCreditsUsed });
}
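
The failed and canceled branches route to handleFailedPrediction, which is not shown above. A minimal sketch: it marks the prediction failed, releases the credits reserved at submission time, and updates the user's work record (refundReservedCredits is an assumed counterpart to reserveUserCredits):

async function handleFailedPrediction(predictionId: string, errorMessage: string) {
  // 1. Persist the failure so polling clients stop waiting
  await PredictionStatusManager.markAsFailed(predictionId, errorMessage);

  // 2. Return any credits reserved when the task was submitted (assumed helper)
  await refundReservedCredits({ predictionId, reason: errorMessage });

  // 3. Reflect the failure in the user's work history
  await updateUserWorkStatus(predictionId, {
    generationStatus: 'failed',
    errorMessage,
  });
}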

4. Concurrent Background Processing

The architecture maximizes efficiency by running multiple operations concurrently without blocking the main response:

// After successful prediction creation, launch background tasks
const concurrentTasks: Promise<void>[] = [];

// Credit reservation task (for paid users)
if (hasPaid && userUuid && requiredCredits > 0) {
  const creditReserveTask = (async () => {
    const reserveResult = await reserveUserCredits({
      userUuid,
      predictionId,
      creditsAmount: requiredCredits,
      taskType: 'text_to_image',
      taskDescription: `Jersey generation: ${JSON.stringify(payload).substring(0, 100)}`,
    });
    
    if (!reserveResult.success) {
      console.error('❌ Credit reservation failed:', reserveResult.message);
    }
  })();
  
  concurrentTasks.push(creditReserveTask);
}

// User work record creation task
if (userUuid) {
  const createWorkTask = (async () => {
    await createUserWorkRecord({
      userUuid,
      payload,
      requestId,
      predictionId,
      isAnonymous: !isAuthenticated,
    });
  })();
  
  concurrentTasks.push(createWorkTask);
}

// Execute all tasks concurrently without waiting for results
if (concurrentTasks.length > 0) {
  Promise.allSettled(concurrentTasks).then((results) => {
    console.log('Background tasks completed:', {
      total: results.length,
      fulfilled: results.filter(r => r.status === 'fulfilled').length,
      rejected: results.filter(r => r.status === 'rejected').length,
    });
  });
}

// Return immediately to user
return NextResponse.json({ success: true, data: { predictionId, status: 'starting' } });

Frontend Integration Pattern

The client-side implementation uses intelligent polling with progressive backoff:

async function pollPredictionStatus(predictionId: string) {
  const delays = [1000, 2000, 5000, 10000]; // 1s → 2s → 5s → 10s
  let attemptIndex = 0;
  
  while (attemptIndex < 60) { // Maximum 10 minutes polling
    try {
      const response = await fetch(`/api/predictions/${predictionId}/status`);
      const result = await response.json();
      
      if (result.data.status === 'succeeded') {
        displayResult(result.data.result.imageUrl);
        return;
      } else if (result.data.status === 'failed') {
        displayError(result.data.error);
        return;
      }
      
      // Continue polling with progressive delays
      const delay = delays[Math.min(attemptIndex, delays.length - 1)];
      await new Promise(resolve => setTimeout(resolve, delay));
      attemptIndex++;
      
    } catch (error) {
      console.error('Polling error:', error);
      await new Promise(resolve => setTimeout(resolve, 2000));
      attemptIndex++; // count failed attempts too, so transient errors cannot extend polling forever
    }
  }

  // Polling budget exhausted without reaching a terminal status
  displayError('Generation is taking longer than expected. Please check back shortly.');
}
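
The loop above polls a lightweight status route backed by the same KV state. A sketch of that endpoint, with the route location and response shape assumed rather than taken from the production code:

// app/api/predictions/[predictionId]/status/route.ts (assumed path)
export async function GET(
  _request: NextRequest,
  { params }: { params: { predictionId: string } }
): Promise<NextResponse> {
  const status = await PredictionStatusManager.getPredictionStatus(params.predictionId);

  if (!status.success) {
    return NextResponse.json({ success: false, error: status.error }, { status: 404 });
  }

  // Return only what the client needs: current status, result when succeeded, error when failed
  return NextResponse.json({ success: true, data: status.data });
}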

Monitoring and Debugging

Async architectures require comprehensive monitoring to maintain reliability:

// Performance monitoring integration
export class PerformanceMonitor {
  static start(operation: string, metadata?: Record<string, any>) {
    console.log(`Starting ${operation}`, metadata);
    return Date.now();
  }
  
  static end(operation: string, startTime: number) {
    const duration = Date.now() - startTime;
    console.log(`✅ Completed ${operation} in ${duration}ms`);
    
    // Send metrics to monitoring service in production
    if (process.env.NODE_ENV === 'production') {
      sendMetric(`operation.${operation}.duration`, duration);
      sendMetric(`operation.${operation}.count`, 1);
    }
    
    return duration;
  }
}
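
// Usage is just a matter of bracketing each critical section; for example, timing a
// KV status update (the operation name here is illustrative, not from the production code):
async function markProcessingWithTiming(predictionId: string) {
  const startTime = PerformanceMonitor.start('kv-mark-processing', { predictionId });
  try {
    await PredictionStatusManager.markAsProcessing(predictionId);
  } finally {
    PerformanceMonitor.end('kv-mark-processing', startTime);
  }
}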

// Status debugging endpoint
export async function GET(request: NextRequest) {
  const predictionId = request.nextUrl.searchParams.get('predictionId');
  
  if (!predictionId) {
    return NextResponse.json({ error: 'Missing predictionId' }, { status: 400 });
  }
  
  const status = await PredictionStatusManager.getPredictionStatus(predictionId);
  
  return NextResponse.json({
    prediction: status.data,
    debug: {
      kvTtl: await getKV(`prediction:${predictionId}:ttl`),
      queueStatus: await checkReplicateQueue(),
      systemHealth: await checkSystemHealth(),
    }
  });
}

Architecture Benefits

Immediate User Feedback: Users receive instant confirmation that their request is being processed, eliminating the anxiety of waiting without feedback.

Resource Efficiency: Server connections are freed immediately after task submission, allowing the same server to handle hundreds of concurrent users.

Fault Tolerance: Webhook retries and status persistence ensure results are delivered even if temporary network issues occur.

Scalable Processing: AI processing happens on specialized services while your API servers remain available for new requests.

Progressive Enhancement: Frontend can provide real-time status updates, estimated completion times, and allow users to continue using the application while waiting.

Robust Error Recovery: Failed predictions automatically trigger credit refunds and user notifications, while webhook retry mechanisms ensure eventual consistency even during temporary outages.

Production Performance

This architecture has been successfully deployed at Fastjrsy, an AI jersey design generator that processes thousands of image generation requests daily. Key performance metrics:

  • Response Time: API endpoints respond in under 200ms
  • Throughput: Handles 50+ concurrent prediction requests per second
  • Completion Rate: 99.2% of predictions complete successfully
  • Average Processing Time: Under 5 seconds for speed mode, under 8 seconds for quality mode, 15 seconds for scene generation
  • User Retention: 40% reduction in page abandonment during generation

The architecture scales efficiently across Cloudflare's global edge network, providing consistent performance regardless of user location or current load.

This async processing pattern transforms long-running AI operations from user experience blockers into smooth, progressive interactions that maintain user engagement and system efficiency at scale.
