def make_var_names_unique(adatas: list[ad.AnnData]) -> list[ad.AnnData]:
    """Call ``var_names_make_unique()`` on every object in *adatas*.

    The return value of each call is ignored, so any effect is on the
    objects themselves.

    Args:
        adatas: Objects whose variable names should be de-duplicated.

    Returns:
        The same list object that was passed in (not a copy).
    """
    for current in adatas:
        current.var_names_make_unique()
    return adatas
def merge_anndata(adatas: list[ad.AnnData]) -> ad.AnnData:
    """Concatenate *adatas* into one object and rebuild ``var`` and ``Code_name``.

    Steps:
      1. ``ad.concat`` is called with ``join="outer"``, ``axis=0`` and
         ``index_unique="-"``.
      2. The ``var`` tables of all inputs are stacked; when a variable name
         occurs more than once only its first row is kept. The result is
         reordered to the merged ``var_names`` and assigned as the merged
         ``var``.
      3. ``obs["Code_name"]`` of the merged object is overwritten with, for
         each input in order, that input's *first* ``Code_name`` value
         repeated ``n_obs`` times.

    NOTE(review): step 3 assumes every input has at least one observation
    (``.iloc[0]`` raises otherwise), carries a single ``Code_name`` value, and
    that the merged rows appear in input order -- confirm against the loader.
    NOTE(review): step 2 is presumably needed because ``ad.concat`` with its
    default ``merge`` setting does not carry over ``var`` columns -- confirm.

    Args:
        adatas: Objects to merge; each must have ``obs["Code_name"]``.

    Returns:
        The merged object.
    """
    adata_merged = ad.concat(adatas, join="outer", axis=0, index_unique="-")

    # Union of all per-input var annotations, first occurrence wins.
    var_tables = [adata.var for adata in adatas]
    all_var = pd.concat(var_tables, axis=0, join="outer")
    first_occurrence = ~all_var.index.duplicated(keep="first")
    all_var = all_var.loc[first_occurrence]
    adata_merged.var = all_var.loc[adata_merged.var_names]

    # One constant block of Code_name per input, sized by its cell count.
    code_blocks = []
    for adata in adatas:
        code_name = adata.obs["Code_name"].iloc[0]
        code_blocks.append(np.repeat(code_name, adata.n_obs))
    adata_merged.obs["Code_name"] = np.concatenate(code_blocks)

    return adata_merged
def create_sample_dict() -> dict[str, str]:
    """Return a new dict mapping ``GSM...`` accession codes to sample labels.

    Labels have the form ``<patient>_<N|T>`` (e.g. ``"L1_N"``). A fresh dict
    is built on every call, so callers may mutate the result freely.

    NOTE(review): ``GSM7290778`` and ``GSM7290779`` both map to ``"L8_T"``,
    whereas other paired patients have one ``_N`` and one ``_T`` entry --
    confirm against the source sample sheet.
    """
    accession_to_label = (
        ("GSM7290760", "L1_N"),
        ("GSM7290761", "L1_T"),
        ("GSM7290762", "C1_N"),
        ("GSM7290763", "C1_T"),
        ("GSM7290764", "L2_N"),
        ("GSM7290765", "L3_N"),
        ("GSM7290766", "L4_N"),
        ("GSM7290767", "L4_T"),
        ("GSM7290768", "C2_N"),
        ("GSM7290769", "C2_T"),
        ("GSM7290770", "L5_N"),
        ("GSM7290771", "C3_N"),
        ("GSM7290772", "C3_T"),
        ("GSM7290773", "C4_T"),
        ("GSM7290774", "C5_T"),
        ("GSM7290775", "L6_T"),
        ("GSM7290776", "L7_N"),
        ("GSM7290777", "C6_T"),
        ("GSM7290778", "L8_T"),
        ("GSM7290779", "L8_T"),
        ("GSM7290780", "L9_N"),
        ("GSM7290781", "L9_T"),
        ("GSM7290782", "L10_T"),
        ("GSM7290783", "L11_T"),
        ("GSM7290784", "L11_N"),
        ("GSM7290785", "L12_T"),
    )
    return dict(accession_to_label)
def map_code_to_sample(code: str, sample_dict: dict[str, str]) -> "str | None":
    """Look up the sample label for *code*.

    Args:
        code: Key to look up in *sample_dict*.
        sample_dict: Mapping from code to sample label.

    Returns:
        The label stored under *code*, or ``None`` when *code* is not a key
        of *sample_dict*. Callers that go on to call string methods on the
        result must handle the ``None`` case.
    """
    # dict.get already defaults to None; the return annotation now says so.
    return sample_dict.get(code)
def create_metadata() -> pd.DataFrame:
    """Build the per-patient metadata table, indexed by ``Patient``.

    Columns (in order): ``Location``, ``Gender``, ``Current_age``,
    ``Stage_at_collection``, ``MS_status``. The table has one row for each
    of the twelve patients listed below.

    NOTE(review): ``create_sample_dict`` also contains samples for patients
    L2, L3, L5, L7, L10 and L12, which have no row here -- confirm whether
    their metadata is intentionally absent.
    """
    fields = (
        "Patient",
        "Location",
        "Gender",
        "Current_age",
        "Stage_at_collection",
        "MS_status",
    )
    # One record per patient, in the same order as `fields`.
    records = (
        ("C1", "Colon", "M", 47, "IV", "MSS"),
        ("C2", "Colon", "M", 39, "IV", "MSS"),
        ("C3", "Colon", "F", 65, "IV", "MSS"),
        ("C4", "Colon", "M", 85, "II", "MSI"),
        ("C5", "Colon", "M", 72, "IV", "MSS"),
        ("C6", "Colon", "F", 62, "IV", "MSS"),
        ("L1", "Liver", "F", 64, "IV", "MSS"),
        ("L4", "Liver", "M", 70, "IV", "MSS"),
        ("L6", "Liver", "F", 68, "IV", "MSS"),
        ("L8", "Liver", "M", 54, "IV", "MSS"),
        ("L9", "Liver", "M", 56, "IV", "MSS"),
        ("L11", "Liver", "M", 60, "IV", "MSS"),
    )
    # Transpose the records into a column-name -> list-of-values mapping.
    by_column = {
        name: list(values) for name, values in zip(fields, zip(*records))
    }
    return pd.DataFrame(by_column).set_index("Patient")
def extract_patient_id(sample: str) -> str:
    """Return the part of *sample* that precedes its first underscore.

    If *sample* contains no underscore the whole string is returned.
    """
    patient_id, _, _ = sample.partition("_")
    return patient_id
def determine_tissue(sample: str) -> str:
    """Classify *sample* as ``"tumor"``, ``"normal"`` or ``"unknown"``.

    Only the final character of the last underscore-separated token is
    inspected: ``T`` means tumor, ``N`` means normal, anything else
    (including an empty token) is unknown. A label without an underscore is
    therefore classified by its own last character.
    """
    tissue_by_last_char = {"T": "tumor", "N": "normal"}
    last_token = sample.split("_")[-1]
    # Slicing instead of indexing yields "" for an empty token rather than raising.
    return tissue_by_last_char.get(last_token[-1:], "unknown")
def add_metadata_to_anndata(adata: ad.AnnData, meta_df: pd.DataFrame) -> ad.AnnData:
    """Add patient-level annotation columns to ``adata.obs``.

    Columns are written in this order:
      * ``Patient`` -- ``extract_patient_id`` applied to ``obs["Sample"]``;
      * one column per column of *meta_df*, obtained by mapping
        ``obs["Patient"]`` through that column;
      * ``Tissue_type`` -- ``determine_tissue`` applied to ``obs["Sample"]``.

    Args:
        adata: Object whose ``obs`` has a ``Sample`` column; modified in place.
        meta_df: Metadata table; presumably indexed by patient id so that the
            mapping lines up -- verify against the caller.

    Returns:
        The same *adata* object that was passed in.
    """
    patient_key = "Patient"
    adata.obs[patient_key] = adata.obs["Sample"].apply(extract_patient_id)

    for name, per_patient_values in meta_df.items():
        adata.obs[name] = adata.obs[patient_key].map(per_patient_values)

    adata.obs["Tissue_type"] = adata.obs["Sample"].apply(determine_tissue)
    return adata
# Main execution
# NOTE: `adatas` is not defined in this section; it must already exist
# when these statements run.
adatas = make_var_names_unique(adatas)
adata_merged = merge_anndata(adatas)
# Translate each row's Code_name into a sample label. Codes that are not
# keys of sample_dict yield None (map_code_to_sample uses dict.get).
sample_dict = create_sample_dict()
adata_merged.obs["Sample"] = adata_merged.obs["Code_name"].apply(
    lambda x: map_code_to_sample(x, sample_dict)
)
# Attach Patient, the per-patient metadata columns and Tissue_type to obs.
meta_df = create_metadata()
adata_merged = add_metadata_to_anndata(adata_merged, meta_df)
# Check the result
# Bare expression: shows the first rows in an interactive/notebook session
# and has no visible effect when the file is run as a plain script.
adata_merged.obs.head()