JobSystem
The JobSystem is a core component of Raftel Engine that provides efficient multi-threaded task execution. It creates a thread pool based on available CPU cores and allows you to enqueue tasks that will be executed asynchronously by worker threads, improving performance on multi-core processors.
Key Features
- Automatic thread pool creation based on available CPU cores
- Simple API for submitting parallel tasks
- Future-based asynchronous result handling
- Thread synchronization with mutex and condition variables
Using JobSystem
The JobSystem is designed to be easy to use, allowing you to quickly parallelize your application's workload. Here's how to get started:
Creating a JobSystem
// Create a JobSystem instance
auto jobSystem = Raftel::JobSystem::make();
Single Instance
The JobSystem deletes its copy and move operations, so an instance cannot be accidentally duplicated or transferred. The object returned by make() remains the sole owner of its worker threads for its whole lifetime.
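To illustrate what this restriction means for calling code, here is a minimal sketch using a hypothetical `Pool` class. It assumes that `NO_COPYABLE_OR_MOVABLE` expands to deleted copy and move operations, which is a guess based on the macro's name; the macro's definition is not shown in this page.

```cpp
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for a class that uses NO_COPYABLE_OR_MOVABLE.
class Pool {
public:
    // Factory in the style of JobSystem::make().
    static std::unique_ptr<Pool> make() { return std::unique_ptr<Pool>(new Pool()); }

    // Assumed expansion of the macro: all four operations deleted.
    Pool(const Pool&) = delete;
    Pool& operator=(const Pool&) = delete;
    Pool(Pool&&) = delete;
    Pool& operator=(Pool&&) = delete;

private:
    Pool() = default;
};

// The object itself can be neither copied nor moved.
static_assert(!std::is_copy_constructible_v<Pool>);
static_assert(!std::is_move_constructible_v<Pool>);

// Other code borrows the instance through a raw pointer or reference.
inline Pool* borrow(const std::unique_ptr<Pool>& owner) { return owner.get(); }
```

The owning `std::unique_ptr` can still be moved to hand over ownership, while functions that only use the pool take a non-owning pointer, as the examples on this page do with `Raftel::JobSystem*`.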
Submitting Tasks
You can submit tasks to the JobSystem using the AddWork method:
// Submit a task to compute a sum
auto futureResult = jobSystem->AddWork([](int a, int b) {
    return a + b;
}, 5, 3);
// Wait for the result
int result = futureResult.get(); // result = 8
More Complex Tasks
// Function to parallelize array processing
// (requires <algorithm>, <cmath>, <future>, <thread>, and <vector>)
void processArrayInParallel(std::vector<float>& data, Raftel::JobSystem* jobSystem) {
    // Determine the number of chunks based on hardware threads.
    // hardware_concurrency() may return 0 if the value is unknown, so use at least 1.
    size_t totalSize = data.size();
    size_t numChunks = std::max<size_t>(1, std::thread::hardware_concurrency());
    size_t chunkSize = totalSize / numChunks;
    // Create futures for each chunk
    std::vector<std::future<void>> futures;
    // Create and submit tasks for each chunk; the last chunk takes the remainder
    for (size_t i = 0; i < numChunks; ++i) {
        size_t start = i * chunkSize;
        size_t end = (i == numChunks - 1) ? totalSize : (i + 1) * chunkSize;
        futures.push_back(jobSystem->AddWork([&data, start, end]() {
            for (size_t j = start; j < end; ++j) {
                // Process each element in the chunk
                data[j] = std::sqrt(data[j]) * 2.0f;
            }
        }));
    }
    // Wait for all chunks to complete
    for (auto& future : futures) {
        future.wait();
    }
}
Thread Safety
When working with the JobSystem, ensure your tasks are thread-safe. Be careful when accessing shared resources from multiple tasks, and use appropriate synchronization when necessary.
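As a minimal sketch of "appropriate synchronization", the example below lets several tasks append to one shared vector and guards every access with a `std::mutex`. It uses plain `std::thread` as a stand-in for JobSystem tasks so that it compiles on its own; `collectAbove` and its parameters are hypothetical names chosen for illustration.

```cpp
#include <algorithm>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Collects every value greater than `threshold`, scanning `data` with several tasks.
// The order of the returned values depends on thread scheduling.
std::vector<float> collectAbove(const std::vector<float>& data, float threshold, std::size_t numTasks) {
    std::vector<float> matches;   // shared by every task
    std::mutex matchesMutex;      // guards all access to `matches`
    numTasks = std::max<std::size_t>(1, numTasks);
    const std::size_t chunk = (data.size() + numTasks - 1) / numTasks;

    std::vector<std::thread> tasks;
    for (std::size_t t = 0; t < numTasks; ++t) {
        const std::size_t start = std::min(t * chunk, data.size());
        const std::size_t end = std::min(start + chunk, data.size());
        tasks.emplace_back([&data, &matches, &matchesMutex, threshold, start, end] {
            for (std::size_t i = start; i < end; ++i) {
                if (data[i] > threshold) {
                    // Reading `data` needs no lock; writing to `matches` does.
                    std::lock_guard<std::mutex> lock(matchesMutex);
                    matches.push_back(data[i]);
                }
            }
        });
    }
    for (auto& task : tasks) task.join();
    return matches;
}
```

Read-only access to `data` is safe without a lock because no task modifies it. Only the shared output container needs protection.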
Implementation Details
Class Structure
class JobSystem
{
public:
    ~JobSystem();
    static std::unique_ptr<JobSystem> make();
    NO_COPYABLE_OR_MOVABLE(JobSystem)
    template<typename MyJobFunction, typename... Arguments>
    auto AddWork(MyJobFunction&& function, Arguments&&... params) -> std::future<decltype(function(params...))>;
    JobSystem();
private:
    std::vector<std::thread> workers_;               // Worker threads pool
    std::mutex queue_mutex_;                         // Mutex for protecting the work queue
    std::condition_variable warner_job_avaliable_;   // Condition variable for job availability
    std::queue<std::function<void()>> work_queue_;   // Queue storing pending jobs
    bool bStopThreads_;                              // Flag to indicate if threads should stop execution
};
AddWork Implementation
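template<typename MyJobFunction, typename ...Arguments>
inline auto JobSystem::AddWork(MyJobFunction&& function, Arguments && ...params) -> std::future<decltype(function(params...))>
{
    // Bundle the callable and its arguments into a single nullary lambda (pack init-capture requires C++20).
    // std::move is applied unconditionally, so lvalue arguments passed by the caller are moved from.
    auto packet = [function = std::move(function), ...params = std::move(params)](){ return function(params...); };
    std::lock_guard lock{queue_mutex_};
    using work_type = decltype(function(params...));
    // packaged_task delivers the result (or a thrown exception) to the future.
    // It is move-only, so a shared_ptr lets the copyable std::function in the queue hold it.
    auto return_type = std::make_shared<std::packaged_task<work_type()>>(packet);
    std::future<work_type> f = return_type->get_future();
    work_queue_.push([return_type](){(*return_type)();});
    // Wake one waiting worker
    warner_job_avaliable_.notify_one();
    return f;
}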
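The listing above covers the producer side only. To show how a queue, a mutex, a condition variable, and a stop flag typically fit together on the consumer side, here is a self-contained sketch of a small pool. It is not Raftel's source: `MiniPool`, `submit`, and `workerLoop` are hypothetical names, and the real worker loop and destructor may differ.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Illustrative pool only; member names loosely mirror the class structure above.
class MiniPool {
public:
    explicit MiniPool(unsigned threadCount) {
        if (threadCount == 0) threadCount = 1;  // hardware_concurrency() may report 0
        for (unsigned i = 0; i < threadCount; ++i) {
            workers_.emplace_back([this] { workerLoop(); });
        }
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        job_available_.notify_all();              // wake every worker so it can exit
        for (auto& worker : workers_) worker.join();
    }

    MiniPool(const MiniPool&) = delete;
    MiniPool& operator=(const MiniPool&) = delete;

    // Simplified submit: takes a nullary callable and returns a future for its result.
    template <typename Fn>
    auto submit(Fn fn) -> std::future<decltype(fn())> {
        using Result = decltype(fn());
        auto task = std::make_shared<std::packaged_task<Result()>>(std::move(fn));
        std::future<Result> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            work_queue_.push([task] { (*task)(); });
        }
        job_available_.notify_one();
        return result;
    }

private:
    void workerLoop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                // Sleep until there is work or the pool is shutting down.
                job_available_.wait(lock, [this] { return stop_ || !work_queue_.empty(); });
                if (stop_ && work_queue_.empty()) return;  // drain the queue before exiting
                job = std::move(work_queue_.front());
                work_queue_.pop();
            }
            job();  // run outside the lock so other workers can dequeue meanwhile
        }
    }

    std::vector<std::thread> workers_;
    std::mutex queue_mutex_;
    std::condition_variable job_available_;
    std::queue<std::function<void()>> work_queue_;
    bool stop_ = false;
};
```

Two design choices in this sketch are worth noting: the predicate passed to `wait` protects against spurious wake-ups, and the job runs after the lock is released so that a long task does not block submissions.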
Common Usage Patterns
The JobSystem can be applied to various tasks in game development. Here are some common use cases:
Physics Calculations
// Parallelize physics calculations across game entities
void updatePhysics(std::vector<GameObject>& objects, float deltaTime, Raftel::JobSystem* jobSystem) {
    const size_t numObjects = objects.size();
    // hardware_concurrency() may return 0 if the value is unknown; use at least 1
    // to avoid dividing by zero below
    const size_t numThreads = std::max<size_t>(1, std::thread::hardware_concurrency());
    const size_t batchSize = (numObjects + numThreads - 1) / numThreads;
    std::vector<std::future<void>> futures;
    for (size_t i = 0; i < numThreads; ++i) {
        size_t start = i * batchSize;
        size_t end = std::min(start + batchSize, numObjects);
        if (start >= numObjects) break;
        futures.push_back(jobSystem->AddWork([&objects, start, end, deltaTime]() {
            for (size_t j = start; j < end; ++j) {
                objects[j].updatePhysics(deltaTime);
            }
        }));
    }
    // Wait for all physics updates to complete
    for (auto& future : futures) {
        future.wait();
    }
}
Asset Loading
// Load multiple assets in parallel
void loadAssets(Raftel::JobSystem* jobSystem) {
    // Start loading each asset asynchronously
    auto futureModel = jobSystem->AddWork([](const std::string& path) {
        return loadModel(path);
    }, "assets/models/character.obj");
    auto futureTexture = jobSystem->AddWork([](const std::string& path) {
        return loadTexture(path);
    }, "assets/textures/diffuse.png");
    // Continue with other work while assets are loading...
    // When needed, wait for the assets to finish loading
    auto model = futureModel.get();
    auto texture = futureTexture.get();
    // Use the loaded assets...
}
Best Practices
Task Granularity
Choose an appropriate task size. Tasks should be large enough to outweigh threading overhead but small enough to distribute work evenly.
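One way to apply this is to cap the number of tasks both by the thread count and by a minimum amount of work per task. The helper below is a sketch under that assumption; `makeRanges` and the `minPerTask` parameter are hypothetical and not part of the JobSystem API, and a suitable minimum has to be found by measuring your own workload.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Splits the index range [0, count) into half-open [start, end) ranges.
// At most `maxTasks` ranges are produced, and each holds at least `minPerTask`
// elements whenever count >= minPerTask.
std::vector<std::pair<std::size_t, std::size_t>>
makeRanges(std::size_t count, std::size_t maxTasks, std::size_t minPerTask) {
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (count == 0) return ranges;
    maxTasks = std::max<std::size_t>(1, maxTasks);
    minPerTask = std::max<std::size_t>(1, minPerTask);

    // Fewer tasks when there is not enough work to justify one per thread.
    const std::size_t tasks =
        std::max<std::size_t>(1, std::min(maxTasks, count / minPerTask));
    const std::size_t base = count / tasks;
    const std::size_t extra = count % tasks;  // the first `extra` ranges get one more element

    std::size_t start = 0;
    for (std::size_t t = 0; t < tasks; ++t) {
        const std::size_t length = base + (t < extra ? 1 : 0);
        ranges.emplace_back(start, start + length);
        start += length;
    }
    return ranges;
}
```

Spreading the remainder over the first ranges keeps the sizes within one element of each other, so no single task ends up with a much larger share than the rest.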
Avoid Thread Contention
Minimize locking and synchronization between tasks. Design tasks to operate on independent data when possible.
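A common way to get independent data is to let each task produce its own partial result and combine the results afterwards on the calling thread. The sketch below does this for a sum. It uses `std::async` as a stand-in for `AddWork` so that it is self-contained, and `parallelSum` is a hypothetical name.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Sums `data` without any mutex: each task reads its own slice and returns
// its own partial sum through a future.
double parallelSum(const std::vector<double>& data, std::size_t numTasks) {
    numTasks = std::max<std::size_t>(1, numTasks);
    const std::size_t chunk = (data.size() + numTasks - 1) / numTasks;

    std::vector<std::future<double>> partials;
    for (std::size_t start = 0; start < data.size(); start += chunk) {
        const std::size_t end = std::min(start + chunk, data.size());
        partials.push_back(std::async(std::launch::async, [&data, start, end] {
            const auto first = data.begin() + static_cast<std::ptrdiff_t>(start);
            const auto last = data.begin() + static_cast<std::ptrdiff_t>(end);
            return std::accumulate(first, last, 0.0);
        }));
    }

    // The only shared step is this sequential reduction on the calling thread.
    double total = 0.0;
    for (auto& partial : partials) total += partial.get();
    return total;
}
```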
Handle Exceptions
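If a task throws, the exception is stored in its future and rethrown when you call get(). Calling wait() alone does not rethrow it, so call get() inside a try block whenever a task can fail, even for a std::future<void>.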
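The following sketch shows this behavior with standard-library futures, which is the type AddWork returns. `std::async` stands in for the JobSystem, and `loadSize` and `loadOrFallback` are hypothetical functions invented for the example.

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical loader that fails on an empty path.
int loadSize(const std::string& path) {
    if (path.empty()) throw std::invalid_argument("empty asset path");
    return static_cast<int>(path.size());
}

// Returns the loaded size, or -1 if the task threw.
int loadOrFallback(const std::string& path) {
    std::future<int> result = std::async(std::launch::async, loadSize, path);
    result.wait();            // returns normally even if the task threw
    try {
        return result.get();  // the stored exception is rethrown here
    } catch (const std::invalid_argument&) {
        return -1;
    }
}
```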
Single JobSystem
Create only one JobSystem instance in your application to avoid oversubscription of CPU resources.
Common Pitfalls
Thread Safety Issues
Be careful when multiple jobs access shared resources. Use appropriate synchronization mechanisms to protect shared data.
Deadlocks
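Be cautious with nested waits or circular dependencies between tasks. If Task A waits for Task B, but Task B depends on a result from Task A, neither can finish. A task that blocks on another job's future also occupies a worker thread while it waits; if every worker is blocked this way, the jobs they are waiting for can never be dequeued.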
Example: Task Dependencies
// Example of chaining dependent tasks
void chainedTasks(Raftel::JobSystem* jobSystem) {
    // Step 1: Load data
    auto futureData = jobSystem->AddWork([]() {
        // Load some data
        std::vector<float> data(1000);
        // Fill data...
        return data;
    });
    // AddWork as listed above copies the callable and invokes it as const, so a
    // move-only std::future cannot be captured. A std::shared_future is copyable.
    std::shared_future<std::vector<float>> sharedData = futureData.share();
    // Step 2: Process data (depends on Step 1, which was queued first)
    auto futureProcessed = jobSystem->AddWork([sharedData]() {
        // Wait for the data to be loaded
        const auto& data = sharedData.get();
        // Process the data
        std::vector<float> processed(data.size());
        for (size_t i = 0; i < data.size(); ++i) {
            processed[i] = std::sqrt(data[i]);
        }
        return processed;
    });
    // Get the final result when needed
    auto processed = futureProcessed.get();
}
Integration Example
#include "raftel/systems.hpp"
#include "raftel/job_system.hpp"
int main() {
    // Create window and systems
    auto windowSystem = Raftel::WindowSystem::make();
    auto window = Raftel::Window::make("JobSystem Demo", *windowSystem);
    // Create job system
    auto jobSystem = Raftel::JobSystem::make();
    // Game objects live outside the loop so their state persists between frames
    std::vector<GameObject> objects;
    // Game loop
    while (!window->ShouldClose()) {
        // Use job system for parallel work
        // Example: Update physics for game objects
        updatePhysics(objects, 0.016f, jobSystem.get());
        // Render frame
        window->swapBuffers();
    }
    return 0;
}
Related Topics
JobSystem Tutorial
A step-by-step guide to using the JobSystem in your applications.
RenderSystem
Learn how JobSystem can be used with the RenderSystem for parallel rendering tasks.
Entity Component System
Understand how to use JobSystem with the ECS architecture for efficient game object updates.