
SDS Parsing Implementation Plan

Overview

This document outlines the implementation plan for automated SDS (Safety Data Sheet) parsing using large language models (LLMs). When a user uploads an SDS PDF, the system will:

  1. Extract text from the PDF
  2. Send the text to an LLM (Anthropic Claude or OpenAI GPT-4) with a structured prompt
  3. Parse the JSON response and populate database tables
  4. Mark the SDS as parsed with confidence scores
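
The downstream code in this plan assumes the LLM response is keyed by section, roughly as in the abridged Python sketch below; the authoritative schema lives in docs/sds_extraction_prompt.md, and the example values are illustrative only.

parsed_data = {
    "section1_identification": {
        "product_identifier": "Acetone",
        "manufacturer": {"company_name": "Example Chem Co."},  # hypothetical value
    },
    "section2_hazard_identification": {
        "classification": [
            {"hazard_class": "Flammable liquids", "hazard_category": "2", "hazard_code": "H225"}
        ],
        "label_elements": {"signal_word": "Danger", "pictograms": ["GHS02"]},
    },
    "section3_composition": {
        "ingredients": [{"chemical_name": "Acetone", "cas_number": "67-64-1"}],
    },
    # ... sections 4-16 follow the same "sectionN_<name>" key pattern
}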

Current State Analysis

Existing Infrastructure

| Component | Status | Location |
|---|---|---|
| SDSDocument model | ✅ Exists | app/db/models/chemiq_sds.py |
| SDSParseJob model | ✅ Exists | app/db/models/chemiq_sds.py (table: chemiq_sds_parse_queue) |
| SDSHazardInfo model | ✅ Exists | app/db/models/chemiq_sds.py |
| SDSComposition model | ✅ Exists | app/db/models/chemiq_sds.py |
| SDSSection model | ✅ Exists | app/db/models/chemiq_sds.py |
| SDS upload service | ✅ Exists | app/services/chemiq/sds_service.py |
| Background service | ✅ Exists | tellus-ehs-background-service/ |
| LLM integration pattern | ✅ Exists | app/services/chemiq/label_extraction_service.py |
| SDS extraction prompt | ✅ Exists | docs/sds_extraction_prompt.md |
| PDF text extraction | ⚠️ Basic regex | tellus-ehs-background-service/app/services/sds_search/pdf_parser.py |
| LLM-based SDS parser | ❌ Needs building | - |
| Parse job processor | ❌ Needs building | - |
| Cron job for parsing | ❌ Needs building | - |

Existing Models Ready for Population

SDSDocument
├── sds_parsed: Boolean (flag when complete)
├── parse_confidence: Numeric (0-1 score)
├── parse_errors: JSONB (error details)
├── parsed_at: DateTime
└── Relationships:
    ├── hazard_info → SDSHazardInfo (Section 2)
    ├── composition → SDSComposition[] (Section 3)
    ├── sections → SDSSection[] (Sections 4-16)
    └── parse_jobs → SDSParseJob[]

Architecture

High-Level Flow

┌─────────────────┐     ┌──────────────────────┐     ┌─────────────────────┐
│   User Upload   │────▶│     SDS Service      │────▶│  Create Parse Job   │
│   (API Layer)   │     │    (Store in S3)     │     │  (status: pending)  │
└─────────────────┘     └──────────────────────┘     └──────────┬──────────┘
                                                                │
                                                                ▼
┌─────────────────┐     ┌──────────────────────┐     ┌─────────────────────┐
│   Update SDS    │◀────│  SDS Parser Service  │◀────│   Background Cron   │
│  (parsed=true)  │     │   (LLM Extraction)   │     │ (Poll pending jobs) │
└─────────────────┘     └──────────────────────┘     └─────────────────────┘

Component Architecture

tellus-ehs-hazcom-service/
├── app/
│   ├── services/
│   │   └── chemiq/
│   │       ├── sds_service.py           # Update: Create parse job on upload
│   │       └── sds_parser_service.py    # NEW: LLM-based SDS parsing
│   └── core/
│       └── config.py                    # Update: Add SDS parser settings

tellus-ehs-background-service/
├── app/
│   ├── services/
│   │   └── sds_parse/
│   │       ├── __init__.py              # NEW
│   │       ├── service.py               # NEW: Parse job processor
│   │       ├── pdf_extractor.py         # NEW: Enhanced PDF text extraction
│   │       └── llm_parser.py            # NEW: LLM integration
│   └── jobs/
│       └── definitions.py               # Update: Add sds_parse_job

Implementation Phases

Phase 1: PDF Text Extraction Enhancement

Objective: Improve PDF text extraction to handle scanned PDFs and complex layouts.

Files to Create/Modify:

  1. tellus-ehs-background-service/app/services/sds_parse/pdf_extractor.py
"""
Enhanced PDF Text Extraction for SDS Documents

Supports:
- Native PDF text extraction (pdfplumber)
- Fallback to pypdf for complex layouts
- OCR support for scanned PDFs (optional, via pytesseract)
"""

import pdfplumber
from pypdf import PdfReader
from typing import Tuple
import logging

logger = logging.getLogger(__name__)

class SDSPDFExtractor:
"""Extract text from SDS PDF documents."""

def __init__(self, enable_ocr: bool = False):
self.enable_ocr = enable_ocr

def extract_text(self, pdf_bytes: bytes) -> Tuple[str, dict]:
"""
Extract text from PDF bytes.

Returns:
Tuple of (extracted_text, metadata)
metadata includes: page_count, extraction_method, confidence
"""
# Try pdfplumber first (best for structured PDFs)
text, metadata = self._extract_with_pdfplumber(pdf_bytes)

if len(text.strip()) < 500: # Likely scanned or image-based
# Fallback to pypdf
text, metadata = self._extract_with_pypdf(pdf_bytes)

if len(text.strip()) < 500 and self.enable_ocr:
# Last resort: OCR
text, metadata = self._extract_with_ocr(pdf_bytes)

return text, metadata

def _extract_with_pdfplumber(self, pdf_bytes: bytes) -> Tuple[str, dict]:
"""Extract using pdfplumber."""
# Implementation
pass

def _extract_with_pypdf(self, pdf_bytes: bytes) -> Tuple[str, dict]:
"""Extract using pypdf."""
# Implementation
pass

def _extract_with_ocr(self, pdf_bytes: bytes) -> Tuple[str, dict]:
"""Extract using OCR (pytesseract)."""
# Implementation (optional)
pass
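
A minimal sketch of the pdfplumber path, assuming per-page text concatenation is sufficient; the fixed 0.9 confidence value is a placeholder heuristic, not a measured score.

import io

def _extract_with_pdfplumber(self, pdf_bytes: bytes) -> Tuple[str, dict]:
    """Extract text page by page with pdfplumber."""
    pages = []
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
        for page in pdf.pages:
            # extract_text() returns None for image-only pages
            pages.append(page.extract_text() or "")
    text = "\n".join(pages)
    metadata = {
        "page_count": len(pages),
        "extraction_method": "pdfplumber",
        "confidence": 0.9 if text.strip() else 0.0,  # placeholder heuristic
    }
    return text, metadata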

Dependencies to Add:

pdfplumber>=0.10.0
pypdf>=4.0.0
# Optional for OCR:
# pytesseract>=0.3.10
# pdf2image>=1.16.0

Phase 2: LLM Parser Service (Main Service)

Objective: Create LLM-based SDS parser following existing patterns.

Files to Create:

  1. tellus-ehs-hazcom-service/app/services/chemiq/sds_parser_service.py
"""
SDS Parser Service - LLM-based SDS Document Parsing

Follows the provider pattern established in label_extraction_service.py.
Supports Anthropic Claude and OpenAI GPT-4.
"""

from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
import json
import logging

from app.core.config import settings

logger = logging.getLogger(__name__)


class SDSParserProvider(ABC):
"""Abstract base class for SDS parsing providers."""

@abstractmethod
def parse_sds(
self,
sds_text: str,
prompt_template: str
) -> Dict[str, Any]:
"""
Parse SDS text and return structured JSON.

Args:
sds_text: Extracted text from SDS PDF
prompt_template: The extraction prompt template

Returns:
Parsed SDS data as dictionary matching the JSON schema
"""
pass


class AnthropicSDSParser(SDSParserProvider):
"""Anthropic Claude implementation for SDS parsing."""

def __init__(self):
import anthropic
self.client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
self.model = settings.ANTHROPIC_MODEL # claude-sonnet-4-20250514

def parse_sds(
self,
sds_text: str,
prompt_template: str
) -> Dict[str, Any]:
"""Parse SDS using Claude."""
# Replace placeholder in prompt with actual SDS text
prompt = prompt_template.replace(
"{{ $('add-is-text-empty').first().json.text }}",
sds_text
)

response = self.client.messages.create(
model=self.model,
max_tokens=8192,
messages=[
{
"role": "user",
"content": prompt
}
]
)

return self._parse_json_response(response.content[0].text)

def _parse_json_response(self, response_text: str) -> Dict[str, Any]:
"""Extract JSON from response, handling markdown code blocks."""
text = response_text.strip()

# Handle markdown code blocks
if text.startswith("```json"):
text = text[7:]
elif text.startswith("```"):
text = text[3:]
if text.endswith("```"):
text = text[:-3]

return json.loads(text.strip())


class OpenAISDSParser(SDSParserProvider):
"""OpenAI GPT-4 implementation for SDS parsing."""

def __init__(self):
from openai import OpenAI
self.client = OpenAI(api_key=settings.OPENAI_API_KEY)
self.model = "gpt-4o" # Best for document understanding

def parse_sds(
self,
sds_text: str,
prompt_template: str
) -> Dict[str, Any]:
"""Parse SDS using GPT-4."""
prompt = prompt_template.replace(
"{{ $('add-is-text-empty').first().json.text }}",
sds_text
)

response = self.client.chat.completions.create(
model=self.model,
max_tokens=8192,
messages=[
{
"role": "user",
"content": prompt
}
],
response_format={"type": "json_object"}
)

return json.loads(response.choices[0].message.content)


class SDSParserService:
"""
Main SDS Parser Service.

Provides a unified interface for parsing SDS documents
using configured LLM provider.
"""

def __init__(self, provider: Optional[str] = None):
"""
Initialize parser with specified or default provider.

Args:
provider: "anthropic" or "openai" (defaults to settings)
"""
provider = provider or settings.VISION_LLM_PROVIDER

if provider == "anthropic":
self.parser = AnthropicSDSParser()
elif provider == "openai":
self.parser = OpenAISDSParser()
else:
raise ValueError(f"Unknown SDS parser provider: {provider}")

self.prompt_template = self._load_prompt_template()

def _load_prompt_template(self) -> str:
"""Load the SDS extraction prompt template."""
import os
prompt_path = os.path.join(
os.path.dirname(__file__),
"../../../../docs/sds_extraction_prompt.md"
)
# Fallback path for different execution contexts
if not os.path.exists(prompt_path):
prompt_path = "docs/sds_extraction_prompt.md"

with open(prompt_path, "r") as f:
return f.read()

def parse_sds_document(
self,
sds_text: str
) -> Dict[str, Any]:
"""
Parse SDS document text and return structured data.

Args:
sds_text: Extracted text from SDS PDF

Returns:
Dictionary with all 16 SDS sections in structured format
"""
return self.parser.parse_sds(sds_text, self.prompt_template)

def calculate_confidence(
self,
parsed_data: Dict[str, Any]
) -> float:
"""
Calculate confidence score based on completeness.

Returns:
Float between 0.0 and 1.0
"""
required_sections = [
"section1_identification",
"section2_hazard_identification",
"section3_composition",
]

all_sections = [f"section{i}" for i in range(1, 17)]

# Check required sections
required_present = sum(
1 for s in required_sections
if s in parsed_data and parsed_data[s]
)
required_score = required_present / len(required_sections)

# Check all sections
all_present = sum(
1 for s in all_sections
if any(k.startswith(s) for k in parsed_data.keys())
)
completeness_score = all_present / len(all_sections)

# Weighted score (required sections more important)
return 0.6 * required_score + 0.4 * completeness_score
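
Typical usage of the service above (a hypothetical call site):

service = SDSParserService(provider="anthropic")
parsed = service.parse_sds_document(sds_text)
confidence = service.calculate_confidence(parsed)

For example, if all three required sections are present and 10 of the 16 sections contain data, the score is 0.6 × 1.0 + 0.4 × (10/16) = 0.85.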

Phase 3: Parse Job Processor (Background Service)

Objective: Create the background job that polls and processes pending parse jobs.

Files to Create:

  1. tellus-ehs-background-service/app/services/sds_parse/__init__.py
"""SDS Parse Service - Background job for LLM-based SDS parsing."""

from .service import SDSParseService

__all__ = ["SDSParseService"]
  2. tellus-ehs-background-service/app/services/sds_parse/service.py
"""
SDS Parse Service - Background Job Processor

Polls pending parse jobs from database, extracts PDF text,
calls LLM for parsing, and updates database with results.
"""

import logging
from datetime import datetime
from typing import Optional, List
from uuid import UUID

from sqlalchemy.orm import Session
from sqlalchemy import select, update

from app.db.session import get_db
from app.db.models.chemiq_sds import (
SDSDocument,
SDSParseJob,
SDSHazardInfo,
SDSComposition,
SDSSection,
)
from app.utils.s3_client import download_file_from_s3
from .pdf_extractor import SDSPDFExtractor
from .llm_parser import SDSLLMParser

logger = logging.getLogger(__name__)


class SDSParseService:
"""
Service for processing SDS parse jobs.

Designed to be called by a cron/scheduled job.
"""

def __init__(
self,
db: Session,
batch_size: int = 5,
max_retries: int = 3
):
self.db = db
self.batch_size = batch_size
self.max_retries = max_retries
self.pdf_extractor = SDSPDFExtractor()
self.llm_parser = SDSLLMParser()

async def process_pending_jobs(self) -> dict:
"""
Process a batch of pending parse jobs.

Returns:
Summary of processed jobs
"""
# Get pending jobs ordered by priority
pending_jobs = self._get_pending_jobs()

results = {
"processed": 0,
"succeeded": 0,
"failed": 0,
"skipped": 0,
}

for job in pending_jobs:
try:
success = await self._process_single_job(job)
results["processed"] += 1
if success:
results["succeeded"] += 1
else:
results["failed"] += 1
except Exception as e:
logger.error(f"Error processing job {job.job_id}: {e}")
self._mark_job_failed(job, str(e))
results["failed"] += 1

return results

def _get_pending_jobs(self) -> List[SDSParseJob]:
"""Get batch of pending jobs ordered by priority."""
return self.db.query(SDSParseJob).filter(
SDSParseJob.job_status == "pending",
SDSParseJob.retry_count < self.max_retries
).order_by(
SDSParseJob.priority.desc(),
SDSParseJob.created_at.asc()
).limit(self.batch_size).all()

async def _process_single_job(self, job: SDSParseJob) -> bool:
"""
Process a single parse job.

Returns:
True if successful, False otherwise
"""
logger.info(f"Processing parse job {job.job_id} for SDS {job.sds_id}")

# Mark as processing
job.job_status = "processing"
job.started_at = datetime.utcnow()
self.db.commit()

try:
# Get SDS document
sds = self.db.query(SDSDocument).filter(
SDSDocument.sds_id == job.sds_id
).first()

if not sds:
raise ValueError(f"SDS document {job.sds_id} not found")

# Download PDF from S3
pdf_bytes = await download_file_from_s3(
bucket=sds.s3_bucket,
key=sds.s3_key
)

# Extract text from PDF
sds_text, extraction_metadata = self.pdf_extractor.extract_text(pdf_bytes)

if len(sds_text.strip()) < 100:
raise ValueError("Insufficient text extracted from PDF")

# Parse with LLM
parsed_data = await self.llm_parser.parse(sds_text)

# Calculate confidence
confidence = self.llm_parser.calculate_confidence(parsed_data)

# Populate database tables
self._populate_sds_data(sds, parsed_data, confidence)

# Mark job as completed
job.job_status = "completed"
job.completed_at = datetime.utcnow()
self.db.commit()

logger.info(f"Successfully parsed SDS {sds.sds_id} with confidence {confidence:.2f}")
return True

except Exception as e:
logger.error(f"Failed to parse SDS for job {job.job_id}: {e}")
job.retry_count += 1
job.error_message = str(e)

if job.retry_count >= self.max_retries:
job.job_status = "failed"
else:
job.job_status = "pending" # Will be retried

self.db.commit()
return False

def _populate_sds_data(
self,
sds: SDSDocument,
parsed_data: dict,
confidence: float
) -> None:
"""
Populate SDS document and related tables with parsed data.
"""
# Update SDSDocument
sds.sds_parsed = True
sds.parse_confidence = confidence
sds.parsed_at = datetime.utcnow()

# Extract and populate Section 1 (Identification)
section1 = parsed_data.get("section1_identification", {})
if section1:
sds.product_name = section1.get("product_identifier", sds.product_name)
manufacturer = section1.get("manufacturer", {})
sds.manufacturer = manufacturer.get("company_name", sds.manufacturer)

# Extract and populate Section 2 (Hazard Info)
section2 = parsed_data.get("section2_hazard_identification", {})
if section2:
self._populate_hazard_info(sds, section2)

# Extract and populate Section 3 (Composition)
section3 = parsed_data.get("section3_composition", {})
if section3:
self._populate_composition(sds, section3)

# Populate Sections 4-16
for section_num in range(4, 17):
section_key = f"section{section_num}_"
for key, value in parsed_data.items():
if key.startswith(section_key):
self._populate_section(sds, section_num, key, value)
break

self.db.commit()

def _populate_hazard_info(self, sds: SDSDocument, section2: dict) -> None:
"""Populate SDSHazardInfo from Section 2."""
label_elements = section2.get("label_elements", {})

# Create or update hazard info
hazard_info = sds.hazard_info
if not hazard_info:
hazard_info = SDSHazardInfo(sds_id=sds.sds_id)
self.db.add(hazard_info)

hazard_info.signal_word = label_elements.get("signal_word")
hazard_info.pictograms = label_elements.get("pictograms", [])

# Process classifications
classifications = section2.get("classification", [])
hazard_classes = []
h_codes = []

for classification in classifications:
hazard_classes.append({
"hazard_class": classification.get("hazard_class"),
"hazard_category": classification.get("hazard_category"),
"hazard_sub_category": classification.get("hazard_sub_category"),
"hazard_type": classification.get("hazard_type"),
})
if classification.get("hazard_code"):
h_codes.append(classification.get("hazard_code"))

hazard_info.hazard_classes = hazard_classes
hazard_info.h_codes = h_codes
hazard_info.hazard_statements = label_elements.get("hazard_statements", [])

# Process precautionary statements
precautionary = label_elements.get("precautionary_statements", {})
p_codes = []
for category in ["prevention", "response", "storage", "disposal", "general"]:
for statement in precautionary.get(category, []):
if statement.get("code"):
p_codes.append(statement.get("code"))
hazard_info.p_codes = p_codes

hazard_info.supplemental_hazards = section2.get(
"hazards_not_otherwise_classified", []
)

def _populate_composition(self, sds: SDSDocument, section3: dict) -> None:
"""Populate SDSComposition from Section 3."""
# Clear existing composition entries
self.db.query(SDSComposition).filter(
SDSComposition.sds_id == sds.sds_id
).delete()

ingredients = section3.get("ingredients", [])
for idx, ingredient in enumerate(ingredients):
composition = SDSComposition(
sds_id=sds.sds_id,
chemical_name=ingredient.get("chemical_name"),
common_name=ingredient.get("common_name"),
cas_number=ingredient.get("cas_number"),
ec_number=ingredient.get("ec_number"),
concentration=ingredient.get("concentration", {}),
is_hazardous=bool(ingredient.get("classification")),
sort_order=idx,
)
self.db.add(composition)

def _populate_section(
self,
sds: SDSDocument,
section_num: int,
section_key: str,
section_data: dict
) -> None:
"""Populate SDSSection for sections 4-16."""
# Check if section already exists
existing = self.db.query(SDSSection).filter(
SDSSection.sds_id == sds.sds_id,
SDSSection.section_number == section_num
).first()

if existing:
existing.section_data = section_data
else:
section = SDSSection(
sds_id=sds.sds_id,
section_number=section_num,
section_data=section_data,
)
self.db.add(section)

def _mark_job_failed(self, job: SDSParseJob, error: str) -> None:
"""Mark job as failed with error message."""
job.job_status = "failed"
job.error_message = error
job.completed_at = datetime.utcnow()
self.db.commit()
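
For a one-off manual run outside the scheduler, the processor can be driven directly. A sketch; SessionLocal is a hypothetical session factory, adjust to the service's actual one:

import asyncio

from app.db.session import SessionLocal  # hypothetical session factory

async def run_once():
    db = SessionLocal()
    try:
        service = SDSParseService(db=db, batch_size=5, max_retries=3)
        summary = await service.process_pending_jobs()
        print(summary)  # e.g. {"processed": 5, "succeeded": 4, "failed": 1, "skipped": 0}
    finally:
        db.close()

asyncio.run(run_once())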
  3. tellus-ehs-background-service/app/services/sds_parse/llm_parser.py
"""
LLM Parser - Wrapper for LLM-based SDS parsing.

This module interfaces with the main service's SDS parser
or can operate standalone with its own LLM clients.
"""

import json
import os
import logging
from typing import Dict, Any

import anthropic
from openai import OpenAI

logger = logging.getLogger(__name__)


class SDSLLMParser:
"""LLM-based SDS Parser for background service."""

def __init__(self, provider: str = None):
self.provider = provider or os.getenv("VISION_LLM_PROVIDER", "anthropic")
self.prompt_template = self._load_prompt_template()

if self.provider == "anthropic":
self.client = anthropic.Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY")
)
self.model = os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-20250514")
else:
self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.model = "gpt-4o"

def _load_prompt_template(self) -> str:
"""Load the SDS extraction prompt."""
# Try multiple paths
paths = [
"docs/sds_extraction_prompt.md",
"../docs/sds_extraction_prompt.md",
"/app/docs/sds_extraction_prompt.md",
]

for path in paths:
if os.path.exists(path):
with open(path, "r") as f:
return f.read()

raise FileNotFoundError("Could not find sds_extraction_prompt.md")

async def parse(self, sds_text: str) -> Dict[str, Any]:
"""Parse SDS text using LLM."""
prompt = self.prompt_template.replace(
"{{ $('add-is-text-empty').first().json.text }}",
sds_text
)

if self.provider == "anthropic":
return await self._parse_with_anthropic(prompt)
else:
return await self._parse_with_openai(prompt)

async def _parse_with_anthropic(self, prompt: str) -> Dict[str, Any]:
"""Parse using Anthropic Claude."""
response = self.client.messages.create(
model=self.model,
max_tokens=8192,
messages=[{"role": "user", "content": prompt}]
)
return self._extract_json(response.content[0].text)

async def _parse_with_openai(self, prompt: str) -> Dict[str, Any]:
"""Parse using OpenAI GPT-4."""
response = self.client.chat.completions.create(
model=self.model,
max_tokens=8192,
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"}
)
return json.loads(response.choices[0].message.content)

def _extract_json(self, text: str) -> Dict[str, Any]:
"""Extract JSON from response text."""
text = text.strip()
if text.startswith("```json"):
text = text[7:]
elif text.startswith("```"):
text = text[3:]
if text.endswith("```"):
text = text[:-3]
return json.loads(text.strip())

def calculate_confidence(self, parsed_data: Dict[str, Any]) -> float:
"""Calculate confidence score based on data completeness."""
required_sections = [
"section1_identification",
"section2_hazard_identification",
"section3_composition",
]

# Check required sections have data
required_score = sum(
1 for s in required_sections
if s in parsed_data and parsed_data[s]
) / len(required_sections)

# Check all 16 sections
all_sections = [f"section{i}" for i in range(1, 17)]
all_score = sum(
1 for s in all_sections
if any(k.startswith(s) for k in parsed_data.keys())
) / len(all_sections)

return 0.6 * required_score + 0.4 * all_score
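
Provider rate limits (see Risk Mitigation) can be absorbed at this layer with a retry wrapper around the API calls. A minimal sketch, catching broad exceptions for brevity; narrow these to the providers' rate-limit error types in practice:

import asyncio
import random

async def call_with_backoff(fn, max_attempts: int = 5, base_delay: float = 1.0):
    """Retry an async callable with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # 1s, 2s, 4s, 8s... plus up to 0.5s of jitter
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))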

Phase 4: Scheduled Job Integration

Objective: Add cron job to background service scheduler.

File to Modify:

  1. tellus-ehs-background-service/app/jobs/definitions.py

Add the following job definition:

import logging
import os

from app.db.session import get_async_session  # adjust to the service's actual session helper
from app.services.sds_parse import SDSParseService

logger = logging.getLogger(__name__)


async def sds_parse_job():
    """
    Process pending SDS parse jobs.

    Runs on a schedule to pick up newly uploaded SDS documents
    and parse them using the LLM.
    """
    logger.info("Starting SDS parse job...")

    async with get_async_session() as session:
        service = SDSParseService(
            db=session,
            batch_size=int(os.getenv("SDS_PARSE_BATCH_SIZE", "5")),
            max_retries=int(os.getenv("SDS_PARSE_MAX_RETRIES", "3")),
        )

        results = await service.process_pending_jobs()

    logger.info(
        f"SDS parse job completed: "
        f"processed={results['processed']}, "
        f"succeeded={results['succeeded']}, "
        f"failed={results['failed']}"
    )

    return results


# Add to the JOB_DEFINITIONS dict
JOB_DEFINITIONS = {
    # ... existing jobs ...
    "sds_parse": {
        "func": sds_parse_job,
        "trigger": "interval",
        "minutes": int(os.getenv("SDS_PARSE_INTERVAL_MINUTES", "5")),  # Run every 5 minutes by default
        "enabled": os.getenv("ENABLE_SDS_PARSE_JOB", "true").lower() == "true",
        "description": "Process pending SDS parse jobs with LLM",
    },
}
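
How JOB_DEFINITIONS is consumed depends on the scheduler already in place; the sketch below assumes an APScheduler-based setup and is hypothetical — adjust it to the existing registration code.

from apscheduler.schedulers.asyncio import AsyncIOScheduler

def register_jobs(scheduler: AsyncIOScheduler) -> None:
    """Register every enabled job definition with an interval trigger."""
    for name, job in JOB_DEFINITIONS.items():
        if job["enabled"]:
            scheduler.add_job(
                job["func"],
                trigger=job["trigger"],  # "interval"
                minutes=job["minutes"],
                id=name,
            )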

Phase 5: Update SDS Upload to Create Parse Job

Objective: Automatically create a parse job when an SDS is uploaded.

File to Modify:

  1. tellus-ehs-hazcom-service/app/services/chemiq/sds_service.py

Add the following method and update upload_sds:

from datetime import datetime
from uuid import UUID

from app.db.models.chemiq_sds import SDSParseJob


def _create_parse_job(self, sds_id: UUID, priority: int = 5) -> SDSParseJob:
    """Create a parse job for the uploaded SDS."""
    parse_job = SDSParseJob(
        sds_id=sds_id,
        job_status="pending",
        priority=priority,
        parse_sections=list(range(1, 17)),  # sections 1-16
        created_at=datetime.utcnow(),
    )
    self.db.add(parse_job)
    return parse_job


def upload_sds(self, ...):
    """Upload SDS document."""
    # ... existing upload logic ...

    # After the SDS is created and committed:
    self._create_parse_job(sds.sds_id, priority=5)
    self.db.commit()

    return sds

Phase 6: Configuration Updates

File to Modify:

  1. tellus-ehs-hazcom-service/app/core/config.py
# SDS Parser Settings
SDS_PARSER_PROVIDER: str = "anthropic" # or "openai"
SDS_PARSER_MAX_TOKENS: int = 8192
SDS_PARSER_TIMEOUT_SECONDS: int = 120
  2. tellus-ehs-background-service/.env.example
# SDS Parse Job Settings
ENABLE_SDS_PARSE_JOB=true
SDS_PARSE_BATCH_SIZE=5
SDS_PARSE_MAX_RETRIES=3
SDS_PARSE_INTERVAL_MINUTES=5

# LLM Settings (same as main service)
VISION_LLM_PROVIDER=anthropic
ANTHROPIC_API_KEY=your-api-key
ANTHROPIC_MODEL=claude-sonnet-4-20250514
OPENAI_API_KEY=your-openai-key

Database Considerations

Existing Tables (No Changes Needed)

The following tables already exist and have the required structure:

| Table | Purpose |
|---|---|
| chemiq_sds_documents | SDS metadata, S3 location, parse status |
| chemiq_sds_parse_queue | Parse job queue with status tracking |
| chemiq_sds_hazard_info | Section 2 GHS classification |
| chemiq_sds_composition | Section 3 ingredients |
| chemiq_sds_sections | Sections 4-16 JSONB storage |

Key Fields for Parse Tracking

-- SDSDocument (chemiq_sds_documents)
sds_parsed BOOLEAN DEFAULT FALSE   -- Set true when parsed
parse_confidence NUMERIC(3,2)      -- 0.00-1.00 confidence score
parse_errors JSONB                 -- Error details if failed
parsed_at TIMESTAMP                -- When parsing completed

-- SDSParseJob (chemiq_sds_parse_queue)
job_status VARCHAR(20)             -- pending, processing, completed, failed
priority INTEGER                   -- 1-10 (higher = processed first)
retry_count INTEGER DEFAULT 0      -- Incremented on failure
error_message TEXT                 -- Last error message

Testing Strategy

Unit Tests

# tests/services/test_sds_parser_service.py

def test_anthropic_parser_initialization():
    """Test Anthropic parser initializes correctly."""
    pass


def test_openai_parser_initialization():
    """Test OpenAI parser initializes correctly."""
    pass


def test_json_extraction_from_markdown():
    """Test JSON extraction handles markdown code blocks."""
    pass


def test_confidence_calculation():
    """Test confidence score calculation logic."""
    pass

Integration Tests

# tests/integration/test_sds_parsing.py

def test_full_parse_workflow():
    """Test end-to-end SDS parsing workflow."""
    # 1. Upload SDS
    # 2. Verify parse job created
    # 3. Process parse job
    # 4. Verify SDSHazardInfo populated
    # 5. Verify SDSComposition populated
    pass

Sample SDS Files

Create tests/fixtures/sds/ directory with sample SDS PDFs:

  • simple_sds.pdf - Simple single-page SDS
  • complex_sds.pdf - Multi-page with tables
  • scanned_sds.pdf - Scanned/image-based SDS
  • foreign_sds.pdf - Non-English SDS

Monitoring & Observability

Logging

# Key log points
logger.info(f"Created parse job {job_id} for SDS {sds_id}")
logger.info(f"Processing parse job {job_id}")
logger.info(f"PDF text extraction: {len(text)} chars, method={method}")
logger.info(f"LLM parsing completed: confidence={confidence:.2f}")
logger.error(f"Parse job {job_id} failed: {error}")

Metrics to Track

| Metric | Type | Description |
|---|---|---|
| sds_parse_jobs_pending | Gauge | Current pending jobs |
| sds_parse_jobs_processed | Counter | Total jobs processed |
| sds_parse_success_rate | Gauge | Success/total ratio |
| sds_parse_duration_seconds | Histogram | Time to parse |
| sds_parse_confidence_avg | Gauge | Average confidence score |
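
A sketch of how these could be declared, assuming prometheus_client is (or will be) available in the background service:

from prometheus_client import Counter, Gauge, Histogram

SDS_PARSE_JOBS_PENDING = Gauge("sds_parse_jobs_pending", "Current pending jobs")
SDS_PARSE_JOBS_PROCESSED = Counter("sds_parse_jobs_processed", "Total jobs processed")
SDS_PARSE_DURATION_SECONDS = Histogram("sds_parse_duration_seconds", "Time to parse one SDS")
SDS_PARSE_CONFIDENCE_AVG = Gauge("sds_parse_confidence_avg", "Average confidence score")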

Implementation Order & Timeline

Order of Implementation

  1. Phase 1: PDF Text Extraction (Foundation)

    • Create pdf_extractor.py
    • Test with various PDF types
  2. Phase 2: LLM Parser Service (Core Logic)

    • Create sds_parser_service.py in main service
    • Test with sample SDS text
  3. Phase 3: Background Job Processor (Integration)

    • Create sds_parse/service.py
    • Create sds_parse/llm_parser.py
    • Test job processing flow
  4. Phase 4: Scheduled Job (Automation)

    • Update definitions.py
    • Test cron scheduling
  5. Phase 5: Upload Integration (Complete Flow)

    • Update sds_service.py to create parse jobs
    • End-to-end testing
  6. Phase 6: Configuration & Polish

    • Update config files
    • Add monitoring/logging
    • Documentation

Open Questions for Review

  1. LLM Provider Priority: Should Anthropic be the default, or should we prefer OpenAI for its JSON mode support? Answer: Default to OpenAI
  2. Batch Size: How many SDS documents should we process per cron run? (Proposed: 5) Answer: 5 is good
  3. Retry Strategy: How many retries before marking as permanently failed? (Proposed: 3) Answer: 3 is good
  4. Cron Frequency: How often should the parse job run? (Proposed: every 5 minutes) Answer: make it configurable. Start with 5
  5. Confidence Threshold: Should we set a minimum confidence score to accept parsed data, or always store with the score? Answer: Always store with score
  6. OCR Support: Should we include OCR for scanned PDFs? (Adds dependencies: pytesseract, pdf2image) Answer: Yes, add it
  7. Cost Optimization: Should we implement any token/cost limits per day? Answer: Make it configurable, but keep it very high for now.
  8. Error Notifications: Should failed parse jobs trigger any alerts? (Future: integrate with notification service) Answer: Add the placeholder code for this.

Dependencies

Main Service (tellus-ehs-hazcom-service)

# Already present
anthropic>=0.18.0
openai>=1.0.0

# May need to add
pdfplumber>=0.10.0
pypdf>=4.0.0

Background Service (tellus-ehs-background-service)

# Already present
pdfplumber
pypdf
anthropic
openai
aioboto3

# No new dependencies needed

Risk Mitigation

| Risk | Mitigation |
|---|---|
| LLM API rate limits | Implement exponential backoff, batch processing |
| Large PDF files | Set max file size, chunk if needed |
| Scanned PDFs with poor quality | Fall back to a manual review flag |
| LLM hallucinations | Validate extracted CAS numbers, H-codes against known lists |
| Cost overruns | Implement daily/monthly token budgets |
| Parse job queue buildup | Monitor queue size, auto-scale if needed |
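
For the hallucination risk, CAS numbers carry a built-in check digit, so they can be validated without any reference list. A minimal sketch:

import re

def is_valid_cas(cas: str) -> bool:
    """Validate a CAS registry number using its checksum digit."""
    match = re.fullmatch(r"(\d{2,7})-(\d{2})-(\d)", cas or "")
    if not match:
        return False
    # Each digit is weighted by its 1-based position from the right
    # (excluding the check digit); the sum mod 10 must equal the check digit.
    digits = (match.group(1) + match.group(2))[::-1]
    checksum = sum(int(d) * (i + 1) for i, d in enumerate(digits)) % 10
    return checksum == int(match.group(3))

For example, is_valid_cas("7732-18-5") (water) returns True, since 8·1 + 1·2 + 2·3 + 3·4 + 7·5 + 7·6 = 105 and 105 mod 10 = 5. H-codes can similarly be checked against the fixed GHS list.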

Success Criteria

  1. Functional: SDS documents are automatically parsed within 10 minutes of upload
  2. Accuracy: Parse confidence > 0.8 for 90% of standard SDS documents
  3. Reliability: < 5% permanent parse failures
  4. Performance: Average parse time < 30 seconds per document
  5. Data Quality: Section 2 (GHS) and Section 3 (Composition) correctly populated