SafePath Training — Phase 1C: Assignment & Training Delivery

Overview

This document covers the assignment engine and training delivery pipeline:

  • Assignment creation (manual and bulk)
  • Training delivery (lesson progress tracking, play-and-lock)
  • Quiz submission and server-side scoring
  • Completion records with e-signature
  • PDF certificate generation
  • Classroom/in-person training records

Prerequisite: Phase 1B (course management) must be complete.

Files to create:

  • tellus-ehs-hazcom-service/app/schemas/safepath/assignment.py
  • tellus-ehs-hazcom-service/app/schemas/safepath/result.py
  • tellus-ehs-hazcom-service/app/schemas/safepath/classroom.py
  • tellus-ehs-hazcom-service/app/services/safepath/assignment_service.py
  • tellus-ehs-hazcom-service/app/services/safepath/delivery_service.py
  • tellus-ehs-hazcom-service/app/services/safepath/quiz_service.py
  • tellus-ehs-hazcom-service/app/services/safepath/certificate_generator.py
  • tellus-ehs-hazcom-service/app/services/safepath/classroom_service.py
  • tellus-ehs-hazcom-service/app/api/v1/safepath/assignments.py
  • tellus-ehs-hazcom-service/app/api/v1/safepath/results.py
  • tellus-ehs-hazcom-service/app/api/v1/safepath/classroom.py

Pydantic Schemas

Assignment Schemas

File: tellus-ehs-hazcom-service/app/schemas/safepath/assignment.py

"""SafePath Assignment Schemas"""

from datetime import date, datetime
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict


class AssignmentCreate(BaseModel):
    """Create one or more assignments."""
    course_id: UUID
    assigned_to_user_ids: List[UUID] = Field(..., min_length=1, description="One or more user IDs")
    due_date: date
    priority: str = Field(default="normal", pattern="^(normal|urgent)$")
    site_id: Optional[UUID] = None
    notes: Optional[str] = None


class AssignmentUpdate(BaseModel):
    """Update an assignment (e.g., change due date or priority)."""
    due_date: Optional[date] = None
    priority: Optional[str] = Field(None, pattern="^(normal|urgent)$")
    notes: Optional[str] = None


class AssignmentListItem(BaseModel):
    """Assignment list item with user and course info."""
    model_config = ConfigDict(from_attributes=True)

    assignment_id: UUID
    course_id: UUID
    course_title: str
    course_version: int
    assigned_to: UUID
    assigned_to_name: str
    assigned_to_email: str
    assigned_by_name: Optional[str] = None
    site_name: Optional[str] = None
    due_date: date
    priority: str
    status: str
    notes: Optional[str] = None
    latest_attempt_number: int = 0
    latest_score_percent: Optional[float] = None
    completed_at: Optional[datetime] = None
    created_at: datetime


class AssignmentListResponse(BaseModel):
    """Paginated assignment list."""
    items: List[AssignmentListItem]
    total: int
    page: int
    page_size: int
    total_pages: int


class AssignmentDetailResponse(BaseModel):
    """Full assignment detail for learner view."""
    model_config = ConfigDict(from_attributes=True)

    assignment_id: UUID
    course_id: UUID
    course_title: str
    course_description: Optional[str] = None
    course_version: int
    passing_score_percent: int
    max_retakes: int
    due_date: date
    priority: str
    status: str
    lessons: List[dict]  # Lesson data with progress overlay
    quiz: Optional[dict] = None  # Quiz data without correct answers
    attempts: List["AttemptSummary"] = []  # Forward reference; AttemptSummary is defined below
    can_retake: bool = True


class AttemptSummary(BaseModel):
    """Summary of a single training attempt."""
    attempt_number: int
    score_percent: Optional[float] = None
    passed: Optional[bool] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

Result Schemas

File: tellus-ehs-hazcom-service/app/schemas/safepath/result.py

"""SafePath Result Schemas"""

from datetime import datetime
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict


class LessonProgressUpdate(BaseModel):
    """Update progress for a single lesson."""
    lesson_id: UUID
    percent_complete: int = Field(..., ge=0, le=100)
    last_position: Optional[int] = None  # For video: seconds watched


class QuizSubmission(BaseModel):
    """Submit quiz answers for scoring."""
    answers: List[dict] = Field(
        ...,
        description='Array of {"question_id": "uuid", "selected": ["a"]} objects',
    )


class TrainingCompletion(BaseModel):
    """Complete training with e-signature acknowledgment."""
    signature_text: str = Field(..., min_length=1, description="Typed name as e-signature")


class ResultResponse(BaseModel):
    """Training result response."""
    model_config = ConfigDict(from_attributes=True)

    result_id: UUID
    assignment_id: UUID
    user_id: UUID
    course_id: UUID
    attempt_number: int
    score_percent: Optional[float] = None
    passed: Optional[bool] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    duration_seconds: Optional[int] = None
    delivery_method: str
    instructor_name: Optional[str] = None
    acknowledgment_at: Optional[datetime] = None
    created_at: datetime


class QuizScoreResponse(BaseModel):
    """Response after quiz submission."""
    score_percent: float
    passed: bool
    correct_count: int
    total_questions: int
    can_retake: bool
    attempts_remaining: int
    feedback: Optional[List[dict]] = None  # Per-question feedback if enabled
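
To make the request shapes concrete, the following is a minimal sketch of the JSON bodies a client might send for the three learner requests described by these schemas. It uses only the standard library; the helper `check_lesson_progress`, the sample values, and the variable names are illustrative assumptions, and in the real service Pydantic performs the validation.

```python
import json
import uuid


def check_lesson_progress(body: dict) -> bool:
    """Mirror the range rule on LessonProgressUpdate.percent_complete (0 to 100).

    Hypothetical helper for illustration only; the schema itself enforces this.
    """
    pct = body.get("percent_complete")
    return isinstance(pct, int) and 0 <= pct <= 100


# Body for LessonProgressUpdate (last_position is optional, seconds watched for video)
lesson_progress_body = {
    "lesson_id": str(uuid.uuid4()),
    "percent_complete": 45,
    "last_position": 130,
}

# Body for QuizSubmission: one entry per answered question
quiz_body = {
    "answers": [
        {"question_id": str(uuid.uuid4()), "selected": ["a"]},
        {"question_id": str(uuid.uuid4()), "selected": ["a", "c"]},
    ]
}

# Body for TrainingCompletion: the typed name acts as the e-signature
completion_body = {"signature_text": "Jane Doe"}

if __name__ == "__main__":
    print(json.dumps(lesson_progress_body, indent=2))
```

A `percent_complete` outside 0 to 100, or an empty `signature_text`, would be rejected by the schema before any service code runs.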

Classroom Schemas

File: tellus-ehs-hazcom-service/app/schemas/safepath/classroom.py

"""SafePath Classroom Session Schemas"""

from datetime import date, time, datetime
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict


class ClassroomSessionCreate(BaseModel):
    """Create an in-person training session."""
    course_id: UUID
    instructor_name: str = Field(..., min_length=1, max_length=255)
    session_date: date
    start_time: Optional[time] = None
    end_time: Optional[time] = None
    location: Optional[str] = Field(None, max_length=255)
    site_id: Optional[UUID] = None
    max_attendees: Optional[int] = Field(None, ge=1)
    notes: Optional[str] = None


class AttendanceRecord(BaseModel):
    """Single attendee record for a classroom session."""
    user_id: UUID
    passed: bool = True
    score_percent: Optional[float] = Field(None, ge=0, le=100)
    notes: Optional[str] = None


class ClassroomAttendanceSubmit(BaseModel):
    """Submit attendance for a classroom session."""
    attendees: List[AttendanceRecord] = Field(..., min_length=1)


class ClassroomSessionResponse(BaseModel):
    """Response for a classroom session."""
    model_config = ConfigDict(from_attributes=True)

    session_id: UUID
    course_id: UUID
    course_title: str
    instructor_name: str
    session_date: date
    start_time: Optional[time] = None
    end_time: Optional[time] = None
    location: Optional[str] = None
    site_name: Optional[str] = None
    max_attendees: Optional[int] = None
    attendee_count: int = 0
    notes: Optional[str] = None
    created_at: datetime


class ClassroomSessionListResponse(BaseModel):
    """Paginated classroom session list."""
    items: List[ClassroomSessionResponse]
    total: int
    page: int
    page_size: int
    total_pages: int

Assignment Service

File: tellus-ehs-hazcom-service/app/services/safepath/assignment_service.py

"""SafePath Assignment Service

Handles assignment creation, status management, and bulk operations.
"""

from typing import Optional, List, Tuple
from uuid import UUID
from datetime import datetime, date

from sqlalchemy import func, and_
from sqlalchemy.orm import Session, joinedload

from app.db.models.safepath import (
    Assignment,
    Course,
    Result,
    SafePathAuditLog,
)
from app.db.models.user import User
from app.db.models.company import CompanySite
from app.schemas.safepath.assignment import AssignmentCreate, AssignmentUpdate, AssignmentListItem


class AssignmentService:
    """Service for managing training assignments."""

    def __init__(self, db: Session):
        self.db = db

    def create_assignments(
        self, company_id: UUID, assigned_by: UUID, data: AssignmentCreate
    ) -> List[Assignment]:
        """Create assignments for one or more users.

        Validates:
        - Course exists and is published
        - Users belong to the company
        - No duplicate active assignment for same user+course
        """
        # Validate course
        course = (
            self.db.query(Course)
            .filter(Course.course_id == data.course_id, Course.company_id == company_id)
            .first()
        )
        if not course:
            raise ValueError("Course not found")
        if course.status != "published":
            raise ValueError("Can only assign published courses")

        created = []
        for user_id in data.assigned_to_user_ids:
            # Check for existing active assignment
            existing = (
                self.db.query(Assignment)
                .filter(
                    Assignment.course_id == data.course_id,
                    Assignment.assigned_to == user_id,
                    Assignment.company_id == company_id,
                    Assignment.status.in_(["pending", "in_progress"]),
                )
                .first()
            )
            if existing:
                continue  # Skip duplicate

            assignment = Assignment(
                company_id=company_id,
                course_id=data.course_id,
                course_version=course.version_number,
                assigned_to=user_id,
                assigned_by=assigned_by,
                site_id=data.site_id,
                due_date=data.due_date,
                priority=data.priority,
                status="pending",
                notes=data.notes,
            )
            self.db.add(assignment)
            self.db.flush()
            created.append(assignment)

            # Audit log
            self._log_event(
                company_id=company_id,
                event_type="assignment.created",
                entity_type="assignment",
                entity_id=assignment.assignment_id,
                user_id=assigned_by,
                details={
                    "course_id": str(data.course_id),
                    "assigned_to": str(user_id),
                    "due_date": str(data.due_date),
                },
            )

        return created

    def list_assignments(
        self,
        company_id: UUID,
        page: int = 1,
        page_size: int = 25,
        status: Optional[str] = None,
        user_id: Optional[UUID] = None,
        course_id: Optional[UUID] = None,
        site_id: Optional[UUID] = None,
        overdue_only: bool = False,
    ) -> Tuple[List[AssignmentListItem], int]:
        """List assignments with filtering and pagination."""
        query = self.db.query(Assignment).filter(Assignment.company_id == company_id)

        if status:
            query = query.filter(Assignment.status == status)
        if user_id:
            query = query.filter(Assignment.assigned_to == user_id)
        if course_id:
            query = query.filter(Assignment.course_id == course_id)
        if site_id:
            query = query.filter(Assignment.site_id == site_id)
        if overdue_only:
            query = query.filter(
                Assignment.status.in_(["pending", "in_progress"]),
                Assignment.due_date < date.today(),
            )

        total = query.count()

        assignments = (
            query
            .options(joinedload(Assignment.course), joinedload(Assignment.results))
            .order_by(Assignment.due_date.asc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        items = []
        for a in assignments:
            # Get user info (one lookup per row; acceptable for page sizes up to 100)
            user = self.db.query(User).filter(User.user_id == a.assigned_to).first()
            assigner = (
                self.db.query(User).filter(User.user_id == a.assigned_by).first()
                if a.assigned_by else None
            )
            site = (
                self.db.query(CompanySite).filter(CompanySite.site_id == a.site_id).first()
                if a.site_id else None
            )

            # Latest attempt info
            latest_result = (
                self.db.query(Result)
                .filter(Result.assignment_id == a.assignment_id)
                .order_by(Result.attempt_number.desc())
                .first()
            )

            items.append(AssignmentListItem(
                assignment_id=a.assignment_id,
                course_id=a.course_id,
                course_title=a.course.title if a.course else "Unknown",
                course_version=a.course_version,
                assigned_to=a.assigned_to,
                assigned_to_name=f"{user.first_name} {user.last_name}" if user else "Unknown",
                assigned_to_email=user.email if user else "",
                assigned_by_name=(
                    f"{assigner.first_name} {assigner.last_name}" if assigner else None
                ),
                site_name=site.site_name if site else None,
                due_date=a.due_date,
                priority=a.priority,
                status=a.status,
                notes=a.notes,
                latest_attempt_number=latest_result.attempt_number if latest_result else 0,
                # Compare against None so a legitimate 0% score is reported, not dropped
                latest_score_percent=(
                    float(latest_result.score_percent)
                    if latest_result and latest_result.score_percent is not None
                    else None
                ),
                completed_at=latest_result.completed_at if latest_result and latest_result.passed else None,
                created_at=a.created_at,
            ))

        return items, total

    def update_assignment(
        self, company_id: UUID, assignment_id: UUID, data: AssignmentUpdate
    ) -> Optional[Assignment]:
        """Update assignment details (due date, priority, notes)."""
        assignment = (
            self.db.query(Assignment)
            .filter(
                Assignment.assignment_id == assignment_id,
                Assignment.company_id == company_id,
            )
            .first()
        )
        if not assignment:
            return None

        update_data = data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(assignment, field, value)
        assignment.updated_at = datetime.utcnow()
        self.db.flush()
        return assignment

    def mark_overdue(self, company_id: UUID) -> int:
        """Batch update: mark past-due pending/in_progress assignments as overdue.

        Called by a background job daily.
        Returns count of updated assignments.
        """
        count = (
            self.db.query(Assignment)
            .filter(
                Assignment.company_id == company_id,
                Assignment.status.in_(["pending", "in_progress"]),
                Assignment.due_date < date.today(),
            )
            .update({"status": "overdue", "updated_at": datetime.utcnow()}, synchronize_session=False)
        )
        return count

    def _log_event(self, **kwargs):
        log = SafePathAuditLog(**kwargs)
        self.db.add(log)
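
The duplicate-skipping rule in create_assignments and the overdue rule in mark_overdue can be illustrated without a database. The sketch below is a simplified model under stated assumptions: rows are plain dicts rather than ORM objects, and the names `plan_assignments` and `is_overdue` are hypothetical helpers that do not exist in the service.

```python
from datetime import date

# Statuses treated as "active" by both the duplicate check and the overdue job
ACTIVE_STATUSES = {"pending", "in_progress"}


def plan_assignments(existing, course_id, user_ids):
    """Return the user IDs that would receive a new assignment.

    `existing` is a list of dicts with keys course_id, assigned_to, status.
    A user is skipped when they already hold an active assignment for the
    course, including one created earlier in the same call.
    """
    active = {
        row["assigned_to"]
        for row in existing
        if row["course_id"] == course_id and row["status"] in ACTIVE_STATUSES
    }
    planned = []
    for uid in user_ids:
        if uid in active:
            continue  # Skip duplicate, as the service does
        planned.append(uid)
        active.add(uid)  # New assignments start as "pending", so they count as active
    return planned


def is_overdue(status, due_date, today):
    """An assignment is overdue when it is still active and its due date has passed."""
    return status in ACTIVE_STATUSES and due_date < today
```

Note that a user with a completed assignment for the same course is not considered a duplicate, and an assignment due today is not yet overdue because the comparison is strict.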

Quiz Scoring Service

File: tellus-ehs-hazcom-service/app/services/safepath/quiz_service.py

"""SafePath Quiz Scoring Service

Server-side quiz scoring. Answers are validated against the stored
correct options. Score is computed as (correct / total) * 100.
"""

from typing import List, Tuple, Optional
from uuid import UUID

from sqlalchemy.orm import Session

from app.db.models.safepath import Quiz, QuizQuestion


class QuizService:
    """Server-side quiz scoring."""

    def __init__(self, db: Session):
        self.db = db

    def score_quiz(
        self, course_id: UUID, answers: List[dict]
    ) -> Tuple[float, bool, int, int, List[dict]]:
        """Score a quiz submission.

        Args:
            course_id: The course containing the quiz
            answers: List of {"question_id": "uuid", "selected": ["a", "b"]}

        Returns:
            (score_percent, passed, correct_count, total_questions, feedback)
        """
        # Get the quiz and questions for this course
        quiz = (
            self.db.query(Quiz)
            .filter(Quiz.course_id == course_id)
            .first()
        )
        if not quiz:
            return 0.0, False, 0, 0, []

        questions = (
            self.db.query(QuizQuestion)
            .filter(QuizQuestion.quiz_id == quiz.quiz_id)
            .all()
        )
        if not questions:
            return 100.0, True, 0, 0, []

        # Normalize keys to str so they match str(q.question_id) below,
        # whether the client sent UUID objects or strings
        answer_map = {str(a["question_id"]): a.get("selected", []) for a in answers}

        correct_count = 0
        feedback = []

        for q in questions:
            qid = str(q.question_id)
            selected = answer_map.get(qid, [])
            is_correct = self._check_answer(q, selected)
            if is_correct:
                correct_count += 1

            feedback.append({
                "question_id": qid,
                "correct": is_correct,
                "explanation": q.explanation,
            })

        total = len(questions)
        score_percent = round((correct_count / total) * 100, 2) if total > 0 else 0.0

        # Get passing score from course (falls back to 80 if the course is missing)
        from app.db.models.safepath import Course
        course = self.db.query(Course).filter(Course.course_id == course_id).first()
        passing_score = course.passing_score_percent if course else 80
        passed = score_percent >= passing_score

        return score_percent, passed, correct_count, total, feedback

    def _check_answer(self, question: QuizQuestion, selected: List[str]) -> bool:
        """Check if selected answer(s) are correct for a question."""
        options = question.options
        if not options or not selected:
            return False

        if question.question_type in ("mcq_single", "true_false"):
            # Single correct answer
            correct_ids = [o["id"] for o in options if o.get("is_correct")]
            return len(selected) == 1 and selected[0] in correct_ids

        elif question.question_type == "mcq_multi":
            # All correct must be selected, no extras
            correct_ids = set(o["id"] for o in options if o.get("is_correct"))
            return set(selected) == correct_ids

        elif question.question_type == "matching":
            # All pairs must be correctly matched
            correct_pairs = {o["id"]: o["right"] for o in options if o.get("is_correct", True)}
            selected_pairs = (
                {s["id"]: s.get("right", "") for s in selected}
                if isinstance(selected[0], dict) else {}
            )
            return selected_pairs == correct_pairs

        return False
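
As a worked example of the scoring rule (correct / total) * 100, the sketch below re-implements the single-choice and multi-select checks over plain dicts instead of ORM rows. The function names `check_answer` and `score`, the sample questions, and the default passing score of 80 are assumptions for illustration; matching questions are omitted for brevity.

```python
def check_answer(question, selected):
    """Simplified answer check covering mcq_single, true_false, and mcq_multi."""
    options = question["options"]
    if not options or not selected:
        return False
    correct = {o["id"] for o in options if o.get("is_correct")}
    if question["question_type"] in ("mcq_single", "true_false"):
        return len(selected) == 1 and selected[0] in correct
    if question["question_type"] == "mcq_multi":
        # All correct options must be selected, with no extras
        return set(selected) == correct
    return False


def score(questions, answers, passing_score_percent=80):
    """Return (score_percent, passed, correct_count, total_questions)."""
    answer_map = {a["question_id"]: a.get("selected", []) for a in answers}
    correct_count = sum(
        1 for q in questions if check_answer(q, answer_map.get(q["question_id"], []))
    )
    total = len(questions)
    score_percent = round((correct_count / total) * 100, 2) if total > 0 else 0.0
    return score_percent, score_percent >= passing_score_percent, correct_count, total


questions = [
    {"question_id": "q1", "question_type": "mcq_single",
     "options": [{"id": "a", "is_correct": True}, {"id": "b"}]},
    {"question_id": "q2", "question_type": "mcq_multi",
     "options": [{"id": "a", "is_correct": True}, {"id": "b"}, {"id": "c", "is_correct": True}]},
    {"question_id": "q3", "question_type": "true_false",
     "options": [{"id": "t", "is_correct": True}, {"id": "f"}]},
]
answers = [
    {"question_id": "q1", "selected": ["a"]},  # correct
    {"question_id": "q2", "selected": ["a"]},  # partial selection counts as wrong
    {"question_id": "q3", "selected": ["t"]},  # correct
]
result = score(questions, answers)  # two of three correct
```

With two of three questions correct the score rounds to 66.67, which is below the assumed passing score, so the attempt fails. There is no partial credit for a multi-select question.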

Training Delivery Service

File: tellus-ehs-hazcom-service/app/services/safepath/delivery_service.py

"""SafePath Training Delivery Service

Manages the learner experience: starting training, tracking lesson progress,
submitting quizzes, recording completion with e-signature.
"""

import hashlib
from typing import Optional
from uuid import UUID
from datetime import datetime

from sqlalchemy.orm import Session, joinedload

from app.db.models.safepath import (
    Assignment,
    Course,
    Lesson,
    Result,
    SafePathAuditLog,
)
from app.services.safepath.quiz_service import QuizService


class DeliveryService:
    """Manages the learner training delivery flow."""

    def __init__(self, db: Session):
        self.db = db
        self.quiz_service = QuizService(db)

    def start_training(
        self, assignment_id: UUID, user_id: UUID
    ) -> Optional[Result]:
        """Start a training attempt.

        Creates a new Result record and updates assignment status to in_progress.
        """
        assignment = (
            self.db.query(Assignment)
            .filter(
                Assignment.assignment_id == assignment_id,
                Assignment.assigned_to == user_id,
            )
            .first()
        )
        if not assignment:
            return None
        if assignment.status not in ("pending", "in_progress", "overdue"):
            raise ValueError(f"Cannot start training for assignment with status '{assignment.status}'")

        # Determine attempt number
        existing_attempts = (
            self.db.query(Result)
            .filter(Result.assignment_id == assignment_id)
            .count()
        )

        # Check retake limit
        course = self.db.query(Course).filter(Course.course_id == assignment.course_id).first()
        if existing_attempts >= (course.max_retakes + 1):  # +1 because first attempt is not a "retake"
            raise ValueError("Maximum retake attempts exceeded")

        result = Result(
            assignment_id=assignment_id,
            user_id=user_id,
            course_id=assignment.course_id,
            attempt_number=existing_attempts + 1,
            started_at=datetime.utcnow(),
            delivery_method="online",
            lesson_progress={},
        )
        self.db.add(result)

        # Update assignment status
        if assignment.status in ("pending", "overdue"):
            assignment.status = "in_progress"
            assignment.updated_at = datetime.utcnow()

        self.db.flush()

        self._log_event(
            company_id=assignment.company_id,
            event_type="assignment.started",
            entity_type="assignment",
            entity_id=assignment_id,
            user_id=user_id,
            details={"attempt_number": result.attempt_number},
        )
        return result

    def update_lesson_progress(
        self,
        result_id: UUID,
        user_id: UUID,
        lesson_id: UUID,
        percent_complete: int,
        last_position: Optional[int] = None,
    ) -> Optional[Result]:
        """Update progress for a lesson within an active training attempt."""
        result = (
            self.db.query(Result)
            .filter(Result.result_id == result_id, Result.user_id == user_id)
            .first()
        )
        if not result or result.completed_at:
            return None

        # Copy before mutating: with a plain JSON column (no MutableDict), SQLAlchemy
        # does not detect in-place changes, and reassigning the same dict object
        # would not be written on flush.
        progress = dict(result.lesson_progress or {})
        lid = str(lesson_id)
        progress[lid] = {
            # NOTE: fixed 80% cutoff; check_quiz_unlocked() uses each lesson's
            # completion_threshold_percent, so the two can disagree.
            "completed": percent_complete >= 80,
            "percent": percent_complete,
            "last_position": last_position,
        }
        result.lesson_progress = progress
        self.db.flush()
        return result

    def check_quiz_unlocked(self, result_id: UUID) -> bool:
        """Check if all lessons meet their completion threshold (play-and-lock)."""
        result = self.db.query(Result).filter(Result.result_id == result_id).first()
        if not result:
            return False

        lessons = (
            self.db.query(Lesson)
            .filter(Lesson.course_id == result.course_id)
            .all()
        )
        progress = result.lesson_progress or {}

        for lesson in lessons:
            lid = str(lesson.lesson_id)
            lesson_progress = progress.get(lid, {})
            if lesson_progress.get("percent", 0) < lesson.completion_threshold_percent:
                return False

        return True

    def submit_quiz(
        self, result_id: UUID, user_id: UUID, answers: list
    ) -> dict:
        """Submit quiz answers and compute score.

        Returns scoring result.
        """
        result = (
            self.db.query(Result)
            .filter(Result.result_id == result_id, Result.user_id == user_id)
            .first()
        )
        if not result:
            raise ValueError("Result not found")
        if result.completed_at:
            raise ValueError("This attempt is already completed")

        # Check play-and-lock
        if not self.check_quiz_unlocked(result_id):
            raise ValueError("Complete all lessons before taking the quiz")

        # Score the quiz
        score_percent, passed, correct_count, total_questions, feedback = (
            self.quiz_service.score_quiz(result.course_id, answers)
        )

        # Update result
        result.quiz_answers = answers
        result.score_percent = score_percent
        result.passed = passed

        # Get course for retake info
        course = self.db.query(Course).filter(Course.course_id == result.course_id).first()
        assignment = self.db.query(Assignment).filter(
            Assignment.assignment_id == result.assignment_id
        ).first()

        total_attempts = (
            self.db.query(Result)
            .filter(Result.assignment_id == result.assignment_id)
            .count()
        )
        attempts_remaining = max(0, (course.max_retakes + 1) - total_attempts)
        can_retake = not passed and attempts_remaining > 0

        self.db.flush()

        # Audit
        event = "quiz.passed" if passed else "quiz.failed"
        self._log_event(
            company_id=assignment.company_id,
            event_type=event,
            entity_type="result",
            entity_id=result.result_id,
            user_id=user_id,
            details={"score": score_percent, "passed": passed, "attempt": result.attempt_number},
        )

        return {
            "score_percent": score_percent,
            "passed": passed,
            "correct_count": correct_count,
            "total_questions": total_questions,
            "can_retake": can_retake,
            "attempts_remaining": attempts_remaining,
            "feedback": feedback,
        }

    def complete_training(
        self, result_id: UUID, user_id: UUID, signature_text: str, ip_address: Optional[str] = None
    ) -> Optional[Result]:
        """Complete training with e-signature acknowledgment.

        Only allowed if quiz was passed (or no quiz exists).
        """
        result = (
            self.db.query(Result)
            .filter(Result.result_id == result_id, Result.user_id == user_id)
            .first()
        )
        if not result:
            return None
        if result.completed_at:
            raise ValueError("Training already completed")

        # Check quiz passed (if quiz exists)
        from app.db.models.safepath import Quiz
        has_quiz = self.db.query(Quiz).filter(Quiz.course_id == result.course_id).count() > 0
        if has_quiz and not result.passed:
            raise ValueError("Must pass the quiz before completing training")

        # Record completion
        now = datetime.utcnow()
        result.completed_at = now
        result.acknowledgment_signature = signature_text
        result.acknowledgment_at = now
        if ip_address:
            # Store only a SHA-256 hash of the IP, never the raw address
            result.acknowledgment_ip_hash = hashlib.sha256(ip_address.encode()).hexdigest()
        if result.started_at:
            result.duration_seconds = int((now - result.started_at).total_seconds())

        # Update assignment status
        assignment = self.db.query(Assignment).filter(
            Assignment.assignment_id == result.assignment_id
        ).first()
        if assignment:
            assignment.status = "completed"
            assignment.updated_at = now

        self.db.flush()

        self._log_event(
            company_id=assignment.company_id if assignment else None,
            event_type="assignment.completed",
            entity_type="assignment",
            entity_id=result.assignment_id,
            user_id=user_id,
            details={
                "result_id": str(result.result_id),
                # Compare against None so a 0% score is logged as 0.0, not None
                "score": float(result.score_percent) if result.score_percent is not None else None,
                "duration_seconds": result.duration_seconds,
            },
        )
        return result

    def _log_event(self, **kwargs):
        if kwargs.get("company_id"):
            log = SafePathAuditLog(**kwargs)
            self.db.add(log)

Certificate Generator

File: tellus-ehs-hazcom-service/app/services/safepath/certificate_generator.py

"""SafePath Certificate Generator

Generates PDF certificates for completed training using ReportLab.

Dependencies to add to requirements.txt:
reportlab>=4.0.0
"""

import io
from datetime import datetime
from typing import Optional

from reportlab.lib.pagesizes import LETTER, landscape
from reportlab.lib.units import inch
from reportlab.lib.colors import HexColor
from reportlab.pdfgen import canvas


class CertificateGenerator:
    """Generates training completion certificates as PDF."""

    # Tellus brand colors
    PRIMARY = HexColor("#1a56db")
    DARK = HexColor("#1e293b")
    LIGHT_BG = HexColor("#f8fafc")
    BORDER = HexColor("#3b82f6")

    def generate(
        self,
        employee_name: str,
        course_title: str,
        completion_date: datetime,
        score_percent: Optional[float] = None,
        instructor_name: Optional[str] = None,
        company_name: Optional[str] = None,
        osha_ref: Optional[str] = None,
    ) -> bytes:
        """Generate a PDF certificate and return as bytes.

        Args:
            employee_name: Full name of the employee
            course_title: Title of the completed course
            completion_date: Date/time of completion
            score_percent: Quiz score (if applicable)
            instructor_name: Trainer/instructor name
            company_name: Company name for branding
            osha_ref: OSHA standard reference (e.g., "1910.134")

        Returns:
            PDF file as bytes
        """
        buffer = io.BytesIO()
        c = canvas.Canvas(buffer, pagesize=landscape(LETTER))
        width, height = landscape(LETTER)

        # Background
        c.setFillColor(self.LIGHT_BG)
        c.rect(0, 0, width, height, fill=1, stroke=0)

        # Border
        c.setStrokeColor(self.BORDER)
        c.setLineWidth(3)
        c.rect(0.5 * inch, 0.5 * inch, width - inch, height - inch, fill=0, stroke=1)

        # Inner border
        c.setLineWidth(1)
        c.rect(0.6 * inch, 0.6 * inch, width - 1.2 * inch, height - 1.2 * inch, fill=0, stroke=1)

        # Header
        c.setFillColor(self.PRIMARY)
        c.setFont("Helvetica-Bold", 14)
        c.drawCentredString(width / 2, height - 1.2 * inch, "CERTIFICATE OF COMPLETION")

        # Company name
        if company_name:
            c.setFillColor(self.DARK)
            c.setFont("Helvetica", 11)
            c.drawCentredString(width / 2, height - 1.6 * inch, company_name)

        # "This certifies that"
        c.setFont("Helvetica", 12)
        c.drawCentredString(width / 2, height - 2.2 * inch, "This certifies that")

        # Employee name
        c.setFont("Helvetica-Bold", 24)
        c.setFillColor(self.DARK)
        c.drawCentredString(width / 2, height - 2.8 * inch, employee_name)

        # "has successfully completed"
        c.setFont("Helvetica", 12)
        c.drawCentredString(width / 2, height - 3.3 * inch, "has successfully completed")

        # Course title
        c.setFont("Helvetica-Bold", 18)
        c.setFillColor(self.PRIMARY)
        c.drawCentredString(width / 2, height - 3.9 * inch, course_title)

        # OSHA reference
        if osha_ref:
            c.setFont("Helvetica", 10)
            c.setFillColor(self.DARK)
            c.drawCentredString(width / 2, height - 4.3 * inch, f"OSHA Standard: {osha_ref}")

        # Score and date
        y_bottom = 1.5 * inch
        c.setFont("Helvetica", 11)
        c.setFillColor(self.DARK)

        date_str = completion_date.strftime("%B %d, %Y")
        c.drawCentredString(width / 2, y_bottom + 0.6 * inch, f"Date: {date_str}")

        if score_percent is not None:
            c.drawCentredString(width / 2, y_bottom + 0.3 * inch, f"Score: {score_percent:.0f}%")

        # Instructor signature line
        if instructor_name:
            sig_y = y_bottom - 0.1 * inch
            c.line(width / 2 - 1.5 * inch, sig_y, width / 2 + 1.5 * inch, sig_y)
            c.setFont("Helvetica", 10)
            c.drawCentredString(width / 2, sig_y - 0.2 * inch, instructor_name)
            c.setFont("Helvetica", 8)
            c.drawCentredString(width / 2, sig_y - 0.4 * inch, "Instructor / Safety Coordinator")

        # Footer
        c.setFont("Helvetica", 8)
        c.setFillColor(HexColor("#94a3b8"))
        c.drawCentredString(width / 2, 0.8 * inch, "Generated by Tellus EHS - SafePath Training")

        c.save()
        buffer.seek(0)
        return buffer.read()

API Endpoints

File: tellus-ehs-hazcom-service/app/api/v1/safepath/assignments.py

"""SafePath Assignment & Delivery API Endpoints"""

import math
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.api.v1.adminhq.auth import get_user_context, UserContext
from app.services.safepath.assignment_service import AssignmentService
from app.services.safepath.delivery_service import DeliveryService
from app.services.safepath.certificate_generator import CertificateGenerator
from app.schemas.safepath.assignment import (
    AssignmentCreate,
    AssignmentUpdate,
    AssignmentListResponse,
    AssignmentDetailResponse,
)
from app.schemas.safepath.result import (
    LessonProgressUpdate,
    QuizSubmission,
    TrainingCompletion,
    ResultResponse,
    QuizScoreResponse,
)

router = APIRouter(prefix="/training", tags=["SafePath Training"])


# ============================================================================
# Assignment Management
# ============================================================================

@router.post("/assignments", status_code=201)
def create_assignments(
    data: AssignmentCreate,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Create training assignments for one or more users."""
    service = AssignmentService(db)
    try:
        assignments = service.create_assignments(ctx.company_id, ctx.user_id, data)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    db.commit()
    return {"success": True, "created_count": len(assignments)}


@router.get("/assignments", response_model=AssignmentListResponse)
def list_assignments(
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100),
    status: Optional[str] = Query(None),
    user_id: Optional[UUID] = Query(None),
    course_id: Optional[UUID] = Query(None),
    site_id: Optional[UUID] = Query(None),
    overdue_only: bool = Query(False),
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """List training assignments with filtering."""
    service = AssignmentService(db)
    items, total = service.list_assignments(
        company_id=ctx.company_id,
        page=page,
        page_size=page_size,
        status=status,
        user_id=user_id,
        course_id=course_id,
        site_id=site_id,
        overdue_only=overdue_only,
    )
    return AssignmentListResponse(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=math.ceil(total / page_size) if total > 0 else 0,
    )


@router.put("/assignments/{assignment_id}")
def update_assignment(
    assignment_id: UUID,
    data: AssignmentUpdate,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Update assignment details."""
    service = AssignmentService(db)
    assignment = service.update_assignment(ctx.company_id, assignment_id, data)
    if not assignment:
        raise HTTPException(status_code=404, detail="Assignment not found")
    db.commit()
    return {"success": True}


# ============================================================================
# Training Delivery (Learner Flow)
# ============================================================================

@router.post("/assignments/{assignment_id}/start", response_model=ResultResponse)
def start_training(
    assignment_id: UUID,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Start a training attempt for an assignment."""
    service = DeliveryService(db)
    try:
        result = service.start_training(assignment_id, ctx.user_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if not result:
        raise HTTPException(status_code=404, detail="Assignment not found")
    db.commit()
    return ResultResponse.model_validate(result)


@router.post("/results/{result_id}/lesson-progress")
def update_lesson_progress(
    result_id: UUID,
    data: LessonProgressUpdate,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Update lesson progress during training."""
    service = DeliveryService(db)
    result = service.update_lesson_progress(
        result_id, ctx.user_id, data.lesson_id, data.percent_complete, data.last_position
    )
    if not result:
        raise HTTPException(status_code=404, detail="Active result not found")
    db.commit()
    return {"success": True, "lesson_progress": result.lesson_progress}


@router.post("/results/{result_id}/submit-quiz", response_model=QuizScoreResponse)
def submit_quiz(
    result_id: UUID,
    data: QuizSubmission,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Submit quiz answers for server-side scoring."""
    service = DeliveryService(db)
    try:
        score_data = service.submit_quiz(result_id, ctx.user_id, data.answers)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    db.commit()
    return QuizScoreResponse(**score_data)


@router.post("/results/{result_id}/complete", response_model=ResultResponse)
def complete_training(
    result_id: UUID,
    data: TrainingCompletion,
    request: Request,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Complete training with e-signature acknowledgment."""
    ip_address = request.client.host if request.client else None
    service = DeliveryService(db)
    try:
        result = service.complete_training(result_id, ctx.user_id, data.signature_text, ip_address)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if not result:
        raise HTTPException(status_code=404, detail="Result not found")
    db.commit()
    return ResultResponse.model_validate(result)


@router.get("/results/{result_id}/certificate")
def download_certificate(
    result_id: UUID,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Download PDF certificate for a completed training."""
    import io  # needed for io.BytesIO below; not imported at module level
    from fastapi.responses import StreamingResponse
    from app.db.models.safepath import Result, Course
    from app.db.models.user import User
    from app.db.models.company import Company

    result = db.query(Result).filter(Result.result_id == result_id, Result.user_id == ctx.user_id).first()
    # `passed` stays None for courses without a quiz, so only an explicit False blocks
    # the download; complete_training already enforces a pass when a quiz exists.
    if not result or not result.completed_at or result.passed is False:
        raise HTTPException(status_code=404, detail="Completed training result not found")

    course = db.query(Course).filter(Course.course_id == result.course_id).first()
    user = db.query(User).filter(User.user_id == result.user_id).first()
    company = db.query(Company).filter(Company.company_id == ctx.company_id).first()

    generator = CertificateGenerator()
    pdf_bytes = generator.generate(
        employee_name=f"{user.first_name} {user.last_name}" if user else "Unknown",
        course_title=course.title if course else "Unknown Course",
        completion_date=result.completed_at,
        # Compare against None so a 0% score is still passed through
        score_percent=float(result.score_percent) if result.score_percent is not None else None,
        instructor_name=result.instructor_name,
        company_name=company.company_name if company else None,
        osha_ref=course.osha_standard_ref if course else None,
    )

    return StreamingResponse(
        io.BytesIO(pdf_bytes),
        media_type="application/pdf",
        headers={"Content-Disposition": f"attachment; filename=certificate_{result_id}.pdf"},
    )
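
The list endpoint combines the service's offset/limit window with the total_pages formula shown in its response. A small standalone sketch of that arithmetic follows; `page_window` is a hypothetical helper name used only for this example.

```python
import math


def page_window(total: int, page: int, page_size: int) -> dict:
    """Compute the query window and page count for a 1-based page number."""
    total_pages = math.ceil(total / page_size) if total > 0 else 0
    offset = (page - 1) * page_size  # rows skipped before this page
    return {"offset": offset, "limit": page_size, "total_pages": total_pages}


# 51 matching assignments at 25 per page: page 3 holds the single remaining row
window = page_window(total=51, page=3, page_size=25)
```

Requesting a page beyond total_pages is not an error in this scheme; the query simply returns an empty items list.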

Learner Flow Sequence

1. GET /training/assignments?user_id={my_user_id}&status=pending
   → List my pending assignments
   → user_id must be the caller's UUID; the endpoint declares it as a UUID, so a literal "me" is rejected

2. POST /training/assignments/{id}/start
   → Creates Result record (attempt #1), returns result_id
   → Assignment status: pending → in_progress

3. For each lesson:
   POST /training/results/{result_id}/lesson-progress
   → Update: {"lesson_id": "...", "percent_complete": 45}
   → Tracks video position, PDF pages read, etc.

4. When all lessons completed (play-and-lock check passes):
   POST /training/results/{result_id}/submit-quiz
   → Submits answers, returns score + pass/fail + feedback
   → If failed and can_retake=true (attempts remain): go back to step 2 to start a new attempt

5. If passed (or no quiz):
   POST /training/results/{result_id}/complete
   → Records e-signature, timestamps, duration
   → Assignment status: in_progress → completed
   → Certificate available for download

6. GET /training/results/{result_id}/certificate
   → Download PDF certificate
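
The status changes in this sequence can be summarized as a small transition model. The sketch below mirrors the checks made by start_training and complete_training in the delivery service; the function names `on_start` and `on_complete` are illustrative and not part of the codebase.

```python
# Statuses from which a learner may start (or restart) an attempt
STARTABLE = ("pending", "in_progress", "overdue")


def on_start(status: str) -> str:
    """Status after POST .../start: any startable status ends up in_progress."""
    if status not in STARTABLE:
        raise ValueError(f"Cannot start training for assignment with status '{status}'")
    return "in_progress"


def on_complete(has_quiz: bool, quiz_passed: bool) -> str:
    """Status after POST .../complete: requires a passed quiz when one exists."""
    if has_quiz and not quiz_passed:
        raise ValueError("Must pass the quiz before completing training")
    return "completed"


# Happy path: pending, then in_progress, then completed
status = "pending"
status = on_start(status)
status = on_complete(has_quiz=True, quiz_passed=True)
```

An overdue assignment can still be started and returns to in_progress, while a completed assignment cannot be started again.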

Verification Checklist

After implementing this phase:

  1. Assignment flow:

    • Create assignment for a user on a published course
    • Duplicate assignment for same user+course is skipped
    • Assignment appears in user's list with status "pending"
  2. Training delivery:

    • Start training creates a result with attempt_number=1
    • Lesson progress updates are tracked per-lesson
    • Quiz is locked until all lessons meet threshold
    • Quiz submission returns server-computed score
    • Failed quiz allows retake (within max_retakes limit)
    • Completion requires e-signature; records IP hash
  3. Certificate:

    • PDF certificate downloads for completed+passed results
    • Certificate includes employee name, course title, date, score, company name
  4. Classroom:

    • Create in-person session with instructor, date, location
    • Submit attendance creates assignments + results for each attendee
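
The classroom service itself is not listed in this document, so the following is only a rough sketch of how submitted attendance might be turned into per-attendee result records. Everything here is an assumption: the helper name `attendance_to_results`, the dict-based rows, the delivery_method value "classroom", and the choice to stamp completed_at at submission time.

```python
from datetime import datetime, timezone
from typing import List, Optional


def attendance_to_results(
    session: dict, attendees: List[dict], now: Optional[datetime] = None
) -> List[dict]:
    """Build one result record per attendee of an in-person session.

    `session` carries course_id and instructor_name; each attendee dict follows
    the AttendanceRecord fields (user_id, passed, score_percent, notes).
    """
    now = now or datetime.now(timezone.utc)
    return [
        {
            "user_id": a["user_id"],
            "course_id": session["course_id"],
            "attempt_number": 1,
            "delivery_method": "classroom",  # assumed value; online attempts use "online"
            "instructor_name": session["instructor_name"],
            "passed": a.get("passed", True),  # AttendanceRecord defaults passed to True
            "score_percent": a.get("score_percent"),
            "completed_at": now,
        }
        for a in attendees
    ]


session = {"course_id": "course-1", "instructor_name": "Pat Lee"}
attendees = [
    {"user_id": "u1"},
    {"user_id": "u2", "passed": False, "score_percent": 55.0},
]
records = attendance_to_results(session, attendees)
```

A real implementation would also need to create or reuse the assignment for each attendee and write audit log entries, which this sketch leaves out.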