
SafePath Training — Phase 1E: Dashboard, Reports & Training Matrix

Overview

This document covers the analytics, reporting, and compliance visibility layer:

  • Dashboard service: Role-appropriate widgets (employee, manager, trainer, admin)
  • Report service: Compliance reports (completion summary, individual transcript, certification status, OSHA audit package)
  • Training matrix service: Employees x courses grid with Required/Completed/Overdue/N/A cells
  • Export functionality: CSV, PDF, and Excel exports

Prerequisite: Phase 1C (assignment & delivery) must be complete. Phase 1D (certifications) is recommended for certification dashboard widgets but not strictly required.

Files to create:

  • tellus-ehs-hazcom-service/app/schemas/safepath/dashboard.py
  • tellus-ehs-hazcom-service/app/schemas/safepath/matrix.py
  • tellus-ehs-hazcom-service/app/services/safepath/dashboard_service.py
  • tellus-ehs-hazcom-service/app/services/safepath/matrix_service.py
  • tellus-ehs-hazcom-service/app/services/safepath/report_service.py
  • tellus-ehs-hazcom-service/app/api/v1/safepath/dashboard.py
  • tellus-ehs-hazcom-service/app/api/v1/safepath/matrix.py

Pydantic Schemas

Dashboard Schemas

File: tellus-ehs-hazcom-service/app/schemas/safepath/dashboard.py

"""SafePath Dashboard & Reporting Schemas"""

from datetime import date, datetime
from typing import Optional, List, Dict
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict


# ============================================================================
# Summary Stats (shared across roles)
# ============================================================================

class AssignmentStatusCounts(BaseModel):
"""Counts of assignments by status."""
pending: int = 0
in_progress: int = 0
completed: int = 0
overdue: int = 0
expired: int = 0
total: int = 0


class CompletionRate(BaseModel):
"""Completion rate for a time period."""
completed: int
total: int
rate_percent: float = Field(description="0-100 scale")


# ============================================================================
# Employee Dashboard
# ============================================================================

class MyAssignmentItem(BaseModel):
"""A single assignment for the current user's dashboard."""
assignment_id: UUID
course_title: str
course_id: UUID
due_date: date
priority: str
status: str
progress_percent: float = 0.0
last_activity: Optional[datetime] = None


class MyCertificationItem(BaseModel):
"""A single certification for the current user's dashboard."""
certification_id: UUID
certification_type_name: str
status: str
expiration_date: Optional[date] = None
days_until_expiry: Optional[int] = None


class EmployeeDashboard(BaseModel):
"""Dashboard for individual employees/learners."""
my_assignments: AssignmentStatusCounts
upcoming_due: List[MyAssignmentItem] = Field(
description="Assignments due within next 30 days, sorted by due_date"
)
overdue_assignments: List[MyAssignmentItem] = Field(
description="Overdue assignments requiring immediate action"
)
recent_completions: List[MyAssignmentItem] = Field(
description="Last 5 completed assignments"
)
my_certifications_expiring: List[MyCertificationItem] = Field(
description="Certifications expiring within 90 days"
)
total_hours_completed: float = Field(
description="Total training hours completed (from duration_seconds)"
)


# ============================================================================
# Manager / Admin Dashboard
# ============================================================================

class TeamComplianceItem(BaseModel):
"""Compliance summary for a single team member."""
user_id: UUID
employee_name: str
employee_email: str
total_assigned: int
completed: int
overdue: int
in_progress: int
compliance_percent: float


class CourseCompletionItem(BaseModel):
"""Completion stats for a single course."""
course_id: UUID
course_title: str
category_name: Optional[str] = None
total_assigned: int
completed: int
in_progress: int
overdue: int
completion_rate_percent: float
avg_score_percent: Optional[float] = None


class SiteComplianceItem(BaseModel):
"""Compliance summary for a site."""
site_id: UUID
site_name: str
total_employees: int
total_assigned: int
completed: int
overdue: int
compliance_percent: float


class OverdueAlertItem(BaseModel):
"""Overdue assignment alert for the dashboard."""
assignment_id: UUID
employee_name: str
employee_email: str
course_title: str
due_date: date
days_overdue: int
priority: str


class AdminDashboard(BaseModel):
"""Dashboard for managers, trainers, and admins."""
company_summary: AssignmentStatusCounts
completion_rate_30d: CompletionRate
completion_rate_90d: CompletionRate
team_compliance: List[TeamComplianceItem] = Field(
description="Per-employee compliance, sorted by compliance_percent ASC (worst first)"
)
course_completions: List[CourseCompletionItem] = Field(
description="Per-course completion stats"
)
site_compliance: List[SiteComplianceItem] = Field(
description="Per-site compliance stats"
)
overdue_alerts: List[OverdueAlertItem] = Field(
description="Top 20 most overdue assignments"
)
certifications_expiring_30d: int = 0
certifications_expired: int = 0


# ============================================================================
# Report Schemas
# ============================================================================

class CompletionReportRow(BaseModel):
"""A row in the completion summary report."""
employee_name: str
employee_email: str
site_name: Optional[str] = None
course_title: str
category_name: Optional[str] = None
assignment_date: date
due_date: date
completion_date: Optional[date] = None
status: str
score_percent: Optional[float] = None
passed: Optional[bool] = None
delivery_method: str
instructor_name: Optional[str] = None


class CompletionReport(BaseModel):
"""Completion summary report."""
report_title: str = "Training Completion Report"
generated_at: datetime
company_name: str
filter_description: str = Field(
description="Human-readable description of applied filters"
)
rows: List[CompletionReportRow]
total_rows: int
summary: AssignmentStatusCounts


class IndividualTranscriptRow(BaseModel):
"""A single training record in an individual's transcript."""
course_title: str
category_name: Optional[str] = None
completion_date: Optional[date] = None
score_percent: Optional[float] = None
passed: Optional[bool] = None
delivery_method: str
certificate_number: Optional[str] = None
certification_expiration: Optional[date] = None


class IndividualTranscript(BaseModel):
"""Individual employee training transcript."""
employee_name: str
employee_email: str
employee_id: UUID
company_name: str
generated_at: datetime
records: List[IndividualTranscriptRow]
total_completed: int
total_in_progress: int
total_hours: float
certifications_active: int
certifications_expiring: int


class CertificationStatusRow(BaseModel):
"""A row in the certification status report."""
employee_name: str
employee_email: str
site_name: Optional[str] = None
certification_type: str
osha_standard_ref: Optional[str] = None
issue_date: date
expiration_date: Optional[date] = None
status: str
days_until_expiry: Optional[int] = None
source: str


class CertificationStatusReport(BaseModel):
"""Certification status report."""
report_title: str = "Certification Status Report"
generated_at: datetime
company_name: str
rows: List[CertificationStatusRow]
total_active: int
total_expiring_soon: int
total_expired: int


class ReportExportRequest(BaseModel):
"""Request to export a report."""
report_type: str = Field(
...,
description="completion_summary, individual_transcript, certification_status",
pattern="^(completion_summary|individual_transcript|certification_status)$",
)
format: str = Field(
default="csv",
pattern="^(csv|pdf|xlsx)$",
)
# Optional filters
site_id: Optional[UUID] = None
course_id: Optional[UUID] = None
user_id: Optional[UUID] = None
status: Optional[str] = None
date_from: Optional[date] = None
date_to: Optional[date] = None

Training Matrix Schemas

File: tellus-ehs-hazcom-service/app/schemas/safepath/matrix.py

"""SafePath Training Matrix Schemas"""

from typing import Optional, List, Dict
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict


# ============================================================================
# Training Matrix Schemas
# ============================================================================

class MatrixCourseHeader(BaseModel):
"""Column header for a course in the training matrix."""
course_id: UUID
course_title: str
category_name: Optional[str] = None
osha_standard_ref: Optional[str] = None


class MatrixCellStatus(BaseModel):
    """Single cell in the training matrix.

    status values (the assignment status, or "not_assigned" when no
    assignment exists for the employee-course pair):
    - "pending": Assigned but not started (the "Required" cell)
    - "in_progress": Training started
    - "completed": Training completed and passed
    - "overdue": Past due date
    - "expired": Assignment expired
    - "not_assigned": Not required for this employee
    """
    status: str = Field(
        description="pending, in_progress, completed, overdue, expired, not_assigned"
    )
    assignment_id: Optional[UUID] = None
    due_date: Optional[str] = None
    completion_date: Optional[str] = None
    score_percent: Optional[float] = None


class MatrixEmployeeRow(BaseModel):
"""Row for a single employee in the training matrix."""
user_id: UUID
employee_name: str
employee_email: str
site_name: Optional[str] = None
role_name: Optional[str] = None
cells: Dict[str, MatrixCellStatus] = Field(
description="Map of course_id (as string) -> cell status"
)
compliance_percent: float = Field(
description="Percentage of required courses completed"
)


class TrainingMatrix(BaseModel):
"""Full training matrix response."""
courses: List[MatrixCourseHeader] = Field(
description="Column headers (courses)"
)
employees: List[MatrixEmployeeRow] = Field(
description="Rows (employees with cell statuses)"
)
total_employees: int
overall_compliance_percent: float


class MatrixFilterRequest(BaseModel):
"""Filters for the training matrix."""
site_id: Optional[UUID] = None
role_id: Optional[UUID] = None
course_id: Optional[UUID] = None
category_id: Optional[UUID] = None
status_filter: Optional[str] = Field(
None,
description="Filter to show only employees with this status in any cell: overdue, incomplete",
)
page: int = Field(1, ge=1)
page_size: int = Field(50, ge=1, le=200)
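
MatrixFilterRequest pages by employee, with a 1-based page and a page_size capped at 200. The sketch below shows how those two fields map onto a slice of an ordered ID list, mirroring the start/end arithmetic the matrix service uses later in this document. The paginate helper is hypothetical and the IDs are made-up integers.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page: int = 1, page_size: int = 50) -> List[T]:
    """Return the 1-based page of items, enforcing the schema bounds."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if not 1 <= page_size <= 200:
        raise ValueError("page_size must be between 1 and 200")
    start = (page - 1) * page_size
    return list(items[start:start + page_size])


employee_ids = list(range(1, 121))  # 120 made-up employee IDs, already sorted
first = paginate(employee_ids, page=1, page_size=50)
last = paginate(employee_ids, page=3, page_size=50)
print(len(first), first[0], first[-1])
print(len(last), last[0], last[-1])
```

Slicing like this only gives stable pages when the underlying list has a deterministic order, which is why the source query should carry an explicit ORDER BY.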

Dashboard Service

File: tellus-ehs-hazcom-service/app/services/safepath/dashboard_service.py

"""SafePath Dashboard Service

Provides role-appropriate dashboard data:
- Employee dashboard: Personal assignments, certifications, hours
- Admin/Manager dashboard: Company-wide compliance, per-site/course breakdowns
"""

from typing import Optional, List
from uuid import UUID
from datetime import date, datetime, timedelta
from decimal import Decimal

from sqlalchemy import func, and_, case, distinct
from sqlalchemy.orm import Session

from app.db.models.safepath import (
Assignment,
Result,
Course,
CourseCategory,
Certification,
CertificationType,
)
from app.db.models.user import User
from app.db.models.company import CompanySite
from app.schemas.safepath.dashboard import (
AssignmentStatusCounts,
CompletionRate,
MyAssignmentItem,
MyCertificationItem,
EmployeeDashboard,
TeamComplianceItem,
CourseCompletionItem,
SiteComplianceItem,
OverdueAlertItem,
AdminDashboard,
)


class DashboardService:
"""Service for generating dashboard data."""

def __init__(self, db: Session):
self.db = db

# ================================================================
# Employee Dashboard
# ================================================================

def get_employee_dashboard(self, company_id: UUID, user_id: UUID) -> EmployeeDashboard:
"""Build the employee/learner dashboard.

Shows:
- Assignment status counts
- Upcoming assignments (due within 30 days)
- Overdue assignments
- Recent completions (last 5)
- Expiring certifications (within 90 days)
- Total training hours
"""
today = date.today()
thirty_days = today + timedelta(days=30)

# --- Assignment status counts ---
status_counts = self._get_assignment_counts(company_id, user_id=user_id)

# --- Upcoming due (within 30 days, not completed) ---
upcoming_query = (
self.db.query(Assignment, Course)
.join(Course, Assignment.course_id == Course.course_id)
.filter(
Assignment.company_id == company_id,
Assignment.assigned_to == user_id,
Assignment.status.in_(["pending", "in_progress"]),
Assignment.due_date >= today,
Assignment.due_date <= thirty_days,
)
.order_by(Assignment.due_date.asc())
.limit(10)
.all()
)
upcoming = [
self._to_my_assignment_item(a, c, user_id)
for a, c in upcoming_query
]

# --- Overdue ---
overdue_query = (
self.db.query(Assignment, Course)
.join(Course, Assignment.course_id == Course.course_id)
.filter(
Assignment.company_id == company_id,
Assignment.assigned_to == user_id,
Assignment.status == "overdue",
)
.order_by(Assignment.due_date.asc())
.all()
)
overdue = [
self._to_my_assignment_item(a, c, user_id)
for a, c in overdue_query
]

# --- Recent completions (last 5) ---
recent_query = (
self.db.query(Assignment, Course)
.join(Course, Assignment.course_id == Course.course_id)
.filter(
Assignment.company_id == company_id,
Assignment.assigned_to == user_id,
Assignment.status == "completed",
)
.order_by(Assignment.updated_at.desc().nullslast())
.limit(5)
.all()
)
recent = [
self._to_my_assignment_item(a, c, user_id)
for a, c in recent_query
]

# --- Expiring certifications (within 90 days) ---
ninety_days = today + timedelta(days=90)
cert_query = (
self.db.query(Certification, CertificationType)
.join(
CertificationType,
Certification.certification_type_id == CertificationType.type_id,
)
.filter(
Certification.company_id == company_id,
Certification.user_id == user_id,
Certification.status != "revoked",
Certification.expiration_date.isnot(None),
Certification.expiration_date >= today,
Certification.expiration_date <= ninety_days,
)
.order_by(Certification.expiration_date.asc())
.all()
)
expiring_certs = []
for cert, cert_type in cert_query:
days_left = (cert.expiration_date - today).days if cert.expiration_date else None
expiring_certs.append(MyCertificationItem(
certification_id=cert.certification_id,
certification_type_name=cert_type.name,
status=cert.status,
expiration_date=cert.expiration_date,
days_until_expiry=days_left,
))

# --- Total training hours ---
total_seconds = (
self.db.query(func.coalesce(func.sum(Result.duration_seconds), 0))
.filter(
Result.user_id == user_id,
Result.completed_at.isnot(None),
)
.scalar()
)
total_hours = round(float(total_seconds) / 3600, 1) if total_seconds else 0.0

return EmployeeDashboard(
my_assignments=status_counts,
upcoming_due=upcoming,
overdue_assignments=overdue,
recent_completions=recent,
my_certifications_expiring=expiring_certs,
total_hours_completed=total_hours,
)

# ================================================================
# Admin / Manager Dashboard
# ================================================================

def get_admin_dashboard(
self, company_id: UUID, site_id: Optional[UUID] = None
) -> AdminDashboard:
"""Build the admin/manager dashboard.

Shows:
- Company-wide assignment status counts
- 30-day and 90-day completion rates
- Per-employee compliance (sorted worst-first)
- Per-course completion stats
- Per-site compliance stats
- Top 20 overdue alerts
- Certification expiration counts
"""
today = date.today()

# --- Company summary ---
company_summary = self._get_assignment_counts(company_id, site_id=site_id)

# --- Completion rates ---
rate_30d = self._get_completion_rate(company_id, days=30, site_id=site_id)
rate_90d = self._get_completion_rate(company_id, days=90, site_id=site_id)

# --- Team compliance (per employee) ---
team = self._get_team_compliance(company_id, site_id=site_id)

# --- Course completions ---
courses = self._get_course_completions(company_id, site_id=site_id)

# --- Site compliance ---
sites = self._get_site_compliance(company_id)

# --- Overdue alerts (top 20) ---
overdue_alerts = self._get_overdue_alerts(company_id, site_id=site_id, limit=20)

# --- Certification counts ---
certs_expiring_30d = (
self.db.query(func.count(Certification.certification_id))
.filter(
Certification.company_id == company_id,
Certification.status != "revoked",
Certification.expiration_date.isnot(None),
Certification.expiration_date >= today,
Certification.expiration_date <= today + timedelta(days=30),
)
.scalar() or 0
)

certs_expired = (
self.db.query(func.count(Certification.certification_id))
.filter(
Certification.company_id == company_id,
Certification.status == "expired",
)
.scalar() or 0
)

return AdminDashboard(
company_summary=company_summary,
completion_rate_30d=rate_30d,
completion_rate_90d=rate_90d,
team_compliance=team,
course_completions=courses,
site_compliance=sites,
overdue_alerts=overdue_alerts,
certifications_expiring_30d=certs_expiring_30d,
certifications_expired=certs_expired,
)

# ================================================================
# Private Helpers
# ================================================================

def _get_assignment_counts(
self, company_id: UUID,
user_id: Optional[UUID] = None,
site_id: Optional[UUID] = None,
) -> AssignmentStatusCounts:
"""Count assignments by status."""
query = self.db.query(
Assignment.status,
func.count(Assignment.assignment_id),
).filter(Assignment.company_id == company_id)

if user_id:
query = query.filter(Assignment.assigned_to == user_id)
if site_id:
query = query.filter(Assignment.site_id == site_id)

query = query.group_by(Assignment.status)
rows = query.all()

counts = {status: count for status, count in rows}
total = sum(counts.values())

return AssignmentStatusCounts(
pending=counts.get("pending", 0),
in_progress=counts.get("in_progress", 0),
completed=counts.get("completed", 0),
overdue=counts.get("overdue", 0),
expired=counts.get("expired", 0),
total=total,
)

def _get_completion_rate(
self, company_id: UUID, days: int, site_id: Optional[UUID] = None
) -> CompletionRate:
"""Calculate completion rate for assignments due within the last N days."""
today = date.today()
start_date = today - timedelta(days=days)

base = self.db.query(Assignment).filter(
Assignment.company_id == company_id,
Assignment.due_date >= start_date,
Assignment.due_date <= today,
)
if site_id:
base = base.filter(Assignment.site_id == site_id)

total = base.count()
completed = base.filter(Assignment.status == "completed").count()

rate = round((completed / total) * 100, 1) if total > 0 else 0.0

return CompletionRate(
completed=completed,
total=total,
rate_percent=rate,
)

    def _get_team_compliance(
        self, company_id: UUID, site_id: Optional[UUID] = None
    ) -> List[TeamComplianceItem]:
        """Get per-employee compliance stats.

        Returns employees sorted by compliance_percent ASC (worst first).
        Limited to the 50 lowest-compliance employees.
        """
        # COUNT skips NULLs and a CASE with no ELSE yields NULL, so each
        # func.count(case(...)) counts only the rows in that status.
        query = (
            self.db.query(
                Assignment.assigned_to,
                func.count(Assignment.assignment_id).label("total"),
                func.count(
                    case(
                        (Assignment.status == "completed", Assignment.assignment_id),
                    )
                ).label("completed"),
                func.count(
                    case(
                        (Assignment.status == "overdue", Assignment.assignment_id),
                    )
                ).label("overdue"),
                func.count(
                    case(
                        (Assignment.status == "in_progress", Assignment.assignment_id),
                    )
                ).label("in_progress"),
            )
            .filter(Assignment.company_id == company_id)
        )
        if site_id:
            query = query.filter(Assignment.site_id == site_id)

        query = query.group_by(Assignment.assigned_to)
        rows = query.all()

        # Load every referenced user in one query instead of one query per row
        user_ids = [row.assigned_to for row in rows]
        users = {}
        if user_ids:
            users = {
                u.user_id: u
                for u in self.db.query(User).filter(User.user_id.in_(user_ids)).all()
            }

        items = []
        for row in rows:
            user = users.get(row.assigned_to)
            if not user:
                continue

            total = row.total or 0
            completed = row.completed or 0
            compliance = round((completed / total) * 100, 1) if total > 0 else 0.0

            items.append(TeamComplianceItem(
                user_id=row.assigned_to,
                employee_name=f"{user.first_name} {user.last_name}",
                employee_email=user.email,
                total_assigned=total,
                completed=completed,
                overdue=row.overdue or 0,
                in_progress=row.in_progress or 0,
                compliance_percent=compliance,
            ))

        # Sort worst compliance first, limit 50
        items.sort(key=lambda x: x.compliance_percent)
        return items[:50]

    def _get_course_completions(
        self, company_id: UUID, site_id: Optional[UUID] = None
    ) -> List[CourseCompletionItem]:
        """Get per-course completion stats, sorted by completion rate (lowest first)."""
        query = (
            self.db.query(
                Assignment.course_id,
                func.count(Assignment.assignment_id).label("total"),
                func.count(
                    case(
                        (Assignment.status == "completed", Assignment.assignment_id),
                    )
                ).label("completed"),
                func.count(
                    case(
                        (Assignment.status == "in_progress", Assignment.assignment_id),
                    )
                ).label("in_progress"),
                func.count(
                    case(
                        (Assignment.status == "overdue", Assignment.assignment_id),
                    )
                ).label("overdue"),
            )
            .filter(Assignment.company_id == company_id)
        )
        if site_id:
            query = query.filter(Assignment.site_id == site_id)

        query = query.group_by(Assignment.course_id)
        rows = query.all()

        items = []
        for row in rows:
            course = self.db.query(Course).filter(Course.course_id == row.course_id).first()
            if not course:
                continue

            category_name = None
            if course.category_id:
                cat = self.db.query(CourseCategory).filter(
                    CourseCategory.category_id == course.category_id
                ).first()
                category_name = cat.name if cat else None

            total = row.total or 0
            completed = row.completed or 0
            rate = round((completed / total) * 100, 1) if total > 0 else 0.0

            # Average score across passing results, scoped to the same
            # company and site as the counts above
            avg_query = (
                self.db.query(func.avg(Result.score_percent))
                .join(Assignment, Result.assignment_id == Assignment.assignment_id)
                .filter(
                    Assignment.course_id == row.course_id,
                    Assignment.company_id == company_id,
                    Result.passed == True,  # noqa: E712
                )
            )
            if site_id:
                avg_query = avg_query.filter(Assignment.site_id == site_id)
            avg_score = avg_query.scalar()

            items.append(CourseCompletionItem(
                course_id=row.course_id,
                course_title=course.title,
                category_name=category_name,
                total_assigned=total,
                completed=completed,
                in_progress=row.in_progress or 0,
                overdue=row.overdue or 0,
                completion_rate_percent=rate,
                # Compare against None so a legitimate 0.0 average is kept
                avg_score_percent=(
                    round(float(avg_score), 1) if avg_score is not None else None
                ),
            ))

        items.sort(key=lambda x: x.completion_rate_percent)
        return items

def _get_site_compliance(self, company_id: UUID) -> List[SiteComplianceItem]:
"""Get per-site compliance stats."""
# Get all sites for the company
sites = (
self.db.query(CompanySite)
.filter(CompanySite.company_id == company_id)
.all()
)

items = []
for site in sites:
# Count employees at this site (users with assignments at this site)
employee_count = (
self.db.query(func.count(distinct(Assignment.assigned_to)))
.filter(
Assignment.company_id == company_id,
Assignment.site_id == site.site_id,
)
.scalar() or 0
)

total = (
self.db.query(func.count(Assignment.assignment_id))
.filter(
Assignment.company_id == company_id,
Assignment.site_id == site.site_id,
)
.scalar() or 0
)

completed = (
self.db.query(func.count(Assignment.assignment_id))
.filter(
Assignment.company_id == company_id,
Assignment.site_id == site.site_id,
Assignment.status == "completed",
)
.scalar() or 0
)

overdue = (
self.db.query(func.count(Assignment.assignment_id))
.filter(
Assignment.company_id == company_id,
Assignment.site_id == site.site_id,
Assignment.status == "overdue",
)
.scalar() or 0
)

compliance = round((completed / total) * 100, 1) if total > 0 else 0.0

items.append(SiteComplianceItem(
site_id=site.site_id,
site_name=site.name,
total_employees=employee_count,
total_assigned=total,
completed=completed,
overdue=overdue,
compliance_percent=compliance,
))

items.sort(key=lambda x: x.compliance_percent)
return items

def _get_overdue_alerts(
self, company_id: UUID, site_id: Optional[UUID] = None, limit: int = 20
) -> List[OverdueAlertItem]:
"""Get top N most overdue assignments."""
today = date.today()

query = (
self.db.query(Assignment, Course)
.join(Course, Assignment.course_id == Course.course_id)
.filter(
Assignment.company_id == company_id,
Assignment.status == "overdue",
)
)
if site_id:
query = query.filter(Assignment.site_id == site_id)

query = query.order_by(Assignment.due_date.asc()).limit(limit)
rows = query.all()

items = []
for assignment, course in rows:
user = self.db.query(User).filter(
User.user_id == assignment.assigned_to
).first()
if not user:
continue

days_overdue = (today - assignment.due_date).days

items.append(OverdueAlertItem(
assignment_id=assignment.assignment_id,
employee_name=f"{user.first_name} {user.last_name}",
employee_email=user.email,
course_title=course.title,
due_date=assignment.due_date,
days_overdue=days_overdue,
priority=assignment.priority,
))

return items

def _to_my_assignment_item(
self, assignment: Assignment, course: Course, user_id: UUID
) -> MyAssignmentItem:
"""Convert an assignment + course to a MyAssignmentItem."""
# Get latest result for progress
latest_result = (
self.db.query(Result)
.filter(
Result.assignment_id == assignment.assignment_id,
Result.user_id == user_id,
)
.order_by(Result.attempt_number.desc())
.first()
)

progress = 0.0
last_activity = None
if latest_result:
if latest_result.lesson_progress:
# Calculate average progress across lessons
progresses = [
lp.get("percent", 0)
for lp in latest_result.lesson_progress.values()
]
progress = sum(progresses) / len(progresses) if progresses else 0.0
last_activity = latest_result.completed_at or latest_result.started_at

return MyAssignmentItem(
assignment_id=assignment.assignment_id,
course_title=course.title,
course_id=course.course_id,
due_date=assignment.due_date,
priority=assignment.priority,
status=assignment.status,
progress_percent=round(progress, 1),
last_activity=last_activity,
)
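
The per-employee and per-course helpers above get several status counts from one grouped query by wrapping a CASE expression in COUNT. This works because COUNT ignores NULLs and a CASE with no ELSE branch yields NULL for non-matching rows. The sketch below reproduces the idea with raw SQL against an in-memory SQLite database. The table layout and sample rows are made up for illustration and are not the real SafePath schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE assignments (assigned_to TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO assignments VALUES (?, ?)",
    [
        ("alice", "completed"), ("alice", "completed"),
        ("alice", "overdue"), ("alice", "in_progress"),
        ("bob", "completed"), ("bob", "completed"),
        ("carol", "overdue"), ("carol", "pending"),
    ],
)

# One pass over the table: COUNT(CASE ...) only counts rows where the CASE is non-NULL.
rows = conn.execute(
    """
    SELECT assigned_to,
           COUNT(*) AS total,
           COUNT(CASE WHEN status = 'completed' THEN 1 END) AS completed,
           COUNT(CASE WHEN status = 'overdue' THEN 1 END) AS overdue
    FROM assignments
    GROUP BY assigned_to
    """
).fetchall()


def compliance_percent(completed: int, total: int) -> float:
    """Same rounding and zero-guard as the service helpers."""
    return round((completed / total) * 100, 1) if total > 0 else 0.0


# Sort worst compliance first, as the dashboard does.
team = sorted(
    (
        {
            "user": user,
            "total": total,
            "completed": done,
            "overdue": late,
            "compliance_percent": compliance_percent(done, total),
        }
        for user, total, done, late in rows
    ),
    key=lambda item: item["compliance_percent"],
)
for item in team:
    print(item)
```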

Training Matrix Service

File: tellus-ehs-hazcom-service/app/services/safepath/matrix_service.py

"""SafePath Training Matrix Service

Generates an employee x course training matrix showing compliance status
for each employee-course combination.
"""

from typing import Optional, List, Tuple
from uuid import UUID

from sqlalchemy import func, distinct
from sqlalchemy.orm import Session

from app.db.models.safepath import (
Assignment,
Result,
Course,
CourseCategory,
)
from app.db.models.user import User
from app.db.models.company import CompanySite
from app.schemas.safepath.matrix import (
MatrixCourseHeader,
MatrixCellStatus,
MatrixEmployeeRow,
TrainingMatrix,
)


class MatrixService:
"""Service for generating the training matrix."""

def __init__(self, db: Session):
self.db = db

    def get_training_matrix(
        self,
        company_id: UUID,
        site_id: Optional[UUID] = None,
        role_id: Optional[UUID] = None,
        course_id: Optional[UUID] = None,
        category_id: Optional[UUID] = None,
        status_filter: Optional[str] = None,
        page: int = 1,
        page_size: int = 50,
    ) -> TrainingMatrix:
        """Generate the training matrix.

        The matrix is a grid of employees (rows) x courses (columns).
        Each cell shows the assignment status for that employee-course pair.

        Steps:
        1. Determine courses (columns) from assignments in scope
        2. Determine employees (rows) from assignments in scope
        3. For each employee-course pair, find the assignment status
        4. Calculate per-employee compliance percent

        Notes:
        - role_id is accepted but not applied yet, and role_name is left as None.
        - status_filter runs after pagination, so a page can hold fewer than
          page_size rows, and total_employees counts employees before that filter.
        - overall_compliance_percent is the mean over the rows returned on this page.
        """
        # --- Step 1: Determine courses (columns) ---
        course_query = (
            self.db.query(distinct(Assignment.course_id))
            .filter(Assignment.company_id == company_id)
        )
        if site_id:
            course_query = course_query.filter(Assignment.site_id == site_id)
        if course_id:
            course_query = course_query.filter(Assignment.course_id == course_id)

        course_ids = [row[0] for row in course_query.all()]

        # Filter by category if specified (one query instead of one per course)
        if category_id and course_ids:
            category_rows = (
                self.db.query(Course.course_id)
                .filter(
                    Course.course_id.in_(course_ids),
                    Course.category_id == category_id,
                )
                .all()
            )
            course_ids = [row[0] for row in category_rows]

        if not course_ids:
            return TrainingMatrix(
                courses=[], employees=[], total_employees=0, overall_compliance_percent=0.0
            )

        # Build course headers
        courses = []
        for cid in course_ids:
            course = self.db.query(Course).filter(Course.course_id == cid).first()
            if not course:
                continue
            cat_name = None
            if course.category_id:
                cat = self.db.query(CourseCategory).filter(
                    CourseCategory.category_id == course.category_id
                ).first()
                cat_name = cat.name if cat else None

            courses.append(MatrixCourseHeader(
                course_id=course.course_id,
                course_title=course.title,
                category_name=cat_name,
                osha_standard_ref=course.osha_standard_ref,
            ))

        # Keep the column order stable between requests
        courses.sort(key=lambda c: c.course_title)

        # --- Step 2: Determine employees (rows) ---
        employee_query = (
            self.db.query(distinct(Assignment.assigned_to))
            .filter(
                Assignment.company_id == company_id,
                Assignment.course_id.in_(course_ids),
            )
        )
        if site_id:
            employee_query = employee_query.filter(Assignment.site_id == site_id)

        # Order by ID so page boundaries are stable between requests
        employee_query = employee_query.order_by(Assignment.assigned_to)

        all_employee_ids = [row[0] for row in employee_query.all()]
        total_employees = len(all_employee_ids)

        # Paginate employees
        start = (page - 1) * page_size
        end = start + page_size
        page_employee_ids = all_employee_ids[start:end]

        # --- Step 3: Build matrix rows ---
        employees = []
        total_compliance_sum = 0.0

        for emp_id in page_employee_ids:
            user = self.db.query(User).filter(User.user_id == emp_id).first()
            if not user:
                continue

            # Get site name from the most recent assignment that has a site
            site_name = None
            emp_assignment = (
                self.db.query(Assignment)
                .filter(
                    Assignment.assigned_to == emp_id,
                    Assignment.company_id == company_id,
                    Assignment.site_id.isnot(None),
                )
                .order_by(Assignment.created_at.desc())
                .first()
            )
            if emp_assignment and emp_assignment.site_id:
                site = self.db.query(CompanySite).filter(
                    CompanySite.site_id == emp_assignment.site_id
                ).first()
                site_name = site.name if site else None

            cells = {}
            required_count = 0
            completed_count = 0

            for course_header in courses:
                cid_str = str(course_header.course_id)

                # Find the latest assignment for this employee-course pair
                assignment = (
                    self.db.query(Assignment)
                    .filter(
                        Assignment.company_id == company_id,
                        Assignment.assigned_to == emp_id,
                        Assignment.course_id == course_header.course_id,
                    )
                    .order_by(Assignment.created_at.desc())
                    .first()
                )

                if not assignment:
                    cells[cid_str] = MatrixCellStatus(status="not_assigned")
                    continue

                required_count += 1

                cell = MatrixCellStatus(
                    status=assignment.status,
                    assignment_id=assignment.assignment_id,
                    due_date=str(assignment.due_date) if assignment.due_date else None,
                )

                # If completed, get completion date and score
                if assignment.status == "completed":
                    completed_count += 1
                    passing_result = (
                        self.db.query(Result)
                        .filter(
                            Result.assignment_id == assignment.assignment_id,
                            Result.passed == True,  # noqa: E712
                        )
                        .order_by(Result.completed_at.desc())
                        .first()
                    )
                    if passing_result:
                        cell.completion_date = (
                            str(passing_result.completed_at.date())
                            if passing_result.completed_at else None
                        )
                        # Compare against None so a score of 0 is kept
                        cell.score_percent = (
                            float(passing_result.score_percent)
                            if passing_result.score_percent is not None else None
                        )

                cells[cid_str] = cell

            # --- Step 4: Per-employee compliance ---
            compliance = (
                round((completed_count / required_count) * 100, 1)
                if required_count > 0 else 100.0
            )

            # Apply status_filter
            if status_filter == "overdue":
                has_overdue = any(
                    c.status == "overdue" for c in cells.values()
                )
                if not has_overdue:
                    continue
            elif status_filter == "incomplete":
                has_incomplete = any(
                    c.status in ("pending", "in_progress", "overdue")
                    for c in cells.values()
                )
                if not has_incomplete:
                    continue

            total_compliance_sum += compliance

            employees.append(MatrixEmployeeRow(
                user_id=emp_id,
                employee_name=f"{user.first_name} {user.last_name}",
                employee_email=user.email,
                site_name=site_name,
                role_name=None,  # Can be populated from company_roles if needed
                cells=cells,
                compliance_percent=compliance,
            ))

        # Sort by compliance (worst first)
        employees.sort(key=lambda x: x.compliance_percent)

        overall = (
            round(total_compliance_sum / len(employees), 1)
            if employees else 0.0
        )

        return TrainingMatrix(
            courses=courses,
            employees=employees,
            total_employees=total_employees,
            overall_compliance_percent=overall,
        )
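
The compliance arithmetic in get_training_matrix can be followed without a database. The sketch below builds matrix rows from plain dictionaries under the same rules: a course with no assignment becomes a "not_assigned" cell and is left out of the denominator, per-employee compliance is completed over required, and the overall figure is the mean of the per-employee values. The build_row helper, the course keys, and the sample statuses are hypothetical.

```python
from typing import Dict, List


def build_row(course_ids: List[str], latest_status: Dict[str, str]) -> dict:
    """Build one matrix row from a course_id -> latest assignment status map."""
    cells: Dict[str, str] = {}
    required = 0
    completed = 0
    for cid in course_ids:
        status = latest_status.get(cid)
        if status is None:
            cells[cid] = "not_assigned"  # not counted as required
            continue
        required += 1
        if status == "completed":
            completed += 1
        cells[cid] = status
    # An employee with nothing required counts as fully compliant.
    compliance = round((completed / required) * 100, 1) if required > 0 else 100.0
    return {"cells": cells, "compliance_percent": compliance}


courses = ["hazcom", "loto", "forklift"]
rows = [
    build_row(courses, {"hazcom": "completed", "loto": "completed", "forklift": "completed"}),
    build_row(courses, {"hazcom": "completed", "loto": "overdue"}),
]
rows.sort(key=lambda r: r["compliance_percent"])  # worst first
overall = round(sum(r["compliance_percent"] for r in rows) / len(rows), 1) if rows else 0.0

print(rows[0]["cells"], rows[0]["compliance_percent"])
print(overall)
```

Because the overall value averages per-employee percentages, an employee with two assignments weighs as much as one with twenty. Whether that is the intended weighting is a design decision this sketch does not settle.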

Report Service
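
The report service imports csv and io for its CSV export. The export code itself is not shown in this part of the document, so the following is only a sketch of one way rows shaped like CompletionReportRow could be serialized in memory. The rows_to_csv helper, the column subset, and the sample data are assumptions.

```python
import csv
import io
from typing import Dict, Iterable, List, Optional

# A subset of CompletionReportRow fields, chosen for illustration.
COLUMNS: List[str] = ["employee_name", "course_title", "due_date", "status", "score_percent"]


def rows_to_csv(rows: Iterable[Dict[str, Optional[object]]]) -> str:
    """Serialize report rows to CSV text; missing or None values become empty cells."""
    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(buffer, fieldnames=COLUMNS, extrasaction="ignore")
    writer.writeheader()
    for row in rows:
        writer.writerow({key: "" if row.get(key) is None else row[key] for key in COLUMNS})
    return buffer.getvalue()


sample = [
    {"employee_name": "Ada Lovelace", "course_title": "Hazard Communication",
     "due_date": "2025-01-31", "status": "completed", "score_percent": 92.5},
    {"employee_name": "Hopper, Grace", "course_title": "Lockout/Tagout",
     "due_date": "2025-02-15", "status": "overdue", "score_percent": None},
]
output = rows_to_csv(sample)
print(output)
```

The csv module quotes values that contain the delimiter, so a name such as "Hopper, Grace" stays in a single column.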

File: tellus-ehs-hazcom-service/app/services/safepath/report_service.py

"""SafePath Report Service

Generates compliance reports and handles export to CSV/PDF/XLSX.
"""

import csv
import io
from typing import Optional, List
from uuid import UUID
from datetime import date, datetime, timedelta

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.db.models.safepath import (
Assignment,
Result,
Course,
CourseCategory,
Certification,
CertificationType,
)
from app.db.models.user import User
from app.db.models.company import Company, CompanySite
from app.schemas.safepath.dashboard import (
AssignmentStatusCounts,
CompletionReportRow,
CompletionReport,
IndividualTranscriptRow,
IndividualTranscript,
CertificationStatusRow,
CertificationStatusReport,
)


class ReportService:
"""Service for generating and exporting compliance reports."""

def __init__(self, db: Session):
self.db = db

# ================================================================
# Completion Summary Report
# ================================================================

def generate_completion_report(
self,
company_id: UUID,
site_id: Optional[UUID] = None,
course_id: Optional[UUID] = None,
status: Optional[str] = None,
date_from: Optional[date] = None,
date_to: Optional[date] = None,
) -> CompletionReport:
"""Generate a training completion summary report.

Lists all assignments with their completion status, optionally
filtered by site, course, status, or date range.
"""
company = self.db.query(Company).filter(Company.company_id == company_id).first()
company_name = company.name if company else "Unknown"

query = (
self.db.query(Assignment, Course, User)
.join(Course, Assignment.course_id == Course.course_id)
.join(User, Assignment.assigned_to == User.user_id)
.filter(Assignment.company_id == company_id)
)

filters_desc = []

if site_id:
query = query.filter(Assignment.site_id == site_id)
site = self.db.query(CompanySite).filter(CompanySite.site_id == site_id).first()
filters_desc.append(f"Site: {site.name}" if site else f"Site: {site_id}")
if course_id:
query = query.filter(Assignment.course_id == course_id)
filters_desc.append(f"Course ID: {course_id}")
if status:
query = query.filter(Assignment.status == status)
filters_desc.append(f"Status: {status}")
if date_from:
query = query.filter(Assignment.due_date >= date_from)
filters_desc.append(f"From: {date_from}")
if date_to:
query = query.filter(Assignment.due_date <= date_to)
filters_desc.append(f"To: {date_to}")

filter_description = ", ".join(filters_desc) if filters_desc else "All assignments"

query = query.order_by(User.last_name, User.first_name, Assignment.due_date)
rows_raw = query.all()

rows = []
status_counts = {"pending": 0, "in_progress": 0, "completed": 0, "overdue": 0, "expired": 0}

for assignment, course, user in rows_raw:
# Get site name
site_name = None
if assignment.site_id:
site = self.db.query(CompanySite).filter(
CompanySite.site_id == assignment.site_id
).first()
site_name = site.name if site else None

# Get category name
category_name = None
if course.category_id:
cat = self.db.query(CourseCategory).filter(
CourseCategory.category_id == course.category_id
).first()
category_name = cat.name if cat else None

# Get most recent passing result
best_result = (
self.db.query(Result)
.filter(
Result.assignment_id == assignment.assignment_id,
Result.passed == True, # noqa: E712
)
.order_by(Result.completed_at.desc())
.first()
)

completion_date = None
score = None
passed = None
delivery_method = "online"
instructor = None

if best_result:
completion_date = (
best_result.completed_at.date() if best_result.completed_at else None
)
score = float(best_result.score_percent) if best_result.score_percent is not None else None
passed = best_result.passed
delivery_method = best_result.delivery_method
instructor = best_result.instructor_name

rows.append(CompletionReportRow(
employee_name=f"{user.first_name} {user.last_name}",
employee_email=user.email,
site_name=site_name,
course_title=course.title,
category_name=category_name,
assignment_date=assignment.created_at.date() if assignment.created_at else date.today(),
due_date=assignment.due_date,
completion_date=completion_date,
status=assignment.status,
score_percent=score,
passed=passed,
delivery_method=delivery_method,
instructor_name=instructor,
))

if assignment.status in status_counts:
status_counts[assignment.status] += 1

summary = AssignmentStatusCounts(
total=len(rows),
**status_counts,
)

return CompletionReport(
generated_at=datetime.utcnow(),
company_name=company_name,
filter_description=filter_description,
rows=rows,
total_rows=len(rows),
summary=summary,
)

# ================================================================
# Individual Transcript
# ================================================================

def generate_individual_transcript(
self, company_id: UUID, user_id: UUID
) -> IndividualTranscript:
"""Generate a training transcript for a single employee.

Lists all their training records, certifications, and summary stats.
"""
company = self.db.query(Company).filter(Company.company_id == company_id).first()
user = self.db.query(User).filter(User.user_id == user_id).first()

if not user:
raise ValueError(f"User {user_id} not found")

# Get all assignments for this user
assignments = (
self.db.query(Assignment, Course)
.join(Course, Assignment.course_id == Course.course_id)
.filter(
Assignment.company_id == company_id,
Assignment.assigned_to == user_id,
)
.order_by(Course.title)
.all()
)

records = []
total_completed = 0
total_in_progress = 0
total_seconds = 0

for assignment, course in assignments:
category_name = None
if course.category_id:
cat = self.db.query(CourseCategory).filter(
CourseCategory.category_id == course.category_id
).first()
category_name = cat.name if cat else None

# Get best result
best_result = (
self.db.query(Result)
.filter(Result.assignment_id == assignment.assignment_id)
.order_by(Result.score_percent.desc().nullslast())
.first()
)

completion_date = None
score = None
passed = None
delivery_method = "online"
cert_number = None
cert_expiration = None

if best_result:
completion_date = (
best_result.completed_at.date() if best_result.completed_at else None
)
score = float(best_result.score_percent) if best_result.score_percent is not None else None
passed = best_result.passed
delivery_method = best_result.delivery_method
if best_result.duration_seconds:
total_seconds += best_result.duration_seconds

# Check for linked certification (Certification is imported at module level)
cert = (
self.db.query(Certification)
.filter(Certification.result_id == best_result.result_id)
.first()
)
if cert:
cert_number = cert.certification_number
cert_expiration = cert.expiration_date

if assignment.status == "completed":
total_completed += 1
elif assignment.status == "in_progress":
total_in_progress += 1

records.append(IndividualTranscriptRow(
course_title=course.title,
category_name=category_name,
completion_date=completion_date,
score_percent=score,
passed=passed,
delivery_method=delivery_method,
certificate_number=cert_number,
certification_expiration=cert_expiration,
))

# Certification counts
certs_active = (
self.db.query(func.count(Certification.certification_id))
.filter(
Certification.company_id == company_id,
Certification.user_id == user_id,
Certification.status.in_(["active", "expiring_soon"]),
)
.scalar() or 0
)
certs_expiring = (
self.db.query(func.count(Certification.certification_id))
.filter(
Certification.company_id == company_id,
Certification.user_id == user_id,
Certification.status == "expiring_soon",
)
.scalar() or 0
)

return IndividualTranscript(
employee_name=f"{user.first_name} {user.last_name}",
employee_email=user.email,
employee_id=user_id,
company_name=company.name if company else "Unknown",
generated_at=datetime.utcnow(),
records=records,
total_completed=total_completed,
total_in_progress=total_in_progress,
total_hours=round(total_seconds / 3600, 1),
certifications_active=certs_active,
certifications_expiring=certs_expiring,
)

# ================================================================
# Certification Status Report
# ================================================================

def generate_certification_report(
self,
company_id: UUID,
site_id: Optional[UUID] = None,
status: Optional[str] = None,
) -> CertificationStatusReport:
"""Generate a certification status report."""
company = self.db.query(Company).filter(Company.company_id == company_id).first()
today = date.today()

query = (
self.db.query(Certification, CertificationType, User)
.join(
CertificationType,
Certification.certification_type_id == CertificationType.type_id,
)
.join(User, Certification.user_id == User.user_id)
.filter(Certification.company_id == company_id)
)

if status:
query = query.filter(Certification.status == status)

query = query.order_by(
Certification.expiration_date.asc().nullslast(),
User.last_name,
)
rows_raw = query.all()

rows = []
total_active = 0
total_expiring = 0
total_expired = 0

for cert, cert_type, user in rows_raw:
# Get site name from user's most recent assignment
site_name = None
if site_id:
# Filter by site — skip if user has no assignments at this site
has_site = (
self.db.query(Assignment)
.filter(
Assignment.assigned_to == user.user_id,
Assignment.company_id == company_id,
Assignment.site_id == site_id,
)
.first()
)
if not has_site:
continue

emp_assignment = (
self.db.query(Assignment)
.filter(
Assignment.assigned_to == user.user_id,
Assignment.company_id == company_id,
Assignment.site_id.isnot(None),
)
.order_by(Assignment.created_at.desc())  # most recent assignment first
.first()
)
if emp_assignment and emp_assignment.site_id:
site_obj = self.db.query(CompanySite).filter(
CompanySite.site_id == emp_assignment.site_id
).first()
site_name = site_obj.name if site_obj else None

days_until = None
if cert.expiration_date:
days_until = (cert.expiration_date - today).days

rows.append(CertificationStatusRow(
employee_name=f"{user.first_name} {user.last_name}",
employee_email=user.email,
site_name=site_name,
certification_type=cert_type.name,
osha_standard_ref=cert_type.osha_standard_ref,
issue_date=cert.issue_date,
expiration_date=cert.expiration_date,
status=cert.status,
days_until_expiry=days_until,
source=cert.source,
))

if cert.status == "active":
total_active += 1
elif cert.status == "expiring_soon":
total_expiring += 1
elif cert.status == "expired":
total_expired += 1

return CertificationStatusReport(
generated_at=datetime.utcnow(),
company_name=company.name if company else "Unknown",
rows=rows,
total_active=total_active,
total_expiring_soon=total_expiring,
total_expired=total_expired,
)

# ================================================================
# Export Helpers
# ================================================================

def export_completion_report_csv(self, report: CompletionReport) -> bytes:
"""Export completion report as CSV bytes."""
output = io.StringIO()
writer = csv.writer(output)

# Header
writer.writerow([
"Employee Name", "Employee Email", "Site", "Course",
"Category", "Assigned Date", "Due Date", "Completion Date",
"Status", "Score (%)", "Passed", "Delivery Method", "Instructor",
])

for row in report.rows:
writer.writerow([
row.employee_name,
row.employee_email,
row.site_name or "",
row.course_title,
row.category_name or "",
str(row.assignment_date),
str(row.due_date),
str(row.completion_date) if row.completion_date else "",
row.status,
str(row.score_percent) if row.score_percent is not None else "",
"Yes" if row.passed else ("No" if row.passed is False else ""),
row.delivery_method,
row.instructor_name or "",
])

return output.getvalue().encode("utf-8")

def export_individual_transcript_csv(self, transcript: IndividualTranscript) -> bytes:
"""Export individual transcript as CSV bytes."""
output = io.StringIO()
writer = csv.writer(output)

# Header info
writer.writerow(["Training Transcript"])
writer.writerow(["Employee", transcript.employee_name])
writer.writerow(["Email", transcript.employee_email])
writer.writerow(["Company", transcript.company_name])
writer.writerow(["Generated", str(transcript.generated_at)])
writer.writerow([])

# Column headers
writer.writerow([
"Course", "Category", "Completion Date", "Score (%)",
"Passed", "Delivery Method", "Certificate #", "Cert Expiration",
])

for row in transcript.records:
writer.writerow([
row.course_title,
row.category_name or "",
str(row.completion_date) if row.completion_date else "",
str(row.score_percent) if row.score_percent is not None else "",
"Yes" if row.passed else ("No" if row.passed is False else ""),
row.delivery_method,
row.certificate_number or "",
str(row.certification_expiration) if row.certification_expiration else "",
])

# Summary
writer.writerow([])
writer.writerow(["Summary"])
writer.writerow(["Total Completed", transcript.total_completed])
writer.writerow(["Total In Progress", transcript.total_in_progress])
writer.writerow(["Total Hours", transcript.total_hours])
writer.writerow(["Active Certifications", transcript.certifications_active])
writer.writerow(["Expiring Certifications", transcript.certifications_expiring])

return output.getvalue().encode("utf-8")

def export_certification_report_csv(self, report: CertificationStatusReport) -> bytes:
"""Export certification status report as CSV bytes."""
output = io.StringIO()
writer = csv.writer(output)

# Header
writer.writerow([
"Employee Name", "Employee Email", "Site",
"Certification Type", "OSHA Standard", "Issue Date",
"Expiration Date", "Status", "Days Until Expiry", "Source",
])

for row in report.rows:
writer.writerow([
row.employee_name,
row.employee_email,
row.site_name or "",
row.certification_type,
row.osha_standard_ref or "",
str(row.issue_date),
str(row.expiration_date) if row.expiration_date else "",
row.status,
str(row.days_until_expiry) if row.days_until_expiry is not None else "",
row.source,
])

return output.getvalue().encode("utf-8")

def export_matrix_csv(self, matrix) -> bytes:
"""Export training matrix as CSV bytes.

Columns: Employee Name, Email, Site, Role, [Course1], [Course2], ..., Compliance %
Cell values: status (score%) or status
"""
from app.schemas.safepath.matrix import TrainingMatrix
output = io.StringIO()
writer = csv.writer(output)

# Header row
header = ["Employee Name", "Email", "Site", "Role"]
for course in matrix.courses:
header.append(course.course_title)
header.append("Compliance %")
writer.writerow(header)

# Employee rows
for emp in matrix.employees:
row = [
emp.employee_name,
emp.employee_email,
emp.site_name or "",
emp.role_name or "",
]
for course in matrix.courses:
cid_str = str(course.course_id)
cell = emp.cells.get(cid_str)
if cell and cell.status != "not_assigned":
cell_text = cell.status.upper()
if cell.score_percent is not None:
cell_text += f" ({cell.score_percent:.0f}%)"
row.append(cell_text)
else:
row.append("N/A")
row.append(f"{emp.compliance_percent:.1f}%")
writer.writerow(row)

return output.getvalue().encode("utf-8")

def export_completion_report_pdf(self, report: CompletionReport) -> bytes:
"""Export completion report as PDF using ReportLab.

Creates a professional PDF with:
- Company header and report title
- Filter description
- Summary statistics
- Detailed table of all rows
"""
from reportlab.lib.pagesizes import letter, landscape
from reportlab.lib import colors
from reportlab.lib.units import inch
from reportlab.platypus import (
SimpleDocTemplate,
Table,
TableStyle,
Paragraph,
Spacer,
)
from reportlab.lib.styles import getSampleStyleSheet

buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=landscape(letter))
styles = getSampleStyleSheet()
elements = []

# Title
elements.append(Paragraph(report.report_title, styles["Title"]))
elements.append(Paragraph(f"Company: {report.company_name}", styles["Normal"]))
elements.append(Paragraph(f"Filters: {report.filter_description}", styles["Normal"]))
elements.append(Paragraph(
f"Generated: {report.generated_at.strftime('%Y-%m-%d %H:%M UTC')}",
styles["Normal"],
))
elements.append(Spacer(1, 0.25 * inch))

# Summary
summary_data = [
["Total", "Completed", "In Progress", "Overdue", "Pending", "Expired"],
[
str(report.summary.total),
str(report.summary.completed),
str(report.summary.in_progress),
str(report.summary.overdue),
str(report.summary.pending),
str(report.summary.expired),
],
]
summary_table = Table(summary_data, colWidths=[1.2 * inch] * 6)
summary_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1a56db")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("FONTSIZE", (0, 0), (-1, -1), 9),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
]))
elements.append(summary_table)
elements.append(Spacer(1, 0.25 * inch))

# Detail table (limit to 500 rows for PDF readability)
detail_header = [
"Employee", "Course", "Due Date", "Status", "Score", "Completed",
]
detail_data = [detail_header]

for row in report.rows[:500]:
detail_data.append([
row.employee_name[:25],
row.course_title[:30],
str(row.due_date),
row.status.upper(),
f"{row.score_percent:.0f}%" if row.score_percent is not None else "-",
str(row.completion_date) if row.completion_date else "-",
])

col_widths = [1.8 * inch, 2.2 * inch, 1.0 * inch, 1.0 * inch, 0.8 * inch, 1.0 * inch]
detail_table = Table(detail_data, colWidths=col_widths)
detail_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1a56db")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("FONTSIZE", (0, 0), (-1, -1), 8),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f3f4f6")]),
]))
elements.append(detail_table)

doc.build(elements)
return buffer.getvalue()
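
The CSV exporters above share one pattern: write rows into an in-memory io.StringIO with csv.writer, then encode the text as UTF-8 bytes. A minimal, self-contained sketch of that pattern follows. Row, passed_label, and rows_to_csv are hypothetical names used only for illustration; the three-way rendering of passed mirrors the expression used in the exporters.

```python
import csv
import io
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Row:
    """Hypothetical, trimmed-down report row."""
    employee_name: str
    score_percent: Optional[float]
    passed: Optional[bool]


def passed_label(passed: Optional[bool]) -> str:
    """Render True/False/None as Yes/No/blank."""
    if passed is None:
        return ""
    return "Yes" if passed else "No"


def rows_to_csv(rows: List[Row]) -> bytes:
    """Build CSV text in memory and return it as UTF-8 bytes."""
    output = io.StringIO(newline="")  # let csv.writer control line endings
    writer = csv.writer(output)
    writer.writerow(["Employee Name", "Score (%)", "Passed"])
    for row in rows:
        writer.writerow([
            row.employee_name,
            str(row.score_percent) if row.score_percent is not None else "",
            passed_label(row.passed),
        ])
    return output.getvalue().encode("utf-8")


data = rows_to_csv([
    Row("Ada Lovelace", 92.5, True),
    Row("Grace Hopper", 0.0, False),
    Row("Alan Turing", None, None),
])
print(data.decode("utf-8"))
```

Because the score check is `is not None` rather than a truthiness test, a score of 0.0 is still written to the file, while a missing score produces an empty field.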

API Endpoints

Dashboard Endpoints

File: tellus-ehs-hazcom-service/app/api/v1/safepath/dashboard.py

"""SafePath Dashboard API Endpoints"""

from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.api.v1.adminhq.auth import get_user_context, UserContext
from app.services.safepath.dashboard_service import DashboardService
from app.services.safepath.report_service import ReportService
from app.schemas.safepath.dashboard import (
EmployeeDashboard,
AdminDashboard,
CompletionReport,
IndividualTranscript,
CertificationStatusReport,
ReportExportRequest,
)

router = APIRouter(prefix="/training", tags=["SafePath Dashboard"])


# ============================================================================
# Dashboard
# ============================================================================

@router.get("/dashboard/me", response_model=EmployeeDashboard)
def get_my_dashboard(
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Get the current user's training dashboard.

Returns personal assignment stats, upcoming/overdue items,
recent completions, expiring certifications, and total hours.
"""
service = DashboardService(db)
return service.get_employee_dashboard(ctx.company_id, ctx.user_id)


@router.get("/dashboard/admin", response_model=AdminDashboard)
def get_admin_dashboard(
site_id: Optional[UUID] = Query(None, description="Filter by site"),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Get the admin/manager training dashboard.

Returns company-wide compliance stats, per-employee/course/site
breakdowns, overdue alerts, and certification counts.

Requires role: company_admin, safety_coordinator, or trainer.
"""
# TODO: Add role check — only admin/coordinator/trainer roles
service = DashboardService(db)
return service.get_admin_dashboard(ctx.company_id, site_id=site_id)


# ============================================================================
# Reports
# ============================================================================

@router.get("/reports/completion", response_model=CompletionReport)
def get_completion_report(
site_id: Optional[UUID] = Query(None),
course_id: Optional[UUID] = Query(None),
status: Optional[str] = Query(None, description="pending, in_progress, completed, overdue, expired"),
date_from: Optional[str] = Query(None, description="YYYY-MM-DD"),
date_to: Optional[str] = Query(None, description="YYYY-MM-DD"),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Generate a training completion summary report.

Returns all assignments with status, scores, and completion dates.
"""
from datetime import date as date_type

parsed_from = date_type.fromisoformat(date_from) if date_from else None
parsed_to = date_type.fromisoformat(date_to) if date_to else None

service = ReportService(db)
return service.generate_completion_report(
company_id=ctx.company_id,
site_id=site_id,
course_id=course_id,
status=status,
date_from=parsed_from,
date_to=parsed_to,
)


@router.get("/reports/transcript/{user_id}", response_model=IndividualTranscript)
def get_individual_transcript(
user_id: UUID,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Generate an individual employee's training transcript.

Returns all training records, certifications, and summary stats.
"""
service = ReportService(db)
return service.generate_individual_transcript(ctx.company_id, user_id)


@router.get("/reports/certifications", response_model=CertificationStatusReport)
def get_certification_report(
site_id: Optional[UUID] = Query(None),
status: Optional[str] = Query(None, description="active, expiring_soon, expired"),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Generate a certification status report."""
service = ReportService(db)
return service.generate_certification_report(
company_id=ctx.company_id,
site_id=site_id,
status=status,
)


# ============================================================================
# Report Exports
# ============================================================================

@router.get("/reports/completion/export")
def export_completion_report(
format: str = Query("csv", description="csv, pdf, xlsx"),
site_id: Optional[UUID] = Query(None),
course_id: Optional[UUID] = Query(None),
status: Optional[str] = Query(None),
date_from: Optional[str] = Query(None),
date_to: Optional[str] = Query(None),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Export completion report as CSV, PDF, or XLSX.

Returns a downloadable file.
"""
from datetime import date as date_type
import io

parsed_from = date_type.fromisoformat(date_from) if date_from else None
parsed_to = date_type.fromisoformat(date_to) if date_to else None

service = ReportService(db)
report = service.generate_completion_report(
company_id=ctx.company_id,
site_id=site_id,
course_id=course_id,
status=status,
date_from=parsed_from,
date_to=parsed_to,
)

if format == "csv":
content = service.export_completion_report_csv(report)
return StreamingResponse(
io.BytesIO(content),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=completion_report.csv"},
)
elif format == "pdf":
content = service.export_completion_report_pdf(report)
return StreamingResponse(
io.BytesIO(content),
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=completion_report.pdf"},
)
else:
# XLSX export using openpyxl
content = _export_completion_xlsx(report)
return StreamingResponse(
io.BytesIO(content),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": "attachment; filename=completion_report.xlsx"},
)


@router.get("/reports/transcript/{user_id}/export")
def export_individual_transcript(
user_id: UUID,
format: str = Query("csv", description="csv, pdf"),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Export individual transcript as CSV or PDF."""
import io

service = ReportService(db)
transcript = service.generate_individual_transcript(ctx.company_id, user_id)

if format == "csv":
content = service.export_individual_transcript_csv(transcript)
return StreamingResponse(
io.BytesIO(content),
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename=transcript_{user_id}.csv"
},
)
else:
# PDF export — reuse ReportLab pattern
# For now, return CSV fallback; PDF transcript TBD
content = service.export_individual_transcript_csv(transcript)
return StreamingResponse(
io.BytesIO(content),
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename=transcript_{user_id}.csv"
},
)


@router.get("/reports/certifications/export")
def export_certification_report(
format: str = Query("csv", description="csv"),
site_id: Optional[UUID] = Query(None),
status: Optional[str] = Query(None),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Export certification status report as CSV."""
import io

service = ReportService(db)
report = service.generate_certification_report(
company_id=ctx.company_id,
site_id=site_id,
status=status,
)

content = service.export_certification_report_csv(report)
return StreamingResponse(
io.BytesIO(content),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=certification_report.csv"},
)


# ============================================================================
# XLSX Helper
# ============================================================================

def _export_completion_xlsx(report: CompletionReport) -> bytes:
"""Generate XLSX export using openpyxl.

Creates a workbook with:
- Summary sheet (status counts)
- Detail sheet (all rows)
"""
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment
import io

wb = Workbook()

# Summary sheet
ws_summary = wb.active
ws_summary.title = "Summary"

header_fill = PatternFill(start_color="1a56db", end_color="1a56db", fill_type="solid")
header_font = Font(color="FFFFFF", bold=True)

ws_summary.append(["Training Completion Report"])
ws_summary["A1"].font = Font(size=14, bold=True)
ws_summary.append([f"Company: {report.company_name}"])
ws_summary.append([f"Filters: {report.filter_description}"])
ws_summary.append([f"Generated: {report.generated_at.strftime('%Y-%m-%d %H:%M UTC')}"])
ws_summary.append([])

summary_headers = ["Total", "Completed", "In Progress", "Overdue", "Pending", "Expired"]
ws_summary.append(summary_headers)
for cell in ws_summary[6]:
cell.fill = header_fill
cell.font = header_font

ws_summary.append([
report.summary.total,
report.summary.completed,
report.summary.in_progress,
report.summary.overdue,
report.summary.pending,
report.summary.expired,
])

# Detail sheet
ws_detail = wb.create_sheet("Details")
detail_headers = [
"Employee Name", "Employee Email", "Site", "Course",
"Category", "Assigned Date", "Due Date", "Completion Date",
"Status", "Score (%)", "Passed", "Delivery Method", "Instructor",
]
ws_detail.append(detail_headers)
for cell in ws_detail[1]:
cell.fill = header_fill
cell.font = header_font

for row in report.rows:
ws_detail.append([
row.employee_name,
row.employee_email,
row.site_name or "",
row.course_title,
row.category_name or "",
str(row.assignment_date),
str(row.due_date),
str(row.completion_date) if row.completion_date else "",
row.status,
row.score_percent,  # None leaves the cell empty; numbers stay numeric
"Yes" if row.passed else ("No" if row.passed is False else ""),
row.delivery_method,
row.instructor_name or "",
])

# Auto-width columns
for ws in [ws_summary, ws_detail]:
for col in ws.columns:
max_len = max(len(str(cell.value or "")) for cell in col)
ws.column_dimensions[col[0].column_letter].width = min(max_len + 2, 40)

buffer = io.BytesIO()
wb.save(buffer)
return buffer.getvalue()
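
Two input-handling details in the endpoints above are worth making explicit. The date_from and date_to strings are parsed with date.fromisoformat, which raises ValueError on malformed input, and the completion export treats any format other than csv or pdf as XLSX. The plain-Python sketch below shows one way to handle both cases explicitly. parse_iso_date, pick_export, and EXPORTS are hypothetical helpers, not part of the service; an endpoint would need to translate the ValueError into a client error response.

```python
from datetime import date
from typing import Dict, Optional, Tuple

# Hypothetical lookup table: format name -> (media type, file extension).
EXPORTS: Dict[str, Tuple[str, str]] = {
    "csv": ("text/csv", "csv"),
    "pdf": ("application/pdf", "pdf"),
    "xlsx": (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "xlsx",
    ),
}


def parse_iso_date(value: Optional[str], field: str) -> Optional[date]:
    """Parse an ISO date string, returning None when no value was supplied."""
    if not value:
        return None
    try:
        return date.fromisoformat(value)
    except ValueError as exc:
        # Re-raise with the parameter name so the caller can report it.
        raise ValueError(f"{field} must be YYYY-MM-DD, got {value!r}") from exc


def pick_export(fmt: str) -> Tuple[str, str]:
    """Return (media type, extension), rejecting unknown formats."""
    key = fmt.lower()
    if key not in EXPORTS:
        raise ValueError(
            f"Unsupported format {fmt!r}; expected one of {sorted(EXPORTS)}"
        )
    return EXPORTS[key]


print(parse_iso_date("2024-01-31", "date_from"))
print(pick_export("PDF"))
```

With a table like this, a typo such as format=xls is rejected instead of silently producing an XLSX download.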

Training Matrix Endpoints

File: tellus-ehs-hazcom-service/app/api/v1/safepath/matrix.py

"""SafePath Training Matrix API Endpoints"""

from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.api.v1.adminhq.auth import get_user_context, UserContext
from app.services.safepath.matrix_service import MatrixService
from app.services.safepath.report_service import ReportService
from app.schemas.safepath.matrix import TrainingMatrix

router = APIRouter(prefix="/training", tags=["SafePath Training Matrix"])


@router.get("/matrix", response_model=TrainingMatrix)
def get_training_matrix(
site_id: Optional[UUID] = Query(None, description="Filter by site"),
role_id: Optional[UUID] = Query(None, description="Filter by role"),
course_id: Optional[UUID] = Query(None, description="Filter to a single course"),
category_id: Optional[UUID] = Query(None, description="Filter by course category"),
status_filter: Optional[str] = Query(
None,
description="overdue = only employees with overdue items, incomplete = not fully compliant",
),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Get the training matrix.

Returns a grid of employees (rows) x courses (columns) with
assignment status in each cell.

Requires role: company_admin, safety_coordinator, or trainer.
"""
service = MatrixService(db)
return service.get_training_matrix(
company_id=ctx.company_id,
site_id=site_id,
role_id=role_id,
course_id=course_id,
category_id=category_id,
status_filter=status_filter,
page=page,
page_size=page_size,
)


@router.get("/matrix/export")
def export_training_matrix(
site_id: Optional[UUID] = Query(None),
role_id: Optional[UUID] = Query(None),
course_id: Optional[UUID] = Query(None),
category_id: Optional[UUID] = Query(None),
format: str = Query("csv", description="csv"),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Export the training matrix as CSV.

Exports up to 10,000 employees in a single page for the given filters.
"""
import io

matrix_service = MatrixService(db)
matrix = matrix_service.get_training_matrix(
company_id=ctx.company_id,
site_id=site_id,
role_id=role_id,
course_id=course_id,
category_id=category_id,
page=1,
page_size=10000, # Export all
)

report_service = ReportService(db)
content = report_service.export_matrix_csv(matrix)

return StreamingResponse(
io.BytesIO(content),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=training_matrix.csv"},
)
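
The matrix endpoint accepts a 1-based page and a page_size, and the export reuses the same service call with a large page_size. How MatrixService applies these parameters is not shown in this section. The sketch below assumes simple offset slicing, with paginate as a hypothetical helper.

```python
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page: int, page_size: int) -> Tuple[List[T], int]:
    """Return (items on the requested page, total item count)."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must both be >= 1")
    start = (page - 1) * page_size
    return list(items[start:start + page_size]), len(items)


employee_ids = [f"emp-{n}" for n in range(1, 8)]  # seven hypothetical employees
page_items, total = paginate(employee_ids, page=2, page_size=3)
print(page_items, total)
```

Under this assumption, an export with page_size=10000 would omit rows for any company with more than 10,000 matching employees, so a dedicated unpaginated code path may be preferable for very large tenants.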

Router Registration

File: tellus-ehs-hazcom-service/app/api/v1/safepath/__init__.py

Register all SafePath routers in the main app:

"""SafePath Training API Router Registration"""

from fastapi import APIRouter

from app.api.v1.safepath.courses import router as courses_router
from app.api.v1.safepath.assignments import router as assignments_router
from app.api.v1.safepath.results import router as results_router
from app.api.v1.safepath.certifications import router as certifications_router
from app.api.v1.safepath.classroom import router as classroom_router
from app.api.v1.safepath.dashboard import router as dashboard_router
from app.api.v1.safepath.matrix import router as matrix_router

safepath_router = APIRouter(prefix="/safepath", tags=["SafePath Training"])

safepath_router.include_router(courses_router)
safepath_router.include_router(assignments_router)
safepath_router.include_router(results_router)
safepath_router.include_router(certifications_router)
safepath_router.include_router(classroom_router)
safepath_router.include_router(dashboard_router)
safepath_router.include_router(matrix_router)

Then in app/api/v1/__init__.py or app/main.py:

from app.api.v1.safepath import safepath_router

app.include_router(safepath_router, prefix="/api/v1")

This gives URLs like:

  • GET /api/v1/safepath/training/dashboard/me
  • GET /api/v1/safepath/training/dashboard/admin
  • GET /api/v1/safepath/training/reports/completion
  • GET /api/v1/safepath/training/matrix
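
Each URL is the concatenation of three prefixes and the route path: /api/v1 from include_router, /safepath from safepath_router, and /training from the individual router. The sketch below only illustrates that string composition; full_path is a hypothetical helper, not a FastAPI function.

```python
def full_path(*parts: str) -> str:
    """Join prefix segments with single slashes, ignoring empty segments."""
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return "/" + "/".join(cleaned)


PREFIXES = ("/api/v1", "/safepath", "/training")

for route in ("/dashboard/me", "/reports/completion/export", "/matrix/export"):
    print("GET", full_path(*PREFIXES, route))
```

The export routes follow the same scheme, for example /api/v1/safepath/training/matrix/export.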

TypeScript Types

Add to tellus-ehs-hazcom-ui/src/types/safepath.ts:

// ============================================================================
// Dashboard Types
// ============================================================================

export interface AssignmentStatusCounts {
  pending: number;
  in_progress: number;
  completed: number;
  overdue: number;
  expired: number;
  total: number;
}

export interface CompletionRate {
  completed: number;
  total: number;
  rate_percent: number;
}

export interface MyAssignmentItem {
  assignment_id: string;
  course_title: string;
  course_id: string;
  due_date: string;
  priority: string;
  status: string;
  progress_percent: number;
  last_activity: string | null;
}

export interface MyCertificationItem {
  certification_id: string;
  certification_type_name: string;
  status: string;
  expiration_date: string | null;
  days_until_expiry: number | null;
}

export interface EmployeeDashboard {
  my_assignments: AssignmentStatusCounts;
  upcoming_due: MyAssignmentItem[];
  overdue_assignments: MyAssignmentItem[];
  recent_completions: MyAssignmentItem[];
  my_certifications_expiring: MyCertificationItem[];
  total_hours_completed: number;
}

export interface TeamComplianceItem {
  user_id: string;
  employee_name: string;
  employee_email: string;
  total_assigned: number;
  completed: number;
  overdue: number;
  in_progress: number;
  compliance_percent: number;
}

export interface CourseCompletionItem {
  course_id: string;
  course_title: string;
  category_name: string | null;
  total_assigned: number;
  completed: number;
  in_progress: number;
  overdue: number;
  completion_rate_percent: number;
  avg_score_percent: number | null;
}

export interface SiteComplianceItem {
  site_id: string;
  site_name: string;
  total_employees: number;
  total_assigned: number;
  completed: number;
  overdue: number;
  compliance_percent: number;
}

export interface OverdueAlertItem {
  assignment_id: string;
  employee_name: string;
  employee_email: string;
  course_title: string;
  due_date: string;
  days_overdue: number;
  priority: string;
}

export interface AdminDashboard {
  company_summary: AssignmentStatusCounts;
  completion_rate_30d: CompletionRate;
  completion_rate_90d: CompletionRate;
  team_compliance: TeamComplianceItem[];
  course_completions: CourseCompletionItem[];
  site_compliance: SiteComplianceItem[];
  overdue_alerts: OverdueAlertItem[];
  certifications_expiring_30d: number;
  certifications_expired: number;
}

// ============================================================================
// Training Matrix Types
// ============================================================================

export interface MatrixCourseHeader {
  course_id: string;
  course_title: string;
  category_name: string | null;
  osha_standard_ref: string | null;
}

export interface MatrixCellStatus {
  status: 'required' | 'in_progress' | 'completed' | 'overdue' | 'expired' | 'not_assigned';
  assignment_id: string | null;
  due_date: string | null;
  completion_date: string | null;
  score_percent: number | null;
}

export interface MatrixEmployeeRow {
  user_id: string;
  employee_name: string;
  employee_email: string;
  site_name: string | null;
  role_name: string | null;
  cells: Record<string, MatrixCellStatus>;
  compliance_percent: number;
}

export interface TrainingMatrix {
  courses: MatrixCourseHeader[];
  employees: MatrixEmployeeRow[];
  total_employees: number;
  overall_compliance_percent: number;
}

// ============================================================================
// Report Types
// ============================================================================

export interface CompletionReportRow {
  employee_name: string;
  employee_email: string;
  site_name: string | null;
  course_title: string;
  category_name: string | null;
  assignment_date: string;
  due_date: string;
  completion_date: string | null;
  status: string;
  score_percent: number | null;
  passed: boolean | null;
  delivery_method: string;
  instructor_name: string | null;
}

export interface CompletionReport {
  report_title: string;
  generated_at: string;
  company_name: string;
  filter_description: string;
  rows: CompletionReportRow[];
  total_rows: number;
  summary: AssignmentStatusCounts;
}

export interface IndividualTranscript {
  employee_name: string;
  employee_email: string;
  employee_id: string;
  company_name: string;
  generated_at: string;
  records: IndividualTranscriptRow[];
  total_completed: number;
  total_in_progress: number;
  total_hours: number;
  certifications_active: number;
  certifications_expiring: number;
}

export interface IndividualTranscriptRow {
  course_title: string;
  category_name: string | null;
  completion_date: string | null;
  score_percent: number | null;
  passed: boolean | null;
  delivery_method: string;
  certificate_number: string | null;
  certification_expiration: string | null;
}

export interface CertificationStatusReport {
  report_title: string;
  generated_at: string;
  company_name: string;
  rows: CertificationStatusRow[];
  total_active: number;
  total_expiring_soon: number;
  total_expired: number;
}

export interface CertificationStatusRow {
  employee_name: string;
  employee_email: string;
  site_name: string | null;
  certification_type: string;
  osha_standard_ref: string | null;
  issue_date: string;
  expiration_date: string | null;
  status: string;
  days_until_expiry: number | null;
  source: string;
}
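
To illustrate how the matrix types fit together when rendering the grid, here is a minimal sketch of a cell lookup. It assumes that cells is keyed by course_id and that a missing entry should render as not_assigned; neither is stated by the types themselves. The names cellFor, labelFor, STATUS_LABELS and EMPTY_CELL, and the label wording, are illustrative only.

```typescript
// Trimmed copies of the matrix types above, so the sketch stands alone.
type CellStatus =
  | 'required' | 'in_progress' | 'completed'
  | 'overdue' | 'expired' | 'not_assigned';

interface CellSketch {
  status: CellStatus;
  assignment_id: string | null;
  due_date: string | null;
  completion_date: string | null;
  score_percent: number | null;
}

interface RowSketch {
  user_id: string;
  cells: Record<string, CellSketch>;
}

// Illustrative display labels (hypothetical wording).
const STATUS_LABELS: Record<CellStatus, string> = {
  required: 'Required',
  in_progress: 'In progress',
  completed: 'Completed',
  overdue: 'Overdue',
  expired: 'Expired',
  not_assigned: 'N/A',
};

// Fallback used when a row has no entry for a course column.
const EMPTY_CELL: CellSketch = {
  status: 'not_assigned',
  assignment_id: null,
  due_date: null,
  completion_date: null,
  score_percent: null,
};

// Assumption: cells is keyed by course_id.
function cellFor(row: RowSketch, courseId: string): CellSketch {
  return row.cells[courseId] ?? EMPTY_CELL;
}

// Resolves the text to show in one grid cell.
function labelFor(row: RowSketch, courseId: string): string {
  return STATUS_LABELS[cellFor(row, courseId).status];
}
```

A grid component would iterate over courses for the column headers and call labelFor(row, course.course_id) for each employee row, so a sparse cells record never produces an undefined cell.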

Frontend API Service

Add to tellus-ehs-hazcom-ui/src/services/safepath-api.ts:

import api from './api';
import type {
  EmployeeDashboard,
  AdminDashboard,
  TrainingMatrix,
  CompletionReport,
  IndividualTranscript,
  CertificationStatusReport,
} from '../types/safepath';

const BASE = '/safepath/training';

// ============================================================================
// Dashboard
// ============================================================================

export async function getMyDashboard(): Promise<EmployeeDashboard> {
  const { data } = await api.get(`${BASE}/dashboard/me`);
  return data;
}

export async function getAdminDashboard(params?: {
  site_id?: string;
}): Promise<AdminDashboard> {
  const { data } = await api.get(`${BASE}/dashboard/admin`, { params });
  return data;
}

// ============================================================================
// Training Matrix
// ============================================================================

export async function getTrainingMatrix(params?: {
  site_id?: string;
  role_id?: string;
  course_id?: string;
  category_id?: string;
  status_filter?: string;
  page?: number;
  page_size?: number;
}): Promise<TrainingMatrix> {
  const { data } = await api.get(`${BASE}/matrix`, { params });
  return data;
}

export function getTrainingMatrixExportUrl(params?: {
  site_id?: string;
  role_id?: string;
  course_id?: string;
  category_id?: string;
  format?: string;
}): string {
  const searchParams = new URLSearchParams();
  if (params) {
    // Skip undefined and empty values so they never reach the query string.
    Object.entries(params).forEach(([key, value]) => {
      if (value) searchParams.set(key, value);
    });
  }
  // Avoid a dangling "?" when no filters are supplied.
  const query = searchParams.toString();
  return query ? `${BASE}/matrix/export?${query}` : `${BASE}/matrix/export`;
}

// ============================================================================
// Reports
// ============================================================================

export async function getCompletionReport(params?: {
  site_id?: string;
  course_id?: string;
  status?: string;
  date_from?: string;
  date_to?: string;
}): Promise<CompletionReport> {
  const { data } = await api.get(`${BASE}/reports/completion`, { params });
  return data;
}

export async function getIndividualTranscript(
  userId: string
): Promise<IndividualTranscript> {
  const { data } = await api.get(`${BASE}/reports/transcript/${userId}`);
  return data;
}

export async function getCertificationReport(params?: {
  site_id?: string;
  status?: string;
}): Promise<CertificationStatusReport> {
  const { data } = await api.get(`${BASE}/reports/certifications`, { params });
  return data;
}

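// ============================================================================
// Report Exports (download URLs)
// ============================================================================
// Note: these helpers return paths relative to the API client's base URL;
// the returned strings do not include the /api/v1 prefix.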

export function getCompletionReportExportUrl(params: {
  format: 'csv' | 'pdf' | 'xlsx';
  site_id?: string;
  course_id?: string;
  status?: string;
  date_from?: string;
  date_to?: string;
}): string {
  const searchParams = new URLSearchParams();
  Object.entries(params).forEach(([key, value]) => {
    if (value) searchParams.set(key, value);
  });
  return `${BASE}/reports/completion/export?${searchParams.toString()}`;
}

export function getTranscriptExportUrl(
  userId: string,
  format: 'csv' | 'pdf' = 'csv'
): string {
  return `${BASE}/reports/transcript/${userId}/export?format=${format}`;
}

export function getCertificationReportExportUrl(params: {
  format: 'csv' | 'pdf';
  site_id?: string;
  status?: string;
}): string {
  const searchParams = new URLSearchParams();
  Object.entries(params).forEach(([key, value]) => {
    if (value) searchParams.set(key, value);
  });
  return `${BASE}/reports/certifications/export?${searchParams.toString()}`;
}
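
The export helpers above repeat the same query-string logic. As a minimal sketch, a shared builder could look like the following. The names buildExportPath, toDownloadHref, EXPORT_BASE and API_PREFIX are hypothetical, and the /api/v1 value is taken from the router registration rather than from the frontend client's actual configuration, so treat it as an assumption.

```typescript
const EXPORT_BASE = '/safepath/training';
// Assumption: the backend is mounted under /api/v1, as in the router registration.
const API_PREFIX = '/api/v1';

type QueryParams = Record<string, string | undefined>;

// Builds a path with a query string, skipping undefined and empty values.
// No trailing "?" is emitted when every parameter is omitted.
function buildExportPath(path: string, params: QueryParams = {}): string {
  const searchParams = new URLSearchParams();
  for (const [key, value] of Object.entries(params)) {
    if (value) searchParams.set(key, value);
  }
  const query = searchParams.toString();
  return query ? `${EXPORT_BASE}${path}?${query}` : `${EXPORT_BASE}${path}`;
}

// Prefixes a client-relative path so it can be used directly as an href.
function toDownloadHref(relativePath: string): string {
  return `${API_PREFIX}${relativePath}`;
}
```

For example, buildExportPath('/reports/completion/export', { format: 'csv', site_id: undefined }) yields /safepath/training/reports/completion/export?format=csv. If the API authenticates with a bearer header rather than cookies, a plain link will not carry that header, and the file would need to be requested through the API client as a blob instead.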

Dependencies

Add to tellus-ehs-hazcom-service/requirements.txt:

openpyxl>=3.1.0     # XLSX export for reports

(ReportLab should already be listed from Phase 1C for certificate generation.)


Verification Checklist

Paths are abbreviated; each is relative to /api/v1/safepath/training.

  1. Employee dashboard: GET /dashboard/me returns assignment counts, upcoming/overdue items, completions, expiring certs, hours
  2. Admin dashboard: GET /dashboard/admin returns company-wide stats, team compliance, course completions, site compliance, overdue alerts
  3. Admin dashboard site filter: GET /dashboard/admin?site_id=X scopes all stats to that site
  4. Training matrix: GET /matrix returns employees x courses grid with correct cell statuses
  5. Matrix filters: site_id, category_id, status_filter=overdue all filter correctly
  6. Matrix pagination: page=2&page_size=25 returns correct slice of employees
  7. Completion report: GET /reports/completion returns all assignments with scores and dates
  8. Completion report filters: site_id, course_id, status, date_from, date_to all work
  9. Individual transcript: GET /reports/transcript/{user_id} returns full training history
  10. Certification report: GET /reports/certifications returns all certifications with expiration info
  11. CSV export: GET /reports/completion/export?format=csv downloads valid CSV file
  12. PDF export: GET /reports/completion/export?format=pdf downloads valid PDF with Tellus branding
  13. XLSX export: GET /reports/completion/export?format=xlsx downloads valid Excel file with Summary + Detail sheets
  14. Matrix export: GET /matrix/export downloads CSV with employee rows and course columns
  15. Transcript export: GET /reports/transcript/{user_id}/export downloads CSV transcript
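
For item 6, it helps to pin down what the "correct slice" is before testing. The following is a minimal sketch, assuming page is 1-based and employees come back in a stable order; neither is stated above, and pageBounds and expectedPage are hypothetical helper names.

```typescript
// Returns the zero-based [start, end) index range for a 1-based page.
// Both bounds are clamped so pages past the end yield an empty range.
function pageBounds(page: number, pageSize: number, totalItems: number): [number, number] {
  const start = Math.min((page - 1) * pageSize, totalItems);
  const end = Math.min(start + pageSize, totalItems);
  return [start, end];
}

// Expected items for a page, given the full ordered list.
function expectedPage<T>(items: T[], page: number, pageSize: number): T[] {
  const [start, end] = pageBounds(page, pageSize, items.length);
  return items.slice(start, end);
}
```

Under these assumptions, with 60 employees a request for page=2&page_size=25 should return the employees at positions 26 through 50, page 3 should return the remaining 10, and page 4 should return an empty list.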