البرمجة

ايقاف Akka Streams عند تحقيق شرط

الوقت قد حان لاستخدام ميزات Akka Streams للتوقف عندما يتحقق شرط معين. للقيام بذلك، يمكنك استخدام تيارات takeWhile أو via مع الكثير من الأدوات الممتازة التي توفرها Akka Streams.

أحد الخيارات المتاحة هو استخدام takeWhile، الذي يأخذ عنصرًا من التيار الأصلي ما لم يتم تحقيق الشرط. على سبيل المثال، إذا كان لديك تيارًا من السطور، يمكنك استخدام takeWhile للتوقف عندما يصبح السطر مساويًا لقيمة محددة:

scala
import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.stream._ object Main extends App { implicit val system = ActorSystem("system") implicit val materializer = ActorMaterializer() // Define your condition val condition: String => Boolean = line => line.contains("stop") // Stream from a file val source = FileIO.fromPath(Paths.get("your-file.txt")) .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)) .map(_.utf8String) // Use takeWhile to stop the stream when the condition is met val stream = source.takeWhile(!condition(_)) // Run the stream stream.runForeach(println) .onComplete(_ => system.terminate()) }

في هذا المثال، يتم إيجاد الشرط condition في كل سطر، وإذا تحقق، سيتوقف التيار. سيستمر البرنامج في القراءة من الملف حتى يتم العثور على السطر الذي يفي بالشرط.

الطريقة الأخرى هي استخدام via, حيث يمكنك استخدام StatefulStage لإنشاء مكون خاص يقوم بالتحقق من الشرط ويقوم بإيقاف التيار إذا تم تحقيقه. ومع ذلك، يعتبر استخدام takeWhile أكثر بساطة وفعالية في معظم الحالات.

المزيد من المعلومات

بالطبع! يمكننا أن نعمق قليلاً في كيفية استخدام takeWhile بشكل أكثر تفصيلًا في سياق Akka Streams.

takeWhile في Akka Streams يأخذ دالة تقوم بتقديم قيمة بوليانية (true أو false) بناءً على العنصر الحالي في تيار. إذا كانت القيمة true، يستمر التدفق. وإذا كانت القيمة false، يتوقف التدفق.

هناك بعض النقاط التي يجب مراعاتها عند استخدام takeWhile:

  1. النوع العام (Generic Type): يجب أن يكون النوع العام لـ takeWhile متوافقًا مع النوع العام للتدفق الأصلي. على سبيل المثال، إذا كان التدفق الأصلي من نوع Source[String, _]، يجب أن تكون دالة الشرط من النوع String => Boolean.

  2. التدفق الناتج (Output Stream): يجب أن تكون العملية التي تأتي بعد takeWhile جاهزة للتعامل مع توقف التدفق. على سبيل المثال، يمكنك استخدام runForeach للقيام بإجراء ما على كل عنصر في التدفق قبل التوقف.

  3. الإنهاء الصحيح (Proper Termination): يجب أن يتم إنهاء تطبيقك بشكل صحيح بعد انتهاء التدفق. يمكنك استخدام onComplete لتنظيف الموارد بعد انتهاء التدفق.

هذا مثال كامل على كيفية استخدام takeWhile في سياق Akka Streams:

scala
import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import akka.util.ByteString import java.nio.file.Paths object Main extends App { implicit val system = ActorSystem("system") implicit val materializer = ActorMaterializer() val condition: String => Boolean = line => line.contains("stop") val source = FileIO.fromPath(Paths.get("your-file.txt")) .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)) .map(_.utf8String) val stream = source.takeWhile(!condition(_)) stream.runForeach(println) .onComplete(_ => { println("Stream completed.") system.terminate() }) }

في هذا المثال، سيتوقف التدفق عندما يتم العثور على سطر يحتوي على الكلمة “stop”، ثم يتم طباعة رسالة “Stream completed.” قبل إنهاء التطبيق بشكل صحيح.

مقالات ذات صلة

زر الذهاب إلى الأعلى

هذا المحتوى محمي من النسخ لمشاركته يرجى استعمال أزرار المشاركة السريعة أو تسخ الرابط !!