Quick Inventory Import - Implementation Plan
Overview
This document provides the detailed implementation plan for the Quick Inventory Import feature.
Phase 1: Core Infrastructure
1.1 Database Migrations
Migration 1: Create Staging Tables
File: tellus-ehs-hazcom-service/alembic/versions/YYYY_MM_DD_HHMM-create_inventory_staging_tables.py
"""Create inventory staging tables for Quick Import feature
Revision ID: <auto-generated>
Revises: <previous>
Create Date: <auto-generated>
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
def upgrade():
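# Assumption: gen_random_uuid() is built in on PostgreSQL 13+; on older versions it
# comes from the pgcrypto extension, so enable it defensively before creating tables.
op.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")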
# Upload Batches Table
op.create_table(
'chemiq_inventory_upload_batches',
sa.Column('batch_id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')),
sa.Column('company_id', UUID(as_uuid=True), sa.ForeignKey('companies.company_id', ondelete='CASCADE'), nullable=False),
sa.Column('upload_method', sa.String(50), nullable=False), # 'photo', 'barcode', 'excel', 'invoice', 'sds_library', 'site_copy'
sa.Column('source_filename', sa.String(255), nullable=True),
sa.Column('source_file_s3_key', sa.String(500), nullable=True),
sa.Column('total_items', sa.Integer, default=0),
sa.Column('pending_count', sa.Integer, default=0),
sa.Column('approved_count', sa.Integer, default=0),
sa.Column('rejected_count', sa.Integer, default=0),
sa.Column('completed_count', sa.Integer, default=0),
sa.Column('processing_status', sa.String(30), default='pending'), # 'pending', 'processing', 'ready_for_review', 'completed', 'failed'
sa.Column('processing_error', sa.Text, nullable=True),
sa.Column('created_by', UUID(as_uuid=True), sa.ForeignKey('users.user_id'), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('NOW()')),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('NOW()')),  # onupdate is ORM-only and has no DDL effect; see the trigger sketch below
sa.Column('completed_at', sa.DateTime(timezone=True), nullable=True),
)
op.create_index('idx_batches_company_status', 'chemiq_inventory_upload_batches', ['company_id', 'processing_status'])
op.create_index('idx_batches_created_by', 'chemiq_inventory_upload_batches', ['created_by'])
# Inventory Staging Table
op.create_table(
'chemiq_inventory_staging',
sa.Column('staging_id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')),
sa.Column('company_id', UUID(as_uuid=True), sa.ForeignKey('companies.company_id', ondelete='CASCADE'), nullable=False),
sa.Column('upload_batch_id', UUID(as_uuid=True), sa.ForeignKey('chemiq_inventory_upload_batches.batch_id', ondelete='CASCADE'), nullable=False),
sa.Column('upload_method', sa.String(50), nullable=False),
# Raw Captured Data
sa.Column('raw_product_name', sa.String(500), nullable=True),
sa.Column('raw_manufacturer', sa.String(255), nullable=True),
sa.Column('raw_barcode', sa.String(100), nullable=True),
sa.Column('raw_quantity', sa.Numeric(10, 2), nullable=True),
sa.Column('raw_unit', sa.String(50), nullable=True),
sa.Column('raw_location_text', sa.String(255), nullable=True),
sa.Column('product_image_urls', JSONB, nullable=True), # Array of S3 URLs
# Location
sa.Column('site_id', UUID(as_uuid=True), sa.ForeignKey('company_sites.site_id'), nullable=True),
sa.Column('location_id', UUID(as_uuid=True), sa.ForeignKey('site_locations.location_id'), nullable=True),
# Auto-Matching Results
sa.Column('matched_company_product_id', UUID(as_uuid=True), sa.ForeignKey('chemiq_company_product_catalog.company_product_id'), nullable=True),
sa.Column('matched_sds_id', UUID(as_uuid=True), sa.ForeignKey('chemiq_sds_documents.sds_id'), nullable=True),
sa.Column('match_confidence', sa.Numeric(3, 2), nullable=True), # 0.00 to 1.00
sa.Column('match_method', sa.String(50), nullable=True), # 'barcode_exact', 'name_fuzzy', 'ocr_text'
sa.Column('ocr_extracted_text', sa.Text, nullable=True),
sa.Column('detected_barcode', sa.String(100), nullable=True), # Barcode detected from image OCR
# Review Workflow
sa.Column('status', sa.String(30), default='pending_processing'), # 'pending_processing', 'pending_review', 'needs_attention', 'approved', 'rejected', 'completed'
# Approved Values (set by supervisor)
sa.Column('approved_product_name', sa.String(500), nullable=True),
sa.Column('approved_manufacturer', sa.String(255), nullable=True),
sa.Column('approved_quantity', sa.Numeric(10, 2), nullable=True),
sa.Column('approved_unit', sa.String(50), nullable=True),
# Notes
sa.Column('capture_notes', sa.Text, nullable=True), # Notes from field worker
sa.Column('reviewer_notes', sa.Text, nullable=True), # Notes from supervisor
# Result
sa.Column('created_inventory_id', UUID(as_uuid=True), sa.ForeignKey('chemiq_company_product_catalog.company_product_id'), nullable=True),
# Audit
sa.Column('created_by', UUID(as_uuid=True), sa.ForeignKey('users.user_id'), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('NOW()')),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('NOW()')),  # onupdate handled at the ORM layer
sa.Column('reviewed_by', UUID(as_uuid=True), sa.ForeignKey('users.user_id'), nullable=True),
sa.Column('reviewed_at', sa.DateTime(timezone=True), nullable=True),
)
op.create_index('idx_staging_company_status', 'chemiq_inventory_staging', ['company_id', 'status'])
op.create_index('idx_staging_batch', 'chemiq_inventory_staging', ['upload_batch_id'])
op.create_index('idx_staging_barcode', 'chemiq_inventory_staging', ['company_id', 'raw_barcode'], postgresql_where=sa.text('raw_barcode IS NOT NULL'))
def downgrade():
op.drop_table('chemiq_inventory_staging')
op.drop_table('chemiq_inventory_upload_batches')
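SQLAlchemy's onupdate only fires for ORM-issued updates, so rows modified via raw SQL would keep a stale updated_at. If database-side enforcement is wanted, a trigger can be added in this migration; a minimal sketch (function and trigger names are illustrative):
def _add_updated_at_trigger(table_name: str) -> None:
    """Keep updated_at current for any UPDATE, not just ORM-issued ones."""
    op.execute(f"""
        CREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger AS $$
        BEGIN
            NEW.updated_at = NOW();
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER trg_{table_name}_updated_at
        BEFORE UPDATE ON {table_name}
        FOR EACH ROW EXECUTE FUNCTION set_updated_at();
    """)

# called at the end of upgrade():
#   _add_updated_at_trigger('chemiq_inventory_upload_batches')
#   _add_updated_at_trigger('chemiq_inventory_staging')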
Migration 2: Create Processing Jobs Table (Background Service)
File: tellus-ehs-hazcom-service/alembic/versions/YYYY_MM_DD_HHMM-create_inventory_processing_jobs.py
"""Create inventory processing jobs table for background processing
Revision ID: <auto-generated>
Revises: <previous>
Create Date: <auto-generated>
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
def upgrade():
op.create_table(
'chemiq_inventory_processing_jobs',
sa.Column('job_id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')),
sa.Column('company_id', UUID(as_uuid=True), sa.ForeignKey('companies.company_id', ondelete='CASCADE'), nullable=False),
sa.Column('job_type', sa.String(50), nullable=False), # 'ocr_extraction', 'product_matching', 'invoice_parsing', 'excel_processing'
sa.Column('staging_id', UUID(as_uuid=True), sa.ForeignKey('chemiq_inventory_staging.staging_id', ondelete='CASCADE'), nullable=True),
sa.Column('batch_id', UUID(as_uuid=True), sa.ForeignKey('chemiq_inventory_upload_batches.batch_id', ondelete='CASCADE'), nullable=True),
sa.Column('status', sa.String(30), default='pending'), # 'pending', 'processing', 'completed', 'failed'
sa.Column('priority', sa.Integer, default=0),
sa.Column('retry_count', sa.Integer, default=0),
sa.Column('max_retries', sa.Integer, default=3),
sa.Column('input_data', JSONB, nullable=True), # Job-specific input parameters
sa.Column('result_data', JSONB, nullable=True), # Job results
sa.Column('error_message', sa.Text, nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('NOW()')),
sa.Column('started_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('completed_at', sa.DateTime(timezone=True), nullable=True),
)
op.create_index('idx_processing_jobs_pending', 'chemiq_inventory_processing_jobs',
['status', 'priority', 'created_at'],
postgresql_where=sa.text("status = 'pending'"))
op.create_index('idx_processing_jobs_staging', 'chemiq_inventory_processing_jobs', ['staging_id'])
op.create_index('idx_processing_jobs_batch', 'chemiq_inventory_processing_jobs', ['batch_id'])
def downgrade():
op.drop_table('chemiq_inventory_processing_jobs')
1.2 SQLAlchemy Models
File: tellus-ehs-hazcom-service/app/db/models/chemiq_inventory_staging.py
"""
SQLAlchemy models for Quick Inventory Import staging.
"""
from datetime import datetime
from sqlalchemy import Column, String, Text, ForeignKey, DateTime, Numeric, Integer, text
from sqlalchemy.dialects.postgresql import UUID as PGUUID, JSONB
from sqlalchemy.orm import relationship
from app.db.session import Base
class InventoryUploadBatch(Base):
"""Tracks each upload session/batch."""
__tablename__ = "chemiq_inventory_upload_batches"
batch_id = Column(PGUUID(as_uuid=True), primary_key=True, server_default=text("gen_random_uuid()"))  # a bare string server_default would be emitted as a quoted literal
company_id = Column(PGUUID(as_uuid=True), ForeignKey("companies.company_id", ondelete="CASCADE"), nullable=False)
upload_method = Column(String(50), nullable=False)
source_filename = Column(String(255), nullable=True)
source_file_s3_key = Column(String(500), nullable=True)
total_items = Column(Integer, default=0)
pending_count = Column(Integer, default=0)
approved_count = Column(Integer, default=0)
rejected_count = Column(Integer, default=0)
completed_count = Column(Integer, default=0)
processing_status = Column(String(30), default="pending")
processing_error = Column(Text, nullable=True)
created_by = Column(PGUUID(as_uuid=True), ForeignKey("users.user_id"), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=text("NOW()"))
updated_at = Column(DateTime(timezone=True), server_default=text("NOW()"), onupdate=datetime.utcnow)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
staging_items = relationship("InventoryStagingItem", back_populates="batch", lazy="dynamic")
creator = relationship("User", foreign_keys=[created_by])
class InventoryStagingItem(Base):
"""Individual items pending review."""
__tablename__ = "chemiq_inventory_staging"
staging_id = Column(PGUUID(as_uuid=True), primary_key=True, server_default=text("gen_random_uuid()"))
company_id = Column(PGUUID(as_uuid=True), ForeignKey("companies.company_id", ondelete="CASCADE"), nullable=False)
upload_batch_id = Column(PGUUID(as_uuid=True), ForeignKey("chemiq_inventory_upload_batches.batch_id", ondelete="CASCADE"), nullable=False)
upload_method = Column(String(50), nullable=False)
# Raw Captured Data
raw_product_name = Column(String(500), nullable=True)
raw_manufacturer = Column(String(255), nullable=True)
raw_barcode = Column(String(100), nullable=True)
raw_quantity = Column(Numeric(10, 2), nullable=True)
raw_unit = Column(String(50), nullable=True)
raw_location_text = Column(String(255), nullable=True)
product_image_urls = Column(JSONB, nullable=True)
# Location
site_id = Column(PGUUID(as_uuid=True), ForeignKey("company_sites.site_id"), nullable=True)
location_id = Column(PGUUID(as_uuid=True), ForeignKey("site_locations.location_id"), nullable=True)
# Auto-Matching Results
matched_company_product_id = Column(PGUUID(as_uuid=True), ForeignKey("chemiq_company_product_catalog.company_product_id"), nullable=True)
matched_sds_id = Column(PGUUID(as_uuid=True), ForeignKey("chemiq_sds_documents.sds_id"), nullable=True)
match_confidence = Column(Numeric(3, 2), nullable=True)
match_method = Column(String(50), nullable=True)
ocr_extracted_text = Column(Text, nullable=True)
detected_barcode = Column(String(100), nullable=True)
# Review Workflow
status = Column(String(30), default="pending_processing")
# Approved Values
approved_product_name = Column(String(500), nullable=True)
approved_manufacturer = Column(String(255), nullable=True)
approved_quantity = Column(Numeric(10, 2), nullable=True)
approved_unit = Column(String(50), nullable=True)
# Notes
capture_notes = Column(Text, nullable=True)
reviewer_notes = Column(Text, nullable=True)
# Result
created_inventory_id = Column(PGUUID(as_uuid=True), ForeignKey("chemiq_company_product_catalog.company_product_id"), nullable=True)
# Audit
created_by = Column(PGUUID(as_uuid=True), ForeignKey("users.user_id"), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=text("NOW()"))
updated_at = Column(DateTime(timezone=True), server_default=text("NOW()"), onupdate=datetime.utcnow)
reviewed_by = Column(PGUUID(as_uuid=True), ForeignKey("users.user_id"), nullable=True)
reviewed_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
batch = relationship("InventoryUploadBatch", back_populates="staging_items")
site = relationship("CompanySite")
location = relationship("SiteLocation")
matched_product = relationship("CompanyProductCatalog", foreign_keys=[matched_company_product_id])
matched_sds = relationship("SDSDocument", foreign_keys=[matched_sds_id])
creator = relationship("User", foreign_keys=[created_by])
reviewer = relationship("User", foreign_keys=[reviewed_by])
class InventoryProcessingJob(Base):
"""Background processing jobs for inventory matching."""
__tablename__ = "chemiq_inventory_processing_jobs"
job_id = Column(PGUUID(as_uuid=True), primary_key=True, server_default=text("gen_random_uuid()"))
company_id = Column(PGUUID(as_uuid=True), ForeignKey("companies.company_id", ondelete="CASCADE"), nullable=False)
job_type = Column(String(50), nullable=False)
staging_id = Column(PGUUID(as_uuid=True), ForeignKey("chemiq_inventory_staging.staging_id", ondelete="CASCADE"), nullable=True)
batch_id = Column(PGUUID(as_uuid=True), ForeignKey("chemiq_inventory_upload_batches.batch_id", ondelete="CASCADE"), nullable=True)
status = Column(String(30), default="pending")
priority = Column(Integer, default=0)
retry_count = Column(Integer, default=0)
max_retries = Column(Integer, default=3)
input_data = Column(JSONB, nullable=True)
result_data = Column(JSONB, nullable=True)
error_message = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=text("NOW()"))
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
staging_item = relationship("InventoryStagingItem")
batch = relationship("InventoryUploadBatch")
1.3 Pydantic Schemas
File: tellus-ehs-hazcom-service/app/schemas/chemiq/inventory_staging.py
"""
Pydantic schemas for Quick Inventory Import.
"""
from datetime import datetime
from typing import Optional, List, Dict, Any
from uuid import UUID
from decimal import Decimal
from enum import Enum
from pydantic import BaseModel, Field
class UploadMethod(str, Enum):
PHOTO = "photo"
BARCODE = "barcode"
EXCEL = "excel"
INVOICE = "invoice"
SDS_LIBRARY = "sds_library"
SITE_COPY = "site_copy"
class StagingItemStatus(str, Enum):
PENDING_PROCESSING = "pending_processing"
PENDING_REVIEW = "pending_review"
NEEDS_ATTENTION = "needs_attention"
APPROVED = "approved"
REJECTED = "rejected"
COMPLETED = "completed"
class BatchStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
READY_FOR_REVIEW = "ready_for_review"
COMPLETED = "completed"
FAILED = "failed"
# ============ Request Schemas ============
class PhotoCaptureRequest(BaseModel):
"""Request for photo capture submission."""
site_id: UUID
location_id: UUID
quantity: Decimal = Field(ge=0)
unit: str = Field(max_length=50)
notes: Optional[str] = None
image_urls: List[str] = Field(min_length=2, max_length=5) # S3 URLs for uploaded images
class BarcodeScanRequest(BaseModel):
"""Request for barcode scan submission."""
site_id: UUID
location_id: UUID
barcode: str = Field(max_length=100)
quantity: Decimal = Field(ge=0, default=1)
unit: str = Field(max_length=50, default="each")
notes: Optional[str] = None
class ExcelImportRequest(BaseModel):
"""Request for Excel file import."""
file_s3_key: str # S3 key of uploaded Excel file
default_site_id: Optional[UUID] = None
default_location_id: Optional[UUID] = None
class InvoiceUploadRequest(BaseModel):
"""Request for invoice upload."""
file_s3_key: str # S3 key of uploaded invoice
default_site_id: Optional[UUID] = None
default_location_id: Optional[UUID] = None
class AddFromSDSRequest(BaseModel):
"""Request for adding inventory from SDS library."""
sds_ids: List[UUID]
items: List["SDSInventoryItem"]
class SDSInventoryItem(BaseModel):
"""Individual item when adding from SDS library."""
sds_id: UUID
site_id: UUID
location_id: UUID
quantity: Decimal = Field(ge=0)
unit: str = Field(max_length=50)
notes: Optional[str] = None
class SiteCopyRequest(BaseModel):
"""Request for copying inventory from another site."""
source_site_id: UUID
target_site_id: UUID
inventory_ids: List[UUID]
copy_mode: str = Field(default="copy") # 'copy' or 'transfer'
class StagingItemUpdateRequest(BaseModel):
"""Request for updating a staging item."""
approved_product_name: Optional[str] = None
approved_manufacturer: Optional[str] = None
approved_quantity: Optional[Decimal] = None
approved_unit: Optional[str] = None
site_id: Optional[UUID] = None
location_id: Optional[UUID] = None
matched_sds_id: Optional[UUID] = None
reviewer_notes: Optional[str] = None
class BulkApproveRequest(BaseModel):
"""Request for bulk approving staging items."""
staging_ids: List[UUID]
class BulkRejectRequest(BaseModel):
"""Request for bulk rejecting staging items."""
staging_ids: List[UUID]
rejection_reason: Optional[str] = None
# ============ Response Schemas ============
class UploadBatchResponse(BaseModel):
"""Response for batch creation."""
batch_id: UUID
upload_method: UploadMethod
processing_status: BatchStatus
total_items: int
created_at: datetime
class Config:
from_attributes = True
class UploadBatchDetail(BaseModel):
"""Detailed batch information."""
batch_id: UUID
company_id: UUID
upload_method: UploadMethod
source_filename: Optional[str]
total_items: int
pending_count: int
approved_count: int
rejected_count: int
completed_count: int
processing_status: BatchStatus
processing_error: Optional[str]
created_by: UUID
created_at: datetime
completed_at: Optional[datetime]
class Config:
from_attributes = True
class StagingItemSummary(BaseModel):
"""Summary of a staging item for list views."""
staging_id: UUID
upload_method: UploadMethod
raw_product_name: Optional[str]
raw_barcode: Optional[str]
raw_quantity: Optional[Decimal]
raw_unit: Optional[str]
site_name: Optional[str]
location_name: Optional[str]
match_confidence: Optional[Decimal]
match_method: Optional[str]
status: StagingItemStatus
has_capture_notes: bool
product_image_count: int
created_at: datetime
class Config:
from_attributes = True
class StagingItemDetail(BaseModel):
"""Full details of a staging item."""
staging_id: UUID
company_id: UUID
upload_batch_id: UUID
upload_method: UploadMethod
# Raw Data
raw_product_name: Optional[str]
raw_manufacturer: Optional[str]
raw_barcode: Optional[str]
raw_quantity: Optional[Decimal]
raw_unit: Optional[str]
raw_location_text: Optional[str]
product_image_urls: Optional[List[str]]
# Location
site_id: Optional[UUID]
site_name: Optional[str]
location_id: Optional[UUID]
location_name: Optional[str]
# Matching
matched_company_product_id: Optional[UUID]
matched_product_name: Optional[str]
matched_sds_id: Optional[UUID]
matched_sds_product_name: Optional[str]
match_confidence: Optional[Decimal]
match_method: Optional[str]
ocr_extracted_text: Optional[str]
detected_barcode: Optional[str]
# Status
status: StagingItemStatus
# Approved Values
approved_product_name: Optional[str]
approved_manufacturer: Optional[str]
approved_quantity: Optional[Decimal]
approved_unit: Optional[str]
# Notes
capture_notes: Optional[str]
reviewer_notes: Optional[str]
# Result
created_inventory_id: Optional[UUID]
# Audit
created_by: UUID
created_by_name: Optional[str]
created_at: datetime
reviewed_by: Optional[UUID]
reviewed_by_name: Optional[str]
reviewed_at: Optional[datetime]
class Config:
from_attributes = True
class StagingItemCreateResponse(BaseModel):
"""Response when creating a staging item."""
staging_id: UUID
batch_id: UUID
status: StagingItemStatus
message: str
class BulkActionResponse(BaseModel):
"""Response for bulk operations."""
success_count: int
failed_count: int
failed_ids: List[UUID]
message: str
class ReviewDashboardStats(BaseModel):
"""Stats for the review dashboard."""
total_pending: int
pending_by_method: Dict[str, int] # {method: count}
needs_attention_count: int
approved_today: int
recent_batches: List[UploadBatchResponse]
# Update forward references
AddFromSDSRequest.model_rebuild()
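A quick way to sanity-check the request validation is to exercise the schemas directly; for example (IDs and values are illustrative):
from uuid import uuid4
from decimal import Decimal
from pydantic import ValidationError

ok = BarcodeScanRequest(
    site_id=uuid4(),
    location_id=uuid4(),
    barcode="012345678905",
)
assert ok.quantity == Decimal("1") and ok.unit == "each"  # defaults applied

try:
    PhotoCaptureRequest(
        site_id=uuid4(),
        location_id=uuid4(),
        quantity=Decimal("2"),
        unit="gallon",
        image_urls=["s3://bucket/only-one.jpg"],  # rejected: min_length=2
    )
except ValidationError as exc:
    print(exc.error_count(), "validation error(s)")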
1.4 Repository Layer
File: tellus-ehs-hazcom-service/app/db/repositories/chemiq/inventory_staging_repository.py
"""
Repository for inventory staging operations.
"""
from datetime import datetime
from typing import List, Optional, Tuple, Dict, Any
from uuid import UUID
from sqlalchemy import func, case
from sqlalchemy.orm import Session, joinedload
from app.db.models.chemiq_inventory_staging import (
InventoryUploadBatch,
InventoryStagingItem,
InventoryProcessingJob,
)
from app.db.models.company_sites import CompanySite
from app.db.models.site_locations import SiteLocation
class InventoryStagingRepository:
"""Repository for inventory staging operations."""
def __init__(self, db: Session):
self.db = db
# ============ Batch Operations ============
def create_batch(
self,
company_id: UUID,
upload_method: str,
created_by: UUID,
source_filename: Optional[str] = None,
source_file_s3_key: Optional[str] = None,
) -> InventoryUploadBatch:
"""Create a new upload batch."""
batch = InventoryUploadBatch(
company_id=company_id,
upload_method=upload_method,
created_by=created_by,
source_filename=source_filename,
source_file_s3_key=source_file_s3_key,
processing_status="pending",
)
self.db.add(batch)
self.db.flush()
return batch
def get_batch_by_id(self, batch_id: UUID) -> Optional[InventoryUploadBatch]:
"""Get batch by ID."""
return self.db.query(InventoryUploadBatch).filter(
InventoryUploadBatch.batch_id == batch_id
).first()
def list_batches(
self,
company_id: UUID,
status: Optional[str] = None,
upload_method: Optional[str] = None,
page: int = 1,
page_size: int = 20,
) -> Tuple[List[InventoryUploadBatch], int]:
"""List batches with filters."""
query = self.db.query(InventoryUploadBatch).filter(
InventoryUploadBatch.company_id == company_id
)
if status:
query = query.filter(InventoryUploadBatch.processing_status == status)
if upload_method:
query = query.filter(InventoryUploadBatch.upload_method == upload_method)
total = query.count()
query = query.order_by(InventoryUploadBatch.created_at.desc())
offset = (page - 1) * page_size
batches = query.offset(offset).limit(page_size).all()
return batches, total
def update_batch_counts(self, batch_id: UUID) -> None:
"""Recalculate and update batch item counts."""
counts = self.db.query(
func.count(InventoryStagingItem.staging_id).label('total'),
func.sum(case((InventoryStagingItem.status.in_(['pending_processing', 'pending_review', 'needs_attention']), 1), else_=0)).label('pending'),
func.sum(case((InventoryStagingItem.status == 'approved', 1), else_=0)).label('approved'),
func.sum(case((InventoryStagingItem.status == 'rejected', 1), else_=0)).label('rejected'),
func.sum(case((InventoryStagingItem.status == 'completed', 1), else_=0)).label('completed'),
).filter(
InventoryStagingItem.upload_batch_id == batch_id
).first()
batch = self.get_batch_by_id(batch_id)
if batch:
batch.total_items = counts.total or 0
batch.pending_count = counts.pending or 0
batch.approved_count = counts.approved or 0
batch.rejected_count = counts.rejected or 0
batch.completed_count = counts.completed or 0
def update_batch_status(self, batch_id: UUID, status: str, error: Optional[str] = None) -> None:
"""Update batch processing status."""
batch = self.get_batch_by_id(batch_id)
if batch:
batch.processing_status = status
batch.processing_error = error
if status == "completed":
batch.completed_at = datetime.utcnow()
# ============ Staging Item Operations ============
def create_staging_item(
self,
company_id: UUID,
batch_id: UUID,
upload_method: str,
created_by: UUID,
**kwargs
) -> InventoryStagingItem:
"""Create a new staging item."""
item = InventoryStagingItem(
company_id=company_id,
upload_batch_id=batch_id,
upload_method=upload_method,
created_by=created_by,
status="pending_processing",
**kwargs
)
self.db.add(item)
self.db.flush()
return item
def get_staging_item_by_id(self, staging_id: UUID) -> Optional[InventoryStagingItem]:
"""Get staging item by ID with relationships."""
return self.db.query(InventoryStagingItem).options(
joinedload(InventoryStagingItem.site),
joinedload(InventoryStagingItem.location),
joinedload(InventoryStagingItem.matched_product),
joinedload(InventoryStagingItem.matched_sds),
).filter(
InventoryStagingItem.staging_id == staging_id
).first()
def list_staging_items(
self,
company_id: UUID,
batch_id: Optional[UUID] = None,
status: Optional[str] = None,
upload_method: Optional[str] = None,
needs_attention_only: bool = False,
page: int = 1,
page_size: int = 50,
) -> Tuple[List[InventoryStagingItem], int]:
"""List staging items with filters."""
query = self.db.query(InventoryStagingItem).options(
joinedload(InventoryStagingItem.site),
joinedload(InventoryStagingItem.location),
).filter(
InventoryStagingItem.company_id == company_id
)
if batch_id:
query = query.filter(InventoryStagingItem.upload_batch_id == batch_id)
if status:
query = query.filter(InventoryStagingItem.status == status)
if upload_method:
query = query.filter(InventoryStagingItem.upload_method == upload_method)
if needs_attention_only:
query = query.filter(InventoryStagingItem.status == "needs_attention")
total = query.count()
query = query.order_by(InventoryStagingItem.created_at.desc())
offset = (page - 1) * page_size
items = query.offset(offset).limit(page_size).all()
return items, total
def update_staging_item(self, staging_id: UUID, **updates) -> Optional[InventoryStagingItem]:
"""Update staging item fields."""
item = self.get_staging_item_by_id(staging_id)
if not item:
return None
for key, value in updates.items():
if hasattr(item, key):
setattr(item, key, value)
item.updated_at = datetime.utcnow()
return item
def approve_staging_item(
self,
staging_id: UUID,
reviewer_id: UUID,
) -> Optional[InventoryStagingItem]:
"""Mark staging item as approved."""
item = self.get_staging_item_by_id(staging_id)
if not item:
return None
item.status = "approved"
item.reviewed_by = reviewer_id
item.reviewed_at = datetime.utcnow()
return item
def reject_staging_item(
self,
staging_id: UUID,
reviewer_id: UUID,
reason: Optional[str] = None,
) -> Optional[InventoryStagingItem]:
"""Mark staging item as rejected."""
item = self.get_staging_item_by_id(staging_id)
if not item:
return None
item.status = "rejected"
item.reviewed_by = reviewer_id
item.reviewed_at = datetime.utcnow()
if reason:
item.reviewer_notes = reason
return item
def mark_staging_item_completed(
self,
staging_id: UUID,
inventory_id: UUID,
) -> Optional[InventoryStagingItem]:
"""Mark staging item as completed with inventory reference."""
item = self.get_staging_item_by_id(staging_id)
if not item:
return None
item.status = "completed"
item.created_inventory_id = inventory_id
return item
def get_pending_review_stats(self, company_id: UUID) -> Dict[str, Any]:
"""Get stats for review dashboard."""
pending_statuses = ['pending_review', 'needs_attention']
# Total pending
total_pending = self.db.query(InventoryStagingItem).filter(
InventoryStagingItem.company_id == company_id,
InventoryStagingItem.status.in_(pending_statuses)
).count()
# By method
by_method = self.db.query(
InventoryStagingItem.upload_method,
func.count(InventoryStagingItem.staging_id)
).filter(
InventoryStagingItem.company_id == company_id,
InventoryStagingItem.status.in_(pending_statuses)
).group_by(InventoryStagingItem.upload_method).all()
# Needs attention
needs_attention = self.db.query(InventoryStagingItem).filter(
InventoryStagingItem.company_id == company_id,
InventoryStagingItem.status == 'needs_attention'
).count()
# Approved today
today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
approved_today = self.db.query(InventoryStagingItem).filter(
InventoryStagingItem.company_id == company_id,
InventoryStagingItem.status == 'approved',
InventoryStagingItem.reviewed_at >= today_start
).count()
return {
"total_pending": total_pending,
"pending_by_method": {method: count for method, count in by_method},
"needs_attention_count": needs_attention,
"approved_today": approved_today,
}
# ============ Processing Job Operations ============
def create_processing_job(
self,
company_id: UUID,
job_type: str,
staging_id: Optional[UUID] = None,
batch_id: Optional[UUID] = None,
priority: int = 0,
input_data: Optional[dict] = None,
) -> InventoryProcessingJob:
"""Create a background processing job."""
job = InventoryProcessingJob(
company_id=company_id,
job_type=job_type,
staging_id=staging_id,
batch_id=batch_id,
priority=priority,
input_data=input_data,
status="pending",
)
self.db.add(job)
self.db.flush()
return job
def get_pending_jobs(
self,
job_type: Optional[str] = None,
batch_size: int = 10,
) -> List[InventoryProcessingJob]:
"""Get pending jobs for processing (used by background service)."""
query = self.db.query(InventoryProcessingJob).filter(
InventoryProcessingJob.status == "pending",
InventoryProcessingJob.retry_count < InventoryProcessingJob.max_retries
)
if job_type:
query = query.filter(InventoryProcessingJob.job_type == job_type)
return query.order_by(
InventoryProcessingJob.priority.desc(),
InventoryProcessingJob.created_at.asc()
).limit(batch_size).all()
def update_job_status(
self,
job_id: UUID,
status: str,
result_data: Optional[dict] = None,
error_message: Optional[str] = None,
) -> None:
"""Update job status."""
job = self.db.query(InventoryProcessingJob).filter(
InventoryProcessingJob.job_id == job_id
).first()
if job:
job.status = status
if status == "processing":
job.started_at = datetime.utcnow()
elif status in ["completed", "failed"]:
job.completed_at = datetime.utcnow()
if result_data:
job.result_data = result_data
if error_message:
job.error_message = error_message
job.retry_count += 1
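Note that get_pending_jobs is a plain poll: if several background workers run at once, two of them can pick up the same job. A common remedy is to claim jobs atomically with FOR UPDATE SKIP LOCKED; a sketch in raw SQL against the jobs table:
from sqlalchemy import text

CLAIM_JOBS_SQL = text("""
    UPDATE chemiq_inventory_processing_jobs
    SET status = 'processing', started_at = NOW()
    WHERE job_id IN (
        SELECT job_id
        FROM chemiq_inventory_processing_jobs
        WHERE status = 'pending' AND retry_count < max_retries
        ORDER BY priority DESC, created_at ASC
        LIMIT :batch_size
        FOR UPDATE SKIP LOCKED
    )
    RETURNING job_id
""")

def claim_pending_jobs(db, batch_size: int = 10) -> list:
    """Atomically claim up to batch_size jobs; locked rows are skipped, so workers never collide."""
    rows = db.execute(CLAIM_JOBS_SQL, {"batch_size": batch_size}).fetchall()
    db.commit()
    return [row.job_id for row in rows]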
1.5 Service Layer
File: tellus-ehs-hazcom-service/app/services/chemiq/inventory_staging_service.py
"""
Service layer for Quick Inventory Import.
"""
from datetime import datetime
from typing import List, Optional, Tuple, Dict, Any
from uuid import UUID
from decimal import Decimal
from sqlalchemy.orm import Session
from app.db.repositories.chemiq.inventory_staging_repository import InventoryStagingRepository
from app.db.models.chemiq_inventory_staging import (
InventoryUploadBatch,
InventoryStagingItem,
)
from app.schemas.chemiq.inventory_staging import (
PhotoCaptureRequest,
BarcodeScanRequest,
StagingItemUpdateRequest,
StagingItemDetail,
UploadBatchDetail,
)
from app.core.logging import get_logger
logger = get_logger(__name__)
class InventoryStagingService:
"""Service for inventory staging operations."""
def __init__(self, db: Session):
self.db = db
self.repo = InventoryStagingRepository(db)
# ============ Photo Capture ============
async def submit_photo_capture(
self,
company_id: UUID,
user_id: UUID,
request: PhotoCaptureRequest,
batch_id: Optional[UUID] = None,
) -> InventoryStagingItem:
"""
Submit a photo capture item.
Creates staging item and queues OCR/matching job.
"""
# Create or get batch
if not batch_id:
batch = self.repo.create_batch(
company_id=company_id,
upload_method="photo",
created_by=user_id,
)
batch_id = batch.batch_id
# Create staging item
item = self.repo.create_staging_item(
company_id=company_id,
batch_id=batch_id,
upload_method="photo",
created_by=user_id,
site_id=request.site_id,
location_id=request.location_id,
raw_quantity=request.quantity,
raw_unit=request.unit,
capture_notes=request.notes,
product_image_urls=request.image_urls,
status="pending_processing",
)
# Queue OCR extraction job
self.repo.create_processing_job(
company_id=company_id,
job_type="ocr_extraction",
staging_id=item.staging_id,
input_data={"image_urls": request.image_urls},
)
# Update batch counts
self.repo.update_batch_counts(batch_id)
self.db.commit()
logger.info(f"Photo capture submitted: staging_id={item.staging_id}")
return item
# ============ Barcode Scan ============
async def submit_barcode_scan(
self,
company_id: UUID,
user_id: UUID,
request: BarcodeScanRequest,
batch_id: Optional[UUID] = None,
) -> InventoryStagingItem:
"""
Submit a barcode scan item.
Creates staging item and queues product matching job.
Non-blocking - user gets immediate response.
"""
# Create or get batch
if not batch_id:
batch = self.repo.create_batch(
company_id=company_id,
upload_method="barcode",
created_by=user_id,
)
batch_id = batch.batch_id
# Create staging item
item = self.repo.create_staging_item(
company_id=company_id,
batch_id=batch_id,
upload_method="barcode",
created_by=user_id,
site_id=request.site_id,
location_id=request.location_id,
raw_barcode=request.barcode,
raw_quantity=request.quantity,
raw_unit=request.unit,
capture_notes=request.notes,
status="pending_processing",
)
# Queue product matching job (async, non-blocking)
self.repo.create_processing_job(
company_id=company_id,
job_type="product_matching",
staging_id=item.staging_id,
priority=1, # Higher priority for barcode matches
input_data={"barcode": request.barcode},
)
# Update batch counts
self.repo.update_batch_counts(batch_id)
self.db.commit()
logger.info(f"Barcode scan submitted: staging_id={item.staging_id}, barcode={request.barcode}")
return item
# ============ Review Operations ============
async def get_review_dashboard_stats(
self,
company_id: UUID,
) -> Dict[str, Any]:
"""Get stats for the supervisor review dashboard."""
stats = self.repo.get_pending_review_stats(company_id)
# Get recent batches
batches, _ = self.repo.list_batches(
company_id=company_id,
status="ready_for_review",
page_size=5,
)
stats["recent_batches"] = batches
return stats
async def approve_items(
self,
company_id: UUID,
reviewer_id: UUID,
staging_ids: List[UUID],
) -> Dict[str, Any]:
"""Approve multiple staging items."""
success_count = 0
failed_ids = []
for staging_id in staging_ids:
item = self.repo.get_staging_item_by_id(staging_id)
if not item or item.company_id != company_id:
failed_ids.append(staging_id)
continue
if item.status not in ["pending_review", "needs_attention"]:
failed_ids.append(staging_id)
continue
self.repo.approve_staging_item(staging_id, reviewer_id)
success_count += 1
# Update batch counts for affected batches
affected_batches = set()
for staging_id in staging_ids:
item = self.repo.get_staging_item_by_id(staging_id)
if item:
affected_batches.add(item.upload_batch_id)
for batch_id in affected_batches:
self.repo.update_batch_counts(batch_id)
self.db.commit()
return {
"success_count": success_count,
"failed_count": len(failed_ids),
"failed_ids": failed_ids,
}
async def reject_items(
self,
company_id: UUID,
reviewer_id: UUID,
staging_ids: List[UUID],
reason: Optional[str] = None,
) -> Dict[str, Any]:
"""Reject multiple staging items."""
success_count = 0
failed_ids = []
for staging_id in staging_ids:
item = self.repo.get_staging_item_by_id(staging_id)
if not item or item.company_id != company_id:
failed_ids.append(staging_id)
continue
self.repo.reject_staging_item(staging_id, reviewer_id, reason)
success_count += 1
# Update batch counts
affected_batches = set()
for staging_id in staging_ids:
item = self.repo.get_staging_item_by_id(staging_id)
if item:
affected_batches.add(item.upload_batch_id)
for batch_id in affected_batches:
self.repo.update_batch_counts(batch_id)
self.db.commit()
return {
"success_count": success_count,
"failed_count": len(failed_ids),
"failed_ids": failed_ids,
}
async def update_staging_item(
self,
company_id: UUID,
staging_id: UUID,
request: StagingItemUpdateRequest,
) -> Optional[InventoryStagingItem]:
"""Update a staging item's fields."""
item = self.repo.get_staging_item_by_id(staging_id)
if not item or item.company_id != company_id:
return None
updates = request.model_dump(exclude_unset=True)
self.repo.update_staging_item(staging_id, **updates)
self.db.commit()
return self.repo.get_staging_item_by_id(staging_id)
1.6 API Endpoints
File: tellus-ehs-hazcom-service/app/api/v1/chemiq/inventory_staging.py
"""
API endpoints for Quick Inventory Import.
"""
from typing import Optional, List
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.api.deps import get_current_user, get_company_id
from app.services.chemiq.inventory_staging_service import InventoryStagingService
from app.schemas.chemiq.inventory_staging import (
PhotoCaptureRequest,
BarcodeScanRequest,
StagingItemUpdateRequest,
BulkApproveRequest,
BulkRejectRequest,
StagingItemCreateResponse,
StagingItemDetail,
StagingItemSummary,
UploadBatchDetail,
UploadBatchResponse,
BulkActionResponse,
ReviewDashboardStats,
)
router = APIRouter(prefix="/inventory/staging", tags=["Inventory Staging"])
# ============ Capture Endpoints ============
@router.post(
"/photo",
response_model=StagingItemCreateResponse,
status_code=status.HTTP_202_ACCEPTED,
)
async def submit_photo_capture(
request: PhotoCaptureRequest,
batch_id: Optional[UUID] = Query(None, description="Existing batch to add to"),
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""
Submit a photo capture item for staging.
Photos are uploaded separately via /upload-image endpoint.
This creates a staging item and queues OCR/matching jobs.
Returns immediately (non-blocking).
"""
service = InventoryStagingService(db)
item = await service.submit_photo_capture(
company_id=company_id,
user_id=current_user.user_id,
request=request,
batch_id=batch_id,
)
return StagingItemCreateResponse(
staging_id=item.staging_id,
batch_id=item.upload_batch_id,
status=item.status,
message="Photo capture submitted. Processing in background.",
)
@router.post(
"/barcode",
response_model=StagingItemCreateResponse,
status_code=status.HTTP_202_ACCEPTED,
)
async def submit_barcode_scan(
request: BarcodeScanRequest,
batch_id: Optional[UUID] = Query(None, description="Existing batch to add to"),
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""
Submit a barcode scan item for staging.
Creates a staging item and queues product matching job.
Returns immediately (non-blocking) - user can continue scanning.
"""
service = InventoryStagingService(db)
item = await service.submit_barcode_scan(
company_id=company_id,
user_id=current_user.user_id,
request=request,
batch_id=batch_id,
)
return StagingItemCreateResponse(
staging_id=item.staging_id,
batch_id=item.upload_batch_id,
status=item.status,
message="Barcode scan submitted. Product matching in progress.",
)
# ============ Batch Endpoints ============
@router.get(
"/batches",
response_model=List[UploadBatchResponse],
)
async def list_batches(
status: Optional[str] = Query(None),
upload_method: Optional[str] = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""List upload batches for the company."""
service = InventoryStagingService(db)
batches, total = service.repo.list_batches(
company_id=company_id,
status=status,
upload_method=upload_method,
page=page,
page_size=page_size,
)
return batches
@router.get(
"/batches/{batch_id}",
response_model=UploadBatchDetail,
)
async def get_batch_detail(
batch_id: UUID,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Get detailed information about a batch."""
service = InventoryStagingService(db)
batch = service.repo.get_batch_by_id(batch_id)
if not batch or batch.company_id != company_id:
raise HTTPException(status_code=404, detail="Batch not found")
return batch
# ============ Staging Item Endpoints ============
@router.get(
"",
response_model=List[StagingItemSummary],
)
async def list_staging_items(
batch_id: Optional[UUID] = Query(None),
status: Optional[str] = Query(None),
upload_method: Optional[str] = Query(None),
needs_attention: bool = Query(False),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""List staging items with filters."""
service = InventoryStagingService(db)
items, total = service.repo.list_staging_items(
company_id=company_id,
batch_id=batch_id,
status=status,
upload_method=upload_method,
needs_attention_only=needs_attention,
page=page,
page_size=page_size,
)
# Transform to summary response
summaries = []
for item in items:
summaries.append(StagingItemSummary(
staging_id=item.staging_id,
upload_method=item.upload_method,
raw_product_name=item.raw_product_name,
raw_barcode=item.raw_barcode,
raw_quantity=item.raw_quantity,
raw_unit=item.raw_unit,
site_name=item.site.site_name if item.site else None,
location_name=item.location.location_name if item.location else None,
match_confidence=item.match_confidence,
match_method=item.match_method,
status=item.status,
has_capture_notes=bool(item.capture_notes),
product_image_count=len(item.product_image_urls or []),
created_at=item.created_at,
))
return summaries
@router.get(
"/{staging_id}",
response_model=StagingItemDetail,
)
async def get_staging_item_detail(
staging_id: UUID,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Get full details of a staging item."""
service = InventoryStagingService(db)
item = service.repo.get_staging_item_by_id(staging_id)
if not item or item.company_id != company_id:
raise HTTPException(status_code=404, detail="Staging item not found")
return item
@router.put(
"/{staging_id}",
response_model=StagingItemDetail,
)
async def update_staging_item(
staging_id: UUID,
request: StagingItemUpdateRequest,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Update a staging item."""
service = InventoryStagingService(db)
item = await service.update_staging_item(
company_id=company_id,
staging_id=staging_id,
request=request,
)
if not item:
raise HTTPException(status_code=404, detail="Staging item not found")
return item
# ============ Review Endpoints ============
@router.get(
"/review/dashboard",
response_model=ReviewDashboardStats,
)
async def get_review_dashboard(
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Get stats for the supervisor review dashboard."""
service = InventoryStagingService(db)
return await service.get_review_dashboard_stats(company_id)
@router.post(
"/{staging_id}/approve",
response_model=StagingItemDetail,
)
async def approve_single_item(
staging_id: UUID,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Approve a single staging item."""
service = InventoryStagingService(db)
result = await service.approve_items(
company_id=company_id,
reviewer_id=current_user.user_id,
staging_ids=[staging_id],
)
if result["failed_count"] > 0:
raise HTTPException(status_code=400, detail="Failed to approve item")
return service.repo.get_staging_item_by_id(staging_id)
@router.post(
"/{staging_id}/reject",
)
async def reject_single_item(
staging_id: UUID,
reason: Optional[str] = None,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Reject a single staging item."""
service = InventoryStagingService(db)
result = await service.reject_items(
company_id=company_id,
reviewer_id=current_user.user_id,
staging_ids=[staging_id],
reason=reason,
)
if result["failed_count"] > 0:
raise HTTPException(status_code=400, detail="Failed to reject item")
return {"message": "Item rejected"}
@router.post(
"/bulk-approve",
response_model=BulkActionResponse,
)
async def bulk_approve_items(
request: BulkApproveRequest,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Approve multiple staging items at once."""
service = InventoryStagingService(db)
result = await service.approve_items(
company_id=company_id,
reviewer_id=current_user.user_id,
staging_ids=request.staging_ids,
)
return BulkActionResponse(
success_count=result["success_count"],
failed_count=result["failed_count"],
failed_ids=result["failed_ids"],
message=f"Approved {result['success_count']} items",
)
@router.post(
"/bulk-reject",
response_model=BulkActionResponse,
)
async def bulk_reject_items(
request: BulkRejectRequest,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""Reject multiple staging items at once."""
service = InventoryStagingService(db)
result = await service.reject_items(
company_id=company_id,
reviewer_id=current_user.user_id,
staging_ids=request.staging_ids,
reason=request.rejection_reason,
)
return BulkActionResponse(
success_count=result["success_count"],
failed_count=result["failed_count"],
failed_ids=result["failed_ids"],
message=f"Rejected {result['success_count']} items",
)
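For reference, a capture session from a client might look like the following (host, mount path, auth header, and IDs are illustrative; assumes the router is mounted under /api/v1):
import httpx

BASE = "https://example.com/api/v1/inventory/staging"  # hypothetical host/mount point
headers = {"Authorization": "Bearer <token>"}

# First scan creates a batch; subsequent scans reuse it so the session stays grouped.
resp = httpx.post(f"{BASE}/barcode", headers=headers, json={
    "site_id": "11111111-1111-1111-1111-111111111111",
    "location_id": "22222222-2222-2222-2222-222222222222",
    "barcode": "012345678905",
    "quantity": 4,
    "unit": "each",
})
resp.raise_for_status()
batch_id = resp.json()["batch_id"]  # 202 Accepted; matching runs in the background

httpx.post(f"{BASE}/barcode", params={"batch_id": batch_id}, headers=headers, json={
    "site_id": "11111111-1111-1111-1111-111111111111",
    "location_id": "22222222-2222-2222-2222-222222222222",
    "barcode": "036000291452",
})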
Phase 2: Background Service Jobs
2.1 Product Matching Service
File: tellus-ehs-background-service/app/services/inventory_matching/service.py
"""
Inventory Matching Service - Background Job Processor
Handles async matching of staging items to existing products/SDS.
"""
from datetime import datetime
from typing import Optional, List, Dict, Any
from uuid import UUID
from decimal import Decimal
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import get_logger
from app.db import get_async_db
from app.db.repositories.unit_of_work import AsyncUnitOfWork
from app.db.models import InventoryStagingItem, InventoryProcessingJob
from app.db.models.chemiq_product_catalog import CompanyProductCatalog
from app.db.models.chemiq_sds import SDSDocument, CompanySDSMapping
logger = get_logger(__name__)
class InventoryMatchingService:
"""
Service for matching staging items to products and SDS documents.
Runs in background service, polls pending jobs.
"""
def __init__(self, batch_size: int = 10, max_retries: int = 3):
self.batch_size = batch_size
self.max_retries = max_retries
async def process_pending_jobs(self) -> Dict[str, Any]:
"""Process a batch of pending matching jobs."""
results = {
"processed": 0,
"succeeded": 0,
"failed": 0,
"errors": [],
}
async with get_async_db() as db:
uow = AsyncUnitOfWork(db)
# Get pending product_matching jobs
pending_jobs = await uow.inventory_jobs.get_pending_jobs(
job_type="product_matching",
batch_size=self.batch_size,
)
if not pending_jobs:
logger.info("No pending inventory matching jobs")
return results
logger.info(f"Processing {len(pending_jobs)} matching jobs")
for job in pending_jobs:
try:
success = await self._process_single_job(uow, job)
results["processed"] += 1
if success:
results["succeeded"] += 1
else:
results["failed"] += 1
except Exception as e:
logger.error(f"Error processing job {job.job_id}: {e}")
await uow.inventory_jobs.mark_failed(job, str(e))
await uow.commit()
results["failed"] += 1
results["errors"].append({
"job_id": str(job.job_id),
"error": str(e)
})
return results
async def _process_single_job(
self,
uow: AsyncUnitOfWork,
job: InventoryProcessingJob,
) -> bool:
"""Process a single matching job."""
logger.info(f"Processing matching job {job.job_id} for staging {job.staging_id}")
# Mark as processing
await uow.inventory_jobs.update_status(job, "processing")
await uow.commit()
try:
# Get staging item
staging_item = await uow.inventory_staging.get_by_id(job.staging_id)
if not staging_item:
raise ValueError(f"Staging item {job.staging_id} not found")
match_result = None
# Try barcode match first (highest confidence)
barcode = (job.input_data or {}).get("barcode") or staging_item.raw_barcode
if barcode:
match_result = await self._match_by_barcode(uow, staging_item.company_id, barcode)
# Try name match if no barcode match
if not match_result and staging_item.raw_product_name:
match_result = await self._match_by_name(
uow,
staging_item.company_id,
staging_item.raw_product_name
)
# Try OCR text match
if not match_result and staging_item.ocr_extracted_text:
match_result = await self._match_by_ocr_text(
uow,
staging_item.company_id,
staging_item.ocr_extracted_text
)
# Update staging item with match results
if match_result:
await uow.inventory_staging.update(staging_item, {
"matched_company_product_id": match_result.get("product_id"),
"matched_sds_id": match_result.get("sds_id"),
"match_confidence": match_result.get("confidence"),
"match_method": match_result.get("method"),
"status": "pending_review" if match_result.get("confidence", 0) >= 0.7 else "needs_attention",
})
else:
await uow.inventory_staging.update(staging_item, {
"status": "needs_attention",
"match_confidence": Decimal("0"),
"match_method": None,
})
# Mark job as completed
await uow.inventory_jobs.update_status(job, "completed", result_data=match_result)
# Update batch counts
await uow.inventory_batches.update_counts(staging_item.upload_batch_id)
await uow.commit()
logger.info(f"Matching complete for staging {staging_item.staging_id}: {match_result}")
return True
except Exception as e:
logger.error(f"Failed to match staging item for job {job.job_id}: {e}")
if job.retry_count + 1 >= job.max_retries:
await uow.inventory_jobs.mark_failed(job, str(e))
# Mark staging item as needs_attention
if job.staging_id:
staging_item = await uow.inventory_staging.get_by_id(job.staging_id)
if staging_item:
await uow.inventory_staging.update(staging_item, {"status": "needs_attention"})
else:
await uow.inventory_jobs.increment_retry(job, str(e))
await uow.commit()
return False
async def _match_by_barcode(
self,
uow: AsyncUnitOfWork,
company_id: UUID,
barcode: str,
) -> Optional[Dict[str, Any]]:
"""Try to match by exact barcode."""
product = await uow.product_catalog.find_by_barcode(company_id, barcode)
if product:
return {
"product_id": product.company_product_id,
"sds_id": product.current_sds_id,
"product_name": product.product_name,
"confidence": Decimal("1.0"),
"method": "barcode_exact",
}
return None
async def _match_by_name(
self,
uow: AsyncUnitOfWork,
company_id: UUID,
product_name: str,
) -> Optional[Dict[str, Any]]:
"""Try fuzzy match by product name."""
# Use trigram similarity search
results = await uow.product_catalog.fuzzy_search(
company_id=company_id,
search_term=product_name,
limit=1,
min_similarity=0.7,
)
if results:
best_match = results[0]
return {
"product_id": best_match.company_product_id,
"sds_id": best_match.current_sds_id,
"product_name": best_match.product_name,
"confidence": Decimal(str(best_match.similarity)),
"method": "name_fuzzy",
}
# Try SDS library if no product match
sds_results = await uow.sds_documents.fuzzy_search(
company_id=company_id,
search_term=product_name,
limit=1,
min_similarity=0.7,
)
if sds_results:
best_sds = sds_results[0]
return {
"product_id": None,
"sds_id": best_sds.sds_id,
"product_name": best_sds.product_name,
"confidence": Decimal(str(best_sds.similarity)),
"method": "name_fuzzy",
}
return None
async def _match_by_ocr_text(
self,
uow: AsyncUnitOfWork,
company_id: UUID,
ocr_text: str,
) -> Optional[Dict[str, Any]]:
"""Try to match using OCR extracted text."""
# Extract potential product identifiers from OCR text
# This is simplified - real implementation would use NLP/regex
# Try full-text search on SDS library
results = await uow.sds_documents.fulltext_search(
company_id=company_id,
search_text=ocr_text[:500], # Limit search text
limit=1,
)
if results:
best_match = results[0]
return {
"product_id": None,
"sds_id": best_match.sds_id,
"product_name": best_match.product_name,
"confidence": Decimal("0.6"), # Lower confidence for OCR
"method": "ocr_text",
}
return None
async def run_inventory_matching_job(
batch_size: int = 10,
max_retries: int = 3,
) -> Dict[str, Any]:
"""Entry point for the inventory matching job."""
service = InventoryMatchingService(
batch_size=batch_size,
max_retries=max_retries,
)
return await service.process_pending_jobs()
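The fuzzy_search repository method used above is assumed to be backed by PostgreSQL's pg_trgm extension. A sketch of what that query could look like (column names follow the catalog FKs used earlier; a company_id column and the pg_trgm extension are assumed):
from sqlalchemy import text

FUZZY_SEARCH_SQL = text("""
    SELECT company_product_id, current_sds_id, product_name,
           similarity(product_name, :term) AS similarity
    FROM chemiq_company_product_catalog
    WHERE company_id = :company_id
      AND similarity(product_name, :term) >= :min_similarity
    ORDER BY similarity DESC
    LIMIT :limit
""")

async def fuzzy_search(db, company_id, term, limit=1, min_similarity=0.7):
    """Trigram-similarity lookup; requires CREATE EXTENSION pg_trgm."""
    result = await db.execute(FUZZY_SEARCH_SQL, {
        "company_id": str(company_id),
        "term": term,
        "limit": limit,
        "min_similarity": min_similarity,
    })
    return result.fetchall()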
Phase 3: Frontend Components
3.1 Component Hierarchy
src/pages/chemiq/inventory/
├── QuickImport/
│ ├── index.tsx # Main Quick Import page with method selection
│ ├── components/
│ │ ├── ImportMethodCard.tsx # Card for each import method
│ │ ├── PhotoCapture/
│ │ │ ├── PhotoCaptureModal.tsx
│ │ │ ├── CameraView.tsx
│ │ │ └── PhotoPreview.tsx
│ │ ├── BarcodeScanner/
│ │ │ ├── BarcodeScannerModal.tsx
│ │ │ ├── ScannerView.tsx
│ │ │ └── QuantityEntry.tsx
│ │ ├── ExcelImport/
│ │ │ ├── ExcelImportModal.tsx
│ │ │ ├── ColumnMapper.tsx
│ │ │ └── ImportPreview.tsx
│ │ ├── InvoiceUpload/
│ │ │ ├── InvoiceUploadModal.tsx
│ │ │ └── ExtractedItemsPreview.tsx
│ │ ├── AddFromSDS/
│ │ │ ├── AddFromSDSModal.tsx
│ │ │ └── SDSSelectionList.tsx
│ │ └── SiteCopy/
│ │ └── SiteCopyModal.tsx
│ └── ReviewQueue/
│ ├── index.tsx # Review dashboard
│ ├── components/
│ │ ├── BatchList.tsx
│ │ ├── BatchReviewView.tsx
│ │ ├── StagingItemCard.tsx
│ │ ├── StagingItemEditModal.tsx
│ │ └── BulkActionsBar.tsx
│ └── hooks/
│ └── useStagingItems.ts
3.2 Key Frontend Components
File: tellus-ehs-hazcom-ui/src/pages/chemiq/inventory/QuickImport/index.tsx
import React from 'react';
import {
Camera,
Barcode,
FileSpreadsheet,
FileText,
Link,
Copy,
ClipboardList
} from 'lucide-react';
import { useNavigate } from 'react-router-dom';
import ImportMethodCard from './components/ImportMethodCard';
import { usePendingReviewCount } from './hooks/usePendingReviewCount';
const importMethods = [
{
id: 'photo',
title: 'Photo Capture',
description: 'Take photos of product labels for automatic extraction',
icon: Camera,
color: 'bg-blue-500',
mobileOptimized: true,
},
{
id: 'barcode',
title: 'Barcode Scan',
description: 'Scan product barcodes for quick inventory count',
icon: Barcode,
color: 'bg-green-500',
mobileOptimized: true,
},
{
id: 'excel',
title: 'Excel Import',
description: 'Import inventory from spreadsheet',
icon: FileSpreadsheet,
color: 'bg-emerald-500',
mobileOptimized: false,
},
{
id: 'invoice',
title: 'Invoice Upload',
description: 'Extract inventory from purchase invoices',
icon: FileText,
color: 'bg-orange-500',
mobileOptimized: false,
},
{
id: 'sds-library',
title: 'Add from SDS Library',
description: 'Create inventory from existing SDS documents',
icon: Link,
color: 'bg-purple-500',
mobileOptimized: false,
},
{
id: 'site-copy',
title: 'Copy from Site',
description: 'Copy or transfer inventory from another location',
icon: Copy,
color: 'bg-cyan-500',
mobileOptimized: false,
},
];
const QuickImportPage: React.FC = () => {
const navigate = useNavigate();
const { pendingCount, needsAttentionCount } = usePendingReviewCount();
const handleMethodSelect = (methodId: string) => {
// Open appropriate modal or navigate to method-specific page
navigate(`/chemiq/inventory/quick-import/${methodId}`);
};
return (
<div className="p-6">
<div className="flex items-center justify-between mb-6">
<div>
<h1 className="text-2xl font-bold text-text-main">Quick Import</h1>
<p className="text-text-muted mt-1">
Add inventory items using various capture methods
</p>
</div>
{pendingCount > 0 && (
<button
onClick={() => navigate('/chemiq/inventory/quick-import/review')}
className="btn-secondary inline-flex items-center gap-2"
>
<ClipboardList className="h-5 w-5" />
Review Queue
<span className="bg-accent-magenta text-white text-xs px-2 py-0.5 rounded-full">
{pendingCount}
</span>
{needsAttentionCount > 0 && (
<span className="bg-yellow-500 text-white text-xs px-2 py-0.5 rounded-full">
{needsAttentionCount} attention
</span>
)}
</button>
)}
</div>
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
{importMethods.map((method) => (
<ImportMethodCard
key={method.id}
{...method}
onClick={() => handleMethodSelect(method.id)}
/>
))}
</div>
</div>
);
};
export default QuickImportPage;
Implementation Timeline
Phase 1: Core Infrastructure (Foundation)
Dependencies: None. Components:
- Database migrations (staging tables, jobs table)
- SQLAlchemy models
- Pydantic schemas
- Repository layer
- Basic service layer (create staging items)
- API endpoints (POST barcode/photo, GET staging items)
- Basic review dashboard UI
Phase 2: Barcode Scanning
Dependencies: Phase 1. Components:
- Barcode scanner modal (frontend)
- Camera access and barcode detection (QuaggaJS or ZXing)
- Quantity entry form
- Batch session management
- Product matching background job
- Review queue integration
Phase 3: Photo Capture
Dependencies: Phase 1. Components:
- Photo capture modal (frontend)
- Multi-photo upload to S3
- OCR extraction background job (Tesseract/Cloud Vision)
- Barcode detection from images
- Auto-matching from OCR text
Phase 4: Excel Import
Dependencies: Phase 1. Components:
- Excel upload modal
- Template download endpoint
- Column mapping UI
- Validation preview
- Excel processing background job
- Batch creation from rows
Phase 5: Review & Approval Workflow
Dependencies: Phases 1-4. Components:
- Batch review view
- Single item edit modal
- Bulk approve/reject actions
- Inventory creation from approved items
- Batch completion logic
Phase 6: Advanced Features
Dependencies: Phase 1-5
Components:
- Invoice parsing with LLM
- Add from SDS Library
- Copy from Site
- Mobile-optimized capture flows
Testing Strategy
Unit Tests
- Repository methods
- Service layer business logic
- Schema validation
Integration Tests
- API endpoint flows
- Database transactions
- Background job processing
E2E Tests
- Barcode scan → review → inventory creation (sketched in the test below)
- Photo capture → OCR → matching → approval
- Excel import → validation → bulk approval
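For the first of these flows, a minimal integration-test sketch is shown below. It assumes FastAPI's TestClient, an app entrypoint at app.main, an auth_headers fixture, and endpoint paths consistent with this plan; none of these are final contracts.
# Sketch only: paths, payload fields, and fixtures are assumptions to align
# with the real router once it exists.
from fastapi.testclient import TestClient

from app.main import app  # assumed application entrypoint

client = TestClient(app)

def test_barcode_scan_to_approval(auth_headers):  # auth_headers: assumed fixture
    # Submit a scanned barcode as a staging item
    resp = client.post(
        "/api/v1/chemiq/inventory-staging/barcode",
        json={"barcode": "044600011290", "quantity": 5, "unit": "each"},
        headers=auth_headers,
    )
    assert resp.status_code == 201
    staging_id = resp.json()["staging_id"]

    # Approve it from the review queue
    resp = client.post(
        f"/api/v1/chemiq/inventory-staging/{staging_id}/approve",
        headers=auth_headers,
    )
    assert resp.status_code == 200
    assert resp.json()["status"] == "approved"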
Monitoring & Observability
Metrics to Track
- Items submitted per method
- Background job processing time
- Match success rate by method
- Review queue depth
- Approval/rejection rates
Logging
- All API requests to staging endpoints
- Background job start/complete/fail
- Match results with confidence scores
- Batch status transitions (see the logging sketch below)
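As a sketch of how these log events might be emitted, reusing the get_logger helper from the background service; the logger name and the field names in extra are assumptions, not a fixed schema:
# Illustrative structured-logging sketch for match results and batch status
# transitions. Field names are assumptions; raw OCR text is never logged.
from app.core.logging import get_logger

logger = get_logger("inventory_import.metrics")

def log_match_result(staging_id, method, confidence):
    # Log the match outcome with its confidence score, but not the OCR text
    logger.info(
        "match_result",
        extra={
            "staging_id": str(staging_id),
            "match_method": method,
            "confidence": float(confidence),
        },
    )

def log_batch_transition(batch_id, old_status, new_status):
    logger.info(
        "batch_status_transition",
        extra={"batch_id": str(batch_id), "from": old_status, "to": new_status},
    )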
Security Considerations
- Image Upload: Validate file types, scan for malware, limit sizes (see the sketch after this list)
- OCR Data: Don't log full OCR text (may contain sensitive info)
- Multi-tenant: All queries scoped by company_id
- Rate Limiting: Limit barcode/photo submissions per user
- S3 Access: Pre-signed URLs with short expiry for image access
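A minimal sketch of the image-upload checks above; the size cap and magic-byte table are assumptions, and malware scanning would be a separate service call (e.g. ClamAV) that is only noted here:
# Hedged sketch: limits and accepted types are starting assumptions to tune.
MAX_UPLOAD_BYTES = 10 * 1024 * 1024  # assumed 10 MB cap

MAGIC_BYTES = {
    b"\xff\xd8\xff": "jpg",
    b"\x89PNG\r\n\x1a\n": "png",
    b"%PDF": "pdf",
}

def validate_upload(content: bytes, filename: str) -> str:
    """Return the detected file type or raise ValueError."""
    if len(content) > MAX_UPLOAD_BYTES:
        raise ValueError("File too large")
    for magic, kind in MAGIC_BYTES.items():
        if content.startswith(magic):
            return kind
    raise ValueError(f"Unsupported or mismatched file type: {filename}")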
Advanced Features - Detailed Implementation
Excel/CSV Import
Pydantic Schemas (Additional)
# Add to app/schemas/chemiq/inventory_staging.py
class ExcelColumnMapping(BaseModel):
"""Column mapping for Excel import."""
source_column: str # Column name in uploaded file
target_field: str # Field in staging item
is_required: bool = False
class ExcelImportRequest(BaseModel):
"""Request for Excel file import."""
file_s3_key: str
column_mappings: Optional[List[ExcelColumnMapping]] = None
default_site_id: Optional[UUID] = None
default_location_id: Optional[UUID] = None
skip_header_row: bool = True
class ExcelValidationRow(BaseModel):
"""Single row validation result."""
row_number: int
data: Dict[str, Any]
is_valid: bool
errors: List[str]
warnings: List[str]
class ExcelPreviewResponse(BaseModel):
"""Response for Excel preview before import."""
batch_id: UUID
filename: str
total_rows: int
valid_rows: int
invalid_rows: int
columns_detected: List[str]
suggested_mappings: List[ExcelColumnMapping]
preview_rows: List[ExcelValidationRow] # First 10 rows
validation_summary: Dict[str, int] # Error type counts
class ExcelTemplateColumn(BaseModel):
"""Column definition for template."""
name: str
description: str
required: bool
example: str
Service Layer - Excel Import
# Add to app/services/chemiq/inventory_staging_service.py
from typing import Any, Dict, List, Optional, Tuple
from openpyxl import load_workbook
from io import BytesIO
import csv
class InventoryStagingService:
# ... existing methods ...
# ============ Excel Import ============
async def upload_excel_file(
self,
company_id: UUID,
user_id: UUID,
file: UploadFile,
) -> Dict[str, Any]:
"""
Upload Excel file to S3 and return preview.
Does NOT create staging items yet - just validates and previews.
"""
# Validate file type (case-insensitive). Note: openpyxl only reads .xlsx;
# supporting legacy .xls would require an additional reader such as xlrd.
if not file.filename.lower().endswith(('.xlsx', '.xls', '.csv')):
raise ValueError("File must be Excel (.xlsx, .xls) or CSV (.csv)")
# Read file content
content = await file.read()
# Upload to S3 for later processing
s3_key = f"imports/{company_id}/{user_id}/{datetime.utcnow().isoformat()}_{file.filename}"
await upload_to_s3(content, s3_key)
# Parse and preview
if file.filename.lower().endswith('.csv'):
rows, columns = self._parse_csv(content)
else:
rows, columns = self._parse_excel(content)
# Auto-detect column mappings
suggested_mappings = self._suggest_column_mappings(columns)
# Validate first 100 rows
validation_results = self._validate_excel_rows(
rows[:100],
suggested_mappings,
company_id,
)
# Create batch in 'pending' state
batch = self.repo.create_batch(
company_id=company_id,
upload_method="excel",
created_by=user_id,
source_filename=file.filename,
source_file_s3_key=s3_key,
)
batch.processing_status = "pending" # Waiting for user confirmation
self.db.commit()
return {
"batch_id": batch.batch_id,
"filename": file.filename,
"total_rows": len(rows),
"valid_rows": sum(1 for r in validation_results if r["is_valid"]),
"invalid_rows": sum(1 for r in validation_results if not r["is_valid"]),
"columns_detected": columns,
"suggested_mappings": suggested_mappings,
"preview_rows": validation_results[:10],
"s3_key": s3_key,
}
async def confirm_excel_import(
self,
company_id: UUID,
user_id: UUID,
batch_id: UUID,
column_mappings: List[ExcelColumnMapping],
default_site_id: Optional[UUID] = None,
default_location_id: Optional[UUID] = None,
) -> Dict[str, Any]:
"""
Confirm Excel import with final column mappings.
Queues background job for processing.
"""
batch = self.repo.get_batch_by_id(batch_id)
if not batch or batch.company_id != company_id:
raise ValueError("Batch not found")
if batch.processing_status != "pending":
raise ValueError("Batch already processed")
# Queue background job
self.repo.create_processing_job(
company_id=company_id,
job_type="excel_processing",
batch_id=batch_id,
input_data={
"s3_key": batch.source_file_s3_key,
"column_mappings": [m.model_dump() for m in column_mappings],
"default_site_id": str(default_site_id) if default_site_id else None,
"default_location_id": str(default_location_id) if default_location_id else None,
},
)
batch.processing_status = "processing"
self.db.commit()
return {
"batch_id": batch_id,
"message": "Excel import queued for processing",
"status": "processing",
}
def _parse_excel(self, content: bytes) -> Tuple[List[Dict], List[str]]:
"""Parse Excel file and return rows and column names."""
wb = load_workbook(BytesIO(content), read_only=True)
ws = wb.active
rows = list(ws.iter_rows(values_only=True))
if not rows:
return [], []
columns = [str(c) if c else f"Column_{i}" for i, c in enumerate(rows[0])]
data_rows = []
for row in rows[1:]: # Skip header
row_dict = {}
for i, value in enumerate(row):
if i < len(columns):
row_dict[columns[i]] = value
if any(row_dict.values()): # Skip empty rows
data_rows.append(row_dict)
return data_rows, columns
def _parse_csv(self, content: bytes) -> Tuple[List[Dict], List[str]]:
"""Parse CSV file and return rows and column names."""
text = content.decode('utf-8-sig') # Handle BOM
reader = csv.DictReader(text.splitlines())
rows = list(reader)
columns = reader.fieldnames or []
return rows, columns
def _suggest_column_mappings(self, columns: List[str]) -> List[ExcelColumnMapping]:
"""Auto-detect column mappings based on column names."""
mappings = []
# Order matters: the specific location keywords are checked before the
# generic "site" keywords so a column like "Storage Location" maps to location.
mapping_rules = {
"product_name": ["product", "name", "product name", "item", "chemical"],
"quantity": ["qty", "quantity", "amount", "count"],
"unit": ["unit", "uom", "unit of measure"],
"manufacturer": ["manufacturer", "mfr", "vendor", "supplier", "brand"],
"barcode": ["barcode", "upc", "ean", "sku", "product code"],
"location": ["shelf", "bin", "storage location", "area"],
"site": ["site", "location", "warehouse", "facility"],
"notes": ["notes", "comments", "remarks", "description"],
}
for col in columns:
col_lower = col.lower().strip()
matched_field = None
for field, keywords in mapping_rules.items():
if any(kw in col_lower for kw in keywords):
matched_field = field
break
if matched_field:
mappings.append(ExcelColumnMapping(
source_column=col,
target_field=matched_field,
is_required=matched_field in ["product_name", "quantity"],
))
return mappings
def _validate_excel_rows(
self,
rows: List[Dict],
mappings: List[ExcelColumnMapping],
company_id: UUID,
) -> List[Dict]:
"""Validate rows against mappings and business rules."""
results = []
mapping_dict = {m.source_column: m.target_field for m in mappings}
for i, row in enumerate(rows, start=2): # Start at 2 (1-indexed, skip header)
errors = []
warnings = []
mapped_data = {}
# Map columns
for source_col, target_field in mapping_dict.items():
value = row.get(source_col)
mapped_data[target_field] = value
# Validate required fields
if not mapped_data.get("product_name"):
errors.append("Product name is required")
quantity = mapped_data.get("quantity")
if quantity is not None:
try:
qty = float(quantity)
if qty <= 0:
errors.append("Quantity must be positive")
except (ValueError, TypeError):
errors.append(f"Invalid quantity: {quantity}")
# Validate site if provided
site_name = mapped_data.get("site")
if site_name:
# Would check against company sites
pass # Validation deferred to background job
results.append({
"row_number": i,
"data": mapped_data,
"is_valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
})
return results
async def get_excel_template(self) -> bytes:
"""Generate downloadable Excel template."""
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill
wb = Workbook()
ws = wb.active
ws.title = "Inventory Import"
# Define columns
columns = [
("Product Name", "Required - Name of the chemical product", "Clorox Disinfecting Bleach"),
("Quantity", "Required - Numeric quantity", "5"),
("Unit", "Optional - Unit of measure (default: each)", "gallons"),
("Manufacturer", "Optional - Product manufacturer", "The Clorox Company"),
("Barcode/UPC", "Optional - Product barcode for matching", "044600011290"),
("Site Name", "Optional - Must match existing site", "Main Warehouse"),
("Location Name", "Optional - Location within site", "Shelf A-1"),
("Notes", "Optional - Any additional notes", "Reorder when low"),
]
# Header row with styling
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
header_font = Font(color="FFFFFF", bold=True)
for col_idx, (name, desc, example) in enumerate(columns, start=1):
cell = ws.cell(row=1, column=col_idx, value=name)
cell.fill = header_fill
cell.font = header_font
ws.column_dimensions[cell.column_letter].width = max(len(name), 20)
# Example row
for col_idx, (name, desc, example) in enumerate(columns, start=1):
ws.cell(row=2, column=col_idx, value=example)
# Instructions sheet
instructions = wb.create_sheet("Instructions")
instructions.cell(row=1, column=1, value="Column Instructions").font = Font(bold=True, size=14)
for row_idx, (name, desc, example) in enumerate(columns, start=3):
instructions.cell(row=row_idx, column=1, value=name).font = Font(bold=True)
instructions.cell(row=row_idx, column=2, value=desc)
instructions.cell(row=row_idx, column=3, value=f"Example: {example}")
# Save to bytes
output = BytesIO()
wb.save(output)
return output.getvalue()
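Phase 4 also lists a template download endpoint. One plausible wiring of get_excel_template into the router is sketched below; the route path and Response handling are assumptions, kept consistent with the endpoints later in this plan:
# Add to app/api/v1/chemiq/inventory_staging.py -- sketch only
from fastapi import Response

@router.get("/excel-template")
async def download_excel_template(
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """Download the Excel import template generated by the service layer."""
    service = InventoryStagingService(db)
    content = await service.get_excel_template()
    return Response(
        content=content,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": 'attachment; filename="inventory_import_template.xlsx"'},
    )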
Background Job - Excel Processing
# File: tellus-ehs-background-service/app/services/inventory_import/excel_processor.py
"""
Excel Processing Service - Background Job
Processes uploaded Excel files and creates staging items.
"""
from datetime import datetime
from typing import Dict, Any, List, Optional
from uuid import UUID
from decimal import Decimal, InvalidOperation
import csv
from io import BytesIO
from openpyxl import load_workbook
from app.core.logging import get_logger
from app.db import get_async_db
from app.db.repositories.unit_of_work import AsyncUnitOfWork
from app.utils.s3_client import download_file_from_s3
logger = get_logger(__name__)
class ExcelProcessorService:
"""
Service for processing Excel imports in background.
"""
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
async def process_pending_jobs(self) -> Dict[str, Any]:
"""Process pending excel_processing jobs."""
results = {
"processed": 0,
"succeeded": 0,
"failed": 0,
"errors": [],
}
async with get_async_db() as db:
uow = AsyncUnitOfWork(db)
pending_jobs = await uow.inventory_jobs.get_pending_jobs(
job_type="excel_processing",
batch_size=5,
)
for job in pending_jobs:
try:
success = await self._process_excel_job(uow, job)
results["processed"] += 1
if success:
results["succeeded"] += 1
else:
results["failed"] += 1
except Exception as e:
logger.error(f"Excel processing failed for job {job.job_id}: {e}")
await uow.inventory_jobs.mark_failed(job, str(e))
await uow.commit()
results["failed"] += 1
results["errors"].append({"job_id": str(job.job_id), "error": str(e)})
return results
async def _process_excel_job(
self,
uow: AsyncUnitOfWork,
job,
) -> bool:
"""Process a single Excel import job."""
logger.info(f"Processing Excel job {job.job_id} for batch {job.batch_id}")
await uow.inventory_jobs.update_status(job, "processing")
await uow.commit()
try:
input_data = job.input_data
s3_key = input_data["s3_key"]
column_mappings = input_data["column_mappings"]
default_site_id = input_data.get("default_site_id")
default_location_id = input_data.get("default_location_id")
# Download file from S3
file_content = await download_file_from_s3(s3_key)
# Parse file
if s3_key.lower().endswith('.csv'):
rows = self._parse_csv(file_content)
else:
rows = self._parse_excel(file_content)
# Get batch
batch = await uow.inventory_batches.get_by_id(job.batch_id)
company_id = batch.company_id
# Resolve site/location IDs
site_id_cache = {}
location_id_cache = {}
created_count = 0
error_count = 0
# Process rows; all rows are committed together at the end of the job
for row in rows:
try:
staging_item = await self._create_staging_item_from_row(
uow=uow,
company_id=company_id,
batch_id=job.batch_id,
created_by=batch.created_by,
row=row,
mappings=column_mappings,
default_site_id=default_site_id,
default_location_id=default_location_id,
site_cache=site_id_cache,
location_cache=location_id_cache,
)
if staging_item:
created_count += 1
# Queue matching job for this item
await uow.inventory_jobs.create(
company_id=company_id,
job_type="product_matching",
staging_id=staging_item.staging_id,
priority=0,
)
except Exception as e:
logger.warning(f"Failed to process row: {e}")
error_count += 1
# Update batch status
await uow.inventory_batches.update(batch, {
"total_items": created_count,
"pending_count": created_count,
"processing_status": "ready_for_review" if created_count > 0 else "failed",
"processing_error": f"{error_count} rows failed" if error_count > 0 else None,
})
await uow.inventory_jobs.update_status(job, "completed", result_data={
"created_count": created_count,
"error_count": error_count,
})
await uow.commit()
logger.info(f"Excel processing complete: {created_count} items created, {error_count} errors")
return True
except Exception as e:
logger.error(f"Excel processing failed: {e}")
batch = await uow.inventory_batches.get_by_id(job.batch_id)
await uow.inventory_batches.update(batch, {
"processing_status": "failed",
"processing_error": str(e),
})
await uow.inventory_jobs.mark_failed(job, str(e))
await uow.commit()
return False
async def _create_staging_item_from_row(
self,
uow: AsyncUnitOfWork,
company_id: UUID,
batch_id: UUID,
created_by: UUID,
row: Dict,
mappings: List[Dict],
default_site_id: Optional[str],
default_location_id: Optional[str],
site_cache: Dict,
location_cache: Dict,
):
"""Create a staging item from an Excel row."""
mapping_dict = {m["source_column"]: m["target_field"] for m in mappings}
# Extract mapped values
product_name = None
quantity = None
unit = "each"
manufacturer = None
barcode = None
site_name = None
location_name = None
notes = None
for source_col, value in row.items():
target = mapping_dict.get(source_col)
if not target or value is None:
continue
value = str(value).strip() if value else None
if target == "product_name":
product_name = value
elif target == "quantity":
try:
quantity = Decimal(str(value))
except (InvalidOperation, ValueError, TypeError):
quantity = Decimal("1")
elif target == "unit":
unit = value or "each"
elif target == "manufacturer":
manufacturer = value
elif target == "barcode":
barcode = value
elif target == "site":
site_name = value
elif target == "location":
location_name = value
elif target == "notes":
notes = value
if not product_name:
return None # Skip rows without product name
# Resolve site ID
site_id = None
if site_name and site_name not in site_cache:
site = await uow.sites.find_by_name(company_id, site_name)
site_cache[site_name] = site.site_id if site else None
site_id = site_cache.get(site_name) or (UUID(default_site_id) if default_site_id else None)
# Resolve location ID
location_id = None
if location_name and site_id:
cache_key = f"{site_id}:{location_name}"
if cache_key not in location_cache:
location = await uow.locations.find_by_name(site_id, location_name)
location_cache[cache_key] = location.location_id if location else None
location_id = location_cache.get(cache_key) or (UUID(default_location_id) if default_location_id else None)
# Create staging item
return await uow.inventory_staging.create(
company_id=company_id,
upload_batch_id=batch_id,
upload_method="excel",
created_by=created_by,
raw_product_name=product_name,
raw_manufacturer=manufacturer,
raw_barcode=barcode,
raw_quantity=quantity or Decimal("1"),
raw_unit=unit,
site_id=site_id,
location_id=location_id,
capture_notes=notes,
status="pending_processing",
)
def _parse_excel(self, content: bytes) -> List[Dict]:
"""Parse Excel file."""
wb = load_workbook(BytesIO(content), read_only=True)
ws = wb.active
rows = list(ws.iter_rows(values_only=True))
if not rows:
return []
columns = [str(c) if c else f"Column_{i}" for i, c in enumerate(rows[0])]
return [
{columns[i]: v for i, v in enumerate(row) if i < len(columns)}
for row in rows[1:] if any(row)
]
def _parse_csv(self, content: bytes) -> List[Dict]:
"""Parse CSV file."""
text = content.decode('utf-8-sig')
return list(csv.DictReader(text.splitlines()))
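How process_pending_jobs gets invoked is left open above. A simple polling runner for the background service might look like the sketch below; the 30-second interval and standalone entrypoint are assumptions, and a cron or Celery beat trigger would work equally well:
# Sketch of a polling runner for the background service.
import asyncio

from app.services.inventory_import.excel_processor import ExcelProcessorService

async def run_excel_processor_loop(interval_seconds: int = 30) -> None:
    processor = ExcelProcessorService()
    while True:
        results = await processor.process_pending_jobs()
        if results["processed"]:
            print(f"excel jobs: {results}")  # real code would use get_logger
        await asyncio.sleep(interval_seconds)

if __name__ == "__main__":
    asyncio.run(run_excel_processor_loop())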
Invoice Upload with AI Parsing
Service Layer - Invoice Upload
# Add to app/services/chemiq/inventory_staging_service.py
class InventoryStagingService:
# ... existing methods ...
# ============ Invoice Upload ============
async def upload_invoice(
self,
company_id: UUID,
user_id: UUID,
file: UploadFile,
default_site_id: Optional[UUID] = None,
default_location_id: Optional[UUID] = None,
) -> Dict[str, Any]:
"""
Upload invoice for AI parsing.
Creates batch and queues background job.
"""
# Validate file type
allowed_types = ['.pdf', '.png', '.jpg', '.jpeg']
if not any(file.filename.lower().endswith(t) for t in allowed_types):
raise ValueError("File must be PDF or image (PNG, JPG)")
# Upload to S3
content = await file.read()
s3_key = f"invoices/{company_id}/{user_id}/{datetime.utcnow().isoformat()}_{file.filename}"
await upload_to_s3(content, s3_key)
# Create batch
batch = self.repo.create_batch(
company_id=company_id,
upload_method="invoice",
created_by=user_id,
source_filename=file.filename,
source_file_s3_key=s3_key,
)
batch.processing_status = "processing"
# Queue background job
self.repo.create_processing_job(
company_id=company_id,
job_type="invoice_parsing",
batch_id=batch.batch_id,
input_data={
"s3_key": s3_key,
"filename": file.filename,
"default_site_id": str(default_site_id) if default_site_id else None,
"default_location_id": str(default_location_id) if default_location_id else None,
},
)
self.db.commit()
return {
"batch_id": batch.batch_id,
"filename": file.filename,
"status": "processing",
"message": "Invoice uploaded. AI extraction in progress.",
}
Background Job - Invoice Parsing with LLM
# File: tellus-ehs-background-service/app/services/inventory_import/invoice_parser.py
"""
Invoice Parsing Service - Background Job
Uses LLM to extract line items from invoices.
"""
import json
from datetime import datetime
from typing import Dict, Any, List, Optional
from uuid import UUID
from decimal import Decimal
from app.core.logging import get_logger
from app.core.config import settings
from app.db import get_async_db
from app.db.repositories.unit_of_work import AsyncUnitOfWork
from app.utils.s3_client import download_file_from_s3
from app.services.sds_parse.pdf_extractor import SDSPDFExtractor
logger = get_logger(__name__)
class InvoiceParserService:
"""
Service for parsing invoices using LLM.
"""
EXTRACTION_PROMPT = """
You are an expert at extracting product information from purchase invoices.
Analyze the following invoice text and extract all product line items.
For each product, extract:
- product_name: Full product name as shown
- manufacturer: Manufacturer/brand if mentioned
- quantity: Numeric quantity ordered
- unit: Unit of measure (each, case, gallon, etc.)
- unit_price: Price per unit (optional)
- barcode: UPC/EAN if visible (optional)
Return a JSON object of the form {{"items": [ ... ]}}, where each item has these fields.
If a field is not present, use null.
Invoice Text:
{invoice_text}
Return ONLY valid JSON, no other text.
"""
def __init__(self):
self.pdf_extractor = SDSPDFExtractor(enable_ocr=True)
self._init_llm_client()
def _init_llm_client(self):
"""Initialize LLM client."""
from openai import OpenAI
self.client = OpenAI(api_key=settings.OPENAI_API_KEY)
async def process_pending_jobs(self) -> Dict[str, Any]:
"""Process pending invoice_parsing jobs."""
results = {"processed": 0, "succeeded": 0, "failed": 0, "errors": []}
async with get_async_db() as db:
uow = AsyncUnitOfWork(db)
pending_jobs = await uow.inventory_jobs.get_pending_jobs(
job_type="invoice_parsing",
batch_size=3, # Lower batch size due to LLM cost
)
for job in pending_jobs:
try:
success = await self._process_invoice_job(uow, job)
results["processed"] += 1
if success:
results["succeeded"] += 1
else:
results["failed"] += 1
except Exception as e:
logger.error(f"Invoice parsing failed: {e}")
await uow.inventory_jobs.mark_failed(job, str(e))
await uow.commit()
results["failed"] += 1
return results
async def _process_invoice_job(self, uow: AsyncUnitOfWork, job) -> bool:
"""Process a single invoice parsing job."""
logger.info(f"Processing invoice job {job.job_id}")
await uow.inventory_jobs.update_status(job, "processing")
await uow.commit()
try:
input_data = job.input_data
s3_key = input_data["s3_key"]
default_site_id = input_data.get("default_site_id")
default_location_id = input_data.get("default_location_id")
# Download file
file_content = await download_file_from_s3(s3_key)
# Extract text (PDF or image)
if s3_key.lower().endswith('.pdf'):
invoice_text, _ = self.pdf_extractor.extract_text(file_content)
else:
# OCR for images
invoice_text = await self._ocr_image(file_content)
if len(invoice_text.strip()) < 50:
raise ValueError("Could not extract text from invoice")
# Parse with LLM
line_items = await self._extract_line_items(invoice_text)
# Get batch info
batch = await uow.inventory_batches.get_by_id(job.batch_id)
company_id = batch.company_id
# Create staging items
created_count = 0
for item in line_items:
try:
staging = await uow.inventory_staging.create(
company_id=company_id,
upload_batch_id=job.batch_id,
upload_method="invoice",
created_by=batch.created_by,
raw_product_name=item.get("product_name"),
raw_manufacturer=item.get("manufacturer"),
raw_barcode=item.get("barcode"),
raw_quantity=Decimal(str(item.get("quantity") or 1)),  # LLM may emit null
raw_unit=item.get("unit") or "each",
site_id=UUID(default_site_id) if default_site_id else None,
location_id=UUID(default_location_id) if default_location_id else None,
status="pending_processing",
)
created_count += 1
# Queue matching job
await uow.inventory_jobs.create(
company_id=company_id,
job_type="product_matching",
staging_id=staging.staging_id,
)
except Exception as e:
logger.warning(f"Failed to create staging item: {e}")
# Update batch
await uow.inventory_batches.update(batch, {
"total_items": created_count,
"pending_count": created_count,
"processing_status": "ready_for_review" if created_count > 0 else "failed",
})
await uow.inventory_jobs.update_status(job, "completed", result_data={
"extracted_count": len(line_items),
"created_count": created_count,
})
await uow.commit()
logger.info(f"Invoice parsing complete: {created_count} items extracted")
return True
except Exception as e:
logger.error(f"Invoice parsing error: {e}")
batch = await uow.inventory_batches.get_by_id(job.batch_id)
await uow.inventory_batches.update(batch, {
"processing_status": "failed",
"processing_error": str(e),
})
await uow.inventory_jobs.mark_failed(job, str(e))
await uow.commit()
return False
async def _extract_line_items(self, invoice_text: str) -> List[Dict]:
"""Use LLM to extract line items from invoice text."""
prompt = self.EXTRACTION_PROMPT.format(invoice_text=invoice_text[:8000])
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You extract product data from invoices. Always return valid JSON."},
{"role": "user", "content": prompt},
],
response_format={"type": "json_object"},
max_tokens=4000,
)
content = response.choices[0].message.content
result = json.loads(content)
# Handle both {"items": [...]} and [...] formats
if isinstance(result, list):
return result
elif isinstance(result, dict) and "items" in result:
return result["items"]
else:
return []
async def _ocr_image(self, image_content: bytes) -> str:
"""Extract text from image using OCR."""
# Use existing PDF extractor's OCR capability
# or implement cloud vision API call
from PIL import Image
import pytesseract
from io import BytesIO
image = Image.open(BytesIO(image_content))
return pytesseract.image_to_string(image)
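For reference, after json.loads the handler above expects a shape like the following; the values are illustrative, not real invoice data:
# Illustrative shape of the parsed LLM response (values are made up).
expected = {
    "items": [
        {
            "product_name": "Clorox Disinfecting Bleach",
            "manufacturer": "The Clorox Company",
            "quantity": 5,
            "unit": "gallon",
            "unit_price": 12.99,
            "barcode": None,
        }
    ]
}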
Add from SDS Library
Service Layer
# Add to app/services/chemiq/inventory_staging_service.py
class InventoryStagingService:
# ... existing methods ...
# ============ Add from SDS Library ============
async def add_from_sds_library(
self,
company_id: UUID,
user_id: UUID,
items: List[SDSInventoryItem],
) -> Dict[str, Any]:
"""
Create inventory entries from selected SDS documents.
This method can optionally skip staging if all required info is provided,
or create staging items for review.
"""
created_items = []
staging_items = []
# Create batch
batch = self.repo.create_batch(
company_id=company_id,
upload_method="sds_library",
created_by=user_id,
)
for item in items:
# Get SDS document
sds = await self._get_sds_document(company_id, item.sds_id)
if not sds:
continue
# Check if all required fields are provided
has_complete_info = (
item.site_id and
item.location_id and
item.quantity and
item.unit
)
if has_complete_info:
# Create staging item with 'approved' status (skips review)
staging = self.repo.create_staging_item(
company_id=company_id,
batch_id=batch.batch_id,
upload_method="sds_library",
created_by=user_id,
raw_product_name=sds.product_name,
raw_manufacturer=sds.manufacturer,
raw_quantity=item.quantity,
raw_unit=item.unit,
site_id=item.site_id,
location_id=item.location_id,
matched_sds_id=item.sds_id,
match_confidence=Decimal("1.0"),
match_method="sds_library_direct",
approved_product_name=sds.product_name,
approved_manufacturer=sds.manufacturer,
approved_quantity=item.quantity,
approved_unit=item.unit,
capture_notes=item.notes,
status="approved", # Skip review
reviewed_by=user_id,
reviewed_at=datetime.utcnow(),
)
staging_items.append(staging)
else:
# Create staging item requiring review
staging = self.repo.create_staging_item(
company_id=company_id,
batch_id=batch.batch_id,
upload_method="sds_library",
created_by=user_id,
raw_product_name=sds.product_name,
raw_manufacturer=sds.manufacturer,
raw_quantity=item.quantity,
raw_unit=item.unit,
site_id=item.site_id,
location_id=item.location_id,
matched_sds_id=item.sds_id,
match_confidence=Decimal("1.0"),
match_method="sds_library_direct",
capture_notes=item.notes,
status="pending_review",
)
staging_items.append(staging)
# Update batch counts
self.repo.update_batch_counts(batch.batch_id)
# Check if batch is ready for completion (all approved)
approved_count = sum(1 for s in staging_items if s.status == "approved")
if approved_count == len(staging_items):
batch.processing_status = "completed"
else:
batch.processing_status = "ready_for_review"
self.db.commit()
return {
"batch_id": batch.batch_id,
"total_items": len(staging_items),
"approved_items": approved_count,
"pending_review": len(staging_items) - approved_count,
"message": f"Added {len(staging_items)} items from SDS library",
}
async def _get_sds_document(self, company_id: UUID, sds_id: UUID):
"""Get SDS document if mapped to company."""
from app.db.repositories.chemiq.sds_library_repository import SDSLibraryRepository
sds_repo = SDSLibraryRepository(self.db)
return sds_repo.get_sds_with_mapping(company_id, sds_id)
API Endpoint
# Add to app/api/v1/chemiq/inventory_staging.py
@router.post(
"/from-sds",
response_model=Dict[str, Any],
status_code=status.HTTP_201_CREATED,
)
async def add_from_sds_library(
request: AddFromSDSRequest,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""
Create inventory entries from SDS library documents.
If all required fields (site, location, quantity, unit) are provided,
items are auto-approved. Otherwise, they go to the review queue.
"""
service = InventoryStagingService(db)
result = await service.add_from_sds_library(
company_id=company_id,
user_id=current_user.user_id,
items=request.items,
)
return result
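A sample client call, assuming the router is mounted under /api/v1/chemiq/inventory-staging and httpx is available; the prefix, token, and IDs are placeholders:
# Hypothetical client call to the /from-sds endpoint.
import httpx

payload = {
    "items": [
        {
            "sds_id": "11111111-1111-1111-1111-111111111111",
            "site_id": "22222222-2222-2222-2222-222222222222",
            "location_id": "33333333-3333-3333-3333-333333333333",
            "quantity": 2,
            "unit": "gallon",
            "notes": "Initial stock",
        }
    ]
}
resp = httpx.post(
    "https://api.example.com/api/v1/chemiq/inventory-staging/from-sds",
    json=payload,
    headers={"Authorization": "Bearer <token>"},
)
resp.raise_for_status()
print(resp.json()["message"])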
Copy from Site
Pydantic Schemas
# Add to app/schemas/chemiq/inventory_staging.py
class SiteCopyRequest(BaseModel):
"""Request for copying inventory from another site."""
source_site_id: UUID
target_site_id: UUID
inventory_ids: List[UUID] # Product catalog IDs to copy
target_location_id: Optional[UUID] = None # Default location at target
copy_mode: str = Field(default="copy", pattern="^(copy|transfer)$")
copy_quantities: bool = True # Whether to copy quantities or set to 0
class SiteCopyResponse(BaseModel):
"""Response for site copy operation."""
batch_id: UUID
copied_count: int
skipped_count: int
skipped_items: List[Dict[str, str]] # {id, reason}
message: str
Service Layer
# Add to app/services/chemiq/inventory_staging_service.py
class InventoryStagingService:
# ... existing methods ...
# ============ Copy from Site ============
async def copy_from_site(
self,
company_id: UUID,
user_id: UUID,
request: SiteCopyRequest,
) -> Dict[str, Any]:
"""
Copy or transfer inventory from one site to another.
Copy mode: Creates new entries at target (source unchanged)
Transfer mode: Moves entries to target (source marked inactive)
"""
from app.db.repositories.chemiq.inventory_repository import InventoryRepository
inventory_repo = InventoryRepository(self.db)
# Validate sites belong to company
source_site = await self._validate_site(company_id, request.source_site_id)
target_site = await self._validate_site(company_id, request.target_site_id)
if not source_site or not target_site:
raise ValueError("Invalid source or target site")
if request.source_site_id == request.target_site_id:
raise ValueError("Source and target sites must be different")
# Create batch
batch = self.repo.create_batch(
company_id=company_id,
upload_method="site_copy",
created_by=user_id,
)
copied_count = 0
skipped_items = []
for inventory_id in request.inventory_ids:
# Get source inventory item
source_item = inventory_repo.get_by_id(inventory_id)
if not source_item or source_item.company_id != company_id:
skipped_items.append({
"id": str(inventory_id),
"reason": "Not found or access denied",
})
continue
if str(source_item.site_id) != str(request.source_site_id):
skipped_items.append({
"id": str(inventory_id),
"reason": "Item not at source site",
})
continue
# Create staging item for target
quantity = source_item.quantity if request.copy_quantities else Decimal("0")
staging = self.repo.create_staging_item(
company_id=company_id,
batch_id=batch.batch_id,
upload_method="site_copy",
created_by=user_id,
raw_product_name=source_item.product_name,
raw_manufacturer=source_item.manufacturer,
raw_barcode=source_item.barcode_upc,
raw_quantity=quantity,
raw_unit=source_item.unit or "each",
site_id=request.target_site_id,
location_id=request.target_location_id,
matched_company_product_id=source_item.company_product_id,
matched_sds_id=source_item.current_sds_id,
match_confidence=Decimal("1.0"),
match_method="site_copy",
approved_product_name=source_item.product_name,
approved_manufacturer=source_item.manufacturer,
approved_quantity=quantity,
approved_unit=source_item.unit or "each",
capture_notes=f"Copied from {source_site.site_name}",
status="approved", # Pre-approved since source is verified
reviewed_by=user_id,
reviewed_at=datetime.utcnow(),
)
# If transfer mode, mark source as inactive
if request.copy_mode == "transfer":
inventory_repo.update(source_item, {
"is_active": False,
"notes": f"Transferred to {target_site.site_name} on {datetime.utcnow().date()}",
})
copied_count += 1
# Update batch
self.repo.update_batch_counts(batch.batch_id)
batch.processing_status = "completed" if copied_count > 0 else "failed"
self.db.commit()
action = "transferred" if request.copy_mode == "transfer" else "copied"
return {
"batch_id": batch.batch_id,
"copied_count": copied_count,
"skipped_count": len(skipped_items),
"skipped_items": skipped_items,
"message": f"Successfully {action} {copied_count} items to {target_site.site_name}",
}
async def _validate_site(self, company_id: UUID, site_id: UUID):
"""Validate site belongs to company."""
from app.db.repositories.sites_repository import SitesRepository
sites_repo = SitesRepository(self.db)
site = sites_repo.get_by_id(site_id)
if site and site.company_id == company_id:
return site
return None
API Endpoint
# Add to app/api/v1/chemiq/inventory_staging.py
@router.post(
"/copy-site",
response_model=SiteCopyResponse,
status_code=status.HTTP_201_CREATED,
)
async def copy_from_site(
request: SiteCopyRequest,
db: Session = Depends(get_db),
current_user = Depends(get_current_user),
company_id: UUID = Depends(get_company_id),
):
"""
Copy or transfer inventory from one site to another.
- copy: Creates duplicate entries at target site (source unchanged)
- transfer: Moves entries to target site (source marked inactive)
All copied items are pre-approved and ready for inventory creation.
"""
service = InventoryStagingService(db)
result = await service.copy_from_site(
company_id=company_id,
user_id=current_user.user_id,
request=request,
)
return SiteCopyResponse(**result)
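The corresponding request body follows SiteCopyRequest; for example (IDs are placeholders):
# Example SiteCopyRequest body. copy_mode must be "copy" or "transfer"
# per the schema's regex.
site_copy_payload = {
    "source_site_id": "44444444-4444-4444-4444-444444444444",
    "target_site_id": "55555555-5555-5555-5555-555555555555",
    "inventory_ids": [
        "66666666-6666-6666-6666-666666666666",
    ],
    "target_location_id": None,
    "copy_mode": "transfer",
    "copy_quantities": True,
}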
OCR Extraction Background Job
# File: tellus-ehs-background-service/app/services/inventory_import/ocr_extractor.py
"""
OCR Extraction Service - Background Job
Extracts text and barcodes from product images.
"""
from datetime import datetime
from typing import Dict, Any, List, Optional
from uuid import UUID
from decimal import Decimal
import re
from app.core.logging import get_logger
from app.db import get_async_db
from app.db.repositories.unit_of_work import AsyncUnitOfWork
from app.utils.s3_client import download_file_from_s3
logger = get_logger(__name__)
class OCRExtractorService:
"""
Service for extracting text from product images.
"""
# Common barcode patterns
BARCODE_PATTERNS = [
r'\b\d{12,14}\b', # UPC-A (12), EAN-13 (13), GTIN-14 (14)
r'\b\d{8}\b', # UPC-E, EAN-8
]
def __init__(self):
self._init_ocr()
def _init_ocr(self):
"""Initialize OCR engine."""
try:
import pytesseract
self.tesseract_available = True
except ImportError:
self.tesseract_available = False
logger.warning("Tesseract not available, using cloud OCR")
async def process_pending_jobs(self) -> Dict[str, Any]:
"""Process pending ocr_extraction jobs."""
results = {"processed": 0, "succeeded": 0, "failed": 0, "errors": []}
async with get_async_db() as db:
uow = AsyncUnitOfWork(db)
pending_jobs = await uow.inventory_jobs.get_pending_jobs(
job_type="ocr_extraction",
batch_size=10,
)
for job in pending_jobs:
try:
success = await self._process_ocr_job(uow, job)
results["processed"] += 1
if success:
results["succeeded"] += 1
else:
results["failed"] += 1
except Exception as e:
logger.error(f"OCR extraction failed: {e}")
await uow.inventory_jobs.mark_failed(job, str(e))
await uow.commit()
results["failed"] += 1
return results
async def _process_ocr_job(self, uow: AsyncUnitOfWork, job) -> bool:
"""Process a single OCR extraction job."""
logger.info(f"Processing OCR job {job.job_id} for staging {job.staging_id}")
await uow.inventory_jobs.update_status(job, "processing")
await uow.commit()
try:
# Get staging item
staging = await uow.inventory_staging.get_by_id(job.staging_id)
if not staging:
raise ValueError(f"Staging item {job.staging_id} not found")
image_urls = job.input_data.get("image_urls", [])
if not image_urls:
image_urls = staging.product_image_urls or []
if not image_urls:
raise ValueError("No images to process")
# Process each image
all_text = []
detected_barcodes = []
for url in image_urls:
try:
# Download image
image_content = await self._download_image(url)
# Extract text
text = await self._extract_text(image_content)
all_text.append(text)
# Try to detect barcode from text
barcodes = self._extract_barcodes(text)
detected_barcodes.extend(barcodes)
# Also try visual barcode detection
visual_barcode = await self._detect_barcode_visual(image_content)
if visual_barcode:
detected_barcodes.append(visual_barcode)
except Exception as e:
logger.warning(f"Failed to process image {url}: {e}")
# Combine results
combined_text = "\n---\n".join(all_text)
primary_barcode = detected_barcodes[0] if detected_barcodes else None
# Extract product name from OCR text
product_name = self._extract_product_name(combined_text)
# Update staging item
updates = {
"ocr_extracted_text": combined_text[:5000], # Limit storage
}
if primary_barcode:
updates["detected_barcode"] = primary_barcode
if not staging.raw_barcode:
updates["raw_barcode"] = primary_barcode
if product_name and not staging.raw_product_name:
updates["raw_product_name"] = product_name
await uow.inventory_staging.update(staging, updates)
# Queue product matching job
await uow.inventory_jobs.create(
company_id=staging.company_id,
job_type="product_matching",
staging_id=staging.staging_id,
priority=0,
input_data={
"barcode": primary_barcode,
"ocr_text": combined_text[:2000],
},
)
await uow.inventory_jobs.update_status(job, "completed", result_data={
"text_length": len(combined_text),
"barcode_detected": primary_barcode,
"product_name": product_name,
})
await uow.commit()
logger.info(f"OCR extraction complete: barcode={primary_barcode}")
return True
except Exception as e:
logger.error(f"OCR extraction error: {e}")
await uow.inventory_jobs.mark_failed(job, str(e))
# Still queue for matching (might work without OCR)
staging = await uow.inventory_staging.get_by_id(job.staging_id)
if staging:
await uow.inventory_staging.update(staging, {"status": "pending_processing"})
await uow.inventory_jobs.create(
company_id=staging.company_id,
job_type="product_matching",
staging_id=staging.staging_id,
)
await uow.commit()
return False
async def _download_image(self, url: str) -> bytes:
"""Download image from S3 or URL."""
if url.startswith('s3://') or 'amazonaws.com' in url:
# Parse S3 key from URL
s3_key = self._parse_s3_key(url)
return await download_file_from_s3(s3_key)
else:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.read()
async def _extract_text(self, image_content: bytes) -> str:
"""Extract text from image using OCR."""
if self.tesseract_available:
from PIL import Image
import pytesseract
from io import BytesIO
image = Image.open(BytesIO(image_content))
return pytesseract.image_to_string(image)
else:
# Use cloud OCR (Google Vision, AWS Textract, etc.)
return await self._cloud_ocr(image_content)
async def _cloud_ocr(self, image_content: bytes) -> str:
"""Use cloud OCR service."""
# Placeholder for cloud OCR implementation
# Could use Google Cloud Vision, AWS Textract, or Azure Computer Vision
logger.warning("Cloud OCR not implemented")
return ""
async def _detect_barcode_visual(self, image_content: bytes) -> Optional[str]:
"""Detect barcode visually from image."""
try:
from pyzbar.pyzbar import decode
from PIL import Image
from io import BytesIO
image = Image.open(BytesIO(image_content))
barcodes = decode(image)
for barcode in barcodes:
return barcode.data.decode('utf-8')
except Exception as e:
logger.warning(f"Visual barcode detection failed: {e}")
return None
def _extract_barcodes(self, text: str) -> List[str]:
"""Extract potential barcodes from OCR text."""
barcodes = []
for pattern in self.BARCODE_PATTERNS:
matches = re.findall(pattern, text)
barcodes.extend(matches)
return list(set(barcodes))
def _extract_product_name(self, text: str) -> Optional[str]:
"""Try to extract product name from OCR text."""
lines = text.split('\n')
for line in lines[:10]: # Check first 10 lines
line = line.strip()
# Skip lines that look like barcodes, dates, or numbers
if len(line) > 5 and not line.isdigit() and not re.match(r'^\d+[-/]\d+', line):
# Could be a product name
return line[:200] # Limit length
return None
def _parse_s3_key(self, url: str) -> str:
"""Parse S3 key from URL."""
# Handle various S3 URL formats
if 's3.amazonaws.com' in url:
parts = url.split('s3.amazonaws.com/')
if len(parts) > 1:
return parts[1].split('?')[0]
elif 's3://' in url:
return url.replace('s3://', '').split('/', 1)[1]
return url
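Label photos are often low-contrast, so a light preprocessing pass before Tesseract can improve extraction. A sketch using Pillow follows; the resize width and threshold value are starting assumptions to tune:
# Optional image preprocessing before OCR: grayscale, upscale small images,
# and binarize with a simple global threshold.
from io import BytesIO

from PIL import Image

def preprocess_for_ocr(image_content: bytes) -> Image.Image:
    image = Image.open(BytesIO(image_content)).convert("L")  # grayscale
    if image.width < 1000:
        scale = 1000 / image.width
        image = image.resize((1000, int(image.height * scale)))
    # Threshold of 150 is an assumption; tune against sample label photos
    return image.point(lambda px: 255 if px > 150 else 0)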
Frontend Components - Advanced Features
Add from SDS Modal
// File: src/pages/chemiq/inventory/QuickImport/components/AddFromSDS/AddFromSDSModal.tsx
import React, { useState } from 'react';
import { X, Search, Link, Check } from 'lucide-react';
import toast from 'react-hot-toast';
import { useAppSelector } from '../../../../../../store/hooks';
import { listCompanySDSDocuments } from '../../../../../../services/api';
import { addFromSDSLibrary } from '../../../../../../services/api/inventory-staging';
interface SDSItem {
sds_id: string;
product_name: string;
manufacturer: string;
signal_word?: string;
revision_date?: string;
}
interface SelectedItem extends SDSItem {
site_id: string;
location_id: string;
quantity: number;
unit: string;
notes?: string;
}
interface AddFromSDSModalProps {
isOpen: boolean;
onClose: () => void;
onComplete: () => void;
sites: Array<{ site_id: string; site_name: string }>;
locations: Array<{ location_id: string; location_name: string; site_id: string }>;
}
const AddFromSDSModal: React.FC<AddFromSDSModalProps> = ({
isOpen,
onClose,
onComplete,
sites,
locations,
}) => {
const { user, accessToken } = useAppSelector((state) => state.auth);
const token = accessToken?.replace(/"/g, '') || '';
const companyId = user?.last_company_id || '';
const userId = user?.user_id || '';
const [step, setStep] = useState<'search' | 'details'>('search');
const [searchTerm, setSearchTerm] = useState('');
const [searchResults, setSearchResults] = useState<SDSItem[]>([]);
const [selectedItems, setSelectedItems] = useState<SelectedItem[]>([]);
const [isSearching, setIsSearching] = useState(false);
const [isSubmitting, setIsSubmitting] = useState(false);
const [defaultSiteId, setDefaultSiteId] = useState('');
const [defaultLocationId, setDefaultLocationId] = useState('');
const handleSearch = async () => {
if (!searchTerm.trim()) return;
setIsSearching(true);
try {
const results = await listCompanySDSDocuments(
token, userId, companyId, 1, 50, searchTerm
);
setSearchResults(results.sds_documents || []);
} catch (err) {
toast.error('Search failed');
} finally {
setIsSearching(false);
}
};
const toggleSelectSDS = (sds: SDSItem) => {
const exists = selectedItems.find(s => s.sds_id === sds.sds_id);
if (exists) {
setSelectedItems(selectedItems.filter(s => s.sds_id !== sds.sds_id));
} else {
setSelectedItems([...selectedItems, {
...sds,
site_id: defaultSiteId,
location_id: defaultLocationId,
quantity: 1,
unit: 'each',
}]);
}
};
const updateSelectedItem = (sdsId: string, updates: Partial<SelectedItem>) => {
setSelectedItems(items =>
items.map(item =>
item.sds_id === sdsId ? { ...item, ...updates } : item
)
);
};
const handleSubmit = async () => {
if (selectedItems.length === 0) {
toast.error('Please select at least one SDS');
return;
}
// Validate all items have site/location
const incomplete = selectedItems.filter(i => !i.site_id || !i.location_id);
if (incomplete.length > 0) {
toast.error(`${incomplete.length} items missing site/location`);
return;
}
setIsSubmitting(true);
try {
await addFromSDSLibrary(token, userId, companyId, {
items: selectedItems.map(item => ({
sds_id: item.sds_id,
site_id: item.site_id,
location_id: item.location_id,
quantity: item.quantity,
unit: item.unit,
notes: item.notes,
})),
});
toast.success(`Added ${selectedItems.length} items from SDS library`);
onComplete();
onClose();
} catch (err: any) {
toast.error(err.message || 'Failed to add items');
} finally {
setIsSubmitting(false);
}
};
if (!isOpen) return null;
return (
<div className="fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center z-50">
<div className="bg-white rounded-lg shadow-xl w-full max-w-4xl mx-4 max-h-[90vh] overflow-hidden flex flex-col">
{/* Header */}
<div className="px-6 py-4 border-b border-border-light flex items-center justify-between">
<div className="flex items-center gap-2">
<Link className="h-5 w-5 text-purple-500" />
<h2 className="text-lg font-semibold text-text-main">
Add from SDS Library
</h2>
</div>
<button onClick={onClose} className="text-text-muted hover:text-text-main">
<X className="h-5 w-5" />
</button>
</div>
{/* Content */}
<div className="flex-1 overflow-auto p-6">
{step === 'search' && (
<>
{/* Search */}
<div className="flex gap-2 mb-4">
<input
type="text"
value={searchTerm}
onChange={(e) => setSearchTerm(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && handleSearch()}
placeholder="Search SDS library..."
className="flex-1 input-field"
/>
<button onClick={handleSearch} disabled={isSearching} className="btn-primary">
<Search className="h-4 w-4" />
</button>
</div>
{/* Default location */}
<div className="flex gap-4 mb-4 p-3 bg-surface-light rounded-lg">
<div className="flex-1">
<label className="block text-sm font-medium text-text-main mb-1">
Default Site
</label>
<select
value={defaultSiteId}
onChange={(e) => setDefaultSiteId(e.target.value)}
className="input-field"
>
<option value="">Select site...</option>
{sites.map(site => (
<option key={site.site_id} value={site.site_id}>
{site.site_name}
</option>
))}
</select>
</div>
<div className="flex-1">
<label className="block text-sm font-medium text-text-main mb-1">
Default Location
</label>
<select
value={defaultLocationId}
onChange={(e) => setDefaultLocationId(e.target.value)}
className="input-field"
disabled={!defaultSiteId}
>
<option value="">Select location...</option>
{locations
.filter(l => l.site_id === defaultSiteId)
.map(loc => (
<option key={loc.location_id} value={loc.location_id}>
{loc.location_name}
</option>
))}
</select>
</div>
</div>
{/* Results */}
<div className="border rounded-lg divide-y max-h-80 overflow-auto">
{searchResults.map(sds => {
const isSelected = selectedItems.some(s => s.sds_id === sds.sds_id);
return (
<div
key={sds.sds_id}
onClick={() => toggleSelectSDS(sds)}
className={`p-3 flex items-center gap-3 cursor-pointer hover:bg-surface-light ${
isSelected ? 'bg-purple-50' : ''
}`}
>
<div className={`w-5 h-5 rounded border flex items-center justify-center ${
isSelected ? 'bg-purple-500 border-purple-500' : 'border-gray-300'
}`}>
{isSelected && <Check className="h-3 w-3 text-white" />}
</div>
<div className="flex-1">
<p className="font-medium text-text-main">{sds.product_name}</p>
<p className="text-sm text-text-muted">{sds.manufacturer}</p>
</div>
{sds.signal_word && (
<span className={`text-xs px-2 py-1 rounded ${
sds.signal_word === 'Danger'
? 'bg-red-100 text-red-700'
: 'bg-yellow-100 text-yellow-700'
}`}>
{sds.signal_word}
</span>
)}
</div>
);
})}
{searchResults.length === 0 && searchTerm && !isSearching && (
<div className="p-8 text-center text-text-muted">
No SDS documents found
</div>
)}
</div>
</>
)}
{step === 'details' && (
<div className="space-y-4">
{selectedItems.map(item => (
<div key={item.sds_id} className="border rounded-lg p-4">
<div className="flex items-start justify-between mb-3">
<div>
<p className="font-medium text-text-main">{item.product_name}</p>
<p className="text-sm text-text-muted">{item.manufacturer}</p>
</div>
<button
onClick={() => setSelectedItems(items => items.filter(i => i.sds_id !== item.sds_id))}
className="text-text-muted hover:text-red-500"
>
<X className="h-4 w-4" />
</button>
</div>
<div className="grid grid-cols-4 gap-3">
<div>
<label className="block text-xs text-text-muted mb-1">Site</label>
<select
value={item.site_id}
onChange={(e) => updateSelectedItem(item.sds_id, { site_id: e.target.value })}
className="input-field text-sm"
>
<option value="">Select...</option>
{sites.map(s => (
<option key={s.site_id} value={s.site_id}>{s.site_name}</option>
))}
</select>
</div>
<div>
<label className="block text-xs text-text-muted mb-1">Location</label>
<select
value={item.location_id}
onChange={(e) => updateSelectedItem(item.sds_id, { location_id: e.target.value })}
className="input-field text-sm"
disabled={!item.site_id}
>
<option value="">Select...</option>
{locations
.filter(l => l.site_id === item.site_id)
.map(l => (
<option key={l.location_id} value={l.location_id}>{l.location_name}</option>
))}
</select>
</div>
<div>
<label className="block text-xs text-text-muted mb-1">Quantity</label>
<input
type="number"
value={item.quantity}
onChange={(e) => updateSelectedItem(item.sds_id, { quantity: parseFloat(e.target.value) || 0 })}
className="input-field text-sm"
min="0"
/>
</div>
<div>
<label className="block text-xs text-text-muted mb-1">Unit</label>
<select
value={item.unit}
onChange={(e) => updateSelectedItem(item.sds_id, { unit: e.target.value })}
className="input-field text-sm"
>
<option value="each">each</option>
<option value="bottle">bottle</option>
<option value="gallon">gallon</option>
<option value="case">case</option>
<option value="liter">liter</option>
</select>
</div>
</div>
</div>
))}
</div>
)}
</div>
{/* Footer */}
<div className="px-6 py-4 border-t border-border-light flex justify-between">
<div>
{selectedItems.length > 0 && (
<span className="text-sm text-text-muted">
{selectedItems.length} item{selectedItems.length !== 1 ? 's' : ''} selected
</span>
)}
</div>
<div className="flex gap-3">
<button onClick={onClose} className="btn-secondary">
Cancel
</button>
{step === 'search' && (
<button
onClick={() => setStep('details')}
disabled={selectedItems.length === 0}
className="btn-primary"
>
Continue
</button>
)}
{step === 'details' && (
<>
<button onClick={() => setStep('search')} className="btn-secondary">
Back
</button>
<button
onClick={handleSubmit}
disabled={isSubmitting || selectedItems.length === 0}
className="btn-primary"
>
{isSubmitting ? 'Adding...' : `Add ${selectedItems.length} Items`}
</button>
</>
)}
</div>
</div>
</div>
</div>
);
};
export default AddFromSDSModal;
Summary of Advanced Features
| Feature | API Service | Background Service | Frontend |
|---|---|---|---|
| Excel Import | Upload, preview, confirm | Parse file, validate rows, create staging items, queue matching | Upload modal, column mapper, preview table |
| Invoice Upload | Upload file | OCR/PDF extraction, LLM parsing, create staging items | Upload modal, progress indicator |
| Add from SDS Library | Create staging items (can be pre-approved) | N/A (direct creation) | SDS search modal, quantity entry |
| Copy from Site | Validate sites, create staging items (pre-approved) | N/A (direct creation) | Site selector, item list, copy/transfer mode |
| OCR Extraction | N/A | Download images, run OCR, detect barcodes, queue matching | N/A (background only) |