Skip to main content

RBAC & Entitlement Integration

Overview

The Tellus EHS platform uses a two-layer access control system:

  1. Company-Level Entitlements: What features/capabilities the company's plan provides
  2. User-Level Permissions: What actions individual users can perform

Access is granted only when BOTH layers allow it.

✅ Access Granted = (Company Has Entitlement) AND (User Has Permission)

Data Model

Layer 1: Company-Level Entitlements

-- What the company's plan allows
plan_entitlements (
plan_entitlement_id UUID,
plan_version_id UUID,
entitlement_id UUID, -- References entitlement_definitions
module_id UUID, -- Which module (CHEMIQ, PLAN, etc.)
feature_enabled BOOLEAN,
limit_value BIGINT, -- NULL = unlimited
feature_level TEXT, -- 'basic', 'standard', 'advanced'
is_selectable BOOLEAN
)

-- Company-specific overrides to plan defaults
company_entitlement_overrides (
override_id UUID,
company_id UUID,
entitlement_id UUID,
feature_enabled BOOLEAN,
limit_value BIGINT,
reason TEXT
)

-- Entitlement definitions
entitlement_definitions (
entitlement_id UUID,
code TEXT, -- 'CHEMIQ_SDS_BINDER_BULK_UPLOAD' or 'LIMIT_SDS_UPLOADS'
name TEXT,
type TEXT, -- 'feature' or 'limit'
unit TEXT, -- 'count', 'per_month', etc.
description TEXT
)

Layer 2: User-Level Permissions

-- System-wide role templates
system_roles (
system_role_id UUID,
code TEXT, -- 'ADMIN', 'MANAGER', 'EMPLOYEE', etc.
display_name TEXT,
description TEXT
)

-- Permissions (atomic actions)
permissions (
permission_id UUID,
code TEXT, -- 'sds:upload', 'inventory:edit', etc.
description TEXT
)

-- What permissions each system role has
system_role_permissions (
system_role_id UUID,
permission_id UUID
)

-- Company-specific roles (can extend system roles)
company_roles (
company_role_id UUID,
company_id UUID,
base_system_role_id UUID, -- Optional: inherit from system role
display_name TEXT,
is_active BOOLEAN
)

-- Custom permissions for company roles
company_role_permissions (
company_role_id UUID,
permission_id UUID
)

-- User-to-role assignments
company_user_roles (
company_id UUID,
user_id UUID,
company_role_id UUID
)

Built-in System Roles

From init_database.sql:

Role CodeDisplay NameDescriptionExample Permissions
ADMINAdministratorFull system access and company managementALL permissions
MANAGERManagerSite-level management and oversightMost permissions except user:delete, role:create, role:edit
COORDINATORProgram CoordinatorHazCom program coordinationsds:, plan:, training:view/assign, inventory:*
TRAINERTrainerTraining content creationtraining:*, sds:view, plan:view
EMPLOYEEEmployeeBasic accesssds:view, training:view, inventory:view
VIEWERViewerRead-only access*:view permissions only

Sample Permissions

From init_database.sql:

Authentication & User Management

  • auth:login - User can log in
  • auth:mfa - Configure MFA
  • user:view - View users
  • user:create - Create new users
  • user:edit - Edit users
  • user:delete - Delete users
  • user:invite - Invite new users

Role Management

  • role:view - View roles
  • role:create - Create new roles
  • role:edit - Edit roles

Module-Specific Permissions

  • sds:view, sds:upload, sds:edit, sds:delete
  • inventory:view, inventory:edit
  • training:view, training:create, training:assign
  • plan:view, plan:edit, plan:publish

System

  • company:view, company:edit
  • audit:view

How to Expand Permissions for Entitlements

To support the granular entitlements from tabular.csv, we need to expand the permissions table with feature/capability-level permissions.

Naming Convention for Permissions

{module}:{feature}:{action}:{capability}

Examples:

Entitlement Code (Company-Level)Permission Code (User-Level)Who Has It?
CHEMIQ_SDS_BINDER_BULK_UPLOADchemiq:sds:upload:bulkADMIN, MANAGER, COORDINATOR
CHEMIQ_SDS_BINDER_AI_EXTRACTchemiq:sds:upload:ai_extractADMIN, MANAGER, COORDINATOR
CHEMIQ_INVENTORY_BARCODE_SCANchemiq:inventory:edit:barcodeADMIN, MANAGER, COORDINATOR, EMPLOYEE
PLAN_BUILDER_PUBLISHplan:builder:publish:*ADMIN, MANAGER, COORDINATOR
LABELS_PRINT_QRlabels:print:*:qrADMIN, MANAGER, COORDINATOR, EMPLOYEE
SAFEPATH_TRAINING_ASSIGNsafepath:training:assign:*ADMIN, MANAGER, TRAINER
INCIDENTIQ_INCIDENTS_REPORTincidentiq:incidents:create:*ADMIN, MANAGER, COORDINATOR, EMPLOYEE

For easier management, use a two-tier permission system:

{module}:{action}

Then map entitlements to permissions:

Permission CodeMaps to EntitlementStarterStandardPro
chemiq:sds_bulk_uploadCHEMIQ_SDS_BINDER_BULK_UPLOAD
chemiq:sds_ai_extractCHEMIQ_SDS_BINDER_AI_EXTRACT
chemiq:inventory_barcodeCHEMIQ_INVENTORY_BARCODE_SCAN
plan:publishPLAN_BUILDER_PUBLISH

This allows:

  • Permissions control who can do something (role-based)
  • Entitlements control what is available (plan-based)

Implementation: Combined Access Check

Backend Service

# app/services/access_control_service.py

from sqlalchemy.orm import Session
from typing import Optional, List
from uuid import UUID

from app.db.models.entitlement import EntitlementDefinition, PlanEntitlement, CompanyEntitlementOverride
from app.db.models.user import User, CompanyUserRole
from app.db.models.rbac import Permission, CompanyRolePermission, SystemRolePermission


class AccessControlService:
"""Combined entitlement + permission checking"""

def __init__(self, db: Session):
self.db = db

def check_access(
self,
user_id: UUID,
company_id: UUID,
entitlement_code: str,
permission_code: str
) -> dict:
"""
Check if user has access to a feature/capability.

Returns:
{
"allowed": bool,
"reason": str,
"missing_entitlement": bool,
"missing_permission": bool
}
"""
# Layer 1: Check company entitlement
has_entitlement = self._check_company_entitlement(company_id, entitlement_code)

# Layer 2: Check user permission
has_permission = self._check_user_permission(user_id, company_id, permission_code)

# Both must be true
allowed = has_entitlement and has_permission

# Determine reason
if allowed:
reason = "Access granted"
elif not has_entitlement and not has_permission:
reason = "Plan does not include this feature and user lacks permission"
elif not has_entitlement:
reason = f"Plan does not include {entitlement_code}. Upgrade to access this feature."
else: # not has_permission
reason = f"User lacks required permission: {permission_code}"

return {
"allowed": allowed,
"reason": reason,
"missing_entitlement": not has_entitlement,
"missing_permission": not has_permission
}

def _check_company_entitlement(self, company_id: UUID, entitlement_code: str) -> bool:
"""Check if company has the entitlement via plan or override"""

# Check for company-specific override first
override = (
self.db.query(CompanyEntitlementOverride)
.join(EntitlementDefinition)
.filter(
CompanyEntitlementOverride.company_id == company_id,
EntitlementDefinition.code == entitlement_code
)
.first()
)

if override:
return override.feature_enabled

# Check plan entitlement
from app.db.models.company import Company
from app.db.models.plan import PlanVersion

company = self.db.query(Company).filter(Company.company_id == company_id).first()
if not company or not company.plan_version_id:
return False

entitlement = (
self.db.query(PlanEntitlement)
.join(EntitlementDefinition)
.filter(
PlanEntitlement.plan_version_id == company.plan_version_id,
EntitlementDefinition.code == entitlement_code,
PlanEntitlement.feature_enabled == True
)
.first()
)

return entitlement is not None

def _check_user_permission(self, user_id: UUID, company_id: UUID, permission_code: str) -> bool:
"""Check if user has the permission via their role(s)"""

# Get all roles for this user in this company
user_roles = (
self.db.query(CompanyUserRole)
.filter(
CompanyUserRole.user_id == user_id,
CompanyUserRole.company_id == company_id
)
.all()
)

if not user_roles:
return False

# Check each role's permissions
for user_role in user_roles:
# Check company-specific role permissions
has_perm = (
self.db.query(CompanyRolePermission)
.join(Permission)
.filter(
CompanyRolePermission.company_role_id == user_role.company_role_id,
Permission.code == permission_code
)
.first()
)

if has_perm:
return True

# Check system role permissions (if role is based on system role)
from app.db.models.rbac import CompanyRole
company_role = self.db.query(CompanyRole).filter(
CompanyRole.company_role_id == user_role.company_role_id
).first()

if company_role and company_role.base_system_role_id:
has_system_perm = (
self.db.query(SystemRolePermission)
.join(Permission)
.filter(
SystemRolePermission.system_role_id == company_role.base_system_role_id,
Permission.code == permission_code
)
.first()
)

if has_system_perm:
return True

return False

def get_user_permissions(self, user_id: UUID, company_id: UUID) -> List[str]:
"""Get all permission codes for a user in a company"""
permissions = set()

user_roles = (
self.db.query(CompanyUserRole)
.filter(
CompanyUserRole.user_id == user_id,
CompanyUserRole.company_id == company_id
)
.all()
)

for user_role in user_roles:
# Company role permissions
company_perms = (
self.db.query(Permission.code)
.join(CompanyRolePermission)
.filter(CompanyRolePermission.company_role_id == user_role.company_role_id)
.all()
)
permissions.update([p[0] for p in company_perms])

# System role permissions
from app.db.models.rbac import CompanyRole
company_role = self.db.query(CompanyRole).filter(
CompanyRole.company_role_id == user_role.company_role_id
).first()

if company_role and company_role.base_system_role_id:
system_perms = (
self.db.query(Permission.code)
.join(SystemRolePermission)
.filter(SystemRolePermission.system_role_id == company_role.base_system_role_id)
.all()
)
permissions.update([p[0] for p in system_perms])

return list(permissions)

Combined Decorator

# app/api/dependencies/access.py

from functools import wraps
from fastapi import HTTPException, Depends
from sqlalchemy.orm import Session

from app.services.access_control_service import AccessControlService
from app.api.dependencies.auth import get_current_user
from app.api.dependencies.db import get_db


def require_access(entitlement_code: str, permission_code: str):
"""
Decorator that checks both entitlement and permission.

Usage:
@router.post("/sds/bulk-upload")
@require_access("CHEMIQ_SDS_BINDER_BULK_UPLOAD", "chemiq:sds_bulk_upload")
async def bulk_upload_sds(...):
pass
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Extract dependencies
user = kwargs.get('current_user') or next(
(arg for arg in args if hasattr(arg, 'user_id')), None
)
db = kwargs.get('db') or next(
(arg for arg in args if isinstance(arg, Session)), None
)
company_id = kwargs.get('company_id')

if not all([user, db, company_id]):
raise HTTPException(
status_code=500,
detail="Missing required dependencies for access check"
)

# Check access
access_service = AccessControlService(db)
result = access_service.check_access(
user.user_id,
company_id,
entitlement_code,
permission_code
)

if not result["allowed"]:
if result["missing_entitlement"]:
# Plan limitation
raise HTTPException(
status_code=402, # Payment Required
detail=result["reason"],
headers={
"X-Upgrade-Required": "true",
"X-Missing-Entitlement": entitlement_code
}
)
else:
# Permission denied
raise HTTPException(
status_code=403, # Forbidden
detail=result["reason"],
headers={
"X-Missing-Permission": permission_code
}
)

return await func(*args, **kwargs)

return wrapper
return decorator

API Route Example

# app/api/v1/endpoints/sds.py

from fastapi import APIRouter, Depends, UploadFile, File
from sqlalchemy.orm import Session
from uuid import UUID

from app.api.dependencies.db import get_db
from app.api.dependencies.auth import get_current_user
from app.api.dependencies.access import require_access
from app.db.models.user import User

router = APIRouter()


@router.post("/sds/bulk-upload")
@require_access("CHEMIQ_SDS_BINDER_BULK_UPLOAD", "chemiq:sds_bulk_upload")
async def bulk_upload_sds(
files: list[UploadFile] = File(...),
company_id: UUID = None,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Bulk upload SDS files.

Requires:
- Company plan: Standard or Pro (has CHEMIQ_SDS_BINDER_BULK_UPLOAD)
- User role: ADMIN, MANAGER, or COORDINATOR (has chemiq:sds_bulk_upload permission)
"""
# Implementation...
pass


@router.post("/sds/upload")
@require_access("CHEMIQ_SDS_BINDER_UPLOAD", "chemiq:sds_upload")
async def upload_sds(
file: UploadFile = File(...),
company_id: UUID = None,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Upload single SDS file.

Requires:
- Company plan: Any plan (Starter/Standard/Pro - all have basic upload)
- User role: ADMIN, MANAGER, COORDINATOR (has chemiq:sds_upload permission)
"""
# Implementation...
pass

Frontend Integration

TypeScript Service

// src/services/accessControl.ts

export interface AccessCheck {
allowed: boolean;
reason: string;
missingEntitlement: boolean;
missingPermission: boolean;
}

export const accessControlService = {
async checkAccess(
entitlementCode: string,
permissionCode: string
): Promise<AccessCheck> {
const response = await api.get('/api/v1/access/check', {
params: { entitlement: entitlementCode, permission: permissionCode }
});
return response.data;
},

async getUserPermissions(): Promise<string[]> {
const response = await api.get('/api/v1/access/permissions');
return response.data.permissions;
}
};

React Hook

// src/hooks/useAccess.ts

import { useState, useEffect } from 'react';
import { accessControlService } from '../services/accessControl';

export function useAccess(entitlementCode: string, permissionCode: string) {
const [canAccess, setCanAccess] = useState(false);
const [isLoading, setIsLoading] = useState(true);
const [reason, setReason] = useState('');

useEffect(() => {
async function checkAccess() {
setIsLoading(true);
try {
const result = await accessControlService.checkAccess(
entitlementCode,
permissionCode
);
setCanAccess(result.allowed);
setReason(result.reason);
} catch (error) {
setCanAccess(false);
setReason('Error checking access');
} finally {
setIsLoading(false);
}
}
checkAccess();
}, [entitlementCode, permissionCode]);

return { canAccess, isLoading, reason };
}

Component Example

// src/components/SDSUploadButton.tsx

import { useAccess } from '../hooks/useAccess';

export function SDSUploadButton() {
const { canAccess, isLoading, reason } = useAccess(
'CHEMIQ_SDS_BINDER_BULK_UPLOAD',
'chemiq:sds_bulk_upload'
);

if (isLoading) {
return <button disabled>Loading...</button>;
}

if (!canAccess) {
return (
<Tooltip content={reason}>
<button disabled className="opacity-50 cursor-not-allowed">
Bulk Upload SDS
</button>
</Tooltip>
);
}

return (
<button onClick={handleBulkUpload} className="btn-primary">
Bulk Upload SDS
</button>
);
}

Example Scenarios

Scenario 1: Bulk SDS Upload

Company: Acme Corp (Standard Plan) User: John (COORDINATOR role)

CheckResult
Company has CHEMIQ_SDS_BINDER_BULK_UPLOAD?✅ YES (Standard plan includes it)
John has chemiq:sds_bulk_upload permission?✅ YES (COORDINATOR role includes it)
Access Granted?YES

Scenario 2: Bulk SDS Upload (Starter Plan)

Company: Small Shop Inc (Starter Plan) User: Sarah (ADMIN role)

CheckResult
Company has CHEMIQ_SDS_BINDER_BULK_UPLOAD?❌ NO (Starter plan doesn't include it)
Sarah has chemiq:sds_bulk_upload permission?✅ YES (ADMIN has all permissions)
Access Granted?NO - HTTP 402 "Plan does not include CHEMIQ_SDS_BINDER_BULK_UPLOAD. Upgrade to access this feature."

Scenario 3: Basic SDS Upload

Company: Small Shop Inc (Starter Plan) User: Bob (EMPLOYEE role)

CheckResult
Company has CHEMIQ_SDS_BINDER_UPLOAD?✅ YES (All plans include basic upload)
Bob has chemiq:sds_upload permission?❌ NO (EMPLOYEE can only view, not upload)
Access Granted?NO - HTTP 403 "User lacks required permission: chemiq:sds_upload"

Scenario 4: View SDS

Company: Any Company (Any Plan) User: Any User (EMPLOYEE role)

CheckResult
Company has CHEMIQ_SDS_BINDER_VIEW?✅ YES (All plans include viewing)
User has chemiq:sds_view permission?✅ YES (EMPLOYEE can view)
Access Granted?YES

Migration Strategy

Step 1: Seed Permissions Table

Create migration to add feature/capability-level permissions:

# alembic/versions/xxxx_seed_feature_permissions.py

def upgrade():
# CHEMIQ permissions
op.execute("""
INSERT INTO permissions (code, description) VALUES
('chemiq:sds_upload', 'Upload single SDS document'),
('chemiq:sds_bulk_upload', 'Bulk upload SDS documents'),
('chemiq:sds_ai_extract', 'Use AI to extract SDS metadata'),
('chemiq:inventory_barcode', 'Use barcode scanning for inventory'),
('chemiq:inventory_analytics', 'View inventory analytics'),

('plan:builder_create', 'Create HazCom plans'),
('plan:builder_publish', 'Publish HazCom plans'),

('labels:print_basic', 'Print basic labels'),
('labels:print_qr', 'Print labels with QR codes'),

('safepath:training_assign', 'Assign training to users'),
('safepath:training_create', 'Create training content'),

('incidentiq:incidents_report', 'Report incidents'),
('incidentiq:incidents_investigate', 'Investigate incidents')
""")

Step 2: Map Permissions to Roles

def upgrade():
# ADMIN gets all new permissions
op.execute("""
INSERT INTO system_role_permissions (system_role_id, permission_id)
SELECT sr.system_role_id, p.permission_id
FROM system_roles sr
CROSS JOIN permissions p
WHERE sr.code = 'ADMIN'
AND p.code LIKE 'chemiq:%' OR p.code LIKE 'plan:%' OR p.code LIKE 'labels:%'
""")

# COORDINATOR gets most operational permissions
op.execute("""
INSERT INTO system_role_permissions (system_role_id, permission_id)
SELECT sr.system_role_id, p.permission_id
FROM system_roles sr
CROSS JOIN permissions p
WHERE sr.code = 'COORDINATOR'
AND p.code IN (
'chemiq:sds_upload', 'chemiq:sds_bulk_upload', 'chemiq:inventory_barcode',
'plan:builder_create', 'plan:builder_publish',
'labels:print_basic', 'labels:print_qr',
'incidentiq:incidents_report'
)
""")

# EMPLOYEE gets basic permissions
op.execute("""
INSERT INTO system_role_permissions (system_role_id, permission_id)
SELECT sr.system_role_id, p.permission_id
FROM system_roles sr
CROSS JOIN permissions p
WHERE sr.code = 'EMPLOYEE'
AND p.code IN (
'incidentiq:incidents_report',
'labels:print_basic'
)
""")

Summary

The Data Model DOES Support This

Yes, the existing data model fully supports role-based user access control:

  • system_roles & company_roles define user roles
  • permissions define atomic actions
  • system_role_permissions & company_role_permissions map roles to permissions
  • company_user_roles assigns roles to users

How It Works

  1. Entitlements (company-level) control what features are available
  2. Permissions (user-level) control who can use those features
  3. Access is granted only when both layers allow it

Key Differences

LayerControlsManaged ByExample
EntitlementsWhat the company hasSales/Billing"Acme Corp has bulk upload feature"
PermissionsWhat users can doAdmins/Managers"John can upload SDS documents"

Response Codes

  • HTTP 402 Payment Required: Company plan doesn't include the feature (upgrade needed)
  • HTTP 403 Forbidden: User role doesn't have the permission (role change needed)

Next Steps

  1. ✅ Expand permissions table with feature/capability-level permissions
  2. ✅ Map permissions to system roles (ADMIN, MANAGER, COORDINATOR, EMPLOYEE, etc.)
  3. ✅ Create AccessControlService with combined checking
  4. ✅ Add @require_access decorator for API routes
  5. ✅ Build frontend hooks for access checking
  6. ✅ Update UI to hide/disable features based on access

This creates a clean separation:

  • Entitlements = "What can this company do?" (plan-tier dependent)
  • Permissions = "What can this user do?" (role dependent)