Starts jobs system
IDHANMigration/src/100-jobs.sql (new file, 14 lines added)
@@ -0,0 +1,14 @@
CREATE TYPE job_type AS ENUM ('cluster_scan');
CREATE TYPE job_status AS ENUM ('PENDING', 'STARTED', 'FAILED', 'COMPLETED', 'AWAIT_DEPENDENCY');

CREATE TABLE jobs
(
    job_id         UUID PRIMARY KEY NOT NULL,
    job_type       job_type NOT NULL,
    context_data   JSONB NOT NULL,
    job_data       JSONB NOT NULL,
    job_response   JSONB,
    time_requested TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT now(),
    time_completed TIMESTAMP WITHOUT TIME ZONE,
    job_status     job_status NOT NULL DEFAULT 'PENDING'
);
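Illustration (not part of this commit): enqueuing a row into this table through drogon's coroutine client, the way `JobsAPI.cpp` below does. Since `context_data` is `NOT NULL` with no default, an insert has to supply it:

```cpp
// Sketch only: job_status defaults to 'PENDING' and time_requested to now(),
// so a minimal insert supplies just the type and the two JSON payloads.
Json::Value job_data {};
job_data[ "cluster_id" ] = 1;

const auto row { co_await db->execSqlCoro(
    "INSERT INTO jobs (job_id, job_type, job_data, context_data) "
    "VALUES (gen_random_uuid(), 'cluster_scan', $1, '{}'::jsonb) RETURNING job_id",
    job_data ) };
```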
@@ -70,7 +70,7 @@ endforeach ()
 add_custom_target(IDHANMimeParsers DEPENDS ${PARSER_OUTPUTS})
 add_dependencies(IDHANServer IDHANMimeParsers)

-set(CONFIG_SOURCE "${CMAKE_CURRENT_SOURCE_DIR}/src/example.toml")
+set(CONFIG_SOURCE "${CMAKE_CURRENT_SOURCE_DIR}/src/config-example.toml")
 set(CONFIG_OUTPUT "${CMAKE_BINARY_DIR}/bin/config.toml")

 add_custom_command(
@@ -10,7 +10,18 @@
 namespace idhan::config
 {

-static std::string user_config_path { "" };
+inline static std::string user_config_path { "" };
+inline static std::unordered_map< std::pair< std::string, std::string >, std::string > CLI_CONFIG {};
+
+void addCLIConfig( const std::string_view group, const std::string_view name, const std::string_view value )
+{
+	CLI_CONFIG.emplace( std::pair< std::string, std::string >( group, name ), value );
+}
+
+const char* getCLIConfig( const std::string_view group, const std::string_view name )
+{
+	// find() rather than at(): tryGetCLI treats a null return as "not set",
+	// and at() would throw for missing keys instead of returning null.
+	const auto itter = CLI_CONFIG.find( std::pair< std::string, std::string >( group, name ) );
+	return itter == CLI_CONFIG.end() ? nullptr : itter->second.c_str();
+}

 std::string_view getUserConfigPath()
 {
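Illustration (not part of the commit): `main()` below registers parsed CLI options through `addCLIConfig`; lookups then behave like this:

```cpp
idhan::config::addCLIConfig( "database", "hostname", "localhost" );

const char* host { idhan::config::getCLIConfig( "database", "hostname" ) }; // "localhost"
const char* port { idhan::config::getCLIConfig( "database", "port" ) };     // nullptr, never registered
```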
@@ -6,11 +6,22 @@
 #include <toml++/toml.hpp>

 #include <filesystem>
+#include <functional>
+#include <ranges>
 #include <string>
 #include <variant>

 #include "logging/log.hpp"

+template <>
+struct std::hash< std::pair< std::string, std::string > >
+{
+	std::size_t operator()( const std::pair< std::string, std::string >& p ) const noexcept
+	{
+		return std::hash< std::string > {}( p.first ) ^ ( std::hash< std::string > {}( p.second ) << 1 );
+	}
+};
+
 namespace idhan::config
 {
 using ConfigType = std::variant< std::string, std::size_t >;
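Aside (illustration only): the standard library ships no `std::hash` for `std::pair`, so without the specialization above the `CLI_CONFIG` map in the `.cpp` would not compile:

```cpp
// Compiles only because std::hash< std::pair< std::string, std::string > > is specialized above.
std::unordered_map< std::pair< std::string, std::string >, std::string > m {};
m[ { "database", "hostname" } ] = "localhost";
```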
@@ -27,8 +38,65 @@ constexpr std::array< std::string_view, 4 > config_paths { "%ProgramData%\\idhan
 	"./config.toml" };
 #endif

+void addCLIConfig( const std::string_view group, const std::string_view name, const std::string_view value );
+
+const char* getCLIConfig( const std::string_view group, const std::string_view name );
+
+std::string_view getUserConfigPath();
+
+template < typename T >
+std::optional< T > tryGetEnv( const std::string_view group, const std::string_view name )
+{
+	auto upper_group { std::string( group ) };
+	std::transform( upper_group.begin(), upper_group.end(), upper_group.begin(), ::toupper );
+	auto upper_name { std::string( name ) };
+	std::transform( upper_name.begin(), upper_name.end(), upper_name.begin(), ::toupper );
+
+	const auto env_name { std::format( "IDHAN_{}_{}", upper_group, upper_name ) };
+
+	if ( const char* value = std::getenv( env_name.data() ); value )
+	{
+		log::info( "Loaded config from env: {}={}", env_name, value );
+		if constexpr ( std::is_same_v< T, std::string > )
+		{
+			return std::string( value );
+		}
+		else if constexpr ( std::is_integral_v< T > )
+		{
+			return std::optional< T >( std::stoll( value ) );
+		}
+		else
+		{
+			static_assert( false, "Invalid type for ENV" );
+		}
+	}

+	return std::nullopt;
+}
+
+template < typename T >
+std::optional< T > tryGetCLI( const std::string_view group, const std::string_view name )
+{
+	if ( const char* value = getCLIConfig( group, name ) )
+	{
+		log::info( "Loaded config from CLI: {}.{}={}", group, name, value );
+		if constexpr ( std::is_same_v< T, std::string > )
+		{
+			return std::string( value );
+		}
+		else if constexpr ( std::is_integral_v< T > )
+		{
+			return std::optional< T >( std::stoll( value ) );
+		}
+		else
+		{
+			static_assert( false, "Invalid type for CLI" );
+		}
+	}
+
+	return std::nullopt;
+}
+
 template < typename T >
 std::optional< T > getValue( std::string_view path, const std::string_view group, const std::string_view name )
 {
@@ -69,6 +137,12 @@ std::optional< T > getValue( std::string_view path, const std::string_view group
 template < typename T >
 std::optional< T > getValue( const std::string_view group, const std::string_view name )
 {
+	//TODO: CLI
+
+	// ENV
+	if ( auto result = tryGetEnv< T >( group, name ); result ) return *result;
+
+	// overridden config path
 	const auto user_config_path { getUserConfigPath() };
 	if ( user_config_path.empty() )
 	{
@@ -78,6 +152,7 @@ std::optional< T > getValue( const std::string_view group, const std::string_vie
 		}
 	}

+	// priority paths
 	for ( const auto& path : config_paths | std::views::reverse )
 	{
 		if ( auto result = getValue< T >( path, group, name ); result )
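A usage sketch (not in the commit) of the resolution order this gives: environment first, then the user-supplied config path, then the built-in `config_paths` in reverse priority order:

```cpp
// With IDHAN_DATABASE_PORT=6543 exported, no config.toml is ever consulted:
const auto port { idhan::config::getValue< std::size_t >( "database", "port" ) };
// port -> std::optional< std::size_t > holding 6543
```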
@@ -27,7 +27,7 @@ struct ConnectionArguments
 	std::uint16_t port { config::get< std::uint16_t >( "database", "port", IDHAN_DEFAULT_POSTGRES_PORT ) };
 	std::string dbname { config::get< std::string >( "database", "database", "idhan-db" ) };
 	std::string user { config::get< std::string >( "database", "user", "idhan" ) };
-	std::string password { config::get< std::string >( "database", "password", "" ) };
+	std::string password { config::get< std::string >( "database", "password", "idhan" ) };
 	bool testmode { false };
 	//! If true then the server will use stdout to log things.
 	bool use_stdout { true };
@@ -3,24 +3,12 @@
 //
 #pragma once

-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Weffc++"
-#pragma GCC diagnostic ignored "-Wredundant-tags"
-#pragma GCC diagnostic ignored "-Wcast-qual"
-#pragma GCC diagnostic ignored "-Wold-style-cast"
-#pragma GCC diagnostic ignored "-Wnoexcept"
-#pragma GCC diagnostic ignored "-Wredundant-decls"
-#pragma GCC diagnostic ignored "-Wuseless-cast"
-#pragma GCC diagnostic ignored "-Wnoexcept"
-#pragma GCC diagnostic ignored "-Wswitch-enum"
-#pragma GCC diagnostic ignored "-Wshadow"
-#include "drogon/HttpController.h"
-#pragma GCC diagnostic pop
+#include <drogon/HttpController.h>

 namespace idhan::api
 {

-class ImportAPI : public drogon::HttpController< ImportAPI >
+class ImportAPI final : public drogon::HttpController< ImportAPI >
 {
 	drogon::Task< drogon::HttpResponsePtr > importFile( drogon::HttpRequestPtr request );
IDHANServer/src/api/JobsAPI.cpp (new file, 178 lines added)
@@ -0,0 +1,178 @@
//
// Created by kj16609 on 9/7/25.
//

#include "JobsAPI.hpp"

#include <trantor/utils/Date.h>

#include "IDHANTypes.hpp"
#include "helpers/createBadRequest.hpp"
#include "jobs/JobContext.hpp"

namespace idhan::api
{

using JobInitFunc = std::function< drogon::Task< drogon::HttpResponsePtr >( drogon::HttpRequestPtr ) >;

drogon::Task< std::string > createJob( std::string job_type, const Json::Value job_data, drogon::orm::DbClientPtr db )
{
	// context_data is NOT NULL in the schema with no default, so supply an empty object for now.
	const auto job_response { co_await db->execSqlCoro(
		"INSERT INTO jobs (job_id, job_type, job_data, context_data) "
		"VALUES (gen_random_uuid(), $1, $2, '{}'::jsonb) RETURNING job_id",
		job_type,
		job_data ) };

	const auto job_id { job_response[ 0 ][ "job_id" ].as< std::string >() };

	co_return job_id;
}

void applyJsonFlag( Json::Value& json, const std::string& name, const bool default_val, drogon::HttpRequestPtr request )
{
	if ( const auto opt = request->getOptionalParameter< bool >( name ); opt )
		json[ name ] = *opt;
	else
		json[ name ] = default_val;
}

drogon::Task< drogon::HttpResponsePtr > startJobClusterScan( drogon::HttpRequestPtr request )
{
	auto db { drogon::app().getDbClient() };

	const auto opt_cluster_id { request->getOptionalParameter< ClusterID >( "cluster_id" ) };

	if ( !opt_cluster_id.has_value() ) co_return createBadRequest( "Must provide cluster_id for job cluster_scan" );

	Json::Value job_json {};
	job_json[ "cluster_id" ] = *opt_cluster_id;
	job_json[ "version" ] = 1;

	// Adopts any orphan files found into IDHAN
	applyJsonFlag( job_json, "adopt_orphans", false, request );

	// Force the hash to be recomputed
	applyJsonFlag( job_json, "recompute_hash", true, request );

	// Scan mime for files that do not have it
	applyJsonFlag( job_json, "scan_mime", true, request );

	// Rescan mime (overwrite all previous mime info for files)
	applyJsonFlag( job_json, "rescan_mime", false, request );

	// Scans for metadata (image resolution, etc.)
	applyJsonFlag( job_json, "scan_metadata", true, request );

	// Rescans metadata (overwrite all previous metadata for files)
	applyJsonFlag( job_json, "rescan_metadata", false, request );

	Json::Value response_json {};
	response_json[ "job_id" ] = co_await createJob( "cluster_scan", job_json, db );
	response_json[ "data" ] = job_json;

	co_return drogon::HttpResponse::newHttpJsonResponse( response_json );
}

inline static std::unordered_map< std::string, JobInitFunc > job_init_funcs { { "cluster_scan", startJobClusterScan } };

drogon::Task< drogon::HttpResponsePtr > JobsAPI::startJob( const drogon::HttpRequestPtr request )
{
	const auto job_type { request->getOptionalParameter< std::string >( "type" ) };

	if ( !job_type ) co_return createBadRequest( "Must provide job type" );

	auto itter { job_init_funcs.find( *job_type ) };

	if ( itter == job_init_funcs.end() )
		co_return createBadRequest( "Invalid job type. No internal function for handling job '{}'", *job_type );

	auto db { drogon::app().getDbClient() };

	co_return co_await itter->second( request );
}

drogon::Task< Json::Value > getJobJson( const std::string job_id, drogon::orm::DbClientPtr db )
{
	const auto job_result { co_await db->execSqlCoro( "SELECT * FROM jobs WHERE job_id = $1", job_id ) };

	Json::Value json {};

	if ( job_result.empty() )
	{
		json[ "job_id" ] = job_id;
		json[ "status" ] = "not found";
		co_return json;
	}

	json[ "job_id" ] = job_id;
	const auto job_json { job_result[ 0 ][ "job_data" ].as< Json::Value >() };
	json[ "data" ] = job_json;

	const auto job { job_result[ 0 ] };

	if ( !job[ "time_requested" ].isNull() )
	{
		const auto time_requested { trantor::Date::fromDbString( job[ "time_requested" ].as< std::string >() ) };

		json[ "time_requested" ][ "human" ] = job[ "time_requested" ].as< std::string >();
		json[ "time_requested" ][ "unix" ] = time_requested.secondsSinceEpoch();
	}
	else
	{
		json[ "time_requested" ] = Json::Value( Json::nullValue );
	}

	if ( !job[ "time_completed" ].isNull() )
	{
		const auto time_completed { trantor::Date::fromDbString( job[ "time_completed" ].as< std::string >() ) };

		json[ "time_completed" ][ "human" ] = job[ "time_completed" ].as< std::string >();
		json[ "time_completed" ][ "unix" ] = time_completed.secondsSinceEpoch();
	}
	else
	{
		json[ "time_completed" ] = Json::Value( Json::nullValue );
	}

	co_return json;
}

Json::Value parseJobRow( const drogon::orm::Row& row )
{
	Json::Value json {};

	return json;
}

drogon::Task< drogon::HttpResponsePtr > getAllJobStatuses( drogon::orm::DbClientPtr db )
{
	const auto jobs { co_await db->execSqlCoro( "SELECT job_id FROM jobs" ) };

	Json::Value json {};
	json.resize( 0 );

	Json::ArrayIndex index { 0 };
	for ( const auto& row : jobs )
	{
		json[ index ] = co_await getJobJson( row[ "job_id" ].as< std::string >(), db );
		index++;
	}
	co_return drogon::HttpResponse::newHttpJsonResponse( json );
}

drogon::Task< drogon::HttpResponsePtr > getJobStatus( const std::string job_id, drogon::orm::DbClientPtr db )
{
	co_return drogon::HttpResponse::newHttpJsonResponse( co_await getJobJson( job_id, db ) );
}

drogon::Task< drogon::HttpResponsePtr > JobsAPI::jobStatus( const drogon::HttpRequestPtr request )
{
	// if there is no `job_id` parameter, assume we want all the jobs
	auto db { drogon::app().getDbClient() };
	const auto job_id { request->getOptionalParameter< std::string >( "job_id" ) };

	if ( job_id ) co_return co_await getJobStatus( *job_id, db );

	co_return co_await getAllJobStatuses( db );
}

} // namespace idhan::api
IDHANServer/src/api/JobsAPI.hpp (new file, 28 lines added)
@@ -0,0 +1,28 @@
//
// Created by kj16609 on 9/7/25.
//
#pragma once
#include "drogon/HttpController.h"

namespace idhan::api
{

class JobsAPI final : public drogon::HttpController< JobsAPI >
{
	drogon::Task< drogon::HttpResponsePtr > startJob( drogon::HttpRequestPtr request );

	drogon::Task< drogon::HttpResponsePtr > jobStatus( drogon::HttpRequestPtr request );

public:

	METHOD_LIST_BEGIN

	ADD_METHOD_TO( JobsAPI::startJob, "/jobs/start" );
	ADD_METHOD_TO( JobsAPI::jobStatus, "/jobs/status" );

	METHOD_LIST_END
};

} // namespace idhan::api
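For reference, a sketch (not part of this commit) of exercising these routes from a drogon coroutine with drogon's own client; the host and port are illustrative:

```cpp
// Start a cluster scan, then list every job's status.
auto client { drogon::HttpClient::newHttpClient( "http://127.0.0.1:8080" ) };

auto start_req { drogon::HttpRequest::newHttpRequest() };
start_req->setPath( "/jobs/start" );
start_req->setParameter( "type", "cluster_scan" );
start_req->setParameter( "cluster_id", "1" );
const auto start_resp { co_await client->sendRequestCoro( start_req ) };
// body: { "job_id": "<uuid>", "data": { ...the flags echoed back... } }

auto status_req { drogon::HttpRequest::newHttpRequest() };
status_req->setPath( "/jobs/status" ); // omit job_id to list all jobs
const auto status_resp { co_await client->sendRequestCoro( status_req ) };
```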
@@ -47,6 +47,7 @@ std::string ConnectionArguments::format() const
 	str += format_ns::format( "port={} ", port );
 	str += format_ns::format( "dbname={} ", dbname );
 	str += format_ns::format( "user={} ", user );
+	if ( !password.empty() ) str += format_ns::format( "password={} ", password );
 	if ( testmode ) str += "options='-c search_path=test -c client_min_messages=debug1'";

 	log::debug( "Connecting using: {}", str );
@@ -1,2 +1,2 @@
 INSERT INTO idhan_info(table_name, last_migration_id, queries)
-VALUES ('dummy', 100, ARRAY['']::TEXT[]);
+VALUES ('dummy', 500, ARRAY ['']::TEXT[]);
IDHANServer/src/jobs/ClusterScanJob.cpp (new file, 88 lines added)
@@ -0,0 +1,88 @@
//
// Created by kj16609 on 9/8/25.
//
#include "ClusterScanJob.hpp"

#include "FileScanJob.hpp"
#include "JobContext.hpp"
#include "filesystem/ClusterManager.hpp"

namespace idhan::jobs
{

ClusterScanJob::ClusterScanJob( const std::string& job_id, const Json::Value& json ) :
	JobContext( job_id ),
	m_scan_mime( false ),
	m_rescan_mime( false ),
	m_adopt_orphans( false ),
	m_scan_metadata( false ),
	m_rescan_metadata( false ),
	m_recompute_hash( false ),
	m_cluster_id( 0 )
{
	//{"version": 1, "scan_mime": true, "cluster_id": 1, "rescan_mime": false, "adopt_orphans": false, "scan_metadata": true, "recompute_hash": true, "rescan_metadata": false}
	const auto version { json.get( "version", 1 ).asInt() };

	switch ( version )
	{
		case 1:
			m_scan_mime = json.get( "scan_mime", false ).asBool();
			m_rescan_mime = json.get( "rescan_mime", false ).asBool();
			m_adopt_orphans = json.get( "adopt_orphans", false ).asBool();
			m_scan_metadata = json.get( "scan_metadata", false ).asBool();
			m_rescan_metadata = json.get( "rescan_metadata", false ).asBool();
			m_recompute_hash = json.get( "recompute_hash", false ).asBool();
			m_cluster_id = static_cast< ClusterID >( json.get( "cluster_id", 0 ).asInt() );
			break;
		default:
			throw std::runtime_error( "Unsupported version in ClusterScanJob" );
	}
}

drogon::Task< void > ClusterScanJob::prepare( drogon::orm::DbClientPtr db )
{
	// start by getting every record for the given cluster id

	const auto cluster_records {
		co_await db->execSqlCoro( "SELECT record_id FROM file_info WHERE cluster_id = $1", m_cluster_id )
	};

	Json::Value template_json {};
	template_json[ "scan_mime" ] = m_scan_mime;
	template_json[ "rescan_mime" ] = m_rescan_mime;
	template_json[ "scan_metadata" ] = m_scan_metadata;
	template_json[ "rescan_metadata" ] = m_rescan_metadata;
	template_json[ "recompute_hash" ] = m_recompute_hash;

	// Prepare a new dependency job for each record in the cluster.
	for ( const auto& row : cluster_records )
	{
		const auto record_id { row[ 0 ].as< RecordID >() };

		Json::Value json { template_json };
		json[ "record_id" ] = record_id;

		const auto job_ctx { co_await createJob< FileScanJob >( json ) };

		co_await this->addDependency( job_ctx, db );
	}
}

drogon::Task< void > ClusterScanJob::run()
{
	co_return;
}

Json::Value ClusterScanJob::serialize()
{
	Json::Value json;
	json[ "version" ] = 1;
	json[ "scan_mime" ] = m_scan_mime;
	json[ "rescan_mime" ] = m_rescan_mime;
	json[ "adopt_orphans" ] = m_adopt_orphans;
	json[ "scan_metadata" ] = m_scan_metadata;
	json[ "rescan_metadata" ] = m_rescan_metadata;
	json[ "recompute_hash" ] = m_recompute_hash;
	json[ "cluster_id" ] = static_cast< int >( m_cluster_id );
	return json;
}

} // namespace idhan::jobs
IDHANServer/src/jobs/ClusterScanJob.hpp (new file, 34 lines added)
@@ -0,0 +1,34 @@
//
// Created by kj16609 on 9/8/25.
//
#pragma once
#include "IDHANTypes.hpp"
#include "JobContext.hpp"
#include "fgl/defines.hpp"

namespace idhan::jobs
{

class ClusterScanJob final : public JobContext
{
	bool m_scan_mime;
	bool m_rescan_mime;
	bool m_adopt_orphans;
	bool m_scan_metadata;
	bool m_rescan_metadata;
	bool m_recompute_hash;
	ClusterID m_cluster_id;

public:

	FGL_DELETE_ALL_RO5( ClusterScanJob );

	explicit ClusterScanJob( const std::string& job_id, const Json::Value& json );

	drogon::Task< void > prepare( drogon::orm::DbClientPtr db ) override;
	drogon::Task< void > run() override;

	Json::Value serialize() override;
};

} // namespace idhan::jobs
IDHANServer/src/jobs/FileScanJob.cpp (new file, 11 lines added)
@@ -0,0 +1,11 @@
//
// Created by kj16609 on 9/8/25.
//
#include "FileScanJob.hpp"

namespace idhan::jobs
{

} // namespace idhan::jobs
IDHANServer/src/jobs/FileScanJob.hpp (new file, 18 lines added)
@@ -0,0 +1,18 @@
//
// Created by kj16609 on 9/8/25.
//
#pragma once
#include "JobContext.hpp"

namespace idhan::jobs
{

class FileScanJob final : public JobContext
{
};

} // namespace idhan::jobs
IDHANServer/src/jobs/JobContext.cpp (new file, 68 lines added)
@@ -0,0 +1,68 @@
//
// Created by kj16609 on 9/8/25.
//

#include "JobContext.hpp"

#include "drogon/HttpAppFramework.h"
#include "fgl/defines.hpp"

namespace idhan::jobs
{

drogon::Task<> JobContext::markJobPending( drogon::orm::DbClientPtr db )
{
	// the column is named job_status in the schema
	co_await db->execSqlCoro( "UPDATE jobs SET job_status = $1 WHERE job_id = $2", "PENDING", m_job_id );
}

drogon::Task<> JobContext::markJobFailed( drogon::orm::DbClientPtr db )
{
	co_await db->execSqlCoro(
		"UPDATE jobs SET job_status = $1, job_response = $3 WHERE job_id = $2", "FAILED", m_job_id, m_response );
}

drogon::Task<> JobContext::markJobCompleted( drogon::orm::DbClientPtr db )
{
	co_await db->execSqlCoro(
		"UPDATE jobs SET job_status = $1, job_response = $3 WHERE job_id = $2", "COMPLETED", m_job_id, m_response );
}

drogon::Task<> JobContext::syncJobStatus( drogon::orm::DbClientPtr db )
{
	switch ( m_job_status )
	{
		default:
			[[fallthrough]];
		case JobStatus::PENDING:
			[[fallthrough]];
		case JobStatus::STARTED:
			{
				co_await markJobPending( db );
				co_return;
			}
		case JobStatus::FAILED:
			{
				co_await markJobFailed( db );
				co_return;
			}
		case JobStatus::COMPLETED:
			{
				co_await markJobCompleted( db );
				co_return;
			}
	}

	FGL_UNREACHABLE();
}

JobContext::~JobContext()
{
	if ( m_job_status != JobStatus::SERIALIZED )
	{
		throw std::runtime_error( "JobContext::~JobContext: JobContext was not serialized" );
	}
}

JobContext::JobContext( const std::string job_id ) : m_job_id( job_id )
{}
} // namespace idhan::jobs
IDHANServer/src/jobs/JobContext.hpp (new file, 89 lines added)
@@ -0,0 +1,89 @@
//
// Created by kj16609 on 8/24/25.
//
#pragma once
#include <json/value.h>

#include <string>

#include "drogon/HttpAppFramework.h"
#include "drogon/orm/DbClient.h"
#include "drogon/utils/coroutine.h"
#include "fgl/defines.hpp"

namespace idhan::jobs
{

enum class JobStatus
{
	//! Job is pending a thread to run on
	PENDING,
	//! Job is starting
	STARTED,
	//! Job has failed
	FAILED,
	//! Job has finished with no failure
	COMPLETED,

	//! Job has been serialized to the DB and is pending destruction
	SERIALIZED
};

class [[nodiscard]] JobContext
{
	std::string m_job_id;
	JobStatus m_job_status { JobStatus::STARTED };
	Json::Value m_response;

protected:

	std::vector< std::string > m_dependencies {};

public:

	FGL_DELETE_ALL_RO5( JobContext );

	explicit JobContext( std::string job_id );

	//! Overwrites the job_data every time it is called
	virtual Json::Value serialize() = 0;

	//! Executed after creation of a job.
	virtual drogon::Task< void > prepare( drogon::orm::DbClientPtr db ) = 0;

	virtual drogon::Task< void > run() = 0;

	drogon::Task<> addDependency( const std::shared_ptr< JobContext >& shared, drogon::orm::DbClientPtr db );

	drogon::Task<> markJobPending( drogon::orm::DbClientPtr db );
	drogon::Task<> markJobFailed( drogon::orm::DbClientPtr db );
	drogon::Task<> markJobCompleted( drogon::orm::DbClientPtr db );
	drogon::Task<> syncJobStatus( drogon::orm::DbClientPtr db );

	virtual ~JobContext();
};

template < typename T >
concept is_job = requires( T t, const Json::Value& json ) {
	{ t.serialize() } -> std::same_as< Json::Value >;
	{ t.prepare( std::declval< drogon::orm::DbClientPtr >() ) } -> std::same_as< drogon::Task< void > >;
	{ t.run() } -> std::same_as< drogon::Task< void > >;
	requires std::derived_from< T, JobContext >;
	requires std::constructible_from< T, const Json::Value& >;
	{ T::m_job_name } -> std::same_as< const std::string_view& >;
};

template < typename T >
drogon::Task< std::shared_ptr< jobs::JobContext > >
	createJob( Json::Value json, drogon::orm::DbClientPtr db = drogon::app().getDbClient() )
{
	static_assert( jobs::is_job< T >, "T must satisfy is_job concept" );

	auto job_ptr { std::make_shared< T >( "temp", json ) };

	co_await job_ptr->prepare( db );

	co_return job_ptr;
}

} // namespace idhan::jobs
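Illustration only: a minimal job shaped to satisfy the `is_job` concept above (the name `NoopJob` is hypothetical). The extra single-argument constructor covers the concept's `constructible_from< T, const Json::Value& >` requirement, while the two-argument form matches how `createJob` actually constructs jobs:

```cpp
class NoopJob final : public jobs::JobContext
{
public:
    constexpr static std::string_view m_job_name { "noop" };

    NoopJob( const std::string& job_id, const Json::Value& ) : JobContext( job_id ) {}
    explicit NoopJob( const Json::Value& json ) : NoopJob( "temp", json ) {}

    Json::Value serialize() override { return {}; }
    drogon::Task< void > prepare( drogon::orm::DbClientPtr ) override { co_return; }
    drogon::Task< void > run() override { co_return; }
};
```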
IDHANServer/src/jobs/JobRuntime.cpp (new file, 41 lines added)
@@ -0,0 +1,41 @@
//
// Created by kj16609 on 8/24/25.
//
#include "JobRuntime.hpp"

#include "drogon/HttpAppFramework.h"

namespace idhan::jobs
{

JobRuntime::WorkerContext::~WorkerContext()
{
	m_thread.request_stop();
	m_thread.join();
}

void jobManager( std::stop_token token )
{
	auto db { drogon::app().getDbClient() };
	while ( !token.stop_requested() )
	{
		const auto job_response {
			db->execSqlSync( "SELECT job_id, job_data FROM jobs WHERE job_status = 'PENDING' LIMIT 1" )
		};

		if ( job_response.empty() )
		{
			std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
			continue;
		}
	}
}

JobRuntime::JobRuntime( const std::size_t max_active_jobs ) :
	m_manager_thread( jobManager ),
	m_max_active_jobs( max_active_jobs )
{}

JobRuntime::~JobRuntime()
{}
} // namespace idhan::jobs
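The manager loop above only selects pending rows so far. A hedged sketch of how it might claim a job atomically (plain PostgreSQL; `FOR UPDATE SKIP LOCKED` keeps concurrent workers from grabbing the same row):

```cpp
// Sketch only: claim one PENDING job and flip it to STARTED in a single statement.
const auto claimed { db->execSqlSync(
    "UPDATE jobs SET job_status = 'STARTED' "
    "WHERE job_id = ( SELECT job_id FROM jobs WHERE job_status = 'PENDING' "
    "LIMIT 1 FOR UPDATE SKIP LOCKED ) "
    "RETURNING job_id, job_type, job_data" ) };
```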
IDHANServer/src/jobs/JobRuntime.hpp (new file, 40 lines added)
@@ -0,0 +1,40 @@
//
// Created by kj16609 on 8/24/25.
//
#pragma once
#include <json/value.h>

#include <semaphore> // std::counting_semaphore
#include <thread>

#include "drogon/orm/DbClient.h"
#include "drogon/utils/coroutine.h"

namespace idhan::jobs
{

constexpr auto MAX_JOBS_PER_THREAD { 4 };

class JobRuntime
{
	struct WorkerContext
	{
		std::jthread m_thread;
		std::counting_semaphore< MAX_JOBS_PER_THREAD > m_semaphore { MAX_JOBS_PER_THREAD };

		~WorkerContext();
	};

	std::jthread m_manager_thread;
	std::vector< WorkerContext > m_workers {};
	std::size_t m_max_active_jobs;

public:

	explicit JobRuntime( std::size_t max_active_jobs );

	~JobRuntime();
};

drogon::Task< std::string > createJob( std::string job_type, Json::Value job_data, drogon::orm::DbClientPtr db );

} // namespace idhan::jobs
IDHANServer/src/jobs/ParseJob.hpp (new file, 22 lines added)
@@ -0,0 +1,22 @@
//
// Created by kj16609 on 8/24/25.
//
#pragma once
#include "JobContext.hpp"
#include "JobRuntime.hpp"

namespace idhan::jobs
{

class ParseJob final : public JobContext
{
public:

	drogon::Task< void > prepare( drogon::orm::DbClientPtr db ) override;
	drogon::Task< void > run() override;

	Json::Value serialize() override;
	// JobContext declares no virtual deserialize(), so this cannot be marked override.
	void deserialize( const Json::Value& );
};

} // namespace idhan::jobs
@@ -10,8 +10,23 @@
 #include "ServerContext.hpp"
 #include "logging/log.hpp"

+void applyCLISettings(
+	const std::string_view group,
+	const std::string_view name,
+	const QCommandLineParser& parser,
+	const QCommandLineOption& option )
+{
+	using namespace idhan::config;
+	if ( parser.isSet( option ) )
+	{
+		addCLIConfig( group, name, parser.value( option ).toStdString() );
+	}
+}
+
 int main( int argc, char** argv )
 {
 	using namespace idhan;

 	QCommandLineParser parser {};
 	parser.addHelpOption();
 	parser.addVersionOption();

@@ -48,9 +63,9 @@ int main( int argc, char** argv )

 	parser.process( app );

+	applyCLISettings( "database", "hostname", parser, pg_host );
+
 	idhan::ConnectionArguments arguments {};
 	arguments.user = parser.value( pg_user ).toStdString();
-	arguments.hostname = parser.value( pg_host ).toStdString();

 	if ( parser.isSet( config_location ) )
 	{
@@ -1,18 +1,45 @@
 # Config file location

 - IDHAN will look for a config file next to its executable first (`./config.toml`)
-- If one cannot be found there it will look for one in `~/.config/idhan/config.toml`, You can also pass the config to executable (see launch options)
+- If one cannot be found there it will look for one in `~/.config/idhan/config.toml`. You can also pass a config path to
+  the executable (see launch options)
 - IDHAN will only create a config file if it can't find one

 # Launch options

 - `-h` `--help`: Self-explanatory
-- `--testmode`: Forces postgresql db schema to use the schema `test` instead of `public` (Used for running automated tests)
+- `--testmode`: Forces the postgresql db schema to use the schema `test` instead of `public` (Used for running automated
+  tests)
 - `--use_stdout`: Enables the logger to output to stdout (Useful for preventing test output clutter)
 - `--config <PATH>`: Overrides the config location. IDHAN will not load configs from other locations.
 - `--pg_user`: Specifies a postgresql user to use (Overrides the config file)
 - `--pg_host`: Specifies a hostname for the PG server to be found at (Overrides the config file)

+# Config order
+
+IDHAN will search for config information in top-to-bottom order.
+
+All config options can be provided as ENV variables if they appear in the toml; the format is `IDHAN_$(GROUP)_$(NAME)`.
+The `IDHAN_` prefix is used to prevent accidental environment collisions.
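Illustration (not part of the commit) of the mapping for the `database.port` key; the port value and default shown are illustrative:

```cpp
// config.toml:             [database]
//                          port = 6543
// equivalent environment:  IDHAN_DATABASE_PORT=6543
// read from C++ as:
const auto port { idhan::config::get< std::uint16_t >( "database", "port", 5432 ) };
```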
+
+## Linux
+
+- CLI
+- ENV
+- `./config.toml`
+- `~/.config/idhan/config.toml`
+- `/etc/idhan/config.toml`
+- `/usr/share/idhan/config.toml`
+
+## Windows
+
+- CLI
+- ENV
+- `./config.toml`
+- `%LOCALAPPDATA%\idhan\config.toml`
+- `%APPDATA%\idhan\config.toml`
+- `%PROGRAMDATA%\idhan\config.toml`
+
 # Config options (`config.toml`)

 The following is an example config file to use
docs/jobs.md (new file, 43 lines added)
@@ -0,0 +1,43 @@
# Table of Contents

- [Job types](#job-types)
    - [Cluster Scan](#cluster-scan)

# Job types

## Cluster Scan

Example: `$url/jobs/start?type=cluster_scan&cluster_id=1`

The `cluster_scan` job type scans a given cluster for files that are missing or corrupted and analyzes its data.

# Job Statuses

- Pending: The job has not been started but the information is stored in the table
- Started: The job has started execution. If a job supports being canceled then it can return to `PENDING` if it is paused.
- Completed: The job has finished execution
- Failed: The job has failed to execute
- Await Dependency: The job is waiting on another job to complete before starting.

# Internals

When a job is created, it begins by preparing its context and sending it to the DB. From there a worker will take jobs
from the `jobs` table and execute them given a few rules.

A job can have a dependency on another job being completed.

- A job is requested
- The job context is created with the information required
- `prepare()` is called. This will create any dependency jobs and prepare the context further.

# Job Dependencies

Job dependencies are used to break up large jobs into smaller jobs that can be completed in parallel on multiple
threads.
One example of this is a cluster scan job. The cluster scan job will start scan jobs on every file in the cluster.
Those file scan jobs in turn have their own dependencies, such as mime scanning and mime parsing (metadata).
The cluster scan job will not report completed until all of its dependency jobs have completed.