|
| 1 | +from workers import WorkerEntrypoint, Response, DurableObject |
| 2 | +import js |
| 3 | +import json |
| 4 | +import time |
| 5 | +from pyodide.ffi import create_proxy |
| 6 | +from urllib.parse import urlparse |
| 7 | + |
| 8 | + |
| 9 | +class BlueskyFirehoseConsumer(DurableObject): |
| 10 | + """Durable Object that maintains a persistent WebSocket connection to Bluesky Jetstream.""" |
| 11 | + |
| 12 | + def __init__(self, state, env): |
| 13 | + super().__init__(state, env) |
| 14 | + self.websocket = None |
| 15 | + self.connected = False |
| 16 | + self.last_print_time = 0 # Track last time we printed a post |
| 17 | + |
| 18 | + async def fetch(self, request): |
| 19 | + """Handle incoming requests to the Durable Object.""" |
| 20 | + # If we're not connected then make sure we start a connection. |
| 21 | + if not self.connected: |
| 22 | + await self._schedule_next_alarm() |
| 23 | + await self._connect_to_jetstream() |
| 24 | + |
| 25 | + url = urlparse(request.url) |
| 26 | + path = url.path |
| 27 | + |
| 28 | + if path == "/status": |
| 29 | + status = "connected" if self.connected else "disconnected" |
| 30 | + return Response(f"Firehose status: {status}") |
| 31 | + else: |
| 32 | + return Response("Available endpoints: /status") |
| 33 | + |
| 34 | + async def alarm(self): |
| 35 | + """Handle alarm events - used to ensure that the DO stays alive and connected""" |
| 36 | + print("Alarm triggered - making sure we are connected to jetstream...") |
| 37 | + if not self.connected: |
| 38 | + await self._connect_to_jetstream() |
| 39 | + else: |
| 40 | + print("Already connected, skipping reconnection") |
| 41 | + |
| 42 | + # Schedule the next alarm to keep the DO alive |
| 43 | + await self._schedule_next_alarm() |
| 44 | + |
| 45 | + async def _schedule_next_alarm(self): |
| 46 | + """Schedule the next alarm to run in 1 minute to keep the DO alive.""" |
| 47 | + # Schedule alarm for 1 minute from now, overwriting any existing alarms |
| 48 | + next_alarm_time = int(time.time() * 1000) + 60000 |
| 49 | + return await self.ctx.storage.setAlarm(next_alarm_time) |
| 50 | + |
| 51 | + async def _on_open(self, event): |
| 52 | + """Handle WebSocket open event.""" |
| 53 | + self.connected = True |
| 54 | + print("Connected to Bluesky Jetstream firehose!") |
| 55 | + print("Filtering for: app.bsky.feed.post (post events, rate limited to 1/sec)") |
| 56 | + # Ensure alarm is set when we connect |
| 57 | + await self._schedule_next_alarm() |
| 58 | + |
| 59 | + def _on_message(self, event): |
| 60 | + """Handle incoming WebSocket messages.""" |
| 61 | + try: |
| 62 | + # Parse the JSON message |
| 63 | + data = json.loads(event.data) |
| 64 | + |
| 65 | + # Store the timestamp for resumption on reconnect |
| 66 | + time_us = data.get("time_us") |
| 67 | + if time_us: |
| 68 | + # Store the timestamp asynchronously |
| 69 | + self.ctx.storage.kv.put("last_event_timestamp", time_us) |
| 70 | + |
| 71 | + # Jetstream sends different event types |
| 72 | + # We're interested in 'commit' events which contain posts |
| 73 | + if data.get("kind") == "commit": |
| 74 | + commit = data.get("commit", {}) |
| 75 | + collection = commit.get("collection") |
| 76 | + |
| 77 | + # Filter for post events |
| 78 | + if collection == "app.bsky.feed.post": |
| 79 | + # Rate limiting: only print at most 1 per second |
| 80 | + current_time = time.time() |
| 81 | + if current_time - self.last_print_time >= 1.0: |
| 82 | + record = commit.get("record", {}) |
| 83 | + print("Post record", record) |
| 84 | + |
| 85 | + # Update last print time |
| 86 | + self.last_print_time = current_time |
| 87 | + |
| 88 | + except Exception as e: |
| 89 | + print(f"Error processing message: {e}") |
| 90 | + |
| 91 | + def _on_error(self, event): |
| 92 | + """Handle WebSocket error event.""" |
| 93 | + print(f"WebSocket error: {event}") |
| 94 | + self.connected = False |
| 95 | + self.ctx.abort("WebSocket error occurred") |
| 96 | + |
| 97 | + async def _on_close(self, event): |
| 98 | + """Handle WebSocket close event.""" |
| 99 | + print(f"WebSocket closed: code={event.code}, reason={event.reason}") |
| 100 | + self.connected = False |
| 101 | + self.ctx.abort("WebSocket closed unexpectedly") |
| 102 | + |
| 103 | + async def _connect_to_jetstream(self): |
| 104 | + """Connect to the Bluesky Jetstream WebSocket and start consuming events.""" |
| 105 | + # Get the last event timestamp from storage to resume from the right position |
| 106 | + last_timestamp = self.ctx.storage.kv.get("last_event_timestamp") |
| 107 | + |
| 108 | + # Jetstream endpoint - we'll filter for posts |
| 109 | + # Using wantedCollections parameter to only get post events |
| 110 | + jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post" |
| 111 | + |
| 112 | + # If we have a last timestamp, add it to resume from that point |
| 113 | + if last_timestamp: |
| 114 | + jetstream_url += f"&cursor={last_timestamp}" |
| 115 | + print( |
| 116 | + f"Connecting to Bluesky Jetstream at {jetstream_url} (resuming from timestamp: {last_timestamp})" |
| 117 | + ) |
| 118 | + else: |
| 119 | + print( |
| 120 | + f"Connecting to Bluesky Jetstream at {jetstream_url} (starting fresh)" |
| 121 | + ) |
| 122 | + |
| 123 | + # Create WebSocket using JS FFI |
| 124 | + ws = js.WebSocket.new(jetstream_url) |
| 125 | + self.websocket = ws |
| 126 | + |
| 127 | + # Attach event handlers |
| 128 | + # |
| 129 | + # Note that ordinarily proxies need to be destroyed once they are no longer used. |
| 130 | + # However, in this Durable Object context, the WebSocket and its event listeners |
| 131 | + # persist for the lifetime of the Durable Object, so we don't explicitly destroy |
| 132 | + # the proxies here. When the websocket connection closes, the Durable Object |
| 133 | + # is restarted which destroys these proxies. |
| 134 | + # |
| 135 | + # In the future, we plan to provide support for native Python websocket APIs which |
| 136 | + # should eliminate the need for proxy wrappers. |
| 137 | + ws.addEventListener("open", create_proxy(self._on_open)) |
| 138 | + ws.addEventListener("message", create_proxy(self._on_message)) |
| 139 | + ws.addEventListener("error", create_proxy(self._on_error)) |
| 140 | + ws.addEventListener("close", create_proxy(self._on_close)) |
| 141 | + |
| 142 | + |
| 143 | +class Default(WorkerEntrypoint): |
| 144 | + """Main worker entry point that routes requests to the Durable Object.""" |
| 145 | + |
| 146 | + async def fetch(self, request): |
| 147 | + # Get the Durable Object namespace from the environment |
| 148 | + namespace = self.env.BLUESKY_FIREHOSE |
| 149 | + |
| 150 | + # Use a fixed ID so we always connect to the same Durable Object instance |
| 151 | + # This ensures we maintain a single persistent connection |
| 152 | + id = namespace.idFromName("bluesky-consumer") |
| 153 | + stub = namespace.get(id) |
| 154 | + |
| 155 | + # Forward the request to the Durable Object |
| 156 | + return await stub.fetch(request) |
0 commit comments