JobSystem

The JobSystem is a core component of Raftel Engine that provides efficient multi-threaded task execution. It creates a thread pool based on available CPU cores and allows you to enqueue tasks that will be executed asynchronously by worker threads, improving performance on multi-core processors.

Key Features

  • Automatic thread pool creation based on available CPU cores
  • Simple API for submitting parallel tasks
  • Future-based asynchronous result handling
  • Thread synchronization with mutex and condition variables

Using JobSystem

The JobSystem is designed to be easy to use, allowing you to quickly parallelize your application's workload. Here's how to get started:

Creating a JobSystem

Creating a JobSystem Instance
// Create a JobSystem instance
auto jobSystem = Raftel::JobSystem::make();

Single Instance

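The JobSystem prohibits copy and move operations, so an instance and the worker threads it owns cannot be accidentally duplicated or transferred. make() returns a std::unique_ptr that owns the instance; code that only needs to submit work can take a raw pointer or reference, as the examples below do.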

Submitting Tasks

You can submit tasks to the JobSystem using the AddWork method:

Basic Task Submission
// Submit a task to compute a sum
auto futureResult = jobSystem->AddWork([](int a, int b) {
    return a + b;
}, 5, 3);

// Wait for the result
int result = futureResult.get(); // result = 8

More Complex Tasks

Processing an Array in Parallel
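// Function to parallelize array processing
// Requires <algorithm>, <cmath>, <future>, <thread> and <vector>
void processArrayInParallel(std::vector<float>& data, Raftel::JobSystem* jobSystem) {
    // Determine the number of chunks based on hardware threads.
    // hardware_concurrency() may return 0, so fall back to a single chunk
    // to avoid dividing by zero.
    size_t totalSize = data.size();
    size_t numChunks = std::max<size_t>(1, std::thread::hardware_concurrency());
    size_t chunkSize = totalSize / numChunks;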
    
    // Create futures for each chunk
    std::vector<std::future<void>> futures;
    
    // Create and submit tasks for each chunk
    for (size_t i = 0; i < numChunks; ++i) {
        size_t start = i * chunkSize;
        size_t end = (i == numChunks - 1) ? totalSize : (i + 1) * chunkSize;
        
        futures.push_back(jobSystem->AddWork([&data, start, end]() {
            for (size_t j = start; j < end; ++j) {
                // Process each element in the chunk
                data[j] = std::sqrt(data[j]) * 2.0f;
            }
        }));
    }
    
    // Wait for all chunks to complete; get() also rethrows any exception a task threw
    for (auto& future : futures) {
        future.get();
    }
}

Thread Safety

When working with the JobSystem, ensure your tasks are thread-safe. The array example above needs no locking because each task writes only to its own range of data. Tasks that read and write the same resource need synchronization, such as a std::mutex or std::atomic.

Implementation Details

Class Structure

JobSystem Class Definition
class JobSystem
{
public:
    ~JobSystem();
    static std::unique_ptr<JobSystem> make();
    
    NO_COPYABLE_OR_MOVABLE(JobSystem)
    
    template<typename MyJobFunction, typename... Arguments>
    auto AddWork(MyJobFunction&& function, Arguments&&... params) -> std::future<decltype(function(params...))>;
    
    JobSystem();

private:
    std::vector<std::thread> workers_;  // Worker threads pool
    std::mutex queue_mutex_;  // Mutex for protecting the work queue
    std::condition_variable warner_job_avaliable_;  // Condition variable for job availability
    std::queue<std::function<void()>> work_queue_;  // Queue storing pending jobs
    bool bStopThreads_;  // Flag to indicate if threads should stop execution
};

AddWork Implementation

AddWork Method Implementation
template<typename MyJobFunction, typename ...Arguments>
inline auto JobSystem::AddWork(MyJobFunction&& function, Arguments && ...params) -> std::future<decltype(function(params...))>
{
    auto packet = [function = std::move(function), ...params = std::move(params)](){ return function(params...); };
    std::lock_guard lock{queue_mutex_};

    using work_type = decltype(function(params...));
    auto return_type = std::make_shared<std::packaged_task<work_type()>>(packet);

    std::future<work_type> f = return_type->get_future();
    work_queue_.push([return_type](){(*return_type)();});
    warner_job_avaliable_.notify_one();
    return f;
}
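The listing above covers submission only. As a rough, self-contained sketch of how the same queue-plus-packaged_task pattern fits together end to end, the following assumes C++17 and uses only the standard library. MiniJobSystem, WorkerLoop, job_available_ and stop_ are hypothetical names for illustration, and the worker loop and destructor are assumptions, since this page does not show how Raftel implements them. The sketch also differs from the listing in three ways: it forwards its arguments with std::forward instead of std::move, so an lvalue callable is copied rather than moved from; it moves the wrapper into the packaged task, so move-only callables are accepted; and it holds the queue lock only around the push.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for illustration only; not the Raftel::JobSystem source.
class MiniJobSystem {
public:
    explicit MiniJobSystem(unsigned workerCount = std::thread::hardware_concurrency()) {
        if (workerCount == 0) workerCount = 1;  // hardware_concurrency() may return 0
        for (unsigned i = 0; i < workerCount; ++i) {
            workers_.emplace_back([this] { WorkerLoop(); });
        }
    }

    ~MiniJobSystem() {
        {
            std::lock_guard<std::mutex> lock{queue_mutex_};
            stop_ = true;
        }
        job_available_.notify_all();
        for (auto& worker : workers_) worker.join();  // queued jobs are drained first
    }

    MiniJobSystem(const MiniJobSystem&) = delete;
    MiniJobSystem& operator=(const MiniJobSystem&) = delete;

    template <typename Function, typename... Arguments>
    auto AddWork(Function&& function, Arguments&&... params) {
        using Result = std::invoke_result_t<std::decay_t<Function>&,
                                            std::decay_t<Arguments>&...>;
        // Store decayed copies of the callable and its arguments inside the task.
        auto task = std::make_shared<std::packaged_task<Result()>>(
            [fn = std::forward<Function>(function),
             args = std::tuple<std::decay_t<Arguments>...>(
                 std::forward<Arguments>(params)...)]() mutable {
                return std::apply(fn, args);
            });
        std::future<Result> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock{queue_mutex_};
            work_queue_.push([task] { (*task)(); });
        }
        job_available_.notify_one();
        return result;
    }

private:
    void WorkerLoop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock{queue_mutex_};
                job_available_.wait(lock, [this] { return stop_ || !work_queue_.empty(); });
                if (stop_ && work_queue_.empty()) return;
                job = std::move(work_queue_.front());
                work_queue_.pop();
            }
            job();  // run outside the lock so other workers can dequeue
        }
    }

    std::vector<std::thread> workers_;
    std::mutex queue_mutex_;
    std::condition_variable job_available_;
    std::queue<std::function<void()>> work_queue_;
    bool stop_ = false;
};
```

With this sketch, MiniJobSystem jobs(2); followed by jobs.AddWork([](int a, int b) { return a + b; }, 5, 3).get() mirrors the Basic Task Submission example.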

Common Usage Patterns

The JobSystem can be applied to various tasks in game development. Here are some common use cases:

Physics Calculations

Parallelizing Physics Updates
// Parallelize physics calculations across game entities
void updatePhysics(std::vector<GameObject>& objects, float deltaTime, Raftel::JobSystem* jobSystem) {
    const size_t numObjects = objects.size();
    // hardware_concurrency() may return 0; use at least one thread to avoid dividing by zero
    const size_t numThreads = std::max<size_t>(1, std::thread::hardware_concurrency());
    const size_t batchSize = (numObjects + numThreads - 1) / numThreads;
    
    std::vector<std::future<void>> futures;
    
    for (size_t i = 0; i < numThreads; ++i) {
        size_t start = i * batchSize;
        size_t end = std::min(start + batchSize, numObjects);
        
        if (start >= numObjects) break;
        
        futures.push_back(jobSystem->AddWork([&objects, start, end, deltaTime]() {
            for (size_t j = start; j < end; ++j) {
                objects[j].updatePhysics(deltaTime);
            }
        }));
    }
    
    // Wait for all physics updates to complete; get() also rethrows any exception a task threw
    for (auto& future : futures) {
        future.get();
    }
}

Asset Loading

Asynchronous Asset Loading
// Load multiple assets in parallel
void loadAssets(Raftel::JobSystem* jobSystem) {
    // Start loading each asset asynchronously
    auto futureModel = jobSystem->AddWork([](const std::string& path) {
        return loadModel(path);
    }, "assets/models/character.obj");
    
    auto futureTexture = jobSystem->AddWork([](const std::string& path) {
        return loadTexture(path);
    }, "assets/textures/diffuse.png");
    
    // Continue with other work while assets are loading...
    
    // When needed, wait for the assets to finish loading
    auto model = futureModel.get();
    auto texture = futureTexture.get();
    
    // Use the loaded assets...
}
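The "continue with other work" step above usually means checking each frame whether a result has arrived, without blocking. One possible sketch, using only the standard std::future API, is shown here. isReady is a hypothetical helper name, not part of Raftel.

```cpp
#include <chrono>
#include <future>

// Hypothetical helper: returns true if the future's result can be fetched
// without blocking. A zero-length wait_for() only inspects the shared state.
template <typename T>
bool isReady(const std::future<T>& future) {
    return future.valid() &&
           future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

A loop could call isReady(futureModel) once per frame and call get() only when it returns true. After get(), the future is no longer valid, so the helper returns false.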

Best Practices

Task Granularity

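Choose an appropriate task size. Each AddWork call allocates a packaged task, locks the queue, and wakes a worker, so tasks should be large enough to outweigh that overhead but small enough to distribute work evenly across the worker threads.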
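As an illustration of this trade-off, the sketch below splits a range into chunks with both an upper bound on the number of chunks and a lower bound on their size. splitRange and its parameters are hypothetical, and a sensible minChunkSize is workload-dependent and best found by measuring.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: split [0, total) into at most maxChunks half-open
// ranges. No range is smaller than minChunkSize unless total itself is.
std::vector<std::pair<std::size_t, std::size_t>>
splitRange(std::size_t total, std::size_t maxChunks, std::size_t minChunkSize) {
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (total == 0) return ranges;
    maxChunks = std::max<std::size_t>(1, maxChunks);
    minChunkSize = std::max<std::size_t>(1, minChunkSize);

    // Cap the chunk count so that every chunk holds at least minChunkSize items.
    const std::size_t chunks =
        std::min(maxChunks, std::max<std::size_t>(1, total / minChunkSize));
    const std::size_t base = total / chunks;
    const std::size_t remainder = total % chunks;

    std::size_t start = 0;
    for (std::size_t i = 0; i < chunks; ++i) {
        // Spread the remainder over the first chunks so sizes differ by at most 1.
        const std::size_t size = base + (i < remainder ? 1 : 0);
        ranges.emplace_back(start, start + size);
        start += size;
    }
    return ranges;
}
```

Each returned pair can be passed as the start and end of one task, in the same way as the chunk bounds in the array-processing example.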

Avoid Thread Contention

Minimize locking and synchronization between tasks. Design tasks to operate on independent data when possible.
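One way to follow this advice is to have each task produce its own partial result and combine the results on the calling thread, so that no lock is needed at all. The sketch below shows the idea with a parallel sum. parallelSum is a hypothetical function, and std::async stands in for AddWork so the example compiles with the standard library alone.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Hypothetical example: sum `values` using independent per-task partial sums.
// std::async is a stand-in for the job system here.
double parallelSum(const std::vector<double>& values, std::size_t taskCount) {
    taskCount = std::max<std::size_t>(1, taskCount);
    const std::size_t chunk = (values.size() + taskCount - 1) / taskCount;

    std::vector<std::future<double>> partials;
    for (std::size_t start = 0; start < values.size(); start += chunk) {
        const std::size_t end = std::min(start + chunk, values.size());
        partials.push_back(std::async(std::launch::async, [&values, start, end] {
            // Each task only reads its own slice and returns a local sum.
            return std::accumulate(values.begin() + static_cast<std::ptrdiff_t>(start),
                                   values.begin() + static_cast<std::ptrdiff_t>(end),
                                   0.0);
        }));
    }

    // Combine on the calling thread; no shared accumulator, no mutex.
    double total = 0.0;
    for (auto& partial : partials) total += partial.get();
    return total;
}
```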

Handle Exceptions

An exception thrown inside a task is stored in the task's future and rethrown when you call get(). Calling wait() alone does not rethrow it, so call get() inside a try block wherever a task can fail.
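The following sketch shows this behavior with a plain std::packaged_task, the same standard type that AddWork wraps each job in. runFailingTask and the error message are hypothetical, and the task is run inline rather than on a worker thread to keep the example self-contained.

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical example: run a task that always throws and report what the
// caller observes through the future.
std::string runFailingTask() {
    std::packaged_task<int()> task([]() -> int {
        throw std::runtime_error("asset not found");
    });
    std::future<int> result = task.get_future();

    task();         // the exception is caught and stored in the shared state
    result.wait();  // returns normally; wait() never rethrows

    try {
        return std::to_string(result.get());  // get() rethrows the stored exception
    } catch (const std::runtime_error& error) {
        return error.what();
    }
}
```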

Single JobSystem

Create only one JobSystem instance in your application to avoid oversubscription of CPU resources.

Common Pitfalls

Thread Safety Issues

Be careful when multiple jobs access shared resources. Use appropriate synchronization mechanisms to protect shared data.
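When tasks do have to write to the same object, a mutex held only for the shared write is one common option. The sketch below is a hypothetical example (collectSquares is not a Raftel function), with std::async standing in for AddWork.

```cpp
#include <future>
#include <mutex>
#include <vector>

// Hypothetical example: several tasks append to one shared vector.
// std::async is a stand-in for the job system here.
std::vector<int> collectSquares(int taskCount) {
    std::vector<int> results;
    std::mutex resultsMutex;

    std::vector<std::future<void>> futures;
    for (int i = 0; i < taskCount; ++i) {
        futures.push_back(std::async(std::launch::async, [&results, &resultsMutex, i] {
            const int square = i * i;  // independent work, done without the lock
            std::lock_guard<std::mutex> lock{resultsMutex};
            results.push_back(square);  // shared write, done under the lock
        }));
    }

    // All tasks must finish before the locals they reference go out of scope.
    for (auto& future : futures) future.get();
    return results;  // element order depends on scheduling
}
```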

Deadlocks

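Be cautious with nested waits or circular dependencies between tasks. If Task A waits for Task B, but Task B depends on a result from Task A, neither can finish and both deadlock. A task that blocks on a future also occupies its worker thread for the whole wait, so if every worker is blocked waiting on jobs that are still in the queue, the pool stalls.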

Example: Task Dependencies

Implementing Task Dependencies
// Example of chaining dependent tasks
void chainedTasks(Raftel::JobSystem* jobSystem) {
    // Step 1: Load data
    auto futureData = jobSystem->AddWork([]() {
        // Load some data
        std::vector<float> data(1000);
        // Fill data...
        return data;
    });
    
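    // Step 2: Process data (depends on Step 1).
    // AddWork copies the task it is given, so capture a copyable
    // std::shared_future (from share()) rather than a move-only std::future.
    auto futureProcessed = jobSystem->AddWork([sharedData = futureData.share()]() {
        // Wait for the data to be loaded (this occupies a worker until Step 1 finishes)
        auto data = sharedData.get();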
        
        // Process the data
        std::vector<float> processed(data.size());
        for (size_t i = 0; i < data.size(); ++i) {
            processed[i] = std::sqrt(data[i]);
        }
        
        return processed;
    });
    
    // Get the final result when needed
    auto processed = futureProcessed.get();
}

Integration Example

Using JobSystem in a Raftel Application
#include "raftel/systems.hpp"
#include "raftel/job_system.hpp"

int main() {
    // Create window and systems
    auto windowSystem = Raftel::WindowSystem::make();
    auto window = Raftel::Window::make("JobSystem Demo", *windowSystem);
    
    // Create job system
    auto jobSystem = Raftel::JobSystem::make();
    
    // Game objects persist across frames, so create them before the loop
    std::vector<GameObject> objects;

    // Game loop
    while (!window->ShouldClose()) {
        // Use job system for parallel work
        // Example: Update physics for game objects
        updatePhysics(objects, 0.016f, jobSystem.get());
        
        // Render frame
        window->swapBuffers();
    }
    
    return 0;
}

Related Topics

JobSystem Tutorial

A step-by-step guide to using the JobSystem in your applications.

RenderSystem

Learn how JobSystem can be used with the RenderSystem for parallel rendering tasks.

Entity Component System

Understand how to use JobSystem with the ECS architecture for efficient game object updates.