124-webapp/test_send_pdf.py
2025-09-17 16:35:11 +02:00

72 lines
2.2 KiB
Python

import os
import asyncio
from io import BytesIO
from nio import AsyncClient, UploadResponse, RoomSendResponse
from dotenv import load_dotenv
# Load settings from a local .env file (python-dotenv) at import time, so the
# MATRIX_* variables are available in os.environ before main() reads them.
load_dotenv()
async def main(pdf_path="data/uploads/einszwovier infographics 2.pdf"):
    """Log in to Matrix, upload a PDF and post it to a room as ``m.file``.

    Settings are read from the environment:

    * ``MATRIX_USER`` -- defaults to ``@einszwovier_bot:localhost``
    * ``MATRIX_PASS`` -- required
    * ``MATRIX_HOMESERVER`` -- defaults to ``http://localhost:8008``
    * ``MATRIX_ROOM`` -- required, e.g. ``!abc123:localhost``

    Progress and failures are reported with ``print``. A failed login,
    upload or send ends the run without raising. The client connection is
    always closed, and a session that logged in successfully is always
    logged out again, even when a later step fails or raises.

    Args:
        pdf_path: Path of the PDF to send; any small PDF will do.

    Raises:
        RuntimeError: If a required setting is missing or empty.
        OSError: If ``pdf_path`` cannot be opened or read.
    """
    matrix_user = os.environ.get("MATRIX_USER", "@einszwovier_bot:localhost")
    matrix_pass = os.environ.get("MATRIX_PASS")
    homeserver = os.environ.get("MATRIX_HOMESERVER", "http://localhost:8008")
    room_id = os.environ.get("MATRIX_ROOM")  # e.g. "!abc123:localhost"

    if not all([matrix_user, matrix_pass, room_id]):
        raise RuntimeError("Missing MATRIX_USER, MATRIX_PASS or MATRIX_ROOM")

    # Read the file before any network traffic: a wrong path then fails fast
    # instead of leaving a freshly created login session behind.
    with open(pdf_path, "rb") as f:
        pdf_bytes = f.read()
    filename = os.path.basename(pdf_path)

    client = AsyncClient(homeserver, matrix_user)
    try:
        login_resp = await client.login(matrix_pass)
        if getattr(login_resp, "access_token", None) is None:
            print("❌ Login failed:", login_resp)
            return
        print("✅ Logged in as", matrix_user)

        try:
            # nio's upload() returns (response, decryption_info). The second
            # element is only populated for encrypted uploads and is NOT an
            # error indicator; failures arrive as a non-UploadResponse object
            # in the first element.
            upload_resp, _decryption_info = await client.upload(
                data_provider=BytesIO(pdf_bytes),
                content_type="application/pdf",
                filename=filename,
                filesize=len(pdf_bytes),
            )
            if not (
                isinstance(upload_resp, UploadResponse)
                and upload_resp.content_uri
            ):
                print("❌ Upload failed:", upload_resp)
                return
            print("✅ Upload succeeded:", upload_resp.content_uri)

            # Send file message to room. "info" is optional metadata that
            # lets clients show the file type and size before downloading.
            file_resp = await client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content={
                    "msgtype": "m.file",
                    "body": filename,
                    "url": upload_resp.content_uri,
                    "info": {
                        "mimetype": "application/pdf",
                        "size": len(pdf_bytes),
                    },
                },
            )
            if isinstance(file_resp, RoomSendResponse) and file_resp.event_id:
                print("✅ PDF sent to room", room_id)
            else:
                print("❌ Failed to send PDF:", file_resp)
        finally:
            # Only reached after a successful login; ends the session on the
            # failure paths too, not just after a successful send.
            await client.logout()
    finally:
        # Always release the underlying HTTP session, including when login
        # fails or an exception propagates.
        await client.close()
if __name__ == "__main__":
    # Script entry point: drive the async upload check to completion on a
    # fresh event loop when this file is executed directly.
    entry_coro = main()
    asyncio.run(entry_coro)